
Commit 969751a

lt committed: merge from remote default branch and fix conflict
2 parents 9d9c93a + 1b7b87b, commit 969751a

File tree: 5 files changed, 113 additions and 61 deletions


vllm_ascend/eplb/core/policy/dynamic_ep_v2.py

Lines changed: 56 additions & 49 deletions
@@ -383,7 +383,30 @@ def non_redundant_expert_information(origin_deployment, updated_weights, num_rad
         return device_assignments, device_weights, device_loads, device_counts
 
     @staticmethod
-    def distribute_redun_experts(device_assignments, device_weights, device_loads, device_counts, redundant_expert_list,
+    def recomputing_weight(layer_workloads, device_assignments, device_weights, device_loads):
+        # Count how many times each expert appears
+        num_all_experts = [0] * len(layer_workloads)
+        num_devices = len(device_assignments)
+        for device_id in range(num_devices):
+            num_expert_per_npu = len(device_assignments[device_id])
+            for idx in range(num_expert_per_npu):
+                num_all_experts[idx] += device_assignments[device_id][idx]
+
+        for device_id in range(num_devices):
+            num_expert_per_npu = len(device_weights[device_id])
+            total_weight = 0.0
+            for idx in range(num_expert_per_npu):
+                expert_id = device_assignments[device_id][idx]
+                if num_all_experts[expert_id] == 0:
+                    print("Error: Division by zero")
+                device_weights[device_id][idx] = layer_workloads[expert_id] / num_all_experts[expert_id]
+                total_weight += device_weights[device_id][idx]
+            device_loads[device_id] = total_weight
+
+        return device_weights, device_loads
+
+    @staticmethod
+    def distribute_redun_experts(self, layer_workloads, device_assignments, device_weights, device_loads, device_counts, redundant_expert_list,
                                  items_per_device, expert_form_device, num_experts):
 
         num_devices = len(device_assignments)
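
Note: the new recomputing_weight helper redistributes load once redundant experts are placed, splitting each expert's total workload across its replicas and re-summing per-device loads. A minimal standalone sketch of that intent (hypothetical names, not the committed method; replicas are counted per expert id):

    # Hypothetical sketch: split each expert's total workload evenly across
    # its replicas, then refresh each device's load sum.
    def recompute_weights(layer_workloads, device_assignments):
        replica_counts = [0] * len(layer_workloads)
        for experts in device_assignments:
            for expert_id in experts:
                replica_counts[expert_id] += 1  # one replica per occurrence

        device_weights, device_loads = [], []
        for experts in device_assignments:
            # max(..., 1) guards the division when an expert appears nowhere
            weights = [layer_workloads[e] / max(replica_counts[e], 1) for e in experts]
            device_weights.append(weights)
            device_loads.append(sum(weights))
        return device_weights, device_loads
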
@@ -411,18 +434,16 @@ def distribute_redun_experts(device_assignments, device_weights, device_loads, d
                 communication_box_index = expert_form_device[expert_id]
                 com_between_devices[candidate][communication_box_index] = expert_id
         # In extreme cases a redundant expert is never packed, leaving an empty slot; fill it with an arbitrary expert (to be optimized)
+        flag = False
         for dev_id in range(num_devices):
             # Check the capacity limit
             if device_counts[dev_id] < items_per_device:
                 # Scan for a suitable expert
                 for expert_id in range(num_experts):
                     if expert_id not in device_assignments[dev_id]:
-                        # Find the corresponding weight
-                        weight = 0
-                        for i in range(num_devices):
-                            for j in range(len(device_assignments[i])):
-                                if expert_id == device_assignments[i][j]:
-                                    weight = device_weights[i][j]
+                        flag = True
+                        # Initialize a weight arbitrarily
+                        weight = 0.0
                         # The weights of cards holding this expert change; to be fixed
                         device_assignments[dev_id].insert(0, expert_id)
                         device_weights[dev_id].insert(0, weight)
@@ -432,12 +453,14 @@ def distribute_redun_experts(device_assignments, device_weights, device_loads, d
                         communication_box_index = expert_form_device[expert_id]
                         com_between_devices[dev_id][communication_box_index] = expert_id
                         break
-        # TODO: regenerate the weights
+
+        if flag:
+            device_weights, device_loads = self.recomputing_weight(layer_workloads, device_assignments, device_weights, device_loads)
 
         return device_assignments, device_weights, device_loads, device_counts, com_between_devices
 
     @staticmethod
-    def redundancy_again(self, origin_weights, num_redundant_experts, origin_deployment, expert_form_device, num_node,
+    def redundancy_again(self, layer_workloads, origin_weights, num_redundant_experts, origin_deployment, expert_form_device, num_node,
                          is_node_redundant):
 
         # Number of experts on each card
@@ -461,6 +484,8 @@ def redundancy_again(self, origin_weights, num_redundant_experts, origin_deploym
 
         # Distribute the newly computed redundant experts
         device_assignments, device_weights, device_loads, device_counts, com_between_devices = self.distribute_redun_experts(
+            self,
+            layer_workloads,
             device_assignments,
             device_weights,
             device_loads,
@@ -554,6 +579,7 @@ def redundant_expert_deployment(self, layer_workloads, original_deployment, expe
 
             cur_device_assignments, cur_device_weights, cur_device_loads, cur_device_counts, cur_com_between_devices = self.redundancy_again(
                 self,
+                layer_workloads,
                 cur_node_weights,
                 per_node_redun_expert_num,
                 cur_original_deployment,
@@ -569,6 +595,7 @@ def redundant_expert_deployment(self, layer_workloads, original_deployment, expe
         else:
             device_assignments, device_weights, device_loads, device_counts, com_between_devices = self.redundancy_again(
                 self,
+                layer_workloads,
                 weights,
                 redundancy_expert_num,
                 original_deployment,
@@ -583,7 +610,7 @@ def redundant_expert_deployment(self, layer_workloads, original_deployment, expe
 
     @staticmethod
     def two_device_exchange_experts(cur_device_result, exchange_device_result, cur_exchanged_expert_id,
-                                    next_exchanged_expert_id, ave_workload, increment, num_redundancy_expert):
+                                    next_exchanged_expert_id, ave_workload, increment, num_redundancy_expert, cur_org_placement, next_org_placement):
 
         cur_device_weight = cur_device_result['expert_weights']
         next_device_weight = exchange_device_result['expert_weights']
@@ -609,7 +636,8 @@ def two_device_exchange_experts(cur_device_result, exchange_device_result, cur_e
                     continue
                 # Swaps are constrained so that experts within a card stay distinct
                 change_flag = True
-                if cur_device_expert_id[index] in next_device_expert_id or next_device_expert_id[next_index] in cur_device_expert_id:
+                if ((cur_device_expert_id[index] in next_device_expert_id or next_device_expert_id[next_index] in cur_device_expert_id) or
+                        (cur_org_placement[0] == next_device_expert_id[next_index] or next_org_placement[0] == cur_device_expert_id[index])):
                     change_flag = False
                 # The chosen experts must not have taken part in a previous swap
                 if (cur_device_expert_id[index] not in cur_exchanged_expert_id) and (
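
Note: the widened condition adds a second guard to two_device_exchange_experts: besides requiring that each candidate expert be absent from the peer card, a swap is now rejected when either candidate matches slot 0 of the other card's original placement (the rows passed in as cur_org_placement / next_org_placement). A hedged restatement of the predicate as a standalone function:

    # Sketch of the tightened swap check; the org placements are rows of the
    # original placement table, whose slot-0 expert is treated as fixed.
    def swap_allowed(cur_expert, next_expert, cur_ids, next_ids,
                     cur_org_placement, next_org_placement):
        if cur_expert in next_ids or next_expert in cur_ids:
            return False  # expert already present on the peer card
        if cur_org_placement[0] == next_expert or next_org_placement[0] == cur_expert:
            return False  # would collide with the peer's original slot-0 expert
        return True
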
@@ -627,8 +655,7 @@ def two_device_exchange_experts(cur_device_result, exchange_device_result, cur_e
 
     @staticmethod
     def expert_exchange_between_devices(self, ave_workload, increment, cur_layer_result, com_between_devices, num_redundancy_expert,
-                                        node_idx=0,
-                                        per_node_device_num=0, is_node_redundant=False):
+                                        org_placement_table, node_idx=0, per_node_device_num=0, is_node_redundant=False):
 
         if is_node_redundant:
             # Pull out the information of the devices within the current node
@@ -677,7 +704,9 @@ def expert_exchange_between_devices(self, ave_workload, increment, cur_layer_res
                     next_exchanged_expert_id,
                     ave_workload,
                     increment,
-                    num_redundancy_expert)
+                    num_redundancy_expert,
+                    org_placement_table[max_weight_device_id],
+                    org_placement_table[min_weight_device_id])
 
             # Perform the swap when qualifying experts exist
             if cur_exchange_index != -1:
@@ -700,7 +729,7 @@ def expert_exchange_between_devices(self, ave_workload, increment, cur_layer_res
 
     @staticmethod
     def exchange_experts(self, layer_result, layer_com_between_devices, num_nodes, device_num, is_node_redundant,
-                         ave_workload, increment, num_redundancy_expert):
+                         ave_workload, increment, num_redundancy_expert, org_placement_table):
 
         global_deployment = []
 
@@ -709,9 +738,9 @@ def exchange_experts(self, layer_result, layer_com_between_devices, num_nodes, d
             for node_idx in range(num_nodes):
                 self.expert_exchange_between_devices(self, ave_workload, increment, layer_result,
                                                      layer_com_between_devices, num_redundancy_expert,
-                                                     node_idx, per_node_device_num, is_node_redundant)
+                                                     org_placement_table, node_idx, per_node_device_num, is_node_redundant)
         else:
-            self.expert_exchange_between_devices(self, ave_workload, increment, layer_result, layer_com_between_devices, num_redundancy_expert)
+            self.expert_exchange_between_devices(self, ave_workload, increment, layer_result, layer_com_between_devices, num_redundancy_expert, org_placement_table)
 
         max_workload = 0
         for box in layer_result:
@@ -734,14 +763,15 @@ def count_elements(self, lst):
         return count
 
     def rebalance_experts(self, current_expert_table, expert_workload):
-
+        # Input: the current expert placement and corresponding load info, shaped [layer_num, num_npus, experts_per_npu]
         info = DynamicTable()
-        info.workload_table = np.array(expert_workload)
-        info.placement_table = np.array(current_expert_table)
+        info.workload_table = expert_workload.numpy()
+        info.placement_table = current_expert_table.numpy()
         layer_num, num_npus, experts_per_npu = info.workload_table.shape
         expert_ids, counts = np.unique(info.placement_table[0], return_counts=True)
         num_redundancy_expert = self.get_redundant_num(num_npus, counts)
         num_original_expert = len(expert_ids)
+        # Convert the load info into a 58 x 256 table
         layer_workloads = self.add_redundant(info.placement_table, info.workload_table, num_original_expert)
         max_heat_per_layer_before = self.calculate_max_heat_per_layer(info.workload_table, layer_num)
         npu_heat_all_origin = sum(max_heat_per_layer_before)
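
Note: rebalance_experts now assumes its inputs arrive as torch tensors and converts them with .numpy() rather than wrapping them in np.array(...). A small sketch of the assumed input contract (shapes follow the 58-layer, 256-expert comment above; all values invented):

    import torch

    # Assumption: both tables are CPU tensors shaped
    # [layer_num, num_npus, experts_per_npu].
    placement = torch.randint(0, 256, (58, 16, 17))
    workload = torch.randint(0, 10_000, (58, 16, 17))
    placement_np = placement.numpy()  # zero-copy view of a CPU tensor
    workload_np = workload.numpy()
    layer_num, num_npus, experts_per_npu = workload_np.shape
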
@@ -764,50 +794,31 @@ def rebalance_experts(self, current_expert_table, expert_workload):
         # Number of experts deployed on each card, with one redundant expert
         global_deployment = [[[] for _ in range(num_npus)] for _ in range(layer_num)]
         # Measure the initial imbalance of the 58 layers after switching datasets
-        layer_initial_imbalance = self.calculate_initial_imbalance(current_expert_table, layer_workloads)
+        layer_initial_imbalance = self.calculate_initial_imbalance(info.placement_table, layer_workloads)
         # Iterate to obtain each layer's placement policy, taking compute balance into account
         max_heat_per_layer_after = np.zeros([layer_num])
         sum_num = 0
         for layer in range(layer_num):
             # Skip adjustment when the imbalance is below a given threshold
             if layer_initial_imbalance[layer] < 1.1:
-                global_deployment[layer] = current_expert_table[layer]
+                global_deployment[layer] = info.placement_table[layer]
                 continue
 
             ave_workload = np.sum(layer_workloads[layer]) / num_npus
-            for device_id, device in enumerate(current_expert_table[layer]):
+            for device_id, device in enumerate(info.placement_table[layer]):
                 for index, expert_id in enumerate(device):
                     if index != 0:
                         expert_from_device[layer][expert_id] = device_id
 
             # Adjust the redundant experts
             result, max_workload, com_between_devices = self.redundant_expert_deployment(self, layer_workloads[layer],
-                                                                                         current_expert_table[layer],
+                                                                                         info.placement_table[layer],
                                                                                          expert_from_device[layer],
                                                                                          num_node, False)
             # Exchange experts
             global_deployment[layer], new_max_workload = self.exchange_experts(self, result, com_between_devices,
                                                                                num_node, num_npus, False, ave_workload,
-                                                                               0.05, num_redundancy_expert)
-
-            # To guarantee there is no expert movement inside a NPU
-            start_physical_idx = 1 if num_redundancy_expert else 0
-            for rank in range(num_npus):
-                physical_expert = start_physical_idx
-                while physical_expert in range(start_physical_idx, experts_per_npu):
-                    # skip the expert which is moved into this rank
-                    if global_deployment[layer][rank][physical_expert] not in current_expert_table[layer, rank, :]:
-                        physical_expert += 1
-                        continue
-
-                    if global_deployment[layer][rank][physical_expert] != current_expert_table[layer][rank][physical_expert]:
-                        right_idx = np.where(current_expert_table[layer][rank] == global_deployment[layer][rank][physical_expert])[0][0]
-                        # exchange expert with the expert on the right physical index
-                        tempt = global_deployment[layer][rank][right_idx]
-                        global_deployment[layer][rank][right_idx] = global_deployment[layer][rank][physical_expert]
-                        global_deployment[layer][rank][physical_expert] = tempt
-                    else:
-                        physical_expert += 1
+                                                                               0.05, num_redundancy_expert, info.placement_table[layer])
 
             for device_id in range(num_npus):
                 com_between_devices[device_id] = {int(key): int(value) for key, value in
@@ -828,8 +839,4 @@ def rebalance_experts(self, current_expert_table, expert_workload):
         if npu_heat_all_after < 0.95 * npu_heat_all_origin:
             change = 1
 
-        return change, per_layer_priority, np.array(global_deployment).tolist()
-
-
-
-
+        return change, per_layer_priority, np.array(global_deployment).tolist()

vllm_ascend/eplb/core/worker/eplb_worker.py

Lines changed: 1 addition & 2 deletions
@@ -62,14 +62,13 @@ def do_update(self):
 
         # Obtain the updated expert table from the load info
         load_info, old_placement = self.global2local(load_info, self.old_expert_maps, self.num_local_experts)
-        self.shared_dict["load_info"] = load_info
         changed, priority, new_placement = self.calculate_rebalance_experts(load_info, old_placement)
 
         if not torch.is_tensor(new_placement):
            new_placement = torch.tensor(new_placement)
         self.check_expert_placement(old_placement, new_placement)
         new_expert_maps = self.local2global(new_placement)
-
+        self.update_expert_map(new_expert_maps)
         logger.debug(f"[EPLB Process new_map differs, performing D2D")
 
         update_info = self.compose_expert_update_info_bipartite(new_expert_maps, self.old_expert_maps)\

vllm_ascend/eplb/eplb_updator.py

Lines changed: 22 additions & 7 deletions
@@ -74,8 +74,6 @@ def init_eplb(self, expert_map_path):
             "moe_load": None,
             # All expert tables [num_layers, world_size, num_experts]
             "expert_maps": None,
-            # Heat/load info [num_layers, world_size, local_num_experts]
-            "load_info": None,
         })
 
         self.eplb = EplbProcess(
@@ -235,11 +233,28 @@ def unpack_update_batch(self, packed_update_info):
         ]
         return recovered
 
-    def get_expert_load(self) -> torch.Tensor:
-        load_info = self.shared_dict["load_info"]  # Tensor [L, W, local_experts_num]
-        logger.info(f"lt -- load_info {load_info=}...")
-        return load_info
-
+    def get_expert_load(self):
+        expert_maps = self.shared_dict["expert_maps"]
+        moe_load = self.shared_dict["moe_load"]  # Tensor [L, W, global_experts_num]
+        num_local_experts = expert_maps.max() + 1
+        load_info, _ = ExpertMapUtils.global2local_load(moe_load, expert_maps, num_local_experts)
+
+        L, W, _ = load_info.shape
+
+        expert_load: Dict[str, List[dict]] = {}
+        for c in range(W):
+            layers: List[dict] = []
+            for l in range(L):
+                counts_1d = load_info[l, c]
+
+                layer_val = {
+                    f"expert_{e}": int(v)
+                    for e, v in enumerate(counts_1d.tolist())
+                }
+                layers.append({f"layer_{l}": layer_val})
+            expert_load[f"card_{c}"] = layers
+
+        return {"expert_load": expert_load}
 
     def update_expert_load_statistical_period(self, num_expert_load_gather: int, num_iterations: int):
         logger.info(f" start update {self.num_expert_load_gather=}, {self.num_iterations}...")
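
Note: get_expert_load no longer returns the raw shared load_info tensor; it derives per-card local counts from moe_load and expert_maps via ExpertMapUtils.global2local_load and nests them by card and layer. An illustrative picture of the returned structure (values invented):

    # Shape of the dict get_expert_load builds, with made-up counts.
    example = {
        "expert_load": {
            "card_0": [
                {"layer_0": {"expert_0": 123, "expert_1": 40}},
                {"layer_1": {"expert_0": 7, "expert_1": 0}},
            ],
            "card_1": [
                {"layer_0": {"expert_0": 55, "expert_1": 12}},
                {"layer_1": {"expert_0": 3, "expert_1": 9}},
            ],
        }
    }
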

vllm_ascend/eplb/tool/eplb_utils.py

Lines changed: 30 additions & 0 deletions
@@ -83,3 +83,33 @@ def global2local(cls,
         pt_local[g_idx, slot_idx] = k_idx
 
         return pt_local
+
+    @classmethod
+    def global2local_load(self,
+                          workload: torch.Tensor,
+                          placement: torch.Tensor,
+                          E_local: int
+                          ) -> tuple[torch.Tensor, torch.Tensor]:
+
+        L, G, _ = placement.shape
+        device = placement.device
+
+        wt_local = torch.full((L, G, E_local),
+                              fill_value=-1,
+                              dtype=workload.dtype,
+                              device=device)
+        pt_local = torch.full((L, G, E_local),
+                              fill_value=-1,
+                              dtype=torch.long,
+                              device=device)
+
+        valid = placement >= 0
+        l_idx, g_idx, k_idx = valid.nonzero(as_tuple=True)
+
+        slot_idx = placement[l_idx, g_idx, k_idx]
+        values = workload[l_idx, g_idx, k_idx]
+
+        wt_local[l_idx, g_idx, slot_idx] = values
+        pt_local[l_idx, g_idx, slot_idx] = k_idx
+
+        return wt_local, pt_local
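
Note: global2local_load scatters a global [layers, cards, global_experts] workload table into per-card local slots, returning the per-slot workloads plus the global expert id occupying each slot (-1 marks empty slots). A tiny worked example (values invented; one layer, two cards, two local slots each):

    import torch
    # Module path inferred from the file location.
    from vllm_ascend.eplb.tool.eplb_utils import ExpertMapUtils

    # placement[l, g, e] = local slot of global expert e on card g, or -1.
    placement = torch.tensor([[[0, 1, -1, -1],
                               [-1, -1, 0, 1]]])
    workload = torch.tensor([[[10, 20, 0, 0],
                              [0, 0, 30, 40]]])
    wt, pt = ExpertMapUtils.global2local_load(workload, placement, 2)
    # wt[0, 0] -> tensor([10, 20]); wt[0, 1] -> tensor([30, 40])
    # pt[0, 0] -> tensor([0, 1]);   pt[0, 1] -> tensor([2, 3])
    # (pt records which global expert landed in each local slot)
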

vllm_ascend/worker/model_runner_v1.py

Lines changed: 4 additions & 3 deletions
@@ -1231,15 +1231,16 @@ def execute_model(
         intermediate_tensors: Optional[IntermediateTensors] = None,
     ) -> Union[ModelRunnerOutput, torch.Tensor]:
 
-        if self.dynamic_eplb:
-            self.eplb_updator.forward_before()
-
         with ProfileExecuteDuration().capture_async(
                 "prepare input and forward"):
             self._update_states(scheduler_output)
             if not scheduler_output.total_num_scheduled_tokens:
                 # Return empty ModelRunnerOuptut if there's no work to do.
                 return EMPTY_MODEL_RUNNER_OUTPUT
+
+            if self.dynamic_eplb:
+                self.eplb_updator.forward_before()
+
             (attn_metadata, hidden_states, spec_decode_metadata, positions,
              num_scheduled_tokens,
              sample_indices) = (self._process_reqs(scheduler_output,
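
Note: moving the dynamic-EPLB hook below the empty-batch early return means eplb_updator.forward_before() now runs only for steps that actually execute the model. A minimal distillation of the reordered control flow (hypothetical free function, not the runner's API):

    def execute_step(num_scheduled_tokens, dynamic_eplb, forward_before, run_model):
        if num_scheduled_tokens == 0:
            return None  # empty batch: the EPLB hook is skipped entirely
        if dynamic_eplb:
            forward_before()  # pre-forward EPLB bookkeeping, only for real work
        return run_model()
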
