Skip to content

Commit f1b576e

Browse files
refactor!: Push notification changes [DO NOT MERGE] (#212)
``` Release-As: 0.2.11 ``` * Removes push_notifier interface from the SDK and introduces push_notification_config_store and push_notification_sender for supporting push notifications. Following is an example for the changes required with new version: Previously: ```py from a2a.server.tasks import InMemoryPushNotifier ... ... httpx_client = httpx.AsyncClient() request_handler = DefaultRequestHandler( agent_executor=CurrencyAgentExecutor(), task_store=InMemoryTaskStore(), push_notifier=InMemoryPushNotifier(httpx_client), ) ... ... ``` Now: ```py from a2a.server.tasks import InMemoryPushNotificationConfigStore, BasePushNotificationSender ... ... httpx_client = httpx.AsyncClient() push_notification_config_store = InMemoryPushNotificationConfigStore() push_notification_sender = BasePushNotificationSender(httpx_client, config_store=push_notification_config_store) request_handler = DefaultRequestHandler( agent_executor=CurrencyAgentExecutor(), task_store=InMemoryTaskStore(), push_config_store=push_notification_config_store, push_sender=push_notification_sender ) ... ... ``` * Adds support for more than one push_notification_config per task. * Adds support for List and Delete push notification configurations --------- Co-authored-by: Holt Skinner <13262395+holtskinner@users.noreply.github.com>
1 parent bf939a8 commit f1b576e

17 files changed

+1231
-210
lines changed

src/a2a/server/apps/jsonrpc/jsonrpc_app.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
A2ARequest,
2626
AgentCard,
2727
CancelTaskRequest,
28+
DeleteTaskPushNotificationConfigRequest,
2829
GetTaskPushNotificationConfigRequest,
2930
GetTaskRequest,
3031
InternalError,
@@ -33,6 +34,7 @@
3334
JSONRPCError,
3435
JSONRPCErrorResponse,
3536
JSONRPCResponse,
37+
ListTaskPushNotificationConfigRequest,
3638
SendMessageRequest,
3739
SendStreamingMessageRequest,
3840
SendStreamingMessageResponse,
@@ -297,12 +299,22 @@ async def _process_non_streaming_request(
297299
request_obj, context
298300
)
299301
case SetTaskPushNotificationConfigRequest():
300-
handler_result = await self.handler.set_push_notification(
302+
handler_result = await self.handler.set_push_notification_config(
301303
request_obj,
302304
context,
303305
)
304306
case GetTaskPushNotificationConfigRequest():
305-
handler_result = await self.handler.get_push_notification(
307+
handler_result = await self.handler.get_push_notification_config(
308+
request_obj,
309+
context,
310+
)
311+
case ListTaskPushNotificationConfigRequest():
312+
handler_result = await self.handler.list_push_notification_config(
313+
request_obj,
314+
context,
315+
)
316+
case DeleteTaskPushNotificationConfigRequest():
317+
handler_result = await self.handler.delete_push_notification_config(
306318
request_obj,
307319
context,
308320
)

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 83 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
import logging
3-
import uuid
43

54
from collections.abc import AsyncGenerator
65
from typing import cast
@@ -21,15 +20,18 @@
2120
)
2221
from a2a.server.request_handlers.request_handler import RequestHandler
2322
from a2a.server.tasks import (
24-
PushNotifier,
23+
PushNotificationConfigStore,
24+
PushNotificationSender,
2525
ResultAggregator,
2626
TaskManager,
2727
TaskStore,
2828
)
2929
from a2a.types import (
30+
DeleteTaskPushNotificationConfigParams,
3031
GetTaskPushNotificationConfigParams,
3132
InternalError,
3233
InvalidParamsError,
34+
ListTaskPushNotificationConfigParams,
3335
Message,
3436
MessageSendConfiguration,
3537
MessageSendParams,
@@ -67,12 +69,13 @@ class DefaultRequestHandler(RequestHandler):
6769

6870
_running_agents: dict[str, asyncio.Task]
6971

70-
def __init__(
72+
def __init__( # noqa: PLR0913
7173
self,
7274
agent_executor: AgentExecutor,
7375
task_store: TaskStore,
7476
queue_manager: QueueManager | None = None,
75-
push_notifier: PushNotifier | None = None,
77+
push_config_store: PushNotificationConfigStore | None = None,
78+
push_sender: PushNotificationSender | None = None,
7679
request_context_builder: RequestContextBuilder | None = None,
7780
) -> None:
7881
"""Initializes the DefaultRequestHandler.
@@ -81,14 +84,16 @@ def __init__(
8184
agent_executor: The `AgentExecutor` instance to run agent logic.
8285
task_store: The `TaskStore` instance to manage task persistence.
8386
queue_manager: The `QueueManager` instance to manage event queues. Defaults to `InMemoryQueueManager`.
84-
push_notifier: The `PushNotifier` instance for sending push notifications. Defaults to None.
87+
push_config_store: The `PushNotificationConfigStore` instance for managing push notification configurations. Defaults to None.
88+
push_sender: The `PushNotificationSender` instance for sending push notifications. Defaults to None.
8589
request_context_builder: The `RequestContextBuilder` instance used
8690
to build request contexts. Defaults to `SimpleRequestContextBuilder`.
8791
"""
8892
self.agent_executor = agent_executor
8993
self.task_store = task_store
9094
self._queue_manager = queue_manager or InMemoryQueueManager()
91-
self._push_notifier = push_notifier
95+
self._push_config_store = push_config_store
96+
self._push_sender = push_sender
9297
self._request_context_builder = (
9398
request_context_builder
9499
or SimpleRequestContextBuilder(
@@ -198,15 +203,15 @@ async def _setup_message_execution(
198203

199204
task = task_manager.update_with_message(params.message, task)
200205
if self.should_add_push_info(params):
201-
assert isinstance(self._push_notifier, PushNotifier)
206+
assert self._push_config_store is not None
202207
assert isinstance(
203208
params.configuration, MessageSendConfiguration
204209
)
205210
assert isinstance(
206211
params.configuration.pushNotificationConfig,
207212
PushNotificationConfig,
208213
)
209-
await self._push_notifier.set_info(
214+
await self._push_config_store.set_info(
210215
task.id, params.configuration.pushNotificationConfig
211216
)
212217

@@ -247,10 +252,10 @@ async def _send_push_notification_if_needed(
247252
self, task_id: str, result_aggregator: ResultAggregator
248253
) -> None:
249254
"""Sends push notification if configured and task is available."""
250-
if self._push_notifier and task_id:
255+
if self._push_sender and task_id:
251256
latest_task = await result_aggregator.current_result
252257
if isinstance(latest_task, Task):
253-
await self._push_notifier.send_notification(latest_task)
258+
await self._push_sender.send_notification(latest_task)
254259

255260
async def on_message_send(
256261
self,
@@ -329,11 +334,11 @@ async def on_message_send_stream(
329334
self._validate_task_id_match(task_id, event.id)
330335

331336
if (
332-
self._push_notifier
337+
self._push_config_store
333338
and params.configuration
334339
and params.configuration.pushNotificationConfig
335340
):
336-
await self._push_notifier.set_info(
341+
await self._push_config_store.set_info(
337342
task_id,
338343
params.configuration.pushNotificationConfig,
339344
)
@@ -372,16 +377,14 @@ async def on_set_task_push_notification_config(
372377
373378
Requires a `PushNotifier` to be configured.
374379
"""
375-
if not self._push_notifier:
380+
if not self._push_config_store:
376381
raise ServerError(error=UnsupportedOperationError())
377382

378383
task: Task | None = await self.task_store.get(params.taskId)
379384
if not task:
380385
raise ServerError(error=TaskNotFoundError())
381386

382-
# Generate a unique id for the notification
383-
params.pushNotificationConfig.id = str(uuid.uuid4())
384-
await self._push_notifier.set_info(
387+
await self._push_config_store.set_info(
385388
params.taskId,
386389
params.pushNotificationConfig,
387390
)
@@ -395,21 +398,27 @@ async def on_get_task_push_notification_config(
395398
) -> TaskPushNotificationConfig:
396399
"""Default handler for 'tasks/pushNotificationConfig/get'.
397400
398-
Requires a `PushNotifier` to be configured.
401+
Requires a `PushConfigStore` to be configured.
399402
"""
400-
if not self._push_notifier:
403+
if not self._push_config_store:
401404
raise ServerError(error=UnsupportedOperationError())
402405

403406
task: Task | None = await self.task_store.get(params.id)
404407
if not task:
405408
raise ServerError(error=TaskNotFoundError())
406409

407-
push_notification_config = await self._push_notifier.get_info(params.id)
408-
if not push_notification_config:
409-
raise ServerError(error=InternalError())
410+
push_notification_config = await self._push_config_store.get_info(
411+
params.id
412+
)
413+
if not push_notification_config or not push_notification_config[0]:
414+
raise ServerError(
415+
error=InternalError(
416+
message='Push notification config not found'
417+
)
418+
)
410419

411420
return TaskPushNotificationConfig(
412-
taskId=params.id, pushNotificationConfig=push_notification_config
421+
taskId=params.id, pushNotificationConfig=push_notification_config[0]
413422
)
414423

415424
async def on_resubscribe_to_task(
@@ -450,10 +459,61 @@ async def on_resubscribe_to_task(
450459
async for event in result_aggregator.consume_and_emit(consumer):
451460
yield event
452461

462+
async def on_list_task_push_notification_config(
463+
self,
464+
params: ListTaskPushNotificationConfigParams,
465+
context: ServerCallContext | None = None,
466+
) -> list[TaskPushNotificationConfig]:
467+
"""Default handler for 'tasks/pushNotificationConfig/list'.
468+
469+
Requires a `PushConfigStore` to be configured.
470+
"""
471+
if not self._push_config_store:
472+
raise ServerError(error=UnsupportedOperationError())
473+
474+
task: Task | None = await self.task_store.get(params.id)
475+
if not task:
476+
raise ServerError(error=TaskNotFoundError())
477+
478+
push_notification_config_list = await self._push_config_store.get_info(
479+
params.id
480+
)
481+
482+
task_push_notification_config = []
483+
if push_notification_config_list:
484+
for config in push_notification_config_list:
485+
task_push_notification_config.append(
486+
TaskPushNotificationConfig(
487+
taskId=params.id, pushNotificationConfig=config
488+
)
489+
)
490+
491+
return task_push_notification_config
492+
493+
async def on_delete_task_push_notification_config(
494+
self,
495+
params: DeleteTaskPushNotificationConfigParams,
496+
context: ServerCallContext | None = None,
497+
) -> None:
498+
"""Default handler for 'tasks/pushNotificationConfig/delete'.
499+
500+
Requires a `PushConfigStore` to be configured.
501+
"""
502+
if not self._push_config_store:
503+
raise ServerError(error=UnsupportedOperationError())
504+
505+
task: Task | None = await self.task_store.get(params.id)
506+
if not task:
507+
raise ServerError(error=TaskNotFoundError())
508+
509+
await self._push_config_store.delete_info(
510+
params.id, params.pushNotificationConfigId
511+
)
512+
453513
def should_add_push_info(self, params: MessageSendParams) -> bool:
454514
"""Determines if push notification info should be set for a task."""
455515
return bool(
456-
self._push_notifier
516+
self._push_config_store
457517
and params.configuration
458518
and params.configuration.pushNotificationConfig
459519
)

src/a2a/server/request_handlers/jsonrpc_handler.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
CancelTaskRequest,
1111
CancelTaskResponse,
1212
CancelTaskSuccessResponse,
13+
DeleteTaskPushNotificationConfigRequest,
14+
DeleteTaskPushNotificationConfigResponse,
15+
DeleteTaskPushNotificationConfigSuccessResponse,
1316
GetTaskPushNotificationConfigRequest,
1417
GetTaskPushNotificationConfigResponse,
1518
GetTaskPushNotificationConfigSuccessResponse,
@@ -18,6 +21,9 @@
1821
GetTaskSuccessResponse,
1922
InternalError,
2023
JSONRPCErrorResponse,
24+
ListTaskPushNotificationConfigRequest,
25+
ListTaskPushNotificationConfigResponse,
26+
ListTaskPushNotificationConfigSuccessResponse,
2127
Message,
2228
SendMessageRequest,
2329
SendMessageResponse,
@@ -214,7 +220,7 @@ async def on_resubscribe_to_task(
214220
)
215221
)
216222

217-
async def get_push_notification(
223+
async def get_push_notification_config(
218224
self,
219225
request: GetTaskPushNotificationConfigRequest,
220226
context: ServerCallContext | None = None,
@@ -252,7 +258,7 @@ async def get_push_notification(
252258
lambda self: self.agent_card.capabilities.pushNotifications,
253259
'Push notifications are not supported by the agent',
254260
)
255-
async def set_push_notification(
261+
async def set_push_notification_config(
256262
self,
257263
request: SetTaskPushNotificationConfigRequest,
258264
context: ServerCallContext | None = None,
@@ -325,3 +331,69 @@ async def on_get_task(
325331
id=request.id, error=e.error if e.error else InternalError()
326332
)
327333
)
334+
335+
async def list_push_notification_config(
336+
self,
337+
request: ListTaskPushNotificationConfigRequest,
338+
context: ServerCallContext | None = None,
339+
) -> ListTaskPushNotificationConfigResponse:
340+
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.
341+
342+
Args:
343+
request: The incoming `ListTaskPushNotificationConfigRequest` object.
344+
context: Context provided by the server.
345+
346+
Returns:
347+
A `ListTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
348+
"""
349+
try:
350+
config = (
351+
await self.request_handler.on_list_task_push_notification_config(
352+
request.params, context
353+
)
354+
)
355+
return prepare_response_object(
356+
request.id,
357+
config,
358+
(list,),
359+
ListTaskPushNotificationConfigSuccessResponse,
360+
ListTaskPushNotificationConfigResponse,
361+
)
362+
except ServerError as e:
363+
return ListTaskPushNotificationConfigResponse(
364+
root=JSONRPCErrorResponse(
365+
id=request.id, error=e.error if e.error else InternalError()
366+
)
367+
)
368+
369+
async def delete_push_notification_config(
370+
self,
371+
request: DeleteTaskPushNotificationConfigRequest,
372+
context: ServerCallContext | None = None,
373+
) -> DeleteTaskPushNotificationConfigResponse:
374+
"""Handles the 'tasks/pushNotificationConfig/list' JSON-RPC method.
375+
376+
Args:
377+
request: The incoming `DeleteTaskPushNotificationConfigRequest` object.
378+
context: Context provided by the server.
379+
380+
Returns:
381+
A `DeleteTaskPushNotificationConfigResponse` object containing the config or a JSON-RPC error.
382+
"""
383+
try:
384+
(
385+
await self.request_handler.on_delete_task_push_notification_config(
386+
request.params, context
387+
)
388+
)
389+
return DeleteTaskPushNotificationConfigResponse(
390+
root=DeleteTaskPushNotificationConfigSuccessResponse(
391+
id=request.id, result=None
392+
)
393+
)
394+
except ServerError as e:
395+
return DeleteTaskPushNotificationConfigResponse(
396+
root=JSONRPCErrorResponse(
397+
id=request.id, error=e.error if e.error else InternalError()
398+
)
399+
)

0 commit comments

Comments
 (0)