Skip to content

Commit f0cd089

Browse files
committed
First go at the callback to fetch minio async
1 parent 6b24b5a commit f0cd089

File tree

3 files changed

+231
-9
lines changed

3 files changed

+231
-9
lines changed

servicex/minio_adaptor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def from_best(self, transaction_info: Optional[Dict[str, str]] = None) -> MinioA
143143
if all(k in transaction_info for k in keys):
144144
logging.getLogger(__name__).debug('Using the request-specific minio_adaptor')
145145
return MinioAdaptor(transaction_info['minio-endpoint'],
146-
transaction_info['minio-secured'],
146+
bool(transaction_info['minio-secured']),
147147
transaction_info['minio-access-key'],
148148
transaction_info['minio-secret-key'])
149149
if self._config_adaptor is not None:

servicex/servicex.py

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import aiohttp
1212
import backoff
1313
from backoff import on_exception
14+
from numpy.lib.function_base import select
1415

1516
from servicex.servicex_config import ServiceXConfigAdaptor
1617

@@ -166,13 +167,27 @@ async def get_data_awkward_async(self, selection_query: str):
166167
return self._converter.combine_awkward(await self._data_return(
167168
selection_query, lambda f: self._converter.convert_to_awkward(f)))
168169

169-
# async def get_data_minio_async(self, selection_query: str):
170-
# '''Async iterator return of minio buckets/files for a query. The results are made
171-
# available as soon as they are returned from ServiceX.
170+
async def get_data_rootfiles_minio_async(self, selection_query: str) \
171+
-> AsyncIterator[Dict[str, str]]:
172+
'''Returns, as an async iterator, each of the files from the minio bucket,
173+
as the files are added there.
172174
173-
# Args:
174-
# selection_query (str): The query string
175-
# '''
175+
Args:
176+
selection_query (str): The ServiceX Selection
177+
'''
178+
async for f_info in self._get_minio_buckets(select, 'root-files'):
179+
yield f_info
180+
181+
async def get_data_parquet_minio_async(self, selection_query: str) \
182+
-> AsyncIterator[Dict[str, str]]:
183+
'''Returns, as an async iterator, each of the files from the minio bucket,
184+
as the files are added there.
185+
186+
Args:
187+
selection_query (str): The ServiceX Selection
188+
'''
189+
async for f_info in self._get_minio_buckets(select, 'parquet'):
190+
yield f_info
176191

177192
async def _file_return(self, selection_query: str, data_format: str):
178193
'''
@@ -198,6 +213,66 @@ async def convert_to_file(f: Path) -> Path:
198213

199214
return await self._data_return(selection_query, convert_to_file, data_format)
200215

216+
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
217+
async def _get_minio_buckets(self, selection_query: str, data_format: str) \
218+
-> AsyncIterator[Dict[str, str]]:
219+
'''Get a list of files back for a request
220+
221+
Args:
222+
selection_query (str): The selection query we are to do
223+
data_format (str): The requested file format
224+
225+
Yields:
226+
AsyncIterator[Dict[str, str]]: A tuple of the minio bucket and file in that bucket.
227+
The dict will have entries for:
228+
bucket: The minio bucket name
229+
file: the completed file in the bucket
230+
'''
231+
query = self._build_json_query(selection_query, data_format)
232+
233+
async with aiohttp.ClientSession() as client:
234+
235+
# Get a request id - which might be cached, but if not, submit it.
236+
request_id = await self._get_request_id(client, query)
237+
238+
# Get the minio adaptor we are going to use for downloading.
239+
minio_adaptor = self._minio_adaptor \
240+
.from_best(self._cache.lookup_query_status(request_id))
241+
242+
# Look up the cache, and then fetch an iterator going thorugh the results
243+
# from either servicex or the cache, depending.
244+
try:
245+
notifier = self._create_notifier()
246+
minio_files = self._get_minio_bucket_files_from_servicex(request_id, client,
247+
minio_adaptor, notifier)
248+
249+
# Reflect the files back up a level.
250+
async for r in minio_files:
251+
yield {
252+
'bucket': request_id,
253+
'file': r,
254+
}
255+
256+
# Cache the final status
257+
await self._update_query_status(client, request_id)
258+
259+
except ServiceXUnknownRequestID as e:
260+
self._cache.remove_query(query)
261+
raise ServiceXException('Expected the ServiceX backend to know about query '
262+
f'{request_id}. It did not. Cleared local cache. '
263+
'Please resubmit to trigger a new query.') from e
264+
265+
except ServiceXFatalTransformException as e:
266+
transform_status = await self._servicex_adaptor.get_query_status(client,
267+
request_id)
268+
raise ServiceXFatalTransformException(
269+
f'ServiceX Fatal Error: {transform_status["failure-info"]}') from e
270+
271+
except ServiceXFailedFileTransform as e:
272+
self._cache.remove_query(query)
273+
await self._servicex_adaptor.dump_query_errors(client, request_id)
274+
raise ServiceXException(f'Failed to transform all files in {request_id}') from e
275+
201276
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
202277
async def _data_return(self, selection_query: str,
203278
converter: Callable[[Path], Awaitable[Any]],
@@ -415,7 +490,8 @@ async def _get_files_from_servicex(self, request_id: str,
415490
async def _get_minio_bucket_files_from_servicex(self, request_id: str,
416491
client: aiohttp.ClientSession,
417492
minio_adaptor: MinioAdaptor,
418-
notifier: _status_update_wrapper):
493+
notifier: _status_update_wrapper) \
494+
-> AsyncIterator[str]:
419495
'''Create an async stream of `minio` bucket/filenames from a request id.
420496
421497
Args:

tests/test_servicex.py

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def do_context():
118118

119119

120120
@pytest.mark.asyncio
121-
async def test_good_run_root_files(mocker):
121+
async def test_minio_back(mocker):
122122
'Get a root file with a single file'
123123
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
124124
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
@@ -390,6 +390,152 @@ async def test_good_run_single_ds_2file_awkward(mocker, good_awkward_file_data):
390390
assert len(good_awkward_file_data.combine_awkward.call_args[0][0]) == 2
391391

392392

393+
@pytest.mark.asyncio
394+
async def test_good_run_root_files_from_minio(mocker):
395+
'Get a root file pulling back minio info as it arrives'
396+
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
397+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
398+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
399+
mock_logger = mocker.MagicMock(spec=log_adaptor)
400+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
401+
402+
ds = fe.ServiceXDataset('localds://mc16_tev:13',
403+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
404+
minio_adaptor=mock_minio_adaptor, # type: ignore
405+
cache_adaptor=mock_cache,
406+
local_log=mock_logger,
407+
data_convert_adaptor=data_adaptor)
408+
lst = []
409+
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
410+
lst.append(f_info)
411+
412+
assert len(lst) == 1
413+
assert lst[0]['bucket'] == '123-456'
414+
assert lst[0]['file'] == 'one_minio_entry'
415+
416+
assert mock_servicex_adaptor.query_json['result-format'] == 'root-file'
417+
418+
419+
@pytest.mark.asyncio
420+
async def test_bad_request_id_run_root_files_from_minio(mocker):
421+
'Using the minio interface - the request_id is not known'
422+
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
423+
transform_status = mocker.MagicMock(side_effect=ServiceXUnknownRequestID('boom'))
424+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456",
425+
mock_transform_status=transform_status)
426+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
427+
mock_logger = mocker.MagicMock(spec=log_adaptor)
428+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
429+
430+
ds = fe.ServiceXDataset('localds://mc16_tev:13',
431+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
432+
minio_adaptor=mock_minio_adaptor, # type: ignore
433+
cache_adaptor=mock_cache,
434+
local_log=mock_logger,
435+
data_convert_adaptor=data_adaptor)
436+
437+
with pytest.raises(ServiceXException) as e:
438+
lst = []
439+
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
440+
lst.append(f_info)
441+
442+
assert 'to know about' in str(e.value)
443+
444+
445+
@pytest.mark.asyncio
446+
async def test_bad_transform_run_root_files_from_minio(mocker):
447+
'Using the async minio interface - fail to transform (like bad DID)'
448+
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
449+
fatal_transform_status = {
450+
"request_id": "24e59fa2-e1d7-4831-8c7e-82b2efc7c658",
451+
"did": "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_0000", # NOQA
452+
"columns": "Electrons.pt(), Electrons.eta(), Electrons.phi(), Electrons.e(), Muons.pt(), Muons.eta(), Muons.phi(), Muons.e()", # NOQA
453+
"selection": None,
454+
"tree-name": None,
455+
"image": "sslhep/servicex_func_adl_xaod_transformer:130_reset_cwd",
456+
"chunk-size": 7000,
457+
"workers": 1,
458+
"result-destination": "kafka",
459+
"result-format": "arrow",
460+
"kafka-broker": "servicex-kafka-1.slateci.net:19092",
461+
"workflow-name": "straight_transform",
462+
"generated-code-cm": None,
463+
"status": "Fatal",
464+
"failure-info": "DID Not found mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_0000" # NOQA
465+
}
466+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456",
467+
mock_transform_status=mocker.MagicMock(side_effect=ServiceXFatalTransformException('DID was BAD')), # NOQA
468+
mock_transform_query_status=mocker.MagicMock(return_value=fatal_transform_status))
469+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
470+
mock_logger = mocker.MagicMock(spec=log_adaptor)
471+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
472+
473+
ds = fe.ServiceXDataset('localds://mc16_tev:13',
474+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
475+
minio_adaptor=mock_minio_adaptor, # type: ignore
476+
cache_adaptor=mock_cache,
477+
local_log=mock_logger,
478+
data_convert_adaptor=data_adaptor)
479+
480+
with pytest.raises(ServiceXFatalTransformException) as e:
481+
lst = []
482+
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
483+
lst.append(f_info)
484+
485+
assert 'Fatal Error' in str(e.value)
486+
487+
488+
@pytest.mark.asyncio
489+
async def test_bad_file_transform_run_root_files_from_minio(mocker):
490+
'Using the async minio interface, some files will fail to translate.'
491+
mock_cache = build_cache_mock(mocker)
492+
mock_logger = mocker.MagicMock(spec=log_adaptor)
493+
mock_transform_status = mocker.Mock(return_value=(0, 1, 1))
494+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456", mock_transform_status)
495+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry', 'two_minio_entry'])
496+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
497+
498+
ds = fe.ServiceXDataset('localds://mc16_tev:13',
499+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
500+
minio_adaptor=mock_minio_adaptor, # type: ignore
501+
cache_adaptor=mock_cache,
502+
local_log=mock_logger,
503+
data_convert_adaptor=data_adaptor)
504+
505+
with pytest.raises(ServiceXException) as e:
506+
lst = []
507+
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
508+
lst.append(f_info)
509+
510+
assert 'Failed to transform all files' in str(e.value)
511+
512+
513+
@pytest.mark.asyncio
514+
async def test_good_run_parquet_files_from_minio(mocker):
515+
'Get a parquet file pulling back minio info as it arrives'
516+
mock_cache = build_cache_mock(mocker, data_file_return="/foo/bar.root")
517+
mock_servicex_adaptor = MockServiceXAdaptor(mocker, "123-456")
518+
mock_minio_adaptor = MockMinioAdaptor(mocker, files=['one_minio_entry'])
519+
mock_logger = mocker.MagicMock(spec=log_adaptor)
520+
data_adaptor = mocker.MagicMock(spec=DataConverterAdaptor)
521+
522+
ds = fe.ServiceXDataset('localds://mc16_tev:13',
523+
servicex_adaptor=mock_servicex_adaptor, # type: ignore
524+
minio_adaptor=mock_minio_adaptor, # type: ignore
525+
cache_adaptor=mock_cache,
526+
local_log=mock_logger,
527+
data_convert_adaptor=data_adaptor)
528+
lst = []
529+
async for f_info in ds.get_data_parquet_minio_async('(valid qastle string)'):
530+
lst.append(f_info)
531+
532+
assert len(lst) == 1
533+
assert lst[0]['bucket'] == '123-456'
534+
assert lst[0]['file'] == 'one_minio_entry'
535+
536+
assert mock_servicex_adaptor.query_json['result-format'] == 'parquet'
537+
538+
393539
@pytest.mark.asyncio
394540
async def test_status_exception(mocker):
395541
'Make sure status error - like transform not found - is reported all the way to the top'

0 commit comments

Comments
 (0)