
[core] [serve] ray.cancel on async actor task not cancelling the Ray task #52628


Closed
abrarsheikh opened this issue Apr 26, 2025 · 6 comments · Fixed by #52703
Labels
bug (Something that is supposed to be working; but isn't) · core (Issues that should be addressed in Ray Core) · P0 (Issues that should be fixed in short order) · stability

Comments

@abrarsheikh (Contributor)

What happened + What you expected to happen

This issue is filed specifically against Ray Core, but the broader context is provided in #52533 (comment).

TL;DR: in Ray Serve, when a batch of requests is made and immediately cancelled, a handful of requests run to completion because the underlying Ray actor tasks serving those requests are never actually cancelled, even though Serve internally calls ray.cancel.

The point in the Serve code that is responsible for calling cancel on the Ray actor task is this.
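
For reference, cancelling an async actor task from the caller side looks roughly like this (a minimal, self-contained sketch of the expected behavior, not the actual Serve router code):

```python
import asyncio
import ray

ray.init()

@ray.remote
class Replica:
    async def handle_request(self) -> str:
        # Stand-in for request handling inside a Serve replica.
        await asyncio.sleep(5)
        return "done"

replica = Replica.remote()
ref = replica.handle_request.remote()

# Serve calls ray.cancel() on the underlying ObjectRef when the client
# disconnects; the expectation is that the awaiting caller then observes
# TaskCancelledError instead of a normal result.
ray.cancel(ref)

try:
    print(ray.get(ref))
except ray.exceptions.TaskCancelledError:
    print("task was cancelled as expected")
```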

Versions / Dependencies

2.44.1

Reproduction script

  1. Check out the debug branch that contains the necessary logging to observe this issue:

git fetch --all
git checkout CI-965-abrar-cancel_3_debug

  2. Build Ray from source (may be necessary).

  3. Run the following repro script.

This script triggers a burst of concurrent HTTP requests to a Ray Serve deployment, waits for the requests to be processed or cancelled, then scans the Ray Serve logs to extract all successfully served request IDs (GET / 200) and their corresponding Ray task IDs from cancellation events. It prints a table showing which requests were assigned a task that was supposed to be cancelled but still ran to completion.

```python
import re
import time
import asyncio
import requests
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from ray import serve
import shutil
import glob
import os
import ray

ray.init(logging_level="debug", dashboard_host="0.0.0.0")

LOG_DIR = "/tmp/ray/session_latest/logs"

# Matches: ... 30cdafed-5816-4a34-8994-424b4da6c9f2 -- GET / 200 ...
GET_LINE_RE = re.compile(r"\s([a-f0-9\-]{36}) -- GET / 200")

# Matches: ... UUID -- ... Ray task ID: TaskID(390a4a4a...)
CANCEL_LINE_RE = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+ .*? ([a-f0-9\-]{36}) -- .*?TaskID\(([a-f0-9]+)\)")

NUM_PARALLEL_REQUESTS = 250

def my_func():
    @serve.deployment(max_ongoing_requests=10)
    class Parent:
        async def __call__(self) -> int:
            return await asyncio.sleep(5)

    app = Parent.bind()
    serve.run(app)
    time.sleep(1.0)

    def get_and_cancel(x: int) -> None:
        try:
            _ = requests.get("http://localhost:8000/", timeout=2)
        except Exception:
            pass

    with ThreadPoolExecutor(max_workers=NUM_PARALLEL_REQUESTS) as exc:
        list(exc.map(get_and_cancel, range(NUM_PARALLEL_REQUESTS)))

    # Let Serve finish processing and flush logs
    time.sleep(10)

def logs_have_request_ids():
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    if GET_LINE_RE.search(line):
                        return True
    return False

def extract_request_ids():
    request_ids = set()
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    match = GET_LINE_RE.search(line)
                    if match:
                        request_ids.add(match.group(1))
    return request_ids

def find_task_ids(request_ids):
    mapping = defaultdict(list)
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    match = CANCEL_LINE_RE.search(line)
                    if match:
                        req_id, task_id = match.groups()
                        if req_id in request_ids:
                            mapping[req_id].append(task_id)
    return mapping

def main():
    print("Cleaning logs directory...")
    shutil.rmtree(LOG_DIR, ignore_errors=True)

    print("Waiting for logs with 'GET / 200'...")
    attempts = 0

    while not logs_have_request_ids():
        print(f"Waiting for logs with 'GET / 200'... (attempt {attempts})")
        my_func()
        time.sleep(1)
        attempts += 1

    print("Found logs, extracting request and task IDs...")
    request_ids = extract_request_ids()
    task_id_mapping = find_task_ids(request_ids)

    print("\n\nFollowing are the request IDs and their corresponding Ray task IDs that were supposed to be cancelled but ran to completion\n")
    print(f"{'Request ID':<40} | Ray Task ID(s)")
    print("-" * 80)
    for req_id, task_ids in task_id_mapping.items():
        print(f"{req_id:<40} | {', '.join(task_ids)}")

if __name__ == "__main__":
    main()
```

This will output something like:

```
Following are the request IDs and their corresponding Ray task IDs that were supposed to be cancelled but ran to completion

Request ID                               | Ray Task ID(s)
--------------------------------------------------------------------------------
4426d915-ed42-45e7-b373-ca8899d19a1b     | 3c239ef5941e72a09642004102f5eaaf3928e1e701000000
```

Expect it to take about 10 minutes.

Use the task ID to search the core worker logs to confirm that cancel is in fact being called:

```
[2025-04-26 06:27:16,570 I 426616 426637] core_worker.cc:4377: Cancel an actor task task_id=3c239ef5941e72a09642004102f5eaaf3928e1e701000000 actor_id=9642004102f5eaaf3928e1e701000000
```
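
For example, a quick scan like the following (assuming the default /tmp/ray/session_latest/logs location used above) will locate the cancellation line for a given task ID:

```python
import glob
import os

# Task ID reported by the repro script output above.
TASK_ID = "3c239ef5941e72a09642004102f5eaaf3928e1e701000000"
LOG_DIR = "/tmp/ray/session_latest/logs"

# Walk every log file and print the lines where the core worker logged
# the cancellation of this actor task.
for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
    if os.path.isfile(path):
        with open(path, "r", errors="ignore") as f:
            for line in f:
                if TASK_ID in line and "Cancel an actor task" in line:
                    print(f"{path}: {line.strip()}")
```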

Issue Severity

Low: It annoys or frustrates me.

abrarsheikh added the bug and triage labels on Apr 26, 2025
masoudcharkhabi added the core and stability labels on Apr 28, 2025
edoakes (Collaborator) commented Apr 28, 2025

@abrarsheikh have you confirmed that on the receiving side, the actor task is not being canceled?

From the writeup it seems that the level of diagnosis is: we call ray.cancel, but the task continues to run to completion.

This could be an issue in the core asyncio code that doesn't properly send the cancellation signal, or it could be something in the serve replica code that doesn't properly handle it. Have you narrowed it down further?

edoakes pushed a commit that referenced this issue Apr 29, 2025
…52591)

- **Deployment handle**: Cancel `replica_results_future` upfront to
prevent race conditions when checking for cancellation before calling
`cancel()`.

- **Router**: Deconstruct `run_coroutine_threadsafe` to access the
underlying async task, enabling proactive cancellation when the
concurrent `Future` is cancelled but the async task continues to run
(e.g., due to lack of `await`). In the callback, explicitly fetch and
cancel the `replica_result` from the async task.


More context in #52533
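
Roughly, the router-side change follows a pattern like the one below (a simplified sketch with made-up helper names, not the actual Serve router code): instead of relying on `asyncio.run_coroutine_threadsafe`, schedule the coroutine manually so a handle to the underlying asyncio task is kept and can be cancelled explicitly.

```python
import asyncio
import concurrent.futures

def schedule_with_cancel_handle(coro, loop: asyncio.AbstractEventLoop):
    """Schedule `coro` on `loop` from another thread and return
    (concurrent_future, cancel_fn), keeping a handle to the underlying
    asyncio task so it can be cancelled proactively."""
    done: concurrent.futures.Future = concurrent.futures.Future()
    task_holder = {}

    def _on_done(task: asyncio.Task):
        # Propagate the asyncio task's outcome to the concurrent future.
        if task.cancelled():
            done.cancel()
        elif task.exception() is not None:
            done.set_exception(task.exception())
        else:
            done.set_result(task.result())

    def _create_task():
        task = loop.create_task(coro)
        task_holder["task"] = task
        task.add_done_callback(_on_done)

    loop.call_soon_threadsafe(_create_task)

    def cancel():
        # Cancel the asyncio task itself, not just the concurrent future.
        def _cancel():
            task = task_holder.get("task")
            if task is not None:
                task.cancel()
        loop.call_soon_threadsafe(_cancel)

    return done, cancel
```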


## How do I know this works

Ran the following repro script:
```python
import re
import time
import asyncio
import requests
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from ray import serve
import shutil
import glob
import os
import ray

ray.init(logging_level="debug", dashboard_host="0.0.0.0")

LOG_DIR = "/tmp/ray/session_latest/logs"

GET_LINE_RE = re.compile(r"\s([a-f0-9\-]{36}) -- GET / 200")
CANCEL_REQUEST_LINE_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Cancelling request that has already been assigned to a replica")
ASYNCIO_COMPLETED_DESPITE_CANCEL_RE = re.compile(r".*?([a-f0-9\-]{36}).*?Asyncio task completed despite cancellation attempt")

NUM_PARALLEL_REQUESTS = 250
NUM_RUNS = 100  # <--- Change this to control how many times to run my_func()

def my_func():
    @serve.deployment(max_ongoing_requests=10)
    class Parent:
        async def __call__(self) -> int:
            return await asyncio.sleep(5)

    app = Parent.bind()
    serve.run(app)
    time.sleep(1.0)

    def get_and_cancel(x: int) -> None:
        try:
            _ = requests.get("http://localhost:8000/", timeout=2)
        except Exception:
            pass

    with ThreadPoolExecutor(max_workers=NUM_PARALLEL_REQUESTS) as exc:
        list(exc.map(get_and_cancel, range(NUM_PARALLEL_REQUESTS)))

    # Let Serve finish processing and flush logs
    time.sleep(10)

def extract_request_ids(pattern):
    request_ids = set()
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    match = pattern.search(line)
                    if match:
                        request_ids.add(match.group(1))
    return request_ids

def main():
    print("Cleaning logs directory...")
    shutil.rmtree(LOG_DIR, ignore_errors=True)

    print(f"Running my_func() {NUM_RUNS} times...")
    for i in range(NUM_RUNS):
        print(f"Run {i+1}/{NUM_RUNS}")
        my_func()

    print("Finished running my_func(). Analyzing logs...")

    request_ids_completed_200 = extract_request_ids(GET_LINE_RE)
    request_ids_cancelled = extract_request_ids(CANCEL_REQUEST_LINE_RE)
    request_ids_completed_despite_cancel = extract_request_ids(ASYNCIO_COMPLETED_DESPITE_CANCEL_RE)

    print("\n\nSummary:\n")

    print(f"\n⚠️ Total requests completed with 200(goal is to have 0): {len(request_ids_completed_200)}")
    print(f"\n⚠️ Out of the {len(request_ids_completed_200)} requests that completed with 200, following {len(request_ids_cancelled & request_ids_completed_200)} were cancelled with ray.cancel(), this issue is tracked in #52628")
    print(request_ids_cancelled & request_ids_completed_200)
    missing_cancellation_log = request_ids_completed_200 - request_ids_cancelled
    if missing_cancellation_log:
        print(f"\n ❌ Following {len(missing_cancellation_log)} request IDs completed with 200 for unknown reasons")
        print(missing_cancellation_log)

    print(f"\n✅ Following {len(request_ids_completed_despite_cancel)} requests would have completed with 200 if not for the fix in this current PR(#52591)")
    print(request_ids_completed_despite_cancel)


if __name__ == "__main__":
    main()

```

```bash
Summary:


⚠️ Total requests completed with 200(goal is to have 0): 16

⚠️ Out of the 16 requests that completed with 200, following 16 were cancelled with ray.cancel(), this issue is tracked in #52628
{'13ef62f9-f8d1-4337-9385-c3464133a2d6', '3878651a-8a87-4443-83d7-e19682df12f2', 'd26e19ef-9c97-42b8-9398-b2ac8797ea25', '7dbac3e8-d640-4002-b2ab-804580dd6064', 'c7997538-5b47-4106-8aea-4dcdcff989ac', '2cc9a963-94f1-4d3d-8d5d-fa743952dd24', 'ca301d23-7b6e-4a63-9f31-a24dffdd6f7f', '58089dc2-d936-4928-b320-9fd8f1a06272', '7d3004c7-dac8-4ca9-a666-56569f4dea36', 'e5d5f10a-4370-4a32-97f2-fb3247b64868', '0f6eadb0-37b3-465c-aac1-35301dd060fe', '7b56ec4e-11d3-4297-8267-18b257859a9c', '69f52266-9290-4b47-984b-ad05f5ba4b01', '0e9caba0-9dcd-47ee-927b-1b03741e343b', '735b2a8b-4b76-42f5-9cec-f3443ff77aee', 'bef30c8d-45aa-411e-b2f3-30d999b318b6'}

✅ Following 28 requests would have completed with 200 if not for the fix in this current PR(#52591)
{'bffb22ff-3092-4906-9e3a-d555d072479e', 'e107962e-e840-43ee-9062-1bcf6f8cfe83', '9374490b-24c5-46d5-ad96-c37303d3e6ec', '3c72d933-3b39-415c-a2d9-7788e6fa84f3', 'f5bcf20f-446f-4723-b751-364aea50608c', '0a37528e-f500-4c64-90e1-f3296d52e836', 'd96fcd81-6eac-42f6-ab94-52713f34cc10', '7e9af900-d2a0-4a97-9be1-7498be664284', '503dc860-d0d2-4c20-ae8b-68ba91556c6e', 'eb91fe94-d6f4-4d69-b882-a25022a5017c', '48b41b02-9e37-406c-86ba-ee5a74d6bd7b', '439bbed8-7cfc-4558-a1f7-8982ccec2c23', 'be6a9b26-56ab-4324-8bdf-dab63b8f0574', '77080edf-fa26-40b2-aed5-2bbd09a2b4bb', 'eef2e360-1744-4c0b-bc46-6244946a53b3', '6a8ec4d6-790c-4445-84e9-d2059cdb7307', '9ee60267-edb1-4e5b-9796-95084e154152', 'd9cac3fd-a49c-4786-9a99-b91705f73b23', 'b5215d2f-9ed1-4817-a942-254d2c2b56c0', '13e6d993-bbda-44d5-a0a6-3a25670c4373', '2ba7b1e6-a91f-453a-b910-2b0d8ea6deb2', '61410d33-4fd0-4ae7-81af-bc17905d3275', '46131d2e-de76-45e8-bb90-141b190d815f', '248289b4-654a-43cb-9705-c45ea6681e3d', 'aad6053c-4262-4bd6-b313-a04796a6734c', '444b82fd-7b7c-4351-b2ff-f1eb1ecd488c', '7df24f6e-38f5-4e36-b3e9-e0171dee159e', '627156ff-20b3-4ec4-a34d-7616d83da00f'}
```

Including unit tests.

This PR does not fix the other issue mentioned in #52533 where
`ray.cancel` does not actually cancel the replica actor task. That will
be a follow up investigation+fix

---------

Signed-off-by: abrar <abrar@anyscale.com>
abrarsheikh (Contributor, Author) commented Apr 29, 2025

I tried adding logs at multiple places in the Serve replica receiver code, but I am not seeing the cancellation show up anywhere. One other observation: the cancellation request (ray.cancel) happens before any code in the task has run. I'm not sure what the implication of that is, but I thought I would mention it.

Here is a small repro script that may isolate the problem to the core:

```python
import asyncio
import concurrent.futures

import ray


@ray.remote(num_cpus=.1)
class Worker:
    async def long_running_task(self, task_id):
        await asyncio.sleep(15)
        return task_id

async def main():
    worker = Worker.remote()

    NUM_TASKS = 5000

    # Launch a large burst of tasks so that many of them are still queued
    # (not yet started) when we cancel them.
    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_TASKS) as executor:
        futures = [executor.submit(worker.long_running_task.remote, i) for i in range(NUM_TASKS)]
        refs = [future.result() for future in concurrent.futures.as_completed(futures)]

    # Immediately cancel every task, newest first, before it has a chance to run
    tasks_to_cancel = []
    for i in range(NUM_TASKS - 1, -1, -1):
        tasks_to_cancel.append(i)
        ray.cancel(refs[i])

    tasks_cancelled = []
    tasks_completed = []
    # Gather results, catching any cancellation
    for task_to_cancel in range(NUM_TASKS-1, -1, -1):
        try:
            result = await refs[task_to_cancel]
            tasks_completed.append(task_to_cancel)
            # print(f"Result: {result}")
        except ray.exceptions.TaskCancelledError:
            tasks_cancelled.append(task_to_cancel)

    print(f"Total tasks enqueued: {NUM_TASKS}")
    print(f"Total tasks completed: {len(tasks_completed)}")
    print(f"Total tasks cancelled: {len(tasks_cancelled)}")
    assert set(tasks_cancelled) == set(tasks_to_cancel), f"Task that were not cancelled: {set(tasks_to_cancel) - set(tasks_cancelled)}"

asyncio.run(main())
```

I had to run it a few times and increase NUM_TASKS to get it to fail. It doesn't matter how large a value we set for sleep(); some tasks run to completion despite immediate cancellation.

I suspect this is similar to what's happening in serve, but not 100% sure.

ktyxx pushed a commit to ktyxx/ray that referenced this issue Apr 29, 2025
…ay-project#52591)

edoakes (Collaborator) commented Apr 29, 2025

Looking at the core cancellation path for asyncio actor tasks, my guess is the problem lies here:

future = worker.core_worker.get_queued_future(task_id)

It could be that in rare cases, cancel_async_task is executed before the future is actually submitted and populated here:

with self._task_id_to_future_lock:

Need to study the concurrency model here a bit more, because naively it looks like there are clear race conditions related to the fine-grained lock that gates access to the futures.
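
To illustrate the suspected race with a toy model (hypothetical names only, not the actual core worker internals): if the cancellation path looks up the future before the submission path has registered it, the cancel silently becomes a no-op and the task later runs to completion.

```python
import threading

# Hypothetical stand-in for the core worker's task_id -> future map,
# guarded by a fine-grained lock.
_task_id_to_future = {}
_task_id_to_future_lock = threading.Lock()

def register_queued_future(task_id, future):
    # Runs on the submission/execution path.
    with _task_id_to_future_lock:
        _task_id_to_future[task_id] = future

def cancel_async_task(task_id):
    # Runs on the cancellation path. If it executes before
    # register_queued_future(), the lookup misses and nothing is cancelled,
    # even though both sides take the lock correctly.
    with _task_id_to_future_lock:
        future = _task_id_to_future.get(task_id)
    if future is not None:
        future.cancel()
        return True
    return False  # no future yet -> the task will still run to completion
```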

edoakes (Collaborator) commented Apr 29, 2025

I don't see any mutex being held on the cancellation path in the core worker:

void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request,

edoakes (Collaborator) commented Apr 29, 2025

I confirmed the above:

  • Bumped NUM_TASKS to 5000 in the script
  • Added print on this line in the cancellation callback if the future is not found
  • For each task that isn't cancelled appropriately, the print statement is hit

This happens reliably.

edoakes (Collaborator) commented Apr 29, 2025

I implemented a naive fix that loops until the future exists in cancel_async_task, and it fixes the issue:

```diff
-        future = worker.core_worker.get_queued_future(task_id)
-        if future is not None:
-            future.cancel()
+        while True:
+            future = worker.core_worker.get_queued_future(task_id)
+            if future is not None:
+                future.cancel()
+                break
+            else:
+                print("FUTURE DID NOT EXIST, WILL RETRY:", task_id)
+            time.sleep(0.1)
```

Obviously not the right fix, but it confirms the timing of the race condition: cancel_async_task happens before the future is submitted and added to the map.

edoakes added the P0 label and removed the triage label on Apr 29, 2025
iamjustinhsu pushed a commit that referenced this issue May 3, 2025
…52591)

vickytsang pushed a commit to ROCm/ray that referenced this issue May 5, 2025
…ay-project#52591)

edoakes closed this as completed in c2d4944 on May 7, 2025
zhaoch23 pushed a commit to Bye-legumes/ray that referenced this issue May 14, 2025
…ay-project#52591)

zhaoch23 pushed a commit to Bye-legumes/ray that referenced this issue May 14, 2025
…ray-project#52703)

See linked issue for the original symptom and reproduction.

When a `CancelTask` RPC message is received, we need to handle 4
possible cases:

1. The `PushTask` RPC hasn't been received yet.
2. The `PushTask` RPC has been received but the task isn't executing
yet.
3. The `PushTask` RPC has been received and the task is now executing.
4. The task finished executing and the `PushTask` RPC reply has been
sent.

The code currently handles (1) and (4) by relying on client-side
retries: we return `success=False` and expect the client to retry the
cancellation (unless the task has already finished in case (4), which it
knows).

However, there is a race condition between cases (2) and (3) where the
task is no longer considered queued in the
`OutOfOrderActorSchedulingQueue`, but it hasn't actually started
executing yet and therefore there is no future to cancel. This can
happen because:

- We [erase the task
ID](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc#L240)
from the pending map before actually executing the task. After this,
`CancelTaskIfFound` will return false.
- We then post the work to start running the request [to the
io_service_](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc#L245).
- We post the `RunRequest` callback that eventually runs the task [to
the fiber
thread](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/transport/out_of_order_actor_scheduling_queue.cc#L156).
- The logic to cancel the task runs on the
[task_execution_service_](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/core_worker.cc#L4485).

This means there is no guarantee that the task has actually started to
execute when we call
[cancel_async_task_](https://github.com/ray-project/ray/blob/master/src/ray/core_worker/core_worker.cc#L4462).

This PR fixes the problem by extending the reliance on client retries:
we return a boolean from `cancel_async_task_` that indicates if the task
was cancelled. If not, it's up to the client to retry.

The proper long-term fix would be to serialize the executions and
cancellations inside of the scheduling queue / task executor, but that
will require a lot of refactoring work. We need to simplify the
concurrency model in these classes.
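
A minimal sketch of the retry contract this relies on (hypothetical names; the real retry happens via the `CancelTask` RPC from the caller, not in Python):

```python
import time

def cancel_with_retry(cancel_async_task, task_id, max_attempts=20, backoff_s=0.1):
    # cancel_async_task returns True only if a registered future was found
    # and cancelled. False means the task wasn't registered yet (or already
    # finished), so the caller keeps retrying until the cancel lands or it
    # gives up.
    for _ in range(max_attempts):
        if cancel_async_task(task_id):
            return True
        time.sleep(backoff_s)
    return False
```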

Closes ray-project#52628

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Signed-off-by: zhaoch23 <c233zhao@uwaterloo.ca>