@@ -496,7 +496,8 @@ class SparkSession private(
parsedPlan
}
}
Dataset.ofRows(self, plan, tracker)
Dataset.ofRows(self, plan, tracker,
Contributor Author

SparkExecuteStatementOperation's code eventually leads here.

QueryExecution.determineShuffleCleanupMode(sessionState.conf))
}

/** @inheritdoc */
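
The hunk above is where a shuffle-cleanup mode, derived from the session's SQL conf, starts flowing into Dataset.ofRows, so SQL submitted through SparkSession.sql (and therefore through SparkExecuteStatementOperation) can opt into cleanup. Below is only an illustrative, self-contained model of that conf-to-mode decision; the mode names and helper are stand-ins, not Spark's actual definitions in QueryExecution.

// Illustrative model only: the real logic lives in
// QueryExecution.determineShuffleCleanupMode and may differ in names and detail.
sealed trait ShuffleCleanupMode
case object DoNotCleanup extends ShuffleCleanupMode        // default: leave shuffle files alone
case object RemoveShuffleFiles extends ShuffleCleanupMode  // delete shuffle files once the query completes

object CleanupModeModel {
  // Stand-in for reading a boolean conf such as
  // SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.
  def determineShuffleCleanupMode(fileCleanupEnabled: Boolean): ShuffleCleanupMode =
    if (fileCleanupEnabled) RemoveShuffleFiles else DoNotCleanup

  def main(args: Array[String]): Unit = {
    println(determineShuffleCleanupMode(fileCleanupEnabled = false)) // DoNotCleanup
    println(determineShuffleCleanupMode(fileCleanupEnabled = true))  // RemoveShuffleFiles
  }
}
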
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEProp
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.util.Utils

class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSparkSession
with AdaptiveSparkPlanHelper {
@@ -205,6 +206,9 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

// Tests depend on intermediate results that would otherwise be cleaned up when
Contributor

This seems a red light to me. Runtime filter is a very powerful optimization and we should make sure the shuffle cleanup won't break it.

Contributor Author (@karuppayya, Sep 23, 2025)

Excellent catch! Thanks @cloud-fan

I don't believe the root cause is with shuffle file cleanup itself, but rather with how Adaptive Query Execution handles subquery execution/synchronization.

  1. During the codegen phase, FilterExec looks for the subquery result (the bloom filter) but at times doesn't find it, so it skips the bloom filter optimization.
    The lazy val below is populated from the subquery execution result, i.e. null if the subquery has not completed, a bloom filter otherwise, and is then later used in codegen:
  // The bloom filter created from `bloomFilterExpression`.
  @transient private lazy val bloomFilter = {
    val bytes = bloomFilterExpression.eval().asInstanceOf[Array[Byte]]
    if (bytes == null) null else deserialize(bytes)
  }

  2. The main query finishes execution while the subquery is still running in the background (on a separate execution context).
  3. As part of query completion, shuffle cleanup removes all shuffle files, including those needed by the still-running subquery. (The subquery result is also no longer needed once the main query has completed; the bug is that the main query never used the bloom filter.)
  4. The subquery execution (that had started earlier) fails with FetchFailedException when it tries to access the cleaned-up shuffle data.

This suite verifies only that the logical plan contains BloomFilterAggregate; it does not verify that the executed code actually used bloom-filter-based filtering.

This can easily be reproduced by running this suite. (It's not consistent and fails depending on when the subquery completes, but I am sure at least one test would fail and cause a ripple, failing subsequent tests since sc gets stopped.)
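
To make point 1 concrete, here is a minimal, self-contained sketch (plain Scala plus Spark's BloomFilter sketch library, not the actual BloomFilterMightContain expression; all names are made up) of how a missing subquery result degrades the predicate to "keep every row", so the runtime filter silently does nothing:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.spark.util.sketch.BloomFilter

object BloomFilterFallbackSketch {
  def mightContain(serialized: Array[Byte], value: Long): Boolean = {
    if (serialized == null) {
      true // subquery result missing: nothing can be pruned
    } else {
      BloomFilter.readFrom(new ByteArrayInputStream(serialized)).mightContainLong(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val bf = BloomFilter.create(1000)
    bf.putLong(42L)
    val out = new ByteArrayOutputStream()
    bf.writeTo(out)

    println(mightContain(null, 7L))              // true: filter unavailable, row kept
    println(mightContain(out.toByteArray, 42L))  // true: inserted value
    println(mightContain(out.toByteArray, 7L))   // likely false: value can be pruned
  }
}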

Contributor Author (@karuppayya, Sep 24, 2025)

I added loggers in this commit to prove and verify that it's a bug.

Output

karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 6
karuppayyar: suite run 1 end

karuppayyar: suite run 2 start
karuppayyar: subquery started 25
karuppayyar: subquery ended 24
karuppayyar: query ended 25
karuppayyar: removing shuffle 8,9
karuppayyar: suite run 2 end

karuppayyar: suite run 3 start
17:32:07.521 ERROR org.apache.spark.storage.ShuffleBlockFetcherIterator: Failed to create input stream from local block
java.io.IOException: Error in reading FileSegmentManagedBuffer[file=/private/var/folders/tn/62m7jt2j2b7116x0q6wtzg0c0000gn/T/blockmgr-72dd6798-f43d-48a7-8d4c-0a9c44ba09a9/35/shuffle_8_38_0.data,offset=0,length=5195]
	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:110)

Ideally it should look like this (i.e. with adaptive disabled): the main query starts -> the subqueries execute and complete -> the main query executes and completes.

karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: subquery ended 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 7,8
karuppayyar: suite run 1 end

Every subquery should end before the query ends.
You can see that the subquery execution does not complete before the main query ends, and therefore the subquery result is not used.

The side effect of removing shuffles is that when the main query completes, it removes the shuffle of the subquery (which has not completed, and whose result is no longer useful), and the subquery execution fails with a FetchFailure like the above when it tries to run to completion. This is what helped surface the issue.

I am not sure if this is the case with all subqueries (it looks like it), but this could result in correctness issues. cc: @dongjoon-hyun too.

@cloud-fan @dongjoon-hyun Do you think it's a bug (in which case I can attempt a fix), or am I missing something here?
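
To visualize the race without any Spark machinery, here is a toy model in plain Scala (every name here is invented for illustration): the "main query" completes and triggers cleanup while the "subquery" is still running on another thread, and the subquery then fails at the point where the real run hits the FetchFailedException above:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SubqueryRaceModel {
  @volatile var shuffleFilesPresent = true

  def main(args: Array[String]): Unit = {
    // "Subquery": runs in the background and still needs the shuffle files later.
    val subquery = Future {
      Thread.sleep(200)
      if (!shuffleFilesPresent) {
        throw new RuntimeException("FetchFailed: shuffle files were removed")
      }
      "bloom filter bytes"
    }

    // "Main query": finishes without waiting for the subquery; query completion
    // then triggers shuffle cleanup.
    Thread.sleep(50)
    shuffleFilesPresent = false

    // The subquery now fails, mirroring the FetchFailedException in the logs.
    println(Await.ready(subquery, 1.second).value)
  }
}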

Contributor

If the subquery result is no longer needed, we can swallow any error from it?

Contributor Author (@karuppayya, Sep 24, 2025)

I think there is an issue with Inject Runtime Filters and adaptive execution.
The subquery should populate the bloom filter before the actual query runs.
But when adaptive is enabled, the query doesn't wait for the subquery results, which is the actual issue.
(This is not related to this PR itself; it is a completely different issue IMO. But this PR cannot be merged before the subquery issue is fixed.)

Member

+1.

Contributor

Sorry, I didn't completely follow the conclusion. Spark local mode is not a testing mode, as users can run Spark locally as a single-node engine to do some real work. Can we fix this issue?

Member

Makes sense. I'll open a fix later.

Member

Opened the fix #52606. PTAL. @cloud-fan @karuppayya

Contributor Author (@karuppayya, Oct 14, 2025)

Thanks @Ngone51. I did a pass of the PR.
I also verified the change with InjectRuntimeFilterSuite and reverted my test-data changes. (I will retrigger the tests once the changes are merged.)

// shuffle clean up is enabled, causing test failures.
conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
Contributor

can we add some comments to explain it?

Contributor Author

Done

// `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing
// complicated.
conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeScalarSubqueries.ruleName)
@@ -213,6 +217,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
protected override def afterAll(): Unit = try {
conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
SQLConf.OPTIMIZER_EXCLUDED_RULES.defaultValueString)
conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, Utils.isTesting)

sql("DROP TABLE IF EXISTS bf1")
sql("DROP TABLE IF EXISTS bf2")
@@ -67,6 +67,18 @@ class AdaptiveQueryExecSuite

setupTestData()

protected override def beforeAll(): Unit = {
super.beforeAll()
// Tests depend on intermediate results that would otherwise be cleaned up when
Contributor

Ditto here: AQE is turned on by default now, so let's make sure it's compatible with shuffle cleanup.

Contributor Author

I think my wording might not have been clear.
The query execution indeed succeeds and there is no correctness issue.
The issue is with how we assert in tests, in
org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads:

  private def checkNumLocalShuffleReads(
      plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
    val numShuffles = collect(plan) {
      case s: ShuffleQueryStageExec => s
    }.length

    val numLocalReads = collect(plan) {
      case read: AQEShuffleReadExec if read.isLocalRead => read
    }
    numLocalReads.foreach { r =>
      val rdd = r.execute()
      val parts = rdd.partitions
      assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
    }
    assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
  }

Specifically, rdd.preferredLocations(_) will be empty after the cleanup and the assertion fails.
If shuffle cleanup is enabled, I don't think this assertion should pass.

Contributor

can we disable cleanup only for that test case?

Contributor Author

This is an assertion method used by all tests.
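
For reference, scoping the config to a single test (the alternative discussed above) would look roughly like the sketch below. The suite name, test name, and query are made up, and it assumes the spark-sql test jar (QueryTest, SharedSparkSession, withSQLConf) is on the classpath; it was not done here because checkNumLocalShuffleReads is shared by many tests, so the config is toggled in beforeAll/afterAll instead.

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class ScopedCleanupConfSuite extends QueryTest with SharedSparkSession {
  test("aggregate with shuffle cleanup disabled only for this test") {
    // The config is restored automatically when the withSQLConf block exits.
    withSQLConf(
        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
        SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
      val df = sql("SELECT id % 10 AS k, count(*) AS c FROM range(100) GROUP BY id % 10")
      checkAnswer(df, (0 until 10).map(i => Row(i.toLong, 10L)))
    }
  }
}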

// shuffle clean up is enabled, causing test failures.
sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
Contributor

ditto

Contributor Author

done

}

protected override def afterAll(): Unit = {
sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, true)
super.afterAll()
}

private def runAdaptiveAndVerifyResult(query: String,
skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
var finalPlanCnt = 0
@@ -1871,9 +1883,7 @@ class AdaptiveQueryExecSuite
}

withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
// Disabling cleanup as the test assertions depend on them
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
val df = sql(
"""
|SELECT * FROM (
@@ -30,8 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.COMMAND
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.CommandResult
import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
import org.apache.spark.sql.execution.HiveResult.hiveResultString
import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
import org.apache.spark.util.Utils
@@ -67,7 +66,7 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
new VariableSubstitution().substitute(command)
}
sparkSession.sparkContext.setJobDescription(substitutorCommand)
val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
val execution = sparkSession.sql(command).queryExecution
Contributor

So this PR does not enable shuffle cleanup in thriftserver yet?

Contributor Author (@karuppayya, Sep 16, 2025)

I am not sure if I fully understand, but here is why I changed this line:
Before: the query execution object was created with the default DoNotCleanup mode, without honoring the shuffle cleanup configs.
After: the query execution is created via the Dataset APIs, where the configs are honored when set, giving an opportunity to clean up the shuffles generated by the SQL statement.
Did I answer your question?

Contributor

Yea, so it's more like a bug fix to make the thriftserver respect this config, but the config is off by default, so the thriftserver does not do shuffle cleanup by default.

Contributor Author

Yes, that's right.
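
For completeness, here is a hedged sketch of what opting in looks like from a user's side. It references the config only via its .key constant rather than guessing the literal string, and assumes it is a runtime SQL conf (as the test code above suggests); nothing here is the thriftserver code itself.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

object ShuffleCleanupOptInSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cleanup-sketch").getOrCreate()

    // Off by default, so shuffle files are kept unless the user opts in.
    spark.conf.set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key, "true")

    // Building the QueryExecution through the Dataset API (as the changed driver
    // code now does) is what lets the cleanup mode derived from this config take effect.
    spark.sql("SELECT id % 10 AS k, count(*) AS c FROM range(1000) GROUP BY id % 10").collect()

    spark.stop()
  }
}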

// The SQL command has been executed above via `executePlan`, therefore we don't need to
// wrap it again with a new execution ID when getting Hive result.
execution.logical match {