Skip to content

Commit d8c1e82

Browse files
Merge pull request #17734 from stanislav-shchetinin/auto-dropping-25-1
25-1: Auto-remove temporary export tables
2 parents c77771f + 4bf5919 commit d8c1e82

File tree

10 files changed

+390
-68
lines changed

10 files changed

+390
-68
lines changed

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,5 @@ message TFeatureFlags {
199199
optional bool EnableChangefeedsExport = 174 [default = false];
200200
optional bool SwitchToConfigV2 = 179 [default = false];
201201
optional bool SwitchToConfigV1 = 180 [default = false];
202+
optional bool EnableExportAutoDropping = 183 [default = false];
202203
}

ydb/core/tx/schemeshard/schemeshard_export.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn
100100
case TExportInfo::EState::CopyTables:
101101
exprt.SetProgress(Ydb::Export::ExportProgress::PROGRESS_PREPARING);
102102
break;
103-
103+
104+
case TExportInfo::EState::AutoDropping:
104105
case TExportInfo::EState::Transferring:
105106
case TExportInfo::EState::Done:
106107
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {

ydb/core/tx/schemeshard/schemeshard_export__create.cpp

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
432432
Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id);
433433
}
434434

435+
void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) {
436+
bool isContinued = false;
437+
PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx) {
438+
exportInfo->PendingDropItems.push_back(itemIdx);
439+
isContinued = true;
440+
AllocateTxId(exportInfo, itemIdx);
441+
});
442+
if (!isContinued) {
443+
AllocateTxId(exportInfo);
444+
}
445+
}
446+
435447
void SubscribeTx(TTxId txId) {
436448
Send(Self->SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(ui64(txId)));
437449
}
@@ -617,7 +629,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
617629
const auto& item = exportInfo->Items.at(itemIdx);
618630

619631
if (item.WaitTxId == InvalidTxId) {
620-
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
632+
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable && item.State <= EState::Transferring) {
621633
pendingTables.emplace_back(itemIdx);
622634
} else {
623635
UploadScheme(exportInfo, itemIdx, ctx);
@@ -665,6 +677,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
665677
SendNotificationsIfFinished(exportInfo);
666678
break;
667679

680+
case EState::AutoDropping:
668681
case EState::Dropping:
669682
if (!exportInfo->AllItemsAreDropped()) {
670683
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
@@ -749,6 +762,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
749762
}
750763
break;
751764

765+
case EState::AutoDropping:
752766
case EState::Dropping:
753767
if (exportInfo->PendingDropItems) {
754768
itemIdx = PopFront(exportInfo->PendingDropItems);
@@ -821,6 +835,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
821835
}
822836
break;
823837

838+
case EState::AutoDropping:
824839
case EState::Dropping:
825840
if (isMultipleMods || isNotExist) {
826841
if (record.GetPathDropTxId()) {
@@ -925,6 +940,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
925940
Self->PersistExportItemState(db, exportInfo, itemIdx);
926941
break;
927942

943+
case EState::AutoDropping:
928944
case EState::Dropping:
929945
if (!exportInfo->AllItemsAreDropped()) {
930946
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
@@ -1011,7 +1027,11 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10111027
Self->PersistExportItemState(db, exportInfo, itemIdx);
10121028

10131029
if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
1014-
EndExport(exportInfo, EState::Done, db);
1030+
if (!AppData()->FeatureFlags.GetEnableExportAutoDropping()) {
1031+
EndExport(exportInfo, EState::Done, db);
1032+
} else {
1033+
PrepareAutoDropping(Self, exportInfo, db);
1034+
}
10151035
}
10161036
} else if (exportInfo->State == EState::Cancellation) {
10171037
item.State = EState::Cancelled;
@@ -1140,14 +1160,19 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11401160
}
11411161
}
11421162
if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
1143-
exportInfo->State = EState::Done;
1144-
exportInfo->EndTime = TAppData::TimeProvider->Now();
1163+
if (!AppData()->FeatureFlags.GetEnableExportAutoDropping()) {
1164+
exportInfo->State = EState::Done;
1165+
exportInfo->EndTime = TAppData::TimeProvider->Now();
1166+
} else {
1167+
PrepareAutoDropping(Self, exportInfo, db);
1168+
}
11451169
}
11461170

11471171
Self->PersistExportItemState(db, exportInfo, itemIdx);
11481172
break;
11491173
}
11501174

1175+
case EState::AutoDropping:
11511176
case EState::Dropping:
11521177
if (!exportInfo->AllItemsAreDropped()) {
11531178
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
@@ -1163,6 +1188,10 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11631188
} else {
11641189
SendNotificationsIfFinished(exportInfo, true); // for tests
11651190

1191+
if (exportInfo->State == EState::AutoDropping) {
1192+
return EndExport(exportInfo, EState::Done, db);
1193+
}
1194+
11661195
if (exportInfo->Uid) {
11671196
Self->ExportsByUid.erase(exportInfo->Uid);
11681197
}

ydb/core/tx/schemeshard/schemeshard_export__forget.cpp

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,8 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase {
7777
LOG_D("TExport::TTxForget, dropping export tables"
7878
<< ", info: " << exportInfo->ToString()
7979
);
80-
exportInfo->WaitTxId = InvalidTxId;
81-
exportInfo->State = TExportInfo::EState::Dropping;
82-
Self->PersistExportState(db, exportInfo);
83-
84-
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
85-
auto& item = exportInfo->Items.at(itemIdx);
86-
87-
item.WaitTxId = InvalidTxId;
88-
item.State = TExportInfo::EState::Dropped;
89-
90-
const TPath itemPath = TPath::Resolve(ExportItemPathName(Self, exportInfo, itemIdx), Self);
91-
if (itemPath.IsResolved() && !itemPath.IsDeleted()) {
92-
item.State = TExportInfo::EState::Dropping;
93-
}
94-
95-
Self->PersistExportItemState(db, exportInfo, itemIdx);
96-
}
80+
81+
PrepareDropping(Self, exportInfo, db);
9782

9883
Progress = true;
9984
}

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,5 +319,39 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) {
319319
return TStringBuilder() << exportPathName << "/" << itemIdx;
320320
}
321321

322+
void PrepareDropping(
323+
TSchemeShard* ss,
324+
TExportInfo::TPtr exportInfo,
325+
NIceDb::TNiceDb& db,
326+
TExportInfo::EState droppingState,
327+
std::function<void(ui64)> func)
328+
{
329+
Y_ABORT_UNLESS(IsIn({TExportInfo::EState::AutoDropping, TExportInfo::EState::Dropping}, droppingState));
330+
331+
exportInfo->WaitTxId = InvalidTxId;
332+
exportInfo->State = droppingState;
333+
ss->PersistExportState(db, exportInfo);
334+
335+
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
336+
auto& item = exportInfo->Items.at(itemIdx);
337+
338+
item.WaitTxId = InvalidTxId;
339+
item.State = TExportInfo::EState::Dropped;
340+
const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss);
341+
if (itemPath.IsResolved() && !itemPath.IsDeleted()) {
342+
item.State = TExportInfo::EState::Dropping;
343+
if (exportInfo->State == TExportInfo::EState::AutoDropping) {
344+
func(itemIdx);
345+
}
346+
}
347+
348+
ss->PersistExportItemState(db, exportInfo, itemIdx);
349+
}
350+
}
351+
352+
void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) {
353+
PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, [](ui64){});
354+
}
355+
322356
} // NSchemeShard
323357
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,9 @@ THolder<TEvSchemeShard::TEvCancelTx> CancelPropose(
4747
TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx);
4848
TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx);
4949

50+
void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db,
51+
TExportInfo::EState droppingState, std::function<void(ui64)> func);
52+
void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db);
53+
5054
} // NSchemeShard
5155
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2668,6 +2668,7 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
26682668
Done = 240,
26692669
Dropping = 241,
26702670
Dropped = 242,
2671+
AutoDropping = 243,
26712672
Cancellation = 250,
26722673
Cancelled = 251,
26732674
};
@@ -2782,12 +2783,16 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
27822783
return State == EState::Dropping;
27832784
}
27842785

2786+
bool IsAutoDropping() const {
2787+
return State == EState::AutoDropping;
2788+
}
2789+
27852790
bool IsCancelling() const {
27862791
return State == EState::Cancellation;
27872792
}
27882793

27892794
bool IsInProgress() const {
2790-
return IsPreparing() || IsWorking() || IsDropping() || IsCancelling();
2795+
return IsPreparing() || IsWorking() || IsDropping() || IsAutoDropping() || IsCancelling();
27912796
}
27922797

27932798
bool IsDone() const {

0 commit comments

Comments
 (0)