
[core] [serve] ray.cancel on async actor task does not cancel the ray task #52628

Closed
@abrarsheikh

Description


What happened + What you expected to happen

This issue is filed specifically against Ray Core, but the broader context is provided in #52533 (comment).

TL;DR: in Ray Serve, when a batch of requests is made and immediately cancelled, a handful of requests run to completion because the underlying Ray actor tasks serving those requests are not actually cancelled, even though Serve internally calls ray.cancel.

The point in the Serve code responsible for calling cancel on the Ray actor task is this
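At the Ray Core level, the sketch below is a minimal, standalone illustration (not the actual Serve code path) of the pattern in question: submit an async actor task, call ray.cancel on its ObjectRef, and check whether the task was actually cancelled. If the bug reproduces, ray.get returns the completed result instead of raising TaskCancelledError.

import asyncio
import ray

ray.init()

@ray.remote
class Worker:
    async def work(self) -> str:
        # Long-running async actor task, analogous to the Serve request handler.
        await asyncio.sleep(5)
        return "completed"

worker = Worker.remote()
ref = worker.work.remote()

# Serve internally calls ray.cancel on the ObjectRef of the actor task.
ray.cancel(ref)

try:
    # Expected: ray.exceptions.TaskCancelledError.
    # Observed for a handful of requests: the task runs to completion.
    print(ray.get(ref))
except ray.exceptions.TaskCancelledError:
    print("task was cancelled as expected")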

Versions / Dependencies

2.44.1

Reproduction script

  1. Check out the debug branch that contains the necessary logging to observe this issue:

git fetch --all
git checkout CI-965-abrar-cancel_3_debug

  2. Build Ray from source (may be necessary).

  3. Run the following repro script.

This script triggers a burst of concurrent HTTP requests to a Ray Serve deployment, waits for the requests to be processed or cancelled, then scans the Ray Serve logs to extract all successfully served request IDs (GET / 200) and their corresponding Ray task IDs from cancellation events. It prints a table showing which requests were assigned a task that was supposed to be cancelled but still ran to completion.

import re
import time
import asyncio
import requests
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from ray import serve
import shutil
import glob
import os
import ray

ray.init(logging_level="debug", dashboard_host="0.0.0.0")

LOG_DIR = "/tmp/ray/session_latest/logs"

# Matches: ... 30cdafed-5816-4a34-8994-424b4da6c9f2 -- GET / 200 ...
GET_LINE_RE = re.compile(r"\s([a-f0-9\-]{36}) -- GET / 200")

# Matches: ... UUID -- ... Ray task ID: TaskID(390a4a4a...)
CANCEL_LINE_RE = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d+ .*? ([a-f0-9\-]{36}) -- .*?TaskID\(([a-f0-9]+)\)")

NUM_PARALLEL_REQUESTS = 250

def my_func():
    @serve.deployment(max_ongoing_requests=10)
    class Parent:
        async def __call__(self) -> None:
            # Sleep longer than the 2s client timeout below so the request is
            # cancelled while the actor task is still running.
            return await asyncio.sleep(5)

    app = Parent.bind()
    serve.run(app)
    time.sleep(1.0)

    def get_and_cancel(x: int) -> None:
        # The 2s client timeout fires before the 5s handler finishes, so the
        # request is aborted and Serve cancels the underlying actor task.
        try:
            _ = requests.get("http://localhost:8000/", timeout=2)
        except Exception:
            pass

    with ThreadPoolExecutor(max_workers=NUM_PARALLEL_REQUESTS) as exc:
        list(exc.map(get_and_cancel, range(NUM_PARALLEL_REQUESTS)))

    # Let Serve finish processing and flush logs
    time.sleep(10)

def logs_have_request_ids():
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    if GET_LINE_RE.search(line):
                        return True
    return False

def extract_request_ids():
    request_ids = set()
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    match = GET_LINE_RE.search(line)
                    if match:
                        request_ids.add(match.group(1))
    return request_ids

def find_task_ids(request_ids):
    mapping = defaultdict(list)
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if os.path.isfile(path):
            with open(path, 'r', errors='ignore') as f:
                for line in f:
                    match = CANCEL_LINE_RE.search(line)
                    if match:
                        req_id, task_id = match.groups()
                        if req_id in request_ids:
                            mapping[req_id].append(task_id)
    return mapping

def main():
    print("Cleaning logs directory...")
    shutil.rmtree(LOG_DIR, ignore_errors=True)

    print("Waiting for logs with 'GET / 200'...")
    attempts = 0

    while not logs_have_request_ids():
        print(f"Waiting for logs with 'GET / 200'... (attempt {attempts})")
        my_func()
        time.sleep(1)
        attempts += 1

    print("Found logs, extracting request and task IDs...")
    request_ids = extract_request_ids()
    task_id_mapping = find_task_ids(request_ids)

    print("\n\nFollowing are the request IDs and their corresponding Ray task IDs that were supposed to be cancelled but ran to completion\n")
    print(f"{'Request ID':<40} | Ray Task ID(s)")
    print("-" * 80)
    for req_id, task_ids in task_id_mapping.items():
        print(f"{req_id:<40} | {', '.join(task_ids)}")

if __name__ == "__main__":
    main()

This will output something like:

Following are the request IDs and their corresponding Ray task IDs that were supposed to be cancelled but ran to completion

Request ID                               | Ray Task ID(s)
--------------------------------------------------------------------------------
4426d915-ed42-45e7-b373-ca8899d19a1b     | 3c239ef5941e72a09642004102f5eaaf3928e1e701000000

Expect the full repro to take about 10 minutes.

Use the task ID to search the core-worker logs to confirm that cancel is in fact being called:

[2025-04-26 06:27:16,570 I 426616 426637] core_worker.cc:4377: Cancel an actor task task_id=3c239ef5941e72a09642004102f5eaaf3928e1e701000000 actor_id=9642004102f5eaaf3928e1e701000000
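As a convenience, a small hypothetical helper (the function name is an assumption, and it is not part of the repro script above) can scan the session logs for that core_worker.cc message instead of grepping manually:

import glob
import os

LOG_DIR = "/tmp/ray/session_latest/logs"

def cancel_was_logged(task_id: str) -> bool:
    # Look through every log file in the session for the core worker's
    # "Cancel an actor task" line that mentions the given task ID.
    for path in glob.glob(f"{LOG_DIR}/**/*", recursive=True):
        if not os.path.isfile(path):
            continue
        with open(path, "r", errors="ignore") as f:
            for line in f:
                if "Cancel an actor task" in line and task_id in line:
                    return True
    return False

print(cancel_was_logged("3c239ef5941e72a09642004102f5eaaf3928e1e701000000"))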

Issue Severity

Low: It annoys or frustrates me.


    Labels

    P0 (issues that should be fixed in short order), bug (something that is supposed to be working, but isn't), community-backlog, core (issues that should be addressed in Ray Core), stability
