Skip to content

Commit 2ccede3

Browse files
authored
refactor: refactor cte binder and fix materialized cte used in subquery (#16785)
* refactor: refactor cte binder * extract cte context and refactor join parts, todo: set op, merge into and r cte * all cte tests pass * cargo fix * make tpcds q95 materialized cte work * remove used count in cte info * fix set_cte_context * fix the distinct number for ctes * try to fix stackoverflow * fix with consume * add test for tpcds95
1 parent fb5dbcb commit 2ccede3

File tree

26 files changed

+231
-215
lines changed

26 files changed

+231
-215
lines changed

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,6 @@ impl ReplaceInterpreter {
214214
&name_resolution_ctx,
215215
metadata,
216216
&[],
217-
Default::default(),
218-
Default::default(),
219217
);
220218
let (scalar, _) = scalar_binder.bind(expr)?;
221219
let columns = scalar.used_columns();

src/query/sql/src/planner/binder/aggregate.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -639,8 +639,6 @@ impl Binder {
639639
&self.name_resolution_ctx,
640640
self.metadata.clone(),
641641
&[],
642-
self.m_cte_bound_ctx.clone(),
643-
self.ctes_map.clone(),
644642
);
645643
let (scalar_expr, _) = scalar_binder
646644
.bind(expr)

src/query/sql/src/planner/binder/bind_context.rs

Lines changed: 86 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
use std::collections::btree_map;
1616
use std::collections::BTreeMap;
17+
use std::collections::HashMap;
1718
use std::hash::Hash;
19+
use std::sync::Arc;
1820

1921
use dashmap::DashMap;
2022
use databend_common_ast::ast::Identifier;
@@ -40,6 +42,9 @@ use crate::binder::column_binding::ColumnBinding;
4042
use crate::binder::window::WindowInfo;
4143
use crate::binder::ColumnBindingBuilder;
4244
use crate::normalize_identifier;
45+
use crate::optimizer::SExpr;
46+
use crate::plans::MaterializedCte;
47+
use crate::plans::RelOperator;
4348
use crate::plans::ScalarExpr;
4449
use crate::plans::ScalarItem;
4550
use crate::ColumnSet;
@@ -117,10 +122,7 @@ pub struct BindContext {
117122

118123
pub windows: WindowInfo,
119124

120-
/// If the `BindContext` is created from a CTE, record the cte name
121-
pub cte_name: Option<String>,
122-
123-
pub cte_map_ref: Box<IndexMap<String, CteInfo>>,
125+
pub cte_context: CteContext,
124126

125127
/// True if there is aggregation in current context, which means
126128
/// non-grouping columns cannot be referenced outside aggregation
@@ -153,15 +155,90 @@ pub struct BindContext {
153155
pub window_definitions: DashMap<String, WindowSpec>,
154156
}
155157

158+
#[derive(Clone, Debug, Default)]
159+
pub struct CteContext {
160+
/// If the `BindContext` is created from a CTE, record the cte name
161+
pub cte_name: Option<String>,
162+
/// Use `IndexMap` because need to keep the insertion order
163+
/// Then wrap materialized ctes to main plan.
164+
pub cte_map: Box<IndexMap<String, CteInfo>>,
165+
/// Record the bound s_expr of materialized cte
166+
pub m_cte_bound_s_expr: HashMap<IndexType, SExpr>,
167+
pub m_cte_materialized_indexes: HashMap<IndexType, IndexType>,
168+
}
169+
170+
impl CteContext {
171+
pub fn set_m_cte_bound_s_expr(&mut self, cte_idx: IndexType, s_expr: SExpr) {
172+
self.m_cte_bound_s_expr.insert(cte_idx, s_expr);
173+
}
174+
175+
pub fn set_m_cte_materialized_indexes(&mut self, cte_idx: IndexType, index: IndexType) {
176+
self.m_cte_materialized_indexes.insert(cte_idx, index);
177+
}
178+
179+
// Check if the materialized cte has been bound.
180+
pub fn has_bound(&self, cte_idx: IndexType) -> bool {
181+
self.m_cte_bound_s_expr.contains_key(&cte_idx)
182+
}
183+
184+
// Merge two `CteContext` into one.
185+
pub fn merge(&mut self, other: CteContext) {
186+
let mut merged_cte_map = IndexMap::new();
187+
for (left_key, left_value) in self.cte_map.iter() {
188+
if let Some(right_value) = other.cte_map.get(left_key) {
189+
let mut merged_value = left_value.clone();
190+
if left_value.columns.is_empty() {
191+
merged_value.columns = right_value.columns.clone()
192+
}
193+
merged_cte_map.insert(left_key.clone(), merged_value);
194+
}
195+
}
196+
self.cte_map = Box::new(merged_cte_map);
197+
self.m_cte_bound_s_expr.extend(other.m_cte_bound_s_expr);
198+
self.m_cte_materialized_indexes
199+
.extend(other.m_cte_materialized_indexes);
200+
}
201+
202+
// Wrap materialized cte to main plan.
203+
// It will be called at the end of binding.
204+
pub fn wrap_m_cte(&self, mut s_expr: SExpr) -> SExpr {
205+
for (_, cte_info) in self.cte_map.iter().rev() {
206+
if !cte_info.materialized {
207+
continue;
208+
}
209+
if let Some(cte_s_expr) = self.m_cte_bound_s_expr.get(&cte_info.cte_idx) {
210+
let materialized_output_columns = cte_info.columns.clone();
211+
s_expr = SExpr::create_binary(
212+
Arc::new(RelOperator::MaterializedCte(MaterializedCte {
213+
cte_idx: cte_info.cte_idx,
214+
materialized_output_columns,
215+
materialized_indexes: self.m_cte_materialized_indexes.clone(),
216+
})),
217+
Arc::new(s_expr),
218+
Arc::new(cte_s_expr.clone()),
219+
);
220+
}
221+
}
222+
s_expr
223+
}
224+
225+
// Set cte context to current `BindContext`.
226+
// To make sure the last `BindContext` of the whole binding phase contains `cte context`
227+
// Then we can wrap materialized cte to main plan.
228+
pub fn set_cte_context(&mut self, cte_context: CteContext) {
229+
self.cte_map = cte_context.cte_map;
230+
self.m_cte_bound_s_expr = cte_context.m_cte_bound_s_expr;
231+
self.m_cte_materialized_indexes = cte_context.m_cte_materialized_indexes;
232+
}
233+
}
234+
156235
#[derive(Clone, Debug)]
157236
pub struct CteInfo {
158237
pub columns_alias: Vec<String>,
159238
pub query: Query,
160239
pub materialized: bool,
161240
pub recursive: bool,
162241
pub cte_idx: IndexType,
163-
// Record how many times this cte is used
164-
pub used_count: usize,
165242
// If cte is materialized, save its columns
166243
pub columns: Vec<ColumnBinding>,
167244
}
@@ -174,8 +251,7 @@ impl BindContext {
174251
bound_internal_columns: BTreeMap::new(),
175252
aggregate_info: AggregateInfo::default(),
176253
windows: WindowInfo::default(),
177-
cte_name: None,
178-
cte_map_ref: Box::default(),
254+
cte_context: CteContext::default(),
179255
in_grouping: false,
180256
view_info: None,
181257
srfs: Vec::new(),
@@ -196,8 +272,7 @@ impl BindContext {
196272
bound_internal_columns: BTreeMap::new(),
197273
aggregate_info: Default::default(),
198274
windows: Default::default(),
199-
cte_name: parent.cte_name,
200-
cte_map_ref: parent.cte_map_ref.clone(),
275+
cte_context: parent.cte_context.clone(),
201276
in_grouping: false,
202277
view_info: None,
203278
srfs: Vec::new(),
@@ -215,8 +290,7 @@ impl BindContext {
215290
pub fn replace(&self) -> Self {
216291
let mut bind_context = BindContext::new();
217292
bind_context.parent = self.parent.clone();
218-
bind_context.cte_name = self.cte_name.clone();
219-
bind_context.cte_map_ref = self.cte_map_ref.clone();
293+
bind_context.cte_context = self.cte_context.clone();
220294
bind_context
221295
}
222296

src/query/sql/src/planner/binder/bind_mutation/bind.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use databend_common_expression::Scalar;
3232
use databend_common_expression::TableSchema;
3333
use databend_common_expression::TableSchemaRef;
3434
use databend_common_expression::ROW_VERSION_COL_NAME;
35-
use indexmap::IndexMap;
3635

3736
use crate::binder::bind_mutation::mutation_expression::MutationExpression;
3837
use crate::binder::bind_mutation::mutation_expression::MutationExpressionBindResult;
@@ -258,8 +257,6 @@ impl Binder {
258257
&name_resolution_ctx,
259258
self.metadata.clone(),
260259
&[],
261-
HashMap::new(),
262-
Box::new(IndexMap::new()),
263260
);
264261

265262
// Bind matched clause columns and add update fields and exprs

src/query/sql/src/planner/binder/bind_mutation/mutation_expression.rs

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use crate::optimizer::SExpr;
3939
use crate::optimizer::SubqueryRewriter;
4040
use crate::plans::BoundColumnRef;
4141
use crate::plans::Filter;
42-
use crate::plans::MaterializedCte;
4342
use crate::plans::MutationSource;
4443
use crate::plans::RelOperator;
4544
use crate::plans::SubqueryExpr;
@@ -121,7 +120,7 @@ impl MutationExpression {
121120
}
122121

123122
// Wrap `LogicalMaterializedCte` to `source_expr`.
124-
source_s_expr = binder.wrap_cte(source_s_expr);
123+
source_s_expr = source_context.cte_context.wrap_m_cte(source_s_expr);
125124

126125
// When there is "update *" or "insert *", prepare all source columns.
127126
let all_source_columns = Self::all_source_columns(
@@ -443,26 +442,6 @@ impl Binder {
443442
Ok(row_id_index)
444443
}
445444

446-
fn wrap_cte(&mut self, mut s_expr: SExpr) -> SExpr {
447-
for (_, cte_info) in self.ctes_map.iter().rev() {
448-
if !cte_info.materialized || cte_info.used_count == 0 {
449-
continue;
450-
}
451-
let cte_s_expr = self.m_cte_bound_s_expr.get(&cte_info.cte_idx).unwrap();
452-
let materialized_output_columns = cte_info.columns.clone();
453-
s_expr = SExpr::create_binary(
454-
Arc::new(RelOperator::MaterializedCte(MaterializedCte {
455-
cte_idx: cte_info.cte_idx,
456-
materialized_output_columns,
457-
materialized_indexes: self.m_cte_materialized_indexes.clone(),
458-
})),
459-
Arc::new(s_expr),
460-
Arc::new(cte_s_expr.clone()),
461-
);
462-
}
463-
s_expr
464-
}
465-
466445
// Recursively flatten the AND expressions.
467446
pub fn flatten_and_scalar_expr(scalar: &ScalarExpr) -> Vec<ScalarExpr> {
468447
if let ScalarExpr::FunctionCall(func) = scalar
@@ -489,8 +468,6 @@ impl Binder {
489468
&self.name_resolution_ctx,
490469
self.metadata.clone(),
491470
&[],
492-
self.m_cte_bound_ctx.clone(),
493-
self.ctes_map.clone(),
494471
);
495472
let (scalar, _) = scalar_binder.bind(expr)?;
496473
if !self.check_allowed_scalar_expr_with_subquery(&scalar)? {

src/query/sql/src/planner/binder/bind_query/bind.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl Binder {
7070

7171
for (idx, cte) in with.ctes.iter().enumerate() {
7272
let table_name = self.normalize_identifier(&cte.alias.name).name;
73-
if bind_context.cte_map_ref.contains_key(&table_name) {
73+
if bind_context.cte_context.cte_map.contains_key(&table_name) {
7474
return Err(ErrorCode::SemanticError(format!(
7575
"Duplicate common table expression: {table_name}"
7676
)));
@@ -87,11 +87,12 @@ impl Binder {
8787
materialized: cte.materialized,
8888
recursive: with.recursive,
8989
cte_idx: idx,
90-
used_count: 0,
9190
columns: vec![],
9291
};
93-
self.ctes_map.insert(table_name.clone(), cte_info.clone());
94-
bind_context.cte_map_ref.insert(table_name, cte_info);
92+
bind_context
93+
.cte_context
94+
.cte_map
95+
.insert(table_name, cte_info);
9596
}
9697

9798
Ok(())
@@ -117,8 +118,6 @@ impl Binder {
117118
&self.name_resolution_ctx,
118119
self.metadata.clone(),
119120
&[],
120-
self.m_cte_bound_ctx.clone(),
121-
self.ctes_map.clone(),
122121
);
123122
let mut order_by_items = Vec::with_capacity(query.order_by.len());
124123

src/query/sql/src/planner/binder/bind_query/bind_select.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,9 @@ impl Binder {
247247

248248
let mut output_context = BindContext::new();
249249
output_context.parent = from_context.parent;
250+
output_context
251+
.cte_context
252+
.set_cte_context(from_context.cte_context.clone());
250253
output_context.columns = from_context.columns;
251254

252255
Ok((s_expr, output_context))

src/query/sql/src/planner/binder/bind_query/bind_value.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use databend_common_expression::DataSchema;
3030
use databend_common_expression::DataSchemaRefExt;
3131
use databend_common_expression::Evaluator;
3232
use databend_common_functions::BUILTIN_FUNCTIONS;
33-
use indexmap::IndexMap;
3433

3534
use crate::binder::wrap_cast;
3635
use crate::optimizer::ColumnSet;
@@ -389,8 +388,6 @@ pub fn bind_values(
389388
name_resolution_ctx,
390389
metadata.clone(),
391390
&[],
392-
HashMap::new(),
393-
Box::new(IndexMap::new()),
394391
);
395392

396393
let num_values = values.len();

0 commit comments

Comments
 (0)