diff --git a/.gitmodules b/.gitmodules index 70954703..c8018ada 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "unsorted"] path = unsorted url = https://github.com/Chain-Frost/ryan-tools-unsorted.git +[submodule "vendor/run_hy8"] + path = vendor/run_hy8 + url = https://github.com/Chain-Frost/run-hy8.git diff --git a/dist/ryan_functions-25.11.16.1-py3-none-any.whl b/dist/ryan_functions-25.11.16.1-py3-none-any.whl new file mode 100644 index 00000000..97214e88 Binary files /dev/null and b/dist/ryan_functions-25.11.16.1-py3-none-any.whl differ diff --git a/dist/ryan_functions-25.11.7.4-py3-none-any.whl b/dist/ryan_functions-25.11.7.4-py3-none-any.whl deleted file mode 100644 index c51cfd83..00000000 Binary files a/dist/ryan_functions-25.11.7.4-py3-none-any.whl and /dev/null differ diff --git a/repo-scripts/update_run_hy8.py b/repo-scripts/update_run_hy8.py new file mode 100644 index 00000000..37e1ef7f --- /dev/null +++ b/repo-scripts/update_run_hy8.py @@ -0,0 +1,121 @@ +""" +Utilities for synchronizing the run-hy8 vendored submodule. + +Running this script will pull the latest upstream commit for vendor/run_hy8 and +refresh vendor/run_hy8.UPSTREAM with the new commit hash and retrieval date. +""" + +from __future__ import annotations + +import argparse +import subprocess +from datetime import date +from pathlib import Path + +REPO_URL = "https://github.com/Chain-Frost/run-hy8.git" +SUBMODULE_PATH: Path = Path("vendor") / "run_hy8" +UPSTREAM_FILE: Path = Path("vendor") / "run_hy8.UPSTREAM" +INSTRUCTIONS = ( + "Update instructions:\n" + "1. From repository root run `git submodule update --remote vendor/run_hy8`.\n" + "2. Verify the new commit hash matches expectations and update this file.\n" + "3. Commit the submodule pointer change alongside any dependent code changes.\n" +) + + +def parse_args() -> argparse.Namespace: + """Collect CLI arguments so the script can be automated or customized.""" + parser = argparse.ArgumentParser( + description="Update the run-hy8 submodule and refresh metadata.", allow_abbrev=False + ) + parser.add_argument( + "--root", + type=Path, + default=None, + help="Repository root (defaults to auto-detection based on this script).", + ) + parser.add_argument( + "--skip-update", + action="store_true", + help="Only rewrite metadata using the currently checked-out submodule commit.", + ) + return parser.parse_args() + + +def find_repo_root(preferred: Path | None) -> Path: + """Walk upward from the guessed location until we find a `.git` directory.""" + if preferred is not None: + root_candidate: Path = preferred.resolve() + else: + root_candidate = Path(__file__).resolve().parent + + for path in (root_candidate, *root_candidate.parents): + if (path / ".git").exists(): + return path + raise RuntimeError("Unable to locate repository root (missing .git directory).") + + +def capture(args: list[str], cwd: Path, *, check: bool = True) -> str: + """Run a subprocess and return stdout, optionally surfacing non-zero exits.""" + result: subprocess.CompletedProcess[str] = subprocess.run(args=args, cwd=cwd, capture_output=True, text=True) + if check and result.returncode != 0: + raise RuntimeError( + f"Command {' '.join(args)} failed with exit code {result.returncode}:\n{result.stderr.strip()}" + ) + return result.stdout.strip() + + +def update_submodule(root: Path) -> None: + """Fetch the latest upstream commit for the vendor submodule.""" + capture( + args=["git", "submodule", "update", "--init", "--remote", str(object=SUBMODULE_PATH).replace("\\", "/")], + cwd=root, + ) + + +def current_commit(submodule_dir: Path) -> tuple[str, str]: + """Read the detached commit hash plus a friendly ref (branch or 'detached').""" + commit_hash: str = capture(args=["git", "rev-parse", "HEAD"], cwd=submodule_dir) + try: + branch: str = capture(args=["git", "symbolic-ref", "--quiet", "--short", "HEAD"], cwd=submodule_dir) + except RuntimeError: + branch = "detached" + return commit_hash, branch + + +def write_metadata(root: Path, commit_hash: str, branch: str) -> Path: + """Rewrite vendor/run_hy8.UPSTREAM with the new commit and date.""" + content: str = ( + f"Repository: {REPO_URL}\n" + f"Commit: {commit_hash} ({branch})\n" + f"Retrieved: {date.today().isoformat()}\n\n" + f"{INSTRUCTIONS}" + ) + target: Path = root / UPSTREAM_FILE + target.write_text(data=content, encoding="utf-8") + return target + + +def main() -> int: + """High-level orchestration: update submodule (optional) and metadata.""" + args: argparse.Namespace = parse_args() + root: Path = find_repo_root(preferred=args.root) + submodule_dir: Path = root / SUBMODULE_PATH + if not submodule_dir.exists(): + raise FileNotFoundError(f"Submodule directory {submodule_dir} does not exist. Has it been initialized?") + + if not args.skip_update: + print("Updating vendor/run_hy8 from upstream ...") + update_submodule(root) + + commit_hash, branch = current_commit(submodule_dir=submodule_dir) + metadata_path: Path = write_metadata(root=root, commit_hash=commit_hash, branch=branch) + + print(f"Vendor submodule now points at {commit_hash} ({branch}).") + print(f"Wrote metadata to {metadata_path.relative_to(root)}.") + print("Remember to commit the submodule pointer and metadata file together.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/ryan-scripts/TUFLOW-python/POMM-mean-max-aep-dur.py b/ryan-scripts/TUFLOW-python/POMM-mean-max-aep-dur.py index 69efaa1a..749eab16 100644 --- a/ryan-scripts/TUFLOW-python/POMM-mean-max-aep-dur.py +++ b/ryan-scripts/TUFLOW-python/POMM-mean-max-aep-dur.py @@ -3,7 +3,7 @@ from pathlib import Path import os -from ryan_library.scripts.pomm_max_items import run_mean_peak_report +from ryan_library.scripts.pomm_max_items import export_mean_peak_report from ryan_library.scripts.wrapper_utils import ( change_working_directory, print_library_version, @@ -28,7 +28,7 @@ def main() -> None: if not change_working_directory(target_dir=script_directory): return - run_mean_peak_report( + export_mean_peak_report( script_directory=script_directory, log_level=console_log_level, include_pomm=INCLUDE_POMM, diff --git a/ryan-scripts/TUFLOW-python/POMM-med-max-aep-dur.py b/ryan-scripts/TUFLOW-python/POMM-med-max-aep-dur.py index 8154bf71..3b4e300d 100644 --- a/ryan-scripts/TUFLOW-python/POMM-med-max-aep-dur.py +++ b/ryan-scripts/TUFLOW-python/POMM-med-max-aep-dur.py @@ -3,7 +3,7 @@ from pathlib import Path import os -from ryan_library.scripts.pomm_max_items import run_median_peak_report +from ryan_library.scripts.pomm_max_items import export_median_peak_report from ryan_library.scripts.wrapper_utils import ( change_working_directory, print_library_version, @@ -32,7 +32,7 @@ def main() -> None: if not change_working_directory(target_dir=script_directory): return - run_median_peak_report( + export_median_peak_report( script_directory=script_directory, log_level=console_log_level, include_pomm=INCLUDE_POMM, diff --git a/ryan-scripts/TUFLOW-python/POMM_combine.py b/ryan-scripts/TUFLOW-python/POMM_combine.py index b7441c24..cb15835e 100644 --- a/ryan-scripts/TUFLOW-python/POMM_combine.py +++ b/ryan-scripts/TUFLOW-python/POMM_combine.py @@ -1,6 +1,7 @@ # ryan-scripts\TUFLOW-python\POMM_combine.py import os from pathlib import Path +from typing import Literal from ryan_library.scripts.tuflow.pomm_combine import main_processing from ryan_library.scripts.wrapper_utils import ( change_working_directory, @@ -12,6 +13,8 @@ # Update this tuple to restrict processing to specific PO/Location values. # Leave empty to include every location found in the POMM files. LOCATIONS_TO_INCLUDE: tuple[str, ...] = () +# Choose output format: "excel", "parquet", or "both". +EXPORT_MODE: Literal["excel", "parquet", "both"] = "excel" def main() -> None: @@ -35,6 +38,7 @@ def main() -> None: include_data_types=["POMM"], console_log_level=console_log_level, locations_to_include=locations_to_include, + export_mode=EXPORT_MODE, ) print() print_library_version() diff --git a/ryan-scripts/TUFLOW-python/TUFLOW_Culvert_Maximums.py b/ryan-scripts/TUFLOW-python/TUFLOW_Culvert_Maximums.py index a7f95317..737d7f90 100644 --- a/ryan-scripts/TUFLOW-python/TUFLOW_Culvert_Maximums.py +++ b/ryan-scripts/TUFLOW-python/TUFLOW_Culvert_Maximums.py @@ -16,15 +16,14 @@ def main() -> None: print_library_version() console_log_level = "DEBUG" # or "INFO" script_dir: Path = Path(__file__).absolute().parent - # script_dir = Path( - # r"E:\Library\Automation\ryan-tools\tests\test_data\tuflow\tutorials\Module_03" - # ) + script_dir = Path(r"E:\Library\Automation\ryan-tools\tests\test_data\tuflow\tutorials\Module_11") if not change_working_directory(target_dir=script_dir): return main_processing( paths_to_process=[script_dir], - include_data_types=["Nmx", "Cmx", "Chan", "ccA"], + # include_data_types=["Nmx", "Cmx", "Chan", "ccA", "RLL_Qmx"], + include_data_types=["RLL_Qmx"], console_log_level=console_log_level, output_parquet=False, ) diff --git a/ryan-scripts/TUFLOW-python/generate_2d_po_label_list.py b/ryan-scripts/TUFLOW-python/generate_2d_po_label_list.py new file mode 100644 index 00000000..f127da00 --- /dev/null +++ b/ryan-scripts/TUFLOW-python/generate_2d_po_label_list.py @@ -0,0 +1,213 @@ +# ryan-scripts\TUFLOW-python\generate_2d_po_label_list.py +"""Utility to convert a 2d_po GeoPackage into a Python list of PO labels. + +Example: + python generate_2d_po_label_list.py path/to/2d_po_file.gpkg +""" + + +import argparse +import sqlite3 +from dataclasses import dataclass +from collections.abc import Iterable, Sequence +from pathlib import Path +from typing import Any + +# ------------------------------------------------------------------------------ +# Configuration section +# ------------------------------------------------------------------------------ +# If you frequently run this script against the same GeoPackage you can hardcode +# the file path below (either as a Path instance or a raw string). Leave it as +# None to rely exclusively on argparse. When both a hardcoded path and a CLI +# argument are supplied, the CLI argument wins. +HARDCODED_GPKG_PATH: Path | str | None = r"E:\Library\Automation\ryan-tools\2d_po_PilaruWest_01_L.gpkg" + + +@dataclass(slots=True) +class RuntimeOptions: + """Container for the fully-resolved runtime options.""" + + gpkg_path: Path + layer: str | None + + +def parse_and_resolve_options(argv: Sequence[str] | None = None) -> RuntimeOptions: + """Parse CLI arguments (if provided) and merge them with the hardcoded defaults.""" + + parser = argparse.ArgumentParser( + description=( + "Read a 2d_po GeoPackage and emit a Python list of PO labels, " "including inline comments when provided." + ) + ) + parser.add_argument( + "gpkg_path", + nargs="?", + type=Path, + help="Path to the 2d_po GeoPackage (e.g., 2d_po_Example_L.gpkg).", + ) + parser.add_argument( + "--layer", + help=( + "Optional layer/table name inside the GeoPackage. " + "If omitted the script auto-detects the first 2d_po layer." + ), + ) + args: argparse.Namespace = parser.parse_args(argv) + + gpkg_candidate: Path | str | None + if args.gpkg_path is not None: + gpkg_candidate = args.gpkg_path + else: + gpkg_candidate = HARDCODED_GPKG_PATH + + if gpkg_candidate is None: + parser.error("No GeoPackage supplied. Provide a path argument or set HARDCODED_GPKG_PATH.") + + gpkg_path: Path = Path(gpkg_candidate).expanduser().resolve() + return RuntimeOptions(gpkg_path=gpkg_path, layer=args.layer) + + +def main() -> None: + """Entry point used by both CLI execution and IDE run configurations.""" + + options: RuntimeOptions = parse_and_resolve_options() + if not options.gpkg_path.exists(): + raise SystemExit(f"GeoPackage not found: {options.gpkg_path}") + + try: + entries: list[tuple[str, str | None]] = load_label_entries( + gpkg_path=options.gpkg_path, preferred_layer=options.layer + ) + except ValueError as exc: + raise SystemExit(str(exc)) from exc + + output: str = format_as_python_list(entries) + output_path: Path = options.gpkg_path.with_suffix(".txt") + write_output_file(output_path=output_path, contents=output) + print(output) + print(f"\nList written to {output_path}") + + +def load_label_entries(gpkg_path: Path, preferred_layer: str | None = None) -> list[tuple[str, str | None]]: + """Open the GeoPackage, determine the relevant layer, and read label/comment pairs.""" + + # Connect directly to the GeoPackage; it's just a SQLite database under the hood. + with sqlite3.connect(database=gpkg_path.as_posix()) as connection: + cursor: sqlite3.Cursor = connection.cursor() + table_name, column_lookup = resolve_table_and_columns(cursor=cursor, preferred_layer=preferred_layer) + + label_column: str | None = column_lookup.get("label") + if label_column is None: + raise ValueError(f'Table "{table_name}" does not contain a "Label" column.') + comment_column: str | None = column_lookup.get("comment") + order_column: str = column_lookup.get("fid", label_column) + + quoted_table: str = quote_identifier(table_name) + quoted_label: str = quote_identifier(label_column) + quoted_comment: str = quote_identifier(comment_column) if comment_column is not None else "NULL" + quoted_order: str = quote_identifier(order_column) + + # Build a read-only query that orders the labels by fid (or label fallback). + sql: str = ( + f"SELECT {quoted_label} AS label_value, " + f"{quoted_comment} AS comment_value " + f"FROM {quoted_table} ORDER BY {quoted_order};" + ) + rows: list[Any] = cursor.execute(sql).fetchall() + + entries: list[tuple[str, str | None]] = [] + for label_value, comment_value in rows: + if label_value is None: + continue + + # Clean up whitespace so the output list reads nicely. + label_text: str = str(label_value).strip() + if not label_text: + continue + + comment_text: str | None + if comment_value is None: + comment_text = None + else: + comment_text = sanitize_comment(str(comment_value)) + + entries.append((label_text, comment_text)) + + if not entries: + raise ValueError(f'No label values found in table "{table_name}".') + return entries + + +def resolve_table_and_columns(cursor: sqlite3.Cursor, preferred_layer: str | None) -> tuple[str, dict[str, str]]: + """Pick a layer that exposes a Label column and collect its schema metadata.""" + + tables_to_check: list[str] = build_table_priority_list(cursor=cursor, preferred_layer=preferred_layer) + for table in tables_to_check: + column_lookup: dict[str, str] = get_column_lookup(cursor=cursor, table_name=table) + if column_lookup.get("label"): + return table, column_lookup + raise ValueError("Could not locate a layer with a 'Label' column.") + + +def build_table_priority_list(cursor: sqlite3.Cursor, preferred_layer: str | None) -> list[str]: + """Return candidate layers, prioritizing those that look like 2d_po layers.""" + + layer_rows: list[Any] = cursor.execute( + "SELECT table_name FROM gpkg_contents WHERE data_type = 'features';" + ).fetchall() + discovered_layers: list[Any] = [row[0] for row in layer_rows] + + if preferred_layer: + return [preferred_layer] + + def sort_key(name: str) -> tuple[int, str]: + lowered: str = name.lower() + return (0 if lowered.startswith("2d_po") else 1, lowered) + + return sorted(discovered_layers, key=sort_key) + + +def get_column_lookup(cursor: sqlite3.Cursor, table_name: str) -> dict[str, str]: + """Fetch column names for the table and normalize them for case-insensitive use.""" + + quoted_table: str = quote_identifier(table_name) + pragma_rows: list[Any] = cursor.execute(f"PRAGMA table_info({quoted_table});").fetchall() + return {row[1].lower(): row[1] for row in pragma_rows} + + +def sanitize_comment(comment: str) -> str | None: + """Normalize whitespace so comments become single-line Python-friendly strings.""" + + normalized: str = comment.replace("\r\n", "\n").replace("\r", " ").replace("\n", " ").strip() + return normalized or None + + +def format_as_python_list(entries: Iterable[tuple[str, str | None]]) -> str: + """Format the label/comment pairs as a copy/paste-friendly Python list literal.""" + + lines: list[str] = ["["] + for label, comment in entries: + # Represent labels with repr() so any embedded quotes are automatically escaped. + line: str = f" {repr(label)}," + if comment: + line += f" # {comment}" + lines.append(line) + lines.append("]") + return "\n".join(lines) + + +def quote_identifier(identifier: str) -> str: + """Quote SQLite identifiers to avoid syntax errors or SQL injection surprises.""" + + escaped = identifier.replace('"', '""') + return f'"{escaped}"' + + +def write_output_file(*, output_path: Path, contents: str) -> None: + """Persist the rendered list next to the source GeoPackage for reuse.""" + + output_path.write_text(f"{contents}\n", encoding="utf-8") + + +if __name__ == "__main__": + main() diff --git a/ryan_library/classes/column_definitions.py b/ryan_library/classes/column_definitions.py index 3f91b59d..853221e1 100644 --- a/ryan_library/classes/column_definitions.py +++ b/ryan_library/classes/column_definitions.py @@ -68,6 +68,11 @@ def default(cls) -> "ColumnMetadataRegistry": description="Absolute maximum magnitude observed within the event time-series.", value_type="float", ), + "AbsValue": ColumnDefinition( + name="AbsValue", + description="Absolute value of the 2d_po timeseries result (ignoring sign).", + value_type="float", + ), "SignedAbsMax": ColumnDefinition( name="SignedAbsMax", description="Absolute maximum magnitude preserving the original sign (positive/negative).", @@ -83,6 +88,16 @@ def default(cls) -> "ColumnMetadataRegistry": description="Minimum value in the event window.", value_type="float", ), + "Area_Culv": ColumnDefinition( + name="Area_Culv", + description="Full cross-sectional area of the culvert barrel reported by ccA outputs.", + value_type="float", + ), + "Area_Max": ColumnDefinition( + name="Area_Max", + description="Maximum wetted area recorded for the culvert during the event (ccA output).", + value_type="float", + ), "Tmax": ColumnDefinition( name="Tmax", description="Time (hours) at which the maximum value occurs.", @@ -103,11 +118,118 @@ def default(cls) -> "ColumnMetadataRegistry": description="Channel identifier from the 1d_nwk file.", value_type="string", ), + "ID": ColumnDefinition( + name="ID", + description="Reporting Location Line identifier from the RLL outputs.", + value_type="string", + ), + "Location ID": ColumnDefinition( + name="Location ID", + description=( + "Normalized location identifier used when grouping maximums across different source types." + ), + value_type="string", + ), + "Time": ColumnDefinition( + name="Time", + description="Simulation time (hours) corresponding to the recorded statistic.", + value_type="float", + ), + "Q": ColumnDefinition( + name="Q", + description="Discharge/flow rate reported for the location.", + value_type="float", + ), + "V": ColumnDefinition( + name="V", + description="Velocity reported for the location.", + value_type="float", + ), "Type": ColumnDefinition( name="Type", description="2d_po quantity type (for example Flow, Water Level, Velocity).", value_type="string", ), + "DS_h": ColumnDefinition( + name="DS_h", + description="Downstream water level extracted from maximum-result CSVs (usually metres AHD).", + value_type="float", + ), + "US_h": ColumnDefinition( + name="US_h", + description="Upstream water level extracted from maximum-result CSVs (usually metres AHD).", + value_type="float", + ), + "DS_H": ColumnDefinition( + name="DS_H", + description="Downstream water level taken from the 1d_H timeseries output.", + value_type="float", + ), + "US_H": ColumnDefinition( + name="US_H", + description="Upstream water level taken from the 1d_H timeseries output.", + value_type="float", + ), + "US Invert": ColumnDefinition( + name="US Invert", + description="Invert elevation at the upstream end of the culvert/channel.", + value_type="float", + ), + "DS Invert": ColumnDefinition( + name="DS Invert", + description="Invert elevation at the downstream end of the culvert/channel.", + value_type="float", + ), + "US Obvert": ColumnDefinition( + name="US Obvert", + description="Crown/obvert elevation at the upstream end of the culvert/channel.", + value_type="float", + ), + "Height": ColumnDefinition( + name="Height", + description="Barrel height or diameter used for the culvert/channel definition.", + value_type="float", + ), + "Length": ColumnDefinition( + name="Length", + description="Culvert or channel length taken from the 1d_Chan file.", + value_type="float", + ), + "n or Cd": ColumnDefinition( + name="n or Cd", + description="Manning's n roughness (for open channels) or discharge coefficient (for culverts).", + value_type="float", + ), + "pSlope": ColumnDefinition( + name="pSlope", + description="Design slope for the structure (percent).", + value_type="float", + ), + "pBlockage": ColumnDefinition( + name="pBlockage", + description="Blockage percentage applied to the culvert/barrel.", + value_type="float", + ), + "Flags": ColumnDefinition( + name="Flags", + description="Source flags describing channel or culvert configuration issues.", + value_type="string", + ), + "H": ColumnDefinition( + name="H", + description="Water level reported for the location at the time of the maximum flow event.", + value_type="float", + ), + "dQ": ColumnDefinition( + name="dQ", + description="Differential flow reported by the RLL maximum output.", + value_type="float", + ), + "Time dQ": ColumnDefinition( + name="Time dQ", + description="Time associated with the differential flow reported by the RLL output.", + value_type="float", + ), "aep_text": ColumnDefinition( name="aep_text", description="Annual exceedance probability label parsed from the run code (e.g. '01p').", @@ -198,6 +320,51 @@ def default(cls) -> "ColumnMetadataRegistry": description="Fifth segment of the run code.", value_type="string", ), + "Value": ColumnDefinition( + name="Value", + description="Raw 2d_po metric captured at the reporting location.", + value_type="float", + ), + "pFull_Max": ColumnDefinition( + name="pFull_Max", + description="Maximum percent full reported by ccA.", + value_type="float", + ), + "pTime_Full": ColumnDefinition( + name="pTime_Full", + description="Time (hours) at which ccA reports the culvert as maximum percent full.", + value_type="float", + ), + "Dur_Full": ColumnDefinition( + name="Dur_Full", + description="Duration (hours) the culvert remained full according to ccA.", + value_type="float", + ), + "Dur_10pFull": ColumnDefinition( + name="Dur_10pFull", + description="Duration (hours) the culvert remained above 10 percent full (ccA output).", + value_type="float", + ), + "Sur_CD": ColumnDefinition( + name="Sur_CD", + description="Surcharge coefficient/depth reported by ccA when the culvert is surcharged.", + value_type="float", + ), + "Dur_Sur": ColumnDefinition( + name="Dur_Sur", + description="Duration (hours) the culvert experienced surcharge conditions.", + value_type="float", + ), + "pTime_Sur": ColumnDefinition( + name="pTime_Sur", + description="Time (hours) at which the surcharge condition peaked.", + value_type="float", + ), + "TFirst_Sur": ColumnDefinition( + name="TFirst_Sur", + description="Time (hours) when the surcharge condition first began.", + value_type="float", + ), "MedianAbsMax": ColumnDefinition( name="MedianAbsMax", description="Absolute maxima across median of temporal patterns for the group.", @@ -225,17 +392,20 @@ def default(cls) -> "ColumnMetadataRegistry": ), "mean_PeakFlow": ColumnDefinition( name="mean_PeakFlow", - description="Peak flow corresponding to the mean storm for the group.", + description=( + "Peak flow from the event whose statistic is nearest to the group's arithmetic mean; " + "not an averaged peak flow. Uses mean_including_zeroes." + ), value_type="float", ), "mean_Duration": ColumnDefinition( name="mean_Duration", - description="Duration associated with the mean storm for the group.", + description=("Duration taken from the same nearest-to-mean event used for mean_PeakFlow."), value_type="string", ), "mean_TP": ColumnDefinition( name="mean_TP", - description="Temporal pattern associated with the mean storm for the group.", + description=("Temporal pattern taken from the same nearest-to-mean event used for mean_PeakFlow."), value_type="string", ), "low": ColumnDefinition( @@ -250,7 +420,7 @@ def default(cls) -> "ColumnMetadataRegistry": ), "count": ColumnDefinition( name="count", - description="Number of rows contributing to the median statistics for the selected duration.", + description="Number of rows contributing to the mean/median statistics for the selected duration.", value_type="int", ), "count_bin": ColumnDefinition( diff --git a/ryan_library/classes/tuflow_results_validation_and_datatypes.json b/ryan_library/classes/tuflow_results_validation_and_datatypes.json index ff3bc3ff..a2e53208 100644 --- a/ryan_library/classes/tuflow_results_validation_and_datatypes.json +++ b/ryan_library/classes/tuflow_results_validation_and_datatypes.json @@ -144,6 +144,31 @@ } } }, + "RLL_Qmx": { + "processor": "RLLQmxProcessor", + "suffixes": [ + "_RLL_Qmx.csv" + ], + "output_columns": { + "ID": "string", + "Time": "float", + "Q": "float", + "dQ": "float", + "Time dQ": "float", + "H": "float" + }, + "processingParts": { + "dataformat": "Maximums", + "columns_to_use": { + "ID": "string", + "Time Qmax": "float", + "Qmax": "float", + "dQmax": "float", + "Time dQmax": "float", + "H": "float" + } + } + }, "Q": { "processor": "QProcessor", "suffixes": [ diff --git a/ryan_library/classes/tuflow_string_classes.py b/ryan_library/classes/tuflow_string_classes.py index a69fb0ec..1af77f1f 100644 --- a/ryan_library/classes/tuflow_string_classes.py +++ b/ryan_library/classes/tuflow_string_classes.py @@ -1,4 +1,5 @@ # ryan_library\classes\tuflow_string_classes.py +import math import re from pathlib import Path from loguru import logger @@ -64,7 +65,9 @@ class TuflowStringParser: # Precompile regex patterns for efficiency # ``TP_PATTERN`` finds patterns like ``TP01`` that are surrounded by ``_`` or ``+`` (or appear at the edges). - TP_PATTERN: re.Pattern[str] = re.compile(pattern=r"(?:[_+]|^)TP(\d{2})(?:[_+]|$)", flags=re.IGNORECASE) + # Keep the core ``TP##``/``TP#`` style bounded by delimiters so we do not + # accidentally read other numbers embedded within filenames. + TP_PATTERN: re.Pattern[str] = re.compile(pattern=r"(?:[_+]|^)TP(\d{1,2})(?:[_+]|$)", flags=re.IGNORECASE) # ``DURATION_PATTERN`` captures 3-5 digits followed by ``m`` (e.g. ``00360m`` or ``360m``). DURATION_PATTERN: re.Pattern[str] = re.compile(pattern=r"(?:[_+]|^)(\d{3,5})[mM](?:[_+]|$)", flags=re.IGNORECASE) # ``AEP_PATTERN`` matches values like ``01.00p`` or ``5p`` and reads the numeric portion before ``p``. @@ -72,8 +75,14 @@ class TuflowStringParser: pattern=r"(?:^|[_+])(?P(?P\d+(?:\.\d{1,2})?)p|(?PPMPF|PMP))(?=$|[_+])", flags=re.IGNORECASE, ) + # ``GENERIC_TP_PATTERN`` handles free-form strings such as "tp 3". + GENERIC_TP_PATTERN: re.Pattern[str] = re.compile(pattern=r"TP?\s*(\d{1,2})", flags=re.IGNORECASE) + HUMAN_DURATION_PATTERN: re.Pattern[str] = re.compile( + pattern=r"(?P\d+(?:\.\d+)?)\s*(?Phours?|hrs?|hr|h|minutes?|mins?|min|m)(?=$|[^A-Za-z0-9])", + flags=re.IGNORECASE, + ) - def __init__(self, file_path: Path | str): + def __init__(self, file_path: Path | str) -> None: """Initialize the TuflowStringParser with the given file path. Args: file_path (Path | str): Path to the file to be processed.""" @@ -89,6 +98,103 @@ def __init__(self, file_path: Path | str): self.aep: RunCodeComponent | None = self.parse_aep(string=self.clean_run_code) self.trim_run_code: str = self.trim_the_run_code() + @staticmethod + def _coerce_text(value: object) -> str | None: + """Convert ``value`` into a cleaned string or ``None`` if it is effectively empty.""" + + if value is None: + return None + try: + if math.isnan(value): # type: ignore[arg-type] + return None + except (TypeError, ValueError): + pass + text: str = str(value).strip() + if not text: + return None + lowered: str = text.lower() + if lowered in {"nan", "none", ""}: + return None + return text + + @classmethod + def normalize_tp_label(cls, value: object) -> str | None: + """Return a canonical ``TP##`` label extracted from ``value`` when possible.""" + + text: str | None = cls._coerce_text(value) + if text is None: + return None + + # Normalise ``+`` to ``_`` so ``TP##`` detection works for both + # delimiter styles used in TUFLOW artefacts. + normalized: str = text.replace("+", "_") + match: re.Match[str] | None = cls.TP_PATTERN.search(normalized.upper()) + digits: str | None = match.group(1) if match else None + if digits is None: + generic_match: re.Match[str] | None = cls.GENERIC_TP_PATTERN.search(text) + if generic_match: + digits = generic_match.group(1) + if digits is None: + fallback: re.Match[str] | None = re.search(r"\b(\d{1,2})\b", text) + digits = fallback.group(1) if fallback else None + if digits is None: + return None + try: + numeric = int(digits) + except ValueError: + logger.debug(f"Unable to parse TP digits from value {text!r}") + return None + return f"TP{numeric:02d}" + + @classmethod + def normalize_duration_value(cls, value: object) -> float: + """Return the numeric component of a duration string or ``nan`` when parsing fails.""" + + text: str | None = cls._coerce_text(value) + if text is None: + return float("nan") + + # ``DURATION_PATTERN`` covers the common ``00120m`` style. + normalized: str = text.replace("+", "_") + match: re.Match[str] | None = cls.DURATION_PATTERN.search(normalized) + if match: + return float(match.group(1)) + + for human_match in cls.HUMAN_DURATION_PATTERN.finditer(normalized): + minutes: float | None = cls._minutes_from_human_match(human_match) + if minutes is not None: + return minutes + + fallback: re.Match[str] | None = re.search(r"\d{3,}", normalized) + if fallback: + return float(fallback.group(0)) + + logger.debug(f"Unable to parse duration from value {text!r}") + return float("nan") + + @staticmethod + def _minutes_from_human_match(match: re.Match[str]) -> float | None: + """Convert a ``HUMAN_DURATION_PATTERN`` match into minutes.""" + + value_str: str = match.group("value") + unit: str = match.group("unit").lower() + digits_only: str = value_str.replace(".", "") + if unit == "m" and len(digits_only) <= 2: + # Short bare ``m`` tokens (e.g. "5m") represent grid resolutions + # rather than durations, so ignore them and keep searching. + return None + + try: + magnitude = float(value_str) + except ValueError: + return None + + if unit in {"hours", "hour", "hrs", "hr", "h"}: + return magnitude * 60.0 + if unit in {"minutes", "minute", "mins", "min", "m"}: + return magnitude + return None + @staticmethod def clean_runcode(run_code: str) -> str: """Replace '+' with '_' to standardize delimiters. @@ -133,7 +239,7 @@ def extract_raw_run_code(self) -> str: """ for suffix in self.suffixes.keys(): if self.file_name.lower().endswith(suffix.lower()): - run_code = self.file_name[: -len(suffix)] + run_code: str = self.file_name[: -len(suffix)] logger.debug(f"Extracted raw run code '{run_code}' from file name '{self.file_name}'") return run_code logger.debug(f"No suffix matched; using entire file name '{self.file_name}' as run code") @@ -150,8 +256,8 @@ def extract_run_code_parts(clean_run_code: str) -> dict[str, str]: Returns: dict[str, str]: Dictionary of run code parts with keys like 'R01', 'R02', etc. """ - run_code_parts = clean_run_code.split("_") - r_dict = {f"R{index:02}": part for index, part in enumerate(run_code_parts, start=1)} + run_code_parts: list[str] = clean_run_code.split("_") + r_dict: dict[str, str] = {f"R{index:02}": part for index, part in enumerate(run_code_parts, start=1)} logger.debug(f"Extracted run code parts: {r_dict}") return r_dict @@ -165,9 +271,9 @@ def parse_tp(self, string: str) -> RunCodeComponent | None: Returns: Optional[RunCodeComponent]: Parsed TP component or None if not found. """ - match = self.TP_PATTERN.search(string) + match: re.Match[str] | None = self.TP_PATTERN.search(string) if match: - tp_value = match.group(1) + tp_value: str = match.group(1) logger.debug(f"Parsed TP value: {tp_value}") return RunCodeComponent(raw_value=tp_value, component_type="TP") logger.debug("No TP component found") @@ -183,11 +289,22 @@ def parse_duration(self, string: str) -> RunCodeComponent | None: Returns: Optional[RunCodeComponent]: Parsed Duration component or None if not found. """ - match = self.DURATION_PATTERN.search(string) + # Apply the same normalisation rules as ``normalize_duration_value`` so + # both parsing paths stay in sync. + normalized: str = string.replace("+", "_") + match: re.Match[str] | None = self.DURATION_PATTERN.search(normalized) if match: duration_value = match.group(1) logger.debug(f"Parsed Duration value: {duration_value}") return RunCodeComponent(raw_value=duration_value, component_type="Duration") + + for human_match in self.HUMAN_DURATION_PATTERN.finditer(normalized): + minutes: float | None = self._minutes_from_human_match(human_match) + if minutes is None: + continue + minutes_str: str = str(int(minutes)) if minutes.is_integer() else str(minutes) + logger.debug(f"Parsed Duration value from human-readable token: {minutes_str}") + return RunCodeComponent(raw_value=minutes_str, component_type="Duration") logger.debug("No Duration component found") return None @@ -219,9 +336,11 @@ def trim_the_run_code(self) -> str: Returns: str: Cleaned run code. """ - components_to_remove = {str(component).lower() for component in [self.aep, self.duration, self.tp] if component} + components_to_remove: set[str] = { + str(component).lower() for component in [self.aep, self.duration, self.tp] if component + } logger.debug(f"Components to remove: {components_to_remove}") - trimmed_runcode = "_".join( + trimmed_runcode: str = "_".join( part for part in self.clean_run_code.split("_") if part.lower() not in components_to_remove ) logger.debug(f"Trimmed run code: {trimmed_runcode}") diff --git a/ryan_library/functions/file_utils.py b/ryan_library/functions/file_utils.py index 8db61bc9..1b44c261 100644 --- a/ryan_library/functions/file_utils.py +++ b/ryan_library/functions/file_utils.py @@ -2,11 +2,11 @@ from collections.abc import Generator from pathlib import Path -from concurrent.futures import ThreadPoolExecutor import fnmatch +import re from loguru import logger import threading -from queue import Queue +from queue import Empty, Queue def find_files_parallel( @@ -43,18 +43,35 @@ def find_files_parallel( match any of the exclusion patterns. """ - # Normalize 'patterns' and 'excludes' to lists for consistent processing - patterns = [patterns] if isinstance(patterns, str) else patterns - excludes = [excludes] if isinstance(excludes, str) else excludes or [] + def _normalize_globs(globs: str | list[str] | None) -> list[str]: + """Ensure downstream logic always receives an iterable of glob strings.""" + if globs is None: + return [] + if isinstance(globs, str): + return [globs] + return list(globs) - # Convert all patterns to lowercase for case-insensitive matching - patterns = [p.lower() for p in patterns] - excludes = [e.lower() for e in excludes] + def _compile_patterns(globs: list[str]) -> list[re.Pattern[str]]: + """Translate glob syntax into compiled regex objects once up front.""" + compiled: list[re.Pattern[str]] = [] + for raw_pattern in globs: + compiled.append(re.compile(fnmatch.translate(raw_pattern), re.IGNORECASE)) + return compiled + + def _matches_any(name: str, compiled: list[re.Pattern[str]]) -> bool: + """Return True when the provided filename matches any compiled glob.""" + return any(pattern.match(name) for pattern in compiled) + + include_patterns: list[str] = _normalize_globs(patterns) + exclude_patterns: list[str] = _normalize_globs(excludes) + + compiled_includes: list[re.Pattern[str]] = _compile_patterns(include_patterns) + compiled_excludes: list[re.Pattern[str]] = _compile_patterns(exclude_patterns) logger.info(f"Root directories: {root_dirs}") - logger.info(f"Search patterns: {patterns}") - if excludes: - logger.info(f"Exclude patterns: {excludes}") + logger.info(f"Search patterns: {include_patterns}") + if exclude_patterns: + logger.info(f"Exclude patterns: {exclude_patterns}") # Obtain the current working directory to calculate relative paths later # ``absolute`` preserves drive-letter vs UNC style while ensuring an @@ -71,8 +88,9 @@ def find_files_parallel( visited_lock = threading.Lock() visited_dirs: set[Path] = set() - # Queue for directories to process + # Queue for directories to process; the stop event lets workers exit once traversal finishes. dir_queue: Queue[tuple[Path, Path]] = Queue() + stop_event = threading.Event() # Initialize the queue with absolute root directories without converting # between drive letters and UNC paths. @@ -87,39 +105,32 @@ def find_files_parallel( dir_queue.put((abs_root, abs_root)) # (current_path, root_dir) except FileNotFoundError: logger.error(f"Root directory does not exist: {root_dir}") - except Exception as e: - logger.error(f"Error resolving root directory {root_dir}: {e}") + except Exception as exc: + logger.error(f"Error resolving root directory {root_dir}: {exc}") def worker() -> None: - while True: + """Continuously pull directories off the queue, scanning files and enqueueing child folders.""" + while not stop_event.is_set(): try: - current_path, root_dir = dir_queue.get(timeout=1) - # Timeout to allow graceful exit - except: - # Queue is empty or timeout reached - return + current_path, root_dir = dir_queue.get(timeout=0.2) + except Empty: + continue try: + # Keep per-thread results local so we only touch global locks when we have matches. local_matched: list[Path] = [] local_folders_with_matches: set[Path] = set() - files_searched = 0 - folders_searched = 0 - # Use non-recursive glob to avoid overlapping traversals try: iterator: Generator[Path, None, None] = current_path.iterdir() except PermissionError: logger.error(f"Permission denied accessing directory: {current_path}") - dir_queue.task_done() continue - except Exception as e: - logger.error(f"Error accessing directory {current_path}: {e}") - dir_queue.task_done() + except Exception as exc: + logger.error(f"Error accessing directory {current_path}: {exc}") continue for subpath in iterator: if subpath.is_dir(): - folders_searched += 1 - if recursive_search and report_level: try: relative_path = subpath.relative_to(root_dir) @@ -132,7 +143,7 @@ def worker() -> None: display_path: Path = subpath.relative_to(current_dir) except ValueError: display_path = subpath.absolute() - logger.info(f"Searching (depth {depth}): {display_path}") + logger.debug(f"Searching (depth {depth}): {display_path}") if recursive_search: try: @@ -145,10 +156,12 @@ def worker() -> None: except PermissionError: logger.error(f"Permission denied accessing subdirectory: {subpath}") continue - except Exception as e: - logger.error(f"Error resolving subdirectory {subpath}: {e}") + except Exception as exc: + logger.error(f"Error resolving subdirectory {subpath}: {exc}") continue + # Record directories we've seen so we do not process the same path twice + # when multiple roots overlap or symlinks point back to an ancestor. with visited_lock: if resolved_subpath in visited_dirs: continue @@ -157,31 +170,32 @@ def worker() -> None: dir_queue.put((resolved_subpath, root_dir)) continue - files_searched += 1 - filename = subpath.name.lower() + filename: str = subpath.name - # Inclusion Check - if any(fnmatch.fnmatch(filename, pattern) for pattern in patterns): - # Exclusion Check - if not any(fnmatch.fnmatch(filename, exclude) for exclude in excludes): - try: - matched_file: Path = subpath.absolute() - display_path = matched_file - try: - display_path = matched_file.relative_to(current_dir) - except ValueError: - pass - logger.debug(f"Matched file: {display_path}") - if not matched_file.exists(): - raise FileNotFoundError - local_matched.append(matched_file) - local_folders_with_matches.add(matched_file.parent) - except FileNotFoundError: - logger.warning(f"File does not exist (might have been moved): {subpath}") - except PermissionError: - logger.error(f"Permission denied accessing file: {subpath}") - except Exception as e: - logger.error(f"Error resolving file {subpath}: {e}") + if not _matches_any(name=filename, compiled=compiled_includes): + continue + + if compiled_excludes and _matches_any(name=filename, compiled=compiled_excludes): + continue + + try: + matched_file: Path = subpath.absolute() + display_path = matched_file + try: + display_path = matched_file.relative_to(current_dir) + except ValueError: + pass + logger.debug(f"Matched file: {display_path}") + if not matched_file.exists(): + raise FileNotFoundError + local_matched.append(matched_file) + local_folders_with_matches.add(matched_file.parent) + except FileNotFoundError: + logger.warning(f"File does not exist (might have been moved): {subpath}") + except PermissionError: + logger.error(f"Permission denied accessing file: {subpath}") + except Exception as exc: + logger.error(f"Error resolving file {subpath}: {exc}") # Safely update the global matched_files list if local_matched: @@ -192,28 +206,27 @@ def worker() -> None: with folders_with_matches_lock: folders_with_matches.update(local_folders_with_matches) - if len(local_matched) > 0: - logger.info(f"Found {len(local_matched)} files in {current_path}") - except Exception as e: - logger.error(f"Unexpected error processing {current_path}: {e}") + if local_matched: + logger.debug(f"Found {len(local_matched)} files in {current_path}") + except Exception as exc: + logger.error(f"Unexpected error processing {current_path}: {exc}") finally: dir_queue.task_done() logger.info(f"Starting search in {len(root_dirs)} root directory(ies).") - - # Determine the number of worker threads; adjust as needed - num_workers: int = min(32, (len(root_dirs) * 4) or 4) - - with ThreadPoolExecutor(max_workers=num_workers) as executor: - # Launch worker threads - futures = [executor.submit(worker) for _ in range(num_workers)] - - # Wait for all tasks in the queue to be processed - dir_queue.join() - - # Optionally, wait for all worker threads to complete - for future in futures: - future.result() + num_workers: int = min(32, max(len(root_dirs) * 4, 4)) + threads: list[threading.Thread] = [ + threading.Thread(target=worker, name=f"find-files-worker-{i}", daemon=True) for i in range(num_workers) + ] + for thread in threads: + thread.start() + + # Wait until every directory queued for processing has been handled. + dir_queue.join() + stop_event.set() + + for thread in threads: + thread.join() # Log folders with matched files if print_found_folder: diff --git a/ryan_library/functions/misc_functions.py b/ryan_library/functions/misc_functions.py index 6dbfe39e..8605e6fc 100644 --- a/ryan_library/functions/misc_functions.py +++ b/ryan_library/functions/misc_functions.py @@ -4,7 +4,7 @@ import multiprocessing import pandas as pd import logging -from typing import TypedDict +from typing import Literal, TypedDict from pathlib import Path from importlib import metadata import re @@ -112,6 +112,9 @@ def export_dataframes( column_widths: dict[str, dict[str, float]] | None = None, auto_adjust_width: bool = True, file_name: str | None = None, + *, + export_mode: Literal["excel", "parquet", "both"] = "excel", + parquet_compression: str = "gzip", ) -> None: """Export multiple DataFrames to Excel files with optional column widths. Args: @@ -138,6 +141,17 @@ def export_dataframes( ``export_dict``. When provided, the auto-generated timestamp prefix is skipped and ``file_name`` is written exactly (``.xlsx`` appended when missing). + export_mode (Literal["excel", "parquet", "both"], optional): + Controls which artefacts are produced: + * "excel" (default) writes only Excel files (with automatic Parquet/CSV + fallback when Excel limits are exceeded). + * "parquet" skips Excel entirely and writes Parquet files matching the + Excel naming scheme. + * "both" writes Excel files (subject to the standard fallback) and also + emits companion Parquet files. + parquet_compression (str, optional): + Compression codec passed to pandas whenever Parquet files are written. + Defaults to ``"gzip"``. Raises: ValueError: If the number of DataFrames doesn't match the number of sheets. InvalidFileException: If there's an issue with writing the Excel file. @@ -155,6 +169,10 @@ def export_dataframes( ExcelExporter().export_dataframes(export_dict, output_directory=Path("exports")) """ datetime_string: str = datetime.now().strftime(format="%Y%m%d-%H%M") + normalized_mode: str = export_mode.lower() + valid_modes: set[str] = {"excel", "parquet", "both"} + if normalized_mode not in valid_modes: + raise ValueError(f"Invalid export_mode '{export_mode}'. Expected one of {sorted(valid_modes)}.") if file_name is not None and len(export_dict) != 1: raise ValueError("'file_name' can only be provided when exporting a single workbook.") @@ -169,17 +187,31 @@ def export_dataframes( f"For file '{file_label}', the number of dataframes ({len(dataframes)}) and sheets ({len(sheets)}) must match." ) + export_stem: str = self._resolve_export_stem( + datetime_string=datetime_string, export_key=export_key, file_name=file_name + ) + + if normalized_mode == "parquet": + self._export_as_parquet_only( + export_stem=export_stem, + dataframes=dataframes, + sheets=sheets, + output_directory=output_directory, + compression=parquet_compression, + ) + continue + if self._exceeds_excel_limits(dataframes=dataframes): logging.warning( "Data for '%s' exceeds Excel size limits. Exporting to Parquet and CSV instead.", - file_name, + export_stem, ) self._export_as_parquet_and_csv( - file_name=file_name, + export_stem=export_stem, dataframes=dataframes, sheets=sheets, - datetime_string=datetime_string, output_directory=output_directory, + compression=parquet_compression, ) continue @@ -187,7 +219,7 @@ def export_dataframes( if file_name is not None: export_filename = file_name if file_name.lower().endswith(".xlsx") else f"{file_name}.xlsx" else: - export_filename = f"{datetime_string}_{export_key}.xlsx" + export_filename = f"{export_stem}.xlsx" export_path: Path = ( (output_directory / export_filename) if output_directory else Path(export_filename) # Defaults to CWD ) @@ -239,6 +271,15 @@ def export_dataframes( logging.error(f"Failed to write to '{export_path}': {e}") raise + if normalized_mode == "both": + self._export_as_parquet_only( + export_stem=export_stem, + dataframes=dataframes, + sheets=sheets, + output_directory=output_directory, + compression=parquet_compression, + ) + def _exceeds_excel_limits(self, dataframes: list[pd.DataFrame]) -> bool: """Return True if any dataframe exceeds Excel's size limits.""" @@ -260,13 +301,86 @@ def _exceeds_excel_limits(self, dataframes: list[pd.DataFrame]) -> bool: return True return False + def _resolve_export_stem(self, *, datetime_string: str, export_key: str, file_name: str | None) -> str: + """Return the base filename (without extension) for the current export.""" + + if file_name: + return file_name[:-5] if file_name.lower().endswith(".xlsx") else file_name + return f"{datetime_string}_{export_key}" + + def _build_parquet_filename(self, base_filename: str, compression: str | None) -> str: + """Return a parquet filename with an optional compression suffix.""" + + if compression: + suffix = compression.lower() + if suffix == "gzip": + return f"{base_filename}.parquet.gzip" + return f"{base_filename}.parquet.{suffix}" + return f"{base_filename}.parquet" + + def _write_parquet( + self, + *, + df: pd.DataFrame, + sheet: str, + parquet_path: Path, + export_label: str, + compression: str | None, + ) -> None: + """Write a DataFrame to Parquet with consistent logging and error handling.""" + + try: + df.to_parquet(path=parquet_path, index=False, compression=compression) + logging.info("Exported Parquet to %s", parquet_path) + except (ImportError, ValueError) as exc: + message: str = ( + "Unable to export Parquet for " + f"'{export_label}' sheet '{sheet}': {exc}. Install pyarrow or fastparquet." + ) + logging.error(message) + print(message) + except Exception as exc: # pragma: no cover - unforeseen errors should be logged + logging.exception("Unexpected error during Parquet export for '%s' sheet '%s': %s", export_label, sheet, exc) + + def _export_as_parquet_only( + self, + *, + export_stem: str, + dataframes: list[pd.DataFrame], + sheets: list[str], + output_directory: Path | None, + compression: str | None, + ) -> None: + """Export each DataFrame to a Parquet file sharing the Excel naming scheme.""" + + export_targets: list[tuple[pd.DataFrame, str, Path]] = [] + for df, sheet in zip(dataframes, sheets): + sanitized_sheet: str = self._sanitize_name(sheet) + base_filename: str = f"{export_stem}_{sanitized_sheet}" + parquet_filename: str = self._build_parquet_filename(base_filename, compression) + parquet_path: Path = self._build_output_path( + base_filename=parquet_filename, output_directory=output_directory + ) + parquet_path.parent.mkdir(parents=True, exist_ok=True) + export_targets.append((df, sheet, parquet_path)) + + for df, sheet, parquet_path in export_targets: + self._write_parquet( + df=df, + sheet=sheet, + parquet_path=parquet_path, + export_label=export_stem, + compression=compression, + ) + def _export_as_parquet_and_csv( self, - file_name: str, + *, + export_stem: str, dataframes: list[pd.DataFrame], sheets: list[str], - datetime_string: str, output_directory: Path | None, + compression: str | None = None, ) -> None: """Export dataframes to Parquet and CSV files when Excel limits are exceeded.""" @@ -274,10 +388,11 @@ def _export_as_parquet_and_csv( for df, sheet in zip(dataframes, sheets): sanitized_sheet: str = self._sanitize_name(sheet) - base_filename: str = f"{datetime_string}_{file_name}_{sanitized_sheet}" + base_filename: str = f"{export_stem}_{sanitized_sheet}" + parquet_filename: str = self._build_parquet_filename(base_filename, compression) parquet_path: Path = self._build_output_path( - base_filename=f"{base_filename}.parquet", output_directory=output_directory + base_filename=parquet_filename, output_directory=output_directory ) csv_path: Path = self._build_output_path( base_filename=f"{base_filename}.csv", output_directory=output_directory @@ -287,20 +402,13 @@ def _export_as_parquet_and_csv( export_targets.append((df, sheet, parquet_path, csv_path)) for df, sheet, parquet_path, _ in export_targets: - try: - df.to_parquet(path=parquet_path, index=False) - logging.info("Exported Parquet to %s", parquet_path) - except (ImportError, ValueError) as exc: - message: str = ( - "Unable to export Parquet for " - f"'{file_name}' sheet '{sheet}': {exc}. Install pyarrow or fastparquet." - ) - logging.error(message) - print(message) - except Exception as exc: # pragma: no cover - unforeseen errors should be logged - logging.exception( - "Unexpected error during Parquet export for '%s' sheet '%s': %s", file_name, sheet, exc - ) + self._write_parquet( + df=df, + sheet=sheet, + parquet_path=parquet_path, + export_label=export_stem, + compression=compression, + ) for df, sheet, _, csv_path in export_targets: df.to_csv(path_or_buf=csv_path, index=False) @@ -328,6 +436,9 @@ def save_to_excel( column_widths: dict[str, float] | None = None, auto_adjust_width: bool = True, file_name: str | None = None, + *, + export_mode: Literal["excel", "parquet", "both"] = "excel", + parquet_compression: str = "gzip", ) -> None: """Export a single DataFrame to an Excel file with a single sheet and optional column widths. @@ -348,7 +459,12 @@ def save_to_excel( file_name (str | None, optional): Explicit file name to use for the exported workbook. When provided the timestamp-based prefix is skipped and ``file_name`` is written exactly as - supplied (``.xlsx`` is appended automatically when missing).""" + supplied (``.xlsx`` is appended automatically when missing). + export_mode (Literal["excel", "parquet", "both"], optional): + See :meth:`export_dataframes` for details. + parquet_compression (str, optional): + Compression codec to use when Parquet outputs are requested. Defaults to + ``"gzip"``.""" export_dict: dict[str, ExportContent] = {file_name_prefix: {"dataframes": [data_frame], "sheets": [sheet_name]}} # Prepare column_widths in the required format @@ -362,6 +478,8 @@ def save_to_excel( column_widths=prepared_column_widths, auto_adjust_width=auto_adjust_width, file_name=file_name, + export_mode=export_mode, + parquet_compression=parquet_compression, ) def calculate_column_widths(self, df: pd.DataFrame) -> dict[str, float]: diff --git a/ryan_library/functions/pandas/median_calc.py b/ryan_library/functions/pandas/median_calc.py index 9a21bae0..4d4fe4e9 100644 --- a/ryan_library/functions/pandas/median_calc.py +++ b/ryan_library/functions/pandas/median_calc.py @@ -1,13 +1,15 @@ # ryan_library\functions\pandas\median_calc.py -"""Utilities for computing median statistics for grouped data.""" +"""Utilities for summarising grouped statistics for POMM reports.""" + +from typing import Any +from collections.abc import Callable import pandas as pd from pandas import DataFrame -from typing import Any -def _median_stats_for_group(durgrp: pd.DataFrame, stat_col: str, tp_col: str, dur_col: str) -> dict[str, Any]: - """Return statistics for a single duration group.""" +def summarise_duration_statistics(durgrp: pd.DataFrame, stat_col: str, tp_col: str, dur_col: str) -> dict[str, Any]: + """Return median and mean-adjacent statistics for a single duration group.""" ensemblestat: DataFrame = durgrp.sort_values(stat_col, ascending=True, na_position="first") r: int = len(ensemblestat.index) @@ -17,11 +19,11 @@ def _median_stats_for_group(durgrp: pd.DataFrame, stat_col: str, tp_col: str, du mean_including_zeroes = float(stat_series.mean()) mean_excluding_zeroes = float(ensemblestat[ensemblestat[stat_col] != 0][stat_col].mean()) - mean_duration = ensemblestat[dur_col].iloc[medianpos] - mean_tp = ensemblestat[tp_col].iloc[medianpos] + mean_duration: Any = pd.NA + mean_tp: Any = pd.NA mean_peak_flow = float("nan") if stat_series.notna().any(): - closest_idx = (stat_series - mean_including_zeroes).abs().idxmin() + closest_idx: int | str = (stat_series - mean_including_zeroes).abs().idxmin() mean_duration = ensemblestat.loc[closest_idx, dur_col] mean_tp = ensemblestat.loc[closest_idx, tp_col] mean_peak_flow = float(ensemblestat.loc[closest_idx, stat_col]) @@ -41,11 +43,11 @@ def _median_stats_for_group(durgrp: pd.DataFrame, stat_col: str, tp_col: str, du } -def median_stats( +def calculate_median_statistics( thinned_df: pd.DataFrame, stat_col: str, tp_col: str, dur_col: str ) -> tuple[dict[str, Any], list[dict[str, Any]]]: - """Return median statistics for each duration group and the maximum median. - The logic mirrors the ``stats`` function in ``TUFLOW_2023_max_med_from POMM_v9.py``. + """Return per-duration stats and the record with the largest median. + The logic is based on the ``stats`` function in ``TUFLOW_2023_max_med_from POMM_v9.py``. For each duration group the DataFrame is sorted by ``statcol``. The median value is selected, along with the associated temporal pattern. The group with the highest median is returned separately. @@ -73,8 +75,8 @@ def median_stats( tracking_median: float = float("-inf") count_bin: int = 0 - for _, durgrp in thinned_df.groupby(by=dur_col): - stats_dict: dict[str, Any] = _median_stats_for_group( + for _, durgrp in thinned_df.groupby(by=dur_col): # type: ignore + stats_dict: dict[str, Any] = summarise_duration_statistics( durgrp=durgrp, stat_col=stat_col, tp_col=tp_col, dur_col=dur_col ) @@ -98,6 +100,10 @@ def median_stats( def median_calc( thinned_df: pd.DataFrame, statcol: str, tpcol: str, durcol: str ) -> tuple[dict[str, Any], list[dict[str, Any]]]: - """Compatibility wrapper for previous function name.""" + """Compatibility wrapper retaining the legacy public function name.""" + + return calculate_median_statistics(thinned_df=thinned_df, stat_col=statcol, tp_col=tpcol, dur_col=durcol) + - return median_stats(thinned_df=thinned_df, stat_col=statcol, tp_col=tpcol, dur_col=durcol) +# Backwards compatibility for older imports +median_stats: Callable[..., tuple[dict[str, Any], list[dict[str, Any]]]] = calculate_median_statistics diff --git a/ryan_library/functions/tuflow/pomm_combine.py b/ryan_library/functions/tuflow/pomm_combine.py index 6a29c826..4edd270e 100644 --- a/ryan_library/functions/tuflow/pomm_combine.py +++ b/ryan_library/functions/tuflow/pomm_combine.py @@ -2,14 +2,13 @@ from collections.abc import Collection from pathlib import Path +from typing import Literal import pandas as pd from loguru import logger -from ryan_library.scripts.pomm_utils import ( - collect_files, - process_files_in_parallel, -) +from ryan_library.scripts.pomm_utils import process_files_in_parallel +from ryan_library.functions.tuflow.tuflow_common import collect_files from ryan_library.processors.tuflow.base_processor import BaseProcessor from ryan_library.processors.tuflow.processor_collection import ProcessorCollection from ryan_library.functions.file_utils import ensure_output_directory @@ -23,6 +22,7 @@ def main_processing( include_data_types: list[str] | None = None, console_log_level: str = "INFO", locations_to_include: Collection[str] | None = None, + export_mode: Literal["excel", "parquet", "both"] = "excel", ) -> None: """Generate merged culvert data and export the results.""" @@ -50,26 +50,30 @@ def main_processing( if normalized_locations: results_set.filter_locations(normalized_locations) - export_results(results=results_set) + export_results(results=results_set, export_mode=export_mode) logger.info("End of POMM results combination processing") -def export_results(results: ProcessorCollection) -> None: - """Export combined DataFrames to Excel.""" +def export_results( + *, results: ProcessorCollection, export_mode: Literal["excel", "parquet", "both"] = "excel" +) -> None: + """Export combined DataFrames according to the requested mode.""" if not results.processors: logger.warning("No results to export.") return - exporter = ExcelExporter() combined_df: pd.DataFrame = results.pomm_combine() if combined_df.empty: logger.warning("No combined data found. Skipping export.") return ensure_output_directory(output_dir=Path.cwd()) + exporter = ExcelExporter() exporter.save_to_excel( data_frame=combined_df, file_name_prefix="combined_POMM", sheet_name="combined_POMM", output_directory=Path.cwd(), + export_mode=export_mode, + parquet_compression="gzip", ) diff --git a/ryan_library/functions/tuflow/tuflow_common.py b/ryan_library/functions/tuflow/tuflow_common.py index bfbb35de..a7f80026 100644 --- a/ryan_library/functions/tuflow/tuflow_common.py +++ b/ryan_library/functions/tuflow/tuflow_common.py @@ -3,10 +3,8 @@ from pathlib import Path from multiprocessing import Pool from dataclasses import dataclass, field -from queue import Queue +from collections.abc import Iterable from typing import Any - -import pandas as pd from loguru import logger from ryan_library.functions.file_utils import ( @@ -14,12 +12,16 @@ is_non_zero_file, ) from ryan_library.functions.misc_functions import calculate_pool_size -from ryan_library.functions.loguru_helpers import setup_logger, worker_initializer +from ryan_library.functions.loguru_helpers import worker_initializer from ryan_library.processors.tuflow.base_processor import BaseProcessor from ryan_library.processors.tuflow.processor_collection import ProcessorCollection from ryan_library.classes.suffixes_and_dtypes import SuffixesConfig +def _string_list() -> list[str]: + return [] + + @dataclass(frozen=True) class ScenarioConfig: """One export scenario.""" @@ -28,31 +30,72 @@ class ScenarioConfig: parquet_prefix: str # e.g. "1d_timeseries_data" excel_sheet: str # e.g. "Timeseries" export_parquet: bool # only True for timeseries - column_order: list[str] = field(default_factory=list) + column_order: list[str] = field(default_factory=_string_list) def collect_files( - paths_to_process: list[Path], - include_data_types: list[str], + paths_to_process: Iterable[Path], + include_data_types: Iterable[str], suffixes_config: SuffixesConfig, ) -> list[Path]: + """Return all non-empty files matching ``include_data_types`` underneath ``paths_to_process``.""" + + normalized_roots: list[Path] = [] + seen_roots: set[Path] = set() + for candidate in paths_to_process: + path = Path(candidate) + if path in seen_roots: + continue + seen_roots.add(path) + normalized_roots.append(path) + + deduped_types: list[str] = [] + seen_types: set[str] = set() + for data_type in include_data_types: + if data_type in seen_types: + continue + seen_types.add(data_type) + deduped_types.append(data_type) + + if not deduped_types: + logger.error("No data types were supplied for file collection.") + return [] + data_map: dict[str, list[str]] = suffixes_config.invert_suffix_to_type() suffixes: list[str] = [] - for dt in include_data_types: - if dt in data_map: - suffixes.extend(data_map[dt]) - else: - logger.error(f"No suffixes for data type '{dt}'") + for data_type in deduped_types: + dt_suffixes: list[str] | None = data_map.get(data_type) + if not dt_suffixes: + logger.error(f"No suffixes for data type '{data_type}'. Skipping.") + continue + suffixes.extend(dt_suffixes) + + suffixes = list(dict.fromkeys(suffixes)) if not suffixes: + logger.error("No suffixes found for the requested data types.") return [] - patterns: list[str] = [f"*{s}" for s in suffixes] - roots: list[Path] = [p for p in paths_to_process if p.is_dir()] - for bad in set(paths_to_process) - set(roots): - logger.warning(f"Skipping non-dir {bad}") + patterns: list[str] = [f"*{suffix}" for suffix in suffixes] + roots: list[Path] = [p for p in normalized_roots if p.is_dir()] + invalid_roots: list[Path] = [p for p in normalized_roots if not p.is_dir()] + for bad_root in invalid_roots: + logger.warning(f"Skipping non-directory path {bad_root}") + + if not roots: + logger.warning("No valid directories were supplied for file collection.") + return [] files: list[Path] = find_files_parallel(root_dirs=roots, patterns=patterns) - return [f for f in files if is_non_zero_file(f)] + filtered_files: list[Path] = [] + seen_files: set[Path] = set() + for file_path in files: + if not is_non_zero_file(file_path): + continue + if file_path in seen_files: + continue + seen_files.add(file_path) + filtered_files.append(file_path) + return filtered_files def process_file(file_path: Path) -> BaseProcessor | None: diff --git a/ryan_library/processors/tuflow/RLLQmxProcessor.py b/ryan_library/processors/tuflow/RLLQmxProcessor.py new file mode 100644 index 00000000..1f858ef1 --- /dev/null +++ b/ryan_library/processors/tuflow/RLLQmxProcessor.py @@ -0,0 +1,60 @@ +# ryan_library/processors/tuflow/rll_qmx_processor.py + +import pandas as pd +from loguru import logger + +from .base_processor import BaseProcessor + + +class RLLQmxProcessor(BaseProcessor): + """Processor for '_RLL_Qmx.csv' files produced by the Reporting Location Lines maximum export.""" + + def process(self) -> pd.DataFrame: + """Process the '_RLL_Qmx.csv' file and return the cleaned DataFrame.""" + logger.info(f"Starting processing of RLL Qmx file: {self.file_path}") + try: + status: int = self.read_maximums_csv() + if status != 0: + logger.error(f"Processing aborted for file: {self.file_path} due to previous errors.") + self.df = pd.DataFrame() + return self.df + + self._reshape_rll_qmx_data() + + self.add_common_columns() + self.apply_output_transformations() + + if not self.validate_data(): + logger.error(f"{self.file_name}: Data validation failed.") + self.df = pd.DataFrame() + return self.df + + self.processed = True + logger.info(f"Completed processing of RLL Qmx file: {self.file_path}") + return self.df + except Exception as exc: + logger.error(f"Failed to process RLL Qmx file {self.file_path}: {exc}") + self.df = pd.DataFrame() + return self.df + + def _reshape_rll_qmx_data(self) -> None: + """Rename columns and align schema with other maximum datasets.""" + required_columns: list[str] = ["ID", "Qmax", "Time Qmax", "dQmax", "Time dQmax", "H"] + missing_columns: list[str] = [col for col in required_columns if col not in self.df.columns] + if missing_columns: + logger.error( + f"{self.file_name}: Missing required columns for RLL Qmx processing: {missing_columns}" + ) + self.df = pd.DataFrame() + return + + rename_map: dict[str, str] = { + "Qmax": "Q", + "Time Qmax": "Time", + "dQmax": "dQ", + "Time dQmax": "Time dQ", + } + self.df.rename(columns=rename_map, inplace=True) + + ordered_columns: list[str] = ["ID", "Time", "Q", "dQ", "Time dQ", "H"] + self.df = self.df[ordered_columns] diff --git a/ryan_library/processors/tuflow/processor_collection.py b/ryan_library/processors/tuflow/processor_collection.py index b42782e5..01947897 100644 --- a/ryan_library/processors/tuflow/processor_collection.py +++ b/ryan_library/processors/tuflow/processor_collection.py @@ -3,7 +3,7 @@ from collections.abc import Collection from loguru import logger import pandas as pd -from pandas import DataFrame +from pandas import DataFrame, Series from ryan_library.functions.dataframe_helpers import ( reorder_columns, reorder_long_columns, @@ -58,7 +58,9 @@ def filter_locations(self, locations: Collection[str] | None) -> frozenset[str]: continue processor.filter_locations(normalized_locations) - filtered_processors: list[BaseProcessor] = [processor for processor in self.processors if not processor.df.empty] + filtered_processors: list[BaseProcessor] = [ + processor for processor in self.processors if not processor.df.empty + ] removed_processors: int = len(self.processors) - len(filtered_processors) self.processors = filtered_processors @@ -107,9 +109,7 @@ def combine_1d_timeseries(self) -> pd.DataFrame: columns_to_drop: list[str] = ["file", "rel_path", "path", "directory_path"] # Check for existing columns and drop them - existing_columns_to_drop: list[str] = [ - col for col in columns_to_drop if col in combined_df.columns - ] + existing_columns_to_drop: list[str] = [col for col in columns_to_drop if col in combined_df.columns] if existing_columns_to_drop: combined_df.drop(columns=existing_columns_to_drop, inplace=True) logger.debug(f"Dropped columns {existing_columns_to_drop} from DataFrame.") @@ -118,9 +118,7 @@ def combine_1d_timeseries(self) -> pd.DataFrame: # Reset categorical ordering # Group by 'internalName', 'Chan ID', and 'Time' group_keys: list[str] = ["internalName", "Chan ID", "Time"] - missing_keys: list[str] = [ - key for key in group_keys if key not in combined_df.columns - ] + missing_keys: list[str] = [key for key in group_keys if key not in combined_df.columns] if missing_keys: logger.error(f"Missing group keys {missing_keys} in Timeseries data.") return pd.DataFrame() @@ -128,9 +126,7 @@ def combine_1d_timeseries(self) -> pd.DataFrame: combined_df = reorder_long_columns(df=combined_df) grouped_df: DataFrame = combined_df.groupby(group_keys).agg("max").reset_index() - logger.debug( - f"Grouped {len(timeseries_processors)} Timeseries DataFrame with {len(grouped_df)} rows." - ) + logger.debug(f"Grouped {len(timeseries_processors)} Timeseries DataFrame with {len(grouped_df)} rows.") return grouped_df @@ -153,44 +149,54 @@ def combine_1d_maximums(self) -> pd.DataFrame: return pd.DataFrame() # Concatenate DataFrames - combined_df: DataFrame = pd.concat( - [p.df for p in maximums_processors if not p.df.empty], ignore_index=True - ) + combined_df: DataFrame = pd.concat([p.df for p in maximums_processors if not p.df.empty], ignore_index=True) logger.debug(f"Combined Maximums/ccA DataFrame with {len(combined_df)} rows.") # Columns to drop columns_to_drop: list[str] = ["file", "rel_path", "path", "Time"] # Check for existing columns and drop them - existing_columns_to_drop: list[str] = [ - col for col in columns_to_drop if col in combined_df.columns - ] + existing_columns_to_drop: list[str] = [col for col in columns_to_drop if col in combined_df.columns] if existing_columns_to_drop: combined_df.drop(columns=existing_columns_to_drop, inplace=True) logger.debug(f"Dropped columns {existing_columns_to_drop} from DataFrame.") combined_df = reorder_long_columns(df=combined_df) + combined_df = self._ensure_location_identifier(df=combined_df) # Reset categorical ordering combined_df = reset_categorical_ordering(combined_df) - # Group by 'internalName' and 'Chan ID' - group_keys: list[str] = ["internalName", "Chan ID"] - missing_keys: list[str] = [ - key for key in group_keys if key not in combined_df.columns - ] + if "Location ID" not in combined_df.columns: + logger.error("Location ID column is missing after maximums preprocessing.") + return pd.DataFrame() + + missing_location_mask: Series[bool] = combined_df["Location ID"].isna() + if missing_location_mask.any(): + logger.warning( + "Dropping {count} rows without a location identifier from Maximums/ccA data.", + count=int(missing_location_mask.sum()), + ) + combined_df = combined_df[~missing_location_mask] + if combined_df.empty: + logger.error("No Maximums/ccA data remaining after removing rows without location identifiers.") + return pd.DataFrame() + + # Group by 'internalName' and the derived 'Location ID' + group_keys: list[str] = ["internalName", "Location ID"] + missing_keys: list[str] = [key for key in group_keys if key not in combined_df.columns] if missing_keys: logger.error(f"Missing group keys {missing_keys} in Maximums/ccA data.") return pd.DataFrame() - grouped_df: DataFrame = ( - combined_df.groupby(by=group_keys, observed=False).agg("max").reset_index() - ) + grouped_df: DataFrame = combined_df.groupby(by=group_keys, observed=False).agg("max").reset_index() p1_col: list[str] = [ "trim_runcode", "aep_text", "duration_text", "tp_text", + "Location ID", "Chan ID", + "ID", "Q", "V", "DS_h", @@ -219,12 +225,34 @@ def combine_1d_maximums(self) -> pd.DataFrame: prefix_order=["R"], second_priority_columns=p2_col, ) - logger.debug( - f"Grouped {len(maximums_processors)} Maximums/ccA DataFrame with {len(grouped_df)} rows." - ) + logger.debug(f"Grouped {len(maximums_processors)} Maximums/ccA DataFrame with {len(grouped_df)} rows.") logger.debug("line157") return grouped_df + def _ensure_location_identifier(self, df: DataFrame) -> DataFrame: + """Ensure a 'Location ID' column exists for grouping maximum datasets.""" + if df.empty: + df["Location ID"] = pd.Series(dtype="string") + return df + + candidate_columns: list[str] = ["Chan ID", "ID", "Location"] + available_columns: list[str] = [col for col in candidate_columns if col in df.columns] + + if not available_columns: + logger.error("No location-based columns available to derive 'Location ID'.") + df["Location ID"] = pd.Series(pd.NA, index=df.index, dtype="string") + return df + + location_series: pd.Series = pd.Series(pd.NA, index=df.index, dtype="string") + for column in available_columns: + source_values: Series[str] = df[column].astype("string") + mask: Series[bool] = location_series.isna() & source_values.notna() + if mask.any(): + location_series.loc[mask] = source_values.loc[mask] + + df["Location ID"] = location_series + return df + def combine_raw(self) -> pd.DataFrame: """Concatenate all DataFrames together without any grouping. @@ -233,9 +261,7 @@ def combine_raw(self) -> pd.DataFrame: logger.debug("Combining raw data without grouping.") # Concatenate all DataFrames - combined_df: DataFrame = pd.concat( - [p.df for p in self.processors if not p.df.empty], ignore_index=True - ) + combined_df: DataFrame = pd.concat([p.df for p in self.processors if not p.df.empty], ignore_index=True) logger.debug(f"Combined Raw DataFrame with {len(combined_df)} rows.") combined_df = reorder_long_columns(df=combined_df) @@ -254,21 +280,15 @@ def pomm_combine(self) -> pd.DataFrame: logger.debug("Combining POMM data.") # Filter processors with dataformat 'POMM' - pomm_processors: list[BaseProcessor] = [ - p for p in self.processors if p.dataformat.lower() == "pomm" - ] + pomm_processors: list[BaseProcessor] = [p for p in self.processors if p.dataformat.lower() == "pomm"] if not pomm_processors: logger.warning("No processors with dataformat 'POMM' found.") return pd.DataFrame() # Concatenate DataFrames - combined_df: DataFrame = pd.concat( - [p.df for p in pomm_processors if not p.df.empty], ignore_index=True - ) - logger.debug( - f"Combined {len(pomm_processors)} POMM DataFrame with {len(combined_df)} rows." - ) + combined_df: DataFrame = pd.concat([p.df for p in pomm_processors if not p.df.empty], ignore_index=True) + logger.debug(f"Combined {len(pomm_processors)} POMM DataFrame with {len(combined_df)} rows.") combined_df = reorder_long_columns(df=combined_df) @@ -286,21 +306,15 @@ def po_combine(self) -> pd.DataFrame: logger.debug("Combining PO data.") # Filter processors with dataformat 'PO' - po_processors: list[BaseProcessor] = [ - p for p in self.processors if p.dataformat.lower() == "po" - ] + po_processors: list[BaseProcessor] = [p for p in self.processors if p.dataformat.lower() == "po"] if not po_processors: logger.warning("No processors with dataformat 'PO' found.") return pd.DataFrame() # Concatenate DataFrames - combined_df = pd.concat( - [p.df for p in po_processors if not p.df.empty], ignore_index=True - ) - logger.debug( - f"Combined {len(po_processors)} PO DataFrame with {len(combined_df)} rows." - ) + combined_df = pd.concat([p.df for p in po_processors if not p.df.empty], ignore_index=True) + logger.debug(f"Combined {len(po_processors)} PO DataFrame with {len(combined_df)} rows.") combined_df: DataFrame = reorder_long_columns(df=combined_df) @@ -309,9 +323,7 @@ def po_combine(self) -> pd.DataFrame: return combined_df - def get_processors_by_data_type( - self, data_types: list[str] | str - ) -> "ProcessorCollection": + def get_processors_by_data_type(self, data_types: list[str] | str) -> "ProcessorCollection": """Retrieve processors matching a specific data_type or list of data_types. Args: @@ -366,8 +378,7 @@ def check_duplicates(self) -> dict[tuple[str, str], list[BaseProcessor]]: for (run_code, dtype), procs in duplicates.items(): files = ", ".join(p.file_name for p in procs) logger.warning( - f"Potential duplicate group: run_code='{run_code}', " - f"data_type='{dtype}' found in files: {files}" + f"Potential duplicate group: run_code='{run_code}', " f"data_type='{dtype}' found in files: {files}" ) else: logger.debug("No duplicate processors found by run_code & data_type.") diff --git a/ryan_library/scripts/RORB/closure_durations.py b/ryan_library/scripts/RORB/closure_durations.py index 7dc44a06..68a9310c 100644 --- a/ryan_library/scripts/RORB/closure_durations.py +++ b/ryan_library/scripts/RORB/closure_durations.py @@ -1,8 +1,9 @@ +# ryan_library\scripts\RORB\closure_durations.py + from datetime import datetime from pathlib import Path from collections.abc import Iterable -from typing import TYPE_CHECKING import pandas as pd from loguru import logger diff --git a/ryan_library/scripts/pomm_max_items.py b/ryan_library/scripts/pomm_max_items.py index c739c6a4..e4e1da99 100644 --- a/ryan_library/scripts/pomm_max_items.py +++ b/ryan_library/scripts/pomm_max_items.py @@ -1,67 +1,105 @@ # ryan_library/scripts/pomm_max_items.py -from collections.abc import Collection, Callable -from loguru import logger -from pathlib import Path +from collections.abc import Callable, Collection from datetime import datetime +from pathlib import Path +import warnings + import pandas as pd +from loguru import logger +from ryan_library.functions.loguru_helpers import setup_logger +from ryan_library.processors.tuflow.base_processor import BaseProcessor from ryan_library.scripts.pomm_utils import ( aggregated_from_paths, save_peak_report_mean, save_peak_report_median, ) -from ryan_library.functions.loguru_helpers import setup_logger -from ryan_library.processors.tuflow.base_processor import BaseProcessor def run_peak_report(script_directory: Path | None = None) -> None: - """Run the peak report generation workflow.""" + """Legacy entry point retained for backwards compatibility.""" + print() print("You are using an old wrapper") print() - run_median_peak_report() + export_median_peak_report(script_directory=script_directory) -def _run_peak_report( +def run_peak_report_workflow( + *, script_directory: Path | None = None, log_level: str = "INFO", include_pomm: bool = True, locations_to_include: Collection[str] | None = None, - save_report: Callable[..., None] | None = None, + exporter: Callable[..., None], ) -> None: - """Core implementation for running a peak report workflow.""" - - setup_logger(console_log_level=log_level) - logger.info(f"Current Working Directory: {Path.cwd()}") - if script_directory is None: - script_directory = Path.cwd() + """Coordinate loading peak data and exporting via ``exporter``.""" + script_directory = script_directory or Path.cwd() normalized_locations: frozenset[str] = BaseProcessor.normalize_locations(locations_to_include) + location_filter: frozenset[str] | None = normalized_locations if normalized_locations else None - if locations_to_include and not normalized_locations: - logger.warning("Location filter provided but no valid values found. All locations will be included.") + with setup_logger(console_log_level=log_level): + logger.info(f"Current Working Directory: {Path.cwd()}") - aggregated_df: pd.DataFrame = aggregated_from_paths( - paths=[script_directory], - locations_to_include=normalized_locations if normalized_locations else None, + if locations_to_include and not normalized_locations: + logger.warning("Location filter provided but no valid values found. All locations will be included.") + + aggregated_df: pd.DataFrame = aggregated_from_paths( + paths=[script_directory], + locations_to_include=location_filter, + ) + + if aggregated_df.empty: + if location_filter: + logger.warning("No rows remain after applying the Location filter. Exiting.") + else: + logger.warning("No POMM CSV files found. Exiting.") + return + + timestamp: str = datetime.now().strftime(format="%Y%m%d-%H%M") + exporter( + aggregated_df=aggregated_df, + script_directory=script_directory, + timestamp=timestamp, + include_pomm=include_pomm, + ) + + +def export_median_peak_report( + *, + script_directory: Path | None = None, + log_level: str = "INFO", + include_pomm: bool = True, + locations_to_include: Collection[str] | None = None, +) -> None: + """Locate and process POMM files and export median-based peak values.""" + + run_peak_report_workflow( + script_directory=script_directory, + log_level=log_level, + include_pomm=include_pomm, + locations_to_include=locations_to_include, + exporter=save_peak_report_median, ) - if aggregated_df.empty: - if normalized_locations: - logger.warning("No rows remain after applying the Location filter. Exiting.") - else: - logger.warning("No POMM CSV files found. Exiting.") - return - - timestamp: str = datetime.now().strftime(format="%Y%m%d-%H%M") - if save_report is None: - save_report = save_peak_report_median - save_report( - aggregated_df=aggregated_df, + +def export_mean_peak_report( + *, + script_directory: Path | None = None, + log_level: str = "INFO", + include_pomm: bool = True, + locations_to_include: Collection[str] | None = None, +) -> None: + """Locate and process POMM files and export mean-based peak values.""" + + run_peak_report_workflow( script_directory=script_directory, - timestamp=timestamp, + log_level=log_level, include_pomm=include_pomm, + locations_to_include=locations_to_include, + exporter=save_peak_report_mean, ) @@ -71,14 +109,18 @@ def run_median_peak_report( include_pomm: bool = True, locations_to_include: Collection[str] | None = None, ) -> None: - """Locate and process POMM files and export median-based peak values.""" + """Deprecated wrapper around :func:`export_median_peak_report`.""" - _run_peak_report( + warnings.warn( + "run_median_peak_report is deprecated; use export_median_peak_report instead.", + DeprecationWarning, + stacklevel=2, + ) + export_median_peak_report( script_directory=script_directory, log_level=log_level, include_pomm=include_pomm, locations_to_include=locations_to_include, - save_report=save_peak_report_median, ) @@ -88,12 +130,16 @@ def run_mean_peak_report( include_pomm: bool = True, locations_to_include: Collection[str] | None = None, ) -> None: - """Locate and process POMM files and export mean-based peak values.""" + """Deprecated wrapper around :func:`export_mean_peak_report`.""" - _run_peak_report( + warnings.warn( + "run_mean_peak_report is deprecated; use export_mean_peak_report instead.", + DeprecationWarning, + stacklevel=2, + ) + export_mean_peak_report( script_directory=script_directory, log_level=log_level, include_pomm=include_pomm, locations_to_include=locations_to_include, - save_report=save_peak_report_mean, ) diff --git a/ryan_library/scripts/pomm_utils.py b/ryan_library/scripts/pomm_utils.py index eb41e390..8b31ecfd 100644 --- a/ryan_library/scripts/pomm_utils.py +++ b/ryan_library/scripts/pomm_utils.py @@ -3,26 +3,23 @@ from pathlib import Path from multiprocessing import Pool -from collections.abc import Collection, Iterable, Mapping +from collections.abc import Collection, Mapping, Sequence from datetime import datetime, timezone -from importlib.metadata import PackageNotFoundError, version from typing import Any import pandas as pd from loguru import logger +from pandas import Series -from ryan_library.classes.column_definitions import ColumnMetadataRegistry +from ryan_library.classes.column_definitions import ColumnDefinition, ColumnMetadataRegistry from ryan_library.functions.pandas.median_calc import median_calc - -from ryan_library.functions.file_utils import ( - find_files_parallel, - is_non_zero_file, -) -from ryan_library.functions.misc_functions import ExcelExporter, calculate_pool_size +from ryan_library.functions.misc_functions import ExcelExporter, calculate_pool_size, get_tools_version from ryan_library.processors.tuflow.base_processor import BaseProcessor from ryan_library.processors.tuflow.processor_collection import ProcessorCollection from ryan_library.classes.suffixes_and_dtypes import SuffixesConfig from ryan_library.functions.loguru_helpers import setup_logger, worker_initializer +from ryan_library.functions.tuflow.tuflow_common import collect_files +from ryan_library.classes.tuflow_string_classes import TuflowStringParser NAType = type(pd.NA) @@ -30,52 +27,21 @@ DATA_DICTIONARY_SHEET_NAME: str = "data-dictionary" -def collect_files( - paths_to_process: Iterable[Path], - include_data_types: Iterable[str], - suffixes_config: SuffixesConfig, -) -> list[Path]: - """Collect and filter files based on specified data types. - - Args: - paths_to_process (list[Path]): Directories to search. - include_data_types (list[str] ): List of data types to include. - suffixes_config (SuffixesConfig ): Suffixes configuration instance. - - Returns: - list[Path]: List of valid file paths.""" - - csv_file_list: list[Path] = [] - suffixes: list[str] = [] - - # Determine which suffixes to include based on data types - # Invert suffixes config - data_type_to_suffix: dict[str, list[str]] = suffixes_config.invert_suffix_to_type() +def _ordered_columns( + df: pd.DataFrame, + column_groups: Sequence[Sequence[str]], + info_columns: Sequence[str], +) -> list[str]: + """Return a column order keeping identifiers first and meta-data last.""" - for data_type in include_data_types: - dt_suffixes: list[str] | None = data_type_to_suffix.get(data_type) - if not dt_suffixes: - logger.error(f"No suffixes found for data type '{data_type}'. Skipping.") - continue - suffixes.extend(dt_suffixes) + ordered: list[str] = [] + for group in column_groups: + ordered.extend([column for column in group if column in df.columns]) - if not suffixes: - logger.error("No suffixes found for the specified data types.") - return csv_file_list - - # Prepend '*' for wildcard searching - patterns: list[str] = [f"*{suffix}" for suffix in suffixes] - - root_dirs: list[Path] = [p for p in paths_to_process if p.is_dir()] - invalid_dirs: set[Path] = set(paths_to_process) - set(root_dirs) - for invalid_dir in invalid_dirs: - logger.warning(f"Path {invalid_dir} is not a directory. Skipping.") - - matched_files: list[Path] = find_files_parallel(root_dirs=root_dirs, patterns=patterns) - csv_file_list.extend(matched_files) - - csv_file_list = [f for f in csv_file_list if is_non_zero_file(f)] - return csv_file_list + remaining: list[str] = [column for column in df.columns if column not in ordered and column not in info_columns] + ordered.extend(remaining) + ordered.extend([column for column in info_columns if column in df.columns]) + return ordered def process_file(file_path: Path, location_filter: frozenset[str] | None = None) -> BaseProcessor: @@ -306,7 +272,7 @@ def _build_metadata_rows( "Generated at": generated_at, "Filename timestamp": timestamp if timestamp else "not supplied", "Generator module": __name__, - "ryan_functions version": _resolve_package_version("ryan_functions"), + "ryan_functions version": get_tools_version(package="ryan_functions"), "Include POMM sheet": "Yes" if include_pomm else "No", f"{aep_dur_sheet_name} rows": str(len(aep_dur_max)), f"{aep_sheet_name} rows": str(len(aep_max)), @@ -320,22 +286,13 @@ def _build_metadata_rows( directories_series = aggregated_df["directory_path"].dropna() except AttributeError: directories_series = pd.Series(dtype="string") - unique_directories = sorted({str(Path(dir_value)) for dir_value in directories_series.unique()}) + unique_directories: list[str] = sorted({str(Path(dir_value)) for dir_value in directories_series.unique()}) if unique_directories: metadata["Source directories"] = "\n".join(unique_directories) return metadata -def _resolve_package_version(package_name: str) -> str: - """Return the installed version for ``package_name`` if available.""" - - try: - return version(package_name) - except PackageNotFoundError: - return "unknown" - - def _build_data_dictionary( registry: ColumnMetadataRegistry, sheet_frames: Mapping[str, pd.DataFrame], @@ -370,7 +327,7 @@ def _build_data_dictionary( continue dtype_map: dict[str, str] = {column: str(dtype) for column, dtype in frame.dtypes.items()} - definitions = registry.iter_definitions(columns, sheet_name=sheet_name) + definitions: list[ColumnDefinition] = registry.iter_definitions(columns, sheet_name=sheet_name) for column_name, definition in zip(columns, definitions): rows.append( @@ -384,7 +341,7 @@ def _build_data_dictionary( ) return pd.DataFrame( - rows, + data=rows, columns=["sheet", "column", "description", "value_type", "pandas_dtype"], ) @@ -402,7 +359,6 @@ def save_peak_report( output_filename: str = f"{timestamp}{suffix}" output_path: Path = script_directory / output_filename logger.info(f"Starting export of peak report to {output_path}") - logger.info(f"Starting export of peak report to {output_path}") save_to_excel( aep_dur_max=aep_dur_max, aep_max=aep_max, @@ -412,7 +368,6 @@ def save_peak_report( timestamp=timestamp, ) logger.info(f"Completed peak report export to {output_path}") - logger.info(f"Completed peak report export to {output_path}") def find_aep_dur_median(aggregated_df: pd.DataFrame) -> pd.DataFrame: @@ -447,54 +402,33 @@ def find_aep_dur_median(aggregated_df: pd.DataFrame) -> pd.DataFrame: rows.append(row) median_df = pd.DataFrame(rows) if not median_df.empty: - - def norm_tp(value: str | int | float | None) -> str | NAType: + # Normalise TP / duration text so the "mean storm equals median storm" flag is stable. + def _normalize_tp(value: object) -> str | NAType: if pd.isna(value): return pd.NA - cleaned = str(value).replace("TP", "") - numeric = pd.to_numeric(cleaned, errors="coerce") - return pd.NA if pd.isna(numeric) else f"TP{int(numeric):02d}" - - def norm_duration(value: object) -> float: - if pd.isna(value): - return float("nan") - text = str(value).strip().lower() - suffixes: tuple[str, ...] = ( - "hours", - "hour", - "hrs", - "hr", - "h", - "minutes", - "minute", - "mins", - "min", - "m", - ) - for suffix in suffixes: - if text.endswith(suffix): - text = text[: -len(suffix)] - break - cleaned = text.strip() - numeric = pd.to_numeric(cleaned, errors="coerce") - return float(numeric) if pd.notna(numeric) else float("nan") + normalized = TuflowStringParser.normalize_tp_label(value) + return pd.NA if normalized is None else normalized for column in ("median_TP", "mean_TP"): if column in median_df.columns: - median_df[column] = median_df[column].apply(norm_tp) + median_df[column] = median_df[column].apply(_normalize_tp) - mean_storm_matches = pd.Series(False, index=median_df.index) - required_cols = { + mean_storm_matches: Series[bool] = pd.Series(False, index=median_df.index) + required_cols: set[str] = { "median_duration", "mean_Duration", "median_TP", "mean_TP", } if required_cols.issubset(median_df.columns): - median_duration_norm = median_df["median_duration"].map(norm_duration) - mean_duration_norm = median_df["mean_Duration"].map(norm_duration) - duration_match = median_duration_norm.eq(mean_duration_norm) - tp_match = median_df["median_TP"].eq(median_df["mean_TP"]) + median_duration_norm: Series[float] = median_df["median_duration"].map( + TuflowStringParser.normalize_duration_value + ) + mean_duration_norm: Series[float] = median_df["mean_Duration"].map( + TuflowStringParser.normalize_duration_value + ) + duration_match: Series[bool] = median_duration_norm.eq(mean_duration_norm) + tp_match: Series[bool] = median_df["median_TP"].eq(median_df["mean_TP"]) mean_storm_matches = (duration_match & tp_match).fillna(False) median_df["mean_storm_is_median_storm"] = mean_storm_matches @@ -510,15 +444,11 @@ def norm_duration(value: object) -> float: median_columns: list[str] = ["MedianAbsMax", "median_duration", "median_TP"] info_columns: list[str] = ["low", "high", "count", "count_bin", "mean_storm_is_median_storm"] - ordered_cols: list[str] = [] - for group in (id_columns, mean_columns, median_columns): - ordered_cols.extend([col for col in group if col in median_df.columns]) - - remaining_cols: list[str] = [ - col for col in median_df.columns if col not in ordered_cols and col not in info_columns - ] - ordered_cols.extend(remaining_cols) - ordered_cols.extend([col for col in info_columns if col in median_df.columns]) + ordered_cols: list[str] = _ordered_columns( + df=median_df, + column_groups=(id_columns, mean_columns, median_columns), + info_columns=info_columns, + ) median_df = median_df[ordered_cols] logger.info("Created 'aep_dur_median' DataFrame with median records for each AEP-Duration group.") @@ -551,6 +481,8 @@ def find_aep_median_max(aep_dur_median: pd.DataFrame) -> pd.DataFrame: mean_df: pd.DataFrame = aep_dur_median.copy() mean_df["_mean_peakflow_numeric"] = pd.to_numeric(mean_df.get("mean_PeakFlow"), errors="coerce") # type: ignore if mean_df["_mean_peakflow_numeric"].notna().any(): + # When the mean columns are present we track the rows that best represent + # the maximum mean independently from the median selection above. idx_mean = ( mean_df[mean_df["_mean_peakflow_numeric"].notna()] .groupby(group_cols, observed=True)["_mean_peakflow_numeric"] @@ -582,45 +514,11 @@ def find_aep_median_max(aep_dur_median: pd.DataFrame) -> pd.DataFrame: "aep_bin", ] - ordered_cols: list[str] = [] - for group in (id_columns, mean_columns, median_columns): - ordered_cols.extend([col for col in group if col in aep_med_max.columns]) - - remaining_cols: list[str] = [ - col for col in aep_med_max.columns if col not in ordered_cols and col not in info_columns - ] - ordered_cols.extend(remaining_cols) - ordered_cols.extend([col for col in info_columns if col in aep_med_max.columns]) - - aep_med_max = aep_med_max[ordered_cols] - if not aep_med_max.empty: - id_columns: list[str] = ["aep_text", "duration_text", "Location", "Type", "trim_runcode"] - mean_columns: list[str] = [ - "mean_including_zeroes", - "mean_excluding_zeroes", - "mean_PeakFlow", - "mean_Duration", - "mean_TP", - ] - median_columns: list[str] = ["MedianAbsMax", "median_duration", "median_TP"] - info_columns: list[str] = [ - "low", - "high", - "count", - "count_bin", - "mean_storm_is_median_storm", - "aep_bin", - ] - - ordered_cols: list[str] = [] - for group in (id_columns, mean_columns, median_columns): - ordered_cols.extend([col for col in group if col in aep_med_max.columns]) - - remaining_cols: list[str] = [ - col for col in aep_med_max.columns if col not in ordered_cols and col not in info_columns - ] - ordered_cols.extend(remaining_cols) - ordered_cols.extend([col for col in info_columns if col in aep_med_max.columns]) + ordered_cols: list[str] = _ordered_columns( + df=aep_med_max, + column_groups=(id_columns, mean_columns, median_columns), + info_columns=info_columns, + ) aep_med_max = aep_med_max[ordered_cols] logger.info("Created 'aep_median_max' DataFrame with maximum median records for each AEP group.") @@ -647,15 +545,13 @@ def find_aep_dur_mean(aggregated_df: pd.DataFrame) -> pd.DataFrame: ] info_columns: list[str] = ["low", "high", "count", "count_bin", "mean_storm_is_median_storm"] - ordered_cols: list[str] = [] - for group in (id_columns, mean_columns): - ordered_cols.extend([col for col in group if col in aep_dur_median.columns]) - - remaining_cols: list[str] = [ - col for col in aep_dur_median.columns if col not in ordered_cols and col not in info_columns - ] - ordered_cols.extend(remaining_cols) - ordered_cols.extend([col for col in info_columns if col in aep_dur_median.columns]) + # Keep the mean-focused statistics grouped together; later callers drop any + # residual median columns so the mean workflow can diverge independently. + ordered_cols: list[str] = _ordered_columns( + df=aep_dur_median, + column_groups=(id_columns, mean_columns), + info_columns=info_columns, + ) return aep_dur_median[ordered_cols] @@ -699,15 +595,11 @@ def find_aep_mean_max(aep_dur_mean: pd.DataFrame) -> pd.DataFrame: "mean_bin", ] - ordered_cols: list[str] = [] - for group in (id_columns, mean_columns): - ordered_cols.extend([col for col in group if col in aep_mean_max.columns]) - - remaining_cols: list[str] = [ - col for col in aep_mean_max.columns if col not in ordered_cols and col not in info_columns - ] - ordered_cols.extend(remaining_cols) - ordered_cols.extend([col for col in info_columns if col in aep_mean_max.columns]) + ordered_cols: list[str] = _ordered_columns( + df=aep_mean_max, + column_groups=(id_columns, mean_columns), + info_columns=info_columns, + ) aep_mean_max = aep_mean_max[ordered_cols] diff --git a/setup.py b/setup.py index 5fd09cc6..5db000f2 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ name="ryan_functions", # Version scheme: yy.mm.dd.release_number # Increment when publishing new wheels - version="25.11.07.4", + version="25.11.16.1", packages=find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests"]), include_package_data=True, # Include package data as specified in MANIFEST.in # package_data={"ryan_library": ["py.typed"]}, diff --git a/tests/functions/test_file_utils_small.py b/tests/functions/test_file_utils_small.py new file mode 100644 index 00000000..726606f3 --- /dev/null +++ b/tests/functions/test_file_utils_small.py @@ -0,0 +1,77 @@ +from pathlib import Path + + +from ryan_library.functions.file_utils import ( + ensure_output_directory, + find_files_parallel, + is_non_zero_file, +) + + +def _write_file(path: Path, content: str = "data") -> None: + path.write_text(content, encoding="utf-8") + + +def test_find_files_parallel_non_recursive_respects_excludes(tmp_path: Path) -> None: + root = tmp_path / "root" + sub = root / "nested" + sub.mkdir(parents=True) + keep = root / "keep.txt" + skip = root / "skip.txt" + nested = sub / "nested.txt" + _write_file(keep) + _write_file(skip) + _write_file(nested) + + results = find_files_parallel( + root_dirs=[root], + patterns="*.txt", + excludes=["skip.txt"], + recursive_search=False, + report_level=None, + print_found_folder=False, + ) + + assert results == [keep.resolve()] + + +def test_find_files_parallel_deduplicates_overlapping_roots(tmp_path: Path) -> None: + root = tmp_path / "root" + nested = root / "nested" + nested.mkdir(parents=True) + target = nested / "match.csv" + _write_file(target) + + results = find_files_parallel( + root_dirs=[root, nested], + patterns="*.csv", + recursive_search=True, + report_level=None, + print_found_folder=False, + ) + + assert results == [target.resolve()] + + +def test_is_non_zero_file_variants(tmp_path: Path) -> None: + data_file = tmp_path / "data.txt" + empty_file = tmp_path / "empty.txt" + directory = tmp_path / "dir" + directory.mkdir() + _write_file(data_file) + empty_file.touch() + + assert is_non_zero_file(data_file) + assert is_non_zero_file(str(data_file)) + assert not is_non_zero_file(empty_file) + assert not is_non_zero_file(directory) + assert not is_non_zero_file(tmp_path / "missing.txt") + + +def test_ensure_output_directory_creates_missing(tmp_path: Path) -> None: + out_dir = tmp_path / "exports" + ensure_output_directory(out_dir) + assert out_dir.exists() and out_dir.is_dir() + + # second invocation should be a no-op + ensure_output_directory(out_dir) diff --git a/tests/functions/test_median_calc.py b/tests/functions/test_median_calc.py index 32933692..bea65e64 100644 --- a/tests/functions/test_median_calc.py +++ b/tests/functions/test_median_calc.py @@ -1,10 +1,9 @@ -import pandas as pd import numpy as np - +import pandas as pd from ryan_library.functions.pandas.median_calc import ( - _median_stats_for_group, median_calc, + summarise_duration_statistics, ) @@ -20,7 +19,7 @@ def make_df(values, tps, dur) -> pd.DataFrame: def test_median_stats_for_group_odd() -> None: df: pd.DataFrame = make_df([1, 2, 3], ["A", "B", "C"], ["5", "5", "5"]) - stats: dict = _median_stats_for_group(df, "val", "tp", "dur") + stats: dict = summarise_duration_statistics(df, "val", "tp", "dur") assert stats["median"] == 2 assert stats["low"] == 1 assert stats["high"] == 3 @@ -32,7 +31,7 @@ def test_median_stats_for_group_odd() -> None: def test_median_stats_for_group_even() -> None: df: pd.DataFrame = make_df([4, 1, 3, 2], ["A", "B", "C", "D"], ["1", "1", "1", "1"]) - stats: dict = _median_stats_for_group(df, "val", "tp", "dur") + stats: dict = summarise_duration_statistics(df, "val", "tp", "dur") # sorted values [1,2,3,4], median index 2 -> value 3 assert stats["median"] == 3 assert stats["low"] == 1 @@ -45,7 +44,7 @@ def test_median_stats_for_group_even() -> None: def test_median_stats_for_group_zeros() -> None: df: pd.DataFrame = make_df([0, 0, 0], ["A", "B", "C"], ["d", "d", "d"]) - stats = _median_stats_for_group(df, "val", "tp", "dur") + stats = summarise_duration_statistics(df, "val", "tp", "dur") assert stats["median"] == 0 assert stats["low"] == 0 assert stats["high"] == 0 diff --git a/tests/functions/test_pomm_combine.py b/tests/functions/test_pomm_combine.py new file mode 100644 index 00000000..2bc293a9 --- /dev/null +++ b/tests/functions/test_pomm_combine.py @@ -0,0 +1,85 @@ +from datetime import datetime +from pathlib import Path + +import pandas as pd +from pandas import DataFrame +import pytest + +import ryan_library.functions.misc_functions as misc_functions +from ryan_library.functions.tuflow import pomm_combine as pomm_module + + +class _FixedDateTime(datetime): + @classmethod + def now(cls, tz=None): # type: ignore[override] + return cls(2024, 1, 2, 3, 4) + + +class _DummyResults: + def __init__(self, df: pd.DataFrame, processors: list[object] | None = None) -> None: + self._df: DataFrame = df + self.processors: list[object] = processors if processors is not None else [object()] + + def pomm_combine(self) -> pd.DataFrame: + return self._df + + +@pytest.fixture(autouse=True) +def _fixed_time(monkeypatch: pytest.MonkeyPatch) -> str: + monkeypatch.setattr(misc_functions, "datetime", _FixedDateTime) + return "20240102-0304" + + +@pytest.fixture(autouse=True) +def _stub_parquet(monkeypatch: pytest.MonkeyPatch) -> None: + def _fake_to_parquet(self, path, *args, **kwargs): + Path(path).write_text("parquet", encoding="utf-8") + + monkeypatch.setattr(pd.DataFrame, "to_parquet", _fake_to_parquet, raising=False) + + +@pytest.fixture() +def temp_cwd(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: + monkeypatch.chdir(tmp_path) + return tmp_path + + +def _list_outputs(directory: Path) -> list[Path]: + return sorted(directory.iterdir()) + + +def test_export_results_excel_only(temp_cwd: Path) -> None: + df = pd.DataFrame({"A": [1]}) + pomm_module.export_results(results=_DummyResults(df), export_mode="excel") + + outputs: list[Path] = _list_outputs(temp_cwd) + assert len(outputs) == 1 + assert outputs[0].suffix == ".xlsx" + assert outputs[0].name == "20240102-0304_combined_POMM.xlsx" + + +def test_export_results_parquet_only(temp_cwd: Path) -> None: + df = pd.DataFrame({"A": [1]}) + pomm_module.export_results(results=_DummyResults(df), export_mode="parquet") + + outputs: list[Path] = _list_outputs(temp_cwd) + assert len(outputs) == 1 + assert outputs[0].suffixes == [".parquet", ".gzip"] + assert outputs[0].name == "20240102-0304_combined_POMM_combined_POMM.parquet.gzip" + + +def test_export_results_both_formats(temp_cwd: Path) -> None: + df = pd.DataFrame({"B": [1]}) + pomm_module.export_results(results=_DummyResults(df), export_mode="both") + + outputs = _list_outputs(temp_cwd) + assert len(outputs) == 2 + assert {p.suffix for p in outputs} == {".xlsx", ".gzip"} + + +def test_export_results_no_processors_produces_no_files(temp_cwd: Path) -> None: + df = pd.DataFrame({"B": [1]}) + results = _DummyResults(df, processors=[]) + pomm_module.export_results(results=results, export_mode="excel") + + assert list(temp_cwd.iterdir()) == [] diff --git a/tests/sample_scenarios.json b/tests/sample_scenarios.json new file mode 100644 index 00000000..0356e86f --- /dev/null +++ b/tests/sample_scenarios.json @@ -0,0 +1,95 @@ +[ + { + "index": 0, + "internalName": "Sawmill Creek South", + "Chan ID": "C-001", + "Q": 0.01, + "V": "", + "US_h": 95.5, + "DS_h": 94.8, + "Length": 34.5, + "n or Cd": 0.024, + "US Invert": 94.0, + "DS Invert": 93.5, + "Height": 0.9, + "number_interp": 1, + "roadway_crest": 105.0 + }, + { + "index": 1, + "internalName": "Sawmill Creek Spur", + "Chan ID": "C-001A", + "Q": 0.02, + "V": "", + "US_h": 95.5, + "DS_h": 94.6, + "Length": 32.0, + "n or Cd": 0.024, + "US Invert": 94.1, + "DS Invert": 93.7, + "Height": 0.9, + "number_interp": 3, + "roadway_crest": 105.0 + }, + { + "index": 2, + "internalName": "Dry Gulch Crossing", + "Chan ID": "C-002", + "Q": 0.03, + "V": "", + "US_h": 96.4, + "DS_h": 95.9, + "Length": 28.0, + "n or Cd": 0.022, + "US Invert": 95.2, + "DS Invert": 95.0, + "Height": 1.0, + "number_interp": 1 + }, + { + "index": 3, + "internalName": "Dry Gulch Spur", + "Chan ID": "C-002A", + "Q": 0.01, + "V": "", + "US_h": 96.4, + "DS_h": 95.8, + "Length": 30.0, + "n or Cd": 0.025, + "US Invert": 95.4, + "DS Invert": 95.1, + "Height": 0.8, + "number_interp": 1 + }, + { + "index": 4, + "internalName": "Irrigation Canal", + "Chan ID": "C-003", + "Q": 0.01, + "V": "", + "US_h": 96.9, + "DS_h": 96.1, + "Length": 36.0, + "n or Cd": 0.024, + "US Invert": 95.7, + "DS Invert": 95.6, + "Height": 1.1, + "number_interp": 2 + }, + { + "index": 5, + "internalName": "Coastal Relief", + "Chan ID": "C-004", + "Q": 0.05, + "V": "", + "US_h": 14.8, + "DS_h": 14.2, + "Length": 20.0, + "n or Cd": 0.015, + "US Invert": 13.5, + "DS Invert": 13.1, + "Height": 1.5, + "number_interp": 2, + "roadway_crest": 22.0 + } +] diff --git a/tests/test_data/tuflow/tutorials/TUFLOW_Culvert_Merge.py b/tests/test_data/tuflow/tutorials/TUFLOW_Culvert_Merge.py deleted file mode 100644 index 6e621b3f..00000000 --- a/tests/test_data/tuflow/tutorials/TUFLOW_Culvert_Merge.py +++ /dev/null @@ -1,33 +0,0 @@ -# TUFLOW_Culvert_Merge.py - -from pathlib import Path -import os -from ryan_library.scripts.tuflow_culverts_merge import main_processing - - -def main(): - """ - Wrapper script to merge culvert results - By default, it processes files in the directory where the script is located. - """ - - try: - # Determine the script directory - script_directory: Path = Path(__file__).absolute().parent - script_directory = Path( - # r"Q:\Library\Automation\ryan-tools\tests\test_data\tuflow\tutorials" - r"E:\Library\Automation\ryan-tools\tests\test_data\tuflow\tutorials" - ) - os.chdir(script_directory) - - # You can pass a list of paths here if needed; default is the script directory - main_processing([script_directory], console_log_level="DEBUG") - except Exception as e: - print(f"Failed to change working directory: {e}") - os.system("PAUSE") - exit(1) - - -if __name__ == "__main__": - main() - os.system("PAUSE") diff --git a/vendor/__init__.py b/vendor/__init__.py index 211e1901..7b2a5ab3 100644 --- a/vendor/__init__.py +++ b/vendor/__init__.py @@ -1 +1,14 @@ -from .PyHMA import * +from __future__ import annotations + +import sys +from pathlib import Path + +# Re-export PyHMA to maintain existing imports. +from .PyHMA import * # noqa: F401,F403 + +# Make the run-hy8 submodule importable without installing it separately. +RUN_HY8_SRC: Path = Path(__file__).with_name("run_hy8") / "src" +if RUN_HY8_SRC.is_dir(): + run_hy8_src = str(RUN_HY8_SRC.resolve()) + if run_hy8_src not in sys.path: + sys.path.append(run_hy8_src) diff --git a/vendor/run_hy8 b/vendor/run_hy8 new file mode 160000 index 00000000..6b84b9ea --- /dev/null +++ b/vendor/run_hy8 @@ -0,0 +1 @@ +Subproject commit 6b84b9eabcf2960798f0b2ab6d7fbe670b1cfc85 diff --git a/vendor/run_hy8.UPSTREAM b/vendor/run_hy8.UPSTREAM new file mode 100644 index 00000000..2c568e12 --- /dev/null +++ b/vendor/run_hy8.UPSTREAM @@ -0,0 +1,8 @@ +Repository: https://github.com/Chain-Frost/run-hy8.git +Commit: 6b84b9eabcf2960798f0b2ab6d7fbe670b1cfc85 (detached) +Retrieved: 2025-11-16 + +Update instructions: +1. From repository root run `git submodule update --remote vendor/run_hy8`. +2. Verify the new commit hash matches expectations and update this file. +3. Commit the submodule pointer change alongside any dependent code changes.