[Performance] EPLB Execution Optimization #20990
Conversation
👋 Hi! Thank you for contributing to the vLLM project. 💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels. Just a reminder: PRs do not trigger a full CI run by default; instead, only a limited subset of checks runs automatically. Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging. To run CI, PR reviewers can either: Add 🚀
Summary of Changes
Hello @david6666666, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces significant performance optimizations for Expert Parallelism Load Balancing (EPLB) by refactoring the existing implementation into a new asynchronous module. The core improvement lies in a multi-stage, asynchronous expert weight rearrangement process that leverages hardware abstraction, background threads, and pre-allocated buffers to minimize latency impact during inference. The changes aim to make the expert shuffling more efficient and less disruptive to the main computation flow.
Highlights
- New Asynchronous EPLB Module: Introduced a new `async_eplb` module (`vllm/distributed/async_eplb`) to house the optimized Expert Parallelism Load Balancing (EPLB) logic, replacing the previous `eplb` module.
- Hardware Abstraction for Distributed Operations: Added a `DeviceBackend` abstraction (`vllm/distributed/async_eplb/DB.py`) with concrete implementations for CUDA and NPU, enabling more portable and extensible distributed communication primitives like `synchronize`, `create_buffer_like`, `all_gather`, `batch_isend_irecv`, and `barrier`.
- Multi-Stage Asynchronous Expert Rearrangement: Implemented a multi-stage process for expert weight rearrangement: (1) `pre_calculate` determines the optimal expert mapping, (2) a background thread asynchronously moves expert weights to a pre-allocated buffer using `async_move_to_buffer` (via `eplb_worker`), and (3) `move_to_workspace` synchronously transfers weights from the buffer to the active model workspace. This design aims to offload heavy inter-GPU communication, reducing its impact on inter-token latency (see the sketch after this list).
- Layer-by-Layer Expert Shuffling: The expert rearrangement process now shuffles one MoE layer at a time (`_run_periodically` in `eplb_state.py` increments `self.layer`), aligning with the PR's goal of lowering the impact on inter-token latency.
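To make the three stages easier to follow, here is a minimal, illustrative sketch of how they could be wired together; the class, method bodies, and the load-statistics placeholder are assumptions based on this summary, not the PR's actual implementation:

```python
import threading
import torch

class AsyncEplbPipelineSketch:
    """Illustrative three-stage expert rearrangement pipeline (assumed design)."""

    def __init__(self, device: torch.device, buffer_bytes: int):
        self.device = device
        # Pre-allocated staging buffer: the background stage never allocates.
        self.buffer = torch.empty(buffer_bytes, dtype=torch.uint8, device=device)
        self.buffer_ready = threading.Event()

    def pre_calculate(self, load_stats: torch.Tensor) -> None:
        # Stage 1: compute the new expert placement (cheap, on the main thread).
        self.new_mapping = load_stats.argsort()  # placeholder for the real algorithm
        threading.Thread(target=self._async_move_to_buffer, daemon=True).start()

    def _async_move_to_buffer(self) -> None:
        # Stage 2 (background thread): stage the expert weights this rank needs
        # into the pre-allocated buffer, overlapping with ongoing inference.
        ...  # heavy inter-GPU communication would happen here
        self.buffer_ready.set()

    def move_to_workspace(self, expert_weights: list) -> None:
        # Stage 3 (synchronous, between forward passes): short copies from the
        # staging buffer into the live model tensors, one MoE layer at a time.
        if self.buffer_ready.is_set():
            ...  # buffer -> workspace copies
            self.buffer_ready.clear()
```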
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e., a comment on the pull request page) using either `/gemini <command>` or `@gemini-code-assist <command>`. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a `.gemini/` folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
Code Review
This pull request introduces an asynchronous execution path for Expert Parallelism Load Balancing (EPLB) to improve performance. The code appears to be in a very early, work-in-progress state and requires significant refinement and testing. The core algorithm in `rebalance_algo.py` seems sound, but the new orchestration and execution logic in `eplb_state.py` and `rebalance_execute_async.py` needs substantial attention.
    Called by the worker thread: keeps checking and performs the weight move once the conditions are met.
    When this thread should stop is still undecided; the termination condition is TBD.
    """
    while not _stop_event.is_set():
self.physical_to_logical_map.copy_(new_physical_to_logical_map)
self.logical_to_physical_map.copy_(new_logical_to_physical_map)
self.logical_replica_count.copy_(new_logical_replica_count)
The variables `new_physical_to_logical_map`, `new_logical_to_physical_map`, and `new_logical_replica_count` are not defined in the scope of `post_eplb`. They are local variables in the `pre_calculate` method. You should use the instance attributes (`self.new_...`) which are set in `pre_calculate`.
self.physical_to_logical_map.copy_(self.new_physical_to_logical_map)
self.logical_to_physical_map.copy_(self.new_logical_to_physical_map)
self.logical_replica_count.copy_(self.new_logical_replica_count)
def create_device_backend(use_cuda: bool = True) -> DeviceBackend:
    if use_cuda :
        return CUDABackend()
    elif use_npu :
ep_buffer_ready: bool = false

cp : bool=false
    logical_replica_count,
)
# buffer_size needs to be determined
self.buffer = torch.empty((buffer_size,), dtype=torch.uint8, device=self.device)
The variables `buffer_size` and `self.device` are not defined here.
- `self.device` will fail because `build` is a class method, so `self` refers to the class, not an instance. You should use the `device` parameter that is passed into the method.
- `buffer_size` is not defined anywhere. It needs to be calculated or passed in, for example from `parallel_config` (see the sketch below).
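For illustration only, a minimal sketch of how `build` could size the staging buffer and honor the `device` argument; the helper name and the source of the expert weight shapes are assumptions, not code from this PR:

```python
import torch

def compute_buffer_size(expert_weights_per_layer: list) -> int:
    """Bytes needed to stage the largest single layer's expert weights."""
    return max(
        sum(w.numel() * w.element_size() for w in layer_weights)
        for layer_weights in expert_weights_per_layer
    )

# Inside the classmethod build(cls, ..., device, ...):
#     buffer_size = compute_buffer_size(expert_weights_per_layer)
#     buffer = torch.empty((buffer_size,), dtype=torch.uint8, device=device)
```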
from typing import Sequence, Iterable, Set
from functools import partial

import torch
import torch.distributed as dist
import threading
from functools import partial
from typing import Sequence, Iterable, Set, List, Optional

# Assume P2POp and batch_isend_irecv are already defined
class P2POp:
    def __init__(self, func, tensor, dst):
        self.func = func
        self.tensor = tensor
        self.dst = dst

def batch_isend_irecv(ops: List[P2POp]):
    reqs = []
    for op in ops:
        if op.func == dist.isend:
            reqs.append(op.func(op.tensor, op.dst))
        elif op.func == dist.irecv:
            reqs.append(op.func(op.tensor, op.dst))
    return reqs

def get_global_rank(group, rank):
    return rank  # simplified implementation

def get_ep_ranks_with_expert(expert, num_local_experts, old_indices, new_indices):
    # simplified implementation
    return [0, 1], [0, 1]

def idx_local_to_global(local_idx, local_cnt, ep_rank):
    return local_idx + ep_rank * local_cnt

def barrier(group):
    dist.barrier(group=group)

# Assume the GroupCoordinator class is already defined
class GroupCoordinator:
    def __init__(self, process_group):
        self.device_group = process_group

# Global variable and accessor function
_EP: Optional[GroupCoordinator] = None

def get_ep_group() -> GroupCoordinator:
    assert _EP is not None, ("expert parallel group is not initialized")
    return _EP
This entire block of code re-imports many modules and re-defines several functions and classes (`P2POp`, `batch_isend_irecv`, `get_global_rank`, etc.) that are either already available in `torch.distributed` or defined just above in this same file (lines 17-95). This is highly redundant and will lead to maintenance issues. Please remove these duplicated definitions and use the original ones from `torch.distributed` or the ones defined earlier in this file.
reqs = batch_isend_irecv(send_reqs)
# Note: using req.wait() here synchronizes this stream, but it does not block other streams
for req in reqs:
    req.wait()
Using `req.wait()` inside an `async` function is a blocking call that will freeze the entire asyncio event loop until the operation completes. This defeats the purpose of using `asyncio`.
If you need to wait for a request without blocking the loop, you should consider running the blocking operation in a separate thread using `asyncio.to_thread` (Python 3.9+) or `loop.run_in_executor`.
The same issue exists on line 293 with `req.wait()` and on line 316 with `cuda_stream.synchronize()`.
def synchronize(self) -> None:

    pass
The `synchronize` method in `NPUBackend` is implemented as a `pass`, which means it does nothing. This breaks the contract of `DeviceBackend` and can lead to race conditions or incorrect behavior if this backend is used. It should be implemented with the NPU-equivalent synchronization call, for example `torch_npu.npu.synchronize()`.
Similarly, other methods like `all_gather`, `batch_isend_irecv`, and `barrier` are also empty. In a distributed setting, these no-op implementations will cause processes to hang. They should either be fully implemented or raise a `NotImplementedError`.
def synchronize(self) -> None:
    pass
def synchronize(self) -> None:
    torch_npu.npu.synchronize()
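For the remaining no-op methods, a minimal sketch of the fail-loudly fallback described above; the method signatures are assumed, since the full `DeviceBackend` interface is not shown in this diff:

```python
class NPUBackend(DeviceBackend):
    # Methods not yet ported to NPU should fail loudly rather than silently
    # doing nothing, which would desynchronize ranks and hang the job.
    def all_gather(self, output_tensor, input_tensor, group=None) -> None:
        raise NotImplementedError("all_gather is not implemented for NPUBackend")

    def batch_isend_irecv(self, ops, group=None):
        raise NotImplementedError("batch_isend_irecv is not implemented for NPUBackend")

    def barrier(self, group=None) -> None:
        raise NotImplementedError("barrier is not implemented for NPUBackend")
```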
if (self.expert_rearrangement_step
        >= self.expert_rearrangement_step_interval):
    self.expert_rearrangement_step = 0
    pre_calculate(model, is_profile)
__all__ = ["rearrange_expert_weights_inplace"]
Essential Elements of an Effective PR Description Checklist
- supported_models.md and examples for a new model.

Purpose
issue #20805
pr #18343
EPLB Execution
Test Plan
Test Result
(Optional) Documentation Update