From 05497e1c7ea511007b2f0bb42dc3f55618e904c1 Mon Sep 17 00:00:00 2001 From: acensia Date: Wed, 2 Jul 2025 14:50:15 +0900 Subject: [PATCH] feat: isDelta property on Message for streaming (#267) --- src/a2a/server/events/event_consumer.py | 2 +- src/a2a/server/tasks/task_manager.py | 24 ++++++++++++++++++++---- src/a2a/server/tasks/task_updater.py | 1 + src/a2a/types.py | 4 ++++ 4 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/a2a/server/events/event_consumer.py b/src/a2a/server/events/event_consumer.py index 1fe5c3f3..cb434be0 100644 --- a/src/a2a/server/events/event_consumer.py +++ b/src/a2a/server/events/event_consumer.py @@ -102,7 +102,7 @@ async def consume_all(self) -> AsyncGenerator[Event]: is_final_event = ( (isinstance(event, TaskStatusUpdateEvent) and event.final) - or isinstance(event, Message) + or (isinstance(event, Message) and not event.isDelta) or ( isinstance(event, Task) and event.status.state diff --git a/src/a2a/server/tasks/task_manager.py b/src/a2a/server/tasks/task_manager.py index 5474e155..f27ccc91 100644 --- a/src/a2a/server/tasks/task_manager.py +++ b/src/a2a/server/tasks/task_manager.py @@ -78,6 +78,25 @@ async def get_task(self) -> Task | None: logger.debug('Task %s not found.', self.task_id) return self._current_task + async def add_history_message(self, task: Task) -> None: + """Adds a message to the task's history. + + If the message is not a delta, it appends the message to the history. + If the message is a delta, but with the same messageId, it extends the last message in the history. + If the message is a delta, but with a different messageId, it appends the message to the history. + + Args: + task: The task with the message to add to the history. + """ + if not task.status.message: + return + if not task.history: + task.history = [task.status.message] + elif task.history[-1].messageId != task.status.message.messageId: + task.history.append(task.status.message) + elif task.status.message.isDelta: + task.history[-1].parts.extend(task.status.message.parts) + async def save_task_event( self, event: Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent ) -> Task | None: @@ -132,10 +151,7 @@ async def save_task_event( 'Updating task %s status to: %s', task.id, event.status.state ) if task.status.message: - if not task.history: - task.history = [task.status.message] - else: - task.history.append(task.status.message) + await self.add_history_message(task) if event.metadata: if not task.metadata: task.metadata = {} diff --git a/src/a2a/server/tasks/task_updater.py b/src/a2a/server/tasks/task_updater.py index 169f3b97..401e9f84 100644 --- a/src/a2a/server/tasks/task_updater.py +++ b/src/a2a/server/tasks/task_updater.py @@ -182,4 +182,5 @@ def new_agent_message( messageId=str(uuid.uuid4()), metadata=metadata, parts=parts, + isDelta=False ) diff --git a/src/a2a/types.py b/src/a2a/types.py index 7f952369..a45f1c5d 100644 --- a/src/a2a/types.py +++ b/src/a2a/types.py @@ -1336,6 +1336,10 @@ class Message(BaseModel): """ Identifier of task the message is related to """ + isDelta: bool | None = None + """ + Indicates if this is a delta message + """ class MessageSendParams(BaseModel):