
Commit dadb557

single node scheduler has been added (#8445)

1 parent fe3a015 commit dadb557

File tree

6 files changed (+139, -68 lines)

ydb/core/fq/libs/actors/nodes_manager.cpp

Lines changed: 132 additions & 68 deletions

@@ -16,6 +16,10 @@
 #include <util/system/hostname.h>
 #include <ydb/library/services/services.pb.h>
 
+#include <library/cpp/scheme/scheme.h>
+
+#include <random>
+
 
 #define LOG_E(stream) \
     LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_NODES_MANAGER, stream)
@@ -86,93 +90,148 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
 private:
     void Handle(NDqs::TEvAllocateWorkersRequest::TPtr& ev) {
         ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc();
-        const auto &rec = ev->Get()->Record;
-        const auto count = rec.GetCount();
-
-        auto req = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
+        const auto &request = ev->Get()->Record;
+        const auto count = request.GetCount();
+        auto scheduler = request.GetScheduler();
 
+        auto response = MakeHolder<NDqs::TEvAllocateWorkersResponse>();
         if (count == 0) {
-            auto& error = *req->Record.MutableError();
+            auto& error = *response->Record.MutableError();
             error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
             error.SetMessage("Incorrect request - 0 nodes requested");
+        } else if (!scheduler) {
+            ScheduleUniformly(request, response);
         } else {
-            auto resourceId = rec.GetResourceId();
-            if (!resourceId) {
-                resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
-            }
+            try {
+                auto schedulerSettings = NSc::TValue::FromJsonThrow(scheduler);
+                auto schedulerType = schedulerSettings["type"].GetString();
+                if (schedulerType == "single_node") {
+                    ScheduleOnSingleNode(request, response);
+                } else {
+                    auto& error = *response->Record.MutableError();
+                    error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
+                    error.SetMessage(TStringBuilder{} << "Unknown scheduler type: " << schedulerType << ", settings: " << scheduler);
+                }
+            } catch (...) {
+                auto& error = *response->Record.MutableError();
+                error.SetStatusCode(NYql::NDqProto::StatusIds::BAD_REQUEST);
+                error.SetMessage(TStringBuilder{} << "Error choosing scheduler. Invalid settings: " << scheduler << ", error: " << CurrentExceptionMessage());
+            }
+        }
+        LOG_D("TEvAllocateWorkersResponse " << response->Record.DebugString());
 
-            bool placementFailure = false;
-            ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
-            ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
-            TVector<TPeer> nodes;
-            for (ui32 i = 0; i < count; ++i) {
-                ui64 totalMemoryLimit = 0;
-                if (rec.TaskSize() > i) {
-                    totalMemoryLimit = rec.GetTask(i).GetInitialTaskMemoryLimit();
-                }
-                if (totalMemoryLimit == 0) {
-                    totalMemoryLimit = MkqlInitialMemoryLimit;
-                }
-                TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
-                bool selfPlacement = true;
-                if (!Peers.empty()) {
-                    auto FirstPeer = NextPeer;
-                    while (true) {
-                        Y_ABORT_UNLESS(NextPeer < Peers.size());
-                        auto& nextNode = Peers[NextPeer];
-
-                        if (++NextPeer >= Peers.size()) {
-                            NextPeer = 0;
-                        }
-
-                        if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
-                            && ( nextNode.MemoryLimit == 0 // memory is NOT limited
-                                || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
-                        ) {
-                            // adjust allocated size to place next tasks correctly, will be reset after next health check
-                            nextNode.MemoryAllocated += totalMemoryLimit;
-                            if (nextNode.NodeId == SelfId().NodeId()) {
-                                // eventually synced self allocation info
-                                memoryAllocated += totalMemoryLimit;
-                            }
-                            node = nextNode;
-                            selfPlacement = false;
-                            break;
-                        }
-
-                        if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
-                            break;
-                        }
-                    }
-                }
-                if (selfPlacement) {
-                    if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
-                        memoryAllocated += totalMemoryLimit;
-                    } else {
-                        placementFailure = true;
-                        auto& error = *req->Record.MutableError();
-                        error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
-                        error.SetMessage("Not enough free memory in the cluster");
-                        break;
-                    }
-                }
-                nodes.push_back(node);
-            }
-
-            if (!placementFailure) {
-                req->Record.ClearError();
-                auto& group = *req->Record.MutableNodes();
-                group.SetResourceId(resourceId);
-                for (const auto& node : nodes) {
-                    auto* worker = group.AddWorker();
-                    *worker->MutableGuid() = node.InstanceId;
-                    worker->SetNodeId(node.NodeId);
-                }
-            }
-        }
-        LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString());
-
-        Send(ev->Sender, req.Release());
+        Send(ev->Sender, response.Release());
+    }
+
+    void ScheduleUniformly(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
+        const auto count = request.GetCount();
+        auto resourceId = request.GetResourceId();
+        if (!resourceId) {
+            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
+        }
+
+        bool placementFailure = false;
+        ui64 memoryLimit = AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic());
+        ui64 memoryAllocated = AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic());
+        TVector<TPeer> nodes;
+        for (ui32 i = 0; i < count; ++i) {
+            ui64 totalMemoryLimit = 0;
+            if (request.TaskSize() > i) {
+                totalMemoryLimit = request.GetTask(i).GetInitialTaskMemoryLimit();
+            }
+            if (totalMemoryLimit == 0) {
+                totalMemoryLimit = MkqlInitialMemoryLimit;
+            }
+            TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0, DataCenter};
+            bool selfPlacement = true;
+            if (!Peers.empty()) {
+                auto FirstPeer = NextPeer;
+                while (true) {
+                    Y_ABORT_UNLESS(NextPeer < Peers.size());
+                    auto& nextNode = Peers[NextPeer];
+
+                    if (++NextPeer >= Peers.size()) {
+                        NextPeer = 0;
+                    }
+
+                    if ( (!UseDataCenter || DataCenter.empty() || nextNode.DataCenter.empty() || DataCenter == nextNode.DataCenter) // non empty DC must match
+                        && ( nextNode.MemoryLimit == 0 // memory is NOT limited
+                            || nextNode.MemoryLimit >= nextNode.MemoryAllocated + totalMemoryLimit) // or enough
+                    ) {
+                        // adjust allocated size to place next tasks correctly, will be reset after next health check
+                        nextNode.MemoryAllocated += totalMemoryLimit;
+                        if (nextNode.NodeId == SelfId().NodeId()) {
+                            // eventually synced self allocation info
+                            memoryAllocated += totalMemoryLimit;
+                        }
+                        node = nextNode;
+                        selfPlacement = false;
+                        break;
+                    }
+
+                    if (NextPeer == FirstPeer) { // we closed loop w/o success, fallback to self placement then
+                        break;
+                    }
+                }
+            }
+            if (selfPlacement) {
+                if (memoryLimit == 0 || memoryLimit >= memoryAllocated + totalMemoryLimit) {
+                    memoryAllocated += totalMemoryLimit;
+                } else {
+                    placementFailure = true;
+                    auto& error = *response->Record.MutableError();
+                    error.SetStatusCode(NYql::NDqProto::StatusIds::CLUSTER_OVERLOADED);
+                    error.SetMessage("Not enough free memory in the cluster");
+                    break;
+                }
+            }
+            nodes.push_back(node);
+        }
+
+        if (!placementFailure) {
+            response->Record.ClearError();
+            auto& group = *response->Record.MutableNodes();
+            group.SetResourceId(resourceId);
+            for (const auto& node : nodes) {
+                auto* worker = group.AddWorker();
+                *worker->MutableGuid() = node.InstanceId;
+                worker->SetNodeId(node.NodeId);
+            }
+        }
+    }
+
+    void ScheduleOnSingleNode(const NYql::NDqProto::TAllocateWorkersRequest& request, THolder<NDqs::TEvAllocateWorkersResponse>& response) {
+        const auto count = request.GetCount();
+        auto resourceId = request.GetResourceId();
+        if (!resourceId) {
+            resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId();
+        }
+
+        if (Peers.size() != SingleNodeScheduler.NodeOrder.size()) {
+            SingleNodeScheduler.NodeOrder.clear();
+            for (ui32 i = 0; i < Peers.size(); i++) {
+                SingleNodeScheduler.NodeOrder.push_back(i);
+            }
+            std::shuffle(SingleNodeScheduler.NodeOrder.begin(), SingleNodeScheduler.NodeOrder.end(), std::default_random_engine(TInstant::Now().MicroSeconds()));
+        }
+
+        TVector<TPeer> nodes;
+        for (ui32 i = 0; i < count; ++i) {
+            Y_ABORT_UNLESS(NextPeer < Peers.size());
+            nodes.push_back(Peers[SingleNodeScheduler.NodeOrder[NextPeer]]);
+        }
+        if (++NextPeer >= Peers.size()) {
+            NextPeer = 0;
+        }
+
+        response->Record.ClearError();
+        auto& group = *response->Record.MutableNodes();
+        group.SetResourceId(resourceId);
+        for (const auto& node : nodes) {
+            auto* worker = group.AddWorker();
+            *worker->MutableGuid() = node.InstanceId;
+            worker->SetNodeId(node.NodeId);
+        }
    }
 
     void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) {
@@ -338,6 +397,11 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
     TString Address;
     ::NMonitoring::TDynamicCounters::TCounterPtr AnonRssSize;
     ::NMonitoring::TDynamicCounters::TCounterPtr AnonRssLimit;
+
+    struct TSingleNodeScheduler {
+        TVector<int> NodeOrder;
+    };
+    TSingleNodeScheduler SingleNodeScheduler;
 };
 
 TActorId MakeNodesManagerId() {
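A note on what the new placement policy does, independent of the actor and protobuf plumbing: the sketch below is a minimal stand-alone model of ScheduleOnSingleNode, not YDB code (Peer, Schedule, and main are illustrative stand-ins). It shows the two properties the commit introduces: every worker of one allocation lands on the same node, and successive allocations walk a randomly shuffled node order round-robin.

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <random>
#include <vector>

// Stand-in for the TPeer entries kept by TNodesManagerActor.
struct Peer {
    uint32_t NodeId;
};

class SingleNodeScheduler {
public:
    // Returns the node id for each of the `count` requested workers.
    std::vector<uint32_t> Schedule(const std::vector<Peer>& peers, uint32_t count) {
        assert(!peers.empty());
        // Rebuild and reshuffle the order whenever the peer set changes size,
        // mirroring the Peers.size() != NodeOrder.size() check in the commit
        // (the real code seeds the engine with TInstant::Now().MicroSeconds()).
        if (peers.size() != NodeOrder.size()) {
            NodeOrder.clear();
            for (uint32_t i = 0; i < peers.size(); ++i) {
                NodeOrder.push_back(i);
            }
            auto seed = std::chrono::steady_clock::now().time_since_epoch().count();
            std::shuffle(NodeOrder.begin(), NodeOrder.end(), std::default_random_engine(seed));
        }
        // Every worker of this request is pinned to one and the same node.
        assert(NextPeer < peers.size());
        std::vector<uint32_t> nodes(count, peers[NodeOrder[NextPeer]].NodeId);
        // Advance once per request, not per worker: round-robin across queries.
        if (++NextPeer >= peers.size()) {
            NextPeer = 0;
        }
        return nodes;
    }

private:
    std::vector<uint32_t> NodeOrder;
    size_t NextPeer = 0;
};

int main() {
    std::vector<Peer> peers{{1}, {2}, {3}};
    SingleNodeScheduler scheduler;
    for (int query = 0; query < 4; ++query) {
        auto nodes = scheduler.Schedule(peers, 3);
        std::cout << "query " << query << " -> all workers on node " << nodes.front() << "\n";
    }
}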

ydb/library/yql/providers/dq/actors/executer_actor.cpp

Lines changed: 2 additions & 0 deletions

@@ -191,6 +191,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
 
 
         const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("sync");
+        const TString scheduler = Settings->Scheduler.Get().GetOrElse({});
 
         auto resourceAllocator = RegisterChild(CreateResourceAllocator(
             GwmActorId, SelfId(), ControlId, workerCount,
@@ -204,6 +205,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
         allocateRequest->Record.SetCreateComputeActor(enableComputeActor);
         allocateRequest->Record.SetComputeActorType(computeActorType);
         allocateRequest->Record.SetStatsMode(StatsMode);
+        allocateRequest->Record.SetScheduler(scheduler);
         if (enableComputeActor) {
             ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId());
         }

ydb/library/yql/providers/dq/api/protos/dqs.proto

Lines changed: 1 addition & 0 deletions

@@ -40,6 +40,7 @@ message TAllocateWorkersRequest {
     uint64 FreeWorkerAfterMs = 14;
     NYql.NDqProto.EDqStatsMode StatsMode = 16;
     reserved 17;
+    string Scheduler = 18;
 }
 
 message TWorkerGroup {
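The new field is a free-form string rather than an enum: it carries the scheduler settings as JSON, and the nodes manager dispatches on the "type" key (see nodes_manager.cpp above). Below is a self-contained toy model of that contract; chooseScheduler and its substring check are stand-ins for the real NSc::TValue::FromJsonThrow parsing.

#include <iostream>
#include <stdexcept>
#include <string>

// Maps the Scheduler string from TAllocateWorkersRequest to a strategy name.
std::string chooseScheduler(const std::string& schedulerJson) {
    if (schedulerJson.empty()) {
        return "uniform";                  // no settings -> ScheduleUniformly
    }
    if (schedulerJson.find("\"single_node\"") != std::string::npos) {
        return "single_node";              // -> ScheduleOnSingleNode
    }
    // The real code answers this case with StatusIds::BAD_REQUEST.
    throw std::runtime_error("Unknown scheduler type, settings: " + schedulerJson);
}

int main() {
    std::cout << chooseScheduler("") << "\n";
    std::cout << chooseScheduler(R"({"type": "single_node"})") << "\n";
    try {
        chooseScheduler(R"({"type": "every_node"})");
    } catch (const std::exception& e) {
        std::cout << "BAD_REQUEST: " << e.what() << "\n";
    }
}

An empty string preserves the old uniform behavior, which is exactly what executer_actor.cpp sends when the Scheduler setting is unset (GetOrElse({})).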

ydb/library/yql/providers/dq/common/yql_dq_settings.cpp

Lines changed: 1 addition & 0 deletions

@@ -116,6 +116,7 @@ TDqConfiguration::TDqConfiguration() {
         return res;
     });
     REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
+    REGISTER_SETTING(*this, Scheduler);
 }
 
 } // namespace NYql

ydb/library/yql/providers/dq/common/yql_dq_settings.h

Lines changed: 2 additions & 0 deletions

@@ -139,6 +139,7 @@ struct TDqSettings {
     NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
     NCommon::TConfSetting<bool, false> DisableCheckpoints;
     NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;
+    NCommon::TConfSetting<TString, false> Scheduler;
 
     // This options will be passed to executor_actor and worker_actor
     template <typename TProtoConfig>
@@ -193,6 +194,7 @@ struct TDqSettings {
         SAVE_SETTING(SpillingEngine);
         SAVE_SETTING(EnableSpillingInChannels);
         SAVE_SETTING(DisableCheckpoints);
+        SAVE_SETTING(Scheduler);
 #undef SAVE_SETTING
     }
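These two registrations wire the feature end to end: REGISTER_SETTING in yql_dq_settings.cpp is what makes the value settable as pragma dq.Scheduler, and SAVE_SETTING in this header includes it among the settings that, per the comment above the Save template, are passed on to the executor and worker actors.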

ydb/tests/fq/yds/test_3_selects.py

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ class TestSelects(object):
     @pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": os.getenv("YDB_ENDPOINT")}], indirect=True)
     def test_3_selects(self, client):
         sql = R'''
+            pragma dq.Scheduler=@@{"type": "single_node"}@@;
             SELECT 1 AS SingleColumn;
             SELECT "A" AS TextColumn;
             SELECT 11 AS Column1, 22 AS Column2;
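The @@...@@ pair in the added pragma is YQL raw-string quoting, so the JSON settings can be written without escaping. With the pragma in place, all three SELECTs are planned through the single_node scheduler; a mistyped "type" value would come back as BAD_REQUEST from the nodes manager (see nodes_manager.cpp above) rather than silently falling back to uniform placement.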
