Skip to content

Commit 1bb608a

Browse files
authored
merge 24-3: add ring queue config (#8780)
1 parent a1a4409 commit 1bb608a

File tree

8 files changed

+30
-24
lines changed

8 files changed

+30
-24
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ void AddExecutorPool(
303303
TBasicExecutorPoolConfig basic;
304304
basic.PoolId = poolId;
305305
basic.PoolName = poolConfig.GetName();
306+
basic.UseRingQueue = systemConfig.HasUseRingQueue() && systemConfig.GetUseRingQueue();
306307
if (poolConfig.HasMaxAvgPingDeviation()) {
307308
auto poolGroup = counters->GetSubgroup("execpool", basic.PoolName);
308309
auto &poolInfo = cpuManager.PingInfoByPool[poolId];

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ message TActorSystemConfig {
118118
optional ENodeType NodeType = 14 [default = COMPUTE];
119119
optional uint32 ForceIOPoolThreads = 17;
120120
optional bool UseSharedThreads = 18;
121+
optional bool UseRingQueue = 19;
121122

122123
optional bool MonitorStuckActors = 15;
123124
optional EActorSystemProfile ActorSystemProfile = 16;

ydb/library/actors/core/config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ namespace NActors {
3131
i16 SoftProcessingDurationTs = 0;
3232
EASProfile ActorSystemProfile = EASProfile::Default;
3333
bool HasSharedThread = false;
34+
bool UseRingQueue = false;
3435
};
3536

3637
struct TSharedExecutorPoolConfig {
@@ -47,6 +48,7 @@ namespace NActors {
4748
TString PoolName;
4849
ui32 Threads = 1;
4950
TCpuMask Affinity; // Executor thread affinity
51+
bool UseRingQueue = false;
5052
};
5153

5254
struct TSelfPingInfo {

ydb/library/actors/core/executor_pool_base.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,20 @@ namespace NActors {
6565
}
6666
#endif
6767

68-
TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity)
68+
TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue)
6969
: TExecutorPoolBaseMailboxed(poolId)
7070
, PoolThreads(threads)
7171
, ThreadsAffinity(affinity)
72-
#ifdef RING_ACTIVATION_QUEUE
73-
, Activations(threads == 1)
74-
#endif
75-
{}
72+
{
73+
if (useRingQueue) {
74+
Activations.emplace<TRingActivationQueue>(threads == 1);
75+
} else {
76+
Activations.emplace<TUnorderedCacheActivationQueue>();
77+
}
78+
}
7679

7780
TExecutorPoolBase::~TExecutorPoolBase() {
78-
while (Activations.Pop(0))
81+
while (std::visit([](auto &x){return x.Pop(0);}, Activations))
7982
;
8083
}
8184

ydb/library/actors/core/executor_pool_base.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,16 @@ namespace NActors {
4747

4848
class TExecutorPoolBase: public TExecutorPoolBaseMailboxed {
4949
protected:
50-
51-
#ifdef RING_ACTIVATION_QUEUE
52-
using TActivationQueue = TRingActivationQueue;
53-
#else
54-
using TActivationQueue = TUnorderedCache<ui32, 512, 4>;
55-
#endif
50+
using TUnorderedCacheActivationQueue = TUnorderedCache<ui32, 512, 4>;
5651

5752
const i16 PoolThreads;
5853
TIntrusivePtr<TAffinity> ThreadsAffinity;
5954
TAtomic Semaphore = 0;
60-
TActivationQueue Activations;
55+
std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations;
6156
TAtomic ActivationsRevolvingCounter = 0;
6257
std::atomic_bool StopFlag = false;
6358
public:
64-
TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity);
59+
TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue);
6560
~TExecutorPoolBase();
6661
void ScheduleActivation(ui32 activation) override;
6762
void SpecificScheduleActivation(ui32 activation) override;

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ namespace NActors {
8282
}
8383

8484
TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer, TExecutorPoolJail *jail)
85-
: TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity))
85+
: TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity), cfg.UseRingQueue)
8686
, DefaultSpinThresholdCycles(cfg.SpinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
8787
, SpinThresholdCycles(DefaultSpinThresholdCycles)
8888
, SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[cfg.Threads])
@@ -235,7 +235,7 @@ namespace NActors {
235235
}
236236
} else {
237237
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
238-
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
238+
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) {
239239
if (workerId >= 0) {
240240
Threads[workerId].SetWork();
241241
} else {
@@ -308,8 +308,9 @@ namespace NActors {
308308

309309
void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter, TAtomic x) {
310310
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
311-
312-
Activations.Push(activation, revolvingCounter);
311+
std::visit([activation, revolvingCounter](auto &x) {
312+
x.Push(activation, revolvingCounter);
313+
}, Activations);
313314
bool needToWakeUp = false;
314315
bool needToChangeOldSemaphore = true;
315316

ydb/library/actors/core/executor_pool_io.cpp

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
#include <ydb/library/actors/util/datetime.h>
77

88
namespace NActors {
9-
TIOExecutorPool::TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName, TAffinity* affinity)
10-
: TExecutorPoolBase(poolId, threads, affinity)
9+
TIOExecutorPool::TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName, TAffinity* affinity, bool useRingQueue)
10+
: TExecutorPoolBase(poolId, threads, affinity, useRingQueue)
1111
, Threads(new TExecutorThreadCtx[threads])
1212
, PoolName(poolName)
1313
{}
@@ -17,7 +17,8 @@ namespace NActors {
1717
cfg.PoolId,
1818
cfg.Threads,
1919
cfg.PoolName,
20-
new TAffinity(cfg.Affinity)
20+
new TAffinity(cfg.Affinity),
21+
cfg.UseRingQueue
2122
)
2223
{
2324
Harmonizer = harmonizer;
@@ -53,7 +54,7 @@ namespace NActors {
5354
}
5455

5556
while (!StopFlag.load(std::memory_order_acquire)) {
56-
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
57+
if (const ui32 activation = std::visit([&revolvingCounter](auto &x){return x.Pop(++revolvingCounter);}, Activations)) {
5758
return activation;
5859
}
5960
SpinLockPause();
@@ -86,7 +87,9 @@ namespace NActors {
8687
}
8788

8889
void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) {
89-
Activations.Push(activation, revolvingWriteCounter);
90+
std::visit([activation, revolvingWriteCounter](auto &x) {
91+
x.Push(activation, revolvingWriteCounter);
92+
}, Activations);
9093
const TAtomic x = AtomicIncrement(Semaphore);
9194
if (x <= 0) {
9295
for (;; ++revolvingWriteCounter) {

ydb/library/actors/core/executor_pool_io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ namespace NActors {
2626
const TString PoolName;
2727
const ui32 ActorSystemIndex = NActors::TActorTypeOperator::GetActorSystemIndex();
2828
public:
29-
TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr);
29+
TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName = "", TAffinity* affinity = nullptr, bool useRingQueue = false);
3030
explicit TIOExecutorPool(const TIOExecutorPoolConfig& cfg, IHarmonizer *harmonizer = nullptr);
3131
~TIOExecutorPool();
3232

0 commit comments

Comments
 (0)