Skip to content

Commit 3a5cfed

Browse files
authored
Do not use OlapApply for known functions pushdown (#15055)
1 parent 80dee28 commit 3a5cfed

File tree

6 files changed

+205
-201
lines changed

6 files changed

+205
-201
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy_olap_filter.cpp

Lines changed: 112 additions & 94 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,32 @@ static const std::unordered_set<std::string> SecondLevelFilters = {
2424
"ends_with"
2525
};
2626

27+
// Folds a list of predicate expressions into a single conjunction.
//   - empty list:   a literal `true` when trueForEmpty is set, otherwise an empty TMaybeNode;
//   - one element:  that element itself, with no AND wrapper around it;
//   - two or more:  a TKqpOlapAnd node when useOlapAnd is set, otherwise a TCoAnd node.
static TMaybeNode<TExprBase> CombinePredicatesWithAnd(const TVector<TExprBase>& conjuncts, TExprContext& ctx, TPositionHandle pos, bool useOlapAnd, bool trueForEmpty) {
    // Nothing to combine: the caller decides whether "no predicates" means `true` or "absent".
    if (conjuncts.empty()) {
        if (trueForEmpty) {
            return TMaybeNode<TExprBase>{MakeBool<true>(pos, ctx)};
        }
        return TMaybeNode<TExprBase>{};
    }

    // A lone predicate is returned as is.
    if (conjuncts.size() == 1) {
        return conjuncts[0];
    }

    // Several predicates: pick the AND flavour requested by the caller.
    if (!useOlapAnd) {
        return Build<TCoAnd>(ctx, pos)
            .Add(conjuncts)
            .Done();
    }
    return Build<TKqpOlapAnd>(ctx, pos)
        .Add(conjuncts)
        .Done();
}
44+
45+
// Convenience overload for predicate-tree nodes: takes the ExprNode of every
// TOLAPPredicateNode and delegates to the TVector<TExprBase> overload with the
// same useOlapAnd / trueForEmpty flags.
static TMaybeNode<TExprBase> CombinePredicatesWithAnd(const TVector<TOLAPPredicateNode>& conjuncts, TExprContext& ctx, TPositionHandle pos, bool useOlapAnd, bool trueForEmpty) {
    TVector<TExprBase> exprs;
    // The final size is known up front, so allocate once instead of growing on each emplace_back.
    exprs.reserve(conjuncts.size());
    for (const auto& conjunct : conjuncts) {
        exprs.emplace_back(conjunct.ExprNode);
    }
    return CombinePredicatesWithAnd(exprs, ctx, pos, useOlapAnd, trueForEmpty);
}
52+
2753
struct TFilterOpsLevels {
2854
TFilterOpsLevels(const TMaybeNode<TExprBase>& firstLevel, const TMaybeNode<TExprBase>& secondLevel)
2955
: FirstLevelOps(firstLevel)
@@ -69,6 +95,23 @@ struct TFilterOpsLevels {
6995
}
7096

7197

98+
// Merges several per-predicate pushdown results into one TFilterOpsLevels.
// Valid FirstLevelOps and valid SecondLevelOps are gathered into separate lists;
// each list is then joined by CombinePredicatesWithAnd with useOlapAnd = true and
// trueForEmpty = false, so a level for which no input had a valid op stays empty.
// The input is taken by const reference: it is only read, so no copy is needed.
static TFilterOpsLevels Merge(const TVector<TFilterOpsLevels>& predicates, TExprContext& ctx, TPositionHandle pos) {
    TVector<TExprBase> predicatesFirstLevel;
    TVector<TExprBase> predicatesSecondLevel;
    for (const auto& p : predicates) {
        if (p.FirstLevelOps.IsValid()) {
            predicatesFirstLevel.emplace_back(p.FirstLevelOps.Cast());
        }
        if (p.SecondLevelOps.IsValid()) {
            predicatesSecondLevel.emplace_back(p.SecondLevelOps.Cast());
        }
    }
    return {
        CombinePredicatesWithAnd(predicatesFirstLevel, ctx, pos, true, false),
        CombinePredicatesWithAnd(predicatesSecondLevel, ctx, pos, true, false),
    };
}
114+
72115
TMaybeNode<TExprBase> FirstLevelOps;
73116
TMaybeNode<TExprBase> SecondLevelOps;
74117
};
@@ -262,7 +305,6 @@ TMaybeNode<TExprBase> SafeCastPredicatePushdown(const TCoFlatMap& inputFlatmap,
262305

263306
std::vector<TExprBase> ConvertComparisonNode(const TExprBase& nodeIn, const TExprNode& argument, TExprContext& ctx, TPositionHandle pos)
264307
{
265-
std::vector<TExprBase> out;
266308
const auto convertNode = [&ctx, &pos, &argument](const TExprBase& node) -> TMaybeNode<TExprBase> {
267309
if (node.Maybe<TCoNull>()) {
268310
return node;
@@ -368,36 +410,27 @@ std::vector<TExprBase> ConvertComparisonNode(const TExprBase& nodeIn, const TExp
368410
}
369411
};
370412

371-
// Columns & values may be single element
372-
TMaybeNode<TExprBase> node = convertNode(nodeIn);
373-
374-
if (node.IsValid()) {
375-
out.emplace_back(std::move(node.Cast()));
376-
return out;
377-
}
378-
379-
// Or columns and values can be Tuple
380-
if (!nodeIn.Maybe<TExprList>()) {
381-
// something unusual found, return empty vector
382-
return out;
383-
}
413+
if (const auto& list = nodeIn.Maybe<TExprList>()) {
414+
const auto& tuple = list.Cast();
415+
std::vector<TExprBase> out;
384416

385-
auto tuple = nodeIn.Cast<TExprList>();
417+
out.reserve(tuple.Size());
418+
for (ui32 i = 0; i < tuple.Size(); ++i) {
419+
TMaybeNode<TExprBase> node = convertNode(tuple.Item(i));
386420

387-
out.reserve(tuple.Size());
388-
389-
for (ui32 i = 0; i < tuple.Size(); ++i) {
390-
TMaybeNode<TExprBase> node = convertNode(tuple.Item(i));
421+
if (!node.IsValid()) {
422+
// Return empty vector
423+
return TVector<TExprBase>();
424+
}
391425

392-
if (!node.IsValid()) {
393-
// Return empty vector
394-
return TVector<TExprBase>();
426+
out.emplace_back(node.Cast());
395427
}
396-
397-
out.emplace_back(node.Cast());
428+
return out;
429+
} else if (const auto& node = convertNode(nodeIn); node.IsValid()) {
430+
return {node.Cast()};
431+
} else {
432+
return {};
398433
}
399-
400-
return out;
401434
}
402435

403436
TExprBase BuildOneElementComparison(const std::pair<TExprBase, TExprBase>& parameter, const TCoCompare& predicate,
@@ -663,50 +696,22 @@ TFilterOpsLevels PredicatePushdown(const TExprBase& predicate, const TExprNode&
663696
return YqlApplyPushdown(predicate, argument, ctx);
664697
}
665698

666-
TOLAPPredicateNode WrapPredicates(const std::vector<TOLAPPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
667-
if (predicates.empty()) {
668-
return {};
669-
}
670-
671-
if (const auto predicatesSize = predicates.size(); 1U == predicatesSize) {
672-
return predicates.front();
673-
} else {
674-
TOLAPPredicateNode result;
675-
result.Children = predicates;
676-
result.CanBePushed = true;
677-
678-
TVector<NNodes::TExprBase> exprNodes;
679-
exprNodes.reserve(predicatesSize);
680-
for (const auto& pred : predicates) {
681-
exprNodes.emplace_back(pred.ExprNode);
682-
result.CanBePushed &= pred.CanBePushed;
683-
}
684-
result.ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
685-
.Add(exprNodes)
686-
.Done().Ptr();
687-
return result;
688-
}
689-
}
690-
691-
void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredicateNode& predicatesToPush, TOLAPPredicateNode& remainingPredicates,
692-
TExprContext& ctx, TPositionHandle pos)
699+
std::pair<TVector<TOLAPPredicateNode>, TVector<TOLAPPredicateNode>> SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree)
693700
{
694701
if (predicateTree.CanBePushed) {
695-
predicatesToPush = predicateTree;
696-
remainingPredicates.ExprNode = MakeBool<true>(pos, ctx);
697-
return;
702+
return {{predicateTree}, {}};
698703
}
699704

700705
if (!TCoAnd::Match(predicateTree.ExprNode.Get())) {
701706
// We can partially pushdown predicates from AND operator only.
702707
// For OR operator we would need to have several read operators which is not acceptable.
703708
// TODO: Add support for NOT(op1 OR op2), because it expands to (!op1 AND !op2).
704-
remainingPredicates = predicateTree;
705-
return;
709+
return {{}, {predicateTree}};
706710
}
707711

708712
bool isFoundNotStrictOp = false;
709-
std::vector<TOLAPPredicateNode> pushable, remaining;
713+
TVector<TOLAPPredicateNode> pushable;
714+
TVector<TOLAPPredicateNode> remaining;
710715
for (const auto& predicate : predicateTree.Children) {
711716
if (predicate.CanBePushed && !isFoundNotStrictOp) {
712717
pushable.emplace_back(predicate);
@@ -717,8 +722,7 @@ void SplitForPartialPushdown(const TOLAPPredicateNode& predicateTree, TOLAPPredi
717722
remaining.emplace_back(predicate);
718723
}
719724
}
720-
predicatesToPush = WrapPredicates(pushable, ctx, pos);
721-
remainingPredicates = WrapPredicates(remaining, ctx, pos);
725+
return {pushable, remaining};
722726
}
723727

724728
} // anonymous namespace end
@@ -742,6 +746,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
742746
}
743747

744748
const auto& lambda = flatmap.Lambda();
749+
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
745750

746751
YQL_CLOG(TRACE, ProviderKqp) << "Initial OLAP lambda: " << KqpExprToPrettyString(lambda, ctx);
747752

@@ -754,55 +759,68 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
754759
auto predicate = optionaIf.Predicate();
755760
auto value = optionaIf.Value();
756761

757-
if constexpr (NSsa::RuntimeVersion >= 5U) {
758-
TExprNode::TPtr afterPeephole;
759-
bool hasNonDeterministicFunctions;
760-
if (const auto status = PeepHoleOptimizeNode(optionaIf.Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
761-
status != IGraphTransformer::TStatus::Ok) {
762-
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
763-
return node;
764-
}
765-
766-
const TCoIf simplified(std::move(afterPeephole));
767-
predicate = simplified.Predicate();
768-
value = simplified.ThenValue().Cast<TCoJust>().Input();
769-
}
770-
771762
TOLAPPredicateNode predicateTree;
772763
predicateTree.ExprNode = predicate.Ptr();
773-
const auto& lambdaArg = lambda.Args().Arg(0).Ref();
774-
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body());
764+
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body(), false);
775765
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
776766

777-
TOLAPPredicateNode predicatesToPush, remainingPredicates;
778-
SplitForPartialPushdown(predicateTree, predicatesToPush, remainingPredicates, ctx, node.Pos());
779-
if (!predicatesToPush.IsValid()) {
780-
return node;
767+
auto [pushable, remaining] = SplitForPartialPushdown(predicateTree);
768+
TVector<TFilterOpsLevels> pushedPredicates;
769+
for (const auto& p: pushable) {
770+
pushedPredicates.emplace_back(PredicatePushdown(TExprBase(p.ExprNode), lambdaArg, ctx, node.Pos()));
781771
}
782772

783-
YQL_ENSURE(predicatesToPush.IsValid(), "Predicates to push is invalid");
784-
YQL_ENSURE(remainingPredicates.IsValid(), "Remaining predicates is invalid");
785-
786-
const auto pushedFilters = PredicatePushdown(TExprBase(predicatesToPush.ExprNode), lambdaArg, ctx, node.Pos());
787-
// Temporary fix for https://st.yandex-team.ru/KIKIMR-22560
788-
// YQL_ENSURE(pushedFilters.IsValid(), "Pushed predicate should be always valid!");
789-
790-
if (!pushedFilters.IsValid()) {
773+
if constexpr (NSsa::RuntimeVersion >= 5U) {
774+
TVector<TOLAPPredicateNode> remainingAfterApply;
775+
for(const auto& p: remaining) {
776+
const auto recoveredOptinalIfForNonPushedDownPredicates = Build<TCoOptionalIf>(ctx, node.Pos())
777+
.Predicate(p.ExprNode)
778+
.Value(value)
779+
.Build();
780+
TExprNode::TPtr afterPeephole;
781+
bool hasNonDeterministicFunctions;
782+
if (const auto status = PeepHoleOptimizeNode(recoveredOptinalIfForNonPushedDownPredicates.Value().Ptr(), afterPeephole, ctx, typesCtx, nullptr, hasNonDeterministicFunctions);
783+
status != IGraphTransformer::TStatus::Ok) {
784+
YQL_CLOG(ERROR, ProviderKqp) << "Peephole OLAP failed." << Endl << ctx.IssueManager.GetIssues().ToString();
785+
return node;
786+
}
787+
const TCoIf simplified(std::move(afterPeephole));
788+
predicate = simplified.Predicate();
789+
value = simplified.ThenValue().Cast<TCoJust>().Input();
790+
791+
TOLAPPredicateNode predicateTree;
792+
predicateTree.ExprNode = predicate.Ptr();
793+
CollectPredicates(predicate, predicateTree, &lambdaArg, read.Process().Body(), true);
794+
YQL_ENSURE(predicateTree.IsValid(), "Collected OLAP predicates are invalid");
795+
auto [pushableWithApply, remaining] = SplitForPartialPushdown(predicateTree);
796+
for (const auto& p: pushableWithApply) {
797+
pushedPredicates.emplace_back(PredicatePushdown(TExprBase(p.ExprNode), lambdaArg, ctx, node.Pos()));
798+
}
799+
remainingAfterApply.insert(remainingAfterApply.end(), remaining.begin(), remaining.end());
800+
}
801+
remaining = std::move(remainingAfterApply);
802+
}
803+
804+
if (pushedPredicates.empty()) {
791805
return node;
792806
}
793807

808+
const auto& pushedFilter = TFilterOpsLevels::Merge(pushedPredicates, ctx, node.Pos());
809+
810+
const auto remainingFilter = CombinePredicatesWithAnd(remaining, ctx, node.Pos(), false, true);
811+
794812
TMaybeNode<TExprBase> olapFilter;
795-
if (pushedFilters.FirstLevelOps.IsValid()) {
813+
if (pushedFilter.FirstLevelOps.IsValid()) {
796814
olapFilter = Build<TKqpOlapFilter>(ctx, node.Pos())
797815
.Input(read.Process().Body())
798-
.Condition(pushedFilters.FirstLevelOps.Cast())
816+
.Condition(pushedFilter.FirstLevelOps.Cast())
799817
.Done();
800818
}
801819

802-
if (pushedFilters.SecondLevelOps.IsValid()) {
820+
if (pushedFilter.SecondLevelOps.IsValid()) {
803821
olapFilter = Build<TKqpOlapFilter>(ctx, node.Pos())
804822
.Input(olapFilter.IsValid() ? olapFilter.Cast() : read.Process().Body())
805-
.Condition(pushedFilters.SecondLevelOps.Cast())
823+
.Condition(pushedFilter.SecondLevelOps.Cast())
806824
.Done();
807825
}
808826

@@ -846,7 +864,7 @@ TExprBase KqpPushOlapFilter(TExprBase node, TExprContext& ctx, const TKqpOptimiz
846864
.Args({"new_arg"})
847865
.Body<TCoOptionalIf>()
848866
.Predicate<TExprApplier>()
849-
.Apply(TExprBase(remainingPredicates.ExprNode))
867+
.Apply(remainingFilter.Cast())
850868
.With(lambda.Args().Arg(0), "new_arg")
851869
.Build()
852870
.Value<TExprApplier>()

0 commit comments

Comments
 (0)