Skip to content

Commit 3fc94aa

Browse files
committed
Fixes
1 parent c17c6b9 commit 3fc94aa

File tree

8 files changed

+322
-120
lines changed

8 files changed

+322
-120
lines changed
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.connector.metric;
18+
19+
/**
 * Read-only view of the row counters collected for a merge operation.
 * <p>
 * Instances are created through {@link #builder()}. Every counter that was not explicitly set
 * on the builder is reported as {@code -1}.
 */
public interface MergeMetrics {

  /**
   * Mutable builder for {@link MergeMetrics}.
   * <p>
   * Each counter starts at {@code -1} and keeps that value until its setter is called.
   * {@link #build()} takes a snapshot of the current values, so the builder can be modified or
   * reused afterwards without affecting metrics objects that were already built.
   */
  class Builder {
    private long numTargetRowsCopied = -1;
    private long numTargetRowsInserted = -1;
    private long numTargetRowsDeleted = -1;
    private long numTargetRowsUpdated = -1;
    private long numTargetRowsMatchedUpdated = -1;
    private long numTargetRowsMatchedDeleted = -1;
    private long numTargetRowsNotMatchedBySourceUpdated = -1;
    private long numTargetRowsNotMatchedBySourceDeleted = -1;

    /**
     * Sets the number of target rows copied unmodified.
     *
     * @param numTargetRowsCopied the counter value
     * @return this builder
     */
    public Builder numTargetRowsCopied(long numTargetRowsCopied) {
      this.numTargetRowsCopied = numTargetRowsCopied;
      return this;
    }

    /**
     * Sets the number of target rows inserted.
     *
     * @param numTargetRowsInserted the counter value
     * @return this builder
     */
    public Builder numTargetRowsInserted(long numTargetRowsInserted) {
      this.numTargetRowsInserted = numTargetRowsInserted;
      return this;
    }

    /**
     * Sets the number of target rows deleted.
     *
     * @param numTargetRowsDeleted the counter value
     * @return this builder
     */
    public Builder numTargetRowsDeleted(long numTargetRowsDeleted) {
      this.numTargetRowsDeleted = numTargetRowsDeleted;
      return this;
    }

    /**
     * Sets the number of target rows updated.
     *
     * @param numTargetRowsUpdated the counter value
     * @return this builder
     */
    public Builder numTargetRowsUpdated(long numTargetRowsUpdated) {
      this.numTargetRowsUpdated = numTargetRowsUpdated;
      return this;
    }

    /**
     * Sets the number of target rows updated by a matched clause.
     *
     * @param numTargetRowsMatchedUpdated the counter value
     * @return this builder
     */
    public Builder numTargetRowsMatchedUpdated(long numTargetRowsMatchedUpdated) {
      this.numTargetRowsMatchedUpdated = numTargetRowsMatchedUpdated;
      return this;
    }

    /**
     * Sets the number of target rows deleted by a matched clause.
     *
     * @param numTargetRowsMatchedDeleted the counter value
     * @return this builder
     */
    public Builder numTargetRowsMatchedDeleted(long numTargetRowsMatchedDeleted) {
      this.numTargetRowsMatchedDeleted = numTargetRowsMatchedDeleted;
      return this;
    }

    /**
     * Sets the number of target rows updated by a not matched by source clause.
     *
     * @param numTargetRowsNotMatchedBySourceUpdated the counter value
     * @return this builder
     */
    public Builder numTargetRowsNotMatchedBySourceUpdated(
        long numTargetRowsNotMatchedBySourceUpdated) {
      this.numTargetRowsNotMatchedBySourceUpdated = numTargetRowsNotMatchedBySourceUpdated;
      return this;
    }

    /**
     * Sets the number of target rows deleted by a not matched by source clause.
     *
     * @param numTargetRowsNotMatchedBySourceDeleted the counter value
     * @return this builder
     */
    public Builder numTargetRowsNotMatchedBySourceDeleted(
        long numTargetRowsNotMatchedBySourceDeleted) {
      this.numTargetRowsNotMatchedBySourceDeleted = numTargetRowsNotMatchedBySourceDeleted;
      return this;
    }

    /**
     * Builds a {@link MergeMetrics} holding the values currently set on this builder.
     *
     * @return a metrics object whose values no longer change when this builder is modified
     */
    public MergeMetrics build() {
      // Copy the counters into final locals so that the returned object captures a snapshot.
      // Reading the builder's fields from the anonymous class would make the result track
      // every later setter call on this builder.
      final long copied = numTargetRowsCopied;
      final long inserted = numTargetRowsInserted;
      final long deleted = numTargetRowsDeleted;
      final long updated = numTargetRowsUpdated;
      final long matchedUpdated = numTargetRowsMatchedUpdated;
      final long matchedDeleted = numTargetRowsMatchedDeleted;
      final long notMatchedBySourceUpdated = numTargetRowsNotMatchedBySourceUpdated;
      final long notMatchedBySourceDeleted = numTargetRowsNotMatchedBySourceDeleted;

      return new MergeMetrics() {
        @Override
        public long numTargetRowsCopied() {
          return copied;
        }

        @Override
        public long numTargetRowsInserted() {
          return inserted;
        }

        @Override
        public long numTargetRowsDeleted() {
          return deleted;
        }

        @Override
        public long numTargetRowsUpdated() {
          return updated;
        }

        @Override
        public long numTargetRowsMatchedUpdated() {
          return matchedUpdated;
        }

        @Override
        public long numTargetRowsMatchedDeleted() {
          return matchedDeleted;
        }

        @Override
        public long numTargetRowsNotMatchedBySourceUpdated() {
          return notMatchedBySourceUpdated;
        }

        @Override
        public long numTargetRowsNotMatchedBySourceDeleted() {
          return notMatchedBySourceDeleted;
        }
      };
    }
  }

  /**
   * Returns a new builder for MergeMetrics.
   */
  static Builder builder() {
    return new MergeMetrics.Builder();
  }

  /**
   * Returns the number of target rows copied unmodified because they did not match any action.
   */
  long numTargetRowsCopied();

  /**
   * Returns the number of target rows inserted.
   */
  long numTargetRowsInserted();

  /**
   * Returns the number of target rows deleted.
   */
  long numTargetRowsDeleted();

  /**
   * Returns the number of target rows updated.
   */
  long numTargetRowsUpdated();

  /**
   * Returns the number of target rows matched and updated by a matched clause.
   */
  long numTargetRowsMatchedUpdated();

  /**
   * Returns the number of target rows matched and deleted by a matched clause.
   */
  long numTargetRowsMatchedDeleted();

  /**
   * Returns the number of target rows updated by a not matched by source clause.
   */
  long numTargetRowsNotMatchedBySourceUpdated();

  /**
   * Returns the number of target rows deleted by a not matched by source clause.
   */
  long numTargetRowsNotMatchedBySourceDeleted();
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.spark.annotation.Evolving;
2121
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
22+
import org.apache.spark.sql.connector.metric.MergeMetrics;
2223

2324
/**
2425
* An interface that defines how to write the data to data source for batch processing.
@@ -106,18 +107,18 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
106107
*/
107108
void abort(WriterCommitMessage[] messages);
108109

109-
110110
/**
111-
* Whether this batch write requests execution metrics. Returns a row level operation command this batch write
112-
* is part of, if requested. Return null if not requested.
111+
* Returns whether this batch write requests merge execution metrics.
113112
*/
114-
default RowLevelOperation.Command requestExecMetrics() {
115-
return null;
113+
default boolean requestMergeMetrics() {
114+
return false;
116115
}
117116

118117
/**
119-
* Provides an array of query execution metrics to the batch write prior to commit.
120-
* @param metrics an array of execution metrics
118+
* Similar to {@link #commit(WriterCommitMessage[])}, but also provides merge execution metrics to this batch write.
119+
* @param metrics merge execution metrics
121120
*/
122-
default void execMetrics(CustomTaskMetric[] metrics) {}
121+
default void commitWithMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
122+
commit(messages);
123+
}
123124
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,5 @@ default CustomMetric[] supportedCustomMetrics() {
8686
default CustomTaskMetric[] reportDriverMetrics() {
8787
return new CustomTaskMetric[]{};
8888
}
89+
8990
}

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -501,32 +501,13 @@ abstract class InMemoryBaseTable(
501501
options: CaseInsensitiveStringMap)
502502
extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering {
503503

504-
var setFilters = Array.empty[Filter]
505-
506-
override def reportDriverMetrics(): Array[CustomTaskMetric] =
507-
Array(new CustomTaskMetric{
508-
override def name(): String = "numSplits"
509-
override def value(): Long = 1L
510-
})
511-
512-
override def supportedCustomMetrics(): Array[CustomMetric] = {
513-
Array(new CustomMetric {
514-
override def name(): String = "numSplits"
515-
override def description(): String = "number of splits in the scan"
516-
override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
517-
taskMetrics.sum.toString
518-
}
519-
})
520-
}
521-
522504
override def filterAttributes(): Array[NamedReference] = {
523505
val scanFields = readSchema.fields.map(_.name).toSet
524506
partitioning.flatMap(_.references)
525507
.filter(ref => scanFields.contains(ref.fieldNames.mkString(".")))
526508
}
527509

528510
override def filter(filters: Array[Filter]): Unit = {
529-
this.setFilters = filters
530511
if (partitioning.length == 1 && partitioning.head.references().length == 1) {
531512
val ref = partitioning.head.references().head
532513
filters.foreach {
@@ -779,14 +760,6 @@ private class BufferedRowsReader(
779760

780761
override def close(): Unit = {}
781762

782-
override def currentMetricsValues(): Array[CustomTaskMetric] =
783-
Array[CustomTaskMetric](
784-
new CustomTaskMetric {
785-
override def name(): String = "numSplits"
786-
override def value(): Long = 1
787-
}
788-
)
789-
790763
private def extractFieldValue(
791764
field: StructField,
792765
schema: StructType,

0 commit comments

Comments
 (0)