Skip to content

Commit 7ee2eb1

Browse files
authored
Add ds incremental restore src unit and use it (#9694)
1 parent c1fc810 commit 7ee2eb1

33 files changed

+888
-155
lines changed

ydb/core/protos/config.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1178,6 +1178,17 @@ message TImmediateControlsConfig {
11781178
MinValue: 0,
11791179
MaxValue: 1,
11801180
DefaultValue: 0 }];
1181+
1182+
optional uint64 IncrementalRestoreReadAheadLo = 17 [(ControlOptions) = {
1183+
Description: "Override for incremental restore readahead (low watermark)",
1184+
MinValue: 0,
1185+
MaxValue: 67108864,
1186+
DefaultValue: 0 }];
1187+
optional uint64 IncrementalRestoreReadAheadHi = 18 [(ControlOptions) = {
1188+
Description: "Override for incremental restore readahead (high watermark)",
1189+
MinValue: 0,
1190+
MaxValue: 134217728,
1191+
DefaultValue: 0 }];
11811192
}
11821193

11831194
message TTxLimitControls {

ydb/core/protos/counters_schemeshard.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ enum ESimpleCounters {
220220
COUNTER_IN_FLIGHT_OPS_TxCreateResourcePool = 174 [(CounterOpts) = {Name: "InFlightOps/CreateResourcePool"}];
221221
COUNTER_IN_FLIGHT_OPS_TxDropResourcePool = 175 [(CounterOpts) = {Name: "InFlightOps/DropResourcePool"}];
222222
COUNTER_IN_FLIGHT_OPS_TxAlterResourcePool = 176 [(CounterOpts) = {Name: "InFlightOps/AlterResourcePool"}];
223+
224+
COUNTER_IN_FLIGHT_OPS_TxRestoreIncrementalBackupAtTable = 177 [(CounterOpts) = {Name: "InFlightOps/RestoreIncrementalBackupAtTable"}];
223225
}
224226

225227
enum ECumulativeCounters {
@@ -351,6 +353,8 @@ enum ECumulativeCounters {
351353
COUNTER_FINISHED_OPS_TxCreateResourcePool = 103 [(CounterOpts) = {Name: "FinishedOps/CreateResourcePool"}];
352354
COUNTER_FINISHED_OPS_TxDropResourcePool = 104 [(CounterOpts) = {Name: "FinishedOps/DropResourcePool"}];
353355
COUNTER_FINISHED_OPS_TxAlterResourcePool = 105 [(CounterOpts) = {Name: "FinishedOps/AlterResourcePool"}];
356+
357+
COUNTER_FINISHED_OPS_TxRestoreIncrementalBackupAtTable = 106 [(CounterOpts) = {Name: "FinishedOps/RestoreIncrementalBackupAtTable"}];
354358
}
355359

356360
enum EPercentileCounters {

ydb/core/protos/datashard_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ message TDataShardConfig {
2121
optional string CdcInitialScanTaskName = 17 [default = "cdc_initial_scan"];
2222
optional uint32 CdcInitialScanTaskPriority = 18 [default = 10];
2323
optional bool DisabledOnSchemeShard = 19 [default = false];
24+
optional uint64 IncrementalRestoreReadAheadLo = 20 [default = 524288];
25+
optional uint64 IncrementalRestoreReadAheadHi = 21 [default = 1048576];
2426
}

ydb/core/protos/flat_scheme_op.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,7 +1015,10 @@ message TDropContinuousBackup {
10151015

10161016
message TRestoreIncrementalBackup {
10171017
optional string SrcTableName = 1;
1018+
optional NKikimrProto.TPathID SrcPathId = 3;
1019+
10181020
optional string DstTableName = 2;
1021+
optional NKikimrProto.TPathID DstPathId = 4;
10191022
}
10201023

10211024
enum EIndexType {
@@ -1661,6 +1664,7 @@ enum EOperationType {
16611664
ESchemeOpAlterResourcePool = 102;
16621665

16631666
ESchemeOpRestoreIncrementalBackup = 103;
1667+
ESchemeOpRestoreIncrementalBackupAtTable = 104;
16641668
}
16651669

16661670
message TApplyIf {
@@ -1931,6 +1935,7 @@ enum EPathState {
19311935
EPathStateMigrated = 9;
19321936
EPathStateRestore = 10;
19331937
EPathStateMoving = 11;
1938+
EPathStateOutgoingIncrementalRestore = 12;
19341939
}
19351940

19361941
message TPathVersion {

ydb/core/protos/tx_datashard.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,8 @@ message TFlatSchemeTransaction {
480480
optional TAlterCdcStreamNotice AlterCdcStreamNotice = 19;
481481
optional TDropCdcStreamNotice DropCdcStreamNotice = 20;
482482
optional TMoveIndex MoveIndex = 21;
483+
484+
optional NKikimrSchemeOp.TRestoreIncrementalBackup CreateIncrementalRestoreSrc = 22;
483485
}
484486

485487
message TDistributedEraseTransaction {

ydb/core/tx/datashard/change_sender_async_index.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,13 @@ class TAsyncIndexChangeSenderMain
211211
}
212212

213213
IActor* CreateSender(ui64 partitionId) const override {
214-
return CreateTableChangeSenderShard(SelfId(), DataShard, partitionId, TargetTablePathId, TagMap);
214+
return CreateTableChangeSenderShard(
215+
SelfId(),
216+
DataShard,
217+
partitionId,
218+
TargetTablePathId,
219+
TagMap,
220+
ETableChangeSenderType::AsyncIndex);
215221
}
216222

217223
void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {

ydb/core/tx/datashard/change_sender_incr_restore.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,13 @@ class TIncrRestoreChangeSenderMain
126126
}
127127

128128
IActor* CreateSender(ui64 partitionId) const override {
129-
return CreateTableChangeSenderShard(SelfId(), DataShard, partitionId, TargetTablePathId, TagMap);
129+
return CreateTableChangeSenderShard(
130+
SelfId(),
131+
DataShard,
132+
partitionId,
133+
TargetTablePathId,
134+
TagMap,
135+
ETableChangeSenderType::IncrementalRestore);
130136
}
131137

132138
void Handle(NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TPtr& ev) {

ydb/core/tx/datashard/change_sender_table_base.cpp

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,16 @@ class TTableChangeSenderShard: public TActorBootstrapped<TTableChangeSenderShard
151151
record.SetPathOwnerId(TargetTablePathId.OwnerId);
152152
record.SetLocalPathId(TargetTablePathId.LocalPathId);
153153

154-
Y_ABORT_UNLESS(record.HasAsyncIndex());
155-
AdjustTags(*record.MutableAsyncIndex());
154+
switch(Type) {
155+
case ETableChangeSenderType::AsyncIndex:
156+
Y_ABORT_UNLESS(record.HasAsyncIndex());
157+
AdjustTags(*record.MutableAsyncIndex());
158+
break;
159+
case ETableChangeSenderType::IncrementalRestore:
160+
Y_ABORT_UNLESS(record.HasIncrementalRestore());
161+
AdjustTags(*record.MutableIncrementalRestore());
162+
break;
163+
}
156164
}
157165

158166
void AdjustTags(NKikimrChangeExchange::TDataChange& record) const {
@@ -282,15 +290,21 @@ class TTableChangeSenderShard: public TActorBootstrapped<TTableChangeSenderShard
282290
return NKikimrServices::TActivity::CHANGE_SENDER_ASYNC_INDEX_ACTOR_PARTITION;
283291
}
284292

285-
TTableChangeSenderShard(const TActorId& parent, const TDataShardId& dataShard, ui64 shardId,
286-
const TPathId& indexTablePathId, const TMap<TTag, TTag>& tagMap)
287-
: Parent(parent)
288-
, DataShard(dataShard)
289-
, ShardId(shardId)
290-
, TargetTablePathId(indexTablePathId)
291-
, TagMap(tagMap)
292-
, LeaseConfirmationCookie(0)
293-
, LastRecordOrder(0)
293+
TTableChangeSenderShard(
294+
const TActorId& parent,
295+
const TDataShardId& dataShard,
296+
ui64 shardId,
297+
const TPathId& targetTablePathId,
298+
const TMap<TTag, TTag>& tagMap,
299+
ETableChangeSenderType type)
300+
: Type(type)
301+
, Parent(parent)
302+
, DataShard(dataShard)
303+
, ShardId(shardId)
304+
, TargetTablePathId(targetTablePathId)
305+
, TagMap(tagMap)
306+
, LeaseConfirmationCookie(0)
307+
, LastRecordOrder(0)
294308
{
295309
}
296310

@@ -308,6 +322,7 @@ class TTableChangeSenderShard: public TActorBootstrapped<TTableChangeSenderShard
308322
}
309323

310324
private:
325+
const ETableChangeSenderType Type;
311326
const TActorId Parent;
312327
const TDataShardId DataShard;
313328
const ui64 ShardId;
@@ -331,10 +346,17 @@ IActor* CreateTableChangeSenderShard(
331346
const TActorId& parent,
332347
const TDataShardId& dataShard,
333348
ui64 shardId,
334-
const TPathId& indexTablePathId,
335-
const TMap<TTag, TTag>& tagMap)
349+
const TPathId& targetTablePathId,
350+
const TMap<TTag, TTag>& tagMap,
351+
ETableChangeSenderType type)
336352
{
337-
return new TTableChangeSenderShard(parent, dataShard, shardId, indexTablePathId, tagMap);
353+
return new TTableChangeSenderShard(
354+
parent,
355+
dataShard,
356+
shardId,
357+
targetTablePathId,
358+
tagMap,
359+
type);
338360
}
339361

340362
} // namespace NKikimr::NDataShard

ydb/core/tx/datashard/change_sender_table_base.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,11 +309,17 @@ struct TSchemeChecksMixin
309309

310310
};
311311

312+
enum class ETableChangeSenderType {
313+
AsyncIndex,
314+
IncrementalRestore,
315+
};
316+
312317
IActor* CreateTableChangeSenderShard(
313318
const TActorId& parent,
314319
const TDataShardId& dataShard,
315320
ui64 shardId,
316-
const TPathId& indexTablePathId,
317-
const TMap<TTag, TTag>& tagMap);
321+
const TPathId& targetTablePathId,
322+
const TMap<TTag, TTag>& tagMap,
323+
ETableChangeSenderType type);
318324

319325
} // namespace NKikimr::NDataShard

ydb/core/tx/datashard/check_scheme_tx_unit.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class TCheckSchemeTxUnit : public TExecutionUnit {
3838
bool CheckCreateCdcStream(TActiveTransaction *activeTx);
3939
bool CheckAlterCdcStream(TActiveTransaction *activeTx);
4040
bool CheckDropCdcStream(TActiveTransaction *activeTx);
41+
bool CheckCreateIncrementalRestoreSrc(TActiveTransaction *activeTx);
4142

4243
bool CheckSchemaVersion(TActiveTransaction *activeTx, ui64 proposedSchemaVersion, ui64 currentSchemaVersion, ui64 expectedSchemaVersion);
4344

@@ -380,6 +381,9 @@ bool TCheckSchemeTxUnit::CheckSchemeTx(TActiveTransaction *activeTx)
380381
case TSchemaOperation::ETypeDropCdcStream:
381382
res = CheckDropCdcStream(activeTx);
382383
break;
384+
case TSchemaOperation::ETypeCreateIncrementalRestoreSrc:
385+
res = CheckCreateIncrementalRestoreSrc(activeTx);
386+
break;
383387
default:
384388
LOG_ERROR_S(TActivationContext::AsActorContext(), NKikimrServices::TX_DATASHARD,
385389
"Unknown scheme tx type detected at tablet "
@@ -743,6 +747,16 @@ bool TCheckSchemeTxUnit::CheckDropCdcStream(TActiveTransaction *activeTx) {
743747
return CheckSchemaVersion(activeTx, notice);
744748
}
745749

750+
bool TCheckSchemeTxUnit::CheckCreateIncrementalRestoreSrc(TActiveTransaction *activeTx) {
751+
if (HasDuplicate(activeTx, "CreateIncrementalRestoreSrc", &TPipeline::HasCreateIncrementalRestoreSrc)) {
752+
return false;
753+
}
754+
755+
// TODO: add additional checks
756+
757+
return true;
758+
}
759+
746760
void TCheckSchemeTxUnit::Complete(TOperation::TPtr,
747761
const TActorContext &)
748762
{

0 commit comments

Comments
 (0)