Commit 7dfb926

qiyuandong-db authored and yhuang-db committed
[SPARK-52312][SQL] Ignore V2WriteCommand when caching DataFrame
### What changes were proposed in this pull request?

We found an issue where `V2WriteCommand` plans were not properly excluded from `DataFrame` caching, which can cause unintended side effects. For example, when `cache()` is called on a `DataFrame` created from an `INSERT` SQL statement, the `INSERT` command is re-executed during the caching process because the underlying plan is not being ignored. This PR fixes the issue by:

- making `V2WriteCommand` extend the `IgnoreCachedData` trait
- updating the caching logic to skip plans that extend `IgnoreCachedData`, preventing inapplicable plans from being cached

### Why are the changes needed?

This is a bug: calling `cache()` on a `DataFrame` should not re-execute the command that created it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests were added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#51032 from qiyuandong-db/SPARK-52312-ignore-v2writecommand-caching.

Authored-by: Qiyuan Dong <qiyuan.dong@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
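The fix follows a marker-trait pattern: the cache manager checks whether a plan carries the `IgnoreCachedData` trait before caching it. The following is a minimal, self-contained Scala sketch of that pattern; the names (`Plan`, `IgnoreCachedData`, `CacheManager`, `InsertPlan`, `SelectPlan`) mirror Spark's but are simplified stand-ins, not Spark's actual classes.

```scala
// Base type for logical plans in this sketch.
trait Plan
// Marker trait: plans with this trait must never be cached, because
// "caching" them would mean re-executing a side-effecting command.
trait IgnoreCachedData extends Plan

// A read-only plan, safe to cache.
case class SelectPlan(expr: String) extends Plan
// A write command, marked as inapplicable for caching.
case class InsertPlan(table: String) extends IgnoreCachedData

class CacheManager {
  private var cached = List.empty[Plan]

  def cacheQuery(plan: Plan): Unit = plan match {
    case _: IgnoreCachedData =>
      // Skip: analogous to the guard added in this PR.
      println(s"Asked to cache a plan that is inapplicable for caching: $plan")
    case p if cached.contains(p) =>
      println("Asked to cache already cached data.")
    case p =>
      cached ::= p
  }

  def isCached(plan: Plan): Boolean = cached.contains(plan)
}
```

With this guard, an `InsertPlan` passed to `cacheQuery` is logged and skipped, while a `SelectPlan` is cached normally; the real change works the same way via `unnormalizedPlan.isInstanceOf[IgnoreCachedData]`.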
1 parent 8d73424 · commit 7dfb926

File tree

3 files changed: +21 −1 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 5 additions & 1 deletion
```diff
@@ -56,7 +56,11 @@ trait KeepAnalyzedQuery extends Command {

 /**
  * Base trait for DataSourceV2 write commands
  */
-trait V2WriteCommand extends UnaryCommand with KeepAnalyzedQuery with CTEInChildren {
+trait V2WriteCommand
+  extends UnaryCommand
+  with KeepAnalyzedQuery
+  with CTEInChildren
+  with IgnoreCachedData {
   def table: NamedRelation
   def query: LogicalPlan
   def isByName: Boolean
```

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 5 additions & 0 deletions
```diff
@@ -125,6 +125,11 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       storageLevel: StorageLevel): Unit = {
     if (storageLevel == StorageLevel.NONE) {
       // Do nothing for StorageLevel.NONE since it will not actually cache any data.
+    } else if (unnormalizedPlan.isInstanceOf[IgnoreCachedData]) {
+      logWarning(
+        log"Asked to cache a plan that is inapplicable for caching: " +
+        log"${MDC(LOGICAL_PLAN, unnormalizedPlan)}"
+      )
     } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
```

sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -875,4 +875,15 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     }
   }
 }
+
+  test("SPARK-52312: caching dataframe created from INSERT shouldn't re-execute the command") {
+    spark.sql("CREATE TABLE testcat.table_name (c1 int, c2 string) USING foo")
+
+    val insertDF = spark.sql("INSERT INTO testcat.table_name VALUES (1, 'a'), (2, 'b')")
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
+
+    // Caching the DataFrame created from INSERT should not re-execute the command
+    insertDF.cache()
+    checkAnswer(spark.table("testcat.table_name"), Seq(Row(1, "a"), Row(2, "b")))
+  }
 }
```
