@@ -75,11 +75,22 @@ void FilterPushdownWithMultiusage(const TExprNode::TPtr& node, TNodeOnNodeOwnedM
75
75
Sort (immediateParents, [](const TExprNode* left, const TExprNode* right) { return CompareNodes (*left, *right) < 0 ; });
76
76
}
77
77
78
- TVector<const TExprNode*> parentFilters;
79
- TExprNodeList parentFilterLambdas;
80
- TExprNodeList parentValueLambdas;
81
- size_t likelyCount = 0 ;
82
- for (auto parent : immediateParents) {
78
// Per-consumer bookkeeping: one entry is appended for every immediate parent
// whose flat map lambda body is a conditional value and passes the early-return checks.
+ struct TConsumerInfo {
79
// The flat map node this entry was built from. Used as the stop point when
// re-creating the chain of nodes above the new common parent, and as the key in toOptimize.
+ const TExprNode* OriginalFlatMap = nullptr ;
80
// Type annotation of the flat map lambda's first argument. Used as the
// CastStruct target before applying PushdownLambda and ValueLambda.
+ const TTypeAnnotationNode* OriginalRowType = nullptr ;
81
// Predicate kept in the consumer. When something is pushed down, this is an And over
// the direct Member-of-argument predicates plus Member(arg, ColumnName);
// otherwise it wraps the original predicate unchanged.
+ TExprNode::TPtr FilterLambda;
82
// Lambda over the same argument that wraps the value of the conditional.
+ TExprNode::TPtr ValueLambda;
83
// And over the predicates that are not direct Member reads of the lambda argument.
// Left null when there is nothing to push down; callers test it for null.
+ TExprNode::TPtr PushdownLambda;
84
// Generated no-clash column name (genColumnNames[i]) under which the result of
// PushdownLambda is added to the row via AddMember.
+ TString ColumnName;
85
+ };
86
+
87
+ TVector<TConsumerInfo> consumers;
88
+ bool hasOrdered = false ;
89
+ size_t pushdownCount = 0 ;
90
+ const auto inputStructType = node->GetTypeAnn ()->Cast <TListExprType>()->GetItemType ()->Cast <TStructExprType>();
91
+ const auto genColumnNames = GenNoClashColumns (*inputStructType, " _yql_filter_pushdown" , immediateParents.size ());
92
+ for (size_t i = 0 ; i < immediateParents.size (); ++i) {
93
+ const TExprNode* parent = immediateParents[i];
83
94
while (skipNodes.contains (parent->Content ())) {
84
95
auto newParent = optCtx.GetParentIfSingle (*parent);
85
96
if (newParent) {
@@ -92,60 +103,112 @@ void FilterPushdownWithMultiusage(const TExprNode::TPtr& node, TNodeOnNodeOwnedM
92
103
return ;
93
104
}
94
105
106
+ if (TCoOrderedFlatMap::Match (parent)) {
107
+ hasOrdered = true ;
108
+ }
109
+
95
110
TCoFlatMapBase parentFlatMap (parent);
96
111
if (auto cond = parentFlatMap.Lambda ().Body ().Maybe <TCoConditionalValueBase>()) {
97
- if (IsDepended (parentFlatMap.Lambda ().Ref (), *node)) {
112
+ const TCoArgument lambdaArg = parentFlatMap.Lambda ().Args ().Arg (0 );
113
+ auto pred = cond.Cast ().Predicate ();
114
+ if (pred.Maybe <TCoLikely>() ||
115
+ (pred.Maybe <TCoAnd>() && AnyOf (pred.Ref ().ChildrenList (), [](const auto & p) { return p->IsCallable (" Likely" ); })) ||
116
+ !IsStrict (pred.Ptr ()) ||
117
+ HasDependsOn (pred.Ptr (), lambdaArg.Ptr ()) ||
118
+ IsDepended (parentFlatMap.Lambda ().Ref (), *node))
119
+ {
98
120
return ;
99
121
}
100
- likelyCount += bool (cond.Cast ().Predicate ().Maybe <TCoLikely>());
101
- auto pos = cond.Cast ().Predicate ().Pos ();
102
- parentFilterLambdas.push_back (ctx.NewLambda (pos,
103
- ctx.NewArguments (pos, { parentFlatMap.Lambda ().Args ().Arg (0 ).Ptr () }),
104
- cond.Cast ().Predicate ().Ptr ()));
105
- parentValueLambdas.push_back (ctx.NewLambda (pos,
106
- ctx.NewArguments (pos, { parentFlatMap.Lambda ().Args ().Arg (0 ).Ptr () }),
107
- cond.Cast ().Value ().Ptr ()));
108
- parentFilters.push_back (parent);
122
+
123
+ TExprNodeList andPredicates;
124
+ if (pred.Maybe <TCoAnd>()) {
125
+ andPredicates = pred.Ref ().ChildrenList ();
126
+ } else {
127
+ andPredicates.push_back (pred.Ptr ());
128
+ }
129
+
130
+ TExprNodeList pushdownPreds;
131
+ TExprNodeList restPreds;
132
+ for (auto & p : andPredicates) {
133
+ if (TCoMember::Match (p.Get ()) && p->Child (0 ) == lambdaArg.Raw ()) {
134
+ restPreds.push_back (p);
135
+ } else {
136
+ pushdownPreds.push_back (p);
137
+ }
138
+ }
139
+
140
+ const TPositionHandle pos = pred.Pos ();
141
+ consumers.emplace_back ();
142
+ TConsumerInfo& consumer = consumers.back ();
143
+ consumer.OriginalFlatMap = parent;
144
+ consumer.OriginalRowType = lambdaArg.Ref ().GetTypeAnn ();
145
+ consumer.ColumnName = genColumnNames[i];
146
+ if (!pushdownPreds.empty ()) {
147
+ ++pushdownCount;
148
+ restPreds.push_back (
149
+ ctx.Builder (pos)
150
+ .Callable (" Member" )
151
+ .Add (0 , lambdaArg.Ptr ())
152
+ .Atom (1 , consumer.ColumnName )
153
+ .Seal ()
154
+ .Build ());
155
+ auto restPred = ctx.NewCallable (pos, " And" , std::move (restPreds));
156
+ auto pushdownPred = ctx.NewCallable (pos, " And" , std::move (pushdownPreds));
157
+
158
+ consumer.FilterLambda = ctx.NewLambda (pos, ctx.NewArguments (pos, { lambdaArg.Ptr () }), std::move (restPred));
159
+ consumer.PushdownLambda = ctx.NewLambda (pos, ctx.NewArguments (pos, { lambdaArg.Ptr () }), std::move (pushdownPred));
160
+ } else {
161
+ consumer.FilterLambda = ctx.NewLambda (pos, ctx.NewArguments (pos, { lambdaArg.Ptr () }), pred.Ptr ());
162
+ }
163
+ consumer.ValueLambda = ctx.NewLambda (pos, ctx.NewArguments (pos, { lambdaArg.Ptr () }), cond.Cast ().Value ().Ptr ());
109
164
} else {
110
165
return ;
111
166
}
112
167
}
113
- YQL_ENSURE (parentFilterLambdas. size () > 1 );
114
- if (likelyCount == parentFilters. size () ) {
168
+
169
+ if (!pushdownCount ) {
115
170
return ;
116
171
}
117
172
118
- YQL_CLOG (DEBUG, Core) << " Pushdown " << parentFilters .size () << " filters to common parent " << node->Content ();
173
+ YQL_CLOG (DEBUG, Core) << " Pushdown predicate from " << pushdownCount << " filters (out of total " << consumers .size () << " ) to common parent " << node->Content ();
119
174
120
- const auto inputStructType = node-> GetTypeAnn ()-> Cast <TListExprType>()-> GetItemType ()-> Cast <TStructExprType>( );
121
- const auto genColumnNames = GenNoClashColumns (*inputStructType, " _yql_filter_pushdown " , parentFilterLambdas .size ());
175
+ YQL_ENSURE (consumers. size () > 1 );
176
+ YQL_ENSURE (consumers. size () == immediateParents .size ());
122
177
123
178
TExprNode::TPtr mapArg = ctx.NewArgument (node->Pos (), " row" );
124
179
TExprNode::TPtr mapBody = mapArg;
125
180
TExprNode::TPtr filterArg = ctx.NewArgument (node->Pos (), " row" );
126
181
TExprNodeList filterPreds;
127
- for (size_t i = 0 ; i < parentFilterLambdas.size (); ++i) {
128
- TString memberName = genColumnNames[i];
129
- mapBody = ctx.Builder (mapBody->Pos ())
130
- .Callable (" AddMember" )
131
- .Add (0 , mapBody)
132
- .Atom (1 , memberName)
133
- .Apply (2 , parentFilterLambdas[i])
134
- .With (0 , mapArg)
182
+ for (size_t i = 0 ; i < consumers.size (); ++i) {
183
+ const TConsumerInfo& consumer = consumers[i];
184
+ if (consumer.PushdownLambda ) {
185
+ mapBody = ctx.Builder (mapBody->Pos ())
186
+ .Callable (" AddMember" )
187
+ .Add (0 , mapBody)
188
+ .Atom (1 , consumer.ColumnName )
189
+ .Apply (2 , consumer.PushdownLambda )
190
+ .With (0 )
191
+ .Callable (" CastStruct" )
192
+ .Add (0 , mapArg)
193
+ .Add (1 , ExpandType (mapArg->Pos (), *consumer.OriginalRowType , ctx))
194
+ .Seal ()
195
+ .Done ()
196
+ .Seal ()
135
197
.Seal ()
136
- .Seal ()
137
- .Build ();
198
+ .Build ();
199
+ }
200
+
138
201
filterPreds.push_back (ctx.Builder (node->Pos ())
139
- .Callable ( " Member " )
140
- . Add ( 0 , filterArg)
141
- .Atom ( 1 , memberName )
202
+ .Apply (consumer. FilterLambda )
203
+ // CastStruct is not needed here, since FilterLambda is AND over column references
204
+ .With ( 0 , filterArg )
142
205
.Seal ()
143
206
.Build ());
144
207
}
145
208
146
209
auto newNode = ctx.Builder (node->Pos ())
147
- .Callable (" OrderedFilter" )
148
- .Callable (0 , " OrderedMap" )
210
+ .Callable (hasOrdered ? " OrderedFilter" : " Filter " )
211
+ .Callable (0 , hasOrdered ? " OrderedMap" : " Map " )
149
212
.Add (0 , node)
150
213
.Add (1 , ctx.NewLambda (node->Pos (), ctx.NewArguments (node->Pos (), { mapArg }), std::move (mapBody)))
151
214
.Seal ()
@@ -156,12 +219,15 @@ void FilterPushdownWithMultiusage(const TExprNode::TPtr& node, TNodeOnNodeOwnedM
156
219
for (size_t i = 0 ; i < immediateParents.size (); ++i) {
157
220
const TExprNode* curr = immediateParents[i];
158
221
TExprNode::TPtr resultNode = newNode;
159
- while (curr != parentFilters[i]) {
222
+ const TConsumerInfo& consumer = consumers[i];
223
+ while (curr != consumer.OriginalFlatMap ) {
160
224
if (curr->IsCallable (" AssumeColumnOrder" )) {
161
225
resultNode = ctx.ChangeChild (*ctx.RenameNode (*curr, " AssumeColumnOrderPartial" ), 0 , std::move (resultNode));
162
226
} else if (curr->IsCallable (" ExtractMembers" )) {
163
227
TExprNodeList columns = curr->Child (1 )->ChildrenList ();
164
- columns.push_back (ctx.NewAtom (curr->Child (1 )->Pos (), genColumnNames[i]));
228
+ if (consumer.PushdownLambda ) {
229
+ columns.push_back (ctx.NewAtom (curr->Child (1 )->Pos (), consumer.ColumnName ));
230
+ }
165
231
resultNode = ctx.ChangeChildren (*curr, { resultNode, ctx.NewList (curr->Child (1 )->Pos (), std::move (columns)) });
166
232
} else {
167
233
resultNode = ctx.ChangeChild (*curr, 0 , std::move (resultNode));
@@ -173,24 +239,20 @@ void FilterPushdownWithMultiusage(const TExprNode::TPtr& node, TNodeOnNodeOwnedM
173
239
TCoFlatMapBase flatMap (curr);
174
240
TCoConditionalValueBase cond = flatMap.Lambda ().Body ().Cast <TCoConditionalValueBase>();
175
241
TExprNode::TPtr input = flatMap.Input ().Ptr ();
176
- const TTypeAnnotationNode* originalType = input->GetTypeAnn ()->Cast <TListExprType>()->GetItemType ();
177
- toOptimize[parentFilters[i]] = ctx.Builder (curr->Pos ())
242
+ toOptimize[consumer.OriginalFlatMap ] = ctx.Builder (curr->Pos ())
178
243
.Callable (flatMap.CallableName ())
179
244
.Add (0 , resultNode)
180
245
.Lambda (1 )
181
246
.Param (" row" )
182
247
.Callable (cond.CallableName ())
183
- .Callable (0 , " Likely" )
184
- .Callable (0 , " Member" )
185
- .Arg (0 , " row" )
186
- .Atom (1 , genColumnNames[i])
187
- .Seal ()
248
+ .Apply (0 , consumer.FilterLambda )
249
+ .With (0 , " row" )
188
250
.Seal ()
189
- .Apply (1 , parentValueLambdas[i] )
251
+ .Apply (1 , consumer. ValueLambda )
190
252
.With (0 )
191
253
.Callable (" CastStruct" )
192
254
.Arg (0 , " row" )
193
- .Add (1 , ExpandType (curr->Pos (), *originalType , ctx))
255
+ .Add (1 , ExpandType (curr->Pos (), *consumer. OriginalRowType , ctx))
194
256
.Seal ()
195
257
.Done ()
196
258
.Seal ()
0 commit comments