14
14
# limitations under the License.
15
15
# This file is a part of the vllm-ascend project.
16
16
#
17
+
17
18
import torch
19
+ from typing import Dict , List
18
20
import torch .distributed as dist
19
21
import vllm .envs as envs
20
22
from multiprocessing import Queue , Manager
21
23
22
24
from vllm .logger import logger
23
25
from vllm_ascend .eplb .core .worker .eplb_worker import EplbProcess
24
26
from vllm_ascend .eplb .core .loader .device_transfer_loader import D2DExpertWeightLoader
27
+ from vllm_ascend .eplb .tool .eplb_utils import ExpertMapUtils
25
28
26
29
class EplbUpdator :
27
30
@@ -32,6 +35,7 @@ def set_adaptor(self, adaptor):
32
35
self .adaptor = adaptor
33
36
self .eplb_loader = D2DExpertWeightLoader (eplb_adaptor = self .adaptor )
34
37
self .num_moe_layers = self .adaptor .num_moe_layers
38
+ self .global_expert_num = self .adaptor .global_expert_num
35
39
36
40
def init_eplb (self , expert_map_path ):
37
41
self .num_expert_load_gather = 10
@@ -57,7 +61,7 @@ def init_eplb(self, expert_map_path):
57
61
self .cur_iterations : torch .int64 = 0
58
62
59
63
self .wait_worker_iterations : torch .int64 = 0
60
- self .num_wait_worker_iterations : torch .int64 = 10
64
+ self .num_wait_worker_iterations : torch .int64 = 20
61
65
62
66
self .planner_block_queue = Queue ()
63
67
self .block_update_queue = Queue (maxsize = 1 )
@@ -69,7 +73,9 @@ def init_eplb(self, expert_map_path):
69
73
# 热度负载信息 [num_layers, world_size, num_experts]
70
74
"moe_load" : None ,
71
75
# 所有的专家表[num_layers, world_size, num_experts]
72
- "expert_maps" : None
76
+ "expert_maps" : None ,
77
+ # 热度负载信息 [num_layers, world_size, local_num_experts]
78
+ "load_info" : None ,
73
79
})
74
80
75
81
self .eplb = EplbProcess (
@@ -125,30 +131,31 @@ def forward_before(self):
125
131
self .weight_update_counter = 0
126
132
self .update_in_flight = False
127
133
self .update_info_all = []
128
-
129
134
# set asynchronous stream for d2d expert weight update
130
135
self .reqs = []
131
136
self .eplb_loader .asyn_expert_weight_transfer (self .reqs )
132
137
138
+
133
139
def forward_end(self, dummy_run=False):
    """Hook run at the end of a forward pass: collect expert-routing stats
    and apply any finished asynchronous expert-weight updates.

    Args:
        dummy_run: forwarded to the adaptor's top-k id collection; presumably
            marks profiling/warm-up passes — TODO confirm with the adaptor.

    Side effects:
        May set ``update_in_flight``, ``wait_worker_iterations`` and
        ``weight_loading``; always asks the loader to apply pending
        expert map/weight updates for ``self.reqs``.
    """
    self.adaptor.collect_topk_ids(dummy_run)
    if not self.update_in_flight:
        load_gather_iteration, update_iteration = self.get_update_iteration()
        if load_gather_iteration:
            # Called for its side effect; the original bound the result to an
            # unused local (`moe_load`), which is dropped here.
            self.compute_and_set_moe_load()
        if update_iteration:
            self.wakeup_eplb_worker()
            self.update_in_flight = True
            self.wait_worker_iterations = 0
            self.weight_loading = False

    if self.update_in_flight:
        self.wait_worker_iterations += 1

    self.eplb_loader.update_expert_map_and_weight(self.reqs, self.redundant_enable)
def compute_and_set_moe_load (self ,dummy_run = False ):
151
- local_load = self .adaptor .get_rank_expert_workload (self .num_moe_layers ,dummy_run )
157
+ local_load = self .adaptor .get_rank_expert_workload ()
158
+
152
159
self ._gather_buffer = None
153
160
if dist .is_initialized ():
154
161
self .world_size = dist .get_world_size ()
@@ -173,7 +180,7 @@ def compute_and_set_moe_load(self,dummy_run=False):
173
180
def warm_up_eplb (self ):
174
181
175
182
self .get_init_expert_map ()
176
-
183
+ self . adaptor . collect_topk_ids ( dummy_run = False )
177
184
self .compute_and_set_moe_load ()
178
185
179
186
src_tensor = torch .empty ((1 ,), device = self .device )
@@ -228,29 +235,18 @@ def unpack_update_batch(self, packed_update_info):
228
235
]
229
236
return recovered
230
237
231
def get_expert_load(self) -> torch.Tensor:
    """Return the per-rank expert load statistics shared by the EPLB worker.

    Returns:
        The ``load_info`` entry of the shared dict — per the producer's
        comment, a tensor of shape [num_layers, world_size, local_experts_num].
        May be ``None`` before the first gather (the shared dict initializes
        it to ``None``).
    """
    # NOTE(review): dropped the leftover debug log ("lt -- load_info ...")
    # that dumped the whole tensor at INFO level on every call.
    return self.shared_dict["load_info"]
244
def update_expert_load_statistical_period(self, num_expert_load_gather: int, num_iterations: int):
    """Reconfigure the expert-load statistics period at runtime.

    Args:
        num_expert_load_gather: iterations between expert-load gathers.
        num_iterations: length (in iterations) of one statistics period.
    """
    # Lazy %-style args: no string formatting cost when INFO is disabled
    # (the original eagerly formatted f-strings on every call).
    logger.info("start update num_expert_load_gather=%s, num_iterations=%s ...",
                self.num_expert_load_gather, self.num_iterations)
    self.num_expert_load_gather = num_expert_load_gather
    self.num_iterations = num_iterations
    logger.info("update num_expert_load_gather=%s, num_iterations=%s success...",
                self.num_expert_load_gather, self.num_iterations)
253
-
254
250
def shutdown (self ):
255
251
"""
256
252
Clean up the EPLB process.
0 commit comments