Skip to content

Commit 39d3cf7

Browse files
authored
[serve][P0] fix await deployment response hanging issue (#52561)
## Why are these changes needed?

If the user passes a `DeploymentResponse` object directly into a deployment handle call, we resolve it to the underlying object ref under the hood before scheduling the request. However, we were incorrectly holding a `threading.Lock` while awaiting the object ref, which can cause a deadlock if the same `DeploymentResponse` object is used for multiple downstream calls. We should be using an `asyncio.Lock` instead.

## Related issue number

Closes #51190

---------

Signed-off-by: Cindy Zhang <cindyzyx9@gmail.com>
1 parent f402339 commit 39d3cf7

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

python/ray/serve/_private/replica_result.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import inspect
23
import threading
34
import time
@@ -62,6 +63,7 @@ def __init__(
6263
self._is_streaming: bool = metadata.is_streaming
6364
self._request_id: str = metadata.request_id
6465
self._object_ref_or_gen_sync_lock = threading.Lock()
66+
self._lazy_object_ref_or_gen_asyncio_lock = None
6567

6668
if isinstance(obj_ref_or_gen, ray.ObjectRefGenerator):
6769
self._obj_ref_gen = obj_ref_or_gen
@@ -73,6 +75,14 @@ def __init__(
7375
self._obj_ref_gen is not None
7476
), "An ObjectRefGenerator must be passed for streaming requests."
7577

78+
@property
def _object_ref_or_gen_asyncio_lock(self) -> asyncio.Lock:
    """Return this instance's `asyncio.Lock`, creating it on first access.

    The lock is cached in `_lazy_object_ref_or_gen_asyncio_lock`, so every
    subsequent access returns the same object.
    """
    # NOTE(review): construction is presumably deferred so the lock is not
    # built at `__init__` time, outside the event loop that later uses it
    # -- confirm.
    lock = self._lazy_object_ref_or_gen_asyncio_lock
    if lock is None:
        lock = asyncio.Lock()
        self._lazy_object_ref_or_gen_asyncio_lock = lock

    return lock
85+
7686
def _process_response(f: Union[Callable, Coroutine]):
7787
@wraps(f)
7888
def wrapper(self, *args, **kwargs):
@@ -174,7 +184,7 @@ async def to_object_ref_async(self) -> ray.ObjectRef:
174184
# object ref cached in order to avoid calling `__anext__()` to
175185
# resolve to the underlying object ref more than once.
176186
# See: https://github.com/ray-project/ray/issues/43879.
177-
with self._object_ref_or_gen_sync_lock:
187+
async with self._object_ref_or_gen_asyncio_lock:
178188
if self._obj_ref is None:
179189
self._obj_ref = await self._obj_ref_gen.__anext__()
180190

python/ray/serve/tests/test_handle_1.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import concurrent.futures
33
import sys
44
import threading
5+
from typing import Any
56

67
import pytest
78

@@ -311,5 +312,32 @@ def f():
311312
h._init(_prefer_local_routing=True)
312313

313314

315+
def test_response_used_in_multiple_calls(serve_instance):
    """Passing one in-flight response to several downstream calls must not hang."""

    @serve.deployment(graceful_shutdown_timeout_s=0)
    class F:
        async def __call__(self, sleep_amt: int, x: Any):
            # Sleep first so the caller controls how long this request stays
            # in flight, then wrap the argument in parentheses.
            await asyncio.sleep(sleep_amt)
            return f"({x})"

    @serve.deployment(graceful_shutdown_timeout_s=0)
    class Ingress:
        async def __init__(self, h):
            self.h = h

        async def __call__(self):
            # The upstream request takes 5 seconds, so it is still executing
            # when both downstream requests (which each take it as an
            # argument) are submitted.
            upstream = self.h.remote(5, "r1")

            # Both downstream calls share the same upstream response; neither
            # should get stuck.
            first_downstream = self.h.remote(0, upstream)
            second_downstream = self.h.remote(0, upstream)

            first_result = await first_downstream
            second_result = await second_downstream
            return first_result, second_result

    ingress_handle = serve.run(Ingress.bind(F.bind()))
    expected = ("((r1))", "((r1))")
    assert ingress_handle.remote().result(timeout_s=10) == expected
340+
341+
314342
if __name__ == "__main__":
    # When executed as a script, run this file's tests through pytest with
    # the `-v` and `-s` flags and exit with pytest's return code.
    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)

0 commit comments

Comments
 (0)