diff --git a/src/CSET/__init__.py b/src/CSET/__init__.py index 9b41e29b2..65b84e23a 100644 --- a/src/CSET/__init__.py +++ b/src/CSET/__init__.py @@ -16,12 +16,11 @@ import argparse import logging -import os import sys from importlib.metadata import version from pathlib import Path -from CSET._common import ArgumentError +from CSET._common import ArgumentError, setup_logging logger = logging.getLogger(__name__) @@ -174,49 +173,6 @@ def setup_argument_parser() -> argparse.ArgumentParser: return parser -def setup_logging(verbosity: int): - """Configure logging level, format and output stream. - - Level is based on verbose argument and the LOGLEVEL environment variable. - """ - logging.captureWarnings(True) - - # Calculate logging level. - # Level from CLI flags. - if verbosity >= 2: - cli_loglevel = logging.DEBUG - elif verbosity == 1: - cli_loglevel = logging.INFO - else: - cli_loglevel = logging.WARNING - - # Level from $LOGLEVEL environment variable. - env_loglevel = logging.getLevelNamesMapping().get( - os.getenv("LOGLEVEL"), logging.ERROR - ) - - # Logging verbosity is the most verbose of CLI and environment setting. - loglevel = min(cli_loglevel, env_loglevel) - - # Configure the root logger. - logger = logging.getLogger() - # Set logging level. - logger.setLevel(loglevel) - - # Hide matplotlib's many font messages. 
- class NoFontMessageFilter(logging.Filter): - def filter(self, record): - return not record.getMessage().startswith("findfont:") - - logging.getLogger("matplotlib.font_manager").addFilter(NoFontMessageFilter()) - - stderr_log = logging.StreamHandler() - stderr_log.setFormatter( - logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") - ) - logger.addHandler(stderr_log) - - def _bake_command(args, unparsed_args): from CSET._common import parse_recipe, parse_variable_options from CSET.operators import execute_recipe diff --git a/src/CSET/_common.py b/src/CSET/_common.py index 0ad0de667..a6f937694 100644 --- a/src/CSET/_common.py +++ b/src/CSET/_common.py @@ -18,19 +18,65 @@ import io import json import logging +import os import re from collections.abc import Iterable from pathlib import Path +import sys from textwrap import dedent from typing import Any -import ruamel.yaml +from ruamel.yaml import YAML +from ruamel.yaml.parser import ParserError class ArgumentError(ValueError): """Provided arguments are not understood.""" +def setup_logging(verbosity: int): + """Configure logging level, format and output stream. + + Level is based on verbose argument and the LOGLEVEL environment variable. + """ + logging.captureWarnings(True) + + # Calculate logging level. + # Level from CLI flags. + if verbosity >= 2: + cli_loglevel = logging.DEBUG + elif verbosity == 1: + cli_loglevel = logging.INFO + else: + cli_loglevel = logging.WARNING + + # Level from $LOGLEVEL environment variable. + env_loglevel = logging.getLevelNamesMapping().get( + os.getenv("LOGLEVEL"), logging.ERROR + ) + + # Logging verbosity is the most verbose of CLI and environment setting. + loglevel = min(cli_loglevel, env_loglevel) + + # Configure the root logger. + logger = logging.getLogger() + # Set logging level. + logger.setLevel(loglevel) + + # Hide matplotlib's many font messages. 
+ class NoFontMessageFilter(logging.Filter): + def filter(self, record): + return not record.getMessage().startswith("findfont:") + + logging.getLogger("matplotlib.font_manager").addFilter(NoFontMessageFilter()) + + stderr_log = logging.StreamHandler(stream=sys.stdout) + stderr_log.setFormatter( + logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s") + ) + logger.addHandler(stderr_log) + + def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict: """Parse a recipe into a python dictionary. @@ -40,7 +86,7 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict Path to a file containing, or a string of, a recipe's YAML describing the operators that need running. If a Path is provided it is opened and read. - variables: dict + variables: dict, optional Dictionary of recipe variables. If None templating is not attempted. Returns @@ -69,10 +115,10 @@ def parse_recipe(recipe_yaml: Path | str, variables: dict | None = None) -> dict raise TypeError("recipe_yaml must be a str or Path.") # Parse the recipe YAML. - with ruamel.yaml.YAML(typ="safe", pure=True) as yaml: + with YAML(typ="safe", pure=True) as yaml: try: recipe = yaml.load(recipe_yaml) - except ruamel.yaml.parser.ParserError as err: + except ParserError as err: raise ValueError("ParserError: Invalid YAML") from err logging.debug("Recipe before templating:\n%s", recipe) @@ -426,3 +472,20 @@ def is_increasing(sequence: list) -> bool: duplicate values. An iris DimCoord's points fulfils this criteria. """ return sequence[0] < sequence[1] + + +def format_duration(seconds: float) -> str: + """Format a number of seconds as a human readable string.""" + # Show milliseconds for short durations. 
+ if seconds < 60: + return f"{seconds:.3f} seconds" + whole_seconds = int(seconds) + secs = (whole_seconds) % 60 + mins = (whole_seconds // 60) % 60 + hours = (whole_seconds // 3600) % 24 + days = whole_seconds // 86400 + time_string = f"{hours}h{mins}m{secs}s" + if days: + return f"{days} {'day' if days == 1 else 'days'} {time_string}" + else: + return time_string diff --git a/src/CSET/cset_workflow/app/bake_recipes/bin/bake.sh b/src/CSET/cset_workflow/app/bake_recipes/bin/bake.sh deleted file mode 100755 index fe8599424..000000000 --- a/src/CSET/cset_workflow/app/bake_recipes/bin/bake.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash -# Run CSET bake for a recipe file. -set -euo pipefail - -# Put together path to output dir from nice name for recipe. -output_dir="${CYLC_WORKFLOW_SHARE_DIR}/web/plots/${CYLC_TASK_CYCLE_POINT}/$(basename "$1" .yaml)" - -set -x -# Bake recipe. -exec cset bake \ - --recipe "$1" \ - --output-dir "$output_dir" \ - ${COLORBAR_FILE:+"--style-file='${CYLC_WORKFLOW_SHARE_DIR}/style.json'"} \ - ${PLOT_RESOLUTION:+"--plot-resolution=$PLOT_RESOLUTION"} \ - ${SKIP_WRITE:+"--skip-write"} diff --git a/src/CSET/cset_workflow/app/bake_recipes/bin/baker.sh b/src/CSET/cset_workflow/app/bake_recipes/bin/baker.sh deleted file mode 100755 index f4de7de72..000000000 --- a/src/CSET/cset_workflow/app/bake_recipes/bin/baker.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env bash -# Setup rose app for baking, then bake. -set -euo pipefail - -# Use the appropriate recipes. -optconfkey="$CYLC_TASK_CYCLE_POINT" -if [ -n "${DO_CASE_AGGREGATION-}" ]; then - RECIPE_DIR=$CYLC_WORKFLOW_SHARE_DIR/cycle/$CYLC_TASK_CYCLE_POINT/aggregation_recipes - optconfkey="${optconfkey}-aggregation" -else - RECIPE_DIR=$CYLC_WORKFLOW_SHARE_DIR/cycle/$CYLC_TASK_CYCLE_POINT/recipes -fi -if ! [ -d "$RECIPE_DIR" ]; then - echo "No recipes to bake in $RECIPE_DIR" - exit 0 -fi -export RECIPE_DIR - -# Determine parallelism. 
logger = logging.getLogger(__name__)


class RecipeError(RuntimeError):
    """Recipe failed to bake."""


class ProcessLoggingContext:
    """Write this process's logs to a separate file.

    While the context is active a ``logging.FileHandler`` for ``log_file`` is
    attached to ``logger``, accepting only records emitted by the process that
    created the context. Every handler already attached to ``logger`` is given
    a filter rejecting those same records, so they appear in the file only.
    Leaving the context removes the handler and the filters and closes the
    file.

    Parameters
    ----------
    logger: logging.Logger
        Logger whose handlers are adjusted.
    log_file: Path
        File to write this process's records to. It is opened when the context
        object is constructed.
    """

    def __init__(self, logger: logging.Logger, log_file: Path):
        self.logger = logger
        self.handler = logging.FileHandler(log_file)

        # Construct some filtering functions, both keyed on the ID of the
        # process that created this context.
        process_id = os.getpid()

        def keep_process_logs(record: logging.LogRecord) -> bool:
            return record.process == process_id

        def discard_process_logs(record: logging.LogRecord) -> bool:
            return record.process != process_id

        self.handler.addFilter(keep_process_logs)
        self.filter = discard_process_logs

    def __enter__(self):
        """Attach handler and filter when entering the context."""
        # Add filter for this process's log messages to existing handlers.
        for handler in self.logger.handlers:
            handler.addFilter(self.filter)
        # Add a handler without the filter.
        self.logger.addHandler(self.handler)
        # Returned so that ``with ProcessLoggingContext(...) as ctx`` works.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Remove handler and filter and clean up when leaving the context.

        Nothing is returned, so an exception raised inside the context
        propagates to the caller.
        """
        self.logger.removeHandler(self.handler)
        # Remove filter from other handlers.
        for handler in self.logger.handlers:
            handler.removeFilter(self.filter)
        self.handler.close()


def bake_recipe(
    recipe: Path,
    cycle_output_dir: Path,
    log_dir: Path,
    style_file: Path,
    plot_resolution: int,
    skip_write: bool,
) -> None:
    """Bake a single recipe.

    Log records emitted by this process while baking are written to
    ``<log_dir>/<recipe name>.log`` rather than to the root logger's existing
    handlers.

    Parameters
    ----------
    recipe: Path
        Path to the recipe YAML file. Its file name without the ``.yaml``
        suffix names the output directory and the log file.
    cycle_output_dir: Path
        Directory in which the recipe's output directory is placed.
    log_dir: Path
        Directory in which the recipe's log file is written.
    style_file: Path
        Passed through to ``execute_recipe``.
    plot_resolution: int
        Passed through to ``execute_recipe``.
    skip_write: bool
        Passed through to ``execute_recipe``.

    Raises
    ------
    RecipeError
        If parsing or executing the recipe raises any exception. The original
        exception is logged with its traceback and chained as the cause.
    """
    recipe_name = recipe.name.removesuffix(".yaml")

    # Put together path to output dir from nice name for recipe.
    output_dir = cycle_output_dir / recipe_name

    # Log this recipe's logs to a separate file.
    log_file = log_dir / f"{recipe_name}.log"
    root_logger = logging.getLogger()
    with ProcessLoggingContext(logger=root_logger, log_file=log_file):
        # Log the equivalent bake command for easy rerunning.
        # The optional flag is built outside the f-string, as backslashes
        # inside a replacement field are a syntax error before python 3.12.
        skip_write_flag = " \\\n --skip-write" if skip_write else ""
        recipe_summary = (
            f"Baking recipe {recipe_name}\n"
            "Equivalent bake command:\n"
            "cset -vv bake \\\n"
            f" --recipe='{recipe}' \\\n"
            f" --output-dir='{output_dir}' \\\n"
            f" --style-file='{style_file}' \\\n"
            f" --plot-resolution={plot_resolution}"
            f"{skip_write_flag}"
        )
        logger.info(recipe_summary)
        # Monotonic clock, so the duration is unaffected by system clock
        # adjustments made while baking.
        start_time = time.monotonic()

        # Bake recipe.
        try:
            parsed_recipe = parse_recipe(recipe)
            execute_recipe(
                recipe=parsed_recipe,
                output_directory=output_dir,
                style_file=style_file,
                plot_resolution=plot_resolution,
                skip_write=skip_write,
            )
        except Exception as err:
            # logger.exception already attaches the active traceback.
            logger.exception(
                "An unhandled exception occurred:\n%s", err, stack_info=True
            )
            raise RecipeError(
                f"Recipe {recipe_name} failed to bake. See {log_file}"
            ) from err
        duration = time.monotonic() - start_time
        logger.info("Recipe baked in %s.", format_duration(duration))


def traybake() -> None:
    """Bake many recipes.

    All ``*.yaml`` files directly inside the cycle's recipe directory are baked
    in parallel worker processes, one recipe per call to ``bake_recipe``.

    Configuration comes from the environment. ``CYLC_TASK_SUBMIT_NUMBER``,
    ``CYLC_TASK_LOG_DIR``, ``CYLC_TASK_CYCLE_POINT`` and
    ``CYLC_WORKFLOW_SHARE_DIR`` are required. ``DO_CASE_AGGREGATION``,
    ``PLOT_RESOLUTION`` and ``SKIP_WRITE`` are optional.

    Raises
    ------
    KeyError
        If a required environment variable is not set.
    """
    # Force DEBUG logging on retries.
    is_retry = int(os.environ["CYLC_TASK_SUBMIT_NUMBER"]) > 1
    setup_logging(2 if is_retry else 1)

    # Load in information from environment.
    log_dir = Path(os.environ["CYLC_TASK_LOG_DIR"])
    cycle_point = os.environ["CYLC_TASK_CYCLE_POINT"]
    share_dir = os.environ["CYLC_WORKFLOW_SHARE_DIR"]
    # Any non-empty value, even "False", enables the flag.
    agg = bool(os.getenv("DO_CASE_AGGREGATION"))
    recipe_dir = Path(
        f"{share_dir}/cycle/{cycle_point}/{'aggregation_' if agg else ''}recipes"
    )
    cycle_output_dir = Path(f"{share_dir}/web/plots/{cycle_point}")

    # Baking configuration.
    # NOTE(review): the style file is always passed, whether or not it exists;
    # confirm execute_recipe copes with a missing file.
    style_file = Path(f"{share_dir}/style.json")
    # An empty PLOT_RESOLUTION is treated the same as an unset one.
    plot_resolution = int(os.getenv("PLOT_RESOLUTION") or 72)
    # Any non-empty value, even "False", enables the flag.
    skip_write = bool(os.getenv("SKIP_WRITE"))

    # Find recipes to bake. Sorted so the submission order is reproducible.
    recipes = sorted(p for p in recipe_dir.glob("*.yaml") if p.is_file())
    num_recipes = len(recipes)
    if num_recipes:
        logger.info("Baking %s recipes...", num_recipes)
    else:
        logger.warning("No recipes to bake in %s", recipe_dir)
        return

    # Fill in constant arguments. (All but recipe.)
    partial_bake = functools.partial(
        bake_recipe,
        cycle_output_dir=cycle_output_dir,
        log_dir=log_dir,
        style_file=style_file,
        plot_resolution=plot_resolution,
        skip_write=skip_write,
    )

    # Get number of usable CPUs.
    try:
        max_parallelism = len(os.sched_getaffinity(0))
    except AttributeError:
        # os.sched_getaffinity is not available on every platform.
        max_parallelism = os.cpu_count() or 1

    number_length = len(str(num_recipes))  # For formatting.
    num_completed = 0
    num_failed = 0

    # Bake the recipes in parallel.
    with ProcessPoolExecutor(max_workers=max_parallelism) as pool:
        futures = [pool.submit(partial_bake, recipe) for recipe in recipes]
        for future in as_completed(futures):
            num_completed += 1
            try:
                future.result()
            except RecipeError as err:
                num_failed += 1
                logger.error(err)
            else:
                # Right-align the running count to the width of the total.
                logger.info("%*d/%d", number_length, num_completed, num_recipes)
    logger.info("Baking complete!")
    if num_failed:
        logger.warning("%s/%s recipes failed to bake.", num_failed, num_recipes)
        # NOTE(review): the function still returns normally here, so the
        # script presumably exits with status 0 even when recipes failed.
        # Confirm that is intended, given DEBUG logging is enabled for retries.


if __name__ == "__main__":
    traybake()
a/src/CSET/cset_workflow/app/finish_website/bin/finish_website.py +++ b/src/CSET/cset_workflow/app/finish_website/bin/finish_website.py @@ -25,6 +25,7 @@ import logging import os import shutil +import sys import time from importlib.metadata import version from pathlib import Path @@ -32,7 +33,9 @@ from CSET._common import combine_dicts, sort_dict logging.basicConfig( - level=os.getenv("LOGLEVEL", "INFO"), format="%(asctime)s %(levelname)s %(message)s" + level=os.getenv("LOGLEVEL", "INFO"), + format="%(asctime)s %(levelname)s %(message)s", + stream=sys.stdout, ) logger = logging.getLogger(__name__) diff --git a/src/CSET/cset_workflow/flow.cylc b/src/CSET/cset_workflow/flow.cylc index bc2febf58..b86eac738 100644 --- a/src/CSET/cset_workflow/flow.cylc +++ b/src/CSET/cset_workflow/flow.cylc @@ -191,12 +191,10 @@ final cycle point = {{CSET_TRIAL_END_DATE}} [[bake_recipes]] # Bake the parbaked recipes for this cycle. - script = "$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/bin/baker.sh" execution time limit = PT3H [[bake_aggregation_recipes]] # Bake the parbaked aggregation recipes. - script = "$CYLC_WORKFLOW_RUN_DIR/app/bake_recipes/bin/baker.sh" execution time limit = PT3H [[[environment]]] DO_CASE_AGGREGATION = True diff --git a/tests/test_cli.py b/tests/test_cli.py index b93e80aad..138815263 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -110,30 +110,6 @@ def test_argument_parser_extract_workflow(tmp_path): assert args.location == tmp_path -def test_setup_logging(): - """Tests the logging setup at various verbosity levels.""" - root_logger = logging.getLogger() - # Log has a minimum of WARNING. 
def _snapshot_logging_state():
    """Record the global logging state that setup_logging may modify."""
    root_logger = logging.getLogger()
    font_logger = logging.getLogger("matplotlib.font_manager")
    return root_logger.level, list(root_logger.handlers), list(font_logger.filters)


def _restore_logging_state(state):
    """Undo changes made to global logging state since the snapshot."""
    root_level, root_handlers, font_filters = state
    root_logger = logging.getLogger()
    # Only remove what was added since the snapshot; handlers that were
    # already attached are left alone.
    for handler in list(root_logger.handlers):
        if handler not in root_handlers:
            root_logger.removeHandler(handler)
            handler.close()
    root_logger.setLevel(root_level)
    font_logger = logging.getLogger("matplotlib.font_manager")
    for log_filter in list(font_logger.filters):
        if log_filter not in font_filters:
            font_logger.removeFilter(log_filter)


def test_setup_logging():
    """Tests the logging setup at various verbosity levels."""
    root_logger = logging.getLogger()
    state = _snapshot_logging_state()
    try:
        # Log has a minimum of WARNING.
        common.setup_logging(0)
        assert root_logger.level == logging.WARNING
        # -v
        common.setup_logging(1)
        assert root_logger.level == logging.INFO
        # -vv
        common.setup_logging(2)
        assert root_logger.level == logging.DEBUG
    finally:
        # Don't leak handlers or the DEBUG level into other tests.
        _restore_logging_state(state)


def test_setup_logging_mpl_font_logs_filtered(caplog):
    """Test matplotlib log messages about fonts are filtered out."""
    state = _snapshot_logging_state()
    try:
        common.setup_logging(2)
        logger = logging.getLogger("matplotlib.font_manager")
        logger.debug("findfont: message")
        logger.debug("other message")
        assert len(caplog.records) == 1
        assert caplog.records[0].getMessage() == "other message"
    finally:
        # Don't leak handlers, filters or the DEBUG level into other tests.
        _restore_logging_state(state)


def test_format_duration():
    """Check formatting of different durations."""
    # Zero duration.
    assert common.format_duration(0) == "0.000 seconds"
    # Integer short duration.
    assert common.format_duration(1) == "1.000 seconds"
    # Float short duration.
    assert common.format_duration(9.876543) == "9.877 seconds"
    # Either side of the switch from seconds to hours, minutes and seconds.
    assert common.format_duration(59.999) == "59.999 seconds"
    assert common.format_duration(60) == "0h1m0s"
    # Integer hours duration.
    assert common.format_duration(3661) == "1h1m1s"
    # Float hours duration.
    assert common.format_duration(3661.999) == "1h1m1s"
    # Days.
    assert common.format_duration(86700) == "1 day 0h5m0s"
    # Many days.
    assert common.format_duration(8640000) == "100 days 0h0m0s"