Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using FoundationKit.Events.RabbitMQ.Exceptions;
using FoundationKit.Events.RabbitMQ.Handlers;
using FoundationKit.Events.RabbitMQ.Messages;
using Microsoft.Extensions.DependencyModel;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
Expand All @@ -20,7 +19,7 @@ public class RabbitMqConsumerService : BackgroundService
private readonly HandlerContainer _handlerContainer;
private readonly RabbitConfig _rabbitConfig;
private readonly RabbitTopologyRegistry _topologyRegistry;
private IChannel _channel;
private IChannel _channel = null!;

public RabbitMqConsumerService(ILogger<RabbitMqConsumerService> logger,
IConnection connection,
Expand All @@ -37,7 +36,7 @@ public RabbitMqConsumerService(ILogger<RabbitMqConsumerService> logger,
_topologyRegistry = topologyRegistry;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken) => await Task.Run(async () =>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
Expand All @@ -47,26 +46,27 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) => a
await _channel.BasicQosAsync(0, 10, false, stoppingToken);

var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, ea) =>
{
await HandleMessages(model, ea, stoppingToken).ConfigureAwait(false);
};
consumer.ReceivedAsync += (model, ea) => HandleMessages(model, ea, stoppingToken);

await InitializeQueueAndConsumerAsync(consumer, stoppingToken).ConfigureAwait(false);

while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken).ConfigureAwait(false);
}
await WaitUntilCancelledAsync(stoppingToken).ConfigureAwait(false);
}
catch (Exception e)
{
_logger.LogError(e, "Error in RabbitMqConsumerService");
throw new ConsumerServiceException("Error creating a consumer channel please, check rabbitMQ connection",
e);
}
}, stoppingToken);
}

private static async Task WaitUntilCancelledAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(1000, stoppingToken).ConfigureAwait(false);
}
}

private async Task HandleMessages(object _, BasicDeliverEventArgs ea, CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -145,10 +145,10 @@ await _channel.BasicConsumeAsync(
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
catch
catch (Exception ex)
{
_logger.LogError("Error declaring consumers: Queue:{ErrorMessage}", def.QueueName);
_logger.LogError(ex, "Error declaring consumers: Queue:{QueueName}", def.QueueName);
}
}
}
}
}