Skip to content

Commit 50e0e0a

Browse files
clock: bind undeliverable port in sim clock's mailbox (#517)
Summary: Pull Request resolved: #517 (1) pesky warnings in python about using a mailbox for sending without binding the undeliverable port (see D78031232) turned out to be due to the use of `SimClock`. hack in a "fix" that ~~terminates the process~~ logs the event (the idea of terminating the process didn't work out - test failures[*] ) if an undeliverable message is encountered in this context. we'll definitely want to revisit this however (kaiyuan-li, thomasywang) and fix it more appropriately (maybe by threading through a mailbox from upstack that has the undeliverable port bound and bounces undeliverables into supervision events perhaps?). (2) provide `unused_return_handle()` and use it to replace the `/*unused*/ monitored_return_handle()` idiom from the codebase. --- [*] the failed tests being, (1) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/controller:controller-unittest -- --exact 'monarch/controller:controller-unittest - tests::test_sim_supervision_failure'`, (2)`buck test 'fbcode//mode/opt' fbcode//monarch/python/tests:test_sim_backend -- --exact 'monarch/python/tests:test_sim_backend - test_sim_backend.py::TestSimBackend::test_local_mesh_setup'`, (3) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/hyperactor:hyperactor-unittest -- --exact 'monarch/hyperactor:hyperactor-unittest - clock::tests::test_sim_timeout'` in all cases i *think* the tests themselves are passing and the undeliverable messages are actually encountered during teardown. Differential Revision: D78191921
1 parent 54471df commit 50e0e0a

File tree

4 files changed

+46
-16
lines changed

4 files changed

+46
-16
lines changed

hyperactor/src/clock.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,25 @@ use std::error::Error;
1212
use std::fmt;
1313
use std::sync::LazyLock;
1414
use std::sync::Mutex;
15+
use std::sync::OnceLock;
1516
use std::time::SystemTime;
1617

1718
use futures::pin_mut;
1819
use hyperactor_telemetry::TelemetryClock;
1920
use serde::Deserialize;
2021
use serde::Serialize;
2122

23+
use crate::ActorId;
2224
use crate::Mailbox;
2325
use crate::channel::ChannelAddr;
26+
use crate::data::Named;
2427
use crate::id;
28+
use crate::mailbox::DeliveryError;
29+
use crate::mailbox::MailboxSender;
30+
use crate::mailbox::MessageEnvelope;
31+
use crate::mailbox::Undeliverable;
32+
use crate::mailbox::UndeliverableMailboxSender;
33+
use crate::mailbox::unused_return_handle;
2534
use crate::simnet::SleepEvent;
2635
use crate::simnet::simnet_handle;
2736

@@ -175,6 +184,27 @@ impl ClockKind {
175184
}
176185
}
177186

187+
// TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
188+
// from upstack and handle undeliverable messages properly.
189+
fn simclock_mailbox() -> &'static Mailbox {
190+
static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
191+
SIMCLOCK_MAILBOX.get_or_init(|| {
192+
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
193+
let (undeliverable_messages, mut rx) =
194+
mailbox.open_port::<Undeliverable<MessageEnvelope>>();
195+
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
196+
tokio::spawn(async move {
197+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
198+
envelope.try_set_error(DeliveryError::BrokenLink(
199+
"message returned to undeliverable port".to_string(),
200+
));
201+
UndeliverableMailboxSender.post(envelope, unused_return_handle());
202+
}
203+
});
204+
mailbox
205+
})
206+
}
207+
178208
/// Clock to be used in simulator runs that allows the simnet to create a scheduled event for.
179209
/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's
180210
/// time to the wakeup time and use the transmitter to wake up this green thread
@@ -184,7 +214,7 @@ pub struct SimClock;
184214
impl Clock for SimClock {
185215
/// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet
186216
async fn sleep(&self, duration: tokio::time::Duration) {
187-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
217+
let mailbox = simclock_mailbox().clone();
188218
let (tx, rx) = mailbox.open_once_port::<()>();
189219

190220
simnet_handle()
@@ -199,7 +229,7 @@ impl Clock for SimClock {
199229
}
200230

201231
async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
202-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
232+
let mailbox = simclock_mailbox().clone();
203233
let (tx, rx) = mailbox.open_once_port::<()>();
204234

205235
simnet_handle()
@@ -234,7 +264,7 @@ impl Clock for SimClock {
234264
where
235265
F: std::future::Future<Output = T>,
236266
{
237-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
267+
let mailbox = simclock_mailbox().clone();
238268
let (tx, deadline_rx) = mailbox.open_once_port::<()>();
239269

240270
simnet_handle()

hyperactor/src/mailbox.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ pub use undeliverable::Undeliverable;
128128
pub use undeliverable::UndeliverableMessageError;
129129
pub use undeliverable::monitored_return_handle; // TODO: Audit
130130
pub use undeliverable::supervise_undeliverable_messages;
131+
pub use undeliverable::unused_return_handle;
131132
/// For [`MailboxAdminMessage`], a message type for mailbox administration.
132133
pub mod mailbox_admin_message;
133134
pub use mailbox_admin_message::MailboxAdminMessage;
@@ -1310,7 +1311,7 @@ impl cap::sealed::CanSend for Mailbox {
13101311
.get_or_init(DashSet::new)
13111312
.insert(actor_id.clone())
13121313
{
1313-
let bt = std::backtrace::Backtrace::capture();
1314+
let bt = std::backtrace::Backtrace::force_capture();
13141315
tracing::warn!(
13151316
actor_id = ?actor_id,
13161317
backtrace = ?bt,
@@ -2833,8 +2834,7 @@ mod tests {
28332834
envelope.try_set_error(DeliveryError::BrokenLink(
28342835
"returned in unit test".to_string(),
28352836
));
2836-
UndeliverableMailboxSender
2837-
.post(envelope, /*unused */ monitored_return_handle().clone());
2837+
UndeliverableMailboxSender.post(envelope, unused_return_handle().clone());
28382838
}
28392839
});
28402840
drop(return_handle);

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,12 @@ static MONITORED_RETURN_HANDLE: OnceLock<PortHandle<Undeliverable<MessageEnvelop
5353
pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
5454
let return_handle = MONITORED_RETURN_HANDLE.get_or_init(|| {
5555
let (return_handle, mut rx) = new_undeliverable_port();
56-
// Don't reuse `return_handle` for `h`: else it will never get
57-
// dropped and the task will never return.
58-
let (h, _) = new_undeliverable_port();
5956
crate::init::get_runtime().spawn(async move {
6057
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
6158
envelope.try_set_error(DeliveryError::BrokenLink(
6259
"message returned to undeliverable port".to_string(),
6360
));
64-
super::UndeliverableMailboxSender.post(envelope, /*unused */ h.clone());
61+
super::UndeliverableMailboxSender.post(envelope, unused_return_handle());
6562
}
6663
});
6764
return_handle
@@ -70,14 +67,19 @@ pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
7067
return_handle.clone()
7168
}
7269

70+
/// For when a return handle is required but we know it won't be used.
71+
pub fn unused_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
72+
panic!("unused_return_handle was called: this indicates a logic error; the return handle was unexpectedly used")
73+
}
74+
7375
/// Returns a message envelope to its original sender.
7476
pub(crate) fn return_undeliverable(
7577
return_handle: PortHandle<Undeliverable<MessageEnvelope>>,
7678
envelope: MessageEnvelope,
7779
) {
7880
let envelope_copy = envelope.clone();
7981
if (return_handle.send(Undeliverable(envelope))).is_err() {
80-
UndeliverableMailboxSender.post(envelope_copy, /*unsued*/ return_handle)
82+
UndeliverableMailboxSender.post(envelope_copy, unused_return_handle())
8183
}
8284
}
8385

@@ -149,8 +151,7 @@ pub fn supervise_undeliverable_messages(
149151
))
150152
.is_err()
151153
{
152-
UndeliverableMailboxSender
153-
.post(envelope.clone(), /*unused*/ monitored_return_handle())
154+
UndeliverableMailboxSender.post(envelope.clone(), unused_return_handle())
154155
}
155156
}
156157
});

hyperactor_mesh/src/comm.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use hyperactor::mailbox::MailboxSender;
3131
use hyperactor::mailbox::Undeliverable;
3232
use hyperactor::mailbox::UndeliverableMailboxSender;
3333
use hyperactor::mailbox::UndeliverableMessageError;
34-
use hyperactor::mailbox::monitored_return_handle;
34+
use hyperactor::mailbox::unused_return_handle;
3535
use hyperactor::reference::UnboundPort;
3636
use ndslice::selection::routing::RoutingFrame;
3737
use serde::Deserialize;
@@ -203,8 +203,7 @@ impl Actor for CommActor {
203203
}
204204

205205
// 3. A return of an undeliverable message was itself returned.
206-
UndeliverableMailboxSender
207-
.post(message_envelope, /*unused */ monitored_return_handle());
206+
UndeliverableMailboxSender.post(message_envelope, unused_return_handle());
208207
Ok(())
209208
}
210209
}

0 commit comments

Comments
 (0)