Skip to content

Commit 4323173

Browse files
committed
Added limit for parser bufer size
1 parent 4239317 commit 4323173

File tree

8 files changed

+266
-179
lines changed

8 files changed

+266
-179
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ message TRowDispatcherCoordinatorConfig {
1717
message TJsonParserConfig {
1818
uint64 BatchSizeBytes = 1;
1919
uint64 BatchCreationTimeoutMs = 2;
20+
uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6 ~ 24 MiB
2021
}
2122

2223
message TRowDispatcherConfig {

ydb/core/fq/libs/row_dispatcher/json_filter.cpp

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ NYT::TNode MakeOutputSchema() {
7171
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
7272
}
7373

74+
struct TInputType {
75+
const TVector<ui64>& Offsets;
76+
const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values;
77+
const ui64 RowsOffset; // ofset of first value
78+
const ui64 NumberRows;
79+
80+
ui64 GetOffset(ui64 rowId) const {
81+
return Offsets[rowId + RowsOffset];
82+
}
83+
};
84+
7485
class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
7586
public:
7687
TFilterInputSpec(const NYT::TNode& schema)
@@ -85,7 +96,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
8596
TVector<NYT::TNode> Schemas;
8697
};
8798

88-
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
99+
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {
89100
public:
90101
TFilterInputConsumer(
91102
const TFilterInputSpec& spec,
@@ -123,26 +134,26 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
123134
}
124135
}
125136

126-
void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
127-
Y_ENSURE(FieldsPositions.size() == values.second.size());
137+
void OnObject(TInputType input) override {
138+
Y_ENSURE(FieldsPositions.size() == input.Values.size());
128139

129140
NKikimr::NMiniKQL::TThrowingBindTerminator bind;
130141
with_lock (Worker->GetScopedAlloc()) {
131142
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
132143

133144
// TODO: use blocks here
134-
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
145+
for (size_t rowId = 0; rowId < input.NumberRows; ++rowId) {
135146
NYql::NUdf::TUnboxedValue* items = nullptr;
136147

137148
NYql::NUdf::TUnboxedValue result = Cache.NewArray(
138149
holderFactory,
139-
static_cast<ui32>(values.second.size() + 1),
150+
static_cast<ui32>(input.Values.size() + 1),
140151
items);
141152

142-
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first[rowId]);
153+
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(input.GetOffset(rowId));
143154

144155
size_t fieldId = 0;
145-
for (const auto& column : values.second) {
156+
for (const auto column : input.Values) {
146157
items[FieldsPositions[fieldId++]] = column->at(rowId);
147158
}
148159

@@ -236,7 +247,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
236247
static constexpr bool IsPartial = false;
237248
static constexpr bool SupportPushStreamMode = true;
238249

239-
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;
250+
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<TInputType>>;
240251

241252
static TConsumerType MakeConsumer(
242253
const TFilterInputSpec& spec,
@@ -282,9 +293,9 @@ class TJsonFilter::TImpl {
282293
LOG_ROW_DISPATCHER_DEBUG("Program created");
283294
}
284295

285-
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
296+
void Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows) {
286297
Y_ENSURE(values, "Expected non empty schema");
287-
InputConsumer->OnObject(std::make_pair(offsets, values));
298+
InputConsumer->OnObject({.Offsets = offsets, .Values = values, .RowsOffset = rowsOffset, .NumberRows = numberRows});
288299
}
289300

290301
TString GetSql() const {
@@ -305,7 +316,7 @@ class TJsonFilter::TImpl {
305316

306317
private:
307318
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
308-
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
319+
THolder<NYql::NPureCalc::IConsumer<TInputType>> InputConsumer;
309320
const TString Sql;
310321
};
311322

@@ -322,8 +333,8 @@ TJsonFilter::TJsonFilter(
322333
TJsonFilter::~TJsonFilter() {
323334
}
324335

325-
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
326-
Impl->Push(offsets, values);
336+
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows) {
337+
Impl->Push(offsets, values, rowsOffset, numberRows);
327338
}
328339

329340
TString TJsonFilter::GetSql() {

ydb/core/fq/libs/row_dispatcher/json_filter.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
#include "common.h"
44

5-
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
6-
#include <yql/essentials/public/udf/udf_data_type.h>
75
#include <yql/essentials/public/udf/udf_value.h>
86

97
namespace NFq {
@@ -23,7 +21,7 @@ class TJsonFilter {
2321

2422
~TJsonFilter();
2523

26-
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
24+
void Push(const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 rowsOffset, ui64 numberRows);
2725
TString GetSql();
2826

2927
private:

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

Lines changed: 60 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
#include <yql/essentials/minikql/dom/json.h>
66
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
7+
#include <yql/essentials/minikql/mkql_function_registry.h>
78
#include <yql/essentials/minikql/mkql_node_cast.h>
89
#include <yql/essentials/minikql/mkql_program_builder.h>
910
#include <yql/essentials/minikql/mkql_string_util.h>
11+
#include <yql/essentials/minikql/mkql_type_ops.h>
1012
#include <yql/essentials/providers/common/schema/mkql/yql_mkql_schema.h>
1113

1214
#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>
@@ -17,6 +19,8 @@ namespace {
1719

1820
TString LogPrefix = "JsonParser: ";
1921

22+
constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000;
23+
2024
struct TJsonParserBuffer {
2125
size_t NumberValues = 0;
2226
bool Finished = false;
@@ -80,31 +84,31 @@ class TColumnParser {
8084
const TString TypeYson;
8185
const NKikimr::NMiniKQL::TType* TypeMkql;
8286
const bool IsOptional = false;
83-
size_t NumberValues = 0;
87+
TVector<size_t> ParsedRows;
8488

8589
public:
86-
TColumnParser(const TString& name, const TString& typeYson, NKikimr::NMiniKQL::TProgramBuilder& programBuilder)
90+
TColumnParser(const TString& name, const TString& typeYson, ui64 maxNumberRows, NKikimr::NMiniKQL::TProgramBuilder& programBuilder)
8791
: Name(name)
8892
, TypeYson(typeYson)
8993
, TypeMkql(NYql::NCommon::ParseTypeFromYson(TStringBuf(typeYson), programBuilder, Cerr))
9094
, IsOptional(TypeMkql->IsOptional())
91-
, NumberValues(0)
9295
{
96+
ParsedRows.reserve(maxNumberRows);
9397
try {
9498
Parser = CreateParser(TypeMkql);
9599
} catch (...) {
96100
throw yexception() << "Failed to create parser for column '" << Name << "' with type " << TypeYson << ", description: " << CurrentExceptionMessage();
97101
}
98102
}
99103

100-
void ParseJsonValue(simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
104+
void ParseJsonValue(ui64 rowId, simdjson::builtin::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) {
101105
Parser(jsonValue, resultValue);
102-
NumberValues++;
106+
ParsedRows.emplace_back(rowId);
103107
}
104108

105109
void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const {
106-
if (Y_UNLIKELY(!IsOptional && NumberValues < expectedNumberValues)) {
107-
throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - NumberValues << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson;
110+
if (Y_UNLIKELY(!IsOptional && ParsedRows.size() < expectedNumberValues)) {
111+
throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - ParsedRows.size() << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson;
108112
}
109113
}
110114

@@ -273,11 +277,13 @@ namespace NFq {
273277

274278
class TJsonParser::TImpl {
275279
public:
276-
TImpl(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout)
280+
TImpl(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize)
277281
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
278282
, TypeEnv(std::make_unique<NKikimr::NMiniKQL::TTypeEnvironment>(Alloc))
279283
, BatchSize(batchSize)
284+
, MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1)
280285
, BatchCreationTimeout(batchCreationTimeout)
286+
, ParseCallback(parseCallback)
281287
, ParsedValues(columns.size())
282288
{
283289
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
@@ -288,7 +294,7 @@ class TJsonParser::TImpl {
288294

289295
Columns.reserve(columns.size());
290296
for (size_t i = 0; i < columns.size(); i++) {
291-
Columns.emplace_back(columns[i], types[i], programBuilder);
297+
Columns.emplace_back(columns[i], types[i], MaxNumberRows, programBuilder);
292298
}
293299
}
294300

@@ -297,7 +303,11 @@ class TJsonParser::TImpl {
297303
ColumnsIndex.emplace(std::string_view(Columns[i].Name), i);
298304
}
299305

300-
Buffer.Reserve(BatchSize, 1);
306+
for (size_t i = 0; i < columns.size(); i++) {
307+
ParsedValues[i].resize(MaxNumberRows);
308+
}
309+
310+
Buffer.Reserve(BatchSize, MaxNumberRows);
301311

302312
LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name());
303313
Parser.threaded = false;
@@ -330,21 +340,20 @@ class TJsonParser::TImpl {
330340
Buffer.AddMessages(messages);
331341
}
332342

333-
const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& Parse() {
343+
void Parse() {
334344
Y_ENSURE(Buffer.IsReady(), "Nothing to parse");
335345

336346
const auto [values, size] = Buffer.Finish();
337347
LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values);
338348

339349
with_lock (Alloc) {
340-
ClearColumns(Buffer.NumberValues);
341-
342350
const ui64 firstOffset = Buffer.Offsets.front();
343351
size_t rowId = 0;
352+
size_t parsedRows = 0;
344353
simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE);
345354
for (auto document : documents) {
346-
if (Y_UNLIKELY(rowId >= Buffer.NumberValues)) {
347-
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId + 1;
355+
if (Y_UNLIKELY(parsedRows >= Buffer.NumberValues)) {
356+
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << parsedRows + 1;
348357
}
349358
for (auto item : document.get_object()) {
350359
const auto it = ColumnsIndex.find(item.escaped_key().value());
@@ -355,23 +364,28 @@ class TJsonParser::TImpl {
355364
const size_t columnId = it->second;
356365
auto& columnParser = Columns[columnId];
357366
try {
358-
columnParser.ParseJsonValue(item.value(), ParsedValues[columnId][rowId]);
367+
columnParser.ParseJsonValue(rowId, item.value(), ParsedValues[columnId][rowId]);
359368
} catch (...) {
360369
throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnParser.Name << "' with type " << columnParser.TypeYson << ", description: " << CurrentExceptionMessage();
361370
}
362371
}
372+
363373
rowId++;
374+
parsedRows++;
375+
376+
if (rowId == MaxNumberRows) {
377+
ClearColumns(parsedRows, MaxNumberRows);
378+
rowId = 0;
379+
}
364380
}
365381

366-
if (rowId != Buffer.NumberValues) {
382+
if (parsedRows != Buffer.NumberValues) {
367383
throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId;
368384
}
369-
for (const auto& columnDesc : Columns) {
370-
columnDesc.ValidateNumberValues(rowId, firstOffset);
385+
if (rowId) {
386+
ClearColumns(parsedRows, rowId);
371387
}
372388
}
373-
374-
return ParsedValues;
375389
}
376390

377391
TString GetDescription() const {
@@ -385,26 +399,32 @@ class TJsonParser::TImpl {
385399

386400
~TImpl() {
387401
with_lock (Alloc) {
388-
ClearColumns(0);
389402
ParsedValues.clear();
390403
Columns.clear();
391404
TypeEnv.reset();
392405
}
393406
}
394407

395408
private:
396-
void ClearColumns(size_t newSize) {
397-
const auto clearValue = [&allocState = Alloc.Ref()](NYql::NUdf::TUnboxedValue& value){
398-
value.UnlockRef(1);
399-
value.Clear();
400-
};
409+
void ClearColumns(size_t parsedRows, size_t savedRows) {
410+
const ui64 firstOffset = Buffer.Offsets.front();
411+
for (const auto& column : Columns) {
412+
column.ValidateNumberValues(savedRows, firstOffset);
413+
}
401414

402-
for (size_t i = 0; i < Columns.size(); ++i) {
403-
Columns[i].NumberValues = 0;
415+
{
416+
auto unguard = Unguard(Alloc);
417+
ParseCallback(parsedRows - savedRows, savedRows, ParsedValues);
418+
}
404419

420+
for (size_t i = 0; i < Columns.size(); ++i) {
405421
auto& parsedColumn = ParsedValues[i];
406-
std::for_each(parsedColumn.begin(), parsedColumn.end(), clearValue);
407-
parsedColumn.resize(newSize);
422+
for (size_t rowId : Columns[i].ParsedRows) {
423+
auto& parsedRow = parsedColumn[rowId];
424+
parsedRow.UnlockRef(1);
425+
parsedRow.Clear();
426+
}
427+
Columns[i].ParsedRows.clear();
408428
}
409429
}
410430

@@ -413,18 +433,20 @@ class TJsonParser::TImpl {
413433
std::unique_ptr<NKikimr::NMiniKQL::TTypeEnvironment> TypeEnv;
414434

415435
const ui64 BatchSize;
436+
const ui64 MaxNumberRows;
416437
const TDuration BatchCreationTimeout;
438+
const TCallback ParseCallback;
417439
TVector<TColumnParser> Columns;
418440
absl::flat_hash_map<std::string_view, size_t> ColumnsIndex;
419441

420442
TJsonParserBuffer Buffer;
421443
simdjson::ondemand::parser Parser;
422444

423-
TVector<NKikimr::NMiniKQL::TUnboxedValueVector> ParsedValues;
445+
TVector<TVector<NYql::NUdf::TUnboxedValue>> ParsedValues;
424446
};
425447

426-
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout)
427-
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, batchSize, batchCreationTimeout))
448+
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize)
449+
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize))
428450
{}
429451

430452
TJsonParser::~TJsonParser() {
@@ -450,16 +472,16 @@ const TVector<ui64>& TJsonParser::GetOffsets() const {
450472
return Impl->GetOffsets();
451473
}
452474

453-
const TVector<NKikimr::NMiniKQL::TUnboxedValueVector>& TJsonParser::Parse() {
454-
return Impl->Parse();
475+
void TJsonParser::Parse() {
476+
Impl->Parse();
455477
}
456478

457479
TString TJsonParser::GetDescription() const {
458480
return Impl->GetDescription();
459481
}
460482

461-
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, ui64 batchSize, TDuration batchCreationTimeout) {
462-
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, batchSize, batchCreationTimeout));
483+
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) {
484+
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize));
463485
}
464486

465487
} // namespace NFq

0 commit comments

Comments
 (0)