Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
## Bug Fixes

- Improved formula validation: Consistent error messages for invalid formulas and conventional span semantics.
- Removed ChannelRegistry. Streams are not being set up using one-shot channels and owned by the data source.
25 changes: 14 additions & 11 deletions benchmarks/timeseries/benchmark_datasourcing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
from time import perf_counter
from typing import Any

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError
from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, make_oneshot
from frequenz.client.microgrid.metrics import Metric
from frequenz.quantities import Quantity

from frequenz.sdk import microgrid
from frequenz.sdk._internal._channels import ChannelRegistry
from frequenz.sdk.microgrid._data_sourcing import (
ComponentMetricRequest,
DataSourcingActor,
)
from frequenz.sdk.timeseries import Sample

try:
from tests.timeseries.mock_microgrid import MockMicrogrid
Expand Down Expand Up @@ -80,7 +81,6 @@ async def benchmark_data_sourcing( # pylint: disable=too-many-locals
name="DataSourcingActor Request Channel"
)

channel_registry = ChannelRegistry(name="Microgrid Channel Registry")
request_receiver = request_channel.new_receiver(
name="datasourcing-benchmark",
limit=(num_ev_chargers * len(COMPONENT_METRIC_IDS)),
Expand All @@ -105,18 +105,21 @@ async def consume(channel: Receiver[Any]) -> None:

for evc_id in mock_grid.evc_ids:
for component_metric_id in COMPONENT_METRIC_IDS:
telem_stream_sender, telem_stream_receiver = make_oneshot(
Receiver[Sample[Quantity]] # type: ignore[type-abstract]
)
request = ComponentMetricRequest(
"current_phase_requests", evc_id, component_metric_id, None
namespace="current_phase_requests",
component_id=evc_id,
metric=component_metric_id,
start_time=None,
telem_stream_sender=telem_stream_sender,
)

recv_channel = channel_registry.get_or_create(
ComponentMetricRequest, request.get_channel_name()
).new_receiver()

await request_sender.send(request)
consume_tasks.append(asyncio.create_task(consume(recv_channel)))
stream_receiver = await telem_stream_receiver.receive()
consume_tasks.append(asyncio.create_task(consume(stream_receiver)))

async with DataSourcingActor(request_receiver, channel_registry):
async with DataSourcingActor(request_receiver):
await asyncio.gather(*consume_tasks)

time_taken = perf_counter() - start_time
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies = [
"frequenz-client-microgrid >= 0.18.1, < 0.19.0",
"frequenz-microgrid-component-graph >= 0.3.4, < 0.4",
"frequenz-client-common >= 0.3.6, < 0.4.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@oneshot",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't merge before this is merged 👀

"frequenz-quantities[marshmallow] >= 1.0.0, < 2.0.0",
"numpy >= 2.1.0, < 3",
"typing_extensions >= 4.14.1, < 5",
Expand Down
123 changes: 0 additions & 123 deletions src/frequenz/sdk/_internal/_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import abc
import dataclasses
import logging
import traceback
import typing

from frequenz.channels import Broadcast, Receiver
Expand Down Expand Up @@ -63,128 +62,6 @@ def new_receiver(self, *, limit: int = 50) -> Receiver[U_co]:
return self._mapping_function(self._fetcher.new_receiver(limit=limit))


class ChannelRegistry:
"""Dynamically creates, own and provide access to broadcast channels.

It can be used by actors to dynamically establish a communication channel
between each other.

The registry is responsible for creating channels when they are first requested via
the [`get_or_create()`][frequenz.sdk.actor.ChannelRegistry.get_or_create] method.

The registry also stores type information to make sure that the same channel is not
used for different message types.

Since the registry owns the channels, it is also responsible for closing them when
they are no longer needed. There is no way to remove a channel without closing it.

Note:
This registry stores [`Broadcast`][frequenz.channels.Broadcast] channels.
"""

def __init__(self, *, name: str) -> None:
"""Initialize this registry.

Args:
name: A name to identify the registry in the logs. This name is also used as
a prefix for the channel names.
"""
self._name = name
self._channels: dict[str, _Entry] = {}

@property
def name(self) -> str:
"""The name of this registry."""
return self._name

def message_type(self, key: str) -> type:
"""Get the message type of the channel for the given key.

Args:
key: The key to identify the channel.

Returns:
The message type of the channel.

Raises:
KeyError: If the channel does not exist.
"""
entry = self._channels.get(key)
if entry is None:
raise KeyError(f"No channel for key {key!r} exists.")
return entry.message_type

def __contains__(self, key: str) -> bool:
"""Check whether the channel for the given `key` exists."""
return key in self._channels

def get_or_create(self, message_type: type[T], key: str) -> Broadcast[T]:
"""Get or create a channel for the given key.

If a channel for the given key already exists, the message type of the existing
channel is checked against the requested message type. If they do not match,
a `ValueError` is raised.

Note:
The types have to match exactly, it doesn't do a subtype check due to
technical limitations. In the future subtype checks might be supported.

Args:
message_type: The type of the message that is sent through the channel.
key: The key to identify the channel.

Returns:
The channel for the given key.

Raises:
ValueError: If the channel exists and the message type does not match.
"""
if key not in self._channels:
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"Creating a new channel for key %r with type %s at:\n%s",
key,
message_type,
"".join(traceback.format_stack(limit=10)[:9]),
)
self._channels[key] = _Entry(
message_type, Broadcast(name=f"{self._name}-{key}")
)

entry = self._channels[key]
if entry.message_type is not message_type:
error_message = (
f"Type mismatch, a channel for key {key!r} exists and the requested "
f"message type {message_type} is not the same as the existing "
f"message type {entry.message_type}."
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
"%s at:\n%s",
error_message,
# We skip the last frame because it's this method, and limit the
# stack to 9 frames to avoid adding too much noise.
"".join(traceback.format_stack(limit=10)[:9]),
)
raise ValueError(error_message)

return typing.cast(Broadcast[T], entry.channel)

async def close_and_remove(self, key: str) -> None:
"""Remove the channel for the given key.

Args:
key: The key to identify the channel.

Raises:
KeyError: If the channel does not exist.
"""
entry = self._channels.pop(key, None)
if entry is None:
raise KeyError(f"No channel for key {key!r} exists.")
await entry.channel.close()


@dataclasses.dataclass(frozen=True)
class _Entry:
"""An entry in a channel registry."""
Expand Down
23 changes: 1 addition & 22 deletions src/frequenz/sdk/microgrid/_data_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.component import Battery, EvCharger, SolarInverter

from .._internal._channels import ChannelRegistry
from ..actor._actor import Actor
from ..timeseries import ResamplerConfig
from ..timeseries._voltage_streamer import VoltageStreamer
Expand Down Expand Up @@ -96,29 +95,22 @@ def __init__(
"""
self._resampler_config: ResamplerConfig = resampler_config

self._channel_registry: ChannelRegistry = ChannelRegistry(
name="Data Pipeline Registry"
)

self._data_sourcing_actor: _ActorInfo | None = None
self._resampling_actor: _ActorInfo | None = None

self._battery_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=battery_power_manager_algorithm,
default_power=DefaultPower.ZERO,
component_class=Battery,
)
self._ev_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA,
default_power=DefaultPower.MAX,
component_class=EvCharger,
)
self._pv_power_wrapper = PowerWrapper(
self._channel_registry,
api_power_request_timeout=api_power_request_timeout,
power_manager_algorithm=PowerManagerAlgorithm.MATRYOSHKA,
default_power=DefaultPower.MIN,
Expand Down Expand Up @@ -158,18 +150,14 @@ def frequency(self) -> GridFrequency:
if self._frequency_instance is None:
self._frequency_instance = GridFrequency(
self._data_sourcing_request_sender(),
self._channel_registry,
)

return self._frequency_instance

def voltage_per_phase(self) -> VoltageStreamer:
"""Return the per-phase voltage measuring point."""
if not self._voltage_instance:
self._voltage_instance = VoltageStreamer(
self._resampling_request_sender(),
self._channel_registry,
)
self._voltage_instance = VoltageStreamer(self._resampling_request_sender())

return self._voltage_instance

Expand All @@ -179,7 +167,6 @@ def logical_meter(self) -> LogicalMeter:

if self._logical_meter is None:
self._logical_meter = LogicalMeter(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
return self._logical_meter
Expand All @@ -190,7 +177,6 @@ def consumer(self) -> Consumer:

if self._consumer is None:
self._consumer = Consumer(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
return self._consumer
Expand All @@ -201,7 +187,6 @@ def producer(self) -> Producer:

if self._producer is None:
self._producer = Producer(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
return self._producer
Expand All @@ -213,7 +198,6 @@ def grid(self) -> Grid:

if self._grid is None:
initialize_grid(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
)
self._grid = get_grid()
Expand Down Expand Up @@ -273,7 +257,6 @@ def new_ev_charger_pool(
if ref_store_key not in self._ev_charger_pool_reference_stores:
self._ev_charger_pool_reference_stores[ref_store_key] = (
EVChargerPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
status_receiver=self._ev_power_wrapper.status_channel.new_receiver(
limit=1
Expand Down Expand Up @@ -346,7 +329,6 @@ def new_pv_pool(

if ref_store_key not in self._pv_pool_reference_stores:
self._pv_pool_reference_stores[ref_store_key] = PVPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
status_receiver=(
self._pv_power_wrapper.status_channel.new_receiver(limit=1)
Expand Down Expand Up @@ -422,7 +404,6 @@ def new_battery_pool(
if ref_store_key not in self._battery_pool_reference_stores:
self._battery_pool_reference_stores[ref_store_key] = (
BatteryPoolReferenceStore(
channel_registry=self._channel_registry,
resampler_subscription_sender=self._resampling_request_sender(),
batteries_status_receiver=(
self._battery_power_wrapper.status_channel.new_receiver(limit=1)
Expand Down Expand Up @@ -461,7 +442,6 @@ def _data_sourcing_request_sender(self) -> Sender[ComponentMetricRequest]:
)
actor = DataSourcingActor(
request_receiver=channel.new_receiver(limit=_REQUEST_RECV_BUFFER_SIZE),
registry=self._channel_registry,
)
self._data_sourcing_actor = _ActorInfo(actor, channel)
self._data_sourcing_actor.actor.start()
Expand All @@ -482,7 +462,6 @@ def _resampling_request_sender(self) -> Sender[ComponentMetricRequest]:
name="Data Pipeline: Component Metric Resampling Actor Request Channel"
)
actor = ComponentMetricsResamplingActor(
channel_registry=self._channel_registry,
data_sourcing_request_sender=self._data_sourcing_request_sender(),
resampling_request_receiver=channel.new_receiver(
limit=_REQUEST_RECV_BUFFER_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
from dataclasses import dataclass
from datetime import datetime

from frequenz.channels import Receiver, Sender
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid.metrics import Metric
from frequenz.quantities import Quantity

from frequenz.sdk.microgrid._old_component_data import TransitionalMetric

__all__ = ["ComponentMetricRequest", "Metric"]

from frequenz.sdk.timeseries import Sample


@dataclass
class ComponentMetricRequest:
Expand Down Expand Up @@ -51,6 +55,9 @@ class ComponentMetricRequest:
If None, only live data is streamed.
"""

telem_stream_sender: Sender[Receiver[Sample[Quantity]]]
"""Sender of a oneshot channel used to send the data stream back to the requester."""

def get_channel_name(self) -> str:
"""Construct the channel name based on the request parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from frequenz.channels import Receiver

from ..._internal._channels import ChannelRegistry
from ...actor import Actor
from ._component_metric_request import ComponentMetricRequest
from .microgrid_api_source import MicrogridApiSource
Expand All @@ -17,22 +16,19 @@ class DataSourcingActor(Actor):
def __init__(
self,
request_receiver: Receiver[ComponentMetricRequest],
registry: ChannelRegistry,
*,
name: str | None = None,
) -> None:
"""Create a `DataSourcingActor` instance.
Args:
request_receiver: A channel receiver to accept metric requests from.
registry: A channel registry. To be replaced by a singleton
instance.
name: The name of the actor. If `None`, `str(id(self))` will be used. This
is used mostly for debugging purposes.
"""
super().__init__(name=name)
self._request_receiver = request_receiver
self._microgrid_api_source = MicrogridApiSource(registry)
self._microgrid_api_source = MicrogridApiSource()

async def _run(self) -> None:
"""Run the actor."""
Expand Down
Loading