[BugFix] Fix KVConnector TP worker aggregation #21473
@@ -15,7 +15,8 @@
 from vllm.distributed import (ensure_model_parallel_initialized,
                              init_distributed_environment,
                              set_custom_all_reduce)
-from vllm.distributed.kv_transfer import ensure_kv_transfer_initialized
+from vllm.distributed.kv_transfer import (ensure_kv_transfer_initialized,
+                                          has_kv_transfer_group)
 from vllm.distributed.parallel_state import get_pp_group, get_tp_group
 from vllm.logger import init_logger
 from vllm.lora.request import LoRARequest
@@ -333,19 +334,20 @@ def execute_model(
             assert isinstance(output, IntermediateTensors)
             get_pp_group().send_tensor_dict(output.tensors,
                                             all_gather_group=get_tp_group())
+            if not has_kv_transfer_group():
+                return None

             # In case of PP with kv transfer, we need to pass through the
             # finished_sending and finished_recving buffers.
-            empty_output = EMPTY_MODEL_RUNNER_OUTPUT
+            new_output = EMPTY_MODEL_RUNNER_OUTPUT
             if output.finished_sending or output.finished_recving:
-                empty_output = copy.copy(empty_output)
-                empty_output.finished_sending = output.finished_sending
-                empty_output.finished_recving = output.finished_recving
-            output = empty_output
+                new_output = copy.copy(new_output)
+                new_output.finished_sending = output.finished_sending
+                new_output.finished_recving = output.finished_recving
+            output = new_output

         assert isinstance(output, ModelRunnerOutput)
-        # return output only from the driver worker
-        return output if self.is_driver_worker else None
+        return output
Comment on lines +347 to +350:

This change modifies the original behavior: if no KV connector is present, non-driver workers in the last PP rank now return output. I'm not certain this has any practical impact, though. An alternative that would preserve the original gating:

    assert isinstance(output, ModelRunnerOutput)
    return_output = self.is_driver_worker or has_kv_transfer_group()
    return output if return_output else None
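The gating difference the reviewer flags can be sketched in isolation. The flag names below mirror the PR (`is_driver_worker`, `has_kv_transfer_group`), but the toy function itself is hypothetical, not vLLM code:

```python
def should_return_output(is_driver_worker: bool,
                         has_kv_transfer_group: bool) -> bool:
    # Old behavior: only the driver worker returned output.
    # Reviewer's suggestion: also return when a KV transfer group exists,
    # so TP workers can report their finished send/recv request sets
    # while non-driver workers without a connector still return None.
    return is_driver_worker or has_kv_transfer_group

# The driver worker always returns output.
assert should_return_output(True, False)
# A non-driver worker returns output only when KV transfer is active.
assert should_return_output(False, True)
assert not should_return_output(False, False)
```

Under the PR as written, by contrast, every last-PP-rank worker returns output unconditionally, which is the behavioral change being discussed.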
     def profile(self, is_start: bool = True):
         if self.profiler is None:
Reviewer: This needs to be a deepcopy, right? Since we updated the value of EMPTY_MODEL_RUNNER_OUTPUT?

njhill: I don't completely follow what you mean, but I'm pretty sure it doesn't need to be a deep copy. We are just copying because we don't want to modify the shared EMPTY_MODEL_RUNNER_OUTPUT itself.
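To see why a shallow copy suffices here, consider a minimal stand-in. The class and field names mirror the PR, but this is an illustrative sketch, not vLLM's actual ModelRunnerOutput:

```python
import copy
from dataclasses import dataclass, field


# Hypothetical stand-in for the output type; only the two fields the PR
# touches are modeled.
@dataclass
class ModelRunnerOutput:
    finished_sending: set = field(default_factory=set)
    finished_recving: set = field(default_factory=set)


# Shared module-level sentinel, as in the PR.
EMPTY_MODEL_RUNNER_OUTPUT = ModelRunnerOutput()


def passthrough(finished_sending: set, finished_recving: set):
    new_output = EMPTY_MODEL_RUNNER_OUTPUT
    if finished_sending or finished_recving:
        # copy.copy() is enough: we only rebind two attributes on the copy.
        # We never mutate objects reachable from the shared sentinel, so
        # there is nothing for a deep copy to protect.
        new_output = copy.copy(new_output)
        new_output.finished_sending = finished_sending
        new_output.finished_recving = finished_recving
    return new_output


out = passthrough({"req-1"}, set())
assert out.finished_sending == {"req-1"}
# The shared sentinel is untouched.
assert EMPTY_MODEL_RUNNER_OUTPUT.finished_sending == set()
```

A deep copy would only matter if the code mutated a container still shared with the sentinel (e.g. `new_output.finished_sending.add(...)` without reassigning); since the PR assigns fresh values to the fields instead, the shallow copy keeps the sentinel pristine.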