-
Notifications
You must be signed in to change notification settings - Fork 0
Release v1.0 #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Release v1.0 #29
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| using System.Collections.Generic; | ||
| using BBT.Aether.Events; | ||
|
|
||
| namespace BBT.Aether.Uow; | ||
|
|
||
| /// <summary> | ||
| /// Interface for local transactions that support event enqueueing. | ||
| /// Allows DbContext to push events directly into its owning local transaction, | ||
| /// ensuring events are routed to the correct Unit of Work regardless of ambient context. | ||
| /// </summary> | ||
| public interface ILocalTransactionEventEnqueuer | ||
| { | ||
| /// <summary> | ||
| /// Enqueues domain events that were collected by DbContext during SaveChanges. | ||
| /// Events pushed here will be dispatched when the owning Unit of Work commits. | ||
| /// </summary> | ||
| /// <param name="events">The events to enqueue</param> | ||
| void EnqueueEvents(IEnumerable<DomainEventEnvelope> events); | ||
| } | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ public sealed class BackgroundJobService( | |
| : IBackgroundJobService | ||
| { | ||
| private const string Source = "urn:background-job"; | ||
|
|
||
| /// <inheritdoc/> | ||
| public async Task<Guid> EnqueueAsync<TPayload>( | ||
| string handlerName, | ||
|
|
@@ -54,12 +55,13 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| if (string.IsNullOrWhiteSpace(schedule)) | ||
| throw new ArgumentNullException(nameof(schedule)); | ||
|
|
||
| logger.LogInformation("Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", | ||
| logger.LogInformation( | ||
| "Enqueueing job handler '{HandlerName}' with job name '{JobName}' and schedule '{Schedule}'", | ||
| handlerName, jobName, schedule); | ||
|
|
||
| // Create job entity | ||
| var jobId = guidGenerator.Create(); | ||
|
|
||
| // Convert metadata to nullable dictionary for ExtraPropertyDictionary | ||
| var extraProperties = new ExtraPropertyDictionary(); | ||
| if (metadata != null) | ||
|
|
@@ -69,7 +71,7 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| extraProperties[kvp.Key] = kvp.Value; | ||
| } | ||
| } | ||
|
|
||
| var envelope = new CloudEventEnvelope<TPayload> | ||
| { | ||
| Type = handlerName, | ||
|
|
@@ -78,7 +80,7 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| Schema = currentSchema.Name, | ||
| DataContentType = "application/json" | ||
| }; | ||
|
|
||
| var jobInfo = new BackgroundJobInfo(jobId, handlerName, jobName) | ||
| { | ||
| ExpressionValue = schedule, | ||
|
|
@@ -90,26 +92,38 @@ public async Task<Guid> EnqueueAsync<TPayload>( | |
| // Serialize envelope for scheduler | ||
| var payloadBytes = eventSerializer.Serialize(envelope); | ||
|
|
||
| await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); | ||
| await using var uow = await uowManager.BeginAsync( | ||
| options: new UnitOfWorkOptions { Scope = UnitOfWorkScopeOption.RequiresNew }, | ||
| cancellationToken: cancellationToken); | ||
| try | ||
| { | ||
| // Save to job store | ||
| await jobStore.SaveAsync(jobInfo, cancellationToken); | ||
|
|
||
| // Commit transaction | ||
| await uow.SaveChangesAsync(cancellationToken); | ||
| // Register scheduler to run AFTER commit is fully persisted to DB | ||
| // This prevents race condition where scheduler triggers before DB write completes | ||
| uow.OnCompleted(async _ => | ||
| { | ||
| await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); | ||
|
Comment on lines
+105
to
+107
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Reconsider using the original CancellationToken in the OnCompleted callback. If the original |
||
|
|
||
| logger.LogInformation( | ||
| "Successfully scheduled job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", | ||
| handlerName, jobName, jobId); | ||
| }); | ||
|
|
||
| // Commit transaction - OnCompleted handlers run after this completes | ||
| await uow.CommitAsync(cancellationToken); | ||
|
|
||
| // Schedule with configured scheduler (Dapr, Quartz, etc.) | ||
| await jobScheduler.ScheduleAsync(handlerName, jobName, schedule, payloadBytes, cancellationToken); | ||
|
|
||
| logger.LogInformation("Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", | ||
|
|
||
| logger.LogInformation( | ||
| "Successfully enqueued job handler '{HandlerName}' with job name '{JobName}'. Entity ID: {EntityId}", | ||
| handlerName, jobName, jobId); | ||
|
|
||
| return jobId; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, jobName); | ||
| logger.LogError(ex, "Failed to enqueue job handler '{HandlerName}' with job name '{JobName}'", handlerName, | ||
| jobName); | ||
| await uow.RollbackAsync(cancellationToken); | ||
| throw; | ||
| } | ||
|
|
@@ -155,7 +169,8 @@ public async Task UpdateAsync(Guid id, string newSchedule, CancellationToken can | |
|
|
||
| // Reschedule with new schedule | ||
| var payloadBytes = eventSerializer.Serialize(envelope); | ||
| await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, cancellationToken); | ||
| await jobScheduler.ScheduleAsync(jobInfo.HandlerName, jobInfo.JobName, newSchedule, payloadBytes, | ||
| cancellationToken); | ||
|
|
||
| // Save updated entity | ||
| await jobStore.SaveAsync(jobInfo, cancellationToken); | ||
|
|
@@ -196,7 +211,8 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken | |
| await jobScheduler.DeleteAsync(jobInfo.HandlerName, jobInfo.JobName, cancellationToken); | ||
|
|
||
| // Update status to Cancelled | ||
| await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, cancellationToken: cancellationToken); | ||
| await jobStore.UpdateStatusAsync(id, BackgroundJobStatus.Cancelled, clock.UtcNow, | ||
| cancellationToken: cancellationToken); | ||
|
|
||
| // Commit transaction | ||
| await uow.CommitAsync(cancellationToken); | ||
|
|
@@ -211,5 +227,4 @@ public async Task<bool> DeleteAsync(Guid id, CancellationToken cancellationToken | |
| throw; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| using BBT.Aether.Domain.Repositories; | ||
| using BBT.Aether.Events; | ||
| using BBT.Aether.MultiSchema; | ||
| using BBT.Aether.Uow; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace BBT.Aether.BackgroundJob.Dapr; | ||
|
|
@@ -20,6 +21,7 @@ public sealed class DaprJobExecutionBridge( | |
| IJobStore jobStore, | ||
| ICurrentSchema currentSchema, | ||
| IEventSerializer eventSerializer, | ||
| IUnitOfWorkManager unitOfWorkManager, | ||
| ILogger<DaprJobExecutionBridge> logger) | ||
| : IJobExecutionBridge | ||
| { | ||
|
|
@@ -39,24 +41,35 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can | |
| { | ||
| currentSchema.Set(envelope.Schema); | ||
| } | ||
|
|
||
| var argsBytes = eventSerializer.Serialize(envelope.Data); | ||
| argsPayload = new ReadOnlyMemory<byte>(argsBytes); | ||
| } | ||
| else | ||
| { | ||
| argsPayload = payload; | ||
| } | ||
|
|
||
| var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); | ||
| if (jobInfo == null) | ||
|
|
||
| await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken); | ||
| try | ||
| { | ||
| var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); | ||
| if (jobInfo == null) | ||
| { | ||
| logger.LogError("Job with name '{JobName}' not found in store", jobName); | ||
| throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); | ||
| } | ||
|
|
||
|
Comment on lines
+53
to
+62
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Status updates on job failure/cancellation are likely being rolled back with the Unit of Work. Because the bridge now owns the outer UoW, the failure/cancellation path ends up like this:
So the I’d suggest either keeping failure/cancellation status updates in their own |
||
| // Dispatch to handler using the handler name from job entity | ||
| await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); | ||
|
|
||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception) | ||
| { | ||
| logger.LogError("Job with name '{JobName}' not found in store", jobName); | ||
| throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); | ||
| await uow.RollbackAsync(cancellationToken); | ||
| throw; | ||
| } | ||
|
|
||
| // Dispatch to handler using the handler name from job entity | ||
| await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
|
|
@@ -74,7 +87,7 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can | |
| try | ||
| { | ||
| var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload); | ||
|
|
||
| // Validate it's actually an envelope by checking required properties | ||
| if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type)) | ||
| { | ||
|
|
@@ -89,5 +102,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can | |
| return null; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,7 +46,6 @@ await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed, | |
| return; | ||
| } | ||
|
|
||
| await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); | ||
| try | ||
| { | ||
| // Update status to Running | ||
|
|
@@ -59,21 +58,18 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running, | |
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, | ||
| clock.UtcNow, cancellationToken: cancellationToken); | ||
|
|
||
| // Commit all changes together | ||
| await uow.CommitAsync(cancellationToken); | ||
|
|
||
| logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId); | ||
| } | ||
| catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
| { | ||
| logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); | ||
| await HandleJobCancellationAsync(uow, jobId, cancellationToken); | ||
| await HandleJobCancellationAsync(jobId, cancellationToken); | ||
| throw; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId); | ||
| await HandleJobFailureAsync(uow, jobId, ex, cancellationToken); | ||
| await HandleJobFailureAsync(jobId, ex, cancellationToken); | ||
| throw; | ||
| } | ||
| } | ||
|
|
@@ -133,25 +129,23 @@ private async Task UpdateStatusWithinUowAsync( | |
| /// <summary> | ||
| /// Handles job cancellation by updating status within the existing UoW. | ||
| /// </summary> | ||
| private async Task HandleJobCancellationAsync(IUnitOfWork uow, Guid jobId, CancellationToken cancellationToken) | ||
| private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) | ||
| { | ||
| try | ||
| { | ||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled, | ||
| clock.UtcNow, "Job was cancelled", cancellationToken); | ||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to update job status to Cancelled"); | ||
| await uow.RollbackAsync(cancellationToken); | ||
| } | ||
| } | ||
|
Comment on lines
+132
to
143
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The status update to To fix this, the status update should be performed in a new, separate transaction. You can achieve this by using the existing helper method private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
{
await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Cancelled, "Job was cancelled", cancellationToken);
} |
||
|
|
||
| /// <summary> | ||
| /// Handles job failure by updating status within the existing UoW. | ||
| /// </summary> | ||
| private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception exception, | ||
| private async Task HandleJobFailureAsync(Guid jobId, Exception exception, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| try | ||
|
|
@@ -161,12 +155,10 @@ private async Task HandleJobFailureAsync(IUnitOfWork uow, Guid jobId, Exception | |
|
|
||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, | ||
| clock.UtcNow, errorMessage, cancellationToken); | ||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to update job status to Failed"); | ||
| await uow.RollbackAsync(cancellationToken); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call to
uow.SaveChangesAsync()is redundant. TheCompositeUnitOfWork.CommitAsync()method, which is called later, now includes a call toSaveChangesAsync()at the beginning of its execution. Removing this line will avoid the duplicate call without changing the behavior.