Skip to content

Commit ba01fbb

Browse files
va-kuznecov (Vlad Kuznecov), SammyVimes
authored
Execute LogWrite and ChunkWrite from GetEvents thread in PDisk (#8984)
Co-authored-by: Vlad Kuznecov <va-kuznecov@nebius.com> Co-authored-by: Semyon Danilov <samvimes@yandex.ru>
1 parent 1960e5f commit ba01fbb

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,25 @@ class TRealBlockDevice : public IBlockDevice {
368368
}
369369
}
370370

371+
// Dispatches a completion action according to its ShouldBeExecutedInCompletionThread flag.
// Flag set: the action is handed over to Device.CompletionThread via Schedule().
// Flag clear: the action is handled right here on the calling thread: Exec() when
// CanHandleResult() is true, otherwise Release() followed by moving the device to the
// error state (unless Device.QuitCounter is blocked).
void ExecuteOrScheduleCompletion(TCompletionAction *action) {
372+
if (action->ShouldBeExecutedInCompletionThread) {
373+
Device.CompletionThread->Schedule(action);
374+
} else {
375+
if (action->CanHandleResult()) {
376+
action->Exec(Device.PCtx->ActorSystem);
377+
} else {
378+
TString errorReason = action->ErrorReason; // copied up front: it is read after Release() below; presumably Release() may destroy the action - TODO confirm
379+
380+
action->Release(Device.PCtx->ActorSystem);
381+
382+
if (!Device.QuitCounter.IsBlocked()) { // NOTE(review): looks like error reporting is skipped once shutdown has begun - confirm
383+
Device.BecomeErrorState(TStringBuilder()
384+
<< " CompletionAction error, operation info# " << errorReason);
385+
}
386+
}
387+
}
388+
}
389+
371390
void Exec(TAsyncIoOperationResult *event) {
372391
IAsyncIoOperation *op = event->Operation;
373392
// Add up the execution time of all the events
@@ -425,7 +444,7 @@ class TRealBlockDevice : public IBlockDevice {
425444
WaitingNoops[idx % MaxWaitingNoops] = completionAction->FlushAction;
426445
completionAction->FlushAction = nullptr;
427446
}
428-
Device.CompletionThread->Schedule(completionAction);
447+
ExecuteOrScheduleCompletion(completionAction);
429448
auto seqnoL6 = AtomicGetAndIncrement(Device.Mon.SeqnoL6);
430449
Device.Mon.L6.Set(duration > Device.Reordering, seqnoL6);
431450
}
@@ -443,7 +462,7 @@ class TRealBlockDevice : public IBlockDevice {
443462
LWTRACK(PDiskDeviceGetFromWaiting, WaitingNoops[i]->Orbit);
444463
double durationMs = HPMilliSecondsFloat(HPNow() - WaitingNoops[i]->GetTime);
445464
Device.Mon.DeviceFlushDuration.Increment(durationMs);
446-
Device.CompletionThread->Schedule(WaitingNoops[i]);
465+
ExecuteOrScheduleCompletion(WaitingNoops[i]);
447466
WaitingNoops[i] = nullptr;
448467
}
449468
++NextPossibleNoop;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ struct TCompletionAction {
1818
NWilson::TTraceId TraceId;
1919
EIoResult Result = EIoResult::Unknown;
2020
TString ErrorReason;
21+
// Only reads should be executed in a separate thread since their completions consist of
22+
// time-consuming deciphering of read data. But currently some completion actions can write
23+
// to BlockDevice from Exec(), so it is safer to use a whitelist that allows only
24+
// LogWrite and ChunkWrite to be executed from GetThread
25+
bool ShouldBeExecutedInCompletionThread = true;
2126

2227
mutable NLWTrace::TOrbit Orbit;
2328
protected:

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ class TCompletionChunkWrite : public TCompletionAction {
9393
, ReqId(reqId)
9494
, Span(std::move(span))
9595
{
96+
TCompletionAction::ShouldBeExecutedInCompletionThread = false;
9697
}
9798

9899
~TCompletionChunkWrite() {
@@ -144,7 +145,9 @@ class TCompletionLogWrite : public TCompletionAction {
144145
, LogWriteQueue(std::move(logWriteQueue))
145146
, Commits(std::move(commits))
146147
, CommitedLogChunks(std::move(commitedLogChunks))
147-
{}
148+
{
149+
TCompletionAction::ShouldBeExecutedInCompletionThread = false;
150+
}
148151

149152
TVector<ui32>* GetCommitedLogChunksPtr() {
150153
return &CommitedLogChunks;

0 commit comments

Comments
 (0)