Skip to content

Upgrade a2a to spec v0.2.3 #2144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
be6e37f
feat: upgrade A2A protocol from v0.1 to v0.2.3
physicsrob Jul 6, 2025
22e825f
test: add comprehensive test for Pydantic model outputs with metadata
physicsrob Jul 6, 2025
a6d958b
feat: add context storage for conversation continuity
physicsrob Jul 7, 2025
3901954
Fix types
physicsrob Jul 7, 2025
55f3ced
Fix misattributed TODOs -- oops!
physicsrob Jul 7, 2025
0a95bf9
Update docs/a2a.md
physicsrob Jul 8, 2025
5785901
Update fasta2a/fasta2a/applications.py
physicsrob Jul 8, 2025
d1ff90a
Update fasta2a/fasta2a/applications.py
physicsrob Jul 8, 2025
15df524
Update fasta2a/fasta2a/schema.py
physicsrob Jul 8, 2025
1a245a7
Update fasta2a/fasta2a/schema.py
physicsrob Jul 8, 2025
ffe8a6e
Update fasta2a/fasta2a/schema.py
physicsrob Jul 8, 2025
5fd0dde
Update docs
physicsrob Jul 8, 2025
872714e
Remove deprecation exception
physicsrob Jul 8, 2025
9372d3c
Remove deprecation exception
physicsrob Jul 8, 2025
c5dd525
Change DataPart data type back to dict[str, Any]; Update result artif…
physicsrob Jul 8, 2025
a131482
More PR feedback on spec
physicsrob Jul 8, 2025
bfd305f
fix: make DataPart spec-compliant and improve message/artifact separ…
physicsrob Jul 8, 2025
2f05b6c
Remove is_task/is_message
physicsrob Jul 8, 2025
a23582a
Address PR feedback on task_manager
physicsrob Jul 8, 2025
06b2d88
Update tests for requiring message_id
physicsrob Jul 8, 2025
36fc9e8
apply my comments
Kludex Jul 8, 2025
a984a68
Merge remote-tracking branch 'origin/main' into part1-upgrade-a2a-v0.2.3
Kludex Jul 8, 2025
d50cd8e
update types with claude
Kludex Jul 8, 2025
d24fcca
update tests
Kludex Jul 8, 2025
270872c
update code
Kludex Jul 8, 2025
7971e34
coverage
Kludex Jul 8, 2025
3a4c4c6
Merge branch 'main' into part1-upgrade-a2a-v0.2.3
Kludex Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion docs/a2a.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The library is designed to be used with any agentic framework, and is **not excl
Given the nature of the A2A protocol, it's important to understand the design before using it, as a developer
you'll need to provide some components:

- [`Storage`][fasta2a.Storage]: to save and load tasks
- [`Storage`][fasta2a.Storage]: to save and load tasks, as well as store context for conversations
- [`Broker`][fasta2a.Broker]: to schedule tasks
- [`Worker`][fasta2a.Worker]: to execute tasks

Expand All @@ -55,6 +55,28 @@ flowchart TB

FastA2A allows you to bring your own [`Storage`][fasta2a.Storage], [`Broker`][fasta2a.Broker] and [`Worker`][fasta2a.Worker].

#### Understanding Tasks and Context

In the A2A protocol:

- **Task**: Represents one complete execution of an agent. When a client sends a message to the agent, a new task is created. The agent runs until completion (or failure), and this entire execution is considered one task. The final output is stored as a task artifact.

- **Context**: Represents a conversation thread that can span multiple tasks. The A2A protocol uses a `context_id` to maintain conversation continuity:
- When a new message is sent without a `context_id`, the server generates a new one
- Subsequent messages can include the same `context_id` to continue the conversation
- All tasks sharing the same `context_id` have access to the complete message history

#### Storage Architecture

The [`Storage`][fasta2a.Storage] component serves two purposes:

1. **Task Storage**: Stores tasks in A2A protocol format, including their status, artifacts, and message history
2. **Context Storage**: Stores conversation context in a format optimized for the specific agent implementation

This dual-purpose design allows flexibility for agents to store rich internal state (e.g., tool calls, reasoning traces) while maintaining efficient conversation continuity across multiple task executions.

For example, a PydanticAI agent might store its complete internal message format (including tool calls and responses) in the context storage, while storing only the A2A-compliant messages in the task history.


### Installation

Expand Down Expand Up @@ -94,3 +116,7 @@ uvicorn agent_to_a2a:app --host 0.0.0.0 --port 8000
```

Since the goal of `to_a2a` is to be a convenience method, it accepts the same arguments as the [`FastA2A`][fasta2a.FastA2A] constructor.

When using `to_a2a()`, PydanticAI automatically:
- Stores the complete conversation history (including tool calls and responses) in the context storage
- Ensures that subsequent messages with the same `context_id` have access to the full conversation history
14 changes: 12 additions & 2 deletions fasta2a/fasta2a/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
a2a_request_ta,
a2a_response_ta,
agent_card_ta,
send_message_request_ta,
)
from .storage import Storage
from .task_manager import TaskManager
Expand Down Expand Up @@ -116,8 +117,17 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
data = await request.body()
a2a_request = a2a_request_ta.validate_json(data)

if a2a_request['method'] == 'tasks/send':
jsonrpc_response = await self.task_manager.send_task(a2a_request)
if a2a_request['method'] == 'message/send':
message_request = send_message_request_ta.validate_json(data)
jsonrpc_response = await self.task_manager.send_message(message_request)
elif a2a_request['method'] == 'message/stream':
# Streaming support not yet implemented
raise NotImplementedError(
'message/stream method is not implemented yet. Streaming support will be added in a future update.'
)
elif a2a_request['method'] == 'tasks/send': # type: ignore[comparison-overlap]
# Legacy method - no longer supported
raise NotImplementedError('tasks/send is deprecated. Use message/send instead.')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need for this. Just drop it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

elif a2a_request['method'] == 'tasks/get':
jsonrpc_response = await self.task_manager.get_task(a2a_request)
elif a2a_request['method'] == 'tasks/cancel':
Expand Down
41 changes: 23 additions & 18 deletions fasta2a/fasta2a/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
GetTaskRequest,
GetTaskResponse,
Message,
PushNotificationConfig,
SendTaskRequest,
SendTaskResponse,
TaskSendParams,
MessageSendConfiguration,
MessageSendParams,
SendMessageRequest,
SendMessageResponse,
a2a_request_ta,
send_message_request_ta,
send_message_response_ta,
)

send_task_response_ta = pydantic.TypeAdapter(SendTaskResponse)
get_task_response_ta = pydantic.TypeAdapter(GetTaskResponse)

try:
Expand All @@ -37,26 +38,30 @@ def __init__(self, base_url: str = 'http://localhost:8000', http_client: httpx.A
self.http_client = http_client
self.http_client.base_url = base_url

async def send_task(
async def send_message(
self,
message: Message,
history_length: int | None = None,
push_notification: PushNotificationConfig | None = None,
*,
metadata: dict[str, Any] | None = None,
) -> SendTaskResponse:
task = TaskSendParams(message=message, id=str(uuid.uuid4()))
if history_length is not None:
task['history_length'] = history_length
if push_notification is not None:
task['push_notification'] = push_notification
configuration: MessageSendConfiguration | None = None,
) -> SendMessageResponse:
"""Send a message using the A2A protocol.
Returns a JSON-RPC response containing either a result (Task | Message) or an error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand by the spec is possible to return both, but we always return Task.

"""
params = MessageSendParams(message=message)
if metadata is not None:
task['metadata'] = metadata
params['metadata'] = metadata
if configuration is not None:
params['configuration'] = configuration

payload = SendTaskRequest(jsonrpc='2.0', id=None, method='tasks/send', params=task)
content = a2a_request_ta.dump_json(payload, by_alias=True)
request_id = str(uuid.uuid4())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this ID? Is it the request_id?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is the request_id for JSON-RPC. Let me know if you think a clarifying comment would be helpful, or if there's something off about the variable names

payload = SendMessageRequest(jsonrpc='2.0', id=request_id, method='message/send', params=params)
content = send_message_request_ta.dump_json(payload, by_alias=True)
response = await self.http_client.post('/', content=content, headers={'Content-Type': 'application/json'})
self._raise_for_status(response)
return send_task_response_ta.validate_json(response.content)

return send_message_response_ta.validate_json(response.content)

async def get_task(self, task_id: str) -> GetTaskResponse:
payload = GetTaskRequest(jsonrpc='2.0', id=None, method='tasks/get', params={'id': task_id})
Expand Down
Loading
Loading