From 1c2a6552032c46a87165b60c31eaeec9ba545cb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tayfun=20Y=C4=B1lmaz?= Date: Tue, 9 Dec 2025 14:05:43 +0300 Subject: [PATCH] Improve event routing and transaction handling in UoW and background jobs Introduces ILocalTransactionEventEnqueuer to enable direct event routing from DbContext to the owning local transaction, ensuring domain events are dispatched to the correct Unit of Work. Refactors background job scheduling to use OnCompleted for post-commit scheduling, adds explicit transaction management in DaprJobExecutionBridge, and removes ambient UoW reliance in JobDispatcher. Updates .gitignore to exclude ai-docs/. --- .gitignore | 4 ++ .../Aether/Aspects/Uow/UnitOfWorkAttribute.cs | 2 +- .../Uow/ILocalTransactionEventEnqueuer.cs | 20 ++++++++ .../BackgroundJob/BackgroundJobService.cs | 49 ++++++++++++------- .../Dapr/DaprJobExecutionBridge.cs | 36 +++++++++----- .../BBT/Aether/BackgroundJob/JobDispatcher.cs | 16 ++---- .../EntityFrameworkCore/AetherDbContext.cs | 36 +++++++++++--- .../BBT/Aether/Uow/CompositeUnitOfWork.cs | 1 + .../EfCoreTransactionSource.cs | 35 ++++++------- 9 files changed, 130 insertions(+), 69 deletions(-) create mode 100644 framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs diff --git a/.gitignore b/.gitignore index 78eec43..5e26958 100644 --- a/.gitignore +++ b/.gitignore @@ -278,3 +278,7 @@ deploy/ssh-password.txt deploy/github-api-key.txt deploy/_run_all_log.txt **/**/Logs/*.json + + +# AI Documentations +ai-docs/ \ No newline at end of file diff --git a/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs b/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs index 30f5fdc..95b36bd 100644 --- a/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs +++ b/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs @@ -104,7 +104,7 @@ public async override Task OnInvokeAsync(MethodInterceptionArgs args) throw; } } - + private UnitOfWorkOptions CreateOptions() { var options = new UnitOfWorkOptions diff --git a/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs b/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs new file mode 100644 index 0000000..4cc142b --- /dev/null +++ b/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; +using BBT.Aether.Events; + +namespace BBT.Aether.Uow; + +/// +/// Interface for local transactions that support event enqueueing. +/// Allows DbContext to push events directly into its owning local transaction, +/// ensuring events are routed to the correct Unit of Work regardless of ambient context. +/// +public interface ILocalTransactionEventEnqueuer +{ + /// + /// Enqueues domain events that were collected by DbContext during SaveChanges. + /// Events pushed here will be dispatched when the owning Unit of Work commits. + /// + /// The events to enqueue + void EnqueueEvents(IEnumerable events); +} + diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs index ed9ac1c..27354e4 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs @@ -33,6 +33,7 @@ public sealed class BackgroundJobService( : IBackgroundJobService { private const string Source = "urn:background-job"; + /// public async Task EnqueueAsync( string handlerName, @@ -54,12 +55,13 @@ public async Task EnqueueAsync( if (string.IsNullOrWhiteSpace(schedule)) throw new ArgumentNullException(nameof(schedule)); - logger.LogInformation("Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", + logger.LogInformation( + "Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", handlerName, jobName, schedule); // Create job entity var jobId = guidGenerator.Create(); - + // Convert metadata to nullable dictionary for ExtraPropertyDictionary var extraProperties = new ExtraPropertyDictionary(); if (metadata != null) @@ -69,7 +71,7 @@ public async Task EnqueueAsync( extraProperties[kvp.Key] = kvp.Value; } } - + var envelope = new CloudEventEnvelope { Type = handlerName, @@ -78,7 +80,7 @@ public async Task EnqueueAsync( Schema = currentSchema.Name, DataContentType = "application/json" }; - + var jobInfo = new BackgroundJobInfo(jobId, handlerName, jobName) { ExpressionValue = schedule, @@ -90,26 +92,38 @@ public async Task EnqueueAsync( // Serialize envelope for scheduler var payloadBytes = eventSerializer.Serialize(envelope); - await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); + await using var uow = await uowManager.BeginAsync( + options: new UnitOfWorkOptions { Scope = UnitOfWorkScopeOption.RequiresNew }, + cancellationToken: cancellationToken); try { // Save to job store await jobStore.SaveAsync(jobInfo, cancellationToken); - - // Commit transaction + await uow.SaveChangesAsync(cancellationToken); + // Register scheduler to run AFTER commit is fully persisted to DB + // This prevents race condition where scheduler triggers before DB write completes + uow.OnCompleted(async _ => + { + await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); + + logger.LogInformation( + "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", + handlerName, jobName, jobId); + }); + + // Commit transaction - OnCompleted handlers run after this completes await uow.CommitAsync(cancellationToken); - - // Schedule with configured scheduler (Dapr, Quartz, etc.) - await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); - - logger.LogInformation("Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", + + logger.LogInformation( + "Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", handlerName, jobName, jobId); return jobId; } catch (Exception ex) { - logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); + logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, + jobName); await uow.RollbackAsync(cancellationToken); throw; } @@ -155,7 +169,8 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can // Reschedule with new schedule var payloadBytes = eventSerializer.Serialize(envelope); - await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, cancellationToken); + await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, + cancellationToken); // Save updated entity await jobStore.SaveAsync(jobInfo, cancellationToken); @@ -196,7 +211,8 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken); // Update status to Cancelled - await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, cancellationToken: cancellationToken); + await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, + cancellationToken: cancellationToken); // Commit transaction await uow.CommitAsync(cancellationToken); @@ -211,5 +227,4 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken throw; } } -} - +} \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs index 647f1ee..eb5b752 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs @@ -5,6 +5,7 @@ using BBT.Aether.Domain.Repositories; using BBT.Aether.Events; using BBT.Aether.MultiSchema; +using BBT.Aether.Uow; using Microsoft.Extensions.Logging; namespace BBT.Aether.BackgroundJob.Dapr; @@ -20,6 +21,7 @@ public sealed class DaprJobExecutionBridge( IJobStore jobStore, ICurrentSchema currentSchema, IEventSerializer eventSerializer, + IUnitOfWorkManager unitOfWorkManager, ILogger logger) : IJobExecutionBridge { @@ -39,7 +41,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can { currentSchema.Set(envelope.Schema); } - + var argsBytes = eventSerializer.Serialize(envelope.Data); argsPayload = new ReadOnlyMemory(argsBytes); } @@ -47,16 +49,27 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can { argsPayload = payload; } - - var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); - if (jobInfo == null) + + await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken); + try + { + var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); + if (jobInfo == null) + { + logger.LogError("Job with name '{JobName}' not found in store", jobName); + throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); + } + + // Dispatch to handler using the handler name from job entity + await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); + + await uow.CommitAsync(cancellationToken); + } + catch (Exception) { - logger.LogError("Job with name '{JobName}' not found in store", jobName); - throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); + await uow.RollbackAsync(cancellationToken); + throw; } - - // Dispatch to handler using the handler name from job entity - await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); } catch (Exception ex) { @@ -74,7 +87,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can try { var envelope = eventSerializer.Deserialize(payload); - + // Validate it's actually an envelope by checking required properties if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type)) { @@ -89,5 +102,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can return null; } } -} - +} \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs index 19b0a02..28c38c9 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs @@ -46,7 +46,6 @@ await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed, return; } - await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); try { // Update status to Running @@ -59,21 +58,18 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running, await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, clock.UtcNow, cancellationToken: cancellationToken); - // Commit all changes together - await uow.CommitAsync(cancellationToken); - logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); - await HandleJobCancellationAsync(uow, jobId, cancellationToken); + await HandleJobCancellationAsync(jobId, cancellationToken); throw; } catch (Exception ex) { logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId); - await HandleJobFailureAsync(uow, jobId, ex, cancellationToken); + await HandleJobFailureAsync(jobId, ex, cancellationToken); throw; } } @@ -133,25 +129,23 @@ private async Task UpdateStatusWithinUowAsync( /// /// Handles job cancellation by updating status within the existing UoW. /// - private async Task HandleJobCancellationAsync(IUnitOfWork uow, Guid jobId, CancellationToken cancellationToken) + private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) { try { await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled, clock.UtcNow, "Job was cancelled", cancellationToken); - await uow.CommitAsync(cancellationToken); } catch (Exception ex) { logger.LogError(ex, "Failed to update job status to Cancelled"); - await uow.RollbackAsync(cancellationToken); } } /// /// Handles job failure by updating status within the existing UoW. /// - private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception exception, + private async Task HandleJobFailureAsync(Guid jobId, Exception exception, CancellationToken cancellationToken) { try @@ -161,12 +155,10 @@ private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, clock.UtcNow, errorMessage, cancellationToken); - await uow.CommitAsync(cancellationToken); } catch (Exception ex) { logger.LogError(ex, "Failed to update job status to Failed"); - await uow.RollbackAsync(cancellationToken); } } diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs index 3665e4f..27e51c6 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs @@ -10,6 +10,7 @@ using BBT.Aether.Domain.Entities; using BBT.Aether.Domain.EntityFrameworkCore.Modeling; using BBT.Aether.Events; +using BBT.Aether.Uow; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Metadata; @@ -21,6 +22,14 @@ public abstract class AetherDbContext( : DbContext(options) where TDbContext : DbContext { + /// + /// Gets or sets the local transaction event enqueuer. + /// When set by EfCoreTransactionSource, events will be routed directly to the owning + /// local transaction instead of relying on ambient Unit of Work context. + /// This ensures events reach the correct UoW even when SaveChanges is called directly on DbContext. + /// + public ILocalTransactionEventEnqueuer? LocalEventEnqueuer { get; set; } + private readonly static MethodInfo ConfigureBasePropertiesMethodInfo = typeof(AetherDbContext) .GetMethod( @@ -271,24 +280,35 @@ public void ClearDomainEvents() } /// - /// Publishes collected domain events to the sink (Unit of Work). - /// Called automatically during SaveChanges to push events into the UoW transaction queue. + /// Publishes collected domain events to the appropriate destination. + /// Priority order: + /// 1. LocalEventEnqueuer (direct link to owning local transaction - most reliable) + /// 2. eventSink (ambient UoW fallback - for backward compatibility) + /// This ensures events reach the correct UoW regardless of how SaveChanges is called. /// private void PublishDomainEventsToSink() { - if (eventSink is null) + var domainEvents = CollectDomainEvents(); + if (domainEvents.Count == 0) { - // No sink configured - events will be collected explicitly by UoW (backward compatibility) return; } - var domainEvents = CollectDomainEvents(); - if (domainEvents.Count == 0) + // Priority 1: Direct link to owning local transaction + // This is the most reliable path - events go directly to the transaction that owns this DbContext + if (LocalEventEnqueuer is not null) { + LocalEventEnqueuer.EnqueueEvents(domainEvents); + ClearDomainEvents(); return; } - eventSink.EnqueueDomainEvents(domainEvents); - ClearDomainEvents(); + // Priority 2: Ambient UoW fallback (backward compatibility) + // Uses unitOfWorkManager.Current which may not always point to the correct UoW + if (eventSink is not null) + { + eventSink.EnqueueDomainEvents(domainEvents); + ClearDomainEvents(); + } } } \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs index 64652c3..c60b447 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs @@ -192,6 +192,7 @@ public async Task CommitAsync(CancellationToken cancellationToken = default) try { + await SaveChangesAsync(cancellationToken); var strategy = domainEventOptions?.DispatchStrategy ?? DomainEventDispatchStrategy.AlwaysUseOutbox; if (strategy == DomainEventDispatchStrategy.AlwaysUseOutbox) diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs index 5d40404..f0f8e74 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs @@ -39,19 +39,26 @@ public async Task CreateTransactionAsync( : await dbContext.Database.BeginTransactionAsync(cancellationToken); } - return new EfCoreLocalTransaction(dbContext, transaction); + var localTx = new EfCoreLocalTransaction(dbContext, transaction); + + // CRITICAL: Establish direct link between DbContext and LocalTransaction + // This ensures events are routed to the correct UoW regardless of ambient context + dbContext.LocalEventEnqueuer = localTx; + + return localTx; } /// /// Local transaction implementation for EF Core. /// Supports lazy transaction escalation and domain event collection. - /// Events are automatically pushed via IDomainEventSink during SaveChanges. + /// Events are pushed directly from DbContext via LocalEventEnqueuer during SaveChanges. /// private sealed class EfCoreLocalTransaction( AetherDbContext context, IDbContextTransaction? transaction) : ILocalTransaction, ITransactionalLocal, ISupportsSaveChanges, IAsyncDisposable, ILocalTransactionEventEnqueuer { + private readonly AetherDbContext _context = context; private IDbContextTransaction? _transaction = transaction; private readonly List _collectedEvents = new(); @@ -72,8 +79,8 @@ public async Task EnsureTransactionAsync(IsolationLevel? isolationLevel = null, // Begin transaction with specified or default isolation level _transaction = isolationLevel.HasValue - ? await context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken) - : await context.Database.BeginTransactionAsync(cancellationToken); + ? await _context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken) + : await _context.Database.BeginTransactionAsync(cancellationToken); } /// @@ -111,18 +118,21 @@ public async Task RollbackAsync(CancellationToken cancellationToken = default) /// public void ClearCollectedEvents() { - context.ClearDomainEvents(); + _context.ClearDomainEvents(); _collectedEvents.Clear(); } public async Task SaveChangesAsync(CancellationToken cancellationToken = default) { - await context.SaveChangesAsync(cancellationToken); + await _context.SaveChangesAsync(cancellationToken); } /// public async ValueTask DisposeAsync() { + // Clear the direct link between DbContext and this transaction + _context.LocalEventEnqueuer = null; + if (_transaction != null) { await _transaction.DisposeAsync(); @@ -131,16 +141,3 @@ public async ValueTask DisposeAsync() } } } - -/// -/// Internal interface for local transactions that support event enqueueing. -/// Allows the sink to push events into transactions. -/// -internal interface ILocalTransactionEventEnqueuer -{ - /// - /// Enqueues events that were collected by DbContext during SaveChanges. - /// - /// The events to enqueue - void EnqueueEvents(IEnumerable events); -}