Commit eeb57f9

[Data] Implement forceful releasing of actors upon shutdown of StreamingExecutor (#51769)
## Why are these changes needed?

We recently ran into an issue where:

1. Large pipeline executions were triggered in *tight* succession (one immediately after another).
2. We had N GPUs available, and all N were used by the ActorPool.
3. The GPUs were not released in time before the next execution began.
4. The subsequent dataset execution timed out after 10 minutes, unable to acquire the required GPUs.

Changes
---

1. Added a `force` param to the `PhysicalOperator.shutdown` method.
2. Revisited the pending/running actor release sequence to kill these actors when the shutdown is forced.
3. Made sure the shutdown sequence awaits the `on_exit` callback's return.
4. Cleaned up a bunch of dead code.

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
1 parent 3da4bcb
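The core of the change is a graceful-then-forceful release sequence for the pool's actors: trigger each actor's `on_exit` hook, wait a bounded amount of time for those calls to complete, and only `ray.kill` the actors when the shutdown is forced (a killed actor cannot be restarted, which matters for lineage reconstruction). Below is a minimal, self-contained sketch of that pattern using plain Ray actors; the `Worker` class, the `release_actors` helper, and the 30-second constant are illustrative stand-ins rather than the actual Ray Data implementation (see the diffs below for that).

```python
import ray

ray.init(ignore_reinit_error=True)

# Illustrative constant mirroring _ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S below.
GRACEFUL_SHUTDOWN_TIMEOUT_S = 30


@ray.remote
class Worker:
    """Hypothetical stand-in for a map-operator UDF actor."""

    def on_exit(self):
        # Cleanup hook; in Ray Data this is where UDF teardown would run.
        return "cleaned up"


def release_actors(actors, force: bool = False):
    """Release actors gracefully, killing them only on a forced shutdown."""
    # Trigger every actor's exit hook and collect the result refs.
    on_exit_refs = [actor.on_exit.remote() for actor in actors]

    # Give the actors a bounded window to finish their cleanup.
    ray.wait(
        on_exit_refs,
        num_returns=len(on_exit_refs),
        timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S,
    )

    # Dropping the handles lets reference counting reclaim the actors; only a
    # forced shutdown kills them outright so their resources free up immediately.
    if force:
        for actor in actors:
            ray.kill(actor)


pool = [Worker.remote() for _ in range(2)]
release_actors(pool, force=True)
```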

6 files changed: +74 −202 lines

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -457,7 +457,7 @@ def internal_queue_size(self) -> int:
         """
         return 0

-    def shutdown(self) -> None:
+    def shutdown(self, force: bool = False) -> None:
         """Abort execution and release all resources used by this operator.

         This release any Ray resources acquired by this operator such as active
```

python/ray/data/_internal/execution/operators/actor_pool_map_operator.py

Lines changed: 63 additions & 61 deletions
```diff
@@ -34,6 +34,8 @@
 class ActorPoolMapOperator(MapOperator):
     """A MapOperator implementation that executes tasks on an actor pool.

+    NOTE: This class is NOT thread-safe
+
     This class manages the state of a pool of actors used for task execution, as well
     as dispatch of tasks to those actors.
@@ -273,10 +275,10 @@ def all_inputs_done(self):
         # once the bundle queue is exhausted.
         self._inputs_done = True

-    def shutdown(self):
+    def shutdown(self, force: bool = False):
         # We kill all actors in the pool on shutdown, even if they are busy doing work.
-        self._actor_pool.kill_all_actors()
-        super().shutdown()
+        self._actor_pool.shutdown(force=force)
+        super().shutdown(force)

         # Warn if the user specified a batch or block size that prevents full
         # parallelization across the actor pool. We only know this information after
@@ -462,6 +464,8 @@ class _ActorPool(AutoscalingActorPool):
     actors when the operator is done submitting work to the pool.
     """

+    _ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S = 30
+
     def __init__(
         self,
         compute_strategy: ActorPoolStrategy,
@@ -485,9 +489,6 @@ def __init__(
         self._running_actors: Dict[ray.actor.ActorHandle, _ActorState] = {}
         # Actors that are not yet ready (still pending creation).
         self._pending_actors: Dict[ObjectRef, ray.actor.ActorHandle] = {}
-        # Whether actors that become idle should be eagerly killed. This is False until
-        # the first call to kill_idle_actors().
-        self._should_kill_idle_actors = False
         # Track locality matching stats.
         self._locality_hits: int = 0
         self._locality_misses: int = 0
@@ -547,7 +548,7 @@ def scale_up(self, num_actors: int) -> int:
     def scale_down(self, num_actors: int) -> int:
         num_killed = 0
         for _ in range(num_actors):
-            if self.kill_inactive_actor():
+            if self._kill_inactive_actor():
                 num_killed += 1
         return num_killed

@@ -575,9 +576,6 @@ def add_pending_actor(self, actor: ray.actor.ActorHandle, ready_ref: ray.ObjectR
             actor: The not-yet-ready actor to add as pending to the pool.
             ready_ref: The ready future for the actor.
         """
-        # The caller shouldn't add new actors to the pool after invoking
-        # kill_inactive_actors().
-        assert not self._should_kill_idle_actors
         self._pending_actors[ready_ref] = actor

     def pending_to_running(self, ready_ref: ray.ObjectRef) -> bool:
@@ -664,11 +662,6 @@ def return_actor(self, actor: ray.actor.ActorHandle):
         assert actor in self._running_actors
         assert self._running_actors[actor].num_tasks_in_flight > 0
         self._running_actors[actor].num_tasks_in_flight -= 1
-        if (
-            self._should_kill_idle_actors
-            and self._running_actors[actor].num_tasks_in_flight == 0
-        ):
-            self._remove_actor(actor)

     def get_pending_actor_refs(self) -> List[ray.ObjectRef]:
         return list(self._pending_actors.keys())
@@ -692,7 +685,7 @@ def num_free_slots(self) -> int:
             for running_actor in self._running_actors.values()
         )

-    def kill_inactive_actor(self) -> bool:
+    def _kill_inactive_actor(self) -> bool:
         """Kills a single pending or idle actor, if any actors are pending/idle.

         Returns whether an inactive actor was actually killed.
@@ -709,70 +702,79 @@ def _maybe_kill_pending_actor(self) -> bool:
         if self._pending_actors:
             # At least one pending actor, so kill first one.
             ready_ref = next(iter(self._pending_actors.keys()))
-            self._remove_actor(self._pending_actors[ready_ref])
             del self._pending_actors[ready_ref]
             return True
         # No pending actors, so indicate to the caller that no actors were killed.
         return False

     def _maybe_kill_idle_actor(self) -> bool:
-        for actor, running_actor in self._running_actors.items():
-            if running_actor.num_tasks_in_flight == 0:
+        for actor, state in self._running_actors.items():
+            if state.num_tasks_in_flight == 0:
                 # At least one idle actor, so kill first one found.
-                self._remove_actor(actor)
+                # NOTE: This is a fire-and-forget op
+                self._release_running_actor(actor)
                 return True
         # No idle actors, so indicate to the caller that no actors were killed.
         return False

-    def kill_all_inactive_actors(self):
-        """Kills all currently inactive actors and ensures that all actors that become
-        idle in the future will be eagerly killed.
-
-        This is called once the operator is done submitting work to the pool, and this
-        function is idempotent. Adding new pending actors after calling this function
-        will raise an error.
-        """
-        self._kill_all_pending_actors()
-        self._kill_all_idle_actors()
-
-    def kill_all_actors(self):
+    def shutdown(self, force: bool = False):
         """Kills all actors, including running/active actors.

         This is called once the operator is shutting down.
         """
-        self._kill_all_pending_actors()
-        self._kill_all_running_actors()
+        self._release_pending_actors(force=force)
+        self._release_running_actors(force=force)

-    def _kill_all_pending_actors(self):
-        for _, actor in self._pending_actors.items():
-            self._remove_actor(actor)
+    def _release_pending_actors(self, force: bool):
+        # Release pending actors from the set of pending ones
+        pending = dict(self._pending_actors)
         self._pending_actors.clear()

-    def _kill_all_idle_actors(self):
-        idle_actors = [
-            actor
-            for actor, running_actor in self._running_actors.items()
-            if running_actor.num_tasks_in_flight == 0
-        ]
-        for actor in idle_actors:
-            self._remove_actor(actor)
-        self._should_kill_idle_actors = True
-
-    def _kill_all_running_actors(self):
-        actors = list(self._running_actors.keys())
-        for actor in actors:
-            self._remove_actor(actor)
-
-    def _remove_actor(self, actor: ray.actor.ActorHandle):
-        """Remove the given actor from the pool."""
-        # NOTE: we remove references to the actor and let ref counting
+        if force:
+            for _, actor in pending.items():
+                # NOTE: Actors can't be brought back after being ``ray.kill``-ed,
+                # hence we're only doing that if this is a forced release
+                ray.kill(actor)
+
+    def _release_running_actors(self, force: bool):
+        running = list(self._running_actors.keys())
+
+        on_exit_refs = []
+
+        # First release actors and collect their shutdown hook object-refs
+        for actor in running:
+            on_exit_refs.append(self._release_running_actor(actor))
+
+        # Wait for all actors to shutdown gracefully before killing them
+        ray.wait(on_exit_refs, timeout=self._ACTOR_POOL_GRACEFUL_SHUTDOWN_TIMEOUT_S)
+
+        # NOTE: Actors can't be brought back after being ``ray.kill``-ed,
+        # hence we're only doing that if this is a forced release
+        if force:
+            for actor in running:
+                ray.kill(actor)
+
+    def _release_running_actor(
+        self, actor: ray.actor.ActorHandle
+    ) -> Optional[ObjectRef]:
+        """Remove the given actor from the pool and trigger its `on_exit` callback.
+
+        This method returns a ``ref`` to the result
+        """
+        # NOTE: By default, we remove references to the actor and let ref counting
         # garbage collect the actor, instead of using ray.kill.
-        # Because otherwise the actor cannot be restarted upon lineage reconstruction.
-        if actor in self._running_actors:
-            # Call `on_exit` to trigger `UDF.__del__` which may perform
-            # cleanup operations.
-            actor.on_exit.remote()
-            del self._running_actors[actor]
+        #
+        # Otherwise, actor cannot be reconstructed for the purposes of produced
+        # object's lineage reconstruction.
+        if actor not in self._running_actors:
+            return None
+
+        # Call `on_exit` to trigger `UDF.__del__` which may perform
+        # cleanup operations.
+        ref = actor.on_exit.remote()
+        del self._running_actors[actor]
+
+        return ref

     def _get_location(self, bundle: RefBundle) -> Optional[NodeIdStr]:
         """Ask Ray for the node id of the given bundle.
```

python/ray/data/_internal/execution/operators/map_operator.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -473,7 +473,7 @@ def get_stats(self) -> StatsDict:
     def get_map_transformer(self) -> MapTransformer:
         return self._map_transformer

-    def shutdown(self):
+    def shutdown(self, force: bool = False):
         self._data_tasks.clear()
         self._metadata_tasks.clear()
```

python/ray/data/_internal/execution/operators/task_pool_map_operator.py

Lines changed: 3 additions & 2 deletions
```diff
@@ -108,7 +108,7 @@ def _add_bundled_input(self, bundle: RefBundle):
         )
         self._submit_data_task(gen, bundle)

-    def shutdown(self):
+    def shutdown(self, force: bool = False):
         # Cancel all active tasks.
         for _, task in self._data_tasks.items():
             ray.cancel(task.get_waitable())
@@ -121,7 +121,8 @@ def shutdown(self):
                 # a different error, or cancellation failed. In all cases, we
                 # swallow the exception.
                 pass
-        super().shutdown()
+
+        super().shutdown(force)

     def progress_str(self) -> str:
         return ""
```

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -220,7 +220,7 @@ def shutdown(self, exception: Optional[Exception] = None):
             self._global_info.set_description(prog_bar_msg)
             self._global_info.close()
         for op, state in self._topology.items():
-            op.shutdown()
+            op.shutdown(force=True)
             state.close_progress_bars()
         if exception is None:
             for callback in get_execution_callbacks(self._data_context):
```
