Skip to content

Commit 8074419

Browse files
authored
YQ-3766 Shared reading: add unread stats (#10635)
1 parent 37265de commit 8074419

File tree

11 files changed

+190
-68
lines changed

11 files changed

+190
-68
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ struct TActorFactory : public IActorFactory {
1010

1111
NActors::TActorId RegisterTopicSession(
1212
const TString& topicPath,
13+
const TString& endpoint,
14+
const TString& database,
1315
const NConfig::TRowDispatcherConfig& config,
1416
NActors::TActorId rowDispatcherActorId,
1517
ui32 partitionId,
@@ -20,6 +22,8 @@ struct TActorFactory : public IActorFactory {
2022

2123
auto actorPtr = NFq::NewTopicSession(
2224
topicPath,
25+
endpoint,
26+
database,
2327
config,
2428
rowDispatcherActorId,
2529
partitionId,

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ struct IActorFactory : public TThrRefBase {
1313

1414
virtual NActors::TActorId RegisterTopicSession(
1515
const TString& topicPath,
16+
const TString& endpoint,
17+
const TString& database,
1618
const NConfig::TRowDispatcherConfig& config,
1719
NActors::TActorId rowDispatcherActorId,
1820
ui32 partitionId,

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <ydb/core/fq/libs/row_dispatcher/protos/events.pb.h>
88
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
9+
#include <ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h>
910

1011
namespace NFq {
1112

@@ -26,6 +27,7 @@ struct TEvRowDispatcher {
2627
EvCoordinatorChangesSubscribe,
2728
EvCoordinatorRequest,
2829
EvCoordinatorResult,
30+
EvSessionStatistic,
2931
EvEnd,
3032
};
3133

@@ -120,6 +122,12 @@ struct TEvRowDispatcher {
120122
TEvSessionError() = default;
121123
NActors::TActorId ReadActorId;
122124
};
125+
126+
// Local event that carries a statistics snapshot of one topic session.
// The snapshot is copied into the event, so the event does not refer back
// to the object it was built from.
struct TEvSessionStatistic : public NActors::TEventLocal<TEvSessionStatistic, EEv::EvSessionStatistic> {
    // stat - snapshot to copy into the event.
    // explicit: a TopicSessionStatistic must not silently convert into an event.
    explicit TEvSessionStatistic(const TopicSessionStatistic& stat)
        : Stat(stat) {}

    // Session key, per-client statistics and session-wide statistics.
    TopicSessionStatistic Stat;
};
123131
};
124132

125133
} // namespace NFq
ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include <ydb/library/actors/core/actorid.h>
4+
#include <util/generic/vector.h>
5+
6+
namespace NFq {
7+
8+
struct TopicSessionClientStatistic {
9+
NActors::TActorId ReadActorId;
10+
ui32 PartitionId = 0;
11+
i64 UnreadRows = 0;
12+
i64 UnreadBytes = 0;
13+
ui64 Offset = 0;
14+
};
15+
16+
struct TopicSessionCommonStatistic {
17+
ui64 UnreadBytes = 0;
18+
};
19+
20+
struct TopicSessionParams {
21+
TString Endpoint;
22+
TString Database;
23+
TString TopicPath;
24+
ui64 PartitionId = 0;
25+
};
26+
27+
struct TopicSessionStatistic {
28+
TopicSessionParams SessionKey;
29+
TVector<TopicSessionClientStatistic> Clients;
30+
TopicSessionCommonStatistic Common;
31+
};
32+
33+
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/library/actors/core/hfunc.h>
77
#include <ydb/library/actors/core/interconnect.h>
88
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
9+
#include <ydb/library/yql/providers/dq/counters/counters.h>
910

1011
#include <ydb/core/fq/libs/actors/logging/log.h>
1112
#include <ydb/core/fq/libs/events/events.h>
@@ -48,16 +49,22 @@ struct TEvPrivate {
4849
enum EEv : ui32 {
4950
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
5051
EvCoordinatorPing = EvBegin + 20,
51-
EvPrintState,
52+
EvUpdateMetrics,
5253
EvEnd
5354
};
5455

5556
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
5657
struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {};
57-
struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {};
58+
struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {};
5859
};
5960

60-
ui64 PrintStatePeriodSec = 60;
61+
struct TQueryStat {
62+
const TString QueryId;
63+
NYql::TCounters::TEntry UnreadRows;
64+
NYql::TCounters::TEntry UnreadBytes;
65+
};
66+
67+
// Interval, in seconds, between two TEvUpdateMetrics ticks.
// constexpr: the value is only ever read, so it must not be a mutable global.
constexpr ui64 UpdateMetricsPeriodSec = 60;
6168

6269
class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
6370

@@ -84,19 +91,19 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
8491
struct TopicSessionKey {
8592
TString Endpoint;
8693
TString Database;
87-
TString TopicName;
94+
TString TopicPath;
8895
ui64 PartitionId;
8996

9097
size_t Hash() const noexcept {
9198
ui64 hash = std::hash<TString>()(Endpoint);
9299
hash = CombineHashes<ui64>(hash, std::hash<TString>()(Database));
93-
hash = CombineHashes<ui64>(hash, std::hash<TString>()(TopicName));
100+
hash = CombineHashes<ui64>(hash, std::hash<TString>()(TopicPath));
94101
hash = CombineHashes<ui64>(hash, std::hash<ui64>()(PartitionId));
95102
return hash;
96103
}
97104
bool operator==(const TopicSessionKey& other) const {
98105
return Endpoint == other.Endpoint && Database == other.Database
99-
&& TopicName == other.TopicName && PartitionId == other.PartitionId;
106+
&& TopicPath == other.TopicPath && PartitionId == other.PartitionId;
100107
}
101108
};
102109

@@ -154,10 +161,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
154161
TActorId TopicSessionId;
155162
const TString QueryId;
156163
ConsumerCounters Counters;
164+
TopicSessionClientStatistic Stat;
157165
};
158166

159167
struct SessionInfo {
160168
TMap<TActorId, TAtomicSharedPtr<ConsumerInfo>> Consumers; // key - ReadActor actor id
169+
TopicSessionCommonStatistic Stat;
161170
};
162171

163172
struct TopicSessionInfo {
@@ -198,15 +207,16 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
198207
void Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev);
199208
void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
200209
void Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev);
210+
void Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev);
201211

202212
void Handle(NActors::TEvents::TEvPing::TPtr& ev);
203213
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&);
204214
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
205215
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
206-
void Handle(NFq::TEvPrivate::TEvPrintState::TPtr&);
216+
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
207217

208218
void DeleteConsumer(const ConsumerSessionKey& key);
209-
void PrintInternalState();
219+
void UpdateMetrics();
210220

211221
STRICT_STFUNC(
212222
StateFunc, {
@@ -223,12 +233,13 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
223233
hFunc(NFq::TEvRowDispatcher::TEvStopSession, Handle);
224234
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
225235
hFunc(NFq::TEvRowDispatcher::TEvStatus, Handle);
236+
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
226237
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
227238
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvPing, Handle);
228239
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
229240
hFunc(NActors::TEvents::TEvPing, Handle);
230241
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
231-
hFunc(NFq::TEvPrivate::TEvPrintState, Handle);
242+
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
232243
})
233244
};
234245

@@ -261,7 +272,7 @@ void TRowDispatcher::Bootstrap() {
261272
auto coordinatorId = Register(NewCoordinator(SelfId(), config, YqSharedResources, Tenant, Counters).release());
262273
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());
263274
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
264-
Schedule(TDuration::Seconds(PrintStatePeriodSec), new NFq::TEvPrivate::TEvPrintState());
275+
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
265276
}
266277

267278
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -320,31 +331,42 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscrib
320331
Send(ev->Sender, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(*CoordinatorActorId), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession);
321332
}
322333

323-
void TRowDispatcher::PrintInternalState() {
334+
void TRowDispatcher::UpdateMetrics() {
324335
if (Consumers.empty()) {
325336
return;
326337
}
338+
TMap<TString, TQueryStat> queryStats;
327339
TStringStream str;
328-
str << "Consumers:\n";
329-
for (auto& [key, consumerInfo] : Consumers) {
330-
str << " query id " << consumerInfo->QueryId << ", partId: " << key.PartitionId << ", read actor id: " << key.ReadActorId
331-
<< ", queueId " << consumerInfo->EventQueueId << ", get next " << consumerInfo->Counters.GetNextBatch
332-
<< ", data arrived " << consumerInfo->Counters.NewDataArrived << ", message batch " << consumerInfo->Counters.MessageBatch << "\n";
333-
str << " ";
334-
consumerInfo->EventsQueue.PrintInternalState(str);
335-
}
336340

337-
str << "\nSessions:\n";
338-
for (auto& [key, sessionInfo1] : TopicSessions) {
339-
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicName << ", id: " << key.PartitionId << "\n";
340-
for (auto& [actorId, sessionInfo2] : sessionInfo1.Sessions) {
341-
str << " session id: " << actorId << "\n";
342-
for (auto& [actorId2, consumer] : sessionInfo2.Consumers) {
343-
str << " read actor id: " << actorId2 << "\n";
341+
str << "Statistics:\n";
342+
for (auto& [key, sessionsInfo] : TopicSessions) {
343+
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
344+
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
345+
str << " / " << actorId << "\n";
346+
str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n";
347+
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
348+
auto& stat = queryStats[consumer->QueryId];
349+
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
350+
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
351+
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
352+
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
353+
<< " get " << consumer->Counters.GetNextBatch
354+
<< " arrived " << consumer->Counters.NewDataArrived << " batch " << consumer->Counters.MessageBatch << " ";
355+
str << " retry queue: ";
356+
consumer->EventsQueue.PrintInternalState(str);
344357
}
345358
}
346359
}
347360
LOG_ROW_DISPATCHER_DEBUG(str.Str());
361+
362+
for (const auto& [queryId, stat] : queryStats) {
363+
LOG_ROW_DISPATCHER_DEBUG("UnreadBytes " << queryId << " " << stat.UnreadBytes.Max);
364+
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
365+
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
366+
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
367+
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
368+
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
369+
}
348370
}
349371

350372
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -383,6 +405,8 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
383405
LOG_ROW_DISPATCHER_DEBUG("Create new session " << readOffset);
384406
sessionActorId = ActorFactory->RegisterTopicSession(
385407
source.GetTopicPath(),
408+
source.GetEndpoint(),
409+
source.GetDatabase(),
386410
Config,
387411
SelfId(),
388412
ev->Get()->Record.GetPartitionId(),
@@ -407,7 +431,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
407431

408432
Forward(ev, sessionActorId);
409433
Metrics.ClientsCount->Set(Consumers.size());
410-
PrintInternalState();
434+
UpdateMetrics();
411435
}
412436

413437
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
@@ -490,7 +514,7 @@ void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) {
490514
ConsumersByEventQueueId.erase(consumerIt->second->EventQueueId);
491515
Consumers.erase(consumerIt);
492516
Metrics.ClientsCount->Set(Consumers.size());
493-
PrintInternalState();
517+
UpdateMetrics();
494518
}
495519

496520
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) {
@@ -577,9 +601,37 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev) {
577601
it->second->EventsQueue.Send(ev.Release()->Release().Release());
578602
}
579603

580-
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintState::TPtr&) {
581-
Schedule(TDuration::Seconds(PrintStatePeriodSec), new NFq::TEvPrivate::TEvPrintState());
582-
PrintInternalState();
604+
// Periodic tick: schedules the next TEvUpdateMetrics first, then refreshes metrics.
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
    const TDuration period = TDuration::Seconds(UpdateMetricsPeriodSec);
    Schedule(period, new NFq::TEvPrivate::TEvUpdateMetrics());
    UpdateMetrics();
}
608+
609+
// Stores the statistics reported by a topic session actor.
//
// The report is matched first by its session key (endpoint / database /
// topic path / partition) and then by the sender's actor id. A report for an
// unknown key or from an unknown sender is dropped. For a known session the
// session-wide statistics are replaced, and each client entry is copied into
// the consumer registered under the same read actor id; entries for clients
// not registered in that session are skipped.
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
    LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);

    // Bind the payload once instead of re-evaluating ev->Get()->Stat at every use.
    const auto& stat = ev->Get()->Stat;
    const auto& key = stat.SessionKey;
    const TopicSessionKey sessionKey{key.Endpoint, key.Database, key.TopicPath, key.PartitionId};

    auto sessionsIt = TopicSessions.find(sessionKey);
    if (sessionsIt == TopicSessions.end()) {
        return;
    }
    auto& sessions = sessionsIt->second.Sessions;
    auto sessionIt = sessions.find(ev->Sender);
    if (sessionIt == sessions.end()) {
        return;
    }

    auto& sessionInfo = sessionIt->second;
    sessionInfo.Stat = stat.Common;

    for (const auto& clientStat : stat.Clients) {
        auto consumerIt = sessionInfo.Consumers.find(clientStat.ReadActorId);
        if (consumerIt == sessionInfo.Consumers.end()) {
            continue;
        }
        // Assign through the stored pointer; copying the shared pointer would
        // only add a reference-count round trip per client.
        consumerIt->second->Stat = clientStat;
    }
}
584636

585637
} // namespace

0 commit comments

Comments
 (0)