Skip to content

Commit a819f67

Browse files
Improved Q9 by moving BuildFlatmapStage to a later stage and adding a Flatmap pushdown (#11426)
1 parent 050eae7 commit a819f67

File tree

6 files changed

+209
-10
lines changed

6 files changed

+209
-10
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
5050
AddHandler(0, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<false>));
5151
AddHandler(0, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<false>));
5252
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildPureFlatmapStage));
53-
AddHandler(0, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
5453
AddHandler(0, &TCoCombineByKey::Match, HNDL(PushCombineToStage<false>));
5554
AddHandler(0, &TCoPartitionsByKeys::Match, HNDL(BuildPartitionsStage<false>));
5655
AddHandler(0, &TCoFinalizeByKey::Match, HNDL(BuildFinalizeByKeyStage<false>));
@@ -87,14 +86,15 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
8786
AddHandler(0, &TCoAsList::Match, HNDL(PropagatePrecomuteScalarRowset<false>));
8887
AddHandler(0, &TCoTake::Match, HNDL(PropagatePrecomuteTake<false>));
8988
AddHandler(0, &TCoFlatMap::Match, HNDL(PropagatePrecomuteFlatmap<false>));
90-
89+
AddHandler(0, &TCoFlatMapBase::Match, HNDL(PushFlatmapToStage<false>));
9190
AddHandler(0, &TCoAggregateCombine::Match, HNDL(ExpandAggregatePhase));
9291
AddHandler(0, &TCoAggregateCombineState::Match, HNDL(ExpandAggregatePhase));
9392
AddHandler(0, &TCoAggregateMergeState::Match, HNDL(ExpandAggregatePhase));
9493
AddHandler(0, &TCoAggregateMergeFinalize::Match, HNDL(ExpandAggregatePhase));
9594
AddHandler(0, &TCoAggregateMergeManyFinalize::Match, HNDL(ExpandAggregatePhase));
9695
AddHandler(0, &TCoAggregateFinalize::Match, HNDL(ExpandAggregatePhase));
9796

97+
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<false>));
9898
AddHandler(1, &TCoSkipNullMembers::Match, HNDL(PushSkipNullMembersToStage<true>));
9999
AddHandler(1, &TCoExtractMembers::Match, HNDL(PushExtractMembersToStage<true>));
100100
AddHandler(1, &TCoFlatMapBase::Match, HNDL(BuildFlatmapStage<true>));
@@ -297,6 +297,15 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
297297
return output;
298298
}
299299

300+
template <bool IsGlobal>
301+
TMaybeNode<TExprBase> PushFlatmapToStage(TExprBase node, TExprContext& ctx,
302+
IOptimizationContext& optCtx, const TGetParents& getParents)
303+
{
304+
TExprBase output = DqPushFlatmapToStage(node, ctx, optCtx, *getParents(), IsGlobal);
305+
DumpAppliedRule("DqPushFlatmapToStage", node.Ptr(), output.Ptr(), ctx);
306+
return output;
307+
}
308+
300309
template <bool IsGlobal>
301310
TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx,
302311
IOptimizationContext& optCtx, const TGetParents& getParents)
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
pragma TablePathPrefix = "/Root/test/ds/";
2+
3+
$count_1_20 = (select count(*)
4+
from store_sales as store_sales
5+
where ss_quantity between 1 and 20);
6+
7+
$avg_ss_ext_discount_amt_1_20 = (select avg(ss_ext_discount_amt)
8+
from store_sales as store_sales
9+
where ss_quantity between 1 and 20);
10+
11+
$avg_ss_net_profit_1_20 = (select avg(ss_net_profit)
12+
from store_sales as store_sales
13+
where ss_quantity between 1 and 20);
14+
15+
16+
$count_21_40 = (select count(*)
17+
from store_sales as store_sales
18+
where ss_quantity between 21 and 40);
19+
20+
$avg_ss_ext_discount_amt_21_40 = (select avg(ss_ext_discount_amt)
21+
from store_sales as store_sales
22+
where ss_quantity between 21 and 40);
23+
24+
$avg_ss_net_profit_21_40 = (select avg(ss_net_profit)
25+
from store_sales as store_sales
26+
where ss_quantity between 21 and 40);
27+
28+
29+
$count_41_60 = (select count(*)
30+
from store_sales as store_sales
31+
where ss_quantity between 41 and 60);
32+
33+
$avg_ss_ext_discount_amt_41_60 = (select avg(ss_ext_discount_amt)
34+
from store_sales as store_sales
35+
where ss_quantity between 41 and 60);
36+
37+
$avg_ss_net_profit_41_60 = (select avg(ss_net_profit)
38+
from store_sales as store_sales
39+
where ss_quantity between 41 and 60);
40+
41+
42+
$count_61_80 = (select count(*)
43+
from store_sales as store_sales
44+
where ss_quantity between 61 and 80);
45+
46+
$avg_ss_ext_discount_amt_61_80 = (select avg(ss_ext_discount_amt)
47+
from store_sales as store_sales
48+
where ss_quantity between 61 and 80);
49+
50+
$avg_ss_net_profit_61_80 = (select avg(ss_net_profit)
51+
from store_sales as store_sales
52+
where ss_quantity between 61 and 80);
53+
54+
55+
$count_81_100 = (select count(*)
56+
from store_sales as store_sales
57+
where ss_quantity between 81 and 100);
58+
59+
$avg_ss_ext_discount_amt_81_100 = (select avg(ss_ext_discount_amt)
60+
from store_sales as store_sales
61+
where ss_quantity between 81 and 100);
62+
63+
$avg_ss_net_profit_81_100 = (select avg(ss_net_profit)
64+
from store_sales as store_sales
65+
where ss_quantity between 81 and 100);
66+
67+
68+
select case when $count_1_20 > 98972190
69+
then $avg_ss_ext_discount_amt_1_20
70+
else $avg_ss_net_profit_1_20 end bucket1 ,
71+
case when $count_21_40 > 160856845
72+
then $avg_ss_ext_discount_amt_21_40
73+
else $avg_ss_net_profit_21_40 end bucket2,
74+
case when $count_41_60 > 12733327
75+
then $avg_ss_ext_discount_amt_41_60
76+
else $avg_ss_net_profit_41_60 end bucket3,
77+
case when $count_61_80 > 96251173
78+
then $avg_ss_ext_discount_amt_61_80
79+
else $avg_ss_net_profit_61_80 end bucket4,
80+
case when $count_81_100 > 80049606
81+
then $avg_ss_ext_discount_amt_81_100
82+
else $avg_ss_net_profit_81_100 end bucket5
83+
from reason as reason
84+
where r_reason_sk = 1;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
pragma TablePathPrefix = "/Root/test/ds/";
2+
3+
---$count_1_20 = (select count(*)
4+
-- from store_sales as store_sales
5+
-- where ss_quantity between 1 and 20);
6+
7+
$avg_ss_ext_discount_amt_1_20 = (select avg(ss_ext_discount_amt)
8+
from store_sales as store_sales
9+
where ss_quantity between 1 and 20);
10+
11+
$avg_ss_net_profit_1_20 = (select avg(ss_net_profit)
12+
from store_sales as store_sales
13+
where ss_quantity between 1 and 20);
14+
15+
16+
--select case when $count_1_20 > 98972190
17+
select case when 1e20 > 98972190
18+
then $avg_ss_ext_discount_amt_1_20
19+
else $avg_ss_net_profit_1_20 end bucket1
20+
21+
from reason as reason
22+
where r_reason_sk = 1;

ydb/core/kqp/ut/join/kqp_join_order_ut.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,14 @@ Y_UNIT_TEST_SUITE(KqpJoinOrder) {
462462
ExecuteJoinOrderTestDataQueryWithStats("queries/tpch21.sql", "stats/tpch1000s.json", StreamLookupJoin, ColumnStore);
463463
}
464464

465+
Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9, StreamLookupJoin, ColumnStore) {
466+
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
467+
}
468+
469+
Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS9_SMALL, StreamLookupJoin, ColumnStore) {
470+
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds9_small.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
471+
}
472+
465473
Y_UNIT_TEST_XOR_OR_BOTH_FALSE(TPCDS16, StreamLookupJoin, ColumnStore) {
466474
ExecuteJoinOrderTestDataQueryWithStats("queries/tpcds16.sql", "stats/tpcds1000s.json", StreamLookupJoin, ColumnStore);
467475
}

ydb/library/yql/dq/opt/dq_opt_phy.cpp

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,8 @@ TExprBase DqBuildPureFlatmapStage(TExprBase node, TExprContext& ctx) {
761761
TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
762762
const TParentsMap& parentsMap, bool allowStageMultiUsage)
763763
{
764+
Y_UNUSED(optCtx);
765+
764766
if (!node.Maybe<TCoFlatMapBase>().Input().Maybe<TDqCnUnionAll>()) {
765767
return node;
766768
}
@@ -792,14 +794,25 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
792794
return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage)));
793795
}
794796

795-
auto lambda = TCoLambda(ctx.Builder(flatmap.Lambda().Pos())
796-
.Lambda()
797-
.Param("stream")
798-
.Callable(flatmap.Ref().Content())
799-
.Arg(0, "stream")
800-
.Add(1, ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
801-
.Seal()
802-
.Seal().Build());
797+
TCoLambda lambda = flatmap.Lambda();
798+
799+
if (flatmap.Maybe<TCoFlatMap>()) {
800+
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
801+
.Args({"stream"})
802+
.Body<TCoFlatMap>()
803+
.Input("stream")
804+
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
805+
.Build()
806+
.Done();
807+
} else {
808+
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
809+
.Args({"stream"})
810+
.Body<TCoOrderedFlatMap>()
811+
.Input("stream")
812+
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
813+
.Build()
814+
.Done();
815+
}
803816

804817
auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
805818
if (pushResult) {
@@ -825,6 +838,66 @@ TExprBase DqBuildFlatmapStage(TExprBase node, TExprContext& ctx, IOptimizationCo
825838
.Done();
826839
}
827840

841+
TExprBase DqPushFlatmapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
842+
const TParentsMap& parentsMap, bool allowStageMultiUsage)
843+
{
844+
if (!node.Maybe<TCoFlatMapBase>().Input().Maybe<TDqCnUnionAll>()) {
845+
return node;
846+
}
847+
848+
auto flatmap = node.Cast<TCoFlatMapBase>();
849+
if (!IsDqSelfContainedExpr(flatmap.Lambda())) {
850+
return node;
851+
}
852+
auto dqUnion = flatmap.Input().Cast<TDqCnUnionAll>();
853+
if (!IsSingleConsumerConnection(dqUnion, parentsMap, allowStageMultiUsage)) {
854+
return node;
855+
}
856+
857+
bool isPure;
858+
TVector<TDqConnection> innerConnections;
859+
FindDqConnections(flatmap.Lambda(), innerConnections, isPure);
860+
if (!isPure) {
861+
return node;
862+
}
863+
864+
TMaybeNode<TDqStage> flatmapStage;
865+
if (!innerConnections.empty()) {
866+
return node;
867+
} else {
868+
if (auto connToPushableStage = DqBuildPushableStage(dqUnion, ctx)) {
869+
return TExprBase(ctx.ChangeChild(*node.Raw(), TCoFlatMapBase::idx_Input, std::move(connToPushableStage)));
870+
}
871+
872+
TCoLambda lambda = flatmap.Lambda();
873+
874+
if (flatmap.Maybe<TCoFlatMap>()) {
875+
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
876+
.Args({"stream"})
877+
.Body<TCoFlatMap>()
878+
.Input("stream")
879+
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
880+
.Build()
881+
.Done();
882+
} else {
883+
lambda = Build<TCoLambda>(ctx, flatmap.Lambda().Pos())
884+
.Args({"stream"})
885+
.Body<TCoOrderedFlatMap>()
886+
.Input("stream")
887+
.Lambda(ctx.DeepCopyLambda(flatmap.Lambda().Ref()))
888+
.Build()
889+
.Done();
890+
}
891+
892+
auto pushResult = DqPushLambdaToStageUnionAll(dqUnion, lambda, {}, ctx, optCtx);
893+
if (pushResult) {
894+
return pushResult.Cast();
895+
}
896+
}
897+
898+
return node;
899+
}
900+
828901
template <typename BaseLMap>
829902
TExprBase DqPushBaseLMapToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
830903
const TParentsMap& parentsMap, bool allowStageMultiUsage = true)

ydb/library/yql/dq/opt/dq_opt_phy.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ NNodes::TExprBase DqBuildPureFlatmapStage(NNodes::TExprBase node, TExprContext&
4848
NNodes::TExprBase DqBuildFlatmapStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
4949
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
5050

51+
NNodes::TExprBase DqPushFlatmapToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
52+
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
53+
5154
NNodes::TExprBase DqPushCombineToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
5255
const TParentsMap& parentsMap, bool allowStageMultiUsage = true);
5356

0 commit comments

Comments
 (0)