Skip to content

[C++ SDK] Custom executor #20752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/fatal_error_handlers/handlers.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/request_settings.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/status/status.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>

#include <library/cpp/logger/backend.h>

Expand Down Expand Up @@ -49,7 +50,7 @@ class TDriverConfig {
//! Enable Ssl.
//! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections.
//! If this parameter is empty, the default roots will be used.
TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string());
TDriverConfig& UseSecureConnection(const std::string& caCerts = "");
TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel);
TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey);
//! Set token, this option can be overridden for client by ClientSettings
Expand Down Expand Up @@ -88,7 +89,7 @@ class TDriverConfig {
//! Set policy for balancing
//! Params is a optionally field to set policy settings
//! default: EBalancingPolicy::UsePreferableLocation
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string());
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = "");
//! Set grpc level keep alive. If keepalive ping was delayed more than given timeout
//! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error
//! Note: this timeout should not be too small to prevent fail due to
Expand Down Expand Up @@ -118,6 +119,12 @@ class TDriverConfig {

//! Log backend.
TDriverConfig& SetLog(std::unique_ptr<TLogBackend>&& log);

//! Set executor for async responses.
//! If not set, default executor will be used.
//! Warning: executor should start before driver is created and stop after driver is stopped by Stop(true) method.
TDriverConfig& SetExecutor(std::shared_ptr<NExec::IExecutor> executor);

private:
class TImpl;
std::shared_ptr<TImpl> Impl_;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <util/thread/pool.h>

#include <functional>

namespace NYdb::inline Dev::NExec {

class IExecutor {
public:
virtual void Post(std::function<void()>&& f) = 0;

virtual ~IExecutor() = default;
};

// Create executor adapter for util thread pool.
// Thread pool is expected to have been started.
std::shared_ptr<IExecutor> CreateThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool);

} // namespace NYdb::inline Dev
7 changes: 7 additions & 0 deletions ydb/public/sdk/cpp/src/client/driver/driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
uint64_t GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; }
uint64_t GetMaxMessageSize() const override { return MaxMessageSize; }
const TLog& GetLog() const override { return Log; }
std::shared_ptr<NExec::IExecutor> GetExecutor() const override { return Executor; }

std::string Endpoint;
size_t NetworkThreadsNum = 2;
Expand Down Expand Up @@ -78,6 +79,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
uint64_t MaxOutboundMessageSize = 0;
uint64_t MaxMessageSize = 0;
TLog Log; // Null by default.
std::shared_ptr<NExec::IExecutor> Executor;
};

TDriverConfig::TDriverConfig(const std::string& connectionString)
Expand Down Expand Up @@ -210,6 +212,11 @@ TDriverConfig& TDriverConfig::SetLog(std::unique_ptr<TLogBackend>&& log) {
return *this;
}

TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr<NExec::IExecutor> executor) {
Impl_->Executor = executor;
return *this;
}

////////////////////////////////////////////////////////////////////////////////

std::shared_ptr<TGRpcConnectionsImpl> CreateInternalInterface(const TDriver connection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h>


namespace NYdb::inline Dev {

bool IsTokenCorrect(const std::string& in) {
Expand Down Expand Up @@ -137,7 +138,7 @@ class TScheduledFuture : public TScheduledObject<TScheduledFuture> {
TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr<IConnectionsParams> params)
: MetricRegistryPtr_(nullptr)
, ClientThreadsNum_(params->GetClientThreadsNum())
, ResponseQueue_(CreateThreadPool(ClientThreadsNum_))
, ResponseQueue_(CreateResponseQueue(params))
, DefaultDiscoveryEndpoint_(params->GetEndpoint())
, SslCredentials_(params->GetSslCredentials())
, DefaultDatabase_(params->GetDatabase())
Expand Down Expand Up @@ -181,7 +182,7 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr<IConnectionsParams> p
}
#endif
//TAdaptiveThreadPool ignores params
ResponseQueue_->Start(ClientThreadsNum_, MaxQueuedResponses_);
ResponseQueue_->Start();
if (!DefaultDatabase_.empty()) {
DefaultState_ = StateTracker_.GetDriverState(
DefaultDatabase_,
Expand Down Expand Up @@ -427,7 +428,7 @@ const TLog& TGRpcConnectionsImpl::GetLog() const {
}

void TGRpcConnectionsImpl::EnqueueResponse(IObjectInQueue* action) {
Y_ENSURE(ResponseQueue_->Add(action));
ResponseQueue_->Post(action);
}

} // namespace NYdb
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@

#include "actions.h"
#include "params.h"
#include "response_queue.h"

#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h>
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/common/client_pid.h>
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/db_driver_state/state.h>
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/rpc_request_settings/settings.h>
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool/pool.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/resources/ydb_resources.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/extension_common/extension.h>

#include <ydb/public/sdk/cpp/src/library/issue/yql_issue_message.h>


namespace NYdb::inline Dev {

constexpr TDuration GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY = TDuration::Seconds(10);
Expand Down Expand Up @@ -748,25 +749,25 @@ class TGRpcConnectionsImpl
std::mutex ExtensionsLock_;
::NMonitoring::TMetricRegistry* MetricRegistryPtr_ = nullptr;

const size_t ClientThreadsNum_;
std::unique_ptr<IThreadPool> ResponseQueue_;
const std::size_t ClientThreadsNum_;
std::unique_ptr<IResponseQueue> ResponseQueue_;

const std::string DefaultDiscoveryEndpoint_;
const TSslCredentials SslCredentials_;
const std::string DefaultDatabase_;
std::shared_ptr<ICredentialsProviderFactory> DefaultCredentialsProviderFactory_;
TDbDriverStateTracker StateTracker_;
const EDiscoveryMode DefaultDiscoveryMode_;
const i64 MaxQueuedRequests_;
const i64 MaxQueuedResponses_;
const std::int64_t MaxQueuedRequests_;
const std::int64_t MaxQueuedResponses_;
const bool DrainOnDtors_;
const TBalancingSettings BalancingSettings_;
const TDuration GRpcKeepAliveTimeout_;
const bool GRpcKeepAlivePermitWithoutCalls_;
const ui64 MemoryQuota_;
const ui64 MaxInboundMessageSize_;
const ui64 MaxOutboundMessageSize_;
const ui64 MaxMessageSize_;
const std::uint64_t MemoryQuota_;
const std::uint64_t MaxInboundMessageSize_;
const std::uint64_t MaxOutboundMessageSize_;
const std::uint64_t MaxMessageSize_;

std::atomic_int64_t QueuedRequests_;
const NYdbGrpc::TTcpKeepAliveSettings TcpKeepAliveSettings_;
Expand All @@ -782,7 +783,7 @@ class TGRpcConnectionsImpl

IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb;

const size_t NetworkThreadsNum_;
const std::size_t NetworkThreadsNum_;
bool UsePerChannelTcpConnection_;
// Must be the last member (first called destructor)
NYdbGrpc::TGRpcClientLow GRpcClientLow_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/common/types.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/common_client/ssl_credentials.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>

namespace NYdb::inline Dev {

Expand Down Expand Up @@ -33,6 +34,7 @@ class IConnectionsParams {
virtual uint64_t GetMaxInboundMessageSize() const = 0;
virtual uint64_t GetMaxOutboundMessageSize() const = 0;
virtual uint64_t GetMaxMessageSize() const = 0;
virtual std::shared_ptr<NExec::IExecutor> GetExecutor() const = 0;
};

} // namespace NYdb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#define INCLUDE_YDB_INTERNAL_H
#include "response_queue.h"

#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool/pool.h>

#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h>


namespace NYdb::inline Dev {

class TOwnedResponseQueue: public IResponseQueue {
public:
TOwnedResponseQueue(size_t clientThreadsNum, size_t maxQueuedResponses)
: ClientThreadsNum_(clientThreadsNum)
, MaxQueuedResponses_(maxQueuedResponses)
, ThreadPool_(CreateThreadPool(clientThreadsNum))
{
}

void Start() override {
ThreadPool_->Start(ClientThreadsNum_, MaxQueuedResponses_);
}

void Stop() override {
ThreadPool_->Stop();
}

void Post(IObjectInQueue* action) override {
Y_ENSURE(ThreadPool_->Add(action));
}

private:
const std::size_t ClientThreadsNum_;
const std::size_t MaxQueuedResponses_;

std::unique_ptr<IThreadPool> ThreadPool_;
};

class TExecutorResponseQueue : public IResponseQueue {
public:
TExecutorResponseQueue(std::shared_ptr<NExec::IExecutor> executor)
: Executor_(executor)
{
}

void Start() override { }

void Stop() override { }

void Post(IObjectInQueue* action) override {
Executor_->Post([action]() {
action->Process(nullptr);
});
}

private:
std::shared_ptr<NExec::IExecutor> Executor_;
};

std::unique_ptr<IResponseQueue> CreateResponseQueue(std::shared_ptr<IConnectionsParams> params) {
if (params->GetExecutor()) {
return std::make_unique<TExecutorResponseQueue>(params->GetExecutor());
}
return std::make_unique<TOwnedResponseQueue>(params->GetClientThreadsNum(), params->GetMaxQueuedResponses());
}

} // namespace NYdb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/internal_header.h>

#include "params.h"

#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>


namespace NYdb::inline Dev {

class IResponseQueue {
public:
virtual void Start() = 0;
virtual void Stop() = 0;

virtual void Post(IObjectInQueue* action) = 0;

virtual ~IResponseQueue() = default;
};

std::unique_ptr<IResponseQueue> CreateResponseQueue(std::shared_ptr<IConnectionsParams> params);

} // namespace NYdb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ LIBRARY()
SRCS(
actions.cpp
grpc_connections.cpp
response_queue.cpp
)

PEERDIR(
Expand All @@ -14,6 +15,7 @@ PEERDIR(
ydb/public/sdk/cpp/src/client/impl/ydb_stats
ydb/public/sdk/cpp/src/client/resources
ydb/public/sdk/cpp/src/client/types/exceptions
ydb/public/sdk/cpp/src/client/types/executor
)

END()
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "executor_impl.h"

#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool/pool.h>

namespace NYdb::inline Dev::NTopic {

void IAsyncExecutor::Post(TFunction&& f) {
Expand Down
24 changes: 24 additions & 0 deletions ydb/public/sdk/cpp/src/client/types/executor/executor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>

namespace NYdb::inline Dev::NExec {

class TThreadPoolExecutor : public NExec::IExecutor {
public:
TThreadPoolExecutor(std::shared_ptr<IThreadPool> threadPool)
: ThreadPool(threadPool)
{
}

void Post(std::function<void()>&& f) override {
ThreadPool->SafeAddFunc(std::move(f));
}

private:
std::shared_ptr<IThreadPool> ThreadPool;
};

std::shared_ptr<NExec::IExecutor> CreateThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool) {
return std::make_shared<TThreadPoolExecutor>(threadPool);
}

} // namespace NYdb::inline Dev
11 changes: 11 additions & 0 deletions ydb/public/sdk/cpp/src/client/types/executor/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

SRCS(
executor.cpp
)

PEERDIR(
ydb/public/sdk/cpp/src/client/types/exceptions
)

END()
Loading