diff --git a/GIT_FLOW.md b/.spec/GIT_FLOW.md similarity index 100% rename from GIT_FLOW.md rename to .spec/GIT_FLOW.md diff --git a/README.md b/README.md index c339ff9..63e6939 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ The library aims to fully implement the complete Sparkplug protocol, with planne - ✅ Default wildcard topic support (spBv1.0/#) - ✅ Specific group and edge node subscription support - ✅ Sparkplug Host Application Message Ordering -- ⬜ Cache Edge Node and Device birth certificates +- ✅ Cache Edge Node and Device online status ### Message Processing @@ -109,6 +109,15 @@ Or reference it directly in your project: Here's a simple example of a Sparkplug host application: ```csharp +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using MQTTnet; +using SparklerNet.Core.Constants; +using SparklerNet.Core.Events; +using SparklerNet.HostApplication; +using SparklerNet.HostApplication.Caches; +using SparklerNet.HostApplication.Extensions; + // Create MQTT client options var mqttOptions = new MqttClientOptionsBuilder() .WithTcpServer("localhost", 1883) @@ -122,15 +131,36 @@ var sparkplugOptions = new SparkplugClientOptions HostApplicationId = "MyHostApplication" }; -// Create logger -var loggerFactory = LoggerFactory.Create(builder => +// Create the logger factory +using var loggerFactory = LoggerFactory.Create(builder => { builder.AddConsole(); + builder.SetMinimumLevel(LogLevel.Information); }); -var logger = loggerFactory.CreateLogger(); -// Create and start host application -var hostApplication = new SparkplugHostApplication(mqttOptions, sparkplugOptions, logger); +// Create the dependency injection container +var services = new ServiceCollection(); + +// Register the singleton services +services.AddSingleton(mqttOptions); +services.AddSingleton(sparkplugOptions); +services.AddSingleton(loggerFactory); +services.AddSingleton(loggerFactory); + +// Register cache services +services.AddMemoryCache(); +services.AddHybridCache(); + +// Register the SparklerNet services +services.AddSingleton(); +services.AddSingleton(); +services.AddSingleton(); + +// Build service provider +var serviceProvider = services.BuildServiceProvider(); + +// Resolve SparkplugHostApplication from the container +var hostApplication = serviceProvider.GetRequiredService(); // Subscribe to DBIRTH event hostApplication.DeviceBirthReceivedAsync += args => { @@ -151,17 +181,16 @@ await hostApplication.StopAsync(); ## Sample Application -The project includes a comprehensive sample named `SimpleHostApplication` that demonstrates a complete Sparkplug Host Application implementation with the following features: +The project includes a sample named `SimpleHostApplication` demonstrating a complete Sparkplug Host Application implementation with these core features: -- **Interactive Command-Line Interface**: Provides a user-friendly console interface with commands for controlling the application lifecycle and sending commands -- **Complete Event Handling**: Demonstrates subscription and processing of all Sparkplug message types (NBIRTH, NDATA, NDEATH, DBIRTH, DDATA, DDEATH, STATE) -- **Robust Error Handling**: Includes comprehensive exception handling throughout the application lifecycle -- **Advanced Logging System**: Implements structured logging using Serilog, providing detailed information about message reception and processing -- **Command Sending Capabilities**: Allows sending rebirth commands to both Edge Nodes and Devices with customizable parameters -- **User-Friendly Input**: Features command prompts with default values for improved user experience -- **Detailed Data Display**: Shows comprehensive information about received messages including timestamps, sequences, and all metrics with their types and values +- **Interactive CLI**: User-friendly console interface for application lifecycle management and command sending +- **Full Event Processing**: Handles all Sparkplug message types with detailed data display +- **Command Capabilities**: Sends Rebirth and ScanRate commands to Edge Nodes (NCMD) and Devices (DCMD) +- **Configuration Profiles**: Supports multiple profiles (mimic, tck) via `--profile` command line argument +- **Dependency Injection**: Uses Microsoft.Extensions.DependencyInjection for service management +- **Structured Logging**: Implements Serilog for detailed message and processing logging -Please refer to the `SparklerNet.Samples` project for the complete implementation and to see these features in action. +Refer to the `SparklerNet.Samples` project for the complete implementation. ## Project Structure @@ -190,6 +219,10 @@ Please refer to the `SparklerNet.Samples` project for the complete implementatio │ │ └── AssemblyInfo.cs # Assembly information │ └── SparklerNet.csproj # Core library project file ├── SparklerNet.Samples/ # Sample application +│ ├── Profiles/ # Configuration profiles +│ │ ├── IProfile.cs # Profile interface +│ │ ├── MimicApplicationProfile.cs # Mimic application profile +│ │ └── TCKApplicationProfile.cs # TCK application profile │ ├── Program.cs # Sample program entry point │ ├── SimpleHostApplication.cs # Simple host application implementation │ └── SparklerNet.Samples.csproj # Sample project file @@ -223,21 +256,13 @@ SparklerNet supports the following Sparkplug B message types: ## Dependencies - Google.Protobuf (3.33.0) +- Microsoft.Extensions.Caching.Hybrid (9.10.0) - Microsoft.Extensions.Caching.Memory (9.0.10) - Microsoft.Extensions.Logging (9.0.10) - MQTTnet (5.0.1.1416) - System.Net.Http (4.3.4) - System.Text.RegularExpressions (4.3.1) -## Contribution Guidelines - -Contributions via Pull Requests and Issues are welcome. Before submitting code, please ensure: - -1. Follow the project's code style and [Git Flow](GIT_FLOW.md) -2. Add necessary tests -3. Ensure all tests pass -4. Provide detailed code explanations - ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. diff --git a/SparklerNet/Core/Options/SparkplugClientOptions.cs b/SparklerNet/Core/Options/SparkplugClientOptions.cs index afd66a8..d222fd8 100644 --- a/SparklerNet/Core/Options/SparkplugClientOptions.cs +++ b/SparklerNet/Core/Options/SparkplugClientOptions.cs @@ -39,6 +39,14 @@ public record SparkplugClientOptions /// public bool AlwaysSubscribeToWildcardTopic { get; set; } + /// + /// Whether to enable the status tracking mechanism. When enabled, the Sparkplug Host Application will track the status + /// of each Edge Node and Device. Users can more conveniently check the online status of Edge Nodes and Devices without + /// having to implement this logic themselves. + /// The default value is true. + /// + public bool EnableStatusTracking { get; set; } = true; + /// /// Whether to enable the message ordering mechanism. The specification requires Sparkplug Host Application to ensure /// that all messages arrive within a Reorder Timeout. diff --git a/SparklerNet/HostApplication/Caches/Readme.md b/SparklerNet/HostApplication/Caches/MessageOrderingService.md similarity index 85% rename from SparklerNet/HostApplication/Caches/Readme.md rename to SparklerNet/HostApplication/Caches/MessageOrderingService.md index d1170cb..2d1a9fc 100644 --- a/SparklerNet/HostApplication/Caches/Readme.md +++ b/SparklerNet/HostApplication/Caches/MessageOrderingService.md @@ -110,33 +110,23 @@ OnReorderTimeout(object? state) ### Reorder Clearing Flow ``` -ClearMessageOrder(groupId, edgeNodeId, deviceId) +ResetMessageOrderAsync(groupId, edgeNodeId) ├── Build all required cache keys (seqKey, pendingKey, timerKey) -├── Acquire fine-grained lock for thread safety on specific device/node combination +├── Acquire fine-grained lock for thread safety ├── Remove sequence number cache entry ├── Remove pending messages cache entry -├── Remove entry from _cachedSeqKeys and _cachedPendingKeys collections ├── Remove and dispose timer if it exists └── Exit lock ``` -### Get All Messages and Clear Cache Flow +### Clear Cache Flow ``` -GetAllMessagesAndClearCache() -├── Obtain snapshot of timer keys to avoid concurrent modification exceptions -├── Dispose all reorder timers to prevent callbacks during cache clearing -│ ├── Try to remove each timer from _reorderTimers collection -│ ├── Call Dispose() on each removed timer -├── Initialize result list for all cached messages -├── Obtain snapshot of pending cache keys to avoid concurrent modification exceptions -├── Process each pending key -│ ├── Try to get pending messages from cache -│ ├── Add all pending messages to result list -│ ├── Remove pending messages from cache -├── Clear _cachedPendingKeys collection -├── Obtain snapshot of sequence cache keys to avoid concurrent modification exceptions -├── Remove each sequence key from cache -├── Clear _cachedSeqKeys collection -└── Return result list containing all previously cached messages +ClearCacheAsync() +├── Remove all message ordering related cache entries using the global tag +├── Dispose all reorder timers to prevent memory leaks +│ ├── Iterate through all timers in _reorderTimers collection +│ ├── Call DisposeAsync() on each timer +│ └── Clear the _reorderTimers collection +└── Return completion task ``` diff --git a/SparklerNet/HostApplication/SparkplugHostApplication.cs b/SparklerNet/HostApplication/SparkplugHostApplication.cs index 6747a34..7fae5cd 100644 --- a/SparklerNet/HostApplication/SparkplugHostApplication.cs +++ b/SparklerNet/HostApplication/SparkplugHostApplication.cs @@ -163,7 +163,7 @@ private async Task HandlePendingMessages(IEnumerable } _logger.LogInformation( - "Successfully started Sparkplug Host Application {HostApplicationId} with MQTT clint id {ClientId}.", + "Successfully started Sparkplug Host Application {HostApplicationId} with MQTT client id {ClientId}.", _sparkplugOptions.HostApplicationId, _mqttOptions.ClientId); return (connectResult, subscribeResult); } @@ -183,8 +183,8 @@ public async Task StopAsync() await MqttClient.DisconnectAsync(); // Clear the caches - await _orderingService.ClearCacheAsync(); - await _trackingService.ClearCacheAsync(); + if (_sparkplugOptions.EnableMessageOrdering) await _orderingService.ClearCacheAsync(); + if (_sparkplugOptions.EnableStatusTracking) await _trackingService.ClearCacheAsync(); CacheHelper.ClearSemaphores(); _logger.LogInformation("Successfully stopped Sparkplug Host Application {HostApplicationId}.", @@ -483,9 +483,10 @@ private async Task ProcessBirthDeathMessagesAsync(SparkplugMessageEventArgs mess { if (message.MessageType is NBIRTH or NDEATH) { - // Update the Edge Node online status - await _trackingService.UpdateEdgeNodeOnlineStatus(message.GroupId, message.EdgeNodeId, - message.MessageType == NBIRTH, message.Payload.GetBdSeq(), message.Payload.Timestamp); + if (_sparkplugOptions.EnableStatusTracking) + // Update the Edge Node online status + await _trackingService.UpdateEdgeNodeOnlineStatus(message.GroupId, message.EdgeNodeId, + message.MessageType == NBIRTH, message.Payload.GetBdSeq(), message.Payload.Timestamp); if (_sparkplugOptions.EnableMessageOrdering) // Reset the message order cache for the edge node @@ -511,9 +512,10 @@ await _trackingService.UpdateEdgeNodeOnlineStatus(message.GroupId, message.EdgeN if (message.MessageType is DBIRTH or DDEATH) { - // Update the Device online status - await _trackingService.UpdateDeviceOnlineStatus(message.GroupId, message.EdgeNodeId, message.DeviceId!, - message.MessageType == DBIRTH, message.Payload.Timestamp); + if (_sparkplugOptions.EnableStatusTracking) + // Update the Device online status + await _trackingService.UpdateDeviceOnlineStatus(message.GroupId, message.EdgeNodeId, message.DeviceId!, + message.MessageType == DBIRTH, message.Payload.Timestamp); // Initialize the pending messages based on the message ordering configuration var messages = _sparkplugOptions.EnableMessageOrdering diff --git a/SparklerNet/SparklerNet.csproj b/SparklerNet/SparklerNet.csproj index fab88a5..3494d1e 100644 --- a/SparklerNet/SparklerNet.csproj +++ b/SparklerNet/SparklerNet.csproj @@ -7,7 +7,7 @@ enable - 0.9.7-dev + 0.9.7