Skip to content

Commit f9e79db

Browse files
authored
Fix miscalculated budget in THarmonizer (#12422)
1 parent a15d870 commit f9e79db

33 files changed

+2644
-1014
lines changed

ydb/library/actors/core/cpu_manager.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ namespace NActors {
3737
poolsWithSharedThreads.push_back(cfg.PoolId);
3838
}
3939
}
40-
Shared.reset(new TSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
41-
auto sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
40+
Shared.reset(CreateSharedExecutorPool(Config.Shared, ExecutorPoolCount, poolsWithSharedThreads));
41+
auto sharedPool = static_cast<ISharedExecutorPool*>(Shared.get());
4242

4343
ui64 ts = GetCycleCountFast();
4444
Harmonizer.reset(MakeHarmonizer(ts));
@@ -135,11 +135,9 @@ namespace NActors {
135135
for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
136136
if (cfg.PoolId == poolId) {
137137
if (cfg.HasSharedThread) {
138-
auto *sharedPool = static_cast<TSharedExecutorPool*>(Shared.get());
138+
auto *sharedPool = Shared.get();
139139
auto *pool = new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());
140-
if (pool) {
141-
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
142-
}
140+
pool->AddSharedThread(sharedPool->GetSharedThread(poolId));
143141
return pool;
144142
} else {
145143
return new TBasicExecutorPool(cfg, Harmonizer.get(), Jail.get());

ydb/library/actors/core/cpu_manager.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22

33
#include "config.h"
44
#include "executor_pool_jail.h"
5-
#include "harmonizer.h"
65
#include "executor_pool.h"
76
#include "executor_pool_shared.h"
87
#include "mon_stats.h"
8+
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
99
#include <memory>
1010

1111
namespace NActors {
@@ -16,7 +16,7 @@ namespace NActors {
1616
const ui32 ExecutorPoolCount;
1717
TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
1818
std::unique_ptr<IHarmonizer> Harmonizer;
19-
std::unique_ptr<TSharedExecutorPool> Shared;
19+
std::unique_ptr<ISharedExecutorPool> Shared;
2020
std::unique_ptr<TExecutorPoolJail> Jail;
2121
TCpuManagerConfig Config;
2222

ydb/library/actors/core/executor_pool.h

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,16 @@ namespace NActors {
1414
struct TExecutorThreadStats;
1515
class TExecutorPoolJail;
1616
class ISchedulerCookie;
17+
struct TSharedExecutorThreadCtx;
1718

1819
struct TCpuConsumption {
19-
double ConsumedUs = 0;
20-
double BookedUs = 0;
20+
double CpuUs = 0;
21+
double ElapsedUs = 0;
2122
ui64 NotEnoughCpuExecutions = 0;
2223

2324
void Add(const TCpuConsumption& other) {
24-
ConsumedUs += other.ConsumedUs;
25-
BookedUs += other.BookedUs;
25+
CpuUs += other.CpuUs;
26+
ElapsedUs += other.ElapsedUs;
2627
NotEnoughCpuExecutions += other.NotEnoughCpuExecutions;
2728
}
2829
};
@@ -176,6 +177,16 @@ namespace NActors {
176177
return 1;
177178
}
178179

180+
virtual TSharedExecutorThreadCtx* ReleaseSharedThread() {
181+
return nullptr;
182+
}
183+
virtual void AddSharedThread(TSharedExecutorThreadCtx*) {
184+
}
185+
186+
virtual bool IsSharedThreadEnabled() const {
187+
return false;
188+
}
189+
179190
virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
180191
Y_UNUSED(threadIdx);
181192
return TCpuConsumption{0.0, 0.0};

ydb/library/actors/core/executor_pool_base.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "actorsystem.h"
2+
#include "activity_guard.h"
23
#include "actor.h"
34
#include "executor_pool_base.h"
45
#include "executor_pool_basic_feature_flags.h"

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,6 @@ namespace NActors {
407407
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
408408
poolStats.DecreasingThreadsByExchange = stats.DecreasingThreadsByExchange;
409409
poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
410-
poolStats.MaxConsumedCpuUs = stats.MaxConsumedCpu;
411-
poolStats.MinConsumedCpuUs = stats.MinConsumedCpu;
412-
poolStats.MaxBookedCpuUs = stats.MaxBookedCpu;
413-
poolStats.MinBookedCpuUs = stats.MinBookedCpu;
414410
}
415411

416412
statsCopy.resize(MaxFullThreadCount + 1);
@@ -429,7 +425,7 @@ namespace NActors {
429425
void TBasicExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
430426
if (Harmonizer) {
431427
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
432-
poolState.UsedCpu = stats.AvgConsumedCpu;
428+
poolState.ElapsedCpu = stats.AvgElapsedCpu;
433429
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
434430
} else {
435431
poolState.PossibleMaxLimit = poolState.MaxLimit;
@@ -625,7 +621,7 @@ namespace NActors {
625621
TExecutorThreadCtx& threadCtx = Threads[threadIdx];
626622
TExecutorThreadStats stats;
627623
threadCtx.Thread->GetCurrentStatsForHarmonizer(stats);
628-
return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs), stats.NotEnoughCpuExecutions};
624+
return {static_cast<double>(stats.CpuUs), Ts2Us(stats.SafeElapsedTicks), stats.NotEnoughCpuExecutions};
629625
}
630626

631627
i16 TBasicExecutorPool::GetBlockingThreadCount() const {

ydb/library/actors/core/executor_pool_basic.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
#include "executor_pool_basic_feature_flags.h"
88
#include "scheduler_queue.h"
99
#include "executor_pool_base.h"
10-
#include "harmonizer.h"
1110
#include <memory>
11+
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
1212
#include <ydb/library/actors/actor_type/indexes.h>
1313
#include <ydb/library/actors/util/unordered_cache.h>
1414
#include <ydb/library/actors/util/threadparkpad.h>
@@ -278,8 +278,11 @@ namespace NActors {
278278
void CalcSpinPerThread(ui64 wakingUpConsumption);
279279
void ClearWaitingStats() const;
280280

281-
TSharedExecutorThreadCtx* ReleaseSharedThread();
282-
void AddSharedThread(TSharedExecutorThreadCtx* thread);
281+
TSharedExecutorThreadCtx* ReleaseSharedThread() override;
282+
void AddSharedThread(TSharedExecutorThreadCtx* thread) override;
283+
bool IsSharedThreadEnabled() const override {
284+
return true;
285+
}
283286

284287
private:
285288
void AskToGoToSleep(bool *needToWait, bool *needToBlock);

ydb/library/actors/core/executor_pool_io.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ namespace NActors {
156156
void TIOExecutorPool::GetExecutorPoolState(TExecutorPoolState &poolState) const {
157157
if (Harmonizer) {
158158
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
159-
poolState.UsedCpu = stats.AvgConsumedCpu;
159+
poolState.ElapsedCpu = stats.AvgElapsedCpu;
160160
}
161161
poolState.CurrentLimit = PoolThreads;
162162
poolState.MaxLimit = PoolThreads;

ydb/library/actors/core/executor_pool_io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
#include "actorsystem.h"
44
#include "executor_thread.h"
55
#include "executor_thread_ctx.h"
6-
#include "harmonizer.h"
76
#include "scheduler_queue.h"
87
#include "executor_pool_base.h"
8+
#include <ydb/library/actors/core/harmonizer/harmonizer.h>
99
#include <ydb/library/actors/actor_type/indexes.h>
1010
#include <ydb/library/actors/util/ticket_lock.h>
1111
#include <ydb/library/actors/util/unordered_cache.h>

ydb/library/actors/core/executor_pool_shared.cpp

Lines changed: 121 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,50 @@
1212

1313
namespace NActors {
1414

15+
class TSharedExecutorPool: public ISharedExecutorPool {
16+
public:
17+
TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads);
18+
19+
// IThreadPool
20+
void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) override;
21+
void Start() override;
22+
void PrepareStop() override;
23+
void Shutdown() override;
24+
bool Cleanup() override;
25+
26+
TSharedExecutorThreadCtx *GetSharedThread(i16 poolId) override;
27+
void GetSharedStats(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
28+
void GetSharedStatsForHarmonizer(i16 pool, std::vector<TExecutorThreadStats>& statsCopy) override;
29+
TCpuConsumption GetThreadCpuConsumption(i16 poolId, i16 threadIdx) override;
30+
std::vector<TCpuConsumption> GetThreadsCpuConsumption(i16 poolId) override;
31+
32+
i16 ReturnOwnHalfThread(i16 pool) override;
33+
i16 ReturnBorrowedHalfThread(i16 pool) override;
34+
void GiveHalfThread(i16 from, i16 to) override;
35+
36+
i16 GetSharedThreadCount() const override;
37+
38+
TSharedPoolState GetState() const override;
39+
40+
void Init(const std::vector<IExecutorPool*>& pools, bool withThreads) override;
41+
42+
private:
43+
TSharedPoolState State;
44+
45+
std::vector<IExecutorPool*> Pools;
46+
47+
i16 PoolCount;
48+
i16 SharedThreadCount;
49+
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
50+
51+
std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
52+
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
53+
54+
TDuration TimePerMailbox;
55+
ui32 EventsPerMailbox;
56+
ui64 SoftProcessingDurationTs;
57+
}; // class TSharedExecutorPool
58+
1559
TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads)
1660
: State(poolCount, poolsWithThreads.size())
1761
, Pools(poolCount)
@@ -29,40 +73,47 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
2973
}
3074
}
3175

32-
void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
33-
// ActorSystem = actorSystem;
34-
35-
ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
36-
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
37-
38-
std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
39-
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
40-
for (IExecutorPool* pool : poolsBasic) {
41-
Pools[pool->PoolId] = dynamic_cast<TBasicExecutorPool*>(pool);
42-
i16 threadIdx = State.ThreadByPool[pool->PoolId];
43-
if (threadIdx >= 0) {
44-
poolByThread[threadIdx] = pool;
45-
}
76+
void TSharedExecutorPool::Init(const std::vector<IExecutorPool*>& pools, bool withThreads) {
77+
std::vector<IExecutorPool*> poolByThread(SharedThreadCount);
78+
for (IExecutorPool* pool : pools) {
79+
Pools[pool->PoolId] = pool;
80+
i16 threadIdx = State.ThreadByPool[pool->PoolId];
81+
if (threadIdx >= 0) {
82+
poolByThread[threadIdx] = pool;
4683
}
84+
}
4785

48-
for (i16 i = 0; i != SharedThreadCount; ++i) {
49-
// !TODO
50-
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
86+
for (i16 i = 0; i != SharedThreadCount; ++i) {
87+
// !TODO
88+
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
89+
if (withThreads) {
5190
Threads[i].Thread.reset(
5291
new TSharedExecutorThread(
5392
-1,
54-
actorSystem,
55-
&Threads[i],
56-
PoolCount,
57-
"SharedThread",
58-
SoftProcessingDurationTs,
59-
TimePerMailbox,
93+
nullptr,
94+
&Threads[i],
95+
PoolCount,
96+
"SharedThread",
97+
SoftProcessingDurationTs,
98+
TimePerMailbox,
6099
EventsPerMailbox));
61-
ScheduleWriters[i].Init(ScheduleReaders[i]);
62100
}
101+
}
102+
}
103+
104+
void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) {
105+
ScheduleReaders.reset(new NSchedulerQueue::TReader[SharedThreadCount]);
106+
ScheduleWriters.reset(new NSchedulerQueue::TWriter[SharedThreadCount]);
107+
108+
std::vector<IExecutorPool*> poolsBasic = actorSystem->GetBasicExecutorPools();
109+
Init(poolsBasic, true);
63110

64-
*scheduleReaders = ScheduleReaders.get();
65-
*scheduleSz = SharedThreadCount;
111+
for (i16 i = 0; i != SharedThreadCount; ++i) {
112+
ScheduleWriters[i].Init(ScheduleReaders[i]);
113+
}
114+
115+
*scheduleReaders = ScheduleReaders.get();
116+
*scheduleSz = SharedThreadCount;
66117
}
67118

68119
void TSharedExecutorPool::Start() {
@@ -99,24 +150,27 @@ TSharedExecutorThreadCtx* TSharedExecutorPool::GetSharedThread(i16 pool) {
99150
return &Threads[threadIdx];
100151
}
101152

102-
void TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
153+
i16 TSharedExecutorPool::ReturnOwnHalfThread(i16 pool) {
103154
i16 threadIdx = State.ThreadByPool[pool];
104-
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
155+
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
105156
Y_ABORT_UNLESS(borrowingPool);
106-
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
157+
i16 borrowedPool = State.PoolByBorrowedThread[threadIdx];
158+
State.BorrowedThreadByPool[borrowedPool] = -1;
107159
State.PoolByBorrowedThread[threadIdx] = -1;
108160
// TODO(kruall): Check on race
109161
borrowingPool->ReleaseSharedThread();
162+
return borrowedPool;
110163
}
111164

112-
void TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
165+
i16 TSharedExecutorPool::ReturnBorrowedHalfThread(i16 pool) {
113166
i16 threadIdx = State.BorrowedThreadByPool[pool];
114-
TBasicExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
167+
IExecutorPool* borrowingPool = Threads[threadIdx].ExecutorPools[1].exchange(nullptr, std::memory_order_acq_rel);
115168
Y_ABORT_UNLESS(borrowingPool);
116169
State.BorrowedThreadByPool[State.PoolByBorrowedThread[threadIdx]] = -1;
117170
State.PoolByBorrowedThread[threadIdx] = -1;
118171
// TODO(kruall): Check on race
119172
borrowingPool->ReleaseSharedThread();
173+
return State.PoolByThread[threadIdx];
120174
}
121175

122176
void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
@@ -127,14 +181,14 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
127181
if (borrowedThreadIdx != -1) {
128182
i16 originalPool = State.PoolByThread[borrowedThreadIdx];
129183
if (originalPool == to) {
130-
return ReturnOwnHalfThread(to);
184+
ReturnOwnHalfThread(to);
131185
} else {
132186
ReturnOwnHalfThread(originalPool);
133187
}
134188
from = originalPool;
135189
}
136190
i16 threadIdx = State.ThreadByPool[from];
137-
TBasicExecutorPool* borrowingPool = Pools[to];
191+
IExecutorPool* borrowingPool = Pools[to];
138192
Threads[threadIdx].ExecutorPools[1].store(borrowingPool, std::memory_order_release);
139193
State.BorrowedThreadByPool[to] = threadIdx;
140194
State.PoolByBorrowedThread[threadIdx] = to;
@@ -143,16 +197,16 @@ void TSharedExecutorPool::GiveHalfThread(i16 from, i16 to) {
143197
}
144198

145199
void TSharedExecutorPool::GetSharedStats(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
146-
statsCopy.resize(SharedThreadCount + 1);
200+
statsCopy.resize(SharedThreadCount);
147201
for (i16 i = 0; i < SharedThreadCount; ++i) {
148-
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i + 1]);
202+
Threads[i].Thread->GetSharedStats(poolId, statsCopy[i]);
149203
}
150204
}
151205

152206
void TSharedExecutorPool::GetSharedStatsForHarmonizer(i16 poolId, std::vector<TExecutorThreadStats>& statsCopy) {
153-
statsCopy.resize(SharedThreadCount + 1);
207+
statsCopy.resize(SharedThreadCount);
154208
for (i16 i = 0; i < SharedThreadCount; ++i) {
155-
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i + 1]);
209+
Threads[i].Thread->GetSharedStatsForHarmonizer(poolId, statsCopy[i]);
156210
}
157211
}
158212

@@ -181,4 +235,34 @@ TSharedPoolState TSharedExecutorPool::GetState() const {
181235
return State;
182236
}
183237

238+
ISharedExecutorPool *CreateSharedExecutorPool(const TSharedExecutorPoolConfig &config, i16 poolCount, std::vector<i16> poolsWithThreads) {
239+
return new TSharedExecutorPool(config, poolCount, poolsWithThreads);
240+
}
241+
242+
TString TSharedPoolState::ToString() const {
243+
TStringBuilder builder;
244+
builder << '{';
245+
builder << "ThreadByPool: [";
246+
for (ui32 i = 0; i < ThreadByPool.size(); ++i) {
247+
builder << ThreadByPool[i] << (i == ThreadByPool.size() - 1 ? "" : ", ");
248+
}
249+
builder << "], ";
250+
builder << "PoolByThread: [";
251+
for (ui32 i = 0; i < PoolByThread.size(); ++i) {
252+
builder << PoolByThread[i] << (i == PoolByThread.size() - 1 ? "" : ", ");
253+
}
254+
builder << "], ";
255+
builder << "BorrowedThreadByPool: [";
256+
for (ui32 i = 0; i < BorrowedThreadByPool.size(); ++i) {
257+
builder << BorrowedThreadByPool[i] << (i == BorrowedThreadByPool.size() - 1 ? "" : ", ");
258+
}
259+
builder << "], ";
260+
builder << "PoolByBorrowedThread: [";
261+
for (ui32 i = 0; i < PoolByBorrowedThread.size(); ++i) {
262+
builder << PoolByBorrowedThread[i] << (i == PoolByBorrowedThread.size() - 1 ? "" : ", ");
263+
}
264+
builder << ']';
265+
return builder << '}';
266+
}
267+
184268
}

0 commit comments

Comments
 (0)