Commit 6b24b5a

Split up get_files

- return minio info only
- Tests still pass. Working on #132

1 parent 518f651

File tree

1 file changed (+52, -7)


servicex/servicex.py

Lines changed: 52 additions & 7 deletions
@@ -166,6 +166,14 @@ async def get_data_awkward_async(self, selection_query: str):
         return self._converter.combine_awkward(await self._data_return(
             selection_query, lambda f: self._converter.convert_to_awkward(f)))
 
+    # async def get_data_minio_async(self, selection_query: str):
+    #     '''Async iterator return of minio buckets/files for a query. The results are made
+    #     available as soon as they are returned from ServiceX.
+
+    #     Args:
+    #         selection_query (str): The query string
+    #     '''
+
     async def _file_return(self, selection_query: str, data_format: str):
         '''
         Given a query, return the list of files, in a unique order, that hold
@@ -378,15 +386,12 @@ async def _get_files_from_servicex(self, request_id: str,
         good = True
         try:
 
-            # Setup the status sequence from servicex
-            stream_status = transform_status_stream(self._servicex_adaptor, client, request_id)
-            stream_notified = stream_status_updates(stream_status, notifier)
-            stream_watched = trap_servicex_failures(stream_notified)
-            stream_unique = stream_unique_updates_only(stream_watched)
+            # Get the stream of minio bucket new files.
+            stream_new_object = self._get_minio_bucket_files_from_servicex(
+                request_id, client, minio_adaptor, notifier
+            )
 
             # Next, download the files as they are found (and return them):
-            stream_new_object = find_new_bucket_files(minio_adaptor, request_id,
-                                                      stream_unique)
             stream_downloaded = self._download_a_file(stream_new_object, request_id,
                                                       minio_adaptor, notifier)
 
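The hunk above is the consumer half of the split: _get_files_from_servicex no longer assembles the status/minio pipeline itself, it simply iterates whatever the new generator yields and hands each object to the download step. Below is a minimal, self-contained sketch of that pattern, not the ServiceX source; the names and the assumption that the stream yields (bucket, filename) pairs are illustrative only.

import asyncio
from typing import AsyncIterator, Tuple


async def stream_object_info(request_id: str) -> AsyncIterator[Tuple[str, str]]:
    # Stand-in for _get_minio_bucket_files_from_servicex: yield
    # (bucket, filename) pairs as they appear for the request.
    for i in range(3):
        await asyncio.sleep(0)  # pretend we polled the transform status here
        yield f'bucket-{request_id}', f'file_{i}.root'


async def download_all(request_id: str) -> None:
    # Stand-in for the download half of _get_files_from_servicex:
    # consume the stream and handle each object as it arrives.
    async for bucket, name in stream_object_info(request_id):
        print(f'would download {name} from {bucket}')


asyncio.run(download_all('1234'))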

@@ -407,6 +412,46 @@ async def _get_files_from_servicex(self, request_id: str,
             self._log.write_query_log(request_id, notifier.total, notifier.failed,
                                       run_time, good, self._cache.path)
 
+    async def _get_minio_bucket_files_from_servicex(self, request_id: str,
+                                                    client: aiohttp.ClientSession,
+                                                    minio_adaptor: MinioAdaptor,
+                                                    notifier: _status_update_wrapper):
+        '''Create an async stream of `minio` bucket/filenames from a request id.
+
+        Args:
+            request_id (str): The request id that we should be polling for updates.
+            client (aiohttp.ClientSession): The client connection to make API queries on
+            minio_adaptor (MinioAdaptor): The minio adaptor we can use to connect to the minio
+                                          bucket for new items.
+            notifier (_status_update_wrapper): Allows us to send updates of progress
+                                               back to the user
+
+        Yields:
+            The minio bucket/filename information for each new file, as soon as it is available.
+        '''
+        start_time = time.monotonic()
+        try:
+
+            # Setup the status sequence from servicex
+            stream_status = transform_status_stream(self._servicex_adaptor, client, request_id)
+            stream_notified = stream_status_updates(stream_status, notifier)
+            stream_watched = trap_servicex_failures(stream_notified)
+            stream_unique = stream_unique_updates_only(stream_watched)
+
+            # Next, watch for new bucket files as the transform reports progress:
+            stream_new_object = find_new_bucket_files(minio_adaptor, request_id,
+                                                      stream_unique)
+
+            # Return the minio information.
+            async for info in stream_new_object:
+                yield info
+
+        finally:
+            end_time = time.monotonic()
+            run_time = timedelta(seconds=end_time - start_time)
+            logging.getLogger(__name__).info(f'Running servicex query for '
+                                             f'{request_id} took {run_time} (no files downloaded)')
+
     def _build_json_query(self, selection_query: str, data_type: str) -> Dict[str, str]:
         '''
         Returns a list of locally written files for a given selection query.
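A detail of the new generator worth noting: the timing and the '(no files downloaded)' log message sit in a finally: block, so they run whether the stream is drained to the end, the consumer stops early, or a failure propagates from the status stream. The sketch below is not the ServiceX code; it is a minimal illustration of that shape, using contextlib.aclosing (Python 3.10+) so that an early break still closes the generator and runs its finally block.

import asyncio
import logging
import time
from contextlib import aclosing
from datetime import timedelta

logging.basicConfig(level=logging.INFO)


async def timed_stream():
    # Same shape as _get_minio_bucket_files_from_servicex: yield items and,
    # no matter how the stream ends, log how long it was open.
    start_time = time.monotonic()
    try:
        for i in range(100):
            await asyncio.sleep(0)
            yield i
    finally:
        run_time = timedelta(seconds=time.monotonic() - start_time)
        logging.getLogger(__name__).info(f'stream was open for {run_time}')


async def main():
    async with aclosing(timed_stream()) as stream:
        async for item in stream:
            if item == 2:
                break  # aclosing() closes the generator here, running its finally block


asyncio.run(main())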
