@@ -93,20 +93,27 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
93
93
struct TClientsInfo : public IClientDataConsumer {
94
94
using TPtr = TIntrusivePtr<TClientsInfo>;
95
95
96
- TClientsInfo (TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, NMonitoring::TDynamicCounterPtr& counters)
96
+ TClientsInfo (TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup )
97
97
: Self(self)
98
98
, LogPrefix(logPrefix)
99
99
, HandlerSettings(handlerSettings)
100
100
, Settings(ev->Get ()->Record)
101
101
, ReadActorId(ev->Sender)
102
- , FilteredDataRate(counters->GetCounter (" FilteredDataRate" , true ))
103
- , RestartSessionByOffsetsByQuery(counters->GetCounter (" RestartSessionByOffsetsByQuery" , true ))
102
+ , Counters(counters)
104
103
{
105
104
if (Settings.HasOffset ()) {
106
105
NextMessageOffset = Settings.GetOffset ();
107
106
InitialOffset = Settings.GetOffset ();
108
107
}
109
108
Y_UNUSED (TDuration::TryParse (Settings.GetSource ().GetReconnectPeriod (), ReconnectPeriod));
109
+ auto queryGroup = Counters->GetSubgroup (" query_id" , ev->Get ()->Record .GetQueryId ());
110
+ auto topicGroup = queryGroup->GetSubgroup (" read_group" , CleanupCounterValueString (readGroup));
111
+ FilteredDataRate = topicGroup->GetCounter (" FilteredDataRate" , true );
112
+ RestartSessionByOffsetsByQuery = counters->GetCounter (" RestartSessionByOffsetsByQuery" , true );
113
+ }
114
+
115
+ ~TClientsInfo () {
116
+ Counters->RemoveSubgroup (" query_id" , Settings.GetQueryId ());
110
117
}
111
118
112
119
TActorId GetClientId () const override {
@@ -188,6 +195,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
188
195
// Metrics
189
196
ui64 InitialOffset = 0 ;
190
197
TStats Stat; // Send (filtered) to read_actor
198
+ const ::NMonitoring::TDynamicCounterPtr Counters;
191
199
NMonitoring::TDynamicCounters::TCounterPtr FilteredDataRate; // filtered
192
200
NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsetsByQuery;
193
201
};
@@ -707,10 +715,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
707
715
const TString& format = source.GetFormat ();
708
716
ITopicFormatHandler::TSettings handlerSettings = {.ParsingFormat = format ? format : " raw" };
709
717
710
- auto queryGroup = Counters->GetSubgroup (" query_id" , ev->Get ()->Record .GetQueryId ());
711
- auto readGroup = queryGroup->GetSubgroup (" read_group" , CleanupCounterValueString (ReadGroup));
712
- auto clientInfo = Clients.insert ({ev->Sender , MakeIntrusive<TClientsInfo>(*this , LogPrefix, handlerSettings, ev, readGroup)}).first ->second ;
713
-
718
+ auto clientInfo = Clients.insert ({ev->Sender , MakeIntrusive<TClientsInfo>(*this , LogPrefix, handlerSettings, ev, Counters, ReadGroup)}).first ->second ;
714
719
auto formatIt = FormatHandlers.find (handlerSettings);
715
720
if (formatIt == FormatHandlers.end ()) {
716
721
formatIt = FormatHandlers.insert ({handlerSettings, CreateTopicFormatHandler (ActorContext (), FormatHandlerConfig, handlerSettings, Metrics.PartitionGroup )}).first ;
0 commit comments