Skip to content

Commit db7e0cc

Browse files
committed
Fixed issues 1
1 parent c028a26 commit db7e0cc

File tree

5 files changed: +25 additions, -11 deletions

ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp

Lines changed: 4 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -185,7 +185,7 @@ class TTopicFilters : public ITopicFilters {
185185

186186
IPurecalcFilter::TPtr purecalcFilter;
187187
if (const auto& predicate = filter->GetWhereFilter()) {
188-
LOG_ROW_DISPATCHER_TRACE("Create purecalc filter for predicate '" << predicate << "' (client id: " << filter->GetFilterId() << ")");
188+
LOG_ROW_DISPATCHER_TRACE("Create purecalc filter for predicate '" << predicate << "' (filter id: " << filter->GetFilterId() << ")");
189189

190190
auto filterStatus = CreatePurecalcFilter(filter);
191191
if (filterStatus.IsFail()) {
@@ -237,13 +237,11 @@ class TTopicFilters : public ITopicFilters {
237237
}
238238

239239
if (const auto filter = filterHandler.GetPurecalcFilter()) {
240-
LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (client id: " << consumer->GetFilterId() << ")");
240+
LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (filter id: " << consumer->GetFilterId() << ")");
241241
filter->FilterData(result, numberRows);
242-
} else {
242+
} else if (numberRows) {
243243
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
244-
for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
245-
consumer->OnFilteredData(rowId);
246-
}
244+
consumer->OnFilteredBatch(0, numberRows - 1);
247245
}
248246
}
249247

ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@ class IFilteredDataConsumer : public IPurecalcFilterConsumer {
1717
virtual const TVector<ui64>& GetColumnIds() const = 0;
1818
virtual TMaybe<ui64> GetNextMessageOffset() const = 0;
1919

20+
virtual void OnFilteredBatch(ui64 firstRow, ui64 lastRow) = 0; // inclusive interval [firstRow, lastRow]
21+
2022
virtual void OnFilterStarted() = 0;
2123
virtual void OnFilteringError(TStatus status) = 0;
2224
};

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -207,6 +207,13 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
207207
Client->StartClientSession();
208208
}
209209

210+
void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
211+
LOG_ROW_DISPATCHER_TRACE("OnFilteredBatch, rows [" << firstRow << ", " << lastRow << "]");
212+
for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
213+
OnFilteredData(rowId);
214+
}
215+
}
216+
210217
void OnFilteredData(ui64 rowId) override {
211218
const ui64 offset = Self.Offsets->at(rowId);
212219
if (const auto nextOffset = Client->GetNextMessageOffset(); nextOffset && offset < *nextOffset) {

ydb/core/fq/libs/row_dispatcher/format_handler/parsers/json_parser.cpp

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -414,23 +414,23 @@ class TJsonParser : public TTopicParserBase {
414414

415415
simdjson::ondemand::document_stream documents;
416416
CHECK_JSON_ERROR(Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE).get(documents)) {
417-
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch:\n" << TruncateString(std::string_view(values, size)));
417+
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse message batch from offset " << Buffer.Offsets.front() << ", json documents was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
418418
}
419419

420420
size_t rowId = 0;
421421
for (auto document : documents) {
422422
if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
423-
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch:\n" << TruncateString(std::string_view(values, size)));
423+
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId + 1 << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
424424
}
425425

426426
const ui64 offset = Buffer.Offsets[rowId];
427427
CHECK_JSON_ERROR(document.error()) {
428-
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch:\n" << TruncateString(std::string_view(values, size)));
428+
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json document was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
429429
}
430430

431431
for (auto item : document.get_object()) {
432432
CHECK_JSON_ERROR(item.error()) {
433-
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch:\n" << TruncateString(std::string_view(values, size)));
433+
return TStatus::Fail(EStatusId::BAD_REQUEST, TStringBuilder() << "Failed to parse json message for offset " << offset << ", json item was corrupted: " << simdjson::error_message(error) << " Current data batch: " << TruncateString(std::string_view(values, size)));
434434
}
435435

436436
const auto it = ColumnsIndex.find(item.escaped_key().value());
@@ -445,7 +445,7 @@ class TJsonParser : public TTopicParserBase {
445445
}
446446

447447
if (Y_UNLIKELY(rowId != Buffer.NumberValues)) {
448-
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch:\n" << TruncateString(std::string_view(values, size)));
448+
return TStatus::Fail(EStatusId::INTERNAL_ERROR, TStringBuilder() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId << " (expected one json row for each offset from topic API in json each row format, maybe initial data was corrupted or messages is not in json format), current data batch: " << TruncateString(std::string_view(values, size)));
449449
}
450450

451451
const ui64 firstOffset = Buffer.Offsets.front();

ydb/core/fq/libs/row_dispatcher/format_handler/ut/topic_filter_ut.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -64,6 +64,13 @@ class TFiterFixture : public TBaseFixture {
6464
}
6565
}
6666

67+
void OnFilteredBatch(ui64 firstRow, ui64 lastRow) override {
68+
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
69+
for (ui64 rowId = firstRow; rowId <= lastRow; ++rowId) {
70+
Callback(rowId);
71+
}
72+
}
73+
6774
void OnFilteredData(ui64 rowId) override {
6875
UNIT_ASSERT_C(Started, "Unexpected data for not started filter");
6976
Callback(rowId);

0 commit comments

Comments
 (0)