From 489e5ba5ce202663c76c076f41a75f7e12de95f2 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Thu, 19 Jun 2025 13:10:52 +0000 Subject: [PATCH 01/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../kv_connector/v1/nixl_connector.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index bdab4850d4c..d3da22546bf 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -921,6 +921,15 @@ def _read_blocks( # corresponding rank. With heterogeneous TP, fixing D>P, the D tp # workers will issue xfers to parts of the P worker remote kv caches. + # Sort block ids to ensure nixl can merge contiguous blocks. + start = time.perf_counter() + sorted_idx = sorted(range(len(local_block_ids)), + key=local_block_ids.__getitem__) + local_block_ids = [local_block_ids[i] for i in sorted_idx] + remote_block_ids = [remote_block_ids[i] for i in sorted_idx] + end = time.perf_counter() + print(f"REORDER took: {end - start}") + # Get descs ids. local_block_descs_ids: list[int] = [] remote_block_descs_ids: list[int] = [] @@ -965,10 +974,17 @@ def _read_blocks( remote_xfer_side_handle, remote_block_descs_ids, notif_msg=notif_id, + # skip_desc_merge=True, ) # Begin async xfer. + start = time.perf_counter() self.nixl_wrapper.transfer(handle) + end = time.perf_counter() + print(f"self.nixl_wrapper.transfer() TIME: {end-start}") + if end - start > 0.2: + print(f"{local_block_ids=}") + print(f"{remote_block_ids=}") # Use handle to check completion in future step(). # TODO (NickLucche) surface xfer elapsed time From 5b8c64dc77394bbfb827bc1455d2054166b903fd Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Thu, 19 Jun 2025 13:12:43 +0000 Subject: [PATCH 02/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../kv_connector/v1/nixl_connector.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index d3da22546bf..053901c1cc4 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -921,14 +921,14 @@ def _read_blocks( # corresponding rank. With heterogeneous TP, fixing D>P, the D tp # workers will issue xfers to parts of the P worker remote kv caches. - # Sort block ids to ensure nixl can merge contiguous blocks. - start = time.perf_counter() - sorted_idx = sorted(range(len(local_block_ids)), - key=local_block_ids.__getitem__) - local_block_ids = [local_block_ids[i] for i in sorted_idx] - remote_block_ids = [remote_block_ids[i] for i in sorted_idx] - end = time.perf_counter() - print(f"REORDER took: {end - start}") + # # Sort block ids to ensure nixl can merge contiguous blocks. + # start = time.perf_counter() + # sorted_idx = sorted(range(len(local_block_ids)), + # key=local_block_ids.__getitem__) + # local_block_ids = [local_block_ids[i] for i in sorted_idx] + # remote_block_ids = [remote_block_ids[i] for i in sorted_idx] + # end = time.perf_counter() + # print(f"REORDER took: {end - start}") # Get descs ids. 
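An aside on PATCH 01's reordering: sorting the block ids lets NIXL coalesce runs of consecutive blocks into a single descriptor, shrinking the descriptor list it has to build and launch. The sketch below only illustrates that coalescing; `coalesce`, its signature, and the byte-offset math are illustrative assumptions, not NIXL's actual merge logic.

    # Illustrative only: how sorted block ids enable contiguous-range merging,
    # assuming each block id maps to a fixed-size contiguous memory region.
    def coalesce(block_ids: list[int], block_bytes: int) -> list[tuple[int, int]]:
        runs: list[tuple[int, int]] = []  # (start_offset, length) pairs
        for bid in sorted(block_ids):
            if runs and bid * block_bytes == runs[-1][0] + runs[-1][1]:
                start, length = runs[-1]
                runs[-1] = (start, length + block_bytes)  # extend current run
            else:
                runs.append((bid * block_bytes, block_bytes))  # start a new run
        return runs

    # coalesce([3, 1, 2, 7], 4096) -> [(4096, 12288), (28672, 4096)]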
local_block_descs_ids: list[int] = [] @@ -974,7 +974,7 @@ def _read_blocks( remote_xfer_side_handle, remote_block_descs_ids, notif_msg=notif_id, - # skip_desc_merge=True, + skip_desc_merge=True, # this causes the issue to emerge immediately. ) # Begin async xfer. From 87bf6812b24f92aa34dfb3326c4b6db0ef442927 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Thu, 19 Jun 2025 13:15:50 +0000 Subject: [PATCH 03/36] updated Signed-off-by: rshaw@neuralmagic.com --- pd_justfile/Justfile | 110 ++++++++++++++++++++++++++++++++++ pd_justfile/port_allocator.py | 99 ++++++++++++++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 pd_justfile/Justfile create mode 100755 pd_justfile/port_allocator.py diff --git a/pd_justfile/Justfile b/pd_justfile/Justfile new file mode 100644 index 00000000000..92f7c6ab2d4 --- /dev/null +++ b/pd_justfile/Justfile @@ -0,0 +1,110 @@ +# Setting this allows creating a symlink to Justfile from another dir +set working-directory := "/home/rshaw/vllm/pd_examples/" + +# Needed for the proxy server +vllm-directory := "/home/rshaw/vllm/" + +# MODEL := "Qwen/Qwen3-0.6B" +MODEL := "meta-llama/Llama-3.1-8B-Instruct" + +port PORT: + @python port_allocator.py {{PORT}} + + +prefill: + VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5557) \ + CUDA_VISIBLE_DEVICES=0,1 \ + vllm serve {{MODEL}} \ + --port $(just port 8100) \ + --tensor-parallel-size 2 \ + --enforce-eager \ + --disable-log-requests \ + --block-size 128 \ + --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' + +prefill_b: + VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5558) \ + CUDA_VISIBLE_DEVICES=6 \ + vllm serve {{MODEL}} \ + --port $(just port 8200) \ + --enforce-eager \ + --disable-log-requests \ + --block-size 128 \ + --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' + +decode: + VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5559) \ + CUDA_VISIBLE_DEVICES=2,3,4,5 \ + vllm serve {{MODEL}} \ + --port $(just port 8300) \ + --tensor-parallel-size 2 \ + --enforce-eager \ + --disable-log-requests \ + --block-size 128 \ + --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' + +# proxy: +# python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \ +# --port $(just port 8192) \ +# --prefiller-port $(just port 8100) $(just port 8200) \ +# --prefiller-host localhost localhost \ +# --decoder-port $(just port 8300) + +proxy: + python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \ + --port $(just port 8192) \ + --prefiller-port $(just port 8100) \ + --prefiller-host localhost \ + --decoder-port $(just port 8300) + +send_request: + curl -X POST http://localhost:$(just port 8192)/v1/completions \ + -H "Content-Type: application/json" \ + -d '{ \ + "model": "{{MODEL}}", \ + "prompt": "XXRed Hat is the best open source company by far across Linux, K8s, and AI, and vLLM has the greatest community in open source AI software infrastructure. 
I love vLLM because", \ + "max_tokens": 150, \ + "temperature": 0.7 \ + }' + +benchmark NUM_PROMPTS: + python {{vllm-directory}}/benchmarks/benchmark_serving.py \ + --port $(just port 8192) \ + --model {{MODEL}} \ + --dataset-name random \ + --random-input-len 10000 \ + --random-output-len 100 \ + --num-prompts {{NUM_PROMPTS}} \ + --seed $(date +%s) \ + +benchmark_one INPUT_LEN: + python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ + --model {{MODEL}} \ + --input-len {{INPUT_LEN}} \ + --output-len 1 \ + --num-requests 10 \ + --seed $(date +%s) \ + --port $(just port 8192) + +benchmark_one_no_pd INPUT_LEN: + python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ + --model {{MODEL}} \ + --input-len {{INPUT_LEN}} \ + --output-len 1 \ + --num-requests 10 \ + --seed $(date +%s) \ + --port $(just port 8100) + +reset_prefix_cache: + curl -X POST http://localhost:$(just port 8100)/reset_prefix_cache && \ + curl -X POST http://localhost:$(just port 8200)/reset_prefix_cache + +eval: + lm_eval --model local-completions --tasks gsm8k \ + --model_args model={{MODEL}},base_url=http://127.0.0.1:$(just port 8192)/v1/completions,num_concurrent=100,max_retries=3,tokenized_requests=False \ + --limit 1000 + +eval_port PORT: + lm_eval --model local-completions --tasks gsm8k \ + --model_args model={{MODEL}},base_url=http://127.0.0.1:$(just port {{PORT}})/v1/completions,num_concurrent=100,max_retries=3,tokenized_requests=False \ + --limit 1000 \ No newline at end of file diff --git a/pd_justfile/port_allocator.py b/pd_justfile/port_allocator.py new file mode 100755 index 00000000000..1ac5486049f --- /dev/null +++ b/pd_justfile/port_allocator.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +""" +Port Allocation Utility + +A small utility that generates consistent port numbers based on username and default port +to avoid port collisions during development. +""" + +import argparse +import getpass +import hashlib + + +def allocate_port(base_port, + username=None, + project_name=None, + port_range=None): + """ + Allocate a port based on username and base port. + + Args: + base_port (int): The default port number for the service + username (str, optional): Username to use for hashing. Defaults to current user. + project_name (str, optional): Project name to make ports unique per project + port_range (tuple, optional): Range of valid ports (min, max). Defaults to (1024, 65535). 
+ + Returns: + int: A port number derived from hashing the username and base port + """ + if not username: + username = getpass.getuser() + + if not port_range: + port_range = (1024, 65535) + + min_port, max_port = port_range + available_range = max_port - min_port + + # Create hash input from username, base_port and optional project_name + hash_input = f"{username}:{base_port}" + if project_name: + hash_input = f"{project_name}:{hash_input}" + + # Create a hash and convert to an integer in our port range + hash_obj = hashlib.md5(hash_input.encode()) + hash_int = int(hash_obj.hexdigest(), 16) + + # Generate a port within the valid range + port_offset = hash_int % available_range + allocated_port = min_port + port_offset + + # Check if it's too close to the base_port (within 10) + if abs(allocated_port - base_port) < 10: + # Add a small offset to avoid collisions with the default port + allocated_port = (allocated_port + 100) % available_range + min_port + + return allocated_port + + +def main(): + parser = argparse.ArgumentParser( + description='Allocate a consistent port based on username and base port' + ) + parser.add_argument('base_port', + type=int, + help='The default port number for the service') + parser.add_argument('--username', + '-u', + help='Username to use (defaults to current user)') + parser.add_argument('--project', + '-p', + help='Project name to make ports unique per project') + parser.add_argument('--env-var', + '-e', + help='Output as export ENV_VAR=port') + parser.add_argument('--min-port', + type=int, + default=1024, + help='Minimum port number') + parser.add_argument('--max-port', + type=int, + default=65535, + help='Maximum port number') + + args = parser.parse_args() + + port = allocate_port(args.base_port, + username=args.username, + project_name=args.project, + port_range=(args.min_port, args.max_port)) + + if args.env_var: + print(f"export {args.env_var}={port}") + else: + print(port) + + +if __name__ == "__main__": + main() From 852ee4b132164a4e8f9ddc8fbfa96796dca0248d Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Thu, 19 Jun 2025 13:16:50 +0000 Subject: [PATCH 04/36] updated Signed-off-by: rshaw@neuralmagic.com --- pd_justfile/Justfile | 25 ++++--------------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/pd_justfile/Justfile b/pd_justfile/Justfile index 92f7c6ab2d4..92433c6604e 100644 --- a/pd_justfile/Justfile +++ b/pd_justfile/Justfile @@ -13,20 +13,10 @@ port PORT: prefill: VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5557) \ - CUDA_VISIBLE_DEVICES=0,1 \ + CUDA_VISIBLE_DEVICES=0 \ vllm serve {{MODEL}} \ --port $(just port 8100) \ - --tensor-parallel-size 2 \ - --enforce-eager \ - --disable-log-requests \ - --block-size 128 \ - --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' - -prefill_b: - VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5558) \ - CUDA_VISIBLE_DEVICES=6 \ - vllm serve {{MODEL}} \ - --port $(just port 8200) \ + --tensor-parallel-size 1 \ --enforce-eager \ --disable-log-requests \ --block-size 128 \ @@ -34,22 +24,15 @@ prefill_b: decode: VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5559) \ - CUDA_VISIBLE_DEVICES=2,3,4,5 \ + CUDA_VISIBLE_DEVICES=1 \ vllm serve {{MODEL}} \ --port $(just port 8300) \ - --tensor-parallel-size 2 \ + --tensor-parallel-size 1 \ --enforce-eager \ --disable-log-requests \ --block-size 128 \ --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' -# proxy: -# python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \ -# 
--port $(just port 8192) \ -# --prefiller-port $(just port 8100) $(just port 8200) \ -# --prefiller-host localhost localhost \ -# --decoder-port $(just port 8300) - proxy: python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \ --port $(just port 8192) \ From 9eaa81b9c96783b124c9bc86b3813b12d2f181ec Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Thu, 19 Jun 2025 13:18:39 +0000 Subject: [PATCH 05/36] updated Signed-off-by: rshaw@neuralmagic.com --- benchmarks/benchmark_one_concurrent_req.py | 387 +++++++++++++++++++++ 1 file changed, 387 insertions(+) create mode 100644 benchmarks/benchmark_one_concurrent_req.py diff --git a/benchmarks/benchmark_one_concurrent_req.py b/benchmarks/benchmark_one_concurrent_req.py new file mode 100644 index 00000000000..26c330d9a1f --- /dev/null +++ b/benchmarks/benchmark_one_concurrent_req.py @@ -0,0 +1,387 @@ +# SPDX-License-Identifier: Apache-2.0 +import argparse +import asyncio +import logging +import random +import time +from dataclasses import dataclass +from typing import Optional + +import aiohttp # Import aiohttp +import numpy as np +from tqdm import tqdm + +from backend_request_func import RequestFuncInput, RequestFuncOutput +from benchmark_dataset import RandomDataset, SampleRequest + +try: + from vllm.transformers_utils.tokenizer import get_tokenizer +except ImportError: + from backend_request_func import get_tokenizer + +logger = logging.getLogger(__name__) + + +@dataclass +class BenchmarkMetrics: + completed: int + total_input: int + total_output: int + mean_ttft_ms: float + median_ttft_ms: float + std_ttft_ms: float + percentiles_ttft_ms: list[tuple[float, float]] + mean_itl_ms: float + median_itl_ms: float + std_itl_ms: float + percentiles_itl_ms: list[tuple[float, float]] + mean_e2el_ms: float + median_e2el_ms: float + std_e2el_ms: float + percentiles_e2el_ms: list[tuple[float, float]] + + +async def reset_cache(reset_url: str): + """Sends a POST request to reset the prefix cache.""" + logger.debug("Resetting prefix cache at %s", reset_url) + try: + async with ( + aiohttp.ClientSession() as session, + session.post(reset_url) as response, + ): + response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx) + logger.debug("Prefix cache reset successful: %s", response.status) + except aiohttp.ClientConnectorError as e: + logger.error("Failed to connect to cache reset endpoint %s: %s}", reset_url, e) + except aiohttp.ClientResponseError as e: + logger.error( + "Cache reset request failed with status %s: %s", e.status, e.message + ) + except Exception as e: + logger.error("An unexpected error occurred during cache reset: %s", e) + + +async def sequential_benchmark( + backend: str, + api_url: str, + model_id: str, + tokenizer, + input_requests: list[SampleRequest], + request_func, + selected_percentiles: list[float], + cache_reset_url: Optional[str] = None, +): + """ + Benchmark that processes requests sequentially, waiting for each to complete + before starting the next one. Resets prefix cache between requests. + """ + outputs = [] + + pbar = tqdm(total=len(input_requests)) + + # Small request to force a forward pass. + # Used for resetting the prefix cache. 
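+    # NOTE: the dummy-request + cache-reset flow below is intentionally left
+    # disabled in this revision (see the `if cache_reset_url and False:` guard
+    # further down), so no cache reset actually happens between requests.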
+ # dummy_req_input = RequestFuncInput( + # model=model_id, + # prompt="0", + # api_url=api_url, + # prompt_len=1, + # output_len=2, + # ) + + # print("Starting initial single prompt test run...") + # test_output = await request_func(request_func_input=dummy_req_input) + # if not test_output.success: + # raise ValueError( + # "Initial test run failed - Please check your configuration. " + # "Error: %s", test_output.error) + # else: + # print("Initial test run completed. Starting sequential benchmark...") + + benchmark_start_time = time.perf_counter() + + # Process requests sequentially + for request in input_requests: + prompt, prompt_len, output_len = ( + request.prompt, + request.prompt_len, + request.expected_output_len, + ) + + logger.info("Sending request with len %s", request.prompt_len) + logger.debug('Request str: "%s"', request.prompt[:50]) + request_start_time = time.perf_counter() + + # print(f"{prompt=}") + request_func_input = RequestFuncInput( + model=model_id, + prompt=prompt, + api_url=api_url, + prompt_len=prompt_len, + output_len=output_len, + ) + + output = await request_func(request_func_input=request_func_input) + + request_end_time = time.perf_counter() + # Add timing information + if output.success and not hasattr(output, "latency"): + output.latency = request_end_time - request_start_time + logger.info("Finished request with latency %.4f s", output.latency) + + outputs.append(output) + pbar.update(1) + + # Reset prefix cache if configured, except after the very last request + if cache_reset_url and False: + await request_func(request_func_input=dummy_req_input) + await reset_cache(cache_reset_url) + + pbar.close() + + benchmark_duration = time.perf_counter() - benchmark_start_time + + # Calculate metrics + metrics = calculate_metrics( + input_requests=input_requests, + outputs=outputs, + dur_s=benchmark_duration, + tokenizer=tokenizer, + selected_percentiles=selected_percentiles, + ) + + print_results(metrics, benchmark_duration) + + result = { + "duration": benchmark_duration, + "completed": metrics.completed, + "total_input_tokens": metrics.total_input, + "total_output_tokens": metrics.total_output, + "input_lens": [request.prompt_len for request in input_requests], + "output_lens": [ + output.output_tokens if output.success else 0 for output in outputs + ], + "ttfts": [output.ttft for output in outputs if output.success], + "itls": [output.itl for output in outputs if output.success], + "generated_texts": [ + output.generated_text for output in outputs if output.success + ], + "errors": [output.error for output in outputs if not output.success], + } + + # Add summary statistics + for stat_name in ["ttft", "itl", "e2el"]: + for metric_name in ["mean", "median", "std"]: + result[f"{metric_name}_{stat_name}_ms"] = getattr( + metrics, f"{metric_name}_{stat_name}_ms" + ) + + for p, value in getattr(metrics, f"percentiles_{stat_name}_ms"): + p_word = str(int(p)) if int(p) == p else str(p) + result[f"p{p_word}_{stat_name}_ms"] = value + + return result + + +def calculate_metrics( + input_requests: list[SampleRequest], + outputs: list[RequestFuncOutput], + dur_s: float, + tokenizer, + selected_percentiles: list[float], +) -> BenchmarkMetrics: + """Calculate benchmark metrics from results.""" + total_input = 0 + completed = 0 + total_output = 0 + ttfts = [] + itls = [] + e2els = [] + + for i, output in enumerate(outputs): + if output.success: + output_len = output.output_tokens + + if not output_len: + # Use tokenizer to count output tokens if not provided + output_len = 
len( + tokenizer(output.generated_text, add_special_tokens=False).input_ids + ) + + total_output += output_len + total_input += input_requests[i].prompt_len + + if hasattr(output, "ttft") and output.ttft is not None: + ttfts.append(output.ttft) + + if hasattr(output, "itl") and output.itl: + # Ensure itl is a list of floats + if isinstance(output.itl, list): + itls.extend(output.itl) + else: + logger.warning( + "Expected list for ITL but got %s. Appending as is.", + type(output.itl), + ) + itls.append(output.itl) + + if hasattr(output, "latency") and output.latency is not None: + e2els.append(output.latency) + + completed += 1 + + return BenchmarkMetrics( + completed=completed, + total_input=total_input, + total_output=total_output, + mean_ttft_ms=np.mean(ttfts or [0]) * 1000, + median_ttft_ms=np.median(ttfts or [0]) * 1000, + std_ttft_ms=np.std(ttfts or [0]) * 1000, + percentiles_ttft_ms=[ + (p, np.percentile(ttfts or [0], p) * 1000) for p in selected_percentiles + ], + mean_itl_ms=np.mean(itls or [0]) * 1000, + median_itl_ms=np.median(itls or [0]) * 1000, + std_itl_ms=np.std(itls or [0]) * 1000, + percentiles_itl_ms=[ + (p, np.percentile(itls or [0], p) * 1000) for p in selected_percentiles + ], + mean_e2el_ms=np.mean(e2els or [0]) * 1000, + median_e2el_ms=np.median(e2els or [0]) * 1000, + std_e2el_ms=np.std(e2els or [0]) * 1000, + percentiles_e2el_ms=[ + (p, np.percentile(e2els or [0], p) * 1000) for p in selected_percentiles + ], + ) + + +def print_results(metrics: BenchmarkMetrics, benchmark_duration: float): + """Print benchmark results in a formatted way.""" + print("{s:{c}^{n}}".format(s=" Sequential Benchmark Result ", n=60, c="=")) + print("{:<40} {:<10}".format("Successful requests:", metrics.completed)) + print("{:<40} {:<10.2f}".format("Benchmark duration (s):", benchmark_duration)) + print("{:<40} {:<10}".format("Total input tokens:", metrics.total_input)) + print("{:<40} {:<10}".format("Total generated tokens:", metrics.total_output)) + + def print_metric_stats(metric_name, header): + print("{s:{c}^{n}}".format(s=header, n=60, c="-")) + print( + "{:<40} {:<10.2f}".format( + f"Mean {metric_name} (ms):", + getattr(metrics, f"mean_{metric_name.lower()}_ms"), + ) + ) + print( + "{:<40} {:<10.2f}".format( + f"Median {metric_name} (ms):", + getattr(metrics, f"median_{metric_name.lower()}_ms"), + ) + ) + + for p, value in getattr(metrics, f"percentiles_{metric_name.lower()}_ms"): + p_word = str(int(p)) if int(p) == p else str(p) + print("{:<40} {:<10.2f}".format(f"P{p_word} {metric_name} (ms):", value)) + + print_metric_stats("TTFT", "Time to First Token") + print_metric_stats("ITL", "Inter-token Latency") + print_metric_stats("E2EL", "End-to-end Latency") + print("=" * 60) + + +async def main_async(args): + # Import needed functions based on your setup + from backend_request_func import ASYNC_REQUEST_FUNCS + + backend = args.backend + model_id = args.model + tokenizer_id = args.tokenizer if args.tokenizer is not None else args.model + + # Set up API URL + if args.base_url is not None: + api_url = f"{args.base_url}{args.endpoint}" + else: + api_url = f"http://{args.host}:{args.port}{args.endpoint}" + + # Set up Cache Reset URL + cache_reset_url = f"http://{args.host}:{args.port}/reset_prefix_cache" + logger.info("Prefix cache reset configured at: %s", cache_reset_url) + + # Get tokenizer + tokenizer = get_tokenizer(tokenizer_id, trust_remote_code=args.trust_remote_code) + + # Get request function + if backend in ASYNC_REQUEST_FUNCS: + request_func = ASYNC_REQUEST_FUNCS[backend] + 
else: + raise ValueError(f"Unknown backend: {backend}") + + input_requests = RandomDataset().sample( + tokenizer=tokenizer, + num_requests=args.num_requests, + prefix_len=0, + input_len=args.input_len, + output_len=args.output_len, + range_ratio=0.0, + ) + + # Run benchmark + result = await sequential_benchmark( + backend=backend, + api_url=api_url, + model_id=model_id, + tokenizer=tokenizer, + input_requests=input_requests, + request_func=request_func, + selected_percentiles=[50, 90, 95, 99], + cache_reset_url=cache_reset_url, + ) + + return result + + +def main(args): + print(args) + random.seed(args.seed) + np.random.seed(args.seed) + + asyncio.run(main_async(args)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Sequential benchmark for LLM serving") + parser.add_argument( + "--backend", type=str, default="vllm", help="Backend to use for requests" + ) + parser.add_argument( + "--base-url", + type=str, + default=None, + help="Server base URL (overrides --host and --port)", + ) + parser.add_argument("--host", type=str, default="127.0.0.1") + parser.add_argument("--port", type=int, default=8000) + parser.add_argument( + "--endpoint", type=str, default="/v1/completions", help="API endpoint" + ) + parser.add_argument("--model", type=str, required=True, help="Name of the model") + parser.add_argument( + "--tokenizer", type=str, help="Name of the tokenizer (defaults to model name)" + ) + parser.add_argument( + "--num-requests", type=int, default=100, help="Number of requests to process" + ) + parser.add_argument( + "--input-len", type=int, default=128, help="Input len for generated prompts" + ) + parser.add_argument( + "--output-len", type=int, default=None, help="Override output len for requests" + ) + parser.add_argument("--seed", type=int, default=42) + parser.add_argument( + "--trust-remote-code", + action="store_true", + help="Trust remote code from HuggingFace", + ) + + args = parser.parse_args() + main(args) From fea0731cf45d4f0cf1622cde6ada570d8da4931f Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 27 Jun 2025 15:11:23 +0000 Subject: [PATCH 06/36] update Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 053901c1cc4..5283c3e206e 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -974,17 +974,10 @@ def _read_blocks( remote_xfer_side_handle, remote_block_descs_ids, notif_msg=notif_id, - skip_desc_merge=True, # this causes the issue to emerge immediately. ) # Begin async xfer. - start = time.perf_counter() self.nixl_wrapper.transfer(handle) - end = time.perf_counter() - print(f"self.nixl_wrapper.transfer() TIME: {end-start}") - if end - start > 0.2: - print(f"{local_block_ids=}") - print(f"{remote_block_ids=}") # Use handle to check completion in future step(). 
# TODO (NickLucche) surface xfer elapsed time From 5d2eac70e77d081f5a0ddf416abf48c9eef58fb4 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 27 Jun 2025 15:12:03 +0000 Subject: [PATCH 07/36] update Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 5283c3e206e..bdab4850d4c 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -921,15 +921,6 @@ def _read_blocks( # corresponding rank. With heterogeneous TP, fixing D>P, the D tp # workers will issue xfers to parts of the P worker remote kv caches. - # # Sort block ids to ensure nixl can merge contiguous blocks. - # start = time.perf_counter() - # sorted_idx = sorted(range(len(local_block_ids)), - # key=local_block_ids.__getitem__) - # local_block_ids = [local_block_ids[i] for i in sorted_idx] - # remote_block_ids = [remote_block_ids[i] for i in sorted_idx] - # end = time.perf_counter() - # print(f"REORDER took: {end - start}") - # Get descs ids. local_block_descs_ids: list[int] = [] remote_block_descs_ids: list[int] = [] From f9617c75ad8a077ca9492aa60c5687e637a855c5 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 27 Jun 2025 18:48:05 +0000 Subject: [PATCH 08/36] updated Signed-off-by: rshaw@neuralmagic.com --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index bdab4850d4c..aff27c45559 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -968,7 +968,10 @@ def _read_blocks( ) # Begin async xfer. + start = time.perf_counter() self.nixl_wrapper.transfer(handle) + end = time.perf_counter() + logger.info("======== LAUNCH TIME: %s ========", end - start) # Use handle to check completion in future step(). # TODO (NickLucche) surface xfer elapsed time From ee2a4b0889d833a87139071296c0ef47ff587890 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 01:11:22 +0000 Subject: [PATCH 09/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../kv_connector/v1/nixl_connector.py | 62 ++++++++++++------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index aff27c45559..46d7c157ef5 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -36,6 +36,7 @@ Transfer = tuple[int, float] # (xfer_handle, start_time) GET_META_MSG = b"get_meta_msg" +NIXL_MAX_DESCS = 1000 logger = init_logger(__name__) @@ -371,8 +372,8 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self._registered_descs: list[Any] = [] # In progress transfers. - # [req_id -> list[handle]] - self._recving_transfers = defaultdict[str, list[Transfer]](list) + # [req_id -> list[handles], agent_name, notif_id] + self._recving_transfers: dict[str, tuple[list[int], str, str]] = {} # Complete transfer tracker. Used by the rank 0 to track finished # transactions on ranks 1 to N-1. 
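PATCH 08 above times each launch with paired perf_counter() calls, a pattern this series repeats in several places; a small context manager captures the same measurement with less boilerplate. This helper is a suggestion, not part of any patch here:

    # Hypothetical helper equivalent to the LAUNCH TIME instrumentation above.
    import time
    from contextlib import contextmanager

    @contextmanager
    def timed(label: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            print(f"{label}: {time.perf_counter() - start:.6f}s")

    # Usage, mirroring PATCH 08:
    # with timed("nixl transfer launch"):
    #     nixl_wrapper.transfer(handle)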
@@ -826,7 +827,8 @@ def _get_new_notifs(self) -> set[str]: return notified_req_ids def _pop_done_transfers( - self, transfers: dict[str, list[tuple[int, float]]]) -> set[str]: + self, transfers: dict[str, tuple[list[int], str, + str]]) -> set[str]: """ Pop completed xfers by checking for DONE state. Args: @@ -835,18 +837,29 @@ def _pop_done_transfers( set of req_ids that have all done xfers """ done_req_ids: set[str] = set() - for req_id, handles in list(transfers.items()): - for handle, xfer_stime in handles: + for req_id, (handles, agent_name, notif_id) in list(transfers.items()): + new_handles = [] + for handle in handles: xfer_state = self.nixl_wrapper.check_xfer_state(handle) if xfer_state == "DONE": self.nixl_wrapper.release_xfer_handle(handle) - done_req_ids.add(req_id) - del transfers[req_id] elif xfer_state == "PROC": - continue + new_handles.append(handle) else: raise RuntimeError("Transfer failed with state %s", xfer_state) + + # Done. + if len(new_handles) == 0: + start = time.perf_counter() + self.nixl_wrapper.send_notif(agent_name, notif_id) + del transfers[req_id] + done_req_ids.add(req_id) + end = time.perf_counter() + print(f"========= SEND NOTIF TIME: {end - start} =========") + else: + transfers[req_id] = (new_handles, notif_id, agent_name) + return done_req_ids def start_load_kv(self, metadata: NixlConnectorMetadata): @@ -958,25 +971,32 @@ def _read_blocks( assert len(local_block_descs_ids) == len(remote_block_descs_ids) # Prepare transfer with Nixl. - handle = self.nixl_wrapper.make_prepped_xfer( - "READ", - local_xfer_side_handle, - local_block_descs_ids, - remote_xfer_side_handle, - remote_block_descs_ids, - notif_msg=notif_id, - ) + CHUNK_SIZE = 1000 + handles = [] + for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): + handles.append( + self.nixl_wrapper.make_prepped_xfer( + "READ", + local_xfer_side_handle, + local_block_descs_ids[i:i + CHUNK_SIZE], + remote_xfer_side_handle, + remote_block_descs_ids[i:i + CHUNK_SIZE], + skip_desc_merge=True, + )) # Begin async xfer. start = time.perf_counter() - self.nixl_wrapper.transfer(handle) + # for handle in handles: + # self.nixl_wrapper.transfer(handle) + self.nixl_wrapper.transfer_batched(handles) end = time.perf_counter() logger.info("======== LAUNCH TIME: %s ========", end - start) - # Use handle to check completion in future step(). - # TODO (NickLucche) surface xfer elapsed time - self._recving_transfers[request_id].append( - (handle, time.perf_counter())) + # Keep track of ongoing transfers. 
+ remote_rank = self.tp_rank // tp_ratio + agent_name = self._remote_agents[dst_engine_id][remote_rank] + assert request_id not in self._recving_transfers + self._recving_transfers[request_id] = (handles, agent_name, notif_id) def _get_block_descs_ids(self, engine_id: str, From dedb1a5424c3bd92f989fff5f4282325134fc6ed Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 01:30:06 +0000 Subject: [PATCH 10/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 46d7c157ef5..47433ae0dab 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -330,7 +330,10 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self.block_size = vllm_config.cache_config.block_size # Agent. - self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None) + self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), + None, + num_workers=None, + num_shared_workers=16) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) @@ -986,9 +989,9 @@ def _read_blocks( # Begin async xfer. start = time.perf_counter() - # for handle in handles: - # self.nixl_wrapper.transfer(handle) - self.nixl_wrapper.transfer_batched(handles) + for handle in handles: + self.nixl_wrapper.transfer(handle) + # self.nixl_wrapper.transfer_batched(handles) end = time.perf_counter() logger.info("======== LAUNCH TIME: %s ========", end - start) From c481d30c17d3b51f8a3d4a9775e6d52a8eab8bbd Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 01:39:15 +0000 Subject: [PATCH 11/36] update Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 47433ae0dab..c5ff321cd9f 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -333,7 +333,8 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None, num_workers=None, - num_shared_workers=16) + # num_shared_workers=16) # setting this > 0 causes the notifs to be recved + num_shared_workers=None) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) @@ -819,6 +820,7 @@ def _get_new_notifs(self) -> set[str]: """ notified_req_ids: set[str] = set() for notifs in self.nixl_wrapper.get_new_notifs().values(): + print(f"{notifs=}") for notif in notifs: req_id, tp_ratio = notif.decode("utf-8").rsplit(":", 1) self.consumer_notification_counts_by_req[req_id] += 1 @@ -853,6 +855,7 @@ def _pop_done_transfers( xfer_state) # Done. 
+ print(f"{len(new_handles)=}") if len(new_handles) == 0: start = time.perf_counter() self.nixl_wrapper.send_notif(agent_name, notif_id) From 8283d7b85c7d8f485e6d02864fc24fc9334b2ad6 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 01:45:03 +0000 Subject: [PATCH 12/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index c5ff321cd9f..9adb561a180 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -333,8 +333,8 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None, num_workers=None, - # num_shared_workers=16) # setting this > 0 causes the notifs to be recved - num_shared_workers=None) + num_shared_workers=16) # setting this > 0 causes the notifs to be recved + # num_shared_workers=None) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) @@ -864,7 +864,7 @@ def _pop_done_transfers( end = time.perf_counter() print(f"========= SEND NOTIF TIME: {end - start} =========") else: - transfers[req_id] = (new_handles, notif_id, agent_name) + transfers[req_id] = (new_handles, agent_name, notif_id) return done_req_ids @@ -977,7 +977,7 @@ def _read_blocks( assert len(local_block_descs_ids) == len(remote_block_descs_ids) # Prepare transfer with Nixl. - CHUNK_SIZE = 1000 + CHUNK_SIZE = 100 handles = [] for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): handles.append( From b9be6fd35a9639bd91c4a8cde0a767c6736baa12 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 01:51:37 +0000 Subject: [PATCH 13/36] updated to make send_notif work Signed-off-by: rshaw@neuralmagic.com --- pd_justfile/Justfile | 2 +- .../kv_transfer/kv_connector/v1/nixl_connector.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pd_justfile/Justfile b/pd_justfile/Justfile index 92433c6604e..5641967b4ea 100644 --- a/pd_justfile/Justfile +++ b/pd_justfile/Justfile @@ -12,7 +12,7 @@ port PORT: prefill: - VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5557) \ + VLLM_NIXL_SIDE_CHANNEL_PORT=5557 \ CUDA_VISIBLE_DEVICES=0 \ vllm serve {{MODEL}} \ --port $(just port 8100) \ diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 9adb561a180..ea01196fbc5 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -330,11 +330,16 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self.block_size = vllm_config.cache_config.block_size # Agent. + import os + num_workers = 16 + # setting num workers on the prefiller causes the notifs to not be recved??? + if os.getenv("VLLM_NIXL_SIDE_CHANNEL_PORT", "") == "5557": + num_workers = None + print(f"NUM_WORKERS: {num_workers=}") self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None, - num_workers=None, - num_shared_workers=16) # setting this > 0 causes the notifs to be recved - # num_shared_workers=None) + num_workers=num_workers, + num_shared_workers=None) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. 
self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) @@ -820,6 +825,7 @@ def _get_new_notifs(self) -> set[str]: """ notified_req_ids: set[str] = set() for notifs in self.nixl_wrapper.get_new_notifs().values(): + # WE GET NOTHING FROM HERE IF NUM_WORKERS > 0. print(f"{notifs=}") for notif in notifs: req_id, tp_ratio = notif.decode("utf-8").rsplit(":", 1) From cda2f2c4536dc4c37df54160ae9e15a2c7ee6799 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 01:54:43 +0000 Subject: [PATCH 14/36] updated Signed-off-by: rshaw@neuralmagic.com --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index ea01196fbc5..82ae98db921 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -333,6 +333,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): import os num_workers = 16 # setting num workers on the prefiller causes the notifs to not be recved??? + # this is a hack to make sure we set num workers on the prefiller to 1. if os.getenv("VLLM_NIXL_SIDE_CHANNEL_PORT", "") == "5557": num_workers = None print(f"NUM_WORKERS: {num_workers=}") From 5d8b6653664dbb8585c37fdbd25eb5042523355e Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 01:59:02 +0000 Subject: [PATCH 15/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 82ae98db921..c1f1db7e567 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -340,7 +340,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None, num_workers=num_workers, - num_shared_workers=None) + num_shared_workers=None) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) @@ -999,9 +999,12 @@ def _read_blocks( # Begin async xfer. 
start = time.perf_counter() - for handle in handles: - self.nixl_wrapper.transfer(handle) - # self.nixl_wrapper.transfer_batched(handles) + # IT WORKS WITH THIS: + # for handle in handles: + # self.nixl_wrapper.transfer(handle) + + # IT FAILS WITH THIS: + self.nixl_wrapper.transfer_batched(handles) end = time.perf_counter() logger.info("======== LAUNCH TIME: %s ========", end - start) From 17546dc79f5089565c22b6431e7324a998cc1461 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 30 Jun 2025 14:40:18 +0530 Subject: [PATCH 16/36] Add threading for load-balancing to different workers --- .../kv_connector/v1/nixl_connector.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index c1f1db7e567..13c7806dbb7 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -13,6 +13,7 @@ import msgspec import torch import zmq +from concurrent.futures import ThreadPoolExecutor, as_completed from vllm import envs from vllm.attention.selector import backend_name_to_enum, get_attn_backend @@ -986,16 +987,23 @@ def _read_blocks( # Prepare transfer with Nixl. CHUNK_SIZE = 100 handles = [] - for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): - handles.append( - self.nixl_wrapper.make_prepped_xfer( + futures = [] + with ThreadPoolExecutor() as executor: + for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): + future = executor.submit( + self.nixl_wrapper.make_prepped_xfer, "READ", local_xfer_side_handle, local_block_descs_ids[i:i + CHUNK_SIZE], remote_xfer_side_handle, remote_block_descs_ids[i:i + CHUNK_SIZE], skip_desc_merge=True, - )) + ) + futures.append(future) + + for future in futures: + handles.append(future.result()) + # Begin async xfer. start = time.perf_counter() From c4b9b2e68214bb0553b9ae90a6d0640980c66093 Mon Sep 17 00:00:00 2001 From: Pravein Govindan Kannan Date: Mon, 30 Jun 2025 15:03:52 +0530 Subject: [PATCH 17/36] Increase chunk size to reduce no. of threads --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 13c7806dbb7..c6c86d74ab5 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -985,7 +985,7 @@ def _read_blocks( assert len(local_block_descs_ids) == len(remote_block_descs_ids) # Prepare transfer with Nixl. 
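A note on PATCH 16 above: it fans make_prepped_xfer out across a thread pool, relying on NIXL assigning a different worker to each calling thread (the comment added in PATCH 18 below spells out that assumption). Reduced to its skeleton, the pattern looks like the sketch here; `prep` stands in for nixl_wrapper.make_prepped_xfer, and the worker-per-thread behavior is an assumption about NIXL, not a documented guarantee.

    from concurrent.futures import ThreadPoolExecutor

    def prep_chunks(desc_ids: list[int], chunk_size: int, prep) -> list:
        # Submit one preparation call per chunk so each runs on its own thread.
        with ThreadPoolExecutor() as pool:
            futures = [
                pool.submit(prep, desc_ids[i:i + chunk_size])
                for i in range(0, len(desc_ids), chunk_size)
            ]
        # The pool has shut down here; gather handles in submission order.
        return [f.result() for f in futures]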
-        CHUNK_SIZE = 100
+        CHUNK_SIZE = 1000
         handles = []
         futures = []
         with ThreadPoolExecutor() as executor:

From f015919fc8efec4c8b8d94374afeda98a1f0a81b Mon Sep 17 00:00:00 2001
From: "rshaw@neuralmagic.com"
Date: Mon, 30 Jun 2025 12:25:48 +0000
Subject: [PATCH 18/36] add comment about hack

Signed-off-by: rshaw@neuralmagic.com
---
 .../kv_transfer/kv_connector/v1/nixl_connector.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
index c6c86d74ab5..745ef415029 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
@@ -7,13 +7,13 @@
 import uuid
 from collections import defaultdict
 from collections.abc import Iterator
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, Optional

 import msgspec
 import torch
 import zmq
-from concurrent.futures import ThreadPoolExecutor, as_completed

 from vllm import envs
 from vllm.attention.selector import backend_name_to_enum, get_attn_backend
@@ -333,7 +333,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str):
         # Agent.
         import os
         num_workers = 16
-        # setting num workers on the prefiller causes the notifs to not be recved???
+        # setting num_workers on the prefiller causes no notifs to be recved???
         # this is a hack to make sure we set num workers on the prefiller to 1.
         if os.getenv("VLLM_NIXL_SIDE_CHANNEL_PORT", "") == "5557":
             num_workers = None
         print(f"NUM_WORKERS: {num_workers=}")
@@ -988,6 +988,10 @@ def _read_blocks(
         CHUNK_SIZE = 1000
         handles = []
         futures = []
+        # NOTE: this is a hack to run make_prepped_xfer calls in threads so that
+        # different workers are allocated for each chunk. Without this change,
+        # nixl was allocating the same worker (0) for all the chunks and the
+        # overall launch time was >300 ms.
         with ThreadPoolExecutor() as executor:
             for i in range(0, len(local_block_descs_ids), CHUNK_SIZE):
                 future = executor.submit(
@@ -1004,14 +1008,10 @@ def _read_blocks(
             for future in futures:
                 handles.append(future.result())
-
         # Begin async xfer.
         start = time.perf_counter()
-        # IT WORKS WITH THIS:
         # for handle in handles:
         #     self.nixl_wrapper.transfer(handle)
-
-        # IT FAILS WITH THIS:
         self.nixl_wrapper.transfer_batched(handles)
         end = time.perf_counter()
         logger.info("======== LAUNCH TIME: %s ========", end - start)

From 569de248cb5b6149ca65b2a4ced751096752ccb4 Mon Sep 17 00:00:00 2001
From: "rshaw@neuralmagic.com"
Date: Mon, 30 Jun 2025 12:36:19 +0000
Subject: [PATCH 19/36] cleanup

Signed-off-by: rshaw@neuralmagic.com
---
 vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
index 745ef415029..618630426d0 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
@@ -827,8 +827,6 @@ def _get_new_notifs(self) -> set[str]:
         """
         notified_req_ids: set[str] = set()
         for notifs in self.nixl_wrapper.get_new_notifs().values():
-            # WE GET NOTHING FROM HERE IF NUM_WORKERS > 0.
- print(f"{notifs=}") for notif in notifs: req_id, tp_ratio = notif.decode("utf-8").rsplit(":", 1) self.consumer_notification_counts_by_req[req_id] += 1 From 491347cbc39311664fbbbf3afd2b935e7d4c18d2 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 13:42:36 +0000 Subject: [PATCH 20/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 618630426d0..b16f3d6ce3c 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -861,7 +861,6 @@ def _pop_done_transfers( xfer_state) # Done. - print(f"{len(new_handles)=}") if len(new_handles) == 0: start = time.perf_counter() self.nixl_wrapper.send_notif(agent_name, notif_id) @@ -1008,9 +1007,9 @@ def _read_blocks( # Begin async xfer. start = time.perf_counter() - # for handle in handles: - # self.nixl_wrapper.transfer(handle) - self.nixl_wrapper.transfer_batched(handles) + for handle in handles: + self.nixl_wrapper.transfer(handle) + # self.nixl_wrapper.transfer_batched(handles) end = time.perf_counter() logger.info("======== LAUNCH TIME: %s ========", end - start) From 6babd393662577125b28f530eab24e53c3753faf Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 19:30:14 +0000 Subject: [PATCH 21/36] print out Signed-off-by: rshaw@neuralmagic.com --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index b16f3d6ce3c..f297d44ccf8 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -717,6 +717,8 @@ def add_remote_agent(self, # Create dst descs and xfer side handles. TP workers have same #blocks. if engine_id in self.dst_num_blocks: + print(f"{self.dst_num_blocks[engine_id]=}") + print(f"{nixl_agent_meta.num_blocks=}") assert self.dst_num_blocks[engine_id] == nixl_agent_meta.num_blocks else: self.dst_num_blocks[engine_id] = nixl_agent_meta.num_blocks From 128eca2ce3bdfd455a25a01e5a768c6ed5350c8b Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 19:48:33 +0000 Subject: [PATCH 22/36] update for use batched Signed-off-by: rshaw@neuralmagic.com --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index f297d44ccf8..325c54eca07 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -38,6 +38,9 @@ Transfer = tuple[int, float] # (xfer_handle, start_time) GET_META_MSG = b"get_meta_msg" NIXL_MAX_DESCS = 1000 +import os + +USE_BATCHED = os.getenv("USE_BATCHED", "1") == "1" logger = init_logger(__name__) @@ -717,8 +720,6 @@ def add_remote_agent(self, # Create dst descs and xfer side handles. TP workers have same #blocks. 
if engine_id in self.dst_num_blocks: - print(f"{self.dst_num_blocks[engine_id]=}") - print(f"{nixl_agent_meta.num_blocks=}") assert self.dst_num_blocks[engine_id] == nixl_agent_meta.num_blocks else: self.dst_num_blocks[engine_id] = nixl_agent_meta.num_blocks @@ -1009,9 +1010,11 @@ def _read_blocks( # Begin async xfer. start = time.perf_counter() - for handle in handles: - self.nixl_wrapper.transfer(handle) - # self.nixl_wrapper.transfer_batched(handles) + if USE_BATCHED: + self.nixl_wrapper.transfer_batched(handles) + else: + for handle in handles: + self.nixl_wrapper.transfer(handle) end = time.perf_counter() logger.info("======== LAUNCH TIME: %s ========", end - start) From 70b76554d1667528110a56812e00a26406fdb9cd Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 20:01:56 +0000 Subject: [PATCH 23/36] updated Signed-off-by: rshaw@neuralmagic.com --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 325c54eca07..cdeaf18cbd4 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -338,7 +338,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): num_workers = 16 # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. - if os.getenv("VLLM_NIXL_SIDE_CHANNEL_PORT", "") == "5557": + if os.getenv("VLLM_IS_PREFILL", "0") == "0": num_workers = None print(f"NUM_WORKERS: {num_workers=}") self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), From 15bc311d28880da775859ee29628b24b8943d33a Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Mon, 30 Jun 2025 20:09:12 +0000 Subject: [PATCH 24/36] updated Signed-off-by: rshaw@neuralmagic.com --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index cdeaf18cbd4..c0039a66876 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -338,7 +338,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): num_workers = 16 # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. 
- if os.getenv("VLLM_IS_PREFILL", "0") == "0": + if os.getenv("VLLM_IS_PREFILL", "0") == "1": num_workers = None print(f"NUM_WORKERS: {num_workers=}") self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), From 1172b70b7953d7379827762b68e86bf367eae69a Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 1 Jul 2025 00:16:07 +0000 Subject: [PATCH 25/36] updated vllm Signed-off-by: rshaw@neuralmagic.com --- .../kv_connector/v1/nixl_connector.py | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index c0039a66876..0abfa489e13 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -343,8 +343,8 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): print(f"NUM_WORKERS: {num_workers=}") self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None, - num_workers=num_workers, - num_shared_workers=None) + num_workers=None, + num_shared_workers=num_workers) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) @@ -987,32 +987,28 @@ def _read_blocks( # Prepare transfer with Nixl. CHUNK_SIZE = 1000 handles = [] - futures = [] # NOTE: this is a hack to make make_prepped_xfer into threads so that # different workers are allocated for each chuck. Without this change, # nixl was allocating the same worker (0) for all the chunks and the # overall launch time was >300 ms. - with ThreadPoolExecutor() as executor: - for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): - future = executor.submit( - self.nixl_wrapper.make_prepped_xfer, - "READ", - local_xfer_side_handle, - local_block_descs_ids[i:i + CHUNK_SIZE], - remote_xfer_side_handle, - remote_block_descs_ids[i:i + CHUNK_SIZE], - skip_desc_merge=True, - ) - futures.append(future) - - for future in futures: - handles.append(future.result()) + for i in range(0, len(local_block_descs_ids), CHUNK_SIZE): + handle = self.nixl_wrapper.make_prepped_xfer( + "READ", + local_xfer_side_handle, + local_block_descs_ids[i:i + CHUNK_SIZE], + remote_xfer_side_handle, + remote_block_descs_ids[i:i + CHUNK_SIZE], + skip_desc_merge=True, + ) + handles.append(handle) # Begin async xfer. start = time.perf_counter() if USE_BATCHED: + print("BATCHED!") self.nixl_wrapper.transfer_batched(handles) else: + print("NON BATCHED!") for handle in handles: self.nixl_wrapper.transfer(handle) end = time.perf_counter() From 56939c835d42accacd6cab7c4fbba9b13d62c5fc Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 1 Jul 2025 01:34:46 +0000 Subject: [PATCH 26/36] updated Signed-off-by: rshaw@neuralmagic.com --- .../distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 0abfa489e13..54b867e9fea 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -335,7 +335,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): # Agent. import os - num_workers = 16 + num_workers = 32 # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. 
if os.getenv("VLLM_IS_PREFILL", "0") == "1": @@ -985,7 +985,7 @@ def _read_blocks( assert len(local_block_descs_ids) == len(remote_block_descs_ids) # Prepare transfer with Nixl. - CHUNK_SIZE = 1000 + CHUNK_SIZE = 500 handles = [] # NOTE: this is a hack to make make_prepped_xfer into threads so that # different workers are allocated for each chuck. Without this change, From ff5a0cfa6edbe78d901527f2019c2ac0eeb68983 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 1 Jul 2025 02:49:54 +0000 Subject: [PATCH 27/36] updated Signed-off-by: rshaw@neuralmagic.com --- pd_justfile/Justfile | 6 ++++-- .../kv_transfer/kv_connector/v1/nixl_connector.py | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pd_justfile/Justfile b/pd_justfile/Justfile index 5641967b4ea..aa83ea66e1f 100644 --- a/pd_justfile/Justfile +++ b/pd_justfile/Justfile @@ -12,8 +12,9 @@ port PORT: prefill: + VLLM_IS_PREFILL=1 \ VLLM_NIXL_SIDE_CHANNEL_PORT=5557 \ - CUDA_VISIBLE_DEVICES=0 \ + CUDA_VISIBLE_DEVICES=7 \ vllm serve {{MODEL}} \ --port $(just port 8100) \ --tensor-parallel-size 1 \ @@ -23,8 +24,9 @@ prefill: --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' decode: + VLLM_IS_PREFILL=0 \ VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5559) \ - CUDA_VISIBLE_DEVICES=1 \ + CUDA_VISIBLE_DEVICES=6 \ vllm serve {{MODEL}} \ --port $(just port 8300) \ --tensor-parallel-size 1 \ diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 54b867e9fea..fba9d15ef9a 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -7,7 +7,6 @@ import uuid from collections import defaultdict from collections.abc import Iterator -from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from typing import TYPE_CHECKING, Any, Optional @@ -335,7 +334,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): # Agent. import os - num_workers = 32 + num_workers = 64 # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. if os.getenv("VLLM_IS_PREFILL", "0") == "1": From 7fbcbbfc45ae21cc9cdb0838ae35837e9f2afb0d Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 1 Jul 2025 03:15:16 +0000 Subject: [PATCH 28/36] updated Signed-off-by: rshaw@neuralmagic.com --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index fba9d15ef9a..b852f552081 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -334,7 +334,7 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): # Agent. import os - num_workers = 64 + num_workers = 32 # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. 
if os.getenv("VLLM_IS_PREFILL", "0") == "1": From c22a6cb1cc83b42669f754b6c105c82547f4078b Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 7 Jul 2025 00:30:51 +0000 Subject: [PATCH 29/36] cleanup Signed-off-by: Robert Shaw --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index b852f552081..4c777c7788c 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -37,9 +37,6 @@ Transfer = tuple[int, float] # (xfer_handle, start_time) GET_META_MSG = b"get_meta_msg" NIXL_MAX_DESCS = 1000 -import os - -USE_BATCHED = os.getenv("USE_BATCHED", "1") == "1" logger = init_logger(__name__) @@ -1003,15 +1000,9 @@ def _read_blocks( # Begin async xfer. start = time.perf_counter() - if USE_BATCHED: - print("BATCHED!") - self.nixl_wrapper.transfer_batched(handles) - else: - print("NON BATCHED!") - for handle in handles: - self.nixl_wrapper.transfer(handle) + self.nixl_wrapper.transfer_batched(handles) end = time.perf_counter() - logger.info("======== LAUNCH TIME: %s ========", end - start) + logger.info("========== TRANSFER BATCHED: %s ==========", end - start) # Keep track of ongoing transfers. remote_rank = self.tp_rank // tp_ratio From b835205d33615ee60e01775a6f625fb4857061b4 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 7 Jul 2025 00:32:42 +0000 Subject: [PATCH 30/36] updated Signed-off-by: Robert Shaw --- pd_justfile/Justfile | 95 ------------------ pd_justfile/port_allocator.py | 99 ------------------- .../kv_connector/v1/nixl_connector.py | 4 - 3 files changed, 198 deletions(-) delete mode 100644 pd_justfile/Justfile delete mode 100755 pd_justfile/port_allocator.py diff --git a/pd_justfile/Justfile b/pd_justfile/Justfile deleted file mode 100644 index aa83ea66e1f..00000000000 --- a/pd_justfile/Justfile +++ /dev/null @@ -1,95 +0,0 @@ -# Setting this allows creating a symlink to Justfile from another dir -set working-directory := "/home/rshaw/vllm/pd_examples/" - -# Needed for the proxy server -vllm-directory := "/home/rshaw/vllm/" - -# MODEL := "Qwen/Qwen3-0.6B" -MODEL := "meta-llama/Llama-3.1-8B-Instruct" - -port PORT: - @python port_allocator.py {{PORT}} - - -prefill: - VLLM_IS_PREFILL=1 \ - VLLM_NIXL_SIDE_CHANNEL_PORT=5557 \ - CUDA_VISIBLE_DEVICES=7 \ - vllm serve {{MODEL}} \ - --port $(just port 8100) \ - --tensor-parallel-size 1 \ - --enforce-eager \ - --disable-log-requests \ - --block-size 128 \ - --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' - -decode: - VLLM_IS_PREFILL=0 \ - VLLM_NIXL_SIDE_CHANNEL_PORT=$(just port 5559) \ - CUDA_VISIBLE_DEVICES=6 \ - vllm serve {{MODEL}} \ - --port $(just port 8300) \ - --tensor-parallel-size 1 \ - --enforce-eager \ - --disable-log-requests \ - --block-size 128 \ - --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' - -proxy: - python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \ - --port $(just port 8192) \ - --prefiller-port $(just port 8100) \ - --prefiller-host localhost \ - --decoder-port $(just port 8300) - -send_request: - curl -X POST http://localhost:$(just port 8192)/v1/completions \ - -H "Content-Type: application/json" \ - -d '{ \ - "model": "{{MODEL}}", \ - "prompt": "XXRed Hat is the best open source company by far across Linux, K8s, and AI, and 
vLLM has the greatest community in open source AI software infrastructure. I love vLLM because", \ - "max_tokens": 150, \ - "temperature": 0.7 \ - }' - -benchmark NUM_PROMPTS: - python {{vllm-directory}}/benchmarks/benchmark_serving.py \ - --port $(just port 8192) \ - --model {{MODEL}} \ - --dataset-name random \ - --random-input-len 10000 \ - --random-output-len 100 \ - --num-prompts {{NUM_PROMPTS}} \ - --seed $(date +%s) \ - -benchmark_one INPUT_LEN: - python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ - --model {{MODEL}} \ - --input-len {{INPUT_LEN}} \ - --output-len 1 \ - --num-requests 10 \ - --seed $(date +%s) \ - --port $(just port 8192) - -benchmark_one_no_pd INPUT_LEN: - python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ - --model {{MODEL}} \ - --input-len {{INPUT_LEN}} \ - --output-len 1 \ - --num-requests 10 \ - --seed $(date +%s) \ - --port $(just port 8100) - -reset_prefix_cache: - curl -X POST http://localhost:$(just port 8100)/reset_prefix_cache && \ - curl -X POST http://localhost:$(just port 8200)/reset_prefix_cache - -eval: - lm_eval --model local-completions --tasks gsm8k \ - --model_args model={{MODEL}},base_url=http://127.0.0.1:$(just port 8192)/v1/completions,num_concurrent=100,max_retries=3,tokenized_requests=False \ - --limit 1000 - -eval_port PORT: - lm_eval --model local-completions --tasks gsm8k \ - --model_args model={{MODEL}},base_url=http://127.0.0.1:$(just port {{PORT}})/v1/completions,num_concurrent=100,max_retries=3,tokenized_requests=False \ - --limit 1000 \ No newline at end of file diff --git a/pd_justfile/port_allocator.py b/pd_justfile/port_allocator.py deleted file mode 100755 index 1ac5486049f..00000000000 --- a/pd_justfile/port_allocator.py +++ /dev/null @@ -1,99 +0,0 @@ -#!/usr/bin/env python3 -""" -Port Allocation Utility - -A small utility that generates consistent port numbers based on username and default port -to avoid port collisions during development. -""" - -import argparse -import getpass -import hashlib - - -def allocate_port(base_port, - username=None, - project_name=None, - port_range=None): - """ - Allocate a port based on username and base port. - - Args: - base_port (int): The default port number for the service - username (str, optional): Username to use for hashing. Defaults to current user. - project_name (str, optional): Project name to make ports unique per project - port_range (tuple, optional): Range of valid ports (min, max). Defaults to (1024, 65535). 
- - Returns: - int: A port number derived from hashing the username and base port - """ - if not username: - username = getpass.getuser() - - if not port_range: - port_range = (1024, 65535) - - min_port, max_port = port_range - available_range = max_port - min_port - - # Create hash input from username, base_port and optional project_name - hash_input = f"{username}:{base_port}" - if project_name: - hash_input = f"{project_name}:{hash_input}" - - # Create a hash and convert to an integer in our port range - hash_obj = hashlib.md5(hash_input.encode()) - hash_int = int(hash_obj.hexdigest(), 16) - - # Generate a port within the valid range - port_offset = hash_int % available_range - allocated_port = min_port + port_offset - - # Check if it's too close to the base_port (within 10) - if abs(allocated_port - base_port) < 10: - # Add a small offset to avoid collisions with the default port - allocated_port = (allocated_port + 100) % available_range + min_port - - return allocated_port - - -def main(): - parser = argparse.ArgumentParser( - description='Allocate a consistent port based on username and base port' - ) - parser.add_argument('base_port', - type=int, - help='The default port number for the service') - parser.add_argument('--username', - '-u', - help='Username to use (defaults to current user)') - parser.add_argument('--project', - '-p', - help='Project name to make ports unique per project') - parser.add_argument('--env-var', - '-e', - help='Output as export ENV_VAR=port') - parser.add_argument('--min-port', - type=int, - default=1024, - help='Minimum port number') - parser.add_argument('--max-port', - type=int, - default=65535, - help='Maximum port number') - - args = parser.parse_args() - - port = allocate_port(args.base_port, - username=args.username, - project_name=args.project, - port_range=(args.min_port, args.max_port)) - - if args.env_var: - print(f"export {args.env_var}={port}") - else: - print(port) - - -if __name__ == "__main__": - main() diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 4c777c7788c..325530cab69 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -36,7 +36,6 @@ Transfer = tuple[int, float] # (xfer_handle, start_time) GET_META_MSG = b"get_meta_msg" -NIXL_MAX_DESCS = 1000 logger = init_logger(__name__) @@ -861,12 +860,9 @@ def _pop_done_transfers( # Done. 
if len(new_handles) == 0: - start = time.perf_counter() self.nixl_wrapper.send_notif(agent_name, notif_id) del transfers[req_id] done_req_ids.add(req_id) - end = time.perf_counter() - print(f"========= SEND NOTIF TIME: {end - start} =========") else: transfers[req_id] = (new_handles, agent_name, notif_id) From f16bf6387710c5e3e4eb20ca73fb1b8754591637 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 7 Jul 2025 01:13:20 +0000 Subject: [PATCH 31/36] updated Signed-off-by: Robert Shaw --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 325530cab69..11c61f52ca7 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -329,17 +329,14 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self.block_size = vllm_config.cache_config.block_size # Agent. - import os - num_workers = 32 # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. - if os.getenv("VLLM_IS_PREFILL", "0") == "1": - num_workers = None - print(f"NUM_WORKERS: {num_workers=}") + NUM_WORKERS = 32 + logger.info(f"{NUM_WORKERS=}") self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None, num_workers=None, - num_shared_workers=num_workers) + num_shared_workers=NUM_WORKERS) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) From bd57841c7ba605cf958f6185012a5b91726f5ff7 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 7 Jul 2025 01:14:10 +0000 Subject: [PATCH 32/36] updated Signed-off-by: Robert Shaw --- tools/Justfile | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 tools/Justfile diff --git a/tools/Justfile b/tools/Justfile new file mode 100644 index 00000000000..0ffced37272 --- /dev/null +++ b/tools/Justfile @@ -0,0 +1,79 @@ +# Needed for the proxy server +vllm-directory := "/home/rshaw/vllm/" + +# MODEL := "Qwen/Qwen3-0.6B" +MODEL := "meta-llama/Llama-3.1-8B-Instruct" +PROXY_PORT := "8192" +PREFILL_PORT := "8100" +DECODE_PORT := "8200" + +prefill: + VLLM_NIXL_SIDE_CHANNEL_PORT=5557 \ + CUDA_VISIBLE_DEVICES=0,7 \ + vllm serve {{MODEL}} \ + --port {{PREFILL_PORT}} \ + --tensor-parallel-size 2 \ + --enforce-eager \ + --disable-log-requests \ + --block-size 128 \ + --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' + +decode: + VLLM_NIXL_SIDE_CHANNEL_PORT=5567 \ + CUDA_VISIBLE_DEVICES=4,5 \ + vllm serve {{MODEL}} \ + --port {{DECODE_PORT}} \ + --tensor-parallel-size 2 \ + --enforce-eager \ + --disable-log-requests \ + --block-size 128 \ + --kv-transfer-config '{"kv_connector":"NixlConnector","kv_role":"kv_both"}' + +proxy: + python "{{vllm-directory}}tests/v1/kv_connector/nixl_integration/toy_proxy_server.py" \ + --port {{PROXY_PORT}} \ + --prefiller-port {{PREFILL_PORT}} \ + --decoder-port {{DECODE_PORT}} + +send_request: + curl -X POST http://localhost:{{PROXY_PORT}}/v1/completions \ + -H "Content-Type: application/json" \ + -d '{ \ + "model": "{{MODEL}}", \ + "prompt": "Red Hat is the best open source company by far across Linux, K8s, and AI, and vLLM has the greatest community in open source AI software infrastructure. 
I love vLLM because", \ + "max_tokens": 150, \ + "temperature": 0.7 \ + }' + +benchmark NUM_PROMPTS: + python {{vllm-directory}}/benchmarks/benchmark_serving.py \ + --port {{PROXY_PORT}} \ + --model {{MODEL}} \ + --dataset-name random \ + --random-input-len 30000 \ + --random-output-len 10 \ + --num-prompts {{NUM_PROMPTS}} \ + --seed $(date +%s) \ + +benchmark_one INPUT_LEN: + python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ + --port {{PROXY_PORT}} \ + --model {{MODEL}} \ + --input-len {{INPUT_LEN}} \ + --output-len 1 \ + --num-requests 10 \ + --seed $(date +%s) + +benchmark_one_no_pd INPUT_LEN: + python {{vllm-directory}}benchmarks/benchmark_one_concurrent_req.py \ + --port {{DECODE_PORT}} \ + --model {{MODEL}} \ + --input-len {{INPUT_LEN}} \ + --output-len 1 \ + --num-requests 10 \ + --seed $(date +%s) + +eval: + lm_eval --model local-completions --tasks gsm8k \ + --model_args model={{MODEL}},base_url=http://127.0.0.1:{{PROXY_PORT}}/v1/completions,num_concurrent=100,max_retries=3,tokenized_requests=False \ + --limit 1000 From f65450e3dc9541eb311b8dd559b6460184864784 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Mon, 7 Jul 2025 01:27:40 +0000 Subject: [PATCH 33/36] updated Signed-off-by: Robert Shaw --- .../kv_transfer/kv_connector/v1/nixl_connector.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 11c61f52ca7..325530cab69 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -329,14 +329,17 @@ def __init__(self, vllm_config: VllmConfig, engine_id: str): self.block_size = vllm_config.cache_config.block_size # Agent. + import os + num_workers = 32 # setting num_workers on the prefiller causes no notifs to be recved??? # this is a hack to make sure we set num workers on the prefiller to 1. - NUM_WORKERS = 32 - logger.info(f"{NUM_WORKERS=}") + if os.getenv("VLLM_IS_PREFILL", "0") == "1": + num_workers = None + print(f"NUM_WORKERS: {num_workers=}") self.nixl_wrapper = NixlWrapper(str(uuid.uuid4()), None, num_workers=None, - num_shared_workers=NUM_WORKERS) + num_shared_workers=num_workers) # Map of engine_id -> {rank0: agent_name0, rank1: agent_name1..}. self._remote_agents: dict[str, dict[int, str]] = defaultdict(dict) From 81fdcec214f1ecbd8c952d2fe3142f33df61310e Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Thu, 10 Jul 2025 13:32:28 +0000 Subject: [PATCH 34/36] added logging Signed-off-by: Robert Shaw --- .../kv_connector/v1/nixl_connector.py | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 325530cab69..3d3c393eb39 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -37,6 +37,9 @@ Transfer = tuple[int, float] # (xfer_handle, start_time) GET_META_MSG = b"get_meta_msg" +import os +LOG_XFER_TIME = os.getenv("VLLM_LOG_XFER_TIME", "0") == "1" + logger = init_logger(__name__) # Lazy import nixl_wrapper to avoid loading nixl_bindings if nixl is not used @@ -764,8 +767,11 @@ def get_finished(self) -> tuple[set[str], set[str]]: to Rank 0 once their transaction is done + Rank 0 returns finished sets to Scheduler only once all ranks are done. 
""" + + start = time.perf_counter() done_sending = self._get_new_notifs() done_recving = self._pop_done_transfers(self._recving_transfers) + if len(done_sending) > 0 or len(done_recving) > 0: logger.debug( "Rank %s, get_finished: %s requests done sending " @@ -806,6 +812,10 @@ def get_finished(self) -> tuple[set[str], set[str]]: if self._done_sending_count[req_id] == self.world_size: del self._done_sending_count[req_id] all_done_sending.add(req_id) + + end = time.perf_counter() + if LOG_XFER_TIME: + logger.info("========== .get_finished(): %s ==========", end - start) return all_done_sending, all_done_recving @@ -815,6 +825,10 @@ def get_finished(self) -> tuple[set[str], set[str]]: self.tp_group.send_object(finished_req_ids, dst=0) # Unused as only Rank 0 results are sent to scheduler. + end = time.perf_counter() + if LOG_XFER_TIME: + logger.info("========== .get_finished(): %s ==========", end - start) + return done_sending, done_recving def _get_new_notifs(self) -> set[str]: @@ -845,8 +859,8 @@ def _pop_done_transfers( Returns: set of req_ids that have all done xfers """ - done_req_ids: set[str] = set() - for req_id, (handles, agent_name, notif_id) in list(transfers.items()): + done_req_ids: set[str, float] = set() + for req_id, (handles, agent_name, notif_id, start_time) in list(transfers.items()): new_handles = [] for handle in handles: xfer_state = self.nixl_wrapper.check_xfer_state(handle) @@ -863,6 +877,9 @@ def _pop_done_transfers( self.nixl_wrapper.send_notif(agent_name, notif_id) del transfers[req_id] done_req_ids.add(req_id) + if LOG_XFER_TIME: + logger.info("========== transmission time: %s ==========", time.perf_counter() - start_time) + else: transfers[req_id] = (new_handles, agent_name, notif_id) @@ -998,13 +1015,14 @@ def _read_blocks( start = time.perf_counter() self.nixl_wrapper.transfer_batched(handles) end = time.perf_counter() - logger.info("========== TRANSFER BATCHED: %s ==========", end - start) + if LOG_XFER_TIME: + logger.info("========== .transfer_batched(): %s ==========", end - start) # Keep track of ongoing transfers. 
remote_rank = self.tp_rank // tp_ratio agent_name = self._remote_agents[dst_engine_id][remote_rank] assert request_id not in self._recving_transfers - self._recving_transfers[request_id] = (handles, agent_name, notif_id) + self._recving_transfers[request_id] = (handles, agent_name, notif_id, time.perf_counter()) def _get_block_descs_ids(self, engine_id: str, From d0bb3fa02c82e6c292186c295a5a2f2b9881ac29 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Thu, 10 Jul 2025 13:54:33 +0000 Subject: [PATCH 35/36] fix Signed-off-by: Robert Shaw --- vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py index 3d3c393eb39..8b6a8ef9fb1 100644 --- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py +++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py @@ -881,7 +881,7 @@ def _pop_done_transfers( logger.info("========== transmission time: %s ==========", time.perf_counter() - start_time) else: - transfers[req_id] = (new_handles, agent_name, notif_id) + transfers[req_id] = (new_handles, agent_name, notif_id, start_time) return done_req_ids From 45c02abd72585d91899582fd0931af001c1a98b2 Mon Sep 17 00:00:00 2001 From: Robert Shaw Date: Fri, 11 Jul 2025 00:57:50 +0000 Subject: [PATCH 36/36] updated Signed-off-by: Robert Shaw --- vllm/model_executor/model_loader/weight_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/model_executor/model_loader/weight_utils.py b/vllm/model_executor/model_loader/weight_utils.py index 857f4bca682..f96f4cc6f5e 100644 --- a/vllm/model_executor/model_loader/weight_utils.py +++ b/vllm/model_executor/model_loader/weight_utils.py @@ -75,7 +75,7 @@ def enable_hf_transfer(): class DisabledTqdm(tqdm): def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs, disable=True) + super().__init__(*args, **kwargs, disable=False) def get_lock(model_name_or_path: Union[str, Path],
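Taken together, the series lands on one instrumentation pattern: wrap each stage in perf_counter timing and emit it only when VLLM_LOG_XFER_TIME=1, so the logging costs nothing in normal runs. A generic sketch of that pattern follows; the timed helper is an illustration, not code from the patches.

import logging
import os
import time
from contextlib import contextmanager

LOG_XFER_TIME = os.getenv("VLLM_LOG_XFER_TIME", "0") == "1"
logger = logging.getLogger(__name__)

@contextmanager
def timed(label: str):
    # Time the enclosed region; log only when the env var enables it.
    start = time.perf_counter()
    try:
        yield
    finally:
        if LOG_XFER_TIME:
            logger.info("========== %s: %s ==========",
                        label, time.perf_counter() - start)

# Usage, mirroring the patches:
#   with timed(".transfer_batched()"):
#       nixl_wrapper.transfer_batched(handles)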