Skip to content

remove s3 bucket polling when waiting for transformation results #587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
90b6173
remove s3 bucket polling when waiting for transformation results
MattShirley May 6, 2025
ee7b02c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 6, 2025
5b04365
add begin_at, update tests
MattShirley May 14, 2025
85b5346
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 14, 2025
b80d72d
use python3.9 compliant method to get UTC time
MattShirley May 14, 2025
ccfe38c
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 14, 2025
36cb4aa
fix breaking tests
MattShirley May 14, 2025
eb9a937
add additional test coverage support
MattShirley May 20, 2025
bf805bb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 20, 2025
9492997
flake8 compliance
MattShirley May 20, 2025
90a0a6b
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 20, 2025
ab9d0cf
code coverage improvement
MattShirley May 20, 2025
3644ee1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 20, 2025
9b506c5
add ServiceXFile class and refactor tests to match previous
MattShirley May 22, 2025
932a7e9
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
6d18ff3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 22, 2025
04ae90c
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley May 22, 2025
c1da28f
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
ffd87ad
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley May 22, 2025
98392dd
resolve breaking tests
MattShirley May 22, 2025
8e952eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 22, 2025
232be85
flake8
MattShirley May 22, 2025
0f094c0
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
b79cb26
flake8
MattShirley May 22, 2025
c8ad74c
flake8
MattShirley May 22, 2025
489e05d
fix for python3.9
MattShirley May 22, 2025
4e01dee
remove extraneous print call
MattShirley May 22, 2025
ee0423f
simplify typing
MattShirley Jun 4, 2025
f5bf546
add feature branching
MattShirley Jun 11, 2025
5bbe40f
add testing
MattShirley Jun 11, 2025
201531a
remove duplicated test
MattShirley Jun 11, 2025
268592f
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley Jun 12, 2025
7db79e1
fix breaking test after merging master in
MattShirley Jun 12, 2025
6e20fe6
try to fix breaking test on server
MattShirley Jun 12, 2025
270bd8b
improve code coverage
MattShirley Jun 12, 2025
fc29f59
temporarily remove failing test in server (but not locally)
MattShirley Jun 12, 2025
bf49d24
attempt to fix breaking server test
MattShirley Jun 12, 2025
b49edc7
pull master and fix tests
MattShirley Jun 17, 2025
aa3f2fb
fix broken test
MattShirley Jun 17, 2025
9785fc6
update models
MattShirley Jun 20, 2025
72c7bca
update tests
MattShirley Jun 23, 2025
d218879
resolve merge conflicts
MattShirley Jun 23, 2025
b0eee6c
update pyproject
MattShirley Jun 24, 2025
925c358
get s3 file name directly from API
MattShirley Jun 24, 2025
7ddfc60
add warning notification and additional testing
MattShirley Jun 25, 2025
23a0f13
remove pyproject changes
MattShirley Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import annotations

import datetime
import abc
import asyncio
from abc import ABC
Expand Down Expand Up @@ -318,6 +319,7 @@ def transform_complete(task: Task):
else None
)

begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
if not cached_record:

if self.cache.is_transform_request_submitted(sx_request_hash):
Expand All @@ -342,13 +344,18 @@ def transform_complete(task: Task):

download_files_task = loop.create_task(
self.download_files(
signed_urls_only, expandable_progress, download_progress, cached_record
signed_urls_only,
expandable_progress,
download_progress,
cached_record,
begin_at,
)
)

try:
signed_urls = []
downloaded_files = []

download_result = await download_files_task
if signed_urls_only:
signed_urls = download_result
Expand Down Expand Up @@ -517,11 +524,13 @@ async def download_files(
progress: ExpandableProgress,
download_progress: TaskID,
cached_record: Optional[TransformedResults],
begin_at: datetime.datetime,
) -> List[str]:
"""
Task to monitor the list of files in the transform output's bucket. Any new files
will be downloaded.
"""

files_seen = set()
result_uris = []
download_tasks = []
Expand Down Expand Up @@ -557,15 +566,22 @@ async def get_signed_url(
if self.minio:
# if self.minio exists, self.current_status will too
if self.current_status.files_completed > len(files_seen):
files = await self.minio.list_bucket()
new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is relying on synchronization of clocks between the client and the server. Wouldn't it be better to set new_begin_at to be the latest result timestamp we see in the transform_results?

files = await self.servicex.get_transformation_results(
self.current_status.request_id, begin_at
)
begin_at = new_begin_at

for file in files:
if file.filename not in files_seen:
file_path = file.get("file-path", "").replace("/", ":")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This relies on being kept in line with the transformer sidecar, and I think it may fail in certain cases (for example, when the parquet output format is selected). Should we add a new field to the transform_result table that stores the object_name as uploaded to S3 (as determined by https://github.com/ssl-hep/ServiceX/blob/1991e6b2ea00dcbd8cdb9b9ed32fd44049f0dea3/transformer_sidecar/src/transformer_sidecar/transformer.py#L349 etc.)?


if file_path != "" and file_path not in files_seen:
if signed_urls_only:
download_tasks.append(
loop.create_task(
get_signed_url(
self.minio,
file.filename,
file_path,
progress,
download_progress,
)
Expand All @@ -576,14 +592,14 @@ async def get_signed_url(
loop.create_task(
download_file(
self.minio,
file.filename,
file_path,
progress,
download_progress,
shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501
)
)
) # NOQA 501
files_seen.add(file.filename)
files_seen.add(file_path)

# Once the transform is complete and all files are seen we can stop polling.
# Also, if we are just downloading or signing urls for a previous transform
Expand Down
28 changes: 27 additions & 1 deletion servicex/servicex_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import time
import datetime
from typing import Optional, Dict, List

from aiohttp import ClientSession
Expand Down Expand Up @@ -228,14 +229,39 @@ async def delete_transform(self, transform_id=None):
f"Failed to delete transform {transform_id} - {msg}"
)

async def get_transformation_results(
    self, request_id: str, begin_at: datetime.datetime
) -> List[Dict]:
    """
    Fetch the results recorded so far for a transformation request.

    Issues a GET to ``/servicex/transformation/{request_id}/results`` and
    forwards ``begin_at`` as an ISO-8601 ``begin_at`` query parameter.

    :param request_id: ID of the transformation request to query.
    :param begin_at: Timestamp sent to the server as ``begin_at``.
        NOTE(review): presumably the server only returns results recorded
        at or after this time - confirm against the ServiceX API.
    :return: The ``results`` entry of the JSON response. An empty list is
        returned when the entry is missing or null so that callers can
        always iterate over the return value.
    :raises AuthorizationError: if the server answers with HTTP 403.
    :raises ValueError: if the server answers with HTTP 404.
    :raises RuntimeError: for any other non-200 response.
    """
    headers = await self._get_authorization()
    url = self.url + f"/servicex/transformation/{request_id}/results"
    params = {
        "begin_at": begin_at.isoformat(),
    }

    async with ClientSession() as session:
        async with session.get(headers=headers, url=url, params=params) as r:
            if r.status == 403:
                raise AuthorizationError(
                    f"Not authorized to access serviceX at {self.url}"
                )

            if r.status == 404:
                raise ValueError(f"Request {request_id} not found")

            if r.status != 200:
                msg = await _extract_message(r)
                raise RuntimeError(f"Failed with message: {msg}")

            data = await r.json()
            # A missing or null "results" entry must not surface as None:
            # the polling loop iterates over this value directly.
            return data.get("results") or []

async def cancel_transform(self, transform_id=None):
headers = await self._get_authorization()
path_template = f"/servicex/transformation/{transform_id}/cancel"
url = self.url + path_template.format(transform_id=transform_id)

async with ClientSession() as session:
async with session.get(headers=headers, url=url) as r:

if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
Expand Down
30 changes: 20 additions & 10 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import pytest
import tempfile
import os
import datetime

from unittest.mock import AsyncMock, Mock, patch
from servicex.dataset_identifier import FileListDataset
Expand Down Expand Up @@ -120,16 +121,21 @@ async def test_as_files_cached(transformed_result, python_dataset):
@pytest.mark.asyncio
async def test_download_files(python_dataset):
signed_urls_only = False
begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
download_progress = "download_task_id"
minio_mock = AsyncMock()
config = Configuration(cache_path="temp_dir", api_endpoints=[])
python_dataset.configuration = config
python_dataset.servicex = AsyncMock()
python_dataset.servicex.get_transformation_results = AsyncMock(
side_effect=[
[{"file-path": "file1.txt"}],
[{"file-path": "file2.txt"}],
]
)

minio_mock.download_file.return_value = Path("/path/to/downloaded_file")
minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url")
minio_mock.list_bucket.return_value = [
Mock(filename="file1.txt"),
Mock(filename="file2.txt"),
]

progress_mock = Mock()
python_dataset.minio_polling_interval = 0
Expand All @@ -138,7 +144,7 @@ async def test_download_files(python_dataset):
python_dataset.configuration.shortened_downloaded_filename = False

result_uris = await python_dataset.download_files(
signed_urls_only, progress_mock, download_progress, None
signed_urls_only, progress_mock, download_progress, None, begin_at
)
minio_mock.download_file.assert_awaited()
minio_mock.get_signed_url.assert_not_awaited()
Expand All @@ -148,25 +154,29 @@ async def test_download_files(python_dataset):
@pytest.mark.asyncio
async def test_download_files_with_signed_urls(python_dataset):
signed_urls_only = True
begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
download_progress = "download_task_id"
minio_mock = AsyncMock()
config = Configuration(cache_path="temp_dir", api_endpoints=[])
python_dataset.configuration = config
minio_mock.download_file.return_value = "/path/to/downloaded_file"
minio_mock.get_signed_url.return_value = "http://example.com/signed_url"
minio_mock.list_bucket.return_value = [
Mock(filename="file1.txt"),
Mock(filename="file2.txt"),
]
progress_mock = Mock()

python_dataset.servicex = AsyncMock()
python_dataset.servicex.get_transformation_results = AsyncMock(
side_effect=[
[{"file-path": "file1.txt"}],
[{"file-path": "file2.txt"}],
]
)
python_dataset.minio_polling_interval = 0
python_dataset.minio = minio_mock
python_dataset.current_status = Mock(status="Complete", files_completed=2)
python_dataset.configuration.shortened_downloaded_filename = False

result_uris = await python_dataset.download_files(
signed_urls_only, progress_mock, download_progress, None
signed_urls_only, progress_mock, download_progress, None, begin_at
)
minio_mock.download_file.assert_not_called()
minio_mock.get_signed_url.assert_called()
Expand Down
82 changes: 82 additions & 0 deletions tests/test_servicex_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import os
import tempfile
import time
import datetime
from unittest.mock import patch

import httpx
Expand Down Expand Up @@ -502,3 +503,84 @@ async def test_get_authorization(servicex):
with patch("google.auth.jwt.decode", return_value={"exp": time.time() - 90}):
r = await servicex._get_authorization()
get_token.assert_called_once()


@pytest.mark.asyncio
@patch("servicex.servicex_adapter.ClientSession.get")
async def test_get_transformation_results_success(get_transformation_results, servicex):
    """A 200 response leads to one GET on the results endpoint with begin_at."""
    request_id = "123-45-6789"
    begin_at = datetime.datetime.now(datetime.timezone.utc)
    expected_url = f"https://servicex.org/servicex/transformation/{request_id}/results"

    # The adapter enters the patched GET as an async context manager, so the
    # status is configured on the object that __aenter__ yields.
    response = get_transformation_results.return_value.__aenter__.return_value
    response.status = 200

    await servicex.get_transformation_results(request_id, begin_at)

    get_transformation_results.assert_called_with(
        url=expected_url,
        headers={},
        params={"begin_at": begin_at.isoformat()},
    )


@pytest.mark.asyncio
@patch("servicex.servicex_adapter.ClientSession.get")
async def test_get_transformation_results_not_found(
    get_transformation_results, servicex
):
    """A 404 response is reported as ValueError after the GET was issued."""
    request_id = "123-45-6789"
    begin_at = datetime.datetime.now(datetime.timezone.utc)
    expected_url = f"https://servicex.org/servicex/transformation/{request_id}/results"

    # Status lives on the object yielded by the patched GET's __aenter__.
    response = get_transformation_results.return_value.__aenter__.return_value
    response.status = 404

    with pytest.raises(ValueError):
        await servicex.get_transformation_results(request_id, begin_at)

    get_transformation_results.assert_called_with(
        url=expected_url,
        headers={},
        params={"begin_at": begin_at.isoformat()},
    )


@pytest.mark.asyncio
@patch("servicex.servicex_adapter.ClientSession.get")
async def test_get_transformation_results_not_authorized(
    get_transformation_results, servicex
):
    """A 403 response is reported as AuthorizationError after the GET was issued."""
    request_id = "123-45-6789"
    begin_at = datetime.datetime.now(datetime.timezone.utc)
    expected_url = f"https://servicex.org/servicex/transformation/{request_id}/results"

    # Status lives on the object yielded by the patched GET's __aenter__.
    response = get_transformation_results.return_value.__aenter__.return_value
    response.status = 403

    with pytest.raises(AuthorizationError):
        await servicex.get_transformation_results(request_id, begin_at)

    get_transformation_results.assert_called_with(
        url=expected_url,
        headers={},
        params={"begin_at": begin_at.isoformat()},
    )


@pytest.mark.asyncio
@patch("servicex.servicex_adapter.ClientSession.get")
async def test_get_transformation_results_server_error(
    get_transformation_results, servicex
):
    """A 500 response is reported as RuntimeError after the GET was issued."""
    request_id = "123-45-6789"
    begin_at = datetime.datetime.now(datetime.timezone.utc)
    expected_url = f"https://servicex.org/servicex/transformation/{request_id}/results"

    # Status lives on the object yielded by the patched GET's __aenter__.
    response = get_transformation_results.return_value.__aenter__.return_value
    response.status = 500

    with pytest.raises(RuntimeError):
        await servicex.get_transformation_results(request_id, begin_at)

    get_transformation_results.assert_called_with(
        url=expected_url,
        headers={},
        params={"begin_at": begin_at.isoformat()},
    )
Loading
Loading