Skip to content

Commit bd5bbb4

Browse files
: clock: bind undeliverable port in sim clock's mailbox
Summary: pesky warnings in python about using a mailbox for sending without binding the undeliverable port (see D78031232) turned out to be due to the use of `SimClock`. hack in a "fix" that terminates the process if an undeliverable message is encountered in this context. we'll definitely want to revisit this however and fix it more appropriately (likely by threading through a mailbox from upstack that has the undeliverable port bound and bounces undeliverables into supervision events perhaps). Differential Revision: D78191921
1 parent d0acc65 commit bd5bbb4

File tree

2 files changed

+33
-4
lines changed

2 files changed

+33
-4
lines changed

hyperactor/src/clock.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,21 @@ use std::error::Error;
1212
use std::fmt;
1313
use std::sync::LazyLock;
1414
use std::sync::Mutex;
15+
use std::sync::OnceLock;
1516
use std::time::SystemTime;
1617

1718
use futures::pin_mut;
1819
use hyperactor_telemetry::TelemetryClock;
1920
use serde::Deserialize;
2021
use serde::Serialize;
2122

23+
use crate::ActorId;
2224
use crate::Mailbox;
2325
use crate::channel::ChannelAddr;
26+
use crate::data::Named;
2427
use crate::id;
28+
use crate::mailbox::MessageEnvelope;
29+
use crate::mailbox::Undeliverable;
2530
use crate::simnet::SleepEvent;
2631
use crate::simnet::simnet_handle;
2732

@@ -175,6 +180,30 @@ impl ClockKind {
175180
}
176181
}
177182

183+
// TODO (SF, 2025-07-11): Remove this global, thread through a mailbox
184+
// from upstack and handle undeliverable messages properly.
185+
static SIMCLOCK_MAILBOX: OnceLock<Mailbox> = OnceLock::new();
186+
fn simclock_mailbox_id() -> ActorId {
187+
id!(proc[0].proc).clone()
188+
}
189+
fn simclock_mailbox() -> &'static Mailbox {
190+
SIMCLOCK_MAILBOX.get_or_init(|| {
191+
let mailbox = Mailbox::new_detached(simclock_mailbox_id());
192+
let (undeliverable_messages, mut rx) =
193+
mailbox.open_port::<Undeliverable<MessageEnvelope>>();
194+
undeliverable_messages.bind_to(Undeliverable::<MessageEnvelope>::port());
195+
tokio::spawn(async move {
196+
if let Ok(Undeliverable(mut envelope)) = rx.recv().await {
197+
tracing::error!(
198+
"simclock mailbox got an undeliverable message; calling `abort()`!"
199+
);
200+
std::process::abort();
201+
}
202+
});
203+
mailbox
204+
})
205+
}
206+
178207
/// Clock to be used in simulator runs that allows the simnet to create a scheduled event for.
179208
/// When the wakeup event becomes the next earliest scheduled event, the simnet will advance it's
180209
/// time to the wakeup time and use the transmitter to wake up this green thread
@@ -184,7 +213,7 @@ pub struct SimClock;
184213
impl Clock for SimClock {
185214
/// Tell the simnet to wake up this green thread after the specified duration has pass on the simnet
186215
async fn sleep(&self, duration: tokio::time::Duration) {
187-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
216+
let mailbox = simclock_mailbox().clone();
188217
let (tx, rx) = mailbox.open_once_port::<()>();
189218

190219
simnet_handle()
@@ -199,7 +228,7 @@ impl Clock for SimClock {
199228
}
200229

201230
async fn non_advancing_sleep(&self, duration: tokio::time::Duration) {
202-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
231+
let mailbox = simclock_mailbox().clone();
203232
let (tx, rx) = mailbox.open_once_port::<()>();
204233

205234
simnet_handle()
@@ -234,7 +263,7 @@ impl Clock for SimClock {
234263
where
235264
F: std::future::Future<Output = T>,
236265
{
237-
let mailbox = Mailbox::new_detached(id!(proc[0].proc).clone());
266+
let mailbox = simclock_mailbox().clone();
238267
let (tx, deadline_rx) = mailbox.open_once_port::<()>();
239268

240269
simnet_handle()

hyperactor/src/mailbox.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1310,7 +1310,7 @@ impl cap::sealed::CanSend for Mailbox {
13101310
.get_or_init(DashSet::new)
13111311
.insert(actor_id.clone())
13121312
{
1313-
let bt = std::backtrace::Backtrace::capture();
1313+
let bt = std::backtrace::Backtrace::force_capture();
13141314
tracing::warn!(
13151315
actor_id = ?actor_id,
13161316
backtrace = ?bt,

0 commit comments

Comments
 (0)