Skip to content

Commit 6b805b5

Browse files
authored
[core] Threaded actors get stuck forever if they receive two exit signals (#51582)
If a threaded actor receives exit signals twice as shown in the above screenshot, it will execute [task_receiver_->Stop()](https://github.com/ray-project/ray/blob/6bb9cef9257046ae31f78f6c52015a8ebf009f81/src/ray/core_worker/core_worker.cc#L1224) twice. However, the second call to `task_receiver_->Stop()` will get stuck forever when executing the [releaser](https://github.com/ray-project/ray/blob/6bb9cef9257046ae31f78f6c52015a8ebf009f81/src/ray/core_worker/transport/concurrency_group_manager.cc#L135). ### Reproduction ```sh ray start --head --include-dashboard=True --num-cpus=1 # https://gist.github.com/kevin85421/7a42ac3693537c2148fa554065bb5223 python3 test.py # Some actors are still ALIVE. If all actors are DEAD, increase the number of actors. ray list actors ``` --------- Signed-off-by: kaihsun <kaihsun@anyscale.com>
1 parent 5102523 commit 6b805b5

File tree

5 files changed

+70
-10
lines changed

5 files changed

+70
-10
lines changed

python/ray/_private/state_api_test_utils.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
PredicateType,
3030
SupportedFilterType,
3131
)
32+
import ray._private.test_utils as test_utils
3233

3334

3435
@dataclass
@@ -113,6 +114,16 @@ def invoke_state_api(
113114
return res
114115

115116

117+
def invoke_state_api_n(*args, **kwargs):
118+
def verify():
119+
NUM_API_CALL_SAMPLES = 10
120+
for _ in range(NUM_API_CALL_SAMPLES):
121+
invoke_state_api(*args, **kwargs)
122+
return True
123+
124+
test_utils.wait_for_condition(verify, retry_interval_ms=2000, timeout=30)
125+
126+
116127
def aggregate_perf_results(state_stats: StateAPIStats = GLOBAL_STATE_STATS):
117128
"""Aggregate stats of state API calls
118129

python/ray/tests/test_actor.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@
2020
from ray._private.test_utils import SignalActor
2121
from ray.core.generated import gcs_pb2
2222
from ray._private.utils import hex_to_binary
23+
from ray._private.state_api_test_utils import invoke_state_api, invoke_state_api_n
24+
25+
from ray.util.state import list_actors
26+
2327

2428
# NOTE: We have to import setproctitle after ray because we bundle setproctitle
2529
# with ray.
@@ -283,6 +287,44 @@ def get_val(self):
283287
assert ray.get(g.remote()) == num_remote_functions - 1
284288

285289

290+
@pytest.mark.parametrize("enable_concurrency_group", [False, True])
291+
def test_exit_actor(ray_start_regular, enable_concurrency_group):
292+
concurrency_groups = {"io": 1} if enable_concurrency_group else None
293+
294+
@ray.remote(concurrency_groups=concurrency_groups)
295+
class TestActor:
296+
def exit(self):
297+
ray.actor.exit_actor()
298+
299+
num_actors = 30
300+
actor_class_name = TestActor.__ray_metadata__.class_name
301+
302+
actors = [TestActor.remote() for _ in range(num_actors)]
303+
ray.get([actor.__ray_ready__.remote() for actor in actors])
304+
invoke_state_api(
305+
lambda res: len(res) == num_actors,
306+
list_actors,
307+
filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)],
308+
limit=1000,
309+
)
310+
311+
ray.wait([actor.exit.remote() for actor in actors], timeout=10.0)
312+
313+
invoke_state_api_n(
314+
lambda res: len(res) == 0,
315+
list_actors,
316+
filters=[("state", "=", "ALIVE"), ("class_name", "=", actor_class_name)],
317+
limit=1000,
318+
)
319+
320+
invoke_state_api(
321+
lambda res: len(res) == num_actors,
322+
list_actors,
323+
filters=[("state", "=", "DEAD"), ("class_name", "=", actor_class_name)],
324+
limit=1000,
325+
)
326+
327+
286328
@pytest.mark.skipif(client_test_enabled(), reason="internal api")
287329
def test_actor_method_metadata_cache(ray_start_regular):
288330
class Actor(object):

release/nightly_tests/stress_tests/test_state_api_scale.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
StateAPIMetric,
88
aggregate_perf_results,
99
invoke_state_api,
10+
invoke_state_api_n,
1011
GLOBAL_STATE_STATS,
1112
)
1213

@@ -38,16 +39,6 @@
3839
MiB = 1024 * 1024
3940

4041

41-
def invoke_state_api_n(*args, **kwargs):
42-
def verify():
43-
NUM_API_CALL_SAMPLES = 10
44-
for _ in range(NUM_API_CALL_SAMPLES):
45-
invoke_state_api(*args, **kwargs)
46-
return True
47-
48-
test_utils.wait_for_condition(verify, retry_interval_ms=2000, timeout=30)
49-
50-
5142
def test_many_tasks(num_tasks: int):
5243
TASK_NAME_TEMPLATE = "pi4_sample_{num_tasks}"
5344
if num_tasks == 0:

src/ray/core_worker/core_worker.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,6 +1174,13 @@ void CoreWorker::Exit(
11741174
const rpc::WorkerExitType exit_type,
11751175
const std::string &detail,
11761176
const std::shared_ptr<LocalMemoryBuffer> &creation_task_exception_pb_bytes) {
1177+
if (is_exit_) {
1178+
RAY_LOG(INFO) << "Exit signal received, but the core worker has already received a "
1179+
"signal and is shutting down.";
1180+
return;
1181+
}
1182+
is_exit_ = true;
1183+
11771184
RAY_LOG(INFO) << "Exit signal received, this process will exit after all outstanding "
11781185
"tasks have finished"
11791186
<< ", exit_type=" << rpc::WorkerExitType_Name(exit_type)

src/ray/core_worker/core_worker.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1893,6 +1893,15 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
18931893

18941894
std::atomic<bool> is_shutdown_ = false;
18951895

1896+
/// Whether the `Exit` function has been called, to avoid executing the exit
1897+
/// process multiple times.
1898+
///
1899+
/// TODO(kevin85421): Currently, there are two public functions, `Exit` and `Shutdown`,
1900+
/// to terminate the core worker gracefully. We should unify them into `Exit()` so we
1901+
/// don't need `is_shutdown_` in the future. See
1902+
/// https://github.com/ray-project/ray/issues/51642 for more details.
1903+
std::atomic<bool> is_exit_ = false;
1904+
18961905
int64_t max_direct_call_object_size_;
18971906

18981907
friend class CoreWorkerTest;

0 commit comments

Comments
 (0)