From fc3a018aa4734d1b5744a04cb586ba1793683270 Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Thu, 11 Dec 2025 15:38:48 +0000 Subject: [PATCH 1/2] Add Put with TTL This is to prepare for new NATS .NET 2.7 release where this feature is removed because it is problematic on buckets with history >1. --- src/NatsDistributedCache/NatsCache.cs | 2 +- src/NatsDistributedCache/NatsExtensions.cs | 131 ++++++++++++ .../NatsExtensionsPutWithTtlTests.cs | 197 ++++++++++++++++++ 3 files changed, 329 insertions(+), 1 deletion(-) create mode 100644 src/NatsDistributedCache/NatsExtensions.cs create mode 100644 test/IntegrationTests/Extensions/NatsExtensionsPutWithTtlTests.cs diff --git a/src/NatsDistributedCache/NatsCache.cs b/src/NatsDistributedCache/NatsCache.cs index d737b34..080adbc 100644 --- a/src/NatsDistributedCache/NatsCache.cs +++ b/src/NatsDistributedCache/NatsCache.cs @@ -87,7 +87,7 @@ public async Task SetAsync( { // todo: remove cast after https://github.com/nats-io/nats.net/pull/852 is released await ((NatsKVStore)kvStore) - .PutAsync(GetEncodedKey(key), entry, ttl ?? TimeSpan.Zero, CacheEntrySerializer, token) + .PutWithTtlAsync(GetEncodedKey(key), entry, ttl ?? TimeSpan.Zero, CacheEntrySerializer, token) .ConfigureAwait(false); } catch (Exception ex) diff --git a/src/NatsDistributedCache/NatsExtensions.cs b/src/NatsDistributedCache/NatsExtensions.cs new file mode 100644 index 0000000..9af4557 --- /dev/null +++ b/src/NatsDistributedCache/NatsExtensions.cs @@ -0,0 +1,131 @@ +using System.Runtime.CompilerServices; +using System.Text.RegularExpressions; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; + +namespace CodeCargo.Nats.DistributedCache; + +public static class NatsExtensions +{ + private const string NatsTtl = "Nats-TTL"; + private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); + private static readonly NatsKVException KeyCannotBeEmptyException = new("Key cannot be empty"); + private static readonly NatsKVException KeyCannotStartOrEndWithPeriodException = new("Key cannot start or end with a period"); + private static readonly NatsKVException KeyContainsInvalidCharactersException = new("Key contains invalid characters"); + + /// + /// Put a value into the bucket using the key + /// + /// NATS key-value store instance + /// Key of the entry + /// Value of the entry + /// Time-to-live value + /// Serializer to use for the message type. + /// A used to cancel the API call. + /// Serialized value type + /// Revision number + /// + /// TTLs should only be used when the store is configured with a storage type that supports expiration, + /// and with history set to 1. Otherwise, the TTL behavior is undefined. + /// History is set to 1 by default, so you should be fine unless you changed it explicitly. + /// + public static async ValueTask PutWithTtlAsync(this INatsKVStore store, string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) + { + var result = await TryPutWithTtlAsync(store, key, value, ttl, serializer, cancellationToken); + if (!result.Success) + { + ThrowException(result.Error); + } + + return result.Value; + } + + /// + /// Put a value into the bucket using the key + /// + /// NATS key-value store instance + /// Key of the entry + /// Value of the entry + /// Time-to-live value + /// Serializer to use for the message type. + /// A used to cancel the API call. + /// Serialized value type + /// Revision number + /// + /// TTLs should only be used when the store is configured with a storage type that supports expiration, + /// and with history set to 1. Otherwise, the TTL behavior is undefined. + /// History is set to 1 by default, so you should be fine unless you changed it explicitly. + /// + public static async ValueTask> TryPutWithTtlAsync(this INatsKVStore store, string key, T value, TimeSpan ttl = default, INatsSerialize? serializer = default, CancellationToken cancellationToken = default) + { + var keyValidResult = TryValidateKey(key); + if (!keyValidResult.Success) + { + return keyValidResult.Error; + } + + NatsHeaders? headers = default; + if (ttl != default) + { + headers = new NatsHeaders + { + { NatsTtl, ToTtlString(ttl) }, + }; + } + + var publishResult = await store.JetStreamContext.TryPublishAsync($"$KV.{store.Bucket}.{key}", value, serializer: serializer, headers: headers, cancellationToken: cancellationToken); + if (publishResult.Success) + { + var ack = publishResult.Value; + if (ack.Error != null) + { + return new NatsJSApiException(ack.Error); + } + else if (ack.Duplicate) + { + return new NatsJSDuplicateMessageException(ack.Seq); + } + + return ack.Seq; + } + else + { + return publishResult.Error; + } + } + + /// + /// Valid keys are \A[-/_=\.a-zA-Z0-9]+\z, additionally they may not start or end in . + /// + private static NatsResult TryValidateKey(string key) + { + if (string.IsNullOrWhiteSpace(key) || key.Length == 0) + { + return KeyCannotBeEmptyException; + } + + if (key[0] == '.' || key[^1] == '.') + { + return KeyCannotStartOrEndWithPeriodException; + } + + if (!ValidKeyRegex.IsMatch(key)) + { + return KeyContainsInvalidCharactersException; + } + + return NatsResult.Default; + } + + /// + /// For the TTL header, we need to convert the TimeSpan to a Go time.ParseDuration string. + /// + /// TTL + /// String representing the number of seconds Go time.ParseDuration() can understand. + private static string ToTtlString(TimeSpan ttl) + => ttl == TimeSpan.MaxValue ? "never" : $"{(int)ttl.TotalSeconds:D}s"; + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowException(Exception exception) => throw exception; +} diff --git a/test/IntegrationTests/Extensions/NatsExtensionsPutWithTtlTests.cs b/test/IntegrationTests/Extensions/NatsExtensionsPutWithTtlTests.cs new file mode 100644 index 0000000..193eaf8 --- /dev/null +++ b/test/IntegrationTests/Extensions/NatsExtensionsPutWithTtlTests.cs @@ -0,0 +1,197 @@ +using NATS.Client.Core; +using NATS.Client.KeyValueStore; +using NATS.Net; + +namespace CodeCargo.Nats.DistributedCache.IntegrationTests.Extensions; + +/// +/// Integration tests for NatsExtensions.PutWithTtlAsync +/// +public class NatsExtensionsPutWithTtlTests(NatsIntegrationFixture fixture) : TestBase(fixture) +{ + [Fact] + public async Task PutWithTtlAsync_StoresValue() + { + var key = MethodKey(); + var value = "test-value"; + + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var revision = await store.PutWithTtlAsync(key, value); + + Assert.True(revision > 0); + + var entry = await store.GetEntryAsync(key); + Assert.Equal(value, entry.Value); + } + + [Fact] + public async Task PutWithTtlAsync_WithTtl_ExpiresAfterTtl() + { + var key = MethodKey(); + var value = "test-value-ttl"; + var ttl = TimeSpan.FromSeconds(2); + + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + await store.PutWithTtlAsync(key, value, ttl); + + // Value should exist immediately + var entry = await store.TryGetEntryAsync(key); + Assert.True(entry.Success); + Assert.Equal(value, entry.Value.Value); + + // Wait for TTL to expire + await Task.Delay(TimeSpan.FromSeconds(3)); + + // Value should be expired + var expiredEntry = await store.TryGetEntryAsync(key); + Assert.False(expiredEntry.Success); + } + + [Fact] + public async Task PutWithTtlAsync_WithoutTtl_DoesNotExpire() + { + var key = MethodKey(); + var value = "test-value-no-ttl"; + + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + await store.PutWithTtlAsync(key, value); + + // Value should exist immediately + var entry = await store.TryGetEntryAsync(key); + Assert.True(entry.Success); + Assert.Equal(value, entry.Value.Value); + + // Wait a bit + await Task.Delay(TimeSpan.FromSeconds(2)); + + // Value should still exist + var stillExists = await store.TryGetEntryAsync(key); + Assert.True(stillExists.Success); + Assert.Equal(value, stillExists.Value.Value); + } + + [Fact] + public async Task TryPutWithTtlAsync_WithEmptyKey_ReturnsError() + { + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var result = await store.TryPutWithTtlAsync(string.Empty, "value"); + + Assert.False(result.Success); + Assert.IsType(result.Error); + Assert.Contains("empty", result.Error.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task TryPutWithTtlAsync_WithKeyStartingWithPeriod_ReturnsError() + { + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var result = await store.TryPutWithTtlAsync(".invalid-key", "value"); + + Assert.False(result.Success); + Assert.IsType(result.Error); + Assert.Contains("period", result.Error.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task TryPutWithTtlAsync_WithKeyEndingWithPeriod_ReturnsError() + { + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var result = await store.TryPutWithTtlAsync("invalid-key.", "value"); + + Assert.False(result.Success); + Assert.IsType(result.Error); + Assert.Contains("period", result.Error.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task TryPutWithTtlAsync_WithInvalidCharacters_ReturnsError() + { + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var result = await store.TryPutWithTtlAsync("invalid key with spaces", "value"); + + Assert.False(result.Success); + Assert.IsType(result.Error); + Assert.Contains("invalid", result.Error.Message, StringComparison.OrdinalIgnoreCase); + } + + [Fact] + public async Task TryPutWithTtlAsync_WithValidKey_ReturnsSuccess() + { + var key = MethodKey(); + var value = "test-value"; + + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var result = await store.TryPutWithTtlAsync(key, value); + + Assert.True(result.Success); + Assert.True(result.Value > 0); + } + + [Theory] + [InlineData("simple-key")] + [InlineData("key_with_underscore")] + [InlineData("key/with/slashes")] + [InlineData("key.with.periods")] + [InlineData("key=with=equals")] + [InlineData("MixedCase123")] + public async Task TryPutWithTtlAsync_WithValidKeyFormats_Succeeds(string key) + { + var value = "test-value"; + + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var result = await store.TryPutWithTtlAsync(key, value); + + Assert.True(result.Success); + + var entry = await store.TryGetEntryAsync(key); + Assert.True(entry.Success); + Assert.Equal(value, entry.Value.Value); + } + + [Fact] + public async Task PutWithTtlAsync_WithInvalidKey_ThrowsException() + { + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + await Assert.ThrowsAsync(() => + store.PutWithTtlAsync(string.Empty, "value").AsTask()); + } + + [Fact] + public async Task PutWithTtlAsync_OverwritesExistingValue() + { + var key = MethodKey(); + var value1 = "first-value"; + var value2 = "second-value"; + + var kvContext = NatsConnection.CreateKeyValueStoreContext(); + var store = await kvContext.GetStoreAsync("cache"); + + var revision1 = await store.PutWithTtlAsync(key, value1); + var revision2 = await store.PutWithTtlAsync(key, value2); + + Assert.True(revision2 > revision1); + + var entry = await store.GetEntryAsync(key); + Assert.Equal(value2, entry.Value); + } +} From f261e2b432fde1067c16e45678b78049272a192f Mon Sep 17 00:00:00 2001 From: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> Date: Thu, 11 Dec 2025 20:55:22 -0500 Subject: [PATCH 2/2] Update src/NatsDistributedCache/NatsExtensions.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/NatsDistributedCache/NatsExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/NatsDistributedCache/NatsExtensions.cs b/src/NatsDistributedCache/NatsExtensions.cs index 9af4557..fcea59a 100644 --- a/src/NatsDistributedCache/NatsExtensions.cs +++ b/src/NatsDistributedCache/NatsExtensions.cs @@ -100,7 +100,7 @@ public static async ValueTask> TryPutWithTtlAsync(this INat /// private static NatsResult TryValidateKey(string key) { - if (string.IsNullOrWhiteSpace(key) || key.Length == 0) + if (string.IsNullOrWhiteSpace(key)) { return KeyCannotBeEmptyException; }