Skip to content

Commit 4629a32

Browse files
thomasywangfacebook-github-bot
authored andcommitted
local_proc_mesh with sim channels (#475)
Summary: Pull Request resolved: #475 We want to instantiate local_proc_meshes that use ChannelTransport::SIm instead of ChannelTranport::Local so that the simulator can intercept and control the delivery of messages To preserve to `allocate()` interface so that we can reuse existing test generation macros we will create a wrapper class for this around `LocalAlloc` Reviewed By: colin2328 Differential Revision: D77941640 fbshipit-source-id: 0be3a11b408d6a2b838de0508f1d4501ccb5afbc
1 parent 62459b3 commit 4629a32

File tree

10 files changed

+268
-4
lines changed

10 files changed

+268
-4
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -992,4 +992,10 @@ mod tests {
992992
buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap()
993993
)));
994994
}
995+
996+
mod sim {
997+
use crate::alloc::sim::SimAllocator;
998+
999+
actor_mesh_test_suite!(SimAllocator::new_and_start_simnet());
1000+
}
9951001
}

hyperactor_mesh/src/alloc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod local;
1313
pub(crate) mod logtailer;
1414
pub mod process;
1515
pub mod remoteprocess;
16+
pub mod sim;
1617

1718
use std::collections::HashMap;
1819
use std::fmt;

hyperactor_mesh/src/alloc/local.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,15 @@ pub struct LocalAlloc {
7676
todo_rx: mpsc::UnboundedReceiver<Action>,
7777
stopped: bool,
7878
failed: bool,
79+
transport: ChannelTransport,
7980
}
8081

8182
impl LocalAlloc {
8283
fn new(spec: AllocSpec) -> Self {
84+
Self::new_with_transport(spec, ChannelTransport::Local)
85+
}
86+
87+
pub(crate) fn new_with_transport(spec: AllocSpec, transport: ChannelTransport) -> Self {
8388
let name = ShortUuid::generate();
8489
let (todo_tx, todo_rx) = mpsc::unbounded_channel();
8590
for rank in 0..spec.shape.slice().len() {
@@ -95,6 +100,7 @@ impl LocalAlloc {
95100
todo_rx,
96101
stopped: false,
97102
failed: false,
103+
transport,
98104
}
99105
}
100106

@@ -124,7 +130,7 @@ impl LocalAlloc {
124130
&self.name
125131
}
126132

127-
fn size(&self) -> usize {
133+
pub(crate) fn size(&self) -> usize {
128134
self.spec.shape.slice().len()
129135
}
130136
}
@@ -250,7 +256,7 @@ impl Alloc for LocalAlloc {
250256
}
251257

252258
fn transport(&self) -> ChannelTransport {
253-
ChannelTransport::Local
259+
self.transport.clone()
254260
}
255261

256262
async fn log_source(&self) -> Result<LogSource, AllocatorError> {

hyperactor_mesh/src/alloc/sim.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
//! Support for allocating procs in the local process with simulated channels.
10+
11+
#![allow(dead_code)] // until it is used outside of testing
12+
13+
use async_trait::async_trait;
14+
use hyperactor::WorldId;
15+
use hyperactor::channel::ChannelAddr;
16+
use hyperactor::channel::ChannelTransport;
17+
use hyperactor::mailbox::MailboxServerHandle;
18+
use hyperactor::proc::Proc;
19+
use ndslice::Shape;
20+
21+
use super::ProcStopReason;
22+
use crate::alloc::Alloc;
23+
use crate::alloc::AllocSpec;
24+
use crate::alloc::Allocator;
25+
use crate::alloc::AllocatorError;
26+
use crate::alloc::LocalAlloc;
27+
use crate::alloc::ProcState;
28+
use crate::log_source::LogSource;
29+
use crate::shortuuid::ShortUuid;
30+
31+
/// An allocator that runs procs in the local process with network traffic going through simulated channels.
32+
/// Other than transport, the underlying implementation is an inner LocalAlloc.
33+
pub struct SimAllocator;
34+
35+
#[async_trait]
36+
impl Allocator for SimAllocator {
37+
type Alloc = SimAlloc;
38+
39+
async fn allocate(&mut self, spec: AllocSpec) -> Result<Self::Alloc, AllocatorError> {
40+
Ok(SimAlloc::new(spec))
41+
}
42+
}
43+
44+
impl SimAllocator {
45+
#[cfg(test)]
46+
pub(crate) fn new_and_start_simnet() -> Self {
47+
hyperactor::simnet::start();
48+
Self
49+
}
50+
}
51+
52+
struct SimProc {
53+
proc: Proc,
54+
addr: ChannelAddr,
55+
handle: MailboxServerHandle,
56+
}
57+
58+
/// A simulated allocation. It is a collection of procs that are running in the local process.
59+
pub struct SimAlloc {
60+
inner: LocalAlloc,
61+
}
62+
63+
impl SimAlloc {
64+
fn new(spec: AllocSpec) -> Self {
65+
Self {
66+
inner: LocalAlloc::new_with_transport(
67+
spec,
68+
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
69+
),
70+
}
71+
}
72+
/// A chaos monkey that can be used to stop procs at random.
73+
pub(crate) fn chaos_monkey(&self) -> impl Fn(usize, ProcStopReason) + 'static {
74+
self.inner.chaos_monkey()
75+
}
76+
77+
/// A function to shut down the alloc for testing purposes.
78+
pub(crate) fn stopper(&self) -> impl Fn() + 'static {
79+
self.inner.stopper()
80+
}
81+
82+
pub(crate) fn name(&self) -> &ShortUuid {
83+
self.inner.name()
84+
}
85+
86+
fn size(&self) -> usize {
87+
self.inner.size()
88+
}
89+
}
90+
91+
#[async_trait]
92+
impl Alloc for SimAlloc {
93+
async fn next(&mut self) -> Option<ProcState> {
94+
self.inner.next().await
95+
}
96+
97+
fn shape(&self) -> &Shape {
98+
self.inner.shape()
99+
}
100+
101+
fn world_id(&self) -> &WorldId {
102+
self.inner.world_id()
103+
}
104+
105+
fn transport(&self) -> ChannelTransport {
106+
self.inner.transport()
107+
}
108+
109+
async fn stop(&mut self) -> Result<(), AllocatorError> {
110+
self.inner.stop().await
111+
}
112+
113+
async fn log_source(&self) -> Result<LogSource, AllocatorError> {
114+
self.inner.log_source().await
115+
}
116+
}
117+
118+
#[cfg(test)]
119+
mod tests {
120+
use super::*;
121+
122+
#[tokio::test]
123+
async fn test_allocator_basic() {
124+
hyperactor::simnet::start();
125+
crate::alloc::testing::test_allocator_basic(SimAllocator).await;
126+
}
127+
}

monarch_hyperactor/src/alloc.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use hyperactor_mesh::alloc::ProcessAllocator;
2828
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAlloc;
2929
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocHost;
3030
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocInitializer;
31+
use hyperactor_mesh::alloc::sim::SimAllocator;
3132
use hyperactor_mesh::log_source::LogSource;
3233
use hyperactor_mesh::shape::Shape;
3334
use ndslice::Slice;
@@ -227,6 +228,53 @@ impl PyLocalAllocator {
227228
}
228229
}
229230

231+
#[pyclass(
232+
name = "SimAllocatorBase",
233+
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
234+
subclass
235+
)]
236+
pub struct PySimAllocator;
237+
238+
#[pymethods]
239+
impl PySimAllocator {
240+
#[new]
241+
fn new() -> Self {
242+
PySimAllocator {}
243+
}
244+
245+
fn allocate_nonblocking<'py>(
246+
&self,
247+
py: Python<'py>,
248+
spec: &PyAllocSpec,
249+
) -> PyResult<Bound<'py, PyAny>> {
250+
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
251+
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
252+
// pretty easily.
253+
let spec = spec.inner.clone();
254+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
255+
SimAllocator
256+
.allocate(spec)
257+
.await
258+
.map(|inner| PyAlloc::new(Box::new(inner)))
259+
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
260+
})
261+
}
262+
263+
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
264+
// We could use Bound here, and acquire the GIL inside of
265+
// `signal_safe_block_on`, but it is rather awkward with the current
266+
// APIs, and we can anyway support Arc/Mutex pretty easily.
267+
let spec = spec.inner.clone();
268+
signal_safe_block_on(py, async move {
269+
SimAllocator
270+
.allocate(spec)
271+
.await
272+
.map(|inner| PyAlloc::new(Box::new(inner)))
273+
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
274+
})?
275+
}
276+
}
277+
230278
#[pyclass(
231279
name = "ProcessAllocatorBase",
232280
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
@@ -479,6 +527,7 @@ pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResul
479527
hyperactor_mod.add_class::<PyAllocSpec>()?;
480528
hyperactor_mod.add_class::<PyProcessAllocator>()?;
481529
hyperactor_mod.add_class::<PyLocalAllocator>()?;
530+
hyperactor_mod.add_class::<PySimAllocator>()?;
482531
hyperactor_mod.add_class::<PyRemoteAllocator>()?;
483532

484533
Ok(())

python/monarch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@
116116
"timer": ("monarch.timer", "timer"),
117117
"ProcessAllocator": ("monarch._src.actor.allocator", "ProcessAllocator"),
118118
"LocalAllocator": ("monarch._src.actor.allocator", "LocalAllocator"),
119+
"SimAllocator": ("monarch._src_actor.allocator", "SimAllocator"),
120+
"ActorFuture": ("monarch.future", "ActorFuture"),
119121
"builtins": ("monarch.builtins", "builtins"),
120122
}
121123

@@ -184,6 +186,8 @@ def __getattr__(name):
184186
"timer",
185187
"ProcessAllocator",
186188
"LocalAllocator",
189+
"SimAllocator",
190+
"ActorFuture",
187191
"builtins",
188192
]
189193
assert sorted(__all__) == sorted(_public_api)

python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,26 @@ class LocalAllocatorBase:
100100
"""
101101
...
102102

103+
class SimAllocatorBase:
104+
async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
105+
"""
106+
Allocate a process according to the provided spec.
107+
108+
Arguments:
109+
- `spec`: The spec to allocate according to.
110+
"""
111+
...
112+
113+
def allocate_blocking(self, spec: AllocSpec) -> Alloc:
114+
"""
115+
Allocate a process according to the provided spec, blocking until an
116+
alloc is returned.
117+
118+
Arguments:
119+
- `spec`: The spec to allocate according to.
120+
"""
121+
...
122+
103123
class RemoteAllocatorBase:
104124
def __new__(
105125
cls,

python/monarch/_src/actor/allocator.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
LocalAllocatorBase,
1717
ProcessAllocatorBase,
1818
RemoteAllocatorBase,
19+
SimAllocatorBase,
1920
)
2021

2122
from monarch._src.actor.future import Future
@@ -69,6 +70,28 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:
6970
)
7071

7172

73+
@final
74+
class SimAllocator(SimAllocatorBase):
75+
"""
76+
An allocator that allocates by spawning actors into the current process using simulated channels for transport
77+
"""
78+
79+
def allocate(self, spec: AllocSpec) -> Future[Alloc]:
80+
"""
81+
Allocate a process according to the provided spec.
82+
83+
Arguments:
84+
- `spec`: The spec to allocate according to.
85+
86+
Returns:
87+
- A future that will be fulfilled when the requested allocation is fulfilled.
88+
"""
89+
return Future(
90+
lambda: self.allocate_nonblocking(spec),
91+
lambda: self.allocate_blocking(spec),
92+
)
93+
94+
7295
class RemoteAllocInitializer(abc.ABC):
7396
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
7497

python/monarch/_src/actor/proc_mesh.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
)
3636
from monarch._rust_bindings.monarch_hyperactor.shape import Shape, Slice
3737
from monarch._src.actor.actor_mesh import _Actor, _ActorMeshRefImpl, Actor, ActorMeshRef
38-
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator
38+
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator, SimAllocator
3939
from monarch._src.actor.code_sync import RsyncMeshClient, WorkspaceLocation
4040
from monarch._src.actor.code_sync.auto_reload import AutoReloadActor
4141
from monarch._src.actor.debugger import (
@@ -329,6 +329,33 @@ def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[Pro
329329
)
330330

331331

332+
async def sim_proc_mesh_nonblocking(
333+
*, gpus: Optional[int] = None, hosts: int = 1
334+
) -> ProcMesh:
335+
if gpus is None:
336+
gpus = _local_device_count()
337+
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
338+
allocator = SimAllocator()
339+
alloc = await allocator.allocate(spec)
340+
return await ProcMesh.from_alloc(alloc)
341+
342+
343+
def sim_proc_mesh_blocking(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
344+
if gpus is None:
345+
gpus = _local_device_count()
346+
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
347+
allocator = SimAllocator()
348+
alloc = allocator.allocate(spec).get()
349+
return ProcMesh.from_alloc(alloc).get()
350+
351+
352+
def sim_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[ProcMesh]:
353+
return Future(
354+
lambda: sim_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
355+
lambda: sim_proc_mesh_blocking(gpus=gpus, hosts=hosts),
356+
)
357+
358+
332359
_BOOTSTRAP_MAIN = "monarch._src.actor.bootstrap_main"
333360

334361

0 commit comments

Comments
 (0)