28
28
29
29
namespace NActors {
30
30
31
-
31
+ constexpr float kMinFreeCpuThreshold = 0.1 ;
32
32
33
33
LWTRACE_USING (ACTORLIB_PROVIDER);
34
34
@@ -73,7 +73,7 @@ class THarmonizer: public IHarmonizer {
73
73
double Rescale (double value) const ;
74
74
void Harmonize (ui64 ts) override ;
75
75
void DeclareEmergency (ui64 ts) override ;
76
- void AddPool (IExecutorPool* pool, TSelfPingInfo *pingInfo) override ;
76
+ void AddPool (IExecutorPool* pool, TSelfPingInfo *pingInfo, bool ignoreFullThreadQuota = false ) override ;
77
77
void Enable (bool enable) override ;
78
78
TPoolHarmonizerStats GetPoolStats (i16 poolId) const override ;
79
79
THarmonizerStats GetStats () const override ;
@@ -103,10 +103,10 @@ void THarmonizer::PullStats(ui64 ts) {
103
103
104
104
WaitingInfo.Pull (Pools);
105
105
if (Shared) {
106
- SharedInfo.Pull (*Shared);
106
+ SharedInfo.Pull (Pools, *Shared);
107
107
}
108
108
CpuConsumption.Pull (Pools, SharedInfo);
109
- ProcessingBudget = CpuConsumption.Budget ;
109
+ ProcessingBudget = CpuConsumption.BudgetWithoutSharedCpu ;
110
110
}
111
111
112
112
void THarmonizer::ProcessWaitingStats () {
@@ -168,14 +168,17 @@ void THarmonizer::ProcessNeedyState() {
168
168
}
169
169
float threadCount = pool.GetFullThreadCount () + SharedInfo.CpuConsumption [needyPoolIdx].CpuQuota ;
170
170
171
- if (Shared && SharedInfo.ForeignThreadsAllowed [needyPoolIdx] == 0 ) {
172
- Shared->SetForeignThreadSlots (needyPoolIdx, static_cast <i16 >(Pools.size ()));
173
- CpuConsumption.IsNeedyByPool [needyPoolIdx] = false ;
174
- } else if (ProcessingBudget >= 1.0 && threadCount + 1 <= pool.MaxFullThreadCount ) {
171
+ if (ProcessingBudget >= 1.0 && threadCount + 1 <= pool.MaxFullThreadCount && SharedInfo.FreeCpu < kMinFreeCpuThreshold ) {
175
172
pool.IncreasingThreadsByNeedyState .fetch_add (1 , std::memory_order_relaxed);
176
173
CpuConsumption.IsNeedyByPool [needyPoolIdx] = false ;
177
174
CpuConsumption.AdditionalThreads ++;
178
175
pool.SetFullThreadCount (threadCount + 1 );
176
+ if (Shared) {
177
+ bool hasOwnSharedThread = SharedInfo.OwnedThreads [needyPoolIdx] != -1 ;
178
+ i16 slots = pool.MaxThreadCount - threadCount - 1 - hasOwnSharedThread;
179
+ i16 maxSlots = Shared->GetSharedThreadCount ();
180
+ Shared->SetForeignThreadSlots (needyPoolIdx, Min<i16 >(slots, maxSlots));
181
+ }
179
182
ProcessingBudget -= 1.0 ;
180
183
LWPROBE_WITH_DEBUG (HarmonizeOperation, needyPoolIdx, pool.Pool ->GetName (), " increase by needs" , threadCount + 1 , pool.DefaultFullThreadCount , pool.MaxFullThreadCount );
181
184
}
@@ -201,16 +204,16 @@ void THarmonizer::ProcessExchange() {
201
204
size_t sumOfAdditionalThreads = CpuConsumption.AdditionalThreads ;
202
205
for (size_t needyPoolIdx : CpuConsumption.NeedyPools ) {
203
206
TPoolInfo &pool = *Pools[needyPoolIdx];
204
- i64 threadCount = pool.GetFullThreadCount ();
207
+ i64 fullThreadCount = pool.GetFullThreadCount ();
208
+ float threadCount = fullThreadCount + SharedInfo.CpuConsumption [needyPoolIdx].CpuQuota ;
205
209
206
- sumOfAdditionalThreads -= threadCount - pool.DefaultFullThreadCount ;
210
+ sumOfAdditionalThreads -= fullThreadCount - pool.DefaultFullThreadCount ;
207
211
if (sumOfAdditionalThreads < takingAwayThreads + 1 ) {
208
212
break ;
209
213
}
210
214
211
- if (Shared && SharedInfo.ForeignThreadsAllowed [needyPoolIdx] < static_cast <i16 >(Pools.size ())) {
212
- Shared->SetForeignThreadSlots (needyPoolIdx, static_cast <i16 >(Pools.size ()));
213
- CpuConsumption.IsNeedyByPool [needyPoolIdx] = false ;
215
+ if (threadCount + 1 > pool.MaxFullThreadCount ) {
216
+ continue ;
214
217
}
215
218
216
219
if (!CpuConsumption.IsNeedyByPool [needyPoolIdx]) {
@@ -219,7 +222,13 @@ void THarmonizer::ProcessExchange() {
219
222
pool.IncreasingThreadsByExchange .fetch_add (1 , std::memory_order_relaxed);
220
223
CpuConsumption.IsNeedyByPool [needyPoolIdx] = false ;
221
224
takingAwayThreads++;
222
- pool.SetFullThreadCount (threadCount + 1 );
225
+ pool.SetFullThreadCount (fullThreadCount + 1 );
226
+ if (Shared) {
227
+ bool hasOwnSharedThread = SharedInfo.OwnedThreads [needyPoolIdx] != -1 ;
228
+ i16 slots = pool.MaxThreadCount - fullThreadCount - 1 - hasOwnSharedThread;
229
+ i16 maxSlots = Shared->GetSharedThreadCount ();
230
+ Shared->SetForeignThreadSlots (needyPoolIdx, Min<i16 >(slots, maxSlots));
231
+ }
223
232
224
233
LWPROBE_WITH_DEBUG (HarmonizeOperation, needyPoolIdx, pool.Pool ->GetName (), " increase by exchanging" , threadCount + 1 , pool.DefaultFullThreadCount , pool.MaxFullThreadCount );
225
234
}
@@ -230,39 +239,48 @@ void THarmonizer::ProcessExchange() {
230
239
}
231
240
232
241
TPoolInfo &pool = *Pools[poolIdx];
233
- size_t threadCount = pool.GetFullThreadCount ();
234
- size_t additionalThreadsCount = Max<size_t >(0L , threadCount - pool.DefaultFullThreadCount );
242
+ size_t fullThreadCount = pool.GetFullThreadCount ();
243
+ size_t additionalThreadsCount = Max<size_t >(0L , fullThreadCount - pool.DefaultFullThreadCount );
235
244
size_t currentTakingAwayThreads = Min (additionalThreadsCount, takingAwayThreads);
236
245
237
246
if (!currentTakingAwayThreads) {
238
247
continue ;
239
248
}
240
249
takingAwayThreads -= currentTakingAwayThreads;
241
- pool.SetFullThreadCount (threadCount - currentTakingAwayThreads);
250
+ pool.SetFullThreadCount (fullThreadCount - currentTakingAwayThreads);
251
+ if (Shared) {
252
+ bool hasOwnSharedThread = SharedInfo.OwnedThreads [poolIdx] != -1 ;
253
+ i16 slots = pool.MaxThreadCount - fullThreadCount + currentTakingAwayThreads - hasOwnSharedThread;
254
+ i16 maxSlots = Shared->GetSharedThreadCount ();
255
+ Shared->SetForeignThreadSlots (poolIdx, Min<i16 >(slots, maxSlots));
256
+ }
242
257
243
258
pool.DecreasingThreadsByExchange .fetch_add (currentTakingAwayThreads, std::memory_order_relaxed);
244
- LWPROBE_WITH_DEBUG (HarmonizeOperation, poolIdx, pool.Pool ->GetName (), " decrease by exchanging" , threadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount , pool.MaxFullThreadCount );
259
+ LWPROBE_WITH_DEBUG (HarmonizeOperation, poolIdx, pool.Pool ->GetName (), " decrease by exchanging" , fullThreadCount - currentTakingAwayThreads, pool.DefaultFullThreadCount , pool.MaxFullThreadCount );
245
260
}
246
261
}
247
262
248
263
void THarmonizer::ProcessHoggishState () {
249
264
HARMONIZER_DEBUG_PRINT (" ProcessHoggishState" );
250
265
for (auto &[hoggishPoolIdx, freeCpu] : CpuConsumption.HoggishPools ) {
251
266
TPoolInfo &pool = *Pools[hoggishPoolIdx];
252
- i64 threadCount = pool.GetFullThreadCount ();
253
- if (threadCount > pool.MinFullThreadCount && freeCpu >= 1 ) {
267
+ i64 fullThreadCount = pool.GetFullThreadCount ();
268
+ if (fullThreadCount > pool.MinFullThreadCount && freeCpu >= 1 ) {
254
269
pool.DecreasingThreadsByHoggishState .fetch_add (1 , std::memory_order_relaxed);
255
- pool.SetFullThreadCount (threadCount - 1 );
256
- LWPROBE_WITH_DEBUG (HarmonizeOperation, hoggishPoolIdx, pool.Pool ->GetName (), " decrease by hoggish" , threadCount - 1 , pool.DefaultFullThreadCount , pool.MaxFullThreadCount );
257
- }
258
- if (threadCount == pool.MinFullThreadCount && Shared && SharedInfo.ForeignThreadsAllowed [hoggishPoolIdx] != 0 ) {
259
- Shared->SetForeignThreadSlots (hoggishPoolIdx, 0 );
270
+ pool.SetFullThreadCount (fullThreadCount - 1 );
271
+ if (Shared) {
272
+ bool hasOwnSharedThread = SharedInfo.OwnedThreads [hoggishPoolIdx] != -1 ;
273
+ i16 slots = pool.MaxThreadCount - fullThreadCount + 1 - hasOwnSharedThread;
274
+ i16 maxSlots = Shared->GetSharedThreadCount ();
275
+ Shared->SetForeignThreadSlots (hoggishPoolIdx, Min<i16 >(slots, maxSlots));
276
+ }
277
+ LWPROBE_WITH_DEBUG (HarmonizeOperation, hoggishPoolIdx, pool.Pool ->GetName (), " decrease by hoggish" , fullThreadCount - 1 , pool.DefaultFullThreadCount , pool.MaxFullThreadCount );
260
278
}
261
279
if (pool.BasicPool && pool.LocalQueueSize > pool.MinLocalQueueSize ) {
262
280
pool.LocalQueueSize = std::min<ui16>(pool.MinLocalQueueSize , pool.LocalQueueSize / 2 );
263
281
pool.BasicPool ->SetLocalQueueSize (pool.LocalQueueSize );
264
282
}
265
- HARMONIZER_DEBUG_PRINT (" poolIdx" , hoggishPoolIdx, " threadCount" , threadCount , " pool.MinFullThreadCount" , pool.MinFullThreadCount , " freeCpu" , freeCpu);
283
+ HARMONIZER_DEBUG_PRINT (" poolIdx" , hoggishPoolIdx, " threadCount" , fullThreadCount , " pool.MinFullThreadCount" , pool.MinFullThreadCount , " freeCpu" , freeCpu);
266
284
}
267
285
}
268
286
@@ -305,8 +323,37 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
305
323
306
324
for (size_t poolIdx = 0 ; poolIdx < Pools.size (); ++poolIdx) {
307
325
TPoolInfo& pool = *Pools[poolIdx];
308
- pool.PotentialMaxThreadCount .store (std::min<i64 >(pool.MaxThreadCount , static_cast <i64 >(pool.GetThreadCount () + CpuConsumption.Budget )), std::memory_order_relaxed);
309
- HARMONIZER_DEBUG_PRINT (poolIdx, pool.Pool ->GetName (), " potential max thread count" , pool.PotentialMaxThreadCount .load (std::memory_order_relaxed), " budget" , CpuConsumption.Budget , " thread count" , pool.GetThreadCount ());
326
+
327
+ float freeSharedCpu = SharedInfo.FreeCpu ;
328
+ float budgetWithoutSharedCpu = std::max<float >(0 .0f , CpuConsumption.BudgetWithoutSharedCpu );
329
+
330
+ float possibleMaxSharedQuota = 0 .0f ;
331
+ if (Shared) {
332
+ bool hasOwnSharedThread = SharedInfo.OwnedThreads [poolIdx] != -1 ;
333
+ i16 sharedThreads = std::min<i16 >(SharedInfo.ForeignThreadsAllowed [poolIdx] + hasOwnSharedThread, SharedInfo.ThreadCount );
334
+ float poolSharedElapsedCpu = SharedInfo.CpuConsumption [poolIdx].Elapsed ;
335
+ possibleMaxSharedQuota = std::min<float >(poolSharedElapsedCpu + freeSharedCpu, sharedThreads);
336
+ }
337
+ float threadCount = pool.GetFullThreadCount ();
338
+ float elapsedCpu = pool.ElapsedCpu .GetAvgPart ();
339
+ float parkedCpu = Max<float >(0 .0f , threadCount - elapsedCpu);
340
+ float budgetWithoutSharedAndParkedCpu = std::max<float >(0 .0f , budgetWithoutSharedCpu - parkedCpu);
341
+ i16 potentialMaxThreadCountWithoutSharedCpu = std::min<float >(pool.MaxThreadCount , threadCount + budgetWithoutSharedAndParkedCpu);
342
+ float potentialMaxThreadCount = std::min<float >(pool.MaxThreadCount , potentialMaxThreadCountWithoutSharedCpu + possibleMaxSharedQuota);
343
+
344
+ pool.PotentialMaxThreadCount .store (potentialMaxThreadCount, std::memory_order_relaxed);
345
+ HARMONIZER_DEBUG_PRINT (poolIdx, pool.Pool ->GetName (),
346
+ " budget: " , CpuConsumption.Budget ,
347
+ " free shared cpu: " , freeSharedCpu,
348
+ " budget without shared cpu: " , budgetWithoutSharedCpu,
349
+ " budget without shared and parked cpu: " , budgetWithoutSharedAndParkedCpu,
350
+ " potential max thread count: " , potentialMaxThreadCount,
351
+ " potential max thread count without shared cpu: " , potentialMaxThreadCountWithoutSharedCpu,
352
+ " possible max shared quota: " , possibleMaxSharedQuota,
353
+ " thread count: " , threadCount,
354
+ " elapsed cpu: " , elapsedCpu,
355
+ " parked cpu: " , parkedCpu
356
+ );
310
357
}
311
358
}
312
359
@@ -362,14 +409,16 @@ void THarmonizer::DeclareEmergency(ui64 ts) {
362
409
NextHarmonizeTs = ts;
363
410
}
364
411
365
- void THarmonizer::AddPool (IExecutorPool* pool, TSelfPingInfo *pingInfo) {
412
+ void THarmonizer::AddPool (IExecutorPool* pool, TSelfPingInfo *pingInfo, bool ignoreFullThreadQuota ) {
366
413
TGuard<TSpinLock> guard (Lock);
367
414
Pools.emplace_back (new TPoolInfo);
368
415
TPoolInfo &poolInfo = *Pools.back ();
369
416
poolInfo.Pool = pool;
370
417
poolInfo.Shared = Shared;
371
418
poolInfo.BasicPool = dynamic_cast <TBasicExecutorPool*>(pool);
419
+
372
420
poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount ();
421
+ poolInfo.ThreadQuota = ignoreFullThreadQuota ? 0 : poolInfo.DefaultThreadCount ;
373
422
poolInfo.MinThreadCount = pool->GetMinThreadCount ();
374
423
poolInfo.MaxThreadCount = pool->GetMaxThreadCount ();
375
424
poolInfo.PotentialMaxThreadCount = poolInfo.MaxThreadCount ;
@@ -381,6 +430,12 @@ void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
381
430
poolInfo.SharedInfo .resize (Shared ? Shared->GetSharedThreadCount () : 0 );
382
431
poolInfo.Priority = pool->GetPriority ();
383
432
pool->SetFullThreadCount (poolInfo.DefaultFullThreadCount );
433
+ if (Shared) {
434
+ TVector<i16 > ownedThreads (Pools.size (), -1 );
435
+ Shared->FillOwnedThreads (ownedThreads);
436
+ bool hasOwnSharedThread = ownedThreads[pool->PoolId ] != -1 ;
437
+ Shared->SetForeignThreadSlots (pool->PoolId , Min<i16 >(poolInfo.MaxThreadCount - hasOwnSharedThread, Shared->GetSharedThreadCount ()));
438
+ }
384
439
if (pingInfo) {
385
440
poolInfo.AvgPingCounter = pingInfo->AvgPingCounter ;
386
441
poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow ;
@@ -414,12 +469,6 @@ TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
414
469
.DecreasingThreadsByStarvedState = pool.DecreasingThreadsByStarvedState .load (std::memory_order_relaxed),
415
470
.DecreasingThreadsByHoggishState = pool.DecreasingThreadsByHoggishState .load (std::memory_order_relaxed),
416
471
.DecreasingThreadsByExchange = pool.DecreasingThreadsByExchange .load (std::memory_order_relaxed),
417
- .ReceivedHalfThreadByNeedyState = pool.ReceivedHalfThreadByNeedyState .load (std::memory_order_relaxed),
418
- .GivenHalfThreadByOtherStarvedState = pool.GivenHalfThreadByOtherStarvedState .load (std::memory_order_relaxed),
419
- .GivenHalfThreadByHoggishState = pool.GivenHalfThreadByHoggishState .load (std::memory_order_relaxed),
420
- .GivenHalfThreadByOtherNeedyState = pool.GivenHalfThreadByOtherNeedyState .load (std::memory_order_relaxed),
421
- .ReturnedHalfThreadByStarvedState = pool.ReturnedHalfThreadByStarvedState .load (std::memory_order_relaxed),
422
- .ReturnedHalfThreadByOtherHoggishState = pool.ReturnedHalfThreadByOtherHoggishState .load (std::memory_order_relaxed),
423
472
.MaxUsedCpu = pool.MaxUsedCpu .load (std::memory_order_relaxed),
424
473
.MinUsedCpu = pool.MinUsedCpu .load (std::memory_order_relaxed),
425
474
.AvgUsedCpu = pool.AvgUsedCpu .load (std::memory_order_relaxed),
0 commit comments