Skip to content

Commit a15d870

Browse files
authored
YQ-3912 Add read group to row dispatcher (#12102)
1 parent 800705f commit a15d870

File tree

9 files changed

+133
-38
lines changed

9 files changed

+133
-38
lines changed

ydb/core/fq/libs/row_dispatcher/actors_factory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ struct TActorFactory : public IActorFactory {
99
TActorFactory() {}
1010

1111
NActors::TActorId RegisterTopicSession(
12+
const TString& readGroup,
1213
const TString& topicPath,
1314
const TString& endpoint,
1415
const TString& database,
@@ -23,6 +24,7 @@ struct TActorFactory : public IActorFactory {
2324
ui64 maxBufferSize) const override {
2425

2526
auto actorPtr = NFq::NewTopicSession(
27+
readGroup,
2628
topicPath,
2729
endpoint,
2830
database,

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ struct IActorFactory : public TThrRefBase {
1212
using TPtr = TIntrusivePtr<IActorFactory>;
1313

1414
virtual NActors::TActorId RegisterTopicSession(
15+
const TString& readGroup,
1516
const TString& topicPath,
1617
const TString& endpoint,
1718
const TString& database,

ydb/core/fq/libs/row_dispatcher/events/topic_session_stats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ struct TopicSessionCommonStatistic {
5151
};
5252

5353
struct TopicSessionParams {
54+
TString ReadGroup;
5455
TString Endpoint;
5556
TString Database;
5657
TString TopicPath;

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,25 @@ struct TEvPrivate {
7272
};
7373
};
7474

75-
using TQueryStatKey = std::pair<TString, TString>; // QueryId / Topic
75+
// Key identifying one per-query statistics bucket: a query id paired with a read group.
// Provides Hash() and operator== so it can serve as a key in hash-based containers
// (together with a hasher functor that forwards to Hash()).
struct TQueryStatKey {
76+
// Identifier of the query.
TString QueryId;
77+
// Read group name.
TString ReadGroup;
78+
79+
// Returns a hash built from both fields: the hash of QueryId combined with
// the hash of ReadGroup via CombineHashes.
size_t Hash() const noexcept {
80+
ui64 hash = std::hash<TString>()(QueryId);
81+
hash = CombineHashes<ui64>(hash, std::hash<TString>()(ReadGroup));
82+
return hash;
83+
}
84+
// Two keys are equal only when both QueryId and ReadGroup match.
bool operator==(const TQueryStatKey& other) const {
85+
return QueryId == other.QueryId && ReadGroup == other.ReadGroup;
86+
}
87+
};
88+
89+
// Hasher functor for TQueryStatKey: forwards to TQueryStatKey::Hash() so the key
// can be passed as the hash template argument of THashMap / THashSet.
struct TQueryStatKeyHash {
90+
// Returns k.Hash().
size_t operator()(const TQueryStatKey& k) const {
91+
return k.Hash();
92+
}
93+
};
7694

7795
struct TAggQueryStat {
7896
NYql::TCounters::TEntry ReadBytes;
@@ -118,20 +136,22 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
118136
};
119137

120138
struct TopicSessionKey {
139+
TString ReadGroup;
121140
TString Endpoint;
122141
TString Database;
123142
TString TopicPath;
124143
ui64 PartitionId;
125144

126145
size_t Hash() const noexcept {
127-
ui64 hash = std::hash<TString>()(Endpoint);
146+
ui64 hash = std::hash<TString>()(ReadGroup);
147+
hash = CombineHashes<ui64>(hash, std::hash<TString>()(Endpoint));
128148
hash = CombineHashes<ui64>(hash, std::hash<TString>()(Database));
129149
hash = CombineHashes<ui64>(hash, std::hash<TString>()(TopicPath));
130150
hash = CombineHashes<ui64>(hash, std::hash<ui64>()(PartitionId));
131151
return hash;
132152
}
133153
bool operator==(const TopicSessionKey& other) const {
134-
return Endpoint == other.Endpoint && Database == other.Database
154+
return ReadGroup == other.ReadGroup && Endpoint == other.Endpoint && Database == other.Database
135155
&& TopicPath == other.TopicPath && PartitionId == other.PartitionId;
136156
}
137157
};
@@ -243,7 +263,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
243263

244264
struct TAggregatedStats{
245265
NYql::TCounters::TEntry AllSessionsReadBytes;
246-
TMap<TQueryStatKey, TMaybe<TAggQueryStat>> LastQueryStats;
266+
THashMap<TQueryStatKey, TMaybe<TAggQueryStat>, TQueryStatKeyHash> LastQueryStats;
247267
TDuration LastUpdateMetricsPeriod;
248268
};
249269

@@ -540,15 +560,14 @@ void TRowDispatcher::UpdateMetrics() {
540560
}
541561

542562
for (auto& [key, sessionsInfo] : TopicSessions) {
543-
const auto& topic = key.TopicPath;
544563
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
545564
auto read = NYql::TCounters::TEntry(sessionInfo.Stat.ReadBytes);
546565
AggrStats.AllSessionsReadBytes.Add(read);
547566
sessionInfo.AggrReadBytes = read;
548567
sessionInfo.Stat.Clear();
549568

550569
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
551-
auto& stat = AggrStats.LastQueryStats[TQueryStatKey{consumer->QueryId, topic}];
570+
auto& stat = AggrStats.LastQueryStats[TQueryStatKey{consumer->QueryId, key.ReadGroup}];
552571
if (!stat) {
553572
stat = TAggQueryStat();
554573
}
@@ -557,22 +576,24 @@ void TRowDispatcher::UpdateMetrics() {
557576
}
558577
}
559578
}
560-
for (auto it = AggrStats.LastQueryStats.begin(); it != AggrStats.LastQueryStats.end();) {
561-
const auto& stats = it->second;
579+
THashSet<TQueryStatKey, TQueryStatKeyHash> toDelete;
580+
for (const auto& [key, stats] : AggrStats.LastQueryStats) {
562581
if (!stats) {
563-
SetQueryMetrics(it->first, 0, 0, 0);
564-
it = AggrStats.LastQueryStats.erase(it);
582+
toDelete.insert(key);
565583
continue;
566584
}
567-
SetQueryMetrics(it->first, stats->UnreadBytes.Max, stats->UnreadBytes.Avg, stats->ReadLagMessages.Max);
568-
++it;
585+
SetQueryMetrics(key, stats->UnreadBytes.Max, stats->UnreadBytes.Avg, stats->ReadLagMessages.Max);
586+
}
587+
for (const auto& key : toDelete) {
588+
SetQueryMetrics(key, 0, 0, 0);
589+
AggrStats.LastQueryStats.erase(key);
569590
}
570591
PrintStateToLog();
571592
}
572593

573594
void TRowDispatcher::SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax) {
574-
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryKey.first);
575-
auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(queryKey.second));
595+
auto queryGroup = Metrics.Counters->GetSubgroup("query_id", queryKey.QueryId);
596+
auto topicGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(queryKey.ReadGroup));
576597
topicGroup->GetCounter("MaxUnreadBytes")->Set(unreadBytesMax);
577598
topicGroup->GetCounter("AvgUnreadBytes")->Set(unreadBytesAvg);
578599
topicGroup->GetCounter("MaxReadLag")->Set(readLagMessagesMax);
@@ -608,16 +629,15 @@ TString TRowDispatcher::GetInternalState() {
608629
printDataRate(AggrStats.AllSessionsReadBytes);
609630
str << "\n";
610631

611-
TMap<TQueryStatKey, TAggQueryStat> queryState;
612-
TMap<TQueryStatKey, ui64> sessionCountByQuery;
632+
THashMap<TQueryStatKey, TAggQueryStat, TQueryStatKeyHash> queryState;
633+
THashMap<TQueryStatKey, ui64, TQueryStatKeyHash> sessionCountByQuery;
613634
ui64 unreadBytesSum = 0;
614635

615-
for (auto& [key, sessionsInfo] : TopicSessions) {
636+
for (auto& [sessionKey, sessionsInfo] : TopicSessions) {
616637
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
617-
const auto& topic = key.TopicPath;
618638
unreadBytesSum += sessionInfo.Stat.UnreadBytes;
619639
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
620-
auto key = TQueryStatKey{consumer->QueryId, topic};
640+
auto key = TQueryStatKey{consumer->QueryId, sessionKey.ReadGroup};
621641
++sessionCountByQuery[key];
622642
queryState[key].Add(consumer->Stat);
623643
}
@@ -630,11 +650,11 @@ TString TRowDispatcher::GetInternalState() {
630650

631651
str << "Queries:\n";
632652
for (const auto& [queryStatKey, stat]: queryState) {
633-
auto [queryId, topic] = queryStatKey;
653+
auto [queryId, readGroup] = queryStatKey;
634654
const auto& aggStat = AggrStats.LastQueryStats[queryStatKey];
635655
auto sessionsBufferSumSize = sessionCountByQuery[queryStatKey] * MaxSessionBufferSizeBytes;
636656
auto used = sessionsBufferSumSize ? (stat.UnreadBytes.Sum * 100.0 / sessionsBufferSumSize) : 0.0;
637-
str << " " << queryId << " / " << topic << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.UnreadBytes.Sum) << ") unread max (one partition) " << toHuman(stat.UnreadBytes.Max) << " data rate";
657+
str << " " << queryId << " / " << readGroup << ": buffer used (all partitions) " << LeftPad(Prec(used, 4), 10) << "% (" << toHuman(stat.UnreadBytes.Sum) << ") unread max (one partition) " << toHuman(stat.UnreadBytes.Max) << " data rate";
638658
if (aggStat) {
639659
printDataRate(aggStat->ReadBytes);
640660
}
@@ -643,7 +663,7 @@ TString TRowDispatcher::GetInternalState() {
643663
}
644664
str << "TopicSessions:\n";
645665
for (auto& [key, sessionsInfo] : TopicSessions) {
646-
str << " " << key.TopicPath << " / " << key.PartitionId;
666+
str << " " << key.TopicPath << " / " << key.PartitionId << " / " << key.ReadGroup;
647667
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
648668
str << " / " << LeftPad(actorId, 32)
649669
<< " data rate " << toHumanDR(sessionInfo.AggrReadBytes.Sum) << " unread bytes " << toHuman(sessionInfo.Stat.UnreadBytes)
@@ -707,10 +727,10 @@ void TRowDispatcher::UpdateReadActorsInternalState() {
707727
}
708728

709729
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
710-
LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
730+
LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", read group " << ev->Get()->Record.GetSource().GetReadGroup() << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
711731
" part id " << ev->Get()->Record.GetPartitionId() << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie);
712-
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", ev->Get()->Record.GetQueryId());
713-
auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(ev->Get()->Record.GetSource().GetTopicPath()));
732+
auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
733+
auto topicGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(ev->Get()->Record.GetSource().GetReadGroup()));
714734
topicGroup->GetCounter("StartSession", true)->Inc();
715735

716736
NodesTracker.AddNode(ev->Sender.NodeId());
@@ -733,7 +753,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
733753
}
734754
const auto& source = ev->Get()->Record.GetSource();
735755
TActorId sessionActorId;
736-
TopicSessionKey topicKey{source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), ev->Get()->Record.GetPartitionId()};
756+
TopicSessionKey topicKey{source.GetReadGroup(), source.GetEndpoint(), source.GetDatabase(), source.GetTopicPath(), ev->Get()->Record.GetPartitionId()};
737757
TopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
738758
Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);
739759

@@ -745,8 +765,10 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
745765
}
746766

747767
if (topicSessionInfo.Sessions.empty()) {
748-
LOG_ROW_DISPATCHER_DEBUG("Create new session, offset " << readOffset);
768+
LOG_ROW_DISPATCHER_DEBUG("Create new session: read group " << source.GetReadGroup() << " topic " << source.GetTopicPath()
769+
<< " part id " << ev->Get()->Record.GetPartitionId() << " offset " << readOffset);
749770
sessionActorId = ActorFactory->RegisterTopicSession(
771+
source.GetReadGroup(),
750772
source.GetTopicPath(),
751773
source.GetEndpoint(),
752774
source.GetDatabase(),
@@ -850,6 +872,7 @@ void TRowDispatcher::DeleteConsumer(const ConsumerSessionKey& key) {
850872
Send(new IEventHandle(consumerIt->second->TopicSessionId, consumer->ReadActorId, event.release(), 0));
851873

852874
TopicSessionKey topicKey{
875+
consumer->SourceParams.GetReadGroup(),
853876
consumer->SourceParams.GetEndpoint(),
854877
consumer->SourceParams.GetDatabase(),
855878
consumer->SourceParams.GetTopicPath(),
@@ -879,7 +902,7 @@ void TRowDispatcher::Handle(const TEvPrivate::TEvTryConnect::TPtr& ev) {
879902
void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr& ev) {
880903
auto it = ConsumersByEventQueueId.find(ev->Get()->EventQueueId);
881904
if (it == ConsumersByEventQueueId.end()) {
882-
LOG_ROW_DISPATCHER_WARN("No consumer with EventQueueId = " << ev->Get()->EventQueueId);
905+
LOG_ROW_DISPATCHER_TRACE("No consumer with EventQueueId = " << ev->Get()->EventQueueId);
883906
return;
884907
}
885908
auto& sessionInfo = it->second;
@@ -984,7 +1007,7 @@ void TRowDispatcher::Handle(const NMon::TEvHttpInfo::TPtr& ev) {
9841007
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
9851008
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
9861009
const auto& key = ev->Get()->Stat.SessionKey;
987-
TopicSessionKey sessionKey{key.Endpoint, key.Database, key.TopicPath, key.PartitionId};
1010+
TopicSessionKey sessionKey{key.ReadGroup, key.Endpoint, key.Database, key.TopicPath, key.PartitionId};
9881011

9891012
auto sessionsIt = TopicSessions.find(sessionKey);
9901013
if (sessionsIt == TopicSessions.end()) {

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
190190

191191
bool InflightReconnect = false;
192192
TDuration ReconnectPeriod;
193+
const TString ReadGroup;
193194
const TString TopicPath;
194195
const TString TopicPathPartition;
195196
const TString Endpoint;
@@ -225,6 +226,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
225226

226227
public:
227228
explicit TTopicSession(
229+
const TString& readGroup,
228230
const TString& topicPath,
229231
const TString& endpoint,
230232
const TString& database,
@@ -322,6 +324,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
322324
};
323325

324326
TTopicSession::TTopicSession(
327+
const TString& readGroup,
325328
const TString& topicPath,
326329
const TString& endpoint,
327330
const TString& database,
@@ -334,7 +337,8 @@ TTopicSession::TTopicSession(
334337
const ::NMonitoring::TDynamicCounterPtr& counters,
335338
const NYql::IPqGateway::TPtr& pqGateway,
336339
ui64 maxBufferSize)
337-
: TopicPath(topicPath)
340+
: ReadGroup(readGroup)
341+
, TopicPath(topicPath)
338342
, TopicPathPartition(TStringBuilder() << topicPath << "/" << partitionId)
339343
, Endpoint(endpoint)
340344
, Database(database)
@@ -833,12 +837,16 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
833837
auto types = GetVector(ev->Get()->Record.GetSource().GetColumnTypes());
834838

835839
try {
836-
auto queryGroup = Counters->GetSubgroup("queryId", ev->Get()->Record.GetQueryId());
837-
auto topicGroup = queryGroup->GetSubgroup("topic", CleanupCounterValueString(TopicPath));
840+
if (Parser) {
841+
// Parse the remaining data before adding a new client
842+
DoParsing(true);
843+
}
844+
auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
845+
auto readGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(ReadGroup));
838846
auto& clientInfo = Clients.emplace(
839847
std::piecewise_construct,
840848
std::forward_as_tuple(ev->Sender),
841-
std::forward_as_tuple(ev, topicGroup)).first->second;
849+
std::forward_as_tuple(ev, readGroup)).first->second;
842850
UpdateFieldsIds(clientInfo);
843851

844852
const auto& source = clientInfo.Settings.GetSource();
@@ -1044,7 +1052,7 @@ void TTopicSession::SendStatisticToRowDispatcher() {
10441052
commonStatistic.LastReadedOffset = LastMessageOffset;
10451053
SessionStats.Clear();
10461054

1047-
sessionStatistic.SessionKey = TopicSessionParams{Endpoint, Database, TopicPath, PartitionId};
1055+
sessionStatistic.SessionKey = TopicSessionParams{ReadGroup, Endpoint, Database, TopicPath, PartitionId};
10481056
sessionStatistic.Clients.reserve(Clients.size());
10491057
for (auto& [readActorId, info] : Clients) {
10501058
TopicSessionClientStatistic clientStatistic;
@@ -1155,6 +1163,7 @@ TString TTopicSession::GetAnyQueryIdByFieldName(const TString& fieldName) {
11551163
////////////////////////////////////////////////////////////////////////////////
11561164

11571165
std::unique_ptr<NActors::IActor> NewTopicSession(
1166+
const TString& readGroup,
11581167
const TString& topicPath,
11591168
const TString& endpoint,
11601169
const TString& database,
@@ -1167,7 +1176,7 @@ std::unique_ptr<NActors::IActor> NewTopicSession(
11671176
const ::NMonitoring::TDynamicCounterPtr& counters,
11681177
const NYql::IPqGateway::TPtr& pqGateway,
11691178
ui64 maxBufferSize) {
1170-
return std::unique_ptr<NActors::IActor>(new TTopicSession(topicPath, endpoint, database, config, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway, maxBufferSize));
1179+
return std::unique_ptr<NActors::IActor>(new TTopicSession(readGroup, topicPath, endpoint, database, config, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, pqGateway, maxBufferSize));
11711180
}
11721181

11731182
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/topic_session.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
namespace NFq {
1717

1818
std::unique_ptr<NActors::IActor> NewTopicSession(
19+
const TString& readGroup,
1920
const TString& topicPath,
2021
const TString& endpoint,
2122
const TString& database,

0 commit comments

Comments
 (0)