Skip to content

Commit fb2c723

Browse files
authored
Restore backup collection op (#12003)
1 parent b574773 commit fb2c723

25 files changed

+800
-155
lines changed

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,12 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
385385
break;
386386
}
387387

388+
case NKqpProto::TKqpSchemeOperation::kRestore: {
389+
const auto& modifyScheme = schemeOp.GetRestore();
390+
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
391+
break;
392+
}
393+
388394
default:
389395
InternalError(TStringBuilder() << "Unexpected scheme operation: "
390396
<< (ui32) schemeOp.GetOperationCase());

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,10 @@ class TKikimrIcGateway : public IKqpGateway {
13371337
return NotImplemented<TGenericResult>();
13381338
}
13391339

1340+
TFuture<TGenericResult> Restore(const TString&, const NYql::TBackupSettings&) override {
1341+
return NotImplemented<TGenericResult>();
1342+
}
1343+
13401344
TFuture<TGenericResult> CreateUser(const TString& cluster, const NYql::TCreateUserSettings& settings) override {
13411345
using TRequest = TEvTxUserProxy::TEvProposeTransaction;
13421346

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,6 +1439,49 @@ class TKqpGatewayProxy : public IKikimrGateway {
14391439
}
14401440
}
14411441

1442+
TFuture<TGenericResult> Restore(const TString& cluster, const NYql::TBackupSettings& settings) override {
1443+
CHECK_PREPARED_DDL(Restore);
1444+
1445+
try {
1446+
if (cluster != SessionCtx->GetCluster()) {
1447+
return MakeFuture(ResultFromError<TGenericResult>("Invalid cluster: " + cluster));
1448+
}
1449+
1450+
std::pair<TString, TString> pathPair;
1451+
if (settings.Name.StartsWith("/")) {
1452+
TString error;
1453+
if (!NSchemeHelpers::SplitTablePath(settings.Name, GetDatabase(), pathPair, error, true)) {
1454+
return MakeFuture(ResultFromError<TGenericResult>(error));
1455+
}
1456+
} else {
1457+
pathPair.second = ".backups/collections/" + settings.Name;
1458+
}
1459+
1460+
NKikimrSchemeOp::TModifyScheme tx;
1461+
tx.SetWorkingDir(GetDatabase() ? GetDatabase() : *GetDomainName());
1462+
tx.SetOperationType(NKikimrSchemeOp::ESchemeOpRestoreBackupCollection);
1463+
1464+
auto& op = *tx.MutableRestoreBackupCollection();
1465+
op.SetName(pathPair.second);
1466+
1467+
if (IsPrepare()) {
1468+
auto& phyQuery = *SessionCtx->Query().PreparingQuery->MutablePhysicalQuery();
1469+
auto& phyTx = *phyQuery.AddTransactions();
1470+
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
1471+
phyTx.MutableSchemeOperation()->MutableRestore()->Swap(&tx);
1472+
1473+
TGenericResult result;
1474+
result.SetSuccess();
1475+
return MakeFuture(result);
1476+
} else {
1477+
return Gateway->ModifyScheme(std::move(tx));
1478+
}
1479+
}
1480+
catch (yexception& e) {
1481+
return MakeFuture(ResultFromException<TGenericResult>(e));
1482+
}
1483+
}
1484+
14421485
TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) override {
14431486
CHECK_PREPARED_DDL(CreateUser);
14441487

ydb/core/kqp/provider/yql_kikimr_datasink.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,12 @@ class TKiSinkIntentDeterminationTransformer: public TKiSinkVisitorTransformer {
179179
return TStatus::Ok;
180180
}
181181

182+
TStatus HandleRestore(TKiRestore node, TExprContext& ctx) override {
183+
Y_UNUSED(ctx);
184+
Y_UNUSED(node);
185+
return TStatus::Ok;
186+
}
187+
182188
TStatus HandleCreateUser(TKiCreateUser node, TExprContext& ctx) override {
183189
ctx.AddError(TIssue(ctx.GetPosition(node.Pos()), TStringBuilder()
184190
<< "CreateUser is not yet implemented for intent determination transformer"));
@@ -613,6 +619,7 @@ class TKikimrDataSink : public TDataProviderBase
613619

614620
if (node.IsCallable(TKiBackup::CallableName())
615621
|| node.IsCallable(TKiBackupIncremental::CallableName())
622+
|| node.IsCallable(TKiRestore::CallableName())
616623
) {
617624
return true;
618625
}
@@ -1531,6 +1538,14 @@ class TKikimrDataSink : public TDataProviderBase
15311538
.Prefix().Build(key.GetBackupCollectionPath().Prefix)
15321539
.Done()
15331540
.Ptr();
1541+
} else if (mode == "restore") {
1542+
return Build<TKiRestore>(ctx, node->Pos())
1543+
.World(node->Child(0))
1544+
.DataSink(node->Child(1))
1545+
.BackupCollection().Build(key.GetBackupCollectionPath().Name)
1546+
.Prefix().Build(key.GetBackupCollectionPath().Prefix)
1547+
.Done()
1548+
.Ptr();
15341549
} else {
15351550
ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown operation type for backup collection: " << TString(mode)));
15361551
return nullptr;
@@ -1810,6 +1825,10 @@ IGraphTransformer::TStatus TKiSinkVisitorTransformer::DoTransform(TExprNode::TPt
18101825
return HandleBackupIncremental(node.Cast(), ctx);
18111826
}
18121827

1828+
if (auto node = TMaybeNode<TKiRestore>(input)) {
1829+
return HandleRestore(node.Cast(), ctx);
1830+
}
1831+
18131832
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "(Kikimr DataSink) Unsupported function: "
18141833
<< callable.CallableName()));
18151834
return TStatus::Error;

ydb/core/kqp/provider/yql_kikimr_exec.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2660,6 +2660,28 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer<TKi
26602660
}, "Executing BACKUP INCREMENTAL");
26612661
}
26622662

2663+
if (auto maybeRestore = TMaybeNode<TKiRestore>(input)) {
2664+
auto requireStatus = RequireChild(*input, 0);
2665+
if (requireStatus.Level != TStatus::Ok) {
2666+
return SyncStatus(requireStatus);
2667+
}
2668+
2669+
auto restore = maybeRestore.Cast();
2670+
2671+
TBackupSettings settings;
2672+
settings.Name = TString(restore.BackupCollection());
2673+
2674+
auto cluster = TString(restore.DataSink().Cluster());
2675+
auto future = Gateway->Restore(cluster, settings);
2676+
2677+
return WrapFuture(future,
2678+
[](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) {
2679+
Y_UNUSED(res);
2680+
auto resultNode = ctx.NewWorld(input->Pos());
2681+
return resultNode;
2682+
}, "Executing RESTORE");
2683+
}
2684+
26632685
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
26642686
<< "(Kikimr DataSink) Failed to execute node: " << input->Content()));
26652687
return SyncError();

ydb/core/kqp/provider/yql_kikimr_expr_nodes.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,17 @@
584584
{"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"},
585585
{"Index": 3, "Name": "Prefix", "Type": "TCoAtom"}
586586
]
587+
},
588+
{
589+
"Name": "TKiRestore",
590+
"Base": "TCallable",
591+
"Match": {"Type": "Callable", "Name": "KiRestore!"},
592+
"Children": [
593+
{"Index": 0, "Name": "World", "Type": "TExprBase"},
594+
{"Index": 1, "Name": "DataSink", "Type": "TKiDataSink"},
595+
{"Index": 2, "Name": "BackupCollection", "Type": "TCoAtom"},
596+
{"Index": 3, "Name": "Prefix", "Type": "TCoAtom"}
597+
]
587598
}
588599
]
589600
}

ydb/core/kqp/provider/yql_kikimr_gateway.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,6 +1106,8 @@ class IKikimrGateway : public TThrRefBase {
11061106

11071107
virtual NThreading::TFuture<TGenericResult> BackupIncremental(const TString& cluster, const TBackupSettings& settings) = 0;
11081108

1109+
virtual NThreading::TFuture<TGenericResult> Restore(const TString& cluster, const TBackupSettings& settings) = 0;
1110+
11091111
virtual NThreading::TFuture<TGenericResult> CreateUser(const TString& cluster, const TCreateUserSettings& settings) = 0;
11101112

11111113
virtual NThreading::TFuture<TGenericResult> AlterUser(const TString& cluster, const TAlterUserSettings& settings) = 0;

ydb/core/kqp/provider/yql_kikimr_provider.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ struct TKikimrData {
7979
DataSinkNames.insert(TKiDropBackupCollection::CallableName());
8080
DataSinkNames.insert(TKiBackup::CallableName());
8181
DataSinkNames.insert(TKiBackupIncremental::CallableName());
82+
DataSinkNames.insert(TKiRestore::CallableName());
8283

8384
CommitModes.insert(CommitModeFlush);
8485
CommitModes.insert(CommitModeRollback);
@@ -130,7 +131,8 @@ struct TKikimrData {
130131
TYdbOperation::AlterBackupCollection |
131132
TYdbOperation::DropBackupCollection |
132133
TYdbOperation::Backup |
133-
TYdbOperation::BackupIncremental;
134+
TYdbOperation::BackupIncremental |
135+
TYdbOperation::Restore;
134136

135137
SystemColumns = {
136138
{"_yql_partition_id", NKikimr::NUdf::EDataSlot::Uint64}
@@ -442,7 +444,7 @@ bool TKikimrKey::Extract(const TExprNode& key) {
442444
KeyType = Type::PGObject;
443445
Target = key.Child(0)->Child(1)->Child(0)->Content();
444446
ObjectType = key.Child(0)->Child(2)->Child(0)->Content();
445-
} else if (tagName == "backupCollection" || tagName == "backup") {
447+
} else if (tagName == "backupCollection" || tagName == "backup" || tagName == "restore") {
446448
KeyType = Type::BackupCollection;
447449
Target = key.Child(0)->Child(1)->Child(0)->Content();
448450
ExplicitPrefix = key.Child(0)->Child(2)->Child(0)->Content();

ydb/core/kqp/provider/yql_kikimr_provider.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ enum class TYdbOperation : ui64 {
250250
DropBackupCollection = 1ull << 30,
251251
Backup = 1ull << 31,
252252
BackupIncremental = 1ull << 32,
253+
Restore = 1ull << 33,
253254
};
254255

255256
Y_DECLARE_FLAGS(TYdbOperations, TYdbOperation);

ydb/core/kqp/provider/yql_kikimr_provider_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class TKiSinkVisitorTransformer : public TSyncTransformerBase {
8181
virtual TStatus HandleDropBackupCollection(NNodes::TKiDropBackupCollection node, TExprContext& ctx) = 0;
8282
virtual TStatus HandleBackup(NNodes::TKiBackup node, TExprContext& ctx) = 0;
8383
virtual TStatus HandleBackupIncremental(NNodes::TKiBackupIncremental node, TExprContext& ctx) = 0;
84+
virtual TStatus HandleRestore(NNodes::TKiRestore node, TExprContext& ctx) = 0;
8485
};
8586

8687
class TKikimrKey {

0 commit comments

Comments
 (0)