Skip to content

Commit 1c14234

Browse files
committed
Streaming tests done
- Added pandas and awkward streaming - Tried to clean up and remove duplicate code
1 parent c57d649 commit 1c14234

File tree

2 files changed

+95
-4
lines changed

2 files changed

+95
-4
lines changed

servicex/servicex.py

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ class StreamInfoPath:
4848
file: str
4949

5050

51+
@dataclass
class StreamInfoData:
    '''A single piece of converted ServiceX data, paired with the name of the
    file it belongs to.
    '''
    # The converted payload. Its concrete type is whatever the converter
    # produced, hence `Any`.
    data: Any
    # Name of the file the payload is associated with.
    file: str
57+
58+
5159
class ServiceXDataset(ServiceXABC):
5260
'''
5361
Used to access an instance of ServiceX at an end point on the internet. Support convieration
@@ -220,6 +228,16 @@ async def get_data_awkward_async(self, selection_query: str):
220228
return self._converter.combine_awkward(await self._data_return(
221229
selection_query, lambda f: self._converter.convert_to_awkward(f)))
222230

231+
async def get_data_awkward_stream(self, selection_query: str):
    '''Yield the results of a query one item at a time, each converted with the
    converter's `convert_to_awkward`.

    Arguments:

        selection_query     The query passed through to `_stream_return`.

    Yields:

        Each item produced by `_stream_return`, unchanged.
    '''
    def to_awkward(path):
        # Look the converter up at call time, once per file.
        return self._converter.convert_to_awkward(path)

    async for converted in self._stream_return(selection_query, to_awkward):
        yield converted
235+
236+
async def get_data_pandas_stream(self, selection_query: str):
    '''Yield the results of a query one item at a time, each converted with the
    converter's `convert_to_pandas`.

    Arguments:

        selection_query     The query passed through to `_stream_return`.

    Yields:

        Each item produced by `_stream_return`, unchanged.
    '''
    def to_pandas(path):
        # Look the converter up at call time, once per file.
        return self._converter.convert_to_pandas(path)

    async for converted in self._stream_return(selection_query, to_pandas):
        yield converted
240+
223241
async def get_data_rootfiles_url_stream(self, selection_query: str) \
224242
-> AsyncIterator[StreamInfoUrl]:
225243
'''Returns, as an async iterator, each completed batch of work from ServiceX.
@@ -345,10 +363,10 @@ async def _data_return(self, selection_query: str,
345363
data Data converted to the "proper" format, depending
346364
on the converter call.
347365
'''
348-
as_data = ((f.file, asyncio.ensure_future(converter(f.path)))
349-
async for f in self._stream_local_files(selection_query, data_format))
350-
351-
all_data = {d[0]: await d[1] async for d in as_data}
366+
all_data = {
367+
f.file: f.data
368+
async for f in self._stream_return(selection_query, converter, data_format)
369+
}
352370

353371
# Convert them to the proper format
354372

@@ -358,6 +376,33 @@ async def _data_return(self, selection_query: str,
358376

359377
return ordered_data
360378

379+
async def _stream_return(self, selection_query: str,
                         converter: Callable[[Path], Awaitable[Any]],
                         data_format: str = 'root-file') -> AsyncIterator[StreamInfoData]:
    '''Given a query, yield the data for each file, in the order it arrives back,
    converted as appropriate.

    Files come from `_stream_local_files`; each one is converted and yielded
    before the next file is pulled from that stream.

    NOTE(review): re-submission of a query after certain exceptions is
    presumably handled by the `on_exception` decorator on
    `_stream_local_files`, not here - confirm.

    Arguments:

        selection_query     `qastle` data that makes up the selection request.
        converter           A `Callable` that takes the path of a file returned
                            from `ServiceX` and returns an awaitable that
                            produces the converted data.
        data_format         Format requested for the files; passed straight to
                            `_stream_local_files`. Defaults to `'root-file'`.

    Returns:

        An async iterator of `StreamInfoData`, one per file: `data` is what
        the converter produced and `file` is the file's name.
    '''
    async for f in self._stream_local_files(selection_query, data_format):
        # The converted value is needed before we can yield, so await the
        # converter directly; wrapping it in a task first adds no concurrency.
        yield StreamInfoData(await converter(f.path), f.file)
405+
361406
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
362407
async def _stream_local_files(self, selection_query: str,
363408
data_format: str = 'root-file'):

tests/test_servicex.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,52 @@ async def test_stream_parquet_files(mocker):
589589
assert mock_servicex_adaptor.query_json['result-format'] == 'parquet'
590590

591591

592+
@pytest.mark.asyncio
async def test_stream_awkward_data(mocker):
    'Stream awkward-converted data back, one item per minio file'
    cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
    servicex = MockServiceXAdaptor(mocker, "123-456")
    minio = MockMinioAdaptor(mocker, files=['one_minio_entry'])
    logger = mocker.MagicMock(spec=log_adaptor)
    converter = mocker.MagicMock(spec=DataConverterAdaptor)
    converter.convert_to_awkward.return_value = {'JetPt': 10}

    ds = fe.ServiceXDataset('localds://mc16_tev:13',
                            servicex_adaptor=servicex,  # type: ignore
                            minio_adaptor=minio,  # type: ignore
                            cache_adaptor=cache,
                            local_log=logger,
                            data_convert_adaptor=converter)

    # Drain the stream so every yielded item can be inspected.
    streamed = []
    async for item in ds.get_data_awkward_stream('(valid qastle string)'):
        streamed.append(item)

    assert len(streamed) == 1
    only = streamed[0]
    assert only.file == 'one_minio_entry'
    assert isinstance(only.data, dict)
613+
614+
615+
@pytest.mark.asyncio
async def test_stream_pandas_data(mocker):
    'Stream pandas-converted data back, one item per minio file'
    cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
    servicex = MockServiceXAdaptor(mocker, "123-456")
    minio = MockMinioAdaptor(mocker, files=['one_minio_entry'])
    logger = mocker.MagicMock(spec=log_adaptor)
    converter = mocker.MagicMock(spec=DataConverterAdaptor)
    converter.convert_to_pandas.return_value = {'JetPt': 10}

    ds = fe.ServiceXDataset('localds://mc16_tev:13',
                            servicex_adaptor=servicex,  # type: ignore
                            minio_adaptor=minio,  # type: ignore
                            cache_adaptor=cache,
                            local_log=logger,
                            data_convert_adaptor=converter)

    # Drain the stream so every yielded item can be inspected.
    streamed = []
    async for item in ds.get_data_pandas_stream('(valid qastle string)'):
        streamed.append(item)

    assert len(streamed) == 1
    only = streamed[0]
    assert only.file == 'one_minio_entry'
    assert isinstance(only.data, dict)
636+
637+
592638
@pytest.mark.asyncio
593639
async def test_status_exception(mocker):
594640
'Make sure status error - like transform not found - is reported all the way to the top'

0 commit comments

Comments
 (0)