Skip to content

Commit 25ffb03

Browse files
authored
YQ-3832 RD supported COALESCE and IF pushdown (#11397)
1 parent dc2fcaf commit 25ffb03

File tree

5 files changed

+204
-29
lines changed

5 files changed

+204
-29
lines changed

ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,18 @@ message TPredicate {
369369
repeated TPredicate operands = 1;
370370
}
371371

372+
// "COALESCE($predicate_1, $predicate_2, ..., $predicate_n)"
373+
message TCoalesce {
374+
repeated TPredicate operands = 1;
375+
}
376+
377+
// "IF($predicate, $then_predicate, $else_predicate)"
378+
message TIf {
379+
TPredicate predicate = 1;
380+
TPredicate then_predicate = 2;
381+
TPredicate else_predicate = 3;
382+
}
383+
372384
// "$column BETWEEN $least AND $greatest"
373385
message TBetween {
374386
TExpression value = 1;
@@ -429,6 +441,8 @@ message TPredicate {
429441
TIsNotNull is_not_null = 7;
430442
TComparison comparison = 8;
431443
TBoolExpression bool_expression = 9;
444+
TCoalesce coalesce = 10;
445+
TIf if = 11;
432446
}
433447
}
434448

ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ Y_UNIT_TEST_SUITE_F(PushdownTest, TPushdownFixture) {
372372
R"ast(
373373
(Coalesce
374374
(!= (Member $row '"col_optional_uint64") (Member $row '"col_uint32"))
375-
(Bool '"true")
375+
(Bool '"false")
376376
)
377377
)ast",
378378
R"proto(
@@ -388,6 +388,46 @@ Y_UNIT_TEST_SUITE_F(PushdownTest, TPushdownFixture) {
388388
)proto");
389389
}
390390

391+
Y_UNIT_TEST(TrueCoalesce) {
392+
AssertFilter(
393+
// Note that R"ast()ast" is empty string!
394+
R"ast(
395+
(Coalesce
396+
(!= (Member $row '"col_optional_uint64") (Member $row '"col_uint32"))
397+
(Bool '"true")
398+
)
399+
)ast",
400+
R"proto(
401+
coalesce {
402+
operands {
403+
comparison {
404+
operation: NE
405+
left_value {
406+
column: "col_optional_uint64"
407+
}
408+
right_value {
409+
column: "col_uint32"
410+
}
411+
}
412+
}
413+
operands {
414+
bool_expression {
415+
value {
416+
typed_value {
417+
type {
418+
type_id: BOOL
419+
}
420+
value {
421+
bool_value: true
422+
}
423+
}
424+
}
425+
}
426+
}
427+
}
428+
)proto");
429+
}
430+
391431
Y_UNIT_TEST(CmpInt16AndInt32) {
392432
AssertFilter(
393433
// Note that R"ast()ast" is empty string!
@@ -421,7 +461,7 @@ Y_UNIT_TEST_SUITE_F(PushdownTest, TPushdownFixture) {
421461
(< (Unwrap (/ (Int64 '42) (Member $row '"col_int64"))) (Int64 '10))
422462
(>= (Member $row '"col_uint32") (- (Uint32 '15) (Uint32 '1)))
423463
)
424-
(Bool '"true")
464+
(Bool '"false")
425465
)
426466
)ast",
427467
R"proto(
@@ -515,7 +555,7 @@ Y_UNIT_TEST_SUITE_F(PushdownTest, TPushdownFixture) {
515555
(< (Unwrap (/ (Int64 '42) (Member $row '"col_int64"))) (Int64 '10))
516556
(>= (Member $row '"col_uint32") (Uint32 '15))
517557
)
518-
(Bool '"true")
558+
(Bool '"false")
519559
)
520560
)ast",
521561
R"proto(
@@ -600,7 +640,7 @@ Y_UNIT_TEST_SUITE_F(PushdownTest, TPushdownFixture) {
600640
(Member $row '"col_utf8")
601641
(Member $row '"col_optional_utf8")
602642
)
603-
(Bool '"true")
643+
(Bool '"false")
604644
)
605645
)ast");
606646
}

ydb/library/yql/providers/generic/provider/yql_generic_predicate_pushdown.cpp

Lines changed: 102 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ namespace NYql {
6969
}
7070

7171
// data
72+
MATCH_ATOM(Bool, BOOL, bool, bool);
7273
MATCH_ATOM(Int8, INT8, int32, i8);
7374
MATCH_ATOM(Uint8, UINT8, uint32, ui8);
7475
MATCH_ATOM(Int16, INT16, int32, i16);
@@ -125,17 +126,41 @@ namespace NYql {
125126

126127
#undef EXPR_NODE_TO_COMPARE_TYPE
127128

128-
bool SerializeCoalesce(const TCoCoalesce& coalesce, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
129-
auto predicate = coalesce.Predicate();
130-
if (auto compare = predicate.Maybe<TCoCompare>()) {
131-
return SerializeCompare(compare.Cast(), proto, arg, err);
132-
}
129+
bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, ui64 depth);
133130

134-
err << "unknown coalesce predicate: " << predicate.Raw()->Content();
135-
return false;
131+
bool SerializeSqlIf(const TCoIf& sqlIf, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, ui64 depth) {
132+
auto* dstProto = proto->mutable_if_();
133+
return SerializePredicate(TExprBase(sqlIf.Predicate()), dstProto->mutable_predicate(), arg, err, depth + 1)
134+
&& SerializePredicate(TExprBase(sqlIf.ThenValue()), dstProto->mutable_then_predicate(), arg, err, depth + 1)
135+
&& SerializePredicate(TExprBase(sqlIf.ElseValue()), dstProto->mutable_else_predicate(), arg, err, depth + 1);
136136
}
137137

138-
bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err);
138+
bool SerializeCoalesce(const TCoCoalesce& coalesce, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, ui64 depth) {
139+
// Special case for top level COALESCE: COALESCE(Predicat, FALSE)
140+
// We can assume NULL as FALSE and skip COALESCE
141+
if (depth == 0) {
142+
auto value = coalesce.Value().Maybe<TCoBool>();
143+
if (value && TStringBuf(value.Cast().Literal()) == "false"sv) {
144+
return SerializePredicate(TExprBase(coalesce.Predicate()), proto, arg, err, 0);
145+
}
146+
}
147+
148+
auto* dstProto = proto->mutable_coalesce();
149+
for (const auto& child : coalesce.Ptr()->Children()) {
150+
if (!SerializePredicate(TExprBase(child), dstProto->add_operands(), arg, err, depth + 1)) {
151+
return false;
152+
}
153+
154+
// We can unwrap nested COALESCE:
155+
// COALESCE(..., COALESCE(Predicat_1, Predicat_2), ...) -> COALESCE(..., Predicat_1, Predicat_2, ...)
156+
if (dstProto->operands().rbegin()->has_coalesce()) {
157+
auto coalesceOperands = std::move(*dstProto->mutable_operands()->rbegin()->mutable_coalesce()->mutable_operands());
158+
dstProto->mutable_operands()->RemoveLast();
159+
dstProto->mutable_operands()->Add(coalesceOperands.begin(), coalesceOperands.end());
160+
}
161+
}
162+
return true;
163+
}
139164

140165
bool SerializeExists(const TCoExists& exists, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, bool withNot = false) {
141166
auto* expressionProto = withNot ? proto->mutable_is_null()->mutable_value() : proto->mutable_is_not_null()->mutable_value();
@@ -168,54 +193,54 @@ namespace NYql {
168193
return true;
169194
}
170195

171-
bool SerializeAnd(const TCoAnd& andExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
196+
bool SerializeAnd(const TCoAnd& andExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, ui64 depth) {
172197
auto* dstProto = proto->mutable_conjunction();
173198
for (const auto& child : andExpr.Ptr()->Children()) {
174-
if (!SerializePredicate(TExprBase(child), dstProto->add_operands(), arg, err)) {
199+
if (!SerializePredicate(TExprBase(child), dstProto->add_operands(), arg, err, depth + 1)) {
175200
return false;
176201
}
177202
}
178203
return true;
179204
}
180205

181-
bool SerializeOr(const TCoOr& orExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
206+
bool SerializeOr(const TCoOr& orExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, ui64 depth) {
182207
auto* dstProto = proto->mutable_disjunction();
183208
for (const auto& child : orExpr.Ptr()->Children()) {
184-
if (!SerializePredicate(TExprBase(child), dstProto->add_operands(), arg, err)) {
209+
if (!SerializePredicate(TExprBase(child), dstProto->add_operands(), arg, err, depth + 1)) {
185210
return false;
186211
}
187212
}
188213
return true;
189214
}
190215

191-
bool SerializeNot(const TCoNot& notExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
216+
bool SerializeNot(const TCoNot& notExpr, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, ui64 depth) {
192217
// Special case: (Not (Exists ...))
193218
if (auto exists = notExpr.Value().Maybe<TCoExists>()) {
194219
return SerializeExists(exists.Cast(), proto, arg, err, true);
195220
}
196221
auto* dstProto = proto->mutable_negation();
197-
return SerializePredicate(notExpr.Value(), dstProto->mutable_operand(), arg, err);
222+
return SerializePredicate(notExpr.Value(), dstProto->mutable_operand(), arg, err, depth + 1);
198223
}
199224

200225
bool SerializeMember(const TCoMember& member, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
201226
return SerializeMember(member, proto->mutable_bool_expression()->mutable_value(), arg, err);
202227
}
203228

204-
bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err) {
229+
bool SerializePredicate(const TExprBase& predicate, TPredicate* proto, const TCoArgument& arg, TStringBuilder& err, ui64 depth) {
205230
if (auto compare = predicate.Maybe<TCoCompare>()) {
206231
return SerializeCompare(compare.Cast(), proto, arg, err);
207232
}
208233
if (auto coalesce = predicate.Maybe<TCoCoalesce>()) {
209-
return SerializeCoalesce(coalesce.Cast(), proto, arg, err);
234+
return SerializeCoalesce(coalesce.Cast(), proto, arg, err, depth);
210235
}
211236
if (auto andExpr = predicate.Maybe<TCoAnd>()) {
212-
return SerializeAnd(andExpr.Cast(), proto, arg, err);
237+
return SerializeAnd(andExpr.Cast(), proto, arg, err, depth);
213238
}
214239
if (auto orExpr = predicate.Maybe<TCoOr>()) {
215-
return SerializeOr(orExpr.Cast(), proto, arg, err);
240+
return SerializeOr(orExpr.Cast(), proto, arg, err, depth);
216241
}
217242
if (auto notExpr = predicate.Maybe<TCoNot>()) {
218-
return SerializeNot(notExpr.Cast(), proto, arg, err);
243+
return SerializeNot(notExpr.Cast(), proto, arg, err, depth);
219244
}
220245
if (auto member = predicate.Maybe<TCoMember>()) {
221246
return SerializeMember(member.Cast(), proto, arg, err);
@@ -226,9 +251,16 @@ namespace NYql {
226251
if (auto sqlIn = predicate.Maybe<TCoSqlIn>()) {
227252
return SerializeSqlIn(sqlIn.Cast(), proto, arg, err);
228253
}
254+
if (auto sqlIf = predicate.Maybe<TCoIf>()) {
255+
return SerializeSqlIf(sqlIf.Cast(), proto, arg, err, depth);
256+
}
257+
if (auto just = predicate.Maybe<TCoJust>()) {
258+
return SerializePredicate(TExprBase(just.Cast().Input()), proto, arg, err, depth + 1);
259+
}
229260

230-
err << "unknown predicate: " << predicate.Raw()->Content();
231-
return false;
261+
// Try to serialize predicate as boolean expression
262+
// For example single bool value TRUE in COALESCE or IF
263+
return SerializeExpression(predicate, proto->mutable_bool_expression()->mutable_value(), arg, err);
232264
}
233265
}
234266

@@ -239,7 +271,7 @@ namespace NYql {
239271
TString FormatValue(const Ydb::TypedValue& value) {
240272
switch (value.value().value_case()) {
241273
case Ydb::Value::kBoolValue:
242-
return ToString(value.value().bool_value());
274+
return value.value().bool_value() ? "TRUE" : "FALSE";
243275
case Ydb::Value::kInt32Value:
244276
return ToString(value.value().int32_value());
245277
case Ydb::Value::kUint32Value:
@@ -383,6 +415,49 @@ namespace NYql {
383415
return stream.Str();
384416
}
385417

418+
TString FormatCoalesce(const TPredicate::TCoalesce& coalesce) {
419+
TStringStream stream;
420+
TString first;
421+
ui32 cnt = 0;
422+
423+
for (const auto& predicate : coalesce.operands()) {
424+
auto statement = FormatPredicate(predicate, false);
425+
426+
if (cnt > 0) {
427+
if (cnt == 1) {
428+
stream << "COALESCE(";
429+
stream << first;
430+
}
431+
432+
stream << ", ";
433+
stream << statement;
434+
} else {
435+
first = statement;
436+
}
437+
cnt++;
438+
}
439+
440+
if (cnt == 0) {
441+
throw yexception() << "failed to format COALESCE statement: no operands";
442+
}
443+
444+
if (cnt == 1) {
445+
stream << first;
446+
} else {
447+
stream << ")";
448+
}
449+
450+
return stream.Str();
451+
}
452+
453+
TString FormatIf(const TPredicate::TIf& sqlIf) {
454+
TStringStream stream;
455+
stream << "IF(" << FormatPredicate(sqlIf.predicate(), false);
456+
stream << ", " << FormatPredicate(sqlIf.then_predicate(), false);
457+
stream << ", " << FormatPredicate(sqlIf.else_predicate(), false) << ")";
458+
return stream.Str();
459+
}
460+
386461
TString FormatIsNull(const TPredicate_TIsNull& isNull) {
387462
auto statement = FormatExpression(isNull.value());
388463
return "(" + statement + " IS NULL)";
@@ -453,6 +528,10 @@ namespace NYql {
453528
return FormatConjunction(predicate.conjunction(), topLevel);
454529
case TPredicate::kDisjunction:
455530
return FormatDisjunction(predicate.disjunction());
531+
case TPredicate::kCoalesce:
532+
return FormatCoalesce(predicate.coalesce());
533+
case TPredicate::kIf:
534+
return FormatIf(predicate.if_());
456535
case TPredicate::kIsNull:
457536
return FormatIsNull(predicate.is_null());
458537
case TPredicate::kIsNotNull:
@@ -477,7 +556,7 @@ namespace NYql {
477556
}
478557

479558
bool SerializeFilterPredicate(const TCoLambda& predicate, TPredicate* proto, TStringBuilder& err) {
480-
return SerializePredicate(predicate.Body(), proto, predicate.Args().Arg(0), err);
559+
return SerializePredicate(predicate.Body(), proto, predicate.Args().Arg(0), err, 0);
481560
}
482561

483562
TString FormatWhere(const TPredicate& predicate) {

ydb/library/yql/providers/pq/provider/yql_pq_logical_opt.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace {
3030
: NPushdown::TSettings(NLog::EComponent::ProviderGeneric)
3131
{
3232
using EFlag = NPushdown::TSettings::EFeatureFlag;
33-
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator);
33+
Enable(EFlag::ExpressionAsPredicate | EFlag::ArithmeticalExpressions | EFlag::ImplicitConversionToInt64 | EFlag::StringTypes | EFlag::LikeOperator | EFlag::DoNotCheckCompareArgumentsTypes | EFlag::InOperator | EFlag::JustPassthroughOperators);
3434
}
3535
};
3636

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ def test_nested_types_without_predicate(self, kikimr, client):
299299
stop_yds_query(client, query_id)
300300

301301
@yq_v1
302-
def test_filters(self, kikimr, client):
302+
def test_filters_non_optional_field(self, kikimr, client):
303303
client.create_yds_connection(
304304
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
305305
)
@@ -325,6 +325,48 @@ def test_filters(self, kikimr, client):
325325
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
326326
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
327327
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"1\\"')
328+
filter = ' event IS DISTINCT FROM data AND event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
329+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`event` IS DISTINCT FROM `data` AND `event` IN (\\"1\\"')
330+
filter = ' IF(event = "event2", event IS DISTINCT FROM data, FALSE)'
331+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE IF(`event` = \\"event2\\", `event` IS DISTINCT FROM `data`, FALSE)')
332+
333+
@yq_v1
334+
def test_filters_optional_field(self, kikimr, client):
335+
client.create_yds_connection(
336+
YDS_CONNECTION, os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"), shared_reading=True
337+
)
338+
self.init_topics("test_filter")
339+
340+
sql = Rf'''
341+
INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
342+
SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
343+
WITH (format=json_each_row, SCHEMA (time UInt64 NOT NULL, data String, event String, flag Bool, field1 UInt8, field2 Int64)) WHERE '''
344+
data = [
345+
'{"time": 101, "data": "hello1", "event": "event1", "flag": false, "field1": 5, "field2": 5}',
346+
'{"time": 102, "data": "hello2", "event": "event2", "flag": true, "field1": 5, "field2": 1005}']
347+
expected = ['102']
348+
filter = 'data = "hello2"'
349+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `data` = \\"hello2\\"')
350+
filter = 'flag'
351+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `flag`')
352+
# filter = ' event IS NOT DISTINCT FROM "event2"'
353+
# self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS NOT DISTINCT FROM \\"event2\\"')
354+
# filter = ' event IS DISTINCT FROM "event1"'
355+
# self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IS DISTINCT FROM \\"event1\\"')
356+
# filter = ' field1 IS DISTINCT FROM field2'
357+
# self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `field1` IS DISTINCT FROM `field2`')
358+
filter = 'event IN ("event2")'
359+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"event2\\")')
360+
filter = 'event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
361+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE `event` IN (\\"1\\"')
362+
# filter = ' event IS DISTINCT FROM data AND event IN ("1", "2", "3", "4", "5", "6", "7", "event2")'
363+
# self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE (`event` IS DISTINCT FROM `data` AND `event` IN (\\"1\\"')
364+
# filter = ' IF(event == "event2", event IS DISTINCT FROM data, FALSE)'
365+
# self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE IF(`event` == "event2", `event` IS DISTINCT FROM `data`, FALSE)')
366+
filter = ' COALESCE(event = "event2", TRUE)'
367+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE COALESCE(`event` = \\"event2\\", TRUE)')
368+
filter = ' COALESCE(event = "event2", data = "hello2", TRUE)'
369+
self.run_and_check(kikimr, client, sql + filter, data, expected, 'predicate: WHERE COALESCE(`event` = \\"event2\\", `data` = \\"hello2\\", TRUE)')
328370

329371
@yq_v1
330372
def test_filter_missing_fields(self, kikimr, client):

0 commit comments

Comments
 (0)