diff --git a/.gitignore b/.gitignore
index 78eec43..5e26958 100644
--- a/.gitignore
+++ b/.gitignore
@@ -278,3 +278,7 @@ deploy/ssh-password.txt
deploy/github-api-key.txt
deploy/_run_all_log.txt
**/**/Logs/*.json
+
+
+# AI Documentations
+ai-docs/
\ No newline at end of file
diff --git a/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs b/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs
index 30f5fdc..95b36bd 100644
--- a/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs
+++ b/framework/src/BBT.Aether.Aspects/BBT/Aether/Aspects/Uow/UnitOfWorkAttribute.cs
@@ -104,7 +104,7 @@ public async override Task OnInvokeAsync(MethodInterceptionArgs args)
throw;
}
}
-
+
private UnitOfWorkOptions CreateOptions()
{
var options = new UnitOfWorkOptions
diff --git a/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs b/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs
new file mode 100644
index 0000000..4cc142b
--- /dev/null
+++ b/framework/src/BBT.Aether.Core/BBT/Aether/Uow/ILocalTransactionEventEnqueuer.cs
@@ -0,0 +1,20 @@
+using System.Collections.Generic;
+using BBT.Aether.Events;
+
+namespace BBT.Aether.Uow;
+
+///
+/// Interface for local transactions that support event enqueueing.
+/// Allows DbContext to push events directly into its owning local transaction,
+/// ensuring events are routed to the correct Unit of Work regardless of ambient context.
+///
+public interface ILocalTransactionEventEnqueuer
+{
+ ///
+ /// Enqueues domain events that were collected by DbContext during SaveChanges.
+ /// Events pushed here will be dispatched when the owning Unit of Work commits.
+ ///
+ /// The events to enqueue
+ void EnqueueEvents(IEnumerable events);
+}
+
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
index ed9ac1c..27354e4 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobService.cs
@@ -33,6 +33,7 @@ public sealed class BackgroundJobService(
: IBackgroundJobService
{
private const string Source = "urn:background-job";
+
///
public async Task EnqueueAsync(
string handlerName,
@@ -54,12 +55,13 @@ public async Task EnqueueAsync(
if (string.IsNullOrWhiteSpace(schedule))
throw new ArgumentNullException(nameof(schedule));
- logger.LogInformation("Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'",
+ logger.LogInformation(
+ "Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'",
handlerName, jobName, schedule);
// Create job entity
var jobId = guidGenerator.Create();
-
+
// Convert metadata to nullable dictionary for ExtraPropertyDictionary
var extraProperties = new ExtraPropertyDictionary();
if (metadata != null)
@@ -69,7 +71,7 @@ public async Task EnqueueAsync(
extraProperties[kvp.Key] = kvp.Value;
}
}
-
+
var envelope = new CloudEventEnvelope
{
Type = handlerName,
@@ -78,7 +80,7 @@ public async Task EnqueueAsync(
Schema = currentSchema.Name,
DataContentType = "application/json"
};
-
+
var jobInfo = new BackgroundJobInfo(jobId, handlerName, jobName)
{
ExpressionValue = schedule,
@@ -90,26 +92,38 @@ public async Task EnqueueAsync(
// Serialize envelope for scheduler
var payloadBytes = eventSerializer.Serialize(envelope);
- await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
+ await using var uow = await uowManager.BeginAsync(
+ options: new UnitOfWorkOptions { Scope = UnitOfWorkScopeOption.RequiresNew },
+ cancellationToken: cancellationToken);
try
{
// Save to job store
await jobStore.SaveAsync(jobInfo, cancellationToken);
-
- // Commit transaction
+ await uow.SaveChangesAsync(cancellationToken);
+ // Register scheduler to run AFTER commit is fully persisted to DB
+ // This prevents race condition where scheduler triggers before DB write completes
+ uow.OnCompleted(async _ =>
+ {
+ await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
+
+ logger.LogInformation(
+ "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
+ handlerName, jobName, jobId);
+ });
+
+ // Commit transaction - OnCompleted handlers run after this completes
await uow.CommitAsync(cancellationToken);
-
- // Schedule with configured scheduler (Dapr, Quartz, etc.)
- await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
-
- logger.LogInformation("Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
+
+ logger.LogInformation(
+ "Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);
return jobId;
}
catch (Exception ex)
{
- logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName);
+ logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName,
+ jobName);
await uow.RollbackAsync(cancellationToken);
throw;
}
@@ -155,7 +169,8 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can
// Reschedule with new schedule
var payloadBytes = eventSerializer.Serialize(envelope);
- await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, cancellationToken);
+ await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes,
+ cancellationToken);
// Save updated entity
await jobStore.SaveAsync(jobInfo, cancellationToken);
@@ -196,7 +211,8 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken
await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken);
// Update status to Cancelled
- await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, cancellationToken: cancellationToken);
+ await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow,
+ cancellationToken: cancellationToken);
// Commit transaction
await uow.CommitAsync(cancellationToken);
@@ -211,5 +227,4 @@ public async Task DeleteAsync(Guid id, CancellationToken cancellationToken
throw;
}
}
-}
-
+}
\ No newline at end of file
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
index 647f1ee..eb5b752 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
@@ -5,6 +5,7 @@
using BBT.Aether.Domain.Repositories;
using BBT.Aether.Events;
using BBT.Aether.MultiSchema;
+using BBT.Aether.Uow;
using Microsoft.Extensions.Logging;
namespace BBT.Aether.BackgroundJob.Dapr;
@@ -20,6 +21,7 @@ public sealed class DaprJobExecutionBridge(
IJobStore jobStore,
ICurrentSchema currentSchema,
IEventSerializer eventSerializer,
+ IUnitOfWorkManager unitOfWorkManager,
ILogger logger)
: IJobExecutionBridge
{
@@ -39,7 +41,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can
{
currentSchema.Set(envelope.Schema);
}
-
+
var argsBytes = eventSerializer.Serialize(envelope.Data);
argsPayload = new ReadOnlyMemory(argsBytes);
}
@@ -47,16 +49,27 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can
{
argsPayload = payload;
}
-
- var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
- if (jobInfo == null)
+
+ await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
+ try
+ {
+ var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
+ if (jobInfo == null)
+ {
+ logger.LogError("Job with name '{JobName}' not found in store", jobName);
+ throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
+ }
+
+ // Dispatch to handler using the handler name from job entity
+ await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
+
+ await uow.CommitAsync(cancellationToken);
+ }
+ catch (Exception)
{
- logger.LogError("Job with name '{JobName}' not found in store", jobName);
- throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
+ await uow.RollbackAsync(cancellationToken);
+ throw;
}
-
- // Dispatch to handler using the handler name from job entity
- await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
}
catch (Exception ex)
{
@@ -74,7 +87,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can
try
{
var envelope = eventSerializer.Deserialize(payload);
-
+
// Validate it's actually an envelope by checking required properties
if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
{
@@ -89,5 +102,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can
return null;
}
}
-}
-
+}
\ No newline at end of file
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
index 19b0a02..28c38c9 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
@@ -46,7 +46,6 @@ await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed,
return;
}
- await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
try
{
// Update status to Running
@@ -59,21 +58,18 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running,
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed,
clock.UtcNow, cancellationToken: cancellationToken);
- // Commit all changes together
- await uow.CommitAsync(cancellationToken);
-
logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
- await HandleJobCancellationAsync(uow, jobId, cancellationToken);
+ await HandleJobCancellationAsync(jobId, cancellationToken);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
- await HandleJobFailureAsync(uow, jobId, ex, cancellationToken);
+ await HandleJobFailureAsync(jobId, ex, cancellationToken);
throw;
}
}
@@ -133,25 +129,23 @@ private async Task UpdateStatusWithinUowAsync(
///
/// Handles job cancellation by updating status within the existing UoW.
///
- private async Task HandleJobCancellationAsync(IUnitOfWork uow, Guid jobId, CancellationToken cancellationToken)
+ private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
{
try
{
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled,
clock.UtcNow, "Job was cancelled", cancellationToken);
- await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Cancelled");
- await uow.RollbackAsync(cancellationToken);
}
}
///
/// Handles job failure by updating status within the existing UoW.
///
- private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception exception,
+ private async Task HandleJobFailureAsync(Guid jobId, Exception exception,
CancellationToken cancellationToken)
{
try
@@ -161,12 +155,10 @@ private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed,
clock.UtcNow, errorMessage, cancellationToken);
- await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Failed");
- await uow.RollbackAsync(cancellationToken);
}
}
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs
index 3665e4f..27e51c6 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Domain/EntityFrameworkCore/AetherDbContext.cs
@@ -10,6 +10,7 @@
using BBT.Aether.Domain.Entities;
using BBT.Aether.Domain.EntityFrameworkCore.Modeling;
using BBT.Aether.Events;
+using BBT.Aether.Uow;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;
@@ -21,6 +22,14 @@ public abstract class AetherDbContext(
: DbContext(options)
where TDbContext : DbContext
{
+ ///
+ /// Gets or sets the local transaction event enqueuer.
+ /// When set by EfCoreTransactionSource, events will be routed directly to the owning
+ /// local transaction instead of relying on ambient Unit of Work context.
+ /// This ensures events reach the correct UoW even when SaveChanges is called directly on DbContext.
+ ///
+ public ILocalTransactionEventEnqueuer? LocalEventEnqueuer { get; set; }
+
private readonly static MethodInfo ConfigureBasePropertiesMethodInfo
= typeof(AetherDbContext)
.GetMethod(
@@ -271,24 +280,35 @@ public void ClearDomainEvents()
}
///
- /// Publishes collected domain events to the sink (Unit of Work).
- /// Called automatically during SaveChanges to push events into the UoW transaction queue.
+ /// Publishes collected domain events to the appropriate destination.
+ /// Priority order:
+ /// 1. LocalEventEnqueuer (direct link to owning local transaction - most reliable)
+ /// 2. eventSink (ambient UoW fallback - for backward compatibility)
+ /// This ensures events reach the correct UoW regardless of how SaveChanges is called.
///
private void PublishDomainEventsToSink()
{
- if (eventSink is null)
+ var domainEvents = CollectDomainEvents();
+ if (domainEvents.Count == 0)
{
- // No sink configured - events will be collected explicitly by UoW (backward compatibility)
return;
}
- var domainEvents = CollectDomainEvents();
- if (domainEvents.Count == 0)
+ // Priority 1: Direct link to owning local transaction
+ // This is the most reliable path - events go directly to the transaction that owns this DbContext
+ if (LocalEventEnqueuer is not null)
{
+ LocalEventEnqueuer.EnqueueEvents(domainEvents);
+ ClearDomainEvents();
return;
}
- eventSink.EnqueueDomainEvents(domainEvents);
- ClearDomainEvents();
+ // Priority 2: Ambient UoW fallback (backward compatibility)
+ // Uses unitOfWorkManager.Current which may not always point to the correct UoW
+ if (eventSink is not null)
+ {
+ eventSink.EnqueueDomainEvents(domainEvents);
+ ClearDomainEvents();
+ }
}
}
\ No newline at end of file
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs
index 64652c3..c60b447 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/CompositeUnitOfWork.cs
@@ -192,6 +192,7 @@ public async Task CommitAsync(CancellationToken cancellationToken = default)
try
{
+ await SaveChangesAsync(cancellationToken);
var strategy = domainEventOptions?.DispatchStrategy ?? DomainEventDispatchStrategy.AlwaysUseOutbox;
if (strategy == DomainEventDispatchStrategy.AlwaysUseOutbox)
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs
index 5d40404..f0f8e74 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Uow/EntityFrameworkCore/EfCoreTransactionSource.cs
@@ -39,19 +39,26 @@ public async Task CreateTransactionAsync(
: await dbContext.Database.BeginTransactionAsync(cancellationToken);
}
- return new EfCoreLocalTransaction(dbContext, transaction);
+ var localTx = new EfCoreLocalTransaction(dbContext, transaction);
+
+ // CRITICAL: Establish direct link between DbContext and LocalTransaction
+ // This ensures events are routed to the correct UoW regardless of ambient context
+ dbContext.LocalEventEnqueuer = localTx;
+
+ return localTx;
}
///
/// Local transaction implementation for EF Core.
/// Supports lazy transaction escalation and domain event collection.
- /// Events are automatically pushed via IDomainEventSink during SaveChanges.
+ /// Events are pushed directly from DbContext via LocalEventEnqueuer during SaveChanges.
///
private sealed class EfCoreLocalTransaction(
AetherDbContext context,
IDbContextTransaction? transaction)
: ILocalTransaction, ITransactionalLocal, ISupportsSaveChanges, IAsyncDisposable, ILocalTransactionEventEnqueuer
{
+ private readonly AetherDbContext _context = context;
private IDbContextTransaction? _transaction = transaction;
private readonly List _collectedEvents = new();
@@ -72,8 +79,8 @@ public async Task EnsureTransactionAsync(IsolationLevel? isolationLevel = null,
// Begin transaction with specified or default isolation level
_transaction = isolationLevel.HasValue
- ? await context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken)
- : await context.Database.BeginTransactionAsync(cancellationToken);
+ ? await _context.Database.BeginTransactionAsync(isolationLevel.Value, cancellationToken)
+ : await _context.Database.BeginTransactionAsync(cancellationToken);
}
///
@@ -111,18 +118,21 @@ public async Task RollbackAsync(CancellationToken cancellationToken = default)
///
public void ClearCollectedEvents()
{
- context.ClearDomainEvents();
+ _context.ClearDomainEvents();
_collectedEvents.Clear();
}
public async Task SaveChangesAsync(CancellationToken cancellationToken = default)
{
- await context.SaveChangesAsync(cancellationToken);
+ await _context.SaveChangesAsync(cancellationToken);
}
///
public async ValueTask DisposeAsync()
{
+ // Clear the direct link between DbContext and this transaction
+ _context.LocalEventEnqueuer = null;
+
if (_transaction != null)
{
await _transaction.DisposeAsync();
@@ -131,16 +141,3 @@ public async ValueTask DisposeAsync()
}
}
}
-
-///
-/// Internal interface for local transactions that support event enqueueing.
-/// Allows the sink to push events into transactions.
-///
-internal interface ILocalTransactionEventEnqueuer
-{
- ///
- /// Enqueues events that were collected by DbContext during SaveChanges.
- ///
- /// The events to enqueue
- void EnqueueEvents(IEnumerable events);
-}