[core] [serve] ray.cancel on async actor task not cancelling the ray task #52628
@abrarsheikh have you confirmed that on the receiving side, the actor task is not being canceled? From the writeup it seems the level of diagnosis is: we call `ray.cancel`, but the task still runs to completion. This could be an issue in the core asyncio code that doesn't properly send the cancellation signal, or it could be something in the serve replica code that doesn't properly handle it. Have you narrowed it down further?
…52591)

- **Deployment handle**: Cancel `replica_results_future` upfront to prevent race conditions when checking for cancellation before calling `cancel()`.
- **Router**: Deconstruct `run_coroutine_threadsafe` to access the underlying async task, enabling proactive cancellation when the concurrent `Future` is cancelled but the async task continues to run (e.g., due to lack of `await`). In the callback, explicitly fetch and cancel the `replica_result` from the async task.

More context in #52533.

## How do I know this works

Ran the following repro script:

```python
import re
import time
import asyncio
import requests
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from ray import serve
import shutil
import glob
import os
import ray

ray.init(logging_level="debug", dashboard_host="0.0.0.0")

LOG_DIR = "/tmp/ray/session_latest/logs"
GET_LINE_RE = re.compile(r"\s([a-f0-9\-]{36}) -- GET / 200")
CANCEL_REQUEST_LINE_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Cancelling request that has already been assigned to a replica")
ASYNCIO_COMPLETED_DESPITE_CANCEL_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Asyncio task completed despite cancellation attempt")

NUM_PARALLEL_REQUESTS = 250
NUM_RUNS = 100  # <--- Change this to control how many times to run my_func()


def my_func():
    @serve.deployment(max_ongoing_requests=10)
    class Parent:
        async def __call__(self) -> int:
            return await asyncio.sleep(5)

    app = Parent.bind()
    serve.run(app)
    time.sleep(1.0)

    def get_and_cancel(x: int) -> None:
        try:
            _ = requests.get("http://localhost:8000/", timeout=2)
        except Exception:
            pass

    with ThreadPoolExecutor(max_workers=NUM_PARALLEL_REQUESTS) as exc:
        list(exc.map(get_and_cancel, range(NUM_PARALLEL_REQUESTS)))

    # Let Serve finish processing and flush logs
    time.sleep(10)


def extract_request_ids(pattern):
    request_ids = set()
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    match = pattern.search(line)
                    if match:
                        request_ids.add(match.group(1))
    return request_ids


def main():
    print("Cleaning logs directory...")
    shutil.rmtree(LOG_DIR, ignore_errors=True)

    print(f"Running my_func() {NUM_RUNS} times...")
    for i in range(NUM_RUNS):
        print(f"Run {i+1}/{NUM_RUNS}")
        my_func()

    print("Finished running my_func(). Analyzing logs...")
    request_ids_completed_200 = extract_request_ids(GET_LINE_RE)
    request_ids_cancelled = extract_request_ids(CANCEL_REQUEST_LINE_RE)
    request_ids_completed_despite_cancel = extract_request_ids(ASYNCIO_COMPLETED_DESPITE_CANCEL_RE)

    print("\n\nSummary:\n")
    print(f"\n⚠️ Total requests completed with 200(goal is to have 0): {len(request_ids_completed_200)}")
    print(f"\n⚠️ Out of the {len(request_ids_completed_200)} requests that completed with 200, following {len(request_ids_cancelled & request_ids_completed_200)} were cancelled with ray.cancel(), this issue is tracked in #52628")
    print(request_ids_cancelled & request_ids_completed_200)

    missing_cancellation_log = request_ids_completed_200 - request_ids_cancelled
    if missing_cancellation_log:
        print(f"\n ❌ Following {len(missing_cancellation_log)} request IDs completed with 200 for unknown reasons")
        print(missing_cancellation_log)

    print(f"\n✅ Following {len(request_ids_completed_despite_cancel)} requests would have completed with 200 if not for the fix in this current PR(#52591)")
    print(request_ids_completed_despite_cancel)


if __name__ == "__main__":
    main()
```

```bash
Summary:

⚠️ Total requests completed with 200(goal is to have 0): 16

⚠️ Out of the 16 requests that completed with 200, following 16 were cancelled with ray.cancel(), this issue is tracked in #52628
{'13ef62f9-f8d1-4337-9385-c3464133a2d6', '3878651a-8a87-4443-83d7-e19682df12f2', 'd26e19ef-9c97-42b8-9398-b2ac8797ea25', '7dbac3e8-d640-4002-b2ab-804580dd6064', 'c7997538-5b47-4106-8aea-4dcdcff989ac', '2cc9a963-94f1-4d3d-8d5d-fa743952dd24', 'ca301d23-7b6e-4a63-9f31-a24dffdd6f7f', '58089dc2-d936-4928-b320-9fd8f1a06272', '7d3004c7-dac8-4ca9-a666-56569f4dea36', 'e5d5f10a-4370-4a32-97f2-fb3247b64868', '0f6eadb0-37b3-465c-aac1-35301dd060fe', '7b56ec4e-11d3-4297-8267-18b257859a9c', '69f52266-9290-4b47-984b-ad05f5ba4b01', '0e9caba0-9dcd-47ee-927b-1b03741e343b', '735b2a8b-4b76-42f5-9cec-f3443ff77aee', 'bef30c8d-45aa-411e-b2f3-30d999b318b6'}

✅ Following 28 requests would have completed with 200 if not for the fix in this current PR(#52591)
{'bffb22ff-3092-4906-9e3a-d555d072479e', 'e107962e-e840-43ee-9062-1bcf6f8cfe83', '9374490b-24c5-46d5-ad96-c37303d3e6ec', '3c72d933-3b39-415c-a2d9-7788e6fa84f3', 'f5bcf20f-446f-4723-b751-364aea50608c', '0a37528e-f500-4c64-90e1-f3296d52e836', 'd96fcd81-6eac-42f6-ab94-52713f34cc10', '7e9af900-d2a0-4a97-9be1-7498be664284', '503dc860-d0d2-4c20-ae8b-68ba91556c6e', 'eb91fe94-d6f4-4d69-b882-a25022a5017c', '48b41b02-9e37-406c-86ba-ee5a74d6bd7b', '439bbed8-7cfc-4558-a1f7-8982ccec2c23', 'be6a9b26-56ab-4324-8bdf-dab63b8f0574', '77080edf-fa26-40b2-aed5-2bbd09a2b4bb', 'eef2e360-1744-4c0b-bc46-6244946a53b3', '6a8ec4d6-790c-4445-84e9-d2059cdb7307', '9ee60267-edb1-4e5b-9796-95084e154152', 'd9cac3fd-a49c-4786-9a99-b91705f73b23', 'b5215d2f-9ed1-4817-a942-254d2c2b56c0', '13e6d993-bbda-44d5-a0a6-3a25670c4373', '2ba7b1e6-a91f-453a-b910-2b0d8ea6deb2', '61410d33-4fd0-4ae7-81af-bc17905d3275', '46131d2e-de76-45e8-bb90-141b190d815f', '248289b4-654a-43cb-9705-c45ea6681e3d', 'aad6053c-4262-4bd6-b313-a04796a6734c', '444b82fd-7b7c-4351-b2ff-f1eb1ecd488c', '7df24f6e-38f5-4e36-b3e9-e0171dee159e', '627156ff-20b3-4ec4-a34d-7616d83da00f'}
```

Including unit tests.

This PR does not fix the other issue mentioned in #52533 where `ray.cancel` does not actually cancel the replica actor task. That will be a follow-up investigation + fix.

---------

Signed-off-by: abrar <abrar@anyscale.com>
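For readers unfamiliar with the router change described above, here is a minimal, hypothetical sketch of the general pattern (not the actual Serve router code; `fetch_result` and all other names are made up): instead of only holding the `concurrent.futures.Future` returned by `asyncio.run_coroutine_threadsafe`, keep a handle to the underlying asyncio task so it can be cancelled explicitly when the concurrent `Future` is cancelled. The point is that cancelling the concurrent `Future` alone does not stop a coroutine that is already scheduled on the loop.

```python
import asyncio
import concurrent.futures


# Hypothetical stand-in for the replica call; not the actual Serve internals.
async def fetch_result() -> str:
    await asyncio.sleep(5)
    return "result"


def schedule_with_cancellable_task(loop: asyncio.AbstractEventLoop) -> concurrent.futures.Future:
    """Schedule fetch_result() on a loop running in another thread, and wire up
    cancellation so that cancelling the returned concurrent Future also cancels
    the underlying asyncio task (even if nothing is awaiting it)."""
    concurrent_future: concurrent.futures.Future = concurrent.futures.Future()
    task_holder: dict = {}

    def _schedule() -> None:
        # Runs on the event loop thread.
        task = loop.create_task(fetch_result())
        task_holder["task"] = task

        def _on_done(t: asyncio.Task) -> None:
            # Propagate result / exception / cancellation to the concurrent Future.
            if concurrent_future.cancelled():
                return
            if t.cancelled():
                concurrent_future.cancel()
            elif t.exception() is not None:
                concurrent_future.set_exception(t.exception())
            else:
                concurrent_future.set_result(t.result())

        task.add_done_callback(_on_done)

    loop.call_soon_threadsafe(_schedule)

    def _on_concurrent_done(f: concurrent.futures.Future) -> None:
        # If the caller cancelled the concurrent Future, proactively cancel the
        # asyncio task so it does not keep running on the loop.
        if f.cancelled() and "task" in task_holder:
            loop.call_soon_threadsafe(task_holder["task"].cancel)

    concurrent_future.add_done_callback(_on_concurrent_done)
    return concurrent_future
```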
I tried adding logs at multiple places in the serve replica receiver code, but I am not seeing cancellation show up anywhere.

One other observation I had was that the cancellation request (`ray.cancel`) happens before any code in the task has run; not sure what the implication of that is, but I thought I would mention it.

Here is a small repro script that maybe isolates the problem to the core:

```python
import asyncio
import concurrent.futures

import ray


@ray.remote(num_cpus=.1)
class Worker:
    async def long_running_task(self, task_id):
        await asyncio.sleep(15)
        return task_id


async def main():
    worker = Worker.remote()
    NUM_TASKS = 5000

    # Launch more tasks than available CPUs to cause queuing
    # Assume num_cpus = 2, launch 5 tasks
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_TASKS) as executor:
        futures = [executor.submit(worker.long_running_task.remote, i) for i in range(NUM_TASKS)]
        refs = [future.result() for future in concurrent.futures.as_completed(futures)]

    # Immediately cancel one of the later tasks before it has a chance to run
    tasks_to_cancel = []
    for i in range(NUM_TASKS - 1, -1, -1):
        tasks_to_cancel.append(i)
        ray.cancel(refs[i])

    tasks_cancelled = []
    tasks_completed = []

    # Gather results, catching any cancellation
    for task_to_cancel in range(NUM_TASKS - 1, -1, -1):
        try:
            result = await refs[task_to_cancel]
            tasks_completed.append(task_to_cancel)
            # print(f"Result: {result}")
        except ray.exceptions.TaskCancelledError:
            tasks_cancelled.append(task_to_cancel)

    print(f"Total tasks enqueued: {NUM_TASKS}")
    print(f"Total tasks completed: {len(tasks_completed)}")
    print(f"Total tasks cancelled: {len(tasks_cancelled)}")
    assert set(tasks_cancelled) == set(tasks_to_cancel), f"Tasks that were not cancelled: {set(tasks_to_cancel) - set(tasks_cancelled)}"


asyncio.run(main())
```

I had to run it a few times and increase `NUM_TASKS` to reproduce it. I suspect this is similar to what's happening in serve, but not 100% sure.
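One way to narrow down whether the cancellation signal ever reaches the receiving side is to have the actor method observe it directly. A sketch using the same toy `Worker` actor as above (not Serve code): for an async actor task, `ray.cancel` should surface as `asyncio.CancelledError` inside the coroutine if the signal arrives.

```python
import asyncio

import ray


@ray.remote(num_cpus=0.1)
class Worker:
    async def long_running_task(self, task_id):
        try:
            await asyncio.sleep(15)
            # If we get here, the cancellation signal never reached the task.
            print(f"task {task_id} ran to completion despite any cancel")
            return task_id
        except asyncio.CancelledError:
            # ray.cancel() on an async actor task should surface here.
            print(f"task {task_id} received CancelledError")
            raise
```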
…ay-project#52591) - **Deployment handle**: Cancel `replica_results_future` upfront to prevent race conditions when checking for cancellation before calling `cancel()`. - **Router**: Deconstruct `run_coroutine_threadsafe` to access the underlying async task, enabling proactive cancellation when the concurrent `Future` is cancelled but the async task continues to run (e.g., due to lack of `await`). In the callback, explicitly fetch and cancel the `replica_result` from the async task. More context in ray-project#52533 ## How do i know this works ran the following repro script ```python import re import time import asyncio import requests from collections import defaultdict from concurrent.futures import ThreadPoolExecutor from ray import serve import shutil import glob import os import ray ray.init(logging_level="debug", dashboard_host="0.0.0.0") LOG_DIR = "/tmp/ray/session_latest/logs" GET_LINE_RE = re.compile(r"\s([a-f0-9\-]{36}) -- GET / 200") CANCEL_REQUEST_LINE_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Cancelling request that has already been assigned to a replica") ASYNCIO_COMPLETED_DESPITE_CANCEL_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Asyncio task completed despite cancellation attempt") NUM_PARALLEL_REQUESTS = 250 NUM_RUNS = 100 # <--- Change this to control how many times to run my_func() def my_func(): @serve.deployment(max_ongoing_requests=10) class Parent: async def __call__(self) -> int: return await asyncio.sleep(5) app = Parent.bind() serve.run(app) time.sleep(1.0) def get_and_cancel(x: int) -> None: try: _ = requests.get("http://localhost:8000/", timeout=2) except Exception: pass with ThreadPoolExecutor(max_workers=NUM_PARALLEL_REQUESTS) as exc: list(exc.map(get_and_cancel, range(NUM_PARALLEL_REQUESTS))) # Let Serve finish processing and flush logs time.sleep(10) def extract_request_ids(pattern): request_ids = set() for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True): if os.path.isfile(path): with open(path, 'r', errors='ignore') as f: for line in f: match = pattern.search(line) if match: request_ids.add(match.group(1)) return request_ids def main(): print("Cleaning logs directory...") shutil.rmtree(LOG_DIR, ignore_errors=True) print(f"Running my_func() {NUM_RUNS} times...") for i in range(NUM_RUNS): print(f"Run {i+1}/{NUM_RUNS}") my_func() print("Finished running my_func(). 
Analyzing logs...") request_ids_completed_200 = extract_request_ids(GET_LINE_RE) request_ids_cancelled = extract_request_ids(CANCEL_REQUEST_LINE_RE) request_ids_completed_despite_cancel = extract_request_ids(ASYNCIO_COMPLETED_DESPITE_CANCEL_RE) print("\n\nSummary:\n") print(f"\n⚠️ Total requests completed with 200(goal is to have 0): {len(request_ids_completed_200)}") print(f"\n⚠️ Out of the {len(request_ids_completed_200)} requests that completed with 200, following {len(request_ids_cancelled & request_ids_completed_200)} were cancelled with ray.cancel(), this issue is tracked in ray-project#52628") print(request_ids_cancelled & request_ids_completed_200) missing_cancellation_log = request_ids_completed_200 - request_ids_cancelled if missing_cancellation_log: print(f"\n ❌ Following {len(missing_cancellation_log)} request IDs completed with 200 for unknown reasons") print(missing_cancellation_log) print(f"\n✅ Following {len(request_ids_completed_despite_cancel)} requests would have completed with 200 if not for the fix in this current PR(ray-project#52591)") print(request_ids_completed_despite_cancel) if __name__ == "__main__": main() ``` ```bash Summary:⚠️ Total requests completed with 200(goal is to have 0): 16⚠️ Out of the 16 requests that completed with 200, following 16 were cancelled with ray.cancel(), this issue is tracked in ray-project#52628 {'13ef62f9-f8d1-4337-9385-c3464133a2d6', '3878651a-8a87-4443-83d7-e19682df12f2', 'd26e19ef-9c97-42b8-9398-b2ac8797ea25', '7dbac3e8-d640-4002-b2ab-804580dd6064', 'c7997538-5b47-4106-8aea-4dcdcff989ac', '2cc9a963-94f1-4d3d-8d5d-fa743952dd24', 'ca301d23-7b6e-4a63-9f31-a24dffdd6f7f', '58089dc2-d936-4928-b320-9fd8f1a06272', '7d3004c7-dac8-4ca9-a666-56569f4dea36', 'e5d5f10a-4370-4a32-97f2-fb3247b64868', '0f6eadb0-37b3-465c-aac1-35301dd060fe', '7b56ec4e-11d3-4297-8267-18b257859a9c', '69f52266-9290-4b47-984b-ad05f5ba4b01', '0e9caba0-9dcd-47ee-927b-1b03741e343b', '735b2a8b-4b76-42f5-9cec-f3443ff77aee', 'bef30c8d-45aa-411e-b2f3-30d999b318b6'} ✅ Following 28 requests would have completed with 200 if not for the fix in this current PR(ray-project#52591) {'bffb22ff-3092-4906-9e3a-d555d072479e', 'e107962e-e840-43ee-9062-1bcf6f8cfe83', '9374490b-24c5-46d5-ad96-c37303d3e6ec', '3c72d933-3b39-415c-a2d9-7788e6fa84f3', 'f5bcf20f-446f-4723-b751-364aea50608c', '0a37528e-f500-4c64-90e1-f3296d52e836', 'd96fcd81-6eac-42f6-ab94-52713f34cc10', '7e9af900-d2a0-4a97-9be1-7498be664284', '503dc860-d0d2-4c20-ae8b-68ba91556c6e', 'eb91fe94-d6f4-4d69-b882-a25022a5017c', '48b41b02-9e37-406c-86ba-ee5a74d6bd7b', '439bbed8-7cfc-4558-a1f7-8982ccec2c23', 'be6a9b26-56ab-4324-8bdf-dab63b8f0574', '77080edf-fa26-40b2-aed5-2bbd09a2b4bb', 'eef2e360-1744-4c0b-bc46-6244946a53b3', '6a8ec4d6-790c-4445-84e9-d2059cdb7307', '9ee60267-edb1-4e5b-9796-95084e154152', 'd9cac3fd-a49c-4786-9a99-b91705f73b23', 'b5215d2f-9ed1-4817-a942-254d2c2b56c0', '13e6d993-bbda-44d5-a0a6-3a25670c4373', '2ba7b1e6-a91f-453a-b910-2b0d8ea6deb2', '61410d33-4fd0-4ae7-81af-bc17905d3275', '46131d2e-de76-45e8-bb90-141b190d815f', '248289b4-654a-43cb-9705-c45ea6681e3d', 'aad6053c-4262-4bd6-b313-a04796a6734c', '444b82fd-7b7c-4351-b2ff-f1eb1ecd488c', '7df24f6e-38f5-4e36-b3e9-e0171dee159e', '627156ff-20b3-4ec4-a34d-7616d83da00f'} ``` Including unit tests. This PR does not fix the other issue mentioned in ray-project#52533 where `ray.cancel` does not actually cancel the replica actor task. That will be a follow up investigation+fix --------- Signed-off-by: abrar <abrar@anyscale.com>
Looking at the core cancellation path for asyncio actor tasks, my guess is that the problem lies here: Line 2550 in afe51d8.
It could be that, in rare cases, the code at Line 4592 in afe51d8 is involved.
Need to study the concurrency model here a bit more, because naively it looks like there are clear race conditions related to the fine-grained lock that gates access to the futures.
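To make the suspected race concrete, here is a toy, pure-asyncio analogy (not the actual core worker code; all names here are invented): if the cancel path looks a future up in a registry, and the future is only registered once the task actually starts executing, then a cancel that lands in the window between dequeueing and registration silently becomes a no-op.

```python
import asyncio

# Toy stand-in for the core worker's map of queued futures (not real Ray code).
queued_futures: dict[str, asyncio.Task] = {}


async def run_task(task_id: str) -> None:
    # The future only becomes visible to the cancel path once the task starts.
    queued_futures[task_id] = asyncio.current_task()
    try:
        await asyncio.sleep(5)
        print(f"{task_id}: completed")
    except asyncio.CancelledError:
        print(f"{task_id}: cancelled")
        raise
    finally:
        queued_futures.pop(task_id, None)


def cancel_task(task_id: str) -> bool:
    # Mirrors the "get_queued_future -> cancel" shape: if the future is not
    # registered yet, the cancel is silently dropped.
    task = queued_futures.get(task_id)
    if task is None:
        return False
    task.cancel()
    return True


async def main() -> None:
    t = asyncio.ensure_future(run_task("t1"))
    # Cancel before the task has had a chance to register itself.
    print("cancel succeeded:", cancel_task("t1"))  # False: the race window
    await asyncio.sleep(0)  # let the task start and register
    print("cancel succeeded:", cancel_task("t1"))  # True
    try:
        await t
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```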
I don't see any mutex being held on the cancellation path in the core worker: `ray/src/ray/core_worker/core_worker.cc`, Line 4343 in afe51d8.
I confirmed the above: this happens reliably.
I implemented a naive fix that loops forever until the future exists:

```diff
- future = worker.core_worker.get_queued_future(task_id)
- if future is not None:
-     future.cancel()
+ while True:
+     future = worker.core_worker.get_queued_future(task_id)
+     if future is not None:
+         future.cancel()
+         break
+     else:
+         print("FUTURE DID NOT EXIST, WILL RETRY:", task_id)
+         time.sleep(0.1)
```

Obviously not the right fix, but it confirms the timing of the race condition.
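If one wanted to stay with this band-aid approach rather than the eventual fix, a slightly safer shape would bound the retries instead of looping forever. A sketch (using the same `worker.core_worker.get_queued_future` call as in the diff above; still not the right fix):

```python
import time


def cancel_with_bounded_retry(worker, task_id, max_attempts: int = 50, delay_s: float = 0.1) -> bool:
    """Try to cancel the queued future, retrying briefly to cover the window
    where the task has been dequeued but its future isn't registered yet.
    Returns True if a future was found and cancelled."""
    for _ in range(max_attempts):
        future = worker.core_worker.get_queued_future(task_id)
        if future is not None:
            future.cancel()
            return True
        time.sleep(delay_s)
    return False
```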
…ray-project#52703)

See linked issue for the original symptom and reproduction.

When a `CancelTask` RPC message is received, we need to handle 4 possible cases:

1. The `PushTask` RPC hasn't been received yet.
2. The `PushTask` RPC has been received but the task isn't executing yet.
3. The `PushTask` RPC has been received and the task is now executing.
4. The task finished executing and the `PushTask` RPC reply has been sent.

The code currently handles (1) and (4) by relying on client-side retries: we return `success=False` and expect the client to retry the cancellation (unless the task has already finished in case (4), which it knows).

However, there is a race condition between cases (2) and (3) where the task is no longer considered queued in the `OutOfOrderActorSchedulingQueue`, but it hasn't actually started executing yet and therefore there is no future to cancel. This can happen because:

- We [erase the task ID](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc#L240) from the pending map before actually executing the task. After this, `CancelTaskIfFound` will return false.
- We then post the work to start running the request [to the io_service_](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc#L245).
- We post the `RunRequest` callback that eventually runs the task [to the fiber thread](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc#L156).
- The logic to cancel the task runs on the [task_execution_service_](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/core_worker.cc#L4485).

This means there is no guarantee that the task has actually started to execute when we call [cancel_async_task_](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/core_worker.cc#L4462).

This PR fixes the problem by extending the reliance on client retries: we return a boolean from `cancel_async_task_` that indicates if the task was cancelled. If not, it's up to the client to retry.

The proper long-term fix would be to serialize the executions and cancellations inside of the scheduling queue / task executor, but that will require a lot of refactoring work. We need to simplify the concurrency model in these classes.

Closes ray-project#52628

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: zhaoch23 <c233zhao@uwaterloo.ca>
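For illustration, here is roughly what "rely on client retries" looks like from the caller's side, as a simplified Python sketch (the real logic lives in the C++ core worker and its Python client; `try_cancel_once` and `is_finished` are hypothetical stand-ins for the cancel RPC and the task-completion check):

```python
import time


def cancel_with_client_retries(task_ref, try_cancel_once, is_finished,
                               retry_delay_s: float = 0.05) -> None:
    """Keep re-sending the cancel request until either the cancellation is
    acknowledged as applied, or the task has already finished (case 4)."""
    while True:
        if is_finished(task_ref):
            return  # Too late to cancel; the reply has already been sent.
        if try_cancel_once(task_ref):
            return  # The executor found the task (queued or running) and cancelled it.
        # Case (1) and the (2)->(3) race window: nothing to cancel yet; retry.
        time.sleep(retry_delay_s)
```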
What happened + What you expected to happen
This issue is created specifically for Ray Core, but the broader context is provided in #52533 (comment).
TL;DR: in Ray Serve, when a batch of requests is made and immediately cancelled, a handful of requests run to completion because the underlying Ray actor tasks that run those requests don't actually get cancelled, even though Serve internally calls `ray.cancel`.
The point in the Serve code responsible for calling cancel on the Ray actor task is `ray/python/ray/serve/_private/replica_scheduler/replica_wrapper.py`, Line 108 in 7faa62a.
Versions / Dependencies
2.44.1
Reproduction script
Build Ray from source (maybe necessary).
Run the following repro script.
This script triggers a burst of concurrent HTTP requests to a Ray Serve deployment, waits for the requests to be processed or cancelled, then scans the Ray Serve logs to extract all successfully served request IDs (GET / 200) and their corresponding Ray task IDs from cancellation events. It prints a table showing which requests were assigned a task that was supposed to be cancelled but still ran to completion.
This will output something like:
Expect it to take about 10 minutes.
Use the task ID to search the core-worker logs to confirm that cancel is in fact being called.
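For that last step, a small helper in the spirit of `extract_request_ids` from the script above can be used to find log lines mentioning a given task ID (assumptions: the default `/tmp/ray/session_latest/logs` location and a `*core-worker*` file-name pattern; adjust for your setup):

```python
import glob
import os

LOG_DIR = "/tmp/ray/session_latest/logs"


def find_task_id_in_core_worker_logs(task_id: str):
    """Return (path, line) pairs from core-worker logs that mention the task ID."""
    hits = []
    for path in glob.glob(f"{LOG_DIR}/*core-worker*"):
        if not os.path.isfile(path):
            continue
        with open(path, "r", errors="ignore") as f:
            for line in f:
                if task_id in line:
                    hits.append((path, line.rstrip()))
    return hits
```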
Issue Severity
Low: It annoys or frustrates me.