Skip to content

Commit 93c8c7f

Browse files
Auto-remove temporary export tables (#16076)
1 parent c26c715 commit 93c8c7f

File tree

9 files changed

+250
-45
lines changed

9 files changed

+250
-45
lines changed

ydb/core/tx/schemeshard/schemeshard_export__create.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
796796
SendNotificationsIfFinished(exportInfo);
797797
break;
798798

799+
case EState::AutoDropping:
799800
case EState::Dropping:
800801
if (!exportInfo->AllItemsAreDropped()) {
801802
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
@@ -880,6 +881,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
880881
}
881882
break;
882883

884+
case EState::AutoDropping:
883885
case EState::Dropping:
884886
if (exportInfo->PendingDropItems) {
885887
itemIdx = PopFront(exportInfo->PendingDropItems);
@@ -952,6 +954,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
952954
}
953955
break;
954956

957+
case EState::AutoDropping:
955958
case EState::Dropping:
956959
if (isMultipleMods || isNotExist) {
957960
if (record.GetPathDropTxId()) {
@@ -1056,6 +1059,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10561059
Self->PersistExportItemState(db, exportInfo, itemIdx);
10571060
break;
10581061

1062+
case EState::AutoDropping:
10591063
case EState::Dropping:
10601064
if (!exportInfo->AllItemsAreDropped()) {
10611065
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
@@ -1142,7 +1146,8 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11421146
Self->PersistExportItemState(db, exportInfo, itemIdx);
11431147

11441148
if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
1145-
EndExport(exportInfo, EState::Done, db);
1149+
PrepareDropping(Self, exportInfo, db, true);
1150+
AllocateTxId(exportInfo);
11461151
}
11471152
} else if (exportInfo->State == EState::Cancellation) {
11481153
item.State = EState::Cancelled;
@@ -1342,14 +1347,15 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
13421347
}
13431348
}
13441349
if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
1345-
exportInfo->State = EState::Done;
1346-
exportInfo->EndTime = TAppData::TimeProvider->Now();
1350+
PrepareDropping(Self, exportInfo, db, true);
1351+
AllocateTxId(exportInfo);
13471352
}
13481353

13491354
Self->PersistExportItemState(db, exportInfo, itemIdx);
13501355
break;
13511356
}
13521357

1358+
case EState::AutoDropping:
13531359
case EState::Dropping:
13541360
if (!exportInfo->AllItemsAreDropped()) {
13551361
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
@@ -1359,12 +1365,16 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
13591365
item.WaitTxId = InvalidTxId;
13601366
Self->PersistExportItemState(db, exportInfo, itemIdx);
13611367

1362-
if (exportInfo->AllItemsAreDropped()) {
1368+
if (exportInfo->AllItemsAreDropped() || exportInfo->State == EState::AutoDropping) {
13631369
AllocateTxId(exportInfo);
13641370
}
13651371
} else {
13661372
SendNotificationsIfFinished(exportInfo, true); // for tests
13671373

1374+
if (exportInfo->State == EState::AutoDropping) {
1375+
return EndExport(exportInfo, EState::Done, db);
1376+
}
1377+
13681378
if (exportInfo->Uid) {
13691379
Self->ExportsByUid.erase(exportInfo->Uid);
13701380
}

ydb/core/tx/schemeshard/schemeshard_export__forget.cpp

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,8 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase {
7777
LOG_D("TExport::TTxForget, dropping export tables"
7878
<< ", info: " << exportInfo->ToString()
7979
);
80-
exportInfo->WaitTxId = InvalidTxId;
81-
exportInfo->State = TExportInfo::EState::Dropping;
82-
Self->PersistExportState(db, exportInfo);
83-
84-
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
85-
auto& item = exportInfo->Items.at(itemIdx);
86-
87-
item.WaitTxId = InvalidTxId;
88-
item.State = TExportInfo::EState::Dropped;
89-
90-
const TPath itemPath = TPath::Resolve(ExportItemPathName(Self, exportInfo, itemIdx), Self);
91-
if (itemPath.IsResolved() && !itemPath.IsDeleted()) {
92-
item.State = TExportInfo::EState::Dropping;
93-
}
94-
95-
Self->PersistExportItemState(db, exportInfo, itemIdx);
96-
}
80+
81+
PrepareDropping(Self, exportInfo, db);
9782

9883
Progress = true;
9984
}

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,5 +328,27 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) {
328328
return TStringBuilder() << exportPathName << "/" << itemIdx;
329329
}
330330

331+
void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, bool isAutoDropping) {
332+
exportInfo->WaitTxId = InvalidTxId;
333+
exportInfo->State = isAutoDropping ? TExportInfo::EState::AutoDropping : TExportInfo::EState::Dropping;
334+
ss->PersistExportState(db, exportInfo);
335+
336+
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
337+
auto& item = exportInfo->Items.at(itemIdx);
338+
339+
item.WaitTxId = InvalidTxId;
340+
item.State = TExportInfo::EState::Dropped;
341+
const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss);
342+
if (itemPath.IsResolved() && !itemPath.IsDeleted()) {
343+
item.State = TExportInfo::EState::Dropping;
344+
if (isAutoDropping) {
345+
exportInfo->PendingDropItems.push_back(itemIdx);
346+
}
347+
}
348+
349+
ss->PersistExportItemState(db, exportInfo, itemIdx);
350+
}
351+
}
352+
331353
} // NSchemeShard
332354
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,7 @@ THolder<TEvSchemeShard::TEvCancelTx> CancelPropose(
4747
TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx);
4848
TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx);
4949

50+
void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, bool isAutoDropping = false);
51+
5052
} // NSchemeShard
5153
} // NKikimr

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2688,6 +2688,7 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
26882688
Done = 240,
26892689
Dropping = 241,
26902690
Dropped = 242,
2691+
AutoDropping = 243,
26912692
Cancellation = 250,
26922693
Cancelled = 251,
26932694
};
@@ -2805,12 +2806,16 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
28052806
return State == EState::Dropping;
28062807
}
28072808

2809+
bool IsAutoDropping() const {
2810+
return State == EState::AutoDropping;
2811+
}
2812+
28082813
bool IsCancelling() const {
28092814
return State == EState::Cancellation;
28102815
}
28112816

28122817
bool IsInProgress() const {
2813-
return IsPreparing() || IsWorking() || IsDropping() || IsCancelling();
2818+
return IsPreparing() || IsWorking() || IsDropping() || IsAutoDropping() || IsCancelling();
28142819
}
28152820

28162821
bool IsDone() const {

ydb/core/tx/schemeshard/ut_export/ut_export.cpp

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1231,10 +1231,24 @@ partitioning_settings {
12311231
}
12321232
)", port));
12331233
const ui64 exportId = txId;
1234+
::NKikimrSubDomains::TDiskSpaceUsage afterExport;
1235+
1236+
TTestActorRuntime::TEventObserver prevObserverFunc;
1237+
prevObserverFunc = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
1238+
if (auto* p = event->CastAsLocal<TEvSchemeShard::TEvModifySchemeTransaction>()) {
1239+
auto& record = p->Record;
1240+
if (record.TransactionSize() >= 1 &&
1241+
record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpDropTable) {
1242+
afterExport = waitForStats(2);
1243+
}
1244+
}
1245+
return prevObserverFunc(event);
1246+
});
1247+
12341248
env.TestWaitNotification(runtime, exportId);
12351249

12361250
TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
1237-
const auto afterExport = waitForStats(2);
1251+
12381252
UNIT_ASSERT_STRINGS_EQUAL(expected.DebugString(), afterExport.DebugString());
12391253

12401254
TestForgetExport(runtime, ++txId, "/MyRoot", exportId);
@@ -1250,13 +1264,23 @@ partitioning_settings {
12501264
TTestEnv env(runtime);
12511265
ui64 txId = 100;
12521266

1267+
TBlockEvents<NKikimr::NWrappers::NExternalStorage::TEvPutObjectRequest> blockPartition0(runtime, [](auto&& ev) {
1268+
return ev->Get()->Request.GetKey() == "/data_01.csv";
1269+
});
1270+
12531271
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
12541272
Name: "Table"
1255-
Columns { Name: "key" Type: "Utf8" }
1273+
Columns { Name: "key" Type: "Uint32" }
12561274
Columns { Name: "value" Type: "Utf8" }
12571275
KeyColumnNames: ["key"]
1276+
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 10 } }}}
12581277
)");
12591278
env.TestWaitNotification(runtime, txId);
1279+
1280+
WriteRow(runtime, ++txId, "/MyRoot/Table", 0, 1, "v1");
1281+
env.TestWaitNotification(runtime, txId);
1282+
WriteRow(runtime, ++txId, "/MyRoot/Table", 1, 100, "v100");
1283+
env.TestWaitNotification(runtime, txId);
12601284

12611285
TPortManager portManager;
12621286
const ui16 port = portManager.GetPort();
@@ -1274,17 +1298,35 @@ partitioning_settings {
12741298
}
12751299
}
12761300
)", port));
1301+
1302+
1303+
runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition0.size() >= 1; });
1304+
bool isCompleted = false;
1305+
1306+
while (!isCompleted) {
1307+
const auto desc = TestGetExport(runtime, txId, "/MyRoot");
1308+
const auto entry = desc.GetResponse().GetEntry();
1309+
const auto& item = entry.GetItemsProgress(0);
1310+
1311+
if (item.parts_completed() > 0) {
1312+
isCompleted = true;
1313+
UNIT_ASSERT_VALUES_EQUAL(item.parts_total(), 2);
1314+
UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1);
1315+
UNIT_ASSERT(item.has_start_time());
1316+
} else {
1317+
runtime.SimulateSleep(TDuration::Seconds(1));
1318+
}
1319+
}
1320+
1321+
blockPartition0.Stop();
1322+
blockPartition0.Unblock();
1323+
12771324
env.TestWaitNotification(runtime, txId);
12781325

12791326
const auto desc = TestGetExport(runtime, txId, "/MyRoot");
1280-
const auto& entry = desc.GetResponse().GetEntry();
1281-
UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1);
1327+
const auto entry = desc.GetResponse().GetEntry();
12821328

1283-
const auto& item = entry.GetItemsProgress(0);
1284-
UNIT_ASSERT_VALUES_EQUAL(item.parts_total(), 1);
1285-
UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1);
1286-
UNIT_ASSERT(item.has_start_time());
1287-
UNIT_ASSERT(item.has_end_time());
1329+
UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1);
12881330
}
12891331

12901332
Y_UNIT_TEST(ShouldRestartOnScanErrors) {
@@ -2888,4 +2930,41 @@ attributes {
28882930
UNIT_ASSERT_C(ivs.insert(iv.GetBinaryString()).second, key);
28892931
}
28902932
}
2933+
2934+
Y_UNIT_TEST(AutoDropping) {
2935+
TTestBasicRuntime runtime;
2936+
2937+
TPortManager portManager;
2938+
const ui16 port = portManager.GetPort();
2939+
2940+
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
2941+
UNIT_ASSERT(s3Mock.Start());
2942+
2943+
2944+
auto request = Sprintf(R"(
2945+
ExportToS3Settings {
2946+
endpoint: "localhost:%d"
2947+
scheme: HTTP
2948+
items {
2949+
source_path: "/MyRoot/Table"
2950+
destination_prefix: ""
2951+
}
2952+
}
2953+
)", port);
2954+
2955+
TTestEnv env(runtime);
2956+
2957+
Run(runtime, env, TVector<TString>{
2958+
R"(
2959+
Name: "Table"
2960+
Columns { Name: "key" Type: "Utf8" }
2961+
Columns { Name: "value" Type: "Utf8" }
2962+
KeyColumnNames: ["key"]
2963+
)",
2964+
}, request, Ydb::StatusIds::SUCCESS, "/MyRoot");
2965+
2966+
auto desc = DescribePath(runtime, "/MyRoot");
2967+
UNIT_ASSERT_EQUAL(desc.GetPathDescription().ChildrenSize(), 1);
2968+
UNIT_ASSERT_EQUAL(desc.GetPathDescription().GetChildren(0).GetName(), "Table");
2969+
}
28912970
}

0 commit comments

Comments
 (0)