
Commit 15ed8e5

Automatic Dask-Zarr chunk alignment (#10336)
* First draft of the automatic chunk alignment
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Add a new chunks file to the backends module, refactor the logic used in the set_variables method, and add docs for the align_chunks parameter
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Fix import error
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Modify the error raised by the chunk validations again to avoid problems with the tests
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Fix test_backends_chunks.py when dask is not present, remove a duplicated test added by mistake in test_backends.py, and make mypy happy again
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Change the error raised when non-grid chunks are used, change align_chunks to align_nd_chunks to keep it consistent with the terminology used, make mypy happy by changing new_shape to a tuple, and add some simple tests for the other functions inside the chunks module
* Improve the messages of other errors and fix the typos
* Fix one test related to the change in the text of the value error
* Add an entry to the whats-new.rst file
* Improve the docs of the align_chunks parameter

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent d21d79e commit 15ed8e5

8 files changed: +511 -91 lines changed

doc/whats-new.rst

Lines changed: 9 additions & 0 deletions
@@ -30,6 +30,15 @@ New Features
   (:issue:`10243`, :pull:`10293`)
   By `Benoit Bovy <https://github.com/benbovy>`_.
 
+- Data corruption issues arising from misaligned Dask and Zarr chunks
+  can now be prevented using the new ``align_chunks`` parameter in
+  :py:meth:`~xarray.DataArray.to_zarr`. This option automatically rechunks
+  the Dask array to align it with the Zarr storage chunks. For now, it is
+  disabled by default, but this could change in the future.
+  (:issue:`9914`, :pull:`10336`)
+  By `Joseph Nowak <https://github.com/josephnowak>`_.
+
+
 Breaking changes
 ~~~~~~~~~~~~~~~~
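For illustration (not part of the diff), a minimal usage sketch of the new keyword. The dataset, chunk sizes, and store path below are made up, and dask plus zarr are assumed to be installed:

import numpy as np
import xarray as xr

# Dask chunks of 30 do not divide evenly into the Zarr chunks of 7 requested
# in the encoding, which safe_chunks=True would normally reject.
ds = xr.Dataset({"a": ("x", np.arange(100))}).chunk({"x": 30})

# align_chunks=True rechunks the Dask array so that every Dask chunk covers
# whole Zarr chunks before the parallel write starts.
ds.to_zarr(
    "example.zarr",
    mode="w",
    encoding={"a": {"chunks": (7,)}},
    align_chunks=True,
)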

xarray/backends/api.py

Lines changed: 7 additions & 1 deletion
@@ -2132,6 +2132,7 @@ def to_zarr(
     append_dim: Hashable | None = None,
     region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
     safe_chunks: bool = True,
+    align_chunks: bool = False,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
     write_empty_chunks: bool | None = None,
@@ -2155,6 +2156,7 @@ def to_zarr(
     append_dim: Hashable | None = None,
     region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
     safe_chunks: bool = True,
+    align_chunks: bool = False,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
     write_empty_chunks: bool | None = None,
@@ -2176,6 +2178,7 @@ def to_zarr(
     append_dim: Hashable | None = None,
     region: Mapping[str, slice | Literal["auto"]] | Literal["auto"] | None = None,
     safe_chunks: bool = True,
+    align_chunks: bool = False,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
     zarr_format: int | None = None,
@@ -2225,13 +2228,16 @@
         append_dim=append_dim,
         write_region=region,
         safe_chunks=safe_chunks,
+        align_chunks=align_chunks,
         zarr_version=zarr_version,
         zarr_format=zarr_format,
         write_empty=write_empty_chunks,
         **kwargs,
     )
 
-    dataset = zstore._validate_and_autodetect_region(dataset)
+    dataset = zstore._validate_and_autodetect_region(
+        dataset,
+    )
     zstore._validate_encoding(encoding)
 
     writer = ArrayWriter()

xarray/backends/chunks.py

Lines changed: 273 additions & 0 deletions
@@ -0,0 +1,273 @@
import numpy as np

from xarray.core.datatree import Variable


def align_nd_chunks(
    nd_var_chunks: tuple[tuple[int, ...], ...],
    nd_backend_chunks: tuple[tuple[int, ...], ...],
) -> tuple[tuple[int, ...], ...]:
    if len(nd_backend_chunks) != len(nd_var_chunks):
        raise ValueError(
            "The number of dimensions on the backend and the variable must be the same."
        )

    nd_aligned_chunks: list[tuple[int, ...]] = []
    for backend_chunks, var_chunks in zip(
        nd_backend_chunks, nd_var_chunks, strict=True
    ):
        # Validate that they have the same number of elements
        if sum(backend_chunks) != sum(var_chunks):
            raise ValueError(
                "The number of elements in the backend does not "
                "match the number of elements in the variable. "
                "This inconsistency should never occur at this stage."
            )

        # Validate that the backend chunks are of uniform size, excluding the borders
        if len(set(backend_chunks[1:-1])) > 1:
            raise ValueError(
                f"This function currently supports aligning chunks "
                f"only when backend chunks are of uniform size, excluding borders. "
                f"If you encounter this error, please report it—this scenario should never occur "
                f"unless there is an internal misuse. "
                f"Backend chunks: {backend_chunks}"
            )

        # The algorithm assumes that there are always two borders on the
        # backend and the array; if not, the result is the same as the
        # input, and there is nothing to optimize
        if len(backend_chunks) == 1:
            nd_aligned_chunks.append(backend_chunks)
            continue

        if len(var_chunks) == 1:
            nd_aligned_chunks.append(var_chunks)
            continue

        # Size of the chunk on the backend
        fixed_chunk = max(backend_chunks)

        # The ideal size of the chunks is the maximum of the two; this avoids
        # using more memory than expected
        max_chunk = max(fixed_chunk, max(var_chunks))

        # The algorithm assumes that the chunks on this array are aligned except the last one,
        # because it can be considered a partial one
        aligned_chunks: list[int] = []

        # For simplicity of the algorithm, let's transform the array chunks in such a way that
        # we remove the partial chunks. To achieve this, we add artificial data to the borders
        t_var_chunks = list(var_chunks)
        t_var_chunks[0] += fixed_chunk - backend_chunks[0]
        t_var_chunks[-1] += fixed_chunk - backend_chunks[-1]

        # The unfilled_size is the amount of space that has not been filled on the last
        # processed chunk; this is equivalent to the amount of data that would need to be
        # added to a partial Zarr chunk to fill it up to the fixed_chunk size
        unfilled_size = 0

        for var_chunk in t_var_chunks:
            # Ideally, we should try to preserve the original Dask chunks, but this is only
            # possible if the last processed chunk was aligned (unfilled_size == 0)
            ideal_chunk = var_chunk
            if unfilled_size:
                # If that scenario is not possible, the best option is to merge the chunks
                ideal_chunk = var_chunk + aligned_chunks[-1]

            while ideal_chunk:
                if not unfilled_size:
                    # If the previous chunk is filled, let's add a new chunk
                    # of size 0 that will be used on the merging step to simplify the algorithm
                    aligned_chunks.append(0)

                if ideal_chunk > max_chunk:
                    # If the ideal_chunk is bigger than the max_chunk,
                    # we need to increase the last chunk as much as possible
                    # while keeping it aligned, and then add a new chunk
                    max_increase = max_chunk - aligned_chunks[-1]
                    max_increase = (
                        max_increase - (max_increase - unfilled_size) % fixed_chunk
                    )
                    aligned_chunks[-1] += max_increase
                else:
                    # Perfect scenario where the chunks can be merged without any split.
                    aligned_chunks[-1] = ideal_chunk

                ideal_chunk -= aligned_chunks[-1]
                unfilled_size = (
                    fixed_chunk - aligned_chunks[-1] % fixed_chunk
                ) % fixed_chunk

        # Now we have to remove the artificial data added to the borders
        for order in [-1, 1]:
            border_size = fixed_chunk - backend_chunks[::order][0]
            aligned_chunks = aligned_chunks[::order]
            aligned_chunks[0] -= border_size
            t_var_chunks = t_var_chunks[::order]
            t_var_chunks[0] -= border_size
            if (
                len(aligned_chunks) >= 2
                and aligned_chunks[0] + aligned_chunks[1] <= max_chunk
                and aligned_chunks[0] != t_var_chunks[0]
            ):
                # The artificial data added to the border can introduce inefficient chunks
                # on the borders; for that reason, we check whether they can be merged.
                # Example:
                # backend_chunks = [6, 6, 1]
                # var_chunks = [6, 7]
                # t_var_chunks = [6, 12]
                # The ideal output should preserve the same var_chunks, but the previous loop
                # is going to produce aligned_chunks = [6, 6, 6]
                # And after removing the artificial data, we will end up with aligned_chunks = [6, 6, 1],
                # which is not ideal and can be merged into a single chunk
                aligned_chunks[1] += aligned_chunks[0]
                aligned_chunks = aligned_chunks[1:]

            t_var_chunks = t_var_chunks[::order]
            aligned_chunks = aligned_chunks[::order]

        nd_aligned_chunks.append(tuple(aligned_chunks))

    return tuple(nd_aligned_chunks)
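For illustration (not part of the committed file), a small sketch of align_nd_chunks on the example given in the comments above; the output shown is what that worked example describes:

from xarray.backends.chunks import align_nd_chunks

# Backend (Zarr) grid chunks of size 6 over 13 elements, with a partial border
# chunk, against Dask chunks of (6, 7): the chunks are already compatible,
# so they are preserved.
print(align_nd_chunks(nd_var_chunks=((6, 7),), nd_backend_chunks=((6, 6, 1),)))
# ((6, 7),)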
def build_grid_chunks(
    size: int,
    chunk_size: int,
    region: slice | None = None,
) -> tuple[int, ...]:
    if region is None:
        region = slice(0, size)

    region_start = region.start if region.start else 0
    # Generate the zarr chunks inside the region of this dim
    chunks_on_region = [chunk_size - (region_start % chunk_size)]
    chunks_on_region.extend([chunk_size] * ((size - chunks_on_region[0]) // chunk_size))
    if (size - chunks_on_region[0]) % chunk_size != 0:
        chunks_on_region.append((size - chunks_on_region[0]) % chunk_size)
    return tuple(chunks_on_region)
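For illustration (not part of the committed file), two small worked calls of build_grid_chunks; the sizes are made up:

from xarray.backends.chunks import build_grid_chunks

# Grid chunks for a dimension of 10 elements and a Zarr chunk size of 4:
# two full chunks plus a partial border chunk.
print(build_grid_chunks(size=10, chunk_size=4))  # (4, 4, 2)

# When the write region starts mid-chunk, the first grid chunk is the piece
# needed to reach the next chunk boundary.
print(build_grid_chunks(size=10, chunk_size=4, region=slice(2, 12)))  # (2, 4, 4)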
def grid_rechunk(
    v: Variable,
    enc_chunks: tuple[int, ...],
    region: tuple[slice, ...],
) -> Variable:
    nd_var_chunks = v.chunks
    if not nd_var_chunks:
        return v

    nd_grid_chunks = tuple(
        build_grid_chunks(
            sum(var_chunks),
            region=interval,
            chunk_size=chunk_size,
        )
        for var_chunks, chunk_size, interval in zip(
            nd_var_chunks, enc_chunks, region, strict=True
        )
    )

    nd_aligned_chunks = align_nd_chunks(
        nd_var_chunks=nd_var_chunks,
        nd_backend_chunks=nd_grid_chunks,
    )
    v = v.chunk(dict(zip(v.dims, nd_aligned_chunks, strict=True)))
    return v
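For illustration (not part of the committed file), a hedged sketch of grid_rechunk on a dask-backed Variable; the sizes are made up and dask is assumed to be installed:

import dask.array as da

from xarray.backends.chunks import grid_rechunk
from xarray.core.variable import Variable

# Dask chunks of 30 over 100 elements, written into Zarr chunks of 7
# starting at the beginning of the store.
var = Variable(("x",), da.zeros(100, chunks=30))
rechunked = grid_rechunk(var, enc_chunks=(7,), region=(slice(0, 100),))
print(rechunked.chunks)  # e.g. ((28, 28, 28, 16),): boundaries land on multiples of 7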
def validate_grid_chunks_alignment(
    nd_var_chunks: tuple[tuple[int, ...], ...] | None,
    enc_chunks: tuple[int, ...],
    backend_shape: tuple[int, ...],
    region: tuple[slice, ...],
    allow_partial_chunks: bool,
    name: str,
):
    if nd_var_chunks is None:
        return
    base_error = (
        "Specified Zarr chunks encoding['chunks']={enc_chunks!r} for "
        "variable named {name!r} would overlap multiple Dask chunks. "
        "Check the chunk at position {var_chunk_pos}, which has a size of "
        "{var_chunk_size} on dimension {dim_i}. It is unaligned with "
        "backend chunks of size {chunk_size} in region {region}. "
        "Writing this array in parallel with Dask could lead to corrupted data. "
        "To resolve this issue, consider one of the following options: "
        "- Rechunk the array using `chunk()`. "
        "- Modify or delete `encoding['chunks']`. "
        "- Set `safe_chunks=False`. "
        "- Enable automatic chunks alignment with `align_chunks=True`."
    )

    for dim_i, chunk_size, var_chunks, interval, size in zip(
        range(len(enc_chunks)),
        enc_chunks,
        nd_var_chunks,
        region,
        backend_shape,
        strict=True,
    ):
        for i, chunk in enumerate(var_chunks[1:-1]):
            if chunk % chunk_size:
                raise ValueError(
                    base_error.format(
                        var_chunk_pos=i + 1,
                        var_chunk_size=chunk,
                        name=name,
                        dim_i=dim_i,
                        chunk_size=chunk_size,
                        region=interval,
                        enc_chunks=enc_chunks,
                    )
                )

        interval_start = interval.start if interval.start else 0

        if len(var_chunks) > 1:
            # The first border size is the amount of data that needs to be updated on the
            # first chunk, taking into account the region slice.
            first_border_size = chunk_size
            if allow_partial_chunks:
                first_border_size = chunk_size - interval_start % chunk_size

            if (var_chunks[0] - first_border_size) % chunk_size:
                raise ValueError(
                    base_error.format(
                        var_chunk_pos=0,
                        var_chunk_size=var_chunks[0],
                        name=name,
                        dim_i=dim_i,
                        chunk_size=chunk_size,
                        region=interval,
                        enc_chunks=enc_chunks,
                    )
                )

        if not allow_partial_chunks:
            region_stop = interval.stop if interval.stop else size

            error_on_last_chunk = base_error.format(
                var_chunk_pos=len(var_chunks) - 1,
                var_chunk_size=var_chunks[-1],
                name=name,
                dim_i=dim_i,
                chunk_size=chunk_size,
                region=interval,
                enc_chunks=enc_chunks,
            )
            if interval_start % chunk_size:
                # The last chunk (which can also be the only one) is a partial chunk
                # if it is not aligned at the beginning
                raise ValueError(error_on_last_chunk)

            if np.ceil(region_stop / chunk_size) == np.ceil(size / chunk_size):
                # If the region covers the last backend chunk, then check
                # whether the remainder with the default chunk size
                # is equal to the size of the last chunk
                if var_chunks[-1] % chunk_size != size % chunk_size:
                    raise ValueError(error_on_last_chunk)
            elif var_chunks[-1] % chunk_size:
                raise ValueError(error_on_last_chunk)
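For illustration (not part of the committed file), a small sketch of what the validation accepts and rejects; the chunk sizes are made up:

from xarray.backends.chunks import validate_grid_chunks_alignment

# Dask chunks (4, 4, 2) line up with Zarr chunks of 4, so this passes silently.
validate_grid_chunks_alignment(
    nd_var_chunks=((4, 4, 2),),
    enc_chunks=(4,),
    backend_shape=(10,),
    region=(slice(0, 10),),
    allow_partial_chunks=True,
    name="a",
)

# An interior Dask chunk of 5 straddles a Zarr chunk boundary, so this raises
# the ValueError built from base_error above.
validate_grid_chunks_alignment(
    nd_var_chunks=((4, 5, 1),),
    enc_chunks=(4,),
    backend_shape=(10,),
    region=(slice(0, 10),),
    allow_partial_chunks=True,
    name="a",
)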
