Skip to content

Commit be4affa

Browse files
allenwang28facebook-github-bot
authored andcommitted
Expose RDMA support through Python APIs
Summary: This change brings in RDMA support from Rust to the Python APIs, overwriting the existing stand-in RDMABuffer APIs. There are quite a few changes, so here's a high level overview: ## `monarch_hyperactor` (rust) Within `PyProcMesh`, adds an optional RDMAMangerActor attribute. This mimics the prior functionality - before, a `proc_mesh` (in Python) would unconditionally spin up an `RdmaManagerActor`. Now, a `proc_mesh` will spin up an `RdmaManagerActor` if and only if `tensor_engine` is enabled and `ibverbs` APIs are supported. A few other pieces must be accessible within the RDMA extension library - specifically `Mailbox` / the send caps capabiliites. ## RDMA extension Adds an `extension` folder within `monarch_rdma` which contains the actual bindings. Consistent with other bindings, provides a blocking and non-blocking version. ## RDMA components Corrects behavior of a few edge cases, i.e. `ibverbs_supported` based on the number of devices, supporting a "loopback" case, wherein two actors share an RDMA buffer but are spawned on the same proc. ## Python Removes the stand-in RDMAManagerActor In rdma.py, adds in basic support for RDMABuffer CPU. While CUDA<>CPU|CUDA is implemented in `monarch_rdma`, this was running into issues with MR registration. Logging shows that the code path correctly interprets the address as CUDA, but `cuMemGetHandleForAddressRange` returns the handle/fd as `-1` and a null pointer MR. RDMABuffer will only support CPU for now with GPU support as a follow up, cc dstaay-fb # Test Updates tests for anything touching RDMABuffer. Splits out `test_python_actor` into `test_python_actor` and `test_rdma` now that RDMA is utilizing the backend network. Differential Revision: D76937776
1 parent bbcbcdf commit be4affa

File tree

18 files changed

+829
-334
lines changed

18 files changed

+829
-334
lines changed

monarch_extension/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiproces
2626
libc = "0.2.139"
2727
monarch_hyperactor = { version = "0.0.0", path = "../monarch_hyperactor" }
2828
monarch_messages = { version = "0.0.0", path = "../monarch_messages", optional = true }
29+
monarch_rdma_extension = { version = "0.0.0", path = "../monarch_rdma/extension" }
2930
monarch_simulator_lib = { version = "0.0.0", path = "../monarch_simulator", optional = true }
3031
monarch_tensor_worker = { version = "0.0.0", path = "../monarch_tensor_worker", optional = true }
3132
monarch_types = { version = "0.0.0", path = "../monarch_types" }

monarch_extension/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
120120
module,
121121
"monarch_extension.mesh_controller",
122122
)?)?;
123+
monarch_rdma_extension::register_python_bindings(&get_or_add_new_module(module, "rdma")?)?;
123124
}
124125
simulation_tools::register_python_bindings(&get_or_add_new_module(
125126
module,
@@ -166,6 +167,7 @@ pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
166167
module,
167168
"monarch_hyperactor.runtime",
168169
)?)?;
170+
169171
hyperactor_extension::alloc::register_python_bindings(&get_or_add_new_module(
170172
module,
171173
"hyperactor_extension.alloc",

monarch_hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
2020
hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiprocess" }
2121
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
2222
inventory = "0.3.8"
23+
monarch_rdma = { version = "0.0.0", path = "../monarch_rdma" }
2324
monarch_types = { version = "0.0.0", path = "../monarch_types" }
2425
ndslice = { version = "0.0.0", path = "../ndslice" }
2526
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl PythonActorMesh {
6262
}
6363

6464
#[getter]
65-
fn client(&self) -> PyMailbox {
65+
pub fn client(&self) -> PyMailbox {
6666
self.client.clone()
6767
}
6868

monarch_hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ use crate::shape::PyShape;
5252
name = "Mailbox",
5353
module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
5454
)]
55-
pub(super) struct PyMailbox {
56-
pub(super) inner: Mailbox,
55+
pub struct PyMailbox {
56+
pub inner: Mailbox,
5757
}
5858

5959
#[pymethods]

monarch_hyperactor/src/proc_mesh.rs

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ use hyperactor_mesh::proc_mesh::SharedSpawnable;
2828
use hyperactor_mesh::shared_cell::SharedCell;
2929
use hyperactor_mesh::shared_cell::SharedCellPool;
3030
use hyperactor_mesh::shared_cell::SharedCellRef;
31+
use monarch_rdma::IbverbsConfig;
32+
use monarch_rdma::RdmaManagerActor;
3133
use monarch_types::PickledPyObject;
3234
use ndslice::Shape;
3335
use pyo3::IntoPyObjectExt;
@@ -116,9 +118,14 @@ pub struct PyProcMesh {
116118
proc_events: SharedCell<Mutex<ProcEvents>>,
117119
stop_monitor_sender: mpsc::Sender<bool>,
118120
user_monitor_registered: AtomicBool,
121+
pub(super) rdma_manager: Option<SharedCell<RootActorMesh<'static, RdmaManagerActor>>>,
119122
}
120123

121-
fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'py, PyAny>> {
124+
fn allocate_proc_mesh<'py>(
125+
py: Python<'py>,
126+
alloc: &PyAlloc,
127+
has_tensor_engine: Option<bool>,
128+
) -> PyResult<Bound<'py, PyAny>> {
122129
let alloc = match alloc.take() {
123130
Some(alloc) => alloc,
124131
None => {
@@ -132,11 +139,15 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'
132139
let mesh = ProcMesh::allocate(alloc)
133140
.await
134141
.map_err(|err| PyException::new_err(err.to_string()))?;
135-
Ok(PyProcMesh::monitored(mesh, world_id))
142+
PyProcMesh::monitored(mesh, world_id, has_tensor_engine.unwrap_or(false)).await
136143
})
137144
}
138145

139-
fn allocate_proc_mesh_blocking<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<PyProcMesh> {
146+
fn allocate_proc_mesh_blocking<'py>(
147+
py: Python<'py>,
148+
alloc: &PyAlloc,
149+
has_tensor_engine: Option<bool>,
150+
) -> PyResult<PyProcMesh> {
140151
let alloc = match alloc.take() {
141152
Some(alloc) => alloc,
142153
None => {
@@ -150,14 +161,18 @@ fn allocate_proc_mesh_blocking<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResul
150161
let mesh = ProcMesh::allocate(alloc)
151162
.await
152163
.map_err(|err| PyException::new_err(err.to_string()))?;
153-
Ok(PyProcMesh::monitored(mesh, world_id))
164+
PyProcMesh::monitored(mesh, world_id, has_tensor_engine.unwrap_or(false)).await
154165
})?
155166
}
156167

157168
impl PyProcMesh {
158169
/// Create a new [`PyProcMesh`] with a monitor that crashes the
159170
/// process on any proc failure.
160-
fn monitored(mut proc_mesh: ProcMesh, world_id: WorldId) -> Self {
171+
async fn monitored(
172+
mut proc_mesh: ProcMesh,
173+
world_id: WorldId,
174+
has_tensor_engine: bool,
175+
) -> Result<Self, PyErr> {
161176
let (sender, abort_receiver) = mpsc::channel::<bool>(1);
162177
let proc_events = SharedCell::from(Mutex::new(proc_mesh.events().unwrap()));
163178
let monitor = tokio::spawn(Self::default_proc_mesh_monitor(
@@ -167,13 +182,34 @@ impl PyProcMesh {
167182
world_id,
168183
abort_receiver,
169184
));
170-
Self {
171-
inner: SharedCell::from(TrackedProcMesh::from(proc_mesh)),
185+
186+
let tracked_proc_mesh = TrackedProcMesh::from(proc_mesh);
187+
188+
// Create optional RDMA manager
189+
let rdma_manager = if has_tensor_engine && monarch_rdma::ibverbs_supported() {
190+
// TODO - make this configurable
191+
let config = IbverbsConfig::default();
192+
tracing::debug!("rdma is enabled, using device {}", config.device);
193+
let actor_mesh = tracked_proc_mesh
194+
.spawn("rdma_manager", &config)
195+
.await
196+
.map_err(|err| PyException::new_err(err.to_string()))?;
197+
Some(actor_mesh)
198+
} else {
199+
if has_tensor_engine {
200+
tracing::info!("rdma is not enabled on this hardware");
201+
}
202+
None
203+
};
204+
205+
Ok(Self {
206+
inner: SharedCell::from(tracked_proc_mesh),
172207
keepalive: Keepalive::new(monitor),
173208
proc_events,
174209
stop_monitor_sender: sender,
175210
user_monitor_registered: AtomicBool::new(false),
176-
}
211+
rdma_manager,
212+
})
177213
}
178214

179215
/// The default monitor of the proc mesh for crashes. If a proc crashes, we print the reason
@@ -224,21 +260,25 @@ impl PyProcMesh {
224260
#[pymethods]
225261
impl PyProcMesh {
226262
#[classmethod]
263+
#[pyo3(signature = (alloc, has_tensor_engine=false))]
227264
fn allocate_nonblocking<'py>(
228265
_cls: &Bound<'_, PyType>,
229266
py: Python<'py>,
230267
alloc: &PyAlloc,
268+
has_tensor_engine: bool,
231269
) -> PyResult<Bound<'py, PyAny>> {
232-
allocate_proc_mesh(py, alloc)
270+
allocate_proc_mesh(py, alloc, Some(has_tensor_engine))
233271
}
234272

235273
#[classmethod]
274+
#[pyo3(signature = (alloc, has_tensor_engine=false))]
236275
fn allocate_blocking<'py>(
237276
_cls: &Bound<'_, PyType>,
238277
py: Python<'py>,
239278
alloc: &PyAlloc,
279+
has_tensor_engine: bool,
240280
) -> PyResult<PyProcMesh> {
241-
allocate_proc_mesh_blocking(py, alloc)
281+
allocate_proc_mesh_blocking(py, alloc, Some(has_tensor_engine))
242282
}
243283

244284
fn spawn_nonblocking<'py>(

monarch_rdma/extension/Cargo.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# @generated by autocargo from //monarch/monarch_rdma/extension:monarch_rdma_extension
2+
3+
[package]
4+
name = "monarch_rdma_extension"
5+
version = "0.0.0"
6+
authors = ["Meta"]
7+
edition = "2021"
8+
license = "BSD-3-Clause"
9+
10+
[lib]
11+
path = "lib.rs"
12+
13+
[dependencies]
14+
hyperactor = { version = "0.0.0", path = "../../hyperactor" }
15+
monarch_hyperactor = { version = "0.0.0", path = "../../monarch_hyperactor" }
16+
monarch_rdma = { version = "0.0.0", path = ".." }
17+
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
18+
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
19+
serde = { version = "1.0.185", features = ["derive", "rc"] }
20+
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }

0 commit comments

Comments
 (0)