diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index e1182da7d923..fdceebc853db 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -49,7 +49,7 @@ namespace NActors { constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX; - TString GetCurrentThreadKind() { + TString GetCurrentThreadKind() { if (TlsThreadContext) { return TlsThreadContext->WorkerId() >= 0 ? "[common]" : "[shared]"; } @@ -268,7 +268,7 @@ namespace NActors { if (StopFlag.load(std::memory_order_acquire)) { return nullptr; } - + TWorkerId workerId = TlsThreadContext->WorkerId(); EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, ""); NHPTimer::STime hpnow = GetCycleCountFast(); @@ -391,7 +391,7 @@ namespace NActors { EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "shared pool wake up local threads"); return; } - } + } i16 sleepThreads = 0; Y_UNUSED(sleepThreads); @@ -675,7 +675,7 @@ namespace NActors { i16 TBasicExecutorPool::GetMaxFullThreadCount() const { return MaxFullThreadCount; } - + ui32 TBasicExecutorPool::GetThreads() const { return MaxFullThreadCount; } diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp index 9e9e714f1373..ace01c7caa85 100644 --- a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp +++ b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp @@ -14,6 +14,8 @@ void TCpuConsumptionInfo::Clear() { void THarmonizerCpuConsumption::Init(i16 poolCount) { PoolConsumption.resize(poolCount); + PoolFullThreadConsumption.resize(poolCount); + PoolForeignConsumption.resize(poolCount); IsNeedyByPool.reserve(poolCount); NeedyPools.reserve(poolCount); HoggishPools.reserve(poolCount); @@ -25,8 +27,9 @@ namespace { return Max(0.0, Min(1.0, value * (1.0/0.9))); } - void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) { + void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption, TCpuConsumptionInfo *poolFullThreadConsumption) { poolConsumption->Clear(); + poolFullThreadConsumption->Clear(); for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { float threadElapsed = Rescale(pool.GetElapsed(threadIdx)); float threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx)); @@ -36,13 +39,17 @@ namespace { poolConsumption->LastSecondElapsed += threadLastSecondElapsed; poolConsumption->Cpu += threadCpu; poolConsumption->LastSecondCpu += threadLastSecondCpu; + poolFullThreadConsumption->Elapsed += threadElapsed; + poolFullThreadConsumption->LastSecondElapsed += threadLastSecondElapsed; + poolFullThreadConsumption->Cpu += threadCpu; + poolFullThreadConsumption->LastSecondCpu += threadLastSecondCpu; LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, threadElapsed, threadCpu, threadLastSecondElapsed, threadLastSecondCpu); } for (i16 sharedIdx = 0; sharedIdx < static_cast(pool.SharedInfo.size()); ++sharedIdx) { - float sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx)); - float sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx)); - float sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx)); - float sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx)); + float sharedElapsed = pool.GetSharedElapsed(sharedIdx); + float sharedLastSecondElapsed = pool.GetLastSecondSharedElapsed(sharedIdx); + float sharedCpu = pool.GetSharedCpu(sharedIdx); + float sharedLastSecondCpu = pool.GetLastSecondSharedCpu(sharedIdx); poolConsumption->Elapsed += sharedElapsed; poolConsumption->LastSecondElapsed += sharedLastSecondElapsed; poolConsumption->Cpu += sharedCpu; @@ -51,12 +58,19 @@ namespace { } } + void UpdatePoolForeignConsumption(const TPoolInfo& pool, TPoolForeignConsumptionInfo *poolForeignConsumption, const TSharedInfo& sharedInfo) { + float prevElapsed = std::exchange(poolForeignConsumption->PrevElapsedValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed); + float prevCpu = std::exchange(poolForeignConsumption->PrevCpuValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu); + poolForeignConsumption->Elapsed = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed - prevElapsed; + poolForeignConsumption->Cpu = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu - prevCpu; + } + bool IsStarved(double elapsed, double cpu) { return Max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5); } bool IsHoggish(double elapsed, double currentThreadCount) { - return elapsed < currentThreadCount - 0.5; + return elapsed <= currentThreadCount - 1.0; } } // namespace @@ -82,12 +96,27 @@ void THarmonizerCpuConsumption::Pull(const std::vectorGetBlockingThreadCount(); HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount); - UpdatePoolConsumption(pool, &PoolConsumption[poolIdx]); - - HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", PoolConsumption[poolIdx].Elapsed, "cpu", PoolConsumption[poolIdx].Cpu, "last second elapsed", PoolConsumption[poolIdx].LastSecondElapsed, "last second cpu", PoolConsumption[poolIdx].LastSecondCpu); + UpdatePoolConsumption(pool, &PoolConsumption[poolIdx], &PoolFullThreadConsumption[poolIdx]); + UpdatePoolForeignConsumption(pool, &PoolForeignConsumption[poolIdx], sharedInfo); + + HARMONIZER_DEBUG_PRINT("CpuConsumption::Pull", + "pool:", poolIdx, + "pool name:", pool.Pool->GetName(), + "elapsed:", PoolConsumption[poolIdx].Elapsed, + "cpu:", PoolConsumption[poolIdx].Cpu, + "last second elapsed:", PoolConsumption[poolIdx].LastSecondElapsed, + "last second cpu:", PoolConsumption[poolIdx].LastSecondCpu, + "full thread elapsed:", PoolFullThreadConsumption[poolIdx].Elapsed, + "full thread cpu:", PoolFullThreadConsumption[poolIdx].Cpu, + "last second full thread elapsed:", PoolFullThreadConsumption[poolIdx].LastSecondElapsed, + "last second full thread cpu:", PoolFullThreadConsumption[poolIdx].LastSecondCpu, + "foreign elapsed:", PoolForeignConsumption[poolIdx].Elapsed, + "foreign cpu:", PoolForeignConsumption[poolIdx].Cpu + ); bool isStarved = IsStarved(PoolConsumption[poolIdx].Elapsed, PoolConsumption[poolIdx].Cpu) || IsStarved(PoolConsumption[poolIdx].LastSecondElapsed, PoolConsumption[poolIdx].LastSecondCpu); @@ -95,21 +124,23 @@ void THarmonizerCpuConsumption::Pull(const std::vector= currentThreadCount); + float expectedThreadCount = pool.GetFullThreadCount() + (sharedInfo.OwnedThreads[poolIdx] != -1 ? 1 : 0) + 0.5; + bool isMoreThanExpected = (PoolConsumption[poolIdx].LastSecondCpu >= expectedThreadCount) && (PoolFullThreadConsumption[poolIdx].LastSecondCpu >= currentFullThreadCount - 1); + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].LastSecondCpu >= currentThreadCount || isMoreThanExpected); IsNeedyByPool.push_back(isNeedy); if (isNeedy) { NeedyPools.push_back(poolIdx); } - bool isHoggish = IsHoggish(PoolConsumption[poolIdx].Elapsed, currentThreadCount); + bool isHoggish = !isNeedy && IsHoggish(PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount) && IsHoggish(PoolFullThreadConsumption[poolIdx].LastSecondElapsed, currentFullThreadCount); if (isHoggish) { - float freeCpu = currentThreadCount - PoolConsumption[poolIdx].Elapsed; + float freeCpu = std::min(currentFullThreadCount - PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount - PoolFullThreadConsumption[poolIdx].LastSecondElapsed); HoggishPools.push_back({poolIdx, freeCpu}); } Elapsed += PoolConsumption[poolIdx].Elapsed; Cpu += PoolConsumption[poolIdx].Cpu; - LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed; + LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed; LastSecondCpu += PoolConsumption[poolIdx].LastSecondCpu; pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed); LWPROBE_WITH_DEBUG( @@ -140,10 +171,12 @@ void THarmonizerCpuConsumption::Pull(const std::vector(0.0f, Elapsed - Cpu); - if (Budget < -0.1) { + if (BudgetLS < -0.1) { IsStarvedPresent = true; } HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu, "LastSecondElapsed", LastSecondElapsed, "LastSecondCpu", LastSecondCpu); diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.h b/ydb/library/actors/core/harmonizer/cpu_consumption.h index 8779cf1eb64a..8801dad4353a 100644 --- a/ydb/library/actors/core/harmonizer/cpu_consumption.h +++ b/ydb/library/actors/core/harmonizer/cpu_consumption.h @@ -16,15 +16,28 @@ struct TCpuConsumptionInfo { void Clear(); }; // struct TCpuConsumptionInfo +struct TPoolForeignConsumptionInfo { + float Elapsed; + float Cpu; + float PrevElapsedValue; + float PrevCpuValue; +}; // struct TPoolForeignConsumptionInfo + struct THarmonizerCpuConsumption { std::vector PoolConsumption; + std::vector PoolFullThreadConsumption; + std::vector PoolForeignConsumption; float TotalCores = 0; i16 AdditionalThreads = 0; i16 StoppingThreads = 0; bool IsStarvedPresent = false; + float Budget = 0.0; + float BudgetLS = 0.0; float BudgetWithoutSharedCpu = 0.0; + float BudgetLSWithoutSharedCpu = 0.0; + float Overbooked = 0.0; float LostCpu = 0.0; diff --git a/ydb/library/actors/core/harmonizer/harmonizer.cpp b/ydb/library/actors/core/harmonizer/harmonizer.cpp index e1cf16c3d1f7..4dd985d75698 100644 --- a/ydb/library/actors/core/harmonizer/harmonizer.cpp +++ b/ydb/library/actors/core/harmonizer/harmonizer.cpp @@ -28,8 +28,6 @@ namespace NActors { -constexpr float kMinFreeCpuThreshold = 0.1; - LWTRACE_USING(ACTORLIB_PROVIDER); @@ -67,6 +65,8 @@ class THarmonizer: public IHarmonizer { void ProcessNeedyState(); void ProcessExchange(); void ProcessHoggishState(); + + void SetForeignThreadSlotsForCurrentFullThreadCount(ui16 poolIdx); public: THarmonizer(ui64 ts); virtual ~THarmonizer(); @@ -106,7 +106,7 @@ void THarmonizer::PullStats(ui64 ts) { SharedInfo.Pull(Pools, *Shared); } CpuConsumption.Pull(Pools, SharedInfo); - ProcessingBudget = CpuConsumption.BudgetWithoutSharedCpu; + ProcessingBudget = CpuConsumption.BudgetLSWithoutSharedCpu; } void THarmonizer::ProcessWaitingStats() { @@ -131,6 +131,16 @@ void THarmonizer::ProcessWaitingStats() { } } +void THarmonizer::SetForeignThreadSlotsForCurrentFullThreadCount(ui16 poolIdx) { + if (Shared) { + bool hasOwnSharedThread = SharedInfo.OwnedThreads[poolIdx] != -1; + i16 currentFullThreadCount = Pools[poolIdx]->GetFullThreadCount(); + i16 slots = Pools[poolIdx]->MaxThreadCount - currentFullThreadCount - hasOwnSharedThread; + i16 maxSlots = Shared->GetSharedThreadCount() - hasOwnSharedThread; + Shared->SetForeignThreadSlots(poolIdx, Min(slots, maxSlots)); + } +} + void THarmonizer::ProcessStarvedState() { HARMONIZER_DEBUG_PRINT("ProcessStarvedState"); HARMONIZER_DEBUG_PRINT("shared info", SharedInfo.ToString()); @@ -138,11 +148,19 @@ void THarmonizer::ProcessStarvedState() { TPoolInfo &pool = *Pools[poolIdx]; i64 threadCount = pool.GetFullThreadCount(); HARMONIZER_DEBUG_PRINT("poolIdx", poolIdx, "threadCount", threadCount, "pool.DefaultFullThreadCount", pool.DefaultFullThreadCount); + if (CpuConsumption.PoolConsumption[poolIdx].Elapsed > pool.GetThreadCount()) { + continue; + } + i16 maxSharedCpuQuota = i16(SharedInfo.OwnedThreads[poolIdx] != -1) + SharedInfo.ForeignThreadsAllowed[poolIdx]; + if (SharedInfo.CpuConsumption[poolIdx].CpuQuota > maxSharedCpuQuota) { + continue; + } while (threadCount > pool.DefaultFullThreadCount) { pool.SetFullThreadCount(--threadCount); pool.DecreasingThreadsByStarvedState.fetch_add(1, std::memory_order_relaxed); CpuConsumption.AdditionalThreads--; CpuConsumption.StoppingThreads++; + SetForeignThreadSlotsForCurrentFullThreadCount(poolIdx); LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) { @@ -166,21 +184,20 @@ void THarmonizer::ProcessNeedyState() { if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) { continue; } + float fullThreadCount = pool.GetFullThreadCount(); float threadCount = pool.GetFullThreadCount() + SharedInfo.CpuConsumption[needyPoolIdx].CpuQuota; - if (ProcessingBudget >= 1.0 && threadCount + 1 <= pool.MaxFullThreadCount && SharedInfo.FreeCpu < kMinFreeCpuThreshold) { + float foreignElapsed = CpuConsumption.PoolForeignConsumption[needyPoolIdx].Elapsed; + float extraForeignElapsed = std::max(0.0f, foreignElapsed - std::max(0, SharedInfo.ForeignThreadsAllowed[needyPoolIdx] - 1)); + bool allowedNextThreadCount = pool.GetFullThreadCount() + 1 <= pool.MaxFullThreadCount; + if (ProcessingBudget > 0.0 && ProcessingBudget + extraForeignElapsed >= 1.0 && allowedNextThreadCount) { pool.IncreasingThreadsByNeedyState.fetch_add(1, std::memory_order_relaxed); CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; CpuConsumption.AdditionalThreads++; - pool.SetFullThreadCount(threadCount + 1); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[needyPoolIdx] != -1; - i16 slots = pool.MaxThreadCount - threadCount - 1 - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(needyPoolIdx, Min(slots, maxSlots)); - } + pool.SetFullThreadCount(fullThreadCount + 1); + SetForeignThreadSlotsForCurrentFullThreadCount(needyPoolIdx); ProcessingBudget -= 1.0; - LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", fullThreadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } if (pool.MaxLocalQueueSize) { bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount; @@ -223,12 +240,7 @@ void THarmonizer::ProcessExchange() { CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; takingAwayThreads++; pool.SetFullThreadCount(fullThreadCount + 1); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[needyPoolIdx] != -1; - i16 slots = pool.MaxThreadCount - fullThreadCount - 1 - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(needyPoolIdx, Min(slots, maxSlots)); - } + SetForeignThreadSlotsForCurrentFullThreadCount(needyPoolIdx); LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } @@ -248,12 +260,7 @@ void THarmonizer::ProcessExchange() { } takingAwayThreads -= currentTakingAwayThreads; pool.SetFullThreadCount(fullThreadCount - currentTakingAwayThreads); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[poolIdx] != -1; - i16 slots = pool.MaxThreadCount - fullThreadCount + currentTakingAwayThreads - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(poolIdx, Min(slots, maxSlots)); - } + SetForeignThreadSlotsForCurrentFullThreadCount(poolIdx); pool.DecreasingThreadsByExchange.fetch_add(currentTakingAwayThreads, std::memory_order_relaxed); LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", fullThreadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); @@ -268,12 +275,7 @@ void THarmonizer::ProcessHoggishState() { if (fullThreadCount > pool.MinFullThreadCount && freeCpu >= 1) { pool.DecreasingThreadsByHoggishState.fetch_add(1, std::memory_order_relaxed); pool.SetFullThreadCount(fullThreadCount - 1); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[hoggishPoolIdx] != -1; - i16 slots = pool.MaxThreadCount - fullThreadCount + 1 - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(hoggishPoolIdx, Min(slots, maxSlots)); - } + SetForeignThreadSlotsForCurrentFullThreadCount(hoggishPoolIdx); LWPROBE_WITH_DEBUG(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", fullThreadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } if (pool.BasicPool && pool.LocalQueueSize > pool.MinLocalQueueSize) { @@ -334,11 +336,15 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { float poolSharedElapsedCpu = SharedInfo.CpuConsumption[poolIdx].Elapsed; possibleMaxSharedQuota = std::min(poolSharedElapsedCpu + freeSharedCpu, sharedThreads); } - float threadCount = pool.GetFullThreadCount(); - float elapsedCpu = pool.ElapsedCpu.GetAvgPart(); - float parkedCpu = Max(0.0f, threadCount - elapsedCpu); + + float fullThreadCount = pool.GetFullThreadCount(); + float elapsedCpu = CpuConsumption.PoolFullThreadConsumption[poolIdx].Elapsed; + float parkedCpu = Max(0.0f, fullThreadCount - elapsedCpu); float budgetWithoutSharedAndParkedCpu = std::max(0.0f, budgetWithoutSharedCpu - parkedCpu); - i16 potentialMaxThreadCountWithoutSharedCpu = std::min(pool.MaxThreadCount, threadCount + budgetWithoutSharedAndParkedCpu); + float potentialMaxThreadCountWithoutSharedCpu = std::min(pool.MaxThreadCount, fullThreadCount + budgetWithoutSharedAndParkedCpu); + if (!Shared) { + potentialMaxThreadCountWithoutSharedCpu = std::floor(potentialMaxThreadCountWithoutSharedCpu); + } float potentialMaxThreadCount = std::min(pool.MaxThreadCount, potentialMaxThreadCountWithoutSharedCpu + possibleMaxSharedQuota); pool.PotentialMaxThreadCount.store(potentialMaxThreadCount, std::memory_order_relaxed); @@ -350,7 +356,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { "potential max thread count: ", potentialMaxThreadCount, "potential max thread count without shared cpu: ", potentialMaxThreadCountWithoutSharedCpu, "possible max shared quota: ", possibleMaxSharedQuota, - "thread count: ", threadCount, + "thread count: ", fullThreadCount, "elapsed cpu: ", elapsedCpu, "parked cpu: ", parkedCpu ); @@ -434,7 +440,7 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo, bool ign TVector ownedThreads(Pools.size(), -1); Shared->FillOwnedThreads(ownedThreads); bool hasOwnSharedThread = ownedThreads[pool->PoolId] != -1; - Shared->SetForeignThreadSlots(pool->PoolId, Min(poolInfo.MaxThreadCount - hasOwnSharedThread, Shared->GetSharedThreadCount())); + Shared->SetForeignThreadSlots(pool->PoolId, Min(poolInfo.MaxThreadCount, Shared->GetSharedThreadCount()) - hasOwnSharedThread); } if (pingInfo) { poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; @@ -499,7 +505,7 @@ void THarmonizer::SetSharedPool(ISharedPool* pool) { } TString TPoolHarmonizerStats::ToString() const { - return TStringBuilder() << '{' + return TStringBuilder() << '{' << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << ", " << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << ", " << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << ", " @@ -531,7 +537,8 @@ TString THarmonizerStats::ToString() const { << "MaxElapsedCpu: " << MaxElapsedCpu << ", " << "MinElapsedCpu: " << MinElapsedCpu << ", " << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << ", " - << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << '}'; + << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << ", " + << '}'; } } diff --git a/ydb/library/actors/core/harmonizer/harmonizer.h b/ydb/library/actors/core/harmonizer/harmonizer.h index 40b91e439e12..5d1886f7692c 100644 --- a/ydb/library/actors/core/harmonizer/harmonizer.h +++ b/ydb/library/actors/core/harmonizer/harmonizer.h @@ -6,6 +6,7 @@ namespace NActors { class IExecutorPool; class ISharedPool; struct TSelfPingInfo; + struct THarmonizerIterationState; template struct TWaitingStats; diff --git a/ydb/library/actors/core/harmonizer/shared_info.cpp b/ydb/library/actors/core/harmonizer/shared_info.cpp index d6d6b966db39..2b153aac7d65 100644 --- a/ydb/library/actors/core/harmonizer/shared_info.cpp +++ b/ydb/library/actors/core/harmonizer/shared_info.cpp @@ -29,6 +29,10 @@ void TSharedInfo::Pull(const std::vector> &pools, con parked -= CpuConsumptionPerThread[poolId][threadId].Elapsed; CpuConsumption[poolId].Elapsed += CpuConsumptionPerThread[poolId][threadId].Elapsed; CpuConsumption[poolId].Cpu += CpuConsumptionPerThread[poolId][threadId].Cpu; + if (poolId != ThreadOwners[threadId]) { + CpuConsumption[poolId].ForeignElapsed += CpuConsumptionPerThread[poolId][threadId].Elapsed; + CpuConsumption[poolId].ForeignCpu += CpuConsumptionPerThread[poolId][threadId].Cpu; + } } CpuConsumption[ThreadOwners[threadId]].CpuQuota += parked; FreeCpu += parked; @@ -95,6 +99,8 @@ TString TPoolSharedThreadCpuConsumption::ToString() const { builder << " Elapsed: " << Elapsed << "; "; builder << " Cpu: " << Cpu << "; "; builder << " CpuQuota: " << CpuQuota << "; "; + builder << " ForeignElapsed: " << ForeignElapsed << "; "; + builder << " ForeignCpu: " << ForeignCpu << "; "; builder << "}"; return builder; } diff --git a/ydb/library/actors/core/harmonizer/shared_info.h b/ydb/library/actors/core/harmonizer/shared_info.h index 382d1a3ce096..5e5a08107630 100644 --- a/ydb/library/actors/core/harmonizer/shared_info.h +++ b/ydb/library/actors/core/harmonizer/shared_info.h @@ -13,6 +13,8 @@ struct TPoolSharedThreadCpuConsumption { float Elapsed = 0; float Cpu = 0; float CpuQuota = 0; + float ForeignElapsed = 0; + float ForeignCpu = 0; TString ToString() const; }; @@ -34,6 +36,7 @@ struct TSharedInfo { std::vector> CpuConsumptionPerThread; TVector ThreadStats; // for pulling only float FreeCpu = 0.0; + float FreeCpuLS = 0.0; void Init(i16 poolCount, const ISharedPool *shared); void Pull(const std::vector>& pools, const ISharedPool& shared);