Skip to content

Commit 6710cfa

Browse files
committed
Fixed tests 2
1 parent c9f848c commit 6710cfa

File tree

5 files changed

+71
-44
lines changed

5 files changed

+71
-44
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,8 +15,8 @@ message TRowDispatcherCoordinatorConfig {
1515
}
1616

1717
message TJsonParserConfig {
18-
uint64 BatchSizeBytes = 1;
19-
uint64 BatchCreationTimeoutMs = 2;
18+
uint64 BatchSizeBytes = 1; // default 1 MiB
19+
uint64 BatchCreationTimeoutMs = 2; // default 1 second
2020
uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6
2121
}
2222

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -139,6 +139,13 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {
139139

140140
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
141141
with_lock (Worker->GetScopedAlloc()) {
142+
Y_DEFER {
143+
// Clear cache after each object because
144+
// values allocated on another allocator and should be released
145+
Cache.Clear();
146+
Worker->GetGraph().Invalidate();
147+
};
148+
142149
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
143150

144151
// TODO: use blocks here
@@ -159,11 +166,6 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {
159166

160167
Worker->Push(std::move(result));
161168
}
162-
163-
// Clear cache after each object because
164-
// values allocated on another allocator and should be released
165-
Cache.Clear();
166-
Worker->GetGraph().Invalidate();
167169
}
168170
}
169171

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

Lines changed: 31 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,9 @@ namespace {
1919

2020
TString LogPrefix = "JsonParser: ";
2121

22+
constexpr ui64 DEFAULT_BATCH_SIZE = 1_MB;
2223
constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000;
24+
constexpr TDuration DEFAULT_BATCH_CREATION_TIMEOUT = TDuration::Seconds(1);
2325

2426
struct TJsonParserBuffer {
2527
size_t NumberValues = 0;
@@ -40,20 +42,11 @@ struct TJsonParserBuffer {
4042
Offsets.reserve(numberValues);
4143
}
4244

43-
void AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
45+
void AddMessage(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
4446
Y_ENSURE(!Finished, "Cannot add messages into finished buffer");
45-
46-
size_t messagesSize = 0;
47-
for (const auto& message : messages) {
48-
messagesSize += message.GetData().size();
49-
}
50-
51-
NumberValues += messages.size();
52-
Reserve(Values.size() + messagesSize, NumberValues);
53-
for (const auto& message : messages) {
54-
Values << message.GetData();
55-
Offsets.emplace_back(message.GetOffset());
56-
}
47+
NumberValues++;
48+
Values << message.GetData();
49+
Offsets.emplace_back(message.GetOffset());
5750
}
5851

5952
std::pair<const char*, size_t> Finish() {
@@ -102,8 +95,8 @@ class TColumnParser {
10295
}
10396

10497
void ParseJsonValue(ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
105-
Parser(jsonValue, resultValue);
10698
ParsedRows.emplace_back(rowId);
99+
Parser(jsonValue, resultValue);
107100
}
108101

109102
void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const {
@@ -280,9 +273,9 @@ class TJsonParser::TImpl {
280273
TImpl(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize)
281274
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
282275
, TypeEnv(std::make_unique<NKikimr::NMiniKQL::TTypeEnvironment>(Alloc))
283-
, BatchSize(batchSize)
276+
, BatchSize(batchSize ? batchSize : DEFAULT_BATCH_SIZE)
284277
, MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1)
285-
, BatchCreationTimeout(batchCreationTimeout)
278+
, BatchCreationTimeout(batchCreationTimeout ? batchCreationTimeout : DEFAULT_BATCH_CREATION_TIMEOUT)
286279
, ParseCallback(parseCallback)
287280
, ParsedValues(columns.size())
288281
{
@@ -330,14 +323,13 @@ class TJsonParser::TImpl {
330323
}
331324

332325
void AddMessages(const TVector<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>& messages) {
333-
if (messages.empty()) {
334-
return;
335-
}
336-
337-
if (Buffer.Finished) {
338-
Buffer.Clear();
326+
Y_ENSURE(!Buffer.Finished, "Cannot add messages into finished buffer");
327+
for (const auto& message : messages) {
328+
Buffer.AddMessage(message);
329+
if (IsReady()) {
330+
Parse();
331+
}
339332
}
340-
Buffer.AddMessages(messages);
341333
}
342334

343335
void Parse() {
@@ -347,13 +339,18 @@ class TJsonParser::TImpl {
347339
LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values);
348340

349341
with_lock (Alloc) {
350-
const ui64 firstOffset = Buffer.Offsets.front();
342+
Y_DEFER {
343+
// Clear all UV in case of exception
344+
ClearColumns();
345+
Buffer.Clear();
346+
};
347+
351348
size_t rowId = 0;
352349
size_t parsedRows = 0;
353350
simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE);
354351
for (auto document : documents) {
355352
if (Y_UNLIKELY(parsedRows >= Buffer.NumberValues)) {
356-
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << parsedRows + 1;
353+
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << parsedRows + 1;
357354
}
358355
for (auto item : document.get_object()) {
359356
const auto it = ColumnsIndex.find(item.escaped_key().value());
@@ -372,18 +369,17 @@ class TJsonParser::TImpl {
372369

373370
rowId++;
374371
parsedRows++;
375-
376372
if (rowId == MaxNumberRows) {
377-
ClearColumns(parsedRows, MaxNumberRows);
373+
FlushColumns(parsedRows, MaxNumberRows);
378374
rowId = 0;
379375
}
380376
}
381377

382-
if (parsedRows != Buffer.NumberValues) {
383-
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId;
378+
if (Y_UNLIKELY(parsedRows != Buffer.NumberValues)) {
379+
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId;
384380
}
385381
if (rowId) {
386-
ClearColumns(parsedRows, rowId);
382+
FlushColumns(parsedRows, rowId);
387383
}
388384
}
389385
}
@@ -406,7 +402,7 @@ class TJsonParser::TImpl {
406402
}
407403

408404
private:
409-
void ClearColumns(size_t parsedRows, size_t savedRows) {
405+
void FlushColumns(size_t parsedRows, size_t savedRows) {
410406
const ui64 firstOffset = Buffer.Offsets.front();
411407
for (const auto& column : Columns) {
412408
column.ValidateNumberValues(savedRows, firstOffset);
@@ -417,6 +413,10 @@ class TJsonParser::TImpl {
417413
ParseCallback(parsedRows - savedRows, savedRows, ParsedValues);
418414
}
419415

416+
ClearColumns();
417+
}
418+
419+
void ClearColumns() {
420420
for (size_t i = 0; i < Columns.size(); ++i) {
421421
auto& parsedColumn = ParsedValues[i];
422422
for (size_t rowId : Columns[i].ParsedRows) {

ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,9 @@ class TFixture : public NUnitTest::TBaseFixture {
2626
: PureCalcProgramFactory(CreatePureCalcProgramFactory())
2727
, Runtime(true)
2828
, Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
29-
{}
29+
{
30+
Alloc.Ref().UseRefLocking = true;
31+
}
3032

3133
static void SegmentationFaultHandler(int) {
3234
Cerr << "segmentation fault call stack:" << Endl;
@@ -70,7 +72,7 @@ class TFixture : public NUnitTest::TBaseFixture {
7072
}
7173

7274
void Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values) {
73-
Filter->Push(offsets, values, 0, values.size());
75+
Filter->Push(offsets, values, 0, values.front()->size());
7476
}
7577

7678
const TVector<NYql::NUdf::TUnboxedValue>* MakeVector(size_t size, std::function<NYql::NUdf::TUnboxedValuePod(size_t)> valueCreator) {

ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp

Lines changed: 27 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -46,8 +46,8 @@ class TFixture : public NUnitTest::TBaseFixture {
4646
}
4747
}
4848

49-
void MakeParser(TVector<TString> columns, TVector<TString> types, TJsonParser::TCallback callback, ui64 staticBufferSize = 1000) {
50-
Parser = NFq::NewJsonParser(columns, types, callback, 0, TDuration::Zero(), staticBufferSize);
49+
void MakeParser(TVector<TString> columns, TVector<TString> types, TJsonParser::TCallback callback, ui64 batchSize = 1_MB, ui64 staticBufferSize = 1000) {
50+
Parser = NFq::NewJsonParser(columns, types, callback, batchSize, TDuration::Hours(1), staticBufferSize);
5151
}
5252

5353
void MakeParser(TVector<TString> columns, TJsonParser::TCallback callback) {
@@ -219,7 +219,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
219219
Y_UNIT_TEST_F(SimpleBooleans, TFixture) {
220220
MakeParser({"a"}, {"[DataType; Bool]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector<TVector<NYql::NUdf::TUnboxedValue>>& result) {
221221
UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset);
222-
UNIT_ASSERT_VALUES_EQUAL(3, numberRows);
222+
UNIT_ASSERT_VALUES_EQUAL(2, numberRows);
223223

224224
UNIT_ASSERT_VALUES_EQUAL(1, result.size());
225225
UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get<bool>());
@@ -244,7 +244,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
244244
UNIT_ASSERT_VALUES_EQUAL(1, numberRows);
245245
UNIT_ASSERT_VALUES_EQUAL(1, result.size());
246246
UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef()));
247-
}, 1);
247+
}, 1_MB, 1);
248248

249249
const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}";
250250
Parser->AddMessages({
@@ -254,6 +254,29 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) {
254254
Parser->Parse();
255255
}
256256

257+
Y_UNIT_TEST_F(LittleBatches, TFixture) {
258+
const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890";
259+
260+
ui64 currentOffset = 42;
261+
MakeParser({"col"}, {"[DataType; String]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector<TVector<NYql::NUdf::TUnboxedValue>>& result) {
262+
UNIT_ASSERT_VALUES_EQUAL(Parser->GetOffsets().size(), 1);
263+
UNIT_ASSERT_VALUES_EQUAL(Parser->GetOffsets().front(), currentOffset);
264+
currentOffset++;
265+
266+
UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset);
267+
UNIT_ASSERT_VALUES_EQUAL(1, numberRows);
268+
UNIT_ASSERT_VALUES_EQUAL(1, result.size());
269+
UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef()));
270+
}, 10);
271+
272+
const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}";
273+
Parser->AddMessages({
274+
GetMessage(42, jsonString),
275+
GetMessage(43, jsonString)
276+
});
277+
UNIT_ASSERT_VALUES_EQUAL(Parser->GetNumberValues(), 0);
278+
}
279+
257280
Y_UNIT_TEST_F(MissingFieldsValidation, TFixture) {
258281
MakeParser({"a1", "a2"}, {"[DataType; String]", "[DataType; Uint64]"});
259282
UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": "hello1", "a2": null, "event": "event1"})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [DataType; Uint64], description: (yexception) found unexpected null value, expected non optional data type Uint64");

0 commit comments

Comments
 (0)