Skip to content

Commit fd25d79

Browse files
suo authored and facebook-github-bot committed
get rid of hyperactor runtime, attempt 2 (#406)
Summary:
Pull Request resolved: #406

Based on the discussion in D77422927, a new attempt: I still want to avoid creating an extra runtime in hyperactor, which I think is a Pretty Bad thing to do. So here, I have `hyperactor::initialize` take a runtime handle provided by the caller. This ensures the user has full control over the runtime, while we still have a reference to a runtime handy in hyperactor when we need one from a sync context.

ghstack-source-id: 293872756
exported-using-ghexport

Reviewed By: mariusae

Differential Revision: D77632354

fbshipit-source-id: 472333d1544d11e1ee7b02b40d17d0bf3b5c785c
1 parent a6e0863 commit fd25d79

File tree

15 files changed

+53
-46
lines changed

15 files changed

+53
-46
lines changed

controller/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ async fn serve(inp: impl AsyncBufRead + Unpin, mut outp: impl AsyncWrite + Unpin
6969

7070
#[tokio::main]
7171
async fn main() -> Result<()> {
72-
hyperactor::initialize();
72+
hyperactor::initialize_with_current_runtime();
7373

7474
match BootstrapCommand::try_parse()? {
7575
BootstrapCommand::Run(cmd) => controller::bootstrap::run(cmd)?.await??,

hyper/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ async fn main() -> Result<(), anyhow::Error> {
6363

6464
async fn run() -> Result<(), anyhow::Error> {
6565
let args = Cli::parse();
66-
hyperactor::initialize();
66+
hyperactor::initialize_with_current_runtime();
6767

6868
match args.command {
6969
Command::Serve(command) => Ok(command.run().await?),

hyperactor/src/channel/net.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ impl<M: RemoteMessage> NetTx<M> {
190190
dest,
191191
status,
192192
};
193-
crate::init::RUNTIME.spawn(Self::run(link, receiver, notify));
193+
crate::init::get_runtime().spawn(Self::run(link, receiver, notify));
194194
tx
195195
}
196196

hyperactor/src/init.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,45 @@
66
* LICENSE file in the root directory of this source tree.
77
*/
88

9-
//! Utilities for launching hyperactor processes.
10-
11-
use std::sync::LazyLock;
129
use std::sync::OnceLock;
1310

1411
use crate::clock::ClockKind;
1512
use crate::panic_handler;
1613

17-
/// A global runtime used in binding async and sync code. Do not use for executing long running or
14+
/// A global runtime handle used for spawning tasks. Do not use for executing long running or
1815
/// compute intensive tasks.
19-
pub(crate) static RUNTIME: LazyLock<tokio::runtime::Runtime> =
20-
LazyLock::new(|| tokio::runtime::Runtime::new().expect("failed to create global runtime"));
16+
static RUNTIME: OnceLock<tokio::runtime::Handle> = OnceLock::new();
17+
18+
/// Get a handle to the global runtime.
19+
///
20+
/// Panics if the runtime has not been initialized *and* the caller is not in an
21+
/// async context.
22+
pub(crate) fn get_runtime() -> tokio::runtime::Handle {
23+
match RUNTIME.get() {
24+
Some(handle) => handle.clone(),
25+
None => tokio::runtime::Handle::current(),
26+
}
27+
}
2128

2229
/// Initialize the Hyperactor runtime. Specifically:
2330
/// - Set up panic handling, so that we get consistent panic stack traces in Actors.
2431
/// - Initialize logging defaults.
25-
pub fn initialize() {
26-
static INITIALIZED: OnceLock<()> = OnceLock::new();
27-
INITIALIZED.get_or_init(|| {
28-
panic_handler::set_panic_hook();
29-
hyperactor_telemetry::initialize_logging(ClockKind::default());
30-
#[cfg(target_os = "linux")]
31-
linux::initialize();
32-
});
32+
/// - Store the provided tokio runtime handle for use by the hyperactor system.
33+
pub fn initialize(handle: tokio::runtime::Handle) {
34+
RUNTIME
35+
.set(handle)
36+
.expect("hyperactor::initialize must only be called once");
37+
38+
panic_handler::set_panic_hook();
39+
hyperactor_telemetry::initialize_logging(ClockKind::default());
40+
#[cfg(target_os = "linux")]
41+
linux::initialize();
42+
}
43+
44+
/// Initialize the Hyperactor runtime using the current tokio runtime handle.
45+
pub fn initialize_with_current_runtime() {
46+
let handle = tokio::runtime::Handle::current();
47+
initialize(handle);
3348
}
3449

3550
#[cfg(target_os = "linux")]

hyperactor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ pub use hyperactor_telemetry::key_value;
135135
pub use hyperactor_telemetry::kv_pairs;
136136
#[doc(inline)]
137137
pub use init::initialize;
138+
#[doc(inline)]
139+
pub use init::initialize_with_current_runtime;
138140
// Re-exported to make this available to callers of the `register!` macro.
139141
#[doc(hidden)]
140142
pub use inventory::submit;

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -684,7 +684,7 @@ impl<T: Message> Buffer<T> {
684684
{
685685
let (queue, mut next) = mpsc::unbounded_channel();
686686
let (last_processed, processed) = watch::channel(0);
687-
crate::init::RUNTIME.spawn(async move {
687+
crate::init::get_runtime().spawn(async move {
688688
let mut seq = 0;
689689
while let Some((msg, return_handle)) = next.recv().await {
690690
process(msg, return_handle).await;
@@ -931,7 +931,7 @@ impl MailboxClient {
931931
cancel_token: CancellationToken,
932932
addr: ChannelAddr,
933933
) {
934-
crate::init::RUNTIME.spawn(async move {
934+
crate::init::get_runtime().spawn(async move {
935935
loop {
936936
tokio::select! {
937937
changed = rx.changed() => {

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub fn monitored_return_handle() -> PortHandle<Undeliverable<MessageEnvelope>> {
6363
// Don't reuse `return_handle` for `h`: else it will never get
6464
// dropped and the task will never return.
6565
let (h, _) = new_undeliverable_port();
66-
crate::init::RUNTIME.spawn(async move {
66+
crate::init::get_runtime().spawn(async move {
6767
while let Ok(Undeliverable(mut envelope)) = rx.recv().await {
6868
envelope.try_set_error(DeliveryError::BrokenLink(
6969
"message returned to undeliverable port".to_string(),

hyperactor_mesh/src/bootstrap.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,6 @@ pub(crate) enum Allocator2Process {
7878
/// Use [`bootstrap_or_die`] to implement this behavior directly.
7979
pub async fn bootstrap() -> anyhow::Error {
8080
pub async fn go() -> Result<(), anyhow::Error> {
81-
// This ensures, among other things, that we correctly exit child processes
82-
// when the parent dies.
83-
hyperactor::initialize();
84-
8581
let bootstrap_addr: ChannelAddr = std::env::var(BOOTSTRAP_ADDR_ENV)
8682
.map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_ADDR_ENV, err))?
8783
.parse()?;

monarch_extension/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,9 @@ fn get_or_add_new_module<'py>(
6464
#[pymodule]
6565
#[pyo3(name = "_rust_bindings")]
6666
pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
67-
::hyperactor::initialize();
6867
monarch_hyperactor::runtime::initialize(module.py())?;
68+
let runtime = monarch_hyperactor::runtime::get_tokio_runtime();
69+
::hyperactor::initialize(runtime.handle().clone());
6970

7071
monarch_hyperactor::shape::register_python_bindings(&get_or_add_new_module(
7172
module,

monarch_extension/src/tensor_worker.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use hyperactor::data::Serialized;
2626
use hyperactor::reference::ActorId;
2727
use monarch_hyperactor::ndslice::PySlice;
2828
use monarch_hyperactor::proc::PyActorId;
29+
use monarch_hyperactor::runtime::get_tokio_runtime;
2930
use monarch_messages::wire_value::WireValue;
3031
use monarch_messages::wire_value::func_call_args_to_wire_values;
3132
use monarch_messages::worker::*;
@@ -1386,22 +1387,17 @@ fn worker_main(py: Python<'_>) -> PyResult<()> {
13861387
BinaryArgs::Pipe => bootstrap_pipe(),
13871388
BinaryArgs::WorkerServer { rd, wr } => {
13881389
worker_server(
1390+
get_tokio_runtime(),
13891391
// SAFETY: Raw FD passed in from parent.
13901392
BufReader::new(File::from(unsafe { OwnedFd::from_raw_fd(rd) })),
13911393
// SAFETY: Raw FD passed in from parent.
13921394
File::from(unsafe { OwnedFd::from_raw_fd(wr) }),
13931395
)
13941396
}
1395-
BinaryArgs::Worker(args) => {
1396-
let rt = tokio::runtime::Builder::new_multi_thread()
1397-
.enable_all()
1398-
.build()?;
1399-
rt.block_on(async move {
1400-
hyperactor::initialize();
1401-
let _ = bootstrap_worker_proc(args).await?.await;
1402-
Ok(())
1403-
})
1404-
}
1397+
BinaryArgs::Worker(args) => get_tokio_runtime().block_on(async move {
1398+
let _ = bootstrap_worker_proc(args).await?.await;
1399+
Ok(())
1400+
}),
14051401
}
14061402
.map_err(|err: anyhow::Error| PyRuntimeError::new_err(err.to_string()))
14071403
})

0 commit comments

Comments
 (0)