-
Notifications
You must be signed in to change notification settings - Fork 13
add supported server resources checks #592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
90b6173
remove s3 bucket polling when waiting for transformation results
MattShirley ee7b02c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 5b04365
add begin_at, update tests
MattShirley 85b5346
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] b80d72d
use python3.9 compliant method to get UTC time
MattShirley ccfe38c
Merge branch 'remove-s3-bucket-polling' of https://github.com/ssl-hep…
MattShirley 36cb4aa
fix breaking tests
MattShirley a11eb98
add supported server resources checks
MattShirley 9db76ad
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 4ff0b34
flake8 compliance
MattShirley 5eaaad4
fix breaking tests
MattShirley db04cef
resolve breaking tests
MattShirley File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,10 @@ | |
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
import os | ||
import time | ||
from typing import Optional, Dict, List | ||
import datetime | ||
import asyncio | ||
from typing import Optional, Dict, List, Any, TypeVar, Callable, cast | ||
from functools import wraps | ||
|
||
from aiohttp import ClientSession | ||
import httpx | ||
|
@@ -43,6 +46,85 @@ | |
|
||
from servicex.models import TransformRequest, TransformStatus, CachedDataset | ||
|
||
T = TypeVar("T")


def requires_resource(
    resource_name: str,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator to check if a specific API resource is available on the server.

    The object owning the decorated method must provide an async
    ``get_resources()`` method whose result supports ``in`` lookups by
    resource name. Both coroutine methods and plain methods can be decorated;
    the kind is detected once, when the decorator is applied.

    For plain (sync) methods the resource listing is fetched by running
    ``get_resources()`` on a temporary event loop and is then cached on the
    *instance* for ``self._resources_cache_ttl`` seconds (300 if the attribute
    is absent). The cache is deliberately per instance: a class-level cache
    would let one adapter reuse the answer of another adapter that talks to a
    different server.

    Args:
        resource_name: The name of the resource that needs to be available

    Returns:
        A decorator function that wraps around class methods

    Raises:
        ResourceNotAvailableError: If the required resource is not available on the server
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Determine if function is async at decoration time (not runtime)
        is_async = asyncio.iscoroutinefunction(func)
        func_name = func.__name__

        # Name of the instance attribute holding the sync-path cache as a
        # (resources, timestamp) tuple.
        sync_cache_key = f"_sync_resources_for_{resource_name}"

        if is_async:

            @wraps(func)
            async def async_wrapper(self, *args: Any, **kwargs: Any) -> T:
                # Get resources and check availability in one operation
                if resource_name not in await self.get_resources():
                    raise ResourceNotAvailableError(
                        f"Resource '{resource_name}' required for '{func_name}' is unavailable"
                    )
                return await func(self, *args, **kwargs)

            return cast(Callable[..., T], async_wrapper)

        @wraps(func)
        def sync_wrapper(self, *args: Any, **kwargs: Any) -> T:
            cache_ttl = getattr(self, "_resources_cache_ttl", 300)
            # Read this instance's own cache; (None, 0) forces a first fetch.
            cached_resources, timestamp = getattr(self, sync_cache_key, (None, 0))
            current_time = time.time()

            # Check if cache needs refresh
            if cached_resources is None or (current_time - timestamp) >= cache_ttl:
                # A private loop is used because this wrapper is synchronous.
                # NOTE(review): run_until_complete cannot be used from a thread
                # that is already running an event loop - confirm callers.
                loop = asyncio.new_event_loop()
                try:
                    cached_resources = loop.run_until_complete(self.get_resources())
                    setattr(self, sync_cache_key, (cached_resources, current_time))
                finally:
                    loop.close()

            # Check resource availability
            if resource_name not in cached_resources:
                raise ResourceNotAvailableError(
                    f"Resource '{resource_name}' required for '{func_name}' is unavailable"
                )

            return func(self, *args, **kwargs)

        return cast(Callable[..., T], sync_wrapper)

    return decorator
|
||
|
||
class ResourceNotAvailableError(Exception):
    """Signals that the server does not offer a resource that a call depends on."""
|
||
|
||
class AuthorizationError(BaseException):
    """Raised when the ServiceX server refuses a request as not authorized."""

    # NOTE(review): this derives from BaseException, so ``except Exception``
    # handlers will not catch it - confirm that this is intentional.
|
@@ -63,6 +145,47 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): | |
self.refresh_token = refresh_token | ||
self.token = None | ||
|
||
self._available_resources: Optional[Dict[str, Any]] = None | ||
self._resources_last_updated: Optional[float] = None | ||
self._resources_cache_ttl = 60 * 5 | ||
|
||
async def get_resources(self) -> Dict[str, Any]:
    """
    Return the resources advertised by the server's ``/servicex/resources`` endpoint.

    A previously fetched answer is reused while it is younger than
    ``self._resources_cache_ttl`` seconds. Otherwise the endpoint is queried
    again and the cached answer and its timestamp are replaced.

    Returns:
        The decoded JSON body of the resources endpoint

    Raises:
        AuthorizationError: If the server answers with HTTP 403
        RuntimeError: If the server answers with any other non-200 status
    """
    now = time.time()

    # Serve from the cache while it is populated and still within its TTL
    cache_is_populated = (
        self._available_resources is not None
        and self._resources_last_updated is not None
    )
    if (
        cache_is_populated
        and now - self._resources_last_updated < self._resources_cache_ttl
    ):
        return self._available_resources

    # Cache is missing or stale: ask the server
    auth_headers = await self._get_authorization()
    endpoint = f"{self.url}/servicex/resources"
    async with ClientSession() as session:
        async with session.get(headers=auth_headers, url=endpoint) as response:
            if response.status == 403:
                raise AuthorizationError(
                    f"Not authorized to access serviceX at {self.url}"
                )
            if response.status != 200:
                detail = await _extract_message(response)
                raise RuntimeError(
                    f"Failed to get resources: {response.status} - {detail}"
                )

            self._available_resources = await response.json()
            # The timestamp is the time taken before the request was sent
            self._resources_last_updated = now

    return self._available_resources
|
||
async def _get_token(self): | ||
url = f"{self.url}/token/refresh" | ||
headers = {"Authorization": f"Bearer {self.refresh_token}"} | ||
|
@@ -228,14 +351,41 @@ async def delete_transform(self, transform_id=None): | |
f"Failed to delete transform {transform_id} - {msg}" | ||
) | ||
|
||
@requires_resource("transformationresults") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In principle, in your code, we should never fail this check, right? |
||
async def get_transformation_results(
    self, request_id: str, begin_at: datetime.datetime
):
    """
    Fetch the results recorded for a transformation request.

    Args:
        request_id: Identifier of the transformation request
        begin_at: When truthy, sent to the server as the ``begin_at`` query
            parameter in ISO format; otherwise the parameter is omitted

    Returns:
        The ``"results"`` entry of the JSON response body, looked up with ``.get``

    Raises:
        AuthorizationError: If the server answers with HTTP 403
        ValueError: If the server answers with HTTP 404
        RuntimeError: If the server answers with any other non-200 status
    """
    auth_headers = await self._get_authorization()
    endpoint = f"{self.url}/servicex/transformation/{request_id}/results"

    # Only pass the time filter when the caller supplied one
    query = {"begin_at": begin_at.isoformat()} if begin_at else {}

    async with ClientSession() as session:
        async with session.get(
            headers=auth_headers, url=endpoint, params=query
        ) as response:
            status = response.status
            if status == 403:
                raise AuthorizationError(
                    f"Not authorized to access serviceX at {self.url}"
                )

            if status == 404:
                raise ValueError(f"Request {request_id} not found")

            if status != 200:
                detail = await _extract_message(response)
                raise RuntimeError(f"Failed with message: {detail}")

            payload = await response.json()
            return payload.get("results")
|
||
async def cancel_transform(self, transform_id=None): | ||
headers = await self._get_authorization() | ||
path_template = f"/servicex/transformation/{transform_id}/cancel" | ||
url = self.url + path_template.format(transform_id=transform_id) | ||
|
||
async with ClientSession() as session: | ||
async with session.get(headers=headers, url=url) as r: | ||
|
||
if r.status == 403: | ||
raise AuthorizationError( | ||
f"Not authorized to access serviceX at {self.url}" | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason that
get_transformation_results
can't just return the same list of objects that self.minio.list_bucket()
would? This would simplify the failover logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
list_bucket returns a list of objects with a
filename
attribute, while the ServiceX adapter call is just turning JSON into dictionaries which have a 'file-path' key. I can change the key name to 'filename', but I don't think it's a good idea to change the dictionaries to objects with filename attributes given that pattern isn't reused anywhere else in the ServiceX adapter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In principle you could, of course, change the minio adapter to list the files with a dictionary instead of a list of objects... I would just like the "list of files to be downloaded" to be a type that doesn't depend on the source of the information.
Note also my comment on the backend PR - I'm concerned that this thing may need a new column in the database anyway, and whether you want to keep the same name may be up for discussion.