-
Notifications
You must be signed in to change notification settings - Fork 0
Improve event routing and transaction handling in UoW and background … #28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| using System.Collections.Generic; | ||
| using BBT.Aether.Events; | ||
|
|
||
| namespace BBT.Aether.Uow; | ||
|
|
||
| /// <summary> | ||
| /// Interface for local transactions that support event enqueueing. | ||
| /// Allows DbContext to push events directly into its owning local transaction, | ||
| /// ensuring events are routed to the correct Unit of Work regardless of ambient context. | ||
| /// </summary> | ||
| public interface ILocalTransactionEventEnqueuer | ||
| { | ||
| /// <summary> | ||
| /// Enqueues domain events that were collected by DbContext during SaveChanges. | ||
| /// Events pushed here will be dispatched when the owning Unit of Work commits. | ||
| /// </summary> | ||
| /// <param name="events">The events to enqueue</param> | ||
| void EnqueueEvents(IEnumerable<DomainEventEnvelope> events); | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ public sealed class BackgroundJobService( | |
| : IBackgroundJobService | ||
| { | ||
| private const string Source = "urn:background-job"; | ||
|
|
||
| /// <inheritdoc/> | ||
| public async Task<Guid> EnqueueAsync<TPayload>( | ||
| string handlerName, | ||
|
|
@@ -54,12 +55,13 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| if (string.IsNullOrWhiteSpace(schedule)) | ||
| throw new ArgumentNullException(nameof(schedule)); | ||
|
|
||
| logger.LogInformation("Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", | ||
| logger.LogInformation( | ||
| "Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", | ||
| handlerName, jobName, schedule); | ||
|
|
||
| // Create job entity | ||
| var jobId = guidGenerator.Create(); | ||
|
|
||
| // Convert metadata to nullable dictionary for ExtraPropertyDictionary | ||
| var extraProperties = new ExtraPropertyDictionary(); | ||
| if (metadata != null) | ||
|
|
@@ -69,7 +71,7 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| extraProperties[kvp.Key] = kvp.Value; | ||
| } | ||
| } | ||
|
|
||
| var envelope = new CloudEventEnvelope<TPayload> | ||
| { | ||
| Type = handlerName, | ||
|
|
@@ -78,7 +80,7 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| Schema = currentSchema.Name, | ||
| DataContentType = "application/json" | ||
| }; | ||
|
|
||
| var jobInfo = new BackgroundJobInfo(jobId, handlerName, jobName) | ||
| { | ||
| ExpressionValue = schedule, | ||
|
|
@@ -90,26 +92,38 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| // Serialize envelope for scheduler | ||
| var payloadBytes = eventSerializer.Serialize(envelope); | ||
|
|
||
| await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); | ||
| await using var uow = await uowManager.BeginAsync( | ||
| options: new UnitOfWorkOptions { Scope = UnitOfWorkScopeOption.RequiresNew }, | ||
| cancellationToken: cancellationToken); | ||
| try | ||
| { | ||
| // Save to job store | ||
| await jobStore.SaveAsync(jobInfo, cancellationToken); | ||
|
|
||
| // Commit transaction | ||
| await uow.SaveChangesAsync(cancellationToken); | ||
| // Register scheduler to run AFTER commit is fully persisted to DB | ||
| // This prevents race condition where scheduler triggers before DB write completes | ||
| uow.OnCompleted(async _ => | ||
| { | ||
| await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); | ||
|
|
||
| logger.LogInformation( | ||
| "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", | ||
| handlerName, jobName, jobId); | ||
| }); | ||
|
Comment on lines
+105
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The uow.OnCompleted(async _ =>
{
// Use CancellationToken.None to ensure scheduling is not cancelled
// by the original request's token after the UoW has committed.
await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, CancellationToken.None);
logger.LogInformation(
"Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}",
handlerName, jobName, jobId);
}); |
||
|
|
||
| // Commit transaction - OnCompleted handlers run after this completes | ||
| await uow.CommitAsync(cancellationToken); | ||
|
|
||
| // Schedule with configured scheduler (Dapr, Quartz, etc.) | ||
| await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); | ||
|
|
||
| logger.LogInformation("Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", | ||
|
|
||
| logger.LogInformation( | ||
| "Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", | ||
| handlerName, jobName, jobId); | ||
|
|
||
| return jobId; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); | ||
| logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, | ||
| jobName); | ||
| await uow.RollbackAsync(cancellationToken); | ||
| throw; | ||
| } | ||
|
|
@@ -155,7 +169,8 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can | |
|
|
||
| // Reschedule with new schedule | ||
| var payloadBytes = eventSerializer.Serialize(envelope); | ||
| await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, cancellationToken); | ||
| await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, | ||
| cancellationToken); | ||
|
|
||
| // Save updated entity | ||
| await jobStore.SaveAsync(jobInfo, cancellationToken); | ||
|
|
@@ -196,7 +211,8 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken | |
| await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken); | ||
|
|
||
| // Update status to Cancelled | ||
| await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, cancellationToken: cancellationToken); | ||
| await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, | ||
| cancellationToken: cancellationToken); | ||
|
|
||
| // Commit transaction | ||
| await uow.CommitAsync(cancellationToken); | ||
|
|
@@ -211,5 +227,4 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken | |
| throw; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): Reusing the original cancellation token in OnCompleted can leave the job persisted but never scheduled.
Because the callback runs after the transaction commits, reusing the
EnqueueAsynccancellationTokenmeans a cancellation between commit and callback can prevent scheduling while the job is already persisted. UseCancellationToken.None(or a dedicated timeout token) inOnCompleted, and reserve the original token for operations up to and includingSaveChangesAsync.