Skip to content

Commit 6108a5b

Browse files
committed
Combine awkward and pandas separately
Not 100% sure this is going to work, but let's see! Fixes #116
1 parent cdad05f commit 6108a5b

File tree

6 files changed

+104
-20
lines changed

6 files changed

+104
-20
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"aexit",
2626
"aiohttp",
2727
"asyncio",
28+
"awks",
2829
"backends",
2930
"cacheme",
3031
"codecov",

servicex/data_conversions.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
from pathlib import Path
2-
from concurrent.futures import ThreadPoolExecutor
31
import asyncio
4-
from servicex.utils import ServiceXException
5-
from typing import Dict, Optional, Union
6-
2+
from concurrent.futures import ThreadPoolExecutor
3+
from pathlib import Path
4+
from typing import Dict, Iterable, Optional, Union
75
from awkward.array.chunked import ChunkedArray
6+
from awkward.array.table import Table
7+
8+
import pandas as pd
9+
import awkward as ak
10+
11+
from servicex.utils import ServiceXException
812

913
_conversion_pool = ThreadPoolExecutor(4)
1014

@@ -53,6 +57,22 @@ async def convert_to_awkward(self, file: Path, file_type: Optional[str] = None):
5357
raise ServiceXException(f'Conversion from {file_type} into an awkward array is not '
5458
'yet supported')
5559

60+
def combine_pandas(self, dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
61+
'''Combine many pandas dataframes into a single one, in order.
62+
63+
Args:
64+
dfs (Iterable[pd.DataFrame]): The list of dataframes
65+
'''
66+
return pd.concat(dfs)
67+
68+
def combine_awkward(self, awks: Iterable[Union[Table, ChunkedArray]]) -> Table:
69+
'''Combine many awkward arrays into a single one, in order.
70+
71+
Args:
72+
awks (Iterable[ChunkedArray]): The input list of awkward arrays
73+
'''
74+
return ak.concatenate(awks)
75+
5676
async def _convert_root_to_pandas(self, file: Path):
5777
'''
5878
Convert the contents of a ROOT file to pandas.

servicex/servicex.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,14 @@ async def get_data_parquet_async(self, selection_query: str) -> List[Path]:
140140
@functools.wraps(ServiceXABC.get_data_pandas_df_async, updated=())
141141
@_wrap_in_memory_sx_cache
142142
async def get_data_pandas_df_async(self, selection_query: str):
143-
import pandas as pd
144-
return pd.concat(await self._data_return(
143+
return self._converter.combine_pandas(await self._data_return(
145144
selection_query, lambda f: self._converter.convert_to_pandas(f)))
146145

147146
@functools.wraps(ServiceXABC.get_data_awkward_async, updated=())
148147
@_wrap_in_memory_sx_cache
149148
async def get_data_awkward_async(self, selection_query: str):
150-
import awkward
151-
all_data = await self._data_return(
152-
selection_query, lambda f: self._converter.convert_to_awkward(f))
153-
col_names = all_data[0].keys()
154-
return {c: awkward.concatenate([ar[c] for ar in all_data]) for c in col_names}
149+
return self._converter.combine_awkward(await self._data_return(
150+
selection_query, lambda f: self._converter.convert_to_awkward(f)))
155151

156152
async def _file_return(self, selection_query: str, data_format: str):
157153
'''

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ def good_pandas_file_data(mocker):
176176

177177
converter = asyncmock.MagicMock(spec=DataConverterAdaptor)
178178
converter.convert_to_pandas.return_value = pd.DataFrame({'JetPt': [0, 1, 2, 3, 4, 5]})
179+
converter.combine_pandas.return_value = converter.convert_to_pandas.return_value
179180

180181
return converter
181182

@@ -186,6 +187,7 @@ def good_awkward_file_data(mocker):
186187

187188
converter = asyncmock.MagicMock(spec=DataConverterAdaptor)
188189
converter.convert_to_awkward.return_value = {'JetPt': awk.fromiter([0, 1, 2, 3, 4, 5])}
190+
converter.combine_awkward.return_value = converter.convert_to_awkward.return_value
189191

190192
return converter
191193

tests/test_data_conversions.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,63 @@ async def test_to_awkward_fail(good_root_file_path):
5454
async def test_to_panads_fail(good_root_file_path):
5555
with pytest.raises(ServiceXException):
5656
await DataConverterAdaptor('root').convert_to_pandas(good_root_file_path, 'notreally')
57+
58+
59+
def test_combine_pandas_from_root(good_root_file_path):
60+
'Load a dataframe from root files and make sure that they work when we ask them to combine'
61+
def load_df():
62+
import uproot
63+
with uproot.open(good_root_file_path) as f_in:
64+
df = f_in[f_in.keys()[0]].pandas.df() # type: ignore
65+
return df
66+
67+
df1 = load_df()
68+
df2 = load_df()
69+
70+
combined = DataConverterAdaptor('root').combine_pandas([df1, df2])
71+
72+
assert len(combined) == len(df1) + len(df2)
73+
74+
75+
def test_combine_pandas_from_parquet(good_uproot_file_path):
76+
'Load a dataframe from a parquet file and make sure they work when we ask them to combine'
77+
def load_df():
78+
import pandas as pd
79+
return pd.read_parquet(good_uproot_file_path)
80+
81+
df1 = load_df()
82+
df2 = load_df()
83+
84+
combined = DataConverterAdaptor('root').combine_pandas([df1, df2])
85+
86+
assert len(combined) == len(df1) + len(df2)
87+
88+
89+
def test_combine_awkward_from_root(good_root_file_path):
90+
'Load a dataframe from root files and make sure that they work when we ask them to combine'
91+
def load_df():
92+
import uproot
93+
with uproot.open(good_root_file_path) as f_in:
94+
df = f_in[f_in.keys()[0]].lazyarrays() # type: ignore
95+
return df
96+
97+
df1 = load_df()
98+
df2 = load_df()
99+
100+
combined = DataConverterAdaptor('root').combine_awkward([df1, df2])
101+
102+
assert len(combined) == len(df1) + len(df2)
103+
104+
105+
def test_combine_awkward_from_parquet(good_uproot_file_path):
106+
'Load a dataframe from a parquet file and make sure they work when we ask them to combine'
107+
def load_df():
108+
import awkward as ak
109+
return ak.fromparquet(good_uproot_file_path)
110+
111+
df1 = load_df()
112+
df2 = load_df()
113+
114+
combined = DataConverterAdaptor('root').combine_awkward([df1, df2])
115+
116+
assert len(combined) == len(df1) + len(df2)

tests/test_servicex.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,10 @@ async def test_good_run_single_ds_1file_pandas(mocker, good_pandas_file_data):
262262
assert isinstance(r, pd.DataFrame)
263263
assert len(r) == 6
264264

265+
good_pandas_file_data.combine_pandas.assert_called_once()
266+
good_pandas_file_data.convert_to_pandas.assert_called_once()
267+
assert len(good_pandas_file_data.combine_pandas.call_args[0][0]) == 1
268+
265269

266270
@pytest.mark.asyncio
267271
async def test_good_run_single_ds_1file_awkward(mocker, good_awkward_file_data):
@@ -283,6 +287,10 @@ async def test_good_run_single_ds_1file_awkward(mocker, good_awkward_file_data):
283287
assert 'JetPt' in r
284288
assert len(r['JetPt']) == 6
285289

290+
good_awkward_file_data.combine_awkward.assert_called_once()
291+
good_awkward_file_data.convert_to_awkward.assert_called_once()
292+
assert len(good_awkward_file_data.combine_awkward.call_args[0][0]) == 1
293+
286294

287295
@pytest.mark.asyncio
288296
async def test_good_run_single_ds_2file_pandas(mocker, good_pandas_file_data):
@@ -298,9 +306,9 @@ async def test_good_run_single_ds_2file_pandas(mocker, good_pandas_file_data):
298306
cache_adaptor=mock_cache,
299307
data_convert_adaptor=good_pandas_file_data,
300308
local_log=mock_logger)
301-
r = await ds.get_data_pandas_df_async('(valid qastle string)')
302-
assert isinstance(r, pd.DataFrame)
303-
assert len(r) == 6 * 2
309+
await ds.get_data_pandas_df_async('(valid qastle string)')
310+
good_pandas_file_data.combine_pandas.assert_called_once()
311+
assert len(good_pandas_file_data.combine_pandas.call_args[0][0]) == 2
304312

305313

306314
@pytest.mark.asyncio
@@ -317,11 +325,8 @@ async def test_good_run_single_ds_2file_awkward(mocker, good_awkward_file_data):
317325
cache_adaptor=mock_cache,
318326
data_convert_adaptor=good_awkward_file_data,
319327
local_log=mock_logger)
320-
r = await ds.get_data_awkward_async('(valid qastle string)')
321-
assert isinstance(r, dict)
322-
assert len(r) == 1
323-
assert 'JetPt' in r
324-
assert len(r['JetPt']) == 6 * 2
328+
await ds.get_data_awkward_async('(valid qastle string)')
329+
assert len(good_awkward_file_data.combine_awkward.call_args[0][0]) == 2
325330

326331

327332
@pytest.mark.asyncio

0 commit comments

Comments
 (0)