Preventing Deadlocks When Reading Metadata Concurrently via asyncio.gather
#3207
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@
+- This pull request resolves deadlocks and indefinite hangs when opening
+  Zarr v3 arrays on synchronous fsspec filesystems. It falls back to
+  sequential reads for filesystems that are not concurrency-safe, keeping
+  metadata retrieval robust without sacrificing performance on safe
+  filesystems. Furthermore, ``Store._get_many`` now retrieves objects
+  concurrently from storage. The previous implementation was sequential,
+  awaiting each ``self.get(*req)`` before proceeding, contrary to the docstring.
+- Introduced ``Store._get_many_ordered`` and ``StorePath.get_many_ordered`` to
+  retrieve multiple metadata files in a single call, reducing per-request
+  overhead. ``StorePath.get_many_ordered`` is used in
+  ``get_array_metadata``. ``Store._get_many_ordered`` is used in
+  ``_read_metadata_v2``.
+- Modified ``FsspecStore._get_many`` and ``FsspecStore._get_many_ordered``
+  to use ``asyncio.gather`` only when the underlying file system is
+  concurrency-safe. This improves compatibility with synchronous file
+  systems by avoiding deadlocks when accessing metadata concurrently.
+  Added ``LockableFileSystem`` to the tests to exercise async and sync
+  behavior.
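The conditional dispatch described in the last changelog entry can be sketched as follows. This is a minimal, self-contained illustration and not the code from this pull request: `get_many_ordered`, `CountingBackend`, and the `concurrency_safe` flag are hypothetical names, and how a real store decides whether its file system is concurrency-safe is left out.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable, Sequence


async def get_many_ordered(
    fetch: Callable[[str], Awaitable[bytes | None]],
    keys: Sequence[str],
    concurrency_safe: bool,
) -> tuple[bytes | None, ...]:
    """Fetch keys in request order, concurrently only when that is safe."""
    if concurrency_safe:
        # All requests are in flight at once; gather returns results in argument order.
        return tuple(await asyncio.gather(*(fetch(key) for key in keys)))
    # Sequential fallback: at most one request is in flight at any time.
    results = []
    for key in keys:
        results.append(await fetch(key))
    return tuple(results)


class CountingBackend:
    """Hypothetical in-memory backend that records the peak number of concurrent requests."""

    def __init__(self, data: dict[str, bytes]) -> None:
        self.data = data
        self.in_flight = 0
        self.peak = 0

    async def fetch(self, key: str) -> bytes | None:
        self.in_flight += 1
        self.peak = max(self.peak, self.in_flight)
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        self.in_flight -= 1
        return self.data.get(key)


if __name__ == "__main__":
    backend = CountingBackend({"zarr.json": b"{}"})
    keys = ["zarr.json", ".zarray", ".zattrs"]
    print(asyncio.run(get_many_ordered(backend.fetch, keys, concurrency_safe=False)))
    print("peak concurrent requests:", backend.peak)
```

With `concurrency_safe=False` only one request is ever pending, which is the property the sequential fallback relies on. With `concurrency_safe=True` every request is issued before the first result is awaited.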
@@ -1,7 +1,7 @@
 from __future__ import annotations

 from abc import ABC, abstractmethod
-from asyncio import gather
+from asyncio import as_completed, gather
 from dataclasses import dataclass
 from itertools import starmap
 from typing import TYPE_CHECKING, Protocol, runtime_checkable
@@ -414,8 +414,27 @@ async def _get_many(
         that objects will be retrieved in the order in which they were requested, so this method
         yields tuple[str, Buffer | None] instead of just Buffer | None
         """
-        for req in requests:
-            yield (req[0], await self.get(*req))
+
+        async def _get_with_name(
+            key: str, prototype: BufferPrototype, byte_range: ByteRequest | None
+        ) -> tuple[str, Buffer | None]:
+            value = await self.get(key, prototype, byte_range)
+            return key, value
+
+        tasks = [_get_with_name(*req) for req in requests]
+        for completed in as_completed(tasks):
+            task = await completed
+            yield task
+
+    async def _get_many_ordered(
+        self, requests: Iterable[tuple[str, BufferPrototype, ByteRequest | None]]
+    ) -> tuple[Buffer | None, ...]:
+        """
+        Retrieve a collection of objects from storage in the order they were requested.
+        """
+        tasks = [self.get(*req) for req in requests]
Comment on lines +429 to +435

I'm not sure I see the use for this method. If store users want fetches to happen in a specific order, then users can call

I introduced the method
Example
Compare, for example,
```python
zarray_bytes, zgroup_bytes, zattrs_bytes = await store._get_many_ordered(
    [
        (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
        (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
        (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
    ]
)
```
with
```python
ordered_keys = [
    (_join_paths([path, ZARRAY_JSON]), default_buffer_prototype(), None),
    (_join_paths([path, ZGROUP_JSON]), default_buffer_prototype(), None),
    (_join_paths([path, ZATTRS_JSON]), default_buffer_prototype(), None),
]
retrieved_objects = {}
async for key, value in store._get_many(ordered_keys):
    retrieved_objects[key] = value
zarray_bytes, zgroup_bytes, zattrs_bytes = tuple(retrieved_objects.get(key[0]) for key in ordered_keys)
```
The first block can be read much faster. At the same time, we cannot use the original,
```python
zarray_bytes, zgroup_bytes, zattrs_bytes = await asyncio.gather(
    store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype()),
    store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype()),
    store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype()),
)
```
as this would lead to deadlocks in synchronous file systems, and we of course also don't want to call

Of course, but it is, if we were to await them sequentially, e.g. if we instead used
```python
zarray_bytes = await store.get(_join_paths([path, ZARRAY_JSON]), prototype=default_buffer_prototype())
zgroup_bytes = await store.get(_join_paths([path, ZGROUP_JSON]), prototype=default_buffer_prototype())
zattrs_bytes = await store.get(_join_paths([path, ZATTRS_JSON]), prototype=default_buffer_prototype())
```
This would ensure that synchronous file systems don't run into deadlocks. But it would not be a good alternative for asynchronous systems.

i'm confused, aren't you doing all this exactly because the fsspec sftp backend is not async? so then sequential awaiting (inside the logic of

Yes, but in the general case, the implementation should be asynchronous. It must be implemented, however, in such a way that we can make it synchronous by overriding store methods in the FSSpec store, making what is generally asynchronous synchronous if
Note also that originally, we were not using
+
+        return tuple(await gather(*tasks))

     async def getsize(self, key: str) -> int:
         """
what is the advantage of this new implementation? the previous implementation was extremely simple, which I think is good for an abc.

The claim in the docstring is incorrect given the previous implementation.
This loop is sequential: it awaits each `self.get(*req)` and yields it before moving on to the next. Each request is handled one at a time, in the exact order provided. Therefore, results are always yielded in the same order as the input requests.
It is thus not fully concurrent, which would be desirable in an I/O-limited system, and, at least as I understand it, this kind of defeats the purpose of having an asynchronous `_get_many` method yielding results in the first place. If we stick to the order, we might as well await all results and simply replace the implementation of `_get_many` with that of `_get_many_ordered`, making it faster and arguably easier to use in the asynchronous case. If we want to give the extra flexibility of not awaiting all at once, but still requesting all at the same time, the new implementation would be the right one.
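
To make the ordering difference concrete, here is a small sketch that uses only `asyncio`. The `fetch` coroutine, the `REQUESTS` list, and the artificial delays are stand-ins for real store reads and are not part of zarr.

```python
from __future__ import annotations

import asyncio

# (key, simulated latency in seconds); the first request is the slow one.
REQUESTS = [("slow", 0.1), ("fast", 0.0)]


async def fetch(key: str, delay: float) -> tuple[str, str]:
    """Pretend to read `key` from storage, taking `delay` seconds."""
    await asyncio.sleep(delay)
    return key, f"value-of-{key}"


async def sequential() -> list[str]:
    """Await each request before starting the next one."""
    order = []
    for key, delay in REQUESTS:
        name, _ = await fetch(key, delay)
        order.append(name)
    return order


async def concurrent() -> list[str]:
    """Start every request up front and consume results as they finish."""
    order = []
    for next_done in asyncio.as_completed([fetch(k, d) for k, d in REQUESTS]):
        name, _ = await next_done
        order.append(name)
    return order


if __name__ == "__main__":
    print("sequential:", asyncio.run(sequential()))
    print("concurrent:", asyncio.run(concurrent()))
```

The sequential loop reports keys in request order, while the `as_completed` variant reports them in completion order, which is why the key has to travel with each value.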

the point of the default implementation is to be the simplest possible implementation of `_get_many` that any child class can safely support, given an implementation of `get`. But child classes should also be able to override this with more efficient methods where applicable, and in these cases the order of results is not guaranteed, hence the type annotation in the original method.
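
The split described here, a deliberately simple default on the base class plus an optional faster override in a child class, might look roughly like this. All class names are hypothetical, and the in-memory `get` only imitates I/O.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator, Iterable


class BaseStore(ABC):
    @abstractmethod
    async def get(self, key: str) -> bytes | None: ...

    async def get_many(
        self, keys: Iterable[str]
    ) -> AsyncIterator[tuple[str, bytes | None]]:
        # Simplest possible default: one request at a time, safe for any `get`.
        for key in keys:
            yield key, await self.get(key)


class MemoryStore(BaseStore):
    def __init__(self, data: dict[str, bytes]) -> None:
        self._data = dict(data)

    async def get(self, key: str) -> bytes | None:
        await asyncio.sleep(0)  # stand-in for real I/O
        return self._data.get(key)


class ConcurrentMemoryStore(MemoryStore):
    async def get_many(
        self, keys: Iterable[str]
    ) -> AsyncIterator[tuple[str, bytes | None]]:
        # Override for backends that tolerate concurrent requests.
        # Results arrive in completion order, so each one carries its key.
        async def get_with_name(key: str) -> tuple[str, bytes | None]:
            return key, await self.get(key)

        for next_done in asyncio.as_completed([get_with_name(k) for k in keys]):
            yield await next_done


async def collect(store: BaseStore, keys: Iterable[str]) -> dict[str, bytes | None]:
    """Consume either implementation without relying on result order."""
    return {key: value async for key, value in store.get_many(keys)}
```

Because callers such as `collect` key the results by name, they work unchanged against both the default and the overriding implementation.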

I find this somewhat confusing, as I would have expected the standard implementation to be fully asynchronous. However, if the goal is to maximize simplicity, then having an asynchronous implementation that runs synchronously might be the way to go.
That being said, if we revert this to the original, we would only have to also remove `FsspecStore._get_many` from my current solution. Unless you think we should not have a `_get_many_ordered` method and should use the `_get_many` method instead, always sorting the values locally, as they could arrive in a different order in other implementations.