@@ -264,6 +264,63 @@ namespace NActors {
264
264
return nullptr ;
265
265
}
266
266
267
+ TMailbox* TBasicExecutorPool::GetReadyActivationRingQueue (ui64 revolvingCounter) {
268
+ if (StopFlag.load (std::memory_order_acquire)) {
269
+ return nullptr ;
270
+ }
271
+
272
+ TWorkerId workerId = TlsThreadContext->WorkerId ();
273
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " " );
274
+ NHPTimer::STime hpnow = GetCycleCountFast ();
275
+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION, false > activityGuard (hpnow);
276
+
277
+ Y_DEBUG_ABORT_UNLESS (workerId < MaxFullThreadCount);
278
+
279
+ Threads[workerId].UnsetWork ();
280
+ if (Harmonizer) {
281
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " try to harmonize" );
282
+ LWPROBE (TryToHarmonize, PoolId, PoolName);
283
+ Harmonizer->Harmonize (hpnow);
284
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " harmonize done" );
285
+ }
286
+
287
+ do {
288
+ {
289
+ TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false > activityGuard;
290
+ if (const ui32 activation = std::visit ([&revolvingCounter](auto &x) {return x.Pop (++revolvingCounter);}, Activations)) {
291
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " activation found" );
292
+ Threads[workerId].SetWork ();
293
+ AtomicDecrement (Semaphore);
294
+ return MailboxTable->Get (activation);
295
+ }
296
+ }
297
+
298
+ TAtomic semaphoreRaw = AtomicGet (Semaphore);
299
+ TSemaphore semaphore = TSemaphore::GetSemaphore (semaphoreRaw);
300
+ if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0 ) {
301
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " semaphore.OldSemaphore == 0 or workerId >= 0 && semaphore.CurrentSleepThreadCount < 0" );
302
+ if (!TlsThreadContext->ExecutionContext .IsNeededToWaitNextActivation ) {
303
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " wctx.ExecutionContext.IsNeededToWaitNextActivation == false" );
304
+ return nullptr ;
305
+ }
306
+
307
+ bool needToWait = false ;
308
+ bool needToBlock = false ;
309
+ AskToGoToSleep (&needToWait, &needToBlock);
310
+ if (needToWait) {
311
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " go to sleep" );
312
+ if (Threads[workerId].Wait (SpinThresholdCycles, &StopFlag)) {
313
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " sleep interrupted" );
314
+ return nullptr ;
315
+ }
316
+ }
317
+ }
318
+ SpinLockPause ();
319
+ } while (!StopFlag.load (std::memory_order_acquire));
320
+
321
+ return nullptr ;
322
+ }
323
+
267
324
TMailbox* TBasicExecutorPool::GetReadyActivationLocalQueue (ui64 revolvingCounter) {
268
325
TWorkerId workerId = TlsThreadContext->WorkerId ();
269
326
Y_DEBUG_ABORT_UNLESS (workerId < static_cast <i32 >(MaxFullThreadCount));
@@ -278,13 +335,19 @@ namespace NActors {
278
335
TlsThreadContext->LocalQueueContext .LocalQueueSize = LocalQueueSize.load (std::memory_order_relaxed);
279
336
}
280
337
EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " local queue done; moving to common" );
338
+ if (TlsThreadContext->UseRingQueue ()) {
339
+ return GetReadyActivationRingQueue (revolvingCounter);
340
+ }
281
341
return GetReadyActivationCommon (revolvingCounter);
282
342
}
283
343
284
344
TMailbox* TBasicExecutorPool::GetReadyActivation (ui64 revolvingCounter) {
285
345
if (MaxLocalQueueSize) {
286
346
EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " local queue" );
287
347
return GetReadyActivationLocalQueue (revolvingCounter);
348
+ } else if (TlsThreadContext->UseRingQueue ()) {
349
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " ring queue" );
350
+ return GetReadyActivationRingQueue (revolvingCounter);
288
351
} else {
289
352
EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " " );
290
353
return GetReadyActivationCommon (revolvingCounter);
@@ -305,37 +368,48 @@ namespace NActors {
305
368
}
306
369
}
307
370
308
- void TBasicExecutorPool::ScheduleActivationExCommon (TMailbox* mailbox, ui64 revolvingCounter, TAtomic x) {
309
- TSemaphore semaphore = TSemaphore::GetSemaphore (x);
310
- EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " semaphore.OldSemaphore == " , semaphore.OldSemaphore , " semaphore.CurrentSleepThreadCount == " , semaphore.CurrentSleepThreadCount );
311
- std::visit ([mailbox, revolvingCounter](auto &x) {
312
- x.Push (mailbox->Hint , revolvingCounter);
371
+ void TBasicExecutorPool::ScheduleActivationExCommon (TMailbox* mailbox, ui64 revolvingCounter, std::optional<TAtomic> initSemaphore) {
372
+ std::visit ([mailbox, revolvingCounter](auto &queue) {
373
+ queue.Push (mailbox->Hint , revolvingCounter);
313
374
}, Activations);
314
375
bool needToWakeUp = false ;
315
376
bool needToChangeOldSemaphore = true ;
316
377
317
- if (SharedPool) {
378
+ TAtomic x;
379
+ TSemaphore semaphore;
380
+ if (!initSemaphore || SharedPool) {
318
381
x = AtomicIncrement (Semaphore);
319
382
needToChangeOldSemaphore = false ;
383
+ semaphore = TSemaphore::GetSemaphore (x);
384
+ } else {
385
+ x = *initSemaphore;
386
+ semaphore = TSemaphore::GetSemaphore (x);
387
+ }
388
+ EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " semaphore.OldSemaphore == " , semaphore.OldSemaphore , " semaphore.CurrentSleepThreadCount == " , semaphore.CurrentSleepThreadCount );
389
+ if (SharedPool) {
320
390
if (SharedPool->WakeUpLocalThreads (PoolId)) {
321
391
EXECUTOR_POOL_BASIC_DEBUG (EDebugLevel::Activation, " shared pool wake up local threads" );
322
392
return ;
323
393
}
324
- semaphore = TSemaphore::GetSemaphore (x);
325
394
}
326
395
327
396
i16 sleepThreads = 0 ;
328
397
Y_UNUSED (sleepThreads);
329
398
do {
330
399
needToWakeUp = semaphore.CurrentSleepThreadCount > 0 ;
331
400
i64 oldX = semaphore.ConvertToI64 ();
401
+ bool changed = false ;
332
402
if (needToChangeOldSemaphore) {
333
403
semaphore.OldSemaphore ++;
404
+ changed = true ;
334
405
}
335
406
if (needToWakeUp) {
336
407
sleepThreads = semaphore.CurrentSleepThreadCount --;
408
+ changed = true ;
409
+ }
410
+ if (changed) {
411
+ x = AtomicGetAndCas (&Semaphore, semaphore.ConvertToI64 (), oldX);
337
412
}
338
- x = AtomicGetAndCas (&Semaphore, semaphore.ConvertToI64 (), oldX);
339
413
if (x == oldX) {
340
414
break ;
341
415
}
@@ -383,14 +457,14 @@ namespace NActors {
383
457
return ;
384
458
}
385
459
}
386
- ScheduleActivationExCommon (mailbox, revolvingWriteCounter, AtomicGet (Semaphore) );
460
+ ScheduleActivationExCommon (mailbox, revolvingWriteCounter, std::nullopt );
387
461
}
388
462
389
463
void TBasicExecutorPool::ScheduleActivationEx (TMailbox* mailbox, ui64 revolvingCounter) {
390
464
if (MaxLocalQueueSize) {
391
465
ScheduleActivationExLocalQueue (mailbox, revolvingCounter);
392
466
} else {
393
- ScheduleActivationExCommon (mailbox, revolvingCounter, AtomicGet (Semaphore) );
467
+ ScheduleActivationExCommon (mailbox, revolvingCounter, std::nullopt );
394
468
}
395
469
}
396
470
0 commit comments