@@ -49,6 +49,48 @@ namespace NActors {
49
49
});
50
50
}
51
51
52
+ namespace {
53
+ bool CheckPoolAdjacency (const TPoolManager& poolManager, i16 poolId, i16 adjacentPoolId) {
54
+ if (poolId == adjacentPoolId) {
55
+ return true ;
56
+ }
57
+ Y_ABORT_UNLESS ((ui32)poolId < poolManager.PoolInfos .size ());
58
+ const auto & poolInfo = poolManager.PoolInfos [poolId];
59
+ return std::find (poolInfo.AdjacentPools .begin (), poolInfo.AdjacentPools .end (), adjacentPoolId) != poolInfo.AdjacentPools .end ();
60
+ }
61
+
62
+ bool HasAdjacentPools (const TPoolManager& poolManager, i16 poolId) {
63
+ Y_ABORT_UNLESS ((ui32)poolId < poolManager.PoolInfos .size ());
64
+ const auto & poolInfo = poolManager.PoolInfos [poolId];
65
+ return !poolInfo.AdjacentPools .empty ();
66
+ }
67
+
68
+ i16 NextAdjacentPool (const TPoolManager& poolManager, i16 poolId, i16 currentPoolId) {
69
+ if (poolId == currentPoolId) {
70
+ if (poolManager.PoolInfos [poolId].AdjacentPools .empty ()) {
71
+ return poolId;
72
+ }
73
+ return poolManager.PoolInfos [poolId].AdjacentPools [0 ];
74
+ }
75
+ Y_ABORT_UNLESS ((ui32)poolId < poolManager.PoolInfos .size ());
76
+ const auto & poolInfo = poolManager.PoolInfos [poolId];
77
+ auto it = std::find (poolInfo.AdjacentPools .begin (), poolInfo.AdjacentPools .end (), currentPoolId);
78
+ if (it == poolInfo.AdjacentPools .end () || it + 1 == poolInfo.AdjacentPools .end ()) {
79
+ return poolId;
80
+ }
81
+ return *(it + 1 );
82
+ }
83
+
84
+ std::optional<i16 > GetForcedForeignSlots (const TPoolManager& poolManager, i16 poolId) {
85
+ const auto & poolInfo = poolManager.PoolInfos [poolId];
86
+ if (poolInfo.ForcedForeignSlots ) {
87
+ return poolInfo.ForcedForeignSlots ;
88
+ }
89
+ return std::nullopt;
90
+ }
91
+
92
+ }
93
+
52
94
LWTRACE_USING (ACTORLIB_PROVIDER);
53
95
54
96
TSharedExecutorPool::TSharedExecutorPool (
@@ -80,8 +122,8 @@ namespace NActors {
80
122
}
81
123
}
82
124
for (ui64 i = 0 ; i < PoolManager.PoolInfos .size (); ++i) {
83
- ForeignThreadsAllowedByPool[i].store (0 , std::memory_order_release);
84
- ForeignThreadSlots[i].store (0 , std::memory_order_release);
125
+ ForeignThreadsAllowedByPool[i].store (PoolManager. PoolInfos [i]. ForeignSlots , std::memory_order_release);
126
+ ForeignThreadSlots[i].store (PoolManager. PoolInfos [i]. ForeignSlots , std::memory_order_release);
85
127
LocalThreads[i].store (PoolManager.PoolInfos [i].SharedThreadCount , std::memory_order_release);
86
128
LocalNotifications[i].store (0 , std::memory_order_release);
87
129
}
@@ -118,7 +160,7 @@ namespace NActors {
118
160
continue ;
119
161
}
120
162
121
- if (thread.OwnerPoolId == i ) {
163
+ if (CheckPoolAdjacency (PoolManager, thread.OwnerPoolId , i) ) {
122
164
EXECUTOR_POOL_SHARED_DEBUG (EDebugLevel::Executor, " ownerPoolId == poolId; ownerPoolId == " , thread.OwnerPoolId , " poolId == " , i);
123
165
return i;
124
166
}
@@ -162,8 +204,10 @@ namespace NActors {
162
204
auto &thread = Threads[workerId];
163
205
thread.UnsetWork ();
164
206
TMailbox *mailbox = nullptr ;
207
+ bool hasAdjacentPools = HasAdjacentPools (PoolManager, thread.OwnerPoolId );
165
208
while (!StopFlag.load (std::memory_order_acquire)) {
166
- if (hpnow < thread.SoftDeadlineForPool || thread.CurrentPoolId == thread.OwnerPoolId ) {
209
+ bool adjacentPool = CheckPoolAdjacency (PoolManager, thread.OwnerPoolId , thread.CurrentPoolId );
210
+ if (hpnow < thread.SoftDeadlineForPool || !hasAdjacentPools && adjacentPool) {
167
211
EXECUTOR_POOL_SHARED_DEBUG (EDebugLevel::Activation, " continue same pool; ownerPoolId == " , thread.OwnerPoolId , " currentPoolId == " , thread.CurrentPoolId );
168
212
if (thread.SoftDeadlineForPool == Max<NHPTimer::STime>()) {
169
213
thread.SoftDeadlineForPool = GetCycleCountFast () + thread.SoftProcessingDurationTs ;
@@ -181,6 +225,7 @@ namespace NActors {
181
225
EXECUTOR_POOL_SHARED_DEBUG (EDebugLevel::Executor, " no mailbox and need to find new pool; ownerPoolId == " , thread.OwnerPoolId , " currentPoolId == " , thread.CurrentPoolId , " processedActivationsByCurrentPool == " , TlsThreadContext->ProcessedActivationsByCurrentPool );
182
226
TlsThreadContext->ProcessedActivationsByCurrentPool = 0 ;
183
227
if (thread.CurrentPoolId != thread.OwnerPoolId ) {
228
+ thread.AdjacentPoolId = NextAdjacentPool (PoolManager, thread.OwnerPoolId , thread.AdjacentPoolId );
184
229
SwitchToPool (thread.OwnerPoolId , hpnow);
185
230
continue ;
186
231
}
@@ -190,10 +235,11 @@ namespace NActors {
190
235
EXECUTOR_POOL_SHARED_DEBUG (EDebugLevel::Activation, " no mailbox and no need to wait; ownerPoolId == " , thread.OwnerPoolId , " currentPoolId == " , thread.CurrentPoolId );
191
236
return nullptr ;
192
237
} else {
193
- EXECUTOR_POOL_SHARED_DEBUG (EDebugLevel::Executor, " comeback to owner pool; ownerPoolId == " , thread.OwnerPoolId , " currentPoolId == " , thread.CurrentPoolId , " processedActivationsByCurrentPool == " , TlsThreadContext->ProcessedActivationsByCurrentPool , " hpnow == " , hpnow, " softDeadlineForPool == " , thread.SoftDeadlineForPool );
238
+ EXECUTOR_POOL_SHARED_DEBUG (EDebugLevel::Executor, " change adjacent pool; ownerPoolId == " , thread.OwnerPoolId , " currentPoolId == " , thread.CurrentPoolId , " processedActivationsByCurrentPool == " , TlsThreadContext->ProcessedActivationsByCurrentPool , " hpnow == " , hpnow, " softDeadlineForPool == " , thread.SoftDeadlineForPool );
194
239
TlsThreadContext->ProcessedActivationsByCurrentPool = 0 ;
195
- SwitchToPool (thread.OwnerPoolId , hpnow);
196
- // after soft deadline we check owner pool again
240
+ thread.AdjacentPoolId = NextAdjacentPool (PoolManager, thread.OwnerPoolId , thread.AdjacentPoolId );
241
+ SwitchToPool (thread.AdjacentPoolId , hpnow);
242
+ // after soft deadline we check adjacent pool
197
243
continue ;
198
244
}
199
245
bool goToSleep = true ;
@@ -308,7 +354,7 @@ namespace NActors {
308
354
Y_ABORT_UNLESS (Threads[i].OwnerPoolId < static_cast <i16 >(Pools.size ()), " OwnerPoolId is out of range i %" PRIu16 " OwnerPoolId == %" PRIu16, i, Threads[i].OwnerPoolId );
309
355
Y_ABORT_UNLESS (Threads[i].OwnerPoolId >= 0 , " OwnerPoolId is out of range i %" PRIu16 " OwnerPoolId == %" PRIu16, i, Threads[i].OwnerPoolId );
310
356
EXECUTOR_POOL_SHARED_DEBUG (EDebugLevel::ExecutorPool, " create thread " , i, " OwnerPoolId == " , Threads[i].OwnerPoolId );
311
- Threads[i].Thread .reset (
357
+ Threads[i].Thread .reset (
312
358
new TExecutorThread (
313
359
i,
314
360
actorSystem,
@@ -482,8 +528,6 @@ namespace NActors {
482
528
return false ;
483
529
}
484
530
485
-
486
-
487
531
void TSharedExecutorPool::FillForeignThreadsAllowed (std::vector<i16 >& foreignThreadsAllowed) const {
488
532
foreignThreadsAllowed.resize (PoolManager.PoolInfos .size ());
489
533
for (ui64 i = 0 ; i < foreignThreadsAllowed.size (); ++i) {
@@ -513,6 +557,9 @@ namespace NActors {
513
557
}
514
558
515
559
void TSharedExecutorPool::SetForeignThreadSlots (i16 poolId, i16 slots) {
560
+ if (auto forcedSlots = GetForcedForeignSlots (PoolManager, poolId)) {
561
+ return ;
562
+ }
516
563
i16 current = ForeignThreadsAllowedByPool[poolId].load (std::memory_order_acquire);
517
564
if (current == slots) {
518
565
return ;
0 commit comments