Skip to content

(3/N) Add functionality for RDMAManagerActor creation #514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions monarch_rdma/extension/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ doctest = false

[dependencies]
hyperactor = { version = "0.0.0", path = "../../hyperactor" }
hyperactor_mesh = { version = "0.0.0", path = "../../hyperactor_mesh" }
monarch_hyperactor = { version = "0.0.0", path = "../../monarch_hyperactor" }
monarch_rdma = { version = "0.0.0", path = ".." }
pyo3 = { version = "0.24", features = ["anyhow", "multiple-pymethods"] }
pyo3-async-runtimes = { version = "0.24", features = ["attributes", "tokio-runtime"] }
serde = { version = "1.0.185", features = ["derive", "rc"] }
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
91 changes: 90 additions & 1 deletion monarch_rdma/extension/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ use hyperactor::ActorId;
use hyperactor::ActorRef;
use hyperactor::Named;
use hyperactor::ProcId;
use hyperactor_mesh::RootActorMesh;
use hyperactor_mesh::shared_cell::SharedCell;
use monarch_hyperactor::mailbox::PyMailbox;
use monarch_hyperactor::proc_mesh::PyProcMesh;
use monarch_hyperactor::runtime::signal_safe_block_on;
use monarch_rdma::IbverbsConfig;
use monarch_rdma::RdmaBuffer;
use monarch_rdma::RdmaManagerActor;
use monarch_rdma::RdmaManagerMessageClient;
use monarch_rdma::ibverbs_supported;
use pyo3::BoundObject;
use pyo3::exceptions::PyException;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
Expand Down Expand Up @@ -284,7 +287,93 @@ impl PyRdmaBuffer {
}
}

#[pyclass(name = "_RdmaManager", module = "monarch._rust_bindings.rdma")]
pub struct PyRdmaManager {
inner: SharedCell<RootActorMesh<'static, RdmaManagerActor>>,
device: String,
}

#[pymethods]
impl PyRdmaManager {
#[pyo3(name = "__repr__")]
fn repr(&self) -> String {
format!("<RdmaManager(device='{}')>", self.device)
}

#[getter]
fn device(&self) -> &str {
&self.device
}
}

/// Creates an RDMA manager actor on the given ProcMesh.
/// Returns the actor mesh if RDMA is supported, None otherwise.
#[pyfunction]
fn create_rdma_manager_blocking<'py>(
py: Python<'py>,
proc_mesh: &PyProcMesh,
) -> PyResult<Option<PyRdmaManager>> {
if !ibverbs_supported() {
tracing::info!("rdma is not enabled on this hardware");
return Ok(None);
}

// TODO - make this configurable
let config = IbverbsConfig::default();
tracing::debug!("rdma is enabled, using device {}", config.device);

let tracked_proc_mesh = proc_mesh.try_inner()?;
let device = config.device.to_string();

let actor_mesh = signal_safe_block_on(py, async move {
tracked_proc_mesh
.spawn("rdma_manager", &config)
.await
.map_err(|err| PyException::new_err(err.to_string()))
})??;

Ok(Some(PyRdmaManager {
inner: actor_mesh,
device,
}))
}

/// Creates an RDMA manager actor on the given ProcMesh (async version).
/// Returns the actor mesh if RDMA is supported, None otherwise.
#[pyfunction]
fn create_rdma_manager_nonblocking<'py>(
py: Python<'py>,
proc_mesh: &PyProcMesh,
) -> PyResult<Bound<'py, PyAny>> {
if !ibverbs_supported() {
tracing::info!("rdma is not enabled on this hardware");
return Ok(py.None().into_bound(py));
}

// TODO - make this configurable
let config = IbverbsConfig::default();
tracing::debug!("rdma is enabled, using device {}", config.device);

let tracked_proc_mesh = proc_mesh.try_inner()?;
let device = config.device.to_string();

pyo3_async_runtimes::tokio::future_into_py(py, async move {
let actor_mesh = tracked_proc_mesh
.spawn::<RdmaManagerActor>("rdma_manager", &config)
.await
.map_err(|err| PyException::new_err(err.to_string()))?;

Ok(Some(PyRdmaManager {
inner: actor_mesh,
device,
}))
})
}

pub fn register_python_bindings(module: &Bound<'_, PyModule>) -> PyResult<()> {
module.add_class::<PyRdmaBuffer>()?;
module.add_class::<PyRdmaManager>()?;
module.add_function(wrap_pyfunction!(create_rdma_manager_blocking, module)?)?;
module.add_function(wrap_pyfunction!(create_rdma_manager_nonblocking, module)?)?;
Ok(())
}
7 changes: 7 additions & 0 deletions python/monarch/_rust_bindings/rdma/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ from typing import Any, final, Optional
class _RdmaMemoryRegionView:
def __init__(self, addr: int, size_in_bytes: int) -> None: ...

@final
class _RdmaManager:
device: str
def __repr__(self) -> str: ...

def create_rdma_manager_blocking(proc_mesh: Any) -> Optional[_RdmaManager]: ...
async def create_rdma_manager_nonblocking(proc_mesh: Any) -> Optional[_RdmaManager]: ...
@final
class _RdmaBuffer:
name: str
Expand Down