Skip to content

Commit f68af91

Browse files
va-kuznecovVlad Kuznecov
andauthored
PDisk LWTracing improvements (#8160)
Co-authored-by: Vlad Kuznecov <va-kuznecov@nebius.com>
1 parent 1a1e14a commit f68af91

7 files changed

+38
-20
lines changed

ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,10 @@ struct TEventTypeField {
214214
PROBE(PDiskChunkReadPieceComplete, GROUPS("PDisk", "PDiskRequest"), \
215215
TYPES(TPDiskIdField, ui64, ui64, double), \
216216
NAMES("pdisk", "size", "relativeOffset", "deviceTimeMs")) \
217-
PROBE(PDiskAddWritePieceToScheduler, GROUPS("PDisk", "PDiskRequest"), \
217+
PROBE(PDiskChunkWriteAddToScheduler, GROUPS("PDisk", "PDiskRequest"), \
218218
TYPES(TPDiskIdField, ui64, double, ui64, bool, ui64, ui64), \
219219
NAMES("pdisk", "reqId", "creationTimeSec", "owner", "isFast", "priorityClass", "size")) \
220-
PROBE(PDiskChunkWritePieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
220+
PROBE(PDiskChunkWriteLastPieceSendToDevice, GROUPS("PDisk", "PDiskRequest"), \
221221
TYPES(TPDiskIdField, ui64, ui64, ui64, ui64), \
222222
NAMES("pdisk", "owner", "chunkIdx", "pieceOffset", "pieceSize")) \
223223
PROBE(PDiskLogWriteComplete, GROUPS("PDisk", "PDiskRequest"), \

ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,9 @@ class TRealBlockDevice : public IBlockDevice {
242242
EIoResult ret = EIoResult::TryAgain;
243243
while (ret == EIoResult::TryAgain) {
244244
action->SubmitTime = HPNow();
245+
if (action->FlushAction) {
246+
action->FlushAction->SubmitTime = action->SubmitTime;
247+
}
245248

246249
if (op->GetType() == IAsyncIoOperation::EType::PWrite) {
247250
PDISK_FAIL_INJECTION(1);
@@ -562,6 +565,9 @@ class TRealBlockDevice : public IBlockDevice {
562565
EIoResult ret = EIoResult::TryAgain;
563566
while (ret == EIoResult::TryAgain) {
564567
action->SubmitTime = HPNow();
568+
if (action->FlushAction) {
569+
action->FlushAction->SubmitTime = action->SubmitTime;
570+
}
565571
ret = Device.IoContext->Submit(op, Device.SharedCallback.Get());
566572
if (ret == EIoResult::Ok) {
567573
return true;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ void TCompletionChunkReadPart::Exec(TActorSystem *actorSystem) {
276276
}
277277

278278
double deviceTimeMs = HPMilliSecondsFloat(GetTime - SubmitTime);
279-
LWTRACK(PDiskChunkReadPieceComplete, Read->Orbit, PDisk->PDiskId, RawReadSize, CommonBufferOffset, deviceTimeMs);
279+
LWTRACK(PDiskChunkReadPieceComplete, Orbit, PDisk->PDiskId, RawReadSize, CommonBufferOffset, deviceTimeMs);
280+
Read->Orbit.Join(Orbit);
280281
CumulativeCompletion->PartReadComplete(actorSystem);
281282
CumulativeCompletion = nullptr;
282283

ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class TCompletionChunkWrite : public TCompletionAction {
111111
Mon->IncrementResponseTime(PriorityClass, responseTimeMs, SizeBytes);
112112
}
113113
LWTRACK(PDiskChunkResponseTime, Orbit, PDiskId, ReqId.Id, PriorityClass, responseTimeMs, SizeBytes);
114+
Event->Orbit = std::move(Orbit);
114115
actorSystem->Send(Recipient, Event.Release());
115116
if (Mon) {
116117
Mon->GetWriteCounter(PriorityClass)->CountResponse();

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -848,9 +848,6 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
848848

849849
guard.Release();
850850

851-
LWTRACK(PDiskChunkWritePieceSendToDevice, evChunkWrite->Orbit, PDiskId, evChunkWrite->Owner, chunkIdx,
852-
pieceShift, pieceSize);
853-
854851
ui32 bytesAvailable = pieceSize;
855852
Y_ABORT_UNLESS(evChunkWrite->BytesWritten == pieceShift);
856853
const ui32 count = evChunkWrite->PartsPtr->Size();
@@ -909,6 +906,9 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
909906
LOG_INFO(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " chunkIdx# %" PRIu32
910907
" was zero-padded after writing", (ui32)PDiskId, (ui32)chunkIdx);
911908
}
909+
LWTRACK(PDiskChunkWriteLastPieceSendToDevice, evChunkWrite->Orbit, PDiskId, evChunkWrite->Owner, chunkIdx,
910+
pieceShift, pieceSize);
911+
912912
auto traceId = evChunkWrite->SpanStack.GetTraceId();
913913
evChunkWrite->Completion->Orbit = std::move(evChunkWrite->Orbit);
914914
writer.Flush(evChunkWrite->ReqId, &traceId, evChunkWrite->Completion.Release());
@@ -951,7 +951,7 @@ void TPDisk::SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringSt
951951
}
952952

953953
TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector,
954-
ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId) {
954+
ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) {
955955
if (read->IsReplied) {
956956
return ReadPieceResultOk;
957957
}
@@ -1020,6 +1020,8 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &
10201020
THolder<TCompletionChunkReadPart> completion(new TCompletionChunkReadPart(this, read, bytesToRead,
10211021
payloadBytesToRead, payloadOffset, read->FinalCompletion, isTheLastPart, std::move(span)));
10221022
completion->CostNs = DriveModel.TimeForSizeNs(bytesToRead, read->ChunkIdx, TDriveModel::OP_TYPE_READ);
1023+
LWTRACK(PDiskChunkReadPiecesSendToDevice, orbit, PDiskId);
1024+
completion->Orbit = std::move(orbit);
10231025
Y_ABORT_UNLESS(bytesToRead <= completion->GetBuffer()->Size());
10241026
ui8 *data = completion->GetBuffer()->Data();
10251027
BlockDevice->PreadAsync(data, bytesToRead, readOffset, completion.Release(),
@@ -2289,7 +2291,6 @@ void TPDisk::ProcessChunkReadQueue() {
22892291
bool isComplete = false;
22902292
ui8 priorityClass = read->PriorityClass;
22912293
NHPTimer::STime creationTime = read->CreationTime;
2292-
LWTRACK(PDiskChunkReadPiecesSendToDevice, read->Orbit, PDiskId);
22932294
if (!read->IsReplied) {
22942295
LOG_DEBUG_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << (ui32)PDiskId
22952296
<< " ReqId# " << reqId
@@ -2301,7 +2302,7 @@ void TPDisk::ProcessChunkReadQueue() {
23012302
ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit - size);
23022303
ui64 reallyReadDiskBytes;
23032304
EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector + size / Format.SectorSize,
2304-
currentLimit, &reallyReadDiskBytes, piece->SpanStack.GetTraceId());
2305+
currentLimit, &reallyReadDiskBytes, piece->SpanStack.GetTraceId(), std::move(piece->Orbit));
23052306
isComplete = (result != ReadPieceResultInProgress);
23062307
// Read pieces is sliced previously and it is expected that ChunkReadPiece will read exactly
23072308
// currentLimit bytes
@@ -2977,7 +2978,6 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
29772978

29782979
auto result = std::make_unique<TEvChunkWriteResult>(NKikimrProto::OK, ev.ChunkIdx, ev.Cookie,
29792980
GetStatusFlags(ev.Owner, ev.OwnerGroupType), TString());
2980-
result->Orbit = std::move(ev.Orbit);
29812981

29822982
++state.OperationsInProgress;
29832983
++ownerData.InFlight->ChunkWrites;
@@ -3219,7 +3219,7 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
32193219
TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize, std::move(span));
32203220
piece->EstimateCost(DriveModel);
32213221
AddJobToForseti(cbs, piece, request->JobKind);
3222-
LWTRACK(PDiskAddWritePieceToScheduler, request->Orbit, PDiskId, request->ReqId.Id,
3222+
LWTRACK(PDiskChunkWriteAddToScheduler, request->Orbit, PDiskId, request->ReqId.Id,
32233223
HPSecondsFloat(HPNow() - request->CreationTime), request->Owner, request->IsFast,
32243224
request->PriorityClass, whole->TotalSize);
32253225
} else if (request->GetType() == ERequestType::RequestChunkRead) {
@@ -3240,7 +3240,8 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
32403240
// Schedule small job.
32413241
auto piece = new TChunkReadPiece(read, idx * smallJobSize,
32423242
smallJobSize * Format.SectorSize, false, std::move(span));
3243-
LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, idx, idx * smallJobSize * Format.SectorSize,
3243+
read->Orbit.Fork(piece->Orbit);
3244+
LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PDiskId, idx, idx * smallJobSize * Format.SectorSize,
32443245
smallJobSize * Format.SectorSize);
32453246
piece->EstimateCost(DriveModel);
32463247
piece->SelfPointer = piece;
@@ -3251,7 +3252,8 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
32513252
span.Attribute("is_last_piece", true);
32523253
auto piece = new TChunkReadPiece(read, smallJobCount * smallJobSize,
32533254
largeJobSize * Format.SectorSize, true, std::move(span));
3254-
LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, smallJobCount,
3255+
read->Orbit.Fork(piece->Orbit);
3256+
LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PDiskId, smallJobCount,
32553257
smallJobCount * smallJobSize * Format.SectorSize, largeJobSize * Format.SectorSize);
32563258
piece->EstimateCost(DriveModel);
32573259
piece->SelfPointer = piece;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ class TPDisk : public IPDisk {
291291
void SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringStream& errorReason,
292292
NKikimrProto::EReplyStatus status);
293293
EChunkReadPieceResult ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit,
294-
ui64 *reallyReadBytes, NWilson::TTraceId traceId);
294+
ui64 *reallyReadBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit);
295295
void SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJObSize, ui32 *outSmallJobCount);
296296
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
297297
// Chunk locking

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,31 @@ class TLogFlushCompletionAction : public TCompletionAction {
1919
: EndChunkIdx(endChunkIdx)
2020
, EndSectorIdx(endSectorIdx)
2121
, CommonLogger(commonLogger)
22-
, CompletionLogWrite(completionLogWrite) { }
22+
, CompletionLogWrite(completionLogWrite)
23+
{
24+
Orbit = std::move(completionLogWrite->Orbit);
25+
}
26+
27+
void SetUpCompletionLogWrite() {
28+
CompletionLogWrite->SubmitTime = SubmitTime;
29+
CompletionLogWrite->GetTime = GetTime;
30+
CompletionLogWrite->SetResult(Result);
31+
CompletionLogWrite->SetErrorReason(ErrorReason);
32+
CompletionLogWrite->Orbit = std::move(Orbit);
33+
}
2334

2435
void Exec(TActorSystem *actorSystem) override {
2536
CommonLogger->FirstUncommitted = TFirstUncommitted(EndChunkIdx, EndSectorIdx);
2637

27-
CompletionLogWrite->SetResult(Result);
28-
CompletionLogWrite->SetErrorReason(ErrorReason);
38+
SetUpCompletionLogWrite();
2939
CompletionLogWrite->Exec(actorSystem);
3040

3141
delete this;
3242
}
3343

3444
void Release(TActorSystem *actorSystem) override {
35-
CompletionLogWrite->SetResult(Result);
36-
CompletionLogWrite->SetErrorReason(ErrorReason);
45+
SetUpCompletionLogWrite();
3746
CompletionLogWrite->Release(actorSystem);
38-
3947
delete this;
4048
}
4149
};

0 commit comments

Comments
 (0)