Skip to content

Python: Support a unified agent thread #11104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from

Conversation

moonbox3
Copy link
Contributor

Motivation and Context

Currently, OpenAI Assistants, Azure AI Agents, and the Bedrock Agent (to some degree) support the concept of a thread ID. We lack a consistent abstraction to handle threads through SK Agents. This PR brings in the thread abstraction, via an AgentThread base class, and allows each agent to implement its own thread. This now means that there is a concept of a thread for a chat completion agent, which before, only used the ChatHistory. The ChatCompletionAgentThread now manages an underlying ChatHistory, and allows a user to pass in their own ChatHistory as needed.

Additionally, now all get_response, invoke, or invoke_stream return types were updated to AgentResponseItem which contains the ChatMessageContent and the thread, returned to the caller. The thread is required to be returned now because a user can kick off an agent invoke without having to pass in the thread. One will be created on their behalf, and then returned for later invocations.

This PR contains several breaking changes to different SK Agents. We're aware that this can cause disruption, but this is a necessary change we need to get in as we move towards an "Agent First" / Agent Thread model.

All samples were updated to the new model. Docs updates coming soon.

Description

Introduce the unified thread model to all SK agents.

  • Agent Group Chat Channels were updated to work with these new threads -- although the Agent Group Chat patterns are changing over the next few weeks as we move towards a preview experience, we want to make sure that devs can use group chat even in its current state. Minimal amount of work was done around Agent Group Chat, just so the existing samples work. Return types were not updated for channels.
  • Update the tracing decorator to preserve type hints, so typing works properly throughout the code base.
  • Update samples with the new thread patterns.
  • Update unit tests (Bedrock fixes coming soon).

NOTE The Bedrock agents unit tests are currently failing. I will fix them.

Contribution Checklist

@moonbox3 moonbox3 added PR: breaking change Pull requests that introduce breaking changes agents labels Mar 21, 2025
@moonbox3 moonbox3 requested a review from a team as a code owner March 21, 2025 09:29
@markwallace-microsoft markwallace-microsoft added python Pull requests for the Python Semantic Kernel documentation labels Mar 21, 2025
@markwallace-microsoft
Copy link
Member

markwallace-microsoft commented Mar 21, 2025

Python Test Coverage

Python Test Coverage Report •
FileStmtsMissCoverMissing
semantic_kernel/agents
   agent.py1121488%48, 52, 61–72, 81, 118, 225–227
semantic_kernel/agents/autogen
   autogen_conversable_agent.py1161290%67, 73, 84–86, 247–248, 257–260, 289
semantic_kernel/agents/azure_ai
   azure_ai_agent.py1662585%76, 84–85, 90, 95–103, 154, 156, 162, 173–178, 180, 182, 203–206, 272, 369, 437, 535, 540
   azure_ai_channel.py41393%93, 120–121
semantic_kernel/agents/bedrock
   bedrock_agent.py2445179%77–83, 92, 158, 289, 308, 324–327, 333, 335, 338, 343, 377, 409–429, 433, 467, 486–489, 491–494, 536, 547–548, 560, 596–607, 618, 635–648, 665–674, 696
   bedrock_agent_base.py1414171%101, 119–120, 145, 158–160, 187, 206–208, 213, 232–234, 239, 243–244, 264–266, 275, 294–298, 303, 322–326, 331, 343–345, 357–376
semantic_kernel/agents/channels
   bedrock_agent_channel.py79594%178–187, 194, 208–209
   chat_history_channel.py69297%147, 150
semantic_kernel/agents/chat_completion
   chat_completion_agent.py2071991%76, 82, 94, 99–103, 161, 164, 174, 182–187, 196, 336, 402, 463, 499
semantic_kernel/agents/open_ai
   open_ai_assistant_agent.py2311494%79, 93, 98–107, 264, 277, 313, 454, 459, 528, 627, 724
TOTAL21158239989% 

Python Unit Test Overview

Tests Skipped Failures Errors Time
3336 5 💤 0 ❌ 0 🔥 1m 30s ⏱️

Copy link
Member

@eavanvalkenburg eavanvalkenburg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good improvement! Well done! I have one thought, I would argue for a seperation of AgentThreads into Local and Remote, all the ones that use ChatHistory internally can be Local ones, with a bunch of shared code, while the Remote have to implement more to adhere to the base. Then for local ones we could also look at adding things like a VectorStore so you can create easily persist local threads as well, potentially aligning with agent memory work of @westey-m

from semantic_kernel.utils.naming import generate_random_ascii_name
from semantic_kernel.utils.validation import AGENT_NAME_REGEX

logger: logging.Logger = logging.getLogger(__name__)

TMessage = TypeVar("TMessage", bound=ChatMessageContent | StreamingChatMessageContent)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamingChatMessageContent is a subclass of ChatMessageContent, so might not even need this...

@@ -75,7 +130,7 @@ def _configure_plugins(cls, data: Any) -> Any:
return data

@abstractmethod
async def get_response(self, *args, **kwargs) -> ChatMessageContent:
async def get_response(self, *args, **kwargs) -> AgentResponseItem[ChatMessageContent]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we also unify the params passed to this, to make this an actual abstraction?

Copy link
Contributor

@TaoChenOSU TaoChenOSU Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we can consolidate these now. For this particular one:

Suggested change
async def get_response(self, *args, **kwargs) -> AgentResponseItem[ChatMessageContent]:
async def get_response(self, *, message: str | ChatMessageContent, thread: AgentThread | None = None, **kwargs) -> AgentResponseItem[ChatMessageContent]:

raise NotImplementedError

@abstractmethod
async def start(self) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the type always str?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will be the underlying thread Id, or a guid string in the case of ChatCompletionThread.

@property
def is_active(self) -> bool:
"""Indicates whether the thread is currently active."""
return self._is_active
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not skip the _is_active and just do:

Suggested change
return self._is_active
return self._thread_id is not None

raise NotImplementedError

@abstractmethod
async def start(self) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a __aenter__ and __aexit__ that call start and end respectively?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this also but now I've changed my mind. We've changed the name of these from start/end to create/delete to better reflect what is actually happening. If we use an async context for this then we would always be deleting the thread when it goes out of local scope and I don't think that's what people would want or expect to happen in most cases. So the AgentThread will automatically create one if it'n not there, but it will not delete it for you, the caller needs to explicitly handle that.


Args:
message: The chat message content to post to the thread.
thread: An optional existing thread to configure. If None, a new AzureAIAgentThread is created.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
thread: An optional existing thread to configure. If None, a new AzureAIAgentThread is created.
thread: An optional existing thread to configure. If None, a new AutogenConversibleAgentThread is created.

):
self._chat_history.add_message(new_message)

async def retrieve_current_chat_history(self) -> ChatHistory:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be present in other threads as well?

@@ -75,7 +130,7 @@ def _configure_plugins(cls, data: Any) -> Any:
return data

@abstractmethod
async def get_response(self, *args, **kwargs) -> ChatMessageContent:
async def get_response(self, *args, **kwargs) -> AgentResponseItem[ChatMessageContent]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this API is meant to be the simplest way of dealing with a Agent, should this return this wrapped object, or just return the CMC, for me it would make sense to be able to just call and not worry about threads...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe in combination with a _current_thread_id/_current_thread attribute on the Agent, so that repeated calls to either this or the invoke methods, would just work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agents are stateless so we probably shouldn't manage the thread id inside the agents.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with Tao, you could use the same agent with different threads concurrently so the agent can't store the thread state. I also wish there was a way to retain the CMC return type and smuggle a thread into it, but so far, we have not found a better option that this.

input_text: str,
thread: AgentThread | None = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very inconsistent, in the Azure AI agent, the thread is below the *...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and ChatCompletionAgent and other use, message and this one uses input_text, let's make that consistent as well

raise RuntimeError("Cannot retrieve chat history, since the thread is not currently active.")
return self._chat_history

async def reduce(self) -> ChatHistory | None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this not present in other Threads that have a internal ChatHistory?

"ChatCompletionAgent",
"ChatCompletionAgentThread",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also include the other agent types here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can - I kept them scoped to their types so:

from semantic_kernel.agents.azure_ai import AzureAIAgent
from semantic_kernel.agents.open_ai import AzureAssistantAgent

Since we're already having breaking changes, and we want to get to GA, we could simplify it to:

from semantic_kernel.agents import AzureAIAgent, AzureAssistantAgent

raise RuntimeError("Cannot retrieve chat history, since the thread is not currently active.")
return self._chat_history

async def reduce(self) -> ChatHistory | None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this pattern of explicitly calling reduce still useful or should this happen automatically after a new m,essage has been added? With the new pattern you could also create different AgentThread derivatives that contain a ChatHistory and do the reduction in various ways.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like having to manually call reduce... I would prefer it to happen manually. We should sync on this.

@override
async def start(self) -> str:
"""Starts the thread and returns its ID."""
if self._is_active:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be idempotent and just return the current thread id in this case?

@moonbox3
Copy link
Contributor Author

Changes now handled in a feature branch #11119. Will incorporate the necessary feedback there. Closing this PR to avoid any confusion.

@moonbox3 moonbox3 closed this Mar 22, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
agents documentation PR: breaking change Pull requests that introduce breaking changes python Pull requests for the Python Semantic Kernel
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants