Skip to content

Commit 36884b0

Browse files
authored
YQ-4085 FQ: Add partition id check in row dispatcher #14178 / to stable (#14191)
1 parent ae3ca4f commit 36884b0

File tree

4 files changed

+44
-13
lines changed

4 files changed

+44
-13
lines changed

ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,9 @@ TString TRowDispatcher::GetInternalState() {
710710
}
711711

712712
for (auto& [readActorId, consumer] : sessionInfo.Consumers) {
713+
if (!consumer->Partitions.contains(key.PartitionId)) {
714+
continue;
715+
}
713716
const auto& partition = consumer->Partitions[key.PartitionId];
714717
str << " " << consumer->QueryId << " " << LeftPad(readActorId, 32) << " unread bytes "
715718
<< toHuman(consumer->Stat.QueuedBytes) << " (" << leftPad(consumer->Stat.QueuedRows) << " rows) "
@@ -913,7 +916,7 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
913916
}
914917

915918
LWPROBE(StopSession, ev->Sender.ToString(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
916-
LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId);
919+
LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession from " << ev->Sender << " topic " << ev->Get()->Record.GetSource().GetTopicPath() << " query id " << it->second->QueryId);
917920
if (!CheckSession(it->second, ev)) {
918921
return;
919922
}
@@ -942,8 +945,11 @@ void TRowDispatcher::DeleteConsumer(NActors::TActorId readActorId) {
942945
partitionId};
943946
TTopicSessionInfo& topicSessionInfo = TopicSessions[topicKey];
944947
TSessionInfo& sessionInfo = topicSessionInfo.Sessions[partition.TopicSessionId];
945-
Y_ENSURE(sessionInfo.Consumers.count(consumer->ReadActorId));
946-
sessionInfo.Consumers.erase(consumer->ReadActorId);
948+
if (!sessionInfo.Consumers.contains(consumer->ReadActorId)) {
949+
LOG_ROW_DISPATCHER_ERROR("Wrong readActorId " << consumer->ReadActorId << ", no such consumer");
950+
} else {
951+
sessionInfo.Consumers.erase(consumer->ReadActorId);
952+
}
947953
if (sessionInfo.Consumers.empty()) {
948954
LOG_ROW_DISPATCHER_DEBUG("Session is not used, sent TEvPoisonPill to " << partition.TopicSessionId);
949955
topicSessionInfo.Sessions.erase(partition.TopicSessionId);
@@ -989,10 +995,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev)
989995
}
990996
LWPROBE(NewDataArrived, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
991997
LOG_ROW_DISPATCHER_TRACE("Forward TEvNewDataArrived from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
992-
auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()];
993-
partition.PendingNewDataArrived = true;
994-
it->second->Counters.NewDataArrived++;
995-
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
998+
auto consumerInfoPtr = it->second;
999+
auto partitionIt = consumerInfoPtr->Partitions.find(ev->Get()->Record.GetPartitionId());
1000+
if (partitionIt == consumerInfoPtr->Partitions.end()) {
1001+
// Ignore TEvNewDataArrived because read actor now read others partitions.
1002+
return;
1003+
}
1004+
partitionIt->second.PendingNewDataArrived = true;
1005+
consumerInfoPtr->Counters.NewDataArrived++;
1006+
consumerInfoPtr->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
9961007
}
9971008

9981009
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
@@ -1004,10 +1015,15 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
10041015
LWPROBE(MessageBatch, ev->Sender.ToString(), ev->Get()->ReadActorId.ToString(), it->second->QueryId, it->second->Generation, ev->Get()->Record.ByteSizeLong());
10051016
LOG_ROW_DISPATCHER_TRACE("Forward TEvMessageBatch from " << ev->Sender << " to " << ev->Get()->ReadActorId << " query id " << it->second->QueryId);
10061017
Metrics.RowsSent->Add(ev->Get()->Record.MessagesSize());
1007-
auto& partition = it->second->Partitions[ev->Get()->Record.GetPartitionId()];
1008-
partition.PendingGetNextBatch = false;
1009-
it->second->Counters.MessageBatch++;
1010-
it->second->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
1018+
auto consumerInfoPtr = it->second;
1019+
auto partitionIt = consumerInfoPtr->Partitions.find(ev->Get()->Record.GetPartitionId());
1020+
if (partitionIt == consumerInfoPtr->Partitions.end()) {
1021+
// Ignore TEvMessageBatch because read actor now read others partitions.
1022+
return;
1023+
}
1024+
partitionIt->second.PendingGetNextBatch = false;
1025+
consumerInfoPtr->Counters.MessageBatch++;
1026+
consumerInfoPtr->EventsQueue.Send(ev->Release().Release(), it->second->Generation);
10111027
}
10121028

10131029
void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) {

ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,18 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
449449
MockNoSession(ReadActorId3);
450450
ExpectStopSession(topicSessionId);
451451
}
452+
453+
Y_UNIT_TEST_F(IgnoreWrongPartitionId, TFixture) {
454+
MockAddSession(Source1, {PartitionId0}, ReadActorId1);
455+
auto topicSessionId = ExpectRegisterTopicSession();
456+
ExpectStartSessionAck(ReadActorId1);
457+
ExpectStartSession(topicSessionId);
458+
459+
MockNewDataArrived(PartitionId1, topicSessionId, ReadActorId1);
460+
461+
MockStopSession(Source1, ReadActorId1);
462+
ExpectStopSession(topicSessionId);
463+
}
452464
}
453465

454466
}

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
636636
}
637637
auto partitionIt = session->Partitions.find(ev->Get()->Record.GetPartitionId());
638638
if (partitionIt == session->Partitions.end()) {
639-
SRC_LOG_E("TEvNewDataArrived: wrong partition id " << ev->Get()->Record.GetPartitionId());
639+
SRC_LOG_E("Received TEvNewDataArrived from " << ev->Sender << " with wrong partition id " << ev->Get()->Record.GetPartitionId());
640640
Stop(NDqProto::StatusIds::INTERNAL_ERROR, {TIssue(TStringBuilder() << LogPrefix << "No partition with id " << ev->Get()->Record.GetPartitionId())});
641641
return;
642642
}

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,12 @@ void TDqPqReadActorBase::LoadState(const TSourceState& state) {
7373
minStartingMessageTs = Min(minStartingMessageTs, TInstant::MilliSeconds(stateProto.GetStartingMessageTimestampMs()));
7474
ingressBytes += stateProto.GetIngressBytes();
7575
}
76+
TStringStream str;
77+
str << "SessionId: " << GetSessionId() << " Restoring offset: ";
7678
for (const auto& [key, value] : PartitionToOffset) {
77-
SRC_LOG_D("SessionId: " << GetSessionId() << " Restoring offset: cluster " << key.first << ", partition id " << key.second << ", offset: " << value);
79+
str << "{" << key.first << "," << key.second << "," << value << "},";
7880
}
81+
SRC_LOG_D(str.Str());
7982
StartingMessageTimestamp = minStartingMessageTs;
8083
IngressStats.Bytes += ingressBytes;
8184
IngressStats.Chunks++;

0 commit comments

Comments
 (0)