Skip to content

Commit c957c93

Browse files
committed
Allow to skip discovery routine in the C++ SDK for driver/client scope. (#16226)
1 parent 172ee0a commit c957c93

File tree

4 files changed

+102
-32
lines changed

4 files changed

+102
-32
lines changed

include/ydb-cpp-sdk/client/types/ydb.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ enum class EDiscoveryMode {
1616
//! we got endpoint list. The error will be returned if the endpoint list
1717
//! is empty and discovery failed
1818
//! This method is a bit more "user friendly" but can produce additional hidden latency
19-
Async
19+
Async,
20+
//! Do not perform discovery.
21+
//! This option disables database discovery and allow to use user provided endpoint for grpc connections
22+
Off
2023
};
2124

2225
enum class EBalancingPolicy {

src/client/impl/ydb_internal/db_driver_state/state.cpp

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -205,27 +205,32 @@ TDbDriverStatePtr TDbDriverStateTracker::GetDriverState(
205205
? credentialsProviderFactory->CreateProvider(strongState)
206206
: CreateInsecureCredentialsProviderFactory()->CreateProvider(strongState));
207207

208-
DiscoveryClient_->AddPeriodicTask(CreatePeriodicDiscoveryTask(strongState), DISCOVERY_RECHECK_PERIOD);
208+
if (discoveryMode != EDiscoveryMode::Off) {
209+
DiscoveryClient_->AddPeriodicTask(CreatePeriodicDiscoveryTask(strongState), DISCOVERY_RECHECK_PERIOD);
210+
}
209211
Y_ABORT_UNLESS(States_.emplace(key, strongState).second);
210212
break;
211213
}
212214
}
213-
auto updateResult = strongState->EndpointPool.UpdateAsync();
214-
if (updateResult.second) {
215-
auto cb = [strongState](const NThreading::TFuture<TEndpointUpdateResult>&) {
216-
strongState->SignalDiscoveryCompleted();
217-
};
218-
updateResult.first.Subscribe(cb);
219-
}
220215

221-
if (strongState->DiscoveryMode == EDiscoveryMode::Sync) {
222-
const auto& discoveryStatus = updateResult.first.GetValueSync().DiscoveryStatus;
223-
// Almost always true, except the situation when the current thread was
224-
// preempted just before UpdateAsync call and other one get
225-
// state from cache and call UpdateAsync before us.
226-
if (Y_LIKELY(updateResult.second)) {
227-
std::unique_lock guard(strongState->LastDiscoveryStatusRWLock);
228-
strongState->LastDiscoveryStatus = discoveryStatus;
216+
if (strongState->DiscoveryMode != EDiscoveryMode::Off) {
217+
auto updateResult = strongState->EndpointPool.UpdateAsync();
218+
if (updateResult.second) {
219+
auto cb = [strongState](const NThreading::TFuture<TEndpointUpdateResult>&) {
220+
strongState->SignalDiscoveryCompleted();
221+
};
222+
updateResult.first.Subscribe(cb);
223+
}
224+
225+
if (strongState->DiscoveryMode == EDiscoveryMode::Sync) {
226+
const auto& discoveryStatus = updateResult.first.GetValueSync().DiscoveryStatus;
227+
// Almost always true, except the situation when the current thread was
228+
// preempted just before UpdateAsync call and other one get
229+
// state from cache and call UpdateAsync before us.
230+
if (Y_LIKELY(updateResult.second)) {
231+
std::unique_lock guard(strongState->LastDiscoveryStatusRWLock);
232+
strongState->LastDiscoveryStatus = discoveryStatus;
233+
}
229234
}
230235
}
231236

src/client/impl/ydb_internal/grpc_connections/grpc_connections.h

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,20 +98,22 @@ class TGRpcConnectionsImpl
9898
clientConfig.MaxOutboundMessageSize = MaxOutboundMessageSize_;
9999
}
100100

101-
if (std::is_same<TService,Ydb::Discovery::V1::DiscoveryService>()
102-
|| dbState->Database.empty()
103-
|| endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UseDiscoveryEndpoint)
104-
{
105-
SetGrpcKeepAlive(clientConfig, GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY, GRpcKeepAlivePermitWithoutCalls_);
106-
} else {
107-
auto endpoint = dbState->EndpointPool.GetEndpoint(preferredEndpoint, endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UsePreferredEndpointStrictly);
108-
if (!endpoint) {
109-
return {nullptr, TEndpointKey()};
110-
}
111-
clientConfig.Locator = endpoint.Endpoint;
112-
clientConfig.SslTargetNameOverride = endpoint.SslTargetNameOverride;
113-
if (GRpcKeepAliveTimeout_) {
114-
SetGrpcKeepAlive(clientConfig, GRpcKeepAliveTimeout_, GRpcKeepAlivePermitWithoutCalls_);
101+
if (dbState->DiscoveryMode != EDiscoveryMode::Off) {
102+
if (std::is_same<TService,Ydb::Discovery::V1::DiscoveryService>()
103+
|| dbState->Database.empty()
104+
|| endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UseDiscoveryEndpoint)
105+
{
106+
SetGrpcKeepAlive(clientConfig, GRPC_KEEP_ALIVE_TIMEOUT_FOR_DISCOVERY, GRpcKeepAlivePermitWithoutCalls_);
107+
} else {
108+
auto endpoint = dbState->EndpointPool.GetEndpoint(preferredEndpoint, endpointPolicy == TRpcRequestSettings::TEndpointPolicy::UsePreferredEndpointStrictly);
109+
if (!endpoint) {
110+
return {nullptr, TEndpointKey()};
111+
}
112+
clientConfig.Locator = endpoint.Endpoint;
113+
clientConfig.SslTargetNameOverride = endpoint.SslTargetNameOverride;
114+
if (GRpcKeepAliveTimeout_) {
115+
SetGrpcKeepAlive(clientConfig, GRpcKeepAliveTimeout_, GRpcKeepAlivePermitWithoutCalls_);
116+
}
115117
}
116118
}
117119

@@ -609,7 +611,17 @@ class TGRpcConnectionsImpl
609611
TEndpointKey endpoint;
610612
std::tie(serviceConnection, endpoint) = GetServiceConnection<TService>(dbState, preferredEndpoint, endpointPolicy);
611613
if (!serviceConnection) {
612-
if (dbState->DiscoveryMode == EDiscoveryMode::Sync) {
614+
if (dbState->DiscoveryMode == EDiscoveryMode::Off) {
615+
TStringStream errString;
616+
errString << "No endpoint for database " << dbState->Database;
617+
errString << ", cluster endpoint " << dbState->DiscoveryEndpoint;
618+
dbState->StatCollector.IncReqFailNoEndpoint();
619+
callback(
620+
TPlainStatus(EStatus::UNAVAILABLE, errString.Str()),
621+
TConnection{nullptr},
622+
TEndpointKey{ });
623+
624+
} else if (dbState->DiscoveryMode == EDiscoveryMode::Sync) {
613625
TStringStream errString;
614626
errString << "Endpoint list is empty for database " << dbState->Database;
615627
errString << ", cluster endpoint " << dbState->DiscoveryEndpoint;

tests/unit/client/driver/driver_ut.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,54 @@ Y_UNIT_TEST_SUITE(CppGrpcClientSimpleTest) {
183183
auto session = sessionResult.GetSession();
184184
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), "my-session-id");
185185
}
186+
187+
Y_UNIT_TEST(WithoutDiscoveryDriverLevel) {
188+
TPortManager pm;
189+
190+
// Start our mock table service
191+
TMockTableService tableService;
192+
ui16 tablePort = pm.GetPort();
193+
auto tableServer = StartGrpcServer(
194+
TStringBuilder() << "127.0.0.1:" << tablePort,
195+
tableService);
196+
197+
auto driver = TDriver(
198+
TDriverConfig()
199+
.SetEndpoint(TStringBuilder() << "localhost:" << tablePort)
200+
.SetDiscoveryMode(EDiscoveryMode::Off)
201+
.SetDatabase("/Root/My/DB"));
202+
auto client = NTable::TTableClient(driver);
203+
auto sessionFuture = client.CreateSession();
204+
205+
UNIT_ASSERT(sessionFuture.Wait(TDuration::Seconds(10)));
206+
auto sessionResult = sessionFuture.ExtractValueSync();
207+
UNIT_ASSERT(sessionResult.IsSuccess());
208+
auto session = sessionResult.GetSession();
209+
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), "my-session-id");
210+
}
211+
212+
Y_UNIT_TEST(WithoutDiscoveryClientLevel) {
213+
TPortManager pm;
214+
215+
// Start our mock table service
216+
TMockTableService tableService;
217+
ui16 tablePort = pm.GetPort();
218+
auto tableServer = StartGrpcServer(
219+
TStringBuilder() << "127.0.0.1:" << tablePort,
220+
tableService);
221+
222+
auto driver = TDriver(
223+
TDriverConfig()
224+
.SetEndpoint(TStringBuilder() << "localhost:" << tablePort)
225+
.SetDatabase("/Root/My/DB"));
226+
auto client = NTable::TTableClient(driver, TClientSettings().DiscoveryMode(EDiscoveryMode::Off));
227+
auto sessionFuture = client.CreateSession();
228+
229+
UNIT_ASSERT(sessionFuture.Wait(TDuration::Seconds(10)));
230+
auto sessionResult = sessionFuture.ExtractValueSync();
231+
UNIT_ASSERT(sessionResult.IsSuccess());
232+
auto session = sessionResult.GetSession();
233+
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), "my-session-id");
234+
}
235+
186236
}

0 commit comments

Comments
 (0)