Preventing Deadlocks When Reading Metadata Concurrently via asyncio.gather #3207

Open: wants to merge 3 commits into base: main
18 changes: 18 additions & 0 deletions changes/3207.bugfix.rst
@@ -0,0 +1,18 @@
- This pull request resolves deadlocks and indefinite hangs when opening Zarr
v3 arrays on synchronous fsspec filesystems by falling back to sequential
reads for non-concurrency-safe filesystems, ensuring robust metadata
retrieval without sacrificing performance for concurrency-safe filesystems.
In addition, ``Store._get_many`` was modified to retrieve objects from
storage concurrently. The previous implementation was sequential, awaiting
each ``self.get(*req)`` before proceeding, contrary to its docstring.
- Introduced ``Store._get_many_ordered`` and ``StorePath.get_many_ordered`` to
retrieve multiple metadata files in a single call, reducing overhead.
``StorePath.get_many_ordered`` is used in ``get_array_metadata``;
``Store._get_many_ordered`` is used in ``_read_metadata_v2``.
- Modified ``FsspecStore._get_many`` and ``FsspecStore._get_many_ordered``
to conditionally use ``asyncio.gather`` based on the concurrency safety
of the underlying file system, avoiding deadlocks when metadata is read
concurrently on synchronous file systems (a minimal sketch of this fallback
pattern follows this changelog entry). Added a ``LockableFileSystem`` test
helper to exercise both async and sync behavior.
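
For orientation, here is a minimal sketch of the fallback pattern described above. The _get_all helper, the get callable and the concurrency_safe flag are illustrative stand-ins rather than part of the actual store API; the real implementation is in the FsspecStore diff further down.

import asyncio
from collections.abc import Awaitable, Callable, Iterable


async def _get_all(
    get: Callable[[str], Awaitable[bytes | None]],
    keys: Iterable[str],
    concurrency_safe: bool,
) -> list[bytes | None]:
    """Fetch every key, concurrently only when the backend allows it."""
    if concurrency_safe:
        # concurrency-safe backend: schedule all requests at once;
        # asyncio.gather returns results in the order of its arguments
        return list(await asyncio.gather(*(get(k) for k in keys)))
    # non-concurrency-safe backend: fall back to one request at a time
    return [await get(k) for k in keys]
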
25 changes: 22 additions & 3 deletions src/zarr/abc/store.py
@@ -1,7 +1,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from asyncio import gather
from asyncio import as_completed, gather
from dataclasses import dataclass
from itertools import starmap
from typing import TYPE_CHECKING, Protocol, runtime_checkable
@@ -414,8 +414,27 @@ async def _get_many(
that objects will be retrieved in the order in which they were requested, so this method
yields tuple[str, Buffer | None] instead of just Buffer | None
"""
for req in requests:
yield (req[0], await self.get(*req))

async def _get_with_name(
key: str, prototype: BufferPrototype, byte_range: ByteRequest | None
) -> tuple[str, Buffer | None]:
value = await self.get(key, prototype, byte_range)
return key, value

tasks = [_get_with_name(*req) for req in requests]
for completed in as_completed(tasks):
task = await completed
yield task
Comment on lines +418 to +427
Contributor:
What is the advantage of this new implementation? The previous implementation was extremely simple, which I think is good for an ABC.

Author (@dgegen), Jul 17, 2025:
The claim in the docstring is incorrect given the previous implementation.

This loop is sequential: it awaits each self.get(*req) and yields it before moving on to the next. Each request is handled one at a time, in the exact order provided. Therefore, results are always yielded in the same order as the input requests.

It is thus not fully concurrent, which would be desirable in an I/O-limited system, and, at least as I understand it, rather defeats the purpose of having an asynchronous _get_many method yield results in the first place. If we stick to the input order anyway, we might as well await all results and simply replace the implementation of _get_many with that of _get_many_ordered, making it faster and arguably easier to use in the asynchronous case. If we want the extra flexibility of not awaiting everything at once while still issuing all requests at the same time, the new implementation is the right one.
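
To make the contrast concrete, here is a stripped-down sketch of the two behaviours under discussion; the function names and the plain bytes payloads are illustrative, not part of the Store API.

import asyncio
from collections.abc import AsyncIterator, Coroutine
from typing import Any


async def yield_sequentially(gets: list[Coroutine[Any, Any, bytes]]) -> AsyncIterator[bytes]:
    # previous default: one request in flight at a time, results in input order
    for coro in gets:
        yield await coro


async def yield_as_completed(gets: list[Coroutine[Any, Any, bytes]]) -> AsyncIterator[bytes]:
    # proposed default: schedule all requests up front and
    # yield results in completion order, not input order
    for finished in asyncio.as_completed(gets):
        yield await finished
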

Contributor:
The point of the default implementation is to be the simplest possible implementation of _get_many that any child class can safely support, given an implementation of get. But child classes should also be able to override this with more efficient methods where applicable, and in those cases the order of results is not guaranteed; hence the type annotation in the original method.

Author (@dgegen), Jul 17, 2025:
I find this somewhat confusing, as I would have expected the standard implementation to be fully asynchronous. However, if the goal is to maximize simplicity, then having an asynchronous implementation that runs synchronously might be the way to go.

That being said, if we revert this to the original, we would only also have to remove FsspecStore._get_many from my current solution. Unless, that is, you think we should not have a _get_many_ordered method at all, and should instead use _get_many and always reorder the values locally, since other implementations may return them in a different order.


async def _get_many_ordered(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
) -> tuple[Buffer | None, ...]:
"""
Retrieve a collection of objects from storage in the order they were requested.
"""
tasks = [self.get(*req) for req in requests]
Comment on lines +429 to +435
Contributor:
I'm not sure I see the use for this method. If store users want fetches to happen in a specific order, then users can call get in a loop. If users only want the results of their fetches to be ordered, they can re-order the results after receiving them.

Author (@dgegen), Jul 17, 2025:
I introduced the _get_many_ordered method because ordered retrieval is the only use case in the current implementation. Not having it leads to repetitive boilerplate, because we would always find ourselves awaiting all fetches and then sorting the results afterwards. Using _get_many_ordered encapsulates this logic, resulting in code that is less verbose, less repetitive, and easier to read.

Example

Compare for example

zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered(
    [
        (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
        (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
        (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
    ]
)

with

ordered_keys = [
    (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
    (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
    (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
]

retrieved_objects = {}
async for key, value in store._get_many(ordered_keys):
    retrieved_objects[key] = value

zarray_bytes, zgroup_bytes, zattrs_bytes = tuple(retrieved_objects.get(key[0]) for key in ordered_keys)

The first block is much easier to read.

At the same time, we cannot use the original,

zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
    store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
    store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
    store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
)

as this would lead to deadlocks in synchronous file systems, and we of course also don't want to call .get sequentially because it reduces performance in asynchronous systems.

Contributor:
> ...don't want to call .get sequentially because it reduces performance in asynchronous systems.

get is async: calling it returns a coroutine. These coroutines can be scheduled together with asyncio.gather, which will preserve order. So calling get sequentially is not a performance problem.
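
For reference, here is a tiny self-contained demonstration of that ordering guarantee; the fetch helper and the delays are made up purely for illustration.

import asyncio


async def fetch(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value


async def main() -> None:
    # "a" finishes last, but gather still returns results in call order
    results = await asyncio.gather(fetch("a", 0.02), fetch("b", 0.01))
    assert results == ["a", "b"]


asyncio.run(main())
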

Author (@dgegen), Jul 17, 2025:
Of course, but it is a problem if we await them sequentially, e.g. if we instead used

zarray_bytes = await store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()) 
zgroup_bytes = await store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype())
zattrs_bytes = await store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype())

This would ensure that synchronous file systems don't run into deadlocks, but it would not be a good alternative for asynchronous systems.

Contributor (@d-v-b), Jul 17, 2025:
I'm confused: aren't you doing all this exactly because the fsspec SFTP backend is not async? So then sequential awaiting (inside the logic of _get_many) is exactly what we expect to happen, no?

Author (@dgegen), Jul 17, 2025:
Yes, but in the general case the implementation should be asynchronous. It must, however, be implemented in such a way that we can make it synchronous by overriding store methods in the FsspecStore, turning the generally asynchronous path into a synchronous one when store.fs.asynchronous == False.

Note also that, originally, we were not using asyncio.gather to wrap multiple get calls, so this was not possible. And a sequential solution in the general case is not desirable in an I/O-limited system.


return tuple(await gather(*tasks))

async def getsize(self, key: str) -> int:
"""
17 changes: 9 additions & 8 deletions src/zarr/core/array.py
@@ -212,9 +212,10 @@ async def get_array_metadata(
store_path: StorePath, zarr_format: ZarrFormat | None = 3
) -> dict[str, JSON]:
if zarr_format == 2:
zarray_bytes, zattrs_bytes = await gather(
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
ZARRAY_JSON,
ZATTRS_JSON,
prototype=cpu_buffer_prototype,
)
if zarray_bytes is None:
raise FileNotFoundError(store_path)
@@ -223,10 +224,11 @@ async def get_array_metadata(
if zarr_json_bytes is None:
raise FileNotFoundError(store_path)
elif zarr_format is None:
zarr_json_bytes, zarray_bytes, zattrs_bytes = await gather(
(store_path / ZARR_JSON).get(prototype=cpu_buffer_prototype),
(store_path / ZARRAY_JSON).get(prototype=cpu_buffer_prototype),
(store_path / ZATTRS_JSON).get(prototype=cpu_buffer_prototype),
zarr_json_bytes, zarray_bytes, zattrs_bytes = await store_path.get_many_ordered(
ZARR_JSON,
ZARRAY_JSON,
ZATTRS_JSON,
prototype=cpu_buffer_prototype,
)
if zarr_json_bytes is not None and zarray_bytes is not None:
# warn and favor v3
@@ -1446,7 +1448,6 @@ async def _save_metadata(self, metadata: ArrayMetadata, ensure_parents: bool = F
).items()
]
)

await gather(*awaitables)

async def _set_selection(
20 changes: 11 additions & 9 deletions src/zarr/core/group.py
@@ -539,11 +539,11 @@ async def open(
zgroup_bytes,
zattrs_bytes,
maybe_consolidated_metadata_bytes,
) = await asyncio.gather(
(store_path / ZARR_JSON).get(),
(store_path / ZGROUP_JSON).get(),
(store_path / ZATTRS_JSON).get(),
(store_path / str(consolidated_key)).get(),
) = await store_path.get_many_ordered(
ZARR_JSON,
ZGROUP_JSON,
ZATTRS_JSON,
consolidated_key,
)
if zarr_json_bytes is not None and zgroup_bytes is not None:
# warn and favor v3
@@ -3465,10 +3465,12 @@ async def _read_metadata_v2(store: Store, path: str) -> ArrayV2Metadata | GroupM
"""
# TODO: consider first fetching array metadata, and only fetching group metadata when we don't
# find an array
zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered(
[
(_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
(_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
]
)

if zattrs_bytes is None:
30 changes: 30 additions & 0 deletions src/zarr/storage/_common.py
@@ -2,6 +2,7 @@

import importlib.util
import json
from asyncio import gather
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias

@@ -163,6 +164,35 @@ async def get(
prototype = default_buffer_prototype()
return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)

async def get_many_ordered(
self,
*path_components: str,
prototype: BufferPrototype | None = None,
byte_range: ByteRequest | None = None,
) -> tuple[Buffer | None, ...]:
"""
Read multiple bytes from the store in order of the provided path_components.

Parameters
----------
path_components : str
Components to append to the store path.
prototype : BufferPrototype, optional
The buffer prototype to use when reading the bytes.
byte_range : ByteRequest, optional
The range of bytes to read.

Returns
-------
tuple[Buffer | None, ...]
A tuple of buffers read from the store, in the order of the provided path_components.
"""
if prototype is None:
prototype = default_buffer_prototype()

tasks = [(self / component).path for component in path_components]
return await self.store._get_many_ordered([(task, prototype, byte_range) for task in tasks])

async def set(self, value: Buffer, byte_range: ByteRequest | None = None) -> None:
"""
Write bytes to the store.
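
To show how the new StorePath.get_many_ordered helper is meant to be called, here is a usage sketch. The read_v2_metadata_documents wrapper and the literal key names are only an example mirroring the get_array_metadata change later in this diff, and the imports are assumed to resolve as in the test modules of this PR.

from zarr.core.buffer import default_buffer_prototype
from zarr.storage import StorePath


async def read_v2_metadata_documents(store_path: StorePath) -> tuple[bytes | None, ...]:
    # zarr v2 metadata document names, spelled out here for clarity
    zarray_key, zgroup_key, zattrs_key = ".zarray", ".zgroup", ".zattrs"
    buffers = await store_path.get_many_ordered(
        zarray_key, zgroup_key, zattrs_key, prototype=default_buffer_prototype()
    )
    # buffers come back in request order; missing documents are returned as None
    return tuple(buf.to_bytes() if buf is not None else None for buf in buffers)
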
26 changes: 25 additions & 1 deletion src/zarr/storage/_fsspec.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import json
import warnings
from contextlib import suppress
@@ -18,7 +19,7 @@
from zarr.storage._common import _dereference_path

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from collections.abc import AsyncGenerator, AsyncIterator, Iterable

from fsspec import AbstractFileSystem
from fsspec.asyn import AsyncFileSystem
@@ -326,6 +327,29 @@ async def get(
else:
return value

async def _get_many(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
) -> AsyncGenerator[tuple[str, Buffer | None], None]:
if getattr(self.fs, "asynchronous", True):
async for key, value in super()._get_many(requests):
yield (key, value)
else:
for key, prototype, byte_range in requests:
value = await self.get(key, prototype, byte_range)
yield (key, value)

async def _get_many_ordered(
self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
) -> tuple[Buffer | None, ...]:
if getattr(self.fs, "asynchronous", True):
return await super()._get_many_ordered(requests)
else:
results = []
for key, prototype, byte_range in requests:
value = await self.get(key, prototype, byte_range)
results.append(value)
return tuple(results)

async def set(
self,
key: str,
24 changes: 24 additions & 0 deletions src/zarr/testing/store.py
@@ -265,6 +265,30 @@ async def test_get_many(self, store: S) -> None:
expected_kvs = sorted(((k, b) for k, b in zip(keys, values, strict=False)))
assert observed_kvs == expected_kvs

async def test_get_many_ordered(self, store: S) -> None:
"""
Ensure that multiple keys can be retrieved at once with the _get_many_ordered
method, preserving the order of the requested keys.
"""
keys = tuple(map(str, range(10)))
values = tuple(f"{k}".encode() for k in keys)

for k, v in zip(keys, values, strict=False):
await self.set(store, k, self.buffer_cls.from_bytes(v))

observed_buffers = await store._get_many_ordered(
tuple(
zip(
keys,
(default_buffer_prototype(),) * len(keys),
(None,) * len(keys),
strict=False,
)
)
)
observed_bytes = tuple(b.to_bytes() for b in observed_buffers if b is not None)
assert observed_bytes == values, f"Expected {values}, got {observed_bytes}"

@pytest.mark.parametrize("key", ["c/0", "foo/c/0.0", "foo/0/0"])
@pytest.mark.parametrize("data", [b"\x01\x02\x03\x04", b""])
async def test_getsize(self, store: S, key: str, data: bytes) -> None:
122 changes: 122 additions & 0 deletions tests/test_store/test_fsspec.py
@@ -1,8 +1,11 @@
from __future__ import annotations

import asyncio
import json
import os
import re
import warnings
from itertools import cycle
from typing import TYPE_CHECKING, Any

import numpy as np
@@ -42,6 +45,7 @@
]

fsspec = pytest.importorskip("fsspec")
AsyncFileSystem = pytest.importorskip("fsspec.asyn").AsyncFileSystem
s3fs = pytest.importorskip("s3fs")
requests = pytest.importorskip("requests")
moto_server = pytest.importorskip("moto.moto_server.threaded_moto_server")
@@ -440,3 +444,121 @@ async def test_with_read_only_auto_mkdir(tmp_path: Path) -> None:

store_w = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False})
_ = store_w.with_read_only()


class LockableFileSystem(AsyncFileSystem):
"""
A mock file system that simulates asynchronous and synchronous behaviors with artificial delays.
"""

def __init__(
self,
asynchronous: bool,
lock: bool | None = None,
delays: tuple[float, ...] | None = None,
) -> None:
if delays is None:
delays = (
0.03,
0.01,
)
lock = lock if lock is not None else not asynchronous

# self.asynchronous = asynchronous
self.lock = asyncio.Lock() if lock else None
self.delays = cycle(delays)
self.async_impl = True

super().__init__(asynchronous=asynchronous)

async def _check_active(self) -> None:
if self.lock and self.lock.locked():
raise RuntimeError("Concurrent requests!")

async def _cat_file(self, path, start=None, end=None) -> bytes:
await self._simulate_io_operation(path)
return self.get_data(path)

async def _await_io(self) -> None:
await asyncio.sleep(next(self.delays))

async def _simulate_io_operation(self, path) -> None:
if self.lock:
await self._check_active()
async with self.lock:
await self._await_io()
else:
await self._await_io()

def get_store(self, path: str) -> FsspecStore:
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=UserWarning)
return FsspecStore(fs=self, path=path)

@staticmethod
def get_data(key: str) -> bytes:
return f"{key}_data".encode()


@pytest.mark.asyncio
class TestLockableFSSPECFileSystem:
@pytest.fixture(autouse=True)
async def setup(self):
self.path = "root"
self.store_async = LockableFileSystem(asynchronous=True).get_store(path=self.path)
self.store_sync = LockableFileSystem(asynchronous=False).get_store(path=self.path)

def get_requests_and_true_results(self, path_components=("a", "b")):
true_results = [
(component, LockableFileSystem.get_data(f"{self.path}/{component}"))
for component in path_components
]
requests = [(component, default_buffer_prototype(), None) for component in path_components]
return requests, true_results

async def test_get_many_asynchronous_fs(self):
requests, true_results = self.get_requests_and_true_results(("a", "b", "c"))

results = []
async for k, v in self.store_async._get_many(requests):
results.append((k, v.to_bytes() if v else None))

results_ordered = sorted(results, key=lambda x: x[0])
assert results_ordered == true_results

async def test_get_many_synchronous_fs(self):
requests, true_results = self.get_requests_and_true_results()

results = []
async for k, v in self.store_sync._get_many(requests):
results.append((k, v.to_bytes() if v else None))
# Results should already be in the same order as requests

assert results == true_results

async def test_get_many_ordered_synchronous_fs(self):
requests, true_results = self.get_requests_and_true_results()

results = await self.store_sync._get_many_ordered(requests)
results = [value.to_bytes() if value else None for value in results]

assert results == [value[1] for value in true_results]

async def test_get_many_ordered_asynchronous_fs(self):
requests, true_results = self.get_requests_and_true_results()

results = await self.store_async._get_many_ordered(requests)
results = [value.to_bytes() if value else None for value in results]

assert results == [value[1] for value in true_results]

async def test_asynchronous_locked_fs_raises(self):
store = LockableFileSystem(asynchronous=True, lock=True).get_store(path="root")
requests, _ = self.get_requests_and_true_results()

with pytest.raises(RuntimeError, match="Concurrent requests!"):
async for _, _ in store._get_many(requests):
pass

with pytest.raises(RuntimeError, match="Concurrent requests!"):
await store._get_many_ordered(requests)