Skip to content

Commit 8e44a97

Browse files
authored
Change activation queue to variant of queues (#8078)
1 parent ea5d31f commit 8e44a97

File tree

4 files changed

+22
-21
lines changed

4 files changed

+22
-21
lines changed

ydb/library/actors/core/executor_pool_base.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,20 @@ namespace NActors {
6565
}
6666
#endif
6767

68-
TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity)
68+
TExecutorPoolBase::TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue)
6969
: TExecutorPoolBaseMailboxed(poolId)
7070
, PoolThreads(threads)
7171
, ThreadsAffinity(affinity)
72-
#ifdef RING_ACTIVATION_QUEUE
73-
, Activations(threads == 1)
74-
#endif
75-
{}
72+
{
73+
if (useRingQueue) {
74+
Activations.emplace<TRingActivationQueue>(threads == 1);
75+
} else {
76+
Activations.emplace<TUnorderedCacheActivationQueue>();
77+
}
78+
}
7679

7780
TExecutorPoolBase::~TExecutorPoolBase() {
78-
while (Activations.Pop(0))
81+
while (std::visit([](auto &x){return x.Pop(0);}, Activations))
7982
;
8083
}
8184

ydb/library/actors/core/executor_pool_base.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,16 @@ namespace NActors {
4747

4848
class TExecutorPoolBase: public TExecutorPoolBaseMailboxed {
4949
protected:
50-
51-
#ifdef RING_ACTIVATION_QUEUE
52-
using TActivationQueue = TRingActivationQueue;
53-
#else
54-
using TActivationQueue = TUnorderedCache<ui32, 512, 4>;
55-
#endif
50+
using TUnorderedCacheActivationQueue = TUnorderedCache<ui32, 512, 4>;
5651

5752
const i16 PoolThreads;
5853
TIntrusivePtr<TAffinity> ThreadsAffinity;
5954
TAtomic Semaphore = 0;
60-
TActivationQueue Activations;
55+
std::variant<TUnorderedCacheActivationQueue, TRingActivationQueue> Activations;
6156
TAtomic ActivationsRevolvingCounter = 0;
6257
std::atomic_bool StopFlag = false;
6358
public:
64-
TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity);
59+
TExecutorPoolBase(ui32 poolId, ui32 threads, TAffinity* affinity, bool useRingQueue);
6560
~TExecutorPoolBase();
6661
void ScheduleActivation(ui32 activation) override;
6762
void SpecificScheduleActivation(ui32 activation) override;

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ namespace NActors {
8282
}
8383

8484
TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer, TExecutorPoolJail *jail)
85-
: TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity))
85+
: TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity), false)
8686
, DefaultSpinThresholdCycles(cfg.SpinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
8787
, SpinThresholdCycles(DefaultSpinThresholdCycles)
8888
, SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[cfg.Threads])
@@ -235,7 +235,7 @@ namespace NActors {
235235
}
236236
} else {
237237
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
238-
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
238+
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) {
239239
if (workerId >= 0) {
240240
Threads[workerId].SetWork();
241241
} else {
@@ -308,8 +308,9 @@ namespace NActors {
308308

309309
void TBasicExecutorPool::ScheduleActivationExCommon(ui32 activation, ui64 revolvingCounter, TAtomic x) {
310310
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
311-
312-
Activations.Push(activation, revolvingCounter);
311+
std::visit([activation, revolvingCounter](auto &x) {
312+
x.Push(activation, revolvingCounter);
313+
}, Activations);
313314
bool needToWakeUp = false;
314315
bool needToChangeOldSemaphore = true;
315316

ydb/library/actors/core/executor_pool_io.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace NActors {
99
TIOExecutorPool::TIOExecutorPool(ui32 poolId, ui32 threads, const TString& poolName, TAffinity* affinity)
10-
: TExecutorPoolBase(poolId, threads, affinity)
10+
: TExecutorPoolBase(poolId, threads, affinity, false)
1111
, Threads(new TExecutorThreadCtx[threads])
1212
, PoolName(poolName)
1313
{}
@@ -53,7 +53,7 @@ namespace NActors {
5353
}
5454

5555
while (!StopFlag.load(std::memory_order_acquire)) {
56-
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
56+
if (const ui32 activation = std::visit([&revolvingCounter](auto &x){return x.Pop(++revolvingCounter);}, Activations)) {
5757
return activation;
5858
}
5959
SpinLockPause();
@@ -86,7 +86,9 @@ namespace NActors {
8686
}
8787

8888
void TIOExecutorPool::ScheduleActivationEx(ui32 activation, ui64 revolvingWriteCounter) {
89-
Activations.Push(activation, revolvingWriteCounter);
89+
std::visit([activation, revolvingWriteCounter](auto &x) {
90+
x.Push(activation, revolvingWriteCounter);
91+
}, Activations);
9092
const TAtomic x = AtomicIncrement(Semaphore);
9193
if (x <= 0) {
9294
for (;; ++revolvingWriteCounter) {

0 commit comments

Comments
 (0)