From ba64234c5f2310428028597e223fff33a4735d01 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Tue, 13 May 2025 02:16:32 -0400 Subject: [PATCH 1/5] update perf test Signed-off-by: Caleb Lloyd --- .../NatsHybridCacheSerializerFactoryTests.cs | 2 - util/PerfTest/PerfTest.cs | 633 ++++++++---------- util/PerfTest/Program.cs | 2 + util/PerfTest/Utils/StopwatchPool.cs | 70 ++ 4 files changed, 343 insertions(+), 364 deletions(-) create mode 100644 util/PerfTest/Utils/StopwatchPool.cs diff --git a/test/UnitTests/Serialization/NatsHybridCacheSerializerFactoryTests.cs b/test/UnitTests/Serialization/NatsHybridCacheSerializerFactoryTests.cs index c0fb89a..a823224 100644 --- a/test/UnitTests/Serialization/NatsHybridCacheSerializerFactoryTests.cs +++ b/test/UnitTests/Serialization/NatsHybridCacheSerializerFactoryTests.cs @@ -1,8 +1,6 @@ using System.Buffers; -using Microsoft.Extensions.Caching.Hybrid; using Moq; using NATS.Client.Core; -using NATS.Net; namespace CodeCargo.Nats.DistributedCache.UnitTests.Serialization; diff --git a/util/PerfTest/PerfTest.cs b/util/PerfTest/PerfTest.cs index a26eb0d..e6a9376 100644 --- a/util/PerfTest/PerfTest.cs +++ b/util/PerfTest/PerfTest.cs @@ -1,429 +1,338 @@ -using System.Buffers; -using System.Collections.Concurrent; using System.Diagnostics; +using CodeCargo.Nats.DistributedCache.PerfTest.Utils; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Logging.Abstractions; -using Microsoft.Extensions.Options; -using NATS.Client.Core; -using NATS.Client.KeyValueStore; -using NATS.Net; namespace CodeCargo.Nats.DistributedCache.PerfTest; -/// -/// Performance test for NatsCache that tracks cache operations -/// public class PerfTest { - // Configuration - private const int MaxTestRuntimeSecs = 60; - private const int NumCacheItems = 10_000; - private const int FieldWidth = 12; - private const int BatchSize = 100; - private const int ValueDataSizeBytes = 256; - private const int AbsoluteExpirationSecs = 10; - private const int InsertDelayMs = 10; - private const int RetrieveDelayMs = 5; - private const int RemoveDelayMs = 50; - private const int StatsUpdateIntervalMs = 500; - private const int StatsPrintIntervalMs = 1000; - - // Dependencies - private readonly INatsConnection _nats; - private readonly NatsCache _cache; - - // Stats tracking - private readonly Stopwatch _stopwatch = new(); - private readonly ConcurrentDictionary _stats = new(); - private readonly List _activeKeys = []; - private readonly Random _random = new(42); // Fixed seed for reproducibility - - // Operation tracking via kvstore watcher - private readonly SemaphoreSlim _watchLock = new(1, 1); - private NatsKVStore? _kvStore; - - public PerfTest(INatsConnection nats) + private const int NumKeys = 100_000; + private const int ValueSizeBytes = 128; + private static readonly int ParallelTasks = Environment.ProcessorCount; + private static readonly TimeSpan ProgressUpdateInterval = TimeSpan.FromMilliseconds(250); + + private readonly IDistributedCache _cache; + private readonly string[] _keys; + private readonly byte[] _valuePayload; + private readonly List _stages = new(); + + public PerfTest(IDistributedCache cache) { - _nats = nats; + _cache = cache; - var options = Options.Create(new NatsCacheOptions + // Pre-generate unique keys + _keys = new string[NumKeys]; + for (var i = 0; i < NumKeys; i++) { - BucketName = "cache" - }); - - _cache = new NatsCache(options, NullLogger.Instance, nats); + _keys[i] = i.ToString(); + } - // Initialize statistics counters - _stats["KeysInserted"] = 0; - _stats["KeysRetrieved"] = 0; - _stats["KeysRemoved"] = 0; - _stats["KeysExpired"] = 0; + // Prepare a sample value payload filled with the character '0' + _valuePayload = new byte[ValueSizeBytes]; + Array.Fill(_valuePayload, (byte)'0'); // Fill with ASCII character '0' (value 48) } - public async Task Run(CancellationToken cancellationToken) + public async Task Run(CancellationToken ct) { - Console.WriteLine($"Starting CodeCargo NatsDistributedCache Performance Test - {DateTime.Now}"); - Console.WriteLine($"Testing with {NumCacheItems} cache items"); - - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(TimeSpan.FromSeconds(MaxTestRuntimeSecs)); - - // Create and connect the watcher to track operations - await InitializeWatcher(cts.Token); - - // Start timer - _stopwatch.Start(); - - // Create tasks for the test operations - var printTask = StartPrintTask(cts.Token); - var watchTask = StartWatchTask(cts.Token); - var insertTask = StartInsertTask(cts.Token); - var retrieveTask = StartRetrieveTask(cts.Token); - var removeTask = StartRemoveTask(cts.Token); + // Clear stages for a new run + _stages.Clear(); - try - { - // Wait for all tasks to complete or cancellation - await Task.WhenAll(watchTask, insertTask, retrieveTask, removeTask); - } - catch (OperationCanceledException) - { - // Expected when test completes - } - finally - { - // Stop stopwatch - _stopwatch.Stop(); + // Run all stages sequentially + await RunStage("Insert", SetWithAbsoluteExpiry, ct); + await RunStage("Get", GetOperation, ct); + await RunStage("Update", SetWithSlidingExpiry, ct); + await RunStage("Get (refresh)", GetOperation, ct); + await RunStage("Delete", DeleteOperation, ct); - // Make sure print task completes - await cts.CancelAsync(); - try - { - await printTask; - } - catch (OperationCanceledException) - { - // Ignore - } + // Final display with results + Console.Clear(); + PrintResults(); + } - PrintFinalStats(); - } + private async Task SetWithAbsoluteExpiry(string key, CancellationToken ct) + { + using var sw = StopwatchPool.Rent(); + var options = new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10) }; + await _cache.SetAsync(key, _valuePayload, options, ct); + sw.Stop(); + return sw.Elapsed; } - private static string FormatElapsedTime(TimeSpan elapsed) => - $"{Math.Floor(elapsed.TotalMinutes):00}:{elapsed.Seconds:00}.{elapsed.Milliseconds / 10:00}"; + private async Task GetOperation(string key, CancellationToken ct) + { + using var sw = StopwatchPool.Rent(); + await _cache.GetAsync(key, ct); + sw.Stop(); + return sw.Elapsed; + } - private byte[] GenerateRandomData(int size) + private async Task SetWithSlidingExpiry(string key, CancellationToken ct) { - var data = new byte[size]; - _random.NextBytes(data); - return data; + using var sw = StopwatchPool.Rent(); + var options = new DistributedCacheEntryOptions { SlidingExpiration = TimeSpan.FromMinutes(10) }; + await _cache.SetAsync(key, _valuePayload, options, ct); + sw.Stop(); + return sw.Elapsed; } - private async Task InitializeWatcher(CancellationToken ct) + private async Task DeleteOperation(string key, CancellationToken ct) { - await _watchLock.WaitAsync(ct); - try - { - if (_kvStore == null) - { - var jsContext = _nats.CreateJetStreamContext(); - var kvContext = new NatsKVContext(jsContext); - _kvStore = (NatsKVStore)await kvContext.GetStoreAsync("cache", ct); - } - } - finally - { - _watchLock.Release(); - } + using var sw = StopwatchPool.Rent(); + await _cache.RemoveAsync(key, ct); + sw.Stop(); + return sw.Elapsed; } - private Task StartInsertTask(CancellationToken ct) => - Task.Run( + // Core method to run a batch of operations in parallel and collect metrics + private async Task RunStage( + string stageName, + Func> operationFunc, + CancellationToken ct) + { + // Create a new stage and add it to the collection + var stage = new Stage(stageName); + _stages.Add(stage); + + var totalOps = NumKeys; + var completedOps = 0; + + // Start stage timing + stage.StartTiming(); + + // Start a background progress updater task + using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + var progressTask = Task.Run( async () => { - try + while (!cts.Token.IsCancellationRequested) { - for (var i = 0; i < NumCacheItems && !ct.IsCancellationRequested; i++) - { - // Create a batch of items - for (var j = 0; j < BatchSize && i + j < NumCacheItems && !ct.IsCancellationRequested; j++) - { - var key = $"BatchNum{i}IndividualNum{j}"; - var data = GenerateRandomData(ValueDataSizeBytes); - - try - { - var options = new DistributedCacheEntryOptions - { - AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(AbsoluteExpirationSecs), - }; - - await _cache.SetAsync(key, data, options, ct); - - lock (_activeKeys) - { - _activeKeys.Add(key); - } - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - Console.WriteLine($"Insert error: {ex.Message}"); - } - } - - await Task.Delay(InsertDelayMs, ct); - } - } - catch (OperationCanceledException) - { - // Ignore cancellation + var done = Volatile.Read(ref completedOps); + var percent = (done * 100.0) / totalOps; + + // Get a snapshot of the current progress state + var progressLine = $"{stageName}: {done,8:N0}/{totalOps:N0} completed ({percent,6:F2}%)"; + + // First calculate all needed values, then clear and print + Console.Clear(); + + // Print the current results table + PrintResults(); + + // Print progress indicator below table with a newline + Console.WriteLine(); + Console.WriteLine(progressLine); + + if (done >= totalOps) + break; + + await Task.Delay(ProgressUpdateInterval, cts.Token).ConfigureAwait(false); } }, - ct); + cts.Token); - private Task StartRetrieveTask(CancellationToken ct) => - Task.Run( - async () => + // Launch parallel worker tasks + var tasks = new Task[ParallelTasks]; + var nextKeyIndex = 0; + for (var t = 0; t < ParallelTasks; t++) + { + // Determine the range of keys this task will handle + var chunkSize = totalOps / ParallelTasks; + if (t < totalOps % ParallelTasks) { - try - { - // Wait a bit before starting retrieval operations - await Task.Delay(1000, ct); + // distribute remainder keys one per task (for first few tasks) + chunkSize++; + } + + var startIndex = nextKeyIndex; + var endIndex = startIndex + chunkSize; + nextKeyIndex = endIndex; - while (!ct.IsCancellationRequested) + tasks[t] = Task.Run( + async () => + { + for (var i = startIndex; i < endIndex && !ct.IsCancellationRequested; i++) { - await ProcessSingleRetrievalAsync(ct); - await Task.Delay(RetrieveDelayMs, ct); + // Perform the operation and record its duration + var elapsed = await operationFunc(_keys[i], ct).ConfigureAwait(false); + + // Add the operation duration to the stage + stage.AddOperationDuration(elapsed); + + // Atomically increment the completed count for progress tracking + Interlocked.Increment(ref completedOps); } - } - catch (OperationCanceledException) - { - // Ignore cancellation - } - }, - ct); + }, + ct); + } - private async Task ProcessSingleRetrievalAsync(CancellationToken ct) - { - var (key, index) = GetRandomActiveKey(); - if (key == null) - return; + // Wait for all tasks to finish + await Task.WhenAll(tasks); + stage.StopTiming(); + // Ensure progress task exits and wait for it + // Mark all ops completed for progress loop, in case it hasn't updated the final state yet + Volatile.Write(ref completedOps, totalOps); + await cts.CancelAsync(); try { - var result = await _cache.GetAsync(key, ct); - _stats.AddOrUpdate("KeysRetrieved", 1, (_, count) => count + 1); - - if (result == null) - { - _stats.AddOrUpdate("KeysExpired", 1, (_, count) => count + 1); - RemoveExpiredKeyFromActiveList(index); - } + await progressTask; } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (OperationCanceledException) { - Console.WriteLine($"Retrieve error: {ex.Message}"); } } - private (string? Key, int Index) GetRandomActiveKey() + // Print the current results table + private void PrintResults(bool clearScreen = false) { - lock (_activeKeys) + if (clearScreen) { - if (_activeKeys.Count == 0) - return (null, -1); + Console.Clear(); + } - var index = _random.Next(_activeKeys.Count); - return (_activeKeys[index], index); + // Define table width constants + const int stageWidth = 15; + const int opsWidth = 12; + const int dataWidth = 10; + const int durationWidth = 12; + const int p50Width = 10; + const int p95Width = 10; + const int p99Width = 10; + const int totalWidth = stageWidth + 1 + opsWidth + 1 + dataWidth + 1 + durationWidth + 1 + p50Width + 1 + + p95Width + 1 + p99Width; + + // Print header + Console.WriteLine( + "{0,-" + stageWidth + "} {1," + opsWidth + "} {2," + dataWidth + "} {3," + durationWidth + "} {4," + + p50Width + "} {5," + p95Width + "} {6," + p99Width + "}", + "Stage", + "Operations", + "Data (MiB)", + "Duration (s)", + "P50 (ms)", + "P95 (ms)", + "P99 (ms)"); + Console.WriteLine(new string('-', totalWidth)); + + // Print each stage result in aligned columns + long totalOperations = 0; + double totalDuration = 0; + double totalDataMiB = 0; + + foreach (var stage in _stages) + { + var operations = stage.Operations; + totalOperations += operations; + totalDuration += stage.Duration.TotalSeconds; + + // Calculate data size in MiB + var dataMiB = (operations * (long)ValueSizeBytes) / (1024.0 * 1024.0); + totalDataMiB += dataMiB; + + Console.WriteLine( + "{0,-" + stageWidth + "} {1," + opsWidth + ":N0} {2," + dataWidth + ":F2} {3," + durationWidth + + ":F2} {4," + p50Width + ":F2} {5," + p95Width + ":F2} {6," + p99Width + ":F2}", + stage.Name, + operations, + dataMiB, + stage.Duration.TotalSeconds, + stage.GetPercentile(50), + stage.GetPercentile(95), + stage.GetPercentile(99)); } - } - private void RemoveExpiredKeyFromActiveList(int index) - { - lock (_activeKeys) + // Add totals row if we have results + if (_stages.Count > 0) { - if (index >= 0 && index < _activeKeys.Count) - { - _activeKeys.RemoveAt(index); - } + // Print totals row + Console.WriteLine(new string('-', totalWidth)); + Console.WriteLine( + "{0,-" + stageWidth + "} {1," + opsWidth + ":N0} {2," + dataWidth + ":F2} {3," + durationWidth + + ":F2} {4," + p50Width + "} {5," + p95Width + "} {6," + p99Width + "}", + "Total", + totalOperations, + totalDataMiB, + totalDuration, + string.Empty, + string.Empty, + string.Empty); + Console.WriteLine(new string('-', totalWidth)); } } - private Task StartRemoveTask(CancellationToken ct) => - Task.Run( - async () => - { - try - { - // Wait a bit before starting removal operations - await Task.Delay(2000, ct); - - while (!ct.IsCancellationRequested) - { - string? key = null; - - lock (_activeKeys) - { - if (_activeKeys.Count > 0) - { - var index = _random.Next(_activeKeys.Count); - key = _activeKeys[index]; - _activeKeys.RemoveAt(index); - } - } - - if (key != null) - { - try - { - await _cache.RemoveAsync(key, ct); - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - Console.WriteLine($"Remove error: {ex.Message}"); - } - } - - await Task.Delay(RemoveDelayMs, ct); - } - } - catch (OperationCanceledException) - { - // Ignore cancellation - } - }, - ct); - - private Task StartWatchTask(CancellationToken ct) => - Task.Run( - async () => - { - try - { - if (_kvStore == null) - { - Console.WriteLine("KV Store not initialized"); - return; - } - - var opsBuffer = new List(); - var statsUpdateInterval = TimeSpan.FromMilliseconds(StatsUpdateIntervalMs); - var lastStatsUpdate = DateTimeOffset.Now; - - await foreach (var entry in _kvStore.WatchAsync>( - ">", - opts: new NatsKVWatchOpts { MetaOnly = true }, - cancellationToken: ct)) - { - opsBuffer.Add(entry.Operation); - if (DateTimeOffset.Now - lastStatsUpdate <= statsUpdateInterval) - { - await UpdateStatsIfNeeded(opsBuffer); - lastStatsUpdate = DateTimeOffset.Now; - } - } - } - catch (OperationCanceledException) - { - // Ignore cancellation - } - catch (Exception ex) - { - Console.WriteLine($"Watch error: {ex.Message}"); - } - }, - ct); - - private Task UpdateStatsIfNeeded(List opsBuffer) + /// + /// Represents a stage in the performance test with real-time statistics + /// + private class Stage { - // Process the buffer and update stats - // Read operations are tracked directly in StartRetrieveTask - var puts = opsBuffer.Count(op => op == NatsKVOperation.Put); - var deletes = opsBuffer.Count(op => op == NatsKVOperation.Del); + private readonly Stopwatch _duration = new(); + private readonly SortedSet _sortedTicks = new(); + private readonly object _syncLock = new(); + private int _totalOps = 0; - if (puts > 0) + public Stage(string name) { - _stats.AddOrUpdate("KeysInserted", puts, (_, count) => count + puts); + Name = name; } - if (deletes > 0) + /// + /// Gets the name of this stage + /// + public string Name { get; } + + /// + /// Gets the number of operations completed so far + /// + public int Operations => _totalOps; + + /// + /// Gets the total duration of this stage + /// + public TimeSpan Duration => _duration.Elapsed; + + /// + /// Starts timing this stage + /// + public void StartTiming() => _duration.Start(); + + /// + /// Stops timing this stage + /// + public void StopTiming() => _duration.Stop(); + + /// + /// Adds a single operation duration to the statistics + /// + public void AddOperationDuration(TimeSpan duration) { - _stats.AddOrUpdate("KeysRemoved", deletes, (_, count) => count + deletes); - } - - opsBuffer.Clear(); - return Task.CompletedTask; - } + // Store ticks instead of TimeSpan to reduce memory overhead + var ticks = duration.Ticks; - private Task StartPrintTask(CancellationToken ct) => - Task.Run( - async () => + lock (_syncLock) { - try - { - while (!ct.IsCancellationRequested) - { - Console.Clear(); - PrintProgress(); + _sortedTicks.Add(ticks); + _totalOps++; + } + } - // Wait before printing again - await Task.Delay(TimeSpan.FromMilliseconds(StatsPrintIntervalMs), ct); - } - } - catch (OperationCanceledException) - { - // Ignore cancellation - } - }, - ct); + /// + /// Gets the requested percentile of operation durations in milliseconds + /// + public double GetPercentile(double percentile) + { + lock (_syncLock) + { + // If we have no operations yet, return 0 + if (_sortedTicks.Count == 0) + return 0; - private void PrintProgress() - { - var totalOps = _stats["KeysInserted"] + _stats["KeysRetrieved"] + _stats["KeysRemoved"]; - var opsPerSecond = (long)(totalOps / _stopwatch.Elapsed.TotalSeconds); - Console.WriteLine("========== CodeCargo NatsDistributedCache Performance =========="); - Console.WriteLine($"Current Keys: {_activeKeys.Count,FieldWidth:N0}"); - Console.WriteLine($"Keys Inserted: {_stats["KeysInserted"],FieldWidth:N0}"); - Console.WriteLine($"Keys Retrieved: {_stats["KeysRetrieved"],FieldWidth:N0}"); - Console.WriteLine($"Keys Removed: {_stats["KeysRemoved"],FieldWidth:N0}"); - Console.WriteLine("----------------------------------------------------------"); - Console.WriteLine($"Cache Hits: {_stats["KeysRetrieved"] - _stats["KeysExpired"],FieldWidth:N0}"); - Console.WriteLine($"Cache Misses: {_stats["KeysExpired"],FieldWidth:N0}"); - Console.WriteLine("----------------------------------------------------------"); - Console.WriteLine($"Elapsed Time: {FormatElapsedTime(_stopwatch.Elapsed),FieldWidth}"); - Console.WriteLine($"Time Remaining: {FormatElapsedTime(TimeSpan.FromSeconds(MaxTestRuntimeSecs) - _stopwatch.Elapsed),FieldWidth}"); - Console.WriteLine($"Total operations: {totalOps,FieldWidth:N0}"); - Console.WriteLine($"Ops per Second: {opsPerSecond,FieldWidth:N0}"); - Console.WriteLine("=========================================================="); - } + // Calculate the index for the requested percentile + var idx = (int)Math.Ceiling(percentile / 100.0 * _sortedTicks.Count) - 1; + idx = Math.Max(0, Math.Min(idx, _sortedTicks.Count - 1)); - private void PrintFinalStats() - { - var totalOps = _stats["KeysInserted"] + _stats["KeysRetrieved"] + _stats["KeysRemoved"]; - var opsPerSecond = (long)(totalOps / _stopwatch.Elapsed.TotalSeconds); - Console.Clear(); - Console.WriteLine("========== CodeCargo NatsDistributedCache Test Summary =========="); - Console.WriteLine($"Test completed at: {DateTime.Now}"); - Console.WriteLine($"Total test duration: {FormatElapsedTime(_stopwatch.Elapsed)}"); - Console.WriteLine("----------------------------------------------------------"); - Console.WriteLine($"Final Current Keys: {_activeKeys.Count,FieldWidth:N0}"); - Console.WriteLine($"Total Keys Inserted: {_stats["KeysInserted"],FieldWidth:N0}"); - Console.WriteLine($"Total Keys Retrieved: {_stats["KeysRetrieved"],FieldWidth:N0}"); - Console.WriteLine($"Total Keys Removed: {_stats["KeysRemoved"],FieldWidth:N0}"); - Console.WriteLine("----------------------------------------------------------"); - Console.WriteLine($"Cache Hits: {_stats["KeysRetrieved"] - _stats["KeysExpired"],FieldWidth:N0}"); - Console.WriteLine($"Cache Misses: {_stats["KeysExpired"],FieldWidth:N0}"); - Console.WriteLine("----------------------------------------------------------"); - Console.WriteLine($"Total operations: {totalOps,FieldWidth:N0}"); - Console.WriteLine($"Average Ops/Second: {opsPerSecond,FieldWidth:N0}"); - Console.WriteLine("=========================================================="); - - // Also log memory usage - var meg = Math.Pow(2, 20); - var memoryMiB = Process.GetCurrentProcess().PrivateMemorySize64 / meg; - var allocMiB = GC.GetTotalAllocatedBytes() / meg; - Console.WriteLine($"Memory Usage MiB: {memoryMiB,FieldWidth:N0}"); - Console.WriteLine($"Total Alloc MiB: {allocMiB,FieldWidth:N0}"); + // Get the value at the specified index + return new TimeSpan(_sortedTicks.ElementAt(idx)).TotalMilliseconds; + } + } } } diff --git a/util/PerfTest/Program.cs b/util/PerfTest/Program.cs index 4e54053..24c5e4b 100644 --- a/util/PerfTest/Program.cs +++ b/util/PerfTest/Program.cs @@ -1,5 +1,6 @@ using Aspire.Hosting.ApplicationModel; using Aspire.Hosting.Testing; +using CodeCargo.Nats.DistributedCache; using CodeCargo.Nats.DistributedCache.PerfTest; using CodeCargo.Nats.DistributedCache.TestUtils; using Microsoft.Extensions.DependencyInjection; @@ -40,6 +41,7 @@ builder.ConfigureServices(services => { services.AddNatsTestClient(natsConnectionString); + services.AddNatsDistributedCache(options => options.BucketName = "cache"); services.AddScoped(); }); diff --git a/util/PerfTest/Utils/StopwatchPool.cs b/util/PerfTest/Utils/StopwatchPool.cs new file mode 100644 index 0000000..7a8896b --- /dev/null +++ b/util/PerfTest/Utils/StopwatchPool.cs @@ -0,0 +1,70 @@ +using System.Buffers; +using System.Diagnostics; + +namespace CodeCargo.Nats.DistributedCache.PerfTest.Utils +{ + /// + /// Provides a pooled Stopwatch to reduce allocations in performance-critical code + /// + public static class StopwatchPool + { + private static readonly ArrayPool _pool = ArrayPool.Shared; + + /// + /// Rents a stopwatch from the pool and starts it + /// + /// A PooledStopwatch that wraps the timer and handles returning it to the pool + public static PooledStopwatch Rent() + { + // Get a single stopwatch from the pool + var stopwatches = _pool.Rent(1); + var stopwatch = stopwatches[0] ?? new Stopwatch(); + + // Reset and start + stopwatch.Reset(); + stopwatch.Start(); + + return new PooledStopwatch(stopwatch, stopwatches); + } + + /// + /// A disposable wrapper for a pooled Stopwatch + /// + public readonly struct PooledStopwatch : IDisposable + { + private readonly Stopwatch _stopwatch; + private readonly Stopwatch[] _array; + + internal PooledStopwatch(Stopwatch stopwatch, Stopwatch[] array) + { + _stopwatch = stopwatch; + _array = array; + } + + /// + /// Gets the elapsed time of the stopwatch + /// + public TimeSpan Elapsed => _stopwatch.Elapsed; + + /// + /// Stops the timer + /// + public void Stop() => _stopwatch.Stop(); + + /// + /// Returns the stopwatch to the pool + /// + public void Dispose() + { + // Ensure stopwatch is stopped + _stopwatch.Stop(); + + // Store back in array at index 0 + _array[0] = _stopwatch; + + // Return array to pool + _pool.Return(_array); + } + } + } +} From 7da87a41e4cd503a3dcbc0e501ede0064440dca2 Mon Sep 17 00:00:00 2001 From: SuddyN Date: Tue, 13 May 2025 10:19:44 -0400 Subject: [PATCH 2/5] reduce NumKeys --- util/PerfTest/PerfTest.cs | 5 +++-- util/PerfTest/Program.cs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/util/PerfTest/PerfTest.cs b/util/PerfTest/PerfTest.cs index e6a9376..f2ffaec 100644 --- a/util/PerfTest/PerfTest.cs +++ b/util/PerfTest/PerfTest.cs @@ -6,7 +6,7 @@ namespace CodeCargo.Nats.DistributedCache.PerfTest; public class PerfTest { - private const int NumKeys = 100_000; + private const int NumKeys = 20_000; private const int ValueSizeBytes = 128; private static readonly int ParallelTasks = Environment.ProcessorCount; private static readonly TimeSpan ProgressUpdateInterval = TimeSpan.FromMilliseconds(250); @@ -179,6 +179,7 @@ private async Task RunStage( } catch (OperationCanceledException) { + // ignore OperationCanceledException } } @@ -263,7 +264,7 @@ private void PrintResults(bool clearScreen = false) /// /// Represents a stage in the performance test with real-time statistics /// - private class Stage + private sealed class Stage { private readonly Stopwatch _duration = new(); private readonly SortedSet _sortedTicks = new(); diff --git a/util/PerfTest/Program.cs b/util/PerfTest/Program.cs index 24c5e4b..910b80e 100644 --- a/util/PerfTest/Program.cs +++ b/util/PerfTest/Program.cs @@ -60,7 +60,7 @@ await kv.CreateOrUpdateStoreAsync( // Run the host Console.WriteLine("Starting app..."); -var appCts = new CancellationTokenSource(); +using var appCts = new CancellationTokenSource(); var appTask = Task.Run(async () => { try @@ -121,7 +121,7 @@ await kv.CreateOrUpdateStoreAsync( } catch (Exception ex) { - Console.Error.WriteLine($"Error stopping application: {ex.Message}"); + await Console.Error.WriteLineAsync($"Error stopping application: {ex.Message}"); } await aspireApp.DisposeAsync(); From 19794ea7111651b01af3ade663431e65d2574eab Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Tue, 13 May 2025 10:30:37 -0400 Subject: [PATCH 3/5] use memory stream Signed-off-by: Caleb Lloyd --- dev/nats-integration/config/nats.conf | 2 +- test/IntegrationTests/NatsIntegrationFixture.cs | 5 ++++- util/PerfTest/Program.cs | 9 ++++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/dev/nats-integration/config/nats.conf b/dev/nats-integration/config/nats.conf index b1a8b10..d3c94c4 100644 --- a/dev/nats-integration/config/nats.conf +++ b/dev/nats-integration/config/nats.conf @@ -4,6 +4,6 @@ http_port: 8222 jetstream { store_dir: "/data" - max_memory_store: 0 + max_memory_store: 1GB max_file_store: 10GB } diff --git a/test/IntegrationTests/NatsIntegrationFixture.cs b/test/IntegrationTests/NatsIntegrationFixture.cs index 3ac3f33..e4bee79 100644 --- a/test/IntegrationTests/NatsIntegrationFixture.cs +++ b/test/IntegrationTests/NatsIntegrationFixture.cs @@ -65,7 +65,10 @@ public async ValueTask InitializeAsync() // create the KV store var kvContext = NatsConnection.CreateKeyValueStoreContext(); await kvContext.CreateOrUpdateStoreAsync( - new NatsKVConfig("cache") { LimitMarkerTTL = TimeSpan.FromSeconds(1), }, + new NatsKVConfig("cache") + { + LimitMarkerTTL = TimeSpan.FromSeconds(1), Storage = NatsKVStorageType.Memory + }, TestContext.Current.CancellationToken); } diff --git a/util/PerfTest/Program.cs b/util/PerfTest/Program.cs index 910b80e..ceac039 100644 --- a/util/PerfTest/Program.cs +++ b/util/PerfTest/Program.cs @@ -6,6 +6,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NATS.Client.Core; +using NATS.Client.JetStream.Models; using NATS.Client.KeyValueStore; using NATS.Net; @@ -54,8 +55,14 @@ var nats = host.Services.GetRequiredService(); var kv = nats.CreateKeyValueStoreContext(); await kv.CreateOrUpdateStoreAsync( - new NatsKVConfig("cache") { LimitMarkerTTL = TimeSpan.FromSeconds(1) }, + new NatsKVConfig("cache") + { + LimitMarkerTTL = TimeSpan.FromSeconds(1), Storage = NatsKVStorageType.Memory + }, startupCts.Token); +await nats + .CreateJetStreamContext() + .PurgeStreamAsync("KV_cache", new StreamPurgeRequest(), startupCts.Token); Console.WriteLine("KV store created"); // Run the host From 6291a90e0745004d7cb831d0c4da79006c8504bd Mon Sep 17 00:00:00 2001 From: SuddyN Date: Tue, 13 May 2025 10:44:08 -0400 Subject: [PATCH 4/5] dotnet format --- util/PerfTest/Program.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/util/PerfTest/Program.cs b/util/PerfTest/Program.cs index ceac039..bb8c6ab 100644 --- a/util/PerfTest/Program.cs +++ b/util/PerfTest/Program.cs @@ -57,7 +57,8 @@ await kv.CreateOrUpdateStoreAsync( new NatsKVConfig("cache") { - LimitMarkerTTL = TimeSpan.FromSeconds(1), Storage = NatsKVStorageType.Memory + LimitMarkerTTL = TimeSpan.FromSeconds(1), + Storage = NatsKVStorageType.Memory }, startupCts.Token); await nats From 4d3d19e4feb52738e4b5545a49ff4ce76bca547b Mon Sep 17 00:00:00 2001 From: Caleb Lloyd Date: Tue, 13 May 2025 11:01:39 -0400 Subject: [PATCH 5/5] fmt Signed-off-by: Caleb Lloyd --- test/IntegrationTests/NatsIntegrationFixture.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/IntegrationTests/NatsIntegrationFixture.cs b/test/IntegrationTests/NatsIntegrationFixture.cs index e4bee79..1a079f5 100644 --- a/test/IntegrationTests/NatsIntegrationFixture.cs +++ b/test/IntegrationTests/NatsIntegrationFixture.cs @@ -67,7 +67,8 @@ public async ValueTask InitializeAsync() await kvContext.CreateOrUpdateStoreAsync( new NatsKVConfig("cache") { - LimitMarkerTTL = TimeSpan.FromSeconds(1), Storage = NatsKVStorageType.Memory + LimitMarkerTTL = TimeSpan.FromSeconds(1), + Storage = NatsKVStorageType.Memory }, TestContext.Current.CancellationToken); }