Skip to content

Commit f692e8c

Browse files
authored
Add cleanup request for KV tablet (#14440)
1 parent dc604a5 commit f692e8c

11 files changed

+555
-79
lines changed

ydb/core/keyvalue/keyvalue_events.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct TEvKeyValue {
2424
EvReportWriteLatency,
2525
EvUpdateWeights,
2626
EvCompleteGC,
27+
EvCleanUpDataRequest,
2728

2829
EvRead = EvRequest + 16,
2930
EvReadRange,
@@ -32,6 +33,8 @@ struct TEvKeyValue {
3233
EvAcquireLock,
3334

3435
EvResponse = EvRequest + 512,
36+
EvForceTabletDataCleanup,
37+
EvCleanUpDataResponse,
3538

3639
EvReadResponse = EvResponse + 16,
3740
EvReadRangeResponse,
@@ -197,6 +200,59 @@ struct TEvKeyValue {
197200
: Repeat(repeat)
198201
{}
199202
};
203+
204+
struct TEvCleanUpDataResponse;
205+
206+
struct TEvCleanUpDataRequest : public TEventPB<TEvCleanUpDataRequest,
207+
NKikimrKeyValue::CleanUpDataRequest, EvCleanUpDataRequest> {
208+
using TResponse = TEvCleanUpDataResponse;
209+
210+
TEvCleanUpDataRequest() = default;
211+
212+
TEvCleanUpDataRequest(ui64 generation, bool reset=false) {
213+
Record.set_generation(generation);
214+
Record.set_reset_actual_generation(reset);
215+
}
216+
};
217+
218+
struct TEvCleanUpDataResponse : public TEventPB<TEvCleanUpDataResponse,
219+
NKikimrKeyValue::CleanUpDataResponse, EvCleanUpDataResponse> {
220+
using TRequest = TEvCleanUpDataRequest;
221+
222+
TEvCleanUpDataResponse() = default;
223+
224+
TEvCleanUpDataResponse(ui64 generation, NKikimrKeyValue::CleanUpDataResponse::Status status, const TString& errorReason, ui64 actualGeneration) {
225+
Record.set_generation(generation);
226+
Record.set_status(status);
227+
Record.set_error_reason(errorReason);
228+
Record.set_actual_generation(actualGeneration);
229+
}
230+
231+
static std::unique_ptr<TEvCleanUpDataResponse> MakeSuccess(ui64 generation) {
232+
return std::make_unique<TEvCleanUpDataResponse>(generation, NKikimrKeyValue::CleanUpDataResponse::STATUS_SUCCESS, "", generation);
233+
}
234+
235+
static std::unique_ptr<TEvCleanUpDataResponse> MakeAborted(ui64 generation, const TString& errorReason, ui64 actualGeneration) {
236+
return std::make_unique<TEvCleanUpDataResponse>(generation, NKikimrKeyValue::CleanUpDataResponse::STATUS_ABORTED, errorReason, actualGeneration);
237+
}
238+
239+
static std::unique_ptr<TEvCleanUpDataResponse> MakeAlreadyCompleted(ui64 generation, ui64 actualGeneration) {
240+
return std::make_unique<TEvCleanUpDataResponse>(generation, NKikimrKeyValue::CleanUpDataResponse::STATUS_ALREADY_COMPLETED, "", actualGeneration);
241+
}
242+
243+
static std::unique_ptr<TEvCleanUpDataResponse> MakeError(ui64 generation, const TString& errorReason, ui64 actualGeneration) {
244+
return std::make_unique<TEvCleanUpDataResponse>(generation, NKikimrKeyValue::CleanUpDataResponse::STATUS_ERROR, errorReason, actualGeneration);
245+
}
246+
};
247+
248+
struct TEvForceTabletDataCleanup : public TEventLocal<TEvForceTabletDataCleanup, EvForceTabletDataCleanup> {
249+
ui64 Generation;
250+
251+
TEvForceTabletDataCleanup(ui64 generation)
252+
: Generation(generation)
253+
{}
254+
};
255+
200256
};
201257

202258
} // NKikimr

ydb/core/keyvalue/keyvalue_flat_impl.h

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,68 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::
246246
}
247247
};
248248

249+
struct TTxResetCleanupGeneration : public NTabletFlatExecutor::ITransaction {
250+
TKeyValueFlat *Self;
251+
ui64 Generation;
252+
TActorId Sender;
253+
TVector<TLogoBlobID> TrashBeingCommitted;
254+
255+
TTxResetCleanupGeneration(TKeyValueFlat *keyValueFlat, ui64 generation, TActorId sender)
256+
: Self(keyValueFlat)
257+
, Generation(generation)
258+
, Sender(sender)
259+
{}
260+
261+
bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext& /*ctx*/) override {
262+
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet << " TTxResetCleanupGeneration Execute");
263+
TSimpleDbFlat db(txc.DB, TrashBeingCommitted);
264+
Self->State.UpdateCleanupGeneration(db, Generation - 1);
265+
return true;
266+
}
267+
268+
void Complete(const TActorContext &ctx) override {
269+
Self->State.ResetCleanupGeneration(ctx, Generation - 1);
270+
Self->State.StartCleanupData(Generation, Sender);
271+
}
272+
};
273+
274+
struct TTxCompleteCleanupData : public NTabletFlatExecutor::ITransaction {
275+
TKeyValueFlat *Self;
276+
ui64 CleanupResetGeneration;
277+
TVector<TLogoBlobID> TrashBeingCommitted;
278+
ui64 CleanupGeneration;
279+
280+
TTxCompleteCleanupData(TKeyValueFlat *keyValueFlat, ui64 cleanupResetGeneration, ui64 cleanupGeneration)
281+
: Self(keyValueFlat)
282+
, CleanupResetGeneration(cleanupResetGeneration)
283+
, CleanupGeneration(cleanupGeneration)
284+
{}
285+
286+
bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
287+
ui64 actualCleanupResetGeneration = Self->State.GetCleanupResetGeneration();
288+
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << txc.Tablet
289+
<< " TTxCompleteCleanupData Execute cleanupResetGeneration# " << CleanupResetGeneration
290+
<< " actualCleanupResetGeneration# " << actualCleanupResetGeneration
291+
<< " CleanupGeneration# " << CleanupGeneration);
292+
if (CleanupResetGeneration == actualCleanupResetGeneration) {
293+
TSimpleDbFlat db(txc.DB, TrashBeingCommitted);
294+
Self->State.CompleteCleanupDataExecute(db, ctx, CleanupGeneration);
295+
}
296+
return true;
297+
}
298+
299+
void Complete(const TActorContext &ctx) override {
300+
ui64 actualCleanupResetGeneration = Self->State.GetCleanupResetGeneration();
301+
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << Self->TabletID()
302+
<< " TTxCompleteCleanupData Complete cleanupResetGeneration# " << CleanupResetGeneration
303+
<< " actualCleanupResetGeneration# " << actualCleanupResetGeneration
304+
<< " CleanupGeneration# " << CleanupGeneration);
305+
if (CleanupResetGeneration == actualCleanupResetGeneration) {
306+
Self->State.CompleteCleanupDataComplete(ctx, Self->Info(), CleanupGeneration);
307+
}
308+
}
309+
};
310+
249311
using TExecuteMethod = void (TKeyValueState::*)(ISimpleDb &db, const TActorContext &ctx);
250312
using TCompleteMethod = void (TKeyValueState::*)(const TActorContext &ctx, const TTabletStorageInfo *info);
251313

@@ -486,6 +548,27 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::
486548
SetActivityType(NKikimrServices::TActivity::KEYVALUE_ACTOR);
487549
}
488550

551+
void Handle(TEvKeyValue::TEvCleanUpDataRequest::TPtr &ev) {
552+
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
553+
<< " Handle TEvCleanUpDataRequest " << ev->Get()->ToString());
554+
ui64 generation = ev->Get()->Record.generation();
555+
if (generation == 0) {
556+
Send(ev->Sender, TEvKeyValue::TEvCleanUpDataResponse::MakeError(generation, "generation can't be 0", generation));
557+
return;
558+
}
559+
if (ev->Get()->Record.reset_actual_generation()) {
560+
Execute(new TTxResetCleanupGeneration(this, generation, ev->Sender));
561+
} else {
562+
State.StartCleanupData(generation, ev->Sender);
563+
}
564+
}
565+
566+
void Handle(TEvKeyValue::TEvForceTabletDataCleanup::TPtr &ev) {
567+
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
568+
<< " Handle TEvForceTabletDataCleanup generation# " << ev->Get()->Generation);
569+
Executor()->CleanupData(ev->Get()->Generation);
570+
}
571+
489572
public:
490573
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
491574
return NKikimrServices::TActivity::KEYVALUE_ACTOR;
@@ -526,6 +609,12 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::
526609
return false;
527610
}
528611

612+
void DataCleanupComplete(ui64 dataCleanupGeneration, const TActorContext &ctx) override {
613+
STLOG(NLog::PRI_DEBUG, NKikimrServices::KEYVALUE_GC, KV271, "DataCleanupComplete",
614+
(TabletId, TabletID()));
615+
Execute(new TTxCompleteCleanupData(this, State.GetCleanupResetGeneration(), dataCleanupGeneration), ctx);
616+
}
617+
529618
STFUNC(StateInit) {
530619
RestoreActorActivity();
531620
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()
@@ -555,6 +644,8 @@ class TKeyValueFlat : public TActor<TKeyValueFlat>, public NTabletFlatExecutor::
555644
HFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
556645
HFunc(TEvents::TEvPoisonPill, Handle);
557646

647+
hFunc(TEvKeyValue::TEvCleanUpDataRequest, Handle);
648+
hFunc(TEvKeyValue::TEvForceTabletDataCleanup, Handle);
558649
default:
559650
if (!HandleDefaultEvents(ev, SelfId())) {
560651
ALOG_DEBUG(NKikimrServices::KEYVALUE, "KeyValue# " << TabletID()

ydb/core/keyvalue/keyvalue_helpers.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,13 @@ void THelpers::DbEraseCollect(ISimpleDb &db) {
9090
db.Erase(key);
9191
}
9292

93+
void THelpers::DbUpdateCleanUpGeneration(ui64 generation, ISimpleDb &db) {
94+
TString key = THelpers::GenerateKeyFor(EIT_CLEAN_UP_GENERATION, nullptr, 0);
95+
TString value = TString::Uninitialized(sizeof(generation));
96+
memcpy(const_cast<char*>(value.data()), &generation, sizeof(generation));
97+
db.Update(key, value);
98+
}
99+
93100
THelpers::TGenerationStep THelpers::GenerationStep(const TLogoBlobID &id) {
94101
return std::make_tuple(id.Generation(), id.Step());
95102
}

ydb/core/keyvalue/keyvalue_helpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ struct THelpers {
2424
static void DbEraseTrash(const TLogoBlobID &id, ISimpleDb &db);
2525
static void DbUpdateTrash(const TLogoBlobID &id, ISimpleDb &db);
2626
static void DbEraseCollect(ISimpleDb &db);
27+
static void DbUpdateCleanUpGeneration(ui64 generation, ISimpleDb &db);
2728

2829
using TGenerationStep = std::tuple<ui32, ui32>;
2930
static TGenerationStep GenerationStep(const TLogoBlobID &id);

ydb/core/keyvalue/keyvalue_item_type.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ enum EItemType {
1212
EIT_COLLECT = 4,
1313
EIT_STATE = 5,
1414
EIT_KEYVALUE_2 = 6,
15-
EIT_END = 7
15+
EIT_CLEAN_UP_GENERATION = 7,
16+
EIT_END = 8
1617
};
1718

1819
} // NKeyValue

ydb/core/keyvalue/keyvalue_state.cpp

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ void TKeyValueState::Clear() {
8888
NextLogoBlobCookie = 1;
8989
Index.clear();
9090
RefCounts.clear();
91+
CompletedCleanupGeneration = 0;
92+
CompletedCleanupTrashGeneration = 0;
9193
Trash.clear();
94+
TrashForCleanup.clear();
9295
InFlightForStep.clear();
9396
CollectOperation.Reset(nullptr);
9497
IsCollectEventSent = false;
@@ -483,7 +486,7 @@ void TKeyValueState::Load(const TString &key, const TString& value) {
483486
Y_ABORT_UNLESS(value.size() == 0);
484487
Y_ABORT_UNLESS(arbitraryPart.size() == sizeof(TTrashKeyArbitrary));
485488
const TTrashKeyArbitrary *trashKey = (const TTrashKeyArbitrary *) arbitraryPart.data();
486-
Trash.insert(trashKey->LogoBlobId);
489+
GetCurrentTrashBin().insert(trashKey->LogoBlobId);
487490
TotalTrashSize += trashKey->LogoBlobId.BlobSize();
488491
CountInitialTrashRecord(trashKey->LogoBlobId);
489492
break;
@@ -504,6 +507,12 @@ void TKeyValueState::Load(const TString &key, const TString& value) {
504507
StoredState = *data;
505508
break;
506509
}
510+
case EIT_CLEAN_UP_GENERATION: {
511+
Y_ABORT_UNLESS(value.size() == sizeof(ui64));
512+
CompletedCleanupGeneration = *(const ui64 *) value.data();
513+
CompletedCleanupTrashGeneration = CompletedCleanupGeneration;
514+
break;
515+
}
507516
default: {
508517
Y_ABORT_UNLESS(false, "Unexcpected header.ItemType# %" PRIu32, (ui32)header.ItemType);
509518
break;
@@ -717,6 +726,11 @@ void TKeyValueState::SendCutHistory(const TActorContext &ctx, const TTabletStora
717726
for (const TLogoBlobID& id : Trash) {
718727
usedBlob(id);
719728
}
729+
for (const auto& [_, bin] : TrashForCleanup) {
730+
for (const TLogoBlobID& id : bin) {
731+
usedBlob(id);
732+
}
733+
}
720734

721735
for (const auto& [channel, fromGeneration] : uselessItems) {
722736
TAutoPtr<TEvTablet::TEvCutTabletHistory> ev(new TEvTablet::TEvCutTabletHistory);
@@ -1389,16 +1403,27 @@ void TKeyValueState::CmdTrimLeakedBlobs(THolder<TIntermediate>& intermediate, IS
13891403
auto it = RefCounts.find(id);
13901404
if (it != RefCounts.end()) {
13911405
Y_ABORT_UNLESS(it->second != 0);
1392-
} else if (!Trash.count(id)) { // we found a candidate for trash
1393-
if (numItems < intermediate->TrimLeakedBlobs->MaxItemsToTrim) {
1394-
ALOG_WARN(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " trimming " << id);
1395-
Trash.insert(id);
1396-
TotalTrashSize += id.BlobSize();
1397-
CountUncommittedTrashRecord(id);
1398-
THelpers::DbUpdateTrash(id, db);
1399-
++numItems;
1400-
} else {
1401-
++numUntrimmed;
1406+
} else {
1407+
bool found = Trash.count(id);
1408+
if (!found) {
1409+
for (const auto& [_, bin] : TrashForCleanup) {
1410+
if (bin.count(id)) {
1411+
found = true;
1412+
break;
1413+
}
1414+
}
1415+
}
1416+
if (!found) { // we found a candidate for trash
1417+
if (numItems < intermediate->TrimLeakedBlobs->MaxItemsToTrim) {
1418+
ALOG_WARN(NKikimrServices::KEYVALUE, "KeyValue# " << TabletId << " trimming " << id);
1419+
GetCurrentTrashBin().insert(id);
1420+
TotalTrashSize += id.BlobSize();
1421+
CountUncommittedTrashRecord(id);
1422+
THelpers::DbUpdateTrash(id, db);
1423+
++numItems;
1424+
} else {
1425+
++numUntrimmed;
1426+
}
14021427
}
14031428
}
14041429
}
@@ -1748,7 +1773,7 @@ void TKeyValueState::Dereference(const TLogoBlobID& id, ISimpleDb& db, bool init
17481773
}
17491774

17501775
void TKeyValueState::PushTrashBeingCommitted(TVector<TLogoBlobID>& trashBeingCommitted, const TActorContext& ctx) {
1751-
Trash.insert(trashBeingCommitted.begin(), trashBeingCommitted.end());
1776+
GetCurrentTrashBin().insert(trashBeingCommitted.begin(), trashBeingCommitted.end());
17521777
for (const TLogoBlobID& id : trashBeingCommitted) {
17531778
CountTrashCommitted(id);
17541779
}
@@ -3612,13 +3637,31 @@ void TKeyValueState::RenderHTMLPage(IOutputStream &out) const {
36123637
}
36133638
}
36143639
TABLEBODY() {
3640+
bool first = true;
36153641
ui64 idx = 1;
3616-
for (auto it = Trash.begin(); it != Trash.end(); ++it) {
3617-
TABLER() {
3618-
TABLED() {out << idx;}
3619-
++idx;
3620-
TABLED() {out << *it;}
3642+
auto printTrashBin = [&](const auto& bin) {
3643+
if (first) {
3644+
first = false;
3645+
} else {
3646+
TABLER() {
3647+
TABLED() {out << "---";}
3648+
TABLED() {out << "---";}
3649+
}
36213650
}
3651+
3652+
for (auto it = bin.begin(); it != bin.end(); ++it) {
3653+
TABLER() {
3654+
TABLED() {out << idx;}
3655+
++idx;
3656+
TABLED() {out << *it;}
3657+
}
3658+
}
3659+
};
3660+
for (const auto& [generation, bin] : TrashForCleanup) {
3661+
printTrashBin(bin);
3662+
}
3663+
if (!Trash.empty()) {
3664+
printTrashBin(Trash);
36223665
}
36233666
}
36243667
}

0 commit comments

Comments
 (0)