Skip to content

Commit b13ee2c

Browse files
authored
streaming grace join test (#3862)
1 parent d8479a2 commit b13ee2c

File tree

2 files changed

+98
-0
lines changed

2 files changed

+98
-0
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
import boto3
5+
import logging
6+
import os
7+
import pytest
8+
9+
from ydb.tests.tools.datastreams_helpers.test_yds_base import TestYdsBase
10+
import ydb.public.api.protos.ydb_value_pb2 as ydb_value
11+
import ydb.public.api.protos.draft.fq_pb2 as fq
12+
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
13+
14+
15+
class TestStreamingJoin(TestYdsBase):
16+
@yq_v1
17+
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
18+
def test_grace_join(self, kikimr, s3, client, unique_prefix):
19+
self.init_topics("pq_test_grace_join")
20+
21+
# S3
22+
resource = boto3.resource(
23+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
24+
)
25+
26+
bucket = resource.Bucket("test_grace_join")
27+
bucket.create(ACL='public-read')
28+
29+
s3_client = boto3.client(
30+
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
31+
)
32+
33+
fruits = R'''Time,Fruit,Price
34+
0,Banana,3
35+
1,Apple,2
36+
2,Pear,15'''
37+
s3_client.put_object(Body=fruits, Bucket='test_grace_join', Key='fruits.csv', ContentType='text/plain')
38+
39+
kikimr.control_plane.wait_bootstrap(1)
40+
41+
storage_connection_name = unique_prefix + "test_grace_join"
42+
client.create_storage_connection(storage_connection_name, "test_grace_join")
43+
44+
connection_response = client.create_yds_connection(
45+
"myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT")
46+
)
47+
keyColumn = ydb_value.Column(name="Data", type=ydb_value.Type(type_id=ydb_value.Type.PrimitiveTypeId.STRING))
48+
client.create_yds_binding(
49+
name="my_binding_in",
50+
stream=self.input_topic,
51+
format="raw",
52+
connection_id=connection_response.result.connection_id,
53+
columns=[keyColumn],
54+
)
55+
client.create_yds_binding(
56+
name="my_binding_out",
57+
stream=self.output_topic,
58+
format="raw",
59+
connection_id=connection_response.result.connection_id,
60+
columns=[keyColumn],
61+
)
62+
63+
sql = fR'''
64+
pragma dq.HashJoinMode="grace";
65+
66+
$stream = SELECT `Data` FROM bindings.`my_binding_in`;
67+
68+
$s3 = SELECT *,
69+
FROM `{storage_connection_name}`.`fruits.csv`
70+
WITH (format=csv_with_names, SCHEMA (
71+
Time UInt64 NOT NULL,
72+
Fruit String NOT NULL,
73+
Price Int NOT NULL
74+
));
75+
76+
$my_join = SELECT * FROM $stream AS a LEFT JOIN $s3 AS b ON a.Data = b.Fruit;
77+
78+
INSERT INTO bindings.`my_binding_out`
79+
SELECT Data AS data FROM $my_join WHERE Data LIKE '%ahaha%'
80+
'''
81+
82+
query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.STREAMING).result.query_id
83+
client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
84+
kikimr.compute_plane.wait_zero_checkpoint(query_id)
85+
86+
data = ['{{"event_class": "{}", "time": "{}"}}'.format("ahaha", "2024-01-03T00:00:00Z")]
87+
self.write_stream(data, self.input_topic)
88+
89+
read_data = self.read_stream(1)
90+
logging.info("Data was read: {}".format(read_data))
91+
92+
# This condition will be failed after the fix grace join for streaming queries
93+
assert len(read_data) == 0
94+
95+
client.abort_query(query_id)
96+
97+
client.wait_query(query_id)

ydb/tests/fq/s3/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ TEST_SRCS(
3535
test_push_down.py
3636
test_s3_0.py
3737
test_s3_1.py
38+
test_streaming_join.py
3839
test_size_limit.py
3940
test_statistics.py
4041
test_test_connection.py

0 commit comments

Comments
 (0)