diff --git a/docs/Configuration.md b/docs/Configuration.md index 6d535632f..78addf060 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -89,6 +89,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a | version={string} | `DefaultVersion` | (`3.0` in Azure, else `2.0`) | Redis version level (useful when the server does not make this available) | | writeBuffer={int} | `WriteBuffer` | `4096` | Size of the output buffer | | | `CheckCertificateRevocation` | `true` | A Boolean value that specifies whether the certificate revocation list is checked during authentication. | +| pruneClusterConnections={bool} | `PruneClusterConnections` | `false` | If true connections from configured endpoints to cluster caches that don't appear in a `cluster nodes` response will be closed. Note: if the configured endpoint is passed to GetServer() it will throw an exception. | Additional code-only options: - ReconnectRetryPolicy (`IReconnectRetryPolicy`) - Default: `ReconnectRetryPolicy = LinearRetry(ConnectTimeout);` diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index c1c5dc499..e1797cbab 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -89,7 +89,8 @@ internal const string SyncTimeout = "syncTimeout", TieBreaker = "tiebreaker", Version = "version", - WriteBuffer = "writeBuffer"; + WriteBuffer = "writeBuffer", + PruneClusterConnections = "pruneClusterConnections"; private static readonly Dictionary normalizedOptions = new[] { @@ -118,6 +119,7 @@ internal const string TieBreaker, Version, WriteBuffer, + PruneClusterConnections, }.ToDictionary(x => x, StringComparer.OrdinalIgnoreCase); public static string TryNormalize(string value) @@ -332,6 +334,13 @@ public bool PreserveAsyncOrder /// public Proxy Proxy { get { return proxy.GetValueOrDefault(); } set { proxy = value; } } + /// + /// Should extra connections be removed for a cluster behind a load balancer. + /// If this is configured then endpoints specified in the configuration may not have backing connections + /// (ConnectionMultiplexer.GetServer may throw an ArgumentException if a pruned enpoint is passed) + /// + public bool? PruneClusterConnections { get; set; } + /// /// The retry policy to be used for connection reconnects /// @@ -471,6 +480,7 @@ public ConfigurationOptions Clone() ReconnectRetryPolicy = reconnectRetryPolicy, SslProtocols = SslProtocols, checkCertificateRevocation = checkCertificateRevocation, + PruneClusterConnections = PruneClusterConnections, }; foreach (var item in EndPoints) options.EndPoints.Add(item); @@ -530,6 +540,7 @@ public string ToString(bool includePassword) Append(sb, OptionKeys.ConfigCheckSeconds, configCheckSeconds); Append(sb, OptionKeys.ResponseTimeout, responseTimeout); Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase); + Append(sb, OptionKeys.PruneClusterConnections, PruneClusterConnections); commandMap?.AppendDeltas(sb); return sb.ToString(); } @@ -735,6 +746,9 @@ private void DoParse(string configuration, bool ignoreUnknown) case OptionKeys.SslProtocols: SslProtocols = OptionKeys.ParseSslProtocols(key, value); break; + case OptionKeys.PruneClusterConnections: + PruneClusterConnections = OptionKeys.ParseBoolean(key, value); + break; default: if (!string.IsNullOrEmpty(key) && key[0] == '$') { diff --git a/src/StackExchange.Redis/ConnectionMultiplexer.cs b/src/StackExchange.Redis/ConnectionMultiplexer.cs index f8e4dd7d4..e73ef0b36 100644 --- a/src/StackExchange.Redis/ConnectionMultiplexer.cs +++ b/src/StackExchange.Redis/ConnectionMultiplexer.cs @@ -1088,6 +1088,25 @@ internal ServerSnapshot Add(ServerEndPoint value) return new ServerSnapshot(arr, _count + 1); } + internal ServerSnapshot Remove(ServerEndPoint value) + { + if (value == null) return this; + var arr = _arr; + var arrCopy = new ServerEndPoint[arr.Length]; + int inserted = 0; + for (int i = 0; i < _count; ++i) + { + if (arr[i] != value) + { + arrCopy[inserted++] = arr[i]; + } + } + + if (inserted == _count) return this; + + return new ServerSnapshot(arrCopy, inserted); + } + internal EndPoint[] GetEndPoints() { if (_count == 0) return Array.Empty(); @@ -1100,12 +1119,14 @@ internal EndPoint[] GetEndPoints() return arr; } } - internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null, bool activate = true) + + internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null, bool activate = true, bool create = true) { if (endpoint == null) return null; var server = (ServerEndPoint)servers[endpoint]; if (server == null) { + if (!create) return null; bool isNew = false; lock (servers) { @@ -1126,6 +1147,27 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null return server; } + internal void RemoveServerEndPoint(ServerEndPoint server) + { + if (server == null) return; + var endpoint = server.EndPoint; + if (endpoint == null) return; + var test = (ServerEndPoint)servers[endpoint]; + if (test != server) return; + lock (servers) + { + test = (ServerEndPoint)servers[endpoint]; + if (test != server) + { + if (_isDisposed) throw new ObjectDisposedException(ToString()); + return; + } + servers.Remove(endpoint); + _serverSnapshot = _serverSnapshot.Remove(server); + } + server.Dispose(); + } + internal readonly CommandMap CommandMap; private ConnectionMultiplexer(ConfigurationOptions configuration) @@ -1489,6 +1531,8 @@ private void ActivateAllServers(LogProxy log) } } } + + private bool loadbalancedCluster = false; internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogProxy log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None) { if (_isDisposed) throw new ObjectDisposedException(ToString()); @@ -1537,6 +1581,22 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP } int standaloneCount = 0, clusterCount = 0, sentinelCount = 0; var endpoints = RawConfig.EndPoints; + // If we suspect that connections to configured endpoints will be discarded + // then we can check for configuration changes on the more 'permanent' + // connections assuming that any are connected. Falling back to the configured + // endpoints only in situations where all of the 'permanent' connections + // aren't up which could mean that those endpoints are not going to be used + // going forward (perhaps the A record changed or some ports were shuffled). + // If we reconfigure after the 'permanent' endpoints are rediscovered or + // restored we can again disconnect from the configured endpoints. + if (loadbalancedCluster) + { + var snapshot = _serverSnapshot.GetEndPoints(); + if (snapshot.Any(s => GetServerEndPoint(s, log, false).IsConnected)) + { + endpoints = new EndPointCollection(snapshot); + } + } log?.WriteLine($"{endpoints.Count} unique nodes specified"); if (endpoints.Count == 0) @@ -1691,6 +1751,29 @@ internal async Task ReconfigureAsync(bool first, bool reconfigureAll, LogP if (encounteredConnectedClusterServer) { endpoints = updatedClusterEndpointCollection; + if (RawConfig.PruneClusterConnections == true && endpoints != null) + { + foreach (var configEndpoint in RawConfig.EndPoints) + { + // I suspect that the normal case for this is that there's only one endpoint + // configured in the options, with higher numbers of configured endpoints and + // cluster endpoints it would be worth using a Set to do the check + if (!endpoints.Contains(configEndpoint)) + { + // If we've connected through the cluster through an endpoint that isn't "in" the + // cluster nodes endpoints then we aren't going to assign the endpoint any slots + // so the connection can be closed + loadbalancedCluster = true; + var serverEndPoint = GetServerEndPoint(configEndpoint, create: false); + if (serverEndPoint != null) + { + log?.WriteLine($"Removing endpoint {Format.ToString(serverEndPoint.EndPoint)}"); + RemoveServerEndPoint(serverEndPoint); + } + masters.RemoveAll(ep => ep.EndPoint.Equals(configEndpoint)); + } + } + } } else { diff --git a/src/StackExchange.Redis/RedisResult.cs b/src/StackExchange.Redis/RedisResult.cs index 5f097f717..747c41166 100644 --- a/src/StackExchange.Redis/RedisResult.cs +++ b/src/StackExchange.Redis/RedisResult.cs @@ -219,14 +219,15 @@ internal static RedisResult TryCreate(PhysicalConnection connection, in RawResul /// /// Interprets a multi-bulk result with successive key/name values as a dictionary keyed by name /// + /// The key comparator to use, or by default public Dictionary ToDictionary(IEqualityComparer comparer = null) { var arr = AsRedisResultArray(); int len = arr.Length / 2; var result = new Dictionary(len, comparer ?? StringComparer.InvariantCultureIgnoreCase); - for (int i = 0; i < len;) + for (int i = 0; i < arr.Length; i += 2) { - result.Add(arr[i++].AsString(), arr[i++]); + result.Add(arr[i].AsString(), arr[i + 1]); } return result; } diff --git a/tests/StackExchange.Redis.Tests/RedisResultTests.cs b/tests/StackExchange.Redis.Tests/RedisResultTests.cs new file mode 100644 index 000000000..32582f64e --- /dev/null +++ b/tests/StackExchange.Redis.Tests/RedisResultTests.cs @@ -0,0 +1,98 @@ +using System; +using System.Collections.Generic; +using Xunit; + +namespace StackExchange.Redis.Tests +{ + /// + /// Tests for + /// + public sealed class RedisResultTests + { + /// + /// Tests the basic functionality of + /// + [Fact] + public void ToDictionaryWorks() + { + var redisArrayResult = RedisResult.Create( + new RedisValue[] { "one", 1, "two", 2, "three", 3, "four", 4 }); + + var dict = redisArrayResult.ToDictionary(); + + Assert.Equal(4, dict.Count); + Assert.Equal(1, (RedisValue)dict["one"]); + Assert.Equal(2, (RedisValue)dict["two"]); + Assert.Equal(3, (RedisValue)dict["three"]); + Assert.Equal(4, (RedisValue)dict["four"]); + } + + /// + /// Tests the basic functionality of + /// when the results contain a nested results array, which is common for lua script results + /// + [Fact] + public void ToDictionaryWorksWhenNested() + { + var redisArrayResult = RedisResult.Create( + new RedisResult[] + { + RedisResult.Create((RedisValue)"one"), + RedisResult.Create(new RedisValue[]{"two", 2, "three", 3}), + + RedisResult.Create((RedisValue)"four"), + RedisResult.Create(new RedisValue[] { "five", 5, "six", 6 }), + }); + + var dict = redisArrayResult.ToDictionary(); + var nestedDict = dict["one"].ToDictionary(); + + Assert.Equal(2, dict.Count); + Assert.Equal(2, nestedDict.Count); + Assert.Equal(2, (RedisValue)nestedDict["two"]); + Assert.Equal(3, (RedisValue)nestedDict["three"]); + } + + /// + /// Tests that fails when a duplicate key is encountered. + /// This also tests that the default comparator is case-insensitive. + /// + [Fact] + public void ToDictionaryFailsWithDuplicateKeys() + { + var redisArrayResult = RedisResult.Create( + new RedisValue[] { "banana", 1, "BANANA", 2, "orange", 3, "apple", 4 }); + + Assert.Throws(() => redisArrayResult.ToDictionary(/* Use default comparer, causes collision of banana */)); + } + + /// + /// Tests that correctly uses the provided comparator + /// + [Fact] + public void ToDictionaryWorksWithCustomComparator() + { + var redisArrayResult = RedisResult.Create( + new RedisValue[] { "banana", 1, "BANANA", 2, "orange", 3, "apple", 4 }); + + var dict = redisArrayResult.ToDictionary(StringComparer.Ordinal); + + Assert.Equal(4, dict.Count); + Assert.Equal(1, (RedisValue)dict["banana"]); + Assert.Equal(2, (RedisValue)dict["BANANA"]); + } + + /// + /// Tests that fails when the redis results array contains an odd number + /// of elements. In other words, it's not actually a Key,Value,Key,Value... etc. array + /// + [Fact] + public void ToDictionaryFailsOnMishapenResults() + { + var redisArrayResult = RedisResult.Create( + new RedisValue[] { "one", 1, "two", 2, "three", 3, "four" /* missing 4 */ }); + + Assert.Throws(()=>redisArrayResult.ToDictionary(StringComparer.Ordinal)); + } + } +}