From 8d387636cf75a4b643fd218f66ca818cead1545a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:44:05 +0100 Subject: [PATCH 01/11] Retrieve data stream via oneshot channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- pyproject.toml | 2 +- .../_component_metric_request.py | 7 ++ .../_data_sourcing/microgrid_api_source.py | 38 ++++--- tests/microgrid/test_data_sourcing.py | 101 +++++------------- 4 files changed, 60 insertions(+), 88 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c9292f64d..51aa601f9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ dependencies = [ "frequenz-client-microgrid >= 0.18.1, < 0.19.0", "frequenz-microgrid-component-graph >= 0.3.4, < 0.4", "frequenz-client-common >= 0.3.6, < 0.4.0", - "frequenz-channels >= 1.6.1, < 2.0.0", + "frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@oneshot", "frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0", "numpy >= 2.1.0, < 3", "typing_extensions >= 4.14.1, < 5", diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py index 2fb8f5aa3..cb2b18313 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/_component_metric_request.py @@ -6,13 +6,17 @@ from dataclasses import dataclass from datetime import datetime +from frequenz.channels import Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric +from frequenz.quantities import Quantity from frequenz.sdk.microgrid._old_component_data import TransitionalMetric __all__ = ["ComponentMetricRequest", "Metric"] +from frequenz.sdk.timeseries import Sample + @dataclass class ComponentMetricRequest: @@ -51,6 +55,9 @@ class ComponentMetricRequest: If None, only live data is streamed. """ + telem_stream_sender: Sender[Receiver[Sample[Quantity]]] + """Sender of a oneshot channel used to send the data stream back to the requester.""" + def get_channel_name(self) -> str: """Construct the channel name based on the request parameters. diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index d43da8e58..03faad41e 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -8,7 +8,7 @@ from collections.abc import Callable from typing import Any -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ComponentCategory from frequenz.client.microgrid.metrics import Metric @@ -182,6 +182,9 @@ def __init__( ComponentId, dict[Metric | TransitionalMetric, list[ComponentMetricRequest]] ] = {} + self._channel_lookup: dict[str, Broadcast[Sample[Quantity]]] = {} + """ Channel cache for reuse (map channel name to channel).""" + async def _get_component_category( self, comp_id: ComponentId ) -> ComponentCategory | int | None: @@ -397,7 +400,7 @@ def _get_data_extraction_method( _logger.error(err) raise ValueError(err) - def _get_metric_senders( + async def _get_metric_senders( self, category: ComponentCategory | int, requests: dict[Metric | TransitionalMetric, list[ComponentMetricRequest]], @@ -413,18 +416,23 @@ def _get_metric_senders( A dictionary of output metric names to channel senders from the channel registry. """ - return [ - ( - self._get_data_extraction_method(category, metric), - [ - self._registry.get_or_create( - Sample[Quantity], request.get_channel_name() - ).new_sender() - for request in req_list - ], - ) - for (metric, req_list) in requests.items() - ] + all_senders = [] + for metric, req_list in requests.items(): + extraction_method = self._get_data_extraction_method(category, metric) + senders = [] + for request in req_list: + channel_name = request.get_channel_name() + # Create missing channels and inform the requesting side via oneshot + if channel_name not in self._channel_lookup: + telem_stream: Broadcast[Sample[Quantity]] = Broadcast( + name=channel_name + ) + self._channel_lookup[channel_name] = telem_stream + await request.telem_stream_sender.send(telem_stream.new_receiver()) + senders.append(self._channel_lookup[channel_name].new_sender()) + all_senders.append((extraction_method, senders)) + + return all_senders async def _handle_data_stream( self, @@ -446,7 +454,7 @@ async def _handle_data_stream( await self._check_requested_component_and_metrics( comp_id, category, self._req_streaming_metrics[comp_id] ) - stream_senders = self._get_metric_senders( + stream_senders = await self._get_metric_senders( category, self._req_streaming_metrics[comp_id] ) api_data_receiver: Receiver[Any] = self.comp_data_receivers[comp_id] diff --git a/tests/microgrid/test_data_sourcing.py b/tests/microgrid/test_data_sourcing.py index bcc4e5857..6ec3f8e0c 100644 --- a/tests/microgrid/test_data_sourcing.py +++ b/tests/microgrid/test_data_sourcing.py @@ -11,7 +11,7 @@ import pytest import pytest_mock -from frequenz.channels import Broadcast +from frequenz.channels import Broadcast, Receiver, make_oneshot from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import ( @@ -24,7 +24,6 @@ from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ( ComponentMetricRequest, DataSourcingActor, @@ -84,88 +83,46 @@ def mock_connection_manager(mocker: pytest_mock.MockFixture) -> mock.Mock: return mock_conn_manager +@pytest.mark.parametrize( + ("component_id", "metric", "expected_sample_value"), + [ + (ComponentId(4), Metric.AC_ACTIVE_POWER, 100.0), + (ComponentId(4), Metric.AC_REACTIVE_POWER, 100.0), + (ComponentId(6), Metric.AC_ACTIVE_POWER, 0.0), + (ComponentId(9), Metric.BATTERY_SOC_PCT, 9.0), + (ComponentId(9), Metric.BATTERY_SOC_PCT, 9.0), + (ComponentId(12), Metric.AC_ACTIVE_POWER, -13.0), + ], +) async def test_data_sourcing_actor( # pylint: disable=too-many-locals mock_connection_manager: mock.Mock, # pylint: disable=redefined-outer-name,unused-argument + component_id: ComponentId, + metric: Metric, + expected_sample_value: float, ) -> None: """Tests for the DataSourcingActor.""" req_chan = Broadcast[ComponentMetricRequest](name="data_sourcing_requests") req_sender = req_chan.new_sender() - registry = ChannelRegistry(name="test-registry") - - async with DataSourcingActor(req_chan.new_receiver(), registry): - active_power_request_4 = ComponentMetricRequest( - "test-namespace", ComponentId(4), Metric.AC_ACTIVE_POWER, None - ) - active_power_recv_4 = registry.get_or_create( - Sample[Quantity], active_power_request_4.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_4) - - reactive_power_request_4 = ComponentMetricRequest( - "test-namespace", ComponentId(4), Metric.AC_REACTIVE_POWER, None - ) - reactive_power_recv_4 = registry.get_or_create( - Sample[Quantity], reactive_power_request_4.get_channel_name() - ).new_receiver() - await req_sender.send(reactive_power_request_4) - - active_power_request_6 = ComponentMetricRequest( - "test-namespace", ComponentId(6), Metric.AC_ACTIVE_POWER, None - ) - active_power_recv_6 = registry.get_or_create( - Sample[Quantity], active_power_request_6.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_6) - - soc_request_9 = ComponentMetricRequest( - "test-namespace", ComponentId(9), Metric.BATTERY_SOC_PCT, None - ) - soc_recv_9 = registry.get_or_create( - Sample[Quantity], soc_request_9.get_channel_name() - ).new_receiver() - await req_sender.send(soc_request_9) - - soc2_request_9 = ComponentMetricRequest( - "test-namespace", ComponentId(9), Metric.BATTERY_SOC_PCT, None + async with DataSourcingActor(req_chan.new_receiver()): + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] ) - soc2_recv_9 = registry.get_or_create( - Sample[Quantity], soc2_request_9.get_channel_name() - ).new_receiver() - await req_sender.send(soc2_request_9) - active_power_request_12 = ComponentMetricRequest( - "test-namespace", ComponentId(12), Metric.AC_ACTIVE_POWER, None + component_metric_request = ComponentMetricRequest( + "test-namespace", + component_id, + metric, + None, + telem_stream_sender, ) - active_power_recv_12 = registry.get_or_create( - Sample[Quantity], active_power_request_12.get_channel_name() - ).new_receiver() - await req_sender.send(active_power_request_12) - - for i in range(3): - sample = await active_power_recv_4.receive() - assert sample.value is not None - assert 100.0 + i == sample.value.base_value - - sample = await reactive_power_recv_4.receive() - assert sample.value is not None - assert 100.0 + i == sample.value.base_value - - sample = await active_power_recv_6.receive() - assert sample.value is not None - assert 0.0 + i == sample.value.base_value - - sample = await soc_recv_9.receive() - assert sample.value is not None - assert 9.0 + i == sample.value.base_value - - sample = await soc2_recv_9.receive() - assert sample.value is not None - assert 9.0 + i == sample.value.base_value + await req_sender.send(component_metric_request) + telem_stream = await telem_stream_receiver.receive() - sample = await active_power_recv_12.receive() + for i in range(10): + sample = await telem_stream.receive() assert sample.value is not None - assert -13.0 + i == sample.value.base_value + assert expected_sample_value + i == sample.value.base_value def _new_meter_data( From 833af45185c0959c9b8a796efc8081c84733acfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:45:45 +0100 Subject: [PATCH 02/11] Use oneshot channel setup in resampler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/microgrid/_resampling.py | 53 ++++--- .../formulas/_resampled_stream_fetcher.py | 15 +- tests/actor/test_resampling.py | 134 ++++++++++++++---- tests/timeseries/mock_resampler.py | 65 ++++----- 4 files changed, 174 insertions(+), 93 deletions(-) diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index b5c554513..0e330fe9f 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -5,10 +5,9 @@ import asyncio -import dataclasses import logging -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot from frequenz.quantities import Quantity from .._internal._asyncio import cancel_and_await @@ -58,7 +57,7 @@ def __init__( # pylint: disable=too-many-arguments resampling_request_receiver ) self._resampler: Resampler = Resampler(config) - self._active_req_channels: set[str] = set() + self._upstream_channels: dict[str, Broadcast[Sample[Quantity]]] = {} async def _subscribe(self, request: ComponentMetricRequest) -> None: """Request data for a component metric. @@ -68,28 +67,38 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None: """ request_channel_name = request.get_channel_name() - # If we are already handling this request, there is nothing to do. - if request_channel_name in self._active_req_channels: + # If we are already handling this request, answer the request by sending a + # new receiver from the existing channel. + if upstream_channel := self._upstream_channels.get(request_channel_name): + await request.telem_stream_sender.send(upstream_channel.new_receiver()) return - self._active_req_channels.add(request_channel_name) - - data_source_request = dataclasses.replace( - request, namespace=request.namespace + ":Source" + # Derive a new ComponentMetricRequest from the given one to + # receive the telemetry stream from the data sourcing actor. + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + own_request = ComponentMetricRequest( + namespace=request.namespace + ":Source", + component_id=request.component_id, + metric=request.metric, + start_time=request.start_time, + telem_stream_sender=telem_stream_sender, + ) + await self._data_sourcing_request_sender.send(own_request) + telem_stream = await telem_stream_receiver.receive() + + # Create a new channel based on the original ComponentMetricRequest + # to act as our data sink. + upstream_channel = Broadcast(name=request_channel_name, resend_latest=True) + await request.telem_stream_sender.send(upstream_channel.new_receiver()) + self._upstream_channels[request_channel_name] = upstream_channel + + self._resampler.add_timeseries( + name=request_channel_name, + source=telem_stream, + sink=upstream_channel.new_sender().send, ) - data_source_channel_name = data_source_request.get_channel_name() - await self._data_sourcing_request_sender.send(data_source_request) - receiver = self._channel_registry.get_or_create( - Sample[Quantity], data_source_channel_name - ).new_receiver() - - # This is a temporary hack until the Sender implementation uses - # exceptions to report errors. - sender = self._channel_registry.get_or_create( - Sample[Quantity], request_channel_name - ).new_sender() - - self._resampler.add_timeseries(request_channel_name, receiver, sender.send) async def _process_resampling_requests(self) -> None: """Process resampling data requests.""" diff --git a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py index 7ad45ec53..0f2165d4c 100644 --- a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py +++ b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py @@ -3,7 +3,7 @@ """Fetches telemetry streams for components.""" -from frequenz.channels import Receiver, Sender +from frequenz.channels import Receiver, Sender, make_oneshot from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Quantity @@ -54,18 +54,17 @@ async def fetch_stream( Returns: A receiver to stream resampled data for the given component id. """ + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + request = ComponentMetricRequest( self._namespace, component_id, self._metric, None, + telem_stream_sender, ) - chan = self._channel_registry.get_or_create( - Sample[Quantity], request.get_channel_name() - ) - chan.resend_latest = True - await self._resampler_subscription_sender.send(request) - - return chan.new_receiver() + return await telem_stream_receiver.receive() diff --git a/tests/actor/test_resampling.py b/tests/actor/test_resampling.py index 9b15339de..fc7a6b6f5 100644 --- a/tests/actor/test_resampling.py +++ b/tests/actor/test_resampling.py @@ -3,18 +3,16 @@ """Frequenz Python SDK resampling example.""" import asyncio -import dataclasses from datetime import datetime, timedelta, timezone import async_solipsism import pytest import time_machine -from frequenz.channels import Broadcast +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ComponentMetricRequest from frequenz.sdk.microgrid._resampling import ComponentMetricsResamplingActor from frequenz.sdk.timeseries import ResamplerConfig2, Sample @@ -31,19 +29,10 @@ def _now() -> datetime: async def _assert_resampling_works( - channel_registry: ChannelRegistry, + timeseries_sender: Sender[Sample[Quantity]], + timeseries_receiver: Receiver[Sample[Quantity]], fake_time: time_machine.Coordinates, - *, - resampling_chan_name: str, - data_source_chan_name: str, ) -> None: - timeseries_receiver = channel_registry.get_or_create( - Sample[Quantity], resampling_chan_name - ).new_receiver() - timeseries_sender = channel_registry.get_or_create( - Sample[Quantity], data_source_chan_name - ).new_sender() - fake_time.shift(0.2) new_sample = await timeseries_receiver.receive() # At 0.2s (timer) assert new_sample == Sample(_now(), None) @@ -103,14 +92,12 @@ async def test_single_request( fake_time: time_machine.Coordinates, ) -> None: """Run main functions that initializes and creates everything.""" - channel_registry = ChannelRegistry(name="test") data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") data_source_req_recv = data_source_req_chan.new_receiver() resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") resampling_req_sender = resampling_req_chan.new_sender() async with ComponentMetricsResamplingActor( - channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), config=ResamplerConfig2( @@ -118,25 +105,35 @@ async def test_single_request( max_data_age_in_periods=2, ), ) as resampling_actor: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), metric=Metric.BATTERY_SOC_PCT, start_time=None, + telem_stream_sender=telem_stream_sender, ) await resampling_req_sender.send(subs_req) data_source_req = await data_source_req_recv.receive() assert data_source_req is not None - assert data_source_req == dataclasses.replace( - subs_req, namespace="Resampling:Source" - ) + + assert data_source_req.namespace == "Resampling:Source" + assert data_source_req.component_id == ComponentId(9) + assert data_source_req.metric == Metric.BATTERY_SOC_PCT + assert data_source_req.start_time is None + assert data_source_req.telem_stream_sender != telem_stream_sender + + # Create the telemetry stream on behalf of nonexisting data sourcing actor + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) await _assert_resampling_works( - channel_registry, - fake_time, - resampling_chan_name=subs_req.get_channel_name(), - data_source_chan_name=data_source_req.get_channel_name(), + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=await telem_stream_receiver.receive(), + fake_time=fake_time, ) await resampling_actor._resampler.stop() # pylint: disable=protected-access @@ -146,14 +143,12 @@ async def test_duplicate_request( fake_time: time_machine.Coordinates, ) -> None: """Run main functions that initializes and creates everything.""" - channel_registry = ChannelRegistry(name="test") data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") data_source_req_recv = data_source_req_chan.new_receiver() resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") resampling_req_sender = resampling_req_chan.new_sender() async with ComponentMetricsResamplingActor( - channel_registry=channel_registry, data_sourcing_request_sender=data_source_req_chan.new_sender(), resampling_request_receiver=resampling_req_chan.new_receiver(), config=ResamplerConfig2( @@ -161,11 +156,15 @@ async def test_duplicate_request( max_data_age_in_periods=2, ), ) as resampling_actor: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) subs_req = ComponentMetricRequest( namespace="Resampling", component_id=ComponentId(9), metric=Metric.BATTERY_SOC_PCT, start_time=None, + telem_stream_sender=telem_stream_sender, ) await resampling_req_sender.send(subs_req) @@ -176,11 +175,88 @@ async def test_duplicate_request( with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(data_source_req_recv.receive(), timeout=0.1) + # Create the telemetry stream on behalf of nonexisting data sourcing actor + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) + + await _assert_resampling_works( + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=await telem_stream_receiver.receive(), + fake_time=fake_time, + ) + + await resampling_actor._resampler.stop() # pylint: disable=protected-access + + +async def test_resubscribe(fake_time: time_machine.Coordinates) -> None: + """Test that resampling works when e receiver resubscribes. + + For example, Coalesce may close its receivers and resubscribe to the + same component later on. ComponentMetricsResamplingActor must provide + a new receiver in that case. + """ + data_source_req_chan = Broadcast[ComponentMetricRequest](name="data-source-req") + data_source_req_recv = data_source_req_chan.new_receiver() + resampling_req_chan = Broadcast[ComponentMetricRequest](name="resample-req") + resampling_req_sender = resampling_req_chan.new_sender() + + async with ComponentMetricsResamplingActor( + data_sourcing_request_sender=data_source_req_chan.new_sender(), + resampling_request_receiver=resampling_req_chan.new_receiver(), + config=ResamplerConfig2( + resampling_period=timedelta(seconds=0.2), + max_data_age_in_periods=2, + ), + ) as resampling_actor: + + async def send_metric_request() -> Receiver[Receiver[Sample[Quantity]]]: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + subs_req = ComponentMetricRequest( + namespace="Resampling", + component_id=ComponentId(9), + metric=Metric.BATTERY_SOC_PCT, + start_time=None, + telem_stream_sender=telem_stream_sender, + ) + await resampling_req_sender.send(subs_req) + return telem_stream_receiver + + telem_stream_receiver = await send_metric_request() + + # Create the telemetry stream on behalf of nonexisting data sourcing actor + data_source_req = await data_source_req_recv.receive() + telem_stream: Broadcast[Sample[Quantity]] = Broadcast(name="Telemetry stream") + await data_source_req.telem_stream_sender.send(telem_stream.new_receiver()) + + resampled_stream_receiver = await telem_stream_receiver.receive() + + await _assert_resampling_works( + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=resampled_stream_receiver, + fake_time=fake_time, + ) + + resampled_stream_receiver.close() + + # Resubscribe to the same metric data + telem_stream_receiver = await send_metric_request() + resampled_stream_receiver = await telem_stream_receiver.receive() + + # No need to answer the request in the data sourcing actor - The resampler answers + + # New subscriptions receive the latest resampled value immediately. + # This must be drained for _assert_resampling_works to have a clean start. + resent_sample = await resampled_stream_receiver.receive() + assert resent_sample is not None + assert resent_sample.value is None + assert resent_sample.timestamp == _now() + await _assert_resampling_works( - channel_registry, - fake_time, - resampling_chan_name=subs_req.get_channel_name(), - data_source_chan_name=data_source_req.get_channel_name(), + timeseries_sender=telem_stream.new_sender(), + timeseries_receiver=resampled_stream_receiver, + fake_time=fake_time, ) await resampling_actor._resampler.stop() # pylint: disable=protected-access diff --git a/tests/timeseries/mock_resampler.py b/tests/timeseries/mock_resampler.py index 8ae6b0920..b3e221683 100644 --- a/tests/timeseries/mock_resampler.py +++ b/tests/timeseries/mock_resampler.py @@ -41,13 +41,18 @@ def __init__( # pylint: disable=too-many-arguments,too-many-positional-argument ) -> None: """Create a `MockDataPipeline` instance.""" self._data_pipeline = _DataPipeline(resampler_config) - - self._channel_registry = self._data_pipeline._channel_registry + self._channel_lookup: dict[str, Broadcast[Sample[Quantity]]] = {} self._resampler_request_channel = Broadcast[ComponentMetricRequest]( - name="resampler-request" + name="resampler-request", + resend_latest=True, ) self._input_channels_receivers: dict[str, list[Receiver[Sample[Quantity]]]] = {} + def get_or_create_channel(name: str) -> Broadcast[Sample[Quantity]]: + if name not in self._channel_lookup: + self._channel_lookup[name] = Broadcast[Sample[Quantity]](name=name) + return self._channel_lookup[name] + def metric_senders( comp_ids: list[ComponentId], metric_id: Metric, @@ -55,15 +60,9 @@ def metric_senders( senders: list[Sender[Sample[Quantity]]] = [] for comp_id in comp_ids: name = f"{comp_id}:{metric_id}" - senders.append( - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_sender() - ) + senders.append(get_or_create_channel(name).new_sender()) self._input_channels_receivers[name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_receiver() + get_or_create_channel(name).new_receiver() for _ in range(namespaces) ] return senders @@ -115,33 +114,21 @@ def multi_phase_senders( senders.append( [ - self._channel_registry.get_or_create( - Sample[Quantity], p1_name - ).new_sender(), - self._channel_registry.get_or_create( - Sample[Quantity], p2_name - ).new_sender(), - self._channel_registry.get_or_create( - Sample[Quantity], p3_name - ).new_sender(), + get_or_create_channel(p1_name).new_sender(), + get_or_create_channel(p2_name).new_sender(), + get_or_create_channel(p3_name).new_sender(), ] ) self._input_channels_receivers[p1_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p1_name - ).new_receiver() + get_or_create_channel(p1_name).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p2_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p2_name - ).new_receiver() + get_or_create_channel(p2_name).new_receiver() for _ in range(namespaces) ] self._input_channels_receivers[p3_name] = [ - self._channel_registry.get_or_create( - Sample[Quantity], p3_name - ).new_receiver() + get_or_create_channel(p3_name).new_receiver() for _ in range(namespaces) ] return senders @@ -237,16 +224,26 @@ async def _channel_forward_messages( async def _handle_resampling_requests(self) -> None: async for request in self._resampler_request_channel.new_receiver(): name = request.get_channel_name() + if name in self._forward_tasks: + # Forward task exists, but we must create a new receiver + # from the existing channel and return it to the request sender. + assert name in self._channel_lookup + output_channel = self._channel_lookup[name] + await request.telem_stream_sender.send(output_channel.new_receiver()) continue + input_chan_recv_name = f"{request.component_id}:{request.metric}" input_chan_recv = self._input_channels_receivers[input_chan_recv_name].pop() assert input_chan_recv is not None - output_chan_sender: Sender[Sample[Quantity]] = ( - self._channel_registry.get_or_create( - Sample[Quantity], name - ).new_sender() - ) + + if name not in self._channel_lookup: + self._channel_lookup[name] = Broadcast(name=name, resend_latest=True) + + output_channel = self._channel_lookup[name] + output_chan_sender = output_channel.new_sender() + await request.telem_stream_sender.send(output_channel.new_receiver()) + task = asyncio.create_task( self._channel_forward_messages( input_chan_recv, From 05e28120498f7998e04cda6ab5f8553d8a09d4f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:46:07 +0100 Subject: [PATCH 03/11] Use oneshot channel setup in GridFrequency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../sdk/timeseries/_grid_frequency.py | 62 ++++++++++--------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 774b2a02b..07ebda951 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -8,8 +8,8 @@ import asyncio import logging -from frequenz.channels import Receiver, Sender -from frequenz.client.common.microgrid.components import ComponentId +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Frequency, Quantity @@ -23,20 +23,6 @@ _logger = logging.getLogger(__name__) -def create_request(component_id: ComponentId) -> ComponentMetricRequest: - """Create a request for grid frequency. - - Args: - component_id: The component id to use for the request. - - Returns: - A component metric request for grid frequency. - """ - return ComponentMetricRequest( - "grid-frequency", component_id, Metric.AC_FREQUENCY, None - ) - - class GridFrequency: """Grid Frequency.""" @@ -65,10 +51,27 @@ def __init__( ) self._channel_registry: ChannelRegistry = channel_registry self._source_component: Component = source - self._component_metric_request: ComponentMetricRequest = create_request( - self._source_component.id + + # Microgrid API source will send the stream through a oneshot channel + telem_stream_sender, self._telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] ) + self._component_metric_request = ComponentMetricRequest( + "grid-frequency", + self._source_component.id, + Metric.AC_FREQUENCY, + None, + telem_stream_sender, + ) + + # This channel merely forwards the telemetry stream. It is needed + # because we must return a receiver synchronously in new_receiver. + # The "real" channel for telemetry must be created in and owned by + # MicrogridApiSource, otherwise streams would not be reused. + self._forwarding_channel: Broadcast[Sample[Quantity]] | None = None + + # Sadly needed for testing self._task: None | asyncio.Task[None] = None @property @@ -86,18 +89,13 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: Returns: A receiver that will receive grid frequency samples. """ - receiver = self._channel_registry.get_or_create( - Sample[Quantity], self._component_metric_request.get_channel_name() - ).new_receiver() - - if not self._task: - self._task = asyncio.create_task(self._send_request()) - else: - _logger.info( - "Grid frequency request already sent: %s", self._source_component + if self._forwarding_channel is None: + self._forwarding_channel = Broadcast(name="Forward frequency samples") + self._task = asyncio.create_task( + self._send_request(self._forwarding_channel.new_sender()) ) - return receiver.map( + return self._forwarding_channel.new_receiver().map( lambda sample: ( Sample[Frequency](sample.timestamp, None) if sample.value is None or sample.value.isnan() @@ -107,7 +105,13 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: ) ) - async def _send_request(self) -> None: + async def _send_request(self, forwarding_sender: Sender[Sample[Quantity]]) -> None: """Send the request for grid frequency.""" await self._request_sender.send(self._component_metric_request) _logger.debug("Sent request for grid frequency: %s", self._source_component) + + # Receive the telemetry stream and forward it via pipe + telem_receiver: Receiver[Sample[Quantity]] = ( + await self._telem_stream_receiver.receive() + ) + await Pipe(telem_receiver, forwarding_sender).start() From 118fd0f586da7a873f644834f8545469b9f3e4c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:46:17 +0100 Subject: [PATCH 04/11] Use oneshot channel setup in VoltageStreamer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../sdk/timeseries/_voltage_streamer.py | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index 4759b362f..5709ae716 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -13,7 +13,7 @@ import logging from typing import TYPE_CHECKING -from frequenz.channels import Receiver, Sender +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot from frequenz.client.microgrid.component import Component, EvCharger, Inverter, Meter from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity, Voltage @@ -99,6 +99,8 @@ def __init__( self._channel_key = f"{self._namespace}-all-phases" """The channel key for the phase-to-neutral voltage streaming.""" + self._telem_channel: Broadcast[Sample3Phase[Voltage]] | None = None + @property def source(self) -> Component: """Get the component to fetch the phase-to-neutral voltage from. @@ -108,6 +110,14 @@ def source(self) -> Component: """ return self._source_component + @property + def _channel(self) -> Broadcast[Sample3Phase[Voltage]]: + if self._telem_channel is None: + self._telem_channel = Broadcast[Sample3Phase[Voltage]]( + name=self._channel_key + ) + return self._telem_channel + def new_receiver(self) -> Receiver[Sample3Phase[Voltage]]: """Create a receiver for the phase-to-neutral voltage. @@ -118,9 +128,7 @@ def new_receiver(self) -> Receiver[Sample3Phase[Voltage]]: A receiver that will receive the phase-to-neutral voltage as a 3-phase sample. """ - receiver = self._channel_registry.get_or_create( - Sample3Phase[Voltage], self._channel_key - ).new_receiver() + receiver = self._channel.new_receiver() if not self._task: self._task = asyncio.create_task(self._send_request()) @@ -143,21 +151,21 @@ async def _send_request(self) -> None: ) phases_rx: list[Receiver[Sample[Quantity]]] = [] for metric in metrics: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) req = ComponentMetricRequest( - self._namespace, self._source_component.id, metric, None + namespace=self._namespace, + component_id=self._source_component.id, + metric=metric, + start_time=None, + telem_stream_sender=telem_stream_sender, ) await self._resampler_subscription_sender.send(req) + phases_rx.append(await telem_stream_receiver.receive()) - phases_rx.append( - self._channel_registry.get_or_create( - Sample[Quantity], req.get_channel_name() - ).new_receiver() - ) - - sender = self._channel_registry.get_or_create( - Sample3Phase[Voltage], self._channel_key - ).new_sender() + sender = self._channel.new_sender() _logger.debug( "Sent request for fetching voltage from: %s", self._source_component From bdc4caf117b77add33cb13bbccca37dd59dd7e12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:50:44 +0100 Subject: [PATCH 05/11] Added oneshot channel to ReportRequest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../_power_managing/_base_classes.py | 6 +++- .../_power_managing/_power_managing_actor.py | 28 +++++++++++++------ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py index cd575229a..40616de63 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_base_classes.py @@ -10,6 +10,7 @@ import enum import typing +from frequenz.channels import Receiver, Sender from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -31,7 +32,10 @@ class ReportRequest: """The component IDs to report on.""" priority: int - """The priority of the actor .""" + """The priority of the actor.""" + + report_stream_sender: Sender[Receiver[_Report]] + """Oneshot sender to transmit the report receiver.""" def get_channel_name(self) -> str: """Get the channel name for the report request. diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index 0dead0c29..b06586917 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -11,7 +11,7 @@ from datetime import datetime, timedelta, timezone from typing import assert_never -from frequenz.channels import Receiver, Sender, select, selected_from +from frequenz.channels import Broadcast, Receiver, Sender, select, selected_from from frequenz.channels.timer import SkipMissedAndDrift, Timer from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter @@ -72,6 +72,7 @@ def __init__( # pylint: disable=too-many-arguments self._power_distributing_results_receiver = power_distributing_results_receiver self._channel_registry = channel_registry self._proposals_receiver = proposals_receiver + self._channel_lookup: dict[str, Broadcast[_Report]] = {} self._system_bounds: dict[frozenset[ComponentId], SystemBounds] = {} self._bound_tracker_tasks: dict[frozenset[ComponentId], asyncio.Task[None]] = {} @@ -233,18 +234,27 @@ async def _run(self) -> None: component_ids = sub.component_ids priority = sub.priority + async def get_or_create_channel( + subscription: ReportRequest, + ) -> Broadcast[_Report]: + channel_name = subscription.get_channel_name() + if channel_name not in self._channel_lookup: + report_channel = Broadcast[_Report](name=channel_name) + report_channel.resend_latest = True + self._channel_lookup[channel_name] = report_channel + await subscription.report_stream_sender.send( + report_channel.new_receiver() + ) + return self._channel_lookup[channel_name] + if component_ids not in self._subscriptions: + channel = await get_or_create_channel(sub) self._subscriptions[component_ids] = { - priority: self._channel_registry.get_or_create( - _Report, sub.get_channel_name() - ).new_sender() + priority: channel.new_sender() } elif priority not in self._subscriptions[component_ids]: - self._subscriptions[component_ids][priority] = ( - self._channel_registry.get_or_create( - _Report, sub.get_channel_name() - ).new_sender() - ) + channel = await get_or_create_channel(sub) + self._subscriptions[component_ids][priority] = channel.new_sender() if sub.component_ids not in self._bound_tracker_tasks: self._add_system_bounds_tracker(sub.component_ids) From 5ab184abff956c17d29d34410194d0e7d91e6b51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 12:51:07 +0100 Subject: [PATCH 06/11] Using oneshot channel for report requests in all use cases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../timeseries/battery_pool/_battery_pool.py | 44 +++++++++++++++--- .../ev_charger_pool/_ev_charger_pool.py | 45 +++++++++++++++---- .../sdk/timeseries/pv_pool/_pv_pool.py | 45 +++++++++++++++---- 3 files changed, 109 insertions(+), 25 deletions(-) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 3be021a85..65c6e4d02 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -12,12 +12,15 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Energy, Percentage, Power, Temperature from ... import timeseries from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing, connection_manager +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Sample from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -74,6 +77,7 @@ def __init__( unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None async def propose_power( self, @@ -329,22 +333,48 @@ def power_status(self) -> ReceiverFetcher[BatteryPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = make_oneshot( + Receiver[_Report] # type: ignore[type-abstract] + ) + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store._batteries, + report_stream_sender=report_stream_sender, + ) + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, ) - self._pool_ref_store._power_bounds_subs[sub.get_channel_name()] = ( + + self._pool_ref_store._power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store._power_manager_bounds_subscription_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store._channel_registry.get_or_create( - _power_managing._Report, sub.get_channel_name() + + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store._power_manager_bounds_subscription_sender.send( + request ) - channel.resend_latest = True - return channel + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + pipe = Pipe(report_receiver, forwarding_sender) + await pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index 77327eb7d..d8bc9f5c7 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -8,11 +8,14 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Current, Power from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing, connection_manager +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Bounds from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -61,6 +64,7 @@ def __init__( # pylint: disable=too-many-arguments unique_id = str(uuid.uuid4()) self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None async def propose_power( self, @@ -171,23 +175,46 @@ def power_status(self) -> ReceiverFetcher[EVChargerPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = make_oneshot( + Receiver[_Report] # type: ignore[type-abstract] + ) + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + report_stream_sender=report_stream_sender, + ) + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( + + self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - return channel + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store.power_manager_bounds_subs_sender.send(request) + + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + pipe = Pipe(report_receiver, forwarding_sender) + await pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index 6bbb91e15..dac444d30 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -7,6 +7,8 @@ import uuid from collections import abc +from frequenz.channels import Broadcast, Receiver, Sender, make_oneshot +from frequenz.channels.experimental import Pipe from frequenz.client.common.microgrid.components import ComponentId from frequenz.quantities import Power @@ -14,6 +16,7 @@ from ..._internal._channels import MappingReceiverFetcher, ReceiverFetcher from ...microgrid import _power_distributing, _power_managing +from ...microgrid._power_managing import ReportRequest, _Report from ...timeseries import Bounds from .._base_types import SystemBounds from ..formulas._formula import Formula @@ -56,6 +59,7 @@ def __init__( # pylint: disable=too-many-arguments unique_id = uuid.uuid4() self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" self._priority = priority + self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None async def propose_power( self, @@ -143,23 +147,46 @@ def power_status(self) -> ReceiverFetcher[PVPoolReport]: Returns: A receiver that will stream power status reports for the pool's priority. """ - sub = _power_managing.ReportRequest( + report_stream_sender, self._report_stream_receiver = make_oneshot( + Receiver[_Report] # type: ignore[type-abstract] + ) + request = _power_managing.ReportRequest( source_id=self._source_id, priority=self._priority, component_ids=self._pool_ref_store.component_ids, + report_stream_sender=report_stream_sender, + ) + + forwarding_channel = Broadcast[_Report]( + name=request.get_channel_name() + ":Forwarded", + resend_latest=True, ) - self._pool_ref_store.power_bounds_subs[sub.get_channel_name()] = ( + + self._pool_ref_store.power_bounds_subs[request.get_channel_name()] = ( asyncio.create_task( - self._pool_ref_store.power_manager_bounds_subs_sender.send(sub) + self._send_request(forwarding_channel.new_sender(), request) ) ) - channel = self._pool_ref_store.channel_registry.get_or_create( - _power_managing._Report, # pylint: disable=protected-access - sub.get_channel_name(), - ) - channel.resend_latest = True - return channel + return forwarding_channel + + async def _send_request( + self, + forwarding_sender: Sender[_Report], + request: ReportRequest, + ) -> None: + """Send the report request and receive the report channel. + + Connect it via pipe to the channel that was returned in power_status. + """ + await self._pool_ref_store.power_manager_bounds_subs_sender.send(request) + + assert self._report_stream_receiver is not None + report_receiver: Receiver[_Report] = ( + await self._report_stream_receiver.receive() + ) + pipe = Pipe(report_receiver, forwarding_sender) + await pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: From 2bc0206a231f81660bbe4be76f6df2b3750849d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Thu, 5 Mar 2026 12:20:55 +0100 Subject: [PATCH 07/11] Stopped using ChannelRegistry in benchmarks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../timeseries/benchmark_datasourcing.py | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/benchmarks/timeseries/benchmark_datasourcing.py b/benchmarks/timeseries/benchmark_datasourcing.py index dd430aa42..005424d9c 100644 --- a/benchmarks/timeseries/benchmark_datasourcing.py +++ b/benchmarks/timeseries/benchmark_datasourcing.py @@ -16,15 +16,16 @@ from time import perf_counter from typing import Any -from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError +from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, make_oneshot from frequenz.client.microgrid.metrics import Metric +from frequenz.quantities import Quantity from frequenz.sdk import microgrid -from frequenz.sdk._internal._channels import ChannelRegistry from frequenz.sdk.microgrid._data_sourcing import ( ComponentMetricRequest, DataSourcingActor, ) +from frequenz.sdk.timeseries import Sample try: from tests.timeseries.mock_microgrid import MockMicrogrid @@ -80,7 +81,6 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals name="DataSourcingActor Request Channel" ) - channel_registry = ChannelRegistry(name="Microgrid Channel Registry") request_receiver = request_channel.new_receiver( name="datasourcing-benchmark", limit=(num_ev_chargers * len(COMPONENT_METRIC_IDS)), @@ -105,18 +105,21 @@ async def consume(channel: Receiver[Any]) -> None: for evc_id in mock_grid.evc_ids: for component_metric_id in COMPONENT_METRIC_IDS: + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) request = ComponentMetricRequest( - "current_phase_requests", evc_id, component_metric_id, None + namespace="current_phase_requests", + component_id=evc_id, + metric=component_metric_id, + start_time=None, + telem_stream_sender=telem_stream_sender, ) - - recv_channel = channel_registry.get_or_create( - ComponentMetricRequest, request.get_channel_name() - ).new_receiver() - await request_sender.send(request) - consume_tasks.append(asyncio.create_task(consume(recv_channel))) + stream_receiver = await telem_stream_receiver.receive() + consume_tasks.append(asyncio.create_task(consume(stream_receiver))) - async with DataSourcingActor(request_receiver, channel_registry): + async with DataSourcingActor(request_receiver): await asyncio.gather(*consume_tasks) time_taken = perf_counter() - start_time From 9a72f26f87e973f2ba391aa49258f0996bdd1b2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Thu, 5 Mar 2026 12:21:41 +0100 Subject: [PATCH 08/11] Remove ChannelRegistry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/_internal/_channels.py | 123 ------------------ src/frequenz/sdk/microgrid/_data_pipeline.py | 23 +--- .../microgrid/_data_sourcing/data_sourcing.py | 6 +- .../_data_sourcing/microgrid_api_source.py | 14 +- .../_power_managing/_power_managing_actor.py | 4 - src/frequenz/sdk/microgrid/_power_wrapper.py | 6 +- src/frequenz/sdk/microgrid/_resampling.py | 5 - .../sdk/timeseries/_grid_frequency.py | 4 - .../sdk/timeseries/_voltage_streamer.py | 7 - .../_battery_pool_reference_store.py | 7 +- src/frequenz/sdk/timeseries/consumer.py | 4 - .../_ev_charger_pool_reference_store.py | 7 +- .../sdk/timeseries/formulas/_formula_pool.py | 6 - .../formulas/_resampled_stream_fetcher.py | 5 - src/frequenz/sdk/timeseries/grid.py | 5 - .../logical_meter/_logical_meter.py | 6 - src/frequenz/sdk/timeseries/producer.py | 4 - .../pv_pool/_pv_pool_reference_store.py | 7 +- tests/actor/test_channel_registry.py | 69 ---------- tests/timeseries/_formulas/utils.py | 1 - 20 files changed, 8 insertions(+), 305 deletions(-) delete mode 100644 tests/actor/test_channel_registry.py diff --git a/src/frequenz/sdk/_internal/_channels.py b/src/frequenz/sdk/_internal/_channels.py index 74b2f6929..4b11cbba7 100644 --- a/src/frequenz/sdk/_internal/_channels.py +++ b/src/frequenz/sdk/_internal/_channels.py @@ -6,7 +6,6 @@ import abc import dataclasses import logging -import traceback import typing from frequenz.channels import Broadcast, Receiver @@ -63,128 +62,6 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]: return self._mapping_function(self._fetcher.new_receiver(limit=limit)) -class ChannelRegistry: - """Dynamically creates, own and provide access to broadcast channels. - - It can be used by actors to dynamically establish a communication channel - between each other. - - The registry is responsible for creating channels when they are first requested via - the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method. - - The registry also stores type information to make sure that the same channel is not - used for different message types. - - Since the registry owns the channels, it is also responsible for closing them when - they are no longer needed. There is no way to remove a channel without closing it. - - Note: - This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels. - """ - - def __init__(self, *, name: str) -> None: - """Initialize this registry. - - Args: - name: A name to identify the registry in the logs. This name is also used as - a prefix for the channel names. - """ - self._name = name - self._channels: dict[str, _Entry] = {} - - @property - def name(self) -> str: - """The name of this registry.""" - return self._name - - def message_type(self, key: str) -> type: - """Get the message type of the channel for the given key. - - Args: - key: The key to identify the channel. - - Returns: - The message type of the channel. - - Raises: - KeyError: If the channel does not exist. - """ - entry = self._channels.get(key) - if entry is None: - raise KeyError(f"No channel for key {key!r} exists.") - return entry.message_type - - def __contains__(self, key: str) -> bool: - """Check whether the channel for the given `key` exists.""" - return key in self._channels - - def get_or_create(self, message_type: type[T], key: str) -> Broadcast[T]: - """Get or create a channel for the given key. - - If a channel for the given key already exists, the message type of the existing - channel is checked against the requested message type. If they do not match, - a `ValueError` is raised. - - Note: - The types have to match exactly, it doesn't do a subtype check due to - technical limitations. In the future subtype checks might be supported. - - Args: - message_type: The type of the message that is sent through the channel. - key: The key to identify the channel. - - Returns: - The channel for the given key. - - Raises: - ValueError: If the channel exists and the message type does not match. - """ - if key not in self._channels: - if _logger.isEnabledFor(logging.DEBUG): - _logger.debug( - "Creating a new channel for key %r with type %s at:\n%s", - key, - message_type, - "".join(traceback.format_stack(limit=10)[:9]), - ) - self._channels[key] = _Entry( - message_type, Broadcast(name=f"{self._name}-{key}") - ) - - entry = self._channels[key] - if entry.message_type is not message_type: - error_message = ( - f"Type mismatch, a channel for key {key!r} exists and the requested " - f"message type {message_type} is not the same as the existing " - f"message type {entry.message_type}." - ) - if _logger.isEnabledFor(logging.DEBUG): - _logger.debug( - "%s at:\n%s", - error_message, - # We skip the last frame because it's this method, and limit the - # stack to 9 frames to avoid adding too much noise. - "".join(traceback.format_stack(limit=10)[:9]), - ) - raise ValueError(error_message) - - return typing.cast(Broadcast[T], entry.channel) - - async def close_and_remove(self, key: str) -> None: - """Remove the channel for the given key. - - Args: - key: The key to identify the channel. - - Raises: - KeyError: If the channel does not exist. - """ - entry = self._channels.pop(key, None) - if entry is None: - raise KeyError(f"No channel for key {key!r} exists.") - await entry.channel.close() - - @dataclasses.dataclass(frozen=True) class _Entry: """An entry in a channel registry.""" diff --git a/src/frequenz/sdk/microgrid/_data_pipeline.py b/src/frequenz/sdk/microgrid/_data_pipeline.py index aca47e4a9..4bf96f35d 100644 --- a/src/frequenz/sdk/microgrid/_data_pipeline.py +++ b/src/frequenz/sdk/microgrid/_data_pipeline.py @@ -20,7 +20,6 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter -from .._internal._channels import ChannelRegistry from ..actor._actor import Actor from ..timeseries import ResamplerConfig from ..timeseries._voltage_streamer import VoltageStreamer @@ -96,29 +95,22 @@ def __init__( """ self._resampler_config: ResamplerConfig = resampler_config - self._channel_registry: ChannelRegistry = ChannelRegistry( - name="Data Pipeline Registry" - ) - self._data_sourcing_actor: _ActorInfo | None = None self._resampling_actor: _ActorInfo | None = None self._battery_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=battery_power_manager_algorithm, default_power=DefaultPower.ZERO, component_class=Battery, ) self._ev_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA, default_power=DefaultPower.MAX, component_class=EvCharger, ) self._pv_power_wrapper = PowerWrapper( - self._channel_registry, api_power_request_timeout=api_power_request_timeout, power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA, default_power=DefaultPower.MIN, @@ -158,7 +150,6 @@ def frequency(self) -> GridFrequency: if self._frequency_instance is None: self._frequency_instance = GridFrequency( self._data_sourcing_request_sender(), - self._channel_registry, ) return self._frequency_instance @@ -166,10 +157,7 @@ def frequency(self) -> GridFrequency: def voltage_per_phase(self) -> VoltageStreamer: """Return the per-phase voltage measuring point.""" if not self._voltage_instance: - self._voltage_instance = VoltageStreamer( - self._resampling_request_sender(), - self._channel_registry, - ) + self._voltage_instance = VoltageStreamer(self._resampling_request_sender()) return self._voltage_instance @@ -179,7 +167,6 @@ def logical_meter(self) -> LogicalMeter: if self._logical_meter is None: self._logical_meter = LogicalMeter( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._logical_meter @@ -190,7 +177,6 @@ def consumer(self) -> Consumer: if self._consumer is None: self._consumer = Consumer( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._consumer @@ -201,7 +187,6 @@ def producer(self) -> Producer: if self._producer is None: self._producer = Producer( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) return self._producer @@ -213,7 +198,6 @@ def grid(self) -> Grid: if self._grid is None: initialize_grid( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), ) self._grid = get_grid() @@ -273,7 +257,6 @@ def new_ev_charger_pool( if ref_store_key not in self._ev_charger_pool_reference_stores: self._ev_charger_pool_reference_stores[ref_store_key] = ( EVChargerPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), status_receiver=self._ev_power_wrapper.status_channel.new_receiver( limit=1 @@ -346,7 +329,6 @@ def new_pv_pool( if ref_store_key not in self._pv_pool_reference_stores: self._pv_pool_reference_stores[ref_store_key] = PVPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), status_receiver=( self._pv_power_wrapper.status_channel.new_receiver(limit=1) @@ -422,7 +404,6 @@ def new_battery_pool( if ref_store_key not in self._battery_pool_reference_stores: self._battery_pool_reference_stores[ref_store_key] = ( BatteryPoolReferenceStore( - channel_registry=self._channel_registry, resampler_subscription_sender=self._resampling_request_sender(), batteries_status_receiver=( self._battery_power_wrapper.status_channel.new_receiver(limit=1) @@ -461,7 +442,6 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]: ) actor = DataSourcingActor( request_receiver=channel.new_receiver(limit=_REQUEST_RECV_BUFFER_SIZE), - registry=self._channel_registry, ) self._data_sourcing_actor = _ActorInfo(actor, channel) self._data_sourcing_actor.actor.start() @@ -482,7 +462,6 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]: name="Data Pipeline: Component Metric Resampling Actor Request Channel" ) actor = ComponentMetricsResamplingActor( - channel_registry=self._channel_registry, data_sourcing_request_sender=self._data_sourcing_request_sender(), resampling_request_receiver=channel.new_receiver( limit=_REQUEST_RECV_BUFFER_SIZE, diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py b/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py index 990e2b420..776768f29 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/data_sourcing.py @@ -5,7 +5,6 @@ from frequenz.channels import Receiver -from ..._internal._channels import ChannelRegistry from ...actor import Actor from ._component_metric_request import ComponentMetricRequest from .microgrid_api_source import MicrogridApiSource @@ -17,7 +16,6 @@ class DataSourcingActor(Actor): def __init__( self, request_receiver: Receiver[ComponentMetricRequest], - registry: ChannelRegistry, *, name: str | None = None, ) -> None: @@ -25,14 +23,12 @@ def __init__( Args: request_receiver: A channel receiver to accept metric requests from. - registry: A channel registry. To be replaced by a singleton - instance. name: The name of the actor. If `None`, `str(id(self))` will be used. This is used mostly for debugging purposes. """ super().__init__(name=name) self._request_receiver = request_receiver - self._microgrid_api_source = MicrogridApiSource(registry) + self._microgrid_api_source = MicrogridApiSource() async def _run(self) -> None: """Run the actor.""" diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 03faad41e..03f89474f 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -15,7 +15,6 @@ from frequenz.quantities import Quantity from ..._internal._asyncio import run_forever -from ..._internal._channels import ChannelRegistry from ...microgrid import connection_manager from ...timeseries import Sample from .._old_component_data import ( @@ -159,16 +158,8 @@ class MicrogridApiSource: Used by the DataSourcingActor. """ - def __init__( - self, - registry: ChannelRegistry, - ) -> None: - """Create a `MicrogridApiSource` instance. - - Args: - registry: A channel registry. To be replaced by a singleton - instance. - """ + def __init__(self) -> None: + """Create a `MicrogridApiSource` instance.""" self._comp_categories_cache: dict[ComponentId, ComponentCategory | int] = {} self.comp_data_receivers: dict[ComponentId, Receiver[Any]] = {} @@ -177,7 +168,6 @@ def __init__( self.comp_data_tasks: dict[ComponentId, asyncio.Task[None]] = {} """The dictionary of component IDs to asyncio tasks.""" - self._registry = registry self._req_streaming_metrics: dict[ ComponentId, dict[Metric | TransitionalMetric, list[ComponentMetricRequest]] ] = {} diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index b06586917..b58737242 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -18,7 +18,6 @@ from typing_extensions import override from ..._internal._asyncio import run_forever -from ..._internal._channels import ChannelRegistry from ...actor import Actor from ...timeseries._base_types import SystemBounds from .. import _data_pipeline, _power_distributing @@ -46,7 +45,6 @@ def __init__( # pylint: disable=too-many-arguments bounds_subscription_receiver: Receiver[ReportRequest], power_distributing_requests_sender: Sender[_power_distributing.Request], power_distributing_results_receiver: Receiver[_power_distributing.Result], - channel_registry: ChannelRegistry, algorithm: PowerManagerAlgorithm, default_power: DefaultPower, component_class: type[Battery | EvCharger | SolarInverter], @@ -60,7 +58,6 @@ def __init__( # pylint: disable=too-many-arguments requests. power_distributing_results_receiver: The receiver for power distribution results. - channel_registry: The channel registry. algorithm: The power management algorithm to use. default_power: The default power to use for the components. component_class: The class of component this instance is going to support. @@ -70,7 +67,6 @@ def __init__( # pylint: disable=too-many-arguments self._bounds_subscription_receiver = bounds_subscription_receiver self._power_distributing_requests_sender = power_distributing_requests_sender self._power_distributing_results_receiver = power_distributing_results_receiver - self._channel_registry = channel_registry self._proposals_receiver = proposals_receiver self._channel_lookup: dict[str, Broadcast[_Report]] = {} diff --git a/src/frequenz/sdk/microgrid/_power_wrapper.py b/src/frequenz/sdk/microgrid/_power_wrapper.py index 70e2e4e55..e82382335 100644 --- a/src/frequenz/sdk/microgrid/_power_wrapper.py +++ b/src/frequenz/sdk/microgrid/_power_wrapper.py @@ -11,7 +11,7 @@ from frequenz.channels import Broadcast from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter -from .._internal._channels import ChannelRegistry, ReceiverFetcher +from .._internal._channels import ReceiverFetcher # pylint seems to think this is a cyclic import, but it is not. # @@ -35,7 +35,6 @@ class PowerWrapper: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, - channel_registry: ChannelRegistry, *, api_power_request_timeout: timedelta, power_manager_algorithm: PowerManagerAlgorithm, @@ -45,7 +44,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize the power control. Args: - channel_registry: A channel registry for use in the actors. api_power_request_timeout: Timeout to use when making power requests to the microgrid API. power_manager_algorithm: The power management algorithm to use. @@ -55,7 +53,6 @@ def __init__( # pylint: disable=too-many-arguments self._default_power = default_power self._power_manager_algorithm = power_manager_algorithm self._component_class = component_class - self._channel_registry = channel_registry self._api_power_request_timeout = api_power_request_timeout self.status_channel: Broadcast[ComponentPoolStatus] = Broadcast( @@ -109,7 +106,6 @@ def _start_power_managing_actor(self) -> None: power_distributing_results_receiver=( self._power_distribution_results_channel.new_receiver() ), - channel_registry=self._channel_registry, ) self._power_managing_actor.start() diff --git a/src/frequenz/sdk/microgrid/_resampling.py b/src/frequenz/sdk/microgrid/_resampling.py index 0e330fe9f..8457f1883 100644 --- a/src/frequenz/sdk/microgrid/_resampling.py +++ b/src/frequenz/sdk/microgrid/_resampling.py @@ -11,7 +11,6 @@ from frequenz.quantities import Quantity from .._internal._asyncio import cancel_and_await -from .._internal._channels import ChannelRegistry from ..actor._actor import Actor from ..timeseries import Sample from ..timeseries._resampling._config import ResamplerConfig @@ -28,7 +27,6 @@ class ComponentMetricsResamplingActor(Actor): def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, data_sourcing_request_sender: Sender[ComponentMetricRequest], resampling_request_receiver: Receiver[ComponentMetricRequest], config: ResamplerConfig, @@ -37,8 +35,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize an instance. Args: - channel_registry: The channel registry used to get senders and - receivers for data sourcing subscriptions. data_sourcing_request_sender: The sender used to send requests to the [`DataSourcingActor`][frequenz.sdk.actor.DataSourcingActor] to subscribe to component metrics. @@ -49,7 +45,6 @@ def __init__( # pylint: disable=too-many-arguments is used mostly for debugging purposes. """ super().__init__(name=name) - self._channel_registry: ChannelRegistry = channel_registry self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = ( data_sourcing_request_sender ) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 07ebda951..5a35ee494 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -14,7 +14,6 @@ from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Frequency, Quantity -from .._internal._channels import ChannelRegistry from .._internal._graph_traversal import find_first_descendant_component from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest @@ -29,14 +28,12 @@ class GridFrequency: def __init__( self, data_sourcing_request_sender: Sender[ComponentMetricRequest], - channel_registry: ChannelRegistry, source: Component | None = None, ): """Initialize the grid frequency formula generator. Args: data_sourcing_request_sender: The sender to use for requests. - channel_registry: The channel registry to use for the grid frequency. source: The source component to use to receive the grid frequency. """ if not source: @@ -49,7 +46,6 @@ def __init__( self._request_sender: Sender[ComponentMetricRequest] = ( data_sourcing_request_sender ) - self._channel_registry: ChannelRegistry = channel_registry self._source_component: Component = source # Microgrid API source will send the stream through a oneshot channel diff --git a/src/frequenz/sdk/timeseries/_voltage_streamer.py b/src/frequenz/sdk/timeseries/_voltage_streamer.py index 5709ae716..d784dce25 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streamer.py +++ b/src/frequenz/sdk/timeseries/_voltage_streamer.py @@ -18,7 +18,6 @@ from frequenz.client.microgrid.metrics import Metric from frequenz.quantities import Quantity, Voltage -from .._internal._channels import ChannelRegistry from .._internal._graph_traversal import find_first_descendant_component from ..timeseries._base_types import Sample, Sample3Phase @@ -55,7 +54,6 @@ class VoltageStreamer: def __init__( self, resampler_subscription_sender: Sender[ComponentMetricRequest], - channel_registry: ChannelRegistry, source_component: Component | None = None, ): """Initialize the phase-to-neutral voltage streaming. @@ -63,8 +61,6 @@ def __init__( Args: resampler_subscription_sender: The sender for sending metric requests to the resampling actor. - channel_registry: The channel registry for the phase-to-neutral - voltage streaming. source_component: The source component to receive the phase-to-neutral voltage. If None, it fetches the source component from the connection manager. @@ -73,9 +69,6 @@ def __init__( self._resampler_subscription_sender = resampler_subscription_sender """The sender for sending metric requests to the resampling actor.""" - self._channel_registry = channel_registry - """The channel registry for the phase-to-neutral voltage streaming.""" - from ..microgrid import ( # pylint: disable=import-outside-toplevel connection_manager, ) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py index 68a03f597..34d549174 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool_reference_store.py @@ -15,7 +15,7 @@ from frequenz.client.microgrid.component import Battery from ..._internal._asyncio import cancel_and_await -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import Result @@ -41,7 +41,6 @@ class BatteryPoolReferenceStore: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], batteries_status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -53,8 +52,6 @@ def __init__( # pylint: disable=too-many-arguments """Create the class instance. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. batteries_status_receiver: Receiver to receive status of the batteries. @@ -125,13 +122,11 @@ def __init__( # pylint: disable=too-many-arguments self._power_bounds_subs: dict[str, asyncio.Task[None]] = {} self._namespace: str = f"battery-pool-{self._batteries}-{uuid.uuid4()}" self._power_distributing_namespace: str = f"power-distributor-{self._namespace}" - self._channel_registry: ChannelRegistry = channel_registry self._power_dist_results_fetcher: ReceiverFetcher[Result] = ( power_distribution_results_fetcher ) self._formula_pool: FormulaPool = FormulaPool( self._namespace, - self._channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/consumer.py b/src/frequenz/sdk/timeseries/consumer.py index 0376c0444..50d40c1a6 100644 --- a/src/frequenz/sdk/timeseries/consumer.py +++ b/src/frequenz/sdk/timeseries/consumer.py @@ -8,7 +8,6 @@ from frequenz.channels import Sender from frequenz.quantities import Power -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from .formulas._formula import Formula @@ -57,19 +56,16 @@ class Consumer: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the consumer formula generator. Args: - channel_registry: The channel registry to use for the consumer. resampler_subscription_sender: The sender to use for resampler subscriptions. """ namespace = f"consumer-{uuid.uuid4()}" self._formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py index 7e4d4c27b..8b7e00971 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool_reference_store.py @@ -11,7 +11,7 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import EvCharger -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import ComponentPoolStatus, Result @@ -37,7 +37,6 @@ class EVChargerPoolReferenceStore: def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -48,8 +47,6 @@ def __init__( # pylint: disable=too-many-arguments """Create an instance of the class. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. status_receiver: A receiver that streams the status of the EV Chargers in @@ -68,7 +65,6 @@ def __init__( # pylint: disable=too-many-arguments ValueError: If any of the specified component_ids are not EV chargers or are unknown to the component graph. """ - self.channel_registry = channel_registry self.resampler_subscription_sender = resampler_subscription_sender self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender @@ -96,7 +92,6 @@ def __init__( # pylint: disable=too-many-arguments self.namespace: str = f"ev-charger-pool-{uuid.uuid4()}" self.formula_pool = FormulaPool( self.namespace, - self.channel_registry, self.resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/formulas/_formula_pool.py b/src/frequenz/sdk/timeseries/formulas/_formula_pool.py index d0887c7e2..107ab449a 100644 --- a/src/frequenz/sdk/timeseries/formulas/_formula_pool.py +++ b/src/frequenz/sdk/timeseries/formulas/_formula_pool.py @@ -15,7 +15,6 @@ ResampledStreamFetcher, ) -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest from ._formula import Formula from ._formula_3_phase import Formula3Phase @@ -44,20 +43,16 @@ class FormulaPool: def __init__( self, namespace: str, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Create a new instance. Args: namespace: namespace to use with the data pipeline. - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. """ self._namespace: str = namespace - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) @@ -276,7 +271,6 @@ def _telemetry_fetcher(self, metric: Metric) -> ResampledStreamFetcher: """ return ResampledStreamFetcher( self._namespace, - self._channel_registry, self._resampler_subscription_sender, metric, ) diff --git a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py index 0f2165d4c..f72860fe2 100644 --- a/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py +++ b/src/frequenz/sdk/timeseries/formulas/_resampled_stream_fetcher.py @@ -9,7 +9,6 @@ from frequenz.sdk.timeseries import Sample -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest, Metric from ...microgrid._old_component_data import TransitionalMetric @@ -20,7 +19,6 @@ class ResampledStreamFetcher: def __init__( self, namespace: str, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], metric: Metric | TransitionalMetric, ): @@ -29,14 +27,11 @@ def __init__( Args: namespace: The unique namespace to allow reuse of streams in the data pipeline. - channel_registry: The channel registry instance shared with the resampling - and the data sourcing actors. resampler_subscription_sender: A sender to send metric requests to the resampling actor. metric: The metric to fetch for all components in this formula. """ self._namespace: str = namespace - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) diff --git a/src/frequenz/sdk/timeseries/grid.py b/src/frequenz/sdk/timeseries/grid.py index b79a690f9..f9c788b10 100644 --- a/src/frequenz/sdk/timeseries/grid.py +++ b/src/frequenz/sdk/timeseries/grid.py @@ -16,7 +16,6 @@ from frequenz.client.microgrid.component import GridConnectionPoint from frequenz.quantities import Current, Power, ReactivePower -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from ._fuse import Fuse @@ -155,14 +154,11 @@ async def stop(self) -> None: def initialize( - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the grid connection. Args: - channel_registry: The channel registry instance shared with the - resampling actor. resampler_subscription_sender: The sender for sending metric requests to the resampling actor. @@ -204,7 +200,6 @@ def initialize( namespace = f"grid-{uuid.uuid4()}" formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py index 00df8f776..220581125 100644 --- a/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py +++ b/src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py @@ -12,7 +12,6 @@ from frequenz.sdk.microgrid import connection_manager -from ..._internal._channels import ChannelRegistry from ...microgrid._data_sourcing import ComponentMetricRequest from ..formulas._formula import Formula from ..formulas._formula_pool import FormulaPool @@ -57,7 +56,6 @@ class LogicalMeter: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Create a `LogicalMeter` instance. @@ -68,12 +66,9 @@ def __init__( for creating `LogicalMeter` instances. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. """ - self._channel_registry: ChannelRegistry = channel_registry self._resampler_subscription_sender: Sender[ComponentMetricRequest] = ( resampler_subscription_sender ) @@ -83,7 +78,6 @@ def __init__( self._namespace: str = f"logical-meter-{uuid.uuid4()}" self._formula_pool: FormulaPool = FormulaPool( self._namespace, - self._channel_registry, self._resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/producer.py b/src/frequenz/sdk/timeseries/producer.py index 0c82ae16c..0aa89bd2f 100644 --- a/src/frequenz/sdk/timeseries/producer.py +++ b/src/frequenz/sdk/timeseries/producer.py @@ -8,7 +8,6 @@ from frequenz.channels import Sender from frequenz.quantities import Power -from .._internal._channels import ChannelRegistry from ..microgrid import connection_manager from ..microgrid._data_sourcing import ComponentMetricRequest from .formulas._formula import Formula @@ -57,19 +56,16 @@ class Producer: def __init__( self, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], ) -> None: """Initialize the producer formula generator. Args: - channel_registry: The channel registry to use for the producer. resampler_subscription_sender: The sender to use for resampler subscriptions. """ namespace = f"producer-{uuid.uuid4()}" self._formula_pool = FormulaPool( namespace, - channel_registry, resampler_subscription_sender, ) diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py index e893f9a36..d8b3ed641 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool_reference_store.py @@ -12,7 +12,7 @@ from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.microgrid.component import SolarInverter -from ..._internal._channels import ChannelRegistry, ReceiverFetcher +from ..._internal._channels import ReceiverFetcher from ...microgrid import connection_manager from ...microgrid._data_sourcing import ComponentMetricRequest from ...microgrid._power_distributing import ComponentPoolStatus, Result @@ -38,7 +38,6 @@ class PVPoolReferenceStore: def __init__( # pylint: disable=too-many-arguments self, *, - channel_registry: ChannelRegistry, resampler_subscription_sender: Sender[ComponentMetricRequest], status_receiver: Receiver[ComponentPoolStatus], power_manager_requests_sender: Sender[Proposal], @@ -49,8 +48,6 @@ def __init__( # pylint: disable=too-many-arguments """Initialize this instance. Args: - channel_registry: A channel registry instance shared with the resampling - actor. resampler_subscription_sender: A sender for sending metric requests to the resampling actor. status_receiver: A receiver that streams the status of the PV inverters in @@ -69,7 +66,6 @@ def __init__( # pylint: disable=too-many-arguments ValueError: If any of the provided component_ids are not PV inverters or are unknown to the component graph. """ - self.channel_registry = channel_registry self.resampler_subscription_sender = resampler_subscription_sender self.status_receiver = status_receiver self.power_manager_requests_sender = power_manager_requests_sender @@ -98,7 +94,6 @@ def __init__( # pylint: disable=too-many-arguments self.namespace: str = f"pv-pool-{uuid.uuid4()}" self.formula_pool = FormulaPool( self.namespace, - self.channel_registry, self.resampler_subscription_sender, ) self.bounds_channel: Broadcast[SystemBounds] = Broadcast( diff --git a/tests/actor/test_channel_registry.py b/tests/actor/test_channel_registry.py deleted file mode 100644 index 2966a9ad3..000000000 --- a/tests/actor/test_channel_registry.py +++ /dev/null @@ -1,69 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for the ChannelRegistry.""" - -import pytest -from frequenz.channels import ReceiverError, SenderError - -from frequenz.sdk._internal._channels import ChannelRegistry - - -async def test_channel_registry() -> None: - """Tests for ChannelRegistry, with string as key type.""" - reg = ChannelRegistry(name="test-registry") - - assert "20-hello" not in reg - assert "21-hello" not in reg - - chan20 = reg.get_or_create(int, "20-hello") - assert "20-hello" in reg - assert reg.message_type("20-hello") == int - - with pytest.raises(ValueError): - reg.get_or_create(str, "20-hello") - - sender20 = chan20.new_sender() - receiver20 = chan20.new_receiver() - - assert "21-hello" not in reg - - chan21 = reg.get_or_create(int, "21-hello") - assert "21-hello" in reg - assert reg.message_type("21-hello") == int - - sender21 = chan21.new_sender() - receiver21 = chan21.new_receiver() - - await sender20.send(30) - await sender21.send(31) - - rcvd = await receiver21.receive() - assert rcvd == 31 - - rcvd = await receiver20.receive() - assert rcvd == 30 - - await reg.close_and_remove("20-hello") - assert "20-hello" not in reg - assert chan20._closed # pylint: disable=protected-access - with pytest.raises(SenderError): - await sender20.send(30) - with pytest.raises(ReceiverError): - await receiver20.receive() - with pytest.raises(KeyError): - reg.message_type("20-hello") - with pytest.raises(KeyError): - await reg.close_and_remove("20-hello") - - await reg.close_and_remove("21-hello") - assert "21-hello" not in reg - assert chan21._closed # pylint: disable=protected-access - with pytest.raises(SenderError): - await sender21.send(30) - with pytest.raises(ReceiverError): - await receiver21.receive() - with pytest.raises(KeyError): - reg.message_type("21-hello") - with pytest.raises(KeyError): - await reg.close_and_remove("21-hello") diff --git a/tests/timeseries/_formulas/utils.py b/tests/timeseries/_formulas/utils.py index 471d7e9c7..ee0ae1b22 100644 --- a/tests/timeseries/_formulas/utils.py +++ b/tests/timeseries/_formulas/utils.py @@ -31,7 +31,6 @@ async def get_resampled_stream( # pylint: disable=protected-access builder = ResampledStreamFetcher( namespace=namespace, - channel_registry=_data_pipeline._get()._channel_registry, resampler_subscription_sender=_data_pipeline._get()._resampling_request_sender(), metric=metric, ) From 4e1ce14907057a3b37ebd871878912a922a24bea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Fri, 6 Mar 2026 15:38:38 +0100 Subject: [PATCH 09/11] Update release notes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- RELEASE_NOTES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5f93b7d7b..1302eda7b 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -15,3 +15,4 @@ ## Bug Fixes - Improved formula validation: Consistent error messages for invalid formulas and conventional span semantics. +- Removed ChannelRegistry. Streams are not being set up using one-shot channels and owned by the data source. From 43bcffec4e74927da14670fca2761a8b78418209 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 14:39:51 +0100 Subject: [PATCH 10/11] Keep a reference to Pipe objects to prevent it from being garbage collected MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- src/frequenz/sdk/timeseries/_grid_frequency.py | 5 ++++- src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py | 6 ++++-- .../sdk/timeseries/ev_charger_pool/_ev_charger_pool.py | 6 ++++-- src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py | 6 ++++-- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 5a35ee494..58b072a10 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -69,6 +69,8 @@ def __init__( # Sadly needed for testing self._task: None | asyncio.Task[None] = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[Sample[Quantity]] | None = None @property def source(self) -> Component: @@ -110,4 +112,5 @@ async def _send_request(self, forwarding_sender: Sender[Sample[Quantity]]) -> No telem_receiver: Receiver[Sample[Quantity]] = ( await self._telem_stream_receiver.receive() ) - await Pipe(telem_receiver, forwarding_sender).start() + self._pipe = Pipe(telem_receiver, forwarding_sender) + await self._pipe.start() diff --git a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py index 65c6e4d02..f2da953a4 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py @@ -78,6 +78,8 @@ def __init__( self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -373,8 +375,8 @@ async def _send_request( report_receiver: Receiver[_Report] = ( await self._report_stream_receiver.receive() ) - pipe = Pipe(report_receiver, forwarding_sender) - await pipe.start() + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py index d8bc9f5c7..12bd79814 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_ev_charger_pool.py @@ -65,6 +65,8 @@ def __init__( # pylint: disable=too-many-arguments self._source_id = unique_id if name is None else f"{name}-{unique_id}" self._priority = priority self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -213,8 +215,8 @@ async def _send_request( report_receiver: Receiver[_Report] = ( await self._report_stream_receiver.receive() ) - pipe = Pipe(report_receiver, forwarding_sender) - await pipe.start() + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: diff --git a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py index dac444d30..ef3598041 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_pv_pool.py @@ -60,6 +60,8 @@ def __init__( # pylint: disable=too-many-arguments self._source_id = str(unique_id) if name is None else f"{name}-{unique_id}" self._priority = priority self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None + # Keep a reference to prevent garbage collector from destroying pipe + self._pipe: Pipe[_Report] | None = None async def propose_power( self, @@ -185,8 +187,8 @@ async def _send_request( report_receiver: Receiver[_Report] = ( await self._report_stream_receiver.receive() ) - pipe = Pipe(report_receiver, forwarding_sender) - await pipe.start() + self._pipe = Pipe(report_receiver, forwarding_sender) + await self._pipe.start() @property def power_distribution_results(self) -> ReceiverFetcher[_power_distributing.Result]: From a28fab2f7152735c4e6c9ea6d1899b8c60ec3b00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20V=C3=B6lcker?= Date: Mon, 9 Mar 2026 14:38:36 +0100 Subject: [PATCH 11/11] Deprecate GridFrequency.new_receiver in favor of .subscribe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Simon Völcker --- .../sdk/timeseries/_grid_frequency.py | 42 ++++++-- tests/timeseries/test_frequency_streaming.py | 98 ++++++++++++------- 2 files changed, 95 insertions(+), 45 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_grid_frequency.py b/src/frequenz/sdk/timeseries/_grid_frequency.py index 58b072a10..7e504a227 100644 --- a/src/frequenz/sdk/timeseries/_grid_frequency.py +++ b/src/frequenz/sdk/timeseries/_grid_frequency.py @@ -81,9 +81,27 @@ def source(self) -> Component: """ return self._source_component + @staticmethod + def _map_frequency_samples( + receiver: Receiver[Sample[Quantity]], + ) -> Receiver[Sample[Frequency]]: + """Handle NaN values and map sample type to Frequency.""" + return receiver.map( + lambda sample: ( + Sample[Frequency](sample.timestamp, None) + if sample.value is None or sample.value.isnan() + else Sample( + sample.timestamp, Frequency.from_hertz(sample.value.base_value) + ) + ) + ) + def new_receiver(self) -> Receiver[Sample[Frequency]]: """Create a receiver for grid frequency. + Deprecated: + Use subscribe() instead. + Returns: A receiver that will receive grid frequency samples. """ @@ -93,15 +111,23 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]: self._send_request(self._forwarding_channel.new_sender()) ) - return self._forwarding_channel.new_receiver().map( - lambda sample: ( - Sample[Frequency](sample.timestamp, None) - if sample.value is None or sample.value.isnan() - else Sample( - sample.timestamp, Frequency.from_hertz(sample.value.base_value) - ) - ) + return self._map_frequency_samples(self._forwarding_channel.new_receiver()) + + async def subscribe(self) -> Receiver[Sample[Frequency]]: + """Create a receiver for grid frequency.""" + telem_stream_sender, telem_stream_receiver = make_oneshot( + Receiver[Sample[Quantity]] # type: ignore[type-abstract] + ) + component_metric_request = ComponentMetricRequest( + "grid-frequency", + self._source_component.id, + Metric.AC_FREQUENCY, + None, + telem_stream_sender, ) + await self._request_sender.send(component_metric_request) + receiver = await telem_stream_receiver.receive() + return self._map_frequency_samples(receiver) async def _send_request(self, forwarding_sender: Sender[Sample[Quantity]]) -> None: """Send the request for grid frequency.""" diff --git a/tests/timeseries/test_frequency_streaming.py b/tests/timeseries/test_frequency_streaming.py index 25077c3b9..ef47f1ce8 100644 --- a/tests/timeseries/test_frequency_streaming.py +++ b/tests/timeseries/test_frequency_streaming.py @@ -7,6 +7,7 @@ import asyncio from datetime import datetime, timezone +import pytest from frequenz.quantities import Frequency from pytest_mock import MockerFixture @@ -19,7 +20,8 @@ # pylint: disable=protected-access -async def test_grid_frequency_none(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("use_subscribe", [True, False]) +async def test_grid_frequency_none(mocker: MockerFixture, use_subscribe: bool) -> None: """Test the grid frequency formula.""" mockgrid = MockMicrogrid(grid_meter=True) mockgrid.add_batteries(2) @@ -27,13 +29,16 @@ async def test_grid_frequency_none(mocker: MockerFixture) -> None: await mockgrid.start(mocker) grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - - assert grid_freq._task is not None - # We have to wait for the metric request to be sent - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) await mockgrid.mock_client.send( component_data_wrapper.MeterDataWrapper( @@ -47,7 +52,8 @@ async def test_grid_frequency_none(mocker: MockerFixture) -> None: await mockgrid.cleanup() -async def test_grid_frequency_1(mocker: MockerFixture) -> None: +@pytest.mark.parametrize("use_subscribe", [True, False]) +async def test_grid_frequency_1(mocker: MockerFixture, use_subscribe: bool) -> None: """Test the grid frequency formula.""" mockgrid = MockMicrogrid(grid_meter=True, mocker=mocker) mockgrid.add_batteries(2) @@ -55,13 +61,16 @@ async def test_grid_frequency_1(mocker: MockerFixture) -> None: async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - - assert grid_freq._task is not None - # We have to wait for the metric request to be sent - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] grid_meter_data = [] @@ -82,8 +91,9 @@ async def test_grid_frequency_1(mocker: MockerFixture) -> None: assert equal_float_lists(results, grid_meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_no_grid_meter_no_consumer_meter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without a grid side meter.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -94,12 +104,16 @@ async def test_grid_frequency_no_grid_meter_no_consumer_meter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = [] @@ -120,8 +134,9 @@ async def test_grid_frequency_no_grid_meter_no_consumer_meter( assert equal_float_lists(results, meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_no_grid_meter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without a grid side meter.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -131,12 +146,16 @@ async def test_grid_frequency_no_grid_meter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = [] @@ -157,8 +176,9 @@ async def test_grid_frequency_no_grid_meter( assert equal_float_lists(results, meter_data) +@pytest.mark.parametrize("use_subscribe", [True, False]) async def test_grid_frequency_only_inverter( - mocker: MockerFixture, + mocker: MockerFixture, use_subscribe: bool ) -> None: """Test the grid frequency formula without any meter but only inverters.""" mockgrid = MockMicrogrid(grid_meter=False, mocker=mocker) @@ -166,12 +186,16 @@ async def test_grid_frequency_only_inverter( async with mockgrid: grid_freq = microgrid.frequency() - grid_freq_recv = grid_freq.new_receiver() - # We have to wait for the metric request to be sent - assert grid_freq._task is not None - await grid_freq._task - # And consumed - await asyncio.sleep(0) + if use_subscribe: + grid_freq_recv = await grid_freq.subscribe() + else: + # Deprecated + grid_freq_recv = grid_freq.new_receiver() + assert grid_freq._task is not None + # We have to wait for the metric request to be sent + await grid_freq._task + # And consumed + await asyncio.sleep(0) results = [] meter_data = []