
Commit 5a4932b

szehon-ho authored and cloud-fan committed
[SPARK-52683][SQL] Support ExternalCatalog alterTableSchema
### What changes were proposed in this pull request?

Add a new ExternalCatalog and SessionCatalog API, alterTableSchema, that will supersede alterTableDataSchema.

### Why are the changes needed?

Because ExternalCatalog::alterTableDataSchema takes only the data schema (without partition columns), we lose the context of the partition columns. This makes it impossible to support column orders where the partition columns are not at the end. See #51342 for context.

More generally, this is a more intuitive API than alterTableDataSchema, because the caller no longer needs to strip out partition columns. It is also not immediately obvious that "data schema" means the schema without partition columns.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests; the tests for alterTableDataSchema were moved to the new API.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51373 from szehon-ho/alter_table_schema.

Authored-by: Szehon Ho <szehon.apache@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent d89a4af commit 5a4932b
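
As orientation before the diffs: a minimal caller-side sketch of the migration, assuming a `SessionCatalog` in scope and a hypothetical table `t` partitioned by `p` (column names are illustrative, not from this patch):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Before: alterTableDataSchema takes only the data schema; the caller strips
// partition columns, which implicitly stay at the end of the table schema.
val dataSchema = new StructType()
  .add("id", LongType)
  .add("value", StringType)
sessionCatalog.alterTableDataSchema(TableIdentifier("t"), dataSchema)

// After: alterTableSchema takes the full schema, partition columns included,
// so the partition-column context is no longer lost.
val fullSchema = new StructType()
  .add("id", LongType)
  .add("value", StringType)
  .add("p", StringType) // partition column, no longer stripped by the caller
sessionCatalog.alterTableSchema(TableIdentifier("t"), fullSchema)
```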

File tree

14 files changed: 159 additions, 16 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala

Lines changed: 12 additions & 0 deletions
```diff
@@ -120,9 +120,21 @@ trait ExternalCatalog {
    * @param db Database that table to alter schema for exists in
    * @param table Name of table to alter schema for
    * @param newDataSchema Updated data schema to be used for the table.
+   * @deprecated since 4.1.0 use `alterTableSchema` instead.
    */
   def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit
 
+  /**
+   * Alter the schema of a table identified by the provided database and table name.
+   *
+   * All partition columns must be preserved.
+   *
+   * @param db Database that table to alter schema for exists in
+   * @param table Name of table to alter schema for
+   * @param newSchema Updated schema to be used for the table.
+   */
+  def alterTableSchema(db: String, table: String, newSchema: StructType): Unit
+
   /** Alter the statistics of a table. If `stats` is None, then remove all existing statistics. */
   def alterTableStats(db: String, table: String, stats: Option[CatalogStatistics]): Unit
```
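
For implementors of custom ExternalCatalogs, the new contract ("all partition columns must be preserved") means a data schema can still be recovered from the full schema if an implementation stores the two separately. A hedged sketch (the helper name is hypothetical):

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical helper: split the full schema passed to alterTableSchema back
// into data and partition parts, relying on the documented contract that all
// partition columns are preserved in newSchema.
def splitSchema(
    fullSchema: StructType,
    partitionColumnNames: Seq[String]): (StructType, StructType) = {
  val (partFields, dataFields) =
    fullSchema.fields.partition(f => partitionColumnNames.contains(f.name))
  (StructType(dataFields), StructType(partFields))
}
```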

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogWithListener.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -125,6 +125,12 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
     postToAll(AlterTableEvent(db, table, AlterTableKind.DATASCHEMA))
   }
 
+  override def alterTableSchema(db: String, table: String, newSchema: StructType): Unit = {
+    postToAll(AlterTablePreEvent(db, table, AlterTableKind.SCHEMA))
+    delegate.alterTableSchema(db, table, newSchema)
+    postToAll(AlterTableEvent(db, table, AlterTableKind.SCHEMA))
+  }
+
   override def alterTableStats(
       db: String,
       table: String,
```
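
Listeners can tell the new event kind apart from the legacy one; a sketch of a handler using the event definitions from events.scala below (wiring to the listener bus omitted):

```scala
import org.apache.spark.sql.catalyst.catalog.{AlterTableEvent, AlterTableKind, ExternalCatalogEvent}

// Sketch: react to the new SCHEMA kind while still recognizing the legacy
// DATASCHEMA kind during the deprecation window.
def onEvent(event: ExternalCatalogEvent): Unit = event match {
  case AlterTableEvent(db, table, AlterTableKind.SCHEMA) =>
    println(s"full-schema alter on $db.$table")
  case AlterTableEvent(db, table, AlterTableKind.DATASCHEMA) =>
    println(s"legacy data-schema alter on $db.$table")
  case _ => // ignore other catalog events
}
```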

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala

Lines changed: 15 additions & 0 deletions
```diff
@@ -331,6 +331,21 @@ class InMemoryCatalog(
     catalog(db).tables(table).table = origTable.copy(schema = newSchema)
   }
 
+  override def alterTableSchema(
+      db: String,
+      table: String,
+      newSchema: StructType): Unit = synchronized {
+    requireTableExists(db, table)
+    val origTable = catalog(db).tables(table).table
+
+    val partCols = origTable.partitionColumnNames
+    assert(newSchema.map(_.name).takeRight(partCols.length) == partCols,
+      s"Partition columns ${partCols.mkString("[", ", ", "]")} are only supported at the end of " +
+        s"the new schema ${newSchema.catalogString} for now.")
+
+    catalog(db).tables(table).table = origTable.copy(schema = newSchema)
+  }
+
   override def alterTableStats(
       db: String,
       table: String,
```
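
A quick worked example of the ordering check above (illustrative column names; `c` is the partition column):

```scala
val partCols = Seq("c")

// Reordering data columns while keeping the partition column last: accepted.
assert(Seq("b", "a", "c").takeRight(partCols.length) == partCols)

// Moving the partition column away from the end: rejected by the assert above.
assert(Seq("c", "a", "b").takeRight(partCols.length) != partCols)
```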

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -484,6 +484,7 @@ class SessionCatalog(
    *
    * @param identifier TableIdentifier
    * @param newDataSchema Updated data schema to be used for the table
+   * @deprecated since 4.1.0 use `alterTableSchema` instead.
    */
   def alterTableDataSchema(
       identifier: TableIdentifier,
@@ -507,6 +508,25 @@
     externalCatalog.alterTableDataSchema(db, table, newDataSchema)
   }
 
+  /**
+   * Alter the schema of a table identified by the provided table identifier. All partition columns
+   * must be preserved.
+   *
+   * @param identifier TableIdentifier
+   * @param newSchema Updated schema to be used for the table
+   */
+  def alterTableSchema(
+      identifier: TableIdentifier,
+      newSchema: StructType): Unit = {
+    val qualifiedIdent = qualifyIdentifier(identifier)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
+    requireDbExists(db)
+    requireTableExists(qualifiedIdent)
+
+    externalCatalog.alterTableSchema(db, table, newSchema)
+  }
+
   private def columnNameResolved(
       resolver: Resolver,
       schema: StructType,
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -126,6 +126,7 @@ case class RenameTableEvent(
 object AlterTableKind extends Enumeration {
   val TABLE = "table"
   val DATASCHEMA = "dataSchema"
+  val SCHEMA = "schema"
   val STATS = "stats"
 }
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala

Lines changed: 3 additions & 3 deletions
```diff
@@ -128,9 +128,9 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
 
     // ALTER schema
     val newSchema = new StructType().add("id", "long", nullable = false)
-    catalog.alterTableDataSchema("db5", "tbl1", newSchema)
-    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) ::
-      AlterTableEvent("db5", "tbl1", AlterTableKind.DATASCHEMA) :: Nil)
+    catalog.alterTableSchema("db5", "tbl1", newSchema)
+    checkEvents(AlterTablePreEvent("db5", "tbl1", AlterTableKind.SCHEMA) ::
+      AlterTableEvent("db5", "tbl1", AlterTableKind.SCHEMA) :: Nil)
 
     // ALTER stats
     catalog.alterTableStats("db5", "tbl1", None)
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala

Lines changed: 33 additions & 4 deletions
```diff
@@ -245,12 +245,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
 
   test("alter table schema") {
     val catalog = newBasicCatalog()
-    val newDataSchema = StructType(Seq(
+    val newSchema = StructType(Seq(
       StructField("col1", IntegerType),
-      StructField("new_field_2", StringType)))
-    catalog.alterTableDataSchema("db2", "tbl1", newDataSchema)
+      StructField("new_field_2", StringType),
+      StructField("a", IntegerType),
+      StructField("b", StringType)))
+    catalog.alterTableSchema("db2", "tbl1", newSchema)
     val newTbl1 = catalog.getTable("db2", "tbl1")
-    assert(newTbl1.dataSchema == newDataSchema)
+    assert(newTbl1.dataSchema == StructType(newSchema.take(2)))
+    assert(newTbl1.schema == newSchema)
   }
 
   test("alter table stats") {
@@ -983,6 +986,32 @@
       "db2", "tbl1", Seq(part1.spec), ignoreIfNotExists = false, purge = false, retainData = false)
     assert(fs.exists(partPath))
   }
+
+  test("SPARK-52683: support alterTableSchema partitioned columns") {
+    val catalog = newBasicCatalog()
+
+    val schema = new StructType()
+      .add("a", IntegerType)
+      .add("b", IntegerType)
+      .add("c", StringType)
+    val table = CatalogTable(
+      identifier = TableIdentifier("t", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = storageFormat,
+      schema = schema,
+      partitionColumnNames = Seq("c"),
+      provider = Some("hive"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val newSchema = new StructType()
+      .add("b", LongType)
+      .add("a", IntegerType)
+      .add("c", StringType)
+    catalog.alterTableSchema("db1", "t", newSchema)
+
+    assert(catalog.getTable("db1", "t").schema == newSchema)
+    assert(catalog.getTable("db1", "t").partitionColumnNames == Seq("c"))
+  }
 }
```

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 17 additions & 1 deletion
```diff
@@ -564,7 +564,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     }
   }
 
-  test("alter table add columns") {
+  test("alter data schema add columns") {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
       val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
@@ -580,6 +580,22 @@
     }
   }
 
+  test("alter schema add columns") {
+    withBasicCatalog { sessionCatalog =>
+      sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
+      val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      val newSchema = StructType(oldTab.dataSchema.fields ++
+        Seq(StructField("c3", IntegerType)) ++ oldTab.partitionSchema)
+
+      sessionCatalog.alterTableSchema(
+        TableIdentifier("t1", Some("default")),
+        newSchema)
+
+      val newTab = sessionCatalog.externalCatalog.getTable("default", "t1")
+      assert(newTab.schema == newSchema)
+    }
+  }
+
   test("alter table drop columns") {
     withBasicCatalog { sessionCatalog =>
       sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -247,8 +247,9 @@ case class AlterTableAddColumnsCommand(
     }
     DDLUtils.checkTableColumns(catalogTable, StructType(colsWithProcessedDefaults))
 
-    val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
-    catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsWithProcessedDefaults))
+    val existingDataSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
+    catalog.alterTableSchema(table,
+      StructType(existingDataSchema ++ colsWithProcessedDefaults ++ catalogTable.partitionSchema))
     Seq.empty[Row]
   }
```
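
The command now reassembles the full schema explicitly; for a hypothetical table with one data column and one partition column, adding a column yields the shape the InMemoryCatalog check above expects:

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative shapes only, not the command's real inputs.
val existingDataSchema = StructType(Seq(StructField("id", IntegerType)))
val newCols = Seq(StructField("c2", StringType))
val partitionSchema = StructType(Seq(StructField("p", StringType)))

// New columns go after the existing data columns, and partition columns are
// re-appended at the end, keeping them last in the full table schema.
val fullSchema = StructType(existingDataSchema ++ newCols ++ partitionSchema)
assert(fullSchema.fieldNames.toSeq == Seq("id", "c2", "p"))
```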

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -309,7 +309,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
         collation = collation, storage = storage))
     }
     if (changes.exists(_.isInstanceOf[TableChange.ColumnChange])) {
-      catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
+      catalog.alterTableSchema(ident.asTableIdentifier, schema)
     }
   } catch {
     case _: NoSuchTableException =>
```
