Skip to content

Commit 8927112

Browse files
committed
Merge branch 'master' into 3.0_develop
2 parents e681796 + 3fc54a8 commit 8927112

File tree

8 files changed

+1978
-148
lines changed

8 files changed

+1978
-148
lines changed

.github/workflows/ci-production.yaml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
name: Production Service Testing
2+
3+
on:
4+
schedule:
5+
- cron: '0 */6 * * *'
6+
workflow_dispatch:
7+
8+
jobs:
9+
test:
10+
runs-on: ubuntu-latest
11+
environment: production-service
12+
13+
steps:
14+
- uses: actions/checkout@v3
15+
with:
16+
ref: 3.0_develop
17+
18+
- name: Set up Python 3.12
19+
uses: actions/setup-python@v4
20+
with:
21+
python-version: 3.12
22+
23+
- name: Save secret
24+
run: 'echo "$SERVICEX_YAML" > servicex.yaml'
25+
shell: bash
26+
env:
27+
SERVICEX_YAML: ${{ secrets.SERVICEX_YAML }}
28+
29+
- name: Install package
30+
run: |
31+
python -m pip install --upgrade uv
32+
uv pip install --system --upgrade pip setuptools wheel
33+
uv pip install --system --upgrade '.[databinder,pandas,test]'
34+
uv pip list --system
35+
36+
- name: Run example
37+
working-directory: examples
38+
run: python3 UprootRaw_Dict.py
39+

.github/workflows/ci_production.yaml

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +0,0 @@
1-
name: Production Service Testing
2-
3-
on:
4-
schedule:
5-
- cron: '0 */6 * * *'
6-
workflow_dispatch:
7-
8-
jobs:
9-
test:
10-
runs-on: ubuntu-latest
11-
environment: production-service
12-
13-
steps:
14-
- uses: actions/checkout@v3
15-
16-
- name: Set up Python 3.12
17-
uses: actions/setup-python@v4
18-
with:
19-
python-version: 3.12
20-
21-
- name: Save secret
22-
run: 'echo "$SERVICEX_YAML" > servicex.yaml'
23-
shell: bash
24-
env:
25-
SERVICEX_YAML: ${{ secrets.SERVICEX_YAML }}
26-
27-
- name: Install uv
28-
uses: astral-sh/setup-uv@v3
29-
30-
- name: Install package
31-
run: |
32-
uv pip install --system --upgrade '.[test]'
33-
uv pip list --system
34-
35-
- name: Run example
36-
working-directory: examples
37-
run: python3 Uproot_UprootRaw_Dict.py
38-

.vscode/settings.json

Lines changed: 0 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,110 +0,0 @@
1-
{
2-
"python.analysis.logLevel": "Information",
3-
"cSpell.words": [
4-
"accesskey",
5-
"aenter",
6-
"aexit",
7-
"aiohttp",
8-
"AOD's",
9-
"asyncio",
10-
"atlasxaod",
11-
"awks",
12-
"AZNLOCTEQ",
13-
"backend's",
14-
"backends",
15-
"backoff",
16-
"bortles",
17-
"cacheme",
18-
"caplog",
19-
"cernopendata",
20-
"codecov",
21-
"codegen",
22-
"Comming",
23-
"coveragerc",
24-
"DAOD",
25-
"dcache",
26-
"desy",
27-
"dont",
28-
"fget",
29-
"fname",
30-
"forkingshirtballs",
31-
"forkit",
32-
"getenv",
33-
"gitlab",
34-
"giveup",
35-
"idna",
36-
"inmem",
37-
"ipywidgets",
38-
"isabstractmethod",
39-
"iscoroutinefunction",
40-
"jupyter",
41-
"jupyterlab",
42-
"leftfoot",
43-
"linq",
44-
"localds",
45-
"mcrn",
46-
"minio",
47-
"Minio",
48-
"miniouser",
49-
"mino",
50-
"myfile",
51-
"nargs",
52-
"ncols",
53-
"ndarray",
54-
"NOQA",
55-
"nqueries",
56-
"ntuples",
57-
"numpy",
58-
"opendata",
59-
"pathlib",
60-
"PHYSLITE",
61-
"pnfs",
62-
"posix",
63-
"Powheg",
64-
"prereleased",
65-
"protomolecule",
66-
"ptetaphi",
67-
"pyarrow",
68-
"pypa",
69-
"pypi",
70-
"pytest",
71-
"qastle",
72-
"qsize",
73-
"Reconstructor",
74-
"redownload",
75-
"rootfiles",
76-
"rucio",
77-
"sdist",
78-
"secretkey",
79-
"servicex",
80-
"servicexabc",
81-
"Servivce",
82-
"setuptools",
83-
"slateci",
84-
"sslhep",
85-
"STDM",
86-
"stfc",
87-
"SXPASS",
88-
"SXTOKEN",
89-
"SXTYPE",
90-
"SXUSER",
91-
"tcut",
92-
"thegoodplace",
93-
"thishashdoesnotexist",
94-
"Topo",
95-
"tqdm",
96-
"unittests",
97-
"URL's",
98-
"xaod",
99-
"xaodr",
100-
"xrootd"
101-
],
102-
"python.analysis.typeCheckingMode": "basic",
103-
"python.testing.pytestArgs": [
104-
"--cov=servicex",
105-
"tests"
106-
],
107-
"python.testing.unittestEnabled": false,
108-
"python.testing.pytestEnabled": true,
109-
"editor.formatOnSave": false,
110-
}

servicex/data_conversions.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
import asyncio
2+
from concurrent.futures import ThreadPoolExecutor
3+
from pathlib import Path
4+
from typing import Iterable, Optional
5+
6+
import pandas as pd
7+
import awkward as ak
8+
9+
from .utils import ServiceXException
10+
11+
_conversion_pool = ThreadPoolExecutor(4)
12+
13+
14+
class DataConverterAdaptor:
15+
"""Methods to convert from one type of data to the other."""
16+
17+
def __init__(self, default_file_type: str):
18+
"""Create a data converter adaptor. By default it will do the
19+
conversation as requested.
20+
21+
Args:
22+
default_file_type (str): The default file type (`parquet` or `root`)
23+
"""
24+
self._default_file_type = default_file_type
25+
26+
async def convert_to_pandas(self, file: Path, file_type: Optional[str] = None):
27+
"""Convert to a pandas DataFrame from data stored in a file of a particular file_type
28+
29+
Args:
30+
file (Path): Path to the file
31+
file_type (str): What the file contains (root, parquet, etc)
32+
"""
33+
file_type = file_type if file_type is not None else self._default_file_type
34+
if file_type == "root-file":
35+
return await self._convert_root_to_pandas(file)
36+
elif file_type == "parquet":
37+
return await self._convert_parquet_to_pandas(file)
38+
else:
39+
raise ServiceXException(
40+
f"Conversion from {file_type} into an pandas DF is not " "yet supported"
41+
)
42+
43+
async def convert_to_awkward(self, file: Path, file_type: Optional[str] = None):
44+
"""Convert to an awkward data array from data stored in a file of a particular file_type
45+
46+
Args:
47+
file (Path): Path to the file
48+
file_type (str): What the file contains (root, parquet, etc)
49+
"""
50+
file_type = file_type if file_type is not None else self._default_file_type
51+
if file_type == "root-file":
52+
return await self._convert_root_to_awkward(file)
53+
elif file_type == "parquet":
54+
return await self._convert_parquet_to_awkward(file)
55+
else:
56+
raise ServiceXException(
57+
f"Conversion from {file_type} into an awkward array is not "
58+
"yet supported"
59+
)
60+
61+
def combine_pandas(self, dfs: Iterable[pd.DataFrame]) -> pd.DataFrame:
62+
"""Combine many pandas DataFrame into a single one, in order.
63+
64+
Args:
65+
dfs (Iterable[pd.DataFrame]): The list of DataFrames
66+
"""
67+
return pd.concat(dfs)
68+
69+
def combine_awkward(self, awks: Iterable[ak.Array]) -> ak.Array:
70+
"""Combine many awkward arrays into a single one, in order.
71+
72+
Args:
73+
awks (Iterable[ChunkedArray]): The input list of awkward arrays
74+
"""
75+
return ak.concatenate(awks) # type: ignore
76+
77+
async def _convert_root_to_pandas(self, file: Path):
78+
"""
79+
Convert the contents of a ROOT file to pandas.
80+
81+
Arguments:
82+
83+
file A `Path` to the file containing the pandas data
84+
85+
Returns:
86+
87+
DataFrame A pandas DataFrame
88+
89+
Note:
90+
91+
- Work is done on a second thread.
92+
- Pandas is only imported if this is called.
93+
94+
"""
95+
from pandas import DataFrame
96+
97+
def do_the_work(file: Path) -> DataFrame:
98+
import uproot as uproot
99+
100+
with uproot.open(file) as f_in: # type: ignore
101+
r = f_in[f_in.keys()[0]]
102+
return r.arrays(library="pd") # type: ignore
103+
104+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
105+
106+
async def _convert_parquet_to_pandas(self, file: Path):
107+
"""
108+
Convert the contents of a parquet file to pandas.
109+
110+
Arguments:
111+
112+
file A `Path` to the file containing the pandas data
113+
114+
Returns:
115+
116+
DataFrame A pandas DataFrame
117+
118+
Note:
119+
120+
- Work is done on a second thread.
121+
- Pandas is only imported if this is called.
122+
123+
"""
124+
import pandas as pd
125+
126+
def do_the_work(file: Path) -> pd.DataFrame:
127+
return pd.read_parquet(str(file))
128+
129+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
130+
131+
async def _convert_root_to_awkward(self, file: Path):
132+
"""
133+
Convert the contents of a ROOT file to an awkward dictionary.
134+
135+
Arguments:
136+
137+
file A `Path` to the file containing the pandas data
138+
139+
Returns:
140+
141+
DataFrame A pandas DataFrame
142+
143+
Note:
144+
145+
- Work is done on a second thread.
146+
- Awkward is only imported if this is called.
147+
- A LazyArray is returned, so it isn't completely loaded into memory. That also means
148+
this will leak file handles - as that has to be left open.
149+
150+
"""
151+
152+
def do_the_work(file: Path) -> ak.Array:
153+
import uproot as uproot
154+
155+
with uproot.open(file) as f_in: # type: ignore
156+
tree_name = f_in.keys()[0]
157+
# If `tree_name` ends with a ";NN", (where NN is a number
158+
# left over from VMS file versioning!), remove everything from the
159+
# ';' on
160+
tree_name = tree_name.split(";")[0]
161+
162+
if hasattr(uproot, "lazy"):
163+
return uproot.lazy(f"{file}:{tree_name}") # type: ignore
164+
165+
if hasattr(uproot, "dask"):
166+
return uproot.dask({str(file): tree_name}) # type: ignore
167+
168+
assert (
169+
False
170+
), "Uproot version does not have either `dask` or `lazy` - please fix environment!"
171+
172+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
173+
174+
async def _convert_parquet_to_awkward(self, file: Path):
175+
"""
176+
Convert the contents of a parquet file to an awkward dictionary.
177+
178+
Arguments:
179+
180+
file A `Path` to the file containing the pandas data
181+
182+
Returns:
183+
184+
DataFrame A pandas DataFrame
185+
186+
Note:
187+
188+
- Work is done on a second thread.
189+
- Pandas is only imported if this is called.
190+
191+
"""
192+
193+
def do_the_work(file: Path) -> ak.Array:
194+
# TODO: When we move to awkward1, make sure this becomes lazy
195+
return ak.from_parquet(str(file)) # type: ignore
196+
197+
return await asyncio.wrap_future(_conversion_pool.submit(do_the_work, file))
198+

0 commit comments

Comments
 (0)