Skip to content

Conversation

karuppayya
Copy link
Contributor

@karuppayya karuppayya commented Sep 3, 2025

What changes were proposed in this pull request?

We have the ability top clean up shuffle in spark.sql.classic.shuffleDependency.fileCleanup.enabled.
Honoring this in Thrift server and cleaning up shuffle.
Related PR comment here

Why are the changes needed?

This is to bring the behavior in par with other modes of sql execution(classic, connect)

Does this PR introduce any user-facing change?

No

How was this patch tested?

NA

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Sep 3, 2025
}
}
Dataset.ofRows(self, plan, tracker)
Dataset.ofRows(self, plan, tracker,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkExecuteStatementOperation's code eventually leads here.

@karuppayya
Copy link
Contributor Author

cc: @cloud-fan

@HyukjinKwon HyukjinKwon changed the title [SPARK-53469] Ability to cleanup shuffle in Thrift server [SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server Sep 3, 2025
@karuppayya
Copy link
Contributor Author

@cloud-fan Can you help review this change

@karuppayya
Copy link
Contributor Author

@cloud-fan Can you please help review this chnage

sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some comments to explain it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


protected override def beforeAll(): Unit = {
super.beforeAll()
sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
sparkSession.sparkContext.setJobDescription(substitutorCommand)
val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
val execution = sparkSession.sql(command).queryExecution
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this PR does not enable shuffle cleanup in thriftserver yet?

Copy link
Contributor Author

@karuppayya karuppayya Sep 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if i fully understand.
But on why i changed this line
Before: the query execution object was created with the default DonotCleanup, without honoring the shuffle cleanup configs
After: the query execution is created with the DataSet APIs, where the configs are honored when set, giving an opportunity to cleanup generated shuffle after the SQL.
Did i answer your question?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, so it's more like a bug fix to make thriftserver to respect this config, but the config is off by default so thriftserver does not do shuffle cleanup by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thats right

sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

// Tests depend on intermediate results that would otherwise be cleaned up when
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a red light to me. Runtime filter is a very powerful optimization and we should make sure the shuffle cleanup won't break it.

Copy link
Contributor Author

@karuppayya karuppayya Sep 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent catch! Thanks @cloud-fan

I don't believe the root cause is with shuffle file cleanup itself, but rather with how Adaptive Query Execution handles subquery execution/synchronization.

  1. During codegen phase, FilterExec looks for subquery results(bloom filter) but doesn't find them (at times), so it skips using the Bloom filter optimization.
    The lazy val gets populated based on the subquery execution result ie null if had not complete, a bloom filter otherwise. This is then later used in codegen
  // The bloom filter created from `bloomFilterExpression`.
  @transient private lazy val bloomFilter = {
    val bytes = bloomFilterExpression.eval().asInstanceOf[Array[Byte]]
    if (bytes == null) null else deserialize(bytes)
  }

  1. The main query finishes execution while the subquery is still running in the background (separate execution context).
  2. As part of query completion, shuffle cleanup removes all shuffle files, including those needed by the still-running subquery(while subquery results are also no longer needed as main query has completed, this is a bug in that it doesn't use the bloom filters)
  3. Subquery execution (that had started earlier) fails with FetchFailedException trying to access cleaned-up shuffle data.

This suites verifies only the logical plan for the presence of BloomfilterAggregate and does not the verify if the code indeed used Bllom filter based filtering.

This can be easily reproduced by running this suite. (Its not consistent, and fails based on when the subquery completes. But I am sure atleast one test would fail and cause a ripple and fail subsequent tests since sc gets stopped)

Copy link
Contributor Author

@karuppayya karuppayya Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added loggers to prove and verify that its a bug in this commit

Output

karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 6
karuppayyar: suite run 1 end

karuppayyar: suite run 2 start
karuppayyar: subquery started 25
karuppayyar: subquery ended 24
karuppayyar: query ended 25
karuppayyar: removing shuffle 8,9
karuppayyar: suite run 2 end

karuppayyar: suite run 3 start
17:32:07.521 ERROR org.apache.spark.storage.ShuffleBlockFetcherIterator: Failed to create input stream from local block
java.io.IOException: Error in reading FileSegmentManagedBuffer[file=/private/var/folders/tn/62m7jt2j2b7116x0q6wtzg0c0000gn/T/blockmgr-72dd6798-f43d-48a7-8d4c-0a9c44ba09a9/35/shuffle_8_38_0.data,offset=0,length=5195]
	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:110)

Ideally it should looks like this(ie with adpative disabled) ie main query starts-> subqueries execute and completes-> main query starts executyion and completes

karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: subquery ended 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 7,8
karuppayyar: suite run 1 end

Every subquery should end before query ends.
You can see that subquery execution doesnot complete before the main query ends and therein not using the subquery result.

The side effect of removing shuffle is that when main query completes, it removes the shuffle of subquery(which has not completed and its result is no longer useful) and subquery execution fails with FetchFailure like above when it tries to run to completion. This helped surfacing the issue.

I am not sure if this is the case with all subqueries(looks like that), this could result in correctness issues cc: @dongjoon-hyun too.

@cloud-fan @dongjoon-hyun Do you thinks its a bug(in which case i can attempt a fix) or am i missing something here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the subquery result is no longer needed, we can swallow any error from it?

Copy link
Contributor Author

@karuppayya karuppayya Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is an issue with Inject Runtime Filters and Adaptive.
The subquery should populate the bloom filter before the actual query runs.
But when adpative is enabled, the query doesnt wait for the subquery results which is the actual issue.
(This is not related to this PR itself, instead a completely different issue IMO. But this PR cannot be merged before the subquery issue is fixed )

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I didn't completely follow the conclusion. Spark local mode is not a testing mode as users can run Spark locally as a single node engine to do some real work. Can we fix this issue?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. I'll open a fix later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opend the fix #52606. PTAL. @cloud-fan @karuppayya

Copy link
Contributor Author

@karuppayya karuppayya Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @Ngone51. I did a pass of the PR.
I also verified the chnage withe InjectRuntimeFilterSuite and reverted my testdata chnages. (I will retrigger test once the changes are merged)


protected override def beforeAll(): Unit = {
super.beforeAll()
// Tests depend on intermediate results that would otherwise be cleaned up when
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here, AQE is turned on by default now and let's make sure it's compatible with shuffle cleanup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my wording might not have not been clear.
Thge query execution indeed succeeds and there is no correctness issue.
The issue is with how we assert in tests
In org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads

  private def checkNumLocalShuffleReads(
      plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
    val numShuffles = collect(plan) {
      case s: ShuffleQueryStageExec => s
    }.length

    val numLocalReads = collect(plan) {
      case read: AQEShuffleReadExec if read.isLocalRead => read
    }
    numLocalReads.foreach { r =>
      val rdd = r.execute()
      val parts = rdd.partitions
      assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
    }
    assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
  }

Specifically rdd.preferredLocations(_).nonEmpty) will be empty after the cleanup and the assertion fails.
if shuffle clean up is enabled, i dont think this assertion should pass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we disable cleanup only for that test case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an assertion method used by all tests.

@karuppayya karuppayya force-pushed the SPARK-53469 branch 4 times, most recently from a4d9ac6 to 687e70a Compare September 30, 2025 02:31
@karuppayya
Copy link
Contributor Author

@cloud-fan can you please take a look.

@karuppayya
Copy link
Contributor Author

cc: @somani (long time!) since it touches the Runtime filter tests.
tl; dr: Fixing the test data to return atlease one row to get all the tests run. Even without it queries run, but subquery runs behind the scenes, even after main query completes.

@cloud-fan
Copy link
Contributor

just back from the holiday. @karuppayya can you take a look at #52213 (comment) ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants