diff --git a/.gitignore b/.gitignore index 3388739..8bf07b5 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,9 @@ # User settings *.DotSettings.user *.sln.DotSettings + +# JetBrains +.idea + +# VSCode +.vscode diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/.gitignore b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/.gitignore deleted file mode 100644 index 33d7e19..0000000 --- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/.gitignore +++ /dev/null @@ -1,13 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Rider ignored files -/contentModel.xml -/projectSettingsUpdater.xml -/modules.xml -/.idea.CodeCargo.NatsDistributedCache.iml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/encodings.xml b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/encodings.xml deleted file mode 100644 index df87cf9..0000000 --- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/encodings.xml +++ /dev/null @@ -1,4 +0,0 @@ - - - - \ No newline at end of file diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/indexLayout.xml b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/indexLayout.xml deleted file mode 100644 index 7b08163..0000000 --- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/indexLayout.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/vcs.xml b/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/vcs.xml deleted file mode 100644 index 94a25f7..0000000 --- a/.idea/.idea.CodeCargo.NatsDistributedCache/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs b/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs index 2784715..6560393 100644 --- a/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs +++ b/src/CodeCargo.NatsDistributedCache/NatsCache.Log.cs @@ -1,45 +1,43 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; using Microsoft.Extensions.Logging; -namespace CodeCargo.NatsDistributedCache +namespace CodeCargo.NatsDistributedCache; + +public partial class NatsCache { - public partial class NatsCache + private void LogException(Exception exception) + { + _logger.LogError(EventIds.Exception, exception, "An exception occurred in NATS KV store."); + } + + private void LogConnectionError(Exception exception) + { + _logger.LogError(EventIds.ConnectionError, exception, "Error connecting to NATS KV store."); + } + + private void LogConnectionIssue() + { + _logger.LogWarning(EventIds.ConnectionIssue, "Connection issue with NATS KV store."); + } + + private void LogConnected() + { + _logger.LogInformation(EventIds.Connected, "Connected to NATS KV store."); + } + + private void LogUpdateFailed(string key) + { + _logger.LogDebug(EventIds.UpdateFailed, "Sliding expiration update failed for key {Key} due to optimistic concurrency control", key); + } + + private static class EventIds { - private static class EventIds - { - public static readonly EventId ConnectionIssue = new EventId(100, nameof(ConnectionIssue)); - public static readonly EventId ConnectionError = new EventId(101, nameof(ConnectionError)); - public static readonly EventId Connected = new EventId(102, nameof(Connected)); - public static readonly EventId UpdateFailed = new EventId(103, nameof(UpdateFailed)); - public static readonly EventId Exception = new EventId(104, nameof(Exception)); - } - - private void LogException(Exception exception) - { - _logger.LogError(EventIds.Exception, exception, "An exception occurred in NATS KV store."); - } - - private void LogConnectionError(Exception exception) - { - _logger.LogError(EventIds.ConnectionError, exception, "Error connecting to NATS KV store."); - } - - private void LogConnectionIssue() - { - _logger.LogWarning(EventIds.ConnectionIssue, "Connection issue with NATS KV store."); - } - - private void LogConnected() - { - _logger.LogInformation(EventIds.Connected, "Connected to NATS KV store."); - } - - private void LogUpdateFailed(string key) - { - _logger.LogDebug(EventIds.UpdateFailed, "Sliding expiration update failed for key {Key} due to optimistic concurrency control", key); - } + public static readonly EventId ConnectionIssue = new EventId(100, nameof(ConnectionIssue)); + public static readonly EventId ConnectionError = new EventId(101, nameof(ConnectionError)); + public static readonly EventId Connected = new EventId(102, nameof(Connected)); + public static readonly EventId UpdateFailed = new EventId(103, nameof(UpdateFailed)); + public static readonly EventId Exception = new EventId(104, nameof(Exception)); } } diff --git a/src/CodeCargo.NatsDistributedCache/NatsCache.cs b/src/CodeCargo.NatsDistributedCache/NatsCache.cs index 76b5270..8afbab7 100644 --- a/src/CodeCargo.NatsDistributedCache/NatsCache.cs +++ b/src/CodeCargo.NatsDistributedCache/NatsCache.cs @@ -1,783 +1,427 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; using System.Buffers; -using System.Diagnostics; -using System.IO; -using System.Text; -using System.Text.Json; using System.Text.Json.Serialization; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using NATS.Client.Core; -using NATS.Client.JetStream; using NATS.Client.KeyValueStore; using NATS.Net; -namespace CodeCargo.NatsDistributedCache +namespace CodeCargo.NatsDistributedCache; + +/// +/// Cache entry for storing in NATS Key-Value Store +/// +public class CacheEntry { - [JsonSerializable(typeof(CacheEntry))] - internal partial class CacheEntryJsonContext : JsonSerializerContext - { - } + [JsonPropertyName("absexp")] + public string? AbsoluteExpiration { get; set; } - /// - /// Cache entry for storing in NATS Key-Value Store - /// - public class CacheEntry + [JsonPropertyName("sldexp")] + public string? SlidingExpiration { get; set; } + + [JsonPropertyName("data")] + public byte[]? Data { get; set; } +} + +/// +/// Distributed cache implementation using NATS Key-Value Store. +/// +public partial class NatsCache : IBufferDistributedCache, IDisposable +{ + // Static JSON serializer for CacheEntry + private static readonly NatsJsonContextSerializer _cacheEntrySerializer = + new NatsJsonContextSerializer(CacheEntryJsonContext.Default); + + private readonly ILogger _logger; + private readonly NatsCacheOptions _options; + private readonly string _instanceName; + private readonly INatsConnection _natsConnection; + private readonly SemaphoreSlim _connectionLock = new(initialCount: 1, maxCount: 1); + private NatsKVStore? _kvStore; + private bool _disposed; + + public NatsCache( + IOptions optionsAccessor, + ILogger logger, + INatsConnection natsConnection) { - [JsonPropertyName("absexp")] - public string? AbsoluteExpiration { get; set; } + ArgumentNullException.ThrowIfNull(optionsAccessor); + ArgumentNullException.ThrowIfNull(natsConnection); - [JsonPropertyName("sldexp")] - public string? SlidingExpiration { get; set; } + _options = optionsAccessor.Value; + _logger = logger; + _natsConnection = natsConnection; + _instanceName = _options.InstanceName ?? string.Empty; - [JsonPropertyName("data")] - public byte[]? Data { get; set; } + // No need to connect immediately; will connect on-demand } - /// - /// Distributed cache implementation using NATS Key-Value Store. - /// - public partial class NatsCache : IBufferDistributedCache, IDisposable + public NatsCache(IOptions optionsAccessor, INatsConnection natsConnection) + : this(optionsAccessor, NullLogger.Instance, natsConnection) { - private const string AbsoluteExpirationKey = "absexp"; - private const string SlidingExpirationKey = "sldexp"; - private const string DataKey = "data"; - - // combined keys - same hash keys fetched constantly; avoid allocating an array each time - private static readonly string[] GetHashFieldsNoData = new[] { AbsoluteExpirationKey, SlidingExpirationKey }; - - private static readonly string[] GetHashFieldsWithData = - new[] { AbsoluteExpirationKey, SlidingExpirationKey, DataKey }; - - // Static JSON serializer for CacheEntry - private static readonly NatsJsonContextSerializer _cacheEntrySerializer = - new NatsJsonContextSerializer(CacheEntryJsonContext.Default); - - private readonly ILogger _logger; - private readonly NatsCacheOptions _options; - private readonly string _instanceName; - private readonly INatsConnection _natsConnection; - private NatsKVStore? _kvStore; - private bool _disposed; - private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1); - - public NatsCache(IOptions optionsAccessor, ILogger logger, - INatsConnection natsConnection) - { - if (optionsAccessor == null) - { - throw new ArgumentNullException(nameof(optionsAccessor)); - } + } - _options = optionsAccessor.Value; - _logger = logger; - _natsConnection = natsConnection ?? throw new ArgumentNullException(nameof(natsConnection)); - _instanceName = _options.InstanceName ?? string.Empty; + /// + public void Set(string key, byte[] value, DistributedCacheEntryOptions options) => SetAsync(key, value, options).GetAwaiter().GetResult(); - // No need to connect immediately; will connect on-demand - } + /// + public async Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(key); + ArgumentNullException.ThrowIfNull(value); + ArgumentNullException.ThrowIfNull(options); + token.ThrowIfCancellationRequested(); + + var ttl = GetTtl(options); + var entry = CreateCacheEntry(value, options); + var kvStore = await GetKVStore().ConfigureAwait(false); - public NatsCache(IOptions optionsAccessor, INatsConnection natsConnection) - : this(optionsAccessor, NullLogger.Instance, natsConnection) + try { + await kvStore.PutAsync(GetKeyPrefix(key), entry, ttl ?? default, _cacheEntrySerializer, token).ConfigureAwait(false); } - - // This is the method used by hybrid caching to determine if it should use the distributed instance - internal virtual bool IsHybridCacheActive() => false; - - private string GetKeyPrefix(string key) + catch (Exception ex) { - return string.IsNullOrEmpty(_instanceName) - ? key - : _instanceName + ":" + key; + LogException(ex); + throw; } + } - /// - /// Gets or sets a value with the given key. - /// - /// The key to get the value for. - /// The value for the given key, or null if not found. - public byte[]? Get(string key) - { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + /// + public void Set(string key, ReadOnlySequence value, DistributedCacheEntryOptions options) => SetAsync(key, value, options).GetAwaiter().GetResult(); - return GetAndRefresh(key, getData: true); - } + /// + public async ValueTask SetAsync(string key, ReadOnlySequence value, DistributedCacheEntryOptions options, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(key); + ArgumentNullException.ThrowIfNull(options); + token.ThrowIfCancellationRequested(); - /// - /// Asynchronously gets or sets a value with the given key. - /// - /// The key to get the value for. - /// Optional. A to cancel the operation. - /// The value for the given key, or null if not found. - public async Task GetAsync(string key, CancellationToken token = default) - { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + var array = value.IsSingleSegment ? value.First.ToArray() : value.ToArray(); + await SetAsync(key, array, options, token).ConfigureAwait(false); + } - token.ThrowIfCancellationRequested(); + /// + public void Remove(string key) => RemoveAsync(key, null, default).GetAwaiter().GetResult(); - return await GetAndRefreshAsync(key, getData: true, token: token).ConfigureAwait(false); - } + /// + public async Task RemoveAsync(string key, CancellationToken token = default) => await RemoveAsync(key, null, token).ConfigureAwait(false); - /// - /// Sets a value with the given key. - /// - /// The key to set the value for. - /// The value to set. - /// The cache options for the value. - public void Set(string key, byte[] value, DistributedCacheEntryOptions options) - { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + /// + /// Removes the value with the given key. + /// + /// A string identifying the requested value. + /// Nats KV delete options + /// Optional. The used to propagate notifications that the operation should be canceled. + /// The that represents the asynchronous operation. + public async Task RemoveAsync(string key, NatsKVDeleteOpts? natsKVDeleteOpts = null, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(key); + token.ThrowIfCancellationRequested(); - if (value == null) - { - throw new ArgumentNullException(nameof(value)); - } + var kvStore = await GetKVStore().ConfigureAwait(false); + await kvStore.DeleteAsync(GetKeyPrefix(key), natsKVDeleteOpts, cancellationToken: token).ConfigureAwait(false); + } - if (options == null) - { - throw new ArgumentNullException(nameof(options)); - } + /// + public void Refresh(string key) => RefreshAsync(key).GetAwaiter().GetResult(); - var kvStore = GetKVStore().GetAwaiter().GetResult(); - var ttl = GetTTL(options); - var entry = CreateCacheEntry(key, value, options); + /// + public async Task RefreshAsync(string key, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(key); + token.ThrowIfCancellationRequested(); - try - { - if (ttl.HasValue) - { - kvStore.PutAsync(GetKeyPrefix(key), entry, ttl.Value, serializer: _cacheEntrySerializer, default).GetAwaiter() - .GetResult(); - } - else - { - kvStore.PutAsync(GetKeyPrefix(key), entry, serializer: _cacheEntrySerializer, default).GetAwaiter().GetResult(); - } - } - catch (Exception ex) - { - LogException(ex); - throw; - } - } + await GetAndRefreshAsync(key, getData: false, retry: true, token: token).ConfigureAwait(false); + } - /// - /// Asynchronously sets a value with the given key. - /// - /// The key to set the value for. - /// The value to set. - /// The cache options for the value. - /// Optional. A to cancel the operation. - /// A that represents the asynchronous set operation. - public async Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, - CancellationToken token = default) - { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + /// + public byte[]? Get(string key) => GetAsync(key).GetAwaiter().GetResult(); - if (value == null) - { - throw new ArgumentNullException(nameof(value)); - } + /// + public async Task GetAsync(string key, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(key); + token.ThrowIfCancellationRequested(); - if (options == null) - { - throw new ArgumentNullException(nameof(options)); - } + return await GetAndRefreshAsync(key, getData: true, retry: true, token: token).ConfigureAwait(false); + } - token.ThrowIfCancellationRequested(); + /// + public bool TryGet(string key, IBufferWriter destination) => TryGetAsync(key, destination).GetAwaiter().GetResult(); - var kvStore = await GetKVStore().ConfigureAwait(false); - var ttl = GetTTL(options); - var entry = CreateCacheEntry(key, value, options); + /// + public async ValueTask TryGetAsync(string key, IBufferWriter destination, CancellationToken token = default) + { + ArgumentNullException.ThrowIfNull(key); + ArgumentNullException.ThrowIfNull(destination); + token.ThrowIfCancellationRequested(); - if (ttl.HasValue) - { - await kvStore.PutAsync(GetKeyPrefix(key), entry, ttl.Value, serializer: _cacheEntrySerializer, token) - .ConfigureAwait(false); - } - else + try + { + var result = await GetAsync(key, token).ConfigureAwait(false); + if (result != null) { - await kvStore.PutAsync(GetKeyPrefix(key), entry, serializer: _cacheEntrySerializer, token).ConfigureAwait(false); + destination.Write(result); + return true; } } - - /// - /// Refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any). - /// - /// The key to refresh. - public void Refresh(string key) + catch { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } - - GetAndRefresh(key, getData: false); + // Ignore failures here; they will surface later } - /// - /// Asynchronously refreshes a value in the cache based on its key, resetting its sliding expiration timeout (if any). - /// - /// The key to refresh. - /// Optional. A to cancel the operation. - /// A that represents the asynchronous refresh operation. - public async Task RefreshAsync(string key, CancellationToken token = default) - { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + return false; + } - token.ThrowIfCancellationRequested(); + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + GC.SuppressFinalize(this); + } - await GetAndRefreshAsync(key, getData: false, token: token).ConfigureAwait(false); - } + // This is the method used by hybrid caching to determine if it should use the distributed instance + internal virtual bool IsHybridCacheActive() => false; - /// - /// Removes the value with the given key. - /// - /// The key to remove the value for. - public void Remove(string key) + protected virtual void Dispose(bool disposing) + { + if (_disposed) { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } - - GetKVStore().GetAwaiter().GetResult().DeleteAsync(GetKeyPrefix(key)).GetAwaiter().GetResult(); + return; } - /// - /// Asynchronously removes the value with the given key. - /// - /// The key to remove the value for. - /// Optional. A to cancel the operation. - /// A that represents the asynchronous remove operation. - public async Task RemoveAsync(string key, CancellationToken token = default) + if (disposing) { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + // Dispose managed state (managed objects) + _connectionLock.Dispose(); + _kvStore = null; // Set to null to ensure we don't use it after dispose + } - token.ThrowIfCancellationRequested(); + // Free unmanaged resources (unmanaged objects) and override finalizer + _disposed = true; + } - var kvStore = await GetKVStore().ConfigureAwait(false); - await kvStore.DeleteAsync(GetKeyPrefix(key), cancellationToken: token).ConfigureAwait(false); + private static TimeSpan? GetTtl(DistributedCacheEntryOptions options) + { + if (options.AbsoluteExpiration.HasValue && options.AbsoluteExpiration.Value <= DateTimeOffset.Now) + { + throw new ArgumentOutOfRangeException( + nameof(DistributedCacheEntryOptions.AbsoluteExpiration), + options.AbsoluteExpiration.Value, + "The absolute expiration value must be in the future."); } - private async Task GetKVStore() + if (options.AbsoluteExpirationRelativeToNow.HasValue && + options.AbsoluteExpirationRelativeToNow.Value <= TimeSpan.Zero) { - if (_kvStore != null && !_disposed) - { - return _kvStore; - } - - await _connectionLock.WaitAsync().ConfigureAwait(false); - try - { - if (_kvStore == null || _disposed) - { - if (_disposed) - { - throw new ObjectDisposedException(nameof(NatsCache)); - } - - if (string.IsNullOrEmpty(_options.BucketName)) - { - throw new InvalidOperationException("BucketName is required and cannot be null or empty."); - } - - var jsContext = _natsConnection.CreateJetStreamContext(); - var kvContext = new NatsKVContext(jsContext); - _kvStore = (NatsKVStore)await kvContext.GetStoreAsync(_options.BucketName).ConfigureAwait(false); - if (_kvStore == null) - { - throw new InvalidOperationException("Failed to create NATS KV store"); - } - - LogConnected(); - } - } - catch (Exception ex) - { - LogException(ex); - throw; - } - finally - { - _connectionLock.Release(); - } - - return _kvStore; + throw new ArgumentOutOfRangeException( + nameof(DistributedCacheEntryOptions.AbsoluteExpirationRelativeToNow), + options.AbsoluteExpirationRelativeToNow.Value, + "The relative expiration value must be positive."); } - private static DistributedCacheEntryOptions GetExpirationOptions( - string? absoluteExpiration, - string? slidingExpiration) + if (options.SlidingExpiration.HasValue && options.SlidingExpiration.Value <= TimeSpan.Zero) { - var options = new DistributedCacheEntryOptions(); - if (absoluteExpiration != null) - { - options.AbsoluteExpiration = DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(absoluteExpiration)); - } - - if (slidingExpiration != null) - { - options.SlidingExpiration = TimeSpan.FromMilliseconds(long.Parse(slidingExpiration)); - } - - return options; + throw new ArgumentOutOfRangeException( + nameof(DistributedCacheEntryOptions.SlidingExpiration), + options.SlidingExpiration.Value, + "The sliding expiration value must be positive."); } - private TimeSpan? GetTTL(DistributedCacheEntryOptions options) + var absoluteExpiration = options.AbsoluteExpiration; + if (options.AbsoluteExpirationRelativeToNow.HasValue) { - if (options.AbsoluteExpiration.HasValue && options.AbsoluteExpiration.Value <= DateTimeOffset.Now) - { - throw new ArgumentOutOfRangeException( - nameof(DistributedCacheEntryOptions.AbsoluteExpiration), - options.AbsoluteExpiration.Value, - "The absolute expiration value must be in the future."); - } - - if (options.AbsoluteExpirationRelativeToNow.HasValue && - options.AbsoluteExpirationRelativeToNow.Value <= TimeSpan.Zero) - { - throw new ArgumentOutOfRangeException( - nameof(DistributedCacheEntryOptions.AbsoluteExpirationRelativeToNow), - options.AbsoluteExpirationRelativeToNow.Value, - "The relative expiration value must be positive."); - } - - if (options.SlidingExpiration.HasValue && options.SlidingExpiration.Value <= TimeSpan.Zero) - { - throw new ArgumentOutOfRangeException( - nameof(DistributedCacheEntryOptions.SlidingExpiration), - options.SlidingExpiration.Value, - "The sliding expiration value must be positive."); - } - - var absoluteExpiration = options.AbsoluteExpiration; - if (options.AbsoluteExpirationRelativeToNow.HasValue) - { - absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value); - } - - if (absoluteExpiration.HasValue) - { - var ttl = absoluteExpiration.Value - DateTimeOffset.Now; - if (ttl.TotalMilliseconds <= 0) - { - // Value is in the past, remove it - return TimeSpan.Zero; - } - - // If there's also a sliding expiration, use the minimum of the two - if (options.SlidingExpiration.HasValue) - { - return TimeSpan.FromTicks(Math.Min(ttl.Ticks, options.SlidingExpiration.Value.Ticks)); - } - - return ttl; - } + absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value); + } + if (!absoluteExpiration.HasValue) + { return options.SlidingExpiration; } - private CacheEntry CreateCacheEntry(string key, byte[] value, DistributedCacheEntryOptions options) + var ttl = absoluteExpiration.Value - DateTimeOffset.Now; + if (ttl.TotalMilliseconds <= 0) { - var absoluteExpiration = options.AbsoluteExpiration; - if (options.AbsoluteExpirationRelativeToNow.HasValue) - { - absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value); - } - - var cacheEntry = new CacheEntry { Data = value }; - - if (absoluteExpiration.HasValue) - { - cacheEntry.AbsoluteExpiration = absoluteExpiration.Value.ToUnixTimeMilliseconds().ToString(); - } + // Value is in the past, remove it + return TimeSpan.Zero; + } - if (options.SlidingExpiration.HasValue) - { - cacheEntry.SlidingExpiration = options.SlidingExpiration.Value.TotalMilliseconds.ToString(); - } + // If there's also a sliding expiration, use the minimum of the two + return options.SlidingExpiration.HasValue + ? TimeSpan.FromTicks(Math.Min(ttl.Ticks, options.SlidingExpiration.Value.Ticks)) + : ttl; + } - return cacheEntry; + private static CacheEntry CreateCacheEntry(byte[] value, DistributedCacheEntryOptions options) + { + var absoluteExpiration = options.AbsoluteExpiration; + if (options.AbsoluteExpirationRelativeToNow.HasValue) + { + absoluteExpiration = DateTimeOffset.Now.Add(options.AbsoluteExpirationRelativeToNow.Value); } - private byte[]? GetAndRefresh(string key, bool getData) - { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + var cacheEntry = new CacheEntry { Data = value }; - return GetAndRefreshAsync(key, getData).GetAwaiter().GetResult(); + if (absoluteExpiration.HasValue) + { + cacheEntry.AbsoluteExpiration = absoluteExpiration.Value.ToUnixTimeMilliseconds().ToString(); } - private async Task GetAndRefreshAsync(string key, bool getData, CancellationToken token = default) + if (options.SlidingExpiration.HasValue) { - token.ThrowIfCancellationRequested(); - var kvStore = await GetKVStore().ConfigureAwait(false); - var prefixedKey = GetKeyPrefix(key); - try - { - var kvEntry = await kvStore.GetEntryAsync(prefixedKey, serializer: _cacheEntrySerializer, cancellationToken: token) - .ConfigureAwait(false); - - // Check if the value is null - if (kvEntry.Value == null) - { - return null; - } - - // Check absolute expiration - if (kvEntry.Value.AbsoluteExpiration != null) - { - var absoluteExpiration = - DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration)); - if (absoluteExpiration <= DateTimeOffset.Now) - { - // NatsKVWrongLastRevisionException is caught below - await kvStore.DeleteAsync( - prefixedKey, - new NatsKVDeleteOpts { Revision = kvEntry.Revision }, - cancellationToken: token) - .ConfigureAwait(false); - - return null; - } - } - - // Refresh if sliding expiration exists - if (kvEntry.Value.SlidingExpiration != null) - { - var slidingExpirationMilliseconds = long.Parse(kvEntry.Value.SlidingExpiration); - var slidingExpiration = TimeSpan.FromMilliseconds(slidingExpirationMilliseconds); - - if (slidingExpiration > TimeSpan.Zero) - { - await UpdateEntryExpirationAsync(kvStore, prefixedKey, kvEntry, token) - .ConfigureAwait(false); - } - } - - return getData ? kvEntry.Value.Data : null; - } - catch (NatsKVWrongLastRevisionException) - { - // Optimistic concurrency control failed, someone else updated it - // That's fine, we just retry the get operation - LogUpdateFailed(key); + cacheEntry.SlidingExpiration = options.SlidingExpiration.Value.TotalMilliseconds.ToString(); + } - // Try once more to get the latest value - try - { - var kvEntry = await kvStore.GetEntryAsync(prefixedKey, serializer: _cacheEntrySerializer, cancellationToken: token) - .ConfigureAwait(false); + return cacheEntry; + } - // Check if the value is null - if (kvEntry.Value == null) - { - return null; - } + private string GetKeyPrefix(string key) => string.IsNullOrEmpty(_instanceName) + ? key + : _instanceName + ":" + key; - return getData ? kvEntry.Value.Data : null; - } - catch (Exception ex) - { - LogException(ex); - return null; - } - } - catch (Exception ex) - { - LogException(ex); - throw; - } + private async Task GetKVStore() + { + if (_kvStore != null && !_disposed) + { + return _kvStore; } - private async Task UpdateEntryExpirationAsync(NatsKVStore kvStore, string key, NatsKVEntry kvEntry, - CancellationToken token) + await _connectionLock.WaitAsync().ConfigureAwait(false); + try { - // Calculate new TTL based on sliding expiration - TimeSpan? ttl = null; - - // If we have a sliding expiration, use it as the TTL - if (kvEntry.Value.SlidingExpiration != null) + if (_kvStore == null || _disposed) { - ttl = TimeSpan.FromMilliseconds(long.Parse(kvEntry.Value.SlidingExpiration)); - - // If we also have an absolute expiration, make sure we don't exceed it - if (kvEntry.Value.AbsoluteExpiration != null) + ObjectDisposedException.ThrowIf(_disposed, this); + if (string.IsNullOrEmpty(_options.BucketName)) { - var absoluteExpiration = - DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration)); - var remainingTime = absoluteExpiration - DateTimeOffset.Now; - - // Use the minimum of sliding window or remaining absolute time - if (remainingTime > TimeSpan.Zero && remainingTime < ttl) - { - ttl = remainingTime; - } + throw new InvalidOperationException("BucketName is required and cannot be null or empty."); } - } - if (ttl.HasValue && ttl.Value > TimeSpan.Zero) - { - // Use optimistic concurrency control with the last revision - try - { - await kvStore.UpdateAsync(key, kvEntry.Value, kvEntry.Revision, ttl.Value, serializer: _cacheEntrySerializer, cancellationToken: token) - .ConfigureAwait(false); - } - catch (NatsKVWrongLastRevisionException) + var jsContext = _natsConnection.CreateJetStreamContext(); + var kvContext = new NatsKVContext(jsContext); + _kvStore = (NatsKVStore)await kvContext.GetStoreAsync(_options.BucketName).ConfigureAwait(false); + if (_kvStore == null) { - // Someone else updated it; that's fine, we'll get the latest version next time - LogUpdateFailed(key.Replace(GetKeyPrefix(string.Empty), string.Empty)); + throw new InvalidOperationException("Failed to create NATS KV store"); } - } - } - - public void Dispose() - { - if (!_disposed) - { - _connectionLock.Dispose(); - _disposed = true; - - // Set to null to ensure we don't use it after dispose - _kvStore = null; - } - } - /// - public bool TryGetValue(string key, out ReadOnlySequence sequence) - { - sequence = ReadOnlySequence.Empty; - - try - { - var result = Get(key); - if (result != null) - { - sequence = new ReadOnlySequence(result); - return true; - } - } - catch - { - // Ignore failures here; they will surface later + LogConnected(); } - - return false; } - - /// - public async ValueTask TryGetValueAsync(string key, Memory destination, - CancellationToken token = default) + catch (Exception ex) { - try - { - var result = await GetAsync(key, token).ConfigureAwait(false); - if (result != null) - { - if (result.Length <= destination.Length) - { - result.CopyTo(destination); - return true; - } - } - } - catch - { - // Ignore failures here; they will surface later - } - - return false; + LogException(ex); + throw; } - - /// - public async ValueTask GetAsync(string key, Stream destination, CancellationToken token = default) + finally { - try - { - var result = await GetAsync(key, token).ConfigureAwait(false); - if (result != null) - { - await destination.WriteAsync(result, token).ConfigureAwait(false); - return true; - } - } - catch - { - // Ignore failures here; they will surface later - } - - return false; + _connectionLock.Release(); } - /// - public async ValueTask GetAndRefreshAsync(string key, Stream destination, bool getData, - CancellationToken token = default) - { - try - { - var result = await GetAndRefreshAsync(key, getData, token).ConfigureAwait(false); - if (result != null) - { - await destination.WriteAsync(result, token).ConfigureAwait(false); - return true; - } - } - catch - { - // Ignore failures here; they will surface later - } + return _kvStore; + } - return false; - } + private async Task GetAndRefreshAsync(string key, bool getData, bool retry, CancellationToken token = default) + { + token.ThrowIfCancellationRequested(); - /// - public bool TryGet(string key, IBufferWriter destination) + var kvStore = await GetKVStore().ConfigureAwait(false); + var prefixedKey = GetKeyPrefix(key); + try { - if (key == null) + var natsResult = await kvStore.TryGetEntryAsync(prefixedKey, serializer: _cacheEntrySerializer, cancellationToken: token).ConfigureAwait(false); + if (!natsResult.Success) { - throw new ArgumentNullException(nameof(key)); + return null; } - if (destination == null) + var kvEntry = natsResult.Value; + if (kvEntry.Value == null) { - throw new ArgumentNullException(nameof(destination)); + return null; } - try + // Check absolute expiration + if (kvEntry.Value.AbsoluteExpiration != null + && DateTimeOffset.Now > DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration))) { - var result = Get(key); - if (result != null) - { - destination.Write(result); - return true; - } - } - catch - { - // Ignore failures here; they will surface later + // NatsKVWrongLastRevisionException is caught below + var natsDeleteOpts = new NatsKVDeleteOpts { Revision = kvEntry.Revision }; + await RemoveAsync(key, natsDeleteOpts, token).ConfigureAwait(false); + return null; } - return false; + await UpdateEntryExpirationAsync(kvStore, prefixedKey, kvEntry, token).ConfigureAwait(false); + return getData ? kvEntry.Value.Data : null; } - - /// - public async ValueTask TryGetAsync(string key, IBufferWriter destination, - CancellationToken token = default) + catch (NatsKVWrongLastRevisionException ex) { - if (key == null) + // Optimistic concurrency control failed, someone else updated it + LogUpdateFailed(key); + if (retry) { - throw new ArgumentNullException(nameof(key)); + return await GetAndRefreshAsync(key, getData, retry: false, token).ConfigureAwait(false); } - - if (destination == null) - { - throw new ArgumentNullException(nameof(destination)); - } - - token.ThrowIfCancellationRequested(); - - try - { - var result = await GetAsync(key, token).ConfigureAwait(false); - if (result != null) - { - destination.Write(result); - return true; - } - } - catch + else { - // Ignore failures here; they will surface later + LogException(ex); + return null; } - - return false; } + catch (Exception ex) + { + LogException(ex); + throw; + } + } - /// - public void Set(string key, ReadOnlySequence value, DistributedCacheEntryOptions options) + private async Task UpdateEntryExpirationAsync(NatsKVStore kvStore, string key, NatsKVEntry kvEntry, CancellationToken token) + { + if (kvEntry.Value?.SlidingExpiration == null) { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } + return; + } - if (options == null) - { - throw new ArgumentNullException(nameof(options)); - } + // If we have a sliding expiration, use it as the TTL + var ttl = TimeSpan.FromMilliseconds(long.Parse(kvEntry.Value.SlidingExpiration)); - byte[] array; + // If we also have an absolute expiration, make sure we don't exceed it + if (kvEntry.Value.AbsoluteExpiration != null) + { + var absoluteExpiration = + DateTimeOffset.FromUnixTimeMilliseconds(long.Parse(kvEntry.Value.AbsoluteExpiration)); + var remainingTime = absoluteExpiration - DateTimeOffset.Now; - if (value.IsSingleSegment) - { - array = value.First.ToArray(); - } - else + // Use the minimum of sliding window or remaining absolute time + if (remainingTime > TimeSpan.Zero && remainingTime < ttl) { - array = value.ToArray(); + ttl = remainingTime; } - - Set(key, array, options); } - /// - public async ValueTask SetAsync(string key, ReadOnlySequence value, DistributedCacheEntryOptions options, - CancellationToken token = default) + if (ttl > TimeSpan.Zero) { - if (key == null) - { - throw new ArgumentNullException(nameof(key)); - } - - if (options == null) - { - throw new ArgumentNullException(nameof(options)); - } - - token.ThrowIfCancellationRequested(); - - byte[] array; - - if (value.IsSingleSegment) + // Use optimistic concurrency control with the last revision + try { - array = value.First.ToArray(); + await kvStore.UpdateAsync(key, kvEntry.Value, kvEntry.Revision, ttl, serializer: _cacheEntrySerializer, cancellationToken: token).ConfigureAwait(false); } - else + catch (NatsKVWrongLastRevisionException) { - array = value.ToArray(); + // Someone else updated it; that's fine, we'll get the latest version next time + LogUpdateFailed(key.Replace(GetKeyPrefix(string.Empty), string.Empty)); } - - await SetAsync(key, array, options, token).ConfigureAwait(false); } } } + +[JsonSerializable(typeof(CacheEntry))] +internal partial class CacheEntryJsonContext : JsonSerializerContext +{ +} diff --git a/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs b/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs index 23a9b83..ec1d3d5 100644 --- a/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs +++ b/src/CodeCargo.NatsDistributedCache/NatsCacheImpl.cs @@ -1,7 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; using Microsoft.Extensions.Caching.Hybrid; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -14,9 +13,6 @@ internal sealed class NatsCacheImpl : NatsCache { private readonly IServiceProvider _services; - internal override bool IsHybridCacheActive() - => _services.GetService() is not null; - public NatsCacheImpl(IOptions optionsAccessor, ILogger logger, IServiceProvider services, INatsConnection natsConnection) : base(optionsAccessor, logger, natsConnection) { @@ -28,5 +24,8 @@ public NatsCacheImpl(IOptions optionsAccessor, IServiceProvide { _services = services; // important: do not check for HybridCache here due to dependency - creates a cycle } + + internal override bool IsHybridCacheActive() + => _services.GetService() is not null; } } diff --git a/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs b/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs index 99e7b91..d481be9 100644 --- a/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs +++ b/src/CodeCargo.NatsDistributedCache/NatsCacheOptions.cs @@ -1,7 +1,6 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; using Microsoft.Extensions.Options; namespace CodeCargo.NatsDistributedCache @@ -22,9 +21,6 @@ public class NatsCacheOptions : IOptions /// public string? InstanceName { get; set; } - NatsCacheOptions IOptions.Value - { - get { return this; } - } + NatsCacheOptions IOptions.Value => this; } } diff --git a/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs b/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs index 485d2d0..0ed59e0 100644 --- a/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs +++ b/src/CodeCargo.NatsDistributedCache/NatsCacheServiceCollectionExtensions.cs @@ -1,10 +1,8 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. -using System; using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NATS.Client.Core; @@ -25,15 +23,8 @@ public static class NatsCacheServiceCollectionExtensions /// The so that additional calls can be chained. public static IServiceCollection AddNatsDistributedCache(this IServiceCollection services, Action setupAction) { - if (services == null) - { - throw new ArgumentNullException(nameof(services)); - } - - if (setupAction == null) - { - throw new ArgumentNullException(nameof(setupAction)); - } + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(setupAction); services.AddOptions(); services.Configure(setupAction); diff --git a/test/IntegrationTests/CacheServiceExtensionsTests.cs b/test/IntegrationTests/CacheServiceExtensionsTests.cs index 1a0e66d..90a22e9 100644 --- a/test/IntegrationTests/CacheServiceExtensionsTests.cs +++ b/test/IntegrationTests/CacheServiceExtensionsTests.cs @@ -1,12 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using Xunit; namespace CodeCargo.NatsDistributedCache.IntegrationTests; diff --git a/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs b/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs index a78184f..3132767 100644 --- a/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs +++ b/test/IntegrationTests/NatsCacheSetAndRemoveTests.cs @@ -1,11 +1,6 @@ -using System; using System.Runtime.CompilerServices; using System.Text; -using System.Threading.Tasks; -using CodeCargo.NatsDistributedCache.IntegrationTests; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Options; -using Xunit; namespace CodeCargo.NatsDistributedCache.IntegrationTests; @@ -17,16 +12,6 @@ public NatsCacheSetAndRemoveTests(NatsIntegrationFixture fixture) { } - private IDistributedCache CreateCacheInstance() - { - return new NatsCache( - Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions - { - BucketName = "cache" - }), - NatsConnection); - } - [Fact] public void GetMissingKeyReturnsNull() { @@ -114,6 +99,7 @@ public void SetGetNonNullString(string payload) // check raw bytes var raw = cache.Get(key); + Assert.NotNull(raw); Assert.Equal(Hex(payload), Hex(raw)); // check via string API @@ -137,6 +123,7 @@ public async Task SetGetNonNullStringAsync(string payload) // check raw bytes var raw = await cache.GetAsync(key); + Assert.NotNull(raw); Assert.Equal(Hex(payload), Hex(raw)); // check via string API @@ -150,4 +137,14 @@ public async Task SetGetNonNullStringAsync(string payload) private static string Hex(string value) => Hex(Encoding.UTF8.GetBytes(value)); private static string Me([CallerMemberName] string caller = "") => caller; + + private IDistributedCache CreateCacheInstance() + { + return new NatsCache( + Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions + { + BucketName = "cache" + }), + NatsConnection); + } } diff --git a/test/IntegrationTests/NatsCollection.cs b/test/IntegrationTests/NatsCollection.cs index 0408380..2ff557c 100644 --- a/test/IntegrationTests/NatsCollection.cs +++ b/test/IntegrationTests/NatsCollection.cs @@ -1,5 +1,3 @@ -using Xunit; - namespace CodeCargo.NatsDistributedCache.IntegrationTests; [CollectionDefinition(Name)] diff --git a/test/IntegrationTests/TestBase.cs b/test/IntegrationTests/TestBase.cs index 3aa8bf6..28c51ca 100644 --- a/test/IntegrationTests/TestBase.cs +++ b/test/IntegrationTests/TestBase.cs @@ -1,8 +1,6 @@ using CodeCargo.NatsDistributedCache.TestUtils.Services.Logging; using Microsoft.Extensions.Logging; using NATS.Client.Core; -using NATS.Client.KeyValueStore; -using NATS.Net; namespace CodeCargo.NatsDistributedCache.IntegrationTests; @@ -21,11 +19,7 @@ protected TestBase(NatsIntegrationFixture fixture) { // Get the test output helper from the current test context var testContext = TestContext.Current; - var output = testContext.TestOutputHelper; - if (output == null) - { - throw new InvalidOperationException("TestOutputHelper was not available in the current test context"); - } + var output = testContext.TestOutputHelper ?? throw new InvalidOperationException("TestOutputHelper was not available in the current test context"); // Create a service collection and configure logging var services = new ServiceCollection(); diff --git a/test/IntegrationTests/TestHelpers/ExceptionAssert.cs b/test/IntegrationTests/TestHelpers/ExceptionAssert.cs index 1734381..563026a 100644 --- a/test/IntegrationTests/TestHelpers/ExceptionAssert.cs +++ b/test/IntegrationTests/TestHelpers/ExceptionAssert.cs @@ -1,6 +1,3 @@ -using System; -using Xunit; - namespace CodeCargo.NatsDistributedCache.IntegrationTests.TestHelpers; public static class ExceptionAssert diff --git a/test/IntegrationTests/TimeExpirationAsyncTests.cs b/test/IntegrationTests/TimeExpirationAsyncTests.cs index 91871ef..e6c455f 100644 --- a/test/IntegrationTests/TimeExpirationAsyncTests.cs +++ b/test/IntegrationTests/TimeExpirationAsyncTests.cs @@ -1,9 +1,5 @@ -using System; using System.Runtime.CompilerServices; -using System.Threading.Tasks; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Options; -using Xunit; namespace CodeCargo.NatsDistributedCache.IntegrationTests; @@ -15,36 +11,6 @@ public TimeExpirationAsyncTests(NatsIntegrationFixture fixture) { } - private IDistributedCache CreateCacheInstance() - { - return new NatsCache( - Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions - { - BucketName = "cache" - }), - NatsConnection); - } - - // async twin to ExceptionAssert.ThrowsArgumentOutOfRange - private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue) - { - var ex = await Assert.ThrowsAsync(test); - if (paramName is not null) - { - Assert.Equal(paramName, ex.ParamName); - } - - if (message is not null) - { - Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc - } - - if (actualValue is not null) - { - Assert.Equal(actualValue, ex.ActualValue); - } - } - // AbsoluteExpirationInThePastThrowsAsync test moved to UnitTests/TimeExpirationAsyncUnitTests.cs [Fact] public async Task AbsoluteExpirationExpiresAsync() @@ -53,7 +19,7 @@ public async Task AbsoluteExpirationExpiresAsync() var key = await GetNameAndReset(cache); var value = new byte[1]; - await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1))); + await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1))); var result = await cache.GetAsync(key); Assert.Equal(value, result); @@ -67,19 +33,6 @@ public async Task AbsoluteExpirationExpiresAsync() Assert.Null(result); } - [Fact] - public async Task AbsoluteSubSecondExpirationExpiresImmediatelyAsync() - { - var cache = CreateCacheInstance(); - var key = await GetNameAndReset(cache); - var value = new byte[1]; - - await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(0.25))); - - var result = await cache.GetAsync(key); - Assert.Null(result); - } - // NegativeRelativeExpirationThrowsAsync test moved to UnitTests/TimeExpirationAsyncUnitTests.cs // ZeroRelativeExpirationThrowsAsync test moved to UnitTests/TimeExpirationAsyncUnitTests.cs @@ -90,7 +43,7 @@ public async Task RelativeExpirationExpiresAsync() var key = await GetNameAndReset(cache); var value = new byte[1]; - await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1))); + await cache.SetAsync(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1))); var result = await cache.GetAsync(key); Assert.Equal(value, result); @@ -160,14 +113,14 @@ public async Task SlidingExpirationRenewedByAccessUntilAbsoluteExpirationAsync() var value = new byte[1]; await cache.SetAsync(key, value, new DistributedCacheEntryOptions() - .SetSlidingExpiration(TimeSpan.FromSeconds(1)) - .SetAbsoluteExpiration(TimeSpan.FromSeconds(3))); + .SetSlidingExpiration(TimeSpan.FromSeconds(1.1)) + .SetAbsoluteExpiration(TimeSpan.FromSeconds(4))); var setTime = DateTime.Now; var result = await cache.GetAsync(key); Assert.Equal(value, result); - for (var i = 0; i < 5; i++) + for (var i = 0; i < 4; i++) { await Task.Delay(TimeSpan.FromSeconds(0.5)); @@ -190,4 +143,34 @@ private static async Task GetNameAndReset(IDistributedCache cache, [Call await cache.RemoveAsync(caller); return caller; } + + // async twin to ExceptionAssert.ThrowsArgumentOutOfRange + private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue) + { + var ex = await Assert.ThrowsAsync(test); + if (paramName is not null) + { + Assert.Equal(paramName, ex.ParamName); + } + + if (message is not null) + { + Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc + } + + if (actualValue is not null) + { + Assert.Equal(actualValue, ex.ActualValue); + } + } + + private IDistributedCache CreateCacheInstance() + { + return new NatsCache( + Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions + { + BucketName = "cache" + }), + NatsConnection); + } } diff --git a/test/IntegrationTests/TimeExpirationTests.cs b/test/IntegrationTests/TimeExpirationTests.cs index 90e7365..16e8cc9 100644 --- a/test/IntegrationTests/TimeExpirationTests.cs +++ b/test/IntegrationTests/TimeExpirationTests.cs @@ -1,10 +1,5 @@ -using System; using System.Runtime.CompilerServices; -using System.Threading; -using CodeCargo.NatsDistributedCache.IntegrationTests.TestHelpers; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Options; -using Xunit; namespace CodeCargo.NatsDistributedCache.IntegrationTests; @@ -16,16 +11,6 @@ public TimeExpirationTests(NatsIntegrationFixture fixture) { } - private IDistributedCache CreateCacheInstance() - { - return new NatsCache( - Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions - { - BucketName = "cache" - }), - NatsConnection); - } - // AbsoluteExpirationInThePastThrows test moved to UnitTests/TimeExpirationUnitTests.cs [Fact] public void AbsoluteExpirationExpires() @@ -34,7 +19,7 @@ public void AbsoluteExpirationExpires() var key = GetNameAndReset(cache); var value = new byte[1]; - cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1))); + cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1))); var result = cache.Get(key); Assert.Equal(value, result); @@ -48,19 +33,6 @@ public void AbsoluteExpirationExpires() Assert.Null(result); } - [Fact] - public void AbsoluteSubSecondExpirationExpiresImmediately() - { - var cache = CreateCacheInstance(); - var key = GetNameAndReset(cache); - var value = new byte[1]; - - cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(0.25))); - - var result = cache.Get(key); - Assert.Null(result); - } - // NegativeRelativeExpirationThrows test moved to UnitTests/TimeExpirationUnitTests.cs // ZeroRelativeExpirationThrows test moved to UnitTests/TimeExpirationUnitTests.cs @@ -71,7 +43,7 @@ public void RelativeExpirationExpires() var key = GetNameAndReset(cache); var value = new byte[1]; - cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1))); + cache.Set(key, value, new DistributedCacheEntryOptions().SetAbsoluteExpiration(TimeSpan.FromSeconds(1.1))); var result = cache.Get(key); Assert.Equal(value, result); @@ -141,14 +113,14 @@ public void SlidingExpirationRenewedByAccessUntilAbsoluteExpiration() var value = new byte[1]; cache.Set(key, value, new DistributedCacheEntryOptions() - .SetSlidingExpiration(TimeSpan.FromSeconds(1)) - .SetAbsoluteExpiration(TimeSpan.FromSeconds(3))); + .SetSlidingExpiration(TimeSpan.FromSeconds(1.1)) + .SetAbsoluteExpiration(TimeSpan.FromSeconds(4))); var setTime = DateTime.Now; var result = cache.Get(key); Assert.Equal(value, result); - for (var i = 0; i < 5; i++) + for (var i = 0; i < 4; i++) { Thread.Sleep(TimeSpan.FromSeconds(0.5)); @@ -171,4 +143,14 @@ private static string GetNameAndReset(IDistributedCache cache, [CallerMemberName cache.Remove(caller); return caller; } + + private IDistributedCache CreateCacheInstance() + { + return new NatsCache( + Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions + { + BucketName = "cache" + }), + NatsConnection); + } } diff --git a/test/UnitTests/CacheServiceExtensionsUnitTests.cs b/test/UnitTests/CacheServiceExtensionsUnitTests.cs index 747418f..aaeba90 100644 --- a/test/UnitTests/CacheServiceExtensionsUnitTests.cs +++ b/test/UnitTests/CacheServiceExtensionsUnitTests.cs @@ -1,13 +1,8 @@ -using System; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Moq; using NATS.Client.Core; -using Xunit; namespace CodeCargo.NatsDistributedCache.UnitTests; @@ -102,6 +97,10 @@ public void AddNatsCache_UsesCacheOptionsAction() wasInvoked = true; }); + // Build service provider and resolve options to trigger the setup action + var sp = services.BuildServiceProvider(); + _ = sp.GetRequiredService(); + // Assert Assert.True(wasInvoked); } diff --git a/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs b/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs index 93fb804..ae4bddc 100644 --- a/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs +++ b/test/UnitTests/NatsCacheSetAndRemoveUnitTests.cs @@ -1,11 +1,6 @@ -using System; -using System.Threading; -using System.Threading.Tasks; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Options; using Moq; using NATS.Client.Core; -using Xunit; namespace CodeCargo.NatsDistributedCache.UnitTests; @@ -18,6 +13,16 @@ public NatsCacheSetAndRemoveUnitTests() _mockNatsConnection = new Mock(); } + [Fact] + public void SetNullValueThrows() + { + var cache = CreateCacheInstance(); + byte[] value = null!; + var key = "myKey"; + + Assert.Throws(() => cache.Set(key, value)); + } + private IDistributedCache CreateCacheInstance() { return new NatsCache( @@ -27,14 +32,4 @@ private IDistributedCache CreateCacheInstance() }), _mockNatsConnection.Object); } - - [Fact] - public void SetNullValueThrows() - { - var cache = CreateCacheInstance(); - byte[] value = null; - var key = "myKey"; - - Assert.Throws(() => cache.Set(key, value)); - } } diff --git a/test/UnitTests/TestHelpers/ExceptionAssert.cs b/test/UnitTests/TestHelpers/ExceptionAssert.cs index fb2acda..0cbe44a 100644 --- a/test/UnitTests/TestHelpers/ExceptionAssert.cs +++ b/test/UnitTests/TestHelpers/ExceptionAssert.cs @@ -1,6 +1,3 @@ -using System; -using Xunit; - namespace CodeCargo.NatsDistributedCache.UnitTests.TestHelpers; public static class ExceptionAssert diff --git a/test/UnitTests/TimeExpirationAsyncUnitTests.cs b/test/UnitTests/TimeExpirationAsyncUnitTests.cs index a649406..befc06a 100644 --- a/test/UnitTests/TimeExpirationAsyncUnitTests.cs +++ b/test/UnitTests/TimeExpirationAsyncUnitTests.cs @@ -1,11 +1,6 @@ -using System; -using System.Runtime.CompilerServices; -using System.Threading.Tasks; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Options; using Moq; using NATS.Client.Core; -using Xunit; namespace CodeCargo.NatsDistributedCache.UnitTests; @@ -18,36 +13,6 @@ public TimeExpirationAsyncUnitTests() _mockNatsConnection = new Mock(); } - private IDistributedCache CreateCacheInstance() - { - return new NatsCache( - Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions - { - BucketName = "cache" - }), - _mockNatsConnection.Object); - } - - // async twin to ExceptionAssert.ThrowsArgumentOutOfRange - private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue) - { - var ex = await Assert.ThrowsAsync(test); - if (paramName is not null) - { - Assert.Equal(paramName, ex.ParamName); - } - - if (message is not null) - { - Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc - } - - if (actualValue is not null) - { - Assert.Equal(actualValue, ex.ActualValue); - } - } - [Fact] public async Task AbsoluteExpirationInThePastThrowsAsync() { @@ -133,4 +98,34 @@ await ThrowsArgumentOutOfRangeAsync( "The sliding expiration value must be positive.", TimeSpan.Zero); } + + // async twin to ExceptionAssert.ThrowsArgumentOutOfRange + private static async Task ThrowsArgumentOutOfRangeAsync(Func test, string paramName, string message, object actualValue) + { + var ex = await Assert.ThrowsAsync(test); + if (paramName is not null) + { + Assert.Equal(paramName, ex.ParamName); + } + + if (message is not null) + { + Assert.StartsWith(message, ex.Message); // can have "\r\nParameter name:" etc + } + + if (actualValue is not null) + { + Assert.Equal(actualValue, ex.ActualValue); + } + } + + private IDistributedCache CreateCacheInstance() + { + return new NatsCache( + Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions + { + BucketName = "cache" + }), + _mockNatsConnection.Object); + } } diff --git a/test/UnitTests/TimeExpirationUnitTests.cs b/test/UnitTests/TimeExpirationUnitTests.cs index 705e1c6..bdbaa17 100644 --- a/test/UnitTests/TimeExpirationUnitTests.cs +++ b/test/UnitTests/TimeExpirationUnitTests.cs @@ -1,12 +1,8 @@ -using System; -using System.Runtime.CompilerServices; -using System.Threading; using CodeCargo.NatsDistributedCache.UnitTests.TestHelpers; using Microsoft.Extensions.Caching.Distributed; -using Microsoft.Extensions.Options; +using Microsoft.Extensions.Logging; using Moq; using NATS.Client.Core; -using Xunit; namespace CodeCargo.NatsDistributedCache.UnitTests; @@ -17,16 +13,12 @@ public class TimeExpirationUnitTests public TimeExpirationUnitTests() { _mockNatsConnection = new Mock(); - } - private IDistributedCache CreateCacheInstance() - { - return new NatsCache( - Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions - { - BucketName = "cache" - }), - _mockNatsConnection.Object); + // Setup the mock to properly handle the Opts property + var opts = new NatsOpts { LoggerFactory = new LoggerFactory() }; + _mockNatsConnection.SetupGet(m => m.Opts).Returns(opts); + var connection = new NatsConnection(opts); + _mockNatsConnection.SetupGet(m => m.Connection).Returns(connection); } [Fact] @@ -114,4 +106,14 @@ public void ZeroSlidingExpirationThrows() "The sliding expiration value must be positive.", TimeSpan.Zero); } + + private IDistributedCache CreateCacheInstance() + { + return new NatsCache( + Microsoft.Extensions.Options.Options.Create(new NatsCacheOptions + { + BucketName = "cache" + }), + _mockNatsConnection.Object); + } } diff --git a/util/PerfTest/PerfTest.cs b/util/PerfTest/PerfTest.cs index b2eb186..4439973 100644 --- a/util/PerfTest/PerfTest.cs +++ b/util/PerfTest/PerfTest.cs @@ -35,7 +35,7 @@ public async Task Run(CancellationToken cancellationToken) await Task.WhenAll(printTask, watchTask); } - private Task StartPrintTask(CancellationToken ct) => + private static Task StartWatchTask(CancellationToken ct) => Task.Run( async () => { @@ -43,18 +43,10 @@ private Task StartPrintTask(CancellationToken ct) => { while (!ct.IsCancellationRequested) { - // Clear - Console.Clear(); - - // Print current statistics - Console.WriteLine("======== Per Stats ========"); - Console.WriteLine($"Keys Inserted: {_stats["KeysInserted"]}"); - Console.WriteLine($"Keys Retrieved: {_stats["KeysRetrieved"]}"); - Console.WriteLine($"Keys Expired: {_stats["KeysExpired"]}"); - Console.WriteLine("==========================="); + // TODO: Implement watching for cache operations and updating stats - // Wait before printing again - await Task.Delay(TimeSpan.FromSeconds(1), ct); + // Wait before checking again + await Task.Delay(100, ct); } } catch (OperationCanceledException) @@ -64,7 +56,7 @@ private Task StartPrintTask(CancellationToken ct) => }, ct); - private static Task StartWatchTask(CancellationToken ct) => + private Task StartPrintTask(CancellationToken ct) => Task.Run( async () => { @@ -72,10 +64,18 @@ private static Task StartWatchTask(CancellationToken ct) => { while (!ct.IsCancellationRequested) { - // TODO: Implement watching for cache operations and updating stats + // Clear + Console.Clear(); - // Wait before checking again - await Task.Delay(100, ct); + // Print current statistics + Console.WriteLine("======== Per Stats ========"); + Console.WriteLine($"Keys Inserted: {_stats["KeysInserted"]}"); + Console.WriteLine($"Keys Retrieved: {_stats["KeysRetrieved"]}"); + Console.WriteLine($"Keys Expired: {_stats["KeysExpired"]}"); + Console.WriteLine("==========================="); + + // Wait before printing again + await Task.Delay(TimeSpan.FromSeconds(1), ct); } } catch (OperationCanceledException)