Skip to content

(feat): add async versions of {write,read}_{elem,dispatched} #1902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 40 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f97a786
(fix): try faster ci
ilan-gold Mar 7, 2025
f7caf36
(chore): remove `read_elem_partial`
ilan-gold Mar 7, 2025
f563800
Merge branch 'main' into ig/faster_ci
ilan-gold Mar 7, 2025
25f7f6f
(feat): add `async` versions of `read_{elem,dispatched}`
ilan-gold Mar 7, 2025
0393a74
(fix): write chunk-by-chunk
ilan-gold Mar 8, 2025
37ad8f4
(feat): use `_async_array` for reading
ilan-gold Mar 9, 2025
eb08bd5
(feat): `async` full reading of sparse
ilan-gold Mar 10, 2025
c23df17
(refactor): remove `read_dispatched_async`
ilan-gold Mar 10, 2025
899e7ee
(chore): refactor to remove duplicated code
ilan-gold Mar 12, 2025
5f14193
(feat): writing
ilan-gold Mar 12, 2025
d8be5bf
Merge branch 'main' into ig/faster_ci
ilan-gold Mar 13, 2025
19387f5
(fix): reporting read error
ilan-gold Mar 13, 2025
1984f62
(chore): async tests
ilan-gold Mar 13, 2025
059a948
(fix): actually use wrapper
ilan-gold Mar 13, 2025
68cab54
(chore): add benchmarks
ilan-gold Mar 13, 2025
39f62ab
Merge branch 'main' into ig/faster_ci
ilan-gold Mar 13, 2025
6e01d50
Merge branch 'ig/faster_ci' into ig/async_io
ilan-gold Mar 13, 2025
0c9907b
Merge branch 'ig/faster_ci' into ig/async_io
ilan-gold Mar 13, 2025
f51ffd1
(chore): merge for backed
ilan-gold Mar 13, 2025
cafa6ff
(fix): benchmark
ilan-gold Mar 13, 2025
2b93c70
(fix): settinng with v2
ilan-gold Mar 13, 2025
bb036ce
(fix): notebooks
ilan-gold Mar 13, 2025
325eb6e
(fix): docs
ilan-gold Mar 13, 2025
b69ba89
(fix): small type fixes
ilan-gold Mar 13, 2025
4d8b7a5
(fix): write typing
ilan-gold Mar 13, 2025
35b7deb
(fix): look at mode
ilan-gold Mar 13, 2025
c527e26
(fix): lol methods
ilan-gold Mar 13, 2025
233cd7a
(fix): zarr setup
ilan-gold Mar 13, 2025
13e3253
(fix): api docs
ilan-gold Mar 13, 2025
5b2ceba
(fix): benchmark read functions
ilan-gold Mar 13, 2025
356772d
(fix): custom compression
ilan-gold Mar 13, 2025
c4118f6
(fix): no zarr backed
ilan-gold Mar 13, 2025
c86aeb6
(fix): remove `ReadAsyncCallback`
ilan-gold Mar 13, 2025
4866808
(fix): zarr benchmark
ilan-gold Mar 13, 2025
cd69257
(fix): actuall write data
ilan-gold Mar 13, 2025
24da584
(fix): docs + add `pytest-asyncio`
ilan-gold Mar 13, 2025
ab9740e
(fix): zarr writing
ilan-gold Mar 13, 2025
58b7087
(fix): async tests
ilan-gold Mar 13, 2025
21b0a6e
(fix): usage of write function
ilan-gold Mar 13, 2025
8855231
(fix): write for docs
ilan-gold Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ jobs:
- script: uv pip list
displayName: "Display installed versions"

- script: pytest
- script: pytest -n auto
displayName: "PyTest"
condition: eq(variables['TEST_TYPE'], 'standard')

- script: pytest --cov --cov-report=xml --cov-context=test
- script: pytest --cov --cov-report=xml --cov-context=test -n auto
displayName: "PyTest (coverage)"
condition: eq(variables['TEST_TYPE'], 'coverage')

- script: pytest --strict-warnings
- script: pytest --strict-warnings -n auto
displayName: "PyTest (treat warnings as errors)"
condition: eq(variables['TEST_TYPE'], 'strict-warning')

Expand Down
17 changes: 13 additions & 4 deletions src/anndata/_core/sparse_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,12 @@ def append(self, sparse_matrix: CSMatrix | CSArray) -> None:
append_data = sparse_matrix.data
append_indices = sparse_matrix.indices
if isinstance(sparse_matrix.data, ZarrArray) and not is_zarr_v2():
data[orig_data_size:] = append_data[...]
from .._io.specs.methods import _iter_chunks_for_copy

for chunk in _iter_chunks_for_copy(append_data, data):
data[(chunk.start + orig_data_size) : (chunk.stop + orig_data_size)] = (
append_data[chunk]
)
else:
data[orig_data_size:] = append_data
# indptr
Expand All @@ -598,12 +603,16 @@ def append(self, sparse_matrix: CSMatrix | CSArray) -> None:
)

# indices
if isinstance(sparse_matrix.data, ZarrArray) and not is_zarr_v2():
append_indices = append_indices[...]
indices = self.group["indices"]
orig_data_size = indices.shape[0]
indices.resize((orig_data_size + sparse_matrix.indices.shape[0],))
indices[orig_data_size:] = append_indices
if isinstance(sparse_matrix.data, ZarrArray) and not is_zarr_v2():
for chunk in _iter_chunks_for_copy(append_indices, indices):
indices[
(chunk.start + orig_data_size) : (chunk.stop + orig_data_size)
] = append_indices[chunk]
else:
indices[orig_data_size:] = append_indices

# Clear cached property
for attr in ["_indptr", "_indices", "_data"]:
Expand Down
68 changes: 39 additions & 29 deletions src/anndata/_io/h5ad.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import re
from functools import partial
from pathlib import Path
Expand All @@ -23,9 +24,10 @@
_decode_structured_array,
_from_fixed_length_strings,
)
from ..experimental import read_dispatched
from .specs import read_elem, write_elem
from .specs.registry import IOSpec, write_spec
from ..experimental import read_dispatched_async
from .specs import write_elem
from .specs.methods import sync_async_to_async
from .specs.registry import IOSpec, read_elem_async, write_spec
from .utils import (
H5PY_V3,
_read_legacy_raw,
Expand Down Expand Up @@ -140,7 +142,7 @@
del f[key]


def read_h5ad_backed(filename: str | Path, mode: Literal["r", "r+"]) -> AnnData:
async def read_h5ad_backed(filename: str | Path, mode: Literal["r", "r+"]) -> AnnData:
d = dict(filename=filename, filemode=mode)

f = h5py.File(filename, mode)
Expand All @@ -153,11 +155,11 @@
else:
for k in df_attributes:
if k in f: # Backwards compat
d[k] = read_dataframe(f[k])
d[k] = await read_dataframe(f[k])

Check warning on line 158 in src/anndata/_io/h5ad.py

View check run for this annotation

Codecov / codecov/patch

src/anndata/_io/h5ad.py#L158

Added line #L158 was not covered by tests

d.update({k: read_elem(f[k]) for k in attributes if k in f})
d.update({k: await read_elem_async(f[k]) for k in attributes if k in f})

d["raw"] = _read_raw(f, attrs={"var", "varm"})
d["raw"] = await _read_raw(f, attrs={"var", "varm"})

adata = AnnData(**d)

Expand Down Expand Up @@ -212,7 +214,7 @@
if mode is True:
mode = "r+"
assert mode in {"r", "r+"}
return read_h5ad_backed(filename, mode)
return asyncio.run(read_h5ad_backed(filename, mode))

if as_sparse_fmt not in (sparse.csr_matrix, sparse.csc_matrix):
msg = "Dense formats can only be read to CSR or CSC matrices at this time."
Expand All @@ -234,33 +236,38 @@

with h5py.File(filename, "r") as f:

def callback(func, elem_name: str, elem, iospec):
async def callback(func, elem_name: str, elem, iospec):
if iospec.encoding_type == "anndata" or elem_name.endswith("/"):
return AnnData(
**{
# This is covering up backwards compat in the anndata initializer
# In most cases we should be able to call `func(elen[k])` instead
k: read_dispatched(elem[k], callback)
for k in elem.keys()
if not k.startswith("raw.")
}
args = dict(
await asyncio.gather(
*(
# This is covering up backwards compat in the anndata initializer
# In most cases we should be able to call `func(elen[k])` instead
sync_async_to_async(
k, read_dispatched_async(elem[k], callback)
)
for k in elem.keys()
if not k.startswith("raw.")
)
)
)
return AnnData(**args)
elif elem_name.startswith("/raw."):
return None
elif elem_name == "/X" and "X" in as_sparse:
return rdasp(elem)
elif elem_name == "/raw":
return _read_raw(f, as_sparse, rdasp)
return await _read_raw(f, as_sparse, rdasp)
elif elem_name in {"/obs", "/var"}:
# Backwards compat
return read_dataframe(elem)
return func(elem)
return await read_dataframe(elem)
return await func(elem)

adata = read_dispatched(f, callback=callback)
adata = asyncio.run(read_dispatched_async(f, callback=callback))

# Backwards compat (should figure out which version)
if "raw.X" in f:
raw = AnnData(**_read_raw(f, as_sparse, rdasp))
raw = AnnData(**asyncio.run(_read_raw(f, as_sparse, rdasp)))
raw.obs_names = adata.obs_names
adata.raw = raw

Expand All @@ -271,7 +278,7 @@
return adata


def _read_raw(
async def _read_raw(
f: h5py.File | AnnDataFileManager,
as_sparse: Collection[str] = (),
rdasp: Callable[[h5py.Dataset], CSMatrix] | None = None,
Expand All @@ -282,12 +289,15 @@
assert rdasp is not None, "must supply rdasp if as_sparse is supplied"
raw = {}
if "X" in attrs and "raw/X" in f:
read_x = rdasp if "raw/X" in as_sparse else read_elem
raw["X"] = read_x(f["raw/X"])
raw["X"] = (
(await read_elem_async(f["raw/X"]))
if "raw/X" not in as_sparse
else rdasp(f["raw/X"])
)
for v in ("var", "varm"):
if v in attrs and f"raw/{v}" in f:
raw[v] = read_elem(f[f"raw/{v}"])
return _read_legacy_raw(f, raw, read_dataframe, read_elem, attrs=attrs)
raw[v] = await read_elem_async(f[f"raw/{v}"])
return await _read_legacy_raw(f, raw, read_dataframe, read_elem_async, attrs=attrs)


@report_read_key_on_error
Expand All @@ -310,12 +320,12 @@
return df


def read_dataframe(group: h5py.Group | h5py.Dataset) -> pd.DataFrame:
async def read_dataframe(group: h5py.Group | h5py.Dataset) -> pd.DataFrame:
"""Backwards compat function"""
if not isinstance(group, h5py.Group):
return read_dataframe_legacy(group)
else:
return read_elem(group)
return await read_elem_async(group)


@report_read_key_on_error
Expand Down
2 changes: 2 additions & 0 deletions src/anndata/_io/specs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
get_spec,
read_elem,
read_elem_as_dask,
read_elem_async,
write_elem,
)

Expand All @@ -20,6 +21,7 @@
"get_spec",
"read_elem",
"read_elem_as_dask",
"read_elem_async",
"Reader",
"Writer",
"IOSpec",
Expand Down
6 changes: 3 additions & 3 deletions src/anndata/_io/specs/lazy_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def make_dask_chunk(
@_LAZY_REGISTRY.register_read(H5Group, IOSpec("csr_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csc_matrix", "0.1.0"))
@_LAZY_REGISTRY.register_read(ZarrGroup, IOSpec("csr_matrix", "0.1.0"))
def read_sparse_as_dask(
async def read_sparse_as_dask(
elem: H5Group | ZarrGroup,
*,
_reader: DaskReader,
Expand Down Expand Up @@ -148,7 +148,7 @@ def read_sparse_as_dask(


@_LAZY_REGISTRY.register_read(H5Array, IOSpec("array", "0.2.0"))
def read_h5_array(
async def read_h5_array(
elem: H5Array, *, _reader: DaskReader, chunks: tuple[int, ...] | None = None
) -> DaskArray:
import dask.array as da
Expand Down Expand Up @@ -177,7 +177,7 @@ def read_h5_array(


@_LAZY_REGISTRY.register_read(ZarrArray, IOSpec("array", "0.2.0"))
def read_zarr_array(
async def read_zarr_array(
elem: ZarrArray, *, _reader: DaskReader, chunks: tuple[int, ...] | None = None
) -> DaskArray:
chunks: tuple[int, ...] = chunks if chunks is not None else elem.chunks
Expand Down
Loading
Loading