Skip to content

Commit 9d39bb3

Browse files
authored
Decouple incremental backup from async replication (#7667)
1 parent d12d1cb commit 9d39bb3

18 files changed

+183
-23
lines changed

ydb/core/protos/flat_scheme_op.proto

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,6 @@ message TTableReplicationConfig {
348348
enum EReplicationMode {
349349
REPLICATION_MODE_NONE = 0;
350350
REPLICATION_MODE_READ_ONLY = 1;
351-
REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP = 2;
352351
}
353352

354353
enum EConsistency {
@@ -361,6 +360,22 @@ message TTableReplicationConfig {
361360
optional EConsistency Consistency = 2;
362361
}
363362

363+
message TTableIncrementalBackupConfig {
364+
enum ERestoreMode {
365+
RESTORE_MODE_NONE = 0;
366+
RESTORE_MODE_INCREMENTAL_BACKUP = 1;
367+
}
368+
369+
enum EConsistency {
370+
CONSISTENCY_UNKNOWN = 0;
371+
CONSISTENCY_STRONG = 1;
372+
CONSISTENCY_WEAK = 2;
373+
}
374+
375+
optional ERestoreMode Mode = 1;
376+
optional EConsistency Consistency = 2;
377+
}
378+
364379
message TTableDescription {
365380
optional string Name = 1;
366381
optional uint64 Id_Deprecated = 2; // LocalPathId, deprecated
@@ -397,7 +412,10 @@ message TTableDescription {
397412
repeated TCdcStreamDescription CdcStreams = 38;
398413
repeated TSequenceDescription Sequences = 39;
399414

400-
optional TTableReplicationConfig ReplicationConfig = 40;
415+
oneof IncomingStreamConfig {
416+
TTableReplicationConfig ReplicationConfig = 40;
417+
TTableIncrementalBackupConfig IncrementalBackupConfig = 43;
418+
}
401419

402420
optional bool Temporary = 41;
403421

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,6 +1571,15 @@ class TDataShard
15711571
return false;
15721572
}
15731573

1574+
bool IsIncrementalRestore() const {
1575+
for (const auto& [_, info] : TableInfos) {
1576+
if (info->IsIncrementalRestore()) {
1577+
return true;
1578+
}
1579+
}
1580+
return false;
1581+
}
1582+
15741583
ui32 Generation() const { return Executor()->Generation(); }
15751584
bool IsFollower() const { return Executor()->GetStats().IsFollower; }
15761585
bool SyncSchemeOnFollower(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx,

ydb/core/tx/datashard/datashard_repl_apply.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
3232
return true;
3333
}
3434

35-
if (!Self->IsReplicated()) {
35+
if (!Self->IsReplicated() && !Self->IsIncrementalRestore()) {
3636
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
3737
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
3838
NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST,
39-
TStringBuilder() << "Table is not replicated");
39+
TStringBuilder() << "Table is nor replicated nor under incremental restore");
4040
return true;
4141
}
4242

@@ -106,13 +106,13 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
106106
TTransactionContext& txc, const TTableId& tableId, const TUserTable& userTable,
107107
TReplicationSourceState& source, const NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& change)
108108
{
109-
Y_ABORT_UNLESS(userTable.IsReplicated());
109+
Y_ABORT_UNLESS(userTable.IsReplicated() || Self->IsIncrementalRestore());
110110

111111
// TODO: check source and offset, persist new values
112112
i64 sourceOffset = change.GetSourceOffset();
113113

114114
ui64 writeTxId = change.GetWriteTxId();
115-
if (userTable.ReplicationConfig.HasWeakConsistency()) {
115+
if (userTable.ReplicationConfig.HasWeakConsistency() || userTable.IncrementalBackupConfig.HasWeakConsistency()) {
116116
if (writeTxId) {
117117
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
118118
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,

ydb/core/tx/datashard/datashard_user_table.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,10 @@ bool TUserTable::IsReplicated() const {
231231
}
232232
}
233233

234+
bool TUserTable::IsIncrementalRestore() const {
235+
return IncrementalBackupConfig.Mode == NKikimrSchemeOp::TTableIncrementalBackupConfig::RESTORE_MODE_INCREMENTAL_BACKUP;
236+
}
237+
234238
void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
235239
{
236240
// We expect schemeshard to send us full list of storage rooms
@@ -311,6 +315,7 @@ void TUserTable::ParseProto(const NKikimrSchemeOp::TTableDescription& descr)
311315
TableSchemaVersion = descr.GetTableSchemaVersion();
312316
IsBackup = descr.GetIsBackup();
313317
ReplicationConfig = TReplicationConfig(descr.GetReplicationConfig());
318+
IncrementalBackupConfig = TIncrementalBackupConfig(descr.GetIncrementalBackupConfig());
314319

315320
CheckSpecialColumns();
316321

@@ -393,6 +398,7 @@ void TUserTable::AlterSchema() {
393398
schema.SetPartitionRangeEndIsInclusive(Range.ToInclusive);
394399

395400
ReplicationConfig.Serialize(*schema.MutableReplicationConfig());
401+
IncrementalBackupConfig.Serialize(*schema.MutableIncrementalBackupConfig());
396402

397403
schema.SetName(Name);
398404
schema.SetPath(Path);

ydb/core/tx/datashard/datashard_user_table.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,36 @@ struct TUserTable : public TThrRefBase {
352352
}
353353
};
354354

355+
struct TIncrementalBackupConfig {
356+
NKikimrSchemeOp::TTableIncrementalBackupConfig::ERestoreMode Mode;
357+
NKikimrSchemeOp::TTableIncrementalBackupConfig::EConsistency Consistency;
358+
359+
TIncrementalBackupConfig()
360+
: Mode(NKikimrSchemeOp::TTableIncrementalBackupConfig::RESTORE_MODE_NONE)
361+
, Consistency(NKikimrSchemeOp::TTableIncrementalBackupConfig::CONSISTENCY_UNKNOWN)
362+
{
363+
}
364+
365+
TIncrementalBackupConfig(const NKikimrSchemeOp::TTableIncrementalBackupConfig& config)
366+
: Mode(config.GetMode())
367+
, Consistency(config.GetConsistency())
368+
{
369+
}
370+
371+
bool HasWeakConsistency() const {
372+
return Consistency == NKikimrSchemeOp::TTableIncrementalBackupConfig::CONSISTENCY_WEAK;
373+
}
374+
375+
bool HasStrongConsistency() const {
376+
return Consistency == NKikimrSchemeOp::TTableIncrementalBackupConfig::CONSISTENCY_STRONG;
377+
}
378+
379+
void Serialize(NKikimrSchemeOp::TTableIncrementalBackupConfig& proto) const {
380+
proto.SetMode(Mode);
381+
proto.SetConsistency(Consistency);
382+
}
383+
};
384+
355385
struct TStats {
356386
NTable::TStats DataStats;
357387
ui64 MemRowCount = 0;
@@ -395,6 +425,7 @@ struct TUserTable : public TThrRefBase {
395425
TVector<ui32> KeyColumnIds;
396426
TSerializedTableRange Range;
397427
TReplicationConfig ReplicationConfig;
428+
TIncrementalBackupConfig IncrementalBackupConfig;
398429
bool IsBackup = false;
399430

400431
TMap<TPathId, TTableIndex> Indexes;
@@ -483,6 +514,7 @@ struct TUserTable : public TThrRefBase {
483514
bool NeedSchemaSnapshots() const;
484515

485516
bool IsReplicated() const;
517+
bool IsIncrementalRestore() const;
486518

487519
private:
488520
void DoApplyCreate(NTabletFlatExecutor::TTransactionContext& txc, const TString& tableName, bool shadow,

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
307307
return true;
308308
}
309309

310-
typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString> TTableRec;
310+
typedef std::tuple<TPathId, ui32, ui64, TString, TString, TString, ui64, TString, bool, TString, bool, TString, TString> TTableRec;
311311
typedef TDeque<TTableRec> TTableRows;
312312

313313
template <typename SchemaTable, typename TRowSet>
@@ -323,7 +323,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
323323
rowSet.template GetValueOrDefault<typename SchemaTable::IsBackup>(false),
324324
rowSet.template GetValueOrDefault<typename SchemaTable::ReplicationConfig>(),
325325
rowSet.template GetValueOrDefault<typename SchemaTable::IsTemporary>(false),
326-
rowSet.template GetValueOrDefault<typename SchemaTable::OwnerActorId>("")
326+
rowSet.template GetValueOrDefault<typename SchemaTable::OwnerActorId>(""),
327+
rowSet.template GetValueOrDefault<typename SchemaTable::IncrementalBackupConfig>()
327328
);
328329
}
329330

@@ -1829,6 +1830,15 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
18291830
}
18301831
}
18311832

1833+
if (const auto incrementalBackupConfig = std::get<12>(rec)) {
1834+
bool parseOk = ParseFromStringNoSizeLimit(tableInfo->MutableIncrementalBackupConfig(), incrementalBackupConfig);
1835+
Y_ABORT_UNLESS(parseOk);
1836+
1837+
if (tableInfo->IsRestoreTable()) {
1838+
Self->PathsById.at(pathId)->SetRestoreTable();
1839+
}
1840+
}
1841+
18321842
tableInfo->IsBackup = std::get<8>(rec);
18331843

18341844
Self->Tables[pathId] = tableInfo;

ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ TTableInfo::TAlterDataPtr ParseParams(const TPath& path, TTableInfo::TPtr table,
8686
if (!hasSchemaChanges
8787
&& !copyAlter.HasPartitionConfig()
8888
&& !copyAlter.HasTTLSettings()
89-
&& !copyAlter.HasReplicationConfig())
89+
&& !copyAlter.HasReplicationConfig()
90+
&& !copyAlter.HasIncrementalBackupConfig())
9091
{
9192
errStr = Sprintf("No changes specified");
9293
status = NKikimrScheme::StatusInvalidParameter;
@@ -731,7 +732,7 @@ TVector<ISubOperation::TPtr> CreateConsistentAlterTable(TOperationId id, const T
731732
// Admins can alter indexImplTable unconditionally.
732733
// Regular users can only alter allowed fields.
733734
if (!IsSuperUser(context.UserToken.Get())
734-
&& (!CheckAllowedFields(alter, {"Name", "PathId", "PartitionConfig", "ReplicationConfig"})
735+
&& (!CheckAllowedFields(alter, {"Name", "PathId", "PartitionConfig", "ReplicationConfig", "IncrementalBackupConfig"})
735736
|| (alter.HasPartitionConfig()
736737
&& !CheckAllowedFields(alter.GetPartitionConfig(), {"PartitioningPolicy"})
737738
)

ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,9 @@ class TCopyTable: public TSubOperation {
517517
// replication config is not copied
518518
schema.ClearReplicationConfig();
519519

520+
// incr backup config is not copied
521+
schema.ClearIncrementalBackupConfig();
522+
520523
NKikimrSchemeOp::TPartitionConfig compilationPartitionConfig;
521524
if (!TPartitionConfigMerger::ApplyChanges(compilationPartitionConfig, srcTableInfo->PartitionConfig(), schema.GetPartitionConfig(), AppData(), errStr)
522525
|| !TPartitionConfigMerger::VerifyCreateParams(compilationPartitionConfig, AppData(), IsShadowDataAllowed(), errStr)) {

ydb/core/tx/schemeshard/schemeshard__operation_create_restore_incremental_backup.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ void DoCreateAlterTable(
105105

106106
PathIdFromPathId(dstTablePath.Base()->PathId, desc.MutablePathId());
107107

108-
auto& replicationConfig = *desc.MutableReplicationConfig();
109-
replicationConfig.SetMode(NKikimrSchemeOp::TTableReplicationConfig::REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP);
110-
replicationConfig.SetConsistency(NKikimrSchemeOp::TTableReplicationConfig::CONSISTENCY_WEAK);
108+
auto& restoreConfig = *desc.MutableIncrementalBackupConfig();
109+
restoreConfig.SetMode(NKikimrSchemeOp::TTableIncrementalBackupConfig::RESTORE_MODE_INCREMENTAL_BACKUP);
110+
restoreConfig.SetConsistency(NKikimrSchemeOp::TTableIncrementalBackupConfig::CONSISTENCY_WEAK);
111111

112112
result.push_back(CreateAlterTable(NextPartId(opId, result), outTx));
113113
}

ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,10 @@ class TCreateTable: public TSubOperation {
654654
newTable->SetAsyncReplica();
655655
}
656656

657+
if (tableInfo->IsRestoreTable()) {
658+
newTable->SetRestoreTable();
659+
}
660+
657661
context.SS->Tables[newTable->PathId] = tableInfo;
658662
context.SS->TabletCounters->Simple()[COUNTER_TABLE_COUNT].Add(1);
659663
context.SS->IncrementPathDbRefCount(newTable->PathId, "new path created");

0 commit comments

Comments
 (0)