From 16ad0877abcbd2f0a44a8fe1f5d3c43da80763d9 Mon Sep 17 00:00:00 2001 From: John Silvestre Date: Thu, 19 Feb 2026 15:25:05 -0400 Subject: [PATCH] refactor(rabbitmq): simplify consumer execution flow and improve logging --- .../RabbitMqConsumerService.cs | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/FoundationKit.Events/RabbitMQ/BackgroundServices/RabbitMqConsumerService.cs b/FoundationKit.Events/RabbitMQ/BackgroundServices/RabbitMqConsumerService.cs index b2b8c54..5a19327 100644 --- a/FoundationKit.Events/RabbitMQ/BackgroundServices/RabbitMqConsumerService.cs +++ b/FoundationKit.Events/RabbitMQ/BackgroundServices/RabbitMqConsumerService.cs @@ -4,7 +4,6 @@ using FoundationKit.Events.RabbitMQ.Exceptions; using FoundationKit.Events.RabbitMQ.Handlers; using FoundationKit.Events.RabbitMQ.Messages; -using Microsoft.Extensions.DependencyModel; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using RabbitMQ.Client; @@ -20,7 +19,7 @@ public class RabbitMqConsumerService : BackgroundService private readonly HandlerContainer _handlerContainer; private readonly RabbitConfig _rabbitConfig; private readonly RabbitTopologyRegistry _topologyRegistry; - private IChannel _channel; + private IChannel _channel = null!; public RabbitMqConsumerService(ILogger logger, IConnection connection, @@ -37,7 +36,7 @@ public RabbitMqConsumerService(ILogger logger, _topologyRegistry = topologyRegistry; } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) => await Task.Run(async () => + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { @@ -47,17 +46,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) => a await _channel.BasicQosAsync(0, 10, false, stoppingToken); var consumer = new AsyncEventingBasicConsumer(_channel); - consumer.ReceivedAsync += async (model, ea) => - { - await HandleMessages(model, ea, stoppingToken).ConfigureAwait(false); - }; + consumer.ReceivedAsync += (model, ea) => HandleMessages(model, ea, stoppingToken); await InitializeQueueAndConsumerAsync(consumer, stoppingToken).ConfigureAwait(false); - while (!stoppingToken.IsCancellationRequested) - { - await Task.Delay(1000, stoppingToken).ConfigureAwait(false); - } + await WaitUntilCancelledAsync(stoppingToken).ConfigureAwait(false); } catch (Exception e) { @@ -65,8 +58,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) => a throw new ConsumerServiceException("Error creating a consumer channel please, check rabbitMQ connection", e); } - }, stoppingToken); + } + private static async Task WaitUntilCancelledAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + await Task.Delay(1000, stoppingToken).ConfigureAwait(false); + } + } private async Task HandleMessages(object _, BasicDeliverEventArgs ea, CancellationToken cancellationToken = default) { @@ -145,10 +145,10 @@ await _channel.BasicConsumeAsync( cancellationToken: cancellationToken) .ConfigureAwait(false); } - catch + catch (Exception ex) { - _logger.LogError("Error declaring consumers: Queue:{ErrorMessage}", def.QueueName); + _logger.LogError(ex, "Error declaring consumers: Queue:{QueueName}", def.QueueName); } } } -} \ No newline at end of file +}