From 8a1642cc8f38b75ce98efc9c828ce8fe7e8b484b Mon Sep 17 00:00:00 2001 From: Kenneth Li Date: Fri, 11 Jul 2025 09:43:14 -0400 Subject: [PATCH 1/3] obstore implementations for .getsize and .getsize_prefix --- changes/3227.feature.rst | 1 + src/zarr/storage/_obstore.py | 47 +++++++++++++++++++-------------- tests/test_store/test_object.py | 15 +++++++++++ 3 files changed, 43 insertions(+), 20 deletions(-) create mode 100644 changes/3227.feature.rst diff --git a/changes/3227.feature.rst b/changes/3227.feature.rst new file mode 100644 index 0000000000..ddbedd0a30 --- /dev/null +++ b/changes/3227.feature.rst @@ -0,0 +1 @@ +Add lightweight implementations of .getsize() and .getsize_prefix() for ObjectStore. diff --git a/src/zarr/storage/_obstore.py b/src/zarr/storage/_obstore.py index 1b822a919e..85c60254e4 100644 --- a/src/zarr/storage/_obstore.py +++ b/src/zarr/storage/_obstore.py @@ -16,7 +16,7 @@ from zarr.core.config import config if TYPE_CHECKING: - from collections.abc import AsyncGenerator, Coroutine, Iterable + from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence from typing import Any from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange @@ -212,41 +212,48 @@ def supports_listing(self) -> bool: # docstring inherited return True - def list(self) -> AsyncGenerator[str, None]: - # docstring inherited + async def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, None]: import obstore as obs - objects: ListStream[list[ObjectMeta]] = obs.list(self.store) - return _transform_list(objects) + objects: ListStream[Sequence[ObjectMeta]] = obs.list(self.store, prefix=prefix) + async for batch in objects: + for item in batch: + yield item - def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: + # return (obj async for obj in _transform_list(objects)) + + def list(self) -> AsyncGenerator[str, None]: # docstring inherited - import obstore as obs + return (obj["path"] async for obj in self._list()) - objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix) - return _transform_list(objects) + def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]: + # docstring inherited + return (obj["path"] async for obj in self._list(prefix)) def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited import obstore as obs - coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix) + coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]] = ( + obs.list_with_delimiter_async(self.store, prefix=prefix) + ) return _transform_list_dir(coroutine, prefix) + async def getsize(self, key: str) -> int: + # docstring inherited + import obstore as obs -async def _transform_list( - list_stream: ListStream[list[ObjectMeta]], -) -> AsyncGenerator[str, None]: - """ - Transform the result of list into an async generator of paths. - """ - async for batch in list_stream: - for item in batch: - yield item["path"] + resp = await obs.head_async(self.store, key) + return resp["size"] + + async def getsize_prefix(self, prefix: str) -> int: + # docstring inherited + sizes = [obj["size"] async for obj in self._list(prefix=prefix)] + return sum(sizes) async def _transform_list_dir( - list_result_coroutine: Coroutine[Any, Any, ListResult[list[ObjectMeta]]], prefix: str + list_result_coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]], prefix: str ) -> AsyncGenerator[str, None]: """ Transform the result of list_with_delimiter into an async generator of paths. diff --git a/tests/test_store/test_object.py b/tests/test_store/test_object.py index 4d9e8fcc1f..d8b89e56b7 100644 --- a/tests/test_store/test_object.py +++ b/tests/test_store/test_object.py @@ -75,6 +75,21 @@ def test_store_init_raises(self) -> None: with pytest.raises(TypeError): ObjectStore("path/to/store") + async def test_store_getsize(self, store: ObjectStore) -> None: + buf = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04") + await self.set(store, "key", buf) + size = await store.getsize("key") + assert size == len(buf) + + async def test_store_getsize_prefix(self, store: ObjectStore) -> None: + buf = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04") + await self.set(store, "c/key1/0", buf) + await self.set(store, "c/key2/0", buf) + size = await store.getsize_prefix("c/key1") + assert size == len(buf) + total_size = await store.getsize_prefix("c") + assert total_size == len(buf) * 2 + @pytest.mark.slow_hypothesis def test_zarr_hierarchy(): From 8eaeaf03b55ad3eb4b4e69747d189a400dd4cae6 Mon Sep 17 00:00:00 2001 From: Kenneth Li Date: Wed, 16 Jul 2025 23:24:23 -0400 Subject: [PATCH 2/3] rm comment --- src/zarr/storage/_obstore.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/zarr/storage/_obstore.py b/src/zarr/storage/_obstore.py index 85c60254e4..ad98629ed9 100644 --- a/src/zarr/storage/_obstore.py +++ b/src/zarr/storage/_obstore.py @@ -220,8 +220,6 @@ async def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, N for item in batch: yield item - # return (obj async for obj in _transform_list(objects)) - def list(self) -> AsyncGenerator[str, None]: # docstring inherited return (obj["path"] async for obj in self._list()) From 85a4ac580b082050e116441c70e041b295bab31e Mon Sep 17 00:00:00 2001 From: Kenneth Li Date: Wed, 16 Jul 2025 23:42:29 -0400 Subject: [PATCH 3/3] rm typehint --- src/zarr/storage/_obstore.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/zarr/storage/_obstore.py b/src/zarr/storage/_obstore.py index ad98629ed9..e1469a991e 100644 --- a/src/zarr/storage/_obstore.py +++ b/src/zarr/storage/_obstore.py @@ -232,9 +232,7 @@ def list_dir(self, prefix: str) -> AsyncGenerator[str, None]: # docstring inherited import obstore as obs - coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]] = ( - obs.list_with_delimiter_async(self.store, prefix=prefix) - ) + coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix) return _transform_list_dir(coroutine, prefix) async def getsize(self, key: str) -> int: