Skip to content

Commit 3f02518

Browse files
clock: bind undeliverable port in sim clock's mailbox (#517)
Summary: Pesky warnings in Python about using a mailbox for sending without binding the undeliverable port (see D78031232) turned out to be due to the use of `SimClock`. Hack in a "fix" that ~~terminates the process~~ logs the event if an undeliverable message is encountered in this context (the idea of terminating the process didn't work out because of test failures[*]). We'll definitely want to revisit this, however (kaiyuan-li, thomasywang), and fix it more appropriately, perhaps by threading through a mailbox from upstack that has the undeliverable port bound and bounces undeliverables into supervision events. --- [*] The failed tests were: (1) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/controller:controller-unittest -- --exact 'monarch/controller:controller-unittest - tests::test_sim_supervision_failure'`, (2) `buck test 'fbcode//mode/opt' fbcode//monarch/python/tests:test_sim_backend -- --exact 'monarch/python/tests:test_sim_backend - test_sim_backend.py::TestSimBackend::test_local_mesh_setup'`, (3) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/hyperactor:hyperactor-unittest -- --exact 'monarch/hyperactor:hyperactor-unittest - clock::tests::test_sim_timeout'`. In all cases I *think* the tests themselves are passing and the undeliverable messages are actually encountered during teardown. Differential Revision: D78191921
1 parent 54471df commit 3f02518

File tree

2 files changed

+36
-5
lines changed

2 files changed

+36
-5
lines changed

hyperactor/src/clock.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,25 @@ use std::error::Error;
1212
use std::fmt;
1313
use std::sync::LazyLock;
1414
use std::sync::Mutex;
15+
use std::sync::OnceLock;
1516
use std::time::SystemTime;
1617

1718
use futures::pin_mut;
1819
use hyperactor_telemetry::TelemetryClock;
1920
use serde::Deserialize;
2021
use serde::Serialize;
2122

23+
use crate::ActorId;
2224
use crate::Mailbox;
2325
use crate::channel::ChannelAddr;
26+
use crate::data::Named;
2427
use crate::id;
28+
use crate::mailbox::DeliveryError;
29+
use crate::mailbox::MailboxSender;
30+
use crate::mailbox::MessageEnvelope;
31+
use crate::mailbox::Undeliverable;
32+
use crate::mailbox::UndeliverableMailboxSender;
33+
use crate::mailbox::monitored_return_handle;
2534
use crate::simnet::SleepEvent;
2635
use crate::simnet::simnet_handle;
2736

@@ -175,6 +184,28 @@ impl ClockKind {
175184
}
176185
}
177186

187+
// TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
188+
// from upstack and handle undeliverable messages properly.
189+
fn simclock_mailbox() -> &'static Mailbox {
190+
static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
191+
SIMCLOCK_MAILBOX.get_or_init(|| {
192+
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
193+
let (undeliverable_messages, mut rx) =
194+
mailbox.open_port::<Undeliverable<MessageEnvelope>>();
195+
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
196+
tokio::spawn(async move {
197+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
198+
envelope.try_set_error(DeliveryError::BrokenLink(
199+
"message returned to undeliverable port".to_string(),
200+
));
201+
UndeliverableMailboxSender
202+
.post(envelope, /*unused */ monitored_return_handle())
203+
}
204+
});
205+
mailbox
206+
})
207+
}
208+
178209
/// Clock to be used in simulator runs; it lets the simnet create a scheduled wakeup event.
179210
/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance its
180211
/// time to the wakeup time and use the transmitter to wake up this green thread
@@ -184,7 +215,7 @@ pub struct SimClock;
184215
impl Clock for SimClock {
185216
/// Tell the simnet to wake up this green thread after the specified duration has passed on the simnet
186217
async fn sleep(&self, duration: tokio::time::Duration) {
187-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
218+
let mailbox = simclock_mailbox().clone();
188219
let (tx, rx) = mailbox.open_once_port::<()>();
189220

190221
simnet_handle()
@@ -199,7 +230,7 @@ impl Clock for SimClock {
199230
}
200231

201232
async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
202-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
233+
let mailbox = simclock_mailbox().clone();
203234
let (tx, rx) = mailbox.open_once_port::<()>();
204235

205236
simnet_handle()
@@ -234,7 +265,7 @@ impl Clock for SimClock {
234265
where
235266
F: std::future::Future<Output = T>,
236267
{
237-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
268+
let mailbox = simclock_mailbox().clone();
238269
let (tx, deadline_rx) = mailbox.open_once_port::<()>();
239270

240271
simnet_handle()

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,7 +1310,7 @@ impl cap::sealed::CanSend for Mailbox {
13101310
.get_or_init(DashSet::new)
13111311
.insert(actor_id.clone())
13121312
{
1313-
let bt = std::backtrace::Backtrace::capture();
1313+
let bt = std::backtrace::Backtrace::force_capture();
13141314
tracing::warn!(
13151315
actor_id = ?actor_id,
13161316
backtrace = ?bt,
@@ -2834,7 +2834,7 @@ mod tests {
28342834
"returned in unit test".to_string(),
28352835
));
28362836
UndeliverableMailboxSender
2837-
.post(envelope, /*unused */ monitored_return_handle().clone());
2837+
.post(envelope, /*unused */ monitored_return_handle());
28382838
}
28392839
});
28402840
drop(return_handle);

0 commit comments

Comments
 (0)