Skip to content

Commit 63ce0c9

Browse files
allenwang28facebook-github-bot
authored andcommitted
Expose RDMA support through Python APIs (#462)
Summary: This change brings in RDMA support from Rust to the Python APIs, overwriting the existing stand-in RDMABuffer APIs. There are quite a few changes, so here's a high level overview: ## `monarch_hyperactor` (rust) Within `PyProcMesh`, adds an optional RDMAMangerActor attribute. This mimics the prior functionality - before, a `proc_mesh` (in Python) would unconditionally spin up an `RdmaManagerActor`. Now, a `proc_mesh` will spin up an `RdmaManagerActor` if and only if `tensor_engine` is enabled and `ibverbs` APIs are supported. A few other pieces must be accessible within the RDMA extension library - specifically `Mailbox` / the send caps capabiliites. ## RDMA extension Adds an `extension` folder within `monarch_rdma` which contains the actual bindings. Consistent with other bindings, provides a blocking and non-blocking version. ## RDMA components Corrects behavior of a few edge cases, i.e. `ibverbs_supported` based on the number of devices, supporting a "loopback" case, wherein two actors share an RDMA buffer but are spawned on the same proc. ## Python Removes the stand-in RDMAManagerActor In rdma.py, adds in basic support for RDMABuffer CPU. While CUDA<>CPU|CUDA is implemented in `monarch_rdma`, this was running into issues with MR registration. Logging shows that the code path correctly interprets the address as CUDA, but `cuMemGetHandleForAddressRange` returns the handle/fd as `-1` and a null pointer MR. RDMABuffer will only support CPU for now with GPU support as a follow up, cc dstaay-fb # Test Updates tests for anything touching RDMABuffer. Splits out `test_python_actor` into `test_python_actor` and `test_rdma` now that RDMA is utilizing the backend network. Differential Revision: D76937776
1 parent 5077852 commit 63ce0c9

File tree

22 files changed

+890
-402
lines changed

22 files changed

+890
-402
lines changed

examples/grpo_actor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import torch.optim as optim
1616

1717
from monarch.actor import Actor, ActorMeshRef, endpoint, proc_mesh
18-
from monarch.rdma import RDMABuffer
18+
from monarch.tensor_engine import RDMABuffer
1919
from torch.distributions import Categorical, kl_divergence
2020

2121
"""

mode/tensor_engine

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
--config
2+
monarch.enable_tensor_engine=true

monarch_extension/Cargo.toml

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,14 @@ doctest = false
1414
crate-type = ["cdylib"]
1515

1616
[dependencies]
17-
anyhow = "1.0.98"
18-
async-trait = "0.1.86"
19-
bincode = "1.3.3"
20-
clap = { version = "4.5.38", features = ["derive", "env", "string", "unicode", "wrap_help"] }
21-
controller = { version = "0.0.0", path = "../controller", optional = true }
22-
hyperactor = { version = "0.0.0", path = "../hyperactor" }
23-
hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
24-
hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiprocess" }
25-
libc = "0.2.139"
26-
monarch_hyperactor = { version = "0.0.0", path = "../monarch_hyperactor" }
27-
monarch_messages = { version = "0.0.0", path = "../monarch_messages", optional = true }
28-
monarch_simulator_lib = { version = "0.0.0", path = "../monarch_simulator", optional = true }
29-
monarch_tensor_worker = { version = "0.0.0", path = "../monarch_tensor_worker", optional = true }
30-
monarch_types = { version = "0.0.0", path = "../monarch_types" }
17+
controller = { path = "../controller", optional = true }
18+
monarch_messages = { path = "../monarch_messages", optional = true }
19+
monarch_simulator_lib = { path = "../monarch_simulator", optional = true }
20+
monarch_tensor_worker = { path = "../monarch_tensor_worker", optional = true }
3121
nccl-sys = { path = "../nccl-sys", optional = true }
32-
ndslice = { version = "0.0.0", path = "../ndslice" }
33-
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
34-
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
35-
serde = { version = "1.0.185", features = ["derive", "rc"] }
36-
tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] }
37-
torch-sys = { version = "0.0.0", path = "../torch-sys", optional = true }
38-
torch-sys-cuda = { version = "0.0.0", path = "../torch-sys-cuda", optional = true }
39-
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
22+
torch-sys = { path = "../torch-sys", optional = true }
23+
torch-sys-cuda = { path = "../torch-sys-cuda", optional = true }
4024

4125
[features]
42-
default = ["tensor_engine"]
43-
tensor_engine = ["dep:controller", "dep:monarch_messages", "dep:monarch_simulator_lib", "dep:monarch_tensor_worker", "dep:nccl-sys", "dep:torch-sys", "dep:torch-sys-cuda"]
26+
default = []
27+
tensor_engine = ["dep:controller", "dep:monarch_messages", "dep:monarch_simulator_lib", "dep:monarch_tensor_worker", "dep:nccl-sys", "dep:torch-sys", "dep:torch-sys-cuda", "fbcode//monarch/monarch_rdma/extension:monarch_rdma_extension"]

monarch_extension/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
120120
module,
121121
"monarch_extension.mesh_controller",
122122
)?)?;
123+
monarch_rdma_extension::register_python_bindings(&get_or_add_new_module(module, "rdma")?)?;
123124
}
124125
simulation_tools::register_python_bindings(&get_or_add_new_module(
125126
module,
@@ -166,6 +167,7 @@ pub fn mod_init(module: &Bound<'_, PyModule>) -> PyResult<()> {
166167
module,
167168
"monarch_hyperactor.runtime",
168169
)?)?;
170+
169171
monarch_hyperactor::telemetry::register_python_bindings(&get_or_add_new_module(
170172
module,
171173
"monarch_hyperactor.telemetry",

monarch_hyperactor/Cargo.toml

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,15 @@ license = "BSD-3-Clause"
99

1010
[dependencies]
1111
anyhow = "1.0.98"
12-
async-trait = "0.1.86"
13-
bincode = "1.3.3"
1412
clap = { version = "4.5.38", features = ["derive", "env", "string", "unicode", "wrap_help"] }
15-
erased-serde = "0.3.27"
16-
fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
1713
hyperactor = { version = "0.0.0", path = "../hyperactor" }
1814
hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
19-
hyperactor_multiprocess = { version = "0.0.0", path = "../hyperactor_multiprocess" }
20-
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
21-
inventory = "0.3.8"
22-
monarch_types = { version = "0.0.0", path = "../monarch_types" }
23-
ndslice = { version = "0.0.0", path = "../ndslice" }
24-
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
25-
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
26-
serde = { version = "1.0.185", features = ["derive", "rc"] }
27-
serde_bytes = "0.11"
28-
thiserror = "2.0.12"
2915
tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] }
3016
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
17+
18+
[dev-dependencies]
19+
ndslice = { version = "0.0.0", path = "../ndslice" }
20+
21+
[features]
22+
default = []
23+
tensor_engine = ["dep:monarch_rdma"]

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ impl PythonActorMesh {
6262
}
6363

6464
#[getter]
65-
fn client(&self) -> PyMailbox {
65+
pub fn client(&self) -> PyMailbox {
6666
self.client.clone()
6767
}
6868

monarch_hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ use crate::shape::PyShape;
5252
name = "Mailbox",
5353
module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
5454
)]
55-
pub(super) struct PyMailbox {
56-
pub(super) inner: Mailbox,
55+
pub struct PyMailbox {
56+
pub inner: Mailbox,
5757
}
5858

5959
#[pymethods]

monarch_hyperactor/src/proc_mesh.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ use hyperactor_mesh::proc_mesh::SharedSpawnable;
2727
use hyperactor_mesh::shared_cell::SharedCell;
2828
use hyperactor_mesh::shared_cell::SharedCellPool;
2929
use hyperactor_mesh::shared_cell::SharedCellRef;
30+
#[cfg(feature = "tensor_engine")]
31+
use monarch_rdma::IbverbsConfig;
32+
#[cfg(feature = "tensor_engine")]
33+
use monarch_rdma::RdmaManagerActor;
3034
use monarch_types::PickledPyObject;
3135
use ndslice::Shape;
3236
use pyo3::IntoPyObjectExt;
@@ -116,6 +120,8 @@ pub struct PyProcMesh {
116120
proc_events: SharedCell<Mutex<ProcEvents>>,
117121
stop_monitor_sender: mpsc::Sender<bool>,
118122
user_monitor_registered: AtomicBool,
123+
#[cfg(feature = "tensor_engine")]
124+
pub(super) rdma_manager: Option<SharedCell<RootActorMesh<'static, RdmaManagerActor>>>,
119125
}
120126

121127
fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'py, PyAny>> {
@@ -132,7 +138,7 @@ fn allocate_proc_mesh<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResult<Bound<'
132138
let mesh = ProcMesh::allocate(alloc)
133139
.await
134140
.map_err(|err| PyException::new_err(err.to_string()))?;
135-
Ok(PyProcMesh::monitored(mesh, world_id))
141+
PyProcMesh::monitored(mesh, world_id).await
136142
})
137143
}
138144

@@ -150,14 +156,14 @@ fn allocate_proc_mesh_blocking<'py>(py: Python<'py>, alloc: &PyAlloc) -> PyResul
150156
let mesh = ProcMesh::allocate(alloc)
151157
.await
152158
.map_err(|err| PyException::new_err(err.to_string()))?;
153-
Ok(PyProcMesh::monitored(mesh, world_id))
159+
PyProcMesh::monitored(mesh, world_id).await
154160
})?
155161
}
156162

157163
impl PyProcMesh {
158164
/// Create a new [`PyProcMesh`] with a monitor that crashes the
159165
/// process on any proc failure.
160-
fn monitored(mut proc_mesh: ProcMesh, world_id: WorldId) -> Self {
166+
async fn monitored(mut proc_mesh: ProcMesh, world_id: WorldId) -> Result<Self, PyErr> {
161167
let (sender, abort_receiver) = mpsc::channel::<bool>(1);
162168
let proc_events = SharedCell::from(Mutex::new(proc_mesh.events().unwrap()));
163169
let monitor = tokio::spawn(Self::default_proc_mesh_monitor(
@@ -167,13 +173,34 @@ impl PyProcMesh {
167173
world_id,
168174
abort_receiver,
169175
));
170-
Self {
171-
inner: SharedCell::from(TrackedProcMesh::from(proc_mesh)),
176+
177+
let tracked_proc_mesh: TrackedProcMesh = TrackedProcMesh::from(proc_mesh);
178+
179+
// Create optional RDMA manager
180+
#[cfg(feature = "tensor_engine")]
181+
let rdma_manager = if monarch_rdma::ibverbs_supported() {
182+
// TODO - make this configurable
183+
let config = IbverbsConfig::default();
184+
tracing::debug!("rdma is enabled, using device {}", config.device);
185+
let actor_mesh = tracked_proc_mesh
186+
.spawn("rdma_manager", &config)
187+
.await
188+
.map_err(|err| PyException::new_err(err.to_string()))?;
189+
Some(actor_mesh)
190+
} else {
191+
tracing::info!("rdma is not enabled on this hardware");
192+
None
193+
};
194+
195+
Ok(Self {
196+
inner: SharedCell::from(tracked_proc_mesh),
172197
keepalive: Keepalive::new(monitor),
173198
proc_events,
174199
stop_monitor_sender: sender,
175200
user_monitor_registered: AtomicBool::new(false),
176-
}
201+
#[cfg(feature = "tensor_engine")]
202+
rdma_manager,
203+
})
177204
}
178205

179206
/// The default monitor of the proc mesh for crashes. If a proc crashes, we print the reason

monarch_rdma/extension/Cargo.toml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# @generated by autocargo from //monarch/monarch_rdma/extension:monarch_rdma_extension
2+
3+
[package]
4+
name = "monarch_rdma_extension"
5+
version = "0.0.0"
6+
authors = ["Meta"]
7+
edition = "2021"
8+
license = "BSD-3-Clause"
9+
10+
[lib]
11+
path = "lib.rs"
12+
13+
[dependencies]
14+
hyperactor = { version = "0.0.0", path = "../../hyperactor" }
15+
monarch_hyperactor = { version = "0.0.0", path = "../../monarch_hyperactor" }
16+
monarch_rdma = { version = "0.0.0", path = ".." }
17+
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
18+
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
19+
serde = { version = "1.0.185", features = ["derive", "rc"] }
20+
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }

0 commit comments

Comments
 (0)