Skip to content

add new FQ scheduler #19637

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 146 additions & 2 deletions ydb/core/fq/libs/actors/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <ydb/library/actors/core/log.h>
#include <util/system/hostname.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/kqp/query_data/kqp_predictor.h>

#include <library/cpp/scheme/scheme.h>

Expand Down Expand Up @@ -107,6 +108,8 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
auto schedulerType = schedulerSettings["type"].GetString();
if (schedulerType == "single_node") {
ScheduleOnSingleNode(request, response);
} else if (schedulerType == "smart") {
ScheduleSmart(request, response);
} else {
auto& error = *response->Record.MutableError();
error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
Expand Down Expand Up @@ -142,7 +145,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
if (totalMemoryLimit == 0) {
totalMemoryLimit = MkqlInitialMemoryLimit;
}
TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, 0, DataCenter};
bool selfPlacement = true;
if (!Peers.empty()) {
auto firstPeer = NextPeer;
Expand Down Expand Up @@ -234,6 +237,145 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
}
}

void ScheduleSmart(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
    // Smart scheduler: groups tasks into connectivity clusters (tasks linked by
    // data channels) and places each whole cluster on one node, so chatty tasks
    // exchange data locally. Replies CLUSTER_OVERLOADED when no node fits.
    const ui32 count = request.GetCount();
    ui64 resourceId = request.GetResourceId();
    if (!resourceId) {
        resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
    }

    TVector<const NDqProto::TDqTask*> tasks;
    tasks.reserve(count);
    for (ui32 i = 0; i < count; ++i) {
        tasks.push_back(&request.GetTask(i));
    }

    // Task-id -> index lookup via sorted vector + binary search. The previous
    // per-channel linear scan made graph construction O(count^3) in the worst case.
    TVector<std::pair<ui64, int>> taskIndex;
    taskIndex.reserve(count);
    for (ui32 i = 0; i < count; ++i) {
        taskIndex.emplace_back(tasks[i]->GetId(), (int)i);
    }
    std::sort(taskIndex.begin(), taskIndex.end());
    auto findIdx = [&](ui64 taskId) -> int {
        auto it = std::lower_bound(taskIndex.begin(), taskIndex.end(), std::make_pair(taskId, 0));
        return (it != taskIndex.end() && it->first == taskId) ? it->second : -1;
    };

    // Undirected adjacency list over output channels. The edge weight is the
    // producer's predicted output volume; BFS below ignores it for now.
    TVector<TVector<std::pair<int, double>>> adj(count);
    for (ui32 i = 0; i < count; ++i) {
        const auto* src = tasks[i];
        const double outPred = src->GetProgram().GetSettings().GetOutputDataPrediction();
        for (const auto& output : src->GetOutputs()) {
            for (const auto& channel : output.GetChannels()) {
                int dst = findIdx(channel.GetDstTaskId());
                if (dst >= 0) {
                    adj[i].emplace_back(dst, outPred);
                    adj[dst].emplace_back((int)i, outPred);
                }
            }
        }
    }

    // Label connected components with BFS.
    TVector<int> comp(count, -1);
    int compCnt = 0;
    for (ui32 i = 0; i < count; ++i) {
        if (comp[i] >= 0) {
            continue;
        }
        TVector<ui32> queue{i};
        comp[i] = compCnt;
        for (size_t qi = 0; qi < queue.size(); ++qi) {
            const int u = queue[qi];
            for (const auto& [v, _] : adj[u]) {
                if (comp[v] < 0) {
                    comp[v] = compCnt;
                    queue.push_back(v);
                }
            }
        }
        ++compCnt;
    }

    // Aggregate per-cluster memory demand and a heuristic CPU score.
    struct TCluster {
        TVector<int> Idxs;
        ui64 MemSum = 0;
        double CpuScore = 0.0;
    };
    TVector<TCluster> clusters(compCnt);
    for (ui32 i = 0; i < count; ++i) {
        clusters[comp[i]].Idxs.push_back(i);
    }

    for (auto& cl : clusters) {
        for (int idx : cl.Idxs) {
            const auto* task = tasks[idx];
            const ui64 mem = task->GetInitialTaskMemoryLimit()
                ? task->GetInitialTaskMemoryLimit()
                : MkqlInitialMemoryLimit;
            cl.MemSum += mem;

            // Each ternary MUST be parenthesized: `?:` binds weaker than `+`,
            // so the previous unparenthesized chain parsed as nested
            // conditionals and produced a single wrong score.
            const auto& s = task->GetProgram().GetSettings();
            cl.CpuScore += (s.GetHasSort() ? 2.0 : 0.0)
                         + (s.GetHasMapJoin() ? 2.0 : 0.0)
                         + (s.GetHasUdf() ? 1.5 : 0.0)
                         + (s.GetHasStateAggregation() ? 1.5 : 0.0);
        }
    }

    // Place the biggest clusters first (first-fit-decreasing style).
    std::sort(clusters.begin(), clusters.end(),
        [](const auto& a, const auto& b) { return a.Idxs.size() > b.Idxs.size(); });

    bool placementFailure = false;
    // NOTE(review): CpuUsage comes from an elapsed-microseconds counter while
    // the threshold is a small heuristic constant — confirm the intended scale.
    const double cpuThreshold = 1000.0;

    TVector<TPeer> assignment(count);

    if (!clusters.empty() && Peers.empty()) {
        // Guard: the round-robin scan below indexes Peers and would read out of
        // bounds on an empty peer list.
        auto& error = *response->Record.MutableError();
        error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
        error.SetMessage("No peer nodes are available for placement");
        return;
    }

    for (auto& cl : clusters) {
        TPeer chosenNode;
        bool placed = false;

        // First pass requires CPU headroom; the second relaxes it so memory
        // remains the only hard constraint.
        for (bool requireCpu : {true, false}) {
            // Clamp the starting point: NextPeer may be stale after the peer
            // list shrank. NextPeer itself is advanced elsewhere.
            const ui32 startPeer = NextPeer < Peers.size() ? NextPeer : 0;
            ui32 peerIdx = startPeer;
            do {
                auto& peer = Peers[peerIdx];
                const bool dcOk = !UseDataCenter || DataCenter.empty()
                    || peer.DataCenter.empty() || DataCenter == peer.DataCenter;
                const bool memOk = peer.MemoryLimit == 0
                    || peer.MemoryLimit >= peer.MemoryAllocated + cl.MemSum;
                const bool cpuOk = !requireCpu || peer.CpuUsage + cl.CpuScore < cpuThreshold;
                if (dcOk && memOk && cpuOk) {
                    peer.MemoryAllocated += cl.MemSum;
                    peer.CpuUsage += cl.CpuScore;
                    chosenNode = peer;
                    placed = true;
                    break;
                }
                if (++peerIdx >= Peers.size()) {
                    peerIdx = 0;
                }
            } while (peerIdx != startPeer);

            if (placed) {
                break;
            }
        }

        if (!placed) {
            placementFailure = true;
            auto& error = *response->Record.MutableError();
            error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
            // Error text in English for consistency with the rest of the file.
            // NOTE(review): resources already reserved on peers for previously
            // placed clusters are not rolled back here — confirm peer stats are
            // refreshed by health checks before this matters.
            error.SetMessage("Failed to place task cluster: not enough resources");
            break;
        }

        for (int idx : cl.Idxs) {
            assignment[idx] = chosenNode;
        }
    }

    if (!placementFailure) {
        response->Record.ClearError();
        auto& group = *response->Record.MutableNodes();
        group.SetResourceId(resourceId);
        for (ui32 i = 0; i < count; ++i) {
            const auto& node = assignment[i];
            auto* worker = group.AddWorker();
            *worker->MutableGuid() = node.InstanceId;
            worker->SetNodeId(node.NodeId);
        }
    }
}

void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) {
    // Workers are not tracked individually here; only account for the event.
    auto counter = ServiceCounters.Counters->GetCounter("EvFreeWorkersNotify", true);
    counter->Inc();
}
Expand Down Expand Up @@ -305,6 +447,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
node.set_active_workers(AtomicGet(WorkerManagerCounters.ActiveWorkers->GetAtomic()));
node.set_memory_limit(AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic()));
node.set_memory_allocated(AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic()));
node.set_cpu_usage(AtomicGet(WorkerManagerCounters.ElapsedMicrosec->GetAtomic()));
node.set_interconnect_port(IcPort);
node.set_data_center(DataCenter);
Send(InternalServiceId, new NFq::TEvInternalService::TEvHealthCheckRequest(request));
Expand Down Expand Up @@ -336,7 +479,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
nodeIds.insert(node.node_id());

Peers.push_back({node.node_id(), node.instance_id() + "," + node.hostname(),
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.cpu_usage(), node.data_center()});

if (node.interconnect_port()) {
nodesInfo->emplace_back(TEvInterconnect::TNodeInfo{
Expand Down Expand Up @@ -387,6 +530,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
ui64 ActiveWorkers;
ui64 MemoryLimit;
ui64 MemoryAllocated;
ui64 CpuUsage;
TString DataCenter;
};
TVector<TPeer> Peers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
const ui64 activeWorkers = node.active_workers();
const ui64 memoryLimit = node.memory_limit();
const ui64 memoryAllocated = node.memory_allocated();
const ui64 cpuUsage = node.cpu_usage();
const ui32 icPort = node.interconnect_port();
const TString dataCenter = node.data_center();
const TString nodeAddress = node.node_address();
Expand All @@ -54,6 +55,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
node->set_active_workers(activeWorkers);
node->set_memory_limit(memoryLimit);
node->set_memory_allocated(memoryAllocated);
node->set_cpu_usage(cpuUsage);
node->set_node_address(nodeAddress);
node->set_data_center(dataCenter);
}
Expand All @@ -63,8 +65,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
readQueryBuilder.AddString("tenant", tenant);
readQueryBuilder.AddText(
    "SELECT `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME "`, `" MEMORY_LIMIT_COLUMN_NAME "`, "
    // Fixed: a comma was missing between the cpu_usage and interconnect_port
    // columns, which made the generated SELECT syntactically invalid.
    "`" MEMORY_ALLOCATED_COLUMN_NAME "`, `" CPU_USAGE_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME "`, `" DATA_CENTER_COLUMN_NAME "`\n"
    "FROM `" NODES_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" EXPIRE_AT_COLUMN_NAME "` >= $now;\n"
);

auto prepareParams = [=, tablePathPrefix=YdbConnection->TablePathPrefix](const std::vector<TResultSet>& resultSets) {
Expand All @@ -80,6 +82,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
node->set_active_workers(*parser.ColumnParser(ACTIVE_WORKERS_COLUMN_NAME).GetOptionalUint64());
node->set_memory_limit(*parser.ColumnParser(MEMORY_LIMIT_COLUMN_NAME).GetOptionalUint64());
node->set_memory_allocated(*parser.ColumnParser(MEMORY_ALLOCATED_COLUMN_NAME).GetOptionalUint64());
node->set_cpu_usage(*parser.ColumnParser(CPU_USAGE_COLUMN_NAME).GetOptionalUint64());
node->set_interconnect_port(parser.ColumnParser(INTERCONNECT_PORT_COLUMN_NAME).GetOptionalUint32().value_or(0));
node->set_node_address(*parser.ColumnParser(NODE_ADDRESS_COLUMN_NAME).GetOptionalString());
node->set_data_center(*parser.ColumnParser(DATA_CENTER_COLUMN_NAME).GetOptionalString());
Expand All @@ -96,17 +99,18 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth
writeQueryBuilder.AddUint64("active_workers", activeWorkers);
writeQueryBuilder.AddUint64("memory_limit", memoryLimit);
writeQueryBuilder.AddUint64("memory_allocated", memoryAllocated);
writeQueryBuilder.AddUint64("cpu_usage", cpuUsage);
writeQueryBuilder.AddUint32("ic_port", icPort);
writeQueryBuilder.AddString("node_address", nodeAddress);
writeQueryBuilder.AddString("data_center", dataCenter);
writeQueryBuilder.AddText(
"UPSERT INTO `" NODES_TABLE_NAME "`\n"
"(`" TENANT_COLUMN_NAME "`, `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`,\n"
"`" HOST_NAME_COLUMN_NAME "`, `" EXPIRE_AT_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`,\n"
"`" MEMORY_LIMIT_COLUMN_NAME "`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`,\n"
"`" NODE_ADDRESS_COLUMN_NAME "`, `" DATA_CENTER_COLUMN_NAME"`)\n"
"`" MEMORY_LIMIT_COLUMN_NAME "`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" CPU_USAGE_COLUMN_NAME "`,\n"
"`" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME "`, `" DATA_CENTER_COLUMN_NAME"`)\n"
"VALUES ($tenant ,$node_id, $instance_id, $hostname, $deadline, $active_workers, $memory_limit,\n"
"$memory_allocated, $ic_port, $node_address, $data_center);\n"
"$memory_allocated, $cpu_usage, $ic_port, $node_address, $data_center);\n"
);
const auto writeQuery = writeQueryBuilder.Build();
return std::make_pair(writeQuery.Sql, writeQuery.Params);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/control_plane_storage/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ namespace NFq {
#define ACTIVE_WORKERS_COLUMN_NAME "active_workers"
#define MEMORY_LIMIT_COLUMN_NAME "memory_limit"
#define MEMORY_ALLOCATED_COLUMN_NAME "memory_allocated"
#define CPU_USAGE_COLUMN_NAME "cpu_usage"
#define INTERCONNECT_PORT_COLUMN_NAME "interconnect_port"
#define NODE_ADDRESS_COLUMN_NAME "node_address"
#define DATA_CENTER_COLUMN_NAME "data_center"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ void TYdbControlPlaneStorageActor::CreateNodesTable()
.AddNullableColumn(ACTIVE_WORKERS_COLUMN_NAME, EPrimitiveType::Uint64)
.AddNullableColumn(MEMORY_LIMIT_COLUMN_NAME, EPrimitiveType::Uint64)
.AddNullableColumn(MEMORY_ALLOCATED_COLUMN_NAME, EPrimitiveType::Uint64)
.AddNullableColumn(CPU_USAGE_COLUMN_NAME, EPrimitiveType::Uint64)
.AddNullableColumn(EXPIRE_AT_COLUMN_NAME, EPrimitiveType::Timestamp)
.AddNullableColumn(INTERCONNECT_PORT_COLUMN_NAME, EPrimitiveType::Uint32)
.AddNullableColumn(NODE_ADDRESS_COLUMN_NAME, EPrimitiveType::String)
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,11 @@ void Init(

ui64 mkqlInitialMemoryLimit = 8_GB;
auto taskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr;
auto userCounters = appData->Counters->GetSubgroup("counters", "utils");
auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(
yqCounters->GetSubgroup("subsystem", "worker_manager"),
taskCounters);
taskCounters,
userCounters);

if (protoConfig.GetResourceManager().GetEnabled()) {
mkqlInitialMemoryLimit = protoConfig.GetResourceManager().GetMkqlInitialMemoryLimit();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/protos/fq_private.proto
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ message NodeInfo {
uint32 interconnect_port = 7;
string node_address = 8;
string data_center = 9;
uint64 cpu_usage = 10;
}

message NodesHealthCheckRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ namespace NYql::NDqs {

// Binds worker-manager metrics to their monitoring subgroups.
// `root` hosts the worker-manager gauges, `taskCounters` is an optional
// per-task counters subgroup (callers may pass nullptr), and `userCounters`
// supplies the "Scheduler/ElapsedMicrosec" counter reported by this manager.
TWorkerManagerCounters::TWorkerManagerCounters(
    ::NMonitoring::TDynamicCounterPtr root,
    ::NMonitoring::TDynamicCounterPtr taskCounters,
    ::NMonitoring::TDynamicCounterPtr userCounters)
    : ActiveWorkers(root->GetCounter("ActiveWorkers"))
    , MkqlMemoryLimit(root->GetCounter("MkqlMemoryLimit"))
    , MkqlMemoryAllocated(root->GetCounter("MkqlMemoryAllocated"))
    , FreeGroupError(root->GetCounter("FreeGroupError"))
    , ElapsedMicrosec(userCounters->GetCounter("Scheduler/ElapsedMicrosec"))
    , TaskCounters(taskCounters)
{ }

// Convenience overload: derives the three counter subgroups from one root.
// NOTE(review): the user-pool path ("counters"/"utils" -> "execpool"/"User")
// is assumed to match the actor-system counter layout — confirm against the
// monitoring registration code.
TWorkerManagerCounters::TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root)
    : TWorkerManagerCounters(
        root->GetSubgroup("component", "lwm"),
        root->GetSubgroup("component", "tasks"),
        root->GetSubgroup("counters", "utils")->GetSubgroup("execpool", "User"))
{ }

TWorkerManagerCounters::TWorkerManagerCounters()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ struct TWorkerManagerCounters {
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit;
::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryAllocated;
::NMonitoring::TDynamicCounters::TCounterPtr FreeGroupError;
::NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
::NMonitoring::TDynamicCounterPtr TaskCounters;

explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root);
explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root, ::NMonitoring::TDynamicCounterPtr taskCounters);
explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root,
::NMonitoring::TDynamicCounterPtr taskCounters,
::NMonitoring::TDynamicCounterPtr userCounters);
TWorkerManagerCounters();
};

Expand Down
54 changes: 54 additions & 0 deletions ydb/tests/fq/yds/test_smart_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import os
import pytest

import ydb.public.api.protos.draft.fq_pb2 as fq
import ydb.public.api.protos.ydb_value_pb2 as ydb
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1


class TestSelects(object):
    """Checks that the "smart" DQ scheduler executes multiple independent
    SELECTs in one streaming query and produces the expected result sets."""

    @staticmethod
    def _read_result_set(client, query_id, index):
        # Fetch one result set and log it for easier failure triage;
        # factored out because the same fetch/log sequence was repeated
        # for each of the three result sets.
        rs = client.get_result_data(query_id, result_set_index=index).result.result_set
        logging.debug(str(rs))
        return rs

    @yq_v1
    @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
    def test_3_selects(self, client):
        """Runs three SELECTs under `pragma dq.Scheduler={"type": "smart"}`
        and verifies every result set's shape, column metadata and values."""
        sql = R'''
            pragma dq.Scheduler=@@{"type": "smart"}@@;
            SELECT 1 AS SingleColumn;
            SELECT "A" AS TextColumn;
            SELECT 11 AS Column1, 22 AS Column2;
            '''

        client.create_yds_connection(name="myyds", database_id="FakeDatabaseId")
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id

        client.wait_query(query_id, 30)

        rs = self._read_result_set(client, query_id, 0)
        assert len(rs.rows) == 1
        assert len(rs.columns) == 1
        assert rs.columns[0].name == "SingleColumn"
        assert rs.columns[0].type.type_id == ydb.Type.INT32
        assert rs.rows[0].items[0].int32_value == 1

        rs = self._read_result_set(client, query_id, 1)
        assert len(rs.rows) == 1
        assert len(rs.columns) == 1
        assert rs.columns[0].name == "TextColumn"
        assert rs.columns[0].type.type_id == ydb.Type.STRING
        assert rs.rows[0].items[0].bytes_value == b"A"

        rs = self._read_result_set(client, query_id, 2)
        assert len(rs.rows) == 1
        assert len(rs.columns) == 2
        assert rs.columns[0].name == "Column1"
        assert rs.columns[0].type.type_id == ydb.Type.INT32
        assert rs.rows[0].items[0].int32_value == 11
        assert rs.columns[1].name == "Column2"
        assert rs.columns[1].type.type_id == ydb.Type.INT32
        assert rs.rows[0].items[1].int32_value == 22
1 change: 1 addition & 0 deletions ydb/tests/fq/yds/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ TEST_SRCS(
test_select_limit_db_id.py
test_select_limit.py
test_select_timings.py
test_smart_scheduler.py
test_stop.py
test_watermarks.py
test_yds_bindings.py
Expand Down
Loading