Skip to content

Commit 9918d3c

Browse files
va-kuznecovVlad Kuznecov
andauthored
Refactor chunk reads and writes (#8893)
Co-authored-by: Vlad Kuznecov <va-kuznecov@nebius.com>
1 parent a3bbcb9 commit 9918d3c

File tree

3 files changed

+76
-115
lines changed

3 files changed

+76
-115
lines changed

ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,8 @@ class TRealBlockDevice : public IBlockDevice {
438438
Device.Mon.DeviceWriteDuration.Increment(duration);
439439
LWPROBE(PDiskDeviceWriteDuration, Device.GetPDiskId(), duration, opSize);
440440
}
441+
P_LOG(PRI_TRACE, BPD01, "iop is done", (Type, op->GetType()), (Duration, duration),
442+
(Offset, op->GetOffset()), (Size, opSize));
441443
if (completionAction->FlushAction) {
442444
ui64 idx = completionAction->FlushAction->OperationIdx;
443445
Y_ABORT_UNLESS(WaitingNoops[idx % MaxWaitingNoops] == nullptr);

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 73 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ TPDisk::TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig
6060
0, 64 * 1024 * 1024);
6161
ForsetiMaxLogBatchNs = TControlWrapper((PDiskCategory.IsSolidState() ? 50'000ll : 500'000ll), 0, 100'000'000ll);
6262
ForsetiMaxLogBatchNsCached = ForsetiMaxLogBatchNs;
63-
ForsetiOpPieceSizeSsd = TControlWrapper(64 * 1024, 1, 64 * 1024 * 1024);
64-
ForsetiOpPieceSizeRot = TControlWrapper(2 * 1024 * 1024, 1, 64 * 1024 * 1024);
63+
ForsetiOpPieceSizeSsd = TControlWrapper(64 * 1024, 1, 512 * 1024);
64+
ForsetiOpPieceSizeRot = TControlWrapper(512 * 1024, 1, 512 * 1024);
6565
ForsetiOpPieceSizeCached = PDiskCategory.IsSolidState() ? ForsetiOpPieceSizeSsd : ForsetiOpPieceSizeRot;
6666

6767
if (Cfg->SectorMap) {
@@ -809,10 +809,8 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi
809809
<< " evChunkWrite->TotalSize# " << evChunkWrite->TotalSize);
810810

811811
ui32 chunkIdx = evChunkWrite->ChunkIdx;
812-
813812
Y_ABORT_UNLESS(chunkIdx != 0);
814813

815-
816814
ui64 desiredSectorIdx = 0;
817815
ui64 sectorOffset = 0;
818816
ui64 lastSectorIdx;
@@ -937,7 +935,7 @@ void TPDisk::SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringSt
937935
}
938936

939937
TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector,
940-
ui64 pieceSizeLimit, ui64 *reallyReadDiskBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) {
938+
ui64 pieceSizeLimit, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit) {
941939
if (read->IsReplied) {
942940
return ReadPieceResultOk;
943941
}
@@ -950,13 +948,12 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &
950948
sectorsToRead = pieceSizeLimit / Format.SectorSize;
951949
bytesToRead = sectorsToRead * Format.SectorSize;
952950
}
951+
Y_VERIFY_S(bytesToRead == pieceSizeLimit, bytesToRead << " " << pieceSizeLimit);
952+
Y_VERIFY_S(sectorsToRead == pieceSizeLimit / Format.SectorSize, sectorsToRead << " " << pieceSizeLimit);
953+
Y_VERIFY_S(pieceSizeLimit % Format.SectorSize == 0, pieceSizeLimit);
953954

954955
Y_ABORT_UNLESS(sectorsToRead);
955956

956-
if (reallyReadDiskBytes) {
957-
*reallyReadDiskBytes = bytesToRead;
958-
}
959-
960957
ui64 firstSector;
961958
ui64 lastSector;
962959
ui64 sectorOffset;
@@ -2229,9 +2226,14 @@ void TPDisk::ProcessChunkWriteQueue() {
22292226
case ERequestType::RequestChunkWritePiece:
22302227
{
22312228
TChunkWritePiece *piece = static_cast<TChunkWritePiece*>(req);
2232-
if (ChunkWritePiece(piece->ChunkWrite.Get(), piece->PieceShift, piece->PieceSize)) {
2233-
Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass,
2234-
piece->ChunkWrite->LifeDurationMs(now));
2229+
P_LOG(PRI_NOTICE, BPD01, "ChunkWritePiece",
2230+
(ChunkIdx, piece->ChunkWrite->ChunkIdx),
2231+
(Offset, piece->PieceShift),
2232+
(Size, piece->PieceSize)
2233+
);
2234+
bool lastPart = ChunkWritePiece(piece->ChunkWrite.Get(), piece->PieceShift, piece->PieceSize);
2235+
if (lastPart) {
2236+
Mon.IncrementQueueTime(piece->ChunkWrite->PriorityClass, piece->ChunkWrite->LifeDurationMs(now));
22352237
}
22362238
delete piece;
22372239
break;
@@ -2250,49 +2252,38 @@ void TPDisk::ProcessChunkReadQueue() {
22502252
ui64 bufferSize = BufferPool->GetBufferSize() / Format.SectorSize * Format.SectorSize;
22512253

22522254
for (auto& req : JointChunkReads) {
2253-
22542255
req->SpanStack.PopOk();
22552256
req->SpanStack.Push(TWilson::PDiskDetailed, "PDisk.InBlockDevice", NWilson::EFlags::AUTO_END);
2256-
switch (req->GetType()) {
2257-
case ERequestType::RequestChunkReadPiece:
2258-
{
2259-
TChunkReadPiece *piece = static_cast<TChunkReadPiece*>(req.Get());
2260-
Y_ABORT_UNLESS(!piece->SelfPointer);
2261-
TIntrusivePtr<TChunkRead> &read = piece->ChunkRead;
2262-
TReqId reqId = read->ReqId;
2263-
ui32 chunkIdx = read->ChunkIdx;
2264-
bool isComplete = false;
2265-
ui8 priorityClass = read->PriorityClass;
2266-
NHPTimer::STime creationTime = read->CreationTime;
2267-
if (!read->IsReplied) {
2268-
P_LOG(PRI_DEBUG, BPD36, "Performing TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx));
2269-
2270-
ui32 size = 0;
2271-
while (!isComplete && size < piece->PieceSizeLimit) {
2272-
ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit - size);
2273-
ui64 reallyReadDiskBytes;
2274-
EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector + size / Format.SectorSize,
2275-
currentLimit, &reallyReadDiskBytes, piece->SpanStack.GetTraceId(), std::move(piece->Orbit));
2276-
isComplete = (result != ReadPieceResultInProgress);
2277-
// Read pieces is sliced previously and it is expected that ChunkReadPiece will read exactly
2278-
// currentLimit bytes
2279-
Y_VERIFY_S(reallyReadDiskBytes == currentLimit, reallyReadDiskBytes << " != " << currentLimit);
2280-
size += currentLimit;
2281-
}
2282-
}
2283-
piece->OnSuccessfulDestroy(PCtx->ActorSystem);
2284-
if (isComplete) {
2285-
//
2286-
// WARNING: Don't access "read" after this point.
2287-
// Don't add code before the warning!
2288-
//
2289-
Mon.IncrementQueueTime(priorityClass, HPMilliSeconds(now - creationTime));
2290-
P_LOG(PRI_DEBUG, BPD37, "enqueued all TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx));
2291-
}
2292-
break;
2293-
}
2294-
default:
2295-
Y_FAIL_S("Unexpected request type# " << ui64(req->GetType()) << " in JointChunkReads");
2257+
2258+
Y_VERIFY_S(req->GetType() == ERequestType::RequestChunkReadPiece, "Unexpected request type# " << ui64(req->GetType()) << " in JointChunkReads");
2259+
TChunkReadPiece *piece = static_cast<TChunkReadPiece*>(req.Get());
2260+
Y_ABORT_UNLESS(!piece->SelfPointer);
2261+
TIntrusivePtr<TChunkRead> &read = piece->ChunkRead;
2262+
TReqId reqId = read->ReqId;
2263+
ui32 chunkIdx = read->ChunkIdx;
2264+
ui8 priorityClass = read->PriorityClass;
2265+
NHPTimer::STime creationTime = read->CreationTime;
2266+
Y_VERIFY(!read->IsReplied);
2267+
P_LOG(PRI_NOTICE, BPD36, "Performing TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx),
2268+
(PieceCurrentSector, piece->PieceCurrentSector),
2269+
(PieceSizeLimit, piece->PieceSizeLimit),
2270+
(IsTheLastPiece, piece->IsTheLastPiece),
2271+
(BufferSize, bufferSize)
2272+
);
2273+
2274+
ui64 currentLimit = Min(bufferSize, piece->PieceSizeLimit);
2275+
EChunkReadPieceResult result = ChunkReadPiece(read, piece->PieceCurrentSector, currentLimit,
2276+
piece->SpanStack.GetTraceId(), std::move(piece->Orbit));
2277+
bool isComplete = (result != ReadPieceResultInProgress);
2278+
Y_VERIFY_S(isComplete || currentLimit >= piece->PieceSizeLimit, isComplete << " " << currentLimit << " " << piece->PieceSizeLimit);
2279+
piece->OnSuccessfulDestroy(PCtx->ActorSystem);
2280+
if (isComplete) {
2281+
//
2282+
// WARNING: Don't access "read" after this point.
2283+
// Don't add code before the warning!
2284+
//
2285+
Mon.IncrementQueueTime(priorityClass, HPMilliSeconds(now - creationTime));
2286+
P_LOG(PRI_NOTICE, BPD37, "enqueued all TChunkReadPiece", (ReqId, reqId), (chunkIdx, chunkIdx));
22962287
}
22972288
}
22982289
LWTRACK(PDiskProcessChunkReadQueue, UpdateCycleOrbit, PCtx->PDiskId, JointChunkReads.size());
@@ -3150,65 +3141,46 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
31503141
if (!isAdded) {
31513142
if (request->GetType() == ERequestType::RequestChunkWrite) {
31523143
TIntrusivePtr<TChunkWrite> whole(static_cast<TChunkWrite*>(request));
3153-
ui32 smallJobSize = 0;
3154-
ui32 smallJobCount = 0;
3155-
ui32 largeJobSize = 0;
3156-
SplitChunkJobSize(whole->TotalSize, &smallJobSize, &largeJobSize, &smallJobCount);
3157-
for (ui32 idx = 0; idx < smallJobCount; ++idx) {
3158-
// Schedule small job.
3144+
3145+
const ui32 jobSizeLimit = ui64(ForsetiOpPieceSizeCached) * Format.SectorPayloadSize() / Format.SectorSize;
3146+
const ui32 jobCount = (whole->TotalSize + jobSizeLimit - 1) / jobSizeLimit;
3147+
3148+
ui32 remainingSize = whole->TotalSize;
3149+
for (ui32 idx = 0; idx < jobCount; ++idx) {
31593150
auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END);
31603151
span.Attribute("small_job_idx", idx)
3161-
.Attribute("is_last_piece", false);
3162-
TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * smallJobSize, smallJobSize, std::move(span));
3152+
.Attribute("is_last_piece", idx == jobCount - 1);
3153+
ui32 jobSize = Min(remainingSize, jobSizeLimit);
3154+
TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * jobSizeLimit, jobSize, std::move(span));
31633155
piece->EstimateCost(DriveModel);
31643156
AddJobToForseti(cbs, piece, request->JobKind);
3157+
remainingSize -= jobSize;
31653158
}
3166-
// Schedule large job (there always is one)
3167-
auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END);
3168-
span.Attribute("is_last_piece", true);
3169-
TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize, std::move(span));
3170-
piece->EstimateCost(DriveModel);
3171-
AddJobToForseti(cbs, piece, request->JobKind);
3172-
LWTRACK(PDiskChunkWriteAddToScheduler, request->Orbit, PCtx->PDiskId, request->ReqId.Id,
3173-
HPSecondsFloat(HPNow() - request->CreationTime), request->Owner, request->IsFast,
3174-
request->PriorityClass, whole->TotalSize);
3159+
Y_VERIFY_S(remainingSize == 0, remainingSize);
31753160
} else if (request->GetType() == ERequestType::RequestChunkRead) {
31763161
TIntrusivePtr<TChunkRead> read = std::move(static_cast<TChunkRead*>(request)->SelfPointer);
31773162
ui32 totalSectors = read->LastSector - read->FirstSector + 1;
31783163

3179-
ui32 smallJobSize = (ForsetiOpPieceSizeCached + Format.SectorSize - 1) / Format.SectorSize;
3180-
ui32 smallJobCount = totalSectors / smallJobSize;
3181-
if (smallJobCount) {
3182-
smallJobCount--;
3183-
}
3184-
ui32 largeJobSize = totalSectors - smallJobSize * smallJobCount;
3185-
3186-
for (ui32 idx = 0; idx < smallJobCount; ++idx) {
3164+
Y_DEBUG_ABORT_UNLESS(ForsetiOpPieceSizeCached % Format.SectorSize == 0);
3165+
const ui32 jobSizeLimit = ForsetiOpPieceSizeCached / Format.SectorSize;
3166+
const ui32 jobCount = (totalSectors + jobSizeLimit - 1) / jobSizeLimit;
3167+
for (ui32 idx = 0; idx < jobCount; ++idx) {
31873168
auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkReadPiece", NWilson::EFlags::AUTO_END);
3169+
bool isLast = idx == jobCount - 1;
31883170
span.Attribute("small_job_idx", idx)
3189-
.Attribute("is_last_piece", false);
3190-
// Schedule small job.
3191-
auto piece = new TChunkReadPiece(read, idx * smallJobSize,
3192-
smallJobSize * Format.SectorSize, false, std::move(span));
3171+
.Attribute("is_last_piece", isLast);
3172+
3173+
ui32 jobSize = Min(totalSectors, jobSizeLimit);
3174+
auto piece = new TChunkReadPiece(read, idx * jobSizeLimit, jobSize * Format.SectorSize, isLast, std::move(span));
31933175
read->Orbit.Fork(piece->Orbit);
3194-
LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, idx, idx * smallJobSize * Format.SectorSize,
3195-
smallJobSize * Format.SectorSize);
3176+
LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, idx, idx * jobSizeLimit * Format.SectorSize,
3177+
jobSizeLimit * Format.SectorSize);
31963178
piece->EstimateCost(DriveModel);
31973179
piece->SelfPointer = piece;
31983180
AddJobToForseti(cbs, piece, request->JobKind);
3181+
totalSectors -= jobSize;
31993182
}
3200-
// Schedule large job (there always is one)
3201-
auto span = request->SpanStack.CreateChild(TWilson::PDiskBasic, "PDisk.ChunkReadPiece");
3202-
span.Attribute("is_last_piece", true);
3203-
auto piece = new TChunkReadPiece(read, smallJobCount * smallJobSize,
3204-
largeJobSize * Format.SectorSize, true, std::move(span));
3205-
read->Orbit.Fork(piece->Orbit);
3206-
LWTRACK(PDiskChunkReadPieceAddToScheduler, piece->Orbit, PCtx->PDiskId, smallJobCount,
3207-
smallJobCount * smallJobSize * Format.SectorSize, largeJobSize * Format.SectorSize);
3208-
piece->EstimateCost(DriveModel);
3209-
piece->SelfPointer = piece;
3210-
AddJobToForseti(cbs, piece, request->JobKind);
3211-
3183+
Y_VERIFY_S(totalSectors == 0, totalSectors);
32123184
} else {
32133185
AddJobToForseti(cbs, request, request->JobKind);
32143186
}
@@ -3221,17 +3193,6 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) {
32213193
}
32223194
}
32233195

3224-
// Always produces a large job and sometimes produces some small jobs and a large job.
3225-
void TPDisk::SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJobSize, ui32 *outSmallJobCount) {
3226-
const ui64 sectorPayloadSize = Format.SectorPayloadSize();
3227-
*outSmallJobSize = (ForsetiOpPieceSizeCached + sectorPayloadSize - 1) / sectorPayloadSize * sectorPayloadSize;
3228-
*outSmallJobCount = totalSize / *outSmallJobSize;
3229-
if (*outSmallJobCount) {
3230-
(*outSmallJobCount)--;
3231-
}
3232-
*outLargeJobSize = totalSize - *outSmallJobSize * *outSmallJobCount;
3233-
}
3234-
32353196
void TPDisk::AddJobToForseti(NSchLab::TCbs *cbs, TRequestBase *request, NSchLab::EJobKind jobKind) {
32363197
LWTRACK(PDiskAddToScheduler, request->Orbit, PCtx->PDiskId, request->ReqId.Id, HPSecondsFloat(request->CreationTime),
32373198
request->Owner, request->IsFast, request->PriorityClass);
@@ -3458,18 +3419,17 @@ void TPDisk::Update() {
34583419
Mon.UpdateDurationTracker.UpdateStarted();
34593420
LWTRACK(PDiskUpdateStarted, UpdateCycleOrbit, PCtx->PDiskId);
34603421

3461-
// ui32 userSectorSize = 0;
3462-
3463-
// Make input queue empty
34643422
{
34653423
TGuard<TMutex> guard(StateMutex);
34663424
ForsetiMaxLogBatchNsCached = ForsetiMaxLogBatchNs;
34673425
ForsetiOpPieceSizeCached = PDiskCategory.IsSolidState() ? ForsetiOpPieceSizeSsd : ForsetiOpPieceSizeRot;
3468-
EnqueueAll();
3469-
/*userSectorSize = */Format.SectorPayloadSize();
3470-
3426+
ForsetiOpPieceSizeCached = Min<i64>(ForsetiOpPieceSizeCached, Cfg->BufferPoolBufferSizeBytes);
3427+
ForsetiOpPieceSizeCached = AlignDown<i64>(ForsetiOpPieceSizeCached, Format.SectorSize);
34713428
// Switch the scheduler when possible
34723429
ForsetiScheduler.SetIsBinLogEnabled(EnableForsetiBinLog);
3430+
3431+
// Make input queue empty
3432+
EnqueueAll();
34733433
}
34743434

34753435
// Make token injection to correct drive model underestimations and avoid disk underutilization

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,7 @@ class TPDisk : public IPDisk {
294294
void SendChunkReadError(const TIntrusivePtr<TChunkRead>& read, TStringStream& errorReason,
295295
NKikimrProto::EReplyStatus status);
296296
EChunkReadPieceResult ChunkReadPiece(TIntrusivePtr<TChunkRead> &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit,
297-
ui64 *reallyReadBytes, NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit);
298-
void SplitChunkJobSize(ui32 totalSize, ui32 *outSmallJobSize, ui32 *outLargeJObSize, ui32 *outSmallJobCount);
297+
NWilson::TTraceId traceId, NLWTrace::TOrbit&& orbit);
299298
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
300299
// Chunk locking
301300
TVector<TChunkIdx> LockChunksForOwner(TOwner owner, const ui32 count, TString &errorReason);

0 commit comments

Comments
 (0)