Skip to content

remove s3 bucket polling when waiting for transformation results #587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 46 commits into from
Jun 25, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
90b6173
remove s3 bucket polling when waiting for transformation results
MattShirley May 6, 2025
ee7b02c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 6, 2025
5b04365
add begin_at, update tests
MattShirley May 14, 2025
85b5346
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 14, 2025
b80d72d
use python3.9 compliant method to get UTC time
MattShirley May 14, 2025
ccfe38c
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 14, 2025
36cb4aa
fix breaking tests
MattShirley May 14, 2025
eb9a937
add additional test coverage support
MattShirley May 20, 2025
bf805bb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 20, 2025
9492997
flake8 compliance
MattShirley May 20, 2025
90a0a6b
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 20, 2025
ab9d0cf
code coverage improvement
MattShirley May 20, 2025
3644ee1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 20, 2025
9b506c5
add ServiceXFile class and refactor tests to match previous
MattShirley May 22, 2025
932a7e9
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
6d18ff3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 22, 2025
04ae90c
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley May 22, 2025
c1da28f
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
ffd87ad
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley May 22, 2025
98392dd
resolve breaking tests
MattShirley May 22, 2025
8e952eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 22, 2025
232be85
flake8
MattShirley May 22, 2025
0f094c0
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley May 22, 2025
b79cb26
flake8
MattShirley May 22, 2025
c8ad74c
flake8
MattShirley May 22, 2025
489e05d
fix for python3.9
MattShirley May 22, 2025
4e01dee
remove extraneous print call
MattShirley May 22, 2025
ee0423f
simplify typing
MattShirley Jun 4, 2025
f5bf546
add feature branching
MattShirley Jun 11, 2025
5bbe40f
add testing
MattShirley Jun 11, 2025
201531a
remove duplicated test
MattShirley Jun 11, 2025
268592f
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley Jun 12, 2025
7db79e1
fix breaking test after merging master in
MattShirley Jun 12, 2025
6e20fe6
try to fix breaking test on server
MattShirley Jun 12, 2025
270bd8b
improve code coverage
MattShirley Jun 12, 2025
fc29f59
temporarily remove failing test in server (but not locally)
MattShirley Jun 12, 2025
bf49d24
attempt to fix breaking server test
MattShirley Jun 12, 2025
b49edc7
pull master and fix tests
MattShirley Jun 17, 2025
aa3f2fb
fix broken test
MattShirley Jun 17, 2025
9785fc6
update models
MattShirley Jun 20, 2025
72c7bca
update tests
MattShirley Jun 23, 2025
d218879
resolve merge conflicts
MattShirley Jun 23, 2025
b0eee6c
update pyproject
MattShirley Jun 24, 2025
925c358
get s3 file name directly from API
MattShirley Jun 24, 2025
7ddfc60
add warning notification and additional testing
MattShirley Jun 25, 2025
23a0f13
remove pyproject changes
MattShirley Jun 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,10 @@ dynamic_context = "test_function"

[tool.pytest.ini_options]
asyncio_mode = "auto"

[tool.hatch.envs.test]
features = ["test"]

[tool.hatch.envs.test.scripts]
test = "pytest {args}"
cov = "pytest --cov=servicex {args}"
2 changes: 1 addition & 1 deletion servicex/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ class ServiceXInfo(DocStringBaseModel):

app_version: str = Field(alias="app-version")
code_gen_image: dict[str, str] = Field(alias="code-gen-image")
capabilities: list[str]
capabilities: list[str] = Field(default_factory=list)


class DatasetFile(BaseModel):
Expand Down
10 changes: 9 additions & 1 deletion servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,14 @@ async def get_signed_url(
"poll_local_transformation_results"
in await self.servicex.get_servicex_capabilities()
)

if not use_local_polling:
logger.warning(
"ServiceX is using legacy S3 bucket polling. Future versions of the "
"ServiceX client will not support this method. Please update your "
"ServiceX server to the latest version."
)

while True:
if not cached_record:
await asyncio.sleep(self.minio_polling_interval)
Expand Down Expand Up @@ -597,7 +605,7 @@ async def get_signed_url(
)
else:
if use_local_polling:
expected_size = None
expected_size = file.total_bytes
else:
expected_size = file.size
download_tasks.append(
Expand Down
106 changes: 53 additions & 53 deletions servicex/servicex_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ class AuthorizationError(BaseException):
class ServiceXFile:
created_at: datetime.datetime
filename: str
total_bytes: int


async def _extract_message(r: ClientResponse):
async def _extract_message(r: Response):
try:
o = r.json()
error_message = o.get("message", str(r))
Expand Down Expand Up @@ -141,20 +142,20 @@ async def get_servicex_info(self) -> ServiceXInfo:
return self._servicex_info

headers = await self._get_authorization()
retry_options = ExponentialRetry(attempts=3, start_timeout=10)
async with RetryClient(retry_options=retry_options) as client:
async with client.get(url=f"{self.url}/servicex", headers=headers) as r:
if r.status == 401:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation "
f"submission: {r.status} - {error_message}"
)
servicex_info = await r.json()
retry_options = Retry(total=3, backoff_factor=10)
async with AsyncClient(transport=RetryTransport(retry=retry_options)) as client:
r = await client.get(url=f"{self.url}/servicex", headers=headers)
if r.status_code == 401:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code > 400:
error_message = await _extract_message(r)
raise RuntimeError(
"ServiceX WebAPI Error during transformation "
f"submission: {r.status_code} - {error_message}"
)
servicex_info = r.json()
self._servicex_info = ServiceXInfo(**servicex_info)
return self._servicex_info

Expand Down Expand Up @@ -288,50 +289,49 @@ async def get_transformation_results(
if later_than:
params["later_than"] = later_than.isoformat()

async with ClientSession() as session:
async with session.get(headers=headers, url=url, params=params) as r:
if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)

if r.status == 404:
raise ValueError(f"Request {request_id} not found")

if r.status != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed with message: {msg}")

data = await r.json()
response = list()
for result in data.get("results", []):
file = ServiceXFile(
filename=result["s3-object-name"],
created_at=datetime.datetime.fromisoformat(
result["created_at"]
).replace(tzinfo=datetime.timezone.utc),
)
response.append(file)
return response
async with AsyncClient() as session:
r = await session.get(headers=headers, url=url, params=params)
if r.status_code == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)

if r.status_code == 404:
raise ValueError(f"Request {request_id} not found")

if r.status_code != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed with message: {msg}")

data = r.json()
response = list()
for result in data.get("results", []):
file = ServiceXFile(
filename=result["s3-object-name"],
created_at=datetime.datetime.fromisoformat(
result["created_at"]
).replace(tzinfo=datetime.timezone.utc),
total_bytes=result["total-bytes"],
)
response.append(file)
return response

async def cancel_transform(self, transform_id=None):
headers = await self._get_authorization()
path_template = f"/servicex/transformation/{transform_id}/cancel"
url = self.url + path_template.format(transform_id=transform_id)

async with ClientSession() as session:
async with session.get(headers=headers, url=url) as r:
if r.status == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status == 404:
raise ValueError(f"Transform {transform_id} not found")
elif r.status != 200:
msg = await _extract_message(r)
raise RuntimeError(
f"Failed to cancel transform {transform_id} - {msg}"
)
async with AsyncClient() as session:
r = await session.get(headers=headers, url=url)
if r.status_code == 403:
raise AuthorizationError(
f"Not authorized to access serviceX at {self.url}"
)
elif r.status_code == 404:
raise ValueError(f"Transform {transform_id} not found")
elif r.status_code != 200:
msg = await _extract_message(r)
raise RuntimeError(f"Failed to cancel transform {transform_id} - {msg}")

async def submit_transform(self, transform_request: TransformRequest) -> str:
headers = await self._get_authorization()
Expand Down
4 changes: 4 additions & 0 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,12 @@ async def test_download_files(python_dataset):
ServiceXFile(
filename="file1.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
ServiceXFile(
filename="file2.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
]

Expand Down Expand Up @@ -182,10 +184,12 @@ async def test_download_files_with_signed_urls(python_dataset):
ServiceXFile(
filename="file1.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
ServiceXFile(
filename="file2.txt",
created_at=datetime.datetime.now(datetime.timezone.utc),
total_bytes=100,
),
]

Expand Down
Loading