@@ -60,7 +60,6 @@ bool TKqpPlanner::UseMockEmptyPlanner = false;
 // Task can allocate extra memory during execution.
 // So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
 constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
-constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 8;
 
 TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
     : TxId(args.TxId)
@@ -256,9 +255,18 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
 
     auto localResources = ResourceManager_->GetLocalResources();
    Y_UNUSED(MEMORY_ESTIMATION_OVERFLOW);
+
+    auto placingOptions = ResourceManager_->GetPlacingOptions();
+
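+    // Decide whether running the whole query on the executer node makes sense.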
+    bool singleNodeExecutionMakeSence = (
+        ResourceEstimations.size() <= placingOptions.MaxNonParallelTasksExecutionLimit ||
+        // all readers are located on the one node.
+        TasksPerNode.size() == 1
+    );
+
     if (LocalRunMemoryEst * MEMORY_ESTIMATION_OVERFLOW <= localResources.Memory[NRm::EKqpMemoryPool::ScanQuery] &&
         ResourceEstimations.size() <= localResources.ExecutionUnits &&
-        ResourceEstimations.size() <= MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT)
+        singleNodeExecutionMakeSence)
     {
         ui64 selfNodeId = ExecuterId.NodeId();
         for (ui64 taskId: ComputeTasks) {
@@ -293,47 +301,100 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
         return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
     }
 
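+    // Collect the tasks that belong to the deepest stage level of the query plan.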
+    std::vector<ui64> deepestTasks;
+    ui64 maxLevel = 0;
+    for (auto& task : TasksGraph.GetTasks()) {
+        // const auto& task = TasksGraph.GetTask(taskId);
+        const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
+        const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+        const ui64 stageLevel = stage.GetProgram().GetSettings().GetStageLevel();
+
+        if (stageLevel > maxLevel) {
+            maxLevel = stageLevel;
+            deepestTasks.clear();
+        }
+
+        if (stageLevel == maxLevel) {
+            deepestTasks.push_back(task.Id);
+        }
+    }
+
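+    // Remember the node assignments that already exist so the planner does not override them.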
+    THashMap<ui64, ui64> alreadyAssigned;
+    for (auto& [nodeId, tasks] : TasksPerNode) {
+        for (ui64 taskId: tasks) {
+            alreadyAssigned.emplace(taskId, nodeId);
+        }
+    }
+
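+    // When the deepest stage is small enough, pin its tasks to the executer node.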
+    if (deepestTasks.size() <= placingOptions.MaxNonParallelTopStageExecutionLimit) {
+        // looks like the merge / union all connection
+        for (ui64 taskId: deepestTasks) {
+            auto [it, success] = alreadyAssigned.emplace(taskId, ExecuterId.NodeId());
+            if (success) {
+                TasksPerNode[ExecuterId.NodeId()].push_back(taskId);
+            }
+        }
+    }
+
     auto planner = (UseMockEmptyPlanner ? CreateKqpMockEmptyPlanner() : CreateKqpGreedyPlanner());  // KqpMockEmptyPlanner is a mock planner for tests
 
     auto ctx = TlsActivationContext->AsActorContext();
     if (ctx.LoggerSettings() && ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER)) {
         planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
     }
 
-    THashMap<ui64, size_t> nodeIdtoIdx;
-    for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) {
-        nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
-    }
-
     LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
 
-    auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
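+    // Find the data center of the node the executer runs on.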
+    ui64 selfNodeId = ExecuterId.NodeId();
+    TString selfNodeDC;
 
-    THashMap<ui64, ui64> alreadyAssigned;
-    for (auto& [nodeId, tasks] : TasksPerNode) {
-        for (ui64 taskId: tasks) {
-            alreadyAssigned.emplace(taskId, nodeId);
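+    // Split the resource snapshot into all nodes and the nodes from the executer's data center.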
+    TVector<const NKikimrKqp::TKqpNodeResources*> allNodes;
+    TVector<const NKikimrKqp::TKqpNodeResources*> executerDcNodes;
+    allNodes.reserve(ResourcesSnapshot.size());
+
+    for (auto& snapNode : ResourcesSnapshot) {
+        const TString& dc = snapNode.GetKqpProxyNodeResources().GetDataCenterId();
+        if (snapNode.GetNodeId() == selfNodeId) {
+            selfNodeDC = dc;
+            break;
         }
     }
 
-    if (!plan.empty()) {
-        for (auto& group : plan) {
-            for (ui64 taskId: group.TaskIds) {
-                auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
-                if (success) {
-                    TasksPerNode[group.NodeId].push_back(taskId);
-                }
-            }
+    for (auto& snapNode : ResourcesSnapshot) {
+        allNodes.push_back(&snapNode);
+        if (selfNodeDC == snapNode.GetKqpProxyNodeResources().GetDataCenterId()) {
+            executerDcNodes.push_back(&snapNode);
         }
+    }
 
-        return nullptr;
-    } else {
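+    // Prefer planning within the executer's data center; fall back to the whole cluster if no plan is produced there.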
+    TVector<IKqpPlannerStrategy::TResult> plan;
+
+    if (!executerDcNodes.empty() && placingOptions.PreferLocalDatacenterExecution) {
+        plan = planner->Plan(executerDcNodes, ResourceEstimations);
+    }
+
+    if (plan.empty()) {
+        plan = planner->Plan(allNodes, ResourceEstimations);
+    }
+
+    if (plan.empty()) {
         LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); });
 
         auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
             TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId);
         return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
     }
+
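+    // Apply the planner's placement, keeping assignments that were already made above.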
+    for (auto& group : plan) {
+        for (ui64 taskId: group.TaskIds) {
+            auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
+            if (success) {
+                TasksPerNode[group.NodeId].push_back(taskId);
+            }
+        }
+    }
+
+    return nullptr;
 }
 
 const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {