Skip to content

Commit c26c715

Browse files
YDB FQ: support MongoDB as an external data source (#15335)
Co-authored-by: Vitaly Isaev <vitalyisaev2@gmail.com>
1 parent 68db7f5 commit c26c715

File tree

9 files changed

+100
-54
lines changed

9 files changed

+100
-54
lines changed

ydb/core/external_sources/external_data_source.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ struct TExternalDataSource : public IExternalSource {
3737
}
3838

3939
bool DataSourceMustHaveDataBaseName(const TProtoStringType& sourceType) const {
40-
return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "ClickHouse"}, sourceType);
40+
return IsIn({"Greenplum", "PostgreSQL", "MySQL", "MsSQLServer", "ClickHouse", "MongoDB"}, sourceType);
4141
}
4242

4343
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const override {

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
148148
{
149149
ToString(NYql::EDatabaseType::Prometheus),
150150
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"protocol", "use_tls"}, hostnamePatternsRegEx)
151+
},
152+
{
153+
ToString(NYql::EDatabaseType::MongoDB),
154+
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC"}, {"database_name", "use_tls", "reading_mode", "unexpected_type_display_mode", "unsupported_type_display_mode"}, hostnamePatternsRegEx)
151155
}
152156
},
153157
availableExternalDataSources);

ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ TString GetOrEmpty(const NYql::TCreateObjectSettings& container, const TString&
8888
"service_name", // oracle
8989
"folder_id", // logging
9090
"use_ssl", // solomon
91-
"grpc_port" // solomon
91+
"grpc_port", // solomon
92+
"reading_mode", // mongodb
93+
"unexpected_type_display_mode", // mongodb
94+
"unsupported_type_display_mode", // mongodb
9295
};
9396

9497
auto& featuresExtractor = settings.GetFeaturesExtractor();

ydb/library/yql/providers/common/db_id_async_resolver/database_type.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ EDatabaseType DatabaseTypeFromDataSourceKind(NYql::EGenericDataSourceKind dataSo
4949
return EDatabaseType::Redis;
5050
case NYql::EGenericDataSourceKind::PROMETHEUS:
5151
return EDatabaseType::Prometheus;
52+
case NYql::EGenericDataSourceKind::MONGO_DB:
53+
return EDatabaseType::MongoDB;
5254
default:
5355
ythrow yexception() << "Unknown data source kind: " << NYql::EGenericDataSourceKind_Name(dataSourceKind);
5456
}
@@ -78,6 +80,8 @@ NYql::EGenericDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseType database
7880
return NYql::EGenericDataSourceKind::REDIS;
7981
case EDatabaseType::Prometheus:
8082
return NYql::EGenericDataSourceKind::PROMETHEUS;
83+
case EDatabaseType::MongoDB:
84+
return NYql::EGenericDataSourceKind::MONGO_DB;
8185
default:
8286
ythrow yexception() << "Unknown database type: " << ToString(databaseType);
8387
}

ydb/library/yql/providers/common/db_id_async_resolver/database_type.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ enum class EDatabaseType {
2121
Solomon,
2222
Iceberg,
2323
Redis,
24-
Prometheus
24+
Prometheus,
25+
MongoDB
2526
};
2627

2728
std::set<TString> GetAllExternalDataSourceTypes();

ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ namespace NYql::NDq {
5454
"LoggingGeneric",
5555
"IcebergGeneric",
5656
"RedisGeneric",
57-
"PrometheusGeneric"}) {
57+
"PrometheusGeneric",
58+
"MongoDBGeneric"}) {
5859
factory.RegisterSource<Generic::TSource>(name, readActorFactory);
5960
factory.RegisterLookupSource<Generic::TLookupSource>(name, lookupActorFactory);
6061
}

ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -120,36 +120,6 @@ namespace NYql {
120120
clusterConfig.SetDatabaseName(it->second);
121121
}
122122

123-
void ParseSchema(const THashMap<TString, TString>& properties,
124-
NYql::TGenericClusterConfig& clusterConfig) {
125-
auto it = properties.find("schema");
126-
if (it == properties.cend()) {
127-
// SCHEMA is optional field
128-
return;
129-
}
130-
131-
if (!it->second) {
132-
// SCHEMA is optional field
133-
return;
134-
}
135-
136-
clusterConfig.mutable_datasourceoptions()->insert({TString("schema"), TString(it->second)});
137-
}
138-
139-
void ParseServiceName(const THashMap<TString, TString>& properties,
140-
NYql::TGenericClusterConfig& clusterConfig) {
141-
auto it = properties.find("service_name");
142-
if (it == properties.cend()) {
143-
return;
144-
}
145-
146-
if (!it->second) {
147-
return;
148-
}
149-
150-
clusterConfig.mutable_datasourceoptions()->insert({TString("service_name"), TString(it->second)});
151-
}
152-
153123
void ParseMdbClusterId(const THashMap<TString, TString>& properties,
154124
NYql::TGenericClusterConfig& clusterConfig) {
155125
auto it = properties.find("mdb_cluster_id");
@@ -215,7 +185,8 @@ namespace NYql {
215185
EGenericDataSourceKind::MS_SQL_SERVER,
216186
EGenericDataSourceKind::ORACLE,
217187
EGenericDataSourceKind::ICEBERG,
218-
EGenericDataSourceKind::REDIS
188+
EGenericDataSourceKind::REDIS,
189+
EGenericDataSourceKind::MONGO_DB
219190
},
220191
clusterConfig.GetKind()
221192
)) {
@@ -277,20 +248,18 @@ namespace NYql {
277248
clusterConfig.SetServiceAccountIdSignature(it->second);
278249
}
279250

280-
void ParseFolderId(const THashMap<TString, TString>& properties,
281-
NYql::TGenericClusterConfig& clusterConfig) {
282-
auto it = properties.find("folder_id");
251+
void ParseOptionalField(const THashMap<TString, TString>& properties,
252+
NYql::TGenericClusterConfig& clusterConfig, const TString& fieldName) {
253+
auto it = properties.find(fieldName);
283254
if (it == properties.cend()) {
284-
// FOLDER_ID is optional field
285255
return;
286256
}
287257

288258
if (!it->second) {
289-
// FOLDER_ID is optional field
290259
return;
291260
}
292261

293-
clusterConfig.mutable_datasourceoptions()->insert({"folder_id", TString(it->second)});
262+
clusterConfig.mutable_datasourceoptions()->insert({fieldName, TString(it->second)});
294263
}
295264

296265
///
@@ -349,17 +318,20 @@ namespace NYql {
349318
ParseLocation(properties, clusterConfig);
350319
ParseUseTLS(properties, clusterConfig);
351320
ParseDatabaseName(properties, clusterConfig);
352-
ParseSchema(properties, clusterConfig);
353-
ParseServiceName(properties, clusterConfig);
354321
ParseMdbClusterId(properties, clusterConfig);
355322
ParseDatabaseId(properties, clusterConfig);
356323
ParseSourceType(properties, clusterConfig);
357324
ParseProtocol(properties, clusterConfig);
358325
ParseServiceAccountId(properties, clusterConfig);
359326
ParseServiceAccountIdSignature(properties, clusterConfig);
360327
ParseToken(properties, clusterConfig);
361-
ParseFolderId(properties, clusterConfig);
362328
ParseIcebergFields(properties, clusterConfig);
329+
ParseOptionalField(properties, clusterConfig, "schema");
330+
ParseOptionalField(properties, clusterConfig, "folder_id");
331+
ParseOptionalField(properties, clusterConfig, "reading_mode");
332+
ParseOptionalField(properties, clusterConfig, "service_name");
333+
ParseOptionalField(properties, clusterConfig, "unexpected_type_display_mode");
334+
ParseOptionalField(properties, clusterConfig, "unsupported_type_display_mode");
363335

364336
return clusterConfig;
365337
}

ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ namespace NYql {
4444
return "RedisGeneric";
4545
case NYql::EGenericDataSourceKind::PROMETHEUS:
4646
return "PrometheusGeneric";
47+
case NYql::EGenericDataSourceKind::MONGO_DB:
48+
return "MongoDBGeneric";
4749
default:
4850
throw yexception() << "Data source kind is unknown or not specified";
4951
}
@@ -286,6 +288,9 @@ namespace NYql {
286288
case NYql::EGenericDataSourceKind::ICEBERG:
287289
properties["SourceType"] = "Iceberg";
288290
break;
291+
case NYql::EGenericDataSourceKind::MONGO_DB:
292+
properties["SourceType"] = "MongoDB";
293+
break;
289294
case NYql::EGenericDataSourceKind::REDIS:
290295
properties["SourceType"] = "Redis";
291296
break;

ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ namespace NYql {
9898
TableDescriptions_.reserve(pendingTables.size());
9999

100100
for (const auto& tableAddress : pendingTables) {
101-
LoadTableMetadataFromConnector(tableAddress, handles);
101+
auto tIssues = LoadTableMetadataFromConnector(tableAddress, handles);
102+
if (!tIssues.Empty()) {
103+
ctx.AddError(TIssue(tIssues.ToString()));
104+
return TStatus::Error;
105+
}
102106
}
103107

104108
if (handles.empty()) {
@@ -111,22 +115,28 @@ namespace NYql {
111115
// clang-format on
112116

113117
private:
114-
void LoadTableMetadataFromConnector(const TGenericState::TTableAddress& tableAddress,
118+
TIssues LoadTableMetadataFromConnector(const TGenericState::TTableAddress& tableAddress,
115119
std::vector<NThreading::TFuture<void>>& handles) {
116120
const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(tableAddress.ClusterName);
117121
YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it,
118122
"cluster not found: " << tableAddress.ClusterName);
119123

124+
// preserve data source instance for the further usage
125+
auto emplaceIt =
126+
TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>());
127+
128+
auto desc = emplaceIt.first->second;
129+
120130
NConnector::NApi::TDescribeTableRequest request;
121-
FillDescribeTableRequest(request, it->second, tableAddress.TableName);
131+
auto issues = FillDescribeTableRequest(request, it->second, tableAddress.TableName);
132+
133+
if (!issues.Empty()) {
134+
return issues;
135+
}
122136

123137
auto promise = NThreading::NewPromise();
124138
handles.emplace_back(promise.GetFuture());
125139

126-
// preserve data source instance for the further usage
127-
auto emplaceIt =
128-
TableDescriptions_.emplace(tableAddress, std::make_shared<TTableDescription>());
129-
auto desc = emplaceIt.first->second;
130140
desc->DataSourceInstance = request.data_source_instance();
131141

132142
Y_ENSURE(State_->GenericClient);
@@ -217,6 +227,8 @@ namespace NYql {
217227
});
218228
});
219229
});
230+
231+
return TIssues{};
220232
}
221233

222234
public:
@@ -362,7 +374,7 @@ namespace NYql {
362374
// clang-format on
363375
}
364376

365-
void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request,
377+
TIssues FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request,
366378
const TGenericClusterConfig& clusterConfig, const TString& tablePath) {
367379
const auto dataSourceKind = clusterConfig.GetKind();
368380
auto dsi = request.mutable_data_source_instance();
@@ -372,8 +384,14 @@ namespace NYql {
372384
dsi->set_protocol(clusterConfig.GetProtocol());
373385
FillCredentials(request, clusterConfig);
374386
FillTypeMappingSettings(request);
375-
FillDataSourceOptions(request, clusterConfig);
387+
auto issues = FillDataSourceOptions(request, clusterConfig);
388+
if (!issues.Empty()) {
389+
return issues;
390+
}
391+
376392
FillTablePath(request, clusterConfig, tablePath);
393+
394+
return {};
377395
}
378396

379397
void FillCredentials(NConnector::NApi::TDescribeTableRequest& request,
@@ -508,7 +526,39 @@ namespace NYql {
508526
}
509527
}
510528

511-
void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request,
529+
TIssues SetMongoDBOptions(NYql::TMongoDbDataSourceOptions& options, const TGenericClusterConfig& clusterConfig) {
530+
TIssues issues;
531+
auto it = clusterConfig.GetDataSourceOptions().find("reading_mode");
532+
if (it != clusterConfig.GetDataSourceOptions().end()) {
533+
TMongoDbDataSourceOptions_EReadingMode value = TMongoDbDataSourceOptions::READING_MODE_UNSPECIFIED;
534+
if (!TMongoDbDataSourceOptions_EReadingMode_Parse(it->second, &value)) {
535+
issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB reading_mode: " << it->second));
536+
}
537+
options.set_reading_mode(value);
538+
}
539+
540+
it = clusterConfig.GetDataSourceOptions().find("unexpected_type_display_mode");
541+
if (it != clusterConfig.GetDataSourceOptions().end()) {
542+
TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode value = TMongoDbDataSourceOptions::UNEXPECTED_UNSPECIFIED;
543+
if (!TMongoDbDataSourceOptions_EUnexpectedTypeDisplayMode_Parse(it->second, &value)) {
544+
issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unexpected_type_display_mode: " << it->second));
545+
}
546+
options.set_unexpected_type_display_mode(value);
547+
}
548+
549+
it = clusterConfig.GetDataSourceOptions().find("unsupported_type_display_mode");
550+
if (it != clusterConfig.GetDataSourceOptions().end()) {
551+
TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode value = TMongoDbDataSourceOptions::UNSUPPORTED_UNSPECIFIED;
552+
if (!TMongoDbDataSourceOptions_EUnsupportedTypeDisplayMode_Parse(it->second, &value)) {
553+
issues.AddIssue(TIssue(TStringBuilder() << "Failed to parse MongoDB unsupported_type_display_mode: " << it->second));
554+
}
555+
options.set_unsupported_type_display_mode(value);
556+
}
557+
558+
return issues;
559+
}
560+
561+
TIssues FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request,
512562
const TGenericClusterConfig& clusterConfig) {
513563
const auto dataSourceKind = clusterConfig.GetKind();
514564
switch (dataSourceKind) {
@@ -544,10 +594,16 @@ namespace NYql {
544594
break;
545595
case NYql::EGenericDataSourceKind::PROMETHEUS:
546596
break;
597+
case NYql::EGenericDataSourceKind::MONGO_DB: {
598+
auto* options = request.mutable_data_source_instance()->mutable_mongodb_options();
599+
return SetMongoDBOptions(*options, clusterConfig);
600+
} break;
547601
default:
548602
throw yexception() << "Unexpected data source kind: '"
549603
<< NYql::EGenericDataSourceKind_Name(dataSourceKind) << "'";
550604
}
605+
606+
return TIssues{};
551607
}
552608

553609
void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) {

0 commit comments

Comments
 (0)