From c17c6b9c192eed885c74e9154b1772a23d9d3a7b Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Thu, 26 Jun 2025 10:06:50 -0700 Subject: [PATCH 1/4] Send DML metrics from job to V2Write --- .../spark/sql/connector/write/BatchWrite.java | 16 +++++ .../spark/sql/connector/write/Write.java | 1 - .../connector/catalog/InMemoryBaseTable.scala | 42 +++++++++++ .../InMemoryRowLevelOperationTable.scala | 69 +++++++++++++++++-- .../v2/WriteToDataSourceV2Exec.scala | 24 ++++++- .../connector/MergeIntoTableSuiteBase.scala | 39 +++++++++++ .../sql/hive/execution/HiveDDLSuite.scala | 2 +- 7 files changed, 181 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java index 8c068928415f4..eca64ffb71b0c 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector.write; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; /** * An interface that defines how to write the data to data source for batch processing. @@ -104,4 +105,19 @@ default void onDataWriterCommit(WriterCommitMessage message) {} * clean up the data left by data writers. */ void abort(WriterCommitMessage[] messages); + + + /** + * Whether this batch write requests execution metrics. Returns a row level operation command this batch write + * is part of, if requested. Return null if not requested. + */ + default RowLevelOperation.Command requestExecMetrics() { + return null; + } + + /** + * Provides an array of query execution metrics to the batch write prior to commit. + * @param metrics an array of execution metrics + */ + default void execMetrics(CustomTaskMetric[] metrics) {} } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java index f62d194fa7f9f..3f77c75ed8ecd 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java @@ -86,5 +86,4 @@ default CustomMetric[] supportedCustomMetrics() { default CustomTaskMetric[] reportDriverMetrics() { return new CustomTaskMetric[]{}; } - } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index d032829d8645f..3e01d321182a9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -23,6 +23,7 @@ import java.util import java.util.OptionalLong import scala.collection.mutable +import scala.collection.mutable.ListBuffer import com.google.common.base.Objects @@ -144,6 +145,8 @@ abstract class InMemoryBaseTable( // The key `Seq[Any]` is the partition values, value is a set of splits, each with a set of rows. 
val dataMap: mutable.Map[Seq[Any], Seq[BufferedRows]] = mutable.Map.empty + val commits: ListBuffer[Commit] = ListBuffer[Commit]() + def data: Array[BufferedRows] = dataMap.values.flatten.toArray def rows: Seq[InternalRow] = dataMap.values.flatten.flatMap(_.rows).toSeq @@ -498,6 +501,24 @@ abstract class InMemoryBaseTable( options: CaseInsensitiveStringMap) extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering { + var setFilters = Array.empty[Filter] + + override def reportDriverMetrics(): Array[CustomTaskMetric] = + Array(new CustomTaskMetric{ + override def name(): String = "numSplits" + override def value(): Long = 1L + }) + + override def supportedCustomMetrics(): Array[CustomMetric] = { + Array(new CustomMetric { + override def name(): String = "numSplits" + override def description(): String = "number of splits in the scan" + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + taskMetrics.sum.toString + } + }) + } + override def filterAttributes(): Array[NamedReference] = { val scanFields = readSchema.fields.map(_.name).toSet partitioning.flatMap(_.references) @@ -505,6 +526,7 @@ abstract class InMemoryBaseTable( } override def filter(filters: Array[Filter]): Unit = { + this.setFilters = filters if (partitioning.length == 1 && partitioning.head.references().length == 1) { val ref = partitioning.head.references().head filters.foreach { @@ -575,6 +597,9 @@ abstract class InMemoryBaseTable( } protected abstract class TestBatchWrite extends BatchWrite { + + var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String] + override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = { BufferedRowsWriterFactory } @@ -583,8 +608,11 @@ abstract class InMemoryBaseTable( } class Append(val info: LogicalWriteInfo) extends TestBatchWrite { + override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { withData(messages.map(_.asInstanceOf[BufferedRows])) + commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap) + commitProperties.clear() } } @@ -593,6 +621,8 @@ abstract class InMemoryBaseTable( val newData = messages.map(_.asInstanceOf[BufferedRows]) dataMap --= newData.flatMap(_.rows.map(getKey)) withData(newData) + commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap) + commitProperties.clear() } } @@ -600,6 +630,8 @@ abstract class InMemoryBaseTable( override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { dataMap.clear() withData(messages.map(_.asInstanceOf[BufferedRows])) + commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap) + commitProperties.clear() } } @@ -747,6 +779,14 @@ private class BufferedRowsReader( override def close(): Unit = {} + override def currentMetricsValues(): Array[CustomTaskMetric] = + Array[CustomTaskMetric]( + new CustomTaskMetric { + override def name(): String = "numSplits" + override def value(): Long = 1 + } + ) + private def extractFieldValue( field: StructField, schema: StructType, @@ -841,6 +881,8 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric { override def value(): Long = value } +case class Commit(id: Long, properties: Map[String, String]) + sealed trait Operation case object Write extends Operation case object Delete extends Operation diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala index aeb807768b076..5cdec4a6d7aa6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala @@ -17,13 +17,17 @@ package org.apache.spark.sql.connector.catalog +import java.time.Instant import java.util +import scala.collection.mutable.ListBuffer + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform} +import org.apache.spark.sql.connector.metric.CustomTaskMetric import org.apache.spark.sql.connector.read.{Scan, ScanBuilder} import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.RowLevelOperation.Command @@ -46,6 +50,8 @@ class InMemoryRowLevelOperationTable( constraints) with SupportsRowLevelOperations { + private val _scans = ListBuffer.empty[Scan] + private final val PARTITION_COLUMN_REF = FieldReference(PartitionKeyColumn.name) private final val INDEX_COLUMN_REF = FieldReference(IndexColumn.name) private final val SUPPORTS_DELTAS = "supports-deltas" @@ -68,6 +74,16 @@ class InMemoryRowLevelOperationTable( } } + class InMemoryRowLevelOperationScanBuilder(tableSchema: StructType, + options: CaseInsensitiveStringMap) + extends InMemoryScanBuilder(tableSchema, options) { + override def build: Scan = { + val scan = super.build + _scans += scan + scan + } + } + case class PartitionBasedOperation(command: Command) extends RowLevelOperation { var configuredScan: InMemoryBatchScan = _ @@ -101,7 +117,7 @@ class InMemoryRowLevelOperationTable( SortDirection.ASCENDING.defaultNullOrdering())) } - override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan) + override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan, command) override def description: String = "InMemoryWrite" } @@ -111,9 +127,46 @@ class InMemoryRowLevelOperationTable( override def description(): String = "InMemoryPartitionReplaceOperation" } - private case class PartitionBasedReplaceData(scan: InMemoryBatchScan) extends TestBatchWrite { + abstract class RowLevelOperationBatchWrite(command: Command) extends TestBatchWrite { + override def requestExecMetrics(): Command = command + + override def execMetrics(metrics: Array[CustomTaskMetric]): Unit = { + metrics.foreach(m => commitProperties += (m.name() -> m.value().toString)) + } + + override def commit(messages: Array[WriterCommitMessage]): Unit = { + assert(_scans.size <= 2, "Expected at most two scans in row-level operations") + assert(_scans.count{ case s: InMemoryBatchScan => s.setFilters.nonEmpty } <= 1, + "Expected at most one scan with runtime filters in row-level operations") + assert(_scans.count{ case s: InMemoryBatchScan => s.setFilters.isEmpty } <= 1, + "Expected at most one scan without runtime filters 
in row-level operations") + + _scans.foreach{ + case s: InMemoryBatchScan => + val prefix = if (s.setFilters.isEmpty) { + "" + } else { + "secondScan." + } + s.reportDriverMetrics().foreach { metric => + commitProperties += (prefix + metric.name() -> metric.value().toString) + } + case _ => + } + _scans.clear() + doCommit(messages) + commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap) + commitProperties.clear() + } + + def doCommit(messages: Array[WriterCommitMessage]): Unit + } + + private case class PartitionBasedReplaceData(scan: InMemoryBatchScan, + command: RowLevelOperation.Command) + extends RowLevelOperationBatchWrite(command) { - override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { + override def doCommit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { val newData = messages.map(_.asInstanceOf[BufferedRows]) val readRows = scan.data.flatMap(_.asInstanceOf[BufferedRows].rows) val readPartitions = readRows.map(r => getKey(r, schema)).distinct @@ -134,7 +187,7 @@ class InMemoryRowLevelOperationTable( override def rowId(): Array[NamedReference] = Array(PK_COLUMN_REF) override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - new InMemoryScanBuilder(schema, options) + new InMemoryRowLevelOperationScanBuilder(schema, options) } override def newWriteBuilder(info: LogicalWriteInfo): DeltaWriteBuilder = { @@ -155,7 +208,7 @@ class InMemoryRowLevelOperationTable( ) } - override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite + override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite(command) } } } @@ -165,12 +218,14 @@ class InMemoryRowLevelOperationTable( } } - private object TestDeltaBatchWrite extends DeltaBatchWrite { + private case class TestDeltaBatchWrite(command: Command) + extends RowLevelOperationBatchWrite(command) with DeltaBatchWrite{ + override def createBatchWriterFactory(info: PhysicalWriteInfo): DeltaWriterFactory = { DeltaBufferedRowsWriterFactory } - override def commit(messages: Array[WriterCommitMessage]): Unit = { + override def doCommit(messages: Array[WriterCommitMessage]): Unit = { val newData = messages.map(_.asInstanceOf[BufferedRows]) withDeletes(newData) withData(newData) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 4436c6b24f7c8..b5c62f7569a3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -30,10 +30,11 @@ import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUt import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.metric.CustomMetric -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage} +import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric} +import 
org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, RowLevelOperation, Write, WriterCommitMessage} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType import org.apache.spark.util.{LongAccumulator, Utils} @@ -398,7 +399,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec { /** * The base physical plan for writing data into data source v2. */ -trait V2TableWriteExec extends V2CommandExec with UnaryExecNode { +trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper { def query: SparkPlan def writingTask: WritingSparkTask[_] = DataWritingSparkTask @@ -422,6 +423,22 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode { tempRdd } } + + val metricsOpt = batchWrite.requestExecMetrics() match { + case RowLevelOperation.Command.MERGE => + collectFirst(query) { + case m: MergeRowsExec => m.metrics + } + case _ => None + } + metricsOpt.foreach { metrics => + batchWrite.execMetrics( + metrics.map { + case (k, v) => V2ExecMetric(k, v.value) + }.toArray + ) + } + // introduce a local var to avoid serializing the whole class val task = writingTask val writerFactory = batchWrite.createBatchWriterFactory( @@ -729,3 +746,4 @@ private[v2] case class DataWritingSparkTaskResult( */ private[sql] case class StreamWriterCommitProgress(numOutputRows: Long) +private [v2] case class V2ExecMetric(name: String, value: Long) extends CustomTaskMetric diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 21b171bee9614..09728d96fdd77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -2045,6 +2045,45 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } + test("V2 write metrics for merge") { + + Seq("true", "false").foreach { aqeEnabled: String => + withTempView("source") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { + createAndInitTable("pk INT NOT NULL, salary INT, dep STRING", + """{ "pk": 1, "salary": 100, "dep": "hr" } + |{ "pk": 2, "salary": 200, "dep": "software" } + |{ "pk": 3, "salary": 300, "dep": "hr" } + |{ "pk": 4, "salary": 400, "dep": "marketing" } + |{ "pk": 5, "salary": 500, "dep": "executive" } + |""".stripMargin) + + val sourceDF = Seq(1, 2, 6, 10).toDF("pk") + sourceDF.createOrReplaceTempView("source") + + sql( + s"""MERGE INTO $tableNameAsString t + |USING source s + |ON t.pk = s.pk + |WHEN MATCHED AND salary < 200 THEN + | DELETE + |WHEN NOT MATCHED AND s.pk < 10 THEN + | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy") + |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN + | DELETE + |""".stripMargin + ) + + val table = catalog.loadTable(ident) + // scalastyle:off println + println(table) + // scalastyle:on println + sql(s"DROP TABLE $tableNameAsString") + } + } + } + } + private def findMergeExec(query: String): MergeRowsExec = { val plan = executeAndKeepPlan { sql(query) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 13e8d3721d81e..371a1850c19b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableCatalog, TableChange, TableInfo} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} From 3162af01fbbdba5705a9731dd54180925c1ccd73 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 11 Jul 2025 15:35:18 -0700 Subject: [PATCH 2/4] Fixes --- .../sql/connector/metric/MergeMetrics.java | 163 ++++++++++++++++++ .../spark/sql/connector/write/BatchWrite.java | 17 +- .../spark/sql/connector/write/Write.java | 1 + .../connector/catalog/InMemoryBaseTable.scala | 27 --- .../InMemoryRowLevelOperationTable.scala | 78 +++------ .../v2/WriteToDataSourceV2Exec.scala | 62 ++++--- .../connector/MergeIntoTableSuiteBase.scala | 86 ++++++++- .../sql/hive/execution/HiveDDLSuite.scala | 2 +- 8 files changed, 322 insertions(+), 114 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java new file mode 100644 index 0000000000000..bde4c88bcdd50 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connector.metric; + +public interface MergeMetrics { + + class Builder { + private long numTargetRowsCopied = -1; + private long numTargetRowsInserted = -1; + private long numTargetRowsDeleted = -1; + private long numTargetRowsUpdated = -1; + private long numTargetRowsMatchedUpdated = -1; + private long numTargetRowsMatchedDeleted = -1; + private long numTargetRowsNotMatchedBySourceUpdated = -1; + private long numTargetRowsNotMatchedBySourceDeleted = -1; + private long numSourceRows = -1; + + public Builder numTargetRowsCopied(long numTargetRowsCopied) { + this.numTargetRowsCopied = numTargetRowsCopied; + return this; + } + + public Builder numTargetRowsInserted(long numTargetRowsInserted) { + this.numTargetRowsInserted = numTargetRowsInserted; + return this; + } + + public Builder numTargetRowsDeleted(long numTargetRowsDeleted) { + this.numTargetRowsDeleted = numTargetRowsDeleted; + return this; + } + + public Builder numTargetRowsUpdated(long numTargetRowsUpdated) { + this.numTargetRowsUpdated = numTargetRowsUpdated; + return this; + } + + public Builder numTargetRowsMatchedUpdated(long numTargetRowsMatchedUpdated) { + this.numTargetRowsMatchedUpdated = numTargetRowsMatchedUpdated; + return this; + } + + public Builder numTargetRowsMatchedDeleted(long numTargetRowsMatchedDeleted) { + this.numTargetRowsMatchedDeleted = numTargetRowsMatchedDeleted; + return this; + } + + public Builder numTargetRowsNotMatchedBySourceUpdated(long numTargetRowsNotMatchedBySourceUpdated) { + this.numTargetRowsNotMatchedBySourceUpdated = numTargetRowsNotMatchedBySourceUpdated; + return this; + } + + public Builder numTargetRowsNotMatchedBySourceDeleted(long numTargetRowsNotMatchedBySourceDeleted) { + this.numTargetRowsNotMatchedBySourceDeleted = numTargetRowsNotMatchedBySourceDeleted; + return this; + } + + public MergeMetrics build() { + return new MergeMetrics() { + @Override + public long numTargetRowsCopied() { + return numTargetRowsCopied; + } + + @Override + public long numTargetRowsInserted() { + return numTargetRowsInserted; + } + + @Override + public long numTargetRowsDeleted() { + return numTargetRowsDeleted; + } + + @Override + public long numTargetRowsUpdated() { + return numTargetRowsUpdated; + } + + @Override + public long numTargetRowsMatchedUpdated() { + return numTargetRowsMatchedUpdated; + } + + @Override + public long numTargetRowsMatchedDeleted() { + return numTargetRowsMatchedDeleted; + } + + @Override + public long numTargetRowsNotMatchedBySourceUpdated() { + return numTargetRowsNotMatchedBySourceUpdated; + } + + @Override + public long numTargetRowsNotMatchedBySourceDeleted() { + return numTargetRowsNotMatchedBySourceDeleted; + } + }; + } + } + + /** + * Returns a new builder for MergeMetrics. + */ + static Builder builder() { + return new MergeMetrics.Builder(); + } + + /** + * Returns the number of target rows copied unmodified because they did not match any action. + */ + long numTargetRowsCopied(); + + /** + * Returns the number of target rows inserted. + */ + long numTargetRowsInserted(); + + /** + * Returns the number of target rows deleted. + */ + long numTargetRowsDeleted(); + + /** + * Returns the number of target rows updated. + */ + long numTargetRowsUpdated(); + + /** + * Returns the number of target rows matched and updated by a matched clause. + */ + long numTargetRowsMatchedUpdated(); + + /** + * Returns the number of target rows matched and deleted by a matched clause. 
+ */ + long numTargetRowsMatchedDeleted(); + + /** + * Returns the number of target rows updated by a not matched by source clause. + */ + long numTargetRowsNotMatchedBySourceUpdated(); + + /** + * Returns the number of target rows deleted by a not matched by source clause. + */ + long numTargetRowsNotMatchedBySourceDeleted(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java index eca64ffb71b0c..8d65f0fb82f14 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java @@ -19,6 +19,7 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.connector.metric.CustomTaskMetric; +import org.apache.spark.sql.connector.metric.MergeMetrics; /** * An interface that defines how to write the data to data source for batch processing. @@ -106,18 +107,18 @@ default void onDataWriterCommit(WriterCommitMessage message) {} */ void abort(WriterCommitMessage[] messages); - /** - * Whether this batch write requests execution metrics. Returns a row level operation command this batch write - * is part of, if requested. Return null if not requested. + * Whether this batch write requests merge execution metrics. */ - default RowLevelOperation.Command requestExecMetrics() { - return null; + default boolean requestMergeMetrics() { + return false; } /** - * Provides an array of query execution metrics to the batch write prior to commit. - * @param metrics an array of execution metrics + * Similar to {@link #commit(WriterCommitMessage[])}, but providing merge exec metrics to this batch write. + * @param metrics merge execution metrics */ - default void execMetrics(CustomTaskMetric[] metrics) {} + default void commitWithMerge(WriterCommitMessage[] messages, MergeMetrics metrics) { + commit(messages); + } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java index 3f77c75ed8ecd..f62d194fa7f9f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/Write.java @@ -86,4 +86,5 @@ default CustomMetric[] supportedCustomMetrics() { default CustomTaskMetric[] reportDriverMetrics() { return new CustomTaskMetric[]{}; } + } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 3e01d321182a9..5f863d9c1ad44 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -501,24 +501,6 @@ abstract class InMemoryBaseTable( options: CaseInsensitiveStringMap) extends BatchScanBaseClass(_data, readSchema, tableSchema) with SupportsRuntimeFiltering { - var setFilters = Array.empty[Filter] - - override def reportDriverMetrics(): Array[CustomTaskMetric] = - Array(new CustomTaskMetric{ - override def name(): String = "numSplits" - override def value(): Long = 1L - }) - - override def supportedCustomMetrics(): Array[CustomMetric] = { - Array(new CustomMetric { - override def name(): String = "numSplits" - override def description(): String = "number of splits in the scan" - override def 
aggregateTaskMetrics(taskMetrics: Array[Long]): String = { - taskMetrics.sum.toString - } - }) - } - override def filterAttributes(): Array[NamedReference] = { val scanFields = readSchema.fields.map(_.name).toSet partitioning.flatMap(_.references) @@ -526,7 +508,6 @@ abstract class InMemoryBaseTable( } override def filter(filters: Array[Filter]): Unit = { - this.setFilters = filters if (partitioning.length == 1 && partitioning.head.references().length == 1) { val ref = partitioning.head.references().head filters.foreach { @@ -779,14 +760,6 @@ private class BufferedRowsReader( override def close(): Unit = {} - override def currentMetricsValues(): Array[CustomTaskMetric] = - Array[CustomTaskMetric]( - new CustomTaskMetric { - override def name(): String = "numSplits" - override def value(): Long = 1 - } - ) - private def extractFieldValue( field: StructField, schema: StructType, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala index 5cdec4a6d7aa6..f9651fac6845e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala @@ -20,14 +20,12 @@ package org.apache.spark.sql.connector.catalog import java.time.Instant import java.util -import scala.collection.mutable.ListBuffer - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.connector.catalog.constraints.Constraint import org.apache.spark.sql.connector.distributions.{Distribution, Distributions} import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform} -import org.apache.spark.sql.connector.metric.CustomTaskMetric +import org.apache.spark.sql.connector.metric.MergeMetrics import org.apache.spark.sql.connector.read.{Scan, ScanBuilder} import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.connector.write.RowLevelOperation.Command @@ -50,8 +48,6 @@ class InMemoryRowLevelOperationTable( constraints) with SupportsRowLevelOperations { - private val _scans = ListBuffer.empty[Scan] - private final val PARTITION_COLUMN_REF = FieldReference(PartitionKeyColumn.name) private final val INDEX_COLUMN_REF = FieldReference(IndexColumn.name) private final val SUPPORTS_DELTAS = "supports-deltas" @@ -74,16 +70,6 @@ class InMemoryRowLevelOperationTable( } } - class InMemoryRowLevelOperationScanBuilder(tableSchema: StructType, - options: CaseInsensitiveStringMap) - extends InMemoryScanBuilder(tableSchema, options) { - override def build: Scan = { - val scan = super.build - _scans += scan - scan - } - } - case class PartitionBasedOperation(command: Command) extends RowLevelOperation { var configuredScan: InMemoryBatchScan = _ @@ -117,7 +103,7 @@ class InMemoryRowLevelOperationTable( SortDirection.ASCENDING.defaultNullOrdering())) } - override def toBatch: BatchWrite = PartitionBasedReplaceData(configuredScan, command) + override def toBatch: BatchWrite = 
PartitionBasedReplaceData(configuredScan) override def description: String = "InMemoryWrite" } @@ -127,44 +113,38 @@ class InMemoryRowLevelOperationTable( override def description(): String = "InMemoryPartitionReplaceOperation" } - abstract class RowLevelOperationBatchWrite(command: Command) extends TestBatchWrite { - override def requestExecMetrics(): Command = command - - override def execMetrics(metrics: Array[CustomTaskMetric]): Unit = { - metrics.foreach(m => commitProperties += (m.name() -> m.value().toString)) + abstract class RowLevelOperationBatchWrite extends TestBatchWrite { + override def requestMergeMetrics(): Boolean = true + + override def commitWithMerge(messages: Array[WriterCommitMessage], metrics: MergeMetrics): + Unit = { + commitProperties += "numTargetRowsCopied" -> metrics.numTargetRowsCopied().toString + commitProperties += "numTargetRowsInserted" -> metrics.numTargetRowsInserted().toString + commitProperties += "numTargetRowsDeleted" -> metrics.numTargetRowsDeleted().toString + commitProperties += "numTargetRowsUpdated" -> metrics.numTargetRowsUpdated().toString + commitProperties += "numTargetRowsInserted" -> metrics.numTargetRowsInserted().toString + commitProperties += ("numTargetRowsNotMatchedBySourceUpdated" + -> metrics.numTargetRowsNotMatchedBySourceUpdated().toString) + commitProperties += ("numTargetRowsNotMatchedBySourceDeleted" + -> metrics.numTargetRowsNotMatchedBySourceDeleted().toString) + commitProperties += ("numTargetRowsMatchedDeleted" + -> metrics.numTargetRowsMatchedDeleted().toString) + commitProperties += ("numTargetRowsMatchedUpdated" + -> metrics.numTargetRowsMatchedUpdated().toString) + commit(messages) + commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap) + commitProperties.clear() } override def commit(messages: Array[WriterCommitMessage]): Unit = { - assert(_scans.size <= 2, "Expected at most two scans in row-level operations") - assert(_scans.count{ case s: InMemoryBatchScan => s.setFilters.nonEmpty } <= 1, - "Expected at most one scan with runtime filters in row-level operations") - assert(_scans.count{ case s: InMemoryBatchScan => s.setFilters.isEmpty } <= 1, - "Expected at most one scan without runtime filters in row-level operations") - - _scans.foreach{ - case s: InMemoryBatchScan => - val prefix = if (s.setFilters.isEmpty) { - "" - } else { - "secondScan." 
- } - s.reportDriverMetrics().foreach { metric => - commitProperties += (prefix + metric.name() -> metric.value().toString) - } - case _ => - } - _scans.clear() doCommit(messages) - commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap) - commitProperties.clear() } def doCommit(messages: Array[WriterCommitMessage]): Unit } - private case class PartitionBasedReplaceData(scan: InMemoryBatchScan, - command: RowLevelOperation.Command) - extends RowLevelOperationBatchWrite(command) { + private case class PartitionBasedReplaceData(scan: InMemoryBatchScan) + extends RowLevelOperationBatchWrite { override def doCommit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized { val newData = messages.map(_.asInstanceOf[BufferedRows]) @@ -187,7 +167,7 @@ class InMemoryRowLevelOperationTable( override def rowId(): Array[NamedReference] = Array(PK_COLUMN_REF) override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - new InMemoryRowLevelOperationScanBuilder(schema, options) + new InMemoryScanBuilder(schema, options) } override def newWriteBuilder(info: LogicalWriteInfo): DeltaWriteBuilder = { @@ -208,7 +188,7 @@ class InMemoryRowLevelOperationTable( ) } - override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite(command) + override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite } } } @@ -218,9 +198,7 @@ class InMemoryRowLevelOperationTable( } } - private case class TestDeltaBatchWrite(command: Command) - extends RowLevelOperationBatchWrite(command) with DeltaBatchWrite{ - + private object TestDeltaBatchWrite extends RowLevelOperationBatchWrite with DeltaBatchWrite{ override def createBatchWriterFactory(info: PhysicalWriteInfo): DeltaWriterFactory = { DeltaBufferedRowsWriterFactory } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index b5c62f7569a3e..1ffab0138745d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -30,8 +30,8 @@ import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUt import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION} import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege} import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric} -import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, RowLevelOperation, Write, WriterCommitMessage} +import org.apache.spark.sql.connector.metric.{CustomMetric, MergeMetrics} +import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -424,21 +424,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with 
AdaptiveSpa } } - val metricsOpt = batchWrite.requestExecMetrics() match { - case RowLevelOperation.Command.MERGE => - collectFirst(query) { - case m: MergeRowsExec => m.metrics - } - case _ => None - } - metricsOpt.foreach { metrics => - batchWrite.execMetrics( - metrics.map { - case (k, v) => V2ExecMetric(k, v.value) - }.toArray - ) - } - // introduce a local var to avoid serializing the whole class val task = writingTask val writerFactory = batchWrite.createBatchWriterFactory( @@ -468,8 +453,17 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa } ) + val mergeMetricsOpt = if (batchWrite.requestMergeMetrics()) { + Some(getMergeMetrics(query)) + } else { + None + } + logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.") - batchWrite.commit(messages) + mergeMetricsOpt match { + case Some(metrics) => batchWrite.commitWithMerge(messages, metrics) + case None => batchWrite.commit(messages) + } logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.") commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value)) } catch { @@ -491,6 +485,36 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa Nil } + + private def getMergeMetrics(query: SparkPlan): MergeMetrics = { + val mergeMetricsBuilder = new MergeMetrics.Builder() + val mergeNode = collectFirst(query) {case c: MergeRowsExec => c} + mergeNode.foreach {n => mergeMetricValues(n.metrics, mergeMetricsBuilder)} + mergeMetricsBuilder.build() + } + + private def metric(metrics: Map[String, SQLMetric], metric: String): Long = { + metrics.get(metric) match { + case Some(m) => m.value + case None => -1 + } + } + + private def mergeMetricValues(mergeNodeMetrics: Map[String, SQLMetric], + metricBuilder: MergeMetrics.Builder): Option[MergeMetrics.Builder] = { + metricBuilder + .numTargetRowsCopied(metric(mergeNodeMetrics, "numTargetRowsCopied")) + .numTargetRowsDeleted(metric(mergeNodeMetrics, "numTargetRowsDeleted")) + .numTargetRowsUpdated(metric(mergeNodeMetrics, "numTargetRowsUpdated")) + .numTargetRowsInserted(metric(mergeNodeMetrics, "numTargetRowsInserted")) + .numTargetRowsNotMatchedBySourceDeleted( + metric(mergeNodeMetrics, "numTargetRowsNotMatchedBySourceDeleted")) + .numTargetRowsNotMatchedBySourceUpdated( + metric(mergeNodeMetrics, "numTargetRowsNotMatchedBySourceUpdated")) + .numTargetRowsMatchedDeleted(metric(mergeNodeMetrics, "numTargetRowsMatchedDeleted")) + .numTargetRowsMatchedUpdated(metric(mergeNodeMetrics, "numTargetRowsMatchedUpdated")) + Some(metricBuilder) + } } trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable { @@ -745,5 +769,3 @@ private[v2] case class DataWritingSparkTaskResult( * Sink progress information collected after commit. 
*/ private[sql] case class StreamWriterCommitProgress(numOutputRows: Long) - -private [v2] case class V2ExecMetric(name: String, value: Long) extends CustomTaskMetric diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 09728d96fdd77..b6e7586e0e30e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -21,7 +21,7 @@ import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not} import org.apache.spark.sql.catalyst.optimizer.BuildLeft -import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, TableInfo} +import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, InMemoryTable, TableInfo} import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -1811,6 +1811,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(1, 1000, "hr"), // updated Row(2, 200, "software"), Row(3, 300, "hr"))) + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "2")) + assert(commitProps("numTargetRowsInserted") === "0") + assert(commitProps("numTargetRowsUpdated") === "1") + assert(commitProps("numTargetRowsDeleted") === "0") + assert(commitProps("numTargetRowsMatchedUpdated") === "1") + assert(commitProps("numTargetRowsMatchedDeleted") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0") } } @@ -1856,6 +1867,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(2, 200, "software"), Row(3, 300, "hr"), Row(5, 400, "executive"))) // inserted + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numTargetRowsCopied") === "0") + assert(commitProps("numTargetRowsInserted") === "1") + assert(commitProps("numTargetRowsUpdated") === "0") + assert(commitProps("numTargetRowsDeleted") === "0") + assert(commitProps("numTargetRowsMatchedUpdated") === "0") + assert(commitProps("numTargetRowsMatchedDeleted") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0") } } @@ -1883,7 +1905,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase |""".stripMargin } - assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3) assertMetric(mergeExec, "numTargetRowsInserted", 0) assertMetric(mergeExec, "numTargetRowsUpdated", 2) @@ -1901,6 +1922,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(3, 300, "hr"), Row(4, 400, "marketing"), Row(5, -1, "executive"))) // updated + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("numTargetRowsInserted") === "0") + 
assert(commitProps("numTargetRowsUpdated") === "2") + assert(commitProps("numTargetRowsDeleted") === "0") + assert(commitProps("numTargetRowsMatchedUpdated") === "1") + assert(commitProps("numTargetRowsMatchedDeleted") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "1") + assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0") } } @@ -1947,6 +1979,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(4, 400, "marketing")) // Row(5, 500, "executive") deleted ) + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("numTargetRowsInserted") === "0") + assert(commitProps("numTargetRowsUpdated") === "0") + assert(commitProps("numTargetRowsDeleted") === "2") + assert(commitProps("numTargetRowsMatchedUpdated") === "0") + assert(commitProps("numTargetRowsMatchedDeleted") === "1") + assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "1") } } @@ -1994,6 +2037,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(4, 400, "marketing"), Row(5, -1, "executive"), // updated Row(6, -1, "dummy"))) // inserted + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("numTargetRowsInserted") === "1") + assert(commitProps("numTargetRowsUpdated") === "2") + assert(commitProps("numTargetRowsDeleted") === "0") + assert(commitProps("numTargetRowsMatchedUpdated") === "1") + assert(commitProps("numTargetRowsMatchedDeleted") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "1") + assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0") } } @@ -2032,7 +2086,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0) assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1) - checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq( @@ -2042,11 +2095,21 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(4, 400, "marketing"), // Row(5, 500, "executive") deleted Row(6, -1, "dummy"))) // inserted + + val table = catalog.loadTable(ident) + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("numTargetRowsInserted") === "1") + assert(commitProps("numTargetRowsUpdated") === "0") + assert(commitProps("numTargetRowsDeleted") === "2") + assert(commitProps("numTargetRowsMatchedUpdated") === "0") + assert(commitProps("numTargetRowsMatchedDeleted") === "1") + assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "1") } } - test("V2 write metrics for merge") { - + test("SPARK-52689: V2 write metrics for merge") { Seq("true", "false").foreach { aqeEnabled: String => withTempView("source") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) { @@ -2075,9 +2138,16 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase ) val table = catalog.loadTable(ident) - // scalastyle:off println - println(table) - // scalastyle:on 
println + val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties + assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3")) + assert(commitProps("numTargetRowsInserted") === "1") + assert(commitProps("numTargetRowsUpdated") === "0") + assert(commitProps("numTargetRowsDeleted") === "2") + assert(commitProps("numTargetRowsMatchedUpdated") === "0") + assert(commitProps("numTargetRowsMatchedDeleted") === "1") + assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0") + assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "1") + sql(s"DROP TABLE $tableNameAsString") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 371a1850c19b0..13e8d3721d81e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableCatalog, TableChange, TableInfo} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo} import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} From 781f32d20f97f5c59d1231083b4a563c9c0a6a80 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 15 Jul 2025 14:17:47 -0700 Subject: [PATCH 3/4] Review comments --- .../sql/connector/metric/MergeMetrics.java | 121 ++---------------- .../spark/sql/connector/write/BatchWrite.java | 9 +- .../InMemoryRowLevelOperationTable.scala | 39 +++--- .../v2/WriteToDataSourceV2Exec.scala | 62 ++++----- 4 files changed, 61 insertions(+), 170 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java index bde4c88bcdd50..f49e30dba27bc 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java @@ -16,148 +16,47 @@ */ package org.apache.spark.sql.connector.metric; -public interface MergeMetrics { - - class Builder { - private long numTargetRowsCopied = -1; - private long numTargetRowsInserted = -1; - private long numTargetRowsDeleted = -1; - private long numTargetRowsUpdated = -1; - private long numTargetRowsMatchedUpdated = -1; - private long numTargetRowsMatchedDeleted = -1; - private long numTargetRowsNotMatchedBySourceUpdated = -1; - private long numTargetRowsNotMatchedBySourceDeleted = -1; - private long numSourceRows = -1; - - public Builder numTargetRowsCopied(long numTargetRowsCopied) { - this.numTargetRowsCopied = numTargetRowsCopied; - return this; - } - - public Builder numTargetRowsInserted(long numTargetRowsInserted) { - this.numTargetRowsInserted = numTargetRowsInserted; - return this; - } - - public Builder numTargetRowsDeleted(long numTargetRowsDeleted) { - this.numTargetRowsDeleted = 
numTargetRowsDeleted; - return this; - } - - public Builder numTargetRowsUpdated(long numTargetRowsUpdated) { - this.numTargetRowsUpdated = numTargetRowsUpdated; - return this; - } - - public Builder numTargetRowsMatchedUpdated(long numTargetRowsMatchedUpdated) { - this.numTargetRowsMatchedUpdated = numTargetRowsMatchedUpdated; - return this; - } - - public Builder numTargetRowsMatchedDeleted(long numTargetRowsMatchedDeleted) { - this.numTargetRowsMatchedDeleted = numTargetRowsMatchedDeleted; - return this; - } - - public Builder numTargetRowsNotMatchedBySourceUpdated(long numTargetRowsNotMatchedBySourceUpdated) { - this.numTargetRowsNotMatchedBySourceUpdated = numTargetRowsNotMatchedBySourceUpdated; - return this; - } - - public Builder numTargetRowsNotMatchedBySourceDeleted(long numTargetRowsNotMatchedBySourceDeleted) { - this.numTargetRowsNotMatchedBySourceDeleted = numTargetRowsNotMatchedBySourceDeleted; - return this; - } - - public MergeMetrics build() { - return new MergeMetrics() { - @Override - public long numTargetRowsCopied() { - return numTargetRowsCopied; - } +import java.util.OptionalLong; - @Override - public long numTargetRowsInserted() { - return numTargetRowsInserted; - } - - @Override - public long numTargetRowsDeleted() { - return numTargetRowsDeleted; - } - - @Override - public long numTargetRowsUpdated() { - return numTargetRowsUpdated; - } - - @Override - public long numTargetRowsMatchedUpdated() { - return numTargetRowsMatchedUpdated; - } - - @Override - public long numTargetRowsMatchedDeleted() { - return numTargetRowsMatchedDeleted; - } - - @Override - public long numTargetRowsNotMatchedBySourceUpdated() { - return numTargetRowsNotMatchedBySourceUpdated; - } - - @Override - public long numTargetRowsNotMatchedBySourceDeleted() { - return numTargetRowsNotMatchedBySourceDeleted; - } - }; - } - } - - /** - * Returns a new builder for MergeMetrics. - */ - static Builder builder() { - return new MergeMetrics.Builder(); - } +public interface MergeMetrics { /** * Returns the number of target rows copied unmodified because they did not match any action. */ - long numTargetRowsCopied(); + OptionalLong numTargetRowsCopied(); /** * Returns the number of target rows inserted. */ - long numTargetRowsInserted(); + OptionalLong numTargetRowsInserted(); /** * Returns the number of target rows deleted. */ - long numTargetRowsDeleted(); + OptionalLong numTargetRowsDeleted(); /** * Returns the number of target rows updated. */ - long numTargetRowsUpdated(); + OptionalLong numTargetRowsUpdated(); /** * Returns the number of target rows matched and updated by a matched clause. */ - long numTargetRowsMatchedUpdated(); + OptionalLong numTargetRowsMatchedUpdated(); /** * Returns the number of target rows matched and deleted by a matched clause. */ - long numTargetRowsMatchedDeleted(); + OptionalLong numTargetRowsMatchedDeleted(); /** * Returns the number of target rows updated by a not matched by source clause. */ - long numTargetRowsNotMatchedBySourceUpdated(); + OptionalLong numTargetRowsNotMatchedBySourceUpdated(); /** * Returns the number of target rows deleted by a not matched by source clause. 
   */
-  long numTargetRowsNotMatchedBySourceDeleted();
+  OptionalLong numTargetRowsNotMatchedBySourceDeleted();
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index 8d65f0fb82f14..b620672b1feaf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -107,18 +107,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    */
   void abort(WriterCommitMessage[] messages);
 
-  /**
-   * Whether this batch write requests merge execution metrics.
-   */
-  default boolean requestMergeMetrics() {
-    return false;
-  }
-
   /**
    * Similar to {@link #commit(WriterCommitMessage[])}, but providing merge exec metrics to this batch write.
    * @param metrics merge execution metrics
    */
-  default void commitWithMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
+  default void commitMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
     commit(messages);
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index f9651fac6845e..2cb8c9f192712 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -114,39 +114,36 @@ class InMemoryRowLevelOperationTable(
   }
 
   abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
-    override def requestMergeMetrics(): Boolean = true
 
-    override def commitWithMerge(messages: Array[WriterCommitMessage], metrics: MergeMetrics):
+    override def commitMerge(messages: Array[WriterCommitMessage], metrics: MergeMetrics):
       Unit = {
-      commitProperties += "numTargetRowsCopied" -> metrics.numTargetRowsCopied().toString
-      commitProperties += "numTargetRowsInserted" -> metrics.numTargetRowsInserted().toString
-      commitProperties += "numTargetRowsDeleted" -> metrics.numTargetRowsDeleted().toString
-      commitProperties += "numTargetRowsUpdated" -> metrics.numTargetRowsUpdated().toString
-      commitProperties += "numTargetRowsInserted" -> metrics.numTargetRowsInserted().toString
-      commitProperties += ("numTargetRowsNotMatchedBySourceUpdated"
-        -> metrics.numTargetRowsNotMatchedBySourceUpdated().toString)
-      commitProperties += ("numTargetRowsNotMatchedBySourceDeleted"
-        -> metrics.numTargetRowsNotMatchedBySourceDeleted().toString)
+      commitProperties += "numTargetRowsCopied" -> metrics.numTargetRowsCopied().orElse(-1).toString
+      commitProperties += "numTargetRowsInserted" ->
+        metrics.numTargetRowsInserted().orElse(-1).toString
+      commitProperties += "numTargetRowsDeleted" ->
+        metrics.numTargetRowsDeleted().orElse(-1).toString
+      commitProperties += "numTargetRowsUpdated" ->
+        metrics.numTargetRowsUpdated().orElse(-1).toString
+      commitProperties += "numTargetRowsInserted" ->
+        metrics.numTargetRowsInserted().orElse(-1).toString
       commitProperties += ("numTargetRowsMatchedDeleted"
-        -> metrics.numTargetRowsMatchedDeleted().toString)
+        -> metrics.numTargetRowsMatchedDeleted().orElse(-1).toString)
       commitProperties += ("numTargetRowsMatchedUpdated"
-        -> metrics.numTargetRowsMatchedUpdated().toString)
+        -> metrics.numTargetRowsMatchedUpdated().orElse(-1).toString)
+      commitProperties += ("numTargetRowsNotMatchedBySourceUpdated"
+        -> metrics.numTargetRowsNotMatchedBySourceUpdated().orElse(-1).toString)
+      commitProperties += ("numTargetRowsNotMatchedBySourceDeleted"
+        -> metrics.numTargetRowsNotMatchedBySourceDeleted().orElse(-1).toString)
       commit(messages)
       commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
       commitProperties.clear()
     }
-
-    override def commit(messages: Array[WriterCommitMessage]): Unit = {
-      doCommit(messages)
-    }
-
-    def doCommit(messages: Array[WriterCommitMessage]): Unit
   }
 
   private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
    extends RowLevelOperationBatchWrite {
 
-    override def doCommit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
+    override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       val readRows = scan.data.flatMap(_.asInstanceOf[BufferedRows].rows)
       val readPartitions = readRows.map(r => getKey(r, schema)).distinct
@@ -203,7 +200,7 @@ class InMemoryRowLevelOperationTable(
       DeltaBufferedRowsWriterFactory
     }
 
-    override def doCommit(messages: Array[WriterCommitMessage]): Unit = {
+    override def commit(messages: Array[WriterCommitMessage]): Unit = {
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       withDeletes(newData)
       withData(newData)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 1ffab0138745d..4f79de96d757f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.OptionalLong
+
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -453,15 +455,11 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
         }
       )
 
-    val mergeMetricsOpt = if (batchWrite.requestMergeMetrics()) {
-      Some(getMergeMetrics(query))
-    } else {
-      None
-    }
+    val mergeMetricsOpt = getMergeMetrics(query)
 
    logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
     mergeMetricsOpt match {
-      case Some(metrics) => batchWrite.commitWithMerge(messages, metrics)
+      case Some(metrics) => batchWrite.commitMerge(messages, metrics)
       case None => batchWrite.commit(messages)
     }
     logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
@@ -486,35 +484,29 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
     Nil
   }
 
-  private def getMergeMetrics(query: SparkPlan): MergeMetrics = {
-    val mergeMetricsBuilder = new MergeMetrics.Builder()
-    val mergeNode = collectFirst(query) {case c: MergeRowsExec => c}
-    mergeNode.foreach {n => mergeMetricValues(n.metrics, mergeMetricsBuilder)}
-    mergeMetricsBuilder.build()
+  private def getMergeMetrics(query: SparkPlan): Option[MergeMetrics] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map{ n =>
+      MergeMetricsImpl(
+        numTargetRowsCopied = metric(n.metrics, "numTargetRowsCopied"),
+        numTargetRowsDeleted = metric(n.metrics, "numTargetRowsDeleted"),
+        numTargetRowsUpdated = metric(n.metrics, "numTargetRowsUpdated"),
+        numTargetRowsInserted = metric(n.metrics, "numTargetRowsInserted"),
+        numTargetRowsMatchedDeleted = metric(n.metrics, "numTargetRowsMatchedDeleted"),
+        numTargetRowsMatchedUpdated = metric(n.metrics, "numTargetRowsMatchedUpdated"),
+        numTargetRowsNotMatchedBySourceDeleted =
+          metric(n.metrics, "numTargetRowsNotMatchedBySourceDeleted"),
+        numTargetRowsNotMatchedBySourceUpdated =
+          metric(n.metrics, "numTargetRowsNotMatchedBySourceUpdated")
+      )
+    }
   }
 
-  private def metric(metrics: Map[String, SQLMetric], metric: String): Long = {
+  private def metric(metrics: Map[String, SQLMetric], metric: String): OptionalLong = {
     metrics.get(metric) match {
-      case Some(m) => m.value
-      case None => -1
+      case Some(m) => OptionalLong.of(m.value)
+      case None => OptionalLong.empty()
     }
   }
-
-  private def mergeMetricValues(mergeNodeMetrics: Map[String, SQLMetric],
-      metricBuilder: MergeMetrics.Builder): Option[MergeMetrics.Builder] = {
-    metricBuilder
-      .numTargetRowsCopied(metric(mergeNodeMetrics, "numTargetRowsCopied"))
-      .numTargetRowsDeleted(metric(mergeNodeMetrics, "numTargetRowsDeleted"))
-      .numTargetRowsUpdated(metric(mergeNodeMetrics, "numTargetRowsUpdated"))
-      .numTargetRowsInserted(metric(mergeNodeMetrics, "numTargetRowsInserted"))
-      .numTargetRowsNotMatchedBySourceDeleted(
-        metric(mergeNodeMetrics, "numTargetRowsNotMatchedBySourceDeleted"))
-      .numTargetRowsNotMatchedBySourceUpdated(
-        metric(mergeNodeMetrics, "numTargetRowsNotMatchedBySourceUpdated"))
-      .numTargetRowsMatchedDeleted(metric(mergeNodeMetrics, "numTargetRowsMatchedDeleted"))
-      .numTargetRowsMatchedUpdated(metric(mergeNodeMetrics, "numTargetRowsMatchedUpdated"))
-    Some(metricBuilder)
-  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
@@ -769,3 +761,13 @@ private[v2] case class DataWritingSparkTaskResult(
  * Sink progress information collected after commit.
  */
 private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)
+
+private case class MergeMetricsImpl(
+    override val numTargetRowsCopied: OptionalLong,
+    override val numTargetRowsDeleted: OptionalLong,
+    override val numTargetRowsUpdated: OptionalLong,
+    override val numTargetRowsInserted: OptionalLong,
+    override val numTargetRowsMatchedUpdated: OptionalLong,
+    override val numTargetRowsMatchedDeleted: OptionalLong,
+    override val numTargetRowsNotMatchedBySourceUpdated: OptionalLong,
+    override val numTargetRowsNotMatchedBySourceDeleted: OptionalLong) extends MergeMetrics

From 8f97c64bab665442631bc49f4e55c08508b67cff Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 15 Jul 2025 14:25:59 -0700
Subject: [PATCH 4/4] lint

---
 .../org/apache/spark/sql/connector/metric/MergeMetrics.java   | 4 ++++
 .../java/org/apache/spark/sql/connector/write/BatchWrite.java | 4 ++--
 .../execution/datasources/v2/WriteToDataSourceV2Exec.scala    | 2 --
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
index f49e30dba27bc..f7b96453e3c0a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
@@ -18,6 +18,10 @@
 
 import java.util.OptionalLong;
 
+/**
+ * Execution metrics for a MERGE operation, provided at commit time to a connector that
+ * supports row-level operations.
+ */
 public interface MergeMetrics {
 
   /**
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index b620672b1feaf..116d360abd431 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -18,7 +18,6 @@ package org.apache.spark.sql.connector.write;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 import org.apache.spark.sql.connector.metric.MergeMetrics;
 
 /**
@@ -108,7 +107,8 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
   void abort(WriterCommitMessage[] messages);
 
   /**
-   * Similar to {@link #commit(WriterCommitMessage[])}, but providing merge exec metrics to this batch write.
+   * Similar to {@link #commit(WriterCommitMessage[])}, but providing merge exec metrics to
+   * this batch write.
    * @param metrics merge execution metrics
    */
   default void commitMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 4f79de96d757f..24ca65c7586fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -425,7 +425,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
         tempRdd
       }
     }
-
     // introduce a local var to avoid serializing the whole class
     val task = writingTask
     val writerFactory = batchWrite.createBatchWriterFactory(
@@ -456,7 +455,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSpa
       )
 
     val mergeMetricsOpt = getMergeMetrics(query)
-
    logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
     mergeMetricsOpt match {
       case Some(metrics) => batchWrite.commitMerge(messages, metrics)