diff --git a/python/.env.example b/python/.env.example index 3158a3832433..b7154cdb706f 100644 --- a/python/.env.example +++ b/python/.env.example @@ -45,4 +45,5 @@ AZCOSMOS_CONTAINER_NAME = "" ASTRADB_APP_TOKEN="" ASTRADB_ID="" ASTRADB_REGION="" -ASTRADB_KEYSPACE="" \ No newline at end of file +ASTRADB_KEYSPACE="" +ACA_POOL_MANAGEMENT_ENDPOINT="" \ No newline at end of file diff --git a/python/samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py b/python/samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py new file mode 100644 index 000000000000..8280faeea204 --- /dev/null +++ b/python/samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py @@ -0,0 +1,125 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import datetime + +from azure.core.credentials import AccessToken +from azure.core.exceptions import ClientAuthenticationError +from azure.identity import DefaultAzureCredential + +from semantic_kernel.connectors.ai.function_call_behavior import FunctionCallBehavior +from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import ( + AzureChatPromptExecutionSettings, +) +from semantic_kernel.connectors.ai.open_ai.services.azure_chat_completion import AzureChatCompletion +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import ( + SessionsPythonTool, +) +from semantic_kernel.core_plugins.time_plugin import TimePlugin +from semantic_kernel.exceptions.function_exceptions import FunctionExecutionException +from semantic_kernel.functions.kernel_arguments import KernelArguments +from semantic_kernel.kernel import Kernel +from semantic_kernel.utils.settings import ( + azure_container_apps_settings_from_dot_env_as_dict, + azure_openai_settings_from_dot_env_as_dict, +) + +auth_token: AccessToken | None = None + +ACA_TOKEN_ENDPOINT = "https://acasessions.io/.default" + + +async def auth_callback() -> str: + """Auth callback for the SessionsPythonTool. + This is a sample auth callback that shows how to use Azure's DefaultAzureCredential + to get an access token. + """ + global auth_token + current_utc_timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) + + if not auth_token or auth_token.expires_on < current_utc_timestamp: + credential = DefaultAzureCredential() + + try: + auth_token = credential.get_token(ACA_TOKEN_ENDPOINT) + except ClientAuthenticationError as cae: + err_messages = getattr(cae, "messages", []) + raise FunctionExecutionException( + f"Failed to retrieve the client auth token with messages: {' '.join(err_messages)}" + ) from cae + + return auth_token.token + + +kernel = Kernel() + +service_id = "sessions-tool" +chat_service = AzureChatCompletion( + service_id=service_id, **azure_openai_settings_from_dot_env_as_dict(include_api_version=True) +) +kernel.add_service(chat_service) + +sessions_tool = SessionsPythonTool( + **azure_container_apps_settings_from_dot_env_as_dict(), + auth_callback=auth_callback, +) + +kernel.add_plugin(sessions_tool, "SessionsTool") +kernel.add_plugin(TimePlugin(), "Time") + +chat_function = kernel.add_function( + prompt="{{$chat_history}}{{$user_input}}", + plugin_name="ChatBot", + function_name="Chat", +) + +req_settings = AzureChatPromptExecutionSettings(service_id=service_id, tool_choice="auto") + +filter = {"excluded_plugins": ["ChatBot"]} +req_settings.function_call_behavior = FunctionCallBehavior.EnableFunctions(auto_invoke=True, filters=filter) + +arguments = KernelArguments(settings=req_settings) + +history = ChatHistory() + + +async def chat() -> bool: + try: + user_input = input("User:> ") + except KeyboardInterrupt: + print("\n\nExiting chat...") + return False + except EOFError: + print("\n\nExiting chat...") + return False + + if user_input == "exit": + print("\n\nExiting chat...") + return False + + arguments["chat_history"] = history + arguments["user_input"] = user_input + answer = await kernel.invoke( + function=chat_function, + arguments=arguments, + ) + print(f"Mosscap:> {answer}") + history.add_user_message(user_input) + history.add_assistant_message(str(answer)) + return True + + +async def main() -> None: + print( + "Welcome to the chat bot!\ + \n Type 'exit' to exit.\ + \n Try a Python code execution question to see the function calling in action (i.e. what is 1+1?)." + ) + chatting = True + while chatting: + chatting = await chat() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/concepts/plugins/azure_python_code_interpreter.py b/python/samples/concepts/plugins/azure_python_code_interpreter.py new file mode 100644 index 000000000000..6c773afe939d --- /dev/null +++ b/python/samples/concepts/plugins/azure_python_code_interpreter.py @@ -0,0 +1,70 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +import datetime + +from azure.core.credentials import AccessToken +from azure.core.exceptions import ClientAuthenticationError +from azure.identity import DefaultAzureCredential + +from semantic_kernel.connectors.ai.open_ai.services.azure_chat_completion import AzureChatCompletion +from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import ( + SessionsPythonTool, +) +from semantic_kernel.exceptions.function_exceptions import FunctionExecutionException +from semantic_kernel.kernel import Kernel +from semantic_kernel.utils.settings import ( + azure_container_apps_settings_from_dot_env_as_dict, + azure_openai_settings_from_dot_env_as_dict, +) + +auth_token: AccessToken | None = None + +ACA_TOKEN_ENDPOINT = "https://acasessions.io/.default" + + +async def auth_callback() -> str: + """Auth callback for the SessionsPythonTool. + This is a sample auth callback that shows how to use Azure's DefaultAzureCredential + to get an access token. + """ + global auth_token + current_utc_timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) + + if not auth_token or auth_token.expires_on < current_utc_timestamp: + credential = DefaultAzureCredential() + + try: + auth_token = credential.get_token(ACA_TOKEN_ENDPOINT) + except ClientAuthenticationError as cae: + err_messages = getattr(cae, "messages", []) + raise FunctionExecutionException( + f"Failed to retrieve the client auth token with messages: {' '.join(err_messages)}" + ) from cae + + return auth_token.token + + +async def main(): + kernel = Kernel() + + service_id = "python-code-interpreter" + chat_service = AzureChatCompletion( + service_id=service_id, **azure_openai_settings_from_dot_env_as_dict(include_api_version=True) + ) + kernel.add_service(chat_service) + + python_code_interpreter = SessionsPythonTool( + **azure_container_apps_settings_from_dot_env_as_dict(), auth_callback=auth_callback + ) + + sessions_tool = kernel.add_plugin(python_code_interpreter, "PythonCodeInterpreter") + + code = "import json\n\ndef add_numbers(a, b):\n return a + b\n\nargs = '{\"a\": 1, \"b\": 1}'\nargs_dict = json.loads(args)\nprint(add_numbers(args_dict['a'], args_dict['b']))" # noqa: E501 + result = await kernel.invoke(sessions_tool["execute_code"], code=code) + + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/semantic_kernel/core_plugins/__init__.py b/python/semantic_kernel/core_plugins/__init__.py index 568c6b993769..0f6aed98a679 100644 --- a/python/semantic_kernel/core_plugins/__init__.py +++ b/python/semantic_kernel/core_plugins/__init__.py @@ -5,6 +5,9 @@ ) from semantic_kernel.core_plugins.http_plugin import HttpPlugin from semantic_kernel.core_plugins.math_plugin import MathPlugin +from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import ( + SessionsPythonTool, +) from semantic_kernel.core_plugins.text_memory_plugin import TextMemoryPlugin from semantic_kernel.core_plugins.text_plugin import TextPlugin from semantic_kernel.core_plugins.time_plugin import TimePlugin @@ -17,5 +20,6 @@ "HttpPlugin", "ConversationSummaryPlugin", "MathPlugin", + "SessionsPythonTool", "WebSearchEnginePlugin", ] diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/README.md b/python/semantic_kernel/core_plugins/sessions_python_tool/README.md new file mode 100644 index 000000000000..9ac97aafa8b9 --- /dev/null +++ b/python/semantic_kernel/core_plugins/sessions_python_tool/README.md @@ -0,0 +1,132 @@ +# Getting Started with the Sessions Python Plugin + +## Authentication to ARM (management.azure.com) + +For any call to ARM (management.azure.com), use the access token retrieved from the below call: + +```az account get-access-token --resource ``` + +## Generate a Session Pool + +a. Call the following API to generate a Session Pool: + +```PUT ``` + +Body properties: + +- location: Azure Region +- properties: + - poolManagementType: + - Today there are two Pool Management Types supported: + - "Manual" + - In this model, the user will call generateSessions API which supports batch mode (to generate 100s of sessions in one API call, and then user is free to update/specialize the session as needed or execute code in the session) + - "Dynamic" + - In this mode, the pool management is handled by the platform. Currently, the dynamic mode is only implemented for Python code execution scenario, which has its own APIs to execute code. + - maxConcurrentSessions: + - Maximum number of active sessions allowed + - name: + - Name of the sessions pool + - dynamicPoolConfiguration: Specifies the type of sessions generated by the platform + - poolType: Type of images used for the pool + - Valid values ["JupyterPython", "FunctionsPython"] + - executionType: + - Valid values ["Timed"] + - coolDownPeriodSeconds: + - Integer representing the maximum time allowed before the platform scales down the container + - sessionPoolSecrets: Secrets associated with the Session Pool + - name: Name of the secret + - value: Secret Value + +Example Generation of Session Pool: + +```json +{ + "location": "koreacentral", + "properties": { + "poolManagementType": "Dynamic", + "maxConcurrentSessions": 10, + "name": "{{SessionPoolName}}", + "dynamicPoolConfiguration": { + "poolType": "JupyterPython", + "executionType": "Timed", + "coolDownPeriodInSecond": 310 + } + } +} +``` + +Curl Example: + +```curl +curl -X PUT "https://management.azure.com/subscriptions/{{SubscriptionId}}/resourceGroups/{{ResourceGroup}}/providers/Microsoft.App/sessionPools/{{SessionPoolName}}?api-version=2023-08-01-preview" \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer $token" \ + -d '{"location": "koreacentral","properties": { "poolManagementType": "Dynamic", "maxConcurrentSessions": 10, "name": "{{SessionPoolName}}", "dynamicPoolConfiguration": { "poolType": "JupyterPython", "executionType": "Timed", "coolDownPeriodInSecond": 310 } } }' +``` + +If all goes well, you should receive a 200 Status Code. The response will contain a `poolManagementEndpoint` which is required to configure the Python Plugin below. + +## Configuring the Python Plugin + +To successfully use the Python Plugin in Semantic Kernel, you must install the Poetry `azure` extras by running `poetry install -E azure`. + +Next, in the .env file, add the `poolManagementEndpoint` value from above to the variable `ACA_POOL_MANAGEMENT_ENDPOINT`. The `poolManagementEndpoint` should look something like: + +```html +https://eastus.acasessions.io/subscriptions/{{subscriptionId}}/resourceGroups/{{resourceGroup}}/sessionPools/{{sessionPool}}/python/execute +``` + +It is possible to add the code interpreter plugin as follows: + +```python +kernel = Kernel() + +service_id = "azure_oai" +chat_service = AzureChatCompletion( + service_id=service_id, **azure_openai_settings_from_dot_env_as_dict(include_api_version=True) +) +kernel.add_service(chat_service) + +python_code_interpreter = SessionsPythonTool( + **azure_container_apps_settings_from_dot_env_as_dict(), auth_callback=auth_callback +) + +sessions_tool = kernel.add_plugin(python_code_interpreter, "PythonCodeInterpreter") + +code = "import json\n\ndef add_numbers(a, b):\n return a + b\n\nargs = '{\"a\": 1, \"b\": 1}'\nargs_dict = json.loads(args)\nprint(add_numbers(args_dict['a'], args_dict['b']))" +result = await kernel.invoke(sessions_tool["execute_code"], code=code) + +print(result) +``` + +Instead of hard-coding a well-formatted Python code string, you may use automatic function calling inside of SK and allow the model to form the Python and call the plugin. + +The authentication callback must return a valid token for the session pool. One possible way of doing this with a `DefaultAzureCredential` is as follows: + +```python +async def auth_callback() -> str: + """Auth callback for the SessionsPythonTool. + This is a sample auth callback that shows how to use Azure's DefaultAzureCredential + to get an access token. + """ + global auth_token + current_utc_timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp()) + + if not auth_token or auth_token.expires_on < current_utc_timestamp: + credential = DefaultAzureCredential() + + try: + auth_token = credential.get_token(ACA_TOKEN_ENDPOINT) + except ClientAuthenticationError as cae: + err_messages = getattr(cae, "messages", []) + raise FunctionExecutionException( + f"Failed to retrieve the client auth token with messages: {' '.join(err_messages)}" + ) from cae + + return auth_token.token +``` + +Currently, there are two concept examples that show this plugin in more detail: + +- [Plugin example](../../../samples/concepts/plugins/azure_python_code_interpreter.py): shows the basic usage of calling the code execute function on the plugin. +- [Function Calling example](../../../samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py): shows a simple chat application that leverages the Python code interpreter plugin for function calling. diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/__init__.py b/python/semantic_kernel/core_plugins/sessions_python_tool/__init__.py new file mode 100644 index 000000000000..3acd831b3481 --- /dev/null +++ b/python/semantic_kernel/core_plugins/sessions_python_tool/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) Microsoft. All rights reserved. + +from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import ( + SessionsPythonTool, +) +from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_settings import ( + SessionsPythonSettings, +) + +__all__ = ["SessionsPythonTool", "SessionsPythonSettings"] diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_plugin.py b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_plugin.py new file mode 100644 index 000000000000..38c62178ac7c --- /dev/null +++ b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_plugin.py @@ -0,0 +1,244 @@ +# Copyright (c) Microsoft. All rights reserved. + +from __future__ import annotations + +import logging +import os +import re +from io import BufferedReader, BytesIO +from typing import Annotated, Any, Awaitable, Callable + +import httpx +from pydantic import field_validator + +from semantic_kernel.connectors.ai.open_ai.const import USER_AGENT +from semantic_kernel.connectors.telemetry import HTTP_USER_AGENT, version_info +from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_settings import ( + SessionsPythonSettings, +) +from semantic_kernel.core_plugins.sessions_python_tool.sessions_remote_file_metadata import SessionsRemoteFileMetadata +from semantic_kernel.exceptions.function_exceptions import FunctionExecutionException +from semantic_kernel.functions.kernel_function_decorator import kernel_function +from semantic_kernel.kernel_pydantic import KernelBaseModel + +logger = logging.getLogger(__name__) + + +SESSIONS_USER_AGENT = f"{HTTP_USER_AGENT}/{version_info} (Language=Python)" + + +class SessionsPythonTool(KernelBaseModel): + """A plugin for running Python code in an Azure Container Apps dynamic sessions code interpreter.""" + + pool_management_endpoint: str + settings: SessionsPythonSettings | None = None + auth_callback: Callable[..., Awaitable[Any]] + http_client: httpx.AsyncClient | None = None + + def __init__( + self, + pool_management_endpoint: str, + auth_callback: Callable[..., Awaitable[Any]], + settings: SessionsPythonSettings | None = None, + http_client: httpx.AsyncClient | None = None, + **kwargs, + ): + """Initializes a new instance of the SessionsPythonTool class.""" + if not settings: + settings = SessionsPythonSettings() + + if not http_client: + http_client = httpx.AsyncClient() + + super().__init__( + pool_management_endpoint=pool_management_endpoint, + auth_callback=auth_callback, + settings=settings, + http_client=http_client, + **kwargs, + ) + + @field_validator("pool_management_endpoint", mode="before") + @classmethod + def _validate_endpoint(cls, endpoint: str): + """Validates the pool management endpoint.""" + if "/python/execute" in endpoint: + # Remove '/python/execute/' and ensure the endpoint ends with a '/' + endpoint = endpoint.replace("/python/execute", "").rstrip("/") + "/" + if not endpoint.endswith("/"): + # Ensure the endpoint ends with a '/' + endpoint = endpoint + "/" + return endpoint + + async def _ensure_auth_token(self) -> str: + """Ensure the auth token is valid.""" + + try: + auth_token = await self.auth_callback() + except Exception as e: + logger.error(f"Failed to retrieve the client auth token with message: {str(e)}") + raise FunctionExecutionException(f"Failed to retrieve the client auth token with messages: {str(e)}") from e + + return auth_token + + def _sanitize_input(self, code: str) -> str: + """Sanitize input to the python REPL. + Remove whitespace, backtick & python (if llm mistakes python console as terminal) + Args: + query: The query to sanitize + Returns: + str: The sanitized query + """ + + # Removes `, whitespace & python from start + code = re.sub(r"^(\s|`)*(?i:python)?\s*", "", code) + # Removes whitespace & ` from end + code = re.sub(r"(\s|`)*$", "", code) + return code + + @kernel_function( + description="""Executes the provided Python code. + Start and end the code snippet with double quotes to define it as a string. + Insert \\n within the string wherever a new line should appear. + Add spaces directly after \\n sequences to replicate indentation. + Use \" to include double quotes within the code without ending the string. + Keep everything in a single line; the \\n sequences will represent line breaks + when the string is processed or displayed. + """, + name="execute_code", + ) + async def execute_code(self, code: Annotated[str, "The valid Python code to execute"]) -> str: + """ + Executes the provided Python code + Args: + code (str): The valid Python code to execute + Returns: + str: The result of the Python code execution in the form of Result, Stdout, and Stderr + Raises: + FunctionExecutionException: If the provided code is empty + """ + + if not code: + raise FunctionExecutionException("The provided code is empty") + + if self.settings.sanitize_input: + code = self._sanitize_input(code) + + auth_token = await self._ensure_auth_token() + + logger.info(f"Executing Python code: {code}") + + self.http_client.headers.update( + { + "Authorization": f"Bearer {auth_token}", + "Content-Type": "application/json", + USER_AGENT: SESSIONS_USER_AGENT, + } + ) + + self.settings.python_code = code + + request_body = { + "properties": self.settings.model_dump(exclude_none=True, exclude={"sanitize_input"}, by_alias=True), + } + + response = await self.http_client.post( + url=f"{self.pool_management_endpoint}python/execute/", + json=request_body, + ) + response.raise_for_status() + + result = response.json() + return f"Result:\n{result['result']}Stdout:\n{result['stdout']}Stderr:\n{result['stderr']}" # noqa: E501 + + @kernel_function(name="upload_file", description="Uploads a file for the current Session ID") + async def upload_file( + self, *, data: BufferedReader = None, remote_file_path: str = None, local_file_path: str = None + ) -> SessionsRemoteFileMetadata: + """Upload a file to the session pool. + Args: + data (BufferedReader): The file data to upload. + remote_file_path (str): The path to the file in the session. + local_file_path (str): The path to the file on the local machine. + Returns: + RemoteFileMetadata: The metadata of the uploaded file. + """ + + if data and local_file_path: + raise ValueError("data and local_file_path cannot be provided together") + + if local_file_path: + if not remote_file_path: + remote_file_path = os.path.basename(local_file_path) + data = open(local_file_path, "rb") + + auth_token = await self._ensure_auth_token() + self.http_client.headers.update( + { + "Authorization": f"Bearer {auth_token}", + USER_AGENT: SESSIONS_USER_AGENT, + } + ) + files = [("file", (remote_file_path, data, "application/octet-stream"))] + + response = await self.http_client.post( + url=f"{self.pool_management_endpoint}python/uploadFile?identifier={self.settings.session_id}", + json={}, + files=files, + ) + + response.raise_for_status() + + response_json = response.json() + return SessionsRemoteFileMetadata.from_dict(response_json) + + @kernel_function(name="list_files", description="Lists all files in the provided Session ID") + async def list_files(self) -> list[SessionsRemoteFileMetadata]: + """List the files in the session pool. + Returns: + list[SessionsRemoteFileMetadata]: The metadata for the files in the session pool + """ + auth_token = await self._ensure_auth_token() + self.http_client.headers.update( + { + "Authorization": f"Bearer {auth_token}", + USER_AGENT: SESSIONS_USER_AGENT, + } + ) + + response = await self.http_client.get( + url=f"{self.pool_management_endpoint}python/files?identifier={self.settings.session_id}", + ) + response.raise_for_status() + + response_json = response.json() + return [SessionsRemoteFileMetadata.from_dict(entry) for entry in response_json["$values"]] + + async def download_file(self, *, remote_file_path: str, local_file_path: str = None) -> BufferedReader | None: + """Download a file from the session pool. + Args: + remote_file_path: The path to download the file from, relative to `/mnt/data`. + local_file_path: The path to save the downloaded file to. If not provided, the + file is returned as a BufferedReader. + Returns: + BufferedReader: The data of the downloaded file. + """ + auth_token = await self.auth_callback() + self.http_client.headers.update( + { + "Authorization": f"Bearer {auth_token}", + USER_AGENT: SESSIONS_USER_AGENT, + } + ) + + response = await self.http_client.get( + url=f"{self.pool_management_endpoint}python/downloadFile?identifier={self.settings.session_id}&filename={remote_file_path}", # noqa: E501 + ) + response.raise_for_status() + + if local_file_path: + with open(local_file_path, "wb") as f: + f.write(response.content) + return None + + return BytesIO(response.content) diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_settings.py b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_settings.py new file mode 100644 index 000000000000..4ea3457ed57f --- /dev/null +++ b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_settings.py @@ -0,0 +1,34 @@ +# Copyright (c) Microsoft. All rights reserved. + +from __future__ import annotations + +import uuid +from enum import Enum + +from pydantic import Field + +from semantic_kernel.kernel_pydantic import KernelBaseModel + + +class CodeInputType(str, Enum): + """Code input type.""" + + Inline = "inline" + + +class CodeExecutionType(str, Enum): + """Code execution type.""" + + Synchronous = "synchronous" + # Asynchronous = "asynchronous" TODO: Enable when available + + +class SessionsPythonSettings(KernelBaseModel): + """The Sessions Python code interpreter settings.""" + + session_id: str | None = Field(default_factory=lambda: str(uuid.uuid4()), alias="identifier") + code_input_type: CodeInputType | None = Field(default=CodeInputType.Inline, alias="codeInputType") + execution_type: CodeExecutionType | None = Field(default=CodeExecutionType.Synchronous, alias="executionType") + python_code: str | None = Field(alias="pythonCode", default=None) + timeout_in_sec: int | None = Field(default=100, alias="timeoutInSeconds") + sanitize_input: bool | None = Field(default=True, alias="sanitizeInput") diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_remote_file_metadata.py b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_remote_file_metadata.py new file mode 100644 index 000000000000..2d22c67b31cb --- /dev/null +++ b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_remote_file_metadata.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft. All rights reserved. + +from semantic_kernel.kernel_pydantic import KernelBaseModel + + +class SessionsRemoteFileMetadata(KernelBaseModel): + """Metadata for a file in the session.""" + + filename: str + """The filename relative to `/mnt/data`.""" + + size_in_bytes: int + """The size of the file in bytes.""" + + @property + def full_path(self) -> str: + """Get the full path of the file.""" + return f"/mnt/data/{self.filename}" + + @staticmethod + def from_dict(data: dict) -> "SessionsRemoteFileMetadata": + """Create a RemoteFileMetadata object from a dictionary.""" + return SessionsRemoteFileMetadata(filename=data["filename"], size_in_bytes=data["bytes"]) diff --git a/python/semantic_kernel/utils/settings.py b/python/semantic_kernel/utils/settings.py index 1c7a56473a9e..63f3c0d933a0 100644 --- a/python/semantic_kernel/utils/settings.py +++ b/python/semantic_kernel/utils/settings.py @@ -351,3 +351,27 @@ def booking_sample_settings_from_dot_env_as_dict() -> dict[str, str]: """ client_id, tenant_id, client_secret = booking_sample_settings_from_dot_env() return {"client_id": client_id, "tenant_id": tenant_id, "client_secret": client_secret} + + +def azure_container_apps_settings_from_dot_env() -> str: + """ + Reads the Azure Container Apps environment variables from the .env file. + Returns: + str: Azure Container Apps pool management connection string + """ + config = dotenv_values(".env") + connection_string = config.get("ACA_POOL_MANAGEMENT_ENDPOINT", None) + + assert connection_string is not None, "Azure Container Apps connection string not found in .env file" + + return connection_string + + +def azure_container_apps_settings_from_dot_env_as_dict() -> dict[str, str]: + """ + Reads the Azure Container Apps environment variables from the .env file. + Returns: + Dict[str, str]: Azure Container Apps environment variables + """ + pool_management_endpoint = azure_container_apps_settings_from_dot_env() + return {"pool_management_endpoint": pool_management_endpoint} diff --git a/python/tests/unit/core_plugins/test_sessions_python_plugin.py b/python/tests/unit/core_plugins/test_sessions_python_plugin.py new file mode 100644 index 000000000000..2c2daf0c9ec2 --- /dev/null +++ b/python/tests/unit/core_plugins/test_sessions_python_plugin.py @@ -0,0 +1,283 @@ +# Copyright (c) Microsoft. All rights reserved. + +from io import BufferedReader, BytesIO +from unittest.mock import mock_open, patch + +import httpx +import pytest + +from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import ( + SessionsPythonTool, +) +from semantic_kernel.kernel import Kernel + + +def test_auth_callback(): + return "sample_token" + + +def test_it_can_be_instantiated(): + plugin = SessionsPythonTool(pool_management_endpoint="https://example.com", auth_callback=test_auth_callback) + assert plugin is not None + + +def test_validate_endpoint(): + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/execute/", auth_callback=test_auth_callback + ) + assert plugin is not None + assert plugin.pool_management_endpoint == "https://example.com/" + + +def test_it_can_be_imported(kernel: Kernel): + plugin = SessionsPythonTool(pool_management_endpoint="https://example.com", auth_callback=test_auth_callback) + assert kernel.add_plugin(plugin=plugin, plugin_name="PythonCodeInterpreter") + assert kernel.plugins["PythonCodeInterpreter"] is not None + assert kernel.plugins["PythonCodeInterpreter"].name == "PythonCodeInterpreter" + + +@pytest.mark.asyncio +@patch("httpx.AsyncClient.post") +async def test_call_to_container_succeeds(mock_post): + async def async_return(result): + return result + + with patch( + "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token", + return_value="test_token", + ): + mock_request = httpx.Request(method="POST", url="https://example.com/python/execute/") + + mock_response = httpx.Response( + status_code=200, json={"result": "success", "stdout": "", "stderr": ""}, request=mock_request + ) + + mock_post.return_value = await async_return(mock_response) + + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/execute/", auth_callback=test_auth_callback + ) + result = await plugin.execute_code("print('hello world')") + + assert result is not None + mock_post.assert_awaited_once() + + +@pytest.mark.asyncio +@patch("httpx.AsyncClient.post") +async def test_call_to_container_fails_raises_exception(mock_post): + async def async_return(result): + return result + + with patch( + "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token", + return_value="test_token", + ): + mock_request = httpx.Request(method="POST", url="https://example.com/python/execute/") + + mock_response = httpx.Response(status_code=500, request=mock_request) + + mock_post.return_value = await async_return(mock_response) + + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/execute/", auth_callback=test_auth_callback + ) + + with pytest.raises(Exception): + _ = await plugin.execute_code("print('hello world')") + + +@pytest.mark.asyncio +@patch("httpx.AsyncClient.post") +async def test_upload_file_with_local_path(mock_post): + """Test upload_file when providing a local file path.""" + + async def async_return(result): + return result + + with patch( + "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token", + return_value="test_token", + ), patch("builtins.open", mock_open(read_data=b"file data")): + mock_request = httpx.Request(method="POST", url="https://example.com/python/uploadFile?identifier=None") + + mock_response = httpx.Response( + status_code=200, json={"filename": "test.txt", "bytes": 123}, request=mock_request + ) + mock_post.return_value = await async_return(mock_response) + + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token" + ) + + result = await plugin.upload_file(local_file_path="test.txt", remote_file_path="uploaded_test.txt") + assert result.filename == "test.txt" + assert result.size_in_bytes == 123 + mock_post.assert_awaited_once() + + +@pytest.mark.asyncio +@patch("httpx.AsyncClient.post") +async def test_upload_file_with_buffer(mock_post): + """Test upload_file when providing file data as a BufferedReader.""" + + async def async_return(result): + return result + + with patch( + "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token", + return_value="test_token", + ): + mock_request = httpx.Request(method="POST", url="https://example.com/python/uploadFile?identifier=None") + + mock_response = httpx.Response( + status_code=200, json={"filename": "buffer_file.txt", "bytes": 456}, request=mock_request + ) + mock_post.return_value = await async_return(mock_response) + + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token" + ) + + data_buffer = BufferedReader(BytesIO(b"file data")) + + result = await plugin.upload_file(data=data_buffer, remote_file_path="buffer_file.txt") + assert result.filename == "buffer_file.txt" + assert result.size_in_bytes == 456 + mock_post.assert_awaited_once() + + +@pytest.mark.asyncio +@patch("httpx.AsyncClient.get") +async def test_list_files(mock_get): + """Test list_files function.""" + + async def async_return(result): + return result + + with patch( + "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token", + return_value="test_token", + ): + + mock_request = httpx.Request(method="GET", url="https://example.com/python/files?identifier=None") + + mock_response = httpx.Response( + status_code=200, + json={ + "$values": [ + {"filename": "test1.txt", "bytes": 123}, + {"filename": "test2.txt", "bytes": 456}, + ] + }, + request=mock_request, + ) + mock_get.return_value = await async_return(mock_response) + + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token" + ) + + files = await plugin.list_files() + assert len(files) == 2 + assert files[0].filename == "test1.txt" + assert files[0].size_in_bytes == 123 + assert files[1].filename == "test2.txt" + assert files[1].size_in_bytes == 456 + mock_get.assert_awaited_once() + + +@pytest.mark.asyncio +@patch("httpx.AsyncClient.get") +async def test_download_file_to_local(mock_get): + """Test download_file when saving to a local file path.""" + + async def async_return(result): + return result + + async def mock_auth_callback(): + return "test_token" + + with patch( + "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token", + return_value="test_token", + ), patch("builtins.open", mock_open()) as mock_file: + mock_request = httpx.Request( + method="GET", url="https://example.com/python/downloadFile?identifier=None&filename=remote_test.txt" + ) + + mock_response = httpx.Response(status_code=200, content=b"file data", request=mock_request) + mock_get.return_value = await async_return(mock_response) + + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/", auth_callback=mock_auth_callback + ) + + await plugin.download_file(remote_file_path="remote_test.txt", local_file_path="local_test.txt") + mock_get.assert_awaited_once() + mock_file.assert_called_once_with("local_test.txt", "wb") + mock_file().write.assert_called_once_with(b"file data") + + +@pytest.mark.asyncio +@patch("httpx.AsyncClient.get") +async def test_download_file_to_buffer(mock_get): + """Test download_file when returning as a BufferedReader.""" + + async def async_return(result): + return result + + async def mock_auth_callback(): + return "test_token" + + with patch( + "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token", + return_value="test_token", + ): + mock_request = httpx.Request( + method="GET", url="https://example.com/python/downloadFile?identifier=None&filename=remote_test.txt" + ) + + mock_response = httpx.Response(status_code=200, content=b"file data", request=mock_request) + mock_get.return_value = await async_return(mock_response) + + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/", auth_callback=mock_auth_callback + ) + + buffer = await plugin.download_file(remote_file_path="remote_test.txt") + assert buffer is not None + assert buffer.read() == b"file data" + mock_get.assert_awaited_once() + + +@pytest.mark.parametrize( + "input_code, expected_output", + [ + # Basic whitespace removal + (" print('hello') ", "print('hello')"), + (" \n `print('hello')` ", "print('hello')"), + ("` print('hello')`", "print('hello')"), + # Removal of 'python' keyword + (" python print('hello') ", "print('hello')"), + (" Python print('hello') ", "print('hello')"), + ("` python print('hello')` ", "print('hello')"), + ("`Python print('hello')`", "print('hello')"), + # Mixed usage + (" ` python print('hello')` ", "print('hello')"), + (" `python print('hello') `", "print('hello')"), + # Code without any issues + ("print('hello')", "print('hello')"), + # Empty code + ("", ""), + ("` `", ""), + (" ", ""), + ], +) +def test_sanitize_input(input_code, expected_output): + """Test the `_sanitize_input` function with various inputs.""" + plugin = SessionsPythonTool( + pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token" + ) + sanitized_code = plugin._sanitize_input(input_code) + assert sanitized_code == expected_output