Skip to content

Commit 2038471

Browse files
authored
Add coop create cdc evaluation to copy/create table calls (#10503)
1 parent 8074419 commit 2038471

19 files changed

+299
-99
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,16 @@ message TIndexCreationConfig {
974974
}
975975
}
976976

977+
message TGenericTxInFlyExtraData {
978+
message TTxCopyTableExtraData {
979+
optional NKikimrProto.TPathID CdcPathId = 1;
980+
}
981+
982+
// we do not use oneof here as far as we want to combine some subops rarely
983+
// and it can be used in this case
984+
optional TTxCopyTableExtraData TxCopyTableExtraData = 1;
985+
}
986+
977987
message TIndexAlteringConfig {
978988
optional string Name = 1;
979989
optional EIndexState State = 2;
@@ -1022,6 +1032,9 @@ message TCopyTableConfig { //TTableDescription implemets copying a table in orig
10221032
optional bool OmitIndexes = 3 [default = false];
10231033
optional bool OmitFollowers = 4 [default = false];
10241034
optional bool IsBackup = 5 [default = false];
1035+
1036+
// additionally creates cdc stream on src table consistently with taking snapshot
1037+
optional TCreateCdcStream CreateSrcCdcStream = 6;
10251038
}
10261039

10271040
message TConsistentTableCopyingConfig {

ydb/core/protos/tx_datashard.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,12 @@ message TMoveIndex {
451451
optional TRemapIndexPathId ReMapIndex = 3;
452452
}
453453

454+
455+
message TCreateIncrementalBackupSrc {
456+
optional TSendSnapshot SendSnapshot = 1;
457+
optional TCreateCdcStreamNotice CreateCdcStreamNotice = 2;
458+
}
459+
454460
message TFlatSchemeTransaction {
455461
optional NKikimrSchemeOp.TTableDescription CreateTable = 1;
456462
optional NKikimrSchemeOp.TTableDescription DropTable = 2;
@@ -482,6 +488,7 @@ message TFlatSchemeTransaction {
482488
optional TMoveIndex MoveIndex = 21;
483489

484490
optional NKikimrSchemeOp.TRestoreIncrementalBackup CreateIncrementalRestoreSrc = 22;
491+
optional TCreateIncrementalBackupSrc CreateIncrementalBackupSrc = 23;
485492
}
486493

487494
message TDistributedEraseTransaction {

ydb/core/tx/datashard/build_scheme_tx_out_rs_unit.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,18 @@ EExecutionStatus TBuildSchemeTxOutRSUnit::Execute(TOperation::TPtr op,
4444
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
4545

4646
auto &schemeTx = tx->GetSchemeTx();
47-
if (!schemeTx.HasSendSnapshot())
47+
if (!schemeTx.HasSendSnapshot() && !schemeTx.HasCreateIncrementalBackupSrc())
4848
return EExecutionStatus::Executed;
4949

5050
Y_ABORT_UNLESS(!op->InputSnapshots().empty(), "Snapshots expected");
5151

5252
auto &outReadSets = op->OutReadSets();
5353
ui64 srcTablet = DataShard.TabletID();
5454

55-
const auto& snapshot = schemeTx.GetSendSnapshot();
55+
const auto& snapshot =
56+
schemeTx.HasSendSnapshot() ?
57+
schemeTx.GetSendSnapshot() :
58+
schemeTx.GetCreateIncrementalBackupSrc().GetSendSnapshot();
5659
ui64 targetTablet = snapshot.GetSendTo(0).GetShard();
5760
ui64 tableId = snapshot.GetTableId_Deprecated();
5861
if (snapshot.HasTableId()) {

ydb/core/tx/datashard/check_scheme_tx_unit.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class TCheckSchemeTxUnit : public TExecutionUnit {
3939
bool CheckAlterCdcStream(TActiveTransaction *activeTx);
4040
bool CheckDropCdcStream(TActiveTransaction *activeTx);
4141
bool CheckCreateIncrementalRestoreSrc(TActiveTransaction *activeTx);
42+
bool CheckCreateIncrementalBackupSrc(TActiveTransaction *activeTx);
4243

4344
bool CheckSchemaVersion(TActiveTransaction *activeTx, ui64 proposedSchemaVersion, ui64 currentSchemaVersion, ui64 expectedSchemaVersion);
4445

@@ -384,6 +385,9 @@ bool TCheckSchemeTxUnit::CheckSchemeTx(TActiveTransaction *activeTx)
384385
case TSchemaOperation::ETypeCreateIncrementalRestoreSrc:
385386
res = CheckCreateIncrementalRestoreSrc(activeTx);
386387
break;
388+
case TSchemaOperation::ETypeCreateIncrementalBackupSrc:
389+
res = CheckCreateIncrementalBackupSrc(activeTx);
390+
break;
387391
default:
388392
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD,
389393
"Unknown scheme tx type detected at tablet "
@@ -757,6 +761,27 @@ bool TCheckSchemeTxUnit::CheckCreateIncrementalRestoreSrc(TActiveTransaction *ac
757761
return true;
758762
}
759763

764+
bool TCheckSchemeTxUnit::CheckCreateIncrementalBackupSrc(TActiveTransaction *activeTx) {
765+
if (HasDuplicate(activeTx, "CreateIncrementalBackupSrc", &TPipeline::HasCreateIncrementalBackupSrc)) {
766+
return false;
767+
}
768+
769+
const auto &snap = activeTx->GetSchemeTx().GetCreateIncrementalBackupSrc().GetSendSnapshot();
770+
ui64 tableId = snap.GetTableId_Deprecated();
771+
if (snap.HasTableId()) {
772+
Y_ABORT_UNLESS(DataShard.GetPathOwnerId() == snap.GetTableId().GetOwnerId());
773+
tableId = snap.GetTableId().GetTableId();
774+
}
775+
Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId));
776+
777+
const auto &notice = activeTx->GetSchemeTx().GetCreateIncrementalBackupSrc().GetCreateCdcStreamNotice();
778+
if (!HasPathId(activeTx, notice, "CreateIncrementalBackupSrc")) {
779+
return false;
780+
}
781+
782+
return true;
783+
}
784+
760785
void TCheckSchemeTxUnit::Complete(TOperation::TPtr,
761786
const TActorContext &)
762787
{

ydb/core/tx/datashard/create_cdc_stream_unit.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ class TCreateCdcStreamUnit : public TExecutionUnit {
2525
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
2626

2727
auto& schemeTx = tx->GetSchemeTx();
28-
if (!schemeTx.HasCreateCdcStreamNotice()) {
28+
if (!schemeTx.HasCreateCdcStreamNotice() && !schemeTx.HasCreateIncrementalBackupSrc()) {
2929
return EExecutionStatus::Executed;
3030
}
3131

32-
const auto& params = schemeTx.GetCreateCdcStreamNotice();
32+
const auto& params =
33+
schemeTx.HasCreateCdcStreamNotice() ?
34+
schemeTx.GetCreateCdcStreamNotice() :
35+
schemeTx.GetCreateIncrementalBackupSrc().GetCreateCdcStreamNotice();
3336
const auto& streamDesc = params.GetStreamDescription();
3437
const auto streamPathId = PathIdFromPathId(streamDesc.GetPathId());
3538

ydb/core/tx/datashard/datashard_active_transaction.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ bool TActiveTransaction::BuildSchemeTx()
442442
+ (ui32)SchemeTx->HasDropCdcStreamNotice()
443443
+ (ui32)SchemeTx->HasMoveIndex()
444444
+ (ui32)SchemeTx->HasCreateIncrementalRestoreSrc()
445+
+ (ui32)SchemeTx->HasCreateIncrementalBackupSrc()
445446
;
446447
if (count != 1)
447448
return false;
@@ -480,6 +481,8 @@ bool TActiveTransaction::BuildSchemeTx()
480481
SchemeTxType = TSchemaOperation::ETypeMoveIndex;
481482
else if (SchemeTx->HasCreateIncrementalRestoreSrc())
482483
SchemeTxType = TSchemaOperation::ETypeCreateIncrementalRestoreSrc;
484+
else if (SchemeTx->HasCreateIncrementalBackupSrc())
485+
SchemeTxType = TSchemaOperation::ETypeCreateIncrementalBackupSrc;
483486
else
484487
SchemeTxType = TSchemaOperation::ETypeUnknown;
485488

ydb/core/tx/datashard/datashard_active_transaction.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ struct TSchemaOperation {
5555
ETypeDropCdcStream = 15,
5656
ETypeMoveIndex = 16,
5757
ETypeCreateIncrementalRestoreSrc = 17,
58+
ETypeCreateIncrementalBackupSrc = 18,
5859

5960
ETypeUnknown = Max<ui32>()
6061
};
@@ -111,6 +112,7 @@ struct TSchemaOperation {
111112
bool IsAlterCdcStream() const { return Type == ETypeAlterCdcStream; }
112113
bool IsDropCdcStream() const { return Type == ETypeDropCdcStream; }
113114
bool IsCreateIncrementalRestoreSrc() const { return Type == ETypeCreateIncrementalRestoreSrc; }
115+
bool IsCreateIncrementalBackupSrc() const { return Type == ETypeCreateIncrementalBackupSrc; }
114116

115117
bool IsReadOnly() const { return ReadOnly; }
116118
};

ydb/core/tx/datashard/datashard_pipeline.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ class TPipeline : TNonCopyable {
172172
bool HasAlterCdcStream() const { return SchemaTx && SchemaTx->IsAlterCdcStream(); }
173173
bool HasDropCdcStream() const { return SchemaTx && SchemaTx->IsDropCdcStream(); }
174174
bool HasCreateIncrementalRestoreSrc() const { return SchemaTx && SchemaTx->IsCreateIncrementalRestoreSrc(); }
175+
bool HasCreateIncrementalBackupSrc() const { return SchemaTx && SchemaTx->IsCreateIncrementalBackupSrc(); }
175176

176177
ui64 CurrentSchemaTxId() const {
177178
if (SchemaTx)

ydb/core/tx/datashard/make_snapshot_unit.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,16 +47,19 @@ EExecutionStatus TMakeSnapshotUnit::Execute(TOperation::TPtr op,
4747
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
4848

4949
auto &schemeTx = tx->GetSchemeTx();
50-
if (!schemeTx.HasSendSnapshot())
50+
if (!schemeTx.HasSendSnapshot() && !schemeTx.HasCreateIncrementalBackupSrc())
5151
return EExecutionStatus::Executed;
5252

5353
if (!op->IsWaitingForSnapshot()) {
54-
ui64 tableId = schemeTx.GetSendSnapshot().GetTableId_Deprecated();
55-
if (schemeTx.GetSendSnapshot().HasTableId()) {
56-
Y_ABORT_UNLESS(DataShard.GetPathOwnerId() == schemeTx.GetSendSnapshot().GetTableId().GetOwnerId());
57-
tableId = schemeTx.GetSendSnapshot().GetTableId().GetTableId();
54+
auto& snapshot =
55+
schemeTx.HasSendSnapshot() ?
56+
schemeTx.GetSendSnapshot() :
57+
schemeTx.GetCreateIncrementalBackupSrc().GetSendSnapshot();
58+
ui64 tableId = snapshot.GetTableId_Deprecated();
59+
if (snapshot.HasTableId()) {
60+
Y_ABORT_UNLESS(DataShard.GetPathOwnerId() == snapshot.GetTableId().GetOwnerId());
61+
tableId = snapshot.GetTableId().GetTableId();
5862
}
59-
6063
Y_ABORT_UNLESS(DataShard.GetUserTables().contains(tableId));
6164
ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid;
6265
const auto& openTxs = txc.DB.GetOpenTxs(localTableId);

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3537,9 +3537,15 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
35373537
if (!path->UserAttrs->AlterData) {
35383538
path->UserAttrs->AlterData = new TUserAttributes(path->UserAttrs->AlterVersion + 1);
35393539
}
3540+
} else if (txState.TxType == TTxState::TxCopyTable) {
3541+
if (!extraData.empty()) {
3542+
NKikimrSchemeOp::TGenericTxInFlyExtraData proto;
3543+
bool deserializeRes = ParseFromStringNoSizeLimit(proto, extraData);
3544+
Y_ABORT_UNLESS(deserializeRes);
3545+
txState.CdcPathId = PathIdFromPathId(proto.GetTxCopyTableExtraData().GetCdcPathId());
3546+
}
35403547
}
35413548

3542-
35433549
Y_ABORT_UNLESS(txState.TxType != TTxState::TxInvalid);
35443550
Y_ABORT_UNLESS(txState.State != TTxState::Invalid);
35453551

0 commit comments

Comments
 (0)