From 18b06c7b14ef125014feb5e180c5e6fc330043c6 Mon Sep 17 00:00:00 2001
From: Qiyuan Dong
Date: Tue, 27 May 2025 14:45:38 +0200
Subject: [PATCH 1/4] Ignore V2WriteCommand when caching

---
 .../catalyst/plans/logical/v2Commands.scala   |  6 ++-
 .../spark/sql/execution/CacheManager.scala    | 21 +++++---
 .../spark/sql/DataFrameWriterV2Suite.scala    | 49 +++++++++++++++++++
 3 files changed, 68 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 089b1a4afab1f..43756ce5eacbb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -56,7 +56,11 @@ trait KeepAnalyzedQuery extends Command {
 /**
  * Base trait for DataSourceV2 write commands
  */
-trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren {
+trait V2WriteCommand
+  extends UnaryCommand
+  with KeepAnalyzedQuery
+  with CTEInChildren
+  with IgnoreCachedData {
   def table: NamedRelation
   def query: LogicalPlan
   def isByName: Boolean
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 029a938318ec7..e8405fcd7ab75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -94,13 +94,20 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       query: Dataset[_],
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
-    cacheQueryInternal(
-      query.sparkSession,
-      query.queryExecution.analyzed,
-      query.queryExecution.normalized,
-      tableName,
-      storageLevel
-    )
+    if (query.queryExecution.analyzed.isInstanceOf[IgnoreCachedData]) {
+      logWarning(
+        log"Asked to cache a plan that is inapplicable for caching: " +
+          log"${MDC(LOGICAL_PLAN, query.queryExecution.analyzed)}"
+      )
+    } else {
+      cacheQueryInternal(
+        query.sparkSession,
+        query.queryExecution.analyzed,
+        query.queryExecution.normalized,
+        tableName,
+        storageLevel
+      )
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 0b0770fef7da4..e7898079933b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -875,4 +875,53 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       }
     }
   }
+
+  test("SPARK-52312: caching dataframe created from INSERT shouldn't re-execute the command") {
+    spark.sql("CREATE TABLE testcat.table_name (c1 int, c2 string) USING foo")
+
+    val insertDF = spark.sql("INSERT INTO testcat.table_name VALUES (1, 'a'), (2, 'b')")
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
+
+    // Caching the DataFrame created from INSERT should not re-execute the command
+    insertDF.cache()
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
+  }
+
+  test(
+    "SPARK-52312: caching dataframe created from INSERT OVERWRITE shouldn't re-execute the command"
+  ) {
+    spark.sql("CREATE TABLE testcat.table_name (c1 int, c2 string) USING foo")
+
+    val insertDF = spark.sql("INSERT OVERWRITE testcat.table_name VALUES (1, 'a'), (2, 'b')")
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
+
+    // Insert one more row after the INSERT OVERWRITE
+    spark.sql("INSERT INTO testcat.table_name VALUES (3, 'c')")
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")))
+
+    // Caching the DataFrame created from INSERT OVERWRITE should not re-execute the command
+    insertDF.cache()
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")))
+  }
+
+  test(
+    "SPARK-52312: " +
+      "caching dataframe created from dynamic INSERT OVERWRITE shouldn't re-execute the command"
+  ) {
+    // Create a table partitioned by c1
+    spark.sql("CREATE TABLE testcat.table_name (c1 int, c2 string) USING foo PARTITIONED BY (c1)")
+
+    spark.sql("SET spark.sql.sources.partitionOverwriteMode = DYNAMIC")
+
+    val insertDF = spark.sql("INSERT OVERWRITE testcat.table_name VALUES (1, 'a'), (2, 'b')")
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
+
+    // Insert one more row after the dynamic INSERT OVERWRITE, into the existing partition c1 = 1
+    spark.sql("INSERT INTO testcat.table_name VALUES (1, 'c')")
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(1, "c")))
+
+    // Caching the DataFrame created from dynamic INSERT OVERWRITE should not re-execute the command
+    insertDF.cache()
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(1, "c")))
+  }
 }

From e9942b786127d8af3f47876a6c56ff126472997a Mon Sep 17 00:00:00 2001
From: Qiyuan Dong
Date: Wed, 28 May 2025 13:21:26 +0200
Subject: [PATCH 2/4] Remove repeated tests

---
 .../spark/sql/DataFrameWriterV2Suite.scala | 38 ------------------
 1 file changed, 38 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index e7898079933b6..c9c29ab4746b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -886,42 +886,4 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     insertDF.cache()
     checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
   }
-
-  test(
-    "SPARK-52312: caching dataframe created from INSERT OVERWRITE shouldn't re-execute the command"
-  ) {
-    spark.sql("CREATE TABLE testcat.table_name (c1 int, c2 string) USING foo")
-
-    val insertDF = spark.sql("INSERT OVERWRITE testcat.table_name VALUES (1, 'a'), (2, 'b')")
-    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
-
-    // Insert one more row after the INSERT OVERWRITE
-    spark.sql("INSERT INTO testcat.table_name VALUES (3, 'c')")
-    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")))
-
-    // Caching the DataFrame created from INSERT OVERWRITE should not re-execute the command
-    insertDF.cache()
-    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(3, "c")))
-  }
-
-  test(
-    "SPARK-52312: " +
-      "caching dataframe created from dynamic INSERT OVERWRITE shouldn't re-execute the command"
-  ) {
-    // Create a table partitioned by c1
-    spark.sql("CREATE TABLE testcat.table_name (c1 int, c2 string) USING foo PARTITIONED BY (c1)")
-
-    spark.sql("SET spark.sql.sources.partitionOverwriteMode = DYNAMIC")
-
-    val insertDF = spark.sql("INSERT OVERWRITE testcat.table_name VALUES (1, 'a'), (2, 'b')")
-    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
Seq(Row(1, "a"), Row(2, "b"))) - - // Insert one more row after the dynamic INSERT OVERWRITE, in the existing partition c1 = 1 - spark.sql("INSERT INTO testcat.table_name VALUES (1, 'c')") - checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(1, "c"))) - - // Caching the DataFrame created from INSERT should not re-execute the command - insertDF.cache() - checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b"), Row(1, "c"))) - } } From 7c6cc96347eb452c72aabf7c8bf21211e45d58f7 Mon Sep 17 00:00:00 2001 From: Qiyuan Dong Date: Wed, 28 May 2025 13:26:18 +0200 Subject: [PATCH 3/4] Move IgnoreCachedData check to cacheQueryInternal --- .../spark/sql/execution/CacheManager.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index e8405fcd7ab75..7d456d107aad3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -94,20 +94,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { query: Dataset[_], tableName: Option[String], storageLevel: StorageLevel): Unit = { - if (query.queryExecution.analyzed.isInstanceOf[IgnoreCachedData]) { - logWarning( - log"Asked to cache a plan that is inapplicable for caching: " + - log"${MDC(LOGICAL_PLAN, query.queryExecution.analyzed)}" - ) - } else { - cacheQueryInternal( - query.sparkSession, - query.queryExecution.analyzed, - query.queryExecution.normalized, - tableName, - storageLevel - ) - } + cacheQueryInternal( + query.sparkSession, + query.queryExecution.analyzed, + query.queryExecution.normalized, + tableName, + storageLevel + ) } /** @@ -134,6 +127,11 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { // Do nothing for StorageLevel.NONE since it will not actually cache any data. } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) { logWarning("Asked to cache already cached data.") + } else if (unnormalizedPlan.isInstanceOf[IgnoreCachedData]) { + logWarning( + log"Asked to cache a plan that is inapplicable for caching: " + + log"${MDC(LOGICAL_PLAN, unnormalizedPlan)}" + ) } else { val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) val inMemoryRelation = sessionWithConfigsOff.withActive { From 05539d3d9428d68ea37946d6c2df8300f3fc53fd Mon Sep 17 00:00:00 2001 From: Qiyuan Dong Date: Thu, 29 May 2025 11:55:02 +0200 Subject: [PATCH 4/4] address comments --- .../scala/org/apache/spark/sql/execution/CacheManager.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 7d456d107aad3..5b68be2c4ce55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -125,13 +125,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { storageLevel: StorageLevel): Unit = { if (storageLevel == StorageLevel.NONE) { // Do nothing for StorageLevel.NONE since it will not actually cache any data. 
-    } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
-      logWarning("Asked to cache already cached data.")
     } else if (unnormalizedPlan.isInstanceOf[IgnoreCachedData]) {
       logWarning(
         log"Asked to cache a plan that is inapplicable for caching: " +
           log"${MDC(LOGICAL_PLAN, unnormalizedPlan)}"
       )
+    } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
+      logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val inMemoryRelation = sessionWithConfigsOff.withActive {
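
Note (reviewer aside, not part of the patches): below is a minimal, standalone
Scala sketch of the guard ordering that PATCH 3/4 introduces and PATCH 4/4
settles: plans carrying the IgnoreCachedData marker (which PATCH 1/4 mixes into
V2WriteCommand) are rejected before the already-cached lookup runs. Everything
here is a hypothetical stand-in, not Spark's real classes: CacheGuardSketch,
TableScan, AppendDataStub, and this cacheQuery are simplified models of
CacheManager.cacheQueryInternal, under the assumption that plan equality stands
in for Spark's normalized-plan matching.

// Minimal sketch under the assumptions above; stand-in types, not Spark's.
object CacheGuardSketch {
  sealed trait LogicalPlan
  trait IgnoreCachedData extends LogicalPlan // marker, mirroring the Spark trait's role

  case class TableScan(name: String) extends LogicalPlan
  case class AppendDataStub(table: String) extends IgnoreCachedData // stand-in for a V2 write command

  private var cached = Set.empty[LogicalPlan]

  // Check order as in PATCH 4/4: inapplicable plans are rejected before the
  // already-cached lookup, so a marker plan never reaches the caching branch.
  def cacheQuery(plan: LogicalPlan): Unit = plan match {
    case p: IgnoreCachedData =>
      println(s"WARN Asked to cache a plan that is inapplicable for caching: $p")
    case p if cached.contains(p) =>
      println("WARN Asked to cache already cached data.")
    case p =>
      cached += p
      println(s"Cached: $p")
  }

  def main(args: Array[String]): Unit = {
    cacheQuery(AppendDataStub("testcat.table_name")) // warns; nothing is executed or cached
    cacheQuery(TableScan("t"))                       // cached
    cacheQuery(TableScan("t"))                       // warns: already cached
  }
}

Running the sketch prints the warning for the write command and leaves the
cache untouched, which loosely mirrors why insertDF.cache() in the new tests
no longer re-executes the INSERT: the command plan is screened out before any
caching (and hence any re-execution) can happen.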