-
Notifications
You must be signed in to change notification settings - Fork 10
Add the oneshot channel implementation #502
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v1.x.x
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,115 @@ | ||
| # License: MIT | ||
| # Copyright © 2026 Frequenz Energy-as-a-Service GmbH | ||
|
|
||
| """A channel that can send a single message.""" | ||
|
|
||
| import asyncio | ||
| import typing | ||
|
|
||
| from ._generic import ChannelMessageT | ||
| from ._receiver import Receiver, ReceiverStoppedError | ||
| from ._sender import Sender, SenderClosedError | ||
|
|
||
|
|
||
| def make_oneshot( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I feel pretty strongly about this one. This is not a Trio-only thing, open is also used in I would also be up for
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm actually inclined to switching back to IMHO, What we are doing is more like creating, because we are creating both sides and the link between them. So to me, Trio's naming is imprecise. I'd take inspiration from os.pipe, which is so much closer to what we're doing.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That would also match Rust, but I'm ok with
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
And you are opening a channel, the fact that you hide the channel object l, and you close it indirectly by closing all it's sender's or all it's receivers doesn't change the fact that you are opening a channel that needs to be closed. And I don't get why you want to follow other programming languages naming conventions in python. I really think is not very relevant what rust or go does here. I think I think |
||
| message_type: type[ChannelMessageT], # pylint: disable=unused-argument | ||
| ) -> tuple[Sender[ChannelMessageT], Receiver[ChannelMessageT]]: | ||
| """Create a one-shot channel. | ||
|
|
||
| A one-shot channel is a channel that can only send one message. After the first | ||
| message is sent, the sender is closed and any further attempts to send a message | ||
| will raise a `SenderClosedError`. | ||
|
|
||
| Args: | ||
| message_type: The type of messages that can be sent through this channel. | ||
|
|
||
| Returns: | ||
| A tuple of a sender and a receiver for this channel. | ||
| """ | ||
| channel = _OneShot[ChannelMessageT]() | ||
| return _OneShotSender(channel), _OneShotReceiver(channel) | ||
|
|
||
|
|
||
| class _Empty: | ||
| """A sentinel indicating that no message has been sent.""" | ||
|
|
||
|
|
||
| _EMPTY = _Empty() | ||
|
|
||
|
|
||
| class _OneShot(typing.Generic[ChannelMessageT]): | ||
| """A one-shot channel. | ||
|
|
||
| A one-shot channel is a channel that can only send one message. After the first | ||
| message is sent, the sender is closed and any further attempts to send a message | ||
| will raise a `SenderClosedError`. | ||
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| """Create a new one-shot channel.""" | ||
| self.message: ChannelMessageT | _Empty = _EMPTY | ||
| self.closed: bool = False | ||
| self.drained: bool = False | ||
| self.event: asyncio.Event = asyncio.Event() | ||
|
|
||
|
|
||
| class _OneShotSender(Sender[ChannelMessageT]): | ||
| def __init__(self, channel: _OneShot[ChannelMessageT]) -> None: | ||
| """Initialize this sender.""" | ||
| self._channel = channel | ||
|
|
||
| async def send(self, message: ChannelMessageT, /) -> None: | ||
| """Send a message through this sender.""" | ||
| if self._channel.closed: | ||
| raise SenderClosedError(self) | ||
| self._channel.message = message | ||
| self._channel.closed = True | ||
| self._channel.event.set() | ||
|
|
||
| async def aclose(self) -> None: | ||
| """Close this sender.""" | ||
| self._channel.closed = True | ||
| if isinstance(self._channel.message, _Empty): | ||
| self._channel.drained = True | ||
| self._channel.event.set() | ||
shsms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class _OneShotReceiver(Receiver[ChannelMessageT]): | ||
| def __init__(self, channel: _OneShot[ChannelMessageT]) -> None: | ||
| """Initialize this receiver.""" | ||
| self._channel = channel | ||
|
|
||
| async def ready(self) -> bool: | ||
| """Check if a message is ready to be received. | ||
|
|
||
| Returns: | ||
| `True` if a message is ready to be received, `False` if the sender | ||
| is closed and no message will be sent. | ||
| """ | ||
| if self._channel.drained: | ||
| return False | ||
| while not self._channel.closed: | ||
| await self._channel.event.wait() | ||
llucax marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if isinstance(self._channel.message, _Empty): | ||
| return False | ||
| return True | ||
|
|
||
| def consume(self) -> ChannelMessageT: | ||
| """Consume a message from this receiver. | ||
|
|
||
| Returns: | ||
| The message that was sent through this channel. | ||
|
|
||
| Raises: | ||
| ReceiverStoppedError: If the sender was closed without sending a message. | ||
| """ | ||
| if self._channel.drained: | ||
| raise ReceiverStoppedError(self) | ||
|
|
||
| assert not isinstance( | ||
| self._channel.message, _Empty | ||
| ), "`consume()` must be preceded by a call to `ready()`." | ||
|
|
||
| self._channel.drained = True | ||
| self._channel.event.clear() | ||
| return self._channel.message | ||
shsms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| # License: MIT | ||
| # Copyright © 2026 Frequenz Energy-as-a-Service GmbH | ||
|
|
||
| """Tests for the oneshot channel.""" | ||
|
|
||
| import asyncio | ||
|
|
||
| import pytest | ||
|
|
||
| from frequenz.channels import ( | ||
| ReceiverStoppedError, | ||
| SenderClosedError, | ||
| make_oneshot, | ||
| ) | ||
|
|
||
|
|
||
| async def test_oneshot_recv_after_send() -> None: | ||
| """Test the oneshot function. | ||
|
|
||
| `receiver.receive()` is called after `sender.send()`. | ||
| """ | ||
| sender, receiver = make_oneshot(int) | ||
|
|
||
| await sender.send(42) | ||
| assert await receiver.receive() == 42 | ||
|
|
||
| with pytest.raises(SenderClosedError): | ||
| await sender.send(43) | ||
| with pytest.raises(ReceiverStoppedError): | ||
| await receiver.receive() | ||
|
|
||
|
|
||
| async def test_oneshot_recv_before_send() -> None: | ||
| """Test the oneshot function. | ||
|
|
||
| `receiver.receive()` is called before `sender.send()`. | ||
| """ | ||
| sender, receiver = make_oneshot(int) | ||
|
|
||
| task = asyncio.create_task(receiver.receive()) | ||
|
|
||
| # Give the receiver a chance to start waiting | ||
| await asyncio.sleep(0.0) | ||
|
|
||
| await sender.send(42) | ||
| assert await task == 42 | ||
|
|
||
| with pytest.raises(SenderClosedError): | ||
| await sender.send(43) | ||
| with pytest.raises(ReceiverStoppedError): | ||
| await receiver.receive() | ||
|
|
||
|
|
||
| async def test_oneshot_recv_after_sender_closed() -> None: | ||
| """Test that closing sender works without sending a message. | ||
|
|
||
| `receiver.receive()` is called after `sender.aclose()`. | ||
| """ | ||
| sender, receiver = make_oneshot(int) | ||
|
|
||
| await sender.aclose() | ||
|
|
||
| with pytest.raises(ReceiverStoppedError): | ||
| await receiver.receive() | ||
| with pytest.raises(SenderClosedError): | ||
| await sender.send(4) | ||
|
|
||
|
|
||
| async def test_oneshot_recv_before_sender_closed() -> None: | ||
| """Test that closing sender works without sending a message. | ||
|
|
||
| `receiver.receive()` is called before `sender.aclose()`. | ||
| """ | ||
| sender, receiver = make_oneshot(int) | ||
|
|
||
| task = asyncio.create_task(receiver.receive()) | ||
shsms marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| # Give the receiver a chance to start waiting | ||
| await asyncio.sleep(0.0) | ||
|
|
||
| await sender.aclose() | ||
|
|
||
| with pytest.raises(ReceiverStoppedError): | ||
| await task | ||
|
|
||
| with pytest.raises(SenderClosedError): | ||
| await sender.send(4) | ||
Uh oh!
There was an error while loading. Please reload this page.