Draft: WIP NixlConnector allow configurable handshake backend +HTTP #19447


Open · wseaton wants to merge 36 commits into main from kv-xfer-updates

Conversation

wseaton
Contributor

@wseaton wseaton commented Jun 10, 2025

Essential Elements of an Effective PR Description Checklist

  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.

Purpose

Don't use a custom ZMQ side channel socket for nixl agent registration; instead, switch to HTTP transport, which is more desirable for DC-scale serving.

TODO(s)

  • Scheduler changes to move the agent registration out of _add_blocks
  • Refactor to completely separate route/port
  • Performance test/benchmark against ZMQ

Test Plan

TODO

Test Result

TODO

(Optional) Documentation Update


👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs will not trigger a full CI run by default. Instead, they will only run fastcheck CI, which runs a small and essential subset of CI tests to quickly catch errors. You can run other CI tests on top of those by going to your fastcheck build on Buildkite UI (linked in the PR checks section) and unblocking them. If you do not have permission to unblock, ping simon-mo or khluu to add you to our Buildkite org.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either add the ready label to the PR or enable auto-merge.

🚀

@mergify mergify bot added the frontend label Jun 10, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment
Summary of Changes

Hello @wseaton, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request is a work-in-progress draft aimed at replacing the custom ZMQ side channel used by the Nixl KV connector for exchanging metadata with an HTTP-based approach. This involves adding a new API endpoint to serve metadata, updating the Nixl connector to use HTTP requests, and plumbing the metadata collection and distribution through the system configuration.

Highlights

  • Replace ZMQ with HTTP for Nixl Metadata: The core change is migrating the Nixl KV connector's side channel for exchanging agent metadata from a custom ZMQ implementation to a standard HTTP endpoint. This simplifies the communication mechanism.
  • New HTTP Endpoint: A new GET endpoint /get_kv_connector_metadata is added to the API server (vllm/entrypoints/openai/api_server.py) to serve the necessary KV connector handshake metadata collected from the workers.
  • Metadata Propagation: KV connector handshake metadata is now collected from the workers by the executor, passed up to the engine, stored in the vllm_config.cache_config, and included in the initial handshake message sent back to the launcher/utils during engine startup.
  • Async HTTP Handshake: The Nixl worker's handshake logic (_nixl_handshake) is updated to use aiohttp for making asynchronous HTTP requests to fetch metadata from remote engines via the new API endpoint, replacing the previous ZMQ request/reply pattern.
  • Base Connector Updates: The base KV connector class (vllm/distributed/kv_transfer/kv_connector/v1/base.py) is updated to include a dedicated KVConnectorHandshakeMetadata struct and methods (set_handshake_metadata, get_handshake_metadata) to manage this out-of-band information.
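
For illustration, the fetch-and-parse path of that HTTP handshake might look roughly like this. This is only a sketch: `parse_agent_metadata` is a hypothetical helper, while the `/get_kv_connector_metadata` route, the `{tp_rank: {dp_rank: metadata}}` shape, and the `agent_metadata` field follow this PR's description.

```python
import base64
from typing import Optional


def parse_agent_metadata(res: dict, tp_rank: int) -> Optional[bytes]:
    """Pull this rank's base64-encoded NIXL agent metadata out of the
    {tp_rank: {dp_rank: metadata}} structure served by the endpoint.
    DP rank "0" is assumed, matching the FIXME discussed in review."""
    tp_data = res.get(str(tp_rank), {}).get("0", {})
    metadata_b64 = tp_data.get("agent_metadata")
    return base64.b64decode(metadata_b64) if metadata_b64 else None


async def fetch_remote_agent_metadata(host: str, port: int,
                                      tp_rank: int) -> Optional[bytes]:
    # aiohttp is third-party; it is what this PR uses for the async handshake.
    import aiohttp
    url = f"http://{host}:{port}/get_kv_connector_metadata"
    async with aiohttp.ClientSession() as session:
        async with session.get(
                url, timeout=aiohttp.ClientTimeout(total=5.0)) as resp:
            resp.raise_for_status()
            return parse_agent_metadata(await resp.json(), tp_rank)
```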
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature | Command | Description
Code Review | /gemini review | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state.
Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments.
Help | /gemini help | Displays a list of available commands.

Customization

To customize Gemini Code Assist for your GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@mergify mergify bot added the v1 label Jun 10, 2025

mergify bot commented Jun 10, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @wseaton.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 10, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request makes significant changes to the NixlConnector by replacing ZMQ-based metadata exchange with an HTTP-based mechanism. This involves adding new metadata structures (KVConnectorHandshakeMetadata), modifying existing configuration classes (CacheConfig, EngineCore) to store and propagate this metadata, and introducing a new API endpoint (/get_kv_connector_metadata) to serve it. The core NIXL handshake logic in NixlConnectorWorker now uses aiohttp for asynchronous HTTP requests.

Key areas of feedback:

  • Security: The new HTTP endpoint needs authentication, as noted in a TODO. This is critical.
  • Asynchronous Operations: The use of asyncio.run() in a potentially performance-sensitive path (_read_blocks) should be reviewed. The TODO to move agent registration out of _add_blocks might address this.
  • DP Rank Logic: A FIXME comment in _nixl_handshake regarding Data Parallel rank determination needs to be addressed to ensure robustness across different deployment scenarios.
  • Error Handling and Robustness: Ensure robust error handling for HTTP requests and consider configurability for timeouts.

Overall, the changes seem to be a good step towards simplifying the inter-process communication for metadata exchange. Addressing the highlighted points, especially security and the FIXMEs, will be important.

Comment on lines 416 to 478
# FIXME(weaton): Need to figure out the logic for the right DP rank to
# query. Currently, we assume that the metadata is sent by the
# first DP rank in the TP group.
# Get the data for this tensor parallel rank
tp_data = res.get(str(self.tp_rank), {}).get("0", {})
metadata_bytes = tp_data.get("agent_metadata", None)
Contributor

high

The FIXME(weaton) comment highlights a potential issue with determining the correct Data Parallel (DP) rank to query. This logic (res.get(str(self.tp_rank), {}).get("0", {})) assumes metadata is always provided by DP rank 0 within a TP group. This assumption needs to be robust or made configurable if different DP ranks might host the metadata. This could lead to incorrect handshakes if the assumption doesn't hold in all deployment scenarios.

Comment on lines 872 to 874
@router.get("/get_kv_connector_metadata")
async def get_kv_connector_metadata(raw_request: Request):
    kv_connector_metadata = raw_request.app.state.vllm_config.cache_config.transfer_handshake_metadata
    return JSONResponse(content=kv_connector_metadata)
Contributor

high

A new endpoint /get_kv_connector_metadata is added. This endpoint exposes transfer_handshake_metadata from the CacheConfig.

Security Concern: The PR description includes a TODO: "Authentication on the route, move it out of the dev server args". This is critical. Exposing this metadata without authentication could be a security risk, as it might reveal internal cluster configuration or state. This endpoint should be protected, especially in production environments.

@@ -722,7 +713,7 @@ def _read_blocks(
):
# NOTE(rob): this takes ~2s. We need to get this off the hotpath.
if dst_engine_id not in self._remote_agents:
-        self._nixl_handshake(remote_host, remote_port)
+        asyncio.run(self._nixl_handshake(remote_host, remote_port))
Contributor

high

Calling asyncio.run(self._nixl_handshake(remote_host, remote_port)) here is a significant change. Previously, _nixl_handshake was synchronous (using ZMQ). Now it's an async function.

asyncio.run() creates a new event loop, runs the coroutine, and then closes the loop. If _read_blocks (and by extension, this handshake) is called frequently in a performance-sensitive, synchronous path, repeatedly creating and tearing down event loops could introduce overhead.

Consider if the broader context calling _read_blocks can be made async to avoid asyncio.run(). If not, this might be acceptable if handshakes are infrequent. The PR description mentions a TODO: "Scheduler changes to move the agent registration out of _add_blocks", which might address this by making handshakes less frequent or moving them to a more appropriate (potentially async) context.

Comment on lines 442 to 519
# If metadata_bytes is None, it means the remote agent
# is not using NIXL, so we can skip the handshake.
logger.warning(
    "Received None metadata from %s:%s, skipping NIXL handshake",
    host, port)
Contributor

medium

Handling the case where metadata_bytes is None by logging a warning and skipping the NIXL handshake is a good fallback. This allows graceful degradation if a remote agent isn't using NIXL or if there's an issue fetching its metadata.

Comment on lines +143 to +383
# Set handshake metadata using the base class method
if hasattr(self.connector_worker, 'xfer_metadata'):
    self.set_handshake_metadata(self.connector_worker.xfer_metadata)
Contributor

medium

The logic to set handshake metadata after registering KV caches is good. The hasattr check for xfer_metadata on self.connector_worker is a safe way to ensure the attribute exists before accessing it, especially if NixlConnectorWorker.register_kv_caches might not always set it (though in the current code, it seems it always does if NixlWrapper is available).

Comment on lines 80 to 91
num_gpu_blocks, num_cpu_blocks, kv_cache_config, \
    transfer_handshake_metadata = self._initialize_kv_caches(vllm_config)

vllm_config.cache_config.num_gpu_blocks = num_gpu_blocks
vllm_config.cache_config.num_cpu_blocks = num_cpu_blocks

# Store KV connector metadata for handshake
self.transfer_handshake_metadata = transfer_handshake_metadata
Contributor

medium

The _initialize_kv_caches method now also returns transfer_handshake_metadata, which is then stored in self.transfer_handshake_metadata. This metadata is collected from workers after KV cache registration. This is a key change to support the HTTP-based handshake mechanism.

Comment on lines 172 to 179
# Collect KV connector xfer metadata from workers (after KV cache registration)
transfer_handshake_metadata = (
    self.model_executor.get_kv_connector_handshake_metadata())
Contributor

medium

Collecting transfer_handshake_metadata from self.model_executor.get_kv_connector_handshake_metadata() after initializing KV caches is the correct place to gather this information. This metadata is then propagated up and eventually exposed via the new API endpoint.

Comment on lines 424 to 470
# Include KV connector metadata if available
if hasattr(self,
           'transfer_handshake_metadata') and self.transfer_handshake_metadata:
    # self.transfer_handshake_metadata is a list of dicts from workers
    # Each dict already has structure {tp_rank: {dp_rank: metadata}}
    # Merge all worker dicts into a single dict
    content = {}
    for worker_dict in self.transfer_handshake_metadata:
        if worker_dict is not None:
            content.update(worker_dict)
    handshake_message["transfer_handshake_metadata"] = content
Contributor

medium

The handshake message now includes transfer_handshake_metadata if available. This metadata is a list of dictionaries from workers, each structured as {tp_rank: {dp_rank: metadata}}. The code correctly merges these into a single content dictionary before encoding. This is how the metadata becomes available to other vLLM instances for the HTTP-based handshake.

vllm/v1/utils.py Outdated
Comment on lines 372 to 548
# stash KV connector metadata in vllm_config if passed in.
if "transfer_handshake_metadata" in msg and msg["transfer_handshake_metadata"]:
    logger.debug(
        "Received transfer handshake metadata from engine %s: %s",
        eng_index, msg["transfer_handshake_metadata"])
    # Merge the received metadata with existing cache config
    for tp_rank, dp_dict in msg["transfer_handshake_metadata"].items():
        for dp_rank, metadata in dp_dict.items():
            cache_config.transfer_handshake_metadata[tp_rank][
                dp_rank] = metadata
Contributor

medium

The wait_for_engine_startup function now processes transfer_handshake_metadata if it's present in the handshake message from an engine. It correctly merges this metadata into the cache_config.transfer_handshake_metadata. This allows the orchestrator or client to gather all necessary handshake details from all engine instances. The nested loop structure for merging tp_rank and dp_rank specific metadata is appropriate.

Comment on lines 221 to 267
def get_kv_connector_handshake_metadata(self) -> Optional[dict]:
    """Get KV connector metadata from this worker if available."""

    connector = get_kv_transfer_group()
    if not is_v1_kv_transfer_group(connector):
        logger.warning(
            "The KV connector is not a v1 connector. "
            "This method is only supported for v1 connectors.")
        return None

    # Only return metadata if this is a worker role
    if connector.role == KVConnectorRole.WORKER:
        metadata = connector.get_handshake_metadata()
        if metadata is None:
            logger.warning(
                "KV connector metadata is not available. "
                "This may happen if the KV connector is not initialized "
                "or the worker is not part of a disaggregated KV cache setup."
            )
            return None

        tp_rank = get_tp_group().rank_in_group
        dp_rank = self.vllm_config.parallel_config.data_parallel_rank_local
        return {
            tp_rank: {
                dp_rank: msgspec.to_builtins(metadata)
            }
        }

    return None
Contributor

medium

The get_kv_connector_handshake_metadata method is a new addition to the Worker class. It correctly retrieves handshake metadata from the KV connector if it's a v1 connector and the role is WORKER.

Key points:

  • It checks if the connector is v1 and logs a warning if not. This is good for compatibility.
  • It only returns metadata for WORKER roles, which is logical as schedulers typically don't expose their own NIXL-agent-like metadata in the same way.
  • It handles the case where metadata might be None (e.g., connector not initialized or not a disaggregated setup) by logging a warning and returning None.
  • The returned metadata is structured as {tp_rank: {dp_rank: msgspec.to_builtins(metadata)}}. This structure is consistent with how it's processed in vllm/v1/engine/core.py and vllm/v1/utils.py.

This method is crucial for enabling workers to provide their handshake details, which are then aggregated by the engine and made available for other instances.

Member

@russellb russellb left a comment

I only looked at this very quickly, but is putting this in the main API server just a temporary thing?

This seems like something that should be internal-facing only, not in the same API server exposed externally.

@wseaton
Contributor Author

wseaton commented Jun 10, 2025

@russellb this has to be externally accessible for P/D disagg, as one "api server" is basically handshaking with another. But it is important that it is accessible only by trusted peers (since the information can be used by an attacker to leak information about KV blocks, then pivot to potentially register nixl agents, etc.), so that authentication mechanism needs to be figured out. Happy to chat about the path forward for this!
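
For illustration, one possible shape for that trusted-peer check is a shared bearer token distributed out of band and compared in constant time on the metadata route. Purely a sketch: `is_trusted` and the token plumbing are hypothetical, and the actual mechanism is still an open question in this PR.

```python
import secrets


def is_trusted(auth_header: str, expected_token: str) -> bool:
    """Constant-time bearer-token check for peer requests to the
    metadata route. Hypothetical helper, not part of this PR."""
    token = auth_header.removeprefix("Bearer ")
    return secrets.compare_digest(token, expected_token)


# Sketch of wiring it into the route added by this PR:
#
# @router.get("/get_kv_connector_metadata")
# async def get_kv_connector_metadata(raw_request: Request):
#     if not is_trusted(raw_request.headers.get("Authorization", ""),
#                       expected_token):
#         raise HTTPException(status_code=401)
#     ...
```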

wseaton added 3 commits June 11, 2025 14:48
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
@mergify mergify bot removed the needs-rebase label Jun 11, 2025
Signed-off-by: Will Eaton <weaton@redhat.com>

# Handshake only with the other TP remote the current local rank will
# pull from. With homogeneous TP it happens to be the same rank_i.
-        tp_ratio = self._tp_size[self.engine_id] // metadata.tp_size
+        tp_ratio = self._tp_size[self.engine_id] // remote_tp_size
Contributor Author

@NickLucche can you help review this for correctness? For HTTP metadata xfer, we store nixl agent metadata in a dict tp_rank.dp_rank.metadata, so ["0"]["0"]["agent_metadata"] is the default remote agent in the TP1DP1 case.

Is remote_tp_size and the corresponding p_remote_rank calculation correct?

Contributor

this looks correct. However I am not sure why we're bundling tp.dp here.
I am treating each DP as a separate remote for all intents and purposes, hence we would be able to support DP groups with different TP sizes.

Contributor

(although you can't start DP with different TP right now)

Contributor Author

DP is being bundled in the metadata because the API server runs a collective RPC to get metadata from all workers, which includes DP workers. For now that metadata is functionally ignored, but it might be useful in the future for us or other connectors.

Contributor

@NickLucche NickLucche left a comment

I think the http exchange logic in nixl is in a good state.

I am not convinced the increased code complexity in handling the threads is really beneficial though, particularly for readability.


try:
    req = Request(url)
    with urlopen(req, timeout=5.0) as response:
Contributor

qq: are we putting timeout in a global var?

Contributor Author

We can; this should probably get moved to something like envs.KV_METADATA_HANDSHAKE_TIMEOUT.

remote_tp_size = len(res.keys())
# Default case is that the remote TP size is 1, so we can
# directly access the metadata.
tp_data = res.get(str(self.tp_rank), {}).get("0", {})
Contributor

nit: can't we maintain the same structure regardless of tp0 or tp>1?

raise


remote_tp_size = len(res.keys())
Contributor

just fyi I am hoping I can transmit the tp size with the kvmetadata here #19413

Contributor Author

Because of my comment above, and the fact that all ranks are bundled together right now, we get this for free with keys... but it makes the size of this bundle grow with the number of ranks. I think this means we should make the API something like get_kv_connector_metadata/{rank}, wdyt?

Contributor

I think it makes sense, since rank_i only needs the data from rank_j


# Handshake only with the other TP remote the current local rank will
# pull from. With homogeneous TP it happens to be the same rank_i.
-        tp_ratio = self._tp_size[self.engine_id] // metadata.tp_size
+        tp_ratio = self._tp_size[self.engine_id] // remote_tp_size
Contributor

this looks correct. However I am not sure why we're bundling tp.dp here.
I am treating each DP as a separate remote for all intents and purposes, hence we would be able to support DP groups with different TP sizes.


# Handshake only with the other TP remote the current local rank will
# pull from. With homogeneous TP it happens to be the same rank_i.
-        tp_ratio = self._tp_size[self.engine_id] // metadata.tp_size
+        tp_ratio = self._tp_size[self.engine_id] // remote_tp_size
Contributor

(although you can't start DP with different TP right now)

Comment on lines 488 to 489
agent_metadata=base64.b64decode(metadata_bytes),
kv_caches_base_addr=tp_data["kv_caches_base_addr"],
Contributor

ok I see here the problem with sending a lot of memory pointers through JSON... a bit ugly, but it's a one-off

Contributor Author

@wseaton wseaton Jun 12, 2025

yeah... also JSON might not be ideal here since the agent_metadata is binary. We can revisit this encoding; since it's an internal API we could probably just use msgpack.

Contributor

I agree
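
For reference, the base64-over-JSON round trip being discussed looks roughly like this (illustrative values); msgpack (e.g. via msgspec's msgpack support) would avoid the encode/decode step by carrying the bytes natively:

```python
import base64
import json

# Current approach in this PR: binary NIXL agent metadata must be
# base64-encoded to survive the JSON response body.
agent_blob = b"\x00\x01\x02nixl-agent"
payload = json.dumps({
    "0": {"0": {"agent_metadata": base64.b64encode(agent_blob).decode("ascii")}}
})

# Receiver side: decode back to raw bytes before handing to NIXL.
res = json.loads(payload)
recovered = base64.b64decode(res["0"]["0"]["agent_metadata"])
assert recovered == agent_blob  # round trip works, at ~33% size overhead

# With msgpack, `agent_blob` could be sent as-is, since msgpack has a
# native binary type -- no base64 step and no size inflation.
```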


logger.debug("NIXL handshake: get metadata took: %s",
             pre_register - start_time)
logger.debug("NIXL handshake: add agent took: %s",
Contributor

just curious, is this value changing significantly with http?

Contributor Author

It's slightly slower with nixl 0.2; with nixl 0.3.1 agent registration is a lot faster, which makes the increase more noticeable. But since we are moving this onto a background thread, the engine is not blocked during handshake. If we can reduce the amount of metadata needed for handshake (like the optimization you already have), I think we can get closer to ZMQ speed.


# check for timeout (threads running > 30 seconds)
thread_age = time.time() - getattr(thread, '_start_time', time.time())
if thread.is_alive() and thread_age > 30.0:
Contributor

same thing with constant
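
Both magic numbers (the 5s request timeout and the 30s thread timeout) could be lifted into env-configurable constants, e.g. as below. Variable and env names are hypothetical, just following the `envs.*` pattern suggested earlier in this thread:

```python
import os

# Hypothetical names following vLLM's envs pattern; not part of this PR.
KV_METADATA_HANDSHAKE_TIMEOUT = float(
    os.getenv("VLLM_KV_METADATA_HANDSHAKE_TIMEOUT", "5.0"))
KV_HANDSHAKE_THREAD_TIMEOUT = float(
    os.getenv("VLLM_KV_HANDSHAKE_THREAD_TIMEOUT", "30.0"))
```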

@@ -851,39 +883,138 @@ def _pop_done_transfers(
xfer_state)
return done_req_ids

def _process_completed_handshakes(self):
Contributor

I don't quite understand the increased complexity in this method - perhaps I wish we had separated the async handshake from the http exchange.

Is this meant for speeding up a warmup stage where requests are sent just to get every D to handshake with every other P?

Contributor Author

This is for making sure we reprocess any requests that are deferred because they needed handshakes to happen (as part of the changes to make handshakes happen in the background). It might be my lack of knowledge of scheduler internals, but when I tried to just continue and have them reprocessed on the next engine step, I didn't see _read_blocks ever retried.

Contributor Author

Also definitely open to refactoring the background threading stuff... initially I had it working via aiohttp and concurrent.futures, which made the code a bit cleaner.

Contributor

yeah I was expecting to see asyncio usage here too tbh

@wseaton
Contributor Author

wseaton commented Jun 12, 2025

There is a good opportunity here to align with other refactoring efforts, specifically proposed changes here #19330 (comment), which would allow pushing some of the retry logic explored here directly into the scheduler.

@chaunceyjiang
Contributor

If PP > 1, is this solution compatible?
For example, with PP = 4, how should PP1, PP2, and PP3 handle the metadata?

@mergify mergify bot added qwen Related to Qwen models rocm Related to AMD ROCm structured-output tpu Related to Google TPUs tool-calling labels Jun 30, 2025

mergify bot commented Jun 30, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @wseaton.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jun 30, 2025
@mergify mergify bot removed the tpu (Related to Google TPUs) and needs-rebase labels Jun 30, 2025
@wseaton wseaton force-pushed the kv-xfer-updates branch 2 times, most recently from 3d5e24a to b960355 on June 30, 2025 at 19:29
wseaton added 10 commits June 30, 2025 15:46
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>
Signed-off-by: Will Eaton <weaton@redhat.com>

mergify bot commented Jul 2, 2025

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @wseaton.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify bot added the needs-rebase label Jul 2, 2025
Contributor

@NickLucche NickLucche left a comment

Just nits, I like the abstraction over handshake methods (though I'd be happier with just having to maintain one...).
I still feel like some of the code in the handshake Strategies is duplicated; maybe we could move that up the hierarchy?

@@ -76,6 +80,241 @@ class ReqMeta:
tp_size: int


class HandshakeStrategy(ABC):
Contributor

nit: can we start moving into a separate utils or similar file?

engine_id)
self.add_remote_agent_func = add_remote_agent_func
self._listener_thread: Optional[threading.Thread] = None
self._tp_size_mapping: dict[str, int] = {engine_id: tp_size}
Contributor

nit: isn't tp_size cleaner? I think dict already gives away the fact that it's a mapping

"""
Handshake strategy that uses HTTP requests to fetch metadata from a
remote server. This is done through the front-end, and is
North-South, not P2P.
Contributor

dumbq: does north-south mean sidecar/proxy reaching out to instances and then seeing that they have the right metadata?

Contributor Author

In this north-south config it means the GPU worker is reaching out to the API server of the remote, instead of directly handshaking with another peer GPU worker process on the remote.

Comment on lines +1013 to +1023
rank_data = self.tp_group.recv_object(src=i)
other_sending, other_recving = rank_data

sending_set = other_sending or set()
recving_set = other_recving or set()
for req_id in sending_set | recving_set:
    if (req_id in self._done_recving_count
            or req_id in self._recving_transfers):
        self._done_recving_count[req_id] += 1
    else:
        self._done_sending_count[req_id] += 1
Contributor

we probably didn't need this change in this PR but that's ok

Contributor Author

Thanks! Let me revert this refactoring, the diff is already large and I need to rebase anyways.

Signed-off-by: Will Eaton <weaton@redhat.com>
vllm/utils.py Outdated
Comment on lines 2953 to 2958
# Check if it's an IPv6 address
ip = ipaddress.ip_address(host)
# Ensure IPv6 addresses are bracketed
if (isinstance(ip, ipaddress.IPv6Address)
        and not (host.startswith('[') and host.endswith(']'))):
    host = f'[{host}]'
Member

You can use this util for checking for a v6 address

vllm/vllm/utils/__init__.py

Lines 815 to 820 in 9ff2af6

def is_valid_ipv6_address(address: str) -> bool:
    try:
        ipaddress.IPv6Address(address)
        return True
    except ValueError:
        return False
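
Using that util, the bracketing logic could reduce to something like this (a sketch; `bracket_if_ipv6` is a hypothetical name):

```python
import ipaddress


def is_valid_ipv6_address(address: str) -> bool:
    # Mirrors the existing vLLM util referenced above.
    try:
        ipaddress.IPv6Address(address)
        return True
    except ValueError:
        return False


def bracket_if_ipv6(host: str) -> str:
    """Wrap a bare IPv6 literal in brackets so it can be embedded in a URL."""
    if is_valid_ipv6_address(host):
        return f"[{host}]"
    return host  # IPv4, hostname, or already-bracketed IPv6
```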

Signed-off-by: Will Eaton <weaton@redhat.com>
Labels
ci/build · documentation (Improvements or additions to documentation) · frontend · llama (Related to Llama models) · multi-modality (Related to multi-modality (#4194)) · needs-rebase · performance (Performance-related issues) · qwen (Related to Qwen models) · rocm (Related to AMD ROCm) · structured-output · tool-calling · v1
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

6 participants