Skip to content

Add parquet functionality to to_awk #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ include = [
test = [
"pytest>=7.2.0",
"numpy>=1.21",
"pyarrow>=8.0.0",
"pandas"
]

[tool.hatch.build.targets.wheel]
Expand Down
35 changes: 27 additions & 8 deletions servicex_analysis_utils/materialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@
import dask_awkward as dak
import logging

def to_awk(deliver_dict, dask=False, **uproot_kwargs):
def to_awk(deliver_dict, dask=False, **kwargs):
"""
Load an awkward array from the deliver() output with uproot or uproot.dask.

Parameters:
deliver_dict (dict): Returned dictionary from servicex.deliver()
(keys are sample names, values are file paths or URLs).
dask (bool): Optional. Flag to load as dask-awkward array. Default is False
**uproot_kwargs : Optional. Additional keyword arguments passed to uproot.dask or uproot.iterate
**kwargs : Optional. Additional keyword arguments passed to uproot.dask, uproot.iterate and from_parquet


Returns:
Expand All @@ -48,15 +48,34 @@ def to_awk(deliver_dict, dask=False, **uproot_kwargs):
awk_arrays = {}

for sample, paths in deliver_dict.items():
#Check file type
f_type=str(paths[0])
if f_type.endswith(".root"):
is_root=True
elif f_type.endswith(".parquet") or f_type.endswith(".pq"):
is_root=False
# ServiceX supports only root/parquet in transformed files
else:
raise ValueError(f"Unsupported file format: '{paths[0]}'. Files must be ROOT (.root) or Parquet (.parquet, .pq)")

try:
if dask:
# Use uproot.dask to handle URLs and local paths lazily
awk_arrays[sample] = uproot.dask(paths, library="ak", **uproot_kwargs)
if is_root==True:
# Use uproot.dask to handle URLs and local paths lazily
awk_arrays[sample] = uproot.dask(paths, library="ak", **kwargs)
else:
#file is parquet
awk_arrays[sample] = dak.from_parquet(paths, **kwargs)
else:
# Use uproot.iterate to handle URLs and local paths files in chunks
tmp_arrays = list(uproot.iterate(paths, library="ak", **uproot_kwargs))
# Merge arrays
awk_arrays[sample] = ak.concatenate(tmp_arrays)
if is_root==True:
# Use uproot.iterate to handle URLs and local paths files in chunks
tmp_arrays = list(uproot.iterate(paths, library="ak", **kwargs))
# Merge arrays
awk_arrays[sample] = ak.concatenate(tmp_arrays)
else:
#file is parquet
awk_arrays[sample] = ak.from_parquet(paths, **kwargs)


except Exception as e:
# Log the exception pointing at the user's code
Expand Down
16 changes: 12 additions & 4 deletions tests/test_materialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
@pytest.fixture
def build_test_samples(tmp_path):

test_path1 = tmp_path / "test_file1.root"
test_path2 = tmp_path / "test_file2.root"
test_path1 = str(tmp_path / "test_file1.root")
test_path2 = str(tmp_path / "test_file2.root")
# example data for two branches
tree_data1 = {
"branch1": np.ones(100),
Expand All @@ -57,13 +57,13 @@ def build_test_samples(tmp_path):
file["Tree"] = tree_data2

#Dict like servicex.deliver() output
sx_dict = {"Test-Sample1": test_path1, "Test-Sample2": test_path2}
sx_dict = {"Test-Sample1": [test_path1], "Test-Sample2": [test_path2]}

return sx_dict


#Test functions
def test_to_awk_collection(build_test_samples):
def test_to_awk(build_test_samples):
sx_dict = build_test_samples
result = to_awk(sx_dict) #uproot.iterate expressions kwarg

Expand Down Expand Up @@ -115,6 +115,14 @@ def test_to_awk_dask(build_test_samples):
assert ak.all(arr2['branch1'].compute() == ak.from_numpy(np.ones(10)))


def test_unsupported_file_format():
    """to_awk() must reject delivery results that are neither ROOT nor Parquet."""
    bogus_delivery = {"fake-Sample": ["invalid_file.txt"]}
    # pytest treats `match` as a regex, so literal '.' and '(' are escaped
    expected_msg = (
        r"Unsupported file format: 'invalid_file.txt'\. "
        r"Files must be ROOT \(.*\) or Parquet \(.*\)"
    )
    with pytest.raises(ValueError, match=expected_msg):
        to_awk(bogus_delivery)






Expand Down
90 changes: 90 additions & 0 deletions tests/test_materialization_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Copyright (c) 2025, IRIS-HEP
# All rights reserved.
import pytest
import awkward as ak
import dask_awkward as dak
import logging
import numpy as np
import pyarrow.parquet as pq
from servicex_analysis_utils.materialization import to_awk


@pytest.fixture
def build_test_samples(tmp_path):
    """Write two small Parquet files and return a servicex.deliver()-style dict."""
    path_one = str(tmp_path / "test_file1.parquet")
    path_two = str(tmp_path / "test_file2.parquet")

    # Two branches, 100 rows; written with 10-row row groups (partitions) so the
    # dask test can exercise split_row_groups
    two_branch = ak.Array({
        "branch1": np.ones(100),
        "branch2": np.zeros(100)
    })
    ak.to_parquet(two_branch, path_one, row_group_size=10)

    # A single branch with 10 rows, written as one row group
    one_branch = ak.Array({
        "branch1": np.ones(10)
    })
    ak.to_parquet(one_branch, path_two)

    # Keys mimic sample names as returned by servicex.deliver()
    return {"Test-Sample1": [path_one], "Test-Sample2": [path_two]}


# Test function for to_awk with Parquet files
def test_to_awk_parquet(build_test_samples):
    """Eager loading of Parquet deliveries (uses ak.from_parquet internally)."""
    delivery = build_test_samples
    arrays = to_awk(delivery)

    # Every delivered sample must be present, in order
    assert list(arrays.keys()) == ["Test-Sample1", "Test-Sample2"]
    sample1 = arrays["Test-Sample1"]
    sample2 = arrays["Test-Sample2"]

    # Each sample materializes as an eager awkward array
    for arr in (sample1, sample2):
        assert isinstance(arr, ak.Array), "to_awk() does not produce an awkward.Array instance"

    # All branches survive the round trip
    assert ak.fields(sample1) == ['branch1', 'branch2']
    assert ak.fields(sample2) == ['branch1']

    # Element-wise content is intact
    assert ak.all(sample1['branch2'] == ak.from_numpy(np.zeros(100)))
    assert ak.all(sample2['branch1'] == ak.from_numpy(np.ones(10)))

    # Extra kwargs are forwarded to ak.from_parquet: column selection
    filtered = to_awk(delivery, columns="branch1")
    assert ak.fields(filtered["Test-Sample1"]) == ['branch1']  # branch2 should be filtered out

def test_to_awk_dask_parquet(build_test_samples):
    """Lazy loading of Parquet deliveries (uses dak.from_parquet internally)."""
    delivery = build_test_samples
    lazy = to_awk(delivery, dask=True, split_row_groups=True)

    # Every delivered sample must be present, in order
    assert list(lazy.keys()) == ["Test-Sample1", "Test-Sample2"]
    sample1 = lazy["Test-Sample1"]
    sample2 = lazy["Test-Sample2"]

    # dask=True must yield lazy dask-awkward arrays
    assert isinstance(sample1, dak.Array), "to_awk(dask=True) does not produce an dak.Array instance"
    assert isinstance(sample2, dak.Array), "to_awk(dask=True) does not produce an dak.Array instance"

    # split_row_groups=True kwarg is forwarded: one partition per 10-row
    # row group for file 1, a single partition for file 2
    assert sample1.npartitions == 10
    assert sample2.npartitions == 1

    # All branches survive the round trip
    assert ak.fields(sample1) == ['branch1', 'branch2']
    assert ak.fields(sample2) == ['branch1']

    # Values only materialize on compute()
    assert ak.all(sample1['branch2'].compute() == ak.from_numpy(np.zeros(100)))
    assert ak.all(sample2['branch1'].compute() == ak.from_numpy(np.ones(10)))