-
Notifications
You must be signed in to change notification settings - Fork 13
add supported server resources checks #592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
MattShirley
wants to merge
5
commits into
remove-s3-bucket-polling
Choose a base branch
from
add-server-resources
base: remove-s3-bucket-polling
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
a11eb98
add supported server resources checks
MattShirley 9db76ad
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] 4ff0b34
flake8 compliance
MattShirley 5eaaad4
fix breaking tests
MattShirley db04cef
resolve breaking tests
MattShirley File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,9 @@ | |
import os | ||
import time | ||
import datetime | ||
from typing import Optional, Dict, List | ||
import asyncio | ||
from typing import Optional, Dict, List, Any, TypeVar, Callable, cast | ||
from functools import wraps | ||
|
||
from aiohttp import ClientSession | ||
import httpx | ||
|
@@ -44,6 +46,85 @@ | |
|
||
from servicex.models import TransformRequest, TransformStatus, CachedDataset | ||
|
||
T = TypeVar("T") | ||
|
||
|
||
def requires_resource( | ||
resource_name: str, | ||
) -> Callable[[Callable[..., T]], Callable[..., T]]: | ||
""" | ||
Decorator to check if a specific API resource is available on the server. | ||
|
||
Args: | ||
resource_name: The name of the resource that needs to be available | ||
|
||
Returns: | ||
A decorator function that wraps around class methods | ||
|
||
Raises: | ||
ResourceNotAvailableError: If the required resource is not available on the server | ||
""" | ||
|
||
def decorator(func: Callable[..., T]) -> Callable[..., T]: | ||
# Determine if function is async at decoration time (not runtime) | ||
is_async = asyncio.iscoroutinefunction(func) | ||
func_name = func.__name__ | ||
|
||
# Class-level cache for sync method resources | ||
sync_cache_key = f"_sync_resources_for_{resource_name}" | ||
|
||
if is_async: | ||
|
||
@wraps(func) | ||
async def async_wrapper(self, *args: Any, **kwargs: Any) -> T: | ||
# Get resources and check availability in one operation | ||
if resource_name not in await self.get_resources(): | ||
raise ResourceNotAvailableError( | ||
f"Resource '{resource_name}' required for '{func_name}' is unavailable" | ||
) | ||
return await func(self, *args, **kwargs) | ||
|
||
return cast(Callable[..., T], async_wrapper) | ||
else: | ||
|
||
@wraps(func) | ||
def sync_wrapper(self, *args: Any, **kwargs: Any) -> T: | ||
# Initialize class-level cache attributes if needed | ||
cls = self.__class__ | ||
if not hasattr(cls, sync_cache_key): | ||
setattr(cls, sync_cache_key, (None, 0)) # (resources, timestamp) | ||
|
||
cache_ttl = getattr(self, "_resources_cache_ttl", 300) | ||
cached_resources, timestamp = getattr(cls, sync_cache_key) | ||
current_time = time.time() | ||
|
||
# Check if cache needs refresh | ||
if cached_resources is None or (current_time - timestamp) >= cache_ttl: | ||
loop = asyncio.new_event_loop() | ||
try: | ||
cached_resources = loop.run_until_complete(self.get_resources()) | ||
setattr(cls, sync_cache_key, (cached_resources, current_time)) | ||
finally: | ||
loop.close() | ||
|
||
# Check resource availability | ||
if resource_name not in cached_resources: | ||
raise ResourceNotAvailableError( | ||
f"Resource '{resource_name}' required for '{func_name}' is unavailable" | ||
) | ||
|
||
return func(self, *args, **kwargs) | ||
|
||
return cast(Callable[..., T], sync_wrapper) | ||
|
||
return decorator | ||
|
||
|
||
class ResourceNotAvailableError(Exception): | ||
"""Exception raised when a required resource is not available on the server.""" | ||
|
||
pass | ||
|
||
|
||
class AuthorizationError(BaseException): | ||
pass | ||
|
@@ -64,6 +145,47 @@ def __init__(self, url: str, refresh_token: Optional[str] = None): | |
self.refresh_token = refresh_token | ||
self.token = None | ||
|
||
self._available_resources: Optional[Dict[str, Any]] = None | ||
self._resources_last_updated: Optional[float] = None | ||
self._resources_cache_ttl = 60 * 5 | ||
|
||
async def get_resources(self) -> Dict[str, Any]: | ||
""" | ||
Fetches the list of available resources from the server. | ||
Caches the result for 5 minutes to avoid excessive API calls. | ||
|
||
Returns: | ||
A dictionary of available resources with their properties | ||
""" | ||
current_time = time.time() | ||
|
||
# Return cached resources if they exist and are not expired | ||
if ( | ||
self._available_resources is not None | ||
and self._resources_last_updated is not None | ||
and current_time - self._resources_last_updated < self._resources_cache_ttl | ||
): | ||
return self._available_resources | ||
|
||
# Fetch resources from server | ||
headers = await self._get_authorization() | ||
async with ClientSession() as session: | ||
async with session.get( | ||
headers=headers, url=f"{self.url}/servicex/resources" | ||
) as r: | ||
if r.status == 403: | ||
raise AuthorizationError( | ||
f"Not authorized to access serviceX at {self.url}" | ||
) | ||
elif r.status != 200: | ||
msg = await _extract_message(r) | ||
raise RuntimeError(f"Failed to get resources: {r.status} - {msg}") | ||
|
||
self._available_resources = await r.json() | ||
self._resources_last_updated = current_time | ||
|
||
return self._available_resources | ||
|
||
async def _get_token(self): | ||
url = f"{self.url}/token/refresh" | ||
headers = {"Authorization": f"Bearer {self.refresh_token}"} | ||
|
@@ -229,6 +351,7 @@ async def delete_transform(self, transform_id=None): | |
f"Failed to delete transform {transform_id} - {msg}" | ||
) | ||
|
||
@requires_resource("transformationresults") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In principle, in your code, we should never fail this check, right? |
||
async def get_transformation_results( | ||
self, request_id: str, begin_at: datetime.datetime | ||
): | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason that
get_transformation_results
can't just return the same list of objects thatself.minio.list_bucket()
would? This would simplify the failover logicThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
list_bucket returns a list of objects with a
filename
attribute, while the ServiceX adapter call is just turning JSON into dictionaries which have a 'file-path' key. I can change the key name to 'filename', but I don't think it's a good idea to change the dictionaries to objects with filename attributes given that pattern isn't reused anywhere else in the ServiceX adapter.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In principle you could of course change the minio adapter to list the files with a dictionary instead of a list of objects of course... I would just like the "list of files to be downloaded" to be a type that doesn't depend on the source of the information.
Note also my comment on the backend PR - I'm concerned that this thing may need a new column in the database anyway, and whether you want to keep the same name may be up for discussion.