Skip to content

Commit 2992756

Browse files
authored
Add local queue configs (#14802)
1 parent 4a69e43 commit 2992756

File tree

10 files changed

+57
-26
lines changed

10 files changed

+57
-26
lines changed

ydb/core/driver_lib/run/auto_config_initializer.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,13 @@ namespace NKikimr::NAutoConfigInitializer {
328328
} else {
329329
executor->SetSpinThreshold(1);
330330
}
331+
332+
if (config->HasMinLocalQueueSize()) {
333+
executor->SetMinLocalQueueSize(config->GetMinLocalQueueSize());
334+
}
335+
if (config->HasMaxLocalQueueSize()) {
336+
executor->SetMaxLocalQueueSize(config->GetMaxLocalQueueSize());
337+
}
331338
}
332339
}
333340

ydb/core/driver_lib/run/config_helpers.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,14 @@ void AddExecutorPool(NActors::TCpuManagerConfig& cpuManager, const NKikimrConfig
8686
basic.MaxThreadCount = poolConfig.GetMaxThreads();
8787
basic.DefaultThreadCount = poolConfig.GetThreads();
8888
basic.Priority = poolConfig.GetPriority();
89+
if (poolConfig.HasMinLocalQueueSize()) {
90+
basic.MinLocalQueueSize = poolConfig.GetMinLocalQueueSize();
91+
}
92+
if (poolConfig.HasMaxLocalQueueSize()) {
93+
basic.MaxLocalQueueSize = poolConfig.GetMaxLocalQueueSize();
94+
}
8995
cpuManager.Basic.emplace_back(std::move(basic));
96+
9097
break;
9198
}
9299

ydb/core/protos/config.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ message TActorSystemConfig {
8686
optional int32 MaxAvgPingDeviation = 17;
8787

8888
optional bool HasSharedThread = 18;
89+
optional uint32 MaxLocalQueueSize = 20;
90+
optional uint32 MinLocalQueueSize = 21;
8991
}
9092

9193
message TScheduler {
@@ -125,6 +127,9 @@ message TActorSystemConfig {
125127

126128
optional bool MonitorStuckActors = 15;
127129
optional EActorSystemProfile ActorSystemProfile = 16;
130+
131+
optional uint32 MinLocalQueueSize = 20;
132+
optional uint32 MaxLocalQueueSize = 21;
128133
}
129134

130135
message TStaticNameserviceConfig {

ydb/library/actors/core/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ namespace NActors {
3232
EASProfile ActorSystemProfile = EASProfile::Default;
3333
bool HasSharedThread = false;
3434
bool UseRingQueue = false;
35+
ui16 MinLocalQueueSize = 0;
36+
ui16 MaxLocalQueueSize = 0;
3537
};
3638

3739
struct TSharedExecutorPoolConfig {

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ namespace NActors {
113113
, Harmonizer(harmonizer)
114114
, SoftProcessingDurationTs(cfg.SoftProcessingDurationTs)
115115
, HasOwnSharedThread(cfg.HasSharedThread)
116+
, MaxLocalQueueSize(cfg.MaxLocalQueueSize)
117+
, MinLocalQueueSize(cfg.MinLocalQueueSize)
116118
, Priority(cfg.Priority)
117119
, Jail(jail)
118120
, ActorSystemProfile(cfg.ActorSystemProfile)
@@ -124,13 +126,9 @@ namespace NActors {
124126
threads = threads - 1;
125127
}
126128

127-
if constexpr (NFeatures::IsLocalQueues()) {
129+
if (MaxLocalQueueSize) {
128130
LocalQueues.Reset(new NThreading::TPadded<std::queue<ui32>>[threads]);
129-
if constexpr (NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE) {
130-
LocalQueueSize = *NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE;
131-
} else {
132-
LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
133-
}
131+
LocalQueueSize = MinLocalQueueSize;
134132
}
135133
if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
136134
for (ui32 idx = 0; idx < threads; ++idx) {
@@ -284,7 +282,7 @@ namespace NActors {
284282
}
285283

286284
TMailbox* TBasicExecutorPool::GetReadyActivation(ui64 revolvingCounter) {
287-
if constexpr (NFeatures::IsLocalQueues()) {
285+
if (MaxLocalQueueSize) {
288286
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "local queue");
289287
return GetReadyActivationLocalQueue(revolvingCounter);
290288
} else {
@@ -389,7 +387,7 @@ namespace NActors {
389387
}
390388

391389
void TBasicExecutorPool::ScheduleActivationEx(TMailbox* mailbox, ui64 revolvingCounter) {
392-
if constexpr (NFeatures::IsLocalQueues()) {
390+
if (MaxLocalQueueSize) {
393391
ScheduleActivationExLocalQueue(mailbox, revolvingCounter);
394392
} else {
395393
ScheduleActivationExCommon(mailbox, revolvingCounter, AtomicGet(Semaphore));
@@ -629,9 +627,8 @@ namespace NActors {
629627
}
630628

631629
void TBasicExecutorPool::SetLocalQueueSize(ui16 size) {
632-
if constexpr (!NFeatures::TLocalQueuesFeatureFlags::FIXED_LOCAL_QUEUE_SIZE) {
633-
LocalQueueSize.store(std::max(size, NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE), std::memory_order_relaxed);
634-
}
630+
size = std::min(size, MaxLocalQueueSize);
631+
LocalQueueSize.store(size, std::memory_order_relaxed);
635632
}
636633

637634
void TBasicExecutorPool::Initialize() {
@@ -753,4 +750,16 @@ namespace NActors {
753750
return EventsPerMailboxValue;
754751
}
755752

753+
ui16 TBasicExecutorPool::GetLocalQueueSize() const {
754+
return LocalQueueSize.load(std::memory_order_relaxed);
755+
}
756+
757+
ui16 TBasicExecutorPool::GetMaxLocalQueueSize() const {
758+
return MaxLocalQueueSize;
759+
}
760+
761+
ui16 TBasicExecutorPool::GetMinLocalQueueSize() const {
762+
return MinLocalQueueSize;
763+
}
764+
756765
}

ydb/library/actors/core/executor_pool_basic.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ namespace NActors {
174174
IHarmonizer *Harmonizer;
175175
ui64 SoftProcessingDurationTs = 0;
176176
bool HasOwnSharedThread = false;
177+
ui16 MaxLocalQueueSize = 0;
178+
ui16 MinLocalQueueSize = 0;
177179

178180
const i16 Priority = 0;
179181
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
@@ -243,7 +245,9 @@ namespace NActors {
243245
void ScheduleActivationExLocalQueue(TMailbox* mailbox, ui64 revolvingWriteCounter);
244246

245247
void SetLocalQueueSize(ui16 size);
246-
248+
ui16 GetLocalQueueSize() const;
249+
ui16 GetMaxLocalQueueSize() const;
250+
ui16 GetMinLocalQueueSize() const;
247251
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
248252
void Start() override;
249253
void PrepareStop() override;

ydb/library/actors/core/executor_pool_basic_feature_flags.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@ namespace NActors::NFeatures {
2121
struct TLocalQueuesFeatureFlags {
2222
static constexpr EActorSystemOptimizationType OptimizationType = EActorSystemOptimizationType::LocalQueues;
2323

24-
static constexpr ui16 MIN_LOCAL_QUEUE_SIZE = 0;
25-
static constexpr ui16 MAX_LOCAL_QUEUE_SIZE = 16;
26-
static constexpr std::optional<ui16> FIXED_LOCAL_QUEUE_SIZE = std::nullopt;
27-
2824
static constexpr bool UseIfAllOtherThreadsAreSleeping = false;
2925
static constexpr bool UseOnMicroburst = false;
3026
};
@@ -43,8 +39,4 @@ namespace NActors::NFeatures {
4339
return TFeatureFlags::OptimizationType == EActorSystemOptimizationType::Common;
4440
}
4541

46-
consteval bool IsLocalQueues() {
47-
return TFeatureFlags::OptimizationType == EActorSystemOptimizationType::LocalQueues;
48-
}
49-
5042
}

ydb/library/actors/core/harmonizer/harmonizer.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,11 @@ void THarmonizer::ProcessNeedyState() {
179179
ProcessingBudget -= 1.0;
180180
LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
181181
}
182-
if constexpr (NFeatures::IsLocalQueues()) {
182+
if (pool.MaxLocalQueueSize) {
183183
bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount;
184184
needToExpandLocalQueue &= (bool)pool.BasicPool;
185185
needToExpandLocalQueue &= (pool.MaxFullThreadCount > 1);
186-
needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
186+
needToExpandLocalQueue &= (pool.LocalQueueSize < pool.MaxLocalQueueSize);
187187
if (needToExpandLocalQueue) {
188188
pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
189189
}
@@ -258,8 +258,8 @@ void THarmonizer::ProcessHoggishState() {
258258
if (threadCount == pool.MinFullThreadCount && Shared && SharedInfo.ForeignThreadsAllowed[hoggishPoolIdx] != 0) {
259259
Shared->SetForeignThreadSlots(hoggishPoolIdx, 0);
260260
}
261-
if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
262-
pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
261+
if (pool.BasicPool && pool.LocalQueueSize > pool.MinLocalQueueSize) {
262+
pool.LocalQueueSize = std::min<ui16>(pool.MinLocalQueueSize, pool.LocalQueueSize / 2);
263263
pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
264264
}
265265
HARMONIZER_DEBUG_PRINT("poolIdx", hoggishPoolIdx, "threadCount", threadCount, "pool.MinFullThreadCount", pool.MinFullThreadCount, "freeCpu", freeCpu);
@@ -389,6 +389,9 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
389389
if (poolInfo.BasicPool) {
390390
poolInfo.WaitingStats.reset(new TWaitingStats<ui64>());
391391
poolInfo.MovingWaitingStats.reset(new TWaitingStats<double>());
392+
poolInfo.MinLocalQueueSize = poolInfo.BasicPool->GetMinLocalQueueSize();
393+
poolInfo.MaxLocalQueueSize = poolInfo.BasicPool->GetMaxLocalQueueSize();
394+
poolInfo.LocalQueueSize = poolInfo.MinLocalQueueSize;
392395
}
393396
PriorityOrder.clear();
394397
}

ydb/library/actors/core/harmonizer/pool.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace NActors {
1212
LWTRACE_USING(ACTORLIB_PROVIDER);
1313

1414
TPoolInfo::TPoolInfo()
15-
: LocalQueueSize(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE)
1615
{}
1716

1817
double TPoolInfo::GetCpu(i16 threadIdx) const {

ydb/library/actors/core/harmonizer/pool.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,17 @@ struct TPoolInfo {
3535
float MinThreadCount = 0;
3636
float MaxThreadCount = 0;
3737

38+
ui16 LocalQueueSize;
39+
ui16 MaxLocalQueueSize = 0;
40+
ui16 MinLocalQueueSize = 0;
41+
3842
i16 Priority = 0;
3943
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
4044
NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
4145
ui32 MaxAvgPingUs = 0;
4246
ui64 LastUpdateTs = 0;
4347
ui64 NotEnoughCpuExecutions = 0;
4448
ui64 NewNotEnoughCpuExecutions = 0;
45-
ui16 LocalQueueSize;
4649

4750
std::atomic<float> SharedCpuQuota = 0;
4851
std::atomic<i64> LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish

0 commit comments

Comments
 (0)