Skip to content

Commit 979a3d1

Browse files
authored
YQ / Sync to stable (#11339)
1 parent 436a844 commit 979a3d1

File tree

13 files changed

+235
-102
lines changed

13 files changed

+235
-102
lines changed

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ struct TEvPrivate {
5050
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
5151
EvCoordinatorPing = EvBegin + 20,
5252
EvUpdateMetrics,
53+
EvPrintStateToLog,
5354
EvEnd
5455
};
5556

5657
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
5758
struct TEvCoordinatorPing : NActors::TEventLocal<TEvCoordinatorPing, EvCoordinatorPing> {};
5859
struct TEvUpdateMetrics : public NActors::TEventLocal<TEvUpdateMetrics, EvUpdateMetrics> {};
60+
struct TEvPrintStateToLog : public NActors::TEventLocal<TEvPrintStateToLog, EvPrintStateToLog> {};
5961
};
6062

6163
struct TQueryStat {
@@ -65,6 +67,7 @@ struct TQueryStat {
6567
};
6668

6769
ui64 UpdateMetricsPeriodSec = 60;
70+
ui64 PrintStateToLogPeriodSec = 300;
6871

6972
class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
7073

@@ -128,6 +131,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
128131
TRowDispatcherMetrics Metrics;
129132
NYql::IPqGateway::TPtr PqGateway;
130133
THashSet<TActorId> InterconnectSessions;
134+
TMap<ui32, bool> NodeConnected;
131135

132136
struct ConsumerCounters {
133137
ui64 NewDataArrived = 0;
@@ -141,16 +145,17 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
141145
NActors::TActorId selfId,
142146
ui64 eventQueueId,
143147
NFq::NRowDispatcherProto::TEvStartSession& proto,
144-
TActorId topicSessionId)
148+
TActorId topicSessionId,
149+
bool alreadyConnected)
145150
: ReadActorId(readActorId)
146151
, SourceParams(proto.GetSource())
147152
, PartitionId(proto.GetPartitionId())
148153
, EventQueueId(eventQueueId)
149154
, Proto(proto)
150155
, TopicSessionId(topicSessionId)
151156
, QueryId(proto.GetQueryId()) {
152-
EventsQueue.Init("txId", selfId, selfId, eventQueueId, /* KeepAlive */ true);
153-
EventsQueue.OnNewRecipientId(readActorId);
157+
EventsQueue.Init("txId", selfId, selfId, eventQueueId, /* KeepAlive */ true, /* UseConnect */ false);
158+
EventsQueue.OnNewRecipientId(readActorId, true, alreadyConnected);
154159
}
155160

156161
NActors::TActorId ReadActorId;
@@ -215,10 +220,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
215220
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvPing::TPtr&);
216221
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr&);
217222
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
218-
223+
void Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&);
224+
219225
void DeleteConsumer(const ConsumerSessionKey& key);
220226
void UpdateInterconnectSessions(const NActors::TActorId& interconnectSession);
221227
void UpdateMetrics();
228+
void PrintInternalState();
222229

223230
STRICT_STFUNC(
224231
StateFunc, {
@@ -242,6 +249,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
242249
hFunc(NActors::TEvents::TEvPing, Handle);
243250
hFunc(NFq::TEvRowDispatcher::TEvNewDataArrived, Handle);
244251
hFunc(NFq::TEvPrivate::TEvUpdateMetrics, Handle);
252+
hFunc(NFq::TEvPrivate::TEvPrintStateToLog, Handle);
245253
})
246254
};
247255

@@ -275,6 +283,7 @@ void TRowDispatcher::Bootstrap() {
275283
Register(NewLeaderElection(SelfId(), coordinatorId, config, CredentialsProviderFactory, YqSharedResources, Tenant, Counters).release());
276284
Schedule(TDuration::Seconds(CoordinatorPingPeriodSec), new TEvPrivate::TEvCoordinatorPing());
277285
Schedule(TDuration::Seconds(UpdateMetricsPeriodSec), new NFq::TEvPrivate::TEvUpdateMetrics());
286+
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
278287
}
279288

280289
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -292,13 +301,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr&
292301

293302
void TRowDispatcher::HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
294303
LOG_ROW_DISPATCHER_DEBUG("EvNodeConnected, node id " << ev->Get()->NodeId);
304+
NodeConnected[ev->Get()->NodeId] = true;
295305
for (auto& [actorId, consumer] : Consumers) {
296306
consumer->EventsQueue.HandleNodeConnected(ev->Get()->NodeId);
297307
}
298308
}
299309

300310
void TRowDispatcher::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
301311
LOG_ROW_DISPATCHER_DEBUG("TEvNodeDisconnected, node id " << ev->Get()->NodeId);
312+
NodeConnected[ev->Get()->NodeId] = false;
302313
for (auto& [actorId, consumer] : Consumers) {
303314
consumer->EventsQueue.HandleNodeDisconnected(ev->Get()->NodeId);
304315
}
@@ -339,18 +350,34 @@ void TRowDispatcher::UpdateMetrics() {
339350
return;
340351
}
341352
TMap<TString, TQueryStat> queryStats;
342-
TStringStream str;
353+
354+
for (auto& [key, sessionsInfo] : TopicSessions) {
355+
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
356+
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
357+
auto& stat = queryStats[consumer->QueryId];
358+
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
359+
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
360+
}
361+
}
362+
}
363+
for (const auto& [queryId, stat] : queryStats) {
364+
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
365+
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
366+
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
367+
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
368+
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
369+
}
370+
}
343371

372+
void TRowDispatcher::PrintInternalState() {
373+
TStringStream str;
344374
str << "Statistics:\n";
345375
for (auto& [key, sessionsInfo] : TopicSessions) {
346376
str << " " << key.Endpoint << " / " << key.Database << " / " << key.TopicPath << " / " << key.PartitionId;
347377
for (auto& [actorId, sessionInfo] : sessionsInfo.Sessions) {
348378
str << " / " << actorId << "\n";
349379
str << " unread bytes " << sessionInfo.Stat.UnreadBytes << "\n";
350380
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
351-
auto& stat = queryStats[consumer->QueryId];
352-
stat.UnreadRows.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadRows));
353-
stat.UnreadBytes.Add(NYql::TCounters::TEntry(consumer->Stat.UnreadBytes));
354381
str << " " << consumer->QueryId << " " << readActorId << " unread rows "
355382
<< consumer->Stat.UnreadRows << " unread bytes " << consumer->Stat.UnreadBytes << " offset " << consumer->Stat.Offset
356383
<< " get " << consumer->Counters.GetNextBatch
@@ -361,15 +388,6 @@ void TRowDispatcher::UpdateMetrics() {
361388
}
362389
}
363390
LOG_ROW_DISPATCHER_DEBUG(str.Str());
364-
365-
for (const auto& [queryId, stat] : queryStats) {
366-
LOG_ROW_DISPATCHER_DEBUG("UnreadBytes " << queryId << " " << stat.UnreadBytes.Max);
367-
auto queryGroup = Metrics.Counters->GetSubgroup("queryId", queryId);
368-
queryGroup->GetCounter("MaxUnreadRows")->Set(stat.UnreadRows.Max);
369-
queryGroup->GetCounter("AvgUnreadRows")->Set(stat.UnreadRows.Avg);
370-
queryGroup->GetCounter("MaxUnreadBytes")->Set(stat.UnreadBytes.Max);
371-
queryGroup->GetCounter("AvgUnreadBytes")->Set(stat.UnreadBytes.Avg);
372-
}
373391
}
374392

375393
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
@@ -384,7 +402,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
384402
ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
385403
auto it = Consumers.find(key);
386404
if (it != Consumers.end()) {
387-
LOG_ROW_DISPATCHER_ERROR("Сonsumer already exists, ignore StartSession");
405+
LOG_ROW_DISPATCHER_ERROR("Consumer already exists, ignore StartSession");
388406
return;
389407
}
390408
const auto& source = ev->Get()->Record.GetSource();
@@ -395,7 +413,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
395413
LOG_ROW_DISPATCHER_DEBUG("Topic session count " << topicSessionInfo.Sessions.size());
396414
Y_ENSURE(topicSessionInfo.Sessions.size() <= 1);
397415

398-
auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId());
416+
auto consumerInfo = MakeAtomicShared<ConsumerInfo>(ev->Sender, SelfId(), NextEventQueueId++, ev->Get()->Record, TActorId(), NodeConnected[ev->Sender.NodeId()]);
399417
Consumers[key] = consumerInfo;
400418
ConsumersByEventQueueId[consumerInfo->EventQueueId] = consumerInfo;
401419
if (!consumerInfo->EventsQueue.OnEventReceived(ev)) {
@@ -609,6 +627,11 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&) {
609627
UpdateMetrics();
610628
}
611629

630+
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvPrintStateToLog::TPtr&) {
631+
PrintInternalState();
632+
Schedule(TDuration::Seconds(PrintStateToLogPeriodSec), new NFq::TEvPrivate::TEvPrintStateToLog());
633+
}
634+
612635
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionStatistic::TPtr& ev) {
613636
LOG_ROW_DISPATCHER_TRACE("TEvSessionStatistic from " << ev->Sender);
614637
const auto& key = ev->Get()->Stat.SessionKey;

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
161161
TParserSchema ParserSchema;
162162
THashMap<TString, ui64> FieldsIndexes;
163163
NYql::IPqGateway::TPtr PqGateway;
164+
TMaybe<TString> ConsumerName;
164165

165166
public:
166167
explicit TTopicSession(
@@ -192,7 +193,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
192193
void DoFiltering(const TVector<ui64>& offsets, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
193194
void SendData(TClientsInfo& info);
194195
void UpdateParser();
195-
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter = nullptr);
196+
void FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription);
196197
void SendDataArrived(TClientsInfo& client);
197198
void StopReadSession();
198199
TString GetSessionId() const;
@@ -218,6 +219,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
218219
TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*> RebuildJson(const TClientsInfo& info, const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& parsedValues);
219220
void UpdateParserSchema(const TParserInputType& inputType);
220221
void UpdateFieldsIds(TClientsInfo& clientInfo);
222+
bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev);
221223

222224
private:
223225

@@ -494,7 +496,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClose
494496
LOG_ROW_DISPATCHER_DEBUG(message);
495497
NYql::TIssues issues;
496498
issues.AddIssue(message);
497-
Self.FatalError(issues.ToOneLineString());
499+
Self.FatalError(issues.ToOneLineString(), nullptr, false);
498500
}
499501

500502
void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent& event) {
@@ -580,7 +582,7 @@ void TTopicSession::DoParsing(bool force) {
580582
const auto& parsedValues = Parser->Parse();
581583
DoFiltering(Parser->GetOffsets(), parsedValues);
582584
} catch (const std::exception& e) {
583-
FatalError(e.what());
585+
FatalError(e.what(), nullptr, true);
584586
}
585587
}
586588

@@ -594,7 +596,7 @@ void TTopicSession::DoFiltering(const TVector<ui64>& offsets, const TVector<NKik
594596
info.Filter->Push(offsets, RebuildJson(info, parsedValues));
595597
}
596598
} catch (const std::exception& e) {
597-
FatalError(e.what(), &info.Filter);
599+
FatalError(e.what(), &info.Filter, false);
598600
}
599601
}
600602

@@ -662,15 +664,13 @@ bool HasJsonColumns(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {
662664
}
663665

664666
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
665-
auto it = Clients.find(ev->Sender);
666-
if (it != Clients.end()) {
667-
FatalError("Internal error: sender " + ev->Sender.ToString());
668-
return;
669-
}
670-
671667
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: "
672668
<< ev->Get()->Record.GetSource().GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset());
673669

670+
if (!CheckNewClient(ev)) {
671+
return;
672+
}
673+
674674
auto columns = GetVector(ev->Get()->Record.GetSource().GetColumns());
675675
auto types = GetVector(ev->Get()->Record.GetSource().GetColumnTypes());
676676

@@ -712,12 +712,13 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
712712
}
713713
}
714714
} catch (const NYql::NPureCalc::TCompileError& e) {
715-
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues());
715+
FatalError("Adding new client failed: CompileError: sql: " + e.GetYql() + ", error: " + e.GetIssues(), nullptr, true);
716716
} catch (const yexception &ex) {
717-
FatalError(TString{"Adding new client failed: "} + ex.what());
717+
FatalError(TString{"Adding new client failed: "} + ex.what(), nullptr, true);
718718
} catch (...) {
719-
FatalError("Adding new client failed, " + CurrentExceptionMessage());
719+
FatalError("Adding new client failed, " + CurrentExceptionMessage(), nullptr, true);
720720
}
721+
ConsumerName = ev->Get()->Record.GetSource().GetConsumerName();
721722
UpdateParser();
722723
SendStatistic();
723724
if (!ReadSession) {
@@ -810,14 +811,14 @@ void TTopicSession::UpdateParser() {
810811
const auto& parserConfig = Config.GetJsonParser();
811812
Parser = NewJsonParser(names, types, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()));
812813
} catch (const NYql::NPureCalc::TCompileError& e) {
813-
FatalError(e.GetIssues());
814+
FatalError(e.GetIssues(), nullptr, true);
814815
}
815816
}
816817

817-
void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter) {
818+
void TTopicSession::FatalError(const TString& message, const std::unique_ptr<TJsonFilter>* filter, bool addParserDescription) {
818819
TStringStream str;
819820
str << message;
820-
if (Parser) {
821+
if (Parser && addParserDescription) {
821822
str << ", parser description:\n" << Parser->GetDescription();
822823
}
823824
if (filter) {
@@ -868,7 +869,7 @@ void TTopicSession::HandleException(const std::exception& e) {
868869
if (CurrentStateFunc() == &TThis::ErrorState) {
869870
return;
870871
}
871-
FatalError(TString("Internal error: exception: ") + e.what());
872+
FatalError(TString("Internal error: exception: ") + e.what(), nullptr, false);
872873
}
873874

874875
void TTopicSession::SendStatistic() {
@@ -894,6 +895,23 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) {
894895
SendStatistic();
895896
}
896897

898+
bool TTopicSession::CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
899+
auto it = Clients.find(ev->Sender);
900+
if (it != Clients.end()) {
901+
LOG_ROW_DISPATCHER_ERROR("Such a client already exists");
902+
SendSessionError(ev->Sender, "Internal error: such a client already exists");
903+
return false;
904+
}
905+
if (!Config.GetWithoutConsumer()
906+
&& ConsumerName
907+
&& ConsumerName != ev->Get()->Record.GetSource().GetConsumerName()) {
908+
LOG_ROW_DISPATCHER_INFO("Different consumer, expected " << ConsumerName << ", actual " << ev->Get()->Record.GetSource().GetConsumerName() << ", send error");
909+
SendSessionError(ev->Sender, TStringBuilder() << "Use the same consumer in all queries via RD (current consumer " << ConsumerName << ")");
910+
return false;
911+
}
912+
return true;
913+
}
914+
897915
} // namespace
898916

899917
////////////////////////////////////////////////////////////////////////////////

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,15 @@ class TFixture : public NUnitTest::TBaseFixture {
3939

4040
ReadActorId1 = Runtime.AllocateEdgeActor();
4141
ReadActorId2 = Runtime.AllocateEdgeActor();
42+
ReadActorId3 = Runtime.AllocateEdgeActor();
4243
RowDispatcherActorId = Runtime.AllocateEdgeActor();
4344
}
4445

4546
void Init(const TString& topicPath, ui64 maxSessionUsedMemory = std::numeric_limits<ui64>::max()) {
4647
Config.SetTimeoutBeforeStartSessionSec(TimeoutBeforeStartSessionSec);
4748
Config.SetMaxSessionUsedMemory(maxSessionUsedMemory);
4849
Config.SetSendStatusPeriodSec(2);
49-
Config.SetWithoutConsumer(true);
50+
Config.SetWithoutConsumer(false);
5051

5152
auto credFactory = NKikimr::CreateYdbCredentialsProviderFactory;
5253
auto yqSharedResources = NFq::TYqSharedResources::Cast(NFq::CreateYqSharedResourcesImpl({}, credFactory, MakeIntrusive<NMonitoring::TDynamicCounters>()));
@@ -91,11 +92,11 @@ class TFixture : public NUnitTest::TBaseFixture {
9192
Runtime.Send(new IEventHandle(TopicSession, readActorId, event));
9293
}
9394

94-
NYql::NPq::NProto::TDqPqTopicSource BuildSource(TString topic, bool emptyPredicate = false) {
95+
NYql::NPq::NProto::TDqPqTopicSource BuildSource(TString topic, bool emptyPredicate = false, const TString& consumer = DefaultPqConsumer) {
9596
NYql::NPq::NProto::TDqPqTopicSource settings;
9697
settings.SetEndpoint(GetDefaultPqEndpoint());
9798
settings.SetTopicPath(topic);
98-
settings.SetConsumerName("PqConsumer");
99+
settings.SetConsumerName(consumer);
99100
settings.MutableToken()->SetName("token");
100101
settings.SetDatabase(GetDefaultPqDatabase());
101102
settings.AddColumns("dt");
@@ -162,6 +163,7 @@ class TFixture : public NUnitTest::TBaseFixture {
162163
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
163164
NActors::TActorId ReadActorId1;
164165
NActors::TActorId ReadActorId2;
166+
NActors::TActorId ReadActorId3;
165167
ui64 PartitionId = 0;
166168
NConfig::TRowDispatcherConfig Config;
167169

@@ -186,6 +188,10 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
186188
ExpectMessageBatch(ReadActorId1, { Json1 });
187189
ExpectMessageBatch(ReadActorId2, { Json1 });
188190

191+
auto source2 = BuildSource(topicName, false, "OtherConsumer");
192+
StartSession(ReadActorId3, source2);
193+
ExpectSessionError(ReadActorId3, "Use the same consumer");
194+
189195
StopSession(ReadActorId1, source);
190196
StopSession(ReadActorId2, source);
191197
}

0 commit comments

Comments
 (0)