Skip to content

Commit 0a9693f

Browse files
authored
YQ-3910 Fix MaxUnreadBytes on stopping query (#11948)
1 parent e6eea25 commit 0a9693f

File tree

1 file changed

+31
-10
lines changed

1 file changed

+31
-10
lines changed

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
242242

243243
struct TAggregatedStats{
244244
NYql::TCounters::TEntry AllSessionsReadBytes;
245-
TMap<TQueryStatKey, TAggQueryStat> LastQueryStats;
245+
TMap<TQueryStatKey, TMaybe<TAggQueryStat>> LastQueryStats;
246246
TDuration LastUpdateMetricsPeriod;
247247
};
248248

@@ -364,6 +364,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
364364
TString GetInternalState();
365365
template <class TEventPtr>
366366
bool CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, const TEventPtr& ev);
367+
void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax);
367368
void PrintStateToLog();
368369

369370
STRICT_STFUNC(
@@ -516,7 +517,9 @@ void TRowDispatcher::UpdateMetrics() {
516517
}
517518

518519
AggrStats.AllSessionsReadBytes = NYql::TCounters::TEntry();
519-
AggrStats.LastQueryStats.clear();
520+
for (auto& [queryId, stat] : AggrStats.LastQueryStats) {
521+
stat = Nothing();
522+
}
520523

521524
for (auto& [key, sessionsInfo] : TopicSessions) {
522525
const auto& topic = key.TopicPath;
@@ -527,18 +530,34 @@ void TRowDispatcher::UpdateMetrics() {
527530
sessionInfo.Stat.Clear();
528531

529532
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
530-
AggrStats.LastQueryStats[TQueryStatKey{consumer->QueryId, topic}].Add(consumer->Stat);
533+
auto& stat = AggrStats.LastQueryStats[TQueryStatKey{consumer->QueryId, topic}];
534+
if (!stat) {
535+
stat = TAggQueryStat();
536+
}
537+
stat->Add(consumer->Stat);
531538
consumer->Stat.Clear();
532539
}
533540
}
534541
}
535-
for (const auto& [queryStatKey, stat] : AggrStats.LastQueryStats) {
536-
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryStatKey.first);
537-
auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(queryStatKey.second));
538-
topicGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
539-
topicGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
540-
topicGroup->GetCounter("MaxReadLag")->Set(stat.ReadLagMessages.Max);
542+
for (auto it = AggrStats.LastQueryStats.begin(); it != AggrStats.LastQueryStats.end();) {
543+
const auto& stats = it->second;
544+
if (!stats) {
545+
SetQueryMetrics(it->first, 0, 0, 0);
546+
it = AggrStats.LastQueryStats.erase(it);
547+
continue;
548+
}
549+
SetQueryMetrics(it->first, stats->UnreadBytes.Max, stats->UnreadBytes.Avg, stats->ReadLagMessages.Max);
550+
++it;
541551
}
552+
PrintStateToLog();
553+
}
554+
555+
void TRowDispatcher::SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax) {
556+
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryKey.first);
557+
auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(queryKey.second));
558+
topicGroup->GetCounter("MaxUnreadBytes")->Set(unreadBytesMax);
559+
topicGroup->GetCounter("AvgUnreadBytes")->Set(unreadBytesAvg);
560+
topicGroup->GetCounter("MaxReadLag")->Set(readLagMessagesMax);
542561
}
543562

544563
TString TRowDispatcher::GetInternalState() {
@@ -598,7 +617,9 @@ TString TRowDispatcher::GetInternalState() {
598617
auto sessionsBufferSumSize = sessionCountByQuery[queryStatKey] * MaxSessionBufferSizeBytes;
599618
auto used = sessionsBufferSumSize ? (stat.UnreadBytes.Sum * 100.0 / sessionsBufferSumSize) : 0.0;
600619
str << " " << queryId << " / " << topic << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.UnreadBytes.Sum) << ") unread max (one partition) " << toHuman(stat.UnreadBytes.Max) << " data rate";
601-
printDataRate(aggStat.ReadBytes);
620+
if (aggStat) {
621+
printDataRate(aggStat->ReadBytes);
622+
}
602623
str << " waiting " << stat.IsWaiting << " max read lag " << stat.ReadLagMessages.Max;
603624
str << "\n";
604625
}

0 commit comments

Comments
 (0)