Commit 915c657

abrarsheikh authored and zhaoch23 committed
propagate cancellation to router async task from concurrent future (ray-project#52591)
- **Deployment handle**: Cancel `replica_results_future` upfront to prevent race conditions when checking for cancellation before calling `cancel()`.
- **Router**: Deconstruct `run_coroutine_threadsafe` to access the underlying async task, enabling proactive cancellation when the concurrent `Future` is cancelled but the async task continues to run (e.g., due to lack of `await`). In the callback, explicitly fetch and cancel the `replica_result` from the async task.

More context in ray-project#52533.

## How do I know this works

Ran the following repro script:

```python
import re
import time
import asyncio
import requests
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from ray import serve
import shutil
import glob
import os
import ray

ray.init(logging_level="debug", dashboard_host="0.0.0.0")

LOG_DIR = "/tmp/ray/session_latest/logs"
GET_LINE_RE = re.compile(r"\s([a-f0-9\-]{36}) -- GET / 200")
CANCEL_REQUEST_LINE_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Cancelling request that has already been assigned to a replica")
ASYNCIO_COMPLETED_DESPITE_CANCEL_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Asyncio task completed despite cancellation attempt")

NUM_PARALLEL_REQUESTS = 250
NUM_RUNS = 100  # <--- Change this to control how many times to run my_func()


def my_func():
    @serve.deployment(max_ongoing_requests=10)
    class Parent:
        async def __call__(self) -> int:
            return await asyncio.sleep(5)

    app = Parent.bind()
    serve.run(app)
    time.sleep(1.0)

    def get_and_cancel(x: int) -> None:
        try:
            _ = requests.get("http://localhost:8000/", timeout=2)
        except Exception:
            pass

    with ThreadPoolExecutor(max_workers=NUM_PARALLEL_REQUESTS) as exc:
        list(exc.map(get_and_cancel, range(NUM_PARALLEL_REQUESTS)))

    # Let Serve finish processing and flush logs
    time.sleep(10)


def extract_request_ids(pattern):
    request_ids = set()
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    match = pattern.search(line)
                    if match:
                        request_ids.add(match.group(1))
    return request_ids


def main():
    print("Cleaning logs directory...")
    shutil.rmtree(LOG_DIR, ignore_errors=True)

    print(f"Running my_func() {NUM_RUNS} times...")
    for i in range(NUM_RUNS):
        print(f"Run {i+1}/{NUM_RUNS}")
        my_func()

    print("Finished running my_func(). Analyzing logs...")
    request_ids_completed_200 = extract_request_ids(GET_LINE_RE)
    request_ids_cancelled = extract_request_ids(CANCEL_REQUEST_LINE_RE)
    request_ids_completed_despite_cancel = extract_request_ids(ASYNCIO_COMPLETED_DESPITE_CANCEL_RE)

    print("\n\nSummary:\n")
    print(f"\n⚠️ Total requests completed with 200(goal is to have 0): {len(request_ids_completed_200)}")
    print(f"\n⚠️ Out of the {len(request_ids_completed_200)} requests that completed with 200, following {len(request_ids_cancelled & request_ids_completed_200)} were cancelled with ray.cancel(), this issue is tracked in ray-project#52628")
    print(request_ids_cancelled & request_ids_completed_200)

    missing_cancellation_log = request_ids_completed_200 - request_ids_cancelled
    if missing_cancellation_log:
        print(f"\n ❌ Following {len(missing_cancellation_log)} request IDs completed with 200 for unknown reasons")
        print(missing_cancellation_log)

    print(f"\n✅ Following {len(request_ids_completed_despite_cancel)} requests would have completed with 200 if not for the fix in this current PR(ray-project#52591)")
    print(request_ids_completed_despite_cancel)


if __name__ == "__main__":
    main()
```

Output:

```bash
Summary:

⚠️ Total requests completed with 200(goal is to have 0): 16

⚠️ Out of the 16 requests that completed with 200, following 16 were cancelled with ray.cancel(), this issue is tracked in ray-project#52628
{'13ef62f9-f8d1-4337-9385-c3464133a2d6', '3878651a-8a87-4443-83d7-e19682df12f2', 'd26e19ef-9c97-42b8-9398-b2ac8797ea25', '7dbac3e8-d640-4002-b2ab-804580dd6064', 'c7997538-5b47-4106-8aea-4dcdcff989ac', '2cc9a963-94f1-4d3d-8d5d-fa743952dd24', 'ca301d23-7b6e-4a63-9f31-a24dffdd6f7f', '58089dc2-d936-4928-b320-9fd8f1a06272', '7d3004c7-dac8-4ca9-a666-56569f4dea36', 'e5d5f10a-4370-4a32-97f2-fb3247b64868', '0f6eadb0-37b3-465c-aac1-35301dd060fe', '7b56ec4e-11d3-4297-8267-18b257859a9c', '69f52266-9290-4b47-984b-ad05f5ba4b01', '0e9caba0-9dcd-47ee-927b-1b03741e343b', '735b2a8b-4b76-42f5-9cec-f3443ff77aee', 'bef30c8d-45aa-411e-b2f3-30d999b318b6'}

✅ Following 28 requests would have completed with 200 if not for the fix in this current PR(ray-project#52591)
{'bffb22ff-3092-4906-9e3a-d555d072479e', 'e107962e-e840-43ee-9062-1bcf6f8cfe83', '9374490b-24c5-46d5-ad96-c37303d3e6ec', '3c72d933-3b39-415c-a2d9-7788e6fa84f3', 'f5bcf20f-446f-4723-b751-364aea50608c', '0a37528e-f500-4c64-90e1-f3296d52e836', 'd96fcd81-6eac-42f6-ab94-52713f34cc10', '7e9af900-d2a0-4a97-9be1-7498be664284', '503dc860-d0d2-4c20-ae8b-68ba91556c6e', 'eb91fe94-d6f4-4d69-b882-a25022a5017c', '48b41b02-9e37-406c-86ba-ee5a74d6bd7b', '439bbed8-7cfc-4558-a1f7-8982ccec2c23', 'be6a9b26-56ab-4324-8bdf-dab63b8f0574', '77080edf-fa26-40b2-aed5-2bbd09a2b4bb', 'eef2e360-1744-4c0b-bc46-6244946a53b3', '6a8ec4d6-790c-4445-84e9-d2059cdb7307', '9ee60267-edb1-4e5b-9796-95084e154152', 'd9cac3fd-a49c-4786-9a99-b91705f73b23', 'b5215d2f-9ed1-4817-a942-254d2c2b56c0', '13e6d993-bbda-44d5-a0a6-3a25670c4373', '2ba7b1e6-a91f-453a-b910-2b0d8ea6deb2', '61410d33-4fd0-4ae7-81af-bc17905d3275', '46131d2e-de76-45e8-bb90-141b190d815f', '248289b4-654a-43cb-9705-c45ea6681e3d', 'aad6053c-4262-4bd6-b313-a04796a6734c', '444b82fd-7b7c-4351-b2ff-f1eb1ecd488c', '7df24f6e-38f5-4e36-b3e9-e0171dee159e', '627156ff-20b3-4ec4-a34d-7616d83da00f'}
```

Including unit tests.

This PR does not fix the other issue mentioned in ray-project#52533 where `ray.cancel` does not actually cancel the replica actor task. That will be a follow-up investigation and fix.

---------

Signed-off-by: abrar <abrar@anyscale.com>
Signed-off-by: zhaoch23 <c233zhao@uwaterloo.ca>
1 parent 3d26408 commit 915c657

File tree

6 files changed, +402 −9 lines changed


python/ray/serve/_private/replica_scheduler/replica_wrapper.py

+8
@@ -11,11 +11,16 @@
     ReplicaQueueLengthInfo,
     RunningReplicaInfo,
 )
+from ray.serve._private.constants import SERVE_LOGGER_NAME
 from ray.serve._private.replica_result import ActorReplicaResult, ReplicaResult
 from ray.serve._private.replica_scheduler.common import PendingRequest
 from ray.serve._private.utils import JavaActorHandleProxy
 from ray.serve.generated.serve_pb2 import RequestMetadata as RequestMetadataProto
 
+import logging
+
+logger = logging.getLogger(SERVE_LOGGER_NAME)
+
 
 class ReplicaWrapper(ABC):
     """This is used to abstract away details of the transport layer
@@ -105,6 +110,9 @@ async def send_request_python(
             return ActorReplicaResult(obj_ref_gen, pr.metadata), queue_len_info
         except asyncio.CancelledError as e:
             # HTTP client disconnected or request was explicitly canceled.
+            logger.info(
+                "Cancelling request that has already been assigned to a replica."
+            )
             ray.cancel(obj_ref_gen)
             raise e from None

python/ray/serve/_private/router.py

+49 −3

@@ -34,7 +34,11 @@
 from ray.serve._private.metrics_utils import InMemoryMetricsStore, MetricsPusher
 from ray.serve._private.replica_result import ReplicaResult
 from ray.serve._private.replica_scheduler import PendingRequest, ReplicaScheduler
-from ray.serve._private.utils import generate_request_id, resolve_deployment_response
+from ray.serve._private.utils import (
+    generate_request_id,
+    resolve_deployment_response,
+    run_coroutine_or_future_threadsafe,
+)
 from ray.serve.config import AutoscalingConfig
 from ray.serve.exceptions import BackPressureError, DeploymentUnavailableError
 from ray.util import metrics
@@ -695,12 +699,54 @@ def assign_request(
         *request_args,
         **request_kwargs,
     ) -> concurrent.futures.Future[ReplicaResult]:
-        return asyncio.run_coroutine_threadsafe(
+        """Schedules assign_request call on the internal asyncio loop.
+
+        This method uses `run_coroutine_threadsafe` to execute the actual request
+        assignment logic (`_asyncio_router.assign_request`) on the dedicated
+        asyncio event loop thread. It returns a `concurrent.futures.Future` that
+        can be awaited or queried from the calling thread.
+
+        Returns:
+            A concurrent.futures.Future resolving to the ReplicaResult representing
+            the assigned request.
+        """
+
+        def asyncio_future_callback(
+            asyncio_future: asyncio.Future, concurrent_future: concurrent.futures.Future
+        ):
+            """Callback attached to the asyncio Task running assign_request.
+
+            This runs when the asyncio Task finishes (completes, fails, or is cancelled).
+            Its primary goal is to propagate cancellation initiated via the
+            `concurrent_future` back to the `ReplicaResult` in situations where
+            asyncio_future didn't see the cancellation event in time. Think of it
+            like a second line of defense for cancellation of replica results.
+            """
+            # Check if the cancellation originated from the concurrent.futures.Future
+            if (
+                concurrent_future.cancelled()
+                and not asyncio_future.cancelled()
+                and asyncio_future.exception() is None
+            ):
+                result: ReplicaResult = asyncio_future.result()
+                logger.info(
+                    "Asyncio task completed despite cancellation attempt. "
+                    "Attempting to cancel the request that was assigned to a replica."
+                )
+                result.cancel()
+
+        task = self._asyncio_loop.create_task(
             self._asyncio_router.assign_request(
                 request_meta, *request_args, **request_kwargs
-            ),
+            )
+        )
+        # Schedule the actual request assignment coroutine on the asyncio loop thread.
+        concurrent_future = run_coroutine_or_future_threadsafe(
+            task,
             loop=self._asyncio_loop,
         )
+        task.add_done_callback(lambda _: asyncio_future_callback(_, concurrent_future))
+        return concurrent_future
 
     def shutdown(self) -> concurrent.futures.Future:
         return asyncio.run_coroutine_threadsafe(
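The callback wiring above can be hard to follow in diff form. Below is a stripped-down, single-threaded sketch of the same "second line of defense" idea (illustrative only: `FakeReplicaResult` and `fake_assign_request` are stand-ins, and the real code chains the task to the concurrent future via `run_coroutine_or_future_threadsafe` on a separate loop thread).

```python
import asyncio
import concurrent.futures

class FakeReplicaResult:
    """Illustrative stand-in for ReplicaResult."""
    def cancel(self):
        print("cancelling the request that was already assigned to a replica")

async def fake_assign_request() -> FakeReplicaResult:
    # Completes without awaiting again, so cancellation can arrive too late.
    return FakeReplicaResult()

def asyncio_future_callback(task: asyncio.Task, concurrent_future: concurrent.futures.Future):
    # Mirrors the done callback added in this commit: if the caller cancelled the
    # concurrent future but the asyncio task still finished with a result,
    # explicitly cancel that result so the replica request does not keep running.
    if (
        concurrent_future.cancelled()
        and not task.cancelled()
        and task.exception() is None
    ):
        task.result().cancel()

async def main():
    concurrent_future = concurrent.futures.Future()
    task = asyncio.get_running_loop().create_task(fake_assign_request())
    task.add_done_callback(lambda t: asyncio_future_callback(t, concurrent_future))

    # Simulate the caller cancelling from the other side before the task finishes.
    concurrent_future.cancel()
    await task  # the done callback fires and cancels the orphaned result

asyncio.run(main())
```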

python/ray/serve/_private/utils.py

+33
@@ -15,6 +15,8 @@
 
 from ray.serve.config import gRPCOptions
 import requests
+from asyncio import ensure_future, futures, coroutines
+import concurrent.futures
 
 import ray
 import ray.util.serialization_addons
@@ -623,3 +625,34 @@ def wait_for_interrupt() -> None:
 
 def is_grpc_enabled(grpc_config: gRPCOptions) -> bool:
     return grpc_config.port > 0 and len(grpc_config.grpc_servicer_functions) > 0
+
+
+def run_coroutine_or_future_threadsafe(coro_or_future, loop):
+    """Submit a coroutine object or future to a given event loop.
+
+    Ref: https://github.com/python/cpython/blob/eef49c359505eaf109d519d39e53dfd3c78d066a/Lib/asyncio/tasks.py#L991
+
+    Return a concurrent.futures.Future to access the result.
+    """
+    if not coroutines.iscoroutine(coro_or_future) and not futures.isfuture(
+        coro_or_future
+    ):
+        raise TypeError("A coroutine object or future is required")
+
+    if futures.isfuture(coro_or_future):
+        assert loop == coro_or_future.get_loop()
+
+    future = concurrent.futures.Future()
+
+    def callback():
+        try:
+            futures._chain_future(ensure_future(coro_or_future, loop=loop), future)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except BaseException as exc:
+            if future.set_running_or_notify_cancel():
+                future.set_exception(exc)
+            raise
+
+    loop.call_soon_threadsafe(callback)
+    return future
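A quick usage sketch of the helper above, assuming it is importable from `ray.serve._private.utils` once this change lands (the background loop and the `work` coroutine are illustrative). Unlike `asyncio.run_coroutine_threadsafe`, it also accepts a Task or Future already bound to the target loop, which is what lets the router keep a handle to the underlying task for its done callback.

```python
import asyncio
import threading

from ray.serve._private.utils import run_coroutine_or_future_threadsafe

# Background loop thread, like the Serve router's dedicated asyncio loop.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def work() -> str:
    await asyncio.sleep(0.1)
    return "done"

# A plain coroutine works, just like asyncio.run_coroutine_threadsafe.
print(run_coroutine_or_future_threadsafe(work(), loop=loop).result(timeout=5))

# An existing Task bound to that loop works too; create it on the loop thread,
# then chain it to a concurrent.futures.Future the caller can await or cancel.
task_ready = threading.Event()
holder = {}

def create_task_on_loop():
    holder["task"] = loop.create_task(work())
    task_ready.set()

loop.call_soon_threadsafe(create_task_on_loop)
task_ready.wait()
print(run_coroutine_or_future_threadsafe(holder["task"], loop=loop).result(timeout=5))
```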

python/ray/serve/handle.py

+9 −4

@@ -315,11 +315,16 @@ def cancel(self):
             return
 
         self._cancelled = True
-        if not self._replica_result_future.done():
-            self._replica_result_future.cancel()
-        elif self._replica_result_future.exception() is None:
+        self._replica_result_future.cancel()
+        try:
+            # try to fetch the results synchronously. if it succeeds,
+            # we will explicitly cancel the replica result. if it fails,
+            # the request is already cancelled and we can return early.
             self._fetch_future_result_sync()
-            self._replica_result.cancel()
+        except RequestCancelledError:
+            # request is already cancelled nothing to do here
+            return
+        self._replica_result.cancel()
 
     @DeveloperAPI
     def cancelled(self) -> bool:
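To see why cancelling the future upfront matters, here is a stdlib-only sketch of the race the old branching left open (hedged: `FakeReplicaResult` and the use of `concurrent.futures.CancelledError` are stand-ins for Serve's `ReplicaResult` and `RequestCancelledError`).

```python
import concurrent.futures

class FakeReplicaResult:
    """Illustrative stand-in for the ReplicaResult held by the handle."""
    def cancel(self):
        print("explicitly cancelling the request already assigned to a replica")

def old_cancel(future: concurrent.futures.Future) -> None:
    # Racy: if the future completes between done() and cancel(), cancel() is a
    # no-op and the already-assigned replica request is never cancelled.
    if not future.done():
        future.cancel()
    elif future.exception() is None:
        future.result().cancel()

def new_cancel(future: concurrent.futures.Future) -> None:
    # Cancel upfront, then try to fetch; whichever side won the race, the
    # assigned replica request still ends up cancelled.
    future.cancel()
    try:
        result = future.result()
    except concurrent.futures.CancelledError:
        return  # cancellation won; there is no replica result to clean up
    result.cancel()

# Simulate the race: the request finishes just before cancel() is called.
fut = concurrent.futures.Future()
fut.set_result(FakeReplicaResult())
new_cancel(fut)  # still cancels the replica result

# If cancellation wins instead, new_cancel simply returns.
new_cancel(concurrent.futures.Future())
```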
