diff --git a/.gitignore b/.gitignore
index 3388739..8bf07b5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,9 @@
# User settings
*.DotSettings.user
*.sln.DotSettings
+
+# JetBrains
+.idea
+
+# VSCode
+.vscode
diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/.gitignore b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/.gitignore
deleted file mode 100644
index 33d7e19..0000000
--- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/.gitignore
+++ /dev/null
@@ -1,13 +0,0 @@
-# Default ignored files
-/shelf/
-/workspace.xml
-# Rider ignored files
-/contentModel.xml
-/projectSettingsUpdater.xml
-/modules.xml
-/.idea.CodeCargo.NatsDistributedCache.iml
-# Editor-based HTTP Client requests
-/httpRequests/
-# Datasource local storage ignored files
-/dataSources/
-/dataSources.local.xml
diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/encodings.xml b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/encodings.xml
deleted file mode 100644
index df87cf9..0000000
--- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/encodings.xml
+++ /dev/null
@@ -1,4 +0,0 @@
-
-
-
-
\ No newline at end of file
diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/indexLayout.xml b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/indexLayout.xml
deleted file mode 100644
index 7b08163..0000000
--- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/indexLayout.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/vcs.xml b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/vcs.xml
deleted file mode 100644
index 94a25f7..0000000
--- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/vcs.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs b/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs
index 2784715..6560393 100644
--- a/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs
+++ b/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs
@@ -1,45 +1,43 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
-using System;
using Microsoft.Extensions.Logging;
-namespace CodeCargo.NatsDistributedCache
+namespace CodeCargo.NatsDistributedCache;
+
+public partial class NatsCache
{
- public partial class NatsCache
+ private void LogException(Exception exception)
+ {
+ _logger.LogError(EventIds.Exception, exception, "An exception occurred in NATS KV store.");
+ }
+
+ private void LogConnectionError(Exception exception)
+ {
+ _logger.LogError(EventIds.ConnectionError, exception, "Error connecting to NATS KV store.");
+ }
+
+ private void LogConnectionIssue()
+ {
+ _logger.LogWarning(EventIds.ConnectionIssue, "Connection issue with NATS KV store.");
+ }
+
+ private void LogConnected()
+ {
+ _logger.LogInformation(EventIds.Connected, "Connected to NATS KV store.");
+ }
+
+ private void LogUpdateFailed(string key)
+ {
+ _logger.LogDebug(EventIds.UpdateFailed, "Sliding expiration update failed for key {Key} due to optimistic concurrency control", key);
+ }
+
+ private static class EventIds
{
- private static class EventIds
- {
- public static readonly EventId ConnectionIssue = new EventId(100, nameof(ConnectionIssue));
- public static readonly EventId ConnectionError = new EventId(101, nameof(ConnectionError));
- public static readonly EventId Connected = new EventId(102, nameof(Connected));
- public static readonly EventId UpdateFailed = new EventId(103, nameof(UpdateFailed));
- public static readonly EventId Exception = new EventId(104, nameof(Exception));
- }
-
- private void LogException(Exception exception)
- {
- _logger.LogError(EventIds.Exception, exception, "An exception occurred in NATS KV store.");
- }
-
- private void LogConnectionError(Exception exception)
- {
- _logger.LogError(EventIds.ConnectionError, exception, "Error connecting to NATS KV store.");
- }
-
- private void LogConnectionIssue()
- {
- _logger.LogWarning(EventIds.ConnectionIssue, "Connection issue with NATS KV store.");
- }
-
- private void LogConnected()
- {
- _logger.LogInformation(EventIds.Connected, "Connected to NATS KV store.");
- }
-
- private void LogUpdateFailed(string key)
- {
- _logger.LogDebug(EventIds.UpdateFailed, "Sliding expiration update failed for key {Key} due to optimistic concurrency control", key);
- }
+ public static readonly EventId ConnectionIssue = new EventId(100, nameof(ConnectionIssue));
+ public static readonly EventId ConnectionError = new EventId(101, nameof(ConnectionError));
+ public static readonly EventId Connected = new EventId(102, nameof(Connected));
+ public static readonly EventId UpdateFailed = new EventId(103, nameof(UpdateFailed));
+ public static readonly EventId Exception = new EventId(104, nameof(Exception));
}
}
diff --git a/src/CodeCargo.NatsDistributedCache/NatsCache.cs b/src/CodeCargo.NatsDistributedCache/NatsCache.cs
index 76b5270..8afbab7 100644
--- a/src/CodeCargo.NatsDistributedCache/NatsCache.cs
+++ b/src/CodeCargo.NatsDistributedCache/NatsCache.cs
@@ -1,783 +1,427 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
-using System;
using System.Buffers;
-using System.Diagnostics;
-using System.IO;
-using System.Text;
-using System.Text.Json;
using System.Text.Json.Serialization;
-using System.Threading;
-using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using NATS.Client.Core;
-using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
using NATS.Net;
-namespace CodeCargo.NatsDistributedCache
+namespace CodeCargo.NatsDistributedCache;
+
+///
+/// Cache entry for storing in NATS Key-Value Store
+///
+public class CacheEntry
{
- [JsonSerializable(typeof(CacheEntry))]
- internal partial class CacheEntryJsonContext : JsonSerializerContext
- {
- }
+ [JsonPropertyName("absexp")]
+ public string? AbsoluteExpiration { get; set; }
- ///
- /// Cache entry for storing in NATS Key-Value Store
- ///
- public class CacheEntry
+ [JsonPropertyName("sldexp")]
+ public string? SlidingExpiration { get; set; }
+
+ [JsonPropertyName("data")]
+ public byte[]? Data { get; set; }
+}
+
+///
+/// Distributed cache implementation using NATS Key-Value Store.
+///
+public partial class NatsCache : IBufferDistributedCache, IDisposable
+{
+ // Static JSON serializer for CacheEntry
+ private static readonly NatsJsonContextSerializer _cacheEntrySerializer =
+ new NatsJsonContextSerializer(CacheEntryJsonContext.Default);
+
+ private readonly ILogger _logger;
+ private readonly NatsCacheOptions _options;
+ private readonly string _instanceName;
+ private readonly INatsConnection _natsConnection;
+ private readonly SemaphoreSlim _connectionLock = new(initialCount: 1, maxCount: 1);
+ private NatsKVStore? _kvStore;
+ private bool _disposed;
+
+ public NatsCache(
+ IOptions optionsAccessor,
+ ILogger logger,
+ INatsConnection natsConnection)
{
- [JsonPropertyName("absexp")]
- public string? AbsoluteExpiration { get; set; }
+ ArgumentNullException.ThrowIfNull(optionsAccessor);
+ ArgumentNullException.ThrowIfNull(natsConnection);
- [JsonPropertyName("sldexp")]
- public string? SlidingExpiration { get; set; }
+ _options = optionsAccessor.Value;
+ _logger = logger;
+ _natsConnection = natsConnection;
+ _instanceName = _options.InstanceName ?? string.Empty;
- [JsonPropertyName("data")]
- public byte[]? Data { get; set; }
+ // No need to connect immediately; will connect on-demand
}
- ///
- /// Distributed cache implementation using NATS Key-Value Store.
- ///
- public partial class NatsCache : IBufferDistributedCache, IDisposable
+ public NatsCache(IOptions optionsAccessor, INatsConnection natsConnection)
+ : this(optionsAccessor, NullLogger.Instance, natsConnection)
{
- private const string AbsoluteExpirationKey = "absexp";
- private const string SlidingExpirationKey = "sldexp";
- private const string DataKey = "data";
-
- // combined keys - same hash keys fetched constantly; avoid allocating an array each time
- private static readonly string[] GetHashFieldsNoData = new[] { AbsoluteExpirationKey, SlidingExpirationKey };
-
- private static readonly string[] GetHashFieldsWithData =
- new[] { AbsoluteExpirationKey, SlidingExpirationKey, DataKey };
-
- // Static JSON serializer for CacheEntry
- private static readonly NatsJsonContextSerializer _cacheEntrySerializer =
- new NatsJsonContextSerializer(CacheEntryJsonContext.Default);
-
- private readonly ILogger _logger;
- private readonly NatsCacheOptions _options;
- private readonly string _instanceName;
- private readonly INatsConnection _natsConnection;
- private NatsKVStore? _kvStore;
- private bool _disposed;
- private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
-
- public NatsCache(IOptions optionsAccessor, ILogger logger,
- INatsConnection natsConnection)
- {
- if (optionsAccessor == null)
- {
- throw new ArgumentNullException(nameof(optionsAccessor));
- }
+ }
- _options = optionsAccessor.Value;
- _logger = logger;
- _natsConnection = natsConnection ?? throw new ArgumentNullException(nameof(natsConnection));
- _instanceName = _options.InstanceName ?? string.Empty;
+ ///
+ public void Set(string key, byte[] value, DistributedCacheEntryOptions options) => SetAsync(key, value, options).GetAwaiter().GetResult();
- // No need to connect immediately; will connect on-demand
- }
+ ///
+ public async Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(key);
+ ArgumentNullException.ThrowIfNull(value);
+ ArgumentNullException.ThrowIfNull(options);
+ token.ThrowIfCancellationRequested();
+
+ var ttl = GetTtl(options);
+ var entry = CreateCacheEntry(value, options);
+ var kvStore = await GetKVStore().ConfigureAwait(false);
- public NatsCache(IOptions optionsAccessor, INatsConnection natsConnection)
- : this(optionsAccessor, NullLogger.Instance, natsConnection)
+ try
{
+ await kvStore.PutAsync(GetKeyPrefix(key), entry, ttl ?? default, _cacheEntrySerializer, token).ConfigureAwait(false);
}
-
- // This is the method used by hybrid caching to determine if it should use the distributed instance
- internal virtual bool IsHybridCacheActive() => false;
-
- private string GetKeyPrefix(string key)
+ catch (Exception ex)
{
- return string.IsNullOrEmpty(_instanceName)
- ? key
- : _instanceName + ":" + key;
+ LogException(ex);
+ throw;
}
+ }
- ///
- /// Gets or sets a value with the given key.
- ///
- /// The key to get the value for.
- /// The value for the given key, or null if not found.
- public byte[]? Get(string key)
- {
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ ///
+ public void Set(string key, ReadOnlySequence value, DistributedCacheEntryOptions options) => SetAsync(key, value, options).GetAwaiter().GetResult();
- return GetAndRefresh(key, getData: true);
- }
+ ///
+ public async ValueTask SetAsync(string key, ReadOnlySequence value, DistributedCacheEntryOptions options, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(key);
+ ArgumentNullException.ThrowIfNull(options);
+ token.ThrowIfCancellationRequested();
- ///
- /// Asynchronously gets or sets a value with the given key.
- ///
- /// The key to get the value for.
- /// Optional. A to cancel the operation.
- /// The value for the given key, or null if not found.
- public async Task GetAsync(string key, CancellationToken token = default)
- {
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ var array = value.IsSingleSegment ? value.First.ToArray() : value.ToArray();
+ await SetAsync(key, array, options, token).ConfigureAwait(false);
+ }
- token.ThrowIfCancellationRequested();
+ ///
+ public void Remove(string key) => RemoveAsync(key, null, default).GetAwaiter().GetResult();
- return await GetAndRefreshAsync(key, getData: true, token: token).ConfigureAwait(false);
- }
+ ///
+ public async Task RemoveAsync(string key, CancellationToken token = default) => await RemoveAsync(key, null, token).ConfigureAwait(false);
- ///
- /// Sets a value with the given key.
- ///
- /// The key to set the value for.
- /// The value to set.
- /// The cache options for the value.
- public void Set(string key, byte[] value, DistributedCacheEntryOptions options)
- {
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ ///
+ /// Removes the value with the given key.
+ ///
+ /// A string identifying the requested value.
+ /// Nats KV delete options
+ /// Optional. The used to propagate notifications that the operation should be canceled.
+ /// The that represents the asynchronous operation.
+ public async Task RemoveAsync(string key, NatsKVDeleteOpts? natsKVDeleteOpts = null, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(key);
+ token.ThrowIfCancellationRequested();
- if (value == null)
- {
- throw new ArgumentNullException(nameof(value));
- }
+ var kvStore = await GetKVStore().ConfigureAwait(false);
+ await kvStore.DeleteAsync(GetKeyPrefix(key), natsKVDeleteOpts, cancellationToken: token).ConfigureAwait(false);
+ }
- if (options == null)
- {
- throw new ArgumentNullException(nameof(options));
- }
+ ///
+ public void Refresh(string key) => RefreshAsync(key).GetAwaiter().GetResult();
- var kvStore = GetKVStore().GetAwaiter().GetResult();
- var ttl = GetTTL(options);
- var entry = CreateCacheEntry(key, value, options);
+ ///
+ public async Task RefreshAsync(string key, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(key);
+ token.ThrowIfCancellationRequested();
- try
- {
- if (ttl.HasValue)
- {
- kvStore.PutAsync(GetKeyPrefix(key), entry, ttl.Value, serializer: _cacheEntrySerializer, default).GetAwaiter()
- .GetResult();
- }
- else
- {
- kvStore.PutAsync(GetKeyPrefix(key), entry, serializer: _cacheEntrySerializer, default).GetAwaiter().GetResult();
- }
- }
- catch (Exception ex)
- {
- LogException(ex);
- throw;
- }
- }
+ await GetAndRefreshAsync(key, getData: false, retry: true, token: token).ConfigureAwait(false);
+ }
- ///
- /// Asynchronously sets a value with the given key.
- ///
- /// The key to set the value for.
- /// The value to set.
- /// The cache options for the value.
- /// Optional. A to cancel the operation.
- /// A that represents the asynchronous set operation.
- public async Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options,
- CancellationToken token = default)
- {
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ ///
+ public byte[]? Get(string key) => GetAsync(key).GetAwaiter().GetResult();
- if (value == null)
- {
- throw new ArgumentNullException(nameof(value));
- }
+ ///
+ public async Task GetAsync(string key, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(key);
+ token.ThrowIfCancellationRequested();
- if (options == null)
- {
- throw new ArgumentNullException(nameof(options));
- }
+ return await GetAndRefreshAsync(key, getData: true, retry: true, token: token).ConfigureAwait(false);
+ }
- token.ThrowIfCancellationRequested();
+ ///
+ public bool TryGet(string key, IBufferWriter destination) => TryGetAsync(key, destination).GetAwaiter().GetResult();
- var kvStore = await GetKVStore().ConfigureAwait(false);
- var ttl = GetTTL(options);
- var entry = CreateCacheEntry(key, value, options);
+ ///
+ public async ValueTask TryGetAsync(string key, IBufferWriter destination, CancellationToken token = default)
+ {
+ ArgumentNullException.ThrowIfNull(key);
+ ArgumentNullException.ThrowIfNull(destination);
+ token.ThrowIfCancellationRequested();
- if (ttl.HasValue)
- {
- await kvStore.PutAsync(GetKeyPrefix(key), entry, ttl.Value, serializer: _cacheEntrySerializer, token)
- .ConfigureAwait(false);
- }
- else
+ try
+ {
+ var result = await GetAsync(key, token).ConfigureAwait(false);
+ if (result != null)
{
- await kvStore.PutAsync(GetKeyPrefix(key), entry, serializer: _cacheEntrySerializer, token).ConfigureAwait(false);
+ destination.Write(result);
+ return true;
}
}
-
- ///
- /// Refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any).
- ///
- /// The key to refresh.
- public void Refresh(string key)
+ catch
{
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
-
- GetAndRefresh(key, getData: false);
+ // Ignore failures here; they will surface later
}
- ///
- /// Asynchronously refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any).
- ///
- /// The key to refresh.
- /// Optional. A to cancel the operation.
- /// A that represents the asynchronous refresh operation.
- public async Task RefreshAsync(string key, CancellationToken token = default)
- {
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ return false;
+ }
- token.ThrowIfCancellationRequested();
+ public void Dispose()
+ {
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
+ }
- await GetAndRefreshAsync(key, getData: false, token: token).ConfigureAwait(false);
- }
+ // This is the method used by hybrid caching to determine if it should use the distributed instance
+ internal virtual bool IsHybridCacheActive() => false;
- ///
- /// Removes the value with the given key.
- ///
- /// The key to remove the value for.
- public void Remove(string key)
+ protected virtual void Dispose(bool disposing)
+ {
+ if (_disposed)
{
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
-
- GetKVStore().GetAwaiter().GetResult().DeleteAsync(GetKeyPrefix(key)).GetAwaiter().GetResult();
+ return;
}
- ///
- /// Asynchronously removes the value with the given key.
- ///
- /// The key to remove the value for.
- /// Optional. A to cancel the operation.
- /// A that represents the asynchronous remove operation.
- public async Task RemoveAsync(string key, CancellationToken token = default)
+ if (disposing)
{
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ // Dispose managed state (managed objects)
+ _connectionLock.Dispose();
+ _kvStore = null; // Set to null to ensure we don't use it after dispose
+ }
- token.ThrowIfCancellationRequested();
+ // Free unmanaged resources (unmanaged objects) and override finalizer
+ _disposed = true;
+ }
- var kvStore = await GetKVStore().ConfigureAwait(false);
- await kvStore.DeleteAsync(GetKeyPrefix(key), cancellationToken: token).ConfigureAwait(false);
+ private static TimeSpan? GetTtl(DistributedCacheEntryOptions options)
+ {
+ if (options.AbsoluteExpiration.HasValue && options.AbsoluteExpiration.Value <= DateTimeOffset.Now)
+ {
+ throw new ArgumentOutOfRangeException(
+ nameof(DistributedCacheEntryOptions.AbsoluteExpiration),
+ options.AbsoluteExpiration.Value,
+ "The absolute expiration value must be in the future.");
}
- private async Task GetKVStore()
+ if (options.AbsoluteExpirationRelativeToNow.HasValue &&
+ options.AbsoluteExpirationRelativeToNow.Value <= TimeSpan.Zero)
{
- if (_kvStore != null && !_disposed)
- {
- return _kvStore;
- }
-
- await _connectionLock.WaitAsync().ConfigureAwait(false);
- try
- {
- if (_kvStore == null || _disposed)
- {
- if (_disposed)
- {
- throw new ObjectDisposedException(nameof(NatsCache));
- }
-
- if (string.IsNullOrEmpty(_options.BucketName))
- {
- throw new InvalidOperationException("BucketName is required and cannot be null or empty.");
- }
-
- var jsContext = _natsConnection.CreateJetStreamContext();
- var kvContext = new NatsKVContext(jsContext);
- _kvStore = (NatsKVStore)await kvContext.GetStoreAsync(_options.BucketName).ConfigureAwait(false);
- if (_kvStore == null)
- {
- throw new InvalidOperationException("Failed to create NATS KV store");
- }
-
- LogConnected();
- }
- }
- catch (Exception ex)
- {
- LogException(ex);
- throw;
- }
- finally
- {
- _connectionLock.Release();
- }
-
- return _kvStore;
+ throw new ArgumentOutOfRangeException(
+ nameof(DistributedCacheEntryOptions.AbsoluteExpirationRelativeToNow),
+ options.AbsoluteExpirationRelativeToNow.Value,
+ "The relative expiration value must be positive.");
}
- private static DistributedCacheEntryOptions GetExpirationOptions(
- string? absoluteExpiration,
- string? slidingExpiration)
+ if (options.SlidingExpiration.HasValue && options.SlidingExpiration.Value <= TimeSpan.Zero)
{
- var options = new DistributedCacheEntryOptions();
- if (absoluteExpiration != null)
- {
- options.AbsoluteExpiration = DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(absoluteExpiration));
- }
-
- if (slidingExpiration != null)
- {
- options.SlidingExpiration = TimeSpan.FromMilliseconds(long.Parse(slidingExpiration));
- }
-
- return options;
+ throw new ArgumentOutOfRangeException(
+ nameof(DistributedCacheEntryOptions.SlidingExpiration),
+ options.SlidingExpiration.Value,
+ "The sliding expiration value must be positive.");
}
- private TimeSpan? GetTTL(DistributedCacheEntryOptions options)
+ var absoluteExpiration = options.AbsoluteExpiration;
+ if (options.AbsoluteExpirationRelativeToNow.HasValue)
{
- if (options.AbsoluteExpiration.HasValue && options.AbsoluteExpiration.Value <= DateTimeOffset.Now)
- {
- throw new ArgumentOutOfRangeException(
- nameof(DistributedCacheEntryOptions.AbsoluteExpiration),
- options.AbsoluteExpiration.Value,
- "The absolute expiration value must be in the future.");
- }
-
- if (options.AbsoluteExpirationRelativeToNow.HasValue &&
- options.AbsoluteExpirationRelativeToNow.Value <= TimeSpan.Zero)
- {
- throw new ArgumentOutOfRangeException(
- nameof(DistributedCacheEntryOptions.AbsoluteExpirationRelativeToNow),
- options.AbsoluteExpirationRelativeToNow.Value,
- "The relative expiration value must be positive.");
- }
-
- if (options.SlidingExpiration.HasValue && options.SlidingExpiration.Value <= TimeSpan.Zero)
- {
- throw new ArgumentOutOfRangeException(
- nameof(DistributedCacheEntryOptions.SlidingExpiration),
- options.SlidingExpiration.Value,
- "The sliding expiration value must be positive.");
- }
-
- var absoluteExpiration = options.AbsoluteExpiration;
- if (options.AbsoluteExpirationRelativeToNow.HasValue)
- {
- absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value);
- }
-
- if (absoluteExpiration.HasValue)
- {
- var ttl = absoluteExpiration.Value - DateTimeOffset.Now;
- if (ttl.TotalMilliseconds <= 0)
- {
- // Value is in the past, remove it
- return TimeSpan.Zero;
- }
-
- // If there's also a sliding expiration, use the minimum of the two
- if (options.SlidingExpiration.HasValue)
- {
- return TimeSpan.FromTicks(Math.Min(ttl.Ticks, options.SlidingExpiration.Value.Ticks));
- }
-
- return ttl;
- }
+ absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value);
+ }
+ if (!absoluteExpiration.HasValue)
+ {
return options.SlidingExpiration;
}
- private CacheEntry CreateCacheEntry(string key, byte[] value, DistributedCacheEntryOptions options)
+ var ttl = absoluteExpiration.Value - DateTimeOffset.Now;
+ if (ttl.TotalMilliseconds <= 0)
{
- var absoluteExpiration = options.AbsoluteExpiration;
- if (options.AbsoluteExpirationRelativeToNow.HasValue)
- {
- absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value);
- }
-
- var cacheEntry = new CacheEntry { Data = value };
-
- if (absoluteExpiration.HasValue)
- {
- cacheEntry.AbsoluteExpiration = absoluteExpiration.Value.ToUnixTimeMilliseconds().ToString();
- }
+ // Value is in the past, remove it
+ return TimeSpan.Zero;
+ }
- if (options.SlidingExpiration.HasValue)
- {
- cacheEntry.SlidingExpiration = options.SlidingExpiration.Value.TotalMilliseconds.ToString();
- }
+ // If there's also a sliding expiration, use the minimum of the two
+ return options.SlidingExpiration.HasValue
+ ? TimeSpan.FromTicks(Math.Min(ttl.Ticks, options.SlidingExpiration.Value.Ticks))
+ : ttl;
+ }
- return cacheEntry;
+ private static CacheEntry CreateCacheEntry(byte[] value, DistributedCacheEntryOptions options)
+ {
+ var absoluteExpiration = options.AbsoluteExpiration;
+ if (options.AbsoluteExpirationRelativeToNow.HasValue)
+ {
+ absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value);
}
- private byte[]? GetAndRefresh(string key, bool getData)
- {
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ var cacheEntry = new CacheEntry { Data = value };
- return GetAndRefreshAsync(key, getData).GetAwaiter().GetResult();
+ if (absoluteExpiration.HasValue)
+ {
+ cacheEntry.AbsoluteExpiration = absoluteExpiration.Value.ToUnixTimeMilliseconds().ToString();
}
- private async Task GetAndRefreshAsync(string key, bool getData, CancellationToken token = default)
+ if (options.SlidingExpiration.HasValue)
{
- token.ThrowIfCancellationRequested();
- var kvStore = await GetKVStore().ConfigureAwait(false);
- var prefixedKey = GetKeyPrefix(key);
- try
- {
- var kvEntry = await kvStore.GetEntryAsync(prefixedKey, serializer: _cacheEntrySerializer, cancellationToken: token)
- .ConfigureAwait(false);
-
- // Check if the value is null
- if (kvEntry.Value == null)
- {
- return null;
- }
-
- // Check absolute expiration
- if (kvEntry.Value.AbsoluteExpiration != null)
- {
- var absoluteExpiration =
- DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration));
- if (absoluteExpiration <= DateTimeOffset.Now)
- {
- // NatsKVWrongLastRevisionException is caught below
- await kvStore.DeleteAsync(
- prefixedKey,
- new NatsKVDeleteOpts { Revision = kvEntry.Revision },
- cancellationToken: token)
- .ConfigureAwait(false);
-
- return null;
- }
- }
-
- // Refresh if sliding expiration exists
- if (kvEntry.Value.SlidingExpiration != null)
- {
- var slidingExpirationMilliseconds = long.Parse(kvEntry.Value.SlidingExpiration);
- var slidingExpiration = TimeSpan.FromMilliseconds(slidingExpirationMilliseconds);
-
- if (slidingExpiration > TimeSpan.Zero)
- {
- await UpdateEntryExpirationAsync(kvStore, prefixedKey, kvEntry, token)
- .ConfigureAwait(false);
- }
- }
-
- return getData ? kvEntry.Value.Data : null;
- }
- catch (NatsKVWrongLastRevisionException)
- {
- // Optimistic concurrency control failed, someone else updated it
- // That's fine, we just retry the get operation
- LogUpdateFailed(key);
+ cacheEntry.SlidingExpiration = options.SlidingExpiration.Value.TotalMilliseconds.ToString();
+ }
- // Try once more to get the latest value
- try
- {
- var kvEntry = await kvStore.GetEntryAsync(prefixedKey, serializer: _cacheEntrySerializer, cancellationToken: token)
- .ConfigureAwait(false);
+ return cacheEntry;
+ }
- // Check if the value is null
- if (kvEntry.Value == null)
- {
- return null;
- }
+ private string GetKeyPrefix(string key) => string.IsNullOrEmpty(_instanceName)
+ ? key
+ : _instanceName + ":" + key;
- return getData ? kvEntry.Value.Data : null;
- }
- catch (Exception ex)
- {
- LogException(ex);
- return null;
- }
- }
- catch (Exception ex)
- {
- LogException(ex);
- throw;
- }
+ private async Task GetKVStore()
+ {
+ if (_kvStore != null && !_disposed)
+ {
+ return _kvStore;
}
- private async Task UpdateEntryExpirationAsync(NatsKVStore kvStore, string key, NatsKVEntry kvEntry,
- CancellationToken token)
+ await _connectionLock.WaitAsync().ConfigureAwait(false);
+ try
{
- // Calculate new TTL based on sliding expiration
- TimeSpan? ttl = null;
-
- // If we have a sliding expiration, use it as the TTL
- if (kvEntry.Value.SlidingExpiration != null)
+ if (_kvStore == null || _disposed)
{
- ttl = TimeSpan.FromMilliseconds(long.Parse(kvEntry.Value.SlidingExpiration));
-
- // If we also have an absolute expiration, make sure we don't exceed it
- if (kvEntry.Value.AbsoluteExpiration != null)
+ ObjectDisposedException.ThrowIf(_disposed, this);
+ if (string.IsNullOrEmpty(_options.BucketName))
{
- var absoluteExpiration =
- DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration));
- var remainingTime = absoluteExpiration - DateTimeOffset.Now;
-
- // Use the minimum of sliding window or remaining absolute time
- if (remainingTime > TimeSpan.Zero && remainingTime < ttl)
- {
- ttl = remainingTime;
- }
+ throw new InvalidOperationException("BucketName is required and cannot be null or empty.");
}
- }
- if (ttl.HasValue && ttl.Value > TimeSpan.Zero)
- {
- // Use optimistic concurrency control with the last revision
- try
- {
- await kvStore.UpdateAsync(key, kvEntry.Value, kvEntry.Revision, ttl.Value, serializer: _cacheEntrySerializer, cancellationToken: token)
- .ConfigureAwait(false);
- }
- catch (NatsKVWrongLastRevisionException)
+ var jsContext = _natsConnection.CreateJetStreamContext();
+ var kvContext = new NatsKVContext(jsContext);
+ _kvStore = (NatsKVStore)await kvContext.GetStoreAsync(_options.BucketName).ConfigureAwait(false);
+ if (_kvStore == null)
{
- // Someone else updated it; that's fine, we'll get the latest version next time
- LogUpdateFailed(key.Replace(GetKeyPrefix(string.Empty), string.Empty));
+ throw new InvalidOperationException("Failed to create NATS KV store");
}
- }
- }
-
- public void Dispose()
- {
- if (!_disposed)
- {
- _connectionLock.Dispose();
- _disposed = true;
-
- // Set to null to ensure we don't use it after dispose
- _kvStore = null;
- }
- }
- ///
- public bool TryGetValue(string key, out ReadOnlySequence sequence)
- {
- sequence = ReadOnlySequence.Empty;
-
- try
- {
- var result = Get(key);
- if (result != null)
- {
- sequence = new ReadOnlySequence(result);
- return true;
- }
- }
- catch
- {
- // Ignore failures here; they will surface later
+ LogConnected();
}
-
- return false;
}
-
- ///
- public async ValueTask TryGetValueAsync(string key, Memory destination,
- CancellationToken token = default)
+ catch (Exception ex)
{
- try
- {
- var result = await GetAsync(key, token).ConfigureAwait(false);
- if (result != null)
- {
- if (result.Length <= destination.Length)
- {
- result.CopyTo(destination);
- return true;
- }
- }
- }
- catch
- {
- // Ignore failures here; they will surface later
- }
-
- return false;
+ LogException(ex);
+ throw;
}
-
- ///
- public async ValueTask GetAsync(string key, Stream destination, CancellationToken token = default)
+ finally
{
- try
- {
- var result = await GetAsync(key, token).ConfigureAwait(false);
- if (result != null)
- {
- await destination.WriteAsync(result, token).ConfigureAwait(false);
- return true;
- }
- }
- catch
- {
- // Ignore failures here; they will surface later
- }
-
- return false;
+ _connectionLock.Release();
}
- ///
- public async ValueTask GetAndRefreshAsync(string key, Stream destination, bool getData,
- CancellationToken token = default)
- {
- try
- {
- var result = await GetAndRefreshAsync(key, getData, token).ConfigureAwait(false);
- if (result != null)
- {
- await destination.WriteAsync(result, token).ConfigureAwait(false);
- return true;
- }
- }
- catch
- {
- // Ignore failures here; they will surface later
- }
+ return _kvStore;
+ }
- return false;
- }
+ private async Task GetAndRefreshAsync(string key, bool getData, bool retry, CancellationToken token = default)
+ {
+ token.ThrowIfCancellationRequested();
- ///
- public bool TryGet(string key, IBufferWriter destination)
+ var kvStore = await GetKVStore().ConfigureAwait(false);
+ var prefixedKey = GetKeyPrefix(key);
+ try
{
- if (key == null)
+ var natsResult = await kvStore.TryGetEntryAsync(prefixedKey, serializer: _cacheEntrySerializer, cancellationToken: token).ConfigureAwait(false);
+ if (!natsResult.Success)
{
- throw new ArgumentNullException(nameof(key));
+ return null;
}
- if (destination == null)
+ var kvEntry = natsResult.Value;
+ if (kvEntry.Value == null)
{
- throw new ArgumentNullException(nameof(destination));
+ return null;
}
- try
+ // Check absolute expiration
+ if (kvEntry.Value.AbsoluteExpiration != null
+ && DateTimeOffset.Now > DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration)))
{
- var result = Get(key);
- if (result != null)
- {
- destination.Write(result);
- return true;
- }
- }
- catch
- {
- // Ignore failures here; they will surface later
+ // NatsKVWrongLastRevisionException is caught below
+ var natsDeleteOpts = new NatsKVDeleteOpts { Revision = kvEntry.Revision };
+ await RemoveAsync(key, natsDeleteOpts, token).ConfigureAwait(false);
+ return null;
}
- return false;
+ await UpdateEntryExpirationAsync(kvStore, prefixedKey, kvEntry, token).ConfigureAwait(false);
+ return getData ? kvEntry.Value.Data : null;
}
-
- ///
- public async ValueTask TryGetAsync(string key, IBufferWriter destination,
- CancellationToken token = default)
+ catch (NatsKVWrongLastRevisionException ex)
{
- if (key == null)
+ // Optimistic concurrency control failed, someone else updated it
+ LogUpdateFailed(key);
+ if (retry)
{
- throw new ArgumentNullException(nameof(key));
+ return await GetAndRefreshAsync(key, getData, retry: false, token).ConfigureAwait(false);
}
-
- if (destination == null)
- {
- throw new ArgumentNullException(nameof(destination));
- }
-
- token.ThrowIfCancellationRequested();
-
- try
- {
- var result = await GetAsync(key, token).ConfigureAwait(false);
- if (result != null)
- {
- destination.Write(result);
- return true;
- }
- }
- catch
+ else
{
- // Ignore failures here; they will surface later
+ LogException(ex);
+ return null;
}
-
- return false;
}
+ catch (Exception ex)
+ {
+ LogException(ex);
+ throw;
+ }
+ }
- ///
- public void Set(string key, ReadOnlySequence value, DistributedCacheEntryOptions options)
+ private async Task UpdateEntryExpirationAsync(NatsKVStore kvStore, string key, NatsKVEntry kvEntry, CancellationToken token)
+ {
+ if (kvEntry.Value?.SlidingExpiration == null)
{
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
+ return;
+ }
- if (options == null)
- {
- throw new ArgumentNullException(nameof(options));
- }
+ // If we have a sliding expiration, use it as the TTL
+ var ttl = TimeSpan.FromMilliseconds(long.Parse(kvEntry.Value.SlidingExpiration));
- byte[] array;
+ // If we also have an absolute expiration, make sure we don't exceed it
+ if (kvEntry.Value.AbsoluteExpiration != null)
+ {
+ var absoluteExpiration =
+ DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration));
+ var remainingTime = absoluteExpiration - DateTimeOffset.Now;
- if (value.IsSingleSegment)
- {
- array = value.First.ToArray();
- }
- else
+ // Use the minimum of sliding window or remaining absolute time
+ if (remainingTime > TimeSpan.Zero && remainingTime < ttl)
{
- array = value.ToArray();
+ ttl = remainingTime;
}
-
- Set(key, array, options);
}
- ///
- public async ValueTask SetAsync(string key, ReadOnlySequence value, DistributedCacheEntryOptions options,
- CancellationToken token = default)
+ if (ttl > TimeSpan.Zero)
{
- if (key == null)
- {
- throw new ArgumentNullException(nameof(key));
- }
-
- if (options == null)
- {
- throw new ArgumentNullException(nameof(options));
- }
-
- token.ThrowIfCancellationRequested();
-
- byte[] array;
-
- if (value.IsSingleSegment)
+ // Use optimistic concurrency control with the last revision
+ try
{
- array = value.First.ToArray();
+ await kvStore.UpdateAsync(key, kvEntry.Value, kvEntry.Revision, ttl, serializer: _cacheEntrySerializer, cancellationToken: token).ConfigureAwait(false);
}
- else
+ catch (NatsKVWrongLastRevisionException)
{
- array = value.ToArray();
+ // Someone else updated it; that's fine, we'll get the latest version next time
+ LogUpdateFailed(key.Replace(GetKeyPrefix(string.Empty), string.Empty));
}
-
- await SetAsync(key, array, options, token).ConfigureAwait(false);
}
}
}
+
+[JsonSerializable(typeof(CacheEntry))]
+internal partial class CacheEntryJsonContext : JsonSerializerContext
+{
+}
diff --git a/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs b/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs
index 23a9b83..ec1d3d5 100644
--- a/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs
+++ b/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
-using System;
using Microsoft.Extensions.Caching.Hybrid;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -14,9 +13,6 @@ internal sealed class NatsCacheImpl : NatsCache
{
private readonly IServiceProvider _services;
- internal override bool IsHybridCacheActive()
- => _services.GetService() is not null;
-
public NatsCacheImpl(IOptions optionsAccessor, ILogger logger, IServiceProvider services, INatsConnection natsConnection)
: base(optionsAccessor, logger, natsConnection)
{
@@ -28,5 +24,8 @@ public NatsCacheImpl(IOptions optionsAccessor, IServiceProvide
{
_services = services; // important: do not check for HybridCache here due to dependency - creates a cycle
}
+
+ internal override bool IsHybridCacheActive()
+ => _services.GetService() is not null;
}
}
diff --git a/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs b/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs
index 99e7b91..d481be9 100644
--- a/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs
+++ b/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs
@@ -1,7 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
-using System;
using Microsoft.Extensions.Options;
namespace CodeCargo.NatsDistributedCache
@@ -22,9 +21,6 @@ public class NatsCacheOptions : IOptions
///
public string? InstanceName { get; set; }
- NatsCacheOptions IOptions.Value
- {
- get { return this; }
- }
+ NatsCacheOptions IOptions.Value => this;
}
}
diff --git a/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs b/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs
index 485d2d0..0ed59e0 100644
--- a/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs
+++ b/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs
@@ -1,10 +1,8 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
-using System;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NATS.Client.Core;
@@ -25,15 +23,8 @@ public static class NatsCacheServiceCollectionExtensions
/// The so that additional calls can be chained.
public static IServiceCollection AddNatsDistributedCache(this IServiceCollection services, Action setupAction)
{
- if (services == null)
- {
- throw new ArgumentNullException(nameof(services));
- }
-
- if (setupAction == null)
- {
- throw new ArgumentNullException(nameof(setupAction));
- }
+ ArgumentNullException.ThrowIfNull(services);
+ ArgumentNullException.ThrowIfNull(setupAction);
services.AddOptions();
services.Configure(setupAction);
diff --git a/test/IntegrationTests/CacheServiceExtensionsTests.cs b/test/IntegrationTests/CacheServiceExtensionsTests.cs
index 1a0e66d..90a22e9 100644
--- a/test/IntegrationTests/CacheServiceExtensionsTests.cs
+++ b/test/IntegrationTests/CacheServiceExtensionsTests.cs
@@ -1,12 +1,4 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Options;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.IntegrationTests;
diff --git a/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs b/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs
index a78184f..3132767 100644
--- a/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs
+++ b/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs
@@ -1,11 +1,6 @@
-using System;
using System.Runtime.CompilerServices;
using System.Text;
-using System.Threading.Tasks;
-using CodeCargo.NatsDistributedCache.IntegrationTests;
using Microsoft.Extensions.Caching.Distributed;
-using Microsoft.Extensions.Options;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.IntegrationTests;
@@ -17,16 +12,6 @@ public NatsCacheSetAndRemoveTests(NatsIntegrationFixture fixture)
{
}
- private IDistributedCache CreateCacheInstance()
- {
- return new NatsCache(
- Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
- {
- BucketName = "cache"
- }),
- NatsConnection);
- }
-
[Fact]
public void GetMissingKeyReturnsNull()
{
@@ -114,6 +99,7 @@ public void SetGetNonNullString(string payload)
// check raw bytes
var raw = cache.Get(key);
+ Assert.NotNull(raw);
Assert.Equal(Hex(payload), Hex(raw));
// check via string API
@@ -137,6 +123,7 @@ public async Task SetGetNonNullStringAsync(string payload)
// check raw bytes
var raw = await cache.GetAsync(key);
+ Assert.NotNull(raw);
Assert.Equal(Hex(payload), Hex(raw));
// check via string API
@@ -150,4 +137,14 @@ public async Task SetGetNonNullStringAsync(string payload)
private static string Hex(string value) => Hex(Encoding.UTF8.GetBytes(value));
private static string Me([CallerMemberName] string caller = "") => caller;
+
+ private IDistributedCache CreateCacheInstance()
+ {
+ return new NatsCache(
+ Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
+ {
+ BucketName = "cache"
+ }),
+ NatsConnection);
+ }
}
diff --git a/test/IntegrationTests/NatsCollection.cs b/test/IntegrationTests/NatsCollection.cs
index 0408380..2ff557c 100644
--- a/test/IntegrationTests/NatsCollection.cs
+++ b/test/IntegrationTests/NatsCollection.cs
@@ -1,5 +1,3 @@
-using Xunit;
-
namespace CodeCargo.NatsDistributedCache.IntegrationTests;
[CollectionDefinition(Name)]
diff --git a/test/IntegrationTests/TestBase.cs b/test/IntegrationTests/TestBase.cs
index 3aa8bf6..28c51ca 100644
--- a/test/IntegrationTests/TestBase.cs
+++ b/test/IntegrationTests/TestBase.cs
@@ -1,8 +1,6 @@
using CodeCargo.NatsDistributedCache.TestUtils.Services.Logging;
using Microsoft.Extensions.Logging;
using NATS.Client.Core;
-using NATS.Client.KeyValueStore;
-using NATS.Net;
namespace CodeCargo.NatsDistributedCache.IntegrationTests;
@@ -21,11 +19,7 @@ protected TestBase(NatsIntegrationFixture fixture)
{
// Get the test output helper from the current test context
var testContext = TestContext.Current;
- var output = testContext.TestOutputHelper;
- if (output == null)
- {
- throw new InvalidOperationException("TestOutputHelper was not available in the current test context");
- }
+ var output = testContext.TestOutputHelper ?? throw new InvalidOperationException("TestOutputHelper was not available in the current test context");
// Create a service collection and configure logging
var services = new ServiceCollection();
diff --git a/test/IntegrationTests/TestHelpers/ExceptionAssert.cs b/test/IntegrationTests/TestHelpers/ExceptionAssert.cs
index 1734381..563026a 100644
--- a/test/IntegrationTests/TestHelpers/ExceptionAssert.cs
+++ b/test/IntegrationTests/TestHelpers/ExceptionAssert.cs
@@ -1,6 +1,3 @@
-using System;
-using Xunit;
-
namespace CodeCargo.NatsDistributedCache.IntegrationTests.TestHelpers;
public static class ExceptionAssert
diff --git a/test/IntegrationTests/TimeExpirationAsyncTests.cs b/test/IntegrationTests/TimeExpirationAsyncTests.cs
index 91871ef..e6c455f 100644
--- a/test/IntegrationTests/TimeExpirationAsyncTests.cs
+++ b/test/IntegrationTests/TimeExpirationAsyncTests.cs
@@ -1,9 +1,5 @@
-using System;
using System.Runtime.CompilerServices;
-using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
-using Microsoft.Extensions.Options;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.IntegrationTests;
@@ -15,36 +11,6 @@ public TimeExpirationAsyncTests(NatsIntegrationFixture fixture)
{
}
- private IDistributedCache CreateCacheInstance()
- {
- return new NatsCache(
- Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
- {
- BucketName = "cache"
- }),
- NatsConnection);
- }
-
- // async twin to ExceptionAssert.ThrowsArgumentOutOfRange
- private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue)
- {
- var ex = await Assert.ThrowsAsync(test);
- if (paramName is not null)
- {
- Assert.Equal(paramName, ex.ParamName);
- }
-
- if (message is not null)
- {
- Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc
- }
-
- if (actualValue is not null)
- {
- Assert.Equal(actualValue, ex.ActualValue);
- }
- }
-
// AbsoluteExpirationInThePastThrowsAsync test moved to UnitTests/TimeExpirationAsyncUnitTests.cs
[Fact]
public async Task AbsoluteExpirationExpiresAsync()
@@ -53,7 +19,7 @@ public async Task AbsoluteExpirationExpiresAsync()
var key = await GetNameAndReset(cache);
var value = new byte[1];
- await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1)));
+ await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1)));
var result = await cache.GetAsync(key);
Assert.Equal(value, result);
@@ -67,19 +33,6 @@ public async Task AbsoluteExpirationExpiresAsync()
Assert.Null(result);
}
- [Fact]
- public async Task AbsoluteSubSecondExpirationExpiresImmediatelyAsync()
- {
- var cache = CreateCacheInstance();
- var key = await GetNameAndReset(cache);
- var value = new byte[1];
-
- await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(0.25)));
-
- var result = await cache.GetAsync(key);
- Assert.Null(result);
- }
-
// NegativeRelativeExpirationThrowsAsync test moved to UnitTests/TimeExpirationAsyncUnitTests.cs
// ZeroRelativeExpirationThrowsAsync test moved to UnitTests/TimeExpirationAsyncUnitTests.cs
@@ -90,7 +43,7 @@ public async Task RelativeExpirationExpiresAsync()
var key = await GetNameAndReset(cache);
var value = new byte[1];
- await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1)));
+ await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1)));
var result = await cache.GetAsync(key);
Assert.Equal(value, result);
@@ -160,14 +113,14 @@ public async Task SlidingExpirationRenewedByAccessUntilAbsoluteExpirationAsync()
var value = new byte[1];
await cache.SetAsync(key, value, new DistributedCacheEntryOptions()
- .SetSlidingExpiration(TimeSpan.FromSeconds(1))
- .SetAbsoluteExpiration(TimeSpan.FromSeconds(3)));
+ .SetSlidingExpiration(TimeSpan.FromSeconds(1.1))
+ .SetAbsoluteExpiration(TimeSpan.FromSeconds(4)));
var setTime = DateTime.Now;
var result = await cache.GetAsync(key);
Assert.Equal(value, result);
- for (var i = 0; i < 5; i++)
+ for (var i = 0; i < 4; i++)
{
await Task.Delay(TimeSpan.FromSeconds(0.5));
@@ -190,4 +143,34 @@ private static async Task GetNameAndReset(IDistributedCache cache, [Call
await cache.RemoveAsync(caller);
return caller;
}
+
+ // async twin to ExceptionAssert.ThrowsArgumentOutOfRange
+ private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue)
+ {
+ var ex = await Assert.ThrowsAsync(test);
+ if (paramName is not null)
+ {
+ Assert.Equal(paramName, ex.ParamName);
+ }
+
+ if (message is not null)
+ {
+ Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc
+ }
+
+ if (actualValue is not null)
+ {
+ Assert.Equal(actualValue, ex.ActualValue);
+ }
+ }
+
+ private IDistributedCache CreateCacheInstance()
+ {
+ return new NatsCache(
+ Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
+ {
+ BucketName = "cache"
+ }),
+ NatsConnection);
+ }
}
diff --git a/test/IntegrationTests/TimeExpirationTests.cs b/test/IntegrationTests/TimeExpirationTests.cs
index 90e7365..16e8cc9 100644
--- a/test/IntegrationTests/TimeExpirationTests.cs
+++ b/test/IntegrationTests/TimeExpirationTests.cs
@@ -1,10 +1,5 @@
-using System;
using System.Runtime.CompilerServices;
-using System.Threading;
-using CodeCargo.NatsDistributedCache.IntegrationTests.TestHelpers;
using Microsoft.Extensions.Caching.Distributed;
-using Microsoft.Extensions.Options;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.IntegrationTests;
@@ -16,16 +11,6 @@ public TimeExpirationTests(NatsIntegrationFixture fixture)
{
}
- private IDistributedCache CreateCacheInstance()
- {
- return new NatsCache(
- Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
- {
- BucketName = "cache"
- }),
- NatsConnection);
- }
-
// AbsoluteExpirationInThePastThrows test moved to UnitTests/TimeExpirationUnitTests.cs
[Fact]
public void AbsoluteExpirationExpires()
@@ -34,7 +19,7 @@ public void AbsoluteExpirationExpires()
var key = GetNameAndReset(cache);
var value = new byte[1];
- cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1)));
+ cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1)));
var result = cache.Get(key);
Assert.Equal(value, result);
@@ -48,19 +33,6 @@ public void AbsoluteExpirationExpires()
Assert.Null(result);
}
- [Fact]
- public void AbsoluteSubSecondExpirationExpiresImmediately()
- {
- var cache = CreateCacheInstance();
- var key = GetNameAndReset(cache);
- var value = new byte[1];
-
- cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(0.25)));
-
- var result = cache.Get(key);
- Assert.Null(result);
- }
-
// NegativeRelativeExpirationThrows test moved to UnitTests/TimeExpirationUnitTests.cs
// ZeroRelativeExpirationThrows test moved to UnitTests/TimeExpirationUnitTests.cs
@@ -71,7 +43,7 @@ public void RelativeExpirationExpires()
var key = GetNameAndReset(cache);
var value = new byte[1];
- cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1)));
+ cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1)));
var result = cache.Get(key);
Assert.Equal(value, result);
@@ -141,14 +113,14 @@ public void SlidingExpirationRenewedByAccessUntilAbsoluteExpiration()
var value = new byte[1];
cache.Set(key, value, new DistributedCacheEntryOptions()
- .SetSlidingExpiration(TimeSpan.FromSeconds(1))
- .SetAbsoluteExpiration(TimeSpan.FromSeconds(3)));
+ .SetSlidingExpiration(TimeSpan.FromSeconds(1.1))
+ .SetAbsoluteExpiration(TimeSpan.FromSeconds(4)));
var setTime = DateTime.Now;
var result = cache.Get(key);
Assert.Equal(value, result);
- for (var i = 0; i < 5; i++)
+ for (var i = 0; i < 4; i++)
{
Thread.Sleep(TimeSpan.FromSeconds(0.5));
@@ -171,4 +143,14 @@ private static string GetNameAndReset(IDistributedCache cache, [CallerMemberName
cache.Remove(caller);
return caller;
}
+
+ private IDistributedCache CreateCacheInstance()
+ {
+ return new NatsCache(
+ Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
+ {
+ BucketName = "cache"
+ }),
+ NatsConnection);
+ }
}
diff --git a/test/UnitTests/CacheServiceExtensionsUnitTests.cs b/test/UnitTests/CacheServiceExtensionsUnitTests.cs
index 747418f..aaeba90 100644
--- a/test/UnitTests/CacheServiceExtensionsUnitTests.cs
+++ b/test/UnitTests/CacheServiceExtensionsUnitTests.cs
@@ -1,13 +1,8 @@
-using System;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Moq;
using NATS.Client.Core;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.UnitTests;
@@ -102,6 +97,10 @@ public void AddNatsCache_UsesCacheOptionsAction()
wasInvoked = true;
});
+ // Build service provider and resolve options to trigger the setup action
+ var sp = services.BuildServiceProvider();
+ _ = sp.GetRequiredService();
+
// Assert
Assert.True(wasInvoked);
}
diff --git a/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs b/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs
index 93fb804..ae4bddc 100644
--- a/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs
+++ b/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs
@@ -1,11 +1,6 @@
-using System;
-using System.Threading;
-using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
-using Microsoft.Extensions.Options;
using Moq;
using NATS.Client.Core;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.UnitTests;
@@ -18,6 +13,16 @@ public NatsCacheSetAndRemoveUnitTests()
_mockNatsConnection = new Mock();
}
+ [Fact]
+ public void SetNullValueThrows()
+ {
+ var cache = CreateCacheInstance();
+ byte[] value = null!;
+ var key = "myKey";
+
+ Assert.Throws(() => cache.Set(key, value));
+ }
+
private IDistributedCache CreateCacheInstance()
{
return new NatsCache(
@@ -27,14 +32,4 @@ private IDistributedCache CreateCacheInstance()
}),
_mockNatsConnection.Object);
}
-
- [Fact]
- public void SetNullValueThrows()
- {
- var cache = CreateCacheInstance();
- byte[] value = null;
- var key = "myKey";
-
- Assert.Throws(() => cache.Set(key, value));
- }
}
diff --git a/test/UnitTests/TestHelpers/ExceptionAssert.cs b/test/UnitTests/TestHelpers/ExceptionAssert.cs
index fb2acda..0cbe44a 100644
--- a/test/UnitTests/TestHelpers/ExceptionAssert.cs
+++ b/test/UnitTests/TestHelpers/ExceptionAssert.cs
@@ -1,6 +1,3 @@
-using System;
-using Xunit;
-
namespace CodeCargo.NatsDistributedCache.UnitTests.TestHelpers;
public static class ExceptionAssert
diff --git a/test/UnitTests/TimeExpirationAsyncUnitTests.cs b/test/UnitTests/TimeExpirationAsyncUnitTests.cs
index a649406..befc06a 100644
--- a/test/UnitTests/TimeExpirationAsyncUnitTests.cs
+++ b/test/UnitTests/TimeExpirationAsyncUnitTests.cs
@@ -1,11 +1,6 @@
-using System;
-using System.Runtime.CompilerServices;
-using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
-using Microsoft.Extensions.Options;
using Moq;
using NATS.Client.Core;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.UnitTests;
@@ -18,36 +13,6 @@ public TimeExpirationAsyncUnitTests()
_mockNatsConnection = new Mock();
}
- private IDistributedCache CreateCacheInstance()
- {
- return new NatsCache(
- Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
- {
- BucketName = "cache"
- }),
- _mockNatsConnection.Object);
- }
-
- // async twin to ExceptionAssert.ThrowsArgumentOutOfRange
- private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue)
- {
- var ex = await Assert.ThrowsAsync(test);
- if (paramName is not null)
- {
- Assert.Equal(paramName, ex.ParamName);
- }
-
- if (message is not null)
- {
- Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc
- }
-
- if (actualValue is not null)
- {
- Assert.Equal(actualValue, ex.ActualValue);
- }
- }
-
[Fact]
public async Task AbsoluteExpirationInThePastThrowsAsync()
{
@@ -133,4 +98,34 @@ await ThrowsArgumentOutOfRangeAsync(
"The sliding expiration value must be positive.",
TimeSpan.Zero);
}
+
+ // async twin to ExceptionAssert.ThrowsArgumentOutOfRange
+ private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue)
+ {
+ var ex = await Assert.ThrowsAsync(test);
+ if (paramName is not null)
+ {
+ Assert.Equal(paramName, ex.ParamName);
+ }
+
+ if (message is not null)
+ {
+ Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc
+ }
+
+ if (actualValue is not null)
+ {
+ Assert.Equal(actualValue, ex.ActualValue);
+ }
+ }
+
+ private IDistributedCache CreateCacheInstance()
+ {
+ return new NatsCache(
+ Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
+ {
+ BucketName = "cache"
+ }),
+ _mockNatsConnection.Object);
+ }
}
diff --git a/test/UnitTests/TimeExpirationUnitTests.cs b/test/UnitTests/TimeExpirationUnitTests.cs
index 705e1c6..bdbaa17 100644
--- a/test/UnitTests/TimeExpirationUnitTests.cs
+++ b/test/UnitTests/TimeExpirationUnitTests.cs
@@ -1,12 +1,8 @@
-using System;
-using System.Runtime.CompilerServices;
-using System.Threading;
using CodeCargo.NatsDistributedCache.UnitTests.TestHelpers;
using Microsoft.Extensions.Caching.Distributed;
-using Microsoft.Extensions.Options;
+using Microsoft.Extensions.Logging;
using Moq;
using NATS.Client.Core;
-using Xunit;
namespace CodeCargo.NatsDistributedCache.UnitTests;
@@ -17,16 +13,12 @@ public class TimeExpirationUnitTests
public TimeExpirationUnitTests()
{
_mockNatsConnection = new Mock();
- }
- private IDistributedCache CreateCacheInstance()
- {
- return new NatsCache(
- Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
- {
- BucketName = "cache"
- }),
- _mockNatsConnection.Object);
+ // Setup the mock to properly handle the Opts property
+ var opts = new NatsOpts { LoggerFactory = new LoggerFactory() };
+ _mockNatsConnection.SetupGet(m => m.Opts).Returns(opts);
+ var connection = new NatsConnection(opts);
+ _mockNatsConnection.SetupGet(m => m.Connection).Returns(connection);
}
[Fact]
@@ -114,4 +106,14 @@ public void ZeroSlidingExpirationThrows()
"The sliding expiration value must be positive.",
TimeSpan.Zero);
}
+
+ private IDistributedCache CreateCacheInstance()
+ {
+ return new NatsCache(
+ Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions
+ {
+ BucketName = "cache"
+ }),
+ _mockNatsConnection.Object);
+ }
}
diff --git a/util/PerfTest/PerfTest.cs b/util/PerfTest/PerfTest.cs
index b2eb186..4439973 100644
--- a/util/PerfTest/PerfTest.cs
+++ b/util/PerfTest/PerfTest.cs
@@ -35,7 +35,7 @@ public async Task Run(CancellationToken cancellationToken)
await Task.WhenAll(printTask, watchTask);
}
- private Task StartPrintTask(CancellationToken ct) =>
+ private static Task StartWatchTask(CancellationToken ct) =>
Task.Run(
async () =>
{
@@ -43,18 +43,10 @@ private Task StartPrintTask(CancellationToken ct) =>
{
while (!ct.IsCancellationRequested)
{
- // Clear
- Console.Clear();
-
- // Print current statistics
- Console.WriteLine("======== Per Stats ========");
- Console.WriteLine($"Keys Inserted: {_stats["KeysInserted"]}");
- Console.WriteLine($"Keys Retrieved: {_stats["KeysRetrieved"]}");
- Console.WriteLine($"Keys Expired: {_stats["KeysExpired"]}");
- Console.WriteLine("===========================");
+ // TODO: Implement watching for cache operations and updating stats
- // Wait before printing again
- await Task.Delay(TimeSpan.FromSeconds(1), ct);
+ // Wait before checking again
+ await Task.Delay(100, ct);
}
}
catch (OperationCanceledException)
@@ -64,7 +56,7 @@ private Task StartPrintTask(CancellationToken ct) =>
},
ct);
- private static Task StartWatchTask(CancellationToken ct) =>
+ private Task StartPrintTask(CancellationToken ct) =>
Task.Run(
async () =>
{
@@ -72,10 +64,18 @@ private static Task StartWatchTask(CancellationToken ct) =>
{
while (!ct.IsCancellationRequested)
{
- // TODO: Implement watching for cache operations and updating stats
+ // Clear
+ Console.Clear();
- // Wait before checking again
- await Task.Delay(100, ct);
+ // Print current statistics
+ Console.WriteLine("======== Per Stats ========");
+ Console.WriteLine($"Keys Inserted: {_stats["KeysInserted"]}");
+ Console.WriteLine($"Keys Retrieved: {_stats["KeysRetrieved"]}");
+ Console.WriteLine($"Keys Expired: {_stats["KeysExpired"]}");
+ Console.WriteLine("===========================");
+
+ // Wait before printing again
+ await Task.Delay(TimeSpan.FromSeconds(1), ct);
}
}
catch (OperationCanceledException)