Skip to content

Upgrade a2a to spec v0.2.3 #2144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
be6e37f
feat: upgrade A2A protocol from v0.1 to v0.2.3
physicsrob Jul 6, 2025
22e825f
test: add comprehensive test for Pydantic model outputs with metadata
physicsrob Jul 6, 2025
a6d958b
feat: add context storage for conversation continuity
physicsrob Jul 7, 2025
3901954
Fix types
physicsrob Jul 7, 2025
55f3ced
Fix misattributed TODOs -- oops!
physicsrob Jul 7, 2025
0a95bf9
Update docs/a2a.md
physicsrob Jul 8, 2025
5785901
Update fasta2a/fasta2a/applications.py
physicsrob Jul 8, 2025
d1ff90a
Update fasta2a/fasta2a/applications.py
physicsrob Jul 8, 2025
15df524
Update fasta2a/fasta2a/schema.py
physicsrob Jul 8, 2025
1a245a7
Update fasta2a/fasta2a/schema.py
physicsrob Jul 8, 2025
ffe8a6e
Update fasta2a/fasta2a/schema.py
physicsrob Jul 8, 2025
5fd0dde
Update docs
physicsrob Jul 8, 2025
872714e
Remove deprecation exception
physicsrob Jul 8, 2025
9372d3c
Remove deprecation exception
physicsrob Jul 8, 2025
c5dd525
Change DataPart data type back to dict[str, Any]; Update result artif…
physicsrob Jul 8, 2025
a131482
More PR feedback on spec
physicsrob Jul 8, 2025
bfd305f
fix: make DataPart spec-compliant and improve message/artifact separ…
physicsrob Jul 8, 2025
2f05b6c
Remove is_task/is_message
physicsrob Jul 8, 2025
a23582a
Address PR feedback on task_manager
physicsrob Jul 8, 2025
06b2d88
Update tests for requiring message_id
physicsrob Jul 8, 2025
36fc9e8
apply my comments
Kludex Jul 8, 2025
a984a68
Merge remote-tracking branch 'origin/main' into part1-upgrade-a2a-v0.2.3
Kludex Jul 8, 2025
d50cd8e
update types with claude
Kludex Jul 8, 2025
d24fcca
update tests
Kludex Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion docs/a2a.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The library is designed to be used with any agentic framework, and is **not excl
Given the nature of the A2A protocol, it's important to understand the design before using it, as a developer
you'll need to provide some components:

- [`Storage`][fasta2a.Storage]: to save and load tasks
- [`Storage`][fasta2a.Storage]: to save and load tasks, as well as store context for conversations
- [`Broker`][fasta2a.Broker]: to schedule tasks
- [`Worker`][fasta2a.Worker]: to execute tasks

Expand All @@ -55,6 +55,28 @@ flowchart TB

FastA2A allows you to bring your own [`Storage`][fasta2a.Storage], [`Broker`][fasta2a.Broker] and [`Worker`][fasta2a.Worker].

#### Understanding Tasks and Context

In the A2A protocol:

- **Task**: Represents one complete execution of an agent. When a client sends a message to the agent, a new task is created. The agent runs until completion (or failure), and this entire execution is considered one task. The final output is stored as a task artifact.

- **Context**: Represents a conversation thread that can span multiple tasks. The A2A protocol uses a `context_id` to maintain conversation continuity:
- When a new message is sent without a `context_id`, the server generates a new one
- Subsequent messages can include the same `context_id` to continue the conversation
- All tasks sharing the same `context_id` have access to the complete message history

#### Storage Architecture

The [`Storage`][fasta2a.Storage] component serves two purposes:

1. **Task Storage**: Stores tasks in A2A protocol format, including their status, artifacts, and message history
2. **Context Storage**: Stores conversation context in a format optimized for the specific agent implementation

This design allows for agents to store rich internal state (e.g., tool calls, reasoning traces) as well as store task-specific A2A-formatted messages and artifacts.

For example, a PydanticAI agent might store its complete internal message format (including tool calls and responses) in the context storage, while storing only the A2A-compliant messages in the task history.


### Installation

Expand Down Expand Up @@ -94,3 +116,12 @@ uvicorn agent_to_a2a:app --host 0.0.0.0 --port 8000
```

Since the goal of `to_a2a` is to be a convenience method, it accepts the same arguments as the [`FastA2A`][fasta2a.FastA2A] constructor.

When using `to_a2a()`, PydanticAI automatically:

- Stores the complete conversation history (including tool calls and responses) in the context storage
- Ensures that subsequent messages with the same `context_id` have access to the full conversation history
- Persists agent results as A2A artifacts:
- String results become `TextPart` artifacts and also appear in the message history
- Structured data (Pydantic models, dataclasses, tuples, etc.) become `DataPart` artifacts with the data wrapped as `{"result": <your_data>}`
- Artifacts include metadata with type information and JSON schema when available
20 changes: 10 additions & 10 deletions fasta2a/fasta2a/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@

from .broker import Broker
from .schema import (
AgentCapabilities,
AgentCard,
Authentication,
Capabilities,
Provider,
AgentProvider,
Skill,
a2a_request_ta,
a2a_response_ta,
Expand All @@ -39,7 +38,7 @@ def __init__(
url: str = 'http://localhost:8000',
version: str = '1.0.0',
description: str | None = None,
provider: Provider | None = None,
provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
# Starlette
debug: bool = False,
Expand Down Expand Up @@ -85,16 +84,17 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
if self._agent_card_json_schema is None:
agent_card = AgentCard(
name=self.name,
description=self.description or 'FastA2A Agent',
url=self.url,
version=self.version,
protocol_version='0.2.5',
skills=self.skills,
default_input_modes=self.default_input_modes,
default_output_modes=self.default_output_modes,
capabilities=Capabilities(streaming=False, push_notifications=False, state_transition_history=False),
authentication=Authentication(schemes=[]),
capabilities=AgentCapabilities(
streaming=False, push_notifications=False, state_transition_history=False
),
)
if self.description is not None:
agent_card['description'] = self.description
if self.provider is not None:
agent_card['provider'] = self.provider
self._agent_card_json_schema = agent_card_ta.dump_json(agent_card, by_alias=True)
Expand All @@ -116,8 +116,8 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
data = await request.body()
a2a_request = a2a_request_ta.validate_json(data)

if a2a_request['method'] == 'tasks/send':
jsonrpc_response = await self.task_manager.send_task(a2a_request)
if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'tasks/get':
jsonrpc_response = await self.task_manager.get_task(a2a_request)
elif a2a_request['method'] == 'tasks/cancel':
Expand Down
41 changes: 23 additions & 18 deletions fasta2a/fasta2a/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
GetTaskRequest,
GetTaskResponse,
Message,
PushNotificationConfig,
SendTaskRequest,
SendTaskResponse,
TaskSendParams,
MessageSendConfiguration,
MessageSendParams,
SendMessageRequest,
SendMessageResponse,
a2a_request_ta,
send_message_request_ta,
send_message_response_ta,
)

send_task_response_ta = pydantic.TypeAdapter(SendTaskResponse)
get_task_response_ta = pydantic.TypeAdapter(GetTaskResponse)

try:
Expand All @@ -37,26 +38,30 @@ def __init__(self, base_url: str = 'http://localhost:8000', http_client: httpx.A
self.http_client = http_client
self.http_client.base_url = base_url

async def send_task(
async def send_message(
self,
message: Message,
history_length: int | None = None,
push_notification: PushNotificationConfig | None = None,
*,
metadata: dict[str, Any] | None = None,
) -> SendTaskResponse:
task = TaskSendParams(message=message, id=str(uuid.uuid4()))
if history_length is not None:
task['history_length'] = history_length
if push_notification is not None:
task['push_notification'] = push_notification
configuration: MessageSendConfiguration | None = None,
) -> SendMessageResponse:
"""Send a message using the A2A protocol.

Returns a JSON-RPC response containing either a result (Task) or an error.
"""
params = MessageSendParams(message=message)
if metadata is not None:
task['metadata'] = metadata
params['metadata'] = metadata
if configuration is not None:
params['configuration'] = configuration

payload = SendTaskRequest(jsonrpc='2.0', id=None, method='tasks/send', params=task)
content = a2a_request_ta.dump_json(payload, by_alias=True)
request_id = str(uuid.uuid4())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this ID? Is it the request_id?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the request_id for JSON-RPC. Let me know if you think a clarifying comment would be helpful, or if there's something off about the variable names

payload = SendMessageRequest(jsonrpc='2.0', id=request_id, method='message/send', params=params)
content = send_message_request_ta.dump_json(payload, by_alias=True)
response = await self.http_client.post('/', content=content, headers={'Content-Type': 'application/json'})
self._raise_for_status(response)
return send_task_response_ta.validate_json(response.content)

return send_message_response_ta.validate_json(response.content)

async def get_task(self, task_id: str) -> GetTaskResponse:
payload = GetTaskRequest(jsonrpc='2.0', id=None, method='tasks/get', params={'id': task_id})
Expand Down
Loading
Loading