Skip to content

Commit f57bdce

Browse files
committed
Add ICB-configurable parameters for accelerates (#7534)
1 parent 8e1e06b commit f57bdce

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+1065
-308
lines changed

ydb/core/base/blobstorage.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,8 @@ struct TEvBlobStorage {
492492
EvInplacePatch,
493493
EvAssimilate,
494494

495+
EvGetQueuesInfo, // for debugging purposes
496+
495497
//
496498
EvPutResult = EvPut + 512, /// 268 632 576
497499
EvGetResult,
@@ -506,6 +508,8 @@ struct TEvBlobStorage {
506508
EvInplacePatchResult,
507509
EvAssimilateResult,
508510

511+
EvQueuesInfo, // for debugging purposes
512+
509513
// proxy <-> vdisk interface
510514
EvVPut = EvPut + 2 * 512, /// 268 633 088
511515
EvVGet,
@@ -873,6 +877,7 @@ struct TEvBlobStorage {
873877
EvRunActor = EvPut + 15 * 512,
874878
EvVMockCtlRequest,
875879
EvVMockCtlResponse,
880+
EvDelayedMessageWrapper,
876881

877882
// incremental huge blob keeper
878883
EvIncrHugeInit = EvPut + 17 * 512,

ydb/core/blobstorage/backpressure/common.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,39 @@
1515
#define QLOG_DEBUG_S(marker, arg) QLOG_LOG_S(marker, NActors::NLog::PRI_DEBUG , arg)
1616

1717
LWTRACE_USING(BLOBSTORAGE_PROVIDER);
18+
19+
namespace NKikimr::NBsQueue {
20+
21+
// Special timer for debug purposes, which works with virtual time of TTestActorSystem
22+
struct TActivationContextTimer {
23+
TActivationContextTimer()
24+
: CreationTimestamp(NActors::TActivationContext::Monotonic())
25+
{}
26+
27+
double Passed() const {
28+
return (NActors::TActivationContext::Monotonic() - CreationTimestamp).SecondsFloat();
29+
}
30+
31+
TMonotonic CreationTimestamp;
32+
};
33+
34+
struct TBSQueueTimer {
35+
TBSQueueTimer(bool useActorSystemTime)
36+
{
37+
if (useActorSystemTime) {
38+
Timer.emplace<TActivationContextTimer>();
39+
} else {
40+
Timer.emplace<THPTimer>();
41+
}
42+
}
43+
44+
std::variant<THPTimer, TActivationContextTimer> Timer;
45+
46+
double Passed() const {
47+
return std::visit([](const auto& timer) -> double {
48+
return timer.Passed();
49+
}, Timer);
50+
}
51+
};
52+
53+
} // namespace NKikimr::NBsQueue

ydb/core/blobstorage/backpressure/event.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ IEventBase *TEventHolder::MakeErrorReply(NKikimrProto::EReplyStatus status, cons
2727

2828
void TEventHolder::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId,
2929
ui64 sequenceId, bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
30-
const THPTimer& processingTimer) {
30+
const TBSQueueTimer& processingTimer) {
3131
// check that we are not discarded yet
3232
Y_ABORT_UNLESS(Type != 0);
3333

ydb/core/blobstorage/backpressure/event.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class TEventHolder {
142142

143143
void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId, ui64 sequenceId,
144144
bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
145-
const THPTimer& processingTimer);
145+
const TBSQueueTimer& processingTimer);
146146

147147
void Discard();
148148
};

ydb/core/blobstorage/backpressure/queue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ namespace NKikimr::NBsQueue {
44

55
TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
66
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
7-
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility)
7+
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility, bool useActorSystemTime)
88
: Queues(bspctx)
99
, WindowSize(0)
1010
, InFlightCost(0)
@@ -16,6 +16,7 @@ TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamic
1616
, ClientId(clientId)
1717
, BytesWaiting(0)
1818
, InterconnectChannel(interconnectChannel)
19+
, UseActorSystemTime(useActorSystemTime)
1920
// use parent group visibility
2021
, QueueWaitingItems(counters->GetCounter("QueueWaitingItems", false, visibility))
2122
, QueueWaitingBytes(counters->GetCounter("QueueWaitingBytes", false, visibility))

ydb/core/blobstorage/backpressure/queue.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,16 @@ class TBlobStorageQueue {
5151
const ui64 QueueCookie;
5252
ui64 Cost;
5353
bool DirtyCost;
54-
THPTimer ProcessingTimer;
54+
TBSQueueTimer ProcessingTimer;
55+
5556
TTrackableList<TItem>::iterator Iterator;
5657

5758
template<typename TEvent>
5859
TItem(TAutoPtr<TEventHandle<TEvent>>& event, TInstant deadline,
5960
const ::NMonitoring::TDynamicCounters::TCounterPtr& serItems,
6061
const ::NMonitoring::TDynamicCounters::TCounterPtr& serBytes,
6162
const TBSProxyContextPtr& bspctx, ui32 interconnectChannel,
62-
bool local)
63+
bool local, bool useActorSystemTime)
6364
: Queue(EItemQueue::NotSet)
6465
, CostEssence(*event->Get())
6566
, Span(TWilson::VDiskTopLevel, std::move(event->TraceId), "Backpressure.InFlight")
@@ -70,6 +71,7 @@ class TBlobStorageQueue {
7071
, QueueCookie(RandomNumber<ui64>())
7172
, Cost(0)
7273
, DirtyCost(true)
74+
, ProcessingTimer(useActorSystemTime)
7375
{
7476
if (Span) {
7577
Span
@@ -129,6 +131,8 @@ class TBlobStorageQueue {
129131

130132
const ui32 InterconnectChannel;
131133

134+
const bool UseActorSystemTime;
135+
132136
public:
133137
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingItems;
134138
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingBytes;
@@ -156,7 +160,8 @@ class TBlobStorageQueue {
156160
TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
157161
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
158162
const TBlobStorageGroupType &gType,
159-
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public);
163+
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public,
164+
bool useActorSystemTime = false);
160165

161166
~TBlobStorageQueue();
162167

@@ -213,7 +218,8 @@ class TBlobStorageQueue {
213218
TItemList::iterator newIt;
214219
if (Queues.Unused.empty()) {
215220
newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline,
216-
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local);
221+
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local,
222+
UseActorSystemTime);
217223
++*QueueSize;
218224
} else {
219225
newIt = Queues.Unused.begin();
@@ -222,7 +228,7 @@ class TBlobStorageQueue {
222228
TItem& item = *newIt;
223229
item.~TItem();
224230
new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx,
225-
InterconnectChannel, local);
231+
InterconnectChannel, local, UseActorSystemTime);
226232
}
227233

228234
newIt->Iterator = newIt;

ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
7777
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
7878
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
7979
ui32 interconnectChannel, bool /*local*/, TDuration watchdogTimeout,
80-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility)
80+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
81+
bool useActorSystemTime)
8182
: BSProxyCtx(bspctx)
8283
, QueueName(queueName)
8384
, Counters(counters->GetSubgroup("queue", queueName))
8485
, Queue(Counters, LogPrefix, bspctx, clientId, interconnectChannel,
85-
(info ? info->Type : TErasureType::ErasureNone), visibility)
86+
(info ? info->Type : TErasureType::ErasureNone), visibility, useActorSystemTime)
8687
, VDiskIdShort(vdiskId)
8788
, QueueId(queueId)
8889
, QueueWatchdogTimeout(watchdogTimeout)
@@ -975,9 +976,10 @@ IActor* CreateVDiskBackpressureClient(const TIntrusivePtr<TBlobStorageGroupInfo>
975976
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
976977
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
977978
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
978-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility) {
979+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
980+
bool useActorSystemTime) {
979981
return new NBsQueue::TVDiskBackpressureClientActor(info, vdiskId, queueId, counters, bspctx, clientId, queueName,
980-
interconnectChannel, local, watchdogTimeout, flowRecord, visibility);
982+
interconnectChannel, local, watchdogTimeout, flowRecord, visibility, useActorSystemTime);
981983
}
982984

983985
} // NKikimr

ydb/core/blobstorage/backpressure/queue_backpressure_client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ namespace NKikimr {
5050
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
5151
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
5252
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
53-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility);
53+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
54+
bool useActorSystemTime = false);
5455

5556
} // NKikimr

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ const ui32 MaskSizeBits = 32;
5555
constexpr bool DefaultEnablePutBatching = true;
5656
constexpr bool DefaultEnableVPatch = false;
5757

58+
constexpr float DefaultSlowDiskThreshold = 2;
59+
constexpr float DefaultPredictedDelayMultiplier = 1;
60+
5861
constexpr bool WithMovingPatchRequestToStaticNode = true;
5962

6063
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -168,6 +171,11 @@ inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TE
168171
}
169172
}
170173

174+
struct TAccelerationParams {
175+
double SlowDiskThreshold = 2;
176+
double PredictedDelayMultiplier = 1;
177+
};
178+
171179
template<typename TDerived>
172180
class TBlobStorageGroupRequestActor : public TActor<TDerived> {
173181
public:
@@ -696,6 +704,7 @@ struct TBlobStorageGroupPutParameters {
696704
bool TimeStatsEnabled;
697705
TDiskResponsivenessTracker::TPerDiskStatsPtr Stats;
698706
bool EnableRequestMod3x3ForMinLatency;
707+
TAccelerationParams AccelerationParams;
699708
};
700709
IActor* CreateBlobStorageGroupPutRequest(TBlobStorageGroupPutParameters params);
701710

@@ -713,6 +722,7 @@ struct TBlobStorageGroupMultiPutParameters {
713722
NKikimrBlobStorage::EPutHandleClass HandleClass;
714723
TEvBlobStorage::TEvPut::ETactic Tactic;
715724
bool EnableRequestMod3x3ForMinLatency;
725+
TAccelerationParams AccelerationParams;
716726

717727
static ui32 CalculateRestartCounter(TBatchedVec<TEvBlobStorage::TEvPut::TPtr>& events) {
718728
ui32 maxRestarts = 0;
@@ -732,6 +742,7 @@ struct TBlobStorageGroupGetParameters {
732742
.Activity = NKikimrServices::TActivity::BS_PROXY_GET_ACTOR,
733743
};
734744
TNodeLayoutInfoPtr NodeLayout;
745+
TAccelerationParams AccelerationParams;
735746
};
736747
IActor* CreateBlobStorageGroupGetRequest(TBlobStorageGroupGetParameters params);
737748

@@ -833,12 +844,20 @@ IActor* CreateBlobStorageGroupAssimilateRequest(TBlobStorageGroupAssimilateParam
833844

834845
IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon);
835846

847+
struct TBlobStorageProxyParameters {
848+
bool UseActorSystemTimeInBSQueue = false;
849+
850+
const TControlWrapper& EnablePutBatching;
851+
const TControlWrapper& EnableVPatch;
852+
const TControlWrapper& SlowDiskThreshold;
853+
const TControlWrapper& PredictedDelayMultiplier;
854+
};
855+
836856
IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,
837857
bool forceWaitAllDrives, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
838-
TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters, const TControlWrapper &enablePutBatching,
839-
const TControlWrapper &enableVPatch);
858+
TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters, const TBlobStorageProxyParameters& params);
840859

841860
IActor* CreateBlobStorageGroupProxyUnconfigured(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
842-
const TControlWrapper &enablePutBatching, const TControlWrapper &enableVPatch);
861+
const TBlobStorageProxyParameters& params);
843862

844863
}//NKikimr

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,17 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu
178178
}
179179

180180
void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
181-
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
181+
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
182+
double multiplier) const {
182183
outNWorst->resize(Disks.size());
183184
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
184-
(*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), diskIdx };
185+
(*outNWorst)[diskIdx] = {
186+
static_cast<ui64>(GetPredictedDelayNs(info, groupQueues, diskIdx, queueId) * multiplier),
187+
diskIdx
188+
};
185189
}
186-
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, (ui32)Disks.size()), outNWorst->end());
190+
ui32 sortedPrefixSize = std::min(3u, (ui32)Disks.size());
191+
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
187192
}
188193

189194
bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
@@ -361,7 +366,8 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
361366
}
362367

363368
EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
364-
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
369+
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
370+
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
365371
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
366372
auto& blob = it->second;
367373
if (!std::exchange(blob.IsChanged, false)) {
@@ -373,7 +379,7 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
373379
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
374380
TString errorReason;
375381
for (IStrategy *strategy : s) {
376-
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
382+
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests, accelerationParams)) {
377383
case EStrategyOutcome::IN_PROGRESS:
378384
status = NKikimrProto::UNKNOWN;
379385
break;
@@ -415,8 +421,9 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
415421
}
416422

417423
EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
418-
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
419-
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, finished, expired);
424+
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
425+
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
426+
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, accelerationParams, finished, expired);
420427
}
421428

422429
TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
@@ -458,13 +465,17 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde
458465
}
459466

460467
void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
461-
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
468+
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
469+
double multiplier) const {
462470
ui32 totalVDisks = info.GetTotalVDisksNum();
463471
outNWorst->resize(totalVDisks);
464472
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
465-
(*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber };
473+
(*outNWorst)[orderNumber] = {
474+
static_cast<ui64>(groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId) * multiplier),
475+
orderNumber
476+
};
466477
}
467-
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, totalVDisks), outNWorst->end());
478+
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(3u, totalVDisks), outNWorst->end());
468479
}
469480

470481
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {

0 commit comments

Comments
 (0)