Skip to content

Commit 7f72217

Browse files
committed
Enhanced Metadata Retrieval
- Introduced `Store.get_many_ordered` and `StorePath.get_many_ordered` to retrieve multiple metadata files in a single call, optimizing the retrieval process and reducing overhead. `get_many_ordered` is used in `get_array_metadata`. - Modified `FsspecStore._get_many` to conditionally use `asyncio.gather` based on the concurrency safety of the underlying file system, enhancing compatibility with synchronous file systems by avoiding deadlocks when accessing metadata concurrently. - Modified `_read_metadata_v2` to use `Store._get_many`
1 parent ea4d7e9 commit 7f72217

File tree

7 files changed

+108
-19
lines changed

7 files changed

+108
-19
lines changed

changes/3207.bugfix.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
- Introduced ``Store.get_many_ordered`` and ``StorePath.get_many_ordered`` to
2+
retrieve multiple metadata files in a single call, optimizing the retrieval
3+
process and reducing overhead. ``get_many_ordered`` is used in
4+
``get_array_metadata``.
5+
- Modified ``FsspecStore._get_many`` to conditionally use ``asyncio.gather``
6+
based on the concurrency safety of the underlying file system, enhancing
7+
compatibility with synchronous file systems by avoiding deadlocks when
8+
accessing metadata concurrently.
9+
- Modified ``_read_metadata_v2`` to use ``Store._get_many``

src/zarr/abc/store.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from zarr.core.config import config
1212

1313
if TYPE_CHECKING:
14-
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
14+
from collections.abc import AsyncGenerator, AsyncIterator, Iterable, Sequence
1515
from types import TracebackType
1616
from typing import Any, Self, TypeAlias
1717

@@ -417,6 +417,18 @@ async def _get_many(
417417
for req in requests:
418418
yield (req[0], await self.get(*req))
419419

420+
async def _get_many_ordered(
421+
self, requests: Sequence[tuple[str, BufferPrototype, ByteRequest | None]]
422+
) -> tuple[Buffer | None, ...]:
423+
"""
424+
Retrieve a collection of objects from storage in the order they were requested.
425+
"""
426+
key_to_index = {req[0]: i for i, req in enumerate(requests)}
427+
results: list[Buffer | None] = [None] * len(requests)
428+
async for key, value in self._get_many(requests):
429+
results[key_to_index[key]] = value
430+
return tuple(results)
431+
420432
async def getsize(self, key: str) -> int:
421433
"""
422434
Return the size, in bytes, of a value in a Store.

src/zarr/core/array.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,10 @@ async def get_array_metadata(
205205
store_path: StorePath, zarr_format: ZarrFormat | None = 3
206206
) -> dict[str, JSON]:
207207
if zarr_format == 2:
208-
zarray_bytes, zattrs_bytes = await gather(
209-
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
210-
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
208+
zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
209+
ZARRAY_JSON,
210+
ZATTRS_JSON,
211+
prototype=cpu_buffer_prototype,
211212
)
212213
if zarray_bytes is None:
213214
raise FileNotFoundError(store_path)
@@ -216,10 +217,11 @@ async def get_array_metadata(
216217
if zarr_json_bytes is None:
217218
raise FileNotFoundError(store_path)
218219
elif zarr_format is None:
219-
zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
220-
(store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype),
221-
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
222-
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
220+
zarr_json_bytes, zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
221+
ZARR_JSON,
222+
ZARRAY_JSON,
223+
ZATTRS_JSON,
224+
prototype=cpu_buffer_prototype,
223225
)
224226
if zarr_json_bytes is not None and zarray_bytes is not None:
225227
# warn and favor v3
@@ -1436,7 +1438,6 @@ async def _save_metadata(self, metadata: ArrayMetadata, ensure_parents: bool = F
14361438
).items()
14371439
]
14381440
)
1439-
14401441
await gather(*awaitables)
14411442

14421443
async def _set_selection(

src/zarr/core/group.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -540,11 +540,11 @@ async def open(
540540
zgroup_bytes,
541541
zattrs_bytes,
542542
maybe_consolidated_metadata_bytes,
543-
) = await asyncio.gather(
544-
(store_path / ZARR_JSON).get(),
545-
(store_path / ZGROUP_JSON).get(),
546-
(store_path / ZATTRS_JSON).get(),
547-
(store_path / str(consolidated_key)).get(),
543+
) = await store_path.get_many_ordered(
544+
ZARR_JSON,
545+
ZGROUP_JSON,
546+
ZATTRS_JSON,
547+
consolidated_key,
548548
)
549549
if zarr_json_bytes is not None and zgroup_bytes is not None:
550550
# warn and favor v3
@@ -3476,10 +3476,12 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM
34763476
"""
34773477
# TODO: consider first fetching array metadata, and only fetching group metadata when we don't
34783478
# find an array
3479-
zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
3480-
store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
3481-
store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
3482-
store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
3479+
zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered(
3480+
[
3481+
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
3482+
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
3483+
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
3484+
]
34833485
)
34843486

34853487
if zattrs_bytes is None:

src/zarr/storage/_common.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import importlib.util
44
import json
5+
from asyncio import gather
56
from pathlib import Path
67
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias
78

@@ -163,6 +164,35 @@ async def get(
163164
prototype = default_buffer_prototype()
164165
return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
165166

167+
async def get_many_ordered(
168+
self,
169+
*path_components: str,
170+
prototype: BufferPrototype | None = None,
171+
byte_range: ByteRequest | None = None,
172+
) -> tuple[Buffer | None, ...]:
173+
"""
174+
Read multiple bytes from the store in order of the provided path_components.
175+
176+
Parameters
177+
----------
178+
path_components : str
179+
Components to append to the store path.
180+
prototype : BufferPrototype, optional
181+
The buffer prototype to use when reading the bytes.
182+
byte_range : ByteRequest, optional
183+
The range of bytes to read.
184+
185+
Returns
186+
-------
187+
tuple[Buffer | None, ...]
188+
A tuple of buffers read from the store, in the order of the provided path_components.
189+
"""
190+
if prototype is None:
191+
prototype = default_buffer_prototype()
192+
193+
tasks = [(self / component).path for component in path_components]
194+
return await self.store._get_many_ordered([(task, prototype, byte_range) for task in tasks])
195+
166196
async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
167197
"""
168198
Write bytes to the store.

src/zarr/storage/_fsspec.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from zarr.storage._common import _dereference_path
1919

2020
if TYPE_CHECKING:
21-
from collections.abc import AsyncIterator, Iterable
21+
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
2222

2323
from fsspec import AbstractFileSystem
2424
from fsspec.asyn import AsyncFileSystem
@@ -326,6 +326,17 @@ async def get(
326326
else:
327327
return value
328328

329+
async def _get_many(
330+
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
331+
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
332+
if getattr(self.fs, "asynchronous", True):
333+
async for result in super()._get_many(requests=requests):
334+
yield result
335+
else:
336+
for key, prototype, byte_range in requests:
337+
value = await self.get(key, prototype, byte_range)
338+
yield (key, value)
339+
329340
async def set(
330341
self,
331342
key: str,

src/zarr/testing/store.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,30 @@ async def test_get_many(self, store: S) -> None:
265265
expected_kvs = sorted(((k, b) for k, b in zip(keys, values, strict=False)))
266266
assert observed_kvs == expected_kvs
267267

268+
async def test_get_many_ordered(self, store: S) -> None:
269+
"""
270+
Ensure that multiple keys can be retrieved at once with the _get_many method,
271+
preserving the order of keys.
272+
"""
273+
keys = tuple(map(str, range(10)))
274+
values = tuple(f"{k}".encode() for k in keys)
275+
276+
for k, v in zip(keys, values, strict=False):
277+
await self.set(store, k, self.buffer_cls.from_bytes(v))
278+
279+
observed_buffers = await store._get_many_ordered(
280+
tuple(
281+
zip(
282+
keys,
283+
(default_buffer_prototype(),) * len(keys),
284+
(None,) * len(keys),
285+
strict=False,
286+
)
287+
)
288+
)
289+
observed_bytes = tuple(b.to_bytes() for b in observed_buffers if b is not None)
290+
assert observed_bytes == values, f"Expected {values}, got {observed_bytes}"
291+
268292
@pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
269293
@pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
270294
async def test_getsize(self, store: S, key: str, data: bytes) -> None:

0 commit comments

Comments
 (0)