From e2fe6e8881a39bd1930d3400c544b33c9efd89f5 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Fri, 13 Mar 2026 12:03:12 +0100 Subject: [PATCH 01/16] fix(STEF-2854): handle InsufficientlyCompleteError during backtest training OpenSTEF4BacktestForecaster.fit() now catches InsufficientlyCompleteError alongside FlatlinerDetectedError. When a training window has insufficient non-NaN data, the training event is skipped and the previous model is retained instead of crashing the entire target backtest. Signed-off-by: Egor Dmitriev --- .../benchmarking/baselines/openstef4.py | 10 +++++- .../benchmarking/baselines/test_openstef4.py | 34 +++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py index d6328e7f2..fc61088f9 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py @@ -30,7 +30,12 @@ ) from openstef_core.base_model import BaseModel from openstef_core.datasets import TimeSeriesDataset -from openstef_core.exceptions import FlatlinerDetectedError, MissingExtraError, NotFittedError +from openstef_core.exceptions import ( + FlatlinerDetectedError, + InsufficientlyCompleteError, + MissingExtraError, + NotFittedError, +) from openstef_core.types import Q from openstef_models.presets import ForecastingWorkflowConfig, create_forecasting_workflow from openstef_models.presets.forecasting_workflow import LocationConfig @@ -118,6 +123,9 @@ def fit(self, data: RestrictedHorizonVersionedTimeSeries) -> None: self._logger.warning("Flatliner detected during training") self._is_flatliner_detected = True return # Skip setting the workflow on flatliner detection + except InsufficientlyCompleteError: + self._logger.warning("Insufficient training data at %s, retaining previous model", data.horizon) + return # Retain previous model state; predictions will use the last successful fit self._workflow = workflow diff --git a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py index e9b66591a..705e86da8 100644 --- a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py +++ b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py @@ -4,6 +4,7 @@ from datetime import UTC, datetime, timedelta from pathlib import Path +from unittest.mock import patch import pytest @@ -11,6 +12,7 @@ from openstef_beam.benchmarking.baselines.openstef4 import create_openstef4_preset_backtest_forecaster from openstef_beam.benchmarking.benchmark_pipeline import BenchmarkContext, BenchmarkTarget from openstef_core.datasets import VersionedTimeSeriesDataset +from openstef_core.exceptions import InsufficientlyCompleteError from openstef_core.testing import create_synthetic_forecasting_dataset from openstef_core.types import LeadTime, Q from openstef_models.presets import ForecastingWorkflowConfig @@ -109,3 +111,35 @@ def test_fit_then_predict_returns_forecast( # Assert assert result is not None assert len(result.data) > 0 + + +def test_fit_retains_previous_model_on_insufficient_data( + xgboost_config: ForecastingWorkflowConfig, + benchmark_target: BenchmarkTarget, + training_data: VersionedTimeSeriesDataset, + tmp_path: Path, +): + """fit() should skip training and retain the previous model when data has all-NaN targets.""" + # Arrange + factory = create_openstef4_preset_backtest_forecaster( + workflow_config=xgboost_config, + cache_dir=tmp_path / "test_insufficient", + ) + forecaster = factory(BenchmarkContext(run_name="insufficient"), benchmark_target) + horizon = datetime(2024, 5, 25, tzinfo=UTC) + rhvts = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon) + + # First fit succeeds — establishes a baseline model + forecaster.fit(rhvts) + assert forecaster._workflow is not None + previous_workflow = forecaster._workflow + + # Act — patch workflow.fit to raise InsufficientlyCompleteError + with patch( + "openstef_models.workflows.custom_forecasting_workflow.CustomForecastingWorkflow.fit", + side_effect=InsufficientlyCompleteError("No training data available after dropping NaN targets"), + ): + forecaster.fit(rhvts) + + # Assert — previous model is retained + assert forecaster._workflow is previous_workflow From 9f830e914dd722e8e0cbdeaea626d6313acf2085 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Fri, 13 Mar 2026 12:26:00 +0100 Subject: [PATCH 02/16] test(STEF-2854): replace mock with real NaN data in insufficient-data test Use all-NaN load data with model_reuse_enable=False to trigger InsufficientlyCompleteError naturally instead of patching workflow.fit. Signed-off-by: Egor Dmitriev --- .../benchmarking/baselines/test_openstef4.py | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py index 705e86da8..173ee6f37 100644 --- a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py +++ b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py @@ -4,15 +4,14 @@ from datetime import UTC, datetime, timedelta from pathlib import Path -from unittest.mock import patch +import numpy as np import pytest from openstef_beam.backtesting.restricted_horizon_timeseries import RestrictedHorizonVersionedTimeSeries from openstef_beam.benchmarking.baselines.openstef4 import create_openstef4_preset_backtest_forecaster from openstef_beam.benchmarking.benchmark_pipeline import BenchmarkContext, BenchmarkTarget from openstef_core.datasets import VersionedTimeSeriesDataset -from openstef_core.exceptions import InsufficientlyCompleteError from openstef_core.testing import create_synthetic_forecasting_dataset from openstef_core.types import LeadTime, Q from openstef_models.presets import ForecastingWorkflowConfig @@ -114,32 +113,47 @@ def test_fit_then_predict_returns_forecast( def test_fit_retains_previous_model_on_insufficient_data( - xgboost_config: ForecastingWorkflowConfig, benchmark_target: BenchmarkTarget, training_data: VersionedTimeSeriesDataset, tmp_path: Path, ): """fit() should skip training and retain the previous model when data has all-NaN targets.""" - # Arrange + # Arrange — disable model reuse to avoid caching side-effects + config = ForecastingWorkflowConfig( + model_id="test_insufficient", + model="xgboost", + horizons=[LeadTime.from_string("PT24H")], + quantiles=[Q(0.5)], + model_reuse_enable=False, + ) factory = create_openstef4_preset_backtest_forecaster( - workflow_config=xgboost_config, + workflow_config=config, cache_dir=tmp_path / "test_insufficient", ) forecaster = factory(BenchmarkContext(run_name="insufficient"), benchmark_target) - horizon = datetime(2024, 5, 25, tzinfo=UTC) - rhvts = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon) # First fit succeeds — establishes a baseline model - forecaster.fit(rhvts) + horizon_good = datetime(2024, 5, 25, tzinfo=UTC) + rhvts_good = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon_good) + forecaster.fit(rhvts_good) assert forecaster._workflow is not None previous_workflow = forecaster._workflow - # Act — patch workflow.fit to raise InsufficientlyCompleteError - with patch( - "openstef_models.workflows.custom_forecasting_workflow.CustomForecastingWorkflow.fit", - side_effect=InsufficientlyCompleteError("No training data available after dropping NaN targets"), - ): - forecaster.fit(rhvts) + # Build a dataset with all-NaN load to trigger InsufficientlyCompleteError naturally + nan_ts = create_synthetic_forecasting_dataset( + start=datetime(2024, 2, 1, tzinfo=UTC), + length=timedelta(days=120), + sample_interval=timedelta(minutes=15), + include_atmosphere=True, + include_price=True, + include_available_at=True, + ) + nan_ts.data["load"] = np.nan + nan_dataset = VersionedTimeSeriesDataset([nan_ts]) + rhvts_nan = RestrictedHorizonVersionedTimeSeries(dataset=nan_dataset, horizon=datetime(2024, 5, 26, tzinfo=UTC)) + + # Act — fit with data that has no valid targets + forecaster.fit(rhvts_nan) - # Assert — previous model is retained + # Assert — previous model is retained (fit returned early without updating _workflow) assert forecaster._workflow is previous_workflow From c3a9fda247f21f8d0f571f0175da9d05bb811e56 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Fri, 13 Mar 2026 12:31:32 +0100 Subject: [PATCH 03/16] fix(STEF-2854): return None from predict() when no model fitted When the first fit fails due to InsufficientlyCompleteError, _workflow stays None. predict() now returns None (like flatliner) instead of raising NotFittedError, letting the benchmark pipeline skip gracefully. Signed-off-by: Egor Dmitriev --- .../benchmarking/baselines/openstef4.py | 4 ++-- .../benchmarking/baselines/test_openstef4.py | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py index fc61088f9..0811613be 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/baselines/openstef4.py @@ -34,7 +34,6 @@ FlatlinerDetectedError, InsufficientlyCompleteError, MissingExtraError, - NotFittedError, ) from openstef_core.types import Q from openstef_models.presets import ForecastingWorkflowConfig, create_forecasting_workflow @@ -136,7 +135,8 @@ def predict(self, data: RestrictedHorizonVersionedTimeSeries) -> TimeSeriesDatas return None if self._workflow is None: - raise NotFittedError("Must call fit() before predict()") + self._logger.info("No fitted model available, skipping prediction") + return None # Extract the dataset including both historical context and forecast period predict_data = data.get_window( diff --git a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py index 173ee6f37..176e387d1 100644 --- a/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py +++ b/packages/openstef-beam/tests/unit/benchmarking/baselines/test_openstef4.py @@ -157,3 +157,26 @@ def test_fit_retains_previous_model_on_insufficient_data( # Assert — previous model is retained (fit returned early without updating _workflow) assert forecaster._workflow is previous_workflow + + +def test_predict_returns_none_when_never_fitted( + xgboost_config: ForecastingWorkflowConfig, + benchmark_target: BenchmarkTarget, + training_data: VersionedTimeSeriesDataset, + tmp_path: Path, +): + """predict() should return None when no model has been fitted yet.""" + # Arrange + factory = create_openstef4_preset_backtest_forecaster( + workflow_config=xgboost_config, + cache_dir=tmp_path / "test_no_fit", + ) + forecaster = factory(BenchmarkContext(run_name="no_fit"), benchmark_target) + horizon = datetime(2024, 5, 25, tzinfo=UTC) + rhvts = RestrictedHorizonVersionedTimeSeries(dataset=training_data, horizon=horizon) + + # Act + result = forecaster.predict(rhvts) + + # Assert + assert result is None From 1ab0eef9398cf9b58620aa204d329040761aff40 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Fri, 13 Mar 2026 15:55:35 +0100 Subject: [PATCH 04/16] fix(STEF-2854): make WindowedMetricVisualization robust to missing data Skip runs/targets with no windowed metrics instead of raising ValueError. Returns an HTML placeholder when all items in a visualization are empty. Signed-off-by: Egor Dmitriev --- .../analysis/analysis_pipeline.py | 5 +- .../windowed_metric_visualization.py | 54 ++++++++++++++----- .../test_windowed_metric_visualization.py | 11 ++-- 3 files changed, 49 insertions(+), 21 deletions(-) diff --git a/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py b/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py index 78b6954c3..2c5752c0d 100644 --- a/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py @@ -103,10 +103,7 @@ def run_for_subsets( no providers support the requested aggregation level. """ return [ - provider.create( - reports=reports, - aggregation=aggregation, - ) + provider.create(reports=reports, aggregation=aggregation) for provider in self.config.visualization_providers if aggregation in provider.supported_aggregations ] diff --git a/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py b/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py index 249b8d5da..e59cd18ff 100644 --- a/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py +++ b/packages/openstef-beam/src/openstef_beam/analysis/visualizations/windowed_metric_visualization.py @@ -8,6 +8,7 @@ how performance metrics evolve across different time windows. """ +import logging import operator from collections import defaultdict from datetime import datetime @@ -23,6 +24,8 @@ from openstef_beam.evaluation import EvaluationSubsetReport, Window from openstef_core.types import Quantile +_logger = logging.getLogger(__name__) + class WindowedMetricVisualization(VisualizationProvider): """Creates time series plots showing metric evolution across evaluation windows. @@ -180,7 +183,8 @@ def create_by_none( time_value_pairs = self._extract_windowed_metric_values(report, metric_name, quantile_or_global) if not time_value_pairs: - raise ValueError("No windowed metrics found for the specified window and metric.") + _logger.warning("No windowed metrics for %s (%s) — skipping visualization.", metadata.name, self.name) + return self._empty_output(f"No windowed metrics available for {metadata.name}") # Unpack the sorted pairs timestamps = [pair[0] for pair in time_value_pairs] @@ -198,19 +202,23 @@ def create_by_none( return VisualizationOutput(name=self.name, figure=figure) + def _empty_output(self, message: str) -> VisualizationOutput: + return VisualizationOutput(name=self.name, html=f"

{message}

") + @override def create_by_run_and_none(self, reports: dict[RunName, list[ReportTuple]]) -> VisualizationOutput: metric_name, quantile_or_global = self._get_metric_info() plotter = WindowedMetricPlotter() + has_data = False # Collect data for each run for run_name, report_pairs in reports.items(): for _metadata, report in report_pairs: time_value_pairs = self._extract_windowed_metric_values(report, metric_name, quantile_or_global) - # Skip if no data points found for this run if not time_value_pairs: - raise ValueError("No windowed metrics found for the specified window, metric and run.") + _logger.warning("No windowed metrics for run '%s' (%s) — skipping.", run_name, self.name) + continue # Unpack the sorted pairs timestamps = [pair[0] for pair in time_value_pairs] @@ -221,6 +229,10 @@ def create_by_run_and_none(self, reports: dict[RunName, list[ReportTuple]]) -> V timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any run") title = self._create_plot_title(metric_name, quantile_or_global, "by Run") figure = plotter.plot(title=title) @@ -238,13 +250,14 @@ def create_by_target( # Get the run name from the first target metadata for the title run_name = reports[0][0].run_name if reports else "" + has_data = False # Process each target's report for metadata, report in reports: time_value_pairs = self._extract_windowed_metric_values(report, metric_name, quantile_or_global) - # Skip if no data points found for this target if not time_value_pairs: - raise ValueError("No windowed metrics found for the specified window, metric and target.") + _logger.warning("No windowed metrics for target '%s' (%s) — skipping.", metadata.name, self.name) + continue # Unpack the sorted pairs timestamps = [pair[0] for pair in time_value_pairs] @@ -256,6 +269,10 @@ def create_by_target( timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any target") title_suffix = "by Target" if run_name: @@ -274,11 +291,13 @@ def create_by_run_and_target( ) -> VisualizationOutput: metric_name, quantile_or_global = self._get_metric_info() plotter = WindowedMetricPlotter() + has_data = False # Process each run and calculate averaged metrics across its targets for run_name, target_reports in reports.items(): if not target_reports: - raise ValueError("No windowed metrics found for the specified window, metric and run.") + _logger.warning("No reports for run '%s' (%s) — skipping.", run_name, self.name) + continue # Average windowed metrics across all targets for this run averaged_pairs = self._average_time_series_across_targets( @@ -287,9 +306,9 @@ def create_by_run_and_target( quantile_or_global=quantile_or_global, ) - # Skip if no averaged data points found for this run if not averaged_pairs: - raise ValueError("No windowed averaged metrics found for the specified window, metric and run.") + _logger.warning("No windowed averaged metrics for run '%s' (%s) — skipping.", run_name, self.name) + continue # Unpack the averaged pairs timestamps = [pair[0] for pair in averaged_pairs] @@ -301,6 +320,10 @@ def create_by_run_and_target( timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any run") title = self._create_plot_title(metric_name, quantile_or_global, "by run (averaged over targets in group)") figure = plotter.plot(title=title, metric_name=metric_name) @@ -321,10 +344,12 @@ def create_by_run_and_group( for (run_name, _group_name), target_reports in reports.items(): run_to_targets.setdefault(run_name, []).extend(target_reports) + has_data = False # Average metrics over all targets for each run for run_name, all_target_reports in run_to_targets.items(): if not all_target_reports: - raise ValueError("No windowed metrics found for the specified window, metric and run.") + _logger.warning("No reports for run '%s' (%s) — skipping.", run_name, self.name) + continue # Average windowed metrics across all targets for this run averaged_pairs = self._average_time_series_across_targets( @@ -334,7 +359,8 @@ def create_by_run_and_group( ) if not averaged_pairs: - raise ValueError("No windowed averaged metrics found for the specified window, metric and run.") + _logger.warning("No windowed averaged metrics for run '%s' (%s) — skipping.", run_name, self.name) + continue timestamps = [pair[0] for pair in averaged_pairs] metric_values = [pair[1] for pair in averaged_pairs] @@ -345,6 +371,10 @@ def create_by_run_and_group( timestamps=timestamps, metric_values=metric_values, ) + has_data = True + + if not has_data: + return self._empty_output("No windowed metrics available for any run") title = self._create_plot_title(metric_name, quantile_or_global, "by run (averaged over all targets)") figure = plotter.plot(title=title, metric_name=metric_name) @@ -373,9 +403,7 @@ def create_by_group( ) if not averaged_pairs: - raise ValueError( - "No windowed averaged metrics found for the specified window, metric and run across all groups." - ) + return self._empty_output("No windowed metrics available across all groups") timestamps = [pair[0] for pair in averaged_pairs] metric_values = [pair[1] for pair in averaged_pairs] diff --git a/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py b/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py index 0d905fc67..c7cfde183 100644 --- a/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py +++ b/packages/openstef-beam/tests/unit/analysis/visualizations/test_windowed_metric_visualization.py @@ -147,14 +147,17 @@ def test_create_by_none_creates_time_series_visualization( assert result.figure == mock_plotly_figure -def test_create_by_none_raises_error_when_no_windowed_data( +def test_create_by_none_returns_empty_output_when_no_windowed_data( empty_evaluation_report: EvaluationSubsetReport, simple_target_metadata: TargetMetadata, sample_window: Window ): - """Test that create_by_none raises appropriate error when no windowed metrics are found.""" + """Test that create_by_none returns an HTML placeholder when no windowed metrics are found.""" viz = WindowedMetricVisualization(name="test_viz", metric="mae", window=sample_window) - with pytest.raises(ValueError, match="No windowed metrics found for the specified window and metric"): - viz.create_by_none(empty_evaluation_report, simple_target_metadata) + result = viz.create_by_none(empty_evaluation_report, simple_target_metadata) + + assert result.figure is None + assert result.html is not None + assert "No windowed metrics" in result.html def test_create_by_target_adds_time_series_per_target( From ce6274e4f0ce52e73a1efdadbf986163de3da3c0 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 10:16:13 +0100 Subject: [PATCH 05/16] fix(STEF-2854): fix combiner label/weight shape mismatch and Quantile serialization Two fixes: 1. learned_weights_combiner.py: Filter labels to match combined_data index after inner join drops rows from additional_features. Fixes ValueError: operands could not be broadcast together. 2. types.py: Add Pydantic serializer to Quantile to suppress PydanticSerializationUnexpectedValue warnings. Signed-off-by: Egor Dmitriev Signed-off-by: Egor Dmitriev --- packages/openstef-core/src/openstef_core/types.py | 6 +++++- .../models/forecast_combiners/learned_weights_combiner.py | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/openstef-core/src/openstef_core/types.py b/packages/openstef-core/src/openstef_core/types.py index 1d536f9cb..2bd452bdd 100644 --- a/packages/openstef-core/src/openstef_core/types.py +++ b/packages/openstef-core/src/openstef_core/types.py @@ -338,7 +338,11 @@ def __get_pydantic_core_schema__(cls, source_type: Any, handler: GetCoreSchemaHa Returns: Core schema for Pydantic validation. """ - return core_schema.no_info_after_validator_function(cls, handler(float)) + return core_schema.no_info_after_validator_function( + cls, + handler(float), + serialization=core_schema.plain_serializer_function_ser_schema(float), + ) def format(self) -> str: """Instance method to format the quantile as a string. diff --git a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py index d5dda5189..2215593d2 100644 --- a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py +++ b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py @@ -219,7 +219,8 @@ def fit( additional_features=additional_features, ) input_data = combined_data.input_data() - labels = labels.loc[combined_data.data.index] + # Filter labels to match combined_data index (inner join may drop rows) + labels = labels.loc[input_data.index] self._validate_labels(labels=labels, quantile=q) encoded_labels = self._label_encoder.transform(labels) From be93fb0b2c858268ca6a45e4a6bb76da82721c3c Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 10:07:17 +0100 Subject: [PATCH 06/16] fix(STEF-2854): add Pydantic serializer to Quantile to suppress warnings Quantile.__get_pydantic_core_schema__ only defined a validator but no serializer. When Quantile values appear as dict keys in a union type (e.g., QuantileOrGlobal = Quantile | Literal['global']), Pydantic emits PydanticSerializationUnexpectedValue warnings during model_dump_json(). Add a plain_serializer_function_ser_schema(float) so Pydantic knows how to serialize Quantile as a float, preventing the warning. Signed-off-by: Egor Dmitriev Signed-off-by: Egor Dmitriev --- .../openstef-core/tests/unit/test_types.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/packages/openstef-core/tests/unit/test_types.py b/packages/openstef-core/tests/unit/test_types.py index 189c02733..1eb166929 100644 --- a/packages/openstef-core/tests/unit/test_types.py +++ b/packages/openstef-core/tests/unit/test_types.py @@ -2,13 +2,15 @@ # # SPDX-License-Identifier: MPL-2.0 +import warnings from datetime import UTC, datetime, time, timedelta, timezone import pandas as pd import pytest import pytz +from pydantic import BaseModel -from openstef_core.types import AvailableAt, LeadTime +from openstef_core.types import AvailableAt, LeadTime, Quantile, QuantileOrGlobal @pytest.mark.parametrize( @@ -342,3 +344,20 @@ def test_available_at_apply_index_matches_apply(): scalar = pd.DatetimeIndex([at.apply(ts.to_pydatetime()) for ts in index]) pd.testing.assert_index_equal(vectorized, scalar) + + +def test_quantile_serialization_no_warnings(): + """Quantile used as dict key in a QuantileOrGlobal union must not trigger pydantic serialization warnings.""" + + class Model(BaseModel): + metrics: dict[QuantileOrGlobal, dict[str, float]] + + m = Model(metrics={Quantile(0.05): {"mae": 1.0}, Quantile(0.5): {"mae": 2.0}, "global": {"mae": 1.5}}) + + with warnings.catch_warnings(): + warnings.filterwarnings("error", category=UserWarning, message="Pydantic serializer") + data = m.model_dump_json() + + assert '"0.05"' in data + assert '"0.5"' in data + assert '"global"' in data From a62459ae9b4f02b87f0792cee7bee67101dfbbf1 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 10:38:08 +0100 Subject: [PATCH 07/16] fix(STEF-2854): raise InsufficientlyCompleteError for empty datasets in train/test split chronological_train_test_split crashed with IndexError when the dataset had fewer than 2 unique timestamps. This happens during ensemble backtest when a base forecaster's preprocessed data is empty. Now raises InsufficientlyCompleteError which is caught by the backtest harness. Signed-off-by: Egor Dmitriev --- .../openstef-models/src/openstef_models/utils/data_split.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/openstef-models/src/openstef_models/utils/data_split.py b/packages/openstef-models/src/openstef_models/utils/data_split.py index f27ca83e5..d802017d3 100644 --- a/packages/openstef-models/src/openstef_models/utils/data_split.py +++ b/packages/openstef-models/src/openstef_models/utils/data_split.py @@ -22,6 +22,7 @@ from openstef_core.base_model import BaseConfig from openstef_core.datasets import TimeSeriesDataset +from openstef_core.exceptions import InsufficientlyCompleteError def split_by_dates[T: TimeSeriesDataset]( @@ -95,6 +96,9 @@ def chronological_train_test_split[T: TimeSeriesDataset]( index_unique = dataset.index.unique() n_total = len(index_unique) + if n_total < 2: + msg = f"Dataset has {n_total} unique timestamps, need at least 2 to split into train/test." + raise InsufficientlyCompleteError(msg) n_test = int(n_total * test_fraction) n_test = min(n_test, n_total - 1) # Ensure at least one for train if possible if n_total > 1 and n_test == 0: From 625abfe8276f1854e9d6a65f29f1a8e2a1e6e7ce Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 11:06:43 +0100 Subject: [PATCH 08/16] fix(STEF-2854): raise InsufficientlyCompleteError on empty combiner data after inner join Signed-off-by: Egor Dmitriev --- .../models/forecast_combiners/learned_weights_combiner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py index 2215593d2..1a991f4bc 100644 --- a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py +++ b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py @@ -24,7 +24,7 @@ from openstef_core.datasets import ForecastDataset, ForecastInputDataset, TimeSeriesDataset from openstef_core.datasets.validated_datasets import ENSEMBLE_COLUMN_SEP, EnsembleForecastDataset -from openstef_core.exceptions import MissingExtraError, NotFittedError +from openstef_core.exceptions import InsufficientlyCompleteError, MissingExtraError, NotFittedError from openstef_core.mixins.predictor import HyperParams from openstef_core.types import Quantile from openstef_meta.models.forecast_combiners.forecast_combiner import ( @@ -277,6 +277,9 @@ def _prepare_input_data( if additional_features is not None: df_a = additional_features.input_data(start=dataset.index[0]) df = pd.concat([df, df_a], axis=1, join="inner") + if df.empty: + msg = "No overlapping timestamps between base predictions and additional features after inner join." + raise InsufficientlyCompleteError(msg) return df def _predict_quantile( From 4d00735ad596dc4627341fde671488727788865b Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 11:52:01 +0100 Subject: [PATCH 09/16] feat(STEF-2854): add strict parameter to BenchmarkComparisonPipeline.run() Signed-off-by: Egor Dmitriev --- .../benchmarking/benchmark_comparison_pipeline.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py index 2aaac8f5c..34d8999ac 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py @@ -120,6 +120,7 @@ def run( self, run_data: dict[RunName, BenchmarkStorage], filter_args: F | None = None, + strict: bool = True, ): """Execute comparison analysis across multiple benchmark runs. @@ -132,6 +133,8 @@ def run( Each storage backend should contain evaluation results for the run. filter_args: Optional criteria for filtering targets. Only targets matching these criteria will be included in the comparison. + strict: If True, raise an error when evaluation is missing for a target. + If False, skip missing targets. """ targets = self.target_provider.get_targets(filter_args) @@ -142,7 +145,7 @@ def run( targets=targets, storage=run_storage, run_name=run_name, - strict=True, + strict=strict, ) reports.extend(run_reports) From 7f1a06d6e0202db6af4aafd623499867c3fd5992 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 17:27:09 +0100 Subject: [PATCH 10/16] fix(STEF-2854): use 'global' subdirectory for RUN_AND_GROUP analysis scope The RUN_AND_GROUP scope was saving directly to the base analysis dir, making has_analysis_output fail to locate it and colliding with group-level outputs. Store in a 'global' subdirectory to match the group-level pattern (group_name/global). Signed-off-by: Egor Dmitriev --- .../src/openstef_beam/benchmarking/storage/local_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py b/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py index 892cd09ec..9c0f9fb83 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/storage/local_storage.py @@ -149,7 +149,7 @@ def get_analysis_path(self, scope: AnalysisScope) -> Path: elif scope.aggregation == AnalysisAggregation.RUN_AND_NONE: output_dir = base_dir / str(scope.group_name) / str(scope.target_name) elif scope.aggregation == AnalysisAggregation.RUN_AND_GROUP: - output_dir = base_dir + output_dir = base_dir / "global" elif scope.aggregation == AnalysisAggregation.RUN_AND_TARGET: output_dir = base_dir / str(scope.group_name) / "global" else: From d2f64f4f9e4a01409c5f7e4b6c1fbe6be7981a74 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 17:30:41 +0100 Subject: [PATCH 11/16] fix(STEF-2854): renormalize ensemble weights when base model predictions are NaN When a base model cannot predict certain timestamps (e.g. gblinear limited to 2-day weather horizon while lgbm predicts 7 days), the combiner must redistribute the missing model's weight proportionally to the remaining models. Previously, pandas sum(axis=1, skipna=True) silently dropped the NaN model's weight contribution, causing predictions to be systematically scaled down by ~35% for timestamps beyond the weather horizon. Now weights are reindexed to match predictions, zeroed where predictions are NaN, and the weighted sum is divided by the available weight total. When all models are NaN, the result is 0 (matching prior behavior). Includes regression test with seeded data verifying no NaN propagation and no systematic downscaling. Signed-off-by: Egor Dmitriev Signed-off-by: Egor Dmitriev --- .../learned_weights_combiner.py | 18 +++++-- .../test_learned_weights_combiner.py | 52 +++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py index 1a991f4bc..ecae3deec 100644 --- a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py +++ b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py @@ -293,10 +293,22 @@ def _predict_quantile( if self.hard_selection: # Convert soft probabilities to hard selection: max weight → 1.0, ties distributed equally - weights = (weights == weights.max(axis=1).to_frame().to_numpy()) / weights.sum(axis=1).to_frame().to_numpy() + weights = pd.DataFrame( + (weights.to_numpy() == weights.max(axis=1).to_numpy()[:, None]) # type: ignore[reportUnknownMemberType] + / weights.sum(axis=1).to_numpy()[:, None], # type: ignore[reportUnknownMemberType] + index=weights.index, + columns=weights.columns, + ) - # Weighted average: multiply each forecaster's prediction by its weight and sum - return dataset.input_data().mul(weights).sum(axis=1) + # Weighted average: renormalize weights so NaN base-model predictions don't shrink the sum. + # When a base model has no prediction (NaN), its weight is redistributed proportionally + # to the remaining models. Reindex weights to predictions so that rows without + # additional_features (dropped by _prepare_input_data's inner join) get zero weight. + predictions = dataset.input_data() + weights = weights.reindex(predictions.index, fill_value=0) + available_weight = weights.where(predictions.notna(), 0).sum(axis=1) + weighted_sum = predictions.fillna(0).mul(weights).sum(axis=1) # type: ignore[reportUnknownMemberType] + return weighted_sum / available_weight.replace(0, 1) # type: ignore[reportUnknownMemberType] @override def predict( diff --git a/packages/openstef-meta/tests/unit/models/forecast_combiners/test_learned_weights_combiner.py b/packages/openstef-meta/tests/unit/models/forecast_combiners/test_learned_weights_combiner.py index 7791e8478..db2620ca1 100644 --- a/packages/openstef-meta/tests/unit/models/forecast_combiners/test_learned_weights_combiner.py +++ b/packages/openstef-meta/tests/unit/models/forecast_combiners/test_learned_weights_combiner.py @@ -114,3 +114,55 @@ def test_quantile_weights_combiner__fit_with_additional_features_shorter_index( # Assert assert combiner.is_fitted + + +def test_predict_renormalizes_weights_when_base_model_predictions_are_nan() -> None: + """Predict should renormalize weights when a base model has NaN predictions. + + Regression test: when one base model cannot predict certain timestamps (e.g. + gblinear limited to 2-day horizon while lgbm predicts 7 days), the combiner + must redistribute the missing model's weight to the remaining models. Without + renormalization, sum(axis=1, skipna=True) drops the NaN contribution, causing + predictions to be systematically scaled down. + """ + rng = np.random.default_rng(42) + index = pd.date_range("2023-01-01", periods=100, freq="15min") + + # Two forecasters: lgbm has all values, gblinear is NaN for the last 50 rows + lgbm_vals = rng.normal(1000, 100, 100) + gblinear_vals = rng.normal(1000, 100, 100).copy() + gblinear_vals[50:] = np.nan + + data = pd.DataFrame( + { + "LGBMForecaster__quantile_P10": lgbm_vals * 0.8, + "LGBMForecaster__quantile_P50": lgbm_vals, + "LGBMForecaster__quantile_P90": lgbm_vals * 1.2, + "GBLinearForecaster__quantile_P10": gblinear_vals * 0.8, + "GBLinearForecaster__quantile_P50": gblinear_vals, + "GBLinearForecaster__quantile_P90": gblinear_vals * 1.2, + "load": rng.normal(1000, 100, 100), + }, + index=index, + ) + dataset = EnsembleForecastDataset(data=data, sample_interval=timedelta(minutes=15)) + + combiner = WeightsCombiner( + hyperparams=LGBMCombinerHyperParams(n_leaves=5, n_estimators=10), + quantiles=[Q(0.1), Q(0.5), Q(0.9)], + horizons=[LeadTime(timedelta(days=1))], + ) + combiner.fit(dataset) + + # Act + result = combiner.predict(dataset) + + # Assert — rows where gblinear is NaN should still produce valid (non-NaN) predictions + nan_rows = result.data[["quantile_P10", "quantile_P50", "quantile_P90"]].iloc[50:] + assert not nan_rows.isna().any().any(), ( + "Predictions should not be NaN when at least one base model has valid predictions" + ) + # And the predictions should be in the ballpark of the lgbm values (not scaled down) + assert nan_rows["quantile_P50"].mean() > 500, ( + "Predictions in the NaN region should not be systematically scaled down" + ) From 368cb8523675f68e201cdd1d8e62b98928929866 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 20:45:20 +0100 Subject: [PATCH 12/16] refactor(STEF-2854): extract nan_aware_weighted_mean helper Extract NaN-aware weight renormalization into a reusable helper in openstef_core.utils.pandas and use it in learned_weights_combiner. Removes type: ignore comments from _predict_quantile. Signed-off-by: Egor Dmitriev --- .../src/openstef_core/utils/pandas.py | 20 +++++++ .../tests/unit/utils/test_pandas.py | 56 +++++++++++++++++++ .../learned_weights_combiner.py | 21 +++---- 3 files changed, 83 insertions(+), 14 deletions(-) create mode 100644 packages/openstef-core/tests/unit/utils/test_pandas.py diff --git a/packages/openstef-core/src/openstef_core/utils/pandas.py b/packages/openstef-core/src/openstef_core/utils/pandas.py index 9466c353e..8b96f0283 100644 --- a/packages/openstef-core/src/openstef_core/utils/pandas.py +++ b/packages/openstef-core/src/openstef_core/utils/pandas.py @@ -83,3 +83,23 @@ def combine_timeseries_indexes(indexes: Sequence[pd.DatetimeIndex]) -> pd.Dateti ) index_raw = functools.reduce(union_fn, indexes) return index_raw.unique().sort_values(ascending=True) # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] + + +def nan_aware_weighted_mean(values: pd.DataFrame, weights: pd.DataFrame) -> "pd.Series[float]": + """Weighted mean that redistributes NaN values' weights proportionally. + + For each row, weights corresponding to NaN values are zeroed out and the + remaining weights are used to compute the weighted sum, normalized by their + total. If all values in a row are NaN, the result is 0. + + Args: + values: DataFrame of values (may contain NaN). + weights: DataFrame of non-negative weights, aligned with values. + + Returns: + Series with the weighted mean for each row. + """ + valid_weights = weights.where(values.notna(), 0) + available = cast("pd.Series[float]", valid_weights.sum(axis=1).replace(0, 1)) + weighted_sum = cast("pd.Series[float]", values.fillna(0).mul(valid_weights).sum(axis=1)) + return weighted_sum / available diff --git a/packages/openstef-core/tests/unit/utils/test_pandas.py b/packages/openstef-core/tests/unit/utils/test_pandas.py new file mode 100644 index 000000000..cf056d9cc --- /dev/null +++ b/packages/openstef-core/tests/unit/utils/test_pandas.py @@ -0,0 +1,56 @@ +# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project +# +# SPDX-License-Identifier: MPL-2.0 + +"""Tests for pandas utilities.""" + +import numpy as np +import pandas as pd +import pytest + +from openstef_core.utils.pandas import nan_aware_weighted_mean + + +@pytest.mark.parametrize( + ("values", "weights", "expected"), + [ + pytest.param( + {"a": [1.0, 2.0], "b": [3.0, 4.0]}, + {"a": [0.6, 0.5], "b": [0.4, 0.5]}, + [1.8, 3.0], + id="no_nans", + ), + pytest.param( + {"a": [1.0, np.nan], "b": [3.0, 4.0]}, + {"a": [0.6, 0.6], "b": [0.4, 0.4]}, + [1.8, 4.0], + id="nan_redistributes_weight", + ), + pytest.param( + {"a": [np.nan], "b": [np.nan]}, + {"a": [0.5], "b": [0.5]}, + [0.0], + id="all_nan_returns_zero", + ), + pytest.param( + {"a": [5.0, np.nan, 3.0]}, + {"a": [1.0, 1.0, 1.0]}, + [5.0, 0.0, 3.0], + id="single_column", + ), + ], +) +def test_nan_aware_weighted_mean( + values: dict[str, list[float]], + weights: dict[str, list[float]], + expected: list[float], +) -> None: + # Arrange + values_df = pd.DataFrame(values) + weights_df = pd.DataFrame(weights) + + # Act + result = nan_aware_weighted_mean(values_df, weights_df) + + # Assert + pd.testing.assert_series_equal(result, pd.Series(expected), check_names=False) diff --git a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py index ecae3deec..f8587ea7b 100644 --- a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py +++ b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py @@ -27,6 +27,7 @@ from openstef_core.exceptions import InsufficientlyCompleteError, MissingExtraError, NotFittedError from openstef_core.mixins.predictor import HyperParams from openstef_core.types import Quantile +from openstef_core.utils.pandas import nan_aware_weighted_mean from openstef_meta.models.forecast_combiners.forecast_combiner import ( ForecastCombiner, ) @@ -293,22 +294,14 @@ def _predict_quantile( if self.hard_selection: # Convert soft probabilities to hard selection: max weight → 1.0, ties distributed equally - weights = pd.DataFrame( - (weights.to_numpy() == weights.max(axis=1).to_numpy()[:, None]) # type: ignore[reportUnknownMemberType] - / weights.sum(axis=1).to_numpy()[:, None], # type: ignore[reportUnknownMemberType] - index=weights.index, - columns=weights.columns, - ) + is_max = weights.eq(weights.max(axis=1), axis=0) + weights = cast("pd.DataFrame", is_max.div(weights.sum(axis=1), axis=0)) - # Weighted average: renormalize weights so NaN base-model predictions don't shrink the sum. - # When a base model has no prediction (NaN), its weight is redistributed proportionally - # to the remaining models. Reindex weights to predictions so that rows without - # additional_features (dropped by _prepare_input_data's inner join) get zero weight. + # Reindex weights to predictions so that rows without additional_features + # (dropped by _prepare_input_data's inner join) get zero weight. predictions = dataset.input_data() - weights = weights.reindex(predictions.index, fill_value=0) - available_weight = weights.where(predictions.notna(), 0).sum(axis=1) - weighted_sum = predictions.fillna(0).mul(weights).sum(axis=1) # type: ignore[reportUnknownMemberType] - return weighted_sum / available_weight.replace(0, 1) # type: ignore[reportUnknownMemberType] + weights = weights.reindex(predictions.index, fill_value=0.0) + return nan_aware_weighted_mean(predictions, weights) @override def predict( From aacbddecc2d5e9f7d68c05a8385c933705c130bc Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Wed, 18 Mar 2026 21:12:07 +0100 Subject: [PATCH 13/16] feat(STEF-2854): add skip_analysis param to BenchmarkPipeline.run() Allows skipping per-target and global analysis steps when running benchmarks. Useful when analysis will be run separately later via the comparison pipeline. Signed-off-by: Egor Dmitriev --- .../benchmarking/benchmark_pipeline.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py index f4220e0fa..ea4058563 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_pipeline.py @@ -157,6 +157,8 @@ def run( run_name: str = "default", filter_args: F | None = None, n_processes: int | None = None, + *, + skip_analysis: bool = False, ) -> None: """Runs the benchmark for all targets, optionally filtered and in parallel. @@ -174,6 +176,8 @@ def run( matching these criteria will be processed. n_processes: Number of processes to use for parallel execution. If None or 1, targets are processed sequentially. + skip_analysis: When True, skips per-target and global analysis steps. + Useful when analysis will be run separately later. """ context = BenchmarkContext(run_name=run_name) @@ -184,13 +188,13 @@ def run( _logger.info("Running benchmark in parallel with %d processes", n_processes) run_parallel( - process_fn=partial(self._run_for_target, context, forecaster_factory), + process_fn=partial(self._run_for_target, context, forecaster_factory, skip_analysis=skip_analysis), items=targets, n_processes=n_processes, mode="loky", ) - if not self.storage.has_analysis_output( + if not skip_analysis and not self.storage.has_analysis_output( AnalysisScope( aggregation=AnalysisAggregation.GROUP, run_name=context.run_name, @@ -203,7 +207,14 @@ def run( self.callback_manager.on_benchmark_complete(runner=self, targets=cast(list[BenchmarkTarget], targets)) - def _run_for_target(self, context: BenchmarkContext, model_factory: ForecasterFactory[T], target: T) -> None: + def _run_for_target( + self, + context: BenchmarkContext, + model_factory: ForecasterFactory[T], + target: T, + *, + skip_analysis: bool = False, + ) -> None: """Run benchmark for a single target.""" if not self.callback_manager.on_target_start(runner=self, target=target): _logger.info("Skipping target") @@ -221,7 +232,7 @@ def _run_for_target(self, context: BenchmarkContext, model_factory: ForecasterFa predictions = self.storage.load_backtest_output(target) self.run_evaluation_for_target(target=target, predictions=predictions, quantiles=forecaster.quantiles) - if not self.storage.has_analysis_output( + if not skip_analysis and not self.storage.has_analysis_output( scope=AnalysisScope( aggregation=AnalysisAggregation.TARGET, target_name=target.name, From 9777e0e874fdcb26c123b70376971f2bb9408e34 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Thu, 19 Mar 2026 10:06:38 +0100 Subject: [PATCH 14/16] feat(STEF-2854): add filterings override to AnalysisConfig Signed-off-by: Egor Dmitriev --- .../src/openstef_beam/analysis/analysis_pipeline.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py b/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py index 2c5752c0d..b12785523 100644 --- a/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/analysis/analysis_pipeline.py @@ -35,6 +35,11 @@ class AnalysisConfig(BaseConfig): visualization_providers: list[VisualizationProvider] = Field( default=[], description="List of visualization providers to use for generating analysis outputs" ) + filterings: list[Filtering] | None = Field( + default=None, + description="When set, only include these filterings (e.g. LeadTime, AvailableAt) in analysis. " + "None means use all filterings found in the evaluation data.", + ) class AnalysisPipeline: @@ -61,8 +66,8 @@ def __init__( super().__init__() self.config = config - @staticmethod def _group_by_filtering( + self, reports: Sequence[tuple[TargetMetadata, EvaluationReport]], ) -> dict[Filtering, list[ReportTuple]]: """Group reports by their lead time filtering conditions. @@ -71,13 +76,17 @@ def _group_by_filtering( 1-hour ahead vs 24-hour ahead forecasts), enabling comparison of model performance across different forecasting horizons. + When ``config.filterings`` is set, only subsets matching those filterings are included. + Returns: Dictionary mapping lead time filtering conditions to lists of report tuples. """ + allowed = set(self.config.filterings) if self.config.filterings is not None else None return groupby( (subset.filtering, (base_metadata.with_filtering(subset.filtering), subset)) for base_metadata, report in reports for subset in report.subset_reports + if allowed is None or subset.filtering in allowed ) def run_for_subsets( From 96e70159d2a662c2587064a7957f0fa629c6005f Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Thu, 19 Mar 2026 10:29:51 +0100 Subject: [PATCH 15/16] fix(STEF-2854): resolve ruff lint warnings Signed-off-by: Egor Dmitriev --- .../benchmarking/benchmark_comparison_pipeline.py | 1 + .../openstef-models/src/openstef_models/utils/data_split.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py index 34d8999ac..cc9e435b7 100644 --- a/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py +++ b/packages/openstef-beam/src/openstef_beam/benchmarking/benchmark_comparison_pipeline.py @@ -120,6 +120,7 @@ def run( self, run_data: dict[RunName, BenchmarkStorage], filter_args: F | None = None, + *, strict: bool = True, ): """Execute comparison analysis across multiple benchmark runs. diff --git a/packages/openstef-models/src/openstef_models/utils/data_split.py b/packages/openstef-models/src/openstef_models/utils/data_split.py index d802017d3..3506a9ea0 100644 --- a/packages/openstef-models/src/openstef_models/utils/data_split.py +++ b/packages/openstef-models/src/openstef_models/utils/data_split.py @@ -85,6 +85,7 @@ def chronological_train_test_split[T: TimeSeriesDataset]( Raises: ValueError: If test_fraction is not between 0 and 1. + InsufficientlyCompleteError: If dataset has fewer than 2 unique timestamps. """ if not 0.0 <= test_fraction <= 1.0: raise ValueError("test_fraction must be between 0 and 1.") @@ -96,8 +97,9 @@ def chronological_train_test_split[T: TimeSeriesDataset]( index_unique = dataset.index.unique() n_total = len(index_unique) - if n_total < 2: - msg = f"Dataset has {n_total} unique timestamps, need at least 2 to split into train/test." + min_timestamps = 2 + if n_total < min_timestamps: + msg = f"Dataset has {n_total} unique timestamps, need at least {min_timestamps} to split into train/test." raise InsufficientlyCompleteError(msg) n_test = int(n_total * test_fraction) n_test = min(n_test, n_total - 1) # Ensure at least one for train if possible From 6618b4d738f4fe89466c76c08793836b88360cb2 Mon Sep 17 00:00:00 2001 From: Egor Dmitriev Date: Thu, 19 Mar 2026 10:35:32 +0100 Subject: [PATCH 16/16] fix(STEF-2854): resolve pyright type errors in modified files Signed-off-by: Egor Dmitriev --- packages/openstef-core/src/openstef_core/utils/pandas.py | 4 ++-- .../models/forecast_combiners/learned_weights_combiner.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/openstef-core/src/openstef_core/utils/pandas.py b/packages/openstef-core/src/openstef_core/utils/pandas.py index 8b96f0283..d77bc1903 100644 --- a/packages/openstef-core/src/openstef_core/utils/pandas.py +++ b/packages/openstef-core/src/openstef_core/utils/pandas.py @@ -100,6 +100,6 @@ def nan_aware_weighted_mean(values: pd.DataFrame, weights: pd.DataFrame) -> "pd. Series with the weighted mean for each row. """ valid_weights = weights.where(values.notna(), 0) - available = cast("pd.Series[float]", valid_weights.sum(axis=1).replace(0, 1)) - weighted_sum = cast("pd.Series[float]", values.fillna(0).mul(valid_weights).sum(axis=1)) + available = cast("pd.Series[float]", valid_weights.sum(axis=1).replace(0, 1)) # pyright: ignore[reportUnknownMemberType] + weighted_sum = cast("pd.Series[float]", values.fillna(0).mul(valid_weights).sum(axis=1)) # pyright: ignore[reportUnknownMemberType] return weighted_sum / available diff --git a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py index f8587ea7b..b91ab87ec 100644 --- a/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py +++ b/packages/openstef-meta/src/openstef_meta/models/forecast_combiners/learned_weights_combiner.py @@ -294,8 +294,8 @@ def _predict_quantile( if self.hard_selection: # Convert soft probabilities to hard selection: max weight → 1.0, ties distributed equally - is_max = weights.eq(weights.max(axis=1), axis=0) - weights = cast("pd.DataFrame", is_max.div(weights.sum(axis=1), axis=0)) + is_max: pd.DataFrame = weights.eq(weights.max(axis=1), axis=0) # pyright: ignore[reportUnknownMemberType] + weights = is_max.div(weights.sum(axis=1), axis=0) # Reindex weights to predictions so that rows without additional_features # (dropped by _prepare_input_data's inner join) get zero weight.