Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a
| version={string} | `DefaultVersion` | (`3.0` in Azure, else `2.0`) | Redis version level (useful when the server does not make this available) |
| writeBuffer={int} | `WriteBuffer` | `4096` | Size of the output buffer |
| | `CheckCertificateRevocation` | `true` | A Boolean value that specifies whether the certificate revocation list is checked during authentication. |
| pruneClusterConnections={bool} | `PruneClusterConnections` | `false` | If true connections from configured endpoints to cluster caches that don't appear in a `cluster nodes` response will be closed. Note: if the configured endpoint is passed to GetServer() it will throw an exception. |

Additional code-only options:
- ReconnectRetryPolicy (`IReconnectRetryPolicy`) - Default: `ReconnectRetryPolicy = LinearRetry(ConnectTimeout);`
Expand Down
16 changes: 15 additions & 1 deletion src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ internal const string
SyncTimeout = "syncTimeout",
TieBreaker = "tiebreaker",
Version = "version",
WriteBuffer = "writeBuffer";
WriteBuffer = "writeBuffer",
PruneClusterConnections = "pruneClusterConnections";

private static readonly Dictionary<string, string> normalizedOptions = new[]
{
Expand Down Expand Up @@ -118,6 +119,7 @@ internal const string
TieBreaker,
Version,
WriteBuffer,
PruneClusterConnections,
}.ToDictionary(x => x, StringComparer.OrdinalIgnoreCase);

public static string TryNormalize(string value)
Expand Down Expand Up @@ -332,6 +334,13 @@ public bool PreserveAsyncOrder
/// </summary>
public Proxy Proxy { get { return proxy.GetValueOrDefault(); } set { proxy = value; } }

/// <summary>
/// Should extra connections be removed for a cluster behind a load balancer.
/// If this is configured then endpoints specified in the configuration may not have backing connections
/// (ConnectionMultiplexer.GetServer may throw an ArgumentException if a pruned enpoint is passed)
/// </summary>
public bool? PruneClusterConnections { get; set; }

/// <summary>
/// The retry policy to be used for connection reconnects
/// </summary>
Expand Down Expand Up @@ -471,6 +480,7 @@ public ConfigurationOptions Clone()
ReconnectRetryPolicy = reconnectRetryPolicy,
SslProtocols = SslProtocols,
checkCertificateRevocation = checkCertificateRevocation,
PruneClusterConnections = PruneClusterConnections,
};
foreach (var item in EndPoints)
options.EndPoints.Add(item);
Expand Down Expand Up @@ -530,6 +540,7 @@ public string ToString(bool includePassword)
Append(sb, OptionKeys.ConfigCheckSeconds, configCheckSeconds);
Append(sb, OptionKeys.ResponseTimeout, responseTimeout);
Append(sb, OptionKeys.DefaultDatabase, DefaultDatabase);
Append(sb, OptionKeys.PruneClusterConnections, PruneClusterConnections);
commandMap?.AppendDeltas(sb);
return sb.ToString();
}
Expand Down Expand Up @@ -735,6 +746,9 @@ private void DoParse(string configuration, bool ignoreUnknown)
case OptionKeys.SslProtocols:
SslProtocols = OptionKeys.ParseSslProtocols(key, value);
break;
case OptionKeys.PruneClusterConnections:
PruneClusterConnections = OptionKeys.ParseBoolean(key, value);
break;
default:
if (!string.IsNullOrEmpty(key) && key[0] == '$')
{
Expand Down
85 changes: 84 additions & 1 deletion src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,25 @@ internal ServerSnapshot Add(ServerEndPoint value)
return new ServerSnapshot(arr, _count + 1);
}

internal ServerSnapshot Remove(ServerEndPoint value)
{
if (value == null) return this;
var arr = _arr;
var arrCopy = new ServerEndPoint[arr.Length];
int inserted = 0;
for (int i = 0; i < _count; ++i)
{
if (arr[i] != value)
{
arrCopy[inserted++] = arr[i];
}
}

if (inserted == _count) return this;

return new ServerSnapshot(arrCopy, inserted);
}

internal EndPoint[] GetEndPoints()
{
if (_count == 0) return Array.Empty<EndPoint>();
Expand All @@ -1100,12 +1119,14 @@ internal EndPoint[] GetEndPoints()
return arr;
}
}
internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null, bool activate = true)

internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null, bool activate = true, bool create = true)
{
if (endpoint == null) return null;
var server = (ServerEndPoint)servers[endpoint];
if (server == null)
{
if (!create) return null;
bool isNew = false;
lock (servers)
{
Expand All @@ -1126,6 +1147,27 @@ internal ServerEndPoint GetServerEndPoint(EndPoint endpoint, LogProxy log = null
return server;
}

internal void RemoveServerEndPoint(ServerEndPoint server)
{
if (server == null) return;
var endpoint = server.EndPoint;
if (endpoint == null) return;
var test = (ServerEndPoint)servers[endpoint];
if (test != server) return;
lock (servers)
{
test = (ServerEndPoint)servers[endpoint];
if (test != server)
{
if (_isDisposed) throw new ObjectDisposedException(ToString());
return;
}
servers.Remove(endpoint);
_serverSnapshot = _serverSnapshot.Remove(server);
}
server.Dispose();
}

internal readonly CommandMap CommandMap;

private ConnectionMultiplexer(ConfigurationOptions configuration)
Expand Down Expand Up @@ -1489,6 +1531,8 @@ private void ActivateAllServers(LogProxy log)
}
}
}

private bool loadbalancedCluster = false;
internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogProxy log, EndPoint blame, string cause, bool publishReconfigure = false, CommandFlags publishReconfigureFlags = CommandFlags.None)
{
if (_isDisposed) throw new ObjectDisposedException(ToString());
Expand Down Expand Up @@ -1537,6 +1581,22 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
}
int standaloneCount = 0, clusterCount = 0, sentinelCount = 0;
var endpoints = RawConfig.EndPoints;
// If we suspect that connections to configured endpoints will be discarded
// then we can check for configuration changes on the more 'permanent'
// connections assuming that any are connected. Falling back to the configured
// endpoints only in situations where all of the 'permanent' connections
// aren't up which could mean that those endpoints are not going to be used
// going forward (perhaps the A record changed or some ports were shuffled).
// If we reconfigure after the 'permanent' endpoints are rediscovered or
// restored we can again disconnect from the configured endpoints.
if (loadbalancedCluster)
{
var snapshot = _serverSnapshot.GetEndPoints();
if (snapshot.Any(s => GetServerEndPoint(s, log, false).IsConnected))
{
endpoints = new EndPointCollection(snapshot);
}
}
log?.WriteLine($"{endpoints.Count} unique nodes specified");

if (endpoints.Count == 0)
Expand Down Expand Up @@ -1691,6 +1751,29 @@ internal async Task<bool> ReconfigureAsync(bool first, bool reconfigureAll, LogP
if (encounteredConnectedClusterServer)
{
endpoints = updatedClusterEndpointCollection;
if (RawConfig.PruneClusterConnections == true && endpoints != null)
{
foreach (var configEndpoint in RawConfig.EndPoints)
{
// I suspect that the normal case for this is that there's only one endpoint
// configured in the options, with higher numbers of configured endpoints and
// cluster endpoints it would be worth using a Set to do the check
if (!endpoints.Contains(configEndpoint))
{
// If we've connected through the cluster through an endpoint that isn't "in" the
// cluster nodes endpoints then we aren't going to assign the endpoint any slots
// so the connection can be closed
loadbalancedCluster = true;
var serverEndPoint = GetServerEndPoint(configEndpoint, create: false);
if (serverEndPoint != null)
{
log?.WriteLine($"Removing endpoint {Format.ToString(serverEndPoint.EndPoint)}");
RemoveServerEndPoint(serverEndPoint);
}
masters.RemoveAll(ep => ep.EndPoint.Equals(configEndpoint));
}
}
}
}
else
{
Expand Down
5 changes: 3 additions & 2 deletions src/StackExchange.Redis/RedisResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,15 @@ internal static RedisResult TryCreate(PhysicalConnection connection, in RawResul
/// <summary>
/// Interprets a multi-bulk result with successive key/name values as a dictionary keyed by name
/// </summary>
/// <param name="comparer">The key comparator to use, or <see cref="StringComparer.InvariantCultureIgnoreCase"/> by default</param>
public Dictionary<string, RedisResult> ToDictionary(IEqualityComparer<string> comparer = null)
{
var arr = AsRedisResultArray();
int len = arr.Length / 2;
var result = new Dictionary<string, RedisResult>(len, comparer ?? StringComparer.InvariantCultureIgnoreCase);
for (int i = 0; i < len;)
for (int i = 0; i < arr.Length; i += 2)
{
result.Add(arr[i++].AsString(), arr[i++]);
result.Add(arr[i].AsString(), arr[i + 1]);
}
return result;
}
Expand Down
98 changes: 98 additions & 0 deletions tests/StackExchange.Redis.Tests/RedisResultTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Collections.Generic;
using Xunit;

namespace StackExchange.Redis.Tests
{
/// <summary>
/// Tests for <see cref="RedisResult"/>
/// </summary>
public sealed class RedisResultTests
{
/// <summary>
/// Tests the basic functionality of <see cref="RedisResult.ToDictionary(IEqualityComparer{string})"/>
/// </summary>
[Fact]
public void ToDictionaryWorks()
{
var redisArrayResult = RedisResult.Create(
new RedisValue[] { "one", 1, "two", 2, "three", 3, "four", 4 });

var dict = redisArrayResult.ToDictionary();

Assert.Equal(4, dict.Count);
Assert.Equal(1, (RedisValue)dict["one"]);
Assert.Equal(2, (RedisValue)dict["two"]);
Assert.Equal(3, (RedisValue)dict["three"]);
Assert.Equal(4, (RedisValue)dict["four"]);
}

/// <summary>
/// Tests the basic functionality of <see cref="RedisResult.ToDictionary(IEqualityComparer{string})"/>
/// when the results contain a nested results array, which is common for lua script results
/// </summary>
[Fact]
public void ToDictionaryWorksWhenNested()
{
var redisArrayResult = RedisResult.Create(
new RedisResult[]
{
RedisResult.Create((RedisValue)"one"),
RedisResult.Create(new RedisValue[]{"two", 2, "three", 3}),

RedisResult.Create((RedisValue)"four"),
RedisResult.Create(new RedisValue[] { "five", 5, "six", 6 }),
});

var dict = redisArrayResult.ToDictionary();
var nestedDict = dict["one"].ToDictionary();

Assert.Equal(2, dict.Count);
Assert.Equal(2, nestedDict.Count);
Assert.Equal(2, (RedisValue)nestedDict["two"]);
Assert.Equal(3, (RedisValue)nestedDict["three"]);
}

/// <summary>
/// Tests that <see cref="RedisResult.ToDictionary(IEqualityComparer{string})"/> fails when a duplicate key is encountered.
/// This also tests that the default comparator is case-insensitive.
/// </summary>
[Fact]
public void ToDictionaryFailsWithDuplicateKeys()
{
var redisArrayResult = RedisResult.Create(
new RedisValue[] { "banana", 1, "BANANA", 2, "orange", 3, "apple", 4 });

Assert.Throws<ArgumentException>(() => redisArrayResult.ToDictionary(/* Use default comparer, causes collision of banana */));
}

/// <summary>
/// Tests that <see cref="RedisResult.ToDictionary(IEqualityComparer{string})"/> correctly uses the provided comparator
/// </summary>
[Fact]
public void ToDictionaryWorksWithCustomComparator()
{
var redisArrayResult = RedisResult.Create(
new RedisValue[] { "banana", 1, "BANANA", 2, "orange", 3, "apple", 4 });

var dict = redisArrayResult.ToDictionary(StringComparer.Ordinal);

Assert.Equal(4, dict.Count);
Assert.Equal(1, (RedisValue)dict["banana"]);
Assert.Equal(2, (RedisValue)dict["BANANA"]);
}

/// <summary>
/// Tests that <see cref="RedisResult.ToDictionary(IEqualityComparer{string})"/> fails when the redis results array contains an odd number
/// of elements. In other words, it's not actually a Key,Value,Key,Value... etc. array
/// </summary>
[Fact]
public void ToDictionaryFailsOnMishapenResults()
{
var redisArrayResult = RedisResult.Create(
new RedisValue[] { "one", 1, "two", 2, "three", 3, "four" /* missing 4 */ });

Assert.Throws<IndexOutOfRangeException>(()=>redisArrayResult.ToDictionary(StringComparer.Ordinal));
}
}
}