Skip to content

Commit 9213a37

Browse files
va-kuznecov and Vlad Kuznecov authored
Limit PDisk's batches and improve LWTrace around PDisk's event loop (#9099)
Co-authored-by: Vlad Kuznecov <va-kuznecov@nebius.com>
1 parent 5c36245 commit 9213a37

File tree

6 files changed

+38
-8
lines changed

6 files changed

+38
-8
lines changed

ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -221,8 +221,8 @@ struct TEventTypeField {
221221
TYPES(TPDiskIdField, ui64, ui64, ui64, ui64), \
222222
NAMES("pdisk", "owner", "chunkIdx", "pieceOffset", "pieceSize")) \
223223
PROBE(PDiskLogWriteComplete, GROUPS("PDisk", "PDiskRequest"), \
224-
TYPES(TPDiskIdField, ui64, double, double, double, double, double, double, double), \
225-
NAMES("pdisk", "reqId", "creationTimeSec", "costMs", "responseTimeMs", "inputTimeMs", "scheduleTimeMs", "deviceTotalTimeMs", "deviceOnlyTimeMs")) \
224+
TYPES(TPDiskIdField, ui64, double, double, double, double, double, double, double, ui64), \
225+
NAMES("pdisk", "reqId", "creationTimeSec", "costMs", "responseTimeMs", "inputTimeMs", "scheduleTimeMs", "deviceTotalTimeMs", "deviceOnlyTimeMs", "batchSize")) \
226226
PROBE(PDiskChunkResponseTime, GROUPS("PDisk", "PDiskRequest"), \
227227
TYPES(TPDiskIdField, ui64, ui64, double, ui64), \
228228
NAMES("pdisk", "reqId", "priorityClass", "responseTimeMs", "sizeBytes")) \
@@ -261,8 +261,11 @@ struct TEventTypeField {
261261
TYPES(ui32, ui64, ui64), \
262262
NAMES("chunkIdx", "size", "offset")) \
263263
PROBE(PDiskUpdateCycleDetails, GROUPS("PDisk"), \
264-
TYPES(float, float, float, float, float), \
265-
NAMES("entireUpdateMs", "inputQueueMs", "schedulingMs", "processingMs", "waitingMs")) \
264+
TYPES(ui32, float, float, float, float, float), \
265+
NAMES("pdisk", "entireUpdateMs", "inputQueueMs", "schedulingMs", "processingMs", "waitingMs")) \
266+
PROBE(PDiskEnqueueAllDetails, GROUPS("PDisk"), \
267+
TYPES(ui64, size_t, size_t, size_t, double), \
268+
NAMES("pdisk", "initialQueueSize", "processedReqs", "pushedToForsetiReqs", "spentTimeMs")) \
266269
PROBE(DSProxyGetEnqueue, GROUPS("DSProxy", "LWTrackStart"), TYPES(), NAMES()) \
267270
PROBE(DSProxyGetBootstrap, GROUPS("DSProxy"), TYPES(), NAMES()) \
268271
PROBE(DSProxyGetHandle, GROUPS("DSProxy", "LWTrackStart"), TYPES(), NAMES()) \

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@
22
#include "blobstorage_pdisk_impl.h"
33
#include "blobstorage_pdisk_sectorrestorator.h"
44

5+
constexpr size_t MAX_RESULTS_PER_BATCH = 50; // It took ~0.25ms in VDisk's handler to process such batch
6+
57
namespace NKikimr {
68
namespace NPDisk {
79

@@ -54,7 +56,8 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) {
5456
HPMilliSecondsFloat(evLog.InputTime - evLog.CreationTime),
5557
HPMilliSecondsFloat(evLog.ScheduleTime - evLog.InputTime),
5658
HPMilliSecondsFloat(now - evLog.ScheduleTime),
57-
HPMilliSecondsFloat(GetTime - SubmitTime));
59+
HPMilliSecondsFloat(GetTime - SubmitTime),
60+
batch ? batch->Result->Results.size() : 0);
5861
if (evLog.Result->Results) {
5962
evLog.Result->Results.front().Orbit = std::move(evLog.Orbit);
6063
}
@@ -66,7 +69,7 @@ void TCompletionLogWrite::Exec(TActorSystem *actorSystem) {
6669
}
6770
if (evLog.Result->Status == NKikimrProto::OK) {
6871
if (batch) {
69-
if (batch->Sender == evLog.Sender) {
72+
if (batch->Sender == evLog.Sender && batch->Result->Results.size() < MAX_RESULTS_PER_BATCH) {
7073
batch->Result->Results.push_back(std::move(evLog.Result->Results[0]));
7174
} else {
7275
sendResponse(batch);

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,8 @@
1313

1414
#include <util/system/unaligned_mem.h>
1515

16+
constexpr size_t MAX_REQS_PER_CYCLE = 200; // 200 requests take ~0.2ms in EnqueueAll function
17+
1618
namespace NKikimr {
1719
namespace NPDisk {
1820

@@ -3392,7 +3394,14 @@ void TPDisk::ProcessYardInitSet() {
33923394
}
33933395

33943396
void TPDisk::EnqueueAll() {
3397+
TInstant start = TInstant::Now();
3398+
33953399
TGuard<TMutex> guard(StateMutex);
3400+
size_t initialQueueSize = InputQueue.GetWaitingSize();
3401+
size_t processedReqs = 0;
3402+
size_t pushedToForsetiReqs = 0;
3403+
3404+
33963405
while (InputQueue.GetWaitingSize() > 0) {
33973406
TRequestBase* request = InputQueue.Pop();
33983407
AtomicSub(InputQueueCost, request->Cost);
@@ -3435,9 +3444,17 @@ void TPDisk::EnqueueAll() {
34353444
} else {
34363445
if (PreprocessRequest(request)) {
34373446
PushRequestToForseti(request);
3447+
++pushedToForsetiReqs;
34383448
}
34393449
}
3450+
++processedReqs;
3451+
if (processedReqs >= MAX_REQS_PER_CYCLE) {
3452+
break;
3453+
}
34403454
}
3455+
3456+
double spentTimeMs = (TInstant::Now() - start).MillisecondsFloat();
3457+
LWPROBE(PDiskEnqueueAllDetails, PCtx->PDiskId, initialQueueSize, processedReqs, pushedToForsetiReqs, spentTimeMs);
34413458
}
34423459

34433460
void TPDisk::Update() {

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -875,8 +875,8 @@ void TPDisk::LogWrite(TLogWrite &evLog, TVector<ui32> &logChunksToCommit) {
875875
P_LOG(PRI_ERROR, BPD70, str.Str());
876876
evLog.Result.Reset(new NPDisk::TEvLogResult(NKikimrProto::OUT_OF_SPACE,
877877
NotEnoughDiskSpaceStatusFlags(evLog.Owner, evLog.OwnerGroupType), str.Str()));
878-
evLog.Result->Results.push_back(NPDisk::TEvLogResult::TRecord(evLog.Lsn, evLog.Cookie));
879878
Y_ABORT_UNLESS(evLog.Result.Get());
879+
evLog.Result->Results.push_back(NPDisk::TEvLogResult::TRecord(evLog.Lsn, evLog.Cookie));
880880
return;
881881
}
882882
if (!CommonLogger->NextChunks.empty()) {

ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -120,6 +120,7 @@ TPDiskMon::TPDiskMon(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& count
120120
COUNTER_INIT(DeviceGroup, DeviceIoErrors, true);
121121
COUNTER_INIT_IF_EXTENDED(DeviceGroup, DeviceWaitTimeMs, true);
122122

123+
UpdateDurationTracker.SetPDiskId(PDiskId);
123124
UpdateDurationTracker.SetCounter(DeviceGroup->GetCounter("PDiskThreadBusyTimeNs", true));
124125

125126
// queue subgroup

ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -157,6 +157,8 @@ struct TPDiskMon {
157157

158158
::NMonitoring::TDynamicCounters::TCounterPtr PDiskThreadBusyTimeNs;
159159

160+
ui32 PDiskId = 0;
161+
160162
public:
161163
NMonitoring::TPercentileTrackerLg<5, 4, 15> UpdateCycleTime;
162164

@@ -165,6 +167,10 @@ struct TPDiskMon {
165167
: BeginUpdateAt(HPNow())
166168
{}
167169

170+
void SetPDiskId(ui32 pdiskId) {
171+
PDiskId = pdiskId;
172+
}
173+
168174
void SetCounter(const ::NMonitoring::TDynamicCounters::TCounterPtr& pDiskThreadBusyTimeNs) {
169175
PDiskThreadBusyTimeNs = pDiskThreadBusyTimeNs;
170176
}
@@ -208,7 +214,7 @@ struct TPDiskMon {
208214
float schedulingMs = HPMilliSecondsFloat(ProcessingStartAt - SchedulingStartAt);
209215
float processingMs = HPMilliSecondsFloat(WaitingStartAt - ProcessingStartAt);
210216
float waitingMs = HPMilliSecondsFloat(updateEndedAt - WaitingStartAt);
211-
GLOBAL_LWPROBE(BLOBSTORAGE_PROVIDER, PDiskUpdateCycleDetails, entireUpdateMs, inputQueueMs,
217+
GLOBAL_LWPROBE(BLOBSTORAGE_PROVIDER, PDiskUpdateCycleDetails, PDiskId, entireUpdateMs, inputQueueMs,
212218
schedulingMs, processingMs, waitingMs);
213219
}
214220
BeginUpdateAt = updateEndedAt;

0 commit comments

Comments
 (0)