diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.cpp b/ydb/public/lib/ydb_cli/common/csv_parser.cpp index 9b99bbfa6d1f..8036d5b5358b 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser.cpp @@ -15,9 +15,7 @@ class TCsvToYdbConverter { public: explicit TCsvToYdbConverter(TTypeParser& parser, const std::optional& nullValue) : Parser(parser) - , NullValue(nullValue) - { - } + , NullValue(nullValue) {} template && std::is_signed_v, std::nullptr_t> = nullptr> static i64 StringToArithmetic(const TString& token, size_t& cnt) { @@ -165,7 +163,7 @@ class TCsvToYdbConverter { } case EPrimitiveType::Interval64: Builder.Interval64(GetArithmetic(token)); - break; + break; case EPrimitiveType::TzDate: Builder.TzDate(token); break; @@ -470,7 +468,7 @@ TStringBuf Consume(NCsvFormat::CsvSplitter& splitter, TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::optional& nullValue, const std::map* destinationTypes, - const std::map* paramSources) + const std::map* paramSources) : HeaderRow(std::move(headerRow)) , Delimeter(delimeter) , NullValue(nullValue) @@ -483,7 +481,7 @@ TCsvParser::TCsvParser(TString&& headerRow, const char delimeter, const std::opt TCsvParser::TCsvParser(TVector&& header, const char delimeter, const std::optional& nullValue, const std::map* destinationTypes, - const std::map* paramSources) + const std::map* paramSources) : Header(std::move(header)) , Delimeter(delimeter) , NullValue(nullValue) @@ -558,41 +556,91 @@ void TCsvParser::BuildValue(TString& data, TValueBuilder& builder, const TType& builder.EndStruct(); } -TValue TCsvParser::BuildList(std::vector& lines, const TString& filename, std::optional row) const { +TValue TCsvParser::BuildList(const std::vector& lines, const TString& filename, std::optional row) const { std::vector> columnTypeParsers; columnTypeParsers.reserve(ResultColumnCount); for (const TType* type : ResultLineTypesSorted) { columnTypeParsers.push_back(std::make_unique(*type)); } - - Ydb::Value listValue; - auto* listItems = listValue.mutable_items(); + + // Original path with local value object + Ydb::Value valueProto; + auto* listItems = valueProto.mutable_items(); listItems->Reserve(lines.size()); - for (auto& line : lines) { - NCsvFormat::CsvSplitter splitter(line, Delimeter); - TParseMetadata meta {row, filename}; - auto* structItems = listItems->Add()->mutable_items(); - structItems->Reserve(ResultColumnCount); - auto headerIt = Header.cbegin(); - auto skipIt = SkipBitMap.begin(); - auto typeParserIt = columnTypeParsers.begin(); - do { - if (headerIt == Header.cend()) { // SkipBitMap has same size as Header - throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta); - } - TStringBuf nextField = Consume(splitter, meta, *headerIt); - if (!*skipIt) { - *structItems->Add() = FieldToValue(*typeParserIt->get(), nextField, NullValue, meta, *headerIt).GetProto(); - ++typeParserIt; - } - ++headerIt; - ++skipIt; - } while (splitter.Step()); + + for (const auto& line : lines) { + ProcessCsvLine(line, listItems, columnTypeParsers, row, filename); + if (row.has_value()) { + ++row.value(); + } + } + + // Return a TValue that takes ownership via move + return TValue(ResultListType.value(), std::move(valueProto)); +} + +TValue TCsvParser::BuildListOnArena( + const std::vector& lines, + const TString& filename, + google::protobuf::Arena* arena, + std::optional row +) const { + Y_ASSERT(arena != nullptr); + + std::vector> columnTypeParsers; + columnTypeParsers.reserve(ResultColumnCount); + for (const TType* type : ResultLineTypesSorted) { + columnTypeParsers.push_back(std::make_unique(*type)); + } + + // allocate Ydb::Value on arena + Ydb::Value* value = google::protobuf::Arena::CreateMessage(arena); + auto* listItems = value->mutable_items(); + listItems->Reserve(lines.size()); + + for (const auto& line : lines) { + ProcessCsvLine(line, listItems, columnTypeParsers, row, filename); if (row.has_value()) { ++row.value(); } } - return TValue(ResultListType.value(), std::move(listValue)); + + // Return a TValue that references the arena-allocated message + return TValue(ResultListType.value(), value); +} + +// Helper method to process a single CSV line +void TCsvParser::ProcessCsvLine( + const TString& line, + google::protobuf::RepeatedPtrField* listItems, + const std::vector>& columnTypeParsers, + std::optional currentRow, + const TString& filename +) const { + NCsvFormat::CsvSplitter splitter(line, Delimeter); + auto* structItems = listItems->Add()->mutable_items(); + structItems->Reserve(ResultColumnCount); + + const TParseMetadata meta {currentRow, filename}; + + auto headerIt = Header.cbegin(); + auto skipIt = SkipBitMap.begin(); + auto typeParserIt = columnTypeParsers.begin(); + + do { + if (headerIt == Header.cend()) { // SkipBitMap has same size as Header + throw FormatError(yexception() << "Header contains less fields than data. Header: \"" << HeaderRow << "\", data: \"" << line << "\"", meta); + } + TStringBuf nextField = Consume(splitter, meta, *headerIt); + if (!*skipIt) { + TValue builtValue = FieldToValue(*typeParserIt->get(), nextField, NullValue, meta, *headerIt); + *structItems->Add() = std::move(builtValue).ExtractProto(); + + ++typeParserIt; + } + ++headerIt; + ++skipIt; + } while (splitter.Step()); } void TCsvParser::BuildLineType() { @@ -761,5 +809,10 @@ const TVector& TCsvParser::GetHeader() { return Header; } +const TString& TCsvParser::GetHeaderRow() const { + return HeaderRow; } + } +} + diff --git a/ydb/public/lib/ydb_cli/common/csv_parser.h b/ydb/public/lib/ydb_cli/common/csv_parser.h index f8ade2b8e9f6..a344a27d1b43 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser.h +++ b/ydb/public/lib/ydb_cli/common/csv_parser.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -66,10 +67,20 @@ class TCsvParser { void BuildParams(TString& data, TParamsBuilder& builder, const TParseMetadata& meta) const; void BuildValue(TString& data, TValueBuilder& builder, const TType& type, const TParseMetadata& meta) const; - TValue BuildList(std::vector& lines, const TString& filename, + + TValue BuildList(const std::vector& lines, const TString& filename, std::optional row = std::nullopt) const; + + TValue BuildListOnArena( + const std::vector& lines, + const TString& filename, + google::protobuf::Arena* arena, + std::optional row = std::nullopt + ) const; + void BuildLineType(); const TVector& GetHeader(); + const TString& GetHeaderRow() const; void ParseLineTypes(TString& line, TPossibleTypes& possibleTypes, const TParseMetadata& meta); private: @@ -92,6 +103,15 @@ class TCsvParser { // Types of columns in a single row in resulting TValue. // Column order according to the header, though can have less elements than the Header std::vector ResultLineTypesSorted; + + // Helper method to process a single line of CSV data + void ProcessCsvLine( + const TString& line, + google::protobuf::RepeatedPtrField* listItems, + const std::vector>& columnTypeParsers, + std::optional currentRow, + const TString& filename + ) const; }; } diff --git a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp index ffe3de88bf27..b84553a6b531 100644 --- a/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp +++ b/ydb/public/lib/ydb_cli/common/csv_parser_ut.cpp @@ -317,7 +317,7 @@ Y_UNIT_TEST_SUITE(YdbCliCsvParserTests) { {"col2", TTypeBuilder().BeginOptional().Primitive(EPrimitiveType::Int64).EndOptional().Build()}, {"col3", TTypeBuilder().Primitive(EPrimitiveType::Bool).Build()}, }; - + TString csvHeader = "col4,col3,col5,col1,col6"; std::vector data = { "col4 unused value,true,col5 unused value,col1 value,col6 unused value" diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index 9ade22b16fa9..4a509be3925f 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -11,6 +11,8 @@ #include #include +#include + #include #include #include @@ -25,6 +27,7 @@ #include #include #include +#include #include #include @@ -50,6 +53,7 @@ #include #endif +#include namespace NYdb { namespace NConsoleClient { @@ -557,6 +561,16 @@ class TImportFileClient::TImpl { std::shared_ptr progressFile); TAsyncStatus UpsertTValueBuffer(const TString& dbPath, TValueBuilder& builder); TAsyncStatus UpsertTValueBuffer(const TString& dbPath, std::function&& buildFunc); + + TAsyncStatus UpsertTValueBufferParquet( + const TString& dbPath, + std::shared_ptr batch, + const arrow::ipc::IpcWriteOptions& writeOptions + ); + + TAsyncStatus UpsertTValueBufferOnArena( + const TString& dbPath, std::function&& buildFunc); + TStatus UpsertJson(IInputStream &input, const TString &dbPath, std::optional inputSizeHint, ProgressCallbackFunc & progressCallback); TStatus UpsertParquet(const TString& filename, const TString& dbPath, ProgressCallbackFunc & progressCallback); @@ -682,7 +696,7 @@ TStatus TImportFileClient::TImpl::Import(const TVector& filePaths, cons auto start = TInstant::Now(); - TThreadPool jobPool; + TThreadPool jobPool(IThreadPool::TParams().SetThreadNamePrefix("FileWorker")); jobPool.Start(filePathsSize); TVector> asyncResults; @@ -970,7 +984,7 @@ TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, // to prevent copying data in retryFunc in a happy way when there is only one request TValue builtValue = prebuiltValue.has_value() ? std::move(prebuiltValue.value()) : buildFunc(); prebuiltValue = std::nullopt; - return tableClient.BulkUpsert(dbPath, std::move(builtValue), UpsertSettings) + return tableClient.BulkUpsertUnretryableUnsafe(dbPath, std::move(builtValue), UpsertSettings) .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { NYdb::TStatus status = bulkUpsertResult.GetValueSync(); return NThreading::MakeFuture(status); @@ -1003,6 +1017,97 @@ TAsyncStatus TImportFileClient::TImpl::UpsertTValueBuffer(const TString& dbPath, }); } +inline TAsyncStatus TImportFileClient::TImpl::UpsertTValueBufferParquet( + const TString& dbPath, + std::shared_ptr batch, + const arrow::ipc::IpcWriteOptions& writeOptions +) { + if (!RequestsInflight->try_acquire()) { + if (Settings.Verbose_ && Settings.NewlineDelimited_) { + if (!InformedAboutLimit.exchange(true)) { + Cerr << (TStringBuilder() << "@ (each '@' means max request inflight is reached and a worker thread is waiting for " + "any response from database)" << Endl); + } else { + Cerr << '@'; + } + } + RequestsInflight->acquire(); + } + + auto retryFunc = [parquet = NYdb_cli::NArrow::SerializeBatch(batch, writeOptions), + schema = NYdb_cli::NArrow::SerializeSchema(*batch->schema()), + dbPath](NTable::TTableClient& client) { + return client.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, parquet, schema) + .Apply([](const NTable::TAsyncBulkUpsertResult& result) { + return TStatus(result.GetValueSync()); + }); + }; + + return TableClient->RetryOperation(std::move(retryFunc), RetrySettings) + .Apply([this](const TAsyncStatus& asyncStatus) { + NYdb::TStatus status = asyncStatus.GetValueSync(); + if (!status.IsSuccess()) { + if (!Failed.exchange(true)) { + ErrorStatus = MakeHolder(status); + } + } + RequestsInflight->release(); + return asyncStatus; + }); +} + +inline TAsyncStatus TImportFileClient::TImpl::UpsertTValueBufferOnArena( + const TString& dbPath, std::function&& buildFunc) { + auto arena = std::make_shared(); + + // For the first attempt values are built before acquiring request inflight semaphore + std::optional prebuiltValue = buildFunc(arena.get()); + + auto retryFunc = [this, &dbPath, buildFunc = std::move(buildFunc), + prebuiltValue = std::move(prebuiltValue), arena = std::move(arena)] + (NYdb::NTable::TTableClient& tableClient) mutable -> TAsyncStatus { + auto buildTValueAndSendRequest = [this, &buildFunc, &dbPath, &tableClient, &prebuiltValue, arena]() { + // For every retry attempt after first request build value from strings again + // to prevent copying data in retryFunc in a happy way when there is only one request + TValue builtValue = prebuiltValue.has_value() ? std::move(prebuiltValue.value()) : buildFunc(arena.get()); + prebuiltValue = std::nullopt; + + return tableClient.BulkUpsertUnretryableArenaAllocatedUnsafe( + dbPath, std::move(builtValue), arena.get(), UpsertSettings) + .Apply([](const NYdb::NTable::TAsyncBulkUpsertResult& bulkUpsertResult) { + NYdb::TStatus status = bulkUpsertResult.GetValueSync(); + return NThreading::MakeFuture(status); + }); + }; + // Running heavy building task on processing pool: + return NThreading::Async(std::move(buildTValueAndSendRequest), *ProcessingPool); + }; + + if (!RequestsInflight->try_acquire()) { + if (Settings.Verbose_ && Settings.NewlineDelimited_) { + if (!InformedAboutLimit.exchange(true)) { + Cerr << (TStringBuilder() << "@ (each '@' means max request inflight is reached and a worker thread is waiting for " + "any response from database)" << Endl); + } else { + Cerr << '@'; + } + } + RequestsInflight->acquire(); + } + + return TableClient->RetryOperation(std::move(retryFunc), RetrySettings) + .Apply([this](const TAsyncStatus& asyncStatus) { + NYdb::TStatus status = asyncStatus.GetValueSync(); + if (!status.IsSuccess()) { + if (!Failed.exchange(true)) { + ErrorStatus = MakeHolder(status); + } + } + RequestsInflight->release(); + return asyncStatus; + }); +} + TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input, const TString& dbPath, const TString& filePath, @@ -1064,30 +1169,113 @@ TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input, } }; + // Note: table = dbPath (path to the table on the server) + auto columns = DbTableInfo->GetTableColumns(); + + const Ydb::Formats::CsvSettings csvSettings = ([this]() { + Ydb::Formats::CsvSettings settings; + settings.set_delimiter(Settings.Delimiter_); + settings.set_header(Settings.Header_); + if (Settings.NullValue_.has_value()) { + settings.set_null_value(Settings.NullValue_.value()); + } + settings.set_skip_rows(Settings.SkipRows_); + return settings; + }()); + + auto writeOptions = arrow::ipc::IpcWriteOptions::Defaults(); + constexpr auto codecType = arrow::Compression::type::ZSTD; + writeOptions.codec = *arrow::util::Codec::Create(codecType); + + // determines if we should fallback to YDB Types (TValue with Protobuf arena) when Apache Arrow failed. + // Apache Arrow tends to be faster but doesn't support all YDB types (e.g. TzDatetime). On the other hand, + // YDB Types are slower but support all YDB types. + std::atomic_bool fallbackToYDBTypes = false; + + auto upsertCsvFunc = [&](std::vector&& buffer, ui64 row, std::shared_ptr batchStatus) { - auto buildFunc = [&, buffer = std::move(buffer), row, this] () mutable { - try { - return parser.BuildList(buffer, filePath, row); - } catch (const std::exception& e) { - if (!Failed.exchange(true)) { - ErrorStatus = MakeHolder(MakeStatus(EStatus::INTERNAL_ERROR, e.what())); + // by default, we attempt to send data via Apache Arrow, if it fails, we fallback to YDB Types (TValue with Protobuf arena) + if (!fallbackToYDBTypes.load()) { + const i64 estimatedCsvLineLength = (!buffer.empty() ? 2 * buffer.front().size() : 10'000); + TStringBuilder data; + data.reserve((buffer.size() + (Settings.Header_ ? 1 : 0)) * estimatedCsvLineLength); + // insert header if it is present in the given csv file + if (Settings.Header_) { + data << parser.GetHeaderRow() << Endl; + } + data << JoinSeq("\n", buffer) << Endl; + + // I: attempt to send data via Apache Arrow + // if header is present, it is expected to be the first line of the data + TString error; + auto arrowCsv = NKikimr::NFormats::TArrowCSVTable::Create(columns, Settings.Header_); + if (arrowCsv.ok()) { + if (auto batch = arrowCsv->ReadSingleBatch(data, csvSettings, error)) { + if (!error) { + // batch was read successfully, sending data via Apache Arrow + UpsertTValueBufferParquet(dbPath, std::move(batch), writeOptions) + .Apply([&, batchStatus](const TAsyncStatus& asyncStatus) { + jobInflightManager->ReleaseJob(); + if (asyncStatus.GetValueSync().IsSuccess()) { + batchStatus->Completed = true; + if (!FileProgressPool->AddFunc(saveProgressIfAny) && !Failed.exchange(true)) { + ErrorStatus = MakeHolder(MakeStatus(EStatus::INTERNAL_ERROR, + "Couldn't add worker func to save progress")); + } + } + return asyncStatus; + }); + } + else { + error = "Error while reading a batch from Apache Arrow: " + error; + fallbackToYDBTypes.store(true); + } + } + else { + error = "Could not read a batch from Apache Arrow"; + fallbackToYDBTypes.store(true); } - jobInflightManager->ReleaseJob(); - throw; } - }; - UpsertTValueBuffer(dbPath, std::move(buildFunc)) - .Apply([&, batchStatus](const TAsyncStatus& asyncStatus) { - jobInflightManager->ReleaseJob(); - if (asyncStatus.GetValueSync().IsSuccess()) { - batchStatus->Completed = true; - if (!FileProgressPool->AddFunc(saveProgressIfAny) && !Failed.exchange(true)) { - ErrorStatus = MakeHolder(MakeStatus(EStatus::INTERNAL_ERROR, - "Couldn't add worker func to save progress")); + else { + error = arrowCsv.status().ToString(); + fallbackToYDBTypes.store(true); + } + + // print the error if fallback requested + if (fallbackToYDBTypes.load()) { + std::cerr << "Error when trying to import via Apache Arrow: '" << error << "'.\n" + << "Falling back to import via YDB Types (no action needed from your side)." << std::endl; + } + } + + // check if fallback requested (after first Apache Arrow failure, we will always fallback to YDB Types) + if (fallbackToYDBTypes.load()) { + // II: fallback to TValue with Protobuf arena because Apache Arrow failed + auto buildOnArenaFunc = [&, buffer = std::move(buffer), row, this] (google::protobuf::Arena* arena) mutable { + try { + return parser.BuildListOnArena(buffer, filePath, arena, row); + } catch (const std::exception& e) { + if (!Failed.exchange(true)) { + ErrorStatus = MakeHolder(MakeStatus(EStatus::INTERNAL_ERROR, e.what())); } + jobInflightManager->ReleaseJob(); + throw; } - return asyncStatus; - }); + }; + + UpsertTValueBufferOnArena(dbPath, std::move(buildOnArenaFunc)) + .Apply([&, batchStatus](const TAsyncStatus& asyncStatus) { + jobInflightManager->ReleaseJob(); + if (asyncStatus.GetValueSync().IsSuccess()) { + batchStatus->Completed = true; + if (!FileProgressPool->AddFunc(saveProgressIfAny) && !Failed.exchange(true)) { + ErrorStatus = MakeHolder(MakeStatus(EStatus::INTERNAL_ERROR, + "Couldn't add worker func to save progress")); + } + } + return asyncStatus; + }); + } }; for (ui32 i = 0; i < rowsToSkip; ++i) { @@ -1115,7 +1303,7 @@ TStatus TImportFileClient::TImpl::UpsertCsv(IInputStream& input, line.erase(line.size() - Settings.Delimiter_.size()); } - buffer.push_back(line); + buffer.push_back(std::move(line)); if (readBytes >= nextReadBorder && Settings.Verbose_) { nextReadBorder += VerboseModeStepSize; diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h index 01bccbc00c03..3e1dcf28dc15 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h @@ -1260,6 +1260,75 @@ class TTableClient { //! Using the client after call this method causes UB NThreading::TFuture Stop(); + /** + * @brief Upsert data into table moving `Ydb::Value` from `rows` into request proto model. + * + * This method is not retryable with the same `rows` object because it moves the underlying data + * from the TValue object into the request proto model. The TValue class uses a shared pointer + * to store its implementation data, so when you make a copy of a TValue object, both copies + * share the same underlying data. When this method moves the data from one TValue instance, + * all other instances sharing the same implementation pointer will be left with moved-from data. + * + * Example of incorrect usage: + * ``` + * TValue originalValue = BuildValue(); + * retry { + * TValue copy = originalValue; // Both originalValue and copy share the same data + * client.BulkUpsertUnretryableUnsafe(std::move(copy)); // Moves data from copy, leaving originalValue in moved-from state + * } + * ``` + * CAUTION: It is unsafe to use the instance of `originalValue` because its underlying data is moved! + * + * To safely retry the operation, you need to create a new TValue with fresh data for each attempt. + */ + TAsyncBulkUpsertResult BulkUpsertUnretryableUnsafe(const std::string& table, TValue&& rows, + const TBulkUpsertSettings& settings); + + /** + * @brief Upsert data into table moving `Ydb::Value` from `rows` into request proto model. The `Ydb::Value` of `TValue` must be arena-allocated. + * + * This method is not retryable with the same `rows` object because it moves the underlying data + * from the `TValue` object into the request proto model. The `TValue` class uses a shared pointer + * to store its implementation data, so when you make a copy of a `TValue` object, both copies + * share the same underlying data. When this method moves the data from one `TValue` instance, + * all other instances sharing the same implementation pointer will be left with moved-from data. + * + * IMPORTANT: The `rows` parameter must contain arena-allocated protobuf data. This method + * should only be called when the `TValue`'s underlying data was allocated using the provided + * protobuf arena. Using this method with non-arena-allocated data will lead to undefined behavior. + * It is the caller's responsibility to ensure that the lifetime of the used arena is longer than the lifetime of the `TValue` object during the operation. + * + * @see: https://protobuf.dev/reference/cpp/arenas + * + * Example of incorrect usage: + * ``` + * TValue originalValue = BuildValue(); // Not arena-allocated + * retry { + * TValue copy = originalValue; // Both originalValue and copy share the same data + * client.BulkUpsertUnretryableArenaAllocated(std::move(copy), arena); // WRONG: data not arena-allocated + * } + * ``` + * + * Example of correct usage: + * ``` + * google::protobuf::Arena arena; + * TValue arenaValue = BuildValueOnArena(&arena); // Value built using arena allocation + * client.BulkUpsertUnretryableArenaAllocated(std::move(arenaValue), &arena); + * ``` + * + * To safely retry the operation, you need to create a new `TValue` with fresh arena-allocated data for each attempt. + * + * @param arena The protobuf arena that was used to allocate the data in `rows`. Must be the same + * arena that was used to create the TValue's underlying data. + * @param settings The settings for the bulk upsert operation. + */ + TAsyncBulkUpsertResult BulkUpsertUnretryableArenaAllocatedUnsafe( + const std::string& table, + TValue&& rows, + google::protobuf::Arena* arena, + const TBulkUpsertSettings& settings + ); + //! Non-transactional fast bulk write. //! Interanlly it uses an implicit session and thus doesn't need a session to be passed. //! "rows" parameter must be a list of structs where each stuct represents one row. diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h index 61eb9ce2c0f2..4c512390dc41 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h @@ -29,6 +29,8 @@ class TType { const Ydb::Type& GetProto() const; Ydb::Type& GetProto(); + Ydb::Type&& ExtractProto() &&; + private: class TImpl; std::shared_ptr Impl_; @@ -269,13 +271,15 @@ class TValue { public: TValue(const TType& type, const Ydb::Value& valueProto); TValue(const TType& type, Ydb::Value&& valueProto); + TValue(const TType& type, Ydb::Value* arenaAllocatedValueProto); const TType& GetType() const; - TType & GetType(); + TType& GetType(); const Ydb::Value& GetProto() const; Ydb::Value& GetProto(); + Ydb::Value&& ExtractProto() &&; private: class TImpl; std::shared_ptr Impl_; diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h index f1a23631c94e..0a608fe85171 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -149,131 +149,33 @@ class TGRpcConnectionsImpl const TRpcRequestSettings& requestSettings, std::shared_ptr context = nullptr) { - using NYdbGrpc::TGrpcStatus; - using TConnection = std::unique_ptr>; - Y_ABORT_UNLESS(dbState); - - if (!TryCreateContext(context)) { - TPlainStatus status(EStatus::CLIENT_CANCELLED, "Client is stopped"); - userResponseCb(nullptr, TPlainStatus{status.Status, std::move(status.Issues)}); - return; - } - - if (dbState->StatCollector.IsCollecting()) { - std::weak_ptr weakState = dbState; - const auto startTime = TInstant::Now(); - userResponseCb = std::move([cb = std::move(userResponseCb), weakState, startTime](TResponse* response, TPlainStatus status) { - const auto resultSize = response ? response->ByteSizeLong() : 0; - cb(response, status); - - if (auto state = weakState.lock()) { - state->StatCollector.IncRequestLatency(TInstant::Now() - startTime); - state->StatCollector.IncResultSize(resultSize); - } - }); - } - - WithServiceConnection( - [this, request = std::move(request), userResponseCb = std::move(userResponseCb), rpc, requestSettings, context = std::move(context), dbState] - (TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable -> void { - if (!status.Ok()) { - userResponseCb( - nullptr, - std::move(status)); - return; - } - - TCallMeta meta; - meta.Timeout = requestSettings.ClientTimeout; - #ifndef YDB_GRPC_UNSECURE_AUTH - meta.CallCredentials = dbState->CallCredentials; - #else - if (requestSettings.UseAuth && dbState->CredentialsProvider && dbState->CredentialsProvider->IsValid()) { - try { - meta.Aux.push_back({ YDB_AUTH_TICKET_HEADER, GetAuthInfo(dbState) }); - } catch (const std::exception& e) { - userResponseCb( - nullptr, - TPlainStatus( - EStatus::CLIENT_UNAUTHENTICATED, - TStringBuilder() << "Can't get Authentication info from CredentialsProvider. " << e.what() - ) - ); - return; - } - } - #endif - if (!requestSettings.TraceId.empty()) { - meta.Aux.push_back({YDB_TRACE_ID_HEADER, requestSettings.TraceId}); - } - - if (!requestSettings.RequestType.empty()) { - meta.Aux.push_back({YDB_REQUEST_TYPE_HEADER, requestSettings.RequestType}); - } - - if (!dbState->Database.empty()) { - SetDatabaseHeader(meta, dbState->Database); - } - - static const std::string clientPid = GetClientPIDHeaderValue(); - - meta.Aux.push_back({YDB_SDK_BUILD_INFO_HEADER, CreateSDKBuildInfo()}); - meta.Aux.push_back({YDB_CLIENT_PID, clientPid}); - meta.Aux.insert(meta.Aux.end(), requestSettings.Header.begin(), requestSettings.Header.end()); - - dbState->StatCollector.IncGRpcInFlight(); - dbState->StatCollector.IncGRpcInFlightByHost(endpoint.GetEndpoint()); - - NYdbGrpc::TAdvancedResponseCallback responseCbLow = - [this, context, userResponseCb = std::move(userResponseCb), endpoint, dbState] - (const grpc::ClientContext& ctx, TGrpcStatus&& grpcStatus, TResponse&& response) mutable -> void { - dbState->StatCollector.DecGRpcInFlight(); - dbState->StatCollector.DecGRpcInFlightByHost(endpoint.GetEndpoint()); - - if (NYdbGrpc::IsGRpcStatusGood(grpcStatus)) { - std::multimap metadata; - - for (const auto& [name, value] : ctx.GetServerInitialMetadata()) { - metadata.emplace( - std::string(name.begin(), name.end()), - std::string(value.begin(), value.end())); - } - for (const auto& [name, value] : ctx.GetServerTrailingMetadata()) { - metadata.emplace( - std::string(name.begin(), name.end()), - std::string(value.begin(), value.end())); - } - - auto resp = new TResult( - std::move(response), - std::move(grpcStatus), - std::move(userResponseCb), - this, - std::move(context), - endpoint.GetEndpoint(), - std::move(metadata)); - - EnqueueResponse(resp); - } else { - dbState->StatCollector.IncReqFailDueTransportError(); - dbState->StatCollector.IncTransportErrorsByHost(endpoint.GetEndpoint()); - - auto resp = new TGRpcErrorResponse( - std::move(grpcStatus), - std::move(userResponseCb), - this, - std::move(context), - endpoint.GetEndpoint()); - - dbState->EndpointPool.BanEndpoint(endpoint.GetEndpoint()); - - EnqueueResponse(resp); - } - }; + RunImpl( + std::forward(request), + std::move(userResponseCb), + rpc, + dbState, + requestSettings, + std::move(context) + ); + } - serviceConnection->DoAdvancedRequest(std::move(request), std::move(responseCbLow), rpc, meta, - context.get()); - }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy); + template + void RunOnArena( + TRequest* request, + TResponseCb&& userResponseCb, + TSimpleRpc rpc, + TDbDriverStatePtr dbState, + const TRpcRequestSettings& requestSettings, + std::shared_ptr context = nullptr) + { + RunImpl( + request, + std::move(userResponseCb), + rpc, + dbState, + requestSettings, + std::move(context) + ); } template @@ -285,48 +187,39 @@ class TGRpcConnectionsImpl TDuration deferredTimeout, const TRpcRequestSettings& requestSettings, bool poll = false, - std::shared_ptr context = nullptr) - { - if (!TryCreateContext(context)) { - TPlainStatus status(EStatus::CLIENT_CANCELLED, "Client is stopped"); - userResponseCb(nullptr, status); - return; - } - - auto responseCb = [this, userResponseCb = std::move(userResponseCb), dbState, deferredTimeout, poll, context] - (TResponse* response, TPlainStatus status) mutable - { - if (response) { - Ydb::Operations::Operation* operation = response->mutable_operation(); - if (!operation->ready() && poll) { - auto action = MakeIntrusive( - operation->id(), - std::move(userResponseCb), - this, - std::move(context), - deferredTimeout, - dbState, - status.Endpoint); - - action->Start(); - } else { - NYdb::NIssue::TIssues opIssues; - NYdb::NIssue::IssuesFromMessage(operation->issues(), opIssues); - userResponseCb(operation, TPlainStatus{static_cast(operation->status()), std::move(opIssues), - status.Endpoint, std::move(status.Metadata)}); - } - } else { - userResponseCb(nullptr, status); - } - }; + std::shared_ptr context = nullptr) { + RunDeferredImpl( + std::forward(request), + std::move(userResponseCb), + rpc, + dbState, + deferredTimeout, + requestSettings, + poll, + std::move(context) + ); + } - Run( - std::move(request), - responseCb, + template + void RunDeferredOnArena( + TRequest* request, + TDeferredOperationCb&& userResponseCb, + TSimpleRpc rpc, + TDbDriverStatePtr dbState, + TDuration deferredTimeout, + const TRpcRequestSettings& requestSettings, + bool poll = false, + std::shared_ptr context = nullptr) { + RunDeferredImpl( + request, + std::move(userResponseCb), rpc, dbState, + deferredTimeout, requestSettings, - std::move(context)); + poll, + std::move(context) + ); } // Run request using discovery endpoint. @@ -375,7 +268,37 @@ class TGRpcConnectionsImpl }; RunDeferred( - std::move(request), + std::forward(request), + operationCb, + rpc, + dbState, + deferredTimeout, + requestSettings, + true, // poll + context); + } + +template + void RunDeferredOnArena( + TRequest* request, + TDeferredResultCb&& userResponseCb, + TSimpleRpc rpc, + TDbDriverStatePtr dbState, + TDuration deferredTimeout, + const TRpcRequestSettings& requestSettings, + std::shared_ptr context = nullptr) + { + auto operationCb = [userResponseCb = std::move(userResponseCb)](Ydb::Operations::Operation* operation, TPlainStatus status) mutable { + if (operation) { + status.SetCostInfo(std::move(*operation->mutable_cost_info())); + userResponseCb(operation->mutable_result(), std::move(status)); + } else { + userResponseCb(nullptr, std::move(status)); + } + }; + + RunDeferredOnArena( + request, operationCb, rpc, dbState, @@ -606,6 +529,249 @@ class TGRpcConnectionsImpl const TLog& GetLog() const override; private: + template + void RunImpl( + std::conditional_t request, + TResponseCb&& userResponseCb, + TSimpleRpc rpc, + TDbDriverStatePtr dbState, + const TRpcRequestSettings& requestSettings, + std::shared_ptr context = nullptr) + { + using NYdbGrpc::TGrpcStatus; + using TConnection = std::unique_ptr>; + Y_ABORT_UNLESS(dbState); + + if (!TryCreateContext(context)) { + TPlainStatus status(EStatus::CLIENT_CANCELLED, "Client is stopped"); + userResponseCb(nullptr, TPlainStatus{status.Status, std::move(status.Issues)}); + return; + } + + if (dbState->StatCollector.IsCollecting()) { + std::weak_ptr weakState = dbState; + const auto startTime = TInstant::Now(); + userResponseCb = std::move([cb = std::move(userResponseCb), weakState, startTime](TResponse* response, TPlainStatus status) { + const auto resultSize = response ? response->ByteSizeLong() : 0; + cb(response, status); + + if (auto state = weakState.lock()) { + state->StatCollector.IncRequestLatency(TInstant::Now() - startTime); + state->StatCollector.IncResultSize(resultSize); + } + }); + } + + auto createMeta = [requestSettings, dbState](const TResponseCb& userResponseCb) -> std::optional { + TCallMeta meta; + meta.Timeout = requestSettings.ClientTimeout; + #ifndef YDB_GRPC_UNSECURE_AUTH + meta.CallCredentials = dbState->CallCredentials; + #else + if (requestSettings.UseAuth && dbState->CredentialsProvider && dbState->CredentialsProvider->IsValid()) { + try { + meta.Aux.push_back({ YDB_AUTH_TICKET_HEADER, GetAuthInfo(dbState) }); + } catch (const std::exception& e) { + userResponseCb( + nullptr, + TPlainStatus( + EStatus::CLIENT_UNAUTHENTICATED, + TStringBuilder() << "Can't get Authentication info from CredentialsProvider. " << e.what() + ) + ); + return std::nullopt; + } + } + #endif + if (!requestSettings.TraceId.empty()) { + meta.Aux.push_back({YDB_TRACE_ID_HEADER, requestSettings.TraceId}); + } + + if (!requestSettings.RequestType.empty()) { + meta.Aux.push_back({YDB_REQUEST_TYPE_HEADER, requestSettings.RequestType}); + } + + if (!dbState->Database.empty()) { + SetDatabaseHeader(meta, dbState->Database); + } + + static const std::string clientPid = GetClientPIDHeaderValue(); + + meta.Aux.push_back({YDB_SDK_BUILD_INFO_HEADER, CreateSDKBuildInfo()}); + meta.Aux.push_back({YDB_CLIENT_PID, clientPid}); + meta.Aux.insert(meta.Aux.end(), requestSettings.Header.begin(), requestSettings.Header.end()); + + return meta; + }; + + auto createResponseCbLow = [this, dbState]( + const TResponseCb& userResponseCb, + const TEndpointKey& endpoint, + std::shared_ptr context) -> NYdbGrpc::TAdvancedResponseCallback { + dbState->StatCollector.IncGRpcInFlight(); + dbState->StatCollector.IncGRpcInFlightByHost(endpoint.GetEndpoint()); + + NYdbGrpc::TAdvancedResponseCallback responseCbLow = + [this, context, userResponseCb = std::move(userResponseCb), endpoint, dbState] + (const grpc::ClientContext& ctx, TGrpcStatus&& grpcStatus, TResponse&& response) mutable -> void { + dbState->StatCollector.DecGRpcInFlight(); + dbState->StatCollector.DecGRpcInFlightByHost(endpoint.GetEndpoint()); + + if (NYdbGrpc::IsGRpcStatusGood(grpcStatus)) { + std::multimap metadata; + + for (const auto& [name, value] : ctx.GetServerInitialMetadata()) { + metadata.emplace( + std::string(name.begin(), name.end()), + std::string(value.begin(), value.end())); + } + for (const auto& [name, value] : ctx.GetServerTrailingMetadata()) { + metadata.emplace( + std::string(name.begin(), name.end()), + std::string(value.begin(), value.end())); + } + + auto resp = new TResult( + std::move(response), + std::move(grpcStatus), + std::move(userResponseCb), + this, + std::move(context), + endpoint.GetEndpoint(), + std::move(metadata)); + + EnqueueResponse(resp); + } else { + dbState->StatCollector.IncReqFailDueTransportError(); + dbState->StatCollector.IncTransportErrorsByHost(endpoint.GetEndpoint()); + + auto resp = new TGRpcErrorResponse( + std::move(grpcStatus), + std::move(userResponseCb), + this, + std::move(context), + endpoint.GetEndpoint()); + + dbState->EndpointPool.BanEndpoint(endpoint.GetEndpoint()); + + EnqueueResponse(resp); + } + }; + + return responseCbLow; + }; + + if constexpr (RequestOnArena) { + WithServiceConnection( + [request, userResponseCb = std::move(userResponseCb), rpc, requestSettings, context = std::move(context), dbState, + createMeta = std::move(createMeta), createResponseCbLow = std::move(createResponseCbLow)] + (TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable -> void { + if (!status.Ok()) { + userResponseCb( + nullptr, + std::move(status)); + return; + } + + std::optional metaOpt = createMeta(userResponseCb); + if (!metaOpt) { + return; + } + auto meta = std::move(*metaOpt); + auto responseCbLow = createResponseCbLow(userResponseCb, endpoint, context); + + serviceConnection->DoAdvancedRequest(*request, std::move(responseCbLow), rpc, meta, context.get()); + }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy + ); + } + else { + WithServiceConnection( + [request, userResponseCb = std::move(userResponseCb), rpc, requestSettings, context = std::move(context), dbState, + createMeta = std::move(createMeta), createResponseCbLow = std::move(createResponseCbLow)] + (TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable -> void { + if (!status.Ok()) { + userResponseCb( + nullptr, + std::move(status)); + return; + } + + std::optional metaOpt = createMeta(userResponseCb); + if (!metaOpt) { + return; + } + auto meta = std::move(*metaOpt); + auto responseCbLow = createResponseCbLow(userResponseCb, endpoint, context); + + serviceConnection->DoAdvancedRequest(std::move(request), std::move(responseCbLow), rpc, meta, context.get()); + }, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy + ); + } + } + + template + void RunDeferredImpl( + std::conditional_t request, + TDeferredOperationCb&& userResponseCb, + TSimpleRpc rpc, + TDbDriverStatePtr dbState, + TDuration deferredTimeout, + const TRpcRequestSettings& requestSettings, + bool poll = false, + std::shared_ptr context = nullptr) { + if (!TryCreateContext(context)) { + TPlainStatus status(EStatus::CLIENT_CANCELLED, "Client is stopped"); + userResponseCb(nullptr, status); + return; + } + + auto responseCb = [this, userResponseCb = std::move(userResponseCb), dbState, deferredTimeout, poll, context] + (TResponse* response, TPlainStatus status) mutable + { + if (response) { + Ydb::Operations::Operation* operation = response->mutable_operation(); + if (!operation->ready() && poll) { + auto action = MakeIntrusive( + operation->id(), + std::move(userResponseCb), + this, + std::move(context), + deferredTimeout, + dbState, + status.Endpoint); + + action->Start(); + } else { + NYdb::NIssue::TIssues opIssues; + NYdb::NIssue::IssuesFromMessage(operation->issues(), opIssues); + userResponseCb(operation, TPlainStatus{static_cast(operation->status()), std::move(opIssues), + status.Endpoint, std::move(status.Metadata)}); + } + } else { + userResponseCb(nullptr, status); + } + }; + + if constexpr (RequestOnArena) { + RunOnArena( + request, + responseCb, + rpc, + dbState, + requestSettings, + std::move(context)); + } + else { + Run( + std::forward(request), + responseCb, + rpc, + dbState, + requestSettings, + std::move(context)); + } + } + template void WithServiceConnection(TCallback callback, TDbDriverStatePtr dbState, const TEndpointKey& preferredEndpoint, TRpcRequestSettings::TEndpointPolicy endpointPolicy) diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/make_request/make.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/make_request/make.h index d8dd35dbe6b7..742e6a8a53e0 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/make_request/make.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/make_request/make.h @@ -46,4 +46,18 @@ TProtoRequest MakeOperationRequest(const TRequestSettings& settings) { return request; } + +template +TProtoRequest* MakeRequestOnArena(google::protobuf::Arena* arena) { + return google::protobuf::Arena::CreateMessage(arena); +} + +template +TProtoRequest* MakeOperationRequestOnArena(const TRequestSettings& settings, google::protobuf::Arena* arena) { + Y_ASSERT(arena != nullptr); + auto request = MakeRequestOnArena(arena); + FillOperationParams(settings, *request); + return request; +} + } // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/table/impl/table_client.cpp b/ydb/public/sdk/cpp/src/client/table/impl/table_client.cpp index 64fa171dd0a3..7bc8b9d98228 100644 --- a/ydb/public/sdk/cpp/src/client/table/impl/table_client.cpp +++ b/ydb/public/sdk/cpp/src/client/table/impl/table_client.cpp @@ -989,6 +989,64 @@ void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TCli SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing); } +TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsertUnretryableUnsafe(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) { + auto request = MakeOperationRequest(settings); + request.set_table(TStringType{table}); + *request.mutable_rows()->mutable_type() = std::move(rows.GetType()).ExtractProto(); + *request.mutable_rows()->mutable_value() = std::move(rows).ExtractProto(); + + auto promise = NewPromise(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Y_UNUSED(any); + TBulkUpsertResult val(TStatus(std::move(status))); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); +} + +TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsertUnretryableArenaAllocatedUnsafe( + const std::string& table, + TValue&& rows, + google::protobuf::Arena* arena, + const TBulkUpsertSettings& settings +) { + auto request = MakeOperationRequestOnArena(settings, arena); + request->set_table(TStringType{table}); + *request->mutable_rows()->mutable_type() = std::move(rows.GetType()).ExtractProto(); + *request->mutable_rows()->mutable_value() = std::move(rows).ExtractProto(); + + auto promise = NewPromise(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Y_UNUSED(any); + TBulkUpsertResult val(TStatus(std::move(status))); + promise.SetValue(std::move(val)); + }; + + // don't give ownership of request to the function + Connections_->RunDeferredOnArena( + request, + extractor, + &Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); +} + TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) { auto request = MakeOperationRequest(settings); request.set_table(TStringType{table}); diff --git a/ydb/public/sdk/cpp/src/client/table/impl/table_client.h b/ydb/public/sdk/cpp/src/client/table/impl/table_client.h index b269c24dedb1..2a7e8c43a9e2 100644 --- a/ydb/public/sdk/cpp/src/client/table/impl/table_client.h +++ b/ydb/public/sdk/cpp/src/client/table/impl/table_client.h @@ -138,6 +138,15 @@ class TTableClient::TImpl: public TClientImplCommon, public void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector); + TAsyncBulkUpsertResult BulkUpsertUnretryableUnsafe(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings); + + TAsyncBulkUpsertResult BulkUpsertUnretryableArenaAllocatedUnsafe( + const std::string& table, + TValue&& rows, + google::protobuf::Arena* arena, + const TBulkUpsertSettings& settings + ); + TAsyncBulkUpsertResult BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings); TAsyncBulkUpsertResult BulkUpsert(const std::string& table, EDataFormat format, const std::string& data, const std::string& schema, const TBulkUpsertSettings& settings); diff --git a/ydb/public/sdk/cpp/src/client/table/table.cpp b/ydb/public/sdk/cpp/src/client/table/table.cpp index 65b271ddc04d..a6af236d313c 100644 --- a/ydb/public/sdk/cpp/src/client/table/table.cpp +++ b/ydb/public/sdk/cpp/src/client/table/table.cpp @@ -1457,6 +1457,21 @@ NThreading::TFuture TTableClient::Stop() { return Impl_->Stop(); } +TAsyncBulkUpsertResult TTableClient::BulkUpsertUnretryableUnsafe(const std::string& table, TValue&& rows, + const TBulkUpsertSettings& settings) +{ + return Impl_->BulkUpsertUnretryableUnsafe(table, std::move(rows), settings); +} + +TAsyncBulkUpsertResult TTableClient::BulkUpsertUnretryableArenaAllocatedUnsafe( + const std::string& table, + TValue&& rows, + google::protobuf::Arena* arena, + const TBulkUpsertSettings& settings) +{ + return Impl_->BulkUpsertUnretryableArenaAllocatedUnsafe(table, std::move(rows), arena, settings); +} + TAsyncBulkUpsertResult TTableClient::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) { diff --git a/ydb/public/sdk/cpp/src/client/value/value.cpp b/ydb/public/sdk/cpp/src/client/value/value.cpp index 2516f3a42264..34b489cd7ec6 100644 --- a/ydb/public/sdk/cpp/src/client/value/value.cpp +++ b/ydb/public/sdk/cpp/src/client/value/value.cpp @@ -111,6 +111,10 @@ Ydb::Type& TType::GetProto() return Impl_->ProtoType_; } +Ydb::Type&& TType::ExtractProto() && { + return std::move(Impl_->ProtoType_); +} + //////////////////////////////////////////////////////////////////////////////// class TTypeParser::TImpl { @@ -1046,14 +1050,41 @@ class TValue::TImpl { public: TImpl(const TType& type, const Ydb::Value& valueProto) : Type_(type) - , ProtoValue_(valueProto) {} + , ProtoValue_(valueProto) + , ArenaAllocatedValueProto_(nullptr) {} TImpl(const TType& type, Ydb::Value&& valueProto) : Type_(type) - , ProtoValue_(std::move(valueProto)) {} + , ProtoValue_(std::move(valueProto)) + , ArenaAllocatedValueProto_(nullptr) {} + + /** + * Lifetime of the arena, and hence the `Ydb::Value`, is expected to be managed by the caller. + * The `Ydb::Value` is expected to be arena-allocated. + * + * See: https://protobuf.dev/reference/cpp/arenas + */ + TImpl(const TType& type, Ydb::Value* arenaAllocatedValueProto) + : Type_(type) + , ProtoValue_{} + , ArenaAllocatedValueProto_(arenaAllocatedValueProto) {} + + const Ydb::Value& GetProto() const { + return ArenaAllocatedValueProto_ ? *ArenaAllocatedValueProto_ : ProtoValue_; + } + + Ydb::Value& GetProto() { + return ArenaAllocatedValueProto_ ? *ArenaAllocatedValueProto_ : ProtoValue_; + } + + Ydb::Value&& ExtractProto() && { + return std::move(ArenaAllocatedValueProto_ ? *ArenaAllocatedValueProto_ : ProtoValue_); + } TType Type_; +private: Ydb::Value ProtoValue_; + Ydb::Value* ArenaAllocatedValueProto_; }; //////////////////////////////////////////////////////////////////////////////// @@ -1064,6 +1095,9 @@ TValue::TValue(const TType& type, const Ydb::Value& valueProto) TValue::TValue(const TType& type, Ydb::Value&& valueProto) : Impl_(new TImpl(type, std::move(valueProto))) {} +TValue::TValue(const TType& type, Ydb::Value* arenaAllocatedValueProto) + : Impl_(new TImpl(type, arenaAllocatedValueProto)) {} + const TType& TValue::GetType() const { return Impl_->Type_; } @@ -1073,11 +1107,15 @@ TType & TValue::GetType() { } const Ydb::Value& TValue::GetProto() const { - return Impl_->ProtoValue_; + return Impl_->GetProto(); } Ydb::Value& TValue::GetProto() { - return Impl_->ProtoValue_; + return Impl_->GetProto(); +} + +Ydb::Value&& TValue::ExtractProto() && { + return std::move(std::move(*Impl_).ExtractProto()); } //////////////////////////////////////////////////////////////////////////////// @@ -1104,7 +1142,7 @@ class TValueParser::TImpl { : Value_(value.Impl_) , TypeParser_(value.GetType()) { - Reset(Value_->ProtoValue_); + Reset(Value_->GetProto()); } TImpl(const TType& type) @@ -2781,7 +2819,6 @@ class TValueBuilderImpl { } private: - //TTypeBuilder TypeBuilder_; TTypeBuilder::TImpl TypeBuilder_; Ydb::Value ProtoValue_;