
Commit b2ad24d

allenwang28 authored and facebook-github-bot committed
(2/N) Add in monarch/python/_src/tensor_engine (#486)
Summary: Pull Request resolved: #486

This diff adds a dedicated target for the tensor engine. In rdma.py, it adds basic CPU support for RDMABuffer. CUDA<->CPU|CUDA transfers are implemented in monarch_rdma, but they ran into issues with MR registration: logging shows that the code path correctly interprets the address as CUDA, yet cuMemGetHandleForAddressRange returns a handle/fd of -1 and a null MR pointer. RDMABuffer therefore only supports CPU for now, with GPU support as a follow-up. cc dstaay-fb

Reviewed By: mariusae

Differential Revision: D78035395

fbshipit-source-id: f3a85d3c8e70a7ace37aa9a36a3993d471baeea1
1 parent 985ed47 commit b2ad24d
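For reference, a minimal usage sketch of the RDMABuffer API added in this diff (not part of the commit). It assumes RDMA is supported on the host and that the snippet runs where MonarchContext.get() resolves, e.g. inside a Monarch actor endpoint; the import path is an assumption based on the target directory in the title.

import torch

# Import path is assumed from monarch/python/_src/tensor_engine; adjust to the
# actual public module if it differs.
from monarch._src.tensor_engine.rdma import RDMABuffer

# RDMABuffer requires a 1D, contiguous, uint8 CPU tensor with storage offset 0.
src = torch.arange(16, dtype=torch.uint8)
buf = RDMABuffer(src)

# write_from/read_into return Futures: call .get() to block, or await them
# from async actor code.
buf.write_from(torch.full((16,), 7, dtype=torch.uint8)).get()

dst = torch.zeros(16, dtype=torch.uint8)
buf.read_into(dst).get()
# dst now holds the bytes written into the buffer above.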

File tree

2 files changed: +215 -0 lines

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
Lines changed: 210 additions & 0 deletions (rdma.py)
@@ -0,0 +1,210 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import logging
import warnings
from typing import Optional

import torch
from monarch._rust_bindings.rdma import _RdmaBuffer

from monarch._src.actor.future import Future

from monarch.actor import MonarchContext


# RDMARead/WriteTransferWarnings are warnings that are only printed once per process.
# Remove these once GPU support is added.
class RDMAReadTransferWarning(Warning):
    pass


class RDMAWriteTransferWarning(Warning):
    pass


warnings.simplefilter("once", RDMAReadTransferWarning)
warnings.simplefilter("once", RDMAWriteTransferWarning)


def rdma_supported():
    return _RdmaBuffer.rdma_supported()


def _assert_tensor_is_1d_contiguous_uint8(t: torch.Tensor) -> None:
    if t.ndim != 1:
        raise ValueError(f"Tensor must be 1D, got {t.ndim}D")
    if t.dtype != torch.uint8:
        raise ValueError(f"Tensor must be uint8, got {t.dtype}")
    if not t.is_contiguous():
        raise ValueError("Tensor must be contiguous")


class RDMABuffer:
    def __init__(self, data: torch.Tensor) -> None:
        """
        RDMABuffer only supports 1D contiguous tensors that are 1 byte per item.

        To create a 1-byte, 1D view, use t.view(torch.uint8).flatten().

        TODO: Create TensorBuffer, which will be the main user API supporting non-contiguous, multi-byte-per-element tensors.
        """
        assert _RdmaBuffer.rdma_supported()

        if data.device.type != "cpu":
            # TODO - CUDA support for RDMABuffer exists at the Rust layer, but
            # runs into issues with MR creation. For now, only support CPU tensors.
            # Remove this once GPU support is added.
            raise ValueError(
                "RDMABuffer currently only supports CPU tensors (got device {})".format(
                    data.device
                )
            )

        _assert_tensor_is_1d_contiguous_uint8(data)
        assert data.storage_offset() == 0

        try:
            storage = data.untyped_storage()
            addr: int = storage.data_ptr()
            size = storage.element_size() * data.numel()
            ctx = MonarchContext.get()
            f = Future(
                lambda: _RdmaBuffer.create_rdma_buffer_nonblocking(
                    addr=addr,
                    size=size,
                    proc_id=ctx.proc_id,
                    client=ctx.mailbox,
                ),
                lambda: _RdmaBuffer.create_rdma_buffer_blocking(
                    addr=addr,
                    size=size,
                    proc_id=ctx.proc_id,
                    client=ctx.mailbox,
                ),
            )
            self._buffer: _RdmaBuffer = f.get()
        # TODO - raise a more specific exception
        except Exception as e:
            logging.error("Failed to create buffer %s", e)
            raise e

    def read_into(
        self,
        dst: torch.Tensor,
        offset: int = 0,
        timeout: int = 3,
    ) -> Future[Optional[int]]:
        """
        Read data from the RDMABuffer into a destination tensor.

        The destination tensor must be contiguous and 1 byte per item.

        Returns an ActorFuture that can be awaited or called with .get() for blocking operation.
        """
        _assert_tensor_is_1d_contiguous_uint8(dst)
        dst_gpu = None
        if dst.device.type != "cpu":
            # TODO - remove this once GPU support is added.
            warnings.warn(
                "note: read_into only supports CPU tensors, so `dst` is being copied to CPU.",
                RDMAReadTransferWarning,
                stacklevel=2,
            )
            dst_gpu = dst
            dst = dst.cpu()
        storage = dst.untyped_storage()
        addr: int = storage.data_ptr() + offset
        size = storage.element_size() * dst.numel()
        if offset + size > dst.numel():
            raise ValueError(
                f"offset + size ({offset + size}) must be <= dst.numel() ({dst.numel()})"
            )

        async def read_into_nonblocking() -> Optional[int]:
            res = await self._buffer.read_into(
                addr=addr,
                size=size,
                local_proc_id=MonarchContext.get().proc_id,
                client=MonarchContext.get().mailbox,
                timeout=timeout,
            )
            # TODO - remove this once GPU support is added.
            if dst_gpu is not None:
                dst_gpu.copy_(dst)
            return res

        def read_into_blocking() -> Optional[int]:
            res = self._buffer.read_into_blocking(
                addr=addr,
                size=size,
                local_proc_id=MonarchContext.get().proc_id,
                client=MonarchContext.get().mailbox,
                timeout=timeout,
            )
            # TODO - remove this once GPU support is added.
            if dst_gpu is not None:
                dst_gpu.copy_(dst)
            return res

        return Future(read_into_nonblocking, read_into_blocking)

    def write_from(
        self, src: torch.Tensor, offset: int = 0, timeout: int = 3
    ) -> Future[None]:
        """
        Write data from a source tensor into the RDMABuffer.

        The source tensor must be contiguous and 1 byte per item.

        Returns an ActorFuture that can be awaited or called with .get() for blocking operation.
        """
        _assert_tensor_is_1d_contiguous_uint8(src)
        src_gpu = None
        if src.device.type != "cpu":
            # TODO - remove this once GPU support is added.
            warnings.warn(
                "note: write_from only supports CPU tensors, so we will write to CPU first, then transfer to `src` in place.",
                RDMAWriteTransferWarning,
                stacklevel=2,
            )
            src_gpu = src  # Save the original GPU tensor reference
            src = src.cpu()  # Convert to CPU for RDMA operation
        storage = src.untyped_storage()
        addr: int = storage.data_ptr()
        size = storage.element_size() * src.numel()
        if size + offset > src.numel():
            raise ValueError(
                f"size + offset ({size + offset}) must be <= src.numel() ({src.numel()})"
            )

        async def write_from_nonblocking() -> None:
            res = await self._buffer.write_from(
                addr=addr,
                size=size,
                local_proc_id=MonarchContext.get().proc_id,
                client=MonarchContext.get().mailbox,
                timeout=timeout,
            )
            # TODO - remove this once GPU support is added.
            if src_gpu is not None:
                src_gpu.copy_(src)
            return res

        def write_from_blocking() -> None:
            res = self._buffer.write_from_blocking(
                addr=addr,
                size=size,
                local_proc_id=MonarchContext.get().proc_id,
                client=MonarchContext.get().mailbox,
                timeout=timeout,
            )
            # TODO - remove this once GPU support is added.
            if src_gpu is not None:
                src_gpu.copy_(src)
            return res

        return Future(write_from_nonblocking, write_from_blocking)
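
A side note on the docstring's byte-view hint: any contiguous tensor can be exposed as the 1-byte-per-item, 1D view that RDMABuffer expects via t.view(torch.uint8).flatten(). The float32 shape below is purely illustrative.

import torch

# Reinterpret a contiguous float32 tensor as raw bytes, then flatten to 1D.
t = torch.randn(4, 4, dtype=torch.float32)   # CPU tensor, 4 * 4 * 4 = 64 bytes
byte_view = t.view(torch.uint8).flatten()    # shape (64,), shares storage with t

assert byte_view.ndim == 1
assert byte_view.dtype == torch.uint8
assert byte_view.is_contiguous()
assert byte_view.storage_offset() == 0       # matches the __init__ assertion above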
