@@ -28,12 +28,13 @@ struct TCoordinatorMetrics {
        : Counters(counters) {
        IncomingRequests = Counters->GetCounter("IncomingRequests", true);
        LeaderChangedCount = Counters->GetCounter("LeaderChangedCount");
+        PartitionsLimitPerNode = Counters->GetCounter("PartitionsLimitPerNode");
    }

    ::NMonitoring::TDynamicCounterPtr Counters;
    ::NMonitoring::TDynamicCounters::TCounterPtr IncomingRequests;
    ::NMonitoring::TDynamicCounters::TCounterPtr LeaderChangedCount;
-
+    ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsLimitPerNode;
};

class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
@@ -72,15 +73,79 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
        THashSet<TPartitionKey, TPartitionKeyHash> Locations;
    };

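+    // A read actor's partition request (event cookie plus the original request record), kept so it can be re-evaluated later.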
+    struct TCoordinatorRequest {
+        ui64 Cookie;
+        NRowDispatcherProto::TEvGetAddressRequest Record;
+    };
+
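+    // Per-topic state: partitions waiting for free capacity and per-node partition counts, each with its own counters.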
+    struct TTopicInfo {
+        struct TTopicMetrics {
+            TTopicMetrics(const TCoordinatorMetrics& metrics, const TString& topicName)
+                : Counters(metrics.Counters->GetSubgroup("topic", topicName))
+            {
+                PendingPartitions = Counters->GetCounter("PendingPartitions");
+            }
+
+            ::NMonitoring::TDynamicCounterPtr Counters;
+            ::NMonitoring::TDynamicCounters::TCounterPtr PendingPartitions;
+        };
+
+        struct TNodeMetrics {
+            TNodeMetrics(const TTopicMetrics& metrics, ui32 nodeId)
+                : Counters(metrics.Counters->GetSubgroup("node", ToString(nodeId)))
+            {
+                PartitionsCount = Counters->GetCounter("PartitionsCount");
+            }
+
+            ::NMonitoring::TDynamicCounterPtr Counters;
+            ::NMonitoring::TDynamicCounters::TCounterPtr PartitionsCount;
+        };
+
+        struct TNodeInfo {
+            ui64 NumberPartitions = 0;
+            TNodeMetrics Metrics;
+        };
+
+        TTopicInfo(const TCoordinatorMetrics& metrics, const TString& topicName)
+            : Metrics(metrics, topicName)
+        {}
+
+        void AddPendingPartition(const TPartitionKey& key) {
+            if (PendingPartitions.insert(key).second) {
+                Metrics.PendingPartitions->Inc();
+            }
+        }
+
+        void RemovePendingPartition(const TPartitionKey& key) {
+            if (PendingPartitions.erase(key)) {
+                Metrics.PendingPartitions->Dec();
+            }
+        }
+
+        void IncNodeUsage(ui32 nodeId) {
+            auto nodeIt = NodesInfo.find(nodeId);
+            if (nodeIt == NodesInfo.end()) {
+                nodeIt = NodesInfo.insert({nodeId, TNodeInfo{.NumberPartitions = 0, .Metrics = TNodeMetrics(Metrics, nodeId)}}).first;
+            }
+            nodeIt->second.NumberPartitions++;
+            nodeIt->second.Metrics.PartitionsCount->Inc();
+        }
+
+        THashSet<TPartitionKey, TPartitionKeyHash> PendingPartitions;
+        THashMap<ui32, TNodeInfo> NodesInfo;
+        TTopicMetrics Metrics;
+    };
+
    NConfig::TRowDispatcherCoordinatorConfig Config;
    TYqSharedResources::TPtr YqSharedResources;
    TActorId LocalRowDispatcherId;
    const TString LogPrefix;
    const TString Tenant;
    TMap<NActors::TActorId, RowDispatcherInfo> RowDispatchers;
    THashMap<TPartitionKey, TActorId, TPartitionKeyHash> PartitionLocations;
+    THashMap<TString, TTopicInfo> TopicsInfo;
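+    // Last request from each read actor that could not be fully served yet; retried by UpdatePendingReadActors().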
+    std::unordered_map<TActorId, TCoordinatorRequest> PendingReadActors;
    TCoordinatorMetrics Metrics;
-    ui64 LocationRandomCounter = 0;
    THashSet<TActorId> InterconnectSessions;

public:
@@ -116,7 +181,10 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {

    void AddRowDispatcher(NActors::TActorId actorId, bool isLocal);
    void PrintInternalState();
-    NActors::TActorId GetAndUpdateLocation(const TPartitionKey& key);
+    TTopicInfo& GetOrCreateTopicInfo(const TString& topicName);
+    std::optional<TActorId> GetAndUpdateLocation(const TPartitionKey& key);  // std::nullopt if TopicPartitionsLimitPerNode reached
+    bool ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request);
+    void UpdatePendingReadActors();
    void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
};

@@ -131,7 +199,9 @@ TActorCoordinator::TActorCoordinator(
    , LocalRowDispatcherId(localRowDispatcherId)
    , LogPrefix("Coordinator: ")
    , Tenant(tenant)
-    , Metrics(counters) {
+    , Metrics(counters)
+{
+    Metrics.PartitionsLimitPerNode->Set(Config.GetTopicPartitionsLimitPerNode());
    AddRowDispatcher(localRowDispatcherId, true);
}

@@ -145,6 +215,7 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
    auto it = RowDispatchers.find(actorId);
    if (it != RowDispatchers.end()) {
        it->second.Connected = true;
+        UpdatePendingReadActors();
        return;
    }

@@ -161,10 +232,12 @@ void TActorCoordinator::AddRowDispatcher(NActors::TActorId actorId, bool isLocal
        auto node = RowDispatchers.extract(oldActorId);
        node.key() = actorId;
        RowDispatchers.insert(std::move(node));
+        UpdatePendingReadActors();
        return;
    }

    RowDispatchers.emplace(actorId, RowDispatcherInfo{true, isLocal});
+    UpdatePendingReadActors();
}

void TActorCoordinator::UpdateInterconnectSessions(const NActors::TActorId& interconnectSession) {
@@ -197,8 +270,14 @@ void TActorCoordinator::PrintInternalState() {

    str << "\nLocations:\n";
    for (auto& [key, actorId] : PartitionLocations) {
-        str << "   " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
+        str << "   " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", partId " << key.PartitionId << ", row dispatcher actor id: " << actorId << "\n";
+    }
+
+    str << "\nPending partitions:\n";
+    for (const auto& [topicName, topicInfo] : TopicsInfo) {
+        str << "   " << topicName << ", pending partitions: " << topicInfo.PendingPartitions.size() << "\n";
    }
+
    LOG_ROW_DISPATCHER_DEBUG(str.Str());
}

@@ -237,31 +316,57 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPt
    Metrics.LeaderChangedCount->Inc();
}

-NActors::TActorId TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
+TActorCoordinator::TTopicInfo& TActorCoordinator::GetOrCreateTopicInfo(const TString& topicName) {
+    const auto it = TopicsInfo.find(topicName);
+    if (it != TopicsInfo.end()) {
+        return it->second;
+    }
+    return TopicsInfo.insert({topicName, TTopicInfo(Metrics, topicName)}).first->second;
+}
+
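+// Picks the connected row dispatcher whose node currently hosts the fewest partitions of this topic.
+// Returns std::nullopt (and marks the partition as pending) once TopicPartitionsLimitPerNode is reached.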
+std::optional<TActorId> TActorCoordinator::GetAndUpdateLocation(const TPartitionKey& key) {
    Y_ENSURE(!PartitionLocations.contains(key));
-    auto rand = LocationRandomCounter++ % RowDispatchers.size();

-    auto it = std::begin(RowDispatchers);
-    std::advance(it, rand);
+    auto& topicInfo = GetOrCreateTopicInfo(key.TopicName);

-    for (size_t i = 0; i < RowDispatchers.size(); ++i) {
-        auto& info = it->second;
+    TActorId bestLocation;
+    ui64 bestNumberPartitions = std::numeric_limits<ui64>::max();
+    for (auto& [location, info] : RowDispatchers) {
        if (!info.Connected) {
-            it++;
-            if (it == std::end(RowDispatchers)) {
-                it = std::begin(RowDispatchers);
-            }
            continue;
        }
-        PartitionLocations[key] = it->first;
-        it->second.Locations.insert(key);
-        return it->first;
+
+        ui64 numberPartitions = 0;
+        if (const auto it = topicInfo.NodesInfo.find(location.NodeId()); it != topicInfo.NodesInfo.end()) {
+            numberPartitions = it->second.NumberPartitions;
+        }
+
+        if (!bestLocation || numberPartitions < bestNumberPartitions) {
+            bestLocation = location;
+            bestNumberPartitions = numberPartitions;
+        }
+    }
+    Y_ENSURE(bestLocation, "Local row dispatcher should always be connected");
+
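+    // Even the least-loaded node is at the per-topic limit: defer this partition instead of assigning it.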
+    if (Config.GetTopicPartitionsLimitPerNode() > 0 && bestNumberPartitions >= Config.GetTopicPartitionsLimitPerNode()) {
+        topicInfo.AddPendingPartition(key);
+        return std::nullopt;
    }
-    Y_ENSURE(false, "Local row dispatcher should always be connected");
+
+    auto rowDispatcherIt = RowDispatchers.find(bestLocation);
+    Y_ENSURE(rowDispatcherIt != RowDispatchers.end(), "Invalid best location");
+
+    PartitionLocations[key] = bestLocation;
+    rowDispatcherIt->second.Locations.insert(key);
+    topicInfo.IncNodeUsage(bestLocation.NodeId());
+    topicInfo.RemovePendingPartition(key);
+
+    return bestLocation;
}

void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPtr& ev) {
-    const auto source = ev->Get()->Record.GetSource();
+    const auto& source = ev->Get()->Record.GetSource();
+
    UpdateInterconnectSessions(ev->InterconnectSession);

    TStringStream str;
@@ -271,22 +376,45 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
    }
    LOG_ROW_DISPATCHER_DEBUG(str.Str());
    Metrics.IncomingRequests->Inc();
+
+    TCoordinatorRequest request = {.Cookie = ev->Cookie, .Record = ev->Get()->Record};
+    if (ComputeCoordinatorRequest(ev->Sender, request)) {
+        PendingReadActors.erase(ev->Sender);
+    } else {
+        // All nodes are overloaded, so add the request to the pending queue.
+        // Only the last request from each read actor is kept.
+        PendingReadActors[ev->Sender] = request;
+    }
+}
+
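+// Tries to assign every requested partition and reply to the read actor with TEvCoordinatorResult.
+// Returns false if at least one partition had to be deferred because all nodes are at the limit.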
+bool TActorCoordinator::ComputeCoordinatorRequest(TActorId readActorId, const TCoordinatorRequest& request) {
+    const auto& source = request.Record.GetSource();
+
    Y_ENSURE(!RowDispatchers.empty());

+    bool hasPendingPartitions = false;
    TMap<NActors::TActorId, TSet<ui64>> tmpResult;
-
-    for (auto& partitionId : ev->Get()->Record.GetPartitionId()) {
+    for (auto& partitionId : request.Record.GetPartitionId()) {
        TPartitionKey key{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), partitionId};
        auto locationIt = PartitionLocations.find(key);
        NActors::TActorId rowDispatcherId;
        if (locationIt != PartitionLocations.end()) {
            rowDispatcherId = locationIt->second;
        } else {
-            rowDispatcherId = GetAndUpdateLocation(key);
+            if (const auto maybeLocation = GetAndUpdateLocation(key)) {
+                rowDispatcherId = *maybeLocation;
+            } else {
+                hasPendingPartitions = true;
+                continue;
+            }
        }
        tmpResult[rowDispatcherId].insert(partitionId);
    }

+    if (hasPendingPartitions) {
+        return false;
+    }
+
    auto response = std::make_unique<TEvRowDispatcher::TEvCoordinatorResult>();
    for (const auto& [actorId, partitions] : tmpResult) {
        auto* partitionsProto = response->Record.AddPartitions();
@@ -295,12 +423,23 @@ void TActorCoordinator::Handle(NFq::TEvRowDispatcher::TEvCoordinatorRequest::TPt
            partitionsProto->AddPartitionId(partitionId);
        }
    }
-
-    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << ev->Sender);
-    Send(ev->Sender, response.release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
+
+    LOG_ROW_DISPATCHER_DEBUG("Send TEvCoordinatorResult to " << readActorId);
+    Send(readActorId, response.release(), IEventHandle::FlagTrackDelivery, request.Cookie);
    PrintInternalState();
+
+    return true;
}

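+// Retries deferred requests whenever capacity may have changed (e.g. a row dispatcher connected or reconnected).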
+void TActorCoordinator::UpdatePendingReadActors() {
+    for (auto readActorIt = PendingReadActors.begin(); readActorIt != PendingReadActors.end();) {
+        if (ComputeCoordinatorRequest(readActorIt->first, readActorIt->second)) {
+            readActorIt = PendingReadActors.erase(readActorIt);
+        } else {
+            ++readActorIt;
+        }
+    }
+}

} // namespace