diff --git a/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py b/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py index b30ea60b2..028302c1b 100644 --- a/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py +++ b/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py @@ -44,6 +44,7 @@ SampleWeighter, Scaler, Selector, + Shifter, ) from openstef_models.transforms.postprocessing import ConfidenceIntervalApplicator, QuantileSorter from openstef_models.transforms.time_domain import ( @@ -206,6 +207,11 @@ class ForecastingWorkflowConfig(BaseConfig): # PredictionJob ) # Feature engineering + shifters: list[Shifter] = Field( + default=[], + description="List of feature shifts to align aggregation intervals. " + "Each Shifter can target different features with different aggregation periods.", + ) rolling_aggregate_features: list[AggregationFunction] = Field( default=[], description="If not None, rolling aggregate(s) of load will be used as features in the model.", @@ -311,6 +317,7 @@ def create_forecasting_workflow( ), CompletenessChecker(completeness_threshold=config.completeness_threshold), ] + feature_aligners = config.shifters feature_adders = [ LagsAdder( history_available=config.predict_history, @@ -361,6 +368,7 @@ def create_forecasting_workflow( if config.model == "xgboost": preprocessing = [ *checks, + *feature_aligners, *feature_adders, HolidayFeatureAdder(country_code=config.location.country_code), DatetimeFeaturesAdder(onehot_encode=False), @@ -382,6 +390,7 @@ def create_forecasting_workflow( elif config.model == "lgbmlinear": preprocessing = [ *checks, + *feature_aligners, *feature_adders, HolidayFeatureAdder(country_code=config.location.country_code), DatetimeFeaturesAdder(onehot_encode=False), @@ -396,6 +405,7 @@ def create_forecasting_workflow( elif config.model == "lgbm": preprocessing = [ *checks, + *feature_aligners, *feature_adders, 
HolidayFeatureAdder(country_code=config.location.country_code), DatetimeFeaturesAdder(onehot_encode=False), @@ -410,6 +420,7 @@ def create_forecasting_workflow( elif config.model == "gblinear": preprocessing = [ *checks, + *feature_aligners, *feature_adders, *feature_standardizers, Imputer( diff --git a/packages/openstef-models/src/openstef_models/transforms/general/__init__.py b/packages/openstef-models/src/openstef_models/transforms/general/__init__.py index c4888e6c0..e62d81db6 100644 --- a/packages/openstef-models/src/openstef_models/transforms/general/__init__.py +++ b/packages/openstef-models/src/openstef_models/transforms/general/__init__.py @@ -19,6 +19,7 @@ from openstef_models.transforms.general.sample_weighter import SampleWeightConfig, SampleWeighter from openstef_models.transforms.general.scaler import Scaler from openstef_models.transforms.general.selector import Selector +from openstef_models.transforms.general.shifter import Shifter __all__ = [ "Clipper", @@ -31,4 +32,5 @@ "SampleWeighter", "Scaler", "Selector", + "Shifter", ] diff --git a/packages/openstef-models/src/openstef_models/transforms/general/shifter.py b/packages/openstef-models/src/openstef_models/transforms/general/shifter.py new file mode 100644 index 000000000..06db0b99e --- /dev/null +++ b/packages/openstef-models/src/openstef_models/transforms/general/shifter.py @@ -0,0 +1,130 @@ +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +"""Transform for shifting features to align aggregation intervals. + +This module provides functionality to shift time series features that are aggregated +over a different interval than the target variable, correcting the phase misalignment +by shifting and linearly interpolating back onto the original time grid. 
class Shifter(BaseConfig, TimeSeriesTransform):
    """Transform that shifts features to align their aggregation interval with the target.

    When source features are aggregated over a different interval than the target variable,
    their timestamps represent a different center point in time. This transform corrects
    the phase misalignment by shifting the source features and linearly interpolating
    back onto the original time grid.

    The shift is computed as::

        shift = source_aggregation_period / 2 - target_aggregation_period / 2

    Timestamps are assumed to be at the end of the aggregation interval.
    For example, a timestamp of 12:00 with a 60-minute aggregation period represents
    the average over [11:00, 12:00], centered at 11:30. For instantaneous features
    or target, use an aggregation period of zero.

    The input dataset is returned as-is (no copy) when the computed shift is zero or
    when the selection resolves to no features, since there is nothing to realign.

    Example: Aligning hourly radiation with 15-minute load
        Hourly radiation (source_aggregation_period=60 min) has its center 30 min
        before the timestamp, while 15-minute load (target_aggregation_period=15 min)
        has its center 7.5 min before the timestamp. The required backward shift
        for radiation is 22.5 minutes.

    >>> import pandas as pd
    >>> from datetime import timedelta
    >>> from openstef_core.datasets import TimeSeriesDataset
    >>> from openstef_models.transforms.general import Shifter
    >>> from openstef_models.utils.feature_selection import FeatureSelection
    >>>
    >>> # Hourly radiation interpolated onto a 15-minute grid
    >>> index = pd.date_range('2025-01-01', periods=8, freq='15min')
    >>> data = pd.DataFrame({
    ...     'load': range(8),
    ...     'radiation': [200, 220, 240, 260, 280, 300, 320, 340],
    ... }, index=index)
    >>> dataset = TimeSeriesDataset(data, timedelta(minutes=15))
    >>>
    >>> shifter = Shifter(
    ...     selection=FeatureSelection(include=['radiation']),
    ...     source_aggregation_period=timedelta(minutes=60),
    ...     target_aggregation_period=timedelta(minutes=15),
    ...     fill_edges=True,
    ... )
    >>> result = shifter.transform(dataset)
    >>> result.data['radiation'].tolist()
    [230.0, 250.0, 270.0, 290.0, 310.0, 330.0, 340.0, 340.0]
    """

    selection: FeatureSelection = Field(
        default=FeatureSelection.NONE,
        description="Features to shift.",
    )
    source_aggregation_period: timedelta = Field(
        default=timedelta(minutes=60),
        description="Aggregation period of the source features.",
    )
    target_aggregation_period: timedelta = Field(
        default=timedelta(minutes=15),
        description="Aggregation period of the target variable.",
    )
    fill_edges: bool = Field(
        default=False,
        description=(
            "Whether to fill NaN at the edges introduced by the shift "
            "with the original (un-shifted) boundary value of each feature."
        ),
    )

    # Net shift between the interval centers; derived from the two periods in model_post_init.
    _shift: timedelta = PrivateAttr()

    @override
    def model_post_init(self, context: Any) -> None:
        """Derive the shift between the source and target interval centers.

        Args:
            context: Validation context forwarded by pydantic; passed on to the parent hook.
        """
        # Delegate first so any post-init logic defined by a base class still runs.
        super().model_post_init(context)
        self._shift = self.source_aggregation_period / 2 - self.target_aggregation_period / 2

    @override
    def transform(self, data: TimeSeriesDataset) -> TimeSeriesDataset:
        """Shift the selected features and interpolate them back onto the original grid.

        Args:
            data: Dataset whose selected feature columns should be realigned.

        Returns:
            ``data`` itself when the shift is zero or no features are selected; otherwise
            a copy of the dataset in which the selected columns hold the realigned values.
            Unselected columns are left untouched.
        """
        if self._shift == timedelta(0):
            return data

        features = self.selection.resolve(data.feature_names)
        # Nothing selected (the default selection is NONE): skip the frame copy and the
        # interpolation work. len() is used so any sized collection type is handled.
        if len(features) == 0:
            return data

        transformed_data = data.data.copy()

        original_index = data.index
        shifted_index = original_index - self._shift
        combined_index = original_index.union(shifted_index)

        feature_data = transformed_data[features]

        # Place values on the shifted time axis, interpolate back onto the original grid
        shifted_df = feature_data.set_axis(shifted_index)  # pyright: ignore[reportUnknownMemberType]
        combined_df = shifted_df.reindex(combined_index)

        # "inside" restricts filling to gaps surrounded by valid values, so the edge
        # positions created by the shift stay NaN unless fill_edges is requested.
        limit_area = None if self.fill_edges else "inside"
        realigned = combined_df.interpolate(method="time", limit_direction="both", limit_area=limit_area)
        realigned = realigned.reindex(original_index)

        # Restore pre-existing NaN at their shifted positions (nearest-neighbor mapping),
        # so gaps in the input are moved along with the data instead of being imputed.
        nan_mask_shifted = feature_data.isna().set_axis(shifted_index)  # pyright: ignore[reportUnknownMemberType]
        realigned[nan_mask_shifted.reindex(original_index, method="nearest")] = np.nan

        transformed_data[features] = realigned

        return data.copy_with(data=transformed_data, is_sorted=True)

    @override
    def features_added(self) -> list[str]:
        """Return the names of features this transform adds (none: it only modifies existing ones)."""
        return []
def _radiation_shifter(**overrides) -> Shifter:
    """Build a Shifter that targets only the ``radiation`` column."""
    return Shifter(selection=FeatureSelection(include={"radiation"}), **overrides)


@pytest.fixture
def sample_dataset() -> TimeSeriesDataset:
    """Eight-row dataset on a 15-minute grid with a linear radiation ramp."""
    grid = pd.date_range("2025-01-01", periods=8, freq="15min")
    frame = pd.DataFrame(
        {
            "load": list(range(8)),
            "radiation": [200.0, 220.0, 240.0, 260.0, 280.0, 300.0, 320.0, 340.0],
        },
        index=grid,
    )
    return TimeSeriesDataset(data=frame, sample_interval=timedelta(minutes=15))


@pytest.mark.parametrize(
    ("source_aggregation_period", "target_aggregation_period", "expected_radiation"),
    [
        pytest.param(
            timedelta(minutes=60),
            timedelta(minutes=15),
            [230.0, 250.0, 270.0, 290.0, 310.0, 330.0, np.nan, np.nan],
            id="60min_to_15min",
        ),
        pytest.param(
            timedelta(minutes=30),
            timedelta(minutes=15),
            [210.0, 230.0, 250.0, 270.0, 290.0, 310.0, 330.0, np.nan],
            id="30min_to_15min",
        ),
        pytest.param(
            timedelta(minutes=60),
            timedelta(0),
            [240.0, 260.0, 280.0, 300.0, 320.0, 340.0, np.nan, np.nan],
            id="60min_to_instantaneous",
        ),
    ],
)
def test_shifter__shift_and_interpolate(
    sample_dataset: TimeSeriesDataset,
    source_aggregation_period: timedelta,
    target_aggregation_period: timedelta,
    expected_radiation: list[float],
):
    """Selected features are shifted and interpolated; other columns stay untouched."""
    # Arrange
    transform = _radiation_shifter(
        source_aggregation_period=source_aggregation_period,
        target_aggregation_period=target_aggregation_period,
    )

    # Act
    shifted = transform.transform(sample_dataset)

    # Assert
    want = pd.Series(expected_radiation, index=sample_dataset.index, name="radiation")
    pd.testing.assert_series_equal(shifted.data["radiation"], want)
    # The column outside the selection must come through unchanged
    assert shifted.data["load"].tolist() == list(range(8))


def test_shifter__no_shift_when_intervals_equal(sample_dataset: TimeSeriesDataset):
    """Equal aggregation periods mean zero shift, so the input object itself is returned."""
    # Arrange
    quarter_hour = timedelta(minutes=15)
    transform = _radiation_shifter(
        source_aggregation_period=quarter_hour,
        target_aggregation_period=quarter_hour,
    )

    # Act
    shifted = transform.transform(sample_dataset)

    # Assert
    assert shifted is sample_dataset


@pytest.mark.parametrize(
    ("fill_edges", "expected_trailing"),
    [
        pytest.param(False, [np.nan, np.nan], id="no_fill_leaves_nan"),
        pytest.param(True, [340.0, 340.0], id="fill_uses_last_original_value"),
    ],
)
def test_shifter__fill_edges(
    sample_dataset: TimeSeriesDataset,
    fill_edges: bool,
    expected_trailing: list[float],
):
    """Trailing edge values are NaN by default and filled when fill_edges is set."""
    # Arrange
    transform = _radiation_shifter(fill_edges=fill_edges)

    # Act
    radiation = transform.transform(sample_dataset).data["radiation"]

    # Assert: the first 6 values are the shifted+interpolated result either way
    assert radiation.iloc[:6].tolist() == [230.0, 250.0, 270.0, 290.0, 310.0, 330.0]
    # Only the last 2 values depend on fill_edges
    want_tail = pd.Series(expected_trailing, name="radiation", index=sample_dataset.index[-2:])
    pd.testing.assert_series_equal(radiation.iloc[-2:], want_tail)


def test_shifter__fill_edges_leading_nan(sample_dataset: TimeSeriesDataset):
    """With source < target the shift is negative and fill_edges fills the leading gap."""
    # Arrange: a negative shift (source < target) leaves NaN at the start
    transform = _radiation_shifter(
        source_aggregation_period=timedelta(minutes=15),
        target_aggregation_period=timedelta(minutes=60),
        fill_edges=True,
    )

    # Act
    shifted = transform.transform(sample_dataset)

    # Assert: the leading gap takes the first original value (200.0)
    want = pd.Series(
        [200.0, 200.0, 210.0, 230.0, 250.0, 270.0, 290.0, 310.0],
        index=sample_dataset.index,
        name="radiation",
    )
    pd.testing.assert_series_equal(shifted.data["radiation"], want)


def test_shifter__preserves_preexisting_nan():
    """NaN values already present in the input move with the shift instead of being imputed."""
    # Arrange
    frame = pd.DataFrame(
        {
            "load": list(range(8)),
            "radiation": [200.0, 220.0, np.nan, 260.0, 280.0, np.nan, np.nan, 340.0],
        },
        index=pd.date_range("2025-01-01", periods=8, freq="15min"),
    )
    dataset = TimeSeriesDataset(frame, timedelta(minutes=15))
    transform = _radiation_shifter(
        source_aggregation_period=timedelta(minutes=60),
        target_aggregation_period=timedelta(minutes=15),
        fill_edges=True,
    )

    # Act
    shifted = transform.transform(dataset)

    # Assert
    # Every grid point takes the NaN status of the closest shifted data point.
    # On a tie, pandas' nearest lookup resolves to the later (right) neighbor.
    want = pd.Series(
        [np.nan, 250.0, 270.0, np.nan, np.nan, 330.0, 340.0, 340.0],
        index=dataset.index,
        name="radiation",
    )
    pd.testing.assert_series_equal(shifted.data["radiation"], want)