diff --git a/changes/3207.bugfix.rst b/changes/3207.bugfix.rst
new file mode 100644
index 0000000000..85d2c7b96b
--- /dev/null
+++ b/changes/3207.bugfix.rst
@@ -0,0 +1,16 @@
+- Fixed deadlocks and indefinite hangs when opening Zarr v3 arrays on
+  synchronous fsspec filesystems by falling back to sequential reads for
+  filesystems that are not concurrency-safe, keeping metadata retrieval robust
+  without sacrificing performance on concurrency-safe filesystems. In
+  addition, ``Store._get_many`` now retrieves objects from storage
+  concurrently; the previous implementation was sequential, awaiting each
+  ``self.get(*req)`` before proceeding, contrary to its docstring.
+- Introduced ``Store._get_many_ordered`` and ``StorePath.get_many_ordered`` to
+  retrieve multiple metadata files in a single call, reducing request
+  overhead. ``StorePath.get_many_ordered`` is used in ``get_array_metadata``
+  and ``Store._get_many_ordered`` is used in ``_read_metadata_v2``.
+- Modified ``FsspecStore._get_many`` and added ``FsspecStore._get_many_ordered``
+  to fetch concurrently only when the underlying file system is
+  concurrency-safe, avoiding deadlocks when accessing metadata on synchronous
+  file systems. Added a ``LockableFileSystem`` test helper to exercise both
+  asynchronous and synchronous behavior.
diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py
index 1fbdb3146c..b056f0f2fa 100644
--- a/src/zarr/abc/store.py
+++ b/src/zarr/abc/store.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from asyncio import gather
+from asyncio import as_completed, gather
 from dataclasses import dataclass
 from itertools import starmap
 from typing import TYPE_CHECKING, Protocol, runtime_checkable
@@ -414,8 +414,27 @@ async def _get_many(
         that objects will be retrieved in the order in which they were requested, so this method
         yields tuple[str, Buffer | None] instead of just Buffer | None
         """
-        for req in requests:
-            yield (req[0], await self.get(*req))
+
+        async def _get_with_name(
+            key: str, prototype: BufferPrototype, byte_range: ByteRequest | None
+        ) -> tuple[str, Buffer | None]:
+            value = await self.get(key, prototype, byte_range)
+            return key, value
+
+        tasks = [_get_with_name(*req) for req in requests]
+        for completed in as_completed(tasks):
+            result = await completed
+            yield result
+
+    async def _get_many_ordered(
+        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
+    ) -> tuple[Buffer | None, ...]:
+        """
+        Retrieve a collection of objects from storage in the order they were requested.
+        """
+        tasks = [self.get(*req) for req in requests]
+
+        return tuple(await gather(*tasks))
 
     async def getsize(self, key: str) -> int:
         """
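The two methods above differ deliberately in their ordering semantics: ``_get_many`` yields results as they complete, while ``_get_many_ordered`` returns them in the order they were requested. A minimal sketch of the difference, using only the standard library (none of these names are zarr code):

import asyncio

async def fake_get(key: str, delay: float) -> tuple[str, str]:
    await asyncio.sleep(delay)  # simulate I/O latency
    return key, f"{key}_data"

async def main() -> None:
    requests = [("a", 0.03), ("b", 0.01)]

    # _get_many-style: as_completed yields fastest-first, so "b" arrives before "a"
    for coro in asyncio.as_completed([fake_get(k, d) for k, d in requests]):
        print(await coro)

    # _get_many_ordered-style: gather preserves request order: "a", then "b"
    print(await asyncio.gather(*(fake_get(k, d) for k, d in requests)))

asyncio.run(main())

Completion order is why ``_get_many`` must yield ``(key, buffer)`` pairs, whereas ``_get_many_ordered`` can return a bare tuple that callers unpack positionally.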
+ """ + tasks = [self.get(*req) for req in requests] + + return tuple(await gather(*tasks)) async def getsize(self, key: str) -> int: """ diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index a0b8e9e7dd..894aad13f6 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -212,9 +212,10 @@ async def get_array_metadata( store_path: StorePath, zarr_format: ZarrFormat | None = 3 ) -> dict[str, JSON]: if zarr_format == 2: - zarray_bytes, zattrs_bytes = await gather( - (store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype), - (store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype), + zarray_bytes, zattrs_bytes = await store_path.get_many_ordered( + ZARRAY_JSON, + ZATTRS_JSON, + prototype=cpu_buffer_prototype, ) if zarray_bytes is None: raise FileNotFoundError(store_path) @@ -223,10 +224,11 @@ async def get_array_metadata( if zarr_json_bytes is None: raise FileNotFoundError(store_path) elif zarr_format is None: - zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather( - (store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype), - (store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype), - (store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype), + zarr_json_bytes, zarray_bytes, zattrs_bytes = await store_path.get_many_ordered( + ZARR_JSON, + ZARRAY_JSON, + ZATTRS_JSON, + prototype=cpu_buffer_prototype, ) if zarr_json_bytes is not None and zarray_bytes is not None: # warn and favor v3 @@ -1446,7 +1448,6 @@ async def _save_metadata(self, metadata: ArrayMetadata, ensure_parents: bool = F ).items() ] ) - await gather(*awaitables) async def _set_selection( diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py index e02d09694f..3044abb230 100644 --- a/src/zarr/core/group.py +++ b/src/zarr/core/group.py @@ -539,11 +539,11 @@ async def open( zgroup_bytes, zattrs_bytes, maybe_consolidated_metadata_bytes, - ) = await asyncio.gather( - (store_path / ZARR_JSON).get(), - (store_path / ZGROUP_JSON).get(), - (store_path / ZATTRS_JSON).get(), - (store_path / str(consolidated_key)).get(), + ) = await store_path.get_many_ordered( + ZARR_JSON, + ZGROUP_JSON, + ZATTRS_JSON, + consolidated_key, ) if zarr_json_bytes is not None and zgroup_bytes is not None: # warn and favor v3 @@ -3465,10 +3465,12 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM """ # TODO: consider first fetching array metadata, and only fetching group metadata when we don't # find an array - zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather( - store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()), - store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()), - store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()), + zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered( + [ + (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None), + (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None), + (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None), + ] ) if zattrs_bytes is None: diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index e25fa28424..8d646f3655 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -2,6 +2,7 @@ import importlib.util import json +from asyncio import gather from pathlib import Path from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias @@ -163,6 +164,35 @@ async def get( prototype = default_buffer_prototype() return await 
diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py
index e25fa28424..8d646f3655 100644
--- a/src/zarr/storage/_common.py
+++ b/src/zarr/storage/_common.py
@@ -163,6 +163,35 @@ async def get(
             prototype = default_buffer_prototype()
         return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
 
+    async def get_many_ordered(
+        self,
+        *path_components: str,
+        prototype: BufferPrototype | None = None,
+        byte_range: ByteRequest | None = None,
+    ) -> tuple[Buffer | None, ...]:
+        """
+        Read multiple bytes from the store in the order of the provided path_components.
+
+        Parameters
+        ----------
+        path_components : str
+            Components to append to the store path.
+        prototype : BufferPrototype, optional
+            The buffer prototype to use when reading the bytes.
+        byte_range : ByteRequest, optional
+            The range of bytes to read.
+
+        Returns
+        -------
+        tuple[Buffer | None, ...]
+            A tuple of buffers read from the store, in the order of the provided path_components.
+        """
+        if prototype is None:
+            prototype = default_buffer_prototype()
+
+        paths = [(self / component).path for component in path_components]
+        return await self.store._get_many_ordered([(path, prototype, byte_range) for path in paths])
+
     async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
         """
         Write bytes to the store.
diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py
index e169eededc..bcc5516a2a 100644
--- a/src/zarr/storage/_fsspec.py
+++ b/src/zarr/storage/_fsspec.py
@@ -18,7 +18,7 @@
 from zarr.storage._common import _dereference_path
 
 if TYPE_CHECKING:
-    from collections.abc import AsyncIterator, Iterable
+    from collections.abc import AsyncGenerator, AsyncIterator, Iterable
 
     from fsspec import AbstractFileSystem
     from fsspec.asyn import AsyncFileSystem
@@ -326,6 +326,30 @@ async def get(
         else:
             return value
 
+    async def _get_many(
+        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
+    ) -> AsyncGenerator[tuple[str, Buffer | None], None]:
+        if getattr(self.fs, "asynchronous", True):
+            async for key, value in super()._get_many(requests):
+                yield (key, value)
+        else:
+            # sequential fallback: sync filesystems are not concurrency-safe
+            for key, prototype, byte_range in requests:
+                value = await self.get(key, prototype, byte_range)
+                yield (key, value)
+
+    async def _get_many_ordered(
+        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
+    ) -> tuple[Buffer | None, ...]:
+        if getattr(self.fs, "asynchronous", True):
+            return await super()._get_many_ordered(requests)
+        else:
+            results = []
+            for key, prototype, byte_range in requests:
+                value = await self.get(key, prototype, byte_range)
+                results.append(value)
+            return tuple(results)
+
     async def set(
         self,
         key: str,
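The conditional dispatch above is the heart of the fix: concurrency is used only when the filesystem declares itself safe for it. A standalone sketch of the same pattern, with all names invented for illustration:

import asyncio
from collections.abc import Iterable

class Backend:
    """Stand-in for a filesystem that may or may not be concurrency-safe."""

    def __init__(self, asynchronous: bool) -> None:
        self.asynchronous = asynchronous

    async def get(self, key: str) -> str:
        await asyncio.sleep(0.01)  # simulated I/O
        return f"{key}_data"

async def get_many_ordered(backend: Backend, keys: Iterable[str]) -> tuple[str, ...]:
    if getattr(backend, "asynchronous", True):
        # concurrency-safe: fetch everything at once, results in request order
        return tuple(await asyncio.gather(*(backend.get(k) for k in keys)))
    # sequential fallback: one outstanding request at a time
    return tuple([await backend.get(k) for k in keys])

async def main() -> None:
    print(await get_many_ordered(Backend(asynchronous=True), ["a", "b"]))
    print(await get_many_ordered(Backend(asynchronous=False), ["a", "b"]))

asyncio.run(main())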
+ """ + keys = tuple(map(str, range(10))) + values = tuple(f"{k}".encode() for k in keys) + + for k, v in zip(keys, values, strict=False): + await self.set(store, k, self.buffer_cls.from_bytes(v)) + + observed_buffers = await store._get_many_ordered( + tuple( + zip( + keys, + (default_buffer_prototype(),) * len(keys), + (None,) * len(keys), + strict=False, + ) + ) + ) + observed_bytes = tuple(b.to_bytes() for b in observed_buffers if b is not None) + assert observed_bytes == values, f"Expected {values}, got {observed_bytes}" + @pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"]) @pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""]) async def test_getsize(self, store: S, key: str, data: bytes) -> None: diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 026b25f8fc..7633d64262 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -1,8 +1,11 @@ from __future__ import annotations +import asyncio import json import os import re +import warnings +from itertools import cycle from typing import TYPE_CHECKING, Any import numpy as np @@ -42,6 +45,7 @@ ] fsspec = pytest.importorskip("fsspec") +AsyncFileSystem = pytest.importorskip("fsspec.asyn").AsyncFileSystem s3fs = pytest.importorskip("s3fs") requests = pytest.importorskip("requests") moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server") @@ -440,3 +444,121 @@ async def test_with_read_only_auto_mkdir(tmp_path: Path) -> None: store_w = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False}) _ = store_w.with_read_only() + + +class LockableFileSystem(AsyncFileSystem): + """ + A mock file system that simulates asynchronous and synchronous behaviors with artificial delays. + """ + + def __init__( + self, + asynchronous: bool, + lock: bool | None = None, + delays: tuple[float, ...] 
diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py
index 026b25f8fc..7633d64262 100644
--- a/tests/test_store/test_fsspec.py
+++ b/tests/test_store/test_fsspec.py
@@ -1,8 +1,11 @@
 from __future__ import annotations
 
+import asyncio
 import json
 import os
 import re
+import warnings
+from itertools import cycle
 from typing import TYPE_CHECKING, Any
 
 import numpy as np
@@ -42,6 +45,7 @@
 ]
 
 fsspec = pytest.importorskip("fsspec")
+AsyncFileSystem = pytest.importorskip("fsspec.asyn").AsyncFileSystem
 s3fs = pytest.importorskip("s3fs")
 requests = pytest.importorskip("requests")
 moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server")
@@ -440,3 +444,117 @@ async def test_with_read_only_auto_mkdir(tmp_path: Path) -> None:
     store_w = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False})
 
     _ = store_w.with_read_only()
+
+
+class LockableFileSystem(AsyncFileSystem):
+    """
+    A mock file system that simulates asynchronous and synchronous behavior with artificial delays.
+    """
+
+    def __init__(
+        self,
+        asynchronous: bool,
+        lock: bool | None = None,
+        delays: tuple[float, ...] | None = None,
+    ) -> None:
+        if delays is None:
+            delays = (0.03, 0.01)
+        lock = lock if lock is not None else not asynchronous
+
+        self.lock = asyncio.Lock() if lock else None
+        self.delays = cycle(delays)
+        self.async_impl = True
+
+        super().__init__(asynchronous=asynchronous)
+
+    async def _check_active(self) -> None:
+        if self.lock and self.lock.locked():
+            raise RuntimeError("Concurrent requests!")
+
+    async def _cat_file(self, path, start=None, end=None) -> bytes:
+        await self._simulate_io_operation(path)
+        return self.get_data(path)
+
+    async def _await_io(self) -> None:
+        await asyncio.sleep(next(self.delays))
+
+    async def _simulate_io_operation(self, path) -> None:
+        if self.lock:
+            await self._check_active()
+            async with self.lock:
+                await self._await_io()
+        else:
+            await self._await_io()
+
+    def get_store(self, path: str) -> FsspecStore:
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", category=UserWarning)
+            return FsspecStore(fs=self, path=path)
+
+    @staticmethod
+    def get_data(key: str) -> bytes:
+        return f"{key}_data".encode()
+
+
+@pytest.mark.asyncio
+class TestLockableFSSPECFileSystem:
+    @pytest.fixture(autouse=True)
+    async def setup(self):
+        self.path = "root"
+        self.store_async = LockableFileSystem(asynchronous=True).get_store(path=self.path)
+        self.store_sync = LockableFileSystem(asynchronous=False).get_store(path=self.path)
+
+    def get_requests_and_true_results(self, path_components=("a", "b")):
+        true_results = [
+            (component, LockableFileSystem.get_data(f"{self.path}/{component}"))
+            for component in path_components
+        ]
+        requests = [(component, default_buffer_prototype(), None) for component in path_components]
+        return requests, true_results
+
+    async def test_get_many_asynchronous_fs(self):
+        requests, true_results = self.get_requests_and_true_results(("a", "b", "c"))
+
+        results = []
+        async for k, v in self.store_async._get_many(requests):
+            results.append((k, v.to_bytes() if v else None))
+
+        results_ordered = sorted(results, key=lambda x: x[0])
+        assert results_ordered == true_results
+
+    async def test_get_many_synchronous_fs(self):
+        requests, true_results = self.get_requests_and_true_results()
+
+        results = []
+        async for k, v in self.store_sync._get_many(requests):
+            results.append((k, v.to_bytes() if v else None))
+
+        # results should already be in the same order as the requests
+        assert results == true_results
+
+    async def test_get_many_ordered_synchronous_fs(self):
+        requests, true_results = self.get_requests_and_true_results()
+
+        results = await self.store_sync._get_many_ordered(requests)
+        results = [value.to_bytes() if value else None for value in results]
+
+        assert results == [value[1] for value in true_results]
+
+    async def test_get_many_ordered_asynchronous_fs(self):
+        requests, true_results = self.get_requests_and_true_results()
+
+        results = await self.store_async._get_many_ordered(requests)
+        results = [value.to_bytes() if value else None for value in results]
+
+        assert results == [value[1] for value in true_results]
+
+    async def test_asynchronous_locked_fs_raises(self):
+        store = LockableFileSystem(asynchronous=True, lock=True).get_store(path="root")
+        requests, _ = self.get_requests_and_true_results()
+
+        with pytest.raises(RuntimeError, match="Concurrent requests!"):
+            async for _, _ in store._get_many(requests):
+                pass
+
+        with pytest.raises(RuntimeError, match="Concurrent requests!"):
+            await store._get_many_ordered(requests)
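A usage sketch of the fixture above, assuming this PR is applied: in sync mode, ``FsspecStore`` falls back to one request at a time, so the lock inside ``LockableFileSystem`` is never contended.

import asyncio

from zarr.core.buffer import default_buffer_prototype

async def demo() -> None:
    # sync-mode fs: the sequential fallback path is taken
    store = LockableFileSystem(asynchronous=False).get_store(path="root")
    requests = [(key, default_buffer_prototype(), None) for key in ("a", "b")]
    buffers = await store._get_many_ordered(requests)
    print([buf.to_bytes() if buf else None for buf in buffers])

asyncio.run(demo())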