Commit ad871af

Default return type based on backend-type (#110)
- Allow for a backend type, used to look things up in the `.servicex` file.
1 parent aaba3a9 · commit ad871af

13 files changed: +444 -75 lines
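
For orientation, a minimal sketch of what the change means for a caller, assuming the frontend exposes `ServiceX` at the package top level and accepts `backend_type` as a constructor argument (as the `__init__` body below suggests); the dataset name and selection string are placeholders, and a `type: uproot` endpoint is assumed to be configured in `.servicex`:

```python
import asyncio
from servicex import ServiceX  # assumed top-level export

async def main():
    # Hypothetical dataset; backend_type picks the matching .servicex entry.
    ds = ServiceX('mc15_13TeV:some.rucio.dataset', backend_type='uproot')

    # An uproot backend returns parquet files; with this change the awkward
    # conversion defaults to parquet instead of assuming ROOT.
    data = await ds.get_data_awkward_async('<qastle selection string>')
    print(list(data.keys()))

asyncio.run(main())
```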

.vscode/settings.json

Lines changed: 3 additions & 4 deletions
@@ -25,6 +25,7 @@
         "aexit",
         "aiohttp",
         "asyncio",
+        "backends",
         "cacheme",
         "codecov",
         "dcache",
@@ -54,6 +55,7 @@
         "prereleased",
         "protomolecule",
         "ptetaphi",
+        "pyarrow",
         "pypa",
         "pypi",
         "pytest",
@@ -75,8 +77,5 @@
         "xaod",
         "xrootd"
     ],
-    "python.analysis.typeCheckingMode": "basic",
-    "python.testing.pytestArgs": [
-        "--no-cov"
-    ]
+    "python.analysis.typeCheckingMode": "basic"
 }

README.md

Lines changed: 2 additions & 1 deletion
@@ -39,7 +39,7 @@ Create a `.servicex` file, in the `yaml` format, in the appropriate place for yo

 ```yaml
 api_endpoints:
-  - endpoint: <your-endpoint>
+  - endpoint: <your-endpoint-url>
     email: <api-email>
     password: <api-password>
     type: xaod
@@ -105,6 +105,7 @@ The file can contain an `api_endpoint` as mentioned above. In addition the other
 - Linux: `cache_path: "/home/servicex-cache"`

 - `minio_endpoint`, `minio_username`, `minio_password` - these are only interesting if you are using a pre-RC2 release of `servicex` - when the `minio` information wasn't part of the API exchange. This feature is deprecated and will be removed around the time `servicex` moves to RC3.
+- `backend_types` - a list of yaml dictionaries holding defaults for each backend type. Out of the box only `return_data` is set: `root` for `xaod` backends and `parquet` for `uproot` backends. This is what lets `servicex` convert the returned files to a `pandas.DataFrame` or an `awkward` array when the user asks for them.

 All strings are expanded using python's [os.path.expand](https://docs.python.org/3/library/os.path.html#os.path.expandvars) method - so `$NAME` and `${NAME}` will work to expand existing environment variables.
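
A purely illustrative sketch of the lookup the new `backend_types` option enables. It mirrors the defaults documented above once parsed from yaml; it is not the actual `ServiceXConfigAdaptor` code, which lives in a file not shown in this excerpt:

```python
from typing import Optional

# Parsed form of the backend_types / default_return_data settings described above.
settings = {
    'default_return_data': 'parquet',
    'backend_types': [
        {'type': 'xaod', 'return_data': 'root'},
        {'type': 'uproot', 'return_data': 'parquet'},
    ],
}


def default_return_data(backend_type: Optional[str]) -> str:
    '''Return the file format a backend hands back, falling back to the global default.'''
    for entry in settings['backend_types']:
        if entry['type'] == backend_type:
            return entry['return_data']
    return settings['default_return_data']


assert default_return_data('xaod') == 'root'
assert default_return_data('uproot') == 'parquet'
assert default_return_data(None) == 'parquet'  # unknown backend -> default_return_data
```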

servicex/config_default.yaml

Lines changed: 13 additions & 0 deletions
@@ -23,3 +23,16 @@ cache_path: /tmp/servicex
 # This is a dummy value, here only to make sure that unit testing
 # works properly before package release.
 testing_value: 10
+
+# If we can't figure out what backend the user is going to use, we
+# return this sort of file. Parquet for the uproot backend, and root for the
+# xaod backend.
+default_return_data: parquet
+
+# Defaults for the various types of servicex backends that we might deal with.
+# Easy enough to add a new one here...
+backend_types:
+  - type: xaod
+    return_data: root
+  - type: uproot
+    return_data: parquet
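
How these defaults reach the converter, sketched from the calls visible in the `servicex.py` changes below. `ServiceXConfigAdaptor` itself is defined in `servicex/servicex_config.py`, which is not part of this excerpt, so treat the exact lookup behaviour as an assumption:

```python
from servicex.servicex_config import ServiceXConfigAdaptor
from servicex.data_conversions import DataConverterAdaptor

config = ServiceXConfigAdaptor()                             # reads .servicex plus these defaults
file_type = config.get_default_returned_datatype('uproot')  # expected to be 'parquet' per backend_types
converter = DataConverterAdaptor(file_type)                  # conversions now assume parquet input
```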

servicex/data_conversions.py

Lines changed: 142 additions & 44 deletions
The module-level helpers `_convert_root_to_pandas` and `_convert_root_to_awkward` are removed and their bodies become methods on a new `DataConverterAdaptor` class, which also gains parquet converters and a configurable default file type; the conversion thread pool grows from 2 to 4 workers. The file now reads:

from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import asyncio
from servicex.utils import ServiceXException
from typing import Dict, Optional, Union

from awkward.array.chunked import ChunkedArray

_conversion_pool = ThreadPoolExecutor(4)


class DataConverterAdaptor:
    '''Methods to convert from one type of data to the other.
    '''
    def __init__(self, default_file_type: str):
        '''Create a data converter adaptor. By default it will do the
        conversion as requested.

        Args:
            default_file_type (str): The default file type (`parquet` or `root`)
        '''
        self._default_file_type = default_file_type

    async def convert_to_pandas(self, file: Path, file_type: Optional[str] = None):
        '''Convert to a pandas dataframe from data stored in a file of a particular file_type

        Args:
            file (Path): Path to the file
            file_type (str): What the file contains (root, parquet, etc)
        '''
        file_type = file_type if file_type is not None else self._default_file_type
        if file_type == 'root':
            return await self._convert_root_to_pandas(file)
        elif file_type == 'parquet':
            return await self._convert_parquet_to_pandas(file)
        else:
            raise ServiceXException(f'Conversion from {file_type} into a pandas DF is not '
                                    'yet supported')

    async def convert_to_awkward(self, file: Path, file_type: Optional[str] = None):
        '''Convert to an awkward data array from data stored in a file of a particular file_type

        Args:
            file (Path): Path to the file
            file_type (str): What the file contains (root, parquet, etc)
        '''
        file_type = file_type if file_type is not None else self._default_file_type
        if file_type == 'root':
            return await self._convert_root_to_awkward(file)
        elif file_type == 'parquet':
            return await self._convert_parquet_to_awkward(file)
        else:
            raise ServiceXException(f'Conversion from {file_type} into an awkward array is not '
                                    'yet supported')

    async def _convert_root_to_pandas(self, file: Path):
        '''
        Convert the contents of a ROOT file to pandas.

        Arguments:

            file        A `Path` to the file containing the data

        Returns:

            DataFrame   A pandas dataframe

        Note:

            - Work is done on a second thread.
            - Pandas is only imported if this is called.

        '''
        from pandas import DataFrame

        def do_the_work(file: Path) -> DataFrame:
            import uproot

            f_in = uproot.open(file)
            try:
                r = f_in[f_in.keys()[0]]
                return r.pandas.df()  # type: ignore
            finally:
                f_in._context.source.close()

        return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))

    async def _convert_parquet_to_pandas(self, file: Path):
        '''
        Convert the contents of a parquet file to pandas.

        Arguments:

            file        A `Path` to the file containing the data

        Returns:

            DataFrame   A pandas dataframe

        Note:

            - Work is done on a second thread.
            - Pandas is only imported if this is called.

        '''
        import pandas as pd

        def do_the_work(file: Path) -> pd.DataFrame:
            return pd.read_parquet(str(file))

        return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))

    async def _convert_root_to_awkward(self, file: Path):
        '''
        Convert the contents of a ROOT file to an awkward dictionary.

        Arguments:

            file        A `Path` to the file containing the data

        Returns:

            A dictionary of lazy awkward arrays, keyed by column name

        Note:

            - Work is done on a second thread.
            - numpy and awkward are only imported if this is called.

        '''
        from numpy import ndarray
        from awkward import JaggedArray

        def do_the_work(file: Path) -> Dict[Union[str, bytes], Union[ndarray, JaggedArray]]:
            import uproot

            f_in = uproot.open(file)
            try:
                r = f_in[f_in.keys()[0]]
                return r.lazyarrays()  # type: ignore
            finally:
                f_in._context.source.close()

        return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))

    async def _convert_parquet_to_awkward(self, file: Path):
        '''
        Convert the contents of a parquet file to an awkward dictionary.

        Arguments:

            file        A `Path` to the file containing the data

        Returns:

            An awkward `ChunkedArray` (or a dictionary of them)

        Note:

            - Work is done on a second thread.
            - awkward is only imported if this is called.

        '''
        import awkward as ak

        def do_the_work(file: Path) -> \
                Union[Dict[Union[str, bytes], ak.ChunkedArray], ChunkedArray]:
            # TODO: When we move to awkward1, make sure this becomes lazy
            return ak.fromparquet(str(file))

        return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
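
A short usage sketch for the new adaptor; the file paths are hypothetical, and the conversions are awaited because the work is pushed onto the module's thread pool:

```python
import asyncio
from pathlib import Path
from servicex.data_conversions import DataConverterAdaptor

async def main():
    converter = DataConverterAdaptor(default_file_type='parquet')

    # Uses the default file type given above.
    awk_data = await converter.convert_to_awkward(Path('data/servicex-output-1.parquet'))

    # Or state the type explicitly, e.g. for a ROOT file from an xaod backend.
    df = await converter.convert_to_pandas(Path('data/servicex-output-2.root'), file_type='root')
    print(len(df), len(awk_data))

asyncio.run(main())
```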

servicex/servicex.py

Lines changed: 18 additions & 10 deletions
@@ -2,6 +2,7 @@
 import asyncio
 import functools
 import logging
+from servicex.servicex_config import ServiceXConfigAdaptor
 import time
 from datetime import timedelta
 from pathlib import Path
@@ -11,11 +12,9 @@
 import aiohttp
 import backoff
 from backoff import on_exception
-from confuse import ConfigView

 from .cache import Cache
-from .ConfigSettings import ConfigSettings
-from .data_conversions import _convert_root_to_awkward, _convert_root_to_pandas
+from .data_conversions import DataConverterAdaptor
 from .minio_adaptor import (MinioAdaptor, MinioAdaptorFactory,
                             find_new_bucket_files)
 from .servicex_adaptor import (ServiceXAdaptor, servicex_adaptor_factory,
@@ -46,7 +45,8 @@ def __init__(self,
                  status_callback_factory: Optional[StatusUpdateFactory] = _run_default_wrapper,
                  local_log: log_adaptor = None,
                  session_generator: Callable[[], Awaitable[aiohttp.ClientSession]] = None,
-                 config_adaptor: ConfigView = None):
+                 config_adaptor: Optional[ServiceXConfigAdaptor] = None,
+                 data_convert_adaptor: Optional[DataConverterAdaptor] = None):
         '''
         Create and configure a ServiceX object for a dataset.

@@ -76,6 +76,9 @@ def __init__(self,
                                  `servicex` queries is used.
             config_adaptor       Control how configuration options are read from the
                                  `.servicex` file.
+            data_convert_adaptor Manages conversions between root and parquet and `pandas`
+                                 and `awkward`, including default settings for expected
+                                 datatypes from the backend.

         Notes:

@@ -97,21 +100,21 @@ def __init__(self,

         # Get the local settings
         config = config_adaptor if config_adaptor is not None \
-            else ConfigSettings('servicex', 'servicex')
+            else ServiceXConfigAdaptor()

         # Establish the cache that will store all our queries
-        self._cache = Cache(get_configured_cache_path(config)) \
+        self._cache = Cache(get_configured_cache_path(config.settings)) \
             if cache_adaptor is None \
             else cache_adaptor

         if not servicex_adaptor:
             # Given servicex adaptor is none, this should be ok. Fixes type checkers
             assert backend_type is not None
-            servicex_adaptor = servicex_adaptor_factory(config, backend_type)
+            servicex_adaptor = servicex_adaptor_factory(config.settings, backend_type)
         self._servicex_adaptor = servicex_adaptor

         if not minio_adaptor:
-            self._minio_adaptor = MinioAdaptorFactory(config)
+            self._minio_adaptor = MinioAdaptorFactory(config.settings)
         else:
             if isinstance(minio_adaptor, MinioAdaptor):
                 self._minio_adaptor = MinioAdaptorFactory(always_return=minio_adaptor)
@@ -123,6 +126,9 @@ def __init__(self,
         self._session_generator = session_generator if session_generator is not None \
             else default_client_session

+        self._converter = data_convert_adaptor if data_convert_adaptor is not None \
+            else DataConverterAdaptor(config.get_default_returned_datatype(backend_type))
+
     @functools.wraps(ServiceXABC.get_data_rootfiles_async, updated=())
     @_wrap_in_memory_sx_cache
     async def get_data_rootfiles_async(self, selection_query: str) -> List[Path]:
@@ -137,13 +143,15 @@ async def get_data_parquet_async(self, selection_query: str) -> List[Path]:
     @_wrap_in_memory_sx_cache
     async def get_data_pandas_df_async(self, selection_query: str):
         import pandas as pd
-        return pd.concat(await self._data_return(selection_query, _convert_root_to_pandas))
+        return pd.concat(await self._data_return(
+            selection_query, lambda f: self._converter.convert_to_pandas(f)))

     @functools.wraps(ServiceXABC.get_data_awkward_async, updated=())
     @_wrap_in_memory_sx_cache
     async def get_data_awkward_async(self, selection_query: str):
         import awkward
-        all_data = await self._data_return(selection_query, _convert_root_to_awkward)
+        all_data = await self._data_return(
+            selection_query, lambda f: self._converter.convert_to_awkward(f))
         col_names = all_data[0].keys()
         return {c: awkward.concatenate([ar[c] for ar in all_data]) for c in col_names}
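
Finally, a sketch of the two new constructor hooks; the dataset name is hypothetical and `ServiceX` is assumed to be the package's top-level class. Passing the adaptors explicitly bypasses what would otherwise be built from the `.servicex` configuration:

```python
from servicex import ServiceX
from servicex.data_conversions import DataConverterAdaptor
from servicex.servicex_config import ServiceXConfigAdaptor

ds = ServiceX(
    'mc15_13TeV:some.rucio.dataset',                     # hypothetical dataset
    backend_type='xaod',
    config_adaptor=ServiceXConfigAdaptor(),
    data_convert_adaptor=DataConverterAdaptor('root'),   # xaod backends return ROOT files
)
```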
