From 90b61735ddfe2bbf10559b6d8448cca8bcaabbe3 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 6 May 2025 15:50:02 -0500 Subject: [PATCH 01/35] remove s3 bucket polling when waiting for transformation results --- servicex/query_core.py | 17 +++++++++++------ servicex/servicex_adapter.py | 23 ++++++++++++++++++++++- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index ac4f5f09..b631c840 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -522,6 +522,7 @@ async def download_files( Task to monitor the list of files in the transform output's bucket. Any new files will be downloaded. """ + files_seen = set() result_uris = [] download_tasks = [] @@ -557,15 +558,19 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - files = await self.minio.list_bucket() + files = await self.servicex.get_transformation_results(self.current_status.request_id) + for file in files: - if file.filename not in files_seen: + if 'file-path' not in file: + continue + + file_path = file['file-path'].replace('/', ':') + if file_path not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( get_signed_url( - self.minio, - file.filename, + file_path, progress, download_progress, ) @@ -576,14 +581,14 @@ async def get_signed_url( loop.create_task( download_file( self.minio, - file.filename, + file_path, progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 ) ) ) # NOQA 501 - files_seen.add(file.filename) + files_seen.add(file_path) # Once the transform is complete and all files are seen we can stop polling. # Also, if we are just downloading or signing urls for a previous transform diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index baa7b5b0..5779d5da 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -228,6 +228,28 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) + async def get_transformation_results(self, request_id: str): + headers = await self._get_authorization() + url = self.url + f'/servicex/internal/transformation/{request_id}/results' + + async with ClientSession() as session: + async with session.get(headers=headers, url=url) as r: + if r.status == 403: + raise AuthorizationError( + f"Not authorized to access serviceX at {self.url}" + ) + + if r.status == 404: + raise ValueError(f"Request {request_id} not found") + + if r.status != 200: + msg = await _extract_message(r) + raise RuntimeError( + f"Failed with message: {msg}" + ) + + return (await r.json())['results'] + async def cancel_transform(self, transform_id=None): headers = await self._get_authorization() path_template = f"/servicex/transformation/{transform_id}/cancel" @@ -235,7 +257,6 @@ async def cancel_transform(self, transform_id=None): async with ClientSession() as session: async with session.get(headers=headers, url=url) as r: - if r.status == 403: raise AuthorizationError( f"Not authorized to access serviceX at {self.url}" From ee7b02c2a04ae0cd96798da0238fd7565158bb09 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 6 May 2025 20:51:44 +0000 Subject: [PATCH 02/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 8 +++++--- servicex/servicex_adapter.py | 8 +++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index b631c840..bba338c5 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -558,13 +558,15 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - files = await self.servicex.get_transformation_results(self.current_status.request_id) + files = await self.servicex.get_transformation_results( + self.current_status.request_id + ) for file in files: - if 'file-path' not in file: + if "file-path" not in file: continue - file_path = file['file-path'].replace('/', ':') + file_path = file["file-path"].replace("/", ":") if file_path not in files_seen: if signed_urls_only: download_tasks.append( diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 5779d5da..910fb067 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -230,7 +230,7 @@ async def delete_transform(self, transform_id=None): async def get_transformation_results(self, request_id: str): headers = await self._get_authorization() - url = self.url + f'/servicex/internal/transformation/{request_id}/results' + url = self.url + f"/servicex/internal/transformation/{request_id}/results" async with ClientSession() as session: async with session.get(headers=headers, url=url) as r: @@ -244,11 +244,9 @@ async def get_transformation_results(self, request_id: str): if r.status != 200: msg = await _extract_message(r) - raise RuntimeError( - f"Failed with message: {msg}" - ) + raise RuntimeError(f"Failed with message: {msg}") - return (await r.json())['results'] + return (await r.json())["results"] async def cancel_transform(self, transform_id=None): headers = await self._get_authorization() From 5b04365ed7bdf0fa80708733730e8305229ef9db Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 14 May 2025 14:40:51 -0500 Subject: [PATCH 03/35] add begin_at, update tests --- servicex/query_core.py | 13 +++++++-- servicex/servicex_adapter.py | 14 +++++++--- tests/test_servicex_dataset.py | 49 +++++++++++++++++++++++----------- 3 files changed, 54 insertions(+), 22 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index bba338c5..2b816f99 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -27,6 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from __future__ import annotations +import datetime import abc import asyncio from abc import ABC @@ -318,6 +319,7 @@ def transform_complete(task: Task): else None ) + begin_at = datetime.datetime.now(datetime.UTC) if not cached_record: if self.cache.is_transform_request_submitted(sx_request_hash): @@ -342,13 +344,14 @@ def transform_complete(task: Task): download_files_task = loop.create_task( self.download_files( - signed_urls_only, expandable_progress, download_progress, cached_record + signed_urls_only, expandable_progress, download_progress, cached_record, begin_at ) ) try: signed_urls = [] downloaded_files = [] + download_result = await download_files_task if signed_urls_only: signed_urls = download_result @@ -517,6 +520,7 @@ async def download_files( progress: ExpandableProgress, download_progress: TaskID, cached_record: Optional[TransformedResults], + begin_at: datetime.datetime, ) -> List[str]: """ Task to monitor the list of files in the transform output's bucket. Any new files @@ -558,20 +562,25 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): + new_begin_at = datetime.datetime.now(datetime.UTC) files = await self.servicex.get_transformation_results( - self.current_status.request_id + self.current_status.request_id, + begin_at ) + begin_at = new_begin_at for file in files: if "file-path" not in file: continue file_path = file["file-path"].replace("/", ":") + if file_path not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( get_signed_url( + self.minio, file_path, progress, download_progress, diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 910fb067..231a5216 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -27,6 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os import time +import datetime from typing import Optional, Dict, List from aiohttp import ClientSession @@ -228,12 +229,16 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) - async def get_transformation_results(self, request_id: str): + async def get_transformation_results(self, request_id: str, begin_at: datetime.datetime): headers = await self._get_authorization() - url = self.url + f"/servicex/internal/transformation/{request_id}/results" + url = self.url + f"/servicex/transformation/{request_id}/results" + + params = {} + if begin_at: + params["begin_at"] = begin_at.isoformat() async with ClientSession() as session: - async with session.get(headers=headers, url=url) as r: + async with session.get(headers=headers, url=url, params=params) as r: if r.status == 403: raise AuthorizationError( f"Not authorized to access serviceX at {self.url}" @@ -246,7 +251,8 @@ async def get_transformation_results(self, request_id: str): msg = await _extract_message(r) raise RuntimeError(f"Failed with message: {msg}") - return (await r.json())["results"] + data = await r.json() + return data.get("results") async def cancel_transform(self, transform_id=None): headers = await self._get_authorization() diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 628dc2e0..f489954f 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -204,8 +204,18 @@ def cache_transform(record: TransformedResults): @pytest.mark.asyncio async def test_submit(mocker): servicex = AsyncMock() + servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + + servicex.get_transformation_results = AsyncMock(side_effect=[ + [{"file-path": file1.filename}], + [ + {"file-path": file1.filename}, + {"file-path": file2.filename}, + ], + ]) + servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -214,7 +224,6 @@ async def test_submit(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -235,6 +244,7 @@ async def test_submit(mocker): config=Configuration(api_endpoints=[]), ) datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") + with ExpandableProgress(display_progress=False) as progress: datasource.result_format = ResultFormat.parquet result = await datasource.submit_and_download( @@ -245,11 +255,18 @@ async def test_submit(mocker): mock_cache.cache_transform.assert_called_once() + @pytest.mark.asyncio async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + + servicex.get_transformation_results = AsyncMock(side_effect=[ + [{"file-path": file1.filename}], + [{"file-path": file1.filename}], + ]) + servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -258,7 +275,6 @@ async def test_submit_partial_success(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -295,13 +311,16 @@ async def test_use_of_cache(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_transformation_results = AsyncMock(return_value=[ + {"file-path": file1.filename}, + {"file-path": file2.filename} + ]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, transform_status3, ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -336,7 +355,7 @@ async def test_use_of_cache(mocker): # second round, should hit the cache (and not call the sx_adapter, minio, or update_record) with ExpandableProgress(display_progress=False) as progress: servicex2 = AsyncMock() - mock_minio.list_bucket.reset_mock() + servicex.get_transformation_results.reset_mock() mock_minio.get_signed_url.reset_mock() datasource2 = Query( dataset_identifier=did, @@ -354,14 +373,14 @@ async def test_use_of_cache(mocker): signed_urls_only=True, expandable_progress=progress ) servicex2.assert_not_awaited() - mock_minio.list_bucket.assert_not_awaited() + servicex.get_transformation_results.assert_not_awaited() mock_minio.get_signed_url.assert_not_awaited() upd.assert_not_called() assert result1 == result2 upd.reset_mock() servicex.get_transform_status.reset_mock(side_effect=True) servicex.get_transform_status.return_value = transform_status3 - mock_minio.list_bucket.reset_mock(side_effect=True) + servicex.get_transformation_results.reset_mock(side_effect=True) # third round, should hit the cache and download files (and call update_record) with ExpandableProgress(display_progress=False) as progress: await datasource.submit_and_download( @@ -371,14 +390,14 @@ async def test_use_of_cache(mocker): assert mock_minio.download_file.await_count == 2 upd.assert_called_once() # fourth round, should hit the cache (and nothing else) - mock_minio.list_bucket.reset_mock() + servicex.get_transformation_results.reset_mock() mock_minio.download_file.reset_mock() with ExpandableProgress(display_progress=False) as progress: await datasource.submit_and_download( signed_urls_only=False, expandable_progress=progress ) servicex.assert_not_awaited() - mock_minio.list_bucket.assert_not_awaited() + servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() upd.assert_called_once() cache.close() @@ -396,7 +415,6 @@ async def test_submit_cancel(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -437,7 +455,6 @@ async def test_submit_fatal(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) @@ -482,7 +499,6 @@ async def test_submit_generic(mocker, codegen_list): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -531,7 +547,6 @@ async def test_submit_cancelled(mocker, codegen_list): sx.get_transform_status.side_effect = [transform_status4] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock() mock_cache = mocker.MagicMock(QueryCache) @@ -601,10 +616,12 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status3, ] ) - + servicex.get_transformation_results = AsyncMock(return_value=[ + {"file-path": file1.filename}, + {"file-path": file2.filename} + ]) # Prepare Minio mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) did = FileListDataset("/foo/bar/baz.root") @@ -674,13 +691,13 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status1, transform_status3, ] - mock_minio.list_bucket.reset_mock() + servicex.get_transformation_results.reset_mock() mock_minio.download_file.reset_mock() with ExpandableProgress(display_progress=False) as progress: res = await datasource_without_ignore_cache.submit_and_download( signed_urls_only=True, expandable_progress=progress ) # noqa - mock_minio.list_bucket.assert_not_awaited() + servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() assert len(res.signed_url_list) == 2 cache.close() From 85b53466c094b0530299d7663c6d0f14cba504bf Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 14 May 2025 19:41:11 +0000 Subject: [PATCH 04/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 9 +++++--- servicex/servicex_adapter.py | 4 +++- tests/test_servicex_dataset.py | 41 +++++++++++++++++----------------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 2b816f99..cfc4eb4d 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -344,7 +344,11 @@ def transform_complete(task: Task): download_files_task = loop.create_task( self.download_files( - signed_urls_only, expandable_progress, download_progress, cached_record, begin_at + signed_urls_only, + expandable_progress, + download_progress, + cached_record, + begin_at, ) ) @@ -564,8 +568,7 @@ async def get_signed_url( if self.current_status.files_completed > len(files_seen): new_begin_at = datetime.datetime.now(datetime.UTC) files = await self.servicex.get_transformation_results( - self.current_status.request_id, - begin_at + self.current_status.request_id, begin_at ) begin_at = new_begin_at diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 231a5216..0ce0e67f 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -229,7 +229,9 @@ async def delete_transform(self, transform_id=None): f"Failed to delete transform {transform_id} - {msg}" ) - async def get_transformation_results(self, request_id: str, begin_at: datetime.datetime): + async def get_transformation_results( + self, request_id: str, begin_at: datetime.datetime + ): headers = await self._get_authorization() url = self.url + f"/servicex/transformation/{request_id}/results" diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index f489954f..45519e69 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -208,13 +208,15 @@ async def test_submit(mocker): servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[ - [{"file-path": file1.filename}], - [ - {"file-path": file1.filename}, - {"file-path": file2.filename}, - ], - ]) + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": file1.filename}], + [ + {"file-path": file1.filename}, + {"file-path": file2.filename}, + ], + ] + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -255,17 +257,18 @@ async def test_submit(mocker): mock_cache.cache_transform.assert_called_once() - @pytest.mark.asyncio async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[ - [{"file-path": file1.filename}], - [{"file-path": file1.filename}], - ]) + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": file1.filename}], + [{"file-path": file1.filename}], + ] + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -311,10 +314,9 @@ async def test_use_of_cache(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(return_value=[ - {"file-path": file1.filename}, - {"file-path": file2.filename} - ]) + servicex.get_transformation_results = AsyncMock( + return_value=[{"file-path": file1.filename}, {"file-path": file2.filename}] + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -616,10 +618,9 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status3, ] ) - servicex.get_transformation_results = AsyncMock(return_value=[ - {"file-path": file1.filename}, - {"file-path": file2.filename} - ]) + servicex.get_transformation_results = AsyncMock( + return_value=[{"file-path": file1.filename}, {"file-path": file2.filename}] + ) # Prepare Minio mock_minio = AsyncMock() mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) From b80d72d9ebf45c4ab4a4e48f9b6d87a8575f3dd2 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 14 May 2025 14:54:09 -0500 Subject: [PATCH 05/35] use python3.9 compliant method to get UTC time --- servicex/query_core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 2b816f99..2f2b8547 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -319,7 +319,7 @@ def transform_complete(task: Task): else None ) - begin_at = datetime.datetime.now(datetime.UTC) + begin_at = datetime.datetime.now(tz=datetime.timezone.utc) if not cached_record: if self.cache.is_transform_request_submitted(sx_request_hash): @@ -562,7 +562,7 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - new_begin_at = datetime.datetime.now(datetime.UTC) + new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) files = await self.servicex.get_transformation_results( self.current_status.request_id, begin_at From 36cb4aabe26bac7207df46423b082ef1342c8119 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 14 May 2025 17:01:58 -0500 Subject: [PATCH 06/35] fix breaking tests --- tests/test_dataset.py | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 2ef18283..ef09c0d7 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -28,6 +28,7 @@ import pytest import tempfile import os +import datetime from unittest.mock import AsyncMock, Mock, patch from servicex.dataset_identifier import FileListDataset @@ -120,16 +121,21 @@ async def test_as_files_cached(transformed_result, python_dataset): @pytest.mark.asyncio async def test_download_files(python_dataset): signed_urls_only = False + begin_at = datetime.datetime.now(tz=datetime.timezone.utc) download_progress = "download_task_id" minio_mock = AsyncMock() config = Configuration(cache_path="temp_dir", api_endpoints=[]) python_dataset.configuration = config + python_dataset.servicex = AsyncMock() + python_dataset.servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": "file1.txt"}], + [{"file-path": "file2.txt"}], + ] + ) + minio_mock.download_file.return_value = Path("/path/to/downloaded_file") minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url") - minio_mock.list_bucket.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), - ] progress_mock = Mock() python_dataset.minio_polling_interval = 0 @@ -138,7 +144,7 @@ async def test_download_files(python_dataset): python_dataset.configuration.shortened_downloaded_filename = False result_uris = await python_dataset.download_files( - signed_urls_only, progress_mock, download_progress, None + signed_urls_only, progress_mock, download_progress, None, begin_at ) minio_mock.download_file.assert_awaited() minio_mock.get_signed_url.assert_not_awaited() @@ -148,25 +154,29 @@ async def test_download_files(python_dataset): @pytest.mark.asyncio async def test_download_files_with_signed_urls(python_dataset): signed_urls_only = True + begin_at = datetime.datetime.now(tz=datetime.timezone.utc) download_progress = "download_task_id" minio_mock = AsyncMock() config = Configuration(cache_path="temp_dir", api_endpoints=[]) python_dataset.configuration = config minio_mock.download_file.return_value = "/path/to/downloaded_file" minio_mock.get_signed_url.return_value = "http://example.com/signed_url" - minio_mock.list_bucket.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), - ] progress_mock = Mock() + python_dataset.servicex = AsyncMock() + python_dataset.servicex.get_transformation_results = AsyncMock( + side_effect=[ + [{"file-path": "file1.txt"}], + [{"file-path": "file2.txt"}], + ] + ) python_dataset.minio_polling_interval = 0 python_dataset.minio = minio_mock python_dataset.current_status = Mock(status="Complete", files_completed=2) python_dataset.configuration.shortened_downloaded_filename = False result_uris = await python_dataset.download_files( - signed_urls_only, progress_mock, download_progress, None + signed_urls_only, progress_mock, download_progress, None, begin_at ) minio_mock.download_file.assert_not_called() minio_mock.get_signed_url.assert_called() From eb9a937405013174872e9920b0a2b92e524ed7ff Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 20 May 2025 16:04:42 -0500 Subject: [PATCH 07/35] add additional test coverage support --- servicex/servicex_adapter.py | 7 ++-- tests/test_servicex_adapter.py | 73 ++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 0ce0e67f..b5a06f0e 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -234,10 +234,9 @@ async def get_transformation_results( ): headers = await self._get_authorization() url = self.url + f"/servicex/transformation/{request_id}/results" - - params = {} - if begin_at: - params["begin_at"] = begin_at.isoformat() + params = { + "begin_at": begin_at.isoformat(), + } async with ClientSession() as session: async with session.get(headers=headers, url=url, params=params) as r: diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 99d8f5c0..8043face 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -28,6 +28,8 @@ import os import tempfile import time +import datetime +import unittest from unittest.mock import patch import httpx @@ -502,3 +504,74 @@ async def test_get_authorization(servicex): with patch("google.auth.jwt.decode", return_value={"exp": time.time() - 90}): r = await servicex._get_authorization() get_token.assert_called_once() + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.ClientSession.get") +async def test_get_transformation_results_success(get_transformation_results, servicex): + get_transformation_results.return_value.__aenter__.return_value.status = 200 + + request_id = "123-45-6789" + now = datetime.datetime.now(datetime.timezone.utc) + await servicex.get_transformation_results(request_id, now) + + get_transformation_results.assert_called_with( + url=f"https://servicex.org/servicex/transformation/{request_id}/results", + headers={}, + params={ + "begin_at": now.isoformat(), + } + ) + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.ClientSession.get") +async def test_get_transformation_results_not_found(get_transformation_results, servicex): + get_transformation_results.return_value.__aenter__.return_value.status = 404 + request_id = "123-45-6789" + now = datetime.datetime.now(datetime.timezone.utc) + + with pytest.raises(ValueError): + await servicex.get_transformation_results(request_id, now) + + get_transformation_results.assert_called_with( + url=f"https://servicex.org/servicex/transformation/{request_id}/results", + headers={}, + params={ + "begin_at": now.isoformat(), + } + ) + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.ClientSession.get") +async def test_get_transformation_results_not_authorized(get_transformation_results, servicex): + get_transformation_results.return_value.__aenter__.return_value.status = 403 + request_id = "123-45-6789" + now = datetime.datetime.now(datetime.timezone.utc) + + with pytest.raises(AuthorizationError): + await servicex.get_transformation_results(request_id, now) + + get_transformation_results.assert_called_with( + url=f"https://servicex.org/servicex/transformation/{request_id}/results", + headers={}, + params={ + "begin_at": now.isoformat(), + } + ) + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.ClientSession.get") +async def test_get_transformation_results_server_error(get_transformation_results, servicex): + get_transformation_results.return_value.__aenter__.return_value.status = 500 + request_id = "123-45-6789" + now = datetime.datetime.now(datetime.timezone.utc) + + with pytest.raises(RuntimeError): + await servicex.get_transformation_results(request_id, now) + + get_transformation_results.assert_called_with( + url=f"https://servicex.org/servicex/transformation/{request_id}/results", + headers={}, + params={ + "begin_at": now.isoformat(), + } + ) From bf805bb92cc59ed8fc0c6c89e5a51dffc76df492 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 20 May 2025 21:05:09 +0000 Subject: [PATCH 08/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_servicex_adapter.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 8043face..8918f384 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -505,6 +505,7 @@ async def test_get_authorization(servicex): r = await servicex._get_authorization() get_token.assert_called_once() + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") async def test_get_transformation_results_success(get_transformation_results, servicex): @@ -519,12 +520,15 @@ async def test_get_transformation_results_success(get_transformation_results, se headers={}, params={ "begin_at": now.isoformat(), - } + }, ) + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") -async def test_get_transformation_results_not_found(get_transformation_results, servicex): +async def test_get_transformation_results_not_found( + get_transformation_results, servicex +): get_transformation_results.return_value.__aenter__.return_value.status = 404 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -537,12 +541,15 @@ async def test_get_transformation_results_not_found(get_transformation_results, headers={}, params={ "begin_at": now.isoformat(), - } + }, ) + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") -async def test_get_transformation_results_not_authorized(get_transformation_results, servicex): +async def test_get_transformation_results_not_authorized( + get_transformation_results, servicex +): get_transformation_results.return_value.__aenter__.return_value.status = 403 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -555,12 +562,15 @@ async def test_get_transformation_results_not_authorized(get_transformation_resu headers={}, params={ "begin_at": now.isoformat(), - } + }, ) + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") -async def test_get_transformation_results_server_error(get_transformation_results, servicex): +async def test_get_transformation_results_server_error( + get_transformation_results, servicex +): get_transformation_results.return_value.__aenter__.return_value.status = 500 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -573,5 +583,5 @@ async def test_get_transformation_results_server_error(get_transformation_result headers={}, params={ "begin_at": now.isoformat(), - } + }, ) From 9492997f6e38776e9419e2d2b4656df7afab1a14 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 20 May 2025 16:12:51 -0500 Subject: [PATCH 09/35] flake8 compliance --- tests/test_servicex_adapter.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 8043face..6346e2eb 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -29,7 +29,6 @@ import tempfile import time import datetime -import unittest from unittest.mock import patch import httpx @@ -505,6 +504,7 @@ async def test_get_authorization(servicex): r = await servicex._get_authorization() get_token.assert_called_once() + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") async def test_get_transformation_results_success(get_transformation_results, servicex): @@ -519,12 +519,15 @@ async def test_get_transformation_results_success(get_transformation_results, se headers={}, params={ "begin_at": now.isoformat(), - } + }, ) + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") -async def test_get_transformation_results_not_found(get_transformation_results, servicex): +async def test_get_transformation_results_not_found( + get_transformation_results, servicex +): get_transformation_results.return_value.__aenter__.return_value.status = 404 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -537,12 +540,15 @@ async def test_get_transformation_results_not_found(get_transformation_results, headers={}, params={ "begin_at": now.isoformat(), - } + }, ) + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") -async def test_get_transformation_results_not_authorized(get_transformation_results, servicex): +async def test_get_transformation_results_not_authorized( + get_transformation_results, servicex +): get_transformation_results.return_value.__aenter__.return_value.status = 403 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -555,12 +561,15 @@ async def test_get_transformation_results_not_authorized(get_transformation_resu headers={}, params={ "begin_at": now.isoformat(), - } + }, ) + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") -async def test_get_transformation_results_server_error(get_transformation_results, servicex): +async def test_get_transformation_results_server_error( + get_transformation_results, servicex +): get_transformation_results.return_value.__aenter__.return_value.status = 500 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -573,5 +582,5 @@ async def test_get_transformation_results_server_error(get_transformation_result headers={}, params={ "begin_at": now.isoformat(), - } + }, ) From ab9d0cf0fc9bce2f6c81dd68f67e0cc6e54a5bdc Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 20 May 2025 16:20:32 -0500 Subject: [PATCH 10/35] code coverage improvement --- servicex/query_core.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index e8ee2c36..bba90458 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -573,12 +573,9 @@ async def get_signed_url( begin_at = new_begin_at for file in files: - if "file-path" not in file: - continue + file_path = file.get("file-path", '').replace("/", ":") - file_path = file["file-path"].replace("/", ":") - - if file_path not in files_seen: + if file_path != '' and file_path not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( From 3644ee1e8fb3fa6dc263a3c06fc66a90a5d1d8c4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 20 May 2025 21:21:19 +0000 Subject: [PATCH 11/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index bba90458..90b91894 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -573,9 +573,9 @@ async def get_signed_url( begin_at = new_begin_at for file in files: - file_path = file.get("file-path", '').replace("/", ":") + file_path = file.get("file-path", "").replace("/", ":") - if file_path != '' and file_path not in files_seen: + if file_path != "" and file_path not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( From 9b506c53ae7509d4568e86b8a6610a00a7705e31 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 22 May 2025 09:55:46 -0500 Subject: [PATCH 12/35] add ServiceXFile class and refactor tests to match previous --- servicex/query_core.py | 10 +++++----- servicex/servicex_adapter.py | 11 ++++++++++- tests/test_dataset.py | 23 +++++++++++----------- tests/test_servicex_adapter.py | 16 +++++++++++---- tests/test_servicex_dataset.py | 36 ++++++++++++---------------------- 5 files changed, 51 insertions(+), 45 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index bba90458..28b4fc70 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -573,15 +573,15 @@ async def get_signed_url( begin_at = new_begin_at for file in files: - file_path = file.get("file-path", '').replace("/", ":") + filename = file.filename - if file_path != '' and file_path not in files_seen: + if filename != '' and filename not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( get_signed_url( self.minio, - file_path, + filename, progress, download_progress, ) @@ -592,14 +592,14 @@ async def get_signed_url( loop.create_task( download_file( self.minio, - file_path, + filename, progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 ) ) ) # NOQA 501 - files_seen.add(file_path) + files_seen.add(filename) # Once the transform is complete and all files are seen we can stop polling. # Also, if we are just downloading or signing urls for a previous transform diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index b5a06f0e..065ba2e2 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -29,6 +29,7 @@ import time import datetime from typing import Optional, Dict, List +from dataclasses import dataclass from aiohttp import ClientSession import httpx @@ -48,6 +49,10 @@ class AuthorizationError(BaseException): pass +@dataclass +class ServiceXFile: + filename: str + async def _extract_message(r: ClientResponse): try: @@ -253,7 +258,11 @@ async def get_transformation_results( raise RuntimeError(f"Failed with message: {msg}") data = await r.json() - return data.get("results") + response = list() + for result in data.get("results", []): + file = ServiceXFile(filename=result["file-path"].replace("/", ":")) + response.append(file) + return response async def cancel_transform(self, transform_id=None): headers = await self._get_authorization() diff --git a/tests/test_dataset.py b/tests/test_dataset.py index ef09c0d7..19ad3539 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -127,12 +127,11 @@ async def test_download_files(python_dataset): config = Configuration(cache_path="temp_dir", api_endpoints=[]) python_dataset.configuration = config python_dataset.servicex = AsyncMock() - python_dataset.servicex.get_transformation_results = AsyncMock( - side_effect=[ - [{"file-path": "file1.txt"}], - [{"file-path": "file2.txt"}], - ] - ) + python_dataset.servicex.get_transformation_results = AsyncMock() + python_dataset.servicex.get_transformation_results.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] minio_mock.download_file.return_value = Path("/path/to/downloaded_file") minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url") @@ -164,12 +163,12 @@ async def test_download_files_with_signed_urls(python_dataset): progress_mock = Mock() python_dataset.servicex = AsyncMock() - python_dataset.servicex.get_transformation_results = AsyncMock( - side_effect=[ - [{"file-path": "file1.txt"}], - [{"file-path": "file2.txt"}], - ] - ) + python_dataset.servicex.get_transformation_results = AsyncMock() + python_dataset.servicex.get_transformation_results.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] + python_dataset.minio_polling_interval = 0 python_dataset.minio = minio_mock python_dataset.current_status = Mock(status="Complete", files_completed=2) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 6346e2eb..9fd71a03 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -29,7 +29,7 @@ import tempfile import time import datetime -from unittest.mock import patch +from unittest.mock import patch, AsyncMock import httpx import pytest @@ -507,14 +507,22 @@ async def test_get_authorization(servicex): @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") -async def test_get_transformation_results_success(get_transformation_results, servicex): - get_transformation_results.return_value.__aenter__.return_value.status = 200 +async def test_get_transformation_results_success(get, servicex): + get.return_value.__aenter__.return_value.status = 200 + get.return_value.__aenter__.return_value.json = AsyncMock(return_value={ + "results": [ + {"file-path": "file1.txt"}, + {"file-path": "file2.txt"}, + ] + }) + + print(get) request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) await servicex.get_transformation_results(request_id, now) - get_transformation_results.assert_called_with( + get.assert_called_with( url=f"https://servicex.org/servicex/transformation/{request_id}/results", headers={}, params={ diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 45519e69..52e9c18d 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -27,7 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import tempfile from typing import List -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, patch, Mock from pathlib import PurePath import pytest from itertools import cycle @@ -208,15 +208,7 @@ async def test_submit(mocker): servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock( - side_effect=[ - [{"file-path": file1.filename}], - [ - {"file-path": file1.filename}, - {"file-path": file2.filename}, - ], - ] - ) + servicex.get_transformation_results = AsyncMock(side_effect=[[file1], [file1, file2]]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -262,13 +254,7 @@ async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - - servicex.get_transformation_results = AsyncMock( - side_effect=[ - [{"file-path": file1.filename}], - [{"file-path": file1.filename}], - ] - ) + servicex.get_transformation_results = AsyncMock(side_effect=[[file1], [file1]]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -314,9 +300,11 @@ async def test_use_of_cache(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock( - return_value=[{"file-path": file1.filename}, {"file-path": file2.filename}] - ) + servicex.get_transformation_results = AsyncMock() + servicex.get_transformation_results.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -618,9 +606,11 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status3, ] ) - servicex.get_transformation_results = AsyncMock( - return_value=[{"file-path": file1.filename}, {"file-path": file2.filename}] - ) + servicex.get_transformation_results = AsyncMock() + servicex.get_transformation_results.return_value = [ + Mock(filename="file1.txt"), + Mock(filename="file2.txt"), + ] # Prepare Minio mock_minio = AsyncMock() mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) From 6d18ff3814eaea605ab446da8edd1b34cc8c4618 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 22 May 2025 18:09:34 +0000 Subject: [PATCH 13/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/query_core.py | 2 +- servicex/servicex_adapter.py | 1 + tests/test_servicex_adapter.py | 14 ++++++++------ tests/test_servicex_dataset.py | 4 +++- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 28b4fc70..243beaab 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -575,7 +575,7 @@ async def get_signed_url( for file in files: filename = file.filename - if filename != '' and filename not in files_seen: + if filename != "" and filename not in files_seen: if signed_urls_only: download_tasks.append( loop.create_task( diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 065ba2e2..5332b9d4 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -49,6 +49,7 @@ class AuthorizationError(BaseException): pass + @dataclass class ServiceXFile: filename: str diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 9fd71a03..4d3aafb1 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -509,12 +509,14 @@ async def test_get_authorization(servicex): @patch("servicex.servicex_adapter.ClientSession.get") async def test_get_transformation_results_success(get, servicex): get.return_value.__aenter__.return_value.status = 200 - get.return_value.__aenter__.return_value.json = AsyncMock(return_value={ - "results": [ - {"file-path": "file1.txt"}, - {"file-path": "file2.txt"}, - ] - }) + get.return_value.__aenter__.return_value.json = AsyncMock( + return_value={ + "results": [ + {"file-path": "file1.txt"}, + {"file-path": "file2.txt"}, + ] + } + ) print(get) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 52e9c18d..77d3ed6e 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -208,7 +208,9 @@ async def test_submit(mocker): servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[[file1], [file1, file2]]) + servicex.get_transformation_results = AsyncMock( + side_effect=[[file1], [file1, file2]] + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ From 98392dd1d99bb1ef8d4aa79b4c337a4752bd7006 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 22 May 2025 16:46:13 -0500 Subject: [PATCH 14/35] resolve breaking tests --- servicex/query_core.py | 11 +++++------ servicex/servicex_adapter.py | 14 +++++++++----- tests/test_dataset.py | 16 ++++++++-------- tests/test_servicex_adapter.py | 13 +++++++------ tests/test_servicex_dataset.py | 32 +++++++++++++++++++++++--------- 5 files changed, 52 insertions(+), 34 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 243beaab..85603550 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -319,7 +319,6 @@ def transform_complete(task: Task): else None ) - begin_at = datetime.datetime.now(tz=datetime.timezone.utc) if not cached_record: if self.cache.is_transform_request_submitted(sx_request_hash): @@ -348,7 +347,6 @@ def transform_complete(task: Task): expandable_progress, download_progress, cached_record, - begin_at, ) ) @@ -524,7 +522,6 @@ async def download_files( progress: ExpandableProgress, download_progress: TaskID, cached_record: Optional[TransformedResults], - begin_at: datetime.datetime, ) -> List[str]: """ Task to monitor the list of files in the transform output's bucket. Any new files @@ -560,17 +557,16 @@ async def get_signed_url( if progress: progress.advance(task_id=download_progress, task_type="Download") + later_than: datetime.datetime | None = None while True: if not cached_record: await asyncio.sleep(self.minio_polling_interval) if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc) files = await self.servicex.get_transformation_results( - self.current_status.request_id, begin_at + self.current_status.request_id, later_than ) - begin_at = new_begin_at for file in files: filename = file.filename @@ -601,6 +597,9 @@ async def get_signed_url( ) # NOQA 501 files_seen.add(filename) + if later_than is None or file.created_at > later_than: + later_than = file.created_at + # Once the transform is complete and all files are seen we can stop polling. # Also, if we are just downloading or signing urls for a previous transform # then we know it is complete as well diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 5332b9d4..54280816 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -52,6 +52,7 @@ class AuthorizationError(BaseException): @dataclass class ServiceXFile: + created_at: datetime.datetime filename: str @@ -236,13 +237,13 @@ async def delete_transform(self, transform_id=None): ) async def get_transformation_results( - self, request_id: str, begin_at: datetime.datetime + self, request_id: str, later_than: datetime.datetime | None = None ): headers = await self._get_authorization() url = self.url + f"/servicex/transformation/{request_id}/results" - params = { - "begin_at": begin_at.isoformat(), - } + params = {} + if later_than: + params["later_than"] = later_than.isoformat() async with ClientSession() as session: async with session.get(headers=headers, url=url, params=params) as r: @@ -261,7 +262,10 @@ async def get_transformation_results( data = await r.json() response = list() for result in data.get("results", []): - file = ServiceXFile(filename=result["file-path"].replace("/", ":")) + file = ServiceXFile( + filename=result["file-path"].replace("/", ":"), + created_at=datetime.datetime.fromisoformat(result["created_at"]), + ) response.append(file) return response diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 19ad3539..18bfbfc8 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -45,6 +45,8 @@ ) from rich.progress import Progress +from servicex.servicex_adapter import ServiceXFile + @pytest.mark.asyncio async def test_as_signed_urls_happy(transformed_result): @@ -121,7 +123,6 @@ async def test_as_files_cached(transformed_result, python_dataset): @pytest.mark.asyncio async def test_download_files(python_dataset): signed_urls_only = False - begin_at = datetime.datetime.now(tz=datetime.timezone.utc) download_progress = "download_task_id" minio_mock = AsyncMock() config = Configuration(cache_path="temp_dir", api_endpoints=[]) @@ -129,8 +130,8 @@ async def test_download_files(python_dataset): python_dataset.servicex = AsyncMock() python_dataset.servicex.get_transformation_results = AsyncMock() python_dataset.servicex.get_transformation_results.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), + ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), ] minio_mock.download_file.return_value = Path("/path/to/downloaded_file") @@ -143,7 +144,7 @@ async def test_download_files(python_dataset): python_dataset.configuration.shortened_downloaded_filename = False result_uris = await python_dataset.download_files( - signed_urls_only, progress_mock, download_progress, None, begin_at + signed_urls_only, progress_mock, download_progress, None ) minio_mock.download_file.assert_awaited() minio_mock.get_signed_url.assert_not_awaited() @@ -153,7 +154,6 @@ async def test_download_files(python_dataset): @pytest.mark.asyncio async def test_download_files_with_signed_urls(python_dataset): signed_urls_only = True - begin_at = datetime.datetime.now(tz=datetime.timezone.utc) download_progress = "download_task_id" minio_mock = AsyncMock() config = Configuration(cache_path="temp_dir", api_endpoints=[]) @@ -165,8 +165,8 @@ async def test_download_files_with_signed_urls(python_dataset): python_dataset.servicex = AsyncMock() python_dataset.servicex.get_transformation_results = AsyncMock() python_dataset.servicex.get_transformation_results.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), + ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), ] python_dataset.minio_polling_interval = 0 @@ -175,7 +175,7 @@ async def test_download_files_with_signed_urls(python_dataset): python_dataset.configuration.shortened_downloaded_filename = False result_uris = await python_dataset.download_files( - signed_urls_only, progress_mock, download_progress, None, begin_at + signed_urls_only, progress_mock, download_progress, None ) minio_mock.download_file.assert_not_called() minio_mock.get_signed_url.assert_called() diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 4d3aafb1..989f4035 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -29,6 +29,7 @@ import tempfile import time import datetime +from unittest import result from unittest.mock import patch, AsyncMock import httpx @@ -512,8 +513,8 @@ async def test_get_transformation_results_success(get, servicex): get.return_value.__aenter__.return_value.json = AsyncMock( return_value={ "results": [ - {"file-path": "file1.txt"}, - {"file-path": "file2.txt"}, + {"file-path": "file1.txt", "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat()}, + {"file-path": "file2.txt", "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat()}, ] } ) @@ -528,7 +529,7 @@ async def test_get_transformation_results_success(get, servicex): url=f"https://servicex.org/servicex/transformation/{request_id}/results", headers={}, params={ - "begin_at": now.isoformat(), + "later_than": now.isoformat(), }, ) @@ -549,7 +550,7 @@ async def test_get_transformation_results_not_found( url=f"https://servicex.org/servicex/transformation/{request_id}/results", headers={}, params={ - "begin_at": now.isoformat(), + "later_than": now.isoformat(), }, ) @@ -570,7 +571,7 @@ async def test_get_transformation_results_not_authorized( url=f"https://servicex.org/servicex/transformation/{request_id}/results", headers={}, params={ - "begin_at": now.isoformat(), + "later_than": now.isoformat(), }, ) @@ -591,6 +592,6 @@ async def test_get_transformation_results_server_error( url=f"https://servicex.org/servicex/transformation/{request_id}/results", headers={}, params={ - "begin_at": now.isoformat(), + "later_than": now.isoformat(), }, ) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 77d3ed6e..d8d9378e 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -25,6 +25,7 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import datetime import tempfile from typing import List from unittest.mock import AsyncMock, patch, Mock @@ -46,6 +47,7 @@ ) from servicex.query_cache import QueryCache from servicex.query_core import ServiceXException, Query +from servicex.servicex_adapter import ServiceXFile from servicex.servicex_client import ServiceXClient from servicex.uproot_raw.uproot_raw import UprootRawQuery @@ -208,10 +210,15 @@ async def test_submit(mocker): servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock( - side_effect=[[file1], [file1, file2]] - ) - + servicex.get_transformation_results = AsyncMock(side_effect=[ + [ + ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), + ], + [ + ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile(filename="file2", created_at=datetime.datetime.now(datetime.timezone.utc)), + ] + ]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -256,7 +263,14 @@ async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[[file1], [file1]]) + servicex.get_transformation_results = AsyncMock(side_effect=[ + [ + ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), + ], + [ + ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), + ] + ]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -304,8 +318,8 @@ async def test_use_of_cache(mocker): servicex.submit_transform.return_value = {"request_id": '123-456-789"'} servicex.get_transformation_results = AsyncMock() servicex.get_transformation_results.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), + ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), ] servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -610,8 +624,8 @@ async def test_use_of_ignore_cache(mocker, servicex): ) servicex.get_transformation_results = AsyncMock() servicex.get_transformation_results.return_value = [ - Mock(filename="file1.txt"), - Mock(filename="file2.txt"), + ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), ] # Prepare Minio mock_minio = AsyncMock() From 8e952eb844c2c737e1449f75c0b99502dd6deb8a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 22 May 2025 21:46:32 +0000 Subject: [PATCH 15/35] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- servicex/servicex_adapter.py | 4 +- tests/test_dataset.py | 20 ++++++++-- tests/test_servicex_adapter.py | 14 ++++++- tests/test_servicex_dataset.py | 69 ++++++++++++++++++++++++---------- 4 files changed, 81 insertions(+), 26 deletions(-) diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 54280816..9b34829a 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -264,7 +264,9 @@ async def get_transformation_results( for result in data.get("results", []): file = ServiceXFile( filename=result["file-path"].replace("/", ":"), - created_at=datetime.datetime.fromisoformat(result["created_at"]), + created_at=datetime.datetime.fromisoformat( + result["created_at"] + ), ) response.append(file) return response diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 18bfbfc8..66a48c1a 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -130,8 +130,14 @@ async def test_download_files(python_dataset): python_dataset.servicex = AsyncMock() python_dataset.servicex.get_transformation_results = AsyncMock() python_dataset.servicex.get_transformation_results.return_value = [ - ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), - ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile( + filename="file1.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ServiceXFile( + filename="file2.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), ] minio_mock.download_file.return_value = Path("/path/to/downloaded_file") @@ -165,8 +171,14 @@ async def test_download_files_with_signed_urls(python_dataset): python_dataset.servicex = AsyncMock() python_dataset.servicex.get_transformation_results = AsyncMock() python_dataset.servicex.get_transformation_results.return_value = [ - ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), - ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile( + filename="file1.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ServiceXFile( + filename="file2.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), ] python_dataset.minio_polling_interval = 0 diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 989f4035..c4efd47a 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -513,8 +513,18 @@ async def test_get_transformation_results_success(get, servicex): get.return_value.__aenter__.return_value.json = AsyncMock( return_value={ "results": [ - {"file-path": "file1.txt", "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat()}, - {"file-path": "file2.txt", "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat()}, + { + "file-path": "file1.txt", + "created_at": datetime.datetime.now( + datetime.timezone.utc + ).isoformat(), + }, + { + "file-path": "file2.txt", + "created_at": datetime.datetime.now( + datetime.timezone.utc + ).isoformat(), + }, ] } ) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index d8d9378e..12d3ffee 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -210,15 +210,26 @@ async def test_submit(mocker): servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[ - [ - ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), - ], - [ - ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), - ServiceXFile(filename="file2", created_at=datetime.datetime.now(datetime.timezone.utc)), + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ], + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ServiceXFile( + filename="file2", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ], ] - ]) + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, @@ -263,14 +274,22 @@ async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_transformation_results = AsyncMock(side_effect=[ - [ - ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), - ], - [ - ServiceXFile(filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc)), + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ], + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ], ] - ]) + ) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -318,8 +337,14 @@ async def test_use_of_cache(mocker): servicex.submit_transform.return_value = {"request_id": '123-456-789"'} servicex.get_transformation_results = AsyncMock() servicex.get_transformation_results.return_value = [ - ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), - ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile( + filename="file1.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ServiceXFile( + filename="file2.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), ] servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -624,8 +649,14 @@ async def test_use_of_ignore_cache(mocker, servicex): ) servicex.get_transformation_results = AsyncMock() servicex.get_transformation_results.return_value = [ - ServiceXFile(filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), - ServiceXFile(filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc)), + ServiceXFile( + filename="file1.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ServiceXFile( + filename="file2.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), ] # Prepare Minio mock_minio = AsyncMock() From 232be85b2993c47bf0e1368dbd49911b7e786779 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 22 May 2025 16:51:01 -0500 Subject: [PATCH 16/35] flake8 --- tests/test_servicex_adapter.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 989f4035..9fd717dd 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -29,7 +29,6 @@ import tempfile import time import datetime -from unittest import result from unittest.mock import patch, AsyncMock import httpx @@ -513,8 +512,10 @@ async def test_get_transformation_results_success(get, servicex): get.return_value.__aenter__.return_value.json = AsyncMock( return_value={ "results": [ - {"file-path": "file1.txt", "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat()}, - {"file-path": "file2.txt", "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat()}, + {"file-path": "file1.txt", "created_at": + datetime.datetime.now(datetime.timezone.utc).isoformat()}, + {"file-path": "file2.txt", "created_at": + datetime.datetime.now(datetime.timezone.utc).isoformat()}, ] } ) From b79cb26f5c2dd15acde7b09cd7f8fee07c06ac52 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 22 May 2025 16:54:07 -0500 Subject: [PATCH 17/35] flake8 --- tests/test_servicex_adapter.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index c4efd47a..928864b5 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -29,7 +29,6 @@ import tempfile import time import datetime -from unittest import result from unittest.mock import patch, AsyncMock import httpx From c8ad74ce617d250ed31b60657ef8e5fdc0772d70 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 22 May 2025 16:54:54 -0500 Subject: [PATCH 18/35] flake8 --- tests/test_servicex_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 12d3ffee..46335d37 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -28,7 +28,7 @@ import datetime import tempfile from typing import List -from unittest.mock import AsyncMock, patch, Mock +from unittest.mock import AsyncMock, patch from pathlib import PurePath import pytest from itertools import cycle From 489e05db94f7b51b5942556d26853f06dcd39edf Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 22 May 2025 16:58:52 -0500 Subject: [PATCH 19/35] fix for python3.9 --- servicex/servicex_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index 9b34829a..b877c2ce 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -237,7 +237,7 @@ async def delete_transform(self, transform_id=None): ) async def get_transformation_results( - self, request_id: str, later_than: datetime.datetime | None = None + self, request_id: str, later_than: Optional[datetime.datetime] = None ): headers = await self._get_authorization() url = self.url + f"/servicex/transformation/{request_id}/results" From 4e01dee3232219fde66ceed693c1c904fbbca9a2 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 22 May 2025 17:01:23 -0500 Subject: [PATCH 20/35] remove extraneous print call --- tests/test_servicex_adapter.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 928864b5..51873b12 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -528,8 +528,6 @@ async def test_get_transformation_results_success(get, servicex): } ) - print(get) - request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) await servicex.get_transformation_results(request_id, now) From ee0423f04b582f644fb762f5035efe22dc8e12bb Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 4 Jun 2025 13:27:46 -0500 Subject: [PATCH 21/35] simplify typing --- servicex/query_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 85603550..129957f5 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -557,7 +557,7 @@ async def get_signed_url( if progress: progress.advance(task_id=download_progress, task_type="Download") - later_than: datetime.datetime | None = None + later_than = datetime.datetime.min while True: if not cached_record: await asyncio.sleep(self.minio_polling_interval) From f5bf54664c242cfccedb0af19d0e454e12b864f4 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 11 Jun 2025 10:42:03 -0500 Subject: [PATCH 22/35] add feature branching --- servicex/models.py | 10 +++++++++ servicex/query_core.py | 18 +++++++++++----- servicex/servicex_adapter.py | 41 +++++++++++++++++++++++++++++++++++- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/servicex/models.py b/servicex/models.py index 37bde544..2ba488db 100644 --- a/servicex/models.py +++ b/servicex/models.py @@ -230,6 +230,16 @@ class TransformedResults(DocStringBaseModel): """URL for looking up logs on the ServiceX server""" +class ServiceXInfo(DocStringBaseModel): + r""" + Model for ServiceX Info properties + """ + + app_version: str = Field(alias="app-version") + code_gen_image: dict[str, str] = Field(alias="code-gen-image") + capabilities: list[str] + + class DatasetFile(BaseModel): """ Model for a file in a cached dataset diff --git a/servicex/query_core.py b/servicex/query_core.py index 129957f5..9cf9d578 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -564,9 +564,13 @@ async def get_signed_url( if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - files = await self.servicex.get_transformation_results( - self.current_status.request_id, later_than - ) + capabilities = await self.servicex.get_servicex_capabilities() + if "poll_local_transformation_results" in capabilities: + files = await self.servicex.get_transformation_results( + self.current_status.request_id, later_than + ) + else: + files = await self.minio.list_bucket() for file in files: filename = file.filename @@ -597,8 +601,12 @@ async def get_signed_url( ) # NOQA 501 files_seen.add(filename) - if later_than is None or file.created_at > later_than: - later_than = file.created_at + if ( + "poll_local_transformation_results" + in await self.servicex.get_servicex_capabilities() + ): + if file.created_at > later_than: + later_than = file.created_at # Once the transform is complete and all files are seen we can stop polling. # Also, if we are just downloading or signing urls for a previous transform diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index b877c2ce..74e8619a 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -43,7 +43,12 @@ retry_if_not_exception_type, ) -from servicex.models import TransformRequest, TransformStatus, CachedDataset +from servicex.models import ( + TransformRequest, + TransformStatus, + CachedDataset, + ServiceXInfo, +) class AuthorizationError(BaseException): @@ -71,6 +76,9 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): self.refresh_token = refresh_token self.token = None + # interact with _servicex_info via get_servicex_info + self._servicex_info: Optional[ServiceXInfo] = None + async def _get_token(self): url = f"{self.url}/token/refresh" headers = {"Authorization": f"Bearer {self.refresh_token}"} @@ -118,6 +126,31 @@ async def _get_authorization(self, force_reauth: bool = False) -> Dict[str, str] await self._get_token() return {"Authorization": f"Bearer {self.token}"} + async def get_servicex_info(self) -> ServiceXInfo: + if self._servicex_info: + return self._servicex_info + + headers = await self._get_authorization() + retry_options = ExponentialRetry(attempts=3, start_timeout=10) + async with RetryClient(retry_options=retry_options) as client: + async with client.get(url=f"{self.url}/servicex", headers=headers) as r: + if r.status == 401: + raise AuthorizationError( + f"Not authorized to access serviceX at {self.url}" + ) + elif r.status > 400: + error_message = await _extract_message(r) + raise RuntimeError( + "ServiceX WebAPI Error during transformation " + f"submission: {r.status} - {error_message}" + ) + servicex_info = await r.json() + self._servicex_info = ServiceXInfo(**servicex_info) + return self._servicex_info + + async def get_servicex_capabilities(self) -> List[str]: + return (await self.get_servicex_info()).capabilities + async def get_transforms(self) -> List[TransformStatus]: headers = await self._get_authorization() retry_options = ExponentialRetry(attempts=3, start_timeout=10) @@ -239,6 +272,12 @@ async def delete_transform(self, transform_id=None): async def get_transformation_results( self, request_id: str, later_than: Optional[datetime.datetime] = None ): + if ( + "poll_local_transformation_results" + not in await self.get_servicex_capabilities() + ): + raise ValueError("ServiceX capabilities not found") + headers = await self._get_authorization() url = self.url + f"/servicex/transformation/{request_id}/results" params = {} From 5bbe40f594d34cb768713c94e8511172554febf6 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 11 Jun 2025 14:50:10 -0500 Subject: [PATCH 23/35] add testing --- servicex/query_core.py | 2 +- tests/test_dataset.py | 8 ++ tests/test_servicex_adapter.py | 22 ++++ tests/test_servicex_dataset.py | 202 +++++++++++++++++++++++++++++++++ 4 files changed, 233 insertions(+), 1 deletion(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index 9cf9d578..175a7b28 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -557,7 +557,7 @@ async def get_signed_url( if progress: progress.advance(task_id=download_progress, task_type="Download") - later_than = datetime.datetime.min + later_than = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) while True: if not cached_record: await asyncio.sleep(self.minio_polling_interval) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 66a48c1a..37d07aaa 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -128,6 +128,10 @@ async def test_download_files(python_dataset): config = Configuration(cache_path="temp_dir", api_endpoints=[]) python_dataset.configuration = config python_dataset.servicex = AsyncMock() + python_dataset.servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) + python_dataset.servicex.get_transformation_results = AsyncMock() python_dataset.servicex.get_transformation_results.return_value = [ ServiceXFile( @@ -169,6 +173,10 @@ async def test_download_files_with_signed_urls(python_dataset): progress_mock = Mock() python_dataset.servicex = AsyncMock() + python_dataset.servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) + python_dataset.servicex.get_transformation_results = AsyncMock() python_dataset.servicex.get_transformation_results.return_value = [ ServiceXFile( diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 51873b12..a1039a18 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -508,6 +508,9 @@ async def test_get_authorization(servicex): @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") async def test_get_transformation_results_success(get, servicex): + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) get.return_value.__aenter__.return_value.status = 200 get.return_value.__aenter__.return_value.json = AsyncMock( return_value={ @@ -541,11 +544,24 @@ async def test_get_transformation_results_success(get, servicex): ) +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.ClientSession.get") +async def test_get_transformation_results_no_feature_flag(get, servicex): + servicex.get_servicex_capabilities = AsyncMock(return_value=[]) + request_id = "123-45-6789" + now = datetime.datetime.now(datetime.timezone.utc) + with pytest.raises(ValueError): + await servicex.get_transformation_results(request_id, now) + + @pytest.mark.asyncio @patch("servicex.servicex_adapter.ClientSession.get") async def test_get_transformation_results_not_found( get_transformation_results, servicex ): + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) get_transformation_results.return_value.__aenter__.return_value.status = 404 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -567,6 +583,9 @@ async def test_get_transformation_results_not_found( async def test_get_transformation_results_not_authorized( get_transformation_results, servicex ): + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) get_transformation_results.return_value.__aenter__.return_value.status = 403 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -588,6 +607,9 @@ async def test_get_transformation_results_not_authorized( async def test_get_transformation_results_server_error( get_transformation_results, servicex ): + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) get_transformation_results.return_value.__aenter__.return_value.status = 500 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 46335d37..ba9f9687 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -209,6 +209,9 @@ async def test_submit(mocker): servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) servicex.get_transformation_results = AsyncMock( side_effect=[ @@ -269,11 +272,62 @@ async def test_submit(mocker): mock_cache.cache_transform.assert_called_once() +@pytest.mark.asyncio +async def test_submit_s3_polling(mocker): + servicex = AsyncMock() + + servicex.submit_transform = AsyncMock() + servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_servicex_capabilities = AsyncMock(return_value=[]) + + servicex.get_transform_status = AsyncMock() + servicex.get_transform_status.side_effect = [ + transform_status1, + transform_status2, + transform_status3, + ] + + mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) + mock_minio.download_file = AsyncMock( + side_effect=lambda a, _, shorten_filename: PurePath(a) + ) + + mock_cache = mocker.MagicMock(QueryCache) + mock_cache.get_transform_by_hash = mocker.MagicMock(return_value=None) + mock_cache.transformed_results = mocker.MagicMock(side_effect=transformed_results) + mock_cache.cache_transform = mocker.MagicMock(side_effect=cache_transform) + mock_cache.cache_path_for_transform = mocker.MagicMock(return_value=PurePath(".")) + mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) + did = FileListDataset("/foo/bar/baz.root") + datasource = Query( + dataset_identifier=did, + title="ServiceX Client", + codegen="uproot", + sx_adapter=servicex, + query_cache=mock_cache, + config=Configuration(api_endpoints=[]), + ) + datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") + + with ExpandableProgress(display_progress=False) as progress: + datasource.result_format = ResultFormat.parquet + result = await datasource.submit_and_download( + signed_urls_only=False, expandable_progress=progress + ) + print(mock_minio.download_file.call_args) + assert result.file_list == ["file1", "file2"] + mock_cache.cache_transform.assert_called_once() + + @pytest.mark.asyncio async def test_submit_partial_success(mocker): servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) servicex.get_transformation_results = AsyncMock( side_effect=[ [ @@ -329,12 +383,61 @@ async def test_submit_partial_success(mocker): mock_cache.cache_transform.assert_not_called() +@pytest.mark.asyncio +async def test_submit_partial_success_s3_polling(mocker): + servicex = AsyncMock() + servicex.submit_transform = AsyncMock() + servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_servicex_capabilities = AsyncMock(return_value=[]) + + servicex.get_transform_status = AsyncMock() + servicex.get_transform_status.side_effect = [ + transform_status1, + transform_status2, + transform_status6, + ] + + mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) + mock_minio.download_file = AsyncMock( + side_effect=lambda a, _, shorten_filename: PurePath(a) + ) + + mock_cache = mocker.MagicMock(QueryCache) + mock_cache.get_transform_by_hash = mocker.MagicMock(return_value=None) + mock_cache.transformed_results = mocker.MagicMock(side_effect=transformed_results) + mock_cache.cache_transform = mocker.MagicMock(side_effect=cache_transform) + mock_cache.cache_path_for_transform = mocker.MagicMock(return_value=PurePath(".")) + mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) + did = FileListDataset("/foo/bar/baz.root") + datasource = Query( + dataset_identifier=did, + title="ServiceX Client", + codegen="uproot", + sx_adapter=servicex, + query_cache=mock_cache, + config=Configuration(api_endpoints=[]), + ) + datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") + with ExpandableProgress(display_progress=False) as progress: + datasource.result_format = ResultFormat.parquet + result = await datasource.submit_and_download( + signed_urls_only=False, expandable_progress=progress + ) + print(mock_minio.download_file.call_args) + assert result.file_list == ["file1"] + mock_cache.cache_transform.assert_not_called() + + @pytest.mark.asyncio async def test_use_of_cache(mocker): """Do we pick up the cache on the second request for the same transform?""" servicex = AsyncMock() servicex.submit_transform = AsyncMock() servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) servicex.get_transformation_results = AsyncMock() servicex.get_transformation_results.return_value = [ ServiceXFile( @@ -434,6 +537,102 @@ async def test_use_of_cache(mocker): cache.close() +@pytest.mark.asyncio +async def test_use_of_cache_s3_polling(mocker): + """Do we pick up the cache on the second request for the same transform?""" + servicex = AsyncMock() + servicex.submit_transform = AsyncMock() + servicex.submit_transform.return_value = {"request_id": '123-456-789"'} + servicex.get_servicex_capabilities = AsyncMock(return_value=[]) + servicex.get_transform_status = AsyncMock() + servicex.get_transform_status.side_effect = [ + transform_status1, + transform_status3, + ] + mock_minio = AsyncMock() + mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) + mock_minio.download_file = AsyncMock( + side_effect=lambda a, _, shorten_filename: PurePath(a) + ) + mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) + + mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) + + did = FileListDataset("/foo/bar/baz.root") + with tempfile.TemporaryDirectory() as temp_dir: + config = Configuration(cache_path=temp_dir, api_endpoints=[]) + cache = QueryCache(config) + datasource = Query( + dataset_identifier=did, + title="ServiceX Client", + codegen="uproot", + sx_adapter=servicex, + query_cache=cache, + config=config, + ) + datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") + datasource.result_format = ResultFormat.parquet + upd = mocker.patch.object( + cache, "update_record", side_effect=cache.update_record + ) + with ExpandableProgress(display_progress=False) as progress: + result1 = await datasource.submit_and_download( + signed_urls_only=True, expandable_progress=progress + ) + upd.assert_not_called() + upd.reset_mock() + assert mock_minio.get_signed_url.await_count == 2 + # second round, should hit the cache (and not call the sx_adapter, minio, or update_record) + with ExpandableProgress(display_progress=False) as progress: + servicex2 = AsyncMock() + mock_minio.list_bucket.reset_mock() + mock_minio.get_signed_url.reset_mock() + datasource2 = Query( + dataset_identifier=did, + title="ServiceX Client", + codegen="uproot", + sx_adapter=servicex2, + query_cache=cache, + config=config, + ) + datasource2.query_string_generator = FuncADLQuery_Uproot().FromTree( + "nominal" + ) + datasource2.result_format = ResultFormat.parquet + result2 = await datasource2.submit_and_download( + signed_urls_only=True, expandable_progress=progress + ) + servicex2.assert_not_awaited() + mock_minio.list_bucket.assert_not_awaited() + mock_minio.get_signed_url.assert_not_awaited() + upd.assert_not_called() + assert result1 == result2 + upd.reset_mock() + servicex.get_transform_status.reset_mock(side_effect=True) + servicex.get_transform_status.return_value = transform_status3 + mock_minio.list_bucket.reset_mock(side_effect=True) + # third round, should hit the cache and download files (and call update_record) + with ExpandableProgress(display_progress=False) as progress: + await datasource.submit_and_download( + signed_urls_only=False, expandable_progress=progress + ) + servicex.assert_not_awaited() + assert mock_minio.download_file.await_count == 2 + upd.assert_called_once() + # fourth round, should hit the cache (and nothing else) + mock_minio.list_bucket.reset_mock() + mock_minio.download_file.reset_mock() + with ExpandableProgress(display_progress=False) as progress: + await datasource.submit_and_download( + signed_urls_only=False, expandable_progress=progress + ) + servicex.assert_not_awaited() + mock_minio.list_bucket.assert_not_awaited() + mock_minio.download_file.assert_not_awaited() + upd.assert_called_once() + cache.close() + + @pytest.mark.asyncio async def test_submit_cancel(mocker): servicex = AsyncMock() @@ -647,6 +846,9 @@ async def test_use_of_ignore_cache(mocker, servicex): transform_status3, ] ) + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) servicex.get_transformation_results = AsyncMock() servicex.get_transformation_results.return_value = [ ServiceXFile( From 201531a6ac48e76938eb0bd838aa2fb3bdce9f5f Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 11 Jun 2025 15:41:31 -0500 Subject: [PATCH 24/35] remove duplicated test --- tests/test_servicex_dataset.py | 369 ++++++++++----------------------- 1 file changed, 110 insertions(+), 259 deletions(-) diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index ba9f9687..d84d3399 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -203,82 +203,38 @@ def cache_transform(record: TransformedResults): return +@pytest.mark.parametrize("use_s3_polling", [False, True]) @pytest.mark.asyncio -async def test_submit(mocker): +async def test_submit(mocker, use_s3_polling): servicex = AsyncMock() - servicex.submit_transform = AsyncMock() - servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_servicex_capabilities = AsyncMock( - return_value=["poll_local_transformation_results"] - ) - - servicex.get_transformation_results = AsyncMock( - side_effect=[ - [ - ServiceXFile( - filename="file1", - created_at=datetime.datetime.now(datetime.timezone.utc), - ), - ], - [ - ServiceXFile( - filename="file1", - created_at=datetime.datetime.now(datetime.timezone.utc), - ), - ServiceXFile( - filename="file2", - created_at=datetime.datetime.now(datetime.timezone.utc), - ), - ], - ] - ) - servicex.get_transform_status = AsyncMock() - servicex.get_transform_status.side_effect = [ - transform_status1, - transform_status2, - transform_status3, - ] - - mock_minio = AsyncMock() - mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) - ) - - mock_cache = mocker.MagicMock(QueryCache) - mock_cache.get_transform_by_hash = mocker.MagicMock(return_value=None) - mock_cache.transformed_results = mocker.MagicMock(side_effect=transformed_results) - mock_cache.cache_transform = mocker.MagicMock(side_effect=cache_transform) - mock_cache.cache_path_for_transform = mocker.MagicMock(return_value=PurePath(".")) - mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) - did = FileListDataset("/foo/bar/baz.root") - datasource = Query( - dataset_identifier=did, - title="ServiceX Client", - codegen="uproot", - sx_adapter=servicex, - query_cache=mock_cache, - config=Configuration(api_endpoints=[]), - ) - datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") - - with ExpandableProgress(display_progress=False) as progress: - datasource.result_format = ResultFormat.parquet - result = await datasource.submit_and_download( - signed_urls_only=False, expandable_progress=progress + servicex.submit_transform.return_value = {"request_id": "123-456-789"} + + # Configure capabilities based on polling type + capabilities = [] if use_s3_polling else ["poll_local_transformation_results"] + servicex.get_servicex_capabilities = AsyncMock(return_value=capabilities) + + if not use_s3_polling: + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ) + ], + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ServiceXFile( + filename="file2", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ], + ] ) - print(mock_minio.download_file.call_args) - assert result.file_list == ["file1", "file2"] - mock_cache.cache_transform.assert_called_once() - - -@pytest.mark.asyncio -async def test_submit_s3_polling(mocker): - servicex = AsyncMock() - - servicex.submit_transform = AsyncMock() - servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_servicex_capabilities = AsyncMock(return_value=[]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -288,17 +244,21 @@ async def test_submit_s3_polling(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) + if use_s3_polling: + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]]) + mock_cache = mocker.MagicMock(QueryCache) mock_cache.get_transform_by_hash = mocker.MagicMock(return_value=None) mock_cache.transformed_results = mocker.MagicMock(side_effect=transformed_results) mock_cache.cache_transform = mocker.MagicMock(side_effect=cache_transform) mock_cache.cache_path_for_transform = mocker.MagicMock(return_value=PurePath(".")) + mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) + did = FileListDataset("/foo/bar/baz.root") datasource = Query( dataset_identifier=did, @@ -315,80 +275,38 @@ async def test_submit_s3_polling(mocker): result = await datasource.submit_and_download( signed_urls_only=False, expandable_progress=progress ) - print(mock_minio.download_file.call_args) + assert result.file_list == ["file1", "file2"] mock_cache.cache_transform.assert_called_once() +@pytest.mark.parametrize("use_s3_polling", [False, True]) @pytest.mark.asyncio -async def test_submit_partial_success(mocker): +async def test_submit_partial_success(mocker, use_s3_polling): servicex = AsyncMock() servicex.submit_transform = AsyncMock() - servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_servicex_capabilities = AsyncMock( - return_value=["poll_local_transformation_results"] - ) - servicex.get_transformation_results = AsyncMock( - side_effect=[ - [ - ServiceXFile( - filename="file1", - created_at=datetime.datetime.now(datetime.timezone.utc), - ), - ], - [ - ServiceXFile( - filename="file1", - created_at=datetime.datetime.now(datetime.timezone.utc), - ), - ], - ] - ) - - servicex.get_transform_status = AsyncMock() - servicex.get_transform_status.side_effect = [ - transform_status1, - transform_status2, - transform_status6, - ] - - mock_minio = AsyncMock() - mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) - ) - - mock_cache = mocker.MagicMock(QueryCache) - mock_cache.get_transform_by_hash = mocker.MagicMock(return_value=None) - mock_cache.transformed_results = mocker.MagicMock(side_effect=transformed_results) - mock_cache.cache_transform = mocker.MagicMock(side_effect=cache_transform) - mock_cache.cache_path_for_transform = mocker.MagicMock(return_value=PurePath(".")) - mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) - did = FileListDataset("/foo/bar/baz.root") - datasource = Query( - dataset_identifier=did, - title="ServiceX Client", - codegen="uproot", - sx_adapter=servicex, - query_cache=mock_cache, - config=Configuration(api_endpoints=[]), - ) - datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") - with ExpandableProgress(display_progress=False) as progress: - datasource.result_format = ResultFormat.parquet - result = await datasource.submit_and_download( - signed_urls_only=False, expandable_progress=progress + servicex.submit_transform.return_value = {"request_id": "123-456-789"} + + capabilities = [] if use_s3_polling else ["poll_local_transformation_results"] + servicex.get_servicex_capabilities = AsyncMock(return_value=capabilities) + + if not use_s3_polling: + servicex.get_transformation_results = AsyncMock( + side_effect=[ + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ) + ], + [ + ServiceXFile( + filename="file1", + created_at=datetime.datetime.now(datetime.timezone.utc), + ) + ], + ] ) - print(mock_minio.download_file.call_args) - assert result.file_list == ["file1"] - mock_cache.cache_transform.assert_not_called() - - -@pytest.mark.asyncio -async def test_submit_partial_success_s3_polling(mocker): - servicex = AsyncMock() - servicex.submit_transform = AsyncMock() - servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_servicex_capabilities = AsyncMock(return_value=[]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ @@ -398,17 +316,21 @@ async def test_submit_partial_success_s3_polling(mocker): ] mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) + if use_s3_polling: + mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]]) + mock_cache = mocker.MagicMock(QueryCache) mock_cache.get_transform_by_hash = mocker.MagicMock(return_value=None) mock_cache.transformed_results = mocker.MagicMock(side_effect=transformed_results) mock_cache.cache_transform = mocker.MagicMock(side_effect=cache_transform) mock_cache.cache_path_for_transform = mocker.MagicMock(return_value=PurePath(".")) + mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) + did = FileListDataset("/foo/bar/baz.root") datasource = Query( dataset_identifier=did, @@ -419,143 +341,56 @@ async def test_submit_partial_success_s3_polling(mocker): config=Configuration(api_endpoints=[]), ) datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") + with ExpandableProgress(display_progress=False) as progress: datasource.result_format = ResultFormat.parquet result = await datasource.submit_and_download( signed_urls_only=False, expandable_progress=progress ) - print(mock_minio.download_file.call_args) + assert result.file_list == ["file1"] mock_cache.cache_transform.assert_not_called() +@pytest.mark.parametrize("use_s3_polling", [False, True]) @pytest.mark.asyncio -async def test_use_of_cache(mocker): +async def test_use_of_cache(mocker, use_s3_polling): """Do we pick up the cache on the second request for the same transform?""" servicex = AsyncMock() servicex.submit_transform = AsyncMock() - servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_servicex_capabilities = AsyncMock( - return_value=["poll_local_transformation_results"] - ) - servicex.get_transformation_results = AsyncMock() - servicex.get_transformation_results.return_value = [ - ServiceXFile( - filename="file1.txt", - created_at=datetime.datetime.now(datetime.timezone.utc), - ), - ServiceXFile( - filename="file2.txt", - created_at=datetime.datetime.now(datetime.timezone.utc), - ), - ] - servicex.get_transform_status = AsyncMock() - servicex.get_transform_status.side_effect = [ - transform_status1, - transform_status3, - ] - mock_minio = AsyncMock() - mock_minio.download_file = AsyncMock( - side_effect=lambda a, _, shorten_filename: PurePath(a) - ) - mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) + servicex.submit_transform.return_value = {"request_id": "123-456-789"} - mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) + capabilities = [] if use_s3_polling else ["poll_local_transformation_results"] + servicex.get_servicex_capabilities = AsyncMock(return_value=capabilities) - did = FileListDataset("/foo/bar/baz.root") - with tempfile.TemporaryDirectory() as temp_dir: - config = Configuration(cache_path=temp_dir, api_endpoints=[]) - cache = QueryCache(config) - datasource = Query( - dataset_identifier=did, - title="ServiceX Client", - codegen="uproot", - sx_adapter=servicex, - query_cache=cache, - config=config, - ) - datasource.query_string_generator = FuncADLQuery_Uproot().FromTree("nominal") - datasource.result_format = ResultFormat.parquet - upd = mocker.patch.object( - cache, "update_record", side_effect=cache.update_record - ) - with ExpandableProgress(display_progress=False) as progress: - result1 = await datasource.submit_and_download( - signed_urls_only=True, expandable_progress=progress - ) - upd.assert_not_called() - upd.reset_mock() - assert mock_minio.get_signed_url.await_count == 2 - # second round, should hit the cache (and not call the sx_adapter, minio, or update_record) - with ExpandableProgress(display_progress=False) as progress: - servicex2 = AsyncMock() - servicex.get_transformation_results.reset_mock() - mock_minio.get_signed_url.reset_mock() - datasource2 = Query( - dataset_identifier=did, - title="ServiceX Client", - codegen="uproot", - sx_adapter=servicex2, - query_cache=cache, - config=config, - ) - datasource2.query_string_generator = FuncADLQuery_Uproot().FromTree( - "nominal" - ) - datasource2.result_format = ResultFormat.parquet - result2 = await datasource2.submit_and_download( - signed_urls_only=True, expandable_progress=progress - ) - servicex2.assert_not_awaited() - servicex.get_transformation_results.assert_not_awaited() - mock_minio.get_signed_url.assert_not_awaited() - upd.assert_not_called() - assert result1 == result2 - upd.reset_mock() - servicex.get_transform_status.reset_mock(side_effect=True) - servicex.get_transform_status.return_value = transform_status3 - servicex.get_transformation_results.reset_mock(side_effect=True) - # third round, should hit the cache and download files (and call update_record) - with ExpandableProgress(display_progress=False) as progress: - await datasource.submit_and_download( - signed_urls_only=False, expandable_progress=progress - ) - servicex.assert_not_awaited() - assert mock_minio.download_file.await_count == 2 - upd.assert_called_once() - # fourth round, should hit the cache (and nothing else) - servicex.get_transformation_results.reset_mock() - mock_minio.download_file.reset_mock() - with ExpandableProgress(display_progress=False) as progress: - await datasource.submit_and_download( - signed_urls_only=False, expandable_progress=progress - ) - servicex.assert_not_awaited() - servicex.get_transformation_results.assert_not_awaited() - mock_minio.download_file.assert_not_awaited() - upd.assert_called_once() - cache.close() - - -@pytest.mark.asyncio -async def test_use_of_cache_s3_polling(mocker): - """Do we pick up the cache on the second request for the same transform?""" - servicex = AsyncMock() - servicex.submit_transform = AsyncMock() - servicex.submit_transform.return_value = {"request_id": '123-456-789"'} - servicex.get_servicex_capabilities = AsyncMock(return_value=[]) servicex.get_transform_status = AsyncMock() servicex.get_transform_status.side_effect = [ transform_status1, transform_status3, ] + + if not use_s3_polling: + servicex.get_transformation_results = AsyncMock() + servicex.get_transformation_results.return_value = [ + ServiceXFile( + filename="file1.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ServiceXFile( + filename="file2.txt", + created_at=datetime.datetime.now(datetime.timezone.utc), + ), + ] + mock_minio = AsyncMock() - mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) mock_minio.download_file = AsyncMock( side_effect=lambda a, _, shorten_filename: PurePath(a) ) mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"]) + if use_s3_polling: + mock_minio.list_bucket = AsyncMock(return_value=[file1, file2]) + mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio) did = FileListDataset("/foo/bar/baz.root") @@ -582,10 +417,13 @@ async def test_use_of_cache_s3_polling(mocker): upd.assert_not_called() upd.reset_mock() assert mock_minio.get_signed_url.await_count == 2 - # second round, should hit the cache (and not call the sx_adapter, minio, or update_record) + with ExpandableProgress(display_progress=False) as progress: servicex2 = AsyncMock() - mock_minio.list_bucket.reset_mock() + if use_s3_polling: + mock_minio.list_bucket.reset_mock() + else: + servicex.get_transformation_results.reset_mock() mock_minio.get_signed_url.reset_mock() datasource2 = Query( dataset_identifier=did, @@ -603,15 +441,22 @@ async def test_use_of_cache_s3_polling(mocker): signed_urls_only=True, expandable_progress=progress ) servicex2.assert_not_awaited() - mock_minio.list_bucket.assert_not_awaited() + if use_s3_polling: + mock_minio.list_bucket.assert_not_awaited() + else: + servicex.get_transformation_results.assert_not_awaited() mock_minio.get_signed_url.assert_not_awaited() upd.assert_not_called() assert result1 == result2 upd.reset_mock() servicex.get_transform_status.reset_mock(side_effect=True) servicex.get_transform_status.return_value = transform_status3 - mock_minio.list_bucket.reset_mock(side_effect=True) - # third round, should hit the cache and download files (and call update_record) + + if use_s3_polling: + mock_minio.list_bucket.reset_mock(side_effect=True) + else: + servicex.get_transformation_results.reset_mock(side_effect=True) + with ExpandableProgress(display_progress=False) as progress: await datasource.submit_and_download( signed_urls_only=False, expandable_progress=progress @@ -619,15 +464,21 @@ async def test_use_of_cache_s3_polling(mocker): servicex.assert_not_awaited() assert mock_minio.download_file.await_count == 2 upd.assert_called_once() - # fourth round, should hit the cache (and nothing else) - mock_minio.list_bucket.reset_mock() + + if use_s3_polling: + mock_minio.list_bucket.reset_mock() + else: + servicex.get_transformation_results.reset_mock() mock_minio.download_file.reset_mock() with ExpandableProgress(display_progress=False) as progress: await datasource.submit_and_download( signed_urls_only=False, expandable_progress=progress ) servicex.assert_not_awaited() - mock_minio.list_bucket.assert_not_awaited() + if use_s3_polling: + mock_minio.list_bucket.assert_not_awaited() + else: + servicex.get_transformation_results.assert_not_awaited() mock_minio.download_file.assert_not_awaited() upd.assert_called_once() cache.close() From 7db79e14d7d11bfced91d8154fc87a2653732ac0 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 12 Jun 2025 12:15:20 -0500 Subject: [PATCH 25/35] fix breaking test after merging master in --- servicex/query_core.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/servicex/query_core.py b/servicex/query_core.py index a731153a..66631c85 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -562,14 +562,18 @@ async def get_signed_url( progress.advance(task_id=download_progress, task_type="Download") later_than = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + + use_local_polling = ( + "poll_local_transformation_results" + in await self.servicex.get_servicex_capabilities() + ) while True: if not cached_record: await asyncio.sleep(self.minio_polling_interval) if self.minio: # if self.minio exists, self.current_status will too if self.current_status.files_completed > len(files_seen): - capabilities = await self.servicex.get_servicex_capabilities() - if "poll_local_transformation_results" in capabilities: + if use_local_polling: files = await self.servicex.get_transformation_results( self.current_status.request_id, later_than ) @@ -592,6 +596,10 @@ async def get_signed_url( ) ) else: + if use_local_polling: + expected_size = None + else: + expected_size = file.size download_tasks.append( loop.create_task( download_file( @@ -600,7 +608,7 @@ async def get_signed_url( progress, download_progress, shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501 - expected_size=file.size, + expected_size=expected_size, ) ) ) # NOQA 501 From 6e20fe6cab9e79f76710218971148ebe38193b1d Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 12 Jun 2025 12:25:45 -0500 Subject: [PATCH 26/35] try to fix breaking test on server --- tests/test_servicex_adapter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index ff081a92..42fa0d56 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -99,6 +99,7 @@ async def test_get_transforms_auth_error(mock_get, servicex): async def test_get_transforms_wlcg_bearer_token( decode, servicex, transform_status_response ): + servicex.get_servicex_capabilities = AsyncMock(return_value=[]) token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False) token_file.write( """" From 270bd8b530ce54f6526109d247f09f3a05670e2f Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 12 Jun 2025 13:29:15 -0500 Subject: [PATCH 27/35] improve code coverage --- tests/test_servicex_adapter.py | 126 ++++++++++++++++++++++++++++++++- 1 file changed, 125 insertions(+), 1 deletion(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 42fa0d56..971468b4 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -36,7 +36,12 @@ from aiohttp import ContentTypeError from pytest_asyncio import fixture -from servicex.models import TransformRequest, ResultDestination, ResultFormat +from servicex.models import ( + TransformRequest, + ResultDestination, + ResultFormat, + ServiceXInfo, +) from servicex.servicex_adapter import ServiceXAdapter, AuthorizationError @@ -627,3 +632,122 @@ async def test_get_transformation_results_server_error( "later_than": now.isoformat(), }, ) + + +def test_get_bearer_token_file(tmp_path, monkeypatch): + token_file = tmp_path / "btf" + token_file.write_text("bearer123") + monkeypatch.setenv("BEARER_TOKEN_FILE", str(token_file)) + assert ServiceXAdapter._get_bearer_token_file() == "bearer123" + monkeypatch.delenv("BEARER_TOKEN_FILE", raising=False) + assert ServiceXAdapter._get_bearer_token_file() is None + + +@patch("servicex.servicex_adapter.jwt.decode", return_value={"exp": 1600000000}) +def test_get_token_expiration_success(decode): + assert ServiceXAdapter._get_token_expiration("dummy") == 1600000000 + + +@patch("servicex.servicex_adapter.jwt.decode", return_value={"sub": "noexp"}) +def test_get_token_expiration_no_exp(decode): + with pytest.raises(RuntimeError): + ServiceXAdapter._get_token_expiration("dummy") + + +@pytest.mark.asyncio +async def test_get_authorization_no_token_no_refresh(servicex, monkeypatch): + monkeypatch.delenv("BEARER_TOKEN_FILE", raising=False) + headers = await servicex._get_authorization() + assert headers == {} + + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.jwt.decode", return_value={"exp": time.time() + 120}) +async def test_get_authorization_with_valid_token(decode, servicex): + servicex.token = "tok123" + headers = await servicex._get_authorization() + assert headers == {"Authorization": "Bearer tok123"} + + +@pytest.mark.asyncio +async def test_get_authorization_with_refresh(monkeypatch): + s = ServiceXAdapter("https://servicex.org", refresh_token="rftok") + monkeypatch.delenv("BEARER_TOKEN_FILE", raising=False) + + async def fake_get_token(self): + self.token = "newtoken" + + monkeypatch.setattr(ServiceXAdapter, "_get_token", fake_get_token) + headers = await s._get_authorization() + assert headers == {"Authorization": "Bearer newtoken"} + + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.RetryClient.get") +async def test_get_servicex_info_success(mock_get, servicex): + mock_get.return_value.__aenter__.return_value.status = 200 + mock_get.return_value.__aenter__.return_value.json = AsyncMock( + return_value={ + "capabilities": ["a", "b"], + "app-version": "1.0", + "code-gen-image": {"func_adl": "image1", "uproot": "image2"}, + } + ) + info = await servicex.get_servicex_info() + assert isinstance(info, ServiceXInfo) + assert info.capabilities == ["a", "b"] + + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.RetryClient.get") +async def test_get_servicex_info_auth_error(mock_get, servicex): + mock_get.return_value.__aenter__.return_value.status = 401 + with pytest.raises(AuthorizationError): + await servicex.get_servicex_info() + + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.RetryClient.get") +async def test_get_servicex_info_server_error(mock_get, servicex): + mock_get.return_value.__aenter__.return_value.status = 500 + mock_get.return_value.__aenter__.return_value.json = AsyncMock( + return_value={"message": "oops"} + ) + with pytest.raises(RuntimeError) as e: + await servicex.get_servicex_info() + assert "ServiceX WebAPI Error during transformation submission: 500 - oops" in str( + e.value + ) + + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.ClientSession.get") +async def test_get_transformation_results_parsing(mock_get, servicex): + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) + msg_time = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) + mock_get.return_value.__aenter__.return_value.status = 200 + mock_get.return_value.__aenter__.return_value.json = AsyncMock( + return_value={ + "results": [ + {"file-path": "dir1/file.txt", "created_at": msg_time.isoformat()} + ] + } + ) + res = await servicex.get_transformation_results("id123", None) + assert len(res) == 1 + assert res[0].filename == "dir1:file.txt" + assert res[0].created_at == msg_time + + +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.ClientSession.get") +async def test_get_transformation_results_empty(mock_get, servicex): + servicex.get_servicex_capabilities = AsyncMock( + return_value=["poll_local_transformation_results"] + ) + mock_get.return_value.__aenter__.return_value.status = 200 + mock_get.return_value.__aenter__.return_value.json = AsyncMock(return_value={}) + res = await servicex.get_transformation_results("id123", None) + assert res == [] From fc29f5912741958be5d62c6fb07068f4217b967f Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 12 Jun 2025 13:39:35 -0500 Subject: [PATCH 28/35] temporarily remove failing test in server (but not locally) --- tests/test_servicex_adapter.py | 54 ++++++++++++++++------------------ 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 971468b4..1a54ab7b 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -25,8 +25,6 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import os -import tempfile import time import datetime from unittest.mock import patch, AsyncMock @@ -99,32 +97,32 @@ async def test_get_transforms_auth_error(mock_get, servicex): assert "Not authorized to access serviceX at" in str(err.value) -@pytest.mark.asyncio -@patch("servicex.servicex_adapter.jwt.decode") -async def test_get_transforms_wlcg_bearer_token( - decode, servicex, transform_status_response -): - servicex.get_servicex_capabilities = AsyncMock(return_value=[]) - token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False) - token_file.write( - """" - eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c - """ - ) - token_file.close() - - with patch.dict(os.environ, {"BEARER_TOKEN_FILE": token_file.name}): - # Try with no expiration at all - with pytest.raises(RuntimeError): - await servicex.get_transforms() - - # Try with an expired token - with pytest.raises(AuthorizationError) as err: - decode.return_value = {"exp": 0.0} - await servicex.get_transforms() - assert "ServiceX access token request rejected:" in str(err.value) - - os.remove(token_file.name) +# @pytest.mark.asyncio +# @patch("servicex.servicex_adapter.jwt.decode") +# async def test_get_transforms_wlcg_bearer_token( +# decode, servicex, transform_status_response +# ): +# servicex.get_servicex_capabilities = AsyncMock(return_value=[]) +# token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False) +# token_file.write( +# """" +# eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c +# """ +# ) +# token_file.close() +# +# with patch.dict(os.environ, {"BEARER_TOKEN_FILE": token_file.name}): +# # Try with no expiration at all +# with pytest.raises(RuntimeError): +# await servicex.get_transforms() +# +# # Try with an expired token +# with pytest.raises(AuthorizationError) as err: +# decode.return_value = {"exp": 0.0} +# await servicex.get_transforms() +# assert "ServiceX access token request rejected:" in str(err.value) +# +# os.remove(token_file.name) @pytest.mark.asyncio From bf49d248b1b6e3c945127ceabebeb2d842bc95ee Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Thu, 12 Jun 2025 14:05:51 -0500 Subject: [PATCH 29/35] attempt to fix breaking server test --- tests/test_servicex_adapter.py | 59 +++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 1a54ab7b..d9ec7387 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -25,6 +25,8 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import os +import tempfile import time import datetime from unittest.mock import patch, AsyncMock @@ -97,32 +99,37 @@ async def test_get_transforms_auth_error(mock_get, servicex): assert "Not authorized to access serviceX at" in str(err.value) -# @pytest.mark.asyncio -# @patch("servicex.servicex_adapter.jwt.decode") -# async def test_get_transforms_wlcg_bearer_token( -# decode, servicex, transform_status_response -# ): -# servicex.get_servicex_capabilities = AsyncMock(return_value=[]) -# token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False) -# token_file.write( -# """" -# eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c -# """ -# ) -# token_file.close() -# -# with patch.dict(os.environ, {"BEARER_TOKEN_FILE": token_file.name}): -# # Try with no expiration at all -# with pytest.raises(RuntimeError): -# await servicex.get_transforms() -# -# # Try with an expired token -# with pytest.raises(AuthorizationError) as err: -# decode.return_value = {"exp": 0.0} -# await servicex.get_transforms() -# assert "ServiceX access token request rejected:" in str(err.value) -# -# os.remove(token_file.name) +@pytest.mark.asyncio +@patch("servicex.servicex_adapter.RetryClient.get") +@patch("servicex.servicex_adapter.jwt.decode") +async def test_get_transforms_wlcg_bearer_token( + decode, http_get, servicex, transform_status_response +): + http_get.return_value.__aenter__.return_value.json.return_value = ( + transform_status_response + ) + http_get.return_value.__aenter__.return_value.status = 200 + servicex.get_servicex_capabilities = AsyncMock(return_value=[]) + token_file = tempfile.NamedTemporaryFile(mode="w+t", delete=False) + token_file.write( + """" + eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c + """ + ) + token_file.close() + + with patch.dict(os.environ, {"BEARER_TOKEN_FILE": token_file.name}): + # Try with no expiration at all + with pytest.raises(RuntimeError): + await servicex.get_transforms() + + # Try with an expired token + with pytest.raises(AuthorizationError) as err: + decode.return_value = {"exp": 0.0} + await servicex.get_transforms() + assert "ServiceX access token request rejected:" in str(err.value) + + os.remove(token_file.name) @pytest.mark.asyncio From aa3f2fbf7f3133fb51e510254f0de3b82b109e5e Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 17 Jun 2025 13:29:55 -0500 Subject: [PATCH 30/35] fix broken test --- tests/test_servicex_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 3ff1917c..fe406be5 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -745,7 +745,7 @@ async def test_get_transformation_results_parsing(mock_get, servicex): "results": [ { "file-path": "dir1/file.txt", - "s3-object-name": "dir1/file.txt", + "s3-object-name": "dir1:file.txt", "created_at": msg_time.isoformat(), } ] From 72c7bca598c6e7e6df597009752402456bb9a824 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Mon, 23 Jun 2025 14:20:38 -0500 Subject: [PATCH 31/35] update tests --- servicex/minio_adapter.py | 7 ++- servicex/query_core.py | 5 +- servicex/servicex_adapter.py | 4 +- tests/test_dataset.py | 4 ++ tests/test_servicex_adapter.py | 97 +++++++++++++++++----------------- tests/test_servicex_dataset.py | 9 ++++ 6 files changed, 74 insertions(+), 52 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index e7dc89da..96b6f87d 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -128,11 +128,14 @@ async def download_file( ) async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3: + sanitized_object_name = object_name.replace("/", ":") if expected_size is not None: remotesize = expected_size else: async with _file_transfer_sem: - info = await s3.head_object(Bucket=self.bucket, Key=object_name) + info = await s3.head_object( + Bucket=self.bucket, Key=sanitized_object_name + ) remotesize = info["ContentLength"] if path.exists(): # if file size is the same, let's not download anything @@ -143,7 +146,7 @@ async def download_file( async with _file_transfer_sem: await s3.download_file( Bucket=self.bucket, - Key=object_name, + Key=sanitized_object_name, Filename=path.as_posix(), Config=_transferconfig, ) diff --git a/servicex/query_core.py b/servicex/query_core.py index acd468fb..9165b447 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -583,6 +583,9 @@ async def get_signed_url( for file in files: filename = file.filename + if use_local_polling: + filename = filename.replace("/", ":") + if filename != "" and filename not in files_seen: if signed_urls_only: download_tasks.append( @@ -597,7 +600,7 @@ async def get_signed_url( ) else: if use_local_polling: - expected_size = None + expected_size = file.total_bytes else: expected_size = file.size download_tasks.append( diff --git a/servicex/servicex_adapter.py b/servicex/servicex_adapter.py index b8da15e4..25f7b8db 100644 --- a/servicex/servicex_adapter.py +++ b/servicex/servicex_adapter.py @@ -59,6 +59,7 @@ class AuthorizationError(BaseException): class ServiceXFile: created_at: datetime.datetime filename: str + total_bytes: int async def _extract_message(r: Response): @@ -289,7 +290,7 @@ async def get_transformation_results( params["later_than"] = later_than.isoformat() async with AsyncClient() as session: - r = await session.get(headers=headers, url=url) + r = await session.get(headers=headers, url=url, params=params) if r.status_code == 403: raise AuthorizationError( f"Not authorized to access serviceX at {self.url}" @@ -310,6 +311,7 @@ async def get_transformation_results( created_at=datetime.datetime.fromisoformat( result["created_at"] ).replace(tzinfo=datetime.timezone.utc), + total_bytes=result["total-bytes"], ) response.append(file) return response diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 37d07aaa..1bf2446f 100644 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -137,10 +137,12 @@ async def test_download_files(python_dataset): ServiceXFile( filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc), + total_bytes=100, ), ServiceXFile( filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc), + total_bytes=100, ), ] @@ -182,10 +184,12 @@ async def test_download_files_with_signed_urls(python_dataset): ServiceXFile( filename="file1.txt", created_at=datetime.datetime.now(datetime.timezone.utc), + total_bytes=100, ), ServiceXFile( filename="file2.txt", created_at=datetime.datetime.now(datetime.timezone.utc), + total_bytes=100, ), ] diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 240a2380..5b2c049d 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -98,11 +98,11 @@ async def test_get_transforms_auth_error(mock_get, servicex): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.RetryClient.post") -@patch("servicex.servicex_adapter.RetryClient.get") +@patch("servicex.servicex_adapter.AsyncClient.get") +@patch("servicex.servicex_adapter.AsyncClient.post") @patch("servicex.servicex_adapter.jwt.decode") async def test_get_transforms_wlcg_bearer_token( - decode, http_get, post, servicex, transform_status_response + decode, post, http_get, servicex, transform_status_response ): http_get.return_value.__aenter__.return_value.json.return_value = ( transform_status_response @@ -523,32 +523,29 @@ async def test_get_authorization(servicex): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.ClientSession.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_success(get, servicex): servicex.get_servicex_capabilities = AsyncMock( return_value=["poll_local_transformation_results"] ) - get.return_value.__aenter__.return_value.status = 200 - get.return_value.__aenter__.return_value.json = AsyncMock( - return_value={ - "results": [ - { - "file-path": "file1.txt", - "s3-object-name": "file1.txt", - "created_at": datetime.datetime.now( - datetime.timezone.utc - ).isoformat(), - }, - { - "file-path": "file2.txt", - "s3-object-name": "file2.txt", - "created_at": datetime.datetime.now( - datetime.timezone.utc - ).isoformat(), - }, - ] - } - ) + get.return_value = MagicMock() + get.return_value.json.return_value = { + "results": [ + { + "file-path": "file1.txt", + "total-bytes": 100, + "s3-object-name": "file1.txt", + "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), + }, + { + "file-path": "file2.txt", + "total-bytes": 100, + "s3-object-name": "file2.txt", + "created_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), + }, + ] + } + get.return_value.status_code = 200 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -564,7 +561,7 @@ async def test_get_transformation_results_success(get, servicex): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.ClientSession.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_no_feature_flag(get, servicex): servicex.get_servicex_capabilities = AsyncMock(return_value=[]) request_id = "123-45-6789" @@ -574,14 +571,16 @@ async def test_get_transformation_results_no_feature_flag(get, servicex): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.ClientSession.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_not_found( get_transformation_results, servicex ): servicex.get_servicex_capabilities = AsyncMock( return_value=["poll_local_transformation_results"] ) - get_transformation_results.return_value.__aenter__.return_value.status = 404 + get_transformation_results.return_value = MagicMock() + get_transformation_results.return_value.status_code = 404 + request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -598,14 +597,15 @@ async def test_get_transformation_results_not_found( @pytest.mark.asyncio -@patch("servicex.servicex_adapter.ClientSession.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_not_authorized( get_transformation_results, servicex ): servicex.get_servicex_capabilities = AsyncMock( return_value=["poll_local_transformation_results"] ) - get_transformation_results.return_value.__aenter__.return_value.status = 403 + get_transformation_results.return_value = MagicMock() + get_transformation_results.return_value.status_code = 403 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -622,14 +622,15 @@ async def test_get_transformation_results_not_authorized( @pytest.mark.asyncio -@patch("servicex.servicex_adapter.ClientSession.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_server_error( get_transformation_results, servicex ): servicex.get_servicex_capabilities = AsyncMock( return_value=["poll_local_transformation_results"] ) - get_transformation_results.return_value.__aenter__.return_value.status = 500 + get_transformation_results.return_value = MagicMock() + get_transformation_results.return_value.status = 500 request_id = "123-45-6789" now = datetime.datetime.now(datetime.timezone.utc) @@ -694,10 +695,10 @@ async def fake_get_token(self): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.RetryClient.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_servicex_info_success(mock_get, servicex): - mock_get.return_value.__aenter__.return_value.status = 200 - mock_get.return_value.__aenter__.return_value.json = AsyncMock( + mock_get.return_value.status_code = 200 + mock_get.return_value.json = MagicMock( return_value={ "capabilities": ["a", "b"], "app-version": "1.0", @@ -710,20 +711,18 @@ async def test_get_servicex_info_success(mock_get, servicex): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.RetryClient.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_servicex_info_auth_error(mock_get, servicex): - mock_get.return_value.__aenter__.return_value.status = 401 + mock_get.return_value.status_code = 401 with pytest.raises(AuthorizationError): await servicex.get_servicex_info() @pytest.mark.asyncio -@patch("servicex.servicex_adapter.RetryClient.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_servicex_info_server_error(mock_get, servicex): - mock_get.return_value.__aenter__.return_value.status = 500 - mock_get.return_value.__aenter__.return_value.json = AsyncMock( - return_value={"message": "oops"} - ) + mock_get.return_value.status_code = 500 + mock_get.return_value.json = MagicMock(return_value={"message": "oops"}) with pytest.raises(RuntimeError) as e: await servicex.get_servicex_info() assert "ServiceX WebAPI Error during transformation submission: 500 - oops" in str( @@ -732,19 +731,21 @@ async def test_get_servicex_info_server_error(mock_get, servicex): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.ClientSession.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_parsing(mock_get, servicex): servicex.get_servicex_capabilities = AsyncMock( return_value=["poll_local_transformation_results"] ) msg_time = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) - mock_get.return_value.__aenter__.return_value.status = 200 - mock_get.return_value.__aenter__.return_value.json = AsyncMock( + mock_get.return_value = MagicMock() + mock_get.return_value.status_code = 200 + mock_get.return_value.json = MagicMock( return_value={ "results": [ { "file-path": "dir1/file.txt", "s3-object-name": "dir1:file.txt", + "total-bytes": 100, "created_at": msg_time.isoformat(), } ] @@ -757,12 +758,12 @@ async def test_get_transformation_results_parsing(mock_get, servicex): @pytest.mark.asyncio -@patch("servicex.servicex_adapter.ClientSession.get") +@patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_empty(mock_get, servicex): servicex.get_servicex_capabilities = AsyncMock( return_value=["poll_local_transformation_results"] ) - mock_get.return_value.__aenter__.return_value.status = 200 - mock_get.return_value.__aenter__.return_value.json = AsyncMock(return_value={}) + mock_get.return_value.status_code = 200 + mock_get.return_value.json = MagicMock(return_value={"results": []}) res = await servicex.get_transformation_results("id123", None) assert res == [] diff --git a/tests/test_servicex_dataset.py b/tests/test_servicex_dataset.py index 6b2032f8..4299a3c2 100644 --- a/tests/test_servicex_dataset.py +++ b/tests/test_servicex_dataset.py @@ -220,16 +220,19 @@ async def test_submit(mocker, use_s3_polling): [ ServiceXFile( filename="file1", + total_bytes=100, created_at=datetime.datetime.now(datetime.timezone.utc), ) ], [ ServiceXFile( filename="file1", + total_bytes=100, created_at=datetime.datetime.now(datetime.timezone.utc), ), ServiceXFile( filename="file2", + total_bytes=100, created_at=datetime.datetime.now(datetime.timezone.utc), ), ], @@ -297,12 +300,14 @@ async def test_submit_partial_success(mocker, use_s3_polling): ServiceXFile( filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc), + total_bytes=100, ) ], [ ServiceXFile( filename="file1", created_at=datetime.datetime.now(datetime.timezone.utc), + total_bytes=100, ) ], ] @@ -374,10 +379,12 @@ async def test_use_of_cache(mocker, use_s3_polling): servicex.get_transformation_results.return_value = [ ServiceXFile( filename="file1.txt", + total_bytes=100, created_at=datetime.datetime.now(datetime.timezone.utc), ), ServiceXFile( filename="file2.txt", + total_bytes=100, created_at=datetime.datetime.now(datetime.timezone.utc), ), ] @@ -704,10 +711,12 @@ async def test_use_of_ignore_cache(mocker, servicex): servicex.get_transformation_results.return_value = [ ServiceXFile( filename="file1.txt", + total_bytes=100, created_at=datetime.datetime.now(datetime.timezone.utc), ), ServiceXFile( filename="file2.txt", + total_bytes=100, created_at=datetime.datetime.now(datetime.timezone.utc), ), ] From b0eee6c0bf87f6eba788b6b25f50b6a200e63766 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 24 Jun 2025 10:30:04 -0500 Subject: [PATCH 32/35] update pyproject --- pyproject.toml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f606f7c3..b2b12588 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,8 +36,8 @@ dependencies = [ "func_adl>=3.2.6", "requests>=2.31", "pydantic>=2.6.0", - "aiohttp-retry>=2.8.3", "httpx>=0.24", + "httpx_retries>=0.3.2", "aioboto3>=14.1.0", "tinydb>=4.7", "google-auth>=2.17", @@ -128,3 +128,10 @@ dynamic_context = "test_function" [tool.pytest.ini_options] asyncio_mode = "auto" + +[tool.hatch.envs.test] +features = ["test"] + +[tool.hatch.envs.test.scripts] +test = "pytest {args}" +cov = "pytest --cov=servicex {args}" From 925c358a7226680bbcd89e9b4cdfb6d725fc97d8 Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Tue, 24 Jun 2025 14:46:09 -0500 Subject: [PATCH 33/35] get s3 file name directly from API --- servicex/minio_adapter.py | 7 ++----- servicex/query_core.py | 3 --- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/servicex/minio_adapter.py b/servicex/minio_adapter.py index 96b6f87d..e7dc89da 100644 --- a/servicex/minio_adapter.py +++ b/servicex/minio_adapter.py @@ -128,14 +128,11 @@ async def download_file( ) async with self.minio.client("s3", endpoint_url=self.endpoint_host) as s3: - sanitized_object_name = object_name.replace("/", ":") if expected_size is not None: remotesize = expected_size else: async with _file_transfer_sem: - info = await s3.head_object( - Bucket=self.bucket, Key=sanitized_object_name - ) + info = await s3.head_object(Bucket=self.bucket, Key=object_name) remotesize = info["ContentLength"] if path.exists(): # if file size is the same, let's not download anything @@ -146,7 +143,7 @@ async def download_file( async with _file_transfer_sem: await s3.download_file( Bucket=self.bucket, - Key=sanitized_object_name, + Key=object_name, Filename=path.as_posix(), Config=_transferconfig, ) diff --git a/servicex/query_core.py b/servicex/query_core.py index 9165b447..6549c0df 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -583,9 +583,6 @@ async def get_signed_url( for file in files: filename = file.filename - if use_local_polling: - filename = filename.replace("/", ":") - if filename != "" and filename not in files_seen: if signed_urls_only: download_tasks.append( From 7ddfc605a9fed16219fada747eb3b98f162b13bc Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 25 Jun 2025 13:51:27 -0500 Subject: [PATCH 34/35] add warning notification and additional testing --- servicex/query_core.py | 8 ++++++ tests/test_servicex_adapter.py | 45 ++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/servicex/query_core.py b/servicex/query_core.py index 6549c0df..ad2341ef 100644 --- a/servicex/query_core.py +++ b/servicex/query_core.py @@ -567,6 +567,14 @@ async def get_signed_url( "poll_local_transformation_results" in await self.servicex.get_servicex_capabilities() ) + + if not use_local_polling: + logger.warning( + "ServiceX is using legacy S3 bucket polling. Future versions of the " + "ServiceX client will not support this method. Please update your " + "ServiceX server to the latest version." + ) + while True: if not cached_record: await asyncio.sleep(self.minio_polling_interval) diff --git a/tests/test_servicex_adapter.py b/tests/test_servicex_adapter.py index 884230a1..4d9f4bd1 100644 --- a/tests/test_servicex_adapter.py +++ b/tests/test_servicex_adapter.py @@ -732,6 +732,51 @@ async def test_get_servicex_info_server_error(mock_get, servicex): ) +@pytest.mark.asyncio +async def test_get_servicex_info_caching(servicex): + servicex_info_data = { + "capabilities": ["a", "b"], + "app-version": "1.0", + "code-gen-image": {"func_adl": "image1", "uproot": "image2"}, + } + + with patch("servicex.servicex_adapter.AsyncClient.get") as mock_get: + mock_get.return_value.status_code = 200 + mock_get.return_value.json = MagicMock(return_value=servicex_info_data) + + info1 = await servicex.get_servicex_info() + assert isinstance(info1, ServiceXInfo) + assert info1.capabilities == ["a", "b"] + assert mock_get.call_count == 1 + + # Second call should use cached ServiceXInfo without additional HTTP request + info2 = await servicex.get_servicex_info() + assert info2 is info1 + assert mock_get.call_count == 1 + + +@pytest.mark.asyncio +async def test_get_servicex_capabilities(servicex): + servicex_info_data = { + "capabilities": ["feature1", "feature2", "feature3"], + "app-version": "1.0", + "code-gen-image": {"func_adl": "image1", "uproot": "image2"}, + } + + with patch("servicex.servicex_adapter.AsyncClient.get") as mock_get: + mock_get.return_value.status_code = 200 + mock_get.return_value.json = MagicMock(return_value=servicex_info_data) + + capabilities1 = await servicex.get_servicex_capabilities() + assert capabilities1 == ["feature1", "feature2", "feature3"] + assert mock_get.call_count == 1 + + # Second call should use cached ServiceXInfo without additional HTTP request + capabilities2 = await servicex.get_servicex_capabilities() + assert capabilities2 == ["feature1", "feature2", "feature3"] + assert mock_get.call_count == 1 + + @pytest.mark.asyncio @patch("servicex.servicex_adapter.AsyncClient.get") async def test_get_transformation_results_parsing(mock_get, servicex): From 23a0f1349c7035671a19cb0be75c95ec08c227ce Mon Sep 17 00:00:00 2001 From: Matt Shirley Date: Wed, 25 Jun 2025 18:30:41 -0500 Subject: [PATCH 35/35] remove pyproject changes --- pyproject.toml | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b2b12588..a3a7acf5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,10 +128,3 @@ dynamic_context = "test_function" [tool.pytest.ini_options] asyncio_mode = "auto" - -[tool.hatch.envs.test] -features = ["test"] - -[tool.hatch.envs.test.scripts] -test = "pytest {args}" -cov = "pytest --cov=servicex {args}"