Description
What happened?
Describe the bug
_pending_responses key collision when multiple senders target the same runtime with identical request_id → KeyError('1')
When two different GrpcWorkerAgentRuntime clients send messages that ultimately target the same runtime, the host stores the response waiters in _pending_responses under target_client_id only, keyed by the request_id string.
Because each sender starts its local request counter from 1, both messages use request_id == "1". The second insert overwrites the first entry; later, when popping on response, the key is missing → KeyError('1').
This appears to be a design bug in how the host keys pending responses; it ignores the sender and assumes request_id is unique per target runtime.
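The collision can be illustrated without gRPC. The following is a minimal sketch, assuming each sender keeps a local counter that starts at 1 and the host stores waiters in a nested dict keyed by target client ID and then by request ID. `Sender` and `pending` are hypothetical stand-ins, not the library's classes.

```python
from itertools import count


class Sender:
    """Hypothetical stand-in for a worker runtime with a local request counter."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._counter = count(1)  # assumption: every sender starts counting at 1

    def next_request_id(self) -> str:
        return str(next(self._counter))


# Hypothetical stand-in for the host's _pending_responses mapping:
# target_client_id -> {request_id -> waiter}
pending: dict[str, dict[str, str]] = {}

runtime_1 = Sender("runtime-1")
runtime_2 = Sender("runtime-2")
target = "runtime-1-client-id"

for sender in (runtime_2, runtime_1):
    request_id = sender.next_request_id()
    # The sender is not part of the key, so the second write replaces the first.
    pending.setdefault(target, {})[request_id] = f"waiter for {sender.name}"

print(pending)
# → {'runtime-1-client-id': {'1': 'waiter for runtime-1'}}
```

Only one waiter survives even though two requests are in flight, which is the state the host ends up in before the KeyError.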
Exception in callback GrpcWorkerAgentRuntimeHostServicer._raise_on_exception(<Task finishe...KeyError('1')>)
handle: <Handle GrpcWorkerAgentRuntimeHostServicer._raise_on_exception(<Task finishe...KeyError('1')>)>
Traceback (most recent call last):
  File "/Users/user/miniconda3/envs/autogen_0.5.7/lib/python3.12/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/user/Documents/GitHub/autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py", line 183, in _raise_on_exception
    raise exception
  File "/Users/user/miniconda3/envs/autogen_0.5.7/lib/python3.12/asyncio/tasks.py", line 314, in __step_run_and_handle_result
    result = coro.send(None)
             ^^^^^^^^^^^^^^^
  File "/Users/user/Documents/GitHub/autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py", line 268, in _process_response
    future = self._pending_responses[client_id].pop(response.request_id)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: '1'
To Reproduce
Topology
- Host: GrpcWorkerAgentRuntimeHost on localhost:50051.
- Runtime-1: registers MyAgent1, MyAgent2 (both agents live on the same runtime).
- Runtime-2: no agents; used only to send the first message.
Sequence
- Runtime-2 → (send) → MyAgent1 (on Runtime-1).
  Host records pending[target_client_id(Runtime-1)]["1"] = future.
- Inside MyAgent1 (Runtime-1), it forwards a message to MyAgent2 (also Runtime-1).
  Host again records pending[target_client_id(Runtime-1)]["1"] = future → overwrites step 1.
- MyAgent2's response pops "1", which removes the overwritten entry. When MyAgent1's response to the first request arrives, the host tries to pop("1") again, but the key is gone → KeyError('1').
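The sequence can be traced with plain asyncio futures. This is a sketch under the assumption that the host behaves like a nested dict of futures, following the step-by-step walkthrough under "Execution results"; the `pending` table and client ID string are hypothetical stand-ins.

```python
import asyncio


async def main() -> tuple[bool, bool, str]:
    loop = asyncio.get_running_loop()
    # Hypothetical stand-in for the host's pending-response table.
    pending: dict[str, dict[str, asyncio.Future[str]]] = {}
    runtime_1 = "runtime-1-client-id"

    # Runtime-2 sends to MyAgent1 (hosted on Runtime-1) with request_id "1".
    first = loop.create_future()
    pending.setdefault(runtime_1, {})["1"] = first

    # MyAgent1 forwards to MyAgent2 (also Runtime-1), again with request_id "1".
    second = loop.create_future()
    pending.setdefault(runtime_1, {})["1"] = second  # replaces `first`

    # MyAgent2's response pops the only remaining entry.
    pending[runtime_1].pop("1").set_result("reply from MyAgent2")

    # MyAgent1's response looks up "1" again and finds nothing.
    try:
        pending[runtime_1].pop("1").set_result("reply from MyAgent1")
        error = ""
    except KeyError as exc:
        error = repr(exc)

    return first.done(), second.done(), error


first_done, second_done, error = asyncio.run(main())
print(first_done, second_done, error)
# → False True KeyError('1')
```

In this model the future belonging to Runtime-2's request is never resolved, so the original sender would keep waiting in addition to the host logging the KeyError.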
Concrete reproduction process
- messages.py
from dataclasses import dataclass


@dataclass
class MyMessage:
    content: str
- autogen_host.py
- Please run the file separately first.
import asyncio

from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost


async def main():
    host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
    host.start()  # Start a host service in the background.
    await asyncio.Event().wait()


if __name__ == '__main__':
    asyncio.run(main())
- autogen_runtime_with_agent.py
- Please run the file separately first.
- Runtime-1: registers MyAgent1, MyAgent2 (both agents live on the same runtime).
import asyncio

from autogen_core import MessageContext, RoutedAgent, message_handler, AgentId, rpc, try_get_known_serializers_for_type
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime

from messages import MyMessage


class MyAgent1(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__("My agent1")
        self._name = name
        self._counter = 0

    @rpc
    async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
        content = f"{self._name} received message : {message.content}"
        my_agent2_result: MyMessage = await self.send_message(MyMessage(content=message.content), AgentId("MyAgent2", "default"))
        content += f"\n{self._name} received from MyAgent2 : {my_agent2_result.content}"
        return MyMessage(content=content)


class MyAgent2(RoutedAgent):
    def __init__(self, name: str) -> None:
        super().__init__("My agent2")
        self._name = name

    @rpc
    async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
        content = f"{self._name} received message : {message.content}"
        return MyMessage(content=content)


async def main():
    runtime1 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await runtime1.start()
    runtime1.add_message_serializer(try_get_known_serializers_for_type(MyMessage))
    await MyAgent1.register(runtime1, "MyAgent1", lambda: MyAgent1("my_agent1"))
    await MyAgent2.register(runtime1, "MyAgent2", lambda: MyAgent2("my_agent2"))
    await asyncio.Event().wait()


if __name__ == '__main__':
    asyncio.run(main())
- send_message.py
import asyncio

from autogen_core import try_get_known_serializers_for_type, AgentId
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime

from messages import MyMessage


async def main():
    runtime2 = GrpcWorkerAgentRuntime(host_address="localhost:50051")
    await runtime2.start()
    runtime2.add_message_serializer(try_get_known_serializers_for_type(MyMessage))
    result: MyMessage = await runtime2.send_message(MyMessage(content="work2 message"), AgentId("MyAgent1", "default"))
    print(result.content)
    await runtime2.stop()


if __name__ == '__main__':
    asyncio.run(main())
Execution results
- send message from runtime2
result : MyMessage = await runtime2.send_message(MyMessage(content="work2 message"), AgentId("MyAgent1", "default"))
- Register target_client_id and request_id in GrpcWorkerAgentRuntimeHostServicer._pending_responses.
target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1
autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py
Lines 246 to 250 in 3107855
future = asyncio.get_event_loop().create_future()
self._pending_responses.setdefault(target_client_id, {})[request.request_id] = future
# Create a task to wait for the response and send it back to the client.
send_response_task = asyncio.create_task(self._wait_and_send_response(future, client_id))
- Handle messages received in MyAgent1 and send them to MyAgent2
@rpc
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
    content = f"{self._name} received message : {message.content}"
    my_agent2_result: MyMessage = await self.send_message(MyMessage(content=message.content), AgentId("MyAgent2", "default"))
    content += f"\n{self._name} received from MyAgent2 : {my_agent2_result.content}"
    return MyMessage(content=content)
- Register target_client_id and request_id in GrpcWorkerAgentRuntimeHostServicer._pending_responses.
- Although it is a newly created message, it has the same target_client_id and request_id as before.
- Overwriting occurs in self._pending_responses.
target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1
autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py
Lines 246 to 250 in 3107855
future = asyncio.get_event_loop().create_future()
self._pending_responses.setdefault(target_client_id, {})[request.request_id] = future
# Create a task to wait for the response and send it back to the client.
send_response_task = asyncio.create_task(self._wait_and_send_response(future, client_id))
- Handle messages received in MyAgent2 and return response
@rpc
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
    content = f"{self._name} received message : {message.content}"
    return MyMessage(content=content)
- Remove the values corresponding to client_id and request_id from GrpcWorkerAgentRuntimeHostServicer._pending_responses.
target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1
autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py
Lines 266 to 269 in 3107855
async def _process_response(self, response: agent_worker_pb2.RpcResponse, client_id: ClientConnectionId) -> None:
    # Setting the result of the future will send the response back to the original sender.
    future = self._pending_responses[client_id].pop(response.request_id)
    future.set_result(response)
- MyAgent1 sends the response received from MyAgent2 as a response.
@rpc
async def my_message_handler(self, message: MyMessage, ctx: MessageContext) -> MyMessage:
    content = f"{self._name} received message : {message.content}"
    my_agent2_result: MyMessage = await self.send_message(MyMessage(content=message.content), AgentId("MyAgent2", "default"))
    content += f"\n{self._name} received from MyAgent2 : {my_agent2_result.content}"
    return MyMessage(content=content)
- Remove the values corresponding to client_id and request_id from GrpcWorkerAgentRuntimeHostServicer._pending_responses.
target_client_id : 389797f4-200a-4bc5-af3b-92f71868e4b2
request_id : 1
- However, since the values have already been removed, a key error occurs.
autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py
Lines 266 to 269 in 3107855
async def _process_response(self, response: agent_worker_pb2.RpcResponse, client_id: ClientConnectionId) -> None:
    # Setting the result of the future will send the response back to the original sender.
    future = self._pending_responses[client_id].pop(response.request_id)
    future.set_result(response)
  File "/Users/user/Documents/GitHub/autogen/python/packages/autogen-ext/src/autogen_ext/runtimes/grpc/_worker_runtime_host_servicer.py", line 270, in _process_response
    future = self._pending_responses[client_id].pop(response.request_id)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: '1'
Expected behavior
- The error should not occur, and a normal response should be received.
- If the usage is not intended, it should be prevented or error information should be displayed to alert the user.
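One possible direction, offered only as a sketch and not as the project's actual fix: the host could replace each incoming request_id with a host-generated correlation ID before forwarding, remember the original sender and ID, and restore them when the response comes back. `PendingTable`, `register`, and `resolve` are hypothetical names introduced for illustration; they do not exist in autogen-ext.

```python
import asyncio
import uuid
from dataclasses import dataclass, field


@dataclass
class PendingTable:
    """Hypothetical host-side table keyed by a host-generated correlation ID."""

    _entries: dict[str, tuple[str, str, asyncio.Future]] = field(default_factory=dict)

    def register(self, sender_id: str, request_id: str, future: asyncio.Future) -> str:
        # The host-unique ID would replace the sender-local request_id on the forwarded request.
        correlation_id = uuid.uuid4().hex
        self._entries[correlation_id] = (sender_id, request_id, future)
        return correlation_id

    def resolve(self, correlation_id: str, payload: str) -> tuple[str, str]:
        try:
            sender_id, request_id, future = self._entries.pop(correlation_id)
        except KeyError:
            # Surface a descriptive error instead of a bare KeyError.
            raise LookupError(f"No pending request for correlation ID {correlation_id!r}") from None
        future.set_result(payload)
        # The caller would restore the original request_id before replying to the sender.
        return sender_id, request_id


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    table = PendingTable()
    first, second = loop.create_future(), loop.create_future()
    # Both senders use request_id "1", yet the entries no longer collide.
    id_a = table.register("runtime-2", "1", first)
    id_b = table.register("runtime-1", "1", second)
    table.resolve(id_b, "reply from MyAgent2")
    table.resolve(id_a, "reply from MyAgent1")
    return [await first, await second]


results = asyncio.run(main())
print(results)
# → ['reply from MyAgent1', 'reply from MyAgent2']
```

Keying by a host-generated ID avoids relying on sender-local counters being unique, and the explicit LookupError covers the second expectation above by reporting an unknown ID with a readable message.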
Additional context
- When stepping through the code with the PyCharm debugger, the error did not occur in some cases.
- It seems to reproduce only on Mac, not on Windows (I'm not entirely sure).
Which packages was the bug in?
Python Extensions (autogen-ext)
AutoGen library version.
Python 0.7.4
Other library version.
No response
Model used
No response
Model provider
None
Other model provider
No response
Python version
3.11
.NET version
None
Operating system
MacOS