9
9
#include < ydb/public/lib/ydb_cli/common/retry_func.h>
10
10
#include < ydb/public/lib/ydb_cli/dump/files/files.h>
11
11
#include < ydb/public/lib/ydb_cli/dump/util/log.h>
12
+ #include < ydb/public/lib/ydb_cli/dump/util/rewrite_query.h>
12
13
#include < ydb/public/lib/ydb_cli/dump/util/util.h>
13
14
#include < ydb/public/lib/ydb_cli/dump/util/view_utils.h>
14
15
#include < ydb-cpp-sdk/client/proto/accessor.h>
@@ -48,12 +49,24 @@ bool IsFileExists(const TFsPath& path) {
48
49
return path.Exists () && path.IsFile ();
49
50
}
50
51
51
- template <typename TProtoType>
52
- TProtoType ReadProtoFromFile (const TFsPath& fsDirPath, const TLog* log, const NFiles::TFileInfo& fileInfo) {
52
+ TString ReadFromFile (const TFsPath& fsDirPath, const TLog* log, const NFiles::TFileInfo& fileInfo) {
53
53
const auto fsPath = fsDirPath.Child (fileInfo.FileName );
54
54
LOG_IMPL (log, ELogPriority::TLOG_DEBUG, " Read " << fileInfo.LogObjectType << " from " << fsPath.GetPath ().Quote ());
55
+ return TFileInput (fsPath).ReadAll ();
56
+ }
57
+
58
+ TString ReadViewQuery (const TFsPath& fsDirPath, const TLog* log) {
59
+ return ReadFromFile (fsDirPath, log, NFiles::CreateView ());
60
+ }
61
+
62
+ TString ReadAsyncReplicationQuery (const TFsPath& fsDirPath, const TLog* log) {
63
+ return ReadFromFile (fsDirPath, log, NFiles::CreateAsyncReplication ());
64
+ }
65
+
66
+ template <typename TProtoType>
67
+ TProtoType ReadProtoFromFile (const TFsPath& fsDirPath, const TLog* log, const NFiles::TFileInfo& fileInfo) {
55
68
TProtoType proto;
56
- Y_ENSURE (google::protobuf::TextFormat::ParseFromString (TFileInput (fsPath). ReadAll ( ), &proto));
69
+ Y_ENSURE (google::protobuf::TextFormat::ParseFromString (ReadFromFile (fsDirPath, log, fileInfo ), &proto));
57
70
return proto;
58
71
}
59
72
@@ -465,6 +478,10 @@ TRestoreResult TRestoreClient::RestoreFolder(
465
478
return RestoreCoordinationNode (fsPath, objectDbPath, settings, oldEntries.contains (objectDbPath));
466
479
}
467
480
481
+ if (IsFileExists (fsPath.Child (NFiles::CreateAsyncReplication ().FileName ))) {
482
+ return RestoreReplication (fsPath, objectDbPath, settings, oldEntries.contains (objectDbPath));
483
+ }
484
+
468
485
if (IsFileExists (fsPath.Child (NFiles::Empty ().FileName ))) {
469
486
return RestoreEmptyDir (fsPath, objectDbPath, settings, oldEntries.contains (objectDbPath));
470
487
}
@@ -486,6 +503,8 @@ TRestoreResult TRestoreClient::RestoreFolder(
486
503
result = RestoreTopic (child, childDbPath, settings, oldEntries.contains (childDbPath));
487
504
} else if (IsFileExists (child.Child (NFiles::CreateCoordinationNode ().FileName ))) {
488
505
result = RestoreCoordinationNode (child, childDbPath, settings, oldEntries.contains (childDbPath));
506
+ } else if (IsFileExists (child.Child (NFiles::CreateAsyncReplication ().FileName ))) {
507
+ result = RestoreReplication (child, childDbPath, settings, oldEntries.contains (childDbPath));
489
508
} else if (child.IsDirectory ()) {
490
509
result = RestoreFolder (child, dbRestoreRoot, Join (' /' , dbPathRelativeToRestoreRoot, child.GetName ()), settings, oldEntries);
491
510
}
@@ -520,13 +539,12 @@ TRestoreResult TRestoreClient::RestoreView(
520
539
const TString dbPath = dbRestoreRoot + dbPathRelativeToRestoreRoot;
521
540
LOG_I (" Restore view " << fsPath.GetPath ().Quote () << " to " << dbPath.Quote ());
522
541
523
- const auto createViewFile = fsPath.Child (NFiles::CreateView ().FileName );
524
- TString query = TFileInput (createViewFile).ReadAll ();
542
+ TString query = ReadViewQuery (fsPath, Log.get ());
525
543
526
544
NYql::TIssues issues;
527
545
const bool isDb = IsDatabase (SchemeClient, dbRestoreRoot);
528
- if (!RewriteCreateViewQuery (query, dbRestoreRoot, isDb, dbPath, createViewFile. GetPath (). Quote (), issues)) {
529
- return Result<TRestoreResult>(dbPath , EStatus::BAD_REQUEST, issues.ToString ());
546
+ if (!RewriteCreateViewQuery (query, dbRestoreRoot, isDb, dbPath, issues)) {
547
+ return Result<TRestoreResult>(fsPath. GetPath () , EStatus::BAD_REQUEST, issues.ToString ());
530
548
}
531
549
532
550
if (settings.DryRun_ ) {
@@ -584,6 +602,44 @@ TRestoreResult TRestoreClient::RestoreTopic(
584
602
return Result<TRestoreResult>(dbPath, std::move (result));
585
603
}
586
604
605
+ TRestoreResult TRestoreClient::RestoreReplication (
606
+ const TFsPath& fsPath,
607
+ const TString& dbPath,
608
+ const TRestoreSettings& settings,
609
+ bool isAlreadyExisting)
610
+ {
611
+ LOG_D (" Process " << fsPath.GetPath ().Quote ());
612
+
613
+ if (auto error = ErrorOnIncomplete (fsPath)) {
614
+ return *error;
615
+ }
616
+
617
+ LOG_I (" Restore async replication " << fsPath.GetPath ().Quote () << " to " << dbPath.Quote ());
618
+
619
+ if (settings.DryRun_ ) {
620
+ return CheckExistenceAndType (SchemeClient, dbPath, ESchemeEntryType::Replication);
621
+ }
622
+
623
+ auto query = ReadAsyncReplicationQuery (fsPath, Log.get ());
624
+
625
+ NYql::TIssues issues;
626
+ if (!RewriteCreateQuery (query, " CREATE ASYNC REPLICATION `{}`" , dbPath, issues)) {
627
+ return Result<TRestoreResult>(fsPath.GetPath (), EStatus::BAD_REQUEST, issues.ToString ());
628
+ }
629
+
630
+ auto result = QueryClient.RetryQuerySync ([&](NQuery::TSession session) {
631
+ return session.ExecuteQuery (query, NQuery::TTxControl::NoTx ()).ExtractValueSync ();
632
+ });
633
+
634
+ if (result.IsSuccess ()) {
635
+ LOG_D (" Created " << dbPath.Quote ());
636
+ return RestorePermissions (fsPath, dbPath, settings, isAlreadyExisting);
637
+ }
638
+
639
+ LOG_E (" Failed to create " << dbPath.Quote ());
640
+ return Result<TRestoreResult>(dbPath, std::move (result));
641
+ }
642
+
587
643
TRestoreResult TRestoreClient::RestoreRateLimiter (
588
644
const TFsPath& fsPath,
589
645
const TString& coordinationNodePath,
0 commit comments