Skip to content

Commit 154697e

Browse files
authored
Support launching tasks in the same datacenter (DC) as the executer (#8457)
1 parent e2dfc63 commit 154697e

File tree

6 files changed

+55
-25
lines changed

6 files changed

+55
-25
lines changed

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -362,33 +362,58 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
362362
planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
363363
}
364364

365-
THashMap<ui64, size_t> nodeIdtoIdx;
366-
for (size_t idx = 0; idx < ResourcesSnapshot.size(); ++idx) {
367-
nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
368-
}
369-
370365
LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
371366

372-
auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
367+
ui64 selfNodeId = ExecuterId.NodeId();
368+
TString selfNodeDC;
373369

374-
if (!plan.empty()) {
375-
for (auto& group : plan) {
376-
for(ui64 taskId: group.TaskIds) {
377-
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
378-
if (success) {
379-
TasksPerNode[group.NodeId].push_back(taskId);
380-
}
381-
}
370+
TVector<const NKikimrKqp::TKqpNodeResources*> allNodes;
371+
TVector<const NKikimrKqp::TKqpNodeResources*> executerDcNodes;
372+
allNodes.reserve(ResourcesSnapshot.size());
373+
374+
for(auto& snapNode: ResourcesSnapshot) {
375+
const TString& dc = snapNode.GetKqpProxyNodeResources().GetDataCenterId();
376+
if (snapNode.GetNodeId() == selfNodeId) {
377+
selfNodeDC = dc;
378+
break;
382379
}
380+
}
383381

384-
return nullptr;
385-
} else {
382+
for(auto& snapNode: ResourcesSnapshot) {
383+
allNodes.push_back(&snapNode);
384+
if (selfNodeDC == snapNode.GetKqpProxyNodeResources().GetDataCenterId()) {
385+
executerDcNodes.push_back(&snapNode);
386+
}
387+
}
388+
389+
TVector<IKqpPlannerStrategy::TResult> plan;
390+
391+
if (!executerDcNodes.empty() && placingOptions.PreferLocalDatacenterExecution) {
392+
plan = planner->Plan(executerDcNodes, ResourceEstimations);
393+
}
394+
395+
if (plan.empty()) {
396+
plan = planner->Plan(allNodes, ResourceEstimations);
397+
}
398+
399+
if (plan.empty()) {
386400
LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); });
387401

388402
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
389403
TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId);
390404
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
391405
}
406+
407+
for (auto& group : plan) {
408+
for(ui64 taskId: group.TaskIds) {
409+
auto [it, success] = alreadyAssigned.emplace(taskId, group.NodeId);
410+
if (success) {
411+
TasksPerNode[group.NodeId].push_back(taskId);
412+
}
413+
}
414+
}
415+
416+
return nullptr;
392417
}
393418

394419
const IKqpGateway::TKqpSnapshot& TKqpPlanner::GetSnapshot() const {

ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,16 @@ class TNodesManager {
9090
return result;
9191
}
9292

93-
TNodesManager(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources) {
93+
TNodesManager(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources) {
9494
for (auto& node : nodeResources) {
95-
if (!node.GetAvailableComputeActors()) {
95+
if (!node->GetAvailableComputeActors()) {
9696
continue;
9797
}
9898
Nodes.emplace_back(TNodeDesc{
99-
node.GetNodeId(),
100-
ActorIdFromProto(node.GetResourceManagerActorId()),
101-
node.GetTotalMemory() - node.GetUsedMemory(),
102-
node.GetAvailableComputeActors(),
99+
node->GetNodeId(),
100+
ActorIdFromProto(node->GetResourceManagerActorId()),
101+
node->GetTotalMemory() - node->GetUsedMemory(),
102+
node->GetAvailableComputeActors(),
103103
{}
104104
});
105105
}
@@ -111,7 +111,7 @@ class TKqpGreedyPlanner : public IKqpPlannerStrategy {
111111
public:
112112
~TKqpGreedyPlanner() override {}
113113

114-
TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
114+
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
115115
const TVector<TTaskResourceEstimation>& tasks) override
116116
{
117117
TVector<TResult> result;
@@ -161,7 +161,7 @@ class TKqpMockEmptyPlanner : public IKqpPlannerStrategy {
161161
public:
162162
~TKqpMockEmptyPlanner() override {}
163163

164-
TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>&,
164+
TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>&,
165165
const TVector<TTaskResourceEstimation>&) override
166166
{
167167
return {};

ydb/core/kqp/executer_actor/kqp_planner_strategy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class IKqpPlannerStrategy {
2323
TVector<ui64> TaskIds;
2424
};
2525

26-
virtual TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>& nodeResources,
26+
virtual TVector<TResult> Plan(const TVector<const NKikimrKqp::TKqpNodeResources*>& nodeResources,
2727
const TVector<TTaskResourceEstimation>& estimatedResources) = 0;
2828

2929
protected:

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ class TKqpResourceManager : public IKqpResourceManager {
188188
return TPlannerPlacingOptions{
189189
.MaxNonParallelTasksExecutionLimit = MaxNonParallelTasksExecutionLimit.load(),
190190
.MaxNonParallelTopStageExecutionLimit = MaxNonParallelTopStageExecutionLimit.load(),
191+
.PreferLocalDatacenterExecution = PreferLocalDatacenterExecution.load(),
191192
};
192193
}
193194

@@ -474,6 +475,7 @@ class TKqpResourceManager : public IKqpResourceManager {
474475
SpillingPercent.store(config.GetSpillingPercent());
475476
MaxNonParallelTopStageExecutionLimit.store(config.GetMaxNonParallelTopStageExecutionLimit());
476477
MaxNonParallelTasksExecutionLimit.store(config.GetMaxNonParallelTasksExecutionLimit());
478+
PreferLocalDatacenterExecution.store(config.GetPreferLocalDatacenterExecution());
477479
}
478480

479481
ui32 GetNodeId() override {
@@ -523,6 +525,7 @@ class TKqpResourceManager : public IKqpResourceManager {
523525
std::atomic<i64> ExternalDataQueryMemory = 0;
524526
std::atomic<ui64> MaxNonParallelTopStageExecutionLimit = 1;
525527
std::atomic<ui64> MaxNonParallelTasksExecutionLimit = 8;
528+
std::atomic<bool> PreferLocalDatacenterExecution = true;
526529

527530
// current state
528531
std::atomic<ui64> LastResourceBrokerTaskId = 0;

ydb/core/kqp/rm_service/kqp_rm_service.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ struct TKqpLocalNodeResources {
239239
struct TPlannerPlacingOptions {
240240
ui64 MaxNonParallelTasksExecutionLimit = 8;
241241
ui64 MaxNonParallelTopStageExecutionLimit = 1;
242+
bool PreferLocalDatacenterExecution = true;
242243
};
243244

244245
/// per node singleton with instant API

ydb/core/protos/table_service_config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ message TTableServiceConfig {
4949

5050
optional uint64 MaxNonParallelTasksExecutionLimit = 25 [default = 8];
5151
optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1];
52+
optional bool PreferLocalDatacenterExecution = 27 [ default = true ];
5253
}
5354

5455
message TSpillingServiceConfig {

0 commit comments

Comments
 (0)