Skip to content

Commit e2dfc63

Browse files
authored
Properly optimize unordered YtOutput over YtDqProcessWrite (#8411)
1 parent 2a8d528 commit e2dfc63

File tree

34 files changed

+8997
-8933
lines changed

34 files changed

+8997
-8933
lines changed

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_write.cpp

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -669,30 +669,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::YtDqWrite(TExprBase nod
669669
.Build().Done();
670670
}
671671

672-
const auto& items = GetSeqItemType(write.Input().Ref().GetTypeAnn())->Cast<TStructExprType>()->GetItems();
673-
auto expand = ctx.Builder(write.Pos())
674-
.Callable("ExpandMap")
675-
.Add(0, write.Input().Ptr())
676-
.Lambda(1)
677-
.Param("item")
678-
.Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
679-
ui32 i = 0U;
680-
for (const auto& item : items) {
681-
lambda.Callable(i++, "Member")
682-
.Arg(0, "item")
683-
.Atom(1, item->GetName())
684-
.Seal();
685-
}
686-
return lambda;
687-
})
688-
.Seal()
689-
.Seal().Build();
690-
691-
return Build<TCoDiscard>(ctx, write.Pos())
692-
.Input<TYtDqWideWrite>()
693-
.Input(std::move(expand))
694-
.Settings(write.Settings())
695-
.Build().Done();
672+
return node;
696673
}
697674

698675
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::Fill(TExprBase node, TExprContext& ctx) const {

ydb/library/yql/providers/yt/provider/yql_yt_peephole.cpp

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class TYtPeepholeTransformer : public TOptimizeTransformerBase {
2626
{
2727
#define HNDL(name) "Peephole-"#name, Hndl(&TYtPeepholeTransformer::name)
2828
AddHandler(0, &TYtLength::Match, HNDL(OptimizeLength));
29-
AddHandler(0, &TYtDqWideWrite::Match, HNDL(OptimizeYtDqWideWrite));
29+
AddHandler(0, &TYtDqWrite::Match, HNDL(OptimizeYtDqWrite));
3030
#undef HNDL
3131
}
3232

@@ -59,9 +59,37 @@ class TYtPeepholeTransformer : public TOptimizeTransformerBase {
5959
return node;
6060
}
6161

62-
TMaybeNode<TExprBase> OptimizeYtDqWideWrite(TExprBase node, TExprContext& ctx) {
62+
TMaybeNode<TExprBase> OptimizeYtDqWrite(TExprBase node, TExprContext& ctx) {
63+
const auto write = node.Cast<TYtDqWrite>();
64+
65+
const auto& items = GetSeqItemType(write.Input().Ref().GetTypeAnn())->Cast<TStructExprType>()->GetItems();
66+
auto expand = ctx.Builder(write.Pos())
67+
.Callable("ExpandMap")
68+
.Add(0, write.Input().Ptr())
69+
.Lambda(1)
70+
.Param("item")
71+
.Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
72+
ui32 i = 0U;
73+
for (const auto& item : items) {
74+
lambda.Callable(i++, "Member")
75+
.Arg(0, "item")
76+
.Atom(1, item->GetName())
77+
.Seal();
78+
}
79+
return lambda;
80+
})
81+
.Seal()
82+
.Seal().Build();
83+
84+
TYtDqWideWrite wideWrite = Build<TYtDqWideWrite>(ctx, write.Pos())
85+
.Input(std::move(expand))
86+
.Settings(write.Settings())
87+
.Done();
88+
6389
if (Settings_.empty()) {
64-
return node;
90+
return Build<TCoDiscard>(ctx, write.Pos())
91+
.Input(wideWrite)
92+
.Done();
6593
}
6694

6795
auto cluster = Settings_.at("yt_cluster");
@@ -73,11 +101,7 @@ class TYtPeepholeTransformer : public TOptimizeTransformerBase {
73101
auto outSpec = Settings_.at("yt_outSpec");
74102
auto tx = Settings_.at("yt_tx");
75103

76-
// Check that we optimize only single YtDqWideWrite
77-
if (auto setting = GetSetting(TYtDqWideWrite(&node.Ref()).Settings().Ref(), "table")) {
78-
YQL_ENSURE(setting->Child(1)->Content() == table);
79-
return node;
80-
}
104+
YQL_ENSURE(!HasSetting(wideWrite.Settings().Ref(), "table"));
81105

82106
TMaybeNode<TCoSecureParam> secParams;
83107
if (State_->Configuration->Auth.Get().GetOrElse(TString())) {
@@ -119,7 +143,9 @@ class TYtPeepholeTransformer : public TOptimizeTransformerBase {
119143
.Build()
120144
.Done().Ptr();
121145

122-
return ctx.ChangeChild(node.Ref(), TYtDqWideWrite::idx_Settings, std::move(settings));
146+
return Build<TCoDiscard>(ctx, write.Pos())
147+
.Input(ctx.ChangeChild(wideWrite.Ref(), TYtDqWideWrite::idx_Settings, std::move(settings)))
148+
.Done();
123149
}
124150

125151
const TYtState::TPtr State_;

ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
991991
}
992992

993993
bool isFill = false;
994+
bool isYtDqProcessWrite = false;
994995
int lambdaIdx = -1;
995996
TExprNode::TPtr lambda;
996997
if (TYtMap::Match(&node)) {
@@ -1000,6 +1001,9 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
10001001
} else if (TYtFill::Match(&node)) {
10011002
lambdaIdx = TYtFill::idx_Content;
10021003
isFill = true;
1004+
} else if (TYtDqProcessWrite::Match(&node)) {
1005+
lambdaIdx = TYtDqProcessWrite::idx_Input;
1006+
isYtDqProcessWrite = true;
10031007
}
10041008
if (-1 != lambdaIdx && !hasOtherSortedOuts) {
10051009
if (isFill) {
@@ -1013,20 +1017,46 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
10131017
.Build()
10141018
.Done().Ptr();
10151019
}
1020+
} else if (isYtDqProcessWrite) {
1021+
TProcessedNodesSet processedNodes;
1022+
TNodeOnNodeOwnedMap remaps;
1023+
VisitExpr(node.ChildPtr(lambdaIdx), [&processedNodes, &remaps, &ctx](const TExprNode::TPtr& n) {
1024+
if (TYtOutput::Match(n.Get())) {
1025+
// Stop traversing dependent operations
1026+
processedNodes.insert(n->UniqueId());
1027+
return false;
1028+
}
1029+
if (TYtDqWrite::Match(n.Get())) {
1030+
auto newInput = Build<TCoUnordered>(ctx, n->Pos())
1031+
.Input(n->ChildPtr(TYtDqWrite::idx_Input))
1032+
.Done();
1033+
remaps[n.Get()] = ctx.ChangeChild(*n, TYtDqWrite::idx_Input, newInput.Ptr());
1034+
}
1035+
return true;
1036+
});
1037+
if (!remaps.empty()) {
1038+
TOptimizeExprSettings settings{State_->Types};
1039+
settings.ProcessedNodes = &processedNodes;
1040+
auto status = RemapExpr(node.ChildPtr(lambdaIdx), lambda, remaps, ctx, settings);
1041+
if (status.Level == IGraphTransformer::TStatus::Error) {
1042+
return {};
1043+
}
1044+
}
1045+
10161046
} else {
10171047
TProcessedNodesSet processedNodes;
10181048
TNodeOnNodeOwnedMap remaps;
1019-
VisitExpr(node.ChildPtr(lambdaIdx), [&processedNodes, &remaps, &ctx](const TExprNode::TPtr& node) {
1020-
if (TYtOutput::Match(node.Get())) {
1049+
VisitExpr(node.ChildPtr(lambdaIdx), [&processedNodes, &remaps, &ctx](const TExprNode::TPtr& n) {
1050+
if (TYtOutput::Match(n.Get())) {
10211051
// Stop traversing dependent operations
1022-
processedNodes.insert(node->UniqueId());
1052+
processedNodes.insert(n->UniqueId());
10231053
return false;
10241054
}
1025-
auto name = node->Content();
1026-
if (node->IsCallable() && node->ChildrenSize() > 0 && name.SkipPrefix("Ordered")) {
1027-
const auto inputKind = node->Child(0)->GetTypeAnn()->GetKind();
1055+
auto name = n->Content();
1056+
if (n->IsCallable() && n->ChildrenSize() > 0 && name.SkipPrefix("Ordered")) {
1057+
const auto inputKind = n->Child(0)->GetTypeAnn()->GetKind();
10281058
if (inputKind == ETypeAnnotationKind::Stream || inputKind == ETypeAnnotationKind::Flow) {
1029-
remaps[node.Get()] = ctx.RenameNode(*node, name);
1059+
remaps[n.Get()] = ctx.RenameNode(*n, name);
10301060
}
10311061
}
10321062
return true;
@@ -1052,17 +1082,13 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
10521082
}
10531083

10541084
if (lambdaIdx != -1 && AnyOf(filterColumns, [](const TExprNode::TPtr& p) { return !!p; })) {
1055-
if (!lambda) {
1056-
lambda = node.ChildPtr(lambdaIdx);
1057-
}
1085+
1086+
TExprNode::TPtr extractLambda;
10581087
if (op.Output().Size() == 1) {
1059-
lambda = Build<TCoLambda>(ctx, lambda->Pos())
1088+
extractLambda = Build<TCoLambda>(ctx, lambda->Pos())
10601089
.Args({"stream"})
10611090
.Body<TCoExtractMembers>()
1062-
.Input<TExprApplier>()
1063-
.Apply(TCoLambda(lambda))
1064-
.With(0, "stream")
1065-
.Build()
1091+
.Input("stream")
10661092
.Members(filterColumns[0])
10671093
.Build()
10681094
.Done().Ptr();
@@ -1103,14 +1129,11 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
11031129
}
11041130
}
11051131

1106-
lambda = Build<TCoLambda>(ctx, lambda->Pos())
1132+
extractLambda = Build<TCoLambda>(ctx, lambda->Pos())
11071133
.Args({"stream"})
11081134
.Body<TCoFlatMapBase>()
11091135
.CallableName(hasOtherSortedOuts ? TCoOrderedFlatMap::CallableName() : TCoFlatMap::CallableName())
1110-
.Input<TExprApplier>()
1111-
.Apply(TCoLambda(lambda))
1112-
.With(0, "stream")
1113-
.Build()
1136+
.Input("stream")
11141137
.Lambda()
11151138
.Args({"var"})
11161139
.Body<TCoJust>()
@@ -1125,6 +1148,44 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
11251148
.Build()
11261149
.Done().Ptr();
11271150
}
1151+
1152+
1153+
if (!lambda) {
1154+
lambda = node.ChildPtr(lambdaIdx);
1155+
}
1156+
if (isYtDqProcessWrite) {
1157+
TProcessedNodesSet processedNodes;
1158+
TNodeOnNodeOwnedMap remaps;
1159+
VisitExpr(lambda, [&processedNodes, &remaps, extractLambda, &ctx](const TExprNode::TPtr& n) {
1160+
if (TYtOutput::Match(n.Get())) {
1161+
// Stop traversing dependent operations
1162+
processedNodes.insert(n->UniqueId());
1163+
return false;
1164+
}
1165+
if (auto dqWrite = TMaybeNode<TYtDqWrite>(n)) {
1166+
auto newWrite = Build<TYtDqWrite>(ctx, n->Pos())
1167+
.InitFrom(dqWrite.Cast())
1168+
.Input<TExprApplier>()
1169+
.Apply(TCoLambda(extractLambda))
1170+
.With(0, dqWrite.Cast().Input())
1171+
.Build()
1172+
.Done();
1173+
remaps[n.Get()] = newWrite.Ptr();
1174+
}
1175+
return true;
1176+
});
1177+
if (!remaps.empty()) {
1178+
TOptimizeExprSettings settings{State_->Types};
1179+
settings.ProcessedNodes = &processedNodes;
1180+
auto status = RemapExpr(lambda, lambda, remaps, ctx, settings);
1181+
if (status.Level == IGraphTransformer::TStatus::Error) {
1182+
return {};
1183+
}
1184+
}
1185+
1186+
} else {
1187+
lambda = ctx.FuseLambdas(*extractLambda, *lambda);
1188+
}
11281189
}
11291190
}
11301191

0 commit comments

Comments
 (0)