diff --git a/httomo/data/dataset_store.py b/httomo/data/dataset_store.py index 0a9c79e8b..52ed911f7 100644 --- a/httomo/data/dataset_store.py +++ b/httomo/data/dataset_store.py @@ -29,6 +29,7 @@ from httomo.data.padding import extrapolate_after, extrapolate_before from httomo.runner.auxiliary_data import AuxiliaryData from httomo.runner.dataset import DataSetBlock +from httomo.runner.dataset_store_backing import DataSetStoreBacking from httomo.runner.dataset_store_interfaces import ( DataSetSource, ReadableDataSetSink, @@ -57,7 +58,7 @@ def __init__( slicing_dim: Literal[0, 1, 2], comm: MPI.Comm, temppath: PathLike, - memory_limit_bytes: int = 0, + store_backing: DataSetStoreBacking = DataSetStoreBacking.RAM, ): self._slicing_dim = slicing_dim self._comm = comm @@ -66,7 +67,7 @@ def __init__( self._readonly = False self._h5file: Optional[h5py.File] = None self._h5filename: Optional[Path] = None - self._memory_limit_bytes: int = memory_limit_bytes + self._store_backing = store_backing self._data: Optional[Union[np.ndarray, h5py.Dataset]] = None @@ -79,7 +80,7 @@ def __init__( @property def is_file_based(self) -> bool: - return self._h5filename is not None + return self._store_backing is DataSetStoreBacking.File @property def filename(self) -> Optional[Path]: @@ -177,23 +178,12 @@ def _get_global_h5_filename(self) -> PathLike: return self._h5filename def _create_new_data(self, block: DataSetBlock): - # reduce memory errors across all processes - if any has a memory problem, - # all should use a file - sendBuffer = np.zeros(1, dtype=bool) - recvBuffer = np.zeros(1, dtype=bool) - try: + if self._store_backing is DataSetStoreBacking.RAM: self._data = self._create_numpy_data( unpadded_chunk_shape=block.chunk_shape_unpadded, - padded_chunk_shape=block.chunk_shape, dtype=block.data.dtype, ) - except MemoryError: - sendBuffer[0] = True - - # do a logical or of all the memory errors across the processes - self.comm.Allreduce([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL], MPI.LOR) - - if bool(recvBuffer[0]) is True: + else: log_once( "Chunk does not fit in memory - using a file-based store", level=logging.WARNING, @@ -208,20 +198,9 @@ def _create_new_data(self, block: DataSetBlock): ) def _create_numpy_data( - self, - unpadded_chunk_shape: Tuple[int, int, int], - padded_chunk_shape: Tuple[int, int, int], - dtype: DTypeLike, + self, unpadded_chunk_shape: Tuple[int, int, int], dtype: DTypeLike ) -> np.ndarray: """Convenience method to enable mocking easily""" - unpadded_chunk_bytes = np.prod(unpadded_chunk_shape) * np.dtype(dtype).itemsize - padded_chunk_bytes = np.prod(padded_chunk_shape) * np.dtype(dtype).itemsize - if ( - self._memory_limit_bytes > 0 - and unpadded_chunk_bytes + padded_chunk_bytes >= self._memory_limit_bytes - ): - raise MemoryError("Memory limit reached") - return np.empty(unpadded_chunk_shape, dtype) def _create_h5_data( diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py new file mode 100644 index 000000000..250b03d66 --- /dev/null +++ b/httomo/runner/dataset_store_backing.py @@ -0,0 +1,178 @@ +from enum import Enum +from typing import Callable, List, ParamSpec, Tuple + +import numpy as np +from numpy.typing import DTypeLike +from mpi4py import MPI + +from httomo.runner.section import Section, determine_section_padding +from httomo.utils import _get_slicing_dim, make_3d_shape_from_shape + + +def calculate_section_chunk_shape( + comm: MPI.Comm, + global_shape: Tuple[int, int, int], + slicing_dim: int, + padding: Tuple[int, int], +) -> 
Tuple[int, int, int]:
+    """
+    Calculate the chunk shape (with or without padding) for a section.
+    """
+    start = round((global_shape[slicing_dim] / comm.size) * comm.rank)
+    stop = round((global_shape[slicing_dim] / comm.size) * (comm.rank + 1))
+    section_slicing_dim_len = stop - start
+    shape = list(global_shape)
+    shape[slicing_dim] = section_slicing_dim_len + padding[0] + padding[1]
+    return make_3d_shape_from_shape(shape)
+
+
+def calculate_section_chunk_bytes(
+    chunk_shape: Tuple[int, int, int],
+    dtype: DTypeLike,
+    section: Section,
+) -> int:
+    """
+    Calculate the number of bytes in the section output chunk that is written to the store. This
+    accounts for data's non-slicing dims changing during processing, which changes the chunk
+    shape for the section and thus affects the number of bytes in the chunk.
+    """
+    slicing_dim = _get_slicing_dim(section.pattern) - 1
+    non_slice_dims_list = list(chunk_shape)
+    non_slice_dims_list.pop(slicing_dim)
+    non_slice_dims = (non_slice_dims_list[0], non_slice_dims_list[1])
+
+    for method in section.methods:
+        if method.memory_gpu is None:
+            continue
+        non_slice_dims = method.calculate_output_dims(non_slice_dims)
+
+    return int(
+        np.prod(non_slice_dims) * chunk_shape[slicing_dim] * np.dtype(dtype).itemsize
+    )
+
+
+class DataSetStoreBacking(Enum):
+    RAM = 1
+    File = 2
+
+
+P = ParamSpec("P")
+
+
+def _reduce_decorator_factory(
+    comm: MPI.Comm,
+) -> Callable[[Callable[P, DataSetStoreBacking]], Callable[P, DataSetStoreBacking]]:
+    """
+    Generate decorator for store-backing calculator function that will use the given MPI
+    communicator for the reduce operation.
+    """
+
+    def reduce_decorator(
+        func: Callable[P, DataSetStoreBacking]
+    ) -> Callable[P, DataSetStoreBacking]:
+        """
+        Decorator for store-backing calculator function.
+        """
+
+        def wrapper(*args: P.args, **kwargs: P.kwargs) -> DataSetStoreBacking:
+            """
+            Perform store-backing calculation across all MPI processes and reduce.
+ """ + # reduce store backing enum variant across all processes - if any has + # `File` variant, all should use a file + send_buffer = np.zeros(1, dtype=bool) + recv_buffer = np.zeros(1, dtype=bool) + store_backing = func(*args, **kwargs) + + if store_backing is DataSetStoreBacking.File: + send_buffer[0] = True + + # do a logical or of all the enum variants across the processes + comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR) + + if bool(recv_buffer[0]) is True: + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM + + return wrapper + + return reduce_decorator + + +def _non_last_section_in_pipeline( + memory_limit_bytes: int, + write_chunk_bytes: int, + read_chunk_bytes: int, +) -> DataSetStoreBacking: + """ + Calculate backing of dataset store for non-last sections in pipeline + """ + if ( + memory_limit_bytes > 0 + and write_chunk_bytes + read_chunk_bytes >= memory_limit_bytes + ): + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM + + +def _last_section_in_pipeline( + memory_limit_bytes: int, + write_chunk_bytes: int, +) -> DataSetStoreBacking: + """ + Calculate backing of dataset store for last section in pipeline + """ + if memory_limit_bytes > 0 and write_chunk_bytes >= memory_limit_bytes: + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM + + +def determine_store_backing( + comm: MPI.Comm, + sections: List[Section], + memory_limit_bytes: int, + dtype: DTypeLike, + global_shape: Tuple[int, int, int], + section_idx: int, +) -> DataSetStoreBacking: + reduce_decorator = _reduce_decorator_factory(comm) + + # Get chunk shape input to section + current_chunk_shape = calculate_section_chunk_shape( + comm=comm, + global_shape=global_shape, + slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, + padding=(0, 0), + ) + + # Get the number of bytes in the input chunk to the section w/ potential modifications to + # the non-slicing dims + current_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=current_chunk_shape, + dtype=dtype, + section=sections[section_idx], + ) + + if section_idx == len(sections) - 1: + return reduce_decorator(_last_section_in_pipeline)( + memory_limit_bytes=memory_limit_bytes, + write_chunk_bytes=current_chunk_bytes, + ) + + # Get chunk shape created by reader of section `n+1`, that will add padding to the + # chunk shape written by the writer of section `n` + next_chunk_shape = calculate_section_chunk_shape( + comm=comm, + global_shape=global_shape, + slicing_dim=_get_slicing_dim(sections[section_idx + 1].pattern) - 1, + padding=determine_section_padding(sections[section_idx + 1]), + ) + next_chunk_bytes = int(np.prod(next_chunk_shape) * np.dtype(dtype).itemsize) + return reduce_decorator(_non_last_section_in_pipeline)( + memory_limit_bytes=memory_limit_bytes, + write_chunk_bytes=current_chunk_bytes, + read_chunk_bytes=next_chunk_bytes, + ) diff --git a/httomo/runner/section.py b/httomo/runner/section.py index 479f7a602..69a43a5f0 100644 --- a/httomo/runner/section.py +++ b/httomo/runner/section.py @@ -1,5 +1,5 @@ import logging -from typing import Iterator, List, Optional +from typing import Iterator, List, Optional, Tuple from httomo.runner.output_ref import OutputRef from httomo.runner.pipeline import Pipeline @@ -142,3 +142,12 @@ def _set_method_patterns(sections: List[Section]): for s in sections: for m in s: m.pattern = s.pattern + + +def determine_section_padding(section: Section) -> Tuple[int, int]: + # NOTE: Assumes that only one method with padding will be in a 
section, which is + # consistent with the assumptions made by `sectionize()` + for method in section.methods: + if method.padding: + return method.calculate_padding() + return (0, 0) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index ee0f4169a..d84367237 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -9,6 +9,7 @@ import httomo.globals from httomo.data.dataset_store import DataSetStoreWriter +from httomo.runner.dataset_store_backing import determine_store_backing from httomo.runner.method_wrapper import MethodWrapper from httomo.runner.block_split import BlockSplitter from httomo.runner.dataset import DataSetBlock @@ -29,7 +30,6 @@ log_exception, log_once, log_rank, - make_3d_shape_from_shape, ) import numpy as np @@ -56,12 +56,13 @@ def __init__( self._memory_limit_bytes = memory_limit_bytes + self._sections = self._sectionize() + def execute(self) -> None: with catchtime() as t: - sections = self._sectionize() self._prepare() - for i, section in enumerate(sections): + for i, section in enumerate(self._sections): self._execute_section(section, i) gpumem_cleanup() @@ -81,7 +82,7 @@ def _sectionize(self) -> List[Section]: return sections def _execute_section(self, section: Section, section_index: int = 0): - self._setup_source_sink(section) + self._setup_source_sink(section, section_index) assert self.source is not None, "Dataset has not been loaded yet" assert self.sink is not None, "Sink setup failed" @@ -162,7 +163,7 @@ def _execute_section(self, section: Section, section_index: int = 0): level=logging.INFO, ) - def _setup_source_sink(self, section: Section): + def _setup_source_sink(self, section: Section, idx: int): assert self.source is not None, "Dataset has not been loaded yet" slicing_dim_section: Literal[0, 1] = _get_slicing_dim(section.pattern) - 1 # type: ignore @@ -173,6 +174,15 @@ def _setup_source_sink(self, section: Section): assert isinstance(self.sink, ReadableDataSetSink) self.source = self.sink.make_reader(slicing_dim_section) + store_backing = determine_store_backing( + comm=self.comm, + sections=self._sections, + memory_limit_bytes=self._memory_limit_bytes, + dtype=self.source.dtype, + global_shape=self.source.global_shape, + section_idx=idx, + ) + if section.is_last: # we don't need to store the results - this sink just discards it self.sink = DummySink(slicing_dim_section) @@ -181,7 +191,7 @@ def _setup_source_sink(self, section: Section): slicing_dim_section, self.comm, self.reslice_dir, - memory_limit_bytes=self._memory_limit_bytes, + store_backing=store_backing, ) def _execute_section_block( @@ -352,30 +362,3 @@ def determine_max_slices(self, section: Section, slicing_dim: int): non_slice_dims_shape = output_dims section.max_slices = min(max_slices_methods) - - def determine_section_padding(self, section: Section) -> Tuple[int, int]: - # NOTE: Assumes that only one method with padding will be in a section, which is - # consistent with the assumptions made by `section.sectionizer()` - for method in section.methods: - if method.padding: - return method.calculate_padding() - return (0, 0) - - -def calculate_next_chunk_shape( - comm: MPI.Comm, - global_shape: Tuple[int, int, int], - next_section_slicing_dim: int, - next_section_padding: Tuple[int, int], -) -> Tuple[int, int, int]: - """ - Utility function for calculating the chunk shape (including padding) for the next section. 
- """ - start = round((global_shape[next_section_slicing_dim] / comm.size) * comm.rank) - stop = round((global_shape[next_section_slicing_dim] / comm.size) * (comm.rank + 1)) - next_section_slicing_dim_len = stop - start - shape = list(global_shape) - shape[next_section_slicing_dim] = ( - next_section_slicing_dim_len + next_section_padding[0] + next_section_padding[1] - ) - return make_3d_shape_from_shape(shape) diff --git a/tests/data/test_dataset_store.py b/tests/data/test_dataset_store.py index 7ff1fdb67..ec19270df 100644 --- a/tests/data/test_dataset_store.py +++ b/tests/data/test_dataset_store.py @@ -1,6 +1,6 @@ from os import PathLike from pathlib import Path -from typing import List, Literal +from typing import Literal from unittest.mock import ANY import numpy as np import pytest @@ -11,6 +11,7 @@ from httomo.runner.auxiliary_data import AuxiliaryData from httomo.runner.dataset import DataSetBlock +from httomo.runner.dataset_store_backing import DataSetStoreBacking from httomo.utils import make_3d_shape_from_shape @@ -59,14 +60,18 @@ def test_reader_throws_if_no_data(tmp_path: PathLike): assert "no data" in str(e) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_can_write_and_read_blocks( - mocker: MockerFixture, tmp_path: PathLike, file_based: bool + tmp_path: PathLike, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) GLOBAL_SHAPE = (10, 10, 10) @@ -94,8 +99,6 @@ def test_can_write_and_read_blocks( chunk_start=chunk_start, ) - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(block1) writer.write_block(block2) @@ -116,25 +119,22 @@ def test_can_write_and_read_blocks( np.testing.assert_array_equal(rblock2.data, block2.data) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_write_after_read_throws( - mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike, - file_based: bool, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) - - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) - writer.make_reader() - with pytest.raises(ValueError): writer.write_block(dummy_block) @@ -146,9 +146,9 @@ def test_writer_closes_file_on_finalize( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=DataSetStoreBacking.File, ) - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) fileclose = mocker.patch.object(writer._h5file, "close") writer.finalize() @@ -157,16 +157,15 @@ def test_writer_closes_file_on_finalize( def test_making_reader_closes_file_and_deletes( - mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike + dummy_block: DataSetBlock, tmp_path: PathLike ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=DataSetStoreBacking.File, ) - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) - reader = writer.make_reader() assert writer._h5file is None @@ -340,16 +339,16 @@ def 
test_writing_inconsistent_global_index_fails(tmp_path: PathLike): assert "inconsistent shape" in str(e) -def test_create_new_data_goes_to_file_on_memory_error( +def test_create_new_data_goes_to_file_for_file_store_backing( mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=DataSetStoreBacking.File, ) - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) createh5_mock = mocker.patch.object( writer, "_create_h5_data", return_value=dummy_block.data ) @@ -364,46 +363,6 @@ def test_create_new_data_goes_to_file_on_memory_error( ) -def test_create_new_data_goes_to_file_on_memory_limit( - mocker: MockerFixture, tmp_path: PathLike -): - GLOBAL_SHAPE = (500, 10, 10) - data = np.ones(GLOBAL_SHAPE, dtype=np.float32) - aux_data = AuxiliaryData( - angles=np.ones(data.shape[0], dtype=np.float32), - darks=2.0 * np.ones((2, GLOBAL_SHAPE[1], GLOBAL_SHAPE[2]), dtype=np.float32), - flats=1.0 * np.ones((2, GLOBAL_SHAPE[1], GLOBAL_SHAPE[2]), dtype=np.float32), - ) - block = DataSetBlock( - data=data[0:2, :, :], - aux_data=aux_data, - slicing_dim=0, - block_start=0, - chunk_start=0, - global_shape=GLOBAL_SHAPE, - chunk_shape=GLOBAL_SHAPE, - ) - writer = DataSetStoreWriter( - slicing_dim=0, - comm=MPI.COMM_WORLD, - temppath=tmp_path, - memory_limit_bytes=block.data.nbytes + 5, # only one block will fit in memory - ) - - createh5_mock = mocker.patch.object( - writer, "_create_h5_data", return_value=block.data - ) - - writer.write_block(block) - - createh5_mock.assert_called_with( - writer.global_shape, - block.data.dtype, - ANY, - writer.comm, - ) - - def test_calls_reslice( mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike ): @@ -422,24 +381,25 @@ def test_calls_reslice( reslice_mock.assert_called_with(0, 1, d) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_reslice_single_block_single_process( - mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike, - file_based: bool, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) - assert writer.is_file_based is file_based + expected_is_file_based = store_backing is DataSetStoreBacking.File + assert writer.is_file_based is expected_is_file_based reader = writer.make_reader(new_slicing_dim=1) @@ -456,7 +416,7 @@ def test_reslice_single_block_single_process( assert block.chunk_shape == reader.chunk_shape assert isinstance(reader, DataSetStoreReader) - assert reader.is_file_based is file_based + assert reader.is_file_based is expected_is_file_based np.testing.assert_array_equal(block.data, dummy_block.data[:, 1:3, :]) np.testing.assert_array_equal(block.flats, dummy_block.flats) @@ -469,18 +429,12 @@ def test_reslice_single_block_single_process( MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" ) @pytest.mark.parametrize( - "out_of_memory_ranks", - [[], [1], [0, 1]], - ids=[ - "out_of_memory=none", - "out_of_memory=one process", - "out_of_memory=both processes", - ], + "store_backing", + [DataSetStoreBacking.RAM, DataSetStoreBacking.File], ) def test_full_integration_with_reslice( - mocker: MockerFixture, tmp_path: PathLike, - 
out_of_memory_ranks: List[int], + store_backing: DataSetStoreBacking, ): ########### ARRANGE DATA and mocks @@ -512,12 +466,9 @@ def test_full_integration_with_reslice( slicing_dim=0, comm=comm, temppath=tmp_path, + store_backing=store_backing, ) - if comm.rank in out_of_memory_ranks: - # make it throw an exception, so it reverts to file-based store - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) - ############# ACT # write full chunk-sized block @@ -560,14 +511,18 @@ def test_full_integration_with_reslice( ) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_can_write_blocks_with_padding_and_read( - mocker: MockerFixture, tmp_path: PathLike, file_based: bool + tmp_path: PathLike, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) GLOBAL_SHAPE = (10, 10, 10) @@ -616,8 +571,6 @@ def test_can_write_blocks_with_padding_and_read( padding=padding, ) - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(block1) writer.write_block(block2) @@ -1045,63 +998,3 @@ def test_adapts_shapes_with_padding_and_reslicing_mpi( np.testing.assert_array_equal(block1.data, b1expected) np.testing.assert_array_equal(block2.data, b2expected) - - -def test_write_block_accounts_for_two_chunk_copies(tmp_path: PathLike): - MEMORY_LIMIT = 4 * 1024**2 # 4MB - - # Core chunk array size ~3.4MB - # Padded chunk array size ~4.8MB - # -> Core chunk array fits within memory limit, but core chunk + padded chunk doesn't - PADDING = (2, 2) - SLICING_DIM = 0 - DTYPE = np.float32 - CORE_CHUNK_SHAPE = (10, 300, 300) - PADDED_CHUNK_SHAPE = ( - CORE_CHUNK_SHAPE[0] + PADDING[0] + PADDING[1], - CORE_CHUNK_SHAPE[1], - CORE_CHUNK_SHAPE[2], - ) - - # Create block to later write to the store - data = np.ones( - ( - CORE_CHUNK_SHAPE[0] // 2 + PADDING[0] + PADDING[1], - CORE_CHUNK_SHAPE[1], - CORE_CHUNK_SHAPE[2], - ), - dtype=DTYPE, - ) - block = DataSetBlock( - data=data, - aux_data=AuxiliaryData(np.ones(CORE_CHUNK_SHAPE[0], dtype=DTYPE)), - slicing_dim=SLICING_DIM, - global_shape=CORE_CHUNK_SHAPE, - chunk_shape=PADDED_CHUNK_SHAPE, - block_start=0, - chunk_start=0, - padding=PADDING, - ) - - # Create writer with memory limit - COMM = MPI.COMM_WORLD - writer = DataSetStoreWriter( - slicing_dim=SLICING_DIM, - comm=COMM, - temppath=tmp_path, - memory_limit_bytes=MEMORY_LIMIT, - ) - - # A 4MB limit on RAM is enough to hold the core chunk, but not enough to hold the core - # chunk + padded chunk. Therefore, a `MemoryError` should be raised + handled by the writer - # when the first block is attempted to be written. - # - # A `MemoryError` being raised and handled by the writer will result in an hdf5 file - # backing the store. Thus, checking if the writer is file-based will show if a - # `MemoryError` was handled or not. 
- writer.write_block(block) - assert writer.is_file_based - - # Execute finaliser to make sure the hdf5 file that should be backing the store is cleaned - # up (doing so avoids warnings in pytest output) - writer.finalize() diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py new file mode 100644 index 000000000..7b61cef7b --- /dev/null +++ b/tests/runner/test_dataset_store_backing.py @@ -0,0 +1,515 @@ +from typing import List, Tuple + +import pytest +import numpy as np +from mpi4py import MPI +from pytest_mock import MockerFixture + +from httomo.runner.dataset_store_backing import ( + DataSetStoreBacking, + calculate_section_chunk_shape, + calculate_section_chunk_bytes, + determine_store_backing, +) +from httomo.runner.methods_repository_interface import GpuMemoryRequirement +from httomo.runner.pipeline import Pipeline +from httomo.runner.section import sectionize +from httomo.utils import Pattern, make_3d_shape_from_shape +from tests.testing_utils import make_test_loader, make_test_method + + +@pytest.mark.parametrize( + "nprocs, rank, section_slicing_dim, section_padding", + [ + (2, 1, 0, (0, 0)), + (2, 1, 0, (3, 5)), + (2, 1, 1, (0, 0)), + (2, 1, 1, (3, 5)), + (4, 2, 0, (0, 0)), + (4, 2, 0, (3, 5)), + (4, 2, 1, (0, 0)), + (4, 2, 1, (3, 5)), + ], + ids=[ + "2procs-proj-to-proj_unpadded", + "2procs-proj-to-proj_padded", + "2procs-proj-to-sino_unpadded", + "2procs-proj-to-sino_padded", + "4procs-proj-to-proj_unpadded", + "4procs-proj-to-proj_padded", + "4procs-proj-to-sino_unpadded", + "4procs-proj-to-sino_padded", + ], +) +def test_calculate_section_chunk_shape( + nprocs: int, + rank: int, + section_slicing_dim: int, + section_padding: Tuple[int, int], + mocker: MockerFixture, +): + GLOBAL_SHAPE = (1801, 2160, 2560) + + # Define mock communicator that reflects the desired data splitting/distribution to be + # tested + mock_global_comm = mocker.create_autospec(spec=MPI.Comm, size=nprocs, rank=rank) + + # The chunk shape for the section should reflect the padding needed for that section + expected_chunk_shape: List[int] = list(GLOBAL_SHAPE) + start = round(GLOBAL_SHAPE[section_slicing_dim] / nprocs * rank) + stop = round(GLOBAL_SHAPE[section_slicing_dim] / nprocs * (rank + 1)) + slicing_dim_len = stop - start + expected_chunk_shape[section_slicing_dim] = ( + slicing_dim_len + section_padding[0] + section_padding[1] + ) + section_chunk_shape = calculate_section_chunk_shape( + comm=mock_global_comm, + global_shape=GLOBAL_SHAPE, + slicing_dim=section_slicing_dim, + padding=section_padding, + ) + assert section_chunk_shape == make_3d_shape_from_shape(expected_chunk_shape) + + +def test_calculate_section_chunk_bytes_output_dims_change(mocker: MockerFixture): + NEW_OUTPUT_DIMS = (300, 400) + SECTION_INPUT_CHUNK_SHAPE = (100, 10, 100) + DTYPE = np.float32 + EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE = ( + SECTION_INPUT_CHUNK_SHAPE[0], + NEW_OUTPUT_DIMS[0], + NEW_OUTPUT_DIMS[1], + ) + EXPECTED_SECTION_OUTPUT_CHUNK_BYTES = ( + np.prod(EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE) * np.dtype(DTYPE).itemsize + ) + + # Define methods to form section, one of which changes the output shape + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method( + mocker=mocker, method_name="m2", pattern=Pattern.projection, gpu=True + ) + mocker.patch.object( + m2, + "memory_gpu", + [GpuMemoryRequirement(multiplier=2.0, method="direct")], + ) + mocker.patch.object( + target=m2, + 
attribute="calculate_output_dims", + return_value=NEW_OUTPUT_DIMS, + ) + + # Generate list of sections + pipeline = Pipeline(loader=loader, methods=[m1, m2]) + sections = sectionize(pipeline) + + # Check that the number of bytes in the chunk accounts for the non-slicing dims change by + # the method in the section + section_output_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=SECTION_INPUT_CHUNK_SHAPE, + dtype=DTYPE, + section=sections[0], + ) + assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES + + +def test_calculate_section_chunk_bytes_output_dims_change_and_swap( + mocker: MockerFixture, +): + RECON_SIZE = 400 + SECTION_INPUT_CHUNK_SHAPE = (100, 10, 100) + DTYPE = np.float32 + EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE = ( + SECTION_INPUT_CHUNK_SHAPE[1], + RECON_SIZE, + RECON_SIZE, + ) + EXPECTED_SECTION_OUTPUT_CHUNK_BYTES = ( + np.prod(EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE) * np.dtype(DTYPE).itemsize + ) + + # Define methods to form section, one of which changes the output shape and also swaps the + # output dims 0 and 1 (which can happen in recon methods) + loader = make_test_loader(mocker=mocker) + m1 = make_test_method( + mocker=mocker, method_name="stripe-removal", pattern=Pattern.sinogram + ) + # NOTE: The reason that `swap_dims_on_output=True` is not passed to `make_test_method()` + # even though `m2` is assumed to be swapping the output dims is because that doesn't + # actually do anything when not running the method on data. + # + # The output dims being swapped is not taken into account when calling + # `calculate_output_dims()` on a method wrapper object, and the output dims swapping is + # implicitly done when the method executes. + # + # It however still seemed reasonable to check that the correct chunk bytes value was given + # if there's a method in a section that swaps the output dims. 
+ m2 = make_test_method( + mocker=mocker, + method_name="recon", + pattern=Pattern.sinogram, + gpu=True, + ) + mocker.patch.object( + m2, + "memory_gpu", + [GpuMemoryRequirement(multiplier=2.0, method="direct")], + ) + mocker.patch.object( + target=m2, + attribute="calculate_output_dims", + return_value=(RECON_SIZE, RECON_SIZE), + ) + + # Generate list of sections + pipeline = Pipeline(loader=loader, methods=[m1, m2]) + sections = sectionize(pipeline) + + # Check that the number of bytes in the chunk accounts for the non-slicing dims change by + # the method in the section + section_output_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=SECTION_INPUT_CHUNK_SHAPE, + dtype=DTYPE, + section=sections[0], + ) + assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES + + +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (3 * 1024**2, DataSetStoreBacking.File), + (4 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"], +) +def test_determine_store_backing_last_section_pipeline_single_proc( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For a single process, chunk shape = global shape + # + # The dtype and shape combined makes: + # - the write chunk ~3.4MB + # - the read chunk also ~3.4MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + method = make_test_method( + mocker=mocker, method_name="method", pattern=Pattern.projection + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[method], + ) + sections = sectionize(pipeline) + + # Based on memory limit and the given section in the pipeline, determine the backing of the + # store for the execution of that section + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.mpi +@pytest.mark.skipif( + MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" +) +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (1 * 1024**2, DataSetStoreBacking.File), + (2 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["1MB-limit-file-backing", "2MB-limit-ram-backing"], +) +def test_determine_store_backing_last_section_pipeline_two_procs( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For two processes, chunk shape = half of global shape + # + # The dtype and shape combined makes: + # - the write chunk ~1.7MB + # - the read chunk also ~1.7MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + method = make_test_method( + mocker=mocker, method_name="method", pattern=Pattern.projection + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[method], + ) + sections = sectionize(pipeline) + + # Based on memory limit and the given section in the pipeline, determine the backing of the + # store for the execution of that section + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + 
global_shape=GLOBAL_SHAPE,
+        section_idx=0,
+    )
+    assert store_backing is expected_store_backing
+
+
+@pytest.mark.parametrize(
+    "memory_limit, expected_store_backing",
+    [
+        (6 * 1024**2, DataSetStoreBacking.File),
+        (7 * 1024**2, DataSetStoreBacking.RAM),
+    ],
+    ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"],
+)
+def test_determine_store_backing_non_last_section_pipeline_single_proc(
+    mocker: MockerFixture,
+    memory_limit: int,
+    expected_store_backing: DataSetStoreBacking,
+):
+    COMM = MPI.COMM_WORLD
+
+    # For a single process, chunk shape = global shape
+    #
+    # The dtype and shape combined makes:
+    # - the write chunk ~3.4MB
+    # - the read chunk also ~3.4MB
+    DTYPE = np.float32
+    GLOBAL_SHAPE = (10, 300, 300)
+
+    # Define dummy loader and method wrapper objects
+    loader = make_test_loader(mocker=mocker)
+    m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection)
+    m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram)
+
+    # Get list of section objects that represent pipeline
+    pipeline = Pipeline(
+        loader=loader,
+        methods=[m1, m2],
+    )
+    sections = sectionize(pipeline)
+
+    # For execution of non-last sections in pipelines, the writer must take into account that a
+    # copy of the chunk is made by the reader of the following section. Therefore, two copies
+    # of the chunk must be taken into account when deciding the backing of the store.
+    #
+    # Note that section 0 is the only section that is "not the last section", so it's the only
+    # one that will need to account for two copies of the chunk, and thus the main target of
+    # the test. Hence, `section_idx=0` is given.
+    store_backing = determine_store_backing(
+        comm=COMM,
+        sections=sections,
+        memory_limit_bytes=memory_limit,
+        dtype=DTYPE,
+        global_shape=GLOBAL_SHAPE,
+        section_idx=0,
+    )
+    assert store_backing is expected_store_backing
+
+
+@pytest.mark.mpi
+@pytest.mark.skipif(
+    MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test"
+)
+@pytest.mark.parametrize(
+    "memory_limit, expected_store_backing",
+    [
+        (3 * 1024**2, DataSetStoreBacking.File),
+        (4 * 1024**2, DataSetStoreBacking.RAM),
+    ],
+    ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"],
+)
+def test_determine_store_backing_non_last_section_pipeline_two_procs(
+    mocker: MockerFixture,
+    memory_limit: int,
+    expected_store_backing: DataSetStoreBacking,
+):
+    COMM = MPI.COMM_WORLD
+
+    # For two processes, chunk shape = half of global shape
+    #
+    # The dtype and shape combined makes:
+    # - the write chunk ~1.7MB
+    # - the read chunk also ~1.7MB
+    DTYPE = np.float32
+    GLOBAL_SHAPE = (10, 300, 300)
+
+    # Define dummy loader and method wrapper objects
+    loader = make_test_loader(mocker=mocker)
+    m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection)
+    m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram)
+
+    # Get list of section objects that represent pipeline
+    pipeline = Pipeline(
+        loader=loader,
+        methods=[m1, m2],
+    )
+    sections = sectionize(pipeline)
+
+    # For execution of non-last sections in pipelines, the writer must take into account that a
+    # copy of the chunk is made by the reader of the following section. Therefore, two copies
+    # of the chunk must be taken into account when deciding the backing of the store.
+    #
+    # Note that section 0 is the only section that is "not the last section", so it's the only
+    # one that will need to account for two copies of the chunk, and thus the main target of
+    # the test. Hence, `section_idx=0` is given.
+    store_backing = determine_store_backing(
+        comm=COMM,
+        sections=sections,
+        memory_limit_bytes=memory_limit,
+        dtype=DTYPE,
+        global_shape=GLOBAL_SHAPE,
+        section_idx=0,
+    )
+    assert store_backing is expected_store_backing
+
+
+@pytest.mark.parametrize(
+    "memory_limit, expected_store_backing",
+    [
+        (7 * 1024**2, DataSetStoreBacking.File),
+        (10 * 1024**2, DataSetStoreBacking.RAM),
+    ],
+    ids=["7MB-limit-file-backing", "10MB-limit-ram-backing"],
+)
+def test_determine_store_backing_non_last_section_pipeline_large_padding_single_proc(
+    mocker: MockerFixture,
+    memory_limit: int,
+    expected_store_backing: DataSetStoreBacking,
+):
+    COMM = MPI.COMM_WORLD
+
+    # For a single process, chunk shape = global shape
+    #
+    # The dtype, shape, and padding combined makes:
+    # - the write chunk ~3.4MB
+    # - the read chunk ~4.6MB
+    DTYPE = np.float32
+    GLOBAL_SHAPE = (10, 300, 300)
+    PADDING = (50, 50)
+
+    # Define dummy loader and method wrapper objects
+    loader = make_test_loader(mocker=mocker)
+    m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection)
+    m2 = make_test_method(
+        mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True
+    )
+    mocker.patch.object(
+        target=m2,
+        attribute="calculate_padding",
+        return_value=PADDING,
+    )
+
+    # Get list of section objects that represent pipeline
+    pipeline = Pipeline(
+        loader=loader,
+        methods=[m1, m2],
+    )
+    sections = sectionize(pipeline)
+
+    # For execution of non-last sections in pipelines, the writer must take into account that a
+    # copy of the chunk is made by the reader of the following section. Therefore, two copies
+    # of the chunk must be taken into account when deciding the backing of the store.
+    #
+    # Note that section 0 is the only section that is "not the last section", so it's the only
+    # one that will need to account for two copies of the chunk, and thus the main target of
+    # the test. Hence, `section_idx=0` is given.
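+    # Concretely (1MB = 1024**2 bytes): the section-0 write chunk is 10 * 300 * 300 * 4
+    # bytes ~= 3.43MB and the padded section-1 read chunk is 10 * 400 * 300 * 4 bytes
+    # ~= 4.58MB, so ~8.01MB in total - at or above the 7MB limit, below the 10MB limit.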
+    store_backing = determine_store_backing(
+        comm=COMM,
+        sections=sections,
+        memory_limit_bytes=memory_limit,
+        dtype=DTYPE,
+        global_shape=GLOBAL_SHAPE,
+        section_idx=0,
+    )
+    assert store_backing is expected_store_backing
+
+
+@pytest.mark.mpi
+@pytest.mark.skipif(
+    MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test"
+)
+@pytest.mark.parametrize(
+    "memory_limit, expected_store_backing",
+    [
+        (4 * 1024**2, DataSetStoreBacking.File),
+        (5 * 1024**2, DataSetStoreBacking.RAM),
+    ],
+    ids=["4MB-limit-file-backing", "5MB-limit-ram-backing"],
+)
+def test_determine_store_backing_non_last_section_pipeline_large_padding_two_procs(
+    mocker: MockerFixture,
+    memory_limit: int,
+    expected_store_backing: DataSetStoreBacking,
+):
+    COMM = MPI.COMM_WORLD
+
+    # For two processes, chunk shape = half of global shape
+    #
+    # The dtype, shape, and padding combined makes:
+    # - the write chunk ~1.7MB
+    # - the read chunk ~2.8MB
+    DTYPE = np.float32
+    GLOBAL_SHAPE = (10, 300, 300)
+    PADDING = (50, 50)
+
+    # Define dummy loader and method wrapper objects
+    loader = make_test_loader(mocker=mocker)
+    m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection)
+    m2 = make_test_method(
+        mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True
+    )
+    mocker.patch.object(
+        target=m2,
+        attribute="calculate_padding",
+        return_value=PADDING,
+    )
+
+    # Get list of section objects that represent pipeline
+    pipeline = Pipeline(
+        loader=loader,
+        methods=[m1, m2],
+    )
+    sections = sectionize(pipeline)
+
+    # For execution of non-last sections in pipelines, the writer must take into account that a
+    # copy of the chunk is made by the reader of the following section. Therefore, two copies
+    # of the chunk must be taken into account when deciding the backing of the store.
+    #
+    # Note that section 0 is the only section that is "not the last section", so it's the only
+    # one that will need to account for two copies of the chunk, and thus the main target of
+    # the test. Hence, `section_idx=0` is given.
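+    # Concretely (1MB = 1024**2 bytes): the per-process section-0 write chunk is
+    # 5 * 300 * 300 * 4 bytes ~= 1.72MB and the padded section-1 read chunk is
+    # 10 * 250 * 300 * 4 bytes ~= 2.86MB, so ~4.58MB in total - at or above the 4MB
+    # limit, below the 5MB limit.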
+ store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing diff --git a/tests/runner/test_section.py b/tests/runner/test_section.py index 5797c16b8..f453204fa 100644 --- a/tests/runner/test_section.py +++ b/tests/runner/test_section.py @@ -3,7 +3,7 @@ from pytest_mock import MockerFixture from httomo.runner.output_ref import OutputRef from httomo.runner.pipeline import Pipeline -from httomo.runner.section import sectionize, Section +from httomo.runner.section import determine_section_padding, sectionize, Section from httomo.utils import Pattern from ..testing_utils import make_test_loader, make_test_method @@ -308,3 +308,67 @@ def test_sectionizer_splits_section_if_multiple_padding_methods(mocker: MockerFi len(s[0]) == 3 ) # loader + 3 methods, as we want right before the next padding method assert len(s[1]) == 1 # just the last method with padding + + +def test_determine_section_padding_no_padding_method_in_section( + mocker: MockerFixture, +): + loader = make_test_loader(mocker) + method_1 = make_test_method(mocker=mocker, padding=False) + method_2 = make_test_method(mocker=mocker, padding=False) + method_3 = make_test_method(mocker=mocker, padding=False) + + pipeline = Pipeline( + loader=loader, + methods=[method_1, method_2, method_3], + ) + sections = sectionize(pipeline) + section_padding = determine_section_padding(sections[0]) + assert section_padding == (0, 0) + + +def test_determine_section_padding_one_padding_method_only_method_in_section( + mocker: MockerFixture, +): + loader = make_test_loader(mocker) + + PADDING = (3, 5) + padding_method = make_test_method(mocker=mocker, padding=True) + mocker.patch.object( + target=padding_method, + attribute="calculate_padding", + return_value=PADDING, + ) + + pipeline = Pipeline(loader=loader, methods=[padding_method]) + sections = sectionize(pipeline) + section_padding = determine_section_padding(sections[0]) + assert section_padding == PADDING + + +def test_determine_section_padding_one_padding_method_and_other_methods_in_section( + mocker: MockerFixture, +): + loader = make_test_loader(mocker) + + PADDING = (3, 5) + padding_method = make_test_method(mocker=mocker, padding=True) + mocker.patch.object( + target=padding_method, + attribute="calculate_padding", + return_value=PADDING, + ) + method_1 = make_test_method(mocker=mocker, padding=False) + method_2 = make_test_method(mocker=mocker, padding=False) + method_3 = make_test_method(mocker=mocker, padding=False) + + pipeline = Pipeline( + loader=loader, + methods=[method_1, method_2, padding_method, method_3], + ) + + sections = sectionize(pipeline) + assert len(sections[0]) == 4 + + section_padding = determine_section_padding(sections[0]) + assert section_padding == PADDING diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index 18a3a73ac..b5c92aba1 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -10,15 +10,15 @@ from httomo.data.dataset_store import DataSetStoreWriter from httomo.runner.auxiliary_data import AuxiliaryData from httomo.runner.dataset import DataSetBlock +from httomo.runner.dataset_store_backing import DataSetStoreBacking from httomo.runner.methods_repository_interface import GpuMemoryRequirement from httomo.runner.monitoring_interface import MonitoringInterface from httomo.runner.output_ref import OutputRef from httomo.runner.pipeline 
import Pipeline from httomo.runner.section import Section, sectionize -from httomo.runner.task_runner import TaskRunner, calculate_next_chunk_shape +from httomo.runner.task_runner import TaskRunner from httomo.utils import ( Pattern, - make_3d_shape_from_shape, xp, gpu_enabled, ) @@ -335,6 +335,13 @@ def test_execute_section_calls_blockwise_execute_and_monitors( s = sectionize(p) mon = mocker.create_autospec(MonitoringInterface, instance=True) t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon) + + # Patch the store backing calculator function to assume being backed by RAM + mocker.patch( + "httomo.runner.task_runner.determine_store_backing", + return_value=DataSetStoreBacking.RAM, + ) + t._prepare() # make that do nothing mocker.patch.object(t, "determine_max_slices") @@ -350,7 +357,7 @@ def mul_block_by_two(section, block: DataSetBlock): ) s[0].is_last = False # should setup a DataSetStoreWriter as sink - t._execute_section(s[0], 1) + t._execute_section(s[0]) assert isinstance(t.sink, DataSetStoreWriter) reader = t.sink.make_reader() data = reader.read_block(0, dummy_block.shape[0]) @@ -397,6 +404,11 @@ def test_does_reslice_when_needed_and_reports_time( mon = mocker.create_autospec(MonitoringInterface, instance=True) t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon) + # Patch the store backing calculator function to assume being backed by RAM + mocker.patch( + "httomo.runner.task_runner.determine_store_backing", + return_value=DataSetStoreBacking.RAM, + ) t.execute() assert loader.pattern == Pattern.projection @@ -476,128 +488,3 @@ def test_warns_with_multiple_stores_from_side_outputs( spy.assert_called() args, _ = spy.call_args assert "Data saving or/and reslicing operation will be performed 4 times" in args[0] - - -def test_determine_section_padding_no_padding_method_in_section( - mocker: MockerFixture, - tmp_path: PathLike, -): - loader = make_test_loader(mocker) - method_1 = make_test_method(mocker=mocker, padding=False) - method_2 = make_test_method(mocker=mocker, padding=False) - method_3 = make_test_method(mocker=mocker, padding=False) - - pipeline = Pipeline( - loader=loader, - methods=[method_1, method_2, method_3], - ) - runner = TaskRunner(pipeline=pipeline, reslice_dir=tmp_path, comm=MPI.COMM_WORLD) - sections = sectionize(pipeline) - section_padding = runner.determine_section_padding(sections[0]) - assert section_padding == (0, 0) - - -def test_determine_section_padding_one_padding_method_only_method_in_section( - mocker: MockerFixture, - tmp_path: PathLike, -): - loader = make_test_loader(mocker) - - PADDING = (3, 5) - padding_method = make_test_method(mocker=mocker, padding=True) - mocker.patch.object( - target=padding_method, - attribute="calculate_padding", - return_value=PADDING, - ) - - pipeline = Pipeline(loader=loader, methods=[padding_method]) - runner = TaskRunner(pipeline=pipeline, reslice_dir=tmp_path, comm=MPI.COMM_WORLD) - sections = sectionize(pipeline) - section_padding = runner.determine_section_padding(sections[0]) - assert section_padding == PADDING - - -def test_determine_section_padding_one_padding_method_and_other_methods_in_section( - mocker: MockerFixture, - tmp_path: PathLike, -): - loader = make_test_loader(mocker) - - PADDING = (3, 5) - padding_method = make_test_method(mocker=mocker, padding=True) - mocker.patch.object( - target=padding_method, - attribute="calculate_padding", - return_value=PADDING, - ) - method_1 = make_test_method(mocker=mocker, padding=False) - method_2 = 
make_test_method(mocker=mocker, padding=False) - method_3 = make_test_method(mocker=mocker, padding=False) - - pipeline = Pipeline( - loader=loader, - methods=[method_1, method_2, padding_method, method_3], - ) - runner = TaskRunner(pipeline=pipeline, reslice_dir=tmp_path, comm=MPI.COMM_WORLD) - - sections = sectionize(pipeline) - assert len(sections[0]) == 4 - - section_padding = runner.determine_section_padding(sections[0]) - assert section_padding == PADDING - - -@pytest.mark.parametrize( - "nprocs, rank, next_section_slicing_dim, next_section_padding", - [ - (2, 1, 0, (0, 0)), - (2, 1, 0, (3, 5)), - (2, 1, 1, (0, 0)), - (2, 1, 1, (3, 5)), - (4, 2, 0, (0, 0)), - (4, 2, 0, (3, 5)), - (4, 2, 1, (0, 0)), - (4, 2, 1, (3, 5)), - ], - ids=[ - "2procs-proj-to-proj_unpadded", - "2procs-proj-to-proj_padded", - "2procs-proj-to-sino_unpadded", - "2procs-proj-to-sino_padded", - "4procs-proj-to-proj_unpadded", - "4procs-proj-to-proj_padded", - "4procs-proj-to-sino_unpadded", - "4procs-proj-to-sino_padded", - ], -) -def test_calculate_next_chunk_shape( - nprocs: int, - rank: int, - next_section_slicing_dim: int, - next_section_padding: Tuple[int, int], - mocker: MockerFixture, -): - GLOBAL_SHAPE = (1801, 2160, 2560) - - # Define mock communicator that reflects the desired data splitting/distribution to be - # tested - mock_global_comm = mocker.create_autospec(spec=MPI.Comm, size=nprocs, rank=rank) - - # The chunk shape for the next section should reflect the padding needed for that section - expected_next_chunk_shape: List[int] = list(GLOBAL_SHAPE) - start = round(GLOBAL_SHAPE[next_section_slicing_dim] / nprocs * rank) - stop = round(GLOBAL_SHAPE[next_section_slicing_dim] / nprocs * (rank + 1)) - slicing_dim_len = stop - start - expected_next_chunk_shape[next_section_slicing_dim] = ( - slicing_dim_len + next_section_padding[0] + next_section_padding[1] - ) - next_section_chunk_shape = calculate_next_chunk_shape( - comm=mock_global_comm, - global_shape=GLOBAL_SHAPE, - next_section_slicing_dim=next_section_slicing_dim, - next_section_padding=next_section_padding, - ) - assert next_section_chunk_shape == make_3d_shape_from_shape( - expected_next_chunk_shape - )
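
For reference, a minimal standalone sketch (not part of the diff; the helper name `chunk_nbytes` is ours) of the rule that `_non_last_section_in_pipeline` applies and that `determine_store_backing` reduces across MPI ranks, using the single-process large-padding numbers from the new tests:

import numpy as np


def chunk_nbytes(shape, dtype=np.float32) -> int:
    # Bytes occupied by a chunk of the given shape and dtype.
    return int(np.prod(shape)) * np.dtype(dtype).itemsize


# Single-process, large-padding case from the new tests: global shape (10, 300, 300),
# float32, padding (50, 50) on the sinogram slicing dim of the next section.
write_chunk = chunk_nbytes((10, 300, 300))  # chunk written by section 0, ~3.4 MiB
read_chunk = chunk_nbytes((10, 300 + 50 + 50, 300))  # padded chunk read by section 1, ~4.6 MiB

for limit in (7 * 1024**2, 10 * 1024**2):
    # Same rule as _non_last_section_in_pipeline: fall back to a file-based store when
    # both copies of the chunk would not fit within the memory limit.
    backing = "File" if limit > 0 and write_chunk + read_chunk >= limit else "RAM"
    print(f"{limit // 1024**2} MiB limit -> {backing}")  # 7 -> File, 10 -> RAM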