
Commit 360ede3

[llm] ray.llm support custom accelerators (#51359)
Signed-off-by: liuxsh9 <liuxiaoshuang4@huawei.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Co-authored-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
1 parent ad3e5d5 commit 360ede3

9 files changed (+94, -18 lines)


python/ray/data/llm.py (+3)

@@ -19,6 +19,9 @@ class ProcessorConfig(_ProcessorConfig):
             On the other hand, small batch sizes are more fault-tolerant and could
             reduce bubbles in the data pipeline. You can tune the batch size to balance
             the throughput and fault-tolerance based on your use case.
+        resources_per_bundle: The resource bundles for placement groups.
+            You can specify a custom device label e.g. {'NPU': 1}.
+            The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.
         accelerator_type: The accelerator type used by the LLM stage in a processor.
             Default to None, meaning that only the CPU will be used.
         concurrency: The number of workers for data parallelism. Default to 1.
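
For readers, a minimal usage sketch of the new field through the public batch API. It assumes the vLLMEngineProcessorConfig / build_llm_processor entry points from ray.data.llm; the model, preprocess, and postprocess values below are illustrative and not part of this commit.

import ray
from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig

# resources_per_bundle is the field added by this commit; everything else
# follows the usual batch-inference pattern and may vary by Ray version.
config = vLLMEngineProcessorConfig(
    model_source="Qwen/Qwen2.5-0.5B-Instruct",
    engine_kwargs=dict(tensor_parallel_size=2),
    resources_per_bundle={"NPU": 1},  # one NPU per TP/PP worker bundle
    batch_size=32,
    concurrency=1,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[{"role": "user", "content": row["prompt"]}],
        sampling_params=dict(max_tokens=64),
    ),
    postprocess=lambda row: dict(answer=row["generated_text"]),
)
ds = processor(ray.data.from_items([{"prompt": "What is a placement group?"}]))
ds.show()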

python/ray/llm/_internal/batch/processor/base.py (+6)

@@ -30,6 +30,12 @@ class ProcessorConfig(BaseModelExtended):
         "You can tune the batch size to balance the throughput and fault-tolerance "
         "based on your use case. Defaults to 64.",
     )
+    resources_per_bundle: Optional[Dict[str, float]] = Field(
+        default=None,
+        description="This will override the default resource bundles for placement groups. "
+        "You can specify a custom device label e.g. {'NPU': 1}. "
+        "The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.",
+    )
     accelerator_type: Optional[str] = Field(
         default=None,
         description="The accelerator type used by the LLM stage in a processor. "

python/ray/llm/_internal/batch/processor/vllm_engine_proc.py (+1)

@@ -193,6 +193,7 @@ def build_vllm_engine_processor(
             # This is used to make sure we overlap batches to avoid the tail
             # latency of each batch.
             max_concurrency=config.max_concurrent_batches,
+            resources=config.resources_per_bundle,
             accelerator_type=config.accelerator_type,
             runtime_env=config.runtime_env,
         ),

python/ray/llm/_internal/batch/stages/vllm_engine_stage.py (+40, -14)

@@ -570,26 +570,42 @@ def __del__(self):
         self.llm.shutdown()


-def _ray_scheduling_strategy_fn(num_gpus_per_instance: int, accelerator_type: str):
-    """
-    Create a Ray scheduling strategy for vLLM engine.
+def _ray_scheduling_strategy_fn(
+    num_bundles_per_replica: int,
+    accelerator_type: Optional[str] = None,
+    resources_per_bundle: Optional[Dict[str, float]] = None,
+):
+    """Create a Ray scheduling strategy for the engine.

     Args:
-        num_gpus_per_instance: The number of GPUs per instance.
-        accelerator_type: The accelerator type.
+        num_bundles_per_replica: The number of device bundles per
+            engine replica.
+        accelerator_type: The accelerator type. If None, the
+            accelerator_type label will not be set.
+        resources_per_bundle: The custom resources per bundle.
+            If None, we default to 1xGPU + 1xCPU bundle.

     Returns:
         The Ray scheduling strategy.
     """

     def _get_bundle() -> Dict[str, float]:
-        bundle: Dict[str, float] = {"GPU": 1, "CPU": 1}
+
+        bundle = {}
+        # Custom resources
+        if resources_per_bundle:
+            bundle = resources_per_bundle
+        else:
+            # GPU bundles
+            bundle = {"GPU": 1, "CPU": 1}
+
+        # Accelerator type
         if accelerator_type:
             bundle[f"accelerator_type:{accelerator_type}"] = 0.001
         return bundle

     pg = ray.util.placement_group(
-        [_get_bundle()] * num_gpus_per_instance,
+        [_get_bundle()] * num_bundles_per_replica,
         strategy="STRICT_PACK",
     )
     return dict(

@@ -626,30 +642,40 @@ def post_init(cls, values):
         if accelerator_type:
             ray_remote_args["accelerator_type"] = accelerator_type

-        # Setup num_gpus required per vLLM engine.
+        # Setup num_workers required per vLLM engine.
         tp_size = engine_kwargs.get("tensor_parallel_size", 1)
         pp_size = engine_kwargs.get("pipeline_parallel_size", 1)
-        num_gpus = tp_size * pp_size
+        num_bundles_per_replica = tp_size * pp_size

         # Use the MP backend by default.
         engine_kwargs.setdefault("distributed_executor_backend", "mp")
         executor_backend = engine_kwargs.get("distributed_executor_backend")

-        # When Ray is used in the vLLM engine, we set num_gpus to 0 so that
+        # When Ray is used in the vLLM engine, we set num_devices to 0 so that
         # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling
         # strategy in .map_batches() arguments and let vLLM Ray executor to
         # create placement groups for each TP/PP worker.
-        if executor_backend == "ray" and num_gpus > 1:
+        resources_per_bundle = map_batches_kwargs.pop("resources", None)
+        if executor_backend == "ray" and num_bundles_per_replica > 1:
             # Note that we have to use partial() to pass a function
             # instead of an object.
             map_batches_kwargs["ray_remote_args_fn"] = partial(
                 _ray_scheduling_strategy_fn,
-                num_gpus,
+                num_bundles_per_replica,
                 accelerator_type,
+                resources_per_bundle,
             )
-            num_gpus = 0
+            ray_remote_args["num_gpus"] = 0
+        else:
+            if not resources_per_bundle:
+                # Default to GPUs per bundle if custom resources are not specified.
+                ray_remote_args["num_gpus"] = num_bundles_per_replica
+            else:
+                ray_remote_args["resources"] = {
+                    resource_key: resource_count * num_bundles_per_replica
+                    for resource_key, resource_count in resources_per_bundle.items()
+                }

-        map_batches_kwargs["num_gpus"] = num_gpus
         map_batches_kwargs.update(ray_remote_args)
         return values
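
As a reading aid, the bundle construction above boils down to the following standalone sketch. build_bundles is a hypothetical helper written for illustration, not the internal _ray_scheduling_strategy_fn itself.

from typing import Dict, List, Optional

def build_bundles(
    num_bundles_per_replica: int,
    accelerator_type: Optional[str] = None,
    resources_per_bundle: Optional[Dict[str, float]] = None,
) -> List[Dict[str, float]]:
    # Custom resources win; otherwise fall back to the 1 GPU + 1 CPU default.
    bundle = dict(resources_per_bundle) if resources_per_bundle else {"GPU": 1, "CPU": 1}
    if accelerator_type:
        # Tiny fractional amount of the accelerator-type label, used only
        # to steer scheduling onto matching nodes.
        bundle[f"accelerator_type:{accelerator_type}"] = 0.001
    return [dict(bundle) for _ in range(num_bundles_per_replica)]

# tensor_parallel_size=2 * pipeline_parallel_size=2 -> 4 bundles per replica.
print(build_bundles(4, resources_per_bundle={"NPU": 1}))
# [{'NPU': 1}, {'NPU': 1}, {'NPU': 1}, {'NPU': 1}]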

python/ray/llm/_internal/serve/configs/server_models.py (+7)

@@ -190,6 +190,13 @@ class LLMConfig(BaseModelExtended):
         ),
     )

+    resources_per_bundle: Optional[Dict[str, float]] = Field(
+        default=None,
+        description="This will override the default resource bundles for placement groups. "
+        "You can specify a custom device label e.g. {'NPU': 1}. "
+        "The default resource bundle for LLM Stage is always a GPU resource i.e. {'GPU': 1}.",
+    )
+
     accelerator_type: Optional[str] = Field(
         default=None,
         description=f"The type of accelerator runs the model on. Only the following values are supported: {str([t.value for t in GPUType])}",

python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py (+12, -3)

@@ -51,6 +51,11 @@ class VLLMEngineConfig(BaseModelExtended):
         None,
         description="Configuration for cloud storage mirror. This is for where the weights are downloaded from.",
     )
+    resources_per_bundle: Optional[Dict[str, float]] = Field(
+        default=None,
+        description="This overrides the vLLM engine worker's default resource configuration, "
+        "the number of resources returned by `placement_bundles`.",
+    )
     accelerator_type: Optional[GPUType] = Field(
         None,
         description="The type of accelerator to use. This is used to determine the placement group strategy.",

@@ -104,6 +109,7 @@ def from_llm_config(cls, llm_config: LLMConfig) -> "VLLMEngineConfig":
             model_id=llm_config.model_id,
             hf_model_id=hf_model_id,
             mirror_config=mirror_config,
+            resources_per_bundle=llm_config.resources_per_bundle,
             accelerator_type=llm_config.accelerator_type,
             engine_kwargs=llm_config.engine_kwargs,
             runtime_env=llm_config.runtime_env,

@@ -122,7 +128,7 @@ def pipeline_parallel_degree(self) -> int:
         return self.engine_kwargs.get("pipeline_parallel_size", 1)

     @property
-    def num_gpu_workers(self) -> int:
+    def num_devices(self) -> int:
         return self.tensor_parallel_degree * self.pipeline_parallel_degree

     @property

@@ -134,10 +140,13 @@ def placement_strategy(self) -> str:

     @property
     def placement_bundles(self) -> List[Dict[str, float]]:
-        bundle = {"GPU": 1}
+        if self.resources_per_bundle:
+            bundle = self.resources_per_bundle
+        else:
+            bundle = {"GPU": 1}
         if self.accelerator_type:
             bundle[self.ray_accelerator_type()] = 0.001
-        bundles = [bundle for _ in range(self.num_gpu_workers)]
+        bundles = [bundle for _ in range(self.num_devices)]

         return bundles
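
To see what these properties produce together, here is a small sketch that builds the same kind of placement group request outside the config class, using ray.util.placement_group directly. The tp/pp sizes, the "NPU" resource name, and the PACK strategy are illustrative assumptions, not values taken from this commit.

import ray
from ray.util.placement_group import placement_group

ray.init()

# With tensor_parallel_size=3 and pipeline_parallel_size=2, num_devices == 6,
# so resources_per_bundle={"NPU": 1} yields six one-NPU bundles. The "NPU"
# resource must be advertised by the nodes, e.g. ray start --resources='{"NPU": 8}'.
bundles = [{"NPU": 1} for _ in range(6)]
pg = placement_group(bundles, strategy="PACK")  # strategy shown is illustrative
ray.get(pg.ready())  # blocks until the cluster can reserve all six NPUs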

python/ray/llm/_internal/serve/observability/usage_telemetry/usage.py (+1, -1)

@@ -266,6 +266,6 @@ def push_telemetry_report_for_all_models(
         max_replicas=max_replicas,
         tensor_parallel_degree=engine_config.tensor_parallel_degree,
         gpu_type=model.accelerator_type or DEFAULT_GPU_TYPE,
-        num_gpus=engine_config.num_gpu_workers,
+        num_gpus=engine_config.num_devices,
     )
     _push_telemetry_report(telemetry_model)

python/ray/llm/tests/serve/configs/test_models.py (+22)

@@ -212,6 +212,28 @@ def test_get_serve_options_without_accelerator_type(self):
         }
         assert serve_options == expected_options

+    def test_resources_per_bundle(self):
+        """Test that resources_per_bundle is correctly parsed."""
+
+        # Test the default resource bundle
+        serve_options = LLMConfig(
+            model_loading_config=dict(model_id="test_model"),
+            engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
+        ).get_serve_options(name_prefix="Test:")
+        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
+            {"GPU": 1} for _ in range(6)
+        ]
+
+        # Test the custom resource bundle
+        serve_options = LLMConfig(
+            model_loading_config=dict(model_id="test_model"),
+            engine_kwargs=dict(tensor_parallel_size=3, pipeline_parallel_size=2),
+            resources_per_bundle={"XPU": 1},
+        ).get_serve_options(name_prefix="Test:")
+        assert serve_options["placement_group_bundles"] == [{"CPU": 1, "GPU": 0}] + [
+            {"XPU": 1} for _ in range(6)
+        ]
+

 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))

python/ray/util/accelerators/accelerators.py (+2)

@@ -26,6 +26,8 @@
 GOOGLE_TPU_V5P = "TPU-V5P"
 GOOGLE_TPU_V5LITEPOD = "TPU-V5LITEPOD"
 GOOGLE_TPU_V6E = "TPU-V6E"
+HUAWEI_NPU_910B = "Ascend910B"
+HUAWEI_NPU_910B4 = "Ascend910B4"

 # Use these instead of NVIDIA_A100 if you need a specific accelerator size. Note that
 # these labels are not auto-added to nodes, you'll have to add them manually in
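
The two new constants can be used like the existing accelerator-type labels. A minimal sketch, assuming the cluster's Ascend nodes are labeled with this accelerator type and advertise an "NPU" custom resource (the resource name here is illustrative):

import ray
from ray.util.accelerators import HUAWEI_NPU_910B

@ray.remote(resources={"NPU": 1}, accelerator_type=HUAWEI_NPU_910B)
def run_on_ascend() -> str:
    # Scheduled only on nodes labeled with the Ascend 910B accelerator type
    # that also expose an "NPU" custom resource.
    return "ok"

print(ray.get(run_on_ascend.remote()))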
