Skip to content

Commit c009b69

Browse files
authored
YQ-3858 RowDispatcher / Add new public metrics / to stable (#14157)
1 parent 2dede2b commit c009b69

File tree

18 files changed, with 261 additions and 61 deletions.

ydb/core/fq/libs/compute/common/ut/utils_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ Y_UNIT_TEST_SUITE(StatsFormat) {
6363

6464
Y_UNIT_TEST(AggregateStat) {
6565
auto res = NFq::AggregateStats(NResource::Find("plan.json"));
66-
UNIT_ASSERT_VALUES_EQUAL(res.size(), 14);
66+
UNIT_ASSERT_VALUES_EQUAL(res.size(), 18);
6767
UNIT_ASSERT_VALUES_EQUAL(res["IngressBytes"], 6333256);
6868
UNIT_ASSERT_VALUES_EQUAL(res["EgressBytes"], 0);
6969
UNIT_ASSERT_VALUES_EQUAL(res["InputBytes"], 1044);

ydb/core/fq/libs/compute/common/utils.cpp

Lines changed: 49 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -80,6 +80,10 @@ struct TTotalStatistics {
8080
TAggregate EgressRows;
8181
TAggregate Tasks;
8282
TAggregates Aggregates;
83+
TAggregate IngressFilteredBytes;
84+
TAggregate IngressFilteredRows;
85+
TAggregate IngressQueuedBytes;
86+
TAggregate IngressQueuedRows;
8387
};
8488

8589
TString FormatDurationMs(ui64 durationMs) {
@@ -309,6 +313,14 @@ void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const T
309313
totals.SourceCpuTimeUs.Add(*sum);
310314
} else if (name == "Tasks") {
311315
totals.Tasks.Add(*sum);
316+
} else if (name == "IngressFilteredBytes") {
317+
totals.IngressFilteredBytes.Add(*sum);
318+
} else if (name == "IngressFilteredRows") {
319+
totals.IngressFilteredRows.Add(*sum);
320+
} else if (name == "IngressQueuedBytes") {
321+
totals.IngressQueuedBytes.Add(*sum);
322+
} else if (name == "IngressQueuedRows") {
323+
totals.IngressQueuedRows.Add(*sum);
312324
}
313325
}
314326
}
@@ -468,6 +480,10 @@ TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage, TString* time
468480
totals.IngressRows.Write(writer, "IngressRows");
469481
totals.EgressBytes.Write(writer, "EgressBytes");
470482
totals.EgressRows.Write(writer, "EgressRows");
483+
totals.IngressFilteredBytes.Write(writer, "IngressFilteredBytes");
484+
totals.IngressFilteredRows.Write(writer, "IngressFilteredRows");
485+
totals.IngressQueuedBytes.Write(writer, "IngressQueuedBytes");
486+
totals.IngressQueuedRows.Write(writer, "IngressQueuedRows");
471487
totals.Tasks.Write(writer, "Tasks");
472488
writer.OnEndMap();
473489
}
@@ -532,6 +548,26 @@ struct TStatsAggregator {
532548
Aggregates[source + ".Splits"] += ingress->GetIntegerSafe();
533549
success = true;
534550
}
551+
if (auto ingress = node.GetValueByPath("Ingress.FilteredBytes.Sum")) {
552+
auto source = name.substr(prefix.size());
553+
Aggregates[source + ".FilteredBytes"] += ingress->GetIntegerSafe();
554+
success = true;
555+
}
556+
if (auto ingress = node.GetValueByPath("Ingress.FilteredRows.Sum")) {
557+
auto source = name.substr(prefix.size());
558+
Aggregates[source + ".FilteredRows"] += ingress->GetIntegerSafe();
559+
success = true;
560+
}
561+
if (auto ingress = node.GetValueByPath("Ingress.QueuedBytes.Sum")) {
562+
auto source = name.substr(prefix.size());
563+
Aggregates[source + ".QueuedBytes"] += ingress->GetIntegerSafe();
564+
success = true;
565+
}
566+
if (auto ingress = node.GetValueByPath("Ingress.QueuedRows.Sum")) {
567+
auto source = name.substr(prefix.size());
568+
Aggregates[source + ".QueuedRows"] += ingress->GetIntegerSafe();
569+
success = true;
570+
}
535571
return success;
536572
}
537573

@@ -543,7 +579,11 @@ struct TStatsAggregator {
543579
{"EgressRows", 0},
544580
{"InputBytes", 0},
545581
{"OutputBytes", 0},
546-
{"CpuTimeUs", 0}
582+
{"CpuTimeUs", 0},
583+
{"IngressFilteredBytes", 0},
584+
{"IngressFilteredRows", 0},
585+
{"IngressQueuedBytes", 0},
586+
{"IngressQueuedRows", 0}
547587
};
548588
};
549589

@@ -989,6 +1029,10 @@ TString GetPrettyStatistics(const TString& statistics) {
9891029
RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressBytes", "EgressBytes");
9901030
RemapNode(writer, p.second, "TaskRunner.Stage=Total.EgressRows", "EgressRows");
9911031
RemapNode(writer, p.second, "TaskRunner.Stage=Total.MkqlMaxMemoryUsage", "MaxMemoryUsage");
1032+
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressFilteredBytes", "IngressFilteredBytes");
1033+
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressFilteredRows", "IngressFilteredRows");
1034+
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressQueuedBytes", "IngressQueuedBytes");
1035+
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressQueuedRows", "IngressQueuedRows");
9921036
writer.OnEndMap();
9931037
}
9941038
// YQv2
@@ -1010,6 +1054,10 @@ TString GetPrettyStatistics(const TString& statistics) {
10101054
RemapNode(writer, p.second, "EgressBytes", "EgressBytes");
10111055
RemapNode(writer, p.second, "EgressRows", "EgressRows");
10121056
RemapNode(writer, p.second, "MaxMemoryUsage", "MaxMemoryUsage");
1057+
RemapNode(writer, p.second, "IngressFilteredBytes", "IngressFilteredBytes");
1058+
RemapNode(writer, p.second, "IngressFilteredRows", "IngressFilteredRows");
1059+
RemapNode(writer, p.second, "IngressQueuedBytes", "IngressQueuedBytes");
1060+
RemapNode(writer, p.second, "IngressQueuedRows", "IngressQueuedRows");
10131061
writer.OnEndMap();
10141062
}
10151063
}

ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h

Lines changed: 12 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -8,26 +8,29 @@ namespace NFq {
88
struct TTopicSessionClientStatistic {
99
NActors::TActorId ReadActorId;
1010
ui32 PartitionId = 0;
11-
i64 UnreadRows = 0; // Current value
12-
i64 UnreadBytes = 0; // Current value
11+
i64 QueuedRows = 0; // Current value
12+
i64 QueuedBytes = 0; // Current value
1313
ui64 Offset = 0; // Current value
14-
ui64 FilteredReadBytes = 0; // Increment / filtered
14+
ui64 FilteredBytes = 0; // Increment / filtered
15+
ui64 FilteredRows = 0; // Increment / filtered
1516
ui64 ReadBytes = 0; // Increment
1617
bool IsWaiting = false; // Current value
1718
i64 ReadLagMessages = 0; // Current value
1819
ui64 InitialOffset = 0;
1920
void Add(const TTopicSessionClientStatistic& stat) {
20-
UnreadRows = stat.UnreadRows;
21-
UnreadBytes = stat.UnreadBytes;
21+
QueuedRows = stat.QueuedRows;
22+
QueuedBytes = stat.QueuedBytes;
2223
Offset = stat.Offset;
23-
FilteredReadBytes += stat.FilteredReadBytes;
24+
FilteredBytes += stat.FilteredBytes;
25+
FilteredRows += stat.FilteredRows;
2426
ReadBytes += stat.ReadBytes;
2527
IsWaiting = stat.IsWaiting;
2628
ReadLagMessages = stat.ReadLagMessages;
2729
InitialOffset = stat.InitialOffset;
2830
}
2931
void Clear() {
30-
FilteredReadBytes = 0;
32+
FilteredBytes = 0;
33+
FilteredRows = 0;
3134
ReadBytes = 0;
3235
}
3336
};
@@ -63,7 +66,7 @@ struct TFormatHandlerStatistic {
6366
};
6467

6568
struct TTopicSessionCommonStatistic {
66-
ui64 UnreadBytes = 0; // Current value
69+
ui64 QueuedBytes = 0; // Current value
6770
ui64 RestartSessionByOffsets = 0;
6871
ui64 ReadBytes = 0; // Increment
6972
ui64 ReadEvents = 0; // Increment
@@ -72,7 +75,7 @@ struct TTopicSessionCommonStatistic {
7275
std::unordered_map<TString, TFormatHandlerStatistic> FormatHandlers;
7376

7477
void Add(const TTopicSessionCommonStatistic& stat) {
75-
UnreadBytes = stat.UnreadBytes;
78+
QueuedBytes = stat.QueuedBytes;
7679
RestartSessionByOffsets = stat.RestartSessionByOffsets;
7780
ReadBytes += stat.ReadBytes;
7881
ReadEvents += stat.ReadEvents;

ydb/core/fq/libs/row_dispatcher/probes.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,7 @@
9090
PROBE(SessionStatistic, \
9191
GROUPS(), \
9292
TYPES(TString, TString, TString, TString, TString, ui32, ui64, ui64, ui64, ui64, ui64), \
93-
NAMES("sender", "readGroup", "endpoint","database", "partitionId", "readBytes", "unreadBytes", "restartSessionByOffsets", "readEvents", "lastReadedOffset")) \
93+
NAMES("sender", "readGroup", "endpoint","database", "partitionId", "readBytes", "queuedBytes", "restartSessionByOffsets", "readEvents", "lastReadedOffset")) \
9494
PROBE(GetInternalState, \
9595
GROUPS(), \
9696
TYPES(TString, ui64), \

ydb/core/fq/libs/row_dispatcher/protos/events.proto

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -88,6 +88,10 @@ message TEvStatistics {
8888
uint64 ReadBytes = 3;
8989
repeated TPartitionStatistics Partition = 4;
9090
uint64 CpuMicrosec = 5;
91+
uint64 FilteredBytes = 6;
92+
uint64 FilteredRows = 7;
93+
uint64 QueuedBytes = 8;
94+
uint64 QueuedRows = 9;
9195
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
9296
}
9397

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 32 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -117,16 +117,16 @@ struct TQueryStatKeyHash {
117117
};
118118

119119
struct TAggQueryStat {
120-
NYql::TCounters::TEntry FilteredReadBytes;
121-
NYql::TCounters::TEntry UnreadBytes;
122-
NYql::TCounters::TEntry UnreadRows;
120+
NYql::TCounters::TEntry FilteredBytes;
121+
NYql::TCounters::TEntry QueuedBytes;
122+
NYql::TCounters::TEntry QueuedRows;
123123
NYql::TCounters::TEntry ReadLagMessages;
124124
bool IsWaiting = false;
125125

126126
void Add(const TTopicSessionClientStatistic& stat) {
127-
FilteredReadBytes.Add(NYql::TCounters::TEntry(stat.FilteredReadBytes));
128-
UnreadBytes.Add(NYql::TCounters::TEntry(stat.UnreadBytes));
129-
UnreadRows.Add(NYql::TCounters::TEntry(stat.UnreadRows));
127+
FilteredBytes.Add(NYql::TCounters::TEntry(stat.FilteredBytes));
128+
QueuedBytes.Add(NYql::TCounters::TEntry(stat.QueuedBytes));
129+
QueuedRows.Add(NYql::TCounters::TEntry(stat.QueuedRows));
130130
ReadLagMessages.Add(NYql::TCounters::TEntry(stat.ReadLagMessages));
131131
IsWaiting = IsWaiting || stat.IsWaiting;
132132
}
@@ -410,7 +410,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
410410
void UpdateReadActorsInternalState();
411411
template <class TEventPtr>
412412
bool CheckSession(TAtomicSharedPtr<TConsumerInfo>& consumer, const TEventPtr& ev);
413-
void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax);
413+
void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 queuedBytesMax, ui64 queuedBytesAvg, i64 readLagMessagesMax);
414414
void PrintStateToLog();
415415
void UpdateCpuTime();
416416

@@ -611,7 +611,7 @@ void TRowDispatcher::UpdateMetrics() {
611611
toDelete.insert(key);
612612
continue;
613613
}
614-
SetQueryMetrics(key, stats->UnreadBytes.Max, stats->UnreadBytes.Avg, stats->ReadLagMessages.Max);
614+
SetQueryMetrics(key, stats->QueuedBytes.Max, stats->QueuedBytes.Avg, stats->ReadLagMessages.Max);
615615
}
616616
for (const auto& key : toDelete) {
617617
SetQueryMetrics(key, 0, 0, 0);
@@ -621,11 +621,11 @@ void TRowDispatcher::UpdateMetrics() {
621621
PrintStateToLog();
622622
}
623623

624-
void TRowDispatcher::SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax) {
624+
void TRowDispatcher::SetQueryMetrics(const TQueryStatKey& queryKey, ui64 queuedBytesMax, ui64 queuedBytesAvg, i64 readLagMessagesMax) {
625625
auto queryGroup = Metrics.Counters->GetSubgroup("query_id", queryKey.QueryId);
626626
auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(queryKey.ReadGroup));
627-
topicGroup->GetCounter("MaxUnreadBytes")->Set(unreadBytesMax);
628-
topicGroup->GetCounter("AvgUnreadBytes")->Set(unreadBytesAvg);
627+
topicGroup->GetCounter("MaxQueuedBytes")->Set(queuedBytesMax);
628+
topicGroup->GetCounter("AvgQueuedBytes")->Set(queuedBytesAvg);
629629
topicGroup->GetCounter("MaxReadLag")->Set(readLagMessagesMax);
630630
}
631631

@@ -663,11 +663,11 @@ TString TRowDispatcher::GetInternalState() {
663663

664664
THashMap<TQueryStatKey, TAggQueryStat, TQueryStatKeyHash> queryState;
665665
THashMap<TQueryStatKey, ui64, TQueryStatKeyHash> sessionCountByQuery;
666-
ui64 unreadBytesSum = 0;
666+
ui64 queuedBytesSum = 0;
667667

668668
for (auto& [sessionKey, sessionsInfo] : TopicSessions) {
669669
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
670-
unreadBytesSum += sessionInfo.Stat.UnreadBytes;
670+
queuedBytesSum += sessionInfo.Stat.QueuedBytes;
671671
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
672672
auto key = TQueryStatKey{consumer->QueryId, sessionKey.ReadGroup};
673673
++sessionCountByQuery[key];
@@ -677,18 +677,18 @@ TString TRowDispatcher::GetInternalState() {
677677
}
678678

679679
if (TopicSessions.size()) {
680-
str << "Buffer used: " << Prec(unreadBytesSum * 100.0 / (TopicSessions.size() * MaxSessionBufferSizeBytes), 4) << "% (" << toHuman(unreadBytesSum) << ")\n";
680+
str << "Buffer used: " << Prec(queuedBytesSum * 100.0 / (TopicSessions.size() * MaxSessionBufferSizeBytes), 4) << "% (" << toHuman(queuedBytesSum) << ")\n";
681681
}
682682

683683
str << "Queries:\n";
684684
for (const auto& [queryStatKey, stat]: queryState) {
685685
auto [queryId, readGroup] = queryStatKey;
686686
const auto& aggStat = AggrStats.LastQueryStats[queryStatKey];
687687
auto sessionsBufferSumSize = sessionCountByQuery[queryStatKey] * MaxSessionBufferSizeBytes;
688-
auto used = sessionsBufferSumSize ? (stat.UnreadBytes.Sum * 100.0 / sessionsBufferSumSize) : 0.0;
689-
str << " " << queryId << " / " << readGroup << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.UnreadBytes.Sum) << ") unread max (one partition) " << toHuman(stat.UnreadBytes.Max) << " data rate";
688+
auto used = sessionsBufferSumSize ? (stat.QueuedBytes.Sum * 100.0 / sessionsBufferSumSize) : 0.0;
689+
str << " " << queryId << " / " << readGroup << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.QueuedBytes.Sum) << ") unread max (one partition) " << toHuman(stat.QueuedBytes.Max) << " data rate";
690690
if (aggStat) {
691-
printDataRate(aggStat->FilteredReadBytes);
691+
printDataRate(aggStat->FilteredBytes);
692692
}
693693
str << " waiting " << stat.IsWaiting << " max read lag " << stat.ReadLagMessages.Max;
694694
str << "\n";
@@ -698,7 +698,7 @@ TString TRowDispatcher::GetInternalState() {
698698
str << " " << key.TopicPath << " / " << key.PartitionId << " / " << key.ReadGroup;
699699
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
700700
str << " / " << LeftPad(actorId, 32)
701-
<< " data rate " << toHumanDR(sessionInfo.AggrReadBytes.Sum) << " unread bytes " << toHuman(sessionInfo.Stat.UnreadBytes)
701+
<< " data rate " << toHumanDR(sessionInfo.AggrReadBytes.Sum) << " unread bytes " << toHuman(sessionInfo.Stat.QueuedBytes)
702702
<< " offset " << LeftPad(sessionInfo.Stat.LastReadedOffset, 12) << " restarts by offsets " << sessionInfo.Stat.RestartSessionByOffsets << "\n";
703703
ui64 maxInitialOffset = 0;
704704
ui64 minInitialOffset = std::numeric_limits<ui64>::max();
@@ -712,7 +712,7 @@ TString TRowDispatcher::GetInternalState() {
712712
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
713713
const auto& partition = consumer->Partitions[key.PartitionId];
714714
str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " unread bytes "
715-
<< toHuman(consumer->Stat.UnreadBytes) << " (" << leftPad(consumer->Stat.UnreadRows) << " rows) "
715+
<< toHuman(consumer->Stat.QueuedBytes) << " (" << leftPad(consumer->Stat.QueuedRows) << " rows) "
716716
<< " offset " << leftPad(consumer->Stat.Offset) << " init offset " << leftPad(consumer->Stat.InitialOffset)
717717
<< " get " << leftPad(consumer->Counters.GetNextBatch)
718718
<< " arr " << leftPad(consumer->Counters.NewDataArrived) << " btc " << leftPad(consumer->Counters.MessageBatch)
@@ -1054,6 +1054,10 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) {
10541054
}
10551055
auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>();
10561056
ui64 readBytes = 0;
1057+
ui64 filteredBytes = 0;
1058+
ui64 filteredRows = 0;
1059+
ui64 queuedBytes = 0;
1060+
ui64 queuedRows = 0;
10571061
for (auto& [partitionId, partition] : consumer->Partitions) {
10581062
if (!partition.StatisticsUpdated) {
10591063
continue;
@@ -1062,12 +1066,20 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) {
10621066
partitionsProto->SetPartitionId(partitionId);
10631067
partitionsProto->SetNextMessageOffset(partition.Stat.Offset);
10641068
readBytes += partition.Stat.ReadBytes;
1069+
filteredBytes += partition.Stat.FilteredBytes;
1070+
filteredRows += partition.Stat.FilteredRows;
1071+
queuedBytes += partition.Stat.QueuedBytes;
1072+
queuedRows += partition.Stat.QueuedRows;
10651073
partition.Stat.Clear();
10661074
partition.StatisticsUpdated = false;
10671075
}
10681076
event->Record.SetReadBytes(readBytes);
10691077
event->Record.SetCpuMicrosec(consumer->CpuMicrosec);
10701078
consumer->CpuMicrosec = 0;
1079+
event->Record.SetFilteredBytes(filteredBytes);
1080+
event->Record.SetFilteredRows(filteredRows);
1081+
event->Record.SetQueuedBytes(queuedBytes);
1082+
event->Record.SetQueuedRows(queuedRows);
10711083
LWPROBE(Statistics, consumer->ReadActorId.ToString(), consumer->QueryId, consumer->Generation, event->Record.ByteSizeLong());
10721084
consumer->EventsQueue.Send(event.release(), consumer->Generation);
10731085
}
@@ -1102,7 +1114,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev
11021114
key.TopicPath,
11031115
key.PartitionId,
11041116
stat.Common.ReadBytes,
1105-
stat.Common.UnreadBytes,
1117+
stat.Common.QueuedBytes,
11061118
stat.Common.RestartSessionByOffsets,
11071119
stat.Common.ReadEvents,
11081120
stat.Common.LastReadedOffset);

0 commit comments

Comments
 (0)