From d97bd658d041822fddc9b5d55bf9a2225cf6afaf Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 12 Mar 2026 12:26:42 -0400 Subject: [PATCH 01/23] fix: add shutdown methods to executors and fix lithops memory leak Add explicit shutdown() to SerialExecutor and DaskDelayedExecutor that clears tracked futures. Enhance LithopsEagerFunctionExecutor.shutdown() to clear cached _call_output on ResponseFutures before closing, preventing memory accumulation across repeated map() calls. Add parametrized tests. Co-Authored-By: Claude Opus 4.6 --- virtualizarr/parallel.py | 14 ++++++ virtualizarr/tests/test_parallel.py | 74 ++++++++++++++++++++++++++++- 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index 7341261d..7deeeebc 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -137,6 +137,9 @@ def map( """ return map(fn, *iterables) + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: + self._futures.clear() + class DaskDelayedExecutor(Executor): """ @@ -230,6 +233,9 @@ def map( # Compute all tasks return iter(dask.compute(*delayed_tasks)) + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: + self._futures.clear() + class LithopsEagerFunctionExecutor(Executor): """ @@ -372,4 +378,12 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: wait Whether to wait for pending futures. """ + # Free cached results from lithops ResponseFuture objects before shutdown. + # lithops.FunctionExecutor.futures is never cleared internally — each map() + # call extends it with new ResponseFutures that cache deserialized results + # in _call_output. Without this, memory accumulates across repeated calls. 
+ for f in self.lithops_client.futures: + f._call_output = None + self.lithops_client.futures.clear() + self._futures.clear() self.lithops_client.__exit__(None, None, None) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 13c5fe03..4a9f604a 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -2,8 +2,13 @@ import pytest -from virtualizarr.parallel import LithopsEagerFunctionExecutor, get_executor -from virtualizarr.tests import requires_lithops +from virtualizarr.parallel import ( + DaskDelayedExecutor, + LithopsEagerFunctionExecutor, + SerialExecutor, + get_executor, +) +from virtualizarr.tests import requires_dask, requires_lithops @requires_lithops @@ -41,3 +46,68 @@ def test_get_executor_process_pool_mode(): assert ctx is not None, "Expected executor to have a multiprocessing context" assert ctx.get_start_method() == "forkserver" + + +def _make_executor(executor_cls): + """Create a pytest param for an executor class with appropriate marks.""" + marks = { + "DaskDelayedExecutor": [requires_dask], + "LithopsEagerFunctionExecutor": [requires_lithops], + } + return pytest.param( + executor_cls, + id=executor_cls.__name__, + marks=marks.get(executor_cls.__name__, []), + ) + + +ALL_EXECUTORS = [ + _make_executor(SerialExecutor), + _make_executor(DaskDelayedExecutor), + _make_executor(LithopsEagerFunctionExecutor), +] + + +@pytest.mark.parametrize("executor_cls", ALL_EXECUTORS) +class TestExecutorShutdown: + def test_shutdown_clears_futures(self, executor_cls): + with executor_cls() as executor: + executor.submit(lambda: 42) + executor.submit(lambda: 99) + assert len(executor._futures) == 2 + + assert len(executor._futures) == 0 + + def test_shutdown_via_context_manager(self, executor_cls): + with executor_cls() as executor: + executor.submit(lambda: 42) + assert len(executor._futures) == 1 + + assert len(executor._futures) == 0 + + def test_shutdown_idempotent(self, executor_cls): + 
executor = executor_cls() + executor.submit(lambda: 1) + executor.shutdown() + executor.shutdown() + assert len(executor._futures) == 0 + + +@requires_lithops +class TestLithopsExecutorShutdownSpecific: + def test_shutdown_clears_lithops_client_futures(self): + executor = LithopsEagerFunctionExecutor() + executor.submit(lambda: 42) + + executor.shutdown() + assert len(executor.lithops_client.futures) == 0 + + def test_shutdown_clears_lithops_cached_results(self): + """Verify that shutdown clears _call_output on lithops ResponseFutures.""" + with LithopsEagerFunctionExecutor() as executor: + executor.map(lambda x: x * 2, (1, 2, 3)) + lithops_futures = list(executor.lithops_client.futures) + assert len(lithops_futures) > 0 + + # After shutdown, lithops futures list should be cleared + assert len(executor.lithops_client.futures) == 0 From 4f4c06111d7b678a663b6168e0c31af5c47965ac Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:27:15 -0400 Subject: [PATCH 02/23] Add tests and constrain fix only to lithops --- virtualizarr/parallel.py | 13 ++-- virtualizarr/tests/test_parallel.py | 113 ++++++++++++++++++---------- 2 files changed, 80 insertions(+), 46 deletions(-) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index 7deeeebc..71d0b786 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -1,3 +1,4 @@ +import atexit import inspect import multiprocessing as mp import warnings @@ -137,9 +138,6 @@ def map( """ return map(fn, *iterables) - def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: - self._futures.clear() - class DaskDelayedExecutor(Executor): """ @@ -233,9 +231,6 @@ def map( # Compute all tasks return iter(dask.compute(*delayed_tasks)) - def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: - self._futures.clear() - class LithopsEagerFunctionExecutor(Executor): """ @@ -386,4 +381,10 @@ def shutdown(self, wait: 
bool = True, *, cancel_futures: bool = False) -> None: f._call_output = None self.lithops_client.futures.clear() self._futures.clear() + + # Lithops registers self.clean as an atexit handler (executors.py __init__), + # which prevents the FunctionExecutor from ever being garbage collected. + # Unregister it so the executor can be freed after shutdown. + atexit.unregister(self.lithops_client.clean) + self.lithops_client.__exit__(None, None, None) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 4a9f604a..d592a017 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -1,4 +1,6 @@ +import gc import multiprocessing as mp +import weakref import pytest @@ -48,6 +50,26 @@ def test_get_executor_process_pool_mode(): assert ctx.get_start_method() == "forkserver" +@requires_lithops +class TestLithopsExecutorShutdown: + def test_shutdown_clears_lithops_client_futures(self): + executor = LithopsEagerFunctionExecutor() + executor.submit(lambda: 42) + + executor.shutdown() + assert len(executor.lithops_client.futures) == 0 + + def test_shutdown_clears_lithops_cached_results(self): + """Verify that shutdown clears _call_output on lithops ResponseFutures.""" + with LithopsEagerFunctionExecutor() as executor: + executor.map(lambda x: x * 2, (1, 2, 3)) + lithops_futures = list(executor.lithops_client.futures) + assert len(lithops_futures) > 0 + + # After shutdown, lithops futures list should be cleared + assert len(executor.lithops_client.futures) == 0 + + def _make_executor(executor_cls): """Create a pytest param for an executor class with appropriate marks.""" marks = { @@ -69,45 +91,56 @@ def _make_executor(executor_cls): @pytest.mark.parametrize("executor_cls", ALL_EXECUTORS) -class TestExecutorShutdown: - def test_shutdown_clears_futures(self, executor_cls): - with executor_cls() as executor: - executor.submit(lambda: 42) - executor.submit(lambda: 99) - assert len(executor._futures) == 2 +class 
TestExecutorMemory: + def test_executor_does_not_leak_after_context_manager(self, executor_cls): + """Executor and its futures should be GC-collectable after the with block.""" - assert len(executor._futures) == 0 - - def test_shutdown_via_context_manager(self, executor_cls): with executor_cls() as executor: - executor.submit(lambda: 42) - assert len(executor._futures) == 1 - - assert len(executor._futures) == 0 - - def test_shutdown_idempotent(self, executor_cls): - executor = executor_cls() - executor.submit(lambda: 1) - executor.shutdown() - executor.shutdown() - assert len(executor._futures) == 0 - - -@requires_lithops -class TestLithopsExecutorShutdownSpecific: - def test_shutdown_clears_lithops_client_futures(self): - executor = LithopsEagerFunctionExecutor() - executor.submit(lambda: 42) - - executor.shutdown() - assert len(executor.lithops_client.futures) == 0 - - def test_shutdown_clears_lithops_cached_results(self): - """Verify that shutdown clears _call_output on lithops ResponseFutures.""" - with LithopsEagerFunctionExecutor() as executor: - executor.map(lambda x: x * 2, (1, 2, 3)) - lithops_futures = list(executor.lithops_client.futures) - assert len(lithops_futures) > 0 - - # After shutdown, lithops futures list should be cleared - assert len(executor.lithops_client.futures) == 0 + # Use map() since lithops call_async requires a data argument + list(executor.map(lambda x: x * 2, range(5))) + ref = weakref.ref(executor) + + # Drop the only local reference to the executor + del executor + gc.collect() + + assert ref() is None, ( + f"{executor_cls.__name__} was not garbage collected after shutdown" + ) + + def test_repeated_executor_use_does_not_grow_memory(self, executor_cls): + """Memory should not grow when creating and destroying executors repeatedly.""" + import tracemalloc + + def _run_once(): + with executor_cls() as executor: + # Use map() to produce non-trivial results + return list(executor.map(lambda x: list(range(10_000)), range(5))) + + # 
Warm up (first run may allocate caches, import modules, etc.) + _run_once() + gc.collect() + + # Measure baseline: peak memory from a single run + tracemalloc.start() + _run_once() + gc.collect() + _, baseline_peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + + # Now run many iterations and check peak doesn't grow + tracemalloc.start() + n_iterations = 10 + for _ in range(n_iterations): + _run_once() + gc.collect() + _, multi_peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + + # If memory leaks, peak will scale with n_iterations. + # Allow 1.2x the single-run peak to account for GC timing jitter. + assert multi_peak < 1.2 * baseline_peak, ( + f"{executor_cls.__name__} leaked memory: single run peak " + f"{baseline_peak / 1024:.0f} KB, {n_iterations} runs peak " + f"{multi_peak / 1024:.0f} KB" + ) From d925fb7fed0b7659f39313908cc7c5d7dd6865f6 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 12 Mar 2026 15:35:28 -0400 Subject: [PATCH 03/23] Clean up claudes horrible tests --- virtualizarr/parallel.py | 6 +++ virtualizarr/tests/test_parallel.py | 60 ++++++++++++----------------- 2 files changed, 30 insertions(+), 36 deletions(-) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index 71d0b786..7c728ef5 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -138,6 +138,9 @@ def map( """ return map(fn, *iterables) + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: + self._futures.clear() + class DaskDelayedExecutor(Executor): """ @@ -231,6 +234,9 @@ def map( # Compute all tasks return iter(dask.compute(*delayed_tasks)) + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: + self._futures.clear() + class LithopsEagerFunctionExecutor(Executor): """ diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index d592a017..542d4ae4 100644 --- a/virtualizarr/tests/test_parallel.py 
+++ b/virtualizarr/tests/test_parallel.py @@ -50,26 +50,6 @@ def test_get_executor_process_pool_mode(): assert ctx.get_start_method() == "forkserver" -@requires_lithops -class TestLithopsExecutorShutdown: - def test_shutdown_clears_lithops_client_futures(self): - executor = LithopsEagerFunctionExecutor() - executor.submit(lambda: 42) - - executor.shutdown() - assert len(executor.lithops_client.futures) == 0 - - def test_shutdown_clears_lithops_cached_results(self): - """Verify that shutdown clears _call_output on lithops ResponseFutures.""" - with LithopsEagerFunctionExecutor() as executor: - executor.map(lambda x: x * 2, (1, 2, 3)) - lithops_futures = list(executor.lithops_client.futures) - assert len(lithops_futures) > 0 - - # After shutdown, lithops futures list should be cleared - assert len(executor.lithops_client.futures) == 0 - - def _make_executor(executor_cls): """Create a pytest param for an executor class with appropriate marks.""" marks = { @@ -89,25 +69,33 @@ def _make_executor(executor_cls): _make_executor(LithopsEagerFunctionExecutor), ] - @pytest.mark.parametrize("executor_cls", ALL_EXECUTORS) -class TestExecutorMemory: - def test_executor_does_not_leak_after_context_manager(self, executor_cls): - """Executor and its futures should be GC-collectable after the with block.""" - +class TestExecutorShutdown: + def test_shutdown_clears_futures(self, executor_cls): + """Internal _futures list should be empty after shutdown.""" with executor_cls() as executor: - # Use map() since lithops call_async requires a data argument - list(executor.map(lambda x: x * 2, range(5))) - ref = weakref.ref(executor) - - # Drop the only local reference to the executor - del executor - gc.collect() + executor.submit(lambda x: x * 2, 1) + executor.submit(lambda x: x + 1, 2) + assert len(executor._futures) == 2 + if executor_cls is LithopsEagerFunctionExecutor: + # grab refs before they get cleared + lithops_futures = list(executor.lithops_client.futures) + assert 
len(lithops_futures) ==2 + + assert len(executor._futures) == 0 + + # Lithops-specific: verify lithops internal futures are also cleared + if executor_cls is LithopsEagerFunctionExecutor: + assert len(executor.lithops_client.futures) == 0 + assert all(f._call_output is None for f in lithops_futures) + + # Testing idempotency + executor.shutdown() + assert len(executor._futures) == 0 - assert ref() is None, ( - f"{executor_cls.__name__} was not garbage collected after shutdown" - ) +@pytest.mark.parametrize("executor_cls", ALL_EXECUTORS) +class TestExecutorMemory: def test_repeated_executor_use_does_not_grow_memory(self, executor_cls): """Memory should not grow when creating and destroying executors repeatedly.""" import tracemalloc @@ -140,7 +128,7 @@ def _run_once(): # If memory leaks, peak will scale with n_iterations. # Allow 1.2x the single-run peak to account for GC timing jitter. assert multi_peak < 1.2 * baseline_peak, ( - f"{executor_cls.__name__} leaked memory: single run peak " + f"{executor_cls.__name__} does not release memory: single run peak " f"{baseline_peak / 1024:.0f} KB, {n_iterations} runs peak " f"{multi_peak / 1024:.0f} KB" ) From 46908c1bc88e3326fd9a783d9b8e38461d08e038 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:36:43 +0000 Subject: [PATCH 04/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- virtualizarr/tests/test_parallel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 542d4ae4..5a52046f 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -1,6 +1,5 @@ import gc import multiprocessing as mp -import weakref import pytest @@ -69,6 +68,7 @@ def _make_executor(executor_cls): _make_executor(LithopsEagerFunctionExecutor), ] + 
@pytest.mark.parametrize("executor_cls", ALL_EXECUTORS) class TestExecutorShutdown: def test_shutdown_clears_futures(self, executor_cls): @@ -80,7 +80,7 @@ def test_shutdown_clears_futures(self, executor_cls): if executor_cls is LithopsEagerFunctionExecutor: # grab refs before they get cleared lithops_futures = list(executor.lithops_client.futures) - assert len(lithops_futures) ==2 + assert len(lithops_futures) == 2 assert len(executor._futures) == 0 From 4732cc33e4b962421babeef4491c6e440aef8f8d Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 12 Mar 2026 18:59:46 -0400 Subject: [PATCH 05/23] Alternative approach via lithops config --- virtualizarr/parallel.py | 35 ++++++++++++++++++----------- virtualizarr/tests/test_parallel.py | 7 +++--- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index 7c728ef5..ca3ecb40 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -277,6 +277,24 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: def __init__(self, **kwargs) -> None: import lithops # type: ignore[import-untyped] + + # Fix for unbounded memory growth on repeated `open_virtual_mfdataset` calls + # see https://github.com/zarr-developers/VirtualiZarr/issues/926 + + # Users are encouraged to provide configs for lithops via file + # But just in case that someone imports this and configures it, they have to provide all + # details below explicitly as `config=` argument. 
+ if not 'config' in kwargs: + _config_file = lithops.config.load_config() + if _config_file['lithops'].get('backend') == 'localhost': + # We currently only want to apply this fix for the localhost executor + kwargs['config'] = { + "lithops":{ + "data_cleaner":False, #prevents atexit registration of `.lithops_client.clean` method + "backend":'localhost', # if this is not provided lithops will default to aws lambda + } + } + # Create Lithops client with optional configuration self.lithops_client = lithops.FunctionExecutor(**kwargs).__enter__() @@ -379,18 +397,9 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: wait Whether to wait for pending futures. """ - # Free cached results from lithops ResponseFuture objects before shutdown. - # lithops.FunctionExecutor.futures is never cleared internally — each map() - # call extends it with new ResponseFutures that cache deserialized results - # in _call_output. Without this, memory accumulates across repeated calls. - for f in self.lithops_client.futures: - f._call_output = None - self.lithops_client.futures.clear() - self._futures.clear() - - # Lithops registers self.clean as an atexit handler (executors.py __init__), - # which prevents the FunctionExecutor from ever being garbage collected. - # Unregister it so the executor can be freed after shutdown. 
- atexit.unregister(self.lithops_client.clean) + if wait: + # ensure all futures are completed before exiting + self.lithops_client.wait(show_progressbar=False) + #Exit context manager entered during __init__ self.lithops_client.__exit__(None, None, None) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 5a52046f..556bdbb6 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -1,6 +1,6 @@ import gc import multiprocessing as mp - +import tracemalloc import pytest from virtualizarr.parallel import ( @@ -98,7 +98,6 @@ def test_shutdown_clears_futures(self, executor_cls): class TestExecutorMemory: def test_repeated_executor_use_does_not_grow_memory(self, executor_cls): """Memory should not grow when creating and destroying executors repeatedly.""" - import tracemalloc def _run_once(): with executor_cls() as executor: @@ -118,7 +117,7 @@ def _run_once(): # Now run many iterations and check peak doesn't grow tracemalloc.start() - n_iterations = 10 + n_iterations = 20 for _ in range(n_iterations): _run_once() gc.collect() @@ -127,7 +126,7 @@ def _run_once(): # If memory leaks, peak will scale with n_iterations. # Allow 1.2x the single-run peak to account for GC timing jitter. 
- assert multi_peak < 1.2 * baseline_peak, ( + assert multi_peak < 1.1 * baseline_peak, ( f"{executor_cls.__name__} does not release memory: single run peak " f"{baseline_peak / 1024:.0f} KB, {n_iterations} runs peak " f"{multi_peak / 1024:.0f} KB" From 031ffac5f94172be107998e89fba77f698c406a5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 12 Mar 2026 22:59:58 +0000 Subject: [PATCH 06/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- virtualizarr/parallel.py | 24 +++++++++++------------- virtualizarr/tests/test_parallel.py | 1 + 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index ca3ecb40..d71c587a 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -1,4 +1,3 @@ -import atexit import inspect import multiprocessing as mp import warnings @@ -277,23 +276,22 @@ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: def __init__(self, **kwargs) -> None: import lithops # type: ignore[import-untyped] + # Fix for unbounded memory growth on repeated `open_virtual_mfdataset` calls + # see https://github.com/zarr-developers/VirtualiZarr/issues/926 - # Fix for unbounded memory growth on repeated `open_virtual_mfdataset` calls - # see https://github.com/zarr-developers/VirtualiZarr/issues/926 - # Users are encouraged to provide configs for lithops via file # But just in case that someone imports this and configures it, they have to provide all # details below explicitly as `config=` argument. 
- if not 'config' in kwargs: + if "config" not in kwargs: _config_file = lithops.config.load_config() - if _config_file['lithops'].get('backend') == 'localhost': + if _config_file["lithops"].get("backend") == "localhost": # We currently only want to apply this fix for the localhost executor - kwargs['config'] = { - "lithops":{ - "data_cleaner":False, #prevents atexit registration of `.lithops_client.clean` method - "backend":'localhost', # if this is not provided lithops will default to aws lambda - } - } + kwargs["config"] = { + "lithops": { + "data_cleaner": False, # prevents atexit registration of `.lithops_client.clean` method + "backend": "localhost", # if this is not provided lithops will default to aws lambda + } + } # Create Lithops client with optional configuration self.lithops_client = lithops.FunctionExecutor(**kwargs).__enter__() @@ -401,5 +399,5 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: # ensure all futures are completed before exiting self.lithops_client.wait(show_progressbar=False) - #Exit context manager entered during __init__ + # Exit context manager entered during __init__ self.lithops_client.__exit__(None, None, None) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 556bdbb6..c4861e26 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -1,6 +1,7 @@ import gc import multiprocessing as mp import tracemalloc + import pytest from virtualizarr.parallel import ( From 6414546a041b8b7cd06f2b001c32785ad2a9da86 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 12 Mar 2026 19:07:45 -0400 Subject: [PATCH 07/23] toms renaming suggestion --- virtualizarr/tests/test_parallel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 556bdbb6..db03c355 100644 --- 
a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -62,14 +62,14 @@ def _make_executor(executor_cls): ) -ALL_EXECUTORS = [ +ALL_CUSTOM_EXECUTORS = [ _make_executor(SerialExecutor), _make_executor(DaskDelayedExecutor), _make_executor(LithopsEagerFunctionExecutor), ] -@pytest.mark.parametrize("executor_cls", ALL_EXECUTORS) +@pytest.mark.parametrize("executor_cls", ALL_CUSTOM_EXECUTORS) class TestExecutorShutdown: def test_shutdown_clears_futures(self, executor_cls): """Internal _futures list should be empty after shutdown.""" @@ -94,7 +94,7 @@ def test_shutdown_clears_futures(self, executor_cls): assert len(executor._futures) == 0 -@pytest.mark.parametrize("executor_cls", ALL_EXECUTORS) +@pytest.mark.parametrize("executor_cls", ALL_CUSTOM_EXECUTORS) class TestExecutorMemory: def test_repeated_executor_use_does_not_grow_memory(self, executor_cls): """Memory should not grow when creating and destroying executors repeatedly.""" From 3f9aa440991227ada5555b7c8b6e1c24a6cdcdfb Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Mon, 16 Mar 2026 14:12:17 -0400 Subject: [PATCH 08/23] Update lithops dependency version in pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9317afa7..8275304a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,7 +133,7 @@ dev = [ "pytest-xdist", "ruff", "s3fs", - "lithops", + "lithops>3.6.4", "dask", ] From af2c83edd427ac12358d998442058e6add1f9553 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Mon, 16 Mar 2026 14:14:15 -0400 Subject: [PATCH 09/23] Revert lithops dependency version constraint --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8275304a..9317afa7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,7 +133,7 @@ dev = [ "pytest-xdist", "ruff", 
"s3fs", - "lithops>3.6.4", + "lithops", "dask", ] From 0cb839d325b0e8d2b040bd5053452c0ad9f8a8ee Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:46:42 -0400 Subject: [PATCH 10/23] Mark Lithops executor tests as flaky --- virtualizarr/tests/test_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index e9abcc89..a63bc9ee 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -56,7 +56,7 @@ def _make_executor(executor_cls): """Create a pytest param for an executor class with appropriate marks.""" marks = { "DaskDelayedExecutor": [requires_dask], - "LithopsEagerFunctionExecutor": [requires_lithops], + "LithopsEagerFunctionExecutor": [requires_lithops, pytest.mark.flaky], } return pytest.param( executor_cls, From 05316e6a5df2f70693863da2863568c765772b6c Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 14:55:25 -0400 Subject: [PATCH 11/23] rerun flaky tests --- virtualizarr/tests/test_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index a63bc9ee..da64799a 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -56,7 +56,7 @@ def _make_executor(executor_cls): """Create a pytest param for an executor class with appropriate marks.""" marks = { "DaskDelayedExecutor": [requires_dask], - "LithopsEagerFunctionExecutor": [requires_lithops, pytest.mark.flaky], + "LithopsEagerFunctionExecutor": [requires_lithops, pytest.mark.flaky(max_runs=3)], } return pytest.param( executor_cls, From 23d2654e0943cc366554b2069564cfca967df5dd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 19 Mar 2026 18:55:46 +0000 
Subject: [PATCH 12/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- virtualizarr/tests/test_parallel.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index da64799a..cb2c03cf 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -56,7 +56,10 @@ def _make_executor(executor_cls): """Create a pytest param for an executor class with appropriate marks.""" marks = { "DaskDelayedExecutor": [requires_dask], - "LithopsEagerFunctionExecutor": [requires_lithops, pytest.mark.flaky(max_runs=3)], + "LithopsEagerFunctionExecutor": [ + requires_lithops, + pytest.mark.flaky(max_runs=3), + ], } return pytest.param( executor_cls, From 748f50d0e862a99309193491c8ac521d89bb488a Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:22:21 -0400 Subject: [PATCH 13/23] Fix LithopsEagerFunctionExecutor.shutdown() not clearing futures The shutdown method was not clearing `lithops_client.futures` or freeing output memory, causing test failures on Python 3.12 and 3.13. 
Co-Authored-By: Claude Sonnet 4.6 --- virtualizarr/parallel.py | 7 +++++++ virtualizarr/tests/test_parallel.py | 5 +---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index d71c587a..2fcadd68 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -399,5 +399,12 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: # ensure all futures are completed before exiting self.lithops_client.wait(show_progressbar=False) + self._futures.clear() + + # Free output memory and clear lithops internal futures list + for f in self.lithops_client.futures: + f._call_output = None + self.lithops_client.futures.clear() + # Exit context manager entered during __init__ self.lithops_client.__exit__(None, None, None) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index cb2c03cf..e9abcc89 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -56,10 +56,7 @@ def _make_executor(executor_cls): """Create a pytest param for an executor class with appropriate marks.""" marks = { "DaskDelayedExecutor": [requires_dask], - "LithopsEagerFunctionExecutor": [ - requires_lithops, - pytest.mark.flaky(max_runs=3), - ], + "LithopsEagerFunctionExecutor": [requires_lithops], } return pytest.param( executor_cls, From 263656e0e5a874fa9239839b06fd3afb77bb0bfb Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 15:24:03 -0400 Subject: [PATCH 14/23] add comment --- virtualizarr/parallel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index 2fcadd68..5f4339e0 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -399,6 +399,7 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: # ensure all futures are completed before exiting 
self.lithops_client.wait(show_progressbar=False) + # Python <= 3.13 requires manual clearing of the futures, python 3.14 clears futures during wait self._futures.clear() # Free output memory and clear lithops internal futures list From b9ba3cadafa3d651dcf935538f37646ef02d0b4a Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:03:24 -0400 Subject: [PATCH 15/23] Test against lithops fork with job_manager thread join fix Temporarily point lithops dep to jbusecke/lithops@fix-join-job-manager-localhostv2 to verify the upstream fix resolves the memory growth test on Linux CI. Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a99ac862..b31ae47f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,7 +133,7 @@ dev = [ "pytest-xdist", "ruff", "s3fs", - "lithops", + "lithops @ git+https://github.com/jbusecke/lithops.git@fix-join-job-manager-localhostv2", "dask", ] From 63481f847f2640a61aa93316e8f1dccaeda849f1 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:09:30 -0400 Subject: [PATCH 16/23] Add sleep between memory test iterations to allow background threads to exit Co-Authored-By: Claude Sonnet 4.6 --- virtualizarr/tests/test_parallel.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index e9abcc89..f9f6a67d 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -1,5 +1,6 @@ import gc import multiprocessing as mp +import time import tracemalloc import pytest @@ -97,6 +98,13 @@ def test_shutdown_clears_futures(self, executor_cls): assert len(executor._futures) == 0 +@requires_lithops +def test_lithops_executor_data_cleaner_disabled(): + """data_cleaner must be False to prevent atexit registration of lithops' 
clean method.""" + with LithopsEagerFunctionExecutor() as executor: + assert executor.lithops_client.data_cleaner is False + + @pytest.mark.parametrize("executor_cls", ALL_CUSTOM_EXECUTORS) class TestExecutorMemory: def test_repeated_executor_use_does_not_grow_memory(self, executor_cls): @@ -124,6 +132,7 @@ def _run_once(): for _ in range(n_iterations): _run_once() gc.collect() + time.sleep(0.5) # allow background threads to exit and be GC'd _, multi_peak = tracemalloc.get_traced_memory() tracemalloc.stop() From 68a60e969e644440db9e961dd7cac9ed9fd4918b Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:10:03 -0400 Subject: [PATCH 17/23] Move sleep to after the loop to allow background threads to exit before measuring Co-Authored-By: Claude Sonnet 4.6 --- virtualizarr/tests/test_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index f9f6a67d..7bc79229 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -132,7 +132,7 @@ def _run_once(): for _ in range(n_iterations): _run_once() gc.collect() - time.sleep(0.5) # allow background threads to exit and be GC'd + time.sleep(2) # allow background threads to exit and be GC'd before measuring _, multi_peak = tracemalloc.get_traced_memory() tracemalloc.stop() From ba61353caaaeb080d00eb0ea6e98e0b844f575c3 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:14:48 -0400 Subject: [PATCH 18/23] Increase sleep to 30s to give background threads more time to exit Co-Authored-By: Claude Sonnet 4.6 --- virtualizarr/tests/test_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 7bc79229..2b2d5c72 100644 --- a/virtualizarr/tests/test_parallel.py +++ 
b/virtualizarr/tests/test_parallel.py @@ -132,7 +132,7 @@ def _run_once(): for _ in range(n_iterations): _run_once() gc.collect() - time.sleep(2) # allow background threads to exit and be GC'd before measuring + time.sleep(30) # allow background threads to exit and be GC'd before measuring _, multi_peak = tracemalloc.get_traced_memory() tracemalloc.stop() From 71a43b44184fb65a109b68c3c7e50950a7996820 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:24:18 -0400 Subject: [PATCH 19/23] Revert lithops dep back to released version Co-Authored-By: Claude Sonnet 4.6 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b31ae47f..a99ac862 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,7 +133,7 @@ dev = [ "pytest-xdist", "ruff", "s3fs", - "lithops @ git+https://github.com/jbusecke/lithops.git@fix-join-job-manager-localhostv2", + "lithops", "dask", ] From 394c019c4015f18a8f1f29c91cd208611d1608cc Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:44:02 -0400 Subject: [PATCH 20/23] remove the memory growth test --- virtualizarr/parallel.py | 1 - virtualizarr/tests/test_parallel.py | 40 ----------------------------- 2 files changed, 41 deletions(-) diff --git a/virtualizarr/parallel.py b/virtualizarr/parallel.py index 5f4339e0..2fcadd68 100644 --- a/virtualizarr/parallel.py +++ b/virtualizarr/parallel.py @@ -399,7 +399,6 @@ def shutdown(self, wait: bool = True, *, cancel_futures: bool = False) -> None: # ensure all futures are completed before exiting self.lithops_client.wait(show_progressbar=False) - # Python <= 3.13 requires manual clearing of the futures, python 3.14 clears futures during wait self._futures.clear() # Free output memory and clear lithops internal futures list diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 
2b2d5c72..76681a3c 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -103,43 +103,3 @@ def test_lithops_executor_data_cleaner_disabled(): """data_cleaner must be False to prevent atexit registration of lithops' clean method.""" with LithopsEagerFunctionExecutor() as executor: assert executor.lithops_client.data_cleaner is False - - -@pytest.mark.parametrize("executor_cls", ALL_CUSTOM_EXECUTORS) -class TestExecutorMemory: - def test_repeated_executor_use_does_not_grow_memory(self, executor_cls): - """Memory should not grow when creating and destroying executors repeatedly.""" - - def _run_once(): - with executor_cls() as executor: - # Use map() to produce non-trivial results - return list(executor.map(lambda x: list(range(10_000)), range(5))) - - # Warm up (first run may allocate caches, import modules, etc.) - _run_once() - gc.collect() - - # Measure baseline: peak memory from a single run - tracemalloc.start() - _run_once() - gc.collect() - _, baseline_peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - - # Now run many iterations and check peak doesn't grow - tracemalloc.start() - n_iterations = 20 - for _ in range(n_iterations): - _run_once() - gc.collect() - time.sleep(30) # allow background threads to exit and be GC'd before measuring - _, multi_peak = tracemalloc.get_traced_memory() - tracemalloc.stop() - - # If memory leaks, peak will scale with n_iterations. - # Allow 1.2x the single-run peak to account for GC timing jitter. 
- assert multi_peak < 1.1 * baseline_peak, ( - f"{executor_cls.__name__} does not release memory: single run peak " - f"{baseline_peak / 1024:.0f} KB, {n_iterations} runs peak " - f"{multi_peak / 1024:.0f} KB" - ) From d1822a31130eb63e9d34054788b99bbb94f48980 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 19 Mar 2026 21:44:32 +0000 Subject: [PATCH 21/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- virtualizarr/tests/test_parallel.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 76681a3c..8625437e 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -1,7 +1,4 @@ -import gc import multiprocessing as mp -import time -import tracemalloc import pytest From d8c0ab199815cdeca2caef03bb2927191cb22f43 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:51:57 -0400 Subject: [PATCH 22/23] Add .shutdown() method to custom executors Added `.shutdown()` method to custom executors to prevent unbounded memory increase in lithops. --- docs/releases.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/releases.md b/docs/releases.md index ecb83f39..04fb8a18 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -42,6 +42,8 @@ By [Tom Nicholas](https://github.com/TomNicholas). - Fix `ZarrParser` not correctly parsing scalar variables from v2 native zarr stores ([#936](https://github.com/zarr-developers/VirtualiZarr/pull/936)). By [Julius Buseceke](https://github.com/jbusecke) +- Add `.shutdown()` method to custom executors (dask, lithops) preventing unbounded memory increase in the case of lithops ([#925](https://github.com/zarr-developers/VirtualiZarr/pull/925)). 
+ By [Julius Busecke](https://github.com/jbusecke) ### Documentation From 887edac031e5588a854180f4157e92a8e0313a01 Mon Sep 17 00:00:00 2001 From: Julius Busecke <14314623+jbusecke@users.noreply.github.com> Date: Thu, 19 Mar 2026 17:57:30 -0400 Subject: [PATCH 23/23] satisfy linter --- virtualizarr/tests/test_parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/virtualizarr/tests/test_parallel.py b/virtualizarr/tests/test_parallel.py index 8625437e..d274cb58 100644 --- a/virtualizarr/tests/test_parallel.py +++ b/virtualizarr/tests/test_parallel.py @@ -97,6 +97,6 @@ def test_shutdown_clears_futures(self, executor_cls): @requires_lithops def test_lithops_executor_data_cleaner_disabled(): - """data_cleaner must be False to prevent atexit registration of lithops' clean method.""" + """The data_cleaner attribute must be False to prevent atexit registration of lithops' clean method.""" with LithopsEagerFunctionExecutor() as executor: assert executor.lithops_client.data_cleaner is False