-
Notifications
You must be signed in to change notification settings - Fork 4k
Python: Support a unified agent thread #11104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Python Test Coverage Report •Python Unit Test Overview
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really good improvement! Well done! I have one thought, I would argue for a seperation of AgentThreads into Local and Remote, all the ones that use ChatHistory internally can be Local ones, with a bunch of shared code, while the Remote have to implement more to adhere to the base. Then for local ones we could also look at adding things like a VectorStore so you can create easily persist local threads as well, potentially aligning with agent memory work of @westey-m
from semantic_kernel.utils.naming import generate_random_ascii_name | ||
from semantic_kernel.utils.validation import AGENT_NAME_REGEX | ||
|
||
logger: logging.Logger = logging.getLogger(__name__) | ||
|
||
TMessage = TypeVar("TMessage", bound=ChatMessageContent | StreamingChatMessageContent) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamingChatMessageContent is a subclass of ChatMessageContent, so might not even need this...
@@ -75,7 +130,7 @@ def _configure_plugins(cls, data: Any) -> Any: | |||
return data | |||
|
|||
@abstractmethod | |||
async def get_response(self, *args, **kwargs) -> ChatMessageContent: | |||
async def get_response(self, *args, **kwargs) -> AgentResponseItem[ChatMessageContent]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also unify the params passed to this, to make this an actual abstraction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think we can consolidate these now. For this particular one:
async def get_response(self, *args, **kwargs) -> AgentResponseItem[ChatMessageContent]: | |
async def get_response(self, *, message: str | ChatMessageContent, thread: AgentThread | None = None, **kwargs) -> AgentResponseItem[ChatMessageContent]: |
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def start(self) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the type always str
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it will be the underlying thread Id, or a guid string in the case of ChatCompletionThread.
@property | ||
def is_active(self) -> bool: | ||
"""Indicates whether the thread is currently active.""" | ||
return self._is_active |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not skip the _is_active
and just do:
return self._is_active | |
return self._thread_id is not None |
raise NotImplementedError | ||
|
||
@abstractmethod | ||
async def start(self) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add a __aenter__
and __aexit__
that call start and end respectively?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought this also but now I've changed my mind. We've changed the name of these from start/end to create/delete to better reflect what is actually happening. If we use an async context for this then we would always be deleting the thread when it goes out of local scope and I don't think that's what people would want or expect to happen in most cases. So the AgentThread will automatically create one if it'n not there, but it will not delete it for you, the caller needs to explicitly handle that.
|
||
Args: | ||
message: The chat message content to post to the thread. | ||
thread: An optional existing thread to configure. If None, a new AzureAIAgentThread is created. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thread: An optional existing thread to configure. If None, a new AzureAIAgentThread is created. | |
thread: An optional existing thread to configure. If None, a new AutogenConversibleAgentThread is created. |
): | ||
self._chat_history.add_message(new_message) | ||
|
||
async def retrieve_current_chat_history(self) -> ChatHistory: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be present in other threads as well?
@@ -75,7 +130,7 @@ def _configure_plugins(cls, data: Any) -> Any: | |||
return data | |||
|
|||
@abstractmethod | |||
async def get_response(self, *args, **kwargs) -> ChatMessageContent: | |||
async def get_response(self, *args, **kwargs) -> AgentResponseItem[ChatMessageContent]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this API is meant to be the simplest way of dealing with a Agent, should this return this wrapped object, or just return the CMC, for me it would make sense to be able to just call and not worry about threads...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe in combination with a _current_thread_id
/_current_thread
attribute on the Agent, so that repeated calls to either this or the invoke methods, would just work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agents are stateless so we probably shouldn't manage the thread id inside the agents.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with Tao, you could use the same agent with different threads concurrently so the agent can't store the thread state. I also wish there was a way to retain the CMC return type and smuggle a thread into it, but so far, we have not found a better option that this.
input_text: str, | ||
thread: AgentThread | None = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very inconsistent, in the Azure AI agent, the thread is below the *...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and ChatCompletionAgent and other use, message
and this one uses input_text
, let's make that consistent as well
raise RuntimeError("Cannot retrieve chat history, since the thread is not currently active.") | ||
return self._chat_history | ||
|
||
async def reduce(self) -> ChatHistory | None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this not present in other Threads that have a internal ChatHistory?
"ChatCompletionAgent", | ||
"ChatCompletionAgentThread", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also include the other agent types here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can - I kept them scoped to their types so:
from semantic_kernel.agents.azure_ai import AzureAIAgent
from semantic_kernel.agents.open_ai import AzureAssistantAgent
Since we're already having breaking changes, and we want to get to GA, we could simplify it to:
from semantic_kernel.agents import AzureAIAgent, AzureAssistantAgent
raise RuntimeError("Cannot retrieve chat history, since the thread is not currently active.") | ||
return self._chat_history | ||
|
||
async def reduce(self) -> ChatHistory | None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this pattern of explicitly calling reduce still useful or should this happen automatically after a new m,essage has been added? With the new pattern you could also create different AgentThread derivatives that contain a ChatHistory and do the reduction in various ways.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like having to manually call reduce... I would prefer it to happen manually. We should sync on this.
@override | ||
async def start(self) -> str: | ||
"""Starts the thread and returns its ID.""" | ||
if self._is_active: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be idempotent and just return the current thread id in this case?
Changes now handled in a feature branch #11119. Will incorporate the necessary feedback there. Closing this PR to avoid any confusion. |
Motivation and Context
Currently, OpenAI Assistants, Azure AI Agents, and the Bedrock Agent (to some degree) support the concept of a thread ID. We lack a consistent abstraction to handle threads through SK Agents. This PR brings in the thread abstraction, via an
AgentThread
base class, and allows each agent to implement its own thread. This now means that there is a concept of a thread for a chat completion agent, which before, only used the ChatHistory. TheChatCompletionAgentThread
now manages an underlying ChatHistory, and allows a user to pass in their own ChatHistory as needed.Additionally, now all
get_response
,invoke
, orinvoke_stream
return types were updated toAgentResponseItem
which contains the ChatMessageContent and the thread, returned to the caller. The thread is required to be returned now because a user can kick off an agent invoke without having to pass in the thread. One will be created on their behalf, and then returned for later invocations.This PR contains several breaking changes to different SK Agents. We're aware that this can cause disruption, but this is a necessary change we need to get in as we move towards an "Agent First" / Agent Thread model.
All samples were updated to the new model. Docs updates coming soon.
Description
Introduce the unified thread model to all SK agents.
NOTE The Bedrock agents unit tests are currently failing. I will fix them.
Contribution Checklist