
Commit 44c7d94

Move manifest to s3 and split workflow
1 parent 544a060 commit 44c7d94

5 files changed: +104 -85 lines changed


examples/virtual-rechunk/README.md

Lines changed: 23 additions & 12 deletions
@@ -1,11 +1,17 @@
 # Rechunk a virtual dataset
 
-This example demonstrates how to rechunk a collection of necdf files on s3
-into a single zarr store.
+This example demonstrates how to rechunk a collection of netcdf files on s3 into a single zarr store.
+
+Most rechunking workflows can be conceptualized as two steps,
+and keeping them separate typically provides greater flexibility than combining them.
+The first (staging) step is mostly embarrassingly parallel and prepares the input data.
+In this example, we construct a virtual zarr dataset using `lithops`,
+but we could incorporate data transfer and reprocessing as part of staging.
+
+The second (rechunking) step rechunks the staged data.
+Here, we rechunk the virtual zarr using `cubed`,
+but in theory, `dask` or other map-reduce frameworks may be used.
 
-First, lithops and Virtualizarr construct a virtual dataset comprised of the
-netcdf files on s3. Then, xarray-cubed rechunks the virtual dataset into a
-zarr.
 
 ## Credits
 Inspired by Pythia's cookbook: https://projectpythia.org/kerchunk-cookbook
@@ -14,31 +20,36 @@ by norlandrhagen.
 Please, contribute improvements.
 
 
-
 1. Set up a Python environment
 ```bash
 conda create --name virtualizarr-rechunk -y python=3.11
 conda activate virtualizarr-rechunk
 pip install -r requirements.txt
 ```
 
-1. Set up cubed executor for [lithops-aws](https://github.com/cubed-dev/cubed/blob/main/examples/lithops/aws/README.md) by editing `./lithops.yaml` with your `bucket` and `execution_role`.
-```bash
+2. Set up cubed executor for [lithops-aws](https://github.com/cubed-dev/cubed/blob/main/examples/lithops/aws/README.md) by editing `./lithops.yaml` with your `bucket` and `execution_role`.
 
-1. Build a runtime image for Cubed
+3. Build a runtime image for `cubed`
 ```bash
 export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
-export CUBED_CONFIG=$(pwd)
+export CUBED_CONFIG=$(pwd)/cubed.yaml
+# create a bucket for storing results
+export BUCKET_URL=s3://wma-uncertainty/scratch
 lithops runtime build -b aws_lambda -f Dockerfile_virtualizarr virtualizarr-runtime
 ```
 
-1. Run the script
+4. Stage the virtual zarr using `lithops`
+```bash
+python create-virtualzarr.py
+```
+
+5. Rechunk the virtual zarr with `cubed` (using `lithops`)
 ```bash
 python cubed-rechunk.py
 ```
 
 ## Cleaning up
-To rebuild the Litops image, delete the existing one by running
+To rebuild the `lithops` image, delete the existing one by running
 ```bash
 lithops runtime delete -b aws_lambda -d virtualizarr-runtime
 ```

examples/virtual-rechunk/create-virtualzarr.py

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+# Use lithops to construct a virtual zarr from netcdf files on s3.
+
+import fsspec
+import lithops
+import os
+import xarray as xr
+
+from virtualizarr import open_virtual_dataset
+
+bucket_url = os.getenv("BUCKET_URL")
+
+fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
+files_paths = fs_read.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*")
+file_pattern = sorted(["s3://" + f for f in files_paths])
+
+# Truncate file_pattern while debugging
+file_pattern = file_pattern[:4]
+
+print(f"{len(file_pattern)} file paths were retrieved.")
+
+
+def map_references(fil):
+    """ Map function to open virtual datasets.
+    """
+    vds = open_virtual_dataset(
+        fil,
+        indexes={},
+        loadable_variables=['Time'],
+        cftime_variables=['Time'],
+    )
+    return vds
+
+
+def reduce_references(results):
+    """ Reduce to concat virtual datasets.
+    """
+    combined_vds = xr.combine_nested(
+        results,
+        concat_dim=["Time"],
+        coords="minimal",
+        compat="override",
+    )
+
+    return combined_vds
+
+
+fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
+
+futures = fexec.map_reduce(
+    map_references,
+    file_pattern,
+    reduce_references,
+    spawn_reducer=100,
+)
+
+ds = futures.get_result()
+
+# Save the virtual zarr manifest to s3
+ds.virtualize.to_kerchunk(f"{bucket_url}/combined.json", format="json")
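The staging script writes its kerchunk manifest to the bucket given by `BUCKET_URL`. As a quick sanity check, the manifest can be opened and its chunk references counted. The snippet below is a sketch only, not part of this commit; it assumes `s3fs` is installed alongside `fsspec` and that the file follows kerchunk's usual JSON layout with a top-level `refs` mapping.

```python
# Hypothetical check (not part of the commit): confirm the manifest landed on s3
# and count the chunk references it contains.
import json
import os

import fsspec

bucket_url = os.getenv("BUCKET_URL")  # same bucket the scripts use

with fsspec.open(f"{bucket_url}/combined.json", "r") as f:
    manifest = json.load(f)

# Version-1 kerchunk JSON nests references under "refs"; fall back to a flat dict.
refs = manifest.get("refs", manifest)
print(f"{len(refs)} chunk references in combined.json")
```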

examples/virtual-rechunk/lithops.yaml

Lines changed: 1 addition & 1 deletion
@@ -11,4 +11,4 @@ aws_lambda:
     runtime_memory: 2000
 
 aws_s3:
-    bucket: arn:aws:s3:::cubed-thodson-temp
+    storage_bucket: cubed-thodson-temp

examples/virtual-rechunk/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-boto
+boto3
 cftime
 cubed
 cubed-xarray

examples/virtual-rechunk/cubed-rechunk.py

Lines changed: 20 additions & 71 deletions
@@ -1,85 +1,34 @@
-# Rechunk a collection of necdf files on s3 into a single zarr store.
+# Rechunk a virtual zarr on s3 into a single zarr store using xarray-cubed.
 #
-# First, lithops and Virtualizarr construct a virtual dataset comprised of the
-# netcdf files on s3. Then, xarray-cubed rechunks the virtual dataset into a
-# zarr.
+# Prior to running this script, create the virtual zarr with
+# > python create-virtualzarr.py
 #
-# Inspired by Pythia's cookbook: https://projectpythia.org/kerchunk-cookbook
-# by norlandrhagen.
-#
-# Please, contribute improvements.
+# NOTE: In jupyter, open_dataset seems to cache the json, such that changes
+# aren't propagated until the kernel is restarted.
 
-import fsspec
-import lithops
+import os
 import xarray as xr
 
-from virtualizarr import open_virtual_dataset
-
-fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
-files_paths = fs_read.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*")
-file_pattern = sorted(["s3://" + f for f in files_paths])
-
-# truncate file_pattern while debugging
-file_pattern = file_pattern[:4]
-
-print(f"{len(file_pattern)} file paths were retrieved.")
-
-
-def map_references(fil):
-    """ Map function to open virtual datasets.
-    """
-    vds = open_virtual_dataset(fil,
-                               indexes={},
-                               loadable_variables=['Time'],
-                               cftime_variables=['Time'],
-                               )
-    return vds
-
-
-def reduce_references(results):
-    """ Reduce to concat virtual datasets.
+bucket_url = os.getenv("BUCKET_URL")
 
-    """
-    combined_vds = xr.combine_nested(
-        results,
-        concat_dim=["Time"],
-        coords="minimal",
-        compat="override",
-    )
-    # possibly write parquet to s3 here
-    return combined_vds
-
-
-fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
-
-futures = fexec.map_reduce(
-    map_references,
-    file_pattern,
-    reduce_references,
-    spawn_reducer=100,
+combined_ds = xr.open_dataset(
+    f"{bucket_url}/combined.json",  # location must be accessible to workers
+    engine="kerchunk",
+    chunks={},
+    chunked_array_type="cubed",
 )
 
-ds = futures.get_result()
-ds.virtualize.to_kerchunk("combined.json", format="json")
-
-# NOTE: In jupyter, open_dataset seems to cache the json, such that changes
-# aren't propogated until the kernel is restarted.
-combined_ds = xr.open_dataset("combined.json",
-                              engine="kerchunk",
-                              chunks={},
-                              chunked_array_type="cubed",
-                              )
-
-combined_ds['Time'].attrs = {}  # to_zarr complains about attrs
+combined_ds['Time'].attrs = {}  # otherwise to_zarr complains about attrs
 
 rechunked_ds = combined_ds.chunk(
     chunks={'Time': 5, 'south_north': 25, 'west_east': 32},
     chunked_array_type="cubed",
 )
 
-rechunked_ds.to_zarr("rechunked.zarr",
-                     mode="w",
-                     encoding={},  # TODO
-                     consolidated=True,
-                     safe_chunks=False,
-                     )
+rechunked_ds.to_zarr(
+    f"{bucket_url}/rechunked.zarr",
+    mode="w",
+    encoding={},  # TODO
+    consolidated=True,
+    safe_chunks=False,
+)
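Once `cubed-rechunk.py` completes, the rechunked store lives at `{BUCKET_URL}/rechunked.zarr`. Below is a minimal sketch of how one might verify the new chunk layout; it is not part of this commit and assumes `s3fs` is available and that the store was written with consolidated metadata, as the script requests.

```python
# Hypothetical verification (not part of the commit): open the rechunked store
# and confirm the chunk sizes requested in cubed-rechunk.py.
import os

import xarray as xr

bucket_url = os.getenv("BUCKET_URL")

rechunked = xr.open_zarr(f"{bucket_url}/rechunked.zarr", consolidated=True)
print(rechunked.chunks)  # expect Time: 5, south_north: 25, west_east: 32
```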
