diff --git a/ryan_library/processors/tuflow/HProcessor.py b/ryan_library/processors/tuflow/HProcessor.py index 49d95f88..b977e44e 100644 --- a/ryan_library/processors/tuflow/HProcessor.py +++ b/ryan_library/processors/tuflow/HProcessor.py @@ -1,24 +1,70 @@ -"""Placeholder processor for TUFLOW H timeseries data. +"""Processor for TUFLOW ``_H`` timeseries outputs.""" -This stub will be implemented once the Q-processor path is stable so the shared -infrastructure can be reused confidently. -""" +# NOTE(review): not yet validated against real TUFLOW output; confirm behaviour before relying on it. from __future__ import annotations import pandas as pd +from loguru import logger -from .base_processor import BaseProcessor +from .base_processor import ProcessorStatus +from .timeseries_processor import TimeSeriesProcessor -class HProcessor(BaseProcessor): - """Stub processor for `_1d_H.csv` files until the Q-processor path is stable.""" +class HProcessor(TimeSeriesProcessor): + """Handle water level (``H``) timeseries files with upstream/downstream values.""" - def process(self) -> pd.DataFrame: + def process(self) -> pd.DataFrame: # type: ignore[override] - """Raise ``NotImplementedError`` until H processing is available. + """Process a ``_H`` CSV using the shared timeseries pipeline.""" - TODO: Implement HProcessor once the Q-processor path is stable. - """ - raise NotImplementedError( - "HProcessor is not implemented. It will be completed once the Q-processor path is stable." 
- ) + return self._process_timeseries_pipeline(data_type="H") + + def process_timeseries_raw_dataframe(self) -> ProcessorStatus: + """Normalise the dual-value timeseries DataFrame produced by the shared pipeline.""" + + try: + logger.debug(f"{self.file_name}: Normalising reshaped 'H' DataFrame.") + + required_columns: set[str] = {"Time", "H_US", "H_DS"} + missing_columns: set[str] = required_columns - set(self.df.columns) + if missing_columns: + logger.error(f"{self.file_name}: Missing required columns after melt: {sorted(missing_columns)}.") + return ProcessorStatus.FAILURE + + identifier_columns: list[str] = [col for col in self.df.columns if col not in required_columns] + if not identifier_columns: + logger.error(f"{self.file_name}: No identifier column found alongside 'H_US'/'H_DS'.") + return ProcessorStatus.FAILURE + if len(identifier_columns) > 1: + identifier_error: str = ( + f"{self.file_name}: Expected a single identifier column alongside 'Time', " + f"'H_US' and 'H_DS', got {identifier_columns}." 
+ ) + logger.error(identifier_error) + return ProcessorStatus.FAILURE + + identifier_column: str = identifier_columns[0] + logger.debug(f"{self.file_name}: Using '{identifier_column}' as the identifier column for 'H' values.") + + initial_row_count: int = len(self.df) + self.df.dropna(subset=["H_US", "H_DS"], how="all", inplace=True) + dropped_rows: int = initial_row_count - len(self.df) + if dropped_rows: + logger.debug(f"{self.file_name}: Dropped {dropped_rows} rows with missing 'H' values.") + + if self.df.empty: + logger.error(f"{self.file_name}: DataFrame is empty after removing rows with missing 'H' values.") + return ProcessorStatus.EMPTY_DATAFRAME + + expected_order: list[str] = ["Time", identifier_column, "H_US", "H_DS"] + self.df = self.df[expected_order] + + if not self.check_headers_match(test_headers=self.df.columns.tolist()): + logger.error(f"{self.file_name}: Header mismatch after normalising 'H' DataFrame.") + return ProcessorStatus.HEADER_MISMATCH + + logger.info(f"{self.file_name}: Successfully normalised 'H' DataFrame for downstream processing.") + return ProcessorStatus.SUCCESS + except Exception as exc: # pragma: no cover - defensive logging + logger.exception(f"{self.file_name}: Unexpected error while normalising 'H' DataFrame: {exc}") + return ProcessorStatus.FAILURE diff --git a/ryan_library/processors/tuflow/processor_collection.py b/ryan_library/processors/tuflow/processor_collection.py index e6ccb1d1..87e4db64 100644 --- a/ryan_library/processors/tuflow/processor_collection.py +++ b/ryan_library/processors/tuflow/processor_collection.py @@ -126,6 +126,19 @@ def combine_1d_timeseries(self) -> pd.DataFrame: combined_df = reorder_long_columns(df=combined_df) grouped_df: DataFrame = combined_df.groupby(group_keys).agg("max").reset_index() # type: ignore + + value_columns: list[str] = [col for col in grouped_df.columns if col not in group_keys] + if value_columns: + before_drop: int = len(grouped_df) + grouped_df.dropna(subset=value_columns, 
how="all", inplace=True) + after_drop: int = len(grouped_df) + if after_drop != before_drop: + logger.debug( + f"Dropped {before_drop - after_drop} all-null rows while combining Timeseries outputs." + ) + grouped_df.reset_index(drop=True, inplace=True) + logger.debug(f"Grouped {len(timeseries_processors)} Timeseries DataFrame with {len(grouped_df)} rows.") return grouped_df diff --git a/ryan_library/processors/tuflow/timeseries_processor.py b/ryan_library/processors/tuflow/timeseries_processor.py index f2f41b47..215afc5b 100644 --- a/ryan_library/processors/tuflow/timeseries_processor.py +++ b/ryan_library/processors/tuflow/timeseries_processor.py @@ -273,6 +273,8 @@ def _apply_final_transformations(self, data_type: str) -> None: if data_type == "H": col_types.update({"H_US": "float64", "H_DS": "float64"}) + else: + col_types[data_type] = "float64" self.apply_dtype_mapping(dtype_mapping=col_types, context="final_transformations") diff --git a/tests/processors/__init__.py b/tests/processors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/processors/tuflow/__init__.py b/tests/processors/tuflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/processors/tuflow/test_h_and_nmx_processors.py b/tests/processors/tuflow/test_h_and_nmx_processors.py new file mode 100644 index 00000000..c7bf5341 --- /dev/null +++ b/tests/processors/tuflow/test_h_and_nmx_processors.py @@ -0,0 +1,183 @@ +"""Regression coverage for TUFLOW timeseries and maximum processors.""" + +from __future__ import annotations + +import os +from contextlib import contextmanager +from pathlib import Path + +import numpy as np +import pandas as pd +import pandas.testing as pdt + +from ryan_library.processors.tuflow import NmxProcessor +from ryan_library.processors.tuflow.HProcessor import HProcessor +from ryan_library.processors.tuflow.processor_collection import ProcessorCollection + + +@contextmanager +def change_working_directory(path: Path) -> 
None: + """Temporarily change the working directory for processors relying on ``Path.cwd()``.""" + + original_cwd = Path.cwd() + os.chdir(path) + try: + yield + finally: + os.chdir(original_cwd) + + +def _write_h_csv(directory: Path, file_name: str) -> Path: + """Create a minimal `_1d_H.csv` file for testing.""" + + data = pd.DataFrame( + { + "Descriptor": ["row0", "row1", "row2"], + "Time": [0.0, 1.0, 2.0], + "H AAA_US": [1.0, 1.5, None], + "H AAA_DS": [2.0, None, None], + "H BBB_US": [3.0, 3.5, 4.5], + "H BBB_DS": [4.0, None, 4.8], + } + ) + path = directory / file_name + data.to_csv(path, index=False) + return path + + +def _write_nmx_csv(directory: Path, file_name: str) -> Path: + """Create a minimal `_1d_Nmx.csv` file for testing upstream/downstream pivots.""" + + data = pd.DataFrame( + { + "Node ID": ["CULVERT.1", "CULVERT.2", "CULVERT2.1"], + "Hmax": [1.2, 1.4, 2.0], + "Time Hmax": [10.0, 10.0, 12.0], + } + ) + path = directory / file_name + data.to_csv(path, index=False) + return path + + +def test_h_processor_normalises_dual_timeseries(tmp_path: Path) -> None: + """HProcessor should reshape upstream/downstream heads into tidy columns.""" + + with change_working_directory(tmp_path): + csv_path = _write_h_csv(Path.cwd(), "TestRun_1d_H.csv") + processor = HProcessor(file_path=csv_path) + df = processor.process() + + assert not df.empty + + observed = df[["Time", "Chan ID", "H_US", "H_DS"]].sort_values(["Chan ID", "Time"]).reset_index(drop=True) + + expected = pd.DataFrame( + [ + (0.0, "AAA", 1.0, 2.0), + (1.0, "AAA", 1.5, np.nan), + (0.0, "BBB", 3.0, 4.0), + (1.0, "BBB", 3.5, np.nan), + (2.0, "BBB", 4.5, 4.8), + ], + columns=["Time", "Chan ID", "H_US", "H_DS"], + ) + + expected = expected.sort_values(["Chan ID", "Time"]).reset_index(drop=True) + + observed = observed.astype({"Chan ID": "string"}) + expected = expected.astype({"Chan ID": "string"}) + + pdt.assert_frame_equal(observed, expected, check_dtype=False) + + # Ensure rows with no upstream/downstream 
data were discarded (AAA at Time 2.0). + assert (observed["Chan ID"] == "AAA").sum() == 2 + + +def test_collection_combines_h_timeseries(tmp_path: Path) -> None: + """ProcessorCollection should preserve both H columns when aggregating timeseries.""" + + with change_working_directory(tmp_path): + csv_path = _write_h_csv(Path.cwd(), "Combo_1d_H.csv") + processor = HProcessor(file_path=csv_path) + processor.process() + + collection = ProcessorCollection() + collection.add_processor(processor) + + combined = ( + collection.combine_1d_timeseries()[["internalName", "Chan ID", "Time", "H_US", "H_DS"]] + .sort_values(["Chan ID", "Time"]) + .reset_index(drop=True) + ) + + expected = pd.DataFrame( + [ + ("Combo", "AAA", 0.0, 1.0, 2.0), + ("Combo", "AAA", 1.0, 1.5, np.nan), + ("Combo", "BBB", 0.0, 3.0, 4.0), + ("Combo", "BBB", 1.0, 3.5, np.nan), + ("Combo", "BBB", 2.0, 4.5, 4.8), + ], + columns=["internalName", "Chan ID", "Time", "H_US", "H_DS"], + ) + + expected = expected.sort_values(["Chan ID", "Time"]).reset_index(drop=True) + + combined = combined.astype({"internalName": "string", "Chan ID": "string"}) + expected = expected.astype({"internalName": "string", "Chan ID": "string"}) + + pdt.assert_frame_equal(combined, expected, check_dtype=False) + + +def test_nmx_processor_pivots_upstream_downstream_nodes(tmp_path: Path) -> None: + """NmxProcessor should continue pivoting `.1`/`.2` suffixes into US/DS columns.""" + + with change_working_directory(tmp_path): + csv_path = _write_nmx_csv(Path.cwd(), "NmxRun_1d_Nmx.csv") + processor = NmxProcessor(file_path=csv_path) + df = processor.process() + + observed = df[["Chan ID", "Time", "US_h", "DS_h"]].sort_values("Chan ID").reset_index(drop=True) + observed = observed.rename_axis(columns=None) + + expected = pd.DataFrame( + [ + ("CULVERT", 10.0, 1.2, 1.4), + ("CULVERT2", 12.0, 2.0, np.nan), + ], + columns=["Chan ID", "Time", "US_h", "DS_h"], + ) + expected = expected.sort_values("Chan ID").reset_index(drop=True) + expected = 
expected.rename_axis(columns=None) + + observed = observed.astype({"Chan ID": "string"}) + expected = expected.astype({"Chan ID": "string"}) + + pdt.assert_frame_equal(observed, expected, check_dtype=False) + + collection = ProcessorCollection() + collection.add_processor(processor) + + combined = ( + collection.combine_1d_maximums()[["internalName", "Chan ID", "US_h", "DS_h"]] + .sort_values("Chan ID") + .reset_index(drop=True) + ) + combined = combined.rename_axis(columns=None) + + expected_combined = pd.DataFrame( + [ + ("NmxRun", "CULVERT", 1.2, 1.4), + ("NmxRun", "CULVERT2", 2.0, np.nan), + ], + columns=["internalName", "Chan ID", "US_h", "DS_h"], + ) + + expected_combined = expected_combined.sort_values("Chan ID").reset_index(drop=True) + expected_combined = expected_combined.rename_axis(columns=None) + + combined = combined.astype({"internalName": "string", "Chan ID": "string"}) + expected_combined = expected_combined.astype({"internalName": "string", "Chan ID": "string"}) + + pdt.assert_frame_equal(combined, expected_combined, check_dtype=False) diff --git a/vendor/run_hy8 b/vendor/run_hy8 new file mode 160000 index 00000000..0567efb2 --- /dev/null +++ b/vendor/run_hy8 @@ -0,0 +1 @@ +Subproject commit 0567efb2cbde9e71269071fd2910369a1e0f1dd5