From 1cf9e28b3817d9dfb377eea4ba8d829200fbc9e5 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Fri, 6 Sep 2024 15:56:27 +0100 Subject: [PATCH 1/6] Move section padding calculator to `section.py` --- httomo/runner/section.py | 11 ++++- httomo/runner/task_runner.py | 8 ---- tests/runner/test_section.py | 66 +++++++++++++++++++++++++++++- tests/runner/test_task_runner.py | 70 -------------------------------- 4 files changed, 75 insertions(+), 80 deletions(-) diff --git a/httomo/runner/section.py b/httomo/runner/section.py index 479f7a602..69a43a5f0 100644 --- a/httomo/runner/section.py +++ b/httomo/runner/section.py @@ -1,5 +1,5 @@ import logging -from typing import Iterator, List, Optional +from typing import Iterator, List, Optional, Tuple from httomo.runner.output_ref import OutputRef from httomo.runner.pipeline import Pipeline @@ -142,3 +142,12 @@ def _set_method_patterns(sections: List[Section]): for s in sections: for m in s: m.pattern = s.pattern + + +def determine_section_padding(section: Section) -> Tuple[int, int]: + # NOTE: Assumes that only one method with padding will be in a section, which is + # consistent with the assumptions made by `sectionize()` + for method in section.methods: + if method.padding: + return method.calculate_padding() + return (0, 0) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index ee0f4169a..7d6fa0276 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -353,14 +353,6 @@ def determine_max_slices(self, section: Section, slicing_dim: int): section.max_slices = min(max_slices_methods) - def determine_section_padding(self, section: Section) -> Tuple[int, int]: - # NOTE: Assumes that only one method with padding will be in a section, which is - # consistent with the assumptions made by `section.sectionizer()` - for method in section.methods: - if method.padding: - return method.calculate_padding() - return (0, 0) - def calculate_next_chunk_shape( comm: MPI.Comm, diff --git a/tests/runner/test_section.py b/tests/runner/test_section.py index 5797c16b8..f453204fa 100644 --- a/tests/runner/test_section.py +++ b/tests/runner/test_section.py @@ -3,7 +3,7 @@ from pytest_mock import MockerFixture from httomo.runner.output_ref import OutputRef from httomo.runner.pipeline import Pipeline -from httomo.runner.section import sectionize, Section +from httomo.runner.section import determine_section_padding, sectionize, Section from httomo.utils import Pattern from ..testing_utils import make_test_loader, make_test_method @@ -308,3 +308,67 @@ def test_sectionizer_splits_section_if_multiple_padding_methods(mocker: MockerFi len(s[0]) == 3 ) # loader + 3 methods, as we want right before the next padding method assert len(s[1]) == 1 # just the last method with padding + + +def test_determine_section_padding_no_padding_method_in_section( + mocker: MockerFixture, +): + loader = make_test_loader(mocker) + method_1 = make_test_method(mocker=mocker, padding=False) + method_2 = make_test_method(mocker=mocker, padding=False) + method_3 = make_test_method(mocker=mocker, padding=False) + + pipeline = Pipeline( + loader=loader, + methods=[method_1, method_2, method_3], + ) + sections = sectionize(pipeline) + section_padding = determine_section_padding(sections[0]) + assert section_padding == (0, 0) + + +def test_determine_section_padding_one_padding_method_only_method_in_section( + mocker: MockerFixture, +): + loader = make_test_loader(mocker) + + PADDING = (3, 5) + padding_method = make_test_method(mocker=mocker, padding=True) + 
mocker.patch.object( + target=padding_method, + attribute="calculate_padding", + return_value=PADDING, + ) + + pipeline = Pipeline(loader=loader, methods=[padding_method]) + sections = sectionize(pipeline) + section_padding = determine_section_padding(sections[0]) + assert section_padding == PADDING + + +def test_determine_section_padding_one_padding_method_and_other_methods_in_section( + mocker: MockerFixture, +): + loader = make_test_loader(mocker) + + PADDING = (3, 5) + padding_method = make_test_method(mocker=mocker, padding=True) + mocker.patch.object( + target=padding_method, + attribute="calculate_padding", + return_value=PADDING, + ) + method_1 = make_test_method(mocker=mocker, padding=False) + method_2 = make_test_method(mocker=mocker, padding=False) + method_3 = make_test_method(mocker=mocker, padding=False) + + pipeline = Pipeline( + loader=loader, + methods=[method_1, method_2, padding_method, method_3], + ) + + sections = sectionize(pipeline) + assert len(sections[0]) == 4 + + section_padding = determine_section_padding(sections[0]) + assert section_padding == PADDING diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index 18a3a73ac..8f8104fca 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -478,76 +478,6 @@ def test_warns_with_multiple_stores_from_side_outputs( assert "Data saving or/and reslicing operation will be performed 4 times" in args[0] -def test_determine_section_padding_no_padding_method_in_section( - mocker: MockerFixture, - tmp_path: PathLike, -): - loader = make_test_loader(mocker) - method_1 = make_test_method(mocker=mocker, padding=False) - method_2 = make_test_method(mocker=mocker, padding=False) - method_3 = make_test_method(mocker=mocker, padding=False) - - pipeline = Pipeline( - loader=loader, - methods=[method_1, method_2, method_3], - ) - runner = TaskRunner(pipeline=pipeline, reslice_dir=tmp_path, comm=MPI.COMM_WORLD) - sections = sectionize(pipeline) - section_padding = runner.determine_section_padding(sections[0]) - assert section_padding == (0, 0) - - -def test_determine_section_padding_one_padding_method_only_method_in_section( - mocker: MockerFixture, - tmp_path: PathLike, -): - loader = make_test_loader(mocker) - - PADDING = (3, 5) - padding_method = make_test_method(mocker=mocker, padding=True) - mocker.patch.object( - target=padding_method, - attribute="calculate_padding", - return_value=PADDING, - ) - - pipeline = Pipeline(loader=loader, methods=[padding_method]) - runner = TaskRunner(pipeline=pipeline, reslice_dir=tmp_path, comm=MPI.COMM_WORLD) - sections = sectionize(pipeline) - section_padding = runner.determine_section_padding(sections[0]) - assert section_padding == PADDING - - -def test_determine_section_padding_one_padding_method_and_other_methods_in_section( - mocker: MockerFixture, - tmp_path: PathLike, -): - loader = make_test_loader(mocker) - - PADDING = (3, 5) - padding_method = make_test_method(mocker=mocker, padding=True) - mocker.patch.object( - target=padding_method, - attribute="calculate_padding", - return_value=PADDING, - ) - method_1 = make_test_method(mocker=mocker, padding=False) - method_2 = make_test_method(mocker=mocker, padding=False) - method_3 = make_test_method(mocker=mocker, padding=False) - - pipeline = Pipeline( - loader=loader, - methods=[method_1, method_2, padding_method, method_3], - ) - runner = TaskRunner(pipeline=pipeline, reslice_dir=tmp_path, comm=MPI.COMM_WORLD) - - sections = sectionize(pipeline) - assert len(sections[0]) == 4 - - 
section_padding = runner.determine_section_padding(sections[0]) - assert section_padding == PADDING - - @pytest.mark.parametrize( "nprocs, rank, next_section_slicing_dim, next_section_padding", [ From d04fa40ef6bdc587ecd07c888d710090bf5eb0df Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Fri, 6 Sep 2024 16:15:49 +0100 Subject: [PATCH 2/6] Move section chunk shape calculator out of runner --- httomo/runner/dataset_store_backing.py | 22 ++++++++ httomo/runner/task_runner.py | 20 ------- tests/runner/test_dataset_store_backing.py | 61 ++++++++++++++++++++++ tests/runner/test_task_runner.py | 58 +------------------- 4 files changed, 84 insertions(+), 77 deletions(-) create mode 100644 httomo/runner/dataset_store_backing.py create mode 100644 tests/runner/test_dataset_store_backing.py diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py new file mode 100644 index 000000000..d1938f1f9 --- /dev/null +++ b/httomo/runner/dataset_store_backing.py @@ -0,0 +1,22 @@ +from typing import Tuple + +from mpi4py import MPI + +from httomo.utils import make_3d_shape_from_shape + + +def calculate_section_chunk_shape( + comm: MPI.Comm, + global_shape: Tuple[int, int, int], + slicing_dim: int, + padding: Tuple[int, int], +) -> Tuple[int, int, int]: + """ + Calculate chunk shape (w/ or w/o padding) for a section. + """ + start = round((global_shape[slicing_dim] / comm.size) * comm.rank) + stop = round((global_shape[slicing_dim] / comm.size) * (comm.rank + 1)) + section_slicing_dim_len = stop - start + shape = list(global_shape) + shape[slicing_dim] = section_slicing_dim_len + padding[0] + padding[1] + return make_3d_shape_from_shape(shape) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 7d6fa0276..b71ed873f 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -29,7 +29,6 @@ log_exception, log_once, log_rank, - make_3d_shape_from_shape, ) import numpy as np @@ -352,22 +351,3 @@ def determine_max_slices(self, section: Section, slicing_dim: int): non_slice_dims_shape = output_dims section.max_slices = min(max_slices_methods) - - -def calculate_next_chunk_shape( - comm: MPI.Comm, - global_shape: Tuple[int, int, int], - next_section_slicing_dim: int, - next_section_padding: Tuple[int, int], -) -> Tuple[int, int, int]: - """ - Utility function for calculating the chunk shape (including padding) for the next section. 
- """ - start = round((global_shape[next_section_slicing_dim] / comm.size) * comm.rank) - stop = round((global_shape[next_section_slicing_dim] / comm.size) * (comm.rank + 1)) - next_section_slicing_dim_len = stop - start - shape = list(global_shape) - shape[next_section_slicing_dim] = ( - next_section_slicing_dim_len + next_section_padding[0] + next_section_padding[1] - ) - return make_3d_shape_from_shape(shape) diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py new file mode 100644 index 000000000..f4945e477 --- /dev/null +++ b/tests/runner/test_dataset_store_backing.py @@ -0,0 +1,61 @@ +from typing import List, Tuple + +import pytest +from mpi4py import MPI +from pytest_mock import MockerFixture + +from httomo.runner.dataset_store_backing import calculate_section_chunk_shape +from httomo.utils import make_3d_shape_from_shape + + +@pytest.mark.parametrize( + "nprocs, rank, section_slicing_dim, section_padding", + [ + (2, 1, 0, (0, 0)), + (2, 1, 0, (3, 5)), + (2, 1, 1, (0, 0)), + (2, 1, 1, (3, 5)), + (4, 2, 0, (0, 0)), + (4, 2, 0, (3, 5)), + (4, 2, 1, (0, 0)), + (4, 2, 1, (3, 5)), + ], + ids=[ + "2procs-proj-to-proj_unpadded", + "2procs-proj-to-proj_padded", + "2procs-proj-to-sino_unpadded", + "2procs-proj-to-sino_padded", + "4procs-proj-to-proj_unpadded", + "4procs-proj-to-proj_padded", + "4procs-proj-to-sino_unpadded", + "4procs-proj-to-sino_padded", + ], +) +def test_calculate_section_chunk_shape( + nprocs: int, + rank: int, + section_slicing_dim: int, + section_padding: Tuple[int, int], + mocker: MockerFixture, +): + GLOBAL_SHAPE = (1801, 2160, 2560) + + # Define mock communicator that reflects the desired data splitting/distribution to be + # tested + mock_global_comm = mocker.create_autospec(spec=MPI.Comm, size=nprocs, rank=rank) + + # The chunk shape for the section should reflect the padding needed for that section + expected_chunk_shape: List[int] = list(GLOBAL_SHAPE) + start = round(GLOBAL_SHAPE[section_slicing_dim] / nprocs * rank) + stop = round(GLOBAL_SHAPE[section_slicing_dim] / nprocs * (rank + 1)) + slicing_dim_len = stop - start + expected_chunk_shape[section_slicing_dim] = ( + slicing_dim_len + section_padding[0] + section_padding[1] + ) + section_chunk_shape = calculate_section_chunk_shape( + comm=mock_global_comm, + global_shape=GLOBAL_SHAPE, + slicing_dim=section_slicing_dim, + padding=section_padding, + ) + assert section_chunk_shape == make_3d_shape_from_shape(expected_chunk_shape) diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index 8f8104fca..77ae3ee36 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -15,10 +15,9 @@ from httomo.runner.output_ref import OutputRef from httomo.runner.pipeline import Pipeline from httomo.runner.section import Section, sectionize -from httomo.runner.task_runner import TaskRunner, calculate_next_chunk_shape +from httomo.runner.task_runner import TaskRunner from httomo.utils import ( Pattern, - make_3d_shape_from_shape, xp, gpu_enabled, ) @@ -476,58 +475,3 @@ def test_warns_with_multiple_stores_from_side_outputs( spy.assert_called() args, _ = spy.call_args assert "Data saving or/and reslicing operation will be performed 4 times" in args[0] - - -@pytest.mark.parametrize( - "nprocs, rank, next_section_slicing_dim, next_section_padding", - [ - (2, 1, 0, (0, 0)), - (2, 1, 0, (3, 5)), - (2, 1, 1, (0, 0)), - (2, 1, 1, (3, 5)), - (4, 2, 0, (0, 0)), - (4, 2, 0, (3, 5)), - (4, 2, 1, (0, 0)), - (4, 2, 1, (3, 5)), - ], - ids=[ - 
"2procs-proj-to-proj_unpadded", - "2procs-proj-to-proj_padded", - "2procs-proj-to-sino_unpadded", - "2procs-proj-to-sino_padded", - "4procs-proj-to-proj_unpadded", - "4procs-proj-to-proj_padded", - "4procs-proj-to-sino_unpadded", - "4procs-proj-to-sino_padded", - ], -) -def test_calculate_next_chunk_shape( - nprocs: int, - rank: int, - next_section_slicing_dim: int, - next_section_padding: Tuple[int, int], - mocker: MockerFixture, -): - GLOBAL_SHAPE = (1801, 2160, 2560) - - # Define mock communicator that reflects the desired data splitting/distribution to be - # tested - mock_global_comm = mocker.create_autospec(spec=MPI.Comm, size=nprocs, rank=rank) - - # The chunk shape for the next section should reflect the padding needed for that section - expected_next_chunk_shape: List[int] = list(GLOBAL_SHAPE) - start = round(GLOBAL_SHAPE[next_section_slicing_dim] / nprocs * rank) - stop = round(GLOBAL_SHAPE[next_section_slicing_dim] / nprocs * (rank + 1)) - slicing_dim_len = stop - start - expected_next_chunk_shape[next_section_slicing_dim] = ( - slicing_dim_len + next_section_padding[0] + next_section_padding[1] - ) - next_section_chunk_shape = calculate_next_chunk_shape( - comm=mock_global_comm, - global_shape=GLOBAL_SHAPE, - next_section_slicing_dim=next_section_slicing_dim, - next_section_padding=next_section_padding, - ) - assert next_section_chunk_shape == make_3d_shape_from_shape( - expected_next_chunk_shape - ) From 1429838ee40ba9f5cf5ac775156e0de0b4df716d Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Mon, 9 Sep 2024 11:14:44 +0100 Subject: [PATCH 3/6] Add section output chunk bytes calculator --- httomo/runner/dataset_store_backing.py | 30 +++++- tests/runner/test_dataset_store_backing.py | 118 ++++++++++++++++++++- 2 files changed, 145 insertions(+), 3 deletions(-) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index d1938f1f9..925027ad3 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -1,8 +1,11 @@ from typing import Tuple +import numpy as np +from numpy.typing import DTypeLike from mpi4py import MPI -from httomo.utils import make_3d_shape_from_shape +from httomo.runner.section import Section +from httomo.utils import _get_slicing_dim, make_3d_shape_from_shape def calculate_section_chunk_shape( @@ -20,3 +23,28 @@ def calculate_section_chunk_shape( shape = list(global_shape) shape[slicing_dim] = section_slicing_dim_len + padding[0] + padding[1] return make_3d_shape_from_shape(shape) + + +def calculate_section_chunk_bytes( + chunk_shape: Tuple[int, int, int], + dtype: DTypeLike, + section: Section, +) -> int: + """ + Calculate the number of bytes in the section output chunk that is written to the store. Ths + accounts for data's non-slicing dims changing during processing, which changes the chunk + shape for the section and thus affects the number of bytes in the chunk. 
+ """ + slicing_dim = _get_slicing_dim(section.pattern) - 1 + non_slice_dims_list = list(chunk_shape) + non_slice_dims_list.pop(slicing_dim) + non_slice_dims = (non_slice_dims_list[0], non_slice_dims_list[1]) + + for method in section.methods: + if method.memory_gpu is None: + continue + non_slice_dims = method.calculate_output_dims(non_slice_dims) + + return int( + np.prod(non_slice_dims) * chunk_shape[slicing_dim] * np.dtype(dtype).itemsize + ) diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py index f4945e477..fac4a1a3c 100644 --- a/tests/runner/test_dataset_store_backing.py +++ b/tests/runner/test_dataset_store_backing.py @@ -1,11 +1,19 @@ from typing import List, Tuple import pytest +import numpy as np from mpi4py import MPI from pytest_mock import MockerFixture -from httomo.runner.dataset_store_backing import calculate_section_chunk_shape -from httomo.utils import make_3d_shape_from_shape +from httomo.runner.dataset_store_backing import ( + calculate_section_chunk_shape, + calculate_section_chunk_bytes, +) +from httomo.runner.methods_repository_interface import GpuMemoryRequirement +from httomo.runner.pipeline import Pipeline +from httomo.runner.section import sectionize +from httomo.utils import Pattern, make_3d_shape_from_shape +from tests.testing_utils import make_test_loader, make_test_method @pytest.mark.parametrize( @@ -59,3 +67,109 @@ def test_calculate_section_chunk_shape( padding=section_padding, ) assert section_chunk_shape == make_3d_shape_from_shape(expected_chunk_shape) + + +def test_calculate_section_chunk_bytes_output_dims_change(mocker: MockerFixture): + NEW_OUTPUT_DIMS = (300, 400) + SECTION_INPUT_CHUNK_SHAPE = (100, 10, 100) + DTYPE = np.float32 + EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE = ( + SECTION_INPUT_CHUNK_SHAPE[0], + NEW_OUTPUT_DIMS[0], + NEW_OUTPUT_DIMS[1], + ) + EXPECTED_SECTION_OUTPUT_CHUNK_BYTES = ( + np.prod(EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE) * np.dtype(DTYPE).itemsize + ) + + # Define methods to form section, one of which changes the output shape + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method( + mocker=mocker, method_name="m2", pattern=Pattern.projection, gpu=True + ) + mocker.patch.object( + m2, + "memory_gpu", + [GpuMemoryRequirement(multiplier=2.0, method="direct")], + ) + mocker.patch.object( + target=m2, + attribute="calculate_output_dims", + return_value=NEW_OUTPUT_DIMS, + ) + + # Generate list of sections + pipeline = Pipeline(loader=loader, methods=[m1, m2]) + sections = sectionize(pipeline) + + # Check that the number of bytes in the chunk accounts for the non-slicing dims change by + # the method in the section + section_output_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=SECTION_INPUT_CHUNK_SHAPE, + dtype=DTYPE, + section=sections[0], + ) + assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES + + +def test_calculate_section_chunk_bytes_output_dims_change_and_swap( + mocker: MockerFixture, +): + RECON_SIZE = 400 + SECTION_INPUT_CHUNK_SHAPE = (100, 10, 100) + DTYPE = np.float32 + EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE = ( + SECTION_INPUT_CHUNK_SHAPE[1], + RECON_SIZE, + RECON_SIZE, + ) + EXPECTED_SECTION_OUTPUT_CHUNK_BYTES = ( + np.prod(EXPECTED_SECTION_OUTPUT_CHUNK_SHAPE) * np.dtype(DTYPE).itemsize + ) + + # Define methods to form section, one of which changes the output shape and also swaps the + # output dims 0 and 1 (which can happen in recon methods) + loader 
= make_test_loader(mocker=mocker) + m1 = make_test_method( + mocker=mocker, method_name="stripe-removal", pattern=Pattern.sinogram + ) + # NOTE: The reason that `swap_dims_on_output=True` is not passed to `make_test_method()` + # even though `m2` is assumed to be swapping the output dims is because that doesn't + # actually do anything when not running the method on data. + # + # The output dims being swapped is not taken into account when calling + # `calculate_output_dims()` on a method wrapper object, and the output dims swapping is + # implicitly done when the method executes. + # + # It however still seemed reasonable to check that the correct chunk bytes value was given + # if there's a method in a section that swaps the output dims. + m2 = make_test_method( + mocker=mocker, + method_name="recon", + pattern=Pattern.sinogram, + gpu=True, + ) + mocker.patch.object( + m2, + "memory_gpu", + [GpuMemoryRequirement(multiplier=2.0, method="direct")], + ) + mocker.patch.object( + target=m2, + attribute="calculate_output_dims", + return_value=(RECON_SIZE, RECON_SIZE), + ) + + # Generate list of sections + pipeline = Pipeline(loader=loader, methods=[m1, m2]) + sections = sectionize(pipeline) + + # Check that the number of bytes in the chunk accounts for the non-slicing dims change by + # the method in the section + section_output_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=SECTION_INPUT_CHUNK_SHAPE, + dtype=DTYPE, + section=sections[0], + ) + assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES From 7205fcdfad4877ba673bc597f2c1c0af7403ead1 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Mon, 9 Sep 2024 11:34:18 +0100 Subject: [PATCH 4/6] Move sectionising to task runner's constructor --- httomo/runner/task_runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index b71ed873f..0536a33f6 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -55,12 +55,13 @@ def __init__( self._memory_limit_bytes = memory_limit_bytes + self._sections = self._sectionize() + def execute(self) -> None: with catchtime() as t: - sections = self._sectionize() self._prepare() - for i, section in enumerate(sections): + for i, section in enumerate(self._sections): self._execute_section(section, i) gpumem_cleanup() From 30114fff86ce3b7844437da7a562c2169b9bd382 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Mon, 9 Sep 2024 15:51:23 +0100 Subject: [PATCH 5/6] Add function to determine the store backing For any section that is not the last section, approximately two chunk copies need to be accounted for: - the unpadded chunk that the writer of section `n` creates - the padded chunk that the reader of section `n+1` creates For the last section in a pipeline, only one chunk copy needs to be accounted for: the unpadded chunk that the writer of that section creates. 
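
As an illustrative sketch of what this check amounts to for a non-last
section (the byte counts below are hypothetical, chosen only to show the
comparison; the real values come from `calculate_section_chunk_bytes` and
`calculate_section_chunk_shape`):

    from httomo.runner.dataset_store_backing import DataSetStoreBacking

    memory_limit_bytes = 4 * 1024**2                # hypothetical 4 MiB limit
    # unpadded chunk written by the writer of section `n`, e.g. a
    # (5, 300, 300) float32 chunk
    write_chunk_bytes = 5 * 300 * 300 * 4
    # padded chunk created by the reader of section `n+1`, padded by (3, 5)
    # in the slicing dim
    read_chunk_bytes = (5 + 3 + 5) * 300 * 300 * 4
    if memory_limit_bytes > 0 and write_chunk_bytes + read_chunk_bytes >= memory_limit_bytes:
        backing = DataSetStoreBacking.File          # both copies don't fit in RAM
    else:
        backing = DataSetStoreBacking.RAM

For the last section only `write_chunk_bytes` enters the comparison, and the
per-process result is combined with a logical-OR reduction across the MPI
communicator, so all ranks fall back to a file-backed store if any single
rank would exceed the limit.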
--- httomo/runner/dataset_store_backing.py | 132 +++++++- tests/runner/test_dataset_store_backing.py | 340 +++++++++++++++++++++ 2 files changed, 470 insertions(+), 2 deletions(-) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index 925027ad3..250b03d66 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -1,10 +1,11 @@ -from typing import Tuple +from enum import Enum +from typing import Callable, List, ParamSpec, Tuple import numpy as np from numpy.typing import DTypeLike from mpi4py import MPI -from httomo.runner.section import Section +from httomo.runner.section import Section, determine_section_padding from httomo.utils import _get_slicing_dim, make_3d_shape_from_shape @@ -48,3 +49,130 @@ def calculate_section_chunk_bytes( return int( np.prod(non_slice_dims) * chunk_shape[slicing_dim] * np.dtype(dtype).itemsize ) + + +class DataSetStoreBacking(Enum): + RAM = 1 + File = 2 + + +P = ParamSpec("P") + + +def _reduce_decorator_factory( + comm: MPI.Comm, +) -> Callable[[Callable[P, DataSetStoreBacking]], Callable[P, DataSetStoreBacking]]: + """ + Generate decorator for store-backing calculator function that will use the given MPI + communicator for the reduce operation. + """ + + def reduce_decorator( + func: Callable[P, DataSetStoreBacking] + ) -> Callable[P, DataSetStoreBacking]: + """ + Decorator for store-backing calculator function. + """ + + def wrapper(*args: P.args, **kwargs: P.kwargs) -> DataSetStoreBacking: + """ + Perform store-backing calculation across all MPI processes and reduce. + """ + # reduce store backing enum variant across all processes - if any has + # `File` variant, all should use a file + send_buffer = np.zeros(1, dtype=bool) + recv_buffer = np.zeros(1, dtype=bool) + store_backing = func(*args, **kwargs) + + if store_backing is DataSetStoreBacking.File: + send_buffer[0] = True + + # do a logical or of all the enum variants across the processes + comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR) + + if bool(recv_buffer[0]) is True: + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM + + return wrapper + + return reduce_decorator + + +def _non_last_section_in_pipeline( + memory_limit_bytes: int, + write_chunk_bytes: int, + read_chunk_bytes: int, +) -> DataSetStoreBacking: + """ + Calculate backing of dataset store for non-last sections in pipeline + """ + if ( + memory_limit_bytes > 0 + and write_chunk_bytes + read_chunk_bytes >= memory_limit_bytes + ): + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM + + +def _last_section_in_pipeline( + memory_limit_bytes: int, + write_chunk_bytes: int, +) -> DataSetStoreBacking: + """ + Calculate backing of dataset store for last section in pipeline + """ + if memory_limit_bytes > 0 and write_chunk_bytes >= memory_limit_bytes: + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM + + +def determine_store_backing( + comm: MPI.Comm, + sections: List[Section], + memory_limit_bytes: int, + dtype: DTypeLike, + global_shape: Tuple[int, int, int], + section_idx: int, +) -> DataSetStoreBacking: + reduce_decorator = _reduce_decorator_factory(comm) + + # Get chunk shape input to section + current_chunk_shape = calculate_section_chunk_shape( + comm=comm, + global_shape=global_shape, + slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, + padding=(0, 0), + ) + + # Get the number of bytes in the input chunk to the section w/ potential modifications to + # the 
non-slicing dims + current_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=current_chunk_shape, + dtype=dtype, + section=sections[section_idx], + ) + + if section_idx == len(sections) - 1: + return reduce_decorator(_last_section_in_pipeline)( + memory_limit_bytes=memory_limit_bytes, + write_chunk_bytes=current_chunk_bytes, + ) + + # Get chunk shape created by reader of section `n+1`, that will add padding to the + # chunk shape written by the writer of section `n` + next_chunk_shape = calculate_section_chunk_shape( + comm=comm, + global_shape=global_shape, + slicing_dim=_get_slicing_dim(sections[section_idx + 1].pattern) - 1, + padding=determine_section_padding(sections[section_idx + 1]), + ) + next_chunk_bytes = int(np.prod(next_chunk_shape) * np.dtype(dtype).itemsize) + return reduce_decorator(_non_last_section_in_pipeline)( + memory_limit_bytes=memory_limit_bytes, + write_chunk_bytes=current_chunk_bytes, + read_chunk_bytes=next_chunk_bytes, + ) diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py index fac4a1a3c..7b61cef7b 100644 --- a/tests/runner/test_dataset_store_backing.py +++ b/tests/runner/test_dataset_store_backing.py @@ -6,8 +6,10 @@ from pytest_mock import MockerFixture from httomo.runner.dataset_store_backing import ( + DataSetStoreBacking, calculate_section_chunk_shape, calculate_section_chunk_bytes, + determine_store_backing, ) from httomo.runner.methods_repository_interface import GpuMemoryRequirement from httomo.runner.pipeline import Pipeline @@ -173,3 +175,341 @@ def test_calculate_section_chunk_bytes_output_dims_change_and_swap( section=sections[0], ) assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES + + +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (3 * 1024**2, DataSetStoreBacking.File), + (4 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"], +) +def test_determine_store_backing_last_section_pipeline_single_proc( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For a single process, chunk shape = global shape + # + # The dtype and shape combined makes: + # - the write chunk ~3.4MB + # - the read chunk also ~3.4MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + method = make_test_method( + mocker=mocker, method_name="method", pattern=Pattern.projection + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[method], + ) + sections = sectionize(pipeline) + + # Based on memory limit and the given section in the pipeline, determine the backing of the + # store for the execution of that section + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.mpi +@pytest.mark.skipif( + MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" +) +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (1 * 1024**2, DataSetStoreBacking.File), + (2 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["1MB-limit-file-backing", "2MB-limit-ram-backing"], +) +def test_determine_store_backing_last_section_pipeline_two_procs( + mocker: MockerFixture, + memory_limit: int, + 
expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For two processes, chunk shape = half of global shape + # + # The dtype and shape combined makes: + # - the write chunk ~1.7MB + # - the read chunk also ~1.7MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + method = make_test_method( + mocker=mocker, method_name="method", pattern=Pattern.projection + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[method], + ) + sections = sectionize(pipeline) + + # Based on memory limit and the given section in the pipeline, determine the backing of the + # store for the execution of that section + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (6 * 1024**2, DataSetStoreBacking.File), + (7 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["6MB-limit-file-backing", "7MB-limit-ram-backing"], +) +def test_determine_store_backing_non_last_section_pipeline_single_proc( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For a single process, chunk shape = global shape + # + # The dtype and shape combined makes: + # - the write chunk ~3.4MB + # - the read chunk also ~3.4MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + + # For execution of non-last sections in pipelines, the writer must take into account that a + # copy of the chunk is made by the reader of the following section. Therefore, two copies + # of the chunk must be taken into account when deciding the backing of the store. + # + # Note that section 0 is only the section that is "not the last section", so it's the only + # one that will need to account for two copies of the chunk, and thus the main target of + # the test. Hence, why `section_idx=0` is given. 
+ store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.mpi +@pytest.mark.skipif( + MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" +) +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (3 * 1024**2, DataSetStoreBacking.File), + (4 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"], +) +def test_determine_store_backing_non_last_section_pipeline_two_procs( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For two processes, chunk shape = half of global shape + # + # The dtype and shape combined makes: + # - the write chunk ~1.7MB + # - the read chunk also ~1.7MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + + # For exeuction of non-last sections in pipelines, the writer must take into account that a + # copy of the chunk is made by the reader of the following section. Therefore, two copies + # of the chunk must be taken into account when deciding the backing of the store. + # + # Note that section 0 is only the section that is "not the last section", so it's the only + # one that will need to account for two copies of the chunk, and thus the main target of + # the test. Hence, why `section_idx=0` is given. + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (7 * 1024**2, DataSetStoreBacking.File), + (10 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["7MB-limit-file-backing", "10MB-limit-ram-backing"], +) +def test_determine_store_backing_non_last_section_pipeline_large_padding_single_proc( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For a single process, chunk shape = global shape + # + # The dtype, shape, and padding combined makes: + # - the write chunk ~3.4MB + # - the read chunk ~5.7MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + PADDING = (50, 50) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method( + mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True + ) + mocker.patch.object( + target=m2, + attribute="calculate_padding", + return_value=PADDING, + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + + # For execution of non-last sections in pipelines, the writer must take into account that a + # copy of the chunk is made by the reader of the following section. 
Therefore, two copies + # of the chunk must be taken into account when deciding the backing of the store. + # + # Note that section 0 is only the section that is "not the last section", so it's the only + # one that will need to account for two copies of the chunk, and thus the main target of + # the test. Hence, why `section_idx=0` is given. + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing + + +@pytest.mark.mpi +@pytest.mark.skipif( + MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" +) +@pytest.mark.parametrize( + "memory_limit, expected_store_backing", + [ + (4 * 1024**2, DataSetStoreBacking.File), + (5 * 1024**2, DataSetStoreBacking.RAM), + ], + ids=["4MB-limit-file-backing", "5MB-limit-ram-backing"], +) +def test_determine_store_backing_non_last_section_pipeline_large_padding_two_procs( + mocker: MockerFixture, + memory_limit: int, + expected_store_backing: DataSetStoreBacking, +): + COMM = MPI.COMM_WORLD + + # For a single process, chunk shape = global shape + # + # The dtype, shape, and padding combined makes: + # - the write chunk ~1.7MB + # - the read chunk ~2.8MB + DTYPE = np.float32 + GLOBAL_SHAPE = (10, 300, 300) + PADDING = (50, 50) + + # Define dummy loader and method wrapper objects + loader = make_test_loader(mocker=mocker) + m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) + m2 = make_test_method( + mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True + ) + mocker.patch.object( + target=m2, + attribute="calculate_padding", + return_value=PADDING, + ) + + # Get list of section objects that represent pipeline + pipeline = Pipeline( + loader=loader, + methods=[m1, m2], + ) + sections = sectionize(pipeline) + + # For execution of non-last sections in pipelines, the writer must take into account that a + # copy of the chunk is made by the reader of the following section. Therefore, two copies + # of the chunk must be taken into account when deciding the backing of the store. + # + # Note that section 0 is only the section that is "not the last section", so it's the only + # one that will need to account for two copies of the chunk, and thus the main target of + # the test. Hence, why `section_idx=0` is given. + store_backing = determine_store_backing( + comm=COMM, + sections=sections, + memory_limit_bytes=memory_limit, + dtype=DTYPE, + global_shape=GLOBAL_SHAPE, + section_idx=0, + ) + assert store_backing is expected_store_backing From c7ed8b2752ebe743efe14cc2a5010b467605f472 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Tue, 10 Sep 2024 15:48:16 +0100 Subject: [PATCH 6/6] Move store backing calculation out of dataset writer Prior to this change, the decision of whether the dataset store was backed by RAM or an hdf5 file was made by the writer. Specifically, the writer was deciding whether the store should be backed by RAM or an hdf5 file when the first block was written to the store. This made a lot of sense when the determining of the store-backing was done by: - attempting to create a numpy array with the necessary chunk shape (thus allocating the required memory) - potentially catching a `MemoryError` being raised by python However, some changes were made in #218 to accommodate the fact that, running under SLURM, the job would be OOM killed by SLURM without giving python a chance to raise a `MemoryError`. 
Among these changes were that, instead of allocating a numpy array and then catching if a `MemoryError` was raised, the required number of bytes was calculated to determine if the numpy array should be created or not. This essentially made the determining of the store backing be an operation purely involving some simple arithmetic. The arithmetic could conceivably be done outside of the writer (ie, prior to the first block being written to the store). One benefit of this would be that the decision of the store backing wouldn't be so hidden as to where it was happening. Furthermore, the requirement of needing to account for two copies of the chunk in memory (see the following comment and the linked thread in the comment for more details https://github.com/DiamondLightSource/httomo/pull/401#issuecomment-2269256537), was difficult to fulfil if the writer was the object deciding what the backing of the store should be (ie, getting the chunk information about the next section, to the writer of the current section, appeared to be tricky). With the above in mind, the calculation of if the store should be backed by RAM or an hdf5 file has been moved out of the writer, and is now performed by the task runner. The runner determines the backing of the store, and passes that information to the writer's constructor. The writer now simply uses whichever backing it is told to use by the runner. --- httomo/data/dataset_store.py | 35 ++---- httomo/runner/task_runner.py | 16 ++- tests/data/test_dataset_store.py | 179 +++++++------------------------ tests/runner/test_task_runner.py | 15 ++- 4 files changed, 70 insertions(+), 175 deletions(-) diff --git a/httomo/data/dataset_store.py b/httomo/data/dataset_store.py index 0a9c79e8b..52ed911f7 100644 --- a/httomo/data/dataset_store.py +++ b/httomo/data/dataset_store.py @@ -29,6 +29,7 @@ from httomo.data.padding import extrapolate_after, extrapolate_before from httomo.runner.auxiliary_data import AuxiliaryData from httomo.runner.dataset import DataSetBlock +from httomo.runner.dataset_store_backing import DataSetStoreBacking from httomo.runner.dataset_store_interfaces import ( DataSetSource, ReadableDataSetSink, @@ -57,7 +58,7 @@ def __init__( slicing_dim: Literal[0, 1, 2], comm: MPI.Comm, temppath: PathLike, - memory_limit_bytes: int = 0, + store_backing: DataSetStoreBacking = DataSetStoreBacking.RAM, ): self._slicing_dim = slicing_dim self._comm = comm @@ -66,7 +67,7 @@ def __init__( self._readonly = False self._h5file: Optional[h5py.File] = None self._h5filename: Optional[Path] = None - self._memory_limit_bytes: int = memory_limit_bytes + self._store_backing = store_backing self._data: Optional[Union[np.ndarray, h5py.Dataset]] = None @@ -79,7 +80,7 @@ def __init__( @property def is_file_based(self) -> bool: - return self._h5filename is not None + return self._store_backing is DataSetStoreBacking.File @property def filename(self) -> Optional[Path]: @@ -177,23 +178,12 @@ def _get_global_h5_filename(self) -> PathLike: return self._h5filename def _create_new_data(self, block: DataSetBlock): - # reduce memory errors across all processes - if any has a memory problem, - # all should use a file - sendBuffer = np.zeros(1, dtype=bool) - recvBuffer = np.zeros(1, dtype=bool) - try: + if self._store_backing is DataSetStoreBacking.RAM: self._data = self._create_numpy_data( unpadded_chunk_shape=block.chunk_shape_unpadded, - padded_chunk_shape=block.chunk_shape, dtype=block.data.dtype, ) - except MemoryError: - sendBuffer[0] = True - - # do a logical or of all the memory 
errors across the processes - self.comm.Allreduce([sendBuffer, MPI.BOOL], [recvBuffer, MPI.BOOL], MPI.LOR) - - if bool(recvBuffer[0]) is True: + else: log_once( "Chunk does not fit in memory - using a file-based store", level=logging.WARNING, @@ -208,20 +198,9 @@ def _create_new_data(self, block: DataSetBlock): ) def _create_numpy_data( - self, - unpadded_chunk_shape: Tuple[int, int, int], - padded_chunk_shape: Tuple[int, int, int], - dtype: DTypeLike, + self, unpadded_chunk_shape: Tuple[int, int, int], dtype: DTypeLike ) -> np.ndarray: """Convenience method to enable mocking easily""" - unpadded_chunk_bytes = np.prod(unpadded_chunk_shape) * np.dtype(dtype).itemsize - padded_chunk_bytes = np.prod(padded_chunk_shape) * np.dtype(dtype).itemsize - if ( - self._memory_limit_bytes > 0 - and unpadded_chunk_bytes + padded_chunk_bytes >= self._memory_limit_bytes - ): - raise MemoryError("Memory limit reached") - return np.empty(unpadded_chunk_shape, dtype) def _create_h5_data( diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 0536a33f6..d84367237 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -9,6 +9,7 @@ import httomo.globals from httomo.data.dataset_store import DataSetStoreWriter +from httomo.runner.dataset_store_backing import determine_store_backing from httomo.runner.method_wrapper import MethodWrapper from httomo.runner.block_split import BlockSplitter from httomo.runner.dataset import DataSetBlock @@ -81,7 +82,7 @@ def _sectionize(self) -> List[Section]: return sections def _execute_section(self, section: Section, section_index: int = 0): - self._setup_source_sink(section) + self._setup_source_sink(section, section_index) assert self.source is not None, "Dataset has not been loaded yet" assert self.sink is not None, "Sink setup failed" @@ -162,7 +163,7 @@ def _execute_section(self, section: Section, section_index: int = 0): level=logging.INFO, ) - def _setup_source_sink(self, section: Section): + def _setup_source_sink(self, section: Section, idx: int): assert self.source is not None, "Dataset has not been loaded yet" slicing_dim_section: Literal[0, 1] = _get_slicing_dim(section.pattern) - 1 # type: ignore @@ -173,6 +174,15 @@ def _setup_source_sink(self, section: Section): assert isinstance(self.sink, ReadableDataSetSink) self.source = self.sink.make_reader(slicing_dim_section) + store_backing = determine_store_backing( + comm=self.comm, + sections=self._sections, + memory_limit_bytes=self._memory_limit_bytes, + dtype=self.source.dtype, + global_shape=self.source.global_shape, + section_idx=idx, + ) + if section.is_last: # we don't need to store the results - this sink just discards it self.sink = DummySink(slicing_dim_section) @@ -181,7 +191,7 @@ def _setup_source_sink(self, section: Section): slicing_dim_section, self.comm, self.reslice_dir, - memory_limit_bytes=self._memory_limit_bytes, + store_backing=store_backing, ) def _execute_section_block( diff --git a/tests/data/test_dataset_store.py b/tests/data/test_dataset_store.py index 7ff1fdb67..ec19270df 100644 --- a/tests/data/test_dataset_store.py +++ b/tests/data/test_dataset_store.py @@ -1,6 +1,6 @@ from os import PathLike from pathlib import Path -from typing import List, Literal +from typing import Literal from unittest.mock import ANY import numpy as np import pytest @@ -11,6 +11,7 @@ from httomo.runner.auxiliary_data import AuxiliaryData from httomo.runner.dataset import DataSetBlock +from httomo.runner.dataset_store_backing import DataSetStoreBacking from httomo.utils 
import make_3d_shape_from_shape @@ -59,14 +60,18 @@ def test_reader_throws_if_no_data(tmp_path: PathLike): assert "no data" in str(e) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_can_write_and_read_blocks( - mocker: MockerFixture, tmp_path: PathLike, file_based: bool + tmp_path: PathLike, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) GLOBAL_SHAPE = (10, 10, 10) @@ -94,8 +99,6 @@ def test_can_write_and_read_blocks( chunk_start=chunk_start, ) - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(block1) writer.write_block(block2) @@ -116,25 +119,22 @@ def test_can_write_and_read_blocks( np.testing.assert_array_equal(rblock2.data, block2.data) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_write_after_read_throws( - mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike, - file_based: bool, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) - - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) - writer.make_reader() - with pytest.raises(ValueError): writer.write_block(dummy_block) @@ -146,9 +146,9 @@ def test_writer_closes_file_on_finalize( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=DataSetStoreBacking.File, ) - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) fileclose = mocker.patch.object(writer._h5file, "close") writer.finalize() @@ -157,16 +157,15 @@ def test_writer_closes_file_on_finalize( def test_making_reader_closes_file_and_deletes( - mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike + dummy_block: DataSetBlock, tmp_path: PathLike ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=DataSetStoreBacking.File, ) - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) - reader = writer.make_reader() assert writer._h5file is None @@ -340,16 +339,16 @@ def test_writing_inconsistent_global_index_fails(tmp_path: PathLike): assert "inconsistent shape" in str(e) -def test_create_new_data_goes_to_file_on_memory_error( +def test_create_new_data_goes_to_file_for_file_store_backing( mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=DataSetStoreBacking.File, ) - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) createh5_mock = mocker.patch.object( writer, "_create_h5_data", return_value=dummy_block.data ) @@ -364,46 +363,6 @@ def test_create_new_data_goes_to_file_on_memory_error( ) -def test_create_new_data_goes_to_file_on_memory_limit( - mocker: MockerFixture, tmp_path: PathLike -): - GLOBAL_SHAPE = (500, 10, 10) - data = np.ones(GLOBAL_SHAPE, dtype=np.float32) - aux_data = AuxiliaryData( - angles=np.ones(data.shape[0], dtype=np.float32), - darks=2.0 * np.ones((2, GLOBAL_SHAPE[1], GLOBAL_SHAPE[2]), dtype=np.float32), - flats=1.0 * np.ones((2, 
GLOBAL_SHAPE[1], GLOBAL_SHAPE[2]), dtype=np.float32), - ) - block = DataSetBlock( - data=data[0:2, :, :], - aux_data=aux_data, - slicing_dim=0, - block_start=0, - chunk_start=0, - global_shape=GLOBAL_SHAPE, - chunk_shape=GLOBAL_SHAPE, - ) - writer = DataSetStoreWriter( - slicing_dim=0, - comm=MPI.COMM_WORLD, - temppath=tmp_path, - memory_limit_bytes=block.data.nbytes + 5, # only one block will fit in memory - ) - - createh5_mock = mocker.patch.object( - writer, "_create_h5_data", return_value=block.data - ) - - writer.write_block(block) - - createh5_mock.assert_called_with( - writer.global_shape, - block.data.dtype, - ANY, - writer.comm, - ) - - def test_calls_reslice( mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike ): @@ -422,24 +381,25 @@ def test_calls_reslice( reslice_mock.assert_called_with(0, 1, d) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_reslice_single_block_single_process( - mocker: MockerFixture, dummy_block: DataSetBlock, tmp_path: PathLike, - file_based: bool, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(dummy_block) - assert writer.is_file_based is file_based + expected_is_file_based = store_backing is DataSetStoreBacking.File + assert writer.is_file_based is expected_is_file_based reader = writer.make_reader(new_slicing_dim=1) @@ -456,7 +416,7 @@ def test_reslice_single_block_single_process( assert block.chunk_shape == reader.chunk_shape assert isinstance(reader, DataSetStoreReader) - assert reader.is_file_based is file_based + assert reader.is_file_based is expected_is_file_based np.testing.assert_array_equal(block.data, dummy_block.data[:, 1:3, :]) np.testing.assert_array_equal(block.flats, dummy_block.flats) @@ -469,18 +429,12 @@ def test_reslice_single_block_single_process( MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" ) @pytest.mark.parametrize( - "out_of_memory_ranks", - [[], [1], [0, 1]], - ids=[ - "out_of_memory=none", - "out_of_memory=one process", - "out_of_memory=both processes", - ], + "store_backing", + [DataSetStoreBacking.RAM, DataSetStoreBacking.File], ) def test_full_integration_with_reslice( - mocker: MockerFixture, tmp_path: PathLike, - out_of_memory_ranks: List[int], + store_backing: DataSetStoreBacking, ): ########### ARRANGE DATA and mocks @@ -512,12 +466,9 @@ def test_full_integration_with_reslice( slicing_dim=0, comm=comm, temppath=tmp_path, + store_backing=store_backing, ) - if comm.rank in out_of_memory_ranks: - # make it throw an exception, so it reverts to file-based store - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) - ############# ACT # write full chunk-sized block @@ -560,14 +511,18 @@ def test_full_integration_with_reslice( ) -@pytest.mark.parametrize("file_based", [False, True]) +@pytest.mark.parametrize( + "store_backing", [DataSetStoreBacking.RAM, DataSetStoreBacking.File] +) def test_can_write_blocks_with_padding_and_read( - mocker: MockerFixture, tmp_path: PathLike, file_based: bool + tmp_path: PathLike, + store_backing: DataSetStoreBacking, ): writer = DataSetStoreWriter( slicing_dim=0, comm=MPI.COMM_WORLD, temppath=tmp_path, + store_backing=store_backing, ) GLOBAL_SHAPE = (10, 10, 10) @@ -616,8 +571,6 @@ def 
test_can_write_blocks_with_padding_and_read( padding=padding, ) - if file_based: - mocker.patch.object(writer, "_create_numpy_data", side_effect=MemoryError) writer.write_block(block1) writer.write_block(block2) @@ -1045,63 +998,3 @@ def test_adapts_shapes_with_padding_and_reslicing_mpi( np.testing.assert_array_equal(block1.data, b1expected) np.testing.assert_array_equal(block2.data, b2expected) - - -def test_write_block_accounts_for_two_chunk_copies(tmp_path: PathLike): - MEMORY_LIMIT = 4 * 1024**2 # 4MB - - # Core chunk array size ~3.4MB - # Padded chunk array size ~4.8MB - # -> Core chunk array fits within memory limit, but core chunk + padded chunk doesn't - PADDING = (2, 2) - SLICING_DIM = 0 - DTYPE = np.float32 - CORE_CHUNK_SHAPE = (10, 300, 300) - PADDED_CHUNK_SHAPE = ( - CORE_CHUNK_SHAPE[0] + PADDING[0] + PADDING[1], - CORE_CHUNK_SHAPE[1], - CORE_CHUNK_SHAPE[2], - ) - - # Create block to later write to the store - data = np.ones( - ( - CORE_CHUNK_SHAPE[0] // 2 + PADDING[0] + PADDING[1], - CORE_CHUNK_SHAPE[1], - CORE_CHUNK_SHAPE[2], - ), - dtype=DTYPE, - ) - block = DataSetBlock( - data=data, - aux_data=AuxiliaryData(np.ones(CORE_CHUNK_SHAPE[0], dtype=DTYPE)), - slicing_dim=SLICING_DIM, - global_shape=CORE_CHUNK_SHAPE, - chunk_shape=PADDED_CHUNK_SHAPE, - block_start=0, - chunk_start=0, - padding=PADDING, - ) - - # Create writer with memory limit - COMM = MPI.COMM_WORLD - writer = DataSetStoreWriter( - slicing_dim=SLICING_DIM, - comm=COMM, - temppath=tmp_path, - memory_limit_bytes=MEMORY_LIMIT, - ) - - # A 4MB limit on RAM is enough to hold the core chunk, but not enough to hold the core - # chunk + padded chunk. Therefore, a `MemoryError` should be raised + handled by the writer - # when the first block is attempted to be written. - # - # A `MemoryError` being raised and handled by the writer will result in an hdf5 file - # backing the store. Thus, checking if the writer is file-based will show if a - # `MemoryError` was handled or not. 
- writer.write_block(block) - assert writer.is_file_based - - # Execute finaliser to make sure the hdf5 file that should be backing the store is cleaned - # up (doing so avoids warnings in pytest output) - writer.finalize() diff --git a/tests/runner/test_task_runner.py b/tests/runner/test_task_runner.py index 77ae3ee36..b5c92aba1 100644 --- a/tests/runner/test_task_runner.py +++ b/tests/runner/test_task_runner.py @@ -10,6 +10,7 @@ from httomo.data.dataset_store import DataSetStoreWriter from httomo.runner.auxiliary_data import AuxiliaryData from httomo.runner.dataset import DataSetBlock +from httomo.runner.dataset_store_backing import DataSetStoreBacking from httomo.runner.methods_repository_interface import GpuMemoryRequirement from httomo.runner.monitoring_interface import MonitoringInterface from httomo.runner.output_ref import OutputRef @@ -334,6 +335,13 @@ def test_execute_section_calls_blockwise_execute_and_monitors( s = sectionize(p) mon = mocker.create_autospec(MonitoringInterface, instance=True) t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon) + + # Patch the store backing calculator function to assume being backed by RAM + mocker.patch( + "httomo.runner.task_runner.determine_store_backing", + return_value=DataSetStoreBacking.RAM, + ) + t._prepare() # make that do nothing mocker.patch.object(t, "determine_max_slices") @@ -349,7 +357,7 @@ def mul_block_by_two(section, block: DataSetBlock): ) s[0].is_last = False # should setup a DataSetStoreWriter as sink - t._execute_section(s[0], 1) + t._execute_section(s[0]) assert isinstance(t.sink, DataSetStoreWriter) reader = t.sink.make_reader() data = reader.read_block(0, dummy_block.shape[0]) @@ -396,6 +404,11 @@ def test_does_reslice_when_needed_and_reports_time( mon = mocker.create_autospec(MonitoringInterface, instance=True) t = TaskRunner(p, reslice_dir=tmp_path, comm=MPI.COMM_WORLD, monitor=mon) + # Patch the store backing calculator function to assume being backed by RAM + mocker.patch( + "httomo.runner.task_runner.determine_store_backing", + return_value=DataSetStoreBacking.RAM, + ) t.execute() assert loader.pattern == Pattern.projection