Skip to content

Commit f454f01

Browse files
authored
YQ-4121 Check generation by TEvNoSession / to stable (#14798)
1 parent 03cb887 commit f454f01

File tree

3 files changed

+26
-13
lines changed

3 files changed

+26
-13
lines changed

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& ev) {
899899
}
900900

901901
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNoSession::TPtr& ev) {
902-
LOG_ROW_DISPATCHER_DEBUG("Received TEvNoSession from " << ev->Sender << ", cookie " << ev->Cookie);
902+
LOG_ROW_DISPATCHER_DEBUG("Received TEvNoSession from " << ev->Sender << ", generation " << ev->Cookie);
903+
auto consumerIt = Consumers.find(ev->Sender);
904+
if (consumerIt == Consumers.end()) {
905+
return;
906+
}
907+
const auto& consumer = consumerIt->second;
908+
if (consumer->Generation != ev->Cookie) {
909+
return;
910+
}
903911
DeleteConsumer(ev->Sender);
904912
}
905913

ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ class TFixture : public NUnitTest::TBaseFixture {
7373
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_TRACE);
7474
NConfig::TRowDispatcherConfig config;
7575
config.SetEnabled(true);
76+
config.SetSendStatusPeriodSec(1);
7677
NConfig::TRowDispatcherCoordinatorConfig& coordinatorConfig = *config.MutableCoordinator();
7778
coordinatorConfig.SetCoordinationNodePath("RowDispatcher");
7879
auto& database = *coordinatorConfig.MutableDatabase();
@@ -156,9 +157,9 @@ class TFixture : public NUnitTest::TBaseFixture {
156157
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
157158
}
158159

159-
void MockNoSession(TActorId readActorId) {
160+
void MockNoSession(TActorId readActorId, ui64 generation) {
160161
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvNoSession>();
161-
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
162+
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, generation));
162163
}
163164

164165
void MockNewDataArrived(ui64 partitionId, TActorId topicSessionId, TActorId readActorId) {
@@ -181,10 +182,10 @@ class TFixture : public NUnitTest::TBaseFixture {
181182
Runtime.Send(new IEventHandle(RowDispatcher, topicSessionId, event.release()));
182183
}
183184

184-
void MockGetNextBatch(ui64 partitionId, TActorId readActorId, ui64 generation) {
185+
void MockGetNextBatch(ui64 partitionId, TActorId readActorId, ui64 generation, ui64 seqNo = 2) {
185186
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvGetNextBatch>();
186187
event->Record.SetPartitionId(partitionId);
187-
event->Record.MutableTransportMeta()->SetSeqNo(2);
188+
event->Record.MutableTransportMeta()->SetSeqNo(seqNo);
188189
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, generation));
189190
}
190191

@@ -236,11 +237,11 @@ class TFixture : public NUnitTest::TBaseFixture {
236237
return actorId;
237238
}
238239

239-
void ProcessData(NActors::TActorId readActorId, ui64 partId, NActors::TActorId topicSessionActorId, ui64 generation = 1) {
240+
void ProcessData(NActors::TActorId readActorId, ui64 partId, NActors::TActorId topicSessionActorId, ui64 generation = 1, ui64 seqNo = 1) {
240241
MockNewDataArrived(partId, topicSessionActorId, readActorId);
241242
ExpectNewDataArrived(readActorId, partId);
242243

243-
MockGetNextBatch(partId, readActorId, generation);
244+
MockGetNextBatch(partId, readActorId, generation, seqNo);
244245
ExpectGetNextBatch(topicSessionActorId, partId);
245246

246247
MockMessageBatch(partId, topicSessionActorId, readActorId, generation);
@@ -440,13 +441,17 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
440441
}
441442

442443
Y_UNIT_TEST_F(ProcessNoSession, TFixture) {
443-
MockAddSession(Source1, {PartitionId0}, ReadActorId3);
444+
ui64 generation = 42;
445+
MockAddSession(Source1, {PartitionId0}, ReadActorId3, generation);
444446
auto topicSessionId = ExpectRegisterTopicSession();
445-
ExpectStartSessionAck(ReadActorId3);
447+
ExpectStartSessionAck(ReadActorId3, generation);
446448
ExpectStartSession(topicSessionId);
447-
ProcessData(ReadActorId3, PartitionId0, topicSessionId);
449+
ProcessData(ReadActorId3, PartitionId0, topicSessionId, generation, 2);
450+
451+
MockNoSession(ReadActorId3, generation - 1); // Ignore NoSession with wrong generation.
452+
ProcessData(ReadActorId3, PartitionId0, topicSessionId, generation, 3);
448453

449-
MockNoSession(ReadActorId3);
454+
MockNoSession(ReadActorId3, generation);
450455
ExpectStopSession(topicSessionId);
451456
}
452457

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -944,7 +944,7 @@ TDqPqRdReadActor::TSession* TDqPqRdReadActor::FindAndUpdateSession(const TEventP
944944
auto& session = sessionIt->second;
945945

946946
if (ev->Cookie != session.Generation) {
947-
SRC_LOG_W("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << session.Generation << ", send TEvStopSession");
947+
SRC_LOG_W("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << session.Generation << ", send TEvNoSession");
948948
SendNoSession(ev->Sender, ev->Cookie);
949949
return nullptr;
950950
}
@@ -995,13 +995,13 @@ void TDqPqRdReadActor::UpdateSessions() {
995995
continue;
996996
}
997997

998-
SRC_LOG_I("Create session to " << rowDispatcherActorId);
999998
auto queueId = ++NextEventQueueId;
1000999
Sessions.emplace(
10011000
std::piecewise_construct,
10021001
std::forward_as_tuple(rowDispatcherActorId),
10031002
std::forward_as_tuple(TxId, SelfId(), rowDispatcherActorId, queueId, ++NextGeneration));
10041003
auto& session = Sessions.at(rowDispatcherActorId);
1004+
SRC_LOG_I("Create session to " << rowDispatcherActorId << ", generation " << session.Generation);
10051005
for (auto partitionId : partitions) {
10061006
session.Partitions[partitionId];
10071007
}

0 commit comments

Comments
 (0)