Skip to content

Commit 1e1eceb

Browse files
authored
Use extra buffer copies for cloud stores like S3 (#703)
1 parent e9c81df commit 1e1eceb

File tree

4 files changed

+52
-10
lines changed

4 files changed

+52
-10
lines changed

cubed/core/ops.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from cubed.core.plan import Plan, new_temp_path
2424
from cubed.primitive.blockwise import blockwise as primitive_blockwise
2525
from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise
26+
from cubed.primitive.memory import get_buffer_copies
2627
from cubed.primitive.rechunk import rechunk as primitive_rechunk
2728
from cubed.spec import spec_from_config
2829
from cubed.storage.backend import open_backend_array
@@ -293,6 +294,7 @@ def blockwise(
293294

294295
name = gensym()
295296
spec = check_array_specs(arrays)
297+
buffer_copies = get_buffer_copies(spec)
296298
if target_store is None:
297299
target_store = new_temp_path(name=name, spec=spec)
298300
op = primitive_blockwise(
@@ -312,6 +314,7 @@ def blockwise(
312314
new_axes=new_axes,
313315
in_names=in_names,
314316
out_name=name,
317+
buffer_copies=buffer_copies,
315318
extra_func_kwargs=extra_func_kwargs,
316319
fusable_with_predecessors=fusable_with_predecessors,
317320
fusable_with_successors=fusable_with_successors,
@@ -442,6 +445,7 @@ def _general_blockwise(
442445
op_name = kwargs.pop("op_name", "blockwise")
443446

444447
spec = check_array_specs(arrays)
448+
buffer_copies = get_buffer_copies(spec)
445449

446450
if isinstance(target_stores, list): # multiple outputs
447451
name = [gensym() for _ in range(len(target_stores))]
@@ -461,6 +465,7 @@ def _general_blockwise(
461465
allowed_mem=spec.allowed_mem,
462466
reserved_mem=spec.reserved_mem,
463467
extra_projected_mem=extra_projected_mem,
468+
buffer_copies=buffer_copies,
464469
target_stores=target_stores,
465470
target_paths=target_paths,
466471
storage_options=spec.storage_options,
@@ -1157,10 +1162,13 @@ def _rechunk_plan(x, chunks, *, min_mem=None):
11571162
source_chunks = to_chunksize(normalize_chunks(x.chunks, x.shape, dtype=x.dtype))
11581163

11591164
# rechunker doesn't take account of uncompressed and compressed copies of the
1160-
# input and output array chunk/selection, so adjust appropriately
1161-
# note the factor is 5 (not 4) since there is an extra (unnecessary) copy
1162-
# made when writing out to Zarr
1163-
rechunker_max_mem = (spec.allowed_mem - spec.reserved_mem) // 5
1165+
# input and output array chunk/selection, so adjust appropriately:
1166+
# 1 input array plus copies to read that array from storage,
1167+
# 1 array for processing,
1168+
# 1 output array plus copies to write that array to storage
1169+
buffer_copies = get_buffer_copies(spec)
1170+
total_copies = 1 + buffer_copies.read + 1 + 1 + buffer_copies.write
1171+
rechunker_max_mem = (spec.allowed_mem - spec.reserved_mem) // total_copies
11641172
if min_mem is None:
11651173
min_mem = min(rechunker_max_mem // 20, x.nbytes)
11661174
stages = multistage_rechunking_plan(

cubed/primitive/blockwise.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ def blockwise(
162162
in_names: Optional[List[str]] = None,
163163
out_name: Optional[str] = None,
164164
extra_projected_mem: int = 0,
165+
buffer_copies: Optional[BufferCopies] = None,
165166
extra_func_kwargs: Optional[Dict[str, Any]] = None,
166167
fusable_with_predecessors: bool = True,
167168
fusable_with_successors: bool = True,
@@ -199,6 +200,8 @@ def blockwise(
199200
extra_projected_mem : int
200201
Extra memory projected to be needed (in bytes) in addition to the memory used reading
201202
the input arrays and writing the output.
203+
buffer_copies: BufferCopies
204+
The number of buffer copies incurred for array storage operations.
202205
extra_func_kwargs : dict
203206
Extra keyword arguments to pass to function that can't be passed as regular keyword arguments
204207
since they clash with other blockwise arguments (such as dtype).
@@ -250,6 +253,7 @@ def blockwise(
250253
chunkss=[chunks],
251254
in_names=in_names,
252255
extra_projected_mem=extra_projected_mem,
256+
buffer_copies=buffer_copies,
253257
extra_func_kwargs=extra_func_kwargs,
254258
fusable_with_predecessors=fusable_with_predecessors,
255259
fusable_with_successors=fusable_with_successors,
@@ -274,6 +278,7 @@ def general_blockwise(
274278
chunkss: List[T_Chunks],
275279
in_names: Optional[List[str]] = None,
276280
extra_projected_mem: int = 0,
281+
buffer_copies: Optional[BufferCopies] = None,
277282
extra_func_kwargs: Optional[Dict[str, Any]] = None,
278283
fusable_with_predecessors: bool = True,
279284
fusable_with_successors: bool = True,
@@ -312,6 +317,8 @@ def general_blockwise(
312317
extra_projected_mem : int
313318
Extra memory projected to be needed (in bytes) in addition to the memory used reading
314319
the input arrays and writing the output.
320+
buffer_copies: BufferCopies
321+
The number of buffer copies incurred for array storage operations.
315322
extra_func_kwargs : dict
316323
Extra keyword arguments to pass to function that can't be passed as regular keyword arguments
317324
since they clash with other blockwise arguments (such as dtype).
@@ -388,8 +395,7 @@ def general_blockwise(
388395
return_writes_stores,
389396
)
390397

391-
# assumes a single buffer copy for reading and writing, compare https://github.com/tomwhite/memray-array
392-
buffer_copies = BufferCopies(read=1, write=1)
398+
buffer_copies = buffer_copies or BufferCopies(read=1, write=1)
393399
projected_mem = calculate_projected_mem(
394400
reserved_mem=reserved_mem,
395401
inputs=[array_memory(array.dtype, array.chunks) for array in arrays],

cubed/primitive/memory.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
from dataclasses import dataclass
2-
from typing import List
2+
from typing import List, Optional
3+
4+
from cubed.spec import Spec
5+
from cubed.utils import is_cloud_storage_path
36

47

58
@dataclass
@@ -13,6 +16,25 @@ class BufferCopies:
1316
"""The number of copies made when writing an array to storage."""
1417

1518

19+
def get_buffer_copies(spec: Optional[Spec]):
20+
"""Return the number of buffer copies to use, based on the spec.
21+
22+
Using cloud storage will result in more buffer copies being accounted for.
23+
"""
24+
25+
# See https://github.com/tomwhite/memray-array
26+
# More factors (e.g. compression) could be taken into account in the future.
27+
28+
if (
29+
spec is not None
30+
and spec.work_dir is not None
31+
and is_cloud_storage_path(spec.work_dir)
32+
):
33+
return BufferCopies(read=2, write=2)
34+
35+
return BufferCopies(read=1, write=1)
36+
37+
1638
def calculate_projected_mem(
1739
reserved_mem: int,
1840
inputs: List[int],

cubed/tests/test_rechunk.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,20 @@ def test_rechunk_era5(
5454
assert max_output_blocks == expected_max_output_blocks
5555

5656

57-
def test_rechunk_era5_chunk_sizes():
57+
@pytest.mark.parametrize(
58+
"spec",
59+
[
60+
cubed.Spec(allowed_mem="2.5GB"),
61+
# cloud stores use extra buffer copies, so need more memory for same rechunk plan
62+
cubed.Spec("s3://cubed-unittest/rechunk-era5", allowed_mem="3.5GB"),
63+
],
64+
)
65+
def test_rechunk_era5_chunk_sizes(spec):
5866
# from https://github.com/pangeo-data/rechunker/pull/89
5967
shape = (350640, 721, 1440)
6068
source_chunks = (31, 721, 1440)
6169
target_chunks = (350640, 10, 10)
6270

63-
spec = cubed.Spec(allowed_mem="2.5GB")
64-
6571
a = xp.empty(shape, dtype=xp.float32, chunks=source_chunks, spec=spec)
6672

6773
from cubed.core.ops import _rechunk_plan

0 commit comments

Comments
 (0)