diff --git a/.gitignore b/.gitignore index 5cbef21..0b70208 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,7 @@ nunit-*.xml !.idea/codeStyles/ # VS Code IDE -.vscode/ \ No newline at end of file +.vscode/ + +# Trae IDE +.trae/ \ No newline at end of file diff --git a/GIT_FLOW.md b/.spec/GIT_FLOW.md similarity index 100% rename from GIT_FLOW.md rename to .spec/GIT_FLOW.md diff --git a/README.md b/README.md index ca6a3e5..63e6939 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![License](https://img.shields.io/github/license/longkerdandy/SparklerNet)](https://github.com/longkerdandy/SparklerNet/blob/main/LICENSE) [![Language](https://img.shields.io/github/languages/top/longkerdandy/SparklerNet)](https://github.com/longkerdandy/SparklerNet) -[![.NET](https://github.com/longkerdandy/SparklerNet/actions/workflows/dotnet.yml/badge.svg)](https://github.com/longkerdandy/SparklerNet/actions/workflows/dotnet.yml) +[![Build & Test](https://github.com/longkerdandy/SparklerNet/actions/workflows/dotnet.yml/badge.svg)](https://github.com/longkerdandy/SparklerNet/actions/workflows/dotnet.yml) [![CodeQL Advanced](https://github.com/longkerdandy/SparklerNet/actions/workflows/codeql.yml/badge.svg)](https://github.com/longkerdandy/SparklerNet/actions/workflows/codeql.yml) [![NuGet Version](https://img.shields.io/nuget/v/SparklerNet)](https://www.nuget.org/packages/SparklerNet/) [![NuGet Downloads](https://img.shields.io/nuget/dt/SparklerNet)](https://www.nuget.org/packages/SparklerNet/) @@ -43,7 +43,7 @@ The library aims to fully implement the complete Sparkplug protocol, with planne - ✅ Default wildcard topic support (spBv1.0/#) - ✅ Specific group and edge node subscription support - ✅ Sparkplug Host Application Message Ordering -- ⬜ Cache Edge Node and Device birth certificates +- ✅ Cache Edge Node and Device online status ### Message Processing @@ -73,6 +73,21 @@ The library aims to fully implement the complete Sparkplug protocol, with planne - ⬜ Reconnection logic with exponential backoff - ✅ Configuration validation +## Eclipse™ Sparkplug™ TCK Compatibility + +The following are the compatibility test results against the Eclipse Sparkplug Test Compatibility Kit (TCK) available at https://github.com/eclipse-sparkplug/sparkplug/tree/master/tck: + +### Host Application Tests + +| Test | Status | +|------|--------| +| Session Establishment Test | ✅ Passed | +| Session Termination Test | ✅ Passed | +| Send Command Test | ✅ Passed | +| Edge Session Termination Test | ✅ Passed | +| Message Ordering Test | ✅ Passed | +| Multiple MQTT Server (Broker) Test | ❌ Not supported yet | + ## Installation Install SparklerNet via NuGet Package Manager: @@ -94,6 +109,15 @@ Or reference it directly in your project: Here's a simple example of a Sparkplug host application: ```csharp +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using MQTTnet; +using SparklerNet.Core.Constants; +using SparklerNet.Core.Events; +using SparklerNet.HostApplication; +using SparklerNet.HostApplication.Caches; +using SparklerNet.HostApplication.Extensions; + // Create MQTT client options var mqttOptions = new MqttClientOptionsBuilder() .WithTcpServer("localhost", 1883) @@ -107,15 +131,36 @@ var sparkplugOptions = new SparkplugClientOptions HostApplicationId = "MyHostApplication" }; -// Create logger -var loggerFactory = LoggerFactory.Create(builder => +// Create the logger factory +using var loggerFactory = LoggerFactory.Create(builder => { builder.AddConsole(); + builder.SetMinimumLevel(LogLevel.Information); }); -var logger = loggerFactory.CreateLogger(); -// Create and start host application -var hostApplication = new SparkplugHostApplication(mqttOptions, sparkplugOptions, logger); +// Create the dependency injection container +var services = new ServiceCollection(); + +// Register the singleton services +services.AddSingleton(mqttOptions); +services.AddSingleton(sparkplugOptions); +services.AddSingleton(loggerFactory); +services.AddSingleton(loggerFactory); + +// Register cache services +services.AddMemoryCache(); +services.AddHybridCache(); + +// Register the SparklerNet services +services.AddSingleton(); +services.AddSingleton(); +services.AddSingleton(); + +// Build service provider +var serviceProvider = services.BuildServiceProvider(); + +// Resolve SparkplugHostApplication from the container +var hostApplication = serviceProvider.GetRequiredService(); // Subscribe to DBIRTH event hostApplication.DeviceBirthReceivedAsync += args => { @@ -136,17 +181,16 @@ await hostApplication.StopAsync(); ## Sample Application -The project includes a comprehensive sample named `SimpleHostApplication` that demonstrates a complete Sparkplug Host Application implementation with the following features: +The project includes a sample named `SimpleHostApplication` demonstrating a complete Sparkplug Host Application implementation with these core features: -- **Interactive Command-Line Interface**: Provides a user-friendly console interface with commands for controlling the application lifecycle and sending commands -- **Complete Event Handling**: Demonstrates subscription and processing of all Sparkplug message types (NBIRTH, NDATA, NDEATH, DBIRTH, DDATA, DDEATH, STATE) -- **Robust Error Handling**: Includes comprehensive exception handling throughout the application lifecycle -- **Advanced Logging System**: Implements structured logging using Serilog, providing detailed information about message reception and processing -- **Command Sending Capabilities**: Allows sending rebirth commands to both Edge Nodes and Devices with customizable parameters -- **User-Friendly Input**: Features command prompts with default values for improved user experience -- **Detailed Data Display**: Shows comprehensive information about received messages including timestamps, sequences, and all metrics with their types and values +- **Interactive CLI**: User-friendly console interface for application lifecycle management and command sending +- **Full Event Processing**: Handles all Sparkplug message types with detailed data display +- **Command Capabilities**: Sends Rebirth and ScanRate commands to Edge Nodes (NCMD) and Devices (DCMD) +- **Configuration Profiles**: Supports multiple profiles (mimic, tck) via `--profile` command line argument +- **Dependency Injection**: Uses Microsoft.Extensions.DependencyInjection for service management +- **Structured Logging**: Implements Serilog for detailed message and processing logging -Please refer to the `SparklerNet.Samples` project for the complete implementation and to see these features in action. +Refer to the `SparklerNet.Samples` project for the complete implementation. ## Project Structure @@ -175,6 +219,10 @@ Please refer to the `SparklerNet.Samples` project for the complete implementatio │ │ └── AssemblyInfo.cs # Assembly information │ └── SparklerNet.csproj # Core library project file ├── SparklerNet.Samples/ # Sample application +│ ├── Profiles/ # Configuration profiles +│ │ ├── IProfile.cs # Profile interface +│ │ ├── MimicApplicationProfile.cs # Mimic application profile +│ │ └── TCKApplicationProfile.cs # TCK application profile │ ├── Program.cs # Sample program entry point │ ├── SimpleHostApplication.cs # Simple host application implementation │ └── SparklerNet.Samples.csproj # Sample project file @@ -208,21 +256,13 @@ SparklerNet supports the following Sparkplug B message types: ## Dependencies - Google.Protobuf (3.33.0) +- Microsoft.Extensions.Caching.Hybrid (9.10.0) - Microsoft.Extensions.Caching.Memory (9.0.10) - Microsoft.Extensions.Logging (9.0.10) - MQTTnet (5.0.1.1416) - System.Net.Http (4.3.4) - System.Text.RegularExpressions (4.3.1) -## Contribution Guidelines - -Contributions via Pull Requests and Issues are welcome. Before submitting code, please ensure: - -1. Follow the project's code style and [Git Flow](GIT_FLOW.md) -2. Add necessary tests -3. Ensure all tests pass -4. Provide detailed code explanations - ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. diff --git a/SparklerNet.Samples/Profiles/IProfile.cs b/SparklerNet.Samples/Profiles/IProfile.cs new file mode 100644 index 0000000..7e94589 --- /dev/null +++ b/SparklerNet.Samples/Profiles/IProfile.cs @@ -0,0 +1,22 @@ +using MQTTnet; +using SparklerNet.Core.Options; + +namespace SparklerNet.Samples.Profiles; + +/// +/// This interface defines the contract for a Sparkplug client profile. +/// +public interface IProfile +{ + /// + /// Gets the MQTT client options for the profile. + /// + /// The MQTT client options. + public MqttClientOptions GetMqttClientOptions(); + + /// + /// Gets the Sparkplug client options for the profile. + /// + /// The Sparkplug client options. + public SparkplugClientOptions GetSparkplugClientOptions(); +} \ No newline at end of file diff --git a/SparklerNet.Samples/Profiles/MimicApplicationProfile.cs b/SparklerNet.Samples/Profiles/MimicApplicationProfile.cs new file mode 100644 index 0000000..b0a2222 --- /dev/null +++ b/SparklerNet.Samples/Profiles/MimicApplicationProfile.cs @@ -0,0 +1,36 @@ +using MQTTnet; +using MQTTnet.Formatter; +using SparklerNet.Core.Constants; +using SparklerNet.Core.Options; + +namespace SparklerNet.Samples.Profiles; + +/// +/// This is a Sparkplug Host Application profile that will subscribe to the MIMIC topic. +/// The MIMIC MQTT Simulators are shared, read-only Sparkplug B sensors publishing unique Sparkplug messages with +/// temperature telemetry to the public brokers TEST.MOSQUITTO.ORG and BROKER.HIVEMQ.COM. For more information about +/// MIMIC MQTT Simulators, visit MQTTLAB.IOTSIM.IO/sparkplug +/// +public class MimicApplicationProfile : IProfile +{ + /// + public MqttClientOptions GetMqttClientOptions() + { + return new MqttClientOptionsBuilder() + .WithTcpServer("BROKER.HIVEMQ.COM", 1883) + .WithProtocolVersion(MqttProtocolVersion.V311) + .Build(); + } + + /// + public SparkplugClientOptions GetSparkplugClientOptions() + { + return new SparkplugClientOptions + { + Version = SparkplugVersion.V300, + HostApplicationId = "SparklerNetSimpleHostApp", + Subscriptions = { new MqttTopicFilterBuilder().WithTopic("spBv1.0/MIMIC/#").WithAtLeastOnceQoS().Build() }, + EnableMessageOrdering = true + }; + } +} \ No newline at end of file diff --git a/SparklerNet.Samples/Profiles/TCKApplicationProfile.cs b/SparklerNet.Samples/Profiles/TCKApplicationProfile.cs new file mode 100644 index 0000000..a37926d --- /dev/null +++ b/SparklerNet.Samples/Profiles/TCKApplicationProfile.cs @@ -0,0 +1,36 @@ +using MQTTnet; +using MQTTnet.Formatter; +using SparklerNet.Core.Constants; +using SparklerNet.Core.Options; + +namespace SparklerNet.Samples.Profiles; + +/// +/// This is a Sparkplug Host Application profile that will connect to the local MQTT broker for Eclipse™ Sparkplug™ TCK +/// tests. For more information about Eclipse™ Sparkplug™ TCK, visit +/// Eclipse™ Sparkplug™ TCK +/// +public class TckApplicationProfile : IProfile +{ + /// + public MqttClientOptions GetMqttClientOptions() + { + return new MqttClientOptionsBuilder() + .WithTcpServer("localhost", 1883) + .WithProtocolVersion(MqttProtocolVersion.V500) + .Build(); + } + + /// + public SparkplugClientOptions GetSparkplugClientOptions() + { + return new SparkplugClientOptions + { + Version = SparkplugVersion.V300, + HostApplicationId = "SparklerNetSimpleHostApp", + AlwaysSubscribeToWildcardTopic = true, + EnableMessageOrdering = true, + SeqReorderTimeout = 5000 + }; + } +} \ No newline at end of file diff --git a/SparklerNet.Samples/Program.cs b/SparklerNet.Samples/Program.cs index f10a22b..45667d0 100644 --- a/SparklerNet.Samples/Program.cs +++ b/SparklerNet.Samples/Program.cs @@ -1,15 +1,16 @@ -using MQTTnet; -using MQTTnet.Formatter; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using Serilog; using Serilog.Extensions.Logging; -using SparklerNet.Core.Constants; -using SparklerNet.Core.Options; +using SparklerNet.HostApplication; +using SparklerNet.HostApplication.Caches; +using SparklerNet.Samples.Profiles; namespace SparklerNet.Samples; internal static class Program { - public static async Task Main() + public static async Task Main(string[] args) { // Configure Serilog logging Log.Logger = new LoggerConfiguration() @@ -20,13 +21,44 @@ public static async Task Main() // Display welcome message DisplayWelcomeMessage(); - // Create client options - var mqttOptions = CreateMqttOptions(); - var sparkplugOptions = CreateSparkplugOptions(); + // Parse command line arguments and determine which profile to use + var profileName = ParseProfileArgument(args); + var profile = CreateProfile(profileName); + Log.Information("Using profile: {ProfileName}", profileName); - // Create logger factory and host application + // Get client options from the selected profile + var mqttOptions = profile.GetMqttClientOptions(); + var sparkplugOptions = profile.GetSparkplugClientOptions(); + + // Create the logger factory using var loggerFactory = new SerilogLoggerFactory(Log.Logger); - var hostApp = new SimpleHostApplication(mqttOptions, sparkplugOptions, loggerFactory); + + // Create the dependency injection container + var services = new ServiceCollection(); + + // Register the singleton services + services.AddSingleton(mqttOptions); + services.AddSingleton(sparkplugOptions); + services.AddSingleton(loggerFactory); + services.AddSingleton(loggerFactory); + + // Register cache services + services.AddMemoryCache(); + services.AddHybridCache(); + + // Register the SparklerNet services + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + // Register the SimpleHostApplication + services.AddSingleton(); + + // Build service provider + var serviceProvider = services.BuildServiceProvider(); + + // Resolve SimpleHostApplication from the container + var hostApp = serviceProvider.GetRequiredService(); hostApp.SubscribeToEvents(); // Handle console input and process commands @@ -35,9 +67,9 @@ public static async Task Main() // Flush logs before exiting await Log.CloseAndFlushAsync(); } - + /// - /// Displays the application welcome message with ASCII art and command instructions. + /// Displays the application welcome message with ASCII art and command instructions. /// private static void DisplayWelcomeMessage() { @@ -64,29 +96,37 @@ private static void DisplayWelcomeMessage() } /// - /// Creates and configures MQTT client options. + /// Parses the command line arguments to determine which profile to use. /// - /// Configured MqttClientOptions instance. - private static MqttClientOptions CreateMqttOptions() + /// The command line arguments. + /// The profile name. + private static string ParseProfileArgument(string[] args) { - return new MqttClientOptionsBuilder() - .WithTcpServer("BROKER.HIVEMQ.COM", 1883) - .WithProtocolVersion(MqttProtocolVersion.V311) - .Build(); + const string defaultProfile = "mimic"; + const string profileArg = "--profile"; + + // Check if --profile argument is provided + for (var i = 0; i < args.Length; i++) + if (args[i].Equals(profileArg, StringComparison.OrdinalIgnoreCase) && i + 1 < args.Length) + return args[i + 1].ToLower(); + + // Default to the mimic profile if no profile is specified + return defaultProfile; } /// - /// Creates and configures Sparkplug client options. + /// Creates the appropriate profile based on the profile name. /// - /// Configured SparkplugClientOptions instance. - private static SparkplugClientOptions CreateSparkplugOptions() + /// The name of the profile to create. + /// The created profile instance. + /// Thrown when an invalid profile name is provided. + private static IProfile CreateProfile(string profileName) { - return new SparkplugClientOptions + return profileName switch { - Version = SparkplugVersion.V300, - HostApplicationId = "SparklerNetSimpleHostApp", - Subscriptions = { new MqttTopicFilterBuilder().WithTopic("spBv1.0/MIMIC/#").WithAtLeastOnceQoS().Build() }, - EnableMessageOrdering = true + "tck" => new TckApplicationProfile(), + "mimic" => new MimicApplicationProfile(), + _ => throw new ArgumentOutOfRangeException(nameof(profileName), profileName, null) }; } } \ No newline at end of file diff --git a/SparklerNet.Samples/SimpleHostApplication.cs b/SparklerNet.Samples/SimpleHostApplication.cs index 9f9f5e9..c966882 100644 --- a/SparklerNet.Samples/SimpleHostApplication.cs +++ b/SparklerNet.Samples/SimpleHostApplication.cs @@ -3,7 +3,6 @@ using SparklerNet.Core.Constants; using SparklerNet.Core.Events; using SparklerNet.Core.Model; -using SparklerNet.Core.Options; using SparklerNet.HostApplication; using SparklerNet.HostApplication.Extensions; using static SparklerNet.Core.Constants.SparkplugMessageType; @@ -22,13 +21,11 @@ public class SimpleHostApplication /// /// Initializes a new instance of the SimpleHostApplication class. /// - /// MQTT client options - /// Sparkplug client options - /// Logger factory to create required loggers - public SimpleHostApplication(MqttClientOptions mqttOptions, SparkplugClientOptions sparkplugOptions, - ILoggerFactory loggerFactory) + /// The Sparkplug Host Application instance. + /// Logger factory to create required loggers. + public SimpleHostApplication(SparkplugHostApplication hostApplication, ILoggerFactory loggerFactory) { - _hostApplication = new SparkplugHostApplication(mqttOptions, sparkplugOptions, loggerFactory); + _hostApplication = hostApplication; _logger = loggerFactory.CreateLogger(); _isRunning = false; } @@ -164,32 +161,67 @@ private async Task SendCommandAsync(bool isNodeCommand) { try { - // Show default value in prompt for better user experience - const string defaultGroupId = "Sparkplug Group 1"; + const string defaultGroupId = "Sparkplug_Group_1"; Console.Write($"Enter Group ID [{defaultGroupId}]: "); var groupIdInput = Console.ReadLine(); var groupId = string.IsNullOrWhiteSpace(groupIdInput) ? defaultGroupId : groupIdInput; - const string defaultEdgeNodeId = "Sparkplug Node 1"; + const string defaultEdgeNodeId = "Sparkplug_Node_1"; Console.Write($"Enter Edge Node ID [{defaultEdgeNodeId}]: "); var edgeNodeIdInput = Console.ReadLine(); var edgeNodeId = string.IsNullOrWhiteSpace(edgeNodeIdInput) ? defaultEdgeNodeId : edgeNodeIdInput; - if (isNodeCommand) + string? deviceId = null; + if (!isNodeCommand) { - _logger.LogInformation("Sending NCMD to {Group}/{Node}", groupId, edgeNodeId); - await _hostApplication.PublishEdgeNodeRebirthCommandAsync(groupId, edgeNodeId); - } - else - { - // Show default value in prompt for better user experience - const string defaultDeviceId = "Sparkplug Device 1"; + const string defaultDeviceId = "Sparkplug_Device_1"; Console.Write($"Enter Device ID [{defaultDeviceId}]: "); var deviceIdInput = Console.ReadLine(); - var deviceId = string.IsNullOrWhiteSpace(deviceIdInput) ? defaultDeviceId : deviceIdInput; + deviceId = string.IsNullOrWhiteSpace(deviceIdInput) ? defaultDeviceId : deviceIdInput; + } - _logger.LogInformation("Sending DCMD to {Group}/{Node}/{Device}", groupId, edgeNodeId, deviceId); - await _hostApplication.PublishDeviceRebirthCommandAsync(groupId, edgeNodeId, deviceId); + const string defaultCommandType = "Rebirth"; + string commandType; + bool isValid; + do + { + Console.Write($"Enter Command Type (Rebirth/ScanRate) [{defaultCommandType}]: "); + var commandTypeInput = Console.ReadLine(); + commandType = string.IsNullOrWhiteSpace(commandTypeInput) + ? defaultCommandType + : commandTypeInput.Trim(); + + // Validate command type with single check + isValid = commandType is "Rebirth" or "ScanRate"; + if (!isValid) Console.WriteLine("Invalid command type. Please enter 'Rebirth' or 'ScanRate'."); + } while (!isValid); + + // ReSharper disable once ConvertIfStatementToSwitchStatement + if (isNodeCommand && commandType == "Rebirth") + { + _logger.LogInformation("Sending NCMD Rebirth to {Group}/{Node}", groupId, edgeNodeId); + await _hostApplication.PublishEdgeNodeRebirthCommandAsync(groupId, edgeNodeId); + } + else if (isNodeCommand && commandType == "ScanRate") + { + var randomScanRate = new Random().Next(1000, 10001); + _logger.LogInformation("Sending NCMD ScanRate ({ScanRate}ms) to {Group}/{Node}", randomScanRate, + groupId, edgeNodeId); + await _hostApplication.PublishEdgeNodeScanRateCommandAsync(groupId, edgeNodeId, randomScanRate); + } + else if (!isNodeCommand && commandType == "Rebirth") + { + _logger.LogInformation("Sending DCMD Rebirth to {Group}/{Node}/{Device}", groupId, edgeNodeId, + deviceId!); + await _hostApplication.PublishDeviceRebirthCommandAsync(groupId, edgeNodeId, deviceId!); + } + else if (!isNodeCommand && commandType == "ScanRate") + { + var randomScanRate = new Random().Next(1000, 10001); + _logger.LogInformation("Sending DCMD ScanRate ({ScanRate}ms) to {Group}/{Node}/{Device}", + randomScanRate, groupId, edgeNodeId, deviceId!); + await _hostApplication.PublishDeviceScanRateCommandAsync(groupId, edgeNodeId, deviceId!, + randomScanRate); } } catch (Exception ex) diff --git a/SparklerNet.Samples/SparklerNet.Samples.csproj b/SparklerNet.Samples/SparklerNet.Samples.csproj index 7362837..a517867 100644 --- a/SparklerNet.Samples/SparklerNet.Samples.csproj +++ b/SparklerNet.Samples/SparklerNet.Samples.csproj @@ -8,11 +8,12 @@ - - - - - + + + + + + diff --git a/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs b/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs new file mode 100644 index 0000000..f1a07ee --- /dev/null +++ b/SparklerNet.Tests/HostApplication/Caches/CacheHelperTests.cs @@ -0,0 +1,56 @@ +using Xunit; +using SparklerNet.HostApplication.Caches; + +namespace SparklerNet.Tests.HostApplication.Caches; + +public class CacheHelperTests +{ + [Theory] + [InlineData("prefix:", "group1", "edge1", "device1", "prefix:group1:edge1:device1")] + [InlineData("status:", "group2", "edge2", "device2", "status:group2:edge2:device2")] + [InlineData(null, "group3", "edge3", "device3", "group3:edge3:device3")] + [InlineData("", "group4", "edge4", "device4", "group4:edge4:device4")] + public void BuildCacheKey_WithDeviceId_ReturnsCorrectFormat(string? prefix, string groupId, string edgeNodeId, string? deviceId, string expected) + { + var result = CacheHelper.BuildCacheKey(prefix, groupId, edgeNodeId, deviceId); + Assert.Equal(expected, result); + } + + [Theory] + [InlineData("prefix:", "group1", "edge1", null, "prefix:group1:edge1")] + [InlineData("status:", "group2", "edge2", null, "status:group2:edge2")] + [InlineData(null, "group3", "edge3", null, "group3:edge3")] + [InlineData("", "group4", "edge4", null, "group4:edge4")] + public void BuildCacheKey_WithoutDeviceId_ReturnsCorrectFormat(string? prefix, string groupId, string edgeNodeId, string? deviceId, string expected) + { + var result = CacheHelper.BuildCacheKey(prefix, groupId, edgeNodeId, deviceId); + Assert.Equal(expected, result); + } + + [Fact] + public void GetSemaphore_ReturnsSameInstanceForSameKey() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + + Assert.Same(semaphore1, semaphore2); + } + + [Fact] + public void GetSemaphore_ReturnsDifferentInstancesForDifferentKeys() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group2", "edge2", "device2"); + + Assert.NotSame(semaphore1, semaphore2); + } + + [Fact] + public void GetSemaphore_WithAndWithoutDeviceId_ReturnsDifferentInstances() + { + var semaphore1 = CacheHelper.GetSemaphore("group1", "edge1", "device1"); + var semaphore2 = CacheHelper.GetSemaphore("group1", "edge1", null); + + Assert.NotSame(semaphore1, semaphore2); + } +} \ No newline at end of file diff --git a/SparklerNet.Tests/HostApplication/Caches/MessageOrderingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/MessageOrderingServiceTests.cs index 82e0021..f767b83 100644 --- a/SparklerNet.Tests/HostApplication/Caches/MessageOrderingServiceTests.cs +++ b/SparklerNet.Tests/HostApplication/Caches/MessageOrderingServiceTests.cs @@ -1,5 +1,6 @@ +using System.Collections.Concurrent; using System.Reflection; -using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Moq; using SparklerNet.Core.Constants; @@ -13,74 +14,18 @@ namespace SparklerNet.Tests.HostApplication.Caches; public class MessageOrderingServiceTests { - private readonly MessageOrderingService _service; - - public MessageOrderingServiceTests() - { - var options = new SparkplugClientOptions - { - HostApplicationId = "TestHost", - SeqReorderTimeout = 1000, - SendRebirthWhenTimeout = true - }; - IMemoryCache cache = new MemoryCache(new MemoryCacheOptions()); - - // Setup mock ILoggerFactory - var mockLogger = new Mock>(); - var mockLoggerFactory = new Mock(); - mockLoggerFactory.Setup(factory => factory.CreateLogger(It.IsAny())) - .Returns(mockLogger.Object); - - _service = new MessageOrderingService(cache, options, mockLoggerFactory.Object); - } - - [Theory] - [InlineData("Group1", "Edge1", null, null, "Group1:Edge1")] - [InlineData("Group1", "Edge1", "Device1", null, "Group1:Edge1:Device1")] - [InlineData("Group1", "Edge1", null, "Prefix_", "Prefix_Group1:Edge1")] - [InlineData("Group1", "Edge1", "Device1", "Prefix_", "Prefix_Group1:Edge1:Device1")] - [InlineData("Group1", "Edge1", "", "Prefix_", "Prefix_Group1:Edge1")] - [InlineData("Group1", "Edge1", "Device1", "", "Group1:Edge1:Device1")] - public void BuildCacheKey_ShouldReturnCorrectKey(string groupId, string edgeNodeId, string? deviceId, - string? prefix, string expectedKey) - { - var result = MessageOrderingService.BuildCacheKey(prefix, groupId, edgeNodeId, deviceId); - Assert.Equal(expectedKey, result); - } - - [Theory] - // Normal integer comparison cases - [InlineData(10, 20, -1)] - [InlineData(20, 10, 1)] - [InlineData(15, 15, 0)] - // Wrap-around cases: x near 0 and y near 255 (x should be considered larger) - [InlineData(10, 240, 1)] - [InlineData(20, 230, 1)] - [InlineData(31, 224, 1)] - // Wrap-around cases: x near 255 and y near 0 (x should be considered smaller) - [InlineData(240, 10, -1)] - [InlineData(230, 20, -1)] - [InlineData(224, 31, -1)] - // Boundary cases testing - [InlineData(32, 223, -1)] // Threshold boundary should use normal comparison - [InlineData(223, 32, 1)] // Threshold boundary should use normal comparison - public void CircularSequenceComparer_ShouldCompareCorrectly(int x, int y, int expectedResult) - { - var comparer = new MessageOrderingService.CircularSequenceComparer(); - var result = comparer.Compare(x, y); - Assert.Equal(expectedResult, result); - } + private readonly MessageOrderingService _service = CreateMessageOrderingCache(); [Fact] - public void ProcessMessageOrder_ShouldProcessContinuousSequence() + public async Task ProcessMessageOrderAsync_ShouldProcessContinuousSequence() { var message1 = CreateMessageEventArgs(1); var message2 = CreateMessageEventArgs(2); var message3 = CreateMessageEventArgs(3); - var result1 = _service.ProcessMessageOrder(message1); - var result2 = _service.ProcessMessageOrder(message2); - var result3 = _service.ProcessMessageOrder(message3); + var result1 = await _service.ProcessMessageOrderAsync(message1); + var result2 = await _service.ProcessMessageOrderAsync(message2); + var result3 = await _service.ProcessMessageOrderAsync(message3); Assert.Single(result1); Assert.Single(result2); @@ -98,15 +43,15 @@ public void ProcessMessageOrder_ShouldProcessContinuousSequence() } [Fact] - public void ProcessMessageOrder_ShouldCacheOutOfOrderMessages() + public async Task ProcessMessageOrderAsync_ShouldCacheOutOfOrderMessages() { var message1 = CreateMessageEventArgs(1); var message3 = CreateMessageEventArgs(3); var message2 = CreateMessageEventArgs(2); - var result1 = _service.ProcessMessageOrder(message1); - var result3 = _service.ProcessMessageOrder(message3); - var result2 = _service.ProcessMessageOrder(message2); + var result1 = await _service.ProcessMessageOrderAsync(message1); + var result3 = await _service.ProcessMessageOrderAsync(message3); + var result2 = await _service.ProcessMessageOrderAsync(message2); Assert.Single(result1); Assert.Empty(result3); // Should be cached @@ -126,44 +71,50 @@ public void ProcessMessageOrder_ShouldCacheOutOfOrderMessages() } [Fact] - public void ProcessMessageOrder_ShouldHandleMultipleOutOfOrderMessages() + public async Task ProcessMessageOrderAsync_ShouldHandleMultipleOutOfOrderMessages() { + // Create messages with multiple sequence gaps var message1 = CreateMessageEventArgs(1); - var message4 = CreateMessageEventArgs(4); - var message6 = CreateMessageEventArgs(6); - var message2 = CreateMessageEventArgs(2); - var message3 = CreateMessageEventArgs(3); - var message5 = CreateMessageEventArgs(5); - - var result1 = _service.ProcessMessageOrder(message1); - var result4 = _service.ProcessMessageOrder(message4); - var result6 = _service.ProcessMessageOrder(message6); - var result2 = _service.ProcessMessageOrder(message2); - var result3 = _service.ProcessMessageOrder(message3); - var result5 = _service.ProcessMessageOrder(message5); - - Assert.Single(result1); // Message 1 processed immediately - Assert.Empty(result4); // Message 4 cached - Assert.Empty(result6); // Message 6 cached - Assert.Single(result2); // Message 2 processed immediately - Assert.Equal(2, result3.Count); // Messages 3 and 4 processed when the gap is filled - Assert.Equal(3, result3[0].Payload.Seq); - Assert.Equal(4, result3[1].Payload.Seq); - Assert.Equal(2, result5.Count); // Messages 5 and 6 processed when the gap is filled - Assert.Equal(5, result5[0].Payload.Seq); - Assert.Equal(6, result5[1].Payload.Seq); + var message4 = CreateMessageEventArgs(4); // Will be cached + var message6 = CreateMessageEventArgs(6); // Will be cached + var message3 = CreateMessageEventArgs(3); // Will be cached + var message2 = CreateMessageEventArgs(2); // Will process messages 2, 3, 4 + var message5 = CreateMessageEventArgs(5); // Will process messages 5, 6 + + var result1 = await _service.ProcessMessageOrderAsync(message1); + var result4 = await _service.ProcessMessageOrderAsync(message4); + var result6 = await _service.ProcessMessageOrderAsync(message6); + var result3 = await _service.ProcessMessageOrderAsync(message3); + var result2 = await _service.ProcessMessageOrderAsync(message2); + var result5 = await _service.ProcessMessageOrderAsync(message5); - // Verify fields for immediately processed consecutive messages + // Verify the initial message was processed + Assert.Single(result1); + Assert.Equal(1, result1[0].Payload.Seq); Assert.True(result1[0].IsSeqConsecutive); Assert.False(result1[0].IsCached); + + // Verify out-of-order messages were cached + Assert.Empty(result4); + Assert.Empty(result6); + Assert.Empty(result3); + + // Verify message2 triggers processing of messages 2, 3, 4 + Assert.Equal(3, result2.Count); + Assert.Equal(2, result2[0].Payload.Seq); + Assert.Equal(3, result2[1].Payload.Seq); + Assert.Equal(4, result2[2].Payload.Seq); Assert.True(result2[0].IsSeqConsecutive); Assert.False(result2[0].IsCached); + Assert.True(result2[1].IsSeqConsecutive); + Assert.True(result2[1].IsCached); + Assert.True(result2[2].IsSeqConsecutive); + Assert.True(result2[2].IsCached); - // Verify fields for messages processed from the cache - Assert.True(result3[0].IsSeqConsecutive); - Assert.False(result3[0].IsCached); - Assert.True(result3[1].IsSeqConsecutive); - Assert.True(result3[1].IsCached); + // Verify message5 triggers processing of messages 5, 6 + Assert.Equal(2, result5.Count); + Assert.Equal(5, result5[0].Payload.Seq); + Assert.Equal(6, result5[1].Payload.Seq); Assert.True(result5[0].IsSeqConsecutive); Assert.False(result5[0].IsCached); Assert.True(result5[1].IsSeqConsecutive); @@ -171,17 +122,17 @@ public void ProcessMessageOrder_ShouldHandleMultipleOutOfOrderMessages() } [Fact] - public void ProcessMessageOrder_ShouldHandleSequenceWrapAround() + public async Task ProcessMessageOrderAsync_ShouldHandleSequenceWrapAround() { var message254 = CreateMessageEventArgs(254); var message255 = CreateMessageEventArgs(255); var message0 = CreateMessageEventArgs(0); var message1 = CreateMessageEventArgs(1); - var result254 = _service.ProcessMessageOrder(message254); - var result255 = _service.ProcessMessageOrder(message255); - var result0 = _service.ProcessMessageOrder(message0); - var result1 = _service.ProcessMessageOrder(message1); + var result254 = await _service.ProcessMessageOrderAsync(message254); + var result255 = await _service.ProcessMessageOrderAsync(message255); + var result0 = await _service.ProcessMessageOrderAsync(message0); + var result1 = await _service.ProcessMessageOrderAsync(message1); Assert.Single(result254); Assert.Single(result255); @@ -200,13 +151,31 @@ public void ProcessMessageOrder_ShouldHandleSequenceWrapAround() } [Fact] - public void ProcessMessageOrder_ShouldReturnInvalidSequenceNumberMessages() + public async Task ProcessMessageOrderAsync_ShouldHandleFirstMessage() + { + // Reset the cache to ensure we're testing the first message scenario + await _service.ResetMessageOrderAsync("Group1", "Edge1"); + + // Create the first message with sequence 0 + var firstMessage = CreateMessageEventArgs(0); + var result = await _service.ProcessMessageOrderAsync(firstMessage); + + // Verify the first message is processed correctly + Assert.Single(result); + Assert.Same(firstMessage, result[0]); + Assert.True(result[0].IsSeqConsecutive); + Assert.False(result[0].IsCached); + Assert.Equal(0, result[0].Payload.Seq); + } + + [Fact] + public async Task ProcessMessageOrderAsync_ShouldReturnInvalidSequenceNumberMessages() { var invalidMessageNegative = CreateMessageEventArgs(-1); var invalidMessageTooHigh = CreateMessageEventArgs(256); - var resultNegative = _service.ProcessMessageOrder(invalidMessageNegative); - var resultTooHigh = _service.ProcessMessageOrder(invalidMessageTooHigh); + var resultNegative = await _service.ProcessMessageOrderAsync(invalidMessageNegative); + var resultTooHigh = await _service.ProcessMessageOrderAsync(invalidMessageTooHigh); // Verify that invalid sequence number messages are returned Assert.Single(resultNegative); @@ -216,236 +185,383 @@ public void ProcessMessageOrder_ShouldReturnInvalidSequenceNumberMessages() } [Fact] - public void ClearMessageOrderCache_ShouldRemoveCachedItems() + public async Task ResetMessageOrderAsync_ShouldRemoveCachedItems() { var message1 = CreateMessageEventArgs(1); var message3 = CreateMessageEventArgs(3); // Will be cached - _service.ProcessMessageOrder(message1); - _service.ProcessMessageOrder(message3); + await _service.ProcessMessageOrderAsync(message1); + await _service.ProcessMessageOrderAsync(message3); - _service.ClearMessageOrder("Group1", "Edge1", "Device1"); + await _service.ResetMessageOrderAsync("Group1", "Edge1"); var message2 = CreateMessageEventArgs(2); // Should not process message3 now - var result2 = _service.ProcessMessageOrder(message2); + var result2 = await _service.ProcessMessageOrderAsync(message2); - Assert.Single(result2); // Only context2 is processed, context3 was cleared from the cache + Assert.Single(result2); // Only message2 is processed, message3 was cleared from the cache Assert.Equal(2, result2[0].Payload.Seq); } [Fact] - public void OnReorderTimeout_ShouldProcessPendingMessages() + public async Task ProcessMessageOrderAsync_ShouldReturnReplacedMessage_WhenSequenceDuplicate() { - var memoryCache = new MemoryCache(new MemoryCacheOptions()); - var options = new SparkplugClientOptions { HostApplicationId = "TestHost" }; - var mockLogger = new Mock>(); - var mockLoggerFactory = new Mock(); - mockLoggerFactory.Setup(lf => lf.CreateLogger(It.IsAny())).Returns(mockLogger.Object); - - // List to capture pending messages processed by the timeout handler - List? capturedMessages = null; - - var service = new MessageOrderingService(memoryCache, options, mockLoggerFactory.Object) - { - // Set up the delegate to capture pending messages when processed by timeout - OnPendingMessages = messages => - { - capturedMessages = [.. messages]; - return Task.CompletedTask; - } - }; - - // Create out-of-order messages to generate pending messages + // Create the first message with sequence 2 that will be cached var message1 = CreateMessageEventArgs(1); - var message3 = CreateMessageEventArgs(3); // This will be cached as pending + var message3 = CreateMessageEventArgs(3); // Will be cached + var result1 = await _service.ProcessMessageOrderAsync(message1); + var result3 = await _service.ProcessMessageOrderAsync(message3); + + // Verify initial messages were processed/cached correctly + Assert.Single(result1); + Assert.Empty(result3); // Message 3 should be cached - // Process messages to create a pending state - service.ProcessMessageOrder(message1); - service.ProcessMessageOrder(message3); + // Create the second message with the same sequence 3 as the cached message + var duplicateMessage3 = CreateMessageEventArgs(3); + var resultDuplicate = await _service.ProcessMessageOrderAsync(duplicateMessage3); - // Simulate timeout by directly invoking the OnReorderTimeout method - const string timerKey = "Group1:Edge1:Device1"; - service.GetType().GetMethod("OnReorderTimeout", BindingFlags.NonPublic | BindingFlags.Instance)? - .Invoke(service, [timerKey]); + // Verify the result contains the original cached message (which was replaced) + Assert.Single(resultDuplicate); + Assert.Equal(3, resultDuplicate[0].Payload.Seq); + Assert.Equal(message3.GroupId, resultDuplicate[0].GroupId); + Assert.Equal(message3.EdgeNodeId, resultDuplicate[0].EdgeNodeId); + Assert.Equal(message3.DeviceId, resultDuplicate[0].DeviceId); + Assert.Equal(message3.MessageType, resultDuplicate[0].MessageType); + Assert.True(resultDuplicate[0].IsCached); // Original message should have IsCached = true - // Allow async operation to complete - Thread.Sleep(100); + // Now process message 2 to trigger processing of the duplicate message + var message2 = CreateMessageEventArgs(2); + var result2 = await _service.ProcessMessageOrderAsync(message2); - // Verify that pending messages were processed during timeout - Assert.NotNull(capturedMessages); - Assert.Single(capturedMessages); - Assert.Equal(3, capturedMessages[0].Payload.Seq); + // Verify message 2 and duplicate message 3 are processed + Assert.Equal(2, result2.Count); + Assert.Equal(2, result2[0].Payload.Seq); + Assert.Equal(3, result2[1].Payload.Seq); + Assert.Equal(duplicateMessage3.GroupId, result2[1].GroupId); + Assert.Equal(duplicateMessage3.EdgeNodeId, result2[1].EdgeNodeId); + Assert.Equal(duplicateMessage3.DeviceId, result2[1].DeviceId); + Assert.Equal(duplicateMessage3.MessageType, result2[1].MessageType); } [Fact] - public async Task OnReorderTimeout_ShouldRemoveTimer() + public async Task ProcessMessageOrderAsync_ShouldOnlyAcceptValidMessageTypes() { - var memoryCache = new MemoryCache(new MemoryCacheOptions()); - var options = new SparkplugClientOptions { HostApplicationId = "TestHost" }; - var mockLogger = new Mock>(); - var mockLoggerFactory = new Mock(); - mockLoggerFactory.Setup(lf => lf.CreateLogger(It.IsAny())).Returns(mockLogger.Object); - var service = new MessageOrderingService(memoryCache, options, mockLoggerFactory.Object) + // Test with valid message types + var validTypes = new[] + { + SparkplugMessageType.NDATA, SparkplugMessageType.DDATA, SparkplugMessageType.DBIRTH, + SparkplugMessageType.DDEATH + }; + foreach (var messageType in validTypes) + { + // Reset the cache for each test case to ensure a clean state + await _service.ResetMessageOrderAsync("Group1", "Edge1"); + var message = CreateMessageEventArgs(1, messageType); + var result = await _service.ProcessMessageOrderAsync(message); + Assert.Single(result); + } + + // Test with invalid message types + var invalidTypes = new[] { - OnPendingMessages = _ => Task.CompletedTask + SparkplugMessageType.NBIRTH, SparkplugMessageType.NDEATH, SparkplugMessageType.NCMD, + SparkplugMessageType.DCMD }; + foreach (var messageType in invalidTypes) + { + var message = CreateMessageEventArgs(1, messageType); + await Assert.ThrowsAsync(() => _service.ProcessMessageOrderAsync(message)); + } + } - // Create messages with sequence numbers 1 and 3 (simulate out of order) - var message1 = CreateMessageEventArgs(1); - var message3 = CreateMessageEventArgs(3); + [Fact] + public void CircularSequenceComparer_ShouldCompareCorrectly() + { + var comparer = new MessageOrderingService.CircularSequenceComparer(); - // Process messages to create a timer - service.ProcessMessageOrder(message1); - service.ProcessMessageOrder(message3); + // Normal integer comparison cases + Assert.Equal(-1, comparer.Compare(10, 20)); + Assert.Equal(1, comparer.Compare(20, 10)); + Assert.Equal(0, comparer.Compare(15, 15)); - // Get the cache key to verify timer removal - var cacheKey = MessageOrderingService.BuildCacheKey(null, "Group1", "Edge1", "Device1"); + // Wrap-around cases: x near 0 and y near 255 (x should be considered greater) + Assert.Equal(1, comparer.Compare(10, 240)); + Assert.Equal(1, comparer.Compare(20, 230)); + Assert.Equal(1, comparer.Compare(31, 224)); - const string timerKey = "Group1:Edge1:Device1"; - service.GetType().GetMethod("OnReorderTimeout", BindingFlags.NonPublic | BindingFlags.Instance) - ?.Invoke(service, [timerKey]); + // Wrap-around cases: x near 255 and y near 0 (x should be considered smaller) + Assert.Equal(-1, comparer.Compare(240, 10)); + Assert.Equal(-1, comparer.Compare(230, 20)); + Assert.Equal(-1, comparer.Compare(224, 31)); - await Task.Delay(100); + // Boundary cases testing + Assert.Equal(-1, comparer.Compare(32, 223)); // Threshold boundary should use normal comparison + Assert.Equal(1, comparer.Compare(223, 32)); // Threshold boundary should use normal comparison + } - // Verify the timer was removed from the cache - Assert.False(memoryCache.TryGetValue($"{cacheKey}_timer", out _)); + [Fact] + public void CreateSequenceCacheEntryOptions_ShouldReturnCorrectOptions() + { + // Test with default SeqCacheExpiration (0) + var serviceDefault = CreateMessageOrderingCache(); + + // Directly call the protected internal method (visible due to InternalsVisibleTo attribute) + var resultDefault = serviceDefault.CreateSequenceCacheEntryOptions(); + Assert.NotNull(resultDefault); + // When SeqCacheExpiration is 0, Expiration should be null (no explicit expiration) + Assert.Null(resultDefault.Expiration); + + // Test with custom SeqCacheExpiration (30 minutes) + var serviceCustom = CreateMessageOrderingCache(seqCacheExpiration: 30); + + var resultCustom = serviceCustom.CreateSequenceCacheEntryOptions(); + Assert.NotNull(resultCustom); + Assert.Equal(TimeSpan.FromMinutes(30), resultCustom.Expiration); } [Fact] - public async Task OnReorderTimeout_ShouldSendRebirthRequest_WhenSendRebirthWhenTimeoutEnabled() + public async Task OnReorderTimeout_ShouldProcessPendingMessages() { - string? actualGroupId = null; - string? actualEdgeNodeId = null; - string? actualDeviceId = null; - _service.OnRebirthRequested = (groupId, edgeNodeId, deviceId) => + // Create a message ordering cache with a short timeout + var service = CreateMessageOrderingCache(100); + + // Create messages with a sequence gap + var message1 = CreateMessageEventArgs(1); + var message3 = CreateMessageEventArgs(3); // Will be cached + + // Process the first message and cache the second one + await service.ProcessMessageOrderAsync(message1); + await service.ProcessMessageOrderAsync(message3); + + // Track processed pending messages + List? processedPendingMessages = null; + service.OnPendingMessages = messages => { - actualGroupId = groupId; - actualEdgeNodeId = edgeNodeId; - actualDeviceId = deviceId; + processedPendingMessages = messages.ToList(); return Task.CompletedTask; }; - const string timerKey = "Group1:Edge1:Device1"; - _service.GetType().GetMethod("OnReorderTimeout", BindingFlags.NonPublic | BindingFlags.Instance) - ?.Invoke(_service, [timerKey]); + // Call OnReorderTimeout directly with the timer key + var timerKey = "Group1:Edge1"; + var methodInfo = typeof(MessageOrderingService).GetMethod("OnReorderTimeout", + BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(methodInfo); - // Allow async operation to complete - await Task.Delay(100); + // Call the method (it's async void, so we can't await it directly) + methodInfo.Invoke(service, [timerKey]); - Assert.Equal("Group1", actualGroupId); - Assert.Equal("Edge1", actualEdgeNodeId); - Assert.Equal("Device1", actualDeviceId); + // Wait for a short time to allow the async operation to complete + await Task.Delay(200); + + // Verify pending message was processed + Assert.NotNull(processedPendingMessages); + Assert.Single(processedPendingMessages); + Assert.Equal(3, processedPendingMessages[0].Payload.Seq); } [Fact] - public void ProcessMessageOrder_ShouldReturnReplacedMessage_WhenSequenceDuplicate() + public async Task OnReorderTimeout_ShouldSendRebirthRequest() { - // Create the first message with sequence 2 that will be cached + // Create a message ordering cache with a short timeout + var service = CreateMessageOrderingCache(100); + + // Create messages with a sequence gap var message1 = CreateMessageEventArgs(1); var message3 = CreateMessageEventArgs(3); // Will be cached - var result1 = _service.ProcessMessageOrder(message1); - var result3 = _service.ProcessMessageOrder(message3); - // Verify initial messages were processed/cached correctly - Assert.Single(result1); - Assert.Empty(result3); // Message 3 should be cached + // Process the first message and cache the second one + await service.ProcessMessageOrderAsync(message1); + await service.ProcessMessageOrderAsync(message3); - // Create the second message with the same sequence 3 as the cached message - var duplicateMessage3 = CreateMessageEventArgs(3); - var resultDuplicate = _service.ProcessMessageOrder(duplicateMessage3); + // Track rebirth requests + string? rebirthGroupId = null; + string? rebirthEdgeNodeId = null; + string? rebirthDeviceId = null; + service.OnRebirthRequested = (groupId, edgeNodeId) => + { + rebirthGroupId = groupId; + rebirthEdgeNodeId = edgeNodeId; + return Task.CompletedTask; + }; - // Verify the result contains the original cached message (which was replaced) - Assert.Single(resultDuplicate); - Assert.Equal(3, resultDuplicate[0].Payload.Seq); - Assert.Same(message3, resultDuplicate[0]); // Should return the original cached message - Assert.True(resultDuplicate[0].IsCached); // Original message should have IsCached = true + // Call OnReorderTimeout directly with the timer key + var timerKey = "Group1:Edge1"; + var methodInfo = typeof(MessageOrderingService).GetMethod("OnReorderTimeout", + BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(methodInfo); - // Now process message 2 to trigger processing of the duplicate message - var message2 = CreateMessageEventArgs(2); - var result2 = _service.ProcessMessageOrder(message2); + // Call the method (it's async void, so we can't await it directly) + methodInfo.Invoke(service, [timerKey]); - // Verify message 2 and the duplicate message 3 are processed - Assert.Equal(2, result2.Count); - Assert.Equal(2, result2[0].Payload.Seq); - Assert.Equal(3, result2[1].Payload.Seq); - Assert.Same(duplicateMessage3, result2[1]); // Should use the new duplicate message, not the original + // Wait for a short time to allow the async operation to complete + await Task.Delay(200); + + // Verify rebirth request was sent + Assert.NotNull(rebirthGroupId); + Assert.NotNull(rebirthEdgeNodeId); + Assert.Equal("Group1", rebirthGroupId); + Assert.Equal("Edge1", rebirthEdgeNodeId); + Assert.Null(rebirthDeviceId); // No device ID in this case } [Fact] - public void GetAllMessagesAndClearCache_ShouldReturnAllCachedMessagesAndClearCache() + public async Task ProcessMessageOrderAsync_ShouldProcessMessagesConcurrently() { - // Create messages with an out-of-order sequence to generate cached messages + // Create multiple messages with continuous sequences + var messages = Enumerable.Range(1, 10) + .Select(seq => CreateMessageEventArgs(seq)) + .ToList(); + + // Process messages concurrently + var tasks = messages.Select(msg => _service.ProcessMessageOrderAsync(msg)); + var results = await Task.WhenAll(tasks); + + // Verify all messages were processed + Assert.Equal(10, results.Length); + + // Verify continuous sequence processing + for (var i = 0; i < results.Length; i++) + { + var result = results[i]; + if (i == 0) + { + // The first message should be processed immediately + Assert.Single(result); + Assert.Equal(1, result[0].Payload.Seq); + Assert.True(result[0].IsSeqConsecutive); + Assert.False(result[0].IsCached); + } + else + { + // Subsequent messages should be processed when their turn comes. + // This is a simplified test since concurrent processing will likely result in some messages being cached + // and processed later when the sequence is filled + var processedSeq = result.Select(msg => msg.Payload.Seq).OrderBy(seq => seq).ToList(); + Assert.NotNull(processedSeq); + } + } + } + + [Fact] + public async Task CachePendingMessageAsync_ShouldManageTimersCorrectly() + { + // Create a message ordering cache with a long timeout + var service = CreateMessageOrderingCache(10000); + + // Get the _reorderTimers field using reflection + var reorderTimersField = typeof(MessageOrderingService).GetField("_reorderTimers", + BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(reorderTimersField); + + var reorderTimers = (ConcurrentDictionary)reorderTimersField.GetValue(service)!; + Assert.NotNull(reorderTimers); + Assert.Empty(reorderTimers); + + // Create messages with a sequence gap var message1 = CreateMessageEventArgs(1); var message3 = CreateMessageEventArgs(3); // Will be cached - var message5 = CreateMessageEventArgs(5); // Will be cached - // Process messages to create a cached state - _service.ProcessMessageOrder(message1); - _service.ProcessMessageOrder(message3); - _service.ProcessMessageOrder(message5); + // Process the first message and cache the second one + await service.ProcessMessageOrderAsync(message1); + await service.ProcessMessageOrderAsync(message3); - // Get all messages and clear the cache - var result = _service.GetAllMessagesAndClearCache(); + // Verify timer was added + Assert.Single(reorderTimers); + var timerKey = "Group1:Edge1"; + Assert.True(reorderTimers.ContainsKey(timerKey)); - // Verify all cached messages are returned - Assert.Equal(2, result.Count); - - // Verify the returned messages have the correct sequence numbers - var sequenceNumbers = result.Select(m => m.Payload.Seq).ToList(); - Assert.Contains(3, sequenceNumbers); - Assert.Contains(5, sequenceNumbers); + // Create another message with sequence gap (should update the existing timer) + var message5 = CreateMessageEventArgs(5); // Will be cached + await service.ProcessMessageOrderAsync(message5); + + // Verify only one timer exists for the same edge node + Assert.Single(reorderTimers); + Assert.True(reorderTimers.ContainsKey(timerKey)); - // Verify cache is cleared by checking that processing a message with sequence 2 - // doesn't trigger processing of previously cached messages + // Process message2 to fill the gap var message2 = CreateMessageEventArgs(2); - var resultAfterClear = _service.ProcessMessageOrder(message2); - - // Only message 2 should be processed, cached messages (3 and 5) should be gone - Assert.Single(resultAfterClear); - Assert.Equal(2, resultAfterClear[0].Payload.Seq); + await service.ProcessMessageOrderAsync(message2); + + // Verify timer is still present (message5 is still cached) + Assert.Single(reorderTimers); + Assert.True(reorderTimers.ContainsKey(timerKey)); + + // Process message4 to fill the remaining gap + var message4 = CreateMessageEventArgs(4); + await service.ProcessMessageOrderAsync(message4); + + // Verify timer was removed (no more pending messages) + Assert.Empty(reorderTimers); } [Fact] - public void GetAllMessagesAndClearCache_ShouldReturnEmptyList_WhenNoMessagesCached() + public async Task GetPendingMessagesAsync_ShouldProcessMessagesUntilGap() { - // No messages processed before calling the method - - // Get all messages and clear the cache - var result = _service.GetAllMessagesAndClearCache(); - - // Verify empty list is returned + // Create messages with multiple sequence gaps + var message1 = CreateMessageEventArgs(1); + var message3 = CreateMessageEventArgs(3); // Will be cached + var message4 = CreateMessageEventArgs(4); // Will be cached + var message6 = CreateMessageEventArgs(6); // Will be cached + + // Process the first message and cache the others + await _service.ProcessMessageOrderAsync(message1); + await _service.ProcessMessageOrderAsync(message3); + await _service.ProcessMessageOrderAsync(message4); + await _service.ProcessMessageOrderAsync(message6); + + // Get and process pending messages with seq = 1 (should process 2, 3, 4) + // We need to call this method via reflection since it's private + var methodInfo = typeof(MessageOrderingService).GetMethod("GetPendingMessagesAsync", + BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(methodInfo); + + var result = await (Task>)methodInfo.Invoke(_service, + ["Group1", "Edge1", 1])!; + + // Verify that only messages 3 and 4 were processed (since message2 is missing) + // Wait, this is incorrect - we're testing GetPendingMessagesAsync with seq = 1, + // which means it will look for messages starting from seq = 2, + // Since message2 is missing, no messages should be processed Assert.Empty(result); + + // Now let's process message2 to fill the gap + var message2 = CreateMessageEventArgs(2); + var processResult = await _service.ProcessMessageOrderAsync(message2); + + // Verify that messages 2, 3, 4 were processed + Assert.Equal(3, processResult.Count); + Assert.Equal(2, processResult[0].Payload.Seq); + Assert.Equal(3, processResult[1].Payload.Seq); + Assert.Equal(4, processResult[2].Payload.Seq); } [Fact] - public void GetAllMessagesAndClearCache_ShouldClearAllDataStructures() + public async Task ClearCacheAsync_ShouldClearAllOrderingCache() { - // Create messages with an out-of-order sequence to generate cached messages and timers var message1 = CreateMessageEventArgs(1); - var message3 = CreateMessageEventArgs(3); // Will be cached and create a timer - - _service.ProcessMessageOrder(message1); - _service.ProcessMessageOrder(message3); - - // Get all messages and clear the cache - _service.GetAllMessagesAndClearCache(); - - // Create new messages with same sequence numbers - var newMessage1 = CreateMessageEventArgs(1); - var newMessage2 = CreateMessageEventArgs(2); - - // Process new messages - var result1 = _service.ProcessMessageOrder(newMessage1); - var result2 = _service.ProcessMessageOrder(newMessage2); - - // The new sequence should start fresh - message1 should be processed normally, - // and message2 should be processed as consecutive (not triggering any old cached messages) + var message3 = CreateMessageEventArgs(3); + + // Process messages to populate the cache + var result1 = await _service.ProcessMessageOrderAsync(message1); + var result3 = await _service.ProcessMessageOrderAsync(message3); + + // Verify cache has been populated Assert.Single(result1); - Assert.Equal(1, result1[0].Payload.Seq); - Assert.Single(result2); - Assert.Equal(2, result2[0].Payload.Seq); - Assert.True(result2[0].IsSeqConsecutive); + Assert.Empty(result3); // Message 3 should be cached + + // Call ClearCacheAsync to clear all cache + await _service.ClearCacheAsync(); + + // Process new messages after cache clear + var message2 = CreateMessageEventArgs(2); + var result2AfterClear = await _service.ProcessMessageOrderAsync(message2); + + // Verify cache has been cleared; message 2 is processed as a new sequence + Assert.Single(result2AfterClear); + Assert.Equal(2, result2AfterClear[0].Payload.Seq); + Assert.True(result2AfterClear[0].IsSeqConsecutive); + Assert.False(result2AfterClear[0].IsCached); } - private static SparkplugMessageEventArgs CreateMessageEventArgs(int sequenceNumber) + private static SparkplugMessageEventArgs CreateMessageEventArgs(int sequenceNumber, + SparkplugMessageType messageType = SparkplugMessageType.NDATA) { // Create payload with specified sequence number var payload = new Payload(); @@ -454,12 +570,40 @@ private static SparkplugMessageEventArgs CreateMessageEventArgs(int sequenceNumb return new SparkplugMessageEventArgs( SparkplugVersion.V300, - SparkplugMessageType.NDATA, + messageType, "Group1", "Edge1", "Device1", - payload, - null! + payload ); } + + private static MessageOrderingService CreateMessageOrderingCache( + int seqReorderTimeout = 1000, + int seqCacheExpiration = 0, + string hostApplicationId = "TestHost") + { + var options = new SparkplugClientOptions + { + HostApplicationId = hostApplicationId, + EnableMessageOrdering = true, + SeqReorderTimeout = seqReorderTimeout, + SendRebirthWhenTimeout = true, + SeqCacheExpiration = seqCacheExpiration + }; + + var services = new ServiceCollection(); + services.AddMemoryCache(); + services.AddHybridCache(); + services.AddSingleton(options); + + var mockLoggerFactory = new Mock(); + mockLoggerFactory.Setup(factory => factory.CreateLogger(It.IsAny())) + .Returns(new Mock>().Object); + services.AddSingleton(mockLoggerFactory.Object); + services.AddSingleton(); + + var serviceProvider = services.BuildServiceProvider(); + return serviceProvider.GetRequiredService(); + } } \ No newline at end of file diff --git a/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs new file mode 100644 index 0000000..32fe6bb --- /dev/null +++ b/SparklerNet.Tests/HostApplication/Caches/StatusTrackingServiceTests.cs @@ -0,0 +1,407 @@ +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.DependencyInjection; +using SparklerNet.HostApplication.Caches; +using Xunit; + +namespace SparklerNet.Tests.HostApplication.Caches; + +[SuppressMessage("ReSharper", "ConvertToConstant.Local")] +public class StatusTrackingServiceTests +{ + private readonly StatusTrackingService _statusService; + + public StatusTrackingServiceTests() + { + var services = new ServiceCollection(); + services.AddMemoryCache(); + services.AddHybridCache(); + services.AddSingleton(); + var serviceProvider = services.BuildServiceProvider(); + _statusService = serviceProvider.GetRequiredService(); + } + + [Fact] + public async Task IsEndpointOnline_WhenNoStatusInCache_ReturnsFalse() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.False(result); + } + + [Fact] + public async Task IsEndpointOnline_WithOnlineStatus_ReturnsTrue() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + var edgeNodeStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(edgeNodeStatus); + } + + [Fact] + public async Task IsEndpointOnline_WithOfflineStatus_ReturnsFalse() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.False(result); + } + + [Fact] + public async Task IsEndpointOnline_WithNullDeviceId_ReturnsCorrectStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + var result = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(result); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOnlineStatus_CachesStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var isOnline = true; + var bdSeq = 1; + var timestamp = 1000L; + + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, isOnline, bdSeq, timestamp); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenNewerTimestampUpdatesOlderTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set initial status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Update with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set initial status with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 2, 2000); + + // Try to update with an older timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Should remain online + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.True(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithSameBdSeqUpdatesOnlineStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var bdSeq = 1; + + // Set online status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, bdSeq, 1000); + + // Set offline status with same bdSeq + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, bdSeq, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.False(status); + } + + [Fact] + public async Task UpdateEdgeNodeOnlineStatus_WhenSettingOfflineStatusWithNewerTimestampUpdatesOfflineStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + + // Set offline status + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Update offline status with newer timestamp + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 2, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + + Assert.False(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenSettingOnlineStatus_CachesStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + var isOnline = true; + var timestamp = 1000L; + + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, isOnline, timestamp); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenNewerTimestampUpdatesOlderTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set initial status + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); + + // Update with newer timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_WhenOlderTimestampDoesNotUpdateNewerTimestamp() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set initial status with newer timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Try to update with an older timestamp + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, false, 1000); + + // Should remain online + var status = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.True(status); + } + + [Fact] + public async Task UpdateDeviceOnlineStatus_DeviceStatusIsIndependentOfEdgeNodeStatus() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set edge node as offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 1000); + + // Set device as online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Check statuses separately + var edgeNodeStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, null); + var deviceStatus = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + + Assert.False(edgeNodeStatus); + Assert.True(deviceStatus); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_AssociatedDevicesBecomeOffline() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set Device online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Device should be online when EdgeNode is online + var deviceStatusBefore = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + Assert.True(deviceStatusBefore, "Device should be online when EdgeNode is online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + + // Device should now be offline due to EdgeNode going offline + var deviceStatusAfter = await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId); + Assert.False(deviceStatusAfter, "Device should be offline when EdgeNode goes offline"); + } + + [Fact] + public async Task WhenEdgeNodeComesOnlineAgain_DeviceStatusRequiresReset() + { + var groupId = "group20"; + var edgeNodeId = "edgeNode20"; + var deviceId = "device20"; + + // Set EdgeNode and Device online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Verify both are online + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), "Device should be online"); + + // Set EdgeNode offline and verify the Device becomes offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 3000); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), + "Device should be offline when EdgeNode is offline"); + + // Set EdgeNode online again + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 4000); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), + "EdgeNode should be online after reconnection"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), + "Device should remain offline"); + } + + [Fact] + public async Task WhenEdgeNodeGoesOffline_MultipleDevicesBecomeOfflineSimultaneously() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId1 = "device1"; + var deviceId2 = "device2"; + + // Set EdgeNode online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + + // Set multiple Devices online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId1, true, 2000); + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId2, true, 3000); + + // Verify all are online + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId1), + "First device should be online"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId2), + "Second device should be online"); + + // Set EdgeNode offline + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, false, 1, 4000); + + // All devices should now be offline due to EdgeNode going offline + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId1), + "First device should be offline"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId2), + "Second device should be offline"); + } + + [Fact] + public async Task TestMixedEdgeNodeAndDeviceStatusUpdates_Concurrent() + { + const string groupId = "test-group"; + const string edgeNodeId = "test-edge-node"; + const string deviceId = "test-device"; + + var completedEdgeNodeTasks = 0; + var completedDeviceTasks = 0; + var errors = new List(); + + // Create mixed edge node and device update tasks + var edgeNodeTasks = Enumerable.Range(0, 5).Select(async _ => + { + try + { + await _statusService.UpdateEdgeNodeOnlineStatus( + groupId, + edgeNodeId, + true, + 1, + DateTimeOffset.Now.ToUnixTimeMilliseconds()); + Interlocked.Increment(ref completedEdgeNodeTasks); + } + catch (Exception ex) + { + errors.Add(ex); + } + }); + + var deviceTasks = Enumerable.Range(0, 5).Select(async _ => + { + try + { + await _statusService.UpdateDeviceOnlineStatus( + groupId, + edgeNodeId, + deviceId, + true, + DateTimeOffset.Now.ToUnixTimeMilliseconds()); + Interlocked.Increment(ref completedDeviceTasks); + } + catch (Exception ex) + { + errors.Add(ex); + } + }); + + // Run all tasks concurrently + var allTasks = edgeNodeTasks.Concat(deviceTasks); + await Task.WhenAll(allTasks); + + // Verify no errors + Assert.Empty(errors); + Assert.Equal(5, completedEdgeNodeTasks); + Assert.Equal(5, completedDeviceTasks); + + // Verify statuses were updated correctly + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null)); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId)); + } + + [Fact] + public async Task ClearCacheAsync_ShouldClearAllStatusCache() + { + var groupId = "group1"; + var edgeNodeId = "edgeNode1"; + var deviceId = "device1"; + + // Set edge node online + await _statusService.UpdateEdgeNodeOnlineStatus(groupId, edgeNodeId, true, 1, 1000); + // Set device online + await _statusService.UpdateDeviceOnlineStatus(groupId, edgeNodeId, deviceId, true, 2000); + + // Verify statuses are correctly cached + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be online before cache clear"); + Assert.True(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), "Device should be online before cache clear"); + + // Call ClearCacheAsync method + await _statusService.ClearCacheAsync(); + + // Verify cache is cleared, all statuses return default value false + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, null), "EdgeNode should be offline after cache clear"); + Assert.False(await _statusService.IsEndpointOnline(groupId, edgeNodeId, deviceId), "Device should be offline after cache clear"); + } +} \ No newline at end of file diff --git a/SparklerNet.Tests/SparklerNet.Tests.csproj b/SparklerNet.Tests/SparklerNet.Tests.csproj index 10231b8..6ac00b8 100644 --- a/SparklerNet.Tests/SparklerNet.Tests.csproj +++ b/SparklerNet.Tests/SparklerNet.Tests.csproj @@ -10,10 +10,10 @@ - + - + diff --git a/SparklerNet/Core/Events/SparkplugMessageEventArgs.cs b/SparklerNet/Core/Events/SparkplugMessageEventArgs.cs index 5bfc1a4..576427f 100644 --- a/SparklerNet/Core/Events/SparkplugMessageEventArgs.cs +++ b/SparklerNet/Core/Events/SparkplugMessageEventArgs.cs @@ -1,4 +1,3 @@ -using MQTTnet; using SparklerNet.Core.Constants; using SparklerNet.Core.Model; @@ -17,7 +16,6 @@ namespace SparklerNet.Core.Events; /// The Edge Node ID /// The Device ID (optional) /// The message payload -/// The original MQTT message received event arguments /// Indicates whether the message sequence number is consecutive /// Indicates whether the message is cached /// The timestamp when the message was received on the application layer in milliseconds @@ -28,7 +26,6 @@ public class SparkplugMessageEventArgs( string edgeNodeId, string? deviceId, Payload payload, - MqttApplicationMessageReceivedEventArgs eventArgs, bool isSeqConsecutive = true, bool isCached = false, long timestamp = 0) : EventArgs @@ -63,11 +60,6 @@ public class SparkplugMessageEventArgs( /// public Payload Payload { get; init; } = payload; - /// - /// The original MQTT message received event arguments - /// - public MqttApplicationMessageReceivedEventArgs EventArgs { get; init; } = eventArgs; - /// /// Indicates whether the message sequence number is consecutive to the expected sequence number /// diff --git a/SparklerNet/Core/Model/Payload.cs b/SparklerNet/Core/Model/Payload.cs index d195bde..9ed1ae6 100644 --- a/SparklerNet/Core/Model/Payload.cs +++ b/SparklerNet/Core/Model/Payload.cs @@ -1,5 +1,7 @@ // ReSharper disable PropertyCanBeMadeInitOnly.Global +using System.Diagnostics.CodeAnalysis; + namespace SparklerNet.Core.Model; /// @@ -28,4 +30,41 @@ public record Payload /// The array of bytes which can be used for any custom binary encoded data. /// public byte[]? Body { get; init; } + + /// + /// Gets the bdSeq (Birth/Death Sequence) metric value from the payload. + /// bdSeq is a special metric used in Birth and Death messages to ensure proper sequence tracking. + /// + /// The bdSeq value if found and datatype is supported, otherwise 0 + [SuppressMessage("ReSharper", "InvertIf")] + public int GetBdSeq() + { + // Find the bdSeq metric in the metrics list + var bdSeqMetric = Metrics.FirstOrDefault(m => m.Name == "bdSeq"); + + // Check if the metric exists and has a value + if (bdSeqMetric is { Value: not null, DataType: not null }) + { + // List of supported data types that can be converted to int + var supportedTypes = new[] + { + DataType.Int16, DataType.Int32, DataType.Int64, DataType.UInt8, + DataType.UInt16, DataType.UInt32, DataType.UInt64 + }; + + // Check if the data type is supported + if (supportedTypes.Contains(bdSeqMetric.DataType.Value)) + try + { + return Convert.ToInt32(bdSeqMetric.Value); + } + catch (OverflowException) + { + return 0; + } + } + + // Return default value 0 if the metric is not found, has no value + return 0; + } } \ No newline at end of file diff --git a/SparklerNet/Core/Options/SparkplugClientOptions.cs b/SparklerNet/Core/Options/SparkplugClientOptions.cs index 776ea1a..d222fd8 100644 --- a/SparklerNet/Core/Options/SparkplugClientOptions.cs +++ b/SparklerNet/Core/Options/SparkplugClientOptions.cs @@ -31,13 +31,29 @@ public record SparkplugClientOptions /// public List Subscriptions { get; set; } = []; + /// + /// Whether to always subscribe to the wildcard topic 'spBv1.0/#'. + /// If set to true, the client will subscribe to the wildcard topic 'spBv1.0/#' in addition to the topics specified in + /// Subscriptions. This is required to pass the Sparkplug TCK tests. + /// The default value is false. + /// + public bool AlwaysSubscribeToWildcardTopic { get; set; } + + /// + /// Whether to enable the status tracking mechanism. When enabled, the Sparkplug Host Application will track the status + /// of each Edge Node and Device. Users can more conveniently check the online status of Edge Nodes and Devices without + /// having to implement this logic themselves. + /// The default value is true. + /// + public bool EnableStatusTracking { get; set; } = true; + /// /// Whether to enable the message ordering mechanism. The specification requires Sparkplug Host Application to ensure /// that all messages arrive within a Reorder Timeout. /// Since the cache and timeout mechanism actually involves many edge cases to consider, enabling this function will /// incur certain performance overhead and cause delays in out-of-order messages. In specific scenarios, due to the /// Timer reset mechanism, the actual delay may be much longer than the duration set by SeqReorderTimeout. Therefore, - /// it is not recommended to enable this function if the device side cannot guarantee that message sequence numbers + /// it is not recommended to enable this function if the Edge Node side cannot guarantee that message sequence numbers /// increase sequentially within the range of 0~255. /// The default value is false. /// @@ -46,22 +62,23 @@ public record SparkplugClientOptions /// /// The sequence numbers cache expiration time in minutes. This is a special timeout mechanism set for message sequence /// numbers. It is designed to prevent the following messages from being identified as out-of-order messages due to the - /// invalidation of cached message sequence numbers after a device has been offline for a long time. + /// invalidation of cached message sequence numbers after an Edge Node has been offline for a long time. /// The default value is 120 minutes. /// public int SeqCacheExpiration { get; set; } = 120; /// /// The reorder timeout period in milliseconds for out-of-order messages. This is the Reorder Timer defined in the - /// Sparkplug protocol. When an out-of-order message is cached, a Timer will be set for that device. When the Timer - /// expires, the messages in the cache will be processed. + /// Sparkplug specification. When an out-of-order message is cached, a Timer will be set for that Edge Node. When the + /// Timer expires, the messages in the cache will be processed. /// The default value is 10,000 milliseconds (10 seconds). /// public int SeqReorderTimeout { get; set; } = 10000; /// /// Whether to send a rebirth command when a reorder timeout occurs. After the Timer expires, the messages in the cache - /// will be processed. Subsequently, you can send a Rebirth message to attempt to reset the device's message sequence + /// will be processed. Subsequently, you can send a Rebirth message to attempt to reset the Edge Node's message + /// sequence /// number. /// The default value is true. /// diff --git a/SparklerNet/HostApplication/Caches/CacheHelper.cs b/SparklerNet/HostApplication/Caches/CacheHelper.cs new file mode 100644 index 0000000..ac420e5 --- /dev/null +++ b/SparklerNet/HostApplication/Caches/CacheHelper.cs @@ -0,0 +1,52 @@ +using System.Collections.Concurrent; + +namespace SparklerNet.HostApplication.Caches; + +/// +/// Provides helper methods for cache operations. +/// +public static class CacheHelper +{ + private static readonly ConcurrentDictionary Semaphores = new(); + + /// + /// Builds a standardized cache key based on the provided prefix and identifiers + /// + /// The prefix to use for the key (can be null) + /// The group ID part of the key + /// The edge node ID part of the key + /// The device ID part of the key (optional) + /// The constructed cache key in format "prefix:groupId:edgeNodeId:deviceId" or "prefix:groupId:edgeNodeId" + public static string BuildCacheKey(string? prefix, string groupId, string edgeNodeId, string? deviceId) + { + var baseKey = !string.IsNullOrEmpty(deviceId) + ? $"{groupId}:{edgeNodeId}:{deviceId}" + : $"{groupId}:{edgeNodeId}"; + + return string.IsNullOrEmpty(prefix) ? baseKey : $"{prefix}{baseKey}"; + } + + /// + /// Gets a SemaphoreSlim object for the specified context to support async locking + /// Ensures thread safety for asynchronous operations on a specific device/node combination + /// + /// The group ID part of the key + /// The edge node ID part of the key + /// The device ID part of the key (optional) + /// The SemaphoreSlim object for the specified EdgeNode/Device + public static SemaphoreSlim GetSemaphore(string groupId, string edgeNodeId, string? deviceId) + { + var key = BuildCacheKey(null, groupId, edgeNodeId, deviceId); + return Semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1)); + } + + /// + /// Clears all SemaphoreSlim objects from the cache + /// This method should only be called during application shutdown to prevent race conditions + /// + public static void ClearSemaphores() + { + foreach (var semaphore in Semaphores.Values) semaphore.Dispose(); + Semaphores.Clear(); + } +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs b/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs index d7ff86e..d63188c 100644 --- a/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs +++ b/SparklerNet/HostApplication/Caches/IMessageOrderingService.cs @@ -7,8 +7,7 @@ namespace SparklerNet.HostApplication.Caches; /// /// The group ID of the entity requiring rebirth /// The edge node ID of the entity requiring rebirth -/// The device ID of the entity requiring rebirth (optional) -public delegate Task RebirthRequestCallback(string groupId, string edgeNodeId, string? deviceId = null); +public delegate Task RebirthRequestCallback(string groupId, string edgeNodeId); /// /// Delegate for notifying when pending messages have been processed and are ready for consumption @@ -17,8 +16,8 @@ namespace SparklerNet.HostApplication.Caches; public delegate Task PendingMessagesCallback(IEnumerable messages); /// -/// Interface for a service responsible for managing message ordering by caching and validating sequence numbers. -/// Ensures messages are processed in sequential order according to the Sparkplug specification. +/// Interface for managing message ordering by caching and validating sequence numbers. +/// Ensures NDATA, DDATA, DBIRTH, and DDEATH messages are processed sequentially per Sparkplug specification. /// public interface IMessageOrderingService { @@ -33,27 +32,26 @@ public interface IMessageOrderingService PendingMessagesCallback? OnPendingMessages { get; set; } /// - /// Processes a message in the correct order, handling both continuous and non-continuous sequences + /// Processes the message in the correct order, handling both continuous and non-continuous sequences /// Messages with continuous sequence numbers are processed immediately /// Messages with gaps in sequence are cached for later processing when the gap is filled + /// NDATA DDATA DBIRTH and DDEATH messages will be processed here when Message Ordering enabled /// /// The message context to process /// List of messages that can be processed (current message if continuous and any continuous pending messages) - List ProcessMessageOrder(SparkplugMessageEventArgs message); + Task> ProcessMessageOrderAsync(SparkplugMessageEventArgs message); /// - /// Clears the sequence cache and pending messages for a specific edge node or device + /// Reset the message order and clear the pending messages for the specific edge node /// Also cleans up any associated timer resources + /// NBIRTH and NDEATH messages will be processed here when Message Ordering enabled /// - /// The group ID of the edge node + /// The group ID /// The edge node ID - /// The device ID (optional) - void ClearMessageOrder(string groupId, string edgeNodeId, string? deviceId); + Task ResetMessageOrderAsync(string groupId, string edgeNodeId); /// - /// Retrieves all cached messages and clears the message order cache. - /// This method is useful for accessing any messages that are stored in the cache before the cache is reset or cleared. + /// Clears all message ordering related cache entries. /// - /// List of all cached messages prior to cache clearance. - List GetAllMessagesAndClearCache(); + Task ClearCacheAsync(); } \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs new file mode 100644 index 0000000..b1c23c6 --- /dev/null +++ b/SparklerNet/HostApplication/Caches/IStatusTrackingService.cs @@ -0,0 +1,41 @@ +namespace SparklerNet.HostApplication.Caches; + +/// +/// Provides methods to track and query the status of edge nodes and devices. +/// +public interface IStatusTrackingService +{ + /// + /// Determines if a specific endpoint (EdgeNode or Device) is currently online. + /// + /// The group ID + /// The edge node ID + /// The device ID (optional) + /// True if the endpoint is online, otherwise false. + Task IsEndpointOnline(string groupId, string edgeNodeId, string? deviceId); + + /// + /// Updates the online status of a specific edge node. + /// + /// The group ID + /// The edge node ID + /// True if the edge node is online, otherwise false. + /// The bdSeq metric value of Birth or Death certificates + /// The timestamp of Birth or Death certificates + Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, bool isOnline, int bdSeq, long timestamp); + + /// + /// Updates the online status of a specific device. + /// + /// The group ID + /// The edge node ID + /// The device ID + /// True if the device is online, otherwise false. + /// The timestamp of Birth or Death certificates + Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, string deviceId, bool isOnline, long timestamp); + + /// + /// Clears all status-related cache entries. + /// + Task ClearCacheAsync(); +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Caches/MessageOrderingService.cs b/SparklerNet/HostApplication/Caches/MessageOrderingService.cs index b35ddae..9836860 100644 --- a/SparklerNet/HostApplication/Caches/MessageOrderingService.cs +++ b/SparklerNet/HostApplication/Caches/MessageOrderingService.cs @@ -1,82 +1,79 @@ using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; -using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.Caching.Hybrid; using Microsoft.Extensions.Logging; using SparklerNet.Core.Events; using SparklerNet.Core.Extensions; using SparklerNet.Core.Options; +using static SparklerNet.Core.Constants.SparkplugMessageType; namespace SparklerNet.HostApplication.Caches; /// -/// Service responsible for managing message ordering by caching and validating sequence numbers -/// Ensures messages are processed in sequential order according to the Sparkplug specification +/// Manages message ordering by caching and validating sequence numbers. +/// Ensures NDATA, DDATA, DBIRTH, and DDEATH messages are processed sequentially per Sparkplug specification. +/// Uses HybridCache for efficient in-memory and distributed caching of sequence states and pending messages. /// public class MessageOrderingService : IMessageOrderingService { private const string SequenceKeyPrefix = "sparkplug:seq:"; // Prefix for the sequence number cache keys private const string PendingKeyPrefix = "sparkplug:pending:"; // Prefix for the pending messages cache keys + private const string OrderingTag = "sparkplug:tags:ordering"; // Global tag for all message ordering cache entries private const int SequenceNumberRange = 256; // Valid sequence number range (0-255) as defined in Sparkplug spec - private readonly IMemoryCache _cache; // In-memory cache for storing sequence states and pending messages - - // Collection to track all cache keys created by this service - private readonly ConcurrentDictionary _cachedPendingKeys = new(); - private readonly ConcurrentDictionary _cachedSeqKeys = new(); - + private readonly HybridCache _cache; // Hybrid cache for storing sequence states and pending messages private readonly ILogger _logger; // Logger for the service private readonly SparkplugClientOptions _options; // Configuration options for the service - // Fine-grained locks for thread-safe operations on specific devices - private readonly ConcurrentDictionary _reorderLocks = new(); - // Timer collection for handling message reordering timeouts private readonly ConcurrentDictionary _reorderTimers = new(); /// /// Initializes a new instance of the /// - /// The memory cache instance for storing sequence states and pending messages + /// The hybrid cache instance for storing sequence states and pending messages /// The Sparkplug client options containing reordering configuration /// The logger factory to create the logger - public MessageOrderingService(IMemoryCache cache, SparkplugClientOptions options, ILoggerFactory loggerFactory) + public MessageOrderingService(HybridCache cache, SparkplugClientOptions options, ILoggerFactory loggerFactory) { - _cache = cache; - _options = options; - _logger = loggerFactory.CreateLogger(); + _cache = cache ?? throw new ArgumentNullException(nameof(cache)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = loggerFactory.CreateLogger() ?? + throw new ArgumentNullException(nameof(loggerFactory)); } - /// - /// Gets or sets the delegate to be called when a Rebirth message needs to be sent due to detected message gaps - /// + /// public RebirthRequestCallback? OnRebirthRequested { get; set; } - /// - /// Gets or sets the delegate to be called when pending messages have been processed and are ready for consumption - /// + /// public PendingMessagesCallback? OnPendingMessages { get; set; } /// - public List ProcessMessageOrder(SparkplugMessageEventArgs message) + public async Task> ProcessMessageOrderAsync(SparkplugMessageEventArgs message) { ArgumentNullException.ThrowIfNull(message); + if (message.MessageType is not NDATA and not DDATA and not DBIRTH and not DDEATH) + throw new ArgumentException($"Invalid message type '{message.MessageType}'.", nameof(message)); // Validate sequence number is within the allowed range // If the sequence is invalid, return the message as-is to avoid processing it further if (message.Payload.Seq is < 0 or >= SequenceNumberRange) return [message]; var result = new List(); - // Use fine-grained lock to ensure thread safety for this specific device/node - lock (GetLockObject(message.GroupId, message.EdgeNodeId, message.DeviceId)) + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(message.GroupId, message.EdgeNodeId, null); + await semaphore.WaitAsync(); + + try { // Check if the sequence is continuous with the last processed message - if (UpdateSequenceNumber(message)) + if (await UpdateSequenceNumberAsync(message)) { // If the sequence is continuous, add the current message to results result.Add(message); // Get and process any now-continuous pending messages - var pendingMessages = GetPendingMessages(message.GroupId, message.EdgeNodeId, - message.DeviceId, + var pendingMessages = await GetPendingMessagesAsync(message.GroupId, message.EdgeNodeId, message.Payload.Seq); if (pendingMessages.Count > 0) result.AddRange(pendingMessages); } @@ -84,70 +81,57 @@ public List ProcessMessageOrder(SparkplugMessageEvent { // If the sequence has a gap, cache the message for later processing // Return the cached message if it exists, this indicates a duplicate sequence number - var oldMessage = CachePendingMessage(message); + var oldMessage = await CachePendingMessageAsync(message); if (oldMessage != null) result.Add(oldMessage); } } + finally + { + semaphore.Release(); + } return result; } /// - public void ClearMessageOrder(string groupId, string edgeNodeId, string? deviceId) + public async Task ResetMessageOrderAsync(string groupId, string edgeNodeId) { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + // Build all required cache keys - var seqKey = BuildCacheKey(SequenceKeyPrefix, groupId, edgeNodeId, deviceId); - var pendingKey = BuildCacheKey(PendingKeyPrefix, groupId, edgeNodeId, deviceId); - var timerKey = BuildCacheKey(null, groupId, edgeNodeId, deviceId); + var seqKey = CacheHelper.BuildCacheKey(SequenceKeyPrefix, groupId, edgeNodeId, null); + var pendingKey = CacheHelper.BuildCacheKey(PendingKeyPrefix, groupId, edgeNodeId, null); + var timerKey = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + await semaphore.WaitAsync(); + + try + { + // Remove cached items + await _cache.RemoveAsync(seqKey); + await _cache.RemoveAsync(pendingKey); - // Use lock to ensure thread safety during cache and timer cleanup - lock (GetLockObject(groupId, edgeNodeId, deviceId)) + // Dispose timer if it exists + if (_reorderTimers.TryRemove(timerKey, out var timer)) await timer.DisposeAsync(); + } + finally { - // Remove cached items and dispose timer if it exists - _cache.Remove(seqKey); - _cache.Remove(pendingKey); - _cachedSeqKeys.TryRemove(seqKey, out _); - _cachedPendingKeys.TryRemove(pendingKey, out _); - if (_reorderTimers.TryRemove(timerKey, out var timer)) - timer.Dispose(); + semaphore.Release(); } } /// - [SuppressMessage("ReSharper", "InconsistentlySynchronizedField")] - public List GetAllMessagesAndClearCache() + public async Task ClearCacheAsync() { - // Dispose all reorder timers to prevent callbacks during cache clearing - var timerKeys = _reorderTimers.Keys.ToList(); - foreach (var timerKey in timerKeys) - if (_reorderTimers.TryRemove(timerKey, out var timer)) - timer.Dispose(); - - // Get all pending keys from the cache, clear the pending messages cache - var result = new List(); - var pendingKeys = _cachedPendingKeys.Keys.ToList(); - foreach (var pendingKey in pendingKeys) - { - // Get pending messages from the cache if available - if (!_cache.TryGetValue(pendingKey, out SortedDictionary? pendingMessages) - || pendingMessages == null) continue; - - // Add all pending messages to the result list - result.AddRange(pendingMessages.Values); - - // Remove the pending messages from the cache - _cache.Remove(pendingKey); - } - - // Clear the pending keys cache - _cachedPendingKeys.Clear(); + // Clear all message ordering related cache entries using the global tag + await _cache.RemoveByTagAsync(OrderingTag); - // Clear the sequence number cache and the sequence keys cache - var seqKeys = _cachedSeqKeys.Keys.ToList(); - foreach (var seqKey in seqKeys) _cache.Remove(seqKey); - _cachedSeqKeys.Clear(); - - return result; + // Dispose all reorder timers to prevent memory leaks + foreach (var timer in _reorderTimers.Values) await timer.DisposeAsync(); + _reorderTimers.Clear(); } /// @@ -170,17 +154,25 @@ private async void OnReorderTimeout(object? state) List pendingMessages; - // Use lock to ensure thread safety and prevent race conditions with concurrent message processing - lock (GetLockObject(groupId, edgeNodeId, deviceId)) + // Use SemaphoreSlim for async thread safety + // Because the NDATA DDATA DBIRTH and DDEATH messages will share the same sequence number, the lock will be acquired on the EdgeNode level + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + await semaphore.WaitAsync(); + + try { // Always remove and dispose the timer to prevent duplicate callbacks - if (_reorderTimers.TryRemove(timerKey, out var timer)) timer.Dispose(); + if (_reorderTimers.TryRemove(timerKey, out var timer)) await timer.DisposeAsync(); // Pass seq as -1 to indicate timeout scenario - pendingMessages = GetPendingMessages(groupId, edgeNodeId, deviceId, -1); + pendingMessages = await GetPendingMessagesAsync(groupId, edgeNodeId, -1); + } + finally + { + semaphore.Release(); } - // Call the pending messages delegate outside the lock to avoid deadlocks with async operations + // Call the pending messages delegate outside the semaphore to avoid deadlocks if (pendingMessages.Count > 0 && OnPendingMessages != null) { _logger.LogDebug( @@ -189,40 +181,9 @@ private async void OnReorderTimeout(object? state) await OnPendingMessages.Invoke(pendingMessages); } - // Send the rebirth request if configured and delegate is set + // Send the rebirth request if the option is enabled if (_options.SendRebirthWhenTimeout && OnRebirthRequested != null) - await OnRebirthRequested.Invoke(groupId, edgeNodeId, deviceId); - } - - /// - /// Builds a standardized cache key based on the provided prefix and identifiers - /// - /// The prefix to use for the key (can be null) - /// The group ID part of the key - /// The edge node ID part of the key - /// The device ID part of the key (optional) - /// The constructed cache key in format "prefix:groupId:edgeNodeId:deviceId" or "prefix:groupId:edgeNodeId" - internal static string BuildCacheKey(string? prefix, string groupId, string edgeNodeId, string? deviceId) - { - var baseKey = !string.IsNullOrEmpty(deviceId) - ? $"{groupId}:{edgeNodeId}:{deviceId}" - : $"{groupId}:{edgeNodeId}"; - - return string.IsNullOrEmpty(prefix) ? baseKey : $"{prefix}{baseKey}"; - } - - /// - /// Gets a lock object for the specified context from the reorder locks dictionary - /// Ensures thread safety for operations on a specific device/node combination - /// - /// The group ID part of the key - /// The edge node ID part of the key - /// The device ID part of the key (optional) - /// The lock object for the specified EdgeNode/Device - private object GetLockObject(string groupId, string edgeNodeId, string? deviceId) - { - var key = BuildCacheKey(null, groupId, edgeNodeId, deviceId); - return _reorderLocks.GetOrAdd(key, _ => new object()); + await OnRebirthRequested.Invoke(groupId, edgeNodeId); } /// @@ -230,14 +191,14 @@ private object GetLockObject(string groupId, string edgeNodeId, string? deviceId /// /// The message context containing sequence information /// True if the sequence is continuous, false if there's a gap - private bool UpdateSequenceNumber(SparkplugMessageEventArgs message) + private async Task UpdateSequenceNumberAsync(SparkplugMessageEventArgs message) { // Build cache key for the sequence number tracking - var cacheKey = BuildCacheKey(SequenceKeyPrefix, message.GroupId, message.EdgeNodeId, - message.DeviceId); + var cacheKey = CacheHelper.BuildCacheKey(SequenceKeyPrefix, message.GroupId, message.EdgeNodeId, null); // Check if the current sequence is continuous with the previously recorded sequence - if (_cache.TryGetValue(cacheKey, out int previousSeq)) + var previousSeq = await _cache.GetOrCreateAsync(cacheKey, _ => ValueTask.FromResult(-1), tags: [OrderingTag]); + if (previousSeq != -1) { // Calculate the next expected sequence number with wrap-around var expectedNextSeq = (previousSeq + 1) % SequenceNumberRange; @@ -246,9 +207,8 @@ private bool UpdateSequenceNumber(SparkplugMessageEventArgs message) } // Update the cache with the current sequence number - _cache.Set(cacheKey, message.Payload.Seq, CreateSequenceCacheEntryOptions()); - _cachedSeqKeys.TryAdd(cacheKey, null); - + // If configured, also set an expiration for the sequence number cache entry + await _cache.SetAsync(cacheKey, message.Payload.Seq, CreateSequenceCacheEntryOptions(), [OrderingTag]); return true; } @@ -257,18 +217,16 @@ private bool UpdateSequenceNumber(SparkplugMessageEventArgs message) /// /// The message context containing sequence number and message data /// The cached message if it exists (duplicated sequence number), otherwise null - private SparkplugMessageEventArgs? CachePendingMessage(SparkplugMessageEventArgs message) + private async Task CachePendingMessageAsync(SparkplugMessageEventArgs message) { // Build cache key for pending messages - var pendingKey = BuildCacheKey(PendingKeyPrefix, message.GroupId, message.EdgeNodeId, - message.DeviceId); + var pendingKey = CacheHelper.BuildCacheKey(PendingKeyPrefix, message.GroupId, message.EdgeNodeId, null); // Get existing pending messages or create a new sorted collection with circular sequence ordering - var pendingMessages = - _cache.TryGetValue(pendingKey, out SortedDictionary? existingMessages) - ? existingMessages ?? - new SortedDictionary(new CircularSequenceComparer()) - : new SortedDictionary(new CircularSequenceComparer()); + var pendingMessages = await _cache.GetOrCreateAsync(pendingKey, + _ => ValueTask.FromResult( + new SortedDictionary(new CircularSequenceComparer())), + tags: [OrderingTag]); // Add or update the message with this sequence number // If the sequence number already exists, the existing message will be returned @@ -276,15 +234,14 @@ private bool UpdateSequenceNumber(SparkplugMessageEventArgs message) pendingMessages.TryReplace(message.Payload.Seq, message, out var result); // Cache the updated pending messages - _cache.Set(pendingKey, pendingMessages); - _cachedPendingKeys.TryAdd(pendingKey, null); + await _cache.SetAsync(pendingKey, pendingMessages, tags: [OrderingTag]); _logger.LogDebug( - "{MessageType} message has been cached due to sequence disorder: Group={Group}, Node={Node}, Device={Device}, Seq={Seq}", + "{MessageType} message cached due to sequence gap: Group={Group}, Node={Node}, Device={Device}, Seq={Seq}", message.MessageType, message.GroupId, message.EdgeNodeId, message.DeviceId ?? "", message.Payload.Seq); // Build timer key for managing reordering timeout - var timerKey = BuildCacheKey(null, message.GroupId, message.EdgeNodeId, message.DeviceId); + var timerKey = CacheHelper.BuildCacheKey(null, message.GroupId, message.EdgeNodeId, null); // Check if the new message becomes the first message in the sorted collection var isFirstMessage = pendingMessages.First().Key == message.Payload.Seq; @@ -313,30 +270,36 @@ private bool UpdateSequenceNumber(SparkplugMessageEventArgs message) /// /// The group ID /// The edge node ID - /// The device ID (optional) /// /// The current sequence number, -1 for the reorder timeout scenario (still processes consecutive sequences in order) /// /// List of pending messages that can now be processed in order [SuppressMessage("ReSharper", "InvertIf")] - private List GetPendingMessages(string groupId, string edgeNodeId, string? deviceId, + private async Task> GetPendingMessagesAsync(string groupId, string edgeNodeId, int seq) { // Validate required parameters - if (string.IsNullOrEmpty(groupId)) throw new ArgumentNullException(nameof(groupId)); - if (string.IsNullOrEmpty(edgeNodeId)) throw new ArgumentNullException(nameof(edgeNodeId)); + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); var result = new List(); // Build all required cache keys - var seqKey = BuildCacheKey(SequenceKeyPrefix, groupId, edgeNodeId, deviceId); - var pendingKey = BuildCacheKey(PendingKeyPrefix, groupId, edgeNodeId, deviceId); - var timerKey = BuildCacheKey(null, groupId, edgeNodeId, deviceId); + var seqKey = CacheHelper.BuildCacheKey(SequenceKeyPrefix, groupId, edgeNodeId, null); + var pendingKey = CacheHelper.BuildCacheKey(PendingKeyPrefix, groupId, edgeNodeId, null); + var timerKey = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Get existing pending messages + var pendingMessages = await _cache.GetOrCreateAsync(pendingKey, + _ => ValueTask.FromResult?>(null), + tags: [OrderingTag]); - // Check if we have pending messages - if (!_cache.TryGetValue(pendingKey, out SortedDictionary? pendingMessages) || - pendingMessages == null || pendingMessages.Count == 0) + // If no pending messages, remove the cache entry and return an empty result + if (pendingMessages == null || pendingMessages.Count == 0) + { + await _cache.RemoveAsync(pendingKey); return result; + } // Process pending messages in order until we find a gap // When seq is -1 (timeout), we still process consecutive message sequences in order @@ -366,15 +329,15 @@ private List GetPendingMessages(string groupId, strin } while (foundMoreMessages && pendingMessages.Count > 0); // Update the cache with the new sequence after processing all continuous messages - _cache.Set(seqKey, currentSeq, CreateSequenceCacheEntryOptions()); - _cachedSeqKeys.TryAdd(seqKey, null); + // If configured, also set an expiration for the sequence number cache entry + await _cache.SetAsync(seqKey, currentSeq, CreateSequenceCacheEntryOptions(), tags: [OrderingTag]); // Update or remove pending messages cache and handle timer accordingly // ReSharper disable once ConvertIfStatementToSwitchStatement if (pendingMessages.Count > 0 && result.Count > 0) { // Still have pending messages and size changed, update cache and reset timer - _cache.Set(pendingKey, pendingMessages); + await _cache.SetAsync(pendingKey, pendingMessages, tags: [OrderingTag]); _reorderTimers.AddOrUpdate(timerKey, _ => new Timer(OnReorderTimeout, timerKey, _options.SeqReorderTimeout, Timeout.Infinite), (_, existingTimer) => @@ -385,11 +348,9 @@ private List GetPendingMessages(string groupId, strin } else if (pendingMessages.Count == 0) { - // No more pending messages, clean up cache and timer - _cache.Remove(pendingKey); - _cachedPendingKeys.TryRemove(pendingKey, out _); - if (_reorderTimers.TryRemove(timerKey, out var timer)) - timer.Dispose(); + // No more pending messages, clean up the cache and timer + await _cache.RemoveAsync(pendingKey); + if (_reorderTimers.TryRemove(timerKey, out var timer)) await timer.DisposeAsync(); } return result; @@ -398,31 +359,26 @@ private List GetPendingMessages(string groupId, strin /// /// Creates cache entry options for sequence number caching with appropriate expiration settings /// - /// Configured MemoryCacheEntryOptions with sliding expiration if specified in options - private MemoryCacheEntryOptions CreateSequenceCacheEntryOptions() + /// Configured HybridCacheEntryOptions with expiration if specified in options + [SuppressMessage("ReSharper", "MemberCanBePrivate.Global")] + protected internal HybridCacheEntryOptions CreateSequenceCacheEntryOptions() { - var memoryCacheOptions = new MemoryCacheEntryOptions(); - - if (_options.SeqCacheExpiration <= 0) return memoryCacheOptions; - - // Set sliding expiration based on the configuration - memoryCacheOptions.SlidingExpiration = TimeSpan.FromMinutes(_options.SeqCacheExpiration); - - // Register eviction callback to remove the key from _cachedSeqKeys when cache entry is evicted - memoryCacheOptions.RegisterPostEvictionCallback((key, value, reason, state) => - { - if (key is not string cacheKey) return; - _cachedSeqKeys.TryRemove(cacheKey, out _); - }); + if (_options.SeqCacheExpiration > 0) + // Set expiration based on the configuration using object initializer + return new HybridCacheEntryOptions + { + Expiration = TimeSpan.FromMinutes(_options.SeqCacheExpiration) + }; - return memoryCacheOptions; + return new HybridCacheEntryOptions(); } /// /// Custom comparer for circular sequence numbers (0-255) /// Ensures proper ordering when sequence numbers wrap around (0 is considered greater than 255) /// - internal class CircularSequenceComparer : IComparer + [SuppressMessage("ReSharper", "MemberCanBePrivate.Global")] + protected internal class CircularSequenceComparer : IComparer { /// /// Compares two circular sequence numbers considering the wrap-around at 255->0 diff --git a/SparklerNet/HostApplication/Caches/Readme.md b/SparklerNet/HostApplication/Caches/MessageOrderingService.md similarity index 85% rename from SparklerNet/HostApplication/Caches/Readme.md rename to SparklerNet/HostApplication/Caches/MessageOrderingService.md index d1170cb..2d1a9fc 100644 --- a/SparklerNet/HostApplication/Caches/Readme.md +++ b/SparklerNet/HostApplication/Caches/MessageOrderingService.md @@ -110,33 +110,23 @@ OnReorderTimeout(object? state) ### Reorder Clearing Flow ``` -ClearMessageOrder(groupId, edgeNodeId, deviceId) +ResetMessageOrderAsync(groupId, edgeNodeId) ├── Build all required cache keys (seqKey, pendingKey, timerKey) -├── Acquire fine-grained lock for thread safety on specific device/node combination +├── Acquire fine-grained lock for thread safety ├── Remove sequence number cache entry ├── Remove pending messages cache entry -├── Remove entry from _cachedSeqKeys and _cachedPendingKeys collections ├── Remove and dispose timer if it exists └── Exit lock ``` -### Get All Messages and Clear Cache Flow +### Clear Cache Flow ``` -GetAllMessagesAndClearCache() -├── Obtain snapshot of timer keys to avoid concurrent modification exceptions -├── Dispose all reorder timers to prevent callbacks during cache clearing -│ ├── Try to remove each timer from _reorderTimers collection -│ ├── Call Dispose() on each removed timer -├── Initialize result list for all cached messages -├── Obtain snapshot of pending cache keys to avoid concurrent modification exceptions -├── Process each pending key -│ ├── Try to get pending messages from cache -│ ├── Add all pending messages to result list -│ ├── Remove pending messages from cache -├── Clear _cachedPendingKeys collection -├── Obtain snapshot of sequence cache keys to avoid concurrent modification exceptions -├── Remove each sequence key from cache -├── Clear _cachedSeqKeys collection -└── Return result list containing all previously cached messages +ClearCacheAsync() +├── Remove all message ordering related cache entries using the global tag +├── Dispose all reorder timers to prevent memory leaks +│ ├── Iterate through all timers in _reorderTimers collection +│ ├── Call DisposeAsync() on each timer +│ └── Clear the _reorderTimers collection +└── Return completion task ``` diff --git a/SparklerNet/HostApplication/Caches/StatusTrackingService.cs b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs new file mode 100644 index 0000000..9d625b9 --- /dev/null +++ b/SparklerNet/HostApplication/Caches/StatusTrackingService.cs @@ -0,0 +1,174 @@ +using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Caching.Hybrid; + +namespace SparklerNet.HostApplication.Caches; + +/// +/// Service responsible for tracking the online status of edge nodes and devices +/// Provides methods to check and update the online status of specific edge nodes and devices +/// Uses HybridCache for efficient caching with both in-memory and distributed capabilities +/// +public class StatusTrackingService : IStatusTrackingService +{ + private const string StatusKeyPrefix = "sparkplug:status:"; // Prefix for the status cache keys + private const string StatusTag = "sparkplug:tags:status"; // Global tag for all status cache entries + private readonly HybridCache _cache; // Hybrid cache for storing endpoint status + + /// + /// Initializes a new instance of the + /// + /// The HybridCache instance for caching online status + public StatusTrackingService(HybridCache cache) + { + _cache = cache ?? throw new ArgumentNullException(nameof(cache)); + } + + /// + public async Task IsEndpointOnline(string groupId, string edgeNodeId, string? deviceId) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + await semaphore.WaitAsync(); + + try + { + // Get the status from the cache or create a new entry if it doesn't exist + var status = await _cache.GetOrCreateAsync(cacheKey, _ => ValueTask.FromResult(null), + tags: [StatusTag]); + + // If the status is null (meaning it was just created), remove it from cache to avoid storing null values + if (status is null) await _cache.RemoveAsync(cacheKey); + + // If the status is not in the cache, assume it is offline + return status is { IsOnline: true }; + } + finally + { + semaphore.Release(); + } + } + + /// + public async Task UpdateEdgeNodeOnlineStatus(string groupId, string edgeNodeId, bool isOnline, int bdSeq, + long timestamp) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, null); + var cacheTag = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Create a new status object + var newStatus = new EndpointStatus { IsOnline = isOnline, BdSeq = bdSeq, Timestamp = timestamp }; + + // Use SemaphoreSlim for async thread safety + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + await semaphore.WaitAsync(); + + try + { + // Get the current status from the cache or create a new entry if it doesn't exist + var currentStatus = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(newStatus), tags: [cacheTag, StatusTag]); + + // Online status update logic + if (newStatus.IsOnline) + { + // Update the cache if the new status is newer than the current status + // Note: if the cache is empty, currentStatus will be set to newStatus by GetOrCreateAsync + if (newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag, StatusTag]); + } + // Offline status update logic + else + { + // When the current status is offline, update if the new status is newer than the current status + // ReSharper disable once ConvertIfStatementToSwitchStatement + if (!currentStatus.IsOnline && newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag, StatusTag]); + + // When the current status is online, update if: + // 1. The new status has the same bdSeq + // 2. The new status has the same or newer timestamp + if (currentStatus.IsOnline && + (newStatus.BdSeq == currentStatus.BdSeq || newStatus.Timestamp >= currentStatus.Timestamp)) + { + await _cache.RemoveByTagAsync(cacheTag); + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag, StatusTag]); + } + } + } + finally + { + // Always release the semaphore to prevent deadlocks + semaphore.Release(); + } + } + + /// + public async Task UpdateDeviceOnlineStatus(string groupId, string edgeNodeId, string deviceId, bool isOnline, + long timestamp) + { + ArgumentException.ThrowIfNullOrWhiteSpace(groupId); + ArgumentException.ThrowIfNullOrWhiteSpace(edgeNodeId); + ArgumentException.ThrowIfNullOrWhiteSpace(deviceId); + + // Build the cache key for status tracking + var cacheKey = CacheHelper.BuildCacheKey(StatusKeyPrefix, groupId, edgeNodeId, deviceId); + var cacheTag = CacheHelper.BuildCacheKey(null, groupId, edgeNodeId, null); + + // Create a new status object + var newStatus = new EndpointStatus { IsOnline = isOnline, BdSeq = 0, Timestamp = timestamp }; + + // Use SemaphoreSlim for async thread safety + // Because when Edge Node is offline, all devices are also offline, so always use the Edge Node level semaphore + var semaphore = CacheHelper.GetSemaphore(groupId, edgeNodeId, null); + + try + { + // Wait for the semaphore asynchronously + await semaphore.WaitAsync(); + + // Get the current status from the cache or create a new entry if it doesn't exist + var currentStatus = await _cache.GetOrCreateAsync( + cacheKey, _ => ValueTask.FromResult(newStatus), tags: [cacheTag, StatusTag]); + + // Update the cache if the new status is newer than the current status + // Note: if the cache is empty, currentStatus will be set to newStatus by GetOrCreateAsync + if (newStatus.Timestamp > currentStatus.Timestamp) + await _cache.SetAsync(cacheKey, newStatus, tags: [cacheTag, StatusTag]); + } + finally + { + // Always release the semaphore to prevent deadlocks + semaphore.Release(); + } + } + + /// + public async Task ClearCacheAsync() + { + await _cache.RemoveByTagAsync(StatusTag); + } + + /// + /// Simple data class to store online status information + /// Allows for future expansion of status data beyond just a boolean flag + /// + [SuppressMessage("ReSharper", "MemberCanBePrivate.Global")] + protected record EndpointStatus + { + public bool IsOnline { get; init; } + + public int BdSeq { get; init; } + + public long Timestamp { get; init; } + } +} \ No newline at end of file diff --git a/SparklerNet/HostApplication/Extensions/SparkplugHostApplicationExtensions.cs b/SparklerNet/HostApplication/Extensions/SparkplugHostApplicationExtensions.cs index bd85bda..c7428a4 100644 --- a/SparklerNet/HostApplication/Extensions/SparkplugHostApplicationExtensions.cs +++ b/SparklerNet/HostApplication/Extensions/SparkplugHostApplicationExtensions.cs @@ -47,6 +47,51 @@ public static Task PublishDeviceRebirthCommandAsync( return hostApplication.PublishDeviceCommandMessageAsync(groupId, edgeNodeId, deviceId, payload); } + /// + /// Sends a Scan Rate command to a specific Edge Node + /// + /// The Sparkplug Host Application instance + /// The Sparkplug Group ID + /// The Sparkplug Edge Node ID + /// The scan rate value in milliseconds + /// The MQTT Client Publish Result + public static Task PublishEdgeNodeScanRateCommandAsync( + this SparkplugHostApplication hostApplication, string groupId, string edgeNodeId, long scanRate) + { + ArgumentNullException.ThrowIfNull(hostApplication); + SparkplugNamespace.ValidateNamespaceElement(groupId, nameof(groupId)); + SparkplugNamespace.ValidateNamespaceElement(edgeNodeId, nameof(edgeNodeId)); + if (scanRate <= 0) + throw new ArgumentOutOfRangeException(nameof(scanRate), scanRate, "Scan rate must be greater than zero."); + + var payload = CreateScanRatePayload(true, scanRate); + return hostApplication.PublishEdgeNodeCommandMessageAsync(groupId, edgeNodeId, payload); + } + + /// + /// Sends a Scan Rate command to a specific Device + /// + /// The Sparkplug Host Application instance + /// The Sparkplug Group ID + /// The Sparkplug Edge Node ID + /// The Sparkplug Device ID + /// The scan rate value in milliseconds + /// The MQTT Client Publish Result + public static Task PublishDeviceScanRateCommandAsync( + this SparkplugHostApplication hostApplication, string groupId, string edgeNodeId, string deviceId, + long scanRate) + { + ArgumentNullException.ThrowIfNull(hostApplication); + SparkplugNamespace.ValidateNamespaceElement(groupId, nameof(groupId)); + SparkplugNamespace.ValidateNamespaceElement(edgeNodeId, nameof(edgeNodeId)); + SparkplugNamespace.ValidateNamespaceElement(deviceId, nameof(deviceId)); + if (scanRate <= 0) + throw new ArgumentOutOfRangeException(nameof(scanRate), scanRate, "Scan rate must be greater than zero."); + + var payload = CreateScanRatePayload(false, scanRate); + return hostApplication.PublishDeviceCommandMessageAsync(groupId, edgeNodeId, deviceId, payload); + } + /// /// Creates a payload for Rebirth command /// @@ -68,4 +113,27 @@ private static Payload CreateRebirthPayload(bool isNodeCommand = true) } }; } + + /// + /// Creates a payload for Scan Rate command + /// + /// Indicates if this is a node command or device command + /// The scan rate value in milliseconds + /// Payload object configured for scan rate command + private static Payload CreateScanRatePayload(bool isNodeCommand, long value) + { + return new Payload + { + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Metrics = + { + new Metric + { + Name = isNodeCommand ? "Node Control/Scan Rate" : "Device Control/Scan Rate", + DataType = DataType.Int64, + Value = value + } + } + }; + } } \ No newline at end of file diff --git a/SparklerNet/HostApplication/SparkplugHostApplication.cs b/SparklerNet/HostApplication/SparkplugHostApplication.cs index b7078c5..7fae5cd 100644 --- a/SparklerNet/HostApplication/SparkplugHostApplication.cs +++ b/SparklerNet/HostApplication/SparkplugHostApplication.cs @@ -1,7 +1,5 @@ -using System.Diagnostics.CodeAnalysis; using System.Text.Json; using Google.Protobuf; -using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Logging; using MQTTnet; using MQTTnet.Formatter; @@ -32,24 +30,22 @@ namespace SparklerNet.HostApplication; public class SparkplugHostApplication { private readonly SparkplugMessageEvents _events = new(); - private readonly ILogger _logger; - private readonly MqttClientOptions _mqttOptions; - - [SuppressMessage("Performance", "CA1859:Use concrete types when possible for improved performance")] - private readonly IMessageOrderingService _msgOrderingService; - + private readonly IMessageOrderingService _orderingService; private readonly SparkplugClientOptions _sparkplugOptions; + private readonly IStatusTrackingService _trackingService; /// /// Create a new instance of the Sparkplug Host Application. /// /// The MQTT Client Options. /// The Sparkplug Client Options. + /// The Message Ordering Service. + /// The Status Tracking Service. /// The Logger Factory. - public SparkplugHostApplication(MqttClientOptions mqttOptions, SparkplugClientOptions sparkplugOptions, - ILoggerFactory loggerFactory) + public SparkplugHostApplication(MqttClientOptions mqttOptions, SparkplugClientOptions sparkplugOptions, + IMessageOrderingService orderingService, IStatusTrackingService trackingService, ILoggerFactory loggerFactory) { // Validate sparkplugOptions SparkplugNamespace.ValidateNamespaceElement(sparkplugOptions.HostApplicationId, @@ -57,8 +53,8 @@ public SparkplugHostApplication(MqttClientOptions mqttOptions, SparkplugClientOp _mqttOptions = mqttOptions; _sparkplugOptions = sparkplugOptions; - var memoryCache = new MemoryCache(new MemoryCacheOptions()); - _msgOrderingService = new MessageOrderingService(memoryCache, _sparkplugOptions, loggerFactory); + _orderingService = orderingService; + _trackingService = trackingService; _logger = loggerFactory.CreateLogger(); // Create a new MQTT client. @@ -70,8 +66,8 @@ public SparkplugHostApplication(MqttClientOptions mqttOptions, SparkplugClientOp MqttClient.DisconnectedAsync += HandleDisconnectedAsync; // Set the rebirth message request delegate and the pending messages processed delegate - _msgOrderingService.OnRebirthRequested = HandleRebirthRequested; - _msgOrderingService.OnPendingMessages = HandlePendingMessages; + _orderingService.OnRebirthRequested = HandleRebirthRequested; + _orderingService.OnPendingMessages = HandlePendingMessages; } // MQTT Client @@ -82,28 +78,27 @@ public SparkplugHostApplication(MqttClientOptions mqttOptions, SparkplugClientOp /// /// The group ID of the entity requiring rebirth /// The edge node ID of the entity requiring rebirth - /// The device ID of the entity requiring rebirth (optional) - private async Task HandleRebirthRequested(string groupId, string edgeNodeId, string? deviceId = null) + private async Task HandleRebirthRequested(string groupId, string edgeNodeId) { try { - if (deviceId != null) await this.PublishDeviceRebirthCommandAsync(groupId, edgeNodeId, deviceId); - else await this.PublishEdgeNodeRebirthCommandAsync(groupId, edgeNodeId); + await this.PublishEdgeNodeRebirthCommandAsync(groupId, edgeNodeId); } catch (Exception ex) { _logger.LogError(ex, - "Exception occurred while sending rebirth command. GroupId: {GroupId}, EdgeNodeId: {EdgeNodeId}, DeviceId: {DeviceId}", - groupId, edgeNodeId, deviceId); + "Exception occurred while sending rebirth command. GroupId: {GroupId}, EdgeNodeId: {EdgeNodeId}", + groupId, edgeNodeId); } } /// - /// Processes data messages (NDATA and DDATA) and invokes the appropriate event. - /// When the ProcessDisorderedMessages option is enabled, pending messages will be processed in the order they were - /// received. + /// Processes pending messages (NDATA DDATA DBIRTH & DDEATH) and invokes the appropriate event. + /// When the EnableMessageOrdering option is enabled, discontinuous messages will be cached until their sequence is + /// completed or a timeout is reached. When the EnableMessageOrdering option is disabled, messages will be processed + /// directly in the order they arrive. /// - /// The collection of NDATA and DDATA messages to process. + /// The collection of messages to process. private async Task HandlePendingMessages(IEnumerable messages) { foreach (var message in messages) @@ -114,6 +109,8 @@ private async Task HandlePendingMessages(IEnumerable { NDATA => _events.EdgeNodeDataReceivedEvent.InvokeAsync(message), DDATA => _events.DeviceDataReceivedEvent.InvokeAsync(message), + DBIRTH => _events.DeviceBirthReceivedEvent.InvokeAsync(message), + DDEATH => _events.DeviceDeathReceivedEvent.InvokeAsync(message), _ => Task.CompletedTask // Other message types are ignored }); } @@ -165,8 +162,9 @@ private async Task HandlePendingMessages(IEnumerable _logger.LogWarning(ex, "Exception occurred while handling client connected event"); } - _logger.LogInformation("Successfully started Sparkplug Host Application {HostApplicationId}.", - _sparkplugOptions.HostApplicationId); + _logger.LogInformation( + "Successfully started Sparkplug Host Application {HostApplicationId} with MQTT client id {ClientId}.", + _sparkplugOptions.HostApplicationId, _mqttOptions.ClientId); return (connectResult, subscribeResult); } @@ -184,12 +182,10 @@ public async Task StopAsync() await PublishStateMessageAsync(false, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); await MqttClient.DisconnectAsync(); - // If the ProcessDisorderedMessages option is enabled, clear the cache and process pending messages - if (_sparkplugOptions.EnableMessageOrdering) - { - var pendingMessages = _msgOrderingService.GetAllMessagesAndClearCache(); - await HandlePendingMessages(pendingMessages); - } + // Clear the caches + if (_sparkplugOptions.EnableMessageOrdering) await _orderingService.ClearCacheAsync(); + if (_sparkplugOptions.EnableStatusTracking) await _trackingService.ClearCacheAsync(); + CacheHelper.ClearSemaphores(); _logger.LogInformation("Successfully stopped Sparkplug Host Application {HostApplicationId}.", _sparkplugOptions.HostApplicationId); @@ -261,18 +257,17 @@ protected async Task ConnectAsync(long timestamp) /// The task result contains the MQTT subscribe result. protected async Task SubscribeAsync() { - // Remove the self (STATE) subscription if present. + // Remove the Sparkplug specification related subscriptions if present. var stateTopic = SparkplugTopicFactory.CreateStateTopic( _sparkplugOptions.Version, _sparkplugOptions.HostApplicationId); - _sparkplugOptions.Subscriptions.RemoveAll(topicFilter => topicFilter.Topic == stateTopic); + var spBTopic = SparkplugTopicFactory.CreateSparkplugWildcardTopic(_sparkplugOptions.Version); + _sparkplugOptions.Subscriptions.RemoveAll(topicFilter => + topicFilter.Topic == stateTopic || topicFilter.Topic == spBTopic); - // Add the default Sparkplug wildcard subscription if the subscriptions option is empty. - if (_sparkplugOptions.Subscriptions.Count == 0) - { - var spBTopic = SparkplugTopicFactory.CreateSparkplugWildcardTopic(_sparkplugOptions.Version); + // Add the default Sparkplug wildcard subscription if AlwaysSubscribeToWildcardTopic is set to true or the subscriptions are empty. + if (_sparkplugOptions.AlwaysSubscribeToWildcardTopic || _sparkplugOptions.Subscriptions.Count == 0) _sparkplugOptions.Subscriptions.Add( new MqttTopicFilterBuilder().WithTopic(spBTopic).WithAtLeastOnceQoS().Build()); - } // Add the self (STATE) subscription. _sparkplugOptions.Subscriptions.Add( @@ -428,8 +423,7 @@ await _events.StateReceivedEvent.InvokeAsync( var protoPayload = ProtoPayload.Parser.ParseFrom(eventArgs.ApplicationMessage.Payload); var payload = protoPayload.ToPayload(); - var message = new SparkplugMessageEventArgs(version, messageType, groupId!, edgeNodeId!, deviceId, payload, - eventArgs); + var message = new SparkplugMessageEventArgs(version, messageType, groupId!, edgeNodeId!, deviceId, payload); // Process messages based on the message type await (messageType switch @@ -470,7 +464,7 @@ private async Task ProcessDataMessagesAsync(SparkplugMessageEventArgs message) { // Initialize the pending messages based on the message ordering configuration var messages = _sparkplugOptions.EnableMessageOrdering - ? _msgOrderingService.ProcessMessageOrder(message) + ? await _orderingService.ProcessMessageOrderAsync(message) : [message]; // Process all messages in order @@ -487,27 +481,49 @@ private async Task ProcessDataMessagesAsync(SparkplugMessageEventArgs message) /// A task that represents the asynchronous operation. private async Task ProcessBirthDeathMessagesAsync(SparkplugMessageEventArgs message) { - if (_sparkplugOptions.EnableMessageOrdering) - // Clear message order cache for the edge node or device - _msgOrderingService.ClearMessageOrder(message.GroupId, message.EdgeNodeId, message.DeviceId); - - try + if (message.MessageType is NBIRTH or NDEATH) { - // Raise the appropriate event - await (message.MessageType switch + if (_sparkplugOptions.EnableStatusTracking) + // Update the Edge Node online status + await _trackingService.UpdateEdgeNodeOnlineStatus(message.GroupId, message.EdgeNodeId, + message.MessageType == NBIRTH, message.Payload.GetBdSeq(), message.Payload.Timestamp); + + if (_sparkplugOptions.EnableMessageOrdering) + // Reset the message order cache for the edge node + await _orderingService.ResetMessageOrderAsync(message.GroupId, message.EdgeNodeId); + + try { - NBIRTH => _events.EdgeNodeBirthReceivedEvent.InvokeAsync(message), - NDEATH => _events.EdgeNodeDeathReceivedEvent.InvokeAsync(message), - DBIRTH => _events.DeviceBirthReceivedEvent.InvokeAsync(message), - DDEATH => _events.DeviceDeathReceivedEvent.InvokeAsync(message), - _ => Task.CompletedTask // This case should never be reached due to the method's caller check - }); + // Raise the appropriate event + await (message.MessageType switch + { + NBIRTH => _events.EdgeNodeBirthReceivedEvent.InvokeAsync(message), + NDEATH => _events.EdgeNodeDeathReceivedEvent.InvokeAsync(message), + _ => Task.CompletedTask // This case should never be reached due to the method's caller check + }); + } + catch (Exception ex) + { + _logger.LogWarning(ex, + "Exception occurred while handling message received event. GroupId: {GroupId}, EdgeNodeId: {EdgeNodeId}, DeviceId: {DeviceId}", + message.GroupId, message.EdgeNodeId, message.DeviceId); + } } - catch (Exception ex) + + if (message.MessageType is DBIRTH or DDEATH) { - _logger.LogWarning(ex, - "Exception occurred while handling message received event. GroupId: {GroupId}, EdgeNodeId: {EdgeNodeId}, DeviceId: {DeviceId}", - message.GroupId, message.EdgeNodeId, message.DeviceId); + if (_sparkplugOptions.EnableStatusTracking) + // Update the Device online status + await _trackingService.UpdateDeviceOnlineStatus(message.GroupId, message.EdgeNodeId, message.DeviceId!, + message.MessageType == DBIRTH, message.Payload.Timestamp); + + // Initialize the pending messages based on the message ordering configuration + var messages = _sparkplugOptions.EnableMessageOrdering + ? await _orderingService.ProcessMessageOrderAsync(message) + : [message]; + + // Process all messages in order + await HandlePendingMessages(messages); } } diff --git a/SparklerNet/SparklerNet.csproj b/SparklerNet/SparklerNet.csproj index 83ef2c0..3494d1e 100644 --- a/SparklerNet/SparklerNet.csproj +++ b/SparklerNet/SparklerNet.csproj @@ -7,7 +7,7 @@ enable - 0.9.6 + 0.9.7 @@ -34,6 +34,7 @@ +