Skip to content

Commit a5786a1

Browse files
authored
fix(server): Improve event consumer error handling (#282)
# Description Enhances the event consumer to gracefully handle `pydantic.ValidationError` by logging the error and continuing execution instead of crashing. This improves the reliability of the event stream processing. **Changes:** - `src/a2a/server/events/event_consumer.py`: Added specific `except` block for `ValidationError`. - `tests/server/events/test_event_consumer.py`: Added a test case to verify the new error handling logic.
1 parent ecb321a commit a5786a1

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
from collections.abc import AsyncGenerator
66

7+
from pydantic import ValidationError
8+
79
from a2a.server.events.event_queue import Event, EventQueue
810
from a2a.types import (
911
InternalError,
@@ -138,6 +140,9 @@ async def consume_all(self) -> AsyncGenerator[Event]:
138140
# python 3.12 and get a queue empty error on an open queue
139141
if self.queue.is_closed():
140142
break
143+
except ValidationError as e:
144+
logger.error(f"Invalid event format received: {e}")
145+
continue
141146
except Exception as e:
142147
logger.error(
143148
f'Stopping event consumption due to exception: {e}'

tests/server/events/test_event_consumer.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import asyncio
2-
32
from typing import Any
43
from unittest.mock import AsyncMock, MagicMock, patch
54

65
import pytest
6+
from pydantic import ValidationError
77

88
from a2a.server.events.event_consumer import EventConsumer, QueueClosed
99
from a2a.server.events.event_queue import EventQueue
@@ -343,3 +343,24 @@ def test_agent_task_callback_no_exception(event_consumer: EventConsumer):
343343

344344
assert event_consumer._exception is None # Should remain None
345345
mock_task.exception.assert_called_once()
346+
347+
348+
@pytest.mark.asyncio
349+
async def test_consume_all_handles_validation_error(
350+
event_consumer: EventConsumer, mock_event_queue: AsyncMock
351+
):
352+
"""Test that consume_all gracefully handles a pydantic.ValidationError."""
353+
# Simulate dequeue_event raising a ValidationError
354+
mock_event_queue.dequeue_event.side_effect = [
355+
ValidationError.from_exception_data(title="Test Error", line_errors=[]),
356+
asyncio.CancelledError # To stop the loop for the test
357+
]
358+
359+
with patch("a2a.server.events.event_consumer.logger.error") as logger_error_mock:
360+
with pytest.raises(asyncio.CancelledError):
361+
async for _ in event_consumer.consume_all():
362+
pass
363+
364+
# Check that the specific error was logged and the consumer continued
365+
logger_error_mock.assert_called_once()
366+
assert "Invalid event format received" in logger_error_mock.call_args[0][0]

0 commit comments

Comments
 (0)