Skip to content

Commit d05bdbd

Browse files
committed
Refactored StaticBufferSize -> BufferCellCount
1 parent dacba22 commit d05bdbd

File tree

5 files changed

+13
-13
lines changed

5 files changed

+13
-13
lines changed

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ message TRowDispatcherCoordinatorConfig {
1717
message TJsonParserConfig {
1818
uint64 BatchSizeBytes = 1; // default 1 MiB
1919
uint64 BatchCreationTimeoutMs = 2;
20-
uint64 StaticBufferSize = 3; // (number rows) * (number columns) limit, default 10^6
20+
uint64 BufferCellCount = 3; // (number rows) * (number columns) limit, default 10^6
2121
}
2222

2323
message TRowDispatcherConfig {

ydb/core/fq/libs/row_dispatcher/json_parser.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace {
2020
TString LogPrefix = "JsonParser: ";
2121

2222
constexpr ui64 DEFAULT_BATCH_SIZE = 1_MB;
23-
constexpr ui64 DEFAULT_STATIC_BUFFER_SIZE = 1000000;
23+
constexpr ui64 DEFAULT_BUFFER_CELL_COUNT = 1000000;
2424

2525
struct TJsonParserBuffer {
2626
size_t NumberValues = 0;
@@ -269,11 +269,11 @@ namespace NFq {
269269

270270
class TJsonParser::TImpl {
271271
public:
272-
TImpl(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize)
272+
TImpl(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount)
273273
: Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false)
274274
, TypeEnv(std::make_unique<NKikimr::NMiniKQL::TTypeEnvironment>(Alloc))
275275
, BatchSize(batchSize ? batchSize : DEFAULT_BATCH_SIZE)
276-
, MaxNumberRows(((staticBufferSize ? staticBufferSize : DEFAULT_STATIC_BUFFER_SIZE) - 1) / columns.size() + 1)
276+
, MaxNumberRows(((bufferCellCount ? bufferCellCount : DEFAULT_BUFFER_CELL_COUNT) - 1) / columns.size() + 1)
277277
, BatchCreationTimeout(batchCreationTimeout)
278278
, ParseCallback(parseCallback)
279279
, ParsedValues(columns.size())
@@ -444,8 +444,8 @@ class TJsonParser::TImpl {
444444
TVector<TVector<NYql::NUdf::TUnboxedValue>> ParsedValues;
445445
};
446446

447-
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize)
448-
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize))
447+
TJsonParser::TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount)
448+
: Impl(std::make_unique<TJsonParser::TImpl>(columns, types, parseCallback, batchSize, batchCreationTimeout, bufferCellCount))
449449
{}
450450

451451
TJsonParser::~TJsonParser() {
@@ -479,8 +479,8 @@ TString TJsonParser::GetDescription() const {
479479
return Impl->GetDescription();
480480
}
481481

482-
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize) {
483-
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, staticBufferSize));
482+
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount) {
483+
return std::unique_ptr<TJsonParser>(new TJsonParser(columns, types, parseCallback, batchSize, batchCreationTimeout, bufferCellCount));
484484
}
485485

486486
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/json_parser.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class TJsonParser {
1111
using TCallback = std::function<void(ui64 rowsOffset, ui64 numberRows, const TVector<TVector<NYql::NUdf::TUnboxedValue>>& parsedValues)>;
1212

1313
public:
14-
TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize);
14+
TJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount);
1515
~TJsonParser();
1616

1717
bool IsReady() const;
@@ -29,6 +29,6 @@ class TJsonParser {
2929
const std::unique_ptr<TImpl> Impl;
3030
};
3131

32-
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 staticBufferSize);
32+
std::unique_ptr<TJsonParser> NewJsonParser(const TVector<TString>& columns, const TVector<TString>& types, TJsonParser::TCallback parseCallback, ui64 batchSize, TDuration batchCreationTimeout, ui64 bufferCellCount);
3333

3434
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ void TTopicSession::UpdateParser() {
870870
const auto& parserConfig = Config.GetJsonParser();
871871
Parser = NewJsonParser(names, types, [this](ui64 rowsOffset, ui64 numberRows, const TVector<TVector<NYql::NUdf::TUnboxedValue>>& parsedValues) {
872872
DoFiltering(rowsOffset, numberRows, parsedValues);
873-
}, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()), parserConfig.GetStaticBufferSize());
873+
}, parserConfig.GetBatchSizeBytes(), TDuration::MilliSeconds(parserConfig.GetBatchCreationTimeoutMs()), parserConfig.GetBufferCellCount());
874874
} catch (const NYql::NPureCalc::TCompileError& e) {
875875
FatalError(e.GetIssues(), nullptr, true);
876876
}

ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ class TFixture : public NUnitTest::TBaseFixture {
4646
}
4747
}
4848

49-
void MakeParser(TVector<TString> columns, TVector<TString> types, TJsonParser::TCallback callback, ui64 batchSize = 1_MB, ui64 staticBufferSize = 1000) {
50-
Parser = NFq::NewJsonParser(columns, types, callback, batchSize, TDuration::Hours(1), staticBufferSize);
49+
void MakeParser(TVector<TString> columns, TVector<TString> types, TJsonParser::TCallback callback, ui64 batchSize = 1_MB, ui64 bufferCellCount = 1000) {
50+
Parser = NFq::NewJsonParser(columns, types, callback, batchSize, TDuration::Hours(1), bufferCellCount);
5151
}
5252

5353
void MakeParser(TVector<TString> columns, TJsonParser::TCallback callback) {

0 commit comments

Comments
 (0)