|
| 1 | +# Materialization of ServiceX Deliver Results |
| 2 | + |
| 3 | +The `to_awk()` function provides a streamlined method to materialize the output of a ServiceX `deliver()` call into Awkward Arrays, Dask arrays, or iterators. |
| 4 | + |
| 5 | +This simplifies workflows by allowing easy manipulation of the retrieved data in various analysis pipelines like in the examples below. |
| 6 | + |
| 7 | +--- |
| 8 | + |
| 9 | +## Functionality Overview |
| 10 | + |
| 11 | +The `to_awk()` function loads data from the deliver output dictionary, supporting both ROOT (`.root`) and Parquet (`.parquet` or `.pq`) file formats. |
| 12 | + |
| 13 | +It provides flexible options for: |
| 14 | + |
| 15 | +- Direct loading into Awkward Arrays. |
| 16 | +- Lazy loading using Dask for scalable operations. |
| 17 | +- Returning iterator objects for manual control over file streaming. |
| 18 | + |
| 19 | +## Function Signature |
| 20 | + |
| 21 | +```python |
| 22 | +to_awk(deliver_dict, dask=False, iterator=False, **kwargs) |
| 23 | +``` |
| 24 | + |
| 25 | +**Parameters:** |
| 26 | + |
| 27 | +- `deliver_dict` (dict): Dictionary returned by `servicex.deliver()`. Keys are sample names, values are file paths or URLs. |
| 28 | +- `dask` (bool, optional): If True, loads files lazily using Dask. Default is False. |
| 29 | +- `iterator` (bool, optional): If True and not using Dask, returns iterators instead of materialized arrays. Default is False. |
| 30 | +- `**kwargs`: Additional keyword arguments passed to `uproot.dask`, `uproot.iterate`, dak.from_parquet, or `awkward.from_parquet`. |
| 31 | + |
| 32 | +**Returns:** |
| 33 | + |
| 34 | +- `dict`: A dictionary where keys are sample names and values are either Awkward Arrays, Dask Arrays, or iterators. Keeping the same dict structure as the `deliver` output. |
| 35 | + |
| 36 | +--- |
| 37 | + |
| 38 | +## Usage Examples |
| 39 | + |
| 40 | +### Simple Materialization |
| 41 | + |
| 42 | +Load ServiceX deliver results directly into Awkward Arrays: |
| 43 | + |
| 44 | +```python |
| 45 | +from servicex_analysis_utils import to_awk |
| 46 | +from servicex import query, dataset, deliver |
| 47 | + |
| 48 | +spec = { |
| 49 | + "Sample": [ |
| 50 | + { |
| 51 | + "Name": "simple_transform", |
| 52 | + "Dataset": dataset.FileList( |
| 53 | + ["root://eospublic.cern.ch//eos/opendata/atlas/rucio/data16_13TeV/DAOD_PHYSLITE.37019878._000001.pool.root.1"] # noqa: E501 |
| 54 | + ), |
| 55 | + "Query": query.FuncADL_Uproot() |
| 56 | + .FromTree("CollectionTree") |
| 57 | + .Select(lambda e: {"el_pt": e["AnalysisElectronsAuxDyn.pt"]}), |
| 58 | + } |
| 59 | + ] |
| 60 | +} |
| 61 | + |
| 62 | +arrays=to_awk(deliver(spec)) |
| 63 | +``` |
| 64 | + |
| 65 | +### Lazy Loading with Dask |
| 66 | + |
| 67 | +Load results lazily for large datasets using Dask task graphs. Enables parallel execution across multiple workers. |
| 68 | + |
| 69 | +```python |
| 70 | +import dask_awkward as dak |
| 71 | + |
| 72 | +dask_arrays = to_awk(deliver(spec), dask=True) |
| 73 | +el_pt_array = arrays["simple_transform"]["el_pt"] |
| 74 | +mean_el_pt = dak.mean(el_pt_array).compute() |
| 75 | +``` |
| 76 | + |
| 77 | +### Using Iterators |
| 78 | + |
| 79 | +Return iterators instead of materialized arrays to avoid loading too much data into memory. Requires `dask=False` (default). Example with loading 10,000 events per chunk: |
| 80 | + |
| 81 | +```python |
| 82 | +iterables = to_awk(deliver(spec), iterator=True, step_size=10000) |
| 83 | +``` |
| 84 | + |
| 85 | +You can then manually loop over the data chunks: |
| 86 | + |
| 87 | +```python |
| 88 | +for chunk in iterables['simple_transform']: |
| 89 | + # process small chunk (~10k events) |
| 90 | + analyse(chunk) #some function for el_pt |
| 91 | +``` |
| 92 | + |
| 93 | +All events can also be loaded by using: |
| 94 | + |
| 95 | +```python |
| 96 | +import awkward as ak |
| 97 | +arrays= ak.concatenate(list[iterables['simple_transform']]) |
| 98 | +``` |
| 99 | + |
| 100 | +--- |
| 101 | + |
| 102 | + |
| 103 | +## Multiple samples |
| 104 | + |
| 105 | +ServiceX queries allow multiple sample transformations. The `to_awk` allows a straightforward manipulation of such requests. This allows seamless integration with analysis frameworks with multiple samples being manipulated separately after being passing the same transformation using `deliver()`. |
| 106 | + |
| 107 | +```python |
| 108 | +from servicex_analysis_utils import to_awk |
| 109 | +import awkward as ak |
| 110 | + |
| 111 | +# Given a ServiceX deliver return |
| 112 | +deliver_result = { |
| 113 | + "Signal": ["path/to/signal_file1.root", "path/to/signal_file2.root"], |
| 114 | + "Background": ["path/to/background_file.root"] |
| 115 | +} |
| 116 | + |
| 117 | +arrays = to_awk(deliver_result) |
| 118 | + |
| 119 | +signal_el_pt = arrays["Signal"]["el_pt"] |
| 120 | +background_el_pt = arrays["Background"]["el_pt"] |
| 121 | + |
| 122 | +mean_signal = ak.mean(signal_el_pt) |
| 123 | +mean_background = ak.mean(background_el_pt) |
| 124 | + |
| 125 | +print(f"Mean electron pT (Signal): {mean_signal:.2f} GeV") |
| 126 | +print(f"Mean electron pT (Background): {mean_background:.2f} GeV") |
| 127 | +``` |
| 128 | + |
| 129 | + |
| 130 | +## Notes |
| 131 | + |
| 132 | +- **Multiple samples:** For transformations delivering multiple samples the dask and iterators are applied homegeneously to all. |
| 133 | +- **Error Handling:** In case of loading errors, the affected sample will have `None` as its value in the returned dictionary. |
| 134 | +- **Supported Formats:** A custom dict (non servicex) can be inputed but the paths must point be either ROOT or Parquet format. |
| 135 | +- **Branch Filtering, others:** Additional `**kwargs` allow specifying branch selections or other loading options supported by `uproot`, `awkward` and `dask_awkward`. |
| 136 | + |
| 137 | +--- |
0 commit comments