From 90b61735ddfe2bbf10559b6d8448cca8bcaabbe3 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 6 May 2025 15:50:02 -0500 Subject: [PATCH 01/11] remove s3 bucket polling when waiting for transformation results --- servicex/query_core.py | 17 +++++++++++------ servicex/servicex_adapter.py | 23 ++++++++++++++++++++++- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index ac4f5f09..b631c840 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -522,6 +522,7 @@ async def download_files( Task to monitor the list of files in the transform output's bucket. Any new files will be downloaded. """ + files_seen = set() result_uris = [] download_tasks = [] @@ -557,15 +558,19 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - files = await self.minio.list_bucket() + files = await self.servicex.get_transformation_results(self.current_status.request_id) + for file in files: - if file.filename not in files_seen: + if 'file-path' not in file: + continue + + file_path = file['file-path'].replace('/', ':') + if file_path not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( get_signed_url( - self.minio, - file.filename, + file_path, progress, download_progress, ) @@ -576,14 +581,14 @@ async def get_signed_url( loop.create_task( download_file( self.minio, - file.filename, + file_path, progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 ) ) ) # NOQA 501 - files_seen.add(file.filename) + files_seen.add(file_path) # Once the transform is complete and all files are seen we can stop polling. # Also, if we are just downloading or signing urls for a previous transform diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index baa7b5b0..5779d5da 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -228,6 +228,28 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) + async def get_transformation_results(self, request_id: str): + headers = await self._get_authorization() + url = self.url + f'/servicex/internal/transformation/{request_id}/results' + + async with ClientSession() as session: + async with session.get(headers=headers, url=url) as r: + if r.status == 403: + raise AuthorizationError( + f"Not authorized to access serviceX at {self.url}" + ) + + if r.status == 404: + raise ValueError(f"Request {request_id} not found") + + if r.status != 200: + msg = await _extract_message(r) + raise RuntimeError( + f"Failed with message: {msg}" + ) + + return (await r.json())['results'] + async def cancel_transform(self, transform_id=None): headers = await self._get_authorization() path_template = f"/servicex/transformation/{transform_id}/cancel" @@ -235,7 +257,6 @@ async def cancel_transform(self, transform_id=None): async with ClientSession() as session: async with session.get(headers=headers, url=url) as r: - if r.status == 403: raise AuthorizationError( f"Not authorized to access serviceX at {self.url}" From ee7b02c2a04ae0cd96798da0238fd7565158bb09 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 6 May 2025 20:51:44 +0000 Subject: [PATCH 02/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 8 +++++--- servicex/servicex_adapter.py | 8 +++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index b631c840..bba338c5 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -558,13 +558,15 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - files = await self.servicex.get_transformation_results(self.current_status.request_id) + files = await self.servicex.get_transformation_results( + self.current_status.request_id + ) for file in files: - if 'file-path' not in file: + if "file-path" not in file: continue - file_path = file['file-path'].replace('/', ':') + file_path = file["file-path"].replace("/", ":") if file_path not in files_seen: if signed_urls_only: download_tasks.append( diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 5779d5da..910fb067 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -230,7 +230,7 @@ async def delete_transform(self, transform_id=None): async def get_transformation_results(self, request_id: str): headers = await self._get_authorization() - url = self.url + f'/servicex/internal/transformation/{request_id}/results' + url = self.url + f"/servicex/internal/transformation/{request_id}/results" async with ClientSession() as session: async with session.get(headers=headers, url=url) as r: @@ -244,11 +244,9 @@ async def get_transformation_results(self, request_id: str): if r.status != 200: msg = await _extract_message(r) - raise RuntimeError( - f"Failed with message: {msg}" - ) + raise RuntimeError(f"Failed with message: {msg}") - return (await r.json())['results'] + return (await r.json())["results"] async def cancel_transform(self, transform_id=None): headers = await self._get_authorization() From 5b04365ed7bdf0fa80708733730e8305229ef9db Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 14 May 2025 14:40:51 -0500 Subject: [PATCH 03/11] add begin_at, update tests --- servicex/query_core.py | 13 +++++++-- servicex/servicex_adapter.py | 14 +++++++--- tests/test_servicex_dataset.py | 49 +++++++++++++++++++++++----------- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index bba338c5..2b816f99 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -27,6 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from __future__ import annotations +import datetime import abc import asyncio from abc import ABC @@ -318,6 +319,7 @@ def transform_complete(task: Task): else None ) + begin_at = datetime.datetime.now(datetime.UTC) if not cached_record: if self.cache.is_transform_request_submitted(sx_request_hash): @@ -342,13 +344,14 @@ def transform_complete(task: Task): download_files_task = loop.create_task( self.download_files( - signed_urls_only, expandable_progress, download_progress, cached_record + signed_urls_only, expandable_progress, download_progress, cached_record, begin_at ) ) try: signed_urls = [] downloaded_files = [] + download_result = await download_files_task if signed_urls_only: signed_urls = download_result @@ -517,6 +520,7 @@ async def download_files( progress: ExpandableProgress, download_progress: TaskID, cached_record: Optional[TransformedResults], + begin_at: datetime.datetime, ) -> List[str]: """ Task to monitor the list of files in the transform output's bucket. Any new files @@ -558,20 +562,25 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): + new_begin_at = datetime.datetime.now(datetime.UTC) files = await self.servicex.get_transformation_results( - self.current_status.request_id + self.current_status.request_id, + begin_at ) + begin_at = new_begin_at for file in files: if "file-path" not in file: continue file_path = file["file-path"].replace("/", ":") + if file_path not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( get_signed_url( + self.minio, file_path, progress, download_progress, diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 910fb067..231a5216 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -27,6 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os import time +import datetime from typing import Optional, Dict, List from aiohttp import ClientSession @@ -228,12 +229,16 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) - async def get_transformation_results(self, request_id: str): + async def get_transformation_results(self, request_id: str, begin_at: datetime.datetime): headers = await self._get_authorization() - url = self.url + f"/servicex/internal/transformation/{request_id}/results" + url = self.url + f"/servicex/transformation/{request_id}/results" + + params = {} + if begin_at: + params["begin_at"] = begin_at.isoformat() async with ClientSession() as session: - async with session.get(headers=headers, url=url) as r: + async with session.get(headers=headers, url=url, params=params) as r: if r.status == 403: raise AuthorizationError( f"Not authorized to access serviceX at {self.url}" @@ -246,7 +251,8 @@ async def get_transformation_results(self, request_id: str): msg = await _extract_message(r) raise RuntimeError(f"Failed with message: {msg}") - return (await r.json())["results"] + data = await r.json() + return data.get("results") async def cancel_transform(self, transform_id=None): headers = await self._get_authorization() diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 628dc2e0..f489954f 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -204,8 +204,18 @@ def cache_transform(record: TransformedResults): @pytest.mark.asyncio async def test_submit(mocker): servicex = AsyncMock() + servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + + servicex.get_transformation_results = AsyncMock(side_effect=[ + [{"file-path": file1.filename}], + [ + {"file-path": file1.filename}, + {"file-path": file2.filename}, + ], + ]) + servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -214,7 +224,6 @@ async def test_submit(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -235,6 +244,7 @@ async def test_submit(mocker): config=Configuration(api_endpoints=[]), ) datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") + with ExpandableProgress(display_progress=False) as progress: datasource.result_format = ResultFormat.parquet result = await datasource.submit_and_download( @@ -245,11 +255,18 @@ async def test_submit(mocker): mock_cache.cache_transform.assert_called_once() + @pytest.mark.asyncio async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + + servicex.get_transformation_results = AsyncMock(side_effect=[ + [{"file-path": file1.filename}], + [{"file-path": file1.filename}], + ]) + servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -258,7 +275,6 @@ async def test_submit_partial_success(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -295,13 +311,16 @@ async def test_use_of_cache(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_transformation_results = AsyncMock(return_value=[ + {"file-path": file1.filename}, + {"file-path": file2.filename} + ]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, transform_status3, ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -336,7 +355,7 @@ async def test_use_of_cache(mocker): # second round, should hit the cache (and not call the sx_adapter, minio, or update_record) with ExpandableProgress(display_progress=False) as progress: servicex2 = AsyncMock() - mock_minio.list_bucket.reset_mock() + servicex.get_transformation_results.reset_mock() mock_minio.get_signed_url.reset_mock() datasource2 = Query( dataset_identifier=did, @@ -354,14 +373,14 @@ async def test_use_of_cache(mocker): signed_urls_only=True, expandable_progress=progress ) servicex2.assert_not_awaited() - mock_minio.list_bucket.assert_not_awaited() + servicex.get_transformation_results.assert_not_awaited() mock_minio.get_signed_url.assert_not_awaited() upd.assert_not_called() assert result1 == result2 upd.reset_mock() servicex.get_transform_status.reset_mock(side_effect=True) servicex.get_transform_status.return_value = transform_status3 - mock_minio.list_bucket.reset_mock(side_effect=True) + servicex.get_transformation_results.reset_mock(side_effect=True) # third round, should hit the cache and download files (and call update_record) with ExpandableProgress(display_progress=False) as progress: await datasource.submit_and_download( @@ -371,14 +390,14 @@ async def test_use_of_cache(mocker): assert mock_minio.download_file.await_count == 2 upd.assert_called_once() # fourth round, should hit the cache (and nothing else) - mock_minio.list_bucket.reset_mock() + servicex.get_transformation_results.reset_mock() mock_minio.download_file.reset_mock() with ExpandableProgress(display_progress=False) as progress: await datasource.submit_and_download( signed_urls_only=False, expandable_progress=progress ) servicex.assert_not_awaited() - mock_minio.list_bucket.assert_not_awaited() + servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() upd.assert_called_once() cache.close() @@ -396,7 +415,6 @@ async def test_submit_cancel(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -437,7 +455,6 @@ async def test_submit_fatal(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -482,7 +499,6 @@ async def test_submit_generic(mocker, codegen_list): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -531,7 +547,6 @@ async def test_submit_cancelled(mocker, codegen_list): sx.get_transform_status.side_effect = [transform_status4] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -601,10 +616,12 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status3, ] ) - + servicex.get_transformation_results = AsyncMock(return_value=[ + {"file-path": file1.filename}, + {"file-path": file2.filename} + ]) # Prepare Minio mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) did = FileListDataset("/foo/bar/baz.root") @@ -674,13 +691,13 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status1, transform_status3, ] - mock_minio.list_bucket.reset_mock() + servicex.get_transformation_results.reset_mock() mock_minio.download_file.reset_mock() with ExpandableProgress(display_progress=False) as progress: res = await datasource_without_ignore_cache.submit_and_download( signed_urls_only=True, expandable_progress=progress ) # noqa - mock_minio.list_bucket.assert_not_awaited() + servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() assert len(res.signed_url_list) == 2 cache.close() From 85b53466c094b0530299d7663c6d0f14cba504bf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 14 May 2025 19:41:11 +0000 Subject: [PATCH 04/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 9 +++++--- servicex/servicex_adapter.py | 4 +++- tests/test_servicex_dataset.py | 41 +++++++++++++++++----------------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 2b816f99..cfc4eb4d 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -344,7 +344,11 @@ def transform_complete(task: Task): download_files_task = loop.create_task( self.download_files( - signed_urls_only, expandable_progress, download_progress, cached_record, begin_at + signed_urls_only, + expandable_progress, + download_progress, + cached_record, + begin_at, ) ) @@ -564,8 +568,7 @@ async def get_signed_url( if self.current_status.files_completed > len(files_seen): new_begin_at = datetime.datetime.now(datetime.UTC) files = await self.servicex.get_transformation_results( - self.current_status.request_id, - begin_at + self.current_status.request_id, begin_at ) begin_at = new_begin_at diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 231a5216..0ce0e67f 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -229,7 +229,9 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) - async def get_transformation_results(self, request_id: str, begin_at: datetime.datetime): + async def get_transformation_results( + self, request_id: str, begin_at: datetime.datetime + ): headers = await self._get_authorization() url = self.url + f"/servicex/transformation/{request_id}/results" diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index f489954f..45519e69 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -208,13 +208,15 @@ async def test_submit(mocker): servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[ - [{"file-path": file1.filename}], - [ - {"file-path": file1.filename}, - {"file-path": file2.filename}, - ], - ]) + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": file1.filename}], + [ + {"file-path": file1.filename}, + {"file-path": file2.filename}, + ], + ] + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -255,17 +257,18 @@ async def test_submit(mocker): mock_cache.cache_transform.assert_called_once() - @pytest.mark.asyncio async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[ - [{"file-path": file1.filename}], - [{"file-path": file1.filename}], - ]) + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": file1.filename}], + [{"file-path": file1.filename}], + ] + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -311,10 +314,9 @@ async def test_use_of_cache(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(return_value=[ - {"file-path": file1.filename}, - {"file-path": file2.filename} - ]) + servicex.get_transformation_results = AsyncMock( + return_value=[{"file-path": file1.filename}, {"file-path": file2.filename}] + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -616,10 +618,9 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status3, ] ) - servicex.get_transformation_results = AsyncMock(return_value=[ - {"file-path": file1.filename}, - {"file-path": file2.filename} - ]) + servicex.get_transformation_results = AsyncMock( + return_value=[{"file-path": file1.filename}, {"file-path": file2.filename}] + ) # Prepare Minio mock_minio = AsyncMock() mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) From b80d72d9ebf45c4ab4a4e48f9b6d87a8575f3dd2 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 14 May 2025 14:54:09 -0500 Subject: [PATCH 05/11] use python3.9 compliant method to get UTC time --- servicex/query_core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 2b816f99..2f2b8547 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -319,7 +319,7 @@ def transform_complete(task: Task): else None ) - begin_at = datetime.datetime.now(datetime.UTC) + begin_at = datetime.datetime.now(tz=datetime.timezone.utc) if not cached_record: if self.cache.is_transform_request_submitted(sx_request_hash): @@ -562,7 +562,7 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - new_begin_at = datetime.datetime.now(datetime.UTC) + new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) files = await self.servicex.get_transformation_results( self.current_status.request_id, begin_at From 36cb4aabe26bac7207df46423b082ef1342c8119 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 14 May 2025 17:01:58 -0500 Subject: [PATCH 06/11] fix breaking tests --- tests/test_dataset.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 2ef18283..ef09c0d7 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -28,6 +28,7 @@ import pytest import tempfile import os +import datetime from unittest.mock import AsyncMock, Mock, patch from servicex.dataset_identifier import FileListDataset @@ -120,16 +121,21 @@ async def test_as_files_cached(transformed_result, python_dataset): @pytest.mark.asyncio async def test_download_files(python_dataset): signed_urls_only = False + begin_at = datetime.datetime.now(tz=datetime.timezone.utc) download_progress = "download_task_id" minio_mock = AsyncMock() config = Configuration(cache_path="temp_dir", api_endpoints=[]) python_dataset.configuration = config + python_dataset.servicex = AsyncMock() + python_dataset.servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": "file1.txt"}], + [{"file-path": "file2.txt"}], + ] + ) + minio_mock.download_file.return_value = Path("/path/to/downloaded_file") minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url") - minio_mock.list_bucket.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), - ] progress_mock = Mock() python_dataset.minio_polling_interval = 0 @@ -138,7 +144,7 @@ async def test_download_files(python_dataset): python_dataset.configuration.shortened_downloaded_filename = False result_uris = await python_dataset.download_files( - signed_urls_only, progress_mock, download_progress, None + signed_urls_only, progress_mock, download_progress, None, begin_at ) minio_mock.download_file.assert_awaited() minio_mock.get_signed_url.assert_not_awaited() @@ -148,25 +154,29 @@ async def test_download_files(python_dataset): @pytest.mark.asyncio async def test_download_files_with_signed_urls(python_dataset): signed_urls_only = True + begin_at = datetime.datetime.now(tz=datetime.timezone.utc) download_progress = "download_task_id" minio_mock = AsyncMock() config = Configuration(cache_path="temp_dir", api_endpoints=[]) python_dataset.configuration = config minio_mock.download_file.return_value = "/path/to/downloaded_file" minio_mock.get_signed_url.return_value = "http://example.com/signed_url" - minio_mock.list_bucket.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), - ] progress_mock = Mock() + python_dataset.servicex = AsyncMock() + python_dataset.servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": "file1.txt"}], + [{"file-path": "file2.txt"}], + ] + ) python_dataset.minio_polling_interval = 0 python_dataset.minio = minio_mock python_dataset.current_status = Mock(status="Complete", files_completed=2) python_dataset.configuration.shortened_downloaded_filename = False result_uris = await python_dataset.download_files( - signed_urls_only, progress_mock, download_progress, None + signed_urls_only, progress_mock, download_progress, None, begin_at ) minio_mock.download_file.assert_not_called() minio_mock.get_signed_url.assert_called() From a11eb98d2b5e19ab3dadf92567a518ae534ece27 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 15 May 2025 15:23:16 -0500 Subject: [PATCH 07/11] add supported server resources checks --- servicex/query_core.py | 25 +++++--- servicex/servicex_adapter.py | 116 ++++++++++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 10 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index e8ee2c36..ab2ca443 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -560,23 +560,30 @@ async def get_signed_url( if progress: progress.advance(task_id=download_progress, task_type="Download") + transformation_results_enabled = "transformationresults" in await self.servicex.get_resources() + while True: if not cached_record: await asyncio.sleep(self.minio_polling_interval) if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) - files = await self.servicex.get_transformation_results( - self.current_status.request_id, begin_at - ) - begin_at = new_begin_at + if transformation_results_enabled: + new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) + files = await self.servicex.get_transformation_results( + self.current_status.request_id, begin_at + ) + begin_at = new_begin_at + else: + files = await self.minio.list_bucket() for file in files: - if "file-path" not in file: - continue - - file_path = file["file-path"].replace("/", ":") + if transformation_results_enabled: + if "file-path" not in file: + continue + file_path = file.get("file-path", '').replace("/", ":") + else: + file_path = file.filename if file_path not in files_seen: if signed_urls_only: diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 0ce0e67f..a3380bb3 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -28,7 +28,9 @@ import os import time import datetime -from typing import Optional, Dict, List +import asyncio +from typing import Optional, Dict, List, Any, TypeVar, Callable, cast +from functools import wraps from aiohttp import ClientSession import httpx @@ -44,6 +46,78 @@ from servicex.models import TransformRequest, TransformStatus, CachedDataset +T = TypeVar('T') + +def requires_resource(resource_name: str) -> Callable[[Callable[..., T]], Callable[..., T]]: + """ + Decorator to check if a specific API resource is available on the server before executing the method. + + Args: + resource_name: The name of the resource that needs to be available + + Returns: + A decorator function that wraps around class methods + + Raises: + ResourceNotAvailableError: If the required resource is not available on the server + """ + + def decorator(func: Callable[..., T]) -> Callable[..., T]: + # Determine if function is async at decoration time (not runtime) + is_async = asyncio.iscoroutinefunction(func) + func_name = func.__name__ + + # Class-level cache for sync method resources + sync_cache_key = f'_sync_resources_for_{resource_name}' + + if is_async: + @wraps(func) + async def async_wrapper(self, *args: Any, **kwargs: Any) -> T: + # Get resources and check availability in one operation + if resource_name not in await self.get_resources(): + raise ResourceNotAvailableError( + f"Resource '{resource_name}' required for '{func_name}' is unavailable" + ) + return await func(self, *args, **kwargs) + + return cast(Callable[..., T], async_wrapper) + else: + @wraps(func) + def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: + # Initialize class-level cache attributes if needed + cls = self.__class__ + if not hasattr(cls, sync_cache_key): + setattr(cls, sync_cache_key, (None, 0)) # (resources, timestamp) + + cache_ttl = getattr(self, '_resources_cache_ttl', 300) + cached_resources, timestamp = getattr(cls, sync_cache_key) + current_time = time.time() + + # Check if cache needs refresh + if cached_resources is None or (current_time - timestamp) >= cache_ttl: + loop = asyncio.new_event_loop() + try: + cached_resources = loop.run_until_complete(self.get_resources()) + setattr(cls, sync_cache_key, (cached_resources, current_time)) + finally: + loop.close() + + # Check resource availability + if resource_name not in cached_resources: + raise ResourceNotAvailableError( + f"Resource '{resource_name}' required for '{func_name}' is unavailable" + ) + + return func(self, *args, **kwargs) + + return cast(Callable[..., T], sync_wrapper) + + return decorator + + +class ResourceNotAvailableError(Exception): + """Exception raised when a required resource is not available on the server.""" + pass class AuthorizationError(BaseException): pass @@ -64,6 +138,45 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): self.refresh_token = refresh_token self.token = None + self._available_resources: Optional[Dict[str, Any]] = None + self._resources_last_updated: Optional[float] = None + self._resources_cache_ttl = 60*5 + + async def get_resources(self) -> Dict[str, Any]: + """ + Fetches the list of available resources from the server. + Caches the result for 5 minutes to avoid excessive API calls. + + Returns: + A dictionary of available resources with their properties + """ + current_time = time.time() + + # Return cached resources if they exist and are not expired + if (self._available_resources is not None and + self._resources_last_updated is not None and + current_time - self._resources_last_updated < self._resources_cache_ttl): + return self._available_resources + + # Fetch resources from server + headers = await self._get_authorization() + async with ClientSession() as session: + async with session.get( + headers=headers, url=f"{self.url}/servicex/resources" + ) as r: + if r.status == 403: + raise AuthorizationError( + f"Not authorized to access serviceX at {self.url}" + ) + elif r.status != 200: + msg = await _extract_message(r) + raise RuntimeError(f"Failed to get resources: {r.status} - {msg}") + + self._available_resources = await r.json() + self._resources_last_updated = current_time + + return self._available_resources + async def _get_token(self): url = f"{self.url}/token/refresh" headers = {"Authorization": f"Bearer {self.refresh_token}"} @@ -229,6 +342,7 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) + @requires_resource("transformationresults") async def get_transformation_results( self, request_id: str, begin_at: datetime.datetime ): From 9db76ad871176fe807044145c5cc8f5d8ada6c57 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 15 May 2025 20:28:32 +0000 Subject: [PATCH 08/11] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 6 ++++-- servicex/servicex_adapter.py | 27 ++++++++++++++++++--------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index ab2ca443..edcf39cd 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -560,7 +560,9 @@ async def get_signed_url( if progress: progress.advance(task_id=download_progress, task_type="Download") - transformation_results_enabled = "transformationresults" in await self.servicex.get_resources() + transformation_results_enabled = ( + "transformationresults" in await self.servicex.get_resources() + ) while True: if not cached_record: @@ -581,7 +583,7 @@ async def get_signed_url( if transformation_results_enabled: if "file-path" not in file: continue - file_path = file.get("file-path", '').replace("/", ":") + file_path = file.get("file-path", "").replace("/", ":") else: file_path = file.filename diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index a3380bb3..7fb4fb20 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -46,9 +46,12 @@ from servicex.models import TransformRequest, TransformStatus, CachedDataset -T = TypeVar('T') +T = TypeVar("T") -def requires_resource(resource_name: str) -> Callable[[Callable[..., T]], Callable[..., T]]: + +def requires_resource( + resource_name: str, +) -> Callable[[Callable[..., T]], Callable[..., T]]: """ Decorator to check if a specific API resource is available on the server before executing the method. @@ -68,9 +71,10 @@ def decorator(func: Callable[..., T]) -> Callable[..., T]: func_name = func.__name__ # Class-level cache for sync method resources - sync_cache_key = f'_sync_resources_for_{resource_name}' + sync_cache_key = f"_sync_resources_for_{resource_name}" if is_async: + @wraps(func) async def async_wrapper(self, *args: Any, **kwargs: Any) -> T: # Get resources and check availability in one operation @@ -82,6 +86,7 @@ async def async_wrapper(self, *args: Any, **kwargs: Any) -> T: return cast(Callable[..., T], async_wrapper) else: + @wraps(func) def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: # Initialize class-level cache attributes if needed @@ -89,7 +94,7 @@ def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: if not hasattr(cls, sync_cache_key): setattr(cls, sync_cache_key, (None, 0)) # (resources, timestamp) - cache_ttl = getattr(self, '_resources_cache_ttl', 300) + cache_ttl = getattr(self, "_resources_cache_ttl", 300) cached_resources, timestamp = getattr(cls, sync_cache_key) current_time = time.time() @@ -117,8 +122,10 @@ def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: class ResourceNotAvailableError(Exception): """Exception raised when a required resource is not available on the server.""" + pass + class AuthorizationError(BaseException): pass @@ -140,7 +147,7 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): self._available_resources: Optional[Dict[str, Any]] = None self._resources_last_updated: Optional[float] = None - self._resources_cache_ttl = 60*5 + self._resources_cache_ttl = 60 * 5 async def get_resources(self) -> Dict[str, Any]: """ @@ -153,16 +160,18 @@ async def get_resources(self) -> Dict[str, Any]: current_time = time.time() # Return cached resources if they exist and are not expired - if (self._available_resources is not None and - self._resources_last_updated is not None and - current_time - self._resources_last_updated < self._resources_cache_ttl): + if ( + self._available_resources is not None + and self._resources_last_updated is not None + and current_time - self._resources_last_updated < self._resources_cache_ttl + ): return self._available_resources # Fetch resources from server headers = await self._get_authorization() async with ClientSession() as session: async with session.get( - headers=headers, url=f"{self.url}/servicex/resources" + headers=headers, url=f"{self.url}/servicex/resources" ) as r: if r.status == 403: raise AuthorizationError( From 4ff0b34f73d8a6a8ed569391fe7a6a11aa7ef634 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 15 May 2025 15:31:13 -0500 Subject: [PATCH 09/11] flake8 compliance --- servicex/servicex_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 7fb4fb20..ac8c23d2 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -53,7 +53,7 @@ def requires_resource( resource_name: str, ) -> Callable[[Callable[..., T]], Callable[..., T]]: """ - Decorator to check if a specific API resource is available on the server before executing the method. + Decorator to check if a specific API resource is available on the server. Args: resource_name: The name of the resource that needs to be available From 5eaaad4dc6806879b3e380bf86f1d78163cc126c Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Fri, 16 May 2025 10:41:20 -0500 Subject: [PATCH 10/11] fix breaking tests --- tests/test_dataset.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index ef09c0d7..39e6b4c9 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -136,6 +136,10 @@ async def test_download_files(python_dataset): minio_mock.download_file.return_value = Path("/path/to/downloaded_file") minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url") + minio_mock.list_bucket.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] progress_mock = Mock() python_dataset.minio_polling_interval = 0 @@ -161,6 +165,10 @@ async def test_download_files_with_signed_urls(python_dataset): python_dataset.configuration = config minio_mock.download_file.return_value = "/path/to/downloaded_file" minio_mock.get_signed_url.return_value = "http://example.com/signed_url" + minio_mock.list_bucket.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] progress_mock = Mock() python_dataset.servicex = AsyncMock() From db04cef183116119cb09103eb82f439cdf3897c8 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 20 May 2025 10:08:08 -0500 Subject: [PATCH 11/11] resolve breaking tests --- tests/test_servicex_dataset.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 45519e69..6956378a 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -226,6 +226,7 @@ async def test_submit(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -278,6 +279,7 @@ async def test_submit_partial_success(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -323,6 +325,7 @@ async def test_use_of_cache(mocker): transform_status3, ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -357,6 +360,7 @@ async def test_use_of_cache(mocker): # second round, should hit the cache (and not call the sx_adapter, minio, or update_record) with ExpandableProgress(display_progress=False) as progress: servicex2 = AsyncMock() + mock_minio.list_bucket.reset_mock() servicex.get_transformation_results.reset_mock() mock_minio.get_signed_url.reset_mock() datasource2 = Query( @@ -375,6 +379,7 @@ async def test_use_of_cache(mocker): signed_urls_only=True, expandable_progress=progress ) servicex2.assert_not_awaited() + mock_minio.list_bucket.assert_not_awaited() servicex.get_transformation_results.assert_not_awaited() mock_minio.get_signed_url.assert_not_awaited() upd.assert_not_called() @@ -399,6 +404,7 @@ async def test_use_of_cache(mocker): signed_urls_only=False, expandable_progress=progress ) servicex.assert_not_awaited() + mock_minio.list_bucket.assert_not_awaited() servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() upd.assert_called_once() @@ -417,6 +423,7 @@ async def test_submit_cancel(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -457,6 +464,7 @@ async def test_submit_fatal(mocker): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -501,6 +509,7 @@ async def test_submit_generic(mocker, codegen_list): ] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -549,6 +558,7 @@ async def test_submit_cancelled(mocker, codegen_list): sx.get_transform_status.side_effect = [transform_status4] mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -623,6 +633,7 @@ async def test_use_of_ignore_cache(mocker, servicex): ) # Prepare Minio mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) did = FileListDataset("/foo/bar/baz.root") @@ -698,6 +709,7 @@ async def test_use_of_ignore_cache(mocker, servicex): res = await datasource_without_ignore_cache.submit_and_download( signed_urls_only=True, expandable_progress=progress ) # noqa + mock_minio.list_bucket.assert_not_awaited() servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() assert len(res.signed_url_list) == 2