Commit d2cc107

khakhlyuk authored and HyukjinKwon committed
[MINOR][CONNECT][PYTHON] Fix a race condition in ExecutePlanResponseReattachableIterator.shutdown
### What changes were proposed in this pull request?

The lock that guards the ThreadPoolExecutor is reentrant, so the same thread can acquire it multiple times. At the same time, the `__del__` finalizer can be called from the same thread while the thread pool is being shut down. In this PR we first set the thread pool instance to `None` and then call shutdown.

### Why are the changes needed?

We have seen race conditions in multi-threading scenarios:

```
File "/app/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 352, in __del__
    return self.close()
File "/app/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 348, in close
    self._release_all()
File "/app/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/reattach.py", line 258, in _release_all
    self._release_thread_pool.submit(target)
File "/usr/local/lib/python3.12/concurrent/futures/thread.py", line 171, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
```

This PR fixes the race condition. The problem was introduced in Spark 4.0.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51369 from khakhlyuk/fix-race-condition.

Authored-by: Alex Khakhlyuk <alex.khakhlyuk@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 1ffbfbe commit d2cc107

1 file changed: +2 -1 lines changed

python/pyspark/sql/connect/client/reattach.py

Lines changed: 2 additions & 1 deletion
@@ -79,8 +79,9 @@ def shutdown(cls: Type["ExecutePlanResponseReattachableIterator"]) -> None:
         """
         with cls._lock:
             if cls._release_thread_pool_instance is not None:
-                cls._get_or_create_release_thread_pool().shutdown()
+                thread_pool = cls._release_thread_pool_instance
                 cls._release_thread_pool_instance = None
+                thread_pool.shutdown()
 
     def __init__(
         self,
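
The ordering in the patched `shutdown` can be illustrated with a minimal, self-contained sketch. It is not the code from `reattach.py`: the class `ReleasePoolHolder`, its `_pool` attribute, and `get_or_create_pool` are hypothetical stand-ins. The assumption is that any code re-entering the reentrant lock during shutdown (for example a finalizer running on the same thread) should find `None` and create a fresh pool, rather than being handed an executor that is already shutting down.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional


class ReleasePoolHolder:
    """Hypothetical stand-in for class-level thread pool management."""

    # Reentrant lock: the thread that holds it can acquire it again.
    _lock = threading.RLock()
    _pool: Optional[ThreadPoolExecutor] = None

    @classmethod
    def get_or_create_pool(cls) -> ThreadPoolExecutor:
        with cls._lock:
            if cls._pool is None:
                cls._pool = ThreadPoolExecutor(max_workers=2)
            return cls._pool

    @classmethod
    def shutdown(cls) -> None:
        with cls._lock:
            if cls._pool is not None:
                # Detach the pool from the class first, then shut it down,
                # so a re-entrant caller never sees a shut-down executor.
                pool = cls._pool
                cls._pool = None
                pool.shutdown()


first = ReleasePoolHolder.get_or_create_pool()
ReleasePoolHolder.shutdown()

# After shutdown, a new request creates a fresh, usable pool.
second = ReleasePoolHolder.get_or_create_pool()
result = second.submit(lambda: "released").result()
ReleasePoolHolder.shutdown()
```

Submitting to `first` after the first `shutdown()` raises the same `RuntimeError` shown in the traceback, whereas `second` accepts work normally because it was created after the class attribute had been reset.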
