Skip to content

Commit 4f94be1

Browse files
committed
Enhanced Metadata Retrieval
- Introduced `StorePath.get_many` to retrieve multiple metadata files in a single call; the requests are issued concurrently only when the underlying file system is concurrency-safe, and one after another otherwise. `get_many` is used in `get_array_metadata`. - Introduced `FsspecStore._get_many` to retrieve multiple metadata files in a single call. `_get_many` is used in `_read_metadata_v2`. - Introduced `StorePath._is_concurrency_save` to determine whether the file system supports concurrent access. - Updated logic to use `asyncio.gather` only when the underlying file system is concurrency-safe, which improves compatibility with synchronous file systems by avoiding deadlocks when metadata is accessed concurrently.
1 parent 9d97b24 commit 4f94be1

File tree

5 files changed

+74
-19
lines changed

5 files changed

+74
-19
lines changed

changes/3207.bugfix.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
- Introduced ``StorePath.get_many`` to retrieve multiple metadata files in a single
2+
call, issuing the requests concurrently only when the underlying file system is
3+
concurrency-safe. ``get_many`` is used in ``get_array_metadata``.
4+
- Introduced ``FsspecStore._get_many`` to retrieve multiple metadata files in a
5+
single call. ``_get_many`` is used in ``_read_metadata_v2``.
6+
- Introduced ``StorePath._is_concurrency_save`` to determine if the file system
7+
supports concurrent access.
8+
- Updated logic to conditionally use ``asyncio.gather`` based on the concurrency
9+
safety of the underlying file system, enhancing compatibility with
10+
synchronous file systems by avoiding deadlocks when accessing metadata
11+
concurrently.

src/zarr/core/array.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,10 @@ async def get_array_metadata(
200200
store_path: StorePath, zarr_format: ZarrFormat | None = 3
201201
) -> dict[str, JSON]:
202202
if zarr_format == 2:
203-
zarray_bytes, zattrs_bytes = await gather(
204-
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
205-
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
203+
zarray_bytes, zattrs_bytes = await store_path.get_many(
204+
ZARRAY_JSON,
205+
ZATTRS_JSON,
206+
prototype=cpu_buffer_prototype,
206207
)
207208
if zarray_bytes is None:
208209
raise FileNotFoundError(store_path)
@@ -211,10 +212,11 @@ async def get_array_metadata(
211212
if zarr_json_bytes is None:
212213
raise FileNotFoundError(store_path)
213214
elif zarr_format is None:
214-
zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
215-
(store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype),
216-
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
217-
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
215+
zarr_json_bytes, zarray_bytes, zattrs_bytes = await store_path.get_many(
216+
ZARR_JSON,
217+
ZARRAY_JSON,
218+
ZATTRS_JSON,
219+
prototype=cpu_buffer_prototype,
218220
)
219221
if zarr_json_bytes is not None and zarray_bytes is not None:
220222
# warn and favor v3
@@ -1429,7 +1431,6 @@ async def _save_metadata(self, metadata: ArrayMetadata, ensure_parents: bool = F
14291431
).items()
14301432
]
14311433
)
1432-
14331434
await gather(*awaitables)
14341435

14351436
async def _set_selection(

src/zarr/core/group.py

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -540,11 +540,11 @@ async def open(
540540
zgroup_bytes,
541541
zattrs_bytes,
542542
maybe_consolidated_metadata_bytes,
543-
) = await asyncio.gather(
544-
(store_path / ZARR_JSON).get(),
545-
(store_path / ZGROUP_JSON).get(),
546-
(store_path / ZATTRS_JSON).get(),
547-
(store_path / str(consolidated_key)).get(),
543+
) = await store_path.get_many(
544+
ZARR_JSON,
545+
ZGROUP_JSON,
546+
ZATTRS_JSON,
547+
consolidated_key,
548548
)
549549
if zarr_json_bytes is not None and zgroup_bytes is not None:
550550
# warn and favor v3
@@ -3476,11 +3476,20 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM
34763476
"""
34773477
# TODO: consider first fetching array metadata, and only fetching group metadata when we don't
34783478
# find an array
3479-
zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
3480-
store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
3481-
store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
3482-
store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
3483-
)
3479+
requests = [
3480+
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
3481+
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
3482+
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
3483+
]
3484+
3485+
# Use the _get_many method to retrieve the data
3486+
results = {}
3487+
async for key, buffer in store._get_many(requests):
3488+
results[key] = buffer
3489+
3490+
zarray_bytes = results.get(_join_paths([path, ZARRAY_JSON]))
3491+
zgroup_bytes = results.get(_join_paths([path, ZGROUP_JSON]))
3492+
zattrs_bytes = results.get(_join_paths([path, ZATTRS_JSON]))
34843493

34853494
if zattrs_bytes is None:
34863495
zattrs = {}

src/zarr/storage/_common.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import importlib.util
44
import json
5+
from asyncio import gather
56
from pathlib import Path
67
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias
78

@@ -163,6 +164,24 @@ async def get(
163164
prototype = default_buffer_prototype()
164165
return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
165166

167+
async def get_many(
    self,
    *suffixes: str,
    prototype: BufferPrototype | None = None,
    byte_range: ByteRequest | None = None,
) -> list[Buffer | None]:
    """
    Read several keys located below this path.

    Parameters
    ----------
    *suffixes : str
        Path components. Each one is joined onto this path with ``/`` and
        read through ``get``.
    prototype : BufferPrototype, optional
        Forwarded unchanged to every ``get`` call.
    byte_range : ByteRequest, optional
        Forwarded unchanged to every ``get`` call.

    Returns
    -------
    list
        One ``get`` result per suffix, in the order the suffixes were given.
    """
    if await self._is_concurrency_save():
        # Concurrent path: hand all reads to ``gather`` at once.
        return await gather(
            *(
                (self / suffix).get(prototype=prototype, byte_range=byte_range)
                for suffix in suffixes
            )
        )

    # Sequential path: create each coroutine only right before awaiting it.
    # Building them all up front would leave the remaining coroutines
    # un-awaited (and trigger "coroutine ... was never awaited" warnings)
    # if an earlier read raises.
    return [
        await (self / suffix).get(prototype=prototype, byte_range=byte_range)
        for suffix in suffixes
    ]
184+
166185
async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
167186
"""
168187
Write bytes to the store.
@@ -263,6 +282,10 @@ def __eq__(self, other: object) -> bool:
263282
pass
264283
return False
265284

285+
async def _is_concurrency_save(self):
286+
fs = getattr(self.store, "fs", None)
287+
return getattr(fs, "asynchronous", True)
288+
266289

267290
StoreLike: TypeAlias = Store | StorePath | FSMap | Path | str | dict[str, Buffer]
268291

src/zarr/storage/_fsspec.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from zarr.storage._common import _dereference_path
1919

2020
if TYPE_CHECKING:
21-
from collections.abc import AsyncIterator, Iterable
21+
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
2222

2323
from fsspec import AbstractFileSystem
2424
from fsspec.asyn import AsyncFileSystem
@@ -326,6 +326,17 @@ async def get(
326326
else:
327327
return value
328328

329+
async def _get_many(
330+
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
331+
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
332+
if getattr(self.fs, "asynchronous", True):
333+
async for result in super()._get_many(requests=requests):
334+
yield result
335+
else:
336+
for key, prototype, byte_range in requests:
337+
value = await self.get(key, prototype, byte_range)
338+
yield (key, value)
339+
329340
async def set(
330341
self,
331342
key: str,

0 commit comments

Comments
 (0)