Skip to content

Commit 90b6173

Browse files
committed
remove s3 bucket polling when waiting for transformation results
1 parent c548ed2 commit 90b6173

File tree

2 files changed

+33
-7
lines changed

2 files changed

+33
-7
lines changed

servicex/query_core.py

Lines changed: 11 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -522,6 +522,7 @@ async def download_files(
522522
Task to monitor the list of files in the transform output's bucket. Any new files
523523
will be downloaded.
524524
"""
525+
525526
files_seen = set()
526527
result_uris = []
527528
download_tasks = []
@@ -557,15 +558,19 @@ async def get_signed_url(
557558
if self.minio:
558559
# if self.minio exists, self.current_status will too
559560
if self.current_status.files_completed > len(files_seen):
560-
files = await self.minio.list_bucket()
561+
files = await self.servicex.get_transformation_results(self.current_status.request_id)
562+
561563
for file in files:
562-
if file.filename not in files_seen:
564+
if 'file-path' not in file:
565+
continue
566+
567+
file_path = file['file-path'].replace('/', ':')
568+
if file_path not in files_seen:
563569
if signed_urls_only:
564570
download_tasks.append(
565571
loop.create_task(
566572
get_signed_url(
567-
self.minio,
568-
file.filename,
573+
file_path,
569574
progress,
570575
download_progress,
571576
)
@@ -576,14 +581,14 @@ async def get_signed_url(
576581
loop.create_task(
577582
download_file(
578583
self.minio,
579-
file.filename,
584+
file_path,
580585
progress,
581586
download_progress,
582587
shorten_filename=self.configuration.shortened_downloaded_filename, # NOQA: E501
583588
)
584589
)
585590
) # NOQA 501
586-
files_seen.add(file.filename)
591+
files_seen.add(file_path)
587592

588593
# Once the transform is complete and all files are seen we can stop polling.
589594
# Also, if we are just downloading or signing urls for a previous transform

servicex/servicex_adapter.py

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -228,14 +228,35 @@ async def delete_transform(self, transform_id=None):
228228
f"Failed to delete transform {transform_id} - {msg}"
229229
)
230230

231+
async def get_transformation_results(self, request_id: str):
232+
headers = await self._get_authorization()
233+
url = self.url + f'/servicex/internal/transformation/{request_id}/results'
234+
235+
async with ClientSession() as session:
236+
async with session.get(headers=headers, url=url) as r:
237+
if r.status == 403:
238+
raise AuthorizationError(
239+
f"Not authorized to access serviceX at {self.url}"
240+
)
241+
242+
if r.status == 404:
243+
raise ValueError(f"Request {request_id} not found")
244+
245+
if r.status != 200:
246+
msg = await _extract_message(r)
247+
raise RuntimeError(
248+
f"Failed with message: {msg}"
249+
)
250+
251+
return (await r.json())['results']
252+
231253
async def cancel_transform(self, transform_id=None):
232254
headers = await self._get_authorization()
233255
path_template = f"/servicex/transformation/{transform_id}/cancel"
234256
url = self.url + path_template.format(transform_id=transform_id)
235257

236258
async with ClientSession() as session:
237259
async with session.get(headers=headers, url=url) as r:
238-
239260
if r.status == 403:
240261
raise AuthorizationError(
241262
f"Not authorized to access serviceX at {self.url}"

0 commit comments

Comments
 (0)