
Elastic Expert Parallel Initial Support #20775


Open

wants to merge 12 commits into base: main

Conversation

Collaborator

@ruisearch42 ruisearch42 commented Jul 10, 2025

Essential Elements of an Effective PR Description Checklist

  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.

Purpose

This corresponds to Milestone 1 of #20323.

Co-authored with @libertyeagle

Supported functionality:

  • Retained engine-core state teardown & reinitialization
    • Distributed environment
    • Distributed communicators
    • Model structure & weights, including EPLB weight reshuffle
  • Scale up: new engine-core startup
    • KV cache initialization: use available GPU memory information from existing engine-cores to skip expensive profiling
  • Scale down: shutdown of unneeded engine-cores
  • Control plane
    • API server endpoint (see the usage sketch after this list)
    • DP engine-core scheduling: e.g., collective operations across retained and new engine-cores need to happen at the same time
    • Traffic handling, with a simple strategy of draining and dropping requests during scaling
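
A hypothetical usage sketch for the control-plane endpoint mentioned above: the /scale path matches the PR description, but the JSON field name and the port are illustrative assumptions rather than the confirmed request schema.

import requests

# Hypothetical request to the /scale endpoint; the JSON field name and port
# are illustrative assumptions, not the PR's confirmed schema.
resp = requests.post(
    "http://localhost:8000/scale",
    json={"new_data_parallel_size": 4},
)
print(resp.status_code, resp.text)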

TODO for this PR:

  • More testing with repeated scale up/down
  • Address FIXME
  • Minor refactors and cleanups
    • e.g., remove/move/cleanup scripts in experimental or examples directory

Follow-ups after this PR

Test Plan

Test with PPLX kernel and DeepSeek-V2-Lite

Test Result

Can alternate scale-up and scale-down multiple times, and traffic is drained/dropped during scaling.

(Optional) Documentation Update


👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default. Instead, only fastcheck CI runs, which executes a small, essential subset of CI tests to quickly catch errors. You can run other CI tests on top of those by going to your fastcheck build in the Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to be added to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

@mergify mergify bot added the deepseek (Related to DeepSeek models), frontend, and v1 labels Jul 10, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Summary of Changes

Hello @ruisearch42, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces initial support for Elastic Expert Parallelism (EEP) in vLLM, enabling dynamic scaling of data parallel (DP) engine cores, particularly beneficial for Mixture-of-Experts (MoE) models. The changes encompass re-initializing distributed environments, managing KV cache states across scaling events, and orchestrating expert weight reshuffling to adapt to the new parallel configurations. A new API endpoint is added to trigger these scaling operations, with mechanisms to handle in-flight requests during transitions.

Highlights

  • Dynamic Data Parallel Scaling: Introduces core functionality to dynamically scale up and down the number of data parallel (DP) engine cores, enabling flexible resource allocation for vLLM deployments.
  • Elastic Expert Parallelism (EPLB) Integration: Implements logic to rebalance and reshuffle expert weights across the new set of available GPUs during scaling operations, ensuring efficient utilization for Mixture-of-Experts (MoE) models.
  • Distributed Environment Re-initialization: Adds mechanisms to gracefully tear down and re-initialize PyTorch distributed process groups and related communication states across engine cores during dynamic scaling events.
  • KV Cache State Management: Enables newly added engine cores to initialize their KV cache based on the available memory information from existing engine cores, optimizing startup time during scale-up.
  • API for Scaling Control: Exposes a new /scale API endpoint on the vLLM server, allowing external systems to programmatically trigger scale-up or scale-down operations.
  • Traffic Handling During Scaling: Incorporates a traffic draining and dropping strategy during scaling transitions to minimize disruption to in-flight requests, ensuring service continuity.
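
To tie these highlights together, here is a high-level sketch of what a scale-up pass looks like according to the description above; every helper name on the orchestrator object is a hypothetical placeholder, not an API added by this PR.

def scale_up(orchestrator, new_engine_count: int) -> None:
    """Hypothetical outline of the scale-up flow described in this PR."""
    # 1. Flag the API server as scaling; new traffic is drained or dropped.
    orchestrator.set_scaling_flag(True)

    # 2. Launch the additional DP engine cores.
    orchestrator.start_new_engine_cores(new_engine_count)

    # 3. Retained cores tear down and re-create their distributed environment
    #    and communicators so process groups span old and new cores.
    orchestrator.reinit_distributed_environment()

    # 4. New cores size their KV cache from memory information reported by
    #    retained cores, skipping the expensive profiling pass.
    orchestrator.sync_kv_cache_memory()

    # 5. Rebalance and reshuffle expert weights (EPLB) across all GPUs.
    orchestrator.reshuffle_expert_weights()

    # 6. Clear the scaling flag and resume serving traffic.
    orchestrator.set_scaling_flag(False)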

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a major new feature: elastic expert parallelism. This is a substantial change that touches many parts of the vLLM engine, from the low-level distributed communication and worker management to the high-level API server. The implementation seems well-thought-out, with a multi-phase approach to scaling to handle the complexities of a distributed system. The use of a scaling flag to gracefully handle traffic during scaling is a good design choice.

My review focuses on a few key areas:

  • Correctness: I found a type hint mismatch that should be fixed. I also pointed out a commented-out assertion that might hide potential issues.
  • Maintainability & Robustness: I've suggested improvements for a magic number and a custom communication protocol to make the code more robust and easier to maintain.

Overall, this is a great step towards elastic inference in vLLM. The changes are complex, and I appreciate the effort that went into this.

vllm/config.py Outdated
Comment on lines 1959 to 1969
def sync_kv_cache_memory(dp_group: "ProcessGroup",
                         kv_cache_memory: int) -> None:
    if kv_cache_memory == -1:
        kv_cache_memory = torch.iinfo(torch.int64).max
    tensor = torch.tensor([kv_cache_memory],
                          dtype=torch.int64,
                          device="cpu")
    # we cannot use broadcast for stateless dp group since it depends
    # on global rank
    torch.distributed.all_reduce(tensor, op=ReduceOp.MIN, group=dp_group)
    return tensor.item()
Contributor

high

The return type hint for sync_kv_cache_memory is -> None, but the function returns an integer via tensor.item(). This should be corrected to -> int to match the implementation.

    @staticmethod
    def sync_kv_cache_memory(dp_group: "ProcessGroup",
                             kv_cache_memory: int) -> int:

Comment on lines +197 to +200
# TODO(yongji): hard-wired to make sure the tensor does not get resized
MAX_PHYSICAL_EXPERT_FACTOR = 2
max_slots_per_logical_expert = (model.num_logical_experts *
                                MAX_PHYSICAL_EXPERT_FACTOR)
Contributor

medium

The MAX_PHYSICAL_EXPERT_FACTOR is hardcoded to 2. Consider making this configurable or adding a more detailed explanation in the comment about the trade-offs and potential failure scenarios if this factor is exceeded. This will help future developers understand the constraints of the current implementation.
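
One possible way to address this, sketched under the assumption of an environment-variable override; the variable name below is hypothetical and not part of this PR.

import os

# Hypothetical sketch: read the growth factor from an environment variable,
# falling back to the current hard-coded default of 2. The variable name is
# illustrative only.
MAX_PHYSICAL_EXPERT_FACTOR = int(
    os.environ.get("VLLM_EEP_MAX_PHYSICAL_EXPERT_FACTOR", "2"))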

@@ -853,7 +867,7 @@ def _init_data_parallel(self, vllm_config: VllmConfig):
         local_dp_rank = vllm_config.parallel_config.data_parallel_rank_local

         assert dp_size > 1
-        assert 0 <= local_dp_rank <= dp_rank < dp_size
+        # assert 0 <= local_dp_rank <= dp_rank < dp_size
Contributor

medium

This assertion assert 0 <= local_dp_rank <= dp_rank < dp_size is commented out. Please add a comment explaining why this assertion is disabled. If it's no longer needed, it should be removed. If it's temporarily disabled for scaling, a comment would be very helpful for future maintenance and debugging.

Comment on lines +924 to +932
if len(buf) > 4 and buf[4:].startswith(b"SCALE_UP"):
    # Extract new engine count from the first 4 bytes
    new_engine_count = int.from_bytes(buf[:4], "little")
    # Send scale up notification to coordinator
    scale_msg = msgspec.msgpack.encode(
        ("SCALE_UP", new_engine_count))
    await socket.send(scale_msg)
    continue
Contributor

medium

The current implementation for scale-up notification uses a custom protocol (4-byte integer for count + magic string). This can be brittle and hard to extend in the future. Consider using msgspec for consistency and robustness. For example, you could encode a tuple like ("SCALE_UP", new_engine_count).
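
For reference, a minimal sketch of the suggested msgspec-based alternative (not the code in this PR); the socket handling is omitted and only the encode/decode round trip is shown.

import msgspec

# Reviewer-suggested alternative (sketch): encode the notification as one
# msgpack message instead of a 4-byte count followed by a magic string.
new_engine_count = 4  # illustrative value

# Sender side: a single structured message.
scale_msg = msgspec.msgpack.encode(("SCALE_UP", new_engine_count))

# Receiver side: decode and dispatch on the tag.
tag, count = msgspec.msgpack.decode(scale_msg)
if tag == "SCALE_UP":
    print(f"scale up to {count} engines")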

Comment on lines 132 to 134
# eep-dev
if old_indices[src_global] == -1 or new_indices[dst_global] == -1:
    continue
Contributor

medium

This check prevents issues with potentially invalid indices during scaling operations. It ensures that the code gracefully handles cases where experts are being removed or added, maintaining the integrity of the expert mapping.

Comment on lines 147 to 149
# eep-dev
if expert == -1:
    continue
Contributor

medium

This check prevents issues with potentially invalid indices during scaling operations. It ensures that the code gracefully handles cases where experts are being removed or added, maintaining the integrity of the expert mapping.

Comment on lines 192 to 194
# eep-dev
if expert == -1:
    continue
Contributor

medium

This check prevents issues with potentially invalid indices during scaling operations. It ensures that the code gracefully handles cases where experts are being removed or added, maintaining the integrity of the expert mapping.

Comment on lines 242 to 243
# eep-dev
# assert self.fused_experts == fused_experts
Contributor

medium

This assertion is commented out. It's important to understand why it was disabled and whether it's safe to remove it completely. If the condition it checks is no longer valid due to architectural changes, the assertion should be removed. Otherwise, it should be re-enabled with a comment explaining the reason for its original presence and why it's now conditionally disabled.


mergify bot commented Jul 10, 2025

This pull request has merge conflicts that must be resolved before it can be merged. Please rebase the PR, @ruisearch42.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jul 10, 2025
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
@mergify mergify bot added the documentation (Improvements or additions to documentation) label Jul 10, 2025
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Comment on lines +1088 to +1115
def _init_engines_direct(self, vllm_config: VllmConfig, local_only: bool,
                         local_start_index: int, input_address: str,
                         output_address: str,
                         executor_class: type[Executor], log_stats: bool):
    """Self-contained client mode, launch engine and coordinator process
    as needed."""

    parallel_config = vllm_config.parallel_config
    assert parallel_config.data_parallel_rank == 0
    assert local_start_index == 0

    addresses = EngineZmqAddresses(
        inputs=[input_address],
        outputs=[output_address],
    )

    if len(self.core_engines) > 1:
        coordinator = DPCoordinator(parallel_config)
        self.resources.coordinator = coordinator
        addresses.coordinator_input, addresses.coordinator_output = (
            coordinator.get_engine_socket_addresses())

    # Start all engines.
    self.resources.engine_manager = (CoreEngineActorManager(
        vllm_config=vllm_config,
        addresses=addresses,
        executor_class=executor_class,
        log_stats=log_stats))
Member

This method is no longer used/needed and can be deleted. The corresponding logic (including for ray) is now in the launch_core_engines util function.

Member

@njhill njhill commented Jul 11, 2025

Thanks @ruisearch42, I haven't reviewed in detail yet but this looks great to me; the changes look very clean. However, I can see that this is targeted only at the "self-contained" data parallel mode with a single logical shared API server.

We expect that for large-scale multi-node deployments we would use the external LB mode (for example when used with dynamo or llm-d), where a separate vLLM is launched per rank and there's a separate API server / HTTP endpoint per engine process (as described in the docs here).

I think much of the logic (like the coordinator changes) would be shared/common, but I'm guessing the mechanics of how scaling is triggered would need to be different: something more like each of the ranks registering with the coordinator when they are started, and having this cause reinitialization of the other ranks. Scaling up would then be done by launching another vLLM instance, and scaling down by shutting one down.

Collaborator

@libertyeagle can we upstream this?

@@ -630,6 +630,11 @@ async def create_chat_completion(request: ChatCompletionRequest,
return base(raw_request).create_error_response(
message="The model does not support Chat Completions API")

if raw_request.app.state.scaling:
Collaborator

make this a middleware
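
A minimal sketch of the suggested middleware approach, assuming the same app.state.scaling flag used in the diff above; the status code and error message are illustrative, not the PR's implementation.

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

@app.middleware("http")
async def reject_requests_while_scaling(request: Request, call_next):
    # Short-circuit all requests while a scale-up/down is in progress.
    if getattr(request.app.state, "scaling", False):
        return JSONResponse(
            status_code=503,
            content={"error": "The server is scaling; please retry shortly."},
        )
    return await call_next(request)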

Labels
deepseek (Related to DeepSeek models), documentation (Improvements or additions to documentation), frontend, needs-rebase, v1