2
2
import sys
3
3
import time
4
4
from pathlib import Path
5
+ import inspect
5
6
import generated_transformer
6
- import awkward as ak
7
- import uproot
8
- import pyarrow .parquet as pq
9
- import numpy as np
10
- instance = os .environ .get ('INSTANCE_NAME' , 'Unknown' )
7
+
8
+ instance = os .environ .get ("INSTANCE_NAME" , "Unknown" )
11
9
default_tree_name = "servicex"
12
10
default_branch_name = "branch"
13
11
@@ -22,65 +20,96 @@ def transform_single_file(file_path: str, output_path: Path, output_format: str)
22
20
try :
23
21
stime = time .time ()
24
22
25
- output = generated_transformer . run_query ( file_path )
26
-
27
- ttime = time . time ()
23
+ # We first see if the function takes two parameters; if so we assume the second
24
+ # will be interpreted as the file name for the output.
25
+ # If it doesn't, then we assume it's giving us back awkward array results
28
26
29
- if output_format == 'root-file' :
27
+ provided_signature = inspect .signature (generated_transformer .run_query )
28
+ if len (provided_signature .parameters ) == 2 :
29
+ generated_transformer .run_query (file_path , str (output_path ))
30
+ if not output_path .exists ():
31
+ raise RuntimeError (
32
+ "Transformation did not produce expected output file "
33
+ f"{ output_path } "
34
+ )
35
+ ttime = time .time ()
30
36
etime = time .time ()
31
- if isinstance (output , ak .Array ):
32
- awkward_arrays = {default_tree_name : output }
33
- elif isinstance (output , dict ):
34
- awkward_arrays = output
35
- with open (output_path , 'b+w' ) as wfile :
36
- with uproot .recreate (wfile ) as writer :
37
- for key in awkward_arrays .keys ():
38
- total_events = awkward_arrays [key ].__len__ ()
39
- if awkward_arrays [key ].fields and total_events :
40
- o_dict = {field : awkward_arrays [key ][field ]
41
- for field in awkward_arrays [key ].fields }
42
- elif awkward_arrays [key ].fields and not total_events :
43
- o_dict = {field : np .array ([])
44
- for field in awkward_arrays [key ].fields }
45
- elif not awkward_arrays [key ].fields and total_events :
46
- o_dict = {default_branch_name : awkward_arrays [key ]}
47
- else :
48
- o_dict = {default_branch_name : np .array ([])}
49
- writer [key ] = o_dict
50
-
51
37
wtime = time .time ()
52
- elif output_format == 'raw-file' :
53
- etime = time .time ()
54
38
total_events = 0
55
- output_path = output
56
- wtime = time .time ()
57
39
else :
58
- if isinstance (output , dict ):
59
- tree_name = list (output .keys ())[0 ]
60
- awkward_array = output [tree_name ]
61
- print (f'Returned type from your Python function is a dictionary - '
62
- f'Only the first key { tree_name } will be written as parquet files. '
63
- f'Please use root-file output to write all trees.' )
40
+ import awkward as ak
41
+ import uproot
42
+ import pyarrow .parquet as pq
43
+ import numpy as np
44
+
45
+ output = generated_transformer .run_query (file_path )
46
+
47
+ ttime = time .time ()
48
+ if output_format == "root-file" :
49
+ etime = time .time ()
50
+ if isinstance (output , ak .Array ):
51
+ awkward_arrays = {default_tree_name : output }
52
+ elif isinstance (output , dict ):
53
+ awkward_arrays = output
54
+ with open (output_path , "b+w" ) as wfile :
55
+ with uproot .recreate (wfile ) as writer :
56
+ for key in awkward_arrays .keys ():
57
+ total_events = awkward_arrays [key ].__len__ ()
58
+ if awkward_arrays [key ].fields and total_events :
59
+ o_dict = {
60
+ field : awkward_arrays [key ][field ]
61
+ for field in awkward_arrays [key ].fields
62
+ }
63
+ elif awkward_arrays [key ].fields and not total_events :
64
+ o_dict = {
65
+ field : np .array ([])
66
+ for field in awkward_arrays [key ].fields
67
+ }
68
+ elif not awkward_arrays [key ].fields and total_events :
69
+ o_dict = {default_branch_name : awkward_arrays [key ]}
70
+ else :
71
+ o_dict = {default_branch_name : np .array ([])}
72
+ writer [key ] = o_dict
73
+
74
+ wtime = time .time ()
75
+ elif output_format == "raw-file" :
76
+ etime = time .time ()
77
+ total_events = 0
78
+ output_path = output
79
+ wtime = time .time ()
64
80
else :
65
- awkward_array = output
81
+ if isinstance (output , dict ):
82
+ tree_name = list (output .keys ())[0 ]
83
+ awkward_array = output [tree_name ]
84
+ print (
85
+ f"Returned type from your Python function is a dictionary - "
86
+ f"Only the first key { tree_name } will be written as parquet files. "
87
+ f"Please use root-file output to write all trees."
88
+ )
89
+ else :
90
+ awkward_array = output
66
91
67
- total_events = ak .num (awkward_array , axis = 0 )
68
- arrow = ak .to_arrow_table (awkward_array )
92
+ total_events = ak .num (awkward_array , axis = 0 )
93
+ arrow = ak .to_arrow_table (awkward_array )
69
94
70
- etime = time .time ()
95
+ etime = time .time ()
71
96
72
- writer = pq .ParquetWriter (output_path , arrow .schema )
73
- writer .write_table (table = arrow )
74
- writer .close ()
97
+ writer = pq .ParquetWriter (output_path , arrow .schema )
98
+ writer .write_table (table = arrow )
99
+ writer .close ()
75
100
76
- wtime = time .time ()
101
+ wtime = time .time ()
77
102
78
103
output_size = os .stat (output_path ).st_size
79
- print (f'Detailed transformer times. query_time:{ round (ttime - stime , 3 )} '
80
- f'serialization: { round (etime - ttime , 3 )} '
81
- f'writing: { round (wtime - etime , 3 )} ' )
104
+ print (
105
+ f"Detailed transformer times. query_time:{ round (ttime - stime , 3 )} "
106
+ f"serialization: { round (etime - ttime , 3 )} "
107
+ f"writing: { round (wtime - etime , 3 )} "
108
+ )
82
109
83
- print (f"Transform stats: Total Events: { total_events } , resulting file size { output_size } " )
110
+ print (
111
+ f"Transform stats: Total Events: { total_events } , resulting file size { output_size } "
112
+ )
84
113
except Exception as error :
85
114
mesg = f"Failed to transform input file { file_path } : { error } "
86
115
print (mesg )
0 commit comments