From 349e811a3b39e1def09e2983fc667758f9fa1c08 Mon Sep 17 00:00:00 2001 From: yanniboi Date: Tue, 10 Dec 2024 08:39:42 +0000 Subject: [PATCH 1/4] By yanniboi: Added error replay for HubSpot Academy validation errors. --- MessageBusReader/Program.cs | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/MessageBusReader/Program.cs b/MessageBusReader/Program.cs index 506a265..18f07f0 100644 --- a/MessageBusReader/Program.cs +++ b/MessageBusReader/Program.cs @@ -15,6 +15,8 @@ class Program private static ServiceBusClient _client; private static ServiceBusProcessor _processor; + private const string QueueName = "error"; + private static TaskCompletionSource _taskCompletionSource; private static Task _loopTask; private static int _completeCounter = 0; @@ -54,7 +56,7 @@ static async Task MainAsync() ReceiveMode = ServiceBusReceiveMode.PeekLock, }; - _processor = _client.CreateProcessor("error", options); + _processor = _client.CreateProcessor(QueueName, options); _processor.ProcessMessageAsync += ProcessMessagesAsync; _processor.ProcessErrorAsync += ExceptionReceivedHandler; @@ -105,6 +107,17 @@ private static async Task ProcessMessagesAsync(ProcessMessageEventArgs args) string type = typeValue.ToString(); + // Academy class/chapter create race condition. + if (type == "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts") + { + if (message.ContainsError("VALIDATION_ERROR")) + { + await ReturnToSource(args, _delay); + _delay++; + return; + } + } + // SynchroniseConsentPreferences if (type == "Edrington.Data.Consumer.Commands.SynchroniseConsentPreferences, Edrington.Data") { @@ -256,11 +269,9 @@ static async Task MoveDeadletter() SubQueue = SubQueue.DeadLetter }; - string queueName = "error"; - - _processor = _client.CreateProcessor(queueName, options); + _processor = _client.CreateProcessor(QueueName, options); - _processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, queueName); + _processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, QueueName); _processor.ProcessErrorAsync += ExceptionReceivedHandler; _taskCompletionSource = new TaskCompletionSource(); From 0fe9d013f8107b162434cbbd8813aa1eab999d2e Mon Sep 17 00:00:00 2001 From: yanniboi Date: Tue, 10 Dec 2024 15:03:22 +0000 Subject: [PATCH 2/4] By yanniboi: Move Academy message handling into separate class. --- MessageBusReader/Program.cs | 20 ++- .../AcademyProcessing.cs | 136 ++++++++++++++++++ 2 files changed, 149 insertions(+), 7 deletions(-) create mode 100644 MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs diff --git a/MessageBusReader/Program.cs b/MessageBusReader/Program.cs index 18f07f0..b451833 100644 --- a/MessageBusReader/Program.cs +++ b/MessageBusReader/Program.cs @@ -1,4 +1,6 @@ -namespace MessageBusReader +using MessageBusReader.QueueProcessingHandlers; + +namespace MessageBusReader { using System; using System.Collections.Generic; @@ -15,7 +17,12 @@ class Program private static ServiceBusClient _client; private static ServiceBusProcessor _processor; + // Customise your operational params. private const string QueueName = "error"; + private const string Env = "DEV_CONNECTION_STRING"; + + // Default handler unless overridden in Main. + private static Func _messageHandler = ProcessMessagesAsync; private static TaskCompletionSource _taskCompletionSource; private static Task _loopTask; @@ -32,15 +39,14 @@ static async Task Main(string[] args) var dotenv = Path.Combine(root, ".env"); DotEnv.Load(dotenv); - string env; - env = "PRODUCTION_CONNECTION_STRING"; - // env = "QA_CONNECTION_STRING"; - // env = "DEV_CONNECTION_STRING"; - var connectionString = Environment.GetEnvironmentVariable(env); + var connectionString = Environment.GetEnvironmentVariable(Env); // Connect to error queue _client = new ServiceBusClient(connectionString); + // _messageHandler = AcademyProcessing.ProcessAcademyMessagesAsync; + // _messageHandler = AcademyProcessing.ProcessAcademyDeadletterAsync; + // _messageHandler = AcademyProcessing.ProcessCheckTypesAsync; // await MainAsync(); // Switch to this to move deadletter back to the error queue @@ -58,7 +64,7 @@ static async Task MainAsync() _processor = _client.CreateProcessor(QueueName, options); - _processor.ProcessMessageAsync += ProcessMessagesAsync; + _processor.ProcessMessageAsync += _messageHandler; _processor.ProcessErrorAsync += ExceptionReceivedHandler; _taskCompletionSource = new TaskCompletionSource(); diff --git a/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs b/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs new file mode 100644 index 0000000..fce8ae1 --- /dev/null +++ b/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; + +namespace MessageBusReader.QueueProcessingHandlers; + +public class AcademyProcessing +{ + private static int _messageCount = 0; + private static List _foundTypes = new List(); + + public static async Task ProcessAcademyDeadletterAsync(ProcessMessageEventArgs args) + { + ServiceBusReceivedMessage message = args.Message; + + _messageCount = Interlocked.Increment(ref _messageCount); + + Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); + + if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) + { + Console.WriteLine("Unable to get message type - Moving to deadletter"); + await args.DeadLetterMessageAsync(args.Message); + return; + } + + var academyTypes = new[] + { + "Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts", + "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts", + }; + string type = typeValue.ToString(); + + // Academy class/chapter create race condition. + if (!academyTypes.Contains(type)) + { + Console.WriteLine($"Type {type} is irrelevant - Moving to deadletter"); + await args.DeadLetterMessageAsync(args.Message); + return; + } + + return; + } + + + public static async Task ProcessCheckTypesAsync(ProcessMessageEventArgs args) + { + ServiceBusReceivedMessage message = args.Message; + + _messageCount = Interlocked.Increment(ref _messageCount); + + // Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); + + if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) + { + // Console.WriteLine("Unable to get message type - Moving to deadletter"); + await args.DeadLetterMessageAsync(args.Message); + return; + } + + string type = typeValue.ToString(); + + if (_foundTypes.Contains(type)) + { + return; + } + + _foundTypes.Add(type); + Console.WriteLine($"Found new type {type}"); + return; + } + + public static async Task ProcessAcademyMessagesAsync(ProcessMessageEventArgs args) + { + ServiceBusReceivedMessage message = args.Message; + + _messageCount = Interlocked.Increment(ref _messageCount); + + Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); + + if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) + { + return; + } + + var academyTypes = new[] + { + "Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts", + "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts", + }; + string type = typeValue.ToString(); + + if (!academyTypes.Contains(type)) + { + return; + } + + if (message.ContainsError("VALIDATION_ERROR")) + { + Console.WriteLine($"Found validation error: {message}"); + await ReturnToSource(args, _delay); + _delay++; + return; + } + + if (message.ContainsError("Object reference not set to an instance of an object")) + { + Console.WriteLine($"Found missing user error: {message}"); + await ReturnToSource(args, _delay); + _delay++; + return; + } + + Console.WriteLine("Error message:"); + Console.WriteLine(message.GetErrorMessage()); + + var removeMessageIds = new[] + { + "d9d0141a-f997-468c-831d-a4c155e74b64", + "a72de5ea-4bd1-49e7-9463-0166ce04479e", + "2354c8c2-c86d-495d-8a97-520990e1631e", + }; + if (removeMessageIds.Contains(message.MessageId)) + { + Console.WriteLine($"Removing message."); + // await CompleteMessage(args); + return; + } + + // Console.WriteLine($"Not processed message: {message}"); + return; + } +} From c3c87aec0139165dc2c3f11f49907c7b5e83c530 Mon Sep 17 00:00:00 2001 From: yanniboi Date: Tue, 10 Dec 2024 15:06:03 +0000 Subject: [PATCH 3/4] By yanniboi: Fix method access. --- MessageBusReader/Program.cs | 4 ++-- .../QueueProcessingHandlers/AcademyProcessing.cs | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/MessageBusReader/Program.cs b/MessageBusReader/Program.cs index b451833..45e2781 100644 --- a/MessageBusReader/Program.cs +++ b/MessageBusReader/Program.cs @@ -310,7 +310,7 @@ private static async Task ReturnDeadletterAsync(ProcessMessageEventArgs args, st await CompleteMessage(args); } - private static async Task CompleteMessage(ProcessMessageEventArgs args) + public static async Task CompleteMessage(ProcessMessageEventArgs args) { await args.CompleteMessageAsync(args.Message); @@ -326,7 +326,7 @@ private static async Task ReturnToSource(ProcessMessageEventArgs args) await ReturnToSource(args, 0); } - private static async Task ReturnToSource(ProcessMessageEventArgs args, int delay) + public static async Task ReturnToSource(ProcessMessageEventArgs args, int delay) { string source = GetSource(args); diff --git a/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs b/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs index fce8ae1..3d11349 100644 --- a/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs +++ b/MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs @@ -11,6 +11,7 @@ public class AcademyProcessing { private static int _messageCount = 0; private static List _foundTypes = new List(); + private static int _delay = 0; public static async Task ProcessAcademyDeadletterAsync(ProcessMessageEventArgs args) { @@ -101,7 +102,7 @@ public static async Task ProcessAcademyMessagesAsync(ProcessMessageEventArgs arg if (message.ContainsError("VALIDATION_ERROR")) { Console.WriteLine($"Found validation error: {message}"); - await ReturnToSource(args, _delay); + await Program.ReturnToSource(args, _delay); _delay++; return; } @@ -109,7 +110,7 @@ public static async Task ProcessAcademyMessagesAsync(ProcessMessageEventArgs arg if (message.ContainsError("Object reference not set to an instance of an object")) { Console.WriteLine($"Found missing user error: {message}"); - await ReturnToSource(args, _delay); + await Program.ReturnToSource(args, _delay); _delay++; return; } @@ -126,7 +127,7 @@ public static async Task ProcessAcademyMessagesAsync(ProcessMessageEventArgs arg if (removeMessageIds.Contains(message.MessageId)) { Console.WriteLine($"Removing message."); - // await CompleteMessage(args); + await Program.CompleteMessage(args); return; } From d40099df65f6a9be853dc7d368a69172a8d79550 Mon Sep 17 00:00:00 2001 From: yanniboi Date: Tue, 10 Dec 2024 15:19:38 +0000 Subject: [PATCH 4/4] By yanniboi: Remove academy from main. --- MessageBusReader/Program.cs | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/MessageBusReader/Program.cs b/MessageBusReader/Program.cs index 45e2781..54fcf66 100644 --- a/MessageBusReader/Program.cs +++ b/MessageBusReader/Program.cs @@ -113,17 +113,6 @@ private static async Task ProcessMessagesAsync(ProcessMessageEventArgs args) string type = typeValue.ToString(); - // Academy class/chapter create race condition. - if (type == "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts") - { - if (message.ContainsError("VALIDATION_ERROR")) - { - await ReturnToSource(args, _delay); - _delay++; - return; - } - } - // SynchroniseConsentPreferences if (type == "Edrington.Data.Consumer.Commands.SynchroniseConsentPreferences, Edrington.Data") {