Skip to content

Commit 965fbbb

Browse files
committed
Push down projection to prewhere scan to prune output columns.
1 parent 7aa820a commit 965fbbb

File tree

9 files changed

+113
-45
lines changed

9 files changed

+113
-45
lines changed

src/query/legacy-planners/src/plan_node_extras.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,10 @@ impl Debug for Projection {
6868

6969
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
7070
pub struct PrewhereInfo {
71-
/// column indices of the table used for prewhere
72-
pub need_columns: Projection,
71+
/// columns to be ouput be prewhere scan
72+
pub output_columns: Projection,
73+
/// columns used for prewhere
74+
pub prewhere_columns: Projection,
7375
/// remain_columns = scan.columns - need_columns
7476
pub remain_columns: Projection,
7577
/// filter for prewhere

src/query/service/src/sql/executor/physical_plan_builder.rs

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,16 @@ impl PhysicalPlanBuilder {
121121
if column.path_indices.is_some() {
122122
has_inner_column = true;
123123
}
124-
let name = column.name.clone();
125-
name_mapping.insert(name, index.to_string());
124+
if let Some(prewhere) = &scan.prewhere {
125+
// if there is a prewhere optimization,
126+
// we can prune `PhysicalScan`'s ouput schema.
127+
if prewhere.output_columns.contains(index) {
128+
name_mapping.insert(column.name.clone(), index.to_string());
129+
}
130+
} else {
131+
let name = column.name.clone();
132+
name_mapping.insert(name, index.to_string());
133+
}
126134
}
127135

128136
let table_entry = metadata.table(scan.table_index);
@@ -424,13 +432,19 @@ impl PhysicalPlanBuilder {
424432

425433
let remain_columns = scan
426434
.columns
427-
.difference(&prewhere.columns)
435+
.difference(&prewhere.prewhere_columns)
428436
.copied()
429437
.collect::<HashSet<usize>>();
430-
let need_columns = Self::build_projection(
438+
let output_columns = Self::build_projection(
439+
&metadata,
440+
table_schema,
441+
&prewhere.output_columns,
442+
has_inner_column,
443+
);
444+
let prewhere_columns = Self::build_projection(
431445
&metadata,
432446
table_schema,
433-
&prewhere.columns,
447+
&prewhere.prewhere_columns,
434448
has_inner_column,
435449
);
436450
let remain_columns = Self::build_projection(
@@ -441,7 +455,8 @@ impl PhysicalPlanBuilder {
441455
);
442456

443457
Ok::<PrewhereInfo, ErrorCode>(PrewhereInfo {
444-
need_columns,
458+
output_columns,
459+
prewhere_columns,
445460
remain_columns,
446461
filter,
447462
})

src/query/service/src/sql/planner/metadata.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,14 @@ impl Metadata {
230230
}
231231
table_index
232232
}
233+
234+
pub fn find_smallest_column_within(&self, indices: &[usize]) -> usize {
235+
let entries = indices
236+
.iter()
237+
.map(|i| self.column(*i).clone())
238+
.collect::<Vec<_>>();
239+
find_smallest_column(entries.as_slice())
240+
}
233241
}
234242

235243
pub fn optimize_remove_count_args(name: &str, distinct: bool, args: &[&Expr]) -> bool {

src/query/service/src/sql/planner/optimizer/heuristic/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,17 @@
1414

1515
mod decorrelate;
1616
mod implement;
17+
mod prewhere_optimization;
1718
mod prune_columns;
1819
mod rule_list;
1920
mod subquery_rewriter;
20-
mod where_optimizer;
2121

2222
use std::sync::Arc;
2323

2424
use common_exception::Result;
2525
use once_cell::sync::Lazy;
2626

27+
use self::prewhere_optimization::PrewhereOptimizer;
2728
use super::rule::RuleID;
2829
use super::ColumnSet;
2930
use crate::sessions::TableContext;
@@ -93,13 +94,13 @@ impl HeuristicOptimizer {
9394
}
9495

9596
fn post_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
97+
let prewhere_optimizer = PrewhereOptimizer::new(self.metadata.clone());
98+
let s_expr = prewhere_optimizer.prewhere_optimize(s_expr)?;
99+
96100
let pruner = prune_columns::ColumnPruner::new(self.metadata.clone());
97101
let require_columns: ColumnSet =
98102
self.bind_context.columns.iter().map(|c| c.index).collect();
99-
let s_expr = pruner.prune_columns(&s_expr, require_columns)?;
100-
101-
let where_opt = where_optimizer::WhereOptimizer::new(self.metadata.clone());
102-
where_opt.where_optimize(s_expr)
103+
pruner.prune_columns(&s_expr, require_columns)
103104
}
104105

105106
pub fn optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {

src/query/service/src/sql/planner/optimizer/heuristic/where_optimizer.rs renamed to src/query/service/src/sql/planner/optimizer/heuristic/prewhere_optimization.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ use crate::sql::plans::RelOp;
2424
use crate::sql::plans::Scalar;
2525
use crate::sql::MetadataRef;
2626

27-
pub struct WhereOptimizer {
27+
pub struct PrewhereOptimizer {
2828
metadata: MetadataRef,
2929
pattern: SExpr,
3030
}
3131

32-
impl WhereOptimizer {
32+
impl PrewhereOptimizer {
3333
pub fn new(metadata: MetadataRef) -> Self {
3434
Self {
3535
metadata,
@@ -97,7 +97,7 @@ impl WhereOptimizer {
9797
)
9898
}
9999

100-
pub fn where_optimize(&self, s_expr: SExpr) -> Result<SExpr> {
100+
pub fn prewhere_optimize(&self, s_expr: SExpr) -> Result<SExpr> {
101101
let rel_op = s_expr.plan();
102102
if s_expr.match_pattern(&self.pattern) {
103103
let mut filter: Filter = s_expr.plan().clone().try_into()?;
@@ -131,7 +131,8 @@ impl WhereOptimizer {
131131
None
132132
} else {
133133
Some(Prewhere {
134-
columns: prewhere_columns,
134+
output_columns: get.columns.clone(),
135+
prewhere_columns,
135136
predicates: prewhere_pred,
136137
})
137138
};
@@ -149,7 +150,7 @@ impl WhereOptimizer {
149150
let children = s_expr
150151
.children()
151152
.iter()
152-
.map(|expr| self.where_optimize(expr.clone()))
153+
.map(|expr| self.prewhere_optimize(expr.clone()))
153154
.collect::<Result<Vec<_>>>()?;
154155
Ok(SExpr::create(rel_op.clone(), children, None, None))
155156
}

src/query/service/src/sql/planner/optimizer/heuristic/prune_columns.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,29 @@ impl ColumnPruner {
7676
fn keep_required_columns(&self, expr: &SExpr, mut required: ColumnSet) -> Result<SExpr> {
7777
match expr.plan() {
7878
RelOperator::LogicalGet(p) => {
79+
let mut prewhere = p.prewhere.clone();
7980
let mut used: ColumnSet = required.intersection(&p.columns).cloned().collect();
81+
if let Some(ref mut pw) = prewhere {
82+
debug_assert!(
83+
pw.prewhere_columns.is_subset(&p.columns),
84+
"prewhere columns should be a subset of scan columns"
85+
);
86+
// `used` is the columns which prewhere scan needs to output for its upper operator.
87+
if used.is_empty() {
88+
let prewhere_columns =
89+
pw.prewhere_columns.iter().map(|&i| i).collect::<Vec<_>>();
90+
let smallest_index = self
91+
.metadata
92+
.read()
93+
.find_smallest_column_within(prewhere_columns.as_slice());
94+
used.insert(smallest_index);
95+
}
96+
pw.output_columns = used.clone();
97+
// `prune_columns` is after `prewhere_optimize`,
98+
// so we need to add prewhere columns to scan columns.
99+
used = used.union(&pw.prewhere_columns).cloned().collect();
100+
}
101+
80102
if used.is_empty() {
81103
let columns = self.metadata.read().columns_by_table_index(p.table_index);
82104
let smallest_index = find_smallest_column(&columns);
@@ -90,7 +112,7 @@ impl ColumnPruner {
90112
limit: p.limit,
91113
order_by: p.order_by.clone(),
92114
statistics: p.statistics,
93-
prewhere: p.prewhere.clone(),
115+
prewhere,
94116
})))
95117
}
96118
RelOperator::LogicalInnerJoin(p) => {

src/query/service/src/sql/planner/plans/logical_get.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ use crate::sql::plans::SortItem;
2929

3030
#[derive(Clone, Debug, PartialEq, Eq)]
3131
pub struct Prewhere {
32-
pub columns: ColumnSet,
32+
// columns needed to be output after prewhere scan
33+
pub output_columns: ColumnSet,
34+
// columns needed to conduct prewhere filter
35+
pub prewhere_columns: ColumnSet,
36+
// prewhere filter predicates
3337
pub predicates: Vec<Scalar>,
3438
}
3539

src/query/service/src/storages/storage_table_read_plan.rs

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
use std::collections::BTreeMap;
1616
use std::sync::Arc;
1717

18+
use common_datavalues::DataField;
1819
use common_exception::Result;
1920
use common_legacy_planners::Extras;
2021
use common_legacy_planners::Projection;
2122
use common_legacy_planners::ReadDataSourcePlan;
2223
use common_legacy_planners::SourceInfo;
24+
use common_meta_app::schema::TableInfo;
2325

2426
use crate::sessions::QueryContext;
2527
use crate::storages::Table;
@@ -57,29 +59,14 @@ impl ToReadDataSourcePlan for dyn Table {
5759
let description = statistics.get_description(table_info);
5860

5961
let scan_fields = match (self.benefit_column_prune(), &push_downs) {
60-
(true, Some(push_downs)) => match &push_downs.projection {
61-
Some(projection) => match projection {
62-
Projection::Columns(ref indices) => {
63-
if indices.len() < table_info.schema().fields().len() {
64-
let fields = indices
65-
.iter()
66-
.map(|i| table_info.schema().field(*i).clone());
67-
68-
Some((indices.iter().cloned().zip(fields)).collect::<BTreeMap<_, _>>())
69-
} else {
70-
None
71-
}
72-
}
73-
Projection::InnerColumns(ref path_indices) => {
74-
let column_ids: Vec<usize> = path_indices.keys().cloned().collect();
75-
let new_schema = table_info.schema().inner_project(path_indices);
76-
Some(
77-
(column_ids.iter().cloned().zip(new_schema.fields().clone()))
78-
.collect::<BTreeMap<_, _>>(),
79-
)
80-
}
62+
(true, Some(push_downs)) => match &push_downs.prewhere {
63+
Some(prewhere) => {
64+
extract_scan_fields_from_projection(table_info, &prewhere.output_columns)
65+
}
66+
_ => match &push_downs.projection {
67+
Some(projection) => extract_scan_fields_from_projection(table_info, projection),
68+
_ => None,
8169
},
82-
_ => None,
8370
},
8471
_ => None,
8572
};
@@ -98,3 +85,30 @@ impl ToReadDataSourcePlan for dyn Table {
9885
})
9986
}
10087
}
88+
89+
fn extract_scan_fields_from_projection(
90+
table_info: &TableInfo,
91+
projection: &Projection,
92+
) -> Option<BTreeMap<usize, DataField>> {
93+
match projection {
94+
Projection::Columns(ref indices) => {
95+
if indices.len() < table_info.schema().fields().len() {
96+
let fields = indices
97+
.iter()
98+
.map(|i| table_info.schema().field(*i).clone());
99+
100+
Some((indices.iter().cloned().zip(fields)).collect::<BTreeMap<_, _>>())
101+
} else {
102+
None
103+
}
104+
}
105+
Projection::InnerColumns(ref path_indices) => {
106+
let column_ids: Vec<usize> = path_indices.keys().cloned().collect();
107+
let new_schema = table_info.schema().inner_project(path_indices);
108+
Some(
109+
(column_ids.iter().cloned().zip(new_schema.fields().clone()))
110+
.collect::<BTreeMap<_, _>>(),
111+
)
112+
}
113+
}
114+
}

src/query/storages/fuse/src/operations/read.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl FuseTable {
9090

9191
let (output_reader, prewhere_reader, prewhere_filter, remain_reader) =
9292
if let Some(prewhere) = self.prewhere_of_push_downs(&plan.push_downs) {
93-
let prewhere_schema = prewhere.need_columns.project_schema(&table_schema);
93+
let prewhere_schema = prewhere.prewhere_columns.project_schema(&table_schema);
9494
let prewhere_schema = Arc::new(prewhere_schema);
9595
let expr_field = prewhere.filter.to_data_field(&prewhere_schema)?;
9696
let expr_schema = DataSchemaRefExt::create(vec![expr_field]);
@@ -103,9 +103,10 @@ impl FuseTable {
103103
vec![prewhere.filter.clone()],
104104
false,
105105
)?;
106-
106+
let output_reader =
107+
self.create_block_reader(&ctx, prewhere.output_columns.clone())?;
107108
let prewhere_reader =
108-
self.create_block_reader(&ctx, prewhere.need_columns.clone())?;
109+
self.create_block_reader(&ctx, prewhere.prewhere_columns.clone())?;
109110
let remain_reader = if prewhere.remain_columns.is_empty() {
110111
None
111112
} else {

0 commit comments

Comments
 (0)