diff --git a/MessageBusReader/Program.cs b/MessageBusReader/Program.cs index 506a265..54fcf66 100644 --- a/MessageBusReader/Program.cs +++ b/MessageBusReader/Program.cs @@ -1,4 +1,6 @@ -namespace MessageBusReader +using MessageBusReader.QueueProcessingHandlers; + +namespace MessageBusReader { using System; using System.Collections.Generic; @@ -15,6 +17,13 @@ class Program private static ServiceBusClient _client; private static ServiceBusProcessor _processor; + // Customise your operational params. + private const string QueueName = "error"; + private const string Env = "DEV_CONNECTION_STRING"; + + // Default handler unless overridden in Main. + private static Func _messageHandler = ProcessMessagesAsync; + private static TaskCompletionSource _taskCompletionSource; private static Task _loopTask; private static int _completeCounter = 0; @@ -30,15 +39,14 @@ static async Task Main(string[] args) var dotenv = Path.Combine(root, ".env"); DotEnv.Load(dotenv); - string env; - env = "PRODUCTION_CONNECTION_STRING"; - // env = "QA_CONNECTION_STRING"; - // env = "DEV_CONNECTION_STRING"; - var connectionString = Environment.GetEnvironmentVariable(env); + var connectionString = Environment.GetEnvironmentVariable(Env); // Connect to error queue _client = new ServiceBusClient(connectionString); + // _messageHandler = AcademyProcessing.ProcessAcademyMessagesAsync; + // _messageHandler = AcademyProcessing.ProcessAcademyDeadletterAsync; + // _messageHandler = AcademyProcessing.ProcessCheckTypesAsync; // await MainAsync(); // Switch to this to move deadletter back to the error queue @@ -54,9 +62,9 @@ static async Task MainAsync() ReceiveMode = ServiceBusReceiveMode.PeekLock, }; - _processor = _client.CreateProcessor("error", options); + _processor = _client.CreateProcessor(QueueName, options); - _processor.ProcessMessageAsync += ProcessMessagesAsync; + _processor.ProcessMessageAsync += _messageHandler; _processor.ProcessErrorAsync += ExceptionReceivedHandler; _taskCompletionSource = new TaskCompletionSource(); @@ -256,11 +264,9 @@ static async Task MoveDeadletter() SubQueue = SubQueue.DeadLetter }; - string queueName = "error"; - - _processor = _client.CreateProcessor(queueName, options); + _processor = _client.CreateProcessor(QueueName, options); - _processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, queueName); + _processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, QueueName); _processor.ProcessErrorAsync += ExceptionReceivedHandler; _taskCompletionSource = new TaskCompletionSource(); @@ -293,7 +299,7 @@ private static async Task ReturnDeadletterAsync(ProcessMessageEventArgs args, st await CompleteMessage(args); } - private static async Task CompleteMessage(ProcessMessageEventArgs args) + public static async Task CompleteMessage(ProcessMessageEventArgs args) { await args.CompleteMessageAsync(args.Message); @@ -309,7 +315,7 @@ private static async Task ReturnToSource(ProcessMessageEventArgs args) await ReturnToSource(args, 0); } - private static async Task ReturnToSource(ProcessMessageEventArgs args, int delay) + public static async Task ReturnToSource(ProcessMessageEventArgs args, int delay) { string source = GetSource(args); diff --git a/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs b/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs new file mode 100644 index 0000000..3d11349 --- /dev/null +++ b/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs @@ -0,0 +1,137 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace MessageBusReader.QueueProcessingHandlers; + +public class AcademyProcessing +{ + private static int _messageCount = 0; + private static List _foundTypes = new List(); + private static int _delay = 0; + + public static async Task ProcessAcademyDeadletterAsync(ProcessMessageEventArgs args) + { + ServiceBusReceivedMessage message = args.Message; + + _messageCount = Interlocked.Increment(ref _messageCount); + + Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); + + if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) + { + Console.WriteLine("Unable to get message type - Moving to deadletter"); + await args.DeadLetterMessageAsync(args.Message); + return; + } + + var academyTypes = new[] + { + "Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts", + "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts", + }; + string type = typeValue.ToString(); + + // Academy class/chapter create race condition. + if (!academyTypes.Contains(type)) + { + Console.WriteLine($"Type {type} is irrelevant - Moving to deadletter"); + await args.DeadLetterMessageAsync(args.Message); + return; + } + + return; + } + + + public static async Task ProcessCheckTypesAsync(ProcessMessageEventArgs args) + { + ServiceBusReceivedMessage message = args.Message; + + _messageCount = Interlocked.Increment(ref _messageCount); + + // Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); + + if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) + { + // Console.WriteLine("Unable to get message type - Moving to deadletter"); + await args.DeadLetterMessageAsync(args.Message); + return; + } + + string type = typeValue.ToString(); + + if (_foundTypes.Contains(type)) + { + return; + } + + _foundTypes.Add(type); + Console.WriteLine($"Found new type {type}"); + return; + } + + public static async Task ProcessAcademyMessagesAsync(ProcessMessageEventArgs args) + { + ServiceBusReceivedMessage message = args.Message; + + _messageCount = Interlocked.Increment(ref _messageCount); + + Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); + + if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) + { + return; + } + + var academyTypes = new[] + { + "Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts", + "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts", + }; + string type = typeValue.ToString(); + + if (!academyTypes.Contains(type)) + { + return; + } + + if (message.ContainsError("VALIDATION_ERROR")) + { + Console.WriteLine($"Found validation error: {message}"); + await Program.ReturnToSource(args, _delay); + _delay++; + return; + } + + if (message.ContainsError("Object reference not set to an instance of an object")) + { + Console.WriteLine($"Found missing user error: {message}"); + await Program.ReturnToSource(args, _delay); + _delay++; + return; + } + + Console.WriteLine("Error message:"); + Console.WriteLine(message.GetErrorMessage()); + + var removeMessageIds = new[] + { + "d9d0141a-f997-468c-831d-a4c155e74b64", + "a72de5ea-4bd1-49e7-9463-0166ce04479e", + "2354c8c2-c86d-495d-8a97-520990e1631e", + }; + if (removeMessageIds.Contains(message.MessageId)) + { + Console.WriteLine($"Removing message."); + await Program.CompleteMessage(args); + return; + } + + // Console.WriteLine($"Not processed message: {message}"); + return; + } +}