Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 60 additions & 14 deletions ryan_library/processors/tuflow/HProcessor.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,70 @@
"""Placeholder processor for TUFLOW H timeseries data.
"""Processor for TUFLOW ``_H`` timeseries outputs."""

This stub will be implemented once the Q-processor path is stable so the shared
infrastructure can be reused confidently.
"""
"""not tested, don't know if it works even slightly"""

from __future__ import annotations

import pandas as pd
from loguru import logger

from .base_processor import BaseProcessor
from .base_processor import ProcessorStatus
from .timeseries_processor import TimeSeriesProcessor


class HProcessor(BaseProcessor):
"""Stub processor for `_1d_H.csv` files until the Q-processor path is stable."""
class HProcessor(TimeSeriesProcessor):
"""Handle water level (``H``) timeseries files with upstream/downstream values."""

def process(self) -> pd.DataFrame:
"""Raise ``NotImplementedError`` until H processing is available.
def process(self) -> pd.DataFrame: # type: ignore[override]
"""Process a ``_H`` CSV using the shared timeseries pipeline."""

TODO: Implement HProcessor once the Q-processor path is stable.
"""
raise NotImplementedError(
"HProcessor is not implemented. It will be completed once the Q-processor path is stable."
)
return self._process_timeseries_pipeline(data_type="H")

def process_timeseries_raw_dataframe(self) -> ProcessorStatus:
"""Normalise the dual-value timeseries DataFrame produced by the shared pipeline."""

try:
logger.debug(f"{self.file_name}: Normalising reshaped 'H' DataFrame.")

required_columns: set[str] = {"Time", "H_US", "H_DS"}
missing_columns: set[str] = required_columns - set(self.df.columns)
if missing_columns:
logger.error(f"{self.file_name}: Missing required columns after melt: {sorted(missing_columns)}.")
return ProcessorStatus.FAILURE

identifier_columns: list[str] = [col for col in self.df.columns if col not in required_columns]
if not identifier_columns:
logger.error(f"{self.file_name}: No identifier column found alongside 'H_US'/'H_DS'.")
return ProcessorStatus.FAILURE
if len(identifier_columns) > 1:
identifier_error: str = (
f"{self.file_name}: Expected a single identifier column alongside 'Time', "
f"'H_US' and 'H_DS', got {identifier_columns}."
)
logger.error(identifier_error)
return ProcessorStatus.FAILURE

identifier_column: str = identifier_columns[0]
logger.debug(f"{self.file_name}: Using '{identifier_column}' as the identifier column for 'H' values.")

initial_row_count: int = len(self.df)
self.df.dropna(subset=["H_US", "H_DS"], how="all", inplace=True)
dropped_rows: int = initial_row_count - len(self.df)
if dropped_rows:
logger.debug(f"{self.file_name}: Dropped {dropped_rows} rows with missing 'H' values.")

if self.df.empty:
logger.error(f"{self.file_name}: DataFrame is empty after removing rows with missing 'H' values.")
return ProcessorStatus.EMPTY_DATAFRAME

expected_order: list[str] = ["Time", identifier_column, "H_US", "H_DS"]
self.df = self.df[expected_order]

if not self.check_headers_match(test_headers=self.df.columns.tolist()):
logger.error(f"{self.file_name}: Header mismatch after normalising 'H' DataFrame.")
return ProcessorStatus.HEADER_MISMATCH

logger.info(f"{self.file_name}: Successfully normalised 'H' DataFrame for downstream processing.")
return ProcessorStatus.SUCCESS
except Exception as exc: # pragma: no cover - defensive logging
logger.exception(f"{self.file_name}: Unexpected error while normalising 'H' DataFrame: {exc}")
return ProcessorStatus.FAILURE
13 changes: 13 additions & 0 deletions ryan_library/processors/tuflow/processor_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ def combine_1d_timeseries(self) -> pd.DataFrame:
combined_df = reorder_long_columns(df=combined_df)

grouped_df: DataFrame = combined_df.groupby(group_keys).agg("max").reset_index() # type: ignore

value_columns: list[str] = [col for col in grouped_df.columns if col not in group_keys]
if value_columns:
before_drop: int = len(grouped_df)
grouped_df.dropna(subset=value_columns, how="all", inplace=True)
after_drop: int = len(grouped_df)
if after_drop != before_drop:
logger.debug(
"Dropped %d all-null rows while combining Timeseries outputs.",
before_drop - after_drop,
)
grouped_df.reset_index(drop=True, inplace=True)

logger.debug(f"Grouped {len(timeseries_processors)} Timeseries DataFrame with {len(grouped_df)} rows.")

return grouped_df
Expand Down
2 changes: 2 additions & 0 deletions ryan_library/processors/tuflow/timeseries_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ def _apply_final_transformations(self, data_type: str) -> None:

if data_type == "H":
col_types.update({"H_US": "float64", "H_DS": "float64"})
else:
col_types[data_type] = "float64"

self.apply_dtype_mapping(dtype_mapping=col_types, context="final_transformations")

Expand Down
Empty file added tests/processors/__init__.py
Empty file.
Empty file.
183 changes: 183 additions & 0 deletions tests/processors/tuflow/test_h_and_nmx_processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
"""Regression coverage for TUFLOW timeseries and maximum processors."""

from __future__ import annotations

import os
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path

import numpy as np
import pandas as pd
import pandas.testing as pdt

from ryan_library.processors.tuflow import NmxProcessor
from ryan_library.processors.tuflow.HProcessor import HProcessor
from ryan_library.processors.tuflow.processor_collection import ProcessorCollection


@contextmanager
def change_working_directory(path: Path) -> Iterator[None]:
    """Temporarily change the working directory for processors relying on ``Path.cwd()``.

    Args:
        path: Directory to ``chdir`` into for the duration of the ``with`` block.

    Yields:
        None. The previous working directory is always restored on exit,
        even if the managed block raises.
    """
    # Fix: a @contextmanager-decorated generator returns Iterator[None],
    # not None — the old annotation misdescribed the function to type checkers.
    original_cwd = Path.cwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(original_cwd)


def _write_h_csv(directory: Path, file_name: str) -> Path:
"""Create a minimal `_1d_H.csv` file for testing."""

data = pd.DataFrame(
{
"Descriptor": ["row0", "row1", "row2"],
"Time": [0.0, 1.0, 2.0],
"H AAA_US": [1.0, 1.5, None],
"H AAA_DS": [2.0, None, None],
"H BBB_US": [3.0, 3.5, 4.5],
"H BBB_DS": [4.0, None, 4.8],
}
)
path = directory / file_name
data.to_csv(path, index=False)
return path


def _write_nmx_csv(directory: Path, file_name: str) -> Path:
"""Create a minimal `_1d_Nmx.csv` file for testing upstream/downstream pivots."""

data = pd.DataFrame(
{
"Node ID": ["CULVERT.1", "CULVERT.2", "CULVERT2.1"],
"Hmax": [1.2, 1.4, 2.0],
"Time Hmax": [10.0, 10.0, 12.0],
}
)
path = directory / file_name
data.to_csv(path, index=False)
return path


def test_h_processor_normalises_dual_timeseries(tmp_path: Path) -> None:
    """HProcessor should reshape upstream/downstream heads into tidy columns."""

    with change_working_directory(tmp_path):
        source = _write_h_csv(Path.cwd(), "TestRun_1d_H.csv")
        result = HProcessor(file_path=source).process()

    assert not result.empty

    tidy_columns = ["Time", "Chan ID", "H_US", "H_DS"]
    actual = (
        result[tidy_columns]
        .sort_values(["Chan ID", "Time"])
        .reset_index(drop=True)
        .astype({"Chan ID": "string"})
    )

    wanted = (
        pd.DataFrame(
            {
                "Time": [0.0, 1.0, 0.0, 1.0, 2.0],
                "Chan ID": ["AAA", "AAA", "BBB", "BBB", "BBB"],
                "H_US": [1.0, 1.5, 3.0, 3.5, 4.5],
                "H_DS": [2.0, np.nan, 4.0, np.nan, 4.8],
            }
        )
        .sort_values(["Chan ID", "Time"])
        .reset_index(drop=True)
        .astype({"Chan ID": "string"})
    )

    pdt.assert_frame_equal(actual, wanted, check_dtype=False)

    # Rows with no upstream/downstream data (AAA at Time 2.0) must be discarded.
    assert (actual["Chan ID"] == "AAA").sum() == 2


def test_collection_combines_h_timeseries(tmp_path: Path) -> None:
    """ProcessorCollection should preserve both H columns when aggregating timeseries."""

    with change_working_directory(tmp_path):
        source = _write_h_csv(Path.cwd(), "Combo_1d_H.csv")
        h_processor = HProcessor(file_path=source)
        h_processor.process()

    bundle = ProcessorCollection()
    bundle.add_processor(h_processor)

    ordered_columns = ["internalName", "Chan ID", "Time", "H_US", "H_DS"]
    merged = (
        bundle.combine_1d_timeseries()[ordered_columns]
        .sort_values(["Chan ID", "Time"])
        .reset_index(drop=True)
        .astype({"internalName": "string", "Chan ID": "string"})
    )

    baseline = (
        pd.DataFrame(
            {
                "internalName": ["Combo"] * 5,
                "Chan ID": ["AAA", "AAA", "BBB", "BBB", "BBB"],
                "Time": [0.0, 1.0, 0.0, 1.0, 2.0],
                "H_US": [1.0, 1.5, 3.0, 3.5, 4.5],
                "H_DS": [2.0, np.nan, 4.0, np.nan, 4.8],
            }
        )
        .sort_values(["Chan ID", "Time"])
        .reset_index(drop=True)
        .astype({"internalName": "string", "Chan ID": "string"})
    )

    pdt.assert_frame_equal(merged, baseline, check_dtype=False)


def test_nmx_processor_pivots_upstream_downstream_nodes(tmp_path: Path) -> None:
    """NmxProcessor should continue pivoting `.1`/`.2` suffixes into US/DS columns."""

    with change_working_directory(tmp_path):
        source = _write_nmx_csv(Path.cwd(), "NmxRun_1d_Nmx.csv")
        nmx = NmxProcessor(file_path=source)
        frame = nmx.process()

    pivoted = (
        frame[["Chan ID", "Time", "US_h", "DS_h"]]
        .sort_values("Chan ID")
        .reset_index(drop=True)
        .rename_axis(columns=None)
        .astype({"Chan ID": "string"})
    )

    wanted = (
        pd.DataFrame(
            {
                "Chan ID": ["CULVERT", "CULVERT2"],
                "Time": [10.0, 12.0],
                "US_h": [1.2, 2.0],
                "DS_h": [1.4, np.nan],
            }
        )
        .sort_values("Chan ID")
        .reset_index(drop=True)
        .rename_axis(columns=None)
        .astype({"Chan ID": "string"})
    )

    pdt.assert_frame_equal(pivoted, wanted, check_dtype=False)

    bundle = ProcessorCollection()
    bundle.add_processor(nmx)

    merged = (
        bundle.combine_1d_maximums()[["internalName", "Chan ID", "US_h", "DS_h"]]
        .sort_values("Chan ID")
        .reset_index(drop=True)
        .rename_axis(columns=None)
        .astype({"internalName": "string", "Chan ID": "string"})
    )

    wanted_combined = (
        pd.DataFrame(
            {
                "internalName": ["NmxRun", "NmxRun"],
                "Chan ID": ["CULVERT", "CULVERT2"],
                "US_h": [1.2, 2.0],
                "DS_h": [1.4, np.nan],
            }
        )
        .sort_values("Chan ID")
        .reset_index(drop=True)
        .rename_axis(columns=None)
        .astype({"internalName": "string", "Chan ID": "string"})
    )

    pdt.assert_frame_equal(merged, wanted_combined, check_dtype=False)
1 change: 1 addition & 0 deletions vendor/run_hy8
Submodule run_hy8 added at 0567ef