Skip to content

Import YDB C++ SDK 14 #528

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
14
15
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
71f3186a075dd5aa34e2840ef29fc3958a89d2fa
9a3ba4fbaa4d0b2d6dcff910256db11b3c909166
2 changes: 1 addition & 1 deletion CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
"outputOnFailure": true
},
"execution": {
"timeout": 1200
"timeout": 600
}
},
{
Expand Down
2 changes: 1 addition & 1 deletion include/ydb-cpp-sdk/client/iam/common/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct TIamEndpoint {
TDuration RefreshPeriod = NIam::DEFAULT_REFRESH_PERIOD;
TDuration RequestTimeout = NIam::DEFAULT_REQUEST_TIMEOUT;
bool EnableSsl = NIam::DEFAULT_ENABLE_SSL;
std::string CaCerts;
std::string CaCerts = "";
};

struct TIamJwtFilename : TIamEndpoint { std::string JwtFilename; };
Expand Down
4 changes: 0 additions & 4 deletions include/ydb-cpp-sdk/client/query/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ struct TClientSettings : public TCommonClientSettingsBase<TClientSettings> {
FLUENT_SETTING(TSessionPoolSettings, SessionPoolSettings);
};

// ! WARNING: Experimental API
// ! This API is currently in experimental state and is a subject for changes.
// ! No backward and/or forward compatibility guarantees are provided.
// ! DO NOT USE for production workloads.
class TQueryClient {
friend class TSession;
friend class NRetry::Async::TRetryContext<TQueryClient, TAsyncExecuteQueryResult>;
Expand Down
1 change: 1 addition & 0 deletions include/ydb-cpp-sdk/client/scheme/scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ enum class ESchemeEntryType : i32 {
ExternalDataSource = 19,
View = 20,
ResourcePool = 21,
SysView = 22,
};

struct TVirtualTimestamp {
Expand Down
2 changes: 2 additions & 0 deletions include/ydb-cpp-sdk/client/table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,8 @@ struct TBulkUpsertSettings : public TOperationRequestSettings<TBulkUpsertSetting
// Format setting proto serialized into string. If not set format defaults are used.
// I.e. it's Ydb.Table.CsvSettings for CSV.
FLUENT_SETTING_DEFAULT(std::string, FormatSettings, "");
google::protobuf::Arena* Arena_ = nullptr;
TBulkUpsertSettings& Arena(google::protobuf::Arena* arena) { Arena_ = arena; return *this; }
};

struct TReadRowsSettings : public TOperationRequestSettings<TReadRowsSettings> {
Expand Down
17 changes: 15 additions & 2 deletions include/ydb-cpp-sdk/client/value/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,33 @@ struct TUuidValue {
} Buf_;
};

namespace NTable {

class TTableClient;

} // namespace NTable

//! Representation of YDB value.
class TValue {
friend class TValueParser;
friend class TProtoAccessor;
friend class NTable::TTableClient;
public:
TValue(const TType& type, const Ydb::Value& valueProto);
TValue(const TType& type, Ydb::Value&& valueProto);
/**
* Lifetime of the arena, and hence the `Ydb::Value`, is expected to be managed by the caller.
* The `Ydb::Value` is expected to be arena-allocated.
*
* See: https://protobuf.dev/reference/cpp/arenas
*/
TValue(const TType& type, Ydb::Value* arenaAllocatedValueProto);

const TType& GetType() const;
TType & GetType();
TType& GetType();

const Ydb::Value& GetProto() const;
Ydb::Value& GetProto();

private:
class TImpl;
std::shared_ptr<TImpl> Impl_;
Expand Down
22 changes: 22 additions & 0 deletions src/api/grpc/draft/ydb_bridge_v1.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
syntax = "proto3";

package Ydb.Bridge.V1;

option java_package = "com.yandex.ydb.bridge.v1";
option java_outer_classname = "BridgeGrpc";
option java_multiple_files = true;

import "src/api/protos/draft/ydb_bridge.proto";

// Service for managing cluster in bridge mode
service BridgeService {

// Get current cluster state
// Useful for monitoring and decision-making before state changes
rpc GetClusterState(Ydb.Bridge.GetClusterStateRequest) returns (Ydb.Bridge.GetClusterStateResponse);

// Update cluster state by providing a list of desired pile states
// This is the main operation for failover and promotion scenarios
rpc UpdateClusterState(Ydb.Bridge.UpdateClusterStateRequest) returns (Ydb.Bridge.UpdateClusterStateResponse);

}
3 changes: 3 additions & 0 deletions src/api/protos/draft/persqueue_error_codes.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,8 @@ enum EErrorCode {

PRECONDITION_FAILED = 31;

KAFKA_INVALID_PRODUCER_EPOCH = 32;
KAFKA_OUT_OF_ORDER_SEQUENCE_NUMBER = 33;

ERROR = 100;
}
51 changes: 51 additions & 0 deletions src/api/protos/draft/ydb_bridge.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
syntax = "proto3";
option cc_enable_arenas = true;

package Ydb.Bridge;

option java_package = "com.yandex.ydb.bridge.proto";
option java_outer_classname = "BridgeProtos";
option java_multiple_files = true;

import "src/api/protos/ydb_operation.proto";

// State of a pile in relation to primary
enum PileState {
DISCONNECTED = 0; // disconnected from the cluster (no connectivity to other piles)
NOT_SYNCHRONIZED = 1; // not synchronized with primary, cannot be promoted
SYNCHRONIZED = 2; // fully synchronized with primary, may be promoted
PROMOTE = 3; // pile is being promoted to primary
PRIMARY = 4; // pile is primary
}

// Pair of pile id and pile state
message PileStateUpdate {
uint32 pile_id = 1;
PileState state = 2;
}

message GetClusterStateRequest {
Ydb.Operations.OperationParams operation_params = 1;
}

message GetClusterStateResponse {
Ydb.Operations.Operation operation = 1;
}

message GetClusterStateResult {
// Current cluster state
repeated PileStateUpdate per_pile_state = 1;
}

message UpdateClusterStateRequest {
Ydb.Operations.OperationParams operation_params = 1;
// List of desired pile states to update
repeated PileStateUpdate updates = 2;
}

message UpdateClusterStateResponse {
Ydb.Operations.Operation operation = 1;
}

message UpdateClusterStateResult {
}
1 change: 1 addition & 0 deletions src/api/protos/draft/ydb_maintenance.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message Node {
// version defines YDB version for current Node.
// For example, 'ydb-stable-24-1'.
string version = 9;
uint32 pile_id = 10;
}

message ListClusterNodesRequest {
Expand Down
2 changes: 1 addition & 1 deletion src/client/federated_topic/ut/basic_usage_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <ydb-cpp-sdk/client/federated_topic/federated_topic.h>
#include <src/client/federated_topic/impl/federated_write_session.h>

#include <src/client/topic/ut/ut_utils/managed_executor.h>
#include <tests/integration/topic/utils/managed_executor.h>

#include <src/client/persqueue_public/persqueue.h>

Expand Down
62 changes: 54 additions & 8 deletions src/client/impl/ydb_internal/grpc_connections/grpc_connections.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,55 @@ class TGRpcConnectionsImpl
TRequest,
TResponse>::TAsyncRequest;

template<typename TRequest>
class TRequestWrapper {
public:
// Implicit conversion from rvalue reference
TRequestWrapper(TRequest&& request)
: Storage_(std::move(request))
{}

// Implicit conversion from pointer. Means that request is allocated on Arena
TRequestWrapper(TRequest* request)
: Storage_(request)
{}

// Copy constructor
TRequestWrapper(const TRequestWrapper& other) = default;

// Move constructor
TRequestWrapper(TRequestWrapper&& other) = default;

// Copy assignment
TRequestWrapper& operator=(const TRequestWrapper& other) = default;

// Move assignment
TRequestWrapper& operator=(TRequestWrapper&& other) = default;

template<typename TService, typename TResponse>
void DoRequest(
std::unique_ptr<TServiceConnection<TService>>& serviceConnection,
NYdbGrpc::TAdvancedResponseCallback<TResponse>&& responseCbLow,
typename NYdbGrpc::TSimpleRequestProcessor<typename TService::Stub, TRequest, TResponse>::TAsyncRequest rpc,
const TCallMeta& meta,
IQueueClientContext* context)
{
if (auto ptr = std::get_if<TRequest*>(&Storage_)) {
serviceConnection->DoAdvancedRequest(**ptr,
std::move(responseCbLow), rpc, meta, context);
} else {
serviceConnection->DoAdvancedRequest(std::move(std::get<TRequest>(Storage_)),
std::move(responseCbLow), rpc, meta, context);
}
}

private:
std::variant<TRequest*, TRequest> Storage_;
};

template<typename TService, typename TRequest, typename TResponse>
void Run(
TRequest&& request,
TRequestWrapper<TRequest>&& requestWrapper,
TResponseCb<TResponse>&& userResponseCb,
TSimpleRpc<TService, TRequest, TResponse> rpc,
TDbDriverStatePtr dbState,
Expand Down Expand Up @@ -174,7 +220,8 @@ class TGRpcConnectionsImpl
}

WithServiceConnection<TService>(
[this, request = std::move(request), userResponseCb = std::move(userResponseCb), rpc, requestSettings, context = std::move(context), dbState]
[this, requestWrapper = std::move(requestWrapper), userResponseCb = std::move(userResponseCb), rpc,
requestSettings, context = std::move(context), dbState]
(TPlainStatus status, TConnection serviceConnection, TEndpointKey endpoint) mutable -> void {
if (!status.Ok()) {
userResponseCb(
Expand Down Expand Up @@ -271,14 +318,13 @@ class TGRpcConnectionsImpl
}
};

serviceConnection->DoAdvancedRequest(std::move(request), std::move(responseCbLow), rpc, meta,
context.get());
requestWrapper.DoRequest(serviceConnection, std::move(responseCbLow), rpc, meta, context.get());
}, dbState, requestSettings.PreferredEndpoint, requestSettings.EndpointPolicy);
}

template<typename TService, typename TRequest, typename TResponse>
void RunDeferred(
TRequest&& request,
TRequestWrapper<TRequest>&& requestWrapper,
TDeferredOperationCb&& userResponseCb,
TSimpleRpc<TService, TRequest, TResponse> rpc,
TDbDriverStatePtr dbState,
Expand Down Expand Up @@ -321,7 +367,7 @@ class TGRpcConnectionsImpl
};

Run<TService, TRequest, TResponse>(
std::move(request),
std::move(requestWrapper),
responseCb,
rpc,
dbState,
Expand Down Expand Up @@ -357,7 +403,7 @@ class TGRpcConnectionsImpl

template<typename TService, typename TRequest, typename TResponse>
void RunDeferred(
TRequest&& request,
TRequestWrapper<TRequest>&& requestWrapper,
TDeferredResultCb&& userResponseCb,
TSimpleRpc<TService, TRequest, TResponse> rpc,
TDbDriverStatePtr dbState,
Expand All @@ -375,7 +421,7 @@ class TGRpcConnectionsImpl
};

RunDeferred<TService, TRequest, TResponse>(
std::move(request),
std::move(requestWrapper),
operationCb,
rpc,
dbState,
Expand Down
14 changes: 14 additions & 0 deletions src/client/impl/ydb_internal/make_request/make.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,18 @@ TProtoRequest MakeOperationRequest(const TRequestSettings& settings) {
return request;
}


template <typename TProtoRequest>
TProtoRequest* MakeRequestOnArena(google::protobuf::Arena* arena) {
return google::protobuf::Arena::CreateMessage<TProtoRequest>(arena);
}

template <typename TProtoRequest, typename TRequestSettings>
TProtoRequest* MakeOperationRequestOnArena(const TRequestSettings& settings, google::protobuf::Arena* arena) {
Y_ASSERT(arena != nullptr);
auto request = MakeRequestOnArena<TProtoRequest>(arena);
FillOperationParams(settings, *request);
return request;
}

} // namespace NYdb
2 changes: 2 additions & 0 deletions src/client/scheme/scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ static ESchemeEntryType ConvertProtoEntryType(::Ydb::Scheme::Entry::Type entry)
return ESchemeEntryType::View;
case ::Ydb::Scheme::Entry::RESOURCE_POOL:
return ESchemeEntryType::ResourcePool;
case ::Ydb::Scheme::Entry::SYS_VIEW:
return ESchemeEntryType::SysView;
default:
return ESchemeEntryType::Unknown;
}
Expand Down
59 changes: 40 additions & 19 deletions src/client/table/impl/table_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -989,29 +989,50 @@ void TTableClient::TImpl::SetStatCollector(const NSdkStats::TStatCollector::TCli
SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing);
}

TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings) {
auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings);
request.set_table(TStringType{table});
*request.mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
*request.mutable_rows()->mutable_value() = rows.GetProto();
TAsyncBulkUpsertResult TTableClient::TImpl::BulkUpsert(const std::string& table, TValue&& rows, const TBulkUpsertSettings& settings, bool canMove) {
Ydb::Table::BulkUpsertRequest* request = nullptr;
std::unique_ptr<Ydb::Table::BulkUpsertRequest> holder;

auto promise = NewPromise<TBulkUpsertResult>();
if (settings.Arena_) {
request = MakeOperationRequestOnArena<Ydb::Table::BulkUpsertRequest>(settings, settings.Arena_);
} else {
holder = std::make_unique<Ydb::Table::BulkUpsertRequest>(MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings));
request = holder.get();
}

auto extractor = [promise]
(google::protobuf::Any* any, TPlainStatus status) mutable {
Y_UNUSED(any);
TBulkUpsertResult val(TStatus(std::move(status)));
promise.SetValue(std::move(val));
};
request->set_table(TStringType{table});
if (canMove) {
request->mutable_rows()->mutable_type()->Swap(&rows.GetType().GetProto());
request->mutable_rows()->mutable_value()->Swap(&rows.GetProto());
} else {
*request->mutable_rows()->mutable_type() = TProtoAccessor::GetProto(rows.GetType());
*request->mutable_rows()->mutable_value() = rows.GetProto();
}

Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
std::move(request),
extractor,
&Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
TRpcRequestSettings::Make(settings));
auto promise = NewPromise<TBulkUpsertResult>();
auto extractor = [promise](google::protobuf::Any* any, TPlainStatus status) mutable {
Y_UNUSED(any);
TBulkUpsertResult val(TStatus(std::move(status)));
promise.SetValue(std::move(val));
};

if (settings.Arena_) {
Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
request,
extractor,
&Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
TRpcRequestSettings::Make(settings));
} else {
Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse>(
std::move(*holder),
extractor,
&Ydb::Table::V1::TableService::Stub::AsyncBulkUpsert,
DbDriverState_,
INITIAL_DEFERRED_CALL_DELAY,
TRpcRequestSettings::Make(settings));
}
return promise.GetFuture();
}

Expand Down
Loading
Loading