Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Setup .NET
uses: actions/setup-dotnet@v5
with:
dotnet-version: 8.0.x
dotnet-version: 10.0.x

- name: Restore dependencies
run: dotnet restore
Expand Down
12 changes: 11 additions & 1 deletion OdectyStat/Application/GaugeService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public async Task AddNewValue(NewValue newValue)
{
Console.WriteLine("Add new value " + newValue.GaugeId);
var gauge = await context.GaugeRepository.GetGauge(newValue.GaugeId);
if (gauge == null)
{
logger.LogError("Gauge {gaugeId} not found.", newValue.GaugeId);
throw new Exception($"Gauge {newValue.GaugeId} not found.");
}
gauge.SetNewValue(newValue.Value, newValue.Datetime);
await context.SaveChangesAsync();
await context.MessageQueue.Publish(new { gaugeId = newValue.GaugeId, value = gauge.LastValue }, MessageQueueRoutingKeys.Odecty_Gauge_Lastvaluechanged);
Expand Down Expand Up @@ -82,7 +87,7 @@ public async Task AddNewValue(NewValue newValue)
public async Task AddIncrement(int gaugeId, decimal increment, DateTime datetime)
{
var gauge = await context.GaugeRepository.GetGauge(gaugeId);
gauge.AddIncrement(increment, datetime);
gauge?.AddIncrement(increment, datetime);
await context.SaveChangesAsync();
}

Expand All @@ -96,6 +101,11 @@ public async Task GaugeRecognizedSucceeded(int gaugeId, string imagePath, decima
{
logger.LogInformation("Recognition succeeded for gauge {gaugeId} with image {imagePath} and value {value}", gaugeId, imagePath, value);
var gauge = await context.GaugeRepository.GetGauge(gaugeId);
if (gauge == null)
{
logger.LogError("Gauge {gaugeId} not found", gaugeId);
throw new Exception($"Gauge {gaugeId} not found.");
}
var localDateTime = dateTime.ToLocalTime();
bool valid = false;
if (gauge.LastMeasurement != null)
Expand Down
3 changes: 2 additions & 1 deletion OdectyStat/Business/ComputeService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using OdectyStat1.Business;
#nullable disable
using OdectyStat1.Business;
using OdectyStat1.Entities;
namespace OdectyStat1
{
Expand Down
4 changes: 3 additions & 1 deletion OdectyStat/Business/ComputeService2.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using OdectyStat1.Contracts;
#nullable disable

using OdectyStat1.Contracts;
using OdectyStat1.Entities;

namespace OdectyStat1.Business
Expand Down
3 changes: 2 additions & 1 deletion OdectyStat/Business/ComputeService3.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using OdectyStat1.Contracts;
#nullable disable
using OdectyStat1.Contracts;
using OdectyStat1.Entities;

namespace OdectyStat1.Business
Expand Down
6 changes: 3 additions & 3 deletions OdectyStat/Business/Gauge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ public class Gauge
{
[Key]
public int Id { get; set; }
public string Description { get; set; }
public string Type { get; set; }
public string? Description { get; set; }
public required string Type { get; set; }
public decimal LastValue { get; set; }

[NotMapped]
public GaugeMeasurement LastMeasurement { get; set; }
public GaugeMeasurement? LastMeasurement { get; set; }
public decimal? MaxValuePerHour { get; set; }
public decimal InitialValue { get; set; }

Expand Down
1 change: 0 additions & 1 deletion OdectyStat/Contracts/IGaugeContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ public interface IGaugeContext
{
IGaugeRepository GaugeRepository { get; }
IMeasurementDayRepository MeasurementDayRepository { get; }
IExcelProvider ExcelProvider { get; }
IMeasurementStatisticsRepository MeasurementStatisticsRepository { get; }
IMeasurementRepository MeasurementRepository { get; }
IHomeAssistantStatisticsRepository HomeAssistantStatisticsRepository { get; }
Expand Down
2 changes: 1 addition & 1 deletion OdectyStat/Contracts/IGaugeRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace OdectyStat1.Contracts
{
public interface IGaugeRepository
{
Task<Gauge> GetGauge(int id);
Task<Gauge?> GetGauge(int id);
}
}
2 changes: 1 addition & 1 deletion OdectyStat/Contracts/IMeasurementStatisticsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace OdectyStat1.Contracts
{
public interface IMeasurementStatisticsRepository
{
Task<GaugeMeasuerementStatistics> GetLast(int gaugeId);
Task<GaugeMeasuerementStatistics?> GetLast(int gaugeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!consumer.IsConsuming)
{
consumer.StartConsuming();
await consumer.StartConsuming();
}

if (stoppingToken.IsCancellationRequested)
{
consumer.StopConsuming();
await consumer.StopConsuming();
}
}

Expand All @@ -49,8 +49,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

foreach (var consumer in consumers)
{
consumer.StopConsuming();
consumer.Dispose();
await consumer.StopConsuming();
await consumer.DisposeAsync();
}
}
}
6 changes: 3 additions & 3 deletions OdectyStat/DataLayer/Consumers/BinaryMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ public BinaryMessageConsumer(
this.logger = logger;
}

protected override async void ConsumerReceived(object? sender, BasicDeliverEventArgs e)
protected override async Task ConsumerReceived(object? sender, BasicDeliverEventArgs e)
{
try
{
await handler.HandleAsync(e.Body, CancellationToken.None);
AcknowledgeMessage(e.DeliveryTag);
await AcknowledgeMessage(e.DeliveryTag);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process binary message from queue {QueueName}", handler.QueueName);
RejectMessage(e.DeliveryTag, requeue: false);
await RejectMessage(e.DeliveryTag, requeue: false);
}
}
}
4 changes: 2 additions & 2 deletions OdectyStat/DataLayer/Consumers/ConsumerBackgroundService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!consumer.IsConsuming)
{
consumer.StartConsuming();
await consumer.StartConsuming();
}
if (stoppingToken.IsCancellationRequested)
{
consumer.StopConsuming();
await consumer.StopConsuming();
}
}
// Placeholder for background service logic
Expand Down
6 changes: 3 additions & 3 deletions OdectyStat/DataLayer/Consumers/IRabbitMQConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ public interface IRabbitMQConsumer
{
bool IsConsuming { get; }

void Dispose();
void StartConsuming();
void StopConsuming();
ValueTask DisposeAsync();
Task StartConsuming();
Task StopConsuming();
}
20 changes: 14 additions & 6 deletions OdectyStat/DataLayer/Consumers/MQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public MQClient(IServiceProvider serviceProvider, RabbitMQProvider rabbitMQProvi
this.logger = logger;
}

protected override async void ConsumerReceived(object? sender, BasicDeliverEventArgs e)
protected override async Task ConsumerReceived(object? sender, BasicDeliverEventArgs e)
{
logger.LogInformation("Data received at: {time}", DateTimeOffset.Now);
if (!inProcess)
Expand All @@ -34,14 +34,22 @@ protected override async void ConsumerReceived(object? sender, BasicDeliverEvent
var body = Encoding.UTF8.GetString(e.Body.ToArray());
logger.LogInformation("Data received {body}", body);
var newValue = Newtonsoft.Json.JsonConvert.DeserializeObject<NewValue>(body);
using var scope = serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<IGaugeService>();
await service.AddNewValue(newValue);
AcknowledgeMessage(e.DeliveryTag);
if (newValue != null)
{
using var scope = serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<IGaugeService>();
await service!.AddNewValue(newValue);
await AcknowledgeMessage(e.DeliveryTag);
}
else
{
await RejectMessage(e.DeliveryTag);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing data: {message}", ex.Message);
await RejectMessage(e.DeliveryTag);
}
finally
{
Expand All @@ -51,7 +59,7 @@ protected override async void ConsumerReceived(object? sender, BasicDeliverEvent
else
{
//redeliver message
RejectMessage(e.DeliveryTag, true);
await RejectMessage(e.DeliveryTag, true);
}
}
}
Expand Down
64 changes: 41 additions & 23 deletions OdectyStat/DataLayer/Consumers/RabbitMQConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,94 @@
using RabbitMQ.Client.Events;

namespace OdectyStat1.DataLayer.Consumers;
public abstract class RabbitMQConsumer : IDisposable, IRabbitMQConsumer
public abstract class RabbitMQConsumer : IAsyncDisposable, IRabbitMQConsumer
{
private readonly RabbitMQProvider rabbitMQProvider;
private readonly ILogger<RabbitMQConsumer> logger;
private readonly string queueName;
private IModel model;
private IChannel? model;

public bool IsConsuming => model != null && !model.IsClosed;

public RabbitMQConsumer(RabbitMQProvider rabbitMQProvider, ILogger<RabbitMQConsumer> logger, string queueName)
{
this.rabbitMQProvider = rabbitMQProvider;
this.rabbitMQProvider.ConnectionShutdown += (s, e) =>
this.rabbitMQProvider.ConnectionShutdown += async (s, e) =>
{
StopConsuming();
await StopConsuming();
logger.LogWarning("RabbitMQ connection shutdown detected. Stopped consuming.");
};
this.logger = logger;
this.queueName = queueName;
}

public void StartConsuming()
public async Task StartConsuming()
{
if (model == null || model.IsClosed)
{
model = rabbitMQProvider.CreateModel();
model = await rabbitMQProvider.CreateModel();
if (model != null)
{
model.ModelShutdown += (s, e) =>
model.ChannelShutdownAsync += async (s, e) =>
{
StopConsuming();
await StopConsuming();
logger.LogWarning("RabbitMQ model shutdown detected. Stopped consuming from queue: {QueueName}", queueName);
};
}
}
if (model == null)
{
logger.LogWarning("Failed to create RabbitMQ model for consuming.");
StopConsuming();
await StopConsuming();
return;
}
var consumer = new EventingBasicConsumer(model);
consumer.Received += ConsumerReceived;
model.BasicConsume(queueName, false, consumer);
var consumer = new AsyncEventingBasicConsumer(model);
consumer.ReceivedAsync += ConsumerReceived;
await model.BasicConsumeAsync(queueName, false, consumer);
logger.LogInformation("Started consuming from queue: {QueueName}", queueName);
}

protected abstract void ConsumerReceived(object? sender, BasicDeliverEventArgs e);
protected abstract Task ConsumerReceived(object? sender, BasicDeliverEventArgs e);

protected void AcknowledgeMessage(ulong deliveryTag)
protected async Task AcknowledgeMessage(ulong deliveryTag)
{
model?.BasicAck(deliveryTag, false);
if (model != null)
{
await model.BasicAckAsync(deliveryTag, false);
}
}

protected void RejectMessage(ulong deliveryTag, bool requeue = false)
protected async Task RejectMessage(ulong deliveryTag, bool requeue = false)
{
model?.BasicReject(deliveryTag, requeue);
if (model != null)
{
await model.BasicRejectAsync(deliveryTag, requeue);
}
}

public void StopConsuming()
public async Task StopConsuming()
{
model?.Close();
model?.Dispose();
if (model != null)
{
if (!model.IsClosed)
{
await model.CloseAsync();
}
model.Dispose();
}
model = null;
}

public void Dispose()
public async ValueTask DisposeAsync()
{
model?.Close();
model?.Dispose();
if (model != null)
{
if (!model.IsClosed)
{
await model.CloseAsync();
}
await model.DisposeAsync();
}
model = null;
}
}
22 changes: 15 additions & 7 deletions OdectyStat/DataLayer/Consumers/RecognizedFailed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public RecognizedFailed(IServiceProvider serviceProvider, IOptions<GaugeImageLoc
this.logger = logger;
}

protected override void ConsumerReceived(object? sender, BasicDeliverEventArgs e)
protected override async Task ConsumerReceived(object? sender, BasicDeliverEventArgs e)
{
logger.LogInformation("Data received at: {time}", DateTimeOffset.Now);
if (!inProcess)
Expand All @@ -36,15 +36,23 @@ protected override void ConsumerReceived(object? sender, BasicDeliverEventArgs e
inProcess = true;
var body = Encoding.UTF8.GetString(e.Body.ToArray());
logger.LogInformation("Data received {body}", body);
dynamic message = Newtonsoft.Json.JsonConvert.DeserializeObject<dynamic>(body);
using var scope = serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<IGaugeService>();
service.GaugeRecognizedFailed(int.Parse(message.gaugeId.ToString()), message.file.ToString());
AcknowledgeMessage(e.DeliveryTag);
dynamic? message = Newtonsoft.Json.JsonConvert.DeserializeObject<dynamic>(body);
if (message != null)
{
using var scope = serviceProvider.CreateScope();
var service = scope.ServiceProvider.GetService<IGaugeService>();
service!.GaugeRecognizedFailed(int.Parse(message.gaugeId.ToString()), message.file.ToString());
await AcknowledgeMessage(e.DeliveryTag);
}
else
{
await RejectMessage(e.DeliveryTag);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing data: {message}", ex.Message);
await RejectMessage(e.DeliveryTag);
}
finally
{
Expand All @@ -54,7 +62,7 @@ protected override void ConsumerReceived(object? sender, BasicDeliverEventArgs e
else
{
//redeliver message
RejectMessage(e.DeliveryTag, true);
await RejectMessage(e.DeliveryTag, true);
}
}
}
Expand Down
Loading
Loading