[SPARK-52921][SQL] Specify outputPartitioning for UnionExec for partitioner aware case #51623
Conversation
@@ -1606,11 +1606,15 @@ class SparkContext(config: SparkConf) extends Logging {
      new ReliableCheckpointRDD[T](this, path)
    }

  protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = {
Could you add a comment about the assumption, `rdds.filter(!_.partitions.isEmpty)`? Otherwise, it may cause correctness issues later if we use this blindly. Alternatively, we had better enforce the assumption inside this method.
Added a comment and a check.
  private lazy val childrenRDDs = children.map(_.execute())

  override def outputPartitioning: Partitioning = {
    val nonEmptyRdds = childrenRDDs.filter(!_.partitions.isEmpty)
Ditto. We can remove this too if `isPartitionerAwareUnion` has the logic.
Because `SparkContext.union` also uses `nonEmptyRdds`, I didn't move the `nonEmptyRdds` logic into `isPartitionerAwareUnion`. I leave it to the callers to pass in non-empty RDDs.
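The caller-side contract discussed here can be sketched in plain Scala with a stubbed RDD-like type (`StubRDD` and its fields are hypothetical stand-ins for Spark's `RDD`; the real code operates on Spark internals):

```scala
// Stub standing in for Spark's RDD: just a partitioner and a partition count.
case class StubRDD(partitioner: Option[String], numPartitions: Int)

// Mirrors the PR's helper: all inputs must already be non-empty.
def isPartitionerAwareUnion(rdds: Seq[StubRDD]): Boolean = {
  assert(rdds.forall(_.numPartitions > 0), "Must not have empty RDDs")
  rdds.nonEmpty &&
    rdds.forall(_.partitioner.isDefined) &&
    rdds.flatMap(_.partitioner).toSet.size == 1
}

// The caller filters out empty RDDs first, as the union code paths do,
// before asking whether the union is partitioner-aware.
def partitionerAware(all: Seq[StubRDD]): Boolean = {
  val nonEmptyRdds = all.filter(_.numPartitions > 0)
  nonEmptyRdds.nonEmpty && isPartitionerAwareUnion(nonEmptyRdds)
}
```

This keeps the non-emptiness filtering in one place (the caller) while the helper merely asserts the precondition, matching the design choice explained above.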
Got it~ Thank you for the explanation.
cc @peter-toth
@@ -1606,11 +1606,17 @@ class SparkContext(config: SparkConf) extends Logging {
      new ReliableCheckpointRDD[T](this, path)
    }

  // Note that input rdds must be all non-empty, i.e., rdds.filter(_.partitions.isEmpty).isEmpty
  protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = {
    assert(!rdds.exists(_.partitions.isEmpty), "Must not have empty RDDs")
Nice!
+1, LGTM. Thank you, @viirya .
      // child operator will be replaced by Spark in query planning later, in other
      // words, `execute` won't be actually called on them during the execution of
      // this plan. So we can safely return the default partitioning.
      case e if NonFatal(e) => super.outputPartitioning
This handles nodes that don't implement the `execute` method, for the reason described in the comment above.
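The fallback pattern being discussed can be sketched in plain Scala (names and types here are hypothetical simplifications; the real code calls the children's `execute` and falls back to the default `outputPartitioning`):

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: try to compute a child-derived value, but if the
// child is a placeholder that throws (e.g. it doesn't implement execute),
// fall back to a safe default instead of failing query planning.
def partitioningOrDefault(compute: () => String, default: String): String =
  try compute()
  catch { case e if NonFatal(e) => default }
```

`NonFatal` deliberately excludes JVM-fatal errors such as `OutOfMemoryError`, so only recoverable failures trigger the default.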
  protected[spark] def isPartitionerAwareUnion[T: ClassTag](rdds: Seq[RDD[T]]): Boolean = {
    assert(!rdds.exists(_.partitions.isEmpty), "Must not have empty RDDs")
    val partitioners = rdds.flatMap(_.partitioner).toSet
    rdds.forall(_.partitioner.isDefined) && partitioners.size == 1
It seems we don't need to build the `partitioners` set before the `forall(_.partitioner.isDefined)` check.
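The suggested simplification can be sketched with a stubbed RDD-like type (`RddStub` is hypothetical): check `isDefined` first, so the set is only materialized when every child actually has a partitioner.

```scala
// Stub standing in for Spark's RDD; only the partitioner matters here.
case class RddStub(partitioner: Option[String])

// Original shape: the set is built even when some partitioner is undefined.
def originalCheck(rdds: Seq[RddStub]): Boolean = {
  val partitioners = rdds.flatMap(_.partitioner).toSet
  rdds.forall(_.partitioner.isDefined) && partitioners.size == 1
}

// Suggested shape: short-circuit on isDefined, then build the set.
def simplifiedCheck(rdds: Seq[RddStub]): Boolean =
  rdds.forall(_.partitioner.isDefined) &&
    rdds.flatMap(_.partitioner).toSet.size == 1
```

Both forms return the same answer for every input; the reordering just avoids constructing the set when the `forall` already fails.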
LGTM, just a minor nit.
Hmm, there are a few test failures, I will take a look.
        "default partitioning.")
      .version("4.1.0")
      .booleanConf
      .createWithDefault(true)
For safety, I added an internal config for it.
What changes were proposed in this pull request?

This patch updates `outputPartitioning` for the `UnionExec` operator for the case where the partitioner is known, so the output partitioning can be reported.

Why are the changes needed?

Currently the output partitioning of `UnionExec` is simply unknown. But if the partitioner is known to be the same for all children RDDs, `SparkContext.union` produces a `PartitionerAwareUnionRDD`, which reuses the partitioner. For such cases, the output partitioning of `UnionExec` is actually known to be the same as that of its children.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test.
Was this patch authored or co-authored using generative AI tooling?
No