Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"""

from ._anycast import Anycast
from ._broadcast import Broadcast
from ._broadcast import Broadcast, broadcast
from ._exceptions import ChannelClosedError, ChannelError, Error
from ._generic import (
ChannelMessageT,
Expand All @@ -92,6 +92,7 @@
)
from ._latest_value_cache import LatestValueCache
from ._merge import Merger, merge
from ._oneshot import oneshot
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
from ._select import (
Selected,
Expand All @@ -100,14 +101,23 @@
select,
selected_from,
)
from ._sender import Sender, SenderError
from ._sender import (
ClonableSender,
ClonableSubscribableSender,
Sender,
SenderClosedError,
SenderError,
SubscribableSender,
)

__all__ = [
"Anycast",
"Broadcast",
"ChannelClosedError",
"ChannelError",
"ChannelMessageT",
"ClonableSender",
"ClonableSubscribableSender",
"Error",
"ErroredChannelT_co",
"LatestValueCache",
Expand All @@ -120,11 +130,15 @@
"SelectError",
"Selected",
"Sender",
"SenderClosedError",
"SenderError",
"SenderMessageT_co",
"SenderMessageT_contra",
"SubscribableSender",
"UnhandledSelectedError",
"broadcast",
"merge",
"oneshot",
"select",
"selected_from",
]
19 changes: 18 additions & 1 deletion src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
self._channel: Anycast[_T] = channel
"""The channel that this sender belongs to."""

self._closed: bool = False
"""Whether the sender is closed."""

@override
async def send(self, message: _T, /) -> None:
"""Send a message across the channel.
Expand All @@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)

# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
Expand All @@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify(1)
# pylint: enable=protected-access

@override
def close(self) -> None:
"""Close this sender.

After closing, the sender will not be able to send any more messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
104 changes: 99 additions & 5 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,51 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)


class Broadcast(Generic[ChannelMessageT]):
def broadcast(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about create_broadcast() or make_broadcast() (we could use new_broadcast() to match the current naming convention too, but I think we should use a verb instead). For me it reads weird that broadcast() returns a (sender, receiver). Trio actually uses open open_memory_channel, which for something that can be closed it doesn't sound like a bad idea: open_broadcast_channel()

Another good thing Trio does, is make senders and receivers async context managers, so they can be used with async with. Not for this PR, but we should do that too (#411).

I know I might sound like a broken record citing Trio, but it really looks like they put a lot of thought to the design, this is why I think it is a good idea to use it as inspiration. I recommend in particular their example on using channels, as it is very idiomatic for Python, while copying Rust interfaces might not map as well.

message_type: type[ChannelMessageT], # pylint: disable=unused-argument
/,
*,
name: str,
resend_latest: bool = False,
) -> tuple[ClonableSubscribableSender[ChannelMessageT], Receiver[ChannelMessageT]]:
"""Create a new Broadcast channel and return a sender and a receiver attached to it.

The channel will be automatically closed when all senders or all receivers
are closed.

Args:
message_type: The type of messages that will be sent through this channel. This
is only for type checking purposes, it is not used at runtime.
name: The name of the channel. This is for logging purposes, and it will be
shown in the string representation of the channel.
resend_latest: When True, every time a new receiver is created with
`new_receiver`, the last message seen by the channel will be sent to the
new receiver automatically. This allows new receivers on slow streams to
get the latest message as soon as they are created, without having to
wait for the next message on the channel to arrive. It is safe to be
set in data/reporting channels, but is not recommended for use in
channels that stream control instructions.

Returns:
A tuple of a sender and a receiver attached to the created channel.
"""
channel = Broadcast[ChannelMessageT](
name=name, resend_latest=resend_latest, auto_close=True
)
return channel.new_sender(), channel.new_receiver()


@deprecated(
"Please use the `broadcast` function to create a Broadcast channel instead."
)
Comment on lines +58 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would skip deprecation for now, let's give the new approach some time to sink in before spamming basically every downstream user. I would even consider adding the new function function to experimental, but I guess it still OK to put in the main namespace (if we agree on the name 😆), so I'm fine with that too.

class Broadcast( # pylint: disable=too-many-instance-attributes
Generic[ChannelMessageT]
):
"""A channel that deliver all messages to all receivers.

# Description
Expand Down Expand Up @@ -184,7 +223,13 @@ async def main() -> None:
```
"""

def __init__(self, *, name: str, resend_latest: bool = False) -> None:
def __init__(
self,
*,
name: str,
resend_latest: bool = False,
auto_close: bool = False,
) -> None:
"""Initialize this channel.

Args:
Expand All @@ -197,6 +242,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
wait for the next message on the channel to arrive. It is safe to be
set in data/reporting channels, but is not recommended for use in
channels that stream control instructions.
auto_close: If True, the channel will be closed when all senders or all
receivers are closed.
"""
self._name: str = name
"""The name of the broadcast channel.
Expand All @@ -207,6 +254,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
self._recv_cv: Condition = Condition()
"""The condition to wait for data in the channel's buffer."""

self._sender_count: int = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I think we should expose this as a read-only property, it might be useful if someone need to make a more manual tracking of a channel's usage.

"""The number of senders attached to this channel."""

self._receivers: dict[
int, weakref.ReferenceType[_Receiver[ChannelMessageT]]
] = {}
Expand All @@ -218,6 +268,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None:
self._latest: ChannelMessageT | None = None
"""The latest message sent to the channel."""

self._auto_close: bool = auto_close
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, having a way to query if a channel is auto-close or not might be useful. Even more, I wonder if this one could even be read-write. Although probably is not very useful, even if possible, but at least I would expose it as a read-only property.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_auto_close sounds like a verb, flags should read as a question. I suggest something like _auto_close_enabled. It seems silly, but I bumped with this actual issue when suggesting to move the logic to auto close the channel to a separate method, which of course should be called _auto_close() 😆

"""Whether to close the channel when all senders or all receivers are closed."""

self.resend_latest: bool = resend_latest
"""Whether to resend the latest message to new receivers.

Expand Down Expand Up @@ -269,7 +322,7 @@ async def close(self) -> None: # noqa: D402
"""Close the channel, deprecated alias for `aclose()`.""" # noqa: D402
return await self.aclose()

def new_sender(self) -> Sender[ChannelMessageT]:
def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]:
"""Return a new sender attached to this channel."""
return _Sender(self)

Expand Down Expand Up @@ -317,7 +370,7 @@ def __repr__(self) -> str:
_T = TypeVar("_T")


class _Sender(Sender[_T]):
class _Sender(ClonableSubscribableSender[_T]):
"""A sender to send messages to the broadcast channel.

Should not be created directly, but through the
Expand All @@ -334,6 +387,11 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
self._channel: Broadcast[_T] = channel
"""The broadcast channel this sender belongs to."""

self._closed: bool = False
"""Whether this sender is closed."""

self._channel._sender_count += 1

@override
async def send(self, message: _T, /) -> None:
"""Send a message to all broadcast receivers.
Expand All @@ -345,12 +403,22 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
self._channel
)
if self._channel._auto_close and (
self._channel._sender_count == 0 or len(self._channel._receivers) == 0
):
await self._channel.aclose()
Comment on lines +413 to +416
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: This is used a couple of times, to prevent bugs it might be worth putting in its own method _auto_close() (we need to rename the attribute).

raise SenderError("The channel was closed", self) from ChannelClosedError(
Comment on lines +413 to +417
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _Sender.send(), the auto-close branch runs before the _closed check. If a user calls send() on a closed sender, this can raise a channel-closed SenderError (and even close the whole channel) instead of the documented SenderClosedError. Check self._closed first to preserve the expected error semantics and avoid closing the channel due to a misuse of a closed sender.

Copilot uses AI. Check for mistakes.
self._channel
)
if self._closed:
raise SenderClosedError(self)
self._channel._latest = message
stale_refs = []
for _hash, recv_ref in self._channel._receivers.items():
Expand All @@ -365,6 +433,27 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify_all()
# pylint: enable=protected-access

@override
def close(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
self._closed = True
self._channel._sender_count -= 1
Comment on lines +444 to +445
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the last sender closes, receivers that are currently blocked in ready() will not be woken up because close() doesn't notify _recv_cv or otherwise trigger channel closure. With auto_close=True, this can leave receivers waiting forever even though all senders are closed. Consider scheduling aclose() (or at least notifying _recv_cv) when _sender_count reaches 0.

Copilot uses AI. Check for mistakes.

@override
def clone(self) -> _Sender[_T]:
"""Return a clone of this sender."""
return _Sender(self._channel)

@override
def subscribe(self) -> Receiver[_T]:
"""Return a new receiver attached to this sender's channel."""
return self._channel.new_receiver()

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down Expand Up @@ -476,6 +565,11 @@ async def ready(self) -> bool:
while len(self._q) == 0:
if self._channel._closed or self._closed:
return False
if self._channel._auto_close and (
self._channel._sender_count == 0 or len(self._channel._receivers) == 0
):
await self._channel.aclose()
return False
async with self._channel._recv_cv:
await self._channel._recv_cv.wait()
return True
Expand Down
90 changes: 90 additions & 0 deletions src/frequenz/channels/_oneshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""A channel that can send a single message."""

import asyncio
import typing

from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderClosedError


def oneshot(
message_type: type[ChannelMessageT], # pylint: disable=unused-argument
) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]:
"""Create a one-shot channel.
Comment on lines +14 to +17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would name this open_oneshot_channel() for the same reasons as the broadcast channel.


A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.

Args:
message_type: The type of messages that can be sent through this channel.

Returns:
A tuple of a sender and a receiver for this channel.
"""
channel = _OneShot[ChannelMessageT]()
return _OneShotSender(channel), _OneShotReceiver(channel)


class _Empty:
pass


_EMPTY = _Empty()


class _OneShot(typing.Generic[ChannelMessageT]):
"""A one-shot channel.

A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.
"""

def __init__(self) -> None:
"""Create a new one-shot channel."""
self.message: ChannelMessageT | _Empty = _EMPTY
self.sent = False
self.drained = False
self.event = asyncio.Event()


class _OneShotSender(Sender[ChannelMessageT]):
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
self._channel = channel

async def send(self, message: ChannelMessageT, /) -> None:
if self._channel.sent:
raise SenderClosedError(self)
self._channel.message = message
self._channel.sent = True
self._channel.event.set()

def close(self) -> None:
self._channel.sent = True


class _OneShotReceiver(Receiver[ChannelMessageT]):
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
self._channel = channel

async def ready(self) -> bool:
if self._channel.drained:
return False
if not self._channel.sent:
await self._channel.event.wait()
return True

def consume(self) -> ChannelMessageT:
if self._channel.drained:
raise ReceiverStoppedError(self)
if isinstance(self._channel.message, _Empty):
raise ReceiverStoppedError(self)

self._channel.drained = True
self._channel.event.clear()
return self._channel.message
Loading
Loading