Skip to content

Commit c57d649

Browse files
committed
Getting parquet files working
- Normalize test names - Add better tests and code for files.
1 parent 828915e commit c57d649

File tree

3 files changed

+148
-29
lines changed

3 files changed

+148
-29
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"getenv",
3737
"gitlab",
3838
"inmem",
39+
"isabstractmethod",
3940
"jupyter",
4041
"jupyterlab",
4142
"leftfoot",

servicex/servicex.py

Lines changed: 85 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ class StreamInfoUrl:
4040
bucket: str
4141

4242

43+
@dataclass
44+
class StreamInfoPath:
45+
'''Contains information on accessing ServiceX data via a local Path
46+
'''
47+
path: Path
48+
file: str
49+
50+
4351
class ServiceXDataset(ServiceXABC):
4452
'''
4553
Used to access an instance of ServiceX at an end point on the internet. Support convieration
@@ -159,11 +167,47 @@ def ignore_cache(self):
159167
async def get_data_rootfiles_async(self, selection_query: str) -> List[Path]:
160168
return await self._file_return(selection_query, 'root-file')
161169

170+
async def get_data_rootfiles_stream(self, selection_query: str) \
171+
-> AsyncIterator[StreamInfoPath]:
172+
'''Returns, as an async iterator, each completed batch of work from Servicex.
173+
The `StreamInfoPath` contains a path where downstream consumers can directly
174+
access the data.
175+
176+
Args:
177+
selection_query (str): The `qastle` query for the data to retrieve.
178+
179+
Yields:
180+
AsyncIterator[StreamInfoPath]: As ServiceX completes the data, and it is downloaded
181+
to the local machine, the async iterator returns
182+
a `StreamInfoPath` which can be used to access the
183+
file locally.
184+
'''
185+
async for f_info in self._stream_local_files(selection_query, 'root-files'):
186+
yield f_info
187+
162188
@functools.wraps(ServiceXABC.get_data_parquet_async, updated=())
163189
@_wrap_in_memory_sx_cache
164190
async def get_data_parquet_async(self, selection_query: str) -> List[Path]:
165191
return await self._file_return(selection_query, 'parquet')
166192

193+
async def get_data_parquet_stream(self, selection_query: str) \
194+
-> AsyncIterator[StreamInfoPath]:
195+
'''Returns, as an async iterator, each completed batch of work from Servicex.
196+
The `StreamInfoPath` contains a path where downstream consumers can directly
197+
access the data.
198+
199+
Args:
200+
selection_query (str): The `qastle` query for the data to retrieve.
201+
202+
Yields:
203+
AsyncIterator[StreamInfoPath]: As ServiceX completes the data, and it is downloaded
204+
to the local machine, the async iterator returns
205+
a `StreamInfoPath` which can be used to access the
206+
file locally.
207+
'''
208+
async for f_info in self._stream_local_files(selection_query, 'parquet'):
209+
yield f_info
210+
167211
@functools.wraps(ServiceXABC.get_data_pandas_df_async, updated=())
168212
@_wrap_in_memory_sx_cache
169213
async def get_data_pandas_df_async(self, selection_query: str):
@@ -185,18 +229,18 @@ async def get_data_rootfiles_url_stream(self, selection_query: str) \
185229
Args:
186230
selection_query (str): The ServiceX Selection
187231
'''
188-
async for f_info in self._get_minio_buckets(selection_query, 'root-files'):
232+
async for f_info in self._stream_url_buckets(selection_query, 'root-files'):
189233
yield f_info
190234

191-
async def get_data_parquet_minio_stream(self, selection_query: str) \
235+
async def get_data_parquet_url_stream(self, selection_query: str) \
192236
-> AsyncIterator[StreamInfoUrl]:
193237
'''Returns, as an async iterator, each of the files from the minio bucket,
194238
as the files are added there.
195239
196240
Args:
197241
selection_query (str): The ServiceX Selection
198242
'''
199-
async for f_info in self._get_minio_buckets(selection_query, 'parquet'):
243+
async for f_info in self._stream_url_buckets(selection_query, 'parquet'):
200244
yield f_info
201245

202246
async def _file_return(self, selection_query: str, data_format: str):
@@ -224,7 +268,7 @@ async def convert_to_file(f: Path) -> Path:
224268
return await self._data_return(selection_query, convert_to_file, data_format)
225269

226270
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
227-
async def _get_minio_buckets(self, selection_query: str, data_format: str) \
271+
async def _stream_url_buckets(self, selection_query: str, data_format: str) \
228272
-> AsyncIterator[StreamInfoUrl]:
229273
'''Get a list of files back for a request
230274
@@ -280,12 +324,10 @@ async def _get_minio_buckets(self, selection_query: str, data_format: str) \
280324
await self._servicex_adaptor.dump_query_errors(client, request_id)
281325
raise ServiceXException(f'Failed to transform all files in {request_id}') from e
282326

283-
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
284327
async def _data_return(self, selection_query: str,
285328
converter: Callable[[Path], Awaitable[Any]],
286-
data_format: str = 'root-file'):
287-
'''
288-
Given a query, return the data, in a unique order, that hold
329+
data_format: str = 'root-file') -> List[Any]:
330+
'''Given a query, return the data, in a unique order, that hold
289331
the data for the query.
290332
291333
For certain types of exceptions, the queries will be repeated. For example,
@@ -303,25 +345,51 @@ async def _data_return(self, selection_query: str,
303345
data Data converted to the "proper" format, depending
304346
on the converter call.
305347
'''
306-
# Get a notifier to update anyone who wants to listen.
307-
notifier = self._create_notifier()
348+
as_data = ((f.file, asyncio.ensure_future(converter(f.path)))
349+
async for f in self._stream_local_files(selection_query, data_format))
308350

309-
# Get all the files
310-
as_files = \
311-
(f async for f in
312-
self._get_files(selection_query, data_format, notifier))
351+
all_data = {d[0]: await d[1] async for d in as_data}
313352

314353
# Convert them to the proper format
315-
as_data = ((f[0], asyncio.ensure_future(converter(await f[1])))
316-
async for f in as_files)
317354

318355
# Finally, we need them in the proper order so we append them
319356
# all together
320-
all_data = {f[0]: await f[1] async for f in as_data}
321357
ordered_data = [all_data[k] for k in sorted(all_data.keys())]
322358

323359
return ordered_data
324360

361+
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
362+
async def _stream_local_files(self, selection_query: str,
363+
data_format: str = 'root-file'):
364+
'''
365+
Given a query, return the data as a list of paths pointing to local files
366+
that contain the results of the query. This is an async generator, and files
367+
are returned as they arrive.
368+
369+
For certain types of exceptions, the queries will be repeated. For example,
370+
if `ServiceX` indicates that it was restarted in the middle of the query, then
371+
the query will be re-submitted.
372+
373+
Arguments:
374+
375+
selection_query `qastle` data that makes up the selection request.
376+
377+
Returns:
378+
379+
data Data converted to the "proper" format, depending
380+
on the converter call.
381+
'''
382+
# Get a notifier to update anyone who wants to listen.
383+
notifier = self._create_notifier()
384+
385+
# Get all the files
386+
as_files = \
387+
(f async for f in
388+
self._get_files(selection_query, data_format, notifier))
389+
390+
async for name, a_path in as_files:
391+
yield StreamInfoPath(Path(await a_path), name)
392+
325393
async def _get_files(self, selection_query: str, data_type: str,
326394
notifier: _status_update_wrapper) \
327395
-> AsyncIterator[Tuple[str, Awaitable[Path]]]:

tests/test_servicex.py

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
from contextlib import contextmanager
33
from pathlib import Path
4-
from servicex.servicex import StreamInfoUrl
4+
from servicex.servicex import StreamInfoPath, StreamInfoUrl
55
from servicex.cache import Cache
66

77
from confuse.core import Configuration
@@ -141,7 +141,7 @@ async def test_minio_back(mocker):
141141
"/foo/bar.root")
142142

143143
assert len(r) == 1
144-
assert r[0] == '/foo/bar.root'
144+
assert r[0] == Path('/foo/bar.root')
145145

146146

147147
@pytest.mark.asyncio
@@ -185,7 +185,7 @@ def test_good_run_root_files_no_async(mocker):
185185

186186
r = ds.get_data_rootfiles('(valid qastle string)')
187187
assert len(r) == 2
188-
assert r[0] == '/foo/bar.root'
188+
assert r[0] == Path('/foo/bar.root')
189189

190190

191191
@pytest.mark.asyncio
@@ -392,7 +392,7 @@ async def test_good_run_single_ds_2file_awkward(mocker, good_awkward_file_data):
392392

393393

394394
@pytest.mark.asyncio
395-
async def test_good_run_root_files_from_minio(mocker):
395+
async def test_stream_root_files_from_minio(mocker):
396396
'Get a root file pulling back minio info as it arrives'
397397
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
398398
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
@@ -420,7 +420,34 @@ async def test_good_run_root_files_from_minio(mocker):
420420

421421

422422
@pytest.mark.asyncio
423-
async def test_bad_request_id_run_root_files_from_minio(mocker):
423+
async def test_stream_root_files(mocker):
424+
'Get a root file pulling back minio info as it arrives'
425+
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
426+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
427+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
428+
mock_logger = mocker.MagicMock(spec=log_adaptor)
429+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
430+
431+
ds = fe.ServiceXDataset('localds://mc16_tev:13',
432+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
433+
minio_adaptor=mock_minio_adaptor, # type: ignore
434+
cache_adaptor=mock_cache,
435+
local_log=mock_logger,
436+
data_convert_adaptor=data_adaptor)
437+
lst = [f async for f in ds.get_data_rootfiles_stream('(valid qastle string)')]
438+
439+
assert len(lst) == 1
440+
r = lst[0]
441+
assert isinstance(r, StreamInfoPath)
442+
assert r.file == 'one_minio_entry'
443+
assert 'foo' in r.path.parts
444+
assert 'bar.root' in r.path.parts
445+
446+
assert mock_servicex_adaptor.query_json['result-format'] == 'root-file'
447+
448+
449+
@pytest.mark.asyncio
450+
async def test_stream_bad_request_id_run_root_files_from_minio(mocker):
424451
'Using the minio interface - the request_id is not known'
425452
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
426453
transform_status = mocker.MagicMock(side_effect=ServiceXUnknownRequestID('boom'))
@@ -446,7 +473,7 @@ async def test_bad_request_id_run_root_files_from_minio(mocker):
446473

447474

448475
@pytest.mark.asyncio
449-
async def test_bad_transform_run_root_files_from_minio(mocker):
476+
async def test_stream_bad_transform_run_root_files_from_minio(mocker):
450477
'Using the async minio interface - fail to transform (like bad DID)'
451478
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
452479
fatal_transform_status = {
@@ -489,7 +516,7 @@ async def test_bad_transform_run_root_files_from_minio(mocker):
489516

490517

491518
@pytest.mark.asyncio
492-
async def test_bad_file_transform_run_root_files_from_minio(mocker):
519+
async def test_stream_bad_file_transform_run_root_files_from_minio(mocker):
493520
'Using the async minio interface, some files will fail to translate.'
494521
mock_cache = build_cache_mock(mocker)
495522
mock_logger = mocker.MagicMock(spec=log_adaptor)
@@ -514,7 +541,7 @@ async def test_bad_file_transform_run_root_files_from_minio(mocker):
514541

515542

516543
@pytest.mark.asyncio
517-
async def test_good_run_parquet_files_from_minio(mocker):
544+
async def test_stream_parquet_files_from_minio(mocker):
518545
'Get a parquet file pulling back minio info as it arrives'
519546
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
520547
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
@@ -528,9 +555,7 @@ async def test_good_run_parquet_files_from_minio(mocker):
528555
cache_adaptor=mock_cache,
529556
local_log=mock_logger,
530557
data_convert_adaptor=data_adaptor)
531-
lst = []
532-
async for f_info in ds.get_data_parquet_minio_stream('(valid qastle string)'):
533-
lst.append(f_info)
558+
lst = [f_info async for f_info in ds.get_data_parquet_url_stream('(valid qastle string)')]
534559

535560
assert len(lst) == 1
536561
assert lst[0].bucket == '123-456'
@@ -539,6 +564,31 @@ async def test_good_run_parquet_files_from_minio(mocker):
539564
assert mock_servicex_adaptor.query_json['result-format'] == 'parquet'
540565

541566

567+
@pytest.mark.asyncio
568+
async def test_stream_parquet_files(mocker):
569+
'Get a parquet file pulling back minio info as it arrives'
570+
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
571+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
572+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
573+
mock_logger = mocker.MagicMock(spec=log_adaptor)
574+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
575+
576+
ds = fe.ServiceXDataset('localds://mc16_tev:13',
577+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
578+
minio_adaptor=mock_minio_adaptor, # type: ignore
579+
cache_adaptor=mock_cache,
580+
local_log=mock_logger,
581+
data_convert_adaptor=data_adaptor)
582+
lst = [f_info async for f_info in ds.get_data_parquet_stream('(valid qastle string)')]
583+
584+
assert len(lst) == 1
585+
assert lst[0].file == 'one_minio_entry'
586+
assert 'foo' in lst[0].path.parts
587+
assert 'bar.root' in lst[0].path.parts
588+
589+
assert mock_servicex_adaptor.query_json['result-format'] == 'parquet'
590+
591+
542592
@pytest.mark.asyncio
543593
async def test_status_exception(mocker):
544594
'Make sure status error - like transform not found - is reported all the way to the top'
@@ -715,7 +765,7 @@ async def test_callback_none(mocker):
715765
"/foo/bar.root")
716766

717767
assert len(r) == 1
718-
assert r[0] == '/foo/bar.root'
768+
assert r[0] == Path('/foo/bar.root')
719769

720770

721771
@pytest.mark.asyncio

0 commit comments

Comments
 (0)