@@ -20,14 +20,12 @@ package org.apache.spark.sql.connector.catalog
 import java.time.Instant
 import java.util
 
-import scala.collection.mutable.ListBuffer
-
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
-import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.connector.metric.V2ExecMetric
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
 import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command
@@ -50,8 +48,6 @@ class InMemoryRowLevelOperationTable(
     constraints)
   with SupportsRowLevelOperations {
 
-  private val _scans = ListBuffer.empty[Scan]
-
   private final val PARTITION_COLUMN_REF = FieldReference(PartitionKeyColumn.name)
   private final val INDEX_COLUMN_REF = FieldReference(IndexColumn.name)
   private final val SUPPORTS_DELTAS = "supports-deltas"
@@ -74,16 +70,6 @@ class InMemoryRowLevelOperationTable(
     }
   }
 
-  class InMemoryRowLevelOperationScanBuilder(tableSchema: StructType,
-      options: CaseInsensitiveStringMap)
-    extends InMemoryScanBuilder(tableSchema, options) {
-    override def build: Scan = {
-      val scan = super.build
-      _scans += scan
-      scan
-    }
-  }
-
   case class PartitionBasedOperation(command: Command) extends RowLevelOperation {
     var configuredScan: InMemoryBatchScan = _
 
@@ -117,7 +103,7 @@ class InMemoryRowLevelOperationTable(
             SortDirection.ASCENDING.defaultNullOrdering()))
       }
 
-      override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan, command)
+      override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan)
 
       override def description: String = "InMemoryWrite"
     }
@@ -127,33 +113,14 @@ class InMemoryRowLevelOperationTable(
     override def description(): String = "InMemoryPartitionReplaceOperation"
   }
 
-  abstract class RowLevelOperationBatchWrite(command: Command) extends TestBatchWrite {
-    override def requestExecMetrics(): Command = command
+  abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
+    override def requestExecMetrics(): Boolean = true
 
-    override def execMetrics(metrics: Array[CustomTaskMetric]): Unit = {
-      metrics.foreach(m => commitProperties += (m.name() -> m.value().toString))
+    override def execMetrics(metrics: Array[V2ExecMetric]): Unit = {
+      metrics.foreach(m => commitProperties += s"${m.metricType()}.${m.name()}" -> m.value())
     }
 
     override def commit(messages: Array[WriterCommitMessage]): Unit = {
-      assert(_scans.size <= 2, "Expected at most two scans in row-level operations")
-      assert(_scans.count { case s: InMemoryBatchScan => s.setFilters.nonEmpty } <= 1,
-        "Expected at most one scan with runtime filters in row-level operations")
-      assert(_scans.count { case s: InMemoryBatchScan => s.setFilters.isEmpty } <= 1,
-        "Expected at most one scan without runtime filters in row-level operations")
-
-      _scans.foreach {
-        case s: InMemoryBatchScan =>
-          val prefix = if (s.setFilters.isEmpty) {
-            ""
-          } else {
-            "secondScan."
-          }
-          s.reportDriverMetrics().foreach { metric =>
-            commitProperties += (prefix + metric.name() -> metric.value().toString)
-          }
-        case _ =>
-      }
-      _scans.clear()
       doCommit(messages)
       commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
       commitProperties.clear()
@@ -162,9 +129,8 @@ class InMemoryRowLevelOperationTable(
     def doCommit(messages: Array[WriterCommitMessage]): Unit
   }
 
-  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan,
-      command: RowLevelOperation.Command)
-    extends RowLevelOperationBatchWrite(command) {
+  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
+    extends RowLevelOperationBatchWrite {
 
     override def doCommit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       val newData = messages.map(_.asInstanceOf[BufferedRows])
@@ -187,7 +153,7 @@ class InMemoryRowLevelOperationTable(
     override def rowId(): Array[NamedReference] = Array(PK_COLUMN_REF)
 
     override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-      new InMemoryRowLevelOperationScanBuilder(schema, options)
+      new InMemoryScanBuilder(schema, options)
     }
 
     override def newWriteBuilder(info: LogicalWriteInfo): DeltaWriteBuilder = {
@@ -208,7 +174,7 @@ class InMemoryRowLevelOperationTable(
           )
         }
 
-        override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite(command)
+        override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite
       }
     }
   }
@@ -218,8 +184,8 @@ class InMemoryRowLevelOperationTable(
     }
   }
 
-  private case class TestDeltaBatchWrite(command: Command)
-    extends RowLevelOperationBatchWrite(command) with DeltaBatchWrite {
+  private object TestDeltaBatchWrite extends RowLevelOperationBatchWrite
+    with DeltaBatchWrite {
 
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DeltaWriterFactory = {
       DeltaBufferedRowsWriterFactory
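
Note for reviewers: before this change, the table collected every Scan it built into a shared _scans ListBuffer and copied scan driver metrics into commit properties by hand, prefixing the runtime-filter scan with "secondScan.". After the change, the engine pushes execution metrics straight to the write, which opts in via requestExecMetrics(): Boolean and receives them in execMetrics, keyed as "<metricType>.<name>". Below is a minimal sketch of a metric carrier, assuming V2ExecMetric exposes exactly the three accessors called in this diff (metricType(), name(), value()) and that value() returns a String, since the new code stores it directly as a commit-property value; the actual interface introduced by the PR may differ.

import org.apache.spark.sql.connector.metric.V2ExecMetric

// Sketch only: the V2ExecMetric shape is inferred from the call sites in this diff.
class SimpleExecMetric(mType: String, mName: String, mValue: String) extends V2ExecMetric {
  override def metricType(): String = mType // e.g. "scan" or "write"
  override def name(): String = mName       // metric name, e.g. "numSplits"
  override def value(): String = mValue     // assumed String-valued, per the diff
}

object V2ExecMetricSketch {
  def main(args: Array[String]): Unit = {
    val m = new SimpleExecMetric("scan", "numSplits", "4")
    // execMetrics above would record this as commitProperties += "scan.numSplits" -> "4"
    println(s"${m.metricType()}.${m.name()} -> ${m.value()}")
  }
}

This push model also explains why PartitionBasedReplaceData and TestDeltaBatchWrite drop their command constructor argument: the write no longer needs the Command to request metrics, so the delta write can collapse into a parameterless object.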