Commit b7a3313 committed
Add virtual-rechunk example
1 parent 4fb30c8

File tree: 6 files changed, +218 -0 lines changed
examples/virtual-rechunk/Dockerfile_virtualizarr

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
# Python 3.11
FROM python:3.11-slim-buster

RUN apt-get update \
    # Install aws-lambda-cpp build dependencies
    && apt-get install -y \
      g++ \
      make \
      cmake \
      unzip \
    # cleanup package lists, they are not used anymore in this image
    && rm -rf /var/lib/apt/lists/* \
    && apt-cache search linux-headers-generic

ARG FUNCTION_DIR="/function"

# Copy function code
RUN mkdir -p ${FUNCTION_DIR}

# Update pip
# NB botocore/boto3 are pinned due to https://github.com/boto/boto3/issues/3648
# using versions from https://github.com/aio-libs/aiobotocore/blob/72b8dd5d7d4ef2f1a49a0ae0c37b47e5280e2070/setup.py
# due to s3fs dependency
RUN pip install --upgrade --ignore-installed pip wheel six setuptools \
    && pip install --upgrade --no-cache-dir --ignore-installed \
        awslambdaric \
        botocore==1.29.76 \
        boto3==1.26.76 \
        redis \
        httplib2 \
        requests \
        numpy \
        scipy \
        pandas \
        pika \
        kafka-python \
        cloudpickle \
        ps-mem \
        tblib

# Set working directory to function root directory
WORKDIR ${FUNCTION_DIR}

# Add Lithops
COPY lithops_lambda.zip ${FUNCTION_DIR}
RUN unzip lithops_lambda.zip \
    && rm lithops_lambda.zip \
    && mkdir handler \
    && touch handler/__init__.py \
    && mv entry_point.py handler/

# Put your dependencies here, using RUN pip install... or RUN apt install...

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
CMD [ "handler.entry_point.lambda_handler" ]
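Before building the full example, it can help to confirm that the Lambda runtime produced from this Dockerfile actually executes under Lithops. A minimal smoke test, a sketch assuming the image has been built as `virtualizarr-runtime` and that the `lithops.yaml` below already points at your bucket and execution role:

```python
# Smoke test (sketch): run a trivial function on the custom Lambda runtime.
import lithops


def hello(name):
    return f"hello, {name}"


fexec = lithops.FunctionExecutor(config_file="lithops.yaml",
                                 runtime="virtualizarr-runtime")
fexec.map(hello, ["lambda"])
print(fexec.get_result())  # expect ['hello, lambda']
```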

examples/virtual-rechunk/README.md

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
# Rechunk a virtual dataset

This example demonstrates how to rechunk a collection of netCDF files on S3
into a single Zarr store.

First, lithops and VirtualiZarr construct a virtual dataset comprising the
netCDF files on S3. Then, cubed-xarray rechunks the virtual dataset into a
Zarr store.

## Credits
Inspired by Pythia's cookbook: https://projectpythia.org/kerchunk-cookbook
by norlandrhagen.

Please contribute improvements.


1. Set up a Python environment
```bash
conda create --name virtualizarr-rechunk -y python=3.11
conda activate virtualizarr-rechunk
pip install -r requirements.txt
```

1. Set up the Cubed executor for [lithops-aws](https://github.com/cubed-dev/cubed/blob/main/examples/lithops/aws/README.md) by editing `./lithops.yaml` with your `bucket` and `execution_role`.

1. Build a runtime image for Cubed
```bash
export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
export CUBED_CONFIG=$(pwd)
lithops runtime build -b aws_lambda -f Dockerfile_virtualizarr virtualizarr-runtime
```

1. Run the script
```bash
python cubed-rechunk.py
```

## Cleaning up
To rebuild the Lithops image, delete the existing one by running
```bash
lithops runtime delete -b aws_lambda -d virtualizarr-runtime
```
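For orientation before the configuration files and script below, this is roughly what the first phase looks like for a single file. A sketch that mirrors the calls in `cubed-rechunk.py` and assumes anonymous read access to the public bucket works from your environment:

```python
# Sketch: open one netCDF file as a virtual dataset; only metadata and chunk
# references are read, no array data is downloaded.
import fsspec
from virtualizarr import open_virtual_dataset

fs = fsspec.filesystem("s3", anon=True)
first_file = "s3://" + sorted(fs.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*"))[0]

vds = open_virtual_dataset(first_file, indexes={})
print(vds)  # variables are manifest-backed arrays pointing at byte ranges in S3
```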

examples/virtual-rechunk/cubed.yaml

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
spec:
  work_dir: "s3://cubed-$USER-temp"
  allowed_mem: "2GB"
  executor_name: "lithops"
  executor_options:
    runtime: "virtualizarr-runtime"
    runtime_memory: 2000
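The same spec can also be constructed in Python rather than picked up from `CUBED_CONFIG`. A sketch, assuming the `cubed.Spec` constructor and with the bucket name left as a placeholder:

```python
# Sketch: build the Cubed spec in code instead of reading cubed.yaml.
from cubed import Spec

spec = Spec(
    work_dir="s3://cubed-<user>-temp",  # placeholder; scratch space for intermediate data
    allowed_mem="2GB",                  # per-task memory budget enforced by Cubed
)
```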

examples/virtual-rechunk/lithops.yaml

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
lithops:
  backend: aws_lambda
  storage: aws_s3

aws:
  region: us-west-2

aws_lambda:
  execution_role: arn:aws:iam::807615458658:role/lambdaLithopsExecutionRole
  runtime: virtualizarr-runtime
  runtime_memory: 2000

aws_s3:
  bucket: arn:aws:s3:::cubed-thodson-temp
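Equivalently, Lithops accepts the configuration as an in-memory dict, which can be convenient for experimentation. A sketch with the account-specific values replaced by placeholders:

```python
# Sketch: pass the Lithops configuration inline instead of via lithops.yaml.
import lithops

config = {
    "lithops": {"backend": "aws_lambda", "storage": "aws_s3"},
    "aws": {"region": "us-west-2"},
    "aws_lambda": {
        "execution_role": "arn:aws:iam::<account-id>:role/<lithops-execution-role>",
        "runtime": "virtualizarr-runtime",
        "runtime_memory": 2000,
    },
    "aws_s3": {"bucket": "<your-bucket>"},
}

fexec = lithops.FunctionExecutor(config=config)
```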
examples/virtual-rechunk/requirements.txt

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
boto
cftime
cubed
cubed-xarray
h5py
kerchunk
lithops
s3fs
virtualizarr
xarray
examples/virtual-rechunk/cubed-rechunk.py

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
# Rechunk a collection of netCDF files on S3 into a single Zarr store.
#
# First, lithops and VirtualiZarr construct a virtual dataset comprising the
# netCDF files on S3. Then, cubed-xarray rechunks the virtual dataset into a
# Zarr store.
#
# Inspired by Pythia's cookbook: https://projectpythia.org/kerchunk-cookbook
# by norlandrhagen.
#
# Please contribute improvements.

import fsspec
import lithops
import xarray as xr

from virtualizarr import open_virtual_dataset

fs_read = fsspec.filesystem("s3", anon=True, skip_instance_cache=True)
files_paths = fs_read.glob("s3://wrf-se-ak-ar5/ccsm/rcp85/daily/2060/*")
file_pattern = sorted(["s3://" + f for f in files_paths])

# truncate file_pattern while debugging
file_pattern = file_pattern[:4]

print(f"{len(file_pattern)} file paths were retrieved.")


def map_references(fil):
    """Map function to open virtual datasets."""
    vds = open_virtual_dataset(
        fil,
        indexes={},
        loadable_variables=['Time'],
        cftime_variables=['Time'],
    )
    return vds


def reduce_references(results):
    """Reduce to concat virtual datasets."""
    combined_vds = xr.combine_nested(
        results,
        concat_dim=['Time'],
        coords='minimal',
        compat='override',
    )
    # possibly write parquet to s3 here
    return combined_vds


fexec = lithops.FunctionExecutor(config_file="lithops.yaml")

futures = fexec.map_reduce(
    map_references,
    file_pattern,
    reduce_references,
    spawn_reducer=100,
)

ds = futures.get_result()
ds.virtualize.to_kerchunk('combined.json', format='json')

# NOTE: In jupyter, open_dataset seems to cache the json, such that changes
# aren't propagated until the kernel is restarted.
combined_ds = xr.open_dataset('combined.json',
                              engine="kerchunk",
                              chunks={},
                              chunked_array_type='cubed',
                              )

combined_ds['Time'].attrs = {}  # to_zarr complains about attrs

rechunked_ds = combined_ds.chunk(
    chunks={'Time': 5, 'south_north': 25, 'west_east': 32}
)

rechunked_ds.to_zarr('rechunked.zarr',
                     mode='w',
                     encoding={},  # TODO
                     consolidated=True,
                     safe_chunks=False,
                     )
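Once the script finishes, a quick way to check the result is to open the output store and confirm the chunking. A sketch, assuming the run wrote `rechunked.zarr` into the working directory:

```python
# Sketch: verify the rechunked output written by cubed-rechunk.py.
import xarray as xr

ds = xr.open_zarr("rechunked.zarr", consolidated=True)
print(ds)         # dimensions, coordinates, data variables
print(ds.chunks)  # expect Time: 5, south_north: 25, west_east: 32 (per the script)
```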
