[SPARK-53469][SQL] Ability to cleanup shuffle in Thrift server #52213
base: master
Conversation
}
}
Dataset.ofRows(self, plan, tracker)
Dataset.ofRows(self, plan, tracker,
SparkExecuteStatementOperation's code eventually leads here.
cc: @cloud-fan
@cloud-fan Can you please help review this change?
sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5") | ||
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5") | ||
|
||
conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false) |
can we add some comments to explain it?
Done
protected override def beforeAll(): Unit = {
  super.beforeAll()
  sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
ditto
done
}
sparkSession.sparkContext.setJobDescription(substitutorCommand)
val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
val execution = sparkSession.sql(command).queryExecution
So this PR does not enable shuffle cleanup in thriftserver yet?
I am not sure if I fully understand.
But on why I changed this line:
Before: the query execution object was created with the default DoNotCleanup, without honoring the shuffle cleanup configs.
After: the query execution is created with the Dataset APIs, where the configs are honored when set, giving an opportunity to clean up the generated shuffle files after the SQL completes.
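For illustration, here is a rough sketch (not the exact PR code) of how the classic Dataset path can pick a cleanup mode from the session configs before building the query execution. The RemoveShuffleFiles/SkipMigration/DoNotCleanup names refer to the existing ShuffleCleanupMode objects, and the skip-migration conf name used below is an assumption:

// Sketch only: choose the shuffle cleanup mode from session configs instead of
// always defaulting to DoNotCleanup. CLASSIC_SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED
// is an assumed conf name here.
import org.apache.spark.sql.execution.{DoNotCleanup, RemoveShuffleFiles, ShuffleCleanupMode, SkipMigration}
import org.apache.spark.sql.internal.SQLConf

def resolveCleanupMode(conf: SQLConf): ShuffleCleanupMode = {
  if (conf.getConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
    RemoveShuffleFiles   // eagerly delete shuffle files once the query finishes
  } else if (conf.getConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_SKIP_MIGRATION_ENABLED)) {
    SkipMigration        // keep the files but skip migrating them on decommission
  } else {
    DoNotCleanup
  }
}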
Did I answer your question?
Yea, so it's more like a bug fix to make the Thrift server respect this config, but the config is off by default, so the Thrift server does not do shuffle cleanup by default.
Yes, that's right.
sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5") | ||
sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5") | ||
|
||
// Tests depend on intermediate results that would otherwise be cleaned up when |
This seems a red light to me. Runtime filter is a very powerful optimization and we should make sure the shuffle cleanup won't break it.
Excellent catch! Thanks @cloud-fan
I don't believe the root cause is with shuffle file cleanup itself, but rather with how Adaptive Query Execution handles subquery execution/synchronization.
- During the codegen phase, FilterExec looks for the subquery result (the bloom filter) but at times doesn't find it, so it skips the Bloom filter optimization.
The lazy val gets populated based on the subquery execution result, i.e. null if the subquery had not completed, or a bloom filter otherwise. This is then used later in codegen:
// The bloom filter created from `bloomFilterExpression`.
@transient private lazy val bloomFilter = {
  val bytes = bloomFilterExpression.eval().asInstanceOf[Array[Byte]]
  if (bytes == null) null else deserialize(bytes)
}
- The main query finishes execution while the subquery is still running in the background (in a separate execution context).
- As part of query completion, shuffle cleanup removes all shuffle files, including those needed by the still-running subquery. (The subquery result is no longer needed once the main query has completed, but the fact that the main query didn't use the bloom filter is the bug.)
- The subquery execution (which had started earlier) then fails with FetchFailedException when it tries to access the cleaned-up shuffle data.
This suite only verifies the logical plan for the presence of BloomFilterAggregate; it does not verify whether execution actually used Bloom filter based filtering.
This can be easily reproduced by running this suite. (It's not consistent and depends on when the subquery completes, but at least one test should fail and cause a ripple that fails subsequent tests, since the SparkContext gets stopped.)
I added loggers to prove and verify that it's a bug, in this commit.
Output:
karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 6
karuppayyar: suite run 1 end
karuppayyar: suite run 2 start
karuppayyar: subquery started 25
karuppayyar: subquery ended 24
karuppayyar: query ended 25
karuppayyar: removing shuffle 8,9
karuppayyar: suite run 2 end
karuppayyar: suite run 3 start
17:32:07.521 ERROR org.apache.spark.storage.ShuffleBlockFetcherIterator: Failed to create input stream from local block
java.io.IOException: Error in reading FileSegmentManagedBuffer[file=/private/var/folders/tn/62m7jt2j2b7116x0q6wtzg0c0000gn/T/blockmgr-72dd6798-f43d-48a7-8d4c-0a9c44ba09a9/35/shuffle_8_38_0.data,offset=0,length=5195]
at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:110)
Ideally it should look like this (i.e. with adaptive disabled): main query starts -> subqueries execute and complete -> main query starts execution and completes.
karuppayyar: suite run 1 start
karuppayyar: subquery started 24
karuppayyar: subquery ended 24
karuppayyar: query ended 24
karuppayyar: removing shuffle 7,8
karuppayyar: suite run 1 end
Every subquery should end before the main query ends.
You can see that the subquery execution does not complete before the main query ends, and hence the subquery result is not used.
The side effect of removing shuffle files is that when the main query completes, it removes the shuffle of the subquery (which has not completed, and whose result is no longer useful), and the subquery execution fails with a FetchFailure like the one above when it tries to run to completion. This helped surface the issue.
I am not sure if this is the case with all subqueries (it looks like it); this could result in correctness issues. cc: @dongjoon-hyun too.
@cloud-fan @dongjoon-hyun Do you think it's a bug (in which case I can attempt a fix), or am I missing something here?
If the subquery result is no longer needed, we can swallow any error from it?
I think there is an issue with runtime filter injection and adaptive execution.
The subquery should populate the bloom filter before the actual query runs.
But when adaptive is enabled, the query doesn't wait for the subquery results, which is the actual issue.
(This is not related to this PR itself; it's a completely different issue IMO. But this PR cannot be merged before the subquery issue is fixed.)
+1.
Sorry, I didn't completely follow the conclusion. Spark local mode is not just a testing mode, as users can run Spark locally as a single-node engine to do real work. Can we fix this issue?
Makes sense. I'll open a fix later.
Opened the fix #52606. PTAL. @cloud-fan @karuppayya
protected override def beforeAll(): Unit = {
  super.beforeAll()
  // Tests depend on intermediate results that would otherwise be cleaned up when
Ditto here. AQE is turned on by default now, so let's make sure it's compatible with shuffle cleanup.
I think my wording might not have been clear.
The query execution indeed succeeds and there is no correctness issue.
The issue is with how we assert in tests.
In org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite#checkNumLocalShuffleReads:
private def checkNumLocalShuffleReads(
    plan: SparkPlan, numShufflesWithoutLocalRead: Int = 0): Unit = {
  val numShuffles = collect(plan) {
    case s: ShuffleQueryStageExec => s
  }.length
  val numLocalReads = collect(plan) {
    case read: AQEShuffleReadExec if read.isLocalRead => read
  }
  numLocalReads.foreach { r =>
    val rdd = r.execute()
    val parts = rdd.partitions
    assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
  }
  assert(numShuffles === (numLocalReads.length + numShufflesWithoutLocalRead))
}
Specifically, rdd.preferredLocations(_) will be empty after the cleanup and the assertion fails.
If shuffle cleanup is enabled, I don't think this assertion should pass.
can we disable cleanup only for that test case?
This is an assertion method used by all tests.
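For reference, per-test scoping would look roughly like the sketch below; the test name and query are hypothetical, withSQLConf comes from SQLTestUtils, and runAdaptiveAndVerifyResult is the suite's existing helper. Since checkNumLocalShuffleReads is shared across tests, disabling the conf in beforeAll for the whole suite seemed simpler:

// Hypothetical per-test scoping of the cleanup conf: only this test keeps shuffle
// files around, so rdd.preferredLocations stays non-empty for the shared assertion.
test("local shuffle read with shuffle cleanup disabled") {
  withSQLConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
    val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
      "SELECT * FROM testData JOIN testData2 ON key = a")
    checkNumLocalShuffleReads(adaptivePlan)
  }
}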
@cloud-fan can you please take a look.
cc: @somani (long time!) since it touches the Runtime filter tests.
Just back from the holiday. @karuppayya can you take a look at #52213 (comment)?
What changes were proposed in this pull request?
We have the ability to clean up shuffle files via the spark.sql.classic.shuffleDependency.fileCleanup.enabled config. This change honors that config in the Thrift server and cleans up shuffle files there as well.
Related PR comment here
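For example (illustrative only, not code from the PR; the table name is made up), a session backing a Thrift server statement could enable the cleanup like this, after which the query execution created through the Dataset API honors it:

// Illustrative: enable the cleanup conf on the session, then run a statement the
// way the Thrift server now does; shuffle files are removed once the query finishes.
sparkSession.conf.set("spark.sql.classic.shuffleDependency.fileCleanup.enabled", "true")
val execution = sparkSession.sql("SELECT key, count(*) FROM src GROUP BY key").queryExecution
val rows = execution.executedPlan.executeCollect()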
Why are the changes needed?
This is to bring the behavior on par with other modes of SQL execution (Classic and Connect).
Does this PR introduce any user-facing change?
No
How was this patch tested?
NA
Was this patch authored or co-authored using generative AI tooling?
No