Skip to content

Commit 828915e

Browse files
committed
Minor changes to the model for streaming
Settling the streaming API into its final form.
1 parent 915d6fe commit 828915e

File tree

4 files changed

+35
-28
lines changed

4 files changed

+35
-28
lines changed

.vscode/settings.json

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
{
22
"python.pythonPath": ".venv\\Scripts\\python.exe",
3-
"pyright.openFilesOnly": false,
4-
"pyright.useLibraryCodeForTypes": true,
53
"python.analysis.logLevel": "Information",
64
"python.analysis.memory.keepLibraryAst": true,
75
"python.linting.flake8Enabled": true,
@@ -82,6 +80,7 @@
8280
],
8381
"python.analysis.typeCheckingMode": "basic",
8482
"python.testing.pytestArgs": [
83+
"tests",
8584
"--no-cov"
86-
]
85+
],
8786
}

servicex/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from .servicex import ServiceXDataset # NOQA
1+
from .servicex import ServiceXDataset, StreamInfoUrl # NOQA
22
from .utils import ( # NOQA
33
ServiceXException,
44
ServiceXUnknownRequestID,

servicex/servicex.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import functools
44
import logging
55
import time
6+
from dataclasses import dataclass
67
from datetime import timedelta
78
from pathlib import Path
89
from typing import (Any, AsyncIterator, Awaitable, Callable, Dict, List,
@@ -30,6 +31,15 @@
3031
stream_status_updates, stream_unique_updates_only)
3132

3233

34+
@dataclass
35+
class StreamInfoUrl:
36+
'''Contains information on accessing ServiceX data via a url
37+
'''
38+
url: str
39+
file: str
40+
bucket: str
41+
42+
3343
class ServiceXDataset(ServiceXABC):
3444
'''
3545
Used to access an instance of ServiceX at an end point on the internet. Support configuration
@@ -166,19 +176,20 @@ async def get_data_awkward_async(self, selection_query: str):
166176
return self._converter.combine_awkward(await self._data_return(
167177
selection_query, lambda f: self._converter.convert_to_awkward(f)))
168178

169-
async def get_data_rootfiles_minio_async(self, selection_query: str) \
170-
-> AsyncIterator[Dict[str, str]]:
171-
'''Returns, as an async iterator, each of the files from the minio bucket,
172-
as the files are added there.
179+
async def get_data_rootfiles_url_stream(self, selection_query: str) \
180+
-> AsyncIterator[StreamInfoUrl]:
181+
'''Returns, as an async iterator, each completed batch of work from ServiceX.
182+
The data that comes back includes a `url` that can be accessed to download the
183+
data.
173184
174185
Args:
175186
selection_query (str): The ServiceX Selection
176187
'''
177188
async for f_info in self._get_minio_buckets(selection_query, 'root-files'):
178189
yield f_info
179190

180-
async def get_data_parquet_minio_async(self, selection_query: str) \
181-
-> AsyncIterator[Dict[str, str]]:
191+
async def get_data_parquet_minio_stream(self, selection_query: str) \
192+
-> AsyncIterator[StreamInfoUrl]:
182193
'''Returns, as an async iterator, each of the files from the minio bucket,
183194
as the files are added there.
184195
@@ -214,7 +225,7 @@ async def convert_to_file(f: Path) -> Path:
214225

215226
@on_exception(backoff.constant, ServiceXUnknownRequestID, interval=0.1, max_tries=3)
216227
async def _get_minio_buckets(self, selection_query: str, data_format: str) \
217-
-> AsyncIterator[Dict[str, str]]:
228+
-> AsyncIterator[StreamInfoUrl]:
218229
'''Get a list of files back for a request
219230
220231
Args:
@@ -247,11 +258,7 @@ async def _get_minio_buckets(self, selection_query: str, data_format: str) \
247258

248259
# Reflect the files back up a level.
249260
async for r in minio_files:
250-
yield {
251-
'bucket': request_id,
252-
'file': r,
253-
'url': minio_adaptor.get_access_url(request_id, r),
254-
}
261+
yield StreamInfoUrl(minio_adaptor.get_access_url(request_id, r), r, request_id)
255262

256263
# Cache the final status
257264
await self._update_query_status(client, request_id)

tests/test_servicex.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
from contextlib import contextmanager
33
from pathlib import Path
4+
from servicex.servicex import StreamInfoUrl
45
from servicex.cache import Cache
56

67
from confuse.core import Configuration
@@ -405,14 +406,14 @@ async def test_good_run_root_files_from_minio(mocker):
405406
cache_adaptor=mock_cache,
406407
local_log=mock_logger,
407408
data_convert_adaptor=data_adaptor)
408-
lst = []
409-
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
410-
lst.append(f_info)
409+
lst = [f async for f in ds.get_data_rootfiles_url_stream('(valid qastle string)')]
411410

412411
assert len(lst) == 1
413-
assert lst[0]['bucket'] == '123-456'
414-
assert lst[0]['file'] == 'one_minio_entry'
415-
assert lst[0]['url'] == 'http://the.url.com'
412+
r = lst[0]
413+
assert isinstance(r, StreamInfoUrl)
414+
assert r.bucket == '123-456'
415+
assert r.file == 'one_minio_entry'
416+
assert r.url == 'http://the.url.com'
416417

417418
assert mock_servicex_adaptor.query_json['result-format'] == 'root-file'
418419
assert mock_minio_adaptor.access_called_with == ('123-456', 'one_minio_entry')
@@ -438,7 +439,7 @@ async def test_bad_request_id_run_root_files_from_minio(mocker):
438439

439440
with pytest.raises(ServiceXException) as e:
440441
lst = []
441-
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
442+
async for f_info in ds.get_data_rootfiles_url_stream('(valid qastle string)'):
442443
lst.append(f_info)
443444

444445
assert 'to know about' in str(e.value)
@@ -481,7 +482,7 @@ async def test_bad_transform_run_root_files_from_minio(mocker):
481482

482483
with pytest.raises(ServiceXFatalTransformException) as e:
483484
lst = []
484-
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
485+
async for f_info in ds.get_data_rootfiles_url_stream('(valid qastle string)'):
485486
lst.append(f_info)
486487

487488
assert 'Fatal Error' in str(e.value)
@@ -506,7 +507,7 @@ async def test_bad_file_transform_run_root_files_from_minio(mocker):
506507

507508
with pytest.raises(ServiceXException) as e:
508509
lst = []
509-
async for f_info in ds.get_data_rootfiles_minio_async('(valid qastle string)'):
510+
async for f_info in ds.get_data_rootfiles_url_stream('(valid qastle string)'):
510511
lst.append(f_info)
511512

512513
assert 'Failed to transform all files' in str(e.value)
@@ -528,12 +529,12 @@ async def test_good_run_parquet_files_from_minio(mocker):
528529
local_log=mock_logger,
529530
data_convert_adaptor=data_adaptor)
530531
lst = []
531-
async for f_info in ds.get_data_parquet_minio_async('(valid qastle string)'):
532+
async for f_info in ds.get_data_parquet_minio_stream('(valid qastle string)'):
532533
lst.append(f_info)
533534

534535
assert len(lst) == 1
535-
assert lst[0]['bucket'] == '123-456'
536-
assert lst[0]['file'] == 'one_minio_entry'
536+
assert lst[0].bucket == '123-456'
537+
assert lst[0].file == 'one_minio_entry'
537538

538539
assert mock_servicex_adaptor.query_json['result-format'] == 'parquet'
539540

0 commit comments

Comments
 (0)