
Commit 34416f7

vladimirg-db authored and yhuang-db committed
[SPARK-52392][SQL] New single-pass Analyzer functionality
### What changes were proposed in this pull request?

New iteration of single-pass Analyzer improvements:
- Implement HAVING
- Remove excessive stack frames by flattening `withNewScope`-shaped methods
- Implement default view collation in the single-pass Analyzer
- Move Hive table resolution to MetadataResolver extensions
- Other bugfixes

### Why are the changes needed?

To replace the existing Spark Analyzer with the single-pass one.

### Does this PR introduce _any_ user-facing change?

No, the single-pass Analyzer is not yet enabled.

### How was this patch tested?

CI with `ANALYZER_DUAL_RUN_LEGACY_AND_SINGLE_PASS_RESOLVER`.

### Was this patch authored or co-authored using generative AI tooling?

Yes.

Closes apache#51078 from vladimirg-db/vladimir-golubev_data/single-pass-analyzer/improvements.

Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 255249f commit 34416f7
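
Editor's note: the `withNewScope` flattening shows up in the AttributeScopeStack and AggregateResolver diffs below. As a rough, self-contained sketch of the idea (not the actual Spark code): the old callback shape pays an extra closure frame per nested operator, while the flattened shape has the caller push and pop explicitly, keeping exception safety with try/finally.

object WithNewScopeFlatteningSketch {
  private val stack = scala.collection.mutable.Stack.empty[String]

  // Old shape: a higher-order method runs the resolution body inside
  // itself, adding a closure stack frame on every nested call.
  def withNewScope[R](body: => R): R = {
    stack.push("scope")
    try body
    finally stack.pop()
  }

  // New shape: plain push/pop; callers inline the try/finally.
  def pushScope(): Unit = stack.push("scope")
  def popScope(): Unit = stack.pop()

  def main(args: Array[String]): Unit = {
    val viaCallback = withNewScope { "resolved via callback" }

    pushScope()
    val viaExplicitPushPop =
      try "resolved via explicit push/pop" // resolution work would happen here
      finally popScope()

    assert(stack.isEmpty)
    println(s"$viaCallback / $viaExplicitPushPop")
  }
}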

50 files changed: +1743 -794 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateExpressionResolver.scala

Lines changed: 35 additions & 61 deletions
@@ -17,18 +17,14 @@
 
 package org.apache.spark.sql.catalyst.analysis.resolver
 
-import java.util.IdentityHashMap
-
-import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{
   AnsiTypeCoercion,
   CollationTypeCoercion,
   TypeCoercion
 }
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, OuterReference, SubExprUtils}
+import org.apache.spark.sql.catalyst.expressions.{Expression, OuterReference, SubExprUtils}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ListAgg}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Sort}
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.errors.QueryCompilationErrors
 
@@ -90,8 +86,6 @@ class AggregateExpressionResolver(
    * 1. Update the [[ExpressionResolver.expressionResolutionContextStack]];
    * 2. Handle [[OuterReference]] in [[AggregateExpression]], if there are any (see
    *    `handleOuterAggregateExpression`);
-   * 3. Handle [[AggregateExpression]] in [[Sort]] operator (see
-   *    `handleAggregateExpressionInSort`);
    * - Validation:
    * 1. [[ListAgg]] is not allowed in DISTINCT aggregates if it contains [[SortOrder]] different
    *    from its child;
@@ -124,12 +118,7 @@ class AggregateExpressionResolver(
     if (expressionResolutionContext.hasOuterReferences) {
       handleOuterAggregateExpression(aggregateExpressionWithChildrenResolved)
     } else {
-      traversals.current.parentOperator match {
-        case Sort(_, _, aggregate: Aggregate, _) =>
-          handleAggregateExpressionInSort(aggregateExpressionWithChildrenResolved, aggregate)
-        case other =>
-          aggregateExpressionWithChildrenResolved
-      }
+      aggregateExpressionWithChildrenResolved
     }
   }
 
@@ -163,12 +152,15 @@ class AggregateExpressionResolver(
    * - Create a new subtree without [[OuterReference]]s;
    * - Alias this subtree and put it inside the current [[SubqueryScope]];
    * - If outer aggregates are allowed, replace the [[AggregateExpression]] with an
-   *   [[OuterReference]] to the auto-generated [[Alias]] that we created. This alias will later
-   *   be injected into the outer [[Aggregate]]; We store the name that needs to be used for the
-   *   [[OuterReference]] in [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] computed based on
-   *   the [[AggregateExpression]] without [[OuterReference]] pulled out.
+   *   [[OuterReference]] to the auto-generated [[Alias]] that we created in case the subtree
+   *   without [[OuterReference]]s can't be found in the outer
+   *   [[Aggregate.aggregateExpressions]] list. Otherwise, use the [[Alias]] from the outer
+   *   [[Aggregate]]. This alias will later be injected into the outer [[Aggregate]];
+   * - Store the name that needs to be used for the [[OuterReference]] in
+   *   [[OuterReference.SINGLE_PASS_SQL_STRING_OVERRIDE]] computed based on the
+   *   [[AggregateExpression]] without [[OuterReference]] pulled out.
    * - In case we have an [[AggregateExpression]] inside a [[Sort]] operator, we need to handle it
-   *   in a special way (see [[handleAggregateExpressionInSort]] for more details).
+   *   in a special way (see [[handleAggregateExpressionOutsideAggregate]] for more details).
    * - Return the original [[AggregateExpression]] otherwise. This is done to stay compatible
    *   with the fixed-point Analyzer - a proper exception will be thrown later by
    *   [[ValidateSubqueryExpression]].
@@ -183,19 +175,12 @@ class AggregateExpressionResolver(
     }
 
     val resolvedOuterAggregateExpression =
-      if (subqueryRegistry.currentScope.isOuterAggregateAllowed) {
-        val aggregateExpressionWithStrippedOuterReferences =
-          SubExprUtils.stripOuterReference(aggregateExpression)
-
-        val outerAggregateExpressionAlias = autoGeneratedAliasProvider.newOuterAlias(
-          child = aggregateExpressionWithStrippedOuterReferences
-        )
-        subqueryRegistry.currentScope.addOuterAggregateExpression(
-          outerAggregateExpressionAlias,
-          aggregateExpressionWithStrippedOuterReferences
+      if (subqueryRegistry.currentScope.aggregateExpressionsExtractor.isDefined) {
+        extractOuterAggregateExpression(
+          aggregateExpression = aggregateExpression,
+          aggregateExpressionsExtractor =
+            subqueryRegistry.currentScope.aggregateExpressionsExtractor.get
         )
-
-        OuterReference(outerAggregateExpressionAlias.toAttribute)
       } else {
         aggregateExpression
       }
@@ -211,41 +196,30 @@ class AggregateExpressionResolver(
     }
   }
 
-  /**
-   * If we order by an [[AggregateExpression]] which is not present in the [[Aggregate]] operator
-   * (child of the [[Sort]]) we have to extract it (by adding it to the
-   * `extractedAggregateExpressionAliases` list of the current expression tree traversal) and add
-   * it to the [[Aggregate]] operator afterwards (this is done in the [[SortResolver]]).
-   */
-  private def handleAggregateExpressionInSort(
-      aggregateExpression: Expression,
-      aggregate: Aggregate): Expression = {
-    val aliasChildToAliasInAggregateExpressions = new IdentityHashMap[Expression, Alias]
-    val aggregateExpressionsSemanticComparator = new SemanticComparator(
-      aggregate.aggregateExpressions.collect {
-        case alias: Alias =>
-          aliasChildToAliasInAggregateExpressions.put(alias.child, alias)
-          alias.child
-      }
+  private def extractOuterAggregateExpression(
+      aggregateExpression: AggregateExpression,
+      aggregateExpressionsExtractor: GroupingAndAggregateExpressionsExtractor): OuterReference = {
+    val aggregateExpressionWithStrippedOuterReferences =
+      SubExprUtils.stripOuterReference(aggregateExpression)
+
+    val outerAggregateExpressionAlias = autoGeneratedAliasProvider.newOuterAlias(
+      child = aggregateExpressionWithStrippedOuterReferences
     )
 
-    val referencedAggregateExpression =
-      aggregateExpressionsSemanticComparator.collectFirst(aggregateExpression)
+    val (_, referencedAggregateExpressionAlias) =
+      aggregateExpressionsExtractor.collectFirstAggregateExpression(
+        aggregateExpressionWithStrippedOuterReferences
+      )
 
-    referencedAggregateExpression match {
-      case Some(expression) =>
-        aliasChildToAliasInAggregateExpressions.get(expression) match {
-          case null =>
-            throw SparkException.internalError(
-              s"No parent alias for expression $expression while extracting aggregate" +
-              s"expressions in Sort operator."
-            )
-          case alias: Alias => alias.toAttribute
-        }
+    referencedAggregateExpressionAlias match {
+      case Some(alias) =>
+        subqueryRegistry.currentScope.addAliasForOuterAggregateExpression(alias)
+        OuterReference(alias.toAttribute)
      case None =>
-        val alias = autoGeneratedAliasProvider.newAlias(child = aggregateExpression)
-        traversals.current.extractedAggregateExpressionAliases.add(alias)
-        alias.toAttribute
+        subqueryRegistry.currentScope.addAliasForOuterAggregateExpression(
+          outerAggregateExpressionAlias
+        )
+        OuterReference(outerAggregateExpressionAlias.toAttribute)
     }
   }
 
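
A note on the reuse-or-create decision documented in the scaladoc above: the resolver now prefers an [[Alias]] that already exists in the outer [[Aggregate.aggregateExpressions]] list and only falls back to the auto-generated one. A minimal sketch of that decision against catalyst types (the helper name and shape are hypothetical, not the resolver's API):

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}

object OuterAliasChoiceSketch {
  // Reuse an existing Alias from the outer Aggregate's output when its
  // child semantically matches the stripped AggregateExpression;
  // otherwise fall back to the freshly generated alias.
  def chooseOuterAlias(
      strippedAggregate: Expression,
      outerAggregateList: Seq[Expression],
      autoGeneratedAlias: Alias): Alias =
    outerAggregateList
      .collectFirst {
        case alias: Alias if alias.child.semanticEquals(strippedAggregate) => alias
      }
      .getOrElse(autoGeneratedAlias)
}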

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AggregateResolver.scala

Lines changed: 18 additions & 15 deletions
@@ -27,12 +27,11 @@ import org.apache.spark.sql.catalyst.analysis.{
   UnresolvedAttribute
 }
 import org.apache.spark.sql.catalyst.expressions.{
+  Alias,
   AttributeReference,
   Expression,
   ExprId,
   ExprUtils,
-  IntegerLiteral,
-  Literal,
   NamedExpression
 }
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
@@ -69,7 +68,9 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
    * output of [[Aggregate]] and return the result.
    */
  def resolve(unresolvedAggregate: Aggregate): LogicalPlan = {
-    val resolvedAggregate = scopes.withNewScope() {
+    scopes.pushScope()
+
+    val resolvedAggregate = try {
      val resolvedChild = operatorResolver.resolve(unresolvedAggregate.child)
 
      val resolvedAggregateExpressions = expressionResolver.resolveAggregateExpressions(
@@ -124,9 +125,11 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
         operator = resolvedAggregate,
         outputList = resolvedAggregate.aggregateExpressions,
         groupingAttributeIds = Some(getGroupingAttributeIds(resolvedAggregate)),
-        aggregateListAliases = scopes.current.getAggregateListAliases
+        aggregateListAliases = scopes.current.getTopAggregateExpressionAliases
       )
     }
+    } finally {
+      scopes.popScope()
     }
 
     scopes.overwriteOutputAndExtendHiddenOutput(
@@ -179,10 +182,13 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
    *
    * Example 5:
    *
-   * {{{ SELECT col1, 5 FROM VALUES(1) GROUP BY ALL; }}}
-   * this one should be grouped by keyword `ALL`. If there is an aggregate expression which is a
-   * [[Literal]] with the Integer data type - preserve the ordinal literal in order to pass logical
-   * plan comparison. The grouping expressions list will be [col1, 2].
+   * {{{ SELECT col1 AS b, sum(col2) + col1 FROM VALUES (1, 2) GROUP BY ALL; }}}
+   * this one should be grouped by keyword `ALL`. It means that the grouping expressions list is
+   * going to contain all the aggregate expressions that don't have aggregate expressions in their
+   * subtrees. The grouping expressions list will be [col1 AS `col1`].
+   * All the [[Alias]]es should be stripped in order to pass logical plan comparison and to prevent
+   * unintentional exceptions from being thrown by [[ExprUtils.assertValidAggregation]], so the
+   * final grouping expressions list will be [col1].
    */
  private def tryResolveGroupByAll(
      aggregateExpressions: ResolvedAggregateExpressions,
@@ -195,13 +201,10 @@ class AggregateResolver(operatorResolver: Resolver, expressionResolver: Expressi
       )
     }
 
-    aggregateExpressions.resolvedExpressionsWithoutAggregates.zipWithIndex.map {
-      case (expression, index) =>
-        expression match {
-          case IntegerLiteral(_) =>
-            Literal(index + 1)
-          case _ => expression
-        }
+    aggregateExpressions.resolvedExpressionsWithoutAggregates.map {
+      case alias: Alias =>
+        alias.child
+      case other => other
     }
   }
 
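
The GROUP BY ALL rewrite described in Example 5 above comes down to stripping top-level [[Alias]]es from the aggregate-free output expressions, which is what the new `map` in `tryResolveGroupByAll` does. A minimal standalone sketch of that step (hypothetical helper name):

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}

object GroupByAllSketch {
  // For GROUP BY ALL, the grouping expressions are the output
  // expressions without aggregates in their subtrees, with top-level
  // aliases stripped: `col1 AS b` groups by `col1`.
  def stripTopLevelAliases(expressionsWithoutAggregates: Seq[Expression]): Seq[Expression] =
    expressionsWithoutAggregates.map {
      case alias: Alias => alias.child
      case other => other
    }
}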

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AliasResolver.scala

Lines changed: 15 additions & 3 deletions
@@ -18,7 +18,12 @@
 package org.apache.spark.sql.catalyst.analysis.resolver
 
 import org.apache.spark.sql.catalyst.analysis.{AliasResolution, MultiAlias, UnresolvedAlias}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{
+  Alias,
+  Expression,
+  NamedExpression,
+  OuterReference
+}
 
 /**
  * Resolver class that resolves unresolved aliases and handles user-specified aliases.
@@ -29,12 +34,17 @@ class AliasResolver(expressionResolver: ExpressionResolver)
   private val scopes = expressionResolver.getNameScopes
   private val expressionResolutionContextStack =
     expressionResolver.getExpressionResolutionContextStack
+  private val autoGeneratedAliasProvider = new AutoGeneratedAliasProvider(
+    expressionResolver.getExpressionIdAssigner
+  )
 
   /**
    * Resolves [[UnresolvedAlias]] by resolving its child and computing the alias name by calling
    * [[AliasResolution]] on the result. After resolving it, we assign a correct exprId to the
-   * resulting [[Alias]]. Here we allow inner aliases to persist until the end of single-pass
-   * resolution, after which they will be removed in the post-processing phase.
+   * resulting [[Alias]]. In case result of the [[AliasResolution]] call is an [[OuterReference]],
+   * we create a new [[Alias]] using the [[AutoGeneratedAliasProvider]]. Here we allow inner
+   * aliases to persist until the end of single-pass resolution, after which they will be removed
+   * in the post-processing phase.
    */
  override def resolve(unresolvedAlias: UnresolvedAlias): NamedExpression =
    scopes.current.lcaRegistry.withNewLcaScope {
@@ -52,6 +62,8 @@ class AliasResolver(expressionResolver: ExpressionResolver)
         )
       case alias: Alias =>
         expressionResolver.getExpressionIdAssigner.mapExpression(alias)
+      case outerReference: OuterReference =>
+        autoGeneratedAliasProvider.newAlias(outerReference)
     }
   }
 
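
In the new [[OuterReference]] branch above, a bare outer reference produced by [[AliasResolution]] is wrapped into a fresh [[Alias]] instead of being returned directly. A hand-built illustration of the resulting expression shape (the real code delegates naming and expression IDs to [[AutoGeneratedAliasProvider]]):

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, OuterReference}
import org.apache.spark.sql.types.IntegerType

object OuterReferenceAliasSketch {
  // An outer column wrapped into an alias, so the expression is exposed
  // under a regular named output.
  val outerColumn = AttributeReference("col1", IntegerType)()
  val wrapped: Alias = Alias(OuterReference(outerColumn), outerColumn.name)()
}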

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AnalyzerBridgeState.scala

Lines changed: 21 additions & 4 deletions
@@ -34,22 +34,39 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  * @param catalogRelationsWithResolvedMetadata A map from [[UnresolvedCatalogRelation]] to the
  *   relations with resolved metadata. It allows us to reuse the relation metadata and avoid
  *   duplicate catalog/table lookups.
+ * @param hiveRelationsWithResolvedMetadata A map from [[HiveTableRelation]] to their resolved
+ *   [[LogicalRelation]] counterparts. We cannot import those nodes here because of recursive
+ *   dependencies, so we rely on overridden [[LogicalPlan.equals]] and [[LogicalPlan.hashCode]].
+ *   Keys are canonicalized to compensate for stats added by [[DetermineTableStats]].
  */
 case class AnalyzerBridgeState(
     relationsWithResolvedMetadata: AnalyzerBridgeState.RelationsWithResolvedMetadata =
       new AnalyzerBridgeState.RelationsWithResolvedMetadata,
     catalogRelationsWithResolvedMetadata: AnalyzerBridgeState.CatalogRelationsWithResolvedMetadata =
-      new AnalyzerBridgeState.CatalogRelationsWithResolvedMetadata
+      new AnalyzerBridgeState.CatalogRelationsWithResolvedMetadata,
+    hiveRelationsWithResolvedMetadata: AnalyzerBridgeState.HiveRelationsWithResolvedMetadata =
+      new AnalyzerBridgeState.HiveRelationsWithResolvedMetadata
 ) {
   def addUnresolvedRelation(unresolvedRelation: UnresolvedRelation, relation: LogicalPlan): Unit = {
     relationsWithResolvedMetadata.put(
-        BridgedRelationId(unresolvedRelation, AnalysisContext.get.catalogAndNamespace),
-        relation
-      )
+      BridgedRelationId(unresolvedRelation, AnalysisContext.get.catalogAndNamespace),
+      relation
+    )
+  }
+
+  def addLogicalRelationForHiveRelation(
+      hiveRelation: LogicalPlan,
+      logicalRelation: LogicalPlan): Unit = {
+    hiveRelationsWithResolvedMetadata.put(hiveRelation.canonicalized, logicalRelation)
+  }
+
+  def getLogicalRelationForHiveRelation(hiveRelation: LogicalPlan): Option[LogicalPlan] = {
+    Option(hiveRelationsWithResolvedMetadata.get(hiveRelation.canonicalized))
   }
 }
 
 object AnalyzerBridgeState {
   type RelationsWithResolvedMetadata = HashMap[BridgedRelationId, LogicalPlan]
   type CatalogRelationsWithResolvedMetadata = HashMap[UnresolvedCatalogRelation, LogicalPlan]
+  type HiveRelationsWithResolvedMetadata = HashMap[LogicalPlan, LogicalPlan]
 }
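
Why the Hive relation map above keys by canonicalized plans: [[DetermineTableStats]] can decorate an already-resolved [[HiveTableRelation]] with statistics, so the same logical table may show up as two non-equal plan objects, while their canonical forms still match. A minimal sketch of the idea (a hypothetical wrapper, not Spark API):

import java.util.HashMap
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// A map whose keys are canonicalized plans, so lookups ignore
// post-analysis decoration such as filled-in statistics.
class CanonicalizedPlanMap[V] {
  private val underlying = new HashMap[LogicalPlan, V]

  def put(key: LogicalPlan, value: V): Unit = {
    underlying.put(key.canonicalized, value)
  }

  def get(key: LogicalPlan): Option[V] =
    Option(underlying.get(key.canonicalized))
}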

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/resolver/AttributeScopeStack.scala

Lines changed: 9 additions & 7 deletions
@@ -80,21 +80,23 @@ class AttributeScopeStack {
   }
 
   /**
-   * Execute `body` in the context of a fresh attribute scope. Used by [[Project]] and [[Aggregate]]
-   * validation code since those operators introduce a new scope with fresh expression IDs.
+   * Push a fresh attribute scope. Used by [[Project]] and [[Aggregate]] validation code since
+   * those operators introduce a new scope with fresh expression IDs.
    */
-  def withNewScope[R](isSubqueryRoot: Boolean = false)(body: => R): Unit = {
+  def pushScope(isSubqueryRoot: Boolean = false): Unit = {
     stack.push(
       AttributeScope(
         attributes = AttributeSet(Seq.empty),
         isSubqueryRoot = isSubqueryRoot
       )
     )
-    try {
-      body
-    } finally {
+  }
+
+  /**
+   * Pop current attribute scope.
+   */
+  def popScope(): Unit = {
     stack.pop()
-    }
   }
 
   override def toString: String = stack.toString