Skip to content

Commit d436e73

Browse files
authored
Refactor uproot-raw to chunk the processing (#1059)
* Refactor uproot-raw to write out results in chunks in order to keep memory use down
1 parent 1991e6b commit d436e73

File tree

3 files changed

+118
-74
lines changed

3 files changed

+118
-74
lines changed

code_generator_raw_uproot/servicex/raw_uproot_code_generator/request_translator.py

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2019, IRIS-HEP
1+
# Copyright (c) 2019-2025, IRIS-HEP
22
# All rights reserved.
33
#
44
# Redistribution and use in source and binary forms, with or without
@@ -58,9 +58,8 @@ def run_query(file_path):
5858
5959
rv_arrays_trees = {{}}; rv_arrays_histograms = {{}}
6060
for subquery in jquery:
61-
a, b = run_single_query(file_path, subquery)
62-
rv_arrays_trees.update(a); rv_arrays_histograms.update(b)
63-
return rv_arrays_trees, rv_arrays_histograms
61+
for obj in run_single_query(file_path, subquery):
62+
yield obj
6463
6564
def run_single_query(file_path, query):
6665
import uproot
@@ -141,26 +140,20 @@ def run_single_query(file_path, query):
141140
raise
142141
else:
143142
continue
144-
arr = None
143+
arrfound = False
145144
for subarr in t.iterate(language=lang, **sanitized_args):
146-
if arr is None:
147-
arr = subarr
148-
else:
149-
arr = ak.concatenate([arr, subarr])
150-
if arr is not None and len(arr): # iterate will not give anything if tree empty
151-
rv_arrays_trees[outtreename] = (arr, None)
152-
else: # recent uproot handles zero-length case properly for arrays()
145+
arrfound = True
146+
yield ('tree', outtreename, subarr)
147+
if not arrfound: # need this branch if the original tree has no entries
153148
if 'cut' in sanitized_args:
154149
sanitized_args.pop('cut')
155150
arr = t.arrays(language=lang, entry_stop=0, **sanitized_args)
156-
rv_arrays_trees[outtreename] = (None, arr.layout)
151+
yield ('tree', outtreename, arr)
157152
else:
158153
histograms = query['copy_histograms']
159154
keys = fl.keys(filter_name=histograms, cycle=False)
160155
for key in keys:
161-
rv_arrays_histograms[key] = fl[key]
162-
163-
return rv_arrays_trees, rv_arrays_histograms
156+
yield ('obj', key, fl[key])
164157
'''
165158

166159
_hash = hashlib.md5(generated_code.encode(), usedforsecurity=False).hexdigest()

code_generator_raw_uproot/servicex/templates/transform_single_file.py

Lines changed: 108 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,80 @@
1+
# Copyright (c) 2019-2025, IRIS-HEP
2+
# All rights reserved.
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright notice, this
8+
# list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright notice,
11+
# this list of conditions and the following disclaimer in the documentation
12+
# and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the copyright holder nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28+
129
import os
230
import sys
331
import time
432
from pathlib import Path
5-
import generated_transformer
33+
from generated_transformer import run_query # noqa
634
import awkward as ak
735
import pyarrow.parquet as pq
8-
import pyarrow
36+
import functools
937
instance = os.environ.get('INSTANCE_NAME', 'Unknown')
1038

1139

40+
def get_generator_timing(f):
41+
from time import perf_counter
42+
43+
@functools.wraps(f)
44+
def wrapper(*args, **kwargs):
45+
t0 = perf_counter()
46+
for yv in f(*args, **kwargs):
47+
dt = perf_counter()-t0
48+
yield dt, yv
49+
t0 = perf_counter()
50+
return wrapper
51+
52+
53+
def get_direct_timing(f):
54+
from time import perf_counter
55+
56+
@functools.wraps(f)
57+
def wrapper(*args, **kwargs):
58+
t0 = perf_counter()
59+
rv = f(*args, **kwargs)
60+
dt = perf_counter()-t0
61+
return dt, rv
62+
return wrapper
63+
64+
65+
def root_write_table_data(output_format, writer, outtreename, data):
66+
if output_format == 'root-file':
67+
if outtreename in writer:
68+
writer[outtreename].extend({field: data[field] for field in data.fields})
69+
else:
70+
writer[outtreename] = {field: data[field] for field in data.fields}
71+
else: # RNTuple
72+
if outtreename in writer:
73+
writer[outtreename].extend(data)
74+
else:
75+
writer.mkrntuple(outtreename, data)
76+
77+
1278
def transform_single_file(file_path: str, output_path: Path, output_format: str):
1379
"""
1480
Transform a single file and return some information about output
@@ -18,73 +84,58 @@ def transform_single_file(file_path: str, output_path: Path, output_format: str)
1884
"""
1985
try:
2086
stime = time.time()
21-
22-
awkward_array_dict, histograms = generated_transformer.run_query(file_path)
23-
total_events = sum((ak.num(awkward_array[0], axis=0)
24-
for awkward_array in awkward_array_dict.values()
25-
if awkward_array[0] is not None), 0)
26-
27-
ttime = time.time()
87+
total_events = 0
88+
ttimedt = 0
89+
etimedt = 0
2890

2991
if output_format in ('root-file', 'root-rntuple'):
3092
import uproot
31-
etime = time.time()
3293
# opening the file with open() is a workaround for a bug handling multiple colons
33-
# in the filename in uproot 5.3.9
94+
# in the filename in uproot
3495
with open(output_path, 'b+w') as wfile:
3596
with uproot.recreate(wfile, compression=uproot.ZSTD(5)) as writer:
36-
for k, v in awkward_array_dict.items():
37-
if output_format == 'root-file':
38-
if v[0] is not None:
39-
writer[k] = {field: v[0][field] for field in
40-
v[0].fields}
41-
else:
42-
writer.mktree(k, dict(zip(v[1].form.columns(),
43-
v[1].form.column_types())))
44-
else: # RNTuple
45-
if v[0] is not None:
46-
# Work around a limitation in uproot 5.6.0
47-
# If a cut is specified, we'll get ListArrays which can't be
48-
# written via uproot. Convert them to ListOffsetArrays
49-
# Assume the ListArrays are only at top level
50-
warr = ak.zip({_: v[0][_].layout.to_ListOffsetArray64()
51-
if isinstance(v[0][_].layout, ak.contents.ListArray)
52-
else v[0][_]
53-
for _ in v[0].fields}, depth_limit=1)
54-
writer.mkrntuple(k, warr)
55-
else:
56-
writer.mkrntuple(k, v[1].form)
57-
for k, v in histograms.items():
58-
writer[k] = v
97+
for dt, item in get_generator_timing(run_query)(file_path):
98+
ttimedt += dt
99+
match item:
100+
case ('tree', k, v):
101+
total_events += ak.num(v, axis=0)
102+
root_write_table_data(output_format, writer, k, v)
103+
case ('obj', k, v):
104+
writer[k] = v
59105
wtime = time.time()
60106

61-
else:
62-
if histograms:
63-
raise RuntimeError("Cannot store histograms in a non-ROOT return file format")
64-
for treename, subarray in awkward_array_dict.items():
65-
subarray['treename'] = treename
66-
awkward_array = awkward_array_dict.popitem()[1]
67-
for treename, subarray in awkward_array_dict.items():
68-
awkward_array = ak.concatenate([awkward_array, subarray])
69-
70-
arrow = ak.to_arrow_table(awkward_array)
71-
72-
etime = time.time()
73-
107+
else: # parquet
108+
awkward_array = None
109+
writer = None
74110
try:
75-
writer = pq.ParquetWriter(output_path, arrow.schema)
76-
except pyarrow.lib.ArrowNotImplementedError:
77-
raise RuntimeError("Unable to translate output tables to parquet "
78-
"(probably different queries give different branches?)")
79-
writer.write_table(table=arrow)
80-
writer.close()
81-
111+
for dt, item in get_generator_timing(run_query)(file_path):
112+
ttimedt += dt
113+
match item:
114+
case ('tree', k, awkward_array):
115+
total_events += ak.num(awkward_array, axis=0)
116+
awkward_array['treename'] = k
117+
dt2, arrow = get_direct_timing(ak.to_arrow_table)(awkward_array)
118+
etimedt += dt2
119+
if not writer:
120+
writer = pq.ParquetWriter(output_path, arrow.schema)
121+
try:
122+
writer.write_table(table=arrow)
123+
except ValueError as e:
124+
raise RuntimeError("Unable to translate output tables to parquet "
125+
"(probably different queries give different "
126+
f"branches?)\n{e}")
127+
case ('obj', k, v):
128+
raise RuntimeError("Cannot store histograms in a non-ROOT "
129+
"return file format")
130+
finally:
131+
if writer:
132+
writer.close()
82133
wtime = time.time()
83134

84135
output_size = os.stat(output_path).st_size
85-
print(f'Detailed transformer times. query_time:{round(ttime - stime, 3)} '
86-
f'serialization: {round(etime - ttime, 3)} '
87-
f'writing: {round(wtime - etime, 3)}')
136+
print(f'Detailed transformer times. query_time:{round(ttimedt, 3)} '
137+
f'serialization: {round(etimedt, 3)} '
138+
f'writing: {round((wtime - stime) - etimedt - ttimedt, 3)}')
88139

89140
print(f"Transform stats: Total Events: {total_events}, resulting file size {output_size}")
90141
except Exception as error:

code_generator_raw_uproot/tests/test_src.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def test_generate_code():
5050
'filter_name': ['lbn']},
5151
{'copy_histograms': 'CutBookkeeper*'}
5252
])
53-
expected_hash = "5c3235898f268e81080455c92b7c914e"
53+
expected_hash = "e95bbb95ff7556f2ffcc8a8c8f09919c"
5454
result = translator.generate_code(query, tmpdirname)
5555

5656
# is the generated code at least syntactically valid Python?

0 commit comments

Comments
 (0)