Skip to content

Commit 6a05c64

Browse files
authored
Merge pull request #1 from ArturU043/testing
Add parquet functionalities to to_awk
2 parents df59038 + 842d8de commit 6a05c64

File tree

4 files changed

+131
-12
lines changed

4 files changed

+131
-12
lines changed

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,8 @@ include = [
3131
test = [
3232
"pytest>=7.2.0",
3333
"numpy>=1.21",
34+
"pyarrow>=8.0.0",
35+
"pandas"
3436
]
3537

3638
[tool.hatch.build.targets.wheel]

servicex_analysis_utils/materialization.py

Lines changed: 27 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -30,15 +30,15 @@
3030
import dask_awkward as dak
3131
import logging
3232

33-
def to_awk(deliver_dict, dask=False, **uproot_kwargs):
33+
def to_awk(deliver_dict, dask=False, **kwargs):
3434
"""
3535
Load an awkward array from the deliver() output with uproot or uproot.dask.
3636
3737
Parameters:
3838
deliver_dict (dict): Returned dictionary from servicex.deliver()
3939
(keys are sample names, values are file paths or URLs).
4040
dask (bool): Optional. Flag to load as dask-awkward array. Default is False
41-
**uproot_kwargs : Optional. Additional keyword arguments passed to uproot.dask or uproot.iterate
41+
**kwargs : Optional. Additional keyword arguments passed to uproot.dask, uproot.iterate and from_parquet
4242
4343
4444
Returns:
@@ -48,15 +48,34 @@ def to_awk(deliver_dict, dask=False, **uproot_kwargs):
4848
awk_arrays = {}
4949

5050
for sample, paths in deliver_dict.items():
51+
#Check file type
52+
f_type=str(paths[0])
53+
if f_type.endswith(".root"):
54+
is_root=True
55+
elif f_type.endswith(".parquet") or f_type.endswith(".pq"):
56+
is_root=False
57+
# ServiceX supports only root/parquet in transformed files
58+
else:
59+
raise ValueError(f"Unsupported file format: '{paths[0]}'. Files must be ROOT (.root) or Parquet (.parquet, .pq)")
60+
5161
try:
5262
if dask:
53-
# Use uproot.dask to handle URLs and local paths lazily
54-
awk_arrays[sample] = uproot.dask(paths, library="ak", **uproot_kwargs)
63+
if is_root==True:
64+
# Use uproot.dask to handle URLs and local paths lazily
65+
awk_arrays[sample] = uproot.dask(paths, library="ak", **kwargs)
66+
else:
67+
#file is parquet
68+
awk_arrays[sample] = dak.from_parquet(paths, **kwargs)
5569
else:
56-
# Use uproot.iterate to handle URLs and local paths files in chunks
57-
tmp_arrays = list(uproot.iterate(paths, library="ak", **uproot_kwargs))
58-
# Merge arrays
59-
awk_arrays[sample] = ak.concatenate(tmp_arrays)
70+
if is_root==True:
71+
# Use uproot.iterate to handle URLs and local paths files in chunks
72+
tmp_arrays = list(uproot.iterate(paths, library="ak", **kwargs))
73+
# Merge arrays
74+
awk_arrays[sample] = ak.concatenate(tmp_arrays)
75+
else:
76+
#file is parquet
77+
awk_arrays[sample] = ak.from_parquet(paths, **kwargs)
78+
6079

6180
except Exception as e:
6281
# Log the exception pointing at the user's code

tests/test_materialization.py

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -39,8 +39,8 @@
3939
@pytest.fixture
4040
def build_test_samples(tmp_path):
4141

42-
test_path1 = tmp_path / "test_file1.root"
43-
test_path2 = tmp_path / "test_file2.root"
42+
test_path1 = str(tmp_path / "test_file1.root")
43+
test_path2 = str(tmp_path / "test_file2.root")
4444
# example data for two branches
4545
tree_data1 = {
4646
"branch1": np.ones(100),
@@ -57,13 +57,13 @@ def build_test_samples(tmp_path):
5757
file["Tree"] = tree_data2
5858

5959
#Dict like servicex.deliver() output
60-
sx_dict = {"Test-Sample1": test_path1, "Test-Sample2": test_path2}
60+
sx_dict = {"Test-Sample1": [test_path1], "Test-Sample2": [test_path2]}
6161

6262
return sx_dict
6363

6464

6565
#Test functions
66-
def test_to_awk_collection(build_test_samples):
66+
def test_to_awk(build_test_samples):
6767
sx_dict = build_test_samples
6868
result = to_awk(sx_dict) #uproot.iterate expressions kwarg
6969

@@ -115,6 +115,14 @@ def test_to_awk_dask(build_test_samples):
115115
assert ak.all(arr2['branch1'].compute() == ak.from_numpy(np.ones(10)))
116116

117117

118+
def test_unsupported_file_format():
119+
fake_paths = {"fake-Sample": ["invalid_file.txt"]}
120+
# match is regex-level
121+
with pytest.raises(ValueError, match=r"Unsupported file format: 'invalid_file.txt'\. Files must be ROOT \(.*\) or Parquet \(.*\)"):
122+
to_awk(fake_paths)
123+
124+
125+
118126

119127

120128

tests/test_materialization_parquet.py

Lines changed: 90 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,90 @@
1+
# Copyright (c) 2025, IRIS-HEP
2+
# All rights reserved.
3+
import pytest
4+
import awkward as ak
5+
import dask_awkward as dak
6+
import logging
7+
import numpy as np
8+
import pyarrow.parquet as pq
9+
from servicex_analysis_utils.materialization import to_awk
10+
11+
12+
@pytest.fixture
13+
def build_test_samples(tmp_path):
14+
"""
15+
Creates two Parquet files with sample data for testing.
16+
"""
17+
test_path1 = str(tmp_path / "test_file1.parquet")
18+
test_path2 = str(tmp_path / "test_file2.parquet")
19+
20+
# Example data for two branches
21+
data1 = ak.Array({
22+
"branch1": np.ones(100),
23+
"branch2": np.zeros(100)
24+
})
25+
26+
# Example data for one branch
27+
data2 = ak.Array({
28+
"branch1": np.ones(10)
29+
})
30+
31+
# Write to Parquet files
32+
ak.to_parquet(data1, test_path1, row_group_size=10) #partions
33+
ak.to_parquet(data2, test_path2)
34+
35+
# Dict simulating servicex.deliver() output
36+
sx_dict = {"Test-Sample1": [test_path1], "Test-Sample2": [test_path2]}
37+
38+
return sx_dict
39+
40+
41+
# Test function for to_awk with Parquet files
42+
def test_to_awk_parquet(build_test_samples):
43+
sx_dict = build_test_samples
44+
result = to_awk(sx_dict) # Using ak.from_parquet internally
45+
46+
# Collecting all samples
47+
assert list(result.keys()) == ["Test-Sample1", "Test-Sample2"]
48+
arr1 = result["Test-Sample1"]
49+
arr2 = result["Test-Sample2"]
50+
51+
# Collecting all branches
52+
assert ak.fields(arr1) == ['branch1', 'branch2']
53+
assert ak.fields(arr2) == ['branch1']
54+
55+
assert isinstance(arr1, ak.Array), "to_awk() does not produce an awkward.Array instance"
56+
assert isinstance(arr2, ak.Array), "to_awk() does not produce an awkward.Array instance"
57+
58+
# Collecting all elements per branch
59+
assert ak.all(arr1['branch2'] == ak.from_numpy(np.zeros(100)))
60+
assert ak.all(arr2['branch1'] == ak.from_numpy(np.ones(10)))
61+
62+
# Checking kwargs
63+
result_filtered = to_awk(sx_dict, columns="branch1")
64+
arr1_filtered = result_filtered["Test-Sample1"]
65+
assert ak.fields(arr1_filtered) == ['branch1'] # branch2 should be filtered out
66+
67+
def test_to_awk_dask_parquet(build_test_samples):
68+
sx_dict = build_test_samples
69+
result_da = to_awk(sx_dict, dask=True, split_row_groups=True) #split in partitions
70+
71+
#Collecting all samples
72+
assert list(result_da.keys())==["Test-Sample1", "Test-Sample2"]
73+
arr1=result_da["Test-Sample1"]
74+
arr2=result_da["Test-Sample2"]
75+
76+
#Checking instance
77+
assert isinstance(arr1, dak.Array), "to_awk(dask=True) does not produce an dak.Array instance"
78+
assert isinstance(arr2, dak.Array), "to_awk(dask=True) does not produce an dak.Array instance"
79+
80+
#Testing partitionning kwarg
81+
assert arr1.npartitions == 10
82+
assert arr2.npartitions == 1
83+
84+
#Collecting all branches
85+
assert ak.fields(arr1) == ['branch1', 'branch2']
86+
assert ak.fields(arr2) == ['branch1']
87+
88+
#Collecting all elements per branch
89+
assert ak.all(arr1['branch2'].compute() == ak.from_numpy(np.zeros(100)))
90+
assert ak.all(arr2['branch1'].compute() == ak.from_numpy(np.ones(10)))

0 commit comments

Comments
 (0)