Skip to content

Commit 1ac7a6f

Browse files
authored
Handle Unordered in Dq (#8357)
1 parent 7943190 commit 1ac7a6f

File tree

28 files changed

+889
-710
lines changed

28 files changed

+889
-710
lines changed

ydb/library/yql/core/common_opt/yql_co_simple1.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5936,7 +5936,7 @@ void RegisterCoSimpleCallables1(TCallableOptimizerMap& map) {
59365936
};
59375937

59385938
map["Unordered"] = map["UnorderedSubquery"] = [](const TExprNode::TPtr& node, TExprContext& ctx, TOptimizeContext& /*optCtx*/) {
5939-
if (node->Head().IsCallable({"AsList","EquiJoin","Filter","Map","FlatMap","MultiMap","Extend", "Apply"})) {
5939+
if (node->Head().IsCallable({"AsList","EquiJoin","Filter","Map","FlatMap","MultiMap","Extend", "Apply","PartitionByKey","PartitionsByKeys"})) {
59405940
YQL_CLOG(DEBUG, Core) << "Drop " << node->Content() << " over " << node->Head().Content();
59415941
return node->HeadPtr();
59425942
}

ydb/library/yql/dq/opt/dq_opt.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <ydb/library/yql/core/yql_expr_optimize.h>
66
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
7+
#include <ydb/library/yql/dq/integration/yql_dq_optimization.h>
78

89
using namespace NYql::NNodes;
910

@@ -160,4 +161,14 @@ bool CanPushDqExpr(const TExprBase& expr, const TDqConnection& connection) {
160161
return CanPushDqExpr(expr, connection.Output().Stage());
161162
}
162163

164+
IDqOptimization* GetDqOptCallback(const TExprBase& providerCall, const TTypeAnnotationContext& typeAnnCtx) {
165+
if (providerCall.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerCall.Ref().Child(1))) {
166+
auto dataSourceName = providerCall.Ref().Child(1)->Child(0)->Content();
167+
auto datasource = typeAnnCtx.DataSourceMap.FindPtr(dataSourceName);
168+
YQL_ENSURE(datasource);
169+
return (*datasource)->GetDqOptimization();
170+
}
171+
return nullptr;
172+
}
173+
163174
} // namespace NYql::NDq

ydb/library/yql/dq/opt/dq_opt.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
#include <util/generic/guid.h>
88

9+
namespace NYql {
10+
class IDqOptimization;
11+
struct TTypeAnnotationContext;
12+
}
13+
914
namespace NYql::NDq {
1015

1116
NNodes::TCoAtom BuildAtom(TStringBuf value, TPositionHandle pos, TExprContext& ctx);
@@ -38,4 +43,7 @@ bool IsDqDependsOnStageOutput(const NNodes::TExprBase& node, const NNodes::TDqSt
3843
bool CanPushDqExpr(const NNodes::TExprBase& expr, const NNodes::TDqStageBase& stage);
3944
bool CanPushDqExpr(const NNodes::TExprBase& expr, const NNodes::TDqConnection& connection);
4045

46+
IDqOptimization* GetDqOptCallback(const NNodes::TExprBase& providerCall, const TTypeAnnotationContext& typeAnnCtx);
47+
48+
4149
} // namespace NYql::NDq

ydb/library/yql/dq/opt/dq_opt_log.cpp

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -386,16 +386,6 @@ TExprBase DqExpandMatchRecognize(TExprBase node, TExprContext& ctx, TTypeAnnotat
386386
return TExprBase(ExpandMatchRecognize(node.Ptr(), ctx, typeAnnCtx));
387387
}
388388

389-
IDqOptimization* GetDqOptCallback(const TExprBase& providerRead, TTypeAnnotationContext& typeAnnCtx) {
390-
if (providerRead.Ref().ChildrenSize() > 1 && TCoDataSource::Match(providerRead.Ref().Child(1))) {
391-
auto dataSourceName = providerRead.Ref().Child(1)->Child(0)->Content();
392-
auto datasource = typeAnnCtx.DataSourceMap.FindPtr(dataSourceName);
393-
YQL_ENSURE(datasource);
394-
return (*datasource)->GetDqOptimization();
395-
}
396-
return nullptr;
397-
}
398-
399389
TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const std::function<const TParentsMap*()>& getParents, bool enableDqReplicate, TTypeAnnotationContext& typeAnnCtx) {
400390
const auto unordered = node.Cast<TCoUnorderedBase>();
401391
if (const auto maybeRead = unordered.Input().Maybe<TDqReadWrapBase>().Input()) {

ydb/library/yql/dq/opt/dq_opt_phy.cpp

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
#include "dq_opt_phy.h"
22
#include "dq_opt_join.h"
3+
#include "dq_opt.h"
34

45
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
56
#include <ydb/library/yql/core/yql_opt_utils.h>
67
#include <ydb/library/yql/core/yql_type_helpers.h>
78
#include <ydb/library/yql/utils/log/log.h>
89
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
910
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
11+
#include <ydb/library/yql/dq/integration/yql_dq_optimization.h>
1012
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
1113
#include <ydb/library/yql/core/yql_cost_function.h>
1214

@@ -2939,4 +2941,156 @@ NNodes::TExprBase DqBuildStageWithReadWrap(NNodes::TExprBase node, TExprContext&
29392941
.Build() .Done();
29402942
}
29412943

2944+
TExprBase DqPushUnorderedToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
2945+
const TParentsMap& parentsMap, bool allowStageMultiUsage)
2946+
{
2947+
if (!node.Maybe<TCoUnorderedBase>().Input().template Maybe<TDqConnection>()) {
2948+
return node;
2949+
}
2950+
2951+
auto unordered = node.Cast<TCoUnorderedBase>();
2952+
auto dqConection = unordered.Input().template Cast<TDqConnection>();
2953+
2954+
if (!IsSingleConsumerConnection(dqConection, parentsMap, allowStageMultiUsage)) {
2955+
return node;
2956+
}
2957+
2958+
if (dqConection.Maybe<TDqCnMerge>()) {
2959+
dqConection = Build<TDqCnUnionAll>(ctx, dqConection.Pos()).Output(dqConection.Output()).Done();
2960+
}
2961+
2962+
const auto stage = dqConection.Output().Stage().Cast<TDqStage>();
2963+
if (GetStageOutputsCount(stage) > 1 && !stage.Program().Body().Maybe<TDqReplicate>()) {
2964+
// Can't push Unordered to such stage. Just drop Unordered
2965+
return dqConection;
2966+
}
2967+
2968+
auto lambda = Build<TCoLambda>(ctx, unordered.Pos())
2969+
.Args({"stream"})
2970+
.Body<TCoUnordered>()
2971+
.Input("stream")
2972+
.Build()
2973+
.Done();
2974+
2975+
auto result = DqPushLambdaToStageUnionAll(dqConection, lambda, {}, ctx, optCtx);
2976+
if (!result) {
2977+
return node;
2978+
}
2979+
2980+
return result.Cast();
2981+
}
2982+
2983+
TExprBase DqPushUnorderedOverDqConnection(TDqConnection dqConection, TExprContext& ctx, IOptimizationContext& optCtx) {
2984+
if (dqConection.Maybe<TDqCnMerge>()) {
2985+
dqConection = Build<TDqCnUnionAll>(ctx, dqConection.Pos()).Output(dqConection.Output()).Done();
2986+
}
2987+
2988+
const auto stage = dqConection.Output().Stage().Cast<TDqStage>();
2989+
if (GetStageOutputsCount(stage) > 1 && !stage.Program().Body().Maybe<TDqReplicate>()) {
2990+
// Can't push Unordered to such stage
2991+
return dqConection;
2992+
}
2993+
2994+
auto lambda = Build<TCoLambda>(ctx, dqConection.Pos())
2995+
.Args({"stream"})
2996+
.Body<TCoUnordered>()
2997+
.Input("stream")
2998+
.Build()
2999+
.Done();
3000+
3001+
auto result = DqPushLambdaToStageUnionAll(dqConection, lambda, {}, ctx, optCtx);
3002+
if (!result) {
3003+
return dqConection;
3004+
}
3005+
3006+
return result.Cast();
3007+
}
3008+
3009+
3010+
TMaybeNode<TExprBase> DqUnorderedOverStageInput(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
3011+
const TTypeAnnotationContext& typeAnnCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage) {
3012+
3013+
if (!node.Maybe<TDqStageBase>()) {
3014+
return node;
3015+
}
3016+
3017+
auto stage = node.Cast<TDqStageBase>();
3018+
3019+
auto stageLambda = stage.Program();
3020+
3021+
TNodeOnNodeOwnedMap nodesToOptimize;
3022+
TDynBitMap inputsToOptimize;
3023+
for (size_t i = 0; i < stageLambda.Args().Size(); ++i) {
3024+
auto arg = stageLambda.Args().Arg(i);
3025+
auto it = parentsMap.find(arg.Raw());
3026+
if (it != parentsMap.cend()) {
3027+
// Substitute Unordered(arg) by arg
3028+
std::vector<const TExprNode*> unordereds;
3029+
for (auto n: it->second) {
3030+
if (TCoUnordered::Match(n)) {
3031+
unordereds.push_back(n);
3032+
} else if (!TCoDependsOn::Match(n)) {
3033+
unordereds.clear();
3034+
break;
3035+
}
3036+
}
3037+
if (!unordereds.empty()) {
3038+
for (auto n: unordereds) {
3039+
nodesToOptimize.emplace(n, arg.Ptr());
3040+
}
3041+
inputsToOptimize.Set(i);
3042+
}
3043+
}
3044+
}
3045+
3046+
if (nodesToOptimize.empty()) {
3047+
return node;
3048+
}
3049+
3050+
for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
3051+
if (inputsToOptimize.Test(i)) {
3052+
if (auto conn = stage.Inputs().Item(i).Maybe<TDqConnection>()) {
3053+
if (!IsSingleConsumerConnection(conn.Cast(), parentsMap, allowStageMultiUsage)) {
3054+
return node;
3055+
}
3056+
}
3057+
}
3058+
}
3059+
3060+
TExprNode::TPtr newStageLambda;
3061+
auto status = RemapExpr(stageLambda.Ptr(), newStageLambda, nodesToOptimize, ctx, TOptimizeExprSettings{nullptr});
3062+
if (status.Level == IGraphTransformer::TStatus::Error) {
3063+
return {};
3064+
}
3065+
3066+
auto res = ctx.ChangeChild(stage.Ref(), TDqStageBase::idx_Program, std::move(newStageLambda));
3067+
3068+
TVector<TExprBase> updatedInputs;
3069+
for (size_t i = 0; i < stage.Inputs().Size(); ++i) {
3070+
if (inputsToOptimize.Test(i)) {
3071+
if (auto conn = stage.Inputs().Item(i).Maybe<TDqConnection>()) {
3072+
updatedInputs.push_back(DqPushUnorderedOverDqConnection(conn.Cast(), ctx, optCtx));
3073+
continue;
3074+
} else if (stage.Inputs().Item(i).Maybe<TDqSource>()) {
3075+
if (auto dqOpt = GetDqOptCallback(stage.Inputs().Item(i), typeAnnCtx)) {
3076+
auto updatedSource = dqOpt->ApplyUnordered(stage.Inputs().Item(i).Ptr(), ctx);
3077+
if (!updatedSource) {
3078+
return {};
3079+
}
3080+
updatedInputs.push_back(TExprBase(updatedSource));
3081+
continue;
3082+
}
3083+
}
3084+
}
3085+
updatedInputs.push_back(stage.Inputs().Item(i));
3086+
}
3087+
res = ctx.ChangeChild(*res, TDqStageBase::idx_Inputs,
3088+
Build<TExprList>(ctx, stage.Inputs().Pos())
3089+
.Add(updatedInputs)
3090+
.Done().Ptr());
3091+
3092+
return TExprBase(res);
3093+
}
3094+
3095+
29423096
} // namespace NYql::NDq

ydb/library/yql/dq/opt/dq_opt_phy.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <ydb/library/yql/ast/yql_expr.h>
99
#include <ydb/library/yql/core/yql_expr_optimize.h>
1010

11+
namespace NYql {
12+
struct TTypeAnnotationContext;
13+
}
14+
1115
namespace NYql::NDq {
1216

1317
NNodes::TMaybeNode<NNodes::TDqStage> DqPushLambdaToStage(const NNodes::TDqStage &stage,
@@ -128,4 +132,11 @@ NNodes::TExprBase DqBuildStageWithSourceWrap(NNodes::TExprBase node, TExprContex
128132

129133
NNodes::TExprBase DqBuildStageWithReadWrap(NNodes::TExprBase node, TExprContext& ctx);
130134

135+
NNodes::TExprBase DqPushUnorderedToStage(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
136+
const TParentsMap& parentsMap, bool allowStageMultiUsage);
137+
138+
NNodes::TMaybeNode<NNodes::TExprBase> DqUnorderedOverStageInput(NNodes::TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx,
139+
const TTypeAnnotationContext& typeAnnCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage);
140+
141+
131142
} // namespace NYql::NDq

ydb/library/yql/providers/dq/opt/logical_optimize.cpp

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
114114
, TypesCtx(*typeCtx)
115115
{
116116
#define HNDL(name) "DqsLogical-"#name, Hndl(&TDqsLogicalOptProposalTransformer::name)
117-
AddHandler(0, &TCoUnorderedBase::Match, HNDL(SkipUnordered));
118117
AddHandler(0, &TCoUnorderedBase::Match, HNDL(UnorderedOverDqReadWrap));
119118
AddHandler(0, &TCoExtractMembers::Match, HNDL(ExtractMembersOverDqReadWrap));
120119
AddHandler(0, &TCoCountBase::Match, HNDL(TakeOrSkipOverDqReadWrap));
@@ -146,15 +145,6 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
146145
}
147146

148147
protected:
149-
TMaybeNode<TExprBase> SkipUnordered(TExprBase node, TExprContext& ctx) {
150-
Y_UNUSED(ctx);
151-
const auto unordered = node.Cast<TCoUnorderedBase>();
152-
if (unordered.Input().Maybe<TDqConnection>()) {
153-
return unordered.Input();
154-
}
155-
return node;
156-
}
157-
158148
TMaybeNode<TExprBase> UnorderedOverDqReadWrap(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
159149
return NDq::UnorderedOverDqReadWrap(node, ctx, getParents, Config->EnableDqReplicate.Get().GetOrElse(TDqSettings::TDefault::EnableDqReplicate), TypesCtx);
160150
}
@@ -252,7 +242,7 @@ class TDqsLogicalOptProposalTransformer : public TOptimizeTransformerBase {
252242
return node;
253243
}
254244

255-
TDqLookupSourceWrap lookupSourceWrap = right.Maybe<TDqSourceWrap>()
245+
TDqLookupSourceWrap lookupSourceWrap = right.Maybe<TDqSourceWrap>()
256246
? LookupSourceFromSource(right.Cast<TDqSourceWrap>(), ctx)
257247
: LookupSourceFromRead(right.Cast<TDqReadWrap>(), ctx)
258248
;

ydb/library/yql/providers/dq/opt/physical_optimize.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
5252
AddHandler(0, &TCoLMap::Match, HNDL(PushLMapToStage<false>));
5353
AddHandler(0, &TCoOrderedLMap::Match, HNDL(BuildOrderedLMapOverMuxStage));
5454
AddHandler(0, &TCoLMap::Match, HNDL(BuildLMapOverMuxStage));
55+
AddHandler(0, &TCoUnorderedBase::Match, HNDL(PushUnorderedToStage<false>));
56+
AddHandler(0, &TDqStage::Match, HNDL(UnorderedOverStageInput<false>));
5557
if (enablePrecompute) {
5658
AddHandler(0, &TCoHasItems::Match, HNDL(BuildHasItems<false>));
5759
AddHandler(0, &TCoSqlIn::Match, HNDL(BuildSqlIn<false>));
@@ -80,6 +82,8 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
8082
AddHandler(1, &TCoAssumeSorted::Match, HNDL(BuildSortStage<true>));
8183
AddHandler(1, &TCoOrderedLMap::Match, HNDL(PushOrderedLMapToStage<true>));
8284
AddHandler(1, &TCoLMap::Match, HNDL(PushLMapToStage<true>));
85+
AddHandler(1, &TCoUnorderedBase::Match, HNDL(PushUnorderedToStage<true>));
86+
AddHandler(1, &TDqStage::Match, HNDL(UnorderedOverStageInput<true>));
8387
if (enablePrecompute) {
8488
AddHandler(1, &TCoHasItems::Match, HNDL(BuildHasItems<true>));
8589
AddHandler(1, &TCoSqlIn::Match, HNDL(BuildSqlIn<true>));
@@ -135,6 +139,17 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
135139
return DqBuildLMapOverMuxStage(node, ctx, optCtx, *getParents());
136140
}
137141

142+
template <bool IsGlobal>
143+
TMaybeNode<TExprBase> PushUnorderedToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
144+
return DqPushUnorderedToStage(node, ctx, optCtx, *getParents(), IsGlobal);
145+
}
146+
147+
template <bool IsGlobal>
148+
TMaybeNode<TExprBase> UnorderedOverStageInput(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
149+
return DqUnorderedOverStageInput(node, ctx, optCtx, *Types, *getParents(), IsGlobal);
150+
}
151+
152+
138153
template <bool IsGlobal>
139154
TMaybeNode<TExprBase> PushCombineToStage(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) {
140155
return DqPushCombineToStage(node, ctx, optCtx, *getParents(), IsGlobal);

0 commit comments

Comments
 (0)