From 4213d2c802ea49eda6c2aeed7afcb7b74d34989d Mon Sep 17 00:00:00 2001 From: James Frost Date: Tue, 10 Feb 2026 13:52:49 +0000 Subject: [PATCH 1/7] Send logs to stdout instead of stderr This means they appear in job.out instead of being weirdly split over job.out and job.err. --- src/CSET/__init__.py | 2 +- src/CSET/cset_workflow/app/fetch_fcst/bin/fetch_data.py | 5 ++++- .../cset_workflow/app/finish_website/bin/finish_website.py | 5 ++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/CSET/__init__.py b/src/CSET/__init__.py index 9b41e29b2..f98ca8081 100644 --- a/src/CSET/__init__.py +++ b/src/CSET/__init__.py @@ -210,7 +210,7 @@ def filter(self, record): logging.getLogger("matplotlib.font_manager").addFilter(NoFontMessageFilter()) - stderr_log = logging.StreamHandler() + stderr_log = logging.StreamHandler(stream=sys.stdout) stderr_log.setFormatter( logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") ) diff --git a/src/CSET/cset_workflow/app/fetch_fcst/bin/fetch_data.py b/src/CSET/cset_workflow/app/fetch_fcst/bin/fetch_data.py index 8bf13cfff..acf999a7c 100644 --- a/src/CSET/cset_workflow/app/fetch_fcst/bin/fetch_data.py +++ b/src/CSET/cset_workflow/app/fetch_fcst/bin/fetch_data.py @@ -21,6 +21,7 @@ import logging import os import ssl +import sys import urllib.parse import urllib.request from concurrent.futures import ThreadPoolExecutor @@ -31,7 +32,9 @@ import isodate logging.basicConfig( - level=os.getenv("LOGLEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s" + level=os.getenv("LOGLEVEL", "INFO"), + format="%(asctime)s %(levelname)s %(message)s", + stream=sys.stdout, ) diff --git a/src/CSET/cset_workflow/app/finish_website/bin/finish_website.py b/src/CSET/cset_workflow/app/finish_website/bin/finish_website.py index f9a06218f..859ee82ea 100755 --- a/src/CSET/cset_workflow/app/finish_website/bin/finish_website.py +++ b/src/CSET/cset_workflow/app/finish_website/bin/finish_website.py @@ -25,6 +25,7 @@ import logging import os import shutil +import sys import time from importlib.metadata import version from pathlib import Path @@ -32,7 +33,9 @@ from CSET._common import combine_dicts, sort_dict logging.basicConfig( - level=os.getenv("LOGLEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s" + level=os.getenv("LOGLEVEL", "INFO"), + format="%(asctime)s %(levelname)s %(message)s", + stream=sys.stdout, ) logger = logging.getLogger(__name__) From ba6477b9f46312a847b9d8744f4d099ec303c901 Mon Sep 17 00:00:00 2001 From: James Frost Date: Fri, 6 Feb 2026 11:50:53 +0000 Subject: [PATCH 2/7] Move setup_logging into CSET._common We are going to use it in multiple places now. --- src/CSET/__init__.py | 46 +------------------------------------------- src/CSET/_common.py | 45 +++++++++++++++++++++++++++++++++++++++++++ tests/test_cli.py | 24 ----------------------- tests/test_common.py | 25 ++++++++++++++++++++++++ 4 files changed, 71 insertions(+), 69 deletions(-) diff --git a/src/CSET/__init__.py b/src/CSET/__init__.py index f98ca8081..65b84e23a 100644 --- a/src/CSET/__init__.py +++ b/src/CSET/__init__.py @@ -16,12 +16,11 @@ import argparse import logging -import os import sys from importlib.metadata import version from pathlib import Path -from CSET._common import ArgumentError +from CSET._common import ArgumentError, setup_logging logger = logging.getLogger(__name__) @@ -174,49 +173,6 @@ def setup_argument_parser() -> argparse.ArgumentParser: return parser -def setup_logging(verbosity: int): - """Configure logging level, format and output stream. - - Level is based on verbose argument and the LOGLEVEL environment variable. - """ - logging.captureWarnings(True) - - # Calculate logging level. - # Level from CLI flags. - if verbosity >= 2: - cli_loglevel = logging.DEBUG - elif verbosity == 1: - cli_loglevel = logging.INFO - else: - cli_loglevel = logging.WARNING - - # Level from $LOGLEVEL environment variable. - env_loglevel = logging.getLevelNamesMapping().get( - os.getenv("LOGLEVEL"), logging.ERROR - ) - - # Logging verbosity is the most verbose of CLI and environment setting. - loglevel = min(cli_loglevel, env_loglevel) - - # Configure the root logger. - logger = logging.getLogger() - # Set logging level. - logger.setLevel(loglevel) - - # Hide matplotlib's many font messages. - class NoFontMessageFilter(logging.Filter): - def filter(self, record): - return not record.getMessage().startswith("findfont:") - - logging.getLogger("matplotlib.font_manager").addFilter(NoFontMessageFilter()) - - stderr_log = logging.StreamHandler(stream=sys.stdout) - stderr_log.setFormatter( - logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") - ) - logger.addHandler(stderr_log) - - def _bake_command(args, unparsed_args): from CSET._common import parse_recipe, parse_variable_options from CSET.operators import execute_recipe diff --git a/src/CSET/_common.py b/src/CSET/_common.py index 0ad0de667..ee23f8034 100644 --- a/src/CSET/_common.py +++ b/src/CSET/_common.py @@ -18,9 +18,11 @@ import io import json import logging +import os import re from collections.abc import Iterable from pathlib import Path +import sys from textwrap import dedent from typing import Any @@ -31,6 +33,49 @@ class ArgumentError(ValueError): """Provided arguments are not understood.""" +def setup_logging(verbosity: int): + """Configure logging level, format and output stream. + + Level is based on verbose argument and the LOGLEVEL environment variable. + """ + logging.captureWarnings(True) + + # Calculate logging level. + # Level from CLI flags. + if verbosity >= 2: + cli_loglevel = logging.DEBUG + elif verbosity == 1: + cli_loglevel = logging.INFO + else: + cli_loglevel = logging.WARNING + + # Level from $LOGLEVEL environment variable. + env_loglevel = logging.getLevelNamesMapping().get( + os.getenv("LOGLEVEL"), logging.ERROR + ) + + # Logging verbosity is the most verbose of CLI and environment setting. + loglevel = min(cli_loglevel, env_loglevel) + + # Configure the root logger. + logger = logging.getLogger() + # Set logging level. + logger.setLevel(loglevel) + + # Hide matplotlib's many font messages. + class NoFontMessageFilter(logging.Filter): + def filter(self, record): + return not record.getMessage().startswith("findfont:") + + logging.getLogger("matplotlib.font_manager").addFilter(NoFontMessageFilter()) + + stderr_log = logging.StreamHandler(stream=sys.stdout) + stderr_log.setFormatter( + logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") + ) + logger.addHandler(stderr_log) + + def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict: """Parse a recipe into a python dictionary. diff --git a/tests/test_cli.py b/tests/test_cli.py index b93e80aad..138815263 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -110,30 +110,6 @@ def test_argument_parser_extract_workflow(tmp_path): assert args.location == tmp_path -def test_setup_logging(): - """Tests the logging setup at various verbosity levels.""" - root_logger = logging.getLogger() - # Log has a minimum of WARNING. - CSET.setup_logging(0) - assert root_logger.level == logging.WARNING - # -v - CSET.setup_logging(1) - assert root_logger.level == logging.INFO - # -vv - CSET.setup_logging(2) - assert root_logger.level == logging.DEBUG - - -def test_setup_logging_mpl_font_logs_filtered(caplog): - """Test matplotlib log messages about fonts are filtered out.""" - CSET.setup_logging(2) - logger = logging.getLogger("matplotlib.font_manager") - logger.debug("findfont: message") - logger.debug("other message") - assert len(caplog.records) == 1 - assert caplog.records[0].getMessage() == "other message" - - def test_main_no_subparser(capsys): """Appropriate error when no subparser is given.""" with pytest.raises(SystemExit) as sysexit: diff --git a/tests/test_common.py b/tests/test_common.py index 4bfd3fac3..fdd8b910e 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -14,6 +14,7 @@ """Tests for common functionality across CSET.""" +import logging from collections.abc import Iterable from pathlib import Path @@ -22,6 +23,30 @@ import CSET._common as common +def test_setup_logging(): + """Tests the logging setup at various verbosity levels.""" + root_logger = logging.getLogger() + # Log has a minimum of WARNING. + common.setup_logging(0) + assert root_logger.level == logging.WARNING + # -v + common.setup_logging(1) + assert root_logger.level == logging.INFO + # -vv + common.setup_logging(2) + assert root_logger.level == logging.DEBUG + + +def test_setup_logging_mpl_font_logs_filtered(caplog): + """Test matplotlib log messages about fonts are filtered out.""" + common.setup_logging(2) + logger = logging.getLogger("matplotlib.font_manager") + logger.debug("findfont: message") + logger.debug("other message") + assert len(caplog.records) == 1 + assert caplog.records[0].getMessage() == "other message" + + def test_parse_recipe_string(): """Loading and parsing of a YAML recipe from a string.""" valid_recipe = """\ From 41e1bd5d7c5e2f325658b9b8a51d84406c52769b Mon Sep 17 00:00:00 2001 From: James Frost Date: Fri, 6 Feb 2026 13:02:42 +0000 Subject: [PATCH 3/7] Tweak how YAML is imported --- src/CSET/_common.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/CSET/_common.py b/src/CSET/_common.py index ee23f8034..a6b0c3dcf 100644 --- a/src/CSET/_common.py +++ b/src/CSET/_common.py @@ -26,7 +26,8 @@ from textwrap import dedent from typing import Any -import ruamel.yaml +from ruamel.yaml import YAML +from ruamel.yaml.parser import ParserError class ArgumentError(ValueError): @@ -85,7 +86,7 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict Path to a file containing, or a string of, a recipe's YAML describing the operators that need running. If a Path is provided it is opened and read. - variables: dict + variables: dict, optional Dictionary of recipe variables. If None templating is not attempted. Returns @@ -114,10 +115,10 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict raise TypeError("recipe_yaml must be a str or Path.") # Parse the recipe YAML. - with ruamel.yaml.YAML(typ="safe", pure=True) as yaml: + with YAML(typ="safe", pure=True) as yaml: try: recipe = yaml.load(recipe_yaml) - except ruamel.yaml.parser.ParserError as err: + except ParserError as err: raise ValueError("ParserError: Invalid YAML") from err logging.debug("Recipe before templating:\n%s", recipe) From 18332b8adfb97b1169a6baf56c6401ccfee16858 Mon Sep 17 00:00:00 2001 From: James Frost Date: Fri, 6 Feb 2026 14:42:27 +0000 Subject: [PATCH 4/7] Add a duration formatting function --- src/CSET/_common.py | 17 +++++++++++++++++ tests/test_common.py | 16 ++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/src/CSET/_common.py b/src/CSET/_common.py index a6b0c3dcf..a6f937694 100644 --- a/src/CSET/_common.py +++ b/src/CSET/_common.py @@ -472,3 +472,20 @@ def is_increasing(sequence: list) -> bool: duplicate values. An iris DimCoord's points fulfils this criteria. """ return sequence[0] < sequence[1] + + +def format_duration(seconds: float) -> str: + """Format a number of seconds as a human readable string.""" + # Show milliseconds for short durations. + if seconds < 60: + return f"{seconds:.3f} seconds" + whole_seconds = int(seconds) + secs = (whole_seconds) % 60 + mins = (whole_seconds // 60) % 60 + hours = (whole_seconds // 3600) % 24 + days = whole_seconds // 86400 + time_string = f"{hours}h{mins}m{secs}s" + if days: + return f"{days} {'day' if days == 1 else 'days'} {time_string}" + else: + return time_string diff --git a/tests/test_common.py b/tests/test_common.py index fdd8b910e..c9cb9c945 100644 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -398,3 +398,19 @@ def test_is_increasing(): """Check order of strictly monotonic sequences is determined.""" assert common.is_increasing([1, 2, 3]) assert not common.is_increasing([3, 2, 1]) + + +def test_format_duration(): + """Check formatting of different durations.""" + # Integer short duration. + assert common.format_duration(1) == "1.000 seconds" + # Float short duration. + assert common.format_duration(9.876543) == "9.877 seconds" + # Integer hours duration. + assert common.format_duration(3661) == "1h1m1s" + # Float hours duration. + assert common.format_duration(3661.999) == "1h1m1s" + # Days. + assert common.format_duration(86700) == "1 day 0h5m0s" + # Many days. + assert common.format_duration(8640000) == "100 days 0h0m0s" From 71b970ba7fdce02383625f7990c3dd0240adeb5a Mon Sep 17 00:00:00 2001 From: James Frost Date: Fri, 6 Feb 2026 15:13:39 +0000 Subject: [PATCH 5/7] Port workflow parallel baking to python --- .../app/bake_recipes/bin/bake.sh | 15 --- .../app/bake_recipes/bin/baker.sh | 39 ------ .../app/bake_recipes/bin/traybake.py | 120 ++++++++++++++++++ .../app/bake_recipes/rose-app.conf | 8 +- src/CSET/cset_workflow/flow.cylc | 2 - 5 files changed, 122 insertions(+), 62 deletions(-) delete mode 100755 src/CSET/cset_workflow/app/bake_recipes/bin/bake.sh delete mode 100755 src/CSET/cset_workflow/app/bake_recipes/bin/baker.sh create mode 100755 src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py diff --git a/src/CSET/cset_workflow/app/bake_recipes/bin/bake.sh b/src/CSET/cset_workflow/app/bake_recipes/bin/bake.sh deleted file mode 100755 index fe8599424..000000000 --- a/src/CSET/cset_workflow/app/bake_recipes/bin/bake.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash -# Run CSET bake for a recipe file. -set -euo pipefail - -# Put together path to output dir from nice name for recipe. -output_dir="${CYLC_WORKFLOW_SHARE_DIR}/web/plots/${CYLC_TASK_CYCLE_POINT}/$(basename "$1" .yaml)" - -set -x -# Bake recipe. -exec cset bake \ - --recipe "$1" \ - --output-dir "$output_dir" \ - ${COLORBAR_FILE:+"--style-file='${CYLC_WORKFLOW_SHARE_DIR}/style.json'"} \ - ${PLOT_RESOLUTION:+"--plot-resolution=$PLOT_RESOLUTION"} \ - ${SKIP_WRITE:+"--skip-write"} diff --git a/src/CSET/cset_workflow/app/bake_recipes/bin/baker.sh b/src/CSET/cset_workflow/app/bake_recipes/bin/baker.sh deleted file mode 100755 index f4de7de72..000000000 --- a/src/CSET/cset_workflow/app/bake_recipes/bin/baker.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env bash -# Setup rose app for baking, then bake. -set -euo pipefail - -# Use the appropriate recipes. -optconfkey="$CYLC_TASK_CYCLE_POINT" -if [ -n "${DO_CASE_AGGREGATION-}" ]; then - RECIPE_DIR=$CYLC_WORKFLOW_SHARE_DIR/cycle/$CYLC_TASK_CYCLE_POINT/aggregation_recipes - optconfkey="${optconfkey}-aggregation" -else - RECIPE_DIR=$CYLC_WORKFLOW_SHARE_DIR/cycle/$CYLC_TASK_CYCLE_POINT/recipes -fi -if ! [ -d "$RECIPE_DIR" ]; then - echo "No recipes to bake in $RECIPE_DIR" - exit 0 -fi -export RECIPE_DIR - -# Determine parallelism. -parallelism="$(nproc)" -if [ "$CYLC_TASK_SUBMIT_NUMBER" -gt 1 ]; then - # This is a retry; enable DEBUG logging. - export LOGLEVEL="DEBUG" -fi - -# Get filenames without leading directory. -# Portability note: printf is specific to GNU find. -recipes="$(find "$RECIPE_DIR" -iname '*.yaml' -type f -printf '%P ')" -# Count and display number of recipes. (By counting the number of spaces.) -echo "Baking $(echo "$recipes" | tr -dc ' ' | wc -c) recipes..." - -# Write rose-bunch optional configuration. -mkdir -p "$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/opt/" -opt_conf="$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/opt/rose-app-${optconfkey}.conf" -printf "[bunch]\npool-size=%s\n[bunch-args]\nrecipe_file=%s\n" "$parallelism" "$recipes" > "$opt_conf" -unset opt_conf parallelism recipes - -# Run bake_recipes rose app. -exec rose task-run -v --app-key=bake_recipes --opt-conf-key="${optconfkey}" diff --git a/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py b/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py new file mode 100755 index 000000000..60bf3949c --- /dev/null +++ b/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 + +"""Efficiently bake many recipes.""" + +import functools +import logging +import os +import sys +import time +from pathlib import Path + +from CSET._common import format_duration, parse_recipe, setup_logging +from CSET.operators import execute_recipe + +logger = logging.getLogger(__name__) + + +def bake_recipe( + recipe: Path, + cycle_output_dir: Path, + log_dir: Path, + style_file: Path, + plot_resolution: int, + skip_write: bool, +): + """Bake a single recipe. Returns whether it succeeded.""" + recipe_name = recipe.name.removesuffix(".yaml") + + # Put together path to output dir from nice name for recipe. + output_dir = cycle_output_dir / recipe_name + + # Make a recipe-specific logger. + log_file = log_dir / f"{recipe_name}.log" + recipe_logger = logging.getLogger(f"{__name__}.recipe.{recipe_name}") + recipe_logger.propagate = False + file_handler = logging.FileHandler(log_file) + recipe_logger.addHandler(file_handler) + + # Log the equivalent bake command for easy rerunning. + recipe_summary = f"""Baking recipe: +Recipe:\t{recipe} +Output:\t{output_dir} +Equivalent bake command: +cset -vv bake --recipe='{recipe}' --output-dir='{output_dir}' --style-file='{style_file}' --plot_resolution={plot_resolution}{" --skip-write" if skip_write else ""} +""" + recipe_logger.info(recipe_summary) + start_time = time.time() + + # Bake recipe. + try: + parsed_recipe = parse_recipe(recipe) + execute_recipe( + recipe=parsed_recipe, + output_directory=output_dir, + style_file=style_file, + plot_resolution=plot_resolution, + skip_write=skip_write, + ) + except Exception as err: + recipe_logger.exception( + "An unhandled exception occurred:\n%s", + str(err), + exc_info=True, + stack_info=True, + ) + logger.error("Recipe %s failed to bake. See %s", recipe_name, log_file) + duration = time.time() - start_time + recipe_logger.info("Recipe baked in %s.", format_duration(duration)) + + +def traybake(): + """Bake many recipes.""" + # Force DEBUG logging on retries. + if int(os.environ["CYLC_TASK_SUBMIT_NUMBER"]) > 1: + setup_logging(2) + else: + setup_logging(1) + + # Load in information from environment. + log_dir = Path(os.environ["CYLC_TASK_LOG_DIR"]) + cycle_point = os.environ["CYLC_TASK_CYCLE_POINT"] + share_dir = os.environ["CYLC_WORKFLOW_SHARE_DIR"] + agg = bool(os.getenv("DO_CASE_AGGREGATION", False)) + recipe_dir = Path( + f"{share_dir}/cycle/{cycle_point}/{'aggregation_' if agg else ''}recipes" + ) + cycle_output_dir = Path(f"{share_dir}/web/plots/{cycle_point}") + + # Baking configuration. + style_file = Path(f"{share_dir}/style.json") + plot_resolution = int(os.getenv("PLOT_RESOLUTION", 72)) + skip_write = bool(os.getenv("SKIP_WRITE", False)) + + # TODO: Replace with os.process_cpu_count once python 3.13 is our minimum. + # max_parallelism = len(os.sched_getaffinity(0)) + + recipes = list(filter(lambda p: p.is_file(), recipe_dir.glob("*.yaml"))) + if len(recipes): + logger.info("Baking %s recipes...", len(recipes)) + else: + logger.warning("No recipes to bake in %s", recipe_dir) + sys.exit(0) + + # Fill in all the constant arguments. (Everything but recipe.) + partial_bake = functools.partial( + bake_recipe, + cycle_output_dir=cycle_output_dir, + log_dir=log_dir, + style_file=style_file, + plot_resolution=plot_resolution, + skip_write=skip_write, + ) + + # TODO: Use a parallel executor. + for recipe in recipes: + partial_bake(recipe) + + +if __name__ == "__main__": + traybake() diff --git a/src/CSET/cset_workflow/app/bake_recipes/rose-app.conf b/src/CSET/cset_workflow/app/bake_recipes/rose-app.conf index cfb9c5ab5..2c27e99f7 100644 --- a/src/CSET/cset_workflow/app/bake_recipes/rose-app.conf +++ b/src/CSET/cset_workflow/app/bake_recipes/rose-app.conf @@ -1,6 +1,2 @@ -mode = rose_bunch - -[bunch] -incremental=true -command-format=app_env_wrapper bake.sh "$RECIPE_DIR/%(recipe_file)s" -# [bunch-args] come from an optional config written by bin/baker.sh +[command] +default=app_env_wrapper traybake.py diff --git a/src/CSET/cset_workflow/flow.cylc b/src/CSET/cset_workflow/flow.cylc index bc2febf58..b86eac738 100644 --- a/src/CSET/cset_workflow/flow.cylc +++ b/src/CSET/cset_workflow/flow.cylc @@ -191,12 +191,10 @@ final cycle point = {{CSET_TRIAL_END_DATE}} [[bake_recipes]] # Bake the parbaked recipes for this cycle. - script = "$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/bin/baker.sh" execution time limit = PT3H [[bake_aggregation_recipes]] # Bake the parbaked aggregation recipes. - script = "$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/bin/baker.sh" execution time limit = PT3H [[[environment]]] DO_CASE_AGGREGATION = True From 2fc072ea56e5db15ae9629b94475d70440791082 Mon Sep 17 00:00:00 2001 From: James Frost Date: Fri, 6 Feb 2026 17:07:37 +0000 Subject: [PATCH 6/7] Add per-process logging to keep the logs separate Currently untested. --- .../app/bake_recipes/bin/traybake.py | 122 ++++++++++++------ 1 file changed, 82 insertions(+), 40 deletions(-) diff --git a/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py b/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py index 60bf3949c..54e446fc5 100755 --- a/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py +++ b/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py @@ -7,6 +7,7 @@ import os import sys import time +from concurrent.futures import ProcessPoolExecutor from pathlib import Path from CSET._common import format_duration, parse_recipe, setup_logging @@ -15,6 +16,45 @@ logger = logging.getLogger(__name__) +class ProcessLoggingContext: + """Write this process's logs to a separate file. + + Log messages are filtered out from others. + """ + + def __init__(self, logger: logging.Logger, log_file: Path): + self.logger = logger + self.handler = logging.FileHandler(log_file) + + # Construct some filtering functions. + process_id = os.getpid() + + def keep_process_logs(record: logging.LogRecord) -> bool: + return record.process == process_id + + def discard_process_logs(record: logging.LogRecord) -> bool: + return record.process != process_id + + self.handler.addFilter(keep_process_logs) + self.filter = discard_process_logs + + def __enter__(self): + """Attach handler and filter when entering the context.""" + # Add filter for this process's log messages to existing handlers. + for handler in self.logger.handlers: + handler.addFilter(self.filter) + # Add a handler without the filter. + self.logger.addHandler(self.handler) + + def __exit__(self, exc_type, exc_value, traceback): + """Remove handler and filter and clean up when leaving the context.""" + self.logger.removeHandler(self.handler) + # Remove filter from other handlers. + for handler in self.logger.handlers: + handler.removeFilter(self.filter) + self.handler.close() + + def bake_recipe( recipe: Path, cycle_output_dir: Path, @@ -29,43 +69,44 @@ def bake_recipe( # Put together path to output dir from nice name for recipe. output_dir = cycle_output_dir / recipe_name - # Make a recipe-specific logger. + # Log this recipe's logs to a separate file. log_file = log_dir / f"{recipe_name}.log" - recipe_logger = logging.getLogger(f"{__name__}.recipe.{recipe_name}") - recipe_logger.propagate = False - file_handler = logging.FileHandler(log_file) - recipe_logger.addHandler(file_handler) - - # Log the equivalent bake command for easy rerunning. - recipe_summary = f"""Baking recipe: -Recipe:\t{recipe} -Output:\t{output_dir} -Equivalent bake command: -cset -vv bake --recipe='{recipe}' --output-dir='{output_dir}' --style-file='{style_file}' --plot_resolution={plot_resolution}{" --skip-write" if skip_write else ""} -""" - recipe_logger.info(recipe_summary) - start_time = time.time() - - # Bake recipe. - try: - parsed_recipe = parse_recipe(recipe) - execute_recipe( - recipe=parsed_recipe, - output_directory=output_dir, - style_file=style_file, - plot_resolution=plot_resolution, - skip_write=skip_write, + root_logger = logging.getLogger() + with ProcessLoggingContext(logger=root_logger, log_file=log_file): + # Log the equivalent bake command for easy rerunning. + recipe_summary = ( + f"Baking recipe {recipe_name}\n" + "Equivalent bake command:\n" + "cset -vv bake \\\n" + f" --recipe='{recipe}' \\\n" + f" --output-dir='{output_dir}' \\\n" + f" --style-file='{style_file}' \\\n" + f" --plot_resolution={plot_resolution}" + f"{' \\\n --skip-write' if skip_write else ''}" ) - except Exception as err: - recipe_logger.exception( - "An unhandled exception occurred:\n%s", - str(err), - exc_info=True, - stack_info=True, - ) - logger.error("Recipe %s failed to bake. See %s", recipe_name, log_file) - duration = time.time() - start_time - recipe_logger.info("Recipe baked in %s.", format_duration(duration)) + logger.info(recipe_summary) + start_time = time.time() + + # Bake recipe. + try: + parsed_recipe = parse_recipe(recipe) + execute_recipe( + recipe=parsed_recipe, + output_directory=output_dir, + style_file=style_file, + plot_resolution=plot_resolution, + skip_write=skip_write, + ) + except Exception as err: + logger.exception( + "An unhandled exception occurred:\n%s", + err, + exc_info=True, + stack_info=True, + ) + logger.error("Recipe %s failed to bake. See %s", recipe_name, log_file) + duration = time.time() - start_time + logger.info("Recipe baked in %s.", format_duration(duration)) def traybake(): @@ -91,9 +132,6 @@ def traybake(): plot_resolution = int(os.getenv("PLOT_RESOLUTION", 72)) skip_write = bool(os.getenv("SKIP_WRITE", False)) - # TODO: Replace with os.process_cpu_count once python 3.13 is our minimum. - # max_parallelism = len(os.sched_getaffinity(0)) - recipes = list(filter(lambda p: p.is_file(), recipe_dir.glob("*.yaml"))) if len(recipes): logger.info("Baking %s recipes...", len(recipes)) @@ -111,9 +149,13 @@ def traybake(): skip_write=skip_write, ) + # TODO: Replace with os.process_cpu_count once python 3.13 is our minimum. + max_parallelism = len(os.sched_getaffinity(0)) + # TODO: Use a parallel executor. - for recipe in recipes: - partial_bake(recipe) + with ProcessPoolExecutor(max_workers=max_parallelism) as pool: + futures = [pool.submit(partial_bake, recipe) for recipe in recipes] + # TODO: Log when each future is finished. if __name__ == "__main__": From 76f0ebf52d869fd8f099d4e98df6e8aa726f37cf Mon Sep 17 00:00:00 2001 From: James Frost Date: Mon, 9 Feb 2026 16:37:25 +0000 Subject: [PATCH 7/7] Add parallel executor and top-level logging --- .../app/bake_recipes/bin/traybake.py | 43 ++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py b/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py index 54e446fc5..49f829a5f 100755 --- a/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py +++ b/src/CSET/cset_workflow/app/bake_recipes/bin/traybake.py @@ -5,9 +5,8 @@ import functools import logging import os -import sys import time -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from CSET._common import format_duration, parse_recipe, setup_logging @@ -16,6 +15,10 @@ logger = logging.getLogger(__name__) +class RecipeError(RuntimeError): + """Recipe failed to bake.""" + + class ProcessLoggingContext: """Write this process's logs to a separate file. @@ -63,7 +66,7 @@ def bake_recipe( plot_resolution: int, skip_write: bool, ): - """Bake a single recipe. Returns whether it succeeded.""" + """Bake a single recipe.""" recipe_name = recipe.name.removesuffix(".yaml") # Put together path to output dir from nice name for recipe. @@ -104,7 +107,9 @@ def bake_recipe( exc_info=True, stack_info=True, ) - logger.error("Recipe %s failed to bake. See %s", recipe_name, log_file) + raise RecipeError( + f"Recipe {recipe_name} failed to bake. See {log_file}" + ) from err duration = time.time() - start_time logger.info("Recipe baked in %s.", format_duration(duration)) @@ -132,14 +137,16 @@ def traybake(): plot_resolution = int(os.getenv("PLOT_RESOLUTION", 72)) skip_write = bool(os.getenv("SKIP_WRITE", False)) + # Find recipes to bake. recipes = list(filter(lambda p: p.is_file(), recipe_dir.glob("*.yaml"))) - if len(recipes): - logger.info("Baking %s recipes...", len(recipes)) + num_recipes = len(recipes) + if num_recipes: + logger.info("Baking %s recipes...", num_recipes) else: logger.warning("No recipes to bake in %s", recipe_dir) - sys.exit(0) + return - # Fill in all the constant arguments. (Everything but recipe.) + # Fill in constant arguments. (All but recipe.) partial_bake = functools.partial( bake_recipe, cycle_output_dir=cycle_output_dir, @@ -149,13 +156,27 @@ def traybake(): skip_write=skip_write, ) - # TODO: Replace with os.process_cpu_count once python 3.13 is our minimum. + # Get number of usable CPUs. max_parallelism = len(os.sched_getaffinity(0)) - # TODO: Use a parallel executor. + number_length = len(str(num_recipes)) # For formatting. + num_baked = 0 + num_failed = 0 + + # Bake the recipes in parallel. with ProcessPoolExecutor(max_workers=max_parallelism) as pool: futures = [pool.submit(partial_bake, recipe) for recipe in recipes] - # TODO: Log when each future is finished. + for future in as_completed(futures): + num_baked += 1 + try: + future.result() + logging.info(f"{num_baked: {number_length}}/{num_recipes}") + except RecipeError as err: + num_failed += 1 + logger.error(err) + logger.info("Baking complete!") + if num_failed: + logger.warning("%s/%s recipes failed to bake.", num_failed, num_recipes) if __name__ == "__main__":