+from __future__ import annotations
+
 import os
+from dataclasses import dataclass

 import numpy as np
 import pandas as pd

 import xarray as xr

-from . import _skip_slow, randint, randn, requires_dask
+from . import _skip_slow, parameterized, randint, randn, requires_dask

 try:
     import dask

 os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"

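+# All available backend engines except xarray's internal "store" backend.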
+_ENGINES = tuple(xr.backends.list_engines().keys() - {"store"})
+

 class IOSingleNetCDF:
     """
@@ -28,10 +33,6 @@ class IOSingleNetCDF:
     number = 5

     def make_ds(self):
-        # TODO: Lazily skipped in CI as it is very demanding and slow.
-        # Improve times and remove errors.
-        _skip_slow()
-
         # single Dataset
         self.ds = xr.Dataset()
         self.nt = 1000
@@ -95,6 +96,10 @@ def make_ds(self):

 class IOWriteSingleNetCDF3(IOSingleNetCDF):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()
+
         self.format = "NETCDF3_64BIT"
         self.make_ds()

@@ -107,6 +112,9 @@ def time_write_dataset_scipy(self):

 class IOReadSingleNetCDF4(IOSingleNetCDF):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         self.make_ds()

@@ -128,6 +136,9 @@ def time_vectorized_indexing(self):

 class IOReadSingleNetCDF3(IOReadSingleNetCDF4):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         self.make_ds()

@@ -149,6 +160,9 @@ def time_vectorized_indexing(self):

 class IOReadSingleNetCDF4Dask(IOSingleNetCDF):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         requires_dask()

@@ -189,6 +203,9 @@ def time_load_dataset_netcdf4_with_time_chunks_multiprocessing(self):

 class IOReadSingleNetCDF3Dask(IOReadSingleNetCDF4Dask):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         requires_dask()

@@ -230,10 +247,6 @@ class IOMultipleNetCDF:
     number = 5

     def make_ds(self, nfiles=10):
-        # TODO: Lazily skipped in CI as it is very demanding and slow.
-        # Improve times and remove errors.
-        _skip_slow()
-
         # multiple Dataset
         self.ds = xr.Dataset()
         self.nt = 1000
@@ -298,6 +311,10 @@ def make_ds(self, nfiles=10):

 class IOWriteMultipleNetCDF3(IOMultipleNetCDF):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()
+
         self.make_ds()
         self.format = "NETCDF3_64BIT"

@@ -314,6 +331,9 @@ def time_write_dataset_scipy(self):

 class IOReadMultipleNetCDF4(IOMultipleNetCDF):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         requires_dask()

@@ -330,6 +350,9 @@ def time_open_dataset_netcdf4(self):

 class IOReadMultipleNetCDF3(IOReadMultipleNetCDF4):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         requires_dask()

@@ -346,6 +369,9 @@ def time_open_dataset_scipy(self):

 class IOReadMultipleNetCDF4Dask(IOMultipleNetCDF):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         requires_dask()

@@ -400,6 +426,9 @@ def time_open_dataset_netcdf4_with_time_chunks_multiprocessing(self):

 class IOReadMultipleNetCDF3Dask(IOReadMultipleNetCDF4Dask):
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()

         requires_dask()

@@ -435,10 +464,6 @@ def time_open_dataset_scipy_with_time_chunks(self):
 def create_delayed_write():
     import dask.array as da

-    # TODO: Lazily skipped in CI as it is very demanding and slow.
-    # Improve times and remove errors.
-    _skip_slow()
-
     vals = da.random.random(300, chunks=(1,))
     ds = xr.Dataset({"vals": (["a"], vals)})
     return ds.to_netcdf("file.nc", engine="netcdf4", compute=False)
@@ -450,7 +475,12 @@ class IOWriteNetCDFDask:
     number = 5

     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()
+
         requires_dask()
+
         self.write = create_delayed_write()

     def time_write(self):
@@ -459,15 +489,17 @@ def time_write(self):

 class IOWriteNetCDFDaskDistributed:
     def setup(self):
+        # TODO: Lazily skipped in CI as it is very demanding and slow.
+        # Improve times and remove errors.
+        _skip_slow()
+
+        requires_dask()
+
         try:
             import distributed
         except ImportError:
             raise NotImplementedError()

-        # TODO: Lazily skipped in CI as it is very demanding and slow.
-        # Improve times and remove errors.
-        _skip_slow()
-
         self.client = distributed.Client()
         self.write = create_delayed_write()

@@ -476,3 +508,145 @@ def cleanup(self):

     def time_write(self):
         self.write.compute()
+
+
+class IOReadSingleFile(IOSingleNetCDF):
+    def setup(self, *args, **kwargs):
+        self.make_ds()
+
+        self.filepaths = {}
+        for engine in _ENGINES:
+            self.filepaths[engine] = f"test_single_file_with_{engine}.nc"
+            self.ds.to_netcdf(self.filepaths[engine], engine=engine)
+
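+    # chunks=None reads eagerly; chunks={} goes through dask with whole-variable chunks.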
+    @parameterized(["engine", "chunks"], (_ENGINES, [None, {}]))
+    def time_read_dataset(self, engine, chunks):
+        xr.open_dataset(self.filepaths[engine], engine=engine, chunks=chunks)
+
+
+class IOReadCustomEngine:
+    def setup(self, *args, **kwargs):
+        """
+        The custom backend does the bare minimum needed to count as a lazy
+        backend, but its data is still held in memory, so slow file reading
+        should not affect the results.
+        """
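+        # dask is needed for the chunks={} case exercised below.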
+        requires_dask()
+
+        @dataclass
+        class PerformanceBackendArray(xr.backends.BackendArray):
+            filename_or_obj: str | os.PathLike | None
+            shape: tuple[int, ...]
+            dtype: np.dtype
+            lock: xr.backends.locks.SerializableLock
+
+            def __getitem__(self, key: tuple):
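+                # Let xarray's indexing machinery reduce any indexer to the BASIC
+                # form supported here before it calls _raw_indexing_method.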
+                return xr.core.indexing.explicit_indexing_adapter(
+                    key,
+                    self.shape,
+                    xr.core.indexing.IndexingSupport.BASIC,
+                    self._raw_indexing_method,
+                )
+
+            def _raw_indexing_method(self, key: tuple):
+                raise NotImplementedError
+
+        @dataclass
+        class PerformanceStore(xr.backends.common.AbstractWritableDataStore):
+            manager: xr.backends.CachingFileManager
+            mode: str | None = None
+            lock: xr.backends.locks.SerializableLock | None = None
+            autoclose: bool = False
+
+            def __post_init__(self):
+                self.filename = self.manager._args[0]
+
+            @classmethod
+            def open(
+                cls,
+                filename: str | os.PathLike | None,
+                mode: str = "r",
+                lock: xr.backends.locks.SerializableLock | None = None,
+                autoclose: bool = False,
+            ):
+                if lock is None:
+                    if mode == "r":
+                        locker = xr.backends.locks.SerializableLock()
+                    else:
+                        locker = xr.backends.locks.SerializableLock()
+                else:
+                    locker = lock
+
+                manager = xr.backends.CachingFileManager(
+                    xr.backends.DummyFileManager,
+                    filename,
+                    mode=mode,
+                )
+                return cls(manager, mode=mode, lock=locker, autoclose=autoclose)
+
+            def load(self) -> tuple:
+                """
+                Load a bunch of test data quickly.
+
+                Normally this method would've opened a file and parsed it.
+                """
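+                # Many variables (with long names) make the per-variable overhead
+                # of open_dataset measurable.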
+                n_variables = 2000
+
+                # Important to have a shape and dtype for lazy loading.
+                shape = (1,)
+                dtype = np.dtype(int)
+                variables = {
+                    f"long_variable_name_{v}": xr.Variable(
+                        data=PerformanceBackendArray(
+                            self.filename, shape, dtype, self.lock
+                        ),
+                        dims=("time",),
+                        fastpath=True,
+                    )
+                    for v in range(0, n_variables)
+                }
+                attributes = {}
+
+                return variables, attributes
+
+        class PerformanceBackend(xr.backends.BackendEntrypoint):
+            def open_dataset(
+                self,
+                filename_or_obj: str | os.PathLike | None,
+                drop_variables: tuple[str] = None,
+                *,
+                mask_and_scale=True,
+                decode_times=True,
+                concat_characters=True,
+                decode_coords=True,
+                use_cftime=None,
+                decode_timedelta=None,
+                lock=None,
+                **kwargs,
+            ) -> xr.Dataset:
+                filename_or_obj = xr.backends.common._normalize_path(filename_or_obj)
+                store = PerformanceStore.open(filename_or_obj, lock=lock)
+
+                store_entrypoint = xr.backends.store.StoreBackendEntrypoint()
+
+                ds = store_entrypoint.open_dataset(
+                    store,
+                    mask_and_scale=mask_and_scale,
+                    decode_times=decode_times,
+                    concat_characters=concat_characters,
+                    decode_coords=decode_coords,
+                    drop_variables=drop_variables,
+                    use_cftime=use_cftime,
+                    decode_timedelta=decode_timedelta,
+                )
+                return ds
+
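+        # xr.open_dataset accepts a BackendEntrypoint subclass directly as its engine.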
+        self.engine = PerformanceBackend
+
+    @parameterized(["chunks"], ([None, {}]))
+    def time_open_dataset(self, chunks):
+        """
+        Time how fast xr.open_dataset is without the slow data reading part.
+        Test with and without dask.
+        """
+        xr.open_dataset(None, engine=self.engine, chunks=chunks)