Skip to content

Commit 76f4ca6

Browse files
eivanov89github-actions[bot]
authored andcommitted
Speedup bulkupsert (#17333) (#19793)
1 parent 2132566 commit 76f4ca6

File tree

8 files changed

+32
-21
lines changed

8 files changed

+32
-21
lines changed

.github/last_commit.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
71f3186a075dd5aa34e2840ef29fc3958a89d2fa
1+
81ac0de27c1d3727dcc4ef3a2f2581f441e57e41

include/ydb-cpp-sdk/client/value/value.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,17 @@ struct TUuidValue {
262262
} Buf_;
263263
};
264264

265+
namespace NTable {
266+
267+
class TTableClient;
268+
269+
} // namespace NTable
270+
265271
//! Representation of YDB value.
266272
class TValue {
267273
friend class TValueParser;
268274
friend class TProtoAccessor;
275+
friend class ::NYdb::Dev::NTable::TTableClient;
269276
public:
270277
TValue(const TType& type, const Ydb::Value& valueProto);
271278
TValue(const TType& type, Ydb::Value&& valueProto);

include/ydb-cpp-sdk/library/retry/retry_policy.h

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#pragma once
22

33
#include <util/datetime/base.h>
4-
#include <util/generic/maybe.h>
54
#include <util/generic/typetraits.h>
65
#include <util/random/random.h>
76

@@ -42,7 +41,7 @@ struct IRetryPolicy {
4241

4342
//! Calculate delay before next retry if next retry is allowed.
4443
//! Returns empty maybe if retry is not allowed anymore.
45-
[[nodiscard]] virtual TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) = 0;
44+
[[nodiscard]] virtual std::optional<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) = 0;
4645
};
4746

4847
virtual ~IRetryPolicy() = default;
@@ -82,8 +81,8 @@ struct TNoRetryPolicy : IRetryPolicy<TArgs...> {
8281
using IRetryState = typename IRetryPolicy<TArgs...>::IRetryState;
8382

8483
struct TNoRetryState : IRetryState {
85-
TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam...) override {
86-
return {};
84+
std::optional<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam...) override {
85+
return std::nullopt;
8786
}
8887
};
8988

@@ -124,10 +123,10 @@ struct TExponentialBackoffPolicy : IRetryPolicy<TArgs...> {
124123
{
125124
}
126125

127-
TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
126+
std::optional<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
128127
const ERetryErrorClass errorClass = RetryClassFunction(args...);
129128
if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) {
130-
return {};
129+
return std::nullopt;
131130
}
132131

133132
if (errorClass == ERetryErrorClass::LongRetry) {
@@ -213,10 +212,10 @@ struct TFixedIntervalPolicy : IRetryPolicy<TArgs...> {
213212
{
214213
}
215214

216-
TMaybe<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
215+
std::optional<TDuration> GetNextRetryDelay(typename TTypeTraits<TArgs>::TFuncParam... args) override {
217216
const ERetryErrorClass errorClass = RetryClassFunction(args...);
218217
if (errorClass == ERetryErrorClass::NoRetry || AttemptsDone >= MaxRetries || StartTime && TInstant::Now() - StartTime >= MaxTime) {
219-
return {};
218+
return std::nullopt;
220219
}
221220

222221
const TDuration delay = NRetryDetails::RandomizeDelay(errorClass == ERetryErrorClass::LongRetry ? LongRetryDelay : Delay);

src/client/table/impl/table_client.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -989,11 +989,16 @@ void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TCli
989989
SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing);
990990
}
991991

992-
TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) {
992+
TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings, bool canMove) {
993993
auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
994994
request.set_table(TStringType{table});
995-
*request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
996-
*request.mutable_rows()->mutable_value() = rows.GetProto();
995+
if (canMove) {
996+
request.mutable_rows()->mutable_type()->Swap(&rows.GetType().GetProto());
997+
request.mutable_rows()->mutable_value()->Swap(&rows.GetProto());
998+
} else {
999+
*request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
1000+
*request.mutable_rows()->mutable_value() = rows.GetProto();
1001+
}
9971002

9981003
auto promise = NewPromise<TBulkUpsertResult>();
9991004

src/client/table/impl/table_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class TTableClient::TImpl: public TClientImplCommon<TTableClient::TImpl>, public
138138

139139
void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector);
140140

141-
TAsyncBulkUpsertResult BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings);
141+
TAsyncBulkUpsertResult BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings, bool canMove);
142142
TAsyncBulkUpsertResult BulkUpsert(const std::string& table, EDataFormat format,
143143
const std::string& data, const std::string& schema, const TBulkUpsertSettings& settings);
144144

src/client/table/table.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1460,7 +1460,7 @@ NThreading::TFuture<void> TTableClient::Stop() {
14601460
TAsyncBulkUpsertResult TTableClient::BulkUpsert(const std::string& table, TValue&& rows,
14611461
const TBulkUpsertSettings& settings)
14621462
{
1463-
return Impl_->BulkUpsert(table, std::move(rows), settings);
1463+
return Impl_->BulkUpsert(table, std::move(rows), settings, rows.Impl_.use_count() == 1);
14641464
}
14651465

14661466
TAsyncBulkUpsertResult TTableClient::BulkUpsert(const std::string& table, EDataFormat format,

src/library/retry/retry.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ class TRetryOptionsWithRetCodePolicy : public IRetryPolicy<bool> {
1717
{
1818
}
1919

20-
TMaybe<TDuration> GetNextRetryDelay(bool ret) override {
20+
std::optional<TDuration> GetNextRetryDelay(bool ret) override {
2121
if (ret || Attempt == Opts.RetryCount) {
22-
return {};
22+
return std::nullopt;
2323
}
2424
return Opts.GetTimeToSleep(Attempt++);
2525
}

src/library/retry/retry.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ class TRetryOptionsPolicy : public IRetryPolicy<const TException&> {
104104
{
105105
}
106106

107-
TMaybe<TDuration> GetNextRetryDelay(const TException&) override {
107+
std::optional<TDuration> GetNextRetryDelay(const TException&) override {
108108
if (Attempt == Opts.RetryCount) {
109-
return {};
109+
return std::nullopt;
110110
}
111111
return Opts.GetTimeToSleep(Attempt++);
112112
}
@@ -151,7 +151,7 @@ std::optional<TResult> DoWithRetry(std::function<TResult()> func, const typename
151151
retryState = retryPolicy->CreateRetryState();
152152
}
153153

154-
if (const TMaybe<TDuration> delay = retryState->GetNextRetryDelay(ex)) {
154+
if (const std::optional<TDuration> delay = retryState->GetNextRetryDelay(ex)) {
155155
if (*delay) {
156156
if (sleepFunction) {
157157
sleepFunction(*delay);
@@ -167,7 +167,7 @@ std::optional<TResult> DoWithRetry(std::function<TResult()> func, const typename
167167
}
168168
}
169169
}
170-
return {};
170+
return std::nullopt;
171171
}
172172

173173
template <typename TResult, typename TException = yexception>
@@ -204,7 +204,7 @@ TRetCode DoWithRetryOnRetCode(std::function<TRetCode()> func, const typename IRe
204204
auto retryState = retryPolicy->CreateRetryState();
205205
while (true) {
206206
TRetCode code = func();
207-
if (const TMaybe<TDuration> delay = retryState->GetNextRetryDelay(code)) {
207+
if (const std::optional<TDuration> delay = retryState->GetNextRetryDelay(code)) {
208208
if (*delay) {
209209
if (sleepFunction) {
210210
sleepFunction(*delay);

0 commit comments

Comments
 (0)