Skip to content

Commit 4ac9f23

Browse files
Shred sectors and track dirty in-memory #12483 (#14411)
1 parent 108ab9a commit 4ac9f23

14 files changed

+335
-20
lines changed

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,5 +404,17 @@ void TChunkTrimCompletion::Exec(TActorSystem *actorSystem) {
404404
delete this;
405405
}
406406

407+
void TChunkShredCompletion::Exec(TActorSystem *actorSystem) {
408+
LOG_TRACE_S(*actorSystem, NKikimrServices::BS_PDISK_SHRED,
409+
"PDiskId# " << PDisk->PCtx->PDiskId << " ReqId# " << ReqId
410+
<< " TChunkShredCompletion Chunk# " << Chunk
411+
<< " SectorIdx# " << SectorIdx
412+
<< " SizeBytes# " << SizeBytes);
413+
PDisk->Mon.ChunkShred.CountResponse();
414+
TChunkShredResult *shredResult = PDisk->ReqCreator.CreateFromArgs<TChunkShredResult>(Chunk, SectorIdx, SizeBytes);
415+
PDisk->InputRequest(shredResult);
416+
delete this;
417+
}
418+
407419
} // NPDisk
408420
} // NKikimr

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,30 @@ class TChunkTrimCompletion : public TCompletionAction {
359359
}
360360
};
361361

362+
class TChunkShredCompletion : public TCompletionAction {
363+
TPDisk *PDisk;
364+
TChunkIdx Chunk;
365+
ui32 SectorIdx;
366+
size_t SizeBytes;
367+
TReqId ReqId;
368+
369+
public:
370+
TChunkShredCompletion(TPDisk *pdisk, TChunkIdx chunk, ui32 sectorIdx, size_t sizeBytes, TReqId reqId)
371+
: PDisk(pdisk)
372+
, Chunk(chunk)
373+
, SectorIdx(sectorIdx)
374+
, SizeBytes(sizeBytes)
375+
, ReqId(reqId)
376+
{}
377+
378+
void Exec(TActorSystem *actorSystem) override;
379+
380+
void Release(TActorSystem *actorSystem) override {
381+
Y_UNUSED(actorSystem);
382+
delete this;
383+
}
384+
};
385+
362386
class TCompletionSequence : public TCompletionAction {
363387
TVector<TCompletionAction*> Actions;
364388

ydb/core/blobstorage/pdisk/blobstorage_pdisk_free_chunks.h

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,32 @@ class TFreeChunks {
5353
return idx;
5454
}
5555

56-
ui32 Size() const {
57-
return AtomicGet(FreeChunkCount);
56+
TDeque<TChunkIdx>::const_iterator begin() const {
57+
return FreeChunks.begin();
5858
}
59+
60+
TDeque<TChunkIdx>::const_iterator end() const {
61+
return FreeChunks.end();
62+
}
63+
64+
TChunkIdx PopAt(TDeque<TChunkIdx>::const_iterator it) {
65+
Y_VERIFY(it != FreeChunks.end());
66+
Y_VERIFY(FreeChunks.size() > 0);
67+
TChunkIdx idx = *it;
68+
FreeChunks.erase(it);
69+
AtomicDecrement(FreeChunkCount);
70+
MonFreeChunks->Dec();
71+
return idx;
72+
}
73+
74+
void PushFront(TChunkIdx idx) {
75+
FreeChunks.push_front(idx);
76+
AtomicIncrement(FreeChunkCount);
77+
MonFreeChunks->Inc();
78+
}
79+
80+
// A thread-safe function that returns the current number of free chunks.
81+
ui32 Size() const { return AtomicGet(FreeChunkCount); }
5982
};
6083

6184
} // NPDisk

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 163 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2577,6 +2577,9 @@ void TPDisk::ProcessFastOperationsQueue() {
25772577
case ERequestType::RequestMarkDirty:
25782578
ProcessMarkDirty(static_cast<TMarkDirty&>(*req));
25792579
break;
2580+
case ERequestType::RequestChunkShredResult:
2581+
ProcessChunkShredResult(static_cast<TChunkShredResult&>(*req));
2582+
break;
25802583
default:
25812584
Y_FAIL_S("Unexpected request type# " << TypeName(*req));
25822585
break;
@@ -3187,6 +3190,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
31873190
case ERequestType::RequestPreShredCompactVDiskResult:
31883191
case ERequestType::RequestShredVDiskResult:
31893192
case ERequestType::RequestMarkDirty:
3193+
case ERequestType::RequestChunkShredResult:
31903194
break;
31913195
case ERequestType::RequestStopDevice:
31923196
BlockDevice->Stop();
@@ -3883,6 +3887,7 @@ bool TPDisk::HandleReadOnlyIfWrite(TRequestBase *request) {
38833887
case ERequestType::RequestPreShredCompactVDiskResult:
38843888
case ERequestType::RequestShredVDiskResult:
38853889
case ERequestType::RequestMarkDirty:
3890+
case ERequestType::RequestChunkShredResult:
38863891
// These requests don't require response.
38873892
return true;
38883893
}
@@ -3913,8 +3918,31 @@ void TPDisk::AddCbsSet(ui32 ownerId) {
39133918
SchedulerConfigure(conf);
39143919
}
39153920

3921+
TChunkIdx TPDisk::GetUnshreddedFreeChunk() {
3922+
// Find a free unshredded chunk
3923+
for (TFreeChunks* freeChunks : {&Keeper.UntrimmedFreeChunks, &Keeper.TrimmedFreeChunks}) {
3924+
for (auto it = freeChunks->begin(); it != freeChunks->end(); ++it) {
3925+
TChunkIdx chunkIdx = *it;
3926+
TChunkState& state = ChunkState[chunkIdx];
3927+
// Look for free chunks that haven't been shredded in this generation
3928+
if (state.CommitState == TChunkState::FREE && state.IsDirty && state.ShredGeneration < ShredGeneration) {
3929+
// Found an unshredded free chunk
3930+
TChunkIdx unshreddedChunkIdx = freeChunks->PopAt(it);
3931+
Y_VERIFY(unshreddedChunkIdx == chunkIdx);
3932+
// Mark it as being shredded and update its generation
3933+
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
3934+
"PDisk# " << PCtx->PDiskId
3935+
<< " found unshredded free chunk# " << chunkIdx
3936+
<< " ShredGeneration# " << ShredGeneration);
3937+
return unshreddedChunkIdx;
3938+
}
3939+
}
3940+
}
3941+
return 0;
3942+
}
3943+
39163944
void TPDisk::ProgressShredState() {
3917-
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
3945+
LOG_TRACE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
39183946
"ProgressShredState at PDisk# " << PCtx->PDiskId
39193947
<< " ShredGeneration# " << ShredGeneration
39203948
<< " ShredState# " << (ui32)ShredState);
@@ -3959,9 +3987,80 @@ void TPDisk::ProgressShredState() {
39593987
<< " has finished all pre-shred compact VDisk requests"
39603988
<< " ShredGeneration# " << ShredGeneration
39613989
<< " finishedCount# " << finishedCount);
3990+
// All preparations are done, no junk chunks can be unmarked,
3991+
// Update chunk states and start shredding the empty space
3992+
for (TChunkIdx chunkIdx = 0; chunkIdx < ChunkState.size(); ++chunkIdx) {
3993+
TChunkState& state = ChunkState[chunkIdx];
3994+
// Update shred generation for all the clean chunks
3995+
if (!state.IsDirty) {
3996+
state.ShredGeneration = ShredGeneration;
3997+
}
3998+
}
39623999
ShredState = EShredStateSendShredVDisk;
39634000
}
39644001
if (ShredState == EShredStateSendShredVDisk) {
4002+
// Shred free space while possible
4003+
if (ChunkBeingShredded == 0) {
4004+
ChunkBeingShredded = GetUnshreddedFreeChunk();
4005+
}
4006+
if (ChunkBeingShredded != 0) {
4007+
// Continue shredding the free chunk
4008+
while (true) {
4009+
if (ChunkBeingShreddedInFlight >= 2) {
4010+
// We have enough in-flight requests, don't start a new one
4011+
return;
4012+
}
4013+
if (ChunkBeingShreddedNextSectorIdx * Format.SectorSize >= Format.ChunkSize) {
4014+
++ChunkBeingShreddedIteration;
4015+
ChunkBeingShreddedNextSectorIdx = 0;
4016+
}
4017+
if (ChunkBeingShreddedIteration >= 2) {
4018+
// We have enough iterations, don't start a new one, just wait for the in-flight requests to finish
4019+
if (ChunkBeingShreddedInFlight > 0) {
4020+
return;
4021+
}
4022+
// Done shredding the chunk, mark it clean and push it back to the free chunks
4023+
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED, "PDisk# " << PCtx->PDiskId
4024+
<< " is done shredding chunk ChunkBeingShredded# " << ChunkBeingShredded);
4025+
TChunkState &state = ChunkState[ChunkBeingShredded];
4026+
state.OperationsInProgress--;
4027+
state.IsDirty = false;
4028+
state.ShredGeneration = ShredGeneration;
4029+
Y_VERIFY(ChunkState[ChunkBeingShredded].OperationsInProgress == 0);
4030+
Keeper.UntrimmedFreeChunks.PushFront(ChunkBeingShredded);
4031+
ChunkBeingShredded = GetUnshreddedFreeChunk();
4032+
ChunkBeingShreddedIteration = 0;
4033+
ChunkBeingShreddedNextSectorIdx = 0;
4034+
}
4035+
if (ChunkBeingShredded) {
4036+
if (ChunkBeingShreddedIteration == 0 && ChunkBeingShreddedNextSectorIdx == 0) {
4037+
Y_VERIFY(ChunkState[ChunkBeingShredded].OperationsInProgress == 0);
4038+
ChunkState[ChunkBeingShredded].OperationsInProgress++;
4039+
}
4040+
// Continue shredding the chunk: send a write request to the device using the iteration-specific pattern
4041+
THolder<TAlignedData>& payload = ShredPayload[ChunkBeingShreddedIteration];
4042+
if (payload == nullptr) {
4043+
payload = MakeHolder<TAlignedData>(Format.RoundUpToSectorSize(2097152));
4044+
ui8* data = payload->Get();
4045+
memset(data, ChunkBeingShreddedIteration == 0 ? 0x55 : 0xaa, payload->Size());
4046+
}
4047+
ui64 size = std::min((ui64)Format.ChunkSize - ChunkBeingShreddedNextSectorIdx * Format.SectorSize, (ui64)payload->Size());
4048+
ui64 offset = Format.Offset(ChunkBeingShredded, ChunkBeingShreddedNextSectorIdx);
4049+
ui64 reqIdx = ShredReqIdx++;
4050+
TCompletionAction *completionAction = new TChunkShredCompletion(this, ChunkBeingShredded, ChunkBeingShreddedNextSectorIdx, size, TReqId(TReqId::ChunkShred, reqIdx));
4051+
++ChunkBeingShreddedInFlight;
4052+
ChunkBeingShreddedNextSectorIdx += size / Format.SectorSize;
4053+
Mon.ChunkShred.CountRequest(size);
4054+
BlockDevice->PwriteAsync(payload->Get(), size, offset, completionAction,
4055+
TReqId(TReqId::ChunkShred, reqIdx), {});
4056+
return;
4057+
}
4058+
break;
4059+
}
4060+
}
4061+
4062+
// If there are no free chunks unshredded, we should ask a vdisk to shred its free space
4063+
ui32 shreddedFreeChunks = Keeper.GetFreeChunkCount();
39654064
ui32 finishedCount = 0;
39664065
for (ui32 ownerId = 0; ownerId < OwnerData.size(); ++ownerId) {
39674066
TOwnerData &data = OwnerData[ownerId];
@@ -3971,22 +4070,31 @@ void TPDisk::ProgressShredState() {
39714070
} else if (data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED
39724071
&& data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED) {
39734072
std::vector<TChunkIdx> chunksToShred;
3974-
chunksToShred.reserve(ChunkState.size());
4073+
chunksToShred.reserve(shreddedFreeChunks/2);
39754074
for (TChunkIdx chunkIdx = 0; chunkIdx < ChunkState.size(); ++chunkIdx) {
3976-
if (ChunkState[chunkIdx].OwnerId == ownerId) {
3977-
// TODO(cthulhu): check if chunk is dirty
4075+
TChunkState& state = ChunkState[chunkIdx];
4076+
// We need to shred only chunks that got dirty before the current shred generation
4077+
if (state.OwnerId == ownerId && state.IsDirty && state.ShredGeneration < ShredGeneration) {
39784078
chunksToShred.push_back(chunkIdx);
4079+
if (chunksToShred.size() >= shreddedFreeChunks/2) {
4080+
break;
4081+
}
39794082
}
39804083
}
3981-
THolder<TEvShredVDisk> shredRequest(new TEvShredVDisk(ShredGeneration, chunksToShred));
3982-
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
3983-
"PDisk# " << PCtx->PDiskId
3984-
<< " sends shred request to VDisk# " << data.VDiskId
3985-
<< " ownerId# " << ownerId
3986-
<< " request# " << shredRequest->ToString());
3987-
PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release()));
3988-
data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED;
3989-
data.LastShredGeneration = ShredGeneration;
4084+
if (chunksToShred.size() > 0) {
4085+
THolder<TEvShredVDisk> shredRequest(new TEvShredVDisk(ShredGeneration, chunksToShred));
4086+
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
4087+
"PDisk# " << PCtx->PDiskId
4088+
<< " sends shred request to VDisk# " << data.VDiskId
4089+
<< " ownerId# " << ownerId
4090+
<< " request# " << shredRequest->ToString());
4091+
PCtx->ActorSystem->Send(new IEventHandle(data.CutLogId, PCtx->PDiskActor, shredRequest.Release()));
4092+
data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_REQUESTED;
4093+
data.LastShredGeneration = ShredGeneration;
4094+
} else {
4095+
data.ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED;
4096+
data.LastShredGeneration = ShredGeneration;
4097+
}
39904098
}
39914099
if (data.ShredState != TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED) {
39924100
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
@@ -4170,12 +4278,52 @@ void TPDisk::ProcessShredVDiskResult(TShredVDiskResult& request) {
41704278
ShredRequesters.clear();
41714279
return;
41724280
}
4173-
OwnerData[request.Owner].ShredState = TOwnerData::VDISK_SHRED_STATE_SHRED_FINISHED;
4281+
OwnerData[request.Owner].ShredState = TOwnerData::VDISK_SHRED_STATE_COMPACT_FINISHED;
41744282
ProgressShredState();
41754283
}
41764284

41774285
void TPDisk::ProcessMarkDirty(TMarkDirty& request) {
4178-
Y_UNUSED(request);
4286+
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
4287+
"ProcessMarkDirty at PDisk# " << PCtx->PDiskId
4288+
<< " ShredGeneration# " << ShredGeneration
4289+
<< " request# " << request.ToString());
4290+
{
4291+
bool isLogged = false;
4292+
ui64 markedDirty = 0;
4293+
TGuard<TMutex> guard(StateMutex);
4294+
for (auto chunkIdx : request.ChunksToMarkDirty) {
4295+
if (chunkIdx >= ChunkState.size()) {
4296+
if (!isLogged) {
4297+
isLogged = true;
4298+
LOG_CRIT_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
4299+
"MarkDirty contains invalid chunkIdx# " << chunkIdx << " for PDisk# " << PCtx->PDiskId
4300+
<< " ShredGeneration# " << ShredGeneration << " request# " << request.ToString());
4301+
}
4302+
} else {
4303+
if (!ChunkState[chunkIdx].IsDirty) {
4304+
ChunkState[chunkIdx].IsDirty = true;
4305+
markedDirty++;
4306+
LOG_DEBUG_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
4307+
"PDisk# " << PCtx->PDiskId << " marked chunkIdx# " << chunkIdx << " as dirty"
4308+
<< " chunk.ShredGeneration# " << ChunkState[chunkIdx].ShredGeneration
4309+
<< " ShredGeneration# " << ShredGeneration);
4310+
}
4311+
}
4312+
}
4313+
if (markedDirty > 0) {
4314+
// TODO(cthulhu): save dirty chunks to syslog
4315+
}
4316+
}
4317+
}
4318+
4319+
void TPDisk::ProcessChunkShredResult(TChunkShredResult& request) {
4320+
LOG_TRACE_S(*PCtx->ActorSystem, NKikimrServices::BS_PDISK_SHRED,
4321+
"ProcessChunkShredResult at PDisk# " << PCtx->PDiskId
4322+
<< " ShredGeneration# " << ShredGeneration
4323+
<< " request# " << request.ToString());
4324+
Y_ABORT_UNLESS(ChunkBeingShreddedInFlight > 0);
4325+
--ChunkBeingShreddedInFlight;
4326+
ProgressShredState();
41794327
}
41804328

41814329
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,13 @@ class TPDisk : public IPDisk {
157157
};
158158
EShredState ShredState = EShredStateDefault;
159159
ui64 ShredGeneration = 0;
160+
TChunkIdx ChunkBeingShredded = 0;
161+
ui64 ChunkBeingShreddedIteration = 0;
162+
ui64 ChunkBeingShreddedNextSectorIdx = 0;
163+
ui64 ShredReqIdx = 0;
164+
std::atomic<ui64> ChunkBeingShreddedInFlight = 0;
160165
std::deque<std::tuple<TActorId, ui64>> ShredRequesters;
166+
THolder<TAlignedData> ShredPayload[2];
161167

162168
// Chunks that are owned by killed owner, but have operations InFlight
163169
TVector<TChunkIdx> QuarantineChunks;
@@ -399,11 +405,13 @@ class TPDisk : public IPDisk {
399405
void HandleNextWriteMetadata();
400406
void ProcessWriteMetadataResult(TWriteMetadataResult& request);
401407

408+
TChunkIdx GetUnshreddedFreeChunk();
402409
void ProgressShredState();
403410
void ProcessShredPDisk(TShredPDisk& request);
404411
void ProcessPreShredCompactVDiskResult(TPreShredCompactVDiskResult& request);
405412
void ProcessShredVDiskResult(TShredVDiskResult& request);
406413
void ProcessMarkDirty(TMarkDirty& request);
414+
void ProcessChunkShredResult(TChunkShredResult& request);
407415

408416
void DropAllMetadataRequests();
409417

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,10 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo
10381038
if (logWrite.CommitRecord.DeleteToDecommitted) {
10391039
for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) {
10401040
TChunkState& state = ChunkState[chunkIdx];
1041+
if (!state.IsDirty) {
1042+
// TODO(cthulhu): log that chunk got dirty
1043+
state.IsDirty = true;
1044+
}
10411045
switch (state.CommitState) {
10421046
case TChunkState::DATA_RESERVED:
10431047
state.CommitState = TChunkState::DATA_RESERVED_DECOMMIT_IN_PROGRESS;
@@ -1057,6 +1061,10 @@ NKikimrProto::EReplyStatus TPDisk::BeforeLoggingCommitRecord(const TLogWrite &lo
10571061
} else {
10581062
for (ui32 chunkIdx : logWrite.CommitRecord.DeleteChunks) {
10591063
TChunkState& state = ChunkState[chunkIdx];
1064+
if (!state.IsDirty) {
1065+
// TODO(cthulhu): log that chunk got dirty
1066+
state.IsDirty = true;
1067+
}
10601068
if (state.HasAnyOperationsInProgress()) {
10611069
switch (state.CommitState) {
10621070
case TChunkState::DATA_RESERVED:

ydb/core/blobstorage/pdisk/blobstorage_pdisk_keeper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ class TKeeper {
2626
TFreeChunks TrimmedFreeChunks; // Trimmed free chunk list for fast allocation
2727

2828
TChunkTracker ChunkTracker;
29+
30+
friend class TPDisk;
2931
public:
3032

3133
TKeeper(TPDiskMon &mon, TIntrusivePtr<TPDiskConfig> cfg)

ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ TPDiskMon::TPDiskMon(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& count
235235
IO_REQ_INIT(PDiskGroup, WriteHuge, WriteHuge);
236236
IO_REQ_INIT(PDiskGroup, WriteComp, WriteComp);
237237
IO_REQ_INIT(PDiskGroup, Trim, WriteTrim);
238+
IO_REQ_INIT(PDiskGroup, ChunkShred, WriteShred);
238239

239240
IO_REQ_INIT_IF_EXTENDED(PDiskGroup, ReadSyncLog, ReadSyncLog);
240241
IO_REQ_INIT_IF_EXTENDED(PDiskGroup, ReadComp, ReadComp);

ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,7 @@ struct TPDiskMon {
479479
TIoCounters WriteHuge;
480480
TIoCounters WriteComp;
481481
TIoCounters Trim;
482-
482+
TIoCounters ChunkShred;
483483
TIoCounters ReadSyncLog;
484484
TIoCounters ReadComp;
485485
TIoCounters ReadOnlineRt;

0 commit comments

Comments
 (0)