-
-
Notifications
You must be signed in to change notification settings - Fork 348
Closed
Labels
bug — Potential issues with the zarr-python library
Milestone
Description
Zarr version
v3.0.0a0
Numcodecs version
v0.12.1
Python Version
3.12
Operating System
Linux
Installation
Using Poetry
Description
I have an array stored using Zarr v3 in a sharded format where the inner chunk size is 1. Reading past the first chunk results in an error, shown below in the MWE. If the chunk size is > 1 (e.g. `k`), then no errors occur for indices 0 through `k - 1`, but the same error occurs when accessing index `k` onwards.
Steps to reproduce
First, create a sharded store:
In [1]: import zarr
In [2]: import numpy as np
In [3]: store = zarr.store.LocalStore("./test.zarr", mode="w")
In [4]: from zarr.codecs import ShardingCodec
In [5]: arr = zarr.create(store=store,
...: shape=(1000, 1000),
...: chunk_shape=(20, 1000),
...: zarr_format=3,
...: codecs=[ShardingCodec(chunk_shape=(1, 1000))])
In [6]: arr[:] = np.ones((1000, 1000))
Now, attempt to open the store and read a single chunk at a time:
In [1]: import zarr
In [2]: store = zarr.store.LocalStore("./test.zarr", mode="r")
In [3]: arr = zarr.open_array(store=store, zarr_format=3)
In [4]: arr[0].shape
Out[4]: (1000,)
In [5]: arr[1].shape
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Cell In[5], line 1
----> 1 arr[1].shape
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:711, in Array.__getitem__(self, selection)
709 return self.get_orthogonal_selection(pure_selection, fields=fields)
710 else:
--> 711 return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:733, in Array.get_basic_selection(self, selection, out, prototype, fields)
731 raise NotImplementedError
732 else:
--> 733 return sync(
734 self._async_array._get_selection(
735 BasicIndexer(selection, self.shape, self.metadata.chunk_grid),
736 out=out,
737 fields=fields,
738 prototype=prototype,
739 )
740 )
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/sync.py:92, in sync(coro, loop, timeout)
89 return_result = next(iter(finished)).result()
91 if isinstance(return_result, BaseException):
---> 92 raise return_result
93 else:
94 return return_result
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/sync.py:51, in _runner(coro)
46 """
47 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
48 exception, the exception will be returned.
49 """
50 try:
---> 51 return await coro
52 except Exception as ex:
53 return ex
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/array.py:447, in AsyncArray._get_selection(self, indexer, prototype, out, fields)
439 out_buffer = prototype.nd_buffer.create(
440 shape=indexer.shape,
441 dtype=out_dtype,
442 order=self.order,
443 fill_value=self.metadata.fill_value,
444 )
445 if product(indexer.shape) > 0:
446 # reading chunks and decoding them
--> 447 await self.metadata.codec_pipeline.read(
448 [
449 (
450 self.store_path / self.metadata.encode_chunk_key(chunk_coords),
451 self.metadata.get_chunk_spec(chunk_coords, self.order, prototype=prototype),
452 chunk_selection,
453 out_selection,
454 )
455 for chunk_coords, chunk_selection, out_selection in indexer
456 ],
457 out_buffer,
458 drop_axes=indexer.drop_axes,
459 )
460 return out_buffer.as_ndarray_like()
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:489, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
483 async def read(
484 self,
485 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
486 out: NDBuffer,
487 drop_axes: tuple[int, ...] = (),
488 ) -> None:
--> 489 await concurrent_map(
490 [
491 (single_batch_info, out, drop_axes)
492 for single_batch_info in batched(batch_info, self.batch_size)
493 ],
494 self.read_batch,
495 config.get("async.concurrency"),
496 )
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:298, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
291 async def read_batch(
292 self,
293 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
294 out: NDBuffer,
295 drop_axes: tuple[int, ...] = (),
296 ) -> None:
297 if self.supports_partial_decode:
--> 298 chunk_array_batch = await self.decode_partial_batch(
299 [
300 (byte_getter, chunk_selection, chunk_spec)
301 for byte_getter, chunk_spec, chunk_selection, _ in batch_info
302 ]
303 )
304 for chunk_array, (_, chunk_spec, _, out_selection) in zip(
305 chunk_array_batch, batch_info, strict=False
306 ):
307 if chunk_array is not None:
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:254, in BatchedCodecPipeline.decode_partial_batch(self, batch_info)
252 assert self.supports_partial_decode
253 assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialDecodeMixin)
--> 254 return await self.array_bytes_codec.decode_partial(batch_info)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:182, in ArrayBytesCodecPartialDecodeMixin.decode_partial(self, batch_info)
162 async def decode_partial(
163 self,
164 batch_info: Iterable[tuple[ByteGetter, SelectorTuple, ArraySpec]],
165 ) -> Iterable[NDBuffer | None]:
166 """Partially decodes a batch of chunks.
167 This method determines parts of a chunk from the slice selection,
168 fetches these parts from the store (via ByteGetter) and decodes them.
(...)
180 Iterable[NDBuffer | None]
181 """
--> 182 return await concurrent_map(
183 [
184 (byte_getter, selection, chunk_spec)
185 for byte_getter, selection, chunk_spec in batch_info
186 ],
187 self._decode_partial_single,
188 config.get("async.concurrency"),
189 )
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/sharding.py:477, in ShardingCodec._decode_partial_single(self, byte_getter, selection, shard_spec)
474 shard_dict[chunk_coords] = chunk_bytes
476 # decoding chunks and writing them into the output buffer
--> 477 await self.codecs.read(
478 [
479 (
480 _ShardingByteGetter(shard_dict, chunk_coords),
481 chunk_spec,
482 chunk_selection,
483 out_selection,
484 )
485 for chunk_coords, chunk_selection, out_selection in indexer
486 ],
487 out,
488 )
489 return out
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:489, in BatchedCodecPipeline.read(self, batch_info, out, drop_axes)
483 async def read(
484 self,
485 batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]],
486 out: NDBuffer,
487 drop_axes: tuple[int, ...] = (),
488 ) -> None:
--> 489 await concurrent_map(
490 [
491 (single_batch_info, out, drop_axes)
492 for single_batch_info in batched(batch_info, self.batch_size)
493 ],
494 self.read_batch,
495 config.get("async.concurrency"),
496 )
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:320, in BatchedCodecPipeline.read_batch(self, batch_info, out, drop_axes)
311 else:
312 chunk_bytes_batch = await concurrent_map(
313 [
314 (byte_getter, array_spec.prototype)
(...)
318 config.get("async.concurrency"),
319 )
--> 320 chunk_array_batch = await self.decode_batch(
321 [
322 (chunk_bytes, chunk_spec)
323 for chunk_bytes, (_, chunk_spec, _, _) in zip(
324 chunk_bytes_batch, batch_info, strict=False
325 )
326 ],
327 )
328 for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
329 chunk_array_batch, batch_info, strict=False
330 ):
331 if chunk_array is not None:
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/pipeline.py:237, in BatchedCodecPipeline.decode_batch(self, chunk_bytes_and_specs)
232 chunk_bytes_batch = await bb_codec.decode(
233 zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
234 )
236 ab_codec, chunk_spec_batch = ab_codec_with_spec
--> 237 chunk_array_batch = await ab_codec.decode(
238 zip(chunk_bytes_batch, chunk_spec_batch, strict=False)
239 )
241 for aa_codec, chunk_spec_batch in aa_codecs_with_spec[::-1]:
242 chunk_array_batch = await aa_codec.decode(
243 zip(chunk_array_batch, chunk_spec_batch, strict=False)
244 )
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:107, in _Codec.decode(self, chunks_and_specs)
91 async def decode(
92 self,
93 chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
94 ) -> Iterable[CodecInput | None]:
95 """Decodes a batch of chunks.
96 Chunks can be None in which case they are ignored by the codec.
97
(...)
105 Iterable[CodecInput | None]
106 """
--> 107 return await batching_helper(self._decode_single, chunks_and_specs)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:392, in batching_helper(func, batch_info)
388 async def batching_helper(
389 func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
390 batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
391 ) -> list[CodecOutput | None]:
--> 392 return await concurrent_map(
393 [(chunk_array, chunk_spec) for chunk_array, chunk_spec in batch_info],
394 noop_for_none(func),
395 config.get("async.concurrency"),
396 )
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/common.py:53, in concurrent_map(items, func, limit)
49 async def concurrent_map(
50 items: list[T], func: Callable[..., Awaitable[V]], limit: int | None = None
51 ) -> list[V]:
52 if limit is None:
---> 53 return await asyncio.gather(*[func(*item) for item in items])
55 else:
56 sem = asyncio.Semaphore(limit)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/abc/codec.py:405, in noop_for_none.<locals>.wrap(chunk, chunk_spec)
403 if chunk is None:
404 return None
--> 405 return await func(chunk, chunk_spec)
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/codecs/bytes.py:89, in BytesCodec._decode_single(self, chunk_bytes, chunk_spec)
87 # ensure correct chunk shape
88 if chunk_array.shape != chunk_spec.shape:
---> 89 chunk_array = chunk_array.reshape(
90 chunk_spec.shape,
91 )
92 return chunk_array
File ~/micromamba/envs/learning-ddm/lib/python3.12/site-packages/zarr/buffer.py:375, in NDBuffer.reshape(self, newshape)
374 def reshape(self, newshape: ChunkCoords | Literal[-1]) -> Self:
--> 375 return self.__class__(self._data.reshape(newshape))
ValueError: cannot reshape array of size 2000 into shape (1,1000)
If we instead access `arr[2]`, the reshape error reports an array of size 3000. It seems that `arr[i]` reads chunks 0 through `i` (inclusive) instead of only the single requested chunk.
Additional output
No response
Metadata
Metadata
Assignees
Labels
bug — Potential issues with the zarr-python library