[Performance] EPLB Execution Optimization #20990

Draft: wants to merge 1 commit into main

Conversation


@david6666666 david6666666 commented Jul 15, 2025

Essential Elements of an Effective PR Description Checklist

  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing the test command.
  • The test results, such as a before/after results comparison or e2e results.
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.

Purpose

Issue: #20805
Related PR: #18343
EPLB execution:

  • Parallelize the rearrangement algorithm (computing the new expert mapping, not the communication)
  • Shuffle one layer at a time over multiple steps, to lower the impact on inter-token latency
  • Investigate whether the expert weight buffer used for transfers should be pre-allocated
  • Take locality into consideration in expert weight transmission, e.g. prioritize transferring to GPUs on the same node (see the sketch after this list)
  • Use a background thread/process that asynchronously moves the weights into the buffer
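
As a rough illustration of the locality item above (a sketch only, not the PR's implementation; gpus_per_node = 8 and the contiguous rank-to-node mapping are assumptions), transfer destinations could simply be ordered so that same-node peers come first:

def sort_by_locality(dst_ranks, my_rank, gpus_per_node=8):
    """Prefer destinations on the same node before crossing the inter-node fabric."""
    my_node = my_rank // gpus_per_node
    same_node = [r for r in dst_ranks if r // gpus_per_node == my_node]
    cross_node = [r for r in dst_ranks if r // gpus_per_node != my_node]
    return same_node + cross_node

For example, sort_by_locality([3, 9, 5, 12], my_rank=4) returns [3, 5, 9, 12] under these assumptions.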

Test Plan

Test Result

(Optional) Documentation Update


👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs do not trigger a full CI run by default. Instead, only the fastcheck CI runs, covering a small and essential subset of tests to quickly catch errors. You can run other CI tests on top of those by going to your fastcheck build in the Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

@mergify mergify bot added the v1 label Jul 15, 2025

@gemini-code-assist gemini-code-assist bot left a comment


Summary of Changes

Hello @david6666666, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces significant performance optimizations for Expert Parallelism Load Balancing (EPLB) by refactoring the existing implementation into a new asynchronous module. The core improvement lies in a multi-stage, asynchronous expert weight rearrangement process that leverages hardware abstraction, background threads, and pre-allocated buffers to minimize latency impact during inference. The changes aim to make the expert shuffling more efficient and less disruptive to the main computation flow.

Highlights

  • New Asynchronous EPLB Module: Introduced a new async_eplb module (vllm/distributed/async_eplb) to house the optimized Expert Parallelism Load Balancing (EPLB) logic, replacing the previous eplb module.
  • Hardware Abstraction for Distributed Operations: Added a DeviceBackend abstraction (vllm/distributed/async_eplb/DB.py) with concrete implementations for CUDA and NPU, enabling more portable and extensible distributed communication primitives like synchronize, create_buffer_like, all_gather, batch_isend_irecv, and barrier.
  • Multi-Stage Asynchronous Expert Rearrangement: Implemented a sophisticated multi-stage process for expert weight rearrangement: (1) pre_calculate determines the optimal expert mapping, (2) a background thread asynchronously moves expert weights to a pre-allocated buffer using async_move_to_buffer (via eplb_worker), and (3) move_to_workspace synchronously transfers weights from the buffer to the active model workspace. This design aims to offload heavy inter-GPU communication, reducing its impact on inter-token latency. A simplified flow is sketched after this list.
  • Layer-by-Layer Expert Shuffling: The expert rearrangement process now shuffles one MoE layer at a time (_run_periodically in eplb_state.py increments self.layer), aligning with the PR's goal to lower the impact on inter-token latency.
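
To make the staged flow above concrete, here is a simplified orchestration sketch based on the highlights (the method names follow the highlights above; the exact signatures and the call site are illustrative, not verbatim PR code):

def eplb_step(eplb_state, model, is_profile: bool = False):
    # (1) Compute the new expert mapping for the next layer (cheap, stays on the critical path).
    eplb_state.pre_calculate(model, is_profile)

    # (2) A background worker (eplb_worker) asynchronously stages that layer's expert weights
    #     into the pre-allocated buffer via async_move_to_buffer, keeping the heavy inter-GPU
    #     communication off the critical path.

    # (3) Once the buffer is ready, transfer the weights into the active workspace with a
    #     short synchronous device-local copy.
    if eplb_state.ep_buffer_ready:
        eplb_state.move_to_workspace(model)
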
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature              | Command             | Description
Code Review          | /gemini review      | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary     | Provides a summary of the current pull request in its current state.
Comment              | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments.
Help                 | /gemini help        | Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces an asynchronous execution path for Expert Parallelism Load Balancing (EPLB) to improve performance. The code appears to be in a very early, work-in-progress state and requires significant refinement and testing. The core algorithm in rebalance_algo.py seems sound, but the new orchestration and execution logic in eplb_state.py and rebalance_execute_async.py needs substantial attention.

    Called by a child thread; continuously checks and performs the weight transfer once the conditions are met.
    When the process should stop (the termination condition) is still to be determined.
    """
    while not _stop_event.is_set():

critical

The variable _stop_event is not defined. To gracefully manage the lifecycle of the background thread, you need to create and manage a threading.Event or asyncio.Event. This event should be part of the EplbState so it can be set from outside the thread to signal termination.
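
A minimal sketch of that lifecycle, assuming a simplified EplbState (the field and method names below are illustrative):

import threading
import time

class EplbState:
    def __init__(self):
        # The stop event lives on the state so callers can signal shutdown from outside the thread.
        self._stop_event = threading.Event()
        self._worker = threading.Thread(target=self._worker_loop, daemon=True)
        self._worker.start()

    def _worker_loop(self):
        # Background loop: exits promptly once _stop_event is set.
        while not self._stop_event.is_set():
            # ... check conditions and stage expert weights into the buffer ...
            time.sleep(0.01)  # placeholder pacing for this sketch

    def shutdown(self):
        self._stop_event.set()
        self._worker.join()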

Comment on lines +427 to +429
            self.physical_to_logical_map.copy_(new_physical_to_logical_map)
            self.logical_to_physical_map.copy_(new_logical_to_physical_map)
            self.logical_replica_count.copy_(new_logical_replica_count)

critical

The variables new_physical_to_logical_map, new_logical_to_physical_map, and new_logical_replica_count are not defined in the scope of post_eplb. They are local variables in the pre_calculate method. You should use the instance attributes (self.new_...) which are set in pre_calculate.

            self.physical_to_logical_map.copy_(self.new_physical_to_logical_map)
            self.logical_to_physical_map.copy_(self.new_logical_to_physical_map)
            self.logical_replica_count.copy_(self.new_logical_replica_count)

def create_device_backend(use_cuda: bool = True) -> DeviceBackend:
    if use_cuda :
        return CUDABackend()
    elif use_npu :

critical

The variable use_npu is not defined in this function's scope, which will cause a NameError at runtime. The function signature only includes use_cuda. You should either add use_npu as a parameter or detect NPU availability automatically, for example with torch_npu.npu.is_available().
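
One possible shape of the fix, sketched under the assumption that DeviceBackend, CUDABackend, and NPUBackend are the classes defined earlier in DB.py (the import guard is an assumption, not PR code):

import torch

def create_device_backend(use_cuda: bool = True) -> DeviceBackend:
    # Detect the accelerator instead of referencing an undefined `use_npu` name.
    if use_cuda and torch.cuda.is_available():
        return CUDABackend()
    try:
        import torch_npu  # noqa: F401
        if torch_npu.npu.is_available():
            return NPUBackend()
    except ImportError:
        pass
    raise RuntimeError("No supported accelerator backend available")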

Comment on lines +80 to +82
    ep_buffer_ready: bool = false

    cp : bool=false

critical

Python uses True and False (with capital letters) for boolean values, not false. This will cause a SyntaxError.

    ep_buffer_ready: bool = False

    cp: bool = False

logical_replica_count,
)
# buffer_size still needs to be determined
self.buffer = torch.empty((buffer_size,), dtype=torch.uint8, device=self.device)

critical

The variables buffer_size and self.device are not defined here.

  • self.device will fail because build is a class method, so self refers to the class, not an instance. You should use the device parameter that is passed into the method.
  • buffer_size is not defined anywhere. It needs to be calculated or passed in, for example from parallel_config. A possible shape of the fix is sketched below.
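
A self-contained sketch of one way to address both points (the trimmed-down EplbState and the explicit buffer_size argument are simplifications, not the PR's real signature):

import torch
from dataclasses import dataclass

@dataclass
class EplbState:
    buffer: torch.Tensor

    @classmethod
    def build(cls, device: torch.device, buffer_size: int) -> "EplbState":
        # Use the `device` parameter (self.device does not exist in a classmethod)
        # and take buffer_size explicitly, e.g. derived from parallel_config.
        buffer = torch.empty((buffer_size,), dtype=torch.uint8, device=device)
        return cls(buffer=buffer)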

Comment on lines +97 to +145
from typing import Sequence, Iterable, Set
from functools import partial

import torch
import torch.distributed as dist
import threading
from functools import partial
from typing import Sequence, Iterable, Set, List, Optional

# Assume P2POp and batch_isend_irecv are already defined
class P2POp:
    def __init__(self, func, tensor, dst):
        self.func = func
        self.tensor = tensor
        self.dst = dst

def batch_isend_irecv(ops: List[P2POp]):
    reqs = []
    for op in ops:
        if op.func == dist.isend:
            reqs.append(op.func(op.tensor, op.dst))
        elif op.func == dist.irecv:
            reqs.append(op.func(op.tensor, op.dst))
    return reqs

def get_global_rank(group, rank):
    return rank  # simplified implementation

def get_ep_ranks_with_expert(expert, num_local_experts, old_indices, new_indices):
    # simplified implementation
    return [0, 1], [0, 1]

def idx_local_to_global(local_idx, local_cnt, ep_rank):
    return local_idx + ep_rank * local_cnt

def barrier(group):
    dist.barrier(group=group)

# Assume the GroupCoordinator class is already defined
class GroupCoordinator:
    def __init__(self, process_group):
        self.device_group = process_group

# Global variable and accessor function
_EP: Optional[GroupCoordinator] = None

def get_ep_group() -> GroupCoordinator:
    assert _EP is not None, ("expert parallel group is not initialized")
    return _EP

high

This entire block of code re-imports many modules and re-defines several functions and classes (P2POp, batch_isend_irecv, get_global_rank, etc.) that are either already available in torch.distributed or defined just above in this same file (lines 17-95). This is highly redundant and will lead to maintenance issues.

Please remove these duplicated definitions and use the original ones from torch.distributed or the ones defined earlier in this file.

reqs = batch_isend_irecv(send_reqs)
# Note: using req.wait() here synchronizes this stream, but does not block other streams
for req in reqs:
    req.wait()

high

Using req.wait() inside an async function is a blocking call that will freeze the entire asyncio event loop until the operation completes. This defeats the purpose of using asyncio.

If you need to wait for a request without blocking the loop, you should consider running the blocking operation in a separate thread using asyncio.to_thread (Python 3.9+) or loop.run_in_executor.

The same issue exists on line 293 with req.wait() and on line 316 with cuda_stream.synchronize().
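
A minimal sketch of that suggestion (the surrounding coroutine is illustrative):

import asyncio

async def wait_for_requests(reqs):
    # req.wait() blocks, so run each wait in a worker thread instead of calling it
    # directly inside the coroutine (asyncio.to_thread requires Python 3.9+).
    await asyncio.gather(*(asyncio.to_thread(req.wait) for req in reqs))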

Comment on lines +59 to +61
    def synchronize(self) -> None:

        pass

high

The synchronize method in NPUBackend is implemented as a pass, which means it does nothing. This breaks the contract of the DeviceBackend and can lead to race conditions or incorrect behavior if this backend is used. It should be implemented with the NPU-equivalent synchronization call, for example torch_npu.npu.synchronize().

Similarly, other methods like all_gather, batch_isend_irecv, and barrier are also empty. In a distributed setting, these no-op implementations will cause processes to hang. They should either be fully implemented or raise a NotImplementedError.

Suggested change
-    def synchronize(self) -> None:
-        pass
+    def synchronize(self) -> None:
+        torch_npu.npu.synchronize()

        if (self.expert_rearrangement_step
                >= self.expert_rearrangement_step_interval):
            self.expert_rearrangement_step = 0
            pre_calculate(model,is_profile)

medium

This should be a method call on the instance: self.pre_calculate(...).

            self.pre_calculate(model, is_profile)




__all__ = ["rearrange_expert_weights_inplace"]

medium

The __all__ variable exports rearrange_expert_weights_inplace, but this function is not defined in the file. The main entry point seems to be eplb_worker. Please update __all__ to reflect the actual public API of this module, for example __all__ = ["eplb_worker"].
