
Commit d4a6556

Obey default data return formats (#260)
* Fix #259: get_awkward now properly requests parquet files from an uproot backend, and ROOT files from an xaod backend.
* get_pandas does the same thing.
* A spell checker was run on multiple source files and spelling was updated.
* A global variable in `servicex.py` limits which data formats you can request. It is now possible to alter that global variable to add a new format without having to hack the package (see the sketch below).
1 parent 9c5572b commit d4a6556
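The last bullet of the commit message hints at a small extensibility hook. As a rough, hedged sketch of what that could look like in user code - the module-level name `g_allowed_formats` and the format string are placeholders, not necessarily the identifiers actually used in `servicex/servicex.py`:

    # Hedged sketch only: `g_allowed_formats` is an assumed name; check
    # servicex/servicex.py for the real module-level list of permitted formats.
    import servicex.servicex as sx

    # Allow an extra result format at runtime instead of patching the package.
    sx.g_allowed_formats = list(sx.g_allowed_formats) + ["my-new-format"]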

File tree: 5 files changed, +112 -28 lines changed

servicex/servicex.py

Lines changed: 24 additions & 16 deletions
@@ -106,7 +106,7 @@ def url(self) -> str:
 
     @property
     def bucket(self) -> str:
-        """Returns the buck name - unique and constant accross transformations.
+        """Returns the buck name - unique and constant across transformations.
         Can be used to order the results
 
         Returns:
@@ -166,7 +166,7 @@ def data(self) -> Any:
 
 class ServiceXDataset(ServiceXABC):
     """
-    Used to access an instance of ServiceX at an end point on the internet. Support convieration
+    Used to access an instance of ServiceX at an end point on the internet. Support conversion
     by configuration object `config_adaptor` or by creating the adaptors defined in the `__init__`
     function.
     """
@@ -210,7 +210,7 @@ def __init__(
                                 Defaults to object-store, but could be used to save
                                 results to a posix volume
             servicex_adaptor    Object to control communication with the servicex instance
-                                at a particular ip address with certian login credentials.
+                                at a particular ip address with certain login credentials.
                                 Will be configured via the `config_adaptor` by default.
             minio_adaptor       Object to control communication with the minio servicex
                                 instance.
@@ -347,7 +347,7 @@ async def get_data_rootfiles_stream(
         access the data.
 
         Args:
-            selection_query (str): The `qastle` query for the data to retreive.
+            selection_query (str): The `qastle` query for the data to retrieve.
 
         Yields:
             AsyncIterator[StreamInfoPath]: As ServiceX completes the data, and it is downloaded
@@ -375,7 +375,7 @@ async def get_data_parquet_stream(
         access the data.
 
         Args:
-            selection_query (str): The `qastle` query for the data to retreive.
+            selection_query (str): The `qastle` query for the data to retrieve.
 
         Yields:
             AsyncIterator[StreamInfoPath]: As ServiceX completes the data, and it is downloaded
@@ -393,9 +393,13 @@ async def get_data_parquet_stream(
     async def get_data_pandas_df_async(
         self, selection_query: str, title: Optional[str] = None
     ):
+        data_format = self._return_types[0]
         return self._converter.combine_pandas(
             await self._data_return(
-                selection_query, lambda f: self._converter.convert_to_pandas(f), title
+                selection_query,
+                lambda f: self._converter.convert_to_pandas(f),
+                title,
+                data_format=data_format,
             )
         )
 
@@ -404,9 +408,13 @@ async def get_data_pandas_df_async(
     async def get_data_awkward_async(
         self, selection_query: str, title: Optional[str] = None
     ):
+        data_format = self._return_types[0]
         return self._converter.combine_awkward(
             await self._data_return(
-                selection_query, lambda f: self._converter.convert_to_awkward(f), title
+                selection_query,
+                lambda f: self._converter.convert_to_awkward(f),
+                title,
+                data_format=data_format,
             )
         )
 
@@ -417,7 +425,7 @@ async def get_data_awkward_stream(
         as a separate `awkward` array. The data is returned in a `StreamInfoData` object.
 
         Args:
-            selection_query (str): The `qastle` query for the data to retreive.
+            selection_query (str): The `qastle` query for the data to retrieve.
 
         Yields:
             AsyncIterator[StreamInfoData]: As ServiceX completes the data, and it is downloaded
@@ -437,7 +445,7 @@ async def get_data_pandas_stream(
         as a separate `pandas.DataFrame` array. The data is returned in a `StreamInfoData` object.
 
         Args:
-            selection_query (str): The `qastle` query for the data to retreive.
+            selection_query (str): The `qastle` query for the data to retrieve.
 
         Yields:
             AsyncIterator[StreamInfoData]: As ServiceX completes the data, and it is downloaded
@@ -539,7 +547,7 @@ async def _file_return(
        Given a query, return the list of files, in a unique order, that hold
        the data for the query.
 
-        For certian types of exceptions, the queries will be repeated. For example,
+        For certain types of exceptions, the queries will be repeated. For example,
        if `ServiceX` indicates that it was restarted in the middle of the query, then
        the query will be re-submitted.
 
@@ -670,7 +678,7 @@ async def _data_return(
        """Given a query, return the data, in a unique order, that hold
        the data for the query.
 
-        For certian types of exceptions, the queries will be repeated. For example,
+        For certain types of exceptions, the queries will be repeated. For example,
        if `ServiceX` indicates that it was restarted in the middle of the query, then
        the query will be re-submitted.
 
@@ -710,7 +718,7 @@ async def _stream_return(
        """Given a query, return the data, in the order it arrives back
        converted as appropriate.
 
-        For certian types of exceptions, the queries will be repeated. For example,
+        For certain types of exceptions, the queries will be repeated. For example,
        if `ServiceX` indicates that it was restarted in the middle of the query, then
        the query will be re-submitted.
 
@@ -741,7 +749,7 @@ async def _stream_local_files(
        that contain the results of the query. This is an async generator, and files
        are returned as they arrive.
 
-        For certian types of exceptions, the queries will be repeated. For example,
+        For certain types of exceptions, the queries will be repeated. For example,
        if `ServiceX` indicates that it was restarted in the middle of the query, then
        the query will be re-submitted.
 
@@ -779,7 +787,7 @@ async def _get_files(
        Return a list of files from servicex as they have been downloaded to this machine. The
        return type is an awaitable that will yield the path to the file.
 
-        For certian types of `ServiceX` failures we will automatically attempt a few retries:
+        For certain types of `ServiceX` failures we will automatically attempt a few retries:
 
        - When `ServiceX` forgets the query. This sometimes happens when a user submits a
          query, and then disconnects from the network, `ServiceX` is restarted, and then the
@@ -815,7 +823,7 @@ async def _get_files(
             self._cache.lookup_query_status(request_id)
         )
 
-        # Look up the cache, and then fetch an iterator going thorugh the results
+        # Look up the cache, and then fetch an iterator going through the results
         # from either servicex or the cache, depending.
         try:
             cached_files = self._cache.lookup_files(request_id)
@@ -1052,7 +1060,7 @@ def _build_json_query(
         json_query: Dict[str, Union[str, Iterable[str]]] = {
             "selection": selection_query,
             "result-destination": self._result_destination,
-            "result-format": "parquet" if data_format == "parquet" else "root-file",
+            "result-format": data_format,
             "chunk-size": "1000",
             "workers": str(self._max_workers),
         }
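For orientation, here is a minimal, hedged usage sketch of the call path touched above. `ServiceXDataset` and `get_data_awkward_async` appear in the diff itself; the `backend_name` constructor argument, the dataset name, and the query string are illustrative assumptions:

    import asyncio
    from servicex import ServiceXDataset

    async def fetch():
        # Assumption: the backend is selected via a constructor argument such as
        # `backend_name`. With this commit, an uproot-style backend is asked for
        # parquet files and an xaod-style backend for ROOT files by default.
        ds = ServiceXDataset("mc15_13TeV:some.dataset.name", backend_name="uproot")
        return await ds.get_data_awkward_async("(qastle selection string)")

    data = asyncio.run(fetch())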

servicex/servicexabc.py

Lines changed: 2 additions & 2 deletions
@@ -119,15 +119,15 @@ async def get_data_pandas_df_async(
     ) -> pd.DataFrame:
         """
         Fetch query data from ServiceX matching `selection_query` and return it as
-        a pandas dataframe. The data is uniquely ordered (the same query will always
+        a pandas DataFrame. The data is uniquely ordered (the same query will always
         return the same order).
 
         Arguments:
             selection_query     The `qastle` string specifying the data to be queried
             title               Title reported to the ServiceX backend for status reporting
 
         Returns:
-            df                  The pandas dataframe
+            df                  The pandas DataFrame
 
         Exceptions:
             xxx                 If the data is not the correct shape (e.g. a flat,

servicex/utils.py

Lines changed: 2 additions & 2 deletions
@@ -350,7 +350,7 @@ def _init_tqdm(self):
     def _update_bar(
         self, bar: Optional[tqdm], total: Optional[int], num: int, failed: int
     ):
-        assert bar is not None, "Internal error - bar was not initalized"
+        assert bar is not None, "Internal error - bar was not initialized"
         if total is not None:
             if bar.total != total:
                 # There is a bug in the tqm library if running in a notebook
@@ -417,7 +417,7 @@ def clean_linq(q: str) -> str:
 
    Arguments
 
-        q           Strign containing the qastle code`
+        q           String containing the qastle code`
 
    Returns
 

tests/conftest.py

Lines changed: 5 additions & 5 deletions
@@ -48,9 +48,9 @@ def __init__(
         self,
         mocker: MockFixture,
         request_id: str,
-        mock_transform_status: Mock = None,
-        mock_query: Mock = None,
-        mock_transform_query_status: Mock = None,
+        mock_transform_status: Optional[Mock] = None,
+        mock_query: Optional[Mock] = None,
+        mock_transform_query_status: Optional[Mock] = None,
     ):
         self.request_id = request_id
         self._endpoint = "http://localhost:5000"
@@ -139,11 +139,11 @@ def access_called_with(self) -> Optional[Tuple[str, str]]:
 
 def build_cache_mock(
     mocker,
-    query_cache_return: str = None,
+    query_cache_return: Optional[str] = None,
     files: Optional[List[Tuple[str, Path]]] = None,
     in_memory: Any = None,
     make_in_memory_work: bool = False,
-    data_file_return: str = None,
+    data_file_return: Optional[str] = None,
     query_status_lookup_return: Optional[Dict[str, str]] = None,
 ) -> Cache:
     c = mocker.MagicMock(spec=Cache)
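The conftest changes are typing hygiene rather than behaviour: under PEP 484, a default of `None` does not implicitly make a parameter `Optional`, so strict type checkers reject annotations like `mock_query: Mock = None`. A small stand-alone illustration (not code from this repository):

    from typing import Optional
    from unittest.mock import Mock

    def build_stub(transform_status: Optional[Mock] = None) -> Mock:
        # Spelling out Optional[...] documents that None is an accepted default
        # and satisfies mypy's no-implicit-optional checking.
        return transform_status if transform_status is not None else Mock()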
