Skip to content

Commit 6d97ff6

Browse files
authored
Add 'export' feature. (#18525)
1 parent bca683a commit 6d97ff6

File tree

2 files changed

+85
-8
lines changed

2 files changed

+85
-8
lines changed

ydb/apps/etcd_proxy/proxy.cpp

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,23 @@ int TProxy::Run() {
108108
if (const auto res = Init()) {
109109
return res;
110110
}
111-
if (Initialize_) {
112-
if (const auto res = InitDatabase()) {
111+
if (!ExportTo_.empty()) {
112+
if (const auto res = ExportDatabase()) {
113113
return res;
114114
}
115-
}
116-
if (!ImportPrefix_.empty()) {
117-
if (const auto res = ImportDatabase()) {
118-
return res;
115+
} else {
116+
if (Initialize_) {
117+
if (const auto res = InitDatabase()) {
118+
return res;
119+
}
120+
}
121+
if (!ImportPrefix_.empty()) {
122+
if (const auto res = ImportDatabase()) {
123+
return res;
124+
}
119125
}
120126
}
121-
if (!Initialize_ && ImportPrefix_.empty()) {
127+
if (!Initialize_ && ImportPrefix_.empty() && ExportTo_.empty()) {
122128
if (const auto res = StartServer()) {
123129
return res;
124130
}
@@ -225,6 +231,75 @@ int TProxy::ImportDatabase() {
225231
return 0;
226232
}
227233

234+
int TProxy::ExportDatabase() {
235+
auto credentials = grpc::InsecureChannelCredentials();
236+
if (!Root.empty() || !Cert.empty() || !Key.empty()) {
237+
const grpc::SslCredentialsOptions opts {
238+
.pem_root_certs = TFileInput(Root).ReadAll(),
239+
.pem_private_key = TFileInput(Key).ReadAll(),
240+
.pem_cert_chain = TFileInput(Cert).ReadAll()
241+
};
242+
credentials = grpc::SslCredentials(opts);
243+
}
244+
245+
const auto channel = grpc::CreateChannel(TString(ExportTo_), credentials);
246+
const std::unique_ptr<etcdserverpb::KV::Stub> kv = etcdserverpb::KV::NewStub(channel);
247+
248+
NYdb::TDriverConfig config;
249+
config.SetEndpoint(Endpoint);
250+
config.SetDatabase(Database);
251+
if (!Token.empty())
252+
config.SetAuthToken(Token);
253+
if (!CA.empty())
254+
config.UseSecureConnection(TFileInput(CA).ReadAll());
255+
256+
const auto driver = NYdb::TDriver(config);
257+
auto client = NYdb::NTable::TTableClient(driver);
258+
auto count = 0ULL;
259+
if (const auto res = client.CreateSession().ExtractValueSync(); res.IsSuccess()) {
260+
NYdb::NTable::TReadTableSettings settings;
261+
settings.BatchLimitRows(97ULL).AppendColumns("key").AppendColumns("value").AppendColumns("lease");
262+
if (auto it = res.GetSession().ReadTable(Database + Folder + "/current", settings).ExtractValueSync(); it.IsSuccess()) {
263+
for (;;) {
264+
auto streamPart = it.ReadNext().ExtractValueSync();
265+
if (streamPart.EOS())
266+
break;
267+
268+
if (!streamPart.IsSuccess()) {
269+
std::cout << streamPart.GetIssues().ToString() << std::endl;
270+
return 1;
271+
}
272+
273+
etcdserverpb::TxnRequest txnRequest;
274+
for (auto parser = NYdb::TResultSetParser(streamPart.ExtractPart()); parser.TryNextRow();) {
275+
if (const auto lease = NYdb::TValueParser(parser.GetValue("lease")).GetOptionalInt64(); lease && !*lease) {
276+
++count;
277+
const auto put = txnRequest.add_success()->mutable_request_put();
278+
put->set_key(*NYdb::TValueParser(parser.GetValue("key")).GetOptionalString());
279+
put->set_value(*NYdb::TValueParser(parser.GetValue("value")).GetOptionalString());
280+
}
281+
}
282+
283+
grpc::ClientContext txnCtx;
284+
etcdserverpb::TxnResponse txnResponse;
285+
if (const auto& status = kv->Txn(&txnCtx, txnRequest, &txnResponse); !status.ok()) {
286+
std::cout << status.error_message() << std::endl;
287+
return 1;
288+
}
289+
}
290+
} else {
291+
std::cout << it.GetIssues().ToString() << std::endl;
292+
return 1;
293+
}
294+
} else {
295+
std::cout << res.GetIssues().ToString() << std::endl;
296+
return 1;
297+
}
298+
299+
std::cout << count << " keys exported successfully." << std::endl;
300+
return 0;
301+
}
302+
228303
int TProxy::Shutdown() {
229304
if (GRpcServer)
230305
GRpcServer->Stop();
@@ -252,6 +327,7 @@ TProxy::TProxy(int argc, char** argv)
252327

253328
opts.AddLongOption("import-from", "Import existing data from etcd base").RequiredArgument("ENDPOINT").DefaultValue("localhost:2379").StoreResult(&ImportFrom_);
254329
opts.AddLongOption("import-prefix", "Prefix of data to import").RequiredArgument("PREFIX").StoreResult(&ImportPrefix_);
330+
opts.AddLongOption("export-to", "Export existing data to etcd from ydb").RequiredArgument("ENDPOINT").StoreResult(&ExportTo_);
255331

256332
opts.AddLongOption("ca", "SSL CA certificate file").Optional().RequiredArgument("CA").StoreResult(&Root);
257333
opts.AddLongOption("cert", "SSL certificate file").Optional().RequiredArgument("CERT").StoreResult(&Cert);

ydb/apps/etcd_proxy/proxy.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class TProxy {
3333
int Discovery();
3434
int InitDatabase();
3535
int ImportDatabase();
36+
int ExportDatabase();
3637
int StartServer();
3738

3839
static THolder<NActors::TActorSystemSetup> BuildActorSystemSetup();
@@ -48,7 +49,7 @@ class TProxy {
4849
std::string Database, Endpoint, Token, CA, Folder;
4950
ui16 ListeningPort = 2379U;
5051
std::string Root, Cert, Key;
51-
std::string ImportFrom_, ImportPrefix_;
52+
std::string ImportFrom_, ImportPrefix_, ExportTo_;
5253
};
5354

5455
}

0 commit comments

Comments
 (0)