-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Generalize handling of chunked array types #7019
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
15fc2b8
5e05b71
cff89ee
039973b
60d44bc
5ddba7e
37d0d66
67d7efc
cdcb3fb
73e4563
5995685
46223ae
320b09f
3facfd6
c616a85
ecabaa4
e53a588
fe21edd
3f6aedc
ea8f482
c49ab8e
26d1868
e9b4a33
3a43b00
c7c9589
fc051e3
56e9d0f
3b16cca
7ac3323
e732b87
68930eb
8442e1f
3717431
78d8969
7ac6531
e423bfb
149db9d
280c563
42186e7
103a755
f2bce3d
f09947d
e760f10
b1a4e35
5320f4d
45ed5d2
eec096b
c64ff5f
8a37905
989d6bb
8c7fe79
0222b55
9d6cf6b
afc6abc
2c0cc26
1a255cf
c398d98
598bf12
7bef188
7af5395
287e96c
8bbc141
6cfe9fa
4ca044b
8ed5ed6
2a4c38b
45a4c98
dee5b33
176d7fa
9e58d6d
a6219a0
9f21994
eb7bb0b
57733de
4c58b28
d07830c
c1bf040
3ae21d9
7ef0129
ec22963
4c8d773
f4de577
1cd7283
5386711
6b173de
c431a5f
7ab9047
72f8f5f
34c6aea
fb9466d
ffd2e21
36b2be0
77a1e4e
a6222f9
61fe236
447d1f1
a7a6a6e
aa64996
fec1a13
db11947
2c18df6
2e49154
748e90d
70804f4
dae2fe4
4ef500c
ba66419
7fd4617
69d77c9
8337857
9850a46
488fd5b
00bcf6c
ff1f8ab
844726d
3d56a3d
1952c55
3ba8d42
b15411c
53d6094
7dc6581
fcaf499
64df7e8
747ada5
9b33ab7
c8d5aa1
6a7a043
20f92c6
796a577
29d0c92
031017b
a8c3413
14a1226
5dd9d35
5a46294
d6b56c6
471d22a
8378f43
f8b1020
907c15b
11676ab
76ce09e
127c184
604bbf3
7604594
97537dd
026fe17
355555f
6eac87a
f4224f6
4ec8370
316c63d
9cd9078
5dc2016
995eb5a
c8b9ee7
5c95758
a61a30a
ea35a32
e68b327
956c055
42ad08e
cf0c28e
4f2ec27
5f2f569
929db33
876f81c
ad0a706
115b52b
8741eec
a1ba4f0
bdf7600
06bb508
ba00558
6a99454
95d81e8
e5e3096
a221436
fe2e9b3
7bcaece
fecf7ed
15dc44b
660ef41
e6d6f1f
c7fbe79
d728427
6b9fa3f
51db5f2
c69c563
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,199 @@ | ||||||
""" | ||||||
The code in this module is an experiment in going from N=1 to N=2 parallel computing frameworks in xarray. | ||||||
It could later be used as the basis for a public interface allowing any N frameworks to interoperate with xarray, | ||||||
but for now it is just a private experiment. | ||||||
""" | ||||||
|
||||||
from abc import ABC, abstractmethod | ||||||
from typing import Any | ||||||
|
||||||
import numpy as np | ||||||
|
||||||
from . import indexing, utils | ||||||
from .pycompat import DuckArrayModule, is_duck_dask_array | ||||||
|
||||||
CHUNK_MANAGERS = {} | ||||||
|
||||||
|
||||||
def _get_chunk_manager(name: str) -> "ChunkManager": | ||||||
if name in CHUNK_MANAGERS: | ||||||
chunkmanager = CHUNK_MANAGERS[name] | ||||||
return chunkmanager() | ||||||
else: | ||||||
raise ImportError(f"ChunkManager {name} has not been defined") | ||||||
|
||||||
|
||||||
class ChunkManager(ABC): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just tried to avoid the word "backend" because we use it for the file I/O, our only other entrypoint... |
||||||
""" | ||||||
Adapter between a particular parallel computing framework and xarray. | ||||||
|
||||||
Attributes | ||||||
---------- | ||||||
array_type | ||||||
Type of the array class this parallel computing framework provides. | ||||||
|
||||||
Parallel frameworks need to provide an array class that supports the array API standard. | ||||||
Used for type checking. | ||||||
""" | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
@staticmethod | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
@abstractmethod | ||||||
def chunks(arr): | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
... | ||||||
|
||||||
@staticmethod | ||||||
@abstractmethod | ||||||
def chunk(data: np.ndarray, chunks, **kwargs): | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
... | ||||||
|
||||||
@staticmethod | ||||||
@abstractmethod | ||||||
def rechunk(data: Any, chunks, **kwargs): | ||||||
... | ||||||
|
||||||
@staticmethod | ||||||
@abstractmethod | ||||||
def compute(arr, **kwargs) -> np.ndarray: | ||||||
... | ||||||
|
||||||
@staticmethod | ||||||
@abstractmethod | ||||||
def apply_ufunc(): | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
""" | ||||||
Called inside xarray.apply_ufunc, so must be supplied for vast majority of xarray computations to be supported. | ||||||
""" | ||||||
... | ||||||
|
||||||
@staticmethod | ||||||
def map_blocks(): | ||||||
"""Called by xarray.map_blocks.""" | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
raise NotImplementedError() | ||||||
|
||||||
@staticmethod | ||||||
def blockwise(): | ||||||
"""Called by some niche functions in xarray.""" | ||||||
raise NotImplementedError() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if we only had to implement one of this operations, e.g., perhaps I would be curious what @tomwhite thinks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be possible to implement all of these with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there's value in a minimal interface, and I doubt there's any real advatantage to using these other formulations -- other than the accident that that is how we wrote Xarray's existing implementation on top of Dask. That said, for now it would be a great start just to get everything in one file, so the structure of our dependence on Dask becomes clear. We can save this clean-up for later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tomwhite presumably adding
Definitely. |
||||||
|
||||||
|
||||||
class DaskManager(ChunkManager): | ||||||
def __init__(self): | ||||||
from dask.array import Array | ||||||
|
||||||
self.array_type = Array | ||||||
|
||||||
@staticmethod | ||||||
def chunks(arr: "dask.array.Array"): | ||||||
return arr.chunks | ||||||
|
||||||
@staticmethod | ||||||
def chunk(data: Any, chunks, **kwargs): | ||||||
import dask.array as da | ||||||
|
||||||
# dask-specific kwargs | ||||||
name = kwargs.pop("name", None) | ||||||
lock = kwargs.pop("lock", False) | ||||||
inline_array = kwargs.pop("inline_array", False) | ||||||
|
||||||
if is_duck_dask_array(data): | ||||||
data = data.rechunk(chunks) | ||||||
elif isinstance(data, DuckArrayModule("cubed").type): | ||||||
raise TypeError("Trying to rechunk a cubed array using dask") | ||||||
else: | ||||||
if isinstance(data, indexing.ExplicitlyIndexed): | ||||||
# Unambiguously handle array storage backends (like NetCDF4 and h5py) | ||||||
# that can't handle general array indexing. For example, in netCDF4 you | ||||||
# can do "outer" indexing along two dimensions independent, which works | ||||||
# differently from how NumPy handles it. | ||||||
# da.from_array works by using lazy indexing with a tuple of slices. | ||||||
# Using OuterIndexer is a pragmatic choice: dask does not yet handle | ||||||
# different indexing types in an explicit way: | ||||||
# https://github.com/dask/dask/issues/2883 | ||||||
data = indexing.ImplicitToExplicitIndexingAdapter( | ||||||
data, indexing.OuterIndexer | ||||||
) | ||||||
|
||||||
# All of our lazily loaded backend array classes should use NumPy | ||||||
# array operations. | ||||||
dask_kwargs = {"meta": np.ndarray} | ||||||
else: | ||||||
dask_kwargs = {} | ||||||
|
||||||
if utils.is_dict_like(chunks): | ||||||
chunks = tuple(chunks.get(n, s) for n, s in enumerate(data.shape)) | ||||||
|
||||||
data = da.from_array( | ||||||
data, | ||||||
chunks, | ||||||
name=name, | ||||||
lock=lock, | ||||||
inline_array=inline_array, | ||||||
**dask_kwargs, | ||||||
) | ||||||
return data | ||||||
|
||||||
@staticmethod | ||||||
def rechunk(chunks, **kwargs): | ||||||
... | ||||||
|
||||||
@staticmethod | ||||||
def compute(arr, **kwargs): | ||||||
return arr.compute(**kwargs) | ||||||
|
||||||
@staticmethod | ||||||
def apply_ufunc(): | ||||||
from dask.array.gufunc import apply_gufunc | ||||||
|
||||||
... | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
@staticmethod | ||||||
def map_blocks(): | ||||||
from dask.array import map_blocks | ||||||
|
||||||
... | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
@staticmethod | ||||||
def blockwise(): | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interestingly this is only used in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I didn't even realise we called |
||||||
from dask.array import blockwise | ||||||
|
||||||
... | ||||||
TomNicholas marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
|
||||||
try: | ||||||
import dask | ||||||
|
||||||
CHUNK_MANAGERS["dask"] = DaskManager | ||||||
except ImportError: | ||||||
pass | ||||||
|
||||||
|
||||||
class CubedManager(ChunkManager): | ||||||
def __init__(self): | ||||||
from cubed import Array | ||||||
|
||||||
self.array_type = Array | ||||||
|
||||||
def chunk(self, data: np.ndarray, chunks, **kwargs): | ||||||
import cubed # type: ignore | ||||||
|
||||||
spec = kwargs.pop("spec", None) | ||||||
|
||||||
if isinstance(data, cubed.Array): | ||||||
data = data.rechunk(chunks) | ||||||
elif is_duck_dask_array(data): | ||||||
raise TypeError("Trying to rechunk a dask array using cubed") | ||||||
else: | ||||||
data = cubed.from_array( | ||||||
data, | ||||||
chunks, | ||||||
spec=spec, | ||||||
) | ||||||
|
||||||
return data | ||||||
|
||||||
|
||||||
try: | ||||||
import cubed | ||||||
|
||||||
CHUNK_MANAGERS["cubed"] = CubedManager | ||||||
except ImportError: | ||||||
pass |
Uh oh!
There was an error while loading. Please reload this page.