Skip to content

Commit e31118f

Browse files
authored
YQ-3846 RD support OR split during pushdown (#11439)
1 parent 6c7ed0d commit e31118f

File tree

8 files changed

+84
-40
lines changed

8 files changed

+84
-40
lines changed

ydb/library/yql/providers/common/pushdown/physical_opt.cpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,29 @@ using namespace NNodes;
1111

1212
namespace {
1313

14-
TPredicateNode SplitForPartialPushdown(
15-
const NPushdown::TPredicateNode& predicateTree,
16-
TExprContext& ctx,
17-
TPositionHandle pos) {
14+
TPredicateNode SplitForPartialPushdown(const NPushdown::TPredicateNode& predicateTree, TExprContext& ctx, TPositionHandle pos, const TSettings& settings) {
1815
if (predicateTree.CanBePushed) {
1916
return predicateTree;
2017
}
2118

22-
if (predicateTree.Op != NPushdown::EBoolOp::And) {
23-
return NPushdown::TPredicateNode(); // Not valid, => return the same node from optimizer
19+
if (predicateTree.Op != NPushdown::EBoolOp::And && (!settings.IsEnabled(TSettings::EFeatureFlag::SplitOrOperator) || predicateTree.Op != NPushdown::EBoolOp::Or)) {
20+
// Predicate can't be split, so return invalid value and skip this branch
21+
return NPushdown::TPredicateNode();
2422
}
2523

2624
std::vector<NPushdown::TPredicateNode> pushable;
2725
for (auto& predicate : predicateTree.Children) {
28-
if (predicate.CanBePushed) {
29-
pushable.emplace_back(predicate);
26+
NPushdown::TPredicateNode pushablePredicate = SplitForPartialPushdown(predicate, ctx, pos, settings);
27+
if (pushablePredicate.IsValid()) {
28+
pushable.emplace_back(pushablePredicate);
29+
} else if (predicateTree.Op == NPushdown::EBoolOp::Or) {
30+
// One of the OR branch was invalid, so the whole predicate is invalid
31+
return NPushdown::TPredicateNode();
3032
}
3133
}
34+
3235
NPushdown::TPredicateNode predicateToPush;
33-
predicateToPush.SetPredicates(pushable, ctx, pos);
36+
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
3437
return predicateToPush;
3538
}
3639

@@ -51,7 +54,7 @@ TMaybeNode<TCoLambda> MakePushdownPredicate(const TCoLambda& lambda, TExprContex
5154
NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, lambdaArg.Get(), TExprBase(lambdaArg), settings);
5255
YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");
5356

54-
NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos);
57+
NPushdown::TPredicateNode predicateToPush = SplitForPartialPushdown(predicateTree, ctx, pos, settings);
5558
if (!predicateToPush.IsValid()) {
5659
return {};
5760
}

ydb/library/yql/providers/common/pushdown/predicate_node.cpp

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,38 @@ bool TPredicateNode::IsValid() const {
3030
return res && ExprNode.IsValid();
3131
}
3232

33-
void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos) {
33+
void TPredicateNode::SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op) {
3434
auto predicatesSize = predicates.size();
3535
if (predicatesSize == 0) {
3636
return;
37-
} else if (predicatesSize == 1) {
37+
}
38+
if (predicatesSize == 1) {
3839
*this = predicates[0];
39-
} else {
40-
Op = EBoolOp::And;
41-
Children = predicates;
42-
CanBePushed = true;
43-
44-
TVector<NNodes::TExprBase> exprNodes;
45-
exprNodes.reserve(predicatesSize);
46-
for (auto& pred : predicates) {
47-
exprNodes.emplace_back(pred.ExprNode.Cast());
48-
CanBePushed &= pred.CanBePushed;
49-
}
50-
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos)
51-
.Add(exprNodes)
52-
.Done();
40+
return;
41+
}
42+
43+
Op = op;
44+
Children = predicates;
45+
CanBePushed = true;
46+
47+
TVector<NNodes::TExprBase> exprNodes;
48+
exprNodes.reserve(predicatesSize);
49+
for (auto& pred : predicates) {
50+
exprNodes.emplace_back(pred.ExprNode.Cast());
51+
CanBePushed &= pred.CanBePushed;
52+
}
53+
54+
switch (op) {
55+
case EBoolOp::And:
56+
ExprNode = NNodes::Build<NNodes::TCoAnd>(ctx, pos).Add(exprNodes).Done();
57+
break;
58+
59+
case EBoolOp::Or:
60+
ExprNode = NNodes::Build<NNodes::TCoOr>(ctx, pos).Add(exprNodes).Done();
61+
break;
62+
63+
default:
64+
throw yexception() << "Unsupported operator for predicate node creation: " << static_cast<int>(op);
5365
}
5466
}
5567

ydb/library/yql/providers/common/pushdown/predicate_node.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ struct TPredicateNode {
2323
~TPredicateNode();
2424

2525
bool IsValid() const;
26-
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos);
26+
void SetPredicates(const std::vector<TPredicateNode>& predicates, TExprContext& ctx, TPositionHandle pos, EBoolOp op);
2727

2828
NNodes::TMaybeNode<NNodes::TExprBase> ExprNode;
2929
std::vector<TPredicateNode> Children;

ydb/library/yql/providers/common/pushdown/settings.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,15 @@ struct TSettings {
2727
TimestampCtor = 1 << 17,
2828
JustPassthroughOperators = 1 << 18, // if + coalesce + just
2929
InOperator = 1 << 19, // IN()
30-
IsDistinctOperator = 1 << 20 // IS NOT DISTINCT FROM / IS DISTINCT FROM
30+
IsDistinctOperator = 1 << 20, // IS NOT DISTINCT FROM / IS DISTINCT FROM
31+
32+
// Option which enables partial pushdown for sequence of OR
33+
// For example next predicate:
34+
// ($A AND $B) OR ($C AND $D)
35+
// May be partially pushdowned as:
36+
// $A OR $C
37+
// In case of unsupported / complicated expressions $B and $D
38+
SplitOrOperator = 1 << 21
3139
};
3240

3341
explicit TSettings(NLog::EComponent logComponent)

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ namespace NYql {
9797
}
9898

9999
#undef MATCH_ATOM
100+
#undef MATCH_ARITHMETICAL
100101

101102
#define EXPR_NODE_TO_COMPARE_TYPE(TExprNodeType, COMPARE_TYPE) \
102103
if (!opMatched && compare.Maybe<TExprNodeType>()) { \
@@ -118,7 +119,7 @@ namespace NYql {
118119
EXPR_NODE_TO_COMPARE_TYPE(TCoAggrNotEqual, ID);
119120

120121
if (proto->operation() == TPredicate::TComparison::COMPARISON_OPERATION_UNSPECIFIED) {
121-
err << "unknown operation: " << compare.Raw()->Content();
122+
err << "unknown compare operation: " << compare.Raw()->Content();
122123
return false;
123124
}
124125
return SerializeExpression(compare.Left(), proto->mutable_left_value(), arg, err) && SerializeExpression(compare.Right(), proto->mutable_right_value(), arg, err);
@@ -181,7 +182,7 @@ namespace NYql {
181182
} else if (auto maybeAsList = expr.Maybe<TCoAsList>()) {
182183
collection = maybeAsList.Cast().Ptr();
183184
} else {
184-
err << "unknown operation: " << expr.Ref().Content();
185+
err << "unknown source for in: " << expr.Ref().Content();
185186
return false;
186187
}
187188

@@ -195,7 +196,7 @@ namespace NYql {
195196

196197
bool SerializeIsNotDistinctFrom(const TExprBase& predicate, TPredicate* predicateProto, const TCoArgument& arg, TStringBuilder& err, bool invert) {
197198
if (predicate.Ref().ChildrenSize() != 2) {
198-
err << "unknown predicate, expected 2, children size " << predicate.Ref().ChildrenSize();
199+
err << "invalid IsNotDistinctFrom predicate, expected 2 children but got " << predicate.Ref().ChildrenSize();
199200
return false;
200201
}
201202
TPredicate::TComparison* proto = predicateProto->mutable_comparison();
@@ -356,7 +357,7 @@ namespace NYql {
356357

357358
auto left = FormatExpression(expression.left_value());
358359
auto right = FormatExpression(expression.right_value());
359-
return left + operation + right;
360+
return TStringBuilder() << "(" << left << operation << right << ")";
360361
}
361362

362363
TString FormatNegation(const TPredicate_TNegation& negation) {
@@ -525,14 +526,22 @@ namespace NYql {
525526

526527
TString FormatIn(const TPredicate_TIn& in) {
527528
auto value = FormatExpression(in.value());
528-
TString list;
529+
TStringStream list;
529530
for (const auto& expr : in.set()) {
530531
if (!list.empty()) {
531-
list += ",";
532+
list << ", ";
533+
} else {
534+
list << value << " IN (";
532535
}
533-
list += FormatExpression(expr);
536+
list << FormatExpression(expr);
534537
}
535-
return value + " IN (" + list + ")";
538+
539+
if (list.empty()) {
540+
throw yexception() << "failed to format IN statement, no operands";
541+
}
542+
543+
list << ")";
544+
return list.Str();
536545
}
537546

538547
TString FormatPredicate(const TPredicate& predicate, bool topLevel ) {

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,15 @@ namespace {
3030
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
3131
{
3232
using EFlag = NPushdown::TSettings::EFeatureFlag;
33-
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators);
33+
Enable(
34+
// Operator features
35+
EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 |
36+
EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator |
37+
EFlag::IsDistinctOperator | EFlag::JustPassthroughOperators |
38+
39+
// Split features
40+
EFlag::SplitOrOperator
41+
);
3442
}
3543
};
3644

ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ class TS3LogicalOptProposalTransformer : public TOptimizeTransformerBase {
286286
}
287287
}
288288
NPushdown::TPredicateNode predicateToPush;
289-
predicateToPush.SetPredicates(pushable, ctx, pos);
289+
predicateToPush.SetPredicates(pushable, ctx, pos, predicateTree.Op);
290290
return predicateToPush;
291291
}
292292

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def test_filters_non_optional_field(self, kikimr, client):
303303
client.create_yds_connection(
304304
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
305305
)
306-
self.init_topics("test_filter")
306+
self.init_topics("test_filters_non_optional_field")
307307

308308
sql = Rf'''
309309
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
@@ -335,7 +335,7 @@ def test_filters_optional_field(self, kikimr, client):
335335
client.create_yds_connection(
336336
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
337337
)
338-
self.init_topics("test_filter")
338+
self.init_topics("test_filters_optional_field")
339339

340340
sql = Rf'''
341341
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
@@ -349,12 +349,16 @@ def test_filters_optional_field(self, kikimr, client):
349349
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"')
350350
filter = 'flag'
351351
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `flag`')
352+
filter = 'time * (field2 - field1) != 0'
353+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` * (`field2` - `field1`)) <> 0')
352354
filter = ' event IS NOT DISTINCT FROM "event2"'
353355
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"')
354356
filter = ' event IS DISTINCT FROM "event1"'
355357
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"')
356358
filter = ' field1 IS DISTINCT FROM field2'
357359
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `field1` IS DISTINCT FROM `field2`')
360+
filter = 'time == 102 OR (field2 IS NOT DISTINCT FROM 1005 AND Random(field1) < 10.0)'
361+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`time` = 102 OR `field2` IS NOT DISTINCT FROM 1005)')
358362
filter = 'event IN ("event2")'
359363
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
360364
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'

0 commit comments

Comments
 (0)