Skip to content

Commit a11eb98

Browse files
committed
add supported server resources checks
1 parent 36cb4aa commit a11eb98

File tree

2 files changed

+131
-10
lines changed

2 files changed

+131
-10
lines changed

servicex/query_core.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -560,23 +560,30 @@ async def get_signed_url(
560560
if progress:
561561
progress.advance(task_id=download_progress, task_type="Download")
562562

563+
transformation_results_enabled = "transformationresults" in await self.servicex.get_resources()
564+
563565
while True:
564566
if not cached_record:
565567
await asyncio.sleep(self.minio_polling_interval)
566568
if self.minio:
567569
# if self.minio exists, self.current_status will too
568570
if self.current_status.files_completed > len(files_seen):
569-
new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
570-
files = await self.servicex.get_transformation_results(
571-
self.current_status.request_id, begin_at
572-
)
573-
begin_at = new_begin_at
571+
if transformation_results_enabled:
572+
new_begin_at = datetime.datetime.now(tz=datetime.timezone.utc)
573+
files = await self.servicex.get_transformation_results(
574+
self.current_status.request_id, begin_at
575+
)
576+
begin_at = new_begin_at
577+
else:
578+
files = await self.minio.list_bucket()
574579

575580
for file in files:
576-
if "file-path" not in file:
577-
continue
578-
579-
file_path = file["file-path"].replace("/", ":")
581+
if transformation_results_enabled:
582+
if "file-path" not in file:
583+
continue
584+
file_path = file.get("file-path", '').replace("/", ":")
585+
else:
586+
file_path = file.filename
580587

581588
if file_path not in files_seen:
582589
if signed_urls_only:

servicex/servicex_adapter.py

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import os
2929
import time
3030
import datetime
31-
from typing import Optional, Dict, List
31+
import asyncio
32+
from typing import Optional, Dict, List, Any, TypeVar, Callable, cast
33+
from functools import wraps
3234

3335
from aiohttp import ClientSession
3436
import httpx
@@ -44,6 +46,78 @@
4446

4547
from servicex.models import TransformRequest, TransformStatus, CachedDataset
4648

49+
T = TypeVar('T')
50+
51+
def requires_resource(resource_name: str) -> Callable[[Callable[..., T]], Callable[..., T]]:
52+
"""
53+
Decorator to check if a specific API resource is available on the server before executing the method.
54+
55+
Args:
56+
resource_name: The name of the resource that needs to be available
57+
58+
Returns:
59+
A decorator function that wraps around class methods
60+
61+
Raises:
62+
ResourceNotAvailableError: If the required resource is not available on the server
63+
"""
64+
65+
def decorator(func: Callable[..., T]) -> Callable[..., T]:
66+
# Determine if function is async at decoration time (not runtime)
67+
is_async = asyncio.iscoroutinefunction(func)
68+
func_name = func.__name__
69+
70+
# Class-level cache for sync method resources
71+
sync_cache_key = f'_sync_resources_for_{resource_name}'
72+
73+
if is_async:
74+
@wraps(func)
75+
async def async_wrapper(self, *args: Any, **kwargs: Any) -> T:
76+
# Get resources and check availability in one operation
77+
if resource_name not in await self.get_resources():
78+
raise ResourceNotAvailableError(
79+
f"Resource '{resource_name}' required for '{func_name}' is unavailable"
80+
)
81+
return await func(self, *args, **kwargs)
82+
83+
return cast(Callable[..., T], async_wrapper)
84+
else:
85+
@wraps(func)
86+
def sync_wrapper(self, *args: Any, **kwargs: Any) -> T:
87+
# Initialize class-level cache attributes if needed
88+
cls = self.__class__
89+
if not hasattr(cls, sync_cache_key):
90+
setattr(cls, sync_cache_key, (None, 0)) # (resources, timestamp)
91+
92+
cache_ttl = getattr(self, '_resources_cache_ttl', 300)
93+
cached_resources, timestamp = getattr(cls, sync_cache_key)
94+
current_time = time.time()
95+
96+
# Check if cache needs refresh
97+
if cached_resources is None or (current_time - timestamp) >= cache_ttl:
98+
loop = asyncio.new_event_loop()
99+
try:
100+
cached_resources = loop.run_until_complete(self.get_resources())
101+
setattr(cls, sync_cache_key, (cached_resources, current_time))
102+
finally:
103+
loop.close()
104+
105+
# Check resource availability
106+
if resource_name not in cached_resources:
107+
raise ResourceNotAvailableError(
108+
f"Resource '{resource_name}' required for '{func_name}' is unavailable"
109+
)
110+
111+
return func(self, *args, **kwargs)
112+
113+
return cast(Callable[..., T], sync_wrapper)
114+
115+
return decorator
116+
117+
118+
class ResourceNotAvailableError(Exception):
119+
"""Exception raised when a required resource is not available on the server."""
120+
pass
47121

48122
class AuthorizationError(BaseException):
49123
pass
@@ -64,6 +138,45 @@ def __init__(self, url: str, refresh_token: Optional[str] = None):
64138
self.refresh_token = refresh_token
65139
self.token = None
66140

141+
self._available_resources: Optional[Dict[str, Any]] = None
142+
self._resources_last_updated: Optional[float] = None
143+
self._resources_cache_ttl = 60*5
144+
145+
async def get_resources(self) -> Dict[str, Any]:
146+
"""
147+
Fetches the list of available resources from the server.
148+
Caches the result for 5 minutes to avoid excessive API calls.
149+
150+
Returns:
151+
A dictionary of available resources with their properties
152+
"""
153+
current_time = time.time()
154+
155+
# Return cached resources if they exist and are not expired
156+
if (self._available_resources is not None and
157+
self._resources_last_updated is not None and
158+
current_time - self._resources_last_updated < self._resources_cache_ttl):
159+
return self._available_resources
160+
161+
# Fetch resources from server
162+
headers = await self._get_authorization()
163+
async with ClientSession() as session:
164+
async with session.get(
165+
headers=headers, url=f"{self.url}/servicex/resources"
166+
) as r:
167+
if r.status == 403:
168+
raise AuthorizationError(
169+
f"Not authorized to access serviceX at {self.url}"
170+
)
171+
elif r.status != 200:
172+
msg = await _extract_message(r)
173+
raise RuntimeError(f"Failed to get resources: {r.status} - {msg}")
174+
175+
self._available_resources = await r.json()
176+
self._resources_last_updated = current_time
177+
178+
return self._available_resources
179+
67180
async def _get_token(self):
68181
url = f"{self.url}/token/refresh"
69182
headers = {"Authorization": f"Bearer {self.refresh_token}"}
@@ -229,6 +342,7 @@ async def delete_transform(self, transform_id=None):
229342
f"Failed to delete transform {transform_id} - {msg}"
230343
)
231344

345+
@requires_resource("transformationresults")
232346
async def get_transformation_results(
233347
self, request_id: str, begin_at: datetime.datetime
234348
):

0 commit comments

Comments
 (0)