Skip to content

Automatically closing broadcast channels#494

Open
shsms wants to merge 7 commits intofrequenz-floss:v1.x.xfrom
shsms:auto-close-channels
Open

Automatically closing broadcast channels#494
shsms wants to merge 7 commits intofrequenz-floss:v1.x.xfrom
shsms:auto-close-channels

Conversation

@shsms
Copy link
Contributor

@shsms shsms commented Feb 9, 2026

Channels created with the broadcast function will close automatically when either all senders or all receivers to a channel are closed.

@shsms shsms requested a review from a team as a code owner February 9, 2026 16:00
@shsms shsms requested review from Marenz and Copilot and removed request for a team February 9, 2026 16:00
@github-actions github-actions bot added part:tests Affects the unit, integration and performance (benchmarks) tests part:channels Affects channels implementation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:experimental Affects the experimental package labels Feb 9, 2026
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an auto-closing mode for Broadcast channels via a new broadcast() factory, aiming to close the underlying channel automatically once all senders or all receivers are closed.

Changes:

  • Introduce broadcast() factory that creates a Broadcast with auto_close=True.
  • Add a close() method to the Sender interface and implement “closed sender” behavior (incl. SenderClosedError) in Anycast/Broadcast senders.
  • Add tests asserting Broadcast auto-close behavior for “all receivers closed” and “all senders closed”.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
tests/test_broadcast.py Adds coverage for auto-close semantics using the new broadcast() factory.
src/frequenz/channels/experimental/_relay_sender.py Implements close() by forwarding close to underlying senders.
src/frequenz/channels/_sender.py Extends sender API with close(), adds SenderClosedError, and introduces clone/subscribe sender ABCs.
src/frequenz/channels/_broadcast.py Adds broadcast() factory and implements sender/receiver-driven auto-close logic in Broadcast.
src/frequenz/channels/_anycast.py Adds sender close-state tracking and close() implementation; raises SenderClosedError on use-after-close.
src/frequenz/channels/init.py Exposes broadcast() and new sender-related types/errors from the public package API.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +410 to +414
if self._channel._auto_close and (
self._channel._sender_count == 0 or len(self._channel._receivers) == 0
):
await self._channel.aclose()
raise SenderError("The channel was closed", self) from ChannelClosedError(
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In _Sender.send(), the auto-close branch runs before the _closed check. If a user calls send() on a closed sender, this can raise a channel-closed SenderError (and even close the whole channel) instead of the documented SenderClosedError. Check self._closed first to preserve the expected error semantics and avoid closing the channel due to a misuse of a closed sender.

Copilot uses AI. Check for mistakes.


class SenderClosedError(SenderError[SenderMessageT_co]):
"""An error indicating that a send operation was attempted a closed sender."""
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring grammar: “attempted a closed sender” should be “attempted on a closed sender” (or similar).

Suggested change
"""An error indicating that a send operation was attempted a closed sender."""
"""An error indicating that a send operation was attempted on a closed sender."""

Copilot uses AI. Check for mistakes.
Comment on lines +125 to +128
A new sender that sends messages to the same channel as this sender.
"""


Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SubscribableSender.subscribe() returns a Receiver, but the docstring says it returns “A new sender…”. Update the return docs to say it returns a new receiver attached to the same channel/stream.

Suggested change
A new sender that sends messages to the same channel as this sender.
"""
A new receiver that receives messages from the same channel/stream
as this sender.
"""

Copilot uses AI. Check for mistakes.
Comment on lines +129 to +130
class ClonableSender(Sender[SenderMessageT_contra], ABC):
"""A [Sender][frequenz.channels.Sender] that can be cloned."""
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Public API naming: ClonableSender / ClonableSubscribableSender appear to be misspelled (standard spelling is “Cloneable”). Since these are exported in __init__.py, consider renaming now to avoid locking in a typo in the public API.

Copilot uses AI. Check for mistakes.
Comment on lines +441 to +442
self._closed = True
self._channel._sender_count -= 1
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the last sender closes, receivers that are currently blocked in ready() will not be woken up because close() doesn't notify _recv_cv or otherwise trigger channel closure. With auto_close=True, this can leave receivers waiting forever even though all senders are closed. Consider scheduling aclose() (or at least notifying _recv_cv) when _sender_count reaches 0.

Copilot uses AI. Check for mistakes.
@shsms shsms force-pushed the auto-close-channels branch from 7f347bd to d98c90a Compare February 9, 2026 17:02
Copy link
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM in general, but there are a few design issues we need to resolve/agree on. I would say the most important points are:

  1. close() vs aclose() (I would really go with aclose() unless we have a very good reason not to).
  2. Naming.

The PR is also missing documentation updates. The docs still shows Broadcast() as the only way to create a broadcast channel.

"""A [Sender][frequenz.channels.Sender] that can be subscribed to."""

@abstractmethod
def subscribe(self) -> Receiver[SenderMessageT_contra]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you considered using new_receiver() instead, as it is the name we use everywhere else?

super().__init__("Sender is closed", sender)


class SubscribableSender(Sender[SenderMessageT_contra], ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: You don't need to pass ABC here, Sender is already an ABC, so as long as you don't implement its abstract methods, it will still be an abstract class. Not sure if you prefer to mark it explicitly in case something changes in the future.

(same with the other new abstract classes)

Comment on lines +76 to +83
@abstractmethod
def close(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we talk about closing in an async context, we need to start thinking about asynchronicity in closing too. If we keep close sync (so no aclose()), we should probably add a way to wait for completion (for example, flushing a sending queue), but we can add that in the future, when/if needed.

I'm really not sure what is best here. My feeling is splitting close initiation (sync) from close completion (async) might be more error prone, because people might miss the awaiting for completion, which could potentially end up in wrong finalization or thinking something finished immediately when it actually kept working in the background somehow.

Using aclose() makes closing more atomic, and more clear when there is actually a need to wait for something to finish before the close is really complete. But it is less flexible, as one can only close something in an async context, which sometimes can complicate things. But my feeling is this happens because asyncio kind of encourages this kind of pattern, they usually split closing from awaiting (like task cancel/await). When using a sender/receiver you should be in an async context anyway.

BTW, Trio has an abstract base class AsyncResource with aclose() and everything that is async uses aclose(), so if we are in doubt, I would go for aclose(), as it seems like the safest approach.

self._recv_cv: Condition = Condition()
"""The condition to wait for data in the channel's buffer."""

self._sender_count: int = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I think we should expose this as a read-only property, it might be useful if someone need to make a more manual tracking of a channel's usage.

self._latest: ChannelMessageT | None = None
"""The latest message sent to the channel."""

self._auto_close: bool = auto_close
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, having a way to query if a channel is auto-close or not might be useful. Even more, I wonder if this one could even be read-write. Although probably is not very useful, even if possible, but at least I would expose it as a read-only property.

assert isinstance(excinfo.value.__cause__, ChannelClosedError)


async def test_broadcast_auto_close_2() -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick:

Suggested change
async def test_broadcast_auto_close_2() -> None:
async def test_broadcast_auto_close_all_senders_closed() -> None:

Comment on lines +14 to +17
def oneshot(
message_type: type[ChannelMessageT], # pylint: disable=unused-argument
) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]:
"""Create a one-shot channel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would name this open_oneshot_channel() for the same reasons as the broadcast channel.


async def ready(self) -> bool:
while not self._channel.sent:
await self._channel.condition.wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is missing the release():

Suggested change
await self._channel.condition.wait()
async self._channel.condition:
with await self._channel.condition.wait()

raise SenderClosedError(self)
self._channel.message = message
self._channel.sent = True
if self._channel.condition.locked():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you really need to check for locked()? Can't you always notify()?

self.message: ChannelMessageT | _Empty = _EMPTY
self.sent = False
self.drained = False
self.condition = Condition()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks to me that this could be implemented more simply using an Event. Just wait for the event on the receiver and set the event from the sender. Do you really need a Condition here?

Comment on lines 384 to 387
@override
def clone(self) -> _Sender[_T]:
"""Return a clone of this sender."""
return _Sender(self._channel)
Copy link
Contributor

@ela-kotulska-frequenz ela-kotulska-frequenz Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If sender is closed and user clones it, then it will create new open (self._closed = False) sender. Is it ok?
It would mean, that you can create open sender to closed channel.

Comment on lines 379 to +388
@override
def close(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
self._closed = True
self._channel._sender_count -= 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if user doesn't close sender before removing it?
Would it work if we double check if sender is closed in __del__ method and print warning if not?

@shsms shsms force-pushed the auto-close-channels branch from d98c90a to 8adb21c Compare March 3, 2026 12:14
shsms added 7 commits March 3, 2026 13:14
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The `broadcast` function would only return a sender and a receiver
from an auto-closing channel.

Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
@shsms shsms force-pushed the auto-close-channels branch from 8adb21c to 8f28eaa Compare March 3, 2026 12:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:channels Affects channels implementation part:core Affects the core types (`Sender`, `Receiver`, exceptions, etc.) part:experimental Affects the experimental package part:tests Affects the unit, integration and performance (benchmarks) tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants