From 1fd4a55e45d4e50c248553f22c52b8ba2dc9f63d Mon Sep 17 00:00:00 2001 From: Shayne Fletcher Date: Sat, 12 Jul 2025 10:03:08 -0700 Subject: [PATCH] : clock: bind undeliverable port in sim clock's mailbox (#517) Summary: (1) pesky warnings in python about using a mailbox for sending without binding the undeliverable port (see D78031232) turned out to be due to the use of `SimClock`. hack in a "fix" that ~~terminates the process~~ logs the event (the idea of terminating the process didn't work out - test failures[*] ) if an undeliverable message is encountered in this context. we'll definitely want to revisit this however (kaiyuan-li, thomasywang) and fix it more appropriately (maybe by threading through a mailbox from upstack that has the undeliverable port bound and bounces undeliverables into supervision events perhaps?). --- [*] the failed tests being, (1) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/controller:controller-unittest -- --exact 'monarch/controller:controller-unittest - tests::test_sim_supervision_failure'`, (2)`buck test 'fbcode//mode/opt' fbcode//monarch/python/tests:test_sim_backend -- --exact 'monarch/python/tests:test_sim_backend - test_sim_backend.py::TestSimBackend::test_local_mesh_setup'`, (3) `buck test 'fbcode//mode/dev-nosan' fbcode//monarch/hyperactor:hyperactor-unittest -- --exact 'monarch/hyperactor:hyperactor-unittest - clock::tests::test_sim_timeout'` in all cases i *think* the tests themselves are passing and the undeliverable messages are actually encountered during teardown. Differential Revision: D78191921 --- hyperactor/src/clock.rs | 37 ++++++++++++++++++++++++++++++++++--- hyperactor/src/mailbox.rs | 4 ++-- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/hyperactor/src/clock.rs b/hyperactor/src/clock.rs index d9863e01..0758a45b 100644 --- a/hyperactor/src/clock.rs +++ b/hyperactor/src/clock.rs @@ -12,6 +12,7 @@ use std::error::Error; use std::fmt; use std::sync::LazyLock; use std::sync::Mutex; +use std::sync::OnceLock; use std::time::SystemTime; use futures::pin_mut; @@ -19,9 +20,17 @@ use hyperactor_telemetry::TelemetryClock; use serde::Deserialize; use serde::Serialize; +use crate::ActorId; use crate::Mailbox; use crate::channel::ChannelAddr; +use crate::data::Named; use crate::id; +use crate::mailbox::DeliveryError; +use crate::mailbox::MailboxSender; +use crate::mailbox::MessageEnvelope; +use crate::mailbox::Undeliverable; +use crate::mailbox::UndeliverableMailboxSender; +use crate::mailbox::monitored_return_handle; use crate::simnet::SleepEvent; use crate::simnet::simnet_handle; @@ -175,6 +184,28 @@ impl ClockKind { } } +// TODO (SF, 2025-07-11): Remove this global, thread through a mailbox +// from upstack and handle undeliverable messages properly. +fn simclock_mailbox() -> &'static Mailbox { + static SIMCLOCK_MAILBOX: OnceLock = OnceLock::new(); + SIMCLOCK_MAILBOX.get_or_init(|| { + let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); + let (undeliverable_messages, mut rx) = + mailbox.open_port::>(); + undeliverable_messages.bind_to(Undeliverable::::port()); + tokio::spawn(async move { + while let Ok(Undeliverable(mut envelope)) = rx.recv().await { + envelope.try_set_error(DeliveryError::BrokenLink( + "message returned to undeliverable port".to_string(), + )); + UndeliverableMailboxSender + .post(envelope, /*unused */ monitored_return_handle()) + } + }); + mailbox + }) +} + /// Clock to be used in simulator runs that allows the simnet to create a scheduled event for. /// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's /// time to the wakeup time and use the transmitter to wake up this green thread @@ -184,7 +215,7 @@ pub struct SimClock; impl Clock for SimClock { /// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet async fn sleep(&self, duration: tokio::time::Duration) { - let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); + let mailbox = simclock_mailbox().clone(); let (tx, rx) = mailbox.open_once_port::<()>(); simnet_handle() @@ -199,7 +230,7 @@ impl Clock for SimClock { } async fn non_advancing_sleep(&self, duration: tokio::time::Duration) { - let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); + let mailbox = simclock_mailbox().clone(); let (tx, rx) = mailbox.open_once_port::<()>(); simnet_handle() @@ -234,7 +265,7 @@ impl Clock for SimClock { where F: std::future::Future, { - let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone()); + let mailbox = simclock_mailbox().clone(); let (tx, deadline_rx) = mailbox.open_once_port::<()>(); simnet_handle() diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index d5207342..24bac8a7 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -1310,7 +1310,7 @@ impl cap::sealed::CanSend for Mailbox { .get_or_init(DashSet::new) .insert(actor_id.clone()) { - let bt = std::backtrace::Backtrace::capture(); + let bt = std::backtrace::Backtrace::force_capture(); tracing::warn!( actor_id = ?actor_id, backtrace = ?bt, @@ -2834,7 +2834,7 @@ mod tests { "returned in unit test".to_string(), )); UndeliverableMailboxSender - .post(envelope, /*unused */ monitored_return_handle().clone()); + .post(envelope, /*unused */ monitored_return_handle()); } }); drop(return_handle);