@@ -521,26 +521,28 @@ def __del__(self):
         self.llm.shutdown()
 
 
-def _ray_scheduling_strategy_fn(num_gpus_per_instance: int, accelerator_type: str):
+def _ray_scheduling_strategy_fn(
+    num_workers_per_instance: int, accelerator_type: str, resources: Optional[Dict[str, float]] = None
+):
     """
     Create a Ray scheduling strategy for vLLM engine.
 
     Args:
-        num_gpus_per_instance: The number of GPUs per instance.
+        num_workers_per_instance: The number of workers per instance.
         accelerator_type: The accelerator type.
 
     Returns:
         The Ray scheduling strategy.
     """
 
     def _get_bundle() -> Dict[str, float]:
-        bundle: Dict[str, float] = {"GPU": 1, "CPU": 1}
+        bundle: Dict[str, float] = resources if resources else {"GPU": 1, "CPU": 1}
         if accelerator_type:
             bundle[f"accelerator_type:{accelerator_type}"] = 0.001
         return bundle
 
     pg = ray.util.placement_group(
-        [_get_bundle()] * num_gpus_per_instance,
+        [_get_bundle()] * num_workers_per_instance,
         strategy="STRICT_PACK",
     )
     return dict(
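Not part of the diff: a minimal, self-contained sketch of the bundle construction that `_get_bundle` now performs, showing how a caller-supplied `resources` map replaces the default `{"GPU": 1, "CPU": 1}` bundle for each placement-group entry. The function name and the `{"TPU": 1}` resource are illustrative assumptions only.

```python
# Standalone sketch of the new bundle logic (illustration only, not PR code).
from typing import Dict, List, Optional


def make_bundles(
    num_workers: int, accelerator_type: str, resources: Optional[Dict[str, float]] = None
) -> List[Dict[str, float]]:
    # Mirror _get_bundle: use the caller's resources map when given,
    # otherwise fall back to one GPU and one CPU per worker.
    bundle: Dict[str, float] = dict(resources) if resources else {"GPU": 1, "CPU": 1}
    if accelerator_type:
        bundle[f"accelerator_type:{accelerator_type}"] = 0.001
    return [dict(bundle) for _ in range(num_workers)]


print(make_bundles(2, ""))                        # [{'GPU': 1, 'CPU': 1}, {'GPU': 1, 'CPU': 1}]
print(make_bundles(2, "", resources={"TPU": 1}))  # [{'TPU': 1}, {'TPU': 1}]
```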
@@ -569,6 +571,7 @@ def post_init(cls, values):
             The updated values.
         """
         map_batches_kwargs = values["map_batches_kwargs"]
+        resources_per_worker = map_batches_kwargs.get("resources")
         accelerator_type = map_batches_kwargs.get("accelerator_type", "")
         fn_constructor_kwargs = values["fn_constructor_kwargs"]
         engine_kwargs = fn_constructor_kwargs.get("engine_kwargs", {})
@@ -577,29 +580,36 @@ def post_init(cls, values):
         if accelerator_type:
             ray_remote_args["accelerator_type"] = accelerator_type
 
-        # Setup num_gpus required per vLLM engine.
+        # Setup num_workers required per vLLM engine.
         tp_size = engine_kwargs.get("tensor_parallel_size", 1)
         pp_size = engine_kwargs.get("pipeline_parallel_size", 1)
-        num_gpus = tp_size * pp_size
+        num_workers = tp_size * pp_size
 
         # Use the MP backend by default.
         engine_kwargs.setdefault("distributed_executor_backend", "mp")
         executor_backend = engine_kwargs.get("distributed_executor_backend")
 
-        # When Ray is used in the vLLM engine, we set num_gpus to 0 so that
+        # When Ray is used in the vLLM engine, we set num_devices to 0 so that
         # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling
         # strategy in .map_batches() arguments and let vLLM Ray executor to
         # create placement groups for each TP/PP worker.
-        if executor_backend == "ray" and num_gpus > 1:
+        num_mp_workers = num_workers
+        if executor_backend == "ray" and num_workers > 1:
             # Note that we have to use partial() to pass a function
             # instead of an object.
             map_batches_kwargs["ray_remote_args_fn"] = partial(
                 _ray_scheduling_strategy_fn,
-                num_gpus,
+                num_workers,
                 accelerator_type,
             )
-            num_gpus = 0
+            num_mp_workers = 0
+
+        if not resources_per_worker:
+            map_batches_kwargs["num_gpus"] = num_mp_workers
+        else:
+            ray_remote_args["resources"] = {
+                key: value * num_mp_workers for key, value in resources_per_worker.items()
+            }
 
-        map_batches_kwargs["num_gpus"] = num_gpus
         map_batches_kwargs.update(ray_remote_args)
         return values
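Also not part of the diff: a hedged, self-contained sketch of the new actor-resource selection in `post_init`. With the default MP executor the Ray Data actor requests `num_gpus` equal to the worker count, or the caller's custom per-worker resources scaled by that count; with the Ray executor and more than one worker, the request is zeroed because the vLLM Ray executor creates the placement groups instead. Function and variable names below are illustrative assumptions.

```python
# Standalone illustration of the resource-request branch (assumed names).
from typing import Dict, Optional


def actor_resource_args(
    num_workers: int,
    executor_backend: str,
    resources_per_worker: Optional[Dict[str, float]] = None,
) -> Dict[str, object]:
    args: Dict[str, object] = {}
    # With the Ray executor and >1 worker, the Ray Data actor reserves nothing;
    # the vLLM Ray executor builds placement groups for each TP/PP worker.
    num_mp_workers = 0 if (executor_backend == "ray" and num_workers > 1) else num_workers
    if not resources_per_worker:
        args["num_gpus"] = num_mp_workers
    else:
        args["resources"] = {
            key: value * num_mp_workers for key, value in resources_per_worker.items()
        }
    return args


print(actor_resource_args(4, "mp"))              # {'num_gpus': 4}
print(actor_resource_args(4, "ray"))             # {'num_gpus': 0}
print(actor_resource_args(4, "mp", {"TPU": 1}))  # {'resources': {'TPU': 4}}
```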