[SPARK-52686][SQL] Union should be resolved only if there are no duplicates #51376
Conversation
Force-pushed from dd36ea8 to cd2d37e.
Force-pushed from ea8b329 to 99ddfe3.
+1, LGTM.
cc @yaooqinn and @peter-toth
There are some optimizer tests failing because …
@mihailotim-db can you push a commit with the proposed change so that we can review?
Force-pushed from 99ddfe3 to 8989e52.
@cloud-fan Done!
Force-pushed from f0876d6 to b34547c.
```
@@ -895,6 +897,24 @@ object LimitPushDown extends Rule[LogicalPlan] {
 */
object PushProjectionThroughUnion extends Rule[LogicalPlan] {

  /**
   * When pushing a [[Project]] through [[Union]] we need to maintain the invariant that
   * [[Union]] children must have unique [[ExprId]]s per branch.
```
Doesn't `DeduplicateRelations` handle the conflicting attributes within a Project?
It does, and that is the issue: we deduplicate the ids there, but then get to the optimizer and add duplicate ids again. For example, if we are pushing down `Project(col1#1, col1#1)`, we are going to add it to both branches of the `Union`, making the `Union` unresolved.
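To make this concrete, here is a minimal sketch of the invariant being discussed, assuming a hypothetical helper (this is not the actual `PushProjectionThroughUnion` code): before a project list is pushed into a `Union` branch, any entry that repeats an already-used `ExprId` is wrapped in a fresh `Alias`, so the branch output stays duplicate-free.

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.{Alias, ExprId, NamedExpression}

// Hypothetical helper, not the rule's real implementation: re-alias entries of a
// project list whose ExprId has already been seen, so that pushing the list into a
// Union branch cannot produce duplicate ExprIds in that branch's output.
def deduplicateExprIds(projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
  val seen = mutable.HashSet.empty[ExprId]
  projectList.map { ne =>
    if (seen.add(ne.toAttribute.exprId)) {
      ne // first occurrence of this ExprId, keep as-is
    } else {
      // Repeated ExprId (e.g. Project(col1#1, col1#1)): wrap it in an Alias,
      // which allocates a fresh ExprId for this slot.
      Alias(ne, ne.name)()
    }
  }
}
```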
Force-pushed from 170ffef to 8ea496d.
LGTM, I'm not sure we need the new config though.
@peter-toth The config is a safety net in case there are other cases in the Optimizer that can make Union unresolved (similar to …).
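For context, such a safety-net flag is typically declared as an internal legacy entry in `SQLConf`; the sketch below uses a made-up key, doc text, and version, not necessarily the ones this PR adds, and it would sit inside `object SQLConf`.

```scala
// Hypothetical config declaration (key, doc text and version are illustrative only);
// entries like this live inside object SQLConf in
// sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala.
val LEGACY_UNION_RESOLVED_WITH_DUPLICATES =
  buildConf("spark.sql.legacy.unionResolvedWithDuplicateExprIds")
    .internal()
    .doc("When true, restore the previous behavior where a Union could be considered " +
      "resolved even if a child's output contained duplicate ExprIds.")
    .version("4.1.0")
    .booleanConf
    .createWithDefault(false)
```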
Force-pushed from 8ea496d to 688eb79.
thanks, merging to master!
[SPARK-52686][SQL] Union should be resolved only if there are no duplicates

### What changes were proposed in this pull request?

Union should be `resolved` only if there are no duplicates in any of the children and there are no conflicting attributes per branch.

### Why are the changes needed?

This is necessary in order to prevent some rules like `ResolveReferences` or `WidenSetOperationTypes` resolving upper nodes while Union still has duplicate expr ids. For the following query pattern:

```
sql("""CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)""".stripMargin)
sql("""CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)""".stripMargin)
sql("""CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)""".stripMargin)
sql("""SELECT
  | *
  |FROM (
  | SELECT col1, col2, NULL AS a, col1 FROM t1
  | UNION
  | SELECT col1, col2, NULL AS a, col3 FROM t2
  | UNION
  | SELECT * FROM t3
  |)""".stripMargin)
```

Because at the moment `Union` can be resolved even if there are duplicates in a branch, plan is transformed in a following way:

```
Union
+- Union
   :- Project col1#5, col2#6, null AS a#3, col1#5
   +- Project col1#8, col2#9, null AS a#4, col3#10
```

becomes

```
Union
+- Project col1#5, col2#16, cast(a#3 as string) AS a#17, col1#5
   +- Union
      :- Project col1#5, col2#6, null AS a#3, col1#5
      +- Project col1#8, col2#9, null AS a#4, col3#10
```

we end up with duplicate `col1#5` in both the outer project and the inner one. After `ResolveReferences` triggers, we will deduplicate both the inner and outer projects, resulting in an unnecessary project. Instead, by waiting to first deduplicate expr ids in the inner project before continuing resolution, the Project we insert between Unions will not contain duplicate ExprIds and we don't need to add another unnecessary one.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a test case.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51376 from mihailotim-db/mihailotim-db/fix_union_resolved.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
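As a rough illustration of the two conditions named above (a simplified sketch, not the actual `Union.resolved` code in `basicLogicalOperators.scala`), given `children.map(_.output)` one could check that no child repeats an `ExprId` and that no `ExprId` is shared across branches:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Simplified sketch of the extra resolution conditions described in this PR.
// `childrenOutputs` stands in for `children.map(_.output)` on a Union node.
def duplicatesResolved(childrenOutputs: Seq[Seq[Attribute]]): Boolean = {
  // No child may expose the same ExprId more than once in its own output.
  val noDuplicatesPerChild = childrenOutputs.forall { output =>
    output.map(_.exprId).distinct.size == output.size
  }
  // No ExprId may appear in the output of more than one branch.
  val perBranchIds = childrenOutputs.map(_.map(_.exprId).distinct)
  val noConflictsBetweenBranches =
    perBranchIds.map(_.size).sum == perBranchIds.flatten.distinct.size
  noDuplicatesPerChild && noConflictsBetweenBranches
}
```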
```
@@ -4963,6 +4963,45 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
   )
 }

  test("SPARK-52686: Union should be resolved only if there are no duplicates") {
```
The newly added test has been causing failures in the Non-ANSI daily test for the past 5 days:

We can reproduce it locally using the following command:

```
SPARK_ANSI_SQL_MODE=false build/sbt clean "sql/testOnly org.apache.spark.sql.SQLQuerySuite"
```

```
[info] - SPARK-52686: Union should be resolved only if there are no duplicates *** FAILED *** (143 milliseconds)
[info]   9 did not equal 8 (SQLQuerySuite.scala:4998)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1023(SQLQuerySuite.scala:4998)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.SQLQuerySuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLQuerySuite.scala:63)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:253)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:251)
[info]   at org.apache.spark.sql.SQLQuerySuite.withSQLConf(SQLQuerySuite.scala:63)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1022(SQLQuerySuite.scala:4975)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1022$adapted(SQLQuerySuite.scala:4972)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1021(SQLQuerySuite.scala:4972)
```
Could you take a look at this issue when you have time? thanks @mihailotim-db
also cc @cloud-fan
Ah yes, the test is dependent on `WidenSetOperationTypes` triggering and I guess that happens only with ANSI false. Should we just set ANSI to true for this case?
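If the test is to be pinned to one mode, a sketch of what that might look like inside the suite (assuming it mixes in `SQLTestUtils`, with the test body elided) is:

```scala
import org.apache.spark.sql.internal.SQLConf

// Sketch only: fix ANSI mode for this test so the default and the non-ANSI daily
// builds exercise the same WidenSetOperationTypes behavior. The actual follow-up
// (#51490) may have chosen a different fix.
test("SPARK-52686: Union should be resolved only if there are no duplicates") {
  withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
    // ... original test body: create t1/t2/t3, run the nested UNION query and
    // check the analyzed plan ...
  }
}
```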
Followup to fix the test: #51490
What changes were proposed in this pull request?
Union should be `resolved` only if there are no duplicates in any of the children and there are no conflicting attributes per branch.

Why are the changes needed?
This is necessary in order to prevent some rules like `ResolveReferences` or `WidenSetOperationTypes` from resolving upper nodes while Union still has duplicate expr ids. For the following query pattern:

```
sql("""CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)""".stripMargin)
sql("""CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)""".stripMargin)
sql("""CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)""".stripMargin)
sql("""SELECT
  | *
  |FROM (
  | SELECT col1, col2, NULL AS a, col1 FROM t1
  | UNION
  | SELECT col1, col2, NULL AS a, col3 FROM t2
  | UNION
  | SELECT * FROM t3
  |)""".stripMargin)
```

Because at the moment `Union` can be resolved even if there are duplicates in a branch, the plan is transformed in the following way:

```
Union
+- Union
   :- Project col1#5, col2#6, null AS a#3, col1#5
   +- Project col1#8, col2#9, null AS a#4, col3#10
```

becomes

```
Union
+- Project col1#5, col2#16, cast(a#3 as string) AS a#17, col1#5
   +- Union
      :- Project col1#5, col2#6, null AS a#3, col1#5
      +- Project col1#8, col2#9, null AS a#4, col3#10
```

and we end up with a duplicate `col1#5` in both the outer project and the inner one. After `ResolveReferences` triggers, we will deduplicate both the inner and outer projects, resulting in an unnecessary project.

Instead, by first deduplicating expr ids in the inner project before continuing resolution, the Project we insert between Unions will not contain duplicate ExprIds and we don't need to add another unnecessary one.
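To see the effect end to end, one way (an illustrative check, not necessarily what the added test asserts) is to run the query from the example and count the `Project` nodes in its analyzed plan:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.Project

// Illustrative reproduction: with this fix, analysis should not insert an extra
// Project between the Unions just to deduplicate ExprIds.
val spark = SparkSession.builder()
  .appName("union-resolved-demo")
  .master("local[1]")
  .getOrCreate()

// Tables from the example above; add "USING parquet" if your build requires an
// explicit provider for CREATE TABLE.
spark.sql("CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)")
spark.sql("CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)")
spark.sql("CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)")

val df = spark.sql(
  """SELECT *
    |FROM (
    |  SELECT col1, col2, NULL AS a, col1 FROM t1
    |  UNION
    |  SELECT col1, col2, NULL AS a, col3 FROM t2
    |  UNION
    |  SELECT * FROM t3
    |)""".stripMargin)

// Count the Project nodes in the analyzed plan; before the fix an additional,
// redundant Project showed up here.
val numProjects = df.queryExecution.analyzed.collect { case p: Project => p }.size
println(s"Project nodes in the analyzed plan: $numProjects")
```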
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a test case.
Was this patch authored or co-authored using generative AI tooling?
No