Skip to content

Commit a6e1312

Browse files
thomasywangfacebook-github-bot
authored andcommitted
local_proc_mesh with sim channels
Summary: We want to instantiate local_proc_meshes that use ChannelTransport::SIm instead of ChannelTranport::Local so that the simulator can intercept and control the delivery of messages Rollback Plan: Differential Revision: D77941640
1 parent cbdb8dc commit a6e1312

File tree

6 files changed

+129
-9
lines changed

6 files changed

+129
-9
lines changed

hyperactor_mesh/src/alloc/local.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ impl Allocator for LocalAllocator {
5858
}
5959
}
6060

61+
impl LocalAllocator {
62+
pub async fn allocate_sim(&mut self, spec: AllocSpec) -> Result<LocalAlloc, AllocatorError> {
63+
Ok(LocalAlloc::new_sim(spec))
64+
}
65+
}
66+
6167
struct LocalProc {
6268
proc: Proc,
6369
addr: ChannelAddr,
@@ -75,10 +81,22 @@ pub struct LocalAlloc {
7581
todo_rx: mpsc::UnboundedReceiver<Action>,
7682
stopped: bool,
7783
failed: bool,
84+
transport: ChannelTransport,
7885
}
7986

8087
impl LocalAlloc {
8188
fn new(spec: AllocSpec) -> Self {
89+
Self::new_with_transport(spec, ChannelTransport::Local)
90+
}
91+
92+
fn new_sim(spec: AllocSpec) -> Self {
93+
Self::new_with_transport(
94+
spec,
95+
ChannelTransport::Sim(Box::new(ChannelTransport::Unix)),
96+
)
97+
}
98+
99+
fn new_with_transport(spec: AllocSpec, transport: ChannelTransport) -> Self {
82100
let name = ShortUuid::generate();
83101
let (todo_tx, todo_rx) = mpsc::unbounded_channel();
84102
for rank in 0..spec.shape.slice().len() {
@@ -94,6 +112,7 @@ impl LocalAlloc {
94112
todo_rx,
95113
stopped: false,
96114
failed: false,
115+
transport,
97116
}
98117
}
99118

@@ -249,7 +268,7 @@ impl Alloc for LocalAlloc {
249268
}
250269

251270
fn transport(&self) -> ChannelTransport {
252-
ChannelTransport::Local
271+
self.transport.clone()
253272
}
254273

255274
async fn stop(&mut self) -> Result<(), AllocatorError> {

monarch_hyperactor/src/alloc.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,53 @@ impl PyLocalAllocator {
8181
}
8282
}
8383

84+
#[pyclass(
85+
name = "SimAllocatorBase",
86+
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
87+
subclass
88+
)]
89+
pub struct PySimAllocator;
90+
91+
#[pymethods]
92+
impl PySimAllocator {
93+
#[new]
94+
fn new() -> Self {
95+
PySimAllocator {}
96+
}
97+
98+
fn allocate_nonblocking<'py>(
99+
&self,
100+
py: Python<'py>,
101+
spec: &PyAllocSpec,
102+
) -> PyResult<Bound<'py, PyAny>> {
103+
// We could use Bound here, and acquire the GIL inside of `future_into_py`, but
104+
// it is rather awkward with the current APIs, and we can anyway support Arc/Mutex
105+
// pretty easily.
106+
let spec = spec.inner.clone();
107+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
108+
LocalAllocator
109+
.allocate_sim(spec)
110+
.await
111+
.map(|inner| PyAlloc::new(Box::new(inner)))
112+
.map_err(|e| PyRuntimeError::new_err(format!("{}", e)))
113+
})
114+
}
115+
116+
fn allocate_blocking<'py>(&self, py: Python<'py>, spec: &PyAllocSpec) -> PyResult<PyAlloc> {
117+
// We could use Bound here, and acquire the GIL inside of
118+
// `signal_safe_block_on`, but it is rather awkward with the current
119+
// APIs, and we can anyway support Arc/Mutex pretty easily.
120+
let spec = spec.inner.clone();
121+
signal_safe_block_on(py, async move {
122+
LocalAllocator
123+
.allocate_sim(spec)
124+
.await
125+
.map(|inner| PyAlloc::new(Box::new(inner)))
126+
.map_err(|e| PyRuntimeError::new_err(format!("{:?}", e)))
127+
})?
128+
}
129+
}
130+
84131
#[pyclass(
85132
name = "ProcessAllocatorBase",
86133
module = "monarch._rust_bindings.monarch_hyperactor.alloc",
@@ -330,6 +377,7 @@ impl PyRemoteAllocator {
330377
pub fn register_python_bindings(hyperactor_mod: &Bound<'_, PyModule>) -> PyResult<()> {
331378
hyperactor_mod.add_class::<PyProcessAllocator>()?;
332379
hyperactor_mod.add_class::<PyLocalAllocator>()?;
380+
hyperactor_mod.add_class::<PySimAllocator>()?;
333381
hyperactor_mod.add_class::<PyRemoteAllocator>()?;
334382

335383
Ok(())

python/monarch/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@
113113
"timer": ("monarch.timer", "timer"),
114114
"ProcessAllocator": ("monarch._src.actor.allocator", "ProcessAllocator"),
115115
"LocalAllocator": ("monarch._src.actor.allocator", "LocalAllocator"),
116+
"SimAllocator": ("monarch._src_actor.allocator", "SimAllocator"),
117+
"ActorFuture": ("monarch.future", "ActorFuture"),
116118
"builtins": ("monarch.builtins", "builtins"),
117119
}
118120

@@ -181,6 +183,8 @@ def __getattr__(name):
181183
"timer",
182184
"ProcessAllocator",
183185
"LocalAllocator",
186+
"SimAllocator",
187+
"ActorFuture",
184188
"builtins",
185189
]
186190
assert sorted(__all__) == sorted(_public_api)

python/monarch/_rust_bindings/monarch_hyperactor/alloc.pyi

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,26 @@ class LocalAllocatorBase:
6969
"""
7070
...
7171

72+
class SimAllocatorBase:
73+
async def allocate_nonblocking(self, spec: AllocSpec) -> Alloc:
74+
"""
75+
Allocate a process according to the provided spec.
76+
77+
Arguments:
78+
- `spec`: The spec to allocate according to.
79+
"""
80+
...
81+
82+
def allocate_blocking(self, spec: AllocSpec) -> Alloc:
83+
"""
84+
Allocate a process according to the provided spec, blocking until an
85+
alloc is returned.
86+
87+
Arguments:
88+
- `spec`: The spec to allocate according to.
89+
"""
90+
...
91+
7292
class RemoteAllocatorBase:
7393
def __new__(
7494
cls,

python/monarch/_src/actor/allocator.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
LocalAllocatorBase,
2020
ProcessAllocatorBase,
2121
RemoteAllocatorBase,
22+
SimAllocatorBase,
2223
)
2324

2425
from monarch._src.actor.future import Future
@@ -72,6 +73,28 @@ def allocate(self, spec: AllocSpec) -> Future[Alloc]:
7273
)
7374

7475

76+
@final
77+
class SimAllocator(SimAllocatorBase):
78+
"""
79+
An allocator that allocates by spawning actors into the current process using simulated channels for transport
80+
"""
81+
82+
def allocate(self, spec: AllocSpec) -> Future[Alloc]:
83+
"""
84+
Allocate a process according to the provided spec.
85+
86+
Arguments:
87+
- `spec`: The spec to allocate according to.
88+
89+
Returns:
90+
- A future that will be fulfilled when the requested allocation is fulfilled.
91+
"""
92+
return Future(
93+
lambda: self.allocate_nonblocking(spec),
94+
lambda: self.allocate_blocking(spec),
95+
)
96+
97+
7598
class RemoteAllocInitializer(abc.ABC):
7699
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
77100

python/monarch/_src/actor/proc_mesh.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
)
3535
from monarch._rust_bindings.monarch_hyperactor.shape import Shape, Slice
3636
from monarch._src.actor.actor_mesh import _Actor, _ActorMeshRefImpl, Actor, ActorMeshRef
37-
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator
37+
from monarch._src.actor.allocator import LocalAllocator, ProcessAllocator, SimAllocator
3838
from monarch._src.actor.code_sync import RsyncMeshClient, WorkspaceLocation
3939
from monarch._src.actor.code_sync.auto_reload import AutoReloadActor
4040

@@ -287,29 +287,35 @@ def __del__(self) -> None:
287287

288288

289289
async def local_proc_mesh_nonblocking(
290-
*, gpus: Optional[int] = None, hosts: int = 1
290+
*, gpus: Optional[int] = None, hosts: int = 1, simulated: bool = False
291291
) -> ProcMesh:
292292
if gpus is None:
293293
gpus = _local_device_count()
294294
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
295-
allocator = LocalAllocator()
295+
allocator = LocalAllocator() if not simulated else SimAllocator()
296296
alloc = await allocator.allocate(spec)
297297
return await ProcMesh.from_alloc(alloc)
298298

299299

300-
def local_proc_mesh_blocking(*, gpus: Optional[int] = None, hosts: int = 1) -> ProcMesh:
300+
def local_proc_mesh_blocking(
301+
*, gpus: Optional[int] = None, hosts: int = 1, simulated: bool = False
302+
) -> ProcMesh:
301303
if gpus is None:
302304
gpus = _local_device_count()
303305
spec = AllocSpec(AllocConstraints(), gpus=gpus, hosts=hosts)
304-
allocator = LocalAllocator()
306+
allocator = LocalAllocator() if not simulated else SimAllocator()
305307
alloc = allocator.allocate(spec).get()
306308
return ProcMesh.from_alloc(alloc).get()
307309

308310

309-
def local_proc_mesh(*, gpus: Optional[int] = None, hosts: int = 1) -> Future[ProcMesh]:
311+
def local_proc_mesh(
312+
*, gpus: Optional[int] = None, hosts: int = 1, simulated: bool = False
313+
) -> Future[ProcMesh]:
310314
return Future(
311-
lambda: local_proc_mesh_nonblocking(gpus=gpus, hosts=hosts),
312-
lambda: local_proc_mesh_blocking(gpus=gpus, hosts=hosts),
315+
lambda: local_proc_mesh_nonblocking(
316+
gpus=gpus, hosts=hosts, simulated=simulated
317+
),
318+
lambda: local_proc_mesh_blocking(gpus=gpus, hosts=hosts, simulated=simulated),
313319
)
314320

315321

0 commit comments

Comments
 (0)