[Performance] EPLB Execution Optimization #20990

Draft · wants to merge 1 commit into base: main
82 changes: 82 additions & 0 deletions vllm/distributed/async_eplb/DB.py
@@ -0,0 +1,82 @@
from abc import ABC, abstractmethod
import torch
import torch_npu
import torch.distributed as dist

class DeviceBackend(ABC):
"""硬件后端抽象基类,定义统一接口"""

@abstractmethod
def synchronize(self) -> None:
"""同步当前设备的所有操作"""
pass

@abstractmethod
def create_buffer_like(self, tensor: torch.Tensor) -> torch.Tensor:
"""创建与输入张量相同类型和设备的缓冲区"""
pass

@abstractmethod
def all_gather(self, output_tensor_list: list[torch.Tensor],
input_tensor: torch.Tensor, group=None) -> None:
"""执行all_gather集体通信操作"""
pass

@abstractmethod
def batch_isend_irecv(self, p2p_ops: list[P2POp]) -> list[dist.Work]:
Contributor comment (critical):

P2POp is used as a type hint here but is not defined or imported in this file. This will cause a NameError at runtime. The same issue exists on lines 49 and 70. You should import it from torch.distributed at the top of the file.
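
A minimal sketch of the import fix the reviewer is asking for, assuming the public torch.distributed API (P2POp is the class consumed by dist.batch_isend_irecv):

```python
# Top-of-file imports for DB.py with P2POp added, so the type hints
# below resolve at runtime and the Ruff F821 errors go away.
from abc import ABC, abstractmethod

import torch
import torch_npu
import torch.distributed as dist
from torch.distributed import P2POp
```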

"""执行批量异步发送和接收操作"""
pass

Check failure on line 28 in vllm/distributed/async_eplb/DB.py (GitHub Actions / pre-commit, Ruff F821):
vllm/distributed/async_eplb/DB.py:28:47: F821 Undefined name `P2POp`

@abstractmethod
def barrier(self, group=None) -> None:
"""执行屏障同步"""
pass


class CUDABackend(DeviceBackend):
"""CUDA/NVIDIA GPU后端实现"""

def synchronize(self) -> None:
torch.cuda.synchronize()

def create_buffer_like(self, tensor: torch.Tensor) -> torch.Tensor:
return torch.empty_like(tensor, device='cuda')

def all_gather(self, output_tensor_list: list[torch.Tensor],
input_tensor: torch.Tensor, group=None) -> None:
dist.all_gather(output_tensor_list, input_tensor, group=group)

def batch_isend_irecv(self, p2p_ops: list[P2POp]) -> list[dist.Work]:
return dist.batch_isend_irecv(p2p_ops)

def barrier(self, group=None) -> None:
dist.barrier(group=group)

Check failure on line 53 in vllm/distributed/async_eplb/DB.py (GitHub Actions / pre-commit, Ruff F821):
vllm/distributed/async_eplb/DB.py:53:47: F821 Undefined name `P2POp`


class NPUBackend(DeviceBackend):
"""NPU后端实现"""

def synchronize(self) -> None:

pass
Comment on lines +59 to +61
Contributor comment (high):

The synchronize method in NPUBackend is implemented as a pass, which means it does nothing. This breaks the contract of the DeviceBackend and can lead to race conditions or incorrect behavior if this backend is used. It should be implemented with the NPU-equivalent synchronization call, for example torch_npu.npu.synchronize().

Similarly, other methods like all_gather, batch_isend_irecv, and barrier are also empty. In a distributed setting, these no-op implementations will cause processes to hang. They should either be fully implemented or raise a NotImplementedError.

Suggested change:
-    def synchronize(self) -> None:
-        pass
+    def synchronize(self) -> None:
+        torch_npu.npu.synchronize()
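
Beyond synchronize, here is a sketch of what a fully implemented NPUBackend might look like, on the assumption that torch_npu mirrors the torch.cuda API (torch_npu.npu.synchronize()) and that the process group was initialized with an NPU-capable backend such as HCCL, so the regular torch.distributed collectives can be reused; this is an illustration, not the PR's implementation:

```python
class NPUBackend(DeviceBackend):
    """NPU backend implementation (illustrative sketch)."""

    def synchronize(self) -> None:
        # Block until all queued NPU kernels have completed.
        torch_npu.npu.synchronize()

    def create_buffer_like(self, tensor: torch.Tensor) -> torch.Tensor:
        return torch.empty_like(tensor, device='npu')

    def all_gather(self, output_tensor_list: list[torch.Tensor],
                   input_tensor: torch.Tensor, group=None) -> None:
        # torch.distributed collectives dispatch to the process group's
        # backend, so with an HCCL group this runs on the NPU.
        dist.all_gather(output_tensor_list, input_tensor, group=group)

    def batch_isend_irecv(self, p2p_ops: list[P2POp]) -> list[dist.Work]:
        return dist.batch_isend_irecv(p2p_ops)

    def barrier(self, group=None) -> None:
        dist.barrier(group=group)
```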


def create_buffer_like(self, tensor: torch.Tensor) -> torch.Tensor:
return torch.empty_like(tensor, device='npu')

def all_gather(self, output_tensor_list: list[torch.Tensor],
input_tensor: torch.Tensor, group=None) -> None:
pass

def batch_isend_irecv(self, p2p_ops: list[P2POp]) -> list[dist.Work]:
pass

def barrier(self, group=None) -> None:
pass


Check failure on line 76 in vllm/distributed/async_eplb/DB.py (GitHub Actions / pre-commit, Ruff F821):
vllm/distributed/async_eplb/DB.py:76:47: F821 Undefined name `P2POp`

# Create the appropriate backend for the available hardware
def create_device_backend(use_cuda: bool = True) -> DeviceBackend:
if use_cuda :
return CUDABackend()
elif use_npu :
Contributor comment (critical):

The variable use_npu is not defined in this function's scope, which will cause a NameError at runtime. The function signature only includes use_cuda. You should either add use_npu as a parameter or detect NPU availability automatically, for example with torch_npu.npu.is_available().

return NPUBackend()
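
A possible shape for the factory once the reviewer's point is addressed, detecting the hardware instead of referencing an undefined use_npu flag; torch_npu.npu.is_available() and the RuntimeError fallback are assumptions on my part, not part of the PR:

```python
def create_device_backend() -> DeviceBackend:
    """Pick a backend based on the hardware that is actually present."""
    if torch.cuda.is_available():
        return CUDABackend()
    try:
        import torch_npu  # may not be installed on non-Ascend hosts
        if torch_npu.npu.is_available():
            return NPUBackend()
    except ImportError:
        pass
    raise RuntimeError("No supported accelerator (CUDA or NPU) found")
```

Call sites would then just do backend = create_device_backend() and rely on the common interface, e.g. backend.synchronize() before timing or reading back results.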