Member

@Ngone51 Ngone51 commented Oct 14, 2025

What changes were proposed in this pull request?

This PR fixes a bug where MapOutputTrackerMaster.shuffleStatuses is mistakenly cleaned up by the Shuffle Cleanup feature in a local cluster. The fix avoids invoking mapOutputTracker.unregisterShuffle() in BlockManagerStorageEndpoint when mapOutputTracker is a MapOutputTrackerMaster, which only happens in a local cluster (a non-local cluster should use MapOutputTrackerWorker instead).

Why are the changes needed?

MapOutputTrackerMaster.shuffleStatuses should only be cleaned up when ContextCleaner considers the shuffle to be no longer referenced anywhere. Otherwise, any subsequent access to the same shuffle metadata in MapOutputTrackerMaster (by code that still references that shuffle) can lead to a SparkException and crash the SparkContext. Note that this currently only happens in a local cluster, because both the driver and the executor use the MapOutputTrackerMaster. For example, an ongoing subquery could access shuffle metadata that has already been removed after the main query completed. See the detailed discussion at #52213 (comment).
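To make the failure mode and the guard concrete, here is a minimal sketch. It does not use Spark's real classes: `Tracker`, `MasterTracker`, `WorkerTracker`, and `handleRemoveShuffle` are hypothetical stand-ins, and an `IllegalStateException` stands in for the exception Spark would raise. The only part taken from this PR is the shape of the guard (a null check plus an `isInstanceOf` check).

```scala
import scala.collection.concurrent.TrieMap

object RemoveShuffleSketch {
  // Hypothetical stand-ins for illustration only; these are NOT Spark's real classes.
  trait Tracker {
    def unregisterShuffle(shuffleId: Int): Unit
  }

  // Plays the role of the driver-side tracker that owns the shuffle metadata.
  final class MasterTracker extends Tracker {
    val shuffleStatuses = TrieMap.empty[Int, String]

    def registerShuffle(shuffleId: Int): Unit =
      shuffleStatuses.put(shuffleId, s"status-of-$shuffleId")

    def unregisterShuffle(shuffleId: Int): Unit =
      shuffleStatuses.remove(shuffleId)

    // Mimics a later lookup that fails once the metadata is gone.
    def getStatus(shuffleId: Int): String =
      shuffleStatuses.getOrElse(
        shuffleId,
        throw new IllegalStateException(s"Shuffle $shuffleId is not registered"))
  }

  // Plays the role of an executor-side tracker that only holds cached metadata.
  final class WorkerTracker extends Tracker {
    val cached = TrieMap.empty[Int, String]

    def unregisterShuffle(shuffleId: Int): Unit =
      cached.remove(shuffleId)
  }

  // Storage-endpoint-style handler with the same shape of guard as this PR:
  // skip the unregister call when the tracker is the master-side one.
  def handleRemoveShuffle(tracker: Tracker, shuffleId: Int): Unit =
    if (tracker != null && !tracker.isInstanceOf[MasterTracker]) {
      tracker.unregisterShuffle(shuffleId)
    }
}
```

With the guard in place, calling `handleRemoveShuffle` on a `MasterTracker` leaves `shuffleStatuses` untouched, so a later `getStatus` still succeeds, while the same call on a `WorkerTracker` clears its cache as before.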

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Updated the existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

case _ =>
}
// Shuffle cleanup should not clean up shuffle metadata on the driver
assert(mapOutputTrackerMaster.shuffleStatuses.nonEmpty)
Member Author

The test fails this assertion before the fix.

// through `ContextCleaner` when the shuffle is considered no longer referenced anywhere.
// Otherwise, we might hit exceptions if there is any subsequent access (which still
// reference that shuffle) to that shuffle metadata in `MapOutputTrackerMaster`. E.g.,
// an ongoing subquery could access the same shuffle metadata which could have been
Contributor

I am not sure if we should mention this, since the ideal behavior would be to terminate any subqueries when the main query completes.

Member Author

Couldn't there still be a race even if we terminate the subquery?

Contributor

The main query has already ended. The subqueries' results are not going to be used anyway, and allowing them to continue is also a waste of resources.

Member Author

@Ngone51 Ngone51 Oct 15, 2025

I understand that we should cancel the running subquery, and we should do it. My point is that, because of the race between them, the running subquery could still access MapOutputTrackerMaster even if we cancel it right after the main query ends. So I think it's fine to mention it here.
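The race described here can be illustrated with a small deterministic replay. This is only a sketch under assumptions: `CancelRaceSketch`, the `cancelled` flag, and the single-entry metadata map are hypothetical, and the interleaving is replayed on one thread rather than produced by real concurrency.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.concurrent.TrieMap

object CancelRaceSketch {
  // Replays the interleaving A -> B -> C on a single thread so the outcome is deterministic.
  // Returns (did the subquery pass its cancellation check, what its metadata lookup found).
  def replay(): (Boolean, Option[String]) = {
    // Hypothetical model: shuffle metadata keyed by shuffle id.
    val shuffleStatuses = TrieMap(1 -> "map-output-locations")
    val cancelled = new AtomicBoolean(false)

    // Step A (subquery): the cancellation check passes because nothing is cancelled yet.
    val passedCheck = !cancelled.get()

    // Step B (main query ends): cancellation is requested and eager cleanup removes the metadata.
    cancelled.set(true)
    shuffleStatuses.remove(1)

    // Step C (subquery): the lookup it had already committed to now finds nothing.
    val lookup = if (passedCheck) shuffleStatuses.get(1) else None
    (passedCheck, lookup)
  }
}
```

In this replay the subquery passes its check and still finds no metadata, which is the window that cancellation alone does not close.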

  case RemoveShuffle(shuffleId) =>
    doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) {
-     if (mapOutputTracker != null) {
+     if (mapOutputTracker != null && !mapOutputTracker.isInstanceOf[MapOutputTrackerMaster]) {
Contributor

Should this check live in unregisterShuffle itself, if we don't expect it to be called on the master?
Someone else could call this same method in local mode in the future.

Member Author

I was thinking about that. That approach requires us to pass isLocal into MapOutputTrackerMaster, which involves more changes. But I also agree it's safer.
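For comparison, the alternative discussed in this thread (putting the check inside the tracker) might look roughly like the sketch below. Everything here is hypothetical: the simplified `MasterTracker`, its `isLocal` constructor argument, and in particular `unregisterShuffleFromCleaner`, which is an invented name for whatever path the cleaner would need in order to keep removing metadata in local mode.

```scala
import scala.collection.concurrent.TrieMap

object GuardInTrackerSketch {
  // Hypothetical tracker; `isLocal` is the extra constructor argument mentioned above.
  final class MasterTracker(isLocal: Boolean) {
    val shuffleStatuses = TrieMap.empty[Int, String]

    def registerShuffle(shuffleId: Int): Unit =
      shuffleStatuses.put(shuffleId, s"status-of-$shuffleId")

    // General entry point: a no-op in local mode, so no caller can drop the
    // metadata early by accident.
    def unregisterShuffle(shuffleId: Int): Unit =
      if (!isLocal) {
        shuffleStatuses.remove(shuffleId)
      }

    // Invented name: the path reserved for the cleaner, which always removes
    // the metadata once the shuffle is no longer referenced.
    def unregisterShuffleFromCleaner(shuffleId: Int): Unit =
      shuffleStatuses.remove(shuffleId)
  }
}
```

The trade-off matches the comment above: callers no longer need their own `isInstanceOf` check, but the tracker has to learn about local mode and keep a separate path for the cleaner.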

Contributor

So with local mode, we can't clean up shuffle files only?

Contributor

I think shuffle cleanup still happens in local mode.
But the shuffle metadata cleanup only happens through ContextCleaner on the driver.

@karuppayya
Contributor

cc: @cloud-fan

@Ngone51
Member Author

Ngone51 commented Oct 15, 2025

cc @jiangxb1987 @bozhang2820

// an ongoing subquery could access the same shuffle metadata which could have been
// cleaned up after the main query completes. Note this currently only happens in local
// cluster where both driver and executor use the `MapOutputTrackerMaster`.
mapOutputTracker.unregisterShuffle(shuffleId)
Contributor

So for a non-local cluster, mapOutputTracker lives on the executors, and unregisterShuffle only unregisters the shuffle on the executor side?
