
Commit bfb919d

feat(query): optimize min/max aggregation use accurate column stats (#16270)
1 parent a10e885 commit bfb919d
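
In short: when a query computes MIN or MAX over a plain column with no GROUP BY and no predicates, and the table can report accurate column ranges, the aggregate folds to a constant and the scan can be dropped. A minimal sketch of the idea (stand-in types modeled on the Bound/ColumnRange structs changed below; not Databend's actual API):

// Stand-ins for the catalog's Bound/ColumnRange stats types.
#[derive(Debug, Clone)]
struct Bound {
    value: i64, // stand-in for the Scalar value type
    may_be_truncated: bool,
}

#[derive(Debug, Clone)]
struct ColumnRange {
    min: Bound,
    max: Bound,
}

// Fold MIN/MAX to a constant only when the matching bound is exact.
fn try_fold(func_name: &str, range: &ColumnRange) -> Option<i64> {
    match func_name {
        "min" if !range.min.may_be_truncated => Some(range.min.value),
        "max" if !range.max.may_be_truncated => Some(range.max.value),
        _ => None, // truncated bound or unsupported aggregate: keep the scan
    }
}

fn main() {
    // e.g. a table created from numbers(100): column a ranges over 0..=99
    let range = ColumnRange {
        min: Bound { value: 0, may_be_truncated: false },
        max: Bound { value: 99, may_be_truncated: false },
    };
    assert_eq!(try_fold("max", &range), Some(99)); // SELECT MAX(a) -> constant 99
    assert_eq!(try_fold("sum", &range), None); // not a MIN/MAX: must aggregate
}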

File tree

9 files changed: +426 -141 lines

src/query/catalog/src/table.rs

Lines changed: 3 additions & 0 deletions

@@ -625,10 +625,13 @@ impl CompactionLimits {
     }
 }
 
+#[derive(Debug)]
 pub struct Bound {
     pub value: Scalar,
     pub may_be_truncated: bool,
 }
+
+#[derive(Debug)]
 pub struct ColumnRange {
     pub min: Bound,
     pub max: Bound,
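
The two #[derive(Debug)] lines simply make the stats types printable; no behavior changes. The may_be_truncated flag is what the new rule keys on: stats for long values (string prefixes in particular) may store truncated bounds, and a truncated bound is only approximate, so it cannot be returned as an exact MIN/MAX result.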

src/query/sql/src/planner/binder/aggregate.rs

Lines changed: 2 additions & 0 deletions

@@ -443,9 +443,11 @@ impl Binder {
         let agg_info = &bind_context.aggregate_info;
         let mut scalar_items: Vec<ScalarItem> =
             Vec::with_capacity(agg_info.aggregate_arguments.len() + agg_info.group_items.len());
+
         for arg in agg_info.aggregate_arguments.iter() {
             scalar_items.push(arg.clone());
         }
+
         for item in agg_info.group_items.iter() {
             if let ScalarExpr::BoundColumnRef(col) = &item.scalar {
                 if col.column.column_name.eq("_grouping_id") {

src/query/sql/src/planner/optimizer/aggregate/mod.rs

Lines changed: 2 additions & 0 deletions

@@ -13,5 +13,7 @@
 // limitations under the License.
 
 mod normalize_aggregate;
+mod stats_aggregate;
 
 pub use normalize_aggregate::RuleNormalizeAggregateOptimizer;
+pub use stats_aggregate::RuleStatsAggregateOptimizer;
src/query/sql/src/planner/optimizer/aggregate/stats_aggregate.rs

Lines changed: 194 additions & 0 deletions

@@ -0,0 +1,194 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_catalog::table_context::TableContext;
+use databend_common_exception::Result;
+use databend_common_expression::types::DataType;
+
+use crate::optimizer::SExpr;
+use crate::plans::Aggregate;
+use crate::plans::ConstantExpr;
+use crate::plans::DummyTableScan;
+use crate::plans::EvalScalar;
+use crate::plans::Operator;
+use crate::plans::RelOp;
+use crate::plans::RelOperator;
+use crate::plans::ScalarExpr;
+use crate::plans::ScalarItem;
+use crate::MetadataRef;
+
+// Replace aggregate function with scalar from table's accurate stats function
+pub struct RuleStatsAggregateOptimizer {
+    metadata: MetadataRef,
+    ctx: Arc<dyn TableContext>,
+}
+
+impl RuleStatsAggregateOptimizer {
+    pub fn new(ctx: Arc<dyn TableContext>, metadata: MetadataRef) -> Self {
+        RuleStatsAggregateOptimizer { metadata, ctx }
+    }
+
+    #[async_recursion::async_recursion(#[recursive::recursive])]
+    pub async fn run(&self, s_expr: &SExpr) -> Result<SExpr> {
+        let mut children = Vec::with_capacity(s_expr.arity());
+        for child in s_expr.children() {
+            let child = self.run(child).await?;
+            children.push(Arc::new(child));
+        }
+        let s_expr = s_expr.replace_children(children);
+        if let RelOperator::Aggregate(_) = s_expr.plan.as_ref() {
+            self.normalize_aggregate(&s_expr).await
+        } else {
+            Ok(s_expr)
+        }
+    }
+
+    async fn normalize_aggregate(&self, s_expr: &SExpr) -> Result<SExpr> {
+        let agg: Aggregate = s_expr.plan().clone().try_into()?;
+        if s_expr.arity() != 1 || agg.grouping_sets.is_some() || !agg.group_items.is_empty() {
+            return Ok(s_expr.clone());
+        }
+
+        // agg --> eval scalar --> scan
+        let arg_eval_scalar = s_expr.child(0)?;
+        if arg_eval_scalar.arity() != 1
+            || arg_eval_scalar.plan.as_ref().rel_op() != RelOp::EvalScalar
+        {
+            return Ok(s_expr.clone());
+        }
+
+        let child = arg_eval_scalar.child(0)?;
+        if child.arity() != 0 {
+            return Ok(s_expr.clone());
+        }
+
+        if let RelOperator::Scan(scan) = child.plan.as_ref() {
+            if scan.prewhere.is_none() && scan.push_down_predicates.is_none() {
+                let table = self.metadata.read().table(scan.table_index).table();
+                let schema = table.schema();
+
+                let mut column_ids = Vec::with_capacity(agg.aggregate_functions.len());
+                let mut need_rewrite_aggs = Vec::with_capacity(agg.aggregate_functions.len());
+
+                for item in agg.aggregate_functions.iter() {
+                    if let ScalarExpr::AggregateFunction(function) = &item.scalar {
+                        if ["min", "max"].contains(&function.func_name.as_str())
+                            && function.args.len() == 1
+                            && !function.distinct
+                            && Self::supported_stat_type(&function.args[0].data_type()?)
+                        {
+                            if let ScalarExpr::BoundColumnRef(b) = &function.args[0] {
+                                if let Ok(col_id) =
+                                    schema.column_id_of(b.column.column_name.as_str())
+                                {
+                                    column_ids.push(col_id);
+                                    need_rewrite_aggs
+                                        .push(Some((col_id, function.func_name.clone())));
+
+                                    continue;
+                                }
+                            }
+                        }
+                    }
+                    need_rewrite_aggs.push(None);
+                }
+
+                if column_ids.is_empty() {
+                    return Ok(s_expr.clone());
+                }
+
+                let mut eval_scalar_results = Vec::with_capacity(agg.aggregate_functions.len());
+                let mut agg_results = Vec::with_capacity(agg.aggregate_functions.len());
+
+                if let Some(stats) = table
+                    .accurate_columns_ranges(self.ctx.clone(), &column_ids)
+                    .await?
+                {
+                    for (need_rewrite_agg, agg) in
+                        need_rewrite_aggs.iter().zip(agg.aggregate_functions.iter())
+                    {
+                        if let Some((col_id, name)) = need_rewrite_agg {
+                            if let Some(stat) = stats.get(col_id) {
+                                if name.eq_ignore_ascii_case("min") && !stat.min.may_be_truncated {
+                                    eval_scalar_results.push(ScalarItem {
+                                        index: agg.index,
+                                        scalar: ScalarExpr::ConstantExpr(ConstantExpr {
+                                            value: stat.min.value.clone(),
+                                            span: None,
+                                        }),
+                                    });
+                                    continue;
+                                } else if !stat.max.may_be_truncated {
+                                    eval_scalar_results.push(ScalarItem {
+                                        index: agg.index,
+                                        scalar: ScalarExpr::ConstantExpr(ConstantExpr {
+                                            value: stat.max.value.clone(),
+                                            span: None,
+                                        }),
+                                    });
+                                    continue;
+                                }
+                            }
+                        }
+                        agg_results.push(agg.clone());
+                    }
+                }
+                if eval_scalar_results.is_empty() {
+                    return Ok(s_expr.clone());
+                }
+
+                let eval_scalar = EvalScalar {
+                    items: eval_scalar_results,
+                };
+
+                if agg_results.is_empty() {
+                    let leaf = SExpr::create_leaf(Arc::new(DummyTableScan.into()));
+                    return Ok(SExpr::create_unary(
+                        Arc::new(eval_scalar.into()),
+                        Arc::new(leaf),
+                    ));
+                } else {
+                    let agg = Aggregate {
+                        aggregate_functions: agg_results,
+                        ..agg.clone()
+                    };
+                    let child = SExpr::create_unary(
+                        Arc::new(agg.into()),
+                        Arc::new(arg_eval_scalar.clone()),
+                    );
+                    return Ok(SExpr::create_unary(
+                        Arc::new(eval_scalar.into()),
+                        Arc::new(child),
+                    ));
+                }
+            }
+        }
+        Ok(s_expr.clone())
+    }
+
+    // from RangeIndex::supported_stat_type
+    fn supported_stat_type(data_type: &DataType) -> bool {
+        let inner_type = data_type.remove_nullable();
+        matches!(
+            inner_type,
+            DataType::Number(_)
+                | DataType::Date
+                | DataType::Timestamp
+                | DataType::String
+                | DataType::Decimal(_)
+        )
+    }
+}
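
The rule fires only on the exact plan shape Aggregate → EvalScalar → Scan, with no grouping sets, no group-by items, no prewhere, and no pushed-down predicates. Each MIN/MAX over a bound column of a supported type (numbers, date, timestamp, string, decimal) folds into a constant ScalarItem when the matching bound is not truncated; every other aggregate lands in agg_results. Three outcomes follow: nothing folded, so the expression is returned unchanged; everything folded, so the subtree collapses to an EvalScalar of constants over a DummyTableScan; or a mix, where the constant EvalScalar is stacked on a reduced Aggregate that keeps only the unfolded functions.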

src/query/sql/src/planner/optimizer/optimizer.rs

Lines changed: 9 additions & 0 deletions

@@ -22,6 +22,7 @@ use databend_common_exception::Result;
 use educe::Educe;
 use log::info;
 
+use super::aggregate::RuleStatsAggregateOptimizer;
 use super::distributed::BroadcastToShuffleOptimizer;
 use super::format::display_memo;
 use super::Memo;
@@ -312,6 +313,10 @@ pub async fn optimize_query(opt_ctx: &mut OptimizerContext, mut s_expr: SExpr) -
         )?;
     }
 
+    s_expr = RuleStatsAggregateOptimizer::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone())
+        .run(&s_expr)
+        .await?;
+
     // Collect statistics for each leaf node in SExpr.
     s_expr = CollectStatisticsOptimizer::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone())
         .run(&s_expr)
@@ -402,6 +407,10 @@ async fn get_optimized_memo(opt_ctx: OptimizerContext, mut s_expr: SExpr) -> Res
         )?;
     }
 
+    s_expr = RuleStatsAggregateOptimizer::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone())
+        .run(&s_expr)
+        .await?;
+
     // Collect statistics for each leaf node in SExpr.
     s_expr = CollectStatisticsOptimizer::new(opt_ctx.table_ctx.clone(), opt_ctx.metadata.clone())
         .run(&s_expr)
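
The rewrite is wired into both optimizer entry points (optimize_query and get_optimized_memo) and runs before CollectStatisticsOptimizer, so an aggregate that folds completely can skip per-leaf statistics collection for the scan it eliminated.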

src/query/sql/src/planner/optimizer/rule/rewrite/rule_merge_eval_scalar.rs

Lines changed: 0 additions & 2 deletions

@@ -60,7 +60,6 @@ impl Rule for RuleMergeEvalScalar {
     fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> {
         let up_eval_scalar: EvalScalar = s_expr.plan().clone().try_into()?;
         let down_eval_scalar: EvalScalar = s_expr.child(0)?.plan().clone().try_into()?;
-
         let mut used_columns = ColumnSet::new();
         for item in up_eval_scalar.items.iter() {
             used_columns = used_columns
@@ -71,7 +70,6 @@ impl Rule for RuleMergeEvalScalar {
 
         let rel_expr = RelExpr::with_s_expr(s_expr.child(0)?);
         let input_prop = rel_expr.derive_relational_prop_child(0)?;
-
         // Check if the up EvalScalar depends on the down EvalScalar
         if used_columns.is_subset(&input_prop.output_columns) {
             // TODO(leiysky): eliminate duplicated scalars

tests/sqllogictests/suites/mode/cluster/explain_v2.test

Lines changed: 8 additions & 22 deletions

@@ -4,6 +4,9 @@ drop table if exists t1 all;
 statement ok
 drop table if exists t2 all;
 
+statement ok
+set max_threads = 1;
+
 statement ok
 create table t1(a int not null, b int not null) as select number as a, number+1 as b from numbers(100);
 
@@ -175,30 +178,13 @@ Limit
 ├── estimated rows: 1.00
 └── EvalScalar
     ├── output columns: [count(1) (#2), max(a) (#4), count(b) (#3)]
-    ├── expressions: [count(1) (#2)]
+    ├── expressions: [99, count(1) (#2)]
     ├── estimated rows: 1.00
-    └── AggregateFinal
-        ├── output columns: [count(1) (#2), max(a) (#4)]
-        ├── group by: []
-        ├── aggregate functions: [count(), max(a)]
+    └── EvalScalar
+        ├── output columns: [count(1) (#2)]
+        ├── expressions: [100]
         ├── estimated rows: 1.00
-        └── Exchange
-            ├── output columns: [count(1) (#2), max(a) (#4)]
-            ├── exchange type: Merge
-            └── AggregatePartial
-                ├── group by: []
-                ├── aggregate functions: [count(), max(a)]
-                ├── estimated rows: 1.00
-                └── TableScan
-                    ├── table: default.default.t1
-                    ├── output columns: [a (#0)]
-                    ├── read rows: 100
-                    ├── read size: < 1 KiB
-                    ├── partitions total: 3
-                    ├── partitions scanned: 3
-                    ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 3 to 3>]
-                    ├── push downs: [filters: [], limit: NONE]
-                    └── estimated rows: 100.00
+        └── DummyTableScan
 
 query T
 explain select (t1.a + 1) as c,(t1.b+1) as d, (t2.a+1) as e from t1 join t2 on t1.a = t2.a order by c, d, e limit 10;
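
This matches the rule's behavior: t1 is created from numbers(100) with a = number, so accurate stats give max(a) = 99, which now appears as a constant in the outer EvalScalar. With max(a) gone, the remaining count is answered from table metadata as well (the inner EvalScalar yields 100 over a DummyTableScan), so the whole AggregatePartial/Exchange/AggregateFinal pipeline over a TableScan disappears. The added set max_threads = 1 is presumably there to keep the explain output deterministic.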
