Skip to content

Commit d6db40f

Browse files
pnv1Vladislav0Art
andauthored
Add Arena option to BulkUpsert, add hidden --send-format option (#20061)
Co-authored-by: Vladislav Artiukhov <vladislav0art.work@gmail.com>
1 parent 6072bfc commit d6db40f

File tree

14 files changed

+477
-93
lines changed

14 files changed

+477
-93
lines changed

ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#include <ydb/public/lib/ydb_cli/common/print_operation.h>
77
#include <ydb/public/lib/ydb_cli/common/interactive.h>
88
#include <ydb/public/lib/ydb_cli/dump/files/files.h>
9-
#include <ydb/public/lib/ydb_cli/import/import.h>
109
#include <ydb/library/backup/util.h>
1110

1211
#include <util/string/builder.h>
@@ -302,6 +301,17 @@ void TCommandImportFromCsv::Config(TConfig& config) {
302301
config.Opts->AddLongOption("newline-delimited",
303302
"No newline characters inside records, enables some import optimizations (see docs)")
304303
.StoreTrue(&NewlineDelimited);
304+
TStringStream description;
305+
description << "Format that data will be serialized to before sending to YDB. Available options: ";
306+
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
307+
description << "\n " << colors.BoldColor() << "tvalue" << colors.OldColor()
308+
<< "\n " << "A default YDB protobuf format";
309+
description << "\n " << colors.BoldColor() << "arrow" << colors.OldColor()
310+
<< "\n " << "Apache Arrow format";
311+
description << "\nDefault: " << colors.CyanColor() << "\"" << "tvalue" << "\"" << colors.OldColor() << ".";
312+
config.Opts->AddLongOption("send-format", description.Str())
313+
.RequiredArgument("STRING").StoreResult(&SendFormat)
314+
.Hidden();
305315
if (InputFormat == EDataFormat::Csv) {
306316
config.Opts->AddLongOption("delimiter", "Field delimiter in rows")
307317
.RequiredArgument("STRING").StoreResult(&Delimiter).DefaultValue(Delimiter);
@@ -325,6 +335,7 @@ int TCommandImportFromCsv::Run(TConfig& config) {
325335
settings.HeaderRow(HeaderRow);
326336
settings.NullValue(NullValue);
327337
settings.Verbose(config.IsVerbose());
338+
settings.SendFormat(SendFormat);
328339

329340
if (Delimiter.size() != 1) {
330341
throw TMisuseException()

ydb/public/lib/ydb_cli/commands/ydb_service_import.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/public/lib/ydb_cli/common/aws.h>
88
#include <ydb/public/lib/ydb_cli/common/format.h>
99
#include <ydb/public/lib/ydb_cli/common/parseable_struct.h>
10+
#include <ydb/public/lib/ydb_cli/import/import.h>
1011

1112
namespace NYdb::NConsoleClient {
1213

@@ -86,6 +87,7 @@ class TCommandImportFromCsv : public TCommandImportFileBase {
8687
ui32 SkipRows = 0;
8788
bool Header = false;
8889
bool NewlineDelimited = true;
90+
NConsoleClient::ESendFormat SendFormat = NConsoleClient::ESendFormat::Default;
8991
};
9092

9193
class TCommandImportFromTsv : public TCommandImportFromCsv {

ydb/public/lib/ydb_cli/common/csv_parser.cpp

Lines changed: 84 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ class TCsvToYdbConverter {
1515
public:
1616
explicit TCsvToYdbConverter(TTypeParser& parser, const std::optional<TString>& nullValue)
1717
: Parser(parser)
18-
, NullValue(nullValue)
19-
{
20-
}
18+
, NullValue(nullValue) {}
2119

2220
template <class T, std::enable_if_t<std::is_integral_v<T> && std::is_signed_v<T>, std::nullptr_t> = nullptr>
2321
static i64 StringToArithmetic(const TString& token, size_t& cnt) {
@@ -165,7 +163,7 @@ class TCsvToYdbConverter {
165163
}
166164
case EPrimitiveType::Interval64:
167165
Builder.Interval64(GetArithmetic<i64>(token));
168-
break;
166+
break;
169167
case EPrimitiveType::TzDate:
170168
Builder.TzDate(token);
171169
break;
@@ -441,7 +439,7 @@ TStringBuf Consume(NCsvFormat::CsvSplitter& splitter,
441439

442440
TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
443441
const std::map<std::string, TType>* destinationTypes,
444-
const std::map<TString, TString>* paramSources)
442+
const std::map<TString, TString>* paramSources)
445443
: HeaderRow(std::move(headerRow))
446444
, Delimeter(delimeter)
447445
, NullValue(nullValue)
@@ -454,7 +452,7 @@ TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::opt
454452

455453
TCsvParser::TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
456454
const std::map<std::string, TType>* destinationTypes,
457-
const std::map<TString, TString>* paramSources)
455+
const std::map<TString, TString>* paramSources)
458456
: Header(std::move(header))
459457
, Delimeter(delimeter)
460458
, NullValue(nullValue)
@@ -529,41 +527,91 @@ void TCsvParser::BuildValue(TString& data, TValueBuilder& builder, const TType&
529527
builder.EndStruct();
530528
}
531529

532-
TValue TCsvParser::BuildList(std::vector<TString>& lines, const TString& filename, std::optional<ui64> row) const {
530+
TValue TCsvParser::BuildList(const std::vector<TString>& lines, const TString& filename, std::optional<ui64> row) const {
533531
std::vector<std::unique_ptr<TTypeParser>> columnTypeParsers;
534532
columnTypeParsers.reserve(ResultColumnCount);
535533
for (const TType* type : ResultLineTypesSorted) {
536534
columnTypeParsers.push_back(std::make_unique<TTypeParser>(*type));
537535
}
538-
539-
Ydb::Value listValue;
540-
auto* listItems = listValue.mutable_items();
536+
537+
// Original path with local value object
538+
Ydb::Value listProtoValue;
539+
auto* listItems = listProtoValue.mutable_items();
541540
listItems->Reserve(lines.size());
542-
for (auto& line : lines) {
543-
NCsvFormat::CsvSplitter splitter(line, Delimeter);
544-
TParseMetadata meta {row, filename};
545-
auto* structItems = listItems->Add()->mutable_items();
546-
structItems->Reserve(ResultColumnCount);
547-
auto headerIt = Header.cbegin();
548-
auto skipIt = SkipBitMap.begin();
549-
auto typeParserIt = columnTypeParsers.begin();
550-
do {
551-
if (headerIt == Header.cend()) { // SkipBitMap has same size as Header
552-
throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
553-
}
554-
TStringBuf nextField = Consume(splitter, meta, *headerIt);
555-
if (!*skipIt) {
556-
*structItems->Add() = FieldToValue(*typeParserIt->get(), nextField, NullValue, meta, *headerIt).GetProto();
557-
++typeParserIt;
558-
}
559-
++headerIt;
560-
++skipIt;
561-
} while (splitter.Step());
541+
542+
for (const auto& line : lines) {
543+
ProcessCsvLine(line, listItems, columnTypeParsers, row, filename);
562544
if (row.has_value()) {
563545
++row.value();
564546
}
565547
}
566-
return TValue(ResultListType.value(), std::move(listValue));
548+
549+
// Return a TValue that takes ownership via move
550+
return TValue(ResultListType.value(), std::move(listProtoValue));
551+
}
552+
553+
TValue TCsvParser::BuildListOnArena(
554+
const std::vector<TString>& lines,
555+
const TString& filename,
556+
google::protobuf::Arena* arena,
557+
std::optional<ui64> row
558+
) const {
559+
Y_ASSERT(arena != nullptr);
560+
561+
std::vector<std::unique_ptr<TTypeParser>> columnTypeParsers;
562+
columnTypeParsers.reserve(ResultColumnCount);
563+
for (const TType* type : ResultLineTypesSorted) {
564+
columnTypeParsers.push_back(std::make_unique<TTypeParser>(*type));
565+
}
566+
567+
// allocate Ydb::Value on arena
568+
Ydb::Value* listProtoValue = google::protobuf::Arena::CreateMessage<Ydb::Value>(arena);
569+
auto* listItems = listProtoValue->mutable_items();
570+
listItems->Reserve(lines.size());
571+
572+
for (const auto& line : lines) {
573+
ProcessCsvLine(line, listItems, columnTypeParsers, row, filename);
574+
if (row.has_value()) {
575+
++row.value();
576+
}
577+
}
578+
579+
// Return a TValue that references the arena-allocated message
580+
return TValue(ResultListType.value(), listProtoValue);
581+
}
582+
583+
// Helper method to process a single CSV line
584+
void TCsvParser::ProcessCsvLine(
585+
const TString& line,
586+
google::protobuf::RepeatedPtrField<Ydb::Value>* listItems,
587+
const std::vector<std::unique_ptr<TTypeParser>>& columnTypeParsers,
588+
std::optional<ui64> currentRow,
589+
const TString& filename
590+
) const {
591+
NCsvFormat::CsvSplitter splitter(line, Delimeter);
592+
auto* structItems = listItems->Add()->mutable_items();
593+
structItems->Reserve(ResultColumnCount);
594+
595+
const TParseMetadata meta {currentRow, filename};
596+
597+
auto headerIt = Header.cbegin();
598+
auto skipIt = SkipBitMap.begin();
599+
auto typeParserIt = columnTypeParsers.begin();
600+
601+
do {
602+
if (headerIt == Header.cend()) { // SkipBitMap has same size as Header
603+
throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
604+
}
605+
TStringBuf nextField = Consume(splitter, meta, *headerIt);
606+
if (!*skipIt) {
607+
TValue builtValue = FieldToValue(*typeParserIt->get(), nextField, NullValue, meta, *headerIt);
608+
*structItems->Add() = std::move(builtValue.GetProto());
609+
610+
++typeParserIt;
611+
}
612+
++headerIt;
613+
++skipIt;
614+
} while (splitter.Step());
567615
}
568616

569617
void TCsvParser::BuildLineType() {
@@ -607,5 +655,10 @@ const TVector<TString>& TCsvParser::GetHeader() {
607655
return Header;
608656
}
609657

658+
const TString& TCsvParser::GetHeaderRow() const {
659+
return HeaderRow;
660+
}
661+
610662
}
611663
}
664+

ydb/public/lib/ydb_cli/common/csv_parser.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/params/params.h>
4+
#include <google/protobuf/arena.h>
45

56
#include <library/cpp/string_utils/csv/csv.h>
67

@@ -35,10 +36,20 @@ class TCsvParser {
3536

3637
void BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const;
3738
void BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const;
38-
TValue BuildList(std::vector<TString>& lines, const TString& filename,
39+
40+
TValue BuildList(const std::vector<TString>& lines, const TString& filename,
3941
std::optional<ui64> row = std::nullopt) const;
42+
43+
TValue BuildListOnArena(
44+
const std::vector<TString>& lines,
45+
const TString& filename,
46+
google::protobuf::Arena* arena,
47+
std::optional<ui64> row = std::nullopt
48+
) const;
49+
4050
void BuildLineType();
4151
const TVector<TString>& GetHeader();
52+
const TString& GetHeaderRow() const;
4253

4354
private:
4455
TVector<TString> Header;
@@ -60,6 +71,15 @@ class TCsvParser {
6071
// Types of columns in a single row in resulting TValue.
6172
// Column order according to the header, though can have less elements than the Header
6273
std::vector<const TType*> ResultLineTypesSorted;
74+
75+
// Helper method to process a single line of CSV data
76+
void ProcessCsvLine(
77+
const TString& line,
78+
google::protobuf::RepeatedPtrField<Ydb::Value>* listItems,
79+
const std::vector<std::unique_ptr<TTypeParser>>& columnTypeParsers,
80+
std::optional<ui64> currentRow,
81+
const TString& filename
82+
) const;
6383
};
6484

6585
}

ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) {
317317
{"col2", TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Int64).EndOptional().Build()},
318318
{"col3", TTypeBuilder().Primitive(EPrimitiveType::Bool).Build()},
319319
};
320-
320+
321321
TString csvHeader = "col4,col3,col5,col1,col6";
322322
std::vector<TString> data = {
323323
"col4 unused value,true,col5 unused value,col1 value,col6 unused value"

0 commit comments

Comments
 (0)