Skip to content

Commit a70d4ce

Browse files
authored
Allow Zarr compression to be set for intermediate files (#572)
1 parent 88ae813 commit a70d4ce

File tree

9 files changed

+81
-7
lines changed

9 files changed

+81
-7
lines changed

cubed/core/ops.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,7 @@ def blockwise(
304304
target_store=target_store,
305305
target_path=target_path,
306306
storage_options=spec.storage_options,
307+
compressor=spec.zarr_compressor,
307308
shape=shape,
308309
dtype=dtype,
309310
chunks=_chunks,
@@ -367,6 +368,7 @@ def general_blockwise(
367368
target_store=target_store,
368369
target_path=target_path,
369370
storage_options=spec.storage_options,
371+
compressor=spec.zarr_compressor,
370372
shape=shape,
371373
dtype=dtype,
372374
chunks=chunks,

cubed/primitive/blockwise.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def blockwise(
128128
target_store: T_Store,
129129
target_path: Optional[str] = None,
130130
storage_options: Optional[Dict[str, Any]] = None,
131+
compressor: Union[dict, str, None] = "default",
131132
shape: T_Shape,
132133
dtype: T_DType,
133134
chunks: T_Chunks,
@@ -215,6 +216,7 @@ def blockwise(
215216
target_store=target_store,
216217
target_path=target_path,
217218
storage_options=storage_options,
219+
compressor=compressor,
218220
shape=shape,
219221
dtype=dtype,
220222
chunks=chunks,
@@ -236,6 +238,7 @@ def general_blockwise(
236238
target_store: T_Store,
237239
target_path: Optional[str] = None,
238240
storage_options: Optional[Dict[str, Any]] = None,
241+
compressor: Union[dict, str, None] = "default",
239242
shape: T_Shape,
240243
dtype: T_DType,
241244
chunks: T_Chunks,
@@ -297,6 +300,7 @@ def general_blockwise(
297300
chunks=chunksize,
298301
path=target_path,
299302
storage_options=storage_options,
303+
compressor=compressor,
300304
)
301305

302306
func_kwargs = extra_func_kwargs or {}

cubed/runtime/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
try:
1111
import memray
1212
except ImportError:
13-
memray = None
13+
memray = None # type: ignore
1414

1515
sym_counter = 0
1616

cubed/spec.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def __init__(
2121
executor_name: Optional[str] = None,
2222
executor_options: Optional[dict] = None,
2323
storage_options: Union[dict, None] = None,
24+
zarr_compressor: Union[dict, str, None] = "default",
2425
):
2526
"""
2627
Specify resources available to run a computation.
@@ -42,6 +43,13 @@ def __init__(
4243
The default executor for running computations.
4344
storage_options : dict, optional
4445
Storage options to be passed to fsspec.
46+
zarr_compressor : dict or str, optional
47+
The compressor used by Zarr for intermediate data.
48+
49+
If not specified, or set to ``"default"``, Zarr will use the default Blosc compressor.
50+
If set to ``None``, compression is disabled, which can be a good option when using local storage.
51+
Use a dictionary to configure arbitrary compression using Numcodecs. The following example specifies
52+
Blosc compression: ``zarr_compressor={"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}``.
4553
"""
4654

4755
self._work_dir = work_dir
@@ -61,6 +69,7 @@ def __init__(
6169
self._executor = None
6270

6371
self._storage_options = storage_options
72+
self._zarr_compressor = zarr_compressor
6473

6574
@property
6675
def work_dir(self) -> Optional[str]:
@@ -97,10 +106,15 @@ def storage_options(self) -> Optional[dict]:
97106
"""Storage options to be passed to fsspec."""
98107
return self._storage_options
99108

109+
@property
110+
def zarr_compressor(self) -> Union[dict, str, None]:
111+
"""The compressor used by Zarr for intermediate data."""
112+
return self._zarr_compressor
113+
100114
def __repr__(self) -> str:
101115
return (
102116
f"cubed.Spec(work_dir={self._work_dir}, allowed_mem={self._allowed_mem}, "
103-
f"reserved_mem={self._reserved_mem}, executor={self._executor}, storage_options={self._storage_options})"
117+
f"reserved_mem={self._reserved_mem}, executor={self._executor}, storage_options={self._storage_options}, zarr_compressor={self._zarr_compressor})"
104118
)
105119

106120
def __eq__(self, other):
@@ -111,6 +125,7 @@ def __eq__(self, other):
111125
and self.reserved_mem == other.reserved_mem
112126
and self.executor == other.executor
113127
and self.storage_options == other.storage_options
128+
and self.zarr_compressor == other.zarr_compressor
114129
)
115130
else:
116131
return False

cubed/storage/backends/tensorstore.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import dataclasses
22
import math
3-
from typing import Any, Dict, Optional
3+
from typing import Any, Dict, Optional, Union
44

55
import numpy as np
66
import tensorstore
@@ -73,7 +73,7 @@ def encode_dtype(d):
7373
return d.descr
7474

7575

76-
def get_metadata(dtype, chunks):
76+
def get_metadata(dtype, chunks, compressor):
7777
metadata = {}
7878
if dtype is not None:
7979
dtype = np.dtype(dtype)
@@ -82,6 +82,8 @@ def get_metadata(dtype, chunks):
8282
if isinstance(chunks, int):
8383
chunks = (chunks,)
8484
metadata["chunks"] = chunks
85+
if compressor != "default":
86+
metadata["compressor"] = compressor
8587
return metadata
8688

8789

@@ -93,6 +95,7 @@ def open_tensorstore_array(
9395
dtype: Optional[T_DType] = None,
9496
chunks: Optional[T_RegularChunks] = None,
9597
path: Optional[str] = None,
98+
compressor: Union[dict, str, None] = "default",
9699
**kwargs,
97100
):
98101
store = str(store) # TODO: check if Path or str
@@ -121,7 +124,7 @@ def open_tensorstore_array(
121124
raise ValueError(f"Mode not supported: {mode}")
122125

123126
if dtype is None or not hasattr(dtype, "fields") or dtype.fields is None:
124-
metadata = get_metadata(dtype, chunks)
127+
metadata = get_metadata(dtype, chunks, compressor)
125128
if metadata:
126129
spec["metadata"] = metadata
127130

@@ -140,7 +143,7 @@ def open_tensorstore_array(
140143
spec["path"] = field_path
141144

142145
field_dtype, _ = dtype.fields[field]
143-
metadata = get_metadata(field_dtype, chunks)
146+
metadata = get_metadata(field_dtype, chunks, compressor)
144147
if metadata:
145148
spec["metadata"] = metadata
146149

cubed/storage/backends/zarr_python.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
from typing import Optional
1+
from typing import Optional, Union
22

33
import zarr
4+
from numcodecs.registry import get_codec
45

56
from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store
67

@@ -13,14 +14,19 @@ def open_zarr_array(
1314
dtype: Optional[T_DType] = None,
1415
chunks: Optional[T_RegularChunks] = None,
1516
path: Optional[str] = None,
17+
compressor: Union[dict, str, None] = "default",
1618
**kwargs,
1719
):
20+
if isinstance(compressor, dict):
21+
compressor = get_codec(compressor)
22+
1823
return zarr.open_array(
1924
store,
2025
mode=mode,
2126
shape=shape,
2227
dtype=dtype,
2328
chunks=chunks,
2429
path=path,
30+
compressor=compressor,
2531
**kwargs,
2632
)

cubed/tests/storage/test_zarr.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import pytest
2+
import zarr
3+
from numcodecs.registry import get_codec
24

5+
from cubed import config
6+
from cubed.storage.backend import open_backend_array
37
from cubed.storage.zarr import lazy_zarr_array
48

9+
ZARR_PYTHON_V3 = zarr.__version__[0] == "3"
10+
511

612
def test_lazy_zarr_array(tmp_path):
713
zarr_path = tmp_path / "lazy.zarr"
@@ -14,3 +20,28 @@ def test_lazy_zarr_array(tmp_path):
1420
arr.create()
1521
assert zarr_path.exists()
1622
arr.open()
23+
24+
25+
@pytest.mark.skipif(
26+
ZARR_PYTHON_V3, reason="setting zarr compressor not yet possible for Zarr Python v3"
27+
)
28+
@pytest.mark.parametrize(
29+
"compressor",
30+
[None, {"id": "zstd"}, {"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}],
31+
)
32+
def test_compression(tmp_path, compressor):
33+
zarr_path = tmp_path / "lazy.zarr"
34+
35+
arr = lazy_zarr_array(
36+
zarr_path, shape=(3, 3), dtype=int, chunks=(2, 2), compressor=compressor
37+
)
38+
arr.create()
39+
40+
# open with zarr python (for zarr python v2 and tensorstore)
41+
with config.set({"storage_name": "zarr-python"}):
42+
z = open_backend_array(zarr_path, mode="r")
43+
44+
if compressor is None:
45+
assert z.compressor is None
46+
else:
47+
assert z.compressor == get_codec(compressor)

cubed/tests/test_core.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,17 @@ def test_default_spec_config_override():
349349
assert_array_equal(b.compute(), -np.ones((20000, 1000)))
350350

351351

352+
@pytest.mark.parametrize(
353+
"compressor",
354+
[None, {"id": "zstd"}, {"id": "blosc", "cname": "lz4", "clevel": 2, "shuffle": -1}],
355+
)
356+
def test_spec_compressor(tmp_path, compressor):
357+
spec = cubed.Spec(tmp_path, allowed_mem=100000, zarr_compressor=compressor)
358+
a = xp.ones((3, 3), chunks=(2, 2), spec=spec)
359+
b = xp.negative(a)
360+
assert_array_equal(b.compute(), -np.ones((3, 3)))
361+
362+
352363
def test_different_specs(tmp_path):
353364
spec1 = cubed.Spec(tmp_path, allowed_mem=100000)
354365
spec2 = cubed.Spec(tmp_path, allowed_mem=200000)

setup.cfg

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ ignore_missing_imports = True
5050
ignore_missing_imports = True
5151
[mypy-networkx.*]
5252
ignore_missing_imports = True
53+
[mypy-numcodecs.*]
54+
ignore_missing_imports = True
5355
[mypy-numpy.*]
5456
ignore_missing_imports = True
5557
[mypy-pandas.*]

0 commit comments

Comments
 (0)