diff --git a/.gitignore b/.gitignore index 78eec43..5e26958 100644 --- a/.gitignore +++ b/.gitignore @@ -278,3 +278,7 @@ deploy/ssh-password.txt deploy/github-api-key.txt deploy/_run_all_log.txt **/**/Logs/*.json + + +# AI Documentations +ai-docs/ \ No newline at end of file diff --git a/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs b/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs index 30f5fdc..95b36bd 100644 --- a/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs +++ b/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs @@ -104,7 +104,7 @@ public async override Task OnInvokeAsync(MethodInterceptionArgs args) throw; } } - + private UnitOfWorkOptions CreateOptions() { var options = new UnitOfWorkOptions diff --git a/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs b/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs new file mode 100644 index 0000000..4cc142b --- /dev/null +++ b/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; +using BBT.Aether.Events; + +namespace BBT.Aether.Uow; + +/// +/// Interface for local transactions that support event enqueueing. +/// Allows DbContext to push events directly into its owning local transaction, +/// ensuring events are routed to the correct Unit of Work regardless of ambient context. +/// +public interface ILocalTransactionEventEnqueuer +{ + /// + /// Enqueues domain events that were collected by DbContext during SaveChanges. + /// Events pushed here will be dispatched when the owning Unit of Work commits. + /// + /// The events to enqueue + void EnqueueEvents(IEnumerable events); +} + diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs index ed9ac1c..27354e4 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs @@ -33,6 +33,7 @@ public sealed class BackgroundJobService( : IBackgroundJobService { private const string Source = "urn:background-job"; + /// public async Task EnqueueAsync( string handlerName, @@ -54,12 +55,13 @@ public async Task EnqueueAsync( if (string.IsNullOrWhiteSpace(schedule)) throw new ArgumentNullException(nameof(schedule)); - logger.LogInformation("Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", + logger.LogInformation( + "Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", handlerName, jobName, schedule); // Create job entity var jobId = guidGenerator.Create(); - + // Convert metadata to nullable dictionary for ExtraPropertyDictionary var extraProperties = new ExtraPropertyDictionary(); if (metadata != null) @@ -69,7 +71,7 @@ public async Task EnqueueAsync( extraProperties[kvp.Key] = kvp.Value; } } - + var envelope = new CloudEventEnvelope { Type = handlerName, @@ -78,7 +80,7 @@ public async Task EnqueueAsync( Schema = currentSchema.Name, DataContentType = "application/json" }; - + var jobInfo = new BackgroundJobInfo(jobId, handlerName, jobName) { ExpressionValue = schedule, @@ -90,26 +92,38 @@ public async Task EnqueueAsync( // Serialize envelope for scheduler var payloadBytes = eventSerializer.Serialize(envelope); - await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); + await using var uow = await uowManager.BeginAsync( + options: new UnitOfWorkOptions { Scope = UnitOfWorkScopeOption.RequiresNew }, + cancellationToken: cancellationToken); try { // Save to job store await jobStore.SaveAsync(jobInfo, cancellationToken); - - // Commit transaction + await uow.SaveChangesAsync(cancellationToken); + // Register scheduler to run AFTER commit is fully persisted to DB + // This prevents race condition where scheduler triggers before DB write completes + uow.OnCompleted(async _ => + { + await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); + + logger.LogInformation( + "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", + handlerName, jobName, jobId); + }); + + // Commit transaction - OnCompleted handlers run after this completes await uow.CommitAsync(cancellationToken); - - // Schedule with configured scheduler (Dapr, Quartz, etc.) - await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); - - logger.LogInformation("Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", + + logger.LogInformation( + "Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", handlerName, jobName, jobId); return jobId; } catch (Exception ex) { - logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); + logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, + jobName); await uow.RollbackAsync(cancellationToken); throw; } @@ -155,7 +169,8 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can // Reschedule with new schedule var payloadBytes = eventSerializer.Serialize(envelope); - await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, cancellationToken); + await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, + cancellationToken); // Save updated entity await jobStore.SaveAsync(jobInfo, cancellationToken); @@ -196,7 +211,8 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken); // Update status to Cancelled - await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, cancellationToken: cancellationToken); + await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, + cancellationToken: cancellationToken); // Commit transaction await uow.CommitAsync(cancellationToken); @@ -211,5 +227,4 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken throw; } } -} - +} \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs index 647f1ee..eb5b752 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs @@ -5,6 +5,7 @@ using BBT.Aether.Domain.Repositories; using BBT.Aether.Events; using BBT.Aether.MultiSchema; +using BBT.Aether.Uow; using Microsoft.Extensions.Logging; namespace BBT.Aether.BackgroundJob.Dapr; @@ -20,6 +21,7 @@ public sealed class DaprJobExecutionBridge( IJobStore jobStore, ICurrentSchema currentSchema, IEventSerializer eventSerializer, + IUnitOfWorkManager unitOfWorkManager, ILogger logger) : IJobExecutionBridge { @@ -39,7 +41,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can { currentSchema.Set(envelope.Schema); } - + var argsBytes = eventSerializer.Serialize(envelope.Data); argsPayload = new ReadOnlyMemory(argsBytes); } @@ -47,16 +49,27 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can { argsPayload = payload; } - - var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); - if (jobInfo == null) + + await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken); + try + { + var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); + if (jobInfo == null) + { + logger.LogError("Job with name '{JobName}' not found in store", jobName); + throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); + } + + // Dispatch to handler using the handler name from job entity + await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); + + await uow.CommitAsync(cancellationToken); + } + catch (Exception) { - logger.LogError("Job with name '{JobName}' not found in store", jobName); - throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); + await uow.RollbackAsync(cancellationToken); + throw; } - - // Dispatch to handler using the handler name from job entity - await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); } catch (Exception ex) { @@ -74,7 +87,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can try { var envelope = eventSerializer.Deserialize(payload); - + // Validate it's actually an envelope by checking required properties if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type)) { @@ -89,5 +102,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can return null; } } -} - +} \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs index 19b0a02..28c38c9 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs @@ -46,7 +46,6 @@ await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed, return; } - await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); try { // Update status to Running @@ -59,21 +58,18 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running, await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, clock.UtcNow, cancellationToken: cancellationToken); - // Commit all changes together - await uow.CommitAsync(cancellationToken); - logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); - await HandleJobCancellationAsync(uow, jobId, cancellationToken); + await HandleJobCancellationAsync(jobId, cancellationToken); throw; } catch (Exception ex) { logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId); - await HandleJobFailureAsync(uow, jobId, ex, cancellationToken); + await HandleJobFailureAsync(jobId, ex, cancellationToken); throw; } } @@ -133,25 +129,23 @@ private async Task UpdateStatusWithinUowAsync( /// /// Handles job cancellation by updating status within the existing UoW. /// - private async Task HandleJobCancellationAsync(IUnitOfWork uow, Guid jobId, CancellationToken cancellationToken) + private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) { try { await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled, clock.UtcNow, "Job was cancelled", cancellationToken); - await uow.CommitAsync(cancellationToken); } catch (Exception ex) { logger.LogError(ex, "Failed to update job status to Cancelled"); - await uow.RollbackAsync(cancellationToken); } } /// /// Handles job failure by updating status within the existing UoW. /// - private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception exception, + private async Task HandleJobFailureAsync(Guid jobId, Exception exception, CancellationToken cancellationToken) { try @@ -161,12 +155,10 @@ private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, clock.UtcNow, errorMessage, cancellationToken); - await uow.CommitAsync(cancellationToken); } catch (Exception ex) { logger.LogError(ex, "Failed to update job status to Failed"); - await uow.RollbackAsync(cancellationToken); } } diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs index 3665e4f..27e51c6 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs @@ -10,6 +10,7 @@ using BBT.Aether.Domain.Entities; using BBT.Aether.Domain.EntityFrameworkCore.Modeling; using BBT.Aether.Events; +using BBT.Aether.Uow; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Metadata; @@ -21,6 +22,14 @@ public abstract class AetherDbContext( : DbContext(options) where TDbContext : DbContext { + /// + /// Gets or sets the local transaction event enqueuer. + /// When set by EfCoreTransactionSource, events will be routed directly to the owning + /// local transaction instead of relying on ambient Unit of Work context. + /// This ensures events reach the correct UoW even when SaveChanges is called directly on DbContext. + /// + public ILocalTransactionEventEnqueuer? LocalEventEnqueuer { get; set; } + private readonly static MethodInfo ConfigureBasePropertiesMethodInfo = typeof(AetherDbContext) .GetMethod( @@ -271,24 +280,35 @@ public void ClearDomainEvents() } /// - /// Publishes collected domain events to the sink (Unit of Work). - /// Called automatically during SaveChanges to push events into the UoW transaction queue. + /// Publishes collected domain events to the appropriate destination. + /// Priority order: + /// 1. LocalEventEnqueuer (direct link to owning local transaction - most reliable) + /// 2. eventSink (ambient UoW fallback - for backward compatibility) + /// This ensures events reach the correct UoW regardless of how SaveChanges is called. /// private void PublishDomainEventsToSink() { - if (eventSink is null) + var domainEvents = CollectDomainEvents(); + if (domainEvents.Count == 0) { - // No sink configured - events will be collected explicitly by UoW (backward compatibility) return; } - var domainEvents = CollectDomainEvents(); - if (domainEvents.Count == 0) + // Priority 1: Direct link to owning local transaction + // This is the most reliable path - events go directly to the transaction that owns this DbContext + if (LocalEventEnqueuer is not null) { + LocalEventEnqueuer.EnqueueEvents(domainEvents); + ClearDomainEvents(); return; } - eventSink.EnqueueDomainEvents(domainEvents); - ClearDomainEvents(); + // Priority 2: Ambient UoW fallback (backward compatibility) + // Uses unitOfWorkManager.Current which may not always point to the correct UoW + if (eventSink is not null) + { + eventSink.EnqueueDomainEvents(domainEvents); + ClearDomainEvents(); + } } } \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs index 64652c3..c60b447 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs @@ -192,6 +192,7 @@ public async Task CommitAsync(CancellationToken cancellationToken = default) try { + await SaveChangesAsync(cancellationToken); var strategy = domainEventOptions?.DispatchStrategy ?? DomainEventDispatchStrategy.AlwaysUseOutbox; if (strategy == DomainEventDispatchStrategy.AlwaysUseOutbox) diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs index 5d40404..f0f8e74 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs @@ -39,19 +39,26 @@ public async Task CreateTransactionAsync( : await dbContext.Database.BeginTransactionAsync(cancellationToken); } - return new EfCoreLocalTransaction(dbContext, transaction); + var localTx = new EfCoreLocalTransaction(dbContext, transaction); + + // CRITICAL: Establish direct link between DbContext and LocalTransaction + // This ensures events are routed to the correct UoW regardless of ambient context + dbContext.LocalEventEnqueuer = localTx; + + return localTx; } /// /// Local transaction implementation for EF Core. /// Supports lazy transaction escalation and domain event collection. - /// Events are automatically pushed via IDomainEventSink during SaveChanges. + /// Events are pushed directly from DbContext via LocalEventEnqueuer during SaveChanges. /// private sealed class EfCoreLocalTransaction( AetherDbContext context, IDbContextTransaction? transaction) : ILocalTransaction, ITransactionalLocal, ISupportsSaveChanges, IAsyncDisposable, ILocalTransactionEventEnqueuer { + private readonly AetherDbContext _context = context; private IDbContextTransaction? _transaction = transaction; private readonly List _collectedEvents = new(); @@ -72,8 +79,8 @@ public async Task EnsureTransactionAsync(IsolationLevel? isolationLevel = null, // Begin transaction with specified or default isolation level _transaction = isolationLevel.HasValue - ? await context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken) - : await context.Database.BeginTransactionAsync(cancellationToken); + ? await _context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken) + : await _context.Database.BeginTransactionAsync(cancellationToken); } /// @@ -111,18 +118,21 @@ public async Task RollbackAsync(CancellationToken cancellationToken = default) /// public void ClearCollectedEvents() { - context.ClearDomainEvents(); + _context.ClearDomainEvents(); _collectedEvents.Clear(); } public async Task SaveChangesAsync(CancellationToken cancellationToken = default) { - await context.SaveChangesAsync(cancellationToken); + await _context.SaveChangesAsync(cancellationToken); } /// public async ValueTask DisposeAsync() { + // Clear the direct link between DbContext and this transaction + _context.LocalEventEnqueuer = null; + if (_transaction != null) { await _transaction.DisposeAsync(); @@ -131,16 +141,3 @@ public async ValueTask DisposeAsync() } } } - -/// -/// Internal interface for local transactions that support event enqueueing. -/// Allows the sink to push events into transactions. -/// -internal interface ILocalTransactionEventEnqueuer -{ - /// - /// Enqueues events that were collected by DbContext during SaveChanges. - /// - /// The events to enqueue - void EnqueueEvents(IEnumerable events); -}