Skip to content

Commit 00bc077

Browse files
committed
Fix filter pushdown over Aggregate with session/hopping window
commit_hash:3d4989f7a92e9b330cd4b19f1850cdca544a1d64
1 parent 038c81c commit 00bc077

File tree

4 files changed

+51
-18
lines changed

yql/essentials/core/common_opt/yql_co_flow2.cpp

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,7 @@ bool AllowComplexFiltersOverAggregatePushdown(const TOptimizeContext& optCtx) {
3131
optCtx.Types->MaxAggPushdownPredicates > 0;
3232
}
3333

34-
TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprContext& ctx, const TParentsMap& parentsMap) {
35-
auto inputType = node.Input().Ref().GetTypeAnn();
36-
auto structType = inputType->GetKind() == ETypeAnnotationKind::List
37-
? inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>()
38-
: inputType->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>();
39-
40-
if (structType->GetSize() == 0) {
41-
return node.Ptr();
42-
}
43-
34+
THashSet<TStringBuf> GetAggregationInputKeys(const TCoAggregate& node) {
4435
TMaybe<TStringBuf> sessionColumn;
4536
const auto sessionSetting = GetSetting(node.Settings().Ref(), "session");
4637
if (sessionSetting) {
@@ -58,13 +49,28 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
5849
}
5950
}
6051

61-
TSet<TStringBuf> usedFields;
52+
THashSet<TStringBuf> result;
6253
for (const auto& x : node.Keys()) {
6354
if (x.Value() != sessionColumn && x.Value() != hoppingColumn) {
64-
usedFields.insert(x.Value());
55+
result.insert(x.Value());
6556
}
6657
}
6758

59+
return result;
60+
}
61+
62+
TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprContext& ctx, const TParentsMap& parentsMap) {
63+
auto inputType = node.Input().Ref().GetTypeAnn();
64+
auto structType = inputType->GetKind() == ETypeAnnotationKind::List
65+
? inputType->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>()
66+
: inputType->Cast<TStreamExprType>()->GetItemType()->Cast<TStructExprType>();
67+
68+
if (structType->GetSize() == 0) {
69+
return node.Ptr();
70+
}
71+
72+
THashSet<TStringBuf> usedFields = GetAggregationInputKeys(node);
73+
6874
if (usedFields.size() == structType->GetSize()) {
6975
return node.Ptr();
7076
}
@@ -96,7 +102,7 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
96102
}
97103
}
98104

99-
if (hoppingSetting) {
105+
if (auto hoppingSetting = GetSetting(node.Settings().Ref(), "hopping")) {
100106
auto traitsNode = hoppingSetting->ChildPtr(1);
101107
if (traitsNode->IsList()) {
102108
traitsNode = traitsNode->ChildPtr(1);
@@ -120,7 +126,7 @@ TExprNode::TPtr AggregateSubsetFieldsAnalyzer(const TCoAggregate& node, TExprCon
120126
}
121127
}
122128

123-
if (sessionSetting) {
129+
if (auto sessionSetting = GetSetting(node.Settings().Ref(), "session")) {
124130
TCoSessionWindowTraits traits(sessionSetting->Child(1)->ChildPtr(1));
125131

126132
auto usedType = traits.ListType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TListExprType>()->
@@ -1326,10 +1332,7 @@ TExprBase FilterOverAggregate(const TCoFlatMapBase& node, TExprContext& ctx, TOp
13261332
TCoConditionalValueBase body = node.Lambda().Body().Cast<TCoConditionalValueBase>();
13271333

13281334
const TCoAggregate agg = node.Input().Cast<TCoAggregate>();
1329-
THashSet<TStringBuf> keyColumns;
1330-
for (auto key : agg.Keys()) {
1331-
keyColumns.insert(key.Value());
1332-
}
1335+
const THashSet<TStringBuf> keyColumns = GetAggregationInputKeys(agg);
13331336

13341337
TExprNodeList andComponents;
13351338
if (auto maybeAnd = body.Predicate().Maybe<TCoAnd>()) {

yql/essentials/tests/sql/sql2yql/canondata/result.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2463,6 +2463,13 @@
24632463
"uri": "https://{canondata_backend}/1924537/1ab444909086b08bd4fe21c5a43f5e183c647e0a/resource.tar.gz#test_sql2yql.test_aggregate-group_by_session_extended_tuple_/sql.yql"
24642464
}
24652465
],
2466+
"test_sql2yql.test[aggregate-group_by_session_nopush]": [
2467+
{
2468+
"checksum": "09f9e4a178067f6aaa81b9e9959b4cec",
2469+
"size": 3177,
2470+
"uri": "https://{canondata_backend}/212715/fe819b0081800cfcbf6e2512d273e760949a6cc7/resource.tar.gz#test_sql2yql.test_aggregate-group_by_session_nopush_/sql.yql"
2471+
}
2472+
],
24662473
"test_sql2yql.test[aggregate-group_by_session_only]": [
24672474
{
24682475
"checksum": "0c22dd1ef887ea533c6e0621c0937ffa",
@@ -22322,6 +22329,13 @@
2232222329
"uri": "https://{canondata_backend}/1880306/64654158d6bfb1289c66c626a8162239289559d0/resource.tar.gz#test_sql_format.test_aggregate-group_by_session_extended_tuple_/formatted.sql"
2232322330
}
2232422331
],
22332+
"test_sql_format.test[aggregate-group_by_session_nopush]": [
22333+
{
22334+
"checksum": "382f93f1c899dd2d1d5ea6b04575cfef",
22335+
"size": 372,
22336+
"uri": "https://{canondata_backend}/212715/fe819b0081800cfcbf6e2512d273e760949a6cc7/resource.tar.gz#test_sql_format.test_aggregate-group_by_session_nopush_/formatted.sql"
22337+
}
22338+
],
2232522339
"test_sql_format.test[aggregate-group_by_session_only]": [
2232622340
{
2232722341
"checksum": "531ee77369e54e2a1616411e89c86bb7",
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
in Input session1.txt
2+
providers yt
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/* syntax version 1 */
2+
/* postgres can not */
3+
/* yt can not */
4+
5+
SELECT * FROM (
6+
SELECT
7+
user,
8+
cast(session_start as Int64) as ss,
9+
ListSort(AGGREGATE_LIST(ts)) as session,
10+
COUNT(1) as session_len
11+
FROM plato.Input
12+
GROUP BY SessionWindow(ts, 10) as session_start, user
13+
)
14+
WHERE ss != 100500; -- should not push down

0 commit comments

Comments
 (0)