Automatically closing broadcast channels#494
Automatically closing broadcast channels#494shsms wants to merge 7 commits intofrequenz-floss:v1.x.xfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds an auto-closing mode for Broadcast channels via a new broadcast() factory, aiming to close the underlying channel automatically once all senders or all receivers are closed.
Changes:
- Introduce
broadcast()factory that creates a Broadcast withauto_close=True. - Add a
close()method to theSenderinterface and implement “closed sender” behavior (incl.SenderClosedError) in Anycast/Broadcast senders. - Add tests asserting Broadcast auto-close behavior for “all receivers closed” and “all senders closed”.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_broadcast.py | Adds coverage for auto-close semantics using the new broadcast() factory. |
| src/frequenz/channels/experimental/_relay_sender.py | Implements close() by forwarding close to underlying senders. |
| src/frequenz/channels/_sender.py | Extends sender API with close(), adds SenderClosedError, and introduces clone/subscribe sender ABCs. |
| src/frequenz/channels/_broadcast.py | Adds broadcast() factory and implements sender/receiver-driven auto-close logic in Broadcast. |
| src/frequenz/channels/_anycast.py | Adds sender close-state tracking and close() implementation; raises SenderClosedError on use-after-close. |
| src/frequenz/channels/init.py | Exposes broadcast() and new sender-related types/errors from the public package API. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if self._channel._auto_close and ( | ||
| self._channel._sender_count == 0 or len(self._channel._receivers) == 0 | ||
| ): | ||
| await self._channel.aclose() | ||
| raise SenderError("The channel was closed", self) from ChannelClosedError( |
There was a problem hiding this comment.
In _Sender.send(), the auto-close branch runs before the _closed check. If a user calls send() on a closed sender, this can raise a channel-closed SenderError (and even close the whole channel) instead of the documented SenderClosedError. Check self._closed first to preserve the expected error semantics and avoid closing the channel due to a misuse of a closed sender.
|
|
||
|
|
||
| class SenderClosedError(SenderError[SenderMessageT_co]): | ||
| """An error indicating that a send operation was attempted a closed sender.""" |
There was a problem hiding this comment.
Docstring grammar: “attempted a closed sender” should be “attempted on a closed sender” (or similar).
| """An error indicating that a send operation was attempted a closed sender.""" | |
| """An error indicating that a send operation was attempted on a closed sender.""" |
| A new sender that sends messages to the same channel as this sender. | ||
| """ | ||
|
|
||
|
|
There was a problem hiding this comment.
SubscribableSender.subscribe() returns a Receiver, but the docstring says it returns “A new sender…”. Update the return docs to say it returns a new receiver attached to the same channel/stream.
| A new sender that sends messages to the same channel as this sender. | |
| """ | |
| A new receiver that receives messages from the same channel/stream | |
| as this sender. | |
| """ |
| class ClonableSender(Sender[SenderMessageT_contra], ABC): | ||
| """A [Sender][frequenz.channels.Sender] that can be cloned.""" |
There was a problem hiding this comment.
Public API naming: ClonableSender / ClonableSubscribableSender appear to be misspelled (standard spelling is “Cloneable”). Since these are exported in __init__.py, consider renaming now to avoid locking in a typo in the public API.
| self._closed = True | ||
| self._channel._sender_count -= 1 |
There was a problem hiding this comment.
When the last sender closes, receivers that are currently blocked in ready() will not be woken up because close() doesn't notify _recv_cv or otherwise trigger channel closure. With auto_close=True, this can leave receivers waiting forever even though all senders are closed. Consider scheduling aclose() (or at least notifying _recv_cv) when _sender_count reaches 0.
7f347bd to
d98c90a
Compare
llucax
left a comment
There was a problem hiding this comment.
LGTM in general, but there are a few design issues we need to resolve/agree on. I would say the most important points are:
close()vsaclose()(I would really go withaclose()unless we have a very good reason not to).- Naming.
The PR is also missing documentation updates. The docs still shows Broadcast() as the only way to create a broadcast channel.
| """A [Sender][frequenz.channels.Sender] that can be subscribed to.""" | ||
|
|
||
| @abstractmethod | ||
| def subscribe(self) -> Receiver[SenderMessageT_contra]: |
There was a problem hiding this comment.
Did you considered using new_receiver() instead, as it is the name we use everywhere else?
| super().__init__("Sender is closed", sender) | ||
|
|
||
|
|
||
| class SubscribableSender(Sender[SenderMessageT_contra], ABC): |
There was a problem hiding this comment.
Nitpick: You don't need to pass ABC here, Sender is already an ABC, so as long as you don't implement its abstract methods, it will still be an abstract class. Not sure if you prefer to mark it explicitly in case something changes in the future.
(same with the other new abstract classes)
| @abstractmethod | ||
| def close(self) -> None: | ||
| """Close this sender. | ||
|
|
||
| After a sender is closed, it can no longer be used to send messages. Any | ||
| attempt to send a message through a closed sender will raise a | ||
| [SenderError][frequenz.channels.SenderError]. | ||
| """ |
There was a problem hiding this comment.
When we talk about closing in an async context, we need to start thinking about asynchronicity in closing too. If we keep close sync (so no aclose()), we should probably add a way to wait for completion (for example, flushing a sending queue), but we can add that in the future, when/if needed.
I'm really not sure what is best here. My feeling is splitting close initiation (sync) from close completion (async) might be more error prone, because people might miss the awaiting for completion, which could potentially end up in wrong finalization or thinking something finished immediately when it actually kept working in the background somehow.
Using aclose() makes closing more atomic, and more clear when there is actually a need to wait for something to finish before the close is really complete. But it is less flexible, as one can only close something in an async context, which sometimes can complicate things. But my feeling is this happens because asyncio kind of encourages this kind of pattern, they usually split closing from awaiting (like task cancel/await). When using a sender/receiver you should be in an async context anyway.
BTW, Trio has an abstract base class AsyncResource with aclose() and everything that is async uses aclose(), so if we are in doubt, I would go for aclose(), as it seems like the safest approach.
| self._recv_cv: Condition = Condition() | ||
| """The condition to wait for data in the channel's buffer.""" | ||
|
|
||
| self._sender_count: int = 0 |
There was a problem hiding this comment.
Nitpick: I think we should expose this as a read-only property, it might be useful if someone need to make a more manual tracking of a channel's usage.
| self._latest: ChannelMessageT | None = None | ||
| """The latest message sent to the channel.""" | ||
|
|
||
| self._auto_close: bool = auto_close |
There was a problem hiding this comment.
Same, having a way to query if a channel is auto-close or not might be useful. Even more, I wonder if this one could even be read-write. Although probably is not very useful, even if possible, but at least I would expose it as a read-only property.
| assert isinstance(excinfo.value.__cause__, ChannelClosedError) | ||
|
|
||
|
|
||
| async def test_broadcast_auto_close_2() -> None: |
There was a problem hiding this comment.
Nitpick:
| async def test_broadcast_auto_close_2() -> None: | |
| async def test_broadcast_auto_close_all_senders_closed() -> None: |
| def oneshot( | ||
| message_type: type[ChannelMessageT], # pylint: disable=unused-argument | ||
| ) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]: | ||
| """Create a one-shot channel. |
There was a problem hiding this comment.
I would name this open_oneshot_channel() for the same reasons as the broadcast channel.
src/frequenz/channels/_one_shot.py
Outdated
|
|
||
| async def ready(self) -> bool: | ||
| while not self._channel.sent: | ||
| await self._channel.condition.wait() |
There was a problem hiding this comment.
This is missing the release():
| await self._channel.condition.wait() | |
| async self._channel.condition: | |
| with await self._channel.condition.wait() |
src/frequenz/channels/_one_shot.py
Outdated
| raise SenderClosedError(self) | ||
| self._channel.message = message | ||
| self._channel.sent = True | ||
| if self._channel.condition.locked(): |
There was a problem hiding this comment.
Do you really need to check for locked()? Can't you always notify()?
src/frequenz/channels/_one_shot.py
Outdated
| self.message: ChannelMessageT | _Empty = _EMPTY | ||
| self.sent = False | ||
| self.drained = False | ||
| self.condition = Condition() |
There was a problem hiding this comment.
It looks to me that this could be implemented more simply using an Event. Just wait for the event on the receiver and set the event from the sender. Do you really need a Condition here?
| @override | ||
| def clone(self) -> _Sender[_T]: | ||
| """Return a clone of this sender.""" | ||
| return _Sender(self._channel) |
There was a problem hiding this comment.
If sender is closed and user clones it, then it will create new open (self._closed = False) sender. Is it ok?
It would mean, that you can create open sender to closed channel.
| @override | ||
| def close(self) -> None: | ||
| """Close this sender. | ||
|
|
||
| After a sender is closed, it can no longer be used to send messages. Any | ||
| attempt to send a message through a closed sender will raise a | ||
| [SenderError][frequenz.channels.SenderError]. | ||
| """ | ||
| self._closed = True | ||
| self._channel._sender_count -= 1 |
There was a problem hiding this comment.
What if user doesn't close sender before removing it?
Would it work if we double check if sender is closed in __del__ method and print warning if not?
d98c90a to
8adb21c
Compare
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The `broadcast` function would only return a sender and a receiver from an auto-closing channel. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
8adb21c to
8f28eaa
Compare
Channels created with the
broadcastfunction will close automatically when either all senders or all receivers to a channel are closed.