Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- There's a new `oneshot` channel, which can be created with the `make_oneshot` function, which returns a sender and a receiver. A single message can be sent using the sender, after which it will be closed. And the receiver will close as soon as the message is received.

- `Sender`s now have an `aclose`, which must be called, when they are no-longer needed.

## Bug Fixes

Expand Down
5 changes: 4 additions & 1 deletion src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
)
from ._latest_value_cache import LatestValueCache
from ._merge import Merger, merge
from ._oneshot import make_oneshot
from ._receiver import Receiver, ReceiverError, ReceiverStoppedError
from ._select import (
Selected,
Expand All @@ -100,7 +101,7 @@
select,
selected_from,
)
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

__all__ = [
"Anycast",
Expand All @@ -120,10 +121,12 @@
"SelectError",
"Selected",
"Sender",
"SenderClosedError",
"SenderError",
"SenderMessageT_co",
"SenderMessageT_contra",
"UnhandledSelectedError",
"make_oneshot",
"merge",
"select",
"selected_from",
Expand Down
19 changes: 18 additions & 1 deletion src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -327,6 +327,9 @@ def __init__(self, channel: Anycast[_T], /) -> None:
self._channel: Anycast[_T] = channel
"""The channel that this sender belongs to."""

self._closed: bool = False
"""Whether the sender is closed."""

@override
async def send(self, message: _T, /) -> None:
"""Send a message across the channel.
Expand All @@ -343,7 +346,11 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)

# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
Expand All @@ -367,6 +374,16 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify(1)
# pylint: enable=protected-access

@override
async def aclose(self) -> None:
"""Close this sender.

After closing, the sender will not be able to send any more messages. Any
attempt to send a message through a closed sender will raise a
[SenderError][frequenz.channels.SenderError].
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
18 changes: 17 additions & 1 deletion src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._exceptions import ChannelClosedError
from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderError
from ._sender import Sender, SenderClosedError, SenderError

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -334,6 +334,9 @@ def __init__(self, channel: Broadcast[_T], /) -> None:
self._channel: Broadcast[_T] = channel
"""The broadcast channel this sender belongs to."""

self._closed: bool = False
"""Whether this sender is closed."""

@override
async def send(self, message: _T, /) -> None:
"""Send a message to all broadcast receivers.
Expand All @@ -345,7 +348,10 @@ async def send(self, message: _T, /) -> None:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
SenderClosedError: If this sender was closed.
"""
if self._closed:
raise SenderClosedError(self)
# pylint: disable=protected-access
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
Expand All @@ -365,6 +371,16 @@ async def send(self, message: _T, /) -> None:
self._channel._recv_cv.notify_all()
# pylint: enable=protected-access

@override
async def aclose(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderClosedError][frequenz.channels.SenderClosedError].
"""
self._closed = True

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._channel}:{type(self).__name__}"
Expand Down
115 changes: 115 additions & 0 deletions src/frequenz/channels/_oneshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""A channel that can send a single message."""

import asyncio
import typing

from ._generic import ChannelMessageT
from ._receiver import Receiver, ReceiverStoppedError
from ._sender import Sender, SenderClosedError


def make_oneshot(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open_oneshot_channel()? 😬

I feel pretty strongly about this one. This is not a Trio-only thing, open is also used in asyncio, like open_connection().

I would also be up for open_channel() and take the type as an argument somehow, but I guess it will make the interface more complicated (it will need more type parameters) with very little gain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually inclined to switching back to channels.oneshot and channels.broadcast.

IMHO, open doesn't work for us at all. For files and networking, open is an idiomatic verb. In both those cases, there is a counterparty or a resource owner, and the idea is to open something to access the other side.

What we are doing is more like creating, because we are creating both sides and the link between them. So to me, Trio's naming is imprecise.

I'd take inspiration from os.pipe, which is so much closer to what we're doing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would also match Rust, but I'm ok with make_oneshot, etc. as well, and it also somewhat similar to the Go syntax, where you have to call make for many things, including channels, a bit like the new keyword of C++.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oneshot for me is not an option because it is not a verb. That leaves us with make or open. Let's say we forget about trio and asyncio, I don't understand why you want to introduce an asymmetry between make/close instead of going with open.

And you are opening a channel, the fact that you hide the channel object l, and you close it indirectly by closing all it's sender's or all it's receivers doesn't change the fact that you are opening a channel that needs to be closed.

And I don't get why you want to follow other programming languages naming conventions in python. I really think is not very relevant what rust or go does here.

I think os.pipe is not a good example because it follows very old POSIX naming conventions, not python ones.

I think open_oneshot_channel is the most precise, descriptive and pythonic option.

message_type: type[ChannelMessageT], # pylint: disable=unused-argument
) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]:
"""Create a one-shot channel.

A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.

Args:
message_type: The type of messages that can be sent through this channel.

Returns:
A tuple of a sender and a receiver for this channel.
"""
channel = _OneShot[ChannelMessageT]()
return _OneShotSender(channel), _OneShotReceiver(channel)


class _Empty:
"""A sentinel indicating that no message has been sent."""


_EMPTY = _Empty()


class _OneShot(typing.Generic[ChannelMessageT]):
"""A one-shot channel.

A one-shot channel is a channel that can only send one message. After the first
message is sent, the sender is closed and any further attempts to send a message
will raise a `SenderClosedError`.
"""

def __init__(self) -> None:
"""Create a new one-shot channel."""
self.message: ChannelMessageT | _Empty = _EMPTY
self.closed: bool = False
self.drained: bool = False
self.event: asyncio.Event = asyncio.Event()


class _OneShotSender(Sender[ChannelMessageT]):
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
"""Initialize this sender."""
self._channel = channel

async def send(self, message: ChannelMessageT, /) -> None:
"""Send a message through this sender."""
if self._channel.closed:
raise SenderClosedError(self)
self._channel.message = message
self._channel.closed = True
self._channel.event.set()

async def aclose(self) -> None:
"""Close this sender."""
self._channel.closed = True
if isinstance(self._channel.message, _Empty):
self._channel.drained = True
self._channel.event.set()


class _OneShotReceiver(Receiver[ChannelMessageT]):
def __init__(self, channel: _OneShot[ChannelMessageT]) -> None:
"""Initialize this receiver."""
self._channel = channel

async def ready(self) -> bool:
"""Check if a message is ready to be received.

Returns:
`True` if a message is ready to be received, `False` if the sender
is closed and no message will be sent.
"""
if self._channel.drained:
return False
while not self._channel.closed:
await self._channel.event.wait()
if isinstance(self._channel.message, _Empty):
return False
return True

def consume(self) -> ChannelMessageT:
"""Consume a message from this receiver.

Returns:
The message that was sent through this channel.

Raises:
ReceiverStoppedError: If the sender was closed without sending a message.
"""
if self._channel.drained:
raise ReceiverStoppedError(self)

assert not isinstance(
self._channel.message, _Empty
), "`consume()` must be preceded by a call to `ready()`."

self._channel.drained = True
self._channel.event.clear()
return self._channel.message
21 changes: 21 additions & 0 deletions src/frequenz/channels/_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
SenderError: If there was an error sending the message.
"""

@abstractmethod
async def aclose(self) -> None:
"""Close this sender.

After a sender is closed, it can no longer be used to send messages. Any
attempt to send a message through a closed sender will raise a
[SenderClosedError][frequenz.channels.SenderClosedError].
"""


class SenderError(Error, Generic[SenderMessageT_co]):
"""An error that originated in a [Sender][frequenz.channels.Sender].
Expand All @@ -88,3 +97,15 @@ def __init__(self, message: str, sender: Sender[SenderMessageT_co]):
super().__init__(message)
self.sender: Sender[SenderMessageT_co] = sender
"""The sender where the error happened."""


class SenderClosedError(SenderError[SenderMessageT_co]):
"""An error indicating that a send operation was attempted on a closed sender."""

def __init__(self, sender: Sender[SenderMessageT_co]):
"""Initialize this error.

Args:
sender: The [Sender][frequenz.channels.Sender] that was closed.
"""
super().__init__("Sender is closed", sender)
9 changes: 7 additions & 2 deletions src/frequenz/channels/experimental/_relay_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
to the senders it was created with.
"""

import typing
import asyncio

from typing_extensions import override

from .._generic import SenderMessageT_contra
from .._sender import Sender


class RelaySender(typing.Generic[SenderMessageT_contra], Sender[SenderMessageT_contra]):
class RelaySender(Sender[SenderMessageT_contra]):
"""A Sender for sending messages to multiple senders.

The `RelaySender` class takes multiple senders and forwards all the messages sent to
Expand Down Expand Up @@ -57,3 +57,8 @@ async def send(self, message: SenderMessageT_contra, /) -> None:
"""
for sender in self._senders:
await sender.send(message)

@override
async def aclose(self) -> None:
"""Close this sender."""
await asyncio.gather(*(sender.aclose() for sender in self._senders))
87 changes: 87 additions & 0 deletions tests/test_oneshot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Tests for the oneshot channel."""

import asyncio

import pytest

from frequenz.channels import (
ReceiverStoppedError,
SenderClosedError,
make_oneshot,
)


async def test_oneshot_recv_after_send() -> None:
"""Test the oneshot function.

`receiver.receive()` is called after `sender.send()`.
"""
sender, receiver = make_oneshot(int)

await sender.send(42)
assert await receiver.receive() == 42

with pytest.raises(SenderClosedError):
await sender.send(43)
with pytest.raises(ReceiverStoppedError):
await receiver.receive()


async def test_oneshot_recv_before_send() -> None:
"""Test the oneshot function.

`receiver.receive()` is called before `sender.send()`.
"""
sender, receiver = make_oneshot(int)

task = asyncio.create_task(receiver.receive())

# Give the receiver a chance to start waiting
await asyncio.sleep(0.0)

await sender.send(42)
assert await task == 42

with pytest.raises(SenderClosedError):
await sender.send(43)
with pytest.raises(ReceiverStoppedError):
await receiver.receive()


async def test_oneshot_recv_after_sender_closed() -> None:
"""Test that closing sender works without sending a message.

`receiver.receive()` is called after `sender.aclose()`.
"""
sender, receiver = make_oneshot(int)

await sender.aclose()

with pytest.raises(ReceiverStoppedError):
await receiver.receive()
with pytest.raises(SenderClosedError):
await sender.send(4)


async def test_oneshot_recv_before_sender_closed() -> None:
"""Test that closing sender works without sending a message.

`receiver.receive()` is called before `sender.aclose()`.
"""
sender, receiver = make_oneshot(int)

task = asyncio.create_task(receiver.receive())

# Give the receiver a chance to start waiting
await asyncio.sleep(0.0)

await sender.aclose()

with pytest.raises(ReceiverStoppedError):
await task

with pytest.raises(SenderClosedError):
await sender.send(4)