From 1f39d70942b9353fdcdf526a453a79eb14d39383 Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Mon, 7 Jul 2025 17:18:10 +0000 Subject: [PATCH 1/5] [C++ SDK] Custom executor --- .../ydb-cpp-sdk/client/driver/driver.h | 10 ++- .../client/types/executor/executor.h | 20 ++++++ .../sdk/cpp/src/client/driver/driver.cpp | 7 ++ .../grpc_connections/grpc_connections.cpp | 7 +- .../grpc_connections/grpc_connections.h | 21 +++--- .../ydb_internal/grpc_connections/params.h | 2 + .../grpc_connections/response_queue.cpp | 67 +++++++++++++++++++ .../grpc_connections/response_queue.h | 24 +++++++ .../ydb_internal/grpc_connections/ya.make | 2 + .../src/client/topic/common/executor_impl.cpp | 2 + .../src/client/types/executor/executor.cpp | 24 +++++++ .../sdk/cpp/src/client/types/executor/ya.make | 11 +++ 12 files changed, 182 insertions(+), 15 deletions(-) create mode 100644 ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h create mode 100644 ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp create mode 100644 ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h create mode 100644 ydb/public/sdk/cpp/src/client/types/executor/executor.cpp create mode 100644 ydb/public/sdk/cpp/src/client/types/executor/ya.make diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h index f5e3e4579556..36fbf55b9a44 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -49,7 +50,7 @@ class TDriverConfig { //! Enable Ssl. //! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections. //! If this parameter is empty, the default roots will be used. - TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string()); + TDriverConfig& UseSecureConnection(const std::string& caCerts = ""); TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel); TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey); //! Set token, this option can be overridden for client by ClientSettings @@ -88,7 +89,7 @@ class TDriverConfig { //! Set policy for balancing //! Params is a optionally field to set policy settings //! default: EBalancingPolicy::UsePreferableLocation - TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string()); + TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = ""); //! Set grpc level keep alive. If keepalive ping was delayed more than given timeout //! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error //! Note: this timeout should not be too small to prevent fail due to @@ -118,6 +119,11 @@ class TDriverConfig { //! Log backend. TDriverConfig& SetLog(std::unique_ptr&& log); + + //! Set executor for async responses. + //! If not set, default executor will be used. + TDriverConfig& SetExecutor(std::shared_ptr executor); + private: class TImpl; std::shared_ptr Impl_; diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h new file mode 100644 index 000000000000..6b76c51ea70c --- /dev/null +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h @@ -0,0 +1,20 @@ +#pragma once + +#include + +#include + +namespace NYdb::inline Dev { + +class IExecutor { +public: + virtual void Post(std::function&& f) = 0; + + virtual ~IExecutor() = default; +}; + +// Create executor adapter for util thread pool. +// Thread pool is expected to have been started. +std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool); + +} // namespace NYdb::inline Dev diff --git a/ydb/public/sdk/cpp/src/client/driver/driver.cpp b/ydb/public/sdk/cpp/src/client/driver/driver.cpp index 1e5cb0c26d1f..7247e4ef9946 100644 --- a/ydb/public/sdk/cpp/src/client/driver/driver.cpp +++ b/ydb/public/sdk/cpp/src/client/driver/driver.cpp @@ -50,6 +50,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; } uint64_t GetMaxMessageSize() const override { return MaxMessageSize; } const TLog& GetLog() const override { return Log; } + std::shared_ptr GetExecutor() const override { return Executor; } std::string Endpoint; size_t NetworkThreadsNum = 2; @@ -78,6 +79,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t MaxOutboundMessageSize = 0; uint64_t MaxMessageSize = 0; TLog Log; // Null by default. + std::shared_ptr Executor; }; TDriverConfig::TDriverConfig(const std::string& connectionString) @@ -210,6 +212,11 @@ TDriverConfig& TDriverConfig::SetLog(std::unique_ptr&& log) { return *this; } +TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr executor) { + Impl_->Executor = executor; + return *this; +} + //////////////////////////////////////////////////////////////////////////////// std::shared_ptr CreateInternalInterface(const TDriver connection) { diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp index 052a2cfc773c..7008b24235ec 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp @@ -3,6 +3,7 @@ #include + namespace NYdb::inline Dev { bool IsTokenCorrect(const std::string& in) { @@ -137,7 +138,7 @@ class TScheduledFuture : public TScheduledObject { TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr params) : MetricRegistryPtr_(nullptr) , ClientThreadsNum_(params->GetClientThreadsNum()) - , ResponseQueue_(CreateThreadPool(ClientThreadsNum_)) + , ResponseQueue_(CreateResponseQueue(params)) , DefaultDiscoveryEndpoint_(params->GetEndpoint()) , SslCredentials_(params->GetSslCredentials()) , DefaultDatabase_(params->GetDatabase()) @@ -181,7 +182,7 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p } #endif //TAdaptiveThreadPool ignores params - ResponseQueue_->Start(ClientThreadsNum_, MaxQueuedResponses_); + ResponseQueue_->Start(); if (!DefaultDatabase_.empty()) { DefaultState_ = StateTracker_.GetDriverState( DefaultDatabase_, @@ -427,7 +428,7 @@ const TLog& TGRpcConnectionsImpl::GetLog() const { } void TGRpcConnectionsImpl::EnqueueResponse(IObjectInQueue* action) { - Y_ENSURE(ResponseQueue_->Add(action)); + ResponseQueue_->Post(action); } } // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h index d65d408bf022..b6a0fc772b65 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -5,17 +5,18 @@ #include "actions.h" #include "params.h" +#include "response_queue.h" #include #include #include #include -#include #include #include #include + namespace NYdb::inline Dev { constexpr TDuration GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY = TDuration::Seconds(10); @@ -748,8 +749,8 @@ class TGRpcConnectionsImpl std::mutex ExtensionsLock_; ::NMonitoring::TMetricRegistry* MetricRegistryPtr_ = nullptr; - const size_t ClientThreadsNum_; - std::unique_ptr ResponseQueue_; + const std::size_t ClientThreadsNum_; + std::unique_ptr ResponseQueue_; const std::string DefaultDiscoveryEndpoint_; const TSslCredentials SslCredentials_; @@ -757,16 +758,16 @@ class TGRpcConnectionsImpl std::shared_ptr DefaultCredentialsProviderFactory_; TDbDriverStateTracker StateTracker_; const EDiscoveryMode DefaultDiscoveryMode_; - const i64 MaxQueuedRequests_; - const i64 MaxQueuedResponses_; + const std::int64_t MaxQueuedRequests_; + const std::int64_t MaxQueuedResponses_; const bool DrainOnDtors_; const TBalancingSettings BalancingSettings_; const TDuration GRpcKeepAliveTimeout_; const bool GRpcKeepAlivePermitWithoutCalls_; - const ui64 MemoryQuota_; - const ui64 MaxInboundMessageSize_; - const ui64 MaxOutboundMessageSize_; - const ui64 MaxMessageSize_; + const std::uint64_t MemoryQuota_; + const std::uint64_t MaxInboundMessageSize_; + const std::uint64_t MaxOutboundMessageSize_; + const std::uint64_t MaxMessageSize_; std::atomic_int64_t QueuedRequests_; const NYdbGrpc::TTcpKeepAliveSettings TcpKeepAliveSettings_; @@ -782,7 +783,7 @@ class TGRpcConnectionsImpl IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb; - const size_t NetworkThreadsNum_; + const std::size_t NetworkThreadsNum_; bool UsePerChannelTcpConnection_; // Must be the last member (first called destructor) NYdbGrpc::TGRpcClientLow GRpcClientLow_; diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h index 50eea9f42f88..bd95d9f35722 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace NYdb::inline Dev { @@ -33,6 +34,7 @@ class IConnectionsParams { virtual uint64_t GetMaxInboundMessageSize() const = 0; virtual uint64_t GetMaxOutboundMessageSize() const = 0; virtual uint64_t GetMaxMessageSize() const = 0; + virtual std::shared_ptr GetExecutor() const = 0; }; } // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp new file mode 100644 index 000000000000..fc4ba2b664ab --- /dev/null +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp @@ -0,0 +1,67 @@ +#define INCLUDE_YDB_INTERNAL_H +#include "response_queue.h" + +#include + +#include + + +namespace NYdb::inline Dev { + +class TOwnedResponseQueue: public IResponseQueue { +public: + TOwnedResponseQueue(size_t clientThreadsNum, size_t maxQueuedResponses) + : ClientThreadsNum_(clientThreadsNum) + , MaxQueuedResponses_(maxQueuedResponses) + , ThreadPool_(CreateThreadPool(clientThreadsNum)) + { + } + + void Start() override { + ThreadPool_->Start(ClientThreadsNum_, MaxQueuedResponses_); + } + + void Stop() override { + ThreadPool_->Stop(); + } + + void Post(IObjectInQueue* action) override { + Y_ENSURE(ThreadPool_->Add(action)); + } + +private: + const std::size_t ClientThreadsNum_; + const std::size_t MaxQueuedResponses_; + + std::unique_ptr ThreadPool_; +}; + +class TExecutorResponseQueue : public IResponseQueue { +public: + TExecutorResponseQueue(std::shared_ptr executor) + : Executor_(executor) + { + } + + void Start() override { } + + void Stop() override { } + + void Post(IObjectInQueue* action) override { + Executor_->Post([action]() { + action->Process(nullptr); + }); + } + +private: + std::shared_ptr Executor_; +}; + +std::unique_ptr CreateResponseQueue(std::shared_ptr params) { + if (params->GetExecutor()) { + return std::make_unique(params->GetExecutor()); + } + return std::make_unique(params->GetClientThreadsNum(), params->GetMaxQueuedResponses()); +} + +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h new file mode 100644 index 000000000000..7fae4841cab4 --- /dev/null +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +#include "params.h" + +#include + + +namespace NYdb::inline Dev { + +class IResponseQueue { +public: + virtual void Start() = 0; + virtual void Stop() = 0; + + virtual void Post(IObjectInQueue* action) = 0; + + virtual ~IResponseQueue() = default; +}; + +std::unique_ptr CreateResponseQueue(std::shared_ptr params); + +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make index 1bd264bd238d..c3a5a4dbd830 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( actions.cpp grpc_connections.cpp + response_queue.cpp ) PEERDIR( @@ -14,6 +15,7 @@ PEERDIR( ydb/public/sdk/cpp/src/client/impl/ydb_stats ydb/public/sdk/cpp/src/client/resources ydb/public/sdk/cpp/src/client/types/exceptions + ydb/public/sdk/cpp/src/client/types/executor ) END() diff --git a/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp b/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp index 44fccf8df414..3f7defb58a42 100644 --- a/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp @@ -1,5 +1,7 @@ #include "executor_impl.h" +#include + namespace NYdb::inline Dev::NTopic { void IAsyncExecutor::Post(TFunction&& f) { diff --git a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp new file mode 100644 index 000000000000..82e83cb9294a --- /dev/null +++ b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp @@ -0,0 +1,24 @@ +#include + +namespace NYdb::inline Dev { + +class TThreadPoolExecutor : public IExecutor { +public: + TThreadPoolExecutor(std::shared_ptr threadPool) + : ThreadPool(threadPool) + { + } + + void Post(std::function&& f) override { + ThreadPool->SafeAddFunc(std::move(f)); + } + +private: + std::shared_ptr ThreadPool; +}; + +std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool) { + return std::make_shared(threadPool); +} + +} // namespace NYdb::inline Dev diff --git a/ydb/public/sdk/cpp/src/client/types/executor/ya.make b/ydb/public/sdk/cpp/src/client/types/executor/ya.make new file mode 100644 index 000000000000..b35ce920e453 --- /dev/null +++ b/ydb/public/sdk/cpp/src/client/types/executor/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + executor.cpp +) + +PEERDIR( + ydb/public/sdk/cpp/src/client/types/exceptions +) + +END() From 0f5c3b3a186a4383371aa51a6acbdc73f5a0bf8f Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Tue, 8 Jul 2025 16:04:58 +0000 Subject: [PATCH 2/5] fix --- .../sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h | 3 ++- .../include/ydb-cpp-sdk/client/types/executor/executor.h | 2 +- ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h | 2 ++ ydb/public/sdk/cpp/src/client/driver/driver.cpp | 6 +++--- .../src/client/impl/ydb_internal/grpc_connections/params.h | 2 +- .../impl/ydb_internal/grpc_connections/response_queue.cpp | 4 ++-- ydb/public/sdk/cpp/src/client/types/executor/executor.cpp | 6 +++--- 7 files changed, 14 insertions(+), 11 deletions(-) diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h index 36fbf55b9a44..1189c21e63ce 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h @@ -122,7 +122,8 @@ class TDriverConfig { //! Set executor for async responses. //! If not set, default executor will be used. - TDriverConfig& SetExecutor(std::shared_ptr executor); + //! Warning: executor should start before driver is created and stop after driver is stopped by Stop(true) method. + TDriverConfig& SetExecutor(std::shared_ptr executor); private: class TImpl; diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h index 6b76c51ea70c..d21ced223107 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h @@ -4,7 +4,7 @@ #include -namespace NYdb::inline Dev { +namespace NYdb::inline Dev::NExec { class IExecutor { public: diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h index 3b24aaca9047..7d7a0e72ec03 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h @@ -498,6 +498,8 @@ class TValueBuilderBase : public TMoveOnly { TDerived& EmptyList(); // Struct + TDerived& Struct(const std::vector>& members); + TDerived& BeginStruct(); TDerived& AddMember(const std::string& memberName); TDerived& AddMember(const std::string& memberName, const TValue& memberValue); diff --git a/ydb/public/sdk/cpp/src/client/driver/driver.cpp b/ydb/public/sdk/cpp/src/client/driver/driver.cpp index 7247e4ef9946..83132baba5d1 100644 --- a/ydb/public/sdk/cpp/src/client/driver/driver.cpp +++ b/ydb/public/sdk/cpp/src/client/driver/driver.cpp @@ -50,7 +50,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; } uint64_t GetMaxMessageSize() const override { return MaxMessageSize; } const TLog& GetLog() const override { return Log; } - std::shared_ptr GetExecutor() const override { return Executor; } + std::shared_ptr GetExecutor() const override { return Executor; } std::string Endpoint; size_t NetworkThreadsNum = 2; @@ -79,7 +79,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t MaxOutboundMessageSize = 0; uint64_t MaxMessageSize = 0; TLog Log; // Null by default. - std::shared_ptr Executor; + std::shared_ptr Executor; }; TDriverConfig::TDriverConfig(const std::string& connectionString) @@ -212,7 +212,7 @@ TDriverConfig& TDriverConfig::SetLog(std::unique_ptr&& log) { return *this; } -TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr executor) { +TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr executor) { Impl_->Executor = executor; return *this; } diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h index bd95d9f35722..4f11a5d8cc40 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h @@ -34,7 +34,7 @@ class IConnectionsParams { virtual uint64_t GetMaxInboundMessageSize() const = 0; virtual uint64_t GetMaxOutboundMessageSize() const = 0; virtual uint64_t GetMaxMessageSize() const = 0; - virtual std::shared_ptr GetExecutor() const = 0; + virtual std::shared_ptr GetExecutor() const = 0; }; } // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp index fc4ba2b664ab..e0f5cdb36c05 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp @@ -38,7 +38,7 @@ class TOwnedResponseQueue: public IResponseQueue { class TExecutorResponseQueue : public IResponseQueue { public: - TExecutorResponseQueue(std::shared_ptr executor) + TExecutorResponseQueue(std::shared_ptr executor) : Executor_(executor) { } @@ -54,7 +54,7 @@ class TExecutorResponseQueue : public IResponseQueue { } private: - std::shared_ptr Executor_; + std::shared_ptr Executor_; }; std::unique_ptr CreateResponseQueue(std::shared_ptr params) { diff --git a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp index 82e83cb9294a..0ed822c6a503 100644 --- a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp +++ b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp @@ -1,8 +1,8 @@ #include -namespace NYdb::inline Dev { +namespace NYdb::inline Dev::NExec { -class TThreadPoolExecutor : public IExecutor { +class TThreadPoolExecutor : public NExec::IExecutor { public: TThreadPoolExecutor(std::shared_ptr threadPool) : ThreadPool(threadPool) @@ -17,7 +17,7 @@ class TThreadPoolExecutor : public IExecutor { std::shared_ptr ThreadPool; }; -std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool) { +std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool) { return std::make_shared(threadPool); } From e7ec9ae79abc9caa43a96ff30d810e60fb3236ba Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Tue, 8 Jul 2025 16:11:21 +0000 Subject: [PATCH 3/5] fix --- ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h index 7d7a0e72ec03..3b24aaca9047 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/value/value.h @@ -498,8 +498,6 @@ class TValueBuilderBase : public TMoveOnly { TDerived& EmptyList(); // Struct - TDerived& Struct(const std::vector>& members); - TDerived& BeginStruct(); TDerived& AddMember(const std::string& memberName); TDerived& AddMember(const std::string& memberName, const TValue& memberValue); From cc7c9a7421b00724d8f432d9f56a706a24058cef Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Tue, 8 Jul 2025 18:07:28 +0000 Subject: [PATCH 4/5] Fix --- ydb/public/sdk/cpp/examples/executor/main.cpp | 52 ++++++++++++++ ydb/public/sdk/cpp/examples/executor/ya.make | 13 ++++ ydb/public/sdk/cpp/examples/ya.make | 1 + .../client/types/executor/executor.h | 8 ++- .../grpc_connections/grpc_connections.cpp | 15 ++++- .../grpc_connections/grpc_connections.h | 3 +- .../grpc_connections/response_queue.cpp | 67 ------------------- .../grpc_connections/response_queue.h | 24 ------- .../ydb_internal/grpc_connections/ya.make | 1 - .../src/client/types/executor/executor.cpp | 24 +++++-- 10 files changed, 102 insertions(+), 106 deletions(-) create mode 100644 ydb/public/sdk/cpp/examples/executor/main.cpp create mode 100644 ydb/public/sdk/cpp/examples/executor/ya.make delete mode 100644 ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp delete mode 100644 ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h diff --git a/ydb/public/sdk/cpp/examples/executor/main.cpp b/ydb/public/sdk/cpp/examples/executor/main.cpp new file mode 100644 index 000000000000..2e5cbca1e53d --- /dev/null +++ b/ydb/public/sdk/cpp/examples/executor/main.cpp @@ -0,0 +1,52 @@ +#include +#include +#include + +#include + +#include + +#include + + +void ExecutorExample(const std::string& endpoint, const std::string& database) { + auto driverConfig = NYdb::CreateFromEnvironment(endpoint + "/?database=" + database) + .SetExecutor(NYdb::NExec::CreateThreadPoolExecutorAdapter( + std::make_shared(TThreadPool::TParams() + .SetBlocking(true) + .SetCatching(false) + .SetForkAware(false)), + std::thread::hardware_concurrency()) + ); + + NYdb::TDriver driver(driverConfig); + NYdb::NQuery::TQueryClient client(driver); + + try { + auto result = client.ExecuteQuery("SELECT 1", NYdb::NQuery::TTxControl::NoTx()).GetValueSync(); + NYdb::NStatusHelpers::ThrowOnError(result); + auto parser = result.GetResultSetParser(0); + parser.TryNextRow(); + std::cout << "Result: " << parser.ColumnParser(0).GetInt32() << std::endl; + } catch (const std::exception& e) { + std::cerr << "Execution failed: " << e.what() << std::endl; + } + + driver.Stop(true); +} + +int main(int argc, char** argv) { + std::string endpoint; + std::string database; + + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + + opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT").StoreResult(&endpoint); + opts.AddLongOption('d', "database", "YDB database").Required().RequiredArgument("DATABASE").StoreResult(&database); + + opts.SetFreeArgsMin(0); + NLastGetopt::TOptsParseResult result(&opts, argc, argv); + + ExecutorExample(endpoint, database); + return 0; +} diff --git a/ydb/public/sdk/cpp/examples/executor/ya.make b/ydb/public/sdk/cpp/examples/executor/ya.make new file mode 100644 index 000000000000..72eb3f430753 --- /dev/null +++ b/ydb/public/sdk/cpp/examples/executor/ya.make @@ -0,0 +1,13 @@ +PROGRAM() + +SRCS( + main.cpp +) + +PEERDIR( + library/cpp/getopt + ydb/public/sdk/cpp/src/client/query + ydb/public/sdk/cpp/src/client/helpers +) + +END() diff --git a/ydb/public/sdk/cpp/examples/ya.make b/ydb/public/sdk/cpp/examples/ya.make index 66098fe0f62a..4c9bc0470d5b 100644 --- a/ydb/public/sdk/cpp/examples/ya.make +++ b/ydb/public/sdk/cpp/examples/ya.make @@ -1,6 +1,7 @@ RECURSE( basic_example bulk_upsert_simple + executor pagination secondary_index secondary_index_builtin diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h index d21ced223107..7dfa5387b462 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h @@ -8,13 +8,15 @@ namespace NYdb::inline Dev::NExec { class IExecutor { public: + virtual void Start() = 0; + virtual void Stop() = 0; + virtual void Post(std::function&& f) = 0; virtual ~IExecutor() = default; }; // Create executor adapter for util thread pool. -// Thread pool is expected to have been started. -std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool); +std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize = 0); -} // namespace NYdb::inline Dev +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp index 7008b24235ec..6200bdb4c3da 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp @@ -3,6 +3,8 @@ #include +#include + namespace NYdb::inline Dev { @@ -138,7 +140,6 @@ class TScheduledFuture : public TScheduledObject { TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr params) : MetricRegistryPtr_(nullptr) , ClientThreadsNum_(params->GetClientThreadsNum()) - , ResponseQueue_(CreateResponseQueue(params)) , DefaultDiscoveryEndpoint_(params->GetEndpoint()) , SslCredentials_(params->GetSslCredentials()) , DefaultDatabase_(params->GetDatabase()) @@ -181,7 +182,13 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p AddPeriodicTask(channelPoolUpdateWrapper, SocketIdleTimeout_ * 0.1); } #endif - //TAdaptiveThreadPool ignores params + if (params->GetExecutor()) { + ResponseQueue_ = params->GetExecutor(); + } else { + // TAdaptiveThreadPool ignores params + ResponseQueue_ = NExec::CreateThreadPoolExecutorAdapter(CreateThreadPool(ClientThreadsNum_), ClientThreadsNum_, MaxQueuedRequests_); + } + ResponseQueue_->Start(); if (!DefaultDatabase_.empty()) { DefaultState_ = StateTracker_.GetDriverState( @@ -428,7 +435,9 @@ const TLog& TGRpcConnectionsImpl::GetLog() const { } void TGRpcConnectionsImpl::EnqueueResponse(IObjectInQueue* action) { - ResponseQueue_->Post(action); + ResponseQueue_->Post([action]() { + action->Process(nullptr); + }); } } // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h index b6a0fc772b65..ad5ae684f8ed 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -5,7 +5,6 @@ #include "actions.h" #include "params.h" -#include "response_queue.h" #include #include @@ -750,7 +749,7 @@ class TGRpcConnectionsImpl ::NMonitoring::TMetricRegistry* MetricRegistryPtr_ = nullptr; const std::size_t ClientThreadsNum_; - std::unique_ptr ResponseQueue_; + std::shared_ptr ResponseQueue_; const std::string DefaultDiscoveryEndpoint_; const TSslCredentials SslCredentials_; diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp deleted file mode 100644 index e0f5cdb36c05..000000000000 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.cpp +++ /dev/null @@ -1,67 +0,0 @@ -#define INCLUDE_YDB_INTERNAL_H -#include "response_queue.h" - -#include - -#include - - -namespace NYdb::inline Dev { - -class TOwnedResponseQueue: public IResponseQueue { -public: - TOwnedResponseQueue(size_t clientThreadsNum, size_t maxQueuedResponses) - : ClientThreadsNum_(clientThreadsNum) - , MaxQueuedResponses_(maxQueuedResponses) - , ThreadPool_(CreateThreadPool(clientThreadsNum)) - { - } - - void Start() override { - ThreadPool_->Start(ClientThreadsNum_, MaxQueuedResponses_); - } - - void Stop() override { - ThreadPool_->Stop(); - } - - void Post(IObjectInQueue* action) override { - Y_ENSURE(ThreadPool_->Add(action)); - } - -private: - const std::size_t ClientThreadsNum_; - const std::size_t MaxQueuedResponses_; - - std::unique_ptr ThreadPool_; -}; - -class TExecutorResponseQueue : public IResponseQueue { -public: - TExecutorResponseQueue(std::shared_ptr executor) - : Executor_(executor) - { - } - - void Start() override { } - - void Stop() override { } - - void Post(IObjectInQueue* action) override { - Executor_->Post([action]() { - action->Process(nullptr); - }); - } - -private: - std::shared_ptr Executor_; -}; - -std::unique_ptr CreateResponseQueue(std::shared_ptr params) { - if (params->GetExecutor()) { - return std::make_unique(params->GetExecutor()); - } - return std::make_unique(params->GetClientThreadsNum(), params->GetMaxQueuedResponses()); -} - -} // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h deleted file mode 100644 index 7fae4841cab4..000000000000 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/response_queue.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include - -#include "params.h" - -#include - - -namespace NYdb::inline Dev { - -class IResponseQueue { -public: - virtual void Start() = 0; - virtual void Stop() = 0; - - virtual void Post(IObjectInQueue* action) = 0; - - virtual ~IResponseQueue() = default; -}; - -std::unique_ptr CreateResponseQueue(std::shared_ptr params); - -} // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make index c3a5a4dbd830..9dfd33914adb 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make @@ -3,7 +3,6 @@ LIBRARY() SRCS( actions.cpp grpc_connections.cpp - response_queue.cpp ) PEERDIR( diff --git a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp index 0ed822c6a503..26c5a4d3e5c9 100644 --- a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp +++ b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp @@ -4,21 +4,33 @@ namespace NYdb::inline Dev::NExec { class TThreadPoolExecutor : public NExec::IExecutor { public: - TThreadPoolExecutor(std::shared_ptr threadPool) - : ThreadPool(threadPool) + TThreadPoolExecutor(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize) + : ThreadPool_(threadPool) + , ThreadCount_(threadCount) + , MaxQueueSize_(maxQueueSize) { } + void Start() override { + ThreadPool_->Start(ThreadCount_, MaxQueueSize_); + } + + void Stop() override { + ThreadPool_->Stop(); + } + void Post(std::function&& f) override { - ThreadPool->SafeAddFunc(std::move(f)); + ThreadPool_->SafeAddFunc(std::move(f)); } private: - std::shared_ptr ThreadPool; + std::shared_ptr ThreadPool_; + const std::size_t ThreadCount_; + const std::size_t MaxQueueSize_; }; -std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool) { - return std::make_shared(threadPool); +std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize) { + return std::make_shared(threadPool, threadCount, maxQueueSize); } } // namespace NYdb::inline Dev From 6d9fb99a9ff3000443d8913bc596dde7501191a7 Mon Sep 17 00:00:00 2001 From: Bulat Gayazov Date: Tue, 8 Jul 2025 18:26:48 +0000 Subject: [PATCH 5/5] fix --- .../sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h | 1 - .../include/ydb-cpp-sdk/client/types/executor/executor.h | 3 +++ .../ydb_internal/grpc_connections/grpc_connections.cpp | 4 +--- .../client/impl/ydb_internal/grpc_connections/ya.make | 1 - .../sdk/cpp/src/client/types/executor/executor.cpp | 9 +++++++++ ydb/public/sdk/cpp/src/client/types/executor/ya.make | 1 + 6 files changed, 14 insertions(+), 5 deletions(-) diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h index 1189c21e63ce..d03ef3938916 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h @@ -122,7 +122,6 @@ class TDriverConfig { //! Set executor for async responses. //! If not set, default executor will be used. - //! Warning: executor should start before driver is created and stop after driver is stopped by Stop(true) method. TDriverConfig& SetExecutor(std::shared_ptr executor); private: diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h index 7dfa5387b462..13e74058f8f7 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h @@ -16,6 +16,9 @@ class IExecutor { virtual ~IExecutor() = default; }; +// Create default executor for thread pool. +std::shared_ptr CreateThreadPoolExecutor(std::size_t threadCount, std::size_t maxQueueSize = 0); + // Create executor adapter for util thread pool. std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize = 0); diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp index 6200bdb4c3da..a877005f80d0 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp @@ -3,8 +3,6 @@ #include -#include - namespace NYdb::inline Dev { @@ -186,7 +184,7 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p ResponseQueue_ = params->GetExecutor(); } else { // TAdaptiveThreadPool ignores params - ResponseQueue_ = NExec::CreateThreadPoolExecutorAdapter(CreateThreadPool(ClientThreadsNum_), ClientThreadsNum_, MaxQueuedRequests_); + ResponseQueue_ = NExec::CreateThreadPoolExecutor(ClientThreadsNum_, MaxQueuedRequests_); } ResponseQueue_->Start(); diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make index 9dfd33914adb..7ad3993eae79 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make @@ -10,7 +10,6 @@ PEERDIR( ydb/public/api/protos ydb/public/sdk/cpp/src/client/impl/ydb_internal/db_driver_state ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status - ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool ydb/public/sdk/cpp/src/client/impl/ydb_stats ydb/public/sdk/cpp/src/client/resources ydb/public/sdk/cpp/src/client/types/exceptions diff --git a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp index 26c5a4d3e5c9..f8127d158423 100644 --- a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp +++ b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp @@ -1,5 +1,10 @@ #include +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + + namespace NYdb::inline Dev::NExec { class TThreadPoolExecutor : public NExec::IExecutor { @@ -29,6 +34,10 @@ class TThreadPoolExecutor : public NExec::IExecutor { const std::size_t MaxQueueSize_; }; +std::shared_ptr CreateThreadPoolExecutor(std::size_t threadCount, std::size_t maxQueueSize) { + return std::make_shared(CreateThreadPool(threadCount), threadCount, maxQueueSize); +} + std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize) { return std::make_shared(threadPool, threadCount, maxQueueSize); } diff --git a/ydb/public/sdk/cpp/src/client/types/executor/ya.make b/ydb/public/sdk/cpp/src/client/types/executor/ya.make index b35ce920e453..1d71f0cc8f79 100644 --- a/ydb/public/sdk/cpp/src/client/types/executor/ya.make +++ b/ydb/public/sdk/cpp/src/client/types/executor/ya.make @@ -5,6 +5,7 @@ SRCS( ) PEERDIR( + ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool ydb/public/sdk/cpp/src/client/types/exceptions )