diff --git a/vllm/distributed/async_eplb/DB.py b/vllm/distributed/async_eplb/DB.py
new file mode 100644
index 00000000000..ae0bc58f6f8
--- /dev/null
+++ b/vllm/distributed/async_eplb/DB.py
@@ -0,0 +1,85 @@
+from abc import ABC, abstractmethod
+
+import torch
+import torch.distributed as dist
+from torch.distributed import P2POp
+
+
+class DeviceBackend(ABC):
+    """Abstract base class defining a unified hardware backend interface."""
+
+    @abstractmethod
+    def synchronize(self) -> None:
+        """Synchronize all pending operations on the current device."""
+        pass
+
+    @abstractmethod
+    def create_buffer_like(self, tensor: torch.Tensor) -> torch.Tensor:
+        """Create a buffer with the same dtype and device as the input tensor."""
+        pass
+
+    @abstractmethod
+    def all_gather(self, output_tensor_list: list[torch.Tensor],
+                   input_tensor: torch.Tensor, group=None) -> None:
+        """Perform the all_gather collective communication operation."""
+        pass
+
+    @abstractmethod
+    def batch_isend_irecv(self, p2p_ops: list[P2POp]) -> list[dist.Work]:
+        """Perform batched asynchronous send and receive operations."""
+        pass
+
+    @abstractmethod
+    def barrier(self, group=None) -> None:
+        """Perform a barrier synchronization."""
+        pass
+
+
+class CUDABackend(DeviceBackend):
+    """CUDA/NVIDIA GPU backend implementation."""
+
+    def synchronize(self) -> None:
+        torch.cuda.synchronize()
+
+    def create_buffer_like(self, tensor: torch.Tensor) -> torch.Tensor:
+        return torch.empty_like(tensor, device='cuda')
+
+    def all_gather(self, output_tensor_list: list[torch.Tensor],
+                   input_tensor: torch.Tensor, group=None) -> None:
+        dist.all_gather(output_tensor_list, input_tensor, group=group)
+
+    def batch_isend_irecv(self, p2p_ops: list[P2POp]) -> list[dist.Work]:
+        return dist.batch_isend_irecv(p2p_ops)
+
+    def barrier(self, group=None) -> None:
+        dist.barrier(group=group)
+
+
+class NPUBackend(DeviceBackend):
+    """Ascend NPU backend implementation (requires torch_npu)."""
+
+    def synchronize(self) -> None:
+        import torch_npu  # noqa: F401  (registers the `npu` device)
+        torch.npu.synchronize()
+
+    def create_buffer_like(self, tensor: torch.Tensor) -> torch.Tensor:
+        return torch.empty_like(tensor, device='npu')
+
+    def all_gather(self, output_tensor_list: list[torch.Tensor],
+                   input_tensor: torch.Tensor, group=None) -> None:
+        # With the HCCL backend, the standard torch.distributed collectives
+        # work on NPU as well.
+        dist.all_gather(output_tensor_list, input_tensor, group=group)
+
+    def batch_isend_irecv(self, p2p_ops: list[P2POp]) -> list[dist.Work]:
+        return dist.batch_isend_irecv(p2p_ops)
+
+    def barrier(self, group=None) -> None:
+        dist.barrier(group=group)
+
+
+# Create the appropriate backend for the available hardware.
+def create_device_backend(use_cuda: bool = True) -> DeviceBackend:
+    if use_cuda and torch.cuda.is_available():
+        return CUDABackend()
+    return NPUBackend()
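For orientation, a minimal sketch of how this abstraction might be used by a weight-copy path; the helper below is illustrative only and not part of the patch:

import torch

from vllm.distributed.async_eplb.DB import create_device_backend


def stage_weights(weights: list[torch.Tensor],
                  use_cuda: bool = True) -> list[torch.Tensor]:
    # Pick the backend for the current hardware and copy each weight into a
    # freshly allocated buffer on the same device.
    backend = create_device_backend(use_cuda=use_cuda)
    buffers = [backend.create_buffer_like(w) for w in weights]
    for src, dst in zip(weights, buffers):
        dst.copy_(src)
    # Make sure the copies have landed before handing the buffers off.
    backend.synchronize()
    return buffers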
diff --git a/vllm/distributed/async_eplb/eplb_state.py b/vllm/distributed/async_eplb/eplb_state.py
new file mode 100644
index 00000000000..1a3558353e3
--- /dev/null
+++ b/vllm/distributed/async_eplb/eplb_state.py
@@ -0,0 +1,439 @@
+# SPDX-License-Identifier: Apache-2.0
+"""
+Expert parallelism load balancer (EPLB) metrics and states.
+
+# Glossary
+
+- **Logical Expert**: An expert that is part of the model's logical structure.
+  It holds a set of weights and is replicated across multiple physical
+  experts.
+- **Redundant Expert**: To achieve load balancing, for some popular logical
+  experts, we create additional copies of the expert weights. During
+  inference, each of these copies can be routed to by the same set of tokens.
+- **Physical Expert**: An expert that is instantiated on a specific device.
+  It is a replica of a logical expert and can be rearranged across devices.
+  I.e., one logical expert may have multiple sets of weights initialized on
+  different devices, and each of these sets is a physical expert.
+- **Local Physical Expert**: A physical expert that is instantiated on the
+  current device.
+
+For example: DeepSeek-R1 has 256 logical experts, so each MoE layer
+has 256 sets of linear layer weights in the model parameters. If we add 32
+redundant experts, DeepSeek-R1 will have 256 + 32 = 288 physical experts in
+total. And when deploying, we'll have 288 sets of linear layer weights for
+each MoE layer. If we have 32 EP ranks, then each GPU will hold
+288 / 32 = 9 local physical experts.
+"""
+
+import asyncio
+import threading
+from collections.abc import Sequence
+from dataclasses import dataclass, field
+from typing import Optional
+
+import torch
+from torch.distributed import ProcessGroup, all_gather, all_reduce
+
+from vllm.config import ParallelConfig
+from vllm.distributed.parallel_state import get_ep_group, get_node_count
+from vllm.logger import init_logger
+from vllm.model_executor.models.interfaces import MixtureOfExperts
+
+from .rebalance_algo import rebalance_experts
+from .rebalance_execute_async import eplb_worker, move_from_buffer
+
+logger = init_logger(__name__)
+
+# Signals the background EPLB worker thread to exit.
+# The exact termination policy is still to be decided.
+_stop_event = threading.Event()
+
+
+@dataclass
+class EplbState:
+    """EPLB metrics."""
+
+    physical_to_logical_map: torch.Tensor
+
+    logical_to_physical_map: torch.Tensor
+
+    logical_replica_count: torch.Tensor
+
+    expert_load_pass: torch.Tensor
+
+    expert_load_window: torch.Tensor
+
+    expert_load_window_step: int = 0
+
+    expert_load_window_size: int = 0
+
+    expert_rearrangement_step: int = 0
+
+    expert_rearrangement_step_interval: int = 0
+
+    # ---- Fields added for asynchronous rearrangement ----
+
+    # Index of the next MoE layer to be staged by the background worker.
+    layer: int = 0
+
+    new_physical_to_logical_map: Optional[torch.Tensor] = None
+
+    new_logical_to_physical_map: Optional[torch.Tensor] = None
+
+    new_logical_replica_count: Optional[torch.Tensor] = None
+
+    ep_group: Optional[ProcessGroup] = None
+
+    # True while the staging buffer holds a layer that has not yet been
+    # moved into the working weights.
+    ep_buffer_ready: bool = False
+
+    # True once `pre_calculate` has computed a new placement that the
+    # background worker should start staging.
+    cp: bool = False
+
+    # Buffer holding one layer's expert weights during an exchange.
+    buffer: Optional[list[torch.Tensor]] = None
+
+    # Intermediate state passed between the two copy phases
+    # (buffer fill and move to workspace).
+    is_unchanged: list[bool] = field(default_factory=list)
+    is_received_locally: list[bool] = field(default_factory=list)
+    experts_recv_loc: dict[int, int] = field(default_factory=dict)
+    num_local_experts: int = 0
+
+    @staticmethod
+    def build_initial_global_physical_to_logical_map(
+        num_routed_experts: int,
+        num_redundant_experts: int,
+    ) -> Sequence[int]:
+        """
+        Build an initial expert arrangement using the following structure:
+        [original routed experts, redundant experts]
+
+        Returns:
+            physical_to_logical_map (Sequence[int]): A list of integers,
+                where each integer is the index of the logical expert
+                that the corresponding physical expert maps to.
+        """
+        global_physical_to_logical_map = list(range(num_routed_experts))
+        global_physical_to_logical_map += [
+            i % num_routed_experts for i in range(num_redundant_experts)
+        ]
+        return global_physical_to_logical_map
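As a quick sanity check of the `[routed experts, redundant experts]` layout on toy numbers (assuming the class above is importable):

# With 4 routed experts and 2 redundant ones, physical experts 4 and 5
# replicate logical experts 0 and 1 respectively.
mapping = EplbState.build_initial_global_physical_to_logical_map(
    num_routed_experts=4, num_redundant_experts=2)
assert list(mapping) == [0, 1, 2, 3, 0, 1]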
+    @classmethod
+    def build(
+        cls,
+        model: MixtureOfExperts,
+        device: torch.device,
+        parallel_config: ParallelConfig,
+    ) -> "EplbState":
+        """
+        Build the initial EPLB state.
+        """
+        physical_to_logical_map_list = (
+            cls.build_initial_global_physical_to_logical_map(
+                model.num_routed_experts,
+                model.num_redundant_experts,
+            ))
+        physical_to_logical_map = torch.tensor(
+            physical_to_logical_map_list,
+            device=device,
+        )
+        logical_to_physical_map = torch.full(
+            (model.num_logical_experts, model.num_redundant_experts + 1),
+            -1,
+            device=device,
+        )
+        logical_replica_count = torch.zeros(
+            (model.num_logical_experts, ),
+            device=device,
+            dtype=torch.long,
+        )
+
+        for i in range(model.num_physical_experts):
+            logical_idx = physical_to_logical_map[i]
+            logical_to_physical_map[logical_idx,
+                                    logical_replica_count[logical_idx]] = i
+            logical_replica_count[logical_idx] += 1
+
+        # Duplicate initial mapping for all layers
+        physical_to_logical_map = physical_to_logical_map.unsqueeze(0).expand(
+            model.num_moe_layers,
+            -1,
+        ).contiguous()
+        logical_to_physical_map = logical_to_physical_map.unsqueeze(0).expand(
+            model.num_moe_layers,
+            -1,
+            -1,
+        ).contiguous()
+        logical_replica_count = logical_replica_count.unsqueeze(0).expand(
+            model.num_moe_layers,
+            -1,
+        ).contiguous()
+
+        expert_load_pass = torch.zeros(
+            (model.num_moe_layers, model.num_local_physical_experts),
+            dtype=torch.int32,
+            device=device,
+        )
+        expert_load_window_size = parallel_config.eplb_window_size
+        expert_load_window = torch.zeros(
+            (expert_load_window_size, model.num_moe_layers,
+             model.num_local_physical_experts),
+            dtype=torch.int32,
+            device=device,
+        )
+
+        # Set the initial progress of rearrangement to 3/4
+        eplb_step_interval = parallel_config.eplb_step_interval
+        expert_rearrangement_step = max(
+            0, eplb_step_interval - eplb_step_interval // 4)
+
+        model.set_eplb_state(
+            expert_load_pass,
+            logical_to_physical_map,
+            logical_replica_count,
+        )
+
+        state = cls(
+            physical_to_logical_map,
+            logical_to_physical_map,
+            logical_replica_count,
+            expert_load_pass,
+            expert_load_window,
+            expert_load_window_size=expert_load_window_size,
+            expert_rearrangement_step=expert_rearrangement_step,
+            expert_rearrangement_step_interval=eplb_step_interval,
+        )
+        # The staging buffer holds one layer's expert weights. The exact
+        # sizing policy is still to be settled; for now it mirrors the
+        # shapes of the first MoE layer's weights.
+        state.buffer = [torch.empty_like(w) for w in model.expert_weights[0]]
+        state.num_local_experts = model.num_local_physical_experts
+        # Spawn the background thread that stages weights asynchronously.
+        state._async_loop(model)
+        return state
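The mapping-inversion loop in `build()` can be checked on toy numbers:

import torch

# 4 logical experts, 6 physical slots: physical 4 and 5 replicate 0 and 1.
physical_to_logical = torch.tensor([0, 1, 2, 3, 0, 1])
num_logical, max_replicas = 4, 3
logical_to_physical = torch.full((num_logical, max_replicas), -1)
replica_count = torch.zeros(num_logical, dtype=torch.long)
for phys, logical in enumerate(physical_to_logical.tolist()):
    logical_to_physical[logical, replica_count[logical]] = phys
    replica_count[logical] += 1
# Logical 0 lives at physical {0, 4}; logical 2 only at physical {2}.
assert logical_to_physical[0].tolist() == [0, 4, -1]
assert replica_count.tolist() == [2, 2, 1, 1]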
+    def step(self,
+             model: MixtureOfExperts,
+             is_dummy: bool = False,
+             is_profile: bool = False,
+             log_stats: bool = False) -> None:
+        """
+        Step the EPLB state.
+
+        Args:
+            model (MixtureOfExperts): The MoE model.
+            is_dummy (bool): If `True`, this is a dummy step and the load
+              metrics recorded in this forward pass will not count. Defaults
+              to `False`.
+            is_profile (bool): If `True`, perform a dummy rearrangement
+              with maximum communication cost. This is used in `profile_run`
+              to reserve enough memory for the communication buffer.
+            log_stats (bool): If `True`, log the expert load metrics.
+
+        # Stats
+        The metrics are all summed up across layers.
+        - `avg_tokens`: The average load across ranks.
+        - `max_tokens`: The maximum load across ranks.
+        - `balancedness`: The ratio of average load to maximum load.
+        """
+
+        if is_profile:
+            self.pre_calculate(model, is_profile=True)
+            return
+
+        if is_dummy:
+            # Do not record load metrics for dummy steps
+            self.expert_load_pass.zero_()
+
+        if log_stats:
+            # `num_tokens`: (num_moe_layers,)
+            num_tokens = self.expert_load_pass.sum(dim=-1)
+
+            # Collect load metrics from all ranks
+            ep_group = get_ep_group().device_group
+            num_tokens_list = [
+                torch.empty_like(num_tokens) for _ in range(ep_group.size())
+            ]
+            all_gather(num_tokens_list, num_tokens, group=ep_group)
+            # Stack to get (num_ranks, num_moe_layers)
+            num_tokens_per_rank = torch.stack(num_tokens_list).float()
+
+            # Compute balancedness ratio:
+            # for each layer:
+            #   (mean load across ranks) / (max load across ranks)
+            avg_tokens_tensor = num_tokens_per_rank.mean(dim=0).sum(dim=0)
+            max_tokens_tensor = num_tokens_per_rank.max(dim=0).values.sum(
+                dim=0)
+
+            # Just to make type checker happy
+            tokens_tensors: list[float] = torch.stack(
+                [avg_tokens_tensor, max_tokens_tensor]).tolist()
+            avg_tokens, max_tokens = tokens_tensors
+            balancedness = avg_tokens / max_tokens if max_tokens > 0 else 0.0
+
+            if ep_group.rank() == 0:
+                logger.info(
+                    "EPLB step: avg_tokens=%.2f, max_tokens=%d, "
+                    "balancedness=%.4f", avg_tokens, max_tokens, balancedness)
+
+        # Update the expert load sliding window
+        if not is_dummy:
+            self.expert_load_window[self.expert_load_window_step] = (
+                self.expert_load_pass.clone())
+            self.expert_load_window_step += 1
+            if self.expert_load_window_step >= self.expert_load_window_size:
+                self.expert_load_window_step = 0
+            self.expert_load_pass.zero_()
+
+        # Step the expert rearrangement step
+        # Note that even if this is a dummy step, we still increment the
+        # rearrangement step and perform rearrangement to ensure all ranks are
+        # performing collective communication.
+        self.expert_rearrangement_step += 1
+
+        # If the background worker has finished staging a layer into the
+        # buffer, move it into the working weights after this forward step.
+        if self.ep_buffer_ready:
+            self.move_to_workspace(model)
+
+        if (self.expert_rearrangement_step
+                >= self.expert_rearrangement_step_interval):
+            self.expert_rearrangement_step = 0
+            self.pre_calculate(model, is_profile)
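For reference, the logged balancedness statistic reduces to the following on a toy load matrix of 2 ranks x 2 layers:

import torch

# Rows are EP ranks, columns are MoE layers; entries are routed token counts.
num_tokens_per_rank = torch.tensor([[100., 300.],
                                    [200., 200.]])
avg_tokens = num_tokens_per_rank.mean(dim=0).sum()        # 150 + 250 = 400
max_tokens = num_tokens_per_rank.max(dim=0).values.sum()  # 200 + 300 = 500
balancedness = (avg_tokens / max_tokens).item()           # 0.8
assert abs(balancedness - 0.8) < 1e-6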
+    def pre_calculate(self,
+                      model: MixtureOfExperts,
+                      is_profile: bool = False) -> None:
+        """
+        Compute a new expert placement according to the current load.
+        """
+
+        ep_group = get_ep_group().device_group
+        ep_rank = ep_group.rank()
+
+        is_main_rank = ep_rank == 0
+        if is_main_rank:
+            logger.info("Rearranging experts %s...",
+                        "(profile)" if is_profile else "")
+
+        # This mapping is only used here, so we do not store it in the state
+        physical_expert_start = ep_rank * model.num_local_physical_experts
+        physical_expert_end = (physical_expert_start +
+                               model.num_local_physical_experts)
+        # (num_moe_layers, num_local_physical_experts)
+        local_physical_to_logical_map = self.physical_to_logical_map[
+            :,
+            physical_expert_start:physical_expert_end,
+        ]
+
+        # Map the local physical expert load to global logical experts
+        logical_expert_load_window = torch.zeros(
+            self.expert_load_window_size,
+            model.num_moe_layers,
+            model.num_logical_experts,
+            dtype=self.expert_load_window.dtype,
+            device=self.expert_load_window.device,
+        )
+        logical_expert_load_window.scatter_add_(
+            dim=-1,
+            index=local_physical_to_logical_map.unsqueeze(0).expand_as(
+                self.expert_load_window).long(),
+            src=self.expert_load_window,
+        )
+
+        # Perform all-reduce to get the expert load across all ranks
+        global_expert_load_window = logical_expert_load_window.sum(dim=0)
+        all_reduce(global_expert_load_window, group=ep_group)
+
+        # TODO(bowen): Treat differently for prefill and decode nodes
+        num_replicas = model.num_physical_experts
+        num_groups = model.num_expert_groups
+        num_nodes = get_node_count()
+        num_devices = ep_group.size()
+
+        if num_devices % num_nodes != 0:
+            logger.warning_once(
+                "num_devices % num_nodes != 0, "
+                "not using hierarchical rearrangement algorithm.\n"
+                f"{num_devices=}, {num_nodes=}")
+
+        # Get new expert mappings
+        (
+            self.new_physical_to_logical_map,
+            self.new_logical_to_physical_map,
+            self.new_logical_replica_count,
+        ) = (rebalance_experts(
+            global_expert_load_window,
+            num_replicas,
+            num_groups,
+            num_nodes,
+            num_devices,
+        ))
+
+        # At this point the optimal placement has been computed; it is saved
+        # on `self` because both copy phases (buffer fill and move to
+        # workspace) need it. This method runs on the rearrangement interval
+        # (e.g. every few thousand steps) and only computes and stores the
+        # intermediate state; the actual weight movement is performed
+        # asynchronously by the background worker.
+        self.ep_group = ep_group
+        # Signal the background worker that a new placement is pending.
+        self.cp = True
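The `scatter_add_` above simply accumulates each local physical expert's load onto its logical expert; a single-layer sketch:

import torch

# One layer, 3 local physical experts that map to logical experts [5, 2, 5].
local_physical_to_logical = torch.tensor([5, 2, 5])
local_load = torch.tensor([10, 7, 3], dtype=torch.int32)
logical_load = torch.zeros(8, dtype=torch.int32)
logical_load.scatter_add_(0, local_physical_to_logical, local_load)
# Logical expert 5 accumulates 10 + 3, logical expert 2 gets 7.
assert logical_load[5] == 13 and logical_load[2] == 7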
+    def _async_loop(self, model: MixtureOfExperts) -> threading.Thread:
+        """Start a daemon thread running an asyncio loop that stages expert
+        weights in the background, so the copies can overlap with forward
+        passes."""
+        experts_stream = torch.cuda.Stream()
+
+        def thread_target():
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            try:
+                # Keep running the staging task until the stop event is set.
+                loop.run_until_complete(
+                    self._run_periodically(model, experts_stream))
+            except Exception:
+                logger.exception("EPLB async loop failed")
+            finally:
+                loop.close()
+
+        thread = threading.Thread(target=thread_target, daemon=True)
+        thread.start()
+        return thread
+
+    async def _run_periodically(self, model: MixtureOfExperts,
+                                stream: torch.cuda.Stream) -> None:
+        """
+        Runs on the worker thread: whenever a new placement is pending and
+        the buffer is free, stage the next layer's weights into the buffer.
+        The exact termination condition is still to be decided.
+        """
+        while not _stop_event.is_set():
+            if not self.ep_buffer_ready and self.cp:
+                (self.is_unchanged, self.is_received_locally,
+                 self.experts_recv_loc) = await eplb_worker(
+                     self.physical_to_logical_map,
+                     self.new_physical_to_logical_map,
+                     model.expert_weights,
+                     self.buffer,
+                     is_profile=False,
+                     layer=self.layer,
+                     cuda_stream=stream,
+                 )
+                self.layer += 1
+                if self.layer >= model.num_moe_layers:
+                    # All layers have been staged; stop until the next
+                    # placement is computed. (The commit/termination policy
+                    # is still to be refined.)
+                    self.layer = 0
+                    self.cp = False
+                # Mark the buffer as ready for the forward pass to consume.
+                self.ep_buffer_ready = True
+            else:
+                # Sleep briefly to avoid busy-waiting.
+                await asyncio.sleep(0.1)
+
+    def move_to_workspace(self, model: MixtureOfExperts) -> None:
+        """Move the staged layer from the buffer into the working weights.
+        Called between forward steps once `ep_buffer_ready` is set."""
+        # The worker has already advanced `self.layer`, so the staged layer
+        # is the previous one.
+        layer = (self.layer - 1) % model.num_moe_layers
+        move_from_buffer(
+            expert_weights=model.expert_weights[layer],
+            expert_weights_buffer=self.buffer,
+            ep_rank=self.ep_group.rank(),
+            is_unchanged=self.is_unchanged,
+            is_received_locally=self.is_received_locally,
+            experts_recv_loc=self.experts_recv_loc,
+            new_indices=self.new_physical_to_logical_map[layer].tolist(),
+        )
+        # The buffer can now be reused for the next layer.
+        self.ep_buffer_ready = False
+
+    def post_eplb(self,
+                  model: MixtureOfExperts,
+                  is_profile: bool = False) -> None:
+        """Commit the new expert mappings once all layers have been moved."""
+        if not is_profile:
+            self.physical_to_logical_map.copy_(
+                self.new_physical_to_logical_map)
+            self.logical_to_physical_map.copy_(
+                self.new_logical_to_physical_map)
+            self.logical_replica_count.copy_(self.new_logical_replica_count)
+
+        if self.ep_group is not None and self.ep_group.rank() == 0:
+            logger.info("Rearranged experts%s.",
+                        " (profile)" if is_profile else "")
diff --git a/vllm/distributed/async_eplb/rebalance_algo.py b/vllm/distributed/async_eplb/rebalance_algo.py
new file mode 100644
index 00000000000..7ad6d566b55
--- /dev/null
+++ b/vllm/distributed/async_eplb/rebalance_algo.py
@@ -0,0 +1,233 @@
+# SPDX-License-Identifier: Apache-2.0
+"""
+Expert parallelism load balancer (EPLB) for vLLM.
+
+This module implements the core rearrangement algorithm.
+
+The rearrangement algorithm is adapted from
+[DeepSeek EPLB](https://github.com/deepseek-ai/eplb).
+
+Please find at [#12](https://github.com/deepseek-ai/EPLB/issues/12) an example
+on how the EPLB algorithm works.
+"""
+
+import torch
+
+
+def balanced_packing(weight: torch.Tensor,
+                     num_packs: int) -> tuple[torch.Tensor, torch.Tensor]:
+    """
+    Pack n weighted objects to m packs, such that each bin contains exactly
+    n/m objects and the weights of all packs are as balanced as possible.
+
+    Parameters:
+        weight: [X, n], the weight of each item
+        num_packs: number of packs
+
+    Returns:
+        pack_index: [X, n], the pack index of each item
+        rank_in_pack: [X, n], the rank of the item in the pack
+    """
+    num_layers, num_groups = weight.shape
+    assert num_groups % num_packs == 0
+    groups_per_pack = num_groups // num_packs
+
+    if groups_per_pack == 1:
+        pack_index = torch.arange(weight.size(-1),
+                                  dtype=torch.int64,
+                                  device=weight.device).expand(weight.shape)
+        rank_in_pack = torch.zeros_like(weight, dtype=torch.int64)
+        return pack_index, rank_in_pack
+
+    indices = weight.float().sort(-1, descending=True).indices.cpu()
+    pack_index = torch.full_like(weight,
+                                 fill_value=-1,
+                                 dtype=torch.int64,
+                                 device="cpu")
+    rank_in_pack = torch.full_like(pack_index, fill_value=-1)
+    for i in range(num_layers):
+        pack_weights = [0] * num_packs
+        pack_items = [0] * num_packs
+        for group in indices[i]:
+            pack = min(
+                (i
+                 for i in range(num_packs) if pack_items[i] < groups_per_pack),
+                key=pack_weights.__getitem__,
+            )
+            assert pack_items[pack] < groups_per_pack
+            pack_index[i, group] = pack
+            rank_in_pack[i, group] = pack_items[pack]
+            pack_weights[pack] += weight[i, group]
+            pack_items[pack] += 1
+    return pack_index, rank_in_pack
+
+
+def replicate_experts(
+        weight: torch.Tensor,
+        num_phy: int) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
+    """
+    Replicate `num_log` experts to `num_phy` replicas, such that the maximum
+    load of all replicas is minimized.
+ + Parameters: + weight: [X, num_log] + num_phy: total number of experts after replication + + Returns: + phy2log: [X, num_phy], logical expert id of each physical expert + rank: [X, num_phy], the replica rank + logcnt: [X, num_log], number of replicas for each logical expert + """ + n, num_log = weight.shape + num_redundant = num_phy - num_log + assert num_redundant >= 0 + device = weight.device + phy2log = torch.arange(num_phy, dtype=torch.int64, + device=device).repeat(n, 1) + rank = torch.zeros(n, num_phy, dtype=torch.int64, device=device) + logcnt = torch.ones(n, num_log, dtype=torch.int64, device=device) + arangen = torch.arange(n, dtype=torch.int64, device=device) + for i in range(num_log, num_phy): + redundant_indices = (weight / logcnt).max(dim=-1).indices + phy2log[:, i] = redundant_indices + rank[:, i] = logcnt[arangen, redundant_indices] + logcnt[arangen, redundant_indices] += 1 + return phy2log, rank, logcnt + + +def rebalance_experts_hierarchical( + weight: torch.Tensor, + num_physical_experts: int, + num_groups: int, + num_nodes: int, + num_gpus: int, +): + """ + Parameters: + weight: [num_moe_layers, num_logical_experts] + num_physical_experts: number of physical experts after replication + num_groups: number of expert groups + num_nodes: number of server nodes, where the intra-node network + (e.g, NVLink) is faster + num_gpus: number of GPUs, must be a multiple of `num_nodes` + + Returns: + physical_to_logical_map: [num_moe_layers, num_physical_experts] + logical_to_physical_map: [num_moe_layers, num_logical_experts, X] + logical_count: [num_moe_layers, num_logical_experts] + """ + num_layers, num_logical_experts = weight.shape + assert num_logical_experts % num_groups == 0 + group_size = num_logical_experts // num_groups + assert num_groups % num_nodes == 0 + groups_per_node = num_groups // num_nodes + assert num_gpus % num_nodes == 0 + assert num_physical_experts % num_gpus == 0 + phy_experts_per_gpu = num_physical_experts // num_gpus + + def inverse(perm: torch.Tensor) -> torch.Tensor: + inv = torch.empty_like(perm) + inv.scatter_( + 1, + perm, + torch.arange(perm.size(1), dtype=torch.int64, + device=perm.device).expand(perm.shape), + ) + return inv + + # Step 1: pack groups to nodes + tokens_per_group = weight.unflatten(-1, (num_groups, group_size)).sum(-1) + group_pack_index, group_rank_in_pack = balanced_packing( + tokens_per_group, num_nodes) + log2mlog = (((group_pack_index * groups_per_node + group_rank_in_pack) * + group_size).unsqueeze(-1) + + torch.arange(group_size, + dtype=torch.int64, + device=group_pack_index.device)).flatten(-2) + mlog2log = inverse(log2mlog) + + # Step 2: construct redundant experts within nodes + # [num_layers * num_nodes, num_logical_experts // num_nodes] + tokens_per_mlog = weight.gather(-1, mlog2log).view( + -1, num_logical_experts // num_nodes) + phy2mlog, phyrank, mlogcnt = replicate_experts( + tokens_per_mlog, num_physical_experts // num_nodes) + + # Step 3: pack physical_experts to GPUs + # [num_layers * num_nodes, num_physical_experts // num_nodes] + tokens_per_phy = (tokens_per_mlog / mlogcnt).gather(-1, phy2mlog) + pack_index, rank_in_pack = balanced_packing(tokens_per_phy, + num_gpus // num_nodes) + phy2pphy = pack_index * phy_experts_per_gpu + rank_in_pack + pphy2phy = inverse(phy2pphy) + + pphy2mlog = phy2mlog.gather( + -1, pphy2phy) # [num_layers * num_nodes, num_log_per_nodes] + pphy2mlog = (pphy2mlog.view(num_layers, num_nodes, -1) + torch.arange( + 0, + num_logical_experts, + num_logical_experts // num_nodes, + 
device=group_pack_index.device, + ).view(1, -1, 1)).flatten(-2) + pphy2log = mlog2log.gather(-1, pphy2mlog) + pphyrank = phyrank.gather(-1, pphy2phy).view(num_layers, -1) + logcnt = mlogcnt.view(num_layers, -1).gather(-1, log2mlog) + return pphy2log, pphyrank, logcnt + + +def rebalance_experts( + weight: torch.Tensor, + num_replicas: int, + num_groups: int, + num_nodes: int, + num_gpus: int, +) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """ + Entry point for expert-parallelism load balancer. + + Parameters: + weight: [layers, num_logical_experts], the load statistics for all + logical experts + num_replicas: number of physical experts, must be a multiple of + `num_gpus` + num_groups: number of expert groups + num_nodes: number of server nodes, where the intra-node network + (e.g, NVLink) is faster + num_gpus: number of GPUs, must be a multiple of `num_nodes` + + Returns: + physical_to_logical_map: [layers, num_replicas], the expert index of + each replica + logical_to_physical_map: [layers, num_logical_experts, X], the replica + indices for each expert + expert_count: [layers, num_logical_experts], number of physical + replicas for each logical expert + """ + num_layers, num_logical_experts = weight.shape + weight = weight.float().cpu() + if num_groups % num_nodes == 0: + # use hierarchical load-balance policy + phy2log, phyrank, logcnt = rebalance_experts_hierarchical( + weight, num_replicas, num_groups, num_nodes, num_gpus) + else: + # use global load-balance policy + phy2log, phyrank, logcnt = rebalance_experts_hierarchical( + weight, num_replicas, 1, 1, num_gpus) + num_redundant_experts = num_replicas - num_logical_experts + maxlogcnt = num_redundant_experts + 1 + log2phy: torch.Tensor = torch.full( + (num_layers, num_logical_experts, maxlogcnt), + -1, + dtype=torch.int64, + device=logcnt.device, + ) + log2phy.view(num_layers, -1).scatter_( + -1, + phy2log * maxlogcnt + phyrank, + torch.arange(num_replicas, dtype=torch.int64, + device=log2phy.device).expand(num_layers, -1), + ) + return phy2log, log2phy, logcnt + + +__all__ = ["rebalance_experts"] diff --git a/vllm/distributed/async_eplb/rebalance_execute_async.py b/vllm/distributed/async_eplb/rebalance_execute_async.py new file mode 100644 index 00000000000..03e1571fc55 --- /dev/null +++ b/vllm/distributed/async_eplb/rebalance_execute_async.py @@ -0,0 +1,432 @@ +# SPDX-License-Identifier: Apache-2.0 +""" +The actual execution of the rearrangement. + +This involves the exchange of expert weights between GPUs. +""" + +from collections.abc import Iterable, MutableSequence, Sequence +from functools import partial + +import threading +import torch +from torch.distributed import (P2POp, ProcessGroup, + batch_isend_irecv, barrier) + + +def idx_local_to_global( + local_idx: int, + local_cnt: int, + ep_rank: int, +) -> int: + """ + Convert a local expert index to a global expert index. + """ + return ep_rank * local_cnt + local_idx + + +def idx_global_to_local( + global_idx: int, + local_cnt: int, + ep_rank: int, +) -> int: + """ + Convert a global expert index to a local expert index. + """ + return global_idx - ep_rank * local_cnt + + +def global_idx_to_rank( + global_idx: int, + local_cnt: int, +) -> int: + """ + Convert a global expert index to a rank index. 
+ """ + return global_idx // local_cnt + + +def get_ep_ranks_with_expert( + idx: int, + num_local_experts: int, + old_indices: Sequence[int], + new_indices: Sequence[int], +) -> tuple[MutableSequence[int], MutableSequence[int]]: + """ + Get the ranks of the experts that need to be exchanged. + + Args: + idx: The index of the expert. + num_local_experts: The number of local experts. + old_indices: The old indices of the experts. + new_indices: The new indices of the experts. + + Returns: + A tuple of two lists: + - The ranks of the experts that need to be sent. + - The ranks of the experts that need to be received. + """ + global2rank = partial( + global_idx_to_rank, + local_cnt=num_local_experts, + ) + + ranks_to_send: list[int] = [] + ranks_to_recv: list[int] = [] + + for i, e in enumerate(old_indices): + if e == idx: + rank = global2rank(i) + if not ranks_to_send or ranks_to_send[-1] != rank: + ranks_to_send.append(rank) + + for i, e in enumerate(new_indices): + if e == idx: + rank = global2rank(i) + if not ranks_to_recv or ranks_to_recv[-1] != rank: + ranks_to_recv.append(rank) + + # Remove those ranks that can get this expert locally. + ranks_to_send_set = set(ranks_to_send) + ranks_to_recv_actual = [ + rank for rank in ranks_to_recv if rank not in ranks_to_send_set + ] + + return ranks_to_send, ranks_to_recv_actual + +from typing import Sequence, Iterable, Set +from functools import partial + +import torch +import torch.distributed as dist +import threading +from functools import partial +from typing import Sequence, Iterable, Set, List, Optional + +# 假设 P2POp 和 batch_isend_irecv 已经定义 +class P2POp: + def __init__(self, func, tensor, dst): + self.func = func + self.tensor = tensor + self.dst = dst + +def batch_isend_irecv(ops: List[P2POp]): + reqs = [] + for op in ops: + if op.func == dist.isend: + reqs.append(op.func(op.tensor, op.dst)) + elif op.func == dist.irecv: + reqs.append(op.func(op.tensor, op.dst)) + return reqs + +def get_global_rank(group, rank): + return rank # 简化实现 + +def get_ep_ranks_with_expert(expert, num_local_experts, old_indices, new_indices): + # 简化实现 + return [0, 1], [0, 1] + +def idx_local_to_global(local_idx, local_cnt, ep_rank): + return local_idx + ep_rank * local_cnt + +def barrier(group): + dist.barrier(group=group) + +# 假设 GroupCoordinator 类已定义 +class GroupCoordinator: + def __init__(self, process_group): + self.device_group = process_group + +# 全局变量和获取函数 +_EP: Optional[GroupCoordinator] = None + +def get_ep_group() -> GroupCoordinator: + assert _EP is not None, ("expert parallel group is not initialized") + return _EP + +async def async_move_to_buffer( + layer_idx: int, + num_local_experts: int, + ep_rank: int, + old_indices: Sequence[int], + new_indices: Sequence[int], + expert_weights: Iterable[torch.Tensor], + expert_weights_buffer: Sequence[torch.Tensor], + cuda_stream: Optional[torch.cuda.Stream] = None, +) -> None: + """ + 异步将专家权重搬运到缓冲区,实现: + 1. 已完成发送的专家释放锁 + 2. 

async def async_move_to_buffer(
    num_local_experts: int,
    ep_rank: int,
    old_indices: Sequence[int],
    new_indices: Sequence[int],
    expert_weights: Iterable[torch.Tensor],
    expert_weights_buffer: Sequence[torch.Tensor],
    cuda_stream: Optional[torch.cuda.Stream] = None,
) -> tuple[list[bool], list[bool], dict[int, int]]:
    """
    Asynchronously stage expert weights into the buffer:
    1. Experts whose sends have completed release their locks.
    2. Experts newly written into the buffer take locks that are kept held
       for the later move into the working weights.
    """
    # Get the expert-parallel process group.
    ep_group = get_ep_group().device_group

    # Per-expert locks guarding the buffer slots, plus the set of slots that
    # stay locked after this call (newly received experts).
    expert_locks = [threading.Lock() for _ in range(num_local_experts)]
    locked_experts: set[int] = set()

    local2global = partial(idx_local_to_global,
                           local_cnt=num_local_experts,
                           ep_rank=ep_rank)

    # Experts whose placement did not change.
    is_unchanged = [
        old_indices[local2global(i)] == new_indices[local2global(i)]
        for i in range(num_local_experts)
    ]

    # Copy weights that are already available locally into the buffer.
    is_received_locally = is_unchanged[:]
    for src in range(num_local_experts):
        src_global = local2global(src)
        for dst in range(num_local_experts):
            dst_global = local2global(dst)
            if is_received_locally[dst]:
                continue
            if old_indices[src_global] == new_indices[dst_global]:
                # Lock the destination slot while writing to it.
                expert_locks[dst].acquire()
                try:
                    is_received_locally[dst] = True
                    with torch.cuda.stream(cuda_stream):
                        for weight, buffer in zip(expert_weights,
                                                  expert_weights_buffer):
                            buffer[dst].copy_(weight[src])
                finally:
                    # Local copies do not need to stay locked.
                    expert_locks[dst].release()

    # P2P receive operations, issued in one batch later.
    p2p_ops: list[P2POp] = []

    # Send side: each expert is sent at most once per rank; its lock is
    # released as soon as its sends have completed.
    experts_send_loc: dict[int, int] = {}
    for src in range(num_local_experts):
        expert = old_indices[local2global(src)]
        if expert in experts_send_loc:
            continue
        experts_send_loc[expert] = src

    for expert, src in sorted(experts_send_loc.items()):
        # Hold the lock on the source slot so the data is not modified while
        # it is being sent.
        expert_locks[src].acquire()
        try:
            ranks_to_send, ranks_to_recv = get_ep_ranks_with_expert(
                expert, num_local_experts, old_indices, new_indices)

            # Evenly split the receivers among the senders.
            num_dst_per_sender = len(ranks_to_recv) // len(ranks_to_send)
            sender_pos = ranks_to_send.index(ep_rank)
            recv_begin = sender_pos * num_dst_per_sender
            recv_end = recv_begin + num_dst_per_sender
            recv_ranks = ranks_to_recv[recv_begin:recv_end]

            # Distribute the remaining receivers round-robin.
            remainder_start = len(ranks_to_send) * num_dst_per_sender
            recver_pos = remainder_start + sender_pos
            if recver_pos < len(ranks_to_recv):
                recv_ranks.append(ranks_to_recv[recver_pos])

            # Issue and wait for this expert's sends on the side stream, so
            # that its lock can be released right afterwards.
            send_ops: list[P2POp] = []
            with torch.cuda.stream(cuda_stream):
                for dst in recv_ranks:
                    dst_global = get_global_rank(ep_group, dst)
                    send_ops += [
                        P2POp(torch.distributed.isend, weight[src],
                              dst_global) for weight in expert_weights
                    ]
                if send_ops:
                    reqs = batch_isend_irecv(send_ops)
                    # Waiting here synchronizes only the side stream, not the
                    # main compute stream.
                    for req in reqs:
                        req.wait()
        finally:
            # The send has completed; the source slot no longer needs a lock.
            expert_locks[src].release()

    # Receive side: experts that must come from other ranks. Their buffer
    # slots are locked and stay locked until move_from_buffer() consumes
    # them.
    experts_recv_loc: dict[int, int] = {}
    for dst in range(num_local_experts):
        if is_received_locally[dst]:
            continue
        expert = new_indices[local2global(dst)]
        if expert in experts_recv_loc:
            continue
        experts_recv_loc[expert] = dst

    # Temporary tensors that hold the received data before it is committed
    # to the buffer.
    temp_tensors: dict[int, list[torch.Tensor]] = {}
    for expert, dst in sorted(experts_recv_loc.items()):
        ranks_to_send, ranks_to_recv = get_ep_ranks_with_expert(
            expert, num_local_experts, old_indices, new_indices)

        # Pick the sender assigned to this receiver.
        num_dst_per_sender = len(ranks_to_recv) // len(ranks_to_send)
        recver_pos = ranks_to_recv.index(ep_rank)
        remainder_start = len(ranks_to_send) * num_dst_per_sender
        if recver_pos < remainder_start:
            src = ranks_to_send[recver_pos // num_dst_per_sender]
        else:
            src = ranks_to_send[recver_pos - remainder_start]
        src_global = get_global_rank(ep_group, src)

        # One temporary tensor per weight matrix of this expert.
        temp_tensors[dst] = [
            torch.empty_like(buffer[dst]) for buffer in expert_weights_buffer
        ]
        with torch.cuda.stream(cuda_stream):
            p2p_ops += [
                P2POp(torch.distributed.irecv, temp, src_global)
                for temp in temp_tensors[dst]
            ]

    # Issue all receives in one batch on the side stream.
    if p2p_ops:
        with torch.cuda.stream(cuda_stream):
            reqs = batch_isend_irecv(p2p_ops)
            # Waiting here synchronizes only the side stream.
            for req in reqs:
                req.wait()

    # Commit the received data into the buffer. The slots of newly received
    # experts are locked and intentionally kept locked until
    # move_from_buffer() consumes them.
    for dst, temps in temp_tensors.items():
        expert_locks[dst].acquire()
        locked_experts.add(dst)
        try:
            with torch.cuda.stream(cuda_stream):
                for buffer, temp in zip(expert_weights_buffer, temps):
                    buffer[dst].copy_(temp)
        except Exception:
            # Release the lock on failure to avoid a deadlock, then re-raise.
            expert_locks[dst].release()
            locked_experts.discard(dst)
            raise

    # Make sure every rank has finished staging before returning.
    with torch.cuda.stream(cuda_stream):
        barrier(group=ep_group)

    # Wait for all work queued on the side stream.
    if cuda_stream is not None:
        cuda_stream.synchronize()

    # The buffer is now ready (its freshly received slots are still locked).
    return is_unchanged, is_received_locally, experts_recv_loc
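The flag-based handshake that `EplbState` builds around this function can be seen in isolation in the following stand-alone sketch, which uses a `threading.Event` in place of `ep_buffer_ready`:

import threading
import time

# Toy model of the buffer handshake: the producer stages one "layer" at a
# time, the consumer moves it into the workspace between forward steps.
buffer_ready = threading.Event()
staged = []

def producer(num_layers: int) -> None:
    for layer in range(num_layers):
        while buffer_ready.is_set():   # wait until the last layer is consumed
            time.sleep(0.01)
        staged.append(layer)           # plays the role of async_move_to_buffer
        buffer_ready.set()

threading.Thread(target=producer, args=(3,), daemon=True).start()
moved = []
while len(moved) < 3:                  # plays the role of the forward steps
    if buffer_ready.is_set():
        moved.append(staged[-1])       # plays the role of move_from_buffer
        buffer_ready.clear()
    time.sleep(0.01)
assert moved == [0, 1, 2]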

def move_from_buffer(
    expert_weights: Iterable[torch.Tensor],
    expert_weights_buffer: Sequence[torch.Tensor],
    ep_rank: int,
    is_unchanged: list[bool],
    is_received_locally: list[bool],
    experts_recv_loc: dict[int, int],
    new_indices: Sequence[int],
) -> None:
    """
    Move the staged expert weights from the buffer into the working weights.
    The caller is responsible for marking the buffer as free afterwards
    (i.e. resetting `ep_buffer_ready`), which allows the next layer to be
    staged.
    """
    num_local_experts = len(is_unchanged)
    local2global = partial(idx_local_to_global,
                           local_cnt=num_local_experts,
                           ep_rank=ep_rank)

    # Copy from the buffer into the working weights.
    # NOTE: `copy_` itself is device-agnostic, but the surrounding stream
    # handling is CUDA-specific and should eventually go through the
    # DeviceBackend abstraction.
    for dst in range(num_local_experts):
        if is_unchanged[dst]:
            continue
        if is_received_locally[dst]:
            for weight, buffer in zip(expert_weights, expert_weights_buffer):
                weight[dst].copy_(buffer[dst])
        else:
            expert = new_indices[local2global(dst)]
            src = experts_recv_loc[expert]
            for weight, buffer in zip(expert_weights, expert_weights_buffer):
                weight[dst].copy_(buffer[src])
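For shape intuition, a self-contained toy of the `expert_weights` / buffer layout that `move_from_buffer` expects (one layer, 2 local experts, a single weight matrix of hidden size 4):

import torch

# One MoE layer with 2 local physical experts and a single weight matrix.
expert_weights = [torch.zeros(2, 4)]
expert_weights_buffer = [torch.arange(8, dtype=torch.float32).view(2, 4)]

# Pretend both local slots changed and were filled locally from the buffer.
move_from_buffer(
    expert_weights=expert_weights,
    expert_weights_buffer=expert_weights_buffer,
    ep_rank=0,
    is_unchanged=[False, False],
    is_received_locally=[True, True],
    experts_recv_loc={},
    new_indices=[1, 0],
)
assert torch.equal(expert_weights[0], expert_weights_buffer[0])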

async def eplb_worker(
    old_global_expert_indices: torch.Tensor,
    new_global_expert_indices: torch.Tensor,
    expert_weights: Sequence[Iterable[torch.Tensor]],
    expert_weights_buffer: Sequence[torch.Tensor],
    is_profile: bool = False,
    layer: int = 0,
    cuda_stream: Optional[torch.cuda.Stream] = None,
) -> Optional[tuple[list[bool], list[bool], dict[int, int]]]:
    """
    Stage the weights of one MoE layer into the buffer according to the new
    expert indices.

    The values of the indices arguments are logical indices of the experts,
    while the positions (keys) are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        expert_weights_buffer: Buffers with the same shapes as one layer's
            expert weights; the staged data is written here.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.
        layer: Index of the MoE layer to stage.
        cuda_stream: Side stream on which the copies are issued.

    The expert-parallel process group is obtained via `get_ep_group()`.
    Returns the `(is_unchanged, is_received_locally, experts_recv_loc)`
    tuple produced by `async_move_to_buffer`, or `None` in profile mode.
    """
    num_moe_layers, num_physical_experts = old_global_expert_indices.shape
    assert len(expert_weights) == num_moe_layers

    ep_group = get_ep_group().device_group
    num_local_physical_experts = next(iter(expert_weights[0])).shape[0]
    assert new_global_expert_indices.shape == (num_moe_layers,
                                               num_physical_experts)

    ep_rank = ep_group.rank()
    ep_size = ep_group.size()
    assert num_physical_experts == ep_size * num_local_physical_experts

    # The buffer holding one layer's expert weights is allocated by the
    # caller (EplbState.build).
    # NOTE: Currently we assume the same weights across different layers
    # have the same shape.

    if is_profile:
        # Maximum send size is to send all local experts to all ranks,
        # so we use a dummy `all_gather` to reserve enough communication
        # buffer.
        for weight, buffer in zip(expert_weights[0], expert_weights_buffer):
            # A `/dev/null`-like buffer to avoid real memory allocation
            dummy_recv_buffer = [buffer for _ in range(ep_size)]
            # NOTE(bowen): Needed this barrier to avoid OOM during actual
            # execution. I'm not very sure why this is needed
            torch.distributed.barrier()
            all_gather(
                dummy_recv_buffer,
                weight,
                group=ep_group,
            )
        return None

    # NOTE(bowen): We need this synchronize to run, but I don't know why.
    # If you figure out the reason, please let me know -- thank you!
    # This is NVIDIA-GPU-specific and should eventually go through the
    # DeviceBackend abstraction.
    torch.cuda.synchronize()
    return await async_move_to_buffer(
        num_local_physical_experts,
        ep_rank,
        old_global_expert_indices[layer].tolist(),
        new_global_expert_indices[layer].tolist(),
        expert_weights[layer],
        expert_weights_buffer,
        cuda_stream=cuda_stream,
    )


__all__ = ["async_move_to_buffer", "move_from_buffer", "eplb_worker"]
diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py
index da772c11155..2bf8e7b9708 100644
--- a/vllm/model_executor/layers/fused_moe/layer.py
+++ b/vllm/model_executor/layers/fused_moe/layer.py
@@ -15,7 +15,7 @@
 from vllm.distributed import (get_dp_group, get_ep_group,
                               get_tensor_model_parallel_world_size,
                               tensor_model_parallel_all_reduce)
-from vllm.distributed.eplb.eplb_state import EplbState
+from vllm.distributed.async_eplb.eplb_state import EplbState
 from vllm.forward_context import ForwardContext, get_forward_context
 from vllm.logger import init_logger
 from vllm.model_executor.custom_op import CustomOp
diff --git a/vllm/v1/worker/gpu_model_runner.py b/vllm/v1/worker/gpu_model_runner.py
index 734df82589a..22217f49438 100644
--- a/vllm/v1/worker/gpu_model_runner.py
+++ b/vllm/v1/worker/gpu_model_runner.py
@@ -20,7 +20,7 @@
 from vllm.compilation.counter import compilation_counter
 from vllm.config import (CompilationLevel, VllmConfig,
                          get_layers_from_vllm_config, update_config)
-from vllm.distributed.eplb.eplb_state import EplbState
+from vllm.distributed.async_eplb.eplb_state import EplbState
 from vllm.distributed.kv_transfer import (get_kv_transfer_group,
                                           has_kv_transfer_group)
 from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorBase_V1