Skip to content

Commit 2e37a1c

Browse files
authored
YQ-3846 RD support OR split during pushdown (#11482)
1 parent 248c41c commit 2e37a1c

File tree

8 files changed

+84
-40
lines changed

8 files changed

+84
-40
lines changed

ydb/library/yql/providers/common/pushdown/physical_opt.cpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,29 @@ using namespace NNodes;
1111

1212
namespace {
1313

14-
TPredicateNode SplitForPartialPushdown(
15-
const NPushdown::TPredicateNode& predicateTree,
16-
TExprContext& ctx,
17-
TPositionHandle pos) {
14+
TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, TExprContext& ctx, TPositionHandle pos, const TSettings& settings) {
1815
if (predicateTree.CanBePushed) {
1916
return predicateTree;
2017
}
2118

22-
if (predicateTree.Op != NPushdown::EBoolOp::And) {
23-
return NPushdown::TPredicateNode(); // Not valid, => return the same node from optimizer
19+
if (predicateTree.Op != NPushdown::EBoolOp::And && (!settings.IsEnabled(TSettings::EFeatureFlag::SplitOrOperator) || predicateTree.Op != NPushdown::EBoolOp::Or)) {
20+
// Predicate can't be split, so return invalid value and skip this branch
21+
return NPushdown::TPredicateNode();
2422
}
2523

2624
std::vector<NPushdown::TPredicateNode> pushable;
2725
for (auto& predicate : predicateTree.Children) {
28-
if (predicate.CanBePushed) {
29-
pushable.emplace_back(predicate);
26+
NPushdown::TPredicateNode pushablePredicate = SplitForPartialPushdown(predicate, ctx, pos, settings);
27+
if (pushablePredicate.IsValid()) {
28+
pushable.emplace_back(pushablePredicate);
29+
} else if (predicateTree.Op == NPushdown::EBoolOp::Or) {
30+
// One of the OR branch was invalid, so the whole predicate is invalid
31+
return NPushdown::TPredicateNode();
3032
}
3133
}
34+
3235
NPushdown::TPredicateNode predicateToPush;
33-
predicateToPush.SetPredicates(pushable, ctx, pos);
36+
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
3437
return predicateToPush;
3538
}
3639

@@ -51,7 +54,7 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
5154
NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg.Get(), TExprBase(lambdaArg), settings);
5255
YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");
5356

54-
NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos);
57+
NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos, settings);
5558
if (!predicateToPush.IsValid()) {
5659
return {};
5760
}

ydb/library/yql/providers/common/pushdown/predicate_node.cpp

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,38 @@ bool TPredicateNode::IsValid() const {
3030
return res && ExprNode.IsValid();
3131
}
3232

33-
void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
33+
void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op) {
3434
auto predicatesSize = predicates.size();
3535
if (predicatesSize == 0) {
3636
return;
37-
} else if (predicatesSize == 1) {
37+
}
38+
if (predicatesSize == 1) {
3839
*this = predicates[0];
39-
} else {
40-
Op = EBoolOp::And;
41-
Children = predicates;
42-
CanBePushed = true;
43-
44-
TVector<NNodes::TExprBase> exprNodes;
45-
exprNodes.reserve(predicatesSize);
46-
for (auto& pred : predicates) {
47-
exprNodes.emplace_back(pred.ExprNode.Cast());
48-
CanBePushed &= pred.CanBePushed;
49-
}
50-
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
51-
.Add(exprNodes)
52-
.Done();
40+
return;
41+
}
42+
43+
Op = op;
44+
Children = predicates;
45+
CanBePushed = true;
46+
47+
TVector<NNodes::TExprBase> exprNodes;
48+
exprNodes.reserve(predicatesSize);
49+
for (auto& pred : predicates) {
50+
exprNodes.emplace_back(pred.ExprNode.Cast());
51+
CanBePushed &= pred.CanBePushed;
52+
}
53+
54+
switch (op) {
55+
case EBoolOp::And:
56+
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos).Add(exprNodes).Done();
57+
break;
58+
59+
case EBoolOp::Or:
60+
ExprNode = NNodes::Build<NNodes::TCoOr>(ctx, pos).Add(exprNodes).Done();
61+
break;
62+
63+
default:
64+
throw yexception() << "Unsupported operator for predicate node creation: " << static_cast<int>(op);
5365
}
5466
}
5567

ydb/library/yql/providers/common/pushdown/predicate_node.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ struct TPredicateNode {
2323
~TPredicateNode();
2424

2525
bool IsValid() const;
26-
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos);
26+
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op);
2727

2828
NNodes::TMaybeNode<NNodes::TExprBase> ExprNode;
2929
std::vector<TPredicateNode> Children;

ydb/library/yql/providers/common/pushdown/settings.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,15 @@ struct TSettings {
2727
TimestampCtor = 1 << 17,
2828
JustPassthroughOperators = 1 << 18, // if + coalesce + just
2929
InOperator = 1 << 19, // IN()
30-
IsDistinctOperator = 1 << 20 // IS NOT DISTINCT FROM / IS DISTINCT FROM
30+
IsDistinctOperator = 1 << 20, // IS NOT DISTINCT FROM / IS DISTINCT FROM
31+
32+
// Option which enables partial pushdown for sequence of OR
33+
// For example next predicate:
34+
// ($A AND $B) OR ($C AND $D)
35+
// May be partially pushdowned as:
36+
// $A OR $C
37+
// In case of unsupported / complicated expressions $B and $D
38+
SplitOrOperator = 1 << 21
3139
};
3240

3341
explicit TSettings(NLog::EComponent logComponent)

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ namespace NYql {
9797
}
9898

9999
#undef MATCH_ATOM
100+
#undef MATCH_ARITHMETICAL
100101

101102
#define EXPR_NODE_TO_COMPARE_TYPE(TExprNodeType, COMPARE_TYPE) \
102103
if (!opMatched && compare.Maybe<TExprNodeType>()) { \
@@ -118,7 +119,7 @@ namespace NYql {
118119
EXPR_NODE_TO_COMPARE_TYPE(TCoAggrNotEqual, ID);
119120

120121
if (proto->operation() == TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED) {
121-
err << "unknown operation: " << compare.Raw()->Content();
122+
err << "unknown compare operation: " << compare.Raw()->Content();
122123
return false;
123124
}
124125
return SerializeExpression(compare.Left(), proto->mutable_left_value(), arg, err) && SerializeExpression(compare.Right(), proto->mutable_right_value(), arg, err);
@@ -181,7 +182,7 @@ namespace NYql {
181182
} else if (auto maybeAsList = expr.Maybe<TCoAsList>()) {
182183
collection = maybeAsList.Cast().Ptr();
183184
} else {
184-
err << "unknown operation: " << expr.Ref().Content();
185+
err << "unknown source for in: " << expr.Ref().Content();
185186
return false;
186187
}
187188

@@ -195,7 +196,7 @@ namespace NYql {
195196

196197
bool SerializeIsNotDistinctFrom(const TExprBase& predicate, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err, bool invert) {
197198
if (predicate.Ref().ChildrenSize() != 2) {
198-
err << "unknown predicate, expected 2, children size " << predicate.Ref().ChildrenSize();
199+
err << "invalid IsNotDistinctFrom predicate, expected 2 children but got " << predicate.Ref().ChildrenSize();
199200
return false;
200201
}
201202
TPredicate::TComparison* proto = predicateProto->mutable_comparison();
@@ -356,7 +357,7 @@ namespace NYql {
356357

357358
auto left = FormatExpression(expression.left_value());
358359
auto right = FormatExpression(expression.right_value());
359-
return left + operation + right;
360+
return TStringBuilder() << "(" << left << operation << right << ")";
360361
}
361362

362363
TString FormatNegation(const TPredicate_TNegation& negation) {
@@ -524,14 +525,22 @@ namespace NYql {
524525

525526
TString FormatIn(const TPredicate_TIn& in) {
526527
auto value = FormatExpression(in.value());
527-
TString list;
528+
TStringStream list;
528529
for (const auto& expr : in.set()) {
529530
if (!list.empty()) {
530-
list += ",";
531+
list << ", ";
532+
} else {
533+
list << value << " IN (";
531534
}
532-
list += FormatExpression(expr);
535+
list << FormatExpression(expr);
533536
}
534-
return value + " IN (" + list + ")";
537+
538+
if (list.empty()) {
539+
throw yexception() << "failed to format IN statement, no operands";
540+
}
541+
542+
list << ")";
543+
return list.Str();
535544
}
536545

537546
TString FormatPredicate(const TPredicate& predicate, bool topLevel ) {

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,15 @@ namespace {
3030
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
3131
{
3232
using EFlag = NPushdown::TSettings::EFeatureFlag;
33-
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators);
33+
Enable(
34+
// Operator features
35+
EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 |
36+
EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator |
37+
EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators |
38+
39+
// Split features
40+
EFlag::SplitOrOperator
41+
);
3442
}
3543
};
3644

ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
286286
}
287287
}
288288
NPushdown::TPredicateNode predicateToPush;
289-
predicateToPush.SetPredicates(pushable, ctx, pos);
289+
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
290290
return predicateToPush;
291291
}
292292

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ def test_filters_non_optional_field(self, kikimr, client):
302302
client.create_yds_connection(
303303
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
304304
)
305-
self.init_topics("test_filter")
305+
self.init_topics("test_filters_non_optional_field")
306306

307307
sql = Rf'''
308308
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
@@ -334,7 +334,7 @@ def test_filters_optional_field(self, kikimr, client):
334334
client.create_yds_connection(
335335
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
336336
)
337-
self.init_topics("test_filter")
337+
self.init_topics("test_filters_optional_field")
338338

339339
sql = Rf'''
340340
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
@@ -348,12 +348,16 @@ def test_filters_optional_field(self, kikimr, client):
348348
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"')
349349
filter = 'flag'
350350
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `flag`')
351+
filter = 'time * (field2 - field1) != 0'
352+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` * (`field2` - `field1`)) <> 0')
351353
filter = ' event IS NOT DISTINCT FROM "event2"'
352354
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"')
353355
filter = ' event IS DISTINCT FROM "event1"'
354356
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"')
355357
filter = ' field1 IS DISTINCT FROM field2'
356358
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `field1` IS DISTINCT FROM `field2`')
359+
filter = 'time == 102 OR (field2 IS NOT DISTINCT FROM 1005 AND Random(field1) < 10.0)'
360+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` = 102 OR `field2` IS NOT DISTINCT FROM 1005)')
357361
filter = 'event IN ("event2")'
358362
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
359363
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'

0 commit comments

Comments
 (0)