diff --git a/ydb/public/sdk/cpp/examples/executor/main.cpp b/ydb/public/sdk/cpp/examples/executor/main.cpp new file mode 100644 index 000000000000..2e5cbca1e53d --- /dev/null +++ b/ydb/public/sdk/cpp/examples/executor/main.cpp @@ -0,0 +1,52 @@ +#include +#include +#include + +#include + +#include + +#include + + +void ExecutorExample(const std::string& endpoint, const std::string& database) { + auto driverConfig = NYdb::CreateFromEnvironment(endpoint + "/?database=" + database) + .SetExecutor(NYdb::NExec::CreateThreadPoolExecutorAdapter( + std::make_shared(TThreadPool::TParams() + .SetBlocking(true) + .SetCatching(false) + .SetForkAware(false)), + std::thread::hardware_concurrency()) + ); + + NYdb::TDriver driver(driverConfig); + NYdb::NQuery::TQueryClient client(driver); + + try { + auto result = client.ExecuteQuery("SELECT 1", NYdb::NQuery::TTxControl::NoTx()).GetValueSync(); + NYdb::NStatusHelpers::ThrowOnError(result); + auto parser = result.GetResultSetParser(0); + parser.TryNextRow(); + std::cout << "Result: " << parser.ColumnParser(0).GetInt32() << std::endl; + } catch (const std::exception& e) { + std::cerr << "Execution failed: " << e.what() << std::endl; + } + + driver.Stop(true); +} + +int main(int argc, char** argv) { + std::string endpoint; + std::string database; + + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + + opts.AddLongOption('e', "endpoint", "YDB endpoint").Required().RequiredArgument("HOST:PORT").StoreResult(&endpoint); + opts.AddLongOption('d', "database", "YDB database").Required().RequiredArgument("DATABASE").StoreResult(&database); + + opts.SetFreeArgsMin(0); + NLastGetopt::TOptsParseResult result(&opts, argc, argv); + + ExecutorExample(endpoint, database); + return 0; +} diff --git a/ydb/public/sdk/cpp/examples/executor/ya.make b/ydb/public/sdk/cpp/examples/executor/ya.make new file mode 100644 index 000000000000..72eb3f430753 --- /dev/null +++ b/ydb/public/sdk/cpp/examples/executor/ya.make @@ -0,0 +1,13 @@ +PROGRAM() + +SRCS( + main.cpp +) + +PEERDIR( + library/cpp/getopt + ydb/public/sdk/cpp/src/client/query + ydb/public/sdk/cpp/src/client/helpers +) + +END() diff --git a/ydb/public/sdk/cpp/examples/ya.make b/ydb/public/sdk/cpp/examples/ya.make index 66098fe0f62a..4c9bc0470d5b 100644 --- a/ydb/public/sdk/cpp/examples/ya.make +++ b/ydb/public/sdk/cpp/examples/ya.make @@ -1,6 +1,7 @@ RECURSE( basic_example bulk_upsert_simple + executor pagination secondary_index secondary_index_builtin diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h index f5e3e4579556..d03ef3938916 100644 --- a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -49,7 +50,7 @@ class TDriverConfig { //! Enable Ssl. //! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections. //! If this parameter is empty, the default roots will be used. - TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string()); + TDriverConfig& UseSecureConnection(const std::string& caCerts = ""); TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel); TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey); //! Set token, this option can be overridden for client by ClientSettings @@ -88,7 +89,7 @@ class TDriverConfig { //! Set policy for balancing //! Params is a optionally field to set policy settings //! default: EBalancingPolicy::UsePreferableLocation - TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string()); + TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = ""); //! Set grpc level keep alive. If keepalive ping was delayed more than given timeout //! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error //! Note: this timeout should not be too small to prevent fail due to @@ -118,6 +119,11 @@ class TDriverConfig { //! Log backend. TDriverConfig& SetLog(std::unique_ptr&& log); + + //! Set executor for async responses. + //! If not set, default executor will be used. + TDriverConfig& SetExecutor(std::shared_ptr executor); + private: class TImpl; std::shared_ptr Impl_; diff --git a/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h new file mode 100644 index 000000000000..13e74058f8f7 --- /dev/null +++ b/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +#include + +namespace NYdb::inline Dev::NExec { + +class IExecutor { +public: + virtual void Start() = 0; + virtual void Stop() = 0; + + virtual void Post(std::function&& f) = 0; + + virtual ~IExecutor() = default; +}; + +// Create default executor for thread pool. +std::shared_ptr CreateThreadPoolExecutor(std::size_t threadCount, std::size_t maxQueueSize = 0); + +// Create executor adapter for util thread pool. +std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize = 0); + +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/driver/driver.cpp b/ydb/public/sdk/cpp/src/client/driver/driver.cpp index 1e5cb0c26d1f..83132baba5d1 100644 --- a/ydb/public/sdk/cpp/src/client/driver/driver.cpp +++ b/ydb/public/sdk/cpp/src/client/driver/driver.cpp @@ -50,6 +50,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; } uint64_t GetMaxMessageSize() const override { return MaxMessageSize; } const TLog& GetLog() const override { return Log; } + std::shared_ptr GetExecutor() const override { return Executor; } std::string Endpoint; size_t NetworkThreadsNum = 2; @@ -78,6 +79,7 @@ class TDriverConfig::TImpl : public IConnectionsParams { uint64_t MaxOutboundMessageSize = 0; uint64_t MaxMessageSize = 0; TLog Log; // Null by default. + std::shared_ptr Executor; }; TDriverConfig::TDriverConfig(const std::string& connectionString) @@ -210,6 +212,11 @@ TDriverConfig& TDriverConfig::SetLog(std::unique_ptr&& log) { return *this; } +TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr executor) { + Impl_->Executor = executor; + return *this; +} + //////////////////////////////////////////////////////////////////////////////// std::shared_ptr CreateInternalInterface(const TDriver connection) { diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp index 052a2cfc773c..a877005f80d0 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp @@ -3,6 +3,7 @@ #include + namespace NYdb::inline Dev { bool IsTokenCorrect(const std::string& in) { @@ -137,7 +138,6 @@ class TScheduledFuture : public TScheduledObject { TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr params) : MetricRegistryPtr_(nullptr) , ClientThreadsNum_(params->GetClientThreadsNum()) - , ResponseQueue_(CreateThreadPool(ClientThreadsNum_)) , DefaultDiscoveryEndpoint_(params->GetEndpoint()) , SslCredentials_(params->GetSslCredentials()) , DefaultDatabase_(params->GetDatabase()) @@ -180,8 +180,14 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr p AddPeriodicTask(channelPoolUpdateWrapper, SocketIdleTimeout_ * 0.1); } #endif - //TAdaptiveThreadPool ignores params - ResponseQueue_->Start(ClientThreadsNum_, MaxQueuedResponses_); + if (params->GetExecutor()) { + ResponseQueue_ = params->GetExecutor(); + } else { + // TAdaptiveThreadPool ignores params + ResponseQueue_ = NExec::CreateThreadPoolExecutor(ClientThreadsNum_, MaxQueuedRequests_); + } + + ResponseQueue_->Start(); if (!DefaultDatabase_.empty()) { DefaultState_ = StateTracker_.GetDriverState( DefaultDatabase_, @@ -427,7 +433,9 @@ const TLog& TGRpcConnectionsImpl::GetLog() const { } void TGRpcConnectionsImpl::EnqueueResponse(IObjectInQueue* action) { - Y_ENSURE(ResponseQueue_->Add(action)); + ResponseQueue_->Post([action]() { + action->Process(nullptr); + }); } } // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h index d65d408bf022..ad5ae684f8ed 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h @@ -10,12 +10,12 @@ #include #include #include -#include #include #include #include + namespace NYdb::inline Dev { constexpr TDuration GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY = TDuration::Seconds(10); @@ -748,8 +748,8 @@ class TGRpcConnectionsImpl std::mutex ExtensionsLock_; ::NMonitoring::TMetricRegistry* MetricRegistryPtr_ = nullptr; - const size_t ClientThreadsNum_; - std::unique_ptr ResponseQueue_; + const std::size_t ClientThreadsNum_; + std::shared_ptr ResponseQueue_; const std::string DefaultDiscoveryEndpoint_; const TSslCredentials SslCredentials_; @@ -757,16 +757,16 @@ class TGRpcConnectionsImpl std::shared_ptr DefaultCredentialsProviderFactory_; TDbDriverStateTracker StateTracker_; const EDiscoveryMode DefaultDiscoveryMode_; - const i64 MaxQueuedRequests_; - const i64 MaxQueuedResponses_; + const std::int64_t MaxQueuedRequests_; + const std::int64_t MaxQueuedResponses_; const bool DrainOnDtors_; const TBalancingSettings BalancingSettings_; const TDuration GRpcKeepAliveTimeout_; const bool GRpcKeepAlivePermitWithoutCalls_; - const ui64 MemoryQuota_; - const ui64 MaxInboundMessageSize_; - const ui64 MaxOutboundMessageSize_; - const ui64 MaxMessageSize_; + const std::uint64_t MemoryQuota_; + const std::uint64_t MaxInboundMessageSize_; + const std::uint64_t MaxOutboundMessageSize_; + const std::uint64_t MaxMessageSize_; std::atomic_int64_t QueuedRequests_; const NYdbGrpc::TTcpKeepAliveSettings TcpKeepAliveSettings_; @@ -782,7 +782,7 @@ class TGRpcConnectionsImpl IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb; - const size_t NetworkThreadsNum_; + const std::size_t NetworkThreadsNum_; bool UsePerChannelTcpConnection_; // Must be the last member (first called destructor) NYdbGrpc::TGRpcClientLow GRpcClientLow_; diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h index 50eea9f42f88..4f11a5d8cc40 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace NYdb::inline Dev { @@ -33,6 +34,7 @@ class IConnectionsParams { virtual uint64_t GetMaxInboundMessageSize() const = 0; virtual uint64_t GetMaxOutboundMessageSize() const = 0; virtual uint64_t GetMaxMessageSize() const = 0; + virtual std::shared_ptr GetExecutor() const = 0; }; } // namespace NYdb diff --git a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make index 1bd264bd238d..7ad3993eae79 100644 --- a/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make +++ b/ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make @@ -10,10 +10,10 @@ PEERDIR( ydb/public/api/protos ydb/public/sdk/cpp/src/client/impl/ydb_internal/db_driver_state ydb/public/sdk/cpp/src/client/impl/ydb_internal/plain_status - ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool ydb/public/sdk/cpp/src/client/impl/ydb_stats ydb/public/sdk/cpp/src/client/resources ydb/public/sdk/cpp/src/client/types/exceptions + ydb/public/sdk/cpp/src/client/types/executor ) END() diff --git a/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp b/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp index 44fccf8df414..3f7defb58a42 100644 --- a/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp @@ -1,5 +1,7 @@ #include "executor_impl.h" +#include + namespace NYdb::inline Dev::NTopic { void IAsyncExecutor::Post(TFunction&& f) { diff --git a/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp new file mode 100644 index 000000000000..f8127d158423 --- /dev/null +++ b/ydb/public/sdk/cpp/src/client/types/executor/executor.cpp @@ -0,0 +1,45 @@ +#include + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + + +namespace NYdb::inline Dev::NExec { + +class TThreadPoolExecutor : public NExec::IExecutor { +public: + TThreadPoolExecutor(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize) + : ThreadPool_(threadPool) + , ThreadCount_(threadCount) + , MaxQueueSize_(maxQueueSize) + { + } + + void Start() override { + ThreadPool_->Start(ThreadCount_, MaxQueueSize_); + } + + void Stop() override { + ThreadPool_->Stop(); + } + + void Post(std::function&& f) override { + ThreadPool_->SafeAddFunc(std::move(f)); + } + +private: + std::shared_ptr ThreadPool_; + const std::size_t ThreadCount_; + const std::size_t MaxQueueSize_; +}; + +std::shared_ptr CreateThreadPoolExecutor(std::size_t threadCount, std::size_t maxQueueSize) { + return std::make_shared(CreateThreadPool(threadCount), threadCount, maxQueueSize); +} + +std::shared_ptr CreateThreadPoolExecutorAdapter(std::shared_ptr threadPool, std::size_t threadCount, std::size_t maxQueueSize) { + return std::make_shared(threadPool, threadCount, maxQueueSize); +} + +} // namespace NYdb::inline Dev diff --git a/ydb/public/sdk/cpp/src/client/types/executor/ya.make b/ydb/public/sdk/cpp/src/client/types/executor/ya.make new file mode 100644 index 000000000000..1d71f0cc8f79 --- /dev/null +++ b/ydb/public/sdk/cpp/src/client/types/executor/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + executor.cpp +) + +PEERDIR( + ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool + ydb/public/sdk/cpp/src/client/types/exceptions +) + +END()