diff --git a/packages/toolbox-core/src/toolbox_core/client.py b/packages/toolbox-core/src/toolbox_core/client.py index de4e4107..c3551d8a 100644 --- a/packages/toolbox-core/src/toolbox_core/client.py +++ b/packages/toolbox-core/src/toolbox_core/client.py @@ -13,6 +13,7 @@ # limitations under the License. +from asyncio import get_running_loop from types import MappingProxyType from typing import Any, Awaitable, Callable, Mapping, Optional, Union @@ -28,11 +29,13 @@ class ToolboxClient: An asynchronous client for interacting with a Toolbox service. Provides methods to discover and load tools defined by a remote Toolbox - service endpoint. It manages an underlying `aiohttp.ClientSession`. + service endpoint. It manages an underlying `aiohttp.ClientSession`, if one + is not provided. """ __base_url: str __session: ClientSession + __manage_session: bool def __init__( self, @@ -56,7 +59,9 @@ def __init__( self.__base_url = url # If no aiohttp.ClientSession is provided, make our own + self.__manage_session = False if session is None: + self.__manage_session = True session = ClientSession() self.__session = session @@ -136,16 +141,32 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): """ await self.close() + def __del__(self): + # This method is a "best-effort" safety net. + # It should NOT be relied upon for guaranteed resource cleanup. + # Explicitly using "async with" or calling "await client.close()" is the correct way. + if self.__manage_session: + try: + loop = get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + # If a loop is running, try to schedule the close operation. + # This is "fire-and-forget"; there's no guarantee it will complete + # before the loop or interpreter shuts down. + loop.create_task(self.__session.close()) + async def close(self): """ Asynchronously closes the underlying client session. Doing so will cause any tools created by this Client to cease to function. If the session was provided externally during initialization, the caller - is responsible for its lifecycle, but calling close here will still - attempt to close it. + is responsible for its lifecycle. """ - await self.__session.close() + if self.__manage_session and not self.__session.closed: + await self.__session.close() async def load_tool( self, diff --git a/packages/toolbox-core/src/toolbox_core/sync_client.py b/packages/toolbox-core/src/toolbox_core/sync_client.py index 312a27ad..979bca0d 100644 --- a/packages/toolbox-core/src/toolbox_core/sync_client.py +++ b/packages/toolbox-core/src/toolbox_core/sync_client.py @@ -13,7 +13,7 @@ # limitations under the License. -import asyncio +from asyncio import AbstractEventLoop, new_event_loop, run_coroutine_threadsafe from threading import Thread from typing import Any, Awaitable, Callable, Mapping, Optional, Union @@ -29,7 +29,7 @@ class ToolboxSyncClient: service endpoint. """ - __loop: Optional[asyncio.AbstractEventLoop] = None + __loop: Optional[AbstractEventLoop] = None __thread: Optional[Thread] = None def __init__( @@ -49,7 +49,7 @@ def __init__( # Running a loop in a background thread allows us to support async # methods from non-async environments. if self.__class__.__loop is None: - loop = asyncio.new_event_loop() + loop = new_event_loop() thread = Thread(target=loop.run_forever, daemon=True) thread.start() self.__class__.__thread = thread @@ -58,7 +58,7 @@ def __init__( async def create_client(): return ToolboxClient(url, client_headers=client_headers) - self.__async_client = asyncio.run_coroutine_threadsafe( + self.__async_client = run_coroutine_threadsafe( create_client(), self.__class__.__loop ).result() @@ -66,13 +66,12 @@ def close(self): """ Synchronously closes the underlying client session. Doing so will cause any tools created by this Client to cease to function. - - If the session was provided externally during initialization, the caller - is responsible for its lifecycle, but calling close here will still - attempt to close it. """ coro = self.__async_client.close() - asyncio.run_coroutine_threadsafe(coro, self.__loop).result() + run_coroutine_threadsafe(coro, self.__loop).result(timeout=5) + + def __del__(self): + self.close() def load_tool( self, @@ -108,7 +107,7 @@ def load_tool( if not self.__loop or not self.__thread: raise ValueError("Background loop or thread cannot be None.") - async_tool = asyncio.run_coroutine_threadsafe(coro, self.__loop).result() + async_tool = run_coroutine_threadsafe(coro, self.__loop).result() return ToolboxSyncTool(async_tool, self.__loop, self.__thread) def load_toolset( @@ -151,7 +150,7 @@ def load_toolset( if not self.__loop or not self.__thread: raise ValueError("Background loop or thread cannot be None.") - async_tools = asyncio.run_coroutine_threadsafe(coro, self.__loop).result() + async_tools = run_coroutine_threadsafe(coro, self.__loop).result() return [ ToolboxSyncTool(async_tool, self.__loop, self.__thread) for async_tool in async_tools diff --git a/packages/toolbox-core/tests/conftest.py b/packages/toolbox-core/tests/conftest.py index e579f843..6d2686a0 100644 --- a/packages/toolbox-core/tests/conftest.py +++ b/packages/toolbox-core/tests/conftest.py @@ -22,6 +22,7 @@ import subprocess import tempfile import time +from asyncio import run_coroutine_threadsafe from typing import Generator import google @@ -29,6 +30,8 @@ from google.auth import compute_engine from google.cloud import secretmanager, storage +from toolbox_core import ToolboxSyncClient + #### Define Utility Functions def get_env_var(key: str) -> str: @@ -92,6 +95,38 @@ def get_auth_token(client_id: str) -> str: #### Define Fixtures +@pytest_asyncio.fixture(autouse=True) +def patch_sync_client_for_deadlock(monkeypatch): + """ + Automatically replaces the blocking `ToolboxSyncClient.close()` method + with a non-blocking version for the entire test run. + + The original `ToolboxSyncClient.close()` is a blocking method because it + calls `.result()`. In the pytest environment, this blocking call creates a + deadlock during the test teardown phase when it conflicts with other fixtures + (like `sync_client_environment` or `toolbox_server`) that are also managing + background processes and threads. + + By replacing `close` with this safe, non-blocking version, we prevent the + deadlock and allow the test suite's fixtures to tear down cleanly. + This change is only active during the test run. + """ + + def non_blocking_close(self): + """A replacement for close() that doesn't block.""" + if hasattr(self.__class__, "_ToolboxSyncClient__loop") and hasattr( + self, "_ToolboxSyncClient__async_client" + ): + loop = self.__class__._ToolboxSyncClient__loop + async_client = self._ToolboxSyncClient__async_client + + if loop and loop.is_running(): + coro = async_client.close() + run_coroutine_threadsafe(coro, loop) + + monkeypatch.setattr(ToolboxSyncClient, "close", non_blocking_close) + + @pytest_asyncio.fixture(scope="session") def project_id() -> str: return get_env_var("GOOGLE_CLOUD_PROJECT") diff --git a/packages/toolbox-core/tests/test_sync_client.py b/packages/toolbox-core/tests/test_sync_client.py index 28f2b27b..32634d53 100644 --- a/packages/toolbox-core/tests/test_sync_client.py +++ b/packages/toolbox-core/tests/test_sync_client.py @@ -47,7 +47,7 @@ def sync_client_environment(): if original_loop and original_loop.is_running(): original_loop.call_soon_threadsafe(original_loop.stop) if original_thread and original_thread.is_alive(): - original_thread.join(timeout=5) + original_thread.join() ToolboxSyncClient._ToolboxSyncClient__loop = None ToolboxSyncClient._ToolboxSyncClient__thread = None @@ -63,7 +63,7 @@ def sync_client_environment(): if test_loop and test_loop.is_running(): test_loop.call_soon_threadsafe(test_loop.stop) if test_thread and test_thread.is_alive(): - test_thread.join(timeout=5) + test_thread.join() # Explicitly set to None to ensure a clean state for the next fixture use/test. ToolboxSyncClient._ToolboxSyncClient__loop = None @@ -288,30 +288,6 @@ def test_sync_client_creation_in_isolated_env(self, sync_client): sync_client._ToolboxSyncClient__async_client, ToolboxClient ), "Async client should be ToolboxClient instance" - @pytest.mark.usefixtures("sync_client_environment") - def test_sync_client_close_method(self): - """ - Tests the close() method of ToolboxSyncClient when manually created. - The sync_client_environment ensures loop/thread cleanup. - """ - mock_async_client_instance = AsyncMock(spec=ToolboxClient) - # AsyncMock methods are already AsyncMocks - # mock_async_client_instance.close = AsyncMock(return_value=None) - - with patch( - "toolbox_core.sync_client.ToolboxClient", - return_value=mock_async_client_instance, - ) as MockedAsyncClientConst: - client = ToolboxSyncClient(TEST_BASE_URL) - # The sync client passes its internal loop to the async client. - MockedAsyncClientConst.assert_called_once_with( - TEST_BASE_URL, client_headers=None - ) - - client.close() # This call closes the async_client's session. - mock_async_client_instance.close.assert_awaited_once() - # The sync_client_environment fixture handles stopping the loop/thread. - @pytest.mark.usefixtures("sync_client_environment") def test_sync_client_context_manager(self, aioresponses, tool_schema_minimal): """ diff --git a/packages/toolbox-langchain/src/toolbox_langchain/client.py b/packages/toolbox-langchain/src/toolbox_langchain/client.py index ab9ff7c0..324a995c 100644 --- a/packages/toolbox-langchain/src/toolbox_langchain/client.py +++ b/packages/toolbox-langchain/src/toolbox_langchain/client.py @@ -17,7 +17,6 @@ from warnings import warn from toolbox_core.sync_client import ToolboxSyncClient as ToolboxCoreSyncClient -from toolbox_core.sync_tool import ToolboxSyncTool from .tools import ToolboxTool