diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index aff63be..ae2b4e5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -18,7 +18,7 @@ jobs: - name: Setup .NET uses: actions/setup-dotnet@v5 with: - dotnet-version: 8.0.x + dotnet-version: 10.0.x - name: Restore dependencies run: dotnet restore diff --git a/OdectyStat/Application/GaugeService.cs b/OdectyStat/Application/GaugeService.cs index a5c78ea..0899bea 100644 --- a/OdectyStat/Application/GaugeService.cs +++ b/OdectyStat/Application/GaugeService.cs @@ -23,6 +23,11 @@ public async Task AddNewValue(NewValue newValue) { Console.WriteLine("Add new value " + newValue.GaugeId); var gauge = await context.GaugeRepository.GetGauge(newValue.GaugeId); + if (gauge == null) + { + logger.LogError("Gauge {gaugeId} not found.", newValue.GaugeId); + throw new Exception($"Gauge {newValue.GaugeId} not found."); + } gauge.SetNewValue(newValue.Value, newValue.Datetime); await context.SaveChangesAsync(); await context.MessageQueue.Publish(new { gaugeId = newValue.GaugeId, value = gauge.LastValue }, MessageQueueRoutingKeys.Odecty_Gauge_Lastvaluechanged); @@ -82,7 +87,7 @@ public async Task AddNewValue(NewValue newValue) public async Task AddIncrement(int gaugeId, decimal increment, DateTime datetime) { var gauge = await context.GaugeRepository.GetGauge(gaugeId); - gauge.AddIncrement(increment, datetime); + gauge?.AddIncrement(increment, datetime); await context.SaveChangesAsync(); } @@ -96,6 +101,11 @@ public async Task GaugeRecognizedSucceeded(int gaugeId, string imagePath, decima { logger.LogInformation("Recognition succeeded for gauge {gaugeId} with image {imagePath} and value {value}", gaugeId, imagePath, value); var gauge = await context.GaugeRepository.GetGauge(gaugeId); + if (gauge == null) + { + logger.LogError("Gauge {gaugeId} not found", gaugeId); + throw new Exception($"Gauge {gaugeId} not found."); + } var localDateTime = dateTime.ToLocalTime(); bool valid = false; if (gauge.LastMeasurement != null) diff --git a/OdectyStat/Business/ComputeService.cs b/OdectyStat/Business/ComputeService.cs index 0d64cd0..b4536ee 100644 --- a/OdectyStat/Business/ComputeService.cs +++ b/OdectyStat/Business/ComputeService.cs @@ -1,4 +1,5 @@ -using OdectyStat1.Business; +#nullable disable +using OdectyStat1.Business; using OdectyStat1.Entities; namespace OdectyStat1 { diff --git a/OdectyStat/Business/ComputeService2.cs b/OdectyStat/Business/ComputeService2.cs index 84b3ead..715f01b 100644 --- a/OdectyStat/Business/ComputeService2.cs +++ b/OdectyStat/Business/ComputeService2.cs @@ -1,4 +1,6 @@ -using OdectyStat1.Contracts; +#nullable disable + +using OdectyStat1.Contracts; using OdectyStat1.Entities; namespace OdectyStat1.Business diff --git a/OdectyStat/Business/ComputeService3.cs b/OdectyStat/Business/ComputeService3.cs index 6399b81..83780a7 100644 --- a/OdectyStat/Business/ComputeService3.cs +++ b/OdectyStat/Business/ComputeService3.cs @@ -1,4 +1,5 @@ -using OdectyStat1.Contracts; +#nullable disable +using OdectyStat1.Contracts; using OdectyStat1.Entities; namespace OdectyStat1.Business diff --git a/OdectyStat/Business/Gauge.cs b/OdectyStat/Business/Gauge.cs index 21dfac4..f646334 100644 --- a/OdectyStat/Business/Gauge.cs +++ b/OdectyStat/Business/Gauge.cs @@ -9,12 +9,12 @@ public class Gauge { [Key] public int Id { get; set; } - public string Description { get; set; } - public string Type { get; set; } + public string? Description { get; set; } + public required string Type { get; set; } public decimal LastValue { get; set; } [NotMapped] - public GaugeMeasurement LastMeasurement { get; set; } + public GaugeMeasurement? LastMeasurement { get; set; } public decimal? MaxValuePerHour { get; set; } public decimal InitialValue { get; set; } diff --git a/OdectyStat/Contracts/IGaugeContext.cs b/OdectyStat/Contracts/IGaugeContext.cs index fd8e6c8..fb7a8c3 100644 --- a/OdectyStat/Contracts/IGaugeContext.cs +++ b/OdectyStat/Contracts/IGaugeContext.cs @@ -4,7 +4,6 @@ public interface IGaugeContext { IGaugeRepository GaugeRepository { get; } IMeasurementDayRepository MeasurementDayRepository { get; } - IExcelProvider ExcelProvider { get; } IMeasurementStatisticsRepository MeasurementStatisticsRepository { get; } IMeasurementRepository MeasurementRepository { get; } IHomeAssistantStatisticsRepository HomeAssistantStatisticsRepository { get; } diff --git a/OdectyStat/Contracts/IGaugeRepository.cs b/OdectyStat/Contracts/IGaugeRepository.cs index d32fa3d..b0f3d99 100644 --- a/OdectyStat/Contracts/IGaugeRepository.cs +++ b/OdectyStat/Contracts/IGaugeRepository.cs @@ -4,6 +4,6 @@ namespace OdectyStat1.Contracts { public interface IGaugeRepository { - Task GetGauge(int id); + Task GetGauge(int id); } } diff --git a/OdectyStat/Contracts/IMeasurementStatisticsRepository.cs b/OdectyStat/Contracts/IMeasurementStatisticsRepository.cs index 218e03a..4409130 100644 --- a/OdectyStat/Contracts/IMeasurementStatisticsRepository.cs +++ b/OdectyStat/Contracts/IMeasurementStatisticsRepository.cs @@ -4,6 +4,6 @@ namespace OdectyStat1.Contracts { public interface IMeasurementStatisticsRepository { - Task GetLast(int gaugeId); + Task GetLast(int gaugeId); } } \ No newline at end of file diff --git a/OdectyStat/DataLayer/Consumers/BinaryConsumerBackgroundService.cs b/OdectyStat/DataLayer/Consumers/BinaryConsumerBackgroundService.cs index caebb90..1405725 100644 --- a/OdectyStat/DataLayer/Consumers/BinaryConsumerBackgroundService.cs +++ b/OdectyStat/DataLayer/Consumers/BinaryConsumerBackgroundService.cs @@ -35,12 +35,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { if (!consumer.IsConsuming) { - consumer.StartConsuming(); + await consumer.StartConsuming(); } if (stoppingToken.IsCancellationRequested) { - consumer.StopConsuming(); + await consumer.StopConsuming(); } } @@ -49,8 +49,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) foreach (var consumer in consumers) { - consumer.StopConsuming(); - consumer.Dispose(); + await consumer.StopConsuming(); + await consumer.DisposeAsync(); } } } diff --git a/OdectyStat/DataLayer/Consumers/BinaryMessageConsumer.cs b/OdectyStat/DataLayer/Consumers/BinaryMessageConsumer.cs index d25c573..986fc49 100644 --- a/OdectyStat/DataLayer/Consumers/BinaryMessageConsumer.cs +++ b/OdectyStat/DataLayer/Consumers/BinaryMessageConsumer.cs @@ -19,17 +19,17 @@ public BinaryMessageConsumer( this.logger = logger; } - protected override async void ConsumerReceived(object? sender, BasicDeliverEventArgs e) + protected override async Task ConsumerReceived(object? sender, BasicDeliverEventArgs e) { try { await handler.HandleAsync(e.Body, CancellationToken.None); - AcknowledgeMessage(e.DeliveryTag); + await AcknowledgeMessage(e.DeliveryTag); } catch (Exception ex) { logger.LogError(ex, "Failed to process binary message from queue {QueueName}", handler.QueueName); - RejectMessage(e.DeliveryTag, requeue: false); + await RejectMessage(e.DeliveryTag, requeue: false); } } } diff --git a/OdectyStat/DataLayer/Consumers/ConsumerBackgroundService.cs b/OdectyStat/DataLayer/Consumers/ConsumerBackgroundService.cs index 092c411..4976be5 100644 --- a/OdectyStat/DataLayer/Consumers/ConsumerBackgroundService.cs +++ b/OdectyStat/DataLayer/Consumers/ConsumerBackgroundService.cs @@ -24,11 +24,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { if (!consumer.IsConsuming) { - consumer.StartConsuming(); + await consumer.StartConsuming(); } if (stoppingToken.IsCancellationRequested) { - consumer.StopConsuming(); + await consumer.StopConsuming(); } } // Placeholder for background service logic diff --git a/OdectyStat/DataLayer/Consumers/IRabbitMQConsumer.cs b/OdectyStat/DataLayer/Consumers/IRabbitMQConsumer.cs index 1575b3c..c84613e 100644 --- a/OdectyStat/DataLayer/Consumers/IRabbitMQConsumer.cs +++ b/OdectyStat/DataLayer/Consumers/IRabbitMQConsumer.cs @@ -4,7 +4,7 @@ public interface IRabbitMQConsumer { bool IsConsuming { get; } - void Dispose(); - void StartConsuming(); - void StopConsuming(); + ValueTask DisposeAsync(); + Task StartConsuming(); + Task StopConsuming(); } \ No newline at end of file diff --git a/OdectyStat/DataLayer/Consumers/MQClient.cs b/OdectyStat/DataLayer/Consumers/MQClient.cs index d6ca1db..9b9156c 100644 --- a/OdectyStat/DataLayer/Consumers/MQClient.cs +++ b/OdectyStat/DataLayer/Consumers/MQClient.cs @@ -23,7 +23,7 @@ public MQClient(IServiceProvider serviceProvider, RabbitMQProvider rabbitMQProvi this.logger = logger; } - protected override async void ConsumerReceived(object? sender, BasicDeliverEventArgs e) + protected override async Task ConsumerReceived(object? sender, BasicDeliverEventArgs e) { logger.LogInformation("Data received at: {time}", DateTimeOffset.Now); if (!inProcess) @@ -34,14 +34,22 @@ protected override async void ConsumerReceived(object? sender, BasicDeliverEvent var body = Encoding.UTF8.GetString(e.Body.ToArray()); logger.LogInformation("Data received {body}", body); var newValue = Newtonsoft.Json.JsonConvert.DeserializeObject(body); - using var scope = serviceProvider.CreateScope(); - var service = scope.ServiceProvider.GetService(); - await service.AddNewValue(newValue); - AcknowledgeMessage(e.DeliveryTag); + if (newValue != null) + { + using var scope = serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetService(); + await service!.AddNewValue(newValue); + await AcknowledgeMessage(e.DeliveryTag); + } + else + { + await RejectMessage(e.DeliveryTag); + } } catch (Exception ex) { logger.LogError(ex, "Error processing data: {message}", ex.Message); + await RejectMessage(e.DeliveryTag); } finally { @@ -51,7 +59,7 @@ protected override async void ConsumerReceived(object? sender, BasicDeliverEvent else { //redeliver message - RejectMessage(e.DeliveryTag, true); + await RejectMessage(e.DeliveryTag, true); } } } diff --git a/OdectyStat/DataLayer/Consumers/RabbitMQConsumer.cs b/OdectyStat/DataLayer/Consumers/RabbitMQConsumer.cs index a73f233..3ef2d48 100644 --- a/OdectyStat/DataLayer/Consumers/RabbitMQConsumer.cs +++ b/OdectyStat/DataLayer/Consumers/RabbitMQConsumer.cs @@ -3,37 +3,37 @@ using RabbitMQ.Client.Events; namespace OdectyStat1.DataLayer.Consumers; -public abstract class RabbitMQConsumer : IDisposable, IRabbitMQConsumer +public abstract class RabbitMQConsumer : IAsyncDisposable, IRabbitMQConsumer { private readonly RabbitMQProvider rabbitMQProvider; private readonly ILogger logger; private readonly string queueName; - private IModel model; + private IChannel? model; public bool IsConsuming => model != null && !model.IsClosed; public RabbitMQConsumer(RabbitMQProvider rabbitMQProvider, ILogger logger, string queueName) { this.rabbitMQProvider = rabbitMQProvider; - this.rabbitMQProvider.ConnectionShutdown += (s, e) => + this.rabbitMQProvider.ConnectionShutdown += async (s, e) => { - StopConsuming(); + await StopConsuming(); logger.LogWarning("RabbitMQ connection shutdown detected. Stopped consuming."); }; this.logger = logger; this.queueName = queueName; } - public void StartConsuming() + public async Task StartConsuming() { if (model == null || model.IsClosed) { - model = rabbitMQProvider.CreateModel(); + model = await rabbitMQProvider.CreateModel(); if (model != null) { - model.ModelShutdown += (s, e) => + model.ChannelShutdownAsync += async (s, e) => { - StopConsuming(); + await StopConsuming(); logger.LogWarning("RabbitMQ model shutdown detected. Stopped consuming from queue: {QueueName}", queueName); }; } @@ -41,38 +41,56 @@ public void StartConsuming() if (model == null) { logger.LogWarning("Failed to create RabbitMQ model for consuming."); - StopConsuming(); + await StopConsuming(); return; } - var consumer = new EventingBasicConsumer(model); - consumer.Received += ConsumerReceived; - model.BasicConsume(queueName, false, consumer); + var consumer = new AsyncEventingBasicConsumer(model); + consumer.ReceivedAsync += ConsumerReceived; + await model.BasicConsumeAsync(queueName, false, consumer); logger.LogInformation("Started consuming from queue: {QueueName}", queueName); } - protected abstract void ConsumerReceived(object? sender, BasicDeliverEventArgs e); + protected abstract Task ConsumerReceived(object? sender, BasicDeliverEventArgs e); - protected void AcknowledgeMessage(ulong deliveryTag) + protected async Task AcknowledgeMessage(ulong deliveryTag) { - model?.BasicAck(deliveryTag, false); + if (model != null) + { + await model.BasicAckAsync(deliveryTag, false); + } } - protected void RejectMessage(ulong deliveryTag, bool requeue = false) + protected async Task RejectMessage(ulong deliveryTag, bool requeue = false) { - model?.BasicReject(deliveryTag, requeue); + if (model != null) + { + await model.BasicRejectAsync(deliveryTag, requeue); + } } - public void StopConsuming() + public async Task StopConsuming() { - model?.Close(); - model?.Dispose(); + if (model != null) + { + if (!model.IsClosed) + { + await model.CloseAsync(); + } + model.Dispose(); + } model = null; } - public void Dispose() + public async ValueTask DisposeAsync() { - model?.Close(); - model?.Dispose(); + if (model != null) + { + if (!model.IsClosed) + { + await model.CloseAsync(); + } + await model.DisposeAsync(); + } model = null; } } diff --git a/OdectyStat/DataLayer/Consumers/RecognizedFailed.cs b/OdectyStat/DataLayer/Consumers/RecognizedFailed.cs index 9c458de..02b090c 100644 --- a/OdectyStat/DataLayer/Consumers/RecognizedFailed.cs +++ b/OdectyStat/DataLayer/Consumers/RecognizedFailed.cs @@ -26,7 +26,7 @@ public RecognizedFailed(IServiceProvider serviceProvider, IOptions(body); - using var scope = serviceProvider.CreateScope(); - var service = scope.ServiceProvider.GetService(); - service.GaugeRecognizedFailed(int.Parse(message.gaugeId.ToString()), message.file.ToString()); - AcknowledgeMessage(e.DeliveryTag); + dynamic? message = Newtonsoft.Json.JsonConvert.DeserializeObject(body); + if (message != null) + { + using var scope = serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetService(); + service!.GaugeRecognizedFailed(int.Parse(message.gaugeId.ToString()), message.file.ToString()); + await AcknowledgeMessage(e.DeliveryTag); + } + else + { + await RejectMessage(e.DeliveryTag); + } } catch (Exception ex) { logger.LogError(ex, "Error processing data: {message}", ex.Message); + await RejectMessage(e.DeliveryTag); } finally { @@ -54,7 +62,7 @@ protected override void ConsumerReceived(object? sender, BasicDeliverEventArgs e else { //redeliver message - RejectMessage(e.DeliveryTag, true); + await RejectMessage(e.DeliveryTag, true); } } } diff --git a/OdectyStat/DataLayer/Consumers/RecognizedSuccess.cs b/OdectyStat/DataLayer/Consumers/RecognizedSuccess.cs index 96bf379..9653363 100644 --- a/OdectyStat/DataLayer/Consumers/RecognizedSuccess.cs +++ b/OdectyStat/DataLayer/Consumers/RecognizedSuccess.cs @@ -24,7 +24,7 @@ public RecognizedSuccess(IServiceProvider serviceProvider, RabbitMQProvider rabb this.logger = logger; } - protected override async void ConsumerReceived(object? sender, BasicDeliverEventArgs e) + protected override async Task ConsumerReceived(object? sender, BasicDeliverEventArgs e) { logger.LogInformation("Data received at: {time}", DateTimeOffset.Now); if (!inProcess) @@ -34,15 +34,23 @@ protected override async void ConsumerReceived(object? sender, BasicDeliverEvent inProcess = true; var body = Encoding.UTF8.GetString(e.Body.ToArray()); logger.LogInformation("Data received {body}", body); - dynamic message = Newtonsoft.Json.JsonConvert.DeserializeObject(body); - using var scope = serviceProvider.CreateScope(); - var service = scope.ServiceProvider.GetService(); - await service.GaugeRecognizedSucceeded(int.Parse(message.gaugeId.ToString()),message.file.ToString(), decimal.Parse(message.state.ToString(), CultureInfo.InvariantCulture), DateTime.Parse(message.datetime.ToString())); - AcknowledgeMessage(e.DeliveryTag); + dynamic? message = Newtonsoft.Json.JsonConvert.DeserializeObject(body); + if (message != null) + { + using var scope = serviceProvider.CreateScope(); + var service = scope.ServiceProvider.GetService(); + await service!.GaugeRecognizedSucceeded(int.Parse(message.gaugeId.ToString()), message.file.ToString(), decimal.Parse(message.state.ToString(), CultureInfo.InvariantCulture), DateTime.Parse(message.datetime.ToString())); + await AcknowledgeMessage(e.DeliveryTag); + } + else + { + await RejectMessage(e.DeliveryTag); + } } catch (Exception ex) { logger.LogError(ex, "Error processing data: {message}", ex.Message); + await RejectMessage(e.DeliveryTag); } finally { @@ -52,7 +60,7 @@ protected override async void ConsumerReceived(object? sender, BasicDeliverEvent else { //redeliver message - RejectMessage(e.DeliveryTag, true); + await RejectMessage(e.DeliveryTag, true); } } } diff --git a/OdectyStat/DataLayer/GaugeContext.cs b/OdectyStat/DataLayer/GaugeContext.cs index 267e264..f6bd5e4 100644 --- a/OdectyStat/DataLayer/GaugeContext.cs +++ b/OdectyStat/DataLayer/GaugeContext.cs @@ -10,7 +10,6 @@ public class GaugeContext : IGaugeContext public GaugeContext(IGaugeRepository gaugeRepository, GaugeDbContext context, IMeasurementDayRepository measurementDayRepository, - //IExcelProvider excelProvider, IMeasurementStatisticsRepository measurementStatisticsRepository, IMeasurementRepository measurementRepository, IHomeAssistantStatisticsRepository homeAssistantStatisticsRepository, @@ -20,7 +19,6 @@ public GaugeContext(IGaugeRepository gaugeRepository, GaugeRepository = gaugeRepository; this.context = context; MeasurementDayRepository = measurementDayRepository; - //ExcelProvider=excelProvider; MeasurementStatisticsRepository = measurementStatisticsRepository; MeasurementRepository = measurementRepository; HomeAssistantStatisticsRepository = homeAssistantStatisticsRepository; @@ -30,7 +28,6 @@ public GaugeContext(IGaugeRepository gaugeRepository, public IGaugeRepository GaugeRepository { get; } public IMeasurementDayRepository MeasurementDayRepository { get; } - public IExcelProvider ExcelProvider { get; } public IMeasurementStatisticsRepository MeasurementStatisticsRepository { get; } @@ -41,14 +38,20 @@ public GaugeContext(IGaugeRepository gaugeRepository, public void AddHomeAssistant(TEntity entity) { - homeAssistantDbContext.Add(entity); + if (entity != null) + { + homeAssistantDbContext.Add(entity); + } } public void AddRange(ICollection entities) { foreach (TEntity entity in entities) { - context.Add(entity); + if (entity != null) + { + context.Add(entity); + } } } diff --git a/OdectyStat/DataLayer/GaugeRepository.cs b/OdectyStat/DataLayer/GaugeRepository.cs index bfa31a2..81f60ad 100644 --- a/OdectyStat/DataLayer/GaugeRepository.cs +++ b/OdectyStat/DataLayer/GaugeRepository.cs @@ -13,10 +13,13 @@ public GaugeRepository(GaugeDbContext gaugeContext) this.gaugeContext = gaugeContext; } - public async Task GetGauge(int id) + public async Task GetGauge(int id) { var gauge = await gaugeContext.Gauge.FindAsync(id); - gauge.LastMeasurement = await gaugeContext.GaugeMeasurement.Where(k => k.GaugeId == id).OrderByDescending(k => k.MeasurementDateTime).FirstOrDefaultAsync(); + if (gauge != null) + { + gauge.LastMeasurement = await gaugeContext.GaugeMeasurement.Where(k => k.GaugeId == id).OrderByDescending(k => k.MeasurementDateTime).FirstOrDefaultAsync(); + } return gauge; } } diff --git a/OdectyStat/DataLayer/MeasurementStatisticsRepository.cs b/OdectyStat/DataLayer/MeasurementStatisticsRepository.cs index b35b2e7..f62e70d 100644 --- a/OdectyStat/DataLayer/MeasurementStatisticsRepository.cs +++ b/OdectyStat/DataLayer/MeasurementStatisticsRepository.cs @@ -12,7 +12,7 @@ public MeasurementStatisticsRepository(GaugeDbContext context) { this.context = context; } - public async Task GetLast(int gaugeId) + public async Task GetLast(int gaugeId) { return await context.GaugeMeasuerementStatistics.Where(k => k.GaugeId == gaugeId).OrderByDescending(k => k.MeasurementDateTime).FirstOrDefaultAsync(); } diff --git a/OdectyStat/DataLayer/MessageQueue.cs b/OdectyStat/DataLayer/MessageQueue.cs index ae53aa7..f1ffcd4 100644 --- a/OdectyStat/DataLayer/MessageQueue.cs +++ b/OdectyStat/DataLayer/MessageQueue.cs @@ -7,30 +7,51 @@ namespace OdectyStat1.DataLayer; public class MessageQueue : IMessageQueue, IDisposable { - private readonly IModel model; + private IChannel? model; + private readonly RabbitMQProvider rabbitMQProvider; private readonly IOptions options; + private readonly ILogger logger; - public MessageQueue(RabbitMQProvider rabbitMQProvider, IOptions options) + public MessageQueue(RabbitMQProvider rabbitMQProvider, IOptions options, ILogger logger) { - model = rabbitMQProvider.CreateModel(); + this.rabbitMQProvider = rabbitMQProvider; this.options = options; + this.logger = logger; } - public Task Publish(object message, string routingKey) + public async Task Publish(object message, string routingKey) { - model.BasicPublish(exchange: options.Value.ExchangeName, + if(model == null) + { + model = await rabbitMQProvider.CreateModel(); + if(model == null) + { + logger.LogWarning("Channel could not be created."); + return; + } + } + await model.BasicPublishAsync(exchange: options.Value.ExchangeName, routingKey: routingKey, - basicProperties: null, + mandatory: false, + basicProperties: new BasicProperties(), body: Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(message))); - return Task.CompletedTask; } - public Task MQTTPublish(string message, string routingKey) + public async Task MQTTPublish(string message, string routingKey) { - model.BasicPublish(exchange: "amq.topic", + if (model == null) + { + model = await rabbitMQProvider.CreateModel(); + if (model == null) + { + logger.LogWarning("Channel could not be created."); + return; + } + } + await model.BasicPublishAsync(exchange: "amq.topic", routingKey: routingKey, - basicProperties: null, + mandatory: false, + basicProperties: new BasicProperties(), body: Encoding.UTF8.GetBytes(message)); - return Task.CompletedTask; } public void Dispose() diff --git a/OdectyStat/DataLayer/RabbitMQProvider.cs b/OdectyStat/DataLayer/RabbitMQProvider.cs index 39b28fc..17b7f2c 100644 --- a/OdectyStat/DataLayer/RabbitMQProvider.cs +++ b/OdectyStat/DataLayer/RabbitMQProvider.cs @@ -2,11 +2,12 @@ using Microsoft.Extensions.Options; using OdectyStat1.Dto; using RabbitMQ.Client; +using RabbitMQ.Client.Events; namespace OdectyStat1.DataLayer; -public class RabbitMQProvider : IDisposable +public class RabbitMQProvider : IAsyncDisposable { - private IConnection connection; + private IConnection? connection; private readonly IOptions options; private readonly ILogger logger; private bool first = true; @@ -16,8 +17,11 @@ public class RabbitMQProvider : IDisposable private readonly Random random = new Random(); private DateTime? lastAttemptTime = null; private TimeSpan? connectionDelay = null; + private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1); + private readonly SemaphoreSlim semaphoreConnect = new SemaphoreSlim(1, 1); - public EventHandler ConnectionShutdown = new EventHandler((s, e) => { }); + + public AsyncEventHandler ConnectionShutdown = new AsyncEventHandler((s, e) => Task.CompletedTask); public RabbitMQProvider(IOptions options, ILogger logger) { @@ -30,21 +34,25 @@ public RabbitMQProvider(IOptions options, ILogger handler in ConnectionShutdown.GetInvocationList()) + { + try + { + await handler(sender, e); + } + catch(Exception ex) + { + logger.LogError(ex, ex.Message); + } + } + await connection.CloseAsync(); + connection.Dispose(); + connection = null; + } isConnected = false; } - public IModel CreateModel() + public async Task CreateModel() { - if(!isConnected) + if(!isConnected || connection == null) { - Connect(); + await Connect(); } if(!isConnected) { @@ -78,32 +104,47 @@ public IModel CreateModel() } if (first) { - using var model = connection.CreateModel(); - model.ExchangeDeclare(options.Value.ExchangeName, ExchangeType.Direct, true, false, null); - foreach (var exchange in options.Value.QueueMappings.Select(q => q.ExchangeName).Distinct()) + await semaphore.WaitAsync(); + try { - if (exchange.StartsWith("amq.")) + if (first) { - continue; + using var model = await connection!.CreateChannelAsync(); + await model.ExchangeDeclareAsync(options.Value.ExchangeName, ExchangeType.Direct, true, false, null); + foreach (var exchange in options.Value.QueueMappings.Select(q => q.ExchangeName).Distinct()) + { + if (exchange == null || exchange.StartsWith("amq.")) + { + continue; + } + await model.ExchangeDeclareAsync(exchange, ExchangeType.Direct, true, false, null); + } + foreach (var queue in options.Value.QueueMappings.Select(q => q.QueueName).Distinct()) + { + await model.QueueDeclareAsync(queue, true, false, false, null); + } + foreach (var map in options.Value.QueueMappings) + { + await model.QueueBindAsync(map.QueueName, map.ExchangeName, map.RoutingKey ?? string.Empty); + } + first = false; } - model.ExchangeDeclare(exchange, ExchangeType.Direct, true, false, null); - } - foreach (var queue in options.Value.QueueMappings.Select(q => q.QueueName).Distinct()) - { - model.QueueDeclare(queue, true, false, false, null); } - foreach (var map in options.Value.QueueMappings) + finally { - model.QueueBind(map.QueueName, map.ExchangeName, map.RoutingKey); + semaphore.Release(); } - first = false; } - return connection.CreateModel(); + return await connection!.CreateChannelAsync(); } - public void Dispose() + public async ValueTask DisposeAsync() { - connection?.Close(); + if (connection != null) + { + await connection.CloseAsync(); + await connection.DisposeAsync(); + } connection = null; } } diff --git a/OdectyStat/Dto/GaugeImageLocation.cs b/OdectyStat/Dto/GaugeImageLocation.cs index 64915f3..a43d49b 100644 --- a/OdectyStat/Dto/GaugeImageLocation.cs +++ b/OdectyStat/Dto/GaugeImageLocation.cs @@ -7,7 +7,7 @@ namespace OdectyStat1.Dto; public class GaugeImageLocation { - public string Path { get; set; } - public string RecognizedFailedFolder { get; set; } - public string RecognizedSuccessFolder { get; set; } + public required string Path { get; set; } + public required string RecognizedFailedFolder { get; set; } + public required string RecognizedSuccessFolder { get; set; } } diff --git a/OdectyStat/Dto/GaugeOverviewDto.cs b/OdectyStat/Dto/GaugeOverviewDto.cs index 0fbfb63..f562d2a 100644 --- a/OdectyStat/Dto/GaugeOverviewDto.cs +++ b/OdectyStat/Dto/GaugeOverviewDto.cs @@ -3,7 +3,7 @@ namespace OdectyStat1.Dto; public class GaugeOverviewDto { public int Id { get; set; } - public string Description { get; set; } = string.Empty; + public string? Description { get; set; } = string.Empty; public string Type { get; set; } = string.Empty; public decimal LastValue { get; set; } public DateTime? LastMeasurementAt { get; set; } diff --git a/OdectyStat/Dto/OdectySettings.cs b/OdectyStat/Dto/OdectySettings.cs index 709cd48..9f5d2be 100644 --- a/OdectyStat/Dto/OdectySettings.cs +++ b/OdectyStat/Dto/OdectySettings.cs @@ -1,10 +1,10 @@ namespace OdectyStat1.Dto; public class OdectySettings { - public string RabbitMQHost { get; set; } - public string RabbitMQUsername { get; set; } - public string RabbitMQPassword { get; set; } - public string RabbitMQVHost { get; set; } - public string ExchangeName { get; set; } - public List QueueMappings { get; set; } + public required string RabbitMQHost { get; set; } + public required string RabbitMQUsername { get; set; } + public required string RabbitMQPassword { get; set; } + public required string RabbitMQVHost { get; set; } + public required string ExchangeName { get; set; } + public List QueueMappings { get; set; } = new(); } diff --git a/OdectyStat/Dto/QueueMapping.cs b/OdectyStat/Dto/QueueMapping.cs index 1191a2e..3f7962d 100644 --- a/OdectyStat/Dto/QueueMapping.cs +++ b/OdectyStat/Dto/QueueMapping.cs @@ -1,7 +1,7 @@ namespace OdectyStat1.Dto; public class QueueMapping { - public string QueueName { get; set; } - public string RoutingKey { get; set; } - public string ExchangeName { get; set; } + public required string QueueName { get; set; } + public string? RoutingKey { get; set; } + public required string ExchangeName { get; set; } } diff --git a/OdectyStat/OdectyStat1.csproj b/OdectyStat/OdectyStat1.csproj index 14b6e3e..223ea96 100644 --- a/OdectyStat/OdectyStat1.csproj +++ b/OdectyStat/OdectyStat1.csproj @@ -1,12 +1,14 @@ - + - net8.0 + net10.0 enable enable + + @@ -15,28 +17,32 @@ + + + + + - + - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all - - - - - + + + + - - + + - - - - + + + + diff --git a/OdectyStat/Program.cs b/OdectyStat/Program.cs index dd4cd45..05e5ac6 100644 --- a/OdectyStat/Program.cs +++ b/OdectyStat/Program.cs @@ -12,6 +12,7 @@ using OpenTelemetry.Metrics; using OpenTelemetry.Resources; using OpenTelemetry.Trace; +using RabbitMQ.Client; const string ServiceName = "OdectyStat"; @@ -89,10 +90,17 @@ name: "postgres-diagnostics", tags: new[] { "ready" }) .AddRabbitMQ( - (sp, opts) => + async (sp) => { var s = sp.GetRequiredService>().Value; - opts.ConnectionUri = new Uri($"amqp://{Uri.EscapeDataString(s.RabbitMQUsername)}:{Uri.EscapeDataString(s.RabbitMQPassword)}@{s.RabbitMQHost}/{Uri.EscapeDataString(s.RabbitMQVHost)}"); + var factory = new ConnectionFactory + { + UserName = s.RabbitMQUsername, + Password = s.RabbitMQPassword, + HostName = s.RabbitMQHost, + VirtualHost = s.RabbitMQVHost + }; + return await factory.CreateConnectionAsync(); }, name: "rabbitmq", tags: new[] { "ready" }); diff --git a/OdectyStat/Properties/launchSettings.json b/OdectyStat/Properties/launchSettings.json new file mode 100644 index 0000000..3b7b743 --- /dev/null +++ b/OdectyStat/Properties/launchSettings.json @@ -0,0 +1,12 @@ +{ + "profiles": { + "OdectyStat1": { + "commandName": "Project", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + }, + "applicationUrl": "https://localhost:60277;http://localhost:60278" + } + } +} \ No newline at end of file