Skip to content

Commit f2d0f91

Browse files
allenwang28facebook-github-bot
authored and committed
(1/N) Creates Rust extension for monarch_rdma (#462)
Summary: This change introduces an `extension` folder for `monarch_rdma`, prepping for Python<>RDMA APIs:
- Consistent with other bindings, it provides a blocking and a non-blocking version of each operation.
- It corrects the behavior of a few edge cases for RDMA components: `ibverbs_supported` is now based on the number of devices, and a "loopback" case is supported, wherein two actors share an RDMA buffer but are spawned on the same proc.

Reviewed By: dstaay-fb

Differential Revision: D76937776
1 parent ca2248e commit f2d0f91

File tree

8 files changed

+403
-20
lines changed

8 files changed

+403
-20
lines changed

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ use crate::shape::PyShape;
2929
module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh"
3030
)]
3131
pub struct PythonActorMesh {
32-
pub(super) inner: SharedCell<RootActorMesh<'static, PythonActor>>,
33-
pub(super) client: PyMailbox,
32+
pub inner: SharedCell<RootActorMesh<'static, PythonActor>>,
33+
pub client: PyMailbox,
3434
pub(super) _keepalive: Keepalive,
3535
}
3636

@@ -62,7 +62,7 @@ impl PythonActorMesh {
6262
}
6363

6464
#[getter]
65-
fn client(&self) -> PyMailbox {
65+
pub fn client(&self) -> PyMailbox {
6666
self.client.clone()
6767
}
6868

monarch_hyperactor/src/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ use crate::shape::PyShape;
5252
name = "Mailbox",
5353
module = "monarch._rust_bindings.monarch_hyperactor.mailbox"
5454
)]
55-
pub(super) struct PyMailbox {
56-
pub(super) inner: Mailbox,
55+
pub struct PyMailbox {
56+
pub inner: Mailbox,
5757
}
5858

5959
#[pymethods]

monarch_rdma/extension/Cargo.toml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# @generated by autocargo from //monarch/monarch_rdma/extension:monarch_rdma_extension
2+
3+
[package]
4+
name = "monarch_rdma_extension"
5+
version = "0.0.0"
6+
authors = ["Meta"]
7+
edition = "2021"
8+
license = "BSD-3-Clause"
9+
10+
[lib]
11+
path = "lib.rs"
12+
test = false
13+
doctest = false
14+
15+
[dependencies]
16+
hyperactor = { version = "0.0.0", path = "../../hyperactor" }
17+
monarch_hyperactor = { version = "0.0.0", path = "../../monarch_hyperactor" }
18+
monarch_rdma = { version = "0.0.0", path = ".." }
19+
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
20+
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
21+
serde = { version = "1.0.185", features = ["derive", "rc"] }
22+
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }

monarch_rdma/extension/lib.rs

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
#![allow(unsafe_op_in_unsafe_fn)]
10+
use hyperactor::ActorId;
11+
use hyperactor::ActorRef;
12+
use hyperactor::Named;
13+
use hyperactor::ProcId;
14+
use monarch_hyperactor::mailbox::PyMailbox;
15+
use monarch_hyperactor::runtime::signal_safe_block_on;
16+
use monarch_rdma::RdmaBuffer;
17+
use monarch_rdma::RdmaManagerActor;
18+
use monarch_rdma::RdmaManagerMessageClient;
19+
use monarch_rdma::ibverbs_supported;
20+
use pyo3::BoundObject;
21+
use pyo3::exceptions::PyException;
22+
use pyo3::exceptions::PyValueError;
23+
use pyo3::prelude::*;
24+
use pyo3::types::PyTuple;
25+
use pyo3::types::PyType;
26+
use serde::Deserialize;
27+
use serde::Serialize;
28+
29+
/// Builds the two values every transfer method needs before it goes async:
/// an `ActorRef` to the `rdma_manager` actor (index 0) on the proc named by
/// `$local_proc_id`, and a clone of `$self.buffer`.
///
/// Evaluates to `(ActorRef<RdmaManagerActor>, RdmaBuffer)`.
///
/// Must be expanded inside a function returning `PyResult<_>`: a proc id that
/// fails to parse is propagated with `?` as a Python `ValueError` rather than
/// panicking across the FFI boundary.
macro_rules! setup_rdma_context {
    ($self:ident, $local_proc_id:expr) => {{
        // The proc id comes straight from Python, so treat a parse failure as
        // bad user input, not as a broken invariant.
        let proc_id = $local_proc_id.parse::<ProcId>().map_err(|e| {
            PyValueError::new_err(format!("invalid local proc id: {:?}", e))
        })?;
        // The reference is attested from the well-known actor name rather than
        // looked up. NOTE(review): presumably no liveness check happens here —
        // confirm against `ActorRef::attest`.
        let local_owner_ref: ActorRef<RdmaManagerActor> =
            ActorRef::attest(ActorId(proc_id, "rdma_manager".to_string(), 0));
        (local_owner_ref, $self.buffer.clone())
    }};
}
38+
39+
/// Python-visible handle exposed as `_RdmaBuffer` in
/// `monarch._rust_bindings.rdma`.
///
/// The serde derives are what `__reduce__` / `new_from_json` use to round-trip
/// the handle through a JSON string.
#[pyclass(name = "_RdmaBuffer", module = "monarch._rust_bindings.rdma")]
40+
#[derive(Clone, Serialize, Deserialize, Named)]
41+
struct PyRdmaBuffer {
42+
/// The buffer returned by `request_buffer`; passed as the remote side of transfers.
buffer: RdmaBuffer,
43+
/// Reference to the `rdma_manager` actor that produced `buffer`.
owner_ref: ActorRef<RdmaManagerActor>,
44+
}
45+
46+
async fn create_rdma_buffer(
47+
addr: usize,
48+
size: usize,
49+
proc_id: String,
50+
client: PyMailbox,
51+
) -> PyResult<PyRdmaBuffer> {
52+
// Get the owning RdmaManagerActor's ActorRef
53+
let proc_id: ProcId = proc_id.parse().unwrap();
54+
let owner_id = ActorId(proc_id, "rdma_manager".to_string(), 0);
55+
let owner_ref: ActorRef<RdmaManagerActor> = ActorRef::attest(owner_id);
56+
57+
// Create the RdmaBuffer
58+
let buffer = owner_ref.request_buffer(&client.inner, addr, size).await?;
59+
Ok(PyRdmaBuffer { buffer, owner_ref })
60+
}
61+
62+
#[pymethods]
63+
impl PyRdmaBuffer {
64+
#[classmethod]
65+
fn create_rdma_buffer_blocking<'py>(
66+
_cls: &Bound<'_, PyType>,
67+
py: Python<'py>,
68+
addr: usize,
69+
size: usize,
70+
proc_id: String,
71+
client: PyMailbox,
72+
) -> PyResult<PyRdmaBuffer> {
73+
if !ibverbs_supported() {
74+
return Err(PyException::new_err(
75+
"ibverbs is not supported on this system",
76+
));
77+
}
78+
signal_safe_block_on(py, create_rdma_buffer(addr, size, proc_id, client))?
79+
}
80+
81+
#[classmethod]
82+
fn create_rdma_buffer_nonblocking<'py>(
83+
_cls: &Bound<'_, PyType>,
84+
py: Python<'py>,
85+
addr: usize,
86+
size: usize,
87+
proc_id: String,
88+
client: PyMailbox,
89+
) -> PyResult<Bound<'py, PyAny>> {
90+
if !ibverbs_supported() {
91+
return Err(PyException::new_err(
92+
"ibverbs is not supported on this system",
93+
));
94+
}
95+
pyo3_async_runtimes::tokio::future_into_py(
96+
py,
97+
create_rdma_buffer(addr, size, proc_id, client),
98+
)
99+
}
100+
101+
#[classmethod]
102+
fn rdma_supported<'py>(_cls: &Bound<'_, PyType>, _py: Python<'py>) -> bool {
103+
ibverbs_supported()
104+
}
105+
106+
#[pyo3(name = "__repr__")]
107+
fn repr(&self) -> String {
108+
format!("<RdmaBuffer'{:?}'>", self.buffer)
109+
}
110+
111+
/// Reads data from the local buffer and places it into this remote RDMA buffer.
112+
///
113+
/// This operation appears as "read_into" from the caller's perspective (reading from local memory
114+
/// into the remote buffer), but internally it's implemented as a "write_from" operation on the
115+
/// local buffer since the data flows from the local buffer to the remote one.
116+
///
117+
/// # Arguments
118+
/// * `addr` - The address of the local buffer to read from
119+
/// * `size` - The size of the data to transfer
120+
/// * `local_proc_id` - The process ID where the local buffer resides
121+
/// * `client` - The mailbox for communication
122+
/// * `timeout` - Maximum time in milliseconds to wait for the operation
123+
#[pyo3(signature = (addr, size, local_proc_id, client, timeout))]
124+
fn read_into<'py>(
125+
&self,
126+
py: Python<'py>,
127+
addr: usize,
128+
size: usize,
129+
local_proc_id: String,
130+
client: PyMailbox,
131+
timeout: u64,
132+
) -> PyResult<Bound<'py, PyAny>> {
133+
let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id);
134+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
135+
let local_buffer = local_owner_ref
136+
.request_buffer(&client.inner, addr, size)
137+
.await?;
138+
let _result_ = local_buffer
139+
.write_from(&client.inner, buffer, timeout)
140+
.await
141+
.map_err(|e| PyException::new_err(format!("failed to read into buffer: {}", e)))?;
142+
Ok(())
143+
})
144+
}
145+
146+
/// Reads data from the local buffer and places it into this remote RDMA buffer.
147+
///
148+
/// This operation appears as "read_into" from the caller's perspective (reading from local memory
149+
/// into the remote buffer), but internally it's implemented as a "write_from" operation on the
150+
/// local buffer since the data flows from the local buffer to the remote one.
151+
///
152+
/// This is the blocking version of `read_into`, compatible with non asyncio Python code.
153+
///
154+
/// # Arguments
155+
/// * `addr` - The address of the local buffer to read from
156+
/// * `size` - The size of the data to transfer
157+
/// * `local_proc_id` - The process ID where the local buffer resides
158+
/// * `client` - The mailbox for communication
159+
/// * `timeout` - Maximum time in milliseconds to wait for the operation
160+
#[pyo3(signature = (addr, size, local_proc_id, client, timeout))]
161+
fn read_into_blocking<'py>(
162+
&self,
163+
py: Python<'py>,
164+
addr: usize,
165+
size: usize,
166+
local_proc_id: String,
167+
client: PyMailbox,
168+
timeout: u64,
169+
) -> PyResult<bool> {
170+
let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id);
171+
signal_safe_block_on(py, async move {
172+
let local_buffer = local_owner_ref
173+
.request_buffer(&client.inner, addr, size)
174+
.await?;
175+
local_buffer
176+
.write_from(&client.inner, buffer, timeout)
177+
.await
178+
.map_err(|e| PyException::new_err(format!("failed to read into buffer: {}", e)))
179+
})?
180+
}
181+
182+
/// Writes data from this remote RDMA buffer into a local buffer.
183+
///
184+
/// This operation appears as "write_from" from the caller's perspective (writing from the remote
185+
/// buffer into local memory), but internally it's implemented as a "read_into" operation on the
186+
/// local buffer since the data flows from the remote buffer to the local one.
187+
///
188+
/// # Arguments
189+
/// * `addr` - The address of the local buffer to write to
190+
/// * `size` - The size of the data to transfer
191+
/// * `local_proc_id` - The process ID where the local buffer resides
192+
/// * `client` - The mailbox for communication
193+
/// * `timeout` - Maximum time in milliseconds to wait for the operation
194+
#[pyo3(signature = (addr, size, local_proc_id, client, timeout))]
195+
fn write_from<'py>(
196+
&self,
197+
py: Python<'py>,
198+
addr: usize,
199+
size: usize,
200+
local_proc_id: String,
201+
client: PyMailbox,
202+
timeout: u64,
203+
) -> PyResult<Bound<'py, PyAny>> {
204+
let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id);
205+
pyo3_async_runtimes::tokio::future_into_py(py, async move {
206+
let local_buffer = local_owner_ref
207+
.request_buffer(&client.inner, addr, size)
208+
.await?;
209+
let _result_ = local_buffer
210+
.read_into(&client.inner, buffer, timeout)
211+
.await
212+
.map_err(|e| PyException::new_err(format!("failed to write from buffer: {}", e)))?;
213+
Ok(())
214+
})
215+
}
216+
217+
/// Writes data from this remote RDMA buffer into a local buffer.
218+
///
219+
/// This operation appears as "write_from" from the caller's perspective (writing from the remote
220+
/// buffer into local memory), but internally it's implemented as a "read_into" operation on the
221+
/// local buffer since the data flows from the remote buffer to the local one.
222+
///
223+
/// This is the blocking version of `write_from`, compatible with non asyncio Python code.
224+
///
225+
/// # Arguments
226+
/// * `addr` - The address of the local buffer to write to
227+
/// * `size` - The size of the data to transfer
228+
/// * `local_proc_id` - The process ID where the local buffer resides
229+
/// * `client` - The mailbox for communication
230+
/// * `timeout` - Maximum time in milliseconds to wait for the operation
231+
#[pyo3(signature = (addr, size, local_proc_id, client, timeout))]
232+
fn write_from_blocking<'py>(
233+
&self,
234+
py: Python<'py>,
235+
addr: usize,
236+
size: usize,
237+
local_proc_id: String,
238+
client: PyMailbox,
239+
timeout: u64,
240+
) -> PyResult<bool> {
241+
let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id);
242+
signal_safe_block_on(py, async move {
243+
let local_buffer = local_owner_ref
244+
.request_buffer(&client.inner, addr, size)
245+
.await?;
246+
local_buffer
247+
.read_into(&client.inner, buffer, timeout)
248+
.await
249+
.map_err(|e| PyException::new_err(format!("failed to write from buffer: {}", e)))
250+
})?
251+
}
252+
253+
fn __reduce__(&self) -> PyResult<(PyObject, PyObject)> {
254+
Python::with_gil(|py| {
255+
let ctor = py.get_type::<PyRdmaBuffer>().to_object(py);
256+
let json = serde_json::to_string(self).map_err(|e| {
257+
PyErr::new::<PyValueError, _>(format!("Serialization failed: {}", e))
258+
})?;
259+
260+
let args = PyTuple::new_bound(py, [json]).into_py(py);
261+
Ok((ctor, args))
262+
})
263+
}
264+
265+
#[new]
266+
fn new_from_json(json: &str) -> PyResult<Self> {
267+
let deserialized: PyRdmaBuffer = serde_json::from_str(json)
268+
.map_err(|e| PyErr::new::<PyValueError, _>(format!("Deserialization failed: {}", e)))?;
269+
Ok(deserialized)
270+
}
271+
272+
fn drop<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
273+
// no op with CPUs, currently a stub.
274+
// TODO - replace with correct GPU behavior.
275+
pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(()) })
276+
}
277+
278+
fn drop_blocking<'py>(&self, py: Python<'py>) -> PyResult<()> {
279+
signal_safe_block_on(py, async move {
280+
// no op with CPUs, currently a stub.
281+
// TODO - replace with correct GPU behavior.
282+
Ok(())
283+
})?
284+
}
285+
}
286+
287+
pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
288+
module.add_class::<PyRdmaBuffer>()?;
289+
Ok(())
290+
}

monarch_rdma/src/ibverbs_primitives.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -595,9 +595,8 @@ pub fn ibverbs_supported() -> bool {
595595
let device_list = rdmacore_sys::ibv_get_device_list(&mut num_devices);
596596
if !device_list.is_null() {
597597
rdmacore_sys::ibv_free_device_list(device_list);
598-
return true;
599598
}
600-
false
599+
num_devices > 0
601600
}
602601
}
603602

monarch_rdma/src/rdma_components.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl RdmaBuffer {
9090
/// # Returns
9191
/// `Ok(bool)` indicating if the operation completed successfully.
9292
pub async fn read_into(
93-
&mut self,
93+
&self,
9494
client: &Mailbox,
9595
remote: RdmaBuffer,
9696
timeout: u64,
@@ -124,7 +124,7 @@ impl RdmaBuffer {
124124
/// # Returns
125125
/// `Ok(bool)` indicating if the operation completed successfully.
126126
pub async fn write_from(
127-
&mut self,
127+
&self,
128128
client: &Mailbox,
129129
remote: RdmaBuffer,
130130
timeout: u64,
@@ -477,7 +477,7 @@ impl RdmaQueuePair {
477477
pd: *mut rdmacore_sys::ibv_pd,
478478
config: IbverbsConfig,
479479
) -> Result<Self, anyhow::Error> {
480-
tracing::info!("creating an RdmaQueuePair from config {}", config);
480+
tracing::debug!("creating an RdmaQueuePair from config {}", config);
481481
// SAFETY:
482482
// This code uses unsafe rdmacore_sys calls to interact with the RDMA device, but is safe because:
483483
// - All pointers are properly initialized and checked for null before use

0 commit comments

Comments
 (0)