Skip to content

Commit 464d041

Browse files
committed
obstore implementations for .getsize and .getsize_prefix
1 parent bbdefac commit 464d041

File tree

2 files changed

+47
-16
lines changed

2 files changed

+47
-16
lines changed

src/zarr/storage/_obstore.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import contextlib
55
import pickle
66
from collections import defaultdict
7-
from typing import TYPE_CHECKING, TypedDict
7+
from typing import TYPE_CHECKING, TypedDict, TypeVar
88

99
from zarr.abc.store import (
1010
ByteRequest,
@@ -16,7 +16,7 @@
1616
from zarr.core.config import config
1717

1818
if TYPE_CHECKING:
19-
from collections.abc import AsyncGenerator, Coroutine, Iterable
19+
from collections.abc import AsyncGenerator, Coroutine, Iterable, Sequence
2020
from typing import Any
2121

2222
from obstore import ListResult, ListStream, ObjectMeta, OffsetRange, SuffixRange
@@ -27,6 +27,8 @@
2727

2828
__all__ = ["ObjectStore"]
2929

30+
T = TypeVar("T")
31+
3032
_ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = (
3133
FileNotFoundError,
3234
IsADirectoryError,
@@ -212,41 +214,55 @@ def supports_listing(self) -> bool:
212214
# docstring inherited
213215
return True
214216

215-
def list(self) -> AsyncGenerator[str, None]:
216-
# docstring inherited
217+
def _list(self, prefix: str | None = None) -> AsyncGenerator[ObjectMeta, None]:
217218
import obstore as obs
218219

219-
objects: ListStream[list[ObjectMeta]] = obs.list(self.store)
220-
return _transform_list(objects)
220+
objects: ListStream[Sequence[ObjectMeta]] = obs.list(self.store, prefix=prefix)
221+
return (obj async for obj in _transform_list(objects))
221222

222-
def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
223+
def list(self) -> AsyncGenerator[str, None]:
223224
# docstring inherited
224-
import obstore as obs
225+
return (obj["path"] async for obj in self._list())
225226

226-
objects: ListStream[list[ObjectMeta]] = obs.list(self.store, prefix=prefix)
227-
return _transform_list(objects)
227+
def list_prefix(self, prefix: str) -> AsyncGenerator[str, None]:
228+
# docstring inherited
229+
return (obj["path"] async for obj in self._list(prefix))
228230

229231
def list_dir(self, prefix: str) -> AsyncGenerator[str, None]:
230232
# docstring inherited
231233
import obstore as obs
232234

233-
coroutine = obs.list_with_delimiter_async(self.store, prefix=prefix)
235+
coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]] = (
236+
obs.list_with_delimiter_async(self.store, prefix=prefix)
237+
)
234238
return _transform_list_dir(coroutine, prefix)
235239

240+
async def getsize(self, key: str) -> int:
241+
# docstring inherited
242+
import obstore as obs
243+
244+
resp = await obs.head_async(self.store, key)
245+
return resp["size"]
246+
247+
async def getsize_prefix(self, prefix: str) -> int:
248+
# docstring inherited
249+
sizes = [obj["size"] async for obj in self._list(prefix=prefix)]
250+
return sum(sizes)
251+
236252

237253
async def _transform_list(
238-
list_stream: ListStream[list[ObjectMeta]],
239-
) -> AsyncGenerator[str, None]:
254+
list_stream: ListStream[Sequence[ObjectMeta]],
255+
) -> AsyncGenerator[ObjectMeta, None]:
240256
"""
241-
Transform the result of list into an async generator of paths.
257+
Transform the result of list into an async generator of ObjectMeta dicts.
242258
"""
243259
async for batch in list_stream:
244260
for item in batch:
245-
yield item["path"]
261+
yield item
246262

247263

248264
async def _transform_list_dir(
249-
list_result_coroutine: Coroutine[Any, Any, ListResult[list[ObjectMeta]]], prefix: str
265+
list_result_coroutine: Coroutine[Any, Any, ListResult[Sequence[ObjectMeta]]], prefix: str
250266
) -> AsyncGenerator[str, None]:
251267
"""
252268
Transform the result of list_with_delimiter into an async generator of paths.

tests/test_store/test_object.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,21 @@ def test_store_init_raises(self) -> None:
7575
with pytest.raises(TypeError):
7676
ObjectStore("path/to/store")
7777

78+
async def test_store_getsize(self, store: ObjectStore) -> None:
79+
buf = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04")
80+
await self.set(store, "key", buf)
81+
size = await store.getsize("key")
82+
assert size == len(buf)
83+
84+
async def test_store_getsize_prefix(self, store: ObjectStore) -> None:
85+
buf = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04")
86+
await self.set(store, "c/key1/0", buf)
87+
await self.set(store, "c/key2/0", buf)
88+
size = await store.getsize_prefix("c/key1")
89+
assert size == len(buf)
90+
total_size = await store.getsize_prefix("c")
91+
assert total_size == len(buf) * 2
92+
7893

7994
@pytest.mark.slow_hypothesis
8095
def test_zarr_hierarchy():

0 commit comments

Comments
 (0)