Skip to content

Import YDB C++ SDK 17 #536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jul 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
17
18
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ca39c1dc5e3592adab111f61e4aaec2021bfa95b
cdad0889aa2f47f9cde997deae6f682eced20873
1 change: 1 addition & 0 deletions .github/workflows/examples.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- main
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
branches:
- main
concurrency:
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/import.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ jobs:
--base ${{ github.ref_name }} \
--head import-pr-$GENERATION \
--title "Import YDB C++ SDK $GENERATION" \
--body "Automatic import of new commits from ydb repository"
--body "Automatic import of new commits from ydb repository" \
--draft
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
3 changes: 2 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- main
pull_request:
types: [opened, synchronize, reopened, ready_for_review]
branches:
- main
concurrency:
Expand Down Expand Up @@ -116,4 +117,4 @@ jobs:
- name: Test
shell: bash
run: |
YDB_VERSION=${{ matrix.ydb-version }} ctest -j$(nproc) --preset integration
YDB_VERSION=${{ matrix.ydb-version }} ctest -j2 --preset integration
6 changes: 0 additions & 6 deletions include/ydb-cpp-sdk/client/proto/accessor.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once

#include "private.h"

#include <src/api/protos/ydb_coordination.pb.h>
#include <src/api/protos/ydb_export.pb.h>
#include <src/api/protos/ydb_import.pb.h>
Expand Down Expand Up @@ -52,10 +50,6 @@ class TProtoAccessor {
static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult);
static const Ydb::Coordination::DescribeNodeResult& GetProto(const NYdb::NCoordination::TNodeDescription& describeNodeResult);
static const Ydb::Import::ListObjectsInS3ExportResult& GetProto(const NYdb::NImport::TListObjectsInS3ExportResult& result);
#ifdef YDB_SDK_INTERNAL_CLIENTS
static const Ydb::Replication::DescribeReplicationResult& GetProto(const NYdb::NReplication::TDescribeReplicationResult& desc);
static const Ydb::View::DescribeViewResult& GetProto(const NYdb::NView::TDescribeViewResult& desc);
#endif

static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats);
static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request);
Expand Down
1 change: 1 addition & 0 deletions include/ydb-cpp-sdk/client/scheme/scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enum class ESchemeEntryType : i32 {
View = 20,
ResourcePool = 21,
SysView = 22,
Transfer = 23,
};

struct TVirtualTimestamp {
Expand Down
6 changes: 4 additions & 2 deletions include/ydb-cpp-sdk/client/value/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include <util/datetime/base.h>

#include <google/protobuf/arena.h>

#include <optional>
#include <memory>

Expand Down Expand Up @@ -528,7 +530,7 @@ class TValueBuilderBase : public TMoveOnly {
protected:
TValueBuilderBase(TValueBuilderBase&&);

TValueBuilderBase();
TValueBuilderBase(google::protobuf::Arena* arena = nullptr);

TValueBuilderBase(const TType& type);

Expand All @@ -544,7 +546,7 @@ class TValueBuilderBase : public TMoveOnly {

class TValueBuilder : public TValueBuilderBase<TValueBuilder> {
public:
TValueBuilder();
TValueBuilder(google::protobuf::Arena* arena = nullptr);

TValueBuilder(const TType& type);

Expand Down
1 change: 1 addition & 0 deletions src/api/grpc/draft/ydb_replication_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ option java_package = "com.yandex.ydb.replication.v1";

service ReplicationService {
rpc DescribeReplication(Replication.DescribeReplicationRequest) returns (Replication.DescribeReplicationResponse);
rpc DescribeTransfer(Replication.DescribeTransferRequest) returns (Replication.DescribeTransferResponse);
}
49 changes: 49 additions & 0 deletions src/api/protos/draft/ydb_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,52 @@ message DescribeReplicationResult {
}
}

message DescribeTransferRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Replication path.
string path = 2 [(required) = true];
}

message DescribeTransferResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
}

message DescribeTransferResult {
message RunningState {
}

message ErrorState {
repeated Ydb.Issue.IssueMessage issues = 1;
}

message DoneState {
}

message PausedState {
}

// Description of scheme object.
Ydb.Scheme.Entry self = 1;

ConnectionParams connection_params = 2;

oneof state {
RunningState running = 3;
ErrorState error = 4;
DoneState done = 5;
PausedState paused = 6;
}

string source_path = 7;
string destination_path = 8;
string transformation_lambda = 9;
string consumer_name = 10;

message BatchSettings {
optional uint64 size_bytes = 1;
google.protobuf.Duration flush_interval = 2;
}

optional BatchSettings batch_settings = 11;
}
8 changes: 3 additions & 5 deletions src/client/impl/ydb_internal/retry/retry_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class TRetryWithoutSession : public TRetryContext<TClient, TAsyncStatusType> {
template <typename TClient, typename TOperation, typename TAsyncStatusType = TFunctionResult<TOperation>>
class TRetryWithSession : public TRetryContext<TClient, TAsyncStatusType> {
using TRetryContextAsync = TRetryContext<TClient, TAsyncStatusType>;
using TPtr = typename TRetryContextAsync::TPtr;
using TStatusType = typename TRetryContextAsync::TStatusType;
using TSession = typename TClient::TSession;
using TCreateSessionSettings = typename TClient::TCreateSessionSettings;
Expand All @@ -132,7 +131,7 @@ class TRetryWithSession : public TRetryContext<TClient, TAsyncStatusType> {
{}

void Retry() override {
TPtr self(this);
TIntrusivePtr<TRetryWithSession> self(this);
if (!Session_) {
auto settings = TCreateSessionSettings().ClientTimeout(this->Settings_.GetSessionClientTimeout_);
this->Client_.GetSession(settings).Subscribe(
Expand All @@ -143,9 +142,8 @@ class TRetryWithSession : public TRetryContext<TClient, TAsyncStatusType> {
return TRetryContextAsync::HandleStatusAsync(self, TStatusType(TStatus(result)));
}

auto* myself = dynamic_cast<TRetryWithSession*>(self.Get());
myself->Session_ = result.GetSession();
myself->DoRunOperation(self);
self->Session_ = result.GetSession();
self->DoRunOperation(self);
} catch (...) {
return TRetryContextAsync::HandleExceptionAsync(self, std::current_exception());
}
Expand Down
4 changes: 2 additions & 2 deletions src/client/query/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,8 @@ class TTransaction::TImpl : public std::enable_shared_from_this<TImpl> {

private:
bool ChangesAreAccepted = true; // haven't called Commit or Rollback yet
std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
std::vector<TOnFailureTransactionCallback> OnFailureCallbacks;
mutable std::vector<TPrecommitTransactionCallback> PrecommitCallbacks;
mutable std::vector<TOnFailureTransactionCallback> OnFailureCallbacks;

std::mutex PrecommitCallbacksMutex;
std::mutex OnFailureCallbacksMutex;
Expand Down
2 changes: 2 additions & 0 deletions src/client/scheme/scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry)
return ESchemeEntryType::ResourcePool;
case ::Ydb::Scheme::Entry::SYS_VIEW:
return ESchemeEntryType::SysView;
case ::Ydb::Scheme::Entry::TRANSFER:
return ESchemeEntryType::Transfer;
default:
return ESchemeEntryType::Unknown;
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/table/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2508,7 +2508,7 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) {
std::vector<std::string> indexColumns;
std::vector<std::string> dataColumns;
std::vector<TGlobalIndexSettings> globalIndexSettings;
std::variant<std::monostate, TKMeansTreeSettings> specializedIndexSettings;
std::variant<std::monostate, TKMeansTreeSettings> specializedIndexSettings = std::monostate{};

indexColumns.assign(proto.index_columns().begin(), proto.index_columns().end());
dataColumns.assign(proto.data_columns().begin(), proto.data_columns().end());
Expand Down
2 changes: 1 addition & 1 deletion src/client/topic/impl/direct_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <src/api/grpc/ydb_topic_v1.grpc.pb.h>


namespace NYdb::NTopic {
namespace NYdb::inline V3::NTopic {

TDirectReadClientMessage TDirectReadPartitionSession::MakeStartRequest() const {
TDirectReadClientMessage req;
Expand Down
2 changes: 1 addition & 1 deletion src/client/topic/impl/direct_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <util/thread/lfqueue.h>


namespace NYdb::NTopic {
namespace NYdb::inline V3::NTopic {

template <bool UseMigrationProtocol>
class TDeferredActions;
Expand Down
2 changes: 1 addition & 1 deletion src/client/topic/impl/read_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ struct THash<NYdb::NTopic::TPartitionStreamImpl<true>::TKey> {
}
};

namespace NYdb::NTopic {
namespace NYdb::inline V3::NTopic {

// Read session for single cluster.
// This class holds only read session logic.
Expand Down
6 changes: 3 additions & 3 deletions src/client/topic/impl/read_session_impl.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -2183,6 +2183,9 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TrySubscribeOnTransact
return;
}

txInfo->IsActive = true;
txInfo->Subscribed = true;

auto callback = [cbContext = this->SelfContext, txId, txInfo, consumer = Settings.ConsumerName_, client]() {
std::vector<TTopicOffsets> offsets;

Expand All @@ -2205,9 +2208,6 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::TrySubscribeOnTransact
};

tx.AddPrecommitCallback(std::move(callback));

txInfo->IsActive = true;
txInfo->Subscribed = true;
}
}

Expand Down
34 changes: 17 additions & 17 deletions src/client/topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,31 +554,31 @@ void TWriteSessionImpl::TrySubscribeOnTransactionCommit(TTransactionBase* tx)
txInfo->IsActive = true;
txInfo->Subscribed = true;
txInfo->AllAcksReceived = NThreading::NewPromise<TStatus>();
}

auto callback = [cbContext = this->SelfContext, txId, txInfo]() {
with_lock(txInfo->Lock) {
Y_ABORT_UNLESS(!txInfo->CommitCalled);
auto callback = [cbContext = this->SelfContext, txId, txInfo]() {
with_lock(txInfo->Lock) {
Y_ABORT_UNLESS(!txInfo->CommitCalled);

txInfo->CommitCalled = true;
txInfo->CommitCalled = true;

if (txInfo->WriteCount == txInfo->AckCount) {
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
if (auto self = cbContext->LockShared()) {
self->DeleteTx(txId);
if (txInfo->WriteCount == txInfo->AckCount) {
txInfo->AllAcksReceived.SetValue(MakeCommitTransactionSuccess());
if (auto self = cbContext->LockShared()) {
self->DeleteTx(txId);
}
return txInfo->AllAcksReceived.GetFuture();
}
return txInfo->AllAcksReceived.GetFuture();
}

if (txInfo->IsActive) {
return txInfo->AllAcksReceived.GetFuture();
if (txInfo->IsActive) {
return txInfo->AllAcksReceived.GetFuture();
}
}
}

return NThreading::MakeFuture(MakeSessionExpiredError());
};
return NThreading::MakeFuture(MakeSessionExpiredError());
};

tx->AddPrecommitCallback(std::move(callback));
tx->AddPrecommitCallback(std::move(callback));
}
}

void TWriteSessionImpl::TrySignalAllAcksReceived(ui64 seqNo)
Expand Down
42 changes: 33 additions & 9 deletions src/client/value/value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2061,19 +2061,33 @@ class TValueBuilderImpl {
public:
TValueBuilderImpl()
: TypeBuilder_()
, Arena(nullptr)
, ProtoValue_(ProtoValueHeap)
{
PushPath(ProtoValue_);
}

TValueBuilderImpl(google::protobuf::Arena* arena)
: TypeBuilder_()
, Arena(arena)
, ProtoValue_(*google::protobuf::Arena::CreateMessage<Ydb::Value>(Arena))
{
PushPath(ProtoValue_);
}

TValueBuilderImpl(const TType& type)
: TypeBuilder_()
, Arena(nullptr)
, ProtoValue_(ProtoValueHeap)
{
PushPath(ProtoValue_);
GetType().CopyFrom(type.GetProto());
}

TValueBuilderImpl(Ydb::Type& type, Ydb::Value& value)
: TypeBuilder_(type)
, Arena(nullptr)
, ProtoValue_(ProtoValueHeap)
{
PushPath(value);
}
Expand All @@ -2088,10 +2102,15 @@ class TValueBuilderImpl {
TValue BuildValue() {
CheckValue();

Ydb::Value value;
value.Swap(&ProtoValue_);

return TValue(TypeBuilder_.Build(), std::move(value));
if (Arena) {
auto* value = google::protobuf::Arena::CreateMessage<Ydb::Value>(Arena);
value->Swap(&ProtoValue_);
return TValue(TypeBuilder_.Build(), value);
} else {
Ydb::Value value;
value.Swap(&ProtoValue_);
return TValue(TypeBuilder_.Build(), std::move(value));
}
}

void Bool(bool value) {
Expand Down Expand Up @@ -2803,7 +2822,12 @@ class TValueBuilderImpl {
private:
//TTypeBuilder TypeBuilder_;
TTypeBuilder::TImpl TypeBuilder_;
Ydb::Value ProtoValue_;
google::protobuf::Arena* Arena;
Ydb::Value ProtoValueHeap;

// either ProtoValueHeap or a reference to the arena allocated protobuf
Ydb::Value& ProtoValue_;

std::map<const Ydb::StructType*, TMembersMap> StructsMap_;

TStackVec<TProtoPosition, 8> Path_;
Expand All @@ -2819,8 +2843,8 @@ template<typename TDerived>
TValueBuilderBase<TDerived>::~TValueBuilderBase() = default;

template<typename TDerived>
TValueBuilderBase<TDerived>::TValueBuilderBase()
: Impl_(new TValueBuilderImpl()) {}
TValueBuilderBase<TDerived>::TValueBuilderBase(google::protobuf::Arena* arena)
: Impl_(new TValueBuilderImpl(arena)) {}

template<typename TDerived>
TValueBuilderBase<TDerived>::TValueBuilderBase(const TType& type)
Expand Down Expand Up @@ -3382,8 +3406,8 @@ template class TValueBuilderBase<TParamValueBuilder>;

////////////////////////////////////////////////////////////////////////////////

TValueBuilder::TValueBuilder()
: TValueBuilderBase() {}
TValueBuilder::TValueBuilder(google::protobuf::Arena* arena)
: TValueBuilderBase(arena) {}

TValueBuilder::TValueBuilder(const TType& type)
: TValueBuilderBase(type) {}
Expand Down
Loading
Loading