[SPARK-52921][SQL] Specify outputPartitioning for UnionExec for partitioner aware case #51623


Open · wants to merge 10 commits into master

Conversation

@viirya (Member) commented Jul 22, 2025

What changes were proposed in this pull request?

This patch specifies outputPartitioning for the UnionExec operator in the partitioner-aware case, so that the output partitioning can be known.

Why are the changes needed?

Currently, the output partitioning of UnionExec is simply unknown. But if the partitioner is known to be the same for all children RDDs, SparkContext.union produces a PartitionerAwareUnionRDD which reuses that partitioner. In such cases, the output partitioning of UnionExec is actually the same as that of its children.
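The decision described above can be sketched in plain Scala. `Rdd` and `unionPartitioning` below are illustrative stand-ins for Spark's classes, not actual Spark APIs:

```scala
// Minimal model of the decision: if every non-empty child has the same defined
// partitioner, the union is partitioner-aware and the children's partitioning
// is preserved; otherwise the output partitioning stays unknown (None).
case class Rdd(numPartitions: Int, partitioner: Option[String])

def unionPartitioning(children: Seq[Rdd]): Option[String] = {
  // Empty RDDs carry no partitions, so they don't constrain the result.
  val nonEmpty = children.filter(_.numPartitions > 0)
  if (nonEmpty.nonEmpty &&
      nonEmpty.forall(_.partitioner.isDefined) &&
      nonEmpty.flatMap(_.partitioner).toSet.size == 1) {
    nonEmpty.head.partitioner // partitioner-aware union: partitioning preserved
  } else {
    None // mixed or missing partitioners: output partitioning unknown
  }
}
```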

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit test.

Was this patch authored or co-authored using generative AI tooling?

No

@viirya viirya marked this pull request as draft July 22, 2025 21:35
@@ -1606,11 +1606,15 @@ class SparkContext(config: SparkConf) extends Logging {
new ReliableCheckpointRDD[T](this, path)
}

protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = {
@dongjoon-hyun (Member) commented Jul 22, 2025:

Could you add a comment about the assumption, rdds.filter(!_.partitions.isEmpty)? Otherwise, it may cause correctness issues later if we use this blindly.

Alternatively, we had better include the assumption inside this method.

@viirya (Member, Author) replied:

Added comment and a check.

private lazy val childrenRDDs = children.map(_.execute())

override def outputPartitioning: Partitioning = {
val nonEmptyRdds = childrenRDDs.filter(!_.partitions.isEmpty)
Member replied:

ditto. We can remove this too if isPartitionerAwareUnion has the logic.

@viirya (Member, Author) replied:

Because SparkContext.union also uses nonEmptyRdds, I didn't move the nonEmptyRdds logic into isPartitionerAwareUnion. I leave it to the callers to pass in non-empty RDDs.

Member replied:

Got it~ Thank you for the explanation.

@dongjoon-hyun (Member) commented:

cc @peter-toth

@@ -1606,11 +1606,17 @@ class SparkContext(config: SparkConf) extends Logging {
new ReliableCheckpointRDD[T](this, path)
}

// Note that input rdds must be all non-empty, i.e., rdds.filter(_.partitions.isEmpty).isEmpty
protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = {
assert(!rdds.exists(_.partitions.isEmpty), "Must not have empty RDDs")
Member replied:

Nice!

@viirya viirya marked this pull request as ready for review July 22, 2025 23:00
@viirya viirya changed the title [SPARK-XXXXX][SQL] Specify outputPartitioning for UnionExec for partitioner aware case [SPARK-52921][SQL] Specify outputPartitioning for UnionExec for partitioner aware case Jul 22, 2025
@dongjoon-hyun (Member) left a review comment:

+1, LGTM. Thank you, @viirya .

// child operator will be replaced by Spark in query planning later, in other
// words, `execute` won't be actually called on them during the execution of
// this plan. So we can safely return the default partitioning.
case e if NonFatal(e) => super.outputPartitioning
@viirya (Member, Author) commented:

This handles nodes that don't implement the execute method, for the reason described in the comment above.
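The fallback pattern above can be sketched in plain Scala. The `compute` thunk is a hypothetical stand-in for resolving the children RDDs, not a Spark API:

```scala
import scala.util.control.NonFatal

// If computing the partitioning throws a non-fatal error (e.g. a placeholder
// node whose execute() is unsupported and will be replaced during planning),
// fall back to the default partitioning instead of failing query planning.
def partitioningOrDefault(compute: () => String, default: String): String =
  try compute()
  catch { case e if NonFatal(e) => default }
```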

protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = {
assert(!rdds.exists(_.partitions.isEmpty), "Must not have empty RDDs")
val partitioners = rdds.flatMap(_.partitioner).toSet
rdds.forall(_.partitioner.isDefined) && partitioners.size == 1
@peter-toth (Contributor) commented:

It seems we don't need to build the partitioners set before the forall(_.partitioner.isDefined) check.
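The suggested ordering can be sketched with Option[String] standing in for a partitioner (the helper name is illustrative, not the PR's actual code):

```scala
// Check that every partitioner is defined first (this short-circuits on the
// first None), and only then materialize the set to verify they all match.
def isPartitionerAware(partitioners: Seq[Option[String]]): Boolean =
  partitioners.nonEmpty &&
    partitioners.forall(_.isDefined) &&
    partitioners.flatten.toSet.size == 1
```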

@peter-toth (Contributor) left a review comment:

LGTM, just a minor nit.

@viirya (Member, Author) commented Jul 23, 2025:

Hmm, there are a few test failures, I will take a look.

"default partitioning.")
.version("4.1.0")
.booleanConf
.createWithDefault(true)
@viirya (Member, Author) commented:

For safety, I added an internal config for it.
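The effect of such a flag can be sketched as follows; the function and parameter names are hypothetical, not Spark's actual config machinery:

```scala
// A boolean flag gating the new behavior: when disabled, UnionExec reports
// the default (unknown) partitioning even if its children share a partitioner.
def unionOutputPartitioning(flagEnabled: Boolean,
                            childPartitioning: Option[String]): String =
  if (flagEnabled) childPartitioning.getOrElse("unknown")
  else "unknown"
```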
