[SPARK-52686][SQL] Union should be resolved only if there are no duplicates #51376

Closed
@@ -59,7 +59,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
      case e @ Except(left, right, _) if !e.duplicateResolved && noMissingInput(right) =>
        e.copy(right = dedupRight(left, right))
      // Only after we finish by-name resolution for Union
-     case u: Union if !u.byName && !u.duplicateResolved =>
+     case u: Union if !u.byName && !u.duplicatesResolvedBetweenBranches =>
        val unionWithChildOutputsDeduplicated =
          DeduplicateUnionChildOutput.deduplicateOutputPerChild(u)
        // Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing
@@ -17,6 +17,8 @@

package org.apache.spark.sql.catalyst.optimizer

+import java.util.HashSet

import scala.collection.mutable

import org.apache.spark.SparkException
@@ -895,6 +897,24 @@ object LimitPushDown extends Rule[LogicalPlan] {
*/
object PushProjectionThroughUnion extends Rule[LogicalPlan] {

+  /**
+   * When pushing a [[Project]] through [[Union]] we need to maintain the invariant that [[Union]]
+   * children must have unique [[ExprId]]s per branch. We can safely deduplicate [[ExprId]]s
+   * without updating any references because those [[ExprId]]s will simply remain unused.
+   * For example, in a `Project(col1#1, col1#1)` we will alias the second `col1` and get
+   * `Project(col1#1, col1 as col1#2)`. We don't need to update references to the `col1#1` we
+   * aliased because `col1#1` still exists in the [[Project]] output.
+   */
+  private def deduplicateProjectList(projectList: Seq[NamedExpression]) = {
+    val existingExprIds = new HashSet[ExprId]
+    projectList.map(attr => if (existingExprIds.contains(attr.exprId)) {
+      Alias(attr, attr.name)()
+    } else {
+      existingExprIds.add(attr.exprId)
+      attr
+    })
+  }
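For intuition, a standalone sketch of the deduplication behavior (not part of the diff; it re-implements the private helper with public catalyst classes so it can be tried in a spark-shell, and the name `dedup` is ours):

```scala
import java.util.HashSet

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, ExprId, NamedExpression}
import org.apache.spark.sql.types.StringType

// Replica of deduplicateProjectList: every occurrence of an ExprId after the
// first is wrapped in an Alias, which mints a fresh ExprId.
def dedup(projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
  val seen = new HashSet[ExprId]
  projectList.map { e =>
    if (seen.contains(e.exprId)) {
      Alias(e, e.name)() // fresh ExprId; the original attribute is still in the output
    } else {
      seen.add(e.exprId)
      e
    }
  }
}

val col1 = AttributeReference("col1", StringType)()
dedup(Seq(col1, col1)) // Seq(col1#1, col1#1 AS col1#2): the duplicate is aliased
```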

/**
* Maps Attributes from the left side to the corresponding Attribute on the right side.
*/
@@ -923,10 +943,15 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] {
}

  def pushProjectionThroughUnion(projectList: Seq[NamedExpression], u: Union): Seq[LogicalPlan] = {
-    val newFirstChild = Project(projectList, u.children.head)
+    val deduplicatedProjectList = if (conf.unionIsResolvedWhenDuplicatesPerChildResolved) {
+      deduplicateProjectList(projectList)
+    } else {
+      projectList
+    }
+    val newFirstChild = Project(deduplicatedProjectList, u.children.head)
    val newOtherChildren = u.children.tail.map { child =>
      val rewrites = buildRewrites(u.children.head, child)
-      Project(projectList.map(pushToRight(_, rewrites)), child)
+      Project(deduplicatedProjectList.map(pushToRight(_, rewrites)), child)
    }
    newFirstChild +: newOtherChildren
  }
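To make the per-branch rewrite concrete, a toy two-branch pushdown (illustrative only; the attribute names and the `a + 1` projection are assumptions, not from the PR):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, AttributeReference, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Union}
import org.apache.spark.sql.types.IntegerType

val a1 = AttributeReference("a", IntegerType)() // a#1, output of branch one
val a2 = AttributeReference("a", IntegerType)() // a#2, output of branch two
val union = Union(LocalRelation(a1), LocalRelation(a2))

// A projection `a + 1 AS x` expressed against the first branch's output.
val projectList = Seq(Alias(Add(a1, Literal(1)), "x")())

// pushProjectionThroughUnion(projectList, union) produces one Project per
// branch; for the second branch, buildRewrites/pushToRight substitute a#2
// for a#1 positionally, so each branch only references its own attributes.
```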
@@ -614,13 +614,20 @@ case class Union(
Some(sum.toLong)
}

-  def duplicateResolved: Boolean = {
+  private def duplicatesResolvedPerBranch: Boolean =
+    children.forall(child => child.outputSet.size == child.output.size)
+
+  def duplicatesResolvedBetweenBranches: Boolean = {
children.map(_.outputSet.size).sum ==
AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
}

override lazy val resolved: Boolean = {
-    children.length > 1 && !(byName || allowMissingCol) && childrenResolved && allChildrenCompatible
+    children.length > 1 &&
+      !(byName || allowMissingCol) &&
+      childrenResolved &&
+      allChildrenCompatible &&
+      (!conf.unionIsResolvedWhenDuplicatesPerChildResolved || duplicatesResolvedPerBranch)
}

override protected def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): Union =
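Concretely, the two predicates separate within-branch from cross-branch duplication; a sketch with hand-built plans (illustrative; the toy `id` column and self-union are assumptions):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.types.LongType

val id = AttributeReference("id", LongType)()
val rel = LocalRelation(id)

// Within one branch: the output lists id#1 twice, but outputSet is keyed by
// ExprId and has size 1, so duplicatesResolvedPerBranch would reject it.
val dupBranch = Project(Seq(id, id), rel)
assert(dupBranch.output.size == 2 && dupBranch.outputSet.size == 1)

// Across branches: a self-union shares id#1 between both children, so the
// summed per-branch set sizes (2) exceed the size of the combined set (1),
// which is what duplicatesResolvedBetweenBranches detects.
val branches = Seq(rel, rel)
assert(branches.map(_.outputSet.size).sum == 2)
assert(AttributeSet.fromAttributeSets(branches.map(_.outputSet)).size == 1)
```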
@@ -241,6 +241,15 @@ object SQLConf {
}
}

+  val UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED =
+    buildConf("spark.sql.analyzer.unionIsResolvedWhenDuplicatesPerChildResolved")
+      .internal()
+      .doc(
+        "When true, union should only be resolved once there are no duplicate attributes in " +
+        "each branch.")
+      .booleanConf
+      .createWithDefault(true)
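The flag is internal, so it is not documented publicly, but it can still be toggled for experiments, for example (a sketch, assuming an active `spark` session):

```scala
// Revert to the pre-SPARK-52686 resolution behavior for this session.
spark.conf.set("spark.sql.analyzer.unionIsResolvedWhenDuplicatesPerChildResolved", "false")
```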

val ONLY_NECESSARY_AND_UNIQUE_METADATA_COLUMNS =
buildConf("spark.sql.analyzer.uniqueNecessaryMetadataColumns")
.internal()
@@ -6852,6 +6861,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def useNullsForMissingDefaultColumnValues: Boolean =
getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES)

+  def unionIsResolvedWhenDuplicatesPerChildResolved: Boolean =
+    getConf(SQLConf.UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED)

override def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)

override def doubleQuotedIdentifiers: Boolean = ansiEnabled && getConf(DOUBLE_QUOTED_IDENTIFIERS)
@@ -1,3 +1,3 @@
-Union false, false
+'Union false, false
:- LocalRelation <empty>, [id#0L, a#0, b#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0]
@@ -1,3 +1,3 @@
-Union false, false
+'Union false, false
:- LocalRelation <empty>, [id#0L, a#0, b#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0]
@@ -1,4 +1,4 @@
-Union false, false
+'Union false, false
:- Project [id#0L, a#0]
: +- LocalRelation <empty>, [id#0L, a#0, b#0]
+- Project [id#0L, a#0]
@@ -1,4 +1,4 @@
-Union false, false
+'Union false, false
:- Project [id#0L, a#0, b#0, null AS payload#0]
: +- LocalRelation <empty>, [id#0L, a#0, b#0]
+- Project [id#0L, a#0, null AS b#0, payload#0]
39 changes: 39 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4963,6 +4963,45 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
)
}

test("SPARK-52686: Union should be resolved only if there are no duplicates") {
@LuciferYang (Contributor) commented on Jul 15, 2025:

The newly added test has been causing failures in the non-ANSI daily test for the past 5 days:

[screenshot of the failing daily test run omitted]

We can reproduce it locally using the following command:

```
SPARK_ANSI_SQL_MODE=false build/sbt clean "sql/testOnly org.apache.spark.sql.SQLQuerySuite"
```

```
[info] - SPARK-52686: Union should be resolved only if there are no duplicates *** FAILED *** (143 milliseconds)
[info]   9 did not equal 8 (SQLQuerySuite.scala:4998)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1023(SQLQuerySuite.scala:4998)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.SQLQuerySuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLQuerySuite.scala:63)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:253)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:251)
[info]   at org.apache.spark.sql.SQLQuerySuite.withSQLConf(SQLQuerySuite.scala:63)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1022(SQLQuerySuite.scala:4975)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1022$adapted(SQLQuerySuite.scala:4972)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1021(SQLQuerySuite.scala:4972)
```

Could you take a look at this issue when you have time? Thanks @mihailotim-db
also cc @cloud-fan

@mihailotim-db (Contributor, Author) replied:

Ah yes, the test depends on WidenSetOperationTypes triggering, and I guess that happens only when ANSI is false. Should we just set ANSI to true for this case?

@mihailotim-db (Contributor, Author) replied:

Followup to fix the test: #51490
withTable("t1", "t2", "t3") {
sql("CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)")
sql("CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)")
sql("CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)")

for (confValue <- Seq(false, true)) {
withSQLConf(
SQLConf.UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED.key -> confValue.toString
) {
val analyzedPlan = sql(
"""SELECT
| *
|FROM (
| SELECT col1, col2, NULL AS a, col1 FROM t1
| UNION
| SELECT col1, col2, NULL AS a, col3 FROM t2
| UNION
| SELECT * FROM t3
|)""".stripMargin
).queryExecution.analyzed

val projectCount = analyzedPlan.collect {
case project: Project => project
}.size

// When UNION_IS_RESOLVED_WHEN_DUPLICATES_PER_CHILD_RESOLVED is disabled, we resolve
// outer Union before deduplicating ExprIds in inner union. Because of this we get an
// additional unnecessary Project (see SPARK-52686).
if (confValue) {
assert(projectCount == 7)
} else {
assert(projectCount == 8)
}
}
}
}
}

Seq(true, false).foreach { codegenEnabled =>
test(s"SPARK-52060: one row relation with codegen enabled - $codegenEnabled") {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled.toString) {