Skip to content

Commit 7a47534

Browse files
committed
Improve behaviour in corner cases when used shared threads (#19693)
1 parent 4df3536 commit 7a47534

File tree

10 files changed

+387
-56
lines changed

10 files changed

+387
-56
lines changed

ydb/library/actors/core/executor_pool_basic.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ namespace NActors {
4949

5050
constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX;
5151

52-
TString GetCurrentThreadKind() {
52+
TString GetCurrentThreadKind() {
5353
if (TlsThreadContext) {
5454
return TlsThreadContext->WorkerId() >= 0 ? "[common]" : "[shared]";
5555
}
@@ -268,7 +268,7 @@ namespace NActors {
268268
if (StopFlag.load(std::memory_order_acquire)) {
269269
return nullptr;
270270
}
271-
271+
272272
TWorkerId workerId = TlsThreadContext->WorkerId();
273273
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
274274
NHPTimer::STime hpnow = GetCycleCountFast();
@@ -391,7 +391,7 @@ namespace NActors {
391391
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "shared pool wake up local threads");
392392
return;
393393
}
394-
}
394+
}
395395

396396
i16 sleepThreads = 0;
397397
Y_UNUSED(sleepThreads);
@@ -507,6 +507,10 @@ namespace NActors {
507507
TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId);
508508
poolState.ElapsedCpu = stats.AvgElapsedCpu;
509509
poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount;
510+
poolState.SharedCpuQuota = stats.SharedCpuQuota;
511+
poolState.IsNeedy = stats.IsNeedy;
512+
poolState.IsStarved = stats.IsStarved;
513+
poolState.IsHoggish = stats.IsHoggish;
510514
} else {
511515
poolState.PossibleMaxLimit = poolState.MaxLimit;
512516
}
@@ -675,7 +679,7 @@ namespace NActors {
675679
i16 TBasicExecutorPool::GetMaxFullThreadCount() const {
676680
return MaxFullThreadCount;
677681
}
678-
682+
679683
ui32 TBasicExecutorPool::GetThreads() const {
680684
return MaxFullThreadCount;
681685
}

ydb/library/actors/core/harmonizer/cpu_consumption.cpp

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ void TCpuConsumptionInfo::Clear() {
1414

1515
void THarmonizerCpuConsumption::Init(i16 poolCount) {
1616
PoolConsumption.resize(poolCount);
17+
PoolFullThreadConsumption.resize(poolCount);
18+
PoolForeignConsumption.resize(poolCount);
1719
IsNeedyByPool.reserve(poolCount);
1820
NeedyPools.reserve(poolCount);
1921
HoggishPools.reserve(poolCount);
@@ -25,8 +27,9 @@ namespace {
2527
return Max(0.0, Min(1.0, value * (1.0/0.9)));
2628
}
2729

28-
void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) {
30+
void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption, TCpuConsumptionInfo *poolFullThreadConsumption) {
2931
poolConsumption->Clear();
32+
poolFullThreadConsumption->Clear();
3033
for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
3134
float threadElapsed = Rescale(pool.GetElapsed(threadIdx));
3235
float threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx));
@@ -36,13 +39,17 @@ namespace {
3639
poolConsumption->LastSecondElapsed += threadLastSecondElapsed;
3740
poolConsumption->Cpu += threadCpu;
3841
poolConsumption->LastSecondCpu += threadLastSecondCpu;
42+
poolFullThreadConsumption->Elapsed += threadElapsed;
43+
poolFullThreadConsumption->LastSecondElapsed += threadLastSecondElapsed;
44+
poolFullThreadConsumption->Cpu += threadCpu;
45+
poolFullThreadConsumption->LastSecondCpu += threadLastSecondCpu;
3946
LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, threadElapsed, threadCpu, threadLastSecondElapsed, threadLastSecondCpu);
4047
}
4148
for (i16 sharedIdx = 0; sharedIdx < static_cast<i16>(pool.SharedInfo.size()); ++sharedIdx) {
42-
float sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx));
43-
float sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx));
44-
float sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx));
45-
float sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx));
49+
float sharedElapsed = pool.GetSharedElapsed(sharedIdx);
50+
float sharedLastSecondElapsed = pool.GetLastSecondSharedElapsed(sharedIdx);
51+
float sharedCpu = pool.GetSharedCpu(sharedIdx);
52+
float sharedLastSecondCpu = pool.GetLastSecondSharedCpu(sharedIdx);
4653
poolConsumption->Elapsed += sharedElapsed;
4754
poolConsumption->LastSecondElapsed += sharedLastSecondElapsed;
4855
poolConsumption->Cpu += sharedCpu;
@@ -51,12 +58,19 @@ namespace {
5158
}
5259
}
5360

61+
void UpdatePoolForeignConsumption(const TPoolInfo& pool, TPoolForeignConsumptionInfo *poolForeignConsumption, const TSharedInfo& sharedInfo) {
62+
float prevElapsed = std::exchange(poolForeignConsumption->PrevElapsedValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed);
63+
float prevCpu = std::exchange(poolForeignConsumption->PrevCpuValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu);
64+
poolForeignConsumption->Elapsed = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed - prevElapsed;
65+
poolForeignConsumption->Cpu = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu - prevCpu;
66+
}
67+
5468
bool IsStarved(double elapsed, double cpu) {
5569
return Max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5);
5670
}
5771

5872
bool IsHoggish(double elapsed, double currentThreadCount) {
59-
return elapsed < currentThreadCount - 0.5;
73+
return elapsed <= currentThreadCount - 1.0;
6074
}
6175

6276
} // namespace
@@ -82,34 +96,51 @@ void THarmonizerCpuConsumption::Pull(const std::vector<std::unique_ptr<TPoolInfo
8296

8397
AdditionalThreads += Max(0, pool.GetFullThreadCount() - pool.DefaultFullThreadCount);
8498
float currentThreadCount = pool.GetThreadCount();
99+
float currentFullThreadCount = pool.GetFullThreadCount();
85100
StoppingThreads += pool.Pool->GetBlockingThreadCount();
86101
HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount);
87102

88-
UpdatePoolConsumption(pool, &PoolConsumption[poolIdx]);
89-
90-
HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", PoolConsumption[poolIdx].Elapsed, "cpu", PoolConsumption[poolIdx].Cpu, "last second elapsed", PoolConsumption[poolIdx].LastSecondElapsed, "last second cpu", PoolConsumption[poolIdx].LastSecondCpu);
103+
UpdatePoolConsumption(pool, &PoolConsumption[poolIdx], &PoolFullThreadConsumption[poolIdx]);
104+
UpdatePoolForeignConsumption(pool, &PoolForeignConsumption[poolIdx], sharedInfo);
105+
106+
HARMONIZER_DEBUG_PRINT("CpuConsumption::Pull",
107+
"pool:", poolIdx,
108+
"pool name:", pool.Pool->GetName(),
109+
"elapsed:", PoolConsumption[poolIdx].Elapsed,
110+
"cpu:", PoolConsumption[poolIdx].Cpu,
111+
"last second elapsed:", PoolConsumption[poolIdx].LastSecondElapsed,
112+
"last second cpu:", PoolConsumption[poolIdx].LastSecondCpu,
113+
"full thread elapsed:", PoolFullThreadConsumption[poolIdx].Elapsed,
114+
"full thread cpu:", PoolFullThreadConsumption[poolIdx].Cpu,
115+
"last second full thread elapsed:", PoolFullThreadConsumption[poolIdx].LastSecondElapsed,
116+
"last second full thread cpu:", PoolFullThreadConsumption[poolIdx].LastSecondCpu,
117+
"foreign elapsed:", PoolForeignConsumption[poolIdx].Elapsed,
118+
"foreign cpu:", PoolForeignConsumption[poolIdx].Cpu
119+
);
91120

92121
bool isStarved = IsStarved(PoolConsumption[poolIdx].Elapsed, PoolConsumption[poolIdx].Cpu)
93122
|| IsStarved(PoolConsumption[poolIdx].LastSecondElapsed, PoolConsumption[poolIdx].LastSecondCpu);
94123
if (isStarved) {
95124
IsStarvedPresent = true;
96125
}
97126

98-
bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].LastSecondCpu >= currentThreadCount);
127+
float expectedThreadCount = pool.GetFullThreadCount() + (sharedInfo.OwnedThreads[poolIdx] != -1 ? 1 : 0) + 0.5;
128+
bool isMoreThanExpected = (PoolConsumption[poolIdx].LastSecondCpu >= expectedThreadCount) && (PoolFullThreadConsumption[poolIdx].LastSecondCpu >= currentFullThreadCount - 1);
129+
bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].LastSecondCpu >= currentThreadCount || isMoreThanExpected);
99130
IsNeedyByPool.push_back(isNeedy);
100131
if (isNeedy) {
101132
NeedyPools.push_back(poolIdx);
102133
}
103134

104-
bool isHoggish = IsHoggish(PoolConsumption[poolIdx].Elapsed, currentThreadCount);
135+
bool isHoggish = !isNeedy && IsHoggish(PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount) && IsHoggish(PoolFullThreadConsumption[poolIdx].LastSecondElapsed, currentFullThreadCount);
105136
if (isHoggish) {
106-
float freeCpu = currentThreadCount - PoolConsumption[poolIdx].Elapsed;
137+
float freeCpu = std::min(currentFullThreadCount - PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount - PoolFullThreadConsumption[poolIdx].LastSecondElapsed);
107138
HoggishPools.push_back({poolIdx, freeCpu});
108139
}
109140

110141
Elapsed += PoolConsumption[poolIdx].Elapsed;
111142
Cpu += PoolConsumption[poolIdx].Cpu;
112-
LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed;
143+
LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed;
113144
LastSecondCpu += PoolConsumption[poolIdx].LastSecondCpu;
114145
pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed);
115146
LWPROBE_WITH_DEBUG(
@@ -140,10 +171,12 @@ void THarmonizerCpuConsumption::Pull(const std::vector<std::unique_ptr<TPoolInfo
140171
HARMONIZER_DEBUG_PRINT("NeedyPools", NeedyPools.size(), "HoggishPools", HoggishPools.size());
141172

142173
Budget = TotalCores - Elapsed;
174+
BudgetLS = TotalCores - LastSecondElapsed;
143175
BudgetWithoutSharedCpu = Budget - sharedInfo.FreeCpu;
176+
BudgetLSWithoutSharedCpu = BudgetLS - sharedInfo.FreeCpu;
144177
Overbooked = -Budget;
145178
LostCpu = Max<float>(0.0f, Elapsed - Cpu);
146-
if (Budget < -0.1) {
179+
if (BudgetLS < -0.1) {
147180
IsStarvedPresent = true;
148181
}
149182
HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu, "LastSecondElapsed", LastSecondElapsed, "LastSecondCpu", LastSecondCpu);

ydb/library/actors/core/harmonizer/cpu_consumption.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,28 @@ struct TCpuConsumptionInfo {
1616
void Clear();
1717
}; // struct TCpuConsumptionInfo
1818

19+
struct TPoolForeignConsumptionInfo {
20+
float Elapsed;
21+
float Cpu;
22+
float PrevElapsedValue;
23+
float PrevCpuValue;
24+
}; // struct TPoolForeignConsumptionInfo
25+
1926
struct THarmonizerCpuConsumption {
2027
std::vector<TCpuConsumptionInfo> PoolConsumption;
28+
std::vector<TCpuConsumptionInfo> PoolFullThreadConsumption;
29+
std::vector<TPoolForeignConsumptionInfo> PoolForeignConsumption;
2130

2231
float TotalCores = 0;
2332
i16 AdditionalThreads = 0;
2433
i16 StoppingThreads = 0;
2534
bool IsStarvedPresent = false;
35+
2636
float Budget = 0.0;
37+
float BudgetLS = 0.0;
2738
float BudgetWithoutSharedCpu = 0.0;
39+
float BudgetLSWithoutSharedCpu = 0.0;
40+
2841
float Overbooked = 0.0;
2942
float LostCpu = 0.0;
3043

0 commit comments

Comments
 (0)