diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d339d0def..5f93b7d7b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -10,7 +10,7 @@ ## New Features - +- `Resampler`: The resampler can now be configured to have the resampling window closed to the right (default) or left, and to also set the resampler timestamp to the right (default) or left end of the window being resampled. You can configure setting the new options `closed` and `label` in the `ResamplerConfig`. ## Bug Fixes diff --git a/src/frequenz/sdk/timeseries/__init__.py b/src/frequenz/sdk/timeseries/__init__.py index d5bde1602..288ea6b5a 100644 --- a/src/frequenz/sdk/timeseries/__init__.py +++ b/src/frequenz/sdk/timeseries/__init__.py @@ -49,6 +49,7 @@ ResamplerConfig2, ResamplingFunction, ResamplingFunction2, + WindowSide, ) from ._resampling._exceptions import ResamplingError, SourceStoppedError from ._resampling._wall_clock_timer import ( @@ -82,4 +83,5 @@ "TickInfo", "WallClockTimer", "WallClockTimerConfig", + "WindowSide", ] diff --git a/src/frequenz/sdk/timeseries/_resampling/_config.py b/src/frequenz/sdk/timeseries/_resampling/_config.py index 047892803..55de6854b 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_config.py +++ b/src/frequenz/sdk/timeseries/_resampling/_config.py @@ -10,6 +10,7 @@ from collections.abc import Sequence from dataclasses import dataclass, field from datetime import datetime, timedelta +from enum import Enum from typing import Protocol from frequenz.core.datetime import UNIX_EPOCH @@ -44,6 +45,16 @@ """ +class WindowSide(Enum): + """Represents a side of a resampling window.""" + + LEFT = "left" + """The left side of the resampling window.""" + + RIGHT = "right" + """The right side of the resampling window.""" + + class ResamplingFunction(Protocol): """Combine multiple samples into a new one. @@ -126,6 +137,29 @@ class ResamplerConfig: value. """ + closed: WindowSide = WindowSide.RIGHT + """Indicates which side of the resampling window is closed. + + If `WindowSide.RIGHT` (default), the resampling window is closed on the + right side and open on the left, meaning it includes samples with timestamps + within the range (start, end], where `start` and `end` are the boundaries of + the window. + + If `WindowSide.LEFT`, the resampling window is closed on the left side and + open on the right, meaning it includes samples with timestamps within the + range [start, end), where `start` and `end` are the boundaries of the + window. + """ + + label: WindowSide = WindowSide.RIGHT + """Indicates the timestamp label of the resampled data. + + If `WindowSide.RIGHT` (default), the timestamp of the resampled data + corresponds to the right boundary of the resampling window. If + `WindowSide.LEFT`, the timestamp corresponds to the left boundary of the + resampling window. + """ + initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT """The initial length of the resampling buffer. diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index 81471fc0d..db2631c2e 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -9,7 +9,7 @@ import itertools import logging import math -from bisect import bisect +from bisect import bisect, bisect_left from collections import deque from datetime import datetime, timedelta, timezone from typing import assert_never @@ -20,7 +20,7 @@ from ..._internal._asyncio import cancel_and_await from .._base_types import Sample from ._base_types import Sink, Source, SourceProperties -from ._config import ResamplerConfig, ResamplerConfig2 +from ._config import ResamplerConfig, ResamplerConfig2, WindowSide from ._exceptions import ResamplingError, SourceStoppedError from ._wall_clock_timer import TickInfo, WallClockTimer @@ -411,7 +411,8 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: """Generate a new sample based on all the current *relevant* samples. Args: - timestamp: The timestamp to be used to calculate the new sample. + timestamp: The reference timestamp for the resampling process. This + timestamp indicates the end of the resampling period. Returns: A new sample generated by calling the resampling function with all @@ -437,12 +438,22 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: ) minimum_relevant_timestamp = timestamp - period * conf.max_data_age_in_periods - min_index = bisect( + match conf.closed: + case WindowSide.LEFT: + bisect_func = bisect_left + case WindowSide.RIGHT: + bisect_func = bisect + case unexpected: + assert_never(unexpected) + + min_index = bisect_func( self._buffer, minimum_relevant_timestamp, key=lambda s: s[0], ) - max_index = bisect(self._buffer, timestamp, key=lambda s: s[0]) + + max_index = bisect_func(self._buffer, timestamp, key=lambda s: s[0]) + # Using itertools for slicing doesn't look very efficient, but # experiments with a custom (ring) buffer that can slice showed that # it is not that bad. See: @@ -458,6 +469,15 @@ def resample(self, timestamp: datetime) -> Sample[Quantity]: if relevant_samples else None ) + + match conf.label: + case WindowSide.LEFT: + timestamp -= conf.resampling_period + case WindowSide.RIGHT: + pass + case unexpected: + assert_never(unexpected) + return Sample(timestamp, None if value is None else Quantity(value)) def _log_no_relevant_samples( @@ -538,13 +558,14 @@ async def _receive_samples(self) -> None: # We need the noqa because pydoclint can't figure out that `recv_exception` is an # `Exception` instance. async def resample(self, timestamp: datetime) -> None: # noqa: DOC503 - """Calculate a new sample for the passed `timestamp` and send it. + """Calculate a new sample using buffered samples up to the given `timestamp` and send it. The helper is used to calculate the new sample and the sender is used to send it. Args: - timestamp: The timestamp to be used to calculate the new sample. + timestamp: The timestamp up to which all buffered samples are + considered for calculating the new sample. Raises: SourceStoppedError: If the source stopped sending samples. diff --git a/tests/timeseries/test_resampling.py b/tests/timeseries/test_resampling.py index 3629b5f4e..31f22638f 100644 --- a/tests/timeseries/test_resampling.py +++ b/tests/timeseries/test_resampling.py @@ -26,6 +26,7 @@ Sink, Source, SourceProperties, + WindowSide, ) from frequenz.sdk.timeseries._resampling._exceptions import ( ResamplingError, @@ -1504,6 +1505,177 @@ async def test_resampling_all_zeros( assert _get_buffer_len(resampler, source_receiver) == 3 +@pytest.mark.parametrize("closed", [WindowSide.RIGHT, WindowSide.LEFT]) +async def test_resampler_closed_option( + closed: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `closed` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + closed=closed, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + source_props = resampler.get_source_properties(source_receiver) + + # Test timeline + # + # t(s) 0 1 2 2.5 3 4 + # |----------|----------R----|-----|----------R-----> (no more samples) + # value 5.0 10.0 15.0 1.0 4.0 5.0 + # + # R = resampling is done + + # Send a few samples and run a resample tick, advancing the fake time by one period + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + sample3 = Sample(timestamp + timedelta(seconds=2), value=Quantity(15.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + await source_sender.send(sample3) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + assert datetime.now(timezone.utc).timestamp() == 2 + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s), + Quantity(expected_resampled_value), + ) + ) + # Assert the behavior based on the `closed` option + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample2), as_float_tuple(sample3)), + config, + source_props, + ) + elif closed == WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence(as_float_tuple(sample1), as_float_tuple(sample2)), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=3, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + sink_mock.reset_mock() + resampling_fun_mock.reset_mock() + + # Additional samples at 2.5, 3, and 4 seconds + sample4 = Sample(timestamp + timedelta(seconds=2.5), value=Quantity(1.0)) + sample5 = Sample(timestamp + timedelta(seconds=3), value=Quantity(4.0)) + sample6 = Sample(timestamp + timedelta(seconds=4), value=Quantity(5.0)) + await source_sender.send(sample4) + await source_sender.send(sample5) + await source_sender.send(sample6) + + # Advance time to 4 seconds and resample again + await _advance_time(fake_time, resampling_period_s * 2) + await resampler.resample(one_shot=True) + + sink_mock.assert_called_once_with( + Sample( + timestamp + timedelta(seconds=resampling_period_s * 2), + Quantity(expected_resampled_value), + ) + ) + if closed == WindowSide.RIGHT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample4), + as_float_tuple(sample5), + as_float_tuple(sample6), + ), + config, + source_props, + ) + elif closed == WindowSide.LEFT: + resampling_fun_mock.assert_called_once_with( + a_sequence( + as_float_tuple(sample3), + as_float_tuple(sample4), + as_float_tuple(sample5), + ), + config, + source_props, + ) + assert source_props == SourceProperties( + sampling_start=timestamp, received_samples=6, sampling_period=None + ) + assert _get_buffer_len(resampler, source_receiver) == config.initial_buffer_len + + +@pytest.mark.parametrize("label", [WindowSide.LEFT, WindowSide.RIGHT]) +async def test_resampler_label_option( + label: WindowSide, + fake_time: time_machine.Coordinates, + source_chan: Broadcast[Sample[Quantity]], +) -> None: + """Test the `label` option in ResamplerConfig.""" + timestamp = datetime.now(timezone.utc) + + resampling_period_s = 2 + expected_resampled_value = 42.0 + + resampling_fun_mock = MagicMock( + spec=ResamplingFunction, return_value=expected_resampled_value + ) + config = ResamplerConfig( + resampling_period=timedelta(seconds=resampling_period_s), + max_data_age_in_periods=1.0, + resampling_function=resampling_fun_mock, + label=label, + ) + resampler = Resampler(config) + + source_receiver = source_chan.new_receiver() + source_sender = source_chan.new_sender() + + sink_mock = AsyncMock(spec=Sink, return_value=True) + + resampler.add_timeseries("test", source_receiver, sink_mock) + + # Send samples and resample + sample1 = Sample(timestamp, value=Quantity(5.0)) + sample2 = Sample(timestamp + timedelta(seconds=1), value=Quantity(10.0)) + await source_sender.send(sample1) + await source_sender.send(sample2) + + await _advance_time(fake_time, resampling_period_s) + await resampler.resample(one_shot=True) + + # Assert the timestamp of the resampled sample + expected_timestamp = ( + timestamp + if label == WindowSide.LEFT + else timestamp + timedelta(seconds=resampling_period_s) + ) + sink_mock.assert_called_once_with( + Sample(expected_timestamp, Quantity(expected_resampled_value)) + ) + + def _get_buffer_len(resampler: Resampler, source_receiver: Source) -> int: # pylint: disable-next=protected-access blen = resampler._resamplers[source_receiver]._helper._buffer.maxlen