Skip to content

Python: pseudo-stream copilot invoke_stream #12548

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 20, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
CopilotStudioAgentSettings,
)
from semantic_kernel.contents.chat_message_content import ChatMessageContent
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.contents.utils.author_role import AuthorRole
from semantic_kernel.exceptions.agent_exceptions import (
AgentInitializationException,
Expand All @@ -54,7 +55,6 @@
from typing_extensions import override

if TYPE_CHECKING: # pragma: no cover
from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
from semantic_kernel.kernel import Kernel

logger: logging.Logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -510,7 +510,7 @@ async def invoke(
yield AgentResponseItem(message=response, thread=thread)

@override
def invoke_stream(
async def invoke_stream(
self,
messages: str | ChatMessageContent | list[str | ChatMessageContent] | None = None,
*,
Expand All @@ -519,8 +519,53 @@ def invoke_stream(
arguments: KernelArguments | None = None,
kernel: "Kernel | None" = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseItem["StreamingChatMessageContent"]]:
raise NotImplementedError("Streaming is not supported for Copilot Studio agents.")
) -> AsyncIterable[AgentResponseItem[StreamingChatMessageContent]]:
"""Invoke the agent and stream the response.

Note: this is a “pseudo-streaming” implementation.

We're internally delegating to the real async generator `_inner_invoke`.
Each complete ChatMessageContent is wrapped in exactly one
StreamingChatMessageContent chunk, so downstream consumers can iterate
without change. The stream yields at least once; callers still receive
on_intermediate callbacks in real time.

Args:
messages: The messages to send to the agent.
thread: The thread to use for the agent.
on_intermediate_message: A callback function to call with each intermediate message.
arguments: The arguments to pass to the agent.
kernel: The kernel to use for the agent.
**kwargs: Additional keyword arguments.

Yields:
A chat message content and thread with the response.
"""
thread = await self._ensure_thread_exists_with_messages(
messages=messages,
thread=thread,
construct_thread=lambda: CopilotStudioAgentThread(self.client),
expected_type=CopilotStudioAgentThread,
)
if not isinstance(thread, CopilotStudioAgentThread):
raise AgentThreadOperationException("The thread is not a Copilot Studio Agent thread.")

normalized_messages = self._normalize_messages(messages)

responses: list[ChatMessageContent] = []
async for resp in self._inner_invoke(
thread=thread,
messages=normalized_messages,
on_intermediate_message=on_intermediate_message,
arguments=arguments,
kernel=kernel,
**kwargs,
):
responses.append(resp)

for i, resp in enumerate(responses):
stream_msg = self._to_streaming(resp, index=i)
yield AgentResponseItem(message=stream_msg, thread=thread)

# endregion

Expand Down Expand Up @@ -598,6 +643,21 @@ def _normalize_messages(messages: str | ChatMessageContent | list[str | ChatMess
normalized.append(m.content if isinstance(m, ChatMessageContent) else str(m))
return normalized

@staticmethod
def _to_streaming(
    msg: ChatMessageContent,
    *,
    index: int,
) -> StreamingChatMessageContent:
    """Convert a finished chat message into a single streaming chunk.

    Args:
        msg: The complete message whose role, name, content and metadata
            are copied onto the chunk.
        index: The value stored as the chunk's ``choice_index``.

    Returns:
        A StreamingChatMessageContent built from the fields of ``msg``.
    """
    # Read the source fields first so the constructor call stays compact.
    author_role = msg.role
    author_name = msg.name
    text = msg.content
    extra = msg.metadata
    return StreamingChatMessageContent(
        role=author_role,
        name=author_name,
        content=text,
        choice_index=index,
        metadata=extra,
    )

@override
async def _notify_thread_of_new_message(self, thread, new_message):
"""Copilot Studio Agent doesn't need to notify the thread of new messages.
Expand Down
Loading