From c44326451832a13b66dfd826051455853818f1ae Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sun, 18 Aug 2024 14:45:58 +0800 Subject: [PATCH 01/15] fix the wildcard expand for filter plan --- datafusion/expr/src/logical_plan/builder.rs | 8 ++++ datafusion/expr/src/logical_plan/plan.rs | 23 ++++++++++-- datafusion/expr/src/logical_plan/tree_node.rs | 28 +++++++++++--- datafusion/expr/src/utils.rs | 9 +++++ .../src/analyzer/expand_wildcard_rule.rs | 37 +++++++++++++++++-- 5 files changed, 92 insertions(+), 13 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 2e53a682854c..b581bffbae9f 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -380,6 +380,14 @@ impl LogicalPlanBuilder { .map(Self::from) } + /// Apply a filter which is used for a having clause + pub fn having(self, expr: impl Into) -> Result { + let expr = normalize_col(expr.into(), &self.plan)?; + Filter::try_new_having(expr, Arc::new(self.plan)) + .map(LogicalPlan::Filter) + .map(Self::from) + } + /// Make a builder for a prepare logical plan from the builder's plan pub fn prepare(self, name: String, data_types: Vec) -> Result { Ok(Self::from(LogicalPlan::Prepare(Prepare { diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 2bab6d516a73..99d81a654da5 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -643,9 +643,9 @@ impl LogicalPlan { // todo it isn't clear why the schema is not recomputed here Ok(LogicalPlan::Values(Values { schema, values })) } - LogicalPlan::Filter(Filter { predicate, input }) => { - Filter::try_new(predicate, input).map(LogicalPlan::Filter) - } + LogicalPlan::Filter(Filter { + predicate, input, .. 
+ }) => Filter::try_new(predicate, input).map(LogicalPlan::Filter), LogicalPlan::Repartition(_) => Ok(self), LogicalPlan::Window(Window { input, @@ -2081,6 +2081,8 @@ pub struct Filter { pub predicate: Expr, /// The incoming logical plan pub input: Arc, + /// The flag to indicate if the filter is a having clause + pub having: bool, } impl Filter { @@ -2089,6 +2091,20 @@ impl Filter { /// Notes: as Aliases have no effect on the output of a filter operator, /// they are removed from the predicate expression. pub fn try_new(predicate: Expr, input: Arc) -> Result { + Self::try_new_internal(predicate, input, false) + } + + /// Create a new filter operator for a having clause. + /// This is similar to a filter, but its having flag is set to true. + pub fn try_new_having(predicate: Expr, input: Arc) -> Result { + Self::try_new_internal(predicate, input, true) + } + + fn try_new_internal( + predicate: Expr, + input: Arc, + having: bool, + ) -> Result { // Filter predicates must return a boolean value so we try and validate that here. // Note that it is not always possible to resolve the predicate expression during plan // construction (such as with correlated subqueries) so we make a best effort here and @@ -2105,6 +2121,7 @@ impl Filter { Ok(Self { predicate: predicate.unalias_nested().data, input, + having, }) } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index dbe43128fd38..539cb1cf5fb2 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -87,8 +87,17 @@ impl TreeNode for LogicalPlan { schema, }) }), - LogicalPlan::Filter(Filter { predicate, input }) => rewrite_arc(input, f)? 
- .update_data(|input| LogicalPlan::Filter(Filter { predicate, input })), + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) => rewrite_arc(input, f)?.update_data(|input| { + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) + }), LogicalPlan::Repartition(Repartition { input, partitioning_scheme, @@ -561,10 +570,17 @@ impl LogicalPlan { value.into_iter().map_until_stop_and_collect(&mut f) })? .update_data(|values| LogicalPlan::Values(Values { schema, values })), - LogicalPlan::Filter(Filter { predicate, input }) => f(predicate)? - .update_data(|predicate| { - LogicalPlan::Filter(Filter { predicate, input }) - }), + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) => f(predicate)?.update_data(|predicate| { + LogicalPlan::Filter(Filter { + predicate, + input, + having, + }) + }), LogicalPlan::Repartition(Repartition { input, partitioning_scheme, diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 4db5061e8fe7..0103b2caa465 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -804,6 +804,15 @@ pub fn find_base_plan(input: &LogicalPlan) -> &LogicalPlan { match input { LogicalPlan::Window(window) => find_base_plan(&window.input), LogicalPlan::Aggregate(agg) => find_base_plan(&agg.input), + LogicalPlan::Filter(filter) => { + if filter.having { + // If a filter is used for a having clause, its input plan is an aggregation. + // We should expand the wildcard expression based on the aggregation's input plan. 
+ find_base_plan(&filter.input) + } else { + input + } + } _ => input, } } diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs index 53ba3042f522..80316d1f5231 100644 --- a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs @@ -160,13 +160,14 @@ fn replace_columns( mod tests { use arrow::datatypes::{DataType, Field, Schema}; + use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan}; + use crate::Analyzer; use datafusion_common::{JoinType, TableReference}; use datafusion_expr::{ - col, in_subquery, qualified_wildcard, table_scan, wildcard, LogicalPlanBuilder, + col, ident, in_subquery, lit, qualified_wildcard, table_scan, wildcard, + LogicalPlanBuilder, }; - - use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan}; - use crate::Analyzer; + use datafusion_functions_aggregate::expr_fn::max; use super::*; @@ -301,4 +302,32 @@ mod tests { Ok(()) } + + #[test] + fn plan_having_wildcard_projection() -> Result<()> { + let aggregate = + table_scan(Some("t1"), &employee_schema(), Some(vec![0, 1, 2, 3, 4]))? + .aggregate( + vec![ + col("t1.id"), + col("t1.first_name"), + col("t1.last_name"), + col("t1.state"), + col("t1.salary"), + ], + vec![max(col("t1.salary"))], + )? + .build()?; + let plan = LogicalPlanBuilder::from(aggregate) + .having(ident("max(t1.salary)").gt(lit(100)))? + .project(vec![wildcard()])? 
+ .build()?; + + let expected = "Projection: t1.id, t1.first_name, t1.last_name, t1.state, t1.salary [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32]\ + \n Filter: max(t1.salary) > Int32(100) [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32, max(t1.salary):Int32;N]\ + \n Aggregate: groupBy=[[t1.id, t1.first_name, t1.last_name, t1.state, t1.salary]], aggr=[[max(t1.salary)]] [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32, max(t1.salary):Int32;N]\ + \n TableScan: t1 projection=[id, first_name, last_name, state, salary] [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32]"; + + assert_plan_eq(plan, expected) + } } From fdb850dc484ef42cf5623d87e856e9b94428ee9e Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sun, 18 Aug 2024 14:46:35 +0800 Subject: [PATCH 02/15] expand the wildcard for the error message --- datafusion/sql/src/select.rs | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 339234d9965c..9e898fcd69ad 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -35,7 +35,8 @@ use datafusion_expr::expr_rewriter::{ }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::utils::{ - expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, + expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns, + find_aggregate_exprs, find_window_exprs, }; use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, @@ -214,7 +215,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr { LogicalPlanBuilder::from(plan) - .filter(having_expr_post_aggr)? + .having(having_expr_post_aggr)? .build()? 
} else { plan @@ -749,6 +750,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) .collect::>>()?; + // If the having expression is present and the group by expression is not present, + // we can ensure this is an invalid query. Expand the wildcard expression here to + // get a better error message. + let select_exprs_post_aggr = if having_expr_opt.is_some() + && group_by_exprs.is_empty() + { + select_exprs_post_aggr + .into_iter() + .map(|expr| { + if let Expr::Wildcard { qualifier, options } = expr { + if let Some(qualifier) = qualifier { + Ok::<_, DataFusionError>(expand_qualified_wildcard( + &qualifier, + input.schema(), + Some(&options), + )?) + } else { + Ok(expand_wildcard(input.schema(), &input, Some(&options))?) + } + } else { + Ok(vec![expr]) + } + }) + .collect::>>()? + .into_iter() + .flatten() + .collect() + } else { + select_exprs_post_aggr + }; + // finally, we have some validation that the re-written projection can be resolved // from the aggregate output columns check_columns_satisfy_exprs( From 950472f7c3bc8a675ecd899043d3105ca2ee3425 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sun, 18 Aug 2024 14:46:53 +0800 Subject: [PATCH 03/15] add the tests --- .../sqllogictest/test_files/aggregate.slt | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 0cda24d6ff5e..6821ddaea117 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5643,3 +5643,24 @@ query I??III?T select count(null), min(null), max(null), bit_and(NULL), bit_or(NULL), bit_xor(NULL), nth_value(NULL, 1), string_agg(NULL, ','); ---- 0 NULL NULL NULL NULL NULL NULL NULL + +statement ok +create table having_test(v1 int, v2 int) + +statement ok +insert into having_test values (1, 2), (2, 3), (3, 4) + +query II +select * from having_test group 
by v1, v2 having max(v1) = 3 +---- +3 4 + +query error DataFusion error: Error during planning: Projection references non-aggregate values: Expression having_test\.v1 could not be resolved from available columns: max\(having_test\.v1\) +select * from having_test having max(v1) = 3 + +# because v2 is not in the group by clause, the sql is invalid +query error +select * from having_test group by v1 having max(v1) = 3 + +statement ok +drop table having_test From 7bf37d755655aedf29c12b75858e340040eacb29 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sun, 18 Aug 2024 14:55:14 +0800 Subject: [PATCH 04/15] fix recompute_schema --- datafusion/expr/src/logical_plan/plan.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 99d81a654da5..d8e7257eb812 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -644,8 +644,10 @@ impl LogicalPlan { Ok(LogicalPlan::Values(Values { schema, values })) } LogicalPlan::Filter(Filter { - predicate, input, .. - }) => Filter::try_new(predicate, input).map(LogicalPlan::Filter), + predicate, input, having, + }) => { + Filter::try_new_internal(predicate, input, having).map(LogicalPlan::Filter) + }, LogicalPlan::Repartition(_) => Ok(self), LogicalPlan::Window(Window { input, From ffedb96b72a3c717b0a59f3d268ecf305f7bd8bb Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sun, 18 Aug 2024 14:57:21 +0800 Subject: [PATCH 05/15] fix clippy --- datafusion/sql/src/select.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 9e898fcd69ad..d5c56791a028 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -767,7 +767,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Some(&options), )?) } else { - Ok(expand_wildcard(input.schema(), &input, Some(&options))?) 
+ Ok(expand_wildcard(input.schema(), input, Some(&options))?) } } else { Ok(vec![expr]) From 2392092227927a00d09764aa906dcf45707e2926 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Sun, 18 Aug 2024 15:04:45 +0800 Subject: [PATCH 06/15] cargo fmt --- datafusion/expr/src/logical_plan/plan.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d8e7257eb812..1ac757f84d99 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -644,10 +644,11 @@ impl LogicalPlan { Ok(LogicalPlan::Values(Values { schema, values })) } LogicalPlan::Filter(Filter { - predicate, input, having, - }) => { - Filter::try_new_internal(predicate, input, having).map(LogicalPlan::Filter) - }, + predicate, + input, + having, + }) => Filter::try_new_internal(predicate, input, having) + .map(LogicalPlan::Filter), LogicalPlan::Repartition(_) => Ok(self), LogicalPlan::Window(Window { input, From 9284e873a581f66ae73fff37ed6505b39ff91c26 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 19 Aug 2024 00:56:57 +0800 Subject: [PATCH 07/15] change the check for having clause --- datafusion/sql/src/select.rs | 61 +++++++++++++++++------------------- 1 file changed, 28 insertions(+), 33 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index d5c56791a028..0598f379c961 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -16,6 +16,7 @@ // under the License. 
use std::collections::HashSet; +use std::ops::Deref; use std::sync::Arc; use crate::planner::{ @@ -35,8 +36,8 @@ use datafusion_expr::expr_rewriter::{ }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::utils::{ - expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns, - find_aggregate_exprs, find_window_exprs, + expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_column_exprs, + find_window_exprs, }; use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, @@ -750,37 +751,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) .collect::>>()?; - // If the having expression is present and the group by expression is not present, - // we can ensure this is an invalid query. Expand the wildcard expression here to - // get a better error message. - let select_exprs_post_aggr = if having_expr_opt.is_some() - && group_by_exprs.is_empty() - { - select_exprs_post_aggr - .into_iter() - .map(|expr| { - if let Expr::Wildcard { qualifier, options } = expr { - if let Some(qualifier) = qualifier { - Ok::<_, DataFusionError>(expand_qualified_wildcard( - &qualifier, - input.schema(), - Some(&options), - )?) - } else { - Ok(expand_wildcard(input.schema(), input, Some(&options))?) - } - } else { - Ok(vec![expr]) - } - }) - .collect::>>()? - .into_iter() - .flatten() - .collect() - } else { - select_exprs_post_aggr - }; - // finally, we have some validation that the re-written projection can be resolved // from the aggregate output columns check_columns_satisfy_exprs( @@ -801,6 +771,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { "HAVING clause references non-aggregate values", )?; + // If the select items only contain scalar expressions, we don't need to check + // the column used by the HAVING clause. 
+ // + // For example, the following query is valid: + // SELECT 1 FROM t1 HAVING MAX(t1.v1) = 3; + if !check_contain_scalar_only(&select_exprs_post_aggr) { + // the column used by the HAVING clause must appear in the GROUP BY clause or + // must be part of an aggregate function. + let having_columns = find_column_exprs(&[having_expr.clone()]); + check_columns_satisfy_exprs( + &column_exprs_post_aggr, + &having_columns, + "HAVING clause references non-aggregate values", + )?; + } + Some(having_expr_post_aggr) } else { None @@ -810,6 +796,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } +fn check_contain_scalar_only(exprs: &[Expr]) -> bool { + exprs.iter().all(|expr| match expr { + Expr::ScalarFunction(_) => true, + Expr::Literal(_) => true, + Expr::Alias(alias) => check_contain_scalar_only(&[alias.expr.deref().clone()]), + _ => false, + }) +} + // If there are any multiple-defined windows, we raise an error. fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> { for (i, window_def_i) in window_defs.iter().enumerate() { From 39aa3193d6cb6fb444005435b5c78d57f01159cd Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 19 Aug 2024 00:57:40 +0800 Subject: [PATCH 08/15] rename the function and moving the tests --- datafusion/expr/src/logical_plan/builder.rs | 2 +- datafusion/expr/src/logical_plan/plan.rs | 2 +- .../src/analyzer/expand_wildcard_rule.rs | 28 ---------------- .../sqllogictest/test_files/aggregate.slt | 32 ++++++++++++++++++- 4 files changed, 33 insertions(+), 31 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index b581bffbae9f..a1e410a446aa 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -383,7 +383,7 @@ impl LogicalPlanBuilder { /// Apply a filter which is used for a having clause pub fn having(self, expr: impl Into) -> Result { let expr = normalize_col(expr.into(), &self.plan)?; - 
Filter::try_new_having(expr, Arc::new(self.plan)) + Filter::try_new_with_having(expr, Arc::new(self.plan)) .map(LogicalPlan::Filter) .map(Self::from) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 1ac757f84d99..f4b3f40c245c 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2099,7 +2099,7 @@ impl Filter { /// Create a new filter operator for a having clause. /// This is similar to a filter, but its having flag is set to true. - pub fn try_new_having(predicate: Expr, input: Arc) -> Result { + pub fn try_new_with_having(predicate: Expr, input: Arc) -> Result { Self::try_new_internal(predicate, input, true) } diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs index 80316d1f5231..8fd906f67b30 100644 --- a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs @@ -302,32 +302,4 @@ mod tests { Ok(()) } - - #[test] - fn plan_having_wildcard_projection() -> Result<()> { - let aggregate = - table_scan(Some("t1"), &employee_schema(), Some(vec![0, 1, 2, 3, 4]))? - .aggregate( - vec![ - col("t1.id"), - col("t1.first_name"), - col("t1.last_name"), - col("t1.state"), - col("t1.salary"), - ], - vec![max(col("t1.salary"))], - )? - .build()?; - let plan = LogicalPlanBuilder::from(aggregate) - .having(ident("max(t1.salary)").gt(lit(100)))? - .project(vec![wildcard()])? 
- .build()?; - - let expected = "Projection: t1.id, t1.first_name, t1.last_name, t1.state, t1.salary [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32]\ - \n Filter: max(t1.salary) > Int32(100) [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32, max(t1.salary):Int32;N]\ - \n Aggregate: groupBy=[[t1.id, t1.first_name, t1.last_name, t1.state, t1.salary]], aggr=[[max(t1.salary)]] [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32, max(t1.salary):Int32;N]\ - \n TableScan: t1 projection=[id, first_name, last_name, state, salary] [id:Int32, first_name:Utf8, last_name:Utf8, state:Utf8, salary:Int32]"; - - assert_plan_eq(plan, expected) - } } diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 6821ddaea117..ad9b781d9d03 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5655,9 +5655,39 @@ select * from having_test group by v1, v2 having max(v1) = 3 ---- 3 4 -query error DataFusion error: Error during planning: Projection references non-aggregate values: Expression having_test\.v1 could not be resolved from available columns: max\(having_test\.v1\) +query TT +EXPLAIN select * from having_test group by v1, v2 having max(v1) = 3 +---- +logical_plan +01)Projection: having_test.v1, having_test.v2 +02)--Filter: max(having_test.v1) = Int32(3) +03)----Aggregate: groupBy=[[having_test.v1, having_test.v2]], aggr=[[max(having_test.v1)]] +04)------TableScan: having_test projection=[v1, v2] +physical_plan +01)ProjectionExec: expr=[v1@0 as v1, v2@1 as v2] +02)--CoalesceBatchesExec: target_batch_size=8192 +03)----FilterExec: max(having_test.v1)@2 = 3 +04)------AggregateExec: mode=FinalPartitioned, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([v1@0, v2@1], 4), input_partitions=4 
+07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +08)--------------AggregateExec: mode=Partial, gby=[v1@0 as v1, v2@1 as v2], aggr=[max(having_test.v1)] +09)----------------MemoryExec: partitions=1, partition_sizes=[1] + + +query error DataFusion error: Error during planning: HAVING clause references non-aggregate values: Expression having_test\.v1 could not be resolved from available columns: max\(having_test\.v1\) select * from having_test having max(v1) = 3 +# If the select items only contain scalar expressions, the having clause is valid. +query P +select now() from having_test having max(v1) = 4 +---- + +# If the select items only contain scalar expressions, the having clause is valid. +query I +select 0 from having_test having max(v1) = 4 +---- + # because v2 is not in the group by clause, the sql is invalid query error select * from having_test group by v1 having max(v1) = 3 From 020ee7dc15ff99325081bf48571fd1ed92aaf34c Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 19 Aug 2024 01:17:31 +0800 Subject: [PATCH 09/15] fix check --- datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs index 8fd906f67b30..dd422f7aab95 100644 --- a/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs @@ -164,10 +164,8 @@ mod tests { use crate::Analyzer; use datafusion_common::{JoinType, TableReference}; use datafusion_expr::{ - col, ident, in_subquery, lit, qualified_wildcard, table_scan, wildcard, - LogicalPlanBuilder, + col, in_subquery, qualified_wildcard, table_scan, wildcard, LogicalPlanBuilder, }; - use datafusion_functions_aggregate::expr_fn::max; use super::*; From ab68aa6cc0ed6ca5a7619e1ec33db6afa53822c6 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Tue, 20 Aug 2024 
21:31:14 +0800 Subject: [PATCH 10/15] expand the schema for aggregate plan --- datafusion/sql/src/select.rs | 22 ++-------- datafusion/sql/src/utils.rs | 34 ++++++++++++++- .../sqllogictest/test_files/aggregate.slt | 41 +++++++++++++++++-- 3 files changed, 74 insertions(+), 23 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 0598f379c961..3478e5efa40b 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -36,8 +36,7 @@ use datafusion_expr::expr_rewriter::{ }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::utils::{ - expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_column_exprs, - find_window_exprs, + expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, }; use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, @@ -756,6 +755,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { check_columns_satisfy_exprs( &column_exprs_post_aggr, &select_exprs_post_aggr, + input, "Projection references non-aggregate values", )?; @@ -768,25 +768,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { check_columns_satisfy_exprs( &column_exprs_post_aggr, &[having_expr_post_aggr.clone()], + input, "HAVING clause references non-aggregate values", )?; - - // If the select items only contain scalar expressions, we don't need to check - // the column used by the HAVING clause. - // - // For example, the following query is valid: - // SELECT 1 FROM t1 HAVING MAX(t1.v1) = 3; - if !check_contain_scalar_only(&select_exprs_post_aggr) { - // the column used by the HAVING clause must appear in the GROUP BY clause or - // must be part of an aggregate function. 
- let having_columns = find_column_exprs(&[having_expr.clone()]); - check_columns_satisfy_exprs( - &column_exprs_post_aggr, - &having_columns, - "HAVING clause references non-aggregate values", - )?; - } - Some(having_expr_post_aggr) } else { None diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index af161bba45c1..d7c320e93ea8 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -27,10 +27,13 @@ use datafusion_common::tree_node::{ }; use datafusion_common::{ exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue, + TableReference, }; use datafusion_expr::builder::get_unnested_columns; use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction}; -use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; +use datafusion_expr::utils::{ + expr_as_column_expr, exprlist_to_fields, find_column_exprs, +}; use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan}; use sqlparser::ast::{Ident, Value}; @@ -90,6 +93,7 @@ pub(crate) fn rebase_expr( pub(crate) fn check_columns_satisfy_exprs( columns: &[Expr], exprs: &[Expr], + input: &LogicalPlan, message_prefix: &str, ) -> Result<()> { columns.iter().try_for_each(|c| match c { @@ -119,9 +123,37 @@ pub(crate) fn check_columns_satisfy_exprs( _ => check_column_satisfies_expr(columns, e, message_prefix)?, } } + let column_names = columns + .iter() + .map(|c| format!("{}", c.schema_name())) + .collect::>(); + for e in exprs { + if let Expr::Wildcard { .. 
} = e { + exprlist_to_fields(vec![e], input)?.into_iter().try_for_each(|(table, field)| { + let column_name = qualified_name(table, field.name()); + if !column_names.iter().any(|c| c == &column_name) { + plan_err!( + "{}: Wildcard column {} could not be resolved from available columns: {}", + message_prefix, + column_name, + expr_vec_fmt!(columns) + ) + } else { + Ok(()) + } + })?; + }; + } Ok(()) } +fn qualified_name(qualifier: Option, name: &str) -> String { + match qualifier { + Some(q) => format!("{}.{}", q, name), + None => name.to_string(), + } +} + fn check_column_satisfies_expr( columns: &[Expr], expr: &Expr, diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 96b75f2461d7..11654ebdb790 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5646,9 +5646,16 @@ select count(null), min(null), max(null), bit_and(NULL), bit_or(NULL), bit_xor(N statement ok create table having_test(v1 int, v2 int) +statement ok +create table join_table(v1 int, v2 int) + statement ok insert into having_test values (1, 2), (2, 3), (3, 4) +statement ok +insert into join_table values (1, 2), (2, 3), (3, 4) + + query II select * from having_test group by v1, v2 having max(v1) = 3 ---- @@ -5674,9 +5681,38 @@ physical_plan 09)----------------MemoryExec: partitions=1, partition_sizes=[1] -query error DataFusion error: Error during planning: HAVING clause references non-aggregate values: Expression having_test\.v1 could not be resolved from available columns: max\(having_test\.v1\) +query error DataFusion error: Error during planning: Projection references non-aggregate values: Wildcard column having_test\.v1 could not be resolved from available columns: max\(having_test\.v1\) select * from having_test having max(v1) = 3 +query I +select max(v1) from having_test having max(v1) = 3 +---- +3 + +query I +select max(v1), * exclude (v1, v2) from having_test 
having max(v1) = 3 +---- +3 + +# v1 and v2 are both in the group by clause, so the wildcard projection is valid +query III +select max(v1), * replace ('v1' as v3) from having_test group by v1, v2 having max(v1) = 3 +---- +3 3 4 + +query III +select max(v1), t.* from having_test t group by v1, v2 having max(v1) = 3 +---- +3 3 4 + +query error DataFusion error: Error during planning: Projection references non-aggregate values: Wildcard column j\.v1 could not be resolved from available columns: t\.v1, t\.v2, max\(t\.v1\) +select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by t.v1, t.v2 having max(t.v1) = 3 + +query III +select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by j.v1, j.v2 having max(t.v1) = 3 +---- +3 3 4 + # If the select items only contain scalar expressions, the having clause is valid. query P select now() from having_test having max(v1) = 4 @@ -5687,8 +5723,7 @@ query I select 0 from having_test having max(v1) = 4 ---- -# because v2 is not in the group by clause, the sql is invalid -query error +query error DataFusion error: Error during planning: Projection references non-aggregate values: Wildcard column having_test\.v2 could not be resolved from available columns: having_test\.v1, max\(having_test\.v1\) select * from having_test group by v1 having max(v1) = 3 statement ok From 6cb23daad36a2ae99e14ab45d34a23637195476f Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Tue, 20 Aug 2024 21:40:17 +0800 Subject: [PATCH 11/15] reduce the time to expand wildcard --- datafusion/sql/src/select.rs | 11 ++++----- datafusion/sql/src/utils.rs | 43 ++++++++++++++++-------------------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 3478e5efa40b..14ff26c59f2e 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -35,9 +35,7 @@ use datafusion_expr::expr_rewriter::{ normalize_col, 
normalize_col_with_schemas_and_ambiguity_check, normalize_cols, }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; -use datafusion_expr::utils::{ - expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, -}; +use datafusion_expr::utils::{expr_as_column_expr, expr_to_columns, exprlist_to_fields, find_aggregate_exprs, find_window_exprs}; use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, @@ -750,12 +748,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) .collect::>>()?; + let wildcard_exprs = select_exprs_post_aggr.iter().filter(|expr| matches!(expr, Expr::Wildcard { .. })).collect::>(); + let wildcard_fields = exprlist_to_fields(wildcard_exprs, input)?; + // finally, we have some validation that the re-written projection can be resolved // from the aggregate output columns check_columns_satisfy_exprs( &column_exprs_post_aggr, &select_exprs_post_aggr, - input, + &wildcard_fields, "Projection references non-aggregate values", )?; @@ -768,7 +769,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { check_columns_satisfy_exprs( &column_exprs_post_aggr, &[having_expr_post_aggr.clone()], - input, + &wildcard_fields, "HAVING clause references non-aggregate values", )?; Some(having_expr_post_aggr) diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index d7c320e93ea8..08a753bd92eb 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -18,10 +18,8 @@ //! 
SQL Utility Functions use std::collections::HashMap; - -use arrow_schema::{ - DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, -}; +use std::sync::Arc; +use arrow_schema::{DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE}; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; @@ -32,7 +30,7 @@ use datafusion_common::{ use datafusion_expr::builder::get_unnested_columns; use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction}; use datafusion_expr::utils::{ - expr_as_column_expr, exprlist_to_fields, find_column_exprs, + expr_as_column_expr, find_column_exprs, }; use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan}; use sqlparser::ast::{Ident, Value}; @@ -93,7 +91,7 @@ pub(crate) fn rebase_expr( pub(crate) fn check_columns_satisfy_exprs( columns: &[Expr], exprs: &[Expr], - input: &LogicalPlan, + wildcard_fields: &[(Option, Arc)], message_prefix: &str, ) -> Result<()> { columns.iter().try_for_each(|c| match c { @@ -127,27 +125,24 @@ pub(crate) fn check_columns_satisfy_exprs( .iter() .map(|c| format!("{}", c.schema_name())) .collect::>(); - for e in exprs { - if let Expr::Wildcard { .. 
} = e { - exprlist_to_fields(vec![e], input)?.into_iter().try_for_each(|(table, field)| { - let column_name = qualified_name(table, field.name()); - if !column_names.iter().any(|c| c == &column_name) { - plan_err!( - "{}: Wildcard column {} could not be resolved from available columns: {}", - message_prefix, - column_name, - expr_vec_fmt!(columns) - ) - } else { - Ok(()) - } - })?; - }; - } + + wildcard_fields.into_iter().try_for_each(|(table, field)| { + let column_name = qualified_name(table, field.name()); + if !column_names.iter().any(|c| c == &column_name) { + plan_err!( + "{}: Wildcard column {} could not be resolved from available columns: {}", + message_prefix, + column_name, + expr_vec_fmt!(columns) + ) + } else { + Ok(()) + } + })?; Ok(()) } -fn qualified_name(qualifier: Option, name: &str) -> String { +fn qualified_name(qualifier: &Option, name: &str) -> String { match qualifier { Some(q) => format!("{}.{}", q, name), None => name.to_string(), From 18793298d1f5def42ff935511638653770091eea Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Tue, 20 Aug 2024 23:24:37 +0800 Subject: [PATCH 12/15] clean the testing table after tested --- datafusion/sqllogictest/test_files/aggregate.slt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 11654ebdb790..b5f420f2aab0 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5729,6 +5729,9 @@ select * from having_test group by v1 having max(v1) = 3 statement ok drop table having_test +statement ok +drop table join_table + # test min/max Float16 without group expression query RRTT WITH data AS ( From 69109adb3dc973f165abc0233083122a3c004afc Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Wed, 21 Aug 2024 22:16:32 +0800 Subject: [PATCH 13/15] fmt and address review --- datafusion/sql/src/select.rs | 21 ++++++++------------- 
datafusion/sql/src/utils.rs | 17 +++++++++-------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 14ff26c59f2e..d1ebfc58ac76 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -16,7 +16,6 @@ // under the License. use std::collections::HashSet; -use std::ops::Deref; use std::sync::Arc; use crate::planner::{ @@ -35,7 +34,10 @@ use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols, }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; -use datafusion_expr::utils::{expr_as_column_expr, expr_to_columns, exprlist_to_fields, find_aggregate_exprs, find_window_exprs}; +use datafusion_expr::utils::{ + expr_as_column_expr, expr_to_columns, exprlist_to_fields, find_aggregate_exprs, + find_window_exprs, +}; use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, @@ -748,9 +750,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) .collect::>>()?; - let wildcard_exprs = select_exprs_post_aggr.iter().filter(|expr| matches!(expr, Expr::Wildcard { .. })).collect::>(); + let wildcard_exprs = select_exprs_post_aggr + .iter() + .filter(|expr| matches!(expr, Expr::Wildcard { .. 
})) + .collect::>(); let wildcard_fields = exprlist_to_fields(wildcard_exprs, input)?; - // finally, we have some validation that the re-written projection can be resolved // from the aggregate output columns check_columns_satisfy_exprs( @@ -781,15 +785,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } -fn check_contain_scalar_only(exprs: &[Expr]) -> bool { - exprs.iter().all(|expr| match expr { - Expr::ScalarFunction(_) => true, - Expr::Literal(_) => true, - Expr::Alias(alias) => check_contain_scalar_only(&[alias.expr.deref().clone()]), - _ => false, - }) -} - // If there are any multiple-defined windows, we raise an error. fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> { for (i, window_def_i) in window_defs.iter().enumerate() { diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 08a753bd92eb..17b6a9d75cbd 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -17,9 +17,10 @@ //! SQL Utility Functions -use std::collections::HashMap; -use std::sync::Arc; -use arrow_schema::{DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE}; +use arrow_schema::{ + DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, + DECIMAL_DEFAULT_SCALE, +}; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; @@ -29,11 +30,11 @@ use datafusion_common::{ }; use datafusion_expr::builder::get_unnested_columns; use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction}; -use datafusion_expr::utils::{ - expr_as_column_expr, find_column_exprs, -}; +use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan}; use sqlparser::ast::{Ident, Value}; +use std::collections::HashMap; +use std::sync::Arc; /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, 
plan: &LogicalPlan) -> Result { @@ -123,10 +124,10 @@ pub(crate) fn check_columns_satisfy_exprs( } let column_names = columns .iter() - .map(|c| format!("{}", c.schema_name())) + .map(|c| c.schema_name().to_string()) .collect::>(); - wildcard_fields.into_iter().try_for_each(|(table, field)| { + wildcard_fields.iter().try_for_each(|(table, field)| { let column_name = qualified_name(table, field.name()); if !column_names.iter().any(|c| c == &column_name) { plan_err!( From 15cb06288ab2450cb7e1b7686cdb082f489652a4 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Wed, 21 Aug 2024 23:09:01 +0800 Subject: [PATCH 14/15] stop expand wildcard and add more check for group-by and selects --- datafusion/sql/src/select.rs | 57 ++++++++++++++++--- datafusion/sql/src/utils.rs | 36 ++---------- .../sqllogictest/test_files/aggregate.slt | 8 ++- 3 files changed, 57 insertions(+), 44 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index d1ebfc58ac76..683f48159ff6 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashSet; +use std::ops::Deref; use std::sync::Arc; use crate::planner::{ @@ -35,7 +36,7 @@ use datafusion_expr::expr_rewriter::{ }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::utils::{ - expr_as_column_expr, expr_to_columns, exprlist_to_fields, find_aggregate_exprs, + expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_column_exprs, find_window_exprs, }; use datafusion_expr::{ @@ -750,17 +751,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input)) .collect::>>()?; - let wildcard_exprs = select_exprs_post_aggr - .iter() - .filter(|expr| matches!(expr, Expr::Wildcard { .. 
})) - .collect::>(); - let wildcard_fields = exprlist_to_fields(wildcard_exprs, input)?; // finally, we have some validation that the re-written projection can be resolved // from the aggregate output columns check_columns_satisfy_exprs( &column_exprs_post_aggr, &select_exprs_post_aggr, - &wildcard_fields, "Projection references non-aggregate values", )?; @@ -770,12 +765,47 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let having_expr_post_aggr = rebase_expr(having_expr, &aggr_projection_exprs, input)?; - check_columns_satisfy_exprs( + let is_grouping_projection_valid = check_columns_satisfy_exprs( &column_exprs_post_aggr, &[having_expr_post_aggr.clone()], - &wildcard_fields, "HAVING clause references non-aggregate values", )?; + + // If the select items only contain scalar expressions, we don't need to check + // the column used by the HAVING clause. + // + // For example, the following query is valid: + // SELECT 1 FROM t1 HAVING MAX(t1.v1) = 3; + // + // If group-by and projection are invalid, we should check the columns used by having clause. + if !check_contain_scalar_only(&select_exprs_post_aggr) + && !is_grouping_projection_valid + { + // the column used by the HAVING clause must appear in the GROUP BY clause or + // must be part of an aggregate function. + let having_columns = find_column_exprs(&[having_expr.clone()]); + // aggregation functions are checked in the previous step. In this step, + // we only need to check the projection columns containing the columns in the having clause. + let aggr_projection_columns = aggr_projection_exprs + .into_iter() + .filter(|expr| { + matches!(expr, Expr::Column(_) | Expr::Wildcard { .. }) + }) + .collect::>(); + // If the projection column is empty, it means the projection only contains + // aggregation functions or wildcard. In this case, we don't need to check + // the column used by the HAVING clause. 
+ // For example, the following query is valid: + // SELECT MAX(v1) FROM t1 HAVING MAX(v1) = 3; + if !aggr_projection_columns.is_empty() { + check_columns_satisfy_exprs( + &aggr_projection_columns, + &having_columns, + "HAVING clause references non-aggregate values", + )?; + } + } + Some(having_expr_post_aggr) } else { None @@ -785,6 +815,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } +fn check_contain_scalar_only(exprs: &[Expr]) -> bool { + exprs.iter().all(|expr| match expr { + Expr::ScalarFunction(_) => true, + Expr::Literal(_) => true, + Expr::Alias(alias) => check_contain_scalar_only(&[alias.expr.deref().clone()]), + _ => false, + }) +} + // If there are any multiple-defined windows, we raise an error. fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> { for (i, window_def_i) in window_defs.iter().enumerate() { diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 17b6a9d75cbd..c70d0023e753 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -18,15 +18,13 @@ //! 
SQL Utility Functions use arrow_schema::{ - DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, - DECIMAL_DEFAULT_SCALE, + DataType, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, DECIMAL_DEFAULT_SCALE, }; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; use datafusion_common::{ exec_err, internal_err, plan_err, Column, DataFusionError, Result, ScalarValue, - TableReference, }; use datafusion_expr::builder::get_unnested_columns; use datafusion_expr::expr::{Alias, GroupingSet, Unnest, WindowFunction}; @@ -34,7 +32,6 @@ use datafusion_expr::utils::{expr_as_column_expr, find_column_exprs}; use datafusion_expr::{expr_vec_fmt, Expr, ExprSchemable, LogicalPlan}; use sqlparser::ast::{Ident, Value}; use std::collections::HashMap; -use std::sync::Arc; /// Make a best-effort attempt at resolving all columns in the expression tree pub(crate) fn resolve_columns(expr: &Expr, plan: &LogicalPlan) -> Result { @@ -89,12 +86,12 @@ pub(crate) fn rebase_expr( /// Determines if the set of `Expr`'s are a valid projection on the input /// `Expr::Column`'s. +/// Return true if the expressions are valid and not empty. 
pub(crate) fn check_columns_satisfy_exprs( columns: &[Expr], exprs: &[Expr], - wildcard_fields: &[(Option, Arc)], message_prefix: &str, -) -> Result<()> { +) -> Result { columns.iter().try_for_each(|c| match c { Expr::Column(_) => Ok(()), _ => internal_err!("Expr::Column are required"), @@ -122,32 +119,7 @@ pub(crate) fn check_columns_satisfy_exprs( _ => check_column_satisfies_expr(columns, e, message_prefix)?, } } - let column_names = columns - .iter() - .map(|c| c.schema_name().to_string()) - .collect::>(); - - wildcard_fields.iter().try_for_each(|(table, field)| { - let column_name = qualified_name(table, field.name()); - if !column_names.iter().any(|c| c == &column_name) { - plan_err!( - "{}: Wildcard column {} could not be resolved from available columns: {}", - message_prefix, - column_name, - expr_vec_fmt!(columns) - ) - } else { - Ok(()) - } - })?; - Ok(()) -} - -fn qualified_name(qualifier: &Option, name: &str) -> String { - match qualifier { - Some(q) => format!("{}.{}", q, name), - None => name.to_string(), - } + Ok(!column_exprs.is_empty()) } fn check_column_satisfies_expr( diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index b5f420f2aab0..6789358c15f9 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5681,7 +5681,7 @@ physical_plan 09)----------------MemoryExec: partitions=1, partition_sizes=[1] -query error DataFusion error: Error during planning: Projection references non-aggregate values: Wildcard column having_test\.v1 could not be resolved from available columns: max\(having_test\.v1\) +query error select * from having_test having max(v1) = 3 query I @@ -5705,7 +5705,8 @@ select max(v1), t.* from having_test t group by v1, v2 having max(v1) = 3 ---- 3 3 4 -query error DataFusion error: Error during planning: Projection references non-aggregate values: Wildcard column j\.v1 could not be resolved from available 
columns: t\.v1, t\.v2, max\(t\.v1\) +# j.* should also be included in the group-by clause +query error select max(t.v1), j.* from having_test t join join_table j on t.v1 = j.v1 group by t.v1, t.v2 having max(t.v1) = 3 query III @@ -5723,7 +5724,8 @@ query I select 0 from having_test having max(v1) = 4 ---- -query error DataFusion error: Error during planning: Projection references non-aggregate values: Wildcard column having_test\.v2 could not be resolved from available columns: having_test\.v1, max\(having_test\.v1\) +# v2 should also be included in group-by clause +query error select * from having_test group by v1 having max(v1) = 3 statement ok From 17ebb047c41e94aa807816e7ed68ebe174b9852f Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Thu, 22 Aug 2024 10:26:54 +0800 Subject: [PATCH 15/15] simplify the having check --- datafusion/sql/src/select.rs | 50 ++---------------------------------- datafusion/sql/src/utils.rs | 5 ++-- 2 files changed, 4 insertions(+), 51 deletions(-) diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 683f48159ff6..0679173db16e 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -16,7 +16,6 @@ // under the License. 
use std::collections::HashSet; -use std::ops::Deref; use std::sync::Arc; use crate::planner::{ @@ -36,8 +35,7 @@ use datafusion_expr::expr_rewriter::{ }; use datafusion_expr::logical_plan::tree_node::unwrap_arc; use datafusion_expr::utils::{ - expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_column_exprs, - find_window_exprs, + expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs, }; use datafusion_expr::{ qualified_wildcard_with_options, wildcard_with_options, Aggregate, Expr, Filter, @@ -765,47 +763,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let having_expr_post_aggr = rebase_expr(having_expr, &aggr_projection_exprs, input)?; - let is_grouping_projection_valid = check_columns_satisfy_exprs( + check_columns_satisfy_exprs( &column_exprs_post_aggr, &[having_expr_post_aggr.clone()], "HAVING clause references non-aggregate values", )?; - // If the select items only contain scalar expressions, we don't need to check - // the column used by the HAVING clause. - // - // For example, the following query is valid: - // SELECT 1 FROM t1 HAVING MAX(t1.v1) = 3; - // - // If group-by and projection are invalid, we should check the columns used by having clause. - if !check_contain_scalar_only(&select_exprs_post_aggr) - && !is_grouping_projection_valid - { - // the column used by the HAVING clause must appear in the GROUP BY clause or - // must be part of an aggregate function. - let having_columns = find_column_exprs(&[having_expr.clone()]); - // aggregation functions are checked in the previous step. In this step, - // we only need to check the projection columns containing the columns in the having clause. - let aggr_projection_columns = aggr_projection_exprs - .into_iter() - .filter(|expr| { - matches!(expr, Expr::Column(_) | Expr::Wildcard { .. }) - }) - .collect::>(); - // If the projection column is empty, it means the projection only contains - // aggregation functions or wildcard. 
In this case, we don't need to check - // the column used by the HAVING clause. - // For example, the following query is valid: - // SELECT MAX(v1) FROM t1 HAVING MAX(v1) = 3; - if !aggr_projection_columns.is_empty() { - check_columns_satisfy_exprs( - &aggr_projection_columns, - &having_columns, - "HAVING clause references non-aggregate values", - )?; - } - } - Some(having_expr_post_aggr) } else { None @@ -815,15 +778,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } -fn check_contain_scalar_only(exprs: &[Expr]) -> bool { - exprs.iter().all(|expr| match expr { - Expr::ScalarFunction(_) => true, - Expr::Literal(_) => true, - Expr::Alias(alias) => check_contain_scalar_only(&[alias.expr.deref().clone()]), - _ => false, - }) -} - // If there are any multiple-defined windows, we raise an error. fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> { for (i, window_def_i) in window_defs.iter().enumerate() { diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index c70d0023e753..c32acecaae5f 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -86,12 +86,11 @@ pub(crate) fn rebase_expr( /// Determines if the set of `Expr`'s are a valid projection on the input /// `Expr::Column`'s. -/// Return true if the expressions are valid and not empty. pub(crate) fn check_columns_satisfy_exprs( columns: &[Expr], exprs: &[Expr], message_prefix: &str, -) -> Result { +) -> Result<()> { columns.iter().try_for_each(|c| match c { Expr::Column(_) => Ok(()), _ => internal_err!("Expr::Column are required"), @@ -119,7 +118,7 @@ pub(crate) fn check_columns_satisfy_exprs( _ => check_column_satisfies_expr(columns, e, message_prefix)?, } } - Ok(!column_exprs.is_empty()) + Ok(()) } fn check_column_satisfies_expr(