Skip to content

Commit 65189f7

Browse files
committed
Fixed fault for parsing errors without filter
1 parent 661f6de commit 65189f7

File tree

7 files changed

+75
-69
lines changed

7 files changed

+75
-69
lines changed

ydb/core/fq/libs/row_dispatcher/format_handler/common/common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct TSchemaColumn {
2727
struct TCountersDesc {
2828
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
2929
NMonitoring::TDynamicCounterPtr CountersSubgroup = MakeIntrusive<NMonitoring::TDynamicCounters>();
30-
TString MkqlCountersName; // Used for TAlignedPagePoolCounters created from CountersRoot
30+
TString MkqlCountersName = ""; // Used for TAlignedPagePoolCounters created from CountersRoot
3131

3232
[[nodiscard]] TCountersDesc CopyWithNewMkqlCountersName(const TString& mkqlCountersName) const;
3333
};

ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -154,16 +154,7 @@ class TTopicFilters : public ITopicFilters {
154154
continue;
155155
}
156156

157-
if (filterHandler.GetPurecalcFilter()) {
158-
PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
159-
continue;
160-
}
161-
162-
// Clients without filters
163-
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
164-
for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
165-
consumer->OnFilteredData(rowId);
166-
}
157+
PushToFilter(filterHandler, offsets, columnIndex, values, numberRows);
167158
}
168159
Stats.AddFilterLatency(TInstant::Now() - startFilter);
169160
}
@@ -193,7 +184,9 @@ class TTopicFilters : public ITopicFilters {
193184
LOG_ROW_DISPATCHER_TRACE("Create filter with id " << filter->GetFilterId());
194185

195186
IPurecalcFilter::TPtr purecalcFilter;
196-
if (filter->GetWhereFilter()) {
187+
if (const auto& predicate = filter->GetWhereFilter()) {
188+
LOG_ROW_DISPATCHER_TRACE("Create purecalc filter for predicate '" << predicate << "' (client id: " << filter->GetFilterId() << ")");
189+
197190
auto filterStatus = CreatePurecalcFilter(filter);
198191
if (filterStatus.IsFail()) {
199192
return filterStatus;
@@ -225,9 +218,6 @@ class TTopicFilters : public ITopicFilters {
225218

226219
private:
227220
void PushToFilter(const TFilterHandler& filterHandler, const TVector<ui64>& offsets, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
228-
const auto filter = filterHandler.GetPurecalcFilter();
229-
Y_ENSURE(filter, "Expected initialized filter");
230-
231221
const auto consumer = filterHandler.GetConsumer();
232222
const auto& columnIds = consumer->GetColumnIds();
233223

@@ -246,8 +236,15 @@ class TTopicFilters : public ITopicFilters {
246236
}
247237
}
248238

249-
LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (client id: " << consumer->GetFilterId() << ")");
250-
filter->FilterData(result, numberRows);
239+
if (const auto filter = filterHandler.GetPurecalcFilter()) {
240+
LOG_ROW_DISPATCHER_TRACE("Pass " << numberRows << " rows to purecalc filter (client id: " << consumer->GetFilterId() << ")");
241+
filter->FilterData(result, numberRows);
242+
} else {
243+
LOG_ROW_DISPATCHER_TRACE("Add " << numberRows << " rows to client " << consumer->GetFilterId() << " without filtering");
244+
for (ui64 rowId = 0; rowId < numberRows; ++rowId) {
245+
consumer->OnFilteredData(rowId);
246+
}
247+
}
251248
}
252249

253250
private:

ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class TOptionalCell : public TBaseFixture::ICell {
6868

6969
void Validate(const NYql::NUdf::TUnboxedValue& parsedValue) const override {
7070
if (!parsedValue) {
71+
UNIT_FAIL("Unexpected NULL value for optional cell");
7172
return;
7273
}
7374
Value->Validate(parsedValue.GetOptionalValue());
@@ -166,7 +167,7 @@ void TBaseFixture::SetUp(NUnitTest::TTestContext&) {
166167
TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare();
167168
Runtime.SetLogBackend(NActors::CreateStderrBackend());
168169
Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NActors::NLog::PRI_TRACE);
169-
Runtime.SetDispatchTimeout(TDuration::Seconds(5));
170+
Runtime.SetDispatchTimeout(WAIT_TIMEOUT);
170171
Runtime.Initialize(app->Unwrap());
171172

172173
// Init tls context

ydb/core/fq/libs/row_dispatcher/format_handler/ut/common/ut_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
namespace NFq::NRowDispatcher::NTests {
1212

13+
// Common wait limit for row dispatcher unit tests (used as the test runtime dispatch timeout).
// Declared `inline` so that every translation unit including this header shares a single
// entity instead of getting its own internal-linkage copy.
inline constexpr TDuration WAIT_TIMEOUT = TDuration::Seconds(20);
14+
1315
class TBaseFixture : public NUnitTest::TBaseFixture, public TTypeParser {
1416
public:
1517
// Helper classes for checking serialized rows in multi type format

ydb/core/fq/libs/row_dispatcher/format_handler/ut/format_handler_ut.cpp

Lines changed: 53 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class TFormatHadlerFixture : public TBaseFixture {
155155
FormatHandler = CreateTestFormatHandler(config, settings);
156156
}
157157

158-
TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
158+
[[nodiscard]] TStatus MakeClient(const TVector<TSchemaColumn>& columns, const TString& whereFilter, TCallback callback, ui64 expectedFilteredRows = 1) {
159159
ClientIds.emplace_back(ClientIds.size(), 0, 0, 0);
160160

161161
auto client = MakeIntrusive<TClientDataConsumer>(ClientIds.back(), columns, whereFilter, callback, expectedFilteredRows);
@@ -202,6 +202,30 @@ class TFormatHadlerFixture : public TBaseFixture {
202202
FormatHandler->RemoveClient(clientId);
203203
}
204204

205+
public:
206+
// Returns a callback that accepts any queue of batches and performs no checks on it.
static TCallback EmptyCheck() {
    // Nothing needs to be captured, and the argument is left unnamed because it is ignored
    // (avoids a by-reference default capture and an unused-parameter warning).
    return [](TQueue<std::pair<TRope, TVector<ui64>>>&&) {};
}
209+
210+
// Wraps `callback` into a TCallback that asserts the received queue holds exactly one batch
// with a non-empty offsets list, then hands that batch's messages and offsets to `callback`.
static TCallback OneBatchCheck(std::function<void(TRope&& messages, TVector<ui64>&& offsets)> callback) {
    // `callback` is already a by-value copy; move it into the closure instead of copying it again.
    return [callback = std::move(callback)](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
        UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
        // Copy the front batch; the queue itself is left unmodified.
        auto [messages, offsets] = data.front();

        UNIT_ASSERT(!offsets.empty());
        callback(std::move(messages), std::move(offsets));
    };
}
219+
220+
// Builds a callback that expects a single batch consisting of exactly one row:
// the only offset must equal `offset` and the serialized batch must match `row`.
TCallback OneRowCheck(ui64 offset, const TRow& row) const {
    auto checkSingleRow = [this, offset, row](TRope&& batchMessages, TVector<ui64>&& batchOffsets) {
        UNIT_ASSERT_VALUES_EQUAL(batchOffsets.size(), 1);
        UNIT_ASSERT_VALUES_EQUAL(batchOffsets.front(), offset);

        // Expected batch contains only the requested row
        TBatch expectedBatch;
        expectedBatch.AddRow(row);
        CheckMessageBatch(batchMessages, expectedBatch);
    };
    return OneBatchCheck(checkSingleRow);
}
228+
205229
private:
206230
void ExtractClientsData() {
207231
for (auto& client : Clients) {
@@ -233,33 +257,13 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
233257
CheckSuccess(MakeClient(
234258
{commonColumn, {"col_first", "[DataType; String]"}},
235259
"WHERE col_first = \"str_first__large__\"",
236-
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
237-
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
238-
239-
auto [messages, offsets] = data.front();
240-
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
241-
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset + 1);
242-
243-
CheckMessageBatch(messages, TBatch().AddRow(
244-
TRow().AddString("event2").AddString("str_first__large__")
245-
));
246-
}
260+
OneRowCheck(firstOffset + 1, TRow().AddString("event2").AddString("str_first__large__"))
247261
));
248262

249263
CheckSuccess(MakeClient(
250264
{commonColumn, {"col_second", "[DataType; String]"}},
251265
"WHERE col_second = \"str_second\"",
252-
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
253-
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
254-
255-
auto [messages, offsets] = data.front();
256-
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
257-
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);
258-
259-
CheckMessageBatch(messages, TBatch().AddRow(
260-
TRow().AddString("event1").AddString("str_second")
261-
));
262-
}
266+
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
263267
));
264268

265269
ParseMessages({
@@ -288,14 +292,10 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
288292
R"({"col_a": false, "col_b": {"X": "Y"}})"
289293
};
290294

291-
CheckSuccess(MakeClient(schema, "WHERE FALSE", [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {}, 0));
292-
293-
auto trueChacker = [&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
294-
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
295-
auto [messages, offsets] = data.front();
295+
CheckSuccess(MakeClient(schema, "WHERE FALSE", EmptyCheck(), 0));
296296

297+
const auto trueChacker = OneBatchCheck([&](TRope&& messages, TVector<ui64>&& offsets) {
297298
TBatch expectedBatch;
298-
UNIT_ASSERT(!offsets.empty());
299299
for (ui64 offset : offsets) {
300300
UNIT_ASSERT(offset - firstOffset < testData.size());
301301
expectedBatch.AddRow(
@@ -304,7 +304,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
304304
}
305305

306306
CheckMessageBatch(messages, expectedBatch);
307-
};
307+
});
308308
CheckSuccess(MakeClient(schema, "WHERE TRUE", trueChacker, 3));
309309
CheckSuccess(MakeClient(schema, "", trueChacker, 2));
310310

@@ -323,7 +323,7 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
323323
Y_UNIT_TEST_F(ClientValidation, TFormatHadlerFixture) {
324324
const TVector<TSchemaColumn> schema = {{"data", "[DataType; String]"}};
325325
const TString filter = "WHERE FALSE";
326-
const auto callback = [&](TQueue<std::pair<TRope, TVector<ui64>>>&&) {};
326+
const auto callback = EmptyCheck();
327327
CheckSuccess(MakeClient(schema, filter, callback, 0));
328328

329329
CheckError(
@@ -349,27 +349,12 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
349349
const ui64 firstOffset = 42;
350350
const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};
351351

352-
CheckSuccess(MakeClient(
353-
{commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}},
354-
"WHERE TRUE",
355-
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {},
356-
0
357-
));
352+
CheckSuccess(MakeClient({commonColumn, {"col_first", "[OptionalType; [DataType; Uint8]]"}}, "WHERE TRUE", EmptyCheck(), 0));
358353

359354
CheckSuccess(MakeClient(
360355
{commonColumn, {"col_second", "[DataType; String]"}},
361356
"WHERE col_second = \"str_second\"",
362-
[&](TQueue<std::pair<TRope, TVector<ui64>>>&& data) {
363-
UNIT_ASSERT_VALUES_EQUAL(data.size(), 1);
364-
365-
auto [messages, offsets] = data.front();
366-
UNIT_ASSERT_VALUES_EQUAL(offsets.size(), 1);
367-
UNIT_ASSERT_VALUES_EQUAL(offsets.front(), firstOffset);
368-
369-
CheckMessageBatch(messages, TBatch().AddRow(
370-
TRow().AddString("event1").AddString("str_second")
371-
));
372-
}
357+
OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
373358
));
374359

375360
CheckClientError(
@@ -379,6 +364,26 @@ Y_UNIT_TEST_SUITE(TestFormatHandler) {
379364
TStringBuilder() << "Failed to parse json string at offset " << firstOffset << ", got parsing error for column 'col_first' with type [OptionalType; [DataType; Uint8]]"
380365
);
381366
}
367+
368+
// A client without a predicate must still receive a parsing error when a non-optional
// column it requested is absent from the message, while another client keeps working.
Y_UNIT_TEST_F(ClientErrorWithEmptyFilter, TFormatHadlerFixture) {
    const ui64 firstOffset = 42;
    const TSchemaColumn commonColumn = {"com_col", "[DataType; String]"};

    // First client (ClientIds[0]): empty filter, expects no filtered rows
    const TVector<TSchemaColumn> schemaWithoutFilter = {commonColumn, {"col_first", "[DataType; String]"}};
    CheckSuccess(MakeClient(schemaWithoutFilter, "", EmptyCheck(), 0));

    // Second client: has a predicate and should get the single matching row
    const TVector<TSchemaColumn> schemaWithFilter = {commonColumn, {"col_second", "[DataType; String]"}};
    CheckSuccess(MakeClient(
        schemaWithFilter,
        "WHERE col_second = \"str_second\"",
        OneRowCheck(firstOffset, TRow().AddString("event1").AddString("str_second"))
    ));

    // The message has no 'col_first' value, so the error is expected for the first client
    CheckClientError(
        {GetMessage(firstOffset, R"({"com_col": "event1", "col_second": "str_second"})")},
        ClientIds[0],
        EStatusId::PRECONDITION_FAILED,
        TStringBuilder() << "Failed to parse json messages, found 1 missing values from offset " << firstOffset << " in non optional column 'col_first' with type [DataType; String]"
    );
}
382387
}
383388

384389
} // namespace NFq::NRowDispatcher::NTests

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@ void TTopicSession::StartClientSession(TClientsInfo& info) {
710710

711711
void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) {
712712
const auto& source = ev->Get()->Record.GetSource();
713-
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: " << source.GetPredicate() << ", offset: " << ev->Get()->Record.GetOffset());
713+
LOG_ROW_DISPATCHER_INFO("New client: read actor id " << ev->Sender.ToString() << ", predicate: '" << source.GetPredicate() << "', offset: " << ev->Get()->Record.GetOffset());
714714

715715
if (!CheckNewClient(ev)) {
716716
return;

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ namespace {
2323
using namespace NKikimr;
2424
using namespace NYql::NDq;
2525

26-
const ui64 TimeoutBeforeStartSessionSec = 3;
27-
const ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
26+
constexpr ui64 TimeoutBeforeStartSessionSec = 3;
27+
constexpr ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
28+
static_assert(GrabTimeoutSec <= WAIT_TIMEOUT.Seconds());
2829

2930
class TFixture : public NTests::TBaseFixture {
3031
public:

0 commit comments

Comments
 (0)