Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c13af41
feat: add scaling/resilience tests and expose missing queue options
niemyjski May 27, 2026
6056375
remove migration docs; belongs in foundatiofx/Foundatio docs
niemyjski May 27, 2026
f178f90
```
niemyjski May 28, 2026
4f356e1
fix: address PR feedback - type safety, code quality, and restored co…
niemyjski May 28, 2026
2b660ab
refactor: simplify DelayedRetryType enum - use ToString().ToLowerInva…
niemyjski May 28, 2026
415c616
refactor: use [EnumMember] attribute for DelayedRetryType wire values
niemyjski May 28, 2026
d4c1af7
refactor: move EnumExtensions to Extensions folder
niemyjski May 28, 2026
5e2790a
feat: add RabbitMQ 4.3 quorum queue options - dead-letter strategy, o…
niemyjski May 28, 2026
2293784
fix: cache reflection in EnumExtensions, add quorum queue guards for …
niemyjski May 28, 2026
578828f
style: rename s_cache to Cache per project conventions
niemyjski May 28, 2026
ad183b2
fix: resolve PR feedback - field keyword, version gating, test reliab…
niemyjski May 28, 2026
dff68b7
fix: prevent CI test hang and fix quorum queue GlobalQos fallback
niemyjski May 28, 2026
af0ab68
fix: increase Aspire CreateAsync timeout to 5 min for CI image pulls
niemyjski May 28, 2026
3b64d25
fix: skip tests gracefully when Aspire infrastructure is unavailable
niemyjski May 28, 2026
3a16e64
perf: reduce chaos test delays and increase Aspire health check timeouts
niemyjski May 28, 2026
f4c18c1
fix: narrow generic catch clauses per PR feedback
niemyjski May 28, 2026
ce0943c
fix: revert StartAsync to 2min and chaos health to 30s to prevent CI …
niemyjski May 28, 2026
03f4009
fix: address remaining PR feedback - use MessageBusException for conf…
niemyjski May 28, 2026
3364af2
feat: Add RabbitMQ message priority support and documentation updates
niemyjski May 28, 2026
06e3384
fix: address remaining PR feedback from Copilot review
niemyjski May 28, 2026
a19c742
fix: use explicit LINQ Where filter for Properties iteration
niemyjski May 28, 2026
b13be24
fix: simplify OperationCanceledException handling in WaitForNodeReady…
niemyjski May 28, 2026
525149b
Delete write_doc.py
niemyjski May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,53 @@ The `rabbitmq_delayed_message_exchange` plugin is [archived and no longer mainta
- On RabbitMQ >= 4.3: Delayed messages use the in-memory fallback automatically. Be aware that these are not durable across process restarts.
- For durable delayed delivery on RabbitMQ 4.3+, consider implementing TTL + Dead-Letter Exchange patterns or using an external scheduler.

### RabbitMQ 4.3 Feature Support

**Supported (AMQP 0.9.1 compatible):**

- 32 strict message priority levels on quorum queues (via `UseMessagePriority()`)
- Delayed retries with linear backoff (via `UseDelayedRetries()`)
- Per-queue consumer timeouts (via `ConsumerTimeout()`)
- Single active consumer (via `UseSingleActiveConsumer()`)

**Not supported (require AMQP 1.0 protocol):**

- `x-opt-delivery-time` annotation -- per-message delayed retry override via the `modified` disposition outcome. AMQP 0.9.1 `basic.nack`/`basic.reject` do not support annotations.
- `x-opt-delivery-delay` annotation -- relative delay for the enterprise Message Scheduler / Delayed Queue plugin.
- Rejected-by and rejection reason -- returned to publishers in the AMQP 1.0 `Rejected` outcome.
- Consumer activity notification -- signaled via AMQP 1.0 flow frames for single active consumer state transitions.

This library uses the `RabbitMQ.Client` package (AMQP 0.9.1). To use AMQP 1.0 features, consider the [Amqp.Net Lite](https://github.com/Azure/amqpnetlite) library or the [RabbitMQ AMQP 1.0 .NET client](https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client).

### OpenTelemetry

Foundatio automatically propagates W3C trace context (`traceparent` / `tracestate`) through message headers. On publish, the current `Activity.Id` is stored as the message's `CorrelationId`; on receive, Foundatio starts a new `Activity` parented to that ID, linking consumer spans back to the publisher's trace.

To collect Foundatio's application-level message spans, add the `"Foundatio"` source:

```csharp
.AddSource("Foundatio")
```

For additional **transport-level** visibility (AMQP channel operations, network round-trips), the `RabbitMQ.Client` 7.x library emits its own spans:

```csharp
.AddSource("RabbitMQ.Client.*")
```

A complete tracing setup:

```csharp
services.AddOpenTelemetry().WithTracing(tracing =>
{
tracing.AddSource("Foundatio"); // message publish/handle spans
tracing.AddSource("RabbitMQ.Client.*"); // AMQP transport spans (optional)
tracing.AddOtlpExporter();
});
```

> **Note:** The [`RabbitMQ.Client.OpenTelemetry`](https://www.nuget.org/packages/RabbitMQ.Client.OpenTelemetry/) package (currently pre-release) is NOT required -- Foundatio handles cross-process trace propagation at the application level. That package adds an alternative propagation mechanism at the transport level which is redundant when using Foundatio.

### Core Features

- [Getting Started](https://foundatio.dev/guide/getting-started) - Installation and setup
Expand Down
21 changes: 21 additions & 0 deletions src/Foundatio.RabbitMQ/Extensions/EnumExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Collections.Concurrent;
using System.Reflection;
using System.Runtime.Serialization;

namespace Foundatio.Messaging;

internal static class EnumExtensions
{
private static readonly ConcurrentDictionary<Enum, string> Cache = new();

public static string ToEnumString<T>(this T value) where T : struct, Enum
{
return Cache.GetOrAdd(value, static v =>
{
var member = v.GetType().GetField(v.ToString()!);
var attribute = member?.GetCustomAttribute<EnumMemberAttribute>();
return attribute?.Value ?? v.ToString()!;
});
}
}
48 changes: 48 additions & 0 deletions src/Foundatio.RabbitMQ/Messaging/DeadLetterStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System.Runtime.Serialization;

namespace Foundatio.Messaging;

/// <summary>
/// Dead-letter strategy for quorum queues.
/// Controls how messages are transferred to the dead-letter exchange.
/// Set via the x-dead-letter-strategy queue argument.
/// See: https://www.rabbitmq.com/docs/quorum-queues#dead-lettering
/// </summary>
public enum DeadLetterStrategy
{
/// <summary>
/// Default. Messages may be lost in transit between queues during dead-lettering.
/// Suitable when dead-lettered messages are informational and loss is acceptable.
/// </summary>
[EnumMember(Value = "at-most-once")]
AtMostOnce,

/// <summary>
/// Guarantees message transfer to the dead-letter exchange using internal publisher confirms.
/// Requires overflow to be set to reject-publish (not drop-head).
/// Uses more memory and CPU. Only enable when dead-lettered messages must not be lost.
/// </summary>
[EnumMember(Value = "at-least-once")]
AtLeastOnce
}

/// <summary>
/// Queue overflow behavior when a queue reaches its maximum length.
/// Set via the x-overflow queue argument.
/// See: https://www.rabbitmq.com/docs/maxlength#overflow-behaviour
/// </summary>
public enum QueueOverflowBehavior
{
/// <summary>
/// Default. Drop or dead-letter messages from the head (oldest) of the queue.
/// </summary>
[EnumMember(Value = "drop-head")]
DropHead,

/// <summary>
/// Reject new publishes with basic.nack when the queue is full.
/// Required for at-least-once dead-lettering on quorum queues.
/// </summary>
[EnumMember(Value = "reject-publish")]
RejectPublish
}
39 changes: 39 additions & 0 deletions src/Foundatio.RabbitMQ/Messaging/DelayedRetryType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Runtime.Serialization;

namespace Foundatio.Messaging;

/// <summary>
/// Specifies the native delayed retry behavior for quorum queues (RabbitMQ 4.3+).
/// Maps to the x-delayed-retry-type queue argument.
/// See: https://www.rabbitmq.com/docs/quorum-queues#delayed-retries
/// </summary>
public enum DelayedRetryType
{
/// <summary>
/// All returned messages (nacks, rejects, and timeouts) are delayed before redelivery.
/// </summary>
[EnumMember(Value = "all")]
All,

/// <summary>
/// Messages returned without marking the delivery as failed are delayed.
/// Includes: basic.nack, AMQP 1.0 released, modified with delivery-failed=false.
/// These do NOT increment delivery-count, supporting unlimited returns.
/// </summary>
[EnumMember(Value = "returned")]
Returned,

/// <summary>
/// Only messages where delivery actually failed are delayed.
/// Includes: basic.reject, client crash, AMQP 1.0 rejected, modified with delivery-failed=true.
/// These increment delivery-count toward the delivery-limit.
/// </summary>
[EnumMember(Value = "failed")]
Failed,

/// <summary>
/// Delayed retry is explicitly disabled.
/// </summary>
[EnumMember(Value = "disabled")]
Disabled
}
89 changes: 84 additions & 5 deletions src/Foundatio.RabbitMQ/Messaging/RabbitMQMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -18,6 +19,7 @@ public class RabbitMQMessageBus : MessageBusBase<RabbitMQMessageBusOptions>
{
private const string XDeliveryCountHeader = "x-delivery-count";
private const string XOriginalMessageIdHeader = "x-original-message-id";
private const string PriorityPropertyKey = "Priority";
private static readonly Version _delayedExchangePluginIncompatibleVersion = new(4, 3);
private static readonly Version _globalQosRemovedVersion = new(4, 3);

Expand Down Expand Up @@ -80,6 +82,12 @@ public RabbitMQMessageBus(RabbitMQMessageBusOptions options) : base(options)
Uri = primaryUri,
AutomaticRecoveryEnabled = true
};

if (options.RequestedHeartbeat.HasValue)
_factory.RequestedHeartbeat = options.RequestedHeartbeat.Value;

if (options.NetworkRecoveryInterval.HasValue)
_factory.NetworkRecoveryInterval = options.NetworkRecoveryInterval.Value;
}

public RabbitMQMessageBus(Builder<RabbitMQMessageBusOptionsBuilder, RabbitMQMessageBusOptions> config)
Expand Down Expand Up @@ -148,7 +156,12 @@ protected override async Task EnsureTopicSubscriptionAsync(CancellationToken can
#pragma warning disable CS0618 // GlobalQos is obsolete but we still need to read it for backward compatibility
bool useGlobalQos = _options.GlobalQos;
#pragma warning restore CS0618
if (useGlobalQos && _serverVersion is not null && _serverVersion >= _globalQosRemovedVersion)
if (useGlobalQos && _isQuorumQueue)
{
_logger.LogWarning("GlobalQos is not supported on quorum queues. Falling back to per-channel prefetch (global: false). Remove the GlobalQos option to suppress this warning");
useGlobalQos = false;
}
else if (useGlobalQos && _serverVersion is not null && _serverVersion >= _globalQosRemovedVersion)
{
_logger.LogWarning("GlobalQos is not supported on RabbitMQ {ServerVersion}. Falling back to per-channel prefetch (global: false). Remove the GlobalQos option to suppress this warning", _serverVersion);
useGlobalQos = false;
Expand Down Expand Up @@ -626,11 +639,16 @@ protected override async Task PublishImplAsync(string messageType, object messag
if (_options.DefaultMessageTimeToLive.HasValue)
basicProperties.Expiration = _options.DefaultMessageTimeToLive.Value.TotalMilliseconds.ToString(CultureInfo.InvariantCulture);

if (options.Properties.TryGetValue(PriorityPropertyKey, out string? priorityValue) && Byte.TryParse(priorityValue, out byte priority))
basicProperties.Priority = priority;

if (options.Properties.Count > 0)
{
basicProperties.Headers ??= new Dictionary<string, object?>();
foreach (var property in options.Properties)
foreach (var property in options.Properties.Where(p => !String.Equals(p.Key, PriorityPropertyKey, StringComparison.Ordinal)))
{
basicProperties.Headers.Add(property.Key, property.Value);
}
}

// RabbitMQ only supports delayed messages with a third party plugin called "rabbitmq_delayed_message_exchange"
Expand Down Expand Up @@ -772,11 +790,72 @@ private async Task<string> CreateQueueAsync(IChannel channel)
// Set up the queue where the messages will reside - it requires the queue name and durability.
// Durable (the queue will survive a broker restart)
// Arguments (some brokers use it to implement additional features like message TTL)
var result = await channel.QueueDeclareAsync(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, _options.Arguments).AnyContext();
var arguments = _options.Arguments is not null
? new Dictionary<string, object?>(_options.Arguments)
: new Dictionary<string, object?>();

if (!String.IsNullOrWhiteSpace(_options.DeadLetterExchange))
Comment thread
niemyjski marked this conversation as resolved.
{
arguments["x-dead-letter-exchange"] = _options.DeadLetterExchange;

if (!String.IsNullOrWhiteSpace(_options.DeadLetterRoutingKey))
arguments["x-dead-letter-routing-key"] = _options.DeadLetterRoutingKey;

if (_options.DeadLetterStrategy.HasValue)
{
if (_options.DeadLetterStrategy == DeadLetterStrategy.AtLeastOnce)
{
if (!_isQuorumQueue)
throw new MessageBusException("At-least-once dead-lettering requires quorum queues. Call UseQuorumQueues().");

if (_options.Overflow != QueueOverflowBehavior.RejectPublish)
throw new MessageBusException("At-least-once dead-lettering requires overflow to be set to RejectPublish. Call .OverflowBehavior(QueueOverflowBehavior.RejectPublish).");
}

arguments["x-dead-letter-strategy"] = _options.DeadLetterStrategy.Value.ToEnumString();
}
Comment thread
niemyjski marked this conversation as resolved.
}

if (_options.Overflow.HasValue)
arguments["x-overflow"] = _options.Overflow.Value.ToEnumString();

if (_options.ConsumerTimeout.HasValue)
{
if (!_isQuorumQueue)
throw new MessageBusException("Per-queue consumer timeout (x-consumer-timeout) requires quorum queues (RabbitMQ 4.3+). Call UseQuorumQueues() before ConsumerTimeout().");

if (_serverVersion is not null && _serverVersion < _delayedExchangePluginIncompatibleVersion)
throw new MessageBusException($"Per-queue consumer timeout (x-consumer-timeout) requires RabbitMQ 4.3+. Detected server version: {_serverVersion}.");

Comment thread
niemyjski marked this conversation as resolved.
arguments["x-consumer-timeout"] = (long)_options.ConsumerTimeout.Value.TotalMilliseconds;
}

if (_options.SingleActiveConsumer)
arguments["x-single-active-consumer"] = true;

if (_options.MaxPriority.HasValue)
arguments["x-max-priority"] = (int)_options.MaxPriority.Value;

if (_options.DelayedRetryType.HasValue)
{
if (!_isQuorumQueue)
throw new MessageBusException("Delayed retries (x-delayed-retry-*) require quorum queues (RabbitMQ 4.3+). Call UseQuorumQueues() before UseDelayedRetries().");

if (_serverVersion is not null && _serverVersion < _delayedExchangePluginIncompatibleVersion)
throw new MessageBusException($"Delayed retries (x-delayed-retry-*) require RabbitMQ 4.3+. Detected server version: {_serverVersion}.");

arguments["x-delayed-retry-type"] = _options.DelayedRetryType.Value.ToEnumString();
if (_options.DelayedRetryMin.HasValue)
arguments["x-delayed-retry-min"] = _options.DelayedRetryMin.Value;
if (_options.DelayedRetryMax.HasValue)
arguments["x-delayed-retry-max"] = _options.DelayedRetryMax.Value;
Comment thread
niemyjski marked this conversation as resolved.
}

var result = await channel.QueueDeclareAsync(_options.SubscriptionQueueName, _options.IsDurable, _options.IsSubscriptionQueueExclusive, _options.SubscriptionQueueAutoDelete, arguments.Count > 0 ? arguments : null).AnyContext();
string queueName = result.QueueName;

// bind the queue with the exchange.
await channel.QueueBindAsync(queueName, _options.Topic, "").AnyContext();
// Bind the queue with the exchange.
await channel.QueueBindAsync(queueName, _options.Topic, String.Empty).AnyContext();

return queueName;
}
Expand Down
Loading