diff --git a/python/.env.example b/python/.env.example
index 3158a3832433..b7154cdb706f 100644
--- a/python/.env.example
+++ b/python/.env.example
@@ -45,4 +45,5 @@ AZCOSMOS_CONTAINER_NAME = ""
ASTRADB_APP_TOKEN=""
ASTRADB_ID=""
ASTRADB_REGION=""
-ASTRADB_KEYSPACE=""
\ No newline at end of file
+ASTRADB_KEYSPACE=""
+ACA_POOL_MANAGEMENT_ENDPOINT=""
\ No newline at end of file
diff --git a/python/samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py b/python/samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py
new file mode 100644
index 000000000000..8280faeea204
--- /dev/null
+++ b/python/samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py
@@ -0,0 +1,125 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+import datetime
+
+from azure.core.credentials import AccessToken
+from azure.core.exceptions import ClientAuthenticationError
+from azure.identity import DefaultAzureCredential
+
+from semantic_kernel.connectors.ai.function_call_behavior import FunctionCallBehavior
+from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import (
+ AzureChatPromptExecutionSettings,
+)
+from semantic_kernel.connectors.ai.open_ai.services.azure_chat_completion import AzureChatCompletion
+from semantic_kernel.contents.chat_history import ChatHistory
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import (
+ SessionsPythonTool,
+)
+from semantic_kernel.core_plugins.time_plugin import TimePlugin
+from semantic_kernel.exceptions.function_exceptions import FunctionExecutionException
+from semantic_kernel.functions.kernel_arguments import KernelArguments
+from semantic_kernel.kernel import Kernel
+from semantic_kernel.utils.settings import (
+ azure_container_apps_settings_from_dot_env_as_dict,
+ azure_openai_settings_from_dot_env_as_dict,
+)
+
+auth_token: AccessToken | None = None
+
+ACA_TOKEN_ENDPOINT = "https://acasessions.io/.default"
+
+
+async def auth_callback() -> str:
+ """Auth callback for the SessionsPythonTool.
+ This is a sample auth callback that shows how to use Azure's DefaultAzureCredential
+ to get an access token.
+ """
+ global auth_token
+ current_utc_timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
+
+ if not auth_token or auth_token.expires_on < current_utc_timestamp:
+ credential = DefaultAzureCredential()
+
+ try:
+ auth_token = credential.get_token(ACA_TOKEN_ENDPOINT)
+ except ClientAuthenticationError as cae:
+ err_messages = getattr(cae, "messages", [])
+ raise FunctionExecutionException(
+ f"Failed to retrieve the client auth token with messages: {' '.join(err_messages)}"
+ ) from cae
+
+ return auth_token.token
+
+
+kernel = Kernel()
+
+service_id = "sessions-tool"
+chat_service = AzureChatCompletion(
+ service_id=service_id, **azure_openai_settings_from_dot_env_as_dict(include_api_version=True)
+)
+kernel.add_service(chat_service)
+
+sessions_tool = SessionsPythonTool(
+ **azure_container_apps_settings_from_dot_env_as_dict(),
+ auth_callback=auth_callback,
+)
+
+kernel.add_plugin(sessions_tool, "SessionsTool")
+kernel.add_plugin(TimePlugin(), "Time")
+
+chat_function = kernel.add_function(
+ prompt="{{$chat_history}}{{$user_input}}",
+ plugin_name="ChatBot",
+ function_name="Chat",
+)
+
+req_settings = AzureChatPromptExecutionSettings(service_id=service_id, tool_choice="auto")
+
+filter = {"excluded_plugins": ["ChatBot"]}
+req_settings.function_call_behavior = FunctionCallBehavior.EnableFunctions(auto_invoke=True, filters=filter)
+
+arguments = KernelArguments(settings=req_settings)
+
+history = ChatHistory()
+
+
+async def chat() -> bool:
+ try:
+ user_input = input("User:> ")
+ except KeyboardInterrupt:
+ print("\n\nExiting chat...")
+ return False
+ except EOFError:
+ print("\n\nExiting chat...")
+ return False
+
+ if user_input == "exit":
+ print("\n\nExiting chat...")
+ return False
+
+ arguments["chat_history"] = history
+ arguments["user_input"] = user_input
+ answer = await kernel.invoke(
+ function=chat_function,
+ arguments=arguments,
+ )
+ print(f"Mosscap:> {answer}")
+ history.add_user_message(user_input)
+ history.add_assistant_message(str(answer))
+ return True
+
+
+async def main() -> None:
+ print(
+ "Welcome to the chat bot!\
+ \n Type 'exit' to exit.\
+ \n Try a Python code execution question to see the function calling in action (i.e. what is 1+1?)."
+ )
+ chatting = True
+ while chatting:
+ chatting = await chat()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/python/samples/concepts/plugins/azure_python_code_interpreter.py b/python/samples/concepts/plugins/azure_python_code_interpreter.py
new file mode 100644
index 000000000000..6c773afe939d
--- /dev/null
+++ b/python/samples/concepts/plugins/azure_python_code_interpreter.py
@@ -0,0 +1,70 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+import asyncio
+import datetime
+
+from azure.core.credentials import AccessToken
+from azure.core.exceptions import ClientAuthenticationError
+from azure.identity import DefaultAzureCredential
+
+from semantic_kernel.connectors.ai.open_ai.services.azure_chat_completion import AzureChatCompletion
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import (
+ SessionsPythonTool,
+)
+from semantic_kernel.exceptions.function_exceptions import FunctionExecutionException
+from semantic_kernel.kernel import Kernel
+from semantic_kernel.utils.settings import (
+ azure_container_apps_settings_from_dot_env_as_dict,
+ azure_openai_settings_from_dot_env_as_dict,
+)
+
+auth_token: AccessToken | None = None
+
+ACA_TOKEN_ENDPOINT = "https://acasessions.io/.default"
+
+
+async def auth_callback() -> str:
+ """Auth callback for the SessionsPythonTool.
+ This is a sample auth callback that shows how to use Azure's DefaultAzureCredential
+ to get an access token.
+ """
+ global auth_token
+ current_utc_timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
+
+ if not auth_token or auth_token.expires_on < current_utc_timestamp:
+ credential = DefaultAzureCredential()
+
+ try:
+ auth_token = credential.get_token(ACA_TOKEN_ENDPOINT)
+ except ClientAuthenticationError as cae:
+ err_messages = getattr(cae, "messages", [])
+ raise FunctionExecutionException(
+ f"Failed to retrieve the client auth token with messages: {' '.join(err_messages)}"
+ ) from cae
+
+ return auth_token.token
+
+
+async def main():
+ kernel = Kernel()
+
+ service_id = "python-code-interpreter"
+ chat_service = AzureChatCompletion(
+ service_id=service_id, **azure_openai_settings_from_dot_env_as_dict(include_api_version=True)
+ )
+ kernel.add_service(chat_service)
+
+ python_code_interpreter = SessionsPythonTool(
+ **azure_container_apps_settings_from_dot_env_as_dict(), auth_callback=auth_callback
+ )
+
+ sessions_tool = kernel.add_plugin(python_code_interpreter, "PythonCodeInterpreter")
+
+ code = "import json\n\ndef add_numbers(a, b):\n return a + b\n\nargs = '{\"a\": 1, \"b\": 1}'\nargs_dict = json.loads(args)\nprint(add_numbers(args_dict['a'], args_dict['b']))" # noqa: E501
+ result = await kernel.invoke(sessions_tool["execute_code"], code=code)
+
+ print(result)
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/python/semantic_kernel/core_plugins/__init__.py b/python/semantic_kernel/core_plugins/__init__.py
index 568c6b993769..0f6aed98a679 100644
--- a/python/semantic_kernel/core_plugins/__init__.py
+++ b/python/semantic_kernel/core_plugins/__init__.py
@@ -5,6 +5,9 @@
)
from semantic_kernel.core_plugins.http_plugin import HttpPlugin
from semantic_kernel.core_plugins.math_plugin import MathPlugin
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import (
+ SessionsPythonTool,
+)
from semantic_kernel.core_plugins.text_memory_plugin import TextMemoryPlugin
from semantic_kernel.core_plugins.text_plugin import TextPlugin
from semantic_kernel.core_plugins.time_plugin import TimePlugin
@@ -17,5 +20,6 @@
"HttpPlugin",
"ConversationSummaryPlugin",
"MathPlugin",
+ "SessionsPythonTool",
"WebSearchEnginePlugin",
]
diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/README.md b/python/semantic_kernel/core_plugins/sessions_python_tool/README.md
new file mode 100644
index 000000000000..9ac97aafa8b9
--- /dev/null
+++ b/python/semantic_kernel/core_plugins/sessions_python_tool/README.md
@@ -0,0 +1,132 @@
+# Getting Started with the Sessions Python Plugin
+
+## Authentication to ARM (management.azure.com)
+
+For any call to ARM (management.azure.com), use the access token retrieved from the below call:
+
+```az account get-access-token --resource ```
+
+## Generate a Session Pool
+
+a. Call the following API to generate a Session Pool:
+
+```PUT ```
+
+Body properties:
+
+- location: Azure Region
+- properties:
+ - poolManagementType:
+ - Today there are two Pool Management Types supported:
+ - "Manual"
+ - In this model, the user will call generateSessions API which supports batch mode (to generate 100s of sessions in one API call, and then user is free to update/specialize the session as needed or execute code in the session)
+ - "Dynamic"
+ - In this mode, the pool management is handled by the platform. Currently, the dynamic mode is only implemented for Python code execution scenario, which has its own APIs to execute code.
+ - maxConcurrentSessions:
+ - Maximum number of active sessions allowed
+ - name:
+ - Name of the sessions pool
+ - dynamicPoolConfiguration: Specifies the type of sessions generated by the platform
+ - poolType: Type of images used for the pool
+ - Valid values ["JupyterPython", "FunctionsPython"]
+ - executionType:
+ - Valid values ["Timed"]
+ - coolDownPeriodSeconds:
+ - Integer representing the maximum time allowed before the platform scales down the container
+ - sessionPoolSecrets: Secrets associated with the Session Pool
+ - name: Name of the secret
+ - value: Secret Value
+
+Example Generation of Session Pool:
+
+```json
+{
+ "location": "koreacentral",
+ "properties": {
+ "poolManagementType": "Dynamic",
+ "maxConcurrentSessions": 10,
+ "name": "{{SessionPoolName}}",
+ "dynamicPoolConfiguration": {
+ "poolType": "JupyterPython",
+ "executionType": "Timed",
+ "coolDownPeriodInSecond": 310
+ }
+ }
+}
+```
+
+Curl Example:
+
+```curl
+curl -X PUT "https://management.azure.com/subscriptions/{{SubscriptionId}}/resourceGroups/{{ResourceGroup}}/providers/Microsoft.App/sessionPools/{{SessionPoolName}}?api-version=2023-08-01-preview" \
+ -H "Content-Type: application/json" \
+ -H "Authorization: Bearer $token" \
+ -d '{"location": "koreacentral","properties": { "poolManagementType": "Dynamic", "maxConcurrentSessions": 10, "name": "{{SessionPoolName}}", "dynamicPoolConfiguration": { "poolType": "JupyterPython", "executionType": "Timed", "coolDownPeriodInSecond": 310 } } }'
+```
+
+If all goes well, you should receive a 200 Status Code. The response will contain a `poolManagementEndpoint` which is required to configure the Python Plugin below.
+
+## Configuring the Python Plugin
+
+To successfully use the Python Plugin in Semantic Kernel, you must install the Poetry `azure` extras by running `poetry install -E azure`.
+
+Next, in the .env file, add the `poolManagementEndpoint` value from above to the variable `ACA_POOL_MANAGEMENT_ENDPOINT`. The `poolManagementEndpoint` should look something like:
+
+```html
+https://eastus.acasessions.io/subscriptions/{{subscriptionId}}/resourceGroups/{{resourceGroup}}/sessionPools/{{sessionPool}}/python/execute
+```
+
+It is possible to add the code interpreter plugin as follows:
+
+```python
+kernel = Kernel()
+
+service_id = "azure_oai"
+chat_service = AzureChatCompletion(
+ service_id=service_id, **azure_openai_settings_from_dot_env_as_dict(include_api_version=True)
+)
+kernel.add_service(chat_service)
+
+python_code_interpreter = SessionsPythonTool(
+ **azure_container_apps_settings_from_dot_env_as_dict(), auth_callback=auth_callback
+)
+
+sessions_tool = kernel.add_plugin(python_code_interpreter, "PythonCodeInterpreter")
+
+code = "import json\n\ndef add_numbers(a, b):\n return a + b\n\nargs = '{\"a\": 1, \"b\": 1}'\nargs_dict = json.loads(args)\nprint(add_numbers(args_dict['a'], args_dict['b']))"
+result = await kernel.invoke(sessions_tool["execute_code"], code=code)
+
+print(result)
+```
+
+Instead of hard-coding a well-formatted Python code string, you may use automatic function calling inside of SK and allow the model to form the Python and call the plugin.
+
+The authentication callback must return a valid token for the session pool. One possible way of doing this with a `DefaultAzureCredential` is as follows:
+
+```python
+async def auth_callback() -> str:
+ """Auth callback for the SessionsPythonTool.
+ This is a sample auth callback that shows how to use Azure's DefaultAzureCredential
+ to get an access token.
+ """
+ global auth_token
+ current_utc_timestamp = int(datetime.datetime.now(datetime.timezone.utc).timestamp())
+
+ if not auth_token or auth_token.expires_on < current_utc_timestamp:
+ credential = DefaultAzureCredential()
+
+ try:
+ auth_token = credential.get_token(ACA_TOKEN_ENDPOINT)
+ except ClientAuthenticationError as cae:
+ err_messages = getattr(cae, "messages", [])
+ raise FunctionExecutionException(
+ f"Failed to retrieve the client auth token with messages: {' '.join(err_messages)}"
+ ) from cae
+
+ return auth_token.token
+```
+
+Currently, there are two concept examples that show this plugin in more detail:
+
+- [Plugin example](../../../samples/concepts/plugins/azure_python_code_interpreter.py): shows the basic usage of calling the code execute function on the plugin.
+- [Function Calling example](../../../samples/concepts/auto_function_calling/azure_python_code_interpreter_function_calling.py): shows a simple chat application that leverages the Python code interpreter plugin for function calling.
diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/__init__.py b/python/semantic_kernel/core_plugins/sessions_python_tool/__init__.py
new file mode 100644
index 000000000000..3acd831b3481
--- /dev/null
+++ b/python/semantic_kernel/core_plugins/sessions_python_tool/__init__.py
@@ -0,0 +1,10 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import (
+ SessionsPythonTool,
+)
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_settings import (
+ SessionsPythonSettings,
+)
+
+__all__ = ["SessionsPythonTool", "SessionsPythonSettings"]
diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_plugin.py b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_plugin.py
new file mode 100644
index 000000000000..38c62178ac7c
--- /dev/null
+++ b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_plugin.py
@@ -0,0 +1,244 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+from __future__ import annotations
+
+import logging
+import os
+import re
+from io import BufferedReader, BytesIO
+from typing import Annotated, Any, Awaitable, Callable
+
+import httpx
+from pydantic import field_validator
+
+from semantic_kernel.connectors.ai.open_ai.const import USER_AGENT
+from semantic_kernel.connectors.telemetry import HTTP_USER_AGENT, version_info
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_settings import (
+ SessionsPythonSettings,
+)
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_remote_file_metadata import SessionsRemoteFileMetadata
+from semantic_kernel.exceptions.function_exceptions import FunctionExecutionException
+from semantic_kernel.functions.kernel_function_decorator import kernel_function
+from semantic_kernel.kernel_pydantic import KernelBaseModel
+
+logger = logging.getLogger(__name__)
+
+
+SESSIONS_USER_AGENT = f"{HTTP_USER_AGENT}/{version_info} (Language=Python)"
+
+
+class SessionsPythonTool(KernelBaseModel):
+ """A plugin for running Python code in an Azure Container Apps dynamic sessions code interpreter."""
+
+ pool_management_endpoint: str
+ settings: SessionsPythonSettings | None = None
+ auth_callback: Callable[..., Awaitable[Any]]
+ http_client: httpx.AsyncClient | None = None
+
+ def __init__(
+ self,
+ pool_management_endpoint: str,
+ auth_callback: Callable[..., Awaitable[Any]],
+ settings: SessionsPythonSettings | None = None,
+ http_client: httpx.AsyncClient | None = None,
+ **kwargs,
+ ):
+ """Initializes a new instance of the SessionsPythonTool class."""
+ if not settings:
+ settings = SessionsPythonSettings()
+
+ if not http_client:
+ http_client = httpx.AsyncClient()
+
+ super().__init__(
+ pool_management_endpoint=pool_management_endpoint,
+ auth_callback=auth_callback,
+ settings=settings,
+ http_client=http_client,
+ **kwargs,
+ )
+
+ @field_validator("pool_management_endpoint", mode="before")
+ @classmethod
+ def _validate_endpoint(cls, endpoint: str):
+ """Validates the pool management endpoint."""
+ if "/python/execute" in endpoint:
+ # Remove '/python/execute/' and ensure the endpoint ends with a '/'
+ endpoint = endpoint.replace("/python/execute", "").rstrip("/") + "/"
+ if not endpoint.endswith("/"):
+ # Ensure the endpoint ends with a '/'
+ endpoint = endpoint + "/"
+ return endpoint
+
+ async def _ensure_auth_token(self) -> str:
+ """Ensure the auth token is valid."""
+
+ try:
+ auth_token = await self.auth_callback()
+ except Exception as e:
+ logger.error(f"Failed to retrieve the client auth token with message: {str(e)}")
+ raise FunctionExecutionException(f"Failed to retrieve the client auth token with messages: {str(e)}") from e
+
+ return auth_token
+
+ def _sanitize_input(self, code: str) -> str:
+ """Sanitize input to the python REPL.
+ Remove whitespace, backtick & python (if llm mistakes python console as terminal)
+ Args:
+ query: The query to sanitize
+ Returns:
+ str: The sanitized query
+ """
+
+ # Removes `, whitespace & python from start
+ code = re.sub(r"^(\s|`)*(?i:python)?\s*", "", code)
+ # Removes whitespace & ` from end
+ code = re.sub(r"(\s|`)*$", "", code)
+ return code
+
+ @kernel_function(
+ description="""Executes the provided Python code.
+ Start and end the code snippet with double quotes to define it as a string.
+ Insert \\n within the string wherever a new line should appear.
+ Add spaces directly after \\n sequences to replicate indentation.
+ Use \" to include double quotes within the code without ending the string.
+ Keep everything in a single line; the \\n sequences will represent line breaks
+ when the string is processed or displayed.
+ """,
+ name="execute_code",
+ )
+ async def execute_code(self, code: Annotated[str, "The valid Python code to execute"]) -> str:
+ """
+ Executes the provided Python code
+ Args:
+ code (str): The valid Python code to execute
+ Returns:
+ str: The result of the Python code execution in the form of Result, Stdout, and Stderr
+ Raises:
+ FunctionExecutionException: If the provided code is empty
+ """
+
+ if not code:
+ raise FunctionExecutionException("The provided code is empty")
+
+ if self.settings.sanitize_input:
+ code = self._sanitize_input(code)
+
+ auth_token = await self._ensure_auth_token()
+
+ logger.info(f"Executing Python code: {code}")
+
+ self.http_client.headers.update(
+ {
+ "Authorization": f"Bearer {auth_token}",
+ "Content-Type": "application/json",
+ USER_AGENT: SESSIONS_USER_AGENT,
+ }
+ )
+
+ self.settings.python_code = code
+
+ request_body = {
+ "properties": self.settings.model_dump(exclude_none=True, exclude={"sanitize_input"}, by_alias=True),
+ }
+
+ response = await self.http_client.post(
+ url=f"{self.pool_management_endpoint}python/execute/",
+ json=request_body,
+ )
+ response.raise_for_status()
+
+ result = response.json()
+ return f"Result:\n{result['result']}Stdout:\n{result['stdout']}Stderr:\n{result['stderr']}" # noqa: E501
+
+ @kernel_function(name="upload_file", description="Uploads a file for the current Session ID")
+ async def upload_file(
+ self, *, data: BufferedReader = None, remote_file_path: str = None, local_file_path: str = None
+ ) -> SessionsRemoteFileMetadata:
+ """Upload a file to the session pool.
+ Args:
+ data (BufferedReader): The file data to upload.
+ remote_file_path (str): The path to the file in the session.
+ local_file_path (str): The path to the file on the local machine.
+ Returns:
+ RemoteFileMetadata: The metadata of the uploaded file.
+ """
+
+ if data and local_file_path:
+ raise ValueError("data and local_file_path cannot be provided together")
+
+ if local_file_path:
+ if not remote_file_path:
+ remote_file_path = os.path.basename(local_file_path)
+ data = open(local_file_path, "rb")
+
+ auth_token = await self._ensure_auth_token()
+ self.http_client.headers.update(
+ {
+ "Authorization": f"Bearer {auth_token}",
+ USER_AGENT: SESSIONS_USER_AGENT,
+ }
+ )
+ files = [("file", (remote_file_path, data, "application/octet-stream"))]
+
+ response = await self.http_client.post(
+ url=f"{self.pool_management_endpoint}python/uploadFile?identifier={self.settings.session_id}",
+ json={},
+ files=files,
+ )
+
+ response.raise_for_status()
+
+ response_json = response.json()
+ return SessionsRemoteFileMetadata.from_dict(response_json)
+
+ @kernel_function(name="list_files", description="Lists all files in the provided Session ID")
+ async def list_files(self) -> list[SessionsRemoteFileMetadata]:
+ """List the files in the session pool.
+ Returns:
+ list[SessionsRemoteFileMetadata]: The metadata for the files in the session pool
+ """
+ auth_token = await self._ensure_auth_token()
+ self.http_client.headers.update(
+ {
+ "Authorization": f"Bearer {auth_token}",
+ USER_AGENT: SESSIONS_USER_AGENT,
+ }
+ )
+
+ response = await self.http_client.get(
+ url=f"{self.pool_management_endpoint}python/files?identifier={self.settings.session_id}",
+ )
+ response.raise_for_status()
+
+ response_json = response.json()
+ return [SessionsRemoteFileMetadata.from_dict(entry) for entry in response_json["$values"]]
+
+ async def download_file(self, *, remote_file_path: str, local_file_path: str = None) -> BufferedReader | None:
+ """Download a file from the session pool.
+ Args:
+ remote_file_path: The path to download the file from, relative to `/mnt/data`.
+ local_file_path: The path to save the downloaded file to. If not provided, the
+ file is returned as a BufferedReader.
+ Returns:
+ BufferedReader: The data of the downloaded file.
+ """
+ auth_token = await self.auth_callback()
+ self.http_client.headers.update(
+ {
+ "Authorization": f"Bearer {auth_token}",
+ USER_AGENT: SESSIONS_USER_AGENT,
+ }
+ )
+
+ response = await self.http_client.get(
+ url=f"{self.pool_management_endpoint}python/downloadFile?identifier={self.settings.session_id}&filename={remote_file_path}", # noqa: E501
+ )
+ response.raise_for_status()
+
+ if local_file_path:
+ with open(local_file_path, "wb") as f:
+ f.write(response.content)
+ return None
+
+ return BytesIO(response.content)
diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_settings.py b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_settings.py
new file mode 100644
index 000000000000..4ea3457ed57f
--- /dev/null
+++ b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_python_settings.py
@@ -0,0 +1,34 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+from __future__ import annotations
+
+import uuid
+from enum import Enum
+
+from pydantic import Field
+
+from semantic_kernel.kernel_pydantic import KernelBaseModel
+
+
+class CodeInputType(str, Enum):
+ """Code input type."""
+
+ Inline = "inline"
+
+
+class CodeExecutionType(str, Enum):
+ """Code execution type."""
+
+ Synchronous = "synchronous"
+ # Asynchronous = "asynchronous" TODO: Enable when available
+
+
+class SessionsPythonSettings(KernelBaseModel):
+ """The Sessions Python code interpreter settings."""
+
+ session_id: str | None = Field(default_factory=lambda: str(uuid.uuid4()), alias="identifier")
+ code_input_type: CodeInputType | None = Field(default=CodeInputType.Inline, alias="codeInputType")
+ execution_type: CodeExecutionType | None = Field(default=CodeExecutionType.Synchronous, alias="executionType")
+ python_code: str | None = Field(alias="pythonCode", default=None)
+ timeout_in_sec: int | None = Field(default=100, alias="timeoutInSeconds")
+ sanitize_input: bool | None = Field(default=True, alias="sanitizeInput")
diff --git a/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_remote_file_metadata.py b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_remote_file_metadata.py
new file mode 100644
index 000000000000..2d22c67b31cb
--- /dev/null
+++ b/python/semantic_kernel/core_plugins/sessions_python_tool/sessions_remote_file_metadata.py
@@ -0,0 +1,23 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+from semantic_kernel.kernel_pydantic import KernelBaseModel
+
+
+class SessionsRemoteFileMetadata(KernelBaseModel):
+ """Metadata for a file in the session."""
+
+ filename: str
+ """The filename relative to `/mnt/data`."""
+
+ size_in_bytes: int
+ """The size of the file in bytes."""
+
+ @property
+ def full_path(self) -> str:
+ """Get the full path of the file."""
+ return f"/mnt/data/{self.filename}"
+
+ @staticmethod
+ def from_dict(data: dict) -> "SessionsRemoteFileMetadata":
+ """Create a RemoteFileMetadata object from a dictionary."""
+ return SessionsRemoteFileMetadata(filename=data["filename"], size_in_bytes=data["bytes"])
diff --git a/python/semantic_kernel/utils/settings.py b/python/semantic_kernel/utils/settings.py
index 1c7a56473a9e..63f3c0d933a0 100644
--- a/python/semantic_kernel/utils/settings.py
+++ b/python/semantic_kernel/utils/settings.py
@@ -351,3 +351,27 @@ def booking_sample_settings_from_dot_env_as_dict() -> dict[str, str]:
"""
client_id, tenant_id, client_secret = booking_sample_settings_from_dot_env()
return {"client_id": client_id, "tenant_id": tenant_id, "client_secret": client_secret}
+
+
+def azure_container_apps_settings_from_dot_env() -> str:
+ """
+ Reads the Azure Container Apps environment variables from the .env file.
+ Returns:
+ str: Azure Container Apps pool management connection string
+ """
+ config = dotenv_values(".env")
+ connection_string = config.get("ACA_POOL_MANAGEMENT_ENDPOINT", None)
+
+ assert connection_string is not None, "Azure Container Apps connection string not found in .env file"
+
+ return connection_string
+
+
+def azure_container_apps_settings_from_dot_env_as_dict() -> dict[str, str]:
+ """
+ Reads the Azure Container Apps environment variables from the .env file.
+ Returns:
+ Dict[str, str]: Azure Container Apps environment variables
+ """
+ pool_management_endpoint = azure_container_apps_settings_from_dot_env()
+ return {"pool_management_endpoint": pool_management_endpoint}
diff --git a/python/tests/unit/core_plugins/test_sessions_python_plugin.py b/python/tests/unit/core_plugins/test_sessions_python_plugin.py
new file mode 100644
index 000000000000..2c2daf0c9ec2
--- /dev/null
+++ b/python/tests/unit/core_plugins/test_sessions_python_plugin.py
@@ -0,0 +1,283 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+from io import BufferedReader, BytesIO
+from unittest.mock import mock_open, patch
+
+import httpx
+import pytest
+
+from semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin import (
+ SessionsPythonTool,
+)
+from semantic_kernel.kernel import Kernel
+
+
+def test_auth_callback():
+ return "sample_token"
+
+
+def test_it_can_be_instantiated():
+ plugin = SessionsPythonTool(pool_management_endpoint="https://example.com", auth_callback=test_auth_callback)
+ assert plugin is not None
+
+
+def test_validate_endpoint():
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/execute/", auth_callback=test_auth_callback
+ )
+ assert plugin is not None
+ assert plugin.pool_management_endpoint == "https://example.com/"
+
+
+def test_it_can_be_imported(kernel: Kernel):
+ plugin = SessionsPythonTool(pool_management_endpoint="https://example.com", auth_callback=test_auth_callback)
+ assert kernel.add_plugin(plugin=plugin, plugin_name="PythonCodeInterpreter")
+ assert kernel.plugins["PythonCodeInterpreter"] is not None
+ assert kernel.plugins["PythonCodeInterpreter"].name == "PythonCodeInterpreter"
+
+
+@pytest.mark.asyncio
+@patch("httpx.AsyncClient.post")
+async def test_call_to_container_succeeds(mock_post):
+ async def async_return(result):
+ return result
+
+ with patch(
+ "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token",
+ return_value="test_token",
+ ):
+ mock_request = httpx.Request(method="POST", url="https://example.com/python/execute/")
+
+ mock_response = httpx.Response(
+ status_code=200, json={"result": "success", "stdout": "", "stderr": ""}, request=mock_request
+ )
+
+ mock_post.return_value = await async_return(mock_response)
+
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/execute/", auth_callback=test_auth_callback
+ )
+ result = await plugin.execute_code("print('hello world')")
+
+ assert result is not None
+ mock_post.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+@patch("httpx.AsyncClient.post")
+async def test_call_to_container_fails_raises_exception(mock_post):
+ async def async_return(result):
+ return result
+
+ with patch(
+ "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token",
+ return_value="test_token",
+ ):
+ mock_request = httpx.Request(method="POST", url="https://example.com/python/execute/")
+
+ mock_response = httpx.Response(status_code=500, request=mock_request)
+
+ mock_post.return_value = await async_return(mock_response)
+
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/execute/", auth_callback=test_auth_callback
+ )
+
+ with pytest.raises(Exception):
+ _ = await plugin.execute_code("print('hello world')")
+
+
+@pytest.mark.asyncio
+@patch("httpx.AsyncClient.post")
+async def test_upload_file_with_local_path(mock_post):
+ """Test upload_file when providing a local file path."""
+
+ async def async_return(result):
+ return result
+
+ with patch(
+ "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token",
+ return_value="test_token",
+ ), patch("builtins.open", mock_open(read_data=b"file data")):
+ mock_request = httpx.Request(method="POST", url="https://example.com/python/uploadFile?identifier=None")
+
+ mock_response = httpx.Response(
+ status_code=200, json={"filename": "test.txt", "bytes": 123}, request=mock_request
+ )
+ mock_post.return_value = await async_return(mock_response)
+
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token"
+ )
+
+ result = await plugin.upload_file(local_file_path="test.txt", remote_file_path="uploaded_test.txt")
+ assert result.filename == "test.txt"
+ assert result.size_in_bytes == 123
+ mock_post.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+@patch("httpx.AsyncClient.post")
+async def test_upload_file_with_buffer(mock_post):
+ """Test upload_file when providing file data as a BufferedReader."""
+
+ async def async_return(result):
+ return result
+
+ with patch(
+ "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token",
+ return_value="test_token",
+ ):
+ mock_request = httpx.Request(method="POST", url="https://example.com/python/uploadFile?identifier=None")
+
+ mock_response = httpx.Response(
+ status_code=200, json={"filename": "buffer_file.txt", "bytes": 456}, request=mock_request
+ )
+ mock_post.return_value = await async_return(mock_response)
+
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token"
+ )
+
+ data_buffer = BufferedReader(BytesIO(b"file data"))
+
+ result = await plugin.upload_file(data=data_buffer, remote_file_path="buffer_file.txt")
+ assert result.filename == "buffer_file.txt"
+ assert result.size_in_bytes == 456
+ mock_post.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+@patch("httpx.AsyncClient.get")
+async def test_list_files(mock_get):
+ """Test list_files function."""
+
+ async def async_return(result):
+ return result
+
+ with patch(
+ "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token",
+ return_value="test_token",
+ ):
+
+ mock_request = httpx.Request(method="GET", url="https://example.com/python/files?identifier=None")
+
+ mock_response = httpx.Response(
+ status_code=200,
+ json={
+ "$values": [
+ {"filename": "test1.txt", "bytes": 123},
+ {"filename": "test2.txt", "bytes": 456},
+ ]
+ },
+ request=mock_request,
+ )
+ mock_get.return_value = await async_return(mock_response)
+
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token"
+ )
+
+ files = await plugin.list_files()
+ assert len(files) == 2
+ assert files[0].filename == "test1.txt"
+ assert files[0].size_in_bytes == 123
+ assert files[1].filename == "test2.txt"
+ assert files[1].size_in_bytes == 456
+ mock_get.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+@patch("httpx.AsyncClient.get")
+async def test_download_file_to_local(mock_get):
+ """Test download_file when saving to a local file path."""
+
+ async def async_return(result):
+ return result
+
+ async def mock_auth_callback():
+ return "test_token"
+
+ with patch(
+ "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token",
+ return_value="test_token",
+ ), patch("builtins.open", mock_open()) as mock_file:
+ mock_request = httpx.Request(
+ method="GET", url="https://example.com/python/downloadFile?identifier=None&filename=remote_test.txt"
+ )
+
+ mock_response = httpx.Response(status_code=200, content=b"file data", request=mock_request)
+ mock_get.return_value = await async_return(mock_response)
+
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/", auth_callback=mock_auth_callback
+ )
+
+ await plugin.download_file(remote_file_path="remote_test.txt", local_file_path="local_test.txt")
+ mock_get.assert_awaited_once()
+ mock_file.assert_called_once_with("local_test.txt", "wb")
+ mock_file().write.assert_called_once_with(b"file data")
+
+
+@pytest.mark.asyncio
+@patch("httpx.AsyncClient.get")
+async def test_download_file_to_buffer(mock_get):
+ """Test download_file when returning as a BufferedReader."""
+
+ async def async_return(result):
+ return result
+
+ async def mock_auth_callback():
+ return "test_token"
+
+ with patch(
+ "semantic_kernel.core_plugins.sessions_python_tool.sessions_python_plugin.SessionsPythonTool._ensure_auth_token",
+ return_value="test_token",
+ ):
+ mock_request = httpx.Request(
+ method="GET", url="https://example.com/python/downloadFile?identifier=None&filename=remote_test.txt"
+ )
+
+ mock_response = httpx.Response(status_code=200, content=b"file data", request=mock_request)
+ mock_get.return_value = await async_return(mock_response)
+
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/", auth_callback=mock_auth_callback
+ )
+
+ buffer = await plugin.download_file(remote_file_path="remote_test.txt")
+ assert buffer is not None
+ assert buffer.read() == b"file data"
+ mock_get.assert_awaited_once()
+
+
+@pytest.mark.parametrize(
+ "input_code, expected_output",
+ [
+ # Basic whitespace removal
+ (" print('hello') ", "print('hello')"),
+ (" \n `print('hello')` ", "print('hello')"),
+ ("` print('hello')`", "print('hello')"),
+ # Removal of 'python' keyword
+ (" python print('hello') ", "print('hello')"),
+ (" Python print('hello') ", "print('hello')"),
+ ("` python print('hello')` ", "print('hello')"),
+ ("`Python print('hello')`", "print('hello')"),
+ # Mixed usage
+ (" ` python print('hello')` ", "print('hello')"),
+ (" `python print('hello') `", "print('hello')"),
+ # Code without any issues
+ ("print('hello')", "print('hello')"),
+ # Empty code
+ ("", ""),
+ ("` `", ""),
+ (" ", ""),
+ ],
+)
+def test_sanitize_input(input_code, expected_output):
+ """Test the `_sanitize_input` function with various inputs."""
+ plugin = SessionsPythonTool(
+ pool_management_endpoint="https://example.com/python/", auth_callback=lambda: "sample_token"
+ )
+ sanitized_code = plugin._sanitize_input(input_code)
+ assert sanitized_code == expected_output