Generalize handling of chunked array types #7019
Changes from 158 commits
@@ -39,6 +39,7 @@ module = [
     "cf_units.*",
     "cfgrib.*",
     "cftime.*",
+    "cubed.*",
     "cupy.*",
     "fsspec.*",
     "h5netcdf.*",
@@ -6,7 +6,16 @@
 from glob import glob
 from io import BytesIO
 from numbers import Number
-from typing import TYPE_CHECKING, Any, Callable, Final, Literal, Union, cast, overload
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Final,
+    Literal,
+    Union,
+    cast,
+    overload,
+)

 import numpy as np

@@ -20,9 +29,11 @@
     _nested_combine,
     combine_by_coords,
 )
+from xarray.core.daskmanager import DaskManager
 from xarray.core.dataarray import DataArray
 from xarray.core.dataset import Dataset, _get_chunk, _maybe_chunk
 from xarray.core.indexes import Index
+from xarray.core.parallelcompat import guess_chunkmanager
 from xarray.core.utils import is_remote_uri

 if TYPE_CHECKING:
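The two new imports are the heart of this change: `guess_chunkmanager` turns a user-supplied name (or `None`) into a registered chunk manager, and `DaskManager` is the dask-backed implementation of that interface. A minimal sketch of the expected resolution behaviour on this branch, assuming dask is installed (the registry itself lives in `xarray.core.parallelcompat` and is not part of this hunk):

```python
from xarray.core.daskmanager import DaskManager
from xarray.core.parallelcompat import guess_chunkmanager

# With no name given, the default chunk manager is dask when it is installed.
default_manager = guess_chunkmanager(None)
assert isinstance(default_manager, DaskManager)

# An explicit name selects the matching registered manager; "cubed" would
# resolve the same way if a cubed chunk manager has been registered.
assert isinstance(guess_chunkmanager("dask"), DaskManager)
```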
@@ -38,6 +49,7 @@
     CompatOptions,
     JoinOptions,
     NestedSequence,
+    T_Chunks,
 )

 T_NetcdfEngine = Literal["netcdf4", "scipy", "h5netcdf"]
@@ -48,7 +60,6 @@
     str,  # no nice typing support for custom backends
     None,
 ]
-T_Chunks = Union[int, dict[Any, Any], Literal["auto"], None]
 T_NetcdfTypes = Literal[
     "NETCDF4", "NETCDF4_CLASSIC", "NETCDF3_64BIT", "NETCDF3_CLASSIC"
 ]
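The `T_Chunks` alias deleted here is not gone; it is now imported under `TYPE_CHECKING` (previous hunk) alongside the other aliases from `xarray.core.types`, so the accepted `chunks` values are unchanged. For illustration only (the file name is hypothetical):

```python
import xarray as xr

# Everything T_Chunks = Union[int, dict[Any, Any], Literal["auto"], None] allows:
xr.open_dataset("data.nc", chunks=100)           # same chunk length on every dimension
xr.open_dataset("data.nc", chunks={"time": 10})  # per-dimension chunk sizes
xr.open_dataset("data.nc", chunks="auto")        # let the chunk manager pick sizes
xr.open_dataset("data.nc", chunks=None)          # no chunking; load as plain numpy
```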
@@ -297,17 +308,27 @@ def _chunk_ds(
     chunks,
     overwrite_encoded_chunks,
     inline_array,
+    chunked_array_type,
+    from_array_kwargs,
     **extra_tokens,
 ):
-    from dask.base import tokenize
+    chunkmanager = guess_chunkmanager(chunked_array_type)
+
+    # TODO refactor to move this dask-specific logic inside the DaskManager class
+    if isinstance(chunkmanager, DaskManager):
+        from dask.base import tokenize

-    mtime = _get_mtime(filename_or_obj)
-    token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
-    name_prefix = f"open_dataset-{token}"
+        mtime = _get_mtime(filename_or_obj)
+        token = tokenize(filename_or_obj, mtime, engine, chunks, **extra_tokens)
+        name_prefix = "open_dataset-"
+    else:
+        # not used
+        token = (None,)
+        name_prefix = None

     variables = {}
     for name, var in backend_ds.variables.items():
-        var_chunks = _get_chunk(var, chunks)
+        var_chunks = _get_chunk(var, chunks, chunkmanager)
         variables[name] = _maybe_chunk(
             name,
             var,
@@ -316,6 +337,8 @@ def _chunk_ds(
             name_prefix=name_prefix,
             token=token,
             inline_array=inline_array,
+            chunked_array_type=chunkmanager,
+            from_array_kwargs=from_array_kwargs.copy(),
         )
     return backend_ds._replace(variables)

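Conceptually, the loop above now hands the actual array wrapping to whichever chunk manager was resolved instead of calling dask directly. A deliberately simplified, hypothetical rendering of that flow (the real code also threads encoding, dask tokens and the `_get_chunk`/`_maybe_chunk` helpers through):

```python
def chunk_all_variables(backend_ds, chunks, chunkmanager, from_array_kwargs):
    """Hypothetical simplification of _chunk_ds, for illustration only."""
    variables = {}
    for name, var in backend_ds.variables.items():
        # ChunkManagerEntrypoint.from_array is the generic stand-in for
        # dask.array.from_array; user-supplied kwargs are forwarded untouched.
        data = chunkmanager.from_array(var.data, chunks, **from_array_kwargs)
        variables[name] = var.copy(data=data)
    return backend_ds._replace(variables)
```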
@@ -328,6 +351,8 @@ def _dataset_from_backend_dataset(
     cache,
     overwrite_encoded_chunks,
     inline_array,
+    chunked_array_type,
+    from_array_kwargs,
     **extra_tokens,
 ):
     if not isinstance(chunks, (int, dict)) and chunks not in {None, "auto"}:
@@ -346,6 +371,8 @@ def _dataset_from_backend_dataset(
            chunks,
            overwrite_encoded_chunks,
            inline_array,
+           chunked_array_type,
+           from_array_kwargs,
            **extra_tokens,
        )

@@ -373,6 +400,8 @@ def open_dataset(
     decode_coords: Literal["coordinates", "all"] | bool | None = None,
     drop_variables: str | Iterable[str] | None = None,
     inline_array: bool = False,
+    chunked_array_type: str | None = None,
+    from_array_kwargs: dict[str, Any] | None = None,
     backend_kwargs: dict[str, Any] | None = None,
     **kwargs,
 ) -> Dataset:
@@ -465,6 +494,15 @@ def open_dataset(
         itself, and each chunk refers to that task by its key. With
         ``inline_array=True``, Dask will instead inline the array directly
         in the values of the task graph. See :py:func:`dask.array.from_array`.
+    chunked_array_type: str, optional
+        Which chunked array type to coerce this dataset's arrays to.
+        Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
+        Experimental API that should not be relied upon.
+    from_array_kwargs: dict
+        Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
+        chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
+        For example, if :py:class:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
+        to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
     backend_kwargs: dict
         Additional keyword arguments passed on to the engine open function,
         equivalent to `**kwargs`.
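Taken together, the new keywords are used roughly like this. A sketch only: the file name is hypothetical, `"cubed"` assumes the cubed package has registered a chunk manager under that name, and `cubed_spec` stands in for whatever configuration object that manager accepts:

```python
import xarray as xr

# Default behaviour is unchanged: chunk with dask.
ds = xr.open_dataset("data.nc", chunks={"time": 10})

# Opt into a different chunked array type and forward manager-specific
# options to its from_array method via from_array_kwargs.
ds = xr.open_dataset(
    "data.nc",
    chunks={"time": 10},
    chunked_array_type="cubed",
    from_array_kwargs={"spec": cubed_spec},  # illustrative option, built elsewhere
)
```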
@@ -508,6 +546,9 @@
     if engine is None:
         engine = plugins.guess_engine(filename_or_obj)

+    if from_array_kwargs is None:
+        from_array_kwargs = {}
+
     backend = plugins.get_backend(engine)

     decoders = _resolve_decoders_kwargs(
@@ -536,6 +577,8 @@
         cache,
         overwrite_encoded_chunks,
         inline_array,
+        chunked_array_type,
+        from_array_kwargs,
         drop_variables=drop_variables,
         **decoders,
         **kwargs,
@@ -546,8 +589,8 @@
 def open_dataarray(
     filename_or_obj: str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore,
     *,
-    engine: T_Engine = None,
-    chunks: T_Chunks = None,
+    engine: T_Engine | None = None,
+    chunks: T_Chunks | None = None,
     cache: bool | None = None,
     decode_cf: bool | None = None,
     mask_and_scale: bool | None = None,
@@ -558,6 +601,8 @@
     decode_coords: Literal["coordinates", "all"] | bool | None = None,
     drop_variables: str | Iterable[str] | None = None,
     inline_array: bool = False,
+    chunked_array_type: str | None = None,
+    from_array_kwargs: dict[str, Any] | None = None,
     backend_kwargs: dict[str, Any] | None = None,
     **kwargs,
 ) -> DataArray:
@@ -652,6 +697,15 @@
         itself, and each chunk refers to that task by its key. With
         ``inline_array=True``, Dask will instead inline the array directly
         in the values of the task graph. See :py:func:`dask.array.from_array`.
+    chunked_array_type: str, optional
+        Which chunked array type to coerce the underlying data array to.
+        Defaults to 'dask' if installed, else whatever is registered via the `ChunkManagerEntrypoint` system.
+        Experimental API that should not be relied upon.
+    from_array_kwargs: dict
+        Additional keyword arguments passed on to the `ChunkManagerEntrypoint.from_array` method used to create
+        chunked arrays, via whichever chunk manager is specified through the `chunked_array_type` kwarg.
+        For example, if :py:class:`dask.array.Array` objects are used for chunking, additional kwargs will be passed
+        to :py:func:`dask.array.from_array`. Experimental API that should not be relied upon.
     backend_kwargs: dict
         Additional keyword arguments passed on to the engine open function,
         equivalent to `**kwargs`.
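The `DataArray` path behaves the same way; a small sketch with a hypothetical file name, relying on the default (dask) manager:

```python
import xarray as xr

# Leaving chunked_array_type unset keeps the existing dask-backed behaviour.
da = xr.open_dataarray("single_variable.nc", chunks={"time": 10})
print(da.chunks)  # chunk sizes chosen from the mapping above
```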
@@ -695,6 +749,8 @@
         cache=cache,
         drop_variables=drop_variables,
         inline_array=inline_array,
+        chunked_array_type=chunked_array_type,
+        from_array_kwargs=from_array_kwargs,
         backend_kwargs=backend_kwargs,
         use_cftime=use_cftime,
         decode_timedelta=decode_timedelta,
@@ -726,7 +782,7 @@

 def open_mfdataset(
     paths: str | NestedSequence[str | os.PathLike],
-    chunks: T_Chunks = None,
+    chunks: T_Chunks | None = None,
     concat_dim: str
     | DataArray
     | Index
@@ -736,7 +792,7 @@
     | None = None,
     compat: CompatOptions = "no_conflicts",
     preprocess: Callable[[Dataset], Dataset] | None = None,
-    engine: T_Engine = None,
+    engine: T_Engine | None = None,
     data_vars: Literal["all", "minimal", "different"] | list[str] = "all",
     coords="different",
     combine: Literal["by_coords", "nested"] = "by_coords",

Inline review comments on the `engine` line above (truncated in the capture): "Add …" / "I actually don't think we need to - This should actually just work, except in the case of …"
@@ -1490,6 +1546,7 @@
     safe_chunks: bool = True,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
+    chunkmanager_store_kwargs: dict[str, Any] | None = None,
 ) -> backends.ZarrStore:
     ...

@@ -1512,6 +1569,7 @@
     safe_chunks: bool = True,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
+    chunkmanager_store_kwargs: dict[str, Any] | None = None,
 ) -> Delayed:
     ...

@@ -1531,6 +1589,7 @@
     safe_chunks: bool = True,
     storage_options: dict[str, str] | None = None,
     zarr_version: int | None = None,
+    chunkmanager_store_kwargs: dict[str, Any] | None = None,
 ) -> backends.ZarrStore | Delayed:
     """This function creates an appropriate datastore for writing a dataset to
     a zarr ztore

@@ -1652,7 +1711,9 @@
     writer = ArrayWriter()
     # TODO: figure out how to properly handle unlimited_dims
     dump_to_store(dataset, zstore, writer, encoding=encoding)
-    writes = writer.sync(compute=compute)
+    writes = writer.sync(
+        compute=compute, chunkmanager_store_kwargs=chunkmanager_store_kwargs
+    )

     if compute:
         _finalize_store(writes, zstore)
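The new `chunkmanager_store_kwargs` travels through `ArrayWriter.sync` to the store step of whichever chunk manager holds the data; for the dask manager those kwargs end up in `dask.array.store`. A hedged sketch (paths hypothetical; the option is shown at its default value purely to illustrate the plumbing, and it assumes `Dataset.to_zarr` forwards the keyword to the `to_zarr` function changed here, as elsewhere in this PR):

```python
import xarray as xr

ds = xr.open_dataset("data.nc", chunks={"time": 10})

# return_stored is a dask.array.store keyword; other chunk managers receive
# whatever options their own store implementation understands.
ds.to_zarr(
    "out.zarr",
    mode="w",
    chunkmanager_store_kwargs={"return_stored": False},
)
```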