Skip to content

Commit 6b6961b

Browse files
committed
refactor: Update FastA2A to use Google A2A SDK
1 parent f19eea2 commit 6b6961b

File tree

9 files changed

+225
-1126
lines changed

9 files changed

+225
-1126
lines changed

fasta2a/fasta2a/__init__.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
from .applications import FastA2A
2-
from .broker import Broker
3-
from .schema import Skill
2+
from .client import A2AClient
3+
from .schema import Message, Part, Role, Skill, TextPart
44
from .storage import Storage
55
from .worker import Worker
66

7-
__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker']
7+
__all__ = [
8+
"FastA2A",
9+
"A2AClient",
10+
"Worker",
11+
"Storage",
12+
"Skill",
13+
"Message",
14+
"Part",
15+
"Role",
16+
"TextPart",
17+
]

fasta2a/fasta2a/applications.py

Lines changed: 75 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,135 +1,106 @@
11
from __future__ import annotations as _annotations
22

3-
from collections.abc import AsyncIterator, Sequence
4-
from contextlib import asynccontextmanager
53
from typing import Any
64

7-
from starlette.applications import Starlette
5+
import httpx
6+
from a2a.server.apps.jsonrpc.starlette_app import A2AStarletteApplication
7+
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
8+
from a2a.server.tasks.inmemory_push_notifier import InMemoryPushNotifier
9+
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
10+
from a2a.types import AgentCapabilities, AgentCard, AgentProvider
811
from starlette.middleware import Middleware
9-
from starlette.requests import Request
10-
from starlette.responses import Response
1112
from starlette.routing import Route
1213
from starlette.types import ExceptionHandler, Lifespan, Receive, Scope, Send
1314

14-
from .broker import Broker
15-
from .schema import (
16-
AgentCard,
17-
Authentication,
18-
Capabilities,
19-
Provider,
20-
Skill,
21-
a2a_request_ta,
22-
a2a_response_ta,
23-
agent_card_ta,
24-
)
15+
from .schema import Skill
2516
from .storage import Storage
26-
from .task_manager import TaskManager
17+
from .worker import Worker
2718

2819

29-
class FastA2A(Starlette):
30-
"""The main class for the FastA2A library."""
20+
class FastA2A:
21+
"""
22+
The main class for the FastA2A library. It provides a simple way to create
23+
an A2A server by wrapping the Google A2A SDK.
24+
"""
3125

3226
def __init__(
3327
self,
3428
*,
35-
storage: Storage,
36-
broker: Broker,
29+
worker: Worker,
30+
storage: Storage | None = None,
3731
# Agent card
38-
name: str | None = None,
39-
url: str = 'http://localhost:8000',
40-
version: str = '1.0.0',
32+
name: str = "Agent",
33+
url: str = "http://localhost:8000",
34+
version: str = "1.0.0",
4135
description: str | None = None,
42-
provider: Provider | None = None,
36+
provider: AgentProvider | None = None,
4337
skills: list[Skill] | None = None,
4438
# Starlette
4539
debug: bool = False,
46-
routes: Sequence[Route] | None = None,
47-
middleware: Sequence[Middleware] | None = None,
40+
routes: list[Route] | None = None,
41+
middleware: list[Middleware] | None = None,
4842
exception_handlers: dict[Any, ExceptionHandler] | None = None,
49-
lifespan: Lifespan[FastA2A] | None = None,
43+
lifespan: Lifespan | None = None,
5044
):
51-
if lifespan is None:
52-
lifespan = _default_lifespan
45+
"""
46+
Initializes the FastA2A application.
47+
48+
Args:
49+
worker: An implementation of `fasta2a.Worker` (which is an `a2a.server.agent_execution.AgentExecutor`).
50+
storage: An implementation of `fasta2a.Storage` (which is an `a2a.server.tasks.TaskStore`).
51+
Defaults to `InMemoryTaskStore`.
52+
name: The human-readable name of the agent.
53+
url: The URL where the agent is hosted.
54+
version: The version of the agent.
55+
description: A human-readable description of the agent.
56+
provider: The service provider of the agent.
57+
skills: A list of skills the agent can perform.
58+
debug: Starlette's debug flag.
59+
routes: A list of additional Starlette routes.
60+
middleware: A list of Starlette middleware.
61+
exception_handlers: A dictionary of Starlette exception handlers.
62+
lifespan: A Starlette lifespan context manager.
63+
"""
64+
self.agent_card = AgentCard(
65+
name=name,
66+
url=url,
67+
version=version,
68+
description=description or "A FastA2A Agent",
69+
provider=provider,
70+
skills=skills or [],
71+
capabilities=AgentCapabilities(
72+
streaming=True, pushNotifications=True, stateTransitionHistory=True
73+
),
74+
defaultInputModes=["application/json"],
75+
defaultOutputModes=["application/json"],
76+
securitySchemes={},
77+
)
5378

54-
super().__init__(
79+
self.storage = storage or InMemoryTaskStore()
80+
self.worker = worker
81+
82+
# The SDK's DefaultRequestHandler uses httpx to send push notifications
83+
http_client = httpx.AsyncClient()
84+
push_notifier = InMemoryPushNotifier(httpx_client)
85+
86+
request_handler = DefaultRequestHandler(
87+
agent_executor=self.worker,
88+
task_store=self.storage,
89+
push_notifier=push_notifier,
90+
)
91+
92+
a2a_app = A2AStarletteApplication(
93+
agent_card=self.agent_card,
94+
http_handler=request_handler,
95+
)
96+
97+
self.app = a2a_app.build(
5598
debug=debug,
5699
routes=routes,
57100
middleware=middleware,
58101
exception_handlers=exception_handlers,
59102
lifespan=lifespan,
60103
)
61104

62-
self.name = name or 'Agent'
63-
self.url = url
64-
self.version = version
65-
self.description = description
66-
self.provider = provider
67-
self.skills = skills or []
68-
# NOTE: For now, I don't think there's any reason to support any other input/output modes.
69-
self.default_input_modes = ['application/json']
70-
self.default_output_modes = ['application/json']
71-
72-
self.task_manager = TaskManager(broker=broker, storage=storage)
73-
74-
# Setup
75-
self._agent_card_json_schema: bytes | None = None
76-
self.router.add_route('/.well-known/agent.json', self._agent_card_endpoint, methods=['HEAD', 'GET', 'OPTIONS'])
77-
self.router.add_route('/', self._agent_run_endpoint, methods=['POST'])
78-
79105
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
80-
if scope['type'] == 'http' and not self.task_manager.is_running:
81-
raise RuntimeError('TaskManager was not properly initialized.')
82-
await super().__call__(scope, receive, send)
83-
84-
async def _agent_card_endpoint(self, request: Request) -> Response:
85-
if self._agent_card_json_schema is None:
86-
agent_card = AgentCard(
87-
name=self.name,
88-
url=self.url,
89-
version=self.version,
90-
skills=self.skills,
91-
default_input_modes=self.default_input_modes,
92-
default_output_modes=self.default_output_modes,
93-
capabilities=Capabilities(streaming=False, push_notifications=False, state_transition_history=False),
94-
authentication=Authentication(schemes=[]),
95-
)
96-
if self.description is not None:
97-
agent_card['description'] = self.description
98-
if self.provider is not None:
99-
agent_card['provider'] = self.provider
100-
self._agent_card_json_schema = agent_card_ta.dump_json(agent_card, by_alias=True)
101-
return Response(content=self._agent_card_json_schema, media_type='application/json')
102-
103-
async def _agent_run_endpoint(self, request: Request) -> Response:
104-
"""This is the main endpoint for the A2A server.
105-
106-
Although the specification allows freedom of choice and implementation, I'm pretty sure about some decisions.
107-
108-
1. The server will always either send a "submitted" or a "failed" on `tasks/send`.
109-
Never a "completed" on the first message.
110-
2. There are three possible ends for the task:
111-
2.1. The task was "completed" successfully.
112-
2.2. The task was "canceled".
113-
2.3. The task "failed".
114-
3. The server will send a "working" on the first chunk on `tasks/pushNotification/get`.
115-
"""
116-
data = await request.body()
117-
a2a_request = a2a_request_ta.validate_json(data)
118-
119-
if a2a_request['method'] == 'tasks/send':
120-
jsonrpc_response = await self.task_manager.send_task(a2a_request)
121-
elif a2a_request['method'] == 'tasks/get':
122-
jsonrpc_response = await self.task_manager.get_task(a2a_request)
123-
elif a2a_request['method'] == 'tasks/cancel':
124-
jsonrpc_response = await self.task_manager.cancel_task(a2a_request)
125-
else:
126-
raise NotImplementedError(f'Method {a2a_request["method"]} not implemented.')
127-
return Response(
128-
content=a2a_response_ta.dump_json(jsonrpc_response, by_alias=True), media_type='application/json'
129-
)
130-
131-
132-
@asynccontextmanager
133-
async def _default_lifespan(app: FastA2A) -> AsyncIterator[None]:
134-
async with app.task_manager:
135-
yield
106+
await self.app(scope, receive, send)

fasta2a/fasta2a/broker.py

Lines changed: 0 additions & 98 deletions
This file was deleted.

0 commit comments

Comments
 (0)