diff --git a/python/samples/demos/self_hosted_agent/README.md b/python/samples/demos/self_hosted_agent/README.md
new file mode 100644
index 000000000000..c88ac4e24374
--- /dev/null
+++ b/python/samples/demos/self_hosted_agent/README.md
@@ -0,0 +1,38 @@
+## Self Hosted Agents
+
+This sample demonstrates how to create a multi-agent group chat application whose agents are hosted outside of Semantic Kernel. The `agents` folder contains a `fastapi` server that hosts two agents and exposes them via REST APIs. The `app` folder contains the Semantic Kernel application that orchestrates the agents using `AgentGroupChat`.
+
+### Running the sample
+
+You will need to deploy Azure OpenAI models in [Azure AI Foundry](https://learn.microsoft.com/en-us/azure/ai-studio/). Keep the endpoint and the key handy.
+
+#### Running the server
+1. Navigate to the `agents` folder.
+2. Create and activate a virtual environment.
+```bash
+uv venv --python=3.10
+source .venv/bin/activate
+```
+3. Install the dependencies.
+```bash
+uv sync
+```
+4. Create a `.env` file using `.env.example` as a template.
+5. Run the server.
+```bash
+fastapi run main.py
+```
+
+Note the address the FastAPI server is running on. You will need it in the next step.
+
+#### Running the app
+1. In a different terminal, navigate to the `app` folder.
+2. Create a `.env` file using `.env.example` as a template, prefixing each agent path with the address of the FastAPI server from the previous step.
+```bash
+REVIEWER_AGENT_URL = "<server_url>/agent/reviewer"
+COPYWRITER_AGENT_URL = "<server_url>/agent/copywriter"
+```
+3. Run the app.
+```bash
+python main.py
+```
diff --git a/python/samples/demos/self_hosted_agent/agents/.env.example b/python/samples/demos/self_hosted_agent/agents/.env.example
new file mode 100644
index 000000000000..d163c2bf4aaf
--- /dev/null
+++ b/python/samples/demos/self_hosted_agent/agents/.env.example
@@ -0,0 +1,3 @@
+AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = ""
+AZURE_AI_INFERENCE_ENDPOINT = ""
+AZURE_AI_INFERENCE_MODEL_DEPLOYMENT_NAME = ""
\ No newline at end of file
diff --git a/python/samples/demos/self_hosted_agent/agents/main.py b/python/samples/demos/self_hosted_agent/agents/main.py
new file mode 100644
index 000000000000..638921f80a59
--- /dev/null
+++ b/python/samples/demos/self_hosted_agent/agents/main.py
@@ -0,0 +1,90 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import logging
+import os
+from abc import ABC
+from typing import Any
+
+from azure.ai.projects.aio import AIProjectClient
+from azure.identity.aio import DefaultAzureCredential
+from dotenv import load_dotenv
+from fastapi import FastAPI
+from openai.types.chat.chat_completion import ChatCompletion
+from openai.types.chat.chat_completion_system_message_param import ChatCompletionSystemMessageParam
+from pydantic import BaseModel, ConfigDict
+
+load_dotenv()
+
+app = FastAPI()
+
+client = AIProjectClient.from_connection_string(
+    credential=DefaultAzureCredential(), conn_str=os.environ["AZURE_AI_AGENT_PROJECT_CONNECTION_STRING"]
+)
+
+logger = logging.getLogger("custom_agent_logger")
+
+REVIEWER_INSTRUCTIONS = """
+You are an art director who has opinions about copywriting born of a love for David Ogilvy.
+The goal is to determine if the given copy is acceptable to print.
+If so, state that it is approved. Only include the word "approved" if it is so.
+If not, provide insight on how to refine suggested copy without example.
+"""
+
+COPYWRITER_INSTRUCTIONS = """
+You are a copywriter with ten years of experience and are known for brevity and a dry humor.
+The goal is to refine and decide on the single best copy as an expert in the field.
+Only provide a single proposal per response.
+You're laser focused on the goal at hand.
+Don't waste time with chit chat.
+Consider suggestions when refining an idea.
+"""
+
+
+class APIRequestFormat(BaseModel, ABC):
+    """Specific settings for the Chat Completion endpoint."""
+
+    model_config = ConfigDict(populate_by_name=True, arbitrary_types_allowed=True, validate_assignment=True)
+
+    stream: bool = False
+    messages: list[dict[str, Any]]
+    parallel_tool_calls: bool | None = None
+    tools: list[dict[str, Any]] | None = None
+    tool_choice: str | None = None
+
+
+@app.post("/agent/reviewer")
+async def reviewer_agent(request: APIRequestFormat) -> ChatCompletion:
+    messages = request.messages
+    logger.info("Reviewer agent")
+    logger.info(messages)
+    openai = await client.inference.get_azure_openai_client(api_version="2024-06-01")
+    # Replace the incoming system message with the reviewer's own instructions.
+    if messages[0]["role"] == "system" or messages[0]["role"] == "developer":
+        messages[0]["content"] = REVIEWER_INSTRUCTIONS
+    else:
+        messages = [ChatCompletionSystemMessageParam(role="system", content=REVIEWER_INSTRUCTIONS), *messages]
+
+    return await openai.chat.completions.create(
+        model=os.getenv("AZURE_AI_INFERENCE_MODEL_DEPLOYMENT_NAME") or "gpt-4o",
+        messages=messages,
+        **request.model_dump(exclude={"messages"}, exclude_none=True),
+    )
+
+
+@app.post("/agent/copywriter")
+async def copywriter_agent(request: APIRequestFormat) -> ChatCompletion:
+    messages = request.messages
+    logger.info("Copywriter agent")
+    logger.info(messages)
+    openai = await client.inference.get_azure_openai_client(api_version="2024-06-01")
+    # Replace the incoming system message with the copywriter's own instructions.
+    if messages[0]["role"] == "system" or messages[0]["role"] == "developer":
+        messages[0]["content"] = COPYWRITER_INSTRUCTIONS
+    else:
+        messages = [ChatCompletionSystemMessageParam(role="system", content=COPYWRITER_INSTRUCTIONS), *messages]
+
+    return await openai.chat.completions.create(
+        model=os.getenv("AZURE_AI_INFERENCE_MODEL_DEPLOYMENT_NAME") or "gpt-4o",
+        messages=messages,
+        **request.model_dump(exclude={"messages"}, exclude_none=True),
+    )
diff --git a/python/samples/demos/self_hosted_agent/agents/pyproject.toml b/python/samples/demos/self_hosted_agent/agents/pyproject.toml
new file mode 100644
index 000000000000..be2eb46b1b4d
--- /dev/null
+++ b/python/samples/demos/self_hosted_agent/agents/pyproject.toml
@@ -0,0 +1,15 @@
+[project]
+name = "agents"
+version = "0.1.0"
+description = "FastAPI server hosting the reviewer and copywriter agents for the self-hosted agent sample"
+readme = "README.md"
+requires-python = ">=3.10"
+dependencies = [
+    "aiohttp>=3.11.12",
+    "azure-ai-projects>=1.0.0b5",
+    "azure-identity>=1.20.0",
+    "fastapi[standard]>=0.115.8",
+    "openai>=1.62.0",
+    "pydantic>=2.10.6",
+    "python-dotenv>=1.0.1",
+]
diff --git a/python/samples/demos/self_hosted_agent/app/.env.example b/python/samples/demos/self_hosted_agent/app/.env.example
new file mode 100644
index 000000000000..7c4acd75a169
--- /dev/null
+++ b/python/samples/demos/self_hosted_agent/app/.env.example
@@ -0,0 +1,2 @@
+REVIEWER_AGENT_URL = "/agent/reviewer"
+COPYWRITER_AGENT_URL = "/agent/copywriter"
diff --git a/python/samples/demos/self_hosted_agent/app/main.py b/python/samples/demos/self_hosted_agent/app/main.py
new file mode 100644
index 000000000000..1b17034e508a
--- /dev/null
+++ b/python/samples/demos/self_hosted_agent/app/main.py
@@ -0,0 +1,75 @@
+# Copyright (c) Microsoft. All rights reserved.
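+"""Group chat orchestrator for the self-hosted agents sample.
+
+Reads REVIEWER_AGENT_URL and COPYWRITER_AGENT_URL from the environment (see
+.env.example) and wires each endpoint into a ChatCompletionAgent through the
+SelfHostedChatCompletion connector defined alongside this script.
+"""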
+
+import asyncio
+import os
+import sys
+
+from dotenv import load_dotenv
+from self_hosted_api_chat_completion import SelfHostedChatCompletion
+
+from semantic_kernel.agents import AgentGroupChat, ChatCompletionAgent
+from semantic_kernel.agents.strategies.termination.termination_strategy import TerminationStrategy
+from semantic_kernel.contents import AuthorRole, ChatMessageContent
+from semantic_kernel.kernel import Kernel
+
+load_dotenv()
+
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+#####################################################################
+# The following sample demonstrates how to create self-hosted      #
+# agents and have them participate in a group chat to work towards #
+# the user's requirement.                                          #
+#####################################################################
+
+
+class ApprovalTerminationStrategy(TerminationStrategy):
+    """A strategy for determining when an agent should terminate."""
+
+    async def should_agent_terminate(self, agent, history):
+        """Check if the agent should terminate."""
+        return "approved" in history[-1].content.lower()
+
+
+REVIEWER_NAME = "ArtDirector"
+COPYWRITER_NAME = "CopyWriter"
+
+
+async def main():
+    try:
+        kernel = Kernel()
+
+        agent_reviewer = ChatCompletionAgent(
+            id="artdirector",
+            kernel=kernel,
+            name=REVIEWER_NAME,
+            service=SelfHostedChatCompletion(url=os.getenv("REVIEWER_AGENT_URL") or "", ai_model_id=REVIEWER_NAME),
+        )
+
+        agent_writer = ChatCompletionAgent(
+            id="copywriter",
+            kernel=kernel,
+            name=COPYWRITER_NAME,
+            service=SelfHostedChatCompletion(url=os.getenv("COPYWRITER_AGENT_URL") or "", ai_model_id=COPYWRITER_NAME),
+        )
+
+        chat = AgentGroupChat(
+            agents=[agent_writer, agent_reviewer],
+            termination_strategy=ApprovalTerminationStrategy(agents=[agent_reviewer], maximum_iterations=10),
+        )
+
+        user_input = "a slogan for a new line of electric cars."
+
+        await chat.add_chat_message(ChatMessageContent(role=AuthorRole.USER, content=user_input))
+        print(f"# {AuthorRole.USER}: '{user_input}'")
+
+        async for content in chat.invoke():
+            print(f"# {content.role} - {content.name or '*'}: '{content.content}'")
+
+        print(f"# IS COMPLETE: {chat.is_complete}")
+    except Exception as e:
+        print(e)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/python/samples/demos/self_hosted_agent/app/self_hosted_api_chat_completion.py b/python/samples/demos/self_hosted_agent/app/self_hosted_api_chat_completion.py
new file mode 100644
index 000000000000..161fc2c687bb
--- /dev/null
+++ b/python/samples/demos/self_hosted_agent/app/self_hosted_api_chat_completion.py
@@ -0,0 +1,362 @@
+# Copyright (c) Microsoft. All rights reserved.
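+"""A minimal chat-completion connector that forwards OpenAI-style requests
+over HTTP to a self-hosted agent endpoint.
+
+This is a sample-grade sketch rather than a production connector: it reuses
+the OpenAI execution settings and response types, and it does not implement
+true streaming (see _inner_get_streaming_chat_message_contents).
+"""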
+
+import sys
+from collections.abc import AsyncGenerator, Callable
+from typing import TYPE_CHECKING, Any, ClassVar, cast
+
+from pydantic import BaseModel
+
+if sys.version_info >= (3, 12):
+    from typing import override  # pragma: no cover
+else:
+    from typing_extensions import override  # pragma: no cover
+
+import httpx
+from openai import AsyncStream, BadRequestError
+from openai.lib._parsing._completions import type_to_response_format_param
+from openai.types import CompletionUsage as OpenAICompletionUsage
+from openai.types.chat.chat_completion import Choice
+from openai.types.chat.chat_completion_chunk import ChatCompletionChunk, ChoiceDeltaFunctionCall, ChoiceDeltaToolCall
+from openai.types.chat.chat_completion_chunk import Choice as ChunkChoice
+from openai.types.chat.chat_completion_message import FunctionCall
+from openai.types.chat.chat_completion_message_tool_call import ChatCompletionMessageToolCall
+
+from semantic_kernel.connectors.ai.chat_completion_client_base import ChatCompletionClientBase
+from semantic_kernel.connectors.ai.completion_usage import CompletionUsage
+from semantic_kernel.connectors.ai.function_calling_utils import update_settings_from_function_call_configuration
+from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceType
+from semantic_kernel.connectors.ai.open_ai import OpenAIChatPromptExecutionSettings
+from semantic_kernel.connectors.ai.open_ai.exceptions.content_filter_ai_exception import ContentFilterAIException
+from semantic_kernel.connectors.utils.structured_output_schema import generate_structured_output_response_format_schema
+from semantic_kernel.contents.annotation_content import AnnotationContent
+from semantic_kernel.contents.chat_history import ChatHistory
+from semantic_kernel.contents.chat_message_content import ChatMessageContent
+from semantic_kernel.contents.file_reference_content import FileReferenceContent
+from semantic_kernel.contents.function_call_content import FunctionCallContent
+from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent
+from semantic_kernel.contents.streaming_text_content import StreamingTextContent
+from semantic_kernel.contents.text_content import TextContent
+from semantic_kernel.contents.utils.author_role import AuthorRole
+from semantic_kernel.contents.utils.finish_reason import FinishReason
+from semantic_kernel.exceptions import (
+    ServiceInvalidExecutionSettingsError,
+    ServiceInvalidResponseError,
+    ServiceResponseException,
+)
+from semantic_kernel.schema.kernel_json_schema_builder import KernelJsonSchemaBuilder
+from semantic_kernel.utils.telemetry.model_diagnostics.decorators import (
+    trace_chat_completion,
+    trace_streaming_chat_completion,
+)
+
+if TYPE_CHECKING:
+    from semantic_kernel.connectors.ai.function_call_choice_configuration import FunctionCallChoiceConfiguration
+    from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
+
+
+class SelfHostedChatCompletion(ChatCompletionClientBase):
+    """Self-hosted agent chat completion class."""
+
+    MODEL_PROVIDER_NAME: ClassVar[str] = "customapi"
+    SUPPORTS_FUNCTION_CALLING: ClassVar[bool] = True
+    url: str
+
+    # region Overriding base class methods
+    # Most methods override ChatCompletionClientBase; any other base class is noted explicitly.
+
+    # Override from AIServiceClientBase
+
+    @override
+    def get_prompt_execution_settings_class(self) -> type["PromptExecutionSettings"]:
+        return OpenAIChatPromptExecutionSettings
+
+    # Override from AIServiceClientBase
+    @override
+    def service_url(self) -> str | None:
+        return self.url
+
+    @override
+    @trace_chat_completion(MODEL_PROVIDER_NAME)
+    async def _inner_get_chat_message_contents(
+        self,
+        chat_history: "ChatHistory",
+        settings: "PromptExecutionSettings",
+    ) -> list["ChatMessageContent"]:
+        if not isinstance(settings, OpenAIChatPromptExecutionSettings):
+            settings = self.get_prompt_execution_settings_from_settings(settings)
+        assert isinstance(settings, OpenAIChatPromptExecutionSettings)  # nosec
+
+        settings.stream = False
+        settings.messages = self._prepare_chat_history_for_request(chat_history)
+        settings.ai_model_id = settings.ai_model_id or self.ai_model_id
+
+        return await self._send_completion_request(settings)
+
+    @override
+    @trace_streaming_chat_completion(MODEL_PROVIDER_NAME)
+    async def _inner_get_streaming_chat_message_contents(
+        self,
+        chat_history: "ChatHistory",
+        settings: "PromptExecutionSettings",
+        function_invoke_attempt: int = 0,
+    ) -> AsyncGenerator[list["StreamingChatMessageContent"], Any]:
+        if not isinstance(settings, OpenAIChatPromptExecutionSettings):
+            settings = self.get_prompt_execution_settings_from_settings(settings)
+        assert isinstance(settings, OpenAIChatPromptExecutionSettings)  # nosec
+
+        settings.stream = True
+        settings.stream_options = {"include_usage": True}
+        settings.messages = self._prepare_chat_history_for_request(chat_history)
+        settings.ai_model_id = settings.ai_model_id or self.ai_model_id
+
+        response = await self._send_completion_request(settings)
+        # NOTE: _send_completion_request parses the HTTP response into chat message contents,
+        # so it never returns an AsyncStream; streaming is effectively unsupported in this sample.
+        if not isinstance(response, AsyncStream):
+            raise ServiceInvalidResponseError("Expected an AsyncStream[ChatCompletionChunk] response.")
+        async for chunk in response:
+            if len(chunk.choices) == 0 and chunk.usage is None:
+                continue
+
+            assert isinstance(chunk, ChatCompletionChunk)  # nosec
+            chunk_metadata = self._get_metadata_from_streaming_chat_response(chunk)
+            if chunk.usage is not None:
+                # Usage is contained in the last chunk, where the choices are empty.
+                # We duplicate the usage metadata across all the choices in the response.
+                yield [
+                    StreamingChatMessageContent(
+                        role=AuthorRole.ASSISTANT,
+                        content="",
+                        choice_index=i,
+                        inner_content=chunk,
+                        ai_model_id=settings.ai_model_id,
+                        metadata=chunk_metadata,
+                        function_invoke_attempt=function_invoke_attempt,
+                    )
+                    for i in range(settings.number_of_responses or 1)
+                ]
+            else:
+                yield [
+                    self._create_streaming_chat_message_content(chunk, choice, chunk_metadata, function_invoke_attempt)
+                    for choice in chunk.choices
+                ]
+
+    @override
+    def _verify_function_choice_settings(self, settings: "PromptExecutionSettings") -> None:
+        if not isinstance(settings, OpenAIChatPromptExecutionSettings):
+            raise ServiceInvalidExecutionSettingsError("The settings must be an OpenAIChatPromptExecutionSettings.")
+        if settings.number_of_responses is not None and settings.number_of_responses > 1:
+            raise ServiceInvalidExecutionSettingsError(
+                "Auto-invocation of tool calls may only be used with an "
+                "OpenAIChatPromptExecutionSettings.number_of_responses of 1."
+            )
+
+    @override
+    def _update_function_choice_settings_callback(
+        self,
+    ) -> Callable[["FunctionCallChoiceConfiguration", "PromptExecutionSettings", FunctionChoiceType], None]:
+        return update_settings_from_function_call_configuration
+
+    @override
+    def _reset_function_choice_settings(self, settings: "PromptExecutionSettings") -> None:
+        if hasattr(settings, "tool_choice"):
+            settings.tool_choice = None
+        if hasattr(settings, "tools"):
+            settings.tools = None
+
+    # endregion
+
+    # region content creation
+    async def _send_completion_request(
+        self,
+        settings: OpenAIChatPromptExecutionSettings,
+    ) -> list[ChatMessageContent]:
+        """Execute the appropriate call to the self-hosted agents."""
+        try:
+            settings_dict = settings.prepare_settings_dict()
+            self._handle_structured_output(settings, settings_dict)
+            if settings.tools is None:
+                settings_dict.pop("parallel_tool_calls", None)
+            async with httpx.AsyncClient(timeout=30) as client:
+                raw_response = await client.post(self.url, json=settings_dict, timeout=30)
+                raw_response.raise_for_status()  # surface HTTP errors instead of failing on malformed JSON
+                response = raw_response.json()
+            response_metadata = self._get_metadata_from_chat_response(response)
+            return [
+                self._create_chat_message_content(response, Choice(**choice), response_metadata)
+                for choice in response["choices"]
+            ]
+        except BadRequestError as ex:
+            if ex.code == "content_filter":
+                raise ContentFilterAIException(
+                    f"{type(self)} service encountered a content error",
+                    ex,
+                ) from ex
+            raise ServiceResponseException(
+                f"{type(self)} service failed to complete the prompt",
+                ex,
+            ) from ex
+        except Exception as ex:
+            raise ServiceResponseException(
+                f"{type(self)} service failed to complete the prompt",
+                ex,
+            ) from ex
+
+    def _handle_structured_output(
+        self, request_settings: OpenAIChatPromptExecutionSettings, settings: dict[str, Any]
+    ) -> None:
+        response_format = getattr(request_settings, "response_format", None)
+        if getattr(request_settings, "structured_json_response", False) and response_format:
+            # Case 1: response_format is a type and subclass of BaseModel
+            if isinstance(response_format, type) and issubclass(response_format, BaseModel):
+                settings["response_format"] = type_to_response_format_param(response_format)
+            # Case 2: response_format is a type but not a subclass of BaseModel
+            elif isinstance(response_format, type):
+                generated_schema = KernelJsonSchemaBuilder.build(parameter_type=response_format, structured_output=True)
+                assert generated_schema is not None  # nosec
+                settings["response_format"] = generate_structured_output_response_format_schema(
+                    name=response_format.__name__, schema=generated_schema
+                )
+            # Case 3: response_format is a dictionary, pass it without modification
+            elif isinstance(response_format, dict):
+                settings["response_format"] = response_format
+
+    def _create_chat_message_content(
+        self, response: Any, choice: Choice, response_metadata: dict[str, Any]
+    ) -> "ChatMessageContent":
+        """Create a chat message content object from a choice."""
+        metadata = self._get_metadata_from_chat_choice(choice)
+        metadata.update(response_metadata)
+
+        items: list[Any] = self._get_tool_calls_from_chat_choice(choice)
+        items.extend(self._get_function_call_from_chat_choice(choice))
+        if choice.message.content:
+            items.append(TextContent(text=choice.message.content))
+        elif hasattr(choice.message, "refusal") and choice.message.refusal:
+            items.append(TextContent(text=choice.message.refusal))
+
+        return ChatMessageContent(
+            inner_content=response,
+            ai_model_id=self.ai_model_id,
+            metadata=metadata,
+            role=AuthorRole(choice.message.role),
+            items=items,
+            finish_reason=(FinishReason(choice.finish_reason) if choice.finish_reason else None),
+        )
+
+    def _create_streaming_chat_message_content(
+        self,
+        chunk: ChatCompletionChunk,
+        choice: ChunkChoice,
+        chunk_metadata: dict[str, Any],
+        function_invoke_attempt: int,
+    ) -> StreamingChatMessageContent:
+        """Create a streaming chat message content object from a choice."""
+        metadata = self._get_metadata_from_chat_choice(choice)
+        metadata.update(chunk_metadata)
+
+        items: list[Any] = self._get_tool_calls_from_chat_choice(choice)
+        items.extend(self._get_function_call_from_chat_choice(choice))
+        if choice.delta and choice.delta.content is not None:
+            items.append(StreamingTextContent(choice_index=choice.index, text=choice.delta.content))
+        return StreamingChatMessageContent(
+            choice_index=choice.index,
+            inner_content=chunk,
+            ai_model_id=self.ai_model_id,
+            metadata=metadata,
+            role=(AuthorRole(choice.delta.role) if choice.delta and choice.delta.role else AuthorRole.ASSISTANT),
+            finish_reason=(FinishReason(choice.finish_reason) if choice.finish_reason else None),
+            items=items,
+            function_invoke_attempt=function_invoke_attempt,
+        )
+
+    def _get_metadata_from_chat_response(self, response: Any) -> dict[str, Any]:
+        """Get metadata from a chat response."""
+        return {
+            "id": response["id"],
+            "created": response["created"],
+            "system_fingerprint": response.get("system_fingerprint"),
+            "usage": CompletionUsage.from_openai(OpenAICompletionUsage(**response["usage"]))
+            if response.get("usage") is not None
+            else None,
+        }
+
+    def _get_metadata_from_streaming_chat_response(self, response: ChatCompletionChunk) -> dict[str, Any]:
+        """Get metadata from a streaming chat response."""
+        return {
+            "id": response.id,
+            "created": response.created,
+            "system_fingerprint": response.system_fingerprint,
+            "usage": CompletionUsage.from_openai(response.usage) if response.usage is not None else None,
+        }
+
+    def _get_metadata_from_chat_choice(self, choice: Choice | ChunkChoice) -> dict[str, Any]:
+        """Get metadata from a chat choice."""
+        return {
+            "logprobs": getattr(choice, "logprobs", None),
+        }
+
+    def _get_tool_calls_from_chat_choice(self, choice: Choice | ChunkChoice) -> list[FunctionCallContent]:
+        """Get tool calls from a chat choice."""
+        content = choice.message if isinstance(choice, Choice) else choice.delta
+        if content and (tool_calls := getattr(content, "tool_calls", None)) is not None:
+            return [
+                FunctionCallContent(
+                    id=tool.id,
+                    index=getattr(tool, "index", None),
+                    name=tool.function.name,
+                    arguments=tool.function.arguments,
+                )
+                for tool in cast(list[ChatCompletionMessageToolCall] | list[ChoiceDeltaToolCall], tool_calls)
+                if tool.function is not None
+            ]
+        # When you enable asynchronous content filtering in Azure OpenAI, you may receive empty deltas
+        return []
+
+    def _get_function_call_from_chat_choice(self, choice: Choice | ChunkChoice) -> list[FunctionCallContent]:
+        """Get a function call from a chat choice."""
+        content = choice.message if isinstance(choice, Choice) else choice.delta
+        if content and (function_call := getattr(content, "function_call", None)) is not None:
+            function_call = cast(FunctionCall | ChoiceDeltaFunctionCall, function_call)
+            return [
+                FunctionCallContent(
+                    id="legacy_function_call", name=function_call.name, arguments=function_call.arguments
+                )
+            ]
+        # When you enable asynchronous content filtering in Azure OpenAI, you may receive empty deltas
+        return []
+
+    def _prepare_chat_history_for_request(
+        self,
+        chat_history: "ChatHistory",
+        role_key: str = "role",
+        content_key: str = "content",
+    ) -> Any:
+        """Prepare the chat history for a request.
+
+        Allows customization of the key names for role/author and, optionally, overriding the role.
+
+        ChatRole.TOOL messages need to be formatted differently than system/user/assistant messages:
+        they require a "tool_call_id" and a (function) "name" key, and the "metadata" and "encoding"
+        keys should be removed.
+
+        Override this method to customize the formatting of the chat history for a request.
+
+        Args:
+            chat_history (ChatHistory): The chat history to prepare.
+            role_key (str): The key name for the role/author.
+            content_key (str): The key name for the content/message.
+
+        Returns:
+            prepared_chat_history (Any): The prepared chat history for a request.
+        """
+        return [
+            {
+                **message.to_dict(role_key=role_key, content_key=content_key),
+                role_key: "developer"
+                if self.instruction_role == "developer" and message.to_dict(role_key=role_key)[role_key] == "system"
+                else message.to_dict(role_key=role_key)[role_key],
+            }
+            for message in chat_history.messages
+            if not isinstance(message, (AnnotationContent, FileReferenceContent))
+        ]
+
+    # endregion
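+
+
+# A minimal smoke test, assuming the FastAPI server from the `agents` folder is
+# reachable at the (hypothetical) address below. It exercises the connector
+# directly, outside of the AgentGroupChat orchestration used by app/main.py.
+if __name__ == "__main__":
+    import asyncio
+
+    async def _smoke_test() -> None:
+        service = SelfHostedChatCompletion(
+            url="http://localhost:8000/agent/reviewer",  # assumed local server address
+            ai_model_id="reviewer",
+        )
+        history = ChatHistory()
+        history.add_user_message("A slogan for a new line of electric cars.")
+        # Send one request and print the reviewer agent's reply.
+        for message in await service.get_chat_message_contents(
+            chat_history=history, settings=OpenAIChatPromptExecutionSettings()
+        ):
+            print(message.content)
+
+    asyncio.run(_smoke_test())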