Skip to content

Commit 6cd46f1

Browse files
committed
sql/planner/optimizer/*/mod.rs -> *.rs
1 parent 6ff463e commit 6cd46f1

File tree

8 files changed

+374
-294
lines changed

8 files changed

+374
-294
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use common_exception::Result;
16+
17+
use crate::sql::optimizer::property::require_property;
18+
use crate::sql::optimizer::Distribution;
19+
use crate::sql::optimizer::RelExpr;
20+
use crate::sql::optimizer::RequiredProperty;
21+
use crate::sql::optimizer::SExpr;
22+
use crate::sql::plans::Exchange;
23+
24+
pub fn optimize_distributed_query(s_expr: &SExpr) -> Result<SExpr> {
25+
let required = RequiredProperty {
26+
distribution: Distribution::Any,
27+
};
28+
let mut result = require_property(&required, s_expr)?;
29+
let rel_expr = RelExpr::with_s_expr(&result);
30+
let physical_prop = rel_expr.derive_physical_prop()?;
31+
let root_required = RequiredProperty {
32+
distribution: Distribution::Serial,
33+
};
34+
if !root_required.satisfied_by(&physical_prop) {
35+
// Manually enforce serial distribution.
36+
result = SExpr::create_unary(Exchange::Merge.into(), result);
37+
}
38+
39+
Ok(result)
40+
}

src/query/service/src/sql/planner/optimizer/distributed/mod.rs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use common_exception::Result;
15+
#[allow(clippy::module_inception)]
16+
mod distributed;
1617

17-
use super::property::require_property;
18-
use super::Distribution;
19-
use super::RelExpr;
20-
use super::RequiredProperty;
21-
use super::SExpr;
22-
use crate::sql::plans::Exchange;
23-
24-
pub fn optimize_distributed_query(s_expr: &SExpr) -> Result<SExpr> {
25-
let required = RequiredProperty {
26-
distribution: Distribution::Any,
27-
};
28-
let mut result = require_property(&required, s_expr)?;
29-
let rel_expr = RelExpr::with_s_expr(&result);
30-
let physical_prop = rel_expr.derive_physical_prop()?;
31-
let root_required = RequiredProperty {
32-
distribution: Distribution::Serial,
33-
};
34-
if !root_required.satisfied_by(&physical_prop) {
35-
// Manually enforce serial distribution.
36-
result = SExpr::create_unary(Exchange::Merge.into(), result);
37-
}
38-
39-
Ok(result)
40-
}
18+
pub use distributed::optimize_distributed_query;
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::sql::optimizer::group::Group;
16+
use crate::sql::optimizer::MExpr;
17+
use crate::sql::optimizer::Memo;
18+
use crate::sql::plans::RelOperator;
19+
20+
pub fn display_memo(memo: &Memo) -> String {
21+
memo.groups
22+
.iter()
23+
.map(display_group)
24+
.collect::<Vec<_>>()
25+
.join("\n")
26+
}
27+
28+
pub fn display_group(group: &Group) -> String {
29+
format!(
30+
"Group #{}: [{}]",
31+
group.group_index,
32+
group
33+
.m_exprs
34+
.iter()
35+
.map(display_m_expr)
36+
.collect::<Vec<_>>()
37+
.join(",\n")
38+
)
39+
}
40+
41+
pub fn display_m_expr(m_expr: &MExpr) -> String {
42+
format!(
43+
"{} [{}]",
44+
display_rel_op(&m_expr.plan),
45+
m_expr
46+
.children
47+
.iter()
48+
.map(|child| format!("#{child}"))
49+
.collect::<Vec<_>>()
50+
.join(", ")
51+
)
52+
}
53+
54+
pub fn display_rel_op(rel_op: &RelOperator) -> String {
55+
match rel_op {
56+
RelOperator::LogicalGet(_) => "LogicalGet".to_string(),
57+
RelOperator::LogicalInnerJoin(_) => "LogicalInnerJoin".to_string(),
58+
RelOperator::PhysicalScan(_) => "PhysicalScan".to_string(),
59+
RelOperator::PhysicalHashJoin(_) => "PhysicalHashJoin".to_string(),
60+
RelOperator::Project(_) => "Project".to_string(),
61+
RelOperator::EvalScalar(_) => "EvalScalar".to_string(),
62+
RelOperator::Filter(_) => "Filter".to_string(),
63+
RelOperator::Aggregate(_) => "Aggregate".to_string(),
64+
RelOperator::Sort(_) => "Sort".to_string(),
65+
RelOperator::Limit(_) => "Limit".to_string(),
66+
RelOperator::UnionAll(_) => "UnionAll".to_string(),
67+
RelOperator::Exchange(_) => "Exchange".to_string(),
68+
RelOperator::Pattern(_) => "Pattern".to_string(),
69+
RelOperator::DummyTableScan(_) => "DummyTableScan".to_string(),
70+
}
71+
}

src/query/service/src/sql/planner/optimizer/format/mod.rs

Lines changed: 6 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,60 +12,10 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use super::group::Group;
16-
use super::MExpr;
17-
use super::Memo;
18-
use crate::sql::plans::RelOperator;
15+
#[allow(clippy::module_inception)]
16+
mod format;
1917

20-
pub fn display_memo(memo: &Memo) -> String {
21-
memo.groups
22-
.iter()
23-
.map(display_group)
24-
.collect::<Vec<_>>()
25-
.join("\n")
26-
}
27-
28-
pub fn display_group(group: &Group) -> String {
29-
format!(
30-
"Group #{}: [{}]",
31-
group.group_index,
32-
group
33-
.m_exprs
34-
.iter()
35-
.map(display_m_expr)
36-
.collect::<Vec<_>>()
37-
.join(",\n")
38-
)
39-
}
40-
41-
pub fn display_m_expr(m_expr: &MExpr) -> String {
42-
format!(
43-
"{} [{}]",
44-
display_rel_op(&m_expr.plan),
45-
m_expr
46-
.children
47-
.iter()
48-
.map(|child| format!("#{child}"))
49-
.collect::<Vec<_>>()
50-
.join(", ")
51-
)
52-
}
53-
54-
pub fn display_rel_op(rel_op: &RelOperator) -> String {
55-
match rel_op {
56-
RelOperator::LogicalGet(_) => "LogicalGet".to_string(),
57-
RelOperator::LogicalInnerJoin(_) => "LogicalInnerJoin".to_string(),
58-
RelOperator::PhysicalScan(_) => "PhysicalScan".to_string(),
59-
RelOperator::PhysicalHashJoin(_) => "PhysicalHashJoin".to_string(),
60-
RelOperator::Project(_) => "Project".to_string(),
61-
RelOperator::EvalScalar(_) => "EvalScalar".to_string(),
62-
RelOperator::Filter(_) => "Filter".to_string(),
63-
RelOperator::Aggregate(_) => "Aggregate".to_string(),
64-
RelOperator::Sort(_) => "Sort".to_string(),
65-
RelOperator::Limit(_) => "Limit".to_string(),
66-
RelOperator::UnionAll(_) => "UnionAll".to_string(),
67-
RelOperator::Exchange(_) => "Exchange".to_string(),
68-
RelOperator::Pattern(_) => "Pattern".to_string(),
69-
RelOperator::DummyTableScan(_) => "DummyTableScan".to_string(),
70-
}
71-
}
18+
pub use format::display_group;
19+
pub use format::display_m_expr;
20+
pub use format::display_memo;
21+
pub use format::display_rel_op;
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
// Copyright 2022 Datafuse Labs.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use common_exception::Result;
18+
use common_planner::MetadataRef;
19+
use once_cell::sync::Lazy;
20+
21+
use crate::sessions::TableContext;
22+
use crate::sql::optimizer::heuristic::decorrelate::decorrelate_subquery;
23+
use crate::sql::optimizer::heuristic::implement::HeuristicImplementor;
24+
use crate::sql::optimizer::heuristic::prewhere_optimization::PrewhereOptimizer;
25+
use crate::sql::optimizer::heuristic::prune_columns;
26+
use crate::sql::optimizer::heuristic::RuleList;
27+
use crate::sql::optimizer::rule::TransformState;
28+
use crate::sql::optimizer::ColumnSet;
29+
use crate::sql::optimizer::RuleID;
30+
use crate::sql::optimizer::SExpr;
31+
use crate::sql::BindContext;
32+
33+
pub static DEFAULT_REWRITE_RULES: Lazy<Vec<RuleID>> = Lazy::new(|| {
34+
vec![
35+
RuleID::NormalizeDisjunctiveFilter,
36+
RuleID::NormalizeScalarFilter,
37+
RuleID::EliminateFilter,
38+
RuleID::EliminateEvalScalar,
39+
RuleID::EliminateProject,
40+
RuleID::MergeFilter,
41+
RuleID::MergeEvalScalar,
42+
RuleID::MergeProject,
43+
RuleID::PushDownLimitProject,
44+
RuleID::PushDownLimitSort,
45+
RuleID::PushDownLimitOuterJoin,
46+
RuleID::PushDownLimitScan,
47+
RuleID::PushDownSortScan,
48+
RuleID::PushDownFilterEvalScalar,
49+
RuleID::PushDownFilterProject,
50+
RuleID::PushDownFilterJoin,
51+
RuleID::FoldCountAggregate,
52+
RuleID::SplitAggregate,
53+
RuleID::PushDownFilterScan,
54+
]
55+
});
56+
57+
/// A heuristic query optimizer. It will apply specific transformation rules in order and
58+
/// implement the logical plans with default implementation rules.
59+
pub struct HeuristicOptimizer {
60+
rules: RuleList,
61+
implementor: HeuristicImplementor,
62+
63+
_ctx: Arc<dyn TableContext>,
64+
bind_context: Box<BindContext>,
65+
metadata: MetadataRef,
66+
}
67+
68+
impl HeuristicOptimizer {
69+
pub fn new(
70+
ctx: Arc<dyn TableContext>,
71+
bind_context: Box<BindContext>,
72+
metadata: MetadataRef,
73+
rules: RuleList,
74+
) -> Self {
75+
HeuristicOptimizer {
76+
rules,
77+
implementor: HeuristicImplementor::new(),
78+
79+
_ctx: ctx,
80+
bind_context,
81+
metadata,
82+
}
83+
}
84+
85+
fn pre_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
86+
let result = decorrelate_subquery(self.metadata.clone(), s_expr)?;
87+
Ok(result)
88+
}
89+
90+
fn post_optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
91+
let prewhere_optimizer = PrewhereOptimizer::new(self.metadata.clone());
92+
let s_expr = prewhere_optimizer.prewhere_optimize(s_expr)?;
93+
94+
let pruner = prune_columns::ColumnPruner::new(self.metadata.clone());
95+
let require_columns: ColumnSet =
96+
self.bind_context.columns.iter().map(|c| c.index).collect();
97+
pruner.prune_columns(&s_expr, require_columns)
98+
}
99+
100+
pub fn optimize(&mut self, s_expr: SExpr) -> Result<SExpr> {
101+
let pre_optimized = self.pre_optimize(s_expr)?;
102+
let optimized = self.optimize_expression(&pre_optimized)?;
103+
let post_optimized = self.post_optimize(optimized)?;
104+
// let mut result = self.implement_expression(&post_optimized)?;
105+
106+
Ok(post_optimized)
107+
}
108+
109+
fn optimize_expression(&self, s_expr: &SExpr) -> Result<SExpr> {
110+
let mut optimized_children = Vec::with_capacity(s_expr.arity());
111+
for expr in s_expr.children() {
112+
optimized_children.push(self.optimize_expression(expr)?);
113+
}
114+
let optimized_expr = s_expr.replace_children(optimized_children);
115+
let result = self.apply_transform_rules(&optimized_expr, &self.rules)?;
116+
117+
Ok(result)
118+
}
119+
120+
#[allow(dead_code)]
121+
fn implement_expression(&self, s_expr: &SExpr) -> Result<SExpr> {
122+
let mut implemented_children = Vec::with_capacity(s_expr.arity());
123+
for expr in s_expr.children() {
124+
implemented_children.push(self.implement_expression(expr)?);
125+
}
126+
let implemented_expr =
127+
SExpr::create(s_expr.plan().clone(), implemented_children, None, None);
128+
// Implement expression with Implementor
129+
let mut state = TransformState::new();
130+
self.implementor.implement(&implemented_expr, &mut state)?;
131+
let result = if !state.results().is_empty() {
132+
state.results()[0].clone()
133+
} else {
134+
implemented_expr
135+
};
136+
Ok(result)
137+
}
138+
139+
// Return `None` if no rules matched
140+
fn apply_transform_rules(&self, s_expr: &SExpr, rule_list: &RuleList) -> Result<SExpr> {
141+
let mut s_expr = s_expr.clone();
142+
for rule in rule_list.iter() {
143+
let mut state = TransformState::new();
144+
if s_expr.match_pattern(rule.pattern()) && !s_expr.applied_rule(&rule.id()) {
145+
s_expr.apply_rule(&rule.id());
146+
rule.apply(&s_expr, &mut state)?;
147+
if !state.results().is_empty() {
148+
// Recursive optimize the result
149+
let result = &state.results()[0];
150+
let optimized_result = self.optimize_expression(result)?;
151+
152+
return Ok(optimized_result);
153+
}
154+
}
155+
}
156+
157+
Ok(s_expr.clone())
158+
}
159+
}

0 commit comments

Comments
 (0)