Skip to content

Commit 1172da4

Browse files
authored
YQ-3893 Fix next message offset in topic session (#12284)
1 parent d8dfdbf commit 1172da4

File tree

2 files changed

+23
-10
lines changed

2 files changed

+23
-10
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
148148
TQueue<std::pair<ui64, TString>> Buffer;
149149
ui64 UnreadBytes = 0;
150150
bool DataArrivedSent = false;
151-
TMaybe<ui64> NextMessageOffset;
152-
ui64 LastSendedNextMessageOffset = 0;
151+
TMaybe<ui64> NextMessageOffset; // offset to restart topic session
152+
TMaybe<ui64> ProcessedNextMessageOffset; // offset of fully processed data (to save to checkpoint)
153153
TVector<ui64> FieldsIds;
154154
TDuration ReconnectPeriod;
155155
TStats Stat; // Send (filtered) to read_actor
@@ -509,16 +509,15 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvSendStatisticToReadActor::TPtr&)
509509

510510
auto readBytes = ClientsStats.Bytes;
511511
for (auto& [actorId, info] : Clients) {
512-
if (!info.NextMessageOffset) {
512+
if (!info.ProcessedNextMessageOffset) {
513513
continue;
514514
}
515515
auto event = std::make_unique<TEvRowDispatcher::TEvStatistics>();
516516
event->Record.SetPartitionId(PartitionId);
517-
event->Record.SetNextMessageOffset(*info.NextMessageOffset);
517+
event->Record.SetNextMessageOffset(*info.ProcessedNextMessageOffset);
518518
event->Record.SetReadBytes(readBytes);
519-
info.LastSendedNextMessageOffset = *info.NextMessageOffset;
520519
event->ReadActorId = info.ReadActorId;
521-
LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << *info.NextMessageOffset);
520+
LOG_ROW_DISPATCHER_TRACE("Send status to " << info.ReadActorId << ", offset " << info.ProcessedNextMessageOffset);
522521
Send(RowDispatcherActorId, event.release());
523522
}
524523
ClientsStats.Clear();
@@ -541,6 +540,11 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) {
541540
if (!info.NextMessageOffset || *info.NextMessageOffset < ev->Get()->Offset + 1) {
542541
info.NextMessageOffset = ev->Get()->Offset + 1;
543542
}
543+
if (info.Buffer.empty()) {
544+
if (!info.ProcessedNextMessageOffset || *info.ProcessedNextMessageOffset < ev->Get()->Offset + 1) {
545+
info.ProcessedNextMessageOffset = ev->Get()->Offset + 1;
546+
}
547+
}
544548
}
545549
}
546550

@@ -776,7 +780,7 @@ void TTopicSession::SendData(TClientsInfo& info) {
776780
} while(!info.Buffer.empty());
777781
info.Stat.Add(dataSize, eventsSize);
778782
info.FilteredDataRate->Add(dataSize);
779-
info.LastSendedNextMessageOffset = *info.NextMessageOffset;
783+
info.ProcessedNextMessageOffset = *info.NextMessageOffset;
780784
}
781785

782786
void TTopicSession::UpdateFieldsIds(TClientsInfo& info) {

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,14 @@ class TFixture : public NUnitTest::TBaseFixture {
206206
return eventHolder->Get()->Record.MessagesSize();
207207
}
208208

209-
void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds) {
209+
void ExpectStatisticToReadActor(TSet<NActors::TActorId> readActorIds, ui64 expectedNextMessageOffset) {
210210
size_t count = readActorIds.size();
211211
for (size_t i = 0; i < count; ++i) {
212212
auto eventHolder = Runtime.GrabEdgeEvent<TEvRowDispatcher::TEvStatistics>(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec));
213213
UNIT_ASSERT(eventHolder.Get() != nullptr);
214214
UNIT_ASSERT(readActorIds.contains(eventHolder->Get()->ReadActorId));
215215
readActorIds.erase(eventHolder->Get()->ReadActorId);
216+
UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->Record.GetNextMessageOffset(), expectedNextMessageOffset);
216217
}
217218
}
218219

@@ -244,12 +245,20 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
244245
StartSession(ReadActorId1, source);
245246
StartSession(ReadActorId2, source);
246247

247-
const std::vector<TString> data = { Json1 };
248+
std::vector<TString> data = { Json1 };
248249
PQWrite(data, topicName);
249250
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
250251
ExpectMessageBatch(ReadActorId1, { Json1 });
251252
ExpectMessageBatch(ReadActorId2, { Json1 });
252-
ExpectStatisticToReadActor({ReadActorId1, ReadActorId2});
253+
ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1);
254+
255+
data = { Json2 };
256+
PQWrite(data, topicName);
257+
ExpectNewDataArrived({ReadActorId1, ReadActorId2});
258+
ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 1);
259+
ExpectMessageBatch(ReadActorId1, data);
260+
ExpectMessageBatch(ReadActorId2, data);
261+
ExpectStatisticToReadActor({ReadActorId1, ReadActorId2}, 2);
253262

254263
auto source2 = BuildSource(topicName, false, "OtherConsumer");
255264
StartSession(ReadActorId3, source2, Nothing(), true);

0 commit comments

Comments
 (0)