Skip to content

Commit 2a7d9ef

Browse files
authored
Merge pull request #7646 from RinChanNOWWW/prune-prewhere
refactor(query): push all filters to prewhere and prune columns for it.
2 parents 7d35a62 + 3d29e26 commit 2a7d9ef

File tree

13 files changed

+177
-152
lines changed

13 files changed

+177
-152
lines changed

src/query/legacy-planners/src/plan_node_extras.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,10 @@ impl Debug for Projection {
6868

6969
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
7070
pub struct PrewhereInfo {
71-
/// column indices of the table used for prewhere
72-
pub need_columns: Projection,
71+
/// columns to be output by prewhere scan
72+
pub output_columns: Projection,
73+
/// columns used for prewhere
74+
pub prewhere_columns: Projection,
7375
/// remain_columns = scan.columns - prewhere_columns
7476
pub remain_columns: Projection,
7577
/// filter for prewhere

src/query/service/src/sql/executor/physical_plan_builder.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,16 @@ impl PhysicalPlanBuilder {
121121
if column.path_indices.is_some() {
122122
has_inner_column = true;
123123
}
124-
let name = column.name.clone();
125-
name_mapping.insert(name, index.to_string());
124+
if let Some(prewhere) = &scan.prewhere {
125+
// if there is a prewhere optimization,
126+
// we can prune `PhysicalScan`'s output schema.
127+
if prewhere.output_columns.contains(index) {
128+
name_mapping.insert(column.name.clone(), index.to_string());
129+
}
130+
} else {
131+
let name = column.name.clone();
132+
name_mapping.insert(name, index.to_string());
133+
}
126134
}
127135

128136
let table_entry = metadata.table(scan.table_index);
@@ -424,13 +432,19 @@ impl PhysicalPlanBuilder {
424432

425433
let remain_columns = scan
426434
.columns
427-
.difference(&prewhere.columns)
435+
.difference(&prewhere.prewhere_columns)
428436
.copied()
429437
.collect::<HashSet<usize>>();
430-
let need_columns = Self::build_projection(
438+
let output_columns = Self::build_projection(
439+
&metadata,
440+
table_schema,
441+
&prewhere.output_columns,
442+
has_inner_column,
443+
);
444+
let prewhere_columns = Self::build_projection(
431445
&metadata,
432446
table_schema,
433-
&prewhere.columns,
447+
&prewhere.prewhere_columns,
434448
has_inner_column,
435449
);
436450
let remain_columns = Self::build_projection(
@@ -441,7 +455,8 @@ impl PhysicalPlanBuilder {
441455
);
442456

443457
Ok::<PrewhereInfo, ErrorCode>(PrewhereInfo {
444-
need_columns,
458+
output_columns,
459+
prewhere_columns,
445460
remain_columns,
446461
filter,
447462
})

src/query/service/src/sql/planner/metadata.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,14 @@ impl Metadata {
230230
}
231231
table_index
232232
}
233+
234+
pub fn find_smallest_column_within(&self, indices: &[usize]) -> usize {
235+
let entries = indices
236+
.iter()
237+
.map(|i| self.column(*i).clone())
238+
.collect::<Vec<_>>();
239+
find_smallest_column(entries.as_slice())
240+
}
233241
}
234242

235243
pub fn optimize_remove_count_args(name: &str, distinct: bool, args: &[&Expr]) -> bool {

src/query/service/src/sql/planner/optimizer/heuristic/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@
1414

1515
mod decorrelate;
1616
mod implement;
17+
mod prewhere_optimization;
1718
mod prune_columns;
1819
mod rule_list;
1920
mod subquery_rewriter;
20-
mod where_optimizer;
2121

2222
use std::sync::Arc;
2323

2424
use common_exception::Result;
2525
use once_cell::sync::Lazy;
2626

27+
use self::prewhere_optimization::PrewhereOptimizer;
2728
use super::rule::RuleID;
2829
use super::ColumnSet;
2930
use crate::sessions::TableContext;
@@ -93,13 +94,13 @@ impl HeuristicOptimizer {
9394
}
9495

9596
fn post_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
97+
let prewhere_optimizer = PrewhereOptimizer::new(self.metadata.clone());
98+
let s_expr = prewhere_optimizer.prewhere_optimize(s_expr)?;
99+
96100
let pruner = prune_columns::ColumnPruner::new(self.metadata.clone());
97101
let require_columns: ColumnSet =
98102
self.bind_context.columns.iter().map(|c| c.index).collect();
99-
let s_expr = pruner.prune_columns(&s_expr, require_columns)?;
100-
101-
let where_opt = where_optimizer::WhereOptimizer::new(self.metadata.clone());
102-
where_opt.where_optimize(s_expr)
103+
pruner.prune_columns(&s_expr, require_columns)
103104
}
104105

105106
pub fn optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {

src/query/service/src/sql/planner/optimizer/heuristic/where_optimizer.rs renamed to src/query/service/src/sql/planner/optimizer/heuristic/prewhere_optimization.rs

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use crate::sql::plans::RelOp;
2424
use crate::sql::plans::Scalar;
2525
use crate::sql::MetadataRef;
2626

27-
pub struct WhereOptimizer {
27+
pub struct PrewhereOptimizer {
2828
metadata: MetadataRef,
2929
pattern: SExpr,
3030
}
3131

32-
impl WhereOptimizer {
32+
impl PrewhereOptimizer {
3333
pub fn new(metadata: MetadataRef) -> Self {
3434
Self {
3535
metadata,
@@ -48,59 +48,50 @@ impl WhereOptimizer {
4848
}
4949
}
5050

51-
fn collect_columns(expr: &Scalar, columns: &mut ColumnSet) {
51+
fn collect_columns_impl(expr: &Scalar, columns: &mut ColumnSet) {
5252
match expr {
5353
Scalar::BoundColumnRef(column) => {
5454
columns.insert(column.column.index);
5555
}
5656
Scalar::AndExpr(and) => {
57-
Self::collect_columns(and.left.as_ref(), columns);
58-
Self::collect_columns(and.right.as_ref(), columns);
57+
Self::collect_columns_impl(and.left.as_ref(), columns);
58+
Self::collect_columns_impl(and.right.as_ref(), columns);
5959
}
6060
Scalar::OrExpr(or) => {
61-
Self::collect_columns(or.left.as_ref(), columns);
62-
Self::collect_columns(or.right.as_ref(), columns);
61+
Self::collect_columns_impl(or.left.as_ref(), columns);
62+
Self::collect_columns_impl(or.right.as_ref(), columns);
6363
}
6464
Scalar::ComparisonExpr(cmp) => {
65-
Self::collect_columns(cmp.left.as_ref(), columns);
66-
Self::collect_columns(cmp.right.as_ref(), columns);
65+
Self::collect_columns_impl(cmp.left.as_ref(), columns);
66+
Self::collect_columns_impl(cmp.right.as_ref(), columns);
6767
}
6868
Scalar::FunctionCall(func) => {
6969
for arg in func.arguments.iter() {
70-
Self::collect_columns(arg, columns);
70+
Self::collect_columns_impl(arg, columns);
7171
}
7272
}
7373
Scalar::CastExpr(cast) => {
74-
Self::collect_columns(cast.argument.as_ref(), columns);
74+
Self::collect_columns_impl(cast.argument.as_ref(), columns);
7575
}
7676
// 1. ConstantExpr is not collected.
77-
// 2. SubqueryExpr is not collected.
78-
// 3. AggregateFunction will not appear in where clause.
77+
// 2. SubqueryExpr and AggregateFunction will not appear in Filter-LogicalGet
7978
_ => {}
8079
}
8180
}
8281

8382
// analyze if the expression can be moved to prewhere
84-
fn analyze(expr: &Scalar, columns_to_scan: usize) -> (bool, ColumnSet) {
83+
fn collect_columns(expr: &Scalar) -> ColumnSet {
8584
let mut columns = ColumnSet::new();
86-
8785
// columns in subqueries are not considered
88-
Self::collect_columns(expr, &mut columns);
86+
Self::collect_columns_impl(expr, &mut columns);
8987

90-
// viable conditions:
91-
// 1. Condition depend on some column. Constant expressions are not moved.
92-
// 2. Do not move conditions involving all queried columns.
93-
// 3. Only current table columns are considered. (This condition is always true in current Pattern (Filter -> LogicalGet)).
94-
(
95-
!columns.is_empty() && columns.len() < columns_to_scan,
96-
columns,
97-
)
88+
columns
9889
}
9990

100-
pub fn where_optimize(&self, s_expr: SExpr) -> Result<SExpr> {
91+
pub fn prewhere_optimize(&self, s_expr: SExpr) -> Result<SExpr> {
10192
let rel_op = s_expr.plan();
10293
if s_expr.match_pattern(&self.pattern) {
103-
let mut filter: Filter = s_expr.plan().clone().try_into()?;
94+
let filter: Filter = s_expr.plan().clone().try_into()?;
10495
let mut get: LogicalGet = s_expr.child(0)?.plan().clone().try_into()?;
10596
let metadata = self.metadata.read().clone();
10697

@@ -112,44 +103,30 @@ impl WhereOptimizer {
112103

113104
let mut prewhere_columns = ColumnSet::new();
114105
let mut prewhere_pred = Vec::new();
115-
let mut remain_pred = Vec::new();
116-
117-
let columns_to_scan = get.columns.len();
118106

119107
// filter.predicates are already split by AND
120108
for pred in filter.predicates.iter() {
121-
let (viable, columns) = Self::analyze(pred, columns_to_scan);
122-
if viable {
123-
prewhere_pred.push(pred.clone());
124-
prewhere_columns.extend(&columns);
125-
} else {
126-
remain_pred.push(pred.clone());
127-
}
109+
let columns = Self::collect_columns(pred);
110+
prewhere_pred.push(pred.clone());
111+
prewhere_columns.extend(&columns);
128112
}
129113

130114
get.prewhere = if prewhere_pred.is_empty() {
131115
None
132116
} else {
133117
Some(Prewhere {
134-
columns: prewhere_columns,
118+
output_columns: get.columns.clone(),
119+
prewhere_columns,
135120
predicates: prewhere_pred,
136121
})
137122
};
138123

139-
if !remain_pred.is_empty() {
140-
filter.predicates = remain_pred;
141-
Ok(SExpr::create_unary(
142-
filter.into(),
143-
SExpr::create_leaf(get.into()),
144-
))
145-
} else {
146-
Ok(SExpr::create_leaf(get.into()))
147-
}
124+
Ok(SExpr::create_leaf(get.into()))
148125
} else {
149126
let children = s_expr
150127
.children()
151128
.iter()
152-
.map(|expr| self.where_optimize(expr.clone()))
129+
.map(|expr| self.prewhere_optimize(expr.clone()))
153130
.collect::<Result<Vec<_>>>()?;
154131
Ok(SExpr::create(rel_op.clone(), children, None, None))
155132
}

src/query/service/src/sql/planner/optimizer/heuristic/prune_columns.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,29 @@ impl ColumnPruner {
7676
fn keep_required_columns(&self, expr: &SExpr, mut required: ColumnSet) -> Result<SExpr> {
7777
match expr.plan() {
7878
RelOperator::LogicalGet(p) => {
79+
let mut prewhere = p.prewhere.clone();
7980
let mut used: ColumnSet = required.intersection(&p.columns).cloned().collect();
81+
if let Some(ref mut pw) = prewhere {
82+
debug_assert!(
83+
pw.prewhere_columns.is_subset(&p.columns),
84+
"prewhere columns should be a subset of scan columns"
85+
);
86+
// `used` is the columns which prewhere scan needs to output for its upper operator.
87+
if used.is_empty() {
88+
let prewhere_columns =
89+
pw.prewhere_columns.iter().copied().collect::<Vec<_>>();
90+
let smallest_index = self
91+
.metadata
92+
.read()
93+
.find_smallest_column_within(prewhere_columns.as_slice());
94+
used.insert(smallest_index);
95+
}
96+
pw.output_columns = used.clone();
97+
// `prune_columns` is after `prewhere_optimize`,
98+
// so we need to add prewhere columns to scan columns.
99+
used = used.union(&pw.prewhere_columns).cloned().collect();
100+
}
101+
80102
if used.is_empty() {
81103
let columns = self.metadata.read().columns_by_table_index(p.table_index);
82104
let smallest_index = find_smallest_column(&columns);
@@ -90,7 +112,7 @@ impl ColumnPruner {
90112
limit: p.limit,
91113
order_by: p.order_by.clone(),
92114
statistics: p.statistics,
93-
prewhere: p.prewhere.clone(),
115+
prewhere,
94116
})))
95117
}
96118
RelOperator::LogicalInnerJoin(p) => {

src/query/service/src/sql/planner/plans/logical_get.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ use crate::sql::plans::SortItem;
2929

3030
#[derive(Clone, Debug, PartialEq, Eq)]
3131
pub struct Prewhere {
32-
pub columns: ColumnSet,
32+
// columns needed to be output after prewhere scan
33+
pub output_columns: ColumnSet,
34+
// columns needed to conduct prewhere filter
35+
pub prewhere_columns: ColumnSet,
36+
// prewhere filter predicates
3337
pub predicates: Vec<Scalar>,
3438
}
3539

src/query/service/src/storages/storage_table_read_plan.rs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
use std::collections::BTreeMap;
1616
use std::sync::Arc;
1717

18+
use common_datavalues::DataField;
1819
use common_exception::Result;
1920
use common_legacy_planners::Extras;
2021
use common_legacy_planners::Projection;
2122
use common_legacy_planners::ReadDataSourcePlan;
2223
use common_legacy_planners::SourceInfo;
24+
use common_meta_app::schema::TableInfo;
2325

2426
use crate::sessions::QueryContext;
2527
use crate::storages::Table;
@@ -57,29 +59,14 @@ impl ToReadDataSourcePlan for dyn Table {
5759
let description = statistics.get_description(table_info);
5860

5961
let scan_fields = match (self.benefit_column_prune(), &push_downs) {
60-
(true, Some(push_downs)) => match &push_downs.projection {
61-
Some(projection) => match projection {
62-
Projection::Columns(ref indices) => {
63-
if indices.len() < table_info.schema().fields().len() {
64-
let fields = indices
65-
.iter()
66-
.map(|i| table_info.schema().field(*i).clone());
67-
68-
Some((indices.iter().cloned().zip(fields)).collect::<BTreeMap<_, _>>())
69-
} else {
70-
None
71-
}
72-
}
73-
Projection::InnerColumns(ref path_indices) => {
74-
let column_ids: Vec<usize> = path_indices.keys().cloned().collect();
75-
let new_schema = table_info.schema().inner_project(path_indices);
76-
Some(
77-
(column_ids.iter().cloned().zip(new_schema.fields().clone()))
78-
.collect::<BTreeMap<_, _>>(),
79-
)
80-
}
62+
(true, Some(push_downs)) => match &push_downs.prewhere {
63+
Some(prewhere) => {
64+
extract_scan_fields_from_projection(table_info, &prewhere.output_columns)
65+
}
66+
_ => match &push_downs.projection {
67+
Some(projection) => extract_scan_fields_from_projection(table_info, projection),
68+
_ => None,
8169
},
82-
_ => None,
8370
},
8471
_ => None,
8572
};
@@ -98,3 +85,30 @@ impl ToReadDataSourcePlan for dyn Table {
9885
})
9986
}
10087
}
88+
89+
fn extract_scan_fields_from_projection(
90+
table_info: &TableInfo,
91+
projection: &Projection,
92+
) -> Option<BTreeMap<usize, DataField>> {
93+
match projection {
94+
Projection::Columns(ref indices) => {
95+
if indices.len() < table_info.schema().fields().len() {
96+
let fields = indices
97+
.iter()
98+
.map(|i| table_info.schema().field(*i).clone());
99+
100+
Some((indices.iter().cloned().zip(fields)).collect::<BTreeMap<_, _>>())
101+
} else {
102+
None
103+
}
104+
}
105+
Projection::InnerColumns(ref path_indices) => {
106+
let column_ids: Vec<usize> = path_indices.keys().cloned().collect();
107+
let new_schema = table_info.schema().inner_project(path_indices);
108+
Some(
109+
(column_ids.iter().cloned().zip(new_schema.fields().clone()))
110+
.collect::<BTreeMap<_, _>>(),
111+
)
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)