@@ -123,8 +123,8 @@ struct TAggQueryStat {
123
123
NYql::TCounters::TEntry ReadLagMessages;
124
124
bool IsWaiting = false ;
125
125
126
- void Add (const TTopicSessionClientStatistic& stat) {
127
- FilteredBytes.Add (NYql::TCounters::TEntry (stat. FilteredBytes ));
126
+ void Add (const TTopicSessionClientStatistic& stat, ui64 filteredBytes ) {
127
+ FilteredBytes.Add (NYql::TCounters::TEntry (filteredBytes ));
128
128
QueuedBytes.Add (NYql::TCounters::TEntry (stat.QueuedBytes ));
129
129
QueuedRows.Add (NYql::TCounters::TEntry (stat.QueuedRows ));
130
130
ReadLagMessages.Add (NYql::TCounters::TEntry (stat.ReadLagMessages ));
@@ -266,7 +266,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
266
266
};
267
267
268
268
struct TAggregatedStats {
269
- NYql::TCounters::TEntry AllSessionsReadBytes;
270
269
THashMap<TQueryStatKey, TMaybe<TAggQueryStat>, TQueryStatKeyHash> LastQueryStats;
271
270
TDuration LastUpdateMetricsPeriod;
272
271
};
@@ -290,6 +289,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
290
289
NYql::IPqGateway::TPtr PqGateway;
291
290
NActors::TMon* Monitoring;
292
291
TNodesTracker NodesTracker;
292
+ NYql::TCounters::TEntry AllSessionsDateRate;
293
293
TAggregatedStats AggrStats;
294
294
ui64 LastCpuTime = 0 ;
295
295
@@ -304,6 +304,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
304
304
bool PendingNewDataArrived = false ;
305
305
TActorId TopicSessionId;
306
306
TTopicSessionClientStatistic Stat;
307
+ ui64 FilteredBytes = 0 ;
307
308
bool StatisticsUpdated = false ;
308
309
};
309
310
@@ -333,7 +334,6 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
333
334
THashMap<ui32, TConsumerPartition> Partitions;
334
335
const TString QueryId;
335
336
TConsumerCounters Counters;
336
- TTopicSessionClientStatistic Stat;
337
337
ui64 CpuMicrosec = 0 ; // Increment.
338
338
ui64 Generation;
339
339
};
@@ -583,25 +583,30 @@ void TRowDispatcher::UpdateMetrics() {
583
583
return ;
584
584
}
585
585
586
- AggrStats. AllSessionsReadBytes = NYql::TCounters::TEntry ();
586
+ AllSessionsDateRate = NYql::TCounters::TEntry ();
587
587
for (auto & [queryId, stat] : AggrStats.LastQueryStats ) {
588
588
stat = Nothing ();
589
589
}
590
590
591
591
for (auto & [key, sessionsInfo] : TopicSessions) {
592
592
for (auto & [actorId, sessionInfo] : sessionsInfo.Sessions ) {
593
593
auto read = NYql::TCounters::TEntry (sessionInfo.Stat .ReadBytes );
594
- AggrStats. AllSessionsReadBytes .Add (read);
594
+ AllSessionsDateRate .Add (read);
595
595
sessionInfo.AggrReadBytes = read;
596
596
sessionInfo.Stat .Clear ();
597
597
598
598
for (auto & [readActorId, consumer] : sessionInfo.Consumers ) {
599
+ const auto partionIt = consumer->Partitions .find (key.PartitionId );
600
+ if (partionIt == consumer->Partitions .end ()) {
601
+ continue ;
602
+ }
603
+ auto & partition = partionIt->second ;
599
604
auto & stat = AggrStats.LastQueryStats [TQueryStatKey{consumer->QueryId , key.ReadGroup }];
600
605
if (!stat) {
601
606
stat = TAggQueryStat ();
602
607
}
603
- stat->Add (consumer-> Stat );
604
- consumer-> Stat . Clear () ;
608
+ stat->Add (partition. Stat , partition. FilteredBytes );
609
+ partition. FilteredBytes = 0 ;
605
610
}
606
611
}
607
612
}
@@ -658,7 +663,7 @@ TString TRowDispatcher::GetInternalState() {
658
663
str << " Max session buffer size: " << toHuman (MaxSessionBufferSizeBytes) << " \n " ;
659
664
str << " CpuMicrosec: " << toHuman (LastCpuTime) << " \n " ;
660
665
str << " DataRate (all sessions): " ;
661
- printDataRate (AggrStats. AllSessionsReadBytes );
666
+ printDataRate (AllSessionsDateRate );
662
667
str << " \n " ;
663
668
664
669
THashMap<TQueryStatKey, TAggQueryStat, TQueryStatKeyHash> queryState;
@@ -669,9 +674,14 @@ TString TRowDispatcher::GetInternalState() {
669
674
for (auto & [actorId, sessionInfo] : sessionsInfo.Sessions ) {
670
675
queuedBytesSum += sessionInfo.Stat .QueuedBytes ;
671
676
for (auto & [readActorId, consumer] : sessionInfo.Consumers ) {
677
+ const auto partionIt = consumer->Partitions .find (sessionKey.PartitionId );
678
+ if (partionIt == consumer->Partitions .end ()) {
679
+ continue ;
680
+ }
681
+ const auto & partitionStat = partionIt->second .Stat ;
672
682
auto key = TQueryStatKey{consumer->QueryId , sessionKey.ReadGroup };
673
683
++sessionCountByQuery[key];
674
- queryState[key].Add (consumer-> Stat );
684
+ queryState[key].Add (partitionStat, 0 );
675
685
}
676
686
}
677
687
}
@@ -714,16 +724,17 @@ TString TRowDispatcher::GetInternalState() {
714
724
continue ;
715
725
}
716
726
const auto & partition = consumer->Partitions [key.PartitionId ];
717
- str << " " << consumer->QueryId << " " << LeftPad (readActorId, 32 ) << " unread bytes "
718
- << toHuman (consumer->Stat .QueuedBytes ) << " (" << leftPad (consumer->Stat .QueuedRows ) << " rows) "
719
- << " offset " << leftPad (consumer->Stat .Offset ) << " init offset " << leftPad (consumer->Stat .InitialOffset )
727
+ const auto & stat = partition.Stat ;
728
+ str << " " << consumer->QueryId << " " << LeftPad (readActorId, 33 ) << " unread bytes "
729
+ << toHuman (stat.QueuedBytes ) << " (" << leftPad (stat.QueuedRows ) << " rows) "
730
+ << " offset " << leftPad (stat.Offset ) << " init offset " << leftPad (stat.InitialOffset )
720
731
<< " get " << leftPad (consumer->Counters .GetNextBatch )
721
732
<< " arr " << leftPad (consumer->Counters .NewDataArrived ) << " btc " << leftPad (consumer->Counters .MessageBatch )
722
- << " pend get " << leftPad (partition.PendingGetNextBatch ) << " pend new " << leftPad (partition.PendingNewDataArrived )
723
- << " waiting " << consumer-> Stat .IsWaiting << " read lag " << leftPad (consumer-> Stat .ReadLagMessages )
733
+ << " pend get " << leftPad (partition.PendingGetNextBatch ) << " pend new " << leftPad (partition.PendingNewDataArrived )
734
+ << " waiting " << stat .IsWaiting << " read lag " << leftPad (stat .ReadLagMessages )
724
735
<< " conn id " << consumer->Generation << " \n " ;
725
- maxInitialOffset = std::max (maxInitialOffset, consumer-> Stat .InitialOffset );
726
- minInitialOffset = std::min (minInitialOffset, consumer-> Stat .InitialOffset );
736
+ maxInitialOffset = std::max (maxInitialOffset, stat .InitialOffset );
737
+ minInitialOffset = std::min (minInitialOffset, stat .InitialOffset );
727
738
}
728
739
str << " initial offset max " << leftPad (maxInitialOffset) << " min " << leftPad (minInitialOffset) << " \n " ;;
729
740
}
@@ -943,10 +954,8 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) {
943
954
partitionId};
944
955
TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
945
956
TSessionInfo& sessionInfo = topicSessionInfo.Sessions [partition.TopicSessionId ];
946
- if (!sessionInfo.Consumers .contains (consumer->ReadActorId )) {
957
+ if (!sessionInfo.Consumers .erase (consumer->ReadActorId )) {
947
958
LOG_ROW_DISPATCHER_ERROR (" Wrong readActorId " << consumer->ReadActorId << " , no such consumer" );
948
- } else {
949
- sessionInfo.Consumers .erase (consumer->ReadActorId );
950
959
}
951
960
if (sessionInfo.Consumers .empty ()) {
952
961
LOG_ROW_DISPATCHER_DEBUG (" Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId );
@@ -991,9 +1000,9 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev)
991
1000
LOG_ROW_DISPATCHER_WARN (" Ignore (no consumer) TEvNewDataArrived from " << ev->Sender << " part id " << ev->Get ()->Record .GetPartitionId ());
992
1001
return ;
993
1002
}
994
- LWPROBE (NewDataArrived, ev->Sender .ToString (), ev->Get ()->ReadActorId .ToString (), it->second ->QueryId , it->second ->Generation , ev->Get ()->Record .ByteSizeLong ());
995
- LOG_ROW_DISPATCHER_TRACE (" Forward TEvNewDataArrived from " << ev->Sender << " to " << ev->Get ()->ReadActorId << " query id " << it->second ->QueryId );
996
1003
auto consumerInfoPtr = it->second ;
1004
+ LWPROBE (NewDataArrived, ev->Sender .ToString (), ev->Get ()->ReadActorId .ToString (), consumerInfoPtr->QueryId , consumerInfoPtr->Generation , ev->Get ()->Record .ByteSizeLong ());
1005
+ LOG_ROW_DISPATCHER_TRACE (" Forward TEvNewDataArrived from " << ev->Sender << " to " << ev->Get ()->ReadActorId << " query id " << consumerInfoPtr->QueryId );
997
1006
auto partitionIt = consumerInfoPtr->Partitions .find (ev->Get ()->Record .GetPartitionId ());
998
1007
if (partitionIt == consumerInfoPtr->Partitions .end ()) {
999
1008
// Ignore TEvNewDataArrived because read actor now read others partitions.
@@ -1010,10 +1019,10 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
1010
1019
LOG_ROW_DISPATCHER_WARN (" Ignore (no consumer) TEvMessageBatch from " << ev->Sender << " to " << ev->Get ()->ReadActorId );
1011
1020
return ;
1012
1021
}
1013
- LWPROBE (MessageBatch, ev->Sender .ToString (), ev->Get ()->ReadActorId .ToString (), it->second ->QueryId , it->second ->Generation , ev->Get ()->Record .ByteSizeLong ());
1014
- LOG_ROW_DISPATCHER_TRACE (" Forward TEvMessageBatch from " << ev->Sender << " to " << ev->Get ()->ReadActorId << " query id " << it->second ->QueryId );
1015
- Metrics.RowsSent ->Add (ev->Get ()->Record .MessagesSize ());
1016
1022
auto consumerInfoPtr = it->second ;
1023
+ LWPROBE (MessageBatch, ev->Sender .ToString (), ev->Get ()->ReadActorId .ToString (), consumerInfoPtr->QueryId , consumerInfoPtr->Generation , ev->Get ()->Record .ByteSizeLong ());
1024
+ LOG_ROW_DISPATCHER_TRACE (" Forward TEvMessageBatch from " << ev->Sender << " to " << ev->Get ()->ReadActorId << " query id " << consumerInfoPtr->QueryId );
1025
+ Metrics.RowsSent ->Add (ev->Get ()->Record .MessagesSize ());
1017
1026
auto partitionIt = consumerInfoPtr->Partitions .find (ev->Get ()->Record .GetPartitionId ());
1018
1027
if (partitionIt == consumerInfoPtr->Partitions .end ()) {
1019
1028
// Ignore TEvMessageBatch because read actor now read others partitions.
@@ -1152,12 +1161,12 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
1152
1161
continue ;
1153
1162
}
1154
1163
auto consumerInfoPtr = it->second ;
1155
- consumerInfoPtr->Stat .Add (clientStat);
1156
1164
auto partitionIt = consumerInfoPtr->Partitions .find (key.PartitionId );
1157
1165
if (partitionIt == consumerInfoPtr->Partitions .end ()) {
1158
1166
continue ;
1159
1167
}
1160
1168
partitionIt->second .Stat .Add (clientStat);
1169
+ partitionIt->second .FilteredBytes += clientStat.FilteredBytes ;
1161
1170
partitionIt->second .StatisticsUpdated = true ;
1162
1171
}
1163
1172
}
0 commit comments