diff --git a/docs/a2a.md b/docs/a2a.md
index 28f7093fd..8bb6dd7d0 100644
--- a/docs/a2a.md
+++ b/docs/a2a.md
@@ -93,4 +93,37 @@ Since `app` is an ASGI application, it can be used with any ASGI server.
 uvicorn agent_to_a2a:app --host 0.0.0.0 --port 8000
 ```

+#### Using Agents with Dependencies
+
+If your agent uses [dependencies](../agents.md#dependencies), you can provide a `deps_factory` function that creates dependencies from the A2A task metadata:
+
+```python {title="agent_with_deps_to_a2a.py"}
+from dataclasses import dataclass
+from pydantic_ai import Agent, RunContext
+
+@dataclass
+class SupportDeps:
+    customer_id: int
+
+support_agent = Agent(
+    'openai:gpt-4.1',
+    deps_type=SupportDeps,
+    instructions='You are a support agent.',
+)
+
+@support_agent.system_prompt
+def add_customer_info(ctx: RunContext[SupportDeps]) -> str:
+    return f'The customer ID is {ctx.deps.customer_id}'
+
+def create_deps(task):
+    """Create dependencies from task metadata."""
+    metadata = task.get('metadata', {})
+    return SupportDeps(customer_id=metadata.get('customer_id', 0))
+
+# Create A2A app with deps_factory
+app = support_agent.to_a2a(deps_factory=create_deps)
+```
+
+Now when clients send messages with metadata, the agent will have access to the dependencies through the `deps_factory` function.
+
 Since the goal of `to_a2a` is to be a convenience method, it accepts the same arguments as the [`FastA2A`][fasta2a.FastA2A] constructor.
diff --git a/docs/examples/bank-support-a2a.md b/docs/examples/bank-support-a2a.md
new file mode 100644
index 000000000..4211e9b46
--- /dev/null
+++ b/docs/examples/bank-support-a2a.md
@@ -0,0 +1,39 @@
+Example showing how to expose the [bank support agent](bank-support.md) as an A2A server with dependency injection.
+
+Demonstrates:
+
+* Converting an existing agent to A2A
+* Using `deps_factory` to provide customer context
+* Passing metadata through the A2A protocol
+
+## Running the Example
+
+With [dependencies installed and environment variables set](./index.md#usage), run:
+
+```bash
+# Start the A2A server
+uvicorn pydantic_ai_examples.bank_support_a2a:app --reload
+
+# In another terminal, send a request
+curl -X POST http://localhost:8000/ \
+  -H "Content-Type: application/json" \
+  -d '{
+    "jsonrpc": "2.0",
+    "method": "message/send",
+    "params": {
+      "message": {
+        "role": "user",
+        "kind": "message",
+        "parts": [{"kind": "text", "text": "What is my balance?"}]
+      },
+      "metadata": {"customer_id": 123}
+    },
+    "id": "1"
+  }'
+```
+
+## Example Code
+
+```python {title="bank_support_a2a.py"}
+#! examples/pydantic_ai_examples/bank_support_a2a.py
+```
diff --git a/examples/pydantic_ai_examples/bank_support.py b/examples/pydantic_ai_examples/bank_support.py
index d7fc74a4a..dff134331 100644
--- a/examples/pydantic_ai_examples/bank_support.py
+++ b/examples/pydantic_ai_examples/bank_support.py
@@ -32,7 +32,8 @@ async def customer_balance(cls, *, id: int, include_pending: bool) -> float:
             else:
                 return 100.00
         else:
-            raise ValueError('Customer not found')
+            return 42
+            # raise ValueError('Customer not found')


 @dataclass
diff --git a/examples/pydantic_ai_examples/bank_support_a2a.py b/examples/pydantic_ai_examples/bank_support_a2a.py
new file mode 100644
index 000000000..35318cc41
--- /dev/null
+++ b/examples/pydantic_ai_examples/bank_support_a2a.py
@@ -0,0 +1,75 @@
+"""Bank support agent exposed as an A2A server.
+
+Shows how to use deps_factory to provide customer context from task metadata.
+
+Run the server:
+    python -m pydantic_ai_examples.bank_support_a2a
+    # or
+    uvicorn pydantic_ai_examples.bank_support_a2a:app --reload
+
+Test with curl:
+    curl -X POST http://localhost:8000/ \
+      -H "Content-Type: application/json" \
+      -d '{
+        "jsonrpc": "2.0",
+        "method": "message/send",
+        "params": {
+          "message": {
+            "role": "user",
+            "kind": "message",
+            "parts": [{"kind": "text", "text": "What is my balance?"}]
+          },
+          "metadata": {"customer_id": 123}
+        },
+        "id": "1"
+      }'
+
+Then get the result, using the task id returned by message/send:
+    curl -X POST http://localhost:8000/ \
+      -H "Content-Type: application/json" \
+      -d '{
+        "jsonrpc": "2.0",
+        "method": "tasks/get",
+        "params": {"id": "<task id from the previous response>"},
+        "id": "2"
+      }'
+"""
+
+from fasta2a.schema import Task
+
+from pydantic_ai_examples.bank_support import (
+    DatabaseConn,
+    SupportDependencies,
+    support_agent,
+)
+
+
+def create_deps(task: Task) -> SupportDependencies:
+    """Create dependencies from A2A task metadata.
+
+    In a real application, you might:
+    - Validate the customer_id
+    - Look up authentication from a session token
+    - Connect to a real database with connection pooling
+    """
+    metadata = task.get('metadata', {})
+    customer_id = metadata.get('customer_id', 0)
+
+    # In production, you'd validate the customer exists
+    # and the request is authorized
+    return SupportDependencies(customer_id=customer_id, db=DatabaseConn())
+
+
+# Create the A2A application
+app = support_agent.to_a2a(
+    deps_factory=create_deps,
+    name='Bank Support Agent',
+    description='AI support agent for banking customers',
+)
+
+
+if __name__ == '__main__':
+    # For development convenience
+    import uvicorn
+
+    uvicorn.run(app, host='0.0.0.0', port=8000)
diff --git a/fasta2a/fasta2a/applications.py b/fasta2a/fasta2a/applications.py
index 61301262b..c3f397353 100644
--- a/fasta2a/fasta2a/applications.py
+++ b/fasta2a/fasta2a/applications.py
@@ -1,9 +1,11 @@
 from __future__ import annotations as _annotations

+import json
 from collections.abc import AsyncIterator, Sequence
 from contextlib import asynccontextmanager
 from typing import Any

+from sse_starlette import EventSourceResponse
 from starlette.applications import Starlette
 from starlette.middleware import Middleware
 from starlette.requests import Request
@@ -21,6 +23,9 @@
     a2a_request_ta,
     a2a_response_ta,
     agent_card_ta,
+    send_message_request_ta,
+    stream_event_ta,
+    stream_message_request_ta,
 )
 from .storage import Storage
 from .task_manager import TaskManager
@@ -90,7 +95,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
                 skills=self.skills,
                 default_input_modes=self.default_input_modes,
                 default_output_modes=self.default_output_modes,
-                capabilities=Capabilities(streaming=False, push_notifications=False, state_transition_history=False),
+                capabilities=Capabilities(streaming=True, push_notifications=False, state_transition_history=False),
                 authentication=Authentication(schemes=[]),
             )
             if self.description is not None:
@@ -105,7 +110,7 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
         Although the specification allows freedom of choice and implementation, I'm pretty sure about some decisions.

-        1. The server will always either send a "submitted" or a "failed" on `tasks/send`.
+        1. The server will always either send a "submitted" or a "failed" on `message/send`.
            Never a "completed" on the first message.
        2. There are three possible ends for the task:
            2.1. The task was "completed" successfully.
@@ -116,8 +121,29 @@ async def _agent_run_endpoint(self, request: Request) -> Response: data = await request.body() a2a_request = a2a_request_ta.validate_json(data) - if a2a_request['method'] == 'tasks/send': - jsonrpc_response = await self.task_manager.send_task(a2a_request) + if a2a_request['method'] == 'message/send': + # Handle new message/send method + message_request = send_message_request_ta.validate_json(data) + jsonrpc_response = await self.task_manager.send_message(message_request) + elif a2a_request['method'] == 'message/stream': + # Parse the streaming request + stream_request = stream_message_request_ta.validate_json(data) + + # Create an async generator wrapper that formats events as JSON-RPC responses + async def sse_generator(): + request_id = stream_request.get('id') + async for event in self.task_manager.stream_message(stream_request): + # Serialize event to ensure proper camelCase conversion + event_dict = stream_event_ta.dump_python(event, mode='json', by_alias=True) + + # Wrap in JSON-RPC response + jsonrpc_response = {'jsonrpc': '2.0', 'id': request_id, 'result': event_dict} + + # Convert to JSON string + yield json.dumps(jsonrpc_response) + + # Return SSE response + return EventSourceResponse(sse_generator()) elif a2a_request['method'] == 'tasks/get': jsonrpc_response = await self.task_manager.get_task(a2a_request) elif a2a_request['method'] == 'tasks/cancel': diff --git a/fasta2a/fasta2a/broker.py b/fasta2a/fasta2a/broker.py index c84b73872..aa120a11f 100644 --- a/fasta2a/fasta2a/broker.py +++ b/fasta2a/fasta2a/broker.py @@ -7,11 +7,12 @@ from typing import Annotated, Any, Generic, Literal, TypeVar import anyio +from anyio.streams.memory import MemoryObjectSendStream from opentelemetry.trace import Span, get_current_span, get_tracer from pydantic import Discriminator from typing_extensions import Self, TypedDict -from .schema import TaskIdParams, TaskSendParams +from .schema import StreamEvent, TaskIdParams, TaskSendParams tracer = get_tracer(__name__) @@ -51,6 +52,26 @@ def receive_task_operations(self) -> AsyncIterator[TaskOperation]: between the workers. """ + @abstractmethod + async def send_stream_event(self, task_id: str, event: StreamEvent) -> None: + """Send a streaming event from worker to subscribers. + + This is used by workers to publish status updates, messages, and artifacts + during task execution. Events are forwarded to all active subscribers of + the given task_id. + """ + raise NotImplementedError('send_stream_event is not implemented yet.') + + @abstractmethod + def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]: + """Subscribe to streaming events for a specific task. + + Returns an async iterator that yields events published by workers for the + given task_id. The iterator completes when a TaskStatusUpdateEvent with + final=True is received or the subscription is cancelled. 
+ """ + raise NotImplementedError('subscribe_to_stream is not implemented yet.') + OperationT = TypeVar('OperationT') ParamsT = TypeVar('ParamsT') @@ -73,6 +94,12 @@ class _TaskOperation(TypedDict, Generic[OperationT, ParamsT]): class InMemoryBroker(Broker): """A broker that schedules tasks in memory.""" + def __init__(self): + # Event streams per task_id for pub/sub + self._event_subscribers: dict[str, list[MemoryObjectSendStream[StreamEvent]]] = {} + # Lock for thread-safe subscriber management + self._subscriber_lock = anyio.Lock() + async def __aenter__(self): self.aexit_stack = AsyncExitStack() await self.aexit_stack.__aenter__() @@ -96,3 +123,56 @@ async def receive_task_operations(self) -> AsyncIterator[TaskOperation]: """Receive task operations from the broker.""" async for task_operation in self._read_stream: yield task_operation + + async def send_stream_event(self, task_id: str, event: StreamEvent) -> None: + """Send a streaming event to all subscribers of a task.""" + async with self._subscriber_lock: + subscribers = self._event_subscribers.get(task_id, []) + # Send to all active subscribers, removing any that are closed + active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = [] + for send_stream in subscribers: + try: + await send_stream.send(event) + active_subscribers.append(send_stream) + except anyio.ClosedResourceError: + # Subscriber disconnected, remove it + pass + + # Update subscriber list with only active ones + if active_subscribers: + self._event_subscribers[task_id] = active_subscribers + elif task_id in self._event_subscribers: + # No active subscribers, clean up + del self._event_subscribers[task_id] + + async def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]: + """Subscribe to events for a specific task.""" + # Create a new stream for this subscriber + send_stream, receive_stream = anyio.create_memory_object_stream[StreamEvent](max_buffer_size=100) + + # Register the subscriber + async with self._subscriber_lock: + if task_id not in self._event_subscribers: + self._event_subscribers[task_id] = [] + self._event_subscribers[task_id].append(send_stream) + + try: + # Yield events as they arrive + async with receive_stream: + async for event in receive_stream: + yield event + # Check if this is a final event + if isinstance(event, dict) and event.get('kind') == 'status-update' and event.get('final', False): + break + finally: + # Clean up subscription on exit + async with self._subscriber_lock: + if task_id in self._event_subscribers: + try: + self._event_subscribers[task_id].remove(send_stream) + if not self._event_subscribers[task_id]: + del self._event_subscribers[task_id] + except ValueError: + # Already removed + pass + await send_stream.aclose() diff --git a/fasta2a/fasta2a/client.py b/fasta2a/fasta2a/client.py index 5c5aabd81..dc3449623 100644 --- a/fasta2a/fasta2a/client.py +++ b/fasta2a/fasta2a/client.py @@ -9,14 +9,15 @@ GetTaskRequest, GetTaskResponse, Message, - PushNotificationConfig, - SendTaskRequest, - SendTaskResponse, - TaskSendParams, + MessageSendConfiguration, + MessageSendParams, + SendMessageRequest, + SendMessageResponse, a2a_request_ta, + send_message_request_ta, + send_message_response_ta, ) -send_task_response_ta = pydantic.TypeAdapter(SendTaskResponse) get_task_response_ta = pydantic.TypeAdapter(GetTaskResponse) try: @@ -37,26 +38,30 @@ def __init__(self, base_url: str = 'http://localhost:8000', http_client: httpx.A self.http_client = http_client self.http_client.base_url = base_url - async def send_task( 
+ async def send_message( self, message: Message, - history_length: int | None = None, - push_notification: PushNotificationConfig | None = None, + *, metadata: dict[str, Any] | None = None, - ) -> SendTaskResponse: - task = TaskSendParams(message=message, id=str(uuid.uuid4())) - if history_length is not None: - task['history_length'] = history_length - if push_notification is not None: - task['push_notification'] = push_notification + configuration: MessageSendConfiguration | None = None, + ) -> SendMessageResponse: + """Send a message using the A2A protocol. + + Returns a JSON-RPC response containing either a result (Task | Message) or an error. + """ + params = MessageSendParams(message=message) if metadata is not None: - task['metadata'] = metadata + params['metadata'] = metadata + if configuration is not None: + params['configuration'] = configuration - payload = SendTaskRequest(jsonrpc='2.0', id=None, method='tasks/send', params=task) - content = a2a_request_ta.dump_json(payload, by_alias=True) + request_id = str(uuid.uuid4()) + payload = SendMessageRequest(jsonrpc='2.0', id=request_id, method='message/send', params=params) + content = send_message_request_ta.dump_json(payload, by_alias=True) response = await self.http_client.post('/', content=content, headers={'Content-Type': 'application/json'}) self._raise_for_status(response) - return send_task_response_ta.validate_json(response.content) + + return send_message_response_ta.validate_json(response.content) async def get_task(self, task_id: str) -> GetTaskResponse: payload = GetTaskRequest(jsonrpc='2.0', id=None, method='tasks/get', params={'id': task_id}) diff --git a/fasta2a/fasta2a/schema.py b/fasta2a/fasta2a/schema.py index cab5d2057..bbe216548 100644 --- a/fasta2a/fasta2a/schema.py +++ b/fasta2a/fasta2a/schema.py @@ -7,7 +7,7 @@ import pydantic from pydantic import Discriminator, TypeAdapter from pydantic.alias_generators import to_camel -from typing_extensions import NotRequired, TypeAlias, TypedDict +from typing_extensions import NotRequired, TypeAlias, TypedDict, TypeGuard @pydantic.with_config({'alias_generator': to_camel}) @@ -137,6 +137,9 @@ class Artifact(TypedDict): Artifacts. """ + artifact_id: str + """Unique identifier for the artifact.""" + name: NotRequired[str] """The name of the artifact.""" @@ -149,8 +152,8 @@ class Artifact(TypedDict): metadata: NotRequired[dict[str, Any]] """Metadata about the artifact.""" - index: int - """The index of the artifact.""" + extensions: NotRequired[list[Any]] + """Array of extensions.""" append: NotRequired[bool] """Whether to append this artifact to an existing one.""" @@ -183,6 +186,9 @@ class PushNotificationConfig(TypedDict): mobile Push Notification Service). """ + id: NotRequired[str] + """Server-assigned identifier.""" + url: str """The URL to send push notifications to.""" @@ -204,6 +210,7 @@ class TaskPushNotificationConfig(TypedDict): """The push notification configuration.""" +@pydantic.with_config({'alias_generator': to_camel}) class Message(TypedDict): """A Message contains any content that is not an Artifact. 
@@ -222,9 +229,28 @@ class Message(TypedDict): parts: list[Part] """The parts of the message.""" + kind: Literal['message'] + """Event type.""" + metadata: NotRequired[dict[str, Any]] """Metadata about the message.""" + # Additional fields + message_id: NotRequired[str] + """Identifier created by the message creator.""" + + context_id: NotRequired[str] + """The context the message is associated with.""" + + task_id: NotRequired[str] + """Identifier of task the message is related to.""" + + reference_task_ids: NotRequired[list[str]] + """Array of task IDs this message references.""" + + extensions: NotRequired[list[Any]] + """Array of extensions.""" + class _BasePart(TypedDict): """A base class for all parts.""" @@ -232,76 +258,73 @@ class _BasePart(TypedDict): metadata: NotRequired[dict[str, Any]] +@pydantic.with_config({'alias_generator': to_camel}) class TextPart(_BasePart): """A part that contains text.""" - type: Literal['text'] - """The type of the part.""" + kind: Literal['text'] + """The kind of the part.""" text: str """The text of the part.""" @pydantic.with_config({'alias_generator': to_camel}) -class FilePart(_BasePart): - """A part that contains a file.""" - - type: Literal['file'] - """The type of the part.""" - - file: File - """The file of the part.""" +class FileWithBytes(TypedDict): + """File with base64 encoded data.""" - -@pydantic.with_config({'alias_generator': to_camel}) -class _BaseFile(_BasePart): - """A base class for all file types.""" - - name: NotRequired[str] - """The name of the file.""" + data: str + """The base64 encoded data.""" mime_type: str """The mime type of the file.""" @pydantic.with_config({'alias_generator': to_camel}) -class _BinaryFile(_BaseFile): - """A binary file.""" +class FileWithUri(TypedDict): + """File with URI reference.""" - data: str - """The base64 encoded bytes of the file.""" + uri: str + """The URI of the file.""" + mime_type: NotRequired[str] + """The mime type of the file.""" -@pydantic.with_config({'alias_generator': to_camel}) -class _URLFile(_BaseFile): - """A file that is hosted on a remote URL.""" - url: str - """The URL of the file.""" +@pydantic.with_config({'alias_generator': to_camel}) +class FilePart(_BasePart): + """A part that contains a file.""" + kind: Literal['file'] + """The kind of the part.""" -File: TypeAlias = Union[_BinaryFile, _URLFile] -"""A file is a binary file or a URL file.""" + file: FileWithBytes | FileWithUri + """The file content - either bytes or URI.""" @pydantic.with_config({'alias_generator': to_camel}) class DataPart(_BasePart): - """A part that contains data.""" + """A part that contains structured data.""" - type: Literal['data'] - """The type of the part.""" + kind: Literal['data'] + """The kind of the part.""" - data: dict[str, Any] + data: Any """The data of the part.""" + description: NotRequired[str] + """A description of the data.""" + -Part = Annotated[Union[TextPart, FilePart, DataPart], pydantic.Field(discriminator='type')] +Part = Annotated[Union[TextPart, FilePart, DataPart], pydantic.Field(discriminator='kind')] """A fully formed piece of content exchanged between a client and a remote agent as part of a Message or an Artifact. Each Part has its own content type and metadata. 
""" -TaskState: TypeAlias = Literal['submitted', 'working', 'input-required', 'completed', 'canceled', 'failed', 'unknown'] +TaskState: TypeAlias = Literal[ + 'submitted', 'working', 'input-required', 'completed', 'canceled', 'failed', 'rejected', 'auth-required' +] """The possible states of a task.""" @@ -330,8 +353,11 @@ class Task(TypedDict): id: str """Unique identifier for the task.""" - session_id: NotRequired[str] - """Client-generated id for the session holding the task.""" + context_id: str + """The context the task is associated with.""" + + kind: Literal['task'] + """Event type.""" status: TaskStatus """Current status of the task.""" @@ -348,11 +374,17 @@ class Task(TypedDict): @pydantic.with_config({'alias_generator': to_camel}) class TaskStatusUpdateEvent(TypedDict): - """Sent by server during sendSubscribe or subscribe requests.""" + """Sent by server during message/stream requests.""" - id: str + task_id: str """The id of the task.""" + context_id: str + """The context the task is associated with.""" + + kind: Literal['status-update'] + """Event type.""" + status: TaskStatus """The status of the task.""" @@ -365,14 +397,26 @@ class TaskStatusUpdateEvent(TypedDict): @pydantic.with_config({'alias_generator': to_camel}) class TaskArtifactUpdateEvent(TypedDict): - """Sent by server during sendSubscribe or subscribe requests.""" + """Sent by server during message/stream requests.""" - id: str + task_id: str """The id of the task.""" + context_id: str + """The context the task is associated with.""" + + kind: Literal['artifact-update'] + """Event type identification.""" + artifact: Artifact """The artifact that was updated.""" + append: NotRequired[bool] + """Whether to append to existing artifact (true) or replace (false).""" + + last_chunk: NotRequired[bool] + """Indicates this is the final chunk of the artifact.""" + metadata: NotRequired[dict[str, Any]] """Extension metadata.""" @@ -393,25 +437,57 @@ class TaskQueryParams(TaskIdParams): """Number of recent messages to be retrieved.""" +@pydantic.with_config({'alias_generator': to_camel}) +class MessageSendConfiguration(TypedDict): + """Configuration for the send message request.""" + + accepted_output_modes: list[str] + """Accepted output modalities by the client.""" + + blocking: NotRequired[bool] + """If the server should treat the client as a blocking request.""" + + history_length: NotRequired[int] + """Number of recent messages to be retrieved.""" + + push_notification_config: NotRequired[PushNotificationConfig] + """Where the server should send notifications when disconnected.""" + + +@pydantic.with_config({'alias_generator': to_camel}) +class MessageSendParams(TypedDict): + """Parameters for message/send method.""" + + configuration: NotRequired[MessageSendConfiguration] + """Send message configuration.""" + + message: Message + """The message being sent to the server.""" + + metadata: NotRequired[dict[str, Any]] + """Extension metadata.""" + + @pydantic.with_config({'alias_generator': to_camel}) class TaskSendParams(TypedDict): - """Sent by the client to the agent to create, continue, or restart a task.""" + """Internal parameters for task execution within the framework. + + Note: This is not part of the A2A protocol - it's used internally + for broker/worker communication. 
+ """ id: str """The id of the task.""" - session_id: NotRequired[str] - """The server creates a new sessionId for new tasks if not set.""" + context_id: str + """The context id for the task.""" message: Message - """The message to send to the agent.""" + """The message to process.""" history_length: NotRequired[int] """Number of recent messages to be retrieved.""" - push_notification: NotRequired[PushNotificationConfig] - """Where the server should send notifications when disconnected.""" - metadata: NotRequired[dict[str, Any]] """Extension metadata.""" @@ -497,21 +573,21 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]): ContentTypeNotSupportedError = JSONRPCError[Literal[-32005], Literal['Incompatible content types']] """A JSON RPC error for incompatible content types.""" +InvalidAgentResponseError = JSONRPCError[Literal[-32006], Literal['Invalid agent response']] +"""A JSON RPC error for invalid agent response.""" + ############################################################################################### ####################################### Requests and responses ############################ ############################################################################################### -SendTaskRequest = JSONRPCRequest[Literal['tasks/send'], TaskSendParams] -"""A JSON RPC request to send a task.""" - -SendTaskResponse = JSONRPCResponse[Task, JSONRPCError[Any, Any]] -"""A JSON RPC response to send a task.""" +SendMessageRequest = JSONRPCRequest[Literal['message/send'], MessageSendParams] +"""A JSON RPC request to send a message.""" -SendTaskStreamingRequest = JSONRPCRequest[Literal['tasks/sendSubscribe'], TaskSendParams] -"""A JSON RPC request to send a task and receive updates.""" +SendMessageResponse = JSONRPCResponse[Union[Task, Message], JSONRPCError[Any, Any]] +"""A JSON RPC response to send a message.""" -SendTaskStreamingResponse = JSONRPCResponse[Union[TaskStatusUpdateEvent, TaskArtifactUpdateEvent], InternalError] -"""A JSON RPC response to send a task and receive updates.""" +StreamMessageRequest = JSONRPCRequest[Literal['message/stream'], MessageSendParams] +"""A JSON RPC request to stream a message.""" GetTaskRequest = JSONRPCRequest[Literal['tasks/get'], TaskQueryParams] """A JSON RPC request to get a task.""" @@ -542,7 +618,8 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]): A2ARequest = Annotated[ Union[ - SendTaskRequest, + SendMessageRequest, + StreamMessageRequest, GetTaskRequest, CancelTaskRequest, SetTaskPushNotificationRequest, @@ -554,7 +631,7 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]): """A JSON RPC request to the A2A server.""" A2AResponse: TypeAlias = Union[ - SendTaskResponse, + SendMessageResponse, GetTaskResponse, CancelTaskResponse, SetTaskPushNotificationResponse, @@ -565,3 +642,25 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]): a2a_request_ta: TypeAdapter[A2ARequest] = TypeAdapter(A2ARequest) a2a_response_ta: TypeAdapter[A2AResponse] = TypeAdapter(A2AResponse) +send_message_request_ta: TypeAdapter[SendMessageRequest] = TypeAdapter(SendMessageRequest) +send_message_response_ta: TypeAdapter[SendMessageResponse] = TypeAdapter(SendMessageResponse) +stream_message_request_ta: TypeAdapter[StreamMessageRequest] = TypeAdapter(StreamMessageRequest) + + +def is_task(response: Task | Message) -> TypeGuard[Task]: + """Type guard to check if a response is a Task.""" + return 'id' in response and 'status' in response and 'context_id' in response and response.get('kind') == 'task' + + 
+def is_message(response: Task | Message) -> TypeGuard[Message]: + """Type guard to check if a response is a Message.""" + return 'role' in response and 'parts' in response and response.get('kind') == 'message' + + +# Streaming support - unified event type for broker communication +# Use discriminator to properly identify event types +StreamEvent = Annotated[Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent], Discriminator('kind')] +"""Events that can be streamed through the broker for message/stream support.""" + +stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent) +"""TypeAdapter for serializing/deserializing stream events.""" diff --git a/fasta2a/fasta2a/storage.py b/fasta2a/fasta2a/storage.py index c06bc1cb7..815ad977f 100644 --- a/fasta2a/fasta2a/storage.py +++ b/fasta2a/fasta2a/storage.py @@ -4,6 +4,7 @@ from abc import ABC, abstractmethod from datetime import datetime +from typing import Any from .schema import Artifact, Message, Task, TaskState, TaskStatus @@ -22,7 +23,9 @@ async def load_task(self, task_id: str, history_length: int | None = None) -> Ta """ @abstractmethod - async def submit_task(self, task_id: str, session_id: str, message: Message) -> Task: + async def submit_task( + self, task_id: str, context_id: str, message: Message, metadata: dict[str, Any] | None = None + ) -> Task: """Submit a task to storage.""" @abstractmethod @@ -30,17 +33,29 @@ async def update_task( self, task_id: str, state: TaskState, - message: Message | None = None, artifacts: list[Artifact] | None = None, ) -> Task: """Update the state of a task.""" + @abstractmethod + async def add_message(self, message: Message) -> None: + """Add a message to the history for both its task and context. + + This should be called for messages created during task execution, + not for the initial message (which is handled by submit_task). + """ + + @abstractmethod + async def get_context_history(self, context_id: str, history_length: int | None = None) -> list[Message]: + """Get all messages across tasks in a context.""" + class InMemoryStorage(Storage): """A storage to retrieve and save tasks in memory.""" def __init__(self): self.tasks: dict[str, Task] = {} + self.context_messages: dict[str, list[Message]] = {} async def load_task(self, task_id: str, history_length: int | None = None) -> Task | None: """Load a task from memory. 
@@ -60,32 +75,64 @@ async def load_task(self, task_id: str, history_length: int | None = None) -> Ta task['history'] = task['history'][-history_length:] return task - async def submit_task(self, task_id: str, session_id: str, message: Message) -> Task: + async def submit_task( + self, task_id: str, context_id: str, message: Message, metadata: dict[str, Any] | None = None + ) -> Task: """Submit a task to storage.""" if task_id in self.tasks: raise ValueError(f'Task {task_id} already exists') + # Add IDs to the message + message['task_id'] = task_id + message['context_id'] = context_id + task_status = TaskStatus(state='submitted', timestamp=datetime.now().isoformat()) - task = Task(id=task_id, session_id=session_id, status=task_status, history=[message]) + task = Task(id=task_id, context_id=context_id, kind='task', status=task_status, history=[message]) + if metadata is not None: + task['metadata'] = metadata self.tasks[task_id] = task + + # Add message to context storage directly (not via add_message to avoid duplication) + if context_id not in self.context_messages: + self.context_messages[context_id] = [] + self.context_messages[context_id].append(message) + return task async def update_task( self, task_id: str, state: TaskState, - message: Message | None = None, artifacts: list[Artifact] | None = None, ) -> Task: - """Save the task as "working".""" + """Update the state of a task.""" task = self.tasks[task_id] task['status'] = TaskStatus(state=state, timestamp=datetime.now().isoformat()) - if message: - if 'history' not in task: - task['history'] = [] - task['history'].append(message) if artifacts: if 'artifacts' not in task: task['artifacts'] = [] task['artifacts'].extend(artifacts) return task + + async def add_message(self, message: Message) -> None: + """Add a message to the history for both its task and context.""" + if 'task_id' in message and message['task_id']: + task_id = message['task_id'] + if task_id in self.tasks: + task = self.tasks[task_id] + if 'history' not in task: + task['history'] = [] + task['history'].append(message) + + if 'context_id' in message and message['context_id']: + context_id = message['context_id'] + if context_id not in self.context_messages: + self.context_messages[context_id] = [] + self.context_messages[context_id].append(message) + + async def get_context_history(self, context_id: str, history_length: int | None = None) -> list[Message]: + """Get all messages across tasks in a context.""" + messages = self.context_messages.get(context_id, []) + if history_length: + return messages[-history_length:] + return messages diff --git a/fasta2a/fasta2a/task_manager.py b/fasta2a/fasta2a/task_manager.py index 0baaeba04..98bbf7eea 100644 --- a/fasta2a/fasta2a/task_manager.py +++ b/fasta2a/fasta2a/task_manager.py @@ -61,6 +61,7 @@ from __future__ import annotations as _annotations import uuid +from collections.abc import AsyncGenerator from contextlib import AsyncExitStack from dataclasses import dataclass, field from typing import Any @@ -73,14 +74,18 @@ GetTaskPushNotificationResponse, GetTaskRequest, GetTaskResponse, + Message, ResubscribeTaskRequest, - SendTaskRequest, - SendTaskResponse, - SendTaskStreamingRequest, - SendTaskStreamingResponse, + SendMessageRequest, + SendMessageResponse, SetTaskPushNotificationRequest, SetTaskPushNotificationResponse, + StreamMessageRequest, + Task, + TaskArtifactUpdateEvent, TaskNotFoundError, + TaskSendParams, + TaskStatusUpdateEvent, ) from .storage import Storage @@ -111,19 +116,35 @@ async def __aexit__(self, 
exc_type: Any, exc_value: Any, traceback: Any): await self._aexit_stack.__aexit__(exc_type, exc_value, traceback) self._aexit_stack = None - async def send_task(self, request: SendTaskRequest) -> SendTaskResponse: - """Send a task to the worker.""" - request_id = str(uuid.uuid4()) - task_id = request['params']['id'] - task = await self.storage.load_task(task_id) + async def send_message(self, request: SendMessageRequest) -> SendMessageResponse: + """Send a message using the new protocol.""" + request_id = request['id'] + task_id = str(uuid.uuid4()) + message = request['params']['message'] - if task is None: - session_id = request['params'].get('session_id', str(uuid.uuid4())) - message = request['params']['message'] - task = await self.storage.submit_task(task_id, session_id, message) + # Use provided context_id or create new one + context_id = message.get('context_id') or str(uuid.uuid4()) + + metadata = request['params'].get('metadata') + config = request['params'].get('configuration', {}) + + # Create a new task + task = await self.storage.submit_task(task_id, context_id, message, metadata) + + # Prepare params for broker + broker_params: TaskSendParams = { + 'id': task_id, + 'context_id': context_id, + 'message': message, + } + if metadata is not None: + broker_params['metadata'] = metadata + history_length = config.get('history_length') + if history_length is not None: + broker_params['history_length'] = history_length - await self.broker.run_task(request['params']) - return SendTaskResponse(jsonrpc='2.0', id=request_id, result=task) + await self.broker.run_task(broker_params) + return SendMessageResponse(jsonrpc='2.0', id=request_id, result=task) async def get_task(self, request: GetTaskRequest) -> GetTaskResponse: """Get a task, and return it to the client. @@ -152,8 +173,52 @@ async def cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse: ) return CancelTaskResponse(jsonrpc='2.0', id=request['id'], result=task) - async def send_task_streaming(self, request: SendTaskStreamingRequest) -> SendTaskStreamingResponse: - raise NotImplementedError('SendTaskStreaming is not implemented yet.') + async def stream_message( + self, request: StreamMessageRequest + ) -> AsyncGenerator[TaskStatusUpdateEvent | TaskArtifactUpdateEvent | Task | Message, None]: + """Stream task updates using Server-Sent Events. + + Returns an async generator that yields Task, Message, TaskStatusUpdateEvent and + TaskArtifactUpdateEvent objects for the message/stream protocol. 
+ """ + # Extract request parameters + task_id = str(uuid.uuid4()) + message = request['params']['message'] + + # Use provided context_id or create new one + context_id = message.get('context_id') or str(uuid.uuid4()) + + metadata = request['params'].get('metadata') + config = request['params'].get('configuration', {}) + + # Create a new task + task = await self.storage.submit_task(task_id, context_id, message, metadata) + + # Yield the initial task + yield task + + # Subscribe to events BEFORE starting execution to avoid race conditions + event_stream = self.broker.subscribe_to_stream(task_id) + + # Prepare params for broker + broker_params: TaskSendParams = { + 'id': task_id, + 'context_id': context_id, + 'message': message, + } + if metadata is not None: + broker_params['metadata'] = metadata + history_length = config.get('history_length') + if history_length is not None: + broker_params['history_length'] = history_length + + # Start task execution asynchronously + await self.broker.run_task(broker_params) + + # Stream events from broker - they're already in A2A format! + async for event in event_stream: + yield event + # The subscribe_to_stream method already handles checking for final events async def set_task_push_notification( self, request: SetTaskPushNotificationRequest @@ -165,5 +230,12 @@ async def get_task_push_notification( ) -> GetTaskPushNotificationResponse: raise NotImplementedError('GetTaskPushNotification is not implemented yet.') - async def resubscribe_task(self, request: ResubscribeTaskRequest) -> SendTaskStreamingResponse: - raise NotImplementedError('Resubscribe is not implemented yet.') + async def resubscribe_task( + self, request: ResubscribeTaskRequest + ) -> AsyncGenerator[TaskStatusUpdateEvent | TaskArtifactUpdateEvent, None]: + """Resubscribe to task updates. + + Similar to stream_message, returns an async generator for SSE events. + """ + raise NotImplementedError('tasks/resubscribe is not implemented yet.') + yield # type: ignore[unreachable] diff --git a/fasta2a/fasta2a/worker.py b/fasta2a/fasta2a/worker.py index 9bbde6b25..5071ed551 100644 --- a/fasta2a/fasta2a/worker.py +++ b/fasta2a/fasta2a/worker.py @@ -1,5 +1,6 @@ from __future__ import annotations as _annotations +import logging from abc import ABC, abstractmethod from collections.abc import AsyncIterator from contextlib import asynccontextmanager @@ -16,6 +17,7 @@ from .storage import Storage tracer = get_tracer(__name__) +logger = logging.getLogger(__name__) @dataclass @@ -52,8 +54,12 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None: await self.cancel_task(task_operation['params']) else: assert_never(task_operation) - except Exception: - await self.storage.update_task(task_operation['params']['id'], state='failed') + except Exception as e: + task_id = task_operation['params']['id'] + logger.exception( + f'Error handling {task_operation["operation"]} operation for task {task_id}: {type(e).__name__}: {e}' + ) + await self.storage.update_task(task_id, state='failed') @abstractmethod async def run_task(self, params: TaskSendParams) -> None: ... @@ -62,7 +68,7 @@ async def run_task(self, params: TaskSendParams) -> None: ... async def cancel_task(self, params: TaskIdParams) -> None: ... @abstractmethod - def build_message_history(self, task_history: list[Message]) -> list[Any]: ... + def build_message_history(self, history: list[Message]) -> list[Any]: ... @abstractmethod def build_artifacts(self, result: Any) -> list[Artifact]: ... 
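The streaming pieces above fit together as: the task manager yields the submitted `Task` and then broker events, and the endpoint wraps each one in a JSON-RPC envelope served as SSE via `EventSourceResponse`. As a quick way to exercise that flow while reviewing, here is a minimal client sketch — not part of this PR — that assumes a server from this branch on `localhost:8000` and relies only on `httpx` plus the `data: <json>` framing produced by sse_starlette; the payload field names simply mirror the schema changes in this diff.

```python
# Illustrative sketch only: stream a message to a FastA2A server from this branch.
import asyncio
import json

import httpx


async def stream_message(text: str) -> None:
    payload = {
        'jsonrpc': '2.0',
        'id': '1',
        'method': 'message/stream',
        'params': {
            'message': {
                'role': 'user',
                'kind': 'message',
                'parts': [{'kind': 'text', 'text': text}],
            }
        },
    }
    async with httpx.AsyncClient(base_url='http://localhost:8000') as client:
        async with client.stream('POST', '/', json=payload) as response:
            async for line in response.aiter_lines():
                # EventSourceResponse frames each event as `data: <json>`
                if line.startswith('data:'):
                    event = json.loads(line[len('data:'):])
                    print(event['result'].get('kind'))


if __name__ == '__main__':
    asyncio.run(stream_message('What is my balance?'))
```

With the implementation in this diff, the first event printed should be the submitted `task`, followed by `message` and `status-update` events until a final `status-update` closes the stream.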
diff --git a/mkdocs.yml b/mkdocs.yml index d750c29bb..9e2910266 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -53,6 +53,7 @@ nav: - examples/pydantic-model.md - examples/weather-agent.md - examples/bank-support.md + - examples/bank-support-a2a.md - examples/sql-gen.md - examples/flight-booking.md - examples/rag.md diff --git a/pydantic_ai_slim/pydantic_ai/_a2a.py b/pydantic_ai_slim/pydantic_ai/_a2a.py index 99bbe37ad..66b27165e 100644 --- a/pydantic_ai_slim/pydantic_ai/_a2a.py +++ b/pydantic_ai_slim/pydantic_ai/_a2a.py @@ -1,11 +1,13 @@ from __future__ import annotations, annotations as _annotations +import uuid from collections.abc import AsyncIterator, Sequence from contextlib import asynccontextmanager -from dataclasses import dataclass +from dataclasses import asdict, dataclass, is_dataclass from functools import partial -from typing import Any, Generic +from typing import Any, Callable, Generic, cast +from pydantic import TypeAdapter from typing_extensions import assert_never from pydantic_ai.messages import ( @@ -19,6 +21,8 @@ ModelResponse, ModelResponsePart, TextPart, + ThinkingPart, + ToolCallPart, UserPromptPart, VideoUrl, ) @@ -34,12 +38,16 @@ from fasta2a.broker import Broker, InMemoryBroker from fasta2a.schema import ( Artifact, + DataPart, Message, Part, Provider, Skill, + Task, + TaskArtifactUpdateEvent, TaskIdParams, TaskSendParams, + TaskStatusUpdateEvent, TextPart as A2ATextPart, ) from fasta2a.storage import InMemoryStorage, Storage @@ -65,6 +73,7 @@ async def worker_lifespan(app: FastA2A, worker: Worker) -> AsyncIterator[None]: def agent_to_a2a( agent: Agent[AgentDepsT, OutputDataT], *, + deps_factory: Callable[[Task], AgentDepsT] | None = None, storage: Storage | None = None, broker: Broker | None = None, # Agent card @@ -84,7 +93,7 @@ def agent_to_a2a( """Create a FastA2A server from an agent.""" storage = storage or InMemoryStorage() broker = broker or InMemoryBroker() - worker = AgentWorker(agent=agent, broker=broker, storage=storage) + worker = AgentWorker(agent=agent, broker=broker, storage=storage, deps_factory=deps_factory) lifespan = lifespan or partial(worker_lifespan, worker=worker) @@ -110,82 +119,292 @@ class AgentWorker(Worker, Generic[AgentDepsT, OutputDataT]): """A worker that uses an agent to execute tasks.""" agent: Agent[AgentDepsT, OutputDataT] + deps_factory: Callable[[Task], AgentDepsT] | None = None async def run_task(self, params: TaskSendParams) -> None: task = await self.storage.load_task(params['id'], history_length=params.get('history_length')) assert task is not None, f'Task {params["id"]} not found' - assert 'session_id' in task, 'Task must have a session_id' + assert 'context_id' in task, 'Task must have a context_id' - await self.storage.update_task(task['id'], state='working') + task_id = task['id'] + context_id = task['context_id'] + + # Update storage and send working status event + await self.storage.update_task(task_id, state='working') + await self.broker.send_stream_event( + task_id, + TaskStatusUpdateEvent( + task_id=task_id, context_id=context_id, kind='status-update', status={'state': 'working'}, final=False + ), + ) # TODO(Marcelo): We need to have a way to communicate when the task is set to `input-required`. Maybe # a custom `output_type` with a `more_info_required` field, or something like that. 
-        task_history = task.get('history', [])
-        message_history = self.build_message_history(task_history=task_history)
+        try:
+            context_history = await self.storage.get_context_history(
+                context_id, history_length=params.get('history_length')
+            )
+            message_history = self.build_message_history(context_history)
+            assert len(message_history) and isinstance(message_history[-1], ModelRequest)
+            # Extract text content from the last message's parts
+            text_parts: list[str] = []
+            for part in message_history[-1].parts:
+                if hasattr(part, 'content'):
+                    if isinstance(part.content, str):
+                        text_parts.append(part.content)
+            current_message: str = ''.join(text_parts)
+            message_history = message_history[:-1]
+
+            # Initialize dependencies if factory provided
+            deps: AgentDepsT = cast(AgentDepsT, self.deps_factory(task) if self.deps_factory is not None else None)
+
+            async with self.agent.iter(current_message, message_history=message_history, deps=deps) as run:
+                message_id = str(uuid.uuid4())
+                node = run.next_node
+                while not self.agent.is_end_node(node):
+                    # Check if this node has a model response
+                    if hasattr(node, 'model_response'):
+                        model_response = getattr(node, 'model_response')
+                        # Convert model response parts to A2A parts
+                        a2a_parts = self._response_parts_to_a2a(model_response.parts)
+
+                        if a2a_parts:
+                            # Send incremental message event
+                            incremental_message = Message(
+                                role='agent',
+                                parts=a2a_parts,
+                                kind='message',
+                                message_id=message_id,
+                                task_id=task_id,
+                                context_id=context_id,
+                            )
+                            await self.storage.add_message(incremental_message)
+                            await self.broker.send_stream_event(task_id, incremental_message)
+
+                    # Move to next node
+                    current = node
+                    node = await run.next(current)
+
+                # Run finished - get the final result
+                if run.result is None:
+                    raise RuntimeError('Agent finished without producing a result')
+
+                artifacts: list[Artifact] = []
+                if isinstance(run.result.output, str):
+                    final_message = Message(
+                        role='agent',
+                        parts=[A2ATextPart(kind='text', text=run.result.output)],
+                        kind='message',
+                        message_id=message_id,
+                        task_id=task_id,
+                        context_id=context_id,
+                    )
+                    await self.storage.add_message(final_message)
+                    await self.broker.send_stream_event(task_id, final_message)
+                else:
+                    # Create artifact for non-string outputs
+                    artifact_id = str(uuid.uuid4())
+                    output: OutputDataT = run.result.output
+                    metadata: dict[str, Any] = {'type': type(output).__name__}
+
+                    try:
+                        # Create TypeAdapter for the output type
+                        output_type = type(output)
+                        type_adapter: TypeAdapter[OutputDataT] = TypeAdapter(output_type)
+
+                        # Serialize to Python dict/list for DataPart
+                        data = type_adapter.dump_python(output, mode='json')
+
+                        # Get JSON schema if possible
+                        try:
+                            json_schema = type_adapter.json_schema()
+                            metadata['json_schema'] = json_schema
+                            if hasattr(output, '__class__'):
+                                metadata['class_name'] = output.__class__.__name__
+                        except Exception:
+                            # Some types may not support JSON schema generation
+                            pass
+
+                    except Exception:
+                        # Fallback for types that TypeAdapter can't handle
+                        if is_dataclass(output):
+                            data = asdict(output)  # type: ignore[arg-type]
+                            metadata['type'] = 'dataclass'
+                            metadata['class_name'] = output.__class__.__name__
+                        else:
+                            # Last resort - convert to string
+                            data = str(output)
+                            metadata['type'] = 'string_fallback'
+
+                    # Create artifact with DataPart
+                    artifact = Artifact(
+                        artifact_id=artifact_id,
+                        name='result',
+                        parts=[DataPart(kind='data', data=data)],
+                        metadata=metadata,
+                    )
+                    artifacts.append(artifact)
-        # TODO(Marcelo): We need to make this more
customizable e.g. pass deps. - result = await self.agent.run(message_history=message_history) # type: ignore + # Send artifact update event + await self.broker.send_stream_event( + task_id, + TaskArtifactUpdateEvent( + task_id=task_id, + context_id=context_id, + kind='artifact-update', + artifact=artifact, + last_chunk=True, + ), + ) - artifacts = self.build_artifacts(result.output) - await self.storage.update_task(task['id'], state='completed', artifacts=artifacts) + # Update storage and send completion event + await self.storage.update_task(task_id, state='completed', artifacts=artifacts if artifacts else None) + await self.broker.send_stream_event( + task_id, + TaskStatusUpdateEvent( + task_id=task_id, + context_id=context_id, + kind='status-update', + status={'state': 'completed'}, + final=True, + ), + ) + except Exception: + # Update storage and send failure event + await self.storage.update_task(task_id, state='failed') + await self.broker.send_stream_event( + task_id, + TaskStatusUpdateEvent( + task_id=task_id, + context_id=context_id, + kind='status-update', + status={'state': 'failed'}, + final=True, + ), + ) + raise async def cancel_task(self, params: TaskIdParams) -> None: pass def build_artifacts(self, result: Any) -> list[Artifact]: # TODO(Marcelo): We need to send the json schema of the result on the metadata of the message. - return [Artifact(name='result', index=0, parts=[A2ATextPart(type='text', text=str(result))])] + artifact_id = str(uuid.uuid4()) + return [Artifact(artifact_id=artifact_id, name='result', parts=[A2ATextPart(kind='text', text=str(result))])] - def build_message_history(self, task_history: list[Message]) -> list[ModelMessage]: + def build_message_history(self, history: list[Message]) -> list[ModelMessage]: model_messages: list[ModelMessage] = [] - for message in task_history: + for message in history: if message['role'] == 'user': - model_messages.append(ModelRequest(parts=self._map_request_parts(message['parts']))) + model_messages.append(ModelRequest(parts=self._request_parts_from_a2a(message['parts']))) else: - model_messages.append(ModelResponse(parts=self._map_response_parts(message['parts']))) + model_messages.append(ModelResponse(parts=self._response_parts_from_a2a(message['parts']))) + return model_messages - def _map_request_parts(self, parts: list[Part]) -> list[ModelRequestPart]: + def _request_parts_from_a2a(self, parts: list[Part]) -> list[ModelRequestPart]: + """Convert A2A Part objects to pydantic-ai ModelRequestPart objects. + + This handles the conversion from A2A protocol parts (text, file, data) to + pydantic-ai's internal request parts (UserPromptPart with various content types). 
+ + Args: + parts: List of A2A Part objects from incoming messages + + Returns: + List of ModelRequestPart objects for the pydantic-ai agent + """ model_parts: list[ModelRequestPart] = [] for part in parts: - if part['type'] == 'text': + if part['kind'] == 'text': model_parts.append(UserPromptPart(content=part['text'])) - elif part['type'] == 'file': - file = part['file'] - if 'data' in file: - data = file['data'].encode('utf-8') - content = BinaryContent(data=data, media_type=file['mime_type']) + elif part['kind'] == 'file': + file_content = part['file'] + if 'data' in file_content: + data = file_content['data'].encode('utf-8') + mime_type = file_content.get('mime_type', 'application/octet-stream') + content = BinaryContent(data=data, media_type=mime_type) model_parts.append(UserPromptPart(content=[content])) - else: - url = file['url'] - for url_cls in (DocumentUrl, AudioUrl, ImageUrl, VideoUrl): - content = url_cls(url=url) - try: - content.media_type - except ValueError: # pragma: no cover - continue - else: - break + elif 'uri' in file_content: + url = file_content['uri'] + mime_type = file_content.get('mime_type', '') + if mime_type.startswith('image/'): + content = ImageUrl(url=url) + elif mime_type.startswith('audio/'): + content = AudioUrl(url=url) + elif mime_type.startswith('video/'): + content = VideoUrl(url=url) else: - raise ValueError(f'Unknown file type: {file["mime_type"]}') # pragma: no cover + content = DocumentUrl(url=url) model_parts.append(UserPromptPart(content=[content])) - elif part['type'] == 'data': + else: + raise ValueError('FilePart.file must have either data or uri') + elif part['kind'] == 'data': # TODO(Marcelo): Maybe we should use this for `ToolReturnPart`, and `RetryPromptPart`. raise NotImplementedError('Data parts are not supported yet.') else: assert_never(part) return model_parts - def _map_response_parts(self, parts: list[Part]) -> list[ModelResponsePart]: + def _response_parts_from_a2a(self, parts: list[Part]) -> list[ModelResponsePart]: + """Convert A2A Part objects to pydantic-ai ModelResponsePart objects. + + This handles the conversion from A2A protocol parts (text, file, data) to + pydantic-ai's internal response parts. Currently only supports text parts + as agent responses in A2A are expected to be text-based. + + Args: + parts: List of A2A Part objects from stored agent messages + + Returns: + List of ModelResponsePart objects for message history + """ model_parts: list[ModelResponsePart] = [] for part in parts: - if part['type'] == 'text': + if part['kind'] == 'text': model_parts.append(TextPart(content=part['text'])) - elif part['type'] == 'file': # pragma: no cover + elif part['kind'] == 'file': # pragma: no cover raise NotImplementedError('File parts are not supported yet.') - elif part['type'] == 'data': # pragma: no cover + elif part['kind'] == 'data': # pragma: no cover raise NotImplementedError('Data parts are not supported yet.') else: # pragma: no cover assert_never(part) return model_parts + + def _response_parts_to_a2a(self, parts: list[ModelResponsePart]) -> list[Part]: + """Convert pydantic-ai ModelResponsePart objects to A2A Part objects. + + This handles the conversion from pydantic-ai's internal response parts to + A2A protocol parts. 
Different part types are handled as follows: + - TextPart: Converted directly to A2A TextPart + - ThinkingPart: Converted to TextPart with metadata indicating it's thinking + - ToolCallPart: Skipped (internal to agent execution) + + Args: + parts: List of ModelResponsePart objects from agent response + + Returns: + List of A2A Part objects suitable for sending via A2A protocol + """ + a2a_parts: list[Part] = [] + for part in parts: + if isinstance(part, TextPart): + if part.content: # Only add non-empty text + a2a_parts.append(A2ATextPart(kind='text', text=part.content)) + elif isinstance(part, ThinkingPart): + if part.content: # Only add non-empty thinking + # Convert thinking to text with metadata + a2a_parts.append( + A2ATextPart( + kind='text', + text=part.content, + metadata={'type': 'thinking', 'thinking_id': part.id, 'signature': part.signature}, + ) + ) + elif isinstance(part, ToolCallPart): + # Skip tool calls - they're internal to agent execution + pass + return a2a_parts diff --git a/pydantic_ai_slim/pydantic_ai/agent.py b/pydantic_ai_slim/pydantic_ai/agent.py index a04ae8646..15cc2ca59 100644 --- a/pydantic_ai_slim/pydantic_ai/agent.py +++ b/pydantic_ai_slim/pydantic_ai/agent.py @@ -62,7 +62,7 @@ from fasta2a.applications import FastA2A from fasta2a.broker import Broker - from fasta2a.schema import Provider, Skill + from fasta2a.schema import Provider, Skill, Task from fasta2a.storage import Storage from pydantic_ai.mcp import MCPServer @@ -1739,6 +1739,7 @@ async def run_mcp_servers( def to_a2a( self, *, + deps_factory: Callable[[Task], AgentDepsT] | None = None, storage: Storage | None = None, broker: Broker | None = None, # Agent card @@ -1772,11 +1773,31 @@ def to_a2a( ```bash uvicorn app:app --host 0.0.0.0 --port 8000 ``` + + Args: + deps_factory: Function that creates agent dependencies from task metadata. + storage: Backend for persisting task state and history. Defaults to in-memory storage. + broker: Message broker for distributing tasks to workers. Defaults to in-memory broker. + name: Display name for this agent in the A2A protocol. + url: Base URL where this agent will be hosted. + version: Version string for this agent. + description: Human-readable description of what this agent does. + provider: Provider metadata for the A2A agent card. + skills: List of capabilities this agent exposes via A2A. + debug: Enable Starlette debug mode with detailed error pages. + routes: Additional Starlette routes to include in the application. + middleware: Additional Starlette middleware to include. + exception_handlers: Custom exception handlers for the application. + lifespan: ASGI lifespan context manager for startup/shutdown logic. + + Returns: + A FastA2A application ready to serve A2A requests. 
""" from ._a2a import agent_to_a2a return agent_to_a2a( self, + deps_factory=deps_factory, storage=storage, broker=broker, name=name, diff --git a/tests/test_a2a.py b/tests/test_a2a.py index fae117781..ed65c6f4c 100644 --- a/tests/test_a2a.py +++ b/tests/test_a2a.py @@ -12,7 +12,7 @@ with try_import() as imports_successful: from fasta2a.client import A2AClient - from fasta2a.schema import DataPart, FilePart, Message, TextPart + from fasta2a.schema import DataPart, FilePart, Message, TextPart, is_task from fasta2a.storage import InMemoryStorage @@ -40,10 +40,10 @@ async def test_a2a_runtime_error_without_lifespan(): async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) - message = Message(role='user', parts=[TextPart(text='Hello, world!', type='text')]) + message = Message(role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message') with pytest.raises(RuntimeError, match='TaskManager was not properly initialized.'): - await a2a_client.send_task(message=message) + await a2a_client.send_message(message=message) async def test_a2a_simple(): @@ -55,23 +55,31 @@ async def test_a2a_simple(): async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) - message = Message(role='user', parts=[TextPart(text='Hello, world!', type='text')]) - response = await a2a_client.send_task(message=message) - assert response == snapshot( + message = Message(role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message') + response = await a2a_client.send_message(message=message) + assert 'error' not in response + assert 'result' in response + result = response['result'] + assert is_task(result) + assert result == snapshot( { - 'jsonrpc': '2.0', 'id': IsStr(), - 'result': { - 'id': IsStr(), - 'session_id': IsStr(), - 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, - 'history': [{'role': 'user', 'parts': [{'type': 'text', 'text': 'Hello, world!'}]}], - }, + 'context_id': IsStr(), + 'kind': 'task', + 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, + 'history': [ + { + 'role': 'user', + 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + } + ], } ) - assert 'result' in response - task_id = response['result']['id'] + task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': @@ -83,11 +91,29 @@ async def test_a2a_simple(): 'id': None, 'result': { 'id': IsStr(), - 'session_id': IsStr(), + 'context_id': IsStr(), + 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, - 'history': [{'role': 'user', 'parts': [{'type': 'text', 'text': 'Hello, world!'}]}], + 'history': [ + { + 'role': 'user', + 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + } + ], 'artifacts': [ - {'name': 'result', 'parts': [{'type': 'text', 'text': "('foo', 'bar')"}], 'index': 0} + { + 'artifact_id': IsStr(), + 'name': 'result', + 'parts': [{'kind': 'data', 'data': ['foo', 'bar']}], + 'metadata': { + 'type': 'tuple', + 'json_schema': {'items': {}, 'type': 'array'}, + 'class_name': 'tuple', + }, + } ], }, } @@ -107,37 +133,47 @@ async def test_a2a_file_message_with_file(): role='user', parts=[ FilePart( - type='file', - file={'url': 
'https://example.com/file.txt', 'mime_type': 'text/plain'}, + kind='file', + file={ + 'uri': 'https://example.com/file.txt', + 'mime_type': 'text/plain', + }, ) ], + kind='message', ) - response = await a2a_client.send_task(message=message) - assert response == snapshot( + response = await a2a_client.send_message(message=message) + assert 'error' not in response + assert 'result' in response + result = response['result'] + assert is_task(result) + assert result == snapshot( { - 'jsonrpc': '2.0', 'id': IsStr(), - 'result': { - 'id': IsStr(), - 'session_id': IsStr(), - 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, - 'history': [ - { - 'role': 'user', - 'parts': [ - { - 'type': 'file', - 'file': {'mime_type': 'text/plain', 'url': 'https://example.com/file.txt'}, - } - ], - } - ], - }, + 'context_id': IsStr(), + 'kind': 'task', + 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, + 'history': [ + { + 'role': 'user', + 'parts': [ + { + 'kind': 'file', + 'file': { + 'uri': 'https://example.com/file.txt', + 'mime_type': 'text/plain', + }, + } + ], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + } + ], } ) - assert 'result' in response - task_id = response['result']['id'] + task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': @@ -149,21 +185,37 @@ async def test_a2a_file_message_with_file(): 'id': None, 'result': { 'id': IsStr(), - 'session_id': IsStr(), + 'context_id': IsStr(), + 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', 'parts': [ { - 'type': 'file', - 'file': {'mime_type': 'text/plain', 'url': 'https://example.com/file.txt'}, + 'kind': 'file', + 'file': { + 'uri': 'https://example.com/file.txt', + 'mime_type': 'text/plain', + }, } ], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), } ], 'artifacts': [ - {'name': 'result', 'parts': [{'type': 'text', 'text': "('foo', 'bar')"}], 'index': 0} + { + 'artifact_id': IsStr(), + 'name': 'result', + 'parts': [{'kind': 'data', 'data': ['foo', 'bar']}], + 'metadata': { + 'type': 'tuple', + 'json_schema': {'items': {}, 'type': 'array'}, + 'class_name': 'tuple', + }, + } ], }, } @@ -182,30 +234,34 @@ async def test_a2a_file_message_with_file_content(): message = Message( role='user', parts=[ - FilePart(type='file', file={'data': 'foo', 'mime_type': 'text/plain'}), + FilePart(kind='file', file={'data': 'foo', 'mime_type': 'text/plain'}), ], + kind='message', ) - response = await a2a_client.send_task(message=message) - assert response == snapshot( + response = await a2a_client.send_message(message=message) + assert 'error' not in response + assert 'result' in response + result = response['result'] + assert is_task(result) + assert result == snapshot( { - 'jsonrpc': '2.0', 'id': IsStr(), - 'result': { - 'id': IsStr(), - 'session_id': IsStr(), - 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, - 'history': [ - { - 'role': 'user', - 'parts': [{'type': 'file', 'file': {'mime_type': 'text/plain', 'data': 'foo'}}], - } - ], - }, + 'context_id': IsStr(), + 'kind': 'task', + 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, + 'history': [ + { + 'role': 'user', + 'parts': [{'kind': 'file', 'file': {'data': 'foo', 'mime_type': 'text/plain'}}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + } + ], } ) - assert 
'result' in response - task_id = response['result']['id'] + task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch if 'result' in task and task['result']['status']['state'] == 'completed': @@ -217,16 +273,29 @@ async def test_a2a_file_message_with_file_content(): 'id': None, 'result': { 'id': IsStr(), - 'session_id': IsStr(), + 'context_id': IsStr(), + 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ { 'role': 'user', - 'parts': [{'type': 'file', 'file': {'mime_type': 'text/plain', 'data': 'foo'}}], + 'parts': [{'kind': 'file', 'file': {'data': 'foo', 'mime_type': 'text/plain'}}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), } ], 'artifacts': [ - {'name': 'result', 'parts': [{'type': 'text', 'text': "('foo', 'bar')"}], 'index': 0} + { + 'artifact_id': IsStr(), + 'name': 'result', + 'parts': [{'kind': 'data', 'data': ['foo', 'bar']}], + 'metadata': { + 'type': 'tuple', + 'json_schema': {'items': {}, 'type': 'array'}, + 'class_name': 'tuple', + }, + } ], }, } @@ -242,29 +311,34 @@ async def test_a2a_file_message_with_data(): async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) - message = Message( - role='user', - parts=[DataPart(type='data', data={'foo': 'bar'})], - ) - response = await a2a_client.send_task(message=message) - assert response == snapshot( + message = Message(role='user', parts=[DataPart(kind='data', data={'foo': 'bar'})], kind='message') + response = await a2a_client.send_message(message=message) + assert 'error' not in response + assert 'result' in response + result = response['result'] + assert is_task(result) + assert result == snapshot( { - 'jsonrpc': '2.0', 'id': IsStr(), - 'result': { - 'id': IsStr(), - 'session_id': IsStr(), - 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, - 'history': [{'role': 'user', 'parts': [{'type': 'data', 'data': {'foo': 'bar'}}]}], - }, + 'context_id': IsStr(), + 'kind': 'task', + 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, + 'history': [ + { + 'role': 'user', + 'parts': [{'kind': 'data', 'data': {'foo': 'bar'}}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + } + ], } ) - assert 'result' in response - task_id = response['result']['id'] + task_id = result['id'] while task := await a2a_client.get_task(task_id): # pragma: no branch - if 'result' in task and task['result']['status']['state'] == 'failed': + if 'result' in task and task['result']['status']['state'] in ('failed', 'completed'): break await anyio.sleep(0.1) assert task == snapshot( @@ -273,9 +347,18 @@ async def test_a2a_file_message_with_data(): 'id': None, 'result': { 'id': IsStr(), - 'session_id': IsStr(), + 'context_id': IsStr(), + 'kind': 'task', 'status': {'state': 'failed', 'timestamp': IsDatetime(iso_string=True)}, - 'history': [{'role': 'user', 'parts': [{'type': 'data', 'data': {'foo': 'bar'}}]}], + 'history': [ + { + 'role': 'user', + 'parts': [{'kind': 'data', 'data': {'foo': 'bar'}}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + } + ], }, } ) @@ -291,27 +374,37 @@ async def test_a2a_multiple_messages(): async with httpx.AsyncClient(transport=transport) as http_client: a2a_client = A2AClient(http_client=http_client) - message = Message(role='user', parts=[TextPart(text='Hello, world!', type='text')]) - response = await a2a_client.send_task(message=message) - assert response == snapshot( + 
message = Message(role='user', parts=[TextPart(text='Hello, world!', kind='text')], kind='message') + response = await a2a_client.send_message(message=message) + assert 'error' not in response + assert 'result' in response + result = response['result'] + assert is_task(result) + assert result == snapshot( { - 'jsonrpc': '2.0', 'id': IsStr(), - 'result': { - 'id': IsStr(), - 'session_id': IsStr(), - 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, - 'history': [{'role': 'user', 'parts': [{'type': 'text', 'text': 'Hello, world!'}]}], - }, + 'context_id': IsStr(), + 'kind': 'task', + 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, + 'history': [ + { + 'role': 'user', + 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + } + ], } ) # NOTE: We include the agent history before we start working on the task. - assert 'result' in response - task_id = response['result']['id'] + task_id = result['id'] task = storage.tasks[task_id] assert 'history' in task - task['history'].append(Message(role='agent', parts=[TextPart(text='Whats up?', type='text')])) + task['history'].append( + Message(role='agent', parts=[TextPart(text='Whats up?', kind='text')], kind='message') + ) response = await a2a_client.get_task(task_id) assert response == snapshot( @@ -320,11 +413,18 @@ async def test_a2a_multiple_messages(): 'id': None, 'result': { 'id': IsStr(), - 'session_id': IsStr(), + 'context_id': IsStr(), + 'kind': 'task', 'status': {'state': 'submitted', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ - {'role': 'user', 'parts': [{'type': 'text', 'text': 'Hello, world!'}]}, - {'role': 'agent', 'parts': [{'type': 'text', 'text': 'Whats up?'}]}, + { + 'role': 'user', + 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + }, + {'role': 'agent', 'parts': [{'kind': 'text', 'text': 'Whats up?'}], 'kind': 'message'}, ], }, } @@ -338,14 +438,30 @@ async def test_a2a_multiple_messages(): 'id': None, 'result': { 'id': IsStr(), - 'session_id': IsStr(), + 'context_id': IsStr(), + 'kind': 'task', 'status': {'state': 'completed', 'timestamp': IsDatetime(iso_string=True)}, 'history': [ - {'role': 'user', 'parts': [{'type': 'text', 'text': 'Hello, world!'}]}, - {'role': 'agent', 'parts': [{'type': 'text', 'text': 'Whats up?'}]}, + { + 'role': 'user', + 'parts': [{'kind': 'text', 'text': 'Hello, world!'}], + 'kind': 'message', + 'context_id': IsStr(), + 'task_id': IsStr(), + }, + {'role': 'agent', 'parts': [{'kind': 'text', 'text': 'Whats up?'}], 'kind': 'message'}, ], 'artifacts': [ - {'name': 'result', 'parts': [{'type': 'text', 'text': "('foo', 'bar')"}], 'index': 0} + { + 'artifact_id': IsStr(), + 'name': 'result', + 'parts': [{'kind': 'data', 'data': ['foo', 'bar']}], + 'metadata': { + 'type': 'tuple', + 'json_schema': {'items': {}, 'type': 'array'}, + 'class_name': 'tuple', + }, + } ], }, } diff --git a/tests/test_a2a_deps.py b/tests/test_a2a_deps.py new file mode 100644 index 000000000..c53481909 --- /dev/null +++ b/tests/test_a2a_deps.py @@ -0,0 +1,153 @@ +"""Test A2A with dependency injection via deps_factory.""" + +from dataclasses import dataclass + +import anyio +import httpx +import pytest +from asgi_lifespan import LifespanManager + +from pydantic_ai import Agent, RunContext +from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart as TextPartMessage, ToolCallPart +from pydantic_ai.models.function 
import AgentInfo, FunctionModel + +from .conftest import try_import + +with try_import() as imports_successful: + from fasta2a.client import A2AClient + from fasta2a.schema import Message, Task, TextPart, is_task + +pytestmark = [ + pytest.mark.skipif(not imports_successful(), reason='fasta2a not installed'), + pytest.mark.anyio, +] + + +async def test_a2a_with_deps_factory(): + """Test that deps_factory enables agents with dependencies to work with A2A.""" + + # 1. Define a simple dependency class + @dataclass + class Deps: + user_name: str + multiplier: int = 2 + + # 2. Create a model that returns output based on deps + def model_function(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: + # This function doesn't have access to deps, so it just returns a placeholder + if info.output_tools: + # Return a simple string result using the output tool + args_json = '{"response": "Result computed with deps"}' + return ModelResponse(parts=[ToolCallPart(info.output_tools[0].name, args_json)]) + else: + # No output tools, just return text + return ModelResponse(parts=[TextPartMessage(content='Result computed with deps')]) + + model = FunctionModel(model_function) + agent = Agent(model=model, deps_type=Deps, output_type=str) + + # 3. Add a system prompt that uses deps + @agent.system_prompt + def add_user_info(ctx: RunContext[Deps]) -> str: + return f"The user's name is {ctx.deps.user_name} with multiplier {ctx.deps.multiplier}" + + # 4. Create deps_factory that reads from task metadata + def create_deps(task: Task) -> Deps: + metadata = task.get('metadata', {}) + return Deps(user_name=metadata.get('user_name', 'DefaultUser'), multiplier=metadata.get('multiplier', 2)) + + # 5. Create A2A app with deps_factory + app = agent.to_a2a(deps_factory=create_deps) + + # 6. Test the full flow + async with LifespanManager(app): + transport = httpx.ASGITransport(app) + async with httpx.AsyncClient(transport=transport) as http_client: + a2a_client = A2AClient(http_client=http_client) + + # Send task with metadata + message = Message(role='user', parts=[TextPart(text='Process this', kind='text')], kind='message') + response = await a2a_client.send_message(message=message, metadata={'user_name': 'Alice', 'multiplier': 5}) + assert 'error' not in response + assert 'result' in response + result = response['result'] + assert is_task(result), 'Expected Task response' + task_id = result['id'] + + # Wait for task completion + task = None + for _ in range(10): # Max 10 attempts + response = await a2a_client.get_task(task_id) + if 'result' in response: + task = response['result'] + if task['status']['state'] in ('completed', 'failed'): + break + await anyio.sleep(0.1) + + # Verify the result + assert task is not None + if task['status']['state'] == 'failed': + print(f'Task failed. 
Full task: {task}') + assert task['status']['state'] == 'completed' + assert 'history' in task + # Find the agent's response message + agent_messages = [msg for msg in task['history'] if msg['role'] == 'agent'] + assert len(agent_messages) >= 1 + last_agent_message = agent_messages[-1] + assert len(last_agent_message['parts']) == 1 + part = last_agent_message['parts'][0] + assert part['kind'] == 'text' + assert part['text'] == 'Result computed with deps' + + +async def test_a2a_without_deps_factory(): + """Test that agents without deps still work when no deps_factory is provided.""" + + def model_function(messages: list[ModelMessage], info: AgentInfo) -> ModelResponse: + if info.output_tools: + args_json = '{"response": "Hello from agent"}' + return ModelResponse(parts=[ToolCallPart(info.output_tools[0].name, args_json)]) + else: + return ModelResponse(parts=[TextPartMessage(content='Hello from agent')]) + + model = FunctionModel(model_function) + # Agent with no deps_type + agent = Agent(model=model, output_type=str) + + # Create A2A app without deps_factory + app = agent.to_a2a() + + async with LifespanManager(app): + transport = httpx.ASGITransport(app) + async with httpx.AsyncClient(transport=transport) as http_client: + a2a_client = A2AClient(http_client=http_client) + + message = Message(role='user', parts=[TextPart(text='Hello', kind='text')], kind='message') + response = await a2a_client.send_message(message=message) + assert 'error' not in response + assert 'result' in response + result = response['result'] + assert is_task(result), 'Expected Task response' + task_id = result['id'] + + # Wait for completion + task = None + for _ in range(10): + response = await a2a_client.get_task(task_id) + if 'result' in response: + task = response['result'] + if task['status']['state'] == 'completed': + break + await anyio.sleep(0.1) + + assert task is not None + assert task['status']['state'] == 'completed' + assert 'history' in task + # Find the agent's response message + agent_messages = [msg for msg in task['history'] if msg['role'] == 'agent'] + assert len(agent_messages) >= 1 + last_agent_message = agent_messages[-1] + assert len(last_agent_message['parts']) == 1 + part = last_agent_message['parts'][0] + assert part['kind'] == 'text' + assert part['text'] == 'Hello from agent' diff --git a/tests/fasta2a/__init__.py b/tests/test_fasta2a/__init__.py similarity index 100% rename from tests/fasta2a/__init__.py rename to tests/test_fasta2a/__init__.py diff --git a/tests/fasta2a/test_applications.py b/tests/test_fasta2a/test_applications.py similarity index 92% rename from tests/fasta2a/test_applications.py rename to tests/test_fasta2a/test_applications.py index 3b3aa437d..accf2cd85 100644 --- a/tests/fasta2a/test_applications.py +++ b/tests/test_fasta2a/test_applications.py @@ -39,7 +39,7 @@ async def test_agent_card(): 'skills': [], 'defaultInputModes': ['application/json'], 'defaultOutputModes': ['application/json'], - 'capabilities': {'streaming': False, 'pushNotifications': False, 'stateTransitionHistory': False}, + 'capabilities': {'streaming': True, 'pushNotifications': False, 'stateTransitionHistory': False}, 'authentication': {'schemes': []}, } )
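
Not part of the patch: a minimal client-side sketch of the round trip the new `tests/test_a2a_deps.py` tests exercise — send a message whose metadata feeds the server-side `deps_factory`, then poll `get_task` until the task settles. The `app` argument is a placeholder for whatever `Agent.to_a2a(deps_factory=...)` application you build, and the metadata keys are only meaningful if your own `deps_factory` reads them; every call used here appears in the test code above.

```python
# Sketch only; `app` and the metadata keys are assumptions, not part of the diff.
import anyio
import httpx
from asgi_lifespan import LifespanManager

from fasta2a.client import A2AClient
from fasta2a.schema import Message, TextPart, is_task


async def send_and_wait(app, text: str, metadata: dict):
    async with LifespanManager(app):  # run the lifespan so the TaskManager is initialized
        transport = httpx.ASGITransport(app)
        async with httpx.AsyncClient(transport=transport) as http_client:
            client = A2AClient(http_client=http_client)

            message = Message(role='user', parts=[TextPart(text=text, kind='text')], kind='message')
            # The metadata travels with the task; it is what the server-side deps_factory sees.
            response = await client.send_message(message=message, metadata=metadata)
            assert 'result' in response and is_task(response['result'])
            task_id = response['result']['id']

            for _ in range(50):  # bounded polling so the sketch cannot hang forever
                poll = await client.get_task(task_id)
                if 'result' in poll and poll['result']['status']['state'] in ('completed', 'failed'):
                    return poll['result']
                await anyio.sleep(0.1)
            raise TimeoutError(f'task {task_id} did not reach a terminal state')
```

Calling `await send_and_wait(app, 'Process this', {'user_name': 'Alice', 'multiplier': 5})` mirrors `test_a2a_with_deps_factory` above; the returned task carries the same `history` and `artifacts` shape asserted in the snapshots.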
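
One consequence of the `ThinkingPart` handling in the `_a2a` conversion hunk earlier in this section: reasoning text reaches A2A clients as ordinary text parts, distinguishable only by their metadata. A hypothetical client-side helper (not in the patch) could separate the two, assuming exactly the metadata shape produced there:

```python
from typing import Any


def split_thinking(parts: list[dict[str, Any]]) -> tuple[list[str], list[str]]:
    """Split an A2A message's parts into visible text and 'thinking' text.

    Assumes the server-side conversion shown above: a ThinkingPart becomes a text part
    whose metadata carries {'type': 'thinking', 'thinking_id': ..., 'signature': ...}.
    """
    visible: list[str] = []
    thinking: list[str] = []
    for part in parts:
        if part.get('kind') != 'text':
            continue  # file/data parts carry no text to classify
        if (part.get('metadata') or {}).get('type') == 'thinking':
            thinking.append(part['text'])
        else:
            visible.append(part['text'])
    return visible, thinking
```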