Skip to content

add supported server resources checks #592

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: remove-s3-bucket-polling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions servicex/query_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,23 +560,32 @@ async def get_signed_url(
if progress:
progress.advance(task_id=download_progress, task_type="Download")

transformation_results_enabled = (
"transformationresults" in await self.servicex.get_resources()
)

while True:
if not cached_record:
await asyncio.sleep(self.minio_polling_interval)
if self.minio:
# if self.minio exists, self.current_status will too
if self.current_status.files_completed > len(files_seen):
new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
files = await self.servicex.get_transformation_results(
self.current_status.request_id, begin_at
)
begin_at = new_begin_at
if transformation_results_enabled:
new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
files = await self.servicex.get_transformation_results(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason that get_transformation_results can't just return the same list of objects that self.minio.list_bucket() would? This would simplify the failover logic

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list_bucket returns a list of objects with a filename attribute, while the ServiceX adapter call is just turning JSON into dictionaries which have a 'file-path' key. I can change the key name to 'filename', but I don't think it's a good idea to change the dictionaries to objects with filename attributes given that pattern isn't reused anywhere else in the ServiceX adapter.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle you could of course change the minio adapter to list the files with a dictionary instead of a list of objects of course... I would just like the "list of files to be downloaded" to be a type that doesn't depend on the source of the information.

Note also my comment on the backend PR - I'm concerned that this thing may need a new column in the database anyway, and whether you want to keep the same name may be up for discussion.

self.current_status.request_id, begin_at
)
begin_at = new_begin_at
else:
files = await self.minio.list_bucket()

for file in files:
if "file-path" not in file:
continue

file_path = file["file-path"].replace("/", ":")
if transformation_results_enabled:
if "file-path" not in file:
continue
file_path = file.get("file-path", "").replace("/", ":")
else:
file_path = file.filename

if file_path not in files_seen:
if signed_urls_only:
Expand Down
125 changes: 124 additions & 1 deletion servicex/servicex_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import os
import time
import datetime
from typing import Optional, Dict, List
import asyncio
from typing import Optional, Dict, List, Any, TypeVar, Callable, cast
from functools import wraps

from aiohttp import ClientSession
import httpx
Expand All @@ -44,6 +46,85 @@

from servicex.models import TransformRequest, TransformStatus, CachedDataset

T = TypeVar("T")


def requires_resource(
resource_name: str,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""
Decorator to check if a specific API resource is available on the server.

Args:
resource_name: The name of the resource that needs to be available

Returns:
A decorator function that wraps around class methods

Raises:
ResourceNotAvailableError: If the required resource is not available on the server
"""

def decorator(func: Callable[..., T]) -> Callable[..., T]:
# Determine if function is async at decoration time (not runtime)
is_async = asyncio.iscoroutinefunction(func)
func_name = func.__name__

# Class-level cache for sync method resources
sync_cache_key = f"_sync_resources_for_{resource_name}"

if is_async:

@wraps(func)
async def async_wrapper(self, *args: Any, **kwargs: Any) -> T:
# Get resources and check availability in one operation
if resource_name not in await self.get_resources():
raise ResourceNotAvailableError(
f"Resource '{resource_name}' required for '{func_name}' is unavailable"
)
return await func(self, *args, **kwargs)

return cast(Callable[..., T], async_wrapper)
else:

@wraps(func)
def sync_wrapper(self, *args: Any, **kwargs: Any) -> T:
# Initialize class-level cache attributes if needed
cls = self.__class__
if not hasattr(cls, sync_cache_key):
setattr(cls, sync_cache_key, (None, 0)) # (resources, timestamp)

cache_ttl = getattr(self, "_resources_cache_ttl", 300)
cached_resources, timestamp = getattr(cls, sync_cache_key)
current_time = time.time()

# Check if cache needs refresh
if cached_resources is None or (current_time - timestamp) >= cache_ttl:
loop = asyncio.new_event_loop()
try:
cached_resources = loop.run_until_complete(self.get_resources())
setattr(cls, sync_cache_key, (cached_resources, current_time))
finally:
loop.close()

# Check resource availability
if resource_name not in cached_resources:
raise ResourceNotAvailableError(
f"Resource '{resource_name}' required for '{func_name}' is unavailable"
)

return func(self, *args, **kwargs)

return cast(Callable[..., T], sync_wrapper)

return decorator


class ResourceNotAvailableError(Exception):
    """Raised when the ServiceX server does not advertise a resource
    that a client call depends on."""


# NOTE(review): this inherits from BaseException rather than Exception, so a
# generic ``except Exception`` handler will NOT catch it — confirm that is
# intentional (callers may rely on authorization failures escaping broad
# handlers) before changing the base class.
class AuthorizationError(BaseException):
    pass
Expand All @@ -64,6 +145,47 @@ def __init__(self, url: str, refresh_token: Optional[str] = None):
self.refresh_token = refresh_token
self.token = None

self._available_resources: Optional[Dict[str, Any]] = None
self._resources_last_updated: Optional[float] = None
self._resources_cache_ttl = 60 * 5

async def get_resources(self) -> Dict[str, Any]:
    """
    Return the server's advertised resources, refreshing at most once
    per cache TTL (5 minutes) to avoid excessive API calls.

    Returns:
        A dictionary of available resources with their properties

    Raises:
        AuthorizationError: if the server answers HTTP 403.
        RuntimeError: for any other non-200 response.
    """
    now = time.time()

    # Serve from the per-instance cache while it is still fresh.
    cached = self._available_resources
    stamp = self._resources_last_updated
    cache_is_fresh = (
        cached is not None
        and stamp is not None
        and now - stamp < self._resources_cache_ttl
    )
    if cache_is_fresh:
        return cached

    # Cache miss or expired: hit the server's resources endpoint.
    headers = await self._get_authorization()
    async with ClientSession() as session:
        async with session.get(
            url=f"{self.url}/servicex/resources", headers=headers
        ) as r:
            if r.status == 403:
                raise AuthorizationError(
                    f"Not authorized to access serviceX at {self.url}"
                )
            if r.status != 200:
                msg = await _extract_message(r)
                raise RuntimeError(f"Failed to get resources: {r.status} - {msg}")

            self._available_resources = await r.json()
            self._resources_last_updated = now

    return self._available_resources

async def _get_token(self):
url = f"{self.url}/token/refresh"
headers = {"Authorization": f"Bearer {self.refresh_token}"}
Expand Down Expand Up @@ -229,6 +351,7 @@ async def delete_transform(self, transform_id=None):
f"Failed to delete transform {transform_id} - {msg}"
)

@requires_resource("transformationresults")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle, in your code, we should never fail this check, right?

async def get_transformation_results(
self, request_id: str, begin_at: datetime.datetime
):
Expand Down
8 changes: 8 additions & 0 deletions tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ async def test_download_files(python_dataset):

minio_mock.download_file.return_value = Path("/path/to/downloaded_file")
minio_mock.get_signed_url.return_value = Path("http://example.com/signed_url")
minio_mock.list_bucket.return_value = [
Mock(filename="file1.txt"),
Mock(filename="file2.txt"),
]

progress_mock = Mock()
python_dataset.minio_polling_interval = 0
Expand All @@ -161,6 +165,10 @@ async def test_download_files_with_signed_urls(python_dataset):
python_dataset.configuration = config
minio_mock.download_file.return_value = "/path/to/downloaded_file"
minio_mock.get_signed_url.return_value = "http://example.com/signed_url"
minio_mock.list_bucket.return_value = [
Mock(filename="file1.txt"),
Mock(filename="file2.txt"),
]
progress_mock = Mock()

python_dataset.servicex = AsyncMock()
Expand Down
12 changes: 12 additions & 0 deletions tests/test_servicex_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ async def test_submit(mocker):
]

mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1, file2]])
mock_minio.download_file = AsyncMock(
side_effect=lambda a, _, shorten_filename: PurePath(a)
)
Expand Down Expand Up @@ -278,6 +279,7 @@ async def test_submit_partial_success(mocker):
]

mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
mock_minio.download_file = AsyncMock(
side_effect=lambda a, _, shorten_filename: PurePath(a)
)
Expand Down Expand Up @@ -323,6 +325,7 @@ async def test_use_of_cache(mocker):
transform_status3,
]
mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(return_value=[file1, file2])
mock_minio.download_file = AsyncMock(
side_effect=lambda a, _, shorten_filename: PurePath(a)
)
Expand Down Expand Up @@ -357,6 +360,7 @@ async def test_use_of_cache(mocker):
# second round, should hit the cache (and not call the sx_adapter, minio, or update_record)
with ExpandableProgress(display_progress=False) as progress:
servicex2 = AsyncMock()
mock_minio.list_bucket.reset_mock()
servicex.get_transformation_results.reset_mock()
mock_minio.get_signed_url.reset_mock()
datasource2 = Query(
Expand All @@ -375,6 +379,7 @@ async def test_use_of_cache(mocker):
signed_urls_only=True, expandable_progress=progress
)
servicex2.assert_not_awaited()
mock_minio.list_bucket.assert_not_awaited()
servicex.get_transformation_results.assert_not_awaited()
mock_minio.get_signed_url.assert_not_awaited()
upd.assert_not_called()
Expand All @@ -399,6 +404,7 @@ async def test_use_of_cache(mocker):
signed_urls_only=False, expandable_progress=progress
)
servicex.assert_not_awaited()
mock_minio.list_bucket.assert_not_awaited()
servicex.get_transformation_results.assert_not_awaited()
mock_minio.download_file.assert_not_awaited()
upd.assert_called_once()
Expand All @@ -417,6 +423,7 @@ async def test_submit_cancel(mocker):
]

mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
mock_minio.download_file = AsyncMock(
side_effect=lambda a, _, shorten_filename: PurePath(a)
)
Expand Down Expand Up @@ -457,6 +464,7 @@ async def test_submit_fatal(mocker):
]

mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
mock_minio.download_file = AsyncMock(
side_effect=lambda a, _, shorten_filename: PurePath(a)
)
Expand Down Expand Up @@ -501,6 +509,7 @@ async def test_submit_generic(mocker, codegen_list):
]

mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
mock_minio.download_file = AsyncMock()

mock_cache = mocker.MagicMock(QueryCache)
Expand Down Expand Up @@ -549,6 +558,7 @@ async def test_submit_cancelled(mocker, codegen_list):
sx.get_transform_status.side_effect = [transform_status4]

mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
mock_minio.download_file = AsyncMock()

mock_cache = mocker.MagicMock(QueryCache)
Expand Down Expand Up @@ -623,6 +633,7 @@ async def test_use_of_ignore_cache(mocker, servicex):
)
# Prepare Minio
mock_minio = AsyncMock()
mock_minio.list_bucket = AsyncMock(side_effect=[[file1], [file1]])
mock_minio.get_signed_url = AsyncMock(side_effect=["http://file1", "http://file2"])
mocker.patch("servicex.minio_adapter.MinioAdapter", return_value=mock_minio)
did = FileListDataset("/foo/bar/baz.root")
Expand Down Expand Up @@ -698,6 +709,7 @@ async def test_use_of_ignore_cache(mocker, servicex):
res = await datasource_without_ignore_cache.submit_and_download(
signed_urls_only=True, expandable_progress=progress
) # noqa
mock_minio.list_bucket.assert_not_awaited()
servicex.get_transformation_results.assert_not_awaited()
mock_minio.download_file.assert_not_awaited()
assert len(res.signed_url_list) == 2
Expand Down
Loading