Elastic Expert Parallel Initial Support #20775
base: main
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs do not trigger a full CI run by default; only a limited subset of checks runs automatically. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.
Summary of Changes
Hello @ruisearch42, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces initial support for Elastic Expert Parallelism (EEP) in vLLM, enabling dynamic scaling of data parallel (DP) engine cores, particularly beneficial for Mixture-of-Experts (MoE) models. The changes encompass re-initializing distributed environments, managing KV cache states across scaling events, and orchestrating expert weight reshuffling to adapt to the new parallel configurations. A new API endpoint is added to trigger these scaling operations, with mechanisms to handle in-flight requests during transitions.
Highlights
- Dynamic Data Parallel Scaling: Introduces core functionality to dynamically scale up and down the number of data parallel (DP) engine cores, enabling flexible resource allocation for vLLM deployments.
- Expert Parallelism Load Balancing (EPLB) Integration: Implements logic to rebalance and reshuffle expert weights across the new set of available GPUs during scaling operations, ensuring efficient utilization for Mixture-of-Experts (MoE) models.
- Distributed Environment Re-initialization: Adds mechanisms to gracefully tear down and re-initialize PyTorch distributed process groups and related communication states across engine cores during dynamic scaling events.
- KV Cache State Management: Enables newly added engine cores to initialize their KV cache based on the available memory information from existing engine cores, optimizing startup time during scale-up.
- API for Scaling Control: Exposes a new /scale API endpoint on the vLLM server, allowing external systems to programmatically trigger scale-up or scale-down operations (see the sketch after this list).
- Traffic Handling During Scaling: Incorporates a traffic draining and dropping strategy during scaling transitions to minimize disruption to in-flight requests, ensuring service continuity.
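To make the new endpoint concrete, here is a minimal client-side sketch of triggering a scale operation. It assumes a default local server address, and the JSON field names (new_data_parallel_size, drain_timeout) are illustrative placeholders rather than the exact schema defined by this PR.

import requests  # assumption: any HTTP client would do; requests is used for brevity

VLLM_SERVER = "http://localhost:8000"  # assumption: default local deployment

def scale(new_data_parallel_size: int, drain_timeout_s: int = 300) -> None:
    # Hypothetical payload; the real field names come from this PR's /scale handler.
    payload = {
        "new_data_parallel_size": new_data_parallel_size,
        "drain_timeout": drain_timeout_s,
    }
    resp = requests.post(f"{VLLM_SERVER}/scale", json=payload,
                         timeout=drain_timeout_s + 60)
    resp.raise_for_status()
    print("scale request accepted:", resp.status_code)

# Alternate scale-up and scale-down of the DP engine cores.
scale(4)
scale(2)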
Code Review
This pull request introduces a major new feature: elastic expert parallelism. This is a substantial change that touches many parts of the vLLM engine, from the low-level distributed communication and worker management to the high-level API server. The implementation seems well thought out, with a multi-phase approach to scaling to handle the complexities of a distributed system. The use of a scaling flag to gracefully handle traffic during scaling is a good design choice.
My review focuses on a few key areas:
- Correctness: I found a type hint mismatch that should be fixed. I also pointed out a commented-out assertion that might hide potential issues.
- Maintainability & Robustness: I've suggested improvements for a magic number and a custom communication protocol to make the code more robust and easier to maintain.
Overall, this is a great step towards elastic inference in vLLM. The changes are complex, and I appreciate the effort that went into this.
vllm/config.py
Outdated
def sync_kv_cache_memory(dp_group: "ProcessGroup",
                         kv_cache_memory: int) -> None:
    if kv_cache_memory == -1:
        kv_cache_memory = torch.iinfo(torch.int64).max
    tensor = torch.tensor([kv_cache_memory],
                          dtype=torch.int64,
                          device="cpu")
    # we cannot use broadcast for stateless dp group since it depends
    # on global rank
    torch.distributed.all_reduce(tensor, op=ReduceOp.MIN, group=dp_group)
    return tensor.item()
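For context on the MIN all-reduce above, here is a self-contained two-process sketch of the intended semantics during scale-up: an existing engine contributes its measured KV-cache budget, a newly added engine passes -1, and both agree on the group minimum. The helper is re-declared locally with an int return annotation (matching what it actually returns); the backend, port, and byte counts are illustrative assumptions, not values from the PR.

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def sync_kv_cache_memory(dp_group, kv_cache_memory: int) -> int:
    # -1 means "unknown" (e.g. a newly added engine that has not profiled
    # memory yet); map it to int64-max so it can never win the MIN reduction.
    if kv_cache_memory == -1:
        kv_cache_memory = torch.iinfo(torch.int64).max
    tensor = torch.tensor([kv_cache_memory], dtype=torch.int64, device="cpu")
    dist.all_reduce(tensor, op=dist.ReduceOp.MIN, group=dp_group)
    return int(tensor.item())

def worker(rank: int, world_size: int) -> None:
    dist.init_process_group(backend="gloo",
                            init_method="tcp://127.0.0.1:29501",
                            world_size=world_size, rank=rank)
    dp_group = dist.new_group(list(range(world_size)))
    # Rank 0 plays an existing engine with a measured budget; rank 1 plays a
    # newly added engine that passes -1.
    measured = 8 * 1024**3 if rank == 0 else -1
    budget = sync_kv_cache_memory(dp_group, measured)
    print(f"rank {rank}: agreed KV-cache budget = {budget}")  # both ranks print 8 GiB
    dist.destroy_process_group()

if __name__ == "__main__":
    mp.spawn(worker, args=(2,), nprocs=2)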
# TODO(yongji): hard-wired to make sure the tensor does not get resized
MAX_PHYSICAL_EXPERT_FACTOR = 2
max_slots_per_logical_expert = (model.num_logical_experts *
                                MAX_PHYSICAL_EXPERT_FACTOR)
The MAX_PHYSICAL_EXPERT_FACTOR is hardcoded to 2. Consider making this configurable or adding a more detailed explanation in the comment about the trade-offs and potential failure scenarios if this factor is exceeded. This will help future developers understand the constraints of the current implementation.
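One possible shape for that suggestion, sketched with a hypothetical environment variable (VLLM_MAX_PHYSICAL_EXPERT_FACTOR is not part of this PR; the name and validation are illustrative only):

import os

# Hypothetical knob: upper bound on physical-to-logical expert replication.
# Exceeding it at runtime would require resizing the expert-mapping tensors,
# which the hard-wired factor is meant to avoid.
MAX_PHYSICAL_EXPERT_FACTOR = int(
    os.environ.get("VLLM_MAX_PHYSICAL_EXPERT_FACTOR", "2"))

def max_slots_per_logical_expert(num_logical_experts: int) -> int:
    if MAX_PHYSICAL_EXPERT_FACTOR < 1:
        raise ValueError("VLLM_MAX_PHYSICAL_EXPERT_FACTOR must be >= 1")
    return num_logical_experts * MAX_PHYSICAL_EXPERT_FACTOR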
@@ -853,7 +867,7 @@ def _init_data_parallel(self, vllm_config: VllmConfig):
         local_dp_rank = vllm_config.parallel_config.data_parallel_rank_local

         assert dp_size > 1
-        assert 0 <= local_dp_rank <= dp_rank < dp_size
+        # assert 0 <= local_dp_rank <= dp_rank < dp_size
This assertion, assert 0 <= local_dp_rank <= dp_rank < dp_size, is commented out. Please add a comment explaining why this assertion is disabled. If it's no longer needed, it should be removed. If it's temporarily disabled for scaling, a comment would be very helpful for future maintenance and debugging.
if len(buf) > 4 and buf[4:].startswith(b"SCALE_UP"):
    # Extract new engine count from the first 4 bytes
    new_engine_count = int.from_bytes(buf[:4], "little")
    # Send scale up notification to coordinator
    scale_msg = msgspec.msgpack.encode(("SCALE_UP", new_engine_count))
    await socket.send(scale_msg)
    continue
The current implementation for scale-up notification uses a custom protocol (a 4-byte integer for the count plus a magic string). This can be brittle and hard to extend in the future. Consider using msgspec for consistency and robustness. For example, you could encode a tuple like ("SCALE_UP", new_engine_count).
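A minimal, self-contained sketch of what that msgspec-only framing could look like (function and variable names here are illustrative, not taken from the PR):

from typing import Optional

import msgspec

def encode_scale_up(new_engine_count: int) -> bytes:
    # Sender side: a single msgpack-encoded tuple replaces the hand-rolled
    # 4-byte count prefix plus magic string.
    return msgspec.msgpack.encode(("SCALE_UP", new_engine_count))

def try_decode_scale_up(buf: bytes) -> Optional[int]:
    # Receiver side: decode once, then dispatch on the message tag.
    msg_type, payload = msgspec.msgpack.decode(buf)
    if msg_type == "SCALE_UP":
        return payload
    return None

buf = encode_scale_up(4)
assert try_decode_scale_up(buf) == 4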
# eep-dev
if old_indices[src_global] == -1 or new_indices[dst_global] == -1:
    continue
# eep-dev
if expert == -1:
    continue
# eep-dev
if expert == -1:
    continue
# eep-dev
# assert self.fused_experts == fused_experts
This assertion is commented out. It's important to understand why it was disabled and whether it's safe to remove it completely. If the condition it checks is no longer valid due to architectural changes, the assertion should be removed. Otherwise, it should be re-enabled with a comment explaining the reason for its original presence and why it's now conditionally disabled.
This pull request has merge conflicts that must be resolved before it can be merged.
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
def _init_engines_direct(self, vllm_config: VllmConfig, local_only: bool,
                         local_start_index: int, input_address: str,
                         output_address: str,
                         executor_class: type[Executor], log_stats: bool):
    """Self-contained client mode, launch engine and coordinator process
    as needed."""

    parallel_config = vllm_config.parallel_config
    assert parallel_config.data_parallel_rank == 0
    assert local_start_index == 0

    addresses = EngineZmqAddresses(
        inputs=[input_address],
        outputs=[output_address],
    )

    if len(self.core_engines) > 1:
        coordinator = DPCoordinator(parallel_config)
        self.resources.coordinator = coordinator
        addresses.coordinator_input, addresses.coordinator_output = (
            coordinator.get_engine_socket_addresses())

    # Start all engines.
    self.resources.engine_manager = (CoreEngineActorManager(
        vllm_config=vllm_config,
        addresses=addresses,
        executor_class=executor_class,
        log_stats=log_stats))
This method is no longer used/needed and can be deleted. The corresponding logic (including for Ray) is now in the launch_core_engines util function.
Thanks @ruisearch42, I haven't reviewed in detail yet but this looks great to me; the changes look very clean. However, I can see that this is targeted only at the "self-contained" data parallel mode with a single logical shared API server. We expect that for large-scale multi-node deployments we would use the external LB mode (for example when used with Dynamo or llm-d), where a separate vLLM is launched per rank and there's a separate API server / HTTP endpoint per engine process (as described in the docs here).

I think much of the logic would be shared/common (like the coordinator changes), but I'm guessing the mechanics of how scaling is triggered would need to be different: something more like each of the ranks registering with the coordinator when they are started, and having this cause reinitialization of the other ranks. So scaling up would be done by launching another vLLM instance, and scaling down would be done by shutting one down.
@libertyeagle can we upstream this?
@@ -630,6 +630,11 @@ async def create_chat_completion(request: ChatCompletionRequest,
         return base(raw_request).create_error_response(
             message="The model does not support Chat Completions API")

+    if raw_request.app.state.scaling:
make this a middleware
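A minimal sketch of what that middleware could look like, assuming the FastAPI/Starlette stack the API server already uses; the 503 status, error message, and flag handling here are illustrative, not taken from the PR:

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()
app.state.scaling = False  # toggled by the /scale handler while scaling is in progress

@app.middleware("http")
async def reject_requests_while_scaling(request: Request, call_next):
    # Centralizes the per-endpoint raw_request.app.state.scaling checks:
    # any request arriving mid-scale gets an immediate, retryable error.
    if getattr(request.app.state, "scaling", False):
        return JSONResponse(
            status_code=503,
            content={"error": "The server is currently scaling; please retry later."},
        )
    return await call_next(request)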
Essential Elements of an Effective PR Description Checklist
- Update supported_models.md and examples for a new model.

Purpose
This corresponds to Milestone 1 of #20323.
Co-authored with @libertyeagle
Supported functionality:
TODO for this PR:
Follow-ups after this PR
Test Plan
Test with PPLX kernel and DeepSeek-V2-Lite
Test Result
Scale-up and scale-down can be alternated multiple times, and traffic is correctly drained/dropped during transitions.
(Optional) Documentation Update