@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
19
19
20
20
import scala .jdk .CollectionConverters ._
21
21
22
+ import org .apache .spark .SparkException
22
23
import org .apache .spark .sql .catalyst .expressions ._
24
+ import org .apache .spark .sql .catalyst .expressions .ExprUtils .toSQLExpr
23
25
import org .apache .spark .sql .catalyst .plans .logical ._
24
26
import org .apache .spark .sql .catalyst .rules .Rule
25
27
@@ -39,10 +41,20 @@ object PullOutNondeterministic extends Rule[LogicalPlan] {
39
41
val nondeterToAttr =
40
42
NondeterministicExpressionCollection .getNondeterministicToAttributes(a.groupingExpressions)
41
43
val newChild = Project (a.child.output ++ nondeterToAttr.values.asScala.toSeq, a.child)
42
- a.transformExpressions { case e =>
44
+ val deterministicAggregate = a.transformExpressions { case e =>
43
45
Option (nondeterToAttr.get(e)).map(_.toAttribute).getOrElse(e)
44
46
}.copy(child = newChild)
45
47
48
+ deterministicAggregate.groupingExpressions.foreach(expr => if (! expr.deterministic) {
49
+ throw SparkException .internalError(
50
+ msg = s " Non-deterministic expression ' ${toSQLExpr(expr)}' should not appear in " +
51
+ " grouping expression." ,
52
+ context = expr.origin.getQueryContext,
53
+ summary = expr.origin.context.summary)
54
+ })
55
+
56
+ deterministicAggregate
57
+
46
58
// Don't touch collect metrics. Top-level metrics are not supported (check analysis will fail)
47
59
// and we want to retain them inside the aggregate functions.
48
60
case m : CollectMetrics => m
0 commit comments