@@ -171,6 +171,8 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
171
171
TActorId TopicSessionId;
172
172
const TString QueryId;
173
173
ConsumerCounters Counters;
174
+ bool PendingGetNextBatch = false ;
175
+ bool PendingNewDataArrived = false ;
174
176
TopicSessionClientStatistic Stat;
175
177
};
176
178
@@ -395,7 +397,8 @@ TString TRowDispatcher::GetInternalState() {
395
397
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
396
398
<< consumer->Stat .UnreadRows << " unread bytes " << consumer->Stat .UnreadBytes << " offset " << consumer->Stat .Offset
397
399
<< " get " << consumer->Counters .GetNextBatch
398
- << " arrived " << consumer->Counters .NewDataArrived << " batch " << consumer->Counters .MessageBatch << " " ;
400
+ << " arr " << consumer->Counters .NewDataArrived << " btc " << consumer->Counters .MessageBatch
401
+ << " pend get " << consumer->PendingGetNextBatch << " pend new " << consumer->PendingNewDataArrived << " " ;
399
402
str << " retry queue: " ;
400
403
consumer->EventsQueue .PrintInternalState (str);
401
404
}
@@ -486,6 +489,8 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
486
489
LOG_ROW_DISPATCHER_ERROR (" TEvGetNextBatch: wrong seq num from " << ev->Sender .ToString () << " , seqNo " << seqNo << " , ignore message" );
487
490
return ;
488
491
}
492
+ it->second ->PendingNewDataArrived = false ;
493
+ it->second ->PendingGetNextBatch = true ;
489
494
it->second ->Counters .GetNextBatch ++;
490
495
Forward (ev, it->second ->TopicSessionId );
491
496
}
@@ -498,7 +503,6 @@ void TRowDispatcher::Handle(NActors::TEvents::TEvPing::TPtr& ev) {
498
503
void TRowDispatcher::Handle (NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
499
504
LOG_ROW_DISPATCHER_DEBUG (" TEvStopSession, topicPath " << ev->Get ()->Record .GetSource ().GetTopicPath () <<
500
505
" partitionId " << ev->Get ()->Record .GetPartitionId ());
501
-
502
506
ConsumerSessionKey key{ev->Sender , ev->Get ()->Record .GetPartitionId ()};
503
507
auto it = Consumers.find (key);
504
508
if (it == Consumers.end ()) {
@@ -593,6 +597,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev)
593
597
return ;
594
598
}
595
599
LOG_ROW_DISPATCHER_TRACE (" Forward TEvNewDataArrived to " << ev->Get ()->ReadActorId );
600
+ it->second ->PendingNewDataArrived = true ;
596
601
it->second ->Counters .NewDataArrived ++;
597
602
it->second ->EventsQueue .Send (ev->Release ().Release ());
598
603
}
@@ -607,6 +612,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
607
612
}
608
613
Metrics.RowsSent ->Add (ev->Get ()->Record .MessagesSize ());
609
614
LOG_ROW_DISPATCHER_TRACE (" Forward TEvMessageBatch to " << ev->Get ()->ReadActorId );
615
+ it->second ->PendingGetNextBatch = false ;
610
616
it->second ->Counters .MessageBatch ++;
611
617
it->second ->EventsQueue .Send (ev->Release ().Release ());
612
618
}
0 commit comments