From a11eb98d2b5e19ab3dadf92567a518ae534ece27 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 15 May 2025 15:23:16 -0500 Subject: [PATCH 1/5] add supported server resources checks --- servicex/query_core.py | 25 +++++--- servicex/servicex_adapter.py | 116 ++++++++++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 10 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index e8ee2c36..ab2ca443 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -560,23 +560,30 @@ async def get_signed_url( if progress: progress.advance(task_id=download_progress, task_type="Download") + transformation_results_enabled = "transformationresults" in await self.servicex.get_resources() + while True: if not cached_record: await asyncio.sleep(self.minio_polling_interval) if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) - files = await self.servicex.get_transformation_results( - self.current_status.request_id, begin_at - ) - begin_at = new_begin_at + if transformation_results_enabled: + new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) + files = await self.servicex.get_transformation_results( + self.current_status.request_id, begin_at + ) + begin_at = new_begin_at + else: + files = await self.minio.list_bucket() for file in files: - if "file-path" not in file: - continue - - file_path = file["file-path"].replace("/", ":") + if transformation_results_enabled: + if "file-path" not in file: + continue + file_path = file.get("file-path", '').replace("/", ":") + else: + file_path = file.filename if file_path not in files_seen: if signed_urls_only: diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 0ce0e67f..a3380bb3 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -28,7 +28,9 @@ import os import time import datetime -from typing import Optional, Dict, List +import asyncio +from typing import Optional, Dict, List, Any, TypeVar, Callable, cast +from functools import wraps from aiohttp import ClientSession import httpx @@ -44,6 +46,78 @@ from servicex.models import TransformRequest, TransformStatus, CachedDataset +T = TypeVar('T') + +def requires_resource(resource_name: str) -> Callable[[Callable[..., T]], Callable[..., T]]: + """ + Decorator to check if a specific API resource is available on the server before executing the method. + + Args: + resource_name: The name of the resource that needs to be available + + Returns: + A decorator function that wraps around class methods + + Raises: + ResourceNotAvailableError: If the required resource is not available on the server + """ + + def decorator(func: Callable[..., T]) -> Callable[..., T]: + # Determine if function is async at decoration time (not runtime) + is_async = asyncio.iscoroutinefunction(func) + func_name = func.__name__ + + # Class-level cache for sync method resources + sync_cache_key = f'_sync_resources_for_{resource_name}' + + if is_async: + @wraps(func) + async def async_wrapper(self, *args: Any, **kwargs: Any) -> T: + # Get resources and check availability in one operation + if resource_name not in await self.get_resources(): + raise ResourceNotAvailableError( + f"Resource '{resource_name}' required for '{func_name}' is unavailable" + ) + return await func(self, *args, **kwargs) + + return cast(Callable[..., T], async_wrapper) + else: + @wraps(func) + def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: + # Initialize class-level cache attributes if needed + cls = self.__class__ + if not hasattr(cls, sync_cache_key): + setattr(cls, sync_cache_key, (None, 0)) # (resources, timestamp) + + cache_ttl = getattr(self, '_resources_cache_ttl', 300) + cached_resources, timestamp = getattr(cls, sync_cache_key) + current_time = time.time() + + # Check if cache needs refresh + if cached_resources is None or (current_time - timestamp) >= cache_ttl: + loop = asyncio.new_event_loop() + try: + cached_resources = loop.run_until_complete(self.get_resources()) + setattr(cls, sync_cache_key, (cached_resources, current_time)) + finally: + loop.close() + + # Check resource availability + if resource_name not in cached_resources: + raise ResourceNotAvailableError( + f"Resource '{resource_name}' required for '{func_name}' is unavailable" + ) + + return func(self, *args, **kwargs) + + return cast(Callable[..., T], sync_wrapper) + + return decorator + + +class ResourceNotAvailableError(Exception): + """Exception raised when a required resource is not available on the server.""" + pass class AuthorizationError(BaseException): pass @@ -64,6 +138,45 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): self.refresh_token = refresh_token self.token = None + self._available_resources: Optional[Dict[str, Any]] = None + self._resources_last_updated: Optional[float] = None + self._resources_cache_ttl = 60*5 + + async def get_resources(self) -> Dict[str, Any]: + """ + Fetches the list of available resources from the server. + Caches the result for 5 minutes to avoid excessive API calls. + + Returns: + A dictionary of available resources with their properties + """ + current_time = time.time() + + # Return cached resources if they exist and are not expired + if (self._available_resources is not None and + self._resources_last_updated is not None and + current_time - self._resources_last_updated < self._resources_cache_ttl): + return self._available_resources + + # Fetch resources from server + headers = await self._get_authorization() + async with ClientSession() as session: + async with session.get( + headers=headers, url=f"{self.url}/servicex/resources" + ) as r: + if r.status == 403: + raise AuthorizationError( + f"Not authorized to access serviceX at {self.url}" + ) + elif r.status != 200: + msg = await _extract_message(r) + raise RuntimeError(f"Failed to get resources: {r.status} - {msg}") + + self._available_resources = await r.json() + self._resources_last_updated = current_time + + return self._available_resources + async def _get_token(self): url = f"{self.url}/token/refresh" headers = {"Authorization": f"Bearer {self.refresh_token}"} @@ -229,6 +342,7 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) + @requires_resource("transformationresults") async def get_transformation_results( self, request_id: str, begin_at: datetime.datetime ): From 9db76ad871176fe807044145c5cc8f5d8ada6c57 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 15 May 2025 20:28:32 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 6 ++++-- servicex/servicex_adapter.py | 27 ++++++++++++++++++--------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index ab2ca443..edcf39cd 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -560,7 +560,9 @@ async def get_signed_url( if progress: progress.advance(task_id=download_progress, task_type="Download") - transformation_results_enabled = "transformationresults" in await self.servicex.get_resources() + transformation_results_enabled = ( + "transformationresults" in await self.servicex.get_resources() + ) while True: if not cached_record: @@ -581,7 +583,7 @@ async def get_signed_url( if transformation_results_enabled: if "file-path" not in file: continue - file_path = file.get("file-path", '').replace("/", ":") + file_path = file.get("file-path", "").replace("/", ":") else: file_path = file.filename diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index a3380bb3..7fb4fb20 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -46,9 +46,12 @@ from servicex.models import TransformRequest, TransformStatus, CachedDataset -T = TypeVar('T') +T = TypeVar("T") -def requires_resource(resource_name: str) -> Callable[[Callable[..., T]], Callable[..., T]]: + +def requires_resource( + resource_name: str, +) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator to check if a specific API resource is available on the server before executing the method. @@ -68,9 +71,10 @@ def decorator(func: Callable[..., T]) -> Callable[..., T]: func_name = func.__name__ # Class-level cache for sync method resources - sync_cache_key = f'_sync_resources_for_{resource_name}' + sync_cache_key = f"_sync_resources_for_{resource_name}" if is_async: + @wraps(func) async def async_wrapper(self, *args: Any, **kwargs: Any) -> T: # Get resources and check availability in one operation @@ -82,6 +86,7 @@ async def async_wrapper(self, *args: Any, **kwargs: Any) -> T: return cast(Callable[..., T], async_wrapper) else: + @wraps(func) def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: # Initialize class-level cache attributes if needed @@ -89,7 +94,7 @@ def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: if not hasattr(cls, sync_cache_key): setattr(cls, sync_cache_key, (None, 0)) # (resources, timestamp) - cache_ttl = getattr(self, '_resources_cache_ttl', 300) + cache_ttl = getattr(self, "_resources_cache_ttl", 300) cached_resources, timestamp = getattr(cls, sync_cache_key) current_time = time.time() @@ -117,8 +122,10 @@ def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: class ResourceNotAvailableError(Exception): """Exception raised when a required resource is not available on the server.""" + pass + class AuthorizationError(BaseException): pass @@ -140,7 +147,7 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): self._available_resources: Optional[Dict[str, Any]] = None self._resources_last_updated: Optional[float] = None - self._resources_cache_ttl = 60*5 + self._resources_cache_ttl = 60 * 5 async def get_resources(self) -> Dict[str, Any]: """ @@ -153,16 +160,18 @@ async def get_resources(self) -> Dict[str, Any]: current_time = time.time() # Return cached resources if they exist and are not expired - if (self._available_resources is not None and - self._resources_last_updated is not None and - current_time - self._resources_last_updated < self._resources_cache_ttl): + if ( + self._available_resources is not None + and self._resources_last_updated is not None + and current_time - self._resources_last_updated < self._resources_cache_ttl + ): return self._available_resources # Fetch resources from server headers = await self._get_authorization() async with ClientSession() as session: async with session.get( - headers=headers, url=f"{self.url}/servicex/resources" + headers=headers, url=f"{self.url}/servicex/resources" ) as r: if r.status == 403: raise AuthorizationError( From 4ff0b34f73d8a6a8ed569391fe7a6a11aa7ef634 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 15 May 2025 15:31:13 -0500 Subject: [PATCH 3/5] flake8 compliance --- servicex/servicex_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 7fb4fb20..ac8c23d2 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -53,7 +53,7 @@ def requires_resource( resource_name: str, ) -> Callable[[Callable[..., T]], Callable[..., T]]: """ - Decorator to check if a specific API resource is available on the server before executing the method. + Decorator to check if a specific API resource is available on the server. Args: resource_name: The name of the resource that needs to be available From 5eaaad4dc6806879b3e380bf86f1d78163cc126c Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Fri, 16 May 2025 10:41:20 -0500 Subject: [PATCH 4/5] fix breaking tests --- tests/test_dataset.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index ef09c0d7..39e6b4c9 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -136,6 +136,10 @@ async def test_download_files(python_dataset): minio_mock.download_file.return_value = Path("/path/to/downloaded_file") minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url") + minio_mock.list_bucket.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] progress_mock = Mock() python_dataset.minio_polling_interval = 0 @@ -161,6 +165,10 @@ async def test_download_files_with_signed_urls(python_dataset): python_dataset.configuration = config minio_mock.download_file.return_value = "/path/to/downloaded_file" minio_mock.get_signed_url.return_value = "http://example.com/signed_url" + minio_mock.list_bucket.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] progress_mock = Mock() python_dataset.servicex = AsyncMock() From db04cef183116119cb09103eb82f439cdf3897c8 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 20 May 2025 10:08:08 -0500 Subject: [PATCH 5/5] resolve breaking tests --- tests/test_servicex_dataset.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 45519e69..6956378a 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -226,6 +226,7 @@ async def test_submit(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -278,6 +279,7 @@ async def test_submit_partial_success(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -323,6 +325,7 @@ async def test_use_of_cache(mocker): transform_status3, ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -357,6 +360,7 @@ async def test_use_of_cache(mocker): # second round, should hit the cache (and not call the sx_adapter, minio, or update_record) with ExpandableProgress(display_progress=False) as progress: servicex2 = AsyncMock() + mock_minio.list_bucket.reset_mock() servicex.get_transformation_results.reset_mock() mock_minio.get_signed_url.reset_mock() datasource2 = Query( @@ -375,6 +379,7 @@ async def test_use_of_cache(mocker): signed_urls_only=True, expandable_progress=progress ) servicex2.assert_not_awaited() + mock_minio.list_bucket.assert_not_awaited() servicex.get_transformation_results.assert_not_awaited() mock_minio.get_signed_url.assert_not_awaited() upd.assert_not_called() @@ -399,6 +404,7 @@ async def test_use_of_cache(mocker): signed_urls_only=False, expandable_progress=progress ) servicex.assert_not_awaited() + mock_minio.list_bucket.assert_not_awaited() servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() upd.assert_called_once() @@ -417,6 +423,7 @@ async def test_submit_cancel(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -457,6 +464,7 @@ async def test_submit_fatal(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -501,6 +509,7 @@ async def test_submit_generic(mocker, codegen_list): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -549,6 +558,7 @@ async def test_submit_cancelled(mocker, codegen_list): sx.get_transform_status.side_effect = [transform_status4] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -623,6 +633,7 @@ async def test_use_of_ignore_cache(mocker, servicex): ) # Prepare Minio mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) did = FileListDataset("/foo/bar/baz.root") @@ -698,6 +709,7 @@ async def test_use_of_ignore_cache(mocker, servicex): res = await datasource_without_ignore_cache.submit_and_download( signed_urls_only=True, expandable_progress=progress ) # noqa + mock_minio.list_bucket.assert_not_awaited() servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() assert len(res.signed_url_list) == 2