diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index d67e7b4bc..b150fe553 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Callable, List, ParamSpec, Tuple +from typing import List, Tuple import numpy as np from numpy.typing import DTypeLike @@ -56,50 +56,6 @@ class DataSetStoreBacking(Enum): File = 2 -P = ParamSpec("P") - - -def _reduce_decorator_factory( - comm: MPI.Comm, -) -> Callable[[Callable[P, DataSetStoreBacking]], Callable[P, DataSetStoreBacking]]: - """ - Generate decorator for store-backing calculator function that will use the given MPI - communicator for the reduce operation. - """ - - def reduce_decorator( - func: Callable[P, DataSetStoreBacking], - ) -> Callable[P, DataSetStoreBacking]: - """ - Decorator for store-backing calculator function. - """ - - def wrapper(*args: P.args, **kwargs: P.kwargs) -> DataSetStoreBacking: - """ - Perform store-backing calculation across all MPI processes and reduce. - """ - # reduce store backing enum variant across all processes - if any has - # `File` variant, all should use a file - send_buffer = np.zeros(1, dtype=bool) - recv_buffer = np.zeros(1, dtype=bool) - store_backing = func(*args, **kwargs) - - if store_backing is DataSetStoreBacking.File: - send_buffer[0] = True - - # do a logical or of all the enum variants across the processes - comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR) - - if bool(recv_buffer[0]) is True: - return DataSetStoreBacking.File - - return DataSetStoreBacking.RAM - - return wrapper - - return reduce_decorator - - def _non_last_section_in_pipeline( memory_limit_bytes: int, write_chunk_bytes: int, @@ -117,19 +73,6 @@ def _non_last_section_in_pipeline( return DataSetStoreBacking.RAM -def _last_section_in_pipeline( - memory_limit_bytes: int, - write_chunk_bytes: int, -) -> DataSetStoreBacking: - """ - Calculate backing of dataset store for last section in pipeline - """ - if memory_limit_bytes > 0 and write_chunk_bytes >= memory_limit_bytes: - return DataSetStoreBacking.File - - return DataSetStoreBacking.RAM - - def determine_store_backing( comm: MPI.Comm, sections: List[Section], @@ -138,10 +81,22 @@ def determine_store_backing( global_shape: Tuple[int, int, int], section_idx: int, ) -> DataSetStoreBacking: - reduce_decorator = _reduce_decorator_factory(comm) + # Get chunk shape created by reader of section `n` (the current section) that will account + # for padding. This chunk shape is based on the chunk shape written by the writer of + # section `n - 1` (the previous section) + padded_input_chunk_shape = calculate_section_chunk_shape( + comm=comm, + global_shape=global_shape, + slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, + padding=determine_section_padding(sections[section_idx]), + ) + padded_input_chunk_bytes = int( + np.prod(padded_input_chunk_shape) * np.dtype(dtype).itemsize + ) - # Get chunk shape input to section - current_chunk_shape = calculate_section_chunk_shape( + # Get unpadded chunk shape input to current section (for calculation of bytes in output + # chunk for the current section) + input_chunk_shape = calculate_section_chunk_shape( comm=comm, global_shape=global_shape, slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, @@ -149,30 +104,29 @@ def determine_store_backing( ) # Get the number of bytes in the input chunk to the section w/ potential modifications to - # the non-slicing dims - current_chunk_bytes = calculate_section_chunk_bytes( - chunk_shape=current_chunk_shape, + # the non-slicing dims, to then determine the number of bytes in the output chunk written + # by the current section + output_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=input_chunk_shape, dtype=dtype, section=sections[section_idx], ) - if section_idx == len(sections) - 1: - return reduce_decorator(_last_section_in_pipeline)( - memory_limit_bytes=memory_limit_bytes, - write_chunk_bytes=current_chunk_bytes, - ) - - # Get chunk shape created by reader of section `n+1`, that will add padding to the - # chunk shape written by the writer of section `n` - next_chunk_shape = calculate_section_chunk_shape( - comm=comm, - global_shape=global_shape, - slicing_dim=_get_slicing_dim(sections[section_idx + 1].pattern) - 1, - padding=determine_section_padding(sections[section_idx + 1]), - ) - next_chunk_bytes = int(np.prod(next_chunk_shape) * np.dtype(dtype).itemsize) - return reduce_decorator(_non_last_section_in_pipeline)( + send_buffer = np.zeros(1, dtype=bool) + recv_buffer = np.zeros(1, dtype=bool) + store_backing = _non_last_section_in_pipeline( memory_limit_bytes=memory_limit_bytes, - write_chunk_bytes=current_chunk_bytes, - read_chunk_bytes=next_chunk_bytes, + read_chunk_bytes=padded_input_chunk_bytes, + write_chunk_bytes=output_chunk_bytes, ) + + if store_backing is DataSetStoreBacking.File: + send_buffer[0] = True + + # do a logical OR of all the enum variants across the processes + comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR) + + if bool(recv_buffer[0]) is True: + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 04d31f45b..af90c976c 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -235,19 +235,18 @@ def _setup_source_sink(self, section: Section, idx: int): self.source.finalize() self.source = new_source - store_backing = determine_store_backing( - comm=self.comm, - sections=self._sections, - memory_limit_bytes=self._memory_limit_bytes, - dtype=self.source.dtype, - global_shape=self.source.global_shape, - section_idx=idx, - ) - if section.is_last: # we don't need to store the results - this sink just discards it self.sink = DummySink(slicing_dim_section) else: + store_backing = determine_store_backing( + comm=self.comm, + sections=self._sections, + memory_limit_bytes=self._memory_limit_bytes, + dtype=self.source.dtype, + global_shape=self.source.global_shape, + section_idx=idx, + ) self.sink = DataSetStoreWriter( slicing_dim_section, self.comm, diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py index 4dc9b77b6..354545f09 100644 --- a/tests/runner/test_dataset_store_backing.py +++ b/tests/runner/test_dataset_store_backing.py @@ -178,108 +178,6 @@ def test_calculate_section_chunk_bytes_output_dims_change_and_swap( assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES -@pytest.mark.parametrize( - "memory_limit, expected_store_backing", - [ - (3 * 1024**2, DataSetStoreBacking.File), - (4 * 1024**2, DataSetStoreBacking.RAM), - ], - ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"], -) -def test_determine_store_backing_last_section_pipeline_single_proc( - mocker: MockerFixture, - memory_limit: int, - expected_store_backing: DataSetStoreBacking, -): - COMM = MPI.COMM_WORLD - - # For a single process, chunk shape = global shape - # - # The dtype and shape combined makes: - # - the write chunk ~3.4MB - # - the read chunk also ~3.4MB - DTYPE = np.float32 - GLOBAL_SHAPE = (10, 300, 300) - - # Define dummy loader and method wrapper objects - loader = make_test_loader(mocker=mocker) - method = make_test_method( - mocker=mocker, method_name="method", pattern=Pattern.projection - ) - - # Get list of section objects that represent pipeline - pipeline = Pipeline( - loader=loader, - methods=[method], - ) - sections = sectionize(pipeline) - - # Based on memory limit and the given section in the pipeline, determine the backing of the - # store for the execution of that section - store_backing = determine_store_backing( - comm=COMM, - sections=sections, - memory_limit_bytes=memory_limit, - dtype=DTYPE, - global_shape=GLOBAL_SHAPE, - section_idx=0, - ) - assert store_backing is expected_store_backing - - -@pytest.mark.mpi -@pytest.mark.skipif( - MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" -) -@pytest.mark.parametrize( - "memory_limit, expected_store_backing", - [ - (1 * 1024**2, DataSetStoreBacking.File), - (2 * 1024**2, DataSetStoreBacking.RAM), - ], - ids=["1MB-limit-file-backing", "2MB-limit-ram-backing"], -) -def test_determine_store_backing_last_section_pipeline_two_procs( - mocker: MockerFixture, - memory_limit: int, - expected_store_backing: DataSetStoreBacking, -): - COMM = MPI.COMM_WORLD - - # For two processes, chunk shape = half of global shape - # - # The dtype and shape combined makes: - # - the write chunk ~1.7MB - # - the read chunk also ~1.7MB - DTYPE = np.float32 - GLOBAL_SHAPE = (10, 300, 300) - - # Define dummy loader and method wrapper objects - loader = make_test_loader(mocker=mocker) - method = make_test_method( - mocker=mocker, method_name="method", pattern=Pattern.projection - ) - - # Get list of section objects that represent pipeline - pipeline = Pipeline( - loader=loader, - methods=[method], - ) - sections = sectionize(pipeline) - - # Based on memory limit and the given section in the pipeline, determine the backing of the - # store for the execution of that section - store_backing = determine_store_backing( - comm=COMM, - sections=sections, - memory_limit_bytes=memory_limit, - dtype=DTYPE, - global_shape=GLOBAL_SHAPE, - section_idx=0, - ) - assert store_backing is expected_store_backing - - @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ @@ -393,10 +291,10 @@ def test_determine_store_backing_non_last_section_pipeline_two_procs( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (7 * 1024**2, DataSetStoreBacking.File), - (10 * 1024**2, DataSetStoreBacking.RAM), + (41 * 1024**2, DataSetStoreBacking.File), + (42 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["7MB-limit-file-backing", "10MB-limit-ram-backing"], + ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], ) def test_determine_store_backing_non_last_section_pipeline_large_padding_single_proc( mocker: MockerFixture, @@ -408,20 +306,21 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ # For a single process, chunk shape = global shape # # The dtype, shape, and padding combined makes: - # - the write chunk ~3.4MB - # - the read chunk ~5.7MB + # - the unpadded input chunk ~3.4MB (10 * 300 * 300 * 4 / (1024 ** 2)) + # - the padded input chunk ~37.7MB (110 * 300 * 300 * 4 / (1024 ** 2)) + # - the output chunk ~3.4MB (10 * 300 * 300 * 4 / (1024 ** 2)) DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50) # Define dummy loader and method wrapper objects loader = make_test_loader(mocker=mocker) - m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) - m2 = make_test_method( - mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True + m1 = make_test_method( + mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True ) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) mocker.patch.object( - target=m2, + target=m1, attribute="calculate_padding", return_value=PADDING, ) @@ -432,14 +331,15 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ methods=[m1, m2], ) sections = sectionize(pipeline) + print(sections) - # For execution of non-last sections in pipelines, the writer must take into account that a - # copy of the chunk is made by the reader of the following section. Therefore, two copies - # of the chunk must be taken into account when deciding the backing of the store. + # For execution of non-last sections which have non-zero padding, the reader creates a + # padded copy of the chunk that is made. Therefore, two copies of the chunk must be taken + # into account when deciding the backing of the store. # - # Note that section 0 is only the section that is "not the last section", so it's the only - # one that will need to account for two copies of the chunk, and thus the main target of - # the test. Hence, why `section_idx=0` is given. + # Note that section 0 is the only section of the two produced that is "not the last + # section", so it's the only one that will need to account for two copies of the chunk, and + # thus the main target of the test. Hence, why `section_idx=0` is given. store_backing = determine_store_backing( comm=COMM, sections=sections, @@ -458,10 +358,10 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (4 * 1024**2, DataSetStoreBacking.File), - (5 * 1024**2, DataSetStoreBacking.RAM), + (37 * 1024**2, DataSetStoreBacking.File), + (38 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["4MB-limit-file-backing", "5MB-limit-ram-backing"], + ids=["37MB-limit-file-backing", "38MB-limit-ram-backing"], ) def test_determine_store_backing_non_last_section_pipeline_large_padding_two_procs( mocker: MockerFixture, @@ -473,20 +373,21 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_two_pro # For a single process, chunk shape = global shape # # The dtype, shape, and padding combined makes: - # - the write chunk ~1.7MB - # - the read chunk ~2.8MB + # - the unpadded input chunk ~1.7MB (5 * 300 * 300 * 4 / (1024 ** 2)) + # - the padded input chunk ~36.0MB (105 * 300 * 300 * 4 / (1024 ** 2)) + # - the output chunk ~1.7MB (5 * 300 * 300 * 4 / (1024 ** 2)) DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50) # Define dummy loader and method wrapper objects loader = make_test_loader(mocker=mocker) - m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) - m2 = make_test_method( - mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True + m1 = make_test_method( + mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True ) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) mocker.patch.object( - target=m2, + target=m1, attribute="calculate_padding", return_value=PADDING, ) @@ -498,13 +399,13 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_two_pro ) sections = sectionize(pipeline) - # For execution of non-last sections in pipelines, the writer must take into account that a - # copy of the chunk is made by the reader of the following section. Therefore, two copies - # of the chunk must be taken into account when deciding the backing of the store. + # For execution of non-last sections which have non-zero padding, the reader creates a + # padded copy of the chunk that is made. Therefore, two copies of the chunk must be taken + # into account when deciding the backing of the store. # - # Note that section 0 is only the section that is "not the last section", so it's the only - # one that will need to account for two copies of the chunk, and thus the main target of - # the test. Hence, why `section_idx=0` is given. + # Note that section 0 is the only section of the two produced that is "not the last + # section", so it's the only one that will need to account for two copies of the chunk, and + # thus the main target of the test. Hence, why `section_idx=0` is given. store_backing = determine_store_backing( comm=COMM, sections=sections,