From c1003e47cade7a38e5b81db45a581e7dfc25edd0 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 12 Feb 2026 18:46:18 +0100 Subject: [PATCH 1/7] Add and expose `WindowSide` enum for resampling window configuration - Introduced the `WindowSide` enum with values `LEFT` and `RIGHT`: - `LEFT`: Represents the start of the resampling window. - `RIGHT`: Represents the end of the resampling window. - Exposed the `WindowSide` enum in the `__init__.py` of the timeseries module for public use. Signed-off-by: Malte Schaaf --- src/frequenz/sdk/timeseries/__init__.py | 2 ++ src/frequenz/sdk/timeseries/_resampling/_config.py | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/src/frequenz/sdk/timeseries/__init__.py b/src/frequenz/sdk/timeseries/__init__.py index d5bde1602..288ea6b5a 100644 --- a/src/frequenz/sdk/timeseries/__init__.py +++ b/src/frequenz/sdk/timeseries/__init__.py @@ -49,6 +49,7 @@ ResamplerConfig2, ResamplingFunction, ResamplingFunction2, + WindowSide, ) from ._resampling._exceptions import ResamplingError, SourceStoppedError from ._resampling._wall_clock_timer import ( @@ -82,4 +83,5 @@ "TickInfo", "WallClockTimer", "WallClockTimerConfig", + "WindowSide", ] diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 047892803..427573ab5 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -10,6 +10,7 @@ from collections.abc import Sequence from dataclasses import dataclass, field from datetime import datetime, timedelta +from enum import Enum from typing import Protocol from frequenz.core.datetime import UNIX_EPOCH @@ -44,6 +45,16 @@ """ +class WindowSide(Enum): + """Represents a side of a resampling window.""" + + LEFT = "left" + """The left side of the resampling window.""" + + RIGHT = "right" + """The right side of the resampling window.""" + + class ResamplingFunction(Protocol): """Combine multiple samples into a new one. From add952955d2b4bc7376454deda191671c579df0a Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 09:13:50 +0100 Subject: [PATCH 2/7] Add `closed` option to ResamplerConfig to define resampling window behavior - Introduced the `closed` parameter in ResamplerConfig with options `WindowSide.RIGHT` (default) and `WindowSide.LEFT`. - Updated the resampling logic to respect the `closed` configuration: - `RIGHT`: Includes samples at the end of the window, excludes those at the start. - `LEFT`: Includes samples at the start of the window, excludes those at the end. - Adjusted documentation to reflect the new `closed` parameter and its behavior. Signed-off-by: Malte Schaaf --- .../sdk/timeseries/_resampling/_config.py | 14 +++++++++++++ .../sdk/timeseries/_resampling/_resampler.py | 21 ++++++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 427573ab5..732aa5400 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -137,6 +137,20 @@ class ResamplerConfig: value. """ + closed: WindowSide = WindowSide.RIGHT + """Indicates which side of the resampling window is closed. + + If `WindowSide.RIGHT` (default), the resampling window is closed on the + right side and open on the left, meaning it includes samples with timestamps + within the range (start, end], where `start` and `end` are the boundaries of + the window. + + If `WindowSide.LEFT`, the resampling window is closed on the left side and + open on the right, meaning it includes samples with timestamps within the + range [start, end), where `start` and `end` are the boundaries of the + window. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index 81471fc0d..d02e5ed18 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -9,7 +9,7 @@ import itertools import logging import math -from bisect import bisect +from bisect import bisect, bisect_left from collections import deque from datetime import datetime, timedelta, timezone from typing import assert_never @@ -20,7 +20,7 @@ from ..._internal._asyncio import cancel_and_await from .._base_types import Sample from ._base_types import Sink, Source, SourceProperties -from ._config import ResamplerConfig, ResamplerConfig2 +from ._config import ResamplerConfig, ResamplerConfig2, WindowSide from ._exceptions import ResamplingError, SourceStoppedError from ._wall_clock_timer import TickInfo, WallClockTimer @@ -411,7 +411,8 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: """Generate a new sample based on all the current *relevant* samples. Args: - timestamp: The timestamp to be used to calculate the new sample. + timestamp: The reference timestamp for the resampling process. This + timestamp indicates the end of the resampling period. Returns: A new sample generated by calling the resampling function with all @@ -437,12 +438,22 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: ) minimum_relevant_timestamp = timestamp - period * conf.max_data_age_in_periods - min_index = bisect( + match conf.closed: + case WindowSide.LEFT: + bisect_func = bisect_left + case WindowSide.RIGHT: + bisect_func = bisect + case unexpected: + assert_never(unexpected) + + min_index = bisect_func( self._buffer, minimum_relevant_timestamp, key=lambda s: s[0], ) - max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) + + max_index = bisect_func(self._buffer, timestamp, key=lambda s: s[0]) + # Using itertools for slicing doesn't look very efficient, but # experiments with a custom (ring) buffer that can slice showed that # it is not that bad. See: From 37e7409f085e569e4277ba01b2bf4e0848c80e6d Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 09:22:07 +0100 Subject: [PATCH 3/7] Add `label` option to ResamplerConfig to define timestamp labeling of resampled data - Introduced the `label` parameter in ResamplerConfig with options `WindowSide.RIGHT` (default) and `WindowSide.LEFT`. - Updated the resampling logic to respect the `label` configuration: - `RIGHT`: The timestamp of the resampled data corresponds to the end of the resampling window. - `LEFT`: The timestamp of the resampled data corresponds to the start of the resampling window. - Adjusted the logic for setting `sample_time` to use the `label` configuration. - Updated documentation to reflect the new `label` parameter and its behavior. Signed-off-by: Malte Schaaf --- src/frequenz/sdk/timeseries/_resampling/_config.py | 9 +++++++++ src/frequenz/sdk/timeseries/_resampling/_resampler.py | 9 +++++++++ 2 files changed, 18 insertions(+) diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 732aa5400..55de6854b 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -151,6 +151,15 @@ class ResamplerConfig: window. """ + label: WindowSide = WindowSide.RIGHT + """Indicates the timestamp label of the resampled data. + + If `WindowSide.RIGHT` (default), the timestamp of the resampled data + corresponds to the right boundary of the resampling window. If + `WindowSide.LEFT`, the timestamp corresponds to the left boundary of the + resampling window. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index d02e5ed18..918771bdd 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -469,6 +469,15 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: if relevant_samples else None ) + + match conf.label: + case WindowSide.LEFT: + timestamp -= conf.resampling_period + case WindowSide.RIGHT: + pass + case unexpected: + assert_never(unexpected) + return Sample(timestamp, None if value is None else Quantity(value)) def _log_no_relevant_samples( From 7e1cb167f43cf199126c7d51d283efaf4f2773fa Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 10:59:16 +0100 Subject: [PATCH 4/7] Add test for `closed` option in ResamplerConfig with additional samples - Enhanced the `test_resampler_closed_option` to include additional samples at 2.5, 3, and 4 seconds. - Verified the behavior of the `closed` option (`WindowSide.RIGHT` and `WindowSide.LEFT`) with the extended timeline. - Added assertions to ensure correct resampling function calls and sink outputs for the new samples. - Confirmed that source properties and buffer length are updated correctly after processing the additional samples. Signed-off-by: Malte Schaaf --- tests/timeseries/test_resampling.py | 122 ++++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 3629b5f4e..80572d873 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -26,6 +26,7 @@ Sink, Source, SourceProperties, + WindowSide, ) from frequenz.sdk.timeseries._resampling._exceptions import ( ResamplingError, @@ -1504,6 +1505,127 @@ async def test_resampling_all_zeros( assert _get_buffer_len(resampler, source_receiver) == 3 +@pytest.mark.parametrize("closed", [WindowSide.RIGHT, WindowSide.LEFT]) +async def test_resampler_closed_option( + closed: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `closed` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=closed, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + source_props = resampler.get_source_properties(source_receiver) + + # Test timeline + # + # t(s) 0 1 2 2.5 3 4 + # |----------|----------R----|-----|----------R-----> (no more samples) + # value 5.0 10.0 15.0 1.0 4.0 5.0 + # + # R = resampling is done + + # Send a few samples and run a resample tick, advancing the fake time by one period + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + sample3 = Sample(timestamp + timedelta(seconds=2), value=Quantity(15.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + await source_sender.send(sample3) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + assert datetime.now(timezone.utc).timestamp() == 2 + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(expected_resampled_value), + ) + ) + # Assert the behavior based on the `closed` option + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample2), as_float_tuple(sample3)), + config, + source_props, + ) + elif closed == WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample1), as_float_tuple(sample2)), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=3, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + sink_mock.reset_mock() + resampling_fun_mock.reset_mock() + + # Additional samples at 2.5, 3, and 4 seconds + sample4 = Sample(timestamp + timedelta(seconds=2.5), value=Quantity(1.0)) + sample5 = Sample(timestamp + timedelta(seconds=3), value=Quantity(4.0)) + sample6 = Sample(timestamp + timedelta(seconds=4), value=Quantity(5.0)) + await source_sender.send(sample4) + await source_sender.send(sample5) + await source_sender.send(sample6) + + # Advance time to 4 seconds and resample again + await _advance_time(fake_time, resampling_period_s * 2) + await resampler.resample(one_shot=True) + + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s * 2), + Quantity(expected_resampled_value), + ) + ) + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample4), + as_float_tuple(sample5), + as_float_tuple(sample6), + ), + config, + source_props, + ) + elif closed == WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample3), + as_float_tuple(sample4), + as_float_tuple(sample5), + ), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=6, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen From 7e9e406314e8f9be4f4e4145062769fbaa73ad7f Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Tue, 3 Feb 2026 11:01:35 +0100 Subject: [PATCH 5/7] Add test for `label` option in ResamplerConfig to verify timestamp behavior - Introduced `test_resampler_label_option` to validate the `label` configuration in ResamplerConfig. - Tested both `WindowSide.LEFT` and `WindowSide.RIGHT` options to ensure the resampled datas timestamp corresponds to the start or end of the resampling window, respectively. - Verified sink outputs with the expected timestamp and resampled value. Signed-off-by: Malte Schaaf --- tests/timeseries/test_resampling.py | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 80572d873..31f22638f 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -1626,6 +1626,56 @@ async def test_resampler_closed_option( assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len +@pytest.mark.parametrize("label", [WindowSide.LEFT, WindowSide.RIGHT]) +async def test_resampler_label_option( + label: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `label` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + label=label, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + # Send samples and resample + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + # Assert the timestamp of the resampled sample + expected_timestamp = ( + timestamp + if label == WindowSide.LEFT + else timestamp + timedelta(seconds=resampling_period_s) + ) + sink_mock.assert_called_once_with( + Sample(expected_timestamp, Quantity(expected_resampled_value)) + ) + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen From c1ba871475172fe16667da2646998caabb580f7a Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Wed, 4 Mar 2026 09:05:37 +0100 Subject: [PATCH 6/7] Clarify `_StreamingHelper` `resample` method timestamp documentation Improve the docstring of the `_StreamingHelper`s `resample` method to better describe how the timestamp parameter is used in resampling calculations. The timestamp now clearly represents the upper bound for buffered samples considered in the resampling operation. Signed-off-by: Malte Schaaf --- src/frequenz/sdk/timeseries/_resampling/_resampler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index 918771bdd..db2631c2e 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -558,13 +558,14 @@ async def _receive_samples(self) -> None: # We need the noqa because pydoclint can't figure out that `recv_exception` is an # `Exception` instance. async def resample(self, timestamp: datetime) -> None: # noqa: DOC503 - """Calculate a new sample for the passed `timestamp` and send it. + """Calculate a new sample using buffered samples up to the given `timestamp` and send it. The helper is used to calculate the new sample and the sender is used to send it. Args: - timestamp: The timestamp to be used to calculate the new sample. + timestamp: The timestamp up to which all buffered samples are + considered for calculating the new sample. Raises: SourceStoppedError: If the source stopped sending samples. From 81a8840683a37e77de7207e33d36a83a4da51637 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Wed, 4 Mar 2026 09:08:44 +0100 Subject: [PATCH 7/7] Update release notes Signed-off-by: Malte Schaaf --- RELEASE_NOTES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d339d0def..5f93b7d7b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +- `Resampler`: The resampler can now be configured to have the resampling window closed to the right (default) or left, and to also set the resampler timestamp to the right (default) or left end of the window being resampled. You can configure setting the new options `closed` and `label` in the `ResamplerConfig`. ## Bug Fixes