@@ -570,26 +570,42 @@ def __del__(self):
         self.llm.shutdown()
 
 
-def _ray_scheduling_strategy_fn(num_gpus_per_instance: int, accelerator_type: str):
-    """
-    Create a Ray scheduling strategy for vLLM engine.
+def _ray_scheduling_strategy_fn(
+    num_bundles_per_replica: int,
+    accelerator_type: Optional[str] = None,
+    resources_per_bundle: Optional[Dict[str, float]] = None,
+):
+    """Create a Ray scheduling strategy for the engine.
 
     Args:
-        num_gpus_per_instance: The number of GPUs per instance.
-        accelerator_type: The accelerator type.
+        num_bundles_per_replica: The number of device bundles per
+            engine replica.
+        accelerator_type: The accelerator type. If None, the
+            accelerator_type label will not be set.
+        resources_per_bundle: The custom resources per bundle.
+            If None, we default to 1xGPU + 1xCPU bundle.
 
     Returns:
         The Ray scheduling strategy.
     """
 
     def _get_bundle() -> Dict[str, float]:
-        bundle: Dict[str, float] = {"GPU": 1, "CPU": 1}
+
+        bundle = {}
+        # Custom resources
+        if resources_per_bundle:
+            bundle = resources_per_bundle
+        else:
+            # GPU bundles
+            bundle = {"GPU": 1, "CPU": 1}
+
+        # Accelerator type
         if accelerator_type:
             bundle[f"accelerator_type:{accelerator_type}"] = 0.001
         return bundle
 
     pg = ray.util.placement_group(
-        [_get_bundle()] * num_gpus_per_instance,
+        [_get_bundle()] * num_bundles_per_replica,
         strategy="STRICT_PACK",
     )
     return dict(
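
For readers skimming the hunk above, here is a minimal standalone sketch of the new bundle-selection behavior. The helper name `bundles_for_replica`, the replica size of 4, and the `"L4"` / `{"TPU": 1}` inputs are illustrative assumptions, not part of this change:

```python
from typing import Dict, Optional


def bundles_for_replica(
    num_bundles_per_replica: int,
    accelerator_type: Optional[str] = None,
    resources_per_bundle: Optional[Dict[str, float]] = None,
):
    # Mirrors the bundle selection in the diff above, for illustration only.
    bundle: Dict[str, float] = (
        dict(resources_per_bundle) if resources_per_bundle else {"GPU": 1, "CPU": 1}
    )
    if accelerator_type:
        # Tiny fractional amount: acts as a node-label constraint, not a real quota.
        bundle[f"accelerator_type:{accelerator_type}"] = 0.001
    return [dict(bundle)] * num_bundles_per_replica


# Default bundles for a TP=2 x PP=2 replica on L4 nodes (illustrative values):
print(bundles_for_replica(4, accelerator_type="L4"))
# -> four copies of {'GPU': 1, 'CPU': 1, 'accelerator_type:L4': 0.001}

# Custom resources replace the GPU/CPU default entirely:
print(bundles_for_replica(4, resources_per_bundle={"TPU": 1}))
# -> four copies of {'TPU': 1}
```
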
@@ -626,30 +642,40 @@ def post_init(cls, values):
         if accelerator_type:
             ray_remote_args["accelerator_type"] = accelerator_type
 
-        # Setup num_gpus required per vLLM engine.
+        # Setup num_workers required per vLLM engine.
         tp_size = engine_kwargs.get("tensor_parallel_size", 1)
         pp_size = engine_kwargs.get("pipeline_parallel_size", 1)
-        num_gpus = tp_size * pp_size
+        num_bundles_per_replica = tp_size * pp_size
 
         # Use the MP backend by default.
         engine_kwargs.setdefault("distributed_executor_backend", "mp")
         executor_backend = engine_kwargs.get("distributed_executor_backend")
 
-        # When Ray is used in the vLLM engine, we set num_gpus to 0 so that
+        # When Ray is used in the vLLM engine, we set num_devices to 0 so that
         # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling
         # strategy in .map_batches() arguments and let vLLM Ray executor to
         # create placement groups for each TP/PP worker.
-        if executor_backend == "ray" and num_gpus > 1:
+        resources_per_bundle = map_batches_kwargs.pop("resources", None)
+        if executor_backend == "ray" and num_bundles_per_replica > 1:
             # Note that we have to use partial() to pass a function
             # instead of an object.
             map_batches_kwargs["ray_remote_args_fn"] = partial(
                 _ray_scheduling_strategy_fn,
-                num_gpus,
+                num_bundles_per_replica,
                 accelerator_type,
+                resources_per_bundle,
             )
-            num_gpus = 0
+            ray_remote_args["num_gpus"] = 0
+        else:
+            if not resources_per_bundle:
+                # Default to GPUs per bundle if custom resources are not specified.
+                ray_remote_args["num_gpus"] = num_bundles_per_replica
+            else:
+                ray_remote_args["resources"] = {
+                    resource_key: resource_count * num_bundles_per_replica
+                    for resource_key, resource_count in resources_per_bundle.items()
+                }
 
-        map_batches_kwargs["num_gpus"] = num_gpus
         map_batches_kwargs.update(ray_remote_args)
         return values
 
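As a rough illustration of the new branching, the sketch below mirrors how the resource request for the map task could resolve for a few inputs. The helper `resolve_remote_args` and the example values (a replica of 4 bundles, `{"TPU": 1}` custom resources) are hypothetical and not part of the actual `post_init` code:

```python
from typing import Dict, Optional


def resolve_remote_args(
    executor_backend: str,
    num_bundles_per_replica: int,
    resources_per_bundle: Optional[Dict[str, float]] = None,
) -> Dict[str, object]:
    # Illustrative reproduction of the branching above (not the real validator).
    ray_remote_args: Dict[str, object] = {}
    if executor_backend == "ray" and num_bundles_per_replica > 1:
        # The vLLM Ray executor creates its own placement group, so Ray Data
        # should not reserve any GPUs for the map task itself.
        ray_remote_args["num_gpus"] = 0
    elif not resources_per_bundle:
        # Default: one GPU per bundle, scaled to the whole replica.
        ray_remote_args["num_gpus"] = num_bundles_per_replica
    else:
        # Custom resources are scaled by the number of bundles per replica.
        ray_remote_args["resources"] = {
            key: count * num_bundles_per_replica
            for key, count in resources_per_bundle.items()
        }
    return ray_remote_args


print(resolve_remote_args("ray", 4))                                  # {'num_gpus': 0}
print(resolve_remote_args("mp", 4))                                   # {'num_gpus': 4}
print(resolve_remote_args("mp", 4, resources_per_bundle={"TPU": 1}))  # {'resources': {'TPU': 4}}
```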