Skip to content

Commit 6ca983e

Browse files
authored
Mantain the revision on the server side. (#18773)
1 parent 36dc938 commit 6ca983e

20 files changed

+895
-353
lines changed

ydb/apps/etcd_proxy/proxy.cpp

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/discovery/discovery.h>
1212
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
1313
#include <ydb/apps/etcd_proxy/service/etcd_base_init.h>
14+
#include <ydb/apps/etcd_proxy/service/etcd_gate.h>
15+
#include <ydb/apps/etcd_proxy/service/etcd_lease.h>
1416
#include <ydb/apps/etcd_proxy/service/etcd_watch.h>
1517
#include <ydb/apps/etcd_proxy/service/etcd_grpc.h>
1618
#include <ydb/core/grpc_services/base/base.h>
@@ -61,25 +63,6 @@ int TProxy::Discovery() {
6163
}
6264

6365
int TProxy::StartServer() {
64-
if (const auto res = Stuff->Client->ExecuteQuery(NEtcd::GetLastRevisionSQL(Stuff->TablePrefix), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); res.IsSuccess()) {
65-
if (auto result = res.GetResultSetParser(0); result.TryNextRow()) {
66-
Stuff->Revision.store(NYdb::TValueParser(result.GetValue(0)).GetInt64());
67-
} else {
68-
std::cout << "Unexpected result of get last revision." << std::endl;
69-
return 1;
70-
}
71-
if (auto result = res.GetResultSetParser(1); result.TryNextRow()) {
72-
Stuff->Lease.store(NYdb::TValueParser(result.GetValue(0)).GetInt64());
73-
} else {
74-
std::cout << "Unexpected result of get last lease." << std::endl;
75-
return 1;
76-
}
77-
std::cout << "The last revision is " << Stuff->Revision.load() << ", the last lease is " << Stuff->Lease.load() << '.' << std::endl;
78-
} else {
79-
std::cout << res.GetIssues().ToString() << std::endl;
80-
return 1;
81-
}
82-
8366
NYdbGrpc::TServerOptions opts;
8467
opts.SetPort(ListeningPort);
8568

@@ -94,11 +77,13 @@ int TProxy::StartServer() {
9477
}
9578

9679
const auto watchtower = ActorSystem->Register(NEtcd::BuildWatchtower(Counters, Stuff));
80+
const auto holderhouse = ActorSystem->Register(NEtcd::BuildHolderHouse(Counters, Stuff));
81+
ActorSystem->Register(NEtcd::BuildMainGate(Counters, Stuff));
9782

9883
GRpcServer = std::make_unique<NYdbGrpc::TGRpcServer>(opts, Counters);
99-
GRpcServer->AddService(new NEtcd::TEtcdKVService(ActorSystem.get(), Counters, watchtower, Stuff));
84+
GRpcServer->AddService(new NEtcd::TEtcdKVService(ActorSystem.get(), Counters, {}, Stuff));
10085
GRpcServer->AddService(new NEtcd::TEtcdWatchService(ActorSystem.get(), Counters, watchtower, Stuff));
101-
GRpcServer->AddService(new NEtcd::TEtcdLeaseService(ActorSystem.get(), Counters, watchtower, Stuff));
86+
GRpcServer->AddService(new NEtcd::TEtcdLeaseService(ActorSystem.get(), Counters, holderhouse, Stuff));
10287
GRpcServer->Start();
10388
std::cout << "Etcd service over " << Database << " on " << Endpoint << " was started." << std::endl;
10489
return 0;
@@ -135,13 +120,18 @@ int TProxy::Run() {
135120
}
136121

137122
int TProxy::InitDatabase() {
138-
if (const auto res = Stuff->Client->ExecuteQuery(NEtcd::GetCreateTablesSQL(Stuff->TablePrefix), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); res.IsSuccess()) {
139-
std::cout << "Database " << Database << " on " << Endpoint << " was initialized." << std::endl;
140-
return 0;
141-
} else {
123+
if (const auto res = Stuff->Client->ExecuteQuery(NEtcd::GetCreateTablesSQL(Stuff->TablePrefix), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); !res.IsSuccess()) {
142124
std::cout << res.GetIssues().ToString() << std::endl;
143125
return 1;
144126
}
127+
if (ImportPrefix_.empty()) {
128+
if (const auto res = Stuff->Client->ExecuteQuery(NEtcd::GetInitializeTablesSQL(Stuff->TablePrefix), NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); !res.IsSuccess()) {
129+
std::cout << res.GetIssues().ToString() << std::endl;
130+
return 1;
131+
}
132+
}
133+
std::cout << "Database " << Database << " on " << Endpoint << " was initialized." << std::endl;
134+
return 0;
145135
}
146136

147137
int TProxy::ImportDatabase() {
@@ -216,13 +206,28 @@ int TProxy::ImportDatabase() {
216206
const auto driver = NYdb::TDriver(config);
217207
auto client = NYdb::NTable::TTableClient(driver);
218208

209+
if (const auto res = Stuff->Client->ExecuteQuery(Stuff->TablePrefix + "ALTER TABLE `current` DROP INDEX `lease`;", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); !res.IsSuccess()) {
210+
std::cout << res.GetIssues().ToString() << std::endl;
211+
return 1;
212+
}
213+
219214
if (const auto res = client.BulkUpsert(Database + Folder + "/current", std::move(value)).ExtractValueSync(); !res.IsSuccess()) {
220215
std::cout << res.GetIssues().ToString() << std::endl;
221216
return 1;
222217
}
223218

219+
if (const auto res = Stuff->Client->ExecuteQuery(Stuff->TablePrefix + "ALTER TABLE `current` ADD INDEX `lease` GLOBAL ON (`lease`);", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); !res.IsSuccess()) {
220+
std::cout << res.GetIssues().ToString() << std::endl;
221+
return 1;
222+
}
223+
224224
const auto& param = NYdb::TParamsBuilder().AddParam("$Prefix").String(ImportPrefix_).Build().Build();
225-
if (const auto res = Stuff->Client->ExecuteQuery("insert into `history` select * from `current` where startswith(`key`,$Prefix);", NYdb::NQuery::TTxControl::NoTx(), param).ExtractValueSync(); !res.IsSuccess()) {
225+
if (const auto res = Stuff->Client->ExecuteQuery(Stuff->TablePrefix + R"(
226+
insert into `history` select * from `current` where startswith(`key`,$Prefix);
227+
insert into `revision` (`stub`,`revision`,`timestamp`) values (true,0L,CurrentUtcTimestamp());
228+
insert into `revision` select false as `stub`, nvl(max(`modified`), 0L) as `revision`, CurrentUtcTimestamp(max(`modified`)) as `timestamp` from `history`;
229+
insert into `commited` select `revision`, `timestamp` from `revision` where not `stub`;
230+
)", NYdb::NQuery::TTxControl::NoTx(), param).ExtractValueSync(); !res.IsSuccess()) {
226231
std::cout << res.GetIssues().ToString() << std::endl;
227232
return 1;
228233
}

ydb/apps/etcd_proxy/readme.txt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
Etcd over YDB. Zero version.
22

33
Main restriction: Can work only in single instance mode.
4-
5-
To remove the restriction two tasks must be done:
6-
- First, get and update a current revision number on YDB side trought a separate request before any logical query.
7-
- Second, use CDC for implement the watches instead of self-notifications.
4+
To remove the restriction we have to use CDC for implement the watches instead of self-notifications.
85

96
And other todo's:
107
- Add merics.

ydb/apps/etcd_proxy/service/create.sql

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
CREATE TABLE revision
2+
(
3+
`stub` Bool NOT NULL,
4+
`revision` Int64 NOT NULL,
5+
`timestamp` Timestamp NOT NULL,
6+
PRIMARY KEY (`stub`)
7+
) WITH (PARTITION_AT_KEYS=(false,true));
8+
9+
CREATE TABLE commited
10+
(
11+
`revision` Int64 NOT NULL,
12+
`timestamp` Timestamp NOT NULL,
13+
PRIMARY KEY (`revision`)
14+
)
15+
WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11);
16+
117
CREATE TABLE current
218
(
319
`key` Bytes NOT NULL,
@@ -8,7 +24,6 @@ CREATE TABLE current
824
`lease` Int64 NOT NULL,
925
PRIMARY KEY (`key`),
1026
INDEX `lease` GLOBAL ON (`lease`)
11-
1227
)
1328
WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11);
1429

@@ -34,4 +49,4 @@ CREATE TABLE leases
3449
)
3550
WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11);
3651

37-
ALTER TABLE history ADD CHANGEFEED changes WITH (format="JSON", mode="UPDATES");
52+
ALTER TABLE `current` ADD CHANGEFEED changes WITH (format="JSON", mode="NEW_AND_OLD_IMAGES");

ydb/apps/etcd_proxy/service/etcd_base_init.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ std::string GetCreateTablesSQL(const std::string& prefix) {
88
return prefix + NResource::Find("create.sql"sv);
99
}
1010

11-
std::string GetLastRevisionSQL(const std::string& prefix) {
12-
return prefix + "select nvl(max(`modified`), 1L) from `history`; select nvl(max(`id`), 1L) from `leases`;";
11+
std::string GetInitializeTablesSQL(const std::string& prefix) {
12+
return prefix + "insert into `revision` (`stub`,`revision`,`timestamp`) values (false,0L,CurrentUtcTimestamp()),(true,0L,CurrentUtcTimestamp());";
1313
}
1414

1515
}

ydb/apps/etcd_proxy/service/etcd_base_init.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace NEtcd {
66

77
std::string GetCreateTablesSQL(const std::string& prefix);
88

9-
std::string GetLastRevisionSQL(const std::string& prefix);
9+
std::string GetInitializeTablesSQL(const std::string& prefix);
1010

1111
}
1212

ydb/apps/etcd_proxy/service/etcd_events.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ enum Ev : ui32 {
3838
Changes,
3939
Cancel,
4040

41+
RequestRevision,
42+
ReturnRevision,
43+
4144
End
4245
};
4346

@@ -111,6 +114,25 @@ class TEtcdRequestStreamWrapper
111114
const TIntrusivePtr<IStreamCtx> Ctx_;
112115
};
113116

117+
using TKeysSet = std::set<std::pair<std::string, std::string>>;
118+
119+
struct TEvRequestRevision : public NActors::TEventLocal<TEvRequestRevision, Ev::RequestRevision> {
120+
explicit TEvRequestRevision(TKeysSet&& keysSet = {})
121+
: KeysSet(std::move(keysSet))
122+
{}
123+
124+
TKeysSet KeysSet;
125+
};
126+
127+
using TGuard = std::shared_ptr<void>;
128+
129+
struct TEvReturnRevision : public NActors::TEventLocal<TEvReturnRevision, Ev::ReturnRevision> {
130+
explicit TEvReturnRevision(const i64 revision, const TGuard& guard = {}) : Revision(revision), Guard(std::move(guard)) {}
131+
132+
const i64 Revision;
133+
const TGuard Guard;
134+
};
135+
114136
using TEvWatchRequest = TEtcdRequestStreamWrapper<Ev::Watch, etcdserverpb::WatchRequest, etcdserverpb::WatchResponse>;
115137
using TEvLeaseKeepAliveRequest = TEtcdRequestStreamWrapper<Ev::LeaseKeepAlive, etcdserverpb::LeaseKeepAliveRequest, etcdserverpb::LeaseKeepAliveResponse>;
116138

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
#include "etcd_gate.h"
2+
#include "etcd_shared.h"
3+
#include "etcd_events.h"
4+
#include "etcd_impl.h"
5+
6+
#include <vector>
7+
#include <ydb/library/actors/core/actor_bootstrapped.h>
8+
#include <ydb/library/actors/core/executor_thread.h>
9+
10+
namespace NEtcd {
11+
12+
using namespace NActors;
13+
using namespace NYdb::NQuery;
14+
15+
namespace {
16+
17+
class TMainGate : public TActorBootstrapped<TMainGate> {
18+
public:
19+
TMainGate(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TSharedStuff::TPtr stuff)
20+
: Counters(std::move(counters)), Stuff(std::move(stuff)), Query(Stuff->TablePrefix + NResource::Find("revision.sql"sv))
21+
{}
22+
23+
void Bootstrap(const TActorContext&) {
24+
Become(&TThis::StateFunc);
25+
Stuff->MainGate = SelfId();
26+
}
27+
private:
28+
using TActorsList = std::vector<TActorId>;
29+
std::deque<std::tuple<TKeysSet, TActorsList>> Queue;
30+
31+
STFUNC(StateFunc) {
32+
switch (ev->GetTypeRewrite()) {
33+
hFunc(TEvRequestRevision, Handle);
34+
hFunc(TEvReturnRevision, Handle);
35+
36+
HFunc(TEvQueryResult, Handle);
37+
HFunc(TEvQueryError, Handle);
38+
}
39+
}
40+
41+
static bool HasIntersection(const TKeysSet& lhs, const TKeysSet& rhs) {
42+
for (auto i = lhs.cbegin(), j = rhs.cbegin(); lhs.cend() != i && rhs.cend() != j;) {
43+
if (*i < *j) {
44+
if (!i->second.empty() && (Endless == i->second || i->second >= j->first))
45+
return true;
46+
else
47+
i = lhs.lower_bound(*j);
48+
} else if (*i > *j) {
49+
if (!j->second.empty() && (Endless == j->second || j->second >= i->first))
50+
return true;
51+
else
52+
j = rhs.lower_bound(*i);
53+
} else
54+
return true;
55+
}
56+
57+
return false;
58+
}
59+
60+
void Handle(TEvRequestRevision::TPtr &ev) {
61+
if (Queue.empty()) {
62+
Queue.emplace_back(std::move(ev->Get()->KeysSet), TActorsList(1U, ev->Sender));
63+
RequestNextRevision();
64+
} else {
65+
if (!ev->Get()->KeysSet.empty()) {
66+
for (auto it = Queue.begin(); Queue.end() > it; ++it) {
67+
if (auto& keys = std::get<TKeysSet>(*it); !keys.empty() && !HasIntersection(ev->Get()->KeysSet, keys)) {
68+
keys.merge(std::move(ev->Get()->KeysSet));
69+
std::get<TActorsList>(*it).emplace_back(ev->Sender);
70+
return;
71+
}
72+
}
73+
}
74+
Queue.emplace_back(std::move(ev->Get()->KeysSet), TActorsList(1U, ev->Sender));
75+
}
76+
}
77+
78+
void Handle(TEvQueryResult::TPtr &ev, const TActorContext& ctx) {
79+
i64 revision = 0LL;
80+
if (auto parser = NYdb::TResultSetParser(ev->Get()->Results.front()); parser.TryNextRow()) {
81+
revision = NYdb::TValueParser(parser.GetValue(0)).GetInt64();
82+
}
83+
84+
const auto current = std::get<TActorsList>(std::move(Queue.front()));
85+
Queue.pop_front();
86+
87+
const auto guard = current.size() > 1U ? std::shared_ptr<TMainGate>(this, [revision, my = this->SelfId(), stuff = TSharedStuff::TWeakPtr(Stuff)](void*) {
88+
if (const auto lock = stuff.lock())
89+
lock->ActorSystem->Send(my, new TEvReturnRevision(revision));
90+
}) : std::shared_ptr<TMainGate>() ;
91+
92+
for (const auto& actor : current)
93+
ctx.Send(actor, new TEvReturnRevision(revision, guard));
94+
RequestNextRevision();
95+
}
96+
97+
void Handle(TEvQueryError::TPtr &ev, const TActorContext& ctx) {
98+
std::cout << "Request revision SQL error received " << ev->Get()->Issues.ToString() << std::endl;
99+
const auto current = std::get<TActorsList>(std::move(Queue.front()));
100+
Queue.pop_front();
101+
for (const auto& actor : current)
102+
ctx.Send(actor, new TEvQueryError(ev->Get()->Issues));
103+
RequestNextRevision();
104+
}
105+
106+
void RequestNextRevision() {
107+
if (Queue.empty())
108+
return;
109+
110+
TQueryClient::TQueryResultFunc callback = [query = Query](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
111+
return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx());
112+
};
113+
114+
Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my = this->SelfId(), stuff = TSharedStuff::TWeakPtr(Stuff)](const auto& future) {
115+
if (const auto lock = stuff.lock()) {
116+
if (const auto res = future.GetValueSync(); res.IsSuccess())
117+
lock->ActorSystem->Send(my, new TEvQueryResult(res.GetResultSets()));
118+
else
119+
lock->ActorSystem->Send(my, new TEvQueryError(res.GetIssues()));
120+
}
121+
});
122+
}
123+
124+
void Handle(TEvReturnRevision::TPtr &ev) {
125+
NYdb::TParamsBuilder params;
126+
const auto& revisionParamName = AddParam("Revision", params, ev->Get()->Revision);
127+
128+
std::ostringstream sql;
129+
sql << Stuff->TablePrefix;
130+
sql << "insert into `commited` (`revision`,`timestamp`) values (" << revisionParamName << ",CurrentUtcTimestamp());" << std::endl;
131+
// std::cout << std::endl << sql.view() << std::endl;
132+
133+
TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
134+
return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args);
135+
};
136+
137+
const auto rev = ev->Get()->Revision;
138+
Stuff->UpdateRevision(rev);
139+
Stuff->Client->RetryQuery(std::move(callback)).Subscribe([rev](const auto& future) {
140+
if (const auto res = future.GetValue(); res.IsSuccess())
141+
std::cout << "Revision " << rev << " commited succesfully." << std::endl;
142+
else
143+
std::cout << "Revision " << rev << " commit finished with errors: " << res.GetIssues().ToString() << std::endl;
144+
});
145+
}
146+
147+
const TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
148+
const TSharedStuff::TPtr Stuff;
149+
const std::string Query;
150+
};
151+
152+
}
153+
154+
NActors::IActor* BuildMainGate(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TSharedStuff::TPtr stuff) {
155+
return new TMainGate(std::move(counters), std::move(stuff));
156+
157+
}
158+
159+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#pragma once
2+
3+
#include "etcd_shared.h"
4+
#include <ydb/library/actors/core/actor.h>
5+
#include <library/cpp/monlib/dynamic_counters/counters.h>
6+
7+
namespace NEtcd {
8+
9+
NActors::IActor* BuildMainGate(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TSharedStuff::TPtr stuff);
10+
11+
}

0 commit comments

Comments
 (0)