@@ -81,12 +81,13 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
81
81
if (config.HasIteratorReadQuotaSettings ()) {
82
82
SetIteratorReadsQuotaSettings (config.GetIteratorReadQuotaSettings ());
83
83
}
84
- if (config.HasPoolsConfiguration ()) {
85
- SetPriorities (config.GetPoolsConfiguration ());
86
- }
87
- Scheduler.ReportCounters (counters);
88
- AdvanceTimeInterval = TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetAdvanceTimeIntervalUsec ());
89
- Scheduler.SetForgetInterval (TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetForgetOverflowTimeoutUsec ()));
84
+
85
+ SchedulerOptions = {
86
+ .AdvanceTimeInterval = TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetAdvanceTimeIntervalUsec ()),
87
+ .ForgetOverflowTimeout = TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetForgetOverflowTimeoutUsec ()),
88
+ .ActivePoolPollingTimeout = TDuration::Seconds (config.GetComputeSchedulerSettings ().GetActivePoolPollingSec ()),
89
+ .Counters = counters,
90
+ };
90
91
}
91
92
92
93
void Bootstrap () {
@@ -105,15 +106,13 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
105
106
TlsActivationContext->ExecutorThread .ActorSystem , SelfId ());
106
107
}
107
108
108
- Schedule (TDuration::Seconds (1 ), new TEvents::TEvWakeup (WakeCleaunupTag));
109
- Schedule (AdvanceTimeInterval, new TEvents::TEvWakeup (WakeAdvanceTimeTag));
109
+ Schedule (TDuration::Seconds (1 ), new TEvents::TEvWakeup ());
110
110
Become (&TKqpNodeService::WorkState);
111
- }
112
111
113
- enum {
114
- WakeCleaunupTag,
115
- WakeAdvanceTimeTag
116
- };
112
+ Scheduler = std::make_shared<TComputeScheduler>();
113
+ SchedulerOptions. Scheduler = Scheduler;
114
+ SchedulerActorId = RegisterWithSameMailbox ( CreateSchedulerActor (SchedulerOptions));
115
+ }
117
116
118
117
private:
119
118
STATEFN (WorkState) {
@@ -128,8 +127,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
128
127
hFunc (TEvents::TEvUndelivered, HandleWork);
129
128
hFunc (TEvents::TEvPoison, HandleWork);
130
129
hFunc (NMon::TEvHttpInfo, HandleWork);
131
- // sheduling
132
- hFunc (TEvSchedulerDeregister, HandleWork);
133
130
default : {
134
131
Y_ABORT (" Unexpected event 0x%x for TKqpResourceManagerService" , ev->GetTypeRewrite ());
135
132
}
@@ -138,12 +135,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
138
135
139
136
static constexpr double SecToUsec = 1e6 ;
140
137
141
- void HandleWork (TEvSchedulerDeregister::TPtr& ev) {
142
- if (ev->Get ()->SchedulerEntity ) {
143
- Scheduler.Deregister (*ev->Get ()->SchedulerEntity , TMonotonic::Now ());
144
- }
145
- }
146
-
147
138
void HandleWork (TEvKqpNode::TEvStartKqpTasksRequest::TPtr& ev) {
148
139
NWilson::TSpan sendTasksSpan (TWilsonKqp::KqpNodeSendTasks, NWilson::TTraceId (ev->TraceId ), " KqpNode.SendTasks" , NWilson::EFlags::AUTO_END);
149
140
@@ -198,7 +189,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
198
189
const TString& serializedGUCSettings = ev->Get ()->Record .HasSerializedGUCSettings () ?
199
190
ev->Get ()->Record .GetSerializedGUCSettings () : " " ;
200
191
201
- auto schedulerNow = TMonotonic::Now ();
192
+ auto schedulerNow = TlsActivationContext-> Monotonic ();
202
193
203
194
// start compute actors
204
195
TMaybe<NYql::NDqProto::TRlPath> rlPath = Nothing ();
@@ -215,20 +206,28 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
215
206
for (auto & dqTask: *msg.MutableTasks ()) {
216
207
TString group = msg.GetSchedulerGroup ();
217
208
218
- TComputeActorSchedulingOptions schedulingOptions {
209
+ TComputeActorSchedulingOptions schedulingTaskOptions {
219
210
.Now = schedulerNow,
220
- .NodeService = SelfId () ,
221
- .Scheduler = & Scheduler,
211
+ .SchedulerActorId = SchedulerActorId ,
212
+ .Scheduler = Scheduler. get () ,
222
213
.Group = group,
223
214
.Weight = 1 ,
224
215
.NoThrottle = false ,
225
216
.Counters = Counters
226
217
};
227
218
228
- if (Scheduler.Disabled (schedulingOptions.Group )) {
229
- schedulingOptions.NoThrottle = true ;
230
- } else {
231
- schedulingOptions.Handle = Scheduler.Enroll (schedulingOptions.Group , schedulingOptions.Weight , schedulingOptions.Now );
219
+ if (SchedulerOptions.Scheduler ->Disabled (group)) {
220
+ auto share = msg.GetMaxCpuShare ();
221
+ if (share > 0 ) {
222
+ Scheduler->UpdateMaxShare (group, share, schedulerNow);
223
+ Send (SchedulerActorId, new TEvSchedulerNewPool (msg.GetDatabase (), group, share));
224
+ } else {
225
+ schedulingTaskOptions.NoThrottle = true ;
226
+ }
227
+ }
228
+
229
+ if (!schedulingTaskOptions.NoThrottle ) {
230
+ schedulingTaskOptions.Handle = SchedulerOptions.Scheduler ->Enroll (schedulingTaskOptions.Group , schedulingTaskOptions.Weight , schedulingTaskOptions.Now );
232
231
}
233
232
234
233
auto result = CaFactory_->CreateKqpComputeActor ({
@@ -250,7 +249,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
250
249
.RlPath = rlPath,
251
250
.ComputesByStages = &computesByStage,
252
251
.State = State_,
253
- .SchedulingOptions = std::move (schedulingOptions ),
252
+ .SchedulingOptions = std::move (schedulingTaskOptions ),
254
253
});
255
254
256
255
if (const auto * rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) {
@@ -343,17 +342,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
343
342
}
344
343
345
344
void HandleWork (TEvents::TEvWakeup::TPtr& ev) {
346
- if (ev->Get ()->Tag == WakeAdvanceTimeTag) {
347
- Scheduler.AdvanceTime (TMonotonic::Now ());
348
- Schedule (AdvanceTimeInterval, new TEvents::TEvWakeup (WakeAdvanceTimeTag));
349
- }
350
- if (ev->Get ()->Tag == WakeCleaunupTag) {
351
- Schedule (TDuration::Seconds (1 ), ev->Release ().Release ());
352
- for (auto & bucket : State_->Buckets ) {
353
- auto expiredRequests = bucket.ClearExpiredRequests ();
354
- for (auto & cxt : expiredRequests) {
355
- TerminateTx (cxt.TxId , " reached execution deadline" , NYql::NDqProto::StatusIds::TIMEOUT);
356
- }
345
+ Schedule (TDuration::Seconds (1 ), ev->Release ().Release ());
346
+ for (auto & bucket : State_->Buckets ) {
347
+ auto expiredRequests = bucket.ClearExpiredRequests ();
348
+ for (auto & cxt : expiredRequests) {
349
+ TerminateTx (cxt.TxId , " reached execution deadline" , NYql::NDqProto::StatusIds::TIMEOUT);
357
350
}
358
351
}
359
352
}
@@ -395,13 +388,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
395
388
SetIteratorReadsQuotaSettings (event.GetConfig ().GetTableServiceConfig ().GetIteratorReadQuotaSettings ());
396
389
}
397
390
398
- if (event.GetConfig ().GetTableServiceConfig ().HasPoolsConfiguration ()) {
399
- SetPriorities (event.GetConfig ().GetTableServiceConfig ().GetPoolsConfiguration ());
400
- }
401
-
402
- AdvanceTimeInterval = TDuration::MicroSeconds (event.GetConfig ().GetTableServiceConfig ().GetComputeSchedulerSettings ().GetAdvanceTimeIntervalUsec ());
403
- Scheduler.SetForgetInterval (TDuration::MicroSeconds (event.GetConfig ().GetTableServiceConfig ().GetComputeSchedulerSettings ().GetForgetOverflowTimeoutUsec ()));
404
-
405
391
auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
406
392
Send (ev->Sender , responseEv.Release (), IEventHandle::FlagTrackDelivery, ev->Cookie );
407
393
}
@@ -410,35 +396,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
410
396
SetDefaultIteratorQuotaSettings (settings.GetMaxRows (), settings.GetMaxBytes ());
411
397
}
412
398
413
- void SetPriorities (const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) {
414
- std::function<TComputeScheduler::TDistributionRule (const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration&)> convert
415
- = [&](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf)
416
- {
417
- if (conf.HasName ()) {
418
- return TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare (), .Name = conf.GetName ()};
419
- } else if (conf.HasSubPoolsConfiguration ()) {
420
- auto res = TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare ()};
421
- for (auto & subConf : conf.GetSubPoolsConfiguration ().GetSubPools ()) {
422
- res.SubRules .push_back (convert (subConf));
423
- }
424
- return res;
425
- } else {
426
- Y_ENSURE (false , " unknown case" );
427
- }
428
- };
429
- SetPriorities (convert (conf));
430
- }
431
-
432
- void SetPriorities (TComputeScheduler::TDistributionRule rule) {
433
- NActors::TExecutorPoolStats poolStats;
434
- TVector<NActors::TExecutorThreadStats> threadsStats;
435
- TlsActivationContext->ActorSystem ()->GetPoolStats (SelfId ().PoolID (), poolStats, threadsStats);
436
- Y_ENSURE (poolStats.MaxThreadCount > 0 );
437
- Counters->SchedulerCapacity ->Set (poolStats.MaxThreadCount );
438
-
439
- Scheduler.SetPriorities (rule, poolStats.MaxThreadCount , TMonotonic::Now ());
440
- }
441
-
442
399
void SetIteratorReadsRetrySettings (const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) {
443
400
auto ptr = MakeIntrusive<NKikimr::NKqp::TIteratorReadBackoffSettings>();
444
401
ptr->StartRetryDelay = TDuration::MilliSeconds (settings.GetStartDelayMs ());
@@ -525,8 +482,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
525
482
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
526
483
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
527
484
528
- TComputeScheduler Scheduler;
529
- TDuration AdvanceTimeInterval;
485
+ std::shared_ptr<TComputeScheduler> Scheduler;
486
+ TSchedulerActorOptions SchedulerOptions;
487
+ TActorId SchedulerActorId;
530
488
531
489
// state sharded by TxId
532
490
std::shared_ptr<TNodeServiceState> State_;
0 commit comments