Skip to content

[SPARK-52705][SQL] Refactor deterministic check for grouping expressions #51391

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,19 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
messageParameters = Map.empty)
}

case a: Aggregate => ExprUtils.assertValidAggregation(a)
case a: Aggregate =>
a.groupingExpressions.foreach(
expression =>
if (!expression.deterministic) {
throw SparkException.internalError(
msg = s"Non-deterministic expression '${toSQLExpr(expression)}' should not " +
"appear in grouping expression.",
context = expression.origin.getQueryContext,
summary = expression.origin.context.summary
)
}
)
ExprUtils.assertValidAggregation(a)

case CollectMetrics(name, metrics, _, _) =>
if (name == null || name.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis

import scala.jdk.CollectionConverters._

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.ExprUtils.toSQLExpr
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

Expand All @@ -39,10 +41,20 @@ object PullOutNondeterministic extends Rule[LogicalPlan] {
val nondeterToAttr =
NondeterministicExpressionCollection.getNondeterministicToAttributes(a.groupingExpressions)
val newChild = Project(a.child.output ++ nondeterToAttr.values.asScala.toSeq, a.child)
a.transformExpressions { case e =>
val deterministicAggregate = a.transformExpressions { case e =>
Option(nondeterToAttr.get(e)).map(_.toAttribute).getOrElse(e)
}.copy(child = newChild)

deterministicAggregate.groupingExpressions.foreach(expr => if (!expr.deterministic) {
throw SparkException.internalError(
msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not appear in " +
"grouping expression.",
context = expr.origin.getQueryContext,
summary = expr.origin.context.summary)
})

deterministicAggregate

// Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail)
// and we want to retain them inside the aggregate functions.
case m: CollectMetrics => m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
import java.text.{DecimalFormat, DecimalFormatSymbols, ParsePosition}
import java.util.Locale

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{DataTypeMismatch, TypeCheckSuccess}
Expand Down Expand Up @@ -209,17 +208,6 @@ object ExprUtils extends EvalHelper with QueryErrorsBase {
"sqlExpr" -> toSQLExpr(expr),
"dataType" -> toSQLType(expr.dataType)))
}

if (!expr.deterministic) {
// This is just a sanity check, our analysis rule PullOutNondeterministic should
// already pull out those nondeterministic expressions and evaluate them in
// a Project node.
throw SparkException.internalError(
msg = s"Non-deterministic expression '${toSQLExpr(expr)}' should not appear in " +
"grouping expression.",
context = expr.origin.getQueryContext,
summary = expr.origin.context.summary)
}
Copy link
Member

@dongjoon-hyun dongjoon-hyun Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mihailotim-db and @cloud-fan , is this safe to remove?

This PR only added this logic at CheckAnalysis.scala and PullOutNondeterministic.scala. However, assertValidAggregation seems to be used by more places?

$ git grep assertValidAggregation | grep -v CheckAnalysis.scala | grep -v PullOutNondeterministic.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateResolver.scala:   * [[Aggregate]] using the [[ExprUtils.assertValidAggregation]], update the `scopes` with the
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateResolver.scala:        ExprUtils.assertValidAggregation(resolvedAggregate)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateResolver.scala:   * unintentional exceptions from being thrown by [[ExprUtils.assertValidAggregation]], so the
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/LateralColumnAliasResolver.scala:        ExprUtils.assertValidAggregation(aggregate)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ProjectResolver.scala:   * validate it using the [[ExprUtils.assertValidAggregation]].
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/ProjectResolver.scala:          ExprUtils.assertValidAggregation(aggregate)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExprUtils.scala:  def assertValidAggregation(a: Aggregate): Unit = {
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:          ExprUtils.assertValidAggregation(a)

Can we add back this to be safe? I don't see any negative effect to have this here. Do have a chance for this to cause any failure at single pass?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are moving this to CheckAnalysis. It going to be ran just before assertValidAggregation. For single-pass, we are going to run PullOutNondeterministic after we finish resolving the plan. Because we are calling ExprUtils.assertValidAggregation during the bottom-up pass, we can't have this check there because it would cause false positive failures (non-deterministic expressions would get pushed down later)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words, can I understand like you are saying that this PR is technically reducing test coverage of this code path (by reducing the invocation paths) due to the single-pass requirement, @mihailotim-db ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the sense that we are not invoking this between rules but only in CheckAnalysis, yes. But from what I can see, we only call assertValidAggregation from single-pass analyzer, CheckAnalysis and Optimizer, so we don't have an issue there

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mihailotim-db for the explanation. Late LGTM.

}

a.groupingExpressions.foreach(checkValidGroupingExprs)
Expand Down