@@ -126,6 +126,8 @@ class TSchedulerEntity {
126
126
std::atomic<i64 > DelayedSumBatches = 0 ;
127
127
std::atomic<i64 > DelayedCount = 0 ;
128
128
129
+ double Share;
130
+
129
131
TMultithreadPublisher<TGroupMutableStats> MutableStats;
130
132
};
131
133
@@ -215,13 +217,16 @@ struct TComputeScheduler::TImpl {
215
217
216
218
TDuration MaxDelay = TDuration::Seconds(10 );
217
219
218
- void AssignWeights () { }
220
+ void AssignWeights () {
221
+ for (auto & record : Records) {
222
+ record->MutableStats .Next ()->Weight = SumCores * record->Share ;
223
+ }
224
+ }
219
225
220
- void CreateGroup (TString groupName, double maxShare, NMonotonic::TMonotonic now ) {
226
+ void CreateGroup (TString groupName, double maxShare) {
221
227
PoolId[groupName] = Records.size ();
222
228
auto group = std::make_unique<TSchedulerEntity::TGroupRecord>();
223
- group->MutableStats .Next ()->LastNowRecalc = now;
224
- group->MutableStats .Next ()->Weight = maxShare;
229
+ group->Share = maxShare;
225
230
Records.push_back (std::move (group));
226
231
}
227
232
};
@@ -369,14 +374,18 @@ bool TComputeScheduler::Disable(TString group, TMonotonic now) {
369
374
void TComputeScheduler::UpdateMaxShare (TString group, double share, TMonotonic now) {
370
375
auto ptr = Impl->PoolId .FindPtr (group);
371
376
if (!ptr) {
372
- Impl->CreateGroup (group, share, now );
377
+ Impl->CreateGroup (group, share);
373
378
} else {
374
379
auto & record = Impl->Records [*ptr];
375
- record->MutableStats . Next ()-> Weight = share;
380
+ record->Share = share;
376
381
}
382
+ Impl->AssignWeights ();
377
383
AdvanceTime (now);
378
384
}
379
385
386
+ void TComputeScheduler::SetCapacity (ui64 cores) {
387
+ Impl->SumCores = cores;
388
+ }
380
389
381
390
::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCounter (TString group) const {
382
391
return Impl->Counters
@@ -418,6 +427,16 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
418
427
IEventHandle::FlagTrackDelivery);
419
428
420
429
Become (&TSchedulerActor::State);
430
+ SetCapacity (SelfId ().PoolID ());
431
+ }
432
+
433
+ void SetCapacity (ui32 pool) {
434
+ NActors::TExecutorPoolStats poolStats;
435
+ TVector<NActors::TExecutorThreadStats> threadsStats;
436
+ TlsActivationContext->ActorSystem ()->GetPoolStats (pool, poolStats, threadsStats);
437
+ ui64 threads = Max<ui64>(poolStats.MaxThreadCount , 1 );
438
+ Opts.Counters ->SchedulerCapacity ->Set (threads);
439
+ Opts.Scheduler ->SetCapacity (threads);
421
440
}
422
441
423
442
STATEFN (State) {
@@ -467,6 +486,7 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
467
486
}
468
487
469
488
void Handle (TEvents::TEvWakeup::TPtr&) {
489
+ SetCapacity (SelfId ().PoolID ());
470
490
Opts.Scheduler ->AdvanceTime (TlsActivationContext->Monotonic ());
471
491
Schedule (Opts.AdvanceTimeInterval , new TEvents::TEvWakeup ());
472
492
}
0 commit comments