diff --git a/README.md b/README.md index 812b236..bcf720e 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,53 @@ The `rabbitmq_delayed_message_exchange` plugin is [archived and no longer mainta - On RabbitMQ >= 4.3: Delayed messages use the in-memory fallback automatically. Be aware that these are not durable across process restarts. - For durable delayed delivery on RabbitMQ 4.3+, consider implementing TTL + Dead-Letter Exchange patterns or using an external scheduler. +### RabbitMQ 4.3 Feature Support + +**Supported (AMQP 0.9.1 compatible):** + +- 32 strict message priority levels on quorum queues (via `UseMessagePriority()`) +- Delayed retries with linear backoff (via `UseDelayedRetries()`) +- Per-queue consumer timeouts (via `ConsumerTimeout()`) +- Single active consumer (via `UseSingleActiveConsumer()`) + +**Not supported (require AMQP 1.0 protocol):** + +- `x-opt-delivery-time` annotation -- per-message delayed retry override via the `modified` disposition outcome. AMQP 0.9.1 `basic.nack`/`basic.reject` do not support annotations. +- `x-opt-delivery-delay` annotation -- relative delay for the enterprise Message Scheduler / Delayed Queue plugin. +- Rejected-by and rejection reason -- returned to publishers in the AMQP 1.0 `Rejected` outcome. +- Consumer activity notification -- signaled via AMQP 1.0 flow frames for single active consumer state transitions. + +This library uses the `RabbitMQ.Client` package (AMQP 0.9.1). To use AMQP 1.0 features, consider the [Amqp.Net Lite](https://github.com/Azure/amqpnetlite) library or the [RabbitMQ AMQP 1.0 .NET client](https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client). + +### OpenTelemetry + +Foundatio automatically propagates W3C trace context (`traceparent` / `tracestate`) through message headers. On publish, the current `Activity.Id` is stored as the message's `CorrelationId`; on receive, Foundatio starts a new `Activity` parented to that ID, linking consumer spans back to the publisher's trace. + +To collect Foundatio's application-level message spans, add the `"Foundatio"` source: + +```csharp +.AddSource("Foundatio") +``` + +For additional **transport-level** visibility (AMQP channel operations, network round-trips), the `RabbitMQ.Client` 7.x library emits its own spans: + +```csharp +.AddSource("RabbitMQ.Client.*") +``` + +A complete tracing setup: + +```csharp +services.AddOpenTelemetry().WithTracing(tracing => +{ + tracing.AddSource("Foundatio"); // message publish/handle spans + tracing.AddSource("RabbitMQ.Client.*"); // AMQP transport spans (optional) + tracing.AddOtlpExporter(); +}); +``` + +> **Note:** The [`RabbitMQ.Client.OpenTelemetry`](https://www.nuget.org/packages/RabbitMQ.Client.OpenTelemetry/) package (currently pre-release) is NOT required -- Foundatio handles cross-process trace propagation at the application level. That package adds an alternative propagation mechanism at the transport level which is redundant when using Foundatio. + ### Core Features - [Getting Started](https://foundatio.dev/guide/getting-started) - Installation and setup diff --git a/src/Foundatio.RabbitMQ/Extensions/EnumExtensions.cs b/src/Foundatio.RabbitMQ/Extensions/EnumExtensions.cs new file mode 100644 index 0000000..4212662 --- /dev/null +++ b/src/Foundatio.RabbitMQ/Extensions/EnumExtensions.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Runtime.Serialization; + +namespace Foundatio.Messaging; + +internal static class EnumExtensions +{ + private static readonly ConcurrentDictionary Cache = new(); + + public static string ToEnumString(this T value) where T : struct, Enum + { + return Cache.GetOrAdd(value, static v => + { + var member = v.GetType().GetField(v.ToString()!); + var attribute = member?.GetCustomAttribute(); + return attribute?.Value ?? v.ToString()!; + }); + } +} diff --git a/src/Foundatio.RabbitMQ/Messaging/DeadLetterStrategy.cs b/src/Foundatio.RabbitMQ/Messaging/DeadLetterStrategy.cs new file mode 100644 index 0000000..3e21a87 --- /dev/null +++ b/src/Foundatio.RabbitMQ/Messaging/DeadLetterStrategy.cs @@ -0,0 +1,48 @@ +using System.Runtime.Serialization; + +namespace Foundatio.Messaging; + +/// +/// Dead-letter strategy for quorum queues. +/// Controls how messages are transferred to the dead-letter exchange. +/// Set via the x-dead-letter-strategy queue argument. +/// See: https://www.rabbitmq.com/docs/quorum-queues#dead-lettering +/// +public enum DeadLetterStrategy +{ + /// + /// Default. Messages may be lost in transit between queues during dead-lettering. + /// Suitable when dead-lettered messages are informational and loss is acceptable. + /// + [EnumMember(Value = "at-most-once")] + AtMostOnce, + + /// + /// Guarantees message transfer to the dead-letter exchange using internal publisher confirms. + /// Requires overflow to be set to reject-publish (not drop-head). + /// Uses more memory and CPU. Only enable when dead-lettered messages must not be lost. + /// + [EnumMember(Value = "at-least-once")] + AtLeastOnce +} + +/// +/// Queue overflow behavior when a queue reaches its maximum length. +/// Set via the x-overflow queue argument. +/// See: https://www.rabbitmq.com/docs/maxlength#overflow-behaviour +/// +public enum QueueOverflowBehavior +{ + /// + /// Default. Drop or dead-letter messages from the head (oldest) of the queue. + /// + [EnumMember(Value = "drop-head")] + DropHead, + + /// + /// Reject new publishes with basic.nack when the queue is full. + /// Required for at-least-once dead-lettering on quorum queues. + /// + [EnumMember(Value = "reject-publish")] + RejectPublish +} diff --git a/src/Foundatio.RabbitMQ/Messaging/DelayedRetryType.cs b/src/Foundatio.RabbitMQ/Messaging/DelayedRetryType.cs new file mode 100644 index 0000000..a6496f2 --- /dev/null +++ b/src/Foundatio.RabbitMQ/Messaging/DelayedRetryType.cs @@ -0,0 +1,39 @@ +using System.Runtime.Serialization; + +namespace Foundatio.Messaging; + +/// +/// Specifies the native delayed retry behavior for quorum queues (RabbitMQ 4.3+). +/// Maps to the x-delayed-retry-type queue argument. +/// See: https://www.rabbitmq.com/docs/quorum-queues#delayed-retries +/// +public enum DelayedRetryType +{ + /// + /// All returned messages (nacks, rejects, and timeouts) are delayed before redelivery. + /// + [EnumMember(Value = "all")] + All, + + /// + /// Messages returned without marking the delivery as failed are delayed. + /// Includes: basic.nack, AMQP 1.0 released, modified with delivery-failed=false. + /// These do NOT increment delivery-count, supporting unlimited returns. + /// + [EnumMember(Value = "returned")] + Returned, + + /// + /// Only messages where delivery actually failed are delayed. + /// Includes: basic.reject, client crash, AMQP 1.0 rejected, modified with delivery-failed=true. + /// These increment delivery-count toward the delivery-limit. + /// + [EnumMember(Value = "failed")] + Failed, + + /// + /// Delayed retry is explicitly disabled. + /// + [EnumMember(Value = "disabled")] + Disabled +} diff --git a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs index 373b7fe..37fa75d 100644 --- a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs +++ b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Globalization; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -18,6 +19,7 @@ public class RabbitMQMessageBus : MessageBusBase { private const string XDeliveryCountHeader = "x-delivery-count"; private const string XOriginalMessageIdHeader = "x-original-message-id"; + private const string PriorityPropertyKey = "Priority"; private static readonly Version _delayedExchangePluginIncompatibleVersion = new(4, 3); private static readonly Version _globalQosRemovedVersion = new(4, 3); @@ -80,6 +82,12 @@ public RabbitMQMessageBus(RabbitMQMessageBusOptions options) : base(options) Uri = primaryUri, AutomaticRecoveryEnabled = true }; + + if (options.RequestedHeartbeat.HasValue) + _factory.RequestedHeartbeat = options.RequestedHeartbeat.Value; + + if (options.NetworkRecoveryInterval.HasValue) + _factory.NetworkRecoveryInterval = options.NetworkRecoveryInterval.Value; } public RabbitMQMessageBus(Builder config) @@ -148,7 +156,12 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can #pragma warning disable CS0618 // GlobalQos is obsolete but we still need to read it for backward compatibility bool useGlobalQos = _options.GlobalQos; #pragma warning restore CS0618 - if (useGlobalQos && _serverVersion is not null && _serverVersion >= _globalQosRemovedVersion) + if (useGlobalQos && _isQuorumQueue) + { + _logger.LogWarning("GlobalQos is not supported on quorum queues. Falling back to per-channel prefetch (global: false). Remove the GlobalQos option to suppress this warning"); + useGlobalQos = false; + } + else if (useGlobalQos && _serverVersion is not null && _serverVersion >= _globalQosRemovedVersion) { _logger.LogWarning("GlobalQos is not supported on RabbitMQ {ServerVersion}. Falling back to per-channel prefetch (global: false). Remove the GlobalQos option to suppress this warning", _serverVersion); useGlobalQos = false; @@ -626,11 +639,16 @@ protected override async Task PublishImplAsync(string messageType, object messag if (_options.DefaultMessageTimeToLive.HasValue) basicProperties.Expiration = _options.DefaultMessageTimeToLive.Value.TotalMilliseconds.ToString(CultureInfo.InvariantCulture); + if (options.Properties.TryGetValue(PriorityPropertyKey, out string? priorityValue) && Byte.TryParse(priorityValue, out byte priority)) + basicProperties.Priority = priority; + if (options.Properties.Count > 0) { basicProperties.Headers ??= new Dictionary(); - foreach (var property in options.Properties) + foreach (var property in options.Properties.Where(p => !String.Equals(p.Key, PriorityPropertyKey, StringComparison.Ordinal))) + { basicProperties.Headers.Add(property.Key, property.Value); + } } // RabbitMQ only supports delayed messages with a third party plugin called "rabbitmq_delayed_message_exchange" @@ -772,11 +790,72 @@ private async Task CreateQueueAsync(IChannel channel) // Set up the queue where the messages will reside - it requires the queue name and durability. // Durable (the queue will survive a broker restart) // Arguments (some brokers use it to implement additional features like message TTL) - var result = await channel.QueueDeclareAsync(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, _options.Arguments).AnyContext(); + var arguments = _options.Arguments is not null + ? new Dictionary(_options.Arguments) + : new Dictionary(); + + if (!String.IsNullOrWhiteSpace(_options.DeadLetterExchange)) + { + arguments["x-dead-letter-exchange"] = _options.DeadLetterExchange; + + if (!String.IsNullOrWhiteSpace(_options.DeadLetterRoutingKey)) + arguments["x-dead-letter-routing-key"] = _options.DeadLetterRoutingKey; + + if (_options.DeadLetterStrategy.HasValue) + { + if (_options.DeadLetterStrategy == DeadLetterStrategy.AtLeastOnce) + { + if (!_isQuorumQueue) + throw new MessageBusException("At-least-once dead-lettering requires quorum queues. Call UseQuorumQueues()."); + + if (_options.Overflow != QueueOverflowBehavior.RejectPublish) + throw new MessageBusException("At-least-once dead-lettering requires overflow to be set to RejectPublish. Call .OverflowBehavior(QueueOverflowBehavior.RejectPublish)."); + } + + arguments["x-dead-letter-strategy"] = _options.DeadLetterStrategy.Value.ToEnumString(); + } + } + + if (_options.Overflow.HasValue) + arguments["x-overflow"] = _options.Overflow.Value.ToEnumString(); + + if (_options.ConsumerTimeout.HasValue) + { + if (!_isQuorumQueue) + throw new MessageBusException("Per-queue consumer timeout (x-consumer-timeout) requires quorum queues (RabbitMQ 4.3+). Call UseQuorumQueues() before ConsumerTimeout()."); + + if (_serverVersion is not null && _serverVersion < _delayedExchangePluginIncompatibleVersion) + throw new MessageBusException($"Per-queue consumer timeout (x-consumer-timeout) requires RabbitMQ 4.3+. Detected server version: {_serverVersion}."); + + arguments["x-consumer-timeout"] = (long)_options.ConsumerTimeout.Value.TotalMilliseconds; + } + + if (_options.SingleActiveConsumer) + arguments["x-single-active-consumer"] = true; + + if (_options.MaxPriority.HasValue) + arguments["x-max-priority"] = (int)_options.MaxPriority.Value; + + if (_options.DelayedRetryType.HasValue) + { + if (!_isQuorumQueue) + throw new MessageBusException("Delayed retries (x-delayed-retry-*) require quorum queues (RabbitMQ 4.3+). Call UseQuorumQueues() before UseDelayedRetries()."); + + if (_serverVersion is not null && _serverVersion < _delayedExchangePluginIncompatibleVersion) + throw new MessageBusException($"Delayed retries (x-delayed-retry-*) require RabbitMQ 4.3+. Detected server version: {_serverVersion}."); + + arguments["x-delayed-retry-type"] = _options.DelayedRetryType.Value.ToEnumString(); + if (_options.DelayedRetryMin.HasValue) + arguments["x-delayed-retry-min"] = _options.DelayedRetryMin.Value; + if (_options.DelayedRetryMax.HasValue) + arguments["x-delayed-retry-max"] = _options.DelayedRetryMax.Value; + } + + var result = await channel.QueueDeclareAsync(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, arguments.Count > 0 ? arguments : null).AnyContext(); string queueName = result.QueueName; - // bind the queue with the exchange. - await channel.QueueBindAsync(queueName, _options.Topic, "").AnyContext(); + // Bind the queue with the exchange. + await channel.QueueBindAsync(queueName, _options.Topic, String.Empty).AnyContext(); return queueName; } diff --git a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs index 03f1007..f87bc37 100644 --- a/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs +++ b/src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBusOptions.cs @@ -101,6 +101,109 @@ public class RabbitMQMessageBusOptions : SharedMessageBusOptions /// Default: 10 seconds (covers one full NetworkRecoveryInterval cycle with margin). /// public TimeSpan PublishRecoveryTimeout { get; set; } = TimeSpan.FromSeconds(10); + + /// + /// Heartbeat timeout negotiated with the broker. Controls how quickly dead TCP connections are detected. + /// Lower values detect failures faster but may cause false positives on congested networks. + /// Set to TimeSpan.Zero to disable heartbeats (not recommended for production). + /// Default: null (uses client library default of 60 seconds). + /// See: https://www.rabbitmq.com/docs/heartbeats + /// + public TimeSpan? RequestedHeartbeat { get; set; } + + /// + /// Time between automatic connection recovery attempts after a network failure. + /// Higher values reduce reconnection pressure on the broker during outages but increase downtime. + /// Default: null (uses client library default of 5 seconds). + /// See: https://www.rabbitmq.com/client-libraries/dotnet-api-guide#connection-recovery + /// + public TimeSpan? NetworkRecoveryInterval { get; set; } + + /// + /// Dead letter exchange name. Messages that exceed the delivery limit or are rejected + /// will be routed to this exchange instead of being dropped. + /// Set via the x-dead-letter-exchange queue argument. + /// See: https://www.rabbitmq.com/docs/dlx + /// + public string? DeadLetterExchange { get; set; } + + /// + /// Routing key used when dead-lettering messages. If not set, the original routing key is preserved. + /// Only effective when DeadLetterExchange is also set. + /// Set via the x-dead-letter-routing-key queue argument. + /// + public string? DeadLetterRoutingKey { get; set; } + + /// + /// Dead-letter strategy for quorum queues. Controls whether messages are transferred + /// to the DLX with at-most-once (default, may lose messages) or at-least-once (guaranteed delivery) + /// semantics. At-least-once requires Overflow to be set to RejectPublish. + /// Set via the x-dead-letter-strategy queue argument. + /// See: https://www.rabbitmq.com/docs/quorum-queues#dead-lettering + /// + public DeadLetterStrategy? DeadLetterStrategy { get; set; } + + /// + /// Queue overflow behavior when the queue reaches its max length. + /// Must be set to RejectPublish when using at-least-once dead-lettering. + /// Set via the x-overflow queue argument. + /// See: https://www.rabbitmq.com/docs/maxlength#overflow-behaviour + /// + public QueueOverflowBehavior? Overflow { get; set; } + + /// + /// Consumer timeout in milliseconds for quorum queues (RabbitMQ 4.3+). + /// Limits how long a consumer can hold unacknowledged messages before the broker returns them. + /// When exceeded, messages are requeued and the consumer is cancelled gracefully. + /// Set via the x-consumer-timeout queue argument. + /// Default: null (uses broker default, typically 30 minutes). + /// See: https://www.rabbitmq.com/docs/consumers#acknowledgement-timeout + /// + public TimeSpan? ConsumerTimeout { get; set; } + + /// + /// When true, only one consumer at a time will receive messages from the queue. + /// Other consumers act as standby and automatically take over if the active consumer disconnects. + /// Useful for strict message ordering with automatic failover. + /// Set via the x-single-active-consumer queue argument. + /// See: https://www.rabbitmq.com/docs/consumers#single-active-consumer + /// + public bool SingleActiveConsumer { get; set; } + + /// + /// Maximum number of priority levels for the queue (1-32). + /// Messages published with a higher priority value are delivered to consumers before lower-priority messages. + /// RabbitMQ 4.3+ quorum queues support 32 strict priority levels. + /// Set via the x-max-priority queue argument. + /// See: https://www.rabbitmq.com/docs/priority + /// + public byte? MaxPriority { get; set; } + + /// + /// Configures native delayed retry for quorum queues (RabbitMQ 4.3+). + /// When set, rejected/failed messages are held in a delayed state before becoming available again. + /// The delay uses linear backoff: min(min_delay * delivery_count, max_delay). + /// Requires quorum queues. Set via x-delayed-retry-type queue argument. + /// Default: null (not configured). + /// See: https://www.rabbitmq.com/docs/quorum-queues#delayed-retries + /// + public DelayedRetryType? DelayedRetryType { get; set; } + + /// + /// Minimum delay in milliseconds for native delayed retry (RabbitMQ 4.3+). + /// The actual delay is: min(DelayedRetryMin * delivery_count, DelayedRetryMax). + /// Only effective when DelayedRetryType is set. + /// Set via x-delayed-retry-min queue argument. + /// + public int? DelayedRetryMin { get; set; } + + /// + /// Maximum delay in milliseconds for native delayed retry (RabbitMQ 4.3+). + /// Caps the linear backoff so delays don't grow unbounded. + /// Only effective when DelayedRetryType is set. + /// Set via x-delayed-retry-max queue argument. + /// + public int? DelayedRetryMax { get; set; } } public class RabbitMQMessageBusOptionsBuilder : SharedMessageBusOptionsBuilder @@ -251,4 +354,114 @@ public RabbitMQMessageBusOptionsBuilder UseQuorumQueues() return this; } + + /// + /// Sets the heartbeat timeout negotiated with the broker. + /// Controls how quickly dead TCP connections are detected. + /// + /// Heartbeat interval. TimeSpan.Zero disables heartbeats. + public RabbitMQMessageBusOptionsBuilder RequestedHeartbeat(TimeSpan heartbeat) + { + ArgumentOutOfRangeException.ThrowIfLessThan(heartbeat, TimeSpan.Zero); + Target.RequestedHeartbeat = heartbeat; + return this; + } + + /// + /// Sets the interval between automatic connection recovery attempts. + /// + /// Recovery interval. Must be positive. + public RabbitMQMessageBusOptionsBuilder NetworkRecoveryInterval(TimeSpan interval) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(interval, TimeSpan.Zero); + Target.NetworkRecoveryInterval = interval; + return this; + } + + /// + /// Configures a dead letter exchange for messages that exceed the delivery limit or are rejected. + /// + /// The DLX exchange name. + /// Optional routing key for dead-lettered messages. + /// Dead-letter strategy. AtLeastOnce requires overflow to be RejectPublish. + public RabbitMQMessageBusOptionsBuilder DeadLetterExchange(string exchange, string? routingKey = null, DeadLetterStrategy? strategy = null) + { + ArgumentException.ThrowIfNullOrWhiteSpace(exchange); + Target.DeadLetterExchange = exchange; + Target.DeadLetterRoutingKey = routingKey; + Target.DeadLetterStrategy = strategy; + return this; + } + + /// + /// Sets the queue overflow behavior when max length is reached. + /// Must be RejectPublish when using at-least-once dead-lettering on quorum queues. + /// + /// The overflow behavior. + public RabbitMQMessageBusOptionsBuilder OverflowBehavior(QueueOverflowBehavior behavior) + { + Target.Overflow = behavior; + return this; + } + + /// + /// Sets the consumer timeout for quorum queues (RabbitMQ 4.3+). + /// When a consumer holds unacknowledged messages longer than this, the broker returns them + /// and gracefully cancels the consumer. + /// + /// Timeout duration. Must be positive. + public RabbitMQMessageBusOptionsBuilder ConsumerTimeout(TimeSpan timeout) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(timeout, TimeSpan.Zero); + Target.ConsumerTimeout = timeout; + return this; + } + + /// + /// Enables single active consumer mode for strict message ordering with automatic failover. + /// Only one consumer at a time will receive messages; others act as standby. + /// + /// Whether to enable single active consumer. Default: true. + public RabbitMQMessageBusOptionsBuilder UseSingleActiveConsumer(bool enabled = true) + { + Target.SingleActiveConsumer = enabled; + return this; + } + + /// + /// Enables message priority on the queue. Messages published with higher priority are + /// delivered to consumers before lower-priority messages. + /// RabbitMQ 4.3+ quorum queues support up to 32 strict priority levels. + /// + /// Maximum priority levels (1-32). Default: 32. + public RabbitMQMessageBusOptionsBuilder UseMessagePriority(byte maxPriority = 32) + { + ArgumentOutOfRangeException.ThrowIfZero(maxPriority); + ArgumentOutOfRangeException.ThrowIfGreaterThan(maxPriority, (byte)32); + + Target.MaxPriority = maxPriority; + return this; + } + + /// + /// Configures native delayed retry for quorum queues (RabbitMQ 4.3+). + /// Rejected/failed messages are held in a delayed state with linear backoff before redelivery. + /// This replaces the need for the delayed message exchange plugin for retry scenarios. + /// + /// Minimum delay in milliseconds (multiplied by delivery count). + /// Maximum delay cap in milliseconds. + /// Retry type controlling which messages are delayed. Default: All. + public RabbitMQMessageBusOptionsBuilder UseDelayedRetries(int minDelayMs = 1000, int maxDelayMs = 60000, DelayedRetryType retryType = DelayedRetryType.All) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(minDelayMs, 0); + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(maxDelayMs, 0); + + if (maxDelayMs < minDelayMs) + throw new ArgumentOutOfRangeException(nameof(maxDelayMs), $"maxDelayMs ({maxDelayMs}) must be >= minDelayMs ({minDelayMs})"); + + Target.DelayedRetryType = retryType; + Target.DelayedRetryMin = minDelayMs; + Target.DelayedRetryMax = maxDelayMs; + return this; + } } diff --git a/tests/Foundatio.RabbitMQ.Tests/AspireFixture.cs b/tests/Foundatio.RabbitMQ.Tests/AspireFixture.cs index a99cec7..add1c42 100644 --- a/tests/Foundatio.RabbitMQ.Tests/AspireFixture.cs +++ b/tests/Foundatio.RabbitMQ.Tests/AspireFixture.cs @@ -10,54 +10,80 @@ namespace Foundatio.RabbitMQ.Tests; public class AspireFixture : IAsyncLifetime { - private static readonly Lazy> SharedApp = new(StartAppAsync, LazyThreadSafetyMode.ExecutionAndPublication); + private static readonly Lazy> SharedApp = new(StartAppAsync, LazyThreadSafetyMode.ExecutionAndPublication); private DistributedApplication? _app; - public DistributedApplication App => _app ?? throw new InvalidOperationException("Fixture not initialized"); + public DistributedApplication App => _app ?? throw new InvalidOperationException("Fixture not initialized - Aspire AppHost failed to start"); public string? MessagingConnectionString { get; private set; } public string? MessagingDelayedConnectionString { get; private set; } + public bool ChaosClusterAvailable { get; private set; } + public bool IsAvailable => _app is not null && MessagingConnectionString is not null; public async ValueTask InitializeAsync() { _app = await SharedApp.Value; + if (_app is null) + return; - MessagingConnectionString = await _app.GetConnectionStringAsync("messaging") - ?? throw new InvalidOperationException("Could not get messaging connection string"); + MessagingConnectionString = await _app.GetConnectionStringAsync("messaging"); + if (MessagingConnectionString is null) + return; try { await _app.ResourceNotifications.WaitForResourceAsync( "messaging-delayed", KnownResourceStates.Running) - .WaitAsync(TimeSpan.FromSeconds(120)); + .WaitAsync(TimeSpan.FromSeconds(60)); var delayedEndpoint = _app.GetEndpoint("messaging-delayed", "amqp"); MessagingDelayedConnectionString = $"amqp://guest:guest@{delayedEndpoint.Host}:{delayedEndpoint.Port}"; } - catch (TimeoutException) + catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException) { MessagingDelayedConnectionString = null; } + + try + { + await Task.WhenAll( + _app.ResourceNotifications.WaitForResourceHealthyAsync("chaos-1") + .WaitAsync(TimeSpan.FromSeconds(30)), + _app.ResourceNotifications.WaitForResourceHealthyAsync("chaos-2") + .WaitAsync(TimeSpan.FromSeconds(30)), + _app.ResourceNotifications.WaitForResourceHealthyAsync("chaos-3") + .WaitAsync(TimeSpan.FromSeconds(30)) + ); + ChaosClusterAvailable = true; + } + catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException) + { + ChaosClusterAvailable = false; + } } - private static async Task StartAppAsync() + private static async Task StartAppAsync() { - var appHost = await DistributedApplicationTestingBuilder - .CreateAsync(); + try + { + var appHost = await DistributedApplicationTestingBuilder + .CreateAsync() + .WaitAsync(TimeSpan.FromMinutes(5)); - var app = await appHost.BuildAsync(); - using var startCts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); - await app.StartAsync(startCts.Token); + var app = await appHost.BuildAsync() + .WaitAsync(TimeSpan.FromMinutes(1)); - await app.ResourceNotifications.WaitForResourceHealthyAsync("messaging") - .WaitAsync(TimeSpan.FromSeconds(120)); + using var startCts = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + await app.StartAsync(startCts.Token); - for (int i = 1; i <= 3; i++) - { - await app.ResourceNotifications.WaitForResourceHealthyAsync($"chaos-{i}") + await app.ResourceNotifications.WaitForResourceHealthyAsync("messaging") .WaitAsync(TimeSpan.FromSeconds(120)); - } - return app; + return app; + } + catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException) + { + return null; + } } public ValueTask DisposeAsync() => ValueTask.CompletedTask; diff --git a/tests/Foundatio.RabbitMQ.Tests/ChaosTestHelper.cs b/tests/Foundatio.RabbitMQ.Tests/ChaosTestHelper.cs index 9acb38f..1ea51b4 100644 --- a/tests/Foundatio.RabbitMQ.Tests/ChaosTestHelper.cs +++ b/tests/Foundatio.RabbitMQ.Tests/ChaosTestHelper.cs @@ -47,6 +47,36 @@ public async Task StartNodeAsync(string resourceName, CancellationToken cancella await RunDockerCommandAsync($"start {containerId}", cancellationToken); } + public async Task WaitForNodeReadyAsync(string resourceName, TimeSpan timeout, CancellationToken cancellationToken = default) + { + using var deadlineCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + deadlineCts.CancelAfter(timeout); + var linkedToken = deadlineCts.Token; + + while (!linkedToken.IsCancellationRequested) + { + try + { + var containerId = await GetContainerIdAsync(resourceName, cancellationToken: linkedToken); + var output = await DockerExecAsync(containerId, "rabbitmqctl status", linkedToken); + if (output.Contains("pid", StringComparison.OrdinalIgnoreCase)) + return; + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException) + { + _logger.LogTrace(ex, "Node {Resource} not ready yet: {Message}, retrying...", resourceName, ex.Message); + } + + await Task.Delay(TimeSpan.FromSeconds(1), linkedToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + } + + throw new TimeoutException($"Node '{resourceName}' did not become ready within {timeout.TotalSeconds}s"); + } + public async Task HasDiskAlarmAsync(string resourceName, CancellationToken cancellationToken = default) { var containerId = await GetContainerIdAsync(resourceName, cancellationToken: cancellationToken); @@ -61,23 +91,48 @@ public async Task WaitForAlarmActiveAsync(string resourceName, TimeSpan timeout, { if (await HasDiskAlarmAsync(resourceName, cancellationToken)) return; - await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken); + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); } throw new TimeoutException($"Disk alarm on '{resourceName}' did not activate within {timeout.TotalSeconds}s"); - } public async Task WaitForAlarmClearedAsync(string resourceName, TimeSpan timeout, CancellationToken cancellationToken = default) + } + + public async Task WaitForAlarmClearedAsync(string resourceName, TimeSpan timeout, CancellationToken cancellationToken = default) { var deadline = DateTime.UtcNow + timeout; while (DateTime.UtcNow < deadline) { if (!await HasDiskAlarmAsync(resourceName, cancellationToken)) return; - await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken); + await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken); } throw new TimeoutException($"Disk alarm on '{resourceName}' did not clear within {timeout.TotalSeconds}s"); } + public async Task TriggerMemoryAlarmAsync(string resourceName, CancellationToken cancellationToken = default) + { + _logger.LogInformation("Setting vm_memory_high_watermark to 0.0001 on {Resource} to trigger memory alarm", resourceName); + var containerId = await GetContainerIdAsync(resourceName, cancellationToken: cancellationToken); + await DockerExecAsync(containerId, "rabbitmqctl set_vm_memory_high_watermark 0.0001", cancellationToken); + } + + private const string TestResetMemoryWatermark = "0.8"; + + public async Task ClearMemoryAlarmAsync(string resourceName, CancellationToken cancellationToken = default) + { + _logger.LogInformation("Resetting vm_memory_high_watermark to test default ({Watermark}) on {Resource}", TestResetMemoryWatermark, resourceName); + var containerId = await GetContainerIdAsync(resourceName, cancellationToken: cancellationToken); + await DockerExecAsync(containerId, $"rabbitmqctl set_vm_memory_high_watermark {TestResetMemoryWatermark}", cancellationToken); + } + + public async Task CloseAllConnectionsAsync(string resourceName, CancellationToken cancellationToken = default) + { + _logger.LogInformation("Force-closing all connections on {Resource}", resourceName); + var containerId = await GetContainerIdAsync(resourceName, cancellationToken: cancellationToken); + await DockerExecAsync(containerId, "rabbitmqctl close_all_connections chaos-test", cancellationToken); + } + public string GetConnectionString(string resourceName) { var endpoint = _app.GetEndpoint(resourceName, "amqp"); @@ -108,6 +163,9 @@ private static Task DockerExecAsync(string containerId, string command, private static async Task RunDockerCommandAsync(string args, CancellationToken cancellationToken) { + using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + timeoutCts.CancelAfter(TimeSpan.FromSeconds(30)); + using var process = new Process(); process.StartInfo = new ProcessStartInfo { @@ -123,10 +181,10 @@ private static async Task RunDockerCommandAsync(string args, Cancellatio try { - var outputTask = process.StandardOutput.ReadToEndAsync(cancellationToken); - var errorTask = process.StandardError.ReadToEndAsync(cancellationToken); + var outputTask = process.StandardOutput.ReadToEndAsync(timeoutCts.Token); + var errorTask = process.StandardError.ReadToEndAsync(timeoutCts.Token); await Task.WhenAll(outputTask, errorTask); - await process.WaitForExitAsync(cancellationToken); + await process.WaitForExitAsync(timeoutCts.Token); if (process.ExitCode != 0) throw new InvalidOperationException( diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqChaosTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqChaosTests.cs index 691311e..1dd55a5 100644 --- a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqChaosTests.cs +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqChaosTests.cs @@ -20,8 +20,10 @@ public class RabbitMqChaosTests(AspireFixture fixture, ITestOutputHelper output) [Fact] public async Task PublishAsync_DuringDiskAlarm_BlocksUntilAlarmClears() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - var connectionString = Chaos.GetConnectionString("chaos-1"); + string connectionString = Chaos.GetConnectionString("chaos-1"); await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(connectionString) .Topic("chaos-disk-alarm-test-" + Guid.NewGuid().ToString("N")[..8]) @@ -33,13 +35,13 @@ public async Task PublishAsync_DuringDiskAlarm_BlocksUntilAlarmClears() // Act - trigger disk alarm await Chaos.FillDiskAsync("chaos-1", TestCancellationToken); - await Chaos.WaitForAlarmActiveAsync("chaos-1", TimeSpan.FromSeconds(30), TestCancellationToken); + await Chaos.WaitForAlarmActiveAsync("chaos-1", TimeSpan.FromSeconds(10), TestCancellationToken); // Issue a publish with a short timeout. The disk alarm should eventually block // the connection, causing this to either timeout or succeed quickly before the // block notification arrives. Either outcome is acceptable; the key assertion // is that publishing resumes after the alarm clears. - using var alarmPublishCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + using var alarmPublishCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); try { await messageBus.PublishAsync(new SimpleMessageA { Data = "during alarm" }, @@ -52,10 +54,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "during alarm" }, // Clear alarm and verify publish resumes await Chaos.ClearDiskAsync("chaos-1", TestCancellationToken); - await Chaos.WaitForAlarmClearedAsync("chaos-1", TimeSpan.FromSeconds(30), TestCancellationToken); + await Chaos.WaitForAlarmClearedAsync("chaos-1", TimeSpan.FromSeconds(10), TestCancellationToken); // After clearing, a new publish should succeed - using var recoveryCts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var recoveryCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); bool published = false; while (!recoveryCts.Token.IsCancellationRequested && !published) { @@ -65,9 +67,9 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-clear" }, cancellationToken: recoveryCts.Token); published = true; } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) { - await Task.Delay(TimeSpan.FromSeconds(2), recoveryCts.Token); + await Task.Delay(TimeSpan.FromSeconds(1), recoveryCts.Token); } } @@ -82,8 +84,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-clear" }, [Fact] public async Task SubscribeAsync_DuringDiskAlarm_ContinuesReceivingAfterRecovery() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - var connectionString = Chaos.GetConnectionString("chaos-2"); + string connectionString = Chaos.GetConnectionString("chaos-2"); var received = new ConcurrentBag(); await using var messageBus = new RabbitMQMessageBus(o => o @@ -97,19 +101,19 @@ await messageBus.SubscribeAsync(msg => }, TestCancellationToken); await messageBus.PublishAsync(new SimpleMessageA { Data = "before-alarm" }, cancellationToken: TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); // Act - trigger alarm and then clear it await Chaos.FillDiskAsync("chaos-2", TestCancellationToken); - await Chaos.WaitForAlarmActiveAsync("chaos-2", TimeSpan.FromSeconds(30), TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); + await Chaos.WaitForAlarmActiveAsync("chaos-2", TimeSpan.FromSeconds(10), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); await Chaos.ClearDiskAsync("chaos-2", TestCancellationToken); - await Chaos.WaitForAlarmClearedAsync("chaos-2", TimeSpan.FromSeconds(30), TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); + await Chaos.WaitForAlarmClearedAsync("chaos-2", TimeSpan.FromSeconds(10), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); await messageBus.PublishAsync(new SimpleMessageA { Data = "after-recovery" }, cancellationToken: TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); // Assert Assert.Contains("before-alarm", received); @@ -119,8 +123,10 @@ await messageBus.SubscribeAsync(msg => [Fact] public async Task PublishAsync_AfterNodeRestart_RecoversAndDelivers() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - var connectionString = Chaos.GetConnectionString("chaos-3"); + string connectionString = Chaos.GetConnectionString("chaos-3"); await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(connectionString) @@ -131,12 +137,12 @@ public async Task PublishAsync_AfterNodeRestart_RecoversAndDelivers() // Act - kill and restart await Chaos.StopNodeAsync("chaos-3", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); await Chaos.StartNodeAsync("chaos-3", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(15), TestCancellationToken); + await Chaos.WaitForNodeReadyAsync("chaos-3", TimeSpan.FromSeconds(30), TestCancellationToken); // Assert - publish should eventually succeed after recovery - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); bool published = false; while (!cts.Token.IsCancellationRequested && !published) @@ -147,10 +153,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-restart" }, cancellationToken: cts.Token); published = true; } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) { _logger.LogWarning(ex, "Publish failed during recovery, retrying..."); - await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); } } @@ -160,16 +166,18 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-restart" }, [Fact] public async Task PublishAsync_WithMultipleHosts_FailsOverToHealthyNode() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - var host1 = Chaos.GetConnectionString("chaos-1"); - var host2 = Chaos.GetConnectionString("chaos-2"); - var host3 = Chaos.GetConnectionString("chaos-3"); + string host1 = Chaos.GetConnectionString("chaos-1"); + string host2 = Chaos.GetConnectionString("chaos-2"); + string host3 = Chaos.GetConnectionString("chaos-3"); var uri2 = new Uri(host2); var uri3 = new Uri(host3); await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(host1) - .Hosts([$"{uri2.Host}:{uri2.Port}", $"{uri3.Host}:{uri3.Port}"]) + .Hosts($"{uri2.Host}:{uri2.Port}", $"{uri3.Host}:{uri3.Port}") .Topic("chaos-failover-test-" + Guid.NewGuid().ToString("N")[..8]) .LoggerFactory(Log)); @@ -177,12 +185,12 @@ public async Task PublishAsync_WithMultipleHosts_FailsOverToHealthyNode() // Act - trigger disk alarm on primary await Chaos.FillDiskAsync("chaos-1", TestCancellationToken); - await Chaos.WaitForAlarmActiveAsync("chaos-1", TimeSpan.FromSeconds(30), TestCancellationToken); + await Chaos.WaitForAlarmActiveAsync("chaos-1", TimeSpan.FromSeconds(10), TestCancellationToken); try { // Assert - should still be able to publish (failover to chaos-2 or chaos-3) - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); bool published = false; while (!cts.Token.IsCancellationRequested && !published) @@ -193,10 +201,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "via-failover" }, cancellationToken: cts.Token); published = true; } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) { _logger.LogWarning(ex, "Publish attempt failed, retrying..."); - await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); } } @@ -212,16 +220,18 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "via-failover" }, [Fact] public async Task PublishAsync_DuringQuorumLoss_RetriesAndResumesWhenNodeRejoins() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - connect to all 3 cluster nodes - var host1 = Chaos.GetConnectionString("chaos-1"); - var host2 = Chaos.GetConnectionString("chaos-2"); - var host3 = Chaos.GetConnectionString("chaos-3"); + string host1 = Chaos.GetConnectionString("chaos-1"); + string host2 = Chaos.GetConnectionString("chaos-2"); + string host3 = Chaos.GetConnectionString("chaos-3"); var uri2 = new Uri(host2); var uri3 = new Uri(host3); await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(host1) - .Hosts([$"{uri2.Host}:{uri2.Port}", $"{uri3.Host}:{uri3.Port}"]) + .Hosts($"{uri2.Host}:{uri2.Port}", $"{uri3.Host}:{uri3.Port}") .Topic("chaos-quorum-loss-test-" + Guid.NewGuid().ToString("N")[..8]) .LoggerFactory(Log)); @@ -230,10 +240,10 @@ public async Task PublishAsync_DuringQuorumLoss_RetriesAndResumesWhenNodeRejoins // Act - kill 2 of 3 nodes (causes quorum loss in RabbitMQ 4.x Raft) await Chaos.StopNodeAsync("chaos-2", TestCancellationToken); await Chaos.StopNodeAsync("chaos-3", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); // Publish should fail/timeout during quorum loss - using var failCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + using var failCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var publishDuringLoss = await Record.ExceptionAsync(() => messageBus.PublishAsync(new SimpleMessageA { Data = "during-quorum-loss" }, cancellationToken: failCts.Token)); @@ -245,10 +255,11 @@ public async Task PublishAsync_DuringQuorumLoss_RetriesAndResumesWhenNodeRejoins // Act - bring nodes back to restore quorum await Chaos.StartNodeAsync("chaos-2", TestCancellationToken); await Chaos.StartNodeAsync("chaos-3", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(30), TestCancellationToken); + await Chaos.WaitForNodeReadyAsync("chaos-2", TimeSpan.FromSeconds(30), TestCancellationToken); + await Chaos.WaitForNodeReadyAsync("chaos-3", TimeSpan.FromSeconds(30), TestCancellationToken); // Assert - publishing should resume once quorum is restored - using var recoveryCts = new CancellationTokenSource(TimeSpan.FromSeconds(120)); + using var recoveryCts = new CancellationTokenSource(TimeSpan.FromSeconds(45)); bool published = false; while (!recoveryCts.Token.IsCancellationRequested && !published) @@ -259,10 +270,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-quorum-restored cancellationToken: recoveryCts.Token); published = true; } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) { _logger.LogWarning(ex, "Publish still failing during recovery, retrying..."); - await Task.Delay(TimeSpan.FromSeconds(3), recoveryCts.Token); + await Task.Delay(TimeSpan.FromSeconds(2), recoveryCts.Token); } } @@ -272,8 +283,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-quorum-restored [Fact] public async Task PublishAsync_WithPublisherConfirms_DuringDiskAlarm_FailsOrTimesOut() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - var connectionString = Chaos.GetConnectionString("chaos-1"); + string connectionString = Chaos.GetConnectionString("chaos-1"); await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(connectionString) @@ -286,12 +299,12 @@ public async Task PublishAsync_WithPublisherConfirms_DuringDiskAlarm_FailsOrTime // Act await Chaos.FillDiskAsync("chaos-1", TestCancellationToken); - await Chaos.WaitForAlarmActiveAsync("chaos-1", TimeSpan.FromSeconds(30), TestCancellationToken); + await Chaos.WaitForAlarmActiveAsync("chaos-1", TimeSpan.FromSeconds(10), TestCancellationToken); try { // Assert - publish with confirms should fail or timeout during alarm - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var exception = await Record.ExceptionAsync(() => messageBus.PublishAsync(new SimpleMessageA { Data = "during alarm" }, cancellationToken: cts.Token)); @@ -309,8 +322,10 @@ public async Task PublishAsync_WithPublisherConfirms_DuringDiskAlarm_FailsOrTime [Fact] public async Task SubscribeAsync_AfterNodeKill_ReconnectsAndReceivesMessages() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - var connectionString = Chaos.GetConnectionString("chaos-3"); + string connectionString = Chaos.GetConnectionString("chaos-3"); var received = new ConcurrentBag(); await using var messageBus = new RabbitMQMessageBus(o => o @@ -324,17 +339,17 @@ await messageBus.SubscribeAsync(msg => }, TestCancellationToken); await messageBus.PublishAsync(new SimpleMessageA { Data = "before-kill" }, cancellationToken: TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); Assert.Contains("before-kill", received); // Act - kill node and restart await Chaos.StopNodeAsync("chaos-3", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); await Chaos.StartNodeAsync("chaos-3", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(15), TestCancellationToken); + await Chaos.WaitForNodeReadyAsync("chaos-3", TimeSpan.FromSeconds(30), TestCancellationToken); // Assert - subscriber should reconnect and receive new messages - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); bool messageReceived = false; while (!cts.Token.IsCancellationRequested && !messageReceived) @@ -343,13 +358,13 @@ await messageBus.SubscribeAsync(msg => { await messageBus.PublishAsync(new SimpleMessageA { Data = "after-kill" }, cancellationToken: cts.Token); - await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); messageReceived = received.Contains("after-kill"); } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) { _logger.LogWarning(ex, "Publish/subscribe still recovering..."); - await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); } } @@ -359,8 +374,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-kill" }, [Fact] public async Task PublishAsync_DuringRapidNodeFlapping_RemainsResilient() { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + // Arrange - var connectionString = Chaos.GetConnectionString("chaos-2"); + string connectionString = Chaos.GetConnectionString("chaos-2"); await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(connectionString) @@ -374,15 +391,15 @@ public async Task PublishAsync_DuringRapidNodeFlapping_RemainsResilient() { _logger.LogInformation("Flap cycle {Cycle}/3: killing node", i + 1); await Chaos.StopNodeAsync("chaos-2", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); _logger.LogInformation("Flap cycle {Cycle}/3: restarting node", i + 1); await Chaos.StartNodeAsync("chaos-2", TestCancellationToken); - await Task.Delay(TimeSpan.FromSeconds(10), TestCancellationToken); + await Chaos.WaitForNodeReadyAsync("chaos-2", TimeSpan.FromSeconds(20), TestCancellationToken); } // Assert - should eventually be able to publish after flapping stabilizes - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20)); bool published = false; while (!cts.Token.IsCancellationRequested && !published) @@ -393,10 +410,10 @@ await messageBus.PublishAsync(new SimpleMessageA { Data = "after-flapping" }, cancellationToken: cts.Token); published = true; } - catch (Exception ex) when (ex is not OperationCanceledException) + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) { _logger.LogWarning(ex, "Still recovering from flapping..."); - await Task.Delay(TimeSpan.FromSeconds(2), cts.Token); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); } } diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusClassicTestBase.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusClassicTestBase.cs index 1799872..e9ba7e4 100644 --- a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusClassicTestBase.cs +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusClassicTestBase.cs @@ -17,6 +17,9 @@ public RabbitMqMessageBusClassicTestBase(string connectionString, ITestOutputHel protected override IMessageBus? GetMessageBus(Func? config = null) { + if (string.IsNullOrEmpty(ConnectionString)) + return null; + return new RabbitMQMessageBus(o => { o.ConnectionString(ConnectionString); @@ -31,6 +34,8 @@ public RabbitMqMessageBusClassicTestBase(string connectionString, ITestOutputHel [Fact] public override async Task CanHandlePoisonedMessageWithAutomaticAcknowledgementsAsync() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs index 41d9828..1b1a98b 100644 --- a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqMessageBusTestBase.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Concurrent; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Foundatio.AsyncEx; @@ -17,6 +19,9 @@ public abstract class RabbitMqMessageBusTestBase(string connectionString, ITestO protected override IMessageBus? GetMessageBus(Func? config = null) { + if (string.IsNullOrEmpty(ConnectionString)) + return null; + return new RabbitMQMessageBus(o => { o.SubscriptionQueueName($"{_topic}_{Guid.NewGuid():N}"); @@ -244,6 +249,8 @@ public override Task SubscribeAsync_WithValidThenPoisonedMessage_DeliversOnlyVal [Fact] public virtual async Task CanHandlePoisonedMessageWithAutomaticAcknowledgementsAsync() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + string topic = $"test_topic_poisoned_{DateTime.UtcNow.Ticks}"; await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) @@ -275,9 +282,58 @@ await messageBus.SubscribeAsync(_ => } } + [Fact] + public async Task PublishAsync_WithPriority_DeliversHighPriorityFirst() + { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + + // Arrange + string topic = $"test_topic_priority_{DateTime.UtcNow.Ticks}"; + string queueName = $"{topic}_{Guid.NewGuid():N}"; + + await using var publisher = new RabbitMQMessageBus(o => o + .ConnectionString(ConnectionString) + .SubscriptionQueueName(queueName) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic) + .UseQuorumQueues() + .UseMessagePriority() + .PrefetchCount(1) + .LoggerFactory(Log)); + + await publisher.PublishAsync(new SimpleMessageA { Data = "low" }, + new MessageOptions { Properties = { ["Priority"] = "1" } }, TestCancellationToken); + await publisher.PublishAsync(new SimpleMessageA { Data = "high" }, + new MessageOptions { Properties = { ["Priority"] = "10" } }, TestCancellationToken); + await publisher.PublishAsync(new SimpleMessageA { Data = "medium" }, + new MessageOptions { Properties = { ["Priority"] = "5" } }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromMilliseconds(500), TestCancellationToken); + + var received = new ConcurrentQueue(); + var countdownEvent = new AsyncCountdownEvent(3); + + // Act + await publisher.SubscribeAsync(msg => + { + received.Enqueue(msg.Data!); + countdownEvent.Signal(); + }, TestCancellationToken); + + await countdownEvent.WaitAsync(TimeSpan.FromSeconds(10)); + + // Assert + var messages = received.ToArray(); + Assert.Equal(3, messages.Length); + Assert.Equal("high", messages[0]); + Assert.Equal("medium", messages[1]); + Assert.Equal("low", messages[2]); + } + [Fact] public async Task CanPersistAndNotLoseMessages() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + var messageBus1 = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs index 38bfef1..ca417bb 100644 --- a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqPublishResilienceTests.cs @@ -13,6 +13,8 @@ public class RabbitMqPublishResilienceTests(AspireFixture fixture, ITestOutputHe [Fact] public async Task PublishAsync_WithRecoveryTimeoutDisabled_FailsImmediatelyOnConnectionDrop() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) @@ -33,6 +35,8 @@ public async Task PublishAsync_WithRecoveryTimeoutDisabled_FailsImmediatelyOnCon [Fact] public async Task PublishAsync_DuringRecovery_WaitsAndSucceeds() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) @@ -57,6 +61,8 @@ public async Task PublishAsync_DuringRecovery_WaitsAndSucceeds() [Fact] public async Task PublishAsync_RecoveryTimeout_ThrowsMessageBusException() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) @@ -76,6 +82,8 @@ public async Task PublishAsync_RecoveryTimeout_ThrowsMessageBusException() [Fact] public async Task PublishAsync_CancellationDuringRecovery_RespectsCancellation() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) @@ -94,6 +102,8 @@ await Assert.ThrowsAnyAsync(() => [Fact] public async Task PublishAsync_WhenConnectionHealthy_SucceedsImmediately() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) @@ -109,6 +119,8 @@ public async Task PublishAsync_WhenConnectionHealthy_SucceedsImmediately() [Fact] public async Task PublishAsync_DuringDisposal_FailsFastWithOperationCanceledException() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) @@ -140,6 +152,8 @@ public async Task PublishAsync_DuringDisposal_FailsFastWithOperationCanceledExce [Fact] public async Task PublishAsync_RecoveryErrorDoesNotOpenGate_WaitsUntilTimeout() { + Assert.SkipWhen(string.IsNullOrEmpty(ConnectionString), "RabbitMQ infrastructure not available"); + await using var messageBus = new RabbitMQMessageBus(o => o .ConnectionString(ConnectionString) .LoggerFactory(Log) diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqScalingTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqScalingTests.cs new file mode 100644 index 0000000..571b964 --- /dev/null +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqScalingTests.cs @@ -0,0 +1,575 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Foundatio.AsyncEx; +using Foundatio.Messaging; +using Foundatio.Tests.Extensions; +using Foundatio.Tests.Messaging; +using Foundatio.Xunit; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace Foundatio.RabbitMQ.Tests.Messaging; + +public class RabbitMqScalingTests(AspireFixture fixture, ITestOutputHelper output) + : TestWithLoggingBase(output), IClassFixture +{ + private ChaosTestHelper? _chaos; + private ChaosTestHelper Chaos => _chaos ??= new(fixture.App, Log); + + [Fact] + public async Task SubscribeAsync_WithCompetingConsumers_DistributesMessagesAcrossAll() + { + Assert.SkipWhen(!fixture.IsAvailable, "RabbitMQ infrastructure not available"); + + // Arrange + string topic = "scaling-competing-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-shared"; + const int messageCount = 50; + const int consumerCount = 3; + + var received = new ConcurrentDictionary>(); + for (int i = 0; i < consumerCount; i++) + received[i] = []; + + var buses = new RabbitMQMessageBus[consumerCount]; + var allReceived = new AsyncCountdownEvent(messageCount); + + try + { + for (int i = 0; i < consumerCount; i++) + { + int consumerIndex = i; + buses[i] = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic) + .PrefetchCount(1) + .UseQuorumQueues() + .LoggerFactory(Log)); + + await buses[i].SubscribeAsync(msg => + { + received[consumerIndex].Add(msg.Data!); + allReceived.Signal(); + }, TestCancellationToken); + } + + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + // Act + await using var publisher = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .LoggerFactory(Log)); + + for (int i = 0; i < messageCount; i++) + { + await publisher.PublishAsync(new SimpleMessageA { Data = $"msg-{i}" }, + cancellationToken: TestCancellationToken); + } + + await allReceived.WaitAsync(TimeSpan.FromSeconds(30)); + + // Assert + int totalReceived = received.Values.Sum(b => b.Count); + Assert.Equal(messageCount, totalReceived); + + foreach (var (consumerIndex, bag) in received) + { + _logger.LogInformation("Consumer {Index} received {Count} messages", consumerIndex, bag.Count); + Assert.True(bag.Count > 0, $"Consumer {consumerIndex} should have received at least 1 message (got 0 of {messageCount})"); + } + } + finally + { + foreach (var bus in buses.OfType()) + await bus.DisposeAsync(); + } + } + + [Fact] + public async Task SubscribeAsync_WithPrefetchLimit_OnlyDeliversUpToPrefetchCount() + { + Assert.SkipWhen(!fixture.IsAvailable, "RabbitMQ infrastructure not available"); + + // Arrange + string topic = "scaling-prefetch-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-prefetch"; + const ushort prefetchCount = 2; + const int messageCount = 10; + + var deliveredBeforeAck = new ConcurrentBag(); + var releaseGate = new AsyncManualResetEvent(false); + + await using var messageBus = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic) + .PrefetchCount(prefetchCount) + .UseQuorumQueues() + .LoggerFactory(Log)); + + try + { + await messageBus.SubscribeAsync(async msg => + { + deliveredBeforeAck.Add(msg.Data!); + _logger.LogInformation("Received message: {Data} (total delivered: {Count})", msg.Data, deliveredBeforeAck.Count); + await releaseGate.WaitAsync(TestCancellationToken); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + // Act + await using var publisher = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .LoggerFactory(Log)); + + for (int i = 0; i < messageCount; i++) + { + await publisher.PublishAsync(new SimpleMessageA { Data = $"prefetch-{i}" }, + cancellationToken: TestCancellationToken); + } + + await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + + // Assert + int deliveredWhileBlocked = deliveredBeforeAck.Count; + _logger.LogInformation("Messages delivered while consumer is blocked: {Count} (prefetch={Prefetch})", + deliveredWhileBlocked, prefetchCount); + + Assert.True(deliveredWhileBlocked <= prefetchCount, + $"Expected at most {prefetchCount} messages delivered while consumer is blocked, but got {deliveredWhileBlocked}"); + } + finally + { + releaseGate.Set(); + } + } + + [Fact] + public async Task PublishAsync_WithConfirmsEnabled_GuaranteesDeliveryToSubscriber() + { + Assert.SkipWhen(!fixture.IsAvailable, "RabbitMQ infrastructure not available"); + + // Arrange + string topic = "scaling-confirms-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-confirmed"; + var received = new ConcurrentBag(); + var messageReceived = new AsyncCountdownEvent(1); + + await using var subscriber = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic) + .UseQuorumQueues() + .LoggerFactory(Log)); + + await subscriber.SubscribeAsync(msg => + { + received.Add(msg.Data!); + messageReceived.Signal(); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + await using var publisher = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .PublisherConfirmsEnabled(true) + .LoggerFactory(Log)); + + // Act + await publisher.PublishAsync(new SimpleMessageA { Data = "confirmed-message" }, + cancellationToken: TestCancellationToken); + + // Assert + await messageReceived.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.Contains("confirmed-message", received); + } + + [Fact] + public async Task SubscribeAsync_WithMismatchedQueueArguments_ThrowsWithoutRetry() + { + Assert.SkipWhen(!fixture.IsAvailable, "RabbitMQ infrastructure not available"); + + // Arrange + string topic = "scaling-mismatch-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-mismatch"; + + await using var classicBus = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .IsDurable(true) + .LoggerFactory(Log)); + + await classicBus.SubscribeAsync(_ => { }, TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + // Act + var exception = await Record.ExceptionAsync(async () => + { + await using var quorumBus = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .IsDurable(true) + .UseQuorumQueues() + .LoggerFactory(Log)); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await quorumBus.SubscribeAsync(_ => { }, cts.Token); + }); + + // Assert + _logger.LogInformation("Queue mismatch exception: {Type}: {Message}", + exception?.GetType().Name, exception?.Message); + + Assert.NotNull(exception); + } + + [Fact] + public async Task SubscribeAsync_AfterMemoryAlarm_ResumesReceivingMessages() + { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + + // Arrange + var connectionString = Chaos.GetConnectionString("chaos-1"); + var received = new ConcurrentBag(); + + await using var messageBus = new RabbitMQMessageBus(o => o + .ConnectionString(connectionString) + .Topic("scaling-memory-alarm-" + Guid.NewGuid().ToString("N")[..8]) + .LoggerFactory(Log)); + + await messageBus.SubscribeAsync(msg => + { + received.Add(msg.Data!); + }, TestCancellationToken); + + await messageBus.PublishAsync(new SimpleMessageA { Data = "before-memory-alarm" }, + cancellationToken: TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + Assert.Contains("before-memory-alarm", received); + + // Act + try + { + await Chaos.TriggerMemoryAlarmAsync("chaos-1", TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); + + await Chaos.ClearMemoryAlarmAsync("chaos-1", TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + bool published = false; + while (!cts.Token.IsCancellationRequested && !published) + { + try + { + await messageBus.PublishAsync(new SimpleMessageA { Data = "after-memory-alarm" }, + cancellationToken: cts.Token); + published = true; + } + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) + { + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + } + } + + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + // Assert + Assert.Contains("after-memory-alarm", received); + } + finally + { + await Chaos.ClearMemoryAlarmAsync("chaos-1", TestCancellationToken); + } + } + + [Fact] + public async Task SubscribeAsync_AfterConnectionForceClose_ReconnectsAndResumes() + { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + + // Arrange + var connectionString = Chaos.GetConnectionString("chaos-2"); + var received = new ConcurrentBag(); + + await using var messageBus = new RabbitMQMessageBus(o => o + .ConnectionString(connectionString) + .Topic("scaling-force-close-" + Guid.NewGuid().ToString("N")[..8]) + .LoggerFactory(Log)); + + await messageBus.SubscribeAsync(msg => + { + received.Add(msg.Data!); + }, TestCancellationToken); + + await messageBus.PublishAsync(new SimpleMessageA { Data = "before-force-close" }, + cancellationToken: TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + Assert.Contains("before-force-close", received); + + // Act + await Chaos.CloseAllConnectionsAsync("chaos-2", TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + bool messageReceived = false; + + while (!cts.Token.IsCancellationRequested && !messageReceived) + { + try + { + await messageBus.PublishAsync(new SimpleMessageA { Data = "after-force-close" }, + cancellationToken: cts.Token); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + messageReceived = received.Contains("after-force-close"); + } + catch (Exception ex) when (ex is not OperationCanceledException and not OutOfMemoryException and not StackOverflowException) + { + _logger.LogWarning(ex, "Still recovering from force-close..."); + await Task.Delay(TimeSpan.FromSeconds(1), cts.Token); + } + } + + // Assert + Assert.True(messageReceived, "Subscriber should receive messages after forced connection close"); + } + + [Fact] + public async Task PublishAsync_DuringRollingNodeRestart_MaintainsDeliveryWithQuorumQueues() + { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + + // Arrange + var host1 = Chaos.GetConnectionString("chaos-1"); + var host2 = Chaos.GetConnectionString("chaos-2"); + var host3 = Chaos.GetConnectionString("chaos-3"); + var uri1 = new Uri(host1); + var uri2 = new Uri(host2); + var uri3 = new Uri(host3); + + string topic = "scaling-rolling-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-rolling"; + var published = new ConcurrentBag(); + var received = new ConcurrentBag(); + + await using var publisher = new RabbitMQMessageBus(o => o + .ConnectionString(host1) + .Hosts($"{uri1.Host}:{uri1.Port}", $"{uri2.Host}:{uri2.Port}", $"{uri3.Host}:{uri3.Port}") + .Topic(topic) + .PublisherConfirmsEnabled(true) + .PublishRecoveryTimeout(TimeSpan.FromSeconds(30)) + .LoggerFactory(Log)); + + await using var subscriber = new RabbitMQMessageBus(o => o + .ConnectionString(host1) + .Hosts($"{uri1.Host}:{uri1.Port}", $"{uri2.Host}:{uri2.Port}", $"{uri3.Host}:{uri3.Port}") + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic) + .PrefetchCount(10) + .UseQuorumQueues() + .LoggerFactory(Log)); + + await subscriber.SubscribeAsync(msg => + { + received.Add(msg.Data!); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + await publisher.PublishAsync(new SimpleMessageA { Data = "warmup" }, + cancellationToken: TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + // Act + using var publishCts = new CancellationTokenSource(); + int publishCount = 0; + + var publishTask = Task.Run(async () => + { + while (!publishCts.Token.IsCancellationRequested) + { + try + { + var msg = $"rolling-{Interlocked.Increment(ref publishCount)}"; + await publisher.PublishAsync(new SimpleMessageA { Data = msg }, + cancellationToken: publishCts.Token); + published.Add(msg); + await Task.Delay(TimeSpan.FromMilliseconds(500), publishCts.Token); + } + catch (OperationCanceledException) + { + break; + } + catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException) + { + _logger.LogWarning(ex, "Publish failed during rolling restart, retrying..."); + await Task.Delay(TimeSpan.FromSeconds(1), publishCts.Token); + } + } + }, publishCts.Token); + + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + + string[] nodeOrder = ["chaos-1", "chaos-2", "chaos-3"]; + foreach (string node in nodeOrder) + { + _logger.LogInformation("Rolling restart: stopping {Node}", node); + await Chaos.StopNodeAsync(node, TestCancellationToken); + await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + + _logger.LogInformation("Rolling restart: starting {Node}", node); + await Chaos.StartNodeAsync(node, TestCancellationToken); + await Chaos.WaitForNodeReadyAsync(node, TimeSpan.FromSeconds(30), TestCancellationToken); + } + + await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + await publishCts.CancelAsync(); + + try { await publishTask; } + catch (OperationCanceledException) + { + _logger.LogDebug("Publish task cancelled during shutdown (expected)"); + } + + await Task.Delay(TimeSpan.FromSeconds(3), TestCancellationToken); + + // Assert + _logger.LogInformation("Rolling restart results: published={Published}, received={Received}", + published.Count, received.Count); + + Assert.True(published.Count > 0, "Should have published at least some messages during rolling restart"); + Assert.True(received.Count > 0, "Should have received messages during rolling restart"); + + int receivedRolling = received.Count(m => m.StartsWith("rolling-")); + double lossRate = published.Count > 0 + ? Math.Max(0.0, 1.0 - ((double)receivedRolling / published.Count)) + : 0.0; + _logger.LogInformation("Message loss rate: {LossRate:P2} (published={Pub}, received rolling={Recv})", + lossRate, published.Count, receivedRolling); + Assert.True(lossRate < 0.1, $"Message loss rate should be under 10% with quorum queues, was {lossRate:P2}"); + } + + [Fact] + public async Task SubscribeAsync_AfterConsumerDisconnectWithUnackedMessages_RedeliversToNewConsumer() + { + Assert.SkipWhen(!fixture.ChaosClusterAvailable, "Chaos cluster not available"); + + // Arrange + var host1 = Chaos.GetConnectionString("chaos-1"); + var host2 = Chaos.GetConnectionString("chaos-2"); + var host3 = Chaos.GetConnectionString("chaos-3"); + var uri1 = new Uri(host1); + var uri2 = new Uri(host2); + var uri3 = new Uri(host3); + + string topic = "scaling-inflight-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-inflight"; + var firstDeliveries = new ConcurrentBag(); + var redeliveries = new ConcurrentBag(); + var holdGate = new AsyncManualResetEvent(false); + var allHosts = new[] { $"{uri1.Host}:{uri1.Port}", $"{uri2.Host}:{uri2.Port}", $"{uri3.Host}:{uri3.Port}" }; + + await using var publisher = new RabbitMQMessageBus(o => o + .ConnectionString(host1) + .Hosts(allHosts) + .Topic(topic) + .PublisherConfirmsEnabled(true) + .LoggerFactory(Log)); + + RabbitMQMessageBus? subscriber1 = new RabbitMQMessageBus(o => o + .ConnectionString(host3) + .Hosts(allHosts) + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic) + .PrefetchCount(5) + .UseQuorumQueues() + .DeliveryLimit(5) + .LoggerFactory(Log)); + + try + { + await subscriber1.SubscribeAsync(async msg => + { + _logger.LogInformation("Subscriber1 received: {Data}", msg.Data); + firstDeliveries.Add(msg.Data!); + await holdGate.WaitAsync(TestCancellationToken); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + for (int i = 0; i < 3; i++) + { + await publisher.PublishAsync(new SimpleMessageA { Data = $"inflight-{i}" }, + cancellationToken: TestCancellationToken); + } + + await Task.Delay(TimeSpan.FromSeconds(2), TestCancellationToken); + _logger.LogInformation("Messages delivered to subscriber1 before kill: {Count}", firstDeliveries.Count); + + // Act + await subscriber1.DisposeAsync(); + subscriber1 = null; + + await using var subscriber2 = new RabbitMQMessageBus(o => o + .ConnectionString(host1) + .Hosts(allHosts) + .Topic(topic) + .SubscriptionQueueName(queueName) + .IsSubscriptionQueueExclusive(false) + .SubscriptionQueueAutoDelete(false) + .AcknowledgementStrategy(AcknowledgementStrategy.Automatic) + .PrefetchCount(5) + .UseQuorumQueues() + .DeliveryLimit(5) + .LoggerFactory(Log)); + + await subscriber2.SubscribeAsync(msg => + { + _logger.LogInformation("Subscriber2 received (redelivery): {Data}", msg.Data); + redeliveries.Add(msg.Data!); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(5), TestCancellationToken); + + // Assert + _logger.LogInformation("Redelivered messages: {Count}", redeliveries.Count); + Assert.True(redeliveries.Count >= 1, + $"Expected at least 1 redelivered message after subscriber disconnect, got {redeliveries.Count}"); + } + finally + { + holdGate.Set(); + if (subscriber1 is not null) + await subscriber1.DisposeAsync(); + } + } +} diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqServerVersionTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqServerVersionTests.cs index 93a23d4..0019ea8 100644 --- a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqServerVersionTests.cs +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqServerVersionTests.cs @@ -91,7 +91,7 @@ public void ParseServerVersion_WithInvalidVersionBytes_ReturnsNull() } [Fact] - public void VersionGating_Rmq42_IsBelow43() + public void VersionComparison_WithLowerVersion_DetectsAsBelow() { // Arrange var rmq42 = new Version(4, 2, 0); @@ -105,7 +105,7 @@ public void VersionGating_Rmq42_IsBelow43() } [Fact] - public void VersionGating_Rmq43_IsAtOrAbove43() + public void VersionComparison_WithExactThreshold_DetectsAsAtOrAbove() { // Arrange var rmq43 = new Version(4, 3, 0); @@ -119,7 +119,7 @@ public void VersionGating_Rmq43_IsAtOrAbove43() } [Fact] - public void VersionGating_Rmq50_IsAbove43() + public void VersionComparison_WithHigherMajor_DetectsAsAbove() { // Arrange var rmq50 = new Version(5, 0, 0); diff --git a/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqVersionGatingTests.cs b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqVersionGatingTests.cs new file mode 100644 index 0000000..2a98133 --- /dev/null +++ b/tests/Foundatio.RabbitMQ.Tests/Messaging/RabbitMqVersionGatingTests.cs @@ -0,0 +1,125 @@ +using System; +using System.Threading.Tasks; +using Foundatio.AsyncEx; +using Foundatio.Messaging; +using Foundatio.Tests.Extensions; +using Foundatio.Tests.Messaging; +using Foundatio.Xunit; +using Xunit; + +namespace Foundatio.RabbitMQ.Tests.Messaging; + +public class RabbitMqVersionGatingTests(AspireFixture fixture, ITestOutputHelper output) + : TestWithLoggingBase(output), IClassFixture +{ + [Fact] + public async Task SubscribeAsync_WithDeprecatedGlobalQos_FallsBackToPerChannelQos() + { + Assert.SkipWhen(!fixture.IsAvailable, "RabbitMQ infrastructure not available"); + + // Arrange + string topic = "versiongate-globalqos-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-queue"; + var messageReceived = new AsyncCountdownEvent(1); + string? receivedData = null; + +#pragma warning disable CS0618 + await using var messageBus = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .PrefetchCount(10) + .GlobalQos(true) + .UseQuorumQueues() + .LoggerFactory(Log)); +#pragma warning restore CS0618 + + await messageBus.SubscribeAsync(msg => + { + receivedData = msg.Data; + messageReceived.Signal(); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + // Act + await messageBus.PublishAsync(new SimpleMessageA { Data = "globalqos-fallback" }, + cancellationToken: TestCancellationToken); + + // Assert + await messageReceived.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.Equal("globalqos-fallback", receivedData); + } + + [Fact] + public async Task PublishAsync_WithConfirmsAndVersionDetection_DeliversSuccessfully() + { + Assert.SkipWhen(!fixture.IsAvailable, "RabbitMQ infrastructure not available"); + + // Arrange + string topic = "versiongate-confirms-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-queue"; + var messageReceived = new AsyncCountdownEvent(1); + string? receivedData = null; + + await using var messageBus = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .PublisherConfirmsEnabled(true) + .UseQuorumQueues() + .LoggerFactory(Log)); + + await messageBus.SubscribeAsync(msg => + { + receivedData = msg.Data; + messageReceived.Signal(); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + // Act + await messageBus.PublishAsync(new SimpleMessageA { Data = "confirmed" }, + cancellationToken: TestCancellationToken); + + // Assert + await messageReceived.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.Equal("confirmed", receivedData); + } + + [Fact] + public async Task SubscribeAsync_WithQuorumQueueAndDeliveryLimit_DeliversMessages() + { + Assert.SkipWhen(!fixture.IsAvailable, "RabbitMQ infrastructure not available"); + + // Arrange + string topic = "versiongate-delivery-" + Guid.NewGuid().ToString("N")[..8]; + string queueName = $"{topic}-queue"; + var messageReceived = new AsyncCountdownEvent(1); + string? receivedData = null; + + await using var messageBus = new RabbitMQMessageBus(o => o + .ConnectionString(fixture.MessagingConnectionString!) + .Topic(topic) + .SubscriptionQueueName(queueName) + .UseQuorumQueues() + .DeliveryLimit(3) + .LoggerFactory(Log)); + + await messageBus.SubscribeAsync(msg => + { + receivedData = msg.Data; + messageReceived.Signal(); + }, TestCancellationToken); + + await Task.Delay(TimeSpan.FromSeconds(1), TestCancellationToken); + + // Act + await messageBus.PublishAsync(new SimpleMessageA { Data = "delivery-limit" }, + cancellationToken: TestCancellationToken); + + // Assert + await messageReceived.WaitAsync(TimeSpan.FromSeconds(10)); + Assert.Equal("delivery-limit", receivedData); + } +}