Skip to content

(feat): add async versions of {write,read}_{elem,dispatched} #1902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 40 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
f97a786
(fix): try faster ci
ilan-gold Mar 7, 2025
f7caf36
(chore): remove `read_elem_partial`
ilan-gold Mar 7, 2025
f563800
Merge branch 'main' into ig/faster_ci
ilan-gold Mar 7, 2025
25f7f6f
(feat): add `async` versions of `read_{elem,dispatched}`
ilan-gold Mar 7, 2025
0393a74
(fix): write chunk-by-chunk
ilan-gold Mar 8, 2025
37ad8f4
(feat): use `_async_array` for reading
ilan-gold Mar 9, 2025
eb08bd5
(feat): `async` full reading of sparse
ilan-gold Mar 10, 2025
c23df17
(refactor): remove `read_dispatched_async`
ilan-gold Mar 10, 2025
899e7ee
(chore): refactor to remove duplicated code
ilan-gold Mar 12, 2025
5f14193
(feat): writing
ilan-gold Mar 12, 2025
d8be5bf
Merge branch 'main' into ig/faster_ci
ilan-gold Mar 13, 2025
19387f5
(fix): reporting read error
ilan-gold Mar 13, 2025
1984f62
(chore): async tests
ilan-gold Mar 13, 2025
059a948
(fix): actually use wrapper
ilan-gold Mar 13, 2025
68cab54
(chore): add benchmarks
ilan-gold Mar 13, 2025
39f62ab
Merge branch 'main' into ig/faster_ci
ilan-gold Mar 13, 2025
6e01d50
Merge branch 'ig/faster_ci' into ig/async_io
ilan-gold Mar 13, 2025
0c9907b
Merge branch 'ig/faster_ci' into ig/async_io
ilan-gold Mar 13, 2025
f51ffd1
(chore): merge for backed
ilan-gold Mar 13, 2025
cafa6ff
(fix): benchmark
ilan-gold Mar 13, 2025
2b93c70
(fix): setting with v2
ilan-gold Mar 13, 2025
bb036ce
(fix): notebooks
ilan-gold Mar 13, 2025
325eb6e
(fix): docs
ilan-gold Mar 13, 2025
b69ba89
(fix): small type fixes
ilan-gold Mar 13, 2025
4d8b7a5
(fix): write typing
ilan-gold Mar 13, 2025
35b7deb
(fix): look at mode
ilan-gold Mar 13, 2025
c527e26
(fix): lol methods
ilan-gold Mar 13, 2025
233cd7a
(fix): zarr setup
ilan-gold Mar 13, 2025
13e3253
(fix): api docs
ilan-gold Mar 13, 2025
5b2ceba
(fix): benchmark read functions
ilan-gold Mar 13, 2025
356772d
(fix): custom compression
ilan-gold Mar 13, 2025
c4118f6
(fix): no zarr backed
ilan-gold Mar 13, 2025
c86aeb6
(fix): remove `ReadAsyncCallback`
ilan-gold Mar 13, 2025
4866808
(fix): zarr benchmark
ilan-gold Mar 13, 2025
cd69257
(fix): actually write data
ilan-gold Mar 13, 2025
24da584
(fix): docs + add `pytest-asyncio`
ilan-gold Mar 13, 2025
ab9740e
(fix): zarr writing
ilan-gold Mar 13, 2025
58b7087
(fix): async tests
ilan-gold Mar 13, 2025
21b0a6e
(fix): usage of write function
ilan-gold Mar 13, 2025
8855231
(fix): write for docs
ilan-gold Mar 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 103 additions & 60 deletions benchmarks/benchmarks/readwrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import sys
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING

import numpy as np
import pooch
Expand All @@ -35,6 +36,9 @@

from .utils import get_actualsize, get_peak_mem, sedate

if TYPE_CHECKING:
from collections.abc import Callable

PBMC_3K_URL = "https://falexwolf.de/data/pbmc3k_raw.h5ad"

# PBMC_3K_PATH = Path(__file__).parent / "data/pbmc3k_raw.h5ad"
Expand All @@ -43,100 +47,104 @@
# BM_43K_CSC_PATH = Path(__file__).parent.parent / "datasets/BM2_43k-cells_CSC.h5ad"


# class ZarrReadSuite:
# params = []
# param_names = ["input_url"]

# def setup(self, input_url):
# self.filepath = pooch.retrieve(url=input_url, known_hash=None)

# def time_read_full(self, input_url):
# anndata.read_zarr(self.filepath)

# def peakmem_read_full(self, input_url):
# anndata.read_zarr(self.filepath)

# def mem_readfull_object(self, input_url):
# return anndata.read_zarr(self.filepath)
class TestSuite:
_urls = dict(pbmc3k=PBMC_3K_URL)
params = _urls.keys()
param_names = ["input_data"]
filepath: Path
read_func: Callable[[Path | str], anndata.AnnData]

# def track_read_full_memratio(self, input_url):
# mem_recording = memory_usage(
# (sedate(anndata.read_zarr, 0.005), (self.filepath,)), interval=0.001
# )
# adata = anndata.read_zarr(self.filepath)
# base_size = mem_recording[-1] - mem_recording[0]
# print(np.max(mem_recording) - np.min(mem_recording))
# print(base_size)
# return (np.max(mem_recording) - np.min(mem_recording)) / base_size
def setup(self, input_data: str):
self.filepath = Path(
pooch.retrieve(url=self._urls[input_data], known_hash=None)
)

# def peakmem_read_backed(self, input_url):
# anndata.read_zarr(self.filepath, backed="r")

# def mem_read_backed_object(self, input_url):
# return anndata.read_zarr(self.filepath, backed="r")
class ZarrMixin(TestSuite):
def setup(self, input_data: str):
super().setup(input_data)
zarr_path = self.filepath.with_suffix(".zarr")
anndata.read_h5ad(self.filepath).write_zarr(zarr_path)
self.filepath = zarr_path

@property
def read_func(self):
return anndata.read_zarr

class H5ADInMemorySizeSuite:
_urls = dict(pbmc3k=PBMC_3K_URL)
params = _urls.keys()
param_names = ["input_data"]

def setup(self, input_data: str):
self.filepath = pooch.retrieve(url=self._urls[input_data], known_hash=None)
class H5ADInMemorySizeSuite(TestSuite):
@property
def read_func(self):
return anndata.read_h5ad

def track_in_memory_size(self, *_):
adata = anndata.read_h5ad(self.filepath)
adata = self.read_func(self.filepath)
adata_size = sys.getsizeof(adata)

return adata_size

def track_actual_in_memory_size(self, *_):
adata = anndata.read_h5ad(self.filepath)
adata = self.read_func(self.filepath)
adata_size = get_actualsize(adata)

return adata_size


class H5ADReadSuite:
_urls = dict(pbmc3k=PBMC_3K_URL)
params = _urls.keys()
param_names = ["input_data"]
class ZarrInMemorySizeSuite(ZarrMixin, H5ADInMemorySizeSuite):
@property
def read_func(self):
return anndata.read_zarr

def setup(self, input_data: str):
self.filepath = pooch.retrieve(url=self._urls[input_data], known_hash=None)

class H5ADReadSuite(TestSuite):
@property
def read_func(self):
return anndata.read_h5ad

def time_read_full(self, *_):
anndata.read_h5ad(self.filepath)
self.read_func(self.filepath)

def peakmem_read_full(self, *_):
anndata.read_h5ad(self.filepath)
self.read_func(self.filepath)

def mem_readfull_object(self, *_):
return anndata.read_h5ad(self.filepath)
return self.read_func(self.filepath)

def track_read_full_memratio(self, *_):
mem_recording = memory_usage(
(sedate(anndata.read_h5ad, 0.005), (self.filepath,)), interval=0.001
(sedate(self.read_func, 0.005), (self.filepath,)), interval=0.001
)
# adata = anndata.read_h5ad(self.filepath)
# adata = self.read_func(self.filepath)
base_size = mem_recording[-1] - mem_recording[0]
print(np.max(mem_recording) - np.min(mem_recording))
print(base_size)
return (np.max(mem_recording) - np.min(mem_recording)) / base_size

# causes benchmarking to break from: https://github.com/pympler/pympler/issues/151
# def mem_read_backed_object(self, *_):
# return self.read_func(self.filepath, backed="r")


class BackedH5ADSuite(TestSuite):
def peakmem_read_backed(self, *_):
anndata.read_h5ad(self.filepath, backed="r")

# causes benchmarking to break from: https://github.com/pympler/pympler/issues/151
# def mem_read_backed_object(self, *_):
# return anndata.read_h5ad(self.filepath, backed="r")

class ZarrReadSuite(ZarrMixin, H5ADReadSuite):
@property
def read_func(self):
return anndata.read_zarr


class H5ADWriteSuite:
_urls = dict(pbmc3k=PBMC_3K_URL)
params = _urls.keys()
param_names = ["input_data"]

@property
def write_func(self):
return anndata.write_h5ad

def setup(self, input_data: str):
mem_recording, adata = memory_usage(
(
Expand All @@ -155,31 +163,66 @@ def teardown(self, *_):
self.tmpdir.cleanup()

def time_write_full(self, *_):
self.adata.write_h5ad(self.writepth, compression=None)
self.write_func(self.writepth, compression=None)

def peakmem_write_full(self, *_):
self.adata.write_h5ad(self.writepth)
self.write_func(self.writepth)

def track_peakmem_write_full(self, *_):
return get_peak_mem((sedate(self.adata.write_h5ad), (self.writepth,)))
return get_peak_mem((sedate(self.write_func), (self.writepth, self.adata)))

def time_write_compressed(self, *_):
self.adata.write_h5ad(self.writepth, compression="gzip")
self.write_func(self.adata, self.writepth, compression="gzip")

def peakmem_write_compressed(self, *_):
self.adata.write_h5ad(self.writepth, compression="gzip")
self.write_func(self.adata, self.writepth, compression="gzip")

def track_peakmem_write_compressed(self, *_):
return get_peak_mem(
(sedate(self.adata.write_h5ad), (self.writepth,), {"compression": "gzip"})
(
sedate(self.write_func),
(self.writepth, self.adata),
{"compression": "gzip"},
)
)


class H5ADBackedWriteSuite(H5ADWriteSuite):
_urls = dict(pbmc3k=PBMC_3K_URL)
params = _urls.keys()
param_names = ["input_data"]
class ZarrWriteSizeSuite(H5ADWriteSuite):
write_func_str = "write_zarr"

@property
def write_func(self):
return anndata.write_zarr

def setup(self, input_data: str):
h5_path = Path(pooch.retrieve(self._urls[input_data], known_hash=None))
zarr_path = h5_path.with_suffix(".zarr")
anndata.read_h5ad(h5_path).write_zarr(zarr_path)

mem_recording, adata = memory_usage(
(
sedate(anndata.read_zarr, 0.005),
(zarr_path,),
),
retval=True,
interval=0.001,
)
self.adata = adata
self.base_size = mem_recording[-1] - mem_recording[0]
self.tmpdir = tempfile.TemporaryDirectory()
self.writepth = Path(self.tmpdir.name) / "out.zarr"

def track_peakmem_write_compressed(self, *_):
return get_peak_mem(
(
sedate(self.write_func),
(self.writepth, self.adata),
{"compression": "gzip"},
)
)


class BackedH5ADWriteSuite(H5ADWriteSuite):
def setup(self, input_data):
mem_recording, adata = memory_usage(
(
Expand Down
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ Types used by the former:
experimental.IOSpec
experimental.Read
experimental.Write
experimental.ReadAsync
experimental.ReadCallback
experimental.WriteCallback
experimental.StorageType
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def setup(app: Sphinx):
"anndata._types.WriteCallback": "anndata.experimental.WriteCallback",
"anndata._types.Read": "anndata.experimental.Read",
"anndata._types.Write": "anndata.experimental.Write",
"anndata._types.ReadAsync": "anndata.experimental.ReadAsync",
"zarr.core.array.Array": "zarr.Array",
"zarr.core.group.Group": "zarr.Group",
"anndata.compat.DaskArray": "dask.array.Array",
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ test-full = [ "anndata[test,lazy]" ]
test = [
"loompy>=3.0.5",
"pytest>=8.2,<8.3.4",
"pytest-asyncio",
"pytest-cov",
"matplotlib",
"scikit-learn",
Expand Down Expand Up @@ -143,6 +144,7 @@ addopts = [
"--pyargs",
"-ptesting.anndata._pytest",
]
asyncio_mode = "auto"
filterwarnings = [
"ignore::anndata._warnings.OldFormatWarning",
"ignore::anndata._warnings.ExperimentalFeatureWarning",
Expand Down
43 changes: 39 additions & 4 deletions src/anndata/_core/sparse_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# - think about supporting the COO format
from __future__ import annotations

import asyncio
import warnings
from abc import ABC
from collections.abc import Iterable
Expand Down Expand Up @@ -586,7 +587,12 @@ def append(self, sparse_matrix: CSMatrix | CSArray) -> None:
append_data = sparse_matrix.data
append_indices = sparse_matrix.indices
if isinstance(sparse_matrix.data, ZarrArray) and not is_zarr_v2():
data[orig_data_size:] = append_data[...]
from .._io.specs.methods import _iter_chunks_for_copy

for chunk in _iter_chunks_for_copy(append_data, data):
data[(chunk.start + orig_data_size) : (chunk.stop + orig_data_size)] = (
append_data[chunk]
)
else:
data[orig_data_size:] = append_data
# indptr
Expand All @@ -598,12 +604,16 @@ def append(self, sparse_matrix: CSMatrix | CSArray) -> None:
)

# indices
if isinstance(sparse_matrix.data, ZarrArray) and not is_zarr_v2():
append_indices = append_indices[...]
indices = self.group["indices"]
orig_data_size = indices.shape[0]
indices.resize((orig_data_size + sparse_matrix.indices.shape[0],))
indices[orig_data_size:] = append_indices
if isinstance(sparse_matrix.data, ZarrArray) and not is_zarr_v2():
for chunk in _iter_chunks_for_copy(append_indices, indices):
indices[
(chunk.start + orig_data_size) : (chunk.stop + orig_data_size)
] = append_indices[chunk]
else:
indices[orig_data_size:] = append_indices

# Clear cached property
for attr in ["_indptr", "_indices", "_data"]:
Expand Down Expand Up @@ -652,6 +662,31 @@ def to_memory(self) -> CSMatrix | CSArray:
mtx.indptr = self._indptr
return mtx

async def to_memory_async(self) -> CSMatrix | CSArray:
format_class = get_memory_class(
self.format, use_sparray_in_io=settings.use_sparse_array_on_read
)
mtx = format_class(self.shape, dtype=self.dtype)
mtx.indptr = self._indptr
if isinstance(self._data, ZarrArray) and not is_zarr_v2():
await asyncio.gather(
*(
self.set_memory_async_from_zarr(mtx, attr)
for attr in ["data", "indices"]
)
)
else:
mtx.data = self._data[...]
mtx.indices = self._indices[...]
return mtx

async def set_memory_async_from_zarr(
self, mtx: CSMatrix | CSArray, attr: Literal["indptr", "data", "indices"]
) -> None:
setattr(
mtx, attr, await getattr(self, f"_{attr}")._async_array.getitem(())
) # TODO: better way to asyncify


class _CSRDataset(BaseCompressedSparseDataset, abc.CSRDataset):
"""Internal concrete version of :class:`anndata.abc.CSRDataset`."""
Expand Down
Loading
Loading