Commit ae15926

[core][1/N] Fix KillActor RPC for threaded actors (#51414)
When we send a request to `/api/actors/kill` on the Ray dashboard with `force_kill=false`, the actor process will not actually be killed. See the following "Reproduction" section for more details.

When `force_kill` is `false`, the function [CoreWorker::Exit](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1167) will post `drain_references_callback` to `task_execution_service_`. The callback includes several parts, such as:

* [Disconnect](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1078)
  * Based on my observation and experiments, no further `KillActor` RPCs are received after `local_raylet_client_->Disconnect` is called.
* [Shutdown](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1010)
  * [task_receiver_->Stop()](https://github.com/ray-project/ray/blob/6ed8cfd08927ec30dc4d3bf777cda50b57bab28f/src/ray/core_worker/core_worker.cc#L1028) will block until all running Ray tasks in the thread pools finish.

How this plays out depends on whether the actor is threaded:

* Non-threaded actor (the "Reproduction" section below, but without the concurrency group)
  * The first request (`force_kill=false`) doesn't kill the process because `drain_references_callback` never runs: both `drain_references_callback` and Ray task execution (i.e. `SleepActor.sleep` in this case) are posted to `task_execution_service_` and run on the main thread, so the callback is queued behind the actor task `sleep`, which is an infinite loop.
  * The second request (`force_kill=true`) kills the process precisely because `drain_references_callback` was never called. That is, `Disconnect` is not called, so the process can still receive the force-kill `KillActor` RPC.
* Threaded actor
  * The first request (`force_kill=false`) doesn't kill the process because `task_receiver_->Stop()` blocks execution, waiting for the actor task `sleep` to finish, which is an infinite loop.
  * The second request (`force_kill=true`) doesn't kill the process either, because the process cannot receive the force-kill `KillActor` RPC: `Disconnect` was already called while handling the first request.

### Solution

This PR focuses on ensuring that core worker processes can still receive the `KillActor` RPC even when a previous RPC is blocked in `task_receiver_->Stop()`. That is, the force-kill `KillActor` RPC can kill the process in the following "Reproduction" section.

The solution is to make sure that `CoreWorker::Exit` performs the potentially blocking wait before `Disconnect` is called, so the worker keeps receiving RPCs while it waits. Hence, this PR moves `task_receiver_->Stop()` to the very beginning of `drain_references_callback` instead of calling it after `Disconnect`.

Possible follow-ups:

* Add a check to determine whether the concurrency group manager has already been stopped, and explicitly throw an exception if `Stop` is called twice.
* Add a timeout for `CoreWorker::Exit` to avoid blocking forever on long-running Ray tasks.

### Reproduction

* `test.py`

```python
import ray
import time


@ray.remote(concurrency_groups={"io": 1})
class SleepActor:
    def sleep(self):
        print("Sleep")
        while True:
            time.sleep(10)

    def echo(self, val):
        return val


sa = SleepActor.remote()
ray.get(sa.sleep.remote())
```

* Reproduction steps

```sh
ray start --head --include-dashboard=True --num-cpus=1
python3 test.py

# Find the actor ID created by `test.py`
ray list actors

# Send a request to Ray dashboard `/api/actors/kill` with `force_kill=false`.
curl -X GET -w "%{http_code}" "http://localhost:8265/api/actors/kill?actor_id=${YOUR_ACTOR_ID}&force_kill=false"
# [Example output]
# {"result": true, "msg": "Requested actor with id ac24dd03dbfcb8ad02b5ceba01000000 to terminate. It will exit once running tasks complete", "data": {}}200%

# [Example output of test.py]
# ray.exceptions.ActorDiedError: The actor died unexpectedly before finishing this task.
#   class_name: SleepActor
#   actor_id: 545606bd4458db0326a55b9701000000
#   pid: 2334580
#   namespace: 7b9c4d27-b70b-46d6-ad93-26f0da3e89f4
#   ip: 172.31.13.10
# The actor is dead because its worker process has died. Worker exit type: INTENDED_SYSTEM_EXIT
# Worker exit detail: Worker exits because the actor is killed. The actor is dead because it was killed by `ray.kill`.

# The actor is reported as dead in the output of `ray list actors` ...
ray list actors

# ... but the Ray actor process is still running!
ps aux | grep "ray::"
# [Example output]
# ubuntu 2332284 0.5 0.1 19396584 73628 pts/12 SNl 05:02 0:01 ray::SleepActor.sleep

# Force kill the actor
curl -X GET -w "%{http_code}" "http://localhost:8265/api/actors/kill?actor_id=${YOUR_ACTOR_ID}&force_kill=true"
# [Example output]
# {"result": true, "msg": "Force killed actor with id 545606bd4458db0326a55b9701000000", "data": {}}200%

# The Ray actor process is still running!
ps aux | grep "ray::"
```

## Related issue number

#51058

Signed-off-by: kaihsun <kaihsun@anyscale.com>
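
As a convenience, the curl-based steps above can also be driven from Python. The sketch below is illustration only and not part of this PR; the use of the `requests` package, the hard-coded `http://localhost:8265` dashboard address, and the `kill_actor` helper are assumptions.

```python
# Illustration only: drive the reproduction via HTTP from Python.
# Assumes a local cluster started with `ray start --head --include-dashboard=True`
# and that test.py from the "Reproduction" section is already running.
import requests
from ray.util.state import list_actors  # state API, roughly `ray list actors`

DASHBOARD = "http://localhost:8265"


def kill_actor(actor_id: str, force_kill: bool) -> dict:
    """Ask the dashboard to terminate an actor, gracefully or forcefully."""
    resp = requests.get(
        f"{DASHBOARD}/api/actors/kill",
        params={"actor_id": actor_id, "force_kill": str(force_kill).lower()},
    )
    resp.raise_for_status()
    return resp.json()


# Find the SleepActor started by test.py.
actor_id = [a for a in list_actors() if a.class_name == "SleepActor"][0].actor_id

# Graceful request first, then force kill. Before this fix, a threaded actor
# stuck in an infinite task survived both requests; with the fix, the second
# request terminates the worker process.
print(kill_actor(actor_id, force_kill=False))
print(kill_actor(actor_id, force_kill=True))
```

The `force_kill` query parameter maps directly to the graceful versus force-kill `KillActor` paths discussed above.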

File tree

2 files changed: 12 additions & 3 deletions


python/ray/dashboard/modules/snapshot/tests/test_actors.py

Lines changed: 5 additions & 2 deletions
@@ -50,13 +50,16 @@ def _kill_actor_using_dashboard_gcs(
     return resp_json
 
 
-def test_kill_actor_gcs(ray_start_with_dashboard):
+@pytest.mark.parametrize("enable_concurrency_group", [False, True])
+def test_kill_actor_gcs(ray_start_with_dashboard, enable_concurrency_group):
     # Start the dashboard
     webui_url = ray_start_with_dashboard["webui_url"]
     assert wait_until_server_available(webui_url)
     webui_url = format_web_url(webui_url)
 
-    @ray.remote
+    concurrency_groups = {"io": 1} if enable_concurrency_group else None
+
+    @ray.remote(concurrency_groups=concurrency_groups)
     class Actor:
         def f(self):
             ray._private.worker.show_in_dashboard("test")

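For readers skimming the diff, here is a rough standalone sketch of what the new parametrization covers: the same actor is created with and without a concurrency group, so the kill path is exercised for both threaded and non-threaded actors. This is not part of the change itself; the `make_actor` helper is hypothetical.

```python
# Hypothetical standalone mirror of the parametrized test above: build the same
# actor with and without a concurrency group so that both the threaded and the
# non-threaded kill paths are exercised.
import ray


def make_actor(enable_concurrency_group: bool):
    # `concurrency_groups=None` yields a plain (non-threaded) actor, exactly as
    # in the updated test.
    concurrency_groups = {"io": 1} if enable_concurrency_group else None

    @ray.remote(concurrency_groups=concurrency_groups)
    class Actor:
        def f(self):
            ray._private.worker.show_in_dashboard("test")

    return Actor.remote()


# pytest runs test_kill_actor_gcs once per value; this loop mimics that.
for enable_concurrency_group in (False, True):
    actor = make_actor(enable_concurrency_group)
```
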
src/ray/core_worker/core_worker.cc

Lines changed: 7 additions & 1 deletion
@@ -1025,7 +1025,6 @@ void CoreWorker::Shutdown() {
     if (worker_context_.CurrentActorIsAsync()) {
       options_.terminate_asyncio_thread();
     }
-    task_receiver_->Stop();
     task_execution_service_.stop();
   }
   if (options_.on_worker_shutdown) {

@@ -1217,6 +1216,13 @@ void CoreWorker::Exit(
   // drain the object references.
   task_execution_service_.post(
       [this, shutdown]() {
+        RAY_LOG(INFO) << "Wait for currently executing tasks in the underlying thread "
+                         "pools to finish.";
+        // Wait for currently executing tasks in the underlying thread pools to
+        // finish. Note that if tasks have been posted to the thread pools but not
+        // started yet, they will not be executed.
+        task_receiver_->Stop();
+
         bool not_actor_task = false;
         {
           absl::MutexLock lock(&mutex_);

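To connect this C++ change back to the observable symptom in the "Reproduction" section, a hypothetical post-fix check is sketched below. It is not part of this PR; it assumes the `psutil` package and the `ray::SleepActor.sleep` process title shown in the `ps aux` output above.

```python
# Hypothetical verification sketch: after the force-kill request, the
# `ray::SleepActor.sleep` worker process should no longer exist. Before this
# fix it survived both kill requests. Assumes `psutil` is installed.
import time

import psutil


def sleep_actor_worker_alive() -> bool:
    """Return True if a `ray::SleepActor` worker process is still running."""
    return any(
        "ray::SleepActor" in " ".join(p.info["cmdline"] or [])
        for p in psutil.process_iter(["cmdline"])
    )


# Give the force kill a moment to take effect, then verify the worker is gone
# (the Python equivalent of the `ps aux | grep "ray::"` check above).
time.sleep(5)
assert not sleep_actor_worker_alive(), "worker process survived the force kill"
```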