Skip to content

Commit e0cdaff

Browse files
authored
Merge pull request #9210 from xudong963/limit_push_down_union
feat: push down `limit` to `union`
2 parents e0f2485 + 5663f3d commit e0cdaff

File tree

7 files changed

+278
-73
lines changed

7 files changed

+278
-73
lines changed

src/query/sql/src/planner/optimizer/heuristic/heuristic.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub static DEFAULT_REWRITE_RULES: Lazy<Vec<RuleID>> = Lazy::new(|| {
3939
RuleID::MergeFilter,
4040
RuleID::MergeEvalScalar,
4141
RuleID::PushDownFilterUnion,
42+
RuleID::PushDownLimitUnion,
4243
RuleID::PushDownLimitSort,
4344
RuleID::PushDownLimitOuterJoin,
4445
RuleID::PushDownLimitScan,

src/query/sql/src/planner/optimizer/rule/factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::optimizer::rule::rewrite::RulePushDownFilterUnion;
3131
use crate::optimizer::rule::rewrite::RulePushDownLimitOuterJoin;
3232
use crate::optimizer::rule::rewrite::RulePushDownLimitScan;
3333
use crate::optimizer::rule::rewrite::RulePushDownLimitSort;
34+
use crate::optimizer::rule::rewrite::RulePushDownLimitUnion;
3435
use crate::optimizer::rule::rewrite::RulePushDownSortScan;
3536
use crate::optimizer::rule::rewrite::RuleSplitAggregate;
3637
use crate::optimizer::rule::rule_implement_get::RuleImplementGet;
@@ -54,6 +55,7 @@ impl RuleFactory {
5455
RuleID::PushDownFilterEvalScalar => Ok(Box::new(RulePushDownFilterEvalScalar::new())),
5556
RuleID::PushDownFilterJoin => Ok(Box::new(RulePushDownFilterJoin::new())),
5657
RuleID::PushDownFilterScan => Ok(Box::new(RulePushDownFilterScan::new())),
58+
RuleID::PushDownLimitUnion => Ok(Box::new(RulePushDownLimitUnion::new())),
5759
RuleID::PushDownLimitScan => Ok(Box::new(RulePushDownLimitScan::new())),
5860
RuleID::PushDownSortScan => Ok(Box::new(RulePushDownSortScan::new())),
5961
RuleID::PushDownLimitOuterJoin => Ok(Box::new(RulePushDownLimitOuterJoin::new())),

src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod rule_push_down_filter_union;
2626
mod rule_push_down_limit_join;
2727
mod rule_push_down_limit_scan;
2828
mod rule_push_down_limit_sort;
29+
mod rule_push_down_limit_union;
2930
mod rule_push_down_sort_scan;
3031
mod rule_split_aggregate;
3132

@@ -44,5 +45,6 @@ pub use rule_push_down_filter_union::RulePushDownFilterUnion;
4445
pub use rule_push_down_limit_join::RulePushDownLimitOuterJoin;
4546
pub use rule_push_down_limit_scan::RulePushDownLimitScan;
4647
pub use rule_push_down_limit_sort::RulePushDownLimitSort;
48+
pub use rule_push_down_limit_union::RulePushDownLimitUnion;
4749
pub use rule_push_down_sort_scan::RulePushDownSortScan;
4850
pub use rule_split_aggregate::RuleSplitAggregate;
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::cmp;
16+
17+
use common_exception::Result;
18+
19+
use crate::optimizer::rule::Rule;
20+
use crate::optimizer::rule::TransformResult;
21+
use crate::optimizer::RuleID;
22+
use crate::optimizer::SExpr;
23+
use crate::plans::Limit;
24+
use crate::plans::PatternPlan;
25+
use crate::plans::RelOp;
26+
use crate::plans::UnionAll;
27+
28+
pub struct RulePushDownLimitUnion {
29+
id: RuleID,
30+
pattern: SExpr,
31+
}
32+
33+
impl RulePushDownLimitUnion {
34+
pub fn new() -> Self {
35+
Self {
36+
id: RuleID::PushDownLimitUnion,
37+
// Limit
38+
// \
39+
// UnionAll
40+
// / \
41+
// ... ...
42+
pattern: SExpr::create_unary(
43+
PatternPlan {
44+
plan_type: RelOp::Limit,
45+
}
46+
.into(),
47+
SExpr::create_binary(
48+
PatternPlan {
49+
plan_type: RelOp::UnionAll,
50+
}
51+
.into(),
52+
SExpr::create_leaf(
53+
PatternPlan {
54+
plan_type: RelOp::Pattern,
55+
}
56+
.into(),
57+
),
58+
SExpr::create_leaf(
59+
PatternPlan {
60+
plan_type: RelOp::Pattern,
61+
}
62+
.into(),
63+
),
64+
),
65+
),
66+
}
67+
}
68+
}
69+
70+
impl Rule for RulePushDownLimitUnion {
71+
fn id(&self) -> RuleID {
72+
self.id
73+
}
74+
75+
fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
76+
let limit: Limit = s_expr.plan().clone().try_into()?;
77+
let union_s_expr = s_expr.child(0)?;
78+
let union: UnionAll = union_s_expr.plan().clone().try_into()?;
79+
80+
if limit.limit.is_none() {
81+
return Ok(());
82+
}
83+
// Create limit which will be pushed down
84+
let limit_offset = limit.limit.unwrap() + limit.offset;
85+
let new_limit = Limit {
86+
limit: limit
87+
.limit
88+
.map(|origin_limit| cmp::max(origin_limit, limit_offset)),
89+
offset: 0,
90+
};
91+
92+
// Push down new_limit to union children
93+
let mut union_left_child = union_s_expr.child(0)?.clone();
94+
let mut union_right_child = union_s_expr.child(1)?.clone();
95+
96+
// Add limit to union children
97+
union_left_child = SExpr::create_unary(new_limit.clone().into(), union_left_child);
98+
union_right_child = SExpr::create_unary(new_limit.into(), union_right_child);
99+
100+
let mut result = SExpr::create_binary(union.into(), union_left_child, union_right_child);
101+
// Add original limit to top
102+
result = SExpr::create_unary(limit.into(), result);
103+
result.apply_rule(&self.id);
104+
state.add_result(result);
105+
106+
Ok(())
107+
}
108+
109+
fn pattern(&self) -> &SExpr {
110+
&self.pattern
111+
}
112+
}

src/query/sql/src/planner/optimizer/rule/rule.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub enum RuleID {
3939
PushDownFilterUnion,
4040
PushDownFilterJoin,
4141
PushDownFilterScan,
42+
PushDownLimitUnion,
4243
PushDownLimitOuterJoin,
4344
PushDownLimitSort,
4445
PushDownLimitScan,
@@ -67,6 +68,7 @@ impl Display for RuleID {
6768
RuleID::PushDownFilterEvalScalar => write!(f, "PushDownFilterEvalScalar"),
6869
RuleID::PushDownFilterJoin => write!(f, "PushDownFilterJoin"),
6970
RuleID::PushDownFilterScan => write!(f, "PushDownFilterScan"),
71+
RuleID::PushDownLimitUnion => write!(f, "PushDownLimitUnion"),
7072
RuleID::PushDownLimitOuterJoin => write!(f, "PushDownLimitOuterJoin"),
7173
RuleID::PushDownLimitSort => write!(f, "PushDownLimitSort"),
7274
RuleID::PushDownLimitScan => write!(f, "PushDownLimitScan"),
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
statement ok
2+
drop view if exists t;
3+
4+
statement ok
5+
drop table if exists t1;
6+
7+
statement ok
8+
drop table if exists t2;
9+
10+
statement ok
11+
create table t1 (a int, b int);
12+
13+
statement ok
14+
insert into t1 values (1, 2), (2, 3);
15+
16+
statement ok
17+
create table t2 (a int, b int);
18+
19+
statement ok
20+
insert into t2 values (1, 2), (2, 3);
21+
22+
statement ok
23+
create view t as select * from t1 union all select * from t2;
24+
25+
statement query T
26+
explain select * from t where a > b;
27+
28+
----
29+
UnionAll
30+
├── TableScan
31+
│ ├── table: default.default.t1
32+
│ ├── read rows: 0
33+
│ ├── read bytes: 0
34+
│ ├── partitions total: 1
35+
│ ├── partitions scanned: 0
36+
│ └── push downs: [filters: [(a > b)], limit: NONE]
37+
└── TableScan
38+
├── table: default.default.t2
39+
├── read rows: 0
40+
├── read bytes: 0
41+
├── partitions total: 1
42+
├── partitions scanned: 0
43+
└── push downs: [filters: [(a > b)], limit: NONE]
44+
45+
statement query T
46+
explain select * from t where a > 1;
47+
48+
----
49+
UnionAll
50+
├── TableScan
51+
│ ├── table: default.default.t1
52+
│ ├── read rows: 2
53+
│ ├── read bytes: 62
54+
│ ├── partitions total: 1
55+
│ ├── partitions scanned: 1
56+
│ └── push downs: [filters: [(a > 1)], limit: NONE]
57+
└── TableScan
58+
├── table: default.default.t2
59+
├── read rows: 2
60+
├── read bytes: 62
61+
├── partitions total: 1
62+
├── partitions scanned: 1
63+
└── push downs: [filters: [(a > 1)], limit: NONE]
64+
65+
statement query T
66+
explain select * from t limit 3;
67+
68+
----
69+
Limit
70+
├── limit: 3
71+
├── offset: 0
72+
└── UnionAll
73+
├── Limit
74+
│ ├── limit: 3
75+
│ ├── offset: 0
76+
│ └── TableScan
77+
│ ├── table: default.default.t1
78+
│ ├── read rows: 2
79+
│ ├── read bytes: 62
80+
│ ├── partitions total: 1
81+
│ ├── partitions scanned: 1
82+
│ └── push downs: [filters: [], limit: 3]
83+
└── Limit
84+
├── limit: 3
85+
├── offset: 0
86+
└── TableScan
87+
├── table: default.default.t2
88+
├── read rows: 2
89+
├── read bytes: 62
90+
├── partitions total: 1
91+
├── partitions scanned: 1
92+
└── push downs: [filters: [], limit: 3]
93+
94+
statement query T
95+
explain select * from t limit 3 offset 1;
96+
97+
----
98+
Limit
99+
├── limit: 3
100+
├── offset: 1
101+
└── UnionAll
102+
├── Limit
103+
│ ├── limit: 4
104+
│ ├── offset: 0
105+
│ └── TableScan
106+
│ ├── table: default.default.t1
107+
│ ├── read rows: 2
108+
│ ├── read bytes: 62
109+
│ ├── partitions total: 1
110+
│ ├── partitions scanned: 1
111+
│ └── push downs: [filters: [], limit: 4]
112+
└── Limit
113+
├── limit: 4
114+
├── offset: 0
115+
└── TableScan
116+
├── table: default.default.t2
117+
├── read rows: 2
118+
├── read bytes: 62
119+
├── partitions total: 1
120+
├── partitions scanned: 1
121+
└── push downs: [filters: [], limit: 4]
122+
123+
statement query T
124+
explain select * from t1 union all select * from t2 limit 1;
125+
126+
----
127+
Limit
128+
├── limit: 1
129+
├── offset: 0
130+
└── UnionAll
131+
├── Limit
132+
│ ├── limit: 1
133+
│ ├── offset: 0
134+
│ └── TableScan
135+
│ ├── table: default.default.t1
136+
│ ├── read rows: 2
137+
│ ├── read bytes: 62
138+
│ ├── partitions total: 1
139+
│ ├── partitions scanned: 1
140+
│ └── push downs: [filters: [], limit: 1]
141+
└── Limit
142+
├── limit: 1
143+
├── offset: 0
144+
└── TableScan
145+
├── table: default.default.t2
146+
├── read rows: 2
147+
├── read bytes: 62
148+
├── partitions total: 1
149+
├── partitions scanned: 1
150+
└── push downs: [filters: [], limit: 1]
151+
152+
statement ok
153+
drop table t1;
154+
155+
statement ok
156+
drop table t2;
157+
158+
statement ok
159+
drop view t;

0 commit comments

Comments
 (0)