Skip to content

Commit ce602b1

Browse files
authored
Combine awkward and panads separately (#117)
Pull out how we do combining pandas and awkward - turns out to need more testing than we had been doing as there are some odd edge cases.
1 parent ad871af commit ce602b1

12 files changed

+469
-158
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"aexit",
2626
"aiohttp",
2727
"asyncio",
28+
"awks",
2829
"backends",
2930
"cacheme",
3031
"codecov",

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ The following lines will return a `pandas.DataFrame` containing all the jet pT's
5959
from servicex import ServiceX
6060
query = "(call ResultTTree (call Select (call SelectMany (call EventDataset (list 'localds:bogus')) (lambda (list e) (call (attr e 'Jets') 'AntiKt4EMTopoJets'))) (lambda (list j) (/ (call (attr j 'pt')) 1000.0))) (list 'JetPt') 'analysis' 'junk.root')"
6161
dataset = "mc15_13TeV:mc15_13TeV.361106.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zee.merge.DAOD_STDM3.e3601_s2576_s2132_r6630_r6264_p2363_tid05630052_00"
62-
ds = ServiceXDataset(dataset, 'xaod')
62+
ds = ServiceXDataset(dataset)
6363
r = ds.get_data_pandas_df(query)
6464
print(r)
6565
```
@@ -162,8 +162,9 @@ Everything is based around the `ServiceXDataset` object. Below is the documentat
162162
dataset Name of a dataset from which queries will be selected.
163163
backend_type The type of backend. Used only if we need to find an
164164
end-point. If we do not have a `servicex_adaptor` then this
165-
cannot be null. Possible types are `uproot`, `xaod`,
166-
and anything that finds a match in the `.servicex` file.
165+
will default to xaod, unless you have any endpoint listed
166+
in your servicex file. It will default to best match there,
167+
in that case.
167168
image Name of transformer image to use to transform the data
168169
max_workers Maximum number of transformers to run simultaneously on
169170
ServiceX.

servicex/config_default.yaml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
# Default settings for servicex. This will point you to a developer end-point, that
22
# you've setup on your own machine (usually using k8's port-forward command):
33

4-
api_endpoint:
5-
endpoint: http://localhost:5000
6-
# email: xxx
7-
# password: yyy
4+
api_endpoints:
5+
- endpoint: http://localhost:5000
6+
# email: xxx
7+
# password: yyy
88

9+
# These are default settings, depreciated, and should not be used.
10+
# They will be removed in the next version.
11+
api_endpoint:
912
minio_endpoint: localhost:9000
1013
# The username and password for accessing files generated by servicex.
1114
# NOTE:

servicex/data_conversions.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
from pathlib import Path
2-
from concurrent.futures import ThreadPoolExecutor
31
import asyncio
4-
from servicex.utils import ServiceXException
5-
from typing import Dict, Optional, Union
6-
2+
from concurrent.futures import ThreadPoolExecutor
3+
from pathlib import Path
4+
from typing import Dict, Iterable, Optional, Union
75
from awkward.array.chunked import ChunkedArray
6+
from awkward.array.table import Table
7+
8+
import pandas as pd
9+
import awkward as ak
10+
11+
from servicex.utils import ServiceXException
812

913
_conversion_pool = ThreadPoolExecutor(4)
1014

@@ -53,6 +57,22 @@ async def convert_to_awkward(self, file: Path, file_type: Optional[str] = None):
5357
raise ServiceXException(f'Conversion from {file_type} into an awkward array is not '
5458
'yet supported')
5559

60+
def combine_pandas(self, dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
61+
'''Combine many pandas dataframes into a single one, in order.
62+
63+
Args:
64+
dfs (Iterable[pd.DataFrame]): The list of dataframes
65+
'''
66+
return pd.concat(dfs)
67+
68+
def combine_awkward(self, awks: Iterable[Union[Table, ChunkedArray]]) -> Table:
69+
'''Combine many awkward arrays into a single one, in order.
70+
71+
Args:
72+
awks (Iterable[ChunkedArray]): The input list of awkward arrays
73+
'''
74+
return ak.concatenate(awks)
75+
5676
async def _convert_root_to_pandas(self, file: Path):
5777
'''
5878
Convert the contents of a ROOT file to pandas.
@@ -125,7 +145,9 @@ async def _convert_root_to_awkward(self, file: Path):
125145
Note:
126146
127147
- Work is done on a second thread.
128-
- Pandas is only imported if this is called.
148+
- Awkward is only imported if this is called.
149+
- A LazyArray is returned, so it isn't completely loaded into memory. That also means this
150+
will leak filehandles - as that has to be left open.
129151
130152
'''
131153
from numpy import ndarray
@@ -135,11 +157,8 @@ def do_the_work(file: Path) -> Dict[Union[str, bytes], Union[ndarray, JaggedArra
135157
import uproot
136158

137159
f_in = uproot.open(file)
138-
try:
139-
r = f_in[f_in.keys()[0]]
140-
return r.lazyarrays() # type: ignore
141-
finally:
142-
f_in._context.source.close()
160+
r = f_in[f_in.keys()[0]]
161+
return r.lazyarrays() # type: ignore
143162

144163
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
145164

servicex/servicex.py

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import asyncio
33
import functools
44
import logging
5-
from servicex.servicex_config import ServiceXConfigAdaptor
65
import time
76
from datetime import timedelta
87
from pathlib import Path
@@ -13,12 +12,14 @@
1312
import backoff
1413
from backoff import on_exception
1514

15+
from servicex.servicex_config import ServiceXConfigAdaptor
16+
1617
from .cache import Cache
1718
from .data_conversions import DataConverterAdaptor
1819
from .minio_adaptor import (MinioAdaptor, MinioAdaptorFactory,
1920
find_new_bucket_files)
20-
from .servicex_adaptor import (ServiceXAdaptor, servicex_adaptor_factory,
21-
transform_status_stream, trap_servicex_failures)
21+
from .servicex_adaptor import (ServiceXAdaptor, transform_status_stream,
22+
trap_servicex_failures)
2223
from .servicex_utils import _wrap_in_memory_sx_cache
2324
from .servicexabc import ServiceXABC
2425
from .utils import (ServiceXException, ServiceXFailedFileTransform,
@@ -55,8 +56,9 @@ def __init__(self,
5556
dataset Name of a dataset from which queries will be selected.
5657
backend_type The type of backend. Used only if we need to find an
5758
end-point. If we do not have a `servicex_adaptor` then this
58-
cannot be null. Possible types are `uproot`, `xaod`,
59-
and anything that finds a match in the `.servicex` file.
59+
will default to xaod, unless you have any endpoint listed
60+
in your servicex file. It will default to best match there,
61+
in that case.
6062
image Name of transformer image to use to transform the data
6163
max_workers Maximum number of transformers to run simultaneously on
6264
ServiceX.
@@ -94,10 +96,6 @@ def __init__(self,
9496
status_callback_factory,
9597
)
9698

97-
# Make sure the arguments are reasonable
98-
if backend_type is None and servicex_adaptor is None:
99-
raise ServiceXException('Specify backend_type or servicex_adaptor')
100-
10199
# Get the local settings
102100
config = config_adaptor if config_adaptor is not None \
103101
else ServiceXConfigAdaptor()
@@ -109,8 +107,8 @@ def __init__(self,
109107

110108
if not servicex_adaptor:
111109
# Given servicex adaptor is none, this should be ok. Fixes type checkers
112-
assert backend_type is not None
113-
servicex_adaptor = servicex_adaptor_factory(config.settings, backend_type)
110+
end_point, email, password = config.get_servicex_adaptor_config(backend_type)
111+
servicex_adaptor = ServiceXAdaptor(end_point, email, password)
114112
self._servicex_adaptor = servicex_adaptor
115113

116114
if not minio_adaptor:
@@ -142,18 +140,14 @@ async def get_data_parquet_async(self, selection_query: str) -> List[Path]:
142140
@functools.wraps(ServiceXABC.get_data_pandas_df_async, updated=())
143141
@_wrap_in_memory_sx_cache
144142
async def get_data_pandas_df_async(self, selection_query: str):
145-
import pandas as pd
146-
return pd.concat(await self._data_return(
143+
return self._converter.combine_pandas(await self._data_return(
147144
selection_query, lambda f: self._converter.convert_to_pandas(f)))
148145

149146
@functools.wraps(ServiceXABC.get_data_awkward_async, updated=())
150147
@_wrap_in_memory_sx_cache
151148
async def get_data_awkward_async(self, selection_query: str):
152-
import awkward
153-
all_data = await self._data_return(
154-
selection_query, lambda f: self._converter.convert_to_awkward(f))
155-
col_names = all_data[0].keys()
156-
return {c: awkward.concatenate([ar[c] for ar in all_data]) for c in col_names}
149+
return self._converter.combine_awkward(await self._data_return(
150+
selection_query, lambda f: self._converter.convert_to_awkward(f)))
157151

158152
async def _file_return(self, selection_query: str, data_format: str):
159153
'''

servicex/servicex_adaptor.py

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import logging
55

66
import aiohttp
7-
from confuse import ConfigView
87
from google.auth import jwt
98

109
from .utils import (
@@ -249,35 +248,3 @@ async def trap_servicex_failures(stream: AsyncIterator[TransformTuple]) \
249248
f'processed: {processed}).')
250249

251250
yield p
252-
253-
254-
def servicex_adaptor_factory(c: ConfigView, backend_type: str) -> ServiceXAdaptor:
255-
'''Given a configuration and the backend, find an appropriate configuration
256-
for us to grab and create a `servicex_adaptor`.
257-
258-
Args:
259-
c (ConfigView): The config information loaded form files.
260-
backend_type (str): The backend type we need to match
261-
262-
Returns:
263-
[ServiceXAdaptor]: A servicex adaptor.
264-
'''
265-
# Find a list of all endpoints.
266-
# It is an error if this is not specified somewhere.
267-
endpoints = c['api_endpoints']
268-
seen_types = []
269-
for ep in endpoints:
270-
if ep['type'].as_str_expanded() == backend_type:
271-
endpoint = ep['endpoint'].as_str_expanded()
272-
email = ep['email'].as_str_expanded() if 'email' in ep else None
273-
password = ep['password'].as_str_expanded() if 'password' in ep \
274-
else None
275-
276-
# We can default these to "None"
277-
return ServiceXAdaptor(endpoint, email, password)
278-
else:
279-
seen_types.append(ep['type'].as_str_expanded())
280-
281-
# If we are here, we found no matching type.
282-
raise ServiceXException(f'Unable to find type {backend_type} '
283-
f'in configuration. Saw: {", ".join(seen_types)}')

servicex/servicex_config.py

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from typing import Optional, cast
1+
import logging
2+
from typing import Optional, Tuple, cast
23

34
from confuse.core import ConfigView
45

@@ -70,11 +71,76 @@ def get_backend_info(self, backend_type: str, key: str) -> Optional[str]:
7071
def find_in_list(c, key) -> Optional[str]:
7172
if c.exists():
7273
for ep in c:
73-
if ep['type'].as_str() == backend_type:
74-
if key in ep:
75-
return ep[key].as_str_expanded()
74+
if ep['type'].exists():
75+
if ep['type'].as_str() == backend_type:
76+
if key in ep:
77+
return ep[key].as_str_expanded()
7678
return None
7779

7880
a = find_in_list(self._settings['api_endpoints'], key)
7981
a = find_in_list(self._settings['backend_types'], key) if a is None else a
8082
return a
83+
84+
def get_servicex_adaptor_config(self, backend_type: Optional[str] = None) -> \
85+
Tuple[str, Optional[str], Optional[str]]:
86+
'''Return the servicex (endpoint, username, email) from a given backend configuration.
87+
88+
Args:
89+
backend_type (str): The backend name (like `xaod`) which we hopefully can find in the
90+
`.servicex` file.
91+
92+
Returns:
93+
Tuple[str, str, str]: The tuple of info to create a `ServiceXAdaptor`: end point,
94+
email, and password.
95+
'''
96+
# Find a list of all endpoints.
97+
# It is an error if this is not specified somewhere.
98+
endpoints = self._settings['api_endpoints']
99+
100+
def extract_info(ep) -> Tuple[str, Optional[str], Optional[str]]:
101+
endpoint = ep['endpoint'].as_str_expanded()
102+
email = ep['email'].as_str_expanded() if 'email' in ep else None
103+
password = ep['password'].as_str_expanded() if 'password' in ep \
104+
else None
105+
106+
# We can default these to "None"
107+
return (endpoint, email, password) # type: ignore
108+
109+
# If we have a good name, look for exact match
110+
if backend_type is not None:
111+
for ep in endpoints:
112+
if ep['type'].exists() and ep['type'].as_str_expanded() == backend_type:
113+
return extract_info(ep)
114+
115+
# See if one is unlabeled.
116+
log = logging.getLogger(__name__)
117+
for ep in endpoints:
118+
if not ep['type'].exists():
119+
if backend_type is None:
120+
log.warning('No backend type requested, '
121+
f'using {ep["endpoint"].as_str_expanded()} - please be explicit '
122+
'in the ServiceXDataset constructor')
123+
else:
124+
log.warning(f"No '{backend_type}' backend type found, "
125+
f'using {ep["endpoint"].as_str_expanded()} - please add to '
126+
'the .servicex file')
127+
return extract_info(ep)
128+
129+
if backend_type is not None:
130+
# They have a labeled backend, and all the end-points are labeled. So that means
131+
# there really is not match. So throw!
132+
seen_types = [str(ep['type'].as_str_expanded()) for ep in endpoints
133+
if ep['type'].exists()]
134+
raise ServiceXException(f'Unable to find type {backend_type} '
135+
f'in configuration. Saw: {", ".join(seen_types)}')
136+
137+
# Nope - now we are going to have to just use the first one there.
138+
for ep in endpoints:
139+
log.warning('No backend type requested, '
140+
f'using {ep["endpoint"].as_str_expanded()} - please be explicit '
141+
'in the ServiceXDataset constructor')
142+
return extract_info(ep)
143+
144+
# If we are here - then... it just isn't going to work!
145+
raise ServiceXException('Not even a default set of configurations are here! Bad install '
146+
' of the servicex package!')

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ def good_pandas_file_data(mocker):
176176

177177
converter = asyncmock.MagicMock(spec=DataConverterAdaptor)
178178
converter.convert_to_pandas.return_value = pd.DataFrame({'JetPt': [0, 1, 2, 3, 4, 5]})
179+
converter.combine_pandas.return_value = converter.convert_to_pandas.return_value
179180

180181
return converter
181182

@@ -186,6 +187,7 @@ def good_awkward_file_data(mocker):
186187

187188
converter = asyncmock.MagicMock(spec=DataConverterAdaptor)
188189
converter.convert_to_awkward.return_value = {'JetPt': awk.fromiter([0, 1, 2, 3, 4, 5])}
190+
converter.combine_awkward.return_value = converter.convert_to_awkward.return_value
189191

190192
return converter
191193

0 commit comments

Comments
 (0)