Skip to content

Commit 8915c60

Browse files
mihailotim-db authored and cloud-fan committed
[SPARK-52705][SQL] Refactor deterministic check for grouping expressions
### What changes were proposed in this pull request? Move check for non-deterministic expressions in grouping expressions from `ExprUtils` to `CheckAnalysis`. ### Why are the changes needed? This is necessary in order to be able to utilize `PullOutNondeterministic` rule as a post-processing rewrite rule in single-pass analyzer. Because `ExprUtils.assertValidAggregation` is called during the bottom-up traversal, we can't check for non-deterministic expressions there. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #51391 from mihailotim-db/mihailotim-db/pull_out_nondeterministic. Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent f8ab74e commit 8915c60

File tree

3 files changed

+26
-14
lines changed

3 files changed

+26
-14
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,19 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
569569
messageParameters = Map.empty)
570570
}
571571

572-
case a: Aggregate => ExprUtils.assertValidAggregation(a)
572+
case a: Aggregate =>
573+
a.groupingExpressions.foreach(
574+
expression =>
575+
if (!expression.deterministic) {
576+
throw SparkException.internalError(
577+
msg = s"Non-deterministic expression '${toSQLExpr(expression)}' should not " +
578+
"appear in grouping expression.",
579+
context = expression.origin.getQueryContext,
580+
summary = expression.origin.context.summary
581+
)
582+
}
583+
)
584+
ExprUtils.assertValidAggregation(a)
573585

574586
case CollectMetrics(name, metrics, _, _) =>
575587
if (name == null || name.isEmpty) {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/PullOutNondeterministic.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import scala.jdk.CollectionConverters._
2121

22+
import org.apache.spark.SparkException
2223
import org.apache.spark.sql.catalyst.expressions._
24+
import org.apache.spark.sql.catalyst.expressions.ExprUtils.toSQLExpr
2325
import org.apache.spark.sql.catalyst.plans.logical._
2426
import org.apache.spark.sql.catalyst.rules.Rule
2527

@@ -39,10 +41,20 @@ object PullOutNondeterministic extends Rule[LogicalPlan] {
3941
val nondeterToAttr =
4042
NondeterministicExpressionCollection.getNondeterministicToAttributes(a.groupingExpressions)
4143
val newChild = Project(a.child.output ++ nondeterToAttr.values.asScala.toSeq, a.child)
42-
a.transformExpressions { case e =>
44+
val deterministicAggregate = a.transformExpressions { case e =>
4345
Option(nondeterToAttr.get(e)).map(_.toAttribute).getOrElse(e)
4446
}.copy(child = newChild)
4547

48+
deterministicAggregate.groupingExpressions.foreach(expr => if (!expr.deterministic) {
49+
throw SparkException.internalError(
50+
msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not appear in " +
51+
"grouping expression.",
52+
context = expr.origin.getQueryContext,
53+
summary = expr.origin.context.summary)
54+
})
55+
56+
deterministicAggregate
57+
4658
// Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail)
4759
// and we want to retain them inside the aggregate functions.
4860
case m: CollectMetrics => m

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
2020
import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition}
2121
import java.util.Locale
2222

23-
import org.apache.spark.SparkException
2423
import org.apache.spark.sql.catalyst.analysis._
2524
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
2625
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
@@ -209,17 +208,6 @@ object ExprUtils extends EvalHelper with QueryErrorsBase {
209208
"sqlExpr" -> toSQLExpr(expr),
210209
"dataType" -> toSQLType(expr.dataType)))
211210
}
212-
213-
if (!expr.deterministic) {
214-
// This is just a sanity check, our analysis rule PullOutNondeterministic should
215-
// already pull out those nondeterministic expressions and evaluate them in
216-
// a Project node.
217-
throw SparkException.internalError(
218-
msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not appear in " +
219-
"grouping expression.",
220-
context = expr.origin.getQueryContext,
221-
summary = expr.origin.context.summary)
222-
}
223211
}
224212

225213
a.groupingExpressions.foreach(checkValidGroupingExprs)

0 commit comments

Comments
 (0)