Skip to content

Commit cffada5

Browse files
authored
Add unit test for shared executor pool (#16112)
1 parent a6070b7 commit cffada5

11 files changed

+918
-207
lines changed

ydb/library/actors/core/cpu_manager.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,22 @@ namespace NActors {
5656
if (sharedThreadCount) {
5757
sht = 1;
5858
}
59-
poolInfos.push_back(TPoolShortInfo{static_cast<i16>(Config.Basic[poolIds[i]].PoolId), sharedThreadCount, true, Config.Basic[poolIds[i]].PoolName});
59+
poolInfos.push_back(TPoolShortInfo{
60+
.PoolId = static_cast<i16>(Config.Basic[poolIds[i]].PoolId),
61+
.SharedThreadCount = sharedThreadCount,
62+
.ForeignSlots = 0,
63+
.InPriorityOrder = true,
64+
.PoolName = Config.Basic[poolIds[i]].PoolName
65+
});
6066
}
6167
for (ui32 i = 0; i < Config.IO.size(); ++i) {
62-
poolInfos.push_back(TPoolShortInfo{static_cast<i16>(Config.IO[i].PoolId), 0, false, Config.IO[i].PoolName});
68+
poolInfos.push_back(TPoolShortInfo{
69+
.PoolId = static_cast<i16>(Config.IO[i].PoolId),
70+
.SharedThreadCount = 0,
71+
.ForeignSlots = 0,
72+
.InPriorityOrder = false,
73+
.PoolName = Config.IO[i].PoolName
74+
});
6375
}
6476
Shared = std::make_unique<TSharedExecutorPool>(Config.Shared, poolInfos);
6577

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ namespace NActors {
248248
}
249249
} else {
250250
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
251-
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) {
251+
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(revolvingCounter++);}, Activations)) {
252252
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "activation found");
253253
Threads[workerId].SetWork();
254254
AtomicDecrement(Semaphore);
@@ -381,6 +381,7 @@ namespace NActors {
381381
x = AtomicIncrement(Semaphore);
382382
needToChangeOldSemaphore = false;
383383
semaphore = TSemaphore::GetSemaphore(x);
384+
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "Semaphore incremented to ", semaphore.OldSemaphore, " CurrentSleepThreadCount == ", semaphore.CurrentSleepThreadCount);
384385
} else {
385386
x = *initSemaphore;
386387
semaphore = TSemaphore::GetSemaphore(x);
@@ -796,7 +797,7 @@ namespace NActors {
796797
return nullptr;
797798
} else {
798799
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
799-
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(++revolvingCounter);}, Activations)) {
800+
if (const ui32 activation = std::visit([&revolvingCounter](auto &x) {return x.Pop(revolvingCounter++);}, Activations)) {
800801
SharedPool->Threads[workerId].SetWork();
801802
AtomicDecrement(Semaphore);
802803
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "activation == ", activation, " semaphore == ", semaphore.OldSemaphore);

ydb/library/actors/core/executor_pool_shared.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
namespace NActors {
3333

34-
TPoolManager::TPoolManager(const TVector<TPoolShortInfo> &poolInfos)
34+
TPoolManager::TPoolManager(const std::vector<TPoolShortInfo> &poolInfos)
3535
: PoolInfos(poolInfos)
3636
{
3737
PoolThreadRanges.resize(poolInfos.size());
@@ -53,7 +53,7 @@ namespace NActors {
5353

5454
TSharedExecutorPool::TSharedExecutorPool(
5555
const TSharedExecutorPoolConfig& cfg,
56-
const TVector<TPoolShortInfo> &poolInfos
56+
const std::vector<TPoolShortInfo> &poolInfos
5757
) : TExecutorPoolBaseMailboxed(0)
5858
, PoolThreads(SumThreads(poolInfos))
5959
, PoolManager(poolInfos)
@@ -80,9 +80,10 @@ namespace NActors {
8080
}
8181
}
8282
for (ui64 i = 0; i < PoolManager.PoolInfos.size(); ++i) {
83-
ForeignThreadsAllowedByPool[i].store(0, std::memory_order_release);
84-
ForeignThreadSlots[i].store(0, std::memory_order_release);
85-
LocalThreads[i].store(PoolManager.PoolInfos[i].SharedThreadCount, std::memory_order_release);
83+
auto &poolInfo = PoolManager.PoolInfos[i];
84+
ForeignThreadsAllowedByPool[i].store(poolInfo.ForeignSlots, std::memory_order_release);
85+
ForeignThreadSlots[i].store(poolInfo.ForeignSlots, std::memory_order_release);
86+
LocalThreads[i].store(poolInfo.SharedThreadCount, std::memory_order_release);
8687
LocalNotifications[i].store(0, std::memory_order_release);
8788
}
8889
Y_ABORT_UNLESS(passedThreads == static_cast<ui64>(PoolThreads), "Passed threads %" PRIu64 " != PoolThreads %" PRIu64, passedThreads, static_cast<ui64>(PoolThreads));
@@ -94,7 +95,7 @@ namespace NActors {
9495
Threads.Destroy();
9596
}
9697

97-
i16 TSharedExecutorPool::SumThreads(const TVector<TPoolShortInfo> &poolInfos) {
98+
i16 TSharedExecutorPool::SumThreads(const std::vector<TPoolShortInfo> &poolInfos) {
9899
i16 threadCount = 0;
99100
for (const auto &poolInfo : poolInfos) {
100101
threadCount += poolInfo.SharedThreadCount;
@@ -403,6 +404,14 @@ namespace NActors {
403404
return PoolThreads;
404405
}
405406

407+
i16 TSharedExecutorPool::GetMaxFullThreadCount() const {
408+
return PoolThreads;
409+
}
410+
411+
i16 TSharedExecutorPool::GetMinFullThreadCount() const {
412+
return PoolThreads;
413+
}
414+
406415
ui32 TSharedExecutorPool::GetThreads() const {
407416
return PoolThreads;
408417
}
@@ -479,6 +488,7 @@ namespace NActors {
479488
}
480489
}
481490
}
491+
EXECUTOR_POOL_SHARED_DEBUG(EDebugLevel::Activation, "no threads");
482492
return false;
483493
}
484494

ydb/library/actors/core/executor_pool_shared.h

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ namespace NActors {
2929
struct TPoolShortInfo {
3030
i16 PoolId = 0;
3131
i16 SharedThreadCount = 0;
32+
i16 ForeignSlots = 0;
3233
bool InPriorityOrder = false;
3334
TString PoolName;
3435
};
@@ -39,11 +40,11 @@ namespace NActors {
3940
};
4041

4142
struct TPoolManager {
42-
TStackVec<TPoolShortInfo, 8> PoolInfos;
43+
std::vector<TPoolShortInfo> PoolInfos;
4344
TStackVec<TPoolThreadRange, 8> PoolThreadRanges;
4445
TStackVec<i16, 8> PriorityOrder;
4546

46-
TPoolManager(const TVector<TPoolShortInfo> &poolInfos);
47+
TPoolManager(const std::vector<TPoolShortInfo> &poolInfos);
4748
};
4849

4950
class ISharedPool {
@@ -115,10 +116,10 @@ namespace NActors {
115116
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX;
116117
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX;
117118

118-
explicit TSharedExecutorPool(const TSharedExecutorPoolConfig& cfg, const TVector<TPoolShortInfo> &poolInfos);
119+
explicit TSharedExecutorPool(const TSharedExecutorPoolConfig& cfg, const std::vector<TPoolShortInfo> &poolInfos);
119120
~TSharedExecutorPool();
120121

121-
i16 SumThreads(const TVector<TPoolShortInfo> &poolInfos);
122+
i16 SumThreads(const std::vector<TPoolShortInfo> &poolInfos);
122123
void Initialize() override;
123124
i16 FindPoolForWorker(TSharedExecutorThreadCtx& thread, ui64 revolvingReadCounter);
124125
TMailbox* GetReadyActivation(ui64 revolvingReadCounter) override;
@@ -148,6 +149,8 @@ namespace NActors {
148149

149150
ui32 GetThreads() const override;
150151
float GetThreadCount() const override;
152+
i16 GetMaxFullThreadCount() const override;
153+
i16 GetMinFullThreadCount() const override;
151154
float GetDefaultThreadCount() const override;
152155
float GetMinThreadCount() const override;
153156
float GetMaxThreadCount() const override;
@@ -180,5 +183,9 @@ namespace NActors {
180183
TAffinity* Affinity() const override {
181184
return nullptr;
182185
}
186+
187+
const TPoolManager& GetPoolManager() const {
188+
return PoolManager;
189+
}
183190
};
184191
}

ydb/library/actors/core/ut/executor_pool_basic_ut.cpp

Lines changed: 0 additions & 188 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,6 @@
99

1010
using namespace NActors;
1111

12-
#define VALUES_EQUAL(a, b, ...) \
13-
UNIT_ASSERT_VALUES_EQUAL_C((a), (b), (i64)semaphore.OldSemaphore \
14-
<< ' ' << (i64)semaphore.CurrentSleepThreadCount \
15-
<< ' ' << (i64)semaphore.CurrentThreadCount __VA_ARGS__);
16-
1712
////////////////////////////////////////////////////////////////////////////////
1813

1914
struct TEvMsg : public NActors::TEventLocal<TEvMsg, 10347> {
@@ -90,191 +85,8 @@ THolder<TActorSystemSetup> GetActorSystemSetup(TBasicExecutorPool* pool)
9085
return setup;
9186
}
9287

93-
Y_UNIT_TEST_SUITE(WaitingBenchs) {
94-
95-
Y_UNIT_TEST(SpinPause) {
96-
const ui32 count = 1'000'000;
97-
ui64 startTs = GetCycleCountFast();
98-
for (ui32 idx = 0; idx < count; ++idx) {
99-
SpinLockPause();
100-
}
101-
ui64 stopTs = GetCycleCountFast();
102-
Cerr << Ts2Us(stopTs - startTs) / count << Endl;
103-
Cerr << double(stopTs - startTs) / count << Endl;
104-
}
105-
106-
struct TThread : public ISimpleThread {
107-
static const ui64 CyclesInMicroSecond;
108-
std::array<ui64, 128> Hist;
109-
ui64 WakingTime = 0;
110-
ui64 AwakeningTime = 0;
111-
ui64 SleepTime = 0;
112-
ui64 IterationCount = 0;
113-
114-
std::atomic<ui64> Awakens = 0;
115-
std::atomic<ui64> *OtherAwaken;
116-
117-
TThreadParkPad OwnPad;
118-
TThreadParkPad *OtherPad;
119-
120-
bool IsWaiting = false;
121-
122-
void GoToWait() {
123-
ui64 start = GetCycleCountFast();
124-
OwnPad.Park();
125-
ui64 elapsed = GetCycleCountFast() - start;
126-
AwakeningTime += elapsed;
127-
ui64 idx = std::min(Hist.size() - 1, (elapsed - 20 * CyclesInMicroSecond) / CyclesInMicroSecond);
128-
Hist[idx]++;
129-
Awakens++;
130-
}
131-
132-
void GoToWakeUp() {
133-
ui64 start = GetCycleCountFast();
134-
OtherPad->Unpark();
135-
ui64 elapsed = GetCycleCountFast() - start;
136-
WakingTime += elapsed;
137-
ui64 idx = std::min(Hist.size() - 1, elapsed / CyclesInMicroSecond);
138-
Hist[idx]++;
139-
}
140-
141-
void GoToSleep() {
142-
ui64 start = GetCycleCountFast();
143-
ui64 stop = start;
144-
while (stop - start < 20 * CyclesInMicroSecond) {
145-
SpinLockPause();
146-
stop = GetCycleCountFast();
147-
}
148-
SleepTime += stop - start;
149-
}
150-
151-
void* ThreadProc() {
152-
for (ui32 idx = 0; idx < IterationCount; ++idx) {
153-
if (IsWaiting) {
154-
GoToWait();
155-
} else {
156-
GoToSleep();
157-
GoToWakeUp();
158-
while(OtherAwaken->load() == idx) {
159-
SpinLockPause();
160-
}
161-
}
162-
}
163-
return nullptr;
164-
}
165-
};
166-
167-
const ui64 TThread::CyclesInMicroSecond = NHPTimer::GetCyclesPerSecond() * 0.000001;
168-
169-
Y_UNIT_TEST(WakingUpTest) {
170-
TThread a, b;
171-
constexpr ui64 iterations = 100'000;
172-
std::fill(a.Hist.begin(), a.Hist.end(), 0);
173-
std::fill(b.Hist.begin(), b.Hist.end(), 0);
174-
a.IterationCount = iterations;
175-
b.IterationCount = iterations;
176-
a.IsWaiting = true;
177-
b.IsWaiting = false;
178-
b.OtherAwaken = &a.Awakens;
179-
a.OtherPad = &b.OwnPad;
180-
b.OtherPad = &a.OwnPad;
181-
a.Start();
182-
b.Start();
183-
a.Join();
184-
b.Join();
185-
186-
ui64 awakeningTime = a.AwakeningTime + b.AwakeningTime - a.SleepTime - b.SleepTime;
187-
ui64 wakingUpTime = a.WakingTime + b.WakingTime;
188-
189-
Cerr << "AvgAwakeningCycles: " << double(awakeningTime) / iterations << Endl;
190-
Cerr << "AvgAwakeningUs: " << Ts2Us(awakeningTime) / iterations << Endl;
191-
Cerr << "AvgSleep20usCycles:" << double(b.SleepTime) / iterations << Endl;
192-
Cerr << "AvgSleep20usUs:" << Ts2Us(b.SleepTime) / iterations << Endl;
193-
Cerr << "AvgWakingUpCycles: " << double(wakingUpTime) / iterations << Endl;
194-
Cerr << "AvgWakingUpUs: " << Ts2Us(wakingUpTime) / iterations << Endl;
195-
196-
Cerr << "AwakeningHist:\n";
197-
for (ui32 idx = 0; idx < a.Hist.size(); ++idx) {
198-
if (a.Hist[idx]) {
199-
if (idx + 1 != a.Hist.size()) {
200-
Cerr << " [" << idx << "us - " << idx + 1 << "us] " << a.Hist[idx] << Endl;
201-
} else {
202-
Cerr << " [" << idx << "us - ...] " << a.Hist[idx] << Endl;
203-
}
204-
}
205-
}
206-
207-
Cerr << "WakingUpHist:\n";
208-
for (ui32 idx = 0; idx < b.Hist.size(); ++idx) {
209-
if (b.Hist[idx]) {
210-
if (idx + 1 != b.Hist.size()) {
211-
Cerr << " [" << idx << "us - " << idx + 1 << "us] " << b.Hist[idx] << Endl;
212-
} else {
213-
Cerr << " [" << idx << "us - ...] " << b.Hist[idx] << Endl;
214-
}
215-
}
216-
}
217-
}
218-
219-
}
220-
22188
Y_UNIT_TEST_SUITE(BasicExecutorPool) {
22289

223-
Y_UNIT_TEST(Semaphore) {
224-
TBasicExecutorPool::TSemaphore semaphore;
225-
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(0);
226-
227-
VALUES_EQUAL(0, semaphore.ConvertToI64());
228-
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(-1);
229-
VALUES_EQUAL(-1, semaphore.ConvertToI64());
230-
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(1);
231-
VALUES_EQUAL(1, semaphore.ConvertToI64());
232-
233-
for (i64 value = -1'000'000; value <= 1'000'000; ++value) {
234-
VALUES_EQUAL(TBasicExecutorPool::TSemaphore::GetSemaphore(value).ConvertToI64(), value);
235-
}
236-
237-
for (i8 sleepThreads = -10; sleepThreads <= 10; ++sleepThreads) {
238-
239-
semaphore = TBasicExecutorPool::TSemaphore();
240-
semaphore.CurrentSleepThreadCount = sleepThreads;
241-
i64 initialValue = semaphore.ConvertToI64();
242-
243-
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(initialValue - 1);
244-
VALUES_EQUAL(-1, semaphore.OldSemaphore);
245-
246-
i64 value = initialValue;
247-
value -= 100;
248-
for (i32 expected = -100; expected <= 100; ++expected) {
249-
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
250-
UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
251-
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
252-
<< ' ' << (i64)semaphore.CurrentThreadCount);
253-
UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
254-
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
255-
<< ' ' << (i64)semaphore.CurrentThreadCount);
256-
semaphore = TBasicExecutorPool::TSemaphore();
257-
semaphore.OldSemaphore = expected;
258-
semaphore.CurrentSleepThreadCount = sleepThreads;
259-
UNIT_ASSERT_VALUES_EQUAL(semaphore.ConvertToI64(), value);
260-
value++;
261-
}
262-
263-
for (i32 expected = 101; expected >= -101; --expected) {
264-
semaphore = TBasicExecutorPool::TSemaphore::GetSemaphore(value);
265-
UNIT_ASSERT_VALUES_EQUAL_C(expected, semaphore.OldSemaphore, (i64)semaphore.OldSemaphore
266-
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
267-
<< ' ' << (i64)semaphore.CurrentThreadCount);
268-
UNIT_ASSERT_VALUES_EQUAL_C(sleepThreads, semaphore.CurrentSleepThreadCount, (i64)semaphore.OldSemaphore
269-
<< ' ' << (i64)semaphore.CurrentSleepThreadCount
270-
<< ' ' << (i64)semaphore.CurrentThreadCount);
271-
value--;
272-
}
273-
}
274-
275-
//UNIT_ASSERT_VALUES_EQUAL_C(-1, TBasicExecutorPool::TSemaphore::GetSemaphore(value-1).OldSemaphore);
276-
}
277-
27890
Y_UNIT_TEST(CheckCompleteOne) {
27991
const size_t size = 4;
28092
const size_t msgCount = 1e4;

0 commit comments

Comments
 (0)