
Commit 9c25dcc

ltd0924 and Copilot authored
[LLM] Update Multinode Deployment (#2830)
* [LLM] fix multinode bugs
* [LLM] update multinode deployment
* [LLM] update multinode deployment
* [LLM] update multinode deployment
* [LLM] update multinode deployment
* [LLM] update multinode deployment
* [LLM] fix ci bugs
* Update fastdeploy/engine/args_utils.py (Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>)
* [LLM] update random port
* [LLM] update random port
* [LLM] fix ci bugs
* fix ci bugs

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent d245d1c commit 9c25dcc

File tree

11 files changed: +108 −56 lines


fastdeploy/engine/args_utils.py

Lines changed: 33 additions & 7 deletions
@@ -124,9 +124,19 @@ class EngineArgs:
     Ratio of tokens to process in a block.
     """

-    pod_ips: Optional[List[str]] = None
+    dist_init_ip: Optional[str] = None
     """
-    List of IP addresses for nodes in the cluster.
+    The master node ip of multinode deployment
+    """
+
+    nnodes: int = 1
+    """
+    The number of nodes in multinode deployment
+    """
+
+    node_rank: int = 0
+    """
+    The rank of the current node in multinode deployment
     """

     swap_space: float = None
@@ -485,11 +495,25 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
         # Cluster system parameters group
         system_group = parser.add_argument_group("System Configuration")
         system_group.add_argument(
-            "--pod-ips",
-            type=lambda s: s.split(",") if s else None,
-            default=EngineArgs.pod_ips,
+            "--dist-init-ip",
+            default=EngineArgs.dist_init_ip,
             help=
-            "List of IP addresses for nodes in the cluster (comma-separated).")
+            "IP addresses of master node.")
+
+        system_group.add_argument(
+            "--nnodes",
+            type=int,
+            default=EngineArgs.nnodes,
+            help=
+            "The number of all nodes.")
+
+        system_group.add_argument(
+            "--node-rank",
+            type=int,
+            default=EngineArgs.node_rank,
+            help=
+            "node rank id (range [0, nnodes)).")


         # Performance tuning parameters group
@@ -789,7 +813,9 @@ def create_engine_config(self) -> Config:
             max_num_seqs=self.max_num_seqs,
             speculative_config=speculative_cfg,
             max_num_batched_tokens=self.max_num_batched_tokens,
-            pod_ips=self.pod_ips,
+            dist_init_ip=self.dist_init_ip,
+            nnodes=self.nnodes,
+            node_rank=self.node_rank,
             use_warmup=self.use_warmup,
             engine_worker_queue_port=self.engine_worker_queue_port,
             limit_mm_per_prompt=self.limit_mm_per_prompt,
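
Net effect on the CLI: the single comma-separated --pod-ips list becomes three explicit flags. A minimal sketch of the new surface, using a throwaway argparse parser rather than FastDeploy's real FlexibleArgumentParser (all addresses here are made up):

    import argparse

    # Throwaway replica of the new "System Configuration" group, for illustration only.
    parser = argparse.ArgumentParser()
    system_group = parser.add_argument_group("System Configuration")
    system_group.add_argument("--dist-init-ip", default=None,
                              help="IP address of the master node.")
    system_group.add_argument("--nnodes", type=int, default=1,
                              help="Total number of nodes.")
    system_group.add_argument("--node-rank", type=int, default=0,
                              help="Node rank id (range [0, nnodes)).")

    # Before: --pod-ips 10.0.0.1,10.0.0.2
    # After, on the second of two nodes:
    args = parser.parse_args(
        ["--dist-init-ip", "10.0.0.1", "--nnodes", "2", "--node-rank", "1"])
    assert 0 <= args.node_rank < args.nnodes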

fastdeploy/engine/config.py

Lines changed: 18 additions & 14 deletions
@@ -6,7 +6,7 @@
 # You may obtain a copy of the License at
 #
 # http://www.apache.org/licenses/LICENSE-2.0
-#
+#dist_init_ip
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,7 +24,7 @@
 from fastdeploy.platforms import current_platform
 from fastdeploy.scheduler import SchedulerConfig
 from fastdeploy.utils import (ceil_div, check_unified_ckpt, get_host_ip,
-                              is_port_available, llm_logger)
+                              is_port_available, get_random_port, llm_logger)

 TaskOption = Literal["generate"]

@@ -642,7 +642,9 @@ def __init__(
         max_model_len: int = 8192,
         max_num_seqs: int = 8,
         max_num_batched_tokens: Optional[int] = None,
-        pod_ips: Optional[List[str]] = None,
+        dist_init_ip: str = None,
+        nnodes: int = 1,
+        node_rank: int = 0,
         speculative_config: Optional[Dict[str, Any]] = None,
         graph_optimization_config: Optional[Dict[str, Any]] = None,
         use_warmup: bool = False,
@@ -675,7 +677,6 @@ def __init__(
             max_model_len (int): Maximum model length. Default is 8192.
             max_num_seqs (int): Maximum number of sequences. Default is 8.
             max_num_batched_tokens (Optional[int]): Maximum number of batched tokens. Default is None.
-            pod_ips (Optional[List[str]]): List of POD IPs. Default is None.
             mm_processor_kwargs (Optional[Dict[str, Any]]): Additional arguments for multi-modal processor. Default is None.
             speculative_config (Optional[Dict[str, Any]]): Speculative execution configuration. Default is None.
             graph_optimization_config (Optional[Dict[str, Any]]): Graph optimizaion backend execution configuration. Default is None.
@@ -699,7 +700,16 @@ def __init__(
         self.tokenizer = tokenizer
         self.max_num_batched_tokens = max_num_batched_tokens
         self.tensor_parallel_size = tensor_parallel_size
-        self.pod_ips = pod_ips
+        self.dist_init_ip = dist_init_ip
+
+        self.nnode = nnodes
+        self.node_rank = node_rank
+        if self.dist_init_ip is None:
+            self.master_ip = "0.0.0.0"
+        else:
+            self.master_ip = self.dist_init_ip
+            self.dist_init_addr = f"{self.dist_init_ip}:{get_random_port()}"
+
         self.max_model_len = max_model_len
         self.max_num_seqs = max_num_seqs
         self.limit_mm_per_prompt = limit_mm_per_prompt
@@ -716,14 +726,8 @@ def __init__(
         self.graph_optimization_config = graph_optimization_config
         self.guided_decoding_backend = guided_decoding_backend
         self.disable_any_whitespace = disable_any_whitespace
-        self.is_master = True
         self._str_to_list("innode_prefill_ports", int)
-        self._str_to_list("pod_ips", str)

-        if self.pod_ips is None:
-            self.nnode = 1
-        else:
-            self.nnode = len(self.pod_ips)

         assert self.splitwise_role in ["mixed", "prefill", "decode"]

@@ -778,9 +782,9 @@ def postprocess(self):

         self.host_ip = get_host_ip()

-        if self.pod_ips is None:
-            self.pod_ips = ["0.0.0.0"]
-        elif self.host_ip != self.pod_ips[0]:
+        if self.dist_init_ip is None or self.host_ip == self.master_ip:
+            self.is_master = True
+        else:
             self.is_master = False

         import paddle
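
The config now distinguishes the advertised master IP from the rendezvous address, which gets a random port appended. A simplified standalone sketch of that resolution logic, with get_random_port stubbed (the real helper, added in fastdeploy/utils.py below, also probes that the port is bindable):

    import random

    def get_random_port():
        # Stub for illustration; the real helper verifies bindability.
        return random.randint(49152, 65535)

    def resolve_master(dist_init_ip, host_ip):
        """Return (master_ip, dist_init_addr, is_master), mirroring Config."""
        if dist_init_ip is None:
            return "0.0.0.0", None, True          # single node: always master
        dist_init_addr = f"{dist_init_ip}:{get_random_port()}"
        return dist_init_ip, dist_init_addr, host_ip == dist_init_ip

    master_ip, addr, is_master = resolve_master("10.0.0.1", host_ip="10.0.0.2")
    assert master_ip == "10.0.0.1" and is_master is False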

fastdeploy/engine/engine.py

Lines changed: 18 additions & 12 deletions
@@ -174,7 +174,7 @@ def start(self, api_server_pid=None):
             cache_config=self.cfg.cache_config,
             tensor_parallel_size=self.cfg.tensor_parallel_size,
             device_ids=device_ids,
-            pod_ip=self.cfg.pod_ips[0],
+            pod_ip=self.cfg.master_ip,
             engine_worker_queue_port=self.cfg.engine_worker_queue_port,
             pid_suffix=self.ipc_signal_suffix)

@@ -239,11 +239,12 @@ def start(self, api_server_pid=None):

         if self.cfg.parallel_config.enable_expert_parallel and self.cfg.parallel_config.data_parallel_size > 1:
             self.dp_processed = []
-            for i in range(1, self.cfg.parallel_config.data_parallel_size):
+            for i in range(1, self.cfg.parallel_config.data_parallel_size // self.cfg.nnode):
                 time.sleep(1)
                 self.dp_processed.append(
                     multiprocessing.Process(target=start_expert_service,
-                                            args=(self.cfg, i,
+                                            args=(self.cfg,
+                                                  i + self.cfg.node_rank * self.cfg.worker_num_per_node,
                                                   self.ipc_signal_suffix)))
                 llm_logger.info(f"Engine is initialized successfully with {self.cfg.tensor_parallel_size}" \
                     + " data parallel id {}".format(i))
@@ -1007,16 +1008,14 @@ def _start_worker_service(self):
         )

         arguments = (
-            f" --nnodes {str(self.cfg.nnode)}"
-            f" --ips {','.join(self.cfg.pod_ips)}"
             f" --devices {self.cfg.device_ids} {py_script}"
             f" --max_num_seqs {self.cfg.max_num_seqs} --max_model_len {self.cfg.max_model_len}"
             f" --gpu_memory_utilization {self.cfg.cache_config.gpu_memory_utilization}"
             f" --model_name_or_path {str(self.cfg.model_name_or_path)}"
             f" --device_ids {self.cfg.device_ids}"
             f" --tensor_parallel_size {self.cfg.tensor_parallel_size}"
             f" --engine_worker_queue_port {str(self.cfg.engine_worker_queue_port)}"
-            f" --pod_ip {self.cfg.pod_ips[0]}"
+            f" --pod_ip {self.cfg.master_ip}"
             f" --total_block_num {self.cfg.cache_config.total_block_num}"
             f" --block_size {self.cfg.cache_config.block_size}"
             f" --enc_dec_block_num {self.cfg.cache_config.enc_dec_block_num}"
@@ -1057,7 +1056,11 @@ def _start_worker_service(self):
             if value:
                 arguments = arguments + f" --{worker_flag}"
         if self.cfg.nnode > 1:
-            pd_cmd = pd_cmd + f" --ips {self.cfg.ips}"
+            pd_cmd = pd_cmd + (
+                f" --master {self.cfg.dist_init_addr}"
+                f" --nnodes {str(self.cfg.nnode)}"
+                f" --rank {str(self.cfg.node_rank)}"
+            )
         pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
         llm_logger.info("Launch worker service command: {}".format(pd_cmd))
         p = subprocess.Popen(
@@ -1158,7 +1161,7 @@ def _stop_profile(self):
             cache_config=self.cfg.cache_config,
             tensor_parallel_size=self.cfg.tensor_parallel_size,
             device_ids=device_ids,
-            pod_ip=self.cfg.pod_ips[0],
+            pod_ip=self.cfg.master_ip,
             engine_worker_queue_port=self.cfg.engine_worker_queue_port,
             pid_suffix=self.ipc_signal_suffix)
     def check_health(self, time_interval_threashold=30):
@@ -1245,8 +1248,9 @@ def start_queue_service(self):
         """
         start queue service for engine worker communication
         """
-        address = (self.cfg.pod_ips[0], self.cfg.engine_worker_queue_port)
-        if self.cfg.host_ip == self.cfg.pod_ips[0] or self.cfg.pod_ips[0] == "0.0.0.0":
+        address = (self.cfg.master_ip, self.cfg.engine_worker_queue_port)
+        if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0":
+            llm_logger.info(f"Starting engine worker queue server service at {address}")
             self.engine_worker_queue_server = EngineWorkerQueue(
                 address=address,
                 is_server=True,
@@ -1256,7 +1260,7 @@ def start_queue_service(self):

         if self.cfg.cache_config.enable_prefix_caching or self.cfg.splitwise_role != 'mixed':
             self.cache_task_queue = EngineCacheQueue(
-                address=(self.cfg.pod_ips[0], self.cfg.cache_config.cache_queue_port),
+                address=(self.cfg.master_ip, self.cfg.cache_config.cache_queue_port),
                 authkey=b'cache_queue_service',
                 is_server=True,
                 num_client=self.cfg.tensor_parallel_size,
@@ -1270,4 +1274,6 @@ def start_queue_service(self):
             is_server=False,
             num_client=self.cfg.tensor_parallel_size,
             client_id=0,
-            local_data_parallel_id=0)
+            local_data_parallel_size=self.cfg.parallel_config.data_parallel_size,
+            local_data_parallel_id=min(self.cfg.worker_num_per_node * self.cfg.node_rank,
+                                       self.cfg.parallel_config.data_parallel_size - 1))
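
Two changes are worth tracing here: the multi-node worker launch now passes explicit --master/--nnodes/--rank rendezvous flags instead of an --ips list, and each node spawns only its own share of expert services, offsetting data-parallel ids by node_rank * worker_num_per_node. A worked sketch of the resulting id layout, with hypothetical sizes and assuming worker_num_per_node equals the number of data-parallel ranks hosted per node:

    # Hypothetical 2-node deployment with a global data-parallel degree of 8.
    data_parallel_size = 8
    nnode = 2
    worker_num_per_node = 4   # assumed: DP ranks hosted per node

    for node_rank in range(nnode):
        extra = [i + node_rank * worker_num_per_node
                 for i in range(1, data_parallel_size // nnode)]
        print(f"node {node_rank}: spawns expert services for DP ids {extra}")
    # node 0: DP ids [1, 2, 3] (id 0 stays in the main engine process)
    # node 1: DP ids [5, 6, 7] (id 4 is node 1's own queue-client id, via the
    #         min(worker_num_per_node * node_rank, ...) expression above)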

fastdeploy/engine/expert_service.py

Lines changed: 4 additions & 4 deletions
@@ -49,8 +49,8 @@ def __init__(self, cfg, local_data_parallel_id):
            cfg (Config): Config object containing all the configuration parameters.
        """
        self.cfg = cfg
-        start_pos = local_data_parallel_id * self.cfg.tensor_parallel_size
-        end_pos = (local_data_parallel_id + 1) * self.cfg.tensor_parallel_size
+        start_pos = (local_data_parallel_id * self.cfg.tensor_parallel_size) % self.cfg.worker_num_per_node
+        end_pos = ((local_data_parallel_id + 1) * self.cfg.tensor_parallel_size) % self.cfg.worker_num_per_node
        self.cfg.cache_config.rdma_comm_ports = self.cfg.cache_config.rdma_comm_ports[
            start_pos:end_pos]
        self.cfg.local_device_ids = self.cfg.device_ids.split(
@@ -65,7 +65,7 @@ def __init__(self, cfg, local_data_parallel_id):

        self.cfg.parallel_config.local_data_parallel_id = local_data_parallel_id

-        address = (cfg.pod_ips[0], cfg.engine_worker_queue_port)
+        address = (cfg.master_ip, cfg.engine_worker_queue_port)
        self.engine_worker_queue = EngineWorkerQueue(
            address=address,
            is_server=False,
@@ -118,7 +118,7 @@ def start(self, ipc_signal_suffix, local_data_parallel_id):
            cache_config=self.cfg.cache_config,
            tensor_parallel_size=self.cfg.tensor_parallel_size,
            device_ids=self.cfg.local_device_ids,
-            pod_ip=self.cfg.pod_ips[0],
+            pod_ip=self.cfg.master_ip,
            engine_worker_queue_port=self.cfg.engine_worker_queue_port,
            pid_suffix=f"{local_data_parallel_id}_{ipc_signal_suffix}"
        )
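
The new modulo keeps each node's RDMA port slice node-local when global data-parallel ids span nodes. A small worked example with hypothetical sizes; note that end_pos wraps to 0 for the slice that ends exactly at the node boundary:

    tensor_parallel_size = 2
    worker_num_per_node = 8   # hypothetical RDMA port slots per node

    for local_data_parallel_id in range(5):
        start_pos = (local_data_parallel_id * tensor_parallel_size) % worker_num_per_node
        end_pos = ((local_data_parallel_id + 1) * tensor_parallel_size) % worker_num_per_node
        print(f"DP id {local_data_parallel_id}: rdma_comm_ports[{start_pos}:{end_pos}]")
    # DP ids 0-2 -> [0:2], [2:4], [4:6]; DP id 3 -> [6:0] (wraps at the boundary);
    # DP id 4 -> [0:2] again, i.e. the same node-local slots on the next node.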

fastdeploy/entrypoints/llm.py

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ def __init__(

         self.mutex = threading.Lock()
         self.req_output = dict()
-        self.master_node_ip = self.llm_engine.cfg.pod_ips[0]
+        self.master_node_ip = self.llm_engine.cfg.master_ip
         self._receive_output_thread = threading.Thread(
             target=self._receive_output, daemon=True)
         self._receive_output_thread.start()

fastdeploy/entrypoints/openai/api_server.py

Lines changed: 2 additions & 2 deletions
@@ -122,8 +122,8 @@ async def lifespan(app: FastAPI):
                                  args.mm_processor_kwargs, args.enable_mm,
                                  args.reasoning_parser)
     app.state.dynamic_load_weight = args.dynamic_load_weight
-    chat_handler = OpenAIServingChat(engine_client, pid, args.pod_ips)
-    completion_handler = OpenAIServingCompletion(engine_client, pid, args.pod_ips)
+    chat_handler = OpenAIServingChat(engine_client, pid, args.dist_init_ip)
+    completion_handler = OpenAIServingCompletion(engine_client, pid, args.dist_init_ip)
     engine_client.create_zmq_client(model=pid, mode=zmq.PUSH)
     engine_client.pid = pid
     app.state.engine_client = engine_client

fastdeploy/entrypoints/openai/serving_chat.py

Lines changed: 4 additions & 4 deletions
@@ -40,16 +40,16 @@ class OpenAIServingChat:
     OpenAI-style chat completions serving
     """

-    def __init__(self, engine_client, pid, pod_ips):
+    def __init__(self, engine_client, pid, dist_init_ip):
         self.engine_client = engine_client
         self.pid = pid
-        self.pod_ips = pod_ips
+        self.master_ip = dist_init_ip
         self.host_ip = get_host_ip()

     def _check_master(self):
-        if self.pod_ips is None:
+        if self.master_ip is None:
             return True
-        if self.host_ip == self.pod_ips[0]:
+        if self.host_ip == self.master_ip:
             return True
         return False
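
Both OpenAI serving handlers now gate requests with the same two-step master check: no dist_init_ip means a single-node deployment (always master); otherwise the host IP must match the master IP. A standalone sketch with made-up addresses:

    def check_master(master_ip, host_ip):
        """Mirror of OpenAIServingChat._check_master, free of the class."""
        if master_ip is None:       # single-node deployment
            return True
        return host_ip == master_ip

    assert check_master(None, "10.0.0.2")
    assert check_master("10.0.0.1", "10.0.0.1")
    assert not check_master("10.0.0.1", "10.0.0.2")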

fastdeploy/entrypoints/openai/serving_completion.py

Lines changed: 4 additions & 4 deletions
@@ -45,16 +45,16 @@


 class OpenAIServingCompletion:
-    def __init__(self, engine_client, pid, pod_ips):
+    def __init__(self, engine_client, pid, dist_init_ip):
         self.engine_client = engine_client
         self.pid = pid
-        self.pod_ips = pod_ips
+        self.master_ip = dist_init_ip
         self.host_ip = get_host_ip()

     def _check_master(self):
-        if self.pod_ips is None:
+        if self.master_ip is None:
             return True
-        if self.host_ip == self.pod_ips[0]:
+        if self.host_ip == self.master_ip:
             return True
         return False

fastdeploy/utils.py

Lines changed: 15 additions & 1 deletion
@@ -27,7 +27,8 @@
 from logging.handlers import BaseRotatingHandler
 from pathlib import Path
 from typing import Literal, TypeVar, Union
-
+import random
+import socket
 import requests
 import yaml
 from aistudio_sdk.snapshot_download import snapshot_download
@@ -421,6 +422,19 @@ def get_host_ip():
     return ip


+
+
+def get_random_port():
+    while True:
+        port = random.randint(49152, 65535)
+        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+            try:
+                s.bind(("0.0.0.0", port))
+                return port
+            except OSError:
+                continue
+
+
 def is_port_available(host, port):
     """
     Check the port is available
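
One caveat on the new helper: it draws from the IANA dynamic range (49152-65535) and probes bindability, but the probe socket is closed as soon as bind succeeds, so the port is only known to be free at probe time; another process can still claim it before the caller binds (a TOCTOU window). Typical usage:

    from fastdeploy.utils import get_random_port

    # Free at probe time only; a later bind by the actual rendezvous server
    # can still fail if another process grabs the port first.
    port = get_random_port()
    print(f"rendezvous port: {port}")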

fastdeploy/worker/gpu_worker.py

Lines changed: 4 additions & 2 deletions
@@ -23,6 +23,7 @@

 from fastdeploy.config import FDConfig
 from fastdeploy.engine.request import Request
+from fastdeploy.platforms import current_platform
 from fastdeploy.utils import get_logger
 from fastdeploy.worker.gpu_model_runner import GPUModelRunner
 from fastdeploy.worker.output import ModelRunnerOutput
@@ -50,11 +51,12 @@ def init_device(self):
         """
         Initialize device and construct model runner
         """
+        self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
         if self.device_config.device_type == "cuda" and paddle.device.is_compiled_with_cuda(
         ):
             # Set evironment variable
             self.device_ids = self.parallel_config.device_ids.split(",")
-            self.device = f"gpu:{self.local_rank}"
+            self.device = f"gpu:{self.local_rank % self.max_chips_per_node}"
             paddle.device.set_device(self.device)
             paddle.set_default_dtype(self.parallel_config.dtype)

@@ -72,7 +74,7 @@ def init_device(self):
         self.model_runner: GPUModelRunner = GPUModelRunner(
             fd_config=self.fd_config,
             device=self.device,
-            device_id=self.device_ids[self.local_rank],
+            device_id=self.device_ids[self.local_rank % self.max_chips_per_node],
             rank=self.rank,
             local_rank=self.local_rank)
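
Since local_rank is now global across the job, it is reduced modulo the per-node chip count before selecting a device. A worked sketch for a hypothetical two-node, eight-GPU-per-node layout (Iluvatar nodes would use 16 chips per node, per the check above):

    max_chips_per_node = 8   # 16 on Iluvatar

    for local_rank in range(16):   # hypothetical: 16 ranks across two nodes
        node = local_rank // max_chips_per_node
        device = f"gpu:{local_rank % max_chips_per_node}"
        print(f"rank {local_rank:2d} -> node {node}, {device}")
    # ranks 0-7  map to gpu:0..gpu:7 on node 0
    # ranks 8-15 map to gpu:0..gpu:7 on node 1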
