Commit a9881a8

[SPARK-52260][SQL][TESTS] Add test for Update/Merge Into/Delete From table
### What changes were proposed in this pull request?

Add tests for UPDATE / MERGE INTO / DELETE FROM table.

### Why are the changes needed?

Improve test coverage for the table constraint project.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50982 from gengliangwang/dmlTest.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
Parent: 4325d3a

7 files changed, +446 −20 lines (diffs for 5 of the 7 files are shown below)

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/RowLevelOperationTable.scala

Lines changed: 2 additions & 0 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.connector.write
 import java.util
 
 import org.apache.spark.sql.connector.catalog.{Column, SupportsRead, SupportsRowLevelOperations, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -40,6 +41,7 @@ private[sql] case class RowLevelOperationTable(
   override def schema: StructType = table.schema
   override def columns: Array[Column] = table.columns()
   override def capabilities: util.Set[TableCapability] = table.capabilities
+  override def constraints(): Array[Constraint] = table.constraints()
   override def toString: String = table.toString
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
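For context, `RowLevelOperationTable` forwards each `Table` member to the wrapped table one by one, and the DSv2 `Table` interface supplies a default `constraints()` implementation that returns an empty array; without this override, the wrapper would keep reporting no constraints even when the underlying table defines them. A quick way to check the fix, as a sketch (assuming `table` and `operation` fixtures of the right types, as in the existing suites):

    // Sketch: the wrapper should surface the wrapped table's constraints
    // rather than the interface's empty default.
    val rowLevelTable = RowLevelOperationTable(table, operation)
    assert(rowLevelTable.constraints() sameElements table.constraints())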

sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTableCatalog.scala

Lines changed: 5 additions & 15 deletions

@@ -17,37 +17,27 @@
 
 package org.apache.spark.sql.connector.catalog
 
-import java.util
-
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
-import org.apache.spark.sql.connector.expressions.Transform
 
 class InMemoryRowLevelOperationTableCatalog extends InMemoryTableCatalog {
   import CatalogV2Implicits._
 
-  override def createTable(
-      ident: Identifier,
-      columns: Array[Column],
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
+  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
     if (tables.containsKey(ident)) {
       throw new TableAlreadyExistsException(ident.asMultipartIdentifier)
     }
 
-    InMemoryTableCatalog.maybeSimulateFailedTableCreation(properties)
+    InMemoryTableCatalog.maybeSimulateFailedTableCreation(tableInfo.properties)
 
     val tableName = s"$name.${ident.quoted}"
-    val schema = CatalogV2Util.v2ColumnsToStructType(columns)
-    val table = new InMemoryRowLevelOperationTable(tableName, schema, partitions, properties)
+    val schema = CatalogV2Util.v2ColumnsToStructType(tableInfo.columns)
+    val table = new InMemoryRowLevelOperationTable(
+      tableName, schema, tableInfo.partitions, tableInfo.properties, tableInfo.constraints())
     tables.put(ident, table)
     namespaces.putIfAbsent(ident.namespace.toList, Map())
     table
   }
 
-  override def createTable(ident: Identifier, tableInfo: TableInfo): Table = {
-    createTable(ident, tableInfo.columns(), tableInfo.partitions(), tableInfo.properties)
-  }
-
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident).asInstanceOf[InMemoryRowLevelOperationTable]
     val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
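With the two overloads collapsed into the `TableInfo`-based one, constraint metadata can now flow through `createTable` into the in-memory table. A caller-side sketch, assuming `TableInfo.Builder` also exposes a `withConstraints` method (it is not shown in this diff) and that `columns` and `checkConstraint` are pre-built fixtures:

    import org.apache.spark.sql.connector.catalog.{Identifier, TableInfo}
    import org.apache.spark.sql.connector.expressions.Transform
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    val catalog = new InMemoryRowLevelOperationTableCatalog
    catalog.initialize("test_cat", CaseInsensitiveStringMap.empty())
    val info = new TableInfo.Builder()
      .withColumns(columns)                      // assumed fixture: Array[Column]
      .withPartitions(Array.empty[Transform])
      .withProperties(java.util.Collections.emptyMap[String, String]())
      .withConstraints(Array(checkConstraint))   // withConstraints is an assumption
      .build()
    val table = catalog.createTable(Identifier.of(Array("ns"), "t"), info)
    assert(table.constraints().nonEmpty)         // constraints survive creation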

sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala

Lines changed: 25 additions & 0 deletions

@@ -63,6 +63,31 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
       Row(6, "hr", "new-text")))
   }
 
+  test("delete from table with table constraints") {
+    sql(
+      s"""
+         |CREATE TABLE $tableNameAsString (
+         | pk INT NOT NULL PRIMARY KEY,
+         | id INT UNIQUE,
+         | dep STRING,
+         | CONSTRAINT pk_check CHECK (pk > 0))
+         | PARTITIONED BY (dep)
+         |""".stripMargin)
+    append("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 2, "dep": "hr" }
+        |{ "pk": 2, "id": 4, "dep": "eng" }
+        |{ "pk": 3, "id": 6, "dep": "eng" }
+        |""".stripMargin)
+    sql(s"DELETE FROM $tableNameAsString WHERE pk < 2")
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(Row(2, 4, "eng"), Row(3, 6, "eng")))
+    sql(s"DELETE FROM $tableNameAsString WHERE pk >= 3")
+    checkAnswer(
+      sql(s"SELECT * FROM $tableNameAsString"),
+      Seq(Row(2, 4, "eng")))
+  }
+
   test("delete from table containing struct column with default value") {
     sql(
       s"""CREATE TABLE $tableNameAsString (

sql/core/src/test/scala/org/apache/spark/sql/connector/RowLevelOperationSuiteBase.scala

Lines changed: 7 additions & 2 deletions

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, GenericRowWithSchema}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Delete, Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog, Insert, MetadataColumn, Operation, Reinsert, Update, Write}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Delete, Identifier, InMemoryRowLevelOperationTable, InMemoryRowLevelOperationTableCatalog, Insert, MetadataColumn, Operation, Reinsert, TableInfo, Update, Write}
 import org.apache.spark.sql.connector.expressions.LogicalExpressions.{identity, reference}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution, SparkPlan}
@@ -102,7 +102,12 @@ abstract class RowLevelOperationSuiteBase
 
   protected def createTable(columns: Array[Column]): Unit = {
     val transforms = Array[Transform](identity(reference(Seq("dep"))))
-    catalog.createTable(ident, columns, transforms, extraTableProps)
+    val tableInfo = new TableInfo.Builder()
+      .withColumns(columns)
+      .withPartitions(transforms)
+      .withProperties(extraTableProps)
+      .build()
+    catalog.createTable(ident, tableInfo)
   }
 
   protected def createAndInitTable(schemaString: String, jsonData: String): Unit = {
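A note on the design: `TableInfo.Builder` is the DSv2 builder-style carrier for table metadata, so new fields such as constraints can be threaded through `createTable` without growing yet another positional overload; presumably that is also what let `InMemoryRowLevelOperationTableCatalog` drop its four-argument `createTable` variant above.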

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -62,7 +62,7 @@ trait DDLCommandTestUtils extends SQLTestUtils {
       (f: String => Unit): Unit = {
     val nsCat = s"$cat.$ns"
     withNamespace(nsCat) {
-      sql(s"CREATE NAMESPACE $nsCat")
+      sql(s"CREATE NAMESPACE IF NOT EXISTS $nsCat")
       val t = s"$nsCat.$tableName"
       withTable(t) {
         f(t)
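The `IF NOT EXISTS` presumably makes this helper (its body is shown above; in `DDLCommandTestUtils` it belongs to the `withNamespaceAndTable` utility) tolerant of a namespace that already exists in the session. A hypothetical usage sketch, assuming the trait's usual `catalog` and `defaultUsing` members:

    // Hypothetical: the namespace is pre-created, and the helper's
    // CREATE NAMESPACE no longer fails with NamespaceAlreadyExistsException.
    sql(s"CREATE NAMESPACE $catalog.ns")
    withNamespaceAndTable("ns", "tbl") { t =>
      sql(s"CREATE TABLE $t (id INT) $defaultUsing")
    }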
