Skip to content

[YDB CLI][CSV] Optimization: Use Apache Arrow and Protobuf Arena API as fallback in CSV upload module #19667

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e48b417
feat: allocate Ydb::Value on arena if provided in BuildList
Vladislav0Art Apr 18, 2025
54b94eb
feat: move protos into request in TTableClient::TImpl::BulkUpsert
Vladislav0Art Apr 18, 2025
5e748c4
feat: extract proto model out of FieldToValue in TCsvParser
Vladislav0Art Apr 18, 2025
d8df579
feat: add && in return types of ExtractProto methods
Vladislav0Art May 1, 2025
277c354
feat: move rows' proto into request proto in a separate method BulkUp…
Vladislav0Art May 1, 2025
ffa3fed
feat: move line into buffer in UpsertCsv
Vladislav0Art May 1, 2025
ac9928e
feat: introduce per-request arena (allocate Ydb::Value on arena, 1-to…
Vladislav0Art Apr 18, 2025
df81810
Revert "feat: introduce per-request arena (allocate Ydb::Value on are…
Vladislav0Art May 1, 2025
1612ff6
feat: implement single-arena-per-request strategy
Vladislav0Art May 1, 2025
127dd65
fix: remmove nullptr param from BuildList call in tests
Vladislav0Art May 1, 2025
07137ad
refactor: remove explicit default parameter from BuildList call
Vladislav0Art May 1, 2025
65dc7e1
feat: add several TODOs
Vladislav0Art May 1, 2025
c16423f
feat: name file-processing threads in their thread pool
Vladislav0Art May 2, 2025
cf459a6
feat: add TODO about arena-based TValueBuilderImpl
Vladislav0Art May 2, 2025
9b44f1f
refactor: replace pair of TType and Ydb::Value* with type TArenaAlloc…
Vladislav0Art May 2, 2025
666932d
feat: build Ydb::Value on the arena in TCsvParser::BuildListOnArena
Vladislav0Art May 8, 2025
2ca666f
feat: select `TValueHolder` in compile time when call `TCsvParser::Bu…
Vladislav0Art May 8, 2025
12bd12b
feat: output upload metrics for extraction in evaluation pipeline
Vladislav0Art May 23, 2025
65999d7
refactor: remove modifications in SDK related to TValueBuilder
Vladislav0Art Jun 8, 2025
205bfb4
refactor: remove commented code
Vladislav0Art Jun 8, 2025
d3961f2
feat(TLinesSplitter::ConsumeLine): replace bool negation with quote c…
Vladislav0Art Mar 28, 2025
67ca318
feat(TCsvParser): add method TCsvParser::GetHeaderRow()
Vladislav0Art Jun 8, 2025
12df05f
feat(import): implement upload via Apcahe Arrow with fallback on YDB …
Vladislav0Art Jun 8, 2025
7308e92
refactor: remove newlines and commented code
Vladislav0Art Jun 8, 2025
6effb23
Revert "feat(TLinesSplitter::ConsumeLine): replace bool negation with…
Vladislav0Art Jun 13, 2025
bd2278a
refactor: remove redundant includes from value.h
Vladislav0Art Jun 13, 2025
65d8148
refactor: remove TArenaAllocatedValue class (use TValue instead)
Vladislav0Art Jun 13, 2025
4836e43
feat: rename unretryable methods to BulkUpsertUnsafe and add docs
Vladislav0Art Jun 13, 2025
8f316a9
refactor: move logic of RunDeferred and RUnDeferredOnArena into impl …
Vladislav0Art Jun 13, 2025
e1bdb1f
refactor: move logic of Run and RunOnArena into impl method (untested!)
Vladislav0Art Jun 13, 2025
776c2f8
fix: return arena-allocated proto model when present in TValue::TImpl
Vladislav0Art Jun 14, 2025
2596df3
feat: set ZSTD compression in Apache Arrow codec
Vladislav0Art Jun 14, 2025
401e641
refactor: remove addressed TODO
Vladislav0Art Jun 15, 2025
67c310f
refactor: remove printings from import.cpp
Vladislav0Art Jun 15, 2025
a0dd9de
refactor: remove commented code and redundant diffs
Vladislav0Art Jun 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 84 additions & 31 deletions ydb/public/lib/ydb_cli/common/csv_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ class TCsvToYdbConverter {
public:
explicit TCsvToYdbConverter(TTypeParser& parser, const std::optional<TString>& nullValue)
: Parser(parser)
, NullValue(nullValue)
{
}
, NullValue(nullValue) {}

template <class T, std::enable_if_t<std::is_integral_v<T> && std::is_signed_v<T>, std::nullptr_t> = nullptr>
static i64 StringToArithmetic(const TString& token, size_t& cnt) {
Expand Down Expand Up @@ -165,7 +163,7 @@ class TCsvToYdbConverter {
}
case EPrimitiveType::Interval64:
Builder.Interval64(GetArithmetic<i64>(token));
break;
break;
case EPrimitiveType::TzDate:
Builder.TzDate(token);
break;
Expand Down Expand Up @@ -470,7 +468,7 @@ TStringBuf Consume(NCsvFormat::CsvSplitter& splitter,

TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
const std::map<std::string, TType>* destinationTypes,
const std::map<TString, TString>* paramSources)
const std::map<TString, TString>* paramSources)
: HeaderRow(std::move(headerRow))
, Delimeter(delimeter)
, NullValue(nullValue)
Expand All @@ -483,7 +481,7 @@ TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::opt

TCsvParser::TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
const std::map<std::string, TType>* destinationTypes,
const std::map<TString, TString>* paramSources)
const std::map<TString, TString>* paramSources)
: Header(std::move(header))
, Delimeter(delimeter)
, NullValue(nullValue)
Expand Down Expand Up @@ -558,41 +556,91 @@ void TCsvParser::BuildValue(TString& data, TValueBuilder& builder, const TType&
builder.EndStruct();
}

TValue TCsvParser::BuildList(std::vector<TString>& lines, const TString& filename, std::optional<ui64> row) const {
TValue TCsvParser::BuildList(const std::vector<TString>& lines, const TString& filename, std::optional<ui64> row) const {
std::vector<std::unique_ptr<TTypeParser>> columnTypeParsers;
columnTypeParsers.reserve(ResultColumnCount);
for (const TType* type : ResultLineTypesSorted) {
columnTypeParsers.push_back(std::make_unique<TTypeParser>(*type));
}

Ydb::Value listValue;
auto* listItems = listValue.mutable_items();

// Original path with local value object
Ydb::Value valueProto;
auto* listItems = valueProto.mutable_items();
listItems->Reserve(lines.size());
for (auto& line : lines) {
NCsvFormat::CsvSplitter splitter(line, Delimeter);
TParseMetadata meta {row, filename};
auto* structItems = listItems->Add()->mutable_items();
structItems->Reserve(ResultColumnCount);
auto headerIt = Header.cbegin();
auto skipIt = SkipBitMap.begin();
auto typeParserIt = columnTypeParsers.begin();
do {
if (headerIt == Header.cend()) { // SkipBitMap has same size as Header
throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
}
TStringBuf nextField = Consume(splitter, meta, *headerIt);
if (!*skipIt) {
*structItems->Add() = FieldToValue(*typeParserIt->get(), nextField, NullValue, meta, *headerIt).GetProto();
++typeParserIt;
}
++headerIt;
++skipIt;
} while (splitter.Step());

for (const auto& line : lines) {
ProcessCsvLine(line, listItems, columnTypeParsers, row, filename);
if (row.has_value()) {
++row.value();
}
}

// Return a TValue that takes ownership via move
return TValue(ResultListType.value(), std::move(valueProto));
}

// Builds a list value from CSV `lines`, with the top-level Ydb::Value message
// created on the caller-supplied protobuf arena.
//
// Parameters:
//   lines    - CSV lines; each one is handed to ProcessCsvLine in order.
//   filename - passed through to ProcessCsvLine for every line.
//   arena    - arena the Ydb::Value is created on; asserted to be non-null.
//   row      - optional row number of the first line; taken by value, so the
//              increments below do not affect the caller's copy.
//
// Returns a TValue constructed from ResultListType.value() and the pointer to
// the arena-created message.
// NOTE(review): presumably the arena has to outlive the returned TValue —
// confirm against the TValue(TType, Ydb::Value*) constructor.
TValue TCsvParser::BuildListOnArena(
const std::vector<TString>& lines,
const TString& filename,
google::protobuf::Arena* arena,
std::optional<ui64> row
) const {
Y_ASSERT(arena != nullptr);

// One TTypeParser per entry of ResultLineTypesSorted, created once and reused for every line.
std::vector<std::unique_ptr<TTypeParser>> columnTypeParsers;
columnTypeParsers.reserve(ResultColumnCount);
for (const TType* type : ResultLineTypesSorted) {
columnTypeParsers.push_back(std::make_unique<TTypeParser>(*type));
}

// Create the list's Ydb::Value on the arena and reserve one item slot per input line.
Ydb::Value* value = google::protobuf::Arena::CreateMessage<Ydb::Value>(arena);
auto* listItems = value->mutable_items();
listItems->Reserve(lines.size());

for (const auto& line : lines) {
ProcessCsvLine(line, listItems, columnTypeParsers, row, filename);
// Advance the row number (when one was supplied) before the next line is processed.
if (row.has_value()) {
++row.value();
}
}
// NOTE(review): `listValue` is not declared anywhere in this function; this
// line looks like a leftover from the removed side of the diff — confirm and drop it.
return TValue(ResultListType.value(), std::move(listValue));

// Return a TValue that references the arena-allocated message
return TValue(ResultListType.value(), value);
}

// Helper method to process a single CSV line
void TCsvParser::ProcessCsvLine(
const TString& line,
google::protobuf::RepeatedPtrField<Ydb::Value>* listItems,
const std::vector<std::unique_ptr<TTypeParser>>& columnTypeParsers,
std::optional<ui64> currentRow,
const TString& filename
) const {
NCsvFormat::CsvSplitter splitter(line, Delimeter);
auto* structItems = listItems->Add()->mutable_items();
structItems->Reserve(ResultColumnCount);

const TParseMetadata meta {currentRow, filename};

auto headerIt = Header.cbegin();
auto skipIt = SkipBitMap.begin();
auto typeParserIt = columnTypeParsers.begin();

do {
if (headerIt == Header.cend()) { // SkipBitMap has same size as Header
throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
}
TStringBuf nextField = Consume(splitter, meta, *headerIt);
if (!*skipIt) {
TValue builtValue = FieldToValue(*typeParserIt->get(), nextField, NullValue, meta, *headerIt);
*structItems->Add() = std::move(builtValue).ExtractProto();

++typeParserIt;
}
++headerIt;
++skipIt;
} while (splitter.Step());
}

void TCsvParser::BuildLineType() {
Expand Down Expand Up @@ -761,5 +809,10 @@ const TVector<TString>& TCsvParser::GetHeader() {
return Header;
}

// Returns a const reference to the HeaderRow member (the same string that is
// quoted in the "Header contains less fields than data" error message).
const TString& TCsvParser::GetHeaderRow() const {
return HeaderRow;
}

}
}

22 changes: 21 additions & 1 deletion ydb/public/lib/ydb_cli/common/csv_parser.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/params/params.h>
#include <google/protobuf/arena.h>

#include <library/cpp/string_utils/csv/csv.h>

Expand Down Expand Up @@ -66,10 +67,20 @@ class TCsvParser {

void BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const;
void BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const;
TValue BuildList(std::vector<TString>& lines, const TString& filename,

TValue BuildList(const std::vector<TString>& lines, const TString& filename,
std::optional<ui64> row = std::nullopt) const;

TValue BuildListOnArena(
const std::vector<TString>& lines,
const TString& filename,
google::protobuf::Arena* arena,
std::optional<ui64> row = std::nullopt
) const;

void BuildLineType();
const TVector<TString>& GetHeader();
const TString& GetHeaderRow() const;
void ParseLineTypes(TString& line, TPossibleTypes& possibleTypes, const TParseMetadata& meta);

private:
Expand All @@ -92,6 +103,15 @@ class TCsvParser {
// Types of columns in a single row in resulting TValue.
// Columns are ordered as in the header, though there can be fewer elements than in Header
std::vector<const TType*> ResultLineTypesSorted;

// Helper method to process a single line of CSV data
void ProcessCsvLine(
const TString& line,
google::protobuf::RepeatedPtrField<Ydb::Value>* listItems,
const std::vector<std::unique_ptr<TTypeParser>>& columnTypeParsers,
std::optional<ui64> currentRow,
const TString& filename
) const;
};

}
Expand Down
2 changes: 1 addition & 1 deletion ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) {
{"col2", TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Int64).EndOptional().Build()},
{"col3", TTypeBuilder().Primitive(EPrimitiveType::Bool).Build()},
};

TString csvHeader = "col4,col3,col5,col1,col6";
std::vector<TString> data = {
"col4 unused value,true,col5 unused value,col1 value,col6 unused value"
Expand Down
Loading