Commit 98d217e

[core][cgraph] Fix illegal memory access of cgraph when used in PP (#51734)
This fixes the illegal memory access bug reported in #51596. The culprit PR is #51305.

This PR is effectively a manual revert of #51305: a plain `git revert` of that commit is not straightforward because CI now enforces that there are no core->air dependencies (see #51699). It restores the original logic from before #51305, copying code where needed. The goal is to mitigate the bug first; a follow-up will simplify and tailor the logic of `utils.get_cuda_devices()`, which can be tricky.

## Related issue number

Closes #51596

---------

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
1 parent 24ad12d commit 98d217e
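For context, the change at each call site is mechanical: code that asked for a single default device via `get_default_torch_device(...)` now takes the first entry of `get_devices()`. A minimal sketch of the new usage pattern (ExampleWorker is a hypothetical actor, not part of this commit):

import ray
from ray.experimental.channel.utils import get_devices


@ray.remote(num_gpus=1)
class ExampleWorker:
    def __init__(self):
        # First device that Ray allocated to this worker; get_devices()
        # falls back to a single CPU device when no GPUs are assigned.
        self.device = get_devices()[0]

    def device_str(self) -> str:
        return str(self.device)


# e.g. ray.get(ExampleWorker.remote().device_str.remote()) -> "cuda:0"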

4 files changed: +67 -22 lines

python/ray/dag/tests/experimental/test_torch_tensor_dag.py

Lines changed: 5 additions & 5 deletions
@@ -18,7 +18,7 @@
     Communicator,
     TorchTensorAllocator,
 )
-from ray.experimental.channel.utils import get_default_torch_device
+from ray.experimental.channel.utils import get_devices
 from ray.experimental.channel.torch_tensor_type import TorchTensorType
 from ray.experimental.channel.nccl_group import _NcclGroup
 from ray._private.test_utils import (
@@ -40,7 +40,7 @@
 @ray.remote
 class TorchTensorWorker:
     def __init__(self):
-        self.device = get_default_torch_device(allow_cpu=True)
+        self.device = get_devices()[0]

     def init_distributed(self, world_size, rank):
         torch.distributed.init_process_group(
@@ -569,7 +569,7 @@ def initialize(self, rank: int) -> None:
             rank == expected_rank
         ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}"
         self._rank = rank
-        self._device = get_default_torch_device(allow_cpu=True)
+        self._device = get_devices()[0]

     def get_rank(self, actor: ray.actor.ActorHandle) -> int:
         actor_ids = [a._ray_actor_id for a in self._actor_handles]
@@ -703,7 +703,7 @@ def initialize(self, rank: int) -> None:
             rank == expected_rank
         ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}"
         self._rank = rank
-        self._device = get_default_torch_device(allow_cpu=True)
+        self._device = get_devices()[0]

     def get_rank(self, actor: ray.actor.ActorHandle) -> int:
         actor_ids = [a._ray_actor_id for a in self._actor_handles]
@@ -850,7 +850,7 @@ def initialize(self, rank: int) -> None:
             rank == expected_rank
         ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}"
         self._rank = rank
-        self._device = get_default_torch_device(allow_cpu=True)
+        self._device = get_devices()[0]

     def get_rank(self, actor: ray.actor.ActorHandle) -> int:
         actor_ids = [a._ray_actor_id for a in self._actor_handles]

python/ray/experimental/channel/common.py

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@
 import ray
 import ray.exceptions
 from ray.experimental.channel.communicator import Communicator
-from ray.experimental.channel.utils import get_default_torch_device
+from ray.experimental.channel.utils import get_devices
 from ray.experimental.channel.serialization_context import _SerializationContext
 from ray.util.annotations import DeveloperAPI, PublicAPI

@@ -163,7 +163,7 @@ def torch_available(self) -> bool:
     @property
     def torch_device(self) -> "torch.device":
         if self._torch_device is None:
-            self._torch_device = get_default_torch_device(allow_cpu=True)
+            self._torch_device = get_devices()[0]

         return self._torch_device

python/ray/experimental/channel/nccl_group.py

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@
 from ray.exceptions import RayChannelError
 from ray.experimental.channel.communicator import Communicator, TorchTensorAllocator
 from ray.experimental.util.types import ReduceOp
-from ray.experimental.channel.utils import get_default_torch_device
+from ray.experimental.channel.utils import get_devices

 if TYPE_CHECKING:
     import cupy as cp
@@ -102,7 +102,7 @@ def __init__(
         import cupy as cp

         # TODO(swang): Allow default device to be overridden.
-        device = get_default_torch_device(allow_cpu=False)
+        device = get_devices()[0]
         self._cuda_stream = cp.cuda.ExternalStream(
             cuda_stream, device_id=device.index
         )
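The wrapped stream must live on the device Ray allocated to the worker, which is why `device_id=device.index` is passed to the ExternalStream above. A minimal sketch of the same wrapping, assuming a GPU is assigned to the current actor; here the raw stream pointer is taken from torch's current stream purely for illustration, whereas the real `_NcclGroup` receives `cuda_stream` from its caller:

import cupy as cp
import torch

from ray.experimental.channel.utils import get_devices

device = get_devices()[0]  # e.g. torch.device("cuda:0") inside a GPU actor
raw_stream_ptr = torch.cuda.current_stream(device).cuda_stream  # raw cudaStream_t as an int
ext_stream = cp.cuda.ExternalStream(raw_stream_ptr, device_id=device.index)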

python/ray/experimental/channel/utils.py

Lines changed: 58 additions & 13 deletions
@@ -1,6 +1,7 @@
 from typing import TYPE_CHECKING, List, Optional, Tuple

 import ray
+import os

 if TYPE_CHECKING:
     import torch
@@ -95,22 +96,66 @@ def get_actor_node(actor: Optional["ray.actor.ActorHandle"]) -> str:
     )


-def get_default_torch_device(*, allow_cpu: bool) -> "torch.device":
-    """Get the default torch device inside this actor or driver.
+def get_cuda_devices() -> List["torch.device"]:
+    """Gets the correct torch cuda device list configured for this process.

-    If any GPUs are available, the default device will be cuda:0 and we will rely on
-    torch to handle mapping CUDA_VISIBLE_DEVICES to a physical device.
-
-    If no GPUs are available, a CPU device will be returned if allow_cpu is true, else
-    the function will raise a RuntimeError.
+    Assumes that `CUDA_VISIBLE_DEVICES` is set and is a
+    superset of the `ray.get_gpu_ids()`.
     """
+    # Note: currently this method replicates the logic from
+    # `CUDATorchDeviceManager.get_devices()`.
+    # TODO(rui): tailor and clean up the logic for proper use in
+    # Compiled Graphs.
     import torch

-    accelerator_ids = ray.get_runtime_context().get_accelerator_ids()
-    if not accelerator_ids.get("GPU", []):
-        if allow_cpu:
-            return torch.device("cpu")
+    # GPU IDs are assigned by Ray after you specify "use_gpu"
+    # GPU `ray.get_gpu_ids()` may return ints or may return strings.
+    # We should always convert to strings.
+    gpu_ids = [str(id) for id in ray.get_gpu_ids()]
+
+    device_ids = []
+
+    if len(gpu_ids) > 0:
+        cuda_visible_str = os.environ.get("CUDA_VISIBLE_DEVICES", "")
+        if cuda_visible_str and cuda_visible_str != "NoDevFiles":
+            cuda_visible_list = cuda_visible_str.split(",")
         else:
-            raise RuntimeError("No CUDA device available.")
+            cuda_visible_list = []
+
+        # By default, there should only be one GPU ID if `use_gpu=True`.
+        # If there are multiple GPUs, return a list of devices.
+        # If using fractional GPUs, these IDs are not guaranteed
+        # to be unique across different processes.
+        for gpu_id in gpu_ids:
+            try:
+                device_ids.append(cuda_visible_list.index(gpu_id))
+            except IndexError:
+                raise RuntimeError(
+                    "CUDA_VISIBLE_DEVICES set incorrectly. "
+                    f"Got {cuda_visible_str}, expected to include {gpu_id}. "
+                    "Did you override the `CUDA_VISIBLE_DEVICES` environment"
+                    " variable? If not, please help file an issue on Github."
+                )
+
+    else:
+        # If called on the driver or outside of Ray Train, return the
+        # 0th device.
+        device_ids.append(0)
+
+    return [torch.device(f"cuda:{device_id}") for device_id in device_ids]
+

-    return torch.device("cuda:0")
+def get_devices() -> List["torch.device"]:
+    """Gets the correct torch device list configured for this process.
+
+    Returns a list of torch devices allocated for the current worker.
+    If no devices are assigned, then it returns a list with a single CPU device.
+    """
+
+    import torch
+
+    gpu_ids = [str(id) for id in ray.get_gpu_ids()]
+    if len(gpu_ids) > 0:
+        return get_cuda_devices()
+    else:
+        return [torch.device("cpu")]
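The part of `get_cuda_devices()` that is easy to get wrong is the index lookup: the GPU IDs returned by `ray.get_gpu_ids()` are looked up as values inside `CUDA_VISIBLE_DEVICES`, and their positions become the local torch device indices. A standalone illustration of that mapping (plain Python, hypothetical helper name, no Ray calls):

def local_cuda_indices(gpu_ids, cuda_visible_devices):
    # Mirror the lookup in get_cuda_devices(): each assigned GPU ID must
    # appear in CUDA_VISIBLE_DEVICES, and its position is the local index.
    visible = cuda_visible_devices.split(",") if cuda_visible_devices else []
    return [visible.index(str(gpu_id)) for gpu_id in gpu_ids]


# A worker assigned physical GPU 3 on a node where CUDA_VISIBLE_DEVICES="2,3"
# should address it as cuda:1 locally ...
assert local_cuda_indices([3], "2,3") == [1]
# ... while with CUDA_VISIBLE_DEVICES="0,1,2,3" the same GPU is cuda:3.
assert local_cuda_indices([3], "0,1,2,3") == [3]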
