Skip to content

Commit b375d19

Browse files
authored
YQ-3981 RD fixed missing historical data (#12757)
1 parent 2f41f2d commit b375d19

File tree

2 files changed

+20
-1
lines changed

2 files changed

+20
-1
lines changed

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,10 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
348348
void ParseMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) override {
349349
LOG_ROW_DISPATCHER_TRACE("Send " << messages.size() << " messages to parser");
350350

351+
if (messages) {
352+
CurrentOffset = messages.back().GetOffset();
353+
}
354+
351355
if (Parser) {
352356
Parser->ParseMessages(messages);
353357
ScheduleRefresh();
@@ -367,6 +371,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
367371
TStatus AddClient(IClientDataConsumer::TPtr client) override {
368372
LOG_ROW_DISPATCHER_DEBUG("Add client with id " << client->GetClientId());
369373

374+
if (const auto clientOffset = client->GetNextMessageOffset()) {
375+
if (Parser && CurrentOffset && *CurrentOffset > *clientOffset) {
376+
LOG_ROW_DISPATCHER_DEBUG("Parser was flushed due to new historical offset " << *clientOffset << "(previous parser offset: " << *CurrentOffset << ")");
377+
Parser->Refresh(true);
378+
}
379+
}
380+
370381
auto clientHandler = MakeIntrusive<TClientHandler>(*this, client);
371382
if (!Clients.emplace(client->GetClientId(), clientHandler).second) {
372383
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to create new client, client with id " << client->GetClientId() << " already exists");
@@ -553,6 +564,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
553564
ITopicParser::TPtr Parser;
554565
TParserHandler::TPtr ParserHandler;
555566
ITopicFilters::TPtr Filters;
567+
std::optional<ui64> CurrentOffset;
556568

557569
// Parsed data
558570
const TVector<ui64>* Offsets;

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,15 @@ struct TJsonParserBuffer {
4242

4343
void AddMessage(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
4444
Y_ENSURE(!Finished, "Cannot add messages into finished buffer");
45+
46+
const auto offset = message.GetOffset();
47+
if (Y_UNLIKELY(Offsets && Offsets.back() > offset)) {
48+
LOG_ROW_DISPATCHER_WARN("Got message with offset " << offset << " which is less than previous offset " << Offsets.back());
49+
}
50+
4551
NumberValues++;
4652
Values << message.GetData();
47-
Offsets.emplace_back(message.GetOffset());
53+
Offsets.emplace_back(offset);
4854
}
4955

5056
std::pair<const char*, size_t> Finish() {
@@ -65,6 +71,7 @@ struct TJsonParserBuffer {
6571

6672
private:
6773
TStringBuilder Values = {};
74+
const TString LogPrefix = "TJsonParser: Buffer: ";
6875
};
6976

7077
class TColumnParser {

0 commit comments

Comments
 (0)