Skip to content

Commit 53e0492

Browse files
committed
Handle WaitAllInputs in EarlyMergeJoin
commit_hash:ca9b4625782c43f6096c8c85742d6450827f29a3
1 parent 11c9ef2 commit 53e0492

File tree

5 files changed

+39
-11
lines changed

5 files changed

+39
-11
lines changed

yt/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,12 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EquiJoin(TExprBase node
327327
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBase node, TExprContext& ctx) const {
328328
if (State_->Configuration->JoinMergeTablesLimit.Get()) {
329329
auto equiJoin = node.Cast<TYtEquiJoin>();
330+
331+
const bool waitAllInputs = State_->Configuration->JoinWaitAllInputs.Get().GetOrElse(false);
332+
if (waitAllInputs && !AreJoinInputsReady(equiJoin)) {
333+
return node;
334+
}
335+
330336
const auto tree = ImportYtEquiJoin(equiJoin, ctx);
331337
if (State_->Configuration->JoinMergeForce.Get() || tree->LinkSettings.ForceSortedMerge) {
332338
const auto rewriteStatus = RewriteYtEquiJoinLeaves(equiJoin, *tree, State_, ctx);
@@ -479,19 +485,11 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
479485
&& !HasSetting(equiJoin.JoinOptions().Ref(), "cbo_passed");
480486

481487
const bool waitAllInputs = State_->Configuration->JoinWaitAllInputs.Get().GetOrElse(false) || tryReorder;
482-
483-
if (waitAllInputs) {
484-
for (auto section: equiJoin.Input()) {
485-
for (auto path: section.Paths()) {
486-
TYtPathInfo pathInfo(path);
487-
if (!pathInfo.Table->Stat) {
488-
return node;
489-
}
490-
}
491-
}
488+
if (waitAllInputs && !AreJoinInputsReady(equiJoin)) {
489+
return node;
492490
}
493-
const auto tree = ImportYtEquiJoin(equiJoin, ctx);
494491

492+
const auto tree = ImportYtEquiJoin(equiJoin, ctx);
495493
if (tryReorder) {
496494
YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
497495
auto collectStatus = CollectCboStats(*tree, State_, ctx);

yt/yql/providers/yt/provider/yql_yt_join_impl.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5244,4 +5244,17 @@ TMaybeNode<TExprBase> ExportYtEquiJoin(TYtEquiJoin equiJoin, const TYtJoinNodeOp
52445244
return TExprBase(ctx.ChangeChildren(join.Ref(), std::move(children)));
52455245
}
52465246

5247+
bool AreJoinInputsReady(const TYtEquiJoin& equiJoin) {
5248+
for (auto section: equiJoin.Input()) {
5249+
for (auto path: section.Paths()) {
5250+
TYtPathInfo pathInfo(path);
5251+
if (!pathInfo.Table->Stat) {
5252+
return false;
5253+
}
5254+
}
5255+
}
5256+
5257+
return true;
5258+
}
5259+
52475260
}

yt/yql/providers/yt/provider/yql_yt_join_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,6 @@ IGraphTransformer::TStatus TryEstimateDataSizeChecked(IYtGateway::TPathStatResul
123123
ui64 CalcInMemorySizeNoCrossJoin(const TJoinLabel& label, const TYtJoinNodeOp& op, const TMapJoinSettings& settings, bool isLeft,
124124
TExprContext& ctx, bool needPayload, ui64 size);
125125

126+
bool AreJoinInputsReady(const TYtEquiJoin& equiJoin);
127+
126128
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
in Input1 sorted_by_kv1.txt
2+
in Input2 input_tutorial_users.txt
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
PRAGMA DisableSimpleColumns;
2+
use plato;
3+
pragma yt.JoinMergeTablesLimit="10";
4+
pragma yt.JoinAllowColumnRenames="true";
5+
pragma yt.JoinMergeUseSmallAsPrimary="true";
6+
pragma yt.JoinWaitAllInputs="true";
7+
8+
INSERT INTO @Input2Sorted SELECT * FROM Input2 ORDER BY key;
9+
COMMIT;
10+
11+
-- Input1 is smaller than Input2 (known thanks to JoinWaitAllInputs)
12+
select * from @Input2Sorted as b join /*+ merge() */ Input1 as a on a.k1 = b.key
13+
order by a.v1, b.value;

0 commit comments

Comments
 (0)