-
Notifications
You must be signed in to change notification settings - Fork 279
[0.9.1][Feature]Moe alltoallv communication optimization for unquantized RL training sence & alltoallv support dpo #1547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 40 commits
7ff288e
6d7b5b4
6a8e1a9
4805c5a
d68ce07
0aff693
f6ab19e
a94c094
91570d8
e7c0d2d
47439e8
cf3f1c8
a4126f3
807aaf0
6f6efc1
305a0eb
5411ed6
3f88769
854c149
d0bd006
49e9771
a9bccf8
e31a7df
0a22312
ee1dd49
eef1093
eb54e22
b02ad40
66807e0
d24758e
d76c4fb
f883902
d5656f4
df52070
adf3f74
5956ef0
af85566
d4ad734
847d52d
3b7269a
deb4319
8b369df
a8b3e15
d290b7d
2c102d3
565fa2d
f980ad0
969ee25
a70be9a
1f09708
402f889
141407d
b1d7305
62cebe1
80b1d0d
e87df11
267db60
b0572c8
e4f1050
eaed83d
b97baf4
d232d49
1e435e6
8effdd0
a8136b7
d29deae
94b7b5b
c2f670d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. | ||
# Copyright 2023 The vLLM team. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# This file is a part of the vllm-ascend project. | ||
# | ||
""" | ||
Compare the outputs of vLLM with and without aclgraph. | ||
Run `pytest tests/multicard/test_data_parallel.py`. | ||
""" | ||
|
||
import os | ||
import subprocess | ||
import sys | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
|
||
MODELS = ["vllm-ascend/Qwen3-30B-A3B-Puring"] | ||
|
||
|
||
@pytest.mark.parametrize("model", MODELS) | ||
@pytest.mark.parametrize("max_tokens", [32]) | ||
@patch.dict( | ||
os.environ, { | ||
"ASCEND_RT_VISIBLE_DEVICES": "0,1,2,3", | ||
"VLLM_ASCEND_ENABLE_MOE_ALL2ALL_SEQ": "1", | ||
"VLLM_ASCEND_ENABLE_DBO": "1" | ||
}) | ||
def test_qwen3_moe_inference(model, max_tokens): | ||
script = "examples/offline_data_parallel.py" | ||
|
||
env = os.environ.copy() | ||
|
||
cmd = [ | ||
sys.executable, | ||
script, | ||
"--model", | ||
model, | ||
"--dp-size", | ||
"2", | ||
"--tp-size", | ||
"2", | ||
"--node-size", | ||
"1", | ||
"--node-rank", | ||
"0", | ||
"--trust-remote-code", | ||
"--enforce-eager", | ||
] | ||
|
||
print(f"Running subprocess: {' '.join(cmd)}") | ||
proc = subprocess.run(cmd, | ||
env=env, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.STDOUT, | ||
timeout=600) | ||
output = proc.stdout.decode() | ||
|
||
print(output) | ||
|
||
assert "DP rank 0 needs to process" in output | ||
assert "DP rank 1 needs to process" in output | ||
assert "Generated text:" in output | ||
assert proc.returncode == 0 |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -0,0 +1,127 @@ | ||||
# SPDX-License-Identifier: Apache-2.0 | ||||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project | ||||
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. | ||||
import importlib | ||||
from unittest.mock import MagicMock, patch | ||||
|
||||
import pytest | ||||
import torch | ||||
|
||||
from vllm_ascend.distributed.tensor_parallel import ( | ||||
_gather_along_first_dim, _gather_along_last_dim, | ||||
_reduce_scatter_along_first_dim, _reduce_scatter_along_last_dim, | ||||
all_to_all_hp2sp, all_to_all_sp2hp) | ||||
|
||||
|
||||
@pytest.fixture | ||||
def test_tensor(): | ||||
return torch.randn(8, 16) | ||||
|
||||
|
||||
@pytest.fixture | ||||
def test_tensor_last_dim(): | ||||
return torch.randn(8, 16, 32) | ||||
|
||||
|
||||
@pytest.fixture | ||||
def mock_group(): | ||||
return MagicMock() | ||||
|
||||
|
||||
@pytest.fixture(autouse=True) | ||||
def mock_dist(): | ||||
with patch("torch.distributed") as mock: | ||||
mock.get_world_size.return_value = 4 | ||||
mock.get_rank.return_value = 0 | ||||
yield mock | ||||
|
||||
|
||||
class TestDistributedCommunication: | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to inherits from testbase: Line 25 in ee40d3d
to make sure the test will not be impacted in future patch There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ut/base.py is removed in v0.9.1-dev, we inherits from unittest.TestCase instead. |
||||
|
||||
@pytest.mark.parametrize("world_size", [1, 4]) | ||||
def test_gather_along_first_dim(self, test_tensor, mock_group, mock_dist, | ||||
world_size): | ||||
"""test _gather_along_first_dim""" | ||||
mock_dist.get_world_size.return_value = world_size | ||||
|
||||
result = _gather_along_first_dim(test_tensor, mock_group) | ||||
|
||||
if world_size == 1: | ||||
assert torch.equal(result, test_tensor) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use self.assertXXX also same for other changes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||||
else: | ||||
assert result.shape == (32, 16) # 8*4=32 | ||||
|
||||
def test_gather_along_first_dim_unequal_split(self, test_tensor, | ||||
mock_group): | ||||
"""test unequal split""" | ||||
output_split_sizes = [5, 10, 15, 2] | ||||
result = _gather_along_first_dim(test_tensor, mock_group, | ||||
output_split_sizes) | ||||
assert result.shape == (32, 16) # 5+10+15+2=32 | ||||
|
||||
@pytest.mark.parametrize("world_size", [1, 4]) | ||||
def test_gather_along_last_dim(self, test_tensor_last_dim, mock_group, | ||||
mock_dist, world_size): | ||||
"""test _gather_along_last_dim""" | ||||
mock_dist.get_world_size.return_value = world_size | ||||
|
||||
result = _gather_along_last_dim(test_tensor_last_dim, mock_group) | ||||
|
||||
if world_size == 1: | ||||
assert torch.equal(result, test_tensor_last_dim) | ||||
else: | ||||
assert result.shape == (8, 16, 32 * world_size) # 8*4=32 | ||||
|
||||
@pytest.mark.parametrize("input_shape,expected_shape", [ | ||||
((32, 16), (8, 16)), | ||||
((40, 10), (10, 10)), | ||||
]) | ||||
def test_reduce_scatter_along_first_dim(self, mock_group, input_shape, | ||||
expected_shape): | ||||
input_tensor = torch.randn(*input_shape) | ||||
result = _reduce_scatter_along_first_dim(input_tensor, mock_group) | ||||
assert result.shape == expected_shape | ||||
|
||||
def test_reduce_scatter_along_last_dim(self, mock_group): | ||||
input_tensor = torch.randn(8, 16, 32) | ||||
result = _reduce_scatter_along_last_dim(input_tensor, mock_group) | ||||
assert result.shape == (8, 16, 8) # 32/4=8 | ||||
|
||||
@pytest.mark.parametrize("func,input_shape,expected_shape", [ | ||||
("all_gather_last_dim_from_tensor_parallel_region", (8, 16, 32), | ||||
(8, 16, 128)), | ||||
("reduce_scatter_to_sequence_parallel_region", (32, 16), (8, 16)), | ||||
("reduce_scatter_last_dim_to_tensor_parallel_region", (8, 16, 32), | ||||
(8, 16, 8)), | ||||
("gather_from_sequence_parallel_region", (8, 16), (32, 16)), | ||||
]) | ||||
def test_wrapper_functions(self, mock_group, func, input_shape, | ||||
expected_shape): | ||||
"""test wrapper funcs""" | ||||
mod = importlib.import_module( | ||||
'vllm_ascend.distributed.tensor_parallel') | ||||
globals = mod.__dict__ | ||||
test_func = globals[func] | ||||
input_tensor = torch.randn(*input_shape) | ||||
result = test_func(input_tensor, mock_group) | ||||
assert result.shape == expected_shape | ||||
|
||||
@pytest.mark.parametrize( | ||||
"input_shape,output_shape", | ||||
[ | ||||
((8, 16), (32, 4)), # [num_tokens/TP, H] -> [num_tokens, H/TP] | ||||
]) | ||||
def test_all_to_all_sp2hp(self, mock_group, input_shape, output_shape): | ||||
input_tensor = torch.randn(*input_shape) | ||||
result = all_to_all_sp2hp(input_tensor, mock_group) | ||||
assert result.shape == output_shape | ||||
|
||||
@pytest.mark.parametrize( | ||||
"input_shape,output_shape", | ||||
[ | ||||
((32, 4), (8, 16)), # [num_tokens, H/TP] -> [num_tokens/TP, H] | ||||
]) | ||||
def test_all_to_all_hp2sp(self, mock_group, input_shape, output_shape): | ||||
input_tensor = torch.randn(*input_shape) | ||||
result = all_to_all_hp2sp(input_tensor, mock_group) | ||||
assert result.shape == output_shape |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
# SPDX-License-Identifier: Apache-2.0 | ||
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project | ||
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved. | ||
import math | ||
|
||
import pytest | ||
import torch | ||
|
||
from vllm_ascend.ops.moe_dispatcher.moe_utils import ( | ||
get_capacity, group_limited_topk, permute, sort_chunks_by_idxs, | ||
topk_softmax_with_capacity, unpermute) | ||
|
||
import vllm_ascend.patch.worker.patch_common.patch_utils # type: ignore[import] # isort: skip # noqa | ||
|
||
|
||
|
||
class TestMoeUtils: | ||
|
||
@pytest.fixture | ||
def setup(self): | ||
self.num_tokens = 16 | ||
self.num_experts = 4 | ||
self.hidden_size = 8 | ||
self.topk = 2 | ||
self.capacity_factor = 1.0 | ||
self.group_topk = 2 | ||
self.num_groups = 2 | ||
self.scaling_factor = 1.0 | ||
|
||
def test_group_limited_topk(self, setup): | ||
# Test group-limited topk routing | ||
scores = torch.randn(self.num_tokens, self.num_experts) | ||
probs, indices = group_limited_topk(scores, | ||
topk=self.topk, | ||
num_tokens=self.num_tokens, | ||
num_experts=self.num_experts, | ||
num_groups=self.num_groups, | ||
group_topk=self.group_topk) | ||
|
||
assert probs.shape == (self.num_tokens, self.topk) | ||
assert indices.shape == (self.num_tokens, self.topk) | ||
assert torch.all(indices < self.num_experts) | ||
|
||
@pytest.mark.parametrize("score_function", ["softmax"]) | ||
def test_topk_softmax_with_capacity(self, setup, score_function): | ||
# Test topk softmax with capacity | ||
logits = torch.randn(self.num_tokens, self.num_experts) | ||
|
||
# Test without capacity | ||
probs, routing_map, tokens_per_expert, top_indices = topk_softmax_with_capacity( | ||
logits, topk=self.topk, score_function=score_function) | ||
assert probs.shape == (self.num_tokens, self.num_experts) | ||
assert routing_map.shape == (self.num_tokens, self.num_experts) | ||
assert tokens_per_expert.shape == (self.num_experts, ) | ||
|
||
# Test with group routing | ||
probs, routing_map, tokens_per_expert, top_indices = topk_softmax_with_capacity( | ||
logits, | ||
topk=self.topk, | ||
num_groups=self.num_groups, | ||
group_topk=self.group_topk, | ||
score_function=score_function) | ||
assert probs.shape == (self.num_tokens, self.num_experts) | ||
|
||
def test_get_capacity(self, setup): | ||
# Test capacity calculation | ||
capacity = get_capacity(num_tokens=self.num_tokens, | ||
num_experts=self.num_experts, | ||
capacity_factor=self.capacity_factor) | ||
expected = math.ceil( | ||
(self.num_tokens / self.num_experts) * self.capacity_factor) | ||
assert capacity == expected | ||
|
||
# Test with min capacity | ||
min_capacity = 5 | ||
capacity = get_capacity(num_tokens=self.num_tokens, | ||
num_experts=self.num_experts, | ||
capacity_factor=self.capacity_factor, | ||
min_capacity=min_capacity) | ||
assert capacity == min_capacity | ||
|
||
def test_permute(self, setup): | ||
# Test token permutation | ||
tokens = torch.randn(self.num_tokens, self.hidden_size) | ||
routing_map = torch.randint( | ||
0, 2, (self.num_tokens, self.num_experts)).bool() | ||
|
||
# Basic permutation | ||
permuted_tokens, sorted_indices = permute(tokens, routing_map) | ||
assert permuted_tokens.shape[0] == routing_map.sum() | ||
assert sorted_indices.shape[0] == routing_map.sum() | ||
|
||
# With drop and pad | ||
capacity = get_capacity(num_tokens=self.num_tokens * self.topk, | ||
num_experts=self.num_experts, | ||
capacity_factor=self.capacity_factor) | ||
num_out_tokens = capacity * self.num_experts | ||
permuted_tokens, sorted_indices = permute( | ||
tokens, | ||
routing_map, | ||
num_out_tokens=num_out_tokens, | ||
drop_and_pad=True) | ||
assert permuted_tokens.shape[0] == num_out_tokens | ||
assert sorted_indices.shape[0] == num_out_tokens | ||
|
||
def test_unpermute(self, setup): | ||
# Test token unpermutation | ||
tokens = torch.randn(self.num_tokens, self.hidden_size) | ||
routing_map = torch.randint( | ||
0, 2, (self.num_tokens, self.num_experts)).bool() | ||
probs = torch.rand(self.num_tokens, self.num_experts) | ||
|
||
# First permute | ||
permuted_tokens, sorted_indices = permute(tokens, routing_map) | ||
|
||
# Then unpermute | ||
restored_tokens = unpermute(permuted_tokens, | ||
sorted_indices, | ||
tokens.shape, | ||
probs=probs, | ||
routing_map=routing_map) | ||
assert restored_tokens.shape == tokens.shape | ||
|
||
# With drop and pad | ||
capacity = get_capacity(num_tokens=self.num_tokens * self.topk, | ||
num_experts=self.num_experts, | ||
capacity_factor=self.capacity_factor) | ||
num_out_tokens = capacity * self.num_experts | ||
permuted_tokens, sorted_indices = permute( | ||
tokens, | ||
routing_map, | ||
num_out_tokens=num_out_tokens, | ||
drop_and_pad=True) | ||
restored_tokens = unpermute(permuted_tokens, | ||
sorted_indices, | ||
tokens.shape, | ||
probs=probs, | ||
routing_map=routing_map, | ||
drop_and_pad=True) | ||
assert restored_tokens.shape == tokens.shape | ||
|
||
def test_sort_chunks_by_idxs(self, setup): | ||
# Test chunk sorting | ||
input_tensor = torch.randn(10, self.hidden_size) | ||
split_sizes = torch.tensor([3, 2, 5]) | ||
sorted_idxs = torch.tensor([2, 0, 1]) | ||
|
||
output = sort_chunks_by_idxs(input_tensor, split_sizes, sorted_idxs) | ||
assert output.shape == input_tensor.shape | ||
|
||
# Verify the order is correct | ||
expected = torch.cat( | ||
[input_tensor[5:], input_tensor[0:3], input_tensor[3:5]]) | ||
assert torch.allclose(output, expected) | ||
|
||
@pytest.mark.parametrize("score_function", ["softmax"]) | ||
def test_score_functions(self, setup, score_function): | ||
# Test different score functions | ||
logits = torch.randn(self.num_tokens, self.num_experts) | ||
expert_bias = torch.randn(self.num_experts) | ||
|
||
probs, routing_map, tokens_per_expert, top_indices = topk_softmax_with_capacity( | ||
logits, | ||
topk=self.topk, | ||
score_function=score_function, | ||
expert_bias=expert_bias) | ||
assert probs.shape == (self.num_tokens, self.num_experts) | ||
assert routing_map.shape == (self.num_tokens, self.num_experts) | ||
assert tokens_per_expert.shape == (self.num_experts, ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wrong header
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed