diff --git a/examples/benchmarks/liander_2024_benchmark_xgboost_gblinear.py b/examples/benchmarks/liander_2024_benchmark_xgboost_gblinear.py index 7ce3e33e1..9ff296c5d 100644 --- a/examples/benchmarks/liander_2024_benchmark_xgboost_gblinear.py +++ b/examples/benchmarks/liander_2024_benchmark_xgboost_gblinear.py @@ -20,7 +20,7 @@ import multiprocessing from pathlib import Path -from openstef_beam.benchmarking.baselines import ( +from openstef_beam.benchmarking.baselines.openstef4 import ( create_openstef4_preset_backtest_forecaster, ) from openstef_beam.benchmarking.benchmarks.liander2024 import Liander2024Category, create_liander2024_benchmark_runner diff --git a/examples/benchmarks/liander_2024_ensemble.py b/examples/benchmarks/liander_2024_ensemble.py index c00d2c5df..5760d35e6 100644 --- a/examples/benchmarks/liander_2024_ensemble.py +++ b/examples/benchmarks/liander_2024_ensemble.py @@ -23,7 +23,7 @@ from pathlib import Path from openstef_beam.backtesting.backtest_forecaster import BacktestForecasterConfig -from openstef_beam.benchmarking.baselines import ( +from openstef_beam.benchmarking.baselines.openstef4 import ( create_openstef4_preset_backtest_forecaster, ) from openstef_beam.benchmarking.benchmarks.liander2024 import Liander2024Category, create_liander2024_benchmark_runner diff --git a/packages/openstef-beam/pyproject.toml b/packages/openstef-beam/pyproject.toml index 5c33fb373..0db13d4b9 100644 --- a/packages/openstef-beam/pyproject.toml +++ b/packages/openstef-beam/pyproject.toml @@ -36,8 +36,13 @@ dependencies = [ ] optional-dependencies.all = [ + "openstef-beam[baselines]", "s3fs>=2025.5.1", ] +optional-dependencies.baselines = [ + "openstef-meta>=4.0.0.dev0,<5", + "openstef-models>=4.0.0.dev0,<5", +] urls.Documentation = "https://openstef.github.io/openstef/index.html" urls.Homepage = "https://lfenergy.org/projects/openstef/" urls.Issues = "https://github.com/OpenSTEF/openstef/issues" @@ -48,3 +53,5 @@ packages = [ "src/openstef_beam" ] [tool.uv.sources] openstef-core = { workspace = true } +openstef-models = { workspace = true } +openstef-meta = { workspace = true } diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/__init__.py b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/__init__.py index 19124fc22..6bf2d8d07 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/__init__.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/__init__.py @@ -1,20 +1,17 @@ """Benchmarks baselines used by the OpenSTEF Beam benchmarking utilities. This package exposes baseline forecasters for use in backtesting. +The OpenSTEF v4 baselines require ``openstef-models`` and ``openstef-meta``, +available via the ``baselines`` extra: ``pip install openstef-beam[baselines]``. + +Import directly from the submodule:: + + from openstef_beam.benchmarking.baselines.openstef4 import ( + OpenSTEF4BacktestForecaster, + create_openstef4_preset_backtest_forecaster, + ) """ # SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project # # SPDX-License-Identifier: MPL-2.0 - -from openstef_beam.benchmarking.baselines.openstef4 import ( - OpenSTEF4BacktestForecaster, - WorkflowCreationContext, - create_openstef4_preset_backtest_forecaster, -) - -__all__ = [ - "OpenSTEF4BacktestForecaster", - "WorkflowCreationContext", - "create_openstef4_preset_backtest_forecaster", -] diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py index f6ecb67db..d6328e7f2 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py @@ -2,16 +2,17 @@ # # SPDX-License-Identifier: MPL-2.0 -"""OpenSTEF 4.0 forecaster for backtesting pipelines.""" +"""OpenSTEF 4.0 forecaster for backtesting pipelines. + +Requires the ``baselines`` extra: ``pip install openstef-beam[baselines]``. +""" import logging -from collections.abc import Callable from datetime import timedelta from functools import partial from pathlib import Path -from typing import Any, cast, override +from typing import TYPE_CHECKING, Any, cast, override -import pandas as pd from pydantic import Field, PrivateAttr from pydantic_extra_types.coordinate import Coordinate @@ -27,24 +28,20 @@ BenchmarkTarget, ForecasterFactory, ) -from openstef_core.base_model import BaseConfig, BaseModel +from openstef_core.base_model import BaseModel from openstef_core.datasets import TimeSeriesDataset -from openstef_core.exceptions import FlatlinerDetectedError, NotFittedError +from openstef_core.exceptions import FlatlinerDetectedError, MissingExtraError, NotFittedError from openstef_core.types import Q -from openstef_meta.presets import EnsembleForecastingWorkflowConfig, create_ensemble_forecasting_workflow -from openstef_models.presets import ForecastingWorkflowConfig +from openstef_models.presets import ForecastingWorkflowConfig, create_forecasting_workflow +from openstef_models.presets.forecasting_workflow import LocationConfig +from openstef_models.workflows.callbacks.data_save import DataSaveCallback from openstef_models.workflows.custom_forecasting_workflow import ( CustomForecastingWorkflow, + ForecastingCallback, ) - -class WorkflowCreationContext(BaseConfig): - """Context information for workflow execution within backtesting.""" - - step_name: str | None = Field( - default=None, - description="Name of the current backtesting step.", - ) +if TYPE_CHECKING: + from openstef_meta.presets import EnsembleForecastingWorkflowConfig class OpenSTEF4BacktestForecaster(BaseModel, BacktestForecasterMixin): @@ -57,8 +54,8 @@ class OpenSTEF4BacktestForecaster(BaseModel, BacktestForecasterMixin): config: BacktestForecasterConfig = Field( description="Configuration for the backtest forecaster interface", ) - workflow_factory: Callable[[WorkflowCreationContext], CustomForecastingWorkflow] = Field( - description="Factory function that creates a new CustomForecastingWorkflow instance", + workflow_template: CustomForecastingWorkflow = Field( + description="Untrained workflow template; deep-copied for each fit() call", ) cache_dir: Path = Field( description="Directory to use for caching model artifacts during backtesting", @@ -71,6 +68,10 @@ class OpenSTEF4BacktestForecaster(BaseModel, BacktestForecasterMixin): default=False, description="When True, saves base forecaster prediction contributions for ensemble models", ) + extra_callbacks: list[ForecastingCallback] = Field( + default_factory=list[ForecastingCallback], + description="Additional callbacks to inject into workflows created by the factory.", + ) _workflow: CustomForecastingWorkflow | None = PrivateAttr(default=None) _is_flatliner_detected: bool = PrivateAttr(default=False) @@ -80,22 +81,27 @@ class OpenSTEF4BacktestForecaster(BaseModel, BacktestForecasterMixin): @override def model_post_init(self, context: Any) -> None: if self.debug or self.contributions: - self.cache_dir.mkdir(parents=True, exist_ok=True) + self.extra_callbacks.append( + DataSaveCallback( + cache_dir=self.cache_dir, + save_training_data=self.debug, + save_prepared_data=self.debug, + save_predict_data=self.debug, + save_forecast=self.debug, + save_contributions=self.contributions, + ) + ) @property @override def quantiles(self) -> list[Q]: - # Create a workflow instance if needed to get quantiles - if self._workflow is None: - self._workflow = self.workflow_factory(WorkflowCreationContext()) - - return self._workflow.model.quantiles + return self.workflow_template.model.quantiles @override def fit(self, data: RestrictedHorizonVersionedTimeSeries) -> None: - # Create a new workflow for this training cycle - context = WorkflowCreationContext(step_name=data.horizon.isoformat()) - workflow = self.workflow_factory(context) + # Deep-copy the template for a fresh model + workflow = self.workflow_template.with_run_name(data.horizon.isoformat()) + workflow.callbacks.extend(self.extra_callbacks) # Extract the dataset for training training_data = data.get_window( @@ -104,10 +110,6 @@ def fit(self, data: RestrictedHorizonVersionedTimeSeries) -> None: available_before=data.horizon, ) - if self.debug: - id_str = data.horizon.strftime("%Y%m%d%H%M%S") - training_data.to_parquet(path=self.cache_dir / f"debug_{id_str}_training.parquet") - try: # Use the workflow's fit method workflow.fit(data=training_data) @@ -119,12 +121,6 @@ def fit(self, data: RestrictedHorizonVersionedTimeSeries) -> None: self._workflow = workflow - if self.debug: - id_str = data.horizon.strftime("%Y%m%d%H%M%S") - self._workflow.model.prepare_input(training_data).to_parquet( - path=self.cache_dir / f"debug_{id_str}_prepared_training.parquet" - ) - @override def predict(self, data: RestrictedHorizonVersionedTimeSeries) -> TimeSeriesDataset | None: if self._is_flatliner_detected: @@ -150,72 +146,49 @@ def predict(self, data: RestrictedHorizonVersionedTimeSeries) -> TimeSeriesDatas self._logger.info("Flatliner detected during prediction") return None - if self.debug: - id_str = data.horizon.strftime("%Y%m%d%H%M%S") - predict_data.to_parquet(path=self.cache_dir / f"debug_{id_str}_predict.parquet") - forecast.to_parquet(path=self.cache_dir / f"debug_{id_str}_forecast.parquet") - - if self.contributions: - id_str = data.horizon.strftime("%Y%m%d%H%M%S") - try: - contributions = self._workflow.model.predict_contributions(predict_data, forecast_start=data.horizon) - except NotImplementedError: - pass - else: - df = pd.concat([contributions.data, forecast.data.drop(columns=["load"])], axis=1) - df.to_parquet(path=self.cache_dir / f"contrib_{id_str}_predict.parquet") return forecast -class OpenSTEF4PresetBacktestForecaster(OpenSTEF4BacktestForecaster): - pass - - def _preset_target_forecaster_factory( - base_config: ForecastingWorkflowConfig | EnsembleForecastingWorkflowConfig, + base_config: "ForecastingWorkflowConfig | EnsembleForecastingWorkflowConfig", backtest_config: BacktestForecasterConfig, cache_dir: Path, context: BenchmarkContext, target: BenchmarkTarget, ) -> OpenSTEF4BacktestForecaster: - from openstef_models.presets import create_forecasting_workflow # noqa: PLC0415 - from openstef_models.presets.forecasting_workflow import LocationConfig # noqa: PLC0415 - - # Factory function that creates a forecaster for a given target. - prefix = context.run_name - - def _create_workflow(context: WorkflowCreationContext) -> CustomForecastingWorkflow: - # Create a new workflow instance with fresh model. - location = LocationConfig( - name=target.name, - description=target.description, - coordinate=Coordinate( - latitude=target.latitude, - longitude=target.longitude, - ), - ) - - update = { - "model_id": f"{prefix}_{target.name}", - "location": location, - "run_name": context.step_name, - } + location = LocationConfig( + name=target.name, + description=target.description, + coordinate=Coordinate( + latitude=target.latitude, + longitude=target.longitude, + ), + ) - if isinstance(base_config, EnsembleForecastingWorkflowConfig): - return create_ensemble_forecasting_workflow(config=base_config.model_copy(update=update)) + update: dict[str, Any] = { + "model_id": f"{context.run_name}_{target.name}", + "location": location, + } - return create_forecasting_workflow(config=base_config.model_copy(update=update)) + if base_config.kind == "ensemble": + try: + from openstef_meta.presets import create_ensemble_forecasting_workflow # noqa: PLC0415 + except ImportError as e: + raise MissingExtraError("openstef-meta") from e + workflow = create_ensemble_forecasting_workflow(config=base_config.model_copy(update=update)) + else: + workflow = create_forecasting_workflow(config=base_config.model_copy(update=update)) return OpenSTEF4BacktestForecaster( config=backtest_config, - workflow_factory=_create_workflow, + workflow_template=workflow, debug=False, cache_dir=cache_dir / f"{context.run_name}_{target.name}", ) def create_openstef4_preset_backtest_forecaster( - workflow_config: ForecastingWorkflowConfig | EnsembleForecastingWorkflowConfig, + workflow_config: "ForecastingWorkflowConfig | EnsembleForecastingWorkflowConfig", backtest_config: BacktestForecasterConfig | None = None, cache_dir: Path = Path("cache"), ) -> ForecasterFactory[BenchmarkTarget]: @@ -258,6 +231,5 @@ def create_openstef4_preset_backtest_forecaster( __all__ = [ "OpenSTEF4BacktestForecaster", - "WorkflowCreationContext", "create_openstef4_preset_backtest_forecaster", ] diff --git a/packages/openstef-beam/src/openstef_beam/evaluation/models/subset.py b/packages/openstef-beam/src/openstef_beam/evaluation/models/subset.py index 16a5e7f63..1731f3fcd 100644 --- a/packages/openstef-beam/src/openstef_beam/evaluation/models/subset.py +++ b/packages/openstef-beam/src/openstef_beam/evaluation/models/subset.py @@ -65,6 +65,23 @@ def get_metric(self, quantile: QuantileOrGlobal, metric_name: str) -> FloatOrNan """ return self.metrics.get(quantile, {}).get(metric_name) + def to_flat_dict(self, prefix: str = "") -> dict[str, float]: + """Flatten metrics into a single dict suitable for logging (e.g. MLflow). + + Each key is ``{prefix}{quantile}_{metric_name}``. + + Args: + prefix: String prepended to every key. + + Returns: + Flat mapping of metric names to values. + """ + return { + f"{prefix}{quantile}_{metric_name}": value + for quantile, metrics_dict in self.metrics.items() + for metric_name, value in metrics_dict.items() + } + def merge_quantile_metrics(metrics_list: list[QuantileMetricsDict]) -> QuantileMetricsDict: """Merge multiple quantile metrics dictionaries into a single one. diff --git a/packages/openstef-beam/tests/unit/benchmarking/baselines/__init__.py b/packages/openstef-beam/tests/unit/benchmarking/baselines/__init__.py new file mode 100644 index 000000000..7b9e0469f --- /dev/null +++ b/packages/openstef-beam/tests/unit/benchmarking/baselines/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 diff --git a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py new file mode 100644 index 000000000..e9b66591a --- /dev/null +++ b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py @@ -0,0 +1,111 @@ +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +from datetime import UTC, datetime, timedelta +from pathlib import Path + +import pytest + +from openstef_beam.backtesting.restricted_horizon_timeseries import RestrictedHorizonVersionedTimeSeries +from openstef_beam.benchmarking.baselines.openstef4 import create_openstef4_preset_backtest_forecaster +from openstef_beam.benchmarking.benchmark_pipeline import BenchmarkContext, BenchmarkTarget +from openstef_core.datasets import VersionedTimeSeriesDataset +from openstef_core.testing import create_synthetic_forecasting_dataset +from openstef_core.types import LeadTime, Q +from openstef_models.presets import ForecastingWorkflowConfig + + +@pytest.fixture +def xgboost_config() -> ForecastingWorkflowConfig: + return ForecastingWorkflowConfig( + model_id="test_xgb", + model="xgboost", + horizons=[LeadTime.from_string("PT24H")], + quantiles=[Q(0.5)], + ) + + +@pytest.fixture +def benchmark_target() -> BenchmarkTarget: + now = datetime(2024, 6, 1, tzinfo=UTC) + return BenchmarkTarget( + name="target_1", + description="test", + latitude=52.0, + longitude=5.0, + limit=100.0, + benchmark_start=now - timedelta(days=365), + benchmark_end=now, + train_start=now - timedelta(days=730), + ) + + +@pytest.fixture +def training_data() -> VersionedTimeSeriesDataset: + """Synthetic 15-min data covering 120 days.""" + ts = create_synthetic_forecasting_dataset( + start=datetime(2024, 2, 1, tzinfo=UTC), + length=timedelta(days=120), + sample_interval=timedelta(minutes=15), + include_atmosphere=True, + include_price=True, + include_available_at=True, + ) + return VersionedTimeSeriesDataset([ts]) + + +def test_fit_does_not_mutate_template( + xgboost_config: ForecastingWorkflowConfig, + benchmark_target: BenchmarkTarget, + training_data: VersionedTimeSeriesDataset, + tmp_path: Path, +): + """fit() should train a deep copy; the template must remain untouched.""" + # Arrange + factory = create_openstef4_preset_backtest_forecaster( + workflow_config=xgboost_config, + cache_dir=tmp_path / "test_no_mutate", + ) + forecaster = factory(BenchmarkContext(run_name="run"), benchmark_target) + template_model_id_before = id(forecaster.workflow_template.model) + + horizon = datetime(2024, 5, 25, tzinfo=UTC) + rhvts = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon) + + # Act + forecaster.fit(rhvts) + + # Assert — template model object identity unchanged + assert id(forecaster.workflow_template.model) == template_model_id_before + assert forecaster.workflow_template.run_name is None + + # The fitted workflow should be different from the template + assert forecaster._workflow is not forecaster.workflow_template + assert forecaster._workflow is not None + assert forecaster._workflow.run_name == horizon.isoformat() + + +def test_fit_then_predict_returns_forecast( + xgboost_config: ForecastingWorkflowConfig, + benchmark_target: BenchmarkTarget, + training_data: VersionedTimeSeriesDataset, + tmp_path: Path, +): + """End-to-end: fit then predict should return a ForecastDataset.""" + # Arrange + factory = create_openstef4_preset_backtest_forecaster( + workflow_config=xgboost_config, + cache_dir=tmp_path / "test_e2e", + ) + forecaster = factory(BenchmarkContext(run_name="e2e"), benchmark_target) + horizon = datetime(2024, 5, 25, tzinfo=UTC) + rhvts = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon) + + # Act + forecaster.fit(rhvts) + result = forecaster.predict(rhvts) + + # Assert + assert result is not None + assert len(result.data) > 0 diff --git a/packages/openstef-core/src/openstef_core/testing.py b/packages/openstef-core/src/openstef_core/testing.py index e7619fdee..66e98ab15 100644 --- a/packages/openstef-core/src/openstef_core/testing.py +++ b/packages/openstef-core/src/openstef_core/testing.py @@ -84,6 +84,10 @@ def create_synthetic_forecasting_dataset( # noqa: PLR0913, PLR0917 - complex fu radiation_influence: float | None = -0.2, stochastic_influence: float | None = 0.1, other_components: dict[str, float] | None = None, + *, + include_atmosphere: bool = False, + include_price: bool = False, + include_available_at: bool = False, ) -> TimeSeriesDataset: """Create synthetic forecasting dataset for testing. @@ -99,6 +103,9 @@ def create_synthetic_forecasting_dataset( # noqa: PLR0913, PLR0917 - complex fu radiation_influence: Coefficient for radiation component on load. stochastic_influence: Coefficient for random noise component. other_components: Additional components with their influence coefficients. + include_atmosphere: Add ``pressure`` (~1013) and ``relative_humidity`` (~70%) columns. + include_price: Add ``day_ahead_electricity_price`` (~50) column. + include_available_at: Add ``available_at`` column (index + sample_interval). Returns: TimeSeriesDataset containing synthetic load and component data. @@ -124,11 +131,21 @@ def create_synthetic_forecasting_dataset( # noqa: PLR0913, PLR0917 - complex fu load += component * influence components[component_name] = component + extras: dict[str, Any] = {} + if include_atmosphere: + extras["pressure"] = 1013.0 + rng.normal(0, 5, len(timestamps)) + extras["relative_humidity"] = 70.0 + rng.normal(0, 10, len(timestamps)) + if include_price: + extras["day_ahead_electricity_price"] = 50.0 + rng.normal(0, 10, len(timestamps)) + if include_available_at: + extras["available_at"] = timestamps + sample_interval + return TimeSeriesDataset( data=pd.DataFrame( data={ "load": load, **components, + **extras, }, index=timestamps, ), diff --git a/packages/openstef-core/src/openstef_core/types.py b/packages/openstef-core/src/openstef_core/types.py index 6c989c84d..fa01686e6 100644 --- a/packages/openstef-core/src/openstef_core/types.py +++ b/packages/openstef-core/src/openstef_core/types.py @@ -119,11 +119,12 @@ def to_hours(self) -> float: class AvailableAt(PydanticStringPrimitive): """Represents a time point available relative to a reference day. - Uses a specialized string format 'DnTHH:MM' where: + Uses a specialized string format 'DnTHHMM' where: - n is the day offset (negative indicates prior days) - - HH:MM is the time of day + - HHMM is the time of day - For example, 'D-1T06:00' means "6:00 AM on the previous day". + For example, 'D-1T0600' means "6:00 AM on the previous day". + The legacy 'DnTHH:MM' format (with colon) is also accepted by from_string(). Example: Creating and using availability times: @@ -132,7 +133,7 @@ class AvailableAt(PydanticStringPrimitive): >>> # Available at 6 AM on the previous day >>> at = AvailableAt(timedelta(hours=18)) # 18 hours before day end >>> str(at) - 'D-1T06:00' + 'D-1T0600' >>> # Available at midnight of the current day >>> AvailableAt.from_string('D0T00:00').lag_from_day datetime.timedelta(0) @@ -143,21 +144,21 @@ def __init__(self, lag_from_day: timedelta): self.lag_from_day = lag_from_day def __str__(self) -> str: - """Converts to string in 'DnTHH:MM' format. + """Converts to string in 'DnTHHMM' format (Windows-safe, no colon). Returns: - String representation in 'DnTHH:MM' format. + String representation in 'DnTHHMM' format. """ lag_days = -int(self.lag_from_day / timedelta(days=1)) - 1 time = timedelta(hours=24) - (self.lag_from_day % timedelta(days=1)) - return f"D{lag_days}T{time.seconds // 3600:02}:{(time.seconds // 60) % 60:02}" + return f"D{lag_days}T{time.seconds // 3600:02}{(time.seconds // 60) % 60:02}" @classmethod def from_string(cls, s: str) -> Self: - """Creates an instance from a string in 'DnTHH:MM' format. + """Creates an instance from a string in 'DnTHHMM' or 'DnTHH:MM' format. Args: - s: String in 'DnTHH:MM' format to parse. + s: String in 'DnTHHMM' or 'DnTHH:MM' format to parse. Returns: AvailableAt instance parsed from the string. @@ -165,7 +166,7 @@ def from_string(cls, s: str) -> Self: Raises: ValueError: If the string format is invalid. """ - match = re.match(r"D(-?\d+)T(\d{2}):(\d{2})", s) + match = re.match(r"D(-?\d+)T(\d{2}):?(\d{2})", s) if not match: error_message = f"Cannot convert {s} to {cls.__name__}" raise ValueError(error_message) diff --git a/packages/openstef-core/src/openstef_core/utils/pandas.py b/packages/openstef-core/src/openstef_core/utils/pandas.py index f714962e7..9466c353e 100644 --- a/packages/openstef-core/src/openstef_core/utils/pandas.py +++ b/packages/openstef-core/src/openstef_core/utils/pandas.py @@ -46,6 +46,21 @@ def unsafe_sorted_range_slice_idxs( return int(start_idx), int(end_idx) +def normalize_to_unit_sum(df: pd.DataFrame) -> pd.DataFrame: + """Normalize each column so absolute values sum to 1.0. + + Pipe-compatible: ``df.pipe(normalize_to_unit_sum)``. + + Columns that sum to zero are left as zeros (no NaN). + + Returns: + DataFrame with the same shape, each column normalized to unit sum. + """ + abs_values = df.abs() + totals = abs_values.sum(axis=0).replace(to_replace=0, value=1.0) # pyright: ignore[reportUnknownMemberType] + return abs_values / totals + + def combine_timeseries_indexes(indexes: Sequence[pd.DatetimeIndex]) -> pd.DatetimeIndex: """Combine multiple datetime indexes into a single sorted index. diff --git a/packages/openstef-core/tests/unit/test_types.py b/packages/openstef-core/tests/unit/test_types.py index b1f3af49f..0cbb40077 100644 --- a/packages/openstef-core/tests/unit/test_types.py +++ b/packages/openstef-core/tests/unit/test_types.py @@ -48,8 +48,8 @@ def test_lead_time_from_string_roundtrip(input_delta: timedelta): @pytest.mark.parametrize( ("lag_from_day", "expected_string"), [ - pytest.param(timedelta(hours=18), "D-1T06:00", id="D-1T06:00"), - pytest.param(timedelta(hours=12 + 24), "D-2T12:00", id="D-2T12:00"), + pytest.param(timedelta(hours=18), "D-1T0600", id="D-1T0600"), + pytest.param(timedelta(hours=12 + 24), "D-2T1200", id="D-2T1200"), ], ) def test_available_at_str(lag_from_day: timedelta, expected_string: str): diff --git a/packages/openstef-meta/pyproject.toml b/packages/openstef-meta/pyproject.toml index 0f620e63b..51519ea2c 100644 --- a/packages/openstef-meta/pyproject.toml +++ b/packages/openstef-meta/pyproject.toml @@ -2,6 +2,11 @@ # # SPDX-License-Identifier: MPL-2.0 +[build-system] +build-backend = "hatchling.build" + +requires = [ "hatchling" ] + [project] name = "openstef-meta" version = "0.0.0" diff --git a/packages/openstef-meta/src/openstef_meta/models/ensemble_forecasting_model.py b/packages/openstef-meta/src/openstef_meta/models/ensemble_forecasting_model.py index f107dd586..2d9bf4937 100644 --- a/packages/openstef-meta/src/openstef_meta/models/ensemble_forecasting_model.py +++ b/packages/openstef-meta/src/openstef_meta/models/ensemble_forecasting_model.py @@ -28,6 +28,7 @@ from openstef_core.mixins import HyperParams, TransformPipeline from openstef_core.types import LeadTime, Quantile from openstef_meta.models.forecast_combiners.forecast_combiner import ForecastCombiner +from openstef_models.explainability.mixins import ExplainableForecaster from openstef_models.models.forecasting.forecaster import Forecaster from openstef_models.models.forecasting_model import BaseForecastingModel, ModelFitResult, restore_target @@ -43,6 +44,18 @@ class EnsembleModelFitResult(ModelFitResult): forecaster_fit_results: dict[str, ModelFitResult] = Field(description="ModelFitResult for each base forecaster") + @override + def metrics_to_flat_dict(self) -> dict[str, float]: + result = super().metrics_to_flat_dict() + for name, child in self.forecaster_fit_results.items(): + result.update({f"{name}_{k}": v for k, v in child.metrics_to_flat_dict().items()}) + return result + + @property + @override + def component_fit_results(self) -> dict[str, ModelFitResult]: + return self.forecaster_fit_results + class EnsembleForecastingModel(BaseForecastingModel): """Ensemble forecasting pipeline: common preprocessing -> N forecasters -> combiner. @@ -124,8 +137,12 @@ def _validate_horizons_consistent(self) -> Self: Validated model instance. Raises: - ValueError: If any forecaster's horizons differ from the combiner's. + ValueError: If forecasters dict is empty or any forecaster's horizons differ from the combiner's. """ + if not self.forecasters: + msg = "At least one forecaster is required." + raise ValueError(msg) + expected = sorted(self.combiner.horizons) for name, forecaster in self.forecasters.items(): if sorted(forecaster.horizons) != expected: @@ -161,6 +178,23 @@ def hyperparams(self) -> HyperParams: def is_fitted(self) -> bool: return all(f.is_fitted for f in self.forecasters.values()) and self.combiner.is_fitted + @property + @override + def component_hyperparams(self) -> dict[str, HyperParams]: + return {name: f.hparams for name, f in self.forecasters.items()} + + @override + def get_explainable_components(self) -> dict[str, ExplainableForecaster]: + components: dict[str, ExplainableForecaster] = { + name: forecaster + for name, forecaster in self.forecasters.items() + if isinstance(forecaster, ExplainableForecaster) + } + # ForecastCombiner is always ExplainableForecaster, but skip if importances are empty + if not self.combiner.feature_importances.empty: + components["combiner"] = self.combiner + return components + @property def forecaster_names(self) -> list[str]: """Returns the names of the underlying forecasters.""" @@ -231,6 +265,8 @@ def _combine_datasets( ) def _transform_combiner_data(self, data: TimeSeriesDataset) -> ForecastInputDataset | None: + # Returns None when no combiner preprocessing is configured, signalling the combiner + # should work without additional features. if len(self.combiner_preprocessing.transforms) == 0: return None combiner_data = self.combiner_preprocessing.transform(data) @@ -242,7 +278,8 @@ def _fit_prepare_combiner_data( data_val: TimeSeriesDataset | None = None, data_test: TimeSeriesDataset | None = None, ) -> tuple[ForecastInputDataset | None, ForecastInputDataset | None, ForecastInputDataset | None]: - + # Fits combiner preprocessing on train data and transforms all splits. + # Returns (None, None, None) when no combiner preprocessing is configured. if len(self.combiner_preprocessing.transforms) == 0: return None, None, None self.combiner_preprocessing.fit(data=data) @@ -281,7 +318,8 @@ def _fit_forecasters( EnsembleForecastDataset | None, dict[str, ModelFitResult], ]: - + # Fits common + per-forecaster preprocessing, trains each forecaster, + # and bundles their in-sample predictions into EnsembleForecastDatasets. predictions_train: dict[str, ForecastDataset] = {} predictions_val: dict[str, ForecastDataset | None] = {} predictions_test: dict[str, ForecastDataset | None] = {} @@ -291,10 +329,8 @@ def _fit_forecasters( self.preprocessing.fit(data=data) data_transformed = self.preprocessing.transform(data=data) # Fit per-forecaster transforms on the common-preprocessed output (not raw data) - [ + for name in self.model_specific_preprocessing: self.model_specific_preprocessing[name].fit(data=data_transformed) - for name in self.model_specific_preprocessing - ] logger.debug("Completed fitting preprocessing pipelines.") # Fit the forecasters @@ -414,6 +450,7 @@ def _fit_forecaster( return prediction_train, prediction_val, prediction_test, result def _predict_forecaster(self, input_data: ForecastInputDataset, forecaster_name: str) -> ForecastDataset: + # Postprocessing is applied per-forecaster so the combiner sees final-scale predictions. logger.debug("Predicting forecaster '%s'.", forecaster_name) prediction_raw = self.forecasters[forecaster_name].predict(data=input_data) # Apply postprocessing per-forecaster so the combiner sees final-scale predictions @@ -445,7 +482,7 @@ def prepare_forecaster_input( """Prepare input data for a specific base forecaster. Applies common preprocessing, then model-specific preprocessing, restores - the target column, and trims history. + the target column, and trims history via the shared base ``prepare_input``. Args: data: Raw time series dataset. @@ -456,29 +493,32 @@ def prepare_forecaster_input( Processed forecast input dataset ready for the named forecaster. """ logger.debug("Preparing input data for forecaster '%s'.", forecaster_name) - input_data = self.preprocessing.transform(data=data) + # Apply model-specific preprocessing on top of the common pipeline if forecaster_name in self.model_specific_preprocessing: logger.debug("Applying model-specific preprocessing for forecaster '%s'.", forecaster_name) - input_data = self.model_specific_preprocessing[forecaster_name].transform(data=input_data) - input_data = restore_target(dataset=input_data, original_dataset=data, target_column=self.target_column) - - # Cut away input history to avoid training on incomplete data - input_data_start = cast("pd.Series[pd.Timestamp]", input_data.index).min().to_pydatetime() - input_data_cutoff = input_data_start + self.cutoff_history - if forecast_start is not None and forecast_start < input_data_cutoff: - input_data_cutoff = forecast_start - self._logger.warning( - "Forecast start %s is after input data start + cutoff history %s. Using forecast start as cutoff.", - forecast_start, - input_data_cutoff, + preprocessed = self.preprocessing.transform(data=data) + preprocessed = self.model_specific_preprocessing[forecaster_name].transform(data=preprocessed) + preprocessed = restore_target(dataset=preprocessed, original_dataset=data, target_column=self.target_column) + # Apply cutoff and create ForecastInputDataset + input_data_start = cast("pd.Series[pd.Timestamp]", preprocessed.index).min().to_pydatetime() + input_data_cutoff = input_data_start + self.cutoff_history + if forecast_start is not None and forecast_start < input_data_cutoff: + input_data_cutoff = forecast_start + self._logger.warning( + "Forecast start %s is before input data start + cutoff history %s. Using forecast start as cutoff.", + forecast_start, + input_data_cutoff, + ) + preprocessed = preprocessed.filter_by_range(start=input_data_cutoff) + + return ForecastInputDataset.from_timeseries( + dataset=preprocessed, + target_column=self.target_column, + forecast_start=forecast_start, ) - input_data = input_data.filter_by_range(start=input_data_cutoff) - return ForecastInputDataset.from_timeseries( - dataset=input_data, - target_column=self.target_column, - forecast_start=forecast_start, - ) + # No model-specific preprocessing — delegate entirely to shared base method + return self.prepare_input(data=data, forecast_start=forecast_start) def _predict_transform_combiner( self, ensemble_dataset: EnsembleForecastDataset, original_data: TimeSeriesDataset @@ -506,7 +546,6 @@ def _fit_combiner( val_ensemble_dataset: EnsembleForecastDataset | None = None, test_ensemble_dataset: EnsembleForecastDataset | None = None, ) -> ModelFitResult: - # Prepare additional features for the combiner (e.g. sample weights) — split separately from ensemble data features_train, features_val, features_test = self._fit_prepare_combiner_data( data=data, data_val=data_val, data_test=data_test @@ -556,11 +595,6 @@ def _predict_contributions_combiner( features = self._transform_combiner_data(data=original_data) return self.combiner.predict_contributions(ensemble_dataset, additional_features=features) - @override - def _predict(self, input_data: ForecastInputDataset) -> ForecastDataset: - msg = "EnsembleForecastingModel does not support single-input _predict; use predict() instead." - raise NotImplementedError(msg) - @override def predict(self, data: TimeSeriesDataset, forecast_start: datetime | None = None) -> ForecastDataset: """Generate forecasts for the provided dataset. diff --git a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/forecast_combiner.py b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/forecast_combiner.py index a2ca8fb5a..b3244088f 100644 --- a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/forecast_combiner.py +++ b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/forecast_combiner.py @@ -10,7 +10,6 @@ from abc import ABC, abstractmethod from typing import Self -import pandas as pd from pydantic import ConfigDict, Field from openstef_core.base_model import BaseConfig @@ -19,9 +18,10 @@ from openstef_core.mixins import Predictor from openstef_core.mixins.predictor import HyperParams from openstef_core.types import LeadTime, Quantile +from openstef_models.explainability.mixins import ExplainableForecaster -class ForecastCombiner(BaseConfig, Predictor[EnsembleForecastDataset, ForecastDataset], ABC): +class ForecastCombiner(BaseConfig, Predictor[EnsembleForecastDataset, ForecastDataset], ExplainableForecaster, ABC): """Combines base Forecaster predictions for each quantile into final predictions. Subclasses implement specific combination strategies (stacking, learned weights, @@ -127,8 +127,3 @@ def predict_contributions( Returns: TimeSeriesDataset where columns are features/models and rows are timesteps. """ - - @property - @abstractmethod - def feature_importances(self) -> pd.DataFrame: - """Aggregate feature importances from the combiner's internal models.""" diff --git a/packages/openstef-meta/src/openstef_meta/presets/forecasting_workflow.py b/packages/openstef-meta/src/openstef_meta/presets/forecasting_workflow.py index b24c3080b..d10916eba 100644 --- a/packages/openstef-meta/src/openstef_meta/presets/forecasting_workflow.py +++ b/packages/openstef-meta/src/openstef_meta/presets/forecasting_workflow.py @@ -74,6 +74,7 @@ class EnsembleForecastingWorkflowConfig(BaseConfig): """Configuration for ensemble forecasting workflows.""" + kind: Literal["ensemble"] = Field(default="ensemble", description="Discriminator tag for config type.") model_id: ModelIdentifier # Ensemble configuration diff --git a/packages/openstef-models/src/openstef_models/integrations/mlflow/__init__.py b/packages/openstef-models/src/openstef_models/integrations/mlflow/__init__.py index b95ebd898..3de518393 100644 --- a/packages/openstef-models/src/openstef_models/integrations/mlflow/__init__.py +++ b/packages/openstef-models/src/openstef_models/integrations/mlflow/__init__.py @@ -17,16 +17,10 @@ from .mlflow_storage import MLFlowStorage from .mlflow_storage_callback import ( - EnsembleFitResult, - EnsembleModel, - ExplainableEnsembleModel, MLFlowStorageCallback, ) __all__ = [ - "EnsembleFitResult", - "EnsembleModel", - "ExplainableEnsembleModel", "MLFlowStorage", "MLFlowStorageCallback", ] diff --git a/packages/openstef-models/src/openstef_models/integrations/mlflow/mlflow_storage_callback.py b/packages/openstef-models/src/openstef_models/integrations/mlflow/mlflow_storage_callback.py index 6eefe3ae2..558e79bd6 100644 --- a/packages/openstef-models/src/openstef_models/integrations/mlflow/mlflow_storage_callback.py +++ b/packages/openstef-models/src/openstef_models/integrations/mlflow/mlflow_storage_callback.py @@ -5,25 +5,29 @@ """MLflow integration for tracking and storing forecasting workflows. Provides a single callback for logging model training runs, artifacts, -and metrics to MLflow. Supports both single-model (ForecastingModel) and -ensemble (EnsembleForecastingModel) workflows via protocol-based dispatch. - -Ensemble-specific behavior is enabled automatically when the model satisfies -the ``EnsembleModel`` and ``ExplainableEnsembleModel`` protocols, and when the -fit result satisfies ``EnsembleFitResult``: - -- Logs combiner hyperparameters as the primary hyperparams -- Logs per-forecaster hyperparameters with name-prefixed keys -- Stores per-forecaster training data as separate artifacts -- Logs per-forecaster evaluation metrics with name-prefixed keys -- Stores feature importance plots for each explainable forecaster component -- Stores combiner feature importance plots +and metrics to MLflow. The callback is model-agnostic — it delegates to +polymorphic methods on ``BaseForecastingModel`` and ``ModelFitResult`` so +it works unchanged for both single-model and ensemble workflows. + +Key behaviours: + +- Logs model hyperparameters, plus per-component hyperparameters via + ``model.component_hyperparams`` (e.g. per-forecaster in an ensemble). +- Stores training data, plus per-component datasets via + ``result.component_fit_results``. +- Collects evaluation metrics via ``result.metrics_to_flat_dict()``; + subclasses embed child metrics automatically. +- Stores feature-importance plots for every explainable component + exposed by ``model.get_explainable_components()``. +- Supports model reuse (skip re-fit if a recent run exists) and + model selection (keep the better model based on a configurable metric + with a bias-towards-newer penalty). """ import logging from datetime import UTC, datetime, timedelta from pathlib import Path -from typing import Any, Protocol, cast, override, runtime_checkable +from typing import Any, cast, override from mlflow.entities import Run from pydantic import Field, PrivateAttr @@ -41,52 +45,20 @@ SkipFitting, ) from openstef_core.types import Q, QuantileOrGlobal -from openstef_models.explainability import ExplainableForecaster from openstef_models.integrations.mlflow.mlflow_storage import MLFlowStorage from openstef_models.mixins.callbacks import WorkflowContext -from openstef_models.models.forecasting.forecaster import Forecaster -from openstef_models.models.forecasting_model import BaseForecastingModel, ForecastingModel, ModelFitResult +from openstef_models.models.forecasting_model import BaseForecastingModel, ModelFitResult from openstef_models.workflows.custom_forecasting_workflow import ( CustomForecastingWorkflow, ForecastingCallback, ) -@runtime_checkable -class EnsembleModel(Protocol): - """Protocol for ensemble models with multiple base forecasters.""" - - @property - def forecasters(self) -> dict[str, Forecaster]: - """Return a dictionary of forecasters keyed by name.""" - ... - - -@runtime_checkable -class ExplainableEnsembleModel(Protocol): - """Protocol for ensemble models with an explainable forecast combiner.""" - - @property - def combiner(self) -> ExplainableForecaster: - """Return the explainable forecast combiner.""" - ... - - -@runtime_checkable -class EnsembleFitResult(Protocol): - """Protocol for fit results that contain per-forecaster results.""" - - @property - def forecaster_fit_results(self) -> dict[str, ModelFitResult]: - """Return per-forecaster fit results.""" - ... - - class MLFlowStorageCallback(BaseConfig, ForecastingCallback): """MLFlow callback for logging forecasting workflow events. - Handles both single-model and ensemble workflows via protocol-based - dispatch. + Model-agnostic: delegates to polymorphic methods on the model and fit result + for child hyperparams, child data, metrics, and feature importances. """ storage: MLFlowStorage = Field(default_factory=MLFlowStorage) @@ -161,19 +133,23 @@ def on_fit_end( run_id: str = run.info.run_id self._logger.info("Created MLflow run %s for model %s", run_id, context.workflow.model_id) - # Log per-forecaster hyperparams for ensemble models - if isinstance(context.workflow.model, EnsembleModel): - self._log_forecaster_hyperparams(context.workflow.model, run_id) + # Log per-component hyperparams + for name, hparams in context.workflow.model.component_hyperparams.items(): + prefixed = {f"{name}.{k}": str(v) for k, v in hparams.model_dump().items()} + self.storage.log_hyperparams(run_id=run_id, params=prefixed) - # Store the model input and per-forecaster data + # Store the model input data run_path = self.storage.get_artifacts_path(model_id=context.workflow.model_id, run_id=run_id) data_path = run_path / self.storage.data_path data_path.mkdir(parents=True, exist_ok=True) result.input_dataset.to_parquet(path=data_path / "data.parquet") self._logger.info("Stored training data at %s for run %s", data_path, run_id) - if isinstance(result, EnsembleFitResult): - self._store_forecaster_data(result.forecaster_fit_results, data_path) + # Store per-component training data + for name, component_result in result.component_fit_results.items(): + component_path = data_path / name + component_path.mkdir(parents=True, exist_ok=True) + component_result.input_dataset.to_parquet(path=component_path / "data.parquet") # Store feature importance plots if self.store_feature_importance_plot: @@ -188,7 +164,7 @@ def on_fit_end( self._logger.info("Stored trained model for run %s", run_id) # Format the metrics for MLflow - metrics = self._collect_metrics(result) + metrics = result.metrics_to_flat_dict() # Mark the run as finished self.storage.finalize_run(model_id=context.workflow.model_id, run_id=run_id, metrics=metrics) @@ -267,65 +243,14 @@ def _run_model_selection(self, workflow: CustomForecastingWorkflow, result: Mode ) raise SkipFitting("New model did not improve monitored metric, skipping re-fit.") - def _log_forecaster_hyperparams(self, model: EnsembleModel, run_id: str) -> None: - """Log per-forecaster hyperparameters to the run.""" - for name, forecaster in model.forecasters.items(): - prefixed_params = {f"{name}.{k}": str(v) for k, v in forecaster.hparams.model_dump().items()} - self.storage.log_hyperparams(run_id=run_id, params=prefixed_params) - self._logger.debug("Logged hyperparams for forecaster '%s' in run %s", name, run_id) - - def _store_forecaster_data(self, forecaster_fit_results: dict[str, ModelFitResult], data_path: Path) -> None: - """Store per-forecaster training data as separate parquet files.""" - for name, forecaster_result in forecaster_fit_results.items(): - forecaster_data_path = data_path / name - forecaster_data_path.mkdir(parents=True, exist_ok=True) - forecaster_result.input_dataset.to_parquet(path=forecaster_data_path / "data.parquet") - self._logger.debug("Stored training data for forecaster '%s' at %s", name, forecaster_data_path) - - def _collect_metrics(self, result: ModelFitResult) -> dict[str, float]: - """Collect all metrics from the fit result, including per-forecaster metrics for ensembles. - - Returns: - Flat dictionary mapping metric names to values, including per-forecaster prefixed metrics. - """ - metrics = self.metrics_to_dict(metrics=result.metrics_full, prefix="full_") - metrics.update(self.metrics_to_dict(metrics=result.metrics_train, prefix="train_")) - if result.metrics_val is not None: - metrics.update(self.metrics_to_dict(metrics=result.metrics_val, prefix="val_")) - if result.metrics_test is not None: - metrics.update(self.metrics_to_dict(metrics=result.metrics_test, prefix="test_")) - - if isinstance(result, EnsembleFitResult): - for name, forecaster_result in result.forecaster_fit_results.items(): - metrics.update(self.metrics_to_dict(metrics=forecaster_result.metrics_full, prefix=f"{name}_full_")) - metrics.update(self.metrics_to_dict(metrics=forecaster_result.metrics_train, prefix=f"{name}_train_")) - if forecaster_result.metrics_val is not None: - metrics.update(self.metrics_to_dict(metrics=forecaster_result.metrics_val, prefix=f"{name}_val_")) - if forecaster_result.metrics_test is not None: - metrics.update(self.metrics_to_dict(metrics=forecaster_result.metrics_test, prefix=f"{name}_test_")) - - return metrics - @staticmethod def _store_feature_importances(model: BaseForecastingModel, data_path: Path) -> None: - """Store feature importance plots for all explainable components of the model.""" - if isinstance(model, EnsembleModel): - # Ensemble model: store per-forecaster feature importances - for name, forecaster in model.forecasters.items(): - if isinstance(forecaster, ExplainableForecaster): - fig = forecaster.plot_feature_importances() - fig.write_html(data_path / f"feature_importances_{name}.html") # pyright: ignore[reportUnknownMemberType] - elif isinstance(model, ForecastingModel) and isinstance(model.forecaster, ExplainableForecaster): - # Single model: store feature importance - fig = model.forecaster.plot_feature_importances() - fig.write_html(data_path / "feature_importances.html") # pyright: ignore[reportUnknownMemberType] - - # Store combiner feature importances (if model has an explainable combiner) - if isinstance(model, ExplainableEnsembleModel): - combiner_fi = model.combiner.feature_importances - if not combiner_fi.empty: - fig = model.combiner.plot_feature_importances() - fig.write_html(data_path / "feature_importances_combiner.html") # pyright: ignore[reportUnknownMemberType] + for name, component in model.get_explainable_components().items(): + if component.feature_importances.empty: + continue + suffix = f"_{name}" if name else "" + fig = component.plot_feature_importances() + fig.write_html(data_path / f"feature_importances{suffix}.html") # pyright: ignore[reportUnknownMemberType] def _find_run(self, model_id: str, run_name: str | None) -> Run | None: """Find an MLflow run by model_id and optional run_name. @@ -441,35 +366,18 @@ def _check_is_new_model_better( quantile, ) + # Penalty biases selection towards newer models: + # higher_is_better: lower the bar by dividing old metric by penalty + # lower_is_better: raise the bar by multiplying old metric by penalty match direction: case "higher_is_better" if new_metric >= old_metric / self.model_selection_old_model_penalty: return True - case "lower_is_better" if new_metric <= old_metric / self.model_selection_old_model_penalty: + case "lower_is_better" if new_metric <= old_metric * self.model_selection_old_model_penalty: return True case _: return False - @staticmethod - def metrics_to_dict(metrics: SubsetMetric, prefix: str) -> dict[str, float]: - """Convert SubsetMetric to a flat dictionary for MLflow logging. - - Args: - metrics: The metrics to convert. - prefix: Prefix to add to each metric key (e.g. "full_", "train_"). - - Returns: - Flat dictionary mapping metric names to values. - """ - return { - f"{prefix}{quantile}_{metric_name}": value - for quantile, metrics_dict in metrics.metrics.items() - for metric_name, value in metrics_dict.items() - } - __all__ = [ - "EnsembleFitResult", - "EnsembleModel", - "ExplainableEnsembleModel", "MLFlowStorageCallback", ] diff --git a/packages/openstef-models/src/openstef_models/models/forecasting/gblinear_forecaster.py b/packages/openstef-models/src/openstef_models/models/forecasting/gblinear_forecaster.py index de18234e7..b181b7bd5 100644 --- a/packages/openstef-models/src/openstef_models/models/forecasting/gblinear_forecaster.py +++ b/packages/openstef-models/src/openstef_models/models/forecasting/gblinear_forecaster.py @@ -23,6 +23,7 @@ from openstef_core.datasets.validated_datasets import ForecastDataset, ForecastInputDataset from openstef_core.exceptions import InputValidationError, MissingExtraError, NotFittedError from openstef_core.mixins.predictor import HyperParams +from openstef_core.utils.pandas import normalize_to_unit_sum from openstef_models.explainability.mixins import ContributionsMixin, ExplainableForecaster from openstef_models.models.forecasting.forecaster import Forecaster from openstef_models.utils.evaluation_functions import EvaluationFunctionType, get_evaluation_function @@ -344,7 +345,4 @@ def feature_importances(self) -> pd.DataFrame: weights_df.index.name = "feature_name" weights_df.columns.name = "quantiles" - weights_abs = weights_df.abs() - total = weights_abs.sum(axis=0).replace(to_replace=0, value=1.0) # pyright: ignore[reportUnknownMemberType] - - return weights_abs / total + return weights_df.pipe(normalize_to_unit_sum) diff --git a/packages/openstef-models/src/openstef_models/models/forecasting/lgbm_forecaster.py b/packages/openstef-models/src/openstef_models/models/forecasting/lgbm_forecaster.py index 4c5aea656..ffba97309 100644 --- a/packages/openstef-models/src/openstef_models/models/forecasting/lgbm_forecaster.py +++ b/packages/openstef-models/src/openstef_models/models/forecasting/lgbm_forecaster.py @@ -21,6 +21,7 @@ NotFittedError, ) from openstef_core.mixins import HyperParams +from openstef_core.utils.pandas import normalize_to_unit_sum from openstef_models.explainability.mixins import ContributionsMixin, ExplainableForecaster from openstef_models.models.forecasting.forecaster import Forecaster from openstef_models.utils.multi_quantile_regressor import MultiQuantileRegressor @@ -334,10 +335,7 @@ def feature_importances(self) -> pd.DataFrame: weights_df.index.name = "feature_name" weights_df.columns.name = "quantiles" - weights_abs = weights_df.abs() - total = weights_abs.sum(axis=0).replace(to_replace=0, value=1.0) # pyright: ignore[reportUnknownMemberType] - - return weights_abs / total + return weights_df.pipe(normalize_to_unit_sum) __all__ = ["LGBMForecaster", "LGBMHyperParams"] diff --git a/packages/openstef-models/src/openstef_models/models/forecasting/lgbmlinear_forecaster.py b/packages/openstef-models/src/openstef_models/models/forecasting/lgbmlinear_forecaster.py index 204cabd3f..a315e702b 100644 --- a/packages/openstef-models/src/openstef_models/models/forecasting/lgbmlinear_forecaster.py +++ b/packages/openstef-models/src/openstef_models/models/forecasting/lgbmlinear_forecaster.py @@ -20,6 +20,7 @@ NotFittedError, ) from openstef_core.mixins import HyperParams +from openstef_core.utils.pandas import normalize_to_unit_sum from openstef_models.explainability.mixins import ContributionsMixin, ExplainableForecaster from openstef_models.models.forecasting.forecaster import Forecaster from openstef_models.utils.multi_quantile_regressor import MultiQuantileRegressor @@ -336,10 +337,7 @@ def feature_importances(self) -> pd.DataFrame: weights_df.index.name = "feature_name" weights_df.columns.name = "quantiles" - weights_abs = weights_df.abs() - total = weights_abs.sum(axis=0).replace(to_replace=0, value=1.0) # pyright: ignore[reportUnknownMemberType] - - return weights_abs / total + return weights_df.pipe(normalize_to_unit_sum) __all__ = ["LGBMLinearForecaster", "LGBMLinearHyperParams"] diff --git a/packages/openstef-models/src/openstef_models/models/forecasting/xgboost_forecaster.py b/packages/openstef-models/src/openstef_models/models/forecasting/xgboost_forecaster.py index 40b43101f..87a956378 100644 --- a/packages/openstef-models/src/openstef_models/models/forecasting/xgboost_forecaster.py +++ b/packages/openstef-models/src/openstef_models/models/forecasting/xgboost_forecaster.py @@ -19,6 +19,7 @@ from openstef_core.datasets import ForecastDataset, ForecastInputDataset, TimeSeriesDataset from openstef_core.exceptions import MissingExtraError, NotFittedError from openstef_core.mixins import HyperParams +from openstef_core.utils.pandas import normalize_to_unit_sum from openstef_models.explainability.mixins import ContributionsMixin, ExplainableForecaster from openstef_models.models.forecasting.forecaster import Forecaster from openstef_models.utils.evaluation_functions import EvaluationFunctionType, get_evaluation_function @@ -417,10 +418,7 @@ def feature_importances(self) -> pd.DataFrame: weights_df.index.name = "feature_name" weights_df.columns.name = "quantiles" - weights_abs = weights_df.abs() - total = weights_abs.sum(axis=0).replace(to_replace=0, value=1.0) # pyright: ignore[reportUnknownMemberType] - - return weights_abs / total + return weights_df.pipe(normalize_to_unit_sum) __all__ = ["XGBoostForecaster", "XGBoostHyperParams"] diff --git a/packages/openstef-models/src/openstef_models/models/forecasting_model.py b/packages/openstef-models/src/openstef_models/models/forecasting_model.py index 7ca93fdb9..2aa922032 100644 --- a/packages/openstef-models/src/openstef_models/models/forecasting_model.py +++ b/packages/openstef-models/src/openstef_models/models/forecasting_model.py @@ -31,7 +31,7 @@ from openstef_core.exceptions import InsufficientlyCompleteError, NotFittedError from openstef_core.mixins import HyperParams, Predictor, TransformPipeline from openstef_core.types import LeadTime, Quantile -from openstef_models.explainability.mixins import ContributionsMixin +from openstef_models.explainability.mixins import ContributionsMixin, ExplainableForecaster from openstef_models.models.forecasting.forecaster import Forecaster from openstef_models.utils.data_split import DataSplitter @@ -62,14 +62,43 @@ class ModelFitResult(BaseModel): ) metrics_full: SubsetMetric = Field(description="Evaluation metrics computed on the full original dataset.") + def metrics_to_flat_dict(self) -> dict[str, float]: + """Flatten all split metrics into a single dict for logging. + + Keys are prefixed with ``full_``, ``train_``, ``val_``, ``test_`` respectively. + Subclasses with child results (e.g. per-forecaster) should override to include + them. + + Returns: + Flat mapping of metric names to values. + """ + result = self.metrics_full.to_flat_dict(prefix="full_") + result.update(self.metrics_train.to_flat_dict(prefix="train_")) + if self.metrics_val is not None: + result.update(self.metrics_val.to_flat_dict(prefix="val_")) + if self.metrics_test is not None: + result.update(self.metrics_test.to_flat_dict(prefix="test_")) + return result + + @property + def component_fit_results(self) -> dict[str, "ModelFitResult"]: + """Per-component fit results (e.g. per-forecaster in an ensemble). + + Returns: + Empty dict by default; ensemble subclasses override. + """ + return {} + class BaseForecastingModel(BaseModel, Predictor[TimeSeriesDataset, ForecastDataset]): """Abstract base for forecasting models (single-forecaster and ensemble). Provides the shared pipeline skeleton: preprocessing -> predict -> postprocessing, data preparation, scoring, and evaluation. Concrete subclasses must implement the - abstract hooks ``fit``, ``_predict``, ``is_fitted``, ``quantiles``, and - ``max_horizon``. + abstract hooks ``fit``, ``is_fitted``, ``quantiles``, and ``max_horizon``. + Subclasses following the single-input template method pattern should also override + ``_predict``; those with a different predict flow (e.g. ensemble) can override + ``predict()`` directly. Important: The ``cutoff_history`` parameter is crucial when using lag-based features in @@ -139,6 +168,26 @@ def hyperparams(self) -> HyperParams: """ return HyperParams() + @property + def component_hyperparams(self) -> dict[str, HyperParams]: + """Per-component hyperparameters (e.g. per-forecaster in an ensemble). + + Returns: + Empty dict by default; ensemble subclasses override. + """ + return {} + + def get_explainable_components(self) -> dict[str, ExplainableForecaster]: # noqa: PLR6301 + """Return named components that support feature-importance plotting. + + Keys are used as filename suffixes; an empty key means no suffix. + Override in subclasses to expose forecasters and/or combiners. + + Returns: + Empty dict by default. + """ + return {} + @abstractmethod @override def fit( @@ -158,13 +207,15 @@ def fit( Result containing training details and metrics. """ - @abstractmethod def _predict(self, input_data: ForecastInputDataset) -> ForecastDataset: """Generate raw predictions from preprocessed input data. - Subclasses implement the actual prediction logic (single-forecaster - delegation or ensemble aggregation). + Subclasses that follow the single-input template method pattern implement this. + Subclasses that require a different predict flow (e.g. ensemble) should override + ``predict()`` directly and may leave this unimplemented. """ + msg = f"{type(self).__name__} does not implement _predict; override predict() instead." + raise NotImplementedError(msg) def prepare_input( self, @@ -193,7 +244,7 @@ def prepare_input( if forecast_start is not None and forecast_start < input_data_cutoff: input_data_cutoff = forecast_start self._logger.warning( - "Forecast start %s is after input data start + cutoff history %s. Using forecast start as cutoff.", + "Forecast start %s is before input data start + cutoff history %s. Using forecast start as cutoff.", forecast_start, input_data_cutoff, ) @@ -365,6 +416,12 @@ def hyperparams(self) -> HyperParams: def is_fitted(self) -> bool: return self.forecaster.is_fitted + @override + def get_explainable_components(self) -> dict[str, ExplainableForecaster]: + if isinstance(self.forecaster, ExplainableForecaster): + return {"": self.forecaster} + return {} + @override def fit( self, diff --git a/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py b/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py index e7b7ad70b..483955400 100644 --- a/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py +++ b/packages/openstef-models/src/openstef_models/presets/forecasting_workflow.py @@ -106,6 +106,7 @@ class ForecastingWorkflowConfig(BaseConfig): # PredictionJob hyperparameters, location information, data columns, and feature engineering settings. """ + kind: Literal["single"] = Field(default="single", description="Discriminator tag for config type.") model_id: ModelIdentifier = Field(description="Unique identifier for the forecasting model.") run_name: str | None = Field( default=None, description="Optional name for this workflow run, can be used for versioning." diff --git a/packages/openstef-models/src/openstef_models/workflows/callbacks/__init__.py b/packages/openstef-models/src/openstef_models/workflows/callbacks/__init__.py new file mode 100644 index 000000000..57a25eb92 --- /dev/null +++ b/packages/openstef-models/src/openstef_models/workflows/callbacks/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +"""Workflow callbacks for data capture, persistence, and debugging.""" + +from openstef_models.workflows.callbacks.data_save import DataSaveCallback + +__all__ = ["DataSaveCallback"] diff --git a/packages/openstef-models/src/openstef_models/workflows/callbacks/data_save.py b/packages/openstef-models/src/openstef_models/workflows/callbacks/data_save.py new file mode 100644 index 000000000..50c9065b1 --- /dev/null +++ b/packages/openstef-models/src/openstef_models/workflows/callbacks/data_save.py @@ -0,0 +1,108 @@ +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +"""Data-saving callback for forecasting workflows. + +Saves intermediate datasets (training data, prepared inputs, forecasts, +contributions) to parquet files. Useful for debugging, backtesting analysis, +and inspecting model behaviour. +""" + +import logging +from pathlib import Path +from typing import Any, override + +import pandas as pd +from pydantic import Field + +from openstef_core.base_model import BaseConfig +from openstef_core.datasets import TimeSeriesDataset, VersionedTimeSeriesDataset +from openstef_core.datasets.validated_datasets import ForecastDataset +from openstef_models.mixins.callbacks import WorkflowContext +from openstef_models.models.forecasting_model import ModelFitResult +from openstef_models.workflows.custom_forecasting_workflow import ( + CustomForecastingWorkflow, + ForecastingCallback, +) + +_logger = logging.getLogger(__name__) + + +class DataSaveCallback(BaseConfig, ForecastingCallback): + """Saves intermediate datasets to parquet files during workflow execution. + + Toggle individual outputs via the boolean fields. All paths use + ``workflow.run_name`` as an identifier in the filename. + """ + + cache_dir: Path = Field(description="Directory to write parquet files to.") + save_training_data: bool = Field(default=True, description="Save raw training data on fit.") + save_prepared_data: bool = Field(default=True, description="Save preprocessed training data on fit.") + save_predict_data: bool = Field(default=True, description="Save prediction input data on predict.") + save_forecast: bool = Field(default=True, description="Save forecast output on predict.") + save_contributions: bool = Field(default=False, description="Save prediction contributions on predict.") + + @override + def model_post_init(self, context: Any) -> None: + self.cache_dir.mkdir(parents=True, exist_ok=True) + + @override + def on_fit_start( + self, + context: WorkflowContext[CustomForecastingWorkflow], + data: VersionedTimeSeriesDataset | TimeSeriesDataset, + ) -> None: + if self.save_prepared_data: + # Stash training data so on_fit_end can call prepare_input with it + context.data["_datasave_training_data"] = data + + if self.save_training_data: + run_name = context.workflow.run_name or "step" + data.to_parquet(path=self.cache_dir / f"debug_{run_name}_training.parquet") + + @override + def on_fit_end( + self, + context: WorkflowContext[CustomForecastingWorkflow], + result: ModelFitResult, + ) -> None: + if not self.save_prepared_data: + return + + training_data = context.data.pop("_datasave_training_data", None) + if not isinstance(training_data, TimeSeriesDataset): + return + + run_name = context.workflow.run_name or "step" + prepared = context.workflow.model.prepare_input(training_data) + prepared.to_parquet(path=self.cache_dir / f"debug_{run_name}_prepared_training.parquet") + + @override + def on_predict_end( + self, + context: WorkflowContext[CustomForecastingWorkflow], + data: VersionedTimeSeriesDataset | TimeSeriesDataset, + result: ForecastDataset, + ) -> None: + run_name = context.workflow.run_name or "step" + + if self.save_predict_data: + data.to_parquet(path=self.cache_dir / f"debug_{run_name}_predict.parquet") + + if self.save_forecast: + result.to_parquet(path=self.cache_dir / f"debug_{run_name}_forecast.parquet") + + if self.save_contributions and isinstance(data, TimeSeriesDataset): + try: + contributions = context.workflow.model.predict_contributions( + data, + forecast_start=result.forecast_start, + ) + except NotImplementedError: + return + df = pd.concat([contributions.data, result.data.drop(columns=["load"])], axis=1) + df.to_parquet(path=self.cache_dir / f"contrib_{run_name}_predict.parquet") + + +__all__ = ["DataSaveCallback"] diff --git a/packages/openstef-models/src/openstef_models/workflows/custom_forecasting_workflow.py b/packages/openstef-models/src/openstef_models/workflows/custom_forecasting_workflow.py index 82e7d6a14..ed56ffd10 100644 --- a/packages/openstef-models/src/openstef_models/workflows/custom_forecasting_workflow.py +++ b/packages/openstef-models/src/openstef_models/workflows/custom_forecasting_workflow.py @@ -11,6 +11,7 @@ import logging from datetime import datetime +from typing import Self from pydantic import Field, PrivateAttr @@ -129,6 +130,10 @@ class CustomForecastingWorkflow(BaseModel): _logger: logging.Logger = PrivateAttr(default_factory=lambda: logging.getLogger(__name__)) + def with_run_name(self, run_name: str) -> Self: + """Return a deep copy of this workflow with the given run name.""" + return self.model_copy(deep=True, update={"run_name": run_name}) + def fit( self, data: TimeSeriesDataset, diff --git a/pyproject.toml b/pyproject.toml index 4552636c6..15a43c8cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -331,6 +331,7 @@ sequence = [ { cmd = "uv add openstef-meta==${version} --optional meta" }, { cmd = "uv add openstef-models==${version} --optional models" }, { cmd = "uv add openstef-core==${version} --optional core" }, + { cmd = "uv add openstef-meta==${version} --optional models" }, { cmd = "uv add openstef-meta==${version} openstef-models==${version} openstef-beam==${version} openstef-core==${version} --optional all" }, { cmd = "uv add openstef-models==${version} openstef-core==${version}" }, ] diff --git a/uv.lock b/uv.lock index c6cf242b6..2fedd554b 100644 --- a/uv.lock +++ b/uv.lock @@ -2778,19 +2778,28 @@ dependencies = [ [package.optional-dependencies] all = [ + { name = "openstef-meta" }, + { name = "openstef-models" }, { name = "s3fs" }, ] +baselines = [ + { name = "openstef-meta" }, + { name = "openstef-models" }, +] [package.metadata] requires-dist = [ + { name = "openstef-beam", extras = ["baselines"], marker = "extra == 'all'", editable = "packages/openstef-beam" }, { name = "openstef-core", editable = "packages/openstef-core" }, + { name = "openstef-meta", marker = "extra == 'baselines'", editable = "packages/openstef-meta" }, + { name = "openstef-models", marker = "extra == 'baselines'", editable = "packages/openstef-models" }, { name = "plotly", specifier = ">=6.3" }, { name = "pyyaml", specifier = ">=6.0.2" }, { name = "s3fs", marker = "extra == 'all'", specifier = ">=2025.5.1" }, { name = "scoringrules", specifier = ">=0.8" }, { name = "tqdm", specifier = ">=4.67.1" }, ] -provides-extras = ["all"] +provides-extras = ["all", "baselines"] [[package]] name = "openstef-core"