Skip to content

Commit 3606e07

Browse files
authored
24-3: Enable/disable ssl connections, return connection_string in API (#7147)
1 parent e8f2390 commit 3606e07

File tree

12 files changed

+113
-25
lines changed

12 files changed

+113
-25
lines changed

ydb/core/grpc_services/rpc_replication.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
#include <google/protobuf/util/time_util.h>
1313

14+
#include <util/string/builder.h>
15+
1416
namespace NKikimr::NGRpcService {
1517

1618
using namespace Ydb;
@@ -138,9 +140,18 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
138140
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
139141
}
140142

143+
static TString BuildConnectionString(const NKikimrReplication::TConnectionParams& params) {
144+
return TStringBuilder()
145+
<< (params.GetEnableSsl() ? "grpcs://" : "grpc://")
146+
<< params.GetEndpoint()
147+
<< "/?database=" << params.GetDatabase();
148+
}
149+
141150
static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) {
142151
to.set_endpoint(from.GetEndpoint());
143152
to.set_database(from.GetDatabase());
153+
to.set_enable_ssl(from.GetEnableSsl());
154+
to.set_connection_string(BuildConnectionString(from));
144155

145156
switch (from.GetCredentialsCase()) {
146157
case NKikimrReplication::TConnectionParams::kStaticCredentials:

ydb/core/kqp/host/kqp_gateway_proxy.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1901,6 +1901,7 @@ class TKqpGatewayProxy : public IKikimrGateway {
19011901
const auto parseResult = NYdb::ParseConnectionString(*connectionString);
19021902
params.SetEndpoint(parseResult.Endpoint);
19031903
params.SetDatabase(parseResult.Database);
1904+
params.SetEnableSsl(parseResult.EnableSsl);
19041905
}
19051906
if (const auto& endpoint = settings.Settings.Endpoint) {
19061907
params.SetEndpoint(*endpoint);

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6055,6 +6055,61 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
60556055
}
60566056
}
60576057

6058+
void AsyncReplicationConnectionParams(TKikimrRunner& kikimr, const TString& connectionParam, bool ssl = false) {
6059+
using namespace NReplication;
6060+
6061+
auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
6062+
auto db = kikimr.GetTableClient();
6063+
auto session = db.CreateSession().GetValueSync().GetSession();
6064+
6065+
{
6066+
auto query = R"(
6067+
--!syntax_v1
6068+
CREATE TABLE `/Root/table` (Key Uint64, Value String, PRIMARY KEY (Key));
6069+
)";
6070+
6071+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
6072+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
6073+
}
6074+
{
6075+
auto query = Sprintf(R"(
6076+
--!syntax_v1
6077+
CREATE ASYNC REPLICATION `/Root/replication` FOR
6078+
`/Root/table` AS `/Root/replica`
6079+
WITH (
6080+
%s, TOKEN = "root@builtin"
6081+
);
6082+
)", connectionParam.c_str());
6083+
6084+
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
6085+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
6086+
}
6087+
{
6088+
const auto result = repl.DescribeReplication("/Root/replication").ExtractValueSync();
6089+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
6090+
6091+
const auto& params = result.GetReplicationDescription().GetConnectionParams();
6092+
UNIT_ASSERT_VALUES_EQUAL(params.GetDiscoveryEndpoint(), kikimr.GetEndpoint());
6093+
UNIT_ASSERT_VALUES_EQUAL(params.GetDatabase(), "/Root");
6094+
UNIT_ASSERT_VALUES_EQUAL(params.GetEnableSsl(), ssl);
6095+
}
6096+
}
6097+
6098+
Y_UNIT_TEST(AsyncReplicationConnectionString) {
6099+
TKikimrRunner kikimr;
6100+
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(CONNECTION_STRING = "grpc://%s/?database=/Root")", kikimr.GetEndpoint().c_str()));
6101+
}
6102+
6103+
Y_UNIT_TEST(AsyncReplicationConnectionStringWithSsl) {
6104+
TKikimrRunner kikimr;
6105+
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(CONNECTION_STRING = "grpcs://%s/?database=/Root")", kikimr.GetEndpoint().c_str()), true);
6106+
}
6107+
6108+
Y_UNIT_TEST(AsyncReplicationEndpointAndDatabase) {
6109+
TKikimrRunner kikimr;
6110+
AsyncReplicationConnectionParams(kikimr, Sprintf(R"(ENDPOINT = "%s", DATABASE = "/Root")", kikimr.GetEndpoint().c_str()));
6111+
}
6112+
60586113
Y_UNIT_TEST(DisableResourcePools) {
60596114
TKikimrRunner kikimr(TKikimrSettings().SetEnableResourcePools(false));
60606115
auto db = kikimr.GetTableClient();

ydb/core/protos/replication.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ message TOAuthToken {
1919
message TConnectionParams {
2020
optional string Endpoint = 1;
2121
optional string Database = 2;
22+
optional bool EnableSsl = 5;
2223
// credentials
2324
oneof Credentials {
2425
TStaticCredentials StaticCredentials = 3;

ydb/core/tx/replication/controller/replication.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,22 @@ class TReplication::TImpl: public TLagProvider {
113113
if (!YdbProxy && !(State == EState::Removing && !Targets)) {
114114
THolder<IActor> ydbProxy;
115115
const auto& params = Config.GetSrcConnectionParams();
116+
const auto& endpoint = params.GetEndpoint();
117+
const auto& database = params.GetDatabase();
118+
const bool ssl = params.GetEnableSsl();
116119

117120
switch (params.GetCredentialsCase()) {
118121
case NKikimrReplication::TConnectionParams::kStaticCredentials:
119122
if (!params.GetStaticCredentials().HasPassword()) {
120123
return ResolveSecret(params.GetStaticCredentials().GetPasswordSecretName(), ctx);
121124
}
122-
ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetStaticCredentials()));
125+
ydbProxy.Reset(CreateYdbProxy(endpoint, database, ssl, params.GetStaticCredentials()));
123126
break;
124127
case NKikimrReplication::TConnectionParams::kOAuthToken:
125128
if (!params.GetOAuthToken().HasToken()) {
126129
return ResolveSecret(params.GetOAuthToken().GetTokenSecretName(), ctx);
127130
}
128-
ydbProxy.Reset(CreateYdbProxy(params.GetEndpoint(), params.GetDatabase(), params.GetOAuthToken().GetToken()));
131+
ydbProxy.Reset(CreateYdbProxy(endpoint, database, ssl, params.GetOAuthToken().GetToken()));
129132
break;
130133
default:
131134
ErrorState(TStringBuilder() << "Unexpected credentials: " << params.GetCredentialsCase());

ydb/core/tx/replication/service/service.cpp

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ class TSessionInfo {
125125

126126
}; // TSessionInfo
127127

128-
struct TCredentialsKey: std::tuple<TString, TString, TString> {
129-
explicit TCredentialsKey(const TString& endpoint, const TString& database, const TString& user)
130-
: std::tuple<TString, TString, TString>(endpoint, database, user)
128+
struct TCredentialsKey: std::tuple<TString, TString, bool, TString> {
129+
explicit TCredentialsKey(const TString& endpoint, const TString& database, bool ssl, const TString& user)
130+
: std::tuple<TString, TString, bool, TString>(endpoint, database, ssl, user)
131131
{
132132
}
133133

@@ -139,12 +139,20 @@ struct TCredentialsKey: std::tuple<TString, TString, TString> {
139139
return std::get<1>(*this);
140140
}
141141

142+
bool EnableSsl() const {
143+
return std::get<2>(*this);
144+
}
145+
142146
static TCredentialsKey FromParams(const NKikimrReplication::TConnectionParams& params) {
147+
const auto& endpoint = params.GetEndpoint();
148+
const auto& database = params.GetDatabase();
149+
const bool ssl = params.GetEnableSsl();
150+
143151
switch (params.GetCredentialsCase()) {
144152
case NKikimrReplication::TConnectionParams::kStaticCredentials:
145-
return TCredentialsKey(params.GetEndpoint(), params.GetDatabase(), params.GetStaticCredentials().GetUser());
153+
return TCredentialsKey(endpoint, database, ssl, params.GetStaticCredentials().GetUser());
146154
case NKikimrReplication::TConnectionParams::kOAuthToken:
147-
return TCredentialsKey(params.GetEndpoint(), params.GetDatabase(), params.GetOAuthToken().GetToken() /* TODO */);
155+
return TCredentialsKey(endpoint, database, ssl, params.GetOAuthToken().GetToken());
148156
default:
149157
Y_ABORT("Unexpected credentials");
150158
}
@@ -155,7 +163,7 @@ struct TCredentialsKey: std::tuple<TString, TString, TString> {
155163
} // NKikimr::NReplication::NService
156164

157165
template <>
158-
struct THash<NKikimr::NReplication::NService::TCredentialsKey> : THash<std::tuple<TString, TString, TString>> {};
166+
struct THash<NKikimr::NReplication::NService::TCredentialsKey> : THash<std::tuple<TString, TString, bool, TString>> {};
159167

160168
namespace NKikimr::NReplication {
161169

@@ -212,7 +220,7 @@ class TReplicationService: public TActorBootstrapped<TReplicationService> {
212220
const TActorId& GetOrCreateYdbProxy(TCredentialsKey&& key, Args&&... args) {
213221
auto it = YdbProxies.find(key);
214222
if (it == YdbProxies.end()) {
215-
auto ydbProxy = Register(CreateYdbProxy(key.Endpoint(), key.Database(), std::forward<Args>(args)...));
223+
auto ydbProxy = Register(CreateYdbProxy(key.Endpoint(), key.Database(), key.EnableSsl(), std::forward<Args>(args)...));
216224
auto res = YdbProxies.emplace(std::move(key), std::move(ydbProxy));
217225
Y_ABORT_UNLESS(res.second);
218226
it = res.first;

ydb/core/tx/replication/ut_helpers/test_env.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class TEnv {
3232
Database = "/" + ToString(DomainName);
3333

3434
YdbProxy = Server.GetRuntime()->Register(CreateYdbProxy(
35-
Endpoint, UseDatabase ? Database : "", std::forward<Args>(args)...));
35+
Endpoint, UseDatabase ? Database : "", false /* ssl */, std::forward<Args>(args)...));
3636
Sender = Server.GetRuntime()->AllocateEdgeActor();
3737
}
3838

ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -419,20 +419,21 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
419419
Call<TEvYdbProxy::TEvCommitOffsetResponse>(ev, &TTopicClient::CommitOffset);
420420
}
421421

422-
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database) {
422+
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, bool ssl) {
423423
return TCommonClientSettings()
424424
.DiscoveryEndpoint(endpoint)
425425
.DiscoveryMode(EDiscoveryMode::Async)
426-
.Database(database);
426+
.Database(database)
427+
.SslCredentials(ssl);
427428
}
428429

429-
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TString& token) {
430-
return MakeSettings(endpoint, database)
430+
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, bool ssl, const TString& token) {
431+
return MakeSettings(endpoint, database, ssl)
431432
.AuthToken(token);
432433
}
433434

434-
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) {
435-
return MakeSettings(endpoint, database)
435+
static TCommonClientSettings MakeSettings(const TString& endpoint, const TString& database, bool ssl, const TStaticCredentials& credentials) {
436+
return MakeSettings(endpoint, database, ssl)
436437
.CredentialsProviderFactory(CreateLoginCredentialsProviderFactory({
437438
.User = credentials.GetUser(),
438439
.Password = credentials.GetPassword(),
@@ -485,16 +486,16 @@ class TYdbProxy: public TBaseProxyActor<TYdbProxy> {
485486

486487
}; // TYdbProxy
487488

488-
IActor* CreateYdbProxy(const TString& endpoint, const TString& database) {
489-
return new TYdbProxy(endpoint, database);
489+
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl) {
490+
return new TYdbProxy(endpoint, database, ssl);
490491
}
491492

492-
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const TString& token) {
493-
return new TYdbProxy(endpoint, database, token);
493+
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl, const TString& token) {
494+
return new TYdbProxy(endpoint, database, ssl, token);
494495
}
495496

496-
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const TStaticCredentials& credentials) {
497-
return new TYdbProxy(endpoint, database, credentials);
497+
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl, const TStaticCredentials& credentials) {
498+
return new TYdbProxy(endpoint, database, ssl, credentials);
498499
}
499500

500501
}

ydb/core/tx/replication/ydb_proxy/ydb_proxy.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,9 @@ struct TEvYdbProxy {
255255

256256
#pragma pop_macro("RemoveDirectory")
257257

258-
IActor* CreateYdbProxy(const TString& endpoint, const TString& database);
259-
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, const TString& token);
260-
IActor* CreateYdbProxy(const TString& endpoint, const TString& database,
258+
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl);
259+
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl, const TString& token);
260+
IActor* CreateYdbProxy(const TString& endpoint, const TString& database, bool ssl,
261261
const NKikimrReplication::TStaticCredentials& credentials);
262262

263263
}

ydb/public/api/protos/draft/ydb_replication.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ message ConnectionParams {
3636

3737
string endpoint = 1;
3838
string database = 2;
39+
bool enable_ssl = 5;
40+
string connection_string = 6;
3941

4042
oneof credentials {
4143
StaticCredentials static_credentials = 3;

0 commit comments

Comments
 (0)