Skip to content

Commit 7509439

Browse files
authored
fix(query): fix group by with alias column can't bind the column (#16804)
* fix(query): fix group by with alias column * modify mock redis port * fix explain test * fix * fix typos * fix nested agg srf * add tests
1 parent 6dff8e3 commit 7509439

28 files changed

+720
-338
lines changed

src/query/sql/src/planner/binder/aggregate.rs

Lines changed: 100 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,14 @@ use itertools::Itertools;
3333
use super::prune_by_children;
3434
use super::ExprContext;
3535
use super::Finder;
36+
use crate::binder::project_set::SetReturningRewriter;
3637
use crate::binder::scalar::ScalarBinder;
3738
use crate::binder::select::SelectList;
3839
use crate::binder::Binder;
3940
use crate::binder::ColumnBinding;
4041
use crate::binder::ColumnBindingBuilder;
4142
use crate::binder::Visibility;
43+
use crate::normalize_identifier;
4244
use crate::optimizer::SExpr;
4345
use crate::plans::walk_expr_mut;
4446
use crate::plans::Aggregate;
@@ -347,6 +349,13 @@ impl Binder {
347349
bind_context: &mut BindContext,
348350
select_list: &mut SelectList,
349351
) -> Result<()> {
352+
if !bind_context.srf_info.srfs.is_empty() {
353+
// Rewrite the Set-returning functions in Aggregate function as columns.
354+
let mut srf_rewriter = SetReturningRewriter::new(bind_context, true);
355+
for item in select_list.items.iter_mut() {
356+
srf_rewriter.visit(&mut item.scalar)?;
357+
}
358+
}
350359
let mut rewriter = AggregateRewriter::new(bind_context, self.metadata.clone());
351360
for item in select_list.items.iter_mut() {
352361
rewriter.visit(&mut item.scalar)?;
@@ -373,22 +382,16 @@ impl Binder {
373382

374383
// Extract available aliases from `SELECT` clause,
375384
for item in select_list.items.iter() {
376-
if let SelectTarget::AliasedExpr { alias: Some(_), .. } = item.select_target {
377-
let column = if let ScalarExpr::BoundColumnRef(column_ref) = &item.scalar {
378-
let mut column = column_ref.column.clone();
379-
column.column_name = item.alias.clone();
380-
column
381-
} else {
382-
self.create_derived_column_binding(
383-
item.alias.clone(),
384-
item.scalar.data_type()?,
385-
Some(item.scalar.clone()),
386-
)
387-
};
388-
available_aliases.push((column, item.scalar.clone()));
385+
if let SelectTarget::AliasedExpr {
386+
alias: Some(alias), ..
387+
} = item.select_target
388+
{
389+
let column = normalize_identifier(alias, &self.name_resolution_ctx);
390+
available_aliases.push((column.name, item.scalar.clone()));
389391
}
390392
}
391393

394+
let original_context = bind_context.expr_context.clone();
392395
bind_context.set_expr_context(ExprContext::GroupClaue);
393396
match group_by {
394397
GroupBy::Normal(exprs) => self.resolve_group_items(
@@ -398,7 +401,7 @@ impl Binder {
398401
&available_aliases,
399402
false,
400403
&mut vec![],
401-
),
404+
)?,
402405
GroupBy::All => {
403406
let groups = self.resolve_group_all(select_list)?;
404407
self.resolve_group_items(
@@ -408,10 +411,10 @@ impl Binder {
408411
&available_aliases,
409412
false,
410413
&mut vec![],
411-
)
414+
)?;
412415
}
413416
GroupBy::GroupingSets(sets) => {
414-
self.resolve_grouping_sets(bind_context, select_list, sets, &available_aliases)
417+
self.resolve_grouping_sets(bind_context, select_list, sets, &available_aliases)?;
415418
}
416419
// TODO: avoid too many clones.
417420
GroupBy::Rollup(exprs) => {
@@ -420,16 +423,18 @@ impl Binder {
420423
for i in (0..=exprs.len()).rev() {
421424
sets.push(exprs[0..i].to_vec());
422425
}
423-
self.resolve_grouping_sets(bind_context, select_list, &sets, &available_aliases)
426+
self.resolve_grouping_sets(bind_context, select_list, &sets, &available_aliases)?;
424427
}
425428
GroupBy::Cube(exprs) => {
426429
// CUBE (a,b) => GROUPING SETS ((a,b),(a),(b),()) // All subsets
427430
let sets = (0..=exprs.len())
428431
.flat_map(|count| exprs.clone().into_iter().combinations(count))
429432
.collect::<Vec<_>>();
430-
self.resolve_grouping_sets(bind_context, select_list, &sets, &available_aliases)
433+
self.resolve_grouping_sets(bind_context, select_list, &sets, &available_aliases)?;
431434
}
432435
}
436+
bind_context.set_expr_context(original_context);
437+
Ok(())
433438
}
434439

435440
pub fn bind_aggregate(
@@ -490,7 +495,7 @@ impl Binder {
490495
bind_context: &mut BindContext,
491496
select_list: &SelectList<'_>,
492497
sets: &[Vec<Expr>],
493-
available_aliases: &[(ColumnBinding, ScalarExpr)],
498+
available_aliases: &[(String, ScalarExpr)],
494499
) -> Result<()> {
495500
let mut grouping_sets = Vec::with_capacity(sets.len());
496501
for set in sets {
@@ -587,7 +592,7 @@ impl Binder {
587592
bind_context: &mut BindContext,
588593
select_list: &SelectList<'_>,
589594
group_by: &[Expr],
590-
available_aliases: &[(ColumnBinding, ScalarExpr)],
595+
available_aliases: &[(String, ScalarExpr)],
591596
collect_grouping_sets: bool,
592597
grouping_sets: &mut Vec<Vec<ScalarExpr>>,
593598
) -> Result<()> {
@@ -640,9 +645,9 @@ impl Binder {
640645
self.metadata.clone(),
641646
&[],
642647
);
643-
let (scalar_expr, _) = scalar_binder
648+
let (mut scalar_expr, _) = scalar_binder
644649
.bind(expr)
645-
.or_else(|e| Self::resolve_alias_item(bind_context, expr, available_aliases, e))?;
650+
.or_else(|e| self.resolve_alias_item(bind_context, expr, available_aliases, e))?;
646651

647652
if collect_grouping_sets && !grouping_sets.last().unwrap().contains(&scalar_expr) {
648653
grouping_sets.last_mut().unwrap().push(scalar_expr.clone());
@@ -657,6 +662,11 @@ impl Binder {
657662
continue;
658663
}
659664

665+
if !bind_context.srf_info.srfs.is_empty() {
666+
let mut srf_rewriter = SetReturningRewriter::new(bind_context, false);
667+
srf_rewriter.visit(&mut scalar_expr)?;
668+
}
669+
660670
let group_item_name = format!("{:#}", expr);
661671
let index = if let ScalarExpr::BoundColumnRef(BoundColumnRef {
662672
column: ColumnBinding { index, .. },
@@ -766,17 +776,18 @@ impl Binder {
766776

767777
Ok((scalar, alias))
768778
}
779+
769780
fn resolve_alias_item(
781+
&mut self,
770782
bind_context: &mut BindContext,
771783
expr: &Expr,
772-
available_aliases: &[(ColumnBinding, ScalarExpr)],
784+
available_aliases: &[(String, ScalarExpr)],
773785
original_error: ErrorCode,
774786
) -> Result<(ScalarExpr, DataType)> {
775787
let mut result: Vec<usize> = vec![];
776788
// If cannot resolve group item, then try to find an available alias
777-
for (i, (column_binding, _)) in available_aliases.iter().enumerate() {
789+
for (i, (alias, _)) in available_aliases.iter().enumerate() {
778790
// Alias of the select item
779-
let col_name = column_binding.column_name.as_str();
780791
if let Expr::ColumnRef {
781792
column:
782793
ColumnRef {
@@ -787,7 +798,7 @@ impl Binder {
787798
..
788799
} = expr
789800
{
790-
if col_name.eq_ignore_ascii_case(column.name()) {
801+
if alias.eq_ignore_ascii_case(column.name()) {
791802
result.push(i);
792803
}
793804
}
@@ -801,31 +812,75 @@ impl Binder {
801812
.set_span(expr.span()),
802813
)
803814
} else {
804-
let (column_binding, scalar) = available_aliases[result[0]].clone();
805-
// We will add the alias to BindContext, so we can reference it
806-
// in `HAVING` and `ORDER BY` clause.
807-
bind_context.add_column_binding(column_binding.clone());
815+
let (alias, mut scalar) = available_aliases[result[0]].clone();
808816

809-
let index = column_binding.index;
810-
bind_context.aggregate_info.group_items.push(ScalarItem {
811-
scalar: scalar.clone(),
812-
index,
813-
});
814-
bind_context.aggregate_info.group_items_map.insert(
815-
scalar.clone(),
816-
bind_context.aggregate_info.group_items.len() - 1,
817-
);
817+
if !bind_context.srf_info.srfs.is_empty() {
818+
let mut srf_rewriter = SetReturningRewriter::new(bind_context, false);
819+
srf_rewriter.visit(&mut scalar)?;
820+
}
821+
822+
// check scalar first, avoid duplicate create column.
823+
let mut scalar_column_index = None;
824+
let column_binding = if let Some(column_index) =
825+
bind_context.aggregate_info.group_items_map.get(&scalar)
826+
{
827+
scalar_column_index = Some(*column_index);
828+
829+
let group_item = &bind_context.aggregate_info.group_items[*column_index];
830+
ColumnBindingBuilder::new(
831+
alias.clone(),
832+
group_item.index,
833+
Box::new(group_item.scalar.data_type()?),
834+
Visibility::Visible,
835+
)
836+
.build()
837+
} else if let ScalarExpr::BoundColumnRef(column_ref) = &scalar {
838+
let mut column = column_ref.column.clone();
839+
column.column_name = alias.clone();
840+
column
841+
} else {
842+
self.create_derived_column_binding(
843+
alias.clone(),
844+
scalar.data_type()?,
845+
Some(scalar.clone()),
846+
)
847+
};
848+
849+
if scalar_column_index.is_none() {
850+
let index = column_binding.index;
851+
bind_context.aggregate_info.group_items.push(ScalarItem {
852+
scalar: scalar.clone(),
853+
index,
854+
});
855+
bind_context.aggregate_info.group_items_map.insert(
856+
scalar.clone(),
857+
bind_context.aggregate_info.group_items.len() - 1,
858+
);
859+
scalar_column_index = Some(bind_context.aggregate_info.group_items.len() - 1);
860+
}
861+
862+
let scalar_column_index = scalar_column_index.unwrap();
818863

819-
// Add a mapping (alias -> scalar), so we can resolve the alias later
820864
let column_ref: ScalarExpr = BoundColumnRef {
821865
span: scalar.span(),
822-
column: column_binding,
866+
column: column_binding.clone(),
823867
}
824868
.into();
825-
bind_context.aggregate_info.group_items_map.insert(
826-
column_ref,
827-
bind_context.aggregate_info.group_items.len() - 1,
828-
);
869+
let has_column = bind_context
870+
.aggregate_info
871+
.group_items_map
872+
.contains_key(&column_ref);
873+
if !has_column {
874+
// We will add the alias to BindContext, so we can reference it
875+
// in `HAVING` and `ORDER BY` clause.
876+
bind_context.add_column_binding(column_binding.clone());
877+
878+
// Add a mapping (alias -> scalar), so we can resolve the alias later
879+
bind_context
880+
.aggregate_info
881+
.group_items_map
882+
.insert(column_ref, scalar_column_index);
883+
}
829884

830885
Ok((scalar.clone(), scalar.data_type()?))
831886
}

src/query/sql/src/planner/binder/bind_context.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ use itertools::Itertools;
3939
use super::AggregateInfo;
4040
use super::INTERNAL_COLUMN_FACTORY;
4141
use crate::binder::column_binding::ColumnBinding;
42+
use crate::binder::project_set::SetReturningInfo;
4243
use crate::binder::window::WindowInfo;
4344
use crate::binder::ColumnBindingBuilder;
4445
use crate::normalize_identifier;
4546
use crate::optimizer::SExpr;
4647
use crate::plans::MaterializedCte;
4748
use crate::plans::RelOperator;
4849
use crate::plans::ScalarExpr;
49-
use crate::plans::ScalarItem;
5050
use crate::ColumnSet;
5151
use crate::IndexType;
5252
use crate::MetadataRef;
@@ -122,6 +122,9 @@ pub struct BindContext {
122122

123123
pub windows: WindowInfo,
124124

125+
/// Set-returning functions info in current context.
126+
pub srf_info: SetReturningInfo,
127+
125128
pub cte_context: CteContext,
126129

127130
/// True if there is aggregation in current context, which means
@@ -134,9 +137,6 @@ pub struct BindContext {
134137
/// It's used to check if the view has a loop dependency.
135138
pub view_info: Option<(String, String)>,
136139

137-
/// Set-returning functions in current context.
138-
pub srfs: Vec<ScalarItem>,
139-
140140
/// True if there is async function in current context, need rewrite.
141141
pub have_async_func: bool,
142142
/// True if there is udf script in current context, need rewrite.
@@ -251,10 +251,10 @@ impl BindContext {
251251
bound_internal_columns: BTreeMap::new(),
252252
aggregate_info: AggregateInfo::default(),
253253
windows: WindowInfo::default(),
254+
srf_info: SetReturningInfo::default(),
254255
cte_context: CteContext::default(),
255256
in_grouping: false,
256257
view_info: None,
257-
srfs: Vec::new(),
258258
have_async_func: false,
259259
have_udf_script: false,
260260
have_udf_server: false,
@@ -272,10 +272,10 @@ impl BindContext {
272272
bound_internal_columns: BTreeMap::new(),
273273
aggregate_info: Default::default(),
274274
windows: Default::default(),
275+
srf_info: Default::default(),
275276
cte_context: parent.cte_context.clone(),
276277
in_grouping: false,
277278
view_info: None,
278-
srfs: Vec::new(),
279279
have_async_func: false,
280280
have_udf_script: false,
281281
have_udf_server: false,

src/query/sql/src/planner/binder/bind_query/bind_select.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,15 @@ impl Binder {
134134
.map(|item| (item.alias.clone(), item.scalar.clone()))
135135
.collect::<Vec<_>>();
136136

137-
let have_srfs = !from_context.srfs.is_empty();
138-
if have_srfs {
139-
// Bind set returning functions first.
140-
s_expr = self.bind_project_set(&mut from_context, s_expr)?;
137+
// Check Set-returning functions, if the argument contains aggregation function or group item,
138+
// set as lazy Set-returning functions.
139+
if !from_context.srf_info.srfs.is_empty() {
140+
self.check_project_set_select(&mut from_context)?;
141+
}
142+
143+
// Bind Set-returning functions before filter plan and aggregate plan.
144+
if !from_context.srf_info.srfs.is_empty() {
145+
s_expr = self.bind_project_set(&mut from_context, s_expr, false)?;
141146
}
142147

143148
// To support using aliased column in `WHERE` clause,
@@ -179,7 +184,7 @@ impl Binder {
179184
)?;
180185

181186
// After all analysis is done.
182-
if !have_srfs {
187+
if from_context.srf_info.srfs.is_empty() {
183188
// Ignore SRFs.
184189
self.analyze_lazy_materialization(
185190
&from_context,
@@ -208,6 +213,11 @@ impl Binder {
208213
s_expr = self.bind_window_function(window_info, s_expr)?;
209214
}
210215

216+
// Bind lazy Set-returning functions after aggregate plan.
217+
if !from_context.srf_info.lazy_srf_set.is_empty() {
218+
s_expr = self.bind_project_set(&mut from_context, s_expr, true)?;
219+
}
220+
211221
if let Some(qualify) = qualify {
212222
s_expr = self.bind_qualify(&mut from_context, qualify, s_expr)?;
213223
}

0 commit comments

Comments
(0)