From 003cde9b0cf05fe6d1f42e3ef82f55cd74fbcb94 Mon Sep 17 00:00:00 2001 From: Min Xu Date: Mon, 13 Oct 2025 19:00:38 -0700 Subject: [PATCH 1/4] memory limit to a parsl worker in HPCExecutor --- .../src/climate_ref/executor/hpc.py | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/packages/climate-ref/src/climate_ref/executor/hpc.py b/packages/climate-ref/src/climate_ref/executor/hpc.py index 8d9563bc0..458f3983a 100644 --- a/packages/climate-ref/src/climate_ref/executor/hpc.py +++ b/packages/climate-ref/src/climate_ref/executor/hpc.py @@ -20,8 +20,10 @@ import os import re +import resource import time -from typing import Annotated, Any, Literal +from collections.abc import Callable +from typing import Annotated, Any, Literal, TypeVar, cast import parsl from loguru import logger @@ -44,6 +46,8 @@ from .local import ExecutionFuture, process_result from .pbs_scheduler import SmartPBSProvider +F = TypeVar("F", bound=Callable[..., Any]) + class SlurmConfig(BaseModel): """Slurm Configurations""" @@ -61,7 +65,7 @@ class SlurmConfig(BaseModel): validation: StrictBool = False walltime: str = "00:30:00" scheduler_options: str = "" - retries: Annotated[int, Field(strict=True, ge=1, le=3)] = 2 + retries: Annotated[int, Field(strict=True, ge=0, le=3)] = 2 max_blocks: Annotated[int, Field(strict=True, ge=1)] = 1 # one block mean one job? 
worker_init: str = "" overrides: str = "" @@ -111,7 +115,23 @@ def _validate_walltime(cls, v: str) -> str: return v +def with_memory_limit(limit_gb: float) -> Callable[[F], F]: + """Set memory limit for a parsl worker""" + + def decorator(func: F) -> F: + def wrapper(*args: Any, **kwargs: Any) -> Any: + bytes_limit = int(limit_gb * 1024 * 1024 * 1024) + soft, hard = bytes_limit, bytes_limit + resource.setrlimit(resource.RLIMIT_AS, (soft, hard)) + return func(*args, **kwargs) + + return cast(F, wrapper) + + return decorator + + @python_app +@with_memory_limit(7.0) def _process_run(definition: ExecutionDefinition, log_level: str) -> ExecutionResult: """Run the function on computer nodes""" # This is a catch-all for any exceptions that occur in the process and need to raise for From e5b3ca9e05fa63c565e7cb6c569c394781654e36 Mon Sep 17 00:00:00 2001 From: Min Xu Date: Mon, 23 Feb 2026 10:38:08 -0800 Subject: [PATCH 2/4] replace the hard-coded memory limit with an env variable --- .../src/climate_ref/executor/hpc.py | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/packages/climate-ref/src/climate_ref/executor/hpc.py b/packages/climate-ref/src/climate_ref/executor/hpc.py index 458f3983a..8a42ba43b 100644 --- a/packages/climate-ref/src/climate_ref/executor/hpc.py +++ b/packages/climate-ref/src/climate_ref/executor/hpc.py @@ -115,14 +115,21 @@ def _validate_walltime(cls, v: str) -> str: return v -def with_memory_limit(limit_gb: float) -> Callable[[F], F]: +def with_memory_limit(limit_gb: float | Callable[..., float | None]) -> Callable[[F], F]: """Set memory limit for a parsl worker""" def decorator(func: F) -> F: def wrapper(*args: Any, **kwargs: Any) -> Any: - bytes_limit = int(limit_gb * 1024 * 1024 * 1024) - soft, hard = bytes_limit, bytes_limit - resource.setrlimit(resource.RLIMIT_AS, (soft, hard)) + try: + current_limit = limit_gb(*args, **kwargs) if callable(limit_gb) else limit_gb + except Exception: + current_limit = None + + 
if current_limit is not None and current_limit > 0: + bytes_limit = int(current_limit * 1024 * 1024 * 1024) + _, hard0 = resource.getrlimit(resource.RLIMIT_AS) + soft = min(bytes_limit, hard0) + resource.setrlimit(resource.RLIMIT_AS, (soft, hard0)) return func(*args, **kwargs) return cast(F, wrapper) @@ -130,8 +137,19 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return decorator +def limit_from_env(*args: Any, **kwargs: Any) -> float | None: + """Get the memory limits from env variables""" + val = os.getenv("MEMORY_LIMIT_PARSL_JOB_GB") + if not val: + return None + try: + return float(val) + except ValueError: + return None + + @python_app -@with_memory_limit(7.0) +@with_memory_limit(limit_from_env) def _process_run(definition: ExecutionDefinition, log_level: str) -> ExecutionResult: """Run the function on computer nodes""" # This is a catch-all for any exceptions that occur in the process and need to raise for From 5fb2f31259274b4bb83e924cab37628f3521ed1c Mon Sep 17 00:00:00 2001 From: Min Xu Date: Mon, 23 Feb 2026 16:36:27 -0800 Subject: [PATCH 3/4] add tests for memory limit --- .../src/climate_ref/executor/hpc.py | 2 +- .../tests/unit/executor/test_hpc_executor.py | 44 ++++++++++++++++++- 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/packages/climate-ref/src/climate_ref/executor/hpc.py b/packages/climate-ref/src/climate_ref/executor/hpc.py index 8a42ba43b..492cf80e1 100644 --- a/packages/climate-ref/src/climate_ref/executor/hpc.py +++ b/packages/climate-ref/src/climate_ref/executor/hpc.py @@ -128,7 +128,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: if current_limit is not None and current_limit > 0: bytes_limit = int(current_limit * 1024 * 1024 * 1024) _, hard0 = resource.getrlimit(resource.RLIMIT_AS) - soft = min(bytes_limit, hard0) + soft = min(bytes_limit, hard0) if hard0 > 0 else bytes_limit resource.setrlimit(resource.RLIMIT_AS, (soft, hard0)) return func(*args, **kwargs) diff --git 
a/packages/climate-ref/tests/unit/executor/test_hpc_executor.py b/packages/climate-ref/tests/unit/executor/test_hpc_executor.py index 535619ab8..c0b3214c5 100644 --- a/packages/climate-ref/tests/unit/executor/test_hpc_executor.py +++ b/packages/climate-ref/tests/unit/executor/test_hpc_executor.py @@ -1,4 +1,6 @@ +import os import re +import resource from unittest.mock import MagicMock, patch import parsl @@ -6,7 +8,13 @@ from parsl.dataflow import futures from pydantic import ValidationError -from climate_ref.executor.hpc import HPCExecutor, SlurmConfig, execute_locally +from climate_ref.executor.hpc import ( + HPCExecutor, + SlurmConfig, + execute_locally, + limit_from_env, + with_memory_limit, +) from climate_ref.executor.local import ExecutionFuture from climate_ref_core.diagnostics import ExecutionResult from climate_ref_core.exceptions import DiagnosticError @@ -173,3 +181,37 @@ def test_hpc_slurm_missing_required_config(self, missing_config, base_config): [slurm_cfg_dict.pop(m) for m in missing_config] with pytest.raises(ValidationError): SlurmConfig.model_validate(slurm_cfg_dict) + + def test_memeory_limit_fixed(self): + """Test with a fixed numeric memory limit""" + + @with_memory_limit(2.0) + def test_func(): + # Check if memory limit was set + soft, hard = resource.getrlimit(resource.RLIMIT_AS) + expected_bytes = int(2.0 * 1024 * 1024 * 1024) + assert soft == expected_bytes + assert hard >= expected_bytes + + def test_memory_limit_func(self): + # Save original state + os.environ.pop("MEMORY_LIMIT_PARSL_JOB_GB", None) + orig_soft, orig_hard = resource.getrlimit(resource.RLIMIT_AS) + + @with_memory_limit(limit_from_env) + def unset_func(): + soft, hard = resource.getrlimit(resource.RLIMIT_AS) + assert soft == orig_soft + assert hard == orig_hard + + unset_func() + os.environ["MEMORY_LIMIT_PARSL_JOB_GB"] = "7" + + @with_memory_limit(limit_from_env) + def set_func(): + soft, hard = resource.getrlimit(resource.RLIMIT_AS) + expected_bytes = 7 * 1024 * 1024 * 
1024 + assert soft == expected_bytes + assert hard == -1 or hard >= expected_bytes + + set_func() From 55ccbca22164bfaedd13507e06343661daef3755 Mon Sep 17 00:00:00 2001 From: Min Xu Date: Mon, 23 Feb 2026 18:44:51 -0800 Subject: [PATCH 4/4] add changelog --- changelog/464.improvement.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/464.improvement.md diff --git a/changelog/464.improvement.md b/changelog/464.improvement.md new file mode 100644 index 000000000..bd3b247af --- /dev/null +++ b/changelog/464.improvement.md @@ -0,0 +1 @@ +Added a memory limit for Parsl workers in the HPC executor, set via the environment variable `MEMORY_LIMIT_PARSL_JOB_GB` (units: GB); no limit is applied when the variable is unset.