Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,7 @@ deploy/ssh-password.txt
deploy/github-api-key.txt
deploy/_run_all_log.txt
**/**/Logs/*.json


# AI documentation
ai-docs/
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public async override Task OnInvokeAsync(MethodInterceptionArgs args)
throw;
}
}

private UnitOfWorkOptions CreateOptions()
{
var options = new UnitOfWorkOptions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Collections.Generic;
using BBT.Aether.Events;

namespace BBT.Aether.Uow;

/// <summary>
/// Contract for a local transaction that can receive domain events pushed to it directly.
/// Allows a DbContext to hand its collected events straight to the local transaction that owns it,
/// so the events are routed to the correct Unit of Work regardless of which Unit of Work
/// happens to be ambient when SaveChanges runs.
/// </summary>
/// <remarks>
/// This is the preferred routing path: a DbContext that has an enqueuer attached uses it and
/// only falls back to its ambient event sink when no enqueuer is set.
/// </remarks>
public interface ILocalTransactionEventEnqueuer
{
/// <summary>
/// Enqueues domain events that were collected by a DbContext during SaveChanges.
/// Events pushed here are intended to be dispatched when the owning Unit of Work commits;
/// the actual dispatch timing is the implementer's responsibility.
/// </summary>
/// <param name="events">
/// The domain event envelopes to enqueue. The DbContext call site skips this call when it has
/// no events, so it is only invoked with a non-empty collection from there.
/// NOTE(review): handling of null or empty input by implementations is not specified here - confirm.
/// </param>
void EnqueueEvents(IEnumerable<DomainEventEnvelope> events);
}

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public sealed class BackgroundJobService(
: IBackgroundJobService
{
private const string Source = "urn:background-job";

/// <inheritdoc/>
public async Task<Guid> EnqueueAsync<TPayload>(
string handlerName,
Expand All @@ -54,12 +55,13 @@ public async Task<Guid> EnqueueAsync<TPayload>(
if (string.IsNullOrWhiteSpace(schedule))
throw new ArgumentNullException(nameof(schedule));

logger.LogInformation("Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'",
logger.LogInformation(
"Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'",
handlerName, jobName, schedule);

// Create job entity
var jobId = guidGenerator.Create();

// Convert metadata to nullable dictionary for ExtraPropertyDictionary
var extraProperties = new ExtraPropertyDictionary();
if (metadata != null)
Expand All @@ -69,7 +71,7 @@ public async Task<Guid> EnqueueAsync<TPayload>(
extraProperties[kvp.Key] = kvp.Value;
}
}

var envelope = new CloudEventEnvelope<TPayload>
{
Type = handlerName,
Expand All @@ -78,7 +80,7 @@ public async Task<Guid> EnqueueAsync<TPayload>(
Schema = currentSchema.Name,
DataContentType = "application/json"
};

var jobInfo = new BackgroundJobInfo(jobId, handlerName, jobName)
{
ExpressionValue = schedule,
Expand All @@ -90,26 +92,38 @@ public async Task<Guid> EnqueueAsync<TPayload>(
// Serialize envelope for scheduler
var payloadBytes = eventSerializer.Serialize(envelope);

await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
await using var uow = await uowManager.BeginAsync(
options: new UnitOfWorkOptions { Scope = UnitOfWorkScopeOption.RequiresNew },
cancellationToken: cancellationToken);
try
{
// Save to job store
await jobStore.SaveAsync(jobInfo, cancellationToken);

// Commit transaction
await uow.SaveChangesAsync(cancellationToken);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This call to uow.SaveChangesAsync() is redundant. The CompositeUnitOfWork.CommitAsync() method, which is called later, now includes a call to SaveChangesAsync() at the beginning of its execution. Removing this line will avoid the duplicate call without changing the behavior.

// Register scheduler to run AFTER commit is fully persisted to DB
// This prevents race condition where scheduler triggers before DB write completes
uow.OnCompleted(async _ =>
{
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);
Comment on lines +105 to +107
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Reconsider using the original CancellationToken in the OnCompleted callback.

If the original cancellationToken is canceled by the time OnCompleted runs (e.g., HTTP request aborted after the commit), ScheduleAsync and logging will be skipped even though the job was successfully persisted. That can leave jobs stored but never scheduled. Use CancellationToken.None (or a separate, clearly scoped token) for the post-commit scheduling/logging work, and reserve the original token for transactional operations only.


logger.LogInformation(
"Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);
});

// Commit transaction - OnCompleted handlers run after this completes
await uow.CommitAsync(cancellationToken);

// Schedule with configured scheduler (Dapr, Quartz, etc.)
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken);

logger.LogInformation("Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",

logger.LogInformation(
"Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);

return jobId;
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName);
logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName,
jobName);
await uow.RollbackAsync(cancellationToken);
throw;
}
Expand Down Expand Up @@ -155,7 +169,8 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can

// Reschedule with new schedule
var payloadBytes = eventSerializer.Serialize(envelope);
await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, cancellationToken);
await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes,
cancellationToken);

// Save updated entity
await jobStore.SaveAsync(jobInfo, cancellationToken);
Expand Down Expand Up @@ -196,7 +211,8 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken
await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken);

// Update status to Cancelled
await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, cancellationToken: cancellationToken);
await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow,
cancellationToken: cancellationToken);

// Commit transaction
await uow.CommitAsync(cancellationToken);
Expand All @@ -211,5 +227,4 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken
throw;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using BBT.Aether.Domain.Repositories;
using BBT.Aether.Events;
using BBT.Aether.MultiSchema;
using BBT.Aether.Uow;
using Microsoft.Extensions.Logging;

namespace BBT.Aether.BackgroundJob.Dapr;
Expand All @@ -20,6 +21,7 @@ public sealed class DaprJobExecutionBridge(
IJobStore jobStore,
ICurrentSchema currentSchema,
IEventSerializer eventSerializer,
IUnitOfWorkManager unitOfWorkManager,
ILogger<DaprJobExecutionBridge> logger)
: IJobExecutionBridge
{
Expand All @@ -39,24 +41,35 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can
{
currentSchema.Set(envelope.Schema);
}

var argsBytes = eventSerializer.Serialize(envelope.Data);
argsPayload = new ReadOnlyMemory<byte>(argsBytes);
}
else
{
argsPayload = payload;
}

var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
if (jobInfo == null)

await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
try
{
var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
if (jobInfo == null)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
}

Comment on lines +53 to +62
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Status updates on job failure/cancellation are likely being rolled back with the Unit of Work.

Because the bridge now owns the outer UoW, the failure/cancellation path ends up like this:

  • JobDispatcher.ExecuteAsync sets status to Running in its own RequiresNew UoW.
  • On handler failure/cancellation it calls HandleJobFailureAsync/HandleJobCancellationAsync, which now rely on the bridge’s ambient UoW.
  • The bridge then catches the exception and rolls back that UoW.

So the Failed/Cancelled updates are rolled back and the job remains Running (or older), unlike the previous implementation where those updates were committed in a local UoW.

I’d suggest either keeping failure/cancellation status updates in their own RequiresNew UoW, or adjusting the bridge so job metadata commits even when the dispatcher throws (e.g., separating handler side effects from job status persistence).

// Dispatch to handler using the handler name from job entity
await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);

await uow.CommitAsync(cancellationToken);
}
catch (Exception)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
await uow.RollbackAsync(cancellationToken);
throw;
}

// Dispatch to handler using the handler name from job entity
await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
}
catch (Exception ex)
{
Expand All @@ -74,7 +87,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can
try
{
var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);

// Validate it's actually an envelope by checking required properties
if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
{
Expand All @@ -89,5 +102,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can
return null;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed,
return;
}

await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
try
{
// Update status to Running
Expand All @@ -59,21 +58,18 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running,
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed,
clock.UtcNow, cancellationToken: cancellationToken);

// Commit all changes together
await uow.CommitAsync(cancellationToken);

logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
await HandleJobCancellationAsync(uow, jobId, cancellationToken);
await HandleJobCancellationAsync(jobId, cancellationToken);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
await HandleJobFailureAsync(uow, jobId, ex, cancellationToken);
await HandleJobFailureAsync(jobId, ex, cancellationToken);
throw;
}
}
Expand Down Expand Up @@ -133,25 +129,23 @@ private async Task UpdateStatusWithinUowAsync(
/// <summary>
/// Handles job cancellation by updating status within the existing UoW.
/// </summary>
private async Task HandleJobCancellationAsync(IUnitOfWork uow, Guid jobId, CancellationToken cancellationToken)
private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
{
try
{
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled,
clock.UtcNow, "Job was cancelled", cancellationToken);
await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Cancelled");
await uow.RollbackAsync(cancellationToken);
}
}
Comment on lines +132 to 143

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The status update to Cancelled in this method will be rolled back. The DispatchAsync method calls this from within a catch block, but the DaprJobExecutionBridge (the caller of DispatchAsync) will roll back the entire Unit of Work upon catching the exception. This means the status update is lost, and the job might remain in a Running state in the database.

To fix this, the status update should be performed in a new, separate transaction. You can achieve this by using the existing helper method UpdateStatusWithinUowAsync.

    private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
    {
        await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Cancelled, "Job was cancelled", cancellationToken);
    }


/// <summary>
/// Handles job failure by updating status within the existing UoW.
/// </summary>
private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception exception,
private async Task HandleJobFailureAsync(Guid jobId, Exception exception,
CancellationToken cancellationToken)
{
try
Expand All @@ -161,12 +155,10 @@ private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception

await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed,
clock.UtcNow, errorMessage, cancellationToken);
await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Failed");
await uow.RollbackAsync(cancellationToken);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using BBT.Aether.Domain.Entities;
using BBT.Aether.Domain.EntityFrameworkCore.Modeling;
using BBT.Aether.Events;
using BBT.Aether.Uow;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;

Expand All @@ -21,6 +22,14 @@ public abstract class AetherDbContext<TDbContext>(
: DbContext(options)
where TDbContext : DbContext
{
/// <summary>
/// Gets or sets the local transaction event enqueuer.
/// When set by EfCoreTransactionSource, events will be routed directly to the owning
/// local transaction instead of relying on ambient Unit of Work context.
/// This ensures events reach the correct UoW even when SaveChanges is called directly on DbContext.
/// </summary>
public ILocalTransactionEventEnqueuer? LocalEventEnqueuer { get; set; }

private readonly static MethodInfo ConfigureBasePropertiesMethodInfo
= typeof(AetherDbContext<TDbContext>)
.GetMethod(
Expand Down Expand Up @@ -271,24 +280,35 @@ public void ClearDomainEvents()
}

/// <summary>
/// Publishes collected domain events to the sink (Unit of Work).
/// Called automatically during SaveChanges to push events into the UoW transaction queue.
/// Publishes collected domain events to the appropriate destination.
/// Priority order:
/// 1. LocalEventEnqueuer (direct link to owning local transaction - most reliable)
/// 2. eventSink (ambient UoW fallback - for backward compatibility)
/// This ensures events reach the correct UoW regardless of how SaveChanges is called.
/// </summary>
private void PublishDomainEventsToSink()
{
if (eventSink is null)
var domainEvents = CollectDomainEvents();
if (domainEvents.Count == 0)
{
// No sink configured - events will be collected explicitly by UoW (backward compatibility)
return;
}

var domainEvents = CollectDomainEvents();
if (domainEvents.Count == 0)
// Priority 1: Direct link to owning local transaction
// This is the most reliable path - events go directly to the transaction that owns this DbContext
if (LocalEventEnqueuer is not null)
{
LocalEventEnqueuer.EnqueueEvents(domainEvents);
ClearDomainEvents();
return;
}

eventSink.EnqueueDomainEvents(domainEvents);
ClearDomainEvents();
// Priority 2: Ambient UoW fallback (backward compatibility)
// Uses unitOfWorkManager.Current which may not always point to the correct UoW
if (eventSink is not null)
{
eventSink.EnqueueDomainEvents(domainEvents);
ClearDomainEvents();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public async Task CommitAsync(CancellationToken cancellationToken = default)

try
{
await SaveChangesAsync(cancellationToken);
var strategy = domainEventOptions?.DispatchStrategy ?? DomainEventDispatchStrategy.AlwaysUseOutbox;

if (strategy == DomainEventDispatchStrategy.AlwaysUseOutbox)
Expand Down
Loading
Loading