Skip to content

Improve behaviour in corner cases when used shared threads (#19693) #20704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: stable-25-1-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace NActors {

constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX;

TString GetCurrentThreadKind() {
TString GetCurrentThreadKind() {
if (TlsThreadContext) {
return TlsThreadContext->WorkerId() >= 0 ? "[common]" : "[shared]";
}
Expand Down Expand Up @@ -268,7 +268,7 @@ namespace NActors {
if (StopFlag.load(std::memory_order_acquire)) {
return nullptr;
}

TWorkerId workerId = TlsThreadContext->WorkerId();
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "");
NHPTimer::STime hpnow = GetCycleCountFast();
Expand Down Expand Up @@ -391,7 +391,7 @@ namespace NActors {
EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "shared pool wake up local threads");
return;
}
}
}

i16 sleepThreads = 0;
Y_UNUSED(sleepThreads);
Expand Down Expand Up @@ -675,7 +675,7 @@ namespace NActors {
i16 TBasicExecutorPool::GetMaxFullThreadCount() const {
return MaxFullThreadCount;
}

ui32 TBasicExecutorPool::GetThreads() const {
return MaxFullThreadCount;
}
Expand Down
61 changes: 47 additions & 14 deletions ydb/library/actors/core/harmonizer/cpu_consumption.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ void TCpuConsumptionInfo::Clear() {

void THarmonizerCpuConsumption::Init(i16 poolCount) {
PoolConsumption.resize(poolCount);
PoolFullThreadConsumption.resize(poolCount);
PoolForeignConsumption.resize(poolCount);
IsNeedyByPool.reserve(poolCount);
NeedyPools.reserve(poolCount);
HoggishPools.reserve(poolCount);
Expand All @@ -25,8 +27,9 @@ namespace {
return Max(0.0, Min(1.0, value * (1.0/0.9)));
}

void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) {
void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption, TCpuConsumptionInfo *poolFullThreadConsumption) {
poolConsumption->Clear();
poolFullThreadConsumption->Clear();
for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
float threadElapsed = Rescale(pool.GetElapsed(threadIdx));
float threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx));
Expand All @@ -36,13 +39,17 @@ namespace {
poolConsumption->LastSecondElapsed += threadLastSecondElapsed;
poolConsumption->Cpu += threadCpu;
poolConsumption->LastSecondCpu += threadLastSecondCpu;
poolFullThreadConsumption->Elapsed += threadElapsed;
poolFullThreadConsumption->LastSecondElapsed += threadLastSecondElapsed;
poolFullThreadConsumption->Cpu += threadCpu;
poolFullThreadConsumption->LastSecondCpu += threadLastSecondCpu;
LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, threadElapsed, threadCpu, threadLastSecondElapsed, threadLastSecondCpu);
}
for (i16 sharedIdx = 0; sharedIdx < static_cast<i16>(pool.SharedInfo.size()); ++sharedIdx) {
float sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx));
float sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx));
float sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx));
float sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx));
float sharedElapsed = pool.GetSharedElapsed(sharedIdx);
float sharedLastSecondElapsed = pool.GetLastSecondSharedElapsed(sharedIdx);
float sharedCpu = pool.GetSharedCpu(sharedIdx);
float sharedLastSecondCpu = pool.GetLastSecondSharedCpu(sharedIdx);
poolConsumption->Elapsed += sharedElapsed;
poolConsumption->LastSecondElapsed += sharedLastSecondElapsed;
poolConsumption->Cpu += sharedCpu;
Expand All @@ -51,12 +58,19 @@ namespace {
}
}

void UpdatePoolForeignConsumption(const TPoolInfo& pool, TPoolForeignConsumptionInfo *poolForeignConsumption, const TSharedInfo& sharedInfo) {
float prevElapsed = std::exchange(poolForeignConsumption->PrevElapsedValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed);
float prevCpu = std::exchange(poolForeignConsumption->PrevCpuValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu);
poolForeignConsumption->Elapsed = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed - prevElapsed;
poolForeignConsumption->Cpu = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu - prevCpu;
}

bool IsStarved(double elapsed, double cpu) {
return Max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5);
}

bool IsHoggish(double elapsed, double currentThreadCount) {
return elapsed < currentThreadCount - 0.5;
return elapsed <= currentThreadCount - 1.0;
}

} // namespace
Expand All @@ -82,34 +96,51 @@ void THarmonizerCpuConsumption::Pull(const std::vector<std::unique_ptr<TPoolInfo

AdditionalThreads += Max(0, pool.GetFullThreadCount() - pool.DefaultFullThreadCount);
float currentThreadCount = pool.GetThreadCount();
float currentFullThreadCount = pool.GetFullThreadCount();
StoppingThreads += pool.Pool->GetBlockingThreadCount();
HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount);

UpdatePoolConsumption(pool, &PoolConsumption[poolIdx]);

HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", PoolConsumption[poolIdx].Elapsed, "cpu", PoolConsumption[poolIdx].Cpu, "last second elapsed", PoolConsumption[poolIdx].LastSecondElapsed, "last second cpu", PoolConsumption[poolIdx].LastSecondCpu);
UpdatePoolConsumption(pool, &PoolConsumption[poolIdx], &PoolFullThreadConsumption[poolIdx]);
UpdatePoolForeignConsumption(pool, &PoolForeignConsumption[poolIdx], sharedInfo);

HARMONIZER_DEBUG_PRINT("CpuConsumption::Pull",
"pool:", poolIdx,
"pool name:", pool.Pool->GetName(),
"elapsed:", PoolConsumption[poolIdx].Elapsed,
"cpu:", PoolConsumption[poolIdx].Cpu,
"last second elapsed:", PoolConsumption[poolIdx].LastSecondElapsed,
"last second cpu:", PoolConsumption[poolIdx].LastSecondCpu,
"full thread elapsed:", PoolFullThreadConsumption[poolIdx].Elapsed,
"full thread cpu:", PoolFullThreadConsumption[poolIdx].Cpu,
"last second full thread elapsed:", PoolFullThreadConsumption[poolIdx].LastSecondElapsed,
"last second full thread cpu:", PoolFullThreadConsumption[poolIdx].LastSecondCpu,
"foreign elapsed:", PoolForeignConsumption[poolIdx].Elapsed,
"foreign cpu:", PoolForeignConsumption[poolIdx].Cpu
);

bool isStarved = IsStarved(PoolConsumption[poolIdx].Elapsed, PoolConsumption[poolIdx].Cpu)
|| IsStarved(PoolConsumption[poolIdx].LastSecondElapsed, PoolConsumption[poolIdx].LastSecondCpu);
if (isStarved) {
IsStarvedPresent = true;
}

bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].LastSecondCpu >= currentThreadCount);
float expectedThreadCount = pool.GetFullThreadCount() + (sharedInfo.OwnedThreads[poolIdx] != -1 ? 1 : 0) + 0.5;
bool isMoreThanExpected = (PoolConsumption[poolIdx].LastSecondCpu >= expectedThreadCount) && (PoolFullThreadConsumption[poolIdx].LastSecondCpu >= currentFullThreadCount - 1);
bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].LastSecondCpu >= currentThreadCount || isMoreThanExpected);
IsNeedyByPool.push_back(isNeedy);
if (isNeedy) {
NeedyPools.push_back(poolIdx);
}

bool isHoggish = IsHoggish(PoolConsumption[poolIdx].Elapsed, currentThreadCount);
bool isHoggish = !isNeedy && IsHoggish(PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount) && IsHoggish(PoolFullThreadConsumption[poolIdx].LastSecondElapsed, currentFullThreadCount);
if (isHoggish) {
float freeCpu = currentThreadCount - PoolConsumption[poolIdx].Elapsed;
float freeCpu = std::min(currentFullThreadCount - PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount - PoolFullThreadConsumption[poolIdx].LastSecondElapsed);
HoggishPools.push_back({poolIdx, freeCpu});
}

Elapsed += PoolConsumption[poolIdx].Elapsed;
Cpu += PoolConsumption[poolIdx].Cpu;
LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed;
LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed;
LastSecondCpu += PoolConsumption[poolIdx].LastSecondCpu;
pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed);
LWPROBE_WITH_DEBUG(
Expand Down Expand Up @@ -140,10 +171,12 @@ void THarmonizerCpuConsumption::Pull(const std::vector<std::unique_ptr<TPoolInfo
HARMONIZER_DEBUG_PRINT("NeedyPools", NeedyPools.size(), "HoggishPools", HoggishPools.size());

Budget = TotalCores - Elapsed;
BudgetLS = TotalCores - LastSecondElapsed;
BudgetWithoutSharedCpu = Budget - sharedInfo.FreeCpu;
BudgetLSWithoutSharedCpu = BudgetLS - sharedInfo.FreeCpu;
Overbooked = -Budget;
LostCpu = Max<float>(0.0f, Elapsed - Cpu);
if (Budget < -0.1) {
if (BudgetLS < -0.1) {
IsStarvedPresent = true;
}
HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu, "LastSecondElapsed", LastSecondElapsed, "LastSecondCpu", LastSecondCpu);
Expand Down
13 changes: 13 additions & 0 deletions ydb/library/actors/core/harmonizer/cpu_consumption.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,28 @@ struct TCpuConsumptionInfo {
void Clear();
}; // struct TCpuConsumptionInfo

struct TPoolForeignConsumptionInfo {
float Elapsed;
float Cpu;
float PrevElapsedValue;
float PrevCpuValue;
}; // struct TPoolForeignConsumptionInfo

struct THarmonizerCpuConsumption {
std::vector<TCpuConsumptionInfo> PoolConsumption;
std::vector<TCpuConsumptionInfo> PoolFullThreadConsumption;
std::vector<TPoolForeignConsumptionInfo> PoolForeignConsumption;

float TotalCores = 0;
i16 AdditionalThreads = 0;
i16 StoppingThreads = 0;
bool IsStarvedPresent = false;

float Budget = 0.0;
float BudgetLS = 0.0;
float BudgetWithoutSharedCpu = 0.0;
float BudgetLSWithoutSharedCpu = 0.0;

float Overbooked = 0.0;
float LostCpu = 0.0;

Expand Down
Loading
Loading