Skip to content

Commit 26419cc

Browse files
authored
FQ: YQ-3988 / to stable (ydb-platform#13569)
1 parent 1682684 commit 26419cc

File tree

6 files changed

+113
-31
lines changed

6 files changed

+113
-31
lines changed

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ struct TEvRowDispatcher {
4747
EvCoordinatorResult,
4848
EvSessionStatistic,
4949
EvHeartbeat,
50+
EvNoSession,
5051
EvGetInternalStateRequest,
5152
EvGetInternalStateResponse,
5253
EvPurecalcCompileRequest,
@@ -84,6 +85,8 @@ struct TEvRowDispatcher {
8485
TEvCoordinatorResult() = default;
8586
};
8687

88+
// Session events (with seqNo checks)
89+
8790
struct TEvStartSession : public NActors::TEventPB<TEvStartSession,
8891
NFq::NRowDispatcherProto::TEvStartSession, EEv::EvStartSession> {
8992

@@ -155,13 +158,22 @@ struct TEvRowDispatcher {
155158
TTopicSessionStatistic Stat;
156159
};
157160

161+
// Network events (without seqNo checks)
162+
158163
struct TEvHeartbeat : public NActors::TEventPB<TEvHeartbeat, NFq::NRowDispatcherProto::TEvHeartbeat, EEv::EvHeartbeat> {
159164
TEvHeartbeat() = default;
160165
TEvHeartbeat(ui32 partitionId) {
161166
Record.SetPartitionId(partitionId);
162167
}
163168
};
164169

170+
struct TEvNoSession : public NActors::TEventPB<TEvNoSession, NFq::NRowDispatcherProto::TEvNoSession, EEv::EvNoSession> {
171+
TEvNoSession() = default;
172+
TEvNoSession(ui32 partitionId) {
173+
Record.SetPartitionId(partitionId);
174+
}
175+
};
176+
165177
struct TEvGetInternalStateRequest : public NActors::TEventPB<TEvGetInternalStateRequest,
166178
NFq::NRowDispatcherProto::TEvGetInternalStateRequest, EEv::EvGetInternalStateRequest> {
167179
TEvGetInternalStateRequest() = default;

ydb/core/fq/libs/row_dispatcher/protos/events.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ message TEvSessionError {
8585

8686
message TEvHeartbeat {
8787
uint32 PartitionId = 1;
88-
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
88+
}
89+
90+
message TEvNoSession {
91+
uint32 PartitionId = 1;
8992
}
9093

9194
message TEvGetInternalStateRequest {

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,8 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
394394
void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::TPtr& ev);
395395

396396
void Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev);
397+
void Handle(NFq::TEvRowDispatcher::TEvNoSession::TPtr& ev);
398+
397399
void Handle(const TEvPrivate::TEvTryConnect::TPtr&);
398400
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbeat::TPtr&);
399401
void Handle(NFq::TEvPrivate::TEvUpdateMetrics::TPtr&);
@@ -423,6 +425,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
423425
hFunc(NFq::TEvRowDispatcher::TEvMessageBatch, Handle);
424426
hFunc(NFq::TEvRowDispatcher::TEvStartSession, Handle);
425427
hFunc(NFq::TEvRowDispatcher::TEvStopSession, Handle);
428+
hFunc(NFq::TEvRowDispatcher::TEvNoSession, Handle);
426429
hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
427430
hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
428431
hFunc(NFq::TEvRowDispatcher::TEvSessionStatistic, Handle);
@@ -858,7 +861,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
858861
}
859862
LWPROBE(Heartbeat, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
860863
LOG_ROW_DISPATCHER_TRACE("Received TEvHeartbeat from " << ev->Sender << ", part id " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
861-
CheckSession(it->second, ev);
864+
if (ev->Cookie != it->second->Generation) {
865+
LOG_ROW_DISPATCHER_WARN("Wrong message generation (TEvHeartbeat), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << it->second->Generation << ", query id " << it->second->QueryId);
866+
}
867+
}
868+
869+
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNoSession::TPtr& ev) {
870+
LOG_ROW_DISPATCHER_DEBUG("Received TEvNoSession from " << ev->Sender << ", cookie " << ev->Cookie);
871+
ConsumerSessionKey key{ev->Sender, ev->Get()->Record.GetPartitionId()};
872+
DeleteConsumer(key);
862873
}
863874

864875
template <class TEventPtr>
@@ -869,7 +880,7 @@ bool TRowDispatcher::CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, cons
869880
}
870881
if (!consumer->EventsQueue.OnEventReceived(ev)) {
871882
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
872-
LOG_ROW_DISPATCHER_WARN("Wrong seq num ignore message (" << typeid(TEventPtr).name() << ") seqNo " << meta.GetSeqNo() << " from " << ev->Sender.ToString() << ", query id " << consumer->QueryId);
883+
LOG_ROW_DISPATCHER_WARN("Wrong seq num, ignore message (" << typeid(TEventPtr).name() << ") seqNo " << meta.GetSeqNo() << " from " << ev->Sender.ToString() << ", query id " << consumer->QueryId);
873884
return false;
874885
}
875886
return true;
@@ -885,6 +896,11 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
885896
LWPROBE(StopSession, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
886897
LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
887898
" partitionId " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
899+
const auto& consumer = it->second;
900+
if (ev->Cookie != consumer->Generation) {
901+
LOG_ROW_DISPATCHER_WARN("Wrong message generation, ignore TEvStopSession, sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation << ", query id " << consumer->QueryId);
902+
return;
903+
}
888904
if (!CheckSession(it->second, ev)) {
889905
return;
890906
}
@@ -946,7 +962,8 @@ void TRowDispatcher::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartbea
946962
bool needSend = sessionInfo->EventsQueue.Heartbeat();
947963
if (needSend) {
948964
LOG_ROW_DISPATCHER_TRACE("Send TEvHeartbeat to " << sessionInfo->ReadActorId << " query id " << sessionInfo->QueryId);
949-
sessionInfo->EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo->PartitionId), sessionInfo->Generation);
965+
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvHeartbeat>(sessionInfo->PartitionId);
966+
Send(new IEventHandle(sessionInfo->ReadActorId, SelfId(), event.release(), 0, sessionInfo->Generation));
950967
}
951968
}
952969

ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,23 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory {
5151
};
5252

5353
class TFixture : public NUnitTest::TBaseFixture {
54-
54+
const ui64 NodesCount = 2;
5555
public:
5656
TFixture()
57-
: Runtime(1) {}
57+
: Runtime(NodesCount) {}
5858

5959
void SetUp(NUnitTest::TTestContext&) override {
60+
TIntrusivePtr<TTableNameserverSetup> nameserverTable(new TTableNameserverSetup());
61+
TPortManager pm;
62+
for (ui32 i = 0; i < NodesCount; ++i) {
63+
nameserverTable->StaticNodeTable[Runtime.GetNodeId(i)] = std::pair<TString, ui32>("127.0.0." + std::to_string(i + 1), pm.GetPort(12001 + i));
64+
}
65+
const TActorId nameserviceId = GetNameserviceActorId();
66+
for (ui32 i = 0; i < NodesCount; ++i) {
67+
TActorSetupCmd nameserviceSetup(CreateNameserverTable(nameserverTable), TMailboxType::Simple, 0);
68+
Runtime.AddLocalService(nameserviceId, std::move(nameserviceSetup), i);
69+
}
70+
6071
TAutoPtr<TAppPrepare> app = new TAppPrepare();
6172
Runtime.Initialize(app->Unwrap());
6273
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_TRACE);
@@ -78,6 +89,7 @@ class TFixture : public NUnitTest::TBaseFixture {
7889
EdgeActor = Runtime.AllocateEdgeActor();
7990
ReadActorId1 = Runtime.AllocateEdgeActor();
8091
ReadActorId2 = Runtime.AllocateEdgeActor();
92+
ReadActorId3 = Runtime.AllocateEdgeActor(1);
8193
TestActorFactory = MakeIntrusive<TTestActorFactory>(Runtime);
8294

8395
NYql::TPqGatewayServices pqServices(
@@ -133,13 +145,21 @@ class TFixture : public NUnitTest::TBaseFixture {
133145
Nothing(), // readOffset,
134146
0, // StartingMessageTimestamp;
135147
"QueryId");
148+
event->Record.MutableTransportMeta()->SetSeqNo(1);
136149
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event, 0, generation));
137150
}
138151

139152
void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId) {
140153
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
141154
event->Record.MutableSource()->CopyFrom(source);
142155
event->Record.SetPartitionId(partitionId);
156+
event->Record.MutableTransportMeta()->SetSeqNo(1);
157+
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
158+
}
159+
160+
void MockNoSession(ui64 partitionId, TActorId readActorId) {
161+
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvNoSession>();
162+
event->Record.SetPartitionId(partitionId);
143163
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
144164
}
145165

@@ -167,6 +187,7 @@ class TFixture : public NUnitTest::TBaseFixture {
167187
void MockGetNextBatch(ui64 partitionId, TActorId readActorId, ui64 generation) {
168188
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
169189
event->Record.SetPartitionId(partitionId);
190+
event->Record.MutableTransportMeta()->SetSeqNo(2);
170191
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, generation));
171192
}
172193

@@ -239,6 +260,7 @@ class TFixture : public NUnitTest::TBaseFixture {
239260
NActors::TActorId EdgeActor;
240261
NActors::TActorId ReadActorId1;
241262
NActors::TActorId ReadActorId2;
263+
NActors::TActorId ReadActorId3;
242264
TIntrusivePtr<TTestActorFactory> TestActorFactory;
243265

244266
NYql::NPq::NProto::TDqPqTopicSource Source1 = BuildPqTopicSourceSettings("Endpoint1", "Database1", "topic", "connection_id1");
@@ -421,6 +443,17 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
421443
MockStopSession(Source1Connection2, PartitionId0, ReadActorId2);
422444
ExpectStopSession(session2, PartitionId0);
423445
}
446+
447+
Y_UNIT_TEST_F(ProcessNoSession, TFixture) {
448+
MockAddSession(Source1, PartitionId0, ReadActorId3);
449+
auto topicSessionId = ExpectRegisterTopicSession();
450+
ExpectStartSessionAck(ReadActorId3);
451+
ExpectStartSession(topicSessionId);
452+
ProcessData(ReadActorId3, PartitionId0, topicSessionId);
453+
454+
MockNoSession(PartitionId0, ReadActorId3);
455+
ExpectStopSession(topicSessionId, PartitionId0);
456+
}
424457
}
425458

426459
}

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
284284
void TrySendGetNextBatch(SessionInfo& sessionInfo);
285285
template <class TEventPtr>
286286
bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId);
287-
void SendStopSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie);
287+
void SendNoSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie);
288288
void NotifyCA();
289289
};
290290

@@ -501,7 +501,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e
501501
SRC_LOG_W("Ignore TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
502502
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
503503
YQL_ENSURE(State != EState::STARTED);
504-
SendStopSession(ev->Sender, partitionId, ev->Cookie);
504+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
505505
return;
506506
}
507507
auto& sessionInfo = sessionIt->second;
@@ -522,7 +522,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
522522
SRC_LOG_W("Ignore TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
523523
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
524524
YQL_ENSURE(State != EState::STARTED);
525-
SendStopSession(ev->Sender, partitionId, ev->Cookie);
525+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
526526
return;
527527
}
528528

@@ -547,7 +547,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
547547
SRC_LOG_W("Ignore TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
548548
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
549549
YQL_ENSURE(State != EState::STARTED);
550-
SendStopSession(ev->Sender, partitionId, ev->Cookie);
550+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
551551
return;
552552
}
553553
auto& sessionInfo = sessionIt->second;
@@ -580,7 +580,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
580580
SRC_LOG_W("Ignore TEvNewDataArrived from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
581581
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
582582
YQL_ENSURE(State != EState::STARTED);
583-
SendStopSession(ev->Sender, partitionId, ev->Cookie);
583+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
584584
return;
585585
}
586586

@@ -619,24 +619,25 @@ void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvEvHeartb
619619
bool needSend = sessionInfo.EventsQueue.Heartbeat();
620620
if (needSend) {
621621
SRC_LOG_T("Send TEvEvHeartbeat");
622-
sessionInfo.EventsQueue.Send(new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo.PartitionId), sessionInfo.Generation);
622+
Send(sessionInfo.RowDispatcherActorId, new NFq::TEvRowDispatcher::TEvHeartbeat(sessionInfo.PartitionId), sessionInfo.Generation);
623623
}
624624
}
625625

626626
void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
627-
SRC_LOG_T("Received TEvHeartbeat from " << ev->Sender);
627+
SRC_LOG_T("Received TEvHeartbeat from " << ev->Sender << ", generation " << ev->Cookie);
628628
Counters.Heartbeat++;
629-
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
630629

631630
ui64 partitionId = ev->Get()->Record.GetPartitionId();
632631
auto sessionIt = Sessions.find(partitionId);
633632
if (sessionIt == Sessions.end()) {
634-
SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
635-
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
636-
SendStopSession(ev->Sender, partitionId, ev->Cookie);
633+
SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", PartitionId " << partitionId << ", generation " << ev->Cookie);
634+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
637635
return;
638636
}
639-
CheckSession(sessionIt->second, ev, partitionId);
637+
638+
if (ev->Cookie != sessionIt->second.Generation) {
639+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
640+
}
640641
}
641642

642643
void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr& ev) {
@@ -723,9 +724,12 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::
723724
}
724725

725726
void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
726-
SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString());
727+
SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << " generation " << ev->Cookie);
727728
Counters.Undelivered++;
728729
for (auto& [partitionId, sessionInfo] : Sessions) {
730+
if (ev->Cookie != sessionInfo.Generation) {
731+
continue;
732+
}
729733
if (sessionInfo.EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) {
730734
ReInit(TStringBuilder() << "Session closed, partition id " << sessionInfo.PartitionId);
731735
break;
@@ -747,7 +751,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
747751
SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
748752
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
749753
YQL_ENSURE(State != EState::STARTED);
750-
SendStopSession(ev->Sender, partitionId, ev->Cookie);
754+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
751755
return;
752756
}
753757

@@ -823,7 +827,7 @@ void TDqPqRdReadActor::PrintInternalState() {
823827

824828
TString TDqPqRdReadActor::GetInternalState() {
825829
TStringStream str;
826-
str << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n";
830+
str << LogPrefix << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n";
827831
str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
828832
<< " MessageBatch " << Counters.MessageBatch << " StartSessionAck " << Counters.StartSessionAck << " NewDataArrived " << Counters.NewDataArrived
829833
<< " SessionError " << Counters.SessionError << " Statistics " << Counters.Statistics << " NodeDisconnected " << Counters.NodeDisconnected
@@ -867,7 +871,7 @@ template <class TEventPtr>
867871
bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId) {
868872
if (ev->Cookie != session.Generation) {
869873
SRC_LOG_W("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << session.Generation << ", send TEvStopSession");
870-
SendStopSession(ev->Sender, partitionId, ev->Cookie);
874+
SendNoSession(ev->Sender, partitionId, ev->Cookie);
871875
return false;
872876
}
873877
if (!session.EventsQueue.OnEventReceived(ev)) {
@@ -878,9 +882,9 @@ bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, u
878882
return true;
879883
}
880884

881-
void TDqPqRdReadActor::SendStopSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) {
882-
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
883-
*event->Record.MutableSource() = SourceParams;
885+
void TDqPqRdReadActor::SendNoSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) {
886+
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvNoSession>();
887+
//*event->Record.MutableSource() = SourceParams;
884888
event->Record.SetPartitionId(partitionId);
885889
Send(recipient, event.release(), 0, cookie);
886890
}

0 commit comments

Comments
 (0)