Skip to content

Commit e4f6ac9

Browse files
committed
Fix acceleration in mirror-3-dc groups (#7931)
1 parent f57bdce commit e4f6ac9

25 files changed

+304
-240
lines changed

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -55,8 +55,9 @@ const ui32 MaskSizeBits = 32;
5555
constexpr bool DefaultEnablePutBatching = true;
5656
constexpr bool DefaultEnableVPatch = false;
5757

58-
constexpr float DefaultSlowDiskThreshold = 2;
59-
constexpr float DefaultPredictedDelayMultiplier = 1;
58+
constexpr double DefaultSlowDiskThreshold = 2;
59+
constexpr double DefaultPredictedDelayMultiplier = 1;
60+
constexpr ui32 DefaultMaxNumOfSlowDisks = 2;
6061

6162
constexpr bool WithMovingPatchRequestToStaticNode = true;
6263

@@ -172,8 +173,9 @@ inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TE
172173
}
173174

174175
struct TAccelerationParams {
175-
double SlowDiskThreshold = 2;
176-
double PredictedDelayMultiplier = 1;
176+
double SlowDiskThreshold = DefaultSlowDiskThreshold;
177+
double PredictedDelayMultiplier = DefaultPredictedDelayMultiplier;
178+
ui32 MaxNumOfSlowDisks = DefaultMaxNumOfSlowDisks;
177179
};
178180

179181
template<typename TDerived>
@@ -851,6 +853,7 @@ struct TBlobStorageProxyParameters {
851853
const TControlWrapper& EnableVPatch;
852854
const TControlWrapper& SlowDiskThreshold;
853855
const TControlWrapper& PredictedDelayMultiplier;
856+
const TControlWrapper& MaxNumOfSlowDisks = TControlWrapper(DefaultMaxNumOfSlowDisks, 1, 2);
854857
};
855858

856859
IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp

Lines changed: 40 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -179,15 +179,16 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu
179179

180180
void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
181181
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
182-
double multiplier) const {
182+
const TAccelerationParams& accelerationParams) const {
183183
outNWorst->resize(Disks.size());
184184
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
185+
ui64 predictedDelayNs = GetPredictedDelayNs(info, groupQueues, diskIdx, queueId);
185186
(*outNWorst)[diskIdx] = {
186-
static_cast<ui64>(GetPredictedDelayNs(info, groupQueues, diskIdx, queueId) * multiplier),
187+
static_cast<ui64>(predictedDelayNs * accelerationParams.PredictedDelayMultiplier),
187188
diskIdx
188189
};
189190
}
190-
ui32 sortedPrefixSize = std::min(3u, (ui32)Disks.size());
191+
ui32 sortedPrefixSize = std::min(accelerationParams.MaxNumOfSlowDisks + 1, (ui32)Disks.size());
191192
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
192193
}
193194

@@ -466,16 +467,18 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde
466467

467468
void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
468469
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
469-
double multiplier) const {
470+
const TAccelerationParams& accelerationParams) const {
470471
ui32 totalVDisks = info.GetTotalVDisksNum();
471472
outNWorst->resize(totalVDisks);
472473
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
474+
ui64 predictedDelayNs = groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId);
473475
(*outNWorst)[orderNumber] = {
474-
static_cast<ui64>(groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId) * multiplier),
476+
static_cast<ui64>(predictedDelayNs * accelerationParams.PredictedDelayMultiplier),
475477
orderNumber
476478
};
477479
}
478-
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(3u, totalVDisks), outNWorst->end());
480+
ui32 sortedPrefixSize = std::min(accelerationParams.MaxNumOfSlowDisks + 1, totalVDisks);
481+
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
479482
}
480483

481484
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
@@ -542,4 +545,35 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
542545
}
543546
}
544547

548+
// Recomputes the IsSlow flag of every disk in `state`.
//
// All disks are first reset to "not slow". The delay predictions produced by
// TBlobState::GetWorstPredictedDelaysNs are then inspected: the entry at index
// MaxNumOfSlowDisks is the reference, and each of the MaxNumOfSlowDisks entries in
// front of it is marked slow when its prediction exceeds
// reference * SlowDiskThreshold.
//
// state              - blob state whose Disks[].IsSlow flags are rewritten
// isPut              - selects the queue to inspect: PutHandleClass when true,
//                      GetHandleClass otherwise
// accelerationParams - supplies MaxNumOfSlowDisks, SlowDiskThreshold and (through
//                      GetWorstPredictedDelaysNs) PredictedDelayMultiplier
void TBlackboard::MarkSlowDisks(TBlobState& state, bool isPut, const TAccelerationParams& accelerationParams) {
    // by default all disks are considered fast
    for (TBlobState::TDisk& disk : state.Disks) {
        disk.IsSlow = false;
    }

    const ui32 maxNumSlow = accelerationParams.MaxNumOfSlowDisks;
    if (Info->GetTotalVDisksNum() <= maxNumSlow) {
        // all disks cannot be slow
        return;
    }

    const auto queueId = isPut ? HandleClassToQueueId(PutHandleClass) : HandleClassToQueueId(GetHandleClass);

    TDiskDelayPredictions worstDisks;
    state.GetWorstPredictedDelaysNs(*Info, *GroupQueues, queueId, &worstDisks, accelerationParams);

    // GetWorstPredictedDelaysNs sizes its output to state.Disks.size(), which is not the
    // quantity checked above, so make sure the reference entry really exists before
    // indexing it.
    if (worstDisks.size() <= maxNumSlow) {
        return;
    }

    const ui64 slowThreshold = static_cast<ui64>(
        worstDisks[maxNumSlow].PredictedNs * accelerationParams.SlowDiskThreshold);
    if (slowThreshold == 0) {
        // invalid or non-initialized predicted ns, consider all disks not slow
        return;
    }

    for (ui32 idx = 0; idx < maxNumSlow; ++idx) {
        if (worstDisks[idx].PredictedNs > slowThreshold) {
            // DiskIdx is the position of the disk inside state.Disks
            const ui32 diskIdx = worstDisks[idx].DiskIdx;
            state.Disks[diskIdx].IsSlow = true;
        }
    }
}
578+
545579
}//NKikimr

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -98,10 +98,9 @@ struct TBlobState {
9898
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
9999
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
100100
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
101-
double multipler = 1) const;
101+
const TAccelerationParams& accelerationParams) const;
102102
TString ToString() const;
103103
bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const;
104-
105104
static TString SituationToString(ESituation situation);
106105
};
107106

@@ -165,7 +164,7 @@ class IStrategy {
165164

166165
struct TBlackboard {
167166
enum EAccelerationMode {
168-
AccelerationModeSkipOneSlowest,
167+
AccelerationModeSkipNSlowest,
169168
AccelerationModeSkipMarked
170169
};
171170

@@ -189,7 +188,7 @@ struct TBlackboard {
189188
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass)
190189
: Info(info)
191190
, GroupQueues(groupQueues)
192-
, AccelerationMode(AccelerationModeSkipOneSlowest)
191+
, AccelerationMode(AccelerationModeSkipNSlowest)
193192
, PutHandleClass(putHandleClass)
194193
, GetHandleClass(getHandleClass)
195194
{}
@@ -212,7 +211,7 @@ struct TBlackboard {
212211
void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status);
213212
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
214213
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
215-
double multiplier = 1) const;
214+
const TAccelerationParams& accelerationParams) const;
216215
TString ToString() const;
217216

218217
void ChangeAll() {
@@ -225,6 +224,8 @@ struct TBlackboard {
225224

226225
void RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx);
227226

227+
void MarkSlowDisks(TBlobState& state, bool isPut, const TAccelerationParams& accelerationParams);
228+
228229
TBlobState& operator [](const TLogoBlobID& id);
229230
};
230231

ydb/core/blobstorage/dsproxy/dsproxy_get.cpp

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -314,9 +314,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
314314
}
315315

316316
void TryScheduleGetAcceleration() {
317-
if (!IsGetAccelerateScheduled && GetsAccelerated < 2) {
318-
// Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate
319-
if (CountDisksWithActiveRequests() <= 2) {
317+
if (!IsGetAccelerateScheduled && GetsAccelerated < AccelerationParams.MaxNumOfSlowDisks) {
318+
// Count VDisks with requests in flight, if there are <= the maximum number of slow VDisks, Accelerate
319+
if (CountDisksWithActiveRequests() <= AccelerationParams.MaxNumOfSlowDisks) {
320320
ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx) / 1000;
321321
TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs);
322322
TInstant now = TActivationContext::Now();
@@ -333,9 +333,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
333333
}
334334

335335
void TrySchedulePutAcceleration() {
336-
if (!IsPutAccelerateScheduled && PutsAccelerated < 2) {
337-
// Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate
338-
if (CountDisksWithActiveRequests() <= 2) {
336+
if (!IsPutAccelerateScheduled && PutsAccelerated < AccelerationParams.MaxNumOfSlowDisks) {
337+
// Count VDisks with requests in flight, if there are <= the maximum number of slow VDisks, Accelerate
338+
if (CountDisksWithActiveRequests() <= AccelerationParams.MaxNumOfSlowDisks) {
339339
ui64 timeToAccelerateUs = GetImpl.GetTimeToAcceleratePutNs(LogCtx) / 1000;
340340
TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs);
341341
TInstant now = TActivationContext::Now();

ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -164,15 +164,13 @@ ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EV
164164
// Find the slowest disk
165165
TDiskDelayPredictions worstDisks;
166166
if (Blackboard.BlobStates.size() == 1) {
167-
Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs(
168-
*Info, *Blackboard.GroupQueues, queueId, &worstDisks,
169-
AccelerationParams.PredictedDelayMultiplier);
167+
Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues,
168+
queueId, &worstDisks, AccelerationParams);
170169
} else {
171-
Blackboard.GetWorstPredictedDelaysNs(
172-
*Info, *Blackboard.GroupQueues, queueId, &worstDisks,
173-
AccelerationParams.PredictedDelayMultiplier);
170+
Blackboard.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, queueId, &worstDisks,
171+
AccelerationParams);
174172
}
175-
return worstDisks[std::min(3u, (ui32)worstDisks.size() - 1)].PredictedNs;
173+
return worstDisks[std::min(AccelerationParams.MaxNumOfSlowDisks, (ui32)worstDisks.size() - 1)].PredictedNs;
176174
}
177175

178176
ui64 TGetImpl::GetTimeToAccelerateGetNs(TLogContext &logCtx) {

ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ namespace NKikimr {
1919
, EnableVPatch(params.EnableVPatch)
2020
, SlowDiskThreshold(params.SlowDiskThreshold)
2121
, PredictedDelayMultiplier(params.PredictedDelayMultiplier)
22+
, MaxNumOfSlowDisks(params.MaxNumOfSlowDisks)
2223
{}
2324

2425
TBlobStorageGroupProxy::TBlobStorageGroupProxy(ui32 groupId, bool isEjected,TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
@@ -32,6 +33,7 @@ namespace NKikimr {
3233
, EnableVPatch(params.EnableVPatch)
3334
, SlowDiskThreshold(params.SlowDiskThreshold)
3435
, PredictedDelayMultiplier(params.PredictedDelayMultiplier)
36+
, MaxNumOfSlowDisks(params.MaxNumOfSlowDisks)
3537
{}
3638

3739
IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon) {

ydb/core/blobstorage/dsproxy/dsproxy_impl.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -119,8 +119,10 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
119119
bool HasInvalidGroupId() const { return GroupId.GetRawId() == Max<ui32>(); }
120120
void ProcessInitQueue();
121121

122+
// Acceleration parameters
122123
TMemorizableControlWrapper SlowDiskThreshold;
123124
TMemorizableControlWrapper PredictedDelayMultiplier;
125+
TMemorizableControlWrapper MaxNumOfSlowDisks;
124126

125127
TAccelerationParams GetAccelerationParams();
126128

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -382,8 +382,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
382382
}
383383

384384
void TryToScheduleNextAcceleration() {
385-
if (!IsAccelerateScheduled && AccelerateRequestsSent < 2) {
386-
if (WaitingVDiskCount > 0 && WaitingVDiskCount <= 2 && RequestsSent > 1) {
385+
if (!IsAccelerateScheduled && AccelerateRequestsSent < AccelerationParams.MaxNumOfSlowDisks) {
386+
if (WaitingVDiskCount > 0 && WaitingVDiskCount <= AccelerationParams.MaxNumOfSlowDisks && RequestsSent > 1) {
387387
ui64 timeToAccelerateUs = Max<ui64>(1, PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000);
388388
if (RequestsPendingBeforeAcceleration == 1 && AccelerateRequestsSent == 1) {
389389
// if there is only one request pending, but first accelerate is unsuccessful, make a pause

ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -91,8 +91,8 @@ ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx) {
9191
// Find the n'th slowest disk
9292
TDiskDelayPredictions worstDisks;
9393
state.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, HandleClassToQueueId(Blackboard.PutHandleClass),
94-
&worstDisks, AccelerationParams.PredictedDelayMultiplier);
95-
nthWorstPredictedNsVec[idx++] = worstDisks[2].PredictedNs;
94+
&worstDisks, AccelerationParams);
95+
nthWorstPredictedNsVec[idx++] = worstDisks[AccelerationParams.MaxNumOfSlowDisks].PredictedNs;
9696
}
9797
return *MaxElement(nthWorstPredictedNsVec.begin(), nthWorstPredictedNsVec.end());
9898
}

ydb/core/blobstorage/dsproxy/dsproxy_state.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -327,6 +327,7 @@ namespace NKikimr {
327327
return TAccelerationParams{
328328
.SlowDiskThreshold = .001f * SlowDiskThreshold.Update(TActivationContext::Now()),
329329
.PredictedDelayMultiplier = .001f * PredictedDelayMultiplier.Update(TActivationContext::Now()),
330+
.MaxNumOfSlowDisks = (ui32)MaxNumOfSlowDisks.Update(TActivationContext::Now()),
330331
};
331332
}
332333

0 commit comments

Comments
 (0)