Skip to content

Commit 1bd4eda

Browse files
moonli authored and facebook-github-bot committed
Python actor mesh supervision support (#522)
Summary: Pull Request resolved: #522 This diff exposes the Rust ActorMesh supervision API to the Python ActorMesh. It also wires the supervision events to endpoint calls, including *call()/call_one()/choose()/stream()*, so that when a supervision error happens, all in-flight calls are notified and new calls fail. The current diff fails the whole mesh when any actor in the mesh fails. A follow-up diff will provide more granular management, so that only the calls that actually expect a reply from the failed actors are failed. Reviewed By: colin2328 Differential Revision: D77434080 fbshipit-source-id: 654528b7882c1f5f7762a1059d52764a5282859a
1 parent 2d369e1 commit 1bd4eda

File tree

16 files changed

+804
-116
lines changed

16 files changed

+804
-116
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ pub struct RootActorMesh<'a, A: RemoteActor> {
179179
proc_mesh: ProcMeshRef<'a>,
180180
name: String,
181181
pub(crate) ranks: Vec<ActorRef<A>>, // temporary until we remove `ArcActorMesh`.
182-
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
182+
// The receiver of supervision events. It is None if it has been transferred to
183+
// an actor event observer.
184+
actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
183185
}
184186

185187
impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
@@ -193,7 +195,7 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
193195
proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
194196
name,
195197
ranks,
196-
actor_supervision_rx,
198+
actor_supervision_rx: Some(actor_supervision_rx),
197199
}
198200
}
199201

@@ -207,7 +209,7 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
207209
proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
208210
name,
209211
ranks,
210-
actor_supervision_rx,
212+
actor_supervision_rx: Some(actor_supervision_rx),
211213
}
212214
}
213215

@@ -216,21 +218,35 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
216218
self.proc_mesh.client().open_port()
217219
}
218220

219-
/// An event stream of proc events. Each ProcMesh can produce only one such
221+
/// An event stream of actor events. Each RootActorMesh can produce only one such
220222
/// stream, returning None after the first call.
223+
pub fn events(&mut self) -> Option<ActorSupervisionEvents> {
224+
self.actor_supervision_rx
225+
.take()
226+
.map(|actor_supervision_rx| ActorSupervisionEvents {
227+
actor_supervision_rx,
228+
mesh_id: self.id(),
229+
})
230+
}
231+
}
232+
233+
/// Supervision event stream for actor mesh. It emits actor supervision events.
234+
pub struct ActorSupervisionEvents {
235+
// The receiver of supervision events from proc mesh.
236+
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
237+
// The id of the actor mesh; used to identify the mesh in log messages.
238+
mesh_id: ActorMeshId,
239+
}
240+
241+
impl ActorSupervisionEvents {
221242
pub async fn next(&mut self) -> Option<ActorSupervisionEvent> {
222243
let result = self.actor_supervision_rx.recv().await;
223-
match result.as_ref() {
224-
Some(event) => {
225-
tracing::debug!("Received supervision event: {event:?}");
226-
}
227-
None => {
228-
tracing::info!(
229-
"Supervision stream for actor mesh {} was closed!",
230-
self.name
231-
);
232-
}
233-
};
244+
if result.is_none() {
245+
tracing::info!(
246+
"supervision stream for actor mesh {:?} was closed!",
247+
self.mesh_id
248+
);
249+
}
234250
result
235251
}
236252
}

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use hyperactor::Mailbox;
2020
use hyperactor::Named;
2121
use hyperactor::RemoteMessage;
2222
use hyperactor::WorldId;
23+
use hyperactor::actor::ActorStatus;
2324
use hyperactor::actor::RemoteActor;
2425
use hyperactor::actor::remote::Remote;
2526
use hyperactor::cap;
@@ -451,7 +452,7 @@ impl ProcMesh {
451452
}
452453

453454
/// Proc lifecycle events.
454-
#[derive(Debug)]
455+
#[derive(Debug, Clone)]
455456
pub enum ProcEvent {
456457
/// The proc of the given rank was stopped with the provided reason.
457458
Stopped(usize, ProcStopReason),
@@ -506,6 +507,21 @@ impl ProcEvents {
506507
continue;
507508
};
508509

510+
// Need to send this event to actor meshes to notify them of the proc's death.
511+
// TODO(albertli): only send this event to all root actor meshes if any of them use this proc.
512+
for entry in self.actor_event_router.iter() {
513+
// Make a dummy actor supervision event.
514+
let client_proc = ProcId(WorldId(format!("{}_manager", self.event_state.alloc.world_id().name())), 0);
515+
let client = client_proc.actor_id("client", 0);
516+
let event = ActorSupervisionEvent::new(
517+
client.clone(),
518+
ActorStatus::Failed(format!("proc {} is stopped", proc_id))
519+
);
520+
if entry.value().send(event).is_err() {
521+
tracing::warn!("unable to transmit supervision event to actor {}", client);
522+
}
523+
}
524+
509525
break Some(ProcEvent::Stopped(*rank, reason));
510526
}
511527
Ok(event) = self.event_state.supervision_events.recv() => {
@@ -521,17 +537,15 @@ impl ProcEvents {
521537
};
522538
// transmit to the correct root actor mesh.
523539
{
524-
let Some(tx) = self.actor_event_router.get(actor_id.name()) else {
540+
if let Some(tx) = self.actor_event_router.get(actor_id.name()) {
541+
if tx.send(event).is_err() {
542+
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
543+
}
544+
} else {
525545
tracing::warn!("received supervision event for unregistered actor {}", actor_id);
526-
continue;
527-
};
528-
let Ok(_) = tx.send(event) else {
529-
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
530-
continue;
531-
};
546+
}
532547
}
533-
// TODO: Actor supervision events need to be wired to the frontend.
534-
// TODO: This event should be handled by the proc mesh if unhandled by actor mesh.
548+
// Send this event to Python proc mesh to keep its health status up to date.
535549
break Some(ProcEvent::Crashed(*rank, actor_status.to_string()))
536550
}
537551
}
@@ -738,6 +752,7 @@ mod tests {
738752
let mut events = mesh.events().unwrap();
739753

740754
let mut actors = mesh.spawn::<TestActor>("failing", &()).await.unwrap();
755+
let mut actor_events = actors.events().unwrap();
741756

742757
actors
743758
.cast(
@@ -751,7 +766,7 @@ mod tests {
751766
ProcEvent::Crashed(0, reason) if reason.contains("failmonkey")
752767
);
753768

754-
let event = actors.next().await.unwrap();
769+
let mut event = actor_events.next().await.unwrap();
755770
assert_matches!(event.actor_status(), ActorStatus::Failed(_));
756771
assert_eq!(event.actor_id().1, "failing".to_string());
757772
assert_eq!(event.actor_id().2, 0);
@@ -767,7 +782,9 @@ mod tests {
767782
);
768783

769784
assert!(events.next().await.is_none());
770-
assert!(actors.next().await.is_none());
785+
event = actor_events.next().await.unwrap();
786+
assert_matches!(event.actor_status(), ActorStatus::Failed(_));
787+
assert_eq!(event.actor_id().2, 0);
771788
}
772789

773790
#[timed_test::async_timed_test(timeout_secs = 5)]

hyperactor_mesh/src/shared_cell.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,16 @@ impl<T> SharedCell<T> {
156156
SharedCellRef::from(self.inner.clone().try_read_owned()?)
157157
}
158158

159+
/// Execute given closure with write access to the underlying data. If the cell is empty, returns an error.
160+
pub async fn with_mut<F, R>(&self, f: F) -> Result<R, EmptyCellError>
161+
where
162+
F: FnOnce(&mut T) -> R,
163+
{
164+
let mut inner = self.inner.write(true).await;
165+
let value = inner.value.as_mut().ok_or(EmptyCellError {})?;
166+
Ok(f(value))
167+
}
168+
159169
/// Take the item out of the cell, leaving it in an unusable state.
160170
pub async fn take(&self) -> Result<T, EmptyCellError> {
161171
let mut inner = self.inner.write(true).await;

hyperactor_telemetry/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -505,13 +505,13 @@ pub fn initialize_logging(clock: impl TelemetryClock + Send + 'static) {
505505
} else {
506506
None
507507
})
508+
.with(file_layer)
509+
.with(stderr_layer)
508510
.with(if is_layer_enabled(DISABLE_RECORDER_TRACING) {
509511
Some(recorder().layer())
510512
} else {
511513
None
512514
})
513-
.with(file_layer)
514-
.with(stderr_layer)
515515
.try_init()
516516
{
517517
tracing::debug!("logging already initialized for this process: {}", err);

monarch_extension/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
7979
"monarch_hyperactor.selection",
8080
)?)?;
8181

82+
monarch_hyperactor::supervision::register_python_bindings(&get_or_add_new_module(
83+
module,
84+
"monarch_hyperactor.supervision",
85+
)?)?;
86+
8287
#[cfg(feature = "tensor_engine")]
8388
{
8489
client::register_python_bindings(&get_or_add_new_module(

0 commit comments

Comments
 (0)