Skip to content

Commit 8475274

Browse files
author
deniskhalikov
committed
PR from branch users/deniskhalikov/fuse_join_tree
Add fuse EquiJoins with label list. commit_hash:8f7cbce0cd1bd2bd8e9146fdf4b64805519d4af2
1 parent f838337 commit 8475274

File tree

3 files changed

+130
-41
lines changed

3 files changed

+130
-41
lines changed

yql/essentials/core/common_opt/yql_co_flow2.cpp

Lines changed: 83 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -261,13 +261,13 @@ TExprNode::TPtr RenameJoinTree(TExprNode::TPtr joinTree, const THashMap<TString,
261261
return ret;
262262
}
263263

264-
TExprNode::TPtr ReassembleJoinEquality(TExprNode::TPtr columns, const TStringBuf& upstreamLabel,
264+
TExprNode::TPtr ReassembleJoinEquality(TExprNode::TPtr columns, const THashSet<TStringBuf>& upstreamLabels,
265265
const THashMap<TString, TString>& upstreamTablesRename,
266266
const THashMap<TString, TString>& upstreamColumnsBackRename, TExprContext& ctx)
267267
{
268268
TExprNode::TListType newChildren(columns->ChildrenList());
269269
for (ui32 i = 0; i < columns->ChildrenSize(); i += 2) {
270-
if (columns->Child(i)->Content() != upstreamLabel) {
270+
if (!upstreamLabels.contains(columns->Child(i)->Content())) {
271271
continue;
272272
}
273273

@@ -280,36 +280,38 @@ TExprNode::TPtr ReassembleJoinEquality(TExprNode::TPtr columns, const TStringBuf
280280
upstreamTablesRename, ctx);
281281
newChildren[i + 1] = ctx.NewAtom(columns->Pos(), part2);
282282
} else {
283-
TStringBuf part1;
284-
TStringBuf part2;
285-
SplitTableName(column->Content(), part1, part2);
283+
TStringBuf part1 = columns->Child(i)->Content();
284+
TStringBuf part2 = columns->Child(i + 1)->Content();
285+
286+
if (TString(column->Content()).find(".") != TString::npos) {
287+
SplitTableName(column->Content(), part1, part2);
288+
}
289+
286290
newChildren[i] = RenameJoinTable(columns->Pos(), ctx.NewAtom(columns->Pos(), part1),
287291
upstreamTablesRename, ctx);
288292
newChildren[i + 1] = ctx.NewAtom(columns->Pos(), part2);
289-
290-
return nullptr;
291293
}
292294
}
293295

294296
auto ret = ctx.ChangeChildren(*columns, std::move(newChildren));
295297
return ret;
296298
}
297299

298-
TExprNode::TPtr FuseJoinTree(TExprNode::TPtr downstreamJoinTree, TExprNode::TPtr upstreamJoinTree, const TStringBuf& upstreamLabel,
300+
TExprNode::TPtr FuseJoinTree(TExprNode::TPtr downstreamJoinTree, TExprNode::TPtr upstreamJoinTree, const THashSet<TStringBuf>& upstreamLabels,
299301
const THashMap<TString, TString>& upstreamTablesRename, const THashMap<TString, TString>& upstreamColumnsBackRename,
300302
TExprContext& ctx)
301303
{
302304
TExprNode::TPtr left;
303305
if (downstreamJoinTree->Child(1)->IsAtom()) {
304-
if (downstreamJoinTree->Child(1)->Content() != upstreamLabel) {
306+
if (!upstreamLabels.contains(downstreamJoinTree->Child(1)->Content())) {
305307
left = downstreamJoinTree->Child(1);
306308
}
307309
else {
308310
left = RenameJoinTree(upstreamJoinTree, upstreamTablesRename, ctx);
309311
}
310312
}
311313
else {
312-
left = FuseJoinTree(downstreamJoinTree->Child(1), upstreamJoinTree, upstreamLabel, upstreamTablesRename,
314+
left = FuseJoinTree(downstreamJoinTree->Child(1), upstreamJoinTree, upstreamLabels, upstreamTablesRename,
313315
upstreamColumnsBackRename, ctx);
314316
if (!left) {
315317
return nullptr;
@@ -318,14 +320,14 @@ TExprNode::TPtr FuseJoinTree(TExprNode::TPtr downstreamJoinTree, TExprNode::TPtr
318320

319321
TExprNode::TPtr right;
320322
if (downstreamJoinTree->Child(2)->IsAtom()) {
321-
if (downstreamJoinTree->Child(2)->Content() != upstreamLabel) {
323+
if (!upstreamLabels.contains(downstreamJoinTree->Child(2)->Content())) {
322324
right = downstreamJoinTree->Child(2);
323325
}
324326
else {
325327
right = RenameJoinTree(upstreamJoinTree, upstreamTablesRename, ctx);
326328
}
327329
} else {
328-
right = FuseJoinTree(downstreamJoinTree->Child(2), upstreamJoinTree, upstreamLabel, upstreamTablesRename,
330+
right = FuseJoinTree(downstreamJoinTree->Child(2), upstreamJoinTree, upstreamLabels, upstreamTablesRename,
329331
upstreamColumnsBackRename, ctx);
330332
if (!right) {
331333
return nullptr;
@@ -335,9 +337,9 @@ TExprNode::TPtr FuseJoinTree(TExprNode::TPtr downstreamJoinTree, TExprNode::TPtr
335337
TExprNode::TListType newChildren(downstreamJoinTree->ChildrenList());
336338
newChildren[1] = left;
337339
newChildren[2] = right;
338-
newChildren[3] = ReassembleJoinEquality(downstreamJoinTree->Child(3), upstreamLabel, upstreamTablesRename,
340+
newChildren[3] = ReassembleJoinEquality(downstreamJoinTree->Child(3), upstreamLabels, upstreamTablesRename,
339341
upstreamColumnsBackRename, ctx);
340-
newChildren[4] = ReassembleJoinEquality(downstreamJoinTree->Child(4), upstreamLabel, upstreamTablesRename,
342+
newChildren[4] = ReassembleJoinEquality(downstreamJoinTree->Child(4), upstreamLabels, upstreamTablesRename,
341343
upstreamColumnsBackRename, ctx);
342344
if (!newChildren[3] || !newChildren[4]) {
343345
return nullptr;
@@ -347,18 +349,37 @@ TExprNode::TPtr FuseJoinTree(TExprNode::TPtr downstreamJoinTree, TExprNode::TPtr
347349
return ret;
348350
}
349351

350-
TExprNode::TPtr FuseEquiJoins(const TExprNode::TPtr& node, ui32 upstreamIndex, TExprContext& ctx) {
352+
bool IsSuitableToFuseInputMultiLabels(TOptimizeContext &optCtx) {
353+
YQL_ENSURE(optCtx.Types);
354+
static const char optName[] = "FuseEquiJoinsInputMultiLabels";
355+
return IsOptimizerEnabled<optName>(*optCtx.Types);
356+
}
357+
358+
TExprNode::TPtr FuseEquiJoins(const TExprNode::TPtr& node, ui32 upstreamIndex, TExprContext& ctx, TOptimizeContext &optCtx) {
351359
ui32 downstreamInputs = node->ChildrenSize() - 2;
352360
auto upstreamList = node->Child(upstreamIndex)->Child(0);
353361
auto upstreamLabel = node->Child(upstreamIndex)->Child(1);
362+
THashSet<TStringBuf> upstreamLabelsAssociatedByInputIndex;
354363
THashSet<TStringBuf> downstreamLabels;
355364
for (ui32 i = 0; i < downstreamInputs; ++i) {
356365
auto label = node->Child(i)->Child(1);
357-
if (!label->IsAtom()) {
358-
return node;
366+
if (auto list = TMaybeNode<TCoAtomList>(label)) {
367+
if (!IsSuitableToFuseInputMultiLabels(optCtx)) {
368+
return node;
369+
}
370+
for (auto labelAtom : list.Cast()) {
371+
auto label = labelAtom.Value();
372+
downstreamLabels.insert(label);
373+
if (upstreamIndex == i) {
374+
upstreamLabelsAssociatedByInputIndex.insert(label);
375+
}
376+
}
377+
} else {
378+
if (upstreamIndex == i) {
379+
upstreamLabelsAssociatedByInputIndex.insert(label->Content());
380+
}
381+
downstreamLabels.insert(label->Content());
359382
}
360-
361-
downstreamLabels.insert(label->Content());
362383
}
363384

364385
THashMap<TString, TString> upstreamTablesRename; // rename of conflicted upstream tables
@@ -381,7 +402,18 @@ TExprNode::TPtr FuseEquiJoins(const TExprNode::TPtr& node, ui32 upstreamIndex, T
381402
return node;
382403
}
383404

384-
if (downstreamLabels.contains(label->Content())) {
405+
if (upstreamLabelsAssociatedByInputIndex.size() == 1 && downstreamLabels.contains(label->Content()) ||
406+
// In case multiple labels input, we are not renaming labels associated with upstream input index.
407+
// For example:
408+
// (let ej1 = (EquiJoin '(input1, 'a), '(input2, 'b), upstreamJoinTree, '()))
409+
// (let ej2 = (EquiJoin '(ej1, '('a 'b)), '(input3, 'c), downstreamJoinTree, '())))
410+
// Upstream labels: [a, b];
411+
// Downstream labels: [a, b, c];
412+
// Not renaming [a, b] because their associated with input index.
413+
// As result we should get:
414+
// (let ejFused = (EquiJoin '(input1, 'a), '(input2, 'b), '(input3, 'c), fusedJoinTree, '()))
415+
(upstreamLabelsAssociatedByInputIndex.size() > 1 && downstreamLabels.contains(label->Content()) &&
416+
!upstreamLabelsAssociatedByInputIndex.contains(label->Content()))) {
385417
// fix conflict for labels
386418
for (ui32 suffix = 1;; ++suffix) {
387419
auto newName = TString::Join(label->Content(), "_", ToString(suffix));
@@ -481,26 +513,35 @@ TExprNode::TPtr FuseEquiJoins(const TExprNode::TPtr& node, ui32 upstreamIndex, T
481513
}
482514
}
483515

484-
for (auto& x : upstreamColumnsRename) {
485-
for (auto& y : x.second) {
486-
TStringBuf part1;
487-
TStringBuf part2;
488-
SplitTableName(x.first, part1, part2);
489-
if (auto renamed = upstreamTablesRename.FindPtr(part1)) {
490-
part1 = *renamed;
491-
}
492-
493-
settingsChildren.push_back(ctx.Builder(node->Pos())
494-
.List()
495-
.Atom(0, "rename")
496-
.Atom(1, TString::Join(part1, ".", part2))
497-
.Atom(2, TString::Join(upstreamLabel->Content(), ".", y))
498-
.Seal()
499-
.Build());
500-
}
501-
}
502-
503-
auto joinTree = FuseJoinTree(downstreamJoinTree, upstreamJoinTree, upstreamLabel->Content(),
516+
for (auto& x : upstreamColumnsRename) {
517+
for (auto& y : x.second) {
518+
TStringBuf part1;
519+
TStringBuf part2;
520+
SplitTableName(x.first, part1, part2);
521+
TStringBuf labelName = upstreamLabel->Content();
522+
if (upstreamLabelsAssociatedByInputIndex.size() > 1) {
523+
if (upstreamLabelsAssociatedByInputIndex.contains(part1)) {
524+
continue;
525+
} else {
526+
labelName = part1;
527+
}
528+
}
529+
530+
if (auto renamed = upstreamTablesRename.FindPtr(part1)) {
531+
part1 = *renamed;
532+
}
533+
534+
settingsChildren.push_back(ctx.Builder(node->Pos())
535+
.List()
536+
.Atom(0, "rename")
537+
.Atom(1, TString::Join(part1, ".", part2))
538+
.Atom(2, TString::Join(labelName, ".", y))
539+
.Seal()
540+
.Build());
541+
}
542+
}
543+
544+
auto joinTree = FuseJoinTree(downstreamJoinTree, upstreamJoinTree, upstreamLabelsAssociatedByInputIndex,
504545
upstreamTablesRename, upstreamColumnsBackRename, ctx);
505546
if (!joinTree) {
506547
return node;
@@ -514,6 +555,7 @@ TExprNode::TPtr FuseEquiJoins(const TExprNode::TPtr& node, ui32 upstreamIndex, T
514555
return ret;
515556
}
516557

558+
517559
bool IsRenamingOrPassthroughFlatMap(const TCoFlatMapBase& flatMap, THashMap<TStringBuf, TStringBuf>& renames,
518560
THashSet<TStringBuf>& outputMembers, bool& isIdentity)
519561
{
@@ -2007,7 +2049,7 @@ void RegisterCoFlowCallables2(TCallableOptimizerMap& map) {
20072049
if (node->Child(i)->Child(0)->IsCallable("EquiJoin") &&
20082050
optCtx.IsSingleUsage(*node->Child(i)) &&
20092051
optCtx.IsSingleUsage(*node->Child(i)->Child(0))) {
2010-
auto ret = FuseEquiJoins(node, i, ctx);
2052+
auto ret = FuseEquiJoins(node, i, ctx, optCtx);
20112053
if (ret != node) {
20122054
YQL_CLOG(DEBUG, Core) << "FuseEquiJoins";
20132055
return ret;

yql/essentials/tests/s-expressions/minirun/part5/canondata/result.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,20 @@
128128
"uri": "https://{canondata_backend}/1942525/ebc309da64241c6f5f13249c90361edaa566a9cc/resource.tar.gz#test.test_EquiJoin-EquiConvertToCommonTypeAlias-default.txt-Results_/results.txt"
129129
}
130130
],
131+
"test.test[EquiJoin-FuseEquiJoins-default.txt-Debug]": [
132+
{
133+
"checksum": "e4789e7f286c2d891f84c3f5a9016dbd",
134+
"size": 981,
135+
"uri": "https://{canondata_backend}/1942173/e0468c17e455c4e7740e0b19765633d5a5aca7af/resource.tar.gz#test.test_EquiJoin-FuseEquiJoins-default.txt-Debug_/opt.yql"
136+
}
137+
],
138+
"test.test[EquiJoin-FuseEquiJoins-default.txt-Results]": [
139+
{
140+
"checksum": "d4c82ff9c9a0c83a3f0b79b0472c76af",
141+
"size": 2466,
142+
"uri": "https://{canondata_backend}/1942173/e0468c17e455c4e7740e0b19765633d5a5aca7af/resource.tar.gz#test.test_EquiJoin-FuseEquiJoins-default.txt-Results_/results.txt"
143+
}
144+
],
131145
"test.test[EquiJoin-JoinInMemOpt1Opt2-default.txt-Debug]": [
132146
{
133147
"checksum": "6fab425c7eb72691262bf6b82c6f60fb",
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
(
2+
(let flag (Configure! world (DataSource 'config) 'OptimizerFlags 'FuseEquiJoinsInputMultiLabels))
3+
(let mr_source (DataSource 'yt 'plato))
4+
(let t1 (AsList
5+
(AsStruct '('key1 (Int32 '1)) '('value1 (String 'A)))
6+
(AsStruct '('key1 (Int32 '2)) '('value1 (String 'B)))
7+
))
8+
9+
(let t2 (AsList
10+
(AsStruct '('key2 (Int32 '3)) '('value2 (String 'C)))
11+
(AsStruct '('key2 (Int32 '4)) '('value2 (String 'D)))
12+
))
13+
14+
(let t3 (AsList
15+
(AsStruct '('key3 (Int32 '4)) '('value3 (String 'E)))
16+
(AsStruct '('key3 (Int32 '5)) '('value3 (String 'F)))
17+
))
18+
19+
(let t4 (AsList
20+
(AsStruct '('key4 (Int32 '6)) '('value4 (String 'J)))
21+
(AsStruct '('key4 (Int32 '7)) '('value4 (String 'H)))
22+
))
23+
24+
(let ej1 (EquiJoin '(t1 'a) '((FlatMap t2 (lambda '(row) (OptionalIf (Coalesce (> (Member row 'key2) (Int32 '1)) (Bool 'false)) row))) 'b) '('Inner 'a 'b '('a 'key1) '('b 'key2) '()) '()))
25+
(let ej2 (EquiJoin '(ej1 '('a 'b)) '(t3 'c) '('Inner 'a 'c '('a 'key1) '('c 'key3) '()) '()))
26+
(let ej3 (EquiJoin '(ej2 '('a 'b 'c)) '(t4 'd) '('Inner 'c 'd '('c 'key3) '('d 'key4) '()) '()))
27+
28+
(let res_sink (DataSink 'result))
29+
(let world (Write! flag res_sink (Key) ej3 '('('type))))
30+
31+
(let world (Commit! world res_sink))
32+
(return world)
33+
)

0 commit comments

Comments
 (0)