Skip to content

Commit e396f16

Browse files
authored
YDB FQ: add Greenplum to federated JOIN test (#8649)
1 parent be39b5a commit e396f16

File tree

5 files changed

+55
-35
lines changed

5 files changed

+55
-35
lines changed

ydb/core/fq/libs/actors/clusters_from_connections.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,18 @@ void FillGenericClusterConfigBase(
122122
FillClusterAuth(clusterCfg, connection.auth(), authToken, accountIdSignatures);
123123
clusterCfg.SetUseSsl(!common.GetDisableSslForGenericDataSources());
124124

125-
// In YQv1 we just hardcode desired protocols here.
125+
// In YQv1 we just hardcode the appropriate protocols here.
126126
// In YQv2 protocol can be configured via `CREATE EXTERNAL DATA SOURCE` params.
127127
switch (dataSourceKind) {
128128
case NYql::NConnector::NApi::CLICKHOUSE:
129129
clusterCfg.SetProtocol(common.GetUseNativeProtocolForClickHouse() ? NYql::NConnector::NApi::EProtocol::NATIVE : NYql::NConnector::NApi::EProtocol::HTTP);
130130
break;
131+
case NYql::NConnector::NApi::GREENPLUM:
132+
clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
133+
break;
134+
case NYql::NConnector::NApi::MYSQL:
135+
clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
136+
break;
131137
case NYql::NConnector::NApi::POSTGRESQL:
132138
clusterCfg.SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
133139
break;

ydb/library/yql/providers/generic/connector/tests/utils/docker_compose.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def __init__(self, docker_compose_yml_path: os.PathLike):
3838
self.docker_compose_yml_path = docker_compose_yml_path
3939

4040
with open(self.docker_compose_yml_path) as f:
41-
self.docker_compose_yml_data = yaml.load(f, Loader=yaml.FullLoader)
41+
self.docker_compose_yml_data = yaml.load(f, Loader=yaml.SafeLoader)
4242

4343
def get_external_port(self, service_name: str, internal_port: int) -> int:
4444
cmd = [

ydb/tests/fq/generic/test_join.py renamed to ydb/tests/fq/generic/test_join_analytics.py

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,66 +8,80 @@
88
from ydb.tests.fq.generic.utils.settings import Settings
99

1010

11-
class TestJoin:
11+
class TestJoinAnalytics:
1212
@yq_all
1313
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
1414
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True)
1515
@pytest.mark.parametrize("query_type", [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING])
1616
def test_simple(self, fq_client: FederatedQueryClient, settings: Settings, query_type):
17-
table_name = 'join_table'
18-
ch_conn_name = f'ch_conn_{table_name}'
19-
pg_conn_name = f'pg_conn_{table_name}'
20-
ydb_conn_name = f'ydb_conn_{table_name}'
21-
query_name = f'query_{table_name}'
22-
23-
fq_client.create_postgresql_connection(
24-
name=pg_conn_name,
25-
database_name=settings.postgresql.dbname,
26-
database_id='postgresql_cluster_id',
27-
login=settings.postgresql.username,
28-
password=settings.postgresql.password,
29-
)
17+
table_name = "join_table"
18+
ch_conn_name = f"ch_conn_{table_name}"
19+
gp_conn_name = f"gp_conn_{table_name}"
20+
pg_conn_name = f"pg_conn_{table_name}"
21+
ydb_conn_name = f"ydb_conn_{table_name}"
22+
query_name = f"query_{table_name}"
3023

3124
fq_client.create_clickhouse_connection(
3225
name=ch_conn_name,
3326
database_name=settings.clickhouse.dbname,
34-
database_id='clickhouse_cluster_id',
27+
database_id="clickhouse_cluster_id",
3528
login=settings.clickhouse.username,
3629
password=settings.clickhouse.password,
3730
)
3831

32+
fq_client.create_greenplum_connection(
33+
name=gp_conn_name,
34+
database_name=settings.greenplum.dbname,
35+
database_id="greenplum_cluster_id",
36+
login=settings.greenplum.username,
37+
password=settings.greenplum.password,
38+
)
39+
40+
fq_client.create_postgresql_connection(
41+
name=pg_conn_name,
42+
database_name=settings.postgresql.dbname,
43+
database_id="postgresql_cluster_id",
44+
login=settings.postgresql.username,
45+
password=settings.postgresql.password,
46+
)
47+
3948
fq_client.create_ydb_connection(
4049
name=ydb_conn_name,
4150
database_id=settings.ydb.dbname,
4251
)
4352

44-
# FIXME: research why test starts failing if we add Greenplum
45-
sql = fR'''
46-
SELECT pg.data AS data_pg, ch.data AS data_ch, ydb.data AS data_ydb
53+
sql = Rf"""
54+
SELECT pg.data AS data_pg, ch.data AS data_ch, ydb.data AS data_ydb, gp.data AS data_gp
4755
FROM {pg_conn_name}.{table_name} AS pg
4856
JOIN {ch_conn_name}.{table_name} AS ch
4957
ON pg.id = ch.id
5058
JOIN {ydb_conn_name}.{table_name} AS ydb
51-
ON pg.id = ydb.id;
52-
'''
59+
ON pg.id = ydb.id
60+
JOIN {gp_conn_name}.{table_name} AS gp
61+
ON pg.id = gp.id;
62+
"""
5363

5464
query_id = fq_client.create_query(query_name, sql, type=query_type).result.query_id
5565
fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
5666

5767
data = fq_client.get_result_data(query_id)
5868
result_set = data.result.result_set
5969
logging.debug(str(result_set))
60-
assert len(result_set.columns) == 3
70+
assert len(result_set.columns) == 4
6171
assert result_set.columns[0].name == "data_pg"
6272
assert result_set.columns[1].name == "data_ch"
6373
assert result_set.columns[2].name == "data_ydb"
74+
assert result_set.columns[3].name == "data_gp"
6475
assert len(result_set.rows) == 3
65-
assert result_set.rows[0].items[0].bytes_value == b'pg10'
66-
assert result_set.rows[0].items[1].bytes_value == b'ch10'
67-
assert result_set.rows[0].items[2].bytes_value == b'ydb10', result_set
68-
assert result_set.rows[1].items[0].bytes_value == b'pg20'
69-
assert result_set.rows[1].items[1].bytes_value == b'ch20'
70-
assert result_set.rows[1].items[2].bytes_value == b'ydb20'
71-
assert result_set.rows[2].items[0].bytes_value == b'pg30'
72-
assert result_set.rows[2].items[1].bytes_value == b'ch30'
73-
assert result_set.rows[2].items[2].bytes_value == b'ydb30'
76+
assert result_set.rows[0].items[0].bytes_value == b"pg10"
77+
assert result_set.rows[0].items[1].bytes_value == b"ch10"
78+
assert result_set.rows[0].items[2].bytes_value == b"ydb10"
79+
assert result_set.rows[0].items[3].bytes_value == b"gp10"
80+
assert result_set.rows[1].items[0].bytes_value == b"pg20"
81+
assert result_set.rows[1].items[1].bytes_value == b"ch20"
82+
assert result_set.rows[1].items[2].bytes_value == b"ydb20"
83+
assert result_set.rows[1].items[3].bytes_value == b"gp20"
84+
assert result_set.rows[2].items[0].bytes_value == b"pg30"
85+
assert result_set.rows[2].items[1].bytes_value == b"ch30"
86+
assert result_set.rows[2].items[2].bytes_value == b"ydb30"
87+
assert result_set.rows[2].items[3].bytes_value == b"gp30"

ydb/tests/fq/generic/test_streaming_join.py renamed to ydb/tests/fq/generic/test_join_streaming.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@
280280
]
281281

282282

283-
class TestStreamingJoin(TestYdsBase):
283+
class TestJoinStreaming(TestYdsBase):
284284
@yq_v1
285285
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
286286
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True)

ydb/tests/fq/generic/ya.make

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ TEST_SRCS(
7070
conftest.py
7171
test_clickhouse.py
7272
test_greenplum.py
73-
test_join.py
7473
test_postgresql.py
75-
test_streaming_join.py
74+
test_join_analytics.py
75+
test_join_streaming.py
7676
test_ydb.py
7777
)
7878

0 commit comments

Comments
 (0)