@@ -69,7 +69,7 @@ struct TAggregationStatistics {
69
69
size_t NextTablet{ 0 };
70
70
ui32 InFlight{ 0 };
71
71
std::vector<ui64> Ids;
72
- std::unordered_set <ui64> FinishedTablets ;
72
+ std::unordered_map <ui64, TActorId> TabletsPipes ;
73
73
};
74
74
75
75
struct ColumnStatistics {
@@ -195,8 +195,6 @@ class TStatService : public TActorBootstrapped<TStatService> {
195
195
hFunc (TEvStatistics::TEvAggregateKeepAlive, Handle);
196
196
hFunc (TEvPrivate::TEvDispatchKeepAlive, Handle);
197
197
hFunc (TEvPrivate::TEvKeepAliveTimeout, Handle);
198
- hFunc (TEvPipeCache::TEvGetTabletNodeResult, Handle);
199
- hFunc (TEvPipeCache::TEvDeliveryProblem, Handle);
200
198
hFunc (TEvStatistics::TEvStatisticsResponse, Handle);
201
199
hFunc (TEvStatistics::TEvAggregateStatisticsResponse, Handle);
202
200
@@ -238,29 +236,9 @@ class TStatService : public TActorBootstrapped<TStatService> {
238
236
return false ;
239
237
}
240
238
241
- //
242
- // returns true if the tablet processing has not been completed yet
243
- //
244
- bool OnTabletFinished (ui64 tabletId) {
245
- auto & localTablets = AggregationStatistics.LocalTablets ;
246
- auto isFinished = !localTablets.FinishedTablets .emplace (tabletId).second ;
247
- if (isFinished) {
248
- LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
249
- " OnTabletFinished: table " << tabletId << " has already been processed" );
250
- return false ;
251
- }
252
-
253
- --localTablets.InFlight ;
254
- return true ;
255
- }
256
-
257
239
// Finishes the current aggregation round: sends the aggregated response and
// then resets all per-round state through ResetAggregationStatistics().
// In this diff the '-' lines are the previous inline cleanup (a TEvUnlink to
// the pipe-per-node cache plus a swap with a fresh TAggregationStatistics);
// the single '+' call replaces them.
void OnAggregateStatisticsFinished () {
258
240
SendAggregateStatisticsResponse ();
259
-
260
- Send (MakePipePerNodeCacheID (false ), new TEvPipeCache::TEvUnlink (0 ));
261
-
262
- TAggregationStatistics aggregationStatistics (Settings.FanOutFactor );
263
- std::swap (AggregationStatistics, aggregationStatistics);
241
+ ResetAggregationStatistics ();
264
242
}
265
243
266
244
void SendRequestToNextTablet () {
@@ -272,80 +250,22 @@ class TStatService : public TActorBootstrapped<TStatService> {
272
250
const auto tabletId = localTablets.Ids [localTablets.NextTablet ];
273
251
++localTablets.NextTablet ;
274
252
++localTablets.InFlight ;
275
- Send (MakePipePerNodeCacheID (false ), new TEvPipeCache::TEvGetTabletNode (tabletId), 0 , AggregationStatistics.Round );
276
- }
277
-
278
- void Handle (TEvPipeCache::TEvGetTabletNodeResult::TPtr& ev) {
279
- const auto msg = ev->Get ();
280
- LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
281
- " Received TEvGetTabletNodeResult NodeId: " << msg->NodeId << " , TabletId: " << msg->TabletId );
282
-
283
- const auto round = ev->Cookie ;
284
- if (IsNotCurrentRound (round)) {
285
- return ;
286
- }
287
-
288
- const auto currentNodeId = ev->Recipient .NodeId ();
289
-
290
- // there is no need for retries, as the tablet must be local
291
- // no problems are expected in resolving
292
- if (currentNodeId != msg->NodeId ) {
293
- const auto tabletFinished = OnTabletFinished (msg->TabletId );
294
- Y_ABORT_UNLESS (tabletFinished);
295
-
296
- LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
297
- " Tablet is not local. Table node: " << msg->NodeId << " , current node: " << ev->Recipient .NodeId ());
298
-
299
- const auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET;
300
- AggregationStatistics.FailedTablets .emplace_back (msg->TabletId , msg->NodeId , error);
301
-
302
- SendRequestToNextTablet ();
303
-
304
- if (AggregationStatistics.IsCompleted ()) {
305
- OnAggregateStatisticsFinished ();
306
- }
307
-
308
- return ;
309
- }
310
253
311
- auto request = std::make_unique<TEvStatistics::TEvStatisticsRequest>();
312
- auto & record = request->Record ;
313
- record.MutableTypes ()->Add (NKikimr::NStat::COUNT_MIN_SKETCH);
314
-
315
- auto * path = record.MutableTable ()->MutablePathId ();
316
- path->SetOwnerId (AggregationStatistics.PathId .OwnerId );
317
- path->SetLocalId (AggregationStatistics.PathId .LocalPathId );
318
-
319
- auto * columnTags = record.MutableTable ()->MutableColumnTags ();
320
- for (const auto & tag : AggregationStatistics.ColumnTags ) {
321
- columnTags->Add (tag);
322
- }
323
-
324
- Send (MakePipePerNodeCacheID (false ),
325
- new TEvPipeCache::TEvForward (request.release (), msg->TabletId , true , round),
326
- IEventHandle::FlagTrackDelivery, round);
254
+ auto policy = NTabletPipe::TClientRetryPolicy::WithRetries ();
255
+ policy.RetryLimitCount = 2 ;
256
+ NTabletPipe::TClientConfig pipeConfig{policy};
257
+ pipeConfig.ForceLocal = true ;
258
+ localTablets.TabletsPipes [tabletId] = Register (NTabletPipe::CreateClient (SelfId (), tabletId, pipeConfig));
327
259
}
328
260
329
// This span interleaves two definitions:
//   '-' lines: the removed handler for TEvPipeCache::TEvDeliveryProblem;
//   '+' lines (and the shared closing braces): the added
//   ResetAggregationStatistics().
- void Handle (TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
330
- LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
331
- " Received TEvDeliveryProblem TabletId: " << ev->Get ()->TabletId );
332
-
333
- const auto round = ev->Cookie ;
334
- const auto tabletId = ev->Get ()->TabletId ;
335
- if (IsNotCurrentRound (round)
336
- || !OnTabletFinished (tabletId)) {
337
- return ;
261
// Drops all state of the current aggregation round.
// First closes every tablet pipe client still recorded in
// LocalTablets.TabletsPipes, then replaces AggregationStatistics with a
// freshly constructed instance built from Settings.FanOutFactor.
+ void ResetAggregationStatistics () {
262
+ const auto & tabletsPipes = AggregationStatistics.LocalTablets .TabletsPipes ;
263
+ for (auto it = tabletsPipes.begin (); it != tabletsPipes.end (); ++it) {
264
// it->second is the pipe client actor id stored for the tablet.
+ NTabletPipe::CloseClient (SelfId (), it->second );
338
265
}
339
266
340
- const auto nodeId = ev->Recipient .NodeId ();
341
- const auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET;
342
- AggregationStatistics.FailedTablets .emplace_back (tabletId, nodeId, error);
343
-
344
- SendRequestToNextTablet ();
345
-
346
- if (AggregationStatistics.IsCompleted ()) {
347
- OnAggregateStatisticsFinished ();
348
- }
267
// Swap in a fresh state object; the previous state ends up in the local
// variable and is destroyed when this function returns.
+ TAggregationStatistics aggregationStatistics (Settings.FanOutFactor );
268
+ std::swap (AggregationStatistics, aggregationStatistics);
349
269
}
350
270
351
271
void AggregateStatistics (const TAggregationStatistics::TColumnsStatistics& columnsStatistics) {
@@ -372,17 +292,25 @@ class TStatService : public TActorBootstrapped<TStatService> {
372
292
}
373
293
374
294
void Handle (TEvStatistics::TEvStatisticsResponse::TPtr& ev) {
295
+ const auto & record = ev->Get ()->Record ;
296
+ const auto tabletId = record.GetShardTabletId ();
297
+
375
298
LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
376
- " Received TEvStatisticsResponse TabletId: " << ev-> Get ()-> Record . GetShardTabletId () );
299
+ " Received TEvStatisticsResponse TabletId: " << tabletId );
377
300
378
301
const auto round = ev->Cookie ;
379
- const auto & record = ev->Get ()->Record ;
380
- if (IsNotCurrentRound (round)
381
- || !OnTabletFinished (record.GetShardTabletId ())) {
302
+ if (IsNotCurrentRound (round)) {
382
303
return ;
383
304
}
384
305
306
+ auto tabletPipe = AggregationStatistics.LocalTablets .TabletsPipes .find (tabletId);
307
+ if (tabletPipe != AggregationStatistics.LocalTablets .TabletsPipes .end ()) {
308
+ NTabletPipe::CloseClient (SelfId (), tabletPipe->second );
309
+ AggregationStatistics.LocalTablets .TabletsPipes .erase (tabletPipe);
310
+ }
311
+
385
312
AggregateStatistics (record.GetColumns ());
313
+ --AggregationStatistics.LocalTablets .InFlight ;
386
314
387
315
SendRequestToNextTablet ();
388
316
@@ -394,6 +322,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
394
322
void Handle (TEvStatistics::TEvAggregateKeepAliveAck::TPtr& ev) {
395
323
const auto & record = ev->Get ()->Record ;
396
324
const auto round = record.GetRound ();
325
+
397
326
if (IsNotCurrentRound (round)) {
398
327
LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
399
328
" Skip TEvAggregateKeepAliveAck" );
@@ -425,10 +354,8 @@ class TStatService : public TActorBootstrapped<TStatService> {
425
354
LOG_INFO_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
426
355
" Parent node " << AggregationStatistics.ParentNode .NodeId () << " is unavailable" );
427
356
428
- TAggregationStatistics aggregationStatistics (Settings.FanOutFactor );
429
- std::swap (AggregationStatistics, aggregationStatistics);
430
357
431
- Send ( MakePipePerNodeCacheID ( false ), new TEvPipeCache::TEvUnlink ( 0 ) );
358
+ ResetAggregationStatistics ( );
432
359
}
433
360
434
361
void Handle (TEvPrivate::TEvDispatchKeepAlive::TPtr& ev) {
@@ -658,8 +585,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
658
585
659
586
// reset previous state
660
587
if (AggregationStatistics.Round != 0 ) {
661
- TAggregationStatistics aggregationStatistics (Settings.FanOutFactor );
662
- std::swap (AggregationStatistics, aggregationStatistics);
588
+ ResetAggregationStatistics ();
663
589
}
664
590
665
591
AggregationStatistics.Round = round;
@@ -684,8 +610,6 @@ class TStatService : public TActorBootstrapped<TStatService> {
684
610
// forming the right and left child nodes
685
611
size_t k = 0 ;
686
612
for (const auto & node : nodes) {
687
- ++k;
688
-
689
613
if (node.GetNodeId () == currentNodeId) {
690
614
AggregationStatistics.LocalTablets .Ids .reserve (node.GetTabletIds ().size ());
691
615
@@ -703,6 +627,7 @@ class TStatService : public TActorBootstrapped<TStatService> {
703
627
}
704
628
705
629
AggregationStatistics.Nodes [k % Settings.FanOutFactor ].Tablets .push_back (std::move (nodeTablets));
630
+ ++k;
706
631
}
707
632
708
633
for (auto & node : AggregationStatistics.Nodes ) {
@@ -969,31 +894,104 @@ class TStatService : public TActorBootstrapped<TStatService> {
969
894
}
970
895
}
971
896
897
// Builds a TEvStatisticsRequest for the table and columns of the current
// aggregation and sends it through the given tablet pipe client.
//   clientId - actor id of the pipe client to send through.
// The request asks for COUNT_MIN_SKETCH statistics and carries the path id
// (OwnerId / LocalPathId) and the column tags stored in AggregationStatistics.
+ void SendStatisticsRequest (const TActorId& clientId) {
898
+ auto request = std::make_unique<TEvStatistics::TEvStatisticsRequest>();
899
+ auto & record = request->Record ;
900
+ record.MutableTypes ()->Add (NKikimr::NStat::COUNT_MIN_SKETCH);
901
+
902
+ auto * path = record.MutableTable ()->MutablePathId ();
903
+ path->SetOwnerId (AggregationStatistics.PathId .OwnerId );
904
+ path->SetLocalId (AggregationStatistics.PathId .LocalPathId );
905
+
906
+ auto * columnTags = record.MutableTable ()->MutableColumnTags ();
907
+ for (const auto & tag : AggregationStatistics.ColumnTags ) {
908
+ columnTags->Add (tag);
909
+ }
910
+
911
// release() gives up unique_ptr ownership of the request; SendData is
// expected to take it over (TODO confirm against the NTabletPipe API).
// The current Round is passed as the last argument - presumably the event
// cookie, since the response handler reads the round from ev->Cookie; verify.
+ NTabletPipe::SendData (SelfId (), clientId, request.release (), AggregationStatistics.Round );
912
+ }
913
+
914
// Records a tablet as failed and moves the aggregation forward.
//   tabletId - tablet whose pipe failed.
// Called by the TEvClientConnected / TEvClientDestroyed handlers for pipes
// tracked in LocalTablets.TabletsPipes.
// NOTE(review): every failure is logged as "is not local" and recorded as
// TYPE_NON_LOCAL_TABLET with node id 0 (the removed code passed a real node
// id in that position) - confirm this is intended for all pipe errors.
+ void OnTabletError (ui64 tabletId) {
915
+ LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
916
+ " Tablet " << tabletId << " is not local." );
917
+
918
+ constexpr auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET;
919
+ AggregationStatistics.FailedTablets .emplace_back (tabletId, 0 , error);
920
+
921
// The pipe entry is only forgotten here, not closed.
// NOTE(review): presumably the client is already gone when the calling
// events arrive - confirm.
+ AggregationStatistics.LocalTablets .TabletsPipes .erase (tabletId);
922
// This tablet no longer counts as in flight; try to start the next one.
+ --AggregationStatistics.LocalTablets .InFlight ;
923
+ SendRequestToNextTablet ();
924
+
925
// Finish the round as soon as IsCompleted() reports so.
+ if (AggregationStatistics.IsCompleted ()) {
926
+ OnAggregateStatisticsFinished ();
927
+ }
928
+ }
929
+
972
930
// Handles the connection result of a tablet pipe client. After the change in
// this diff ('+' lines) the event is routed by client id:
//   1. the pipe identified by SAPipeClientId - on a non-OK status the id is
//      cleared and ConnectToSA() / SyncNode() are called;
//   2. a pipe tracked in LocalTablets.TabletsPipes for the event's tablet -
//      on OK the statistics request is sent, otherwise the tablet is
//      reported through OnTabletError();
//   3. anything else is only logged and skipped.
// The '-' lines show the previous behavior, which treated every event as
// belonging to the SAPipeClientId pipe.
void Handle (TEvTabletPipe::TEvClientConnected::TPtr& ev) {
931
+ const auto & clientId = ev->Get ()->ClientId ;
932
+ const auto & tabletId = ev->Get ()->TabletId ;
933
+
973
934
LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
974
935
" EvClientConnected"
975
936
<< " , node id = " << ev->Get ()->ClientId .NodeId ()
976
- << " , client id = " << ev-> Get ()-> ClientId
937
+ << " , client id = " << clientId
977
938
<< " , server id = " << ev->Get ()->ServerId
939
+ << " , tablet id = " << tabletId
978
940
<< " , status = " << ev->Get ()->Status );
979
941
980
- if (ev->Get ()->Status != NKikimrProto::OK) {
981
- SAPipeClientId = TActorId ();
982
- ConnectToSA ();
983
- SyncNode ();
942
// Case 1: the event belongs to the SAPipeClientId pipe.
+ if (clientId == SAPipeClientId) {
943
+ if (ev->Get ()->Status != NKikimrProto::OK) {
944
+ SAPipeClientId = TActorId ();
945
+ ConnectToSA ();
946
+ SyncNode ();
947
+ }
948
+ return ;
949
+ }
950
+
951
// Case 2: a per-tablet pipe of the current aggregation. The stored client
// id must match the event's client id, so an event from a pipe that is no
// longer the one recorded for this tablet falls through to the skip branch.
+ const auto & tabletsPipes = AggregationStatistics.LocalTablets .TabletsPipes ;
952
+ auto tabletPipe = tabletsPipes.find (tabletId);
953
+
954
+ if (tabletPipe != tabletsPipes.end () && clientId == tabletPipe->second ) {
955
+ if (ev->Get ()->Status == NKikimrProto::OK) {
956
+ SendStatisticsRequest (clientId);
957
+ } else {
958
+ OnTabletError (tabletId);
959
+ }
960
+ return ;
984
961
}
962
+
963
// Case 3: unknown client - nothing to do besides logging.
+ LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
964
+ " Skip EvClientConnected" );
985
965
}
986
966
987
967
// Handles destruction of a tablet pipe client. After the change in this diff
// ('+' lines) the event is routed by client id:
//   1. the pipe identified by SAPipeClientId - the id is cleared and
//      ConnectToSA() / SyncNode() are called;
//   2. a pipe tracked in LocalTablets.TabletsPipes for the event's tablet -
//      the tablet is reported through OnTabletError();
//   3. anything else is only logged and skipped.
// The '-' lines show the previous behavior, which ran the SAPipeClientId
// reset path unconditionally.
void Handle (TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
968
+ const auto & clientId = ev->Get ()->ClientId ;
969
+ const auto & tabletId = ev->Get ()->TabletId ;
970
+
988
971
LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
989
972
" EvClientDestroyed"
990
973
<< " , node id = " << ev->Get ()->ClientId .NodeId ()
991
- << " , client id = " << ev->Get ()->ClientId
992
- << " , server id = " << ev->Get ()->ServerId );
974
+ << " , client id = " << clientId
975
+ << " , server id = " << ev->Get ()->ServerId
976
+ << " , tablet id = " << tabletId);
993
977
994
- SAPipeClientId = TActorId ();
995
- ConnectToSA ();
996
- SyncNode ();
978
// Case 1: the event belongs to the SAPipeClientId pipe.
+ if (clientId == SAPipeClientId) {
979
+ SAPipeClientId = TActorId ();
980
+ ConnectToSA ();
981
+ SyncNode ();
982
+ return ;
983
+ }
984
+
985
// Case 2: a per-tablet pipe of the current aggregation. Entries already
// erased from TabletsPipes (for example after a response was handled) are
// not found here and fall through to the skip branch.
+ const auto & tabletsPipes = AggregationStatistics.LocalTablets .TabletsPipes ;
986
+ auto tabletPipe = tabletsPipes.find (tabletId);
987
+
988
+ if (tabletPipe != tabletsPipes.end () && clientId == tabletPipe->second ) {
989
+ OnTabletError (tabletId);
990
+ return ;
991
+ }
992
+
993
// Case 3: unknown client - nothing to do besides logging.
+ LOG_DEBUG_S (TlsActivationContext->AsActorContext (), NKikimrServices::STATISTICS,
994
+ " Skip EvClientDestroyed" );
997
995
}
998
996
999
997
void Handle (TEvStatistics::TEvStatisticsIsDisabled::TPtr&) {
0 commit comments