Skip to content

Commit 0a1c573

Browse files
clock: bind undeliverable port in sim clock's mailbox (#517)
Summary: Pull Request resolved: #517 (1) pesky warnings in python about using a mailbox for sending without binding the undeliverable port (see D78031232) turned out to be due to the use of `SimClock`. hack in a "fix" that ~~terminates the process~~ logs the event (the idea of terminating the process didn't work out - test failures[*] ) if an undeliverable message is encountered in this context. we'll definitely want to revisit this however (kaiyuan-li, thomasywang) and fix it more appropriately (maybe by threading through a mailbox from upstack that has the undeliverable port bound and bounces undeliverables into supervision events perhaps?). (2) provide `unused_return_handle()` and use it to replace the `/*unused*/ monitored_return_handle()` idiom from the codebase. --- [*] the failed tests being, (1) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/controller:controller-unittest -- --exact 'monarch/controller:controller-unittest - tests::test_sim_supervision_failure'`, (2)`buck test 'fbcode//mode/opt' fbcode//monarch/python/tests:test_sim_backend -- --exact 'monarch/python/tests:test_sim_backend - test_sim_backend.py::TestSimBackend::test_local_mesh_setup'`, (3) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/hyperactor:hyperactor-unittest -- --exact 'monarch/hyperactor:hyperactor-unittest - clock::tests::test_sim_timeout'` in all cases i *think* the tests themselves are passing and the undeliverable messages are actually encountered during teardown. Differential Revision: D78191921
1 parent 54471df commit 0a1c573

File tree

4 files changed

+49
-15
lines changed

4 files changed

+49
-15
lines changed

hyperactor/src/clock.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,25 @@ use std::error::Error;
1212
use std::fmt;
1313
use std::sync::LazyLock;
1414
use std::sync::Mutex;
15+
use std::sync::OnceLock;
1516
use std::time::SystemTime;
1617

1718
use futures::pin_mut;
1819
use hyperactor_telemetry::TelemetryClock;
1920
use serde::Deserialize;
2021
use serde::Serialize;
2122

23+
use crate::ActorId;
2224
use crate::Mailbox;
2325
use crate::channel::ChannelAddr;
26+
use crate::data::Named;
2427
use crate::id;
28+
use crate::mailbox::DeliveryError;
29+
use crate::mailbox::MailboxSender;
30+
use crate::mailbox::MessageEnvelope;
31+
use crate::mailbox::Undeliverable;
32+
use crate::mailbox::UndeliverableMailboxSender;
33+
use crate::mailbox::unused_return_handle;
2534
use crate::simnet::SleepEvent;
2635
use crate::simnet::simnet_handle;
2736

@@ -175,6 +184,30 @@ impl ClockKind {
175184
}
176185
}
177186

187+
// TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
188+
// from upstack and handle undeliverable messages properly.
189+
static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
190+
fn simclock_mailbox_id() -> ActorId {
191+
id!(proc[0].proc).clone()
192+
}
193+
fn simclock_mailbox() -> &'static Mailbox {
194+
SIMCLOCK_MAILBOX.get_or_init(|| {
195+
let mailbox = Mailbox::new_detached(simclock_mailbox_id());
196+
let (undeliverable_messages, mut rx) =
197+
mailbox.open_port::<Undeliverable<MessageEnvelope>>();
198+
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
199+
tokio::spawn(async move {
200+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
201+
envelope.try_set_error(DeliveryError::BrokenLink(
202+
"message returned to undeliverable port".to_string(),
203+
));
204+
UndeliverableMailboxSender.post(envelope, unused_return_handle());
205+
}
206+
});
207+
mailbox
208+
})
209+
}
210+
178211
/// Clock to be used in simulator runs, for which the simnet can create scheduled wakeup events.
179212
/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance its
180213
/// time to the wakeup time and use the transmitter to wake up this green thread
@@ -184,7 +217,7 @@ pub struct SimClock;
184217
impl Clock for SimClock {
185218
/// Tell the simnet to wake up this green thread after the specified duration has passed on the simnet
186219
async fn sleep(&self, duration: tokio::time::Duration) {
187-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
220+
let mailbox = simclock_mailbox().clone();
188221
let (tx, rx) = mailbox.open_once_port::<()>();
189222

190223
simnet_handle()
@@ -199,7 +232,7 @@ impl Clock for SimClock {
199232
}
200233

201234
async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
202-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
235+
let mailbox = simclock_mailbox().clone();
203236
let (tx, rx) = mailbox.open_once_port::<()>();
204237

205238
simnet_handle()
@@ -234,7 +267,7 @@ impl Clock for SimClock {
234267
where
235268
F: std::future::Future<Output = T>,
236269
{
237-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
270+
let mailbox = simclock_mailbox().clone();
238271
let (tx, deadline_rx) = mailbox.open_once_port::<()>();
239272

240273
simnet_handle()

hyperactor/src/mailbox.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ pub use undeliverable::Undeliverable;
128128
pub use undeliverable::UndeliverableMessageError;
129129
pub use undeliverable::monitored_return_handle; // TODO: Audit
130130
pub use undeliverable::supervise_undeliverable_messages;
131+
pub use undeliverable::unused_return_handle;
131132
/// For [`MailboxAdminMessage`], a message type for mailbox administration.
132133
pub mod mailbox_admin_message;
133134
pub use mailbox_admin_message::MailboxAdminMessage;
@@ -1310,7 +1311,7 @@ impl cap::sealed::CanSend for Mailbox {
13101311
.get_or_init(DashSet::new)
13111312
.insert(actor_id.clone())
13121313
{
1313-
let bt = std::backtrace::Backtrace::capture();
1314+
let bt = std::backtrace::Backtrace::force_capture();
13141315
tracing::warn!(
13151316
actor_id = ?actor_id,
13161317
backtrace = ?bt,
@@ -2833,8 +2834,7 @@ mod tests {
28332834
envelope.try_set_error(DeliveryError::BrokenLink(
28342835
"returned in unit test".to_string(),
28352836
));
2836-
UndeliverableMailboxSender
2837-
.post(envelope, /*unused */ monitored_return_handle().clone());
2837+
UndeliverableMailboxSender.post(envelope, unused_return_handle().clone());
28382838
}
28392839
});
28402840
drop(return_handle);

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,12 @@ static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelop
5353
pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
5454
let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
5555
let (return_handle, mut rx) = new_undeliverable_port();
56-
// Don't reuse `return_handle` for `h`: else it will never get
57-
// dropped and the task will never return.
58-
let (h, _) = new_undeliverable_port();
5956
crate::init::get_runtime().spawn(async move {
6057
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
6158
envelope.try_set_error(DeliveryError::BrokenLink(
6259
"message returned to undeliverable port".to_string(),
6360
));
64-
super::UndeliverableMailboxSender.post(envelope, /*unused */ h.clone());
61+
super::UndeliverableMailboxSender.post(envelope, unused_return_handle());
6562
}
6663
});
6764
return_handle
@@ -70,6 +67,12 @@ pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
7067
return_handle.clone()
7168
}
7269

70+
/// For when a return handle is required but we know it won't be used.
71+
/// Calling this function panics (`unimplemented!()`), so any attempt to obtain or use the handle manifests as a panic.
72+
pub fn unused_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
73+
unimplemented!()
74+
}
75+
7376
/// Returns a message envelope to its original sender.
7477
pub(crate) fn return_undeliverable(
7578
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
@@ -149,8 +152,7 @@ pub fn supervise_undeliverable_messages(
149152
))
150153
.is_err()
151154
{
152-
UndeliverableMailboxSender
153-
.post(envelope.clone(), /*unused*/ monitored_return_handle())
155+
UndeliverableMailboxSender.post(envelope.clone(), unused_return_handle())
154156
}
155157
}
156158
});

hyperactor_mesh/src/comm.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use hyperactor::mailbox::MailboxSender;
3131
use hyperactor::mailbox::Undeliverable;
3232
use hyperactor::mailbox::UndeliverableMailboxSender;
3333
use hyperactor::mailbox::UndeliverableMessageError;
34-
use hyperactor::mailbox::monitored_return_handle;
34+
use hyperactor::mailbox::unused_return_handle;
3535
use hyperactor::reference::UnboundPort;
3636
use ndslice::selection::routing::RoutingFrame;
3737
use serde::Deserialize;
@@ -203,8 +203,7 @@ impl Actor for CommActor {
203203
}
204204

205205
// 3. A return of an undeliverable message was itself returned.
206-
UndeliverableMailboxSender
207-
.post(message_envelope, /*unused */ monitored_return_handle());
206+
UndeliverableMailboxSender.post(message_envelope, unused_return_handle());
208207
Ok(())
209208
}
210209
}

0 commit comments

Comments
 (0)