Skip to content

Python actor mesh supervision support #522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 31 additions & 15 deletions hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ pub struct RootActorMesh<'a, A: RemoteActor> {
proc_mesh: ProcMeshRef<'a>,
name: String,
pub(crate) ranks: Vec<ActorRef<A>>, // temporary until we remove `ArcActorMesh`.
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
// The receiver of supervision events. It is None if it has been transferred to
// an actor event observer.
actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
}

impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
Expand All @@ -193,7 +195,7 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
name,
ranks,
actor_supervision_rx,
actor_supervision_rx: Some(actor_supervision_rx),
}
}

Expand All @@ -207,7 +209,7 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
name,
ranks,
actor_supervision_rx,
actor_supervision_rx: Some(actor_supervision_rx),
}
}

Expand All @@ -216,21 +218,35 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
self.proc_mesh.client().open_port()
}

/// An event stream of proc events. Each ProcMesh can produce only one such
/// An event stream of actor events. Each RootActorMesh can produce only one such
/// stream, returning None after the first call.
pub fn events(&mut self) -> Option<ActorSupervisionEvents> {
self.actor_supervision_rx
.take()
.map(|actor_supervision_rx| ActorSupervisionEvents {
actor_supervision_rx,
mesh_id: self.id(),
})
}
}

/// Supervision event stream for actor mesh. It emits actor supervision events.
pub struct ActorSupervisionEvents {
// The receiver of supervision events from proc mesh.
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
// The name of the actor mesh.
mesh_id: ActorMeshId,
}

impl ActorSupervisionEvents {
pub async fn next(&mut self) -> Option<ActorSupervisionEvent> {
let result = self.actor_supervision_rx.recv().await;
match result.as_ref() {
Some(event) => {
tracing::debug!("Received supervision event: {event:?}");
}
None => {
tracing::info!(
"Supervision stream for actor mesh {} was closed!",
self.name
);
}
};
if result.is_none() {
tracing::info!(
"supervision stream for actor mesh {:?} was closed!",
self.mesh_id
);
}
result
}
}
Expand Down
23 changes: 11 additions & 12 deletions hyperactor_mesh/src/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ impl ProcMesh {
}

/// Proc lifecycle events.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ProcEvent {
/// The proc of the given rank was stopped with the provided reason.
Stopped(usize, ProcStopReason),
Expand Down Expand Up @@ -560,17 +560,15 @@ impl ProcEvents {
};
// transmit to the correct root actor mesh.
{
let Some(tx) = self.actor_event_router.get(actor_id.name()) else {
if let Some(tx) = self.actor_event_router.get(actor_id.name()) {
if tx.send(event).is_err() {
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
}
} else {
tracing::warn!("received supervision event for unregistered actor {}", actor_id);
continue;
};
let Ok(_) = tx.send(event) else {
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
continue;
};
}
}
// TODO: Actor supervision events need to be wired to the frontend.
// TODO: This event should be handled by the proc mesh if unhandled by actor mesh.
// Send this event to Python proc mesh to keep its health status up to date.
break Some(ProcEvent::Crashed(*rank, actor_status.to_string()))
}
}
Expand Down Expand Up @@ -777,6 +775,7 @@ mod tests {
let mut events = mesh.events().unwrap();

let mut actors = mesh.spawn::<TestActor>("failing", &()).await.unwrap();
let mut actor_events = actors.events().unwrap();

actors
.cast(
Expand All @@ -790,7 +789,7 @@ mod tests {
ProcEvent::Crashed(0, reason) if reason.contains("failmonkey")
);

let event = actors.next().await.unwrap();
let event = actor_events.next().await.unwrap();
assert_matches!(event.actor_status(), ActorStatus::Failed(_));
assert_eq!(event.actor_id().1, "failing".to_string());
assert_eq!(event.actor_id().2, 0);
Expand All @@ -806,6 +805,6 @@ mod tests {
);

assert!(events.next().await.is_none());
assert!(actors.next().await.is_none());
assert!(actor_events.next().await.is_none());
}
}
10 changes: 10 additions & 0 deletions hyperactor_mesh/src/shared_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,16 @@ impl<T> SharedCell<T> {
SharedCellRef::from(self.inner.clone().try_read_owned()?)
}

/// Execute given closure with write access to the underlying data. If the cell is empty, returns an error.
pub async fn with_mut<F, R>(&self, f: F) -> Result<R, EmptyCellError>
where
F: FnOnce(&mut T) -> R,
{
let mut inner = self.inner.write(true).await;
let value = inner.value.as_mut().ok_or(EmptyCellError {})?;
Ok(f(value))
}

/// Take the item out of the cell, leaving it in an unusable state.
pub async fn take(&self) -> Result<T, EmptyCellError> {
let mut inner = self.inner.write(true).await;
Expand Down
5 changes: 5 additions & 0 deletions monarch_extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
"monarch_hyperactor.selection",
)?)?;

monarch_hyperactor::supervision::register_python_bindings(&get_or_add_new_module(
module,
"monarch_hyperactor.supervision",
)?)?;

#[cfg(feature = "tensor_engine")]
{
client::register_python_bindings(&get_or_add_new_module(
Expand Down
Loading
Loading