From bb8800330255c551f5d11e892e0847b3d8e88ec5 Mon Sep 17 00:00:00 2001
From: zoooo0820
Date: Fri, 4 Jul 2025 15:41:24 +0800
Subject: [PATCH 1/2] support stop_reqs

---
 docs/usage/environment_variables.md          |  8 +++-
 docs/zh/usage/environment_variables.md       |  9 +++-
 fastdeploy/config.py                         |  4 ++
 fastdeploy/engine/config.py                  |  1 +
 fastdeploy/engine/sampling_params.py         |  1 +
 fastdeploy/envs.py                           |  4 ++
 .../model_executor/pre_and_post_process.py   | 42 ++++++++++++-------
 fastdeploy/worker/gpu_model_runner.py        | 22 +++++++---
 fastdeploy/worker/output.py                  |  9 ++++
 fastdeploy/worker/xpu_model_runner.py        |  6 ++-
 10 files changed, 80 insertions(+), 26 deletions(-)

diff --git a/docs/usage/environment_variables.md b/docs/usage/environment_variables.md
index 2cf9ff73d8..1ce5bad824 100644
--- a/docs/usage/environment_variables.md
+++ b/docs/usage/environment_variables.md
@@ -32,6 +32,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_STOP_SEQS_MAX_LEN":
     lambda: os.getenv("FD_STOP_SEQS_MAX_LEN", "8"),
 
+    # Whether to use stop sequences (0 or 1)
+    "FD_USE_STOP_SEQ":
+    lambda: os.getenv("FD_USE_STOP_SEQ", "0"),
+
     # GPU devices to use (comma-separated string, e.g. 0,1,2)
     "CUDA_VISIBLE_DEVICES":
     lambda: os.getenv("CUDA_VISIBLE_DEVICES", None),
@@ -67,6 +71,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # Switch from standalone PD to centralized inference (0 or 1)
     "FD_PD_CHANGEABLE":
     lambda: os.getenv("FD_PD_CHANGEABLE", "1"),
-    
+
 }
-```
\ No newline at end of file
+```
diff --git a/docs/zh/usage/environment_variables.md b/docs/zh/usage/environment_variables.md
index d952e757d4..d79f3902f4 100644
--- a/docs/zh/usage/environment_variables.md
+++ b/docs/zh/usage/environment_variables.md
@@ -1,5 +1,6 @@
 # FastDeploy 环境变量说明
 FastDeploy 的环境变量保存在了代码库根目录下 fastdeploy/envs.py 文件中,以下是其对应的中文版说明:
+
 ```python
 environment_variables: dict[str, Callable[[], Any]] = {
     # 构建 FastDeploy 时使用的 CUDA 架构版本,这是一个字符串列表,例如[80,90]
@@ -30,6 +31,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
     "FD_STOP_SEQS_MAX_LEN":
     lambda: os.getenv("FD_STOP_SEQS_MAX_LEN", "8"),
 
+    # 是否使用停止序列
+    "FD_USE_STOP_SEQ":
+    lambda: os.getenv("FD_USE_STOP_SEQ", "0"),
+
     # 将要使用的GPU设备,这是一个用逗号分隔的字符串,例如 0,1,2
     "CUDA_VISIBLE_DEVICES":
     lambda: os.getenv("CUDA_VISIBLE_DEVICES", None),
@@ -65,6 +70,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
     # 是否从单机 PD 分离转换为集中式推理
     "FD_PD_CHANGEABLE":
     lambda: os.getenv("FD_PD_CHANGEABLE", "1"),
-    
+
 }
-```
\ No newline at end of file
+```
diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index 446e59298d..324752464d 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -22,6 +22,7 @@
 
 from paddleformers.transformers.configuration_utils import PretrainedConfig
 
+from fastdeploy import envs
 from fastdeploy.model_executor.layers.quantization.quant_base import \
     QuantConfigBase
 from fastdeploy.utils import get_logger
@@ -124,6 +125,9 @@ def __init__(
         self.tie_word_embeddings = tie_word_embeddings
         self.is_quantized = is_quantized
+        self.max_stop_seqs_num = int(envs.FD_MAX_STOP_SEQS_NUM)
+        self.stop_seqs_max_len = int(envs.FD_STOP_SEQS_MAX_LEN)
+        self.use_stop_seq = int(envs.FD_USE_STOP_SEQ)
 
 
 @dataclass
 class MoEConfig:
diff --git a/fastdeploy/engine/config.py b/fastdeploy/engine/config.py
index c538890581..72c62c4a60 100644
--- a/fastdeploy/engine/config.py
+++ b/fastdeploy/engine/config.py
@@ -126,6 +126,7 @@ def read_from_env(self):
         """
         self.max_stop_seqs_num = int(envs.FD_MAX_STOP_SEQS_NUM)
         self.stop_seqs_max_len = int(envs.FD_STOP_SEQS_MAX_LEN)
+        self.use_stop_seq = int(envs.FD_USE_STOP_SEQ)
 
         def reset_config_value(key, value):
             if not hasattr(self, key.lower()):
diff --git a/fastdeploy/engine/sampling_params.py b/fastdeploy/engine/sampling_params.py
index 0f60cf36b7..1f88c80e24 100644
--- a/fastdeploy/engine/sampling_params.py
+++ b/fastdeploy/engine/sampling_params.py
@@ -85,6 +85,7 @@ class SamplingParams:
     seed: Optional[int] = None
     stop: Optional[Union[str, List[str]]] = None
     stop_token_ids: Optional[Union[List[List[int]], List[int]]] = None
+    stop_seqs_len: Optional[int] = None
     max_tokens: Optional[int] = None
    reasoning_max_tokens: Optional[int] = None
     min_tokens: int = 1
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 8ef8a5149c..b7af814490 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -52,6 +52,10 @@
     "FD_STOP_SEQS_MAX_LEN":
     lambda: os.getenv("FD_STOP_SEQS_MAX_LEN", "8"),
 
+    # Whether to use stop sequences (0 or 1)
+    "FD_USE_STOP_SEQ":
+    lambda: os.getenv("FD_USE_STOP_SEQ", "0"),
+
     # GPU devices that will be used. This is a string that
     # splited by comma, such as 0,1,2.
     "CUDA_VISIBLE_DEVICES":
diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py
index 8f3601ff61..f740e41dfc 100644
--- a/fastdeploy/model_executor/pre_and_post_process.py
+++ b/fastdeploy/model_executor/pre_and_post_process.py
@@ -21,11 +21,12 @@
 
 from fastdeploy.engine.config import SpeculativeConfig
 from fastdeploy.model_executor.ops.gpu import (
     get_padding_offset, save_output, set_stop_value_multi_ends,
-    speculate_clear_accept_nums, speculate_get_output_padding_offset,
-    speculate_get_padding_offset, speculate_get_seq_lens_output,
-    speculate_save_output, speculate_set_value_by_flags_and_idx,
-    speculate_step_paddle, speculate_step_system_cache, speculate_update_v3,
-    step_paddle, step_system_cache, update_inputs, step_reschedule)
+    set_stop_value_multi_seqs, speculate_clear_accept_nums,
+    speculate_get_output_padding_offset, speculate_get_padding_offset,
+    speculate_get_seq_lens_output, speculate_save_output,
+    speculate_set_value_by_flags_and_idx, speculate_step_paddle,
+    speculate_step_system_cache, speculate_update_v3, step_paddle,
+    step_reschedule, step_system_cache, update_inputs)
 from fastdeploy.platforms import current_platform
 from fastdeploy.worker.output import ModelOutputData
@@ -105,7 +106,8 @@ def pre_process(
 def post_process_normal(sampled_token_ids: paddle.Tensor,
                         model_output: ModelOutputData,
                         save_each_rank: bool = False,
-                        skip_save_output: bool = False) -> None:
+                        skip_save_output: bool = False,
+                        use_stop_seqs: bool = False) -> None:
     """ Post-processing steps after completing a single token generation. """
     # 1. Set stop value
     paddle.assign(
@@ -122,12 +124,23 @@ def post_process_normal(sampled_token_ids: paddle.Tensor,
         paddle.logical_or(model_output.stop_flags, length_cond),
         model_output.stop_flags,
     )
-    # TODO(gongshaotian): Add use_stop_seqs
-    set_stop_value_multi_ends(sampled_token_ids, model_output.stop_flags,
-                              model_output.seq_lens_this_time,
-                              model_output.eos_token_id,
-                              model_output.next_tokens, False)  # multi ends
+    if not use_stop_seqs:
+        set_stop_value_multi_ends(sampled_token_ids, model_output.stop_flags,
+                                  model_output.seq_lens_this_time,
+                                  model_output.eos_token_id,
+                                  model_output.next_tokens, False)  # multi ends
+    else:
+        set_stop_value_multi_seqs(
+            sampled_token_ids,
+            model_output.pre_ids,
+            model_output.step_idx,
+            model_output.stop_flags,
+            model_output.seq_lens_this_time,
+            model_output.stop_token_ids,
+            model_output.stop_seqs_len,
+            model_output.eos_token_id,
+        )
 
     # 2. Update the input buffer of the model
     with paddle.framework._no_check_dy2st_diff():
         update_inputs(
@@ -197,13 +210,14 @@ def post_process(sampled_token_ids: paddle.Tensor,
                  model_output: ModelOutputData,
                  save_each_rank: bool = False,
                  speculative_decoding: bool = False,
-                 skip_save_output: bool = False) -> None:
+                 skip_save_output: bool = False,
+                 use_stop_seq: bool = False) -> None:
     """ Post-processing steps after completing a single token generation. """
     if speculative_decoding:
         post_process_specualate(model_output, skip_save_output)
     else:
         post_process_normal(sampled_token_ids, model_output, save_each_rank,
-                            skip_save_output)
+                            skip_save_output, use_stop_seq)
 
 
 def step_cuda(
@@ -217,7 +231,7 @@
 
     TODO(gongshaotian): normalization name
     """
-    
+
     if speculative_config.method is not None:
         if enable_prefix_caching:
             speculate_step_system_cache(
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index c13f232d38..0b75630e26 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -280,9 +280,9 @@ def insert_prefill_inputs(self, req_dicts: List[Request]):
                 stop_seqs_num = len(request.get("stop_seqs_len"))
                 for i in range(stop_seqs_num,
                                self.model_config.max_stop_seqs_num):
-                    request.stop_seqs_len.append(0)
+                    request.sampling_params.stop_seqs_len.append(0)
                 self.share_inputs["stop_seqs_len"][:] = np.array(
-                    request.stop_seqs_len, dtype="int32")
+                    request.sampling_params.stop_seqs_len, dtype="int32")
                 self.share_inputs["stop_seqs"][:stop_seqs_num, :len(
                     request.get("stop_token_ids")[0])] = np.array(
                         request.get("stop_token_ids"), dtype="int64")
@@ -505,7 +505,7 @@ def _init_share_inputs(self, max_num_seqs: int):
             self.model_config.stop_seqs_max_len
         ],
                                                    -1,
-                                                   dtype="int32")
+                                                   dtype="int64")
         if self.speculative_decoding:
             max_draft_token_num = self.speculative_config.num_speculative_tokens
             self.share_inputs["input_ids_cpu"] = paddle.full(
@@ -832,7 +832,11 @@ def _dummy_run(self,
             accept_tokens=self.share_inputs["accept_tokens"]
             if self.speculative_decoding else None,
             accept_num=self.share_inputs["accept_num"]
-            if self.speculative_decoding else None)
+            if self.speculative_decoding else None,
+            stop_token_ids=self.share_inputs["stop_seqs"]
+            if self.model_config.use_stop_seq else None,
+            stop_seqs_len=self.share_inputs["stop_seqs_len"]
+            if self.model_config.use_stop_seq else None)
 
         post_process(sampled_token_ids=sampled_token_ids,
                      model_output=model_output_data,
@@ -1065,7 +1069,12 @@ class at the server level, which is too granular for ModelRunner.
             accept_tokens=self.share_inputs["accept_tokens"]
             if self.speculative_decoding else None,
             accept_num=self.share_inputs["accept_num"]
-            if self.speculative_decoding else None)
+            if self.speculative_decoding else None,
+            stop_token_ids=self.share_inputs["stop_seqs"]
+            if self.model_config.use_stop_seq else None,
+            stop_seqs_len=self.share_inputs["stop_seqs_len"]
+            if self.model_config.use_stop_seq else None,
+        )
 
         if self.speculative_config.method in ["mtp"] and \
                 self.parallel_config.splitwise_role == "prefill":
@@ -1076,7 +1085,8 @@ class at the server level, which is too granular for ModelRunner.
                      model_output=model_output_data,
                      save_each_rank=self.parallel_config.use_ep,
                      speculative_decoding=self.speculative_decoding,
-                     skip_save_output=skip_save_output)
+                     skip_save_output=skip_save_output,
+                     use_stop_seq=self.model_config.use_stop_seq)
 
         # 6. Speculative decode
         if self.speculative_decoding:
diff --git a/fastdeploy/worker/output.py b/fastdeploy/worker/output.py
index 7d3c1198fb..151e45857c 100644
--- a/fastdeploy/worker/output.py
+++ b/fastdeploy/worker/output.py
@@ -132,6 +132,15 @@ class ModelOutputData:
     """
     accept_num: paddle.Tensor
+    """
+    the token ids of stop sequence
+    """
+    stop_token_ids: paddle.Tensor
+
+    """
+    the length of stop sequence
+    """
+    stop_seqs_len: paddle.Tensor
 
 
 @dataclass
 class ModelRunnerOutput:
diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py
index b075356f99..38f3cf8ffa 100644
--- a/fastdeploy/worker/xpu_model_runner.py
+++ b/fastdeploy/worker/xpu_model_runner.py
@@ -320,9 +320,9 @@ def process_prefill_inputs(self, req_dicts: List[Request]):
                 stop_seqs_num = len(request.get("stop_seqs_len"))
                 for i in range(stop_seqs_num,
                                self.model_config.max_stop_seqs_num):
-                    request.stop_seqs_len.append(0)
+                    request.sampling_params.stop_seqs_len.append(0)
                 self.share_inputs["stop_seqs_len"][:] = np.array(
-                    request.stop_seqs_len, dtype="int32")
+                    request.sampling_params.stop_seqs_len, dtype="int32")
                 self.share_inputs["stop_seqs"][:stop_seqs_num, :len(
                     request.get("stop_token_ids")[0])] = np.array(
                         request.get("stop_token_ids"), dtype="int64")
@@ -719,6 +719,8 @@ class at the server level, which is too granular for ModelRunner.
             actual_draft_token_num=None,
             accept_tokens=None,
             accept_num=None,
+            stop_token_ids=None,
+            stop_seqs_len=None,
         )
 
         xpu_post_process(sampled_token_ids=sampled_token_ids,
                          model_output=model_output_data)

From 2781e51951f45fd16f44a51778e15cd33a0166f1 Mon Sep 17 00:00:00 2001
From: zoooo0820
Date: Fri, 4 Jul 2025 18:39:55 +0800
Subject: [PATCH 2/2] support stop_token_ids

---
 fastdeploy/input/ernie_processor.py | 16 ++++++++++++++--
 fastdeploy/input/text_processor.py  | 12 +++++++++++-
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/fastdeploy/input/ernie_processor.py b/fastdeploy/input/ernie_processor.py
index 51dbed7663..8fad09fcb0 100644
--- a/fastdeploy/input/ernie_processor.py
+++ b/fastdeploy/input/ernie_processor.py
@@ -20,10 +20,9 @@
 from paddleformers.generation import GenerationConfig
 
 from fastdeploy import envs
-from fastdeploy.utils import data_processor_logger
 from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer
-
 from fastdeploy.input.text_processor import BaseDataProcessor
+from fastdeploy.utils import data_processor_logger
 
 _SAMPLING_EPS = 1e-5
 
@@ -93,11 +92,19 @@ def process_request(self, request, max_model_len=None, **kwargs):
         if request.get("eos_token_ids") is None or len(
                 request.eos_token_ids) == 0:
             request.eos_token_ids = self.eos_token_ids
+
+        # For now, only one of stop and stop_token_ids takes effect; stop has higher priority
         stop_sequences = request.get("stop", [])
         if stop_sequences is not None and len(stop_sequences) != 0:
             stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
             request.set("stop_token_ids", stop_seqs)
             request.set("stop_seqs_len", stop_seqs_len)
+        else:
+            stop_token_ids_list = request.get("stop_token_ids", [])
+            if len(stop_token_ids_list) != 0:
+                request.set("stop_token_ids", [stop_token_ids_list])
+                request.set("stop_seqs_len", [len(stop_token_ids_list)])
+
 
         if request.prompt_token_ids is None or len(
                 request.prompt_token_ids) == 0:
@@ -149,6 +156,11 @@ def process_request_dict(self, request, max_model_len=None):
             stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
             request['stop_token_ids'] = stop_seqs
             request['stop_seqs_len'] = stop_seqs_len
+        else:
+            stop_token_ids_list = request.get("stop_token_ids", [])
+            if len(stop_token_ids_list) != 0:
+                request["stop_token_ids"] = [stop_token_ids_list]
+                request["stop_seqs_len"] = [len(stop_token_ids_list)]
 
         system = request.get("system")
         # 处理prompt_token_ids
diff --git a/fastdeploy/input/text_processor.py b/fastdeploy/input/text_processor.py
index ae2dc1f29c..5cdbcf80bd 100644
--- a/fastdeploy/input/text_processor.py
+++ b/fastdeploy/input/text_processor.py
@@ -235,6 +235,11 @@ def process_request(self, request, max_model_len=None, **kwargs):
             stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
             request.set("stop_token_ids", stop_seqs)
             request.set("stop_seqs_len", stop_seqs_len)
+        else:
+            stop_token_ids_list = request.get("stop_token_ids", [])
+            if len(stop_token_ids_list) != 0:
+                request.set("stop_token_ids", [stop_token_ids_list])
+                request.set("stop_seqs_len", [len(stop_token_ids_list)])
 
         if request.prompt_token_ids is None or len(
                 request.prompt_token_ids) == 0:
@@ -282,6 +287,11 @@ def process_request_dict(self, request, max_model_len=None, **kwargs):
             stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
             request['stop_token_ids'] = stop_seqs
             request['stop_seqs_len'] = stop_seqs_len
+        else:
+            stop_token_ids_list = request.get("stop_token_ids", [])
+            if len(stop_token_ids_list) != 0:
+                request["stop_token_ids"] = [stop_token_ids_list]
+                request["stop_seqs_len"] = [len(stop_token_ids_list)]
 
         data_processor_logger.info(f"Processing request {request}")
         # 处理prompt_token_ids
@@ -630,6 +640,6 @@ def update_stop_seq(self, stop_sequences):
             pad_id=-1,
             return_seq_len=True,
             return_array=False)
-        data_processor_logger.debug(
+        data_processor_logger.info(
             f"processed stop_seqs: {stop_seqs}, {stop_seqs_len}")
         return stop_seqs, stop_seqs_len
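
Reviewer note, not part of the patches above: the snippet below is a minimal, self-contained Python sketch of the stop-condition normalization that both input processors perform after PATCH 2/2, written only to make the priority rule easy to check. The fake_tokenize helper is a hypothetical stand-in for the tokenizer path inside update_stop_seq(); padding of the stop sequences to FD_STOP_SEQS_MAX_LEN with pad_id=-1 is omitted here.

    from typing import Dict, List, Tuple


    def fake_tokenize(text: str) -> List[int]:
        # Hypothetical stand-in tokenizer: one id per character, illustration only.
        return [ord(ch) for ch in text]


    def normalize_stop(request: Dict) -> Tuple[List[List[int]], List[int]]:
        """Return (stop_token_ids, stop_seqs_len) for a request dict.

        Mirrors the priority rule in the patches: `stop` strings win; otherwise a
        flat `stop_token_ids` list is wrapped as a single stop sequence.
        """
        stop_sequences = request.get("stop", [])
        if stop_sequences is not None and len(stop_sequences) != 0:
            stop_seqs = [fake_tokenize(s) for s in stop_sequences]
            return stop_seqs, [len(seq) for seq in stop_seqs]

        stop_token_ids_list = request.get("stop_token_ids", [])
        if len(stop_token_ids_list) != 0:
            # A flat list of token ids is treated as one stop sequence.
            return [stop_token_ids_list], [len(stop_token_ids_list)]

        return [], []


    if __name__ == "__main__":
        print(normalize_stop({"stop": ["</s>"]}))           # stop strings take priority
        print(normalize_stop({"stop_token_ids": [2, 42]}))  # ([[2, 42]], [2])

With FD_USE_STOP_SEQ=1, the stop_token_ids/stop_seqs_len buffers built this way are what post_process_normal() feeds to set_stop_value_multi_seqs instead of set_stop_value_multi_ends (see PATCH 1/2).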