From 342b493e830060de50ac9f81b929a49f20980224 Mon Sep 17 00:00:00 2001 From: Maleen Kidiwela <63271235+MaleenKidiwela@users.noreply.github.com> Date: Sun, 8 Feb 2026 23:05:49 -0800 Subject: [PATCH] Add structured range calculation workflow --- code/README.md | 16 ++- code/__init__.py | 31 ++++- code/fetch_range_calculation.ipynb | 85 ++++++++++++ code/positioning.py | 51 +++++++- code/pressure_moving_average.py | 57 +++++++++ code/range_calculation_main.py | 83 ++++++++++++ code/range_calculation_workflow.py | 124 ++++++++++++++++++ code/range_salinity.py | 199 +++++++++++++++++++++++++++++ code/tidal_correction.py | 68 ++++++++++ code/velocity_interpolation.py | 24 ++++ 10 files changed, 734 insertions(+), 4 deletions(-) create mode 100644 code/fetch_range_calculation.ipynb create mode 100644 code/pressure_moving_average.py create mode 100644 code/range_calculation_main.py create mode 100644 code/range_calculation_workflow.py create mode 100644 code/range_salinity.py create mode 100644 code/tidal_correction.py create mode 100644 code/velocity_interpolation.py diff --git a/code/README.md b/code/README.md index b39a981..6f19e07 100644 --- a/code/README.md +++ b/code/README.md @@ -90,6 +90,20 @@ Statistical and utility functions: - `normalize_data()`: Data normalization (z-score, min-max, robust) - `detect_outliers()`: Multiple outlier detection methods +### Range Calculation Workflow (FETCH Range Calculation Notebook) +Structured helpers extracted from `FETCH Range Calculation.ipynb`: +- `range_salinity.py`: Salinity gap stitching, smoothing, bottle calibration +- `tidal_correction.py`: Tidal prediction parsing and pressure correction +- `velocity_interpolation.py`: Interpolate recorded velocities to data timelines +- `pressure_moving_average.py`: 15-day moving average and re-basing helpers +- `range_calculation_workflow.py`: Data extraction and harmonic mean utilities +- `range_calculation_main.py`: Notebook-oriented main workflow entry point + +Use the included notebook scaffold: +```bash +jupyter notebook code/fetch_range_calculation.ipynb +``` + ## Example Analysis Workflow ```python @@ -207,4 +221,4 @@ This package was extracted from the FETCH StreamLine Final notebook to create a ## License -This code is derived from the FETCH StreamLine project for oceanographic research. \ No newline at end of file +This code is derived from the FETCH StreamLine project for oceanographic research. diff --git a/code/__init__.py b/code/__init__.py index 3d828b8..dc6b9c9 100644 --- a/code/__init__.py +++ b/code/__init__.py @@ -45,7 +45,8 @@ to_enu, apply_tilt_correction, geodetic_to_enu, - calculate_range_from_coordinates + calculate_range_from_coordinates, + build_baseline_perturbations ) from .optimization import ( @@ -63,6 +64,21 @@ normalize_data ) +from .range_salinity import ( + SalinityCalibrationConfig, + GapMeanShiftConfig, + prepare_salinity_series, + bottle_residuals +) + +from .tidal_correction import ( + load_tidal_predictions, + optimize_tidal_correction, + apply_tidal_correction +) + +from .velocity_interpolation import interpolate_velocity + from .data_persistence import ( save_dataframe_as_pickle, load_dataframe_from_pickle, @@ -94,6 +110,7 @@ 'apply_tilt_correction', 'geodetic_to_enu', 'calculate_range_from_coordinates', + 'build_baseline_perturbations', # Optimization 'fit_and_extrapolate', @@ -107,6 +124,16 @@ 'calculate_rms_error', 'calculate_statistics', 'normalize_data', + + # Range calculation helpers + 'SalinityCalibrationConfig', + 'GapMeanShiftConfig', + 'prepare_salinity_series', + 'bottle_residuals', + 'load_tidal_predictions', + 'optimize_tidal_correction', + 'apply_tidal_correction', + 'interpolate_velocity', # Data persistence 'save_dataframe_as_pickle', @@ -115,4 +142,4 @@ 'save_baseline_data', 'save_instrument_data', 'load_existing_pickles' -] \ No newline at end of file +] diff --git a/code/fetch_range_calculation.ipynb b/code/fetch_range_calculation.ipynb new file mode 100644 index 0000000..9acca51 --- /dev/null +++ b/code/fetch_range_calculation.ipynb @@ -0,0 +1,85 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# FETCH Range Calculation (Structured)\n", + "\n", + "This notebook calls the structured workflow modules in `code/` while preserving\n", + "the original file naming used in the FETCH Range Calculation notebook.\n" + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "from datetime import datetime\n", + "import pandas as pd\n", + "\n", + "from code.range_calculation_main import (\n", + " FetchRangeInputs,\n", + " run_fetch_range_workflow,\n", + ")\n", + "from code.range_salinity import SalinityCalibrationConfig, GapMeanShiftConfig\n", + "\n", + "filepaths = [\n", + " 'Data_230912111909_East_006870_2502_.csv',\n", + " 'Data_230912112033_West_006874_2503_.csv',\n", + " 'Data_230913091309_North_00687A_2504_.csv',\n", + "]\n", + "inputs = FetchRangeInputs(filepaths=filepaths, identifiers=['2504', '2503', '2502'])\n", + "\n", + "salinity_config = SalinityCalibrationConfig(\n", + " file_path='/data/wsd02/maleen_data/ooi-rs03ccal-mj03f-12-ctdpfb305_2f21_522e_6ceb.csv',\n", + " bottle_times=pd.to_datetime([\n", + " '2022-08-14 05:49:53',\n", + " '2022-08-30 19:37:13',\n", + " '2023-09-17 03:35:09',\n", + " ]),\n", + " bottle_sal=[34.52543602, 34.52720340, 34.52700000],\n", + " start_time=pd.to_datetime('2022-08-14 05:49:53'),\n", + " end_time=pd.to_datetime('2025-09-05 23:59:00'),\n", + " jump_start=pd.to_datetime('2023-09-07 00:00:00'),\n", + " jump_end=pd.to_datetime('2023-09-14 00:00:00'),\n", + " smooth_after=pd.to_datetime('2023-09-14 00:00:00'),\n", + " smooth_len=250,\n", + ")\n", + "\n", + "gap_shift_config = GapMeanShiftConfig(\n", + " gap_start=pd.to_datetime('2024-08-31 21:43:00'),\n", + " gap_end=pd.to_datetime('2024-09-01 20:13:00'),\n", + " pre_hours=24,\n", + " post_hours=24,\n", + ")\n", + "\n", + "tidal_prediction_paths = [\n", + " '/data/wsd02/maleen_data/pred_F_2022.txt',\n", + " '/data/wsd02/maleen_data/pred_F_2023.txt',\n", + " '/data/wsd02/maleen_data/pred_F_2024.txt',\n", + " '/data/wsd02/maleen_data/pred_F_2025.txt',\n", + "]\n", + "\n", + "outputs = run_fetch_range_workflow(\n", + " inputs,\n", + " salinity_config,\n", + " gap_shift_config,\n", + " tidal_prediction_paths,\n", + ")\n", + "outputs.keys()\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/code/positioning.py b/code/positioning.py index dd102df..22bb1f7 100644 --- a/code/positioning.py +++ b/code/positioning.py @@ -168,6 +168,55 @@ def calculate_baseline_perturbations( return perturb_df +def build_baseline_perturbations( + stations: dict, + pairs: list, + h_txr: float = H_TXR, +) -> pd.DataFrame: + """ + Build a baseline perturbation table for multiple station pairs. + + Args: + stations: Mapping of station IDs to dicts with keys: + 'inc' (DataFrame with Record Time, Pitch, Roll), + 'lat', 'lon', and 'heading'. + pairs: List of (tx, rx) tuples specifying baseline directions. + h_txr: Transducer-to-tilt-sensor lever arm in meters. + + Returns: + DataFrame indexed by timestamp with per-baseline perturbation columns. + """ + series_list = [] + + for tx, rx in pairs: + station_tx = stations[tx] + station_rx = stations[rx] + + timeline = pd.Index( + sorted(set(station_tx["inc"]["Record Time"]) | set(station_rx["inc"]["Record Time"])) + ) + + inc_tx = interp_unique(station_tx["inc"], timeline) + inc_rx = interp_unique(station_rx["inc"], timeline) + + e_tx, n_tx = to_enu(*local_xy(inc_tx, h_txr), station_tx["heading"]) + e_rx, n_rx = to_enu(*local_xy(inc_rx, h_txr), station_rx["heading"]) + + ue, un = unit_vector( + station_tx["lat"], + station_tx["lon"], + station_rx["lat"], + station_rx["lon"], + ) + dL_oneway = (e_tx - e_rx) * ue + (n_tx - n_rx) * un + + series_list.append( + pd.DataFrame({f"{tx}-{rx}_dL": dL_oneway}, index=timeline) + ) + + return pd.concat(series_list, axis=1).sort_index() + + def geodetic_to_enu( lat: float, lon: float, @@ -272,4 +321,4 @@ def correct_sound_path( standard_sound_speed = 1500.0 corrected_range = measured_range * (standard_sound_speed / sound_speed) - return corrected_range \ No newline at end of file + return corrected_range diff --git a/code/pressure_moving_average.py b/code/pressure_moving_average.py new file mode 100644 index 0000000..f97287f --- /dev/null +++ b/code/pressure_moving_average.py @@ -0,0 +1,57 @@ +""" +Pressure moving-average utilities for FETCH Range Calculation analysis. +""" + +from __future__ import annotations + +import pandas as pd + +PSI_TO_KPA = 6.894757 +COL_KPA = "Corrected Pressure (kPa)" + + +def prep_ma15d(df: pd.DataFrame, column: str = COL_KPA) -> pd.DataFrame: + df = df.copy() + df["DateTime"] = pd.to_datetime(df["DateTime"], errors="coerce") + df.sort_values("DateTime", inplace=True) + mu = df[column].mean(skipna=True) + df["pressure_demeaned"] = df[column] - mu + df["ma15d"] = ( + df.set_index("DateTime")["pressure_demeaned"] + .rolling("15D", min_periods=1) + .mean() + .values + ) + return df + + +def ensure_demeaned(df: pd.DataFrame, column: str = COL_KPA) -> pd.DataFrame: + df = df.copy() + df["DateTime"] = pd.to_datetime(df["DateTime"], errors="coerce") + df.sort_values("DateTime", inplace=True) + if "pressure_demeaned" not in df.columns: + mu = df[column].mean(skipna=True) + df["pressure_demeaned"] = df[column] - mu + df["ma15d"] = ( + df.set_index("DateTime")["pressure_demeaned"] + .rolling("15D", min_periods=1) + .mean() + .values + ) + return df + + +def to_ma15d(df: pd.DataFrame, tcol: str, vcol: str, unit: str = "kpa") -> pd.Series: + x = df[[tcol, vcol]].copy() + x[tcol] = pd.to_datetime(x[tcol], errors="coerce") + x = x.dropna(subset=[tcol]).sort_values(tcol) + v = x[vcol].astype(float) + if unit.lower() == "psi": + v = v * PSI_TO_KPA + s = pd.Series(v.values, index=x[tcol]) + return s.rolling("15D", center=True, min_periods=1).mean() + + +def rebase_to_window(ma: pd.Series, start: pd.Timestamp, end: pd.Timestamp) -> pd.Series: + w = ma.loc[start:end] + return ma - w.mean() diff --git a/code/range_calculation_main.py b/code/range_calculation_main.py new file mode 100644 index 0000000..6f4e666 --- /dev/null +++ b/code/range_calculation_main.py @@ -0,0 +1,83 @@ +""" +Main entry point for the FETCH Range Calculation workflow. + +This script is intended to be imported and executed from a notebook. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List, Tuple + +import pandas as pd + +from .range_calculation_workflow import ( + build_harmonic_means, + build_sound_speed_tables, + extract_sensor_data, + load_fetch_data, +) +from .range_salinity import ( + GapMeanShiftConfig, + SalinityCalibrationConfig, + bottle_residuals, + prepare_salinity_series, +) +from .tidal_correction import apply_tidal_correction, load_tidal_predictions, optimize_tidal_correction +from .velocity_interpolation import interpolate_velocity + + +@dataclass(frozen=True) +class FetchRangeInputs: + filepaths: List[str] + identifiers: List[str] + + +def run_fetch_range_workflow( + inputs: FetchRangeInputs, + salinity_config: SalinityCalibrationConfig, + gap_shift_config: GapMeanShiftConfig | None, + tidal_prediction_paths: List[str], +) -> Dict[str, object]: + df_dict = load_fetch_data(inputs.filepaths) + data_extracted = extract_sensor_data(df_dict, inputs.identifiers) + + result_dfs = build_sound_speed_tables(data_extracted, inputs.identifiers) + pairs = [("2502", "2503"), ("2502", "2504"), ("2503", "2504")] + harmonic_mean_dfs = build_harmonic_means(result_dfs, pairs) + + salinity_df = prepare_salinity_series(salinity_config, gap_shift_config) + salinity_residuals = bottle_residuals( + salinity_df, + salinity_config.bottle_times, + salinity_config.bottle_sal, + ) + + tidal_df = load_tidal_predictions(tidal_prediction_paths) + + outputs = { + "df_dict": df_dict, + "data_extracted": data_extracted, + "result_dfs": result_dfs, + "harmonic_mean_dfs": harmonic_mean_dfs, + "salinity_df": salinity_df, + "salinity_residuals": salinity_residuals, + "tidal_df": tidal_df, + } + + return outputs + + +def apply_tidal_and_velocity( + combined_df: pd.DataFrame, + tidal_df: pd.DataFrame, + result_df: pd.DataFrame, + amplitude: float | None = None, + rho: float | None = None, +) -> pd.DataFrame: + if amplitude is None or rho is None: + amplitude, rho = optimize_tidal_correction(combined_df, tidal_df) + + corrected = apply_tidal_correction(combined_df, tidal_df, amplitude, rho) + corrected["interp_v"] = interpolate_velocity(corrected["Record Time"], result_df) + return corrected diff --git a/code/range_calculation_workflow.py b/code/range_calculation_workflow.py new file mode 100644 index 0000000..ddf544e --- /dev/null +++ b/code/range_calculation_workflow.py @@ -0,0 +1,124 @@ +""" +Structured workflow for the FETCH Range Calculation notebook. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, Iterable, List, Tuple + +import numpy as np +import pandas as pd + +from .data_processing import create_nested_dictionary, ensure_datetime, remove_outliers +from .utils import calculate_harmonic_mean, compute_harmonic_mean + + +@dataclass(frozen=True) +class FetchFileGroup: + filepaths: Iterable[str] + + +def load_fetch_data(filepaths: Iterable[str]) -> Dict[str, Dict[str, pd.DataFrame]]: + return create_nested_dictionary(list(filepaths)) + + +def extract_sensor_data( + df_dict: Dict[str, Dict[str, pd.DataFrame]], + identifiers: Iterable[str], + sensor_keys: Iterable[str] = ("TMP", "DQZ", "INC", "SSP", "BSL"), +) -> Dict[str, Dict[str, pd.DataFrame]]: + data_extracted: Dict[str, Dict[str, pd.DataFrame]] = {} + for identifier in identifiers: + data_extracted[identifier] = {} + for key in sensor_keys: + data_extracted[identifier][key] = df_dict.get(identifier, {}).get(key) + return data_extracted + + +def build_sound_speed_tables( + data_extracted: Dict[str, Dict[str, pd.DataFrame]], + identifiers: Iterable[str], + column: str = "SoundSpeed (m/s)", +) -> Dict[str, pd.DataFrame]: + result_dfs: Dict[str, pd.DataFrame] = {} + for identifier in identifiers: + ssp_df = data_extracted[identifier].get("SSP") + if ssp_df is None: + continue + ensure_datetime(ssp_df, "Record Time") + ssp_df[column] = pd.to_numeric(ssp_df[column], errors="coerce") + result_dfs[identifier] = remove_outliers(ssp_df, column) + return result_dfs + + +def build_harmonic_means( + result_dfs: Dict[str, pd.DataFrame], + pairs: Iterable[Tuple[str, str]], + column: str = "SoundSpeed (m/s)", +) -> Dict[str, pd.DataFrame]: + harmonic_mean_dfs: Dict[str, pd.DataFrame] = {} + for inst_a, inst_b in pairs: + df1 = result_dfs[inst_a].set_index("Record Time") + df2 = result_dfs[inst_b].set_index("Record Time") + harmonic_series = calculate_harmonic_mean(df1[column], df2[column]) + harmonic_mean_dfs[f"{inst_a}_{inst_b}"] = pd.DataFrame( + {"Record Time": harmonic_series.index, "HMean": harmonic_series.values} + ) + return harmonic_mean_dfs + + +def build_harmonic_mean_from_combined( + combined_df4: pd.DataFrame, + combined_df3: pd.DataFrame, + combined_df2: pd.DataFrame, + column: str = "velocity", +) -> Dict[str, pd.DataFrame]: + for df in (combined_df4, combined_df3, combined_df2): + if "Record Time" in df.columns: + df["Record Time"] = pd.to_datetime(df["Record Time"]) + df.set_index("Record Time", inplace=True) + elif not isinstance(df.index, pd.DatetimeIndex): + df.index = pd.to_datetime(df.index) + + index_2504_2503, harmonic_mean_2504_2503 = compute_harmonic_mean( + combined_df4, combined_df3, column=column + ) + index_2504_2502, harmonic_mean_2504_2502 = compute_harmonic_mean( + combined_df4, combined_df2, column=column + ) + index_2503_2502, harmonic_mean_2503_2502 = compute_harmonic_mean( + combined_df3, combined_df2, column=column + ) + + harmonic_means = { + "2502_2503": (index_2503_2502, harmonic_mean_2503_2502), + "2502_2504": (index_2504_2502, harmonic_mean_2504_2502), + "2503_2504": (index_2504_2503, harmonic_mean_2504_2503), + } + + harmonic_df_dict: Dict[str, pd.DataFrame] = {} + for key, (index, values) in harmonic_means.items(): + harmonic_df_dict[key] = pd.DataFrame({"Record Time": index, "HMean": values}) + return harmonic_df_dict + + +def build_pressure_temperature_data( + data_extracted: Dict[str, Dict[str, pd.DataFrame]], + identifier: str, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + pressure_df = pd.DataFrame( + { + "Record Time": pd.to_datetime(data_extracted[identifier]["DQZ"]["Record Time"]), + "Pressure (kPa)": data_extracted[identifier]["DQZ"]["Pressure (kPa)"], + } + ).set_index("Record Time") + + temperature_df = pd.DataFrame( + { + "Record Time": pd.to_datetime(data_extracted[identifier]["TMP"]["Record Time"]), + "Temperature Deg C": data_extracted[identifier]["TMP"]["Temperature Deg C"], + } + ).set_index("Record Time") + + return pressure_df, temperature_df diff --git a/code/range_salinity.py b/code/range_salinity.py new file mode 100644 index 0000000..22cbd83 --- /dev/null +++ b/code/range_salinity.py @@ -0,0 +1,199 @@ +""" +Salinity correction workflow for FETCH Range Calculation analysis. + +This module mirrors the salinity correction steps from the +FETCH Range Calculation notebook, including gap stitching, +post-gap smoothing, bottle calibration, and optional gap mean shifts. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable, Tuple + +import numpy as np +import pandas as pd +from scipy.ndimage import uniform_filter1d +from scipy.stats import linregress + + +@dataclass(frozen=True) +class SalinityCalibrationConfig: + file_path: str + bottle_times: Iterable[pd.Timestamp] + bottle_sal: Iterable[float] + start_time: pd.Timestamp + end_time: pd.Timestamp + jump_start: pd.Timestamp + jump_end: pd.Timestamp + smooth_after: pd.Timestamp + smooth_len: int = 250 + + +@dataclass(frozen=True) +class GapMeanShiftConfig: + gap_start: pd.Timestamp + gap_end: pd.Timestamp + pre_hours: int = 24 + post_hours: int = 24 + column: str = "corrected_salinity" + overwrite_column: str | None = None + + +def load_salinity_data(file_path: str) -> pd.DataFrame: + df = pd.read_csv(file_path, usecols=[0, 1]).drop(0).reset_index(drop=True) + df.columns = ["time", "sal_raw"] + df["time"] = pd.to_datetime(df["time"]).dt.tz_localize(None) + df["sal_raw"] = pd.to_numeric(df["sal_raw"], errors="coerce") + return df[df["sal_raw"].between(34.48, 100)].copy() + + +def remove_gap_and_stitch( + df: pd.DataFrame, jump_start: pd.Timestamp, jump_end: pd.Timestamp +) -> pd.DataFrame: + gap_mask = (df["time"] >= jump_start) & (df["time"] < jump_end) + df = df[~gap_mask].reset_index(drop=True) + + pre_gap_val = df.loc[df["time"] < jump_start, "sal_raw"].iloc[-1] + post_gap_val = df.loc[df["time"] >= jump_end, "sal_raw"].iloc[0] + gap_shift = pre_gap_val - post_gap_val + df.loc[df["time"] >= jump_end, "sal_raw"] += gap_shift + return df + + +def smooth_post_gap( + df: pd.DataFrame, smooth_after: pd.Timestamp, smooth_len: int +) -> pd.DataFrame: + post_idx = df.index[df["time"] >= smooth_after] + if len(post_idx): + df.loc[post_idx, "sal_raw"] = uniform_filter1d( + df.loc[post_idx, "sal_raw"], size=smooth_len + ) + return df + + +def calibrate_bottle_salinity( + df: pd.DataFrame, + bottle_times: Iterable[pd.Timestamp], + bottle_sal: Iterable[float], + jump_start: pd.Timestamp, + jump_end: pd.Timestamp, + smooth_after: pd.Timestamp, +) -> pd.DataFrame: + bottle_times = pd.to_datetime(list(bottle_times)) + bottle_sal = np.array(list(bottle_sal), dtype=float) + + def nearest_val(t: pd.Timestamp) -> float: + return df.iloc[(df["time"] - t).abs().argmin()]["sal_raw"] + + raw_b1, raw_b2 = map(nearest_val, bottle_times[:2]) + d1 = bottle_sal[0] - raw_b1 + d2 = bottle_sal[1] - raw_b2 + + t1, t2, t3 = bottle_times + + df["offset_A"] = 0.0 + mask_A = (df["time"] >= t1) & (df["time"] <= t2) + w = (df.loc[mask_A, "time"] - t1) / (t2 - t1) + df.loc[mask_A, "offset_A"] = d1 + w * (d2 - d1) + df.loc[df["time"] > t2, "offset_A"] = d2 + + mask_B = (df["time"] > t2) & (df["time"] < jump_start) + if mask_B.any(): + linregress( + df.loc[mask_B, "time"].astype("int64"), + df.loc[mask_B, "sal_raw"] + d2, + ) + + raw_b3 = nearest_val(t3) + shift_C = bottle_sal[2] - (raw_b3 + d2) + df["offset_C"] = 0.0 + df.loc[df["time"] >= smooth_after, "offset_C"] = shift_C + + df["offset_P"] = 0.0 + ramp_mask = (df["time"] > t2) & (df["time"] < jump_end) + if ramp_mask.any(): + rr = (df.loc[ramp_mask, "time"] - t2) / (jump_end - t2) + df.loc[ramp_mask, "offset_P"] = rr * shift_C + + df["total_offset"] = df["offset_A"] + df["offset_P"] + df["offset_C"] + df["corrected_salinity"] = df["sal_raw"] + df["total_offset"] + return df + + +def apply_gap_mean_shift( + sdf: pd.DataFrame, + gap_start: pd.Timestamp, + gap_end: pd.Timestamp, + pre_hours: int = 24, + post_hours: int = 24, + column: str = "corrected_salinity", + overwrite_column: str | None = None, +) -> pd.DataFrame: + df = sdf.copy() + target_col = overwrite_column or column + + pre_window = df[(df["time"] >= gap_start - pd.Timedelta(hours=pre_hours)) + & (df["time"] < gap_start)] + post_window = df[(df["time"] >= gap_end) + & (df["time"] < gap_end + pd.Timedelta(hours=post_hours))] + + if pre_window.empty or post_window.empty: + df[target_col] = df[column] + return df + + pre_mean = pre_window[column].mean() + post_mean = post_window[column].mean() + shift = pre_mean - post_mean + + df[target_col] = df[column] + mask = df["time"] >= gap_end + df.loc[mask, target_col] = df.loc[mask, target_col] + shift + + gap_mask = (df["time"] >= gap_start) & (df["time"] < gap_end) + df = df[~gap_mask].reset_index(drop=True) + return df + + +def prepare_salinity_series( + config: SalinityCalibrationConfig, + gap_shift: GapMeanShiftConfig | None = None, +) -> pd.DataFrame: + df = load_salinity_data(config.file_path) + df = remove_gap_and_stitch(df, config.jump_start, config.jump_end) + df = smooth_post_gap(df, config.smooth_after, config.smooth_len) + df = calibrate_bottle_salinity( + df, + config.bottle_times, + config.bottle_sal, + config.jump_start, + config.jump_end, + config.smooth_after, + ) + + if gap_shift is not None: + df = apply_gap_mean_shift( + df, + gap_shift.gap_start, + gap_shift.gap_end, + pre_hours=gap_shift.pre_hours, + post_hours=gap_shift.post_hours, + column=gap_shift.column, + overwrite_column=gap_shift.overwrite_column, + ) + + mask = (df["time"] >= config.start_time) & (df["time"] <= config.end_time) + return df.loc[mask].copy() + + +def bottle_residuals( + salinity_df: pd.DataFrame, + bottle_times: Iterable[pd.Timestamp], + bottle_sal: Iterable[float], + column: str = "corrected_salinity", +) -> Tuple[Tuple[pd.Timestamp, float], ...]: + residuals = [] + for bt, true_s in zip(pd.to_datetime(list(bottle_times)), bottle_sal): + est = salinity_df.iloc[(salinity_df["time"] - bt).abs().argmin()] + residuals.append((bt, float(est[column] - true_s))) + return tuple(residuals) diff --git a/code/tidal_correction.py b/code/tidal_correction.py new file mode 100644 index 0000000..e4c446f --- /dev/null +++ b/code/tidal_correction.py @@ -0,0 +1,68 @@ +""" +Tidal correction workflow for FETCH Range Calculation analysis. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable, Tuple + +import numpy as np +import pandas as pd +from scipy.optimize import least_squares + +from .data_processing import parse_to_dataframe +from .optimization import optimize_tidal_influence + + +@dataclass(frozen=True) +class TidalPredictionFiles: + paths: Iterable[str] + + +def load_tidal_predictions(paths: Iterable[str]) -> pd.DataFrame: + frames = [] + for path in paths: + with open(path, "r", encoding="utf-8") as handle: + frames.append(parse_to_dataframe(handle.read())) + + tidal_df = pd.concat(frames, ignore_index=True) + tidal_df.set_index("DateTime", inplace=True) + return tidal_df + + +def optimize_tidal_correction( + pressure_df: pd.DataFrame, + tidal_df: pd.DataFrame, + initial_amplitude: float = 1.0, + initial_rho: float = 1025.0, +) -> Tuple[float, float]: + result = least_squares( + optimize_tidal_influence, + x0=[initial_amplitude, initial_rho], + args=(tidal_df, pressure_df), + ) + amplitude, rho = result.x + return float(amplitude), float(rho) + + +def apply_tidal_correction( + combined_df: pd.DataFrame, + tidal_df: pd.DataFrame, + amplitude: float, + rho: float, +) -> pd.DataFrame: + df = combined_df.copy() + df["DateTime"] = pd.to_datetime(df["Record Time"]) + df.set_index("DateTime", inplace=True) + + adjusted_tidal = amplitude * tidal_df["Value"] + tidal_df = tidal_df.copy() + tidal_df["Adjusted Value"] = adjusted_tidal + + interpolated = tidal_df.reindex(df.index, method="nearest")["Adjusted Value"] + df["Tidal Influence (kPa)"] = (rho * 9.81 * interpolated) * 0.001 + df["Corrected Pressure (kPa)"] = df["Pressure (kPa)"] - df["Tidal Influence (kPa)"] + + df.reset_index(inplace=True) + return df diff --git a/code/velocity_interpolation.py b/code/velocity_interpolation.py new file mode 100644 index 0000000..d3b7d4f --- /dev/null +++ b/code/velocity_interpolation.py @@ -0,0 +1,24 @@ +""" +Velocity interpolation utilities for FETCH Range Calculation analysis. +""" + +import pandas as pd +from scipy.interpolate import interp1d + + +def interpolate_velocity(time_series: pd.Series, velocity_df: pd.DataFrame) -> pd.Series: + velocity_df = velocity_df.copy() + velocity_df["Record Time"] = pd.to_datetime(velocity_df["Record Time"]) + time_numeric = velocity_df["Record Time"].astype("int64") + + interpolator = interp1d( + time_numeric, + velocity_df["SoundSpeed"], + fill_value="extrapolate", + bounds_error=False, + ) + + return pd.Series( + interpolator(pd.to_datetime(time_series).astype("int64")), + index=pd.to_datetime(time_series), + )