Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
73 changes: 49 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ The library aims to fully implement the complete Sparkplug protocol, with planne
- ✅ Default wildcard topic support (spBv1.0/#)
- ✅ Specific group and edge node subscription support
- ✅ Sparkplug Host Application Message Ordering
- Cache Edge Node and Device birth certificates
- Cache Edge Node and Device online status

### Message Processing

Expand Down Expand Up @@ -109,6 +109,15 @@ Or reference it directly in your project:
Here's a simple example of a Sparkplug host application:

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using MQTTnet;
using SparklerNet.Core.Constants;
using SparklerNet.Core.Events;
using SparklerNet.HostApplication;
using SparklerNet.HostApplication.Caches;
using SparklerNet.HostApplication.Extensions;

// Create MQTT client options
var mqttOptions = new MqttClientOptionsBuilder()
.WithTcpServer("localhost", 1883)
Expand All @@ -122,15 +131,36 @@ var sparkplugOptions = new SparkplugClientOptions
HostApplicationId = "MyHostApplication"
};

// Create logger
var loggerFactory = LoggerFactory.Create(builder =>
// Create the logger factory
using var loggerFactory = LoggerFactory.Create(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Information);
});
var logger = loggerFactory.CreateLogger<SparkplugHostApplication>();

// Create and start host application
var hostApplication = new SparkplugHostApplication(mqttOptions, sparkplugOptions, logger);
// Create the dependency injection container
var services = new ServiceCollection();

// Register the singleton services
services.AddSingleton(mqttOptions);
services.AddSingleton(sparkplugOptions);
services.AddSingleton(loggerFactory);
services.AddSingleton<ILoggerFactory>(loggerFactory);

// Register cache services
services.AddMemoryCache();
services.AddHybridCache();

// Register the SparklerNet services
services.AddSingleton<IMessageOrderingService, MessageOrderingService>();
services.AddSingleton<IStatusTrackingService, StatusTrackingService>();
services.AddSingleton<SparkplugHostApplication>();

// Build service provider
var serviceProvider = services.BuildServiceProvider();

// Resolve SparkplugHostApplication from the container
var hostApplication = serviceProvider.GetRequiredService<SparkplugHostApplication>();

// Subscribe to DBIRTH event
hostApplication.DeviceBirthReceivedAsync += args => {
Expand All @@ -151,17 +181,16 @@ await hostApplication.StopAsync();

## Sample Application

The project includes a comprehensive sample named `SimpleHostApplication` that demonstrates a complete Sparkplug Host Application implementation with the following features:
The project includes a sample named `SimpleHostApplication` demonstrating a complete Sparkplug Host Application implementation with these core features:

- **Interactive Command-Line Interface**: Provides a user-friendly console interface with commands for controlling the application lifecycle and sending commands
- **Complete Event Handling**: Demonstrates subscription and processing of all Sparkplug message types (NBIRTH, NDATA, NDEATH, DBIRTH, DDATA, DDEATH, STATE)
- **Robust Error Handling**: Includes comprehensive exception handling throughout the application lifecycle
- **Advanced Logging System**: Implements structured logging using Serilog, providing detailed information about message reception and processing
- **Command Sending Capabilities**: Allows sending rebirth commands to both Edge Nodes and Devices with customizable parameters
- **User-Friendly Input**: Features command prompts with default values for improved user experience
- **Detailed Data Display**: Shows comprehensive information about received messages including timestamps, sequences, and all metrics with their types and values
- **Interactive CLI**: User-friendly console interface for application lifecycle management and command sending
- **Full Event Processing**: Handles all Sparkplug message types with detailed data display
- **Command Capabilities**: Sends Rebirth and ScanRate commands to Edge Nodes (NCMD) and Devices (DCMD)
- **Configuration Profiles**: Supports multiple profiles (mimic, tck) via `--profile` command line argument
- **Dependency Injection**: Uses Microsoft.Extensions.DependencyInjection for service management
- **Structured Logging**: Implements Serilog for detailed message and processing logging

Please refer to the `SparklerNet.Samples` project for the complete implementation and to see these features in action.
Refer to the `SparklerNet.Samples` project for the complete implementation.

## Project Structure

Expand Down Expand Up @@ -190,6 +219,10 @@ Please refer to the `SparklerNet.Samples` project for the complete implementatio
│ │ └── AssemblyInfo.cs # Assembly information
│ └── SparklerNet.csproj # Core library project file
├── SparklerNet.Samples/ # Sample application
│ ├── Profiles/ # Configuration profiles
│ │ ├── IProfile.cs # Profile interface
│ │ ├── MimicApplicationProfile.cs # Mimic application profile
│ │ └── TCKApplicationProfile.cs # TCK application profile
│ ├── Program.cs # Sample program entry point
│ ├── SimpleHostApplication.cs # Simple host application implementation
│ └── SparklerNet.Samples.csproj # Sample project file
Expand Down Expand Up @@ -223,21 +256,13 @@ SparklerNet supports the following Sparkplug B message types:
## Dependencies

- Google.Protobuf (3.33.0)
- Microsoft.Extensions.Caching.Hybrid (9.10.0)
- Microsoft.Extensions.Caching.Memory (9.0.10)
- Microsoft.Extensions.Logging (9.0.10)
- MQTTnet (5.0.1.1416)
- System.Net.Http (4.3.4)
- System.Text.RegularExpressions (4.3.1)

## Contribution Guidelines

Contributions via Pull Requests and Issues are welcome. Before submitting code, please ensure:

1. Follow the project's code style and [Git Flow](GIT_FLOW.md)
2. Add necessary tests
3. Ensure all tests pass
4. Provide detailed code explanations

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
8 changes: 8 additions & 0 deletions SparklerNet/Core/Options/SparkplugClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ public record SparkplugClientOptions
/// </summary>
public bool AlwaysSubscribeToWildcardTopic { get; set; }

/// <summary>
/// Whether to enable the status tracking mechanism. When enabled, the Sparkplug Host Application will track the status
/// of each Edge Node and Device. Users can more conveniently check the online status of Edge Nodes and Devices without
/// having to implement this logic themselves.
/// The default value is true.
/// </summary>
public bool EnableStatusTracking { get; set; } = true;

/// <summary>
/// Whether to enable the message ordering mechanism. The specification requires Sparkplug Host Application to ensure
/// that all messages arrive within a Reorder Timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,33 +110,23 @@ OnReorderTimeout(object? state)
### Reorder Clearing Flow

```
ClearMessageOrder(groupId, edgeNodeId, deviceId)
ResetMessageOrderAsync(groupId, edgeNodeId)
├── Build all required cache keys (seqKey, pendingKey, timerKey)
├── Acquire fine-grained lock for thread safety on specific device/node combination
├── Acquire fine-grained lock for thread safety
├── Remove sequence number cache entry
├── Remove pending messages cache entry
├── Remove entry from _cachedSeqKeys and _cachedPendingKeys collections
├── Remove and dispose timer if it exists
└── Exit lock
```

### Get All Messages and Clear Cache Flow
### Clear Cache Flow

```
GetAllMessagesAndClearCache()
├── Obtain snapshot of timer keys to avoid concurrent modification exceptions
├── Dispose all reorder timers to prevent callbacks during cache clearing
│ ├── Try to remove each timer from _reorderTimers collection
│ ├── Call Dispose() on each removed timer
├── Initialize result list for all cached messages
├── Obtain snapshot of pending cache keys to avoid concurrent modification exceptions
├── Process each pending key
│ ├── Try to get pending messages from cache
│ ├── Add all pending messages to result list
│ ├── Remove pending messages from cache
├── Clear _cachedPendingKeys collection
├── Obtain snapshot of sequence cache keys to avoid concurrent modification exceptions
├── Remove each sequence key from cache
├── Clear _cachedSeqKeys collection
└── Return result list containing all previously cached messages
ClearCacheAsync()
├── Remove all message ordering related cache entries using the global tag
├── Dispose all reorder timers to prevent memory leaks
│ ├── Iterate through all timers in _reorderTimers collection
│ ├── Call DisposeAsync() on each timer
│ └── Clear the _reorderTimers collection
└── Return completion task
```
20 changes: 11 additions & 9 deletions SparklerNet/HostApplication/SparkplugHostApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private async Task HandlePendingMessages(IEnumerable<SparkplugMessageEventArgs>
}

_logger.LogInformation(
"Successfully started Sparkplug Host Application {HostApplicationId} with MQTT clint id {ClientId}.",
"Successfully started Sparkplug Host Application {HostApplicationId} with MQTT client id {ClientId}.",
_sparkplugOptions.HostApplicationId, _mqttOptions.ClientId);
return (connectResult, subscribeResult);
}
Expand All @@ -183,8 +183,8 @@ public async Task StopAsync()
await MqttClient.DisconnectAsync();

// Clear the caches
await _orderingService.ClearCacheAsync();
await _trackingService.ClearCacheAsync();
if (_sparkplugOptions.EnableMessageOrdering) await _orderingService.ClearCacheAsync();
if (_sparkplugOptions.EnableStatusTracking) await _trackingService.ClearCacheAsync();
CacheHelper.ClearSemaphores();

_logger.LogInformation("Successfully stopped Sparkplug Host Application {HostApplicationId}.",
Expand Down Expand Up @@ -483,9 +483,10 @@ private async Task ProcessBirthDeathMessagesAsync(SparkplugMessageEventArgs mess
{
if (message.MessageType is NBIRTH or NDEATH)
{
// Update the Edge Node online status
await _trackingService.UpdateEdgeNodeOnlineStatus(message.GroupId, message.EdgeNodeId,
message.MessageType == NBIRTH, message.Payload.GetBdSeq(), message.Payload.Timestamp);
if (_sparkplugOptions.EnableStatusTracking)
// Update the Edge Node online status
await _trackingService.UpdateEdgeNodeOnlineStatus(message.GroupId, message.EdgeNodeId,
message.MessageType == NBIRTH, message.Payload.GetBdSeq(), message.Payload.Timestamp);

if (_sparkplugOptions.EnableMessageOrdering)
// Reset the message order cache for the edge node
Expand All @@ -511,9 +512,10 @@ await _trackingService.UpdateEdgeNodeOnlineStatus(message.GroupId, message.EdgeN

if (message.MessageType is DBIRTH or DDEATH)
{
// Update the Device online status
await _trackingService.UpdateDeviceOnlineStatus(message.GroupId, message.EdgeNodeId, message.DeviceId!,
message.MessageType == DBIRTH, message.Payload.Timestamp);
if (_sparkplugOptions.EnableStatusTracking)
// Update the Device online status
await _trackingService.UpdateDeviceOnlineStatus(message.GroupId, message.EdgeNodeId, message.DeviceId!,
message.MessageType == DBIRTH, message.Payload.Timestamp);

// Initialize the pending messages based on the message ordering configuration
var messages = _sparkplugOptions.EnableMessageOrdering
Expand Down
2 changes: 1 addition & 1 deletion SparklerNet/SparklerNet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<Nullable>enable</Nullable>

<!-- Version Information -->
<Version>0.9.7-dev</Version>
<Version>0.9.7</Version>

<!-- NuGet Package Configuration -->
<!-- Basic Info -->
Expand Down
Loading