[SPARK-53898][CORE] Shuffle cleanup should not clean MapOutputTrackerMaster.shuffleStatuses in local cluster #52606
base: master
BlockManagerStorageEndpoint.scala:

```diff
@@ -19,7 +19,7 @@ package org.apache.spark.storage

 import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}

-import org.apache.spark.{MapOutputTracker, SparkEnv}
+import org.apache.spark.{MapOutputTracker, MapOutputTrackerMaster, SparkEnv}
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys.{BLOCK_ID, BROADCAST_ID, RDD_ID, SHUFFLE_ID}
 import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}

@@ -57,7 +57,14 @@ class BlockManagerStorageEndpoint(

     case RemoveShuffle(shuffleId) =>
       doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) {
-        if (mapOutputTracker != null) {
+        if (mapOutputTracker != null && !mapOutputTracker.isInstanceOf[MapOutputTrackerMaster]) {
+          // SPARK-53898: `MapOutputTrackerMaster.unregisterShuffle()` should only be called
+          // through `ContextCleaner` when the shuffle is considered no longer referenced anywhere.
+          // Otherwise, we might hit exceptions if there is any subsequent access (which still
+          // references that shuffle) to that shuffle metadata in `MapOutputTrackerMaster`. E.g.,
+          // an ongoing subquery could access the same shuffle metadata, which could have been
+          // cleaned up after the main query completes. Note this currently only happens in a
+          // local cluster, where both the driver and the executor use the `MapOutputTrackerMaster`.
           mapOutputTracker.unregisterShuffle(shuffleId)
         }
         val shuffleManager = SparkEnv.get.shuffleManager
```

Review thread on the new code comment:

> I am not sure if we should mention this, since the ideal behavior would be to terminate any subqueries when the main query completes.

> Could there still be a race even if we terminate the subquery?

> The main query has already ended. The subquery results are not going to be used anyway (and letting it continue is also a waste of resources).

> I understand we should cancel the running subquery, and we should do it. My point is that the running subquery could still access

Review thread on `mapOutputTracker.unregisterShuffle(shuffleId)`:

> So for a non-local cluster,
QueryExecutionSuite.scala:

```diff
@@ -20,6 +20,7 @@ import scala.collection.mutable
 import scala.io.Source
 import scala.util.Try

+import org.apache.spark.MapOutputTrackerMaster
 import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, FastOperator, SaveMode}
 import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedFunction, UnresolvedRelation}

@@ -320,11 +321,15 @@ class QueryExecutionSuite extends SharedSparkSession {

   private def cleanupShuffles(): Unit = {
     val blockManager = spark.sparkContext.env.blockManager
+    val mapOutputTrackerMaster =
+      spark.sparkContext.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
     blockManager.diskBlockManager.getAllBlocks().foreach {
       case ShuffleIndexBlockId(shuffleId, _, _) =>
         spark.sparkContext.shuffleDriverComponents.removeShuffle(shuffleId, true)
       case _ =>
     }
+    // Shuffle cleanup should not clean up shuffle metadata on the driver
+    assert(mapOutputTrackerMaster.shuffleStatuses.nonEmpty)
   }

   test("SPARK-53413: Cleanup shuffle dependencies for commands") {
```

Review thread on the new assertion:

> The test fails this assert before the fix.
Review thread on the overall approach:

> Should this check be in `unregisterShuffle` if we don't expect it to be called on the master? Someone else could call this same method in local mode in the future.

> I was thinking about that. That way requires us to pass `isLocal` into `MapOutputTrackerMaster`, which involves more changes. But I also agree it's safer.

> So in local mode, can we only clean up shuffle files?

> I think shuffle cleanup still happens in local mode, but the shuffle metadata cleanup only happens from `ContextCleaner` on the driver.
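The division of responsibility described in the last comment can be sketched with stand-in classes (the `ContextCleanerLike` type and its method are hypothetical, not Spark's real API): eager shuffle-file removal leaves the master's metadata alone, and only the cleaner retires it once the shuffle is provably unreferenced.

```scala
import scala.collection.mutable

// Stand-in for the driver-side tracker and its metadata map.
class TrackerMaster {
  val shuffleStatuses = mutable.Map(1 -> "status-for-shuffle-1")
  def unregisterShuffle(id: Int): Unit = shuffleStatuses.remove(id)
}

// Hypothetical cleaner: the only component allowed to drop driver metadata.
// In real Spark, ContextCleaner does this after the ShuffleDependency is
// garbage collected, i.e. when nothing can reference the shuffle anymore.
class ContextCleanerLike(master: TrackerMaster) {
  def doCleanupShuffle(id: Int): Unit = master.unregisterShuffle(id)
}

val master  = new TrackerMaster
val cleaner = new ContextCleanerLike(master)

// Eager cleanup (removeShuffle in the test helper) deletes shuffle files but,
// after this PR, no longer touches master.shuffleStatuses.
assert(master.shuffleStatuses.nonEmpty)

// Later, the cleaner retires the metadata once the shuffle is unreferenced.
cleaner.doCleanupShuffle(1)
assert(master.shuffleStatuses.isEmpty)
```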