[Structured Outputs][V1] Skipping with models doesn't contain tokenizers #20365

Merged

Changes from 1 commit
60 changes: 50 additions & 10 deletions tests/v1/core/test_scheduler.py
@@ -9,14 +9,15 @@
from vllm.config import (CacheConfig, KVTransferConfig, ModelConfig,
SchedulerConfig, SpeculativeConfig, VllmConfig)
from vllm.multimodal.inputs import MultiModalKwargs, PlaceholderRange
from vllm.sampling_params import SamplingParams
from vllm.sampling_params import GuidedDecodingParams, SamplingParams
from vllm.v1.core.sched.output import CachedRequestData, SchedulerOutput
from vllm.v1.core.sched.scheduler import Scheduler
from vllm.v1.kv_cache_interface import (FullAttentionSpec, KVCacheConfig,
KVCacheGroupSpec)
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.structured_output import StructuredOutputManager
from vllm.v1.structured_output.request import StructuredOutputRequest

EOS_TOKEN_ID = 50256

@@ -33,6 +34,7 @@ def create_scheduler(
block_size: int = 16,
max_model_len: Optional[int] = None,
num_speculative_tokens: Optional[int] = None,
skip_tokenizer_init: bool = False,
) -> Scheduler:
'''Create scheduler under test.

@@ -65,6 +67,7 @@
trust_remote_code=True,
dtype="float16",
seed=42,
skip_tokenizer_init=skip_tokenizer_init,
)
# Cache config, optionally force APC
kwargs_cache = ({} if enable_prefix_caching is None else {
@@ -109,7 +112,8 @@ def create_scheduler(
vllm_config=vllm_config,
kv_cache_config=kv_cache_config,
log_stats=True,
structured_output_manager=StructuredOutputManager(vllm_config),
structured_output_manager=StructuredOutputManager(
vllm_config=vllm_config),
)


@@ -186,7 +190,7 @@ def test_get_num_unfinished_requests():
])
def test_schedule(enable_prefix_caching: Optional[bool],
prompt_logprobs: Optional[int]):
'''Test scheduling.
'''Test scheduling.
Two cases: default APC/no prompt logprobs; APC=True + prompt logprobs
'''
scheduler = create_scheduler(enable_prefix_caching=enable_prefix_caching)
@@ -1408,7 +1412,7 @@ def create_requests_with_priority(


def test_priority_scheduling_basic_ordering():
"""Test that requests are scheduled in priority order
"""Test that requests are scheduled in priority order
(lower value = higher priority)."""
scheduler = create_scheduler_with_priority()

@@ -1437,7 +1441,7 @@ def test_priority_scheduling_basic_ordering():


def test_priority_scheduling_arrival_time_tiebreaker():
"""Test that arrival time is used
"""Test that arrival time is used
as tiebreaker when priorities are equal."""
scheduler = create_scheduler_with_priority()

@@ -1495,7 +1499,7 @@ def test_priority_scheduling_mixed_priority_and_arrival():


def test_priority_scheduling_preemption():
"""Test that priority scheduling preempts
"""Test that priority scheduling preempts
lower priority requests when memory is constrained."""
# Create scheduler with very limited memory to force preemption
scheduler = create_scheduler_with_priority(
@@ -1576,7 +1580,7 @@ def test_priority_scheduling_preemption():


def test_priority_scheduling_no_preemption_when_space_available():
"""Test that preemption doesn't happen
"""Test that preemption doesn't happen
when there's space for new requests."""
scheduler = create_scheduler_with_priority(
max_num_seqs=3, # Allow 3 concurrent requests
@@ -1626,7 +1630,7 @@ def test_priority_scheduling_no_preemption_when_space_available():


def test_priority_scheduling_preemption_victim_selection():
"""Test that the correct victim is selected for
"""Test that the correct victim is selected for
preemption based on priority and arrival time."""
# This test verifies the priority-based victim selection logic
# by checking the waiting queue order after adding requests with different
@@ -1743,7 +1747,7 @@ def test_priority_scheduling_waiting_queue_order():


def test_priority_scheduling_fcfs_fallback():
"""Test that FCFS behavior is maintained when all
"""Test that FCFS behavior is maintained when all
requests have same priority."""
scheduler = create_scheduler_with_priority()

@@ -1811,7 +1815,7 @@ def test_priority_scheduling_with_limited_slots():


def test_priority_scheduling_heap_property():
"""Test that the waiting queue maintains heap
"""Test that the waiting queue maintains heap
property for priority scheduling."""
scheduler = create_scheduler_with_priority(
max_num_seqs=1, # Only one request can run at a time
@@ -1857,3 +1861,39 @@ def test_priority_scheduling_heap_property():
# Verify requests were scheduled in priority order (lowest value first)
expected_priorities = sorted(priorities)
assert scheduled_priorities == expected_priorities


def test_schedule_skip_tokenizer_init():
scheduler = create_scheduler(skip_tokenizer_init=True)
requests = create_requests(num_requests=5)
for request in requests:
scheduler.add_request(request)
output = scheduler.schedule()
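# Plain requests still schedule normally without a tokenizer, and since the
# batch contains no structured-output requests, no grammar bitmask is built.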
assert len(output.scheduled_new_reqs) == len(requests)
assert output.grammar_bitmask is None


def test_schedule_skip_tokenizer_init_structured_output_request():
scheduler = create_scheduler(skip_tokenizer_init=True)
guided_params = GuidedDecodingParams(regex="[0-9]+")
sampling_params = SamplingParams(
ignore_eos=False,
max_tokens=16,
guided_decoding=guided_params,
)
request = Request(
request_id="0",
prompt_token_ids=[0, 1],
multi_modal_inputs=None,
multi_modal_hashes=None,
multi_modal_placeholders=None,
sampling_params=sampling_params,
pooling_params=None,
eos_token_id=EOS_TOKEN_ID,
structured_output_request=StructuredOutputRequest(sampling_params),
)
scheduler.add_request(request)
output = scheduler.schedule()
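# Without a tokenizer the grammar for this structured-output request can never
# be compiled, so the scheduler is expected to keep it in the waiting queue
# rather than schedule it.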
assert len(output.scheduled_new_reqs) == 0
assert len(scheduler.running) == 0
assert len(scheduler.waiting) == 1
3 changes: 2 additions & 1 deletion tests/v1/kv_connector/unit/utils.py
Expand Up @@ -107,7 +107,8 @@ def create_scheduler(
vllm_config=vllm_config,
kv_cache_config=kv_cache_config,
log_stats=True,
structured_output_manager=StructuredOutputManager(vllm_config),
structured_output_manager=StructuredOutputManager(
vllm_config=vllm_config),
)


6 changes: 4 additions & 2 deletions vllm/v1/core/sched/scheduler.py
@@ -7,7 +7,7 @@
import time
from collections import defaultdict
from collections.abc import Iterable
from typing import Any, Optional, Union
from typing import TYPE_CHECKING, Any, Optional, Union

from vllm.config import VllmConfig
from vllm.distributed.kv_events import EventPublisherFactory, KVEventBatch
@@ -33,7 +33,9 @@
from vllm.v1.outputs import ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.spec_decode.metrics import SpecDecodingStats
from vllm.v1.structured_output import StructuredOutputManager

if TYPE_CHECKING:
from vllm.v1.structured_output import StructuredOutputManager

logger = init_logger(__name__)

3 changes: 2 additions & 1 deletion vllm/v1/engine/core.py
@@ -86,7 +86,8 @@
self.collective_rpc("initialize_cache",
args=(num_gpu_blocks, num_cpu_blocks))

self.structured_output_manager = StructuredOutputManager(vllm_config)
self.structured_output_manager = StructuredOutputManager(
vllm_config=vllm_config)

Check failure on line 89 in vllm/v1/engine/core.py (GitHub Actions / pre-commit):
Unexpected keyword argument "vllm_config" for "StructuredOutputManager" [call-arg]

# Setup scheduler.
if isinstance(vllm_config.scheduler_config.scheduler_cls, str):
5 changes: 5 additions & 0 deletions vllm/v1/engine/processor.py
@@ -152,6 +152,11 @@ def _validate_structured_output(self, params: SamplingParams) -> None:
if not params.guided_decoding or not self.decoding_config:
return

if self.model_config.skip_tokenizer_init and self.decoding_config:
raise ValueError(
"'skip_tokenizer_init' is specified during engine startup. This implies that the model doesn't contain sufficient files to setup tokenizers, which structured outputs requires tokenizers to work. Specifying structured outputs parameters will not be supported in conjunction with 'skip_tokenizer_init'." # noqa: E501
)

engine_level_backend = self.decoding_config.backend
if params.guided_decoding.backend:
# Request-level backend selection is not supported in V1.
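
For context, a minimal sketch of the user-facing effect of this check; the model name and prompt token IDs are placeholders, and the exact point at which the ValueError surfaces depends on the entrypoint used (offline LLM shown here):

from vllm import LLM, SamplingParams
from vllm.inputs import TokensPrompt
from vllm.sampling_params import GuidedDecodingParams

# Assumption: a model launched with skip_tokenizer_init=True (placeholder name).
llm = LLM(model="my-org/tokenizer-less-model", skip_tokenizer_init=True)

params = SamplingParams(
    max_tokens=16,
    guided_decoding=GuidedDecodingParams(regex="[0-9]+"),
)

# With this change the processor rejects the request up front, instead of the
# StructuredOutputManager failing later for lack of a tokenizer.
try:
    llm.generate(TokensPrompt(prompt_token_ids=[1, 2, 3]), params)
except ValueError as err:
    print(err)
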
81 changes: 53 additions & 28 deletions vllm/v1/structured_output/__init__.py
@@ -6,6 +6,9 @@
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Optional

from pydantic import ConfigDict, Field
from pydantic.dataclasses import dataclass

from vllm.config import VllmConfig
from vllm.logger import init_logger
from vllm.reasoning import ReasoningParserManager
@@ -29,36 +32,56 @@
logger = init_logger(__name__)


@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class StructuredOutputManager:
"""Engine-level manager for structured output requests."""

def __init__(self, vllm_config: VllmConfig):
self.backend: Optional[StructuredOutputBackend] = None
self.reasoner: Optional[ReasoningParser] = None
self.vllm_config = vllm_config

self._grammar_bitmask: Optional[torch.Tensor] = None
self._full_mask = torch.tensor(-1, dtype=torch.int32)

# The default max_workers if not specified is the number of CPUs * 5,
# which is way too high since these tasks are CPU-bound, not I/O bound.
# We also know we would never dominate CPU usage with just grammar
# compilation, so we set it to half the number of CPUs.
max_workers = max(1, (multiprocessing.cpu_count() + 1) // 2)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.tokenizer = init_tokenizer_from_configs(
model_config=self.vllm_config.model_config,
scheduler_config=self.vllm_config.scheduler_config,
lora_config=self.vllm_config.lora_config,
).get_lora_tokenizer(None)
reasoning_backend = vllm_config.decoding_config.reasoning_backend
if reasoning_backend:
reasoner_cls = ReasoningParserManager.get_reasoning_parser(
reasoning_backend)
self.reasoner = reasoner_cls(tokenizer=self.tokenizer)
vllm_config: VllmConfig

backend: Optional[StructuredOutputBackend] = Field(
default=None,
init=False,
repr=False,
)
reasoner: Optional[ReasoningParser] = Field(
default=None,
init=False,
repr=False,
)
_grammar_bitmask: Optional[torch.Tensor] = Field(
default=None,
init=False,
repr=False,
)
_full_mask: torch.Tensor = Field(
default_factory=lambda: torch.tensor(-1, dtype=torch.int32),
init=False,
repr=False,
)

def __post_init__(self):
if not self.vllm_config.model_config.skip_tokenizer_init:
# The default max_workers if not specified is the number
# of CPUs * 5, which is way too high since these tasks are
# CPU-bound, not I/O bound. We also know we would never dominate
# CPU usage with just grammar compilation, so we set it to half
# the number of CPUs.
max_workers = max(1, (multiprocessing.cpu_count() + 1) // 2)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.tokenizer = init_tokenizer_from_configs(
model_config=self.vllm_config.model_config,
scheduler_config=self.vllm_config.scheduler_config,
lora_config=self.vllm_config.lora_config,
).get_lora_tokenizer(None)
reasoning_backend = \
self.vllm_config.decoding_config.reasoning_backend
if reasoning_backend:
reasoner_cls = ReasoningParserManager.get_reasoning_parser(
reasoning_backend)
self.reasoner = reasoner_cls(tokenizer=self.tokenizer)

def grammar_init(self, request: Request) -> None:
if request.structured_output_request is None:
if request.structured_output_request is None or \
self.vllm_config.model_config.skip_tokenizer_init:
return

if TYPE_CHECKING:
@@ -115,7 +138,8 @@ def grammar_bitmask(
scheduled_spec_decode_tokens: dict[str, list[int]],
) -> Optional[npt.NDArray[np.int32]]:
# Prepare the structured output bitmask for this batch.
if not structured_output_request_ids:
if not structured_output_request_ids \
or self.vllm_config.model_config.skip_tokenizer_init:
return None

max_num_spec_tokens = 0
@@ -193,7 +217,8 @@ def grammar_bitmask(
return bitmask_tensor.numpy()

def should_advance(self, request: Request) -> bool:
if not request.use_structured_output:
if not request.use_structured_output \
or self.vllm_config.model_config.skip_tokenizer_init:
return False

# To determine whether we can advance the FSM.
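
As background for the refactor above, a minimal standalone sketch of the pydantic-dataclass pattern it adopts (the class and field names below are illustrative, not vLLM code, and assume a recent pydantic v2): arbitrary_types_allowed permits non-pydantic field types such as torch.Tensor, init=False keeps derived state out of the generated __init__, and __post_init__ takes over the setup work the old __init__ body did.

import torch
from pydantic import ConfigDict, Field
from pydantic.dataclasses import dataclass


@dataclass(config=ConfigDict(arbitrary_types_allowed=True))
class ExampleManager:
    # Only this field appears in the generated __init__.
    name: str
    # Non-pydantic type (torch.Tensor) allowed via arbitrary_types_allowed;
    # init=False / repr=False mirror the fields added in the diff above.
    full_mask: torch.Tensor = Field(
        default_factory=lambda: torch.tensor(-1, dtype=torch.int32),
        init=False,
        repr=False,
    )

    def __post_init__(self) -> None:
        # Runs after the generated __init__; the refactored
        # StructuredOutputManager defers its tokenizer/executor setup here.
        self.ready = bool(self.name)


mgr = ExampleManager(name="demo")
print(mgr.ready, mgr.full_mask.item())  # True -1
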
4 changes: 1 addition & 3 deletions vllm/v1/worker/gpu_model_runner.py
@@ -73,13 +73,11 @@
sanity_check_mm_encoder_outputs, scatter_mm_placeholders)

if TYPE_CHECKING:
import xgrammar as xgr
import xgrammar.kernels.apply_token_bitmask_inplace_torch_compile as xgr_torch_compile # noqa: E501

from vllm.model_executor.model_loader.tensorizer import TensorizerConfig
from vllm.v1.core.sched.output import SchedulerOutput
else:
xgr = LazyLoader("xgr", globals(), "xgrammar")
xgr_torch_compile = LazyLoader(
"xgr_torch_compile", globals(),
"xgrammar.kernels.apply_token_bitmask_inplace_torch_compile")
@@ -1960,7 +1958,7 @@ def maybe_randomize_inputs(self, input_ids: torch.Tensor):
Randomize input_ids if VLLM_RANDOMIZE_DP_DUMMY_INPUTS is set.
This is to help balance expert-selection
- during profile_run
- during DP rank dummy run
- during DP rank dummy run
"""
dp_size = self.vllm_config.parallel_config.data_parallel_size
randomize_inputs = envs.VLLM_RANDOMIZE_DP_DUMMY_INPUTS and dp_size > 1