Skip to content

[SPARK-52686][SQL] Union should be resolved only if there are no duplicates #51376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

mihailotim-db
Copy link
Contributor

@mihailotim-db mihailotim-db commented Jul 4, 2025

What changes were proposed in this pull request?

Union should be resolved only if there are no duplicates in any of the children and there are no conflicting attributes per branch.

Why are the changes needed?

This is necessary in order to prevent some rules like ResolveReferences or WidenSetOperationTypes resolving upper nodes while Union still has duplicate expr ids. For the following query pattern:

    sql("""CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)""".stripMargin)
    sql("""CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)""".stripMargin)
    sql("""CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)""".stripMargin)
    sql("""SELECT
          |    *
          |FROM (
          |    SELECT col1, col2, NULL AS a, col1 FROM t1
          |    UNION
          |    SELECT col1, col2, NULL AS a, col3 FROM t2
          |    UNION
          |    SELECT * FROM t3
          |)""".stripMargin)

Because at the moment Union can be resolved even if there are duplicates in a branch, plan is transformed in a following way:

Union                                                                                          
+- Union
     :- Project col1#5, col2#6, null AS a#3, col1#5
     +-Project col1#8, col2#9, null AS a#4, col3#10   

becomes

Union
+- Project col1#5, col2#16, cast(a#3 as string) AS a#17, col1#5                                                                                      
    +- Union
         :- Project col1#5, col2#6, null AS a#3, col1#5
         +-Project col1#8, col2#9, null AS a#4, col3#10   

we end up with duplicate col1#5 in both the outer project and the inner one. After ResolveReferences triggers, we will deduplicate both the inner and outer projects, resulting in an unnecessary project.

Instead, by waiting to first deduplicate expr ids in the inner project before continuing resolution, the Project we insert between Unions will not contain duplicate ExprIds and we don't need to add another unnecessary one.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a test case.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Jul 4, 2025
@mihailotim-db mihailotim-db changed the title ] [SPARK-52686][SQL] Union should be resolved only if there are no duplicates Jul 4, 2025
@mihailotim-db mihailotim-db force-pushed the mihailotim-db/fix_union_resolved branch 2 times, most recently from dd36ea8 to cd2d37e Compare July 7, 2025 11:34
@mihailotim-db mihailotim-db force-pushed the mihailotim-db/fix_union_resolved branch 2 times, most recently from ea8b329 to 99ddfe3 Compare July 7, 2025 12:24
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM.

cc @yaooqinn and @peter-toth

@mihailotim-db
Copy link
Contributor Author

mihailotim-db commented Jul 7, 2025

There are some optimizer tests failing because PushProjectionThroughUnion doesn't take into account that project list must contain unique ExprIds even though we have an Analyzer rule that does deduplication. I think we can simply alias these duplicate IDs without in place without adding an extra project. What do you think @cloud-fan @dongjoon-hyun @vladimirg-db ?

@cloud-fan
Copy link
Contributor

@mihailotim-db can you push a commit with the proposed change so that we can review?

@mihailotim-db mihailotim-db force-pushed the mihailotim-db/fix_union_resolved branch from 99ddfe3 to 8989e52 Compare July 8, 2025 07:42
@mihailotim-db
Copy link
Contributor Author

@cloud-fan Done!

@mihailotim-db mihailotim-db force-pushed the mihailotim-db/fix_union_resolved branch 2 times, most recently from f0876d6 to b34547c Compare July 8, 2025 10:31
@@ -895,6 +897,24 @@ object LimitPushDown extends Rule[LogicalPlan] {
*/
object PushProjectionThroughUnion extends Rule[LogicalPlan] {

/**
* When pushing a [[Project]] through [[Union]] we need to maintain the invariant that
* [[Union]] children must have unique [[ExprId]]s per branch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't DeduplicateRelations handle the conflicting attributes within a Project?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does and that is the issue. We are going to deduplicate ids there, but then get to optimizer and add duplicate ids again. For example if we are pushing down Project(col1#1, col1#1) we are going to add it to both branches of Union, making the Union unresolved

@mihailotim-db mihailotim-db force-pushed the mihailotim-db/fix_union_resolved branch 2 times, most recently from 170ffef to 8ea496d Compare July 8, 2025 14:24
Copy link
Contributor

@peter-toth peter-toth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I'm not sure we need the new config though.

@mihailotim-db
Copy link
Contributor Author

@peter-toth Config is a safety net in case there are other cases in Optimizer that can make Union unresolved (similar to PushProjectionThroughUnion). I am not aware of any other rules, but better be safe in this case

@mihailotim-db mihailotim-db force-pushed the mihailotim-db/fix_union_resolved branch from 8ea496d to 688eb79 Compare July 8, 2025 15:49
@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 308d3e8 Jul 9, 2025
asl3 pushed a commit to asl3/spark that referenced this pull request Jul 14, 2025
…plicates

### What changes were proposed in this pull request?

Union should be `resolved` only if there are no duplicates in any of the children and there are no conflicting attributes per branch.

### Why are the changes needed?

This is necessary in order to prevent some rules like `ResolveReferences` or `WidenSetOperationTypes` resolving upper nodes while Union still has duplicate expr ids. For the following query pattern:
```
    sql("""CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)""".stripMargin)
    sql("""CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)""".stripMargin)
    sql("""CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)""".stripMargin)
    sql("""SELECT
          |    *
          |FROM (
          |    SELECT col1, col2, NULL AS a, col1 FROM t1
          |    UNION
          |    SELECT col1, col2, NULL AS a, col3 FROM t2
          |    UNION
          |    SELECT * FROM t3
          |)""".stripMargin)
```
Because at the moment `Union` can be resolved even if there are duplicates in a branch, plan is transformed in a following way:
```
Union
+- Union
     :- Project col1#5, col2#6, null AS a#3, col1#5
     +-Project col1#8, col2#9, null AS a#4, col3#10
```
becomes
```
Union
+- Project col1#5, col2#16, cast(a#3 as string) AS a#17, col1#5
    +- Union
         :- Project col1#5, col2#6, null AS a#3, col1#5
         +-Project col1#8, col2#9, null AS a#4, col3#10
```
we end up with duplicate `col1#5` in both the outer project and the inner one. After `ResolveReferences` triggers, we will deduplicate both the inner and outer projects, resulting in an unnecessary project.

Instead, by waiting to first deduplicate expr ids in the inner project before continuing resolution, the Project we insert between Unions will not contain duplicate ExprIds and we don't need to add another unnecessary one.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a test case.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51376 from mihailotim-db/mihailotim-db/fix_union_resolved.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
ksbeyer pushed a commit to ksbeyer/spark that referenced this pull request Jul 14, 2025
…plicates

### What changes were proposed in this pull request?

Union should be `resolved` only if there are no duplicates in any of the children and there are no conflicting attributes per branch.

### Why are the changes needed?

This is necessary in order to prevent some rules like `ResolveReferences` or `WidenSetOperationTypes` resolving upper nodes while Union still has duplicate expr ids. For the following query pattern:
```
    sql("""CREATE TABLE t1 (col1 STRING, col2 STRING, col3 STRING)""".stripMargin)
    sql("""CREATE TABLE t2 (col1 STRING, col2 DOUBLE, col3 STRING)""".stripMargin)
    sql("""CREATE TABLE t3 (col1 STRING, col2 DOUBLE, a STRING, col3 STRING)""".stripMargin)
    sql("""SELECT
          |    *
          |FROM (
          |    SELECT col1, col2, NULL AS a, col1 FROM t1
          |    UNION
          |    SELECT col1, col2, NULL AS a, col3 FROM t2
          |    UNION
          |    SELECT * FROM t3
          |)""".stripMargin)
```
Because at the moment `Union` can be resolved even if there are duplicates in a branch, plan is transformed in a following way:
```
Union
+- Union
     :- Project col1#5, col2#6, null AS a#3, col1#5
     +-Project col1#8, col2#9, null AS a#4, col3#10
```
becomes
```
Union
+- Project col1#5, col2#16, cast(a#3 as string) AS a#17, col1#5
    +- Union
         :- Project col1#5, col2#6, null AS a#3, col1#5
         +-Project col1#8, col2#9, null AS a#4, col3#10
```
we end up with duplicate `col1#5` in both the outer project and the inner one. After `ResolveReferences` triggers, we will deduplicate both the inner and outer projects, resulting in an unnecessary project.

Instead, by waiting to first deduplicate expr ids in the inner project before continuing resolution, the Project we insert between Unions will not contain duplicate ExprIds and we don't need to add another unnecessary one.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a test case.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#51376 from mihailotim-db/mihailotim-db/fix_union_resolved.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@@ -4963,6 +4963,45 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
)
}

test("SPARK-52686: Union should be resolved only if there are no duplicates") {
Copy link
Contributor

@LuciferYang LuciferYang Jul 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added test has been causing failures in the Non-Ansi daily test since 5 days ago:

image

We can reproduce it locally using the following command:

SPARK_ANSI_SQL_MODE=false build/sbt clean "sql/testOnly org.apache.spark.sql.SQLQuerySuite"
[info] - SPARK-52686: Union should be resolved only if there are no duplicates *** FAILED *** (143 milliseconds)
[info]   9 did not equal 8 (SQLQuerySuite.scala:4998)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1023(SQLQuerySuite.scala:4998)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.SQLQuerySuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLQuerySuite.scala:63)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:253)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:251)
[info]   at org.apache.spark.sql.SQLQuerySuite.withSQLConf(SQLQuerySuite.scala:63)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1022(SQLQuerySuite.scala:4975)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1022$adapted(SQLQuerySuite.scala:4972)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.SQLQuerySuite.$anonfun$new$1021(SQLQuerySuite.scala:4972)

Could you take a look at this issue when you have time? thanks @mihailotim-db
also cc @cloud-fan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, the test is dependent on WidenSetOperationTypes triggering and I guess that happens only with ANSI false. Should we just set ANSI to true for this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Followup to fix the test: #51490

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants