@@ -22,6 +22,7 @@
 from vllm_ascend.eplb.core.worker.eplb_worker import EplbProcess
 from vllm_ascend.eplb.core.loader.device_transfer_loader import D2DExpertWeightLoader
 
+
 class EplbUpdator:
 
     def __init__(self, redundant_enable):
@@ -34,7 +35,7 @@ def set_adaptor(self, adaptor):
 
     def init_eplb(self, redundant_enable):
 
-        self.redundant_enable  = redundant_enable
+        self.redundant_enable = redundant_enable
         self.num_iterations: torch.int64 = 130
 
         self.weight_update_counter = 0
@@ -63,18 +64,25 @@ def init_eplb(self, redundant_enable):
         })
 
         self.eplb = EplbProcess(
-            shared_dict = self.shared_dict,
-            planner_q = self.planner_block_queue,
-            block_update_q = self.block_update_queue,
-            redundant_enable = self.redundant_enable,
-            policy_type = 2,
-            enable_d2d = True
+            shared_dict=self.shared_dict,
+            planner_q=self.planner_block_queue,
+            block_update_q=self.block_update_queue,
+            redundant_enable=self.redundant_enable,
+            policy_type=2,
+            enable_d2d=True
         )
 
         self.eplb_process = self.eplb._launch_process()
 
+        # TODO: add EPLB cycle statistics
+
+
         logger.info(f"[ModelRunner] Launched EPLB process (pid={self.eplb_process.pid})")
 
+    def get_expert_load(self) -> str:
+        """TODO: confirm what type the moe_load value is."""
+        # return '{"a":"b"}'  # mock
+        return self.shared_dict['moe_load']
 
     def get_update_iteration(self):
         self.cur_iterations = self.cur_iterations + 1
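The new `get_expert_load()` accessor simply returns whatever the EPLB worker process last published under `shared_dict['moe_load']`. Below is a minimal sketch of that producer/consumer handshake, assuming `shared_dict` is a `multiprocessing.Manager().dict()` shared with the worker; the tensor shape and the worker-side write are illustrative, not taken from this patch.

```python
import torch
from multiprocessing import Manager

if __name__ == "__main__":
    # Assumption: updator and EPLB worker share a Manager-backed dict, so values
    # written by the worker process become visible to get_expert_load().
    manager = Manager()
    shared_dict = manager.dict()

    # Worker side (illustrative): publish the latest per-layer, per-expert load counters.
    shared_dict['moe_load'] = torch.zeros(2, 8, dtype=torch.int64)  # [num_moe_layers, num_experts]

    # Updator side: get_expert_load() returns this entry as-is, which is why the
    # TODO above asks what type callers should expect (tensor vs. serialized string).
    moe_load = shared_dict['moe_load']
    print(type(moe_load), tuple(moe_load.shape))
```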
@@ -101,14 +109,16 @@ def forward_before(self):
             self.weight_loading = True
 
         if self.update_in_flight and self.weight_loading and self.weight_update_counter < self.num_moe_layers:
-            (expert_send_info, expert_recv_info, updated_expert_map, log2phy_map, layer_id) = self.update_info_all.pop(0)
+            (expert_send_info, expert_recv_info, updated_expert_map, log2phy_map, layer_id) = self.update_info_all.pop(
+                0)
             rank_id = torch.distributed.get_rank()
             self.eplb_loader.set_log2phy_map(log2phy_map)
             expert_send_info_this_rank = expert_send_info[rank_id] if rank_id in expert_send_info else []
             expert_recv_info_this_rank = expert_recv_info[rank_id] if rank_id in expert_recv_info else []
-            #logger.info(f"check update info, layer = {layer_id}, send = {expert_send_info_this_rank}, recv = {expert_recv_info_this_rank}")
+            # logger.info(f"check update info, layer = {layer_id}, send = {expert_send_info_this_rank}, recv = {expert_recv_info_this_rank}")
             self.eplb_loader.generate_expert_d2d_transfer_task(expert_send_info_this_rank,
-                expert_recv_info_this_rank, updated_expert_map[rank_id], layer_id + 3)
+                                                               expert_recv_info_this_rank, updated_expert_map[rank_id],
+                                                               layer_id + 3)
             self.weight_update_counter += 1
             if self.weight_update_counter == self.num_moe_layers:
                 self.weight_update_counter = 0
@@ -177,7 +187,7 @@ def warm_up_eplb(self):
                 continue
             comm_op_list.append(
                 dist.P2POp(dist.irecv, src_tensor, src_rank)
-                )
+            )
         if comm_op_list:
             reqs = dist.batch_isend_irecv(comm_op_list)
 
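The hunk above only re-indents a closing parenthesis, but it sits inside the batched point-to-point warm-up: `warm_up_eplb()` collects `dist.P2POp` entries and flushes them with `dist.batch_isend_irecv`. Here is a reduced, self-contained sketch of that pattern, assuming an already-initialized default process group; the payload size and ring-neighbour peer choice are illustrative only.

```python
import torch
import torch.distributed as dist

def warm_up_p2p(rank: int, world_size: int) -> None:
    """Queue one send and one receive with ring neighbours, then flush them as a batch."""
    src_tensor = torch.zeros(4)                    # payload size is illustrative
    recv_tensor = torch.empty_like(src_tensor)
    send_peer = (rank + 1) % world_size            # next rank in the ring
    recv_peer = (rank - 1) % world_size            # previous rank in the ring

    comm_op_list = [
        dist.P2POp(dist.isend, src_tensor, send_peer),
        dist.P2POp(dist.irecv, recv_tensor, recv_peer),
    ]
    if comm_op_list:
        reqs = dist.batch_isend_irecv(comm_op_list)
        for req in reqs:
            req.wait()                             # block until the batched transfers complete
```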