Skip to content

Commit 781f32d

Browse files
committed
Review comments
1 parent 3162af0 commit 781f32d

File tree

4 files changed

+61
-170
lines changed

4 files changed

+61
-170
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java

Lines changed: 10 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -16,148 +16,47 @@
1616
*/
1717
package org.apache.spark.sql.connector.metric;
1818

19-
public interface MergeMetrics {
20-
21-
class Builder {
22-
private long numTargetRowsCopied = -1;
23-
private long numTargetRowsInserted = -1;
24-
private long numTargetRowsDeleted = -1;
25-
private long numTargetRowsUpdated = -1;
26-
private long numTargetRowsMatchedUpdated = -1;
27-
private long numTargetRowsMatchedDeleted = -1;
28-
private long numTargetRowsNotMatchedBySourceUpdated = -1;
29-
private long numTargetRowsNotMatchedBySourceDeleted = -1;
30-
private long numSourceRows = -1;
31-
32-
public Builder numTargetRowsCopied(long numTargetRowsCopied) {
33-
this.numTargetRowsCopied = numTargetRowsCopied;
34-
return this;
35-
}
36-
37-
public Builder numTargetRowsInserted(long numTargetRowsInserted) {
38-
this.numTargetRowsInserted = numTargetRowsInserted;
39-
return this;
40-
}
41-
42-
public Builder numTargetRowsDeleted(long numTargetRowsDeleted) {
43-
this.numTargetRowsDeleted = numTargetRowsDeleted;
44-
return this;
45-
}
46-
47-
public Builder numTargetRowsUpdated(long numTargetRowsUpdated) {
48-
this.numTargetRowsUpdated = numTargetRowsUpdated;
49-
return this;
50-
}
51-
52-
public Builder numTargetRowsMatchedUpdated(long numTargetRowsMatchedUpdated) {
53-
this.numTargetRowsMatchedUpdated = numTargetRowsMatchedUpdated;
54-
return this;
55-
}
56-
57-
public Builder numTargetRowsMatchedDeleted(long numTargetRowsMatchedDeleted) {
58-
this.numTargetRowsMatchedDeleted = numTargetRowsMatchedDeleted;
59-
return this;
60-
}
61-
62-
public Builder numTargetRowsNotMatchedBySourceUpdated(long numTargetRowsNotMatchedBySourceUpdated) {
63-
this.numTargetRowsNotMatchedBySourceUpdated = numTargetRowsNotMatchedBySourceUpdated;
64-
return this;
65-
}
66-
67-
public Builder numTargetRowsNotMatchedBySourceDeleted(long numTargetRowsNotMatchedBySourceDeleted) {
68-
this.numTargetRowsNotMatchedBySourceDeleted = numTargetRowsNotMatchedBySourceDeleted;
69-
return this;
70-
}
71-
72-
public MergeMetrics build() {
73-
return new MergeMetrics() {
74-
@Override
75-
public long numTargetRowsCopied() {
76-
return numTargetRowsCopied;
77-
}
19+
import java.util.OptionalLong;
7820

79-
@Override
80-
public long numTargetRowsInserted() {
81-
return numTargetRowsInserted;
82-
}
83-
84-
@Override
85-
public long numTargetRowsDeleted() {
86-
return numTargetRowsDeleted;
87-
}
88-
89-
@Override
90-
public long numTargetRowsUpdated() {
91-
return numTargetRowsUpdated;
92-
}
93-
94-
@Override
95-
public long numTargetRowsMatchedUpdated() {
96-
return numTargetRowsMatchedUpdated;
97-
}
98-
99-
@Override
100-
public long numTargetRowsMatchedDeleted() {
101-
return numTargetRowsMatchedDeleted;
102-
}
103-
104-
@Override
105-
public long numTargetRowsNotMatchedBySourceUpdated() {
106-
return numTargetRowsNotMatchedBySourceUpdated;
107-
}
108-
109-
@Override
110-
public long numTargetRowsNotMatchedBySourceDeleted() {
111-
return numTargetRowsNotMatchedBySourceDeleted;
112-
}
113-
};
114-
}
115-
}
116-
117-
/**
118-
* Returns a new builder for MergeMetrics.
119-
*/
120-
static Builder builder() {
121-
return new MergeMetrics.Builder();
122-
}
21+
public interface MergeMetrics {
12322

12423
/**
12524
* Returns the number of target rows copied unmodified because they did not match any action.
12625
*/
127-
long numTargetRowsCopied();
26+
OptionalLong numTargetRowsCopied();
12827

12928
/**
13029
* Returns the number of target rows inserted.
13130
*/
132-
long numTargetRowsInserted();
31+
OptionalLong numTargetRowsInserted();
13332

13433
/**
13534
* Returns the number of target rows deleted.
13635
*/
137-
long numTargetRowsDeleted();
36+
OptionalLong numTargetRowsDeleted();
13837

13938
/**
14039
* Returns the number of target rows updated.
14140
*/
142-
long numTargetRowsUpdated();
41+
OptionalLong numTargetRowsUpdated();
14342

14443
/**
14544
* Returns the number of target rows matched and updated by a matched clause.
14645
*/
147-
long numTargetRowsMatchedUpdated();
46+
OptionalLong numTargetRowsMatchedUpdated();
14847

14948
/**
15049
* Returns the number of target rows matched and deleted by a matched clause.
15150
*/
152-
long numTargetRowsMatchedDeleted();
51+
OptionalLong numTargetRowsMatchedDeleted();
15352

15453
/**
15554
* Returns the number of target rows updated by a not matched by source clause.
15655
*/
157-
long numTargetRowsNotMatchedBySourceUpdated();
56+
OptionalLong numTargetRowsNotMatchedBySourceUpdated();
15857

15958
/**
16059
* Returns the number of target rows deleted by a not matched by source clause.
16160
*/
162-
long numTargetRowsNotMatchedBySourceDeleted();
61+
OptionalLong numTargetRowsNotMatchedBySourceDeleted();
16362
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
107107
*/
108108
void abort(WriterCommitMessage[] messages);
109109

110-
/**
111-
* Whether this batch write requests merge execution metrics.
112-
*/
113-
default boolean requestMergeMetrics() {
114-
return false;
115-
}
116-
117110
/**
118111
* Similar to {@link #commit(WriterCommitMessage[])}, but also provides merge execution metrics to this batch write.
119112
* @param metrics merge execution metrics
120113
*/
121-
default void commitWithMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
114+
default void commitMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
122115
commit(messages);
123116
}
124117
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -114,39 +114,36 @@ class InMemoryRowLevelOperationTable(
114114
}
115115

116116
abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
117-
override def requestMergeMetrics(): Boolean = true
118117

119-
override def commitWithMerge(messages: Array[WriterCommitMessage], metrics: MergeMetrics):
118+
override def commitMerge(messages: Array[WriterCommitMessage], metrics: MergeMetrics):
120119
Unit = {
121-
commitProperties += "numTargetRowsCopied" -> metrics.numTargetRowsCopied().toString
122-
commitProperties += "numTargetRowsInserted" -> metrics.numTargetRowsInserted().toString
123-
commitProperties += "numTargetRowsDeleted" -> metrics.numTargetRowsDeleted().toString
124-
commitProperties += "numTargetRowsUpdated" -> metrics.numTargetRowsUpdated().toString
125-
commitProperties += "numTargetRowsInserted" -> metrics.numTargetRowsInserted().toString
126-
commitProperties += ("numTargetRowsNotMatchedBySourceUpdated"
127-
-> metrics.numTargetRowsNotMatchedBySourceUpdated().toString)
128-
commitProperties += ("numTargetRowsNotMatchedBySourceDeleted"
129-
-> metrics.numTargetRowsNotMatchedBySourceDeleted().toString)
120+
commitProperties += "numTargetRowsCopied" -> metrics.numTargetRowsCopied().orElse(-1).toString
121+
commitProperties += "numTargetRowsInserted" ->
122+
metrics.numTargetRowsInserted().orElse(-1).toString
123+
commitProperties += "numTargetRowsDeleted" ->
124+
metrics.numTargetRowsDeleted().orElse(-1).toString
125+
commitProperties += "numTargetRowsUpdated" ->
126+
metrics.numTargetRowsUpdated().orElse(-1).toString
127+
commitProperties += "numTargetRowsInserted" ->
128+
metrics.numTargetRowsInserted().orElse(-1).toString
130129
commitProperties += ("numTargetRowsMatchedDeleted"
131-
-> metrics.numTargetRowsMatchedDeleted().toString)
130+
-> metrics.numTargetRowsMatchedDeleted().orElse(-1).toString)
132131
commitProperties += ("numTargetRowsMatchedUpdated"
133-
-> metrics.numTargetRowsMatchedUpdated().toString)
132+
-> metrics.numTargetRowsMatchedUpdated().orElse(-1).toString)
133+
commitProperties += ("numTargetRowsNotMatchedBySourceUpdated"
134+
-> metrics.numTargetRowsNotMatchedBySourceUpdated().orElse(-1).toString)
135+
commitProperties += ("numTargetRowsNotMatchedBySourceDeleted"
136+
-> metrics.numTargetRowsNotMatchedBySourceDeleted().orElse(-1).toString)
134137
commit(messages)
135138
commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
136139
commitProperties.clear()
137140
}
138-
139-
override def commit(messages: Array[WriterCommitMessage]): Unit = {
140-
doCommit(messages)
141-
}
142-
143-
def doCommit(messages: Array[WriterCommitMessage]): Unit
144141
}
145142

146143
private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
147144
extends RowLevelOperationBatchWrite {
148145

149-
override def doCommit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
146+
override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
150147
val newData = messages.map(_.asInstanceOf[BufferedRows])
151148
val readRows = scan.data.flatMap(_.asInstanceOf[BufferedRows].rows)
152149
val readPartitions = readRows.map(r => getKey(r, schema)).distinct
@@ -203,7 +200,7 @@ class InMemoryRowLevelOperationTable(
203200
DeltaBufferedRowsWriterFactory
204201
}
205202

206-
override def doCommit(messages: Array[WriterCommitMessage]): Unit = {
203+
override def commit(messages: Array[WriterCommitMessage]): Unit = {
207204
val newData = messages.map(_.asInstanceOf[BufferedRows])
208205
withDeletes(newData)
209206
withData(newData)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala

Lines changed: 32 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2
1919

20+
import java.util.OptionalLong
21+
2022
import scala.jdk.CollectionConverters._
2123

2224
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -453,15 +455,11 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
453455
}
454456
)
455457

456-
val mergeMetricsOpt = if (batchWrite.requestMergeMetrics()) {
457-
Some(getMergeMetrics(query))
458-
} else {
459-
None
460-
}
458+
val mergeMetricsOpt = getMergeMetrics(query)
461459

462460
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
463461
mergeMetricsOpt match {
464-
case Some(metrics) => batchWrite.commitWithMerge(messages, metrics)
462+
case Some(metrics) => batchWrite.commitMerge(messages, metrics)
465463
case None => batchWrite.commit(messages)
466464
}
467465
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
@@ -486,35 +484,29 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
486484
Nil
487485
}
488486

489-
private def getMergeMetrics(query: SparkPlan): MergeMetrics = {
490-
val mergeMetricsBuilder = new MergeMetrics.Builder()
491-
val mergeNode = collectFirst(query) {case c: MergeRowsExec => c}
492-
mergeNode.foreach {n => mergeMetricValues(n.metrics, mergeMetricsBuilder)}
493-
mergeMetricsBuilder.build()
487+
private def getMergeMetrics(query: SparkPlan): Option[MergeMetrics] = {
488+
collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
489+
MergeMetricsImpl(
490+
numTargetRowsCopied = metric(n.metrics, "numTargetRowsCopied"),
491+
numTargetRowsDeleted = metric(n.metrics, "numTargetRowsDeleted"),
492+
numTargetRowsUpdated = metric(n.metrics, "numTargetRowsUpdated"),
493+
numTargetRowsInserted = metric(n.metrics, "numTargetRowsInserted"),
494+
numTargetRowsMatchedDeleted = metric(n.metrics, "numTargetRowsMatchedDeleted"),
495+
numTargetRowsMatchedUpdated = metric(n.metrics, "numTargetRowsMatchedUpdated"),
496+
numTargetRowsNotMatchedBySourceDeleted =
497+
metric(n.metrics, "numTargetRowsNotMatchedBySourceDeleted"),
498+
numTargetRowsNotMatchedBySourceUpdated =
499+
metric(n.metrics, "numTargetRowsNotMatchedBySourceUpdated")
500+
)
501+
}
494502
}
495503

496-
private def metric(metrics: Map[String, SQLMetric], metric: String): Long = {
504+
private def metric(metrics: Map[String, SQLMetric], metric: String): OptionalLong = {
497505
metrics.get(metric) match {
498-
case Some(m) => m.value
499-
case None => -1
506+
case Some(m) => OptionalLong.of(m.value)
507+
case None => OptionalLong.empty()
500508
}
501509
}
502-
503-
private def mergeMetricValues(mergeNodeMetrics: Map[String, SQLMetric],
504-
metricBuilder: MergeMetrics.Builder): Option[MergeMetrics.Builder] = {
505-
metricBuilder
506-
.numTargetRowsCopied(metric(mergeNodeMetrics, "numTargetRowsCopied"))
507-
.numTargetRowsDeleted(metric(mergeNodeMetrics, "numTargetRowsDeleted"))
508-
.numTargetRowsUpdated(metric(mergeNodeMetrics, "numTargetRowsUpdated"))
509-
.numTargetRowsInserted(metric(mergeNodeMetrics, "numTargetRowsInserted"))
510-
.numTargetRowsNotMatchedBySourceDeleted(
511-
metric(mergeNodeMetrics, "numTargetRowsNotMatchedBySourceDeleted"))
512-
.numTargetRowsNotMatchedBySourceUpdated(
513-
metric(mergeNodeMetrics, "numTargetRowsNotMatchedBySourceUpdated"))
514-
.numTargetRowsMatchedDeleted(metric(mergeNodeMetrics, "numTargetRowsMatchedDeleted"))
515-
.numTargetRowsMatchedUpdated(metric(mergeNodeMetrics, "numTargetRowsMatchedUpdated"))
516-
Some(metricBuilder)
517-
}
518510
}
519511

520512
trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
@@ -769,3 +761,13 @@ private[v2] case class DataWritingSparkTaskResult(
769761
* Sink progress information collected after commit.
770762
*/
771763
private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)
764+
765+
private case class MergeMetricsImpl(
766+
override val numTargetRowsCopied: OptionalLong,
767+
override val numTargetRowsDeleted: OptionalLong,
768+
override val numTargetRowsUpdated: OptionalLong,
769+
override val numTargetRowsInserted: OptionalLong,
770+
override val numTargetRowsMatchedUpdated: OptionalLong,
771+
override val numTargetRowsMatchedDeleted: OptionalLong,
772+
override val numTargetRowsNotMatchedBySourceUpdated: OptionalLong,
773+
override val numTargetRowsNotMatchedBySourceDeleted: OptionalLong) extends MergeMetrics

0 commit comments

Comments
 (0)