From 72aeb696a5e8426b20647679751b6807c40da7b1 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 24 Jun 2025 21:20:46 +0800 Subject: [PATCH 01/15] rm router logits Improve TTOP 3ms Signed-off-by: ttanzhiqiang <389825161@qq.com> --- examples/run_dp_attention_etp16.sh | 1 + vllm_ascend/envs.py | 2 ++ vllm_ascend/models/deepseek_v2.py | 8 ++++++-- vllm_ascend/ops/fused_moe.py | 16 +++++++++++----- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/examples/run_dp_attention_etp16.sh b/examples/run_dp_attention_etp16.sh index b736492411..72885f00d3 100644 --- a/examples/run_dp_attention_etp16.sh +++ b/examples/run_dp_attention_etp16.sh @@ -5,6 +5,7 @@ source /usr/local/Ascend/ascend-toolkit/set_env.sh source /usr/local/Ascend/nnal/atb/set_env.sh export ASCEND_LAUNCH_BLOCKING=0 export VLLM_VERSION=0.9.0 +export VLLM_ASCEND_RM_ROUTER_LOGITS=1 nohup python -m vllm.entrypoints.openai.api_server --model=/mnt/deepseek/DeepSeek-R1-W8A8-VLLM \ --quantization ascend \ diff --git a/vllm_ascend/envs.py b/vllm_ascend/envs.py index 6599241627..2042992788 100644 --- a/vllm_ascend/envs.py +++ b/vllm_ascend/envs.py @@ -121,6 +121,8 @@ # value to False to disable the optimized model. "USE_OPTIMIZED_MODEL": lambda: bool(int(os.getenv('USE_OPTIMIZED_MODEL', '1'))), + # Remove the two communications of get_dp_group().all_gather and change it to one, and do gate after the communication + "VLLM_ASCEND_RM_ROUTER_LOGITS": lambda: int(os.getenv("VLLM_ASCEND_RM_ROUTER_LOGITS", 0)), } # end-env-vars-definition diff --git a/vllm_ascend/models/deepseek_v2.py b/vllm_ascend/models/deepseek_v2.py index e96b2e9847..f415713160 100644 --- a/vllm_ascend/models/deepseek_v2.py +++ b/vllm_ascend/models/deepseek_v2.py @@ -71,7 +71,7 @@ from vllm_ascend.quantization.w8a8_dynamic import AscendW8A8DynamicLinearMethod from vllm_ascend.utils import (dispose_tensor, npu_stream_switch, npu_wait_tensor) - +import vllm_ascend.envs as envs_ascend class CustomDeepseekV2SiluAndMul(SiluAndMul): @@ -288,6 +288,7 @@ def __init__( self.ep_group = get_ep_group() self.params_dtype = torch.get_default_dtype() + self.rm_router_logits = envs_ascend.VLLM_ASCEND_RM_ROUTER_LOGITS def forward( self, @@ -309,7 +310,9 @@ def forward( is_prefill = is_prefill or attn_metadata.with_prefill_across_dp # router_logits: (num_tokens, n_experts) - router_logits, _ = self.gate(hidden_states) + router_logits = None + if not self.rm_router_logits: + router_logits, _ = self.gate(hidden_states) experts_hidden_states = self.experts( hidden_states=hidden_states, @@ -318,6 +321,7 @@ def forward( top_k=CustomDeepseekV2MoE.top_k, enable_force_load_balance=enable_force_load_balance, shared_experts=self.shared_experts, + gate=self.gate ) hidden_states = ( diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index d65f12c93f..76fb263bc4 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1137,6 +1137,7 @@ def __init__( self.activation = activation self.log2phy = None self.global_redundant_expert_num = 0 + self.rm_router_logits = envs_ascend.VLLM_ASCEND_RM_ROUTER_LOGITS ascend_config = get_ascend_config() expert_map_path = ascend_config.expert_map_path @@ -1211,7 +1212,8 @@ def forward(self, is_prefill: bool, enable_force_load_balance: bool = False, top_k: Optional[int] = None, - shared_experts: Optional[Any] = None): + shared_experts: Optional[Any] = None, + gate=None): assert self.quant_method is not None if top_k: @@ -1255,11 +1257,15 @@ def forward(self, hidden_states = nn.functional.pad( 
hidden_states, (0, 0, 0, max_num_tokens_across_dp - num_tokens)) - router_logits = nn.functional.pad( - router_logits, - (0, 0, 0, max_num_tokens_across_dp - num_tokens)) + if not self.rm_router_logits: + router_logits = nn.functional.pad( + router_logits, + (0, 0, 0, max_num_tokens_across_dp - num_tokens)) hidden_states = get_dp_group().all_gather(hidden_states, 0) - router_logits = get_dp_group().all_gather(router_logits, 0) + if self.rm_router_logits: + router_logits, _ = gate(hidden_states) + else: + router_logits = get_dp_group().all_gather(router_logits, 0) # Matrix multiply. e_hidden_states = self.quant_method.apply( From 04ad4c29f1673427abfb8c3680268fe5ec7894b2 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 24 Jun 2025 22:59:56 +0800 Subject: [PATCH 02/15] update Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/models/deepseek_v2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm_ascend/models/deepseek_v2.py b/vllm_ascend/models/deepseek_v2.py index f415713160..47f4afebea 100644 --- a/vllm_ascend/models/deepseek_v2.py +++ b/vllm_ascend/models/deepseek_v2.py @@ -64,6 +64,7 @@ maybe_prefix) from vllm.sequence import IntermediateTensors +import vllm_ascend.envs as envs_ascend from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.distributed.parallel_state import get_ep_group from vllm_ascend.ops.fused_moe import AscendFusedMoE @@ -71,7 +72,7 @@ from vllm_ascend.quantization.w8a8_dynamic import AscendW8A8DynamicLinearMethod from vllm_ascend.utils import (dispose_tensor, npu_stream_switch, npu_wait_tensor) -import vllm_ascend.envs as envs_ascend + class CustomDeepseekV2SiluAndMul(SiluAndMul): @@ -321,8 +322,7 @@ def forward( top_k=CustomDeepseekV2MoE.top_k, enable_force_load_balance=enable_force_load_balance, shared_experts=self.shared_experts, - gate=self.gate - ) + gate=self.gate) hidden_states = ( experts_hidden_states[0] * self.routed_scaling_factor + From f13442ed505470dc7ce262a135a4b0bd19e4cd29 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Wed, 25 Jun 2025 11:48:08 +0800 Subject: [PATCH 03/15] update Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/envs.py | 3 ++- vllm_ascend/ops/fused_moe.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm_ascend/envs.py b/vllm_ascend/envs.py index 2042992788..08a7665047 100644 --- a/vllm_ascend/envs.py +++ b/vllm_ascend/envs.py @@ -122,7 +122,8 @@ "USE_OPTIMIZED_MODEL": lambda: bool(int(os.getenv('USE_OPTIMIZED_MODEL', '1'))), # Remove the two communications of get_dp_group().all_gather and change it to one, and do gate after the communication - "VLLM_ASCEND_RM_ROUTER_LOGITS": lambda: int(os.getenv("VLLM_ASCEND_RM_ROUTER_LOGITS", 0)), + "VLLM_ASCEND_RM_ROUTER_LOGITS": + lambda: int(os.getenv("VLLM_ASCEND_RM_ROUTER_LOGITS", 0)), } # end-env-vars-definition diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 76fb263bc4..5d560b1985 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1260,7 +1260,8 @@ def forward(self, if not self.rm_router_logits: router_logits = nn.functional.pad( router_logits, - (0, 0, 0, max_num_tokens_across_dp - num_tokens)) + (0, 0, 0, + max_num_tokens_across_dp - num_tokens)) hidden_states = get_dp_group().all_gather(hidden_states, 0) if self.rm_router_logits: router_logits, _ = gate(hidden_states) From 86df0a26706194fd3dc18ebfd8e22606cd937a37 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 
Jul 2025 12:22:27 +0800 Subject: [PATCH 04/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- examples/run_dp_attention_etp16.sh | 1 - vllm_ascend/envs.py | 3 --- vllm_ascend/models/deepseek_v2.py | 3 --- vllm_ascend/ops/fused_moe.py | 18 +++++++++++++----- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/examples/run_dp_attention_etp16.sh b/examples/run_dp_attention_etp16.sh index 0826357a68..2713212f77 100644 --- a/examples/run_dp_attention_etp16.sh +++ b/examples/run_dp_attention_etp16.sh @@ -4,7 +4,6 @@ source /usr/local/Ascend/ascend-toolkit/set_env.sh source /usr/local/Ascend/nnal/atb/set_env.sh export ASCEND_LAUNCH_BLOCKING=0 export VLLM_VERSION=0.9.0 -export VLLM_ASCEND_RM_ROUTER_LOGITS=1 nohup python -m vllm.entrypoints.openai.api_server --model=/mnt/deepseek/DeepSeek-R1-W8A8-VLLM \ --quantization ascend \ diff --git a/vllm_ascend/envs.py b/vllm_ascend/envs.py index 13fbc3bf12..7bded5747a 100644 --- a/vllm_ascend/envs.py +++ b/vllm_ascend/envs.py @@ -121,9 +121,6 @@ # value to False to disable the optimized model. "USE_OPTIMIZED_MODEL": lambda: bool(int(os.getenv('USE_OPTIMIZED_MODEL', '1'))), - # Remove the two communications of get_dp_group().all_gather and change it to one, and do gate after the communication - "VLLM_ASCEND_RM_ROUTER_LOGITS": - lambda: int(os.getenv("VLLM_ASCEND_RM_ROUTER_LOGITS", 0)), # The tolerance of the kv cache size, if the difference between the # actual kv cache size and the cached kv cache size is less than this value, # then the cached kv cache size will be used. diff --git a/vllm_ascend/models/deepseek_v2.py b/vllm_ascend/models/deepseek_v2.py index 2840e1dc1c..ad768ebbaa 100644 --- a/vllm_ascend/models/deepseek_v2.py +++ b/vllm_ascend/models/deepseek_v2.py @@ -368,7 +368,6 @@ def __init__( self.ep_group = get_ep_group() self.params_dtype = torch.get_default_dtype() - self.rm_router_logits = envs_ascend.VLLM_ASCEND_RM_ROUTER_LOGITS def forward(self, hidden_states: torch.Tensor, @@ -392,8 +391,6 @@ def forward(self, # router_logits: (num_tokens, n_experts) router_logits = None - if not self.rm_router_logits: - router_logits, _ = self.gate(hidden_states) experts_hidden_states = self.experts( hidden_states=hidden_states, diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index e3d8e1087e..a6b249882e 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1148,7 +1148,6 @@ def __init__( self.activation = activation self.log2phy = None self.global_redundant_expert_num = 0 - self.rm_router_logits = envs_ascend.VLLM_ASCEND_RM_ROUTER_LOGITS ascend_config = get_ascend_config() expert_map_path = ascend_config.expert_map_path @@ -1270,6 +1269,12 @@ def forward(self, if not self.enable_multistream_moe or fused_moe_state != FusedMoEState.MC2: shared_hidden_states = shared_experts(hidden_states) + if fused_moe_state not in [ + FusedMoEState.AllGather, FusedMoEState.AllGatherEP, + FusedMoEState.NaiveMulticast + ]: + router_logits, _ = gate(hidden_states) + tp_size = get_tensor_model_parallel_world_size() if (tp_size > 1 and fused_moe_state not in [ FusedMoEState.AllGather, FusedMoEState.AllGatherEP, @@ -1291,7 +1296,7 @@ def forward(self, router_logits = chunk_router_logits[tp_rank] if self.dp_size > 1: - if fused_moe_state == FusedMoEState.AllGather: + if (fused_moe_state == FusedMoEState.AllGather or fused_moe_state == FusedMoEState.AllGatherEP): # NOTE: When in torchair graph, it has been padded in model_runner_v1 
if not self.torchair_graph_enabled: attn_metadata = get_forward_context().attn_metadata @@ -1302,13 +1307,13 @@ def forward(self, hidden_states, (0, 0, 0, max_num_tokens_across_dp - num_tokens)) - if not self.rm_router_logits: + if not is_deepseek_v3_r1: router_logits = nn.functional.pad( router_logits, (0, 0, 0, max_num_tokens_across_dp - num_tokens)) hidden_states = get_dp_group().all_gather(hidden_states, 0) - if self.rm_router_logits: + if is_deepseek_v3_r1: router_logits, _ = gate(hidden_states) else: router_logits = get_dp_group().all_gather(router_logits, 0) @@ -1318,7 +1323,10 @@ def forward(self, ).dp_metadata.cu_tokens_across_dp_cpu hidden_states = self.naive_multicast(hidden_states, cu_tokens_across_dp_cpu) - router_logits = self.naive_multicast(router_logits, + if is_deepseek_v3_r1: + router_logits, _ = gate(hidden_states) + else: + router_logits = self.naive_multicast(router_logits, cu_tokens_across_dp_cpu) From 2f77bc9e4d6d2fa269394348dd63ab8405c6c125 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 Jul 2025 12:24:18 +0800 Subject: [PATCH 05/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/models/deepseek_v2.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm_ascend/models/deepseek_v2.py b/vllm_ascend/models/deepseek_v2.py index ad768ebbaa..aaf99058d5 100644 --- a/vllm_ascend/models/deepseek_v2.py +++ b/vllm_ascend/models/deepseek_v2.py @@ -69,7 +69,6 @@ make_empty_intermediate_tensors_factory, make_layers, maybe_prefix) from vllm.sequence import IntermediateTensors -import vllm_ascend.envs as envs_ascend from vllm_ascend.ascend_config import get_ascend_config from vllm_ascend.distributed.parallel_state import get_ep_group from vllm_ascend.ops.fused_moe import AscendFusedMoE From 6f18307ba37893a14947eedcc5f8ddafa23b6f95 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 Jul 2025 13:54:21 +0800 Subject: [PATCH 06/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/ops/fused_moe.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index a6b249882e..3a31215bd9 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1273,7 +1273,7 @@ def forward(self, FusedMoEState.AllGather, FusedMoEState.AllGatherEP, FusedMoEState.NaiveMulticast ]: - router_logits, _ = gate(hidden_states) + router_logits, _ = gate(hidden_states) tp_size = get_tensor_model_parallel_world_size() if (tp_size > 1 and fused_moe_state not in [ @@ -1296,7 +1296,8 @@ def forward(self, router_logits = chunk_router_logits[tp_rank] if self.dp_size > 1: - if (fused_moe_state == FusedMoEState.AllGather or fused_moe_state == FusedMoEState.AllGatherEP): + if (fused_moe_state == FusedMoEState.AllGather + or fused_moe_state == FusedMoEState.AllGatherEP): # NOTE: When in torchair graph, it has been padded in model_runner_v1 if not self.torchair_graph_enabled: attn_metadata = get_forward_context().attn_metadata @@ -1311,7 +1312,7 @@ def forward(self, router_logits = nn.functional.pad( router_logits, (0, 0, 0, - max_num_tokens_across_dp - num_tokens)) + max_num_tokens_across_dp - num_tokens)) hidden_states = get_dp_group().all_gather(hidden_states, 0) if is_deepseek_v3_r1: router_logits, _ = gate(hidden_states) @@ -1326,9 +1327,8 @@ def forward(self, if 
is_deepseek_v3_r1: router_logits, _ = gate(hidden_states) else: - router_logits = self.naive_multicast(router_logits, - cu_tokens_across_dp_cpu) - + router_logits = self.naive_multicast( + router_logits, cu_tokens_across_dp_cpu) # Matrix multiply. e_hidden_states = self.quant_method.apply( From cb15e059bb621978af2b74fb5174751ddf4ae26d Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 Jul 2025 15:29:05 +0800 Subject: [PATCH 07/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/ops/fused_moe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 3a31215bd9..92efbddd66 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1269,10 +1269,10 @@ def forward(self, if not self.enable_multistream_moe or fused_moe_state != FusedMoEState.MC2: shared_hidden_states = shared_experts(hidden_states) - if fused_moe_state not in [ + if (fused_moe_state not in [ FusedMoEState.AllGather, FusedMoEState.AllGatherEP, FusedMoEState.NaiveMulticast - ]: + ] or not is_deepseek_v3_r1): router_logits, _ = gate(hidden_states) tp_size = get_tensor_model_parallel_world_size() From d8755c9fb3b96c491fef667daf1196105e81ec81 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 Jul 2025 15:54:24 +0800 Subject: [PATCH 08/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/ops/fused_moe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 92efbddd66..35b4c9a9d5 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1272,7 +1272,7 @@ def forward(self, if (fused_moe_state not in [ FusedMoEState.AllGather, FusedMoEState.AllGatherEP, FusedMoEState.NaiveMulticast - ] or not is_deepseek_v3_r1): + ] or not is_deepseek_v3_r1): router_logits, _ = gate(hidden_states) tp_size = get_tensor_model_parallel_world_size() From e0c36a836707514aa7410204d70f45f14185859f Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 Jul 2025 15:58:51 +0800 Subject: [PATCH 09/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/ops/fused_moe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 35b4c9a9d5..6d466f7afe 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1269,10 +1269,10 @@ def forward(self, if not self.enable_multistream_moe or fused_moe_state != FusedMoEState.MC2: shared_hidden_states = shared_experts(hidden_states) - if (fused_moe_state not in [ + if not (fused_moe_state in [ FusedMoEState.AllGather, FusedMoEState.AllGatherEP, FusedMoEState.NaiveMulticast - ] or not is_deepseek_v3_r1): + ] and is_deepseek_v3_r1): router_logits, _ = gate(hidden_states) tp_size = get_tensor_model_parallel_world_size() From 9e15f42e7a8b204d4544ae2f2b5725baf356aa9d Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 Jul 2025 18:07:54 +0800 Subject: [PATCH 10/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/ops/fused_moe.py | 39 
++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 6d466f7afe..81fd5eb17c 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -1269,12 +1269,6 @@ def forward(self, if not self.enable_multistream_moe or fused_moe_state != FusedMoEState.MC2: shared_hidden_states = shared_experts(hidden_states) - if not (fused_moe_state in [ - FusedMoEState.AllGather, FusedMoEState.AllGatherEP, - FusedMoEState.NaiveMulticast - ] and is_deepseek_v3_r1): - router_logits, _ = gate(hidden_states) - tp_size = get_tensor_model_parallel_world_size() if (tp_size > 1 and fused_moe_state not in [ FusedMoEState.AllGather, FusedMoEState.AllGatherEP, @@ -1296,8 +1290,7 @@ def forward(self, router_logits = chunk_router_logits[tp_rank] if self.dp_size > 1: - if (fused_moe_state == FusedMoEState.AllGather - or fused_moe_state == FusedMoEState.AllGatherEP): + if fused_moe_state == FusedMoEState.AllGather: # NOTE: When in torchair graph, it has been padded in model_runner_v1 if not self.torchair_graph_enabled: attn_metadata = get_forward_context().attn_metadata @@ -1308,27 +1301,29 @@ def forward(self, hidden_states, (0, 0, 0, max_num_tokens_across_dp - num_tokens)) - if not is_deepseek_v3_r1: - router_logits = nn.functional.pad( - router_logits, - (0, 0, 0, - max_num_tokens_across_dp - num_tokens)) + # if not is_deepseek_v3_r1: + # router_logits = nn.functional.pad( + # router_logits, + # (0, 0, 0, + # max_num_tokens_across_dp - num_tokens)) hidden_states = get_dp_group().all_gather(hidden_states, 0) - if is_deepseek_v3_r1: - router_logits, _ = gate(hidden_states) - else: - router_logits = get_dp_group().all_gather(router_logits, 0) + router_logits, _ = gate(hidden_states) + # if is_deepseek_v3_r1: + # router_logits, _ = gate(hidden_states) + # else: + # router_logits = get_dp_group().all_gather(router_logits, 0) elif fused_moe_state == FusedMoEState.NaiveMulticast: cu_tokens_across_dp_cpu = get_forward_context( ).dp_metadata.cu_tokens_across_dp_cpu hidden_states = self.naive_multicast(hidden_states, cu_tokens_across_dp_cpu) - if is_deepseek_v3_r1: - router_logits, _ = gate(hidden_states) - else: - router_logits = self.naive_multicast( - router_logits, cu_tokens_across_dp_cpu) + router_logits, _ = gate(hidden_states) + # if is_deepseek_v3_r1: + # router_logits, _ = gate(hidden_states) + # else: + # router_logits = self.naive_multicast( + # router_logits, cu_tokens_across_dp_cpu) # Matrix multiply. 
e_hidden_states = self.quant_method.apply( From a595a67b199875512ecc8b3a305934ace0c99b93 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Tue, 8 Jul 2025 19:39:39 +0800 Subject: [PATCH 11/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/models/deepseek_v2.py | 3 +++ vllm_ascend/ops/fused_moe.py | 39 +++++++++++++++++-------------- vllm_ascend/utils.py | 16 +++++++++++++ 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/vllm_ascend/models/deepseek_v2.py b/vllm_ascend/models/deepseek_v2.py index aaf99058d5..6a2b337cc1 100644 --- a/vllm_ascend/models/deepseek_v2.py +++ b/vllm_ascend/models/deepseek_v2.py @@ -367,6 +367,7 @@ def __init__( self.ep_group = get_ep_group() self.params_dtype = torch.get_default_dtype() + self.rm_router_logits = self.experts.rm_router_logits def forward(self, hidden_states: torch.Tensor, @@ -390,6 +391,8 @@ def forward(self, # router_logits: (num_tokens, n_experts) router_logits = None + if not self.rm_router_logits: + router_logits, _ = self.gate(hidden_states) experts_hidden_states = self.experts( hidden_states=hidden_states, diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 81fd5eb17c..b34197b284 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -38,8 +38,9 @@ from vllm_ascend.distributed.parallel_state import get_ep_group, get_etp_group from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer from vllm_ascend.utils import (FusedMoEState, dispose_tensor, - get_fused_moe_state, is_310p, npu_stream_switch, - npu_wait_tensor, vllm_version_is) + get_fused_moe_state, get_rm_router_logits_state, + is_310p, npu_stream_switch, npu_wait_tensor, + vllm_version_is) if vllm_version_is("0.9.1"): from vllm.model_executor.layers.fused_moe.layer import \ @@ -1149,6 +1150,10 @@ def __init__( self.log2phy = None self.global_redundant_expert_num = 0 + is_deepseek_v3_r1 = self.global_num_experts == 256 + self.rm_router_logits = get_rm_router_logits_state( + self.moe_parallel_config.ep_size, self.dp_size, is_deepseek_v3_r1) + ascend_config = get_ascend_config() expert_map_path = ascend_config.expert_map_path if expert_map_path and os.path.exists(expert_map_path): @@ -1301,29 +1306,27 @@ def forward(self, hidden_states, (0, 0, 0, max_num_tokens_across_dp - num_tokens)) - # if not is_deepseek_v3_r1: - # router_logits = nn.functional.pad( - # router_logits, - # (0, 0, 0, - # max_num_tokens_across_dp - num_tokens)) + if not self.rm_router_logits: + router_logits = nn.functional.pad( + router_logits, + (0, 0, 0, + max_num_tokens_across_dp - num_tokens)) hidden_states = get_dp_group().all_gather(hidden_states, 0) - router_logits, _ = gate(hidden_states) - # if is_deepseek_v3_r1: - # router_logits, _ = gate(hidden_states) - # else: - # router_logits = get_dp_group().all_gather(router_logits, 0) + if self.rm_router_logits: + router_logits, _ = gate(hidden_states) + else: + router_logits = get_dp_group().all_gather(router_logits, 0) elif fused_moe_state == FusedMoEState.NaiveMulticast: cu_tokens_across_dp_cpu = get_forward_context( ).dp_metadata.cu_tokens_across_dp_cpu hidden_states = self.naive_multicast(hidden_states, cu_tokens_across_dp_cpu) - router_logits, _ = gate(hidden_states) - # if is_deepseek_v3_r1: - # router_logits, _ = gate(hidden_states) - # else: - # router_logits = self.naive_multicast( - # router_logits, cu_tokens_across_dp_cpu) + if self.rm_router_logits: + router_logits, 
_ = gate(hidden_states) + else: + router_logits = self.naive_multicast( + router_logits, cu_tokens_across_dp_cpu) # Matrix multiply. e_hidden_states = self.quant_method.apply( diff --git a/vllm_ascend/utils.py b/vllm_ascend/utils.py index 448bf42f7a..adcb94b591 100644 --- a/vllm_ascend/utils.py +++ b/vllm_ascend/utils.py @@ -425,6 +425,22 @@ class FusedMoEState(Enum): NaiveMulticast = 4 +# TODO(ttanzhiqiang): rm_router_logits +# dp>1 will trigger +# In theory, this solution is only applicable to AllGather and AllGatherEP, because in the dp scenario, the previous operation was gate + two communications, and now it is changed to one communication + gate operation, which can save some communication time. In theory, all moe AllGather and AllGatherEP solutions can follow this logic, but now other moe models (qwen3-235b) dp solutions are not adjusted, so use the switch to control it to prevent code errors. +def get_rm_router_logits_state(ep_size: int, dp_size: int, + is_deepseek_v3_r1: bool): + # the fusion operator torch_npu.npu_grouped_matmul_finalize_routing called by allgather ep + # only supports deepseek v3/r1 + if dp_size > 1: + if (envs.VLLM_ENABLE_FUSED_EXPERTS_ALLGATHER_EP and ep_size > 1 + and is_deepseek_v3_r1): + return True + elif ep_size == 1 and is_deepseek_v3_r1: + return True + return False + + # TODO(zzzzwwjj): add soc_version to choose branch def get_fused_moe_state(ep_size: int, with_prefill: bool, is_deepseek_v3_r1: bool): From fa50f6abdef3a502bef9352eab11004de3989f98 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Wed, 9 Jul 2025 10:57:27 +0800 Subject: [PATCH 12/15] deepseekv3/r1 support rm_router_logits in [AllGatherEP, AllGather, NaiveMulticast] Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/ops/fused_moe.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 139bd14e06..3c5ba21d48 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -45,7 +45,6 @@ get_fused_moe_state, get_rm_router_logits_state, is_310p, npu_stream_switch, npu_wait_tensor) - MOE_ALL2ALL_BUFFER: bool = envs_ascend.MOE_ALL2ALL_BUFFER From e4fc29f5679d9dbf91e2fc493b1f5fe54487aa7d Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Wed, 9 Jul 2025 12:07:57 +0800 Subject: [PATCH 13/15] Empty submission Signed-off-by: ttanzhiqiang <389825161@qq.com> From a0be155df3c560c12d52fee3a353b3b81ca55a76 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Thu, 10 Jul 2025 10:55:26 +0800 Subject: [PATCH 14/15] Empty submission Signed-off-by: ttanzhiqiang <389825161@qq.com> From af900cc02a5b43948891f5fe1ac50a312ed7e767 Mon Sep 17 00:00:00 2001 From: ttanzhiqiang <389825161@qq.com> Date: Thu, 10 Jul 2025 22:22:15 +0800 Subject: [PATCH 15/15] update Signed-off-by: ttanzhiqiang <389825161@qq.com> --- vllm_ascend/ops/fused_moe.py | 5 +++-- vllm_ascend/utils.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/vllm_ascend/ops/fused_moe.py b/vllm_ascend/ops/fused_moe.py index 377f810f4e..9e6ca13cb1 100644 --- a/vllm_ascend/ops/fused_moe.py +++ b/vllm_ascend/ops/fused_moe.py @@ -44,8 +44,9 @@ from vllm_ascend.distributed.parallel_state import get_ep_group, get_etp_group from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer from vllm_ascend.utils import (FusedMoEState, dispose_tensor, - get_all_reduce_merge_state, get_fused_moe_state, get_rm_router_logits_state, - is_310p, npu_stream_switch, npu_wait_tensor) + 
get_all_reduce_merge_state, get_fused_moe_state, + get_rm_router_logits_state, is_310p, + npu_stream_switch, npu_wait_tensor) MOE_ALL2ALL_BUFFER: bool = envs_ascend.MOE_ALL2ALL_BUFFER diff --git a/vllm_ascend/utils.py b/vllm_ascend/utils.py index a9584adfa2..d2e61586c1 100644 --- a/vllm_ascend/utils.py +++ b/vllm_ascend/utils.py @@ -425,7 +425,6 @@ class FusedMoEState(Enum): NaiveMulticast = 4 - # TODO(ttanzhiqiang): rm_router_logits # dp>1 will trigger # In theory, this solution is only applicable to AllGather and AllGatherEP, because in the dp scenario, the previous operation was gate + two communications, and now it is changed to one communication + gate operation, which can save some communication time. In theory, all moe AllGather and AllGatherEP solutions can follow this logic, but now other moe models (qwen3-235b) dp solutions are not adjusted, so use the switch to control it to prevent code errors. @@ -440,7 +439,8 @@ def get_rm_router_logits_state(ep_size: int, dp_size: int, elif ep_size == 1 and is_deepseek_v3_r1: return True return False - + + # TODO(ttanzhiqiang): all_reduce merge # When all_reduce_merge is in progress, shared_experts does not do all_reduce in mlp, but waits until shared_experts+router_experts are completed before doing all_reduce # Currently, all_reduce_merge is enabled by default in the AllGather, AllGatherEP and NaiveMulticast scenarios of the deepseek model. @@ -454,6 +454,7 @@ def get_all_reduce_merge_state(ep_size: int, is_deepseek_v3_r1: bool): return True return False + # TODO(zzzzwwjj): add soc_version to choose branch def get_fused_moe_state(ep_size: int, with_prefill: bool, is_deepseek_v3_r1: bool):
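
Note for reviewers: the communication change this series makes (gate before two all-gathers vs. a single all-gather followed by the gate) can be illustrated with the standalone Python sketch below. It is illustrative only and does not reuse the vLLM-Ascend code: fake_all_gather stands in for get_dp_group().all_gather(..., 0), gate stands in for the MoE gate projection, and the tensor sizes are arbitrary. In the actual patches the one-collective path is taken only when get_rm_router_logits_state() returns True (dp_size > 1, DeepSeek V3/R1, and the AllGather / AllGatherEP / NaiveMulticast fused-MoE states).

    import torch
    import torch.nn as nn

    def fake_all_gather(t: torch.Tensor, world_size: int) -> torch.Tensor:
        # Stand-in for a DP all_gather along dim 0; for the sketch we pretend
        # every rank holds the same local tokens and simply concatenate.
        return torch.cat([t] * world_size, dim=0)

    def routing_two_collectives(h: torch.Tensor, gate: nn.Linear, dp: int):
        # Baseline path: the gate runs on the local tokens, then BOTH the
        # hidden states and the router logits are all-gathered
        # (two collectives per MoE layer).
        logits = gate(h)
        h = fake_all_gather(h, dp)
        logits = fake_all_gather(logits, dp)
        return h, logits

    def routing_one_collective(h: torch.Tensor, gate: nn.Linear, dp: int):
        # rm_router_logits path: only the hidden states are all-gathered
        # (one collective); the gate is applied to the gathered tokens.
        h = fake_all_gather(h, dp)
        logits = gate(h)
        return h, logits

    if __name__ == "__main__":
        torch.manual_seed(0)
        hidden_size, n_experts, dp_size = 16, 8, 4
        gate = nn.Linear(hidden_size, n_experts, bias=False)
        tokens = torch.randn(5, hidden_size)
        h1, l1 = routing_two_collectives(tokens, gate, dp_size)
        h2, l2 = routing_one_collective(tokens, gate, dp_size)
        # Because the gate is a linear map applied row-wise, gathering first
        # and gating afterwards yields the same router logits.
        assert torch.allclose(l1, l2)

The equivalence holds because concatenation along the token dimension commutes with a per-token linear projection; the saving is purely in communication (one collective and no padding of router_logits) plus running the gate once on the gathered batch.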