Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,7 @@ deploy/ssh-password.txt
deploy/github-api-key.txt
deploy/_run_all_log.txt
**/**/Logs/*.json


# AI Documentations
ai-docs/
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public async override Task OnInvokeAsync(MethodInterceptionArgs args)
throw;
}
}

private UnitOfWorkOptions CreateOptions()
{
var options = new UnitOfWorkOptions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Collections.Generic;
using BBT.Aether.Events;

namespace BBT.Aether.Uow;

/// <summary>
/// Interface for local transactions that support event enqueueing.
/// Allows DbContext to push events directly into its owning local transaction,
/// ensuring events are routed to the correct Unit of Work regardless of ambient context.
/// </summary>
public interface ILocalTransactionEventEnqueuer
{
/// <summary>
/// Enqueues domain events that were collected by DbContext during SaveChanges.
/// Events pushed here will be dispatched when the owning Unit of Work commits.
/// </summary>
/// <param name="events">The events to enqueue</param>
void EnqueueEvents(IEnumerable<DomainEventEnvelope> events);
}

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public sealed class BackgroundJobService(
: IBackgroundJobService
{
private const string Source = "urn:background-job";

/// <inheritdoc/>
public async Task<Guid> EnqueueAsync<TPayload>(
string handlerName,
Expand All @@ -54,12 +55,13 @@ public async Task<Guid> EnqueueAsync<TPayload>(
if (string.IsNullOrWhiteSpace(schedule))
throw new ArgumentNullException(nameof(schedule));

logger.LogInformation("Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'",
logger.LogInformation(
"Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'",
handlerName, jobName, schedule);

// Create job entity
var jobId = guidGenerator.Create();

// Convert metadata to nullable dictionary for ExtraPropertyDictionary
var extraProperties = new ExtraPropertyDictionary();
if (metadata != null)
Expand All @@ -69,7 +71,7 @@ public async Task<Guid> EnqueueAsync<TPayload>(
extraProperties[kvp.Key] = kvp.Value;
}
}

var envelope = new CloudEventEnvelope<TPayload>
{
Type = handlerName,
Expand All @@ -78,7 +80,7 @@ public async Task<Guid> EnqueueAsync<TPayload>(
Schema = currentSchema.Name,
DataContentType = "application/json"
};

var jobInfo = new BackgroundJobInfo(jobId, handlerName, jobName)
{
ExpressionValue = schedule,
Expand All @@ -90,26 +92,38 @@ public async Task<Guid> EnqueueAsync<TPayload>(
// Serialize envelope for scheduler
var payloadBytes = eventSerializer.Serialize(envelope);

await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
await using var uow = await uowManager.BeginAsync(
options: new UnitOfWorkOptions { Scope = UnitOfWorkScopeOption.RequiresNew },
cancellationToken: cancellationToken);
try
{
// Save to job store
await jobStore.SaveAsync(jobInfo, cancellationToken);

// Commit transaction
await uow.SaveChangesAsync(cancellationToken);
// Register scheduler to run AFTER commit is fully persisted to DB
// This prevents race condition where scheduler triggers before DB write completes
uow.OnCompleted(async _ =>
{
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
Comment on lines +105 to +107
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Reusing the original cancellation token in OnCompleted can leave the job persisted but never scheduled.

Because the callback runs after the transaction commits, reusing the EnqueueAsync cancellationToken means a cancellation between commit and callback can prevent scheduling while the job is already persisted. Use CancellationToken.None (or a dedicated timeout token) in OnCompleted, and reserve the original token for operations up to and including SaveChangesAsync.


logger.LogInformation(
"Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);
});
Comment on lines +105 to +112

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The OnCompleted handler captures the cancellationToken from the EnqueueAsync method's scope. This could lead to an issue where if the original request is cancelled after the transaction has been committed but before the OnCompleted handler finishes, the job scheduling via jobScheduler.ScheduleAsync would also be cancelled. This would leave the system in an inconsistent state where the BackgroundJobInfo entity is persisted in the database, but the job is not actually scheduled to run. Since the OnCompleted handler executes after the commit is successful, its operation should be decoupled from the cancellation of the original request. Using CancellationToken.None will ensure the scheduling proceeds regardless of the original request's status post-commit, making the process more robust.

            uow.OnCompleted(async _ =>
            {
                // Use CancellationToken.None to ensure scheduling is not cancelled
                // by the original request's token after the UoW has committed.
                await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, CancellationToken.None);
                
                logger.LogInformation(
                    "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
                    handlerName, jobName, jobId);
            });


// Commit transaction - OnCompleted handlers run after this completes
await uow.CommitAsync(cancellationToken);

// Schedule with configured scheduler (Dapr, Quartz, etc.)
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);

logger.LogInformation("Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",

logger.LogInformation(
"Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);

return jobId;
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName);
logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName,
jobName);
await uow.RollbackAsync(cancellationToken);
throw;
}
Expand Down Expand Up @@ -155,7 +169,8 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can

// Reschedule with new schedule
var payloadBytes = eventSerializer.Serialize(envelope);
await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, cancellationToken);
await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes,
cancellationToken);

// Save updated entity
await jobStore.SaveAsync(jobInfo, cancellationToken);
Expand Down Expand Up @@ -196,7 +211,8 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken
await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken);

// Update status to Cancelled
await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, cancellationToken: cancellationToken);
await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow,
cancellationToken: cancellationToken);

// Commit transaction
await uow.CommitAsync(cancellationToken);
Expand All @@ -211,5 +227,4 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken
throw;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using BBT.Aether.Domain.Repositories;
using BBT.Aether.Events;
using BBT.Aether.MultiSchema;
using BBT.Aether.Uow;
using Microsoft.Extensions.Logging;

namespace BBT.Aether.BackgroundJob.Dapr;
Expand All @@ -20,6 +21,7 @@ public sealed class DaprJobExecutionBridge(
IJobStore jobStore,
ICurrentSchema currentSchema,
IEventSerializer eventSerializer,
IUnitOfWorkManager unitOfWorkManager,
ILogger<DaprJobExecutionBridge> logger)
: IJobExecutionBridge
{
Expand All @@ -39,24 +41,35 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can
{
currentSchema.Set(envelope.Schema);
}

var argsBytes = eventSerializer.Serialize(envelope.Data);
argsPayload = new ReadOnlyMemory<byte>(argsBytes);
}
else
{
argsPayload = payload;
}

var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
if (jobInfo == null)

await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
try
{
var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
if (jobInfo == null)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
}

// Dispatch to handler using the handler name from job entity
await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);

await uow.CommitAsync(cancellationToken);
}
catch (Exception)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
await uow.RollbackAsync(cancellationToken);
throw;
}

// Dispatch to handler using the handler name from job entity
await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
}
catch (Exception ex)
{
Expand All @@ -74,7 +87,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can
try
{
var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);

// Validate it's actually an envelope by checking required properties
if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
{
Expand All @@ -89,5 +102,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can
return null;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed,
return;
}

await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
try
{
// Update status to Running
Expand All @@ -59,21 +58,18 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running,
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed,
clock.UtcNow, cancellationToken: cancellationToken);

// Commit all changes together
await uow.CommitAsync(cancellationToken);

logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
await HandleJobCancellationAsync(uow, jobId, cancellationToken);
await HandleJobCancellationAsync(jobId, cancellationToken);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
await HandleJobFailureAsync(uow, jobId, ex, cancellationToken);
await HandleJobFailureAsync(jobId, ex, cancellationToken);
throw;
}
}
Expand Down Expand Up @@ -133,25 +129,23 @@ private async Task UpdateStatusWithinUowAsync(
/// <summary>
/// Handles job cancellation by updating status within the existing UoW.
/// </summary>
private async Task HandleJobCancellationAsync(IUnitOfWork uow, Guid jobId, CancellationToken cancellationToken)
private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
{
try
{
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled,
clock.UtcNow, "Job was cancelled", cancellationToken);
await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Cancelled");
await uow.RollbackAsync(cancellationToken);
}
}

/// <summary>
/// Handles job failure by updating status within the existing UoW.
/// </summary>
private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception exception,
private async Task HandleJobFailureAsync(Guid jobId, Exception exception,
CancellationToken cancellationToken)
{
try
Expand All @@ -161,12 +155,10 @@ private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception

await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed,
clock.UtcNow, errorMessage, cancellationToken);
await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Failed");
await uow.RollbackAsync(cancellationToken);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using BBT.Aether.Domain.Entities;
using BBT.Aether.Domain.EntityFrameworkCore.Modeling;
using BBT.Aether.Events;
using BBT.Aether.Uow;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;

Expand All @@ -21,6 +22,14 @@ public abstract class AetherDbContext<TDbContext>(
: DbContext(options)
where TDbContext : DbContext
{
/// <summary>
/// Gets or sets the local transaction event enqueuer.
/// When set by EfCoreTransactionSource, events will be routed directly to the owning
/// local transaction instead of relying on ambient Unit of Work context.
/// This ensures events reach the correct UoW even when SaveChanges is called directly on DbContext.
/// </summary>
public ILocalTransactionEventEnqueuer? LocalEventEnqueuer { get; set; }

private readonly static MethodInfo ConfigureBasePropertiesMethodInfo
= typeof(AetherDbContext<TDbContext>)
.GetMethod(
Expand Down Expand Up @@ -271,24 +280,35 @@ public void ClearDomainEvents()
}

/// <summary>
/// Publishes collected domain events to the sink (Unit of Work).
/// Called automatically during SaveChanges to push events into the UoW transaction queue.
/// Publishes collected domain events to the appropriate destination.
/// Priority order:
/// 1. LocalEventEnqueuer (direct link to owning local transaction - most reliable)
/// 2. eventSink (ambient UoW fallback - for backward compatibility)
/// This ensures events reach the correct UoW regardless of how SaveChanges is called.
/// </summary>
private void PublishDomainEventsToSink()
{
if (eventSink is null)
var domainEvents = CollectDomainEvents();
if (domainEvents.Count == 0)
{
// No sink configured - events will be collected explicitly by UoW (backward compatibility)
return;
}

var domainEvents = CollectDomainEvents();
if (domainEvents.Count == 0)
// Priority 1: Direct link to owning local transaction
// This is the most reliable path - events go directly to the transaction that owns this DbContext
if (LocalEventEnqueuer is not null)
{
LocalEventEnqueuer.EnqueueEvents(domainEvents);
ClearDomainEvents();
return;
}

eventSink.EnqueueDomainEvents(domainEvents);
ClearDomainEvents();
// Priority 2: Ambient UoW fallback (backward compatibility)
// Uses unitOfWorkManager.Current which may not always point to the correct UoW
if (eventSink is not null)
{
eventSink.EnqueueDomainEvents(domainEvents);
ClearDomainEvents();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public async Task CommitAsync(CancellationToken cancellationToken = default)

try
{
await SaveChangesAsync(cancellationToken);
var strategy = domainEventOptions?.DispatchStrategy ?? DomainEventDispatchStrategy.AlwaysUseOutbox;

if (strategy == DomainEventDispatchStrategy.AlwaysUseOutbox)
Expand Down
Loading