-
Notifications
You must be signed in to change notification settings - Fork 10
Automatically closing broadcast channels #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v1.x.x
Are you sure you want to change the base?
Changes from all commits
78c43c1
f4b78be
486abce
9082d38
23bab61
b190dce
8f28eaa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,12 +16,51 @@ | |
| from ._exceptions import ChannelClosedError | ||
| from ._generic import ChannelMessageT | ||
| from ._receiver import Receiver, ReceiverStoppedError | ||
| from ._sender import Sender, SenderError | ||
| from ._sender import ClonableSubscribableSender, SenderClosedError, SenderError | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Broadcast(Generic[ChannelMessageT]): | ||
| def broadcast( | ||
| message_type: type[ChannelMessageT], # pylint: disable=unused-argument | ||
| /, | ||
| *, | ||
| name: str, | ||
| resend_latest: bool = False, | ||
| ) -> tuple[ClonableSubscribableSender[ChannelMessageT], Receiver[ChannelMessageT]]: | ||
| """Create a new Broadcast channel and return a sender and a receiver attached to it. | ||
|
|
||
| The channel will be automatically closed when all senders or all receivers | ||
| are closed. | ||
|
|
||
| Args: | ||
| message_type: The type of messages that will be sent through this channel. This | ||
| is only for type checking purposes, it is not used at runtime. | ||
| name: The name of the channel. This is for logging purposes, and it will be | ||
| shown in the string representation of the channel. | ||
| resend_latest: When True, every time a new receiver is created with | ||
| `new_receiver`, the last message seen by the channel will be sent to the | ||
| new receiver automatically. This allows new receivers on slow streams to | ||
| get the latest message as soon as they are created, without having to | ||
| wait for the next message on the channel to arrive. It is safe to be | ||
| set in data/reporting channels, but is not recommended for use in | ||
| channels that stream control instructions. | ||
|
|
||
| Returns: | ||
| A tuple of a sender and a receiver attached to the created channel. | ||
| """ | ||
| channel = Broadcast[ChannelMessageT]( | ||
| name=name, resend_latest=resend_latest, auto_close=True | ||
| ) | ||
| return channel.new_sender(), channel.new_receiver() | ||
|
|
||
|
|
||
| @deprecated( | ||
| "Please use the `broadcast` function to create a Broadcast channel instead." | ||
| ) | ||
|
Comment on lines
+58
to
+60
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would skip deprecation for now, let's give the new approach some time to sink in before spamming basically every downstream user. I would even consider adding the new function function to |
||
| class Broadcast( # pylint: disable=too-many-instance-attributes | ||
| Generic[ChannelMessageT] | ||
| ): | ||
| """A channel that deliver all messages to all receivers. | ||
|
|
||
| # Description | ||
|
|
@@ -184,7 +223,13 @@ async def main() -> None: | |
| ``` | ||
| """ | ||
|
|
||
| def __init__(self, *, name: str, resend_latest: bool = False) -> None: | ||
| def __init__( | ||
| self, | ||
| *, | ||
| name: str, | ||
| resend_latest: bool = False, | ||
| auto_close: bool = False, | ||
| ) -> None: | ||
| """Initialize this channel. | ||
|
|
||
| Args: | ||
|
|
@@ -197,6 +242,8 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: | |
| wait for the next message on the channel to arrive. It is safe to be | ||
| set in data/reporting channels, but is not recommended for use in | ||
| channels that stream control instructions. | ||
| auto_close: If True, the channel will be closed when all senders or all | ||
| receivers are closed. | ||
| """ | ||
| self._name: str = name | ||
| """The name of the broadcast channel. | ||
|
|
@@ -207,6 +254,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: | |
| self._recv_cv: Condition = Condition() | ||
| """The condition to wait for data in the channel's buffer.""" | ||
|
|
||
| self._sender_count: int = 0 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: I think we should expose this as a read-only property, it might be useful if someone need to make a more manual tracking of a channel's usage. |
||
| """The number of senders attached to this channel.""" | ||
|
|
||
| self._receivers: dict[ | ||
| int, weakref.ReferenceType[_Receiver[ChannelMessageT]] | ||
| ] = {} | ||
|
|
@@ -218,6 +268,9 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: | |
| self._latest: ChannelMessageT | None = None | ||
| """The latest message sent to the channel.""" | ||
|
|
||
| self._auto_close: bool = auto_close | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same, having a way to query if a channel is auto-close or not might be useful. Even more, I wonder if this one could even be read-write. Although probably is not very useful, even if possible, but at least I would expose it as a read-only property.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| """Whether to close the channel when all senders or all receivers are closed.""" | ||
|
|
||
| self.resend_latest: bool = resend_latest | ||
| """Whether to resend the latest message to new receivers. | ||
|
|
||
|
|
@@ -269,7 +322,7 @@ async def close(self) -> None: # noqa: D402 | |
| """Close the channel, deprecated alias for `aclose()`.""" # noqa: D402 | ||
| return await self.aclose() | ||
|
|
||
| def new_sender(self) -> Sender[ChannelMessageT]: | ||
| def new_sender(self) -> ClonableSubscribableSender[ChannelMessageT]: | ||
| """Return a new sender attached to this channel.""" | ||
| return _Sender(self) | ||
|
|
||
|
|
@@ -317,7 +370,7 @@ def __repr__(self) -> str: | |
| _T = TypeVar("_T") | ||
|
|
||
|
|
||
| class _Sender(Sender[_T]): | ||
| class _Sender(ClonableSubscribableSender[_T]): | ||
| """A sender to send messages to the broadcast channel. | ||
|
|
||
| Should not be created directly, but through the | ||
|
|
@@ -334,6 +387,11 @@ def __init__(self, channel: Broadcast[_T], /) -> None: | |
| self._channel: Broadcast[_T] = channel | ||
| """The broadcast channel this sender belongs to.""" | ||
|
|
||
| self._closed: bool = False | ||
| """Whether this sender is closed.""" | ||
|
|
||
| self._channel._sender_count += 1 | ||
|
|
||
| @override | ||
| async def send(self, message: _T, /) -> None: | ||
| """Send a message to all broadcast receivers. | ||
|
|
@@ -345,12 +403,22 @@ async def send(self, message: _T, /) -> None: | |
| SenderError: If the underlying channel was closed. | ||
| A [ChannelClosedError][frequenz.channels.ChannelClosedError] is | ||
| set as the cause. | ||
| SenderClosedError: If this sender was closed. | ||
| """ | ||
| # pylint: disable=protected-access | ||
| if self._channel._closed: | ||
| raise SenderError("The channel was closed", self) from ChannelClosedError( | ||
| self._channel | ||
| ) | ||
| if self._channel._auto_close and ( | ||
| self._channel._sender_count == 0 or len(self._channel._receivers) == 0 | ||
| ): | ||
| await self._channel.aclose() | ||
|
Comment on lines
+413
to
+416
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: This is used a couple of times, to prevent bugs it might be worth putting in its own method |
||
| raise SenderError("The channel was closed", self) from ChannelClosedError( | ||
|
Comment on lines
+413
to
+417
|
||
| self._channel | ||
| ) | ||
| if self._closed: | ||
| raise SenderClosedError(self) | ||
| self._channel._latest = message | ||
| stale_refs = [] | ||
| for _hash, recv_ref in self._channel._receivers.items(): | ||
|
|
@@ -365,6 +433,27 @@ async def send(self, message: _T, /) -> None: | |
| self._channel._recv_cv.notify_all() | ||
| # pylint: enable=protected-access | ||
|
|
||
| @override | ||
| def close(self) -> None: | ||
| """Close this sender. | ||
|
|
||
| After a sender is closed, it can no longer be used to send messages. Any | ||
| attempt to send a message through a closed sender will raise a | ||
| [SenderError][frequenz.channels.SenderError]. | ||
| """ | ||
shsms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._closed = True | ||
| self._channel._sender_count -= 1 | ||
|
Comment on lines
+444
to
+445
|
||
|
|
||
| @override | ||
| def clone(self) -> _Sender[_T]: | ||
| """Return a clone of this sender.""" | ||
| return _Sender(self._channel) | ||
|
|
||
| @override | ||
| def subscribe(self) -> Receiver[_T]: | ||
| """Return a new receiver attached to this sender's channel.""" | ||
| return self._channel.new_receiver() | ||
|
|
||
| def __str__(self) -> str: | ||
| """Return a string representation of this sender.""" | ||
| return f"{self._channel}:{type(self).__name__}" | ||
|
|
@@ -476,6 +565,11 @@ async def ready(self) -> bool: | |
| while len(self._q) == 0: | ||
| if self._channel._closed or self._closed: | ||
| return False | ||
| if self._channel._auto_close and ( | ||
| self._channel._sender_count == 0 or len(self._channel._receivers) == 0 | ||
| ): | ||
| await self._channel.aclose() | ||
| return False | ||
| async with self._channel._recv_cv: | ||
| await self._channel._recv_cv.wait() | ||
| return True | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| # License: MIT | ||
| # Copyright © 2026 Frequenz Energy-as-a-Service GmbH | ||
|
|
||
| """A channel that can send a single message.""" | ||
|
|
||
| import asyncio | ||
| import typing | ||
|
|
||
| from ._generic import ChannelMessageT | ||
| from ._receiver import Receiver, ReceiverStoppedError | ||
| from ._sender import Sender, SenderClosedError | ||
|
|
||
|
|
||
| def oneshot( | ||
| message_type: type[ChannelMessageT], # pylint: disable=unused-argument | ||
| ) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]: | ||
| """Create a one-shot channel. | ||
|
Comment on lines
+14
to
+17
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would name this |
||
|
|
||
| A one-shot channel is a channel that can only send one message. After the first | ||
| message is sent, the sender is closed and any further attempts to send a message | ||
| will raise a `SenderClosedError`. | ||
|
|
||
| Args: | ||
| message_type: The type of messages that can be sent through this channel. | ||
|
|
||
| Returns: | ||
| A tuple of a sender and a receiver for this channel. | ||
| """ | ||
| channel = _OneShot[ChannelMessageT]() | ||
| return _OneShotSender(channel), _OneShotReceiver(channel) | ||
|
|
||
|
|
||
| class _Empty: | ||
| pass | ||
|
|
||
|
|
||
| _EMPTY = _Empty() | ||
|
|
||
|
|
||
| class _OneShot(typing.Generic[ChannelMessageT]): | ||
| """A one-shot channel. | ||
|
|
||
| A one-shot channel is a channel that can only send one message. After the first | ||
| message is sent, the sender is closed and any further attempts to send a message | ||
| will raise a `SenderClosedError`. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| """Create a new one-shot channel.""" | ||
| self.message: ChannelMessageT | _Empty = _EMPTY | ||
| self.sent = False | ||
| self.drained = False | ||
| self.event = asyncio.Event() | ||
|
|
||
|
|
||
| class _OneShotSender(Sender[ChannelMessageT]): | ||
| def __init__(self, channel: _OneShot[ChannelMessageT]) -> None: | ||
| self._channel = channel | ||
|
|
||
| async def send(self, message: ChannelMessageT, /) -> None: | ||
| if self._channel.sent: | ||
| raise SenderClosedError(self) | ||
| self._channel.message = message | ||
| self._channel.sent = True | ||
| self._channel.event.set() | ||
|
|
||
| def close(self) -> None: | ||
| self._channel.sent = True | ||
|
|
||
|
|
||
| class _OneShotReceiver(Receiver[ChannelMessageT]): | ||
| def __init__(self, channel: _OneShot[ChannelMessageT]) -> None: | ||
| self._channel = channel | ||
|
|
||
| async def ready(self) -> bool: | ||
| if self._channel.drained: | ||
| return False | ||
| if not self._channel.sent: | ||
| await self._channel.event.wait() | ||
| return True | ||
|
|
||
| def consume(self) -> ChannelMessageT: | ||
| if self._channel.drained: | ||
| raise ReceiverStoppedError(self) | ||
| if isinstance(self._channel.message, _Empty): | ||
| raise ReceiverStoppedError(self) | ||
|
|
||
| self._channel.drained = True | ||
| self._channel.event.clear() | ||
| return self._channel.message | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about
create_broadcast()ormake_broadcast()(we could usenew_broadcast()to match the current naming convention too, but I think we should use a verb instead). For me it reads weird thatbroadcast()returns a (sender, receiver). Trio actually uses openopen_memory_channel, which for something that can be closed it doesn't sound like a bad idea:open_broadcast_channel()Another good thing Trio does, is make senders and receivers async context managers, so they can be used with
async with. Not for this PR, but we should do that too (#411).I know I might sound like a broken record citing Trio, but it really looks like they put a lot of thought to the design, this is why I think it is a good idea to use it as inspiration. I recommend in particular their example on using channels, as it is very idiomatic for Python, while copying Rust interfaces might not map as well.