diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
new file mode 100644
index 0000000000000..f7b96453e3c0a
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/MergeMetrics.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connector.metric;
+
+import java.util.OptionalLong;
+
+/**
+ * Execution metrics for a MERGE operation, reported to connectors that support row-level
+ * operations of this type.
+ */
+public interface MergeMetrics {
+
+  /**
+   * Returns the number of target rows copied unmodified because they did not match any action.
+   */
+  OptionalLong numTargetRowsCopied();
+
+  /**
+   * Returns the number of target rows inserted.
+   */
+  OptionalLong numTargetRowsInserted();
+
+  /**
+   * Returns the number of target rows deleted.
+   */
+  OptionalLong numTargetRowsDeleted();
+
+  /**
+   * Returns the number of target rows updated.
+   */
+  OptionalLong numTargetRowsUpdated();
+
+  /**
+   * Returns the number of target rows matched and updated by a matched clause.
+   */
+  OptionalLong numTargetRowsMatchedUpdated();
+
+  /**
+   * Returns the number of target rows matched and deleted by a matched clause.
+   */
+  OptionalLong numTargetRowsMatchedDeleted();
+
+  /**
+   * Returns the number of target rows updated by a not matched by source clause.
+   */
+  OptionalLong numTargetRowsNotMatchedBySourceUpdated();
+
+  /**
+   * Returns the number of target rows deleted by a not matched by source clause.
+   */
+  OptionalLong numTargetRowsNotMatchedBySourceDeleted();
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
index 8c068928415f4..116d360abd431 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector.write;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.metric.MergeMetrics;
 
 /**
  * An interface that defines how to write the data to data source for batch processing.
@@ -104,4 +105,15 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * clean up the data left by data writers.
    */
   void abort(WriterCommitMessage[] messages);
+
+  /**
+   * Similar to {@link #commit(WriterCommitMessage[])}, but additionally provides merge execution
+   * metrics to this batch write.
+   *
+   * @param messages the commit messages collected from successful data writers
+   * @param metrics merge execution metrics
+   */
+  default void commitMerge(WriterCommitMessage[] messages, MergeMetrics metrics) {
+    commit(messages);
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index d032829d8645f..5f863d9c1ad44 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -23,6 +23,7 @@ import java.util
 import java.util.OptionalLong
 
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 import com.google.common.base.Objects
 
@@ -144,6 +145,8 @@ abstract class InMemoryBaseTable(
   // The key `Seq[Any]` is the partition values, value is a set of splits, each with a set of rows.
   val dataMap: mutable.Map[Seq[Any], Seq[BufferedRows]] = mutable.Map.empty
 
+  val commits: ListBuffer[Commit] = ListBuffer[Commit]()
+
   def data: Array[BufferedRows] = dataMap.values.flatten.toArray
 
   def rows: Seq[InternalRow] = dataMap.values.flatten.flatMap(_.rows).toSeq
@@ -575,6 +578,9 @@ abstract class InMemoryBaseTable(
   }
 
   protected abstract class TestBatchWrite extends BatchWrite {
+
+    var commitProperties: mutable.Map[String, String] = mutable.Map.empty[String, String]
+
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
       BufferedRowsWriterFactory
     }
@@ -583,8 +589,11 @@ abstract class InMemoryBaseTable(
   }
 
   class Append(val info: LogicalWriteInfo) extends TestBatchWrite {
+
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -593,6 +602,8 @@ abstract class InMemoryBaseTable(
       val newData = messages.map(_.asInstanceOf[BufferedRows])
       dataMap --= newData.flatMap(_.rows.map(getKey))
       withData(newData)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -600,6 +611,8 @@ abstract class InMemoryBaseTable(
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       dataMap.clear()
       withData(messages.map(_.asInstanceOf[BufferedRows]))
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
     }
   }
 
@@ -841,6 +854,8 @@ class InMemoryCustomDriverTaskMetric(value: Long) extends CustomTaskMetric {
   override def value(): Long = value
 }
 
+case class Commit(id: Long, properties: Map[String, String])
+
 sealed trait Operation
 case object Write extends Operation
 case object Delete extends Operation
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index aeb807768b076..2cb8c9f192712 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connector.catalog
 
+import java.time.Instant
 import java.util
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -24,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{FieldReference, LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
+import org.apache.spark.sql.connector.metric.MergeMetrics
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
 import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite, DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo, SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command
@@ -111,7 +113,32 @@ class InMemoryRowLevelOperationTable(
     override def description(): String = "InMemoryPartitionReplaceOperation"
   }
 
-  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan) extends TestBatchWrite {
+  abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
+
+    override def commitMerge(messages: Array[WriterCommitMessage], metrics: MergeMetrics): Unit = {
+      commitProperties += "numTargetRowsCopied" -> metrics.numTargetRowsCopied().orElse(-1).toString
+      commitProperties += "numTargetRowsInserted" ->
+        metrics.numTargetRowsInserted().orElse(-1).toString
+      commitProperties += "numTargetRowsDeleted" ->
+        metrics.numTargetRowsDeleted().orElse(-1).toString
+      commitProperties += "numTargetRowsUpdated" ->
+        metrics.numTargetRowsUpdated().orElse(-1).toString
+      commitProperties += "numTargetRowsMatchedDeleted" ->
+        metrics.numTargetRowsMatchedDeleted().orElse(-1).toString
+      commitProperties += "numTargetRowsMatchedUpdated" ->
+        metrics.numTargetRowsMatchedUpdated().orElse(-1).toString
+      commitProperties += "numTargetRowsNotMatchedBySourceUpdated" ->
+        metrics.numTargetRowsNotMatchedBySourceUpdated().orElse(-1).toString
+      commitProperties += "numTargetRowsNotMatchedBySourceDeleted" ->
+        metrics.numTargetRowsNotMatchedBySourceDeleted().orElse(-1).toString
+      commit(messages)
+      commits += Commit(Instant.now().toEpochMilli, commitProperties.toMap)
+      commitProperties.clear()
+    }
+  }
+
+  private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
+    extends RowLevelOperationBatchWrite {
 
     override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
       val newData = messages.map(_.asInstanceOf[BufferedRows])
@@ -165,7 +192,7 @@
     }
   }
 
-  private object TestDeltaBatchWrite extends DeltaBatchWrite {
+  private object TestDeltaBatchWrite extends RowLevelOperationBatchWrite with DeltaBatchWrite {
     override def createBatchWriterFactory(info: PhysicalWriteInfo): DeltaWriterFactory = {
       DeltaBufferedRowsWriterFactory
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 4436c6b24f7c8..24ca65c7586fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.OptionalLong
+
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
@@ -30,10 +32,11 @@ import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUt
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, REINSERT_OPERATION, UPDATE_OPERATION, WRITE_OPERATION, WRITE_WITH_METADATA_OPERATION}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog, TableInfo, TableWritePrivilege}
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.connector.metric.CustomMetric
+import org.apache.spark.sql.connector.metric.{CustomMetric, MergeMetrics}
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
@@ -398,7 +401,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
 /**
  * The base physical plan for writing data into data source v2.
  */
-trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
+trait V2TableWriteExec extends V2CommandExec with UnaryExecNode with AdaptiveSparkPlanHelper {
   def query: SparkPlan
   def writingTask: WritingSparkTask[_] = DataWritingSparkTask
 
@@ -451,8 +454,12 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
         }
       )
 
+      val mergeMetricsOpt = getMergeMetrics(query)
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
-      batchWrite.commit(messages)
+      mergeMetricsOpt match {
+        case Some(metrics) => batchWrite.commitMerge(messages, metrics)
+        case None => batchWrite.commit(messages)
+      }
       logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} committed.")
       commitProgress = Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
     } catch {
@@ -474,6 +481,30 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
 
     Nil
   }
+
+  private def getMergeMetrics(query: SparkPlan): Option[MergeMetrics] = {
+    collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
+      MergeMetricsImpl(
+        numTargetRowsCopied = metric(n.metrics, "numTargetRowsCopied"),
+        numTargetRowsDeleted = metric(n.metrics, "numTargetRowsDeleted"),
+        numTargetRowsUpdated = metric(n.metrics, "numTargetRowsUpdated"),
+        numTargetRowsInserted = metric(n.metrics, "numTargetRowsInserted"),
+        numTargetRowsMatchedDeleted = metric(n.metrics, "numTargetRowsMatchedDeleted"),
+        numTargetRowsMatchedUpdated = metric(n.metrics, "numTargetRowsMatchedUpdated"),
+        numTargetRowsNotMatchedBySourceDeleted =
+          metric(n.metrics, "numTargetRowsNotMatchedBySourceDeleted"),
+        numTargetRowsNotMatchedBySourceUpdated =
+          metric(n.metrics, "numTargetRowsNotMatchedBySourceUpdated")
+      )
+    }
+  }
+
+  private def metric(metrics: Map[String, SQLMetric], metric: String): OptionalLong = {
+    metrics.get(metric) match {
+      case Some(m) => OptionalLong.of(m.value)
+      case None => OptionalLong.empty()
+    }
+  }
 }
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
@@ -729,3 +760,12 @@ private[v2] case class DataWritingSparkTaskResult(
  */
 private[sql] case class StreamWriterCommitProgress(numOutputRows: Long)
 
+private case class MergeMetricsImpl(
+    override val numTargetRowsCopied: OptionalLong,
+    override val numTargetRowsDeleted: OptionalLong,
+    override val numTargetRowsUpdated: OptionalLong,
+    override val numTargetRowsInserted: OptionalLong,
+    override val numTargetRowsMatchedUpdated: OptionalLong,
+    override val numTargetRowsMatchedDeleted: OptionalLong,
+    override val numTargetRowsNotMatchedBySourceUpdated: OptionalLong,
+    override val numTargetRowsNotMatchedBySourceDeleted: OptionalLong) extends MergeMetrics
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 21b171bee9614..b6e7586e0e30e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not}
 import org.apache.spark.sql.catalyst.optimizer.BuildLeft
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, TableInfo}
+import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, InMemoryTable, TableInfo}
 import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1811,6 +1811,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(1, 1000, "hr"), // updated
         Row(2, 200, "software"),
         Row(3, 300, "hr")))
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "2"))
+      assert(commitProps("numTargetRowsInserted") === "0")
+      assert(commitProps("numTargetRowsUpdated") === "1")
+      assert(commitProps("numTargetRowsDeleted") === "0")
+      assert(commitProps("numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1856,6 +1867,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(2, 200, "software"),
         Row(3, 300, "hr"),
         Row(5, 400, "executive"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("numTargetRowsCopied") === "0")
+      assert(commitProps("numTargetRowsInserted") === "1")
+      assert(commitProps("numTargetRowsUpdated") === "0")
+      assert(commitProps("numTargetRowsDeleted") === "0")
+      assert(commitProps("numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1883,7 +1905,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         |""".stripMargin
     }
 
-    assertMetric(mergeExec, "numTargetRowsCopied", if (deltaMerge) 0 else 3)
     assertMetric(mergeExec, "numTargetRowsInserted", 0)
     assertMetric(mergeExec, "numTargetRowsUpdated", 2)
@@ -1901,6 +1922,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(3, 300, "hr"),
         Row(4, 400, "marketing"),
         Row(5, -1, "executive"))) // updated
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("numTargetRowsInserted") === "0")
+      assert(commitProps("numTargetRowsUpdated") === "2")
+      assert(commitProps("numTargetRowsDeleted") === "0")
+      assert(commitProps("numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -1947,6 +1979,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(4, 400, "marketing"))
         // Row(5, 500, "executive") deleted
       )
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("numTargetRowsInserted") === "0")
+      assert(commitProps("numTargetRowsUpdated") === "0")
+      assert(commitProps("numTargetRowsDeleted") === "2")
+      assert(commitProps("numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("numTargetRowsMatchedDeleted") === "1")
+      assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "1")
     }
   }
 
@@ -1994,6 +2037,17 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(4, 400, "marketing"),
         Row(5, -1, "executive"), // updated
         Row(6, -1, "dummy"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("numTargetRowsInserted") === "1")
+      assert(commitProps("numTargetRowsUpdated") === "2")
+      assert(commitProps("numTargetRowsDeleted") === "0")
+      assert(commitProps("numTargetRowsMatchedUpdated") === "1")
+      assert(commitProps("numTargetRowsMatchedDeleted") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "1")
+      assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "0")
     }
   }
 
@@ -2032,7 +2086,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
     assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceUpdated", 0)
     assertMetric(mergeExec, "numTargetRowsNotMatchedBySourceDeleted", 1)
-
     checkAnswer(
       sql(s"SELECT * FROM $tableNameAsString"),
       Seq(
         Row(1, 100, "hr"),
@@ -2042,6 +2095,62 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
         Row(4, 400, "marketing"),
         // Row(5, 500, "executive") deleted
         Row(6, -1, "dummy"))) // inserted
+
+      val table = catalog.loadTable(ident)
+      val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+      assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+      assert(commitProps("numTargetRowsInserted") === "1")
+      assert(commitProps("numTargetRowsUpdated") === "0")
+      assert(commitProps("numTargetRowsDeleted") === "2")
+      assert(commitProps("numTargetRowsMatchedUpdated") === "0")
+      assert(commitProps("numTargetRowsMatchedDeleted") === "1")
+      assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0")
+      assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "1")
+    }
+  }
+
+  test("SPARK-52689: V2 write metrics for merge") {
+    Seq("true", "false").foreach { aqeEnabled: String =>
+      withTempView("source") {
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
+          createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+            """{ "pk": 1, "salary": 100, "dep": "hr" }
+              |{ "pk": 2, "salary": 200, "dep": "software" }
+              |{ "pk": 3, "salary": 300, "dep": "hr" }
+              |{ "pk": 4, "salary": 400, "dep": "marketing" }
+              |{ "pk": 5, "salary": 500, "dep": "executive" }
+              |""".stripMargin)
+
+          val sourceDF = Seq(1, 2, 6, 10).toDF("pk")
+          sourceDF.createOrReplaceTempView("source")
+
+          sql(
+            s"""MERGE INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED AND salary < 200 THEN
+               | DELETE
+               |WHEN NOT MATCHED AND s.pk < 10 THEN
+               | INSERT (pk, salary, dep) VALUES (s.pk, -1, "dummy")
+               |WHEN NOT MATCHED BY SOURCE AND salary > 400 THEN
+               | DELETE
+               |""".stripMargin
+          )
+
+          val table = catalog.loadTable(ident)
+          val commitProps = table.asInstanceOf[InMemoryTable].commits.last.properties
+          assert(commitProps("numTargetRowsCopied") === (if (deltaMerge) "0" else "3"))
+          assert(commitProps("numTargetRowsInserted") === "1")
+          assert(commitProps("numTargetRowsUpdated") === "0")
+          assert(commitProps("numTargetRowsDeleted") === "2")
+          assert(commitProps("numTargetRowsMatchedUpdated") === "0")
+          assert(commitProps("numTargetRowsMatchedDeleted") === "1")
+          assert(commitProps("numTargetRowsNotMatchedBySourceUpdated") === "0")
+          assert(commitProps("numTargetRowsNotMatchedBySourceDeleted") === "1")
+
+          sql(s"DROP TABLE $tableNameAsString")
+        }
+      }
     }
   }
 
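For illustration only (not part of this patch): a minimal sketch of how a third-party connector might consume the new `commitMerge` hook added to `BatchWrite`. The class and field names (`ExampleBatchWrite`, `commitSummary`, the `merge.*` property keys) are hypothetical; a real connector would persist the recorded values alongside its own commit metadata.

```scala
import java.util.OptionalLong

import scala.collection.mutable

import org.apache.spark.sql.connector.metric.MergeMetrics
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage}

// Hypothetical connector-side BatchWrite that records the MERGE metrics Spark reports.
class ExampleBatchWrite extends BatchWrite {

  // Assumed connector-specific holder for commit summary properties.
  private val commitSummary = mutable.Map.empty[String, String]

  override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory = {
    // A real connector would return its writer factory here.
    throw new UnsupportedOperationException("sketch only")
  }

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // Persist the written data together with commitSummary as part of the table commit.
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    // Clean up any data left behind by the writers.
  }

  // Spark calls this overload instead of commit() when driver-side merge metrics are
  // available; the default implementation in BatchWrite simply falls back to commit().
  override def commitMerge(messages: Array[WriterCommitMessage], metrics: MergeMetrics): Unit = {
    def record(name: String, value: OptionalLong): Unit = {
      // Each metric is optional; only record the ones the driver actually reported.
      if (value.isPresent) commitSummary += name -> value.getAsLong.toString
    }
    record("merge.numTargetRowsInserted", metrics.numTargetRowsInserted())
    record("merge.numTargetRowsUpdated", metrics.numTargetRowsUpdated())
    record("merge.numTargetRowsDeleted", metrics.numTargetRowsDeleted())
    record("merge.numTargetRowsCopied", metrics.numTargetRowsCopied())
    commit(messages)
  }
}
```

Because `commitMerge` has a default implementation that delegates to `commit`, existing connectors keep working unchanged and can opt in to the metrics incrementally, mirroring what the in-memory test tables above do with their `commitProperties` map.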