diff --git a/changes/3169.bugfix.rst b/changes/3169.bugfix.rst new file mode 100644 index 0000000000..74a96c1bd4 --- /dev/null +++ b/changes/3169.bugfix.rst @@ -0,0 +1 @@ +Fix race condition when passing array data in ``create_array(data=..)`` for an array that has a set shard size. diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 312dc0bc4d..fb0afcf2c0 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -1178,6 +1178,18 @@ def cdata_shape(self) -> ChunkCoords: """ return tuple(starmap(ceildiv, zip(self.shape, self.chunks, strict=False))) + @property + def _shard_data_shape(self) -> ChunkCoords: + """ + The shape of the shard grid for this array. + + Returns + ------- + Tuple[int] + The shape of the shard grid for this array. + """ + return tuple(starmap(ceildiv, zip(self.shape, self.shards or self.chunks, strict=False))) + @property def nchunks(self) -> int: """ @@ -1220,7 +1232,11 @@ async def nbytes_stored(self) -> int: return await self.store_path.store.getsize_prefix(self.store_path.path) def _iter_chunk_coords( - self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None + self, + *, + iter_shards: bool = False, + origin: Sequence[int] | None = None, + selection_shape: Sequence[int] | None = None, ) -> Iterator[ChunkCoords]: """ Create an iterator over the coordinates of chunks in chunk grid space. If the `origin` @@ -1232,6 +1248,8 @@ def _iter_chunk_coords( Parameters ---------- + iter_shards : bool, default=False + Whether to iterate by shard (if True) or by chunk (if False). origin : Sequence[int] | None, default=None The origin of the selection relative to the array's chunk grid. selection_shape : Sequence[int] | None, default=None @@ -1242,7 +1260,11 @@ def _iter_chunk_coords( chunk_coords: ChunkCoords The coordinates of each chunk in the selection. 
""" - return _iter_grid(self.cdata_shape, origin=origin, selection_shape=selection_shape) + return _iter_grid( + self._shard_data_shape if iter_shards else self.cdata_shape, + origin=origin, + selection_shape=selection_shape, + ) def _iter_chunk_keys( self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None @@ -1269,13 +1291,19 @@ def _iter_chunk_keys( yield self.metadata.encode_chunk_key(k) def _iter_chunk_regions( - self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None + self, + *, + iter_shards: bool = False, + origin: Sequence[int] | None = None, + selection_shape: Sequence[int] | None = None, ) -> Iterator[tuple[slice, ...]]: """ Iterate over the regions spanned by each chunk. Parameters ---------- + iter_shards : bool, default=False + Whether to iterate by shard (if True) or by chunk (if False). origin : Sequence[int] | None, default=None The origin of the selection relative to the array's chunk grid. selection_shape : Sequence[int] | None, default=None @@ -1286,11 +1314,12 @@ def _iter_chunk_regions( region: tuple[slice, ...] A tuple of slice objects representing the region spanned by each chunk in the selection. """ + region_size = (self.shards or self.chunks) if iter_shards else self.chunks for cgrid_position in self._iter_chunk_coords( - origin=origin, selection_shape=selection_shape + iter_shards=iter_shards, origin=origin, selection_shape=selection_shape ): out: tuple[slice, ...] = () - for c_pos, c_shape in zip(cgrid_position, self.chunks, strict=False): + for c_pos, c_shape in zip(cgrid_position, region_size, strict=False): start = c_pos * c_shape stop = start + c_shape out += (slice(start, stop, 1),) @@ -2188,6 +2217,13 @@ def cdata_shape(self) -> ChunkCoords: """ return tuple(starmap(ceildiv, zip(self.shape, self.chunks, strict=False))) + @property + def _shard_data_shape(self) -> ChunkCoords: + """ + The shape of the shard grid for this array. 
+ """ + return tuple(starmap(ceildiv, zip(self.shape, self.shards or self.chunks, strict=False))) + @property def nchunks(self) -> int: """ @@ -2275,7 +2311,10 @@ def nbytes_stored(self) -> int: return sync(self._async_array.nbytes_stored()) def _iter_chunk_keys( - self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None + self, + *, + origin: Sequence[int] | None = None, + selection_shape: Sequence[int] | None = None, ) -> Iterator[str]: """ Iterate over the storage keys of each chunk, relative to an optional origin, and optionally @@ -2298,13 +2337,19 @@ def _iter_chunk_keys( ) def _iter_chunk_regions( - self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None + self, + *, + iter_shards: bool = False, + origin: Sequence[int] | None = None, + selection_shape: Sequence[int] | None = None, ) -> Iterator[tuple[slice, ...]]: """ Iterate over the regions spanned by each chunk. Parameters ---------- + iter_shards : bool, default=False + Whether to iterate by shard (if True) or by chunk (if False). origin : Sequence[int] | None, default=None The origin of the selection relative to the array's chunk grid. selection_shape : Sequence[int] | None, default=None @@ -2316,7 +2361,7 @@ def _iter_chunk_regions( A tuple of slice objects representing the region spanned by each chunk in the selection. 
""" yield from self._async_array._iter_chunk_regions( - origin=origin, selection_shape=selection_shape + iter_shards=iter_shards, origin=origin, selection_shape=selection_shape ) def __array__( @@ -4104,7 +4149,7 @@ async def _copy_array_region(chunk_coords: ChunkCoords | slice, _data: Array) -> # Stream data from the source array to the new array await concurrent_map( - [(region, data) for region in result._iter_chunk_regions()], + [(region, data) for region in result._iter_chunk_regions(iter_shards=True)], _copy_array_region, zarr.core.config.config.get("async.concurrency"), ) @@ -4115,7 +4160,7 @@ async def _copy_arraylike_region(chunk_coords: slice, _data: NDArrayLike) -> Non # Stream data from the source array to the new array await concurrent_map( - [(region, data) for region in result._iter_chunk_regions()], + [(region, data) for region in result._iter_chunk_regions(iter_shards=True)], _copy_arraylike_region, zarr.core.config.config.get("async.concurrency"), ) diff --git a/tests/test_array.py b/tests/test_array.py index 0bca860e84..344daac7d6 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -1681,6 +1681,30 @@ async def test_from_array( assert result.chunks == new_chunks +@pytest.mark.parametrize("store", ["memory"], indirect=True) +@pytest.mark.parametrize("chunks", [(10, 10)]) +@pytest.mark.parametrize("shards", [(60, 60)]) +async def test_from_array_shards( + store: Store, + zarr_format: ZarrFormat, + chunks: tuple[int, ...], + shards: tuple[int, ...], +) -> None: + # Regression test for https://github.com/zarr-developers/zarr-python/issues/3169 + source_data = np.arange(3600).reshape((60, 60)) + + zarr.create_array( + store=store, + data=source_data, + chunks=chunks, + shards=shards, + ) + + array = zarr.open_array(store=store) + + assert np.array_equal(array[:], source_data) + + @pytest.mark.parametrize("store", ["local"], indirect=True) @pytest.mark.parametrize("chunks", ["keep", "auto"]) @pytest.mark.parametrize("write_data", [True, 
False])