-
Notifications
You must be signed in to change notification settings - Fork 13
remove s3 bucket polling when waiting for transformation results #587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
46 commits
Select commit
Hold shift + click to select a range
90b6173
remove s3 bucket polling when waiting for transformation results
MattShirley ee7b02c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 5b04365
add begin_at, update tests
MattShirley 85b5346
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] b80d72d
use python3.9 compliant method to get UTC time
MattShirley ccfe38c
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley 36cb4aa
fix breaking tests
MattShirley eb9a937
add additional test coverage support
MattShirley bf805bb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 9492997
flake8 compliance
MattShirley 90a0a6b
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley ab9d0cf
code coverage improvement
MattShirley 3644ee1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 9b506c5
add ServiceXFile class and refactor tests to match previous
MattShirley 932a7e9
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley 6d18ff3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 04ae90c
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley c1da28f
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley ffd87ad
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley 98392dd
resolve breaking tests
MattShirley 8e952eb
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 232be85
flake8
MattShirley 0f094c0
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley b79cb26
flake8
MattShirley c8ad74c
flake8
MattShirley 489e05d
fix for python3.9
MattShirley 4e01dee
remove extraneous print call
MattShirley ee0423f
simplify typing
MattShirley f5bf546
add feature branching
MattShirley 5bbe40f
add testing
MattShirley 201531a
remove duplicated test
MattShirley 268592f
Merge branch 'master' of https://github.com/ssl-hep/ServiceX_frontend…
MattShirley 7db79e1
fix breaking test after merging master in
MattShirley 6e20fe6
try to fix breaking test on server
MattShirley 270bd8b
improve code coverage
MattShirley fc29f59
temporarily remove failing test in server (but not locally)
MattShirley bf49d24
attempt to fix breaking server test
MattShirley b49edc7
pull master and fix tests
MattShirley aa3f2fb
fix broken test
MattShirley 9785fc6
update models
MattShirley 72c7bca
update tests
MattShirley d218879
resolve merge conflicts
MattShirley b0eee6c
update pyproject
MattShirley 925c358
get s3 file name directly from API
MattShirley 7ddfc60
add warning notification and additional testing
MattShirley 23a0f13
remove pyproject changes
MattShirley File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,9 @@ | |
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
import os | ||
import time | ||
import datetime | ||
from typing import Optional, Dict, List | ||
from dataclasses import dataclass | ||
|
||
import httpx | ||
from httpx import AsyncClient, Response | ||
|
@@ -41,13 +43,25 @@ | |
retry_if_not_exception_type, | ||
) | ||
|
||
from servicex.models import TransformRequest, TransformStatus, CachedDataset | ||
from servicex.models import ( | ||
TransformRequest, | ||
TransformStatus, | ||
CachedDataset, | ||
ServiceXInfo, | ||
) | ||
|
||
|
||
class AuthorizationError(BaseException): | ||
pass | ||
|
||
|
||
@dataclass | ||
class ServiceXFile: | ||
created_at: datetime.datetime | ||
filename: str | ||
total_bytes: int | ||
|
||
|
||
async def _extract_message(r: Response): | ||
try: | ||
o = r.json() | ||
|
@@ -63,6 +77,9 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): | |
self.refresh_token = refresh_token | ||
self.token = None | ||
|
||
# interact with _servicex_info via get_servicex_info | ||
self._servicex_info: Optional[ServiceXInfo] = None | ||
|
||
async def _get_token(self): | ||
url = f"{self.url}/token/refresh" | ||
headers = {"Authorization": f"Bearer {self.refresh_token}"} | ||
|
@@ -120,6 +137,31 @@ async def _get_authorization(self, force_reauth: bool = False) -> Dict[str, str] | |
await self._get_token() | ||
return {"Authorization": f"Bearer {self.token}"} | ||
|
||
async def get_servicex_info(self) -> ServiceXInfo: | ||
if self._servicex_info: | ||
return self._servicex_info | ||
ponyisi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
headers = await self._get_authorization() | ||
retry_options = Retry(total=3, backoff_factor=10) | ||
async with AsyncClient(transport=RetryTransport(retry=retry_options)) as client: | ||
r = await client.get(url=f"{self.url}/servicex", headers=headers) | ||
if r.status_code == 401: | ||
raise AuthorizationError( | ||
f"Not authorized to access serviceX at {self.url}" | ||
) | ||
elif r.status_code > 400: | ||
error_message = await _extract_message(r) | ||
raise RuntimeError( | ||
"ServiceX WebAPI Error during transformation " | ||
f"submission: {r.status_code} - {error_message}" | ||
) | ||
servicex_info = r.json() | ||
self._servicex_info = ServiceXInfo(**servicex_info) | ||
return self._servicex_info | ||
|
||
async def get_servicex_capabilities(self) -> List[str]: | ||
return (await self.get_servicex_info()).capabilities | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a test for this line? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added |
||
|
||
async def get_transforms(self) -> List[TransformStatus]: | ||
headers = await self._get_authorization() | ||
retry_options = Retry(total=3, backoff_factor=10) | ||
|
@@ -232,6 +274,48 @@ async def delete_transform(self, transform_id=None): | |
msg = await _extract_message(r) | ||
raise RuntimeError(f"Failed to delete transform {transform_id} - {msg}") | ||
|
||
async def get_transformation_results( | ||
self, request_id: str, later_than: Optional[datetime.datetime] = None | ||
): | ||
if ( | ||
"poll_local_transformation_results" | ||
not in await self.get_servicex_capabilities() | ||
): | ||
raise ValueError("ServiceX capabilities not found") | ||
|
||
headers = await self._get_authorization() | ||
url = self.url + f"/servicex/transformation/{request_id}/results" | ||
params = {} | ||
if later_than: | ||
params["later_than"] = later_than.isoformat() | ||
|
||
async with AsyncClient() as session: | ||
r = await session.get(headers=headers, url=url, params=params) | ||
if r.status_code == 403: | ||
raise AuthorizationError( | ||
f"Not authorized to access serviceX at {self.url}" | ||
) | ||
|
||
if r.status_code == 404: | ||
raise ValueError(f"Request {request_id} not found") | ||
|
||
if r.status_code != 200: | ||
msg = await _extract_message(r) | ||
raise RuntimeError(f"Failed with message: {msg}") | ||
|
||
data = r.json() | ||
response = list() | ||
for result in data.get("results", []): | ||
file = ServiceXFile( | ||
filename=result["s3-object-name"], | ||
created_at=datetime.datetime.fromisoformat( | ||
result["created_at"] | ||
).replace(tzinfo=datetime.timezone.utc), | ||
total_bytes=result["total-bytes"], | ||
) | ||
response.append(file) | ||
return response | ||
|
||
async def cancel_transform(self, transform_id=None): | ||
headers = await self._get_authorization() | ||
path_template = f"/servicex/transformation/{transform_id}/cancel" | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.