Skip to content

Commit a7b299b

Browse files
clock: bind undeliverable port in sim clock's mailbox (#517)
Summary: Pull Request resolved: #517 (1) Pesky warnings in Python about using a mailbox for sending without binding the undeliverable port (see D78031232) turned out to be due to the use of `SimClock`. Hack in a "fix" that ~~terminates the process~~ logs the event (the idea of terminating the process didn't work out - test failures[*]) if an undeliverable message is encountered in this context. We'll definitely want to revisit this, however (kaiyuan-li, thomasywang), and fix it more appropriately (maybe by threading through a mailbox from upstack that has the undeliverable port bound and bounces undeliverables into supervision events?). --- [*] The failed tests being: (1) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/controller:controller-unittest -- --exact 'monarch/controller:controller-unittest - tests::test_sim_supervision_failure'`, (2) `buck test 'fbcode//mode/opt' fbcode//monarch/python/tests:test_sim_backend -- --exact 'monarch/python/tests:test_sim_backend - test_sim_backend.py::TestSimBackend::test_local_mesh_setup'`, (3) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/hyperactor:hyperactor-unittest -- --exact 'monarch/hyperactor:hyperactor-unittest - clock::tests::test_sim_timeout'`. In all cases I *think* the tests themselves are passing and the undeliverable messages are actually encountered during teardown. Reviewed By: highker Differential Revision: D78191921 fbshipit-source-id: e498e21fe7be0d447ee20b29d180c1a4525bd93f
1 parent dd496a4 commit a7b299b

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

hyperactor/src/clock.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use std::error::Error;
1212
use std::fmt;
1313
use std::sync::LazyLock;
1414
use std::sync::Mutex;
15+
use std::sync::OnceLock;
1516
use std::time::SystemTime;
1617

1718
use futures::pin_mut;
@@ -21,7 +22,14 @@ use serde::Serialize;
2122

2223
use crate::Mailbox;
2324
use crate::channel::ChannelAddr;
25+
use crate::data::Named;
2426
use crate::id;
27+
use crate::mailbox::DeliveryError;
28+
use crate::mailbox::MailboxSender;
29+
use crate::mailbox::MessageEnvelope;
30+
use crate::mailbox::Undeliverable;
31+
use crate::mailbox::UndeliverableMailboxSender;
32+
use crate::mailbox::monitored_return_handle;
2533
use crate::simnet::SleepEvent;
2634
use crate::simnet::simnet_handle;
2735

@@ -184,7 +192,7 @@ pub struct SimClock;
184192
impl Clock for SimClock {
185193
/// Tell the simnet to wake up this green thread after the specified duration has passed on the simnet
186194
async fn sleep(&self, duration: tokio::time::Duration) {
187-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
195+
let mailbox = SimClock::mailbox().clone();
188196
let (tx, rx) = mailbox.open_once_port::<()>();
189197

190198
simnet_handle()
@@ -199,7 +207,7 @@ impl Clock for SimClock {
199207
}
200208

201209
async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
202-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
210+
let mailbox = SimClock::mailbox().clone();
203211
let (tx, rx) = mailbox.open_once_port::<()>();
204212

205213
simnet_handle()
@@ -234,7 +242,7 @@ impl Clock for SimClock {
234242
where
235243
F: std::future::Future<Output = T>,
236244
{
237-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
245+
let mailbox = SimClock::mailbox().clone();
238246
let (tx, deadline_rx) = mailbox.open_once_port::<()>();
239247

240248
simnet_handle()
@@ -259,6 +267,28 @@ impl Clock for SimClock {
259267
}
260268

261269
impl SimClock {
270+
// TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
271+
// from upstack and handle undeliverable messages properly.
272+
/// Returns the shared, lazily-initialized mailbox used by `SimClock`'s
/// `Clock` methods to open their one-shot wake-up ports.
///
/// The mailbox lives in a function-local `static OnceLock`, so the
/// initializer below runs at most once per process; every later call
/// returns a reference to the same `Mailbox`.
///
/// On first use the initializer:
/// 1. creates a detached mailbox with the id `proc[0].proc`,
/// 2. opens a port for `Undeliverable<MessageEnvelope>` and binds it to
///    `Undeliverable::<MessageEnvelope>::port()`, and
/// 3. spawns a background task that drains that port, tags each returned
///    envelope with a `DeliveryError::BrokenLink`, and hands it to
///    `UndeliverableMailboxSender`.
///
/// NOTE(review): `tokio::spawn` is called inside the initializer, so the
/// first call must happen within a Tokio runtime context (per the Tokio
/// docs, `spawn` panics otherwise). The drain task is also tied to
/// whichever runtime was current at that first call; presumably, if that
/// runtime is shut down while the process keeps running (e.g. per-test
/// runtimes), nothing drains the port afterwards — TODO confirm.
fn mailbox() -> &'static Mailbox {
273+
static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
274+
SIMCLOCK_MAILBOX.get_or_init(|| {
275+
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
276+
// Open a port that receives returned (undeliverable) envelopes.
let (undeliverable_messages, mut rx) =
277+
mailbox.open_port::<Undeliverable<MessageEnvelope>>();
278+
// Bind it to the undeliverable port so the mailbox has one bound when sending.
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
279+
// Drain returned messages in the background; the loop ends on the first
// `Err` from `rx.recv()`.
tokio::spawn(async move {
280+
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
281+
envelope.try_set_error(DeliveryError::BrokenLink(
282+
"message returned to undeliverable port".to_string(),
283+
));
284+
// Forward to the undeliverable sink rather than bouncing it again
// (presumably this just logs the event, per the commit summary).
UndeliverableMailboxSender
285+
.post(envelope, /*unused */ monitored_return_handle())
286+
}
287+
});
288+
mailbox
289+
})
290+
}
291+
262292
/// Advance the simulator's time to the specified instant
263293
pub fn advance_to(&self, millis: u64) {
264294
let mut guard = SIM_TIME.now.lock().unwrap();

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1310,7 +1310,7 @@ impl cap::sealed::CanSend for Mailbox {
13101310
.get_or_init(DashSet::new)
13111311
.insert(actor_id.clone())
13121312
{
1313-
let bt = std::backtrace::Backtrace::capture();
1313+
let bt = std::backtrace::Backtrace::force_capture();
13141314
tracing::warn!(
13151315
actor_id = ?actor_id,
13161316
backtrace = ?bt,
@@ -2834,7 +2834,7 @@ mod tests {
28342834
"returned in unit test".to_string(),
28352835
));
28362836
UndeliverableMailboxSender
2837-
.post(envelope, /*unused */ monitored_return_handle().clone());
2837+
.post(envelope, /*unused */ monitored_return_handle());
28382838
}
28392839
});
28402840
drop(return_handle);

0 commit comments

Comments
 (0)