Skip to content

Draft: WIP NixlConnector allow configurable handshake backend +HTTP #19447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
efbd791
initial noodling
wseaton Jun 2, 2025
9dd2c1c
attempt to background agent registration
wseaton Jun 10, 2025
a0c03cd
more simple immediate retry
wseaton Jun 11, 2025
cf616b5
fix bad merge
wseaton Jun 11, 2025
505f586
implement nicks suggestions
wseaton Jun 16, 2025
a1a0918
change retry logic and move to scheduler; simply background handshake…
wseaton Jun 16, 2025
518a59f
fixup by changing types at callsite
wseaton Jun 16, 2025
6426e3f
fixup popping requests
wseaton Jun 16, 2025
29885af
debug logging
wseaton Jun 16, 2025
504bba7
working checkpoint
wseaton Jun 17, 2025
dd49c96
fix unreachable
wseaton Jun 17, 2025
f65c1b3
remove uv.lock
wseaton Jun 17, 2025
efd655b
remove unused
wseaton Jun 17, 2025
dba3835
flip protocol; fix scheduling order bug
wseaton Jun 18, 2025
85855d1
fix bug in case of no kvconnectorgroup
wseaton Jun 18, 2025
1fc1af4
actually use handshake timeout; simplify route
wseaton Jun 18, 2025
9d8c15c
revert back to working het TP logic
wseaton Jun 19, 2025
fbf0630
Merge branch 'main' into kv-xfer-updates
wseaton Jun 26, 2025
69af91c
Resolve merge conflicts and remove deprecated pending handshake logic
wseaton Jun 26, 2025
91af1cd
Merge remote-tracking branch 'origin/main' into kv-xfer-updates
wseaton Jun 27, 2025
83ec83a
move nixl sidechannel to own entrypoint
wseaton Jun 27, 2025
9a86b37
allow configurable handshake strategy
wseaton Jun 30, 2025
cc887f5
pre-commit fixes
wseaton Jun 30, 2025
b960355
satisfy mypy
wseaton Jun 30, 2025
e0e88b2
add comments; retrigger fastcheck
wseaton Jun 30, 2025
34e3e55
add type hint in test
wseaton Jun 30, 2025
2325470
revert api change
wseaton Jun 30, 2025
430ad03
fix usage
wseaton Jun 30, 2025
4f30966
remove dead code
wseaton Jun 30, 2025
54e088e
fix type declaration
wseaton Jun 30, 2025
ead79d6
revert spurious changes to multi-connector
wseaton Jun 30, 2025
1a6ac2c
remove dead code for threading
wseaton Jul 1, 2025
f940b6d
remove dead engine code
wseaton Jul 1, 2025
7768f21
fix optional ordering
wseaton Jul 1, 2025
3568422
naming, fix engine default param issue; pr feedback
wseaton Jul 9, 2025
332723a
use built in util
wseaton Jul 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,4 @@ shellcheck*/

# Ignore moe/marlin_moe gen code
csrc/moe/marlin_moe_wna16/kernel_*
uv.lock
10 changes: 9 additions & 1 deletion vllm/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import textwrap
import uuid
import warnings
from collections import Counter
from collections import Counter, defaultdict
from contextlib import contextmanager
from dataclasses import (MISSING, Field, asdict, field, fields, is_dataclass,
replace)
Expand All @@ -33,6 +33,8 @@
import vllm.envs as envs
from vllm import version
from vllm.compilation.inductor_pass import CallableInductorPass, InductorPass
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
KVConnectorHandshakeMetadata)
from vllm.logger import init_logger
from vllm.model_executor.layers.quantization import (QUANTIZATION_METHODS,
QuantizationMethods,
Expand Down Expand Up @@ -1511,6 +1513,12 @@ class CacheConfig:
num_cpu_blocks: Optional[int] = field(default=None, init=False)
"""The number of blocks to allocate for CPU memory."""

transfer_handshake_metadata: dict[int, dict[int,
KVConnectorHandshakeMetadata]] = field(
default_factory=lambda: defaultdict(dict),
init=False)
"""Metadata for the KV connector handshake."""

def compute_hash(self) -> str:
"""
WARNING: Whenever a new field is added to this config,
Expand Down
75 changes: 71 additions & 4 deletions vllm/distributed/kv_transfer/kv_connector/v1/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@

import enum
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Callable, Optional

import torch
import msgspec
from pydantic_core import core_schema

from vllm.logger import init_logger
from vllm.v1.core.sched.output import SchedulerOutput
Expand Down Expand Up @@ -62,18 +64,58 @@
Abstract Metadata used to communicate between the
Scheduler KVConnector and Worker KVConnector.
"""
pass

def __init__(self):
pass


class KVConnectorHandshakeMetadata(
        msgspec.Struct,
        omit_defaults=True,  # type: ignore[call-arg]
        # required for @cached_property.
        dict=True):
    """
    Metadata optionally used for out of band connector handshake
    between P/D workers.

    The only field defined here is ``connector_type``, which defaults to
    ``"base"``.
    """
    connector_type: str = "base"

    @classmethod
    def __get_pydantic_core_schema__(
        cls, _source_type: Any,
        _handler: Callable[[Any], core_schema.CoreSchema]
    ) -> core_schema.CoreSchema:
        """Bridge msgspec.Struct with pydantic for schema generation.

        The returned schema validates the input as a dict and then builds
        an instance of ``cls`` from that dict.

        Args:
            _source_type: unused.
            _handler: unused.

        Returns:
            A pydantic-core schema: a dict schema followed by a validator
            that converts the dict into ``cls``.
        """

        def _from_dict(value: dict) -> "KVConnectorHandshakeMetadata":
            # Passing the dict straight to cls(...) would bind the whole
            # dict positionally to the first field (connector_type), so
            # convert it field by field instead.
            try:
                return msgspec.convert(value, type=cls)
            except msgspec.ValidationError as err:
                # pydantic turns a ValueError raised by a validator into
                # one of its own validation errors.
                raise ValueError(str(err)) from err

        return core_schema.no_info_after_validator_function(
            _from_dict, core_schema.dict_schema())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The __get_pydantic_core_schema__ method provides a bridge between msgspec.Struct and Pydantic. This is a neat way to integrate the two. Ensure that the core_schema.dict_schema() accurately represents the structure you intend for Pydantic validation and schema generation, especially if KVConnectorHandshakeMetadata evolves to have more complex nested structures. For now, it seems appropriate for a dictionary representation.


class KVConnectorTransferMetadata(
        msgspec.Struct,
        omit_defaults=True,  # type: ignore[call-arg]
        dict=True):
    """
    Wrapper for transfer handshake metadata sent between engine and utils.

    All three fields are required (none declares a default).
    """
    # Tensor-parallel rank this entry belongs to
    # (presumably the rank of the producing worker; confirm against callers).
    tensor_parallel_rank: int
    # Data-parallel rank this entry belongs to
    # (presumably the rank of the producing worker; confirm against callers).
    data_parallel_rank: int
    # Payload dict, or None. NOTE(review): the schema of this dict is not
    # visible here; presumably serialized handshake metadata. Confirm.
    content: Optional[dict]


class KVConnectorBase_V1(ABC):

def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
    """
    Initialize the state kept by the connector base class.

    Logs a warning that this API is experimental, then stores the
    arguments and resets the per-connector metadata.

    Args:
        vllm_config: the vLLM configuration, stored on the instance.
        role: the role of this connector instance, stored on the
            instance.
    """
    logger.warning(
        "Initializing KVConnectorBase_V1. This API is experimental and "
        "subject to change in the future as we iterate the design.")
    self._connector_metadata = KVConnectorMetadata()
    self._vllm_config = vllm_config
    self._role = role
    # Stays None until set_handshake_metadata is called.
    self._handshake_metadata: Optional[KVConnectorHandshakeMetadata] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Initializing _handshake_metadata to None is clear. Ensure that all code paths that might access _handshake_metadata correctly handle the None case, especially before it's set by set_handshake_metadata.



@property
def role(self) -> KVConnectorRole:
Expand Down Expand Up @@ -104,7 +146,7 @@
"""
self._connector_metadata = KVConnectorMetadata()

def _get_connector_metadata(self) -> KVConnectorMetadata:
def get_connector_metadata(self) -> KVConnectorMetadata:
"""Get the connector metadata.

This function should only be called inside the connector.
Expand Down Expand Up @@ -201,6 +243,31 @@
"""
return None, None

def set_handshake_metadata(
        self, handshake_metadata: KVConnectorHandshakeMetadata) -> None:
    """
    Set the handshake metadata for the connector.

    This metadata is used for out-of-band connector handshake
    between P/D workers. The value is stored on the instance and
    replaces whatever was stored before.

    Args:
        handshake_metadata (KVConnectorHandshakeMetadata): the handshake
            metadata.
    """
    self._handshake_metadata = handshake_metadata


def get_handshake_metadata(
        self) -> Optional[KVConnectorHandshakeMetadata]:
    """
    Get the handshake metadata for the connector.

    Returns:
        Optional[KVConnectorHandshakeMetadata]: the handshake metadata
            currently stored on the instance, or None if none has been
            stored.
    """
    return self._handshake_metadata

# ==============================
# Scheduler-side methods
# ==============================
Expand Down
Loading
Loading