Commit c2dd021

pan3793 authored and dongjoon-hyun committed
[SPARK-52632][SQL] Pretty display V2 write plan nodes
### What changes were proposed in this pull request?

Pretty-display V2 write plan nodes by overriding `def stringArgs`.

### Why are the changes needed?

Better UX for the UI display and `EXPLAIN` output.

### Does this PR introduce _any_ user-facing change?

Yes, the change affects the UI display and `EXPLAIN` output for V2 write cases.

### How was this patch tested?

Using AppendData as an example.

Before (screenshot): https://github.com/user-attachments/assets/9dcab5e2-c768-45a3-8c0b-f7059c8c1389

After (screenshot): https://github.com/user-attachments/assets/fd3fdd7c-da5e-49be-b801-f3debde02808

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51332 from pan3793/SPARK-52632.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 3fb4adb commit c2dd021
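
For context, the mechanism this commit relies on lives in Catalyst's `TreeNode`: `stringArgs` defaults to `productIterator`, so `EXPLAIN` renders every constructor argument of a plan node unless the node overrides it with a curated list. Below is a minimal standalone mimic of that behavior (an illustrative assumption, not Spark's real `TreeNode` API):

```scala
// Simplified stand-in for Catalyst's TreeNode rendering: stringArgs defaults
// to productIterator, so every case-class field is printed unless overridden.
trait PlanNodeLike extends Product {
  protected def stringArgs: Iterator[Any] = productIterator
  def simpleString: String =
    s"${getClass.getSimpleName} ${stringArgs.mkString(", ")}"
}

case class WriteInfo(table: String)

// Default rendering: leaks the unprintable refreshCache function value.
case class RawExec(write: WriteInfo, refreshCache: () => Unit, query: String)
  extends PlanNodeLike

// Curated rendering, analogous to the overrides in this commit.
case class PrettyExec(write: WriteInfo, refreshCache: () => Unit, query: String)
  extends PlanNodeLike {
  override protected def stringArgs: Iterator[Any] = Iterator(query, write)
}

object StringArgsDemo extends App {
  val refresh = () => ()
  println(RawExec(WriteInfo("t1"), refresh, "Scan t1").simpleString)
  // RawExec WriteInfo(t1), <lambda reference>, Scan t1
  println(PrettyExec(WriteInfo("t1"), refresh, "Scan t1").simpleString)
  // PrettyExec Scan t1, WriteInfo(t1)
}
```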

File tree

2 files changed: +9, -4 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala

Lines changed: 5 additions & 0 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStr
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 /**
  * This is no-op datasource. It does not do anything besides consuming its input.
@@ -48,6 +49,7 @@ private[noop] object NoopTable extends Table with SupportsWrite {
       TableCapability.TRUNCATE,
       TableCapability.ACCEPT_ANY_SCHEMA)
   }
+  override def toString: String = Utils.getFormattedClassName(this)
 }
 
 private[noop] object NoopWriteBuilder extends WriteBuilder
@@ -59,6 +61,7 @@ private[noop] object NoopWriteBuilder extends WriteBuilder
 private[noop] object NoopWrite extends Write {
   override def toBatch: BatchWrite = NoopBatchWrite
   override def toStreaming: StreamingWrite = NoopStreamingWrite
+  override def toString: String = Utils.getFormattedClassName(this)
 }
 
 private[noop] object NoopBatchWrite extends BatchWrite {
@@ -67,6 +70,7 @@ private[noop] object NoopBatchWrite extends BatchWrite {
   override def useCommitCoordinator(): Boolean = false
   override def commit(messages: Array[WriterCommitMessage]): Unit = {}
   override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+  override def toString: String = Utils.getFormattedClassName(this)
 }
 
 private[noop] object NoopWriterFactory extends DataWriterFactory {
@@ -86,6 +90,7 @@ private[noop] object NoopStreamingWrite extends StreamingWrite {
   override def useCommitCoordinator(): Boolean = false
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def toString: String = Utils.getFormattedClassName(this)
 }
 
 private[noop] object NoopStreamingDataWriterFactory extends StreamingDataWriterFactory {
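
Each noop singleton gains a `toString` override because a Scala `object`'s default `toString` would otherwise surface in the new `stringArgs`-based display as something like `NoopWrite$@1a2b3c4d`. A minimal sketch of the effect (assuming `Utils.getFormattedClassName` essentially strips the compiler-generated `$` from the simple class name; the names below are hypothetical stand-ins):

```scala
// Hypothetical stand-in for a singleton like NoopWrite.
object NoopWriteLike

object FormattedNameDemo {
  // Approximation of Utils.getFormattedClassName's effect: drop the
  // compiler-generated trailing "$" from a Scala object's class name.
  def formattedClassName(obj: AnyRef): String =
    obj.getClass.getSimpleName.stripSuffix("$")

  def main(args: Array[String]): Unit = {
    println(NoopWriteLike.toString)            // NoopWriteLike$@<hex hash> by default
    println(formattedClassName(NoopWriteLike)) // NoopWriteLike
  }
}
```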

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 4 additions & 4 deletions
@@ -303,8 +303,6 @@ case class ReplaceDataExec(
     projections: ReplaceDataProjections,
     write: Write) extends V2ExistingTableWriteExec {
 
-  override val stringArgs: Iterator[Any] = Iterator(query, write)
-
   override def writingTask: WritingSparkTask[_] = {
     projections match {
       case ReplaceDataProjections(dataProj, Some(metadataProj)) =>
@@ -328,8 +326,6 @@ case class WriteDeltaExec(
     projections: WriteDeltaProjections,
     write: DeltaWrite) extends V2ExistingTableWriteExec {
 
-  override lazy val stringArgs: Iterator[Any] = Iterator(query, write)
-
   override lazy val writingTask: WritingSparkTask[_] = {
     if (projections.metadataProjection.isDefined) {
       DeltaWithMetadataWritingSparkTask(projections)
@@ -349,6 +345,8 @@ case class WriteToDataSourceV2Exec(
     query: SparkPlan,
     writeMetrics: Seq[CustomMetric]) extends V2TableWriteExec {
 
+  override val stringArgs: Iterator[Any] = Iterator(batchWrite, query)
+
   override val customMetrics: Map[String, SQLMetric] = writeMetrics.map { customMetric =>
     customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
   }.toMap
@@ -367,6 +365,8 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
   def refreshCache: () => Unit
   def write: Write
 
+  override val stringArgs: Iterator[Any] = Iterator(query, write)
+
   override val customMetrics: Map[String, SQLMetric] =
     write.supportedCustomMetrics().map { customMetric =>
       customMetric.name() -> SQLMetrics.createV2CustomMetric(sparkContext, customMetric)
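
Note the deduplication here: the identical `stringArgs` overrides previously declared in `ReplaceDataExec` and `WriteDeltaExec` are hoisted into the shared `V2ExistingTableWriteExec` trait, so every existing-table write node inherits the curated display, while `WriteToDataSourceV2Exec` gets its own `Iterator(batchWrite, query)`. One way to observe the change (a hypothetical spark-shell session; the rendering below is paraphrased from the PR screenshots, not copied verbatim):

```scala
// Assumes a running `spark` session; the noop sink accepts any schema and
// discards all rows, so it is a cheap way to trigger a V2 write.
val df = spark.range(10)
df.write.format("noop").mode("overwrite").save()
// In the SQL tab of the UI (and in EXPLAIN output), the write node now shows
// curated arguments such as "NoopWrite" instead of a raw object reference
// like "...noop.NoopWrite$@1a2b3c4d".
```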

0 commit comments