Skip to content

Commit 0f29820

Browse files
committed
Do not display the download bar if we are streaming URLs
1 parent 1c14234 commit 0f29820

File tree

5 files changed

+103
-21
lines changed

5 files changed

+103
-21
lines changed

scripts/test.ipynb

Whitespace-only changes.

servicex/servicex.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from dataclasses import dataclass
77
from datetime import timedelta
88
from pathlib import Path
9-
from typing import (Any, AsyncIterator, Awaitable, Callable, Dict, List,
9+
from typing import (Any, AsyncGenerator, AsyncIterator, Awaitable, Callable, Dict, List,
1010
Optional, Tuple, Union)
1111

1212
import aiohttp
@@ -287,7 +287,7 @@ async def convert_to_file(f: Path) -> Path:
287287

288288
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
289289
async def _stream_url_buckets(self, selection_query: str, data_format: str) \
290-
-> AsyncIterator[StreamInfoUrl]:
290+
-> AsyncGenerator[StreamInfoUrl, None]:
291291
'''Get a list of files back for a request
292292
293293
Args:
@@ -314,7 +314,7 @@ async def _stream_url_buckets(self, selection_query: str, data_format: str) \
314314
# Look up the cache, and then fetch an iterator going through the results
315315
# from either servicex or the cache, depending.
316316
try:
317-
notifier = self._create_notifier()
317+
notifier = self._create_notifier(False)
318318
minio_files = self._get_minio_bucket_files_from_servicex(request_id, client,
319319
minio_adaptor, notifier)
320320

@@ -405,7 +405,8 @@ async def _stream_return(self, selection_query: str,
405405

406406
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
407407
async def _stream_local_files(self, selection_query: str,
408-
data_format: str = 'root-file'):
408+
data_format: str = 'root-file') \
409+
-> AsyncGenerator[StreamInfoPath, None]:
409410
'''
410411
Given a query, return the data as a list of paths pointing to local files
411412
that contain the results of the query. This is an async generator, and files
@@ -425,7 +426,7 @@ async def _stream_local_files(self, selection_query: str,
425426
on the converter call.
426427
'''
427428
# Get a notifier to update anyone who wants to listen.
428-
notifier = self._create_notifier()
429+
notifier = self._create_notifier(True)
429430

430431
# Get all the files
431432
as_files = \

servicex/servicexabc.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ def __init__(self,
6868
status_callback_factory if status_callback_factory is not None \
6969
else _null_progress_feedback
7070

71-
def _create_notifier(self) -> _status_update_wrapper:
71+
def _create_notifier(self, downloading: bool) -> _status_update_wrapper:
7272
'Internal method to create an updater from the status call-back'
73-
return _status_update_wrapper(self._status_callback_factory(self._dataset))
73+
return _status_update_wrapper(self._status_callback_factory(self._dataset, downloading))
7474

7575
@abstractmethod
7676
async def get_data_rootfiles_async(self, selection_query: str) -> List[Path]:

servicex/utils.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,16 @@ def sanitize_filename(fname: str):
8787

8888

8989
StatusUpdateCallback = Callable[[Optional[int], int, int, int], None]
90-
# The sig of the call-back
90+
# The sig of the call-back. Arguments are:
91+
# 1. Total (may be None)
92+
# 2. Processed
93+
# 3. Downloaded
94+
# 4. Failed
9195

92-
StatusUpdateFactory = Callable[[str], StatusUpdateCallback]
96+
StatusUpdateFactory = Callable[[str, bool], StatusUpdateCallback]
9397
# Factory method that builds and returns the status callback
94-
# First argument is a string.
98+
# First argument is a string, second argument is True if a download progress
99+
# bar can be displayed as well.
95100

96101

97102
class _status_update_wrapper:
@@ -192,14 +197,20 @@ async def stream_unique_updates_only(stream: AsyncIterator[TransformTuple]):
192197
yield p
193198

194199

195-
def _run_default_wrapper(ds_name: str) -> StatusUpdateCallback:
196-
'''
197-
Create a feedback object for everyone to use to pass feedback to. Uses tqdm (default).
200+
def _run_default_wrapper(ds_name: str, downloading: bool) -> StatusUpdateCallback:
201+
'''Create a feedback object for everyone to use to pass feedback to. Uses tqdm (default).
202+
203+
Args:
204+
ds_name (str): The dataset name
205+
downloading (bool): True if we will be downloading the files, not just processing them.
206+
207+
Returns:
208+
StatusUpdateCallback: The updater callback
198209
'''
199-
return _default_wrapper_mgr(ds_name).update
210+
return _default_wrapper_mgr(ds_name, downloading).update
200211

201212

202-
def _null_progress_feedback(ds_name: str) -> None:
213+
def _null_progress_feedback(ds_name: str, downloading: bool) -> None:
203214
'''
204215
Internal routine to create a feedback object that does not
205216
give anyone feedback!
@@ -209,9 +220,10 @@ def _null_progress_feedback(ds_name: str) -> None:
209220

210221
class _default_wrapper_mgr:
211222
'Default progress bar'
212-
def __init__(self, sample_name: Optional[str] = None):
223+
def __init__(self, sample_name: Optional[str] = None, show_download_bar: bool = True):
213224
self._tqdm_p: Optional[tqdm] = None
214225
self._tqdm_d: Optional[tqdm] = None
226+
self._show_download_progress = show_download_bar
215227
self._sample_name = sample_name
216228

217229
def _init_tqdm(self):
@@ -221,9 +233,10 @@ def _init_tqdm(self):
221233
self._tqdm_p = tqdm(total=9e9, desc=self._sample_name, unit='file',
222234
leave=False, dynamic_ncols=True,
223235
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]')
224-
self._tqdm_d = tqdm(total=9e9, desc=" Downloaded", unit='file',
225-
leave=False, dynamic_ncols=True,
226-
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]')
236+
if self._show_download_progress:
237+
self._tqdm_d = tqdm(total=9e9, desc=" Downloaded", unit='file',
238+
leave=False, dynamic_ncols=True,
239+
bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]')
227240

228241
def _update_bar(self, bar: Optional[tqdm], total: Optional[int], num: int, failed: int):
229242
assert bar is not None, 'Internal error - bar was not initalized'
@@ -244,7 +257,8 @@ def _update_bar(self, bar: Optional[tqdm], total: Optional[int], num: int, faile
244257
def update(self, total: Optional[int], processed: int, downloaded: int, failed: int):
245258
self._init_tqdm()
246259
self._update_bar(self._tqdm_p, total, processed, failed)
247-
self._update_bar(self._tqdm_d, total, downloaded, failed)
260+
if self._show_download_progress:
261+
self._update_bar(self._tqdm_d, total, downloaded, failed)
248262

249263

250264
# Changes in the json that won't affect the result

tests/test_servicex.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -778,7 +778,7 @@ def check_in(total: Optional[int], processed: int, downloaded: int, failed: int)
778778
cache_adaptor=mock_cache,
779779
data_convert_adaptor=data_adaptor,
780780
local_log=mock_logger,
781-
status_callback_factory=lambda ds: check_in)
781+
status_callback_factory=lambda ds, downloading: check_in)
782782
ds.get_data_rootfiles('(valid qastle string)')
783783

784784
assert f_total == 1
@@ -787,6 +787,73 @@ def check_in(total: Optional[int], processed: int, downloaded: int, failed: int)
787787
assert f_failed == 0
788788

789789

790+
def test_callback_is_downloading(mocker):
791+
'Make sure this file download sets the file-download marker in the callback'
792+
mock_cache = build_cache_mock(mocker)
793+
mock_logger = mocker.MagicMock(spec=log_adaptor)
794+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
795+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
796+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
797+
798+
def check_in(total: Optional[int], processed: int, downloaded: int, failed: int):
799+
pass
800+
801+
ds_name = None
802+
ds_downloading = None
803+
804+
def build_it(ds: str, downloading: bool):
805+
nonlocal ds_name, ds_downloading
806+
ds_name = ds
807+
ds_downloading = downloading
808+
return check_in
809+
810+
ds = fe.ServiceXDataset('http://one-ds',
811+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
812+
minio_adaptor=mock_minio_adaptor, # type: ignore
813+
cache_adaptor=mock_cache,
814+
data_convert_adaptor=data_adaptor,
815+
local_log=mock_logger,
816+
status_callback_factory=build_it)
817+
ds.get_data_rootfiles('(valid qastle string)')
818+
819+
assert ds_name == 'http://one-ds'
820+
assert ds_downloading
821+
822+
823+
@pytest.mark.asyncio
824+
async def test_callback_is_not_downloading(mocker):
825+
'Stream download of Minio URLs should not set the download marker'
826+
mock_cache = build_cache_mock(mocker)
827+
mock_logger = mocker.MagicMock(spec=log_adaptor)
828+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
829+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
830+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
831+
832+
def check_in(total: Optional[int], processed: int, downloaded: int, failed: int):
833+
pass
834+
835+
ds_name = None
836+
ds_downloading = None
837+
838+
def build_it(ds: str, downloading: bool):
839+
nonlocal ds_name, ds_downloading
840+
ds_name = ds
841+
ds_downloading = downloading
842+
return check_in
843+
844+
ds = fe.ServiceXDataset('http://one-ds',
845+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
846+
minio_adaptor=mock_minio_adaptor, # type: ignore
847+
cache_adaptor=mock_cache,
848+
data_convert_adaptor=data_adaptor,
849+
local_log=mock_logger,
850+
status_callback_factory=build_it)
851+
_ = [f async for f in ds.get_data_rootfiles_url_stream('(valid qastle string)')]
852+
853+
assert ds_name == 'http://one-ds'
854+
assert not ds_downloading
855+
856+
790857
@pytest.mark.asyncio
791858
async def test_callback_none(mocker):
792859
'Get a root file with a single file'

0 commit comments

Comments
 (0)