From 43231739cae155c3ab64cf6edd75b35298e0d635 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 14 Nov 2024 17:48:05 +0000 Subject: [PATCH 1/7] Added limit for parser buffer size --- .../libs/config/protos/row_dispatcher.proto | 1 + .../fq/libs/row_dispatcher/json_filter.cpp | 37 ++- ydb/core/fq/libs/row_dispatcher/json_filter.h | 4 +- .../fq/libs/row_dispatcher/json_parser.cpp | 98 +++++--- ydb/core/fq/libs/row_dispatcher/json_parser.h | 13 +- .../fq/libs/row_dispatcher/topic_session.cpp | 22 +- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 40 ++- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 230 ++++++++++-------- 8 files changed, 266 insertions(+), 179 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 0607f63dd45c..feb3c830b614 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -17,6 +17,7 @@ message TRowDispatcherCoordinatorConfig { message TJsonParserConfig { uint64 BatchSizeBytes = 1; uint64 BatchCreationTimeoutMs = 2; + uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6 ~ 24 MiB } message TRowDispatcherConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index c510094be3a9..eef834281818 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -71,6 +71,17 @@ NYT::TNode MakeOutputSchema() { return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); } +struct TInputType { + const TVector& Offsets; + const TVector*>& Values; + const ui64 RowsOffset; // offset of first value + const ui64 NumberRows; + + ui64 GetOffset(ui64 rowId) const { + return Offsets[rowId + RowsOffset]; + } +}; + class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { public: TFilterInputSpec(const NYT::TNode& schema) @@ -85,7 
+96,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { TVector Schemas; }; -class TFilterInputConsumer : public NYql::NPureCalc::IConsumer&, const TVector&>> { +class TFilterInputConsumer : public NYql::NPureCalc::IConsumer { public: TFilterInputConsumer( const TFilterInputSpec& spec, @@ -123,26 +134,26 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer&, const TVector&> values) override { - Y_ENSURE(FieldsPositions.size() == values.second.size()); + void OnObject(TInputType input) override { + Y_ENSURE(FieldsPositions.size() == input.Values.size()); NKikimr::NMiniKQL::TThrowingBindTerminator bind; with_lock (Worker->GetScopedAlloc()) { auto& holderFactory = Worker->GetGraph().GetHolderFactory(); // TODO: use blocks here - for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) { + for (size_t rowId = 0; rowId < input.NumberRows; ++rowId) { NYql::NUdf::TUnboxedValue* items = nullptr; NYql::NUdf::TUnboxedValue result = Cache.NewArray( holderFactory, - static_cast(values.second.size() + 1), + static_cast(input.Values.size() + 1), items); - items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first[rowId]); + items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(input.GetOffset(rowId)); size_t fieldId = 0; - for (const auto& column : values.second) { + for (const auto column : input.Values) { items[FieldsPositions[fieldId++]] = column->at(rowId); } @@ -236,7 +247,7 @@ struct NYql::NPureCalc::TInputSpecTraits { static constexpr bool IsPartial = false; static constexpr bool SupportPushStreamMode = true; - using TConsumerType = THolder&, const TVector&>>>; + using TConsumerType = THolder>; static TConsumerType MakeConsumer( const TFilterInputSpec& spec, @@ -282,9 +293,9 @@ class TJsonFilter::TImpl { LOG_ROW_DISPATCHER_DEBUG("Program created"); } - void Push(const TVector& offsets, const TVector& values) { + void Push(const TVector& offsets, const TVector*>& values, ui64 rowsOffset, ui64 numberRows) { 
Y_ENSURE(values, "Expected non empty schema"); - InputConsumer->OnObject(std::make_pair(offsets, values)); + InputConsumer->OnObject({.Offsets = offsets, .Values = values, .RowsOffset = rowsOffset, .NumberRows = numberRows}); } TString GetSql() const { @@ -305,7 +316,7 @@ class TJsonFilter::TImpl { private: THolder> Program; - THolder&, const TVector&>>> InputConsumer; + THolder> InputConsumer; const TString Sql; }; @@ -322,8 +333,8 @@ TJsonFilter::TJsonFilter( TJsonFilter::~TJsonFilter() { } -void TJsonFilter::Push(const TVector& offsets, const TVector& values) { - Impl->Push(offsets, values); +void TJsonFilter::Push(const TVector& offsets, const TVector*>& values, ui64 rowsOffset, ui64 numberRows) { + Impl->Push(offsets, values, rowsOffset, numberRows); } TString TJsonFilter::GetSql() { diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index 6d1cebf9338c..51c0ff1581f7 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -2,8 +2,6 @@ #include "common.h" -#include -#include #include namespace NFq { @@ -23,7 +21,7 @@ class TJsonFilter { ~TJsonFilter(); - void Push(const TVector& offsets, const TVector& values); + void Push(const TVector& offsets, const TVector*>& values, ui64 rowsOffset, ui64 numberRows); TString GetSql(); private: diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 99ee146c68df..cfec2f4540bb 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -4,9 +4,11 @@ #include #include +#include #include #include #include +#include #include #include @@ -17,6 +19,8 @@ namespace { TString LogPrefix = "JsonParser: "; +constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000; + struct TJsonParserBuffer { size_t NumberValues = 0; bool Finished = false; @@ -80,16 +84,16 @@ class TColumnParser { const TString TypeYson; const 
NKikimr::NMiniKQL::TType* TypeMkql; const bool IsOptional = false; - size_t NumberValues = 0; + TVector ParsedRows; public: - TColumnParser(const TString& name, const TString& typeYson, NKikimr::NMiniKQL::TProgramBuilder& programBuilder) + TColumnParser(const TString& name, const TString& typeYson, ui64 maxNumberRows, NKikimr::NMiniKQL::TProgramBuilder& programBuilder) : Name(name) , TypeYson(typeYson) , TypeMkql(NYql::NCommon::ParseTypeFromYson(TStringBuf(typeYson), programBuilder, Cerr)) , IsOptional(TypeMkql->IsOptional()) - , NumberValues(0) { + ParsedRows.reserve(maxNumberRows); try { Parser = CreateParser(TypeMkql); } catch (...) { @@ -97,14 +101,14 @@ class TColumnParser { } } - void ParseJsonValue(simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + void ParseJsonValue(ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { Parser(jsonValue, resultValue); - NumberValues++; + ParsedRows.emplace_back(rowId); } void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const { - if (Y_UNLIKELY(!IsOptional && NumberValues < expectedNumberValues)) { - throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - NumberValues << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson; + if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) { + throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson; } } @@ -273,11 +277,13 @@ namespace NFq { class TJsonParser::TImpl { public: - TImpl(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) + TImpl(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, 
ui64 staticBufferSize) : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) , TypeEnv(std::make_unique(Alloc)) , BatchSize(batchSize) + , MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1) , BatchCreationTimeout(batchCreationTimeout) + , ParseCallback(parseCallback) , ParsedValues(columns.size()) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); @@ -288,7 +294,7 @@ class TJsonParser::TImpl { Columns.reserve(columns.size()); for (size_t i = 0; i < columns.size(); i++) { - Columns.emplace_back(columns[i], types[i], programBuilder); + Columns.emplace_back(columns[i], types[i], MaxNumberRows, programBuilder); } } @@ -297,7 +303,11 @@ class TJsonParser::TImpl { ColumnsIndex.emplace(std::string_view(Columns[i].Name), i); } - Buffer.Reserve(BatchSize, 1); + for (size_t i = 0; i < columns.size(); i++) { + ParsedValues[i].resize(MaxNumberRows); + } + + Buffer.Reserve(BatchSize, MaxNumberRows); LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name()); Parser.threaded = false; @@ -330,21 +340,20 @@ class TJsonParser::TImpl { Buffer.AddMessages(messages); } - const TVector& Parse() { + void Parse() { Y_ENSURE(Buffer.IsReady(), "Nothing to parse"); const auto [values, size] = Buffer.Finish(); LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); with_lock (Alloc) { - ClearColumns(Buffer.NumberValues); - const ui64 firstOffset = Buffer.Offsets.front(); size_t rowId = 0; + size_t parsedRows = 0; simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE); for (auto document : documents) { - if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId + 1; + if (Y_UNLIKELY(parsedRows >= Buffer.NumberValues)) { 
+ throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << parsedRows + 1; } for (auto item : document.get_object()) { const auto it = ColumnsIndex.find(item.escaped_key().value()); @@ -355,23 +364,28 @@ class TJsonParser::TImpl { const size_t columnId = it->second; auto& columnParser = Columns[columnId]; try { - columnParser.ParseJsonValue(item.value(), ParsedValues[columnId][rowId]); + columnParser.ParseJsonValue(rowId, item.value(), ParsedValues[columnId][rowId]); } catch (...) { throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnParser.Name << "' with type " << columnParser.TypeYson << ", description: " << CurrentExceptionMessage(); } } + rowId++; + parsedRows++; + + if (rowId == MaxNumberRows) { + ClearColumns(parsedRows, MaxNumberRows); + rowId = 0; + } } - if (rowId != Buffer.NumberValues) { + if (parsedRows != Buffer.NumberValues) { throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId; } - for (const auto& columnDesc : Columns) { - columnDesc.ValidateNumberValues(rowId, firstOffset); + if (rowId) { + ClearColumns(parsedRows, rowId); } } - - return ParsedValues; } TString GetDescription() const { @@ -385,7 +399,6 @@ class TJsonParser::TImpl { ~TImpl() { with_lock (Alloc) { - ClearColumns(0); ParsedValues.clear(); Columns.clear(); TypeEnv.reset(); @@ -393,18 +406,25 @@ class TJsonParser::TImpl { } private: - void ClearColumns(size_t newSize) { - const auto clearValue = [&allocState = Alloc.Ref()](NYql::NUdf::TUnboxedValue& value){ - value.UnlockRef(1); - value.Clear(); - }; + void ClearColumns(size_t parsedRows, size_t savedRows) { + const ui64 firstOffset = Buffer.Offsets.front(); + for (const auto& column : Columns) { + column.ValidateNumberValues(savedRows, firstOffset); + } - for 
(size_t i = 0; i < Columns.size(); ++i) { - Columns[i].NumberValues = 0; + { + auto unguard = Unguard(Alloc); + ParseCallback(parsedRows - savedRows, savedRows, ParsedValues); + } + for (size_t i = 0; i < Columns.size(); ++i) { auto& parsedColumn = ParsedValues[i]; - std::for_each(parsedColumn.begin(), parsedColumn.end(), clearValue); - parsedColumn.resize(newSize); + for (size_t rowId : Columns[i].ParsedRows) { + auto& parsedRow = parsedColumn[rowId]; + parsedRow.UnlockRef(1); + parsedRow.Clear(); + } + Columns[i].ParsedRows.clear(); } } @@ -413,18 +433,20 @@ class TJsonParser::TImpl { std::unique_ptr TypeEnv; const ui64 BatchSize; + const ui64 MaxNumberRows; const TDuration BatchCreationTimeout; + const TCallback ParseCallback; TVector Columns; absl::flat_hash_map ColumnsIndex; TJsonParserBuffer Buffer; simdjson::ondemand::parser Parser; - TVector ParsedValues; + TVector> ParsedValues; }; -TJsonParser::TJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) - : Impl(std::make_unique(columns, types, batchSize, batchCreationTimeout)) +TJsonParser::TJsonParser(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) + : Impl(std::make_unique(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize)) {} TJsonParser::~TJsonParser() { @@ -450,16 +472,16 @@ const TVector& TJsonParser::GetOffsets() const { return Impl->GetOffsets(); } -const TVector& TJsonParser::Parse() { - return Impl->Parse(); +void TJsonParser::Parse() { + Impl->Parse(); } TString TJsonParser::GetDescription() const { return Impl->GetDescription(); } -std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) { - return std::unique_ptr(new TJsonParser(columns, types, batchSize, batchCreationTimeout)); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, 
TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) { + return std::unique_ptr(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index 0b5b74de8642..cfb80395d247 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -1,14 +1,17 @@ #pragma once -#include - #include +#include + namespace NFq { class TJsonParser { public: - TJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout); + using TCallback = std::function>& parsedValues)>; + +public: + TJsonParser(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize); ~TJsonParser(); bool IsReady() const; @@ -17,7 +20,7 @@ class TJsonParser { const TVector& GetOffsets() const; void AddMessages(const TVector& messages); - const TVector& Parse(); + void Parse(); TString GetDescription() const; @@ -26,6 +29,6 @@ class TJsonParser { const std::unique_ptr Impl; }; -std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 841ed7cf2d31..885691a52d2e 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -208,7 +208,7 @@ class TTopicSession : public TActorBootstrapped { void SubscribeOnNextEvent(); void SendToParsing(const TVector& messages); void 
DoParsing(bool force = false); - void DoFiltering(const TVector& offsets, const TVector& parsedValues); + void DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector>& parsedValues); void SendData(TClientsInfo& info); void UpdateParser(); void FatalError(const TString& message, const std::unique_ptr* filter, bool addParserDescription); @@ -235,7 +235,7 @@ class TTopicSession : public TActorBootstrapped { void SendStatistic(); void SendSessionError(NActors::TActorId readActorId, const TString& message); - TVector RebuildJson(const TClientsInfo& info, const TVector& parsedValues); + TVector*> RebuildJson(const TClientsInfo& info, const TVector>& parsedValues); void UpdateParserSchema(const TParserInputType& inputType); void UpdateFieldsIds(TClientsInfo& clientInfo); bool CheckNewClient(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev); @@ -422,8 +422,8 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) { CreateTopicSession(); } -TVector TTopicSession::RebuildJson(const TClientsInfo& info, const TVector& parsedValues) { - TVector result; +TVector*> TTopicSession::RebuildJson(const TClientsInfo& info, const TVector>& parsedValues) { + TVector*> result; const auto& offsets = ParserSchema.FieldsMap; result.reserve(info.FieldsIds.size()); for (auto fieldId : info.FieldsIds) { @@ -623,16 +623,18 @@ void TTopicSession::DoParsing(bool force) { LOG_ROW_DISPATCHER_TRACE("SendToParsing, first offset: " << Parser->GetOffsets().front() << ", number values in buffer " << Parser->GetOffsets().size()); try { - const auto& parsedValues = Parser->Parse(); - DoFiltering(Parser->GetOffsets(), parsedValues); + Parser->Parse(); } catch (const std::exception& e) { FatalError(e.what(), nullptr, true); } } -void TTopicSession::DoFiltering(const TVector& offsets, const TVector& parsedValues) { +void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector>& parsedValues) { + const auto& offsets = Parser->GetOffsets(); + Y_ENSURE(rowsOffset < 
offsets.size(), "Invalid first row ofset"); + Y_ENSURE(numberRows, "Expected non empty parsed batch"); Y_ENSURE(parsedValues, "Expected non empty schema"); - LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets.front() << ", last offset: " << offsets.back()); + LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets[rowsOffset] << ", last offset: " << offsets[rowsOffset + numberRows - 1]); for (auto& [actorId, info] : Clients) { try { @@ -866,7 +868,9 @@ void TTopicSession::UpdateParser() { LOG_ROW_DISPATCHER_TRACE("Init JsonParser with columns: " << JoinSeq(',', names)); const auto& parserConfig = Config.GetJsonParser(); - Parser = NewJsonParser(names, types, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs())); + Parser = NewJsonParser(names, types, [this](ui64 rowsOffset, ui64 numberRows, const TVector>& parsedValues) { + DoFiltering(rowsOffset, numberRows, parsedValues); + }, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()), parserConfig.GetStaticBufferSize()); } catch (const NYql::NPureCalc::TCompileError& e) { FatalError(e.GetIssues(), nullptr, true); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 8f975e004463..b0c26ed30ffa 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -69,7 +69,11 @@ class TFixture : public NUnitTest::TBaseFixture { {.EnabledLLVM = false}); } - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function valueCreator) { + void Push(const TVector& offsets, const TVector*>& values) { + Filter->Push(offsets, values, 0, values.size()); + } + + const TVector* MakeVector(size_t size, std::function valueCreator) { with_lock (Alloc) { Holders.emplace_front(); for (size_t i = 0; i < size; ++i) { @@ -81,21 +85,21 @@ class TFixture : 
public NUnitTest::TBaseFixture { } template - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(const TVector& values, bool optional = false) { + const TVector* MakeVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod unboxedValue = NYql::NUdf::TUnboxedValuePod(values[i]); return optional ? unboxedValue.MakeOptional() : unboxedValue; }); } - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeStringVector(const TVector& values, bool optional = false) { + const TVector* MakeStringVector(const TVector& values, bool optional = false) { return MakeVector(values.size(), [&](size_t i) { NYql::NUdf::TUnboxedValuePod stringValue = NKikimr::NMiniKQL::MakeString(values[i]); return optional ? stringValue.MakeOptional() : stringValue; }); } - const NKikimr::NMiniKQL::TUnboxedValueVector* MakeEmptyVector(size_t size) { + const TVector* MakeEmptyVector(size_t size) { return MakeVector(size, [&](size_t) { return NYql::NUdf::TUnboxedValuePod(); }); @@ -107,7 +111,7 @@ class TFixture : public NUnitTest::TBaseFixture { std::unique_ptr Filter; NKikimr::NMiniKQL::TScopedAlloc Alloc; - TList Holders; + TList> Holders; }; Y_UNIT_TEST_SUITE(TJsonFilterTests) { @@ -120,8 +124,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {MakeStringVector({"hello1"}), MakeVector({99}), MakeStringVector({"zapuskaem"}, true)}); - Filter->Push({6}, {MakeStringVector({"hello2"}), MakeVector({101}), MakeStringVector({"gusya"}, true)}); + Push({5}, {MakeStringVector({"hello1"}), MakeVector({99}), MakeStringVector({"zapuskaem"}, true)}); + Push({6}, {MakeStringVector({"hello2"}), MakeVector({101}), MakeStringVector({"gusya"}, true)}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101,"a@3":"gusya"})", result[6]); } @@ -135,8 +139,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const 
TString& json) { result[offset] = json; }); - Filter->Push({5}, {MakeVector({99}), MakeStringVector({"hello1"})}); - Filter->Push({6}, {MakeVector({101}), MakeStringVector({"hello2"})}); + Push({5}, {MakeVector({99}), MakeStringVector({"hello1"})}); + Push({6}, {MakeVector({101}), MakeStringVector({"hello2"})}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } @@ -152,7 +156,7 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { }); const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; for (ui64 i = 0; i < 5; ++i) { - Filter->Push({2 * i, 2 * i + 1}, {MakeStringVector({"hello1", "hello2"}), MakeVector({99, 101}), MakeStringVector({largeString, largeString})}); + Push({2 * i, 2 * i + 1}, {MakeStringVector({"hello1", "hello2"}), MakeVector({99, 101}), MakeStringVector({largeString, largeString})}); UNIT_ASSERT_VALUES_EQUAL_C(i + 1, result.size(), i); UNIT_ASSERT_VALUES_EQUAL_C(TStringBuilder() << "{\"a1\":\"hello2\",\"a2\":101,\"a3\":\"" << largeString << "\"}", result[2 * i + 1], i); } @@ -167,10 +171,24 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {MakeEmptyVector(1), MakeStringVector({"str"})}); + Push({5}, {MakeEmptyVector(1), MakeStringVector({"str"})}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null,"a2":"str"})", result[5]); } + + Y_UNIT_TEST_F(PartialPush, TFixture) { + TMap result; + MakeFilter( + {"a1", "a2", "a@3"}, + {"[DataType; String]", "[DataType; Uint64]", "[OptionalType; [DataType; String]]"}, + "where a2 > 50", + [&](ui64 offset, const TString& json) { + result[offset] = json; + }); + Filter->Push({5, 6, 7}, {MakeStringVector({"hello1", "hello2"}), MakeVector({99, 101}), MakeStringVector({"zapuskaem", "gusya"}, true)}, 1, 1); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello1","a2":99,"a@3":"zapuskaem"})", result[6]); + 
} } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index a3859130962f..48caef27767d 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -46,20 +46,25 @@ class TFixture : public NUnitTest::TBaseFixture { } } + void MakeParser(TVector columns, TVector types, TJsonParser::TCallback callback, ui64 staticBufferSize = 1000) { + Parser = NFq::NewJsonParser(columns, types, callback, 0, TDuration::Zero(), staticBufferSize); + } + + void MakeParser(TVector columns, TJsonParser::TCallback callback) { + MakeParser(columns, TVector(columns.size(), "[DataType; String]"), callback); + } + void MakeParser(TVector columns, TVector types) { - Parser = NFq::NewJsonParser(columns, types, 0, TDuration::Zero()); + MakeParser(columns, types, [](ui64, ui64, const TVector>&) {}); } void MakeParser(TVector columns) { - MakeParser(columns, TVector(columns.size(), "[DataType; String]")); + MakeParser(columns, TVector(columns.size(), "[DataType; String]"), [](ui64, ui64, const TVector>&) {}); } - const TVector& PushToParser(ui64 offset, const TString& data) { + void PushToParser(ui64 offset, const TString& data) { Parser->AddMessages({GetMessage(offset, data)}); - - const auto& parsedValues = Parser->Parse(); - ResultNumberValues = parsedValues ? 
parsedValues.front().size() : 0; - return parsedValues; + Parser->Parse(); } static NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage GetMessage(ui64 offset, const TString& data) { @@ -70,118 +75,137 @@ class TFixture : public NUnitTest::TBaseFixture { TActorSystemStub actorSystemStub; NActors::TTestActorRuntime Runtime; std::unique_ptr Parser; - ui64 ResultNumberValues = 0; }; Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple1, TFixture) { - MakeParser({"a1", "a2"}, {"[DataType; String]", "[OptionalType; [DataType; Uint64]]"}); - const auto& result = PushToParser(42,R"({"a1": "hello1", "a2": 101, "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get()); + MakeParser({"a1", "a2"}, {"[DataType; String]", "[OptionalType; [DataType; Uint64]]"}, [](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get()); + }); + PushToParser(42,R"({"a1": "hello1", "a2": 101, "event": "event1"})"); } Y_UNIT_TEST_F(Simple2, TFixture) { - MakeParser({"a2", "a1"}); - const auto& result = PushToParser(42,R"({"a1": "hello1", "a2": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + MakeParser({"a2", "a1"}, [](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + 
UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + }); + PushToParser(42,R"({"a1": "hello1", "a2": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple3, TFixture) { - MakeParser({"a1", "a2"}); - const auto& result = PushToParser(42,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + MakeParser({"a1", "a2"}, [](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + }); + PushToParser(42,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } Y_UNIT_TEST_F(Simple4, TFixture) { - MakeParser({"a2", "a1"}); - const auto& result = PushToParser(42, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1][0].AsStringRef())); + MakeParser({"a2", "a1"}, [](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1][0].AsStringRef())); + }); + PushToParser(42, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); } 
Y_UNIT_TEST_F(LargeStrings, TFixture) { - MakeParser({"col"}); - const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; + + MakeParser({"col"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(2, numberRows); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][1].AsStringRef())); + }); + const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; Parser->AddMessages({ GetMessage(42, jsonString), GetMessage(43, jsonString) }); - - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][1].AsStringRef())); + Parser->Parse(); } Y_UNIT_TEST_F(ManyValues, TFixture) { - MakeParser({"a1", "a2"}); + MakeParser({"a1", "a2"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(3, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + for (size_t i = 0; i < numberRows; ++i) { + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i); + UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i); + } + }); Parser->AddMessages({ GetMessage(42, R"({"a1": "hello1", "a2": "101", "event": "event1"})"), GetMessage(43, R"({"a1": "hello1", "a2": "101", "event": "event2"})"), GetMessage(44, R"({"a2": "101", "a1": "hello1", "event": "event3"})") }); - - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - for (size_t i = 0; 
i < ResultNumberValues; ++i) { - UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i); - UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i); - } + Parser->Parse(); } Y_UNIT_TEST_F(MissingFields, TFixture) { - MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; String]]", "[OptionalType; [DataType; Uint64]]"}); + MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; String]]", "[OptionalType; [DataType; Uint64]]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(3, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + for (size_t i = 0; i < numberRows; ++i) { + if (i == 2) { + UNIT_ASSERT_C(!result[0][i], i); + } else { + NYql::NUdf::TUnboxedValue value = result[0][i].GetOptionalValue(); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(value.AsStringRef()), i); + } + if (i == 1) { + UNIT_ASSERT_C(!result[1][i], i); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(101, result[1][i].GetOptionalValue().Get(), i); + } + } + }); Parser->AddMessages({ GetMessage(42, R"({"a1": "hello1", "a2": 101 , "event": "event1"})"), GetMessage(43, R"({"a1": "hello1", "event": "event2"})"), GetMessage(44, R"({"a2": "101", "a1": null, "event": "event3"})") }); - - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - for (size_t i = 0; i < ResultNumberValues; ++i) { - if (i == 2) { - UNIT_ASSERT_C(!result[0][i], i); - } else { - NYql::NUdf::TUnboxedValue value = result[0][i].GetOptionalValue(); - UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(value.AsStringRef()), i); - } - if (i == 1) { - UNIT_ASSERT_C(!result[1][i], i); - } else { - UNIT_ASSERT_VALUES_EQUAL_C(101, result[1][i].GetOptionalValue().Get(), i); - } - } + Parser->Parse(); } Y_UNIT_TEST_F(NestedTypes, TFixture) { - MakeParser({"nested", "a1"}, {"[OptionalType; [DataType; 
Json]]", "[DataType; String]"}); + MakeParser({"nested", "a1"}, {"[OptionalType; [DataType; Json]]", "[DataType; String]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(4, numberRows); + + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + + UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef())); + + UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0][2].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1][2].AsStringRef())); + + UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0][3].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1][3].AsStringRef())); + }); Parser->AddMessages({ GetMessage(42, R"({"a1": "hello1", "nested": {"key": "value"}})"), @@ -189,39 +213,45 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { GetMessage(43, R"({"a1": "hello3", "nested": "some string"})"), GetMessage(43, R"({"a1": "hello4", "nested": 123456})") }); - - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(4, ResultNumberValues); - - UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); - - UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef())); - - UNIT_ASSERT_VALUES_EQUAL("\"some string\"", TString(result[0][2].AsStringRef())); - UNIT_ASSERT_VALUES_EQUAL("hello3", TString(result[1][2].AsStringRef())); - - UNIT_ASSERT_VALUES_EQUAL("123456", TString(result[0][3].AsStringRef())); 
- UNIT_ASSERT_VALUES_EQUAL("hello4", TString(result[1][3].AsStringRef())); + Parser->Parse(); } Y_UNIT_TEST_F(SimpleBooleans, TFixture) { - MakeParser({"a"}, {"[DataType; Bool]"}); + MakeParser({"a"}, {"[DataType; Bool]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(3, numberRows); + + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); + }); + Parser->AddMessages({ GetMessage(42, R"({"a": true})"), GetMessage(43, R"({"a": false})") }); + Parser->Parse(); + } + + Y_UNIT_TEST_F(ManyBatches, TFixture) { + const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; - const auto& result = Parser->Parse(); - ResultNumberValues = result.front().size(); - UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); + ui64 currentOffset = 0; + MakeParser({"col"}, {"[DataType; String]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(currentOffset, rowsOffset); + currentOffset++; - UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); - UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + }, 1); + + const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; + Parser->AddMessages({ + GetMessage(42, jsonString), + GetMessage(43, jsonString) + }); + Parser->Parse(); } Y_UNIT_TEST_F(MissingFieldsValidation, TFixture) { From b9b6906439ea5e9515770d022f8e726160bba4c4 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 14 Nov 2024 17:58:06 +0000 Subject: [PATCH 2/7] Removed copy of lambsa --- ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 48caef27767d..8d1819065a38 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -59,7 +59,7 @@ class TFixture : public NUnitTest::TBaseFixture { } void MakeParser(TVector columns) { - MakeParser(columns, TVector(columns.size(), "[DataType; String]"), [](ui64, ui64, const TVector>&) {}); + MakeParser(columns, TVector(columns.size(), "[DataType; String]")); } void PushToParser(ui64 offset, const TString& data) { From 9da66ef411f77a2b26de92d425d653cb8c8c1bff Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 14 Nov 2024 18:10:59 +0000 Subject: [PATCH 3/7] Fixed build --- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 885691a52d2e..3b3c6ca00afe 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -639,7 +639,7 @@ void TTopicSession::DoFiltering(ui64 rowsOffset, ui64 numberRows, const TVector< for (auto& [actorId, info] : Clients) { try { if (info.Filter) { - info.Filter->Push(offsets, RebuildJson(info, parsedValues)); + info.Filter->Push(offsets, RebuildJson(info, parsedValues), rowsOffset, numberRows); } } catch (const std::exception& e) { FatalError(e.what(), &info.Filter, false); From c9f848cd073676f67429ca7eccf6c84a7f195996 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 14 Nov 2024 18:45:07 +0000 Subject: [PATCH 4/7] Fixed typos --- ydb/core/fq/libs/config/protos/row_dispatcher.proto | 2 +- ydb/core/fq/libs/row_dispatcher/json_filter.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto 
b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index feb3c830b614..ca299df00edd 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -17,7 +17,7 @@ message TRowDispatcherCoordinatorConfig { message TJsonParserConfig { uint64 BatchSizeBytes = 1; uint64 BatchCreationTimeoutMs = 2; - uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6 ~ 24 MiB + uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6 } message TRowDispatcherConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index eef834281818..597b1e27a442 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -74,7 +74,7 @@ NYT::TNode MakeOutputSchema() { struct TInputType { const TVector& Offsets; const TVector*>& Values; - const ui64 RowsOffset; // ofset of first value + const ui64 RowsOffset; // offset of first value const ui64 NumberRows; ui64 GetOffset(ui64 rowId) const { From 6710cfa8a1d3637673bfd9843d8aa85c5d7a1351 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Fri, 15 Nov 2024 09:49:19 +0000 Subject: [PATCH 5/7] Fixed tests 2 --- .../libs/config/protos/row_dispatcher.proto | 4 +- .../fq/libs/row_dispatcher/json_filter.cpp | 12 ++-- .../fq/libs/row_dispatcher/json_parser.cpp | 62 +++++++++---------- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 6 +- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 31 ++++++++-- 5 files changed, 71 insertions(+), 44 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index ca299df00edd..900fa2de9308 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -15,8 +15,8 @@ message TRowDispatcherCoordinatorConfig { } message TJsonParserConfig { - uint64 
BatchSizeBytes = 1; - uint64 BatchCreationTimeoutMs = 2; + uint64 BatchSizeBytes = 1; // default 1 MiB + uint64 BatchCreationTimeoutMs = 2; // default 1 second uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6 } diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 597b1e27a442..2977f6d03bb6 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -139,6 +139,13 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer { NKikimr::NMiniKQL::TThrowingBindTerminator bind; with_lock (Worker->GetScopedAlloc()) { + Y_DEFER { + // Clear cache after each object because + // values allocated on another allocator and should be released + Cache.Clear(); + Worker->GetGraph().Invalidate(); + }; + auto& holderFactory = Worker->GetGraph().GetHolderFactory(); // TODO: use blocks here @@ -159,11 +166,6 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer { Worker->Push(std::move(result)); } - - // Clear cache after each object because - // values allocated on another allocator and should be released - Cache.Clear(); - Worker->GetGraph().Invalidate(); } } diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index cfec2f4540bb..d8cfd210365f 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -19,7 +19,9 @@ namespace { TString LogPrefix = "JsonParser: "; +constexpr ui64 DEFAULT_BATCH_SIZE = 1_MB; constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000; +constexpr TDuration DEFAULT_BATCH_CREATION_TIMEOUT = TDuration::Seconds(1); struct TJsonParserBuffer { size_t NumberValues = 0; @@ -40,20 +42,11 @@ struct TJsonParserBuffer { Offsets.reserve(numberValues); } - void AddMessages(const TVector& messages) { + void AddMessage(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage& 
message) { Y_ENSURE(!Finished, "Cannot add messages into finished buffer"); - - size_t messagesSize = 0; - for (const auto& message : messages) { - messagesSize += message.GetData().size(); - } - - NumberValues += messages.size(); - Reserve(Values.size() + messagesSize, NumberValues); - for (const auto& message : messages) { - Values << message.GetData(); - Offsets.emplace_back(message.GetOffset()); - } + NumberValues++; + Values << message.GetData(); + Offsets.emplace_back(message.GetOffset()); } std::pair Finish() { @@ -102,8 +95,8 @@ class TColumnParser { } void ParseJsonValue(ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { - Parser(jsonValue, resultValue); ParsedRows.emplace_back(rowId); + Parser(jsonValue, resultValue); } void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const { @@ -280,9 +273,9 @@ class TJsonParser::TImpl { TImpl(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) , TypeEnv(std::make_unique(Alloc)) - , BatchSize(batchSize) + , BatchSize(batchSize ? batchSize : DEFAULT_BATCH_SIZE) , MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1) - , BatchCreationTimeout(batchCreationTimeout) + , BatchCreationTimeout(batchCreationTimeout ? 
batchCreationTimeout : DEFAULT_BATCH_CREATION_TIMEOUT) , ParseCallback(parseCallback) , ParsedValues(columns.size()) { @@ -330,14 +323,13 @@ class TJsonParser::TImpl { } void AddMessages(const TVector& messages) { - if (messages.empty()) { - return; - } - - if (Buffer.Finished) { - Buffer.Clear(); + Y_ENSURE(!Buffer.Finished, "Cannot add messages into finished buffer"); + for (const auto& message : messages) { + Buffer.AddMessage(message); + if (IsReady()) { + Parse(); + } } - Buffer.AddMessages(messages); } void Parse() { @@ -347,13 +339,18 @@ class TJsonParser::TImpl { LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); with_lock (Alloc) { - const ui64 firstOffset = Buffer.Offsets.front(); + Y_DEFER { + // Clear all UV in case of exception + ClearColumns(); + Buffer.Clear(); + }; + size_t rowId = 0; size_t parsedRows = 0; simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE); for (auto document : documents) { if (Y_UNLIKELY(parsedRows >= Buffer.NumberValues)) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << parsedRows + 1; + throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << parsedRows + 1; } for (auto item : document.get_object()) { const auto it = ColumnsIndex.find(item.escaped_key().value()); @@ -372,18 +369,17 @@ class TJsonParser::TImpl { rowId++; parsedRows++; - if (rowId == MaxNumberRows) { - ClearColumns(parsedRows, MaxNumberRows); + FlushColumns(parsedRows, MaxNumberRows); rowId = 0; } } - if (parsedRows != Buffer.NumberValues) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId; + if (Y_UNLIKELY(parsedRows != Buffer.NumberValues)) { + throw yexception() << "Failed to 
parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId; } if (rowId) { - ClearColumns(parsedRows, rowId); + FlushColumns(parsedRows, rowId); } } } @@ -406,7 +402,7 @@ class TJsonParser::TImpl { } private: - void ClearColumns(size_t parsedRows, size_t savedRows) { + void FlushColumns(size_t parsedRows, size_t savedRows) { const ui64 firstOffset = Buffer.Offsets.front(); for (const auto& column : Columns) { column.ValidateNumberValues(savedRows, firstOffset); @@ -417,6 +413,10 @@ class TJsonParser::TImpl { ParseCallback(parsedRows - savedRows, savedRows, ParsedValues); } + ClearColumns(); + } + + void ClearColumns() { for (size_t i = 0; i < Columns.size(); ++i) { auto& parsedColumn = ParsedValues[i]; for (size_t rowId : Columns[i].ParsedRows) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index b0c26ed30ffa..b360d159d5ca 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -26,7 +26,9 @@ class TFixture : public NUnitTest::TBaseFixture { : PureCalcProgramFactory(CreatePureCalcProgramFactory()) , Runtime(true) , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) - {} + { + Alloc.Ref().UseRefLocking = true; + } static void SegmentationFaultHandler(int) { Cerr << "segmentation fault call stack:" << Endl; @@ -70,7 +72,7 @@ class TFixture : public NUnitTest::TBaseFixture { } void Push(const TVector& offsets, const TVector*>& values) { - Filter->Push(offsets, values, 0, values.size()); + Filter->Push(offsets, values, 0, values.front()->size()); } const TVector* MakeVector(size_t size, std::function valueCreator) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 8d1819065a38..3ab54adfdd74 100644 --- 
a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -46,8 +46,8 @@ class TFixture : public NUnitTest::TBaseFixture { } } - void MakeParser(TVector columns, TVector types, TJsonParser::TCallback callback, ui64 staticBufferSize = 1000) { - Parser = NFq::NewJsonParser(columns, types, callback, 0, TDuration::Zero(), staticBufferSize); + void MakeParser(TVector columns, TVector types, TJsonParser::TCallback callback, ui64 batchSize = 1_MB, ui64 staticBufferSize = 1000) { + Parser = NFq::NewJsonParser(columns, types, callback, batchSize, TDuration::Hours(1), staticBufferSize); } void MakeParser(TVector columns, TJsonParser::TCallback callback) { @@ -219,7 +219,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(SimpleBooleans, TFixture) { MakeParser({"a"}, {"[DataType; Bool]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); - UNIT_ASSERT_VALUES_EQUAL(3, numberRows); + UNIT_ASSERT_VALUES_EQUAL(2, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); @@ -244,7 +244,7 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { UNIT_ASSERT_VALUES_EQUAL(1, numberRows); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); - }, 1); + }, 1_MB, 1); const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; Parser->AddMessages({ @@ -254,6 +254,29 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Parser->Parse(); } + Y_UNIT_TEST_F(LittleBatches, TFixture) { + const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; + + ui64 currentOffset = 42; + MakeParser({"col"}, {"[DataType; String]"}, [&](ui64 rowsOffset, ui64 numberRows, const TVector>& result) { + UNIT_ASSERT_VALUES_EQUAL(Parser->GetOffsets().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(Parser->GetOffsets().front(), currentOffset); + currentOffset++; + + 
UNIT_ASSERT_VALUES_EQUAL(0, rowsOffset); + UNIT_ASSERT_VALUES_EQUAL(1, numberRows); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + }, 10); + + const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; + Parser->AddMessages({ + GetMessage(42, jsonString), + GetMessage(43, jsonString) + }); + UNIT_ASSERT_VALUES_EQUAL(Parser->GetNumberValues(), 0); + } + Y_UNIT_TEST_F(MissingFieldsValidation, TFixture) { MakeParser({"a1", "a2"}, {"[DataType; String]", "[DataType; Uint64]"}); UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": "hello1", "a2": null, "event": "event1"})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [DataType; Uint64], description: (yexception) found unexpected null value, expected non optional data type Uint64"); From dacba22371b5db6e60d2345c7c0f08d1dfabba81 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Fri, 15 Nov 2024 10:00:37 +0000 Subject: [PATCH 6/7] Fixed unit tests 3 --- ydb/core/fq/libs/config/protos/row_dispatcher.proto | 2 +- ydb/core/fq/libs/row_dispatcher/json_parser.cpp | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 900fa2de9308..955944704723 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -16,7 +16,7 @@ message TRowDispatcherCoordinatorConfig { message TJsonParserConfig { uint64 BatchSizeBytes = 1; // default 1 MiB - uint64 BatchCreationTimeoutMs = 2; // default 1 second + uint64 BatchCreationTimeoutMs = 2; uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6 } diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index d8cfd210365f..dd8d669ce207 100644 --- 
a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -21,7 +21,6 @@ TString LogPrefix = "JsonParser: "; constexpr ui64 DEFAULT_BATCH_SIZE = 1_MB; constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000; -constexpr TDuration DEFAULT_BATCH_CREATION_TIMEOUT = TDuration::Seconds(1); struct TJsonParserBuffer { size_t NumberValues = 0; @@ -275,7 +274,7 @@ class TJsonParser::TImpl { , TypeEnv(std::make_unique(Alloc)) , BatchSize(batchSize ? batchSize : DEFAULT_BATCH_SIZE) , MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1) - , BatchCreationTimeout(batchCreationTimeout ? batchCreationTimeout : DEFAULT_BATCH_CREATION_TIMEOUT) + , BatchCreationTimeout(batchCreationTimeout) , ParseCallback(parseCallback) , ParsedValues(columns.size()) { @@ -326,7 +325,7 @@ class TJsonParser::TImpl { Y_ENSURE(!Buffer.Finished, "Cannot add messages into finished buffer"); for (const auto& message : messages) { Buffer.AddMessage(message); - if (IsReady()) { + if (Buffer.IsReady() && Buffer.GetSize() >= BatchSize) { Parse(); } } From d05bdbde780398d394d6969fd8cb791520cbfa87 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Sun, 17 Nov 2024 15:47:05 +0000 Subject: [PATCH 7/7] Refactored StaticBufferSize -> BufferCellCount --- .../fq/libs/config/protos/row_dispatcher.proto | 2 +- ydb/core/fq/libs/row_dispatcher/json_parser.cpp | 14 +++++++------- ydb/core/fq/libs/row_dispatcher/json_parser.h | 4 ++-- ydb/core/fq/libs/row_dispatcher/topic_session.cpp | 2 +- .../fq/libs/row_dispatcher/ut/json_parser_ut.cpp | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/ydb/core/fq/libs/config/protos/row_dispatcher.proto b/ydb/core/fq/libs/config/protos/row_dispatcher.proto index 955944704723..e4f5d180cb7a 100644 --- a/ydb/core/fq/libs/config/protos/row_dispatcher.proto +++ b/ydb/core/fq/libs/config/protos/row_dispatcher.proto @@ -17,7 +17,7 @@ message 
TRowDispatcherCoordinatorConfig { message TJsonParserConfig { uint64 BatchSizeBytes = 1; // default 1 MiB uint64 BatchCreationTimeoutMs = 2; - uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6 + uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6 } message TRowDispatcherConfig { diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index dd8d669ce207..41428a939a99 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -20,7 +20,7 @@ namespace { TString LogPrefix = "JsonParser: "; constexpr ui64 DEFAULT_BATCH_SIZE = 1_MB; -constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000; +constexpr ui64 DEFAULT_BUFFER_CELL_COUNT = 1000000; struct TJsonParserBuffer { size_t NumberValues = 0; @@ -269,11 +269,11 @@ namespace NFq { class TJsonParser::TImpl { public: - TImpl(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) + TImpl(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount) : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) , TypeEnv(std::make_unique(Alloc)) , BatchSize(batchSize ? batchSize : DEFAULT_BATCH_SIZE) - , MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1) + , MaxNumberRows(((bufferCellCount ? 
bufferCellCount : DEFAULT_BUFFER_CELL_COUNT) - 1) / columns.size() + 1) , BatchCreationTimeout(batchCreationTimeout) , ParseCallback(parseCallback) , ParsedValues(columns.size()) @@ -444,8 +444,8 @@ class TJsonParser::TImpl { TVector> ParsedValues; }; -TJsonParser::TJsonParser(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) - : Impl(std::make_unique(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize)) +TJsonParser::TJsonParser(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount) + : Impl(std::make_unique(columns, types, parseCallback, batchSize, batchCreationTimeout, bufferCellCount)) {} TJsonParser::~TJsonParser() { @@ -479,8 +479,8 @@ TString TJsonParser::GetDescription() const { return Impl->GetDescription(); } -std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) { - return std::unique_ptr(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize)); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount) { + return std::unique_ptr(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, bufferCellCount)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index cfb80395d247..77c6cccf9f6e 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -11,7 +11,7 @@ class TJsonParser { using TCallback = std::function>& parsedValues)>; public: - TJsonParser(const TVector& columns, const TVector& types, TCallback 
parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize); + TJsonParser(const TVector& columns, const TVector& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount); ~TJsonParser(); bool IsReady() const; @@ -29,6 +29,6 @@ class TJsonParser { const std::unique_ptr Impl; }; -std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 3b3c6ca00afe..32a2d36871b8 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -870,7 +870,7 @@ void TTopicSession::UpdateParser() { const auto& parserConfig = Config.GetJsonParser(); Parser = NewJsonParser(names, types, [this](ui64 rowsOffset, ui64 numberRows, const TVector>& parsedValues) { DoFiltering(rowsOffset, numberRows, parsedValues); - }, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()), parserConfig.GetStaticBufferSize()); + }, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()), parserConfig.GetBufferCellCount()); } catch (const NYql::NPureCalc::TCompileError& e) { FatalError(e.GetIssues(), nullptr, true); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 3ab54adfdd74..c01ce5ae7601 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -46,8 +46,8 @@ class TFixture : public 
NUnitTest::TBaseFixture { } } - void MakeParser(TVector columns, TVector types, TJsonParser::TCallback callback, ui64 batchSize = 1_MB, ui64 staticBufferSize = 1000) { - Parser = NFq::NewJsonParser(columns, types, callback, batchSize, TDuration::Hours(1), staticBufferSize); + void MakeParser(TVector columns, TVector types, TJsonParser::TCallback callback, ui64 batchSize = 1_MB, ui64 bufferCellCount = 1000) { + Parser = NFq::NewJsonParser(columns, types, callback, batchSize, TDuration::Hours(1), bufferCellCount); } void MakeParser(TVector columns, TJsonParser::TCallback callback) {