@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 import copy
 import datetime
 import math
@@ -531,49 +532,50 @@ def load(self, **kwargs) -> Self:
         dask.compute
         """
         # access .data to coerce everything to numpy or dask arrays
-        lazy_data = {
+        chunked_data = {
             k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
         }
-        if lazy_data:
-            chunkmanager = get_chunked_array_type(*lazy_data.values())
+        if chunked_data:
+            chunkmanager = get_chunked_array_type(*chunked_data.values())
 
             # evaluate all the chunked arrays simultaneously
             evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
-                *lazy_data.values(), **kwargs
+                *chunked_data.values(), **kwargs
             )
 
-            for k, data in zip(lazy_data, evaluated_data, strict=False):
+            for k, data in zip(chunked_data, evaluated_data, strict=False):
                 self.variables[k].data = data
 
         # load everything else sequentially
-        for k, v in self.variables.items():
-            if k not in lazy_data:
-                v.load()
+        [v.load() for k, v in self.variables.items() if k not in chunked_data]
 
         return self
 
     async def load_async(self, **kwargs) -> Self:
+        # TODO refactor this to pull out the common chunked_data codepath
+
         # this blocks on chunked arrays but not on lazily indexed arrays
 
         # access .data to coerce everything to numpy or dask arrays
-        lazy_data = {
+        chunked_data = {
             k: v._data for k, v in self.variables.items() if is_chunked_array(v._data)
         }
-        if lazy_data:
-            chunkmanager = get_chunked_array_type(*lazy_data.values())
+        if chunked_data:
+            chunkmanager = get_chunked_array_type(*chunked_data.values())
 
             # evaluate all the chunked arrays simultaneously
             evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
-                *lazy_data.values(), **kwargs
+                *chunked_data.values(), **kwargs
            )
 
-            for k, data in zip(lazy_data, evaluated_data, strict=False):
+            for k, data in zip(chunked_data, evaluated_data, strict=False):
                 self.variables[k].data = data
 
-        # load everything else sequentially
-        for k, v in self.variables.items():
-            if k not in lazy_data:
-                await v.load_async()
+        # load everything else concurrently
+        tasks = [
+            v.load_async() for k, v in self.variables.items() if k not in chunked_data
+        ]
+        await asyncio.gather(*tasks)
 
         return self
 
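For context on how the new coroutine is meant to be called, here is a minimal sketch of a call site, assuming this diff is against xarray's Dataset and that load_async keeps the signature shown above; the file names are placeholders, not part of the change:

import asyncio

import xarray as xr


async def main() -> None:
    # Placeholder files; any lazily backed datasets would do here.
    ds_a = xr.open_dataset("a.nc")
    ds_b = xr.open_dataset("b.nc")

    # Start both loads and wait for them together, mirroring the
    # asyncio.gather pattern used inside load_async above.
    await asyncio.gather(ds_a.load_async(), ds_b.load_async())


asyncio.run(main())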