diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index bf928e5f..02bfefc7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -186,13 +186,20 @@ jobs: test_path: "Tests/SlimMessageBus.Host.Outbox.PostgreSql.DbContext.Test" docker_services: "" - # 3. Repository-layer tests – Sql.Test + PostgreSql.Test, both via TestContainers + # 3. Repository-layer tests – Sql.Test + PostgreSql.Test + MongoDb.Test, all via TestContainers - transport: Outbox.Repositories name: "Outbox (Repositories)" filter: "Category=Integration&Transport=Outbox.Sql" test_path: "" docker_services: "" + # 4. MongoDb outbox repository tests – via TestContainers + - transport: Outbox.MongoDb + name: "Outbox (MongoDb)" + filter: "Category=Integration&Transport=Outbox.MongoDb" + test_path: "Tests/SlimMessageBus.Host.Outbox.MongoDb.Test" + docker_services: "" + # Cloud transports (require repository secrets) - transport: AmazonSQS name: AmazonSQS diff --git a/README.md b/README.md index d4e3b29d..00a0ef17 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,7 @@ The configuration can be [modularized](docs/intro.md#modularization-of-configura - Plugins: - [Serialization](docs/serialization.md) - [Transactional Outbox](docs/plugin_outbox.md) + - [MongoDB](docs/plugin_outbox_mongodb.md) - [Validation using FluentValidation](docs/plugin_fluent_validation.md) - [AsyncAPI specification generation](docs/plugin_asyncapi.md) - [Consumer Circuit Breaker](docs/intro.md#health-check-circuit-breaker) @@ -171,6 +172,7 @@ The configuration can be [modularized](docs/intro.md#modularization-of-configura | `.Host.Outbox.PostgreSql.DbContext` | Transactional Outbox using PostgreSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.PostgreSql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.PostgreSql.DbContext) | | `.Host.Outbox.Sql` | Transactional Outbox using MSSQL | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql) | | `.Host.Outbox.Sql.DbContext` | Transactional Outbox using MSSQL with EF DataContext integration | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.Sql.DbContext.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.Sql.DbContext) | +| `.Host.Outbox.MongoDb` | [Transactional Outbox using MongoDB](docs/plugin_outbox_mongodb.md) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.Outbox.MongoDb.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.MongoDb) | | `.Host.AsyncApi` | [AsyncAPI](https://www.asyncapi.com/) specification generation via [Saunter](https://github.com/tehmantra/saunter) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.AsyncApi.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.AsyncApi) | | `.Host.CircuitBreaker.HealthCheck` | Consumer circuit breaker based on [health checks](docs/intro.md#health-check-circuit-breaker) | [![NuGet](https://img.shields.io/nuget/v/SlimMessageBus.Host.CircuitBreaker.HealthCheck.svg)](https://www.nuget.org/packages/SlimMessageBus.Host.CircuitBreaker.HealthCheck) | diff --git a/docs/intro.md b/docs/intro.md index 5d9328ed..450f8d7c 100644 --- a/docs/intro.md +++ b/docs/intro.md @@ -1241,6 +1241,36 @@ See source: On the consumer side, before the received message is delivered to the consumer (or request handler) the SMB is performing a DI lookup for the interceptor interface types that are relevant for the given message type (or request and response type). +#### Consumer instance resolution order + +The consumer (or handler) instance is resolved from the DI container **after** all interceptors have run — specifically, it is created the first time `IConsumerContext.Consumer` is accessed inside `ExecuteConsumer`, which is the final step of the interceptor pipeline (the inner-most `next()` call). + +This ordering has an important consequence: any **scoped DI service injected into the consumer's constructor** is resolved *after* every interceptor has executed. Interceptors can therefore set up ambient state (start a database transaction, populate an `AsyncLocal`, update a mutable scoped holder) that will be visible when the consumer's constructor fires and its dependencies are resolved. + +``` +Per-message DI scope created + │ + ▼ + Interceptors resolved from DI + │ + ▼ + IConsumerInterceptor.OnHandle() ← e.g. starts a DB transaction + │ (calls next()) + ▼ + IRequestHandlerInterceptor.OnHandle() + │ (calls next()) + ▼ + Consumer resolved from DI ← scoped constructor deps see the open transaction + │ + ▼ + Consumer.OnHandle() / Handler.OnHandle() + │ + ▼ + Interceptors unwind (commit / rollback) +``` + +> This is why, for example, the MongoDB outbox plugin can register `IClientSessionHandle` as a plain scoped service. The `MongoDbTransactionConsumerInterceptor` starts the session before the consumer is constructed, so the consumer receives a live (non-null) `IClientSessionHandle?` via normal constructor injection with no `Lazy` wrapper needed. + ```cs // Intercepts consumers of type IConsumer and IRequestHandler public interface IConsumerInterceptor : IInterceptor diff --git a/docs/intro.t.md b/docs/intro.t.md index 5d9328ed..450f8d7c 100644 --- a/docs/intro.t.md +++ b/docs/intro.t.md @@ -1241,6 +1241,36 @@ See source: On the consumer side, before the received message is delivered to the consumer (or request handler) the SMB is performing a DI lookup for the interceptor interface types that are relevant for the given message type (or request and response type). +#### Consumer instance resolution order + +The consumer (or handler) instance is resolved from the DI container **after** all interceptors have run — specifically, it is created the first time `IConsumerContext.Consumer` is accessed inside `ExecuteConsumer`, which is the final step of the interceptor pipeline (the inner-most `next()` call). + +This ordering has an important consequence: any **scoped DI service injected into the consumer's constructor** is resolved *after* every interceptor has executed. Interceptors can therefore set up ambient state (start a database transaction, populate an `AsyncLocal`, update a mutable scoped holder) that will be visible when the consumer's constructor fires and its dependencies are resolved. + +``` +Per-message DI scope created + │ + ▼ + Interceptors resolved from DI + │ + ▼ + IConsumerInterceptor.OnHandle() ← e.g. starts a DB transaction + │ (calls next()) + ▼ + IRequestHandlerInterceptor.OnHandle() + │ (calls next()) + ▼ + Consumer resolved from DI ← scoped constructor deps see the open transaction + │ + ▼ + Consumer.OnHandle() / Handler.OnHandle() + │ + ▼ + Interceptors unwind (commit / rollback) +``` + +> This is why, for example, the MongoDB outbox plugin can register `IClientSessionHandle` as a plain scoped service. The `MongoDbTransactionConsumerInterceptor` starts the session before the consumer is constructed, so the consumer receives a live (non-null) `IClientSessionHandle?` via normal constructor injection with no `Lazy` wrapper needed. + ```cs // Intercepts consumers of type IConsumer and IRequestHandler public interface IConsumerInterceptor : IInterceptor diff --git a/docs/plugin_outbox_mongodb.md b/docs/plugin_outbox_mongodb.md new file mode 100644 index 00000000..8b5510e9 --- /dev/null +++ b/docs/plugin_outbox_mongodb.md @@ -0,0 +1,228 @@ +# Transactional Outbox Plugin for MongoDB + +Please read the [Introduction](intro.md) and the [Transactional Outbox](plugin_outbox.md) overview before reading this page. + +- [Introduction](#introduction) +- [Configuration](#configuration) +- [Options](#options) + - [UseOutbox for Producers](#useoutbox-for-producers) + - [UseMongoDbTransaction for Consumers](#usemongodbtransaction-for-consumers) +- [How it works](#how-it-works) +- [Collections](#collections) +- [Migration versioning](#migration-versioning) +- [Indices](#indices) +- [Clean up](#clean-up) +- [Important note](#important-note) + +## Introduction + +[`SlimMessageBus.Host.Outbox.MongoDb`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.MongoDb) adds [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern support backed by **MongoDB**. + +It uses the [MongoDB.Driver](https://www.nuget.org/packages/MongoDB.Driver) (3.x) and targets **.NET 8 and .NET 10**. + +> Requires `IMongoClient` to be registered in the DI container. + +## Configuration + +> Required: [`SlimMessageBus.Host.Outbox.MongoDb`](https://www.nuget.org/packages/SlimMessageBus.Host.Outbox.MongoDb) + +```bash +dotnet add package SlimMessageBus.Host.Outbox.MongoDb +``` + +Call `.AddOutboxUsingMongoDb()` on the `MessageBusBuilder` to enable the plugin: + +```csharp +using SlimMessageBus.Host.Outbox.MongoDb; + +builder.Services.AddSlimMessageBus(mbb => +{ + mbb + .AddChildBus("Memory", mbb => + { + mbb.WithProviderMemory() + .AutoDeclareFrom(Assembly.GetExecutingAssembly(), consumerTypeFilter: t => t.Name.EndsWith("CommandHandler")) + // Wrap each command handler in a MongoDB multi-document transaction + .UseMongoDbTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); + }) + .AddChildBus("AzureSB", mbb => + { + mbb.WithProviderServiceBus(cfg => { /* ... */ }) + .Produce(x => x.DefaultTopic("samples.outbox/customer-events")) + // All outgoing messages from this bus will go out via the outbox + .UseOutbox(); + }) + .AddServicesFromAssembly(Assembly.GetExecutingAssembly()) + .AddJsonSerializer() + // Configure MongoDB outbox + .AddOutboxUsingMongoDb(opts => + { + opts.PollBatchSize = 500; + opts.PollIdleSleep = TimeSpan.FromSeconds(10); + opts.MessageCleanup.Interval = TimeSpan.FromSeconds(60); + opts.MessageCleanup.Age = TimeSpan.FromMinutes(60); + // Override MongoDB collection names (optional) + // opts.MongoDbSettings.DatabaseName = "myapp"; + // opts.MongoDbSettings.CollectionName = "smb_outbox"; + // opts.MongoDbSettings.LockCollectionName = "smb_outbox_lock"; + }); +}); + +// SMB requires IMongoClient to be registered in the container +builder.Services.AddSingleton(new MongoClient(connectionString)); +``` + +## Options + +### UseOutbox for Producers + +`.UseOutbox()` marks a producer (or an entire child bus) to route outgoing messages through the outbox instead of publishing them directly to the transport. + +```csharp +mbb.Produce(x => +{ + x.DefaultTopic("order-events"); + x.UseOutbox(); // this producer uses the outbox +}); + +// or for all producers on a bus: +mbb.UseOutbox(); +``` + +### UseMongoDbTransaction for Consumers + +`.UseMongoDbTransaction()` wraps each consumer (or handler) in a MongoDB multi-document transaction. The transaction is committed after a successful `OnHandle` call and rolled back on any exception. + +> **Note:** MongoDB multi-document transactions require a **replica set** (or sharded cluster). Standalone `mongod` instances do not support transactions. + +```csharp +using SlimMessageBus.Host.Outbox.MongoDb; + +// On a single consumer: +mbb.Consume(x => + x.WithConsumer() + .UseMongoDbTransaction()); + +// Or across all consumers on a bus: +mbb.UseMongoDbTransaction(messageTypeFilter: t => t.Name.EndsWith("Command")); +``` + +#### Enlisting your own MongoDB writes in the transaction + +The **outbox insert** always participates in the active transaction automatically. However, unlike SQL (where a `SqlConnection` carries the transaction implicitly), MongoDB requires the `IClientSessionHandle` to be **passed explicitly** to every collection operation. + +To make your own document writes atomic with the outbox insert, inject `IClientSessionHandle?` directly into the consumer constructor: + +```csharp +// No dependency on SlimMessageBus.Host.Outbox.MongoDb — only MongoDB.Driver types needed. +public class CreateCustomerCommandHandler( + IMongoCollection customers, + IClientSessionHandle? session, // null when no transaction is active + IMessageBus bus) : IRequestHandler +{ + public async Task OnHandle(CreateCustomerCommand request, CancellationToken ct) + { + var customer = new Customer(request.Name); + + // Both writes share the same session — committed or rolled back together. + if (session != null) + await customers.InsertOneAsync(session, customer, cancellationToken: ct); + else + await customers.InsertOneAsync(customer, cancellationToken: ct); + + // This publish goes via the outbox and is in the same transaction. + await bus.Publish(new CustomerCreatedEvent(customer.Id)); + return customer.Id; + } +} +``` + +> **Why does constructor injection work here?** +> SMB resolves the consumer from DI *after* all interceptors have executed. `MongoDbTransactionConsumerInterceptor` starts the session before the consumer is constructed, so the DI factory for `IClientSessionHandle` already finds a live session in `MongoDbSessionHolder` by the time the consumer's constructor runs. See [Consumer instance resolution order](intro.md#consumer-instance-resolution-order) for the full execution diagram. + +`session` is `null` when no transaction is active (e.g. `UseMongoDbTransaction()` is not configured, or running against a standalone `mongod`). The `null` check makes the consumer work in both cases. + +## How it works + +- On bus start, `MongoDbOutboxMigrationService` creates the outbox collection and lock collection (if they do not exist) together with the supporting indices. +- When a message is published via a producer marked with `.UseOutbox()`, the message is inserted into the outbox MongoDB collection. + - If the call happens inside a consumer that has `.UseMongoDbTransaction()` enabled, the insert participates in the active MongoDB session, ensuring atomicity with any other writes performed during that consumer invocation. +- A background poller periodically locks a batch of undelivered messages (up to `PollBatchSize`) and forwards them to the actual transport. Locking is done in two steps: + 1. Find candidate document IDs (ordered by `Timestamp`, limited to `PollBatchSize`). + 2. Atomically claim them with an `UpdateMany` that re-applies the eligibility filter to handle concurrent instances. +- When `MaintainSequence = true`, an additional global lock document (in the lock collection) ensures only one application instance processes the outbox at a time, preserving message order at the cost of throughput. +- After successful delivery each document is marked `DeliveryComplete = true`. On repeated failures the `DeliveryAttempt` counter is incremented; once it reaches `MaxDeliveryAttempts` the document is marked `DeliveryAborted = true` and skipped. + +## Collections + +By default three MongoDB collections are used: + +| Collection | Setting | Default | +| ----------------------- | ------------------------------------------ | ------------------------ | +| Outbox messages | `MongoDbSettings.CollectionName` | `smb_outbox` | +| Global lock (table-lock mode) | `MongoDbSettings.LockCollectionName` | `smb_outbox_lock` | +| Applied migrations | `MongoDbSettings.MigrationsCollectionName` | `smb_outbox_migrations` | + +The database is set via `MongoDbSettings.DatabaseName` (default: `slimmessagebus`). + +## Migration versioning + +Schema changes are tracked in the `smb_outbox_migrations` collection. Each migration step has a unique timestamp-based ID (e.g. `"20240101000000_SMB_Init"`). On startup `MongoDbOutboxMigrationService` checks whether each migration ID is present in the collection: + +- **Not present** → the action (index creation/modification) runs and the ID is recorded on success. +- **Present** → skipped. + +This gives **at-least-once** (not exactly-once) execution semantics: + +- A crash before the record is written → **retried on the next startup** (safe, all actions are idempotent). +- Two instances racing simultaneously → both may run the action, one wins the insert race, the other handles the `DuplicateKey` exception (safe for idempotent actions). + +> **Note:** MongoDB does not allow DDL operations such as `createIndex` inside multi-document transactions. Migrations are therefore intentionally **not transactional** — safety comes from idempotency, not atomicity. Only add migration steps that are safe to run more than once (i.e. index creation using `IF NOT EXISTS` semantics). Destructive one-shot operations must be applied externally with `EnableMigration = false`. + +To add a future migration, append a new `TryApplyMigration` call in the service with a new unique ID. Old migration IDs must never be reused. + +### Disabling migrations + +Set `MongoDbSettings.EnableMigration = false` to skip the entire migration step at startup. Use this when you manage schema changes externally (e.g. via a deployment pipeline) and want SMB to leave the database schema untouched. + +```csharp +.AddOutboxUsingMongoDb(opts => +{ + opts.MongoDbSettings.EnableMigration = false; +}); +``` + +## Indices + +`MongoDbOutboxMigrationService` ensures the following indices exist on startup: + +**Outbox collection (`smb_outbox`)** + +| Index fields | Purpose | +| --------------------------------------------------- | -------------------------------- | +| `delivery_complete`, `delivery_aborted`, `timestamp` | Main polling query | +| `lock_instance_id`, `lock_expires_on` | Lock-ownership queries | +| `timestamp` | Cleanup (delete-sent) ordering | + +**Lock collection (`smb_outbox_lock`)** + +| Index fields | Purpose | +| ---------------- | --------------------- | +| `lock_expires_on` | Expired-lock detection | + +## Clean up + +Sent messages older than `MessageCleanup.Age` are removed in batches of `MessageCleanup.BatchSize` on startup and then every `MessageCleanup.Interval`. + +| Property | Description | Default | +| --------- | -------------------------------------------------- | ------- | +| Enabled | `true` if sent messages are to be removed | true | +| Interval | Time between clean-up executions | 1 hour | +| Age | Minimum age of a sent message to delete | 1 hour | +| BatchSize | Number of messages to be removed in each iteration | 10 000 | + +## Important note + +Because the outbox can be processed by any application instance, all active instances must share the same message registrations and compatible serialization schema. + +A message that fails to be delivered will have its `DeliveryAborted` flag set to `true` in the outbox collection once `MaxDeliveryAttempts` is exceeded. It is safe to reset this flag to `false` manually (e.g. via `mongosh`) once the underlying issue has been resolved. diff --git a/src/Host.Plugin.Properties.xml b/src/Host.Plugin.Properties.xml index 4221e5e3..65b22d90 100644 --- a/src/Host.Plugin.Properties.xml +++ b/src/Host.Plugin.Properties.xml @@ -4,7 +4,7 @@ - 3.4.0 + 3.5.0-rc100 \ No newline at end of file diff --git a/src/Infrastructure/docker-compose.yml b/src/Infrastructure/docker-compose.yml index d1c4f4d2..fb847d1e 100644 --- a/src/Infrastructure/docker-compose.yml +++ b/src/Infrastructure/docker-compose.yml @@ -175,5 +175,19 @@ services: networks: - slim + mongodb: + container_name: slim.mongodb + image: mongo:8.0 + ports: + - 27017:27017 + healthcheck: + test: ["CMD-SHELL", "mongosh --quiet --eval 'db.runCommand({ ping: 1 }).ok' || exit 1"] + interval: 5s + timeout: 5s + retries: 10 + start_period: 10s + networks: + - slim + networks: slim: {} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/BuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/BuilderExtensions.cs new file mode 100644 index 00000000..cd990014 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/BuilderExtensions.cs @@ -0,0 +1,43 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Configuration; + +public static class BuilderExtensions +{ + internal static readonly string PropertyMongoDbTransactionEnabled = "MongoDbTransaction_Enabled"; + internal static readonly string PropertyMongoDbTransactionFilter = "MongoDbTransaction_Filter"; + + /// + /// Enables wrapping of every consumed message on this bus in a MongoDB session transaction. + /// Requires MongoDB to be running as a replica set. + /// + public static MessageBusBuilder UseMongoDbTransaction(this MessageBusBuilder builder, bool enabled = true, Func? messageTypeFilter = null) + { + SetTransactionProps(builder.Settings, enabled, messageTypeFilter); + return builder; + } + + /// + /// Enables wrapping of every consumed message on this consumer in a MongoDB session transaction. + /// Requires MongoDB to be running as a replica set. + /// + public static ConsumerBuilder UseMongoDbTransaction(this ConsumerBuilder builder, bool enabled = true, Func? messageTypeFilter = null) + { + SetTransactionProps(builder.Settings, enabled, messageTypeFilter); + return builder; + } + + /// + /// Enables wrapping of every request handler on this handler in a MongoDB session transaction. + /// Requires MongoDB to be running as a replica set. + /// + public static HandlerBuilder UseMongoDbTransaction(this HandlerBuilder builder, bool enabled = true, Func? messageTypeFilter = null) + { + SetTransactionProps(builder.Settings, enabled, messageTypeFilter); + return builder; + } + + private static void SetTransactionProps(HasProviderExtensions settings, bool enabled, Func? messageTypeFilter) + { + settings.Properties[PropertyMongoDbTransactionEnabled] = enabled; + settings.Properties[PropertyMongoDbTransactionFilter] = messageTypeFilter; + } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/MessageBusBuilderExtensions.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/MessageBusBuilderExtensions.cs new file mode 100644 index 00000000..aa59964e --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/MessageBusBuilderExtensions.cs @@ -0,0 +1,91 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Configuration; + +public static class MessageBusBuilderExtensions +{ + /// + /// Configures the outbox to use MongoDB as the backing store. + /// Requires to be registered in the DI container. + /// + public static MessageBusBuilder AddOutboxUsingMongoDb(this MessageBusBuilder mbb, Action? configure = null) + { + mbb.PostConfigurationActions.Add(services => + { + services.TryAddSingleton(sp => + { + var settings = new MongoDbOutboxSettings(); + configure?.Invoke(settings); + return settings; + }); + + services.TryAddSingleton(sp => sp.GetRequiredService().MongoDbSettings); + services.TryAddTransient(sp => sp.GetRequiredService()); + + // MongoDB collections (thread-safe singletons) + services.TryAddSingleton(sp => + { + var dbSettings = sp.GetRequiredService(); + var database = sp.GetRequiredService(); + return database.GetCollection(dbSettings.CollectionName); + }); + + services.TryAddSingleton(sp => + { + var dbSettings = sp.GetRequiredService(); + var database = sp.GetRequiredService(); + return database.GetCollection(dbSettings.LockCollectionName); + }); + + // IMongoDatabase resolved from IMongoClient + settings + services.TryAddSingleton(sp => + { + var dbSettings = sp.GetRequiredService(); + var client = sp.GetRequiredService(); + return client.GetDatabase(dbSettings.DatabaseName); + }); + + // Transaction interceptors + var settings = new[] { mbb.Settings }.Concat(mbb.Children.Values.Select(x => x.Settings)).ToList(); + foreach (var consumerMessageType in settings + .SelectMany(x => x.Consumers + .SelectMany(c => c.Invokers) + .Where(ci => ci.ParentSettings.IsEnabledForMessageType( + x, + BuilderExtensions.PropertyMongoDbTransactionEnabled, + BuilderExtensions.PropertyMongoDbTransactionFilter, + ci.MessageType))) + .Select(x => x.MessageType)) + { + var serviceType = typeof(IConsumerInterceptor<>).MakeGenericType(consumerMessageType); + var implementationType = typeof(MongoDbTransactionConsumerInterceptor<>).MakeGenericType(consumerMessageType); + services.TryAddEnumerable(ServiceDescriptor.Transient(serviceType, implementationType)); + } + + services.TryAddScoped(); + services.TryAddScoped(sp => new MongoDbTransactionService( + sp.GetRequiredService(), + sp.GetRequiredService())); + + // Expose the active session handle so consumers can inject IClientSessionHandle? + // directly into their constructor without any dependency on SMB packages. + // Because SMB now resolves the consumer *after* all interceptors have run, + // this factory fires after BeginTransaction() has set the session on the holder. + // Consumers inject IClientSessionHandle? (nullable). The value is null when no + // transaction is active, and non-null after BeginTransaction() fires (which happens + // before the consumer is constructed, thanks to the deferred resolution in core SMB). + services.TryAddScoped(sp => + sp.GetRequiredService().Session!); + + services.TryAddScoped(sp => new MongoDbOutboxMessageRepository( + sp.GetRequiredService>(), + sp.GetRequiredService(), + sp.GetRequiredService>(), + sp.GetRequiredService>(), + sp.GetRequiredService())); + services.TryAddScoped>(sp => sp.GetRequiredService()); + services.TryAddScoped(sp => sp.GetRequiredService()); + services.TryAddTransient(); + }); + + return mbb.AddOutbox(); + } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/MongoDbOutboxSettings.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/MongoDbOutboxSettings.cs new file mode 100644 index 00000000..518317fa --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Configuration/MongoDbOutboxSettings.cs @@ -0,0 +1,36 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Configuration; + +public class MongoDbOutboxSettings : OutboxSettings +{ + public MongoDbSettings MongoDbSettings { get; set; } = new(); +} + +public class MongoDbSettings +{ + /// + /// The name of the MongoDB database to store outbox messages in. + /// + public string DatabaseName { get; set; } = "slimmessagebus"; + + /// + /// The MongoDB collection name for outbox messages. + /// + public string CollectionName { get; set; } = "smb_outbox"; + + /// + /// The MongoDB collection name for the global table-level lock document (used when is true). + /// + public string LockCollectionName { get; set; } = "smb_outbox_lock"; + + /// + /// The MongoDB collection name used to track which schema migrations have been applied. + /// + public string MigrationsCollectionName { get; set; } = "smb_outbox_migrations"; + + /// + /// When true (the default), will create + /// collections and indices on bus start. Set to false when you manage schema changes + /// externally and want to prevent SMB from touching the database schema at startup. + /// + public bool EnableMigration { get; set; } = true; +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/IMongoDbMessageOutboxRepository.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/IMongoDbMessageOutboxRepository.cs new file mode 100644 index 00000000..215359da --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/IMongoDbMessageOutboxRepository.cs @@ -0,0 +1,5 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb; + +public interface IMongoDbMessageOutboxRepository : IOutboxMessageRepository, IOutboxMessageFactory +{ +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Interceptors/MongoDbTransactionConsumerInterceptor.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Interceptors/MongoDbTransactionConsumerInterceptor.cs new file mode 100644 index 00000000..f594188d --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Interceptors/MongoDbTransactionConsumerInterceptor.cs @@ -0,0 +1,33 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Interceptors; + +public interface IMongoDbTransactionConsumerInterceptor { } + +/// +/// Wraps the consumer in a MongoDB session transaction (requires a MongoDB replica set). +/// +/// The consumed message type. +public class MongoDbTransactionConsumerInterceptor( + ILogger logger, + IMongoDbTransactionService transactionService) + : IMongoDbTransactionConsumerInterceptor, IConsumerInterceptor where T : class +{ + public async Task OnHandle(T message, Func> next, IConsumerContext context) + { + logger.LogDebug("MongoDbTransaction - begin"); + await transactionService.BeginTransaction(); + object result; + try + { + result = await next(); + } + catch (Exception ex) + { + logger.LogDebug(ex, "MongoDbTransaction - rollback"); + await transactionService.RollbackTransaction(); + throw; + } + logger.LogDebug("MongoDbTransaction - commit"); + await transactionService.CommitTransaction(); + return result; + } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/MongoDbOutboxDocument.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/MongoDbOutboxDocument.cs new file mode 100644 index 00000000..56cd0e3c --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/MongoDbOutboxDocument.cs @@ -0,0 +1,85 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb; + +/// +/// Internal MongoDB document representing a stored outbox message. +/// +[BsonIgnoreExtraElements] +internal class MongoDbOutboxDocument +{ + [BsonId] + [BsonRepresentation(BsonType.String)] + public Guid Id { get; set; } + + [BsonElement("timestamp")] + [BsonDateTimeOptions(Kind = DateTimeKind.Utc)] + public DateTime Timestamp { get; set; } + + [BsonElement("bus_name")] + public string BusName { get; set; } = null!; + + [BsonElement("message_type")] + public string MessageType { get; set; } = null!; + + [BsonElement("message_payload")] + public byte[] MessagePayload { get; set; } = null!; + + /// + /// Headers serialized as JSON string (nullable when no headers present). + /// + [BsonElement("headers")] + public string? Headers { get; set; } + + [BsonElement("path")] + public string? Path { get; set; } + + [BsonElement("lock_instance_id")] + public string? LockInstanceId { get; set; } + + [BsonElement("lock_expires_on")] + [BsonDateTimeOptions(Kind = DateTimeKind.Utc)] + public DateTime LockExpiresOn { get; set; } + + [BsonElement("delivery_attempt")] + public int DeliveryAttempt { get; set; } + + [BsonElement("delivery_complete")] + public bool DeliveryComplete { get; set; } + + [BsonElement("delivery_aborted")] + public bool DeliveryAborted { get; set; } +} + +/// +/// Document tracking which schema migrations have been successfully applied. +/// +[BsonIgnoreExtraElements] +internal class MongoDbMigrationDocument +{ + /// Migration identifier, e.g. "20240101000000_SMB_Init". + [BsonId] + public string Id { get; set; } = null!; + + [BsonElement("applied_at")] + [BsonDateTimeOptions(Kind = DateTimeKind.Utc)] + public DateTime AppliedAt { get; set; } + + [BsonElement("product_version")] + public string ProductVersion { get; set; } = null!; +} + +/// +/// Document used as a global table lock to ensure sequential (single-instance) message processing. +/// +[BsonIgnoreExtraElements] +internal class MongoDbOutboxLockDocument +{ + [BsonId] + public string Id { get; set; } = "global"; + + [BsonElement("lock_instance_id")] + public string? LockInstanceId { get; set; } + + [BsonElement("lock_expires_on")] + [BsonDateTimeOptions(Kind = DateTimeKind.Utc)] + public DateTime LockExpiresOn { get; set; } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/MongoDbOutboxMessage.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/MongoDbOutboxMessage.cs new file mode 100644 index 00000000..18512b71 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/MongoDbOutboxMessage.cs @@ -0,0 +1,18 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb; + +public class MongoDbOutboxMessage : OutboxMessage +{ + public Guid Id { get; set; } + + public override string ToString() => Id.ToString(); +} + +internal class MongoDbOutboxAdminMessage : MongoDbOutboxMessage +{ + public DateTime Timestamp { get; set; } + public string? LockInstanceId { get; set; } + public DateTime LockExpiresOn { get; set; } + public int DeliveryAttempt { get; set; } + public bool DeliveryComplete { get; set; } + public bool DeliveryAborted { get; set; } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/ObjectToInferredTypesConverter.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/ObjectToInferredTypesConverter.cs new file mode 100644 index 00000000..8542e6f6 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/ObjectToInferredTypesConverter.cs @@ -0,0 +1,22 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb; + +/// +/// Converter that infers object to primitive types during JSON deserialization. +/// See https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/converters-how-to?pivots=dotnet-7-0#deserialize-inferred-types-to-object-properties +/// +internal sealed class ObjectToInferredTypesConverter : JsonConverter +{ + public override object Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + if (reader.TokenType == JsonTokenType.True) return true; + if (reader.TokenType == JsonTokenType.False) return false; + if (reader.TokenType == JsonTokenType.Number) + return reader.TryGetInt64(out var l) ? (object)l : reader.GetDouble(); + if (reader.TokenType == JsonTokenType.String) + return reader.TryGetDateTime(out var datetime) ? (object)datetime : reader.GetString()!; + return JsonDocument.ParseValue(ref reader).RootElement.Clone(); + } + + public override void Write(Utf8JsonWriter writer, object objectToWrite, JsonSerializerOptions options) => + JsonSerializer.Serialize(writer, objectToWrite, objectToWrite.GetType(), options); +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Repositories/MongoDbOutboxMessageRepository.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Repositories/MongoDbOutboxMessageRepository.cs new file mode 100644 index 00000000..2120f66b --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Repositories/MongoDbOutboxMessageRepository.cs @@ -0,0 +1,357 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Repositories; + +public class MongoDbOutboxMessageRepository : IMongoDbMessageOutboxRepository +{ + private static readonly JsonSerializerOptions _jsonOptions = new() + { + WriteIndented = false, + Converters = { new ObjectToInferredTypesConverter() } + }; + + // Sentinel value meaning "no lock held". Must be earlier than any possible 'now' + // (including FakeTimeProvider's default start of 2000-01-01) so that Lt(LockExpiresOn, now) + // evaluates to true for every newly created message. + private static readonly DateTime _defaultLockExpiresOn = new(1, 1, 1, 0, 0, 0, DateTimeKind.Utc); + + private readonly ILogger _logger; + private readonly TimeProvider _timeProvider; + private readonly IMongoCollection _collection; + private readonly IMongoCollection _lockCollection; + private readonly IMongoDbTransactionService _transactionService; + + internal MongoDbOutboxMessageRepository( + ILogger logger, + TimeProvider timeProvider, + IMongoCollection collection, + IMongoCollection lockCollection, + IMongoDbTransactionService transactionService) + { + _logger = logger; + _timeProvider = timeProvider; + _collection = collection; + _lockCollection = lockCollection; + _transactionService = transactionService; + } + + public async Task Create( + string busName, + IDictionary headers, + string path, + string messageType, + byte[] messagePayload, + CancellationToken cancellationToken) + { + var message = new MongoDbOutboxMessage + { + Id = Guid.NewGuid(), + BusName = busName, + Headers = headers, + Path = path, + MessageType = messageType, + MessagePayload = messagePayload + }; + + var doc = new MongoDbOutboxDocument + { + Id = message.Id, + Timestamp = _timeProvider.GetUtcNow().UtcDateTime, + BusName = busName, + MessageType = messageType, + MessagePayload = messagePayload, + Headers = headers != null ? JsonSerializer.Serialize(headers, _jsonOptions) : null, + Path = path, + LockInstanceId = null, + LockExpiresOn = _defaultLockExpiresOn, + DeliveryAttempt = 0, + DeliveryComplete = false, + DeliveryAborted = false + }; + + var session = _transactionService.CurrentSession; + if (session != null) + { + await _collection.InsertOneAsync(session, doc, options: null, cancellationToken); + } + else + { + await _collection.InsertOneAsync(doc, options: null, cancellationToken); + } + + return message; + } + + public async Task> LockAndSelect( + string instanceId, + int batchSize, + bool tableLock, + TimeSpan lockDuration, + CancellationToken cancellationToken) + { + var now = _timeProvider.GetUtcNow().UtcDateTime; + var lockExpiry = now.Add(lockDuration); + + if (tableLock) + { + var acquired = await TryAcquireTableLock(instanceId, now, lockExpiry, cancellationToken); + if (!acquired) + { + return Array.Empty(); + } + } + + // Build the filter for unlocked/expired messages that aren't yet delivered + var candidateFilter = Builders.Filter.And( + Builders.Filter.Eq(d => d.DeliveryComplete, false), + Builders.Filter.Eq(d => d.DeliveryAborted, false), + Builders.Filter.Or( + Builders.Filter.Eq(d => d.LockInstanceId, instanceId), + Builders.Filter.Lt(d => d.LockExpiresOn, now) + ) + ); + + // Find the IDs of the next batch (ordered by timestamp) + var candidateIds = await _collection + .Find(candidateFilter) + .Sort(Builders.Sort.Ascending(d => d.Timestamp)) + .Limit(batchSize) + .Project(Builders.Projection.Expression(d => d.Id)) + .ToListAsync(cancellationToken); + + if (candidateIds.Count == 0) + { + return Array.Empty(); + } + + // Atomically lock the candidates (re-applying the eligibility filter to handle races) + var lockFilter = Builders.Filter.And( + Builders.Filter.In(d => d.Id, candidateIds), + Builders.Filter.Eq(d => d.DeliveryComplete, false), + Builders.Filter.Eq(d => d.DeliveryAborted, false), + Builders.Filter.Or( + Builders.Filter.Eq(d => d.LockInstanceId, instanceId), + Builders.Filter.Lt(d => d.LockExpiresOn, now) + ) + ); + + await _collection.UpdateManyAsync( + lockFilter, + Builders.Update + .Set(d => d.LockInstanceId, instanceId) + .Set(d => d.LockExpiresOn, lockExpiry), + cancellationToken: cancellationToken); + + // Retrieve successfully locked documents + var lockedFilter = Builders.Filter.And( + Builders.Filter.In(d => d.Id, candidateIds), + Builders.Filter.Eq(d => d.LockInstanceId, instanceId), + Builders.Filter.Gt(d => d.LockExpiresOn, now), + Builders.Filter.Eq(d => d.DeliveryComplete, false), + Builders.Filter.Eq(d => d.DeliveryAborted, false) + ); + + var docs = await _collection + .Find(lockedFilter) + .Sort(Builders.Sort.Ascending(d => d.Timestamp)) + .ToListAsync(cancellationToken); + + return docs.Select(MapToMessage).ToList(); + } + + public async Task UpdateToSent(IReadOnlyCollection messages, CancellationToken cancellationToken) + { + if (messages.Count == 0) return; + + var ids = messages.Select(m => m.Id).ToList(); + var result = await _collection.UpdateManyAsync( + Builders.Filter.In(d => d.Id, ids), + Builders.Update + .Set(d => d.DeliveryComplete, true) + .Inc(d => d.DeliveryAttempt, 1), + cancellationToken: cancellationToken); + + if (result.ModifiedCount != messages.Count) + { + throw new MessageBusException($"The number of modified documents was {result.ModifiedCount}, but {messages.Count} was expected"); + } + } + + public async Task AbortDelivery(IReadOnlyCollection messages, CancellationToken cancellationToken) + { + if (messages.Count == 0) return; + + var ids = messages.Select(m => m.Id).ToList(); + var result = await _collection.UpdateManyAsync( + Builders.Filter.In(d => d.Id, ids), + Builders.Update + .Set(d => d.DeliveryAborted, true) + .Inc(d => d.DeliveryAttempt, 1), + cancellationToken: cancellationToken); + + if (result.ModifiedCount != messages.Count) + { + throw new MessageBusException($"The number of modified documents was {result.ModifiedCount}, but {messages.Count} was expected"); + } + } + + public async Task IncrementDeliveryAttempt(IReadOnlyCollection messages, int maxDeliveryAttempts, CancellationToken cancellationToken) + { + if (messages.Count == 0) return; + + if (maxDeliveryAttempts < 1) + { + throw new ArgumentOutOfRangeException(nameof(maxDeliveryAttempts), "Must be larger than 0."); + } + + var ids = messages.Select(m => m.Id).ToList(); + var idFilter = Builders.Filter.In(d => d.Id, ids); + + // Increment delivery_attempt for all + var incResult = await _collection.UpdateManyAsync( + idFilter, + Builders.Update.Inc(d => d.DeliveryAttempt, 1), + cancellationToken: cancellationToken); + + if (incResult.ModifiedCount != messages.Count) + { + throw new MessageBusException($"The number of modified documents was {incResult.ModifiedCount}, but {messages.Count} was expected"); + } + + // Abort those that have reached or exceeded the max attempts + await _collection.UpdateManyAsync( + Builders.Filter.And( + idFilter, + Builders.Filter.Gte(d => d.DeliveryAttempt, maxDeliveryAttempts) + ), + Builders.Update.Set(d => d.DeliveryAborted, true), + cancellationToken: cancellationToken); + } + + public async Task DeleteSent(DateTimeOffset olderThan, int batchSize, CancellationToken cancellationToken) + { + var olderThanUtc = olderThan.UtcDateTime; + + // Find IDs of completed (not aborted) messages older than the threshold + var candidateIds = await _collection + .Find(Builders.Filter.And( + Builders.Filter.Eq(d => d.DeliveryComplete, true), + Builders.Filter.Eq(d => d.DeliveryAborted, false), + Builders.Filter.Lt(d => d.Timestamp, olderThanUtc) + )) + .Sort(Builders.Sort.Ascending(d => d.Timestamp)) + .Limit(batchSize) + .Project(Builders.Projection.Expression(d => d.Id)) + .ToListAsync(cancellationToken); + + if (candidateIds.Count == 0) + { + return 0; + } + + var deleteResult = await _collection.DeleteManyAsync( + Builders.Filter.In(d => d.Id, candidateIds), + cancellationToken); + + var deleted = (int)deleteResult.DeletedCount; + if (deleted > 0) + _logger.LogInformation("Removed {MessageCount} sent messages from outbox collection", deleted); + else + _logger.LogDebug("Removed {MessageCount} sent messages from outbox collection", deleted); + return deleted; + } + + public async Task RenewLock(string instanceId, TimeSpan lockDuration, CancellationToken cancellationToken) + { + var now = _timeProvider.GetUtcNow().UtcDateTime; + var newExpiry = now.Add(lockDuration); + + var result = await _collection.UpdateManyAsync( + Builders.Filter.And( + Builders.Filter.Eq(d => d.LockInstanceId, instanceId), + Builders.Filter.Gt(d => d.LockExpiresOn, now), + Builders.Filter.Eq(d => d.DeliveryComplete, false), + Builders.Filter.Eq(d => d.DeliveryAborted, false) + ), + Builders.Update.Set(d => d.LockExpiresOn, newExpiry), + cancellationToken: cancellationToken); + + return result.ModifiedCount > 0; + } + + /// + /// Returns all messages from the collection (for test/admin use only). + /// + internal async Task> GetAllMessages(CancellationToken cancellationToken) + { + var docs = await _collection.Find(FilterDefinition.Empty) + .ToListAsync(cancellationToken); + + return docs.Select(d => new MongoDbOutboxAdminMessage + { + Id = d.Id, + BusName = d.BusName, + MessageType = d.MessageType, + MessagePayload = d.MessagePayload, + Headers = d.Headers == null ? null : JsonSerializer.Deserialize>(d.Headers, _jsonOptions), + Path = d.Path, + Timestamp = d.Timestamp, + LockInstanceId = d.LockInstanceId, + LockExpiresOn = d.LockExpiresOn, + DeliveryAttempt = d.DeliveryAttempt, + DeliveryComplete = d.DeliveryComplete, + DeliveryAborted = d.DeliveryAborted + }).ToList(); + } + + private MongoDbOutboxMessage MapToMessage(MongoDbOutboxDocument d) => new() + { + Id = d.Id, + BusName = d.BusName, + MessageType = d.MessageType, + MessagePayload = d.MessagePayload, + Headers = d.Headers == null ? null : JsonSerializer.Deserialize>(d.Headers, _jsonOptions), + Path = d.Path + }; + + private async Task TryAcquireTableLock(string instanceId, DateTime now, DateTime lockExpiry, CancellationToken cancellationToken) + { + // Try to update the existing global lock document if we own it or it has expired + var updateFilter = Builders.Filter.Or( + Builders.Filter.Eq(d => d.LockInstanceId, instanceId), + Builders.Filter.Lt(d => d.LockExpiresOn, now) + ); + + var updateResult = await _lockCollection.UpdateOneAsync( + updateFilter, + Builders.Update + .Set(d => d.LockInstanceId, instanceId) + .Set(d => d.LockExpiresOn, lockExpiry), + new UpdateOptions { IsUpsert = false }, + cancellationToken); + + if (updateResult.ModifiedCount > 0) + { + return true; + } + + // No document exists yet - try to insert one with our lock + try + { + await _lockCollection.InsertOneAsync( + new MongoDbOutboxLockDocument + { + Id = "global", + LockInstanceId = instanceId, + LockExpiresOn = lockExpiry + }, + options: null, + cancellationToken); + + return true; + } + catch (MongoWriteException ex) when (ex.WriteError.Category == ServerErrorCategory.DuplicateKey) + { + // Another instance inserted before us - they hold the lock + return false; + } + } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Services/MongoDbOutboxMigrationService.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Services/MongoDbOutboxMigrationService.cs new file mode 100644 index 00000000..de72c73f --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Services/MongoDbOutboxMigrationService.cs @@ -0,0 +1,188 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Services; + +public class MongoDbOutboxMigrationService : IOutboxMigrationService +{ + private readonly ILogger _logger; + private readonly MongoDbSettings _settings; + private readonly IMongoDatabase _database; + + public MongoDbOutboxMigrationService( + ILogger logger, + MongoDbSettings settings, + IMongoDatabase database) + { + _logger = logger; + _settings = settings; + _database = database; + } + + public async Task Migrate(CancellationToken token) + { + if (!_settings.EnableMigration) + { + _logger.LogDebug("MongoDB outbox migration is disabled (EnableMigration=false), skipping."); + return; + } + + _logger.LogInformation("Running MongoDB outbox migrations..."); + + var migrations = _database.GetCollection(_settings.MigrationsCollectionName); + + await TryApplyMigration(migrations, "20240101000000_SMB_Init", + () => InitialMigration(token), + token); + + _logger.LogInformation("MongoDB outbox migrations complete."); + } + + /// + /// Applies a named migration with at-least-once semantics. + /// + /// MongoDB does not permit DDL operations (e.g. createIndex) inside multi-document + /// transactions, so this method is intentionally NOT transactional. Safety is achieved + /// through idempotency instead: + /// + /// The migration record is written AFTER the action succeeds, so a crash mid-action + /// causes a retry on the next startup rather than a silent skip. + /// Concurrent instances racing on the same migration are safe because all supported + /// actions (index creation) are idempotent. + /// + /// + /// + /// Important: Only add migration actions that are safe to run more than once + /// (i.e. use IF NOT EXISTS semantics). Destructive one-shot operations are not + /// supported and must be applied externally with + /// set to false. + /// + /// + private async Task TryApplyMigration( + IMongoCollection migrations, + string migrationId, + Func action, + CancellationToken token) + { + var alreadyApplied = await migrations + .Find(Builders.Filter.Eq(d => d.Id, migrationId)) + .AnyAsync(token); + + if (alreadyApplied) + { + _logger.LogDebug("Migration {MigrationId} already applied, skipping.", migrationId); + return false; + } + + _logger.LogDebug("Applying migration {MigrationId}...", migrationId); + await action(); + + // Record the migration AFTER the action succeeds so that a crash mid-action + // causes a retry on the next startup rather than being silently skipped. + try + { + await migrations.InsertOneAsync( + new MongoDbMigrationDocument + { + Id = migrationId, + AppliedAt = DateTime.UtcNow, + ProductVersion = GetType().Assembly.GetName().Version?.ToString() ?? "unknown" + }, + options: null, + token); + + _logger.LogInformation("Migration {MigrationId} applied.", migrationId); + } + catch (MongoWriteException ex) when (ex.WriteError.Category == ServerErrorCategory.DuplicateKey) + { + // Another instance recorded the same migration concurrently - that is fine. + _logger.LogDebug(ex, "Migration {MigrationId} was recorded concurrently by another instance.", migrationId); + } + + return true; + } + + // ── Versioned migration steps ──────────────────────────────────────────── + + /// Initial schema: creates outbox and lock collection indices. + private async Task InitialMigration(CancellationToken token) + { + await EnsureOutboxCollectionIndexes(token); + await EnsureLockCollectionIndexes(token); + } + + // ── Index helpers ──────────────────────────────────────────────────────── + + private async Task EnsureOutboxCollectionIndexes(CancellationToken token) + { + var collection = _database.GetCollection(_settings.CollectionName); + + // Primary polling query: delivery_complete=false, delivery_aborted=false, ordered by timestamp + await EnsureIndex(collection, new CreateIndexModel( + Builders.IndexKeys + .Ascending(d => d.DeliveryComplete) + .Ascending(d => d.DeliveryAborted) + .Ascending(d => d.Timestamp), + new CreateIndexOptions + { + Name = "ix_smb_outbox_delivery_complete_aborted_timestamp", + Background = true + }), + token); + + // Lock-ownership queries (RenewLock, LockAndSelect) + await EnsureIndex(collection, new CreateIndexModel( + Builders.IndexKeys + .Ascending(d => d.LockInstanceId) + .Ascending(d => d.LockExpiresOn), + new CreateIndexOptions + { + Name = "ix_smb_outbox_lock_instance_id_expires_on", + Background = true + }), + token); + + // Cleanup ordering (DeleteSent) + await EnsureIndex(collection, new CreateIndexModel( + Builders.IndexKeys + .Ascending(d => d.Timestamp), + new CreateIndexOptions + { + Name = "ix_smb_outbox_timestamp", + Background = true + }), + token); + } + + private async Task EnsureLockCollectionIndexes(CancellationToken token) + { + var collection = _database.GetCollection(_settings.LockCollectionName); + + await EnsureIndex(collection, new CreateIndexModel( + Builders.IndexKeys.Ascending(d => d.LockExpiresOn), + new CreateIndexOptions + { + Name = "ix_smb_outbox_lock_expires_on", + Background = true + }), + token); + } + + private async Task EnsureIndex( + IMongoCollection collection, + CreateIndexModel indexModel, + CancellationToken token) + { + try + { + await collection.Indexes.CreateOneAsync(indexModel, cancellationToken: token); + } + catch (MongoCommandException ex) when ( + ex.CodeName == "IndexOptionsConflict" || + ex.CodeName == "IndexKeySpecsConflict") + { + _logger.LogDebug( + ex, + "Index {IndexName} already exists on {Collection}.", + indexModel.Options.Name, + collection.CollectionNamespace.CollectionName); + } + } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/SlimMessageBus.Host.Outbox.MongoDb.csproj b/src/SlimMessageBus.Host.Outbox.MongoDb/SlimMessageBus.Host.Outbox.MongoDb.csproj new file mode 100644 index 00000000..c8156b67 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/SlimMessageBus.Host.Outbox.MongoDb.csproj @@ -0,0 +1,25 @@ + + + + + + net10.0;net8.0 + Plugin for SlimMessageBus that adds Transactional Outbox pattern support using MongoDB + SlimMessageBus MessageBus Transactional Outbox MongoDB + enable + + + + + + + + + + + + + + + + diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/AbstractMongoDbTransactionService.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/AbstractMongoDbTransactionService.cs new file mode 100644 index 00000000..6d7b73d0 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/AbstractMongoDbTransactionService.cs @@ -0,0 +1,6 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Transactions; + +public abstract class AbstractMongoDbTransactionService : AbstractOutboxTransactionService, IMongoDbTransactionService +{ + public abstract IClientSessionHandle? CurrentSession { get; } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/IMongoDbTransactionService.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/IMongoDbTransactionService.cs new file mode 100644 index 00000000..2feeb161 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/IMongoDbTransactionService.cs @@ -0,0 +1,9 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Transactions; + +public interface IMongoDbTransactionService : IAsyncDisposable +{ + IClientSessionHandle? CurrentSession { get; } + Task BeginTransaction(); + Task CommitTransaction(); + Task RollbackTransaction(); +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/MongoDbSessionHolder.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/MongoDbSessionHolder.cs new file mode 100644 index 00000000..3fc0f18c --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/MongoDbSessionHolder.cs @@ -0,0 +1,13 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Transactions; + +/// +/// Scoped mutable holder for the active MongoDB session. +/// Set by when a transaction begins; +/// read by the factory registered for +/// Lazy<IClientSessionHandle?> so consumers can access the session +/// lazily inside their OnHandle method. +/// +internal sealed class MongoDbSessionHolder +{ + public IClientSessionHandle? Session { get; set; } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/MongoDbTransactionService.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/MongoDbTransactionService.cs new file mode 100644 index 00000000..533892e9 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Transactions/MongoDbTransactionService.cs @@ -0,0 +1,45 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Transactions; + +/// +/// Manages a MongoDB client session and transaction (requires a MongoDB replica set). +/// +internal class MongoDbTransactionService(IMongoClient mongoClient, MongoDbSessionHolder sessionHolder) + : AbstractMongoDbTransactionService +{ + private IClientSessionHandle? _session; + + public override IClientSessionHandle? CurrentSession => _session; + + protected override async Task OnBeginTransaction() + { + _session = await mongoClient.StartSessionAsync(); + _session.StartTransaction(); + sessionHolder.Session = _session; + } + + protected override async Task OnCompleteTransaction(bool transactionFailed) + { + if (_session == null) + { + return; + } + + try + { + if (transactionFailed) + { + await _session.AbortTransactionAsync(); + } + else + { + await _session.CommitTransactionAsync(); + } + } + finally + { + sessionHolder.Session = null; + _session.Dispose(); + _session = null; + } + } +} diff --git a/src/SlimMessageBus.Host.Outbox.MongoDb/Usings.cs b/src/SlimMessageBus.Host.Outbox.MongoDb/Usings.cs new file mode 100644 index 00000000..425e4548 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox.MongoDb/Usings.cs @@ -0,0 +1,23 @@ +global using System; +global using System.Collections.Generic; +global using System.Diagnostics.CodeAnalysis; +global using System.Linq; +global using System.Text.Json; +global using System.Text.Json.Serialization; +global using System.Threading; +global using System.Threading.Tasks; + +global using Microsoft.Extensions.DependencyInjection; +global using Microsoft.Extensions.DependencyInjection.Extensions; +global using Microsoft.Extensions.Logging; + +global using MongoDB.Bson; +global using MongoDB.Bson.Serialization.Attributes; +global using MongoDB.Driver; + +global using SlimMessageBus.Host.Interceptor; +global using SlimMessageBus.Host.Outbox.MongoDb.Configuration; +global using SlimMessageBus.Host.Outbox.MongoDb.Interceptors; +global using SlimMessageBus.Host.Outbox.MongoDb.Repositories; +global using SlimMessageBus.Host.Outbox.MongoDb.Services; +global using SlimMessageBus.Host.Outbox.MongoDb.Transactions; diff --git a/src/SlimMessageBus.Host.Outbox.PostgreSql/ObjectToInferredTypesConverter.cs b/src/SlimMessageBus.Host.Outbox.PostgreSql/ObjectToInferredTypesConverter.cs index b75de73f..3869d4ba 100644 --- a/src/SlimMessageBus.Host.Outbox.PostgreSql/ObjectToInferredTypesConverter.cs +++ b/src/SlimMessageBus.Host.Outbox.PostgreSql/ObjectToInferredTypesConverter.cs @@ -1,4 +1,4 @@ -namespace SlimMessageBus.Host.Outbox.PostgreSql; +namespace SlimMessageBus.Host.Outbox.PostgreSql; /// /// Converter that infers object to primitive types. See https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/converters-how-to?pivots=dotnet-7-0#deserialize-inferred-types-to-object-properties diff --git a/src/SlimMessageBus.Host.Outbox.PostgreSql/Transactions/AbstractPostgreSqlTransactionService.cs b/src/SlimMessageBus.Host.Outbox.PostgreSql/Transactions/AbstractPostgreSqlTransactionService.cs index be557b8e..723e8a31 100644 --- a/src/SlimMessageBus.Host.Outbox.PostgreSql/Transactions/AbstractPostgreSqlTransactionService.cs +++ b/src/SlimMessageBus.Host.Outbox.PostgreSql/Transactions/AbstractPostgreSqlTransactionService.cs @@ -1,83 +1,13 @@ -namespace SlimMessageBus.Host.Outbox.PostgreSql.Transactions; +namespace SlimMessageBus.Host.Outbox.PostgreSql.Transactions; -public abstract class AbstractPostgreSqlTransactionService : IPostgreSqlTransactionService +public abstract class AbstractPostgreSqlTransactionService : AbstractOutboxTransactionService, IPostgreSqlTransactionService { protected AbstractPostgreSqlTransactionService(NpgsqlConnection connection) { Connection = connection; } - private int _transactionCount; - private bool _transactionFailed; - private bool _transactionCompleted; - public NpgsqlConnection Connection { get; } public abstract NpgsqlTransaction? CurrentTransaction { get; } - - #region IAsyncDisposable - - public async ValueTask DisposeAsync() - { - await DisposeAsyncCore(); - GC.SuppressFinalize(this); - } - - protected async virtual ValueTask DisposeAsyncCore() - { - if (!_transactionCompleted && _transactionCount > 0) - { - await RollbackTransaction(); - } - } - - #endregion - - public async virtual Task BeginTransaction() - { - if (_transactionCompleted) - { - throw new MessageBusException("Transaction was completed already"); - } - - if (_transactionCount == 0) - { - // Start transaction - await OnBeginTransaction(); - _transactionFailed = false; - _transactionCompleted = false; - } - _transactionCount++; - } - - private async Task TryCompleteTransaction(bool transactionFailed = false) - { - if (_transactionCount == 0) - { - throw new MessageBusException("Transaction has not been started"); - } - - _transactionCount--; - - if (transactionFailed) - { - // Mark the transaction as failed - _transactionFailed = true; - } - - if (!_transactionCompleted && (_transactionCount == 0 || transactionFailed)) - { - _transactionCompleted = true; - _transactionCount = 0; - await OnCompleteTransaction(_transactionFailed); - } - } - - protected abstract Task OnBeginTransaction(); - - protected abstract Task OnCompleteTransaction(bool transactionFailed); - - public virtual Task CommitTransaction() => TryCompleteTransaction(transactionFailed: false); - - public virtual Task RollbackTransaction() => TryCompleteTransaction(transactionFailed: true); } diff --git a/src/SlimMessageBus.Host.Outbox.Sql/ObjectToInferredTypesConverter.cs b/src/SlimMessageBus.Host.Outbox.Sql/ObjectToInferredTypesConverter.cs index 12f92e87..a273992e 100644 --- a/src/SlimMessageBus.Host.Outbox.Sql/ObjectToInferredTypesConverter.cs +++ b/src/SlimMessageBus.Host.Outbox.Sql/ObjectToInferredTypesConverter.cs @@ -1,25 +1,25 @@ -namespace SlimMessageBus.Host.Outbox.Sql; - -using System.Text.Json; -using System.Text.Json.Serialization; - -/// -/// Converter that infers object to primitive types. See https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/converters-how-to?pivots=dotnet-7-0#deserialize-inferred-types-to-object-properties -/// -public class ObjectToInferredTypesConverter : JsonConverter -{ - public override object Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => - reader.TokenType switch - { - JsonTokenType.True => true, - JsonTokenType.False => false, - JsonTokenType.Number when reader.TryGetInt64(out var l) => l, - JsonTokenType.Number => reader.GetDouble(), - JsonTokenType.String when reader.TryGetDateTime(out var datetime) => datetime, - JsonTokenType.String => reader.GetString()!, - _ => JsonDocument.ParseValue(ref reader).RootElement.Clone() - }; - - public override void Write(Utf8JsonWriter writer, object objectToWrite, JsonSerializerOptions options) => - JsonSerializer.Serialize(writer, objectToWrite, objectToWrite.GetType(), options); -} +namespace SlimMessageBus.Host.Outbox.Sql; + +using System.Text.Json; +using System.Text.Json.Serialization; + +/// +/// Converter that infers object to primitive types. See https://learn.microsoft.com/en-us/dotnet/standard/serialization/system-text-json/converters-how-to?pivots=dotnet-7-0#deserialize-inferred-types-to-object-properties +/// +public class ObjectToInferredTypesConverter : JsonConverter +{ + public override object Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => + reader.TokenType switch + { + JsonTokenType.True => true, + JsonTokenType.False => false, + JsonTokenType.Number when reader.TryGetInt64(out var l) => l, + JsonTokenType.Number => reader.GetDouble(), + JsonTokenType.String when reader.TryGetDateTime(out var datetime) => datetime, + JsonTokenType.String => reader.GetString()!, + _ => JsonDocument.ParseValue(ref reader).RootElement.Clone() + }; + + public override void Write(Utf8JsonWriter writer, object objectToWrite, JsonSerializerOptions options) => + JsonSerializer.Serialize(writer, objectToWrite, objectToWrite.GetType(), options); +} diff --git a/src/SlimMessageBus.Host.Outbox/AbstractOutboxTransactionService.cs b/src/SlimMessageBus.Host.Outbox/AbstractOutboxTransactionService.cs new file mode 100644 index 00000000..842ed1a0 --- /dev/null +++ b/src/SlimMessageBus.Host.Outbox/AbstractOutboxTransactionService.cs @@ -0,0 +1,78 @@ +namespace SlimMessageBus.Host.Outbox; + +/// +/// Abstract base class that implements the nested-transaction state machine shared by all outbox +/// transaction-service implementations (MongoDB, PostgreSQL, SQL …). +/// Subclasses provide the DB-specific and +/// hooks. +/// +public abstract class AbstractOutboxTransactionService : IAsyncDisposable +{ + private int _transactionCount; + private bool _transactionFailed; + private bool _transactionCompleted; + + #region IAsyncDisposable + + public async ValueTask DisposeAsync() + { + await DisposeAsyncCore(); + GC.SuppressFinalize(this); + } + + protected virtual async ValueTask DisposeAsyncCore() + { + if (!_transactionCompleted && _transactionCount > 0) + { + await RollbackTransaction(); + } + } + + #endregion + + public virtual async Task BeginTransaction() + { + if (_transactionCompleted) + { + throw new MessageBusException("Transaction was completed already"); + } + + if (_transactionCount == 0) + { + await OnBeginTransaction(); + _transactionFailed = false; + _transactionCompleted = false; + } + _transactionCount++; + } + + private async Task TryCompleteTransaction(bool transactionFailed = false) + { + if (_transactionCount == 0) + { + throw new MessageBusException("Transaction has not been started"); + } + + _transactionCount--; + + if (transactionFailed) + { + _transactionFailed = true; + } + + if (!_transactionCompleted && (_transactionCount == 0 || transactionFailed)) + { + _transactionCompleted = true; + _transactionCount = 0; + await OnCompleteTransaction(_transactionFailed); + } + } + + protected abstract Task OnBeginTransaction(); + + protected abstract Task OnCompleteTransaction(bool transactionFailed); + + public virtual Task CommitTransaction() => TryCompleteTransaction(transactionFailed: false); + + public virtual Task RollbackTransaction() => TryCompleteTransaction(transactionFailed: true); +} diff --git a/src/SlimMessageBus.Host/Consumer/Context/ConsumerContext.cs b/src/SlimMessageBus.Host/Consumer/Context/ConsumerContext.cs index bdb69971..8aeed966 100644 --- a/src/SlimMessageBus.Host/Consumer/Context/ConsumerContext.cs +++ b/src/SlimMessageBus.Host/Consumer/Context/ConsumerContext.cs @@ -18,7 +18,39 @@ public IDictionary Properties set => _properties = value; } - public object Consumer { get; set; } + private object _consumer; + private Func _consumerFactory; + + /// + /// Lazily resolves the consumer instance on first access. + /// Set by via so that + /// the consumer is created after all interceptors have had a chance to run + /// (e.g. to start a database transaction before the consumer is constructed and its + /// scoped DI dependencies are resolved). + /// + public object Consumer + { + get => _consumer ??= _consumerFactory?.Invoke(); + set => _consumer = value; + } + + /// + /// Factory used to lazily resolve the consumer instance from DI. + /// Set by instead of an eager + /// GetService call so that consumer construction is deferred until the + /// interceptor pipeline has completed. + /// Assigning a new factory also clears any previously cached + /// so the factory is always invoked fresh for each consumer invocation, even when + /// the same instance is reused within the same DI scope. + /// + internal Func ConsumerFactory + { + set + { + _consumerFactory = value; + _consumer = null; // reset cache so factory is re-invoked for the new consumer type + } + } public IMessageTypeConsumerInvokerSettings ConsumerInvoker { get; set; } } diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs index e9376359..5972b5b0 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageHandler.cs @@ -96,13 +96,14 @@ public MessageHandler( } var messageBusTarget = new MessageBusProxy(MessageBus, messageScope.ServiceProvider); - object consumerInstance = null; + // Consumer is resolved lazily inside ConsumerContext.Consumer (on first access from + // ExecuteConsumer) so that all interceptors run first. This allows interceptors to + // start transactions or set up ambient state before the consumer's scoped DI + // dependencies are resolved. + ConsumerContext consumerContext = null; try { - consumerInstance = messageScope.ServiceProvider.GetService(consumerType) - ?? throw new ConfigurationMessageBusException($"Could not resolve consumer/handler type {consumerType} from the DI container. Please check that the configured type {consumerType} is registered within the DI container."); - - var consumerContext = CreateConsumerContext(messageScope, messageHeaders, consumerInvoker, transportMessage, consumerInstance, messageBusTarget, consumerContextProperties, cancellationToken); + consumerContext = CreateConsumerContext(messageScope, messageHeaders, consumerInvoker, transportMessage, consumerType, messageBusTarget, consumerContextProperties, cancellationToken); try { var response = await DoHandleInternal(message, consumerInvoker, messageType, hasResponse, responseType, messageScope, consumerContext).ConfigureAwait(false); @@ -128,9 +129,9 @@ public MessageHandler( } finally { - if (consumerInvoker.ParentSettings.IsDisposeConsumerEnabled && consumerInstance is IDisposable consumerInstanceDisposable) + if (consumerInvoker.ParentSettings.IsDisposeConsumerEnabled && consumerContext?.Consumer is IDisposable consumerInstanceDisposable) { - LogDisposingConsumer(consumerType, consumerInstance); + LogDisposingConsumer(consumerType, consumerContext.Consumer); consumerInstanceDisposable.DisposeSilently("ConsumerInstance", _logger); } } @@ -185,7 +186,7 @@ protected virtual ConsumerContext CreateConsumerContext( IReadOnlyDictionary messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, object transportMessage, - object consumerInstance, + Type consumerType, IMessageBus messageBus, IDictionary consumerContextProperties, CancellationToken cancellationToken) @@ -197,10 +198,18 @@ protected virtual ConsumerContext CreateConsumerContext( consumerContext.Headers = messageHeaders; consumerContext.Bus = messageBus; consumerContext.CancellationToken = cancellationToken; - consumerContext.Consumer = consumerInstance; consumerContext.ConsumerInvoker = consumerInvoker; consumerContext.Properties = consumerContextProperties; + // Defer consumer resolution until first access (inside ExecuteConsumer, after all + // interceptors have run). This ensures that interceptors can set up ambient state + // (e.g. database transactions) before the consumer's scoped dependencies are created. + consumerContext.ConsumerFactory = () => + messageScope.ServiceProvider.GetService(consumerType) + ?? throw new ConfigurationMessageBusException( + $"Could not resolve consumer/handler type {consumerType} from the DI container. " + + $"Please check that the configured type {consumerType} is registered within the DI container."); + return consumerContext; } diff --git a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs index f0adafa7..6fcc65b5 100644 --- a/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs +++ b/src/SlimMessageBus.Host/Consumer/MessageProcessors/MessageProcessor.cs @@ -67,12 +67,12 @@ protected override ConsumerContext CreateConsumerContext(IMessageScope messageSc IReadOnlyDictionary messageHeaders, IMessageTypeConsumerInvokerSettings consumerInvoker, object transportMessage, - object consumerInstance, + Type consumerType, IMessageBus messageBus, IDictionary consumerContextProperties, CancellationToken cancellationToken) { - var context = base.CreateConsumerContext(messageScope, messageHeaders, consumerInvoker, transportMessage, consumerInstance, messageBus, consumerContextProperties, cancellationToken); + var context = base.CreateConsumerContext(messageScope, messageHeaders, consumerInvoker, transportMessage, consumerType, messageBus, consumerContextProperties, cancellationToken); _consumerContextInitializer?.Invoke((TTransportMessage)transportMessage, context); diff --git a/src/SlimMessageBus.sln b/src/SlimMessageBus.sln index 8cfe674a..f272f779 100644 --- a/src/SlimMessageBus.sln +++ b/src/SlimMessageBus.sln @@ -296,6 +296,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Outbox. EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Outbox.PostgreSql.DbContext.Test", "Tests\SlimMessageBus.Host.Outbox.PostgreSql.DbContext.Test\SlimMessageBus.Host.Outbox.PostgreSql.DbContext.Test.csproj", "{0E3DB2A3-FF12-6B4E-6FE0-6B9E61C6D01C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Outbox.MongoDb", "SlimMessageBus.Host.Outbox.MongoDb\SlimMessageBus.Host.Outbox.MongoDb.csproj", "{A3B45901-23CD-4EF5-89AB-C234567890AB}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SlimMessageBus.Host.Outbox.MongoDb.Test", "Tests\SlimMessageBus.Host.Outbox.MongoDb.Test\SlimMessageBus.Host.Outbox.MongoDb.Test.csproj", "{B4C56012-34DE-5F06-9ABC-D345678901BC}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -952,6 +956,22 @@ Global {0E3DB2A3-FF12-6B4E-6FE0-6B9E61C6D01C}.Release|Any CPU.Build.0 = Release|Any CPU {0E3DB2A3-FF12-6B4E-6FE0-6B9E61C6D01C}.Release|x86.ActiveCfg = Release|Any CPU {0E3DB2A3-FF12-6B4E-6FE0-6B9E61C6D01C}.Release|x86.Build.0 = Release|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Debug|x86.ActiveCfg = Debug|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Debug|x86.Build.0 = Debug|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Release|Any CPU.Build.0 = Release|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Release|x86.ActiveCfg = Release|Any CPU + {A3B45901-23CD-4EF5-89AB-C234567890AB}.Release|x86.Build.0 = Release|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Debug|x86.ActiveCfg = Debug|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Debug|x86.Build.0 = Debug|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Release|Any CPU.Build.0 = Release|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Release|x86.ActiveCfg = Release|Any CPU + {B4C56012-34DE-5F06-9ABC-D345678901BC}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1050,6 +1070,8 @@ Global {DC7F9598-A395-425E-9997-5EA2D3B8D176} = {9F005B5C-A856-4351-8C0C-47A8B785C637} {C65E171C-6D9F-4C9A-931E-97ED84AF57BA} = {B7CDA0D2-98AB-42B6-A41A-9D893C1B9DCB} {0E3DB2A3-FF12-6B4E-6FE0-6B9E61C6D01C} = {9F005B5C-A856-4351-8C0C-47A8B785C637} + {A3B45901-23CD-4EF5-89AB-C234567890AB} = {B7CDA0D2-98AB-42B6-A41A-9D893C1B9DCB} + {B4C56012-34DE-5F06-9ABC-D345678901BC} = {9F005B5C-A856-4351-8C0C-47A8B785C637} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {435A0D65-610C-4B84-B1AA-2C7FBE72DB80} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/AbstractMongoDbTransactionServiceTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/AbstractMongoDbTransactionServiceTests.cs new file mode 100644 index 00000000..67d22d39 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/AbstractMongoDbTransactionServiceTests.cs @@ -0,0 +1,149 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +public class AbstractMongoDbTransactionServiceTests +{ + private readonly TestTransactionService _sut = new(); + + [Fact] + public async Task When_BeginTransaction_Given_FirstCall_Then_InvokesOnBeginTransaction() + { + await _sut.BeginTransaction(); + + _sut.BeginCount.Should().Be(1); + } + + [Fact] + public async Task When_BeginTransaction_Given_Nested_Then_OnlyCallsOnBeginTransactionOnce() + { + await _sut.BeginTransaction(); + await _sut.BeginTransaction(); + + _sut.BeginCount.Should().Be(1); + } + + [Fact] + public async Task When_BeginTransaction_Given_AlreadyCompleted_Then_ThrowsMessageBusException() + { + await _sut.BeginTransaction(); + await _sut.CommitTransaction(); + + var act = () => _sut.BeginTransaction(); + + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task When_CommitTransaction_Given_SingleTransaction_Then_CompletesWithoutFailure() + { + await _sut.BeginTransaction(); + await _sut.CommitTransaction(); + + _sut.CommitCount.Should().Be(1); + _sut.RollbackCount.Should().Be(0); + } + + [Fact] + public async Task When_CommitTransaction_Given_NestedTransactions_Then_OnlyCompletesOnLastCommit() + { + await _sut.BeginTransaction(); + await _sut.BeginTransaction(); + + await _sut.CommitTransaction(); // inner — should not complete yet + _sut.CommitCount.Should().Be(0); + + await _sut.CommitTransaction(); // outer — should now complete + _sut.CommitCount.Should().Be(1); + } + + [Fact] + public async Task When_CommitTransaction_Given_NotStarted_Then_ThrowsMessageBusException() + { + var act = () => _sut.CommitTransaction(); + + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task When_RollbackTransaction_Given_ActiveTransaction_Then_CompletesWithFailure() + { + await _sut.BeginTransaction(); + await _sut.RollbackTransaction(); + + _sut.RollbackCount.Should().Be(1); + _sut.CommitCount.Should().Be(0); + } + + [Fact] + public async Task When_RollbackTransaction_Given_NestedTransactions_Then_CompletesImmediatelyWithFailure() + { + await _sut.BeginTransaction(); + await _sut.BeginTransaction(); + + // rollback inside nested transaction should fail-fast + await _sut.RollbackTransaction(); + + _sut.RollbackCount.Should().Be(1); + _sut.CommitCount.Should().Be(0); + } + + [Fact] + public async Task When_RollbackTransaction_Given_NotStarted_Then_ThrowsMessageBusException() + { + var act = () => _sut.RollbackTransaction(); + + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task When_DisposeAsync_Given_TransactionNotCompleted_Then_RollsBack() + { + await _sut.BeginTransaction(); + + await _sut.DisposeAsync(); + + _sut.RollbackCount.Should().Be(1); + } + + [Fact] + public async Task When_DisposeAsync_Given_TransactionAlreadyCompleted_Then_DoesNotRollBack() + { + await _sut.BeginTransaction(); + await _sut.CommitTransaction(); + + await _sut.DisposeAsync(); + + _sut.RollbackCount.Should().Be(0); + } + + [Fact] + public async Task When_DisposeAsync_Given_NoTransactionStarted_Then_DoesNotRollBack() + { + await _sut.DisposeAsync(); + + _sut.RollbackCount.Should().Be(0); + } + + private sealed class TestTransactionService : AbstractMongoDbTransactionService + { + public int BeginCount { get; private set; } + public int CommitCount { get; private set; } + public int RollbackCount { get; private set; } + + public override IClientSessionHandle? CurrentSession => null; + + protected override Task OnBeginTransaction() + { + BeginCount++; + return Task.CompletedTask; + } + + protected override Task OnCompleteTransaction(bool transactionFailed) + { + if (transactionFailed) + RollbackCount++; + else + CommitCount++; + return Task.CompletedTask; + } + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/BaseMongoDbOutboxRepositoryTest.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/BaseMongoDbOutboxRepositoryTest.cs new file mode 100644 index 00000000..0e5399c0 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/BaseMongoDbOutboxRepositoryTest.cs @@ -0,0 +1,128 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +[Collection(nameof(MongoDbCollection))] +public class BaseMongoDbOutboxRepositoryTest : IAsyncLifetime +{ + protected readonly Fixture _fixture = new(); + + private readonly MongoDbFixture _mongoDbFixture; + private IMongoClient _client = null!; + private IMongoDatabase _database = null!; + + protected MongoDbOutboxMessageRepository _target = null!; + protected MongoDbOutboxMigrationService _migrationService = null!; + protected MongoDbOutboxSettings _settings = null!; + protected FakeTimeProvider _currentTimeProvider = null!; + + protected BaseMongoDbOutboxRepositoryTest(MongoDbFixture mongoDbFixture) + { + _mongoDbFixture = mongoDbFixture; + } + + public async Task InitializeAsync() + { + // Each test uses its own database to achieve isolation + var dbName = Debugger.IsAttached ? "smb_outbox_test" : $"smb_outbox_{Guid.NewGuid():N}"; + + _settings = new MongoDbOutboxSettings(); + _settings.MongoDbSettings.DatabaseName = dbName; + + _client = _mongoDbFixture.CreateClient(); + _database = _client.GetDatabase(dbName); + + _currentTimeProvider = new FakeTimeProvider(); + + var mongoDbSettings = _settings.MongoDbSettings; + var outboxCollection = _database.GetCollection(mongoDbSettings.CollectionName); + var lockCollection = _database.GetCollection(mongoDbSettings.LockCollectionName); + + var transactionService = new NullMongoDbTransactionService(); + + _target = new MongoDbOutboxMessageRepository( + NullLogger.Instance, + _currentTimeProvider, + outboxCollection, + lockCollection, + transactionService); + + _migrationService = new MongoDbOutboxMigrationService( + NullLogger.Instance, + mongoDbSettings, + _database); + + await _migrationService.Migrate(CancellationToken.None); + } + + public async Task DisposeAsync() + { + // Drop the test database to clean up + await _client.DropDatabaseAsync(_settings.MongoDbSettings.DatabaseName); + _client.Dispose(); + } + + internal async Task> SeedOutbox( + int count, + Action? action = null, + CancellationToken cancellationToken = default) + { + var messages = CreateOutboxMessages(count); + var results = new List(count); + + for (var i = 0; i < messages.Count; i++) + { + var message = messages[i]; + action?.Invoke(i, message); + results.Add((MongoDbOutboxMessage)await _target.Create( + message.BusName, + message.Headers, + message.Path, + message.MessageType, + message.MessagePayload, + cancellationToken)); + } + + return results; + } + + internal IReadOnlyList CreateOutboxMessages(int count) + { + return Enumerable + .Range(0, count) + .Select(_ => + { + var samplePayload = new { Key = _fixture.Create(), Number = _fixture.Create() }; + var jsonPayload = JsonSerializer.SerializeToUtf8Bytes(samplePayload); + + var headers = new Dictionary + { + { "Header1", _fixture.Create() }, + { "Header2", _fixture.Create() }, + { "Header3", _fixture.Create() } + }; + + _fixture.Customize(om => om + .With(x => x.MessagePayload, jsonPayload) + .With(x => x.Headers, (IDictionary)headers) + .With(x => x.LockExpiresOn, DateTime.MinValue) + .With(x => x.LockInstanceId, (string?)null) + .With(x => x.DeliveryAborted, false) + .With(x => x.DeliveryAttempt, 0) + .With(x => x.DeliveryComplete, false)); + + return _fixture.Create(); + }) + .ToList(); + } + + /// + /// A no-op transaction service for tests that don't need transactions. + /// + private sealed class NullMongoDbTransactionService : IMongoDbTransactionService + { + public IClientSessionHandle? CurrentSession => null; + public Task BeginTransaction() => Task.CompletedTask; + public Task CommitTransaction() => Task.CompletedTask; + public Task RollbackTransaction() => Task.CompletedTask; + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/BuilderExtensionsTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/BuilderExtensionsTests.cs new file mode 100644 index 00000000..39a05536 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/BuilderExtensionsTests.cs @@ -0,0 +1,97 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +using SlimMessageBus.Host; +using MongoDbBuilderExt = SlimMessageBus.Host.Outbox.MongoDb.Configuration.BuilderExtensions; + +public class BuilderExtensionsTests +{ + [Fact] + public void When_UseMongoDbTransaction_Given_MessageBusBuilderAndEnabled_Then_SetsEnabledProperty() + { + var builder = MessageBusBuilder.Create(); + + builder.UseMongoDbTransaction(enabled: true); + + builder.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionEnabled].Should().Be(true); + builder.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionFilter].Should().BeNull(); + } + + [Fact] + public void When_UseMongoDbTransaction_Given_MessageBusBuilderAndDisabled_Then_SetsEnabledFalse() + { + var builder = MessageBusBuilder.Create(); + + builder.UseMongoDbTransaction(enabled: false); + + builder.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionEnabled].Should().Be(false); + } + + [Fact] + public void When_UseMongoDbTransaction_Given_MessageBusBuilderAndMessageTypeFilter_Then_SetsFilterProperty() + { + var builder = MessageBusBuilder.Create(); + Func filter = t => t == typeof(string); + + builder.UseMongoDbTransaction(messageTypeFilter: filter); + + builder.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionFilter].Should().BeSameAs(filter); + } + + [Fact] + public void When_UseMongoDbTransaction_Given_MessageBusBuilder_Then_ReturnsSameBuilder() + { + var builder = MessageBusBuilder.Create(); + + var result = builder.UseMongoDbTransaction(); + + result.Should().BeSameAs(builder); + } + + [Fact] + public void When_UseMongoDbTransaction_Given_ConsumerBuilder_Then_SetsEnabledPropertyOnBusSettings() + { + // ConsumerBuilder.Settings returns the parent bus settings (not consumer-specific settings), + // so the flag ends up on the bus-level Properties dictionary. + var consumer = new ConsumerBuilder(new MessageBusSettings()); + + consumer.UseMongoDbTransaction(enabled: true); + + consumer.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionEnabled].Should().Be(true); + consumer.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionFilter].Should().BeNull(); + } + + [Fact] + public void When_UseMongoDbTransaction_Given_ConsumerBuilder_Then_ReturnsSameBuilder() + { + var consumer = new ConsumerBuilder(new MessageBusSettings()); + + var result = consumer.UseMongoDbTransaction(); + + result.Should().BeSameAs(consumer); + } + + [Fact] + public void When_UseMongoDbTransaction_Given_HandlerBuilder_Then_SetsEnabledPropertyOnBusSettings() + { + var handler = new HandlerBuilder(new MessageBusSettings()); + + handler.UseMongoDbTransaction(enabled: true); + + handler.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionEnabled].Should().Be(true); + handler.Settings.Properties[MongoDbBuilderExt.PropertyMongoDbTransactionFilter].Should().BeNull(); + } + + [Fact] + public void When_UseMongoDbTransaction_Given_HandlerBuilder_Then_ReturnsSameBuilder() + { + var handler = new HandlerBuilder(new MessageBusSettings()); + + var result = handler.UseMongoDbTransaction(); + + result.Should().BeSameAs(handler); + } + + public record SampleMessage; + public record SampleRequest; + public record SampleResponse; +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MessageBusBuilderExtensionsRegistrationTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MessageBusBuilderExtensionsRegistrationTests.cs new file mode 100644 index 00000000..7f5bd125 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MessageBusBuilderExtensionsRegistrationTests.cs @@ -0,0 +1,121 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Driver; +using SlimMessageBus.Host.Interceptor; +using SlimMessageBus.Host.Outbox.MongoDb.Interceptors; + +/// +/// Exercises the DI registration lambdas inside +/// by executing the relevant entry and +/// resolving each registered service from the built container. +/// +public class MessageBusBuilderExtensionsRegistrationTests +{ + private static ServiceProvider BuildProvider(MessageBusBuilder mbb, int actionIndex) + { + var mongoClientMock = new Mock(); + var mongoDatabaseMock = new Mock(); + var outboxCollMock = new Mock>(); + var lockCollMock = new Mock>(); + + mongoClientMock + .Setup(x => x.GetDatabase(It.IsAny(), It.IsAny())) + .Returns(mongoDatabaseMock.Object); + + mongoDatabaseMock + .Setup(x => x.GetCollection(It.IsAny(), It.IsAny())) + .Returns(outboxCollMock.Object); + + mongoDatabaseMock + .Setup(x => x.GetCollection(It.IsAny(), It.IsAny())) + .Returns(lockCollMock.Object); + + var services = new ServiceCollection(); + services.AddSingleton(mongoClientMock.Object); + services.AddSingleton(TimeProvider.System); + services.AddLogging(); + + // Execute only the AddOutboxUsingMongoDb PostConfigurationAction (not AddOutbox ones) + mbb.PostConfigurationActions[actionIndex](services); + + return services.BuildServiceProvider(); + } + + [Fact] + public async Task When_AddOutboxUsingMongoDb_Given_NoConfiguration_Then_DefaultServicesAreResolvable() + { + // arrange + var mbb = MessageBusBuilder.Create(); + int idx = mbb.PostConfigurationActions.Count; + mbb.AddOutboxUsingMongoDb(); + + await using var sp = BuildProvider(mbb, idx); + await using var scope = sp.CreateAsyncScope(); + var scoped = scope.ServiceProvider; + + // assert — singleton services + sp.GetRequiredService().Should().NotBeNull(); + sp.GetRequiredService().Should().NotBeNull(); + sp.GetRequiredService().Should().NotBeNull(); + sp.GetRequiredService>().Should().NotBeNull(); + sp.GetRequiredService>().Should().NotBeNull(); + + // assert — scoped services + scoped.GetRequiredService().Should().NotBeNull(); + scoped.GetRequiredService().Should().NotBeNull(); + scoped.GetRequiredService().Should().NotBeNull(); + scoped.GetRequiredService>().Should().NotBeNull(); + + // assert — transient service + sp.GetRequiredService().Should().BeOfType(); + } + + [Fact] + public async Task When_AddOutboxUsingMongoDb_Given_ConfigureCallback_Then_CustomSettingsApplied() + { + // arrange + var mbb = MessageBusBuilder.Create(); + int idx = mbb.PostConfigurationActions.Count; + mbb.AddOutboxUsingMongoDb(s => + { + s.MongoDbSettings.DatabaseName = "custom-db"; + s.MongoDbSettings.CollectionName = "custom-outbox"; + }); + + await using var sp = BuildProvider(mbb, idx); + + // assert + var settings = sp.GetRequiredService(); + settings.DatabaseName.Should().Be("custom-db"); + settings.CollectionName.Should().Be("custom-outbox"); + } + + [Fact] + public async Task When_AddOutboxUsingMongoDb_Given_ConsumerWithMongoDbTransactionEnabled_Then_InterceptorRegistered() + { + // arrange + var mbb = MessageBusBuilder.Create(); + mbb.Consume(x => x + .Path("test-topic") + .WithConsumer() + .UseMongoDbTransaction(enabled: true)); + + int idx = mbb.PostConfigurationActions.Count; + mbb.AddOutboxUsingMongoDb(); + + await using var sp = BuildProvider(mbb, idx); + + // assert — the MongoDbTransaction interceptor for SampleMessage should be registered + var interceptors = sp.GetServices>().ToList(); + interceptors.Should().ContainSingle(i => i is MongoDbTransactionConsumerInterceptor); + } + + // ── sample types ───────────────────────────────────────────────────────── + public record SampleMessage; + + public class SampleConsumer : IConsumer + { + public Task OnHandle(SampleMessage message, CancellationToken cancellationToken) => Task.CompletedTask; + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbCollection.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbCollection.cs new file mode 100644 index 00000000..239a7f81 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbCollection.cs @@ -0,0 +1,6 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +[CollectionDefinition(nameof(MongoDbCollection))] +public class MongoDbCollection : ICollectionFixture +{ +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbFixture.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbFixture.cs new file mode 100644 index 00000000..aee571f8 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbFixture.cs @@ -0,0 +1,49 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +using System.Text.RegularExpressions; + +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Configurations; + +using Testcontainers.MongoDb; + +public partial class MongoDbFixture : IAsyncLifetime +{ + [GeneratedRegex("waiting for connections", RegexOptions.IgnoreCase)] + private static partial Regex WaitingForConnectionsRegex(); + + private readonly MongoDbContainer _mongoDbContainer; + + public MongoDbFixture() + { + // mongo:8.0 uses structured JSON logging by default. The built-in wait + // strategy looks for "Waiting for connections" as a substring which is + // still present in the JSON log output ("msg":"Waiting for connections"). + // Use a case-insensitive regex to handle both log formats. + var builder = new MongoDbBuilder("mongo:8.0") + .WithWaitStrategy(Wait.ForUnixContainer() + .UntilMessageIsLogged(WaitingForConnectionsRegex())) + .WithReuse(true); + + if (Debugger.IsAttached) + { + builder = builder.WithPortBinding(27018, 27017); + } + + _mongoDbContainer = builder.Build(); + } + + public string GetConnectionString() => _mongoDbContainer.GetConnectionString(); + + public IMongoClient CreateClient() => new MongoClient(GetConnectionString()); + + public async Task InitializeAsync() + { + await _mongoDbContainer.StartAsync(); + } + + async Task IAsyncLifetime.DisposeAsync() + { + await _mongoDbContainer.DisposeAsync(); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxMessageRepositoryTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxMessageRepositoryTests.cs new file mode 100644 index 00000000..057eb237 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxMessageRepositoryTests.cs @@ -0,0 +1,465 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +using MongoDB.Driver; +using MongoDB.Driver.Core.Clusters; +using MongoDB.Driver.Core.Connections; +using MongoDB.Driver.Core.Servers; + +public class MongoDbOutboxMessageRepositoryTests +{ + private readonly Mock> _collectionMock = new(); + private readonly Mock> _lockCollectionMock = new(); + private readonly Mock _transactionServiceMock = new(); + private readonly FakeTimeProvider _timeProvider = new(new DateTimeOffset(2024, 1, 1, 0, 0, 0, TimeSpan.Zero)); + private readonly MongoDbOutboxMessageRepository _sut; + + public MongoDbOutboxMessageRepositoryTests() + { + _transactionServiceMock.SetupGet(x => x.CurrentSession).Returns((IClientSessionHandle?)null); + + _sut = new MongoDbOutboxMessageRepository( + NullLogger.Instance, + _timeProvider, + _collectionMock.Object, + _lockCollectionMock.Object, + _transactionServiceMock.Object); + } + + // ── helpers ────────────────────────────────────────────────────────────── + + private static Mock> CreateCursor(IReadOnlyList items) + { + var cursor = new Mock>(); + if (items.Count > 0) + { + cursor.SetupSequence(x => x.MoveNextAsync(It.IsAny())) + .ReturnsAsync(true) + .ReturnsAsync(false); + cursor.Setup(x => x.Current).Returns(items.ToList()); + } + else + { + cursor.Setup(x => x.MoveNextAsync(It.IsAny())).ReturnsAsync(false); + } + cursor.Setup(x => x.MoveNext(It.IsAny())).Returns(false); + return cursor; + } + + private static MongoDbOutboxDocument MakeDocument(Guid? id = null) => new() + { + Id = id ?? Guid.NewGuid(), + BusName = "bus1", + MessageType = "TestMessage", + MessagePayload = new byte[] { 1, 2, 3 }, + Headers = """{"h1":"v1"}""", + Path = "path1", + LockInstanceId = null, + LockExpiresOn = DateTime.MinValue, + DeliveryAttempt = 0, + DeliveryComplete = false, + DeliveryAborted = false, + Timestamp = DateTime.UtcNow + }; + + private static MongoDbOutboxMessage MakeMessage(Guid? id = null) => new() { Id = id ?? Guid.NewGuid() }; + + // ── Create ──────────────────────────────────────────────────────────────── + + [Fact] + public async Task When_Create_Given_NoActiveSession_Then_InsertsDocumentWithoutSession() + { + // arrange + _collectionMock + .Setup(x => x.InsertOneAsync( + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // act + var result = await _sut.Create("bus1", new Dictionary { ["k"] = "v" }, "path1", "MsgType", new byte[] { 1 }, CancellationToken.None); + + // assert + var msg = result.Should().BeOfType().Subject; + msg.BusName.Should().Be("bus1"); + msg.Path.Should().Be("path1"); + msg.MessageType.Should().Be("MsgType"); + _collectionMock.Verify(x => x.InsertOneAsync(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + [Fact] + public async Task When_Create_Given_ActiveSession_Then_InsertsDocumentWithSession() + { + // arrange + var sessionMock = new Mock(); + _transactionServiceMock.SetupGet(x => x.CurrentSession).Returns(sessionMock.Object); + + _collectionMock + .Setup(x => x.InsertOneAsync( + sessionMock.Object, + It.IsAny(), + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // act + await _sut.Create("bus1", new Dictionary(), "path1", "MsgType", new byte[] { 1 }, CancellationToken.None); + + // assert + _collectionMock.Verify(x => x.InsertOneAsync(sessionMock.Object, It.IsAny(), It.IsAny(), It.IsAny()), Times.Once); + } + + // ── UpdateToSent ────────────────────────────────────────────────────────── + + [Fact] + public async Task When_UpdateToSent_Given_EmptyList_Then_ReturnsImmediatelyWithoutDbCall() + { + await _sut.UpdateToSent([], CancellationToken.None); + + _collectionMock.VerifyNoOtherCalls(); + } + + [Fact] + public async Task When_UpdateToSent_Given_Messages_Then_UpdatesAllToComplete() + { + // arrange + var messages = new List { MakeMessage(), MakeMessage() }; + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(2, 2, null)); + + // act + await _sut.UpdateToSent(messages, CancellationToken.None); + + // assert + _collectionMock.Verify(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny()), Times.Once); + } + + [Fact] + public async Task When_UpdateToSent_Given_ModifiedCountMismatch_Then_ThrowsMessageBusException() + { + // arrange + var messages = new List { MakeMessage(), MakeMessage() }; + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(0, 0, null)); + + // act / assert + await ((Func)(() => _sut.UpdateToSent(messages, CancellationToken.None))) + .Should().ThrowAsync(); + } + + // ── AbortDelivery ───────────────────────────────────────────────────────── + + [Fact] + public async Task When_AbortDelivery_Given_EmptyList_Then_ReturnsImmediatelyWithoutDbCall() + { + await _sut.AbortDelivery([], CancellationToken.None); + + _collectionMock.VerifyNoOtherCalls(); + } + + [Fact] + public async Task When_AbortDelivery_Given_Messages_Then_MarksMessagesAsAborted() + { + // arrange + var messages = new List { MakeMessage(), MakeMessage() }; + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(2, 2, null)); + + // act + await _sut.AbortDelivery(messages, CancellationToken.None); + + // assert + _collectionMock.Verify(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny()), Times.Once); + } + + [Fact] + public async Task When_AbortDelivery_Given_ModifiedCountMismatch_Then_ThrowsMessageBusException() + { + // arrange + var messages = new List { MakeMessage() }; + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(0, 0, null)); + + // act / assert + await ((Func)(() => _sut.AbortDelivery(messages, CancellationToken.None))) + .Should().ThrowAsync(); + } + + // ── IncrementDeliveryAttempt ────────────────────────────────────────────── + + [Fact] + public async Task When_IncrementDeliveryAttempt_Given_EmptyList_Then_ReturnsImmediatelyWithoutDbCall() + { + await _sut.IncrementDeliveryAttempt([], maxDeliveryAttempts: 3, CancellationToken.None); + + _collectionMock.VerifyNoOtherCalls(); + } + + [Fact] + public async Task When_IncrementDeliveryAttempt_Given_InvalidMaxAttempts_Then_ThrowsArgumentOutOfRangeException() + { + var messages = new List { MakeMessage() }; + + await ((Func)(() => _sut.IncrementDeliveryAttempt(messages, maxDeliveryAttempts: 0, CancellationToken.None))) + .Should().ThrowAsync(); + } + + [Fact] + public async Task When_IncrementDeliveryAttempt_Given_MessagesWithinMaxAttempts_Then_IncrementsAttemptForAll() + { + // arrange + var messages = new List { MakeMessage(), MakeMessage() }; + + _collectionMock + .SetupSequence(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(2, 2, null)) // inc attempt + .ReturnsAsync(new UpdateResult.Acknowledged(0, 0, null)); // abort (none exceeded max) + + // act + await _sut.IncrementDeliveryAttempt(messages, maxDeliveryAttempts: 5, CancellationToken.None); + + // assert: two UpdateManyAsync calls + _collectionMock.Verify(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny()), Times.Exactly(2)); + } + + [Fact] + public async Task When_IncrementDeliveryAttempt_Given_IncModifiedCountMismatch_Then_ThrowsMessageBusException() + { + // arrange + var messages = new List { MakeMessage(), MakeMessage() }; + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(0, 0, null)); + + // act / assert + await ((Func)(() => _sut.IncrementDeliveryAttempt(messages, maxDeliveryAttempts: 3, CancellationToken.None))) + .Should().ThrowAsync(); + } + + // ── RenewLock ───────────────────────────────────────────────────────────── + + [Fact] + public async Task When_RenewLock_Given_LockedItemsExist_Then_ReturnsTrue() + { + // arrange + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(3, 3, null)); + + // act + var result = await _sut.RenewLock("instance1", TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + result.Should().BeTrue(); + } + + [Fact] + public async Task When_RenewLock_Given_NoLockedItems_Then_ReturnsFalse() + { + // arrange + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(0, 0, null)); + + // act + var result = await _sut.RenewLock("instance1", TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + result.Should().BeFalse(); + } + + // ── DeleteSent ──────────────────────────────────────────────────────────── + + [Fact] + public async Task When_DeleteSent_Given_NoCandidates_Then_ReturnsZeroWithoutDeletion() + { + // arrange — FindAsync returns empty + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([]).Object); + + // act + var deleted = await _sut.DeleteSent(_timeProvider.GetUtcNow(), batchSize: 10, CancellationToken.None); + + // assert + deleted.Should().Be(0); + _collectionMock.Verify(x => x.DeleteManyAsync(It.IsAny>(), It.IsAny()), Times.Never); + } + + [Fact] + public async Task When_DeleteSent_Given_CandidatesExist_Then_DeletesThemAndReturnsCount() + { + // arrange + var ids = new[] { Guid.NewGuid(), Guid.NewGuid() }; + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor(ids).Object); + + _collectionMock + .Setup(x => x.DeleteManyAsync(It.IsAny>(), It.IsAny())) + .ReturnsAsync(new DeleteResult.Acknowledged(2)); + + // act + var deleted = await _sut.DeleteSent(_timeProvider.GetUtcNow(), batchSize: 10, CancellationToken.None); + + // assert + deleted.Should().Be(2); + _collectionMock.Verify(x => x.DeleteManyAsync(It.IsAny>(), It.IsAny()), Times.Once); + } + + // ── LockAndSelect ───────────────────────────────────────────────────────── + + [Fact] + public async Task When_LockAndSelect_Given_NoTableLockAndNoCandidates_Then_ReturnsEmpty() + { + // arrange — FindAsync returns empty + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([]).Object); + + // act + var result = await _sut.LockAndSelect("inst1", batchSize: 5, tableLock: false, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + result.Should().BeEmpty(); + } + + [Fact] + public async Task When_LockAndSelect_Given_NoTableLockAndCandidatesExist_Then_ReturnsLockedMessages() + { + // arrange + var candidateId = Guid.NewGuid(); + var doc = MakeDocument(candidateId); + + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([candidateId]).Object); + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(1, 1, null)); + + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([doc]).Object); + + // act + var result = await _sut.LockAndSelect("inst1", batchSize: 5, tableLock: false, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + result.Should().HaveCount(1); + result.Single().Id.Should().Be(candidateId); + result.Single().BusName.Should().Be(doc.BusName); + result.Single().Path.Should().Be(doc.Path); + } + + [Fact] + public async Task When_LockAndSelect_Given_TableLockAcquiredViaUpdate_Then_ProceedsWithLockAndSelect() + { + // arrange — UpdateOneAsync on lockCollection returns modified=1 (lock acquired) + _lockCollectionMock + .Setup(x => x.UpdateOneAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(1, 1, null)); + + var candidateId = Guid.NewGuid(); + var doc = MakeDocument(candidateId); + + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([candidateId]).Object); + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(1, 1, null)); + + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([doc]).Object); + + // act + var result = await _sut.LockAndSelect("inst1", batchSize: 5, tableLock: true, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + result.Should().HaveCount(1); + } + + [Fact] + public async Task When_LockAndSelect_Given_TableLockAcquiredViaInsert_Then_ProceedsWithLockAndSelect() + { + // arrange — UpdateOneAsync returns modified=0 (no existing lock doc), InsertOneAsync succeeds (we get the lock) + _lockCollectionMock + .Setup(x => x.UpdateOneAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(0, 0, null)); + + _lockCollectionMock + .Setup(x => x.InsertOneAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + var candidateId = Guid.NewGuid(); + var doc = MakeDocument(candidateId); + + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([candidateId]).Object); + + _collectionMock + .Setup(x => x.UpdateManyAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(1, 1, null)); + + _collectionMock + .Setup(x => x.FindAsync(It.IsAny>(), It.IsAny>(), It.IsAny())) + .ReturnsAsync(CreateCursor([doc]).Object); + + // act + var result = await _sut.LockAndSelect("inst1", batchSize: 5, tableLock: true, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + result.Should().HaveCount(1); + } + + [Fact] + public async Task When_LockAndSelect_Given_TableLockNotAcquired_Then_ReturnsEmpty() + { + // arrange — UpdateOneAsync returns modified=0, InsertOneAsync throws DuplicateKey (another instance holds the lock) + _lockCollectionMock + .Setup(x => x.UpdateOneAsync(It.IsAny>(), It.IsAny>(), It.IsAny(), It.IsAny())) + .ReturnsAsync(new UpdateResult.Acknowledged(0, 0, null)); + + _lockCollectionMock + .Setup(x => x.InsertOneAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .ThrowsAsync(CreateDuplicateKeyException()); + + // act + var result = await _sut.LockAndSelect("inst1", batchSize: 5, tableLock: true, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + result.Should().BeEmpty(); + } + + /// + /// Creates a that looks like a duplicate-key error, + /// used to simulate a concurrent lock-document insert collision. + /// WriteError and its constructor are internal, so we use reflection. + /// + private static MongoWriteException CreateDuplicateKeyException() + { + // WriteError(ServerErrorCategory category, int code, string message, BsonDocument details) + var writeErrorCtor = typeof(WriteError).GetConstructors( + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)[0]; + + var writeError = (WriteError)writeErrorCtor.Invoke([ServerErrorCategory.DuplicateKey, 11000, "E11000 duplicate key", null]); + + // MongoWriteException(ConnectionId connectionId, WriteError writeError, WriteConcernError writeConcernError, Exception innerException) + var connId = new ConnectionId(new ServerId(new ClusterId(), new System.Net.DnsEndPoint("localhost", 27017))); + return new MongoWriteException(connId, writeError, null, null); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxMigrationServiceTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxMigrationServiceTests.cs new file mode 100644 index 00000000..3f13f1d0 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxMigrationServiceTests.cs @@ -0,0 +1,84 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +public class MongoDbOutboxMigrationServiceTests +{ + /// + /// Unit tests that do not require a running MongoDB instance. + /// + public class Unit + { + [Fact] + public async Task When_Migrate_Given_MigrationDisabled_Then_DoesNotTouchDatabase() + { + // arrange + var databaseMock = new Mock(); + var settings = new MongoDbSettings { EnableMigration = false }; + var sut = new MongoDbOutboxMigrationService( + NullLogger.Instance, + settings, + databaseMock.Object); + + // act + await sut.Migrate(CancellationToken.None); + + // assert — the database should never be touched + databaseMock.VerifyNoOtherCalls(); + } + } + + /// + /// Integration tests that require a running MongoDB instance. + /// + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + [Collection(nameof(MongoDbCollection))] + public class Integration(MongoDbFixture mongoDbFixture) : IAsyncLifetime + { + private IMongoClient _client = null!; + private IMongoDatabase _database = null!; + private MongoDbOutboxMigrationService _sut = null!; + private MongoDbSettings _dbSettings = null!; + + public async Task InitializeAsync() + { + var dbName = $"smb_migration_test_{Guid.NewGuid():N}"; + _dbSettings = new MongoDbSettings { DatabaseName = dbName }; + _client = mongoDbFixture.CreateClient(); + _database = _client.GetDatabase(dbName); + _sut = new MongoDbOutboxMigrationService( + NullLogger.Instance, + _dbSettings, + _database); + + await _sut.Migrate(CancellationToken.None); + } + + public async Task DisposeAsync() + { + await _client.DropDatabaseAsync(_dbSettings.DatabaseName); + _client.Dispose(); + } + + [Fact] + public async Task When_Migrate_Given_AlreadyApplied_Then_IsIdempotentAndDoesNotThrow() + { + // act — second call should detect already-applied migration and skip it + var act = () => _sut.Migrate(CancellationToken.None); + + // assert + await act.Should().NotThrowAsync(); + } + + [Fact] + public async Task When_Migrate_Given_AlreadyApplied_Then_IndexesStillExist() + { + // act — run again + await _sut.Migrate(CancellationToken.None); + + // assert — outbox collection indexes are still present + var outboxCollection = _database.GetCollection(_dbSettings.CollectionName); + var indexes = await outboxCollection.Indexes.List().ToListAsync(); + indexes.Count.Should().BeGreaterThan(1); // at least the default _id index + our custom ones + } + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxRepositoryTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxRepositoryTests.cs new file mode 100644 index 00000000..0524d094 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbOutboxRepositoryTests.cs @@ -0,0 +1,310 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +[Trait("Category", "Integration")] +[Trait("Transport", "Outbox.MongoDb")] +public static class MongoDbOutboxRepositoryTests +{ + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + public class SaveTests(MongoDbFixture mongoDbFixture) : BaseMongoDbOutboxRepositoryTest(mongoDbFixture) + { + [Fact] + public async Task SavedMessage_IsPersisted() + { + // arrange + var message = CreateOutboxMessages(1).Single(); + + // act + message.Id = ((MongoDbOutboxMessage)await _target.Create( + message.BusName, message.Headers, message.Path, message.MessageType, message.MessagePayload, CancellationToken.None)).Id; + + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Count.Should().Be(1); + var actual = messages.Single(); + actual.Id.Should().Be(message.Id); + actual.BusName.Should().Be(message.BusName); + actual.Headers.Should().BeEquivalentTo(message.Headers); + actual.Path.Should().Be(message.Path); + actual.MessageType.Should().Be(message.MessageType); + actual.MessagePayload.Should().BeEquivalentTo(message.MessagePayload); + } + } + + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + public class AbortDeliveryTests(MongoDbFixture mongoDbFixture) : BaseMongoDbOutboxRepositoryTest(mongoDbFixture) + { + [Fact] + public async Task ShouldUpdateStatus() + { + // arrange + var seed = await SeedOutbox(5); + var expected = seed.Take(3).ToList(); + + // act + await _target.AbortDelivery(expected, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + var actual = messages.Where(x => x.DeliveryAborted).ToList(); + actual.Select(x => x.Id).Should().BeEquivalentTo(expected.Select(x => x.Id)); + } + } + + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + public class DeleteSentTests(MongoDbFixture mongoDbFixture) : BaseMongoDbOutboxRepositoryTest(mongoDbFixture) + { + [Fact] + public async Task ExpiredItems_AreDeleted() + { + // arrange + var active = new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc); + var expired = active.AddDays(-1); + + var seedMessages = await SeedOutbox(10, (i, _) => + { +#pragma warning disable EXTEXP0004 + _currentTimeProvider.AdjustTime(i < 5 ? expired : active); +#pragma warning restore EXTEXP0004 + }); + + // mark the first 5 messages as sent + await _target.UpdateToSent(seedMessages.Take(5).ToList(), CancellationToken.None); + + // act + await _target.DeleteSent(active, 10, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Should().OnlyContain(x => x.Timestamp == active); + } + + [Fact] + public async Task BatchSize_IsRespected() + { + const int batchSize = 10; + const int messageCount = batchSize * 2; + const int expectedRemainingMessages = messageCount - batchSize; + + // arrange + var seedMessages = await SeedOutbox(messageCount); + await _target.UpdateToSent([.. seedMessages], CancellationToken.None); + + // advance time so messages qualify for deletion + _currentTimeProvider.Advance(TimeSpan.FromDays(1)); + + // act + var actualDeletedCount = await _target.DeleteSent(_currentTimeProvider.GetUtcNow(), batchSize, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + actualDeletedCount.Should().Be(batchSize); + messages.Count.Should().Be(expectedRemainingMessages); + } + } + + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + public class LockAndSelectTests(MongoDbFixture mongoDbFixture) : BaseMongoDbOutboxRepositoryTest(mongoDbFixture) + { + [Fact] + public async Task TableLock_RestrictsConcurrentLocks() + { + const int batchSize = 10; + const string instance1 = "1"; + const string instance2 = "2"; + + await SeedOutbox(batchSize * 2); + + var items1 = await _target.LockAndSelect(instance1, batchSize, tableLock: true, TimeSpan.FromMinutes(1), CancellationToken.None); + var items2 = await _target.LockAndSelect(instance2, batchSize, tableLock: true, TimeSpan.FromMinutes(1), CancellationToken.None); + + items1.Count.Should().Be(batchSize); + items2.Count.Should().Be(0); + } + + [Fact] + public async Task NoTableLock_AllowsConcurrentLocks() + { + const int batchSize = 10; + const string instance1 = "1"; + const string instance2 = "2"; + + await SeedOutbox(batchSize * 2); + + var items1 = await _target.LockAndSelect(instance1, batchSize, tableLock: false, TimeSpan.FromMinutes(1), CancellationToken.None); + var items2 = await _target.LockAndSelect(instance2, batchSize, tableLock: false, TimeSpan.FromMinutes(1), CancellationToken.None); + + items1.Count.Should().Be(batchSize); + items2.Count.Should().Be(batchSize); + } + + [Fact] + public async Task AbortedMessages_AreNotIncluded() + { + // arrange + var seed = await SeedOutbox(5); + var aborted = seed.Take(3).ToList(); + var abortedIds = aborted.Select(x => x.Id).ToList(); + + await _target.AbortDelivery(aborted, CancellationToken.None); + + // act + var actual = await _target.LockAndSelect("123", 10, tableLock: false, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Select(x => x.Id).Should().NotContain(abortedIds); + } + + [Fact] + public async Task SentMessages_AreNotIncluded() + { + // arrange + var seed = await SeedOutbox(5); + var sent = seed.Take(3).ToList(); + var sentIds = sent.Select(x => x.Id).ToList(); + + await _target.UpdateToSent(sent, CancellationToken.None); + + // act + var actual = await _target.LockAndSelect("123", 10, tableLock: false, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Select(x => x.Id).Should().NotContain(sentIds); + } + } + + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + public class IncrementDeliveryAttemptTests(MongoDbFixture mongoDbFixture) : BaseMongoDbOutboxRepositoryTest(mongoDbFixture) + { + [Fact] + public async Task WithinMaxAttempts_DoesNotAbortDelivery() + { + // arrange + const int maxAttempts = 2; + var seed = await SeedOutbox(5); + var failedMessages = seed.Take(3).ToList(); + var failedIds = failedMessages.Select(x => x.Id).ToList(); + + // act + await _target.IncrementDeliveryAttempt(failedMessages, maxAttempts, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Should().OnlyContain(x => !x.DeliveryComplete); + messages.Should().OnlyContain(x => !x.DeliveryAborted); + messages.Where(x => !failedIds.Contains(x.Id)).Should().OnlyContain(x => x.DeliveryAttempt == 0); + messages.Where(x => failedIds.Contains(x.Id)).Should().OnlyContain(x => x.DeliveryAttempt == 1); + } + + [Fact] + public async Task BreachingMaxAttempts_AbortsDelivery() + { + // arrange + const int maxAttempts = 1; + var seed = await SeedOutbox(5); + var failedMessages = seed.Take(3).ToList(); + var failedIds = failedMessages.Select(x => x.Id).ToList(); + + // act - call twice to exceed max + await _target.IncrementDeliveryAttempt(failedMessages, maxAttempts, CancellationToken.None); + await _target.IncrementDeliveryAttempt(failedMessages, maxAttempts, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + messages.Should().OnlyContain(x => !x.DeliveryComplete); + + var attempted = messages.Where(x => failedIds.Contains(x.Id)).ToList(); + attempted.Should().OnlyContain(x => x.DeliveryAttempt == 2); + attempted.Should().OnlyContain(x => x.DeliveryAborted); + + var notAttempted = messages.Where(x => !failedIds.Contains(x.Id)).ToList(); + notAttempted.Should().OnlyContain(x => x.DeliveryAttempt == 0); + notAttempted.Should().OnlyContain(x => !x.DeliveryAborted); + } + } + + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + public class UpdateToSentTests(MongoDbFixture mongoDbFixture) : BaseMongoDbOutboxRepositoryTest(mongoDbFixture) + { + [Fact] + public async Task ShouldUpdateStatus() + { + // arrange + var seed = await SeedOutbox(5); + var expected = seed.Take(3).ToList(); + + // act + await _target.UpdateToSent(expected, CancellationToken.None); + var messages = await _target.GetAllMessages(CancellationToken.None); + + // assert + var actual = messages.Where(x => x.DeliveryComplete).ToList(); + actual.Select(x => x.Id).Should().BeEquivalentTo(expected.Select(x => x.Id)); + } + } + + [Trait("Category", "Integration")] + [Trait("Transport", "Outbox.MongoDb")] + public class RenewLockTests(MongoDbFixture mongoDbFixture) : BaseMongoDbOutboxRepositoryTest(mongoDbFixture) + { + [Fact] + public async Task WithinLock_ExtendsLockTimeout() + { + // arrange + const int batchSize = 10; + const string instanceId = "1"; + await SeedOutbox(batchSize); + + var lockedItems = await _target.LockAndSelect(instanceId, batchSize, tableLock: true, TimeSpan.FromSeconds(10), CancellationToken.None); + var lockedIds = lockedItems.Select(x => x.Id).ToList(); + + var before = await _target.GetAllMessages(CancellationToken.None); + var originalLock = before.Min(x => x.LockExpiresOn); + + // act + await _target.RenewLock(instanceId, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + var after = await _target.GetAllMessages(CancellationToken.None); + var actual = after.Where(x => lockedIds.Contains(x.Id)); + actual.Should().OnlyContain(x => x.LockExpiresOn > originalLock); + } + + [Fact] + public async Task HasLockedItemsToRenew_ReturnsTrue() + { + // arrange + const int batchSize = 10; + const string instanceId = "1"; + await SeedOutbox(batchSize); + + await _target.LockAndSelect(instanceId, batchSize, tableLock: true, TimeSpan.FromSeconds(10), CancellationToken.None); + + // act + var actual = await _target.RenewLock(instanceId, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Should().BeTrue(); + } + + [Fact] + public async Task HasNoLockedItemsToRenew_ReturnsFalse() + { + // arrange + const string instanceId = "1"; + await SeedOutbox(10); + + // act + var actual = await _target.RenewLock(instanceId, TimeSpan.FromMinutes(1), CancellationToken.None); + + // assert + actual.Should().BeFalse(); + } + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbTransactionConsumerInterceptorTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbTransactionConsumerInterceptorTests.cs new file mode 100644 index 00000000..d08c5ef2 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbTransactionConsumerInterceptorTests.cs @@ -0,0 +1,65 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +using Microsoft.Extensions.Logging; + +using SlimMessageBus.Host.Outbox.MongoDb.Interceptors; + +public class MongoDbTransactionConsumerInterceptorTests +{ + private readonly Mock> _loggerMock = new(); + private readonly Mock _transactionServiceMock = new(); + private readonly Mock _contextMock = new(); + private readonly MongoDbTransactionConsumerInterceptor _sut; + + public MongoDbTransactionConsumerInterceptorTests() + { + _sut = new MongoDbTransactionConsumerInterceptor( + _loggerMock.Object, + _transactionServiceMock.Object); + } + + [Fact] + public async Task When_OnHandle_Given_NextSucceeds_Then_BeginsThenCommitsTransaction() + { + // arrange + var message = new SampleMessage(); + var expectedResult = new object(); + + _transactionServiceMock.Setup(x => x.BeginTransaction()).Returns(Task.CompletedTask); + _transactionServiceMock.Setup(x => x.CommitTransaction()).Returns(Task.CompletedTask); + + // act + var result = await _sut.OnHandle(message, () => Task.FromResult(expectedResult), _contextMock.Object); + + // assert + result.Should().BeSameAs(expectedResult); + _transactionServiceMock.Verify(x => x.BeginTransaction(), Times.Once); + _transactionServiceMock.Verify(x => x.CommitTransaction(), Times.Once); + _transactionServiceMock.Verify(x => x.RollbackTransaction(), Times.Never); + } + + [Fact] + public async Task When_OnHandle_Given_NextThrows_Then_RollsBackAndRethrows() + { + // arrange + var message = new SampleMessage(); + var exception = new InvalidOperationException("consumer failed"); + + _transactionServiceMock.Setup(x => x.BeginTransaction()).Returns(Task.CompletedTask); + _transactionServiceMock.Setup(x => x.RollbackTransaction()).Returns(Task.CompletedTask); + + // act + var act = async () => await _sut.OnHandle( + message, + () => throw exception, + _contextMock.Object); + + // assert + await act.Should().ThrowAsync().WithMessage("consumer failed"); + _transactionServiceMock.Verify(x => x.BeginTransaction(), Times.Once); + _transactionServiceMock.Verify(x => x.RollbackTransaction(), Times.Once); + _transactionServiceMock.Verify(x => x.CommitTransaction(), Times.Never); + } + + public record SampleMessage; +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbTransactionServiceTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbTransactionServiceTests.cs new file mode 100644 index 00000000..e18a663e --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/MongoDbTransactionServiceTests.cs @@ -0,0 +1,59 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +public class MongoDbTransactionServiceTests +{ + private readonly Mock _clientMock = new(); + private readonly Mock _sessionMock = new(); + private readonly MongoDbSessionHolder _sessionHolder = new(); + private readonly MongoDbTransactionService _sut; + + public MongoDbTransactionServiceTests() + { + _clientMock + .Setup(x => x.StartSessionAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(_sessionMock.Object); + + _sut = new MongoDbTransactionService(_clientMock.Object, _sessionHolder); + } + + [Fact] + public async Task When_BeginTransaction_Given_NoActiveSession_Then_StartsSessionAndTransaction() + { + await _sut.BeginTransaction(); + + _clientMock.Verify(x => x.StartSessionAsync(It.IsAny(), It.IsAny()), Times.Once); + _sessionMock.Verify(x => x.StartTransaction(It.IsAny()), Times.Once); + _sut.CurrentSession.Should().BeSameAs(_sessionMock.Object); + _sessionHolder.Session.Should().BeSameAs(_sessionMock.Object); + } + + [Fact] + public async Task When_CommitTransaction_Given_ActiveTransaction_Then_CommitsSessionAndCleansUp() + { + _sessionMock + .Setup(x => x.CommitTransactionAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + await _sut.BeginTransaction(); + await _sut.CommitTransaction(); + + _sessionMock.Verify(x => x.CommitTransactionAsync(It.IsAny()), Times.Once); + _sessionHolder.Session.Should().BeNull(); + _sut.CurrentSession.Should().BeNull(); + } + + [Fact] + public async Task When_RollbackTransaction_Given_ActiveTransaction_Then_AbortsSessionAndCleansUp() + { + _sessionMock + .Setup(x => x.AbortTransactionAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + await _sut.BeginTransaction(); + await _sut.RollbackTransaction(); + + _sessionMock.Verify(x => x.AbortTransactionAsync(It.IsAny()), Times.Once); + _sessionHolder.Session.Should().BeNull(); + _sut.CurrentSession.Should().BeNull(); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/ObjectToInferredTypesConverterTests.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/ObjectToInferredTypesConverterTests.cs new file mode 100644 index 00000000..677393eb --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/ObjectToInferredTypesConverterTests.cs @@ -0,0 +1,79 @@ +namespace SlimMessageBus.Host.Outbox.MongoDb.Test; + +public class ObjectToInferredTypesConverterTests +{ + private static readonly JsonSerializerOptions _options = new() + { + Converters = { new ObjectToInferredTypesConverter() } + }; + + [Fact] + public void When_Read_Given_True_Then_ReturnsTrue() + { + var result = JsonSerializer.Deserialize("true", _options); + result.Should().Be(true); + } + + [Fact] + public void When_Read_Given_False_Then_ReturnsFalse() + { + var result = JsonSerializer.Deserialize("false", _options); + result.Should().Be(false); + } + + [Fact] + public void When_Read_Given_Integer_Then_ReturnsLong() + { + var result = JsonSerializer.Deserialize("42", _options); + result.Should().Be(42L); + } + + [Fact] + public void When_Read_Given_DecimalNumber_Then_ReturnsDouble() + { + var result = JsonSerializer.Deserialize("3.14", _options); + result.Should().BeOfType().Which.Should().BeApproximately(3.14, 0.001); + } + + [Fact] + public void When_Read_Given_DateTimeString_Then_ReturnsDateTime() + { + var result = JsonSerializer.Deserialize("\"2024-01-15T10:30:00\"", _options); + result.Should().BeOfType().Which.Should().Be(new DateTime(2024, 1, 15, 10, 30, 0)); + } + + [Fact] + public void When_Read_Given_RegularString_Then_ReturnsString() + { + var result = JsonSerializer.Deserialize("\"hello world\"", _options); + result.Should().Be("hello world"); + } + + [Fact] + public void When_Read_Given_JsonObject_Then_ReturnsJsonElement() + { + var result = JsonSerializer.Deserialize("{\"key\":\"value\"}", _options); + result.Should().BeOfType(); + } + + [Fact] + public void When_Write_Given_HeterogeneousDictionary_Then_RoundTripsCorrectly() + { + // arrange + var dict = new Dictionary + { + ["str"] = "hello", + ["num"] = 99L, + ["flag"] = true + }; + + // act + var json = JsonSerializer.Serialize(dict, _options); + + // assert: round-trip the values back + var roundTripped = JsonSerializer.Deserialize>(json, _options); + roundTripped!["str"].Should().Be("hello"); + roundTripped["num"].Should().Be(99L); + roundTripped["flag"].Should().Be(true); + } +} diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/SlimMessageBus.Host.Outbox.MongoDb.Test.csproj b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/SlimMessageBus.Host.Outbox.MongoDb.Test.csproj new file mode 100644 index 00000000..eefcdfda --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/SlimMessageBus.Host.Outbox.MongoDb.Test.csproj @@ -0,0 +1,22 @@ + + + + + + enable + + + + + + + + + + + + + + + + diff --git a/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/Usings.cs b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/Usings.cs new file mode 100644 index 00000000..80e5fb38 --- /dev/null +++ b/src/Tests/SlimMessageBus.Host.Outbox.MongoDb.Test/Usings.cs @@ -0,0 +1,24 @@ +global using System.Diagnostics; +global using System.Text.Json; + +global using Moq; +global using System.Threading.Tasks; + +global using AutoFixture; + +global using AwesomeAssertions; + +global using Microsoft.Extensions.Logging.Abstractions; +global using Microsoft.Extensions.Time.Testing; + +global using MongoDB.Bson; +global using MongoDB.Driver; + +global using SlimMessageBus; +global using SlimMessageBus.Host; +global using SlimMessageBus.Host.Outbox.MongoDb.Configuration; +global using SlimMessageBus.Host.Outbox.MongoDb.Repositories; +global using SlimMessageBus.Host.Outbox.MongoDb.Services; +global using SlimMessageBus.Host.Outbox.MongoDb.Transactions; + +global using Xunit;