Skip to content

Commit a97c3d0

Browse files
authored
Return back table for current slice. (#17352)
1 parent f309eb3 commit a97c3d0

File tree

6 files changed

+53
-35
lines changed

6 files changed

+53
-35
lines changed

ydb/apps/etcd_proxy/proxy.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,13 @@ int TProxy::ImportDatabase() {
210210
const auto driver = NYdb::TDriver(config);
211211
auto client = NYdb::NTable::TTableClient(driver);
212212

213-
if (const auto res = client.BulkUpsert(Database + Folder + "/content", std::move(value)).ExtractValueSync(); !res.IsSuccess()) {
213+
if (const auto res = client.BulkUpsert(Database + Folder + "/current", std::move(value)).ExtractValueSync(); !res.IsSuccess()) {
214+
std::cout << res.GetIssues().ToString() << std::endl;
215+
return 1;
216+
}
217+
218+
const auto& param = NYdb::TParamsBuilder().AddParam("$Prefix").String(ImportPrefix_).Build().Build();
219+
if (const auto res = Stuff->Client->ExecuteQuery("insert into `history` select * from `current` where startswith(`key`,$Prefix);", NYdb::NQuery::TTxControl::NoTx(), param).ExtractValueSync(); !res.IsSuccess()) {
214220
std::cout << res.GetIssues().ToString() << std::endl;
215221
return 1;
216222
}
Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,16 @@
1-
CREATE TABLE content
1+
CREATE TABLE current
2+
(
3+
`key` Bytes NOT NULL,
4+
`created` Int64 NOT NULL,
5+
`modified` Int64 NOT NULL,
6+
`version` Int64 NOT NULL,
7+
`value` Bytes NOT NULL,
8+
`lease` Int64 NOT NULL,
9+
PRIMARY KEY (`key`)
10+
)
11+
WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11);
12+
13+
CREATE TABLE history
214
(
315
`key` Bytes NOT NULL,
416
`created` Int64 NOT NULL,
@@ -7,7 +19,8 @@ CREATE TABLE content
719
`value` Bytes NOT NULL,
820
`lease` Int64 NOT NULL,
921
PRIMARY KEY (`key`, `modified`)
10-
);
22+
)
23+
WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 101);
1124

1225
CREATE TABLE leases
1326
(
@@ -16,6 +29,7 @@ CREATE TABLE leases
1629
`created` Datetime NOT NULL,
1730
`updated` Datetime NOT NULL,
1831
PRIMARY KEY (`id`)
19-
);
32+
)
33+
WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11);
2034

21-
ALTER TABLE content ADD CHANGEFEED changes WITH (format="JSON", mode="UPDATES");
35+
ALTER TABLE history ADD CHANGEFEED changes WITH (format="JSON", mode="UPDATES");

ydb/apps/etcd_proxy/service/etcd_base_init.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ std::string GetCreateTablesSQL(const std::string& prefix) {
99
}
1010

1111
std::string GetLastRevisionSQL(const std::string& prefix) {
12-
return prefix + "select nvl(max(`modified`), 1L) from `content`; select nvl(max(`id`), 1L) from `leases`;";
12+
return prefix + "select nvl(max(`modified`), 1L) from `history`; select nvl(max(`id`), 1L) from `leases`;";
1313
}
1414

1515
}

ydb/apps/etcd_proxy/service/etcd_impl.cpp

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,13 @@ struct TOperation {
3535
};
3636

3737
void MakeSlice(const std::string_view& where, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr, const i64 revision = 0LL) {
38-
sql << "select * from (select max_by(TableRow(), `modified`) from `content`" << where;
39-
if (revision)
38+
if (revision) {
39+
sql << "select * from (select max_by(TableRow(), `modified`) from `history`" << where;
4040
sql << " and " << AddParam("Rev", params, revision, paramsCounter) << " >= `modified`";
41-
sql << " group by `key`) flatten columns where 0L < `version`";
41+
sql << " group by `key`) flatten columns where 0L < `version`";
42+
} else {
43+
sql << "select * from `current`" << where;
44+
}
4245
}
4346

4447
void MakeSlice(const std::string_view& key, const std::string_view& rangeEnd, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter, const i64 revision = 0LL) {
@@ -252,7 +255,7 @@ struct TPut : public TOperation {
252255
const auto& oldResultSetName = GetNameWithIndex("Old", resultsCounter);
253256
const auto& newResultSetName = GetNameWithIndex("New", resultsCounter);
254257

255-
sql << oldResultSetName << " = select * from (select * from `content`" << keyFilter << " order by `modified` desc limit 1UL) where 0L < `version`;" << std::endl;
258+
sql << oldResultSetName << " = select * from `current` " << keyFilter << ';' << std::endl;
256259
sql << newResultSetName << " = select" << std::endl;
257260
sql << '\t' << keyParamName << " as `key`," << std::endl;
258261
sql << '\t' << "if(`version` > 0L, `created`, $Revision) as `created`," << std::endl;
@@ -274,7 +277,8 @@ struct TPut : public TOperation {
274277
sql << " where " << txnFilter << ')';
275278
sql << ';' << std::endl;
276279

277-
sql << "insert into `content` select * from " << newResultSetName << ';' << std::endl;
280+
sql << (update ? "update `current` on" : "upsert into `current`") << " select * from " << newResultSetName << ';' << std::endl;
281+
sql << "insert into `history` select * from " << newResultSetName << ';' << std::endl;
278282

279283
if (GetPrevious || NotifyWatchtower || update) {
280284
if (resultsCounter)
@@ -376,7 +380,7 @@ struct TDeleteRange : public TOperation {
376380
sql << " where " << txnFilter;
377381
sql << ';' << std::endl;
378382

379-
sql << "insert into `content`" << std::endl;
383+
sql << "insert into `history`" << std::endl;
380384
sql << "select `key`, `created`, $Revision as `modified`, 0L as `version`, `value`, `lease` from " << oldResultSetName << ';' << std::endl;
381385

382386
sql << "select count(*) from " << oldResultSetName << ';' << std::endl;
@@ -385,6 +389,10 @@ struct TDeleteRange : public TOperation {
385389
++(*resultsCounter);
386390
sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from " << oldResultSetName << ';' << std::endl;
387391
}
392+
sql << "delete from `current`" << keyFilter;
393+
if (!txnFilter.empty())
394+
sql << " and " << txnFilter;
395+
sql << ';' << std::endl;
388396
}
389397

390398
void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr, size_t* resultsCounter = nullptr, const std::string_view& txnFilter = {}) {
@@ -1074,12 +1082,12 @@ class TCompactRequest
10741082
}
10751083

10761084
void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params) final {
1077-
sql << "$Trash = select c.key as key, c.modified as modified from `content` as c inner join (" << std::endl;
1078-
sql << "select max_by((`key`, `modified`), `modified`) as pair from `content`" << std::endl;
1085+
sql << "$Trash = select c.key as key, c.modified as modified from `history` as c inner join (" << std::endl;
1086+
sql << "select max_by((`key`, `modified`), `modified`) as pair from `history`" << std::endl;
10791087
sql << "where `modified` < " << AddParam("Revision", params, KeyRevision) << " and 0L = `version` group by `key`" << std::endl;
10801088
sql << ") as keys on keys.pair.0 = c.key where c.modified <= keys.pair.1;" << std::endl;
10811089
sql << "select count(*) from $Trash;" << std::endl;
1082-
sql << "delete from `content` on select * from $Trash;" << std::endl;
1090+
sql << "delete from `history` on select * from $Trash;" << std::endl;
10831091
}
10841092

10851093
void ReplyWith(const NYdb::TResultSets& results, const TActorContext& ctx) final {
@@ -1163,16 +1171,15 @@ class TLeaseRevokeRequest
11631171

11641172
sql << "select count(*) > 0UL from `leases` where " << leaseParamName << " = `id`;" << std::endl;
11651173

1166-
sql << "$Victims = ";
1167-
MakeSimpleSlice(sql, params);
1168-
sql << " and " << leaseParamName << " = `lease`;" << std::endl;
1169-
1170-
sql << "insert into `content`" << std::endl;
1171-
sql << "select `key`, `created`, " << revisionParamName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl;
1174+
sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from `current` where " << leaseParamName << " = `lease`;" << std::endl;
11721175

11731176
if constexpr (NotifyWatchtower) {
11741177
sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from $Victims;" << std::endl;
11751178
}
1179+
1180+
sql << "insert into `history`" << std::endl;
1181+
sql << "select `key`, `created`, " << revisionParamName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl;
1182+
sql << "delete from `current` on select `key` from $Victims;" << std::endl;
11761183
sql << "delete from `leases` where " << leaseParamName << " = `id`;" << std::endl;
11771184
}
11781185

@@ -1237,9 +1244,7 @@ class TLeaseTimeToLiveRequest
12371244

12381245
sql << "select `ttl`, `ttl` - unwrap(cast(CurrentUtcDatetime(`id`) - `updated` as Int64) / 1000000L) as `granted` from `leases` where " << leaseParamName << " = `id`;" << std::endl;
12391246
if (Keys) {
1240-
sql << "select `key` from (";
1241-
MakeSimpleSlice(sql, params);
1242-
sql << " and " << leaseParamName << " = `lease`);" << std::endl;
1247+
sql << "select `key` from `current` where " << leaseParamName << " = `lease`;" << std::endl;
12431248
}
12441249
}
12451250

@@ -1343,10 +1348,6 @@ std::string MakeSimplePredicate(const std::string_view& key, const std::string_v
13431348
return keyParamName;
13441349
}
13451350

1346-
void MakeSimpleSlice(std::ostream& sql, NYdb::TParamsBuilder& params) {
1347-
MakeSlice(std::string_view(), sql, params);
1348-
}
1349-
13501351
NActors::IActor* MakeRange(std::unique_ptr<NKikimr::NGRpcService::IRequestCtx> p, TSharedStuff::TPtr stuff) {
13511352
return new TRangeRequest(std::move(p), std::move(stuff));
13521353
}

ydb/apps/etcd_proxy/service/etcd_impl.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,5 @@ namespace NEtcd {
2020
template<typename TValueType>
2121
std::string AddParam(const std::string_view& name, NYdb::TParamsBuilder& params, const TValueType& value, size_t* counter = nullptr);
2222

23-
void MakeSimpleSlice(std::ostream& sql, NYdb::TParamsBuilder& params);
24-
2523
std::string MakeSimplePredicate(const std::string_view& key, const std::string_view& rangeEnd, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr);
2624
}

ydb/apps/etcd_proxy/service/etcd_watch.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,9 @@ class TWatch : public TActorBootstrapped<TWatch> {
172172
std::ostringstream sql;
173173
sql << Stuff->TablePrefix;
174174
if (WithPrevious) {
175-
sql << "select * from (select max_by(TableRow(), `modified`) from `content` where " << revName << " > `modified` and " << where.view() << " group by `key`) flatten columns union all" << std::endl;
175+
sql << "select * from (select max_by(TableRow(), `modified`) from `history` where " << revName << " > `modified` and " << where.view() << " group by `key`) flatten columns union all" << std::endl;
176176
}
177-
sql << "select * from `content` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl;
177+
sql << "select * from `history` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl;
178178
// std::cout << std::endl << sql.view() << std::endl;
179179

180180
TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
@@ -600,12 +600,11 @@ class TWatchtower : public TActorBootstrapped<TWatchtower> {
600600
const auto& revName = AddParam("Revision", params, Revision);
601601

602602
sql << "$Leases = select 0L as `lease` union all select `id` as `lease` from `leases` where unwrap(interval('PT1S') * `ttl` + `updated`) > CurrentUtcDatetime(`id`);" << std::endl;
603-
sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from (";
604-
MakeSimpleSlice(sql, params);
605-
sql << ") as h left only join $Leases as l using(`lease`);" << std::endl;
603+
sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from `current` as h left only join $Leases as l using(`lease`);" << std::endl;
606604

607605
sql << "insert into `content`" << std::endl;
608606
sql << "select `key`, `created`, " << revName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl;
607+
sql << "delete from `current` on select `key` from $Victims;" << std::endl;
609608

610609
if constexpr (NotifyWatchtower) {
611610
sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from $Victims;" << std::endl;

0 commit comments

Comments
 (0)