Skip to content

Commit 46e855f

Browse files
thomasywang and facebook-github-bot
authored and committed
local_proc_mesh with sim channels (#475)
Summary: Pull Request resolved: #475. We want to instantiate local_proc_meshes that use ChannelTransport::Sim instead of ChannelTransport::Local so that the simulator can intercept and control the delivery of messages. To preserve the `allocate()` interface so that we can reuse existing test generation macros, we will create a wrapper class for this around `LocalAlloc`. Differential Revision: D77941640
1 parent 78231f1 commit 46e855f

File tree

10 files changed

+268
-4
lines changed

10 files changed

+268
-4
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -997,4 +997,10 @@ mod tests {
997997
buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap()
998998
)));
999999
}
1000+
1001+
mod sim {
1002+
use crate::alloc::sim::SimAllocator;
1003+
1004+
actor_mesh_test_suite!(SimAllocator::new_and_start_simnet());
1005+
}
10001006
}

hyperactor_mesh/src/alloc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod local;
1313
pub(crate) mod logtailer;
1414
pub mod process;
1515
pub mod remoteprocess;
16+
pub mod sim;
1617

1718
use std::collections::HashMap;
1819
use std::fmt;

hyperactor_mesh/src/alloc/local.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,15 @@ pub struct LocalAlloc {
7575
todo_rx: mpsc::UnboundedReceiver<Action>,
7676
stopped: bool,
7777
failed: bool,
78+
transport: ChannelTransport,
7879
}
7980

8081
impl LocalAlloc {
8182
fn new(spec: AllocSpec) -> Self {
83+
Self::new_with_transport(spec, ChannelTransport::Local)
84+
}
85+
86+
pub(crate) fn new_with_transport(spec: AllocSpec, transport: ChannelTransport) -> Self {
8287
let name = ShortUuid::generate();
8388
let (todo_tx, todo_rx) = mpsc::unbounded_channel();
8489
for rank in 0..spec.shape.slice().len() {
@@ -94,6 +99,7 @@ impl LocalAlloc {
9499
todo_rx,
95100
stopped: false,
96101
failed: false,
102+
transport,
97103
}
98104
}
99105

@@ -123,7 +129,7 @@ impl LocalAlloc {
123129
&self.name
124130
}
125131

126-
fn size(&self) -> usize {
132+
pub(crate) fn size(&self) -> usize {
127133
self.spec.shape.slice().len()
128134
}
129135
}
@@ -249,7 +255,7 @@ impl Alloc for LocalAlloc {
249255
}
250256

251257
fn transport(&self) -> ChannelTransport {
252-
ChannelTransport::Local
258+
self.transport.clone()
253259
}
254260

255261
async fn stop(&mut self) -> Result<(), AllocatorError> {

hyperactor_mesh/src/alloc/sim.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! Support for allocating procs in the local process with simulated channels.
10+
11+
#![allow(dead_code)] // until it is used outside of testing
12+
13+
use async_trait::async_trait;
14+
use hyperactor::WorldId;
15+
use hyperactor::channel::ChannelAddr;
16+
use hyperactor::channel::ChannelTransport;
17+
use hyperactor::mailbox::MailboxServerHandle;
18+
use hyperactor::proc::Proc;
19+
use ndslice::Shape;
20+
21+
use super::ProcStopReason;
22+
use crate::alloc::Alloc;
23+
use crate::alloc::AllocSpec;
24+
use crate::alloc::Allocator;
25+
use crate::alloc::AllocatorError;
26+
use crate::alloc::LocalAlloc;
27+
use crate::alloc::ProcState;
28+
use crate::shortuuid::ShortUuid;
29+
30+
/// An allocator that runs procs in the local process with network traffic going through simulated channels.
31+
/// Other than transport, the underlying implementation is an inner LocalAlloc.
32+
pub struct SimAllocator;
33+
34+
#[async_trait]
35+
impl Allocator for SimAllocator {
36+
type Alloc = SimAlloc;
37+
38+
async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
39+
Ok(SimAlloc::new(spec))
40+
}
41+
}
42+
43+
impl SimAllocator {
44+
#[cfg(test)]
45+
pub(crate) fn new_and_start_simnet() -> Self {
46+
hyperactor::simnet::start();
47+
Self
48+
}
49+
}
50+
51+
struct SimProc {
52+
proc: Proc,
53+
addr: ChannelAddr,
54+
handle: MailboxServerHandle,
55+
}
56+
57+
/// A simulated allocation. It is a collection of procs that are running in the local process.
58+
pub struct SimAlloc {
59+
inner: LocalAlloc,
60+
}
61+
62+
impl SimAlloc {
63+
fn new(spec: AllocSpec) -> Self {
64+
Self {
65+
inner: LocalAlloc::new_with_transport(
66+
spec,
67+
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68+
),
69+
}
70+
}
71+
/// A chaos monkey that can be used to stop procs at random.
72+
pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
73+
self.inner.chaos_monkey()
74+
}
75+
76+
/// A function to shut down the alloc for testing purposes.
77+
pub(crate) fn stopper(&self) -> impl Fn() + 'static {
78+
self.inner.stopper()
79+
}
80+
81+
pub(crate) fn name(&self) -> &ShortUuid {
82+
self.inner.name()
83+
}
84+
85+
fn size(&self) -> usize {
86+
self.inner.size()
87+
}
88+
}
89+
90+
#[async_trait]
91+
impl Alloc for SimAlloc {
92+
async fn next(&mut self) -> Option<ProcState> {
93+
self.inner.next().await
94+
}
95+
96+
fn shape(&self) -> &Shape {
97+
self.inner.shape()
98+
}
99+
100+
fn world_id(&self) -> &WorldId {
101+
self.inner.world_id()
102+
}
103+
104+
fn transport(&self) -> ChannelTransport {
105+
self.inner.transport()
106+
}
107+
108+
async fn stop(&mut self) -> Result<(), AllocatorError> {
109+
self.inner.stop().await
110+
}
111+
}
112+
113+
#[cfg(test)]
114+
mod tests {
115+
use super::*;
116+
117+
#[tokio::test]
118+
async fn test_allocator_basic() {
119+
hyperactor::simnet::start();
120+
crate::alloc::testing::test_allocator_basic(SimAllocator).await;
121+
}
122+
}

monarch_hyperactor/src/alloc.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use hyperactor_mesh::alloc::ProcessAllocator;
2727
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAlloc;
2828
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocHost;
2929
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocInitializer;
30+
use hyperactor_mesh::alloc::sim::SimAllocator;
3031
use pyo3::exceptions::PyRuntimeError;
3132
use pyo3::prelude::*;
3233
use tokio::process::Command;
@@ -81,6 +82,53 @@ impl PyLocalAllocator {
8182
}
8283
}
8384

85+
#[pyclass(
86+
name = "SimAllocatorBase",
87+
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
88+
subclass
89+
)]
90+
pub struct PySimAllocator;
91+
92+
#[pymethods]
93+
impl PySimAllocator {
94+
#[new]
95+
fn new() -> Self {
96+
PySimAllocator {}
97+
}
98+
99+
fn allocate_nonblocking<'py>(
100+
&self,
101+
py: Python<'py>,
102+
spec: &PyAllocSpec,
103+
) -> PyResult<Bound<'py, PyAny>> {
104+
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
105+
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
106+
// pretty easily.
107+
let spec = spec.inner.clone();
108+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
109+
SimAllocator
110+
.allocate(spec)
111+
.await
112+
.map(|inner| PyAlloc::new(Box::new(inner)))
113+
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
114+
})
115+
}
116+
117+
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
118+
// We could use Bound here, and acquire the GIL inside of
119+
// `signal_safe_block_on`, but it is rather awkward with the current
120+
// APIs, and we can anyway support Arc/Mutex pretty easily.
121+
let spec = spec.inner.clone();
122+
signal_safe_block_on(py, async move {
123+
SimAllocator
124+
.allocate(spec)
125+
.await
126+
.map(|inner| PyAlloc::new(Box::new(inner)))
127+
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
128+
})?
129+
}
130+
}
131+
84132
#[pyclass(
85133
name = "ProcessAllocatorBase",
86134
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
@@ -330,6 +378,7 @@ impl PyRemoteAllocator {
330378
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
331379
hyperactor_mod.add_class::<PyProcessAllocator>()?;
332380
hyperactor_mod.add_class::<PyLocalAllocator>()?;
381+
hyperactor_mod.add_class::<PySimAllocator>()?;
333382
hyperactor_mod.add_class::<PyRemoteAllocator>()?;
334383

335384
Ok(())

python/monarch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@
113113
"timer": ("monarch.timer", "timer"),
114114
"ProcessAllocator": ("monarch._src.actor.allocator", "ProcessAllocator"),
115115
"LocalAllocator": ("monarch._src.actor.allocator", "LocalAllocator"),
116+
"SimAllocator": ("monarch._src_actor.allocator", "SimAllocator"),
117+
"ActorFuture": ("monarch.future", "ActorFuture"),
116118
"builtins": ("monarch.builtins", "builtins"),
117119
}
118120

@@ -181,6 +183,8 @@ def __getattr__(name):
181183
"timer",
182184
"ProcessAllocator",
183185
"LocalAllocator",
186+
"SimAllocator",
187+
"ActorFuture",
184188
"builtins",
185189
]
186190
assert sorted(__all__) == sorted(_public_api)

python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,26 @@ class LocalAllocatorBase:
6969
"""
7070
...
7171

72+
class SimAllocatorBase:
73+
async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
74+
"""
75+
Allocate a process according to the provided spec.
76+
77+
Arguments:
78+
- `spec`: The spec to allocate according to.
79+
"""
80+
...
81+
82+
def allocate_blocking(self, spec: AllocSpec) -> Alloc:
83+
"""
84+
Allocate a process according to the provided spec, blocking until an
85+
alloc is returned.
86+
87+
Arguments:
88+
- `spec`: The spec to allocate according to.
89+
"""
90+
...
91+
7292
class RemoteAllocatorBase:
7393
def __new__(
7494
cls,

python/monarch/_src/actor/allocator.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
LocalAllocatorBase,
2020
ProcessAllocatorBase,
2121
RemoteAllocatorBase,
22+
SimAllocatorBase,
2223
)
2324

2425
from monarch._src.actor.future import Future
@@ -72,6 +73,28 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:
7273
)
7374

7475

76+
@final
77+
class SimAllocator(SimAllocatorBase):
78+
"""
79+
An allocator that allocates by spawning actors into the current process using simulated channels for transport
80+
"""
81+
82+
def allocate(self, spec: AllocSpec) -> Future[Alloc]:
83+
"""
84+
Allocate a process according to the provided spec.
85+
86+
Arguments:
87+
- `spec`: The spec to allocate according to.
88+
89+
Returns:
90+
- A future that will be fulfilled when the requested allocation is fulfilled.
91+
"""
92+
return Future(
93+
lambda: self.allocate_nonblocking(spec),
94+
lambda: self.allocate_blocking(spec),
95+
)
96+
97+
7598
class RemoteAllocInitializer(abc.ABC):
7699
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
77100

python/monarch/_src/actor/proc_mesh.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
)
3636
from monarch._rust_bindings.monarch_hyperactor.shape import Shape, Slice
3737
from monarch._src.actor.actor_mesh import _Actor, _ActorMeshRefImpl, Actor, ActorMeshRef
38-
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator
38+
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator, SimAllocator
3939
from monarch._src.actor.code_sync import RsyncMeshClient, WorkspaceLocation
4040
from monarch._src.actor.code_sync.auto_reload import AutoReloadActor
4141

@@ -312,6 +312,33 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[Pro
312312
)
313313

314314

315+
async def sim_proc_mesh_nonblocking(
316+
*, gpus: Optional[int] = None, hosts: int = 1
317+
) -> ProcMesh:
318+
if gpus is None:
319+
gpus = _local_device_count()
320+
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
321+
allocator = SimAllocator()
322+
alloc = await allocator.allocate(spec)
323+
return await ProcMesh.from_alloc(alloc)
324+
325+
326+
def sim_proc_mesh_blocking(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
327+
if gpus is None:
328+
gpus = _local_device_count()
329+
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
330+
allocator = SimAllocator()
331+
alloc = allocator.allocate(spec).get()
332+
return ProcMesh.from_alloc(alloc).get()
333+
334+
335+
def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[ProcMesh]:
336+
return Future(
337+
lambda: sim_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
338+
lambda: sim_proc_mesh_blocking(gpus=gpus, hosts=hosts),
339+
)
340+
341+
315342
_BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"
316343

317344

python/monarch/actor/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@
2323
ValueMesh,
2424
)
2525
from monarch._src.actor.future import Future
26-
from monarch._src.actor.proc_mesh import local_proc_mesh, proc_mesh, ProcMesh
26+
from monarch._src.actor.proc_mesh import (
27+
local_proc_mesh,
28+
proc_mesh,
29+
ProcMesh,
30+
sim_proc_mesh,
31+
)
2732

2833
__all__ = [
2934
"Accumulator",
@@ -41,5 +46,6 @@
4146
"proc_mesh",
4247
"ProcMesh",
4348
"send",
49+
"sim_proc_mesh",
4450
"ValueMesh",
4551
]

0 commit comments

Comments
 (0)