Skip to content

Commit 288c3bb

Browse files
authored
Merge pull request #9209 from xudong963/filter_push_down_union
feat: push down `filter` to `union`
2 parents 93ffeb0 + b3a2558 commit 288c3bb

File tree

6 files changed

+279
-1
lines changed

6 files changed

+279
-1
lines changed

src/query/sql/src/planner/optimizer/heuristic/heuristic.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub static DEFAULT_REWRITE_RULES: Lazy<Vec<RuleID>> = Lazy::new(|| {
3838
RuleID::EliminateEvalScalar,
3939
RuleID::MergeFilter,
4040
RuleID::MergeEvalScalar,
41+
RuleID::PushDownFilterUnion,
4142
RuleID::PushDownLimitSort,
4243
RuleID::PushDownLimitOuterJoin,
4344
RuleID::PushDownLimitScan,

src/query/sql/src/planner/optimizer/rule/factory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::optimizer::rule::rewrite::RuleEliminateFilter;
2727
use crate::optimizer::rule::rewrite::RuleMergeEvalScalar;
2828
use crate::optimizer::rule::rewrite::RuleMergeFilter;
2929
use crate::optimizer::rule::rewrite::RulePushDownFilterScan;
30+
use crate::optimizer::rule::rewrite::RulePushDownFilterUnion;
3031
use crate::optimizer::rule::rewrite::RulePushDownLimitOuterJoin;
3132
use crate::optimizer::rule::rewrite::RulePushDownLimitScan;
3233
use crate::optimizer::rule::rewrite::RulePushDownLimitSort;
@@ -49,6 +50,7 @@ impl RuleFactory {
4950
RuleID::ImplementGet => Ok(Box::new(RuleImplementGet::new())),
5051
RuleID::ImplementHashJoin => Ok(Box::new(RuleImplementHashJoin::new())),
5152
RuleID::EliminateEvalScalar => Ok(Box::new(RuleEliminateEvalScalar::new())),
53+
RuleID::PushDownFilterUnion => Ok(Box::new(RulePushDownFilterUnion::new())),
5254
RuleID::PushDownFilterEvalScalar => Ok(Box::new(RulePushDownFilterEvalScalar::new())),
5355
RuleID::PushDownFilterJoin => Ok(Box::new(RulePushDownFilterJoin::new())),
5456
RuleID::PushDownFilterScan => Ok(Box::new(RulePushDownFilterScan::new())),

src/query/sql/src/planner/optimizer/rule/rewrite/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod rule_normalize_scalar;
2222
mod rule_push_down_filter_eval_scalar;
2323
mod rule_push_down_filter_join;
2424
mod rule_push_down_filter_scan;
25+
mod rule_push_down_filter_union;
2526
mod rule_push_down_limit_join;
2627
mod rule_push_down_limit_scan;
2728
mod rule_push_down_limit_sort;
@@ -39,6 +40,7 @@ pub use rule_push_down_filter_eval_scalar::RulePushDownFilterEvalScalar;
3940
pub use rule_push_down_filter_join::try_push_down_filter_join;
4041
pub use rule_push_down_filter_join::RulePushDownFilterJoin;
4142
pub use rule_push_down_filter_scan::RulePushDownFilterScan;
43+
pub use rule_push_down_filter_union::RulePushDownFilterUnion;
4244
pub use rule_push_down_limit_join::RulePushDownLimitOuterJoin;
4345
pub use rule_push_down_limit_scan::RulePushDownLimitScan;
4446
pub use rule_push_down_limit_sort::RulePushDownLimitSort;
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use ahash::HashMap;
16+
use common_exception::ErrorCode;
17+
use common_exception::Result;
18+
19+
use crate::optimizer::rule::Rule;
20+
use crate::optimizer::rule::TransformResult;
21+
use crate::optimizer::RuleID;
22+
use crate::optimizer::SExpr;
23+
use crate::plans::AggregateFunction;
24+
use crate::plans::AndExpr;
25+
use crate::plans::BoundColumnRef;
26+
use crate::plans::CastExpr;
27+
use crate::plans::ComparisonExpr;
28+
use crate::plans::Filter;
29+
use crate::plans::FunctionCall;
30+
use crate::plans::OrExpr;
31+
use crate::plans::PatternPlan;
32+
use crate::plans::RelOp;
33+
use crate::plans::Scalar;
34+
use crate::plans::UnionAll;
35+
use crate::ColumnBinding;
36+
use crate::IndexType;
37+
use crate::Visibility;
38+
39+
// For a union query, it's not allowed to add `filter` after union
40+
// Such as: `(select * from t1 union all select * from t2) where a > 1`, it's invalid.
41+
// However, it's possible to have `filter` after `union` when involved `view`
42+
// Such as: `create view v_t as (select * from t1 union all select * from t2)`.
43+
// Then use the view with filter, `select * from v_t where a > 1`;
44+
// So it'll be efficient to push down `filter` to `union`, reduce the size of data to pull from table.
45+
pub struct RulePushDownFilterUnion {
46+
id: RuleID,
47+
pattern: SExpr,
48+
}
49+
50+
impl RulePushDownFilterUnion {
51+
pub fn new() -> Self {
52+
Self {
53+
id: RuleID::PushDownFilterUnion,
54+
// Filter
55+
// \
56+
// UnionAll
57+
// / \
58+
// ... ...
59+
pattern: SExpr::create_unary(
60+
PatternPlan {
61+
plan_type: RelOp::Filter,
62+
}
63+
.into(),
64+
SExpr::create_binary(
65+
PatternPlan {
66+
plan_type: RelOp::UnionAll,
67+
}
68+
.into(),
69+
SExpr::create_leaf(
70+
PatternPlan {
71+
plan_type: RelOp::Pattern,
72+
}
73+
.into(),
74+
),
75+
SExpr::create_leaf(
76+
PatternPlan {
77+
plan_type: RelOp::Pattern,
78+
}
79+
.into(),
80+
),
81+
),
82+
),
83+
}
84+
}
85+
}
86+
87+
impl Rule for RulePushDownFilterUnion {
88+
fn id(&self) -> RuleID {
89+
self.id
90+
}
91+
92+
fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
93+
let filter: Filter = s_expr.plan().clone().try_into()?;
94+
let union_s_expr = s_expr.child(0)?;
95+
let union: UnionAll = union_s_expr.plan().clone().try_into()?;
96+
97+
// Create a filter which matches union's right child.
98+
let index_pairs: HashMap<IndexType, IndexType> =
99+
union.pairs.iter().map(|pair| (pair.0, pair.1)).collect();
100+
let new_predicates = filter
101+
.predicates
102+
.iter()
103+
.map(|predicate| replace_column_binding(&index_pairs, predicate.clone()))
104+
.collect::<Result<Vec<_>>>()?;
105+
let right_filer = Filter {
106+
predicates: new_predicates,
107+
is_having: filter.is_having,
108+
};
109+
110+
let mut union_left_child = union_s_expr.child(0)?.clone();
111+
let mut union_right_child = union_s_expr.child(1)?.clone();
112+
113+
// Add filter to union children
114+
union_left_child = SExpr::create_unary(filter.into(), union_left_child);
115+
union_right_child = SExpr::create_unary(right_filer.into(), union_right_child);
116+
117+
let result = SExpr::create_binary(union.into(), union_left_child, union_right_child);
118+
state.add_result(result);
119+
120+
Ok(())
121+
}
122+
123+
fn pattern(&self) -> &SExpr {
124+
&self.pattern
125+
}
126+
}
127+
128+
fn replace_column_binding(
129+
index_pairs: &HashMap<IndexType, IndexType>,
130+
scalar: Scalar,
131+
) -> Result<Scalar> {
132+
match scalar {
133+
Scalar::BoundColumnRef(column) => {
134+
let index = column.column.index;
135+
if index_pairs.contains_key(&index) {
136+
let new_column = ColumnBinding {
137+
database_name: None,
138+
table_name: None,
139+
column_name: "".to_string(),
140+
index: *index_pairs.get(&index).unwrap(),
141+
data_type: column.column.data_type,
142+
visibility: Visibility::Visible,
143+
};
144+
return Ok(Scalar::BoundColumnRef(BoundColumnRef {
145+
column: new_column,
146+
}));
147+
}
148+
Ok(Scalar::BoundColumnRef(column))
149+
}
150+
constant_expr @ Scalar::ConstantExpr(_) => Ok(constant_expr),
151+
Scalar::AndExpr(expr) => Ok(Scalar::AndExpr(AndExpr {
152+
left: Box::new(replace_column_binding(index_pairs, *expr.left)?),
153+
right: Box::new(replace_column_binding(index_pairs, *expr.right)?),
154+
return_type: expr.return_type,
155+
})),
156+
Scalar::OrExpr(expr) => Ok(Scalar::OrExpr(OrExpr {
157+
left: Box::new(replace_column_binding(index_pairs, *expr.left)?),
158+
right: Box::new(replace_column_binding(index_pairs, *expr.right)?),
159+
return_type: expr.return_type,
160+
})),
161+
Scalar::ComparisonExpr(expr) => Ok(Scalar::ComparisonExpr(ComparisonExpr {
162+
op: expr.op,
163+
left: Box::new(replace_column_binding(index_pairs, *expr.left)?),
164+
right: Box::new(replace_column_binding(index_pairs, *expr.right)?),
165+
return_type: expr.return_type,
166+
})),
167+
Scalar::AggregateFunction(expr) => Ok(Scalar::AggregateFunction(AggregateFunction {
168+
display_name: expr.display_name,
169+
func_name: expr.func_name,
170+
distinct: expr.distinct,
171+
params: expr.params,
172+
args: expr
173+
.args
174+
.into_iter()
175+
.map(|arg| replace_column_binding(index_pairs, arg))
176+
.collect::<Result<Vec<_>>>()?,
177+
return_type: expr.return_type,
178+
})),
179+
Scalar::FunctionCall(expr) => Ok(Scalar::FunctionCall(FunctionCall {
180+
arguments: expr
181+
.arguments
182+
.into_iter()
183+
.map(|arg| replace_column_binding(index_pairs, arg))
184+
.collect::<Result<Vec<_>>>()?,
185+
func_name: expr.func_name,
186+
arg_types: expr.arg_types,
187+
return_type: expr.return_type,
188+
})),
189+
Scalar::CastExpr(expr) => Ok(Scalar::CastExpr(CastExpr {
190+
argument: Box::new(replace_column_binding(index_pairs, *(expr.argument))?),
191+
from_type: expr.from_type,
192+
target_type: expr.target_type,
193+
})),
194+
Scalar::SubqueryExpr(_) => Err(ErrorCode::Unimplemented(
195+
"replace_column_binding: don't support subquery",
196+
)),
197+
}
198+
}

src/query/sql/src/planner/optimizer/rule/rule.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub enum RuleID {
3636
NormalizeScalarFilter,
3737
NormalizeDisjunctiveFilter,
3838
PushDownFilterEvalScalar,
39+
PushDownFilterUnion,
3940
PushDownFilterJoin,
4041
PushDownFilterScan,
4142
PushDownLimitOuterJoin,
@@ -62,6 +63,7 @@ pub enum RuleID {
6263
impl Display for RuleID {
6364
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
6465
match self {
66+
RuleID::PushDownFilterUnion => write!(f, "PushDownFilterUnion"),
6567
RuleID::PushDownFilterEvalScalar => write!(f, "PushDownFilterEvalScalar"),
6668
RuleID::PushDownFilterJoin => write!(f, "PushDownFilterJoin"),
6769
RuleID::PushDownFilterScan => write!(f, "PushDownFilterScan"),

tests/logictest/suites/query/union.test

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,77 @@ ORDER BY age,
144144
17-23 2
145145
24-30 2
146146
年轻 2
147-
非年轻 2
147+
非年轻 2
148+
149+
statement ok
150+
drop view if exists t;
151+
152+
statement ok
153+
drop table if exists t1;
154+
155+
statement ok
156+
drop table if exists t2;
157+
158+
statement ok
159+
create table t1 (a int, b int);
160+
161+
statement ok
162+
insert into t1 values (1, 2), (2, 3);
163+
164+
statement ok
165+
create table t2 (a int, b int);
166+
167+
statement ok
168+
insert into t2 values (1, 2), (2, 3);
169+
170+
statement ok
171+
create view t as select * from t1 union all select * from t2;
172+
173+
statement query T
174+
explain select * from t where a > b;
175+
176+
----
177+
UnionAll
178+
├── TableScan
179+
│ ├── table: default.default.t1
180+
│ ├── read rows: 0
181+
│ ├── read bytes: 0
182+
│ ├── partitions total: 1
183+
│ ├── partitions scanned: 0
184+
│ └── push downs: [filters: [(a > b)], limit: NONE]
185+
└── TableScan
186+
├── table: default.default.t2
187+
├── read rows: 0
188+
├── read bytes: 0
189+
├── partitions total: 1
190+
├── partitions scanned: 0
191+
└── push downs: [filters: [(a > b)], limit: NONE]
192+
193+
statement query T
194+
explain select * from t where a > 1;
195+
196+
----
197+
UnionAll
198+
├── TableScan
199+
│ ├── table: default.default.t1
200+
│ ├── read rows: 2
201+
│ ├── read bytes: 62
202+
│ ├── partitions total: 1
203+
│ ├── partitions scanned: 1
204+
│ └── push downs: [filters: [(a > 1)], limit: NONE]
205+
└── TableScan
206+
├── table: default.default.t2
207+
├── read rows: 2
208+
├── read bytes: 62
209+
├── partitions total: 1
210+
├── partitions scanned: 1
211+
└── push downs: [filters: [(a > 1)], limit: NONE]
212+
213+
statement ok
214+
drop table t1;
215+
216+
statement ok
217+
drop table t2;
218+
219+
statement ok
220+
drop view t;

0 commit comments

Comments
 (0)