Skip to content

Commit 952ca0f

Browse files
authored
External data sources: restore (#14586)
1 parent 92e49da commit 952ca0f

File tree

3 files changed

+176
-15
lines changed

3 files changed

+176
-15
lines changed

ydb/apps/ydb/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Include external data sources and external tables in local backups (`ydb tools dump` and `ydb tools restore`). Both scheme objects are backed up as YQL creation queries saved in the `create_external_data_source.sql` and `create_external_table.sql` files respectively, which can be executed to recreate the original scheme objects.
12
* Fixed a bug where `ydb auth get-token` command tried to authenticate twice: while listing andpoints and while executing actual token request.
23
* Fixed a bug where `ydb import file csv` command was saving progress even if a batch upload had been failed.
34
* Include coordination nodes in local backups (`ydb tools dump` and `ydb tools restore`). Rate limiters that utilize the coordination node are saved in the coordination node's backup folder, preserving the existing path hierarchy.

ydb/public/lib/ydb_cli/dump/restore_impl.cpp

Lines changed: 162 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,14 @@ TString ReadAsyncReplicationQuery(const TFsPath& fsDirPath, const TLog* log) {
6969
return ReadFromFile(fsDirPath, log, NFiles::CreateAsyncReplication());
7070
}
7171

72+
TString ReadExternalDataSourceQuery(const TFsPath& fsDirPath, const TLog* log) {
73+
return ReadFromFile(fsDirPath, log, NFiles::CreateExternalDataSource());
74+
}
75+
76+
TString ReadExternalTableQuery(const TFsPath& fsDirPath, const TLog* log) {
77+
return ReadFromFile(fsDirPath, log, NFiles::CreateExternalTable());
78+
}
79+
7280
template <typename TProtoType>
7381
TProtoType ReadProtoFromFile(const TFsPath& fsDirPath, const TLog* log, const NFiles::TFileInfo& fileInfo) {
7482
TProtoType proto;
@@ -340,6 +348,16 @@ TRestoreResult TRestoreClient::RetryViewRestoration() {
340348
return result;
341349
}
342350

351+
TRestoreResult TRestoreClient::RestoreExternalTables() {
352+
for (const auto& [fsPath, dbPath, settings, isAlreadyExisting] : ExternalTableRestorationCalls) {
353+
auto result = RestoreExternalTable(fsPath, dbPath, settings, isAlreadyExisting);
354+
if (!result.IsSuccess()) {
355+
return result;
356+
}
357+
}
358+
return Result<TRestoreResult>();
359+
}
360+
343361
TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings) {
344362
LOG_I("Restore " << fsPath.Quote() << " to " << dbPath.Quote());
345363

@@ -376,8 +394,11 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP
376394

377395
// restore
378396
auto restoreResult = RestoreFolder(fsPath, dbPath, "", settings, oldEntries);
379-
if (auto retryViewResult = RetryViewRestoration(); !retryViewResult.IsSuccess()) {
380-
restoreResult = retryViewResult;
397+
if (auto result = RetryViewRestoration(); !result.IsSuccess()) {
398+
restoreResult = result;
399+
}
400+
if (auto result = RestoreExternalTables(); !result.IsSuccess()) {
401+
restoreResult = result;
381402
}
382403

383404
if (restoreResult.IsSuccess()) {
@@ -399,6 +420,8 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP
399420
return restoreResult;
400421
}
401422

423+
TVector<const TSchemeEntry*> entriesToDropInSecondPass;
424+
402425
for (const auto& entry : newDirectoryList.Entries) {
403426
if (oldEntries.contains(entry.Name)) {
404427
continue;
@@ -419,7 +442,7 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP
419442
break;
420443
case ESchemeEntryType::View:
421444
result = QueryClient.RetryQuerySync([&path = fullPath](NQuery::TSession session) {
422-
return session.ExecuteQuery(std::format("DROP VIEW IF EXISTS `{}`;", path),
445+
return session.ExecuteQuery(std::format("DROP VIEW `{}`;", path),
423446
NQuery::TTxControl::NoTx()).ExtractValueSync();
424447
});
425448
break;
@@ -433,6 +456,15 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP
433456
return client.DropNode(path).ExtractValueSync();
434457
});
435458
break;
459+
case ESchemeEntryType::ExternalDataSource:
460+
entriesToDropInSecondPass.emplace_back(&entry);
461+
continue;
462+
case ESchemeEntryType::ExternalTable:
463+
result = QueryClient.RetryQuerySync([&path = fullPath](NQuery::TSession session) {
464+
return session.ExecuteQuery(std::format("DROP EXTERNAL TABLE `{}`;", path),
465+
NQuery::TTxControl::NoTx()).ExtractValueSync();
466+
});
467+
break;
436468
default:
437469
break;
438470
}
@@ -442,7 +474,27 @@ TRestoreResult TRestoreClient::Restore(const TString& fsPath, const TString& dbP
442474
return restoreResult;
443475
} else if (!result->IsSuccess()) {
444476
LOG_E("Error removing " << entry.Type << ": " << TString{fullPath}.Quote()
445-
<< ": " << result->GetIssues().ToOneLineString());
477+
<< ", issues: " << result->GetIssues().ToOneLineString());
478+
return restoreResult;
479+
}
480+
}
481+
482+
for (const auto* entry : entriesToDropInSecondPass) {
483+
TMaybe<TStatus> result;
484+
switch (entry->Type) {
485+
case ESchemeEntryType::ExternalDataSource:
486+
result = QueryClient.RetryQuerySync([&path = entry->Name](NQuery::TSession session) {
487+
return session.ExecuteQuery(std::format("DROP EXTERNAL DATA SOURCE `{}`;", path),
488+
NQuery::TTxControl::NoTx()).ExtractValueSync();
489+
});
490+
break;
491+
default:
492+
break;
493+
}
494+
Y_ENSURE(result, "Unexpected entry to drop in the second pass");
495+
if (!result->IsSuccess()) {
496+
LOG_E("Error removing " << entry->Type << ": " << TString{entry->Name}.Quote()
497+
<< ", issues: " << result->GetIssues().ToOneLineString());
446498
return restoreResult;
447499
}
448500
}
@@ -476,7 +528,7 @@ TRestoreResult TRestoreClient::RestoreClusterRoot(const TFsPath& fsPath) {
476528
}
477529

478530
LOG_I("Restore cluster root " << ClusterRootPath.Quote() << " from " << fsPath.GetPath().Quote());
479-
531+
480532
if (!fsPath.Exists()) {
481533
return Result<TRestoreResult>(EStatus::BAD_REQUEST,
482534
TStringBuilder() << "Specified folder does not exist: " << fsPath.GetPath());
@@ -508,7 +560,7 @@ TRestoreResult TRestoreClient::RestoreClusterRoot(const TFsPath& fsPath) {
508560
if (auto result = RestoreGroupMembers(rootTableClient, fsPath, ClusterRootPath); !result.IsSuccess()) {
509561
return result;
510562
}
511-
563+
512564
if (auto result = RestorePermissionsImpl(rootSchemeClient, fsPath, ClusterRootPath); !result.IsSuccess()) {
513565
return result;
514566
}
@@ -521,7 +573,7 @@ TRestoreResult TRestoreClient::WaitForAvailableNodes(const TString& database, TD
521573
dbDriverConfig.SetDatabase(database);
522574

523575
THPTimer timer;
524-
576+
525577
NDiscovery::TDiscoveryClient client(dbDriverConfig);
526578
TDuration retrySleep = TDuration::MilliSeconds(1000);
527579
while (true) {
@@ -561,13 +613,13 @@ TRestoreResult TRestoreClient::RestoreUsers(TTableClient& client, const TFsPath&
561613
auto statementResult = client.RetryOperationSync([&](TSession session) {
562614
return session.ExecuteSchemeQuery(statement).ExtractValueSync();
563615
});
564-
616+
565617
if (statement.StartsWith("CREATE")
566618
&& statementResult.GetStatus() == EStatus::PRECONDITION_FAILED
567619
&& statementResult.GetIssues().ToOneLineString().find("exists") != TString::npos)
568620
{
569621
LOG_D("User from create statement " << statement.Quote() << " already exists, trying to alter it");
570-
auto alterStatement = "ALTER" + statement.substr(6);
622+
auto alterStatement = "ALTER" + statement.substr(6);
571623
auto alterStatementResult = client.RetryOperationSync([&](TSession session) {
572624
return session.ExecuteSchemeQuery(alterStatement).ExtractValueSync();
573625
});
@@ -658,7 +710,7 @@ TRestoreResult TRestoreClient::ReplaceClusterRoot(TString& outPath) {
658710
if (clusterRootEnd != std::string::npos) {
659711
outPath = ClusterRootPath + outPath.substr(clusterRootEnd);
660712
} else {
661-
return Result<TRestoreResult>(EStatus::INTERNAL_ERROR,
713+
return Result<TRestoreResult>(EStatus::INTERNAL_ERROR,
662714
TStringBuilder() << "Can't find cluster root path in "
663715
<< outPath.Quote() << " to replace it on "
664716
<< ClusterRootPath.Quote());
@@ -686,7 +738,7 @@ TRestoreResult TRestoreClient::RestoreDatabaseImpl(const TString& fsPath, const
686738
}
687739

688740
LOG_I("Restore database from " << fsPath.Quote() << " to " << dbPath.Quote());
689-
741+
690742
if (auto result = CreateDatabase(CmsClient, dbPath, TCreateDatabaseSettings(dbDesc)); !result.IsSuccess()) {
691743
if (result.GetStatus() == EStatus::ALREADY_EXISTS) {
692744
LOG_W("Database " << dbPath.Quote() << " already exists, continue restoring to this database");
@@ -723,8 +775,11 @@ TRestoreResult TRestoreClient::RestoreDatabaseImpl(const TString& fsPath, const
723775

724776
if (settings.WithContent_) {
725777
auto restoreResult = RestoreFolder(fsPath, dbPath, "", {}, {});
726-
if (auto retryViewResult = RetryViewRestoration(); !retryViewResult.IsSuccess()) {
727-
restoreResult = retryViewResult;
778+
if (auto result = RetryViewRestoration(); !result.IsSuccess()) {
779+
restoreResult = result;
780+
}
781+
if (auto result = RestoreExternalTables(); !result.IsSuccess()) {
782+
restoreResult = result;
728783
}
729784
return restoreResult;
730785
} else {
@@ -761,6 +816,7 @@ TRestoreResult TRestoreClient::RestoreDatabases(const TFsPath& fsPath, const TRe
761816
if (IsFileExists(fsPath.Child(NFiles::Database().FileName))) {
762817
TRestoreDatabaseSettings dbSettings = {
763818
.WaitNodesDuration_ = settings.WaitNodesDuration_,
819+
.Database_ = std::nullopt,
764820
.WithContent_ = false
765821
};
766822

@@ -866,6 +922,16 @@ TRestoreResult TRestoreClient::RestoreFolder(
866922
return RestoreReplication(fsPath, dbRestoreRoot, dbPathRelativeToRestoreRoot, settings, oldEntries.contains(objectDbPath));
867923
}
868924

925+
if (IsFileExists(fsPath.Child(NFiles::CreateExternalDataSource().FileName))) {
926+
return RestoreExternalDataSource(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath));
927+
}
928+
929+
if (IsFileExists(fsPath.Child(NFiles::CreateExternalTable().FileName))) {
930+
// delay external table restoration
931+
ExternalTableRestorationCalls.emplace_back(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath));
932+
return Result<TRestoreResult>();
933+
}
934+
869935
if (IsFileExists(fsPath.Child(NFiles::Empty().FileName))) {
870936
return RestoreEmptyDir(fsPath, objectDbPath, settings, oldEntries.contains(objectDbPath));
871937
}
@@ -889,6 +955,11 @@ TRestoreResult TRestoreClient::RestoreFolder(
889955
result = RestoreCoordinationNode(child, childDbPath, settings, oldEntries.contains(childDbPath));
890956
} else if (IsFileExists(child.Child(NFiles::CreateAsyncReplication().FileName))) {
891957
result = RestoreReplication(child, dbRestoreRoot, Join('/', dbPathRelativeToRestoreRoot, child.GetName()), settings, oldEntries.contains(childDbPath));
958+
} else if (IsFileExists(child.Child(NFiles::CreateExternalDataSource().FileName))) {
959+
result = RestoreExternalDataSource(child, childDbPath, settings, oldEntries.contains(childDbPath));
960+
} else if (IsFileExists(child.Child(NFiles::CreateExternalTable().FileName))) {
961+
// delay external table restoration
962+
ExternalTableRestorationCalls.emplace_back(child, childDbPath, settings, oldEntries.contains(childDbPath));
892963
} else if (child.IsDirectory()) {
893964
result = RestoreFolder(child, dbRestoreRoot, Join('/', dbPathRelativeToRestoreRoot, child.GetName()), settings, oldEntries);
894965
}
@@ -900,7 +971,7 @@ TRestoreResult TRestoreClient::RestoreFolder(
900971

901972
const bool dbPathExists = oldEntries.contains(dbPath);
902973
if (!result.Defined() && !dbPathExists) {
903-
// This situation occurs when all the children of the folder are views.
974+
// This situation occurs when all the children of the folder are views or external tables.
904975
return RestoreEmptyDir(fsPath, dbPath, settings, dbPathExists);
905976
}
906977

@@ -1118,6 +1189,82 @@ TRestoreResult TRestoreClient::RestoreCoordinationNode(
11181189
return Result<TRestoreResult>(dbPath, std::move(result));
11191190
}
11201191

1192+
TRestoreResult TRestoreClient::RestoreExternalDataSource(
1193+
const TFsPath& fsPath,
1194+
const TString& dbPath,
1195+
const TRestoreSettings& settings,
1196+
bool isAlreadyExisting)
1197+
{
1198+
LOG_D("Process " << fsPath.GetPath().Quote());
1199+
1200+
if (auto error = ErrorOnIncomplete(fsPath)) {
1201+
return *error;
1202+
}
1203+
1204+
LOG_I("Restore external data source " << fsPath.GetPath().Quote() << " to " << dbPath.Quote());
1205+
1206+
if (settings.DryRun_) {
1207+
return CheckExistenceAndType(SchemeClient, dbPath, ESchemeEntryType::ExternalDataSource);
1208+
}
1209+
1210+
TString query = ReadExternalDataSourceQuery(fsPath, Log.get());
1211+
1212+
NYql::TIssues issues;
1213+
if (!RewriteCreateQuery(query, "CREATE EXTERNAL DATA SOURCE IF NOT EXISTS `{}`", dbPath, issues)) {
1214+
return Result<TRestoreResult>(fsPath.GetPath(), EStatus::BAD_REQUEST, issues.ToString());
1215+
}
1216+
1217+
auto result = QueryClient.RetryQuerySync([&](NQuery::TSession session) {
1218+
return session.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync();
1219+
});
1220+
1221+
if (result.IsSuccess()) {
1222+
LOG_D("Created " << dbPath.Quote());
1223+
return RestorePermissions(fsPath, dbPath, settings, isAlreadyExisting);
1224+
}
1225+
1226+
LOG_E("Failed to create " << dbPath.Quote());
1227+
return Result<TRestoreResult>(dbPath, std::move(result));
1228+
}
1229+
1230+
TRestoreResult TRestoreClient::RestoreExternalTable(
1231+
const TFsPath& fsPath,
1232+
const TString& dbPath,
1233+
const TRestoreSettings& settings,
1234+
bool isAlreadyExisting)
1235+
{
1236+
LOG_D("Process " << fsPath.GetPath().Quote());
1237+
1238+
if (auto error = ErrorOnIncomplete(fsPath)) {
1239+
return *error;
1240+
}
1241+
1242+
LOG_I("Restore external table " << fsPath.GetPath().Quote() << " to " << dbPath.Quote());
1243+
1244+
if (settings.DryRun_) {
1245+
return CheckExistenceAndType(SchemeClient, dbPath, ESchemeEntryType::ExternalTable);
1246+
}
1247+
1248+
TString query = ReadExternalTableQuery(fsPath, Log.get());
1249+
1250+
NYql::TIssues issues;
1251+
if (!RewriteCreateQuery(query, "CREATE EXTERNAL TABLE IF NOT EXISTS `{}`", dbPath, issues)) {
1252+
return Result<TRestoreResult>(fsPath.GetPath(), EStatus::BAD_REQUEST, issues.ToString());
1253+
}
1254+
1255+
auto result = QueryClient.RetryQuerySync([&](NQuery::TSession session) {
1256+
return session.ExecuteQuery(query, NQuery::TTxControl::NoTx()).ExtractValueSync();
1257+
});
1258+
1259+
if (result.IsSuccess()) {
1260+
LOG_D("Created " << dbPath.Quote());
1261+
return RestorePermissions(fsPath, dbPath, settings, isAlreadyExisting);
1262+
}
1263+
1264+
LOG_E("Failed to create " << dbPath.Quote());
1265+
return Result<TRestoreResult>(dbPath, std::move(result));
1266+
}
1267+
11211268
TRestoreResult TRestoreClient::RestoreTable(
11221269
const TFsPath& fsPath,
11231270
const TString& dbPath,
@@ -1544,7 +1691,7 @@ TRestoreResult TRestoreClient::RestorePermissionsImpl(
15441691
if (result.GetStatus() == EStatus::UNAUTHORIZED) {
15451692
LOG_W("Not enough rights to restore permissions on " << dbPath.Quote() << ", skipping");
15461693
return Result<TRestoreResult>();
1547-
}
1694+
}
15481695

15491696
return result;
15501697
}

ydb/public/lib/ydb_cli/dump/restore_impl.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ class TRestoreClient {
135135
TRestoreResult RestoreCoordinationNode(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
136136
TRestoreResult RestoreDependentResources(const TFsPath& fsPath, const TString& dbPath);
137137
TRestoreResult RestoreRateLimiter(const TFsPath& fsPath, const TString& coordinationNodePath, const TString& resourcePath);
138+
TRestoreResult RestoreExternalDataSource(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
139+
TRestoreResult RestoreExternalTable(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
138140

139141
TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
140142
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 partitionCount);
@@ -147,6 +149,7 @@ class TRestoreClient {
147149
TRestoreResult ReplaceClusterRoot(TString& outPath);
148150
TRestoreResult WaitForAvailableNodes(const TString& database, TDuration waitDuration);
149151
TRestoreResult RetryViewRestoration();
152+
TRestoreResult RestoreExternalTables();
150153

151154
TRestoreResult RestoreClusterRoot(const TFsPath& fsPath);
152155
TRestoreResult RestoreDatabases(const TFsPath& fsPath, const TRestoreClusterSettings& settings);
@@ -196,6 +199,16 @@ class TRestoreClient {
196199
// If the dependency is not created yet, then the view restoration will fail.
197200
// We retry failed view creation attempts until either all views are created, or the errors are persistent.
198201
TVector<TRestoreViewCall> ViewRestorationCalls;
202+
203+
struct TRestoreExternalTableCall {
204+
TFsPath FsPath;
205+
TString DbPath;
206+
TRestoreSettings Settings;
207+
bool IsAlreadyExisting;
208+
};
209+
// External Tables depend on External Data Sources and need to be restored after them.
210+
TVector<TRestoreExternalTableCall> ExternalTableRestorationCalls;
211+
199212
TString ClusterRootPath;
200213
}; // TRestoreClient
201214

0 commit comments

Comments
 (0)