Skip to content

Commit 0295ff2

Browse files
authored
YQ-3844 Shared reading: add restart session sensor / to stable (#11529)
1 parent 0bf2b38 commit 0295ff2

File tree

3 files changed

+8
-1
lines changed

3 files changed

+8
-1
lines changed

ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ struct TopicSessionClientStatistic {
1515

1616
struct TopicSessionCommonStatistic {
1717
ui64 UnreadBytes = 0;
18+
ui64 RestartSessionByOffsets = 0;
1819
};
1920

2021
struct TopicSessionParams {

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ TString TRowDispatcher::GetInternalState() {
484484
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
485485
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
486486
str << " / " << actorId << "\n";
487-
str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n";
487+
str << " unread bytes " << sessionInfo.Stat.UnreadBytes << " restarts by offsets " << sessionInfo.Stat.RestartSessionByOffsets << "\n";
488488
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
489489
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
490490
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ struct TTopicSessionMetrics {
3434
RowsRead = SubGroup->GetCounter("RowsRead", true);
3535
InFlySubscribe = SubGroup->GetCounter("InFlySubscribe");
3636
ReconnectRate = SubGroup->GetCounter("ReconnectRate", true);
37+
RestartSessionByOffsets = counters->GetCounter("RestartSessionByOffsets", true);
3738
}
3839

3940
~TTopicSessionMetrics() {
@@ -45,6 +46,7 @@ struct TTopicSessionMetrics {
4546
::NMonitoring::TDynamicCounters::TCounterPtr RowsRead;
4647
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
4748
::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;
49+
::NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsets;
4850
};
4951

5052
struct TEvPrivate {
@@ -171,6 +173,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
171173
THashMap<TString, ui64> FieldsIndexes;
172174
NYql::IPqGateway::TPtr PqGateway;
173175
TMaybe<TString> ConsumerName;
176+
ui64 RestartSessionByOffsets = 0;
174177

175178
public:
176179
explicit TTopicSession(
@@ -750,6 +753,8 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
750753
if (ReadSession) {
751754
if (clientInfo.Settings.HasOffset() && (clientInfo.Settings.GetOffset() <= LastMessageOffset)) {
752755
LOG_ROW_DISPATCHER_INFO("New client has less offset (" << clientInfo.Settings.GetOffset() << ") than the last message (" << LastMessageOffset << "), stop (restart) topic session");
756+
Metrics.RestartSessionByOffsets->Inc();
757+
++RestartSessionByOffsets;
753758
StopReadSession();
754759
}
755760
}
@@ -917,6 +922,7 @@ void TTopicSession::HandleException(const std::exception& e) {
917922
void TTopicSession::SendStatistic() {
918923
TopicSessionStatistic stat;
919924
stat.Common.UnreadBytes = UnreadBytes;
925+
stat.Common.RestartSessionByOffsets = RestartSessionByOffsets;
920926
stat.SessionKey = TopicSessionParams{Endpoint, Database, TopicPath, PartitionId};
921927
stat.Clients.reserve(Clients.size());
922928
for (auto& [readActorId, info] : Clients) {

0 commit comments

Comments
 (0)