
Commit b2cea5f

sryza authored and asl3 committed
[SPARK-52576][SDP] Drop/recreate on full refresh and MV update
### What changes were proposed in this pull request?

Some pipeline runs result in wiping out and replacing all the data for a table:

- Every run of a materialized view
- Runs of streaming tables that have the "full refresh" flag

In the current implementation, this "wipe out and replace" is implemented by:

- Truncating the table
- Altering the table to drop/update/add columns that don't match the columns in the DataFrame for the current run

The reason we originally wanted to truncate + alter instead of drop/recreate is that dropping has some undesirable effects, e.g. it interrupts readers of the table and wipes away things like ACLs. However, we discovered that not all catalogs support dropping columns (e.g. Hive does not), and there's no way to tell whether a given catalog supports dropping columns. So this PR changes the implementation to drop/recreate the table instead of truncate/alter; a minimal sketch of the pattern follows below.

### Why are the changes needed?

See the section above.

### Does this PR introduce _any_ user-facing change?

Yes, see the section above. No releases contained the old behavior.

### How was this patch tested?

- Tests in MaterializeTablesSuite
- Ran the tests in MaterializeTablesSuite with Hive instead of the default catalog

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51280 from sryza/drop-on-full-refresh.

Authored-by: Sandy Ryza <sandy.ryza@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
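A minimal sketch of the drop/recreate pattern, using only the stock DataSource V2 `TableCatalog` API. This is not the PR's actual code; `replaceTable` and its parameters are hypothetical names:

```scala
import java.util.Collections

import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical helper illustrating "wipe out and replace" via drop/recreate.
// Dropping removes both the data and the old columns in one step, so no
// ALTER TABLE ... DROP COLUMN support (which e.g. Hive lacks) is required.
// The trade-off: readers are interrupted and catalog metadata such as ACLs
// is lost along with the table.
def replaceTable(
    catalog: TableCatalog,
    identifier: Identifier,
    newSchema: StructType): Unit = {
  if (catalog.tableExists(identifier)) {
    catalog.dropTable(identifier)
  }
  // Recreate the table with the schema of the current run's DataFrame.
  catalog.createTable(
    identifier,
    newSchema,
    Array.empty[Transform],
    Collections.emptyMap[String, String]())
}
```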
1 parent 802bcc2 commit b2cea5f

File tree

1 file changed: +5 -4 lines


sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala

Lines changed: 5 additions & 4 deletions

@@ -178,12 +178,13 @@ object DatasetManager extends Logging {
     }

     // Wipe the data if we need to
-    if ((isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined) {
-      context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
+    val dropTable = (isFullRefresh || !table.isStreamingTable) && existingTableOpt.isDefined
+    if (dropTable) {
+      catalog.dropTable(identifier)
     }

     // Alter the table if we need to
-    if (existingTableOpt.isDefined) {
+    if (existingTableOpt.isDefined && !dropTable) {
       val existingSchema = existingTableOpt.get.schema()

       val targetSchema = if (table.isStreamingTable && !isFullRefresh) {
@@ -198,7 +199,7 @@ object DatasetManager extends Logging {
     }

     // Create the table if we need to
-    if (existingTableOpt.isEmpty) {
+    if (dropTable || existingTableOpt.isEmpty) {
       catalog.createTable(
         identifier,
         new TableInfo.Builder()
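Read together, the three hunks reduce to a single decision per run. A self-contained sketch of that decision logic (hypothetical names, not part of the patch):

```scala
// Which of the three branches in DatasetManager applies for a given run.
sealed trait TableAction
case object DropAndRecreate extends TableAction // wipe-and-replace runs
case object AlterInPlace extends TableAction    // incremental streaming-table runs
case object CreateFresh extends TableAction     // table does not exist yet

def decide(
    isFullRefresh: Boolean,
    isStreamingTable: Boolean,
    tableExists: Boolean): TableAction = {
  // Mirrors the patched condition: materialized views always wipe, and
  // streaming tables wipe only on full refresh.
  val dropTable = (isFullRefresh || !isStreamingTable) && tableExists
  if (dropTable) DropAndRecreate
  else if (tableExists) AlterInPlace
  else CreateFresh
}
```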
