From 937dcc67e46fd21f40b81bb104e4dd6cf54eea7a Mon Sep 17 00:00:00 2001
From: Chain-Frost
Date: Thu, 18 Sep 2025 09:12:32 +0800
Subject: [PATCH 1/2] Add H timeseries processor and coverage

---
 ...flow_results_validation_and_datatypes.json |   9 +-
 ryan_library/processors/tuflow/HProcessor.py  |  74 +++++--
 .../processors/tuflow/processor_collection.py |  13 ++
 .../processors/tuflow/timeseries_processor.py |  15 +-
 tests/processors/__init__.py                  |   0
 tests/processors/tuflow/__init__.py           |   0
 .../tuflow/test_h_and_nmx_processors.py       | 183 ++++++++++++++++++
 7 files changed, 266 insertions(+), 28 deletions(-)
 create mode 100644 tests/processors/__init__.py
 create mode 100644 tests/processors/tuflow/__init__.py
 create mode 100644 tests/processors/tuflow/test_h_and_nmx_processors.py

diff --git a/ryan_library/classes/tuflow_results_validation_and_datatypes.json b/ryan_library/classes/tuflow_results_validation_and_datatypes.json
index f40d1397..2103fc8d 100644
--- a/ryan_library/classes/tuflow_results_validation_and_datatypes.json
+++ b/ryan_library/classes/tuflow_results_validation_and_datatypes.json
@@ -197,13 +197,16 @@
         "output_columns": {
             "Time": "float",
             "Chan ID": "string",
-            "US_H": "float",
-            "DS_H": "float"
+            "H_US": "float",
+            "H_DS": "float"
         },
         "processingParts": {
             "dataformat": "Timeseries",
             "expected_in_header": [
-                "H"
+                "Time",
+                "Chan ID",
+                "H_US",
+                "H_DS"
             ]
         }
     },
diff --git a/ryan_library/processors/tuflow/HProcessor.py b/ryan_library/processors/tuflow/HProcessor.py
index 49d95f88..94d5c576 100644
--- a/ryan_library/processors/tuflow/HProcessor.py
+++ b/ryan_library/processors/tuflow/HProcessor.py
@@ -1,24 +1,68 @@
-"""Placeholder processor for TUFLOW H timeseries data.
-
-This stub will be implemented once the Q-processor path is stable so the shared
-infrastructure can be reused confidently.
-"""
+"""Processor for TUFLOW ``_H`` timeseries outputs."""
 
 from __future__ import annotations
 
 import pandas as pd
+from loguru import logger
+
+from .base_processor import ProcessorStatus
+from .timeseries_processor import TimeSeriesProcessor
+
+
+class HProcessor(TimeSeriesProcessor):
+    """Handle water level (``H``) timeseries files with upstream/downstream values."""
+
+    def process(self) -> pd.DataFrame:  # type: ignore[override]
+        """Process a ``_H`` CSV using the shared timeseries pipeline."""
+
+        return self._process_timeseries_pipeline(data_type="H")
+
+    def process_timeseries_raw_dataframe(self) -> ProcessorStatus:
+        """Normalise the dual-value timeseries DataFrame produced by the shared pipeline."""
+
+        try:
+            logger.debug(f"{self.file_name}: Normalising reshaped 'H' DataFrame.")
+
+            required_columns: set[str] = {"Time", "H_US", "H_DS"}
+            missing_columns: set[str] = required_columns - set(self.df.columns)
+            if missing_columns:
+                logger.error(f"{self.file_name}: Missing required columns after melt: {sorted(missing_columns)}.")
+                return ProcessorStatus.FAILURE
+
+            identifier_columns: list[str] = [col for col in self.df.columns if col not in required_columns]
+            if not identifier_columns:
+                logger.error(f"{self.file_name}: No identifier column found alongside 'H_US'/'H_DS'.")
+                return ProcessorStatus.FAILURE
+            if len(identifier_columns) > 1:
+                identifier_error: str = (
+                    f"{self.file_name}: Expected a single identifier column alongside 'Time', "
+                    f"'H_US' and 'H_DS', got {identifier_columns}."
+                )
+                logger.error(identifier_error)
+                return ProcessorStatus.FAILURE
+
+            identifier_column: str = identifier_columns[0]
+            logger.debug(f"{self.file_name}: Using '{identifier_column}' as the identifier column for 'H' values.")
 
-from .base_processor import BaseProcessor
+            initial_row_count: int = len(self.df)
+            self.df.dropna(subset=["H_US", "H_DS"], how="all", inplace=True)
+            dropped_rows: int = initial_row_count - len(self.df)
+            if dropped_rows:
+                logger.debug(f"{self.file_name}: Dropped {dropped_rows} rows with missing 'H' values.")
+            if self.df.empty:
+                logger.error(f"{self.file_name}: DataFrame is empty after removing rows with missing 'H' values.")
+                return ProcessorStatus.EMPTY_DATAFRAME
 
-class HProcessor(BaseProcessor):
-    """Stub processor for `_1d_H.csv` files until the Q-processor path is stable."""
+            expected_order: list[str] = ["Time", identifier_column, "H_US", "H_DS"]
+            self.df = self.df[expected_order]
 
-    def process(self) -> pd.DataFrame:
-        """Raise ``NotImplementedError`` until H processing is available.
+            if not self.check_headers_match(test_headers=self.df.columns.tolist()):
+                logger.error(f"{self.file_name}: Header mismatch after normalising 'H' DataFrame.")
+                return ProcessorStatus.HEADER_MISMATCH
 
-        TODO: Implement HProcessor once the Q-processor path is stable.
-        """
-        raise NotImplementedError(
-            "HProcessor is not implemented. It will be completed once the Q-processor path is stable."
-        )
+            logger.info(f"{self.file_name}: Successfully normalised 'H' DataFrame for downstream processing.")
+            return ProcessorStatus.SUCCESS
+        except Exception as exc:  # pragma: no cover - defensive logging
+            logger.exception(f"{self.file_name}: Unexpected error while normalising 'H' DataFrame: {exc}")
+            return ProcessorStatus.FAILURE
diff --git a/ryan_library/processors/tuflow/processor_collection.py b/ryan_library/processors/tuflow/processor_collection.py
index b9f32e8a..e5268192 100644
--- a/ryan_library/processors/tuflow/processor_collection.py
+++ b/ryan_library/processors/tuflow/processor_collection.py
@@ -76,6 +76,19 @@ def combine_1d_timeseries(self) -> pd.DataFrame:
             combined_df = reorder_long_columns(df=combined_df)
 
         grouped_df: DataFrame = combined_df.groupby(group_keys).agg("max").reset_index()
+
+        value_columns: list[str] = [col for col in grouped_df.columns if col not in group_keys]
+        if value_columns:
+            before_drop: int = len(grouped_df)
+            grouped_df.dropna(subset=value_columns, how="all", inplace=True)
+            after_drop: int = len(grouped_df)
+            if after_drop != before_drop:
+                logger.debug(
+                    "Dropped %d all-null rows while combining Timeseries outputs.",
+                    before_drop - after_drop,
+                )
+            grouped_df.reset_index(drop=True, inplace=True)
+
         logger.debug(f"Grouped {len(timeseries_processors)} Timeseries DataFrame with {len(grouped_df)} rows.")
         return grouped_df
 
diff --git a/ryan_library/processors/tuflow/timeseries_processor.py b/ryan_library/processors/tuflow/timeseries_processor.py
index e59955f3..b1590454 100644
--- a/ryan_library/processors/tuflow/timeseries_processor.py
+++ b/ryan_library/processors/tuflow/timeseries_processor.py
@@ -46,9 +46,7 @@ def _process_timeseries_pipeline(self, data_type: str) -> pd.DataFrame:
 
         status = self.process_timeseries_raw_dataframe()
         if status is not ProcessorStatus.SUCCESS:
-            logger.error(
-                f"Processing aborted for file: {self.file_path} during {data_type} post-processing step."
-            )
+            logger.error(f"Processing aborted for file: {self.file_path} during {data_type} post-processing step.")
             self.df = pd.DataFrame()
             return self.df
 
@@ -270,13 +268,12 @@ def _apply_final_transformations(self, data_type: str) -> None:
         Args:
             data_type: Identifier of the main numeric value column in ``self.df``.
         """
-        col_types: dict[str, str] = {
-            "Time": "float64",
-            data_type: "float64",
-        }
+        col_types: dict[str, str] = {"Time": "float64"}
 
         if data_type == "H":
             col_types.update({"H_US": "float64", "H_DS": "float64"})
+        else:
+            col_types[data_type] = "float64"
 
         self.apply_dtype_mapping(dtype_mapping=col_types, context="final_transformations")
 
@@ -298,9 +295,7 @@ def _normalise_value_dataframe(self, value_column: str) -> ProcessorStatus:
         required_columns: set[str] = {"Time", value_column}
         missing_columns: set[str] = required_columns - set(self.df.columns)
         if missing_columns:
-            logger.error(
-                f"{self.file_name}: Missing required columns after melt: {sorted(missing_columns)}."
-            )
+            logger.error(f"{self.file_name}: Missing required columns after melt: {sorted(missing_columns)}.")
             return ProcessorStatus.FAILURE
 
         identifier_columns: list[str] = [col for col in self.df.columns if col not in required_columns]
diff --git a/tests/processors/__init__.py b/tests/processors/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/processors/tuflow/__init__.py b/tests/processors/tuflow/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/processors/tuflow/test_h_and_nmx_processors.py b/tests/processors/tuflow/test_h_and_nmx_processors.py
new file mode 100644
index 00000000..c7bf5341
--- /dev/null
+++ b/tests/processors/tuflow/test_h_and_nmx_processors.py
@@ -0,0 +1,183 @@
+"""Regression coverage for TUFLOW timeseries and maximum processors."""
+
+from __future__ import annotations
+
+import os
+from contextlib import contextmanager
+from pathlib import Path
+
+import numpy as np
+import pandas as pd
+import pandas.testing as pdt
+
+from ryan_library.processors.tuflow import NmxProcessor
+from ryan_library.processors.tuflow.HProcessor import HProcessor
+from ryan_library.processors.tuflow.processor_collection import ProcessorCollection
+
+
+@contextmanager
+def change_working_directory(path: Path) -> None:
+    """Temporarily change the working directory for processors relying on ``Path.cwd()``."""
+
+    original_cwd = Path.cwd()
+    os.chdir(path)
+    try:
+        yield
+    finally:
+        os.chdir(original_cwd)
+
+
+def _write_h_csv(directory: Path, file_name: str) -> Path:
+    """Create a minimal `_1d_H.csv` file for testing."""
+
+    data = pd.DataFrame(
+        {
+            "Descriptor": ["row0", "row1", "row2"],
+            "Time": [0.0, 1.0, 2.0],
+            "H AAA_US": [1.0, 1.5, None],
+            "H AAA_DS": [2.0, None, None],
+            "H BBB_US": [3.0, 3.5, 4.5],
+            "H BBB_DS": [4.0, None, 4.8],
+        }
+    )
+    path = directory / file_name
+    data.to_csv(path, index=False)
+    return path
+
+
+def _write_nmx_csv(directory: Path, file_name: str) -> Path:
+    """Create a minimal `_1d_Nmx.csv` file for testing upstream/downstream pivots."""
+
+    data = pd.DataFrame(
+        {
+            "Node ID": ["CULVERT.1", "CULVERT.2", "CULVERT2.1"],
+            "Hmax": [1.2, 1.4, 2.0],
+            "Time Hmax": [10.0, 10.0, 12.0],
+        }
+    )
+    path = directory / file_name
+    data.to_csv(path, index=False)
+    return path
+
+
+def test_h_processor_normalises_dual_timeseries(tmp_path: Path) -> None:
+    """HProcessor should reshape upstream/downstream heads into tidy columns."""
+
+    with change_working_directory(tmp_path):
+        csv_path = _write_h_csv(Path.cwd(), "TestRun_1d_H.csv")
+        processor = HProcessor(file_path=csv_path)
+        df = processor.process()
+
+    assert not df.empty
+
+    observed = df[["Time", "Chan ID", "H_US", "H_DS"]].sort_values(["Chan ID", "Time"]).reset_index(drop=True)
+
+    expected = pd.DataFrame(
+        [
+            (0.0, "AAA", 1.0, 2.0),
+            (1.0, "AAA", 1.5, np.nan),
+            (0.0, "BBB", 3.0, 4.0),
+            (1.0, "BBB", 3.5, np.nan),
+            (2.0, "BBB", 4.5, 4.8),
+        ],
+        columns=["Time", "Chan ID", "H_US", "H_DS"],
+    )
+
+    expected = expected.sort_values(["Chan ID", "Time"]).reset_index(drop=True)
+
+    observed = observed.astype({"Chan ID": "string"})
+    expected = expected.astype({"Chan ID": "string"})
+
+    pdt.assert_frame_equal(observed, expected, check_dtype=False)
+
+    # Ensure rows with no upstream/downstream data were discarded (AAA at Time 2.0).
+    assert (observed["Chan ID"] == "AAA").sum() == 2
+
+
+def test_collection_combines_h_timeseries(tmp_path: Path) -> None:
+    """ProcessorCollection should preserve both H columns when aggregating timeseries."""
+
+    with change_working_directory(tmp_path):
+        csv_path = _write_h_csv(Path.cwd(), "Combo_1d_H.csv")
+        processor = HProcessor(file_path=csv_path)
+        processor.process()
+
+    collection = ProcessorCollection()
+    collection.add_processor(processor)
+
+    combined = (
+        collection.combine_1d_timeseries()[["internalName", "Chan ID", "Time", "H_US", "H_DS"]]
+        .sort_values(["Chan ID", "Time"])
+        .reset_index(drop=True)
+    )
+
+    expected = pd.DataFrame(
+        [
+            ("Combo", "AAA", 0.0, 1.0, 2.0),
+            ("Combo", "AAA", 1.0, 1.5, np.nan),
+            ("Combo", "BBB", 0.0, 3.0, 4.0),
+            ("Combo", "BBB", 1.0, 3.5, np.nan),
+            ("Combo", "BBB", 2.0, 4.5, 4.8),
+        ],
+        columns=["internalName", "Chan ID", "Time", "H_US", "H_DS"],
+    )
+
+    expected = expected.sort_values(["Chan ID", "Time"]).reset_index(drop=True)
+
+    combined = combined.astype({"internalName": "string", "Chan ID": "string"})
+    expected = expected.astype({"internalName": "string", "Chan ID": "string"})
+
+    pdt.assert_frame_equal(combined, expected, check_dtype=False)
+
+
+def test_nmx_processor_pivots_upstream_downstream_nodes(tmp_path: Path) -> None:
+    """NmxProcessor should continue pivoting `.1`/`.2` suffixes into US/DS columns."""
+
+    with change_working_directory(tmp_path):
+        csv_path = _write_nmx_csv(Path.cwd(), "NmxRun_1d_Nmx.csv")
+        processor = NmxProcessor(file_path=csv_path)
+        df = processor.process()
+
+    observed = df[["Chan ID", "Time", "US_h", "DS_h"]].sort_values("Chan ID").reset_index(drop=True)
+    observed = observed.rename_axis(columns=None)
+
+    expected = pd.DataFrame(
+        [
+            ("CULVERT", 10.0, 1.2, 1.4),
+            ("CULVERT2", 12.0, 2.0, np.nan),
+        ],
+        columns=["Chan ID", "Time", "US_h", "DS_h"],
+    )
+    expected = expected.sort_values("Chan ID").reset_index(drop=True)
+    expected = expected.rename_axis(columns=None)
+
+    observed = observed.astype({"Chan ID": "string"})
+    expected = expected.astype({"Chan ID": "string"})
+
+    pdt.assert_frame_equal(observed, expected, check_dtype=False)
+
+    collection = ProcessorCollection()
+    collection.add_processor(processor)
+
+    combined = (
+        collection.combine_1d_maximums()[["internalName", "Chan ID", "US_h", "DS_h"]]
+        .sort_values("Chan ID")
+        .reset_index(drop=True)
+    )
+    combined = combined.rename_axis(columns=None)
+
+    expected_combined = pd.DataFrame(
+        [
+            ("NmxRun", "CULVERT", 1.2, 1.4),
+            ("NmxRun", "CULVERT2", 2.0, np.nan),
+        ],
+        columns=["internalName", "Chan ID", "US_h", "DS_h"],
+    )
+
+    expected_combined = expected_combined.sort_values("Chan ID").reset_index(drop=True)
+    expected_combined = expected_combined.rename_axis(columns=None)
+
+    combined = combined.astype({"internalName": "string", "Chan ID": "string"})
+    expected_combined = expected_combined.astype({"internalName": "string", "Chan ID": "string"})
+
+    pdt.assert_frame_equal(combined, expected_combined, check_dtype=False)

From 7bb376113888f9f15d035da955060cbba0a65acb Mon Sep 17 00:00:00 2001
From: Chain Frost
Date: Mon, 17 Nov 2025 00:00:37 +0800
Subject: [PATCH 2/2] tweaks

---
 .../classes/tuflow_results_validation_and_datatypes.json | 9 +++------
 ryan_library/processors/tuflow/HProcessor.py             | 2 ++
 ryan_library/processors/tuflow/timeseries_processor.py   | 5 ++++-
 vendor/run_hy8                                           | 1 +
 4 files changed, 10 insertions(+), 7 deletions(-)
 create mode 160000 vendor/run_hy8

diff --git a/ryan_library/classes/tuflow_results_validation_and_datatypes.json b/ryan_library/classes/tuflow_results_validation_and_datatypes.json
index 2103fc8d..f40d1397 100644
--- a/ryan_library/classes/tuflow_results_validation_and_datatypes.json
+++ b/ryan_library/classes/tuflow_results_validation_and_datatypes.json
@@ -197,16 +197,13 @@
         "output_columns": {
             "Time": "float",
             "Chan ID": "string",
-            "H_US": "float",
-            "H_DS": "float"
+            "US_H": "float",
+            "DS_H": "float"
         },
         "processingParts": {
             "dataformat": "Timeseries",
             "expected_in_header": [
-                "Time",
-                "Chan ID",
-                "H_US",
-                "H_DS"
+                "H"
             ]
         }
     },
diff --git a/ryan_library/processors/tuflow/HProcessor.py b/ryan_library/processors/tuflow/HProcessor.py
index 94d5c576..b977e44e 100644
--- a/ryan_library/processors/tuflow/HProcessor.py
+++ b/ryan_library/processors/tuflow/HProcessor.py
@@ -1,5 +1,7 @@
 """Processor for TUFLOW ``_H`` timeseries outputs."""
 
+"""not tested, don't know if it works even slightly"""
+
 from __future__ import annotations
 
 import pandas as pd
diff --git a/ryan_library/processors/tuflow/timeseries_processor.py b/ryan_library/processors/tuflow/timeseries_processor.py
index b1590454..141022af 100644
--- a/ryan_library/processors/tuflow/timeseries_processor.py
+++ b/ryan_library/processors/tuflow/timeseries_processor.py
@@ -268,7 +268,10 @@ def _apply_final_transformations(self, data_type: str) -> None:
         Args:
             data_type: Identifier of the main numeric value column in ``self.df``.
         """
-        col_types: dict[str, str] = {"Time": "float64"}
+        col_types: dict[str, str] = {
+            "Time": "float64",
+            data_type: "float64",
+        }
 
         if data_type == "H":
             col_types.update({"H_US": "float64", "H_DS": "float64"})
diff --git a/vendor/run_hy8 b/vendor/run_hy8
new file mode 160000
index 00000000..0567efb2
--- /dev/null
+++ b/vendor/run_hy8
@@ -0,0 +1 @@
+Subproject commit 0567efb2cbde9e71269071fd2910369a1e0f1dd5