From 7a4753482a2261e2088dbf7be017d8c8403976ea Mon Sep 17 00:00:00 2001 From: kruall Date: Thu, 19 Jun 2025 15:13:35 +0300 Subject: [PATCH 1/2] Improve behaviour in corner cases when used shared threads (#19693) --- .../actors/core/executor_pool_basic.cpp | 12 +- .../core/harmonizer/cpu_consumption.cpp | 61 +++-- .../actors/core/harmonizer/cpu_consumption.h | 13 + .../actors/core/harmonizer/harmonizer.cpp | 86 ++++--- .../actors/core/harmonizer/harmonizer.h | 4 + .../actors/core/harmonizer/shared_info.cpp | 6 + .../actors/core/harmonizer/shared_info.h | 3 + .../core/manual_test/shared_threads/main.cpp | 237 ++++++++++++++++++ .../core/manual_test/shared_threads/ya.make | 11 + ydb/library/actors/core/mon_stats.h | 10 + 10 files changed, 387 insertions(+), 56 deletions(-) create mode 100644 ydb/library/actors/core/manual_test/shared_threads/main.cpp create mode 100644 ydb/library/actors/core/manual_test/shared_threads/ya.make diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index e1182da7d923..d64bcec6183c 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -49,7 +49,7 @@ namespace NActors { constexpr TDuration TBasicExecutorPool::DEFAULT_TIME_PER_MAILBOX; - TString GetCurrentThreadKind() { + TString GetCurrentThreadKind() { if (TlsThreadContext) { return TlsThreadContext->WorkerId() >= 0 ? "[common]" : "[shared]"; } @@ -268,7 +268,7 @@ namespace NActors { if (StopFlag.load(std::memory_order_acquire)) { return nullptr; } - + TWorkerId workerId = TlsThreadContext->WorkerId(); EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, ""); NHPTimer::STime hpnow = GetCycleCountFast(); @@ -391,7 +391,7 @@ namespace NActors { EXECUTOR_POOL_BASIC_DEBUG(EDebugLevel::Activation, "shared pool wake up local threads"); return; } - } + } i16 sleepThreads = 0; Y_UNUSED(sleepThreads); @@ -507,6 +507,10 @@ namespace NActors { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); poolState.ElapsedCpu = stats.AvgElapsedCpu; poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount; + poolState.SharedCpuQuota = stats.SharedCpuQuota; + poolState.IsNeedy = stats.IsNeedy; + poolState.IsStarved = stats.IsStarved; + poolState.IsHoggish = stats.IsHoggish; } else { poolState.PossibleMaxLimit = poolState.MaxLimit; } @@ -675,7 +679,7 @@ namespace NActors { i16 TBasicExecutorPool::GetMaxFullThreadCount() const { return MaxFullThreadCount; } - + ui32 TBasicExecutorPool::GetThreads() const { return MaxFullThreadCount; } diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp index 9e9e714f1373..ace01c7caa85 100644 --- a/ydb/library/actors/core/harmonizer/cpu_consumption.cpp +++ b/ydb/library/actors/core/harmonizer/cpu_consumption.cpp @@ -14,6 +14,8 @@ void TCpuConsumptionInfo::Clear() { void THarmonizerCpuConsumption::Init(i16 poolCount) { PoolConsumption.resize(poolCount); + PoolFullThreadConsumption.resize(poolCount); + PoolForeignConsumption.resize(poolCount); IsNeedyByPool.reserve(poolCount); NeedyPools.reserve(poolCount); HoggishPools.reserve(poolCount); @@ -25,8 +27,9 @@ namespace { return Max(0.0, Min(1.0, value * (1.0/0.9))); } - void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption) { + void UpdatePoolConsumption(const TPoolInfo& pool, TCpuConsumptionInfo *poolConsumption, TCpuConsumptionInfo *poolFullThreadConsumption) { poolConsumption->Clear(); + poolFullThreadConsumption->Clear(); for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) { float threadElapsed = Rescale(pool.GetElapsed(threadIdx)); float threadLastSecondElapsed = Rescale(pool.GetLastSecondElapsed(threadIdx)); @@ -36,13 +39,17 @@ namespace { poolConsumption->LastSecondElapsed += threadLastSecondElapsed; poolConsumption->Cpu += threadCpu; poolConsumption->LastSecondCpu += threadLastSecondCpu; + poolFullThreadConsumption->Elapsed += threadElapsed; + poolFullThreadConsumption->LastSecondElapsed += threadLastSecondElapsed; + poolFullThreadConsumption->Cpu += threadCpu; + poolFullThreadConsumption->LastSecondCpu += threadLastSecondCpu; LWPROBE_WITH_DEBUG(HarmonizeCheckPoolByThread, pool.Pool->PoolId, pool.Pool->GetName(), threadIdx, threadElapsed, threadCpu, threadLastSecondElapsed, threadLastSecondCpu); } for (i16 sharedIdx = 0; sharedIdx < static_cast(pool.SharedInfo.size()); ++sharedIdx) { - float sharedElapsed = Rescale(pool.GetSharedElapsed(sharedIdx)); - float sharedLastSecondElapsed = Rescale(pool.GetLastSecondSharedElapsed(sharedIdx)); - float sharedCpu = Rescale(pool.GetSharedCpu(sharedIdx)); - float sharedLastSecondCpu = Rescale(pool.GetLastSecondSharedCpu(sharedIdx)); + float sharedElapsed = pool.GetSharedElapsed(sharedIdx); + float sharedLastSecondElapsed = pool.GetLastSecondSharedElapsed(sharedIdx); + float sharedCpu = pool.GetSharedCpu(sharedIdx); + float sharedLastSecondCpu = pool.GetLastSecondSharedCpu(sharedIdx); poolConsumption->Elapsed += sharedElapsed; poolConsumption->LastSecondElapsed += sharedLastSecondElapsed; poolConsumption->Cpu += sharedCpu; @@ -51,12 +58,19 @@ namespace { } } + void UpdatePoolForeignConsumption(const TPoolInfo& pool, TPoolForeignConsumptionInfo *poolForeignConsumption, const TSharedInfo& sharedInfo) { + float prevElapsed = std::exchange(poolForeignConsumption->PrevElapsedValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed); + float prevCpu = std::exchange(poolForeignConsumption->PrevCpuValue, sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu); + poolForeignConsumption->Elapsed = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignElapsed - prevElapsed; + poolForeignConsumption->Cpu = sharedInfo.CpuConsumption[pool.Pool->PoolId].ForeignCpu - prevCpu; + } + bool IsStarved(double elapsed, double cpu) { return Max(elapsed, cpu) > 0.1 && (cpu < elapsed * 0.7 || elapsed - cpu > 0.5); } bool IsHoggish(double elapsed, double currentThreadCount) { - return elapsed < currentThreadCount - 0.5; + return elapsed <= currentThreadCount - 1.0; } } // namespace @@ -82,12 +96,27 @@ void THarmonizerCpuConsumption::Pull(const std::vectorGetBlockingThreadCount(); HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "current thread count", currentThreadCount, "stopping threads", StoppingThreads, "default thread count", pool.DefaultThreadCount); - UpdatePoolConsumption(pool, &PoolConsumption[poolIdx]); - - HARMONIZER_DEBUG_PRINT("pool", poolIdx, "pool name", pool.Pool->GetName(), "elapsed", PoolConsumption[poolIdx].Elapsed, "cpu", PoolConsumption[poolIdx].Cpu, "last second elapsed", PoolConsumption[poolIdx].LastSecondElapsed, "last second cpu", PoolConsumption[poolIdx].LastSecondCpu); + UpdatePoolConsumption(pool, &PoolConsumption[poolIdx], &PoolFullThreadConsumption[poolIdx]); + UpdatePoolForeignConsumption(pool, &PoolForeignConsumption[poolIdx], sharedInfo); + + HARMONIZER_DEBUG_PRINT("CpuConsumption::Pull", + "pool:", poolIdx, + "pool name:", pool.Pool->GetName(), + "elapsed:", PoolConsumption[poolIdx].Elapsed, + "cpu:", PoolConsumption[poolIdx].Cpu, + "last second elapsed:", PoolConsumption[poolIdx].LastSecondElapsed, + "last second cpu:", PoolConsumption[poolIdx].LastSecondCpu, + "full thread elapsed:", PoolFullThreadConsumption[poolIdx].Elapsed, + "full thread cpu:", PoolFullThreadConsumption[poolIdx].Cpu, + "last second full thread elapsed:", PoolFullThreadConsumption[poolIdx].LastSecondElapsed, + "last second full thread cpu:", PoolFullThreadConsumption[poolIdx].LastSecondCpu, + "foreign elapsed:", PoolForeignConsumption[poolIdx].Elapsed, + "foreign cpu:", PoolForeignConsumption[poolIdx].Cpu + ); bool isStarved = IsStarved(PoolConsumption[poolIdx].Elapsed, PoolConsumption[poolIdx].Cpu) || IsStarved(PoolConsumption[poolIdx].LastSecondElapsed, PoolConsumption[poolIdx].LastSecondCpu); @@ -95,21 +124,23 @@ void THarmonizerCpuConsumption::Pull(const std::vector= currentThreadCount); + float expectedThreadCount = pool.GetFullThreadCount() + (sharedInfo.OwnedThreads[poolIdx] != -1 ? 1 : 0) + 0.5; + bool isMoreThanExpected = (PoolConsumption[poolIdx].LastSecondCpu >= expectedThreadCount) && (PoolFullThreadConsumption[poolIdx].LastSecondCpu >= currentFullThreadCount - 1); + bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && (PoolConsumption[poolIdx].LastSecondCpu >= currentThreadCount || isMoreThanExpected); IsNeedyByPool.push_back(isNeedy); if (isNeedy) { NeedyPools.push_back(poolIdx); } - bool isHoggish = IsHoggish(PoolConsumption[poolIdx].Elapsed, currentThreadCount); + bool isHoggish = !isNeedy && IsHoggish(PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount) && IsHoggish(PoolFullThreadConsumption[poolIdx].LastSecondElapsed, currentFullThreadCount); if (isHoggish) { - float freeCpu = currentThreadCount - PoolConsumption[poolIdx].Elapsed; + float freeCpu = std::min(currentFullThreadCount - PoolFullThreadConsumption[poolIdx].Elapsed, currentFullThreadCount - PoolFullThreadConsumption[poolIdx].LastSecondElapsed); HoggishPools.push_back({poolIdx, freeCpu}); } Elapsed += PoolConsumption[poolIdx].Elapsed; Cpu += PoolConsumption[poolIdx].Cpu; - LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed; + LastSecondElapsed += PoolConsumption[poolIdx].LastSecondElapsed; LastSecondCpu += PoolConsumption[poolIdx].LastSecondCpu; pool.LastFlags.store((i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2), std::memory_order_relaxed); LWPROBE_WITH_DEBUG( @@ -140,10 +171,12 @@ void THarmonizerCpuConsumption::Pull(const std::vector(0.0f, Elapsed - Cpu); - if (Budget < -0.1) { + if (BudgetLS < -0.1) { IsStarvedPresent = true; } HARMONIZER_DEBUG_PRINT("IsStarvedPresent", IsStarvedPresent, "Budget", Budget, "Overbooked", Overbooked, "TotalCores", TotalCores, "Elapsed", Elapsed, "Cpu", Cpu, "LastSecondElapsed", LastSecondElapsed, "LastSecondCpu", LastSecondCpu); diff --git a/ydb/library/actors/core/harmonizer/cpu_consumption.h b/ydb/library/actors/core/harmonizer/cpu_consumption.h index 8779cf1eb64a..8801dad4353a 100644 --- a/ydb/library/actors/core/harmonizer/cpu_consumption.h +++ b/ydb/library/actors/core/harmonizer/cpu_consumption.h @@ -16,15 +16,28 @@ struct TCpuConsumptionInfo { void Clear(); }; // struct TCpuConsumptionInfo +struct TPoolForeignConsumptionInfo { + float Elapsed; + float Cpu; + float PrevElapsedValue; + float PrevCpuValue; +}; // struct TPoolForeignConsumptionInfo + struct THarmonizerCpuConsumption { std::vector PoolConsumption; + std::vector PoolFullThreadConsumption; + std::vector PoolForeignConsumption; float TotalCores = 0; i16 AdditionalThreads = 0; i16 StoppingThreads = 0; bool IsStarvedPresent = false; + float Budget = 0.0; + float BudgetLS = 0.0; float BudgetWithoutSharedCpu = 0.0; + float BudgetLSWithoutSharedCpu = 0.0; + float Overbooked = 0.0; float LostCpu = 0.0; diff --git a/ydb/library/actors/core/harmonizer/harmonizer.cpp b/ydb/library/actors/core/harmonizer/harmonizer.cpp index e1cf16c3d1f7..cf467942b2c9 100644 --- a/ydb/library/actors/core/harmonizer/harmonizer.cpp +++ b/ydb/library/actors/core/harmonizer/harmonizer.cpp @@ -28,8 +28,6 @@ namespace NActors { -constexpr float kMinFreeCpuThreshold = 0.1; - LWTRACE_USING(ACTORLIB_PROVIDER); @@ -67,6 +65,8 @@ class THarmonizer: public IHarmonizer { void ProcessNeedyState(); void ProcessExchange(); void ProcessHoggishState(); + + void SetForeignThreadSlotsForCurrentFullThreadCount(ui16 poolIdx); public: THarmonizer(ui64 ts); virtual ~THarmonizer(); @@ -106,7 +106,7 @@ void THarmonizer::PullStats(ui64 ts) { SharedInfo.Pull(Pools, *Shared); } CpuConsumption.Pull(Pools, SharedInfo); - ProcessingBudget = CpuConsumption.BudgetWithoutSharedCpu; + ProcessingBudget = CpuConsumption.BudgetLSWithoutSharedCpu; } void THarmonizer::ProcessWaitingStats() { @@ -131,6 +131,16 @@ void THarmonizer::ProcessWaitingStats() { } } +void THarmonizer::SetForeignThreadSlotsForCurrentFullThreadCount(ui16 poolIdx) { + if (Shared) { + bool hasOwnSharedThread = SharedInfo.OwnedThreads[poolIdx] != -1; + i16 currentFullThreadCount = Pools[poolIdx]->GetFullThreadCount(); + i16 slots = Pools[poolIdx]->MaxThreadCount - currentFullThreadCount - hasOwnSharedThread; + i16 maxSlots = Shared->GetSharedThreadCount() - hasOwnSharedThread; + Shared->SetForeignThreadSlots(poolIdx, Min(slots, maxSlots)); + } +} + void THarmonizer::ProcessStarvedState() { HARMONIZER_DEBUG_PRINT("ProcessStarvedState"); HARMONIZER_DEBUG_PRINT("shared info", SharedInfo.ToString()); @@ -138,11 +148,19 @@ void THarmonizer::ProcessStarvedState() { TPoolInfo &pool = *Pools[poolIdx]; i64 threadCount = pool.GetFullThreadCount(); HARMONIZER_DEBUG_PRINT("poolIdx", poolIdx, "threadCount", threadCount, "pool.DefaultFullThreadCount", pool.DefaultFullThreadCount); + if (CpuConsumption.PoolConsumption[poolIdx].Elapsed > pool.GetThreadCount()) { + continue; + } + i16 maxSharedCpuQuota = i16(SharedInfo.OwnedThreads[poolIdx] != -1) + SharedInfo.ForeignThreadsAllowed[poolIdx]; + if (SharedInfo.CpuConsumption[poolIdx].CpuQuota > maxSharedCpuQuota) { + continue; + } while (threadCount > pool.DefaultFullThreadCount) { pool.SetFullThreadCount(--threadCount); pool.DecreasingThreadsByStarvedState.fetch_add(1, std::memory_order_relaxed); CpuConsumption.AdditionalThreads--; CpuConsumption.StoppingThreads++; + SetForeignThreadSlotsForCurrentFullThreadCount(poolIdx); LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); if (CpuConsumption.Overbooked <= CpuConsumption.StoppingThreads) { @@ -166,21 +184,20 @@ void THarmonizer::ProcessNeedyState() { if (!CpuConsumption.IsNeedyByPool[needyPoolIdx]) { continue; } + float fullThreadCount = pool.GetFullThreadCount(); float threadCount = pool.GetFullThreadCount() + SharedInfo.CpuConsumption[needyPoolIdx].CpuQuota; - if (ProcessingBudget >= 1.0 && threadCount + 1 <= pool.MaxFullThreadCount && SharedInfo.FreeCpu < kMinFreeCpuThreshold) { + float foreignElapsed = CpuConsumption.PoolForeignConsumption[needyPoolIdx].Elapsed; + float extraForeignElapsed = std::max(0.0f, foreignElapsed - std::max(0, SharedInfo.ForeignThreadsAllowed[needyPoolIdx] - 1)); + bool allowedNextThreadCount = pool.GetFullThreadCount() + 1 <= pool.MaxFullThreadCount; + if (ProcessingBudget > 0.0 && ProcessingBudget + extraForeignElapsed >= 1.0 && allowedNextThreadCount) { pool.IncreasingThreadsByNeedyState.fetch_add(1, std::memory_order_relaxed); CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; CpuConsumption.AdditionalThreads++; - pool.SetFullThreadCount(threadCount + 1); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[needyPoolIdx] != -1; - i16 slots = pool.MaxThreadCount - threadCount - 1 - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(needyPoolIdx, Min(slots, maxSlots)); - } + pool.SetFullThreadCount(fullThreadCount + 1); + SetForeignThreadSlotsForCurrentFullThreadCount(needyPoolIdx); ProcessingBudget -= 1.0; - LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); + LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", fullThreadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } if (pool.MaxLocalQueueSize) { bool needToExpandLocalQueue = ProcessingBudget < 1.0 || threadCount >= pool.MaxFullThreadCount; @@ -223,12 +240,7 @@ void THarmonizer::ProcessExchange() { CpuConsumption.IsNeedyByPool[needyPoolIdx] = false; takingAwayThreads++; pool.SetFullThreadCount(fullThreadCount + 1); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[needyPoolIdx] != -1; - i16 slots = pool.MaxThreadCount - fullThreadCount - 1 - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(needyPoolIdx, Min(slots, maxSlots)); - } + SetForeignThreadSlotsForCurrentFullThreadCount(needyPoolIdx); LWPROBE_WITH_DEBUG(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } @@ -248,12 +260,7 @@ void THarmonizer::ProcessExchange() { } takingAwayThreads -= currentTakingAwayThreads; pool.SetFullThreadCount(fullThreadCount - currentTakingAwayThreads); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[poolIdx] != -1; - i16 slots = pool.MaxThreadCount - fullThreadCount + currentTakingAwayThreads - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(poolIdx, Min(slots, maxSlots)); - } + SetForeignThreadSlotsForCurrentFullThreadCount(poolIdx); pool.DecreasingThreadsByExchange.fetch_add(currentTakingAwayThreads, std::memory_order_relaxed); LWPROBE_WITH_DEBUG(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", fullThreadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); @@ -268,12 +275,7 @@ void THarmonizer::ProcessHoggishState() { if (fullThreadCount > pool.MinFullThreadCount && freeCpu >= 1) { pool.DecreasingThreadsByHoggishState.fetch_add(1, std::memory_order_relaxed); pool.SetFullThreadCount(fullThreadCount - 1); - if (Shared) { - bool hasOwnSharedThread = SharedInfo.OwnedThreads[hoggishPoolIdx] != -1; - i16 slots = pool.MaxThreadCount - fullThreadCount + 1 - hasOwnSharedThread; - i16 maxSlots = Shared->GetSharedThreadCount(); - Shared->SetForeignThreadSlots(hoggishPoolIdx, Min(slots, maxSlots)); - } + SetForeignThreadSlotsForCurrentFullThreadCount(hoggishPoolIdx); LWPROBE_WITH_DEBUG(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", fullThreadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount); } if (pool.BasicPool && pool.LocalQueueSize > pool.MinLocalQueueSize) { @@ -334,11 +336,15 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { float poolSharedElapsedCpu = SharedInfo.CpuConsumption[poolIdx].Elapsed; possibleMaxSharedQuota = std::min(poolSharedElapsedCpu + freeSharedCpu, sharedThreads); } - float threadCount = pool.GetFullThreadCount(); - float elapsedCpu = pool.ElapsedCpu.GetAvgPart(); - float parkedCpu = Max(0.0f, threadCount - elapsedCpu); + + float fullThreadCount = pool.GetFullThreadCount(); + float elapsedCpu = CpuConsumption.PoolFullThreadConsumption[poolIdx].Elapsed; + float parkedCpu = Max(0.0f, fullThreadCount - elapsedCpu); float budgetWithoutSharedAndParkedCpu = std::max(0.0f, budgetWithoutSharedCpu - parkedCpu); - i16 potentialMaxThreadCountWithoutSharedCpu = std::min(pool.MaxThreadCount, threadCount + budgetWithoutSharedAndParkedCpu); + float potentialMaxThreadCountWithoutSharedCpu = std::min(pool.MaxThreadCount, fullThreadCount + budgetWithoutSharedAndParkedCpu); + if (!Shared) { + potentialMaxThreadCountWithoutSharedCpu = std::floor(potentialMaxThreadCountWithoutSharedCpu); + } float potentialMaxThreadCount = std::min(pool.MaxThreadCount, potentialMaxThreadCountWithoutSharedCpu + possibleMaxSharedQuota); pool.PotentialMaxThreadCount.store(potentialMaxThreadCount, std::memory_order_relaxed); @@ -350,7 +356,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { "potential max thread count: ", potentialMaxThreadCount, "potential max thread count without shared cpu: ", potentialMaxThreadCountWithoutSharedCpu, "possible max shared quota: ", possibleMaxSharedQuota, - "thread count: ", threadCount, + "thread count: ", fullThreadCount, "elapsed cpu: ", elapsedCpu, "parked cpu: ", parkedCpu ); @@ -434,7 +440,7 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo, bool ign TVector ownedThreads(Pools.size(), -1); Shared->FillOwnedThreads(ownedThreads); bool hasOwnSharedThread = ownedThreads[pool->PoolId] != -1; - Shared->SetForeignThreadSlots(pool->PoolId, Min(poolInfo.MaxThreadCount - hasOwnSharedThread, Shared->GetSharedThreadCount())); + Shared->SetForeignThreadSlots(pool->PoolId, Min(poolInfo.MaxThreadCount, Shared->GetSharedThreadCount()) - hasOwnSharedThread); } if (pingInfo) { poolInfo.AvgPingCounter = pingInfo->AvgPingCounter; @@ -491,6 +497,8 @@ THarmonizerStats THarmonizer::GetStats() const { .MinElapsedCpu = MinElapsedCpu.load(std::memory_order_relaxed), .AvgAwakeningTimeUs = WaitingInfo.AvgAwakeningTimeUs.load(std::memory_order_relaxed), .AvgWakingUpTimeUs = WaitingInfo.AvgWakingUpTimeUs.load(std::memory_order_relaxed), + .Budget = CpuConsumption.Budget, + .SharedFreeCpu = SharedInfo.FreeCpu, }; } @@ -499,7 +507,7 @@ void THarmonizer::SetSharedPool(ISharedPool* pool) { } TString TPoolHarmonizerStats::ToString() const { - return TStringBuilder() << '{' + return TStringBuilder() << '{' << "IncreasingThreadsByNeedyState: " << IncreasingThreadsByNeedyState << ", " << "IncreasingThreadsByExchange: " << IncreasingThreadsByExchange << ", " << "DecreasingThreadsByStarvedState: " << DecreasingThreadsByStarvedState << ", " @@ -531,7 +539,9 @@ TString THarmonizerStats::ToString() const { << "MaxElapsedCpu: " << MaxElapsedCpu << ", " << "MinElapsedCpu: " << MinElapsedCpu << ", " << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << ", " - << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << '}'; + << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << ", " + << "Budget: " << Budget << ", " + << "SharedFreeCpu: " << SharedFreeCpu << '}'; } } diff --git a/ydb/library/actors/core/harmonizer/harmonizer.h b/ydb/library/actors/core/harmonizer/harmonizer.h index 40b91e439e12..f7f5ff8a96be 100644 --- a/ydb/library/actors/core/harmonizer/harmonizer.h +++ b/ydb/library/actors/core/harmonizer/harmonizer.h @@ -6,6 +6,7 @@ namespace NActors { class IExecutorPool; class ISharedPool; struct TSelfPingInfo; + struct THarmonizerIterationState; template struct TWaitingStats; @@ -48,6 +49,9 @@ namespace NActors { float AvgAwakeningTimeUs = 0; float AvgWakingUpTimeUs = 0; + float Budget = 0.0; + float SharedFreeCpu = 0.0; + TString ToString() const; }; diff --git a/ydb/library/actors/core/harmonizer/shared_info.cpp b/ydb/library/actors/core/harmonizer/shared_info.cpp index d6d6b966db39..2b153aac7d65 100644 --- a/ydb/library/actors/core/harmonizer/shared_info.cpp +++ b/ydb/library/actors/core/harmonizer/shared_info.cpp @@ -29,6 +29,10 @@ void TSharedInfo::Pull(const std::vector> &pools, con parked -= CpuConsumptionPerThread[poolId][threadId].Elapsed; CpuConsumption[poolId].Elapsed += CpuConsumptionPerThread[poolId][threadId].Elapsed; CpuConsumption[poolId].Cpu += CpuConsumptionPerThread[poolId][threadId].Cpu; + if (poolId != ThreadOwners[threadId]) { + CpuConsumption[poolId].ForeignElapsed += CpuConsumptionPerThread[poolId][threadId].Elapsed; + CpuConsumption[poolId].ForeignCpu += CpuConsumptionPerThread[poolId][threadId].Cpu; + } } CpuConsumption[ThreadOwners[threadId]].CpuQuota += parked; FreeCpu += parked; @@ -95,6 +99,8 @@ TString TPoolSharedThreadCpuConsumption::ToString() const { builder << " Elapsed: " << Elapsed << "; "; builder << " Cpu: " << Cpu << "; "; builder << " CpuQuota: " << CpuQuota << "; "; + builder << " ForeignElapsed: " << ForeignElapsed << "; "; + builder << " ForeignCpu: " << ForeignCpu << "; "; builder << "}"; return builder; } diff --git a/ydb/library/actors/core/harmonizer/shared_info.h b/ydb/library/actors/core/harmonizer/shared_info.h index 382d1a3ce096..5e5a08107630 100644 --- a/ydb/library/actors/core/harmonizer/shared_info.h +++ b/ydb/library/actors/core/harmonizer/shared_info.h @@ -13,6 +13,8 @@ struct TPoolSharedThreadCpuConsumption { float Elapsed = 0; float Cpu = 0; float CpuQuota = 0; + float ForeignElapsed = 0; + float ForeignCpu = 0; TString ToString() const; }; @@ -34,6 +36,7 @@ struct TSharedInfo { std::vector> CpuConsumptionPerThread; TVector ThreadStats; // for pulling only float FreeCpu = 0.0; + float FreeCpuLS = 0.0; void Init(i16 poolCount, const ISharedPool *shared); void Pull(const std::vector>& pools, const ISharedPool& shared); diff --git a/ydb/library/actors/core/manual_test/shared_threads/main.cpp b/ydb/library/actors/core/manual_test/shared_threads/main.cpp new file mode 100644 index 000000000000..59df53c0ccc0 --- /dev/null +++ b/ydb/library/actors/core/manual_test/shared_threads/main.cpp @@ -0,0 +1,237 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace NActors; + +static TProgramShouldContinue ShouldContinue; + +void OnTerminate(int) { + ShouldContinue.ShouldStop(); +} + +struct TSharedState { + std::atomic PrevSmallTaskEndTs; + std::atomic MaxSmallTaskInterval; +}; + +class TWorkerActor : public TActorBootstrapped { +public: + TWorkerActor(TDuration duration, std::function onFinish) + : _duration(duration) + , _onFinish(onFinish) + {} + + void Work() { + ui64 sum = 0; + ui64 end = GetCycleCountFast() + Us2Ts(_duration.MicroSeconds()); + + while (GetCycleCountFast() < end) { + for (auto& item : _data) { + sum += item; + } + for (auto& item : _data) { + item += sum++; + } + } + _onFinish(); + } + + void Bootstrap() { + ui64 i = 0; + for (auto& item : _data) { + item = i++; + } + Work(); + PassAway(); + } + +private: + std::array _data; + TDuration _duration; + std::function _onFinish; +}; + +class TSchedulerActor : public TActorBootstrapped { +public: + struct TWorkerConfig { + i16 PoolId; + TDuration Duration; + }; + + TSchedulerActor(TDuration interval, std::vector workers, std::function onFinish) + : _interval(interval) + , _workers(workers) + , _onFinish(onFinish) + {} + + void Bootstrap() { + Become(&TSchedulerActor::StateWork); + HandleWakeUp(); + } + + void CreateWorkers() { + for (auto& worker : _workers) { + Register(new TWorkerActor(worker.Duration, _onFinish), TMailboxType::HTSwap, worker.PoolId); + } + } + + void HandleWakeUp() { + CreateWorkers(); + Schedule(_interval, new NActors::TEvents::TEvWakeup()); + } + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + sFunc(NActors::TEvents::TEvWakeup, HandleWakeUp); + } + } + +private: + TDuration _interval; + std::vector _workers; + std::function _onFinish; +}; + +class TSmallTaskActor : public TSchedulerActor { +public: + TSmallTaskActor(std::function onFinish) + : TSchedulerActor(TDuration::MilliSeconds(10), { + {0, TDuration::MilliSeconds(1)} + }, onFinish) + {} + + void Bootstrap() { + TSchedulerActor::Bootstrap(); + } +}; + +class THugeTaskActor : public TSchedulerActor { +public: + THugeTaskActor(std::function onFinish) + : TSchedulerActor(TDuration::MilliSeconds(500), { + {0, TDuration::MilliSeconds(100)}, + {0, TDuration::MilliSeconds(100)}, + {0, TDuration::MilliSeconds(100)}, + {0, TDuration::MilliSeconds(100)}, + {0, TDuration::MilliSeconds(100)}, + {0, TDuration::MilliSeconds(100)}, + {1, TDuration::MilliSeconds(100)}, + {1, TDuration::MilliSeconds(100)}, + {1, TDuration::MilliSeconds(100)}, + }, onFinish) + {} +}; + +THolder BuildActorSystemSetup(bool useSharedThread = true) { + auto setup = MakeHolder(); + + setup->NodeId = 1; + setup->CpuManager.Basic.emplace_back(TBasicExecutorPoolConfig{ + .PoolId = 0, + .Threads = 3, + .SpinThreshold = 0, + .MinThreadCount = 1, + .MaxThreadCount = 3, + .DefaultThreadCount = 1, + .HasSharedThread = useSharedThread, + }); + setup->CpuManager.Basic.emplace_back(TBasicExecutorPoolConfig{ + .PoolId = 1, + .Threads = 3, + .SpinThreshold = 0, + .MinThreadCount = 2, + .MaxThreadCount = 4, + .DefaultThreadCount = 2, + .HasSharedThread = useSharedThread, + }); + + setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0)); + + return setup; +} + +int main(int argc, char **argv) { + bool useSharedThread = true; + if (argc > 1) { + if (argv[1] == std::string("--no-shared-thread")) { + useSharedThread = false; + } else { + Cout << "Usage: " << argv[0] << " [--no-shared-thread]" << Endl; + return 1; + } + } + +#ifdef _unix_ + signal(SIGPIPE, SIG_IGN); +#endif + signal(SIGINT, &OnTerminate); + signal(SIGTERM, &OnTerminate); + + THolder actorSystemSetup = BuildActorSystemSetup(useSharedThread); + TActorSystem actorSystem(actorSystemSetup); + + actorSystem.Start(); + + TSharedState state; + state.PrevSmallTaskEndTs.store(GetCycleCountFast()); + actorSystem.Register(new TSmallTaskActor([&state]() { + ui64 current = GetCycleCountFast(); + ui64 prev = state.PrevSmallTaskEndTs.exchange(current); + ui64 interval = current - prev; + for (;;) { + ui64 maxInterval = state.MaxSmallTaskInterval.load(); + if (interval > maxInterval) { + if (state.MaxSmallTaskInterval.compare_exchange_weak(maxInterval, interval)) { + break; + } + } else { + break; + } + } + })); + actorSystem.Register(new THugeTaskActor([]{})); + + auto printState = [&](i16 poolId) { + TExecutorPoolState state; + actorSystem.GetExecutorPoolState(poolId, state); + TStringBuilder flagBuilder; + if (state.IsNeedy) { + flagBuilder << "N"; + } + if (state.IsHoggish) { + flagBuilder << "H"; + } + if (state.IsStarved) { + flagBuilder << "S"; + } + if (!flagBuilder.empty()) { + flagBuilder << " "; + } + Cout << "Pool " << poolId << " state: " << flagBuilder << state.ElapsedCpu << " " << state.CurrentLimit << "/" << state.PossibleMaxLimit << "(" << state.MaxLimit << ") foreign shared elapsed cpu: " << state.SharedCpuQuota - 1 << Endl; + }; + + ui64 seconds = 0; + while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) { + Sleep(TDuration::MilliSeconds(1000)); + ui64 maxInterval = state.MaxSmallTaskInterval.exchange(0); + Cout << "--------------------------------" << Endl; + Cout << "Seconds: " << seconds++ << Endl; + Cout << "Max small task interval: " << std::round(Ts2Us(maxInterval)) << " us" << Endl; + printState(0); + printState(1); + THarmonizerStats stats = actorSystem.GetHarmonizerStats(); + Cout << "Harmonizer stats: budget: " << stats.Budget << " shared free cpu: " << stats.SharedFreeCpu << Endl; + } + + actorSystem.Stop(); + actorSystem.Cleanup(); + + return ShouldContinue.GetReturnCode(); +} diff --git a/ydb/library/actors/core/manual_test/shared_threads/ya.make b/ydb/library/actors/core/manual_test/shared_threads/ya.make new file mode 100644 index 000000000000..679b56e6e3dc --- /dev/null +++ b/ydb/library/actors/core/manual_test/shared_threads/ya.make @@ -0,0 +1,11 @@ +PROGRAM(shared_threads) + +SRCS( + main.cpp +) + +PEERDIR( + ydb/library/actors/core +) + +END() diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h index cb41f03d1221..10dd03ff6206 100644 --- a/ydb/library/actors/core/mon_stats.h +++ b/ydb/library/actors/core/mon_stats.h @@ -42,17 +42,27 @@ namespace NActors { struct TExecutorPoolState { float ElapsedCpu = 0; + float SharedCpuQuota = 0; float CurrentLimit = 0; float PossibleMaxLimit = 0; float MaxLimit = 0; float MinLimit = 0; + i8 IsNeedy = 0; + i8 IsStarved = 0; + i8 IsHoggish = 0; + void Aggregate(const TExecutorPoolState& other) { ElapsedCpu += other.ElapsedCpu; + SharedCpuQuota += other.SharedCpuQuota; CurrentLimit += other.CurrentLimit; PossibleMaxLimit += other.PossibleMaxLimit; MaxLimit += other.MaxLimit; MinLimit += other.MinLimit; + + IsNeedy += other.IsNeedy; + IsStarved += other.IsStarved; + IsHoggish += other.IsHoggish; } }; From 47c79e8c89884f8d478e2f4cb8dc7e73874d587c Mon Sep 17 00:00:00 2001 From: Kriukov Aleksandr Date: Tue, 8 Jul 2025 15:46:55 +0300 Subject: [PATCH 2/2] remove manual test --- .../actors/core/executor_pool_basic.cpp | 4 - .../actors/core/harmonizer/harmonizer.cpp | 5 +- .../actors/core/harmonizer/harmonizer.h | 3 - .../core/manual_test/shared_threads/main.cpp | 237 ------------------ .../core/manual_test/shared_threads/ya.make | 11 - ydb/library/actors/core/mon_stats.h | 10 - 6 files changed, 1 insertion(+), 269 deletions(-) delete mode 100644 ydb/library/actors/core/manual_test/shared_threads/main.cpp delete mode 100644 ydb/library/actors/core/manual_test/shared_threads/ya.make diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index d64bcec6183c..fdceebc853db 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -507,10 +507,6 @@ namespace NActors { TPoolHarmonizerStats stats = Harmonizer->GetPoolStats(PoolId); poolState.ElapsedCpu = stats.AvgElapsedCpu; poolState.PossibleMaxLimit = stats.PotentialMaxThreadCount; - poolState.SharedCpuQuota = stats.SharedCpuQuota; - poolState.IsNeedy = stats.IsNeedy; - poolState.IsStarved = stats.IsStarved; - poolState.IsHoggish = stats.IsHoggish; } else { poolState.PossibleMaxLimit = poolState.MaxLimit; } diff --git a/ydb/library/actors/core/harmonizer/harmonizer.cpp b/ydb/library/actors/core/harmonizer/harmonizer.cpp index cf467942b2c9..4dd985d75698 100644 --- a/ydb/library/actors/core/harmonizer/harmonizer.cpp +++ b/ydb/library/actors/core/harmonizer/harmonizer.cpp @@ -497,8 +497,6 @@ THarmonizerStats THarmonizer::GetStats() const { .MinElapsedCpu = MinElapsedCpu.load(std::memory_order_relaxed), .AvgAwakeningTimeUs = WaitingInfo.AvgAwakeningTimeUs.load(std::memory_order_relaxed), .AvgWakingUpTimeUs = WaitingInfo.AvgWakingUpTimeUs.load(std::memory_order_relaxed), - .Budget = CpuConsumption.Budget, - .SharedFreeCpu = SharedInfo.FreeCpu, }; } @@ -540,8 +538,7 @@ TString THarmonizerStats::ToString() const { << "MinElapsedCpu: " << MinElapsedCpu << ", " << "AvgAwakeningTimeUs: " << AvgAwakeningTimeUs << ", " << "AvgWakingUpTimeUs: " << AvgWakingUpTimeUs << ", " - << "Budget: " << Budget << ", " - << "SharedFreeCpu: " << SharedFreeCpu << '}'; + << '}'; } } diff --git a/ydb/library/actors/core/harmonizer/harmonizer.h b/ydb/library/actors/core/harmonizer/harmonizer.h index f7f5ff8a96be..5d1886f7692c 100644 --- a/ydb/library/actors/core/harmonizer/harmonizer.h +++ b/ydb/library/actors/core/harmonizer/harmonizer.h @@ -49,9 +49,6 @@ namespace NActors { float AvgAwakeningTimeUs = 0; float AvgWakingUpTimeUs = 0; - float Budget = 0.0; - float SharedFreeCpu = 0.0; - TString ToString() const; }; diff --git a/ydb/library/actors/core/manual_test/shared_threads/main.cpp b/ydb/library/actors/core/manual_test/shared_threads/main.cpp deleted file mode 100644 index 59df53c0ccc0..000000000000 --- a/ydb/library/actors/core/manual_test/shared_threads/main.cpp +++ /dev/null @@ -1,237 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace NActors; - -static TProgramShouldContinue ShouldContinue; - -void OnTerminate(int) { - ShouldContinue.ShouldStop(); -} - -struct TSharedState { - std::atomic PrevSmallTaskEndTs; - std::atomic MaxSmallTaskInterval; -}; - -class TWorkerActor : public TActorBootstrapped { -public: - TWorkerActor(TDuration duration, std::function onFinish) - : _duration(duration) - , _onFinish(onFinish) - {} - - void Work() { - ui64 sum = 0; - ui64 end = GetCycleCountFast() + Us2Ts(_duration.MicroSeconds()); - - while (GetCycleCountFast() < end) { - for (auto& item : _data) { - sum += item; - } - for (auto& item : _data) { - item += sum++; - } - } - _onFinish(); - } - - void Bootstrap() { - ui64 i = 0; - for (auto& item : _data) { - item = i++; - } - Work(); - PassAway(); - } - -private: - std::array _data; - TDuration _duration; - std::function _onFinish; -}; - -class TSchedulerActor : public TActorBootstrapped { -public: - struct TWorkerConfig { - i16 PoolId; - TDuration Duration; - }; - - TSchedulerActor(TDuration interval, std::vector workers, std::function onFinish) - : _interval(interval) - , _workers(workers) - , _onFinish(onFinish) - {} - - void Bootstrap() { - Become(&TSchedulerActor::StateWork); - HandleWakeUp(); - } - - void CreateWorkers() { - for (auto& worker : _workers) { - Register(new TWorkerActor(worker.Duration, _onFinish), TMailboxType::HTSwap, worker.PoolId); - } - } - - void HandleWakeUp() { - CreateWorkers(); - Schedule(_interval, new NActors::TEvents::TEvWakeup()); - } - - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - sFunc(NActors::TEvents::TEvWakeup, HandleWakeUp); - } - } - -private: - TDuration _interval; - std::vector _workers; - std::function _onFinish; -}; - -class TSmallTaskActor : public TSchedulerActor { -public: - TSmallTaskActor(std::function onFinish) - : TSchedulerActor(TDuration::MilliSeconds(10), { - {0, TDuration::MilliSeconds(1)} - }, onFinish) - {} - - void Bootstrap() { - TSchedulerActor::Bootstrap(); - } -}; - -class THugeTaskActor : public TSchedulerActor { -public: - THugeTaskActor(std::function onFinish) - : TSchedulerActor(TDuration::MilliSeconds(500), { - {0, TDuration::MilliSeconds(100)}, - {0, TDuration::MilliSeconds(100)}, - {0, TDuration::MilliSeconds(100)}, - {0, TDuration::MilliSeconds(100)}, - {0, TDuration::MilliSeconds(100)}, - {0, TDuration::MilliSeconds(100)}, - {1, TDuration::MilliSeconds(100)}, - {1, TDuration::MilliSeconds(100)}, - {1, TDuration::MilliSeconds(100)}, - }, onFinish) - {} -}; - -THolder BuildActorSystemSetup(bool useSharedThread = true) { - auto setup = MakeHolder(); - - setup->NodeId = 1; - setup->CpuManager.Basic.emplace_back(TBasicExecutorPoolConfig{ - .PoolId = 0, - .Threads = 3, - .SpinThreshold = 0, - .MinThreadCount = 1, - .MaxThreadCount = 3, - .DefaultThreadCount = 1, - .HasSharedThread = useSharedThread, - }); - setup->CpuManager.Basic.emplace_back(TBasicExecutorPoolConfig{ - .PoolId = 1, - .Threads = 3, - .SpinThreshold = 0, - .MinThreadCount = 2, - .MaxThreadCount = 4, - .DefaultThreadCount = 2, - .HasSharedThread = useSharedThread, - }); - - setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0)); - - return setup; -} - -int main(int argc, char **argv) { - bool useSharedThread = true; - if (argc > 1) { - if (argv[1] == std::string("--no-shared-thread")) { - useSharedThread = false; - } else { - Cout << "Usage: " << argv[0] << " [--no-shared-thread]" << Endl; - return 1; - } - } - -#ifdef _unix_ - signal(SIGPIPE, SIG_IGN); -#endif - signal(SIGINT, &OnTerminate); - signal(SIGTERM, &OnTerminate); - - THolder actorSystemSetup = BuildActorSystemSetup(useSharedThread); - TActorSystem actorSystem(actorSystemSetup); - - actorSystem.Start(); - - TSharedState state; - state.PrevSmallTaskEndTs.store(GetCycleCountFast()); - actorSystem.Register(new TSmallTaskActor([&state]() { - ui64 current = GetCycleCountFast(); - ui64 prev = state.PrevSmallTaskEndTs.exchange(current); - ui64 interval = current - prev; - for (;;) { - ui64 maxInterval = state.MaxSmallTaskInterval.load(); - if (interval > maxInterval) { - if (state.MaxSmallTaskInterval.compare_exchange_weak(maxInterval, interval)) { - break; - } - } else { - break; - } - } - })); - actorSystem.Register(new THugeTaskActor([]{})); - - auto printState = [&](i16 poolId) { - TExecutorPoolState state; - actorSystem.GetExecutorPoolState(poolId, state); - TStringBuilder flagBuilder; - if (state.IsNeedy) { - flagBuilder << "N"; - } - if (state.IsHoggish) { - flagBuilder << "H"; - } - if (state.IsStarved) { - flagBuilder << "S"; - } - if (!flagBuilder.empty()) { - flagBuilder << " "; - } - Cout << "Pool " << poolId << " state: " << flagBuilder << state.ElapsedCpu << " " << state.CurrentLimit << "/" << state.PossibleMaxLimit << "(" << state.MaxLimit << ") foreign shared elapsed cpu: " << state.SharedCpuQuota - 1 << Endl; - }; - - ui64 seconds = 0; - while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) { - Sleep(TDuration::MilliSeconds(1000)); - ui64 maxInterval = state.MaxSmallTaskInterval.exchange(0); - Cout << "--------------------------------" << Endl; - Cout << "Seconds: " << seconds++ << Endl; - Cout << "Max small task interval: " << std::round(Ts2Us(maxInterval)) << " us" << Endl; - printState(0); - printState(1); - THarmonizerStats stats = actorSystem.GetHarmonizerStats(); - Cout << "Harmonizer stats: budget: " << stats.Budget << " shared free cpu: " << stats.SharedFreeCpu << Endl; - } - - actorSystem.Stop(); - actorSystem.Cleanup(); - - return ShouldContinue.GetReturnCode(); -} diff --git a/ydb/library/actors/core/manual_test/shared_threads/ya.make b/ydb/library/actors/core/manual_test/shared_threads/ya.make deleted file mode 100644 index 679b56e6e3dc..000000000000 --- a/ydb/library/actors/core/manual_test/shared_threads/ya.make +++ /dev/null @@ -1,11 +0,0 @@ -PROGRAM(shared_threads) - -SRCS( - main.cpp -) - -PEERDIR( - ydb/library/actors/core -) - -END() diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h index 10dd03ff6206..cb41f03d1221 100644 --- a/ydb/library/actors/core/mon_stats.h +++ b/ydb/library/actors/core/mon_stats.h @@ -42,27 +42,17 @@ namespace NActors { struct TExecutorPoolState { float ElapsedCpu = 0; - float SharedCpuQuota = 0; float CurrentLimit = 0; float PossibleMaxLimit = 0; float MaxLimit = 0; float MinLimit = 0; - i8 IsNeedy = 0; - i8 IsStarved = 0; - i8 IsHoggish = 0; - void Aggregate(const TExecutorPoolState& other) { ElapsedCpu += other.ElapsedCpu; - SharedCpuQuota += other.SharedCpuQuota; CurrentLimit += other.CurrentLimit; PossibleMaxLimit += other.PossibleMaxLimit; MaxLimit += other.MaxLimit; MinLimit += other.MinLimit; - - IsNeedy += other.IsNeedy; - IsStarved += other.IsStarved; - IsHoggish += other.IsHoggish; } };