Skip to content

Commit a4e11cb

Browse files
thomasywangfacebook-github-bot
authored andcommitted
local_proc_mesh with sim channels (#475)
Summary: Pull Request resolved: #475 We want to instantiate local_proc_meshes that use ChannelTransport::SIm instead of ChannelTranport::Local so that the simulator can intercept and control the delivery of messages To preserve to `allocate()` interface so that we can reuse existing test generation macros we will create a wrapper class for this around `LocalAlloc` Differential Revision: D77941640
1 parent d80c542 commit a4e11cb

File tree

10 files changed

+268
-4
lines changed

10 files changed

+268
-4
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,4 +1010,10 @@ mod tests {
10101010
buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap()
10111011
)));
10121012
}
1013+
1014+
mod sim {
1015+
use crate::alloc::sim::SimAllocator;
1016+
1017+
actor_mesh_test_suite!(SimAllocator::new_and_start_simnet());
1018+
}
10131019
}

hyperactor_mesh/src/alloc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod local;
1313
pub(crate) mod logtailer;
1414
pub mod process;
1515
pub mod remoteprocess;
16+
pub mod sim;
1617

1718
use std::collections::HashMap;
1819
use std::fmt;

hyperactor_mesh/src/alloc/local.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,15 @@ pub struct LocalAlloc {
7575
todo_rx: mpsc::UnboundedReceiver<Action>,
7676
stopped: bool,
7777
failed: bool,
78+
transport: ChannelTransport,
7879
}
7980

8081
impl LocalAlloc {
8182
fn new(spec: AllocSpec) -> Self {
83+
Self::new_with_transport(spec, ChannelTransport::Local)
84+
}
85+
86+
pub(crate) fn new_with_transport(spec: AllocSpec, transport: ChannelTransport) -> Self {
8287
let name = ShortUuid::generate();
8388
let (todo_tx, todo_rx) = mpsc::unbounded_channel();
8489
for rank in 0..spec.shape.slice().len() {
@@ -94,6 +99,7 @@ impl LocalAlloc {
9499
todo_rx,
95100
stopped: false,
96101
failed: false,
102+
transport,
97103
}
98104
}
99105

@@ -123,7 +129,7 @@ impl LocalAlloc {
123129
&self.name
124130
}
125131

126-
fn size(&self) -> usize {
132+
pub(crate) fn size(&self) -> usize {
127133
self.spec.shape.slice().len()
128134
}
129135
}
@@ -249,7 +255,7 @@ impl Alloc for LocalAlloc {
249255
}
250256

251257
fn transport(&self) -> ChannelTransport {
252-
ChannelTransport::Local
258+
self.transport.clone()
253259
}
254260

255261
async fn stop(&mut self) -> Result<(), AllocatorError> {

hyperactor_mesh/src/alloc/sim.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! Support for allocating procs in the local process with simulated channels.
10+
11+
#![allow(dead_code)] // until it is used outside of testing
12+
13+
use async_trait::async_trait;
14+
use hyperactor::WorldId;
15+
use hyperactor::channel::ChannelAddr;
16+
use hyperactor::channel::ChannelTransport;
17+
use hyperactor::mailbox::MailboxServerHandle;
18+
use hyperactor::proc::Proc;
19+
use ndslice::Shape;
20+
21+
use super::ProcStopReason;
22+
use crate::alloc::Alloc;
23+
use crate::alloc::AllocSpec;
24+
use crate::alloc::Allocator;
25+
use crate::alloc::AllocatorError;
26+
use crate::alloc::LocalAlloc;
27+
use crate::alloc::ProcState;
28+
use crate::shortuuid::ShortUuid;
29+
30+
/// An allocator that runs procs in the local process with network traffic going through simulated channels.
31+
/// Other than transport, the underlying implementation is an inner LocalAlloc.
32+
pub struct SimAllocator;
33+
34+
#[async_trait]
35+
impl Allocator for SimAllocator {
36+
type Alloc = SimAlloc;
37+
38+
async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
39+
Ok(SimAlloc::new(spec))
40+
}
41+
}
42+
43+
impl SimAllocator {
44+
#[cfg(test)]
45+
pub(crate) fn new_and_start_simnet() -> Self {
46+
hyperactor::simnet::start();
47+
Self
48+
}
49+
}
50+
51+
struct SimProc {
52+
proc: Proc,
53+
addr: ChannelAddr,
54+
handle: MailboxServerHandle,
55+
}
56+
57+
/// A simulated allocation. It is a collection of procs that are running in the local process.
58+
pub struct SimAlloc {
59+
inner: LocalAlloc,
60+
}
61+
62+
impl SimAlloc {
63+
fn new(spec: AllocSpec) -> Self {
64+
Self {
65+
inner: LocalAlloc::new_with_transport(
66+
spec,
67+
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
68+
),
69+
}
70+
}
71+
/// A chaos monkey that can be used to stop procs at random.
72+
pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
73+
self.inner.chaos_monkey()
74+
}
75+
76+
/// A function to shut down the alloc for testing purposes.
77+
pub(crate) fn stopper(&self) -> impl Fn() + 'static {
78+
self.inner.stopper()
79+
}
80+
81+
pub(crate) fn name(&self) -> &ShortUuid {
82+
self.inner.name()
83+
}
84+
85+
fn size(&self) -> usize {
86+
self.inner.size()
87+
}
88+
}
89+
90+
#[async_trait]
91+
impl Alloc for SimAlloc {
92+
async fn next(&mut self) -> Option<ProcState> {
93+
self.inner.next().await
94+
}
95+
96+
fn shape(&self) -> &Shape {
97+
self.inner.shape()
98+
}
99+
100+
fn world_id(&self) -> &WorldId {
101+
self.inner.world_id()
102+
}
103+
104+
fn transport(&self) -> ChannelTransport {
105+
self.inner.transport()
106+
}
107+
108+
async fn stop(&mut self) -> Result<(), AllocatorError> {
109+
self.inner.stop().await
110+
}
111+
}
112+
113+
#[cfg(test)]
114+
mod tests {
115+
use super::*;
116+
117+
#[tokio::test]
118+
async fn test_allocator_basic() {
119+
hyperactor::simnet::start();
120+
crate::alloc::testing::test_allocator_basic(SimAllocator).await;
121+
}
122+
}

monarch_hyperactor/src/alloc.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use hyperactor_mesh::alloc::ProcessAllocator;
2828
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAlloc;
2929
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocHost;
3030
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocInitializer;
31+
use hyperactor_mesh::alloc::sim::SimAllocator;
3132
use hyperactor_mesh::shape::Shape;
3233
use ndslice::Slice;
3334
use pyo3::exceptions::PyRuntimeError;
@@ -222,6 +223,53 @@ impl PyLocalAllocator {
222223
}
223224
}
224225

226+
#[pyclass(
227+
name = "SimAllocatorBase",
228+
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
229+
subclass
230+
)]
231+
pub struct PySimAllocator;
232+
233+
#[pymethods]
234+
impl PySimAllocator {
235+
#[new]
236+
fn new() -> Self {
237+
PySimAllocator {}
238+
}
239+
240+
fn allocate_nonblocking<'py>(
241+
&self,
242+
py: Python<'py>,
243+
spec: &PyAllocSpec,
244+
) -> PyResult<Bound<'py, PyAny>> {
245+
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
246+
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
247+
// pretty easily.
248+
let spec = spec.inner.clone();
249+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
250+
SimAllocator
251+
.allocate(spec)
252+
.await
253+
.map(|inner| PyAlloc::new(Box::new(inner)))
254+
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
255+
})
256+
}
257+
258+
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
259+
// We could use Bound here, and acquire the GIL inside of
260+
// `signal_safe_block_on`, but it is rather awkward with the current
261+
// APIs, and we can anyway support Arc/Mutex pretty easily.
262+
let spec = spec.inner.clone();
263+
signal_safe_block_on(py, async move {
264+
SimAllocator
265+
.allocate(spec)
266+
.await
267+
.map(|inner| PyAlloc::new(Box::new(inner)))
268+
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
269+
})?
270+
}
271+
}
272+
225273
#[pyclass(
226274
name = "ProcessAllocatorBase",
227275
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
@@ -474,6 +522,7 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul
474522
hyperactor_mod.add_class::<PyAllocSpec>()?;
475523
hyperactor_mod.add_class::<PyProcessAllocator>()?;
476524
hyperactor_mod.add_class::<PyLocalAllocator>()?;
525+
hyperactor_mod.add_class::<PySimAllocator>()?;
477526
hyperactor_mod.add_class::<PyRemoteAllocator>()?;
478527

479528
Ok(())

python/monarch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@
113113
"timer": ("monarch.timer", "timer"),
114114
"ProcessAllocator": ("monarch._src.actor.allocator", "ProcessAllocator"),
115115
"LocalAllocator": ("monarch._src.actor.allocator", "LocalAllocator"),
116+
"SimAllocator": ("monarch._src_actor.allocator", "SimAllocator"),
117+
"ActorFuture": ("monarch.future", "ActorFuture"),
116118
"builtins": ("monarch.builtins", "builtins"),
117119
}
118120

@@ -181,6 +183,8 @@ def __getattr__(name):
181183
"timer",
182184
"ProcessAllocator",
183185
"LocalAllocator",
186+
"SimAllocator",
187+
"ActorFuture",
184188
"builtins",
185189
]
186190
assert sorted(__all__) == sorted(_public_api)

python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,26 @@ class LocalAllocatorBase:
100100
"""
101101
...
102102

103+
class SimAllocatorBase:
104+
async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
105+
"""
106+
Allocate a process according to the provided spec.
107+
108+
Arguments:
109+
- `spec`: The spec to allocate according to.
110+
"""
111+
...
112+
113+
def allocate_blocking(self, spec: AllocSpec) -> Alloc:
114+
"""
115+
Allocate a process according to the provided spec, blocking until an
116+
alloc is returned.
117+
118+
Arguments:
119+
- `spec`: The spec to allocate according to.
120+
"""
121+
...
122+
103123
class RemoteAllocatorBase:
104124
def __new__(
105125
cls,

python/monarch/_src/actor/allocator.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
LocalAllocatorBase,
1717
ProcessAllocatorBase,
1818
RemoteAllocatorBase,
19+
SimAllocatorBase,
1920
)
2021

2122
from monarch._src.actor.future import Future
@@ -69,6 +70,28 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:
6970
)
7071

7172

73+
@final
74+
class SimAllocator(SimAllocatorBase):
75+
"""
76+
An allocator that allocates by spawning actors into the current process using simulated channels for transport
77+
"""
78+
79+
def allocate(self, spec: AllocSpec) -> Future[Alloc]:
80+
"""
81+
Allocate a process according to the provided spec.
82+
83+
Arguments:
84+
- `spec`: The spec to allocate according to.
85+
86+
Returns:
87+
- A future that will be fulfilled when the requested allocation is fulfilled.
88+
"""
89+
return Future(
90+
lambda: self.allocate_nonblocking(spec),
91+
lambda: self.allocate_blocking(spec),
92+
)
93+
94+
7295
class RemoteAllocInitializer(abc.ABC):
7396
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
7497

python/monarch/_src/actor/proc_mesh.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
)
3636
from monarch._rust_bindings.monarch_hyperactor.shape import Shape, Slice
3737
from monarch._src.actor.actor_mesh import _Actor, _ActorMeshRefImpl, Actor, ActorMeshRef
38-
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator
38+
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator, SimAllocator
3939
from monarch._src.actor.code_sync import RsyncMeshClient, WorkspaceLocation
4040
from monarch._src.actor.code_sync.auto_reload import AutoReloadActor
4141

@@ -312,6 +312,33 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[Pro
312312
)
313313

314314

315+
async def sim_proc_mesh_nonblocking(
316+
*, gpus: Optional[int] = None, hosts: int = 1
317+
) -> ProcMesh:
318+
if gpus is None:
319+
gpus = _local_device_count()
320+
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
321+
allocator = SimAllocator()
322+
alloc = await allocator.allocate(spec)
323+
return await ProcMesh.from_alloc(alloc)
324+
325+
326+
def sim_proc_mesh_blocking(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
327+
if gpus is None:
328+
gpus = _local_device_count()
329+
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
330+
allocator = SimAllocator()
331+
alloc = allocator.allocate(spec).get()
332+
return ProcMesh.from_alloc(alloc).get()
333+
334+
335+
def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[ProcMesh]:
336+
return Future(
337+
lambda: sim_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
338+
lambda: sim_proc_mesh_blocking(gpus=gpus, hosts=hosts),
339+
)
340+
341+
315342
_BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"
316343

317344

python/monarch/actor/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@
2323
ValueMesh,
2424
)
2525
from monarch._src.actor.future import Future
26-
from monarch._src.actor.proc_mesh import local_proc_mesh, proc_mesh, ProcMesh
26+
from monarch._src.actor.proc_mesh import (
27+
local_proc_mesh,
28+
proc_mesh,
29+
ProcMesh,
30+
sim_proc_mesh,
31+
)
2732

2833
__all__ = [
2934
"Accumulator",
@@ -41,5 +46,6 @@
4146
"proc_mesh",
4247
"ProcMesh",
4348
"send",
49+
"sim_proc_mesh",
4450
"ValueMesh",
4551
]

0 commit comments

Comments
 (0)