Skip to content

(feat): custom reopen with read_elem_as_dask for remote h5ad #1665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions src/anndata/_io/specs/lazy_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from .registry import _LAZY_REGISTRY, IOSpec

if TYPE_CHECKING:
from collections.abc import Generator, Mapping, Sequence
from collections.abc import Callable, Generator, Iterator, Mapping, Sequence
from typing import Literal, ParamSpec, TypeVar

from anndata._core.sparse_dataset import _CSCDataset, _CSRDataset
from anndata._types import ArrayStorageType, StorageType
from anndata.experimental.backed._compat import DataArray, Dataset2D
from anndata.experimental.backed._lazy_arrays import CategoricalArray, MaskedArray

Expand Down Expand Up @@ -76,14 +78,22 @@ def make_dask_chunk(
path_or_sparse_dataset: Path | D,
elem_name: str,
block_info: BlockInfo | None = None,
*,
wrap: Callable[[ArrayStorageType], ArrayStorageType]
| Callable[[H5Group | ZarrGroup], _CSRDataset | _CSCDataset] = lambda g: g,
reopen: None | Callable[[], Iterator[StorageType]] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so the idea is “reopen is a callable that can be transformed into a context manager using contextlib.contextmanager.

Why not just “reopen is a callable that returns an contextlib.AbstractContextManager[StorageType]?

) -> CSMatrix | CSArray:
if block_info is None:
msg = "Block info is required"
raise ValueError(msg)
# We need to open the file in each task since `dask` cannot share h5py objects when using `dask.distributed`
# https://github.com/scverse/anndata/issues/1105
with maybe_open_h5(path_or_sparse_dataset, elem_name) as f:
mtx = ad.io.sparse_dataset(f) if isinstance(f, H5Group) else f
with (
contextmanager(reopen)()
if reopen is not None
else maybe_open_h5(path_or_sparse_dataset, elem_name)
) as f:
mtx = wrap(f)
idx = tuple(
slice(start, stop) for start, stop in block_info[None]["array-location"]
)
Expand All @@ -108,6 +118,7 @@ def read_sparse_as_dask(
*,
_reader: LazyReader,
chunks: tuple[int, ...] | None = None, # only tuple[int, int] is supported here
reopen: None | Callable[[], Iterator[StorageType]] = None,
) -> DaskArray:
import dask.array as da

Expand Down Expand Up @@ -149,7 +160,13 @@ def read_sparse_as_dask(
(chunks_minor, chunks_major) if is_csc else (chunks_major, chunks_minor)
)
memory_format = sparse.csc_matrix if is_csc else sparse.csr_matrix
make_chunk = partial(make_dask_chunk, path_or_sparse_dataset, elem_name)
make_chunk = partial(
make_dask_chunk,
path_or_sparse_dataset,
elem_name,
wrap=ad.sparse_dataset,
reopen=reopen,
)
da_mtx = da.map_blocks(
make_chunk,
dtype=dtype,
Expand Down Expand Up @@ -178,7 +195,11 @@ def read_h5_string_array(

@_LAZY_REGISTRY.register_read(H5Array, IOSpec("array", "0.2.0"))
def read_h5_array(
elem: H5Array, *, _reader: LazyReader, chunks: tuple[int, ...] | None = None
elem: H5Array,
*,
_reader: LazyReader,
chunks: tuple[int, ...] | None = None,
reopen: None | Callable[[], Iterator[StorageType]] = None,
) -> DaskArray:
import dask.array as da

Expand Down Expand Up @@ -208,7 +229,11 @@ def read_h5_array(
@_LAZY_REGISTRY.register_read(ZarrArray, IOSpec("string-array", "0.2.0"))
@_LAZY_REGISTRY.register_read(ZarrArray, IOSpec("array", "0.2.0"))
def read_zarr_array(
elem: ZarrArray, *, _reader: LazyReader, chunks: tuple[int, ...] | None = None
elem: ZarrArray,
*,
_reader: LazyReader,
chunks: tuple[int, ...] | None = None,
reopen: None | Callable[[], Iterator[StorageType]] = None,
) -> DaskArray:
chunks: tuple[int, ...] = chunks if chunks is not None else elem.chunks
import dask.array as da
Expand Down
14 changes: 10 additions & 4 deletions src/anndata/_io/specs/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from anndata.compat import DaskArray, ZarrGroup, _read_attr, is_zarr_v2

if TYPE_CHECKING:
from collections.abc import Callable, Generator, Iterable
from collections.abc import Callable, Generator, Iterable, Iterator
from typing import Any

from anndata._types import (
Expand Down Expand Up @@ -292,6 +292,7 @@ def read_elem(
elem: StorageType,
modifiers: frozenset[str] = frozenset(),
chunks: tuple[int, ...] | None = None,
reopen: None | Callable[[], Iterator[StorageType]] = None,
**kwargs,
) -> LazyDataStructures:
"""Read a dask element from a store. See exported function for more details."""
Expand All @@ -313,7 +314,7 @@ def read_elem(
raise ValueError(msg)
if "chunks" in read_params:
kwargs["chunks"] = chunks
return read_func(elem, **kwargs)
return read_func(elem, chunks=chunks, reopen=reopen)


class Writer:
Expand Down Expand Up @@ -404,7 +405,10 @@ def read_elem(elem: StorageType) -> RWAble:


def read_elem_lazy(
elem: StorageType, chunks: tuple[int, ...] | None = None, **kwargs
elem: StorageType,
chunks: tuple[int, ...] | None = None,
reopen: None | Callable[[], Iterator[StorageType]] = None,
**kwargs,
) -> LazyDataStructures:
"""
Read an element from a store lazily.
Expand All @@ -424,6 +428,8 @@ def read_elem_lazy(
`(adata.shape[0], 1000)` for CSC sparse,
and the on-disk chunking otherwise for dense.
Can use `-1` or `None` to indicate use of the size of the corresponding dimension.
reopen, optional
A custom function for re-opening your store in the dask reader.

Returns
-------
Expand Down Expand Up @@ -479,7 +485,7 @@ def read_elem_lazy(
>>> adata.X = ad.experimental.read_elem_lazy(g["X"], chunks=(500, -1))
>>> adata.X = ad.experimental.read_elem_lazy(g["X"], chunks=(500, None))
"""
return LazyReader(_LAZY_REGISTRY).read_elem(elem, chunks=chunks, **kwargs)
return LazyReader(_LAZY_REGISTRY).read_elem(elem, chunks=chunks, reopen=reopen)


def write_elem(
Expand Down
Loading