Skip to content

Commit 8f7f027

Browse files
author
acordeir
committed
parquet features
1 parent 6ccfc7d commit 8f7f027

File tree

1 file changed

+23
-6
lines changed

1 file changed

+23
-6
lines changed

servicex_analysis_utils/materialization.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,32 @@ def to_awk(deliver_dict, dask=False, **uproot_kwargs):
4848
awk_arrays = {}
4949

5050
for sample, paths in deliver_dict.items():
51+
#Check file type
52+
if paths[0].endswith(".root"):
53+
is_root=True
54+
elif paths[0].endswith(".parquet") or paths[0].endswith(".pq"):
55+
is_root=False
56+
# ServiceX supports only root/parquet in transformed files
57+
else:
58+
raise ValueError(f"Unsupported file format: '{paths[0]}'. Files must be ROOT (.root) or Parquet (.parquet, .pq)")
59+
5160
try:
5261
if dask:
53-
# Use uproot.dask to handle URLs and local paths lazily
54-
awk_arrays[sample] = uproot.dask(paths, library="ak", **uproot_kwargs)
62+
if is_root==True:
63+
# Use uproot.dask to handle URLs and local paths lazily
64+
awk_arrays[sample] = uproot.dask(paths, library="ak", **uproot_kwargs)
65+
else:
66+
#file is parquet
67+
awk_arrays[sample] = dak.from_parquet(paths)
5568
else:
56-
# Use uproot.iterate to handle URLs and local paths files in chunks
57-
tmp_arrays = list(uproot.iterate(paths, library="ak", **uproot_kwargs))
58-
# Merge arrays
59-
awk_arrays[sample] = ak.concatenate(tmp_arrays)
69+
if is_root==True:
70+
# Use uproot.iterate to handle URLs and local paths files in chunks
71+
tmp_arrays = list(uproot.iterate(paths, library="ak", **uproot_kwargs))
72+
# Merge arrays
73+
awk_arrays[sample] = ak.concatenate(tmp_arrays)
74+
else:
75+
awk_arrays[sample] = ak.from_parquet(paths)
76+
6077

6178
except Exception as e:
6279
# Log the exception pointing at the user's code

0 commit comments

Comments
 (0)