
Commit b5b9192

eep basic
1 parent 7c12a76 commit b5b9192

22 files changed, +1544 -65 lines

experimental/bench.sh

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
#!/bin/bash

MODEL_NAME="deepseek-ai/DeepSeek-V2-Lite-Chat"
HOST="localhost"
PORT=8006

vllm bench serve \
    --model $MODEL_NAME \
    --host $HOST \
    --port $PORT \
    --num-prompts 5
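
bench.sh assumes a vLLM server is already listening on localhost:8006, e.g. one started by experimental/serve_deepseek_v2.sh below. A minimal pre-flight check (a sketch, not part of this commit; it relies on the /health route exposed by the vLLM API server):

import requests

# Probe the benchmark target before running bench.sh; any HTTP 200 means
# the server is up and serving.
try:
    requests.get("http://localhost:8006/health", timeout=5).raise_for_status()
    print("server is up; safe to run experimental/bench.sh")
except requests.RequestException as exc:
    print(f"server not reachable: {exc}")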

experimental/nvshmem.patch

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
From 18c0599c2f07ec965132efa25961dc8179c2dda3 Mon Sep 17 00:00:00 2001
From: Yongji Wu <wuyongji317@gmail.com>
Date: Tue, 20 May 2025 13:41:12 -0700
Subject: [PATCH] fix reinit issues due to states not cleaned up

fix double free
---
 src/host/init/init.cu                        | 10 ++++++++++
 .../internal/host/nvshmemi_mem_transport.hpp | 15 +++++++++++++++
 src/modules/bootstrap/uid/bootstrap_uid.cpp  |  5 +++++
 3 files changed, 30 insertions(+)

diff --git a/src/host/init/init.cu b/src/host/init/init.cu
index b1c5dbf..1fecb4b 100644
--- a/src/host/init/init.cu
+++ b/src/host/init/init.cu
@@ -43,6 +43,8 @@
 #include "internal/host/nvshmemi_types.h"
 #include "internal/host/shared_memory.h"
 #include "internal/host/nvshmemi_symmetric_heap.hpp"
+// eep-dev
+#include "internal/host/nvshmemi_mem_transport.hpp"
 
 extern __constant__ nvshmemi_device_host_state_t nvshmemi_device_state_d;
 static std::map<void *, int> registered_device_states;
@@ -1293,6 +1295,14 @@ void nvshmemid_hostlib_finalize(void *device_ctx, void *transport_device_ctx) {
         /* Multi-init Multi-fini*/
         nvshmemi_state = NULL;
         nvshmemi_device_state.nvshmemi_is_nvshmem_initialized = 0;
+
+        // eep-dev
+        nvshmemi_mem_p2p_transport::destroy_instance();
+        nvshmemi_mem_remote_transport::destroy_instance();
+        free(nvshmemi_default_session);
+        nvshmemi_default_session = nullptr;
+        nvshmemi_device_state.nvshmemi_is_nvshmem_bootstrapped = false;
+
         nvshmemi_is_device_state_ready = false;
     } else
         nvshmemi_boot_handle.barrier(&nvshmemi_boot_handle);
diff --git a/src/include/internal/host/nvshmemi_mem_transport.hpp b/src/include/internal/host/nvshmemi_mem_transport.hpp
index 2495844..e4f408a 100644
--- a/src/include/internal/host/nvshmemi_mem_transport.hpp
+++ b/src/include/internal/host/nvshmemi_mem_transport.hpp
@@ -36,6 +36,13 @@ class nvshmemi_mem_p2p_transport final {
             return p2p_objref_;
         }
     }
+    // eep-dev
+    static void destroy_instance(void) {
+        if (p2p_objref_ != nullptr) {
+            delete p2p_objref_;
+            p2p_objref_ = nullptr;
+        }
+    }
 
     void print_mem_handle(int pe_id, int transport_idx, nvshmemi_symmetric_heap &obj);
 
@@ -87,6 +94,14 @@ class nvshmemi_mem_remote_transport final {
         }
     }
 
+    // eep-dev
+    static void destroy_instance(void) {
+        if (remote_objref_ != nullptr) {
+            delete remote_objref_;
+            remote_objref_ = nullptr;
+        }
+    }
+
     int gather_mem_handles(nvshmemi_symmetric_heap &obj, uint64_t heap_offset, size_t size);
     /* On-demand registration and release of memory */
     int register_mem_handle(nvshmem_mem_handle_t *local_handles, int transport_idx,
diff --git a/src/modules/bootstrap/uid/bootstrap_uid.cpp b/src/modules/bootstrap/uid/bootstrap_uid.cpp
index a1fa748..788fa96 100644
--- a/src/modules/bootstrap/uid/bootstrap_uid.cpp
+++ b/src/modules/bootstrap/uid/bootstrap_uid.cpp
@@ -630,6 +630,11 @@ int nvshmemi_bootstrap_plugin_pre_init(bootstrap_handle_t* handle, const int abi
     // Discover the network for bootstrap, if not done previously.
     // This code needs to be stateful to be able to be called multiple times by the caller
     BOOTSTRAP_CHECK(bootstrap_net_init());
+    // eep-dev
+    if (handle->pre_init_ops != nullptr) {
+        BOOTSTRAP_PTR_FREE(handle->pre_init_ops);
+        handle->pre_init_ops = nullptr;
+    }
     if (handle->pre_init_ops == nullptr) {
         BOOTSTRAP_CALLOC(&handle->pre_init_ops, 1);
         handle->pre_init_ops->get_unique_id = bootstrap_get_unique_id;
--
2.43.0
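
In short, the patch makes NVSHMEM re-initializable within a single process: nvshmemid_hostlib_finalize() now tears down the p2p and remote memory-transport singletons, frees nvshmemi_default_session, and clears the bootstrapped flag, while the UID bootstrap plugin frees any stale pre_init_ops before reallocating it on re-init.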

experimental/serve_deepseek_v2.sh

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
#!/bin/bash

# Serve DeepSeek V2 model with vLLM
# This script demonstrates how to serve the DeepSeek V2 model using vLLM's V1 engine

# MODEL_NAME="gaunernst/DeepSeek-V2-Lite-Chat-FP8"
MODEL_NAME="deepseek-ai/DeepSeek-V2-Lite-Chat"
HOST="0.0.0.0"
PORT=8006

DATA_PARALLEL_SIZE=3
DATA_PARALLEL_SIZE_LOCAL=$DATA_PARALLEL_SIZE

export VLLM_USE_V1=1
export VLLM_ALL2ALL_BACKEND="pplx"
export VLLM_USE_DEEP_GEMM=1

# Launch the vLLM server
vllm serve $MODEL_NAME --trust-remote-code \
    --disable-log-requests \
    --host $HOST \
    --port $PORT \
    --tensor-parallel-size 1 \
    --enable-expert-parallel \
    --enable-eplb \
    --num-redundant-experts 32 \
    --enforce-eager \
    --data-parallel-backend ray \
    --data-parallel-size $DATA_PARALLEL_SIZE \
    --data-parallel-size-local $DATA_PARALLEL_SIZE_LOCAL \
    --data-parallel-start-rank 0
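
Once the server reports ready, a quick smoke test against the OpenAI-compatible chat endpoint (a sketch, not part of this commit; host, port, and model name assumed to match the script above):

import requests

# One short chat completion against the server launched by serve_deepseek_v2.sh.
resp = requests.post(
    "http://localhost:8006/v1/chat/completions",
    json={
        "model": "deepseek-ai/DeepSeek-V2-Lite-Chat",
        "messages": [{"role": "user", "content": "Say hello in one word."}],
        "max_tokens": 8,
    },
    timeout=120,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])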

experimental/test_scale.py

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import argparse
import json
import sys

import requests


def test_scale(host, port, new_dp_size):
    url = f"http://{host}:{port}/scale"
    payload = {"new_data_parallel_size": new_dp_size}
    headers = {"Content-Type": "application/json"}

    print(f"Sending scale request to {url}")
    print(f"Payload: {json.dumps(payload, indent=2)}")

    try:
        response = requests.post(url,
                                 json=payload,
                                 headers=headers,
                                 timeout=300)

        print(f"Status Code: {response.status_code}")
        print(f"Response: {response.text}")

        if response.status_code == 200:
            print("Scale up/down request successful!")
            return True
        else:
            print("Scale up/down request failed!")
            return False

    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")
        return False


def main():
    parser = argparse.ArgumentParser(
        description="Test scale up/down functionality")
    parser.add_argument("--host", default="localhost", help="API server host")
    parser.add_argument("--port",
                        type=int,
                        default=8006,
                        help="API server port")
    parser.add_argument("--new_dp_size",
                        type=int,
                        default=2,
                        help="New data parallel size")

    args = parser.parse_args()

    success = test_scale(args.host, args.port, args.new_dp_size)
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
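
Example invocation, assuming the serve script above is running with DATA_PARALLEL_SIZE=3: python experimental/test_scale.py --port 8006 --new_dp_size 2 scales down to two data-parallel ranks, and --new_dp_size 4 scales up; the script exits 0 only if the /scale endpoint returns HTTP 200.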

experimental/test_stateless_pg.py

Lines changed: 93 additions & 0 deletions
@@ -0,0 +1,93 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import torch
from torch.multiprocessing import spawn

from vllm.distributed.utils import (
    stateless_destroy_torch_distributed_process_group,
    stateless_init_torch_distributed_process_group)


def worker_process(rank: int, world_size: int, host: str, port1: int,
                   port2: int):
    torch.cuda.set_device(rank % torch.cuda.device_count())

    # Create first process group with all workers
    pg1 = stateless_init_torch_distributed_process_group(host=host,
                                                         port=port1,
                                                         rank=rank,
                                                         world_size=world_size,
                                                         backend="gloo")

    # Create second process group with world_size - 1 workers
    # (excluding the last rank)
    pg2 = None
    if rank < world_size - 1:
        pg2 = stateless_init_torch_distributed_process_group(
            host=host,
            port=port2,
            rank=rank,
            world_size=world_size - 1,
            backend="gloo")

    # Test both groups work simultaneously
    tensor1 = torch.tensor([rank], dtype=torch.float32)
    torch.distributed.all_reduce(tensor1, group=pg1)
    expected1 = sum(range(world_size))
    assert tensor1.item(
    ) == expected1, f"PG1 failed: got {tensor1.item()}, expected {expected1}"
    print(f"Rank {rank}: PG1 all_reduce passed")

    if pg2 is not None:
        tensor2 = torch.tensor([rank], dtype=torch.float32)
        torch.distributed.all_reduce(tensor2, group=pg2)
        expected2 = sum(range(world_size - 1))
        assert tensor2.item() == expected2, (
            f"PG2 failed: got {tensor2.item()}, expected {expected2}")
        print(f"Rank {rank}: PG2 all_reduce passed")

    # Destroy first process group
    stateless_destroy_torch_distributed_process_group(pg1)
    print(f"Rank {rank}: PG1 destroyed")

    # Last rank exits here
    if rank == world_size - 1:
        print(f"Rank {rank}: Exiting")
        return

    # Test second group still works after destroying
    # first group and last rank exit
    tensor3 = torch.tensor([rank * 10], dtype=torch.float32)
    torch.distributed.all_reduce(tensor3, group=pg2)
    expected3 = sum(i * 10 for i in range(world_size - 1))
    assert tensor3.item() == expected3, (
        f"PG2 after PG1 destroy failed: got {tensor3.item()}, "
        f"expected {expected3}")
    print(f"Rank {rank}: PG2 after PG1 destroy passed")

    # Clean up
    if pg2 is not None:
        stateless_destroy_torch_distributed_process_group(pg2)
        print(f"Rank {rank}: PG2 destroyed")


def test_stateless_process_groups():
    assert not torch.distributed.is_initialized(
    ), "torch.distributed should not be initialized"

    world_size = 4
    host = "127.0.0.1"
    port1 = 29600
    port2 = 29601

    print(f"Testing stateless process groups with world_size={world_size}")

    spawn(worker_process,
          args=(world_size, host, port1, port2),
          nprocs=world_size,
          join=True)

    print("Test completed successfully!")


if __name__ == "__main__":
    test_stateless_process_groups()
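
Note that the test spawns world_size = 4 local processes and runs the collectives over the gloo backend, but worker_process still calls torch.cuda.set_device(rank % torch.cuda.device_count()) up front, so at least one visible CUDA device is required. It can be run directly with python experimental/test_stateless_pg.py or collected by pytest.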

vllm/config.py

Lines changed: 14 additions & 0 deletions
@@ -1954,6 +1954,20 @@ def has_unfinished_dp(dp_group: "ProcessGroup",
         aggregated_has_unfinished = bool(tensor.item())
         return aggregated_has_unfinished
 
+    # eep-dev
+    @staticmethod
+    def sync_kv_cache_memory(dp_group: "ProcessGroup",
+                             kv_cache_memory: int) -> int:
+        if kv_cache_memory == -1:
+            kv_cache_memory = torch.iinfo(torch.int64).max
+        tensor = torch.tensor([kv_cache_memory],
+                              dtype=torch.int64,
+                              device="cpu")
+        # we cannot use broadcast for stateless dp group since it depends
+        # on global rank
+        torch.distributed.all_reduce(tensor, op=ReduceOp.MIN, group=dp_group)
+        return tensor.item()
+
     def compute_hash(self):
         """
         Provide a hash that uniquely identifies all the configs
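
The new helper reduces each data-parallel rank's available KV-cache memory with a MIN all-reduce, mapping the sentinel value -1 (no local measurement) to the int64 maximum so it cannot win the minimum. A standalone sketch of that convention (illustrative only, not vLLM code):

import torch

INT64_MAX = torch.iinfo(torch.int64).max

def contribution(kv_cache_memory: int) -> int:
    # -1 means "not measured on this rank"; it must not drag the MIN down.
    return INT64_MAX if kv_cache_memory == -1 else kv_cache_memory

# Three hypothetical DP ranks; one has not measured its KV-cache budget yet.
local_budgets = [8 * 2**30, -1, 6 * 2**30]
synced = min(contribution(b) for b in local_budgets)
assert synced == 6 * 2**30  # the tightest measured budget binds every rank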
