Skip to content

Commit fef9bc2

Browse files
authored
Improve csv import, steps 1 & 2 (#11679)
1 parent 9c294d8 commit fef9bc2

File tree

7 files changed

+311
-105
lines changed

7 files changed

+311
-105
lines changed

ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ int TCommandImportFromCsv::Run(TConfig& config) {
302302
settings.NewlineDelimited(NewlineDelimited);
303303
settings.HeaderRow(HeaderRow);
304304
settings.NullValue(NullValue);
305+
settings.Verbose(config.IsVerbose());
305306

306307
if (Delimiter.size() != 1) {
307308
throw TMisuseException()

ydb/public/lib/ydb_cli/common/csv_parser.cpp

Lines changed: 70 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "csv_parser.h"
22

3+
#include <ydb/public/api/protos/ydb_value.pb.h>
34
#include <ydb/public/lib/ydb_cli/common/common.h>
45

56
#include <library/cpp/string_utils/csv/csv.h>
@@ -177,7 +178,7 @@ class TCsvToYdbConverter {
177178
}
178179
}
179180

180-
void BuildValue(TStringBuf token) {
181+
void BuildValue(const TStringBuf& token) {
181182
switch (Parser.GetKind()) {
182183
case TTypeParser::ETypeKind::Primitive: {
183184
BuildPrimitive(TString(token));
@@ -279,7 +280,7 @@ class TCsvToYdbConverter {
279280
throw TCsvParseException() << "Expected bool value: \"true\" or \"false\", received: \"" << token << "\".";
280281
}
281282

282-
void EnsureNull(TStringBuf token) const {
283+
void EnsureNull(const TStringBuf& token) const {
283284
if (!NullValue) {
284285
throw TCsvParseException() << "Expected null value instead of \"" << token << "\", but null value is not set.";
285286
}
@@ -288,7 +289,7 @@ class TCsvToYdbConverter {
288289
}
289290
}
290291

291-
TValue Convert(TStringBuf token) {
292+
TValue Convert(const TStringBuf& token) {
292293
BuildValue(token);
293294
return Builder.Build();
294295
}
@@ -317,10 +318,10 @@ TCsvParseException FormatError(const std::exception& inputError,
317318
}
318319

319320
TValue FieldToValue(TTypeParser& parser,
320-
TStringBuf token,
321+
const TStringBuf& token,
321322
const std::optional<TString>& nullValue,
322323
const TCsvParser::TParseMetadata& meta,
323-
TString columnName) {
324+
const TString& columnName) {
324325
try {
325326
TCsvToYdbConverter converter(parser, nullValue);
326327
return converter.Convert(token);
@@ -331,7 +332,7 @@ TValue FieldToValue(TTypeParser& parser,
331332

332333
TStringBuf Consume(NCsvFormat::CsvSplitter& splitter,
333334
const TCsvParser::TParseMetadata& meta,
334-
TString columnName) {
335+
const TString& columnName) {
335336
try {
336337
return splitter.Consume();
337338
} catch (std::exception& e) {
@@ -342,30 +343,30 @@ TStringBuf Consume(NCsvFormat::CsvSplitter& splitter,
342343
}
343344

344345
TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
345-
const std::map<TString, TType>* paramTypes,
346+
const std::map<TString, TType>* destinationTypes,
346347
const std::map<TString, TString>* paramSources)
347348
: HeaderRow(std::move(headerRow))
348349
, Delimeter(delimeter)
349350
, NullValue(nullValue)
350-
, ParamTypes(paramTypes)
351+
, DestinationTypes(destinationTypes)
351352
, ParamSources(paramSources)
352353
{
353354
NCsvFormat::CsvSplitter splitter(HeaderRow, Delimeter);
354355
Header = static_cast<TVector<TString>>(splitter);
355356
}
356357

357358
TCsvParser::TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
358-
const std::map<TString, TType>* paramTypes,
359+
const std::map<TString, TType>* destinationTypes,
359360
const std::map<TString, TString>* paramSources)
360361
: Header(std::move(header))
361362
, Delimeter(delimeter)
362363
, NullValue(nullValue)
363-
, ParamTypes(paramTypes)
364+
, DestinationTypes(destinationTypes)
364365
, ParamSources(paramSources)
365366
{
366367
}
367368

368-
void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParseMetadata& meta) const {
369+
void TCsvParser::BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const {
369370
NCsvFormat::CsvSplitter splitter(data, Delimeter);
370371
auto headerIt = Header.begin();
371372
do {
@@ -374,8 +375,8 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParse
374375
}
375376
TStringBuf token = Consume(splitter, meta, *headerIt);
376377
TString fullname = "$" + *headerIt;
377-
auto paramIt = ParamTypes->find(fullname);
378-
if (paramIt == ParamTypes->end()) {
378+
auto paramIt = DestinationTypes->find(fullname);
379+
if (paramIt == DestinationTypes->end()) {
379380
++headerIt;
380381
continue;
381382
}
@@ -395,7 +396,7 @@ void TCsvParser::GetParams(TString&& data, TParamsBuilder& builder, const TParse
395396
}
396397
}
397398

398-
void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const {
399+
void TCsvParser::BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const {
399400
NCsvFormat::CsvSplitter splitter(data, Delimeter);
400401
auto headerIt = Header.cbegin();
401402
std::map<TString, TStringBuf> fields;
@@ -431,18 +432,68 @@ void TCsvParser::GetValue(TString&& data, TValueBuilder& builder, const TType& t
431432
builder.EndStruct();
432433
}
433434

434-
TType TCsvParser::GetColumnsType() const {
435+
TValue TCsvParser::BuildList(std::vector<TString>& lines, const TString& filename, std::optional<ui64> row) const {
436+
std::vector<std::unique_ptr<TTypeParser>> columnTypeParsers;
437+
columnTypeParsers.reserve(ResultColumnCount);
438+
for (const TType* type : ResultLineTypesSorted) {
439+
columnTypeParsers.push_back(std::make_unique<TTypeParser>(*type));
440+
}
441+
Ydb::Value listValue;
442+
auto* listItems = listValue.mutable_items();
443+
listItems->Reserve(lines.size());
444+
for (auto& line : lines) {
445+
std::vector<TStringBuf> fields;
446+
NCsvFormat::CsvSplitter splitter(line, Delimeter);
447+
TParseMetadata meta {row, filename};
448+
auto headerIt = Header.cbegin();
449+
auto skipIt = SkipBitMap.begin();
450+
do {
451+
if (headerIt == Header.cend()) { // SkipBitMap has same size as Header
452+
throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta);
453+
}
454+
TStringBuf nextField = Consume(splitter, meta, *headerIt);
455+
if (!*skipIt) {
456+
fields.emplace_back(nextField);
457+
}
458+
++headerIt;
459+
++skipIt;
460+
} while (splitter.Step());
461+
auto* structItems = listItems->Add()->mutable_items();
462+
structItems->Reserve(ResultColumnCount);
463+
auto typeParserIt = columnTypeParsers.begin();
464+
auto fieldIt = fields.begin();
465+
auto nameIt = ResultLineNamesSorted.begin();
466+
// fields size equals columnTypeParsers size, no need for second end check
467+
for (; typeParserIt != columnTypeParsers.end(); ++typeParserIt, ++fieldIt, ++nameIt) {
468+
*structItems->Add() = FieldToValue(*typeParserIt->get(), *fieldIt, NullValue, meta, **nameIt).GetProto();
469+
}
470+
if (row.has_value()) {
471+
++row.value();
472+
}
473+
}
474+
return TValue(ResultListType.value(), std::move(listValue));
475+
}
476+
477+
void TCsvParser::BuildLineType() {
478+
SkipBitMap.reserve(Header.size());
479+
ResultColumnCount = 0;
435480
TTypeBuilder builder;
436481
builder.BeginStruct();
437482
for (const auto& colName : Header) {
438-
if (ParamTypes->find(colName) != ParamTypes->end()) {
439-
builder.AddMember(colName, ParamTypes->at(colName));
483+
auto findIt = DestinationTypes->find(colName);
484+
if (findIt != DestinationTypes->end()) {
485+
builder.AddMember(colName, findIt->second);
486+
ResultLineTypesSorted.emplace_back(&findIt->second);
487+
ResultLineNamesSorted.emplace_back(&colName);
488+
SkipBitMap.push_back(false);
489+
++ResultColumnCount;
440490
} else {
441-
builder.AddMember("__ydb_skip_column_name", TTypeBuilder().Build());
491+
SkipBitMap.push_back(true);
442492
}
443493
}
444494
builder.EndStruct();
445-
return builder.Build();
495+
ResultLineType = builder.Build();
496+
ResultListType = TTypeBuilder().List(ResultLineType.value()).Build();
446497
}
447498

448499
}

ydb/public/lib/ydb_cli/common/csv_parser.h

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,39 @@ class TCsvParser {
2525
~TCsvParser() = default;
2626

2727
TCsvParser(TString&& headerRow, const char delimeter, const std::optional<TString>& nullValue,
28-
const std::map<TString, TType>* paramTypes = nullptr,
28+
const std::map<TString, TType>* destinationTypes = nullptr,
2929
const std::map<TString, TString>* paramSources = nullptr);
3030
TCsvParser(TVector<TString>&& header, const char delimeter, const std::optional<TString>& nullValue,
31-
const std::map<TString, TType>* paramTypes = nullptr,
31+
const std::map<TString, TType>* destinationTypes = nullptr,
3232
const std::map<TString, TString>* paramSources = nullptr);
3333

34-
void GetParams(TString&& data, TParamsBuilder& builder, const TParseMetadata& meta) const;
35-
void GetValue(TString&& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const;
36-
TType GetColumnsType() const;
34+
void BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const;
35+
void BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const;
36+
TValue BuildList(std::vector<TString>& lines, const TString& filename,
37+
std::optional<ui64> row = std::nullopt) const;
38+
void BuildLineType();
3739

3840
private:
3941
TVector<TString> Header;
4042
TString HeaderRow;
4143
char Delimeter;
4244
std::optional<TString> NullValue;
43-
const std::map<TString, TType>* ParamTypes;
45+
// Types of destination table or query parameters
46+
// Column name -> column type
47+
const std::map<TString, TType>* DestinationTypes;
4448
const std::map<TString, TString>* ParamSources;
49+
// Type of a single row in resulting TValue.
50+
// Column order according to the header, though can have less elements than the Header
51+
std::optional<TType> ResultLineType = std::nullopt;
52+
std::optional<TType> ResultListType = std::nullopt;
53+
// If a value is true (header column is absent in dstTypes), skip corresponding value in input csv row
54+
std::vector<bool> SkipBitMap;
55+
// Count of columns in each struct in resulting TValue
56+
size_t ResultColumnCount;
57+
// Types of columns in a single row in resulting TValue.
58+
// Column order according to the header, though can have less elements than the Header
59+
std::vector<const TType*> ResultLineTypesSorted;
60+
std::vector<const TString*> ResultLineNamesSorted;
4561
};
4662

4763
}

0 commit comments

Comments
 (0)