From 95017b55f3074466596b9bbd0ed90a7b1e07951a Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Tue, 7 Apr 2026 12:51:30 +0100 Subject: [PATCH 1/5] Rename vars in `determine_store_backing()` function --- httomo/runner/dataset_store_backing.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index d67e7b4bc..893f6fe30 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -141,7 +141,7 @@ def determine_store_backing( reduce_decorator = _reduce_decorator_factory(comm) # Get chunk shape input to section - current_chunk_shape = calculate_section_chunk_shape( + input_chunk_shape = calculate_section_chunk_shape( comm=comm, global_shape=global_shape, slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, @@ -149,9 +149,10 @@ def determine_store_backing( ) # Get the number of bytes in the input chunk to the section w/ potential modifications to - # the non-slicing dims - current_chunk_bytes = calculate_section_chunk_bytes( - chunk_shape=current_chunk_shape, + # the non-slicing dims, to then determine the number of bytes in the output chunk written + # by the current section + output_chunk_bytes = calculate_section_chunk_bytes( + chunk_shape=input_chunk_shape, dtype=dtype, section=sections[section_idx], ) @@ -159,7 +160,7 @@ def determine_store_backing( if section_idx == len(sections) - 1: return reduce_decorator(_last_section_in_pipeline)( memory_limit_bytes=memory_limit_bytes, - write_chunk_bytes=current_chunk_bytes, + write_chunk_bytes=output_chunk_bytes, ) # Get chunk shape created by reader of section `n+1`, that will add padding to the @@ -173,6 +174,6 @@ def determine_store_backing( next_chunk_bytes = int(np.prod(next_chunk_shape) * np.dtype(dtype).itemsize) return reduce_decorator(_non_last_section_in_pipeline)( memory_limit_bytes=memory_limit_bytes, - write_chunk_bytes=current_chunk_bytes, + write_chunk_bytes=output_chunk_bytes, read_chunk_bytes=next_chunk_bytes, ) From c527e9c667266564cbc82ee6f64f2e98a560c6f9 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Tue, 7 Apr 2026 16:57:27 +0100 Subject: [PATCH 2/5] Use current section's padded chunk size in store backing calculation The padded shape of the next section's input chunk was being used, which is incorrect from the perspective of the order of operations that occur when setting up a section's source and sink in `_setup_source_sink()` in the task runner. Instead, it should be the padded shape of the current section's input chunk. --- httomo/runner/dataset_store_backing.py | 21 +++++--- tests/runner/test_dataset_store_backing.py | 63 +++++++++++----------- 2 files changed, 46 insertions(+), 38 deletions(-) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index 893f6fe30..7a6720ce5 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -140,7 +140,8 @@ def determine_store_backing( ) -> DataSetStoreBacking: reduce_decorator = _reduce_decorator_factory(comm) - # Get chunk shape input to section + # Get unpadded chunk shape input to current section (for calculation of bytes in output + # chunk for the current section) input_chunk_shape = calculate_section_chunk_shape( comm=comm, global_shape=global_shape, @@ -163,17 +164,21 @@ def determine_store_backing( write_chunk_bytes=output_chunk_bytes, ) - # Get chunk shape created by reader of section `n+1`, that will add padding to the - # chunk shape written by the writer of section `n` - next_chunk_shape = calculate_section_chunk_shape( + # Get chunk shape created by reader of section `n` (the current section) that will account + # for padding. This chunk shape is based on the chunk shape written by the writer of + # section `n - 1` (the previous section) + padded_input_chunk_shape = calculate_section_chunk_shape( comm=comm, global_shape=global_shape, - slicing_dim=_get_slicing_dim(sections[section_idx + 1].pattern) - 1, - padding=determine_section_padding(sections[section_idx + 1]), + slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, + padding=determine_section_padding(sections[section_idx]), + ) + padded_input_chunk_bytes = int( + np.prod(padded_input_chunk_shape) * np.dtype(dtype).itemsize ) - next_chunk_bytes = int(np.prod(next_chunk_shape) * np.dtype(dtype).itemsize) + return reduce_decorator(_non_last_section_in_pipeline)( memory_limit_bytes=memory_limit_bytes, write_chunk_bytes=output_chunk_bytes, - read_chunk_bytes=next_chunk_bytes, + read_chunk_bytes=padded_input_chunk_bytes, ) diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py index 4dc9b77b6..ab4ecba0e 100644 --- a/tests/runner/test_dataset_store_backing.py +++ b/tests/runner/test_dataset_store_backing.py @@ -393,10 +393,10 @@ def test_determine_store_backing_non_last_section_pipeline_two_procs( @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (7 * 1024**2, DataSetStoreBacking.File), - (10 * 1024**2, DataSetStoreBacking.RAM), + (41 * 1024**2, DataSetStoreBacking.File), + (42 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["7MB-limit-file-backing", "10MB-limit-ram-backing"], + ids=["41MB-limit-file-backing", "42MB-limit-ram-backing"], ) def test_determine_store_backing_non_last_section_pipeline_large_padding_single_proc( mocker: MockerFixture, @@ -408,20 +408,21 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ # For a single process, chunk shape = global shape # # The dtype, shape, and padding combined makes: - # - the write chunk ~3.4MB - # - the read chunk ~5.7MB + # - the unpadded input chunk ~3.4MB (10 * 300 * 300 * 4 / (1024 ** 2)) + # - the padded input chunk ~37.7MB (110 * 300 * 300 * 4 / (1024 ** 2)) + # - the output chunk ~3.4MB (10 * 300 * 300 * 4 / (1024 ** 2)) DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50) # Define dummy loader and method wrapper objects loader = make_test_loader(mocker=mocker) - m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) - m2 = make_test_method( - mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True + m1 = make_test_method( + mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True ) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) mocker.patch.object( - target=m2, + target=m1, attribute="calculate_padding", return_value=PADDING, ) @@ -432,14 +433,15 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ methods=[m1, m2], ) sections = sectionize(pipeline) + print(sections) - # For execution of non-last sections in pipelines, the writer must take into account that a - # copy of the chunk is made by the reader of the following section. Therefore, two copies - # of the chunk must be taken into account when deciding the backing of the store. + # For execution of non-last sections which have non-zero padding, the reader creates a + # padded copy of the chunk that is made. Therefore, two copies of the chunk must be taken + # into account when deciding the backing of the store. # - # Note that section 0 is only the section that is "not the last section", so it's the only - # one that will need to account for two copies of the chunk, and thus the main target of - # the test. Hence, why `section_idx=0` is given. + # Note that section 0 is the only section of the two produced that is "not the last + # section", so it's the only one that will need to account for two copies of the chunk, and + # thus the main target of the test. Hence, why `section_idx=0` is given. store_backing = determine_store_backing( comm=COMM, sections=sections, @@ -458,10 +460,10 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_single_ @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ - (4 * 1024**2, DataSetStoreBacking.File), - (5 * 1024**2, DataSetStoreBacking.RAM), + (37 * 1024**2, DataSetStoreBacking.File), + (38 * 1024**2, DataSetStoreBacking.RAM), ], - ids=["4MB-limit-file-backing", "5MB-limit-ram-backing"], + ids=["37MB-limit-file-backing", "38MB-limit-ram-backing"], ) def test_determine_store_backing_non_last_section_pipeline_large_padding_two_procs( mocker: MockerFixture, @@ -473,20 +475,21 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_two_pro # For a single process, chunk shape = global shape # # The dtype, shape, and padding combined makes: - # - the write chunk ~1.7MB - # - the read chunk ~2.8MB + # - the unpadded input chunk ~1.7MB (5 * 300 * 300 * 4 / (1024 ** 2)) + # - the padded input chunk ~36.0MB (105 * 300 * 300 * 4 / (1024 ** 2)) + # - the output chunk ~1.7MB (5 * 300 * 300 * 4 / (1024 ** 2)) DTYPE = np.float32 GLOBAL_SHAPE = (10, 300, 300) PADDING = (50, 50) # Define dummy loader and method wrapper objects loader = make_test_loader(mocker=mocker) - m1 = make_test_method(mocker=mocker, method_name="m1", pattern=Pattern.projection) - m2 = make_test_method( - mocker=mocker, method_name="m2", pattern=Pattern.sinogram, padding=True + m1 = make_test_method( + mocker=mocker, method_name="m1", pattern=Pattern.projection, padding=True ) + m2 = make_test_method(mocker=mocker, method_name="m2", pattern=Pattern.sinogram) mocker.patch.object( - target=m2, + target=m1, attribute="calculate_padding", return_value=PADDING, ) @@ -498,13 +501,13 @@ def test_determine_store_backing_non_last_section_pipeline_large_padding_two_pro ) sections = sectionize(pipeline) - # For execution of non-last sections in pipelines, the writer must take into account that a - # copy of the chunk is made by the reader of the following section. Therefore, two copies - # of the chunk must be taken into account when deciding the backing of the store. + # For execution of non-last sections which have non-zero padding, the reader creates a + # padded copy of the chunk that is made. Therefore, two copies of the chunk must be taken + # into account when deciding the backing of the store. # - # Note that section 0 is only the section that is "not the last section", so it's the only - # one that will need to account for two copies of the chunk, and thus the main target of - # the test. Hence, why `section_idx=0` is given. + # Note that section 0 is the only section of the two produced that is "not the last + # section", so it's the only one that will need to account for two copies of the chunk, and + # thus the main target of the test. Hence, why `section_idx=0` is given. store_backing = determine_store_backing( comm=COMM, sections=sections, From 2fd8de7823546d42660ff22059a22e7f79263802 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Wed, 8 Apr 2026 09:58:31 +0100 Subject: [PATCH 3/5] Move padded input chunk size calculation before output chunk size calculation This is purely for the sake of making the order of the chunk size calculations in `determine_store_backing()` match the order of the allocations that would occur during the setup + processing of a section. --- httomo/runner/dataset_store_backing.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index 7a6720ce5..f6c6ea3ce 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -140,6 +140,19 @@ def determine_store_backing( ) -> DataSetStoreBacking: reduce_decorator = _reduce_decorator_factory(comm) + # Get chunk shape created by reader of section `n` (the current section) that will account + # for padding. This chunk shape is based on the chunk shape written by the writer of + # section `n - 1` (the previous section) + padded_input_chunk_shape = calculate_section_chunk_shape( + comm=comm, + global_shape=global_shape, + slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, + padding=determine_section_padding(sections[section_idx]), + ) + padded_input_chunk_bytes = int( + np.prod(padded_input_chunk_shape) * np.dtype(dtype).itemsize + ) + # Get unpadded chunk shape input to current section (for calculation of bytes in output # chunk for the current section) input_chunk_shape = calculate_section_chunk_shape( @@ -164,19 +177,6 @@ def determine_store_backing( write_chunk_bytes=output_chunk_bytes, ) - # Get chunk shape created by reader of section `n` (the current section) that will account - # for padding. This chunk shape is based on the chunk shape written by the writer of - # section `n - 1` (the previous section) - padded_input_chunk_shape = calculate_section_chunk_shape( - comm=comm, - global_shape=global_shape, - slicing_dim=_get_slicing_dim(sections[section_idx].pattern) - 1, - padding=determine_section_padding(sections[section_idx]), - ) - padded_input_chunk_bytes = int( - np.prod(padded_input_chunk_shape) * np.dtype(dtype).itemsize - ) - return reduce_decorator(_non_last_section_in_pipeline)( memory_limit_bytes=memory_limit_bytes, write_chunk_bytes=output_chunk_bytes, From a053c773ab0f3f2f30706a1eb1854f4d2c5a159d Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Wed, 8 Apr 2026 11:10:12 +0100 Subject: [PATCH 4/5] Remove store backing calculation for last section For the last section: - the input chunk's backing (RAM or hdf5 file) is purely determined by the output of the previous section - the output chunk is not written anywhere (the dummy sink simply discards the blocks given to it) Furthermore, the dummy sink doesn't take a type of backing as a parameter (and if it did, it's unclear what it could do with that information, given that the dummy sink simply discards the input data). Therefore, there's no need to have any calculation of the backing of the store for the last section's writing. --- httomo/runner/dataset_store_backing.py | 19 ---- httomo/runner/task_runner.py | 17 ++-- tests/runner/test_dataset_store_backing.py | 102 --------------------- 3 files changed, 8 insertions(+), 130 deletions(-) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index f6c6ea3ce..4c1454c90 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -117,19 +117,6 @@ def _non_last_section_in_pipeline( return DataSetStoreBacking.RAM -def _last_section_in_pipeline( - memory_limit_bytes: int, - write_chunk_bytes: int, -) -> DataSetStoreBacking: - """ - Calculate backing of dataset store for last section in pipeline - """ - if memory_limit_bytes > 0 and write_chunk_bytes >= memory_limit_bytes: - return DataSetStoreBacking.File - - return DataSetStoreBacking.RAM - - def determine_store_backing( comm: MPI.Comm, sections: List[Section], @@ -171,12 +158,6 @@ def determine_store_backing( section=sections[section_idx], ) - if section_idx == len(sections) - 1: - return reduce_decorator(_last_section_in_pipeline)( - memory_limit_bytes=memory_limit_bytes, - write_chunk_bytes=output_chunk_bytes, - ) - return reduce_decorator(_non_last_section_in_pipeline)( memory_limit_bytes=memory_limit_bytes, write_chunk_bytes=output_chunk_bytes, diff --git a/httomo/runner/task_runner.py b/httomo/runner/task_runner.py index 04d31f45b..af90c976c 100644 --- a/httomo/runner/task_runner.py +++ b/httomo/runner/task_runner.py @@ -235,19 +235,18 @@ def _setup_source_sink(self, section: Section, idx: int): self.source.finalize() self.source = new_source - store_backing = determine_store_backing( - comm=self.comm, - sections=self._sections, - memory_limit_bytes=self._memory_limit_bytes, - dtype=self.source.dtype, - global_shape=self.source.global_shape, - section_idx=idx, - ) - if section.is_last: # we don't need to store the results - this sink just discards it self.sink = DummySink(slicing_dim_section) else: + store_backing = determine_store_backing( + comm=self.comm, + sections=self._sections, + memory_limit_bytes=self._memory_limit_bytes, + dtype=self.source.dtype, + global_shape=self.source.global_shape, + section_idx=idx, + ) self.sink = DataSetStoreWriter( slicing_dim_section, self.comm, diff --git a/tests/runner/test_dataset_store_backing.py b/tests/runner/test_dataset_store_backing.py index ab4ecba0e..354545f09 100644 --- a/tests/runner/test_dataset_store_backing.py +++ b/tests/runner/test_dataset_store_backing.py @@ -178,108 +178,6 @@ def test_calculate_section_chunk_bytes_output_dims_change_and_swap( assert section_output_chunk_bytes == EXPECTED_SECTION_OUTPUT_CHUNK_BYTES -@pytest.mark.parametrize( - "memory_limit, expected_store_backing", - [ - (3 * 1024**2, DataSetStoreBacking.File), - (4 * 1024**2, DataSetStoreBacking.RAM), - ], - ids=["3MB-limit-file-backing", "4MB-limit-ram-backing"], -) -def test_determine_store_backing_last_section_pipeline_single_proc( - mocker: MockerFixture, - memory_limit: int, - expected_store_backing: DataSetStoreBacking, -): - COMM = MPI.COMM_WORLD - - # For a single process, chunk shape = global shape - # - # The dtype and shape combined makes: - # - the write chunk ~3.4MB - # - the read chunk also ~3.4MB - DTYPE = np.float32 - GLOBAL_SHAPE = (10, 300, 300) - - # Define dummy loader and method wrapper objects - loader = make_test_loader(mocker=mocker) - method = make_test_method( - mocker=mocker, method_name="method", pattern=Pattern.projection - ) - - # Get list of section objects that represent pipeline - pipeline = Pipeline( - loader=loader, - methods=[method], - ) - sections = sectionize(pipeline) - - # Based on memory limit and the given section in the pipeline, determine the backing of the - # store for the execution of that section - store_backing = determine_store_backing( - comm=COMM, - sections=sections, - memory_limit_bytes=memory_limit, - dtype=DTYPE, - global_shape=GLOBAL_SHAPE, - section_idx=0, - ) - assert store_backing is expected_store_backing - - -@pytest.mark.mpi -@pytest.mark.skipif( - MPI.COMM_WORLD.size != 2, reason="Only rank-2 MPI is supported with this test" -) -@pytest.mark.parametrize( - "memory_limit, expected_store_backing", - [ - (1 * 1024**2, DataSetStoreBacking.File), - (2 * 1024**2, DataSetStoreBacking.RAM), - ], - ids=["1MB-limit-file-backing", "2MB-limit-ram-backing"], -) -def test_determine_store_backing_last_section_pipeline_two_procs( - mocker: MockerFixture, - memory_limit: int, - expected_store_backing: DataSetStoreBacking, -): - COMM = MPI.COMM_WORLD - - # For two processes, chunk shape = half of global shape - # - # The dtype and shape combined makes: - # - the write chunk ~1.7MB - # - the read chunk also ~1.7MB - DTYPE = np.float32 - GLOBAL_SHAPE = (10, 300, 300) - - # Define dummy loader and method wrapper objects - loader = make_test_loader(mocker=mocker) - method = make_test_method( - mocker=mocker, method_name="method", pattern=Pattern.projection - ) - - # Get list of section objects that represent pipeline - pipeline = Pipeline( - loader=loader, - methods=[method], - ) - sections = sectionize(pipeline) - - # Based on memory limit and the given section in the pipeline, determine the backing of the - # store for the execution of that section - store_backing = determine_store_backing( - comm=COMM, - sections=sections, - memory_limit_bytes=memory_limit, - dtype=DTYPE, - global_shape=GLOBAL_SHAPE, - section_idx=0, - ) - assert store_backing is expected_store_backing - - @pytest.mark.parametrize( "memory_limit, expected_store_backing", [ From ac1e96e4cb18f72bf59c38afa1c0414b90b12117 Mon Sep 17 00:00:00 2001 From: Yousef Moazzam Date: Wed, 8 Apr 2026 14:21:25 +0100 Subject: [PATCH 5/5] Remove reduce decorator factory in dataset store backing calculator Due to the previous commit, there is now only one dataset store backing calculator function that needs a reduction operation performed, so there is no need to have a generic way to produce a decorator that can perform the reduction on the result of different functions. --- httomo/runner/dataset_store_backing.py | 65 +++++++------------------- 1 file changed, 16 insertions(+), 49 deletions(-) diff --git a/httomo/runner/dataset_store_backing.py b/httomo/runner/dataset_store_backing.py index 4c1454c90..b150fe553 100644 --- a/httomo/runner/dataset_store_backing.py +++ b/httomo/runner/dataset_store_backing.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import Callable, List, ParamSpec, Tuple +from typing import List, Tuple import numpy as np from numpy.typing import DTypeLike @@ -56,50 +56,6 @@ class DataSetStoreBacking(Enum): File = 2 -P = ParamSpec("P") - - -def _reduce_decorator_factory( - comm: MPI.Comm, -) -> Callable[[Callable[P, DataSetStoreBacking]], Callable[P, DataSetStoreBacking]]: - """ - Generate decorator for store-backing calculator function that will use the given MPI - communicator for the reduce operation. - """ - - def reduce_decorator( - func: Callable[P, DataSetStoreBacking], - ) -> Callable[P, DataSetStoreBacking]: - """ - Decorator for store-backing calculator function. - """ - - def wrapper(*args: P.args, **kwargs: P.kwargs) -> DataSetStoreBacking: - """ - Perform store-backing calculation across all MPI processes and reduce. - """ - # reduce store backing enum variant across all processes - if any has - # `File` variant, all should use a file - send_buffer = np.zeros(1, dtype=bool) - recv_buffer = np.zeros(1, dtype=bool) - store_backing = func(*args, **kwargs) - - if store_backing is DataSetStoreBacking.File: - send_buffer[0] = True - - # do a logical or of all the enum variants across the processes - comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR) - - if bool(recv_buffer[0]) is True: - return DataSetStoreBacking.File - - return DataSetStoreBacking.RAM - - return wrapper - - return reduce_decorator - - def _non_last_section_in_pipeline( memory_limit_bytes: int, write_chunk_bytes: int, @@ -125,8 +81,6 @@ def determine_store_backing( global_shape: Tuple[int, int, int], section_idx: int, ) -> DataSetStoreBacking: - reduce_decorator = _reduce_decorator_factory(comm) - # Get chunk shape created by reader of section `n` (the current section) that will account # for padding. This chunk shape is based on the chunk shape written by the writer of # section `n - 1` (the previous section) @@ -158,8 +112,21 @@ def determine_store_backing( section=sections[section_idx], ) - return reduce_decorator(_non_last_section_in_pipeline)( + send_buffer = np.zeros(1, dtype=bool) + recv_buffer = np.zeros(1, dtype=bool) + store_backing = _non_last_section_in_pipeline( memory_limit_bytes=memory_limit_bytes, - write_chunk_bytes=output_chunk_bytes, read_chunk_bytes=padded_input_chunk_bytes, + write_chunk_bytes=output_chunk_bytes, ) + + if store_backing is DataSetStoreBacking.File: + send_buffer[0] = True + + # do a logical OR of all the enum variants across the processes + comm.Allreduce([send_buffer, MPI.BOOL], [recv_buffer, MPI.BOOL], MPI.LOR) + + if bool(recv_buffer[0]) is True: + return DataSetStoreBacking.File + + return DataSetStoreBacking.RAM