Skip to content

Commit 1e36110

Browse files
proc_mesh: supervise undeliverable messages for mesh client (#383)
Summary: Pull Request resolved: #383 undeliverable messages returned to a proc mesh client are now transformed into actor supervision events, which are in turn mapped to `ProcEvent::Crashed` events. supervision is handled via `supervise_undeliverable_messages`, and direct access to the proc mesh client undeliverable message port receiver is removed. Reviewed By: vidhyav Differential Revision: D77555707 fbshipit-source-id: 2ec3643324a3592654f186f4d09afd5f62402d6f
1 parent f3e0fc7 commit 1e36110

File tree

4 files changed

+69
-50
lines changed

4 files changed

+69
-50
lines changed

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ mod undeliverable;
124124
pub use undeliverable::Undeliverable;
125125
pub use undeliverable::UndeliverableMessageError;
126126
pub use undeliverable::monitored_return_handle; // TODO: Audit
127+
pub use undeliverable::supervise_undeliverable_messages;
127128
/// For [`MailboxAdminMessage`], a message type for mailbox administration.
128129
pub mod mailbox_admin_message;
129130
pub use mailbox_admin_message::MailboxAdminMessage;

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ use crate::ActorId;
1616
use crate::Message;
1717
use crate::PortId;
1818
use crate::RemoteMessage;
19+
use crate::actor::ActorStatus;
1920
use crate::id;
2021
use crate::mailbox::DeliveryError;
2122
use crate::mailbox::MailboxSender;
2223
use crate::mailbox::MessageEnvelope;
2324
use crate::mailbox::PortHandle;
2425
use crate::mailbox::PortReceiver;
2526
use crate::mailbox::UndeliverableMailboxSender;
27+
use crate::supervision::ActorSupervisionEvent;
2628

2729
/// An undeliverable `M`-typed message (in practice `M` is
2830
/// [MessageEnvelope]).
@@ -130,3 +132,33 @@ impl UndeliverableMessageError {
130132
}
131133
}
132134
}
135+
136+
/// Spawns a task that listens for undeliverable messages and posts a
137+
/// corresponding `ActorSupervisionEvent` to the given supervision
138+
/// port.
139+
///
140+
/// The `mailbox_id` identifies the source mailbox for context in
141+
/// supervision events.
142+
pub fn supervise_undeliverable_messages(
143+
mailbox_id: ActorId,
144+
supervision_port: PortHandle<ActorSupervisionEvent>,
145+
mut rx: PortReceiver<Undeliverable<MessageEnvelope>>,
146+
) {
147+
tokio::spawn(async move {
148+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
149+
envelope.try_set_error(DeliveryError::BrokenLink(
150+
"message returned to undeliverable port".to_string(),
151+
));
152+
if supervision_port
153+
.send(ActorSupervisionEvent::new(
154+
mailbox_id.clone(),
155+
ActorStatus::Failed(format!("message not delivered: {}", envelope)),
156+
))
157+
.is_err()
158+
{
159+
UndeliverableMailboxSender
160+
.post(envelope.clone(), /*unused*/ monitored_return_handle())
161+
}
162+
}
163+
});
164+
}

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 17 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -505,9 +505,9 @@ mod tests {
505505
use hyperactor::ProcId;
506506
use hyperactor::WorldId;
507507
use hyperactor::attrs::Attrs;
508-
use hyperactor::mailbox::Undeliverable;
509508

510509
use super::*;
510+
use crate::proc_mesh::ProcEvent;
511511

512512
// These tests are parametric over allocators.
513513
#[macro_export]
@@ -783,18 +783,18 @@ mod tests {
783783

784784
let name = alloc.name().to_string();
785785
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
786-
let mut undeliverable_rx = mesh.client_undeliverable_receiver().take()
787-
.expect("client_undeliverable_receiver should be available");
786+
let mut events = mesh.events().unwrap();
788787

789788
// Send a message to a non-existent actor (the proc however exists).
790789
let unmonitored_reply_to = mesh.client().open_port::<usize>().0.bind();
791790
let bad_actor = ActorRef::<TestActor>::attest(ActorId(ProcId(WorldId(name.clone()), 0), "foo".into(), 0));
792791
bad_actor.send(mesh.client(), GetRank(true, unmonitored_reply_to)).unwrap();
793792

794793
// The message will be returned!
795-
let Undeliverable(msg) = undeliverable_rx.recv().await.unwrap();
796-
assert_eq!(mesh.client().actor_id(), msg.sender());
797-
assert_eq!(&bad_actor.actor_id().port_id(GetRank::port()), msg.dest());
794+
assert_matches!(
795+
events.next().await.unwrap(),
796+
ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
797+
);
798798

799799
// TODO: Stop the proc.
800800
}
@@ -870,7 +870,6 @@ mod tests {
870870
let monkey = alloc.chaos_monkey();
871871
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
872872
let mut events = mesh.events().unwrap();
873-
let mut undeliverable_msg_rx = mesh.client_undeliverable_receiver().take().unwrap();
874873

875874
let ping_pong_actor_params = PingPongActorParams::new(
876875
PortRef::attest_message_port(mesh.client().actor_id()),
@@ -901,8 +900,10 @@ mod tests {
901900
.unwrap();
902901

903902
// The message will be returned!
904-
let Undeliverable(msg) = undeliverable_msg_rx.recv().await.unwrap();
905-
assert_eq!(msg.sender(), mesh.client().actor_id());
903+
assert_matches!(
904+
events.next().await.unwrap(),
905+
ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
906+
);
906907

907908
// Get 'pong' to send 'ping' a message. Since 'ping's
908909
// mailbox is stopped, the send will timeout and fail.
@@ -914,11 +915,9 @@ mod tests {
914915
.unwrap();
915916

916917
// The message will be returned!
917-
let Undeliverable(msg) = undeliverable_msg_rx.recv().await.unwrap();
918-
assert_eq!(msg.sender(), pong.actor_id());
919-
assert_eq!(
920-
msg.dest(),
921-
&ping.actor_id().port_id(PingPongMessage::port())
918+
assert_matches!(
919+
events.next().await.unwrap(),
920+
ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
922921
);
923922
}
924923

@@ -938,10 +937,6 @@ mod tests {
938937

939938
let stop = alloc.stopper();
940939
let mut mesh = ProcMesh::allocate(alloc).await.unwrap();
941-
let mut undeliverable_rx = mesh
942-
.client_undeliverable_receiver()
943-
.take()
944-
.expect("client_undeliverable_receiver should be available");
945940
let mut events = mesh.events().unwrap();
946941

947942
let actor_mesh = mesh
@@ -970,15 +965,10 @@ mod tests {
970965
.cast(sel!(*), GetRank(false, reply_handle.bind()))
971966
.unwrap();
972967

973-
// The message will be returned.
974-
let Undeliverable(msg) = undeliverable_rx.recv().await.unwrap();
975-
assert_eq!(
976-
msg.sender(),
977-
&ActorId(
978-
ProcId(actor_mesh.world_id().clone(), 0),
979-
"comm".to_owned(),
980-
0
981-
)
968+
// The message will be returned!
969+
assert_matches!(
970+
events.next().await.unwrap(),
971+
ProcEvent::Crashed(0, reason) if reason.contains("failed: message not delivered")
982972
);
983973

984974
// Stop the mesh.

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ pub struct ProcMesh {
8282
#[allow(dead_code)] // will be used in subsequent diff
8383
client_proc: Proc,
8484
client: Mailbox,
85-
client_undeliverable_receiver: Option<PortReceiver<Undeliverable<MessageEnvelope>>>,
8685
comm_actors: Vec<ActorRef<CommActor>>,
8786
world_id: WorldId,
8887
}
@@ -211,16 +210,25 @@ impl ProcMesh {
211210

212211
// TODO: No actor bound to "supervisor" yet.
213212
let supervisor = client_proc.attach("supervisor")?;
214-
let (supervison_port, supervision_events) = supervisor.open_port();
213+
let (supervision_port, supervision_events) =
214+
supervisor.open_port::<ActorSupervisionEvent>();
215215

216216
// Now, configure the full mesh, so that the local agents are
217-
// wired up to our router. Bind an undeliverable message port
218-
// in the client and return the port receiver.
219-
// No actor bound to this "client" yet
217+
// wired up to our router.
218+
// TODO: No actor bound to "client" yet.
220219
let client = client_proc.attach("client")?;
220+
// Bind an undeliverable message port in the client.
221221
let (undeliverable_messages, client_undeliverable_receiver) =
222222
client.open_port::<Undeliverable<MessageEnvelope>>();
223223
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
224+
// Monitor undeliverable messages from the client and emit
225+
// corresponding actor supervision events via the supervision
226+
// port.
227+
hyperactor::mailbox::supervise_undeliverable_messages(
228+
client.actor_id().clone(),
229+
supervision_port.clone(),
230+
client_undeliverable_receiver,
231+
);
224232

225233
// Map of procs -> channel addresses
226234
let address_book: HashMap<_, _> = running
@@ -235,7 +243,7 @@ impl ProcMesh {
235243
&client,
236244
rank,
237245
router_channel_addr.clone(),
238-
supervison_port.bind(),
246+
supervision_port.bind(),
239247
address_book.clone(),
240248
config_handle.bind(),
241249
)
@@ -304,7 +312,6 @@ impl ProcMesh {
304312
.collect(),
305313
client_proc,
306314
client,
307-
client_undeliverable_receiver: Some(client_undeliverable_receiver),
308315
comm_actors,
309316
world_id,
310317
})
@@ -399,22 +406,6 @@ impl ProcMesh {
399406
&self.client
400407
}
401408

402-
/// Returns a mutable reference to the client mailbox's
403-
/// undeliverable message port receiver.
404-
///
405-
/// This allows the caller to extract the
406-
/// `PortReceiver<Undeliverable<MessageEnvelope>>` by calling
407-
/// `.take()` on the returned `Option`, transferring ownership of
408-
/// the receiver.
409-
///
410-
/// Typically used to access the port bound by
411-
/// `ProcMesh::allocate`.
412-
pub fn client_undeliverable_receiver(
413-
&mut self,
414-
) -> &mut Option<PortReceiver<Undeliverable<MessageEnvelope>>> {
415-
&mut self.client_undeliverable_receiver
416-
}
417-
418409
pub fn client_proc(&self) -> &Proc {
419410
&self.client_proc
420411
}
@@ -508,6 +499,11 @@ impl ProcEvents {
508499
Ok(event) = self.event_state.supervision_events.recv() => {
509500
let (actor_id, actor_status) = event.clone().into_inner();
510501
let Some(rank) = self.ranks.get(actor_id.proc_id()) else {
502+
let client_proc = ProcId(WorldId(format!("{}_manager", self.event_state.alloc.world_id().name())), 0);
503+
let client = client_proc.actor_id("client", 0);
504+
if client == actor_id {
505+
break Some(ProcEvent::Crashed(0, actor_status.to_string()));
506+
}
511507
tracing::warn!("received supervision event for unmapped actor {}", actor_id);
512508
continue;
513509
};

0 commit comments

Comments
 (0)