Skip to content

Commit ed02db5

Browse files
szehon-ho authored and yhuang-db committed
[SPARK-52272][SQL] V2SessionCatalog does not alter schema on Hive Catalog
### What changes were proposed in this pull request? V2SessionCatalog delegates alterTable to SessionCatalog (V1) alterTable. In the case of "hive", this should be changed to delegate to either alterTable or alterTableDataSchema. ### Why are the changes needed? SessionCatalog has two API's, alterTable and alterTableDataSchema. alterTable will silently ignore schema changes in Hive implementation (the API Javadoc: ` If the underlying implementation does not support altering a certain field, this becomes a no-op.` ). So for Hive case, V2SessionCatalog needs to delegate schema changes to V1 alterTableDataSchema ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add new test in HiveDDLSuite, verify functionality and which method is called. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#51007 from szehon-ho/hive_alter_table_comment_new. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent d18ee39 commit ed02db5

File tree

3 files changed

+50
-56
lines changed

3 files changed

+50
-56
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,10 +302,15 @@ class V2SessionCatalog(catalog: SessionCatalog)
302302

303303
val finalProperties = CatalogV2Util.applyClusterByChanges(properties, schema, changes)
304304
try {
305-
catalog.alterTable(
306-
catalogTable.copy(
307-
properties = finalProperties, schema = schema, owner = owner, comment = comment,
308-
collation = collation, storage = storage))
305+
if (changes.exists(!_.isInstanceOf[TableChange.ColumnChange])) {
306+
catalog.alterTable(
307+
catalogTable.copy(
308+
properties = finalProperties, schema = schema, owner = owner, comment = comment,
309+
collation = collation, storage = storage))
310+
}
311+
if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
312+
catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
313+
}
309314
} catch {
310315
case _: NoSuchTableException =>
311316
throw QueryCompilationErrors.noSuchTableError(ident)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -590,22 +590,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
590590
parameters = Map("fieldName" -> "`missing_col`", "fields" -> "`id`, `data`"))
591591
}
592592

593-
test("alterTable: rename top-level column") {
594-
val catalog = newCatalog()
595-
596-
catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
597-
val table = catalog.loadTable(testIdent)
598-
599-
assert(table.columns === columns)
600-
601-
catalog.alterTable(testIdent, TableChange.renameColumn(Array("id"), "some_id"))
602-
val updated = catalog.loadTable(testIdent)
603-
604-
val expectedSchema = new StructType().add("some_id", IntegerType).add("data", StringType)
605-
606-
assert(updated.schema == expectedSchema)
607-
}
608-
609593
test("alterTable: rename nested column") {
610594
val catalog = newCatalog()
611595

@@ -627,26 +611,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
627611
assert(updated.columns === expectedColumns)
628612
}
629613

630-
test("alterTable: rename struct column") {
631-
val catalog = newCatalog()
632-
633-
val pointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
634-
val tableColumns = columns :+ Column.create("point", pointStruct)
635-
636-
catalog.createTable(testIdent, tableColumns, emptyTrans, emptyProps)
637-
val table = catalog.loadTable(testIdent)
638-
639-
assert(table.columns === tableColumns)
640-
641-
catalog.alterTable(testIdent, TableChange.renameColumn(Array("point"), "p"))
642-
val updated = catalog.loadTable(testIdent)
643-
644-
val newPointStruct = new StructType().add("x", DoubleType).add("y", DoubleType)
645-
val expectedColumns = columns :+ Column.create("p", newPointStruct)
646-
647-
assert(updated.columns === expectedColumns)
648-
}
649-
650614
test("alterTable: rename missing column fails") {
651615
val catalog = newCatalog()
652616

@@ -686,21 +650,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
686650
assert(updated.columns === expectedColumns)
687651
}
688652

689-
test("alterTable: delete top-level column") {
690-
val catalog = newCatalog()
691-
692-
catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
693-
val table = catalog.loadTable(testIdent)
694-
695-
assert(table.columns === columns)
696-
697-
catalog.alterTable(testIdent, TableChange.deleteColumn(Array("id"), false))
698-
val updated = catalog.loadTable(testIdent)
699-
700-
val expectedSchema = new StructType().add("data", StringType)
701-
assert(updated.schema == expectedSchema)
702-
}
703-
704653
test("alterTable: delete nested column") {
705654
val catalog = newCatalog()
706655

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@ import java.net.URI
2222
import java.time.LocalDateTime
2323
import java.util.Locale
2424

25+
import scala.jdk.CollectionConverters._
26+
2527
import org.apache.hadoop.fs.Path
2628
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
29+
import org.mockito.ArgumentMatchers.any
30+
import org.mockito.Mockito.{spy, times, verify}
2731
import org.scalatest.BeforeAndAfterEach
2832

2933
import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
@@ -32,12 +36,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
3236
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
3337
import org.apache.spark.sql.catalyst.catalog._
3438
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
35-
import org.apache.spark.sql.connector.catalog.CatalogManager
39+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, TableChange, TableInfo}
3640
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
3741
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
3842
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
3943
import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
4044
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
45+
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
4146
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
4247
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
4348
import org.apache.spark.sql.hive.orc.OrcFileOperator
@@ -3392,4 +3397,39 @@ class HiveDDLSuite
33923397
)
33933398
}
33943399
}
3400+
3401+
test("SPARK-52272: V2SessionCatalog does not alter schema on Hive Catalog") {
3402+
val spyCatalog = spy(spark.sessionState.catalog.externalCatalog)
3403+
val v1SessionCatalog = new SessionCatalog(spyCatalog)
3404+
val v2SessionCatalog = new V2SessionCatalog(v1SessionCatalog)
3405+
withTable("t1") {
3406+
val identifier = Identifier.of(Array("default"), "t1")
3407+
val outputSchema = new StructType().add("a", IntegerType, true, "comment1")
3408+
v2SessionCatalog.createTable(
3409+
identifier,
3410+
new TableInfo.Builder()
3411+
.withProperties(Map.empty.asJava)
3412+
.withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
3413+
.withPartitions(Array.empty)
3414+
.build()
3415+
)
3416+
v2SessionCatalog.alterTable(identifier, TableChange.setProperty("foo", "bar"))
3417+
val loaded = v2SessionCatalog.loadTable(identifier)
3418+
assert(loaded.properties().get("foo") == "bar")
3419+
3420+
verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
3421+
verify(spyCatalog, times(0)).alterTableDataSchema(
3422+
any[String], any[String], any[StructType])
3423+
3424+
v2SessionCatalog.alterTable(identifier,
3425+
TableChange.updateColumnComment(Array("a"), "comment2"))
3426+
val loaded2 = v2SessionCatalog.loadTable(identifier)
3427+
assert(loaded2.columns().length == 1)
3428+
assert(loaded2.columns.head.comment() == "comment2")
3429+
3430+
verify(spyCatalog, times(1)).alterTable(any[CatalogTable])
3431+
verify(spyCatalog, times(1)).alterTableDataSchema(
3432+
any[String], any[String], any[StructType])
3433+
}
3434+
}
33953435
}

0 commit comments

Comments (0)