def main() -> None:
    """Run the mean peak report against the folder that holds this script.

    Prints the library version before and after the run. If the working
    directory cannot be switched to the script folder, nothing is processed.
    """

    print_library_version()
    log_level = "INFO"  # or "DEBUG"
    here: Path = Path(__file__).absolute().parent

    # An empty tuple is passed on as None, i.e. no location restriction.
    selected_locations: tuple[str, ...] | None = LOCATIONS_TO_INCLUDE if LOCATIONS_TO_INCLUDE else None

    if change_working_directory(target_dir=here):
        run_mean_peak_report(
            script_directory=here,
            log_level=log_level,
            include_pomm=INCLUDE_POMM,
            locations_to_include=selected_locations,
        )
        print()
        print_library_version()
| None = LOCATIONS_TO_INCLUDE or None + + if not change_working_directory(target_dir=script_directory): + return + run_mean_peak_report( + script_directory=script_directory, + log_level=console_log_level, + include_pomm=INCLUDE_POMM, + locations_to_include=locations_to_include, + ) + print() + print_library_version() + + +if __name__ == "__main__": + main() + os.system("PAUSE") diff --git a/ryan_library/classes/column_definitions.py b/ryan_library/classes/column_definitions.py new file mode 100644 index 00000000..3f91b59d --- /dev/null +++ b/ryan_library/classes/column_definitions.py @@ -0,0 +1,316 @@ +"""Central registry for column descriptions used across exports.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import ClassVar +from _collections_abc import Iterable, Mapping + + +@dataclass(frozen=True, slots=True) +class ColumnDefinition: + """Describe the intent of a DataFrame column.""" + + name: str + description: str + value_type: str | None = None + + +class ColumnMetadataRegistry: + """Registry providing consistent column descriptions across exports.""" + + _BASE_DEFINITIONS: ClassVar[Mapping[str, ColumnDefinition]] + _SHEET_SPECIFIC_DEFINITIONS: ClassVar[Mapping[str, Mapping[str, ColumnDefinition]]] + + def __init__( + self, + base_definitions: Mapping[str, ColumnDefinition] | None = None, + sheet_specific: Mapping[str, Mapping[str, ColumnDefinition]] | None = None, + ) -> None: + self._base_definitions: Mapping[str, ColumnDefinition] = base_definitions or {} + self._sheet_specific: Mapping[str, Mapping[str, ColumnDefinition]] = sheet_specific or {} + + def definition_for(self, column_name: str, sheet_name: str | None = None) -> ColumnDefinition: + """Return a :class:`ColumnDefinition` for ``column_name``. + + Sheet-specific definitions override base definitions. If no definition + exists a placeholder entry is returned so that missing descriptions are + easy to spot in the exported workbook. 
+ """ + + if sheet_name and sheet_name in self._sheet_specific: + sheet_def: Mapping[str, ColumnDefinition] = self._sheet_specific[sheet_name] + if column_name in sheet_def: + return sheet_def[column_name] + + if column_name in self._base_definitions: + return self._base_definitions[column_name] + + return ColumnDefinition( + name=column_name, + description=f"TODO: add description for '{column_name}'.", + value_type=None, + ) + + def iter_definitions(self, column_names: Iterable[str], sheet_name: str | None = None) -> list[ColumnDefinition]: + """Return definitions for ``column_names`` preserving order.""" + + return [self.definition_for(column_name=col, sheet_name=sheet_name) for col in column_names] + + @classmethod + def default(cls) -> "ColumnMetadataRegistry": + """Return the default registry instance.""" + + if not hasattr(cls, "_INSTANCE"): + base_definitions: dict[str, ColumnDefinition] = { + "AbsMax": ColumnDefinition( + name="AbsMax", + description="Absolute maximum magnitude observed within the event time-series.", + value_type="float", + ), + "SignedAbsMax": ColumnDefinition( + name="SignedAbsMax", + description="Absolute maximum magnitude preserving the original sign (positive/negative).", + value_type="float", + ), + "Max": ColumnDefinition( + name="Max", + description="Maximum value in the event window.", + value_type="float", + ), + "Min": ColumnDefinition( + name="Min", + description="Minimum value in the event window.", + value_type="float", + ), + "Tmax": ColumnDefinition( + name="Tmax", + description="Time (hours) at which the maximum value occurs.", + value_type="float", + ), + "Tmin": ColumnDefinition( + name="Tmin", + description="Time (hours) at which the minimum value occurs.", + value_type="float", + ), + "Location": ColumnDefinition( + name="Location", + description="Model result location identifier from the 2d_po file.", + value_type="string", + ), + "Chan ID": ColumnDefinition( + name="Chan ID", + description="Channel identifier from 
the 1d_nwk file.", + value_type="string", + ), + "Type": ColumnDefinition( + name="Type", + description="2d_po quantity type (for example Flow, Water Level, Velocity).", + value_type="string", + ), + "aep_text": ColumnDefinition( + name="aep_text", + description="Annual exceedance probability label parsed from the run code (e.g. '01p').", + value_type="string", + ), + "aep_numeric": ColumnDefinition( + name="aep_numeric", + description="Annual exceedance probability represented as a numeric percentage e.g 1.", + value_type="float", + ), + "duration_text": ColumnDefinition( + name="duration_text", + description="Storm duration label parsed from the run code (e.g. '00030m').", + value_type="string", + ), + "duration_numeric": ColumnDefinition( + name="duration_numeric", + description="Storm duration represented as a numeric value (mins - tuflow style).", + value_type="float", + ), + "tp_text": ColumnDefinition( + name="tp_text", + description="Temporal pattern identifier parsed from the run code. e.g. TP07", + value_type="string", + ), + "tp_numeric": ColumnDefinition( + name="tp_numeric", + description="Temporal pattern identifier represented as a numeric value. e.g. 1", + value_type="int", + ), + "trim_runcode": ColumnDefinition( + name="trim_runcode", + description="Run code without the AEP, TP and Duration component. 
Used to group comparable scenarios.", + value_type="string", + ), + "internalName": ColumnDefinition( + name="internalName", + description="Full run code derived from the source file name.", + value_type="string", + ), + "file": ColumnDefinition( + name="file", + description="Name of the source CSV file that contributed the row.", + value_type="string", + ), + "path": ColumnDefinition( + name="path", + description="Absolute path to the source CSV file.", + value_type="string", + ), + "rel_path": ColumnDefinition( + name="rel_path", + description="Source CSV path relative to the working directory when processing.", + value_type="string", + ), + "directory_path": ColumnDefinition( + name="directory_path", + description="Absolute directory containing the source CSV file.", + value_type="string", + ), + "rel_directory": ColumnDefinition( + name="rel_directory", + description="Directory containing the source CSV file relative to the working directory.", + value_type="string", + ), + "R01": ColumnDefinition( + name="R01", + description="First segment of the run code.", + value_type="string", + ), + "R02": ColumnDefinition( + name="R02", + description="Second segment of the run code.", + value_type="string", + ), + "R03": ColumnDefinition( + name="R03", + description="Third segment of the run code.", + value_type="string", + ), + "R04": ColumnDefinition( + name="R04", + description="Fourth segment of the run code.", + value_type="string", + ), + "R05": ColumnDefinition( + name="R05", + description="Fifth segment of the run code.", + value_type="string", + ), + "MedianAbsMax": ColumnDefinition( + name="MedianAbsMax", + description="Absolute maxima across median of temporal patterns for the group.", + value_type="float", + ), + "median_duration": ColumnDefinition( + name="median_duration", + description="Duration associated with the MedianAbsMax.", + value_type="string", + ), + "median_TP": ColumnDefinition( + name="median_TP", + description="Temporal pattern associated 
with the MedianAbsMax.", + value_type="string", + ), + "mean_including_zeroes": ColumnDefinition( + name="mean_including_zeroes", + description="Mean of the statistic including zero values within the group.", + value_type="float", + ), + "mean_excluding_zeroes": ColumnDefinition( + name="mean_excluding_zeroes", + description="Mean of the statistic excluding zero values within the group.", + value_type="float", + ), + "mean_PeakFlow": ColumnDefinition( + name="mean_PeakFlow", + description="Peak flow corresponding to the mean storm for the group.", + value_type="float", + ), + "mean_Duration": ColumnDefinition( + name="mean_Duration", + description="Duration associated with the mean storm for the group.", + value_type="string", + ), + "mean_TP": ColumnDefinition( + name="mean_TP", + description="Temporal pattern associated with the mean storm for the group.", + value_type="string", + ), + "low": ColumnDefinition( + name="low", + description="Minimum statistic encountered across all temporal patterns in the group.", + value_type="float", + ), + "high": ColumnDefinition( + name="high", + description="Maximum statistic encountered across all temporal patterns in the group.", + value_type="float", + ), + "count": ColumnDefinition( + name="count", + description="Number of rows contributing to the median statistics for the selected duration.", + value_type="int", + ), + "count_bin": ColumnDefinition( + name="count_bin", + description="Total number of records considered across all durations for the group.", + value_type="int", + ), + "mean_storm_is_median_storm": ColumnDefinition( + name="mean_storm_is_median_storm", + description="Deprecated. Don't use. 
Indicates whether the mean storm matches the median storm selection.", + value_type="boolean", + ), + "aep_dur_bin": ColumnDefinition( + name="aep_dur_bin", + description="Count of records in the original AEP/Duration/Location/Type/run combination.", + value_type="int", + ), + "aep_bin": ColumnDefinition( + name="aep_bin", + description="Count of records in the original AEP/Location/Type/run combination.", + value_type="int", + ), + } + + sheet_specific: dict[str, dict[str, ColumnDefinition]] = { + "aep-dur-max": { + "AbsMax": ColumnDefinition( + name="AbsMax", + description="Peaks for the AEP/Duration/Location/Type/run grouping.", + value_type="float", + ), + }, + "aep-max": { + "AbsMax": ColumnDefinition( + name="AbsMax", + description="Peaks for the AEP/Location/Type/run grouping.", + value_type="float", + ), + }, + "aep-dur-med": { + "MedianAbsMax": ColumnDefinition( + name="MedianAbsMax", + description="Medians for the specific AEP/Duration/Location/Type/run grouping.", + value_type="float", + ), + }, + "aep-med-max": { + "MedianAbsMax": ColumnDefinition( + name="MedianAbsMax", + description="Medians the maximum median per AEP/Location/Type/run grouping.", + value_type="float", + ), + }, + } + + cls._INSTANCE = cls( + base_definitions=base_definitions, + sheet_specific=sheet_specific, + ) + return cls._INSTANCE + + +__all__: list[str] = ["ColumnDefinition", "ColumnMetadataRegistry"] diff --git a/ryan_library/functions/misc_functions.py b/ryan_library/functions/misc_functions.py index 674848e0..6dbfe39e 100644 --- a/ryan_library/functions/misc_functions.py +++ b/ryan_library/functions/misc_functions.py @@ -111,6 +111,7 @@ def export_dataframes( output_directory: Path | None = None, column_widths: dict[str, dict[str, float]] | None = None, auto_adjust_width: bool = True, + file_name: str | None = None, ) -> None: """Export multiple DataFrames to Excel files with optional column widths. 
Args: @@ -132,6 +133,11 @@ def export_dataframes( auto_adjust_width (bool, optional): If set to True, automatically adjusts the column widths based on the maximum length of the data in each column. Defaults to True. + file_name (str | None, optional): + Explicit workbook name to use when exporting a single entry from + ``export_dict``. When provided, the auto-generated timestamp prefix is + skipped and ``file_name`` is written exactly (``.xlsx`` appended when + missing). Raises: ValueError: If the number of DataFrames doesn't match the number of sheets. InvalidFileException: If there's an issue with writing the Excel file. @@ -150,13 +156,17 @@ def export_dataframes( """ datetime_string: str = datetime.now().strftime(format="%Y%m%d-%H%M") - for file_name, content in export_dict.items(): + if file_name is not None and len(export_dict) != 1: + raise ValueError("'file_name' can only be provided when exporting a single workbook.") + + for export_key, content in export_dict.items(): dataframes: list[pd.DataFrame] = content.get("dataframes", []) sheets: list[str] = content.get("sheets", []) if len(dataframes) != len(sheets): + file_label: str = file_name if file_name is not None else export_key raise ValueError( - f"For file '{file_name}', the number of dataframes ({len(dataframes)}) and sheets ({len(sheets)}) must match." + f"For file '{file_label}', the number of dataframes ({len(dataframes)}) and sheets ({len(sheets)}) must match." 
) if self._exceeds_excel_limits(dataframes=dataframes): @@ -174,7 +184,10 @@ def export_dataframes( continue # Determine the export path - export_filename: str = f"{datetime_string}_{file_name}.xlsx" + if file_name is not None: + export_filename = file_name if file_name.lower().endswith(".xlsx") else f"{file_name}.xlsx" + else: + export_filename = f"{datetime_string}_{export_key}.xlsx" export_path: Path = ( (output_directory / export_filename) if output_directory else Path(export_filename) # Defaults to CWD ) @@ -221,7 +234,7 @@ def export_dataframes( column_widths=column_widths[sheet], ) - logging.info(f"Finished exporting '{file_name}' to '{export_path}'") + logging.info(f"Finished exporting '{export_filename}' to '{export_path}'") except InvalidFileException as e: logging.error(f"Failed to write to '{export_path}': {e}") raise @@ -314,6 +327,7 @@ def save_to_excel( output_directory: Path | None = None, column_widths: dict[str, float] | None = None, auto_adjust_width: bool = True, + file_name: str | None = None, ) -> None: """Export a single DataFrame to an Excel file with a single sheet and optional column widths. @@ -330,7 +344,11 @@ def save_to_excel( {"Name": 20, "Age": 10} auto_adjust_width (bool, optional): If set to True, automatically adjusts the column widths based on the - maximum length of the data in each column. Defaults to True.""" + maximum length of the data in each column. Defaults to True. + file_name (str | None, optional): + Explicit file name to use for the exported workbook. 
When provided the + timestamp-based prefix is skipped and ``file_name`` is written exactly as + supplied (``.xlsx`` is appended automatically when missing).""" export_dict: dict[str, ExportContent] = {file_name_prefix: {"dataframes": [data_frame], "sheets": [sheet_name]}} # Prepare column_widths in the required format @@ -343,6 +361,7 @@ def save_to_excel( output_directory=output_directory, column_widths=prepared_column_widths, auto_adjust_width=auto_adjust_width, + file_name=file_name, ) def calculate_column_widths(self, df: pd.DataFrame) -> dict[str, float]: @@ -436,6 +455,8 @@ def save_to_excel( file_name_prefix: str = "Export", sheet_name: str = "Export", output_directory: Path | None = None, + *, + file_name: str | None = None, ) -> None: """Backwards-compatible function that delegates to ExcelExporter. Args: @@ -450,4 +471,7 @@ def save_to_excel( file_name_prefix=file_name_prefix, sheet_name=sheet_name, output_directory=output_directory, + column_widths=None, + auto_adjust_width=True, + file_name=file_name, ) diff --git a/ryan_library/scripts/pomm_max_items.py b/ryan_library/scripts/pomm_max_items.py index abe9a379..c739c6a4 100644 --- a/ryan_library/scripts/pomm_max_items.py +++ b/ryan_library/scripts/pomm_max_items.py @@ -1,6 +1,6 @@ # ryan_library/scripts/pomm_max_items.py -from collections.abc import Collection +from collections.abc import Collection, Callable from loguru import logger from pathlib import Path from datetime import datetime @@ -8,6 +8,7 @@ from ryan_library.scripts.pomm_utils import ( aggregated_from_paths, + save_peak_report_mean, save_peak_report_median, ) from ryan_library.functions.loguru_helpers import setup_logger @@ -22,13 +23,14 @@ def run_peak_report(script_directory: Path | None = None) -> None: run_median_peak_report() -def run_median_peak_report( +def _run_peak_report( script_directory: Path | None = None, log_level: str = "INFO", include_pomm: bool = True, locations_to_include: Collection[str] | None = None, + save_report: 
def run_median_peak_report(
    script_directory: Path | None = None,
    log_level: str = "INFO",
    include_pomm: bool = True,
    locations_to_include: Collection[str] | None = None,
) -> None:
    """Run the peak report workflow with the median-based workbook writer.

    All arguments are forwarded unchanged to ``_run_peak_report``, with
    ``save_peak_report_median`` supplied as the ``save_report`` callable.
    """

    shared_options = {
        "script_directory": script_directory,
        "log_level": log_level,
        "include_pomm": include_pomm,
        "locations_to_include": locations_to_include,
    }
    _run_peak_report(save_report=save_peak_report_median, **shared_options)


def run_mean_peak_report(
    script_directory: Path | None = None,
    log_level: str = "INFO",
    include_pomm: bool = True,
    locations_to_include: Collection[str] | None = None,
) -> None:
    """Run the peak report workflow with the mean-based workbook writer.

    All arguments are forwarded unchanged to ``_run_peak_report``, with
    ``save_peak_report_mean`` supplied as the ``save_report`` callable.
    """

    shared_options = {
        "script_directory": script_directory,
        "log_level": log_level,
        "include_pomm": include_pomm,
        "locations_to_include": locations_to_include,
    }
    _run_peak_report(save_report=save_peak_report_mean, **shared_options)
importlib.metadata import PackageNotFoundError, version from typing import Any import pandas as pd from loguru import logger +from ryan_library.classes.column_definitions import ColumnMetadataRegistry from ryan_library.functions.pandas.median_calc import median_calc from ryan_library.functions.file_utils import ( find_files_parallel, is_non_zero_file, ) -from ryan_library.functions.misc_functions import calculate_pool_size +from ryan_library.functions.misc_functions import ExcelExporter, calculate_pool_size from ryan_library.processors.tuflow.base_processor import BaseProcessor from ryan_library.processors.tuflow.processor_collection import ProcessorCollection from ryan_library.classes.suffixes_and_dtypes import SuffixesConfig @@ -23,7 +26,8 @@ NAType = type(pd.NA) -NAType = type(pd.NA) + +DATA_DICTIONARY_SHEET_NAME: str = "data-dictionary" def collect_files( @@ -234,26 +238,155 @@ def save_to_excel( aggregated_df: pd.DataFrame, output_path: Path, include_pomm: bool = True, + timestamp: str | None = None, + aep_dur_sheet_name: str = "aep-dur-max", + aep_sheet_name: str = "aep-max", ) -> None: """Save peak DataFrames to an Excel file.""" logger.info(f"Output path: {output_path}") - with pd.ExcelWriter(output_path) as writer: - aep_dur_max.to_excel( - excel_writer=writer, - sheet_name="aep-dur-max", - index=False, - merge_cells=False, + registry: ColumnMetadataRegistry = ColumnMetadataRegistry.default() + metadata_rows: Mapping[str, str] = _build_metadata_rows( + timestamp=timestamp, + include_pomm=include_pomm, + aep_dur_max=aep_dur_max, + aep_max=aep_max, + aggregated_df=aggregated_df, + aep_dur_sheet_name=aep_dur_sheet_name, + aep_sheet_name=aep_sheet_name, + ) + + sheet_frames: dict[str, pd.DataFrame] = { + aep_dur_sheet_name: aep_dur_max, + aep_sheet_name: aep_max, + } + sheet_order: list[str] = [aep_dur_sheet_name, aep_sheet_name] + sheet_dfs: list[pd.DataFrame] = [aep_dur_max, aep_max] + + if include_pomm: + sheet_frames["POMM"] = aggregated_df + 
sheet_order.append("POMM") + sheet_dfs.append(aggregated_df) + + data_dictionary_df: pd.DataFrame = _build_data_dictionary( + registry=registry, + sheet_frames=sheet_frames, + metadata_rows=metadata_rows, + ) + + sheet_order.append(DATA_DICTIONARY_SHEET_NAME) + sheet_dfs.append(data_dictionary_df) + + ExcelExporter().export_dataframes( + export_dict={ + output_path.stem: { + "dataframes": sheet_dfs, + "sheets": sheet_order, + } + }, + output_directory=output_path.parent, + file_name=output_path.name, + ) + + logger.info(f"Peak data exported to {output_path}") + + +def _build_metadata_rows( + timestamp: str | None, + include_pomm: bool, + aep_dur_max: pd.DataFrame, + aep_max: pd.DataFrame, + aggregated_df: pd.DataFrame, + aep_dur_sheet_name: str, + aep_sheet_name: str, +) -> Mapping[str, str]: + """Return ordered metadata rows for the data dictionary sheet.""" + + generated_at: str = datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds") + metadata: dict[str, str] = { + "Generated at": generated_at, + "Filename timestamp": timestamp if timestamp else "not supplied", + "Generator module": __name__, + "ryan_functions version": _resolve_package_version("ryan_functions"), + "Include POMM sheet": "Yes" if include_pomm else "No", + f"{aep_dur_sheet_name} rows": str(len(aep_dur_max)), + f"{aep_sheet_name} rows": str(len(aep_max)), + } + + if include_pomm: + metadata["POMM rows"] = str(len(aggregated_df)) + + if "directory_path" in aggregated_df.columns: + try: + directories_series = aggregated_df["directory_path"].dropna() + except AttributeError: + directories_series = pd.Series(dtype="string") + unique_directories = sorted({str(Path(dir_value)) for dir_value in directories_series.unique()}) + if unique_directories: + metadata["Source directories"] = "\n".join(unique_directories) + + return metadata + + +def _resolve_package_version(package_name: str) -> str: + """Return the installed version for ``package_name`` if available.""" + + try: + return 
version(package_name) + except PackageNotFoundError: + return "unknown" + + +def _build_data_dictionary( + registry: ColumnMetadataRegistry, + sheet_frames: Mapping[str, pd.DataFrame], + metadata_rows: Mapping[str, str], +) -> pd.DataFrame: + """Build the DataFrame backing the data dictionary worksheet.""" + + rows: list[dict[str, str]] = [] + for key, value in metadata_rows.items(): + rows.append( + { + "sheet": "metadata", + "column": key, + "description": value, + "value_type": "metadata", + "pandas_dtype": "", + } ) - aep_max.to_excel(excel_writer=writer, sheet_name="aep-max", index=False, merge_cells=False) - if include_pomm: - aggregated_df.to_excel( - excel_writer=writer, - sheet_name="POMM", - index=False, - merge_cells=False, + + for sheet_name, frame in sheet_frames.items(): + columns: list[str] = list(frame.columns) + if not columns: + rows.append( + { + "sheet": sheet_name, + "column": "", + "description": "Sheet exported without any columns. Review upstream processing.", + "value_type": "", + "pandas_dtype": "", + } ) + continue - logger.info(f"Peak data exported to {output_path}") + dtype_map: dict[str, str] = {column: str(dtype) for column, dtype in frame.dtypes.items()} + definitions = registry.iter_definitions(columns, sheet_name=sheet_name) + + for column_name, definition in zip(columns, definitions): + rows.append( + { + "sheet": sheet_name, + "column": column_name, + "description": definition.description, + "value_type": definition.value_type or "", + "pandas_dtype": dtype_map.get(column_name, ""), + } + ) + + return pd.DataFrame( + rows, + columns=["sheet", "column", "description", "value_type", "pandas_dtype"], + ) def save_peak_report( @@ -276,6 +409,7 @@ def save_peak_report( aggregated_df=aggregated_df, output_path=output_path, include_pomm=include_pomm, + timestamp=timestamp, ) logger.info(f"Completed peak report export to {output_path}") logger.info(f"Completed peak report export to {output_path}") @@ -402,6 +536,33 @@ def 
def _order_mean_report_columns(frame: pd.DataFrame, info_columns: list[str]) -> pd.DataFrame:
    """Return ``frame`` with identifier and mean columns first and ``info_columns`` last.

    Columns named in neither list keep their relative order in the middle;
    names absent from ``frame`` are ignored.
    """

    leading_candidates: list[str] = [
        "aep_text",
        "duration_text",
        "Location",
        "Type",
        "trim_runcode",
        "mean_including_zeroes",
        "mean_excluding_zeroes",
        "mean_PeakFlow",
        "mean_Duration",
        "mean_TP",
    ]
    front: list[str] = [name for name in leading_candidates if name in frame.columns]
    middle: list[str] = [name for name in frame.columns if name not in front and name not in info_columns]
    tail: list[str] = [name for name in info_columns if name in frame.columns]
    return frame[front + middle + tail]


def find_aep_dur_mean(aggregated_df: pd.DataFrame) -> pd.DataFrame:
    """Return mean stats for each AEP/Duration/Location/Type/RunCode group.

    The statistics come from ``find_aep_dur_median``; this function only
    reorders the columns so the mean fields follow the identifiers. An empty
    result is returned untouched.
    """

    stats: pd.DataFrame = find_aep_dur_median(aggregated_df=aggregated_df)
    if stats.empty:
        return stats

    return _order_mean_report_columns(
        frame=stats,
        info_columns=["low", "high", "count", "count_bin", "mean_storm_is_median_storm"],
    )


def find_aep_mean_max(aep_dur_mean: pd.DataFrame) -> pd.DataFrame:
    """Return rows representing the maximum mean for each AEP/Location/Type/RunCode group.

    ``mean_PeakFlow`` is coerced to numeric and the row with the largest value
    per group is kept. A ``mean_bin`` column holds the number of numeric values
    in each group. An empty DataFrame is returned when ``mean_PeakFlow`` is
    missing, when no value is numeric, or when a grouping column is absent.
    """

    group_cols: list[str] = ["aep_text", "Location", "Type", "trim_runcode"]
    helper_col = "_mean_peakflow_numeric"
    try:
        working: pd.DataFrame = aep_dur_mean.copy()
        if "mean_PeakFlow" not in working.columns:
            logger.error("'mean_PeakFlow' column not present for mean analysis. Returning empty DataFrame.")
            return pd.DataFrame()

        working[helper_col] = pd.to_numeric(working["mean_PeakFlow"], errors="coerce")
        working["mean_bin"] = working.groupby(group_cols, observed=True)[helper_col].transform("count")

        # Only rows with a usable numeric peak can win their group.
        candidates: pd.DataFrame = working[working[helper_col].notna()]
        if candidates.empty:
            logger.warning("No valid mean peak flow values found. Returning empty DataFrame.")
            return pd.DataFrame()

        winners = candidates.groupby(group_cols, observed=True)[helper_col].idxmax()
        aep_mean_max: pd.DataFrame = working.loc[winners].drop(columns=[helper_col]).reset_index(drop=True)

        if not aep_mean_max.empty:
            aep_mean_max = _order_mean_report_columns(
                frame=aep_mean_max,
                info_columns=[
                    "low",
                    "high",
                    "count",
                    "count_bin",
                    "mean_storm_is_median_storm",
                    "mean_bin",
                ],
            )

        logger.info("Created 'aep_mean_max' DataFrame with maximum mean records for each AEP group.")
    except KeyError as e:
        logger.error(f"Missing expected columns for 'aep_mean_max' grouping: {e}")
        aep_mean_max = pd.DataFrame()
    return aep_mean_max
def _remove_columns_containing(df: pd.DataFrame, substrings: tuple[str, ...]) -> pd.DataFrame:
    """Return a copy of ``df`` without columns whose label contains any of ``substrings``.

    Matching is case-insensitive and uses the string form of each label, so
    non-string labels are handled. Frames with zero rows are filtered too, so
    header-only sheets do not keep the unwanted columns.

    Args:
        df: Source frame; it is never modified.
        substrings: Fragments that mark a column for removal.

    Returns:
        A new DataFrame holding only the columns that matched no fragment.
    """

    needles: tuple[str, ...] = tuple(fragment.lower() for fragment in substrings)
    keep_mask: list[bool] = [
        not any(needle in str(column).lower() for needle in needles) for column in df.columns
    ]
    return df.loc[:, keep_mask].copy()


def _median_only_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with every column whose label contains ``mean`` removed."""

    return _remove_columns_containing(df=df, substrings=("mean",))


def _mean_only_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with every column whose label contains ``median`` removed."""

    return _remove_columns_containing(df=df, substrings=("median",))
def save_peak_report_mean(
    aggregated_df: pd.DataFrame,
    script_directory: Path,
    timestamp: str,
    suffix: str = "_mean_peaks.xlsx",
    include_pomm: bool = True,
) -> None:
    """Save mean-based peak data tables to an Excel file.

    The workbook is written to ``script_directory`` as ``{timestamp}{suffix}``
    with the sheets ``aep-dur-mean`` and ``aep-mean-max``. Both tables are
    passed through ``_mean_only_columns`` before export.
    """

    per_duration: pd.DataFrame = find_aep_dur_mean(aggregated_df=aggregated_df)
    per_aep: pd.DataFrame = find_aep_mean_max(aep_dur_mean=per_duration)
    output_path: Path = script_directory / f"{timestamp}{suffix}"
    logger.info(f"Starting export of mean peak report to {output_path}")
    save_to_excel(
        aep_dur_max=_mean_only_columns(df=per_duration),
        aep_max=_mean_only_columns(df=per_aep),
        aggregated_df=aggregated_df,
        output_path=output_path,
        include_pomm=include_pomm,
        timestamp=timestamp,
        aep_dur_sheet_name="aep-dur-mean",
        aep_sheet_name="aep-mean-max",
    )
    logger.info(f"Completed mean peak report export to {output_path}")
{"aep-dur-max", "aep-max"}.issubset(set(xl.sheet_names)) for f in excel_files: f.unlink() + + +def test_run_mean_peak_report_creates_excel_with_mean_only_columns() -> None: + src_dir: Path = DATA_DIR / "Module_01" / "results" + run_mean_peak_report(script_directory=src_dir, log_level="INFO") + excel_files: list[Path] = list(src_dir.glob("*_mean_peaks.xlsx")) + assert excel_files + xl = pd.ExcelFile(path_or_buffer=excel_files[0]) + assert {"aep-dur-mean", "aep-mean-max"}.issubset(set(xl.sheet_names)) + aep_dur_df = xl.parse(sheet_name="aep-dur-mean") + assert all("median" not in col.lower() for col in aep_dur_df.columns) + if not aep_dur_df.empty: + assert "mean_PeakFlow" in aep_dur_df.columns + aep_max_df = xl.parse(sheet_name="aep-mean-max") + assert all("median" not in col.lower() for col in aep_max_df.columns) + if not aep_max_df.empty: + assert "mean_PeakFlow" in aep_max_df.columns + for f in excel_files: + f.unlink()