Skip to content

Commit 3946488

Browse files
committed
[C++ SDK] Custom executor
1 parent 4a26872 commit 3946488

File tree

12 files changed

+182
-15
lines changed

12 files changed

+182
-15
lines changed

ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/fatal_error_handlers/handlers.h>
99
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/request_settings.h>
1010
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/status/status.h>
11+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>
1112

1213
#include <library/cpp/logger/backend.h>
1314

@@ -49,7 +50,7 @@ class TDriverConfig {
4950
//! Enable Ssl.
5051
//! caCerts - The buffer containing the PEM encoded root certificates for SSL/TLS connections.
5152
//! If this parameter is empty, the default roots will be used.
52-
TDriverConfig& UseSecureConnection(const std::string& caCerts = std::string());
53+
TDriverConfig& UseSecureConnection(const std::string& caCerts = "");
5354
TDriverConfig& SetUsePerChannelTcpConnection(bool usePerChannel);
5455
TDriverConfig& UseClientCertificate(const std::string& clientCert, const std::string& clientPrivateKey);
5556
//! Set token, this option can be overridden for client by ClientSettings
@@ -88,7 +89,7 @@ class TDriverConfig {
8889
//! Set policy for balancing
8990
//! Params is a optionally field to set policy settings
9091
//! default: EBalancingPolicy::UsePreferableLocation
91-
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = std::string());
92+
TDriverConfig& SetBalancingPolicy(EBalancingPolicy policy, const std::string& params = "");
9293
//! !!! EXPERIMENTAL !!!
9394
//! Set grpc level keep alive. If keepalive ping was delayed more than given timeout
9495
//! internal grpc routine fails request with TRANSIENT_FAILURE or TRANSPORT_UNAVAILABLE error
@@ -119,6 +120,11 @@ class TDriverConfig {
119120

120121
//! Log backend.
121122
TDriverConfig& SetLog(std::unique_ptr<TLogBackend>&& log);
123+
124+
//! Set executor for async responses.
125+
//! If not set, default executor will be used.
126+
TDriverConfig& SetExecutor(std::shared_ptr<IExecutor> executor);
127+
122128
private:
123129
class TImpl;
124130
std::shared_ptr<TImpl> Impl_;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#pragma once
2+
3+
#include <util/thread/pool.h>
4+
5+
#include <functional>
6+
7+
namespace NYdb::inline Dev {
8+
9+
class IExecutor {
10+
public:
11+
virtual void Post(std::function<void()>&& f) = 0;
12+
13+
virtual ~IExecutor() = default;
14+
};
15+
16+
// Create executor adapter for util thread pool.
17+
// Thread pool is expected to have been started.
18+
std::shared_ptr<IExecutor> CreateThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool);
19+
20+
} // namespace NYdb::inline Dev

ydb/public/sdk/cpp/src/client/driver/driver.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
5050
uint64_t GetMaxOutboundMessageSize() const override { return MaxOutboundMessageSize; }
5151
uint64_t GetMaxMessageSize() const override { return MaxMessageSize; }
5252
const TLog& GetLog() const override { return Log; }
53+
std::shared_ptr<IExecutor> GetExecutor() const override { return Executor; }
5354

5455
std::string Endpoint;
5556
size_t NetworkThreadsNum = 2;
@@ -78,6 +79,7 @@ class TDriverConfig::TImpl : public IConnectionsParams {
7879
uint64_t MaxOutboundMessageSize = 0;
7980
uint64_t MaxMessageSize = 0;
8081
TLog Log; // Null by default.
82+
std::shared_ptr<IExecutor> Executor;
8183
};
8284

8385
TDriverConfig::TDriverConfig(const std::string& connectionString)
@@ -210,6 +212,11 @@ TDriverConfig& TDriverConfig::SetLog(std::unique_ptr<TLogBackend>&& log) {
210212
return *this;
211213
}
212214

215+
TDriverConfig& TDriverConfig::SetExecutor(std::shared_ptr<IExecutor> executor) {
216+
Impl_->Executor = executor;
217+
return *this;
218+
}
219+
213220
////////////////////////////////////////////////////////////////////////////////
214221

215222
std::shared_ptr<TGRpcConnectionsImpl> CreateInternalInterface(const TDriver connection) {

ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h>
55

6+
67
namespace NYdb::inline Dev {
78

89
bool IsTokenCorrect(const std::string& in) {
@@ -137,7 +138,7 @@ class TScheduledFuture : public TScheduledObject<TScheduledFuture> {
137138
TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr<IConnectionsParams> params)
138139
: MetricRegistryPtr_(nullptr)
139140
, ClientThreadsNum_(params->GetClientThreadsNum())
140-
, ResponseQueue_(CreateThreadPool(ClientThreadsNum_))
141+
, ResponseQueue_(CreateResponseQueue(params))
141142
, DefaultDiscoveryEndpoint_(params->GetEndpoint())
142143
, SslCredentials_(params->GetSslCredentials())
143144
, DefaultDatabase_(params->GetDatabase())
@@ -181,7 +182,7 @@ TGRpcConnectionsImpl::TGRpcConnectionsImpl(std::shared_ptr<IConnectionsParams> p
181182
}
182183
#endif
183184
//TAdaptiveThreadPool ignores params
184-
ResponseQueue_->Start(ClientThreadsNum_, MaxQueuedResponses_);
185+
ResponseQueue_->Start();
185186
if (!DefaultDatabase_.empty()) {
186187
DefaultState_ = StateTracker_.GetDriverState(
187188
DefaultDatabase_,
@@ -427,7 +428,7 @@ const TLog& TGRpcConnectionsImpl::GetLog() const {
427428
}
428429

429430
void TGRpcConnectionsImpl::EnqueueResponse(IObjectInQueue* action) {
430-
Y_ENSURE(ResponseQueue_->Add(action));
431+
ResponseQueue_->Post(action);
431432
}
432433

433434
} // namespace NYdb

ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/grpc_connections.h

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55

66
#include "actions.h"
77
#include "params.h"
8+
#include "response_queue.h"
89

910
#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h>
1011
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/common/client_pid.h>
1112
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/db_driver_state/state.h>
1213
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/rpc_request_settings/settings.h>
13-
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool/pool.h>
1414
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/resources/ydb_resources.h>
1515
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/extension_common/extension.h>
1616

1717
#include <ydb/public/sdk/cpp/src/library/issue/yql_issue_message.h>
1818

19+
1920
namespace NYdb::inline Dev {
2021

2122
constexpr TDuration GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY = TDuration::Seconds(10);
@@ -748,25 +749,25 @@ class TGRpcConnectionsImpl
748749
std::mutex ExtensionsLock_;
749750
::NMonitoring::TMetricRegistry* MetricRegistryPtr_ = nullptr;
750751

751-
const size_t ClientThreadsNum_;
752-
std::unique_ptr<IThreadPool> ResponseQueue_;
752+
const std::size_t ClientThreadsNum_;
753+
std::unique_ptr<IResponseQueue> ResponseQueue_;
753754

754755
const std::string DefaultDiscoveryEndpoint_;
755756
const TSslCredentials SslCredentials_;
756757
const std::string DefaultDatabase_;
757758
std::shared_ptr<ICredentialsProviderFactory> DefaultCredentialsProviderFactory_;
758759
TDbDriverStateTracker StateTracker_;
759760
const EDiscoveryMode DefaultDiscoveryMode_;
760-
const i64 MaxQueuedRequests_;
761-
const i64 MaxQueuedResponses_;
761+
const std::int64_t MaxQueuedRequests_;
762+
const std::int64_t MaxQueuedResponses_;
762763
const bool DrainOnDtors_;
763764
const TBalancingSettings BalancingSettings_;
764765
const TDuration GRpcKeepAliveTimeout_;
765766
const bool GRpcKeepAlivePermitWithoutCalls_;
766-
const ui64 MemoryQuota_;
767-
const ui64 MaxInboundMessageSize_;
768-
const ui64 MaxOutboundMessageSize_;
769-
const ui64 MaxMessageSize_;
767+
const std::uint64_t MemoryQuota_;
768+
const std::uint64_t MaxInboundMessageSize_;
769+
const std::uint64_t MaxOutboundMessageSize_;
770+
const std::uint64_t MaxMessageSize_;
770771

771772
std::atomic_int64_t QueuedRequests_;
772773
const NYdbGrpc::TTcpKeepAliveSettings TcpKeepAliveSettings_;
@@ -782,7 +783,7 @@ class TGRpcConnectionsImpl
782783

783784
IDiscoveryMutatorApi::TMutatorCb DiscoveryMutatorCb;
784785

785-
const size_t NetworkThreadsNum_;
786+
const std::size_t NetworkThreadsNum_;
786787
bool UsePerChannelTcpConnection_;
787788
// Must be the last member (first called destructor)
788789
NYdbGrpc::TGRpcClientLow GRpcClientLow_;

ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/params.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/common/types.h>
77
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/common_client/ssl_credentials.h>
88
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h>
9+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>
910

1011
namespace NYdb::inline Dev {
1112

@@ -33,6 +34,7 @@ class IConnectionsParams {
3334
virtual uint64_t GetMaxInboundMessageSize() const = 0;
3435
virtual uint64_t GetMaxOutboundMessageSize() const = 0;
3536
virtual uint64_t GetMaxMessageSize() const = 0;
37+
virtual std::shared_ptr<IExecutor> GetExecutor() const = 0;
3638
};
3739

3840
} // namespace NYdb
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#define INCLUDE_YDB_INTERNAL_H
2+
#include "response_queue.h"
3+
4+
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool/pool.h>
5+
6+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/exceptions/exceptions.h>
7+
8+
9+
namespace NYdb::inline Dev {
10+
11+
class TOwnedResponseQueue: public IResponseQueue {
12+
public:
13+
TOwnedResponseQueue(size_t clientThreadsNum, size_t maxQueuedResponses)
14+
: ClientThreadsNum_(clientThreadsNum)
15+
, MaxQueuedResponses_(maxQueuedResponses)
16+
, ThreadPool_(CreateThreadPool(clientThreadsNum))
17+
{
18+
}
19+
20+
void Start() override {
21+
ThreadPool_->Start(ClientThreadsNum_, MaxQueuedResponses_);
22+
}
23+
24+
void Stop() override {
25+
ThreadPool_->Stop();
26+
}
27+
28+
void Post(IObjectInQueue* action) override {
29+
Y_ENSURE(ThreadPool_->Add(action));
30+
}
31+
32+
private:
33+
const std::size_t ClientThreadsNum_;
34+
const std::size_t MaxQueuedResponses_;
35+
36+
std::unique_ptr<IThreadPool> ThreadPool_;
37+
};
38+
39+
class TExecutorResponseQueue : public IResponseQueue {
40+
public:
41+
TExecutorResponseQueue(std::shared_ptr<IExecutor> executor)
42+
: Executor_(executor)
43+
{
44+
}
45+
46+
void Start() override { }
47+
48+
void Stop() override { }
49+
50+
void Post(IObjectInQueue* action) override {
51+
Executor_->Post([action]() {
52+
action->Process(nullptr);
53+
});
54+
}
55+
56+
private:
57+
std::shared_ptr<IExecutor> Executor_;
58+
};
59+
60+
std::unique_ptr<IResponseQueue> CreateResponseQueue(std::shared_ptr<IConnectionsParams> params) {
61+
if (params->GetExecutor()) {
62+
return std::make_unique<TExecutorResponseQueue>(params->GetExecutor());
63+
}
64+
return std::make_unique<TOwnedResponseQueue>(params->GetClientThreadsNum(), params->GetMaxQueuedResponses());
65+
}
66+
67+
} // namespace NYdb
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#pragma once
2+
3+
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/internal_header.h>
4+
5+
#include "params.h"
6+
7+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>
8+
9+
10+
namespace NYdb::inline Dev {
11+
12+
class IResponseQueue {
13+
public:
14+
virtual void Start() = 0;
15+
virtual void Stop() = 0;
16+
17+
virtual void Post(IObjectInQueue* action) = 0;
18+
19+
virtual ~IResponseQueue() = default;
20+
};
21+
22+
std::unique_ptr<IResponseQueue> CreateResponseQueue(std::shared_ptr<IConnectionsParams> params);
23+
24+
} // namespace NYdb

ydb/public/sdk/cpp/src/client/impl/ydb_internal/grpc_connections/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
actions.cpp
55
grpc_connections.cpp
6+
response_queue.cpp
67
)
78

89
PEERDIR(
@@ -14,6 +15,7 @@ PEERDIR(
1415
ydb/public/sdk/cpp/src/client/impl/ydb_stats
1516
ydb/public/sdk/cpp/src/client/resources
1617
ydb/public/sdk/cpp/src/client/types/exceptions
18+
ydb/public/sdk/cpp/src/client/types/executor
1719
)
1820

1921
END()

ydb/public/sdk/cpp/src/client/topic/common/executor_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "executor_impl.h"
22

3+
#include <ydb/public/sdk/cpp/src/client/impl/ydb_internal/thread_pool/pool.h>
4+
35
namespace NYdb::inline Dev::NTopic {
46

57
void IAsyncExecutor::Post(TFunction&& f) {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/executor/executor.h>
2+
3+
namespace NYdb::inline Dev {
4+
5+
class TThreadPoolExecutor : public IExecutor {
6+
public:
7+
TThreadPoolExecutor(std::shared_ptr<IThreadPool> threadPool)
8+
: ThreadPool(threadPool)
9+
{
10+
}
11+
12+
void Post(std::function<void()>&& f) override {
13+
ThreadPool->SafeAddFunc(std::move(f));
14+
}
15+
16+
private:
17+
std::shared_ptr<IThreadPool> ThreadPool;
18+
};
19+
20+
std::shared_ptr<IExecutor> CreateThreadPoolExecutorAdapter(std::shared_ptr<IThreadPool> threadPool) {
21+
return std::make_shared<TThreadPoolExecutor>(threadPool);
22+
}
23+
24+
} // namespace NYdb::inline Dev
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
LIBRARY()
2+
3+
SRCS(
4+
executor.cpp
5+
)
6+
7+
PEERDIR(
8+
ydb/public/sdk/cpp/src/client/types/exceptions
9+
)
10+
11+
END()

0 commit comments

Comments
 (0)