Skip to content

Commit dd61f85

Browse files
authored
streamlookupjoin: fix joining from multi-partition stream (#8622)
1 parent b3e4c25 commit dd61f85

File tree

2 files changed

+125
-82
lines changed

2 files changed

+125
-82
lines changed

ydb/library/yql/dq/tasks/dq_connection_builder.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ void BuildStreamLookupChannels(TGraph& graph, const NNodes::TDqPhyStage& stage,
216216
auto& originStageInfo = graph.GetStageInfo(cnStreamLookup.Output().Stage());
217217
auto outputIndex = FromString<ui32>(cnStreamLookup.Output().Index().Value());
218218

219-
BuildMapChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
219+
BuildUnionAllChannels(graph, stageInfo, inputIndex, originStageInfo, outputIndex, false /*spilling*/, logFunc);
220220
}
221221

222222
template <typename TGraph>

ydb/tests/fq/generic/test_join_streaming.py

Lines changed: 124 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import os
33
import json
44
import sys
5+
from collections import Counter
6+
from operator import itemgetter
57

68
import ydb.public.api.protos.draft.fq_pb2 as fq
79
from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1
@@ -11,6 +13,31 @@
1113
from ydb.tests.fq.generic.utils.settings import Settings
1214

1315
DEBUG = 0
16+
17+
18+
def ResequenceId(messages):
19+
res = []
20+
i = 1
21+
for pair in messages:
22+
rpair = []
23+
for it in pair:
24+
src = json.loads(it)
25+
src["id"] = i
26+
rpair += [json.dumps(src)]
27+
res += [tuple(rpair)]
28+
i += 1
29+
return res
30+
31+
32+
def freeze(json):
33+
t = type(json)
34+
if t == dict:
35+
return frozenset((k, freeze(v)) for k, v in json.items())
36+
if t == list:
37+
return tuple(map(freeze, json))
38+
return json
39+
40+
1441
TESTCASES = [
1542
# 0
1643
(
@@ -96,17 +123,19 @@
96123
insert into myyds.`{output_topic}`
97124
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
98125
''',
99-
[
100-
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
101-
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
102-
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
103-
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
104-
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
105-
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
106-
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
107-
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
108-
]
109-
* 20,
126+
ResequenceId(
127+
[
128+
('{"id":3,"user":5}', '{"id":3,"user_id":5,"lookup":null}'),
129+
('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'),
130+
('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'),
131+
('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'),
132+
('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'),
133+
('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'),
134+
('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'),
135+
('{"id":7,"user":2}', '{"id":7,"user_id":2,"lookup":"ydb20"}'),
136+
]
137+
* 20
138+
),
110139
),
111140
# 3
112141
(
@@ -137,37 +166,39 @@
137166
insert into myyds.`{output_topic}`
138167
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
139168
''',
140-
[
141-
(
142-
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
143-
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
144-
),
145-
(
146-
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
147-
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
148-
),
149-
(
150-
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
151-
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
152-
),
153-
(
154-
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
155-
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
156-
),
157-
(
158-
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
159-
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
160-
),
161-
(
162-
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
163-
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
164-
),
165-
(
166-
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
167-
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
168-
),
169-
]
170-
* 10,
169+
ResequenceId(
170+
[
171+
(
172+
'{"id":2,"ts":"20240701T113344","ev_type":"foo1","user":2}',
173+
'{"id":2,"ts":"11:33:44","user_id":2,"lookup":"ydb20"}',
174+
),
175+
(
176+
'{"id":1,"ts":"20240701T112233","ev_type":"foo2","user":1}',
177+
'{"id":1,"ts":"11:22:33","user_id":1,"lookup":"ydb10"}',
178+
),
179+
(
180+
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":5}',
181+
'{"id":3,"ts":"11:33:55","user_id":5,"lookup":null}',
182+
),
183+
(
184+
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
185+
'{"id":4,"ts":"11:33:56","user_id":3,"lookup":"ydb30"}',
186+
),
187+
(
188+
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
189+
'{"id":5,"ts":"11:33:57","user_id":3,"lookup":"ydb30"}',
190+
),
191+
(
192+
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
193+
'{"id":6,"ts":"11:22:38","user_id":1,"lookup":"ydb10"}',
194+
),
195+
(
196+
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
197+
'{"id":7,"ts":"11:33:49","user_id":2,"lookup":"ydb20"}',
198+
),
199+
]
200+
* 10
201+
),
171202
),
172203
# 4
173204
(
@@ -200,37 +231,39 @@
200231
insert into myyds.`{output_topic}`
201232
select Unwrap(Yson::SerializeJson(Yson::From(TableRow()))) from $enriched;
202233
''',
203-
[
204-
(
205-
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
206-
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
207-
),
208-
(
209-
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
210-
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
211-
),
212-
(
213-
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
214-
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
215-
),
216-
(
217-
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
218-
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
219-
),
220-
(
221-
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
222-
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
223-
),
224-
(
225-
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
226-
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
227-
),
228-
(
229-
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
230-
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
231-
),
232-
]
233-
* 1000,
234+
ResequenceId(
235+
[
236+
(
237+
'{"id":1,"ts":"20240701T113344","ev_type":"foo1","user":2}',
238+
'{"id":1,"ts":"11:33:44","uid":2,"user_id":2,"name":"Petr","age":25}',
239+
),
240+
(
241+
'{"id":2,"ts":"20240701T112233","ev_type":"foo2","user":1}',
242+
'{"id":2,"ts":"11:22:33","uid":1,"user_id":1,"name":"Anya","age":15}',
243+
),
244+
(
245+
'{"id":3,"ts":"20240701T113355","ev_type":"foo3","user":100}',
246+
'{"id":3,"ts":"11:33:55","uid":null,"user_id":100,"name":null,"age":null}',
247+
),
248+
(
249+
'{"id":4,"ts":"20240701T113356","ev_type":"foo4","user":3}',
250+
'{"id":4,"ts":"11:33:56","uid":3,"user_id":3,"name":"Masha","age":17}',
251+
),
252+
(
253+
'{"id":5,"ts":"20240701T113357","ev_type":"foo5","user":3}',
254+
'{"id":5,"ts":"11:33:57","uid":3,"user_id":3,"name":"Masha","age":17}',
255+
),
256+
(
257+
'{"id":6,"ts":"20240701T112238","ev_type":"foo6","user":1}',
258+
'{"id":6,"ts":"11:22:38","uid":1,"user_id":1,"name":"Anya","age":15}',
259+
),
260+
(
261+
'{"id":7,"ts":"20240701T113349","ev_type":"foo7","user":2}',
262+
'{"id":7,"ts":"11:33:49","uid":2,"user_id":2,"name":"Petr","age":25}',
263+
),
264+
]
265+
* 1000
266+
),
234267
),
235268
# 5
236269
(
@@ -334,12 +367,23 @@ def test_simple(self, kikimr, fq_client: FederatedQueryClient, settings: Setting
334367
@yq_v1
335368
@pytest.mark.parametrize("mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-ydb:2136"}], indirect=True)
336369
@pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder_slj"}], indirect=True)
337-
@pytest.mark.parametrize("streamlookup", [False, True])
370+
@pytest.mark.parametrize("partitions_count", [1, 3])
371+
@pytest.mark.parametrize("streamlookup", [False, True] if DEBUG else [True])
338372
@pytest.mark.parametrize("testcase", [*range(len(TESTCASES))])
339373
def test_streamlookup(
340-
self, kikimr, testcase, streamlookup, fq_client: FederatedQueryClient, settings: Settings, yq_version
374+
self,
375+
kikimr,
376+
testcase,
377+
streamlookup,
378+
partitions_count,
379+
fq_client: FederatedQueryClient,
380+
settings: Settings,
381+
yq_version,
341382
):
342-
self.init_topics(f"pq_yq_streaming_test_lookup_{streamlookup}{testcase}_{yq_version}")
383+
self.init_topics(
384+
f"pq_yq_str_lookup_{partitions_count}{streamlookup}{testcase}_{yq_version}",
385+
partitions_count=partitions_count,
386+
)
343387
fq_client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))
344388

345389
table_name = 'join_table'
@@ -359,7 +403,7 @@ def test_streamlookup(
359403
)
360404

361405
query_id = fq_client.create_query(
362-
f"streamlookup_{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
406+
f"streamlookup_{partitions_count}{streamlookup}{testcase}", sql, type=fq.QueryContent.QueryType.STREAMING
363407
).result.query_id
364408
fq_client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
365409
kikimr.compute_plane.wait_zero_checkpoint(query_id)
@@ -375,10 +419,9 @@ def test_streamlookup(
375419
print(streamlookup, testcase, file=sys.stderr)
376420
print(sql, file=sys.stderr)
377421
print(*zip(messages, read_data), file=sys.stderr, sep="\n")
378-
for r, exp in zip(read_data, messages):
379-
r = json.loads(r)
380-
exp = json.loads(exp[1])
381-
assert r == exp
422+
read_data_ctr = Counter(map(freeze, map(json.loads, read_data)))
423+
messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages))))
424+
assert read_data_ctr == messages_ctr
382425

383426
fq_client.abort_query(query_id)
384427
fq_client.wait_query(query_id)

0 commit comments

Comments
 (0)