Skip to content

Commit babf6c1

Browse files
EnjectionCyberROFL
andauthored
[backups] Add restore incremental backup op (#6979)
Co-authored-by: Ilnaz Nizametdinov <i.nizametdinov@gmail.com>
1 parent ded90c7 commit babf6c1

24 files changed

+518
-45
lines changed

ydb/core/backup/impl/table_writer_impl.h

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,28 +131,35 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
131131

132132
break;
133133
}
134-
case NKikimrChangeExchange::TDataChange::kReset:
134+
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
135135
default:
136136
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
137137
}
138138
}
139139

140140
// just pass through, all conversions are on level above
141141
void SerializeRestore(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const {
142-
Y_ABORT_UNLESS(
143-
ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert,
144-
"Invariant violation");
145-
146142
record.SetSourceOffset(GetOrder());
147143
// TODO: fill WriteTxId
148144

149145
record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());
150146

151-
auto& upsert = *record.MutableUpsert();
152-
*upsert.MutableTags() = {
153-
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
154-
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
155-
upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
147+
switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
148+
case NKikimrChangeExchange::TDataChange::kUpsert: {
149+
auto& upsert = *record.MutableUpsert();
150+
*upsert.MutableTags() = {
151+
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
152+
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
153+
upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
154+
break;
155+
}
156+
case NKikimrChangeExchange::TDataChange::kErase:
157+
record.MutableErase();
158+
break;
159+
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
160+
default:
161+
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
162+
}
156163
}
157164

158165
}; // TChangeRecord

ydb/core/persqueue/offload_actor.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,13 @@ class TOffloadActor
7070

7171
auto CreateWriterFactory() {
7272
return [=]() -> IActor* {
73-
return NBackup::NImpl::CreateLocalTableWriter(
74-
PathIdFromPathId(Config.GetIncrementalBackup().GetDstPathId()));
73+
if (Config.HasIncrementalBackup()) {
74+
return NBackup::NImpl::CreateLocalTableWriter(PathIdFromPathId(Config.GetIncrementalBackup().GetDstPathId()));
75+
} else {
76+
return NBackup::NImpl::CreateLocalTableWriter(
77+
PathIdFromPathId(Config.GetIncrementalRestore().GetDstPathId()),
78+
NBackup::NImpl::EWriterType::Restore);
79+
}
7580
};
7681
}
7782

ydb/core/protos/flat_scheme_op.proto

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ message TTableReplicationConfig {
347347
enum EReplicationMode {
348348
REPLICATION_MODE_NONE = 0;
349349
REPLICATION_MODE_READ_ONLY = 1;
350+
REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP = 2;
350351
}
351352

352353
enum EConsistency {
@@ -886,6 +887,7 @@ enum ECdcStreamMode {
886887
ECdcStreamModeNewImage = 3;
887888
ECdcStreamModeOldImage = 4;
888889
ECdcStreamModeNewAndOldImages = 5;
890+
ECdcStreamModeRestoreIncrBackup = 106;
889891
}
890892

891893
enum ECdcStreamFormat {
@@ -961,6 +963,7 @@ message TAlterContinuousBackup {
961963
}
962964

963965
message TTakeIncrementalBackup {
966+
optional string DstPath = 1;
964967
}
965968

966969
oneof Action {
@@ -973,6 +976,11 @@ message TDropContinuousBackup {
973976
optional string TableName = 1;
974977
}
975978

979+
message TRestoreIncrementalBackup {
980+
optional string SrcTableName = 1;
981+
optional string DstTableName = 2;
982+
}
983+
976984
enum EIndexType {
977985
EIndexTypeInvalid = 0;
978986
EIndexTypeGlobal = 1;
@@ -1614,6 +1622,8 @@ enum EOperationType {
16141622
ESchemeOpCreateResourcePool = 100;
16151623
ESchemeOpDropResourcePool = 101;
16161624
ESchemeOpAlterResourcePool = 102;
1625+
1626+
ESchemeOpRestoreIncrementalBackup = 103;
16171627
}
16181628

16191629
message TApplyIf {
@@ -1795,6 +1805,8 @@ message TModifyScheme {
17951805
optional bool AllowCreateInTempDir = 71 [default = false];
17961806

17971807
optional TResourcePoolDescription CreateResourcePool = 72;
1808+
1809+
optional TRestoreIncrementalBackup RestoreIncrementalBackup = 73;
17981810
}
17991811

18001812
message TCopySequence {

ydb/core/protos/pqconfig.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,13 @@ message TOffloadConfig {
311311
optional NKikimrProto.TPathID DstPathId = 2;
312312
}
313313

314+
message TIncrementalRestore {
315+
optional NKikimrProto.TPathID DstPathId = 1;
316+
}
317+
314318
oneof Strategy {
315319
TIncrementalBackup IncrementalBackup = 1;
320+
TIncrementalRestore IncrementalRestore = 2;
316321
}
317322
}
318323

ydb/core/tx/datashard/cdc_stream_scan.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,30 @@ class TDataShard::TTxCdcStreamScanProgress
208208
return updates;
209209
}
210210

211+
static std::optional<TVector<TUpdateOp>> MakeRestoreUpdates(TArrayRef<const TCell> cells, TArrayRef<const TTag> tags, TUserTable::TCPtr table) {
212+
Y_ABORT_UNLESS(cells.size() >= 1);
213+
TVector<TUpdateOp> updates(::Reserve(cells.size() - 1));
214+
215+
bool foundSpecialColumn = false;
216+
Y_ABORT_UNLESS(cells.size() == tags.size());
217+
for (TPos pos = 0; pos < cells.size(); ++pos) {
218+
const auto tag = tags.at(pos);
219+
auto it = table->Columns.find(tag);
220+
Y_ABORT_UNLESS(it != table->Columns.end());
221+
if (it->second.Name == "__incrBackupImpl_deleted") {
222+
if (const auto& cell = cells.at(pos); !cell.IsNull() && cell.AsValue<bool>()) {
223+
return std::nullopt;
224+
}
225+
foundSpecialColumn = true;
226+
continue;
227+
}
228+
updates.emplace_back(tag, ECellOp::Set, TRawTypeValue(cells.at(pos).AsRef(), it->second.Type));
229+
}
230+
Y_ABORT_UNLESS(foundSpecialColumn);
231+
232+
return updates;
233+
}
234+
211235
static TRowState MakeRow(TArrayRef<const TCell> cells) {
212236
TRowState row(cells.size());
213237

@@ -307,6 +331,13 @@ class TDataShard::TTxCdcStreamScanProgress
307331
case NKikimrSchemeOp::ECdcStreamModeUpdate:
308332
Serialize(body, ERowOp::Upsert, key, keyTags, MakeUpdates(v.GetCells(), valueTags, table));
309333
break;
334+
case NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup:
335+
if (auto updates = MakeRestoreUpdates(v.GetCells(), valueTags, table); updates) {
336+
Serialize(body, ERowOp::Upsert, key, keyTags, *updates);
337+
} else {
338+
Serialize(body, ERowOp::Erase, key, keyTags, {});
339+
}
340+
break;
310341
case NKikimrSchemeOp::ECdcStreamModeNewImage:
311342
case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages: {
312343
const auto newImage = MakeRow(v.GetCells());

ydb/core/tx/datashard/change_collector_cdc_stream.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
227227
case NKikimrSchemeOp::ECdcStreamModeUpdate:
228228
Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, MakeUpdates(**initialState, valueTags, valueTypes));
229229
break;
230+
case NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup: {
231+
Y_FAIL_S("Invariant violation: source table must be locked before restore.");
232+
break;
233+
}
230234
case NKikimrSchemeOp::ECdcStreamModeNewImage:
231235
case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages:
232236
Persist(tableId, pathId, ERowOp::Upsert, key, keyTags, nullptr, &*initialState, valueTags);
@@ -246,6 +250,8 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
246250
case NKikimrSchemeOp::ECdcStreamModeUpdate:
247251
Persist(tableId, pathId, rop, key, keyTags, updates);
248252
break;
253+
case NKikimrSchemeOp::ECdcStreamModeRestoreIncrBackup:
254+
Y_FAIL_S("Invariant violation: source table must be locked before restore.");
249255
case NKikimrSchemeOp::ECdcStreamModeNewImage:
250256
case NKikimrSchemeOp::ECdcStreamModeOldImage:
251257
case NKikimrSchemeOp::ECdcStreamModeNewAndOldImages:

0 commit comments

Comments
 (0)