diff --git a/util/PerfTest/PerfTest.cs b/util/PerfTest/PerfTest.cs
index 4439973..c310a92 100644
--- a/util/PerfTest/PerfTest.cs
+++ b/util/PerfTest/PerfTest.cs
@@ -1,61 +1,361 @@
+using System.Buffers;
using System.Collections.Concurrent;
+using System.Diagnostics;
+using Microsoft.Extensions.Caching.Distributed;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
using NATS.Client.Core;
+using NATS.Client.KeyValueStore;
+using NATS.Net;
namespace CodeCargo.NatsDistributedCache.PerfTest;
+///
+/// Performance test for NatsCache that tracks cache operations
+///
public class PerfTest
{
+ // Configuration
+ private const int MaxTestRuntimeSecs = 60;
+ private const int NumCacheItems = 10_000;
+ private const int FieldWidth = 12;
+ private const int BatchSize = 100;
+ private const int ValueDataSizeBytes = 256;
+ private const int AbsoluteExpirationSecs = 10;
+ private const int InsertDelayMs = 10;
+ private const int RetrieveDelayMs = 5;
+ private const int RemoveDelayMs = 50;
+ private const int StatsUpdateIntervalMs = 500;
+ private const int StatsPrintIntervalMs = 1000;
+
+ // Dependencies
private readonly INatsConnection _nats;
+ private readonly NatsCache _cache;
+
+ // Stats tracking
+ private readonly Stopwatch _stopwatch = new();
private readonly ConcurrentDictionary _stats = new();
+ private readonly List _activeKeys = [];
+ private readonly Random _random = new(42); // Fixed seed for reproducibility
+
+ // Operation tracking via kvstore watcher
+ private readonly SemaphoreSlim _watchLock = new(1, 1);
+ private NatsKVStore? _kvStore;
public PerfTest(INatsConnection nats)
{
_nats = nats;
+ var options = Options.Create(new NatsCacheOptions
+ {
+ BucketName = "cache"
+ });
+
+ _cache = new NatsCache(options, NullLogger.Instance, nats);
+
// Initialize statistics counters
_stats["KeysInserted"] = 0;
_stats["KeysRetrieved"] = 0;
+ _stats["KeysRemoved"] = 0;
_stats["KeysExpired"] = 0;
}
public async Task Run(CancellationToken cancellationToken)
{
- var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ Console.WriteLine($"Starting CodeCargo NatsDistributedCache Performance Test - {DateTime.Now}");
+ Console.WriteLine($"Testing with {NumCacheItems} cache items");
+
+ using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ cts.CancelAfter(TimeSpan.FromSeconds(MaxTestRuntimeSecs));
+
+ // Create and connect the watcher to track operations
+ await InitializeWatcher(cts.Token);
- // Create tasks for printing stats and watching metrics
+ // Start timer
+ _stopwatch.Start();
+
+ // Create tasks for the test operations
var printTask = StartPrintTask(cts.Token);
var watchTask = StartWatchTask(cts.Token);
+ var insertTask = StartInsertTask(cts.Token);
+ var retrieveTask = StartRetrieveTask(cts.Token);
+ var removeTask = StartRemoveTask(cts.Token);
+
+ try
+ {
+ // Wait for all tasks to complete or cancellation
+ await Task.WhenAll(watchTask, insertTask, retrieveTask, removeTask);
+ }
+ catch (OperationCanceledException)
+ {
+ // Expected when test completes
+ }
+ finally
+ {
+ // Stop stopwatch
+ _stopwatch.Stop();
+
+ // Make sure print task completes
+ await cts.CancelAsync();
+ try
+ {
+ await printTask;
+ }
+ catch (OperationCanceledException)
+ {
+ // Ignore
+ }
+
+ PrintFinalStats();
+ }
+ }
- // todo: test logic
- await Task.Delay(TimeSpan.FromSeconds(5), cts.Token);
+ private static string FormatElapsedTime(TimeSpan elapsed) =>
+ $"{Math.Floor(elapsed.TotalMinutes):00}:{elapsed.Seconds:00}.{elapsed.Milliseconds / 10:00}";
- await cts.CancelAsync();
+ private byte[] GenerateRandomData(int size)
+ {
+ var data = new byte[size];
+ _random.NextBytes(data);
+ return data;
+ }
- // Wait for all tasks to complete or cancellation
- await Task.WhenAll(printTask, watchTask);
+ private async Task InitializeWatcher(CancellationToken ct)
+ {
+ await _watchLock.WaitAsync(ct);
+ try
+ {
+ if (_kvStore == null)
+ {
+ var jsContext = _nats.CreateJetStreamContext();
+ var kvContext = new NatsKVContext(jsContext);
+ _kvStore = (NatsKVStore)await kvContext.GetStoreAsync("cache", ct);
+ }
+ }
+ finally
+ {
+ _watchLock.Release();
+ }
}
- private static Task StartWatchTask(CancellationToken ct) =>
+ private Task StartInsertTask(CancellationToken ct) =>
+ Task.Run(
+ async () =>
+ {
+ try
+ {
+ for (var i = 0; i < NumCacheItems && !ct.IsCancellationRequested; i++)
+ {
+ // Create a batch of items
+ for (var j = 0; j < BatchSize && i + j < NumCacheItems && !ct.IsCancellationRequested; j++)
+ {
+ var key = $"BatchNum{i}IndividualNum{j}";
+ var data = GenerateRandomData(ValueDataSizeBytes);
+
+ try
+ {
+ var options = new DistributedCacheEntryOptions
+ {
+ AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(AbsoluteExpirationSecs),
+ };
+
+ await _cache.SetAsync(key, data, options, ct);
+
+ lock (_activeKeys)
+ {
+ _activeKeys.Add(key);
+ }
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ Console.WriteLine($"Insert error: {ex.Message}");
+ }
+ }
+
+ await Task.Delay(InsertDelayMs, ct);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // Ignore cancellation
+ }
+ },
+ ct);
+
+ private Task StartRetrieveTask(CancellationToken ct) =>
Task.Run(
async () =>
{
try
{
+ // Wait a bit before starting retrieval operations
+ await Task.Delay(1000, ct);
+
while (!ct.IsCancellationRequested)
{
- // TODO: Implement watching for cache operations and updating stats
+ await ProcessSingleRetrievalAsync(ct);
+ await Task.Delay(RetrieveDelayMs, ct);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // Ignore cancellation
+ }
+ },
+ ct);
+
+ private async Task ProcessSingleRetrievalAsync(CancellationToken ct)
+ {
+ var (key, index) = GetRandomActiveKey();
+ if (key == null)
+ return;
+
+ try
+ {
+ var result = await _cache.GetAsync(key, ct);
+ _stats.AddOrUpdate("KeysRetrieved", 1, (_, count) => count + 1);
+
+ if (result == null)
+ {
+ _stats.AddOrUpdate("KeysExpired", 1, (_, count) => count + 1);
+ RemoveExpiredKeyFromActiveList(index);
+ }
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ Console.WriteLine($"Retrieve error: {ex.Message}");
+ }
+ }
+
+ private (string? Key, int Index) GetRandomActiveKey()
+ {
+ lock (_activeKeys)
+ {
+ if (_activeKeys.Count == 0)
+ return (null, -1);
- // Wait before checking again
- await Task.Delay(100, ct);
+ var index = _random.Next(_activeKeys.Count);
+ return (_activeKeys[index], index);
+ }
+ }
+
+ private void RemoveExpiredKeyFromActiveList(int index)
+ {
+ lock (_activeKeys)
+ {
+ if (index >= 0 && index < _activeKeys.Count)
+ {
+ _activeKeys.RemoveAt(index);
+ }
+ }
+ }
+
+ private Task StartRemoveTask(CancellationToken ct) =>
+ Task.Run(
+ async () =>
+ {
+ try
+ {
+ // Wait a bit before starting removal operations
+ await Task.Delay(2000, ct);
+
+ while (!ct.IsCancellationRequested)
+ {
+ string? key = null;
+
+ lock (_activeKeys)
+ {
+ if (_activeKeys.Count > 0)
+ {
+ var index = _random.Next(_activeKeys.Count);
+ key = _activeKeys[index];
+ _activeKeys.RemoveAt(index);
+ }
+ }
+
+ if (key != null)
+ {
+ try
+ {
+ await _cache.RemoveAsync(key, ct);
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ Console.WriteLine($"Remove error: {ex.Message}");
+ }
+ }
+
+ await Task.Delay(RemoveDelayMs, ct);
}
}
catch (OperationCanceledException)
{
- // Ignore cancellation exceptions
+ // Ignore cancellation
}
},
ct);
+ private Task StartWatchTask(CancellationToken ct) =>
+ Task.Run(
+ async () =>
+ {
+ try
+ {
+ if (_kvStore == null)
+ {
+ Console.WriteLine("KV Store not initialized");
+ return;
+ }
+
+ var opsBuffer = new List();
+ var statsUpdateInterval = TimeSpan.FromMilliseconds(StatsUpdateIntervalMs);
+ var lastStatsUpdate = DateTimeOffset.Now;
+
+ await foreach (var entry in _kvStore.WatchAsync>(
+ ">",
+ opts: new NatsKVWatchOpts { MetaOnly = true },
+ cancellationToken: ct))
+ {
+ opsBuffer.Add(entry.Operation);
+ if (DateTimeOffset.Now - lastStatsUpdate <= statsUpdateInterval)
+ {
+ await UpdateStatsIfNeeded(opsBuffer);
+ lastStatsUpdate = DateTimeOffset.Now;
+ }
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ // Ignore cancellation
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine($"Watch error: {ex.Message}");
+ }
+ },
+ ct);
+
+ private Task UpdateStatsIfNeeded(List opsBuffer)
+ {
+ // Process the buffer and update stats
+ // Read operations are tracked directly in StartRetrieveTask
+ var puts = opsBuffer.Count(op => op == NatsKVOperation.Put);
+ var deletes = opsBuffer.Count(op => op == NatsKVOperation.Del);
+
+ if (puts > 0)
+ {
+ _stats.AddOrUpdate("KeysInserted", puts, (_, count) => count + puts);
+ }
+
+ if (deletes > 0)
+ {
+ _stats.AddOrUpdate("KeysRemoved", deletes, (_, count) => count + deletes);
+ }
+
+ opsBuffer.Clear();
+ return Task.CompletedTask;
+ }
+
private Task StartPrintTask(CancellationToken ct) =>
Task.Run(
async () =>
@@ -64,24 +364,66 @@ private Task StartPrintTask(CancellationToken ct) =>
{
while (!ct.IsCancellationRequested)
{
- // Clear
Console.Clear();
-
- // Print current statistics
- Console.WriteLine("======== Per Stats ========");
- Console.WriteLine($"Keys Inserted: {_stats["KeysInserted"]}");
- Console.WriteLine($"Keys Retrieved: {_stats["KeysRetrieved"]}");
- Console.WriteLine($"Keys Expired: {_stats["KeysExpired"]}");
- Console.WriteLine("===========================");
+ PrintProgress();
// Wait before printing again
- await Task.Delay(TimeSpan.FromSeconds(1), ct);
+ await Task.Delay(TimeSpan.FromMilliseconds(StatsPrintIntervalMs), ct);
}
}
catch (OperationCanceledException)
{
- // Ignore cancellation exceptions
+ // Ignore cancellation
}
},
ct);
+
+ private void PrintProgress()
+ {
+ var totalOps = _stats["KeysInserted"] + _stats["KeysRetrieved"] + _stats["KeysRemoved"];
+ var opsPerSecond = (long)(totalOps / _stopwatch.Elapsed.TotalSeconds);
+ Console.WriteLine("========== CodeCargo NatsDistributedCache Performance ==========");
+ Console.WriteLine($"Current Keys: {_activeKeys.Count,FieldWidth:N0}");
+ Console.WriteLine($"Keys Inserted: {_stats["KeysInserted"],FieldWidth:N0}");
+ Console.WriteLine($"Keys Retrieved: {_stats["KeysRetrieved"],FieldWidth:N0}");
+ Console.WriteLine($"Keys Removed: {_stats["KeysRemoved"],FieldWidth:N0}");
+ Console.WriteLine("----------------------------------------------------------");
+ Console.WriteLine($"Cache Hits: {_stats["KeysRetrieved"] - _stats["KeysExpired"],FieldWidth:N0}");
+ Console.WriteLine($"Cache Misses: {_stats["KeysExpired"],FieldWidth:N0}");
+ Console.WriteLine("----------------------------------------------------------");
+ Console.WriteLine($"Elapsed Time: {FormatElapsedTime(_stopwatch.Elapsed),FieldWidth}");
+ Console.WriteLine($"Time Remaining: {FormatElapsedTime(TimeSpan.FromSeconds(MaxTestRuntimeSecs) - _stopwatch.Elapsed),FieldWidth}");
+ Console.WriteLine($"Total operations: {totalOps,FieldWidth:N0}");
+ Console.WriteLine($"Ops per Second: {opsPerSecond,FieldWidth:N0}");
+ Console.WriteLine("==========================================================");
+ }
+
+ private void PrintFinalStats()
+ {
+ var totalOps = _stats["KeysInserted"] + _stats["KeysRetrieved"] + _stats["KeysRemoved"];
+ var opsPerSecond = (long)(totalOps / _stopwatch.Elapsed.TotalSeconds);
+ Console.Clear();
+ Console.WriteLine("========== CodeCargo NatsDistributedCache Test Summary ==========");
+ Console.WriteLine($"Test completed at: {DateTime.Now}");
+ Console.WriteLine($"Total test duration: {FormatElapsedTime(_stopwatch.Elapsed)}");
+ Console.WriteLine("----------------------------------------------------------");
+ Console.WriteLine($"Final Current Keys: {_activeKeys.Count,FieldWidth:N0}");
+ Console.WriteLine($"Total Keys Inserted: {_stats["KeysInserted"],FieldWidth:N0}");
+ Console.WriteLine($"Total Keys Retrieved: {_stats["KeysRetrieved"],FieldWidth:N0}");
+ Console.WriteLine($"Total Keys Removed: {_stats["KeysRemoved"],FieldWidth:N0}");
+ Console.WriteLine("----------------------------------------------------------");
+ Console.WriteLine($"Cache Hits: {_stats["KeysRetrieved"] - _stats["KeysExpired"],FieldWidth:N0}");
+ Console.WriteLine($"Cache Misses: {_stats["KeysExpired"],FieldWidth:N0}");
+ Console.WriteLine("----------------------------------------------------------");
+ Console.WriteLine($"Total operations: {totalOps,FieldWidth:N0}");
+ Console.WriteLine($"Average Ops/Second: {opsPerSecond,FieldWidth:N0}");
+ Console.WriteLine("==========================================================");
+
+ // Also log memory usage
+ var meg = Math.Pow(2, 20);
+ var memoryMiB = Process.GetCurrentProcess().PrivateMemorySize64 / meg;
+ var allocMiB = GC.GetTotalAllocatedBytes() / meg;
+ Console.WriteLine($"Memory Usage MiB: {memoryMiB,FieldWidth:N0}");
+ Console.WriteLine($"Total Alloc MiB: {allocMiB,FieldWidth:N0}");
+ }
}