Skip to content

Commit 37d112e

Browse files
committed
Enhanced Metadata Retrieval
- This pull request resolves the issue of deadlocks and indefinite hangs when opening Zarr v3 arrays on synchronous fsspec filesystems, by implementing a fallback to sequential reads for non-concurrency-safe filesystems, ensuring robust metadata retrieval without sacrificing performance for safe filesystems. Furthermore `Store.get_many` was modified to retrieve objects concurrently from storage. The previous implementation was sequential, awaiting each `self.get(*req)` before proceeding, contrary to the docstring. - Introduced `Store.get_many_ordered` and `StorePath.get_many_ordered` to retrieve multiple metadata files in a single call, optimizing the retrieval process and reducing overhead. `StorePath.get_many_ordered` is used in `get_array_metadata`. `Store._get_many_ordered` is used in `_read_metadata_v2`. - Modified `FsspecStore._get_many` and `FsspecStore._get_many_ordered` to conditionally use `asyncio.gather` based on the concurrency safety of the underlying file system, enhancing compatibility with synchronous file systems by avoiding deadlocks when accessing metadata concurrently. Adding tests `LockableFileSystem` to test with async/sync behavior.
1 parent 8405073 commit 37d112e

File tree

8 files changed

+261
-21
lines changed

8 files changed

+261
-21
lines changed

changes/3207.bugfix.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
- This pull request resolves the issue of deadlocks and indefinite hangs when
2+
opening Zarr v3 arrays on synchronous fsspec filesystems, by implementing a
3+
fallback to sequential reads for non-concurrency-safe filesystems, ensuring
4+
robust metadata retrieval without sacrificing performance for safe
5+
filesystems. Furthermore ``Store.get_many`` was modified to retrieve objects
6+
concurrently from storage. The previous implementation was sequential,
7+
awaiting each ``self.get(*req)`` before proceeding, contrary to the docstring.
8+
- Introduced ``Store.get_many_ordered`` and ``StorePath.get_many_ordered`` to
9+
retrieve multiple metadata files in a single call, optimizing the retrieval
10+
process and reducing overhead. ``StorePath.get_many_ordered`` is used in
11+
``get_array_metadata``. ``Store._get_many_ordered`` is used in
12+
``_read_metadata_v2``.
13+
- Modified ``FsspecStore._get_many`` and ``FsspecStore._get_many_ordered``
14+
to conditionally use ``asyncio.gather`` based on the concurrency safety
15+
of the underlying file system, enhancing compatibility with
16+
synchronous file systems by avoiding deadlocks when accessing metadata
17+
concurrently. Adding tests ``LockableFileSystem`` to test with
18+
async/sync behavior.

src/zarr/abc/store.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
from abc import ABC, abstractmethod
4-
from asyncio import gather
4+
from asyncio import as_completed, gather
55
from dataclasses import dataclass
66
from itertools import starmap
77
from typing import TYPE_CHECKING, Protocol, runtime_checkable
@@ -414,8 +414,27 @@ async def _get_many(
414414
that objects will be retrieved in the order in which they were requested, so this method
415415
yields tuple[str, Buffer | None] instead of just Buffer | None
416416
"""
417-
for req in requests:
418-
yield (req[0], await self.get(*req))
417+
418+
async def _get_with_name(
419+
key: str, prototype: BufferPrototype, byte_range: ByteRequest | None
420+
) -> tuple[str, Buffer | None]:
421+
value = await self.get(key, prototype, byte_range)
422+
return key, value
423+
424+
tasks = [_get_with_name(*req) for req in requests]
425+
for completed in as_completed(tasks):
426+
task = await completed
427+
yield task
428+
429+
async def _get_many_ordered(
430+
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
431+
) -> tuple[Buffer | None, ...]:
432+
"""
433+
Retrieve a collection of objects from storage in the order they were requested.
434+
"""
435+
tasks = [self.get(*req) for req in requests]
436+
437+
return tuple(await gather(*tasks))
419438

420439
async def getsize(self, key: str) -> int:
421440
"""

src/zarr/core/array.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,10 @@ async def get_array_metadata(
212212
store_path: StorePath, zarr_format: ZarrFormat | None = 3
213213
) -> dict[str, JSON]:
214214
if zarr_format == 2:
215-
zarray_bytes, zattrs_bytes = await gather(
216-
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
217-
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
215+
zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
216+
ZARRAY_JSON,
217+
ZATTRS_JSON,
218+
prototype=cpu_buffer_prototype,
218219
)
219220
if zarray_bytes is None:
220221
raise FileNotFoundError(store_path)
@@ -223,10 +224,11 @@ async def get_array_metadata(
223224
if zarr_json_bytes is None:
224225
raise FileNotFoundError(store_path)
225226
elif zarr_format is None:
226-
zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
227-
(store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype),
228-
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
229-
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
227+
zarr_json_bytes, zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
228+
ZARR_JSON,
229+
ZARRAY_JSON,
230+
ZATTRS_JSON,
231+
prototype=cpu_buffer_prototype,
230232
)
231233
if zarr_json_bytes is not None and zarray_bytes is not None:
232234
# warn and favor v3
@@ -1446,7 +1448,6 @@ async def _save_metadata(self, metadata: ArrayMetadata, ensure_parents: bool = F
14461448
).items()
14471449
]
14481450
)
1449-
14501451
await gather(*awaitables)
14511452

14521453
async def _set_selection(

src/zarr/core/group.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -539,11 +539,11 @@ async def open(
539539
zgroup_bytes,
540540
zattrs_bytes,
541541
maybe_consolidated_metadata_bytes,
542-
) = await asyncio.gather(
543-
(store_path / ZARR_JSON).get(),
544-
(store_path / ZGROUP_JSON).get(),
545-
(store_path / ZATTRS_JSON).get(),
546-
(store_path / str(consolidated_key)).get(),
542+
) = await store_path.get_many_ordered(
543+
ZARR_JSON,
544+
ZGROUP_JSON,
545+
ZATTRS_JSON,
546+
consolidated_key,
547547
)
548548
if zarr_json_bytes is not None and zgroup_bytes is not None:
549549
# warn and favor v3
@@ -3465,10 +3465,12 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM
34653465
"""
34663466
# TODO: consider first fetching array metadata, and only fetching group metadata when we don't
34673467
# find an array
3468-
zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
3469-
store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
3470-
store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
3471-
store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
3468+
zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered(
3469+
[
3470+
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
3471+
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
3472+
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
3473+
]
34723474
)
34733475

34743476
if zattrs_bytes is None:

src/zarr/storage/_common.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import importlib.util
44
import json
5+
from asyncio import gather
56
from pathlib import Path
67
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias
78

@@ -163,6 +164,35 @@ async def get(
163164
prototype = default_buffer_prototype()
164165
return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
165166

167+
async def get_many_ordered(
168+
self,
169+
*path_components: str,
170+
prototype: BufferPrototype | None = None,
171+
byte_range: ByteRequest | None = None,
172+
) -> tuple[Buffer | None, ...]:
173+
"""
174+
Read multiple bytes from the store in order of the provided path_components.
175+
176+
Parameters
177+
----------
178+
path_components : str
179+
Components to append to the store path.
180+
prototype : BufferPrototype, optional
181+
The buffer prototype to use when reading the bytes.
182+
byte_range : ByteRequest, optional
183+
The range of bytes to read.
184+
185+
Returns
186+
-------
187+
tuple[Buffer | None, ...]
188+
A tuple of buffers read from the store, in the order of the provided path_components.
189+
"""
190+
if prototype is None:
191+
prototype = default_buffer_prototype()
192+
193+
tasks = [(self / component).path for component in path_components]
194+
return await self.store._get_many_ordered([(task, prototype, byte_range) for task in tasks])
195+
166196
async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
167197
"""
168198
Write bytes to the store.

src/zarr/storage/_fsspec.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import json
45
import warnings
56
from contextlib import suppress
@@ -18,7 +19,7 @@
1819
from zarr.storage._common import _dereference_path
1920

2021
if TYPE_CHECKING:
21-
from collections.abc import AsyncIterator, Iterable
22+
from collections.abc import AsyncGenerator, AsyncIterator, Iterable
2223

2324
from fsspec import AbstractFileSystem
2425
from fsspec.asyn import AsyncFileSystem
@@ -326,6 +327,29 @@ async def get(
326327
else:
327328
return value
328329

330+
async def _get_many(
331+
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
332+
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
333+
if getattr(self.fs, "asynchronous", True):
334+
async for key, value in super()._get_many(requests):
335+
yield (key, value)
336+
else:
337+
for key, prototype, byte_range in requests:
338+
value = await self.get(key, prototype, byte_range)
339+
yield (key, value)
340+
341+
async def _get_many_ordered(
342+
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
343+
) -> tuple[Buffer | None, ...]:
344+
if getattr(self.fs, "asynchronous", True):
345+
return await super()._get_many_ordered(requests)
346+
else:
347+
results = []
348+
for key, prototype, byte_range in requests:
349+
value = await self.get(key, prototype, byte_range)
350+
results.append(value)
351+
return tuple(results)
352+
329353
async def set(
330354
self,
331355
key: str,

src/zarr/testing/store.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,30 @@ async def test_get_many(self, store: S) -> None:
265265
expected_kvs = sorted(((k, b) for k, b in zip(keys, values, strict=False)))
266266
assert observed_kvs == expected_kvs
267267

268+
async def test_get_many_ordered(self, store: S) -> None:
269+
"""
270+
Ensure that multiple keys can be retrieved at once with the _get_many method,
271+
preserving the order of keys.
272+
"""
273+
keys = tuple(map(str, range(10)))
274+
values = tuple(f"{k}".encode() for k in keys)
275+
276+
for k, v in zip(keys, values, strict=False):
277+
await self.set(store, k, self.buffer_cls.from_bytes(v))
278+
279+
observed_buffers = await store._get_many_ordered(
280+
tuple(
281+
zip(
282+
keys,
283+
(default_buffer_prototype(),) * len(keys),
284+
(None,) * len(keys),
285+
strict=False,
286+
)
287+
)
288+
)
289+
observed_bytes = tuple(b.to_bytes() for b in observed_buffers if b is not None)
290+
assert observed_bytes == values, f"Expected {values}, got {observed_bytes}"
291+
268292
@pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
269293
@pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
270294
async def test_getsize(self, store: S, key: str, data: bytes) -> None:

tests/test_store/test_fsspec.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
from __future__ import annotations
22

3+
import asyncio
34
import json
45
import os
56
import re
7+
import warnings
8+
from itertools import cycle
69
from typing import TYPE_CHECKING, Any
710

811
import numpy as np
@@ -42,6 +45,7 @@
4245
]
4346

4447
fsspec = pytest.importorskip("fsspec")
48+
AsyncFileSystem = pytest.importorskip("fsspec.asyn").AsyncFileSystem
4549
s3fs = pytest.importorskip("s3fs")
4650
requests = pytest.importorskip("requests")
4751
moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server")
@@ -440,3 +444,121 @@ async def test_with_read_only_auto_mkdir(tmp_path: Path) -> None:
440444

441445
store_w = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False})
442446
_ = store_w.with_read_only()
447+
448+
449+
class LockableFileSystem(AsyncFileSystem):
450+
"""
451+
A mock file system that simulates asynchronous and synchronous behaviors with artificial delays.
452+
"""
453+
454+
def __init__(
455+
self,
456+
asynchronous: bool,
457+
lock: bool | None = None,
458+
delays: tuple[float, ...] | None = None,
459+
) -> None:
460+
if delays is None:
461+
delays = (
462+
0.03,
463+
0.01,
464+
)
465+
lock = lock if lock is not None else not asynchronous
466+
467+
# self.asynchronous = asynchronous
468+
self.lock = asyncio.Lock() if lock else None
469+
self.delays = cycle(delays)
470+
self.async_impl = True
471+
472+
super().__init__(asynchronous=asynchronous)
473+
474+
async def _check_active(self) -> None:
475+
if self.lock and self.lock.locked():
476+
raise RuntimeError("Concurrent requests!")
477+
478+
async def _cat_file(self, path, start=None, end=None) -> bytes:
479+
await self._simulate_io_operation(path)
480+
return self.get_data(path)
481+
482+
async def _await_io(self) -> None:
483+
await asyncio.sleep(next(self.delays))
484+
485+
async def _simulate_io_operation(self, path) -> None:
486+
if self.lock:
487+
await self._check_active()
488+
async with self.lock:
489+
await self._await_io()
490+
else:
491+
await self._await_io()
492+
493+
def get_store(self, path: str) -> FsspecStore:
494+
with warnings.catch_warnings():
495+
warnings.simplefilter("ignore", category=UserWarning)
496+
return FsspecStore(fs=self, path=path)
497+
498+
@staticmethod
499+
def get_data(key: str) -> bytes:
500+
return f"{key}_data".encode()
501+
502+
503+
@pytest.mark.asyncio
504+
class TestLockableFSSPECFileSystem:
505+
@pytest.fixture(autouse=True)
506+
async def setup(self):
507+
self.path = "root"
508+
self.store_async = LockableFileSystem(asynchronous=True).get_store(path=self.path)
509+
self.store_sync = LockableFileSystem(asynchronous=False).get_store(path=self.path)
510+
511+
def get_requests_and_true_results(self, path_components=("a", "b")):
512+
true_results = [
513+
(component, LockableFileSystem.get_data(os.path.join(self.path, component)))
514+
for component in path_components
515+
]
516+
requests = [(component, default_buffer_prototype(), None) for component in path_components]
517+
return requests, true_results
518+
519+
async def test_get_many_asynchronous_fs(self):
520+
requests, true_results = self.get_requests_and_true_results(("a", "b", "c"))
521+
522+
results = []
523+
async for k, v in self.store_async._get_many(requests):
524+
results.append((k, v.to_bytes() if v else None))
525+
526+
results_ordered = sorted(results, key=lambda x: x[0])
527+
assert results_ordered == true_results
528+
529+
async def test_get_many_synchronous_fs(self):
530+
requests, true_results = self.get_requests_and_true_results()
531+
532+
results = []
533+
async for k, v in self.store_sync._get_many(requests):
534+
results.append((k, v.to_bytes() if v else None))
535+
# Results should already be in the same order as requests
536+
537+
assert results == true_results
538+
539+
async def test_get_many_ordered_synchronous_fs(self):
540+
requests, true_results = self.get_requests_and_true_results()
541+
542+
results = await self.store_sync._get_many_ordered(requests)
543+
results = [value.to_bytes() if value else None for value in results]
544+
545+
assert results == [value[1] for value in true_results]
546+
547+
async def test_get_many_ordered_asynchronous_fs(self):
548+
requests, true_results = self.get_requests_and_true_results()
549+
550+
results = await self.store_async._get_many_ordered(requests)
551+
results = [value.to_bytes() if value else None for value in results]
552+
553+
assert results == [value[1] for value in true_results]
554+
555+
async def test_asynchronous_locked_fs_raises(self):
556+
store = LockableFileSystem(asynchronous=True, lock=True).get_store(path="root")
557+
requests, _ = self.get_requests_and_true_results()
558+
559+
with pytest.raises(RuntimeError, match="Concurrent requests!"):
560+
async for _, _ in store._get_many(requests):
561+
pass
562+
563+
with pytest.raises(RuntimeError, match="Concurrent requests!"):
564+
await store._get_many_ordered(requests)

0 commit comments

Comments
 (0)