Skip to content

Commit 368d441

Browse files
committed
Merge branch 'master' into oauth
2 parents e35b095 + 26104d9 commit 368d441

15 files changed

+891
-202
lines changed

.vscode/settings.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
"aexit",
2727
"aiohttp",
2828
"asyncio",
29+
"awks",
30+
"backends",
2931
"cacheme",
3032
"codecov",
3133
"dcache",
@@ -55,6 +57,7 @@
5557
"prereleased",
5658
"protomolecule",
5759
"ptetaphi",
60+
"pyarrow",
5861
"pypa",
5962
"pypi",
6063
"pytest",
@@ -76,8 +79,5 @@
7679
"xaod",
7780
"xrootd"
7881
],
79-
"python.analysis.typeCheckingMode": "basic",
80-
"python.testing.pytestArgs": [
81-
"--no-cov"
82-
]
82+
"python.analysis.typeCheckingMode": "basic"
8383
}

README.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,9 @@ The `servicex` library searches for configuration information in several locatio
3535

3636
If no endpoint is specified, then the library defaults to the developer endpoint, which is `http://localhost:5000` for the web-service API, and `localhost:9000` for the `minio` endpoint. No passwords are required.
3737

38-
Create a `.servicex` file, in the `yaml` format, in the appropriate place for your work that contains the following:
38+
Create a `.servicex` file, in the `yaml` format, in the appropriate place for your work that contains the following (for the `xaod` backend; use `uproot` for the uproot backend):
3939

4040
```yaml
41-
<<<<<<< HEAD
4241
api_endpoints:
4342
- endpoint: <your-endpoint>
4443
token: <api-token>
@@ -59,7 +58,7 @@ The following lines will return a `pandas.DataFrame` containing all the jet pT's
5958
from servicex import ServiceX
6059
query = "(call ResultTTree (call Select (call SelectMany (call EventDataset (list 'localds:bogus')) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (/ (call (attr j 'pt')) 1000.0))) (list 'JetPt') 'analysis' 'junk.root')"
6160
dataset = "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00"
62-
ds = ServiceXDataset(dataset, 'xaod')
61+
ds = ServiceXDataset(dataset)
6362
r = ds.get_data_pandas_df(query)
6463
print(r)
6564
```
@@ -105,6 +104,7 @@ The file can contain an `api_endpoint` as mentioned above. In addition the other
105104
- Linux: `cache_path: "/home/servicex-cache"`
106105

107106
- `minio_endpoint`, `minio_username`, `minio_password` - these are only interesting if you are using a pre-RC2 release of `servicex` - when the `minio` information wasn't part of the API exchange. This feature is depreciated and will be removed around the time `servicex` moves to RC3.
107+
- `backend_types` - a list of yaml dictionaries that contains some defaults for the backends. By default only the `return_data` is there, which for `xaod` is `root` and `uproot` is `parquet`. Allows `servicex` to convert to `pandas.DataFrame` or `awkward` if requested by the user.
108108

109109
All strings are expanded using python's [os.path.expand](https://docs.python.org/3/library/os.path.html#os.path.expandvars) method - so `$NAME` and `${NAME}` will work to expand existing environment variables.
110110

@@ -161,8 +161,9 @@ Everything is based around the `ServiceXDataset` object. Below is the documentat
161161
dataset Name of a dataset from which queries will be selected.
162162
backend_type The type of backend. Used only if we need to find an
163163
end-point. If we do not have a `servicex_adaptor` then this
164-
cannot be null. Possible types are `uproot`, `xaod`,
165-
and anything that finds a match in the `.servicex` file.
164+
will default to xaod, unless you have any endpoint listed
165+
in your servicex file. It will default to best match there,
166+
in that case.
166167
image Name of transformer image to use to transform the data
167168
max_workers Maximum number of transformers to run simultaneously on
168169
ServiceX.

servicex/config_default.yaml

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
# Default settings for servicex. This will point you to a developer end-point, that
22
# you've setup on your own machine (usually using k8's port-forward command):
33

4-
api_endpoint:
5-
endpoint: http://localhost:5000
6-
# token: xxx
4+
api_endpoints:
5+
- endpoint: http://localhost:5000
6+
# token: xxx
77

8+
# These are default settings, depreciated, and should not be used.
9+
# They will be removed in the next version.
10+
api_endpoint:
811
minio_endpoint: localhost:9000
912
# The username and password for accessing files generated by servicex.
1013
# NOTE:
@@ -22,3 +25,16 @@ cache_path: /tmp/servicex
2225
# This is a dummy value, here only to make sure that unit testing
2326
# works properly before package release.
2427
testing_value: 10
28+
29+
# If we can't figure out what backend the user is going to use, we
30+
# return this sort of file. Parquet for the uproot backend, and root for the
31+
# xaod backend.
32+
default_return_data: parquet
33+
34+
# Defaults for the various types of servicex backends that we might deal with.
35+
# Easy enough to add a new one here...
36+
backend_types:
37+
- type: xaod
38+
return_data: root
39+
- type: uproot
40+
return_data: parquet

servicex/data_conversions.py

Lines changed: 161 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,190 @@
1-
from pathlib import Path
2-
from concurrent.futures import ThreadPoolExecutor
31
import asyncio
4-
from typing import Dict, Union
2+
from concurrent.futures import ThreadPoolExecutor
3+
from pathlib import Path
4+
from typing import Dict, Iterable, Optional, Union
5+
from awkward.array.chunked import ChunkedArray
6+
from awkward.array.table import Table
7+
8+
import pandas as pd
9+
import awkward as ak
510

6-
_conversion_pool = ThreadPoolExecutor(2)
11+
from servicex.utils import ServiceXException
712

13+
_conversion_pool = ThreadPoolExecutor(4)
814

9-
async def _convert_root_to_pandas(file: Path):
15+
16+
class DataConverterAdaptor:
17+
'''Methods to convert from one type of data to the other.
1018
'''
11-
Convert the contents of a ROOT file to pandas.
19+
def __init__(self, default_file_type: str):
20+
'''Create a data converter adaptor. By default it will do the
21+
conversation as requested.
1222
13-
Arguments:
23+
Args:
24+
default_file_type (str): The default file type (`parquet` or `root`)
25+
'''
26+
self._default_file_type = default_file_type
1427

15-
file A `Path` to the file containing the pandas data
28+
async def convert_to_pandas(self, file: Path, file_type: Optional[str] = None):
29+
'''Convert to a pandas dataframe from data stored in a file of a particular file_type
1630
17-
Returns:
31+
Args:
32+
file (Path): Path to the file
33+
file_type (str): What the file contains (root, parquet, etc)
34+
'''
35+
file_type = file_type if file_type is not None else self._default_file_type
36+
if file_type == 'root':
37+
return await self._convert_root_to_pandas(file)
38+
elif file_type == 'parquet':
39+
return await self._convert_parquet_to_pandas(file)
40+
else:
41+
raise ServiceXException(f'Conversion from {file_type} into an pandas DF is not '
42+
'yet supported')
1843

19-
DataFrame A pandas dataframe
44+
async def convert_to_awkward(self, file: Path, file_type: Optional[str] = None):
45+
'''Convert to an awkward data array from data stored in a file of a particular file_type
2046
21-
Note:
47+
Args:
48+
file (Path): Path to the file
49+
file_type (str): What the file contains (root, parquet, etc)
50+
'''
51+
file_type = file_type if file_type is not None else self._default_file_type
52+
if file_type == 'root':
53+
return await self._convert_root_to_awkward(file)
54+
elif file_type == 'parquet':
55+
return await self._convert_parquet_to_awkward(file)
56+
else:
57+
raise ServiceXException(f'Conversion from {file_type} into an awkward array is not '
58+
'yet supported')
2259

23-
- Work is done on a second thread.
24-
- Pandas is only imported if this is called.
60+
def combine_pandas(self, dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
61+
'''Combine many pandas dataframes into a single one, in order.
2562
26-
'''
27-
from pandas import DataFrame
63+
Args:
64+
dfs (Iterable[pd.DataFrame]): The list of dataframes
65+
'''
66+
return pd.concat(dfs)
2867

29-
def do_the_work(file: Path) -> DataFrame:
30-
import uproot
68+
def combine_awkward(self, awks: Iterable[Union[Table, ChunkedArray]]) -> Table:
69+
'''Combine many awkward arrays into a single one, in order.
3170
32-
f_in = uproot.open(file)
33-
try:
34-
r = f_in[f_in.keys()[0]]
35-
return r.pandas.df() # type: ignore
36-
finally:
37-
f_in._context.source.close()
71+
Args:
72+
awks (Iterable[ChunkedArray]): The input list of awkward arrays
73+
'''
74+
return ak.concatenate(awks)
3875

39-
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
76+
async def _convert_root_to_pandas(self, file: Path):
77+
'''
78+
Convert the contents of a ROOT file to pandas.
4079
80+
Arguments:
4181
42-
async def _convert_root_to_awkward(file: Path):
43-
'''
44-
Convert the contents of a ROOT file to an awkward dictionary.
82+
file A `Path` to the file containing the pandas data
4583
46-
Arguments:
84+
Returns:
4785
48-
file A `Path` to the file containing the pandas data
86+
DataFrame A pandas dataframe
4987
50-
Returns:
88+
Note:
5189
52-
DataFrame A pandas dataframe
90+
- Work is done on a second thread.
91+
- Pandas is only imported if this is called.
5392
54-
Note:
93+
'''
94+
from pandas import DataFrame
5595

56-
- Work is done on a second thread.
57-
- Pandas is only imported if this is called.
96+
def do_the_work(file: Path) -> DataFrame:
97+
import uproot
5898

59-
'''
60-
from numpy import ndarray
61-
from awkward import JaggedArray
99+
f_in = uproot.open(file)
100+
try:
101+
r = f_in[f_in.keys()[0]]
102+
return r.pandas.df() # type: ignore
103+
finally:
104+
f_in._context.source.close()
105+
106+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
107+
108+
async def _convert_parquet_to_pandas(self, file: Path):
109+
'''
110+
Convert the contents of a parquet file to pandas.
111+
112+
Arguments:
113+
114+
file A `Path` to the file containing the pandas data
115+
116+
Returns:
117+
118+
DataFrame A pandas dataframe
119+
120+
Note:
121+
122+
- Work is done on a second thread.
123+
- Pandas is only imported if this is called.
124+
125+
'''
126+
import pandas as pd
127+
128+
def do_the_work(file: Path) -> pd.DataFrame:
129+
return pd.read_parquet(str(file))
62130

63-
def do_the_work(file: Path) -> Dict[bytes, Union[ndarray, JaggedArray]]:
64-
import uproot
131+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
65132

66-
f_in = uproot.open(file)
67-
try:
133+
async def _convert_root_to_awkward(self, file: Path):
134+
'''
135+
Convert the contents of a ROOT file to an awkward dictionary.
136+
137+
Arguments:
138+
139+
file A `Path` to the file containing the pandas data
140+
141+
Returns:
142+
143+
DataFrame A pandas dataframe
144+
145+
Note:
146+
147+
- Work is done on a second thread.
148+
- Awkward is only imported if this is called.
149+
- A LazyArray is returned, so it isn't completely loaded into memory. That also means
150+
this will leak filehandles - as that has to be left open.
151+
152+
'''
153+
from numpy import ndarray
154+
from awkward import JaggedArray
155+
156+
def do_the_work(file: Path) -> Dict[Union[str, bytes], Union[ndarray, JaggedArray]]:
157+
import uproot
158+
159+
f_in = uproot.open(file)
68160
r = f_in[f_in.keys()[0]]
69161
return r.lazyarrays() # type: ignore
70-
finally:
71-
f_in._context.source.close()
72162

73-
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
163+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
164+
165+
async def _convert_parquet_to_awkward(self, file: Path):
166+
'''
167+
Convert the contents of a parquet file to an awkward dictionary.
168+
169+
Arguments:
170+
171+
file A `Path` to the file containing the pandas data
172+
173+
Returns:
174+
175+
DataFrame A pandas dataframe
176+
177+
Note:
178+
179+
- Work is done on a second thread.
180+
- Pandas is only imported if this is called.
181+
182+
'''
183+
import awkward as ak
184+
185+
def do_the_work(file: Path) -> \
186+
Union[Dict[Union[str, bytes], ak.ChunkedArray], ChunkedArray]:
187+
# TODO: When we move to awkward1, make sure this becomes lazy
188+
return ak.fromparquet(str(file))
189+
190+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))

0 commit comments

Comments
 (0)