diff --git a/monarch_hyperactor/src/actor_mesh.rs b/monarch_hyperactor/src/actor_mesh.rs index acac417e..9c706e30 100644 --- a/monarch_hyperactor/src/actor_mesh.rs +++ b/monarch_hyperactor/src/actor_mesh.rs @@ -29,8 +29,8 @@ use crate::shape::PyShape; module = "monarch._rust_bindings.monarch_hyperactor.actor_mesh" )] pub struct PythonActorMesh { - pub(super) inner: SharedCell>, - pub(super) client: PyMailbox, + pub inner: SharedCell>, + pub client: PyMailbox, pub(super) _keepalive: Keepalive, } @@ -62,7 +62,7 @@ impl PythonActorMesh { } #[getter] - fn client(&self) -> PyMailbox { + pub fn client(&self) -> PyMailbox { self.client.clone() } diff --git a/monarch_hyperactor/src/mailbox.rs b/monarch_hyperactor/src/mailbox.rs index 47f503b6..38bc5997 100644 --- a/monarch_hyperactor/src/mailbox.rs +++ b/monarch_hyperactor/src/mailbox.rs @@ -52,8 +52,8 @@ use crate::shape::PyShape; name = "Mailbox", module = "monarch._rust_bindings.monarch_hyperactor.mailbox" )] -pub(super) struct PyMailbox { - pub(super) inner: Mailbox, +pub struct PyMailbox { + pub inner: Mailbox, } #[pymethods] diff --git a/monarch_rdma/extension/Cargo.toml b/monarch_rdma/extension/Cargo.toml new file mode 100644 index 00000000..cbb88be3 --- /dev/null +++ b/monarch_rdma/extension/Cargo.toml @@ -0,0 +1,22 @@ +# @generated by autocargo from //monarch/monarch_rdma/extension:monarch_rdma_extension + +[package] +name = "monarch_rdma_extension" +version = "0.0.0" +authors = ["Meta"] +edition = "2021" +license = "BSD-3-Clause" + +[lib] +path = "lib.rs" +test = false +doctest = false + +[dependencies] +hyperactor = { version = "0.0.0", path = "../../hyperactor" } +monarch_hyperactor = { version = "0.0.0", path = "../../monarch_hyperactor" } +monarch_rdma = { version = "0.0.0", path = ".." } +pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] } +pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] } +serde = { version = "1.0.185", features = ["derive", "rc"] } +serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] } diff --git a/monarch_rdma/extension/lib.rs b/monarch_rdma/extension/lib.rs new file mode 100644 index 00000000..70ffeeb6 --- /dev/null +++ b/monarch_rdma/extension/lib.rs @@ -0,0 +1,290 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#![allow(unsafe_op_in_unsafe_fn)] +use hyperactor::ActorId; +use hyperactor::ActorRef; +use hyperactor::Named; +use hyperactor::ProcId; +use monarch_hyperactor::mailbox::PyMailbox; +use monarch_hyperactor::runtime::signal_safe_block_on; +use monarch_rdma::RdmaBuffer; +use monarch_rdma::RdmaManagerActor; +use monarch_rdma::RdmaManagerMessageClient; +use monarch_rdma::ibverbs_supported; +use pyo3::BoundObject; +use pyo3::exceptions::PyException; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::*; +use pyo3::types::PyTuple; +use pyo3::types::PyType; +use serde::Deserialize; +use serde::Serialize; + +macro_rules! setup_rdma_context { + ($self:ident, $local_proc_id:expr) => {{ + let proc_id: ProcId = $local_proc_id.parse().unwrap(); + let local_owner_id = ActorId(proc_id, "rdma_manager".to_string(), 0); + let local_owner_ref: ActorRef = ActorRef::attest(local_owner_id); + let buffer = $self.buffer.clone(); + (local_owner_ref, buffer) + }}; +} + +#[pyclass(name = "_RdmaBuffer", module = "monarch._rust_bindings.rdma")] +#[derive(Clone, Serialize, Deserialize, Named)] +struct PyRdmaBuffer { + buffer: RdmaBuffer, + owner_ref: ActorRef, +} + +async fn create_rdma_buffer( + addr: usize, + size: usize, + proc_id: String, + client: PyMailbox, +) -> PyResult { + // Get the owning RdmaManagerActor's ActorRef + let proc_id: ProcId = proc_id.parse().unwrap(); + let owner_id = ActorId(proc_id, "rdma_manager".to_string(), 0); + let owner_ref: ActorRef = ActorRef::attest(owner_id); + + // Create the RdmaBuffer + let buffer = owner_ref.request_buffer(&client.inner, addr, size).await?; + Ok(PyRdmaBuffer { buffer, owner_ref }) +} + +#[pymethods] +impl PyRdmaBuffer { + #[classmethod] + fn create_rdma_buffer_blocking<'py>( + _cls: &Bound<'_, PyType>, + py: Python<'py>, + addr: usize, + size: usize, + proc_id: String, + client: PyMailbox, + ) -> PyResult { + if !ibverbs_supported() { + return Err(PyException::new_err( + "ibverbs is not supported on this system", + )); + } + signal_safe_block_on(py, create_rdma_buffer(addr, size, proc_id, client))? + } + + #[classmethod] + fn create_rdma_buffer_nonblocking<'py>( + _cls: &Bound<'_, PyType>, + py: Python<'py>, + addr: usize, + size: usize, + proc_id: String, + client: PyMailbox, + ) -> PyResult> { + if !ibverbs_supported() { + return Err(PyException::new_err( + "ibverbs is not supported on this system", + )); + } + pyo3_async_runtimes::tokio::future_into_py( + py, + create_rdma_buffer(addr, size, proc_id, client), + ) + } + + #[classmethod] + fn rdma_supported<'py>(_cls: &Bound<'_, PyType>, _py: Python<'py>) -> bool { + ibverbs_supported() + } + + #[pyo3(name = "__repr__")] + fn repr(&self) -> String { + format!("", self.buffer) + } + + /// Reads data from the local buffer and places it into this remote RDMA buffer. + /// + /// This operation appears as "read_into" from the caller's perspective (reading from local memory + /// into the remote buffer), but internally it's implemented as a "write_from" operation on the + /// local buffer since the data flows from the local buffer to the remote one. + /// + /// # Arguments + /// * `addr` - The address of the local buffer to read from + /// * `size` - The size of the data to transfer + /// * `local_proc_id` - The process ID where the local buffer resides + /// * `client` - The mailbox for communication + /// * `timeout` - Maximum time in milliseconds to wait for the operation + #[pyo3(signature = (addr, size, local_proc_id, client, timeout))] + fn read_into<'py>( + &self, + py: Python<'py>, + addr: usize, + size: usize, + local_proc_id: String, + client: PyMailbox, + timeout: u64, + ) -> PyResult> { + let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let local_buffer = local_owner_ref + .request_buffer(&client.inner, addr, size) + .await?; + let _result_ = local_buffer + .write_from(&client.inner, buffer, timeout) + .await + .map_err(|e| PyException::new_err(format!("failed to read into buffer: {}", e)))?; + Ok(()) + }) + } + + /// Reads data from the local buffer and places it into this remote RDMA buffer. + /// + /// This operation appears as "read_into" from the caller's perspective (reading from local memory + /// into the remote buffer), but internally it's implemented as a "write_from" operation on the + /// local buffer since the data flows from the local buffer to the remote one. + /// + /// This is the blocking version of `read_into`, compatible with non asyncio Python code. + /// + /// # Arguments + /// * `addr` - The address of the local buffer to read from + /// * `size` - The size of the data to transfer + /// * `local_proc_id` - The process ID where the local buffer resides + /// * `client` - The mailbox for communication + /// * `timeout` - Maximum time in milliseconds to wait for the operation + #[pyo3(signature = (addr, size, local_proc_id, client, timeout))] + fn read_into_blocking<'py>( + &self, + py: Python<'py>, + addr: usize, + size: usize, + local_proc_id: String, + client: PyMailbox, + timeout: u64, + ) -> PyResult { + let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id); + signal_safe_block_on(py, async move { + let local_buffer = local_owner_ref + .request_buffer(&client.inner, addr, size) + .await?; + local_buffer + .write_from(&client.inner, buffer, timeout) + .await + .map_err(|e| PyException::new_err(format!("failed to read into buffer: {}", e))) + })? + } + + /// Writes data from this remote RDMA buffer into a local buffer. + /// + /// This operation appears as "write_from" from the caller's perspective (writing from the remote + /// buffer into local memory), but internally it's implemented as a "read_into" operation on the + /// local buffer since the data flows from the remote buffer to the local one. + /// + /// # Arguments + /// * `addr` - The address of the local buffer to write to + /// * `size` - The size of the data to transfer + /// * `local_proc_id` - The process ID where the local buffer resides + /// * `client` - The mailbox for communication + /// * `timeout` - Maximum time in milliseconds to wait for the operation + #[pyo3(signature = (addr, size, local_proc_id, client, timeout))] + fn write_from<'py>( + &self, + py: Python<'py>, + addr: usize, + size: usize, + local_proc_id: String, + client: PyMailbox, + timeout: u64, + ) -> PyResult> { + let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id); + pyo3_async_runtimes::tokio::future_into_py(py, async move { + let local_buffer = local_owner_ref + .request_buffer(&client.inner, addr, size) + .await?; + let _result_ = local_buffer + .read_into(&client.inner, buffer, timeout) + .await + .map_err(|e| PyException::new_err(format!("failed to write from buffer: {}", e)))?; + Ok(()) + }) + } + + /// Writes data from this remote RDMA buffer into a local buffer. + /// + /// This operation appears as "write_from" from the caller's perspective (writing from the remote + /// buffer into local memory), but internally it's implemented as a "read_into" operation on the + /// local buffer since the data flows from the remote buffer to the local one. + /// + /// This is the blocking version of `write_from`, compatible with non asyncio Python code. + /// + /// # Arguments + /// * `addr` - The address of the local buffer to write to + /// * `size` - The size of the data to transfer + /// * `local_proc_id` - The process ID where the local buffer resides + /// * `client` - The mailbox for communication + /// * `timeout` - Maximum time in milliseconds to wait for the operation + #[pyo3(signature = (addr, size, local_proc_id, client, timeout))] + fn write_from_blocking<'py>( + &self, + py: Python<'py>, + addr: usize, + size: usize, + local_proc_id: String, + client: PyMailbox, + timeout: u64, + ) -> PyResult { + let (local_owner_ref, buffer) = setup_rdma_context!(self, local_proc_id); + signal_safe_block_on(py, async move { + let local_buffer = local_owner_ref + .request_buffer(&client.inner, addr, size) + .await?; + local_buffer + .read_into(&client.inner, buffer, timeout) + .await + .map_err(|e| PyException::new_err(format!("failed to write from buffer: {}", e))) + })? + } + + fn __reduce__(&self) -> PyResult<(PyObject, PyObject)> { + Python::with_gil(|py| { + let ctor = py.get_type::().to_object(py); + let json = serde_json::to_string(self).map_err(|e| { + PyErr::new::(format!("Serialization failed: {}", e)) + })?; + + let args = PyTuple::new_bound(py, [json]).into_py(py); + Ok((ctor, args)) + }) + } + + #[new] + fn new_from_json(json: &str) -> PyResult { + let deserialized: PyRdmaBuffer = serde_json::from_str(json) + .map_err(|e| PyErr::new::(format!("Deserialization failed: {}", e)))?; + Ok(deserialized) + } + + fn drop<'py>(&self, py: Python<'py>) -> PyResult> { + // no op with CPUs, currently a stub. + // TODO - replace with correct GPU behavior. + pyo3_async_runtimes::tokio::future_into_py(py, async move { Ok(()) }) + } + + fn drop_blocking<'py>(&self, py: Python<'py>) -> PyResult<()> { + signal_safe_block_on(py, async move { + // no op with CPUs, currently a stub. + // TODO - replace with correct GPU behavior. + Ok(()) + })? + } +} + +pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> { + module.add_class::()?; + Ok(()) +} diff --git a/monarch_rdma/src/ibverbs_primitives.rs b/monarch_rdma/src/ibverbs_primitives.rs index fc4df446..90ef5c3e 100644 --- a/monarch_rdma/src/ibverbs_primitives.rs +++ b/monarch_rdma/src/ibverbs_primitives.rs @@ -595,9 +595,8 @@ pub fn ibverbs_supported() -> bool { let device_list = rdmacore_sys::ibv_get_device_list(&mut num_devices); if !device_list.is_null() { rdmacore_sys::ibv_free_device_list(device_list); - return true; } - false + num_devices > 0 } } diff --git a/monarch_rdma/src/rdma_components.rs b/monarch_rdma/src/rdma_components.rs index b5d34c55..d4153f2c 100644 --- a/monarch_rdma/src/rdma_components.rs +++ b/monarch_rdma/src/rdma_components.rs @@ -90,7 +90,7 @@ impl RdmaBuffer { /// # Returns /// `Ok(bool)` indicating if the operation completed successfully. pub async fn read_into( - &mut self, + &self, client: &Mailbox, remote: RdmaBuffer, timeout: u64, @@ -124,7 +124,7 @@ impl RdmaBuffer { /// # Returns /// `Ok(bool)` indicating if the operation completed successfully. pub async fn write_from( - &mut self, + &self, client: &Mailbox, remote: RdmaBuffer, timeout: u64, @@ -477,7 +477,7 @@ impl RdmaQueuePair { pd: *mut rdmacore_sys::ibv_pd, config: IbverbsConfig, ) -> Result { - tracing::info!("creating an RdmaQueuePair from config {}", config); + tracing::debug!("creating an RdmaQueuePair from config {}", config); // SAFETY: // This code uses unsafe rdmacore_sys calls to interact with the RDMA device, but is safe because: // - All pointers are properly initialized and checked for null before use diff --git a/monarch_rdma/src/rdma_manager_actor.rs b/monarch_rdma/src/rdma_manager_actor.rs index 84c51e4e..620feb99 100644 --- a/monarch_rdma/src/rdma_manager_actor.rs +++ b/monarch_rdma/src/rdma_manager_actor.rs @@ -230,17 +230,25 @@ impl RdmaManagerMessageHandler for RdmaManagerActor { remote: ActorRef, ) -> Result { if !self.is_connected(cx, remote.clone()).await? { - self.initialize_qp(cx, remote.clone()).await?; - remote.initialize_qp(cx, cx.bind().clone()).await?; - - let remote_endpoint = remote.connection_info(cx, cx.bind().clone()).await?; - self.connect(cx, remote.clone(), remote_endpoint).await?; - - let local_endpoint = self.connection_info(cx, remote.clone()).await?; - remote - .connect(cx, cx.bind().clone(), local_endpoint) - .await?; + let is_loopback = + remote.actor_id().clone() == cx.bind::().actor_id().clone(); + + if is_loopback { + self.initialize_qp(cx, remote.clone()).await?; + let endpoint = self.connection_info(cx, remote.clone()).await?; + self.connect(cx, remote.clone(), endpoint).await?; + } else { + self.initialize_qp(cx, remote.clone()).await?; + remote.initialize_qp(cx, cx.bind().clone()).await?; + let remote_endpoint = remote.connection_info(cx, cx.bind().clone()).await?; + self.connect(cx, remote.clone(), remote_endpoint).await?; + let local_endpoint = self.connection_info(cx, remote.clone()).await?; + remote + .connect(cx, cx.bind().clone(), local_endpoint) + .await?; + } } + let qp = self .qp_map .get_mut(&remote.actor_id().clone()) diff --git a/python/monarch/_rust_bindings/rdma/__init__.pyi b/python/monarch/_rust_bindings/rdma/__init__.pyi new file mode 100644 index 00000000..e5d0f92b --- /dev/null +++ b/python/monarch/_rust_bindings/rdma/__init__.pyi @@ -0,0 +1,64 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +from typing import Any, final, Optional + +@final +class _RdmaMemoryRegionView: + def __init__(self, addr: int, size_in_bytes: int) -> None: ... + +@final +class _RdmaBuffer: + name: str + + @classmethod + def create_rdma_buffer_blocking( + cls, addr: int, size: int, proc_id: str, client: Any + ) -> _RdmaBuffer: ... + @classmethod + async def create_rdma_buffer_nonblocking( + cls, addr: int, size: int, proc_id: str, client: Any + ) -> Any: ... + async def drop(self, client: Any): ... + def drop_blocking(self, client: Any): ... + async def read_into( + self, + addr: int, + size: int, + local_proc_id: str, + client: Any, + timeout: int, + ) -> Any: ... + def read_into_blocking( + self, + addr: int, + size: int, + local_proc_id: str, + client: Any, + timeout: int, + ) -> Any: ... + async def write_from( + self, + addr: int, + size: int, + local_proc_id: str, + client: Any, + timeout: int, + ) -> Any: ... + def write_from_blocking( + self, + addr: int, + size: int, + local_proc_id: str, + client: Any, + timeout: int, + ) -> Any: ... + def __reduce__(self) -> tuple: ... + def __repr__(self) -> str: ... + @staticmethod + def new_from_json(json: str) -> _RdmaBuffer: ... + @classmethod + def rdma_supported(cls) -> bool: ...