From 81f0a948b105a1c5fdad1da34df1f9c4ed77f6db Mon Sep 17 00:00:00 2001 From: dzen03 <43926347+dzen03@users.noreply.github.com> Date: Thu, 12 Jun 2025 12:30:31 +0000 Subject: [PATCH] add new FQ scheduler --- ydb/core/fq/libs/actors/nodes_manager.cpp | 148 +++++++++++++++++- .../internal/nodes_health_check.cpp | 14 +- .../fq/libs/control_plane_storage/schema.h | 1 + .../ydb_control_plane_storage.cpp | 1 + ydb/core/fq/libs/init/init.cpp | 4 +- ydb/core/fq/libs/protos/fq_private.proto | 1 + .../dq/worker_manager/interface/counters.cpp | 7 +- .../dq/worker_manager/interface/counters.h | 5 +- ydb/tests/fq/yds/test_smart_scheduler.py | 54 +++++++ ydb/tests/fq/yds/ya.make | 1 + 10 files changed, 225 insertions(+), 11 deletions(-) create mode 100644 ydb/tests/fq/yds/test_smart_scheduler.py diff --git a/ydb/core/fq/libs/actors/nodes_manager.cpp b/ydb/core/fq/libs/actors/nodes_manager.cpp index e36efd4b69c2..40b1a8eea4fd 100644 --- a/ydb/core/fq/libs/actors/nodes_manager.cpp +++ b/ydb/core/fq/libs/actors/nodes_manager.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -107,6 +108,8 @@ class TNodesManagerActor : public NActors::TActorBootstrappedRecord.MutableError(); error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST); @@ -142,7 +145,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped& response) { + const auto count = request.GetCount(); + auto resourceId = request.GetResourceId(); + if (!resourceId) { + resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId(); + } + + TVector tasks; + tasks.reserve(count); + for (ui32 i = 0; i < count; ++i) { + tasks.push_back(&request.GetTask(i)); + } + + // adjacency list + auto findIdx = [&](ui64 taskId) -> int { + for (int i = 0; i < (int)count; ++i) { + if (tasks[i]->GetId() == taskId) return i; + } + return -1; + }; + TVector>> adj(count); + for (ui32 i = 0; i < count; ++i) { + auto* src = tasks[i]; + double outPred = src->GetProgram().GetSettings().GetOutputDataPrediction(); + for 
(const auto& inp : src->GetOutputs()) { + for (const auto& ch : inp.GetChannels()) { + int dst = findIdx(ch.GetDstTaskId()); + if (dst >= 0) { + adj[i].emplace_back(dst, outPred); + adj[dst].emplace_back(i, outPred); + } + } + } + } + + // clusterizing + TVector comp(count, -1); + int compCnt = 0; + for (ui32 i = 0; i < count; ++i) { + if (comp[i] >= 0) continue; + // BFS + TVector q{ i }; + comp[i] = compCnt; + for (size_t qi = 0; qi < q.size(); ++qi) { + int u = q[qi]; + for (auto& [v, _] : adj[u]) { + if (comp[v] < 0) { + comp[v] = compCnt; + q.push_back(v); + } + } + } + ++compCnt; + } + + // grouping + struct TCluster { TVector Idxs; ui64 MemSum = 0; double CpuScore = 0.0; }; + TVector clusters(compCnt); + for (ui32 i = 0; i < count; ++i) { + clusters[comp[i]].Idxs.push_back(i); + } + + for (auto& cl : clusters) { + for (int idx : cl.Idxs) { + const auto* t = tasks[idx]; + ui64 mem = t->GetInitialTaskMemoryLimit() + ? t->GetInitialTaskMemoryLimit() + : MkqlInitialMemoryLimit; + cl.MemSum += mem; + + const auto& s = t->GetProgram().GetSettings(); + cl.CpuScore + += s.GetHasSort() ? 2.0 : 0.0 + + s.GetHasMapJoin() ? 2.0 : 0.0 + + s.GetHasUdf() ? 1.5 : 0.0 + + s.GetHasStateAggregation() ? 
1.5 : 0.0; + } + } + + // biggest one first + std::sort(clusters.begin(), clusters.end(), + [&](auto& a, auto& b){ return a.Idxs.size() > b.Idxs.size(); }); + + bool placementFailure = false; + double cpuThreshold = 1000.0; + + TVector assignment(count); + + for (auto& cl : clusters) { + TPeer chosenNode; + bool placed = false; + + for (bool requireCpu : {true, false}) { + ui32 startPeer = NextPeer; + ui32 peerIdx = startPeer; + do { + auto& peer = Peers[peerIdx]; + if ((!UseDataCenter || DataCenter.empty() || peer.DataCenter.empty() || DataCenter == peer.DataCenter) + && (peer.MemoryLimit == 0 || peer.MemoryLimit >= peer.MemoryAllocated + cl.MemSum) + && (!requireCpu || peer.CpuUsage + cl.CpuScore < cpuThreshold)) + { + peer.MemoryAllocated += cl.MemSum; + peer.CpuUsage += cl.CpuScore; + chosenNode = peer; + placed = true; + break; + } + if (++peerIdx >= Peers.size()) peerIdx = 0; + } while (peerIdx != startPeer); + + if (placed) break; + } + + if (!placed) { + placementFailure = true; + auto& error = *response->Record.MutableError(); + error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED); + error.SetMessage("Не удалось разместить кластер из-за нехватки ресурсов"); + break; + } + + for (int idx : cl.Idxs) { + assignment[idx] = chosenNode; + } + } + + if (!placementFailure) { + response->Record.ClearError(); + auto& group = *response->Record.MutableNodes(); + group.SetResourceId(resourceId); + for (ui32 i = 0; i < count; ++i) { + const auto& node = assignment[i]; + auto* worker = group.AddWorker(); + *worker->MutableGuid() = node.InstanceId; + worker->SetNodeId(node.NodeId); + } + } + } + void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) { ServiceCounters.Counters->GetCounter("EvFreeWorkersNotify", true)->Inc(); } @@ -305,6 +447,7 @@ class TNodesManagerActor : public NActors::TActorBootstrappedGetAtomic())); node.set_memory_limit(AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic())); 
node.set_memory_allocated(AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic())); + node.set_cpu_usage(AtomicGet(WorkerManagerCounters.ElapsedMicrosec->GetAtomic())); node.set_interconnect_port(IcPort); node.set_data_center(DataCenter); Send(InternalServiceId, new NFq::TEvInternalService::TEvHealthCheckRequest(request)); @@ -336,7 +479,7 @@ class TNodesManagerActor : public NActors::TActorBootstrappedemplace_back(TEvInterconnect::TNodeInfo{ @@ -387,6 +530,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped Peers; diff --git a/ydb/core/fq/libs/control_plane_storage/internal/nodes_health_check.cpp b/ydb/core/fq/libs/control_plane_storage/internal/nodes_health_check.cpp index 8eb514a9b26e..920ecefc8a16 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/nodes_health_check.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/nodes_health_check.cpp @@ -29,6 +29,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth const ui64 activeWorkers = node.active_workers(); const ui64 memoryLimit = node.memory_limit(); const ui64 memoryAllocated = node.memory_allocated(); + const ui64 cpuUsage = node.cpu_usage(); const ui32 icPort = node.interconnect_port(); const TString dataCenter = node.data_center(); const TString nodeAddress = node.node_address(); @@ -54,6 +55,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth node->set_active_workers(activeWorkers); node->set_memory_limit(memoryLimit); node->set_memory_allocated(memoryAllocated); + node->set_cpu_usage(cpuUsage); node->set_node_address(nodeAddress); node->set_data_center(dataCenter); } @@ -63,8 +65,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth readQueryBuilder.AddString("tenant", tenant); readQueryBuilder.AddText( "SELECT `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`, `" MEMORY_LIMIT_COLUMN_NAME"`, " - "`" 
MEMORY_ALLOCATED_COLUMN_NAME"`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME"`, `" DATA_CENTER_COLUMN_NAME "` FROM `" NODES_TABLE_NAME "`\n" - "WHERE `" TENANT_COLUMN_NAME"` = $tenant AND `" EXPIRE_AT_COLUMN_NAME "` >= $now;\n" + "`" MEMORY_ALLOCATED_COLUMN_NAME"`, `" CPU_USAGE_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME"`, `" DATA_CENTER_COLUMN_NAME "`\n" + "FROM `" NODES_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME"` = $tenant AND `" EXPIRE_AT_COLUMN_NAME "` >= $now;\n" ); auto prepareParams = [=, tablePathPrefix=YdbConnection->TablePathPrefix](const std::vector& resultSets) { @@ -80,6 +82,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth node->set_active_workers(*parser.ColumnParser(ACTIVE_WORKERS_COLUMN_NAME).GetOptionalUint64()); node->set_memory_limit(*parser.ColumnParser(MEMORY_LIMIT_COLUMN_NAME).GetOptionalUint64()); node->set_memory_allocated(*parser.ColumnParser(MEMORY_ALLOCATED_COLUMN_NAME).GetOptionalUint64()); + node->set_cpu_usage(*parser.ColumnParser(CPU_USAGE_COLUMN_NAME).GetOptionalUint64()); node->set_interconnect_port(parser.ColumnParser(INTERCONNECT_PORT_COLUMN_NAME).GetOptionalUint32().value_or(0)); node->set_node_address(*parser.ColumnParser(NODE_ADDRESS_COLUMN_NAME).GetOptionalString()); node->set_data_center(*parser.ColumnParser(DATA_CENTER_COLUMN_NAME).GetOptionalString()); @@ -96,6 +99,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth writeQueryBuilder.AddUint64("active_workers", activeWorkers); writeQueryBuilder.AddUint64("memory_limit", memoryLimit); writeQueryBuilder.AddUint64("memory_allocated", memoryAllocated); + writeQueryBuilder.AddUint64("cpu_usage", cpuUsage); writeQueryBuilder.AddUint32("ic_port", icPort); writeQueryBuilder.AddString("node_address", nodeAddress); writeQueryBuilder.AddString("data_center", dataCenter); @@ -103,10 +107,10 @@ void
TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth "UPSERT INTO `" NODES_TABLE_NAME "`\n" "(`" TENANT_COLUMN_NAME "`, `" NODE_ID_COLUMN_NAME "`, `" INSTANCE_ID_COLUMN_NAME "`,\n" "`" HOST_NAME_COLUMN_NAME "`, `" EXPIRE_AT_COLUMN_NAME "`, `" ACTIVE_WORKERS_COLUMN_NAME"`,\n" - "`" MEMORY_LIMIT_COLUMN_NAME "`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" INTERCONNECT_PORT_COLUMN_NAME "`,\n" - "`" NODE_ADDRESS_COLUMN_NAME "`, `" DATA_CENTER_COLUMN_NAME"`)\n" + "`" MEMORY_LIMIT_COLUMN_NAME "`, `" MEMORY_ALLOCATED_COLUMN_NAME "`, `" CPU_USAGE_COLUMN_NAME "`,\n" + "`" INTERCONNECT_PORT_COLUMN_NAME "`, `" NODE_ADDRESS_COLUMN_NAME "`, `" DATA_CENTER_COLUMN_NAME"`)\n" "VALUES ($tenant ,$node_id, $instance_id, $hostname, $deadline, $active_workers, $memory_limit,\n" - "$memory_allocated, $ic_port, $node_address, $data_center);\n" + "$memory_allocated, $cpu_usage, $ic_port, $node_address, $data_center);\n" ); const auto writeQuery = writeQueryBuilder.Build(); return std::make_pair(writeQuery.Sql, writeQuery.Params); diff --git a/ydb/core/fq/libs/control_plane_storage/schema.h b/ydb/core/fq/libs/control_plane_storage/schema.h index a3e52bfeeb9e..0eaef40832f8 100644 --- a/ydb/core/fq/libs/control_plane_storage/schema.h +++ b/ydb/core/fq/libs/control_plane_storage/schema.h @@ -65,6 +65,7 @@ namespace NFq { #define ACTIVE_WORKERS_COLUMN_NAME "active_workers" #define MEMORY_LIMIT_COLUMN_NAME "memory_limit" #define MEMORY_ALLOCATED_COLUMN_NAME "memory_allocated" +#define CPU_USAGE_COLUMN_NAME "cpu_usage" #define INTERCONNECT_PORT_COLUMN_NAME "interconnect_port" #define NODE_ADDRESS_COLUMN_NAME "node_address" #define DATA_CENTER_COLUMN_NAME "data_center" diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 4ece7c84c3a1..5e01c52e68ea 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ 
b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -193,6 +193,7 @@ void TYdbControlPlaneStorageActor::CreateNodesTable() .AddNullableColumn(ACTIVE_WORKERS_COLUMN_NAME, EPrimitiveType::Uint64) .AddNullableColumn(MEMORY_LIMIT_COLUMN_NAME, EPrimitiveType::Uint64) .AddNullableColumn(MEMORY_ALLOCATED_COLUMN_NAME, EPrimitiveType::Uint64) + .AddNullableColumn(CPU_USAGE_COLUMN_NAME, EPrimitiveType::Uint64) .AddNullableColumn(EXPIRE_AT_COLUMN_NAME, EPrimitiveType::Timestamp) .AddNullableColumn(INTERCONNECT_PORT_COLUMN_NAME, EPrimitiveType::Uint32) .AddNullableColumn(NODE_ADDRESS_COLUMN_NAME, EPrimitiveType::String) diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index eb4ce8ac2c43..e4441a039ac8 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -278,9 +278,11 @@ void Init( ui64 mkqlInitialMemoryLimit = 8_GB; auto taskCounters = protoConfig.GetEnableTaskCounters() ? appData->Counters->GetSubgroup("counters", "dq_tasks") : nullptr; + auto userCounters = appData->Counters->GetSubgroup("counters", "utils"); auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters( yqCounters->GetSubgroup("subsystem", "worker_manager"), - taskCounters); + taskCounters, + userCounters); if (protoConfig.GetResourceManager().GetEnabled()) { mkqlInitialMemoryLimit = protoConfig.GetResourceManager().GetMkqlInitialMemoryLimit(); diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto index 9ba685d81057..3a1b0127b9e8 100644 --- a/ydb/core/fq/libs/protos/fq_private.proto +++ b/ydb/core/fq/libs/protos/fq_private.proto @@ -210,6 +210,7 @@ message NodeInfo { uint32 interconnect_port = 7; string node_address = 8; string data_center = 9; + uint64 cpu_usage = 10; } message NodesHealthCheckRequest { diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp index 2d325d0fe6ff..f7e63b0f0ddc 
100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp @@ -4,18 +4,21 @@ namespace NYql::NDqs { TWorkerManagerCounters::TWorkerManagerCounters( ::NMonitoring::TDynamicCounterPtr root, - ::NMonitoring::TDynamicCounterPtr taskCounters) + ::NMonitoring::TDynamicCounterPtr taskCounters, + ::NMonitoring::TDynamicCounterPtr userCounters) : ActiveWorkers(root->GetCounter("ActiveWorkers")) , MkqlMemoryLimit(root->GetCounter("MkqlMemoryLimit")) , MkqlMemoryAllocated(root->GetCounter("MkqlMemoryAllocated")) , FreeGroupError(root->GetCounter("FreeGroupError")) + , ElapsedMicrosec(userCounters->GetCounter("Scheduler/ElapsedMicrosec")) , TaskCounters(taskCounters) { } TWorkerManagerCounters::TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root) : TWorkerManagerCounters( root->GetSubgroup("component", "lwm"), - root->GetSubgroup("component", "tasks")) + root->GetSubgroup("component", "tasks"), + root->GetSubgroup("counters", "utils")->GetSubgroup("execpool", "User")) { } TWorkerManagerCounters::TWorkerManagerCounters() diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h index 07c6c6f4d6c7..68663bdb7af3 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h @@ -8,10 +8,13 @@ struct TWorkerManagerCounters { ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit; ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryAllocated; ::NMonitoring::TDynamicCounters::TCounterPtr FreeGroupError; + ::NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec; ::NMonitoring::TDynamicCounterPtr TaskCounters; explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root); - explicit TWorkerManagerCounters(::NMonitoring::TDynamicCounterPtr root, ::NMonitoring::TDynamicCounterPtr 
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
import os

import pytest

import ydb.public.api.protos.draft.fq_pb2 as fq
import ydb.public.api.protos.ydb_value_pb2 as ydb
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1


class TestSelects(object):
    @yq_v1
    @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
    def test_3_selects(self, client):
        # Three independent SELECTs produce three disconnected task graphs,
        # exercising the "smart" scheduler's per-component cluster placement.
        sql = R'''
            pragma dq.Scheduler=@@{"type": "smart"}@@;
            SELECT 1 AS SingleColumn;
            SELECT "A" AS TextColumn;
            SELECT 11 AS Column1, 22 AS Column2;
            '''

        client.create_yds_connection(name="myyds", database_id="FakeDatabaseId")
        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id

        client.wait_query(query_id, 30)

        def fetch(index):
            # Pull one result set and log it for easier post-mortem debugging.
            result_set = client.get_result_data(query_id, result_set_index=index).result.result_set
            logging.debug(str(result_set))
            return result_set

        # Result set 0: single INT32 column with value 1.
        first = fetch(0)
        assert len(first.rows) == 1
        assert len(first.columns) == 1
        assert first.columns[0].name == "SingleColumn"
        assert first.columns[0].type.type_id == ydb.Type.INT32
        assert first.rows[0].items[0].int32_value == 1

        # Result set 1: single STRING column with value b"A".
        second = fetch(1)
        assert len(second.rows) == 1
        assert len(second.columns) == 1
        assert second.columns[0].name == "TextColumn"
        assert second.columns[0].type.type_id == ydb.Type.STRING
        assert second.rows[0].items[0].bytes_value == b"A"

        # Result set 2: two INT32 columns with values 11 and 22.
        third = fetch(2)
        assert len(third.rows) == 1
        assert len(third.columns) == 2
        assert third.columns[0].name == "Column1"
        assert third.columns[0].type.type_id == ydb.Type.INT32
        assert third.rows[0].items[0].int32_value == 11
        assert third.columns[1].name == "Column2"
        assert third.columns[1].type.type_id == ydb.Type.INT32
        assert third.rows[0].items[1].int32_value == 22