Skip to content

[SPARK-51505][SQL] Always show empty partition number metrics in AQEShuffleReadExec #51608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,15 @@ case class AQEShuffleReadExec private(
numPartitionsMetric.set(partitionSpecs.length)
driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)

val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
val numEmptyPartitions = child match {
case s: ShuffleQueryStageExec =>
s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
case _ => 0
}
numEmptyPartitionsMetric.set(numEmptyPartitions)
driverAccumUpdates += (numEmptyPartitionsMetric.id -> numEmptyPartitions.toLong)

if (hasSkewedPartition) {
val skewedSpecs = partitionSpecs.collect {
case p: PartialReducerPartitionSpec => p
Expand All @@ -200,15 +209,7 @@ case class AQEShuffleReadExec private(
val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
val x = partitionSpecs.count(isCoalescedSpec)
numCoalescedPartitionsMetric.set(x)
val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
val y = child match {
case s: ShuffleQueryStageExec =>
s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
case _ => 0
}
numEmptyPartitionsMetric.set(y)
driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x,
numEmptyPartitionsMetric.id -> y)
driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x)
}

partitionDataSizes.foreach { dataSizes =>
Expand All @@ -223,7 +224,9 @@ case class AQEShuffleReadExec private(

@transient override lazy val metrics: Map[String, SQLMetric] = {
if (shuffleStage.isDefined) {
Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ {
Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"),
"numEmptyPartitions" ->
SQLMetrics.createMetric(sparkContext, "number of empty partitions")) ++ {
if (isLocalRead) {
// We split the mapper partition evenly when creating local shuffle read, so no
// data size info is available.
Expand All @@ -244,9 +247,7 @@ case class AQEShuffleReadExec private(
} ++ {
if (hasCoalescedPartition) {
Map("numCoalescedPartitions" ->
SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"),
"numEmptyPartitions" ->
SQLMetrics.createMetric(sparkContext, "number of empty partitions"))
SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"))
} else {
Map.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,16 +503,14 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper

test("SPARK-51505: log empty partition number metrics") {
val test: SparkSession => Unit = { spark: SparkSession =>
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
val df = spark.range(0, 1000, 1, 5).withColumn("value", when(col("id") < 500, 0)
.otherwise(1)).groupBy("value").agg("value" -> "sum")
df.collect()
val plan = df.queryExecution.executedPlan
val coalesce = collectFirst(plan) {
case e: AQEShuffleReadExec => e
}.get
assert(coalesce.metrics("numEmptyPartitions").value == 3)
}
val df = spark.range(0, 1000, 1, 10).withColumn("value", expr("id % 3"))
.groupBy("value").agg("value" -> "sum")
df.collect()
val plan = df.queryExecution.executedPlan
val coalesce = collectFirst(plan) {
case e: AQEShuffleReadExec => e
}.get
assert(coalesce.metrics("numEmptyPartitions").value == 2)
}
withSparkSession(test, 100, None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ class AdaptiveQueryExecSuite
}
}

test("metrics of the shuffle read") {
test("metrics of the shuffle read") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT key FROM testData GROUP BY key")
Expand Down Expand Up @@ -1124,7 +1124,7 @@ class AdaptiveQueryExecSuite
assert(reads.length == 1)
val read = reads.head
assert(read.isLocalRead)
assert(read.metrics.keys.toSeq == Seq("numPartitions"))
assert(read.metrics.keys.toSeq == Seq("numPartitions", "numEmptyPartitions"))
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
}

Expand Down