Skip to content

Commit ee016ff

Browse files
1 parent 3a2566e commit ee016ff

File tree

14 files changed

+233
-13
lines changed

14 files changed

+233
-13
lines changed

ydb/core/external_sources/external_data_source.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ struct TExternalDataSource : public IExternalSource {
3636
ythrow TExternalSourceException() << "Only external table supports parameters";
3737
}
3838

39+
bool IsRDBMSDataSource(const TProtoStringType& sourceType) const {
40+
return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "Clickhouse"}, sourceType);
41+
}
42+
3943
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {
4044
NKikimrSchemeOp::TExternalDataSourceDescription proto;
4145
if (!proto.ParseFromString(externalDataSourceDescription)) {
@@ -49,6 +53,10 @@ struct TExternalDataSource : public IExternalSource {
4953
ythrow TExternalSourceException() << "Unsupported property: " << key;
5054
}
5155

56+
if (IsRDBMSDataSource(proto.GetSourceType()) && !proto.GetProperties().GetProperties().contains("database_name")){
57+
ythrow TExternalSourceException() << proto.GetSourceType() << " source must provide database_name";
58+
}
59+
5260
ValidateHostname(HostnamePatterns, proto.GetLocation());
5361
}
5462

ydb/core/fq/libs/actors/clusters_from_connections.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,18 @@ void AddClustersFromConnections(
283283
clusters.emplace(connectionName, GenericProviderName);
284284
break;
285285
}
286+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
287+
FillGenericClusterConfig(
288+
common,
289+
*gatewaysConfig.MutableGeneric()->AddClusterMapping(),
290+
conn.content().setting().mysql_cluster(),
291+
connectionName,
292+
NYql::NConnector::NApi::EDataSourceKind::MYSQL,
293+
authToken,
294+
accountIdSignatures);
295+
clusters.emplace(connectionName, GenericProviderName);
296+
break;
297+
}
286298

287299
// Do not replace with default. Adding a new connection should cause a compilation error
288300
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:

ydb/core/fq/libs/actors/database_resolver.cpp

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
319319
// There are two kinds of managed YDBs: serverless and dedicated.
320320
// While working with dedicated databases, we have to use underlay network.
321321
// That's why we add `u-` prefix to database fqdn.
322-
if (databaseInfo.GetMap().contains("dedicatedDatabase")) {
322+
if (databaseInfo.GetMap().contains("storageConfig")) {
323323
endpoint = "u-" + endpoint;
324324
host = "u-" + host;
325325
}
@@ -335,7 +335,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
335335
{
336336
auto ret = ydbParser(databaseInfo, mdbEndpointGenerator, useTls, protocol);
337337
// TODO: Take explicit field from MVP
338-
bool isDedicatedDb = databaseInfo.GetMap().contains("dedicatedDatabase");
338+
bool isDedicatedDb = databaseInfo.GetMap().contains("storageConfig");
339339
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
340340
// Replace "ydb." -> "yds."
341341
ret.Endpoint[2] = 's';
@@ -457,6 +457,56 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
457457

458458
endpoint = mdbEndpointGenerator->ToEndpoint(params);
459459

460+
return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
461+
};
462+
Parsers[NYql::EDatabaseType::MySQL] = [](
463+
NJson::TJsonValue& databaseInfo,
464+
const NYql::IMdbEndpointGenerator::TPtr& mdbEndpointGenerator,
465+
bool useTls,
466+
NConnector::NApi::EProtocol protocol
467+
) {
468+
NYql::IMdbEndpointGenerator::TEndpoint endpoint;
469+
TVector<TString> aliveHosts;
470+
471+
const auto& hostsArray = databaseInfo.GetMap().at("hosts").GetArraySafe();
472+
473+
for (const auto& host : hostsArray) {
474+
const auto& hostMap = host.GetMap();
475+
476+
if (!hostMap.contains("services")) {
477+
// indicates that cluster is down
478+
continue;
479+
}
480+
481+
const auto& servicesArray = hostMap.at("services").GetArraySafe();
482+
483+
// check if all services of a particular host are alive
484+
const bool alive = std::all_of(
485+
servicesArray.begin(),
486+
servicesArray.end(),
487+
[](const auto& service) {
488+
return service["health"].GetString() == "ALIVE";
489+
}
490+
);
491+
492+
if (alive) {
493+
aliveHosts.push_back(host["name"].GetString());
494+
}
495+
}
496+
497+
if (aliveHosts.empty()) {
498+
ythrow TCodeLineException(TIssuesIds::INTERNAL_ERROR) << "No ALIVE MySQL hosts found";
499+
}
500+
501+
NYql::IMdbEndpointGenerator::TParams params = {
502+
.DatabaseType = NYql::EDatabaseType::MySQL,
503+
.MdbHost = aliveHosts[std::rand() % static_cast<int>(aliveHosts.size())],
504+
.UseTls = useTls,
505+
.Protocol = protocol,
506+
};
507+
508+
endpoint = mdbEndpointGenerator->ToEndpoint(params);
509+
460510
return TDatabaseDescription{"", endpoint.first, endpoint.second, "", useTls};
461511
};
462512
}
@@ -538,7 +588,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
538588
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
539589
.AddUrlParam("databaseId", databaseId)
540590
.Build();
541-
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL}, databaseType)) {
591+
} else if (IsIn({NYql::EDatabaseType::ClickHouse, NYql::EDatabaseType::PostgreSQL, NYql::EDatabaseType::MySQL}, databaseType)) {
542592
YQL_ENSURE(ev->Get()->MdbGateway, "empty MDB Gateway");
543593
url = TUrlBuilder(
544594
ev->Get()->MdbGateway + "/managed-" + NYql::DatabaseTypeLowercase(databaseType) + "/v1/clusters/")

ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
243243
R"(
244244
{
245245
"endpoint":"grpcs://lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1gtl2kg13him37quoo6/etn021us5r9rhld1vgbh",
246-
"dedicatedDatabase":{"resuorcePresetId": "medium"}
246+
"storageConfig":{"storageSizeLimit":107374182400}
247247
})",
248248
NYql::TDatabaseResolverResponse::TDatabaseDescription{
249249
TString{"u-lb.etnbrtlini51k7cinbdr.ydb.mdb.yandexcloud.net:2135"},
@@ -286,7 +286,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
286286
R"(
287287
{
288288
"endpoint":"grpcs://lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135/?database=/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh",
289-
"dedicatedDatabase":{"resourcePresetId": "medium"}
289+
"storageConfig":{"storageSizeLimit":107374182400}
290290
})",
291291
NYql::TDatabaseResolverResponse::TDatabaseDescription{
292292
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
@@ -474,6 +474,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
474474
issues
475475
);
476476
}
477+
477478
Y_UNIT_TEST(Greenplum_MasterNode) {
478479
Test(
479480
NYql::EDatabaseType::Greenplum,
@@ -505,7 +506,7 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
505506
TString(""),
506507
true},
507508
{});
508-
}
509+
}
509510

510511
Y_UNIT_TEST(Greenplum_PermissionDenied) {
511512
NYql::TIssues issues{
@@ -536,7 +537,79 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
536537
)",
537538
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
538539
issues);
539-
}
540+
}
541+
542+
Y_UNIT_TEST(MySQL) {
543+
Test(
544+
NYql::EDatabaseType::MySQL,
545+
NYql::NConnector::NApi::EProtocol::NATIVE,
546+
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
547+
"200",
548+
R"({
549+
"hosts": [
550+
{
551+
"services": [
552+
{
553+
"type": "POOLER",
554+
"health": "ALIVE"
555+
},
556+
{
557+
"type": "MYSQL",
558+
"health": "ALIVE"
559+
}
560+
],
561+
"name": "rc1b-eyt6dtobu96rwydq.mdb.yandexcloud.net",
562+
"clusterId": "c9qb2bjghs8onbncpamk",
563+
"zoneId": "ru-central1-b",
564+
"role": "MASTER",
565+
"health": "ALIVE"
566+
}
567+
]
568+
})",
569+
NYql::TDatabaseResolverResponse::TDatabaseDescription{
570+
TString{""},
571+
TString{"rc1b-eyt6dtobu96rwydq.db.yandex.net"},
572+
3306,
573+
TString(""),
574+
true
575+
},
576+
{});
577+
}
578+
579+
Y_UNIT_TEST(MySQL_PermissionDenied) {
580+
NYql::TIssues issues{
581+
NYql::TIssue(
582+
TStringBuilder{} << MakeErrorPrefix(
583+
"mdb.api.cloud.yandex.net:443",
584+
"/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
585+
"etn021us5r9rhld1vgbh",
586+
NYql::EDatabaseType::MySQL
587+
) << NoPermissionStr
588+
)
589+
};
590+
591+
Test(
592+
NYql::EDatabaseType::MySQL,
593+
NYql::NConnector::NApi::EProtocol::NATIVE,
594+
"https://mdb.api.cloud.yandex.net:443/managed-mysql/v1/clusters/etn021us5r9rhld1vgbh/hosts",
595+
"403",
596+
R"(
597+
{
598+
"code": 7,
599+
"message": "Permission denied",
600+
"details": [
601+
{
602+
"@type": "type.googleapis.com/google.rpc.RequestInfo",
603+
"requestId": "a943c092-d596-4e0e-ae7b-1f67f9d8164e"
604+
}
605+
]
606+
}
607+
)",
608+
NYql::TDatabaseResolverResponse::TDatabaseDescription{},
609+
issues
610+
);
611+
}
612+
540613

541614
Y_UNIT_TEST(DataStreams_PermissionDenied) {
542615
NYql::TIssues issues{

ydb/core/fq/libs/common/util.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ TString ExtractServiceAccountId(const FederatedQuery::ConnectionSetting& setting
129129
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
130130
return GetServiceAccountId(setting.greenplum_cluster().auth());
131131
}
132+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
133+
return GetServiceAccountId(setting.mysql_cluster().auth());
134+
}
132135
// Do not replace with default. Adding a new connection should cause a compilation error
133136
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
134137
break;
@@ -162,6 +165,8 @@ TMaybe<TString> GetLogin(const FederatedQuery::ConnectionSetting& setting) {
162165
return setting.postgresql_cluster().login();
163166
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
164167
return setting.greenplum_cluster().login();
168+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
169+
return setting.mysql_cluster().login();
165170
}
166171
}
167172

@@ -183,6 +188,8 @@ TMaybe<TString> GetPassword(const FederatedQuery::ConnectionSetting& setting) {
183188
return setting.postgresql_cluster().password();
184189
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
185190
return setting.greenplum_cluster().password();
191+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
192+
return setting.mysql_cluster().password();
186193
}
187194
}
188195

@@ -204,6 +211,8 @@ EYdbComputeAuth GetYdbComputeAuthMethod(const FederatedQuery::ConnectionSetting&
204211
return GetBasicAuthMethod(setting.postgresql_cluster().auth());
205212
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
206213
return GetBasicAuthMethod(setting.greenplum_cluster().auth());
214+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
215+
return GetBasicAuthMethod(setting.mysql_cluster().auth());
207216
}
208217
}
209218

@@ -223,6 +232,8 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection) {
223232
return connection.content().setting().postgresql_cluster().auth();
224233
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
225234
return connection.content().setting().greenplum_cluster().auth();
235+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
236+
return connection.content().setting().mysql_cluster().auth();
226237
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
227238
return FederatedQuery::IamAuth{};
228239
}

ydb/core/fq/libs/compute/common/config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ class TComputeConfig {
165165
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
166166
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
167167
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
168+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
168169
case FederatedQuery::ConnectionSetting::kYdbDatabase:
169170
return true;
170171
case FederatedQuery::ConnectionSetting::kDataStreams:

ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,19 @@ TString MakeCreateExternalDataSourceQuery(
247247
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true",
248248
"schema"_a = gpschema ? ", SCHEMA=" + EncloseAndEscapeString(gpschema, '"') : TString{});
249249

250+
}
251+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
252+
properties = fmt::format(
253+
R"(
254+
SOURCE_TYPE="MySQL",
255+
MDB_CLUSTER_ID={mdb_cluster_id},
256+
DATABASE_NAME={database_name},
257+
USE_TLS="{use_tls}"
258+
)",
259+
"mdb_cluster_id"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_id(), '"'),
260+
"database_name"_a = EncloseAndEscapeString(connectionContent.setting().mysql_cluster().database_name(), '"'),
261+
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");
262+
250263
}
251264
break;
252265
}

ydb/core/fq/libs/control_plane_proxy/utils/utils.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ TString ExtractServiceAccountIdWithConnection(const T& setting) {
3434
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
3535
return GetServiceAccountId(setting.greenplum_cluster().auth());
3636
}
37+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
38+
return GetServiceAccountId(setting.mysql_cluster().auth());
39+
}
3740
// Do not replace with default. Adding a new connection should cause a compilation error
3841
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
3942
break;

ydb/core/fq/libs/control_plane_storage/request_validators.cpp

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,15 @@ void ValidateGenericConnectionSetting(
2020
}
2121

2222
if (!connection.database_id() && !(connection.host() && connection.port())) {
23-
auto msg = TStringBuilder() << "content.setting.clickhouse_cluster.{database_id or host,port} field is not specified";
24-
issues.AddIssue( MakeErrorIssue(TIssuesIds::BAD_REQUEST,msg));
23+
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.{database_id or host,port} field is not specified";
24+
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,msg));
2525
}
2626

27+
if (!connection.database_name()) {
28+
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.database_name field is not specified";
29+
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST,msg));
30+
}
31+
2732
if (!connection.login()) {
2833
auto msg = TStringBuilder() << "content.setting." << dataSourceKind << "_cluster.login is not specified";
2934
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, msg));
@@ -70,17 +75,33 @@ NYql::TIssues ValidateConnectionSetting(
7075
break;
7176
}
7277
case FederatedQuery::ConnectionSetting::kGreenplumCluster: {
73-
const FederatedQuery::GreenplumCluster database = setting.greenplum_cluster();
74-
if (!database.has_auth() || database.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
78+
const FederatedQuery::GreenplumCluster& greenplumCluster = setting.greenplum_cluster();
79+
80+
if (!greenplumCluster.has_auth() || greenplumCluster.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
7581
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.greenplum_database.auth field is not specified"));
7682
}
7783

84+
if (greenplumCluster.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
85+
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
86+
}
87+
88+
if (!greenplumCluster.database_id() && !greenplumCluster.database_name()) {
89+
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.greenplum_database.{database_id or database_name} field is not specified"));
90+
}
91+
break;
92+
}
93+
case FederatedQuery::ConnectionSetting::kMysqlCluster: {
94+
const FederatedQuery::MySQLCluster database = setting.mysql_cluster();
95+
if (!database.has_auth() || database.auth().identity_case() == FederatedQuery::IamAuth::IDENTITY_NOT_SET) {
96+
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.auth field is not specified"));
97+
}
98+
7899
if (database.auth().identity_case() == FederatedQuery::IamAuth::kCurrentIam && disableCurrentIam) {
79100
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "current iam authorization is disabled"));
80101
}
81102

82103
if (!database.database_id() && !database.database_name()) {
83-
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.greenplum_database.{database_id or database_name} field is not specified"));
104+
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "content.setting.mysql_database.{database_id or database_name} field is not specified"));
84105
}
85106
break;
86107
}

ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ FederatedQuery::IamAuth::IdentityCase GetIamAuth(const FederatedQuery::Connectio
4141
return setting.postgresql_cluster().auth().identity_case();
4242
case FederatedQuery::ConnectionSetting::kGreenplumCluster:
4343
return setting.greenplum_cluster().auth().identity_case();
44+
case FederatedQuery::ConnectionSetting::kMysqlCluster:
45+
return setting.mysql_cluster().auth().identity_case();
4446
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
4547
return FederatedQuery::IamAuth::IDENTITY_NOT_SET;
4648
}

0 commit comments

Comments
 (0)