From b26984d9ee3292c165ffdc2635efc1388fcfd5a7 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 3 Mar 2025 17:41:55 +0100 Subject: [PATCH 1/2] Use shorter variant of the same code Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 8bb76a8..8ac9ba9 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -275,15 +275,10 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: stopping_dispatch: The dispatch that is stopping the actor. msg: The message to be passed to the actors being stopped. """ - actor: Actor | None = None identity = self._dispatch_identity(stopping_dispatch) - actor = self._actors.get(identity) - - if actor: + if actor := self._actors.pop(identity, None): await actor.stop(msg) - - del self._actors[identity] else: _logger.warning( "Actor for dispatch type %r is not running", stopping_dispatch.type From 2350b4c481572400fd9f8c8a9805e038479e9ebe Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Mon, 3 Mar 2025 17:48:56 +0100 Subject: [PATCH 2/2] Clean up retry-tasks on shutdown Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 34 +++++++++------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 8ac9ba9..6ce9594 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -133,8 +133,8 @@ async def main(): ``` """ - class RetryFailedDispatches: - """Manages the retry of failed dispatches.""" + class FailedDispatchesRetrier(BackgroundService): + """Manages the retring of failed dispatches.""" def __init__(self, retry_interval: timedelta) -> None: """Initialize the retry manager. @@ -142,10 +142,16 @@ def __init__(self, retry_interval: timedelta) -> None: Args: retry_interval: The interval between retries. """ + super().__init__() self._retry_interval = retry_interval self._channel = Broadcast[Dispatch](name="retry_channel") self._sender = self._channel.new_sender() - self._tasks: set[asyncio.Task[None]] = set() + + def start(self) -> None: + """Start the background service. + + This is a no-op. + """ def new_receiver(self) -> Receiver[Dispatch]: """Create a new receiver for dispatches to retry. @@ -187,7 +193,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen ], running_status_receiver: Receiver[Dispatch], dispatch_identity: Callable[[Dispatch], int] | None = None, - retry_interval: timedelta | None = timedelta(seconds=60), + retry_interval: timedelta = timedelta(seconds=60), ) -> None: """Initialize the dispatch handler. @@ -197,7 +203,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen running_status_receiver: The receiver for dispatch running status changes. dispatch_identity: A function to identify to which actor a dispatch refers. By default, it uses the dispatch ID. - retry_interval: The interval between retries. If `None`, retries are disabled. + retry_interval: The interval between retries. """ super().__init__() self._dispatch_identity: Callable[[Dispatch], int] = ( @@ -211,11 +217,7 @@ def __init__( # pylint: disable=too-many-arguments, too-many-positional-argumen name="dispatch_updates_channel", resend_latest=True ) self._updates_sender = self._updates_channel.new_sender() - self._retrier = ( - ActorDispatcher.RetryFailedDispatches(retry_interval) - if retry_interval - else None - ) + self._retrier = ActorDispatcher.FailedDispatchesRetrier(retry_interval) def start(self) -> None: """Start the background service.""" @@ -258,12 +260,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None: dispatch.type, exc_info=e, ) - if self._retrier: - self._retrier.retry(dispatch) - else: - _logger.error( - "No retry mechanism enabled, dispatch %r failed", dispatch - ) + self._retrier.retry(dispatch) else: # No exception occurred, so we can add the actor to the list self._actors[identity] = actor @@ -286,10 +283,7 @@ async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: async def _run(self) -> None: """Run the background service.""" - if not self._retrier: - async for dispatch in self._dispatch_rx: - await self._handle_dispatch(dispatch) - else: + async with self._retrier: retry_recv = self._retrier.new_receiver() async for selected in select(retry_recv, self._dispatch_rx):