|
@@ -1,7 +1,8 @@
 # SPDX-License-Identifier: Apache-2.0
 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project
-
-from typing import Final, Optional, Union
+import re
+from functools import lru_cache
+from typing import Any, Dict, Final, List, Optional, Tuple, Union
 
 import jinja2
 from fastapi import Request
@@ -17,11 +18,15 @@
                                               ErrorResponse,
                                               TokenizeChatRequest,
                                               TokenizeRequest,
-                                              TokenizeResponse)
+                                              TokenizeResponse,
+                                              TokenizerInfoResponse)
 # yapf: enable
 from vllm.entrypoints.openai.serving_engine import OpenAIServing
 from vllm.entrypoints.openai.serving_models import OpenAIServingModels
 from vllm.logger import init_logger
+from vllm.transformers_utils.tokenizer import (AnyTokenizer, decode_tokens,
+                                               encode_tokens)
+from vllm.transformers_utils.tokenizers import MistralTokenizer
 
 logger = init_logger(__name__)
 
@@ -155,3 +160,207 @@ async def create_detokenize(
         input_text = prompt_input["prompt"]
 
         return DetokenizeResponse(prompt=input_text)
+
+    async def get_tokenizer_info(
+            self) -> Union[TokenizerInfoResponse, ErrorResponse]:
+        """Get comprehensive tokenizer information."""
+        try:
+            tokenizer = await self.engine_client.get_tokenizer()
+            info = TokenizerInfo(tokenizer, self.model_config,
+                                 self.chat_template).to_dict()
+            return TokenizerInfoResponse(**info)
+        except Exception as e:
+            return self.create_error_response(
+                f"Failed to get tokenizer info: {str(e)}")
+
+
+class TokenizerInfo:
+
+    def __init__(self, tokenizer: AnyTokenizer, model_config: ModelConfig,
+                 chat_template: Optional[str]):
+        self.tokenizer = tokenizer
+        self.model_config = model_config
+        self.chat_template = chat_template
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert to dictionary for API response."""
+        return {
+            **self._get_core_info(),
+            **self._get_model_info(),
+            **self._get_special_tokens(),
+            **self._get_tokenizer_attributes(),
+            **self._get_chat_template_info(),
+        }
+
+    def _get_core_info(self) -> Dict[str, Any]:
+        """Get core tokenizer information."""
+        vocab_size = getattr(self.tokenizer, 'vocab_size', None)
+        tokenizer_type = type(self.tokenizer).__name__
+
+        return {
+            "tokenizer_type": tokenizer_type,
+            "vocab_size": vocab_size,
+            "tokenizer_backend": self._detect_backend(tokenizer_type),
+            "is_cached": "Cached" in tokenizer_type,
+            "is_fast": getattr(self.tokenizer, 'is_fast', False),
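+            # Assumes token IDs are contiguous, so the max ID is
+            # vocab_size - 1.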
+            "max_token_id": vocab_size - 1 if vocab_size else None,
+        }
+
+    def _get_model_info(self) -> Dict[str, Any]:
+        """Get model configuration information."""
+        if not self.model_config:
+            return {}
+
+        return {
+            "model_name":
+            self.model_config.model,
+            "tokenizer_name":
+            getattr(self.model_config, 'tokenizer', None),
+            "tokenizer_mode":
+            getattr(self.model_config, 'tokenizer_mode', 'auto'),
+            "trust_remote_code":
+            getattr(self.model_config, 'trust_remote_code', False),
+            "tokenizer_revision":
+            getattr(self.model_config, 'tokenizer_revision', None),
+            "is_gguf":
+            getattr(self.model_config, 'gguf_file', None) is not None,
+            "gguf_file":
+            getattr(self.model_config, 'gguf_file', None),
+        }
+
+    def _get_special_tokens(self) -> Dict[str, Any]:
+        """Get all special tokens using the official HuggingFace method."""
+        special_tokens: Dict[str, int] = {}
+
+        # special_tokens_map is the canonical accessor for bos/eos/pad/unk
+        # and any additional special tokens.
+        if hasattr(self.tokenizer, 'special_tokens_map'):
+            for key, token_str in self.tokenizer.special_tokens_map.items():
+                if key == "additional_special_tokens" and isinstance(
+                        token_str, list):
+                    # Map each additional special token to its ID.
+                    for tok_str in token_str:
+                        token_id = self._safe_get_token_id(tok_str)
+                        if token_id is not None:
+                            special_tokens[tok_str] = token_id
+                elif token_str:  # Skip None or empty entries.
+                    # Regular special tokens expose a matching "<key>_id"
+                    # attribute (e.g. eos_token -> eos_token_id).
+                    token_id = self._safe_get_token_id_by_attr(key)
+                    if token_id is not None:
+                        special_tokens[token_str] = token_id
+
+        return {"special_tokens": special_tokens}
+
+    def _get_tokenizer_attributes(self) -> Dict[str, Any]:
+        """Get tokenizer attributes and capabilities."""
+        return {
+            # HuggingFace attributes
+            "model_max_length":
+            getattr(self.tokenizer, 'model_max_length', None),
+            "truncation_side":
+            getattr(self.tokenizer, 'truncation_side', 'right'),
+            "padding_side":
+            getattr(self.tokenizer, 'padding_side', 'right'),
+            "clean_up_tokenization_spaces":
+            getattr(self.tokenizer, 'clean_up_tokenization_spaces', True),
+            # Capabilities
+            "supports_encoding":
+            hasattr(self.tokenizer, 'encode') or callable(self.tokenizer),
+            "supports_decoding":
+            hasattr(self.tokenizer, 'decode'),
+        }
+
+    def _get_chat_template_info(self) -> Dict[str, Any]:
+        """Get chat template information."""
+        template, source = self._find_chat_template()
+
+        if not template:
+            return {
+                "has_chat_template": False,
+                "chat_template": None,
+                "chat_template_source": "none",
+                "supports_system_message": False,
+                "supports_tools": False,
+            }
+
+        return {
+            "has_chat_template":
+            True,
+            "chat_template":
+            template,
+            "chat_template_source":
+            source,
+            "supports_system_message":
+            "system" in template.lower(),
+            "supports_tools":
+            any(word in template.lower() for word in ["tool", "function"]),
+        }
+
+    def _detect_backend(self, tokenizer_type: str) -> str:
+        """Detect tokenizer backend from type name."""
+        if isinstance(self.tokenizer, MistralTokenizer):
+            return "mistral"
+        elif "Fast" in tokenizer_type or getattr(self.tokenizer, 'is_fast',
+                                                 False):
+            return "huggingface_fast"
+        elif "SentencePiece" in tokenizer_type:
+            return "sentencepiece"
+        elif "Tiktoken" in tokenizer_type:
+            return "tiktoken"
+        elif "Cached" in tokenizer_type:
+            return "cached"
+        else:
+            return "huggingface_slow"
+
+    def _safe_token_to_string(self, token) -> Optional[str]:
+        """Convert token to string safely."""
+        if hasattr(token, 'content'):
+            return token.content
+        elif token:
+            return str(token)
+        return None
+
+    def _safe_get_token_id(self, token_str: str) -> Optional[int]:
+        """Get token ID safely."""
+        try:
+            token_id = self.tokenizer.convert_tokens_to_ids(token_str)
+            return token_id if token_id >= 0 else None
+        except Exception:
+            return None
+
+    def _safe_get_token_id_by_attr(self, attr: str) -> Optional[int]:
+        """Get token ID by attribute name safely."""
+        try:
+            token_id = getattr(self.tokenizer, f"{attr}_id", None)
+            return token_id if isinstance(token_id,
+                                          int) and token_id >= 0 else None
+        except Exception:
+            return None
+
+    def _find_chat_template(self) -> Tuple[Optional[str], str]:
+        """Find chat template from various sources."""
+        # Check the tokenizer itself
+        if hasattr(self.tokenizer,
+                   'chat_template') and self.tokenizer.chat_template:
+            return self.tokenizer.chat_template, "tokenizer"
+
+        # Check the underlying tokenizer (e.g. inside cached wrappers)
+        if hasattr(self.tokenizer, 'tokenizer'):
+            underlying = self.tokenizer.tokenizer
+            if hasattr(underlying,
+                       'chat_template') and underlying.chat_template:
+                return underlying.chat_template, "underlying"
+
+        # Fall back to the template supplied via config
+        if self.chat_template:
+            return self.chat_template, "config"
+
+        return None, "none"
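
For reference, a minimal client-side sketch of how the new handler might be exercised. It assumes the accompanying route registration (not shown in this diff) exposes the handler at `GET /tokenizer_info`, and that a vLLM OpenAI-compatible server is listening locally; both the path and the address are illustrative assumptions.

```python
# Query the (assumed) /tokenizer_info route and inspect a few of the
# fields assembled by TokenizerInfo.to_dict().
import requests

resp = requests.get("http://localhost:8000/tokenizer_info")  # assumed route
resp.raise_for_status()
info = resp.json()

print(info["tokenizer_type"])      # e.g. "PreTrainedTokenizerFast"
print(info["tokenizer_backend"])   # e.g. "huggingface_fast"
print(info["vocab_size"], info["max_token_id"])
print(info["has_chat_template"], info["chat_template_source"])
print(info["special_tokens"])      # mapping of token string -> token ID
```

Because `to_dict()` merges the core, model, special-token, attribute, and chat-template sections into one flat dictionary, clients can read individual keys without navigating any nesting.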