
GrpcWorkerAgentRuntimeHost: _pending_responses key collision when multiple senders target the same runtime with identical request_id → KeyError('1') #7016

@seunggil1


What happened?

Describe the bug

  • _pending_responses key collision when multiple senders target the same runtime with identical request_id → KeyError('1')

  • When two different GrpcWorkerAgentRuntime clients send messages that ultimately target the same runtime, the host stores the response waiters in _pending_responses keyed only by target_client_id and then by the request_id string; the sender is not part of the key.
    Because each sender numbers its requests with its own local counter starting at 1, both messages use request_id == "1". The second insert overwrites the first entry, so when the host later pops that key for the second response, it is missing → KeyError('1').

This appears to be a design bug in how the host keys pending responses; it ignores the sender and assumes request_id is unique per target runtime.
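The collision can be reproduced without gRPC. The toy model below is only an illustration: `ToyHost`, `make_counter` and the string "waiters" are hypothetical stand-ins, not AutoGen's API. It mimics the host's two-level layout `pending[target_client_id][request_id]` and gives each runtime its own counter starting at 1, as described above.

```python
import itertools
from typing import Callable, Dict


class ToyHost:
    """Toy stand-in for the host's pending-response bookkeeping (not AutoGen code)."""

    def __init__(self) -> None:
        # target_client_id -> {request_id -> waiter}; the sender is not part of the key.
        self.pending: Dict[str, Dict[str, str]] = {}

    def register(self, target: str, request_id: str, waiter: str) -> None:
        # Same shape as setdefault(target_client_id, {})[request.request_id] = future.
        self.pending.setdefault(target, {})[request_id] = waiter

    def resolve(self, target: str, request_id: str) -> str:
        # Same shape as self._pending_responses[client_id].pop(response.request_id).
        return self.pending[target].pop(request_id)


def make_counter() -> Callable[[], str]:
    # Each runtime numbers its own requests from 1, independently of the others.
    ids = itertools.count(1)
    return lambda: str(next(ids))


runtime1_ids = make_counter()
runtime2_ids = make_counter()
host = ToyHost()

host.register("runtime-1", runtime2_ids(), "waiter for Runtime-2")  # Runtime-2 -> MyAgent1
host.register("runtime-1", runtime1_ids(), "waiter for Runtime-1")  # MyAgent1 -> MyAgent2
print(host.pending)  # {'runtime-1': {'1': 'waiter for Runtime-1'}}

print(host.resolve("runtime-1", "1"))  # waiter for Runtime-1 (MyAgent2's reply)
try:
    host.resolve("runtime-1", "1")  # MyAgent1's reply to Runtime-2
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: '1'
```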

Exception in callback GrpcWorkerAgentRuntimeHostServicer._raise_on_exception(<Task finishe...KeyError('1')>)
handle: <Handle GrpcWorkerAgentRuntimeHostServicer._raise_on_exception(<Task finishe...KeyError('1')>)>
Traceback (most recent call last):
  File "/Users/user/miniconda3/envs/autogen_0.5.7/lib/python3.12/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/user/Documents/GitHub/autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py", line 183, in _raise_on_exception
    raise exception
  File "/Users/user/miniconda3/envs/autogen_0.5.7/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/Users/user/Documents/GitHub/autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py", line 268, in _process_response
    future = self._pending_responses[client_id].pop(response.request_id)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: '1'

To Reproduce

Topology

  • Host: GrpcWorkerAgentRuntimeHost on localhost:50051.
  • Runtime-1: registers MyAgent1, MyAgent2 (both agents live on the same runtime).
  • Runtime-2: no agents; used only to send the first message.

Sequence

  1. Runtime-2 → (send) → MyAgent1 (on Runtime-1).
    Host records pending[target_client_id(Runtime-1)]["1"] = future.
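  2. Inside MyAgent1 (Runtime-1), it forwards a message to MyAgent2 (also on Runtime-1).
    Host again records pending[target_client_id(Runtime-1)]["1"] = future → overwrites step 1.
  3. MyAgent2's response pops pending[Runtime-1]["1"], which is now the step-2 future. When MyAgent1's response to the original request arrives, the host tries to pop("1") again, but the entry is already gone → KeyError('1'). The step-1 future is never resolved.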
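The same sequence can be replayed with bare asyncio futures, without gRPC. This is a simplified model: the dict layout follows the host excerpts quoted in this report, while `replay` and the variable names are made up for illustration. It also shows the second consequence of the overwrite: the future registered in step 1 is orphaned, so the waiter for Runtime-2 is never resolved.

```python
import asyncio
from typing import Dict, Tuple


async def replay() -> Tuple[bool, bool, str]:
    loop = asyncio.get_running_loop()
    # target_client_id -> {request_id -> future}, mirroring _pending_responses.
    pending: Dict[str, Dict[str, "asyncio.Future[str]"]] = {}

    # Step 1: Runtime-2 -> MyAgent1 (Runtime-1), request_id "1".
    fut_step1: "asyncio.Future[str]" = loop.create_future()
    pending.setdefault("runtime-1", {})["1"] = fut_step1
    # Step 2: MyAgent1 -> MyAgent2 (Runtime-1), also request_id "1": overwrites step 1.
    fut_step2: "asyncio.Future[str]" = loop.create_future()
    pending.setdefault("runtime-1", {})["1"] = fut_step2

    # MyAgent2 replies first and resolves the step-2 future.
    pending["runtime-1"].pop("1").set_result("reply from MyAgent2")

    # MyAgent1 then replies to the original request, but the key is already gone.
    key_error = False
    try:
        pending["runtime-1"].pop("1").set_result("reply from MyAgent1")
    except KeyError:
        key_error = True

    step1_done = fut_step1.done()  # nothing references it any more
    step2_result = fut_step2.result()
    fut_step1.cancel()  # tidy up the orphaned future before the loop closes
    return key_error, step1_done, step2_result


key_error, step1_done, step2_result = asyncio.run(replay())
print(key_error, step1_done, step2_result)  # True False reply from MyAgent2
```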

Concrete reproduction process

  1. messages.py
from dataclasses import dataclass

@dataclass
class MyMessage:
    content: str
  2. autogen_host.py
  • Please run the file separately first.
import asyncio
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost

async def main():
    host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
    host.start()  # Start a host service in the background.

    await asyncio.Event().wait()

if __name__ == '__main__':
    asyncio.run(main())
  3. autogen_runtime_with_agent.py
  • Please run the file separately first.
  • Runtime-1: registers MyAgent1, MyAgent2 (both agents live on the same runtime).
import asyncio
from autogen_core import MessageContext, RoutedAgent, message_handler, AgentId, rpc, try_get_known_serializers_for_type
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
from messages import MyMessage

class MyAgent1(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__("My agent1")
        self._name = name
        self._counter = 0

    @rpc
    async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
        content = f"{self._name} received message : {message.content}"
        my_agent2_result : MyMessage = await self.send_message(MyMessage(content=message.content), AgentId("MyAgent2", "default"))
        content += f"\n{self._name} received from MyAgent2 : {my_agent2_result.content}"
        return MyMessage(content=content)

class MyAgent2(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__("My agent2")
        self._name = name

    @rpc
    async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
        content = f"{self._name} received message : {message.content}"
        return MyMessage(content=content)

async def main():
    runtime1 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await runtime1.start()
    runtime1.add_message_serializer(try_get_known_serializers_for_type(MyMessage))
    await MyAgent1.register(runtime1, "MyAgent1", lambda: MyAgent1("my_agent1"))
    await MyAgent2.register(runtime1, "MyAgent2", lambda: MyAgent2("my_agent2"))
    await asyncio.Event().wait()


if __name__ == '__main__':
    asyncio.run(main())
  4. send_message.py
import asyncio

from autogen_core import try_get_known_serializers_for_type, AgentId
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
from messages import MyMessage

async def main():
    runtime2 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await runtime2.start()
    runtime2.add_message_serializer(try_get_known_serializers_for_type(MyMessage))
    result : MyMessage = await runtime2.send_message(MyMessage(content="work2 message"), AgentId("MyAgent1", "default"))
    print(result.content)
    await runtime2.stop()

if __name__ == '__main__':
    asyncio.run(main())

Execution results

  1. Send a message from Runtime-2
result : MyMessage = await runtime2.send_message(MyMessage(content="work2 message"), AgentId("MyAgent1", "default"))
  2. The host registers target_client_id and request_id in GrpcWorkerAgentRuntimeHostServicer._pending_responses

target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1

future = asyncio.get_event_loop().create_future()
self._pending_responses.setdefault(target_client_id, {})[request.request_id] = future
# Create a task to wait for the response and send it back to the client.
send_response_task = asyncio.create_task(self._wait_and_send_response(future, client_id))

  3. MyAgent1 handles the received message and sends a new message to MyAgent2
@rpc
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
    content = f"{self._name} received message : {message.content}"
    my_agent2_result : MyMessage = await self.send_message(MyMessage(content=message.content), AgentId("MyAgent2", "default"))
    content += f"\n{self._name} received from MyAgent2 : {my_agent2_result.content}"
    return MyMessage(content=content)
  4. The host registers target_client_id and request_id in GrpcWorkerAgentRuntimeHostServicer._pending_responses
  • Although this is a new message, it has the same target_client_id and request_id as the request registered earlier.
  • The existing entry in self._pending_responses is overwritten.

target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1

future = asyncio.get_event_loop().create_future()
self._pending_responses.setdefault(target_client_id, {})[request.request_id] = future
# Create a task to wait for the response and send it back to the client.
send_response_task = asyncio.create_task(self._wait_and_send_response(future, client_id))

  5. MyAgent2 handles the message and returns a response
@rpc
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
    content = f"{self._name} received message : {message.content}"
    return MyMessage(content=content)
  6. The host pops the entry for client_id and request_id from GrpcWorkerAgentRuntimeHostServicer._pending_responses; this succeeds and resolves the future for MyAgent1's request to MyAgent2.

target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1

async def _process_response(self, response: agent_worker_pb2.RpcResponse, client_id: ClientConnectionId) -> None:
    # Setting the result of the future will send the response back to the original sender.
    future = self._pending_responses[client_id].pop(response.request_id)
    future.set_result(response)

  7. MyAgent1 uses MyAgent2's reply to build and return its own response
@rpc
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
    content = f"{self._name} received message : {message.content}"
    my_agent2_result : MyMessage = await self.send_message(MyMessage(content=message.content), AgentId("MyAgent2", "default"))
    content += f"\n{self._name} received from MyAgent2 : {my_agent2_result.content}"
    return MyMessage(content=content)
  8. The host tries to pop the same client_id and request_id from GrpcWorkerAgentRuntimeHostServicer._pending_responses again; the entry was already removed, so the lookup fails.

target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1

File "/Users/user/Documents/GitHub/autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py", line 270, in _process_response
    future = self._pending_responses[client_id].pop(response.request_id)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: '1'
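One possible direction for a fix, sketched under assumptions (this is not AutoGen's implementation; `PendingRouter`, `PendingEntry`, `forward_request` and `route_response` are hypothetical names): the host replaces the sender's request_id with a host-unique id before forwarding, remembers which sender and original request_id it belongs to, and restores them when the response comes back. It assumes the target runtime echoes back whatever request_id it received, which is what `_process_response` already relies on.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class PendingEntry:
    sender_client_id: str  # runtime that is waiting for the response
    sender_request_id: str  # request_id as chosen by that runtime's own counter
    future: "asyncio.Future[str]"


class PendingRouter:
    """Hypothetical host-side bookkeeping; not AutoGen's implementation."""

    def __init__(self) -> None:
        # One host-wide counter, so in-flight ids never repeat across senders.
        self._next_id = itertools.count(1)
        # target_client_id -> {host_request_id -> PendingEntry}
        self._pending: Dict[str, Dict[str, PendingEntry]] = {}

    def forward_request(
        self, target: str, sender: str, sender_request_id: str, future: "asyncio.Future[str]"
    ) -> str:
        host_request_id = str(next(self._next_id))
        self._pending.setdefault(target, {})[host_request_id] = PendingEntry(
            sender, sender_request_id, future
        )
        # Assumption: the host forwards the request to the target with this id instead.
        return host_request_id

    def route_response(self, target: str, host_request_id: str, payload: str) -> Tuple[str, str]:
        entry = self._pending[target].pop(host_request_id)
        entry.future.set_result(payload)
        # Tell the caller where to send the reply and which request_id to put back on it.
        return entry.sender_client_id, entry.sender_request_id


async def demo() -> Tuple[Tuple[str, str], Tuple[str, str], str]:
    loop = asyncio.get_running_loop()
    router = PendingRouter()
    fut1: "asyncio.Future[str]" = loop.create_future()
    fut2: "asyncio.Future[str]" = loop.create_future()
    # Both senders use request_id "1", as in the report.
    id1 = router.forward_request("runtime-1", "runtime-2", "1", fut1)
    id2 = router.forward_request("runtime-1", "runtime-1", "1", fut2)
    # MyAgent2 answers first, then MyAgent1; neither lookup collides.
    second = router.route_response("runtime-1", id2, "reply from MyAgent2")
    first = router.route_response("runtime-1", id1, "reply from MyAgent1")
    return first, second, fut1.result()


first, second, runtime2_reply = asyncio.run(demo())
print(first)  # ('runtime-2', '1')
print(second)  # ('runtime-1', '1')
print(runtime2_reply)  # reply from MyAgent1
```

Because the host-assigned id is unique among in-flight requests, a second sender that happens to reuse the same counter value can no longer overwrite the first entry. Keying by (sender, request_id) would also work, but only if the response path can recover the sender, which the `_process_response` excerpt above does not show.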

Expected behavior

  • The error should not occur, and the sender should receive a normal response.
  • If this usage pattern is not supported, it should be prevented, or a clear error message should be shown to alert the user.
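For the second expectation, a minimal fail-fast sketch, assuming the host keeps its current `pending[target][request_id]` layout: instead of silently overwriting, the insert refuses a key that is already in flight and raises a descriptive error. `register_pending` and `DuplicateRequestIdError` are hypothetical names, not part of AutoGen.

```python
import asyncio
from typing import Dict


class DuplicateRequestIdError(RuntimeError):
    """Raised when a (target, request_id) pair is already awaiting a response."""


def register_pending(
    pending: Dict[str, Dict[str, "asyncio.Future[object]"]],
    target_client_id: str,
    request_id: str,
    future: "asyncio.Future[object]",
) -> None:
    per_target = pending.setdefault(target_client_id, {})
    if request_id in per_target:
        # Surface the collision where it happens instead of as a later KeyError.
        raise DuplicateRequestIdError(
            f"request_id {request_id!r} is already pending for target {target_client_id!r}; "
            "another sender reused the same id"
        )
    per_target[request_id] = future


async def demo() -> str:
    loop = asyncio.get_running_loop()
    pending: Dict[str, Dict[str, "asyncio.Future[object]"]] = {}
    register_pending(pending, "runtime-1", "1", loop.create_future())
    try:
        register_pending(pending, "runtime-1", "1", loop.create_future())
    except DuplicateRequestIdError as exc:
        return str(exc)
    return "no error"


message = asyncio.run(demo())
print(message)
```

This only turns the silent overwrite into an immediate, explanatory error; the scenario in this report would still fail, just earlier and with a clearer message. Making it succeed requires keys that are unique per in-flight request.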


Additional context

  • When stepping through sequentially in the PyCharm debugger, the error did not occur in some cases.
  • It seems to reproduce only on macOS, not on Windows (not fully confirmed).

Which packages was the bug in?

Python Extensions (autogen-ext)

AutoGen library version.

Python 0.7.4

Other library version.

No response

Model used

No response

Model provider

None

Other model provider

No response

Python version

3.11

.NET version

None

Operating system

MacOS
