Skip to content

Commit e0ce471

Browse files
pnv1github-actions[bot]
authored andcommitted
Add Arena option to BulkUpsert, add hidden --send-format option (#20061)
1 parent 032916a commit e0ce471

File tree

7 files changed

+141
-38
lines changed

7 files changed

+141
-38
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
6072bfcb65e5be2c6644af8a42112093ca50882b
1+
d6db40f951eff2a39fbf9e4ceb97db518fda4fea

include/ydb-cpp-sdk/client/table/table.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,6 +1162,8 @@ struct TBulkUpsertSettings : public TOperationRequestSettings<TBulkUpsertSetting
11621162
// Format setting proto serialized into string. If not set format defaults are used.
11631163
// I.e. it's Ydb.Table.CsvSettings for CSV.
11641164
FLUENT_SETTING_DEFAULT(std::string, FormatSettings, "");
1165+
google::protobuf::Arena* Arena_ = nullptr;
1166+
TBulkUpsertSettings& Arena(google::protobuf::Arena* arena) { Arena_ = arena; return *this; }
11651167
};
11661168

11671169
struct TReadRowsSettings : public TOperationRequestSettings<TReadRowsSettings> {

include/ydb-cpp-sdk/client/value/value.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,13 +276,19 @@ class TValue {
276276
public:
277277
TValue(const TType& type, const Ydb::Value& valueProto);
278278
TValue(const TType& type, Ydb::Value&& valueProto);
279+
/**
280+
* Lifetime of the arena, and hence the `Ydb::Value`, is expected to be managed by the caller.
281+
* The `Ydb::Value` is expected to be arena-allocated.
282+
*
283+
* See: https://protobuf.dev/reference/cpp/arenas
284+
*/
285+
TValue(const TType& type, Ydb::Value* arenaAllocatedValueProto);
279286

280287
const TType& GetType() const;
281-
TType & GetType();
288+
TType& GetType();
282289

283290
const Ydb::Value& GetProto() const;
284291
Ydb::Value& GetProto();
285-
286292
private:
287293
class TImpl;
288294
std::shared_ptr<TImpl> Impl_;

src/client/impl/ydb_internal/grpc_connections/grpc_connections.h

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,55 @@ class TGRpcConnectionsImpl
140140
TRequest,
141141
TResponse>::TAsyncRequest;
142142

143+
template<typename TRequest>
144+
class TRequestWrapper {
145+
public:
146+
// Implicit conversion from rvalue reference
147+
TRequestWrapper(TRequest&& request)
148+
: Storage_(std::move(request))
149+
{}
150+
151+
// Implicit conversion from pointer. Means that request is allocated on Arena
152+
TRequestWrapper(TRequest* request)
153+
: Storage_(request)
154+
{}
155+
156+
// Copy constructor
157+
TRequestWrapper(const TRequestWrapper& other) = default;
158+
159+
// Move constructor
160+
TRequestWrapper(TRequestWrapper&& other) = default;
161+
162+
// Copy assignment
163+
TRequestWrapper& operator=(const TRequestWrapper& other) = default;
164+
165+
// Move assignment
166+
TRequestWrapper& operator=(TRequestWrapper&& other) = default;
167+
168+
template<typename TService, typename TResponse>
169+
void DoRequest(
170+
std::unique_ptr<TServiceConnection<TService>>& serviceConnection,
171+
NYdbGrpc::TAdvancedResponseCallback<TResponse>&& responseCbLow,
172+
typename NYdbGrpc::TSimpleRequestProcessor<typename TService::Stub, TRequest, TResponse>::TAsyncRequest rpc,
173+
const TCallMeta& meta,
174+
IQueueClientContext* context)
175+
{
176+
if (auto ptr = std::get_if<TRequest*>(&Storage_)) {
177+
serviceConnection->DoAdvancedRequest(**ptr,
178+
std::move(responseCbLow), rpc, meta, context);
179+
} else {
180+
serviceConnection->DoAdvancedRequest(std::move(std::get<TRequest>(Storage_)),
181+
std::move(responseCbLow), rpc, meta, context);
182+
}
183+
}
184+
185+
private:
186+
std::variant<TRequest*, TRequest> Storage_;
187+
};
188+
143189
template<typename TService, typename TRequest, typename TResponse>
144190
void Run(
145-
TRequest&& request,
191+
TRequestWrapper<TRequest>&& requestWrapper,
146192
TResponseCb<TResponse>&& userResponseCb,
147193
TSimpleRpc<TService, TRequest, TResponse> rpc,
148194
TDbDriverStatePtr dbState,
@@ -174,7 +220,8 @@ class TGRpcConnectionsImpl
174220
}
175221

176222
WithServiceConnection<TService>(
177-
[this, request = std::move(request), userResponseCb = std::move(userResponseCb), rpc, requestSettings, context = std::move(context), dbState]
223+
[this, requestWrapper = std::move(requestWrapper), userResponseCb = std::move(userResponseCb), rpc,
224+
requestSettings, context = std::move(context), dbState]
178225
(TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable -> void {
179226
if (!status.Ok()) {
180227
userResponseCb(
@@ -271,14 +318,13 @@ class TGRpcConnectionsImpl
271318
}
272319
};
273320

274-
serviceConnection->DoAdvancedRequest(std::move(request), std::move(responseCbLow), rpc, meta,
275-
context.get());
321+
requestWrapper.DoRequest(serviceConnection, std::move(responseCbLow), rpc, meta, context.get());
276322
}, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy);
277323
}
278324

279325
template<typename TService, typename TRequest, typename TResponse>
280326
void RunDeferred(
281-
TRequest&& request,
327+
TRequestWrapper<TRequest>&& requestWrapper,
282328
TDeferredOperationCb&& userResponseCb,
283329
TSimpleRpc<TService, TRequest, TResponse> rpc,
284330
TDbDriverStatePtr dbState,
@@ -321,7 +367,7 @@ class TGRpcConnectionsImpl
321367
};
322368

323369
Run<TService, TRequest, TResponse>(
324-
std::move(request),
370+
std::move(requestWrapper),
325371
responseCb,
326372
rpc,
327373
dbState,
@@ -357,7 +403,7 @@ class TGRpcConnectionsImpl
357403

358404
template<typename TService, typename TRequest, typename TResponse>
359405
void RunDeferred(
360-
TRequest&& request,
406+
TRequestWrapper<TRequest>&& requestWrapper,
361407
TDeferredResultCb&& userResponseCb,
362408
TSimpleRpc<TService, TRequest, TResponse> rpc,
363409
TDbDriverStatePtr dbState,
@@ -375,7 +421,7 @@ class TGRpcConnectionsImpl
375421
};
376422

377423
RunDeferred<TService, TRequest, TResponse>(
378-
std::move(request),
424+
std::move(requestWrapper),
379425
operationCb,
380426
rpc,
381427
dbState,

src/client/impl/ydb_internal/make_request/make.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,18 @@ TProtoRequest MakeOperationRequest(const TRequestSettings& settings) {
4646
return request;
4747
}
4848

49+
50+
template <typename TProtoRequest>
51+
TProtoRequest* MakeRequestOnArena(google::protobuf::Arena* arena) {
52+
return google::protobuf::Arena::CreateMessage<TProtoRequest>(arena);
53+
}
54+
55+
template <typename TProtoRequest, typename TRequestSettings>
56+
TProtoRequest* MakeOperationRequestOnArena(const TRequestSettings& settings, google::protobuf::Arena* arena) {
57+
Y_ASSERT(arena != nullptr);
58+
auto request = MakeRequestOnArena<TProtoRequest>(arena);
59+
FillOperationParams(settings, *request);
60+
return request;
61+
}
62+
4963
} // namespace NYdb

src/client/table/impl/table_client.cpp

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -990,33 +990,49 @@ void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TCli
990990
}
991991

992992
TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings, bool canMove) {
993-
auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
994-
request.set_table(TStringType{table});
993+
Ydb::Table::BulkUpsertRequest* request = nullptr;
994+
std::unique_ptr<Ydb::Table::BulkUpsertRequest> holder;
995+
996+
if (settings.Arena_) {
997+
request = MakeOperationRequestOnArena<Ydb::Table::BulkUpsertRequest>(settings, settings.Arena_);
998+
} else {
999+
holder = std::make_unique<Ydb::Table::BulkUpsertRequest>(MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings));
1000+
request = holder.get();
1001+
}
1002+
1003+
request->set_table(TStringType{table});
9951004
if (canMove) {
996-
request.mutable_rows()->mutable_type()->Swap(&rows.GetType().GetProto());
997-
request.mutable_rows()->mutable_value()->Swap(&rows.GetProto());
1005+
request->mutable_rows()->mutable_type()->Swap(&rows.GetType().GetProto());
1006+
request->mutable_rows()->mutable_value()->Swap(&rows.GetProto());
9981007
} else {
999-
*request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
1000-
*request.mutable_rows()->mutable_value() = rows.GetProto();
1008+
*request->mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
1009+
*request->mutable_rows()->mutable_value() = rows.GetProto();
10011010
}
10021011

10031012
auto promise = NewPromise<TBulkUpsertResult>();
1013+
auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable {
1014+
Y_UNUSED(any);
1015+
TBulkUpsertResult val(TStatus(std::move(status)));
1016+
promise.SetValue(std::move(val));
1017+
};
10041018

1005-
auto extractor = [promise]
1006-
(google::protobuf::Any* any, TPlainStatus status) mutable {
1007-
Y_UNUSED(any);
1008-
TBulkUpsertResult val(TStatus(std::move(status)));
1009-
promise.SetValue(std::move(val));
1010-
};
1011-
1012-
Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
1013-
std::move(request),
1014-
extractor,
1015-
&Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
1016-
DbDriverState_,
1017-
INITIAL_DEFERRED_CALL_DELAY,
1018-
TRpcRequestSettings::Make(settings));
1019-
1019+
if (settings.Arena_) {
1020+
Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
1021+
request,
1022+
extractor,
1023+
&Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
1024+
DbDriverState_,
1025+
INITIAL_DEFERRED_CALL_DELAY,
1026+
TRpcRequestSettings::Make(settings));
1027+
} else {
1028+
Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
1029+
std::move(*holder),
1030+
extractor,
1031+
&Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
1032+
DbDriverState_,
1033+
INITIAL_DEFERRED_CALL_DELAY,
1034+
TRpcRequestSettings::Make(settings));
1035+
}
10201036
return promise.GetFuture();
10211037
}
10221038

src/client/value/value.cpp

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,14 +1046,31 @@ class TValue::TImpl {
10461046
public:
10471047
TImpl(const TType& type, const Ydb::Value& valueProto)
10481048
: Type_(type)
1049-
, ProtoValue_(valueProto) {}
1049+
, ProtoValue_(valueProto)
1050+
, ArenaAllocatedValueProto_(nullptr) {}
10501051

10511052
TImpl(const TType& type, Ydb::Value&& valueProto)
10521053
: Type_(type)
1053-
, ProtoValue_(std::move(valueProto)) {}
1054+
, ProtoValue_(std::move(valueProto))
1055+
, ArenaAllocatedValueProto_(nullptr) {}
1056+
1057+
TImpl(const TType& type, Ydb::Value* arenaAllocatedValueProto)
1058+
: Type_(type)
1059+
, ProtoValue_{}
1060+
, ArenaAllocatedValueProto_(arenaAllocatedValueProto) {}
1061+
1062+
const Ydb::Value& GetProto() const {
1063+
return ArenaAllocatedValueProto_ ? *ArenaAllocatedValueProto_ : ProtoValue_;
1064+
}
1065+
1066+
Ydb::Value& GetProto() {
1067+
return ArenaAllocatedValueProto_ ? *ArenaAllocatedValueProto_ : ProtoValue_;
1068+
}
10541069

10551070
TType Type_;
1071+
private:
10561072
Ydb::Value ProtoValue_;
1073+
Ydb::Value* ArenaAllocatedValueProto_;
10571074
};
10581075

10591076
////////////////////////////////////////////////////////////////////////////////
@@ -1064,6 +1081,9 @@ TValue::TValue(const TType& type, const Ydb::Value& valueProto)
10641081
TValue::TValue(const TType& type, Ydb::Value&& valueProto)
10651082
: Impl_(new TImpl(type, std::move(valueProto))) {}
10661083

1084+
TValue::TValue(const TType& type, Ydb::Value* arenaAllocatedValueProto)
1085+
: Impl_(new TImpl(type, arenaAllocatedValueProto)) {}
1086+
10671087
const TType& TValue::GetType() const {
10681088
return Impl_->Type_;
10691089
}
@@ -1073,11 +1093,11 @@ TType & TValue::GetType() {
10731093
}
10741094

10751095
const Ydb::Value& TValue::GetProto() const {
1076-
return Impl_->ProtoValue_;
1096+
return Impl_->GetProto();
10771097
}
10781098

10791099
Ydb::Value& TValue::GetProto() {
1080-
return Impl_->ProtoValue_;
1100+
return Impl_->GetProto();
10811101
}
10821102

10831103
////////////////////////////////////////////////////////////////////////////////
@@ -1104,7 +1124,7 @@ class TValueParser::TImpl {
11041124
: Value_(value.Impl_)
11051125
, TypeParser_(value.GetType())
11061126
{
1107-
Reset(Value_->ProtoValue_);
1127+
Reset(Value_->GetProto());
11081128
}
11091129

11101130
TImpl(const TType& type)
@@ -2781,7 +2801,6 @@ class TValueBuilderImpl {
27812801
}
27822802

27832803
private:
2784-
27852804
//TTypeBuilder TypeBuilder_;
27862805
TTypeBuilder::TImpl TypeBuilder_;
27872806
Ydb::Value ProtoValue_;

0 commit comments

Comments
 (0)