Skip to content

remove s3 bucket polling when waiting for transformation results #587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
90b6173
remove s3 bucket polling when waiting for transformation results
MattShirley May 6, 2025
ee7b02c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 6, 2025
5b04365
add begin_at, update tests
MattShirley May 14, 2025
85b5346
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 14, 2025
b80d72d
use python3.9 compliant method to get UTC time
MattShirley May 14, 2025
ccfe38c
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 14, 2025
36cb4aa
fix breaking tests
MattShirley May 14, 2025
eb9a937
add additional test coverage support
MattShirley May 20, 2025
bf805bb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 20, 2025
9492997
flake8 compliance
MattShirley May 20, 2025
90a0a6b
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 20, 2025
ab9d0cf
code coverage improvement
MattShirley May 20, 2025
3644ee1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 20, 2025
9b506c5
add ServiceXFile class and refactor tests to match previous
MattShirley May 22, 2025
932a7e9
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
6d18ff3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 22, 2025
04ae90c
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley May 22, 2025
c1da28f
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
ffd87ad
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley May 22, 2025
98392dd
resolve breaking tests
MattShirley May 22, 2025
8e952eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 22, 2025
232be85
flake8
MattShirley May 22, 2025
0f094c0
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
b79cb26
flake8
MattShirley May 22, 2025
c8ad74c
flake8
MattShirley May 22, 2025
489e05d
fix for python3.9
MattShirley May 22, 2025
4e01dee
remove extraneous print call
MattShirley May 22, 2025
ee0423f
simplify typing
MattShirley Jun 4, 2025
f5bf546
add feature branching
MattShirley Jun 11, 2025
5bbe40f
add testing
MattShirley Jun 11, 2025
201531a
remove duplicated test
MattShirley Jun 11, 2025
268592f
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley Jun 12, 2025
7db79e1
fix breaking test after merging master in
MattShirley Jun 12, 2025
6e20fe6
try to fix breaking test on server
MattShirley Jun 12, 2025
270bd8b
improve code coverage
MattShirley Jun 12, 2025
fc29f59
temporarily remove failing test in server (but not locally)
MattShirley Jun 12, 2025
bf49d24
attempt to fix breaking server test
MattShirley Jun 12, 2025
b49edc7
pull master and fix tests
MattShirley Jun 17, 2025
aa3f2fb
fix broken test
MattShirley Jun 17, 2025
9785fc6
update models
MattShirley Jun 20, 2025
72c7bca
update tests
MattShirley Jun 23, 2025
d218879
resolve merge conflicts
MattShirley Jun 23, 2025
b0eee6c
update pyproject
MattShirley Jun 24, 2025
925c358
get s3 file name directly from API
MattShirley Jun 24, 2025
7ddfc60
add warning notification and additional testing
MattShirley Jun 25, 2025
23a0f13
remove pyproject changes
MattShirley Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions servicex/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ class TransformedResults(DocStringBaseModel):
"""URL for looking up logs on the ServiceX server"""


class ServiceXInfo(DocStringBaseModel):
    """
    Description of a ServiceX server, built from its info payload.

    Hyphenated payload keys are mapped onto Python attribute names through
    field aliases.
    """

    # Filled from the "app-version" key of the payload.
    app_version: str = Field(alias="app-version")
    # Filled from the "code-gen-image" key; a string-to-string mapping.
    code_gen_image: dict[str, str] = Field(alias="code-gen-image")
    # Capability names advertised by the server; defaults to an empty list
    # when the payload carries no "capabilities" key.
    capabilities: list[str] = Field(default_factory=list)


class DatasetFile(BaseModel):
"""
Model for a file in a cached dataset
Expand Down
50 changes: 43 additions & 7 deletions servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations

import datetime
import abc
import asyncio
from abc import ABC
Expand Down Expand Up @@ -342,13 +343,17 @@ def transform_complete(task: Task):

download_files_task = loop.create_task(
self.download_files(
signed_urls_only, expandable_progress, download_progress, cached_record
signed_urls_only,
expandable_progress,
download_progress,
cached_record,
)
)

try:
signed_urls = []
downloaded_files = []

download_result = await download_files_task
if signed_urls_only:
signed_urls = download_result
Expand Down Expand Up @@ -522,6 +527,7 @@ async def download_files(
Task to monitor the list of files in the transform output's bucket. Any new files
will be downloaded.
"""

files_seen = set()
result_uris = []
download_tasks = []
Expand Down Expand Up @@ -555,40 +561,70 @@ async def get_signed_url(
if progress:
progress.advance(task_id=download_progress, task_type="Download")

later_than = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)

use_local_polling = (
"poll_local_transformation_results"
in await self.servicex.get_servicex_capabilities()
)

if not use_local_polling:
logger.warning(
"ServiceX is using legacy S3 bucket polling. Future versions of the "
"ServiceX client will not support this method. Please update your "
"ServiceX server to the latest version."
)

while True:
if not cached_record:
await asyncio.sleep(self.minio_polling_interval)
if self.minio:
# if self.minio exists, self.current_status will too
if self.current_status.files_completed > len(files_seen):
files = await self.minio.list_bucket()
if use_local_polling:
files = await self.servicex.get_transformation_results(
self.current_status.request_id, later_than
)
else:
files = await self.minio.list_bucket()

for file in files:
if file.filename not in files_seen:
filename = file.filename

if filename != "" and filename not in files_seen:
if signed_urls_only:
download_tasks.append(
loop.create_task(
get_signed_url(
self.minio,
file.filename,
filename,
progress,
download_progress,
)
)
)
else:
if use_local_polling:
expected_size = file.total_bytes
else:
expected_size = file.size
download_tasks.append(
loop.create_task(
download_file(
self.minio,
file.filename,
filename,
progress,
download_progress,
shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501
expected_size=file.size,
expected_size=expected_size,
)
)
) # NOQA 501
files_seen.add(file.filename)
files_seen.add(filename)

if use_local_polling:
if file.created_at > later_than:
later_than = file.created_at

# Once the transform is complete and all files are seen we can stop polling.
# Also, if we are just downloading or signing urls for a previous transform
Expand Down
86 changes: 85 additions & 1 deletion servicex/servicex_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import time
import datetime
from typing import Optional, Dict, List
from dataclasses import dataclass

import httpx
from httpx import AsyncClient, Response
Expand All @@ -41,13 +43,25 @@
retry_if_not_exception_type,
)

from servicex.models import TransformRequest, TransformStatus, CachedDataset
from servicex.models import (
TransformRequest,
TransformStatus,
CachedDataset,
ServiceXInfo,
)


class AuthorizationError(BaseException):
pass


@dataclass
class ServiceXFile:
    """One transformation result file as reported by the ServiceX server."""

    # Timestamp taken from the result record's "created_at" entry.
    created_at: datetime.datetime
    # Object name of the file (the record's "s3-object-name" entry).
    filename: str
    # File size in bytes (the record's "total-bytes" entry).
    total_bytes: int


async def _extract_message(r: Response):
try:
o = r.json()
Expand All @@ -63,6 +77,9 @@ def __init__(self, url: str, refresh_token: Optional[str] = None):
self.refresh_token = refresh_token
self.token = None

# interact with _servicex_info via get_servicex_info
self._servicex_info: Optional[ServiceXInfo] = None

async def _get_token(self):
url = f"{self.url}/token/refresh"
headers = {"Authorization": f"Bearer {self.refresh_token}"}
Expand Down Expand Up @@ -120,6 +137,31 @@ async def _get_authorization(self, force_reauth: bool = False) -> Dict[str, str]
await self._get_token()
return {"Authorization": f"Bearer {self.token}"}

async def get_servicex_info(self) -> ServiceXInfo:
    """
    Return the server's info document, fetching it at most once per adapter.

    The first successful call issues ``GET {self.url}/servicex`` and stores
    the parsed result on ``self._servicex_info``; later calls return that
    stored object without contacting the server again.

    :return: The parsed ``ServiceXInfo`` model.
    :raises AuthorizationError: If the server answers with status 401.
    :raises RuntimeError: If the server answers with any other status of
        400 or above.
    """
    # Explicit None check: the cache is "unset" only when nothing was stored.
    if self._servicex_info is not None:
        return self._servicex_info

    headers = await self._get_authorization()
    retry_options = Retry(total=3, backoff_factor=10)
    async with AsyncClient(transport=RetryTransport(retry=retry_options)) as client:
        r = await client.get(url=f"{self.url}/servicex", headers=headers)
        if r.status_code == 401:
            raise AuthorizationError(
                f"Not authorized to access serviceX at {self.url}"
            )
        # ">= 400" so that a plain 400 Bad Request is reported as an error
        # instead of being parsed as if it were a valid info document.
        if r.status_code >= 400:
            error_message = await _extract_message(r)
            raise RuntimeError(
                "ServiceX WebAPI Error while fetching server info: "
                f"{r.status_code} - {error_message}"
            )
        self._servicex_info = ServiceXInfo(**r.json())
    return self._servicex_info

async def get_servicex_capabilities(self) -> List[str]:
    """Return the capability names listed in the server's info document."""
    info = await self.get_servicex_info()
    return info.capabilities
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test for this line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


async def get_transforms(self) -> List[TransformStatus]:
headers = await self._get_authorization()
retry_options = Retry(total=3, backoff_factor=10)
Expand Down Expand Up @@ -232,6 +274,48 @@ async def delete_transform(self, transform_id=None):
msg = await _extract_message(r)
raise RuntimeError(f"Failed to delete transform {transform_id} - {msg}")

async def get_transformation_results(
    self, request_id: str, later_than: Optional[datetime.datetime] = None
) -> List[ServiceXFile]:
    """
    Fetch the result files the server has recorded for a transformation.

    :param request_id: Identifier of the transformation request.
    :param later_than: When given, sent to the server as the ``later_than``
        query parameter in ISO 8601 form.
    :return: One ``ServiceXFile`` per entry of the response's ``results``
        list, with ``created_at`` expressed in UTC. An absent ``results``
        key yields an empty list.
    :raises ValueError: If the server does not advertise the
        ``poll_local_transformation_results`` capability, or answers 404.
    :raises AuthorizationError: If the server answers 403.
    :raises RuntimeError: For any other non-200 answer.
    """
    if (
        "poll_local_transformation_results"
        not in await self.get_servicex_capabilities()
    ):
        raise ValueError("ServiceX capabilities not found")

    headers = await self._get_authorization()
    url = self.url + f"/servicex/transformation/{request_id}/results"
    params = {}
    if later_than is not None:
        params["later_than"] = later_than.isoformat()

    async with AsyncClient() as session:
        r = await session.get(headers=headers, url=url, params=params)
        if r.status_code == 403:
            raise AuthorizationError(
                f"Not authorized to access serviceX at {self.url}"
            )

        if r.status_code == 404:
            raise ValueError(f"Request {request_id} not found")

        if r.status_code != 200:
            msg = await _extract_message(r)
            raise RuntimeError(f"Failed with message: {msg}")

        utc = datetime.timezone.utc
        files = []
        for result in r.json().get("results", []):
            created_at = datetime.datetime.fromisoformat(result["created_at"])
            if created_at.tzinfo is None:
                # Naive timestamps are taken to be UTC already.
                created_at = created_at.replace(tzinfo=utc)
            else:
                # Convert rather than relabel, so a timestamp carrying a
                # non-UTC offset keeps pointing at the same instant.
                created_at = created_at.astimezone(utc)
            files.append(
                ServiceXFile(
                    filename=result["s3-object-name"],
                    created_at=created_at,
                    total_bytes=result["total-bytes"],
                )
            )
        return files

async def cancel_transform(self, transform_id=None):
headers = await self._get_authorization()
path_template = f"/servicex/transformation/{transform_id}/cancel"
Expand Down
49 changes: 41 additions & 8 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import pytest
import tempfile
import os
import datetime

from unittest.mock import AsyncMock, Mock, patch
from servicex.dataset_identifier import FileListDataset
Expand All @@ -44,6 +45,8 @@
)
from rich.progress import Progress

from servicex.servicex_adapter import ServiceXFile


@pytest.mark.asyncio
async def test_as_signed_urls_happy(transformed_result):
Expand Down Expand Up @@ -124,12 +127,27 @@ async def test_download_files(python_dataset):
minio_mock = AsyncMock()
config = Configuration(cache_path="temp_dir", api_endpoints=[])
python_dataset.configuration = config
python_dataset.servicex = AsyncMock()
python_dataset.servicex.get_servicex_capabilities = AsyncMock(
return_value=["poll_local_transformation_results"]
)

python_dataset.servicex.get_transformation_results = AsyncMock()
python_dataset.servicex.get_transformation_results.return_value = [
ServiceXFile(
filename="file1.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
ServiceXFile(
filename="file2.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
]

minio_mock.download_file.return_value = Path("/path/to/downloaded_file")
minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url")
minio_mock.list_bucket.return_value = [
Mock(filename="file1.txt"),
Mock(filename="file2.txt"),
]

progress_mock = Mock()
python_dataset.minio_polling_interval = 0
Expand All @@ -154,12 +172,27 @@ async def test_download_files_with_signed_urls(python_dataset):
python_dataset.configuration = config
minio_mock.download_file.return_value = "/path/to/downloaded_file"
minio_mock.get_signed_url.return_value = "http://example.com/signed_url"
minio_mock.list_bucket.return_value = [
Mock(filename="file1.txt"),
Mock(filename="file2.txt"),
]
progress_mock = Mock()

python_dataset.servicex = AsyncMock()
python_dataset.servicex.get_servicex_capabilities = AsyncMock(
return_value=["poll_local_transformation_results"]
)

python_dataset.servicex.get_transformation_results = AsyncMock()
python_dataset.servicex.get_transformation_results.return_value = [
ServiceXFile(
filename="file1.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
ServiceXFile(
filename="file2.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
]

python_dataset.minio_polling_interval = 0
python_dataset.minio = minio_mock
python_dataset.current_status = Mock(status="Complete", files_completed=2)
Expand Down
Loading