-
Notifications
You must be signed in to change notification settings - Fork 0
Refactor job dispatching and envelope handling logic #30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ public interface IBackgroundJobInvoker | |
| /// <param name="cancellationToken">Cancellation token</param> | ||
| /// <returns>A task representing the asynchronous operation</returns> | ||
| Task InvokeAsync( | ||
| IServiceScopeFactory scopeFactory, | ||
| IServiceProvider scopeFactory, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| IEventSerializer eventSerializer, | ||
| ReadOnlyMemory<byte> payload, | ||
| CancellationToken cancellationToken); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,27 +1,24 @@ | ||
| using System; | ||
| using System.Text.Json; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using BBT.Aether.Domain.Repositories; | ||
| using BBT.Aether.Events; | ||
| using BBT.Aether.MultiSchema; | ||
| using BBT.Aether.Uow; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
||
| namespace BBT.Aether.BackgroundJob.Dapr; | ||
|
|
||
| /// <summary> | ||
| /// Dapr-specific implementation of IJobExecutionBridge. | ||
| /// Bridges Dapr's job execution callback to Aether's JobDispatcher. | ||
| /// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher with the handler name. | ||
| /// Extracts schema context from CloudEventEnvelope and sets it before job execution. | ||
| /// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher. | ||
| /// Extracts schema context from CloudEventEnvelope before jobStore access for multi-tenant support. | ||
| /// </summary> | ||
| public sealed class DaprJobExecutionBridge( | ||
| IServiceScopeFactory scopeFactory, | ||
| IJobDispatcher jobDispatcher, | ||
| IJobStore jobStore, | ||
| ICurrentSchema currentSchema, | ||
| IEventSerializer eventSerializer, | ||
| IUnitOfWorkManager unitOfWorkManager, | ||
| ILogger<DaprJobExecutionBridge> logger) | ||
| : IJobExecutionBridge | ||
| { | ||
|
|
@@ -32,74 +29,33 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can | |
|
|
||
| try | ||
| { | ||
| var envelope = TryParseEnvelope(payload.ToArray()); | ||
| ReadOnlyMemory<byte> argsPayload; | ||
| await using var scope = scopeFactory.CreateAsyncScope(); | ||
|
|
||
| if (envelope != null) | ||
| { | ||
| if (!string.IsNullOrWhiteSpace(envelope.Schema)) | ||
| { | ||
| currentSchema.Set(envelope.Schema); | ||
| } | ||
| // Parse envelope and set schema context before jobStore access (multi-tenant support) | ||
| var dataPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, payload, out var envelope); | ||
|
|
||
| var argsBytes = eventSerializer.Serialize(envelope.Data); | ||
| argsPayload = new ReadOnlyMemory<byte>(argsBytes); | ||
| } | ||
| else | ||
| if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema)) | ||
| { | ||
| argsPayload = payload; | ||
| var currentSchema = scope.ServiceProvider.GetRequiredService<ICurrentSchema>(); | ||
| currentSchema.Set(envelope.Schema); | ||
| } | ||
|
|
||
| await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken); | ||
| try | ||
| { | ||
| var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); | ||
| if (jobInfo == null) | ||
| { | ||
| logger.LogError("Job with name '{JobName}' not found in store", jobName); | ||
| throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); | ||
| } | ||
| var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>(); | ||
| var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); | ||
|
|
||
| // Dispatch to handler using the handler name from job entity | ||
| await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); | ||
|
|
||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception) | ||
| if (jobInfo == null) | ||
| { | ||
| await uow.RollbackAsync(cancellationToken); | ||
| throw; | ||
| logger.LogError("Job with name '{JobName}' not found in store", jobName); | ||
| return; | ||
| } | ||
|
Comment on lines
+46
to
50
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When a job is not found in the store, the method now logs an error and returns. This will likely result in a successful HTTP status code (e.g., 200 OK) being returned to Dapr. The previous implementation threw an exception, which correctly signaled a failure to Dapr and allowed its retry policies to engage. Silently returning on this critical error path could lead to jobs being lost without proper processing or retries. I recommend throwing an exception to signal failure to the calling infrastructure. if (jobInfo == null)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
} |
||
|
|
||
| // Dispatch to handler with extracted data payload | ||
|
Comment on lines
+43
to
+52
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question (bug_risk): Silently returning when a job is missing changes the contract vs. throwing; this may hide configuration or consistency issues. Previously, a missing job ( If upstream systems (e.g., Dapr, schedulers, monitoring) depend on failures to surface misconfiguration or race conditions (e.g., job deleted before execution), this behavior change may hide those issues. Please either keep throwing after logging or return an explicit failure signal if the integration supports it, rather than silently returning. |
||
| await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, dataPayload, cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to execute Dapr job '{JobName}' through execution bridge", jobName); | ||
| throw; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Attempts to parse the payload as a CloudEventEnvelope. | ||
| /// Returns null if the payload is not in envelope format (old format). | ||
| /// </summary> | ||
| private CloudEventEnvelope? TryParseEnvelope(byte[] payload) | ||
| { | ||
| try | ||
| { | ||
| var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload); | ||
|
|
||
| // Validate it's actually an envelope by checking required properties | ||
| if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type)) | ||
| { | ||
| return envelope; | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogDebug(ex, "Payload is not in CloudEventEnvelope format, treating as legacy format"); | ||
| return null; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ | |
| using BBT.Aether.Domain.Entities; | ||
| using BBT.Aether.Domain.Repositories; | ||
| using BBT.Aether.Events; | ||
| using BBT.Aether.MultiSchema; | ||
| using BBT.Aether.Uow; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Logging; | ||
|
|
@@ -14,8 +15,6 @@ namespace BBT.Aether.BackgroundJob; | |
| /// <inheritdoc /> | ||
| public class JobDispatcher( | ||
| IServiceScopeFactory scopeFactory, | ||
| IJobStore jobStore, | ||
| IUnitOfWorkManager uowManager, | ||
| BackgroundJobOptions options, | ||
| IClock clock, | ||
| IEventSerializer eventSerializer, | ||
|
|
@@ -34,51 +33,81 @@ public virtual async Task DispatchAsync( | |
|
|
||
| if (string.IsNullOrWhiteSpace(handlerName)) | ||
| throw new ArgumentNullException(nameof(handlerName)); | ||
|
|
||
| await using var scope = scopeFactory.CreateAsyncScope(); | ||
|
|
||
| if (await IsJobAlreadyProcessedAsync(jobId, handlerName, cancellationToken)) | ||
| return; | ||
|
|
||
| if (!options.Invokers.ContainsKey(handlerName)) | ||
| var argsPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, jobPayload, out var envelope); | ||
|
|
||
| if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema)) | ||
| { | ||
| logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName, jobId); | ||
| await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed, | ||
| "No handler found for handler type", cancellationToken); | ||
| return; | ||
| var currentSchema = scope.ServiceProvider.GetRequiredService<ICurrentSchema>(); | ||
| currentSchema.Set(envelope.Schema); | ||
| } | ||
|
|
||
| try | ||
|
|
||
| var uowManager = scope.ServiceProvider.GetRequiredService<IUnitOfWorkManager>(); | ||
| var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>(); | ||
|
|
||
| // First UoW: Check idempotency and update status to Running | ||
| await using (var uow = await uowManager.BeginRequiresNew(cancellationToken)) | ||
| { | ||
| if (await IsJobAlreadyProcessedAsync(jobStore, jobId, handlerName, cancellationToken)) | ||
| { | ||
| await uow.CommitAsync(cancellationToken); | ||
| return; | ||
| } | ||
|
|
||
| if (!options.Invokers.ContainsKey(handlerName)) | ||
| { | ||
| logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName, | ||
| jobId); | ||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, clock.UtcNow, | ||
| "No handler found for handler type", cancellationToken); | ||
| await uow.CommitAsync(cancellationToken); | ||
| return; | ||
| } | ||
|
|
||
| // Update status to Running | ||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running, | ||
| cancellationToken: cancellationToken); | ||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
|
|
||
| await InvokeHandlerAsync(handlerName, jobPayload, cancellationToken); | ||
| // Second UoW: Execute handler and mark as completed | ||
| try | ||
| { | ||
| await using var handlerUow = await uowManager.BeginRequiresNew(cancellationToken); | ||
|
|
||
| await InvokeHandlerAsync(scope.ServiceProvider, handlerName, argsPayload, cancellationToken); | ||
|
|
||
| // Update status to Completed | ||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, | ||
| clock.UtcNow, cancellationToken: cancellationToken); | ||
| logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, | ||
| jobId); | ||
|
|
||
| logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId); | ||
| await handlerUow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) | ||
| { | ||
| logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); | ||
| await HandleJobCancellationAsync(jobId, cancellationToken); | ||
| throw; | ||
| await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled, | ||
| "Job was cancelled", cancellationToken); | ||
|
Comment on lines
90
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (bug_risk): Cancellation and failure are no longer propagated to callers, which may break upstream retry/error flows. Previously, both If the intent is to make dispatch effectively fire-and-forget once status is updated, this needs to be an explicit contract change at this boundary. Otherwise, please rethrow from both catch blocks after |
||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId); | ||
| await HandleJobFailureAsync(jobId, ex, cancellationToken); | ||
| throw; | ||
| var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000); | ||
| await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed, | ||
| errorMessage, cancellationToken); | ||
| } | ||
|
Comment on lines
90
to
102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
"Job was cancelled", cancellationToken);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
errorMessage, cancellationToken);
throw;
} |
||
| } | ||
|
|
||
| /// <summary> | ||
| /// Checks if a job has already been processed (idempotency check). | ||
| /// Returns true if job is in Completed or Cancelled state. | ||
| /// </summary> | ||
| private async Task<bool> IsJobAlreadyProcessedAsync(Guid jobId, string handlerName, CancellationToken cancellationToken) | ||
| private async Task<bool> IsJobAlreadyProcessedAsync(IJobStore jobStore, Guid jobId, string handlerName, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| var jobInfo = await jobStore.GetAsync(jobId, cancellationToken); | ||
| if (jobInfo == null) | ||
|
|
@@ -95,7 +124,8 @@ private async Task<bool> IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa | |
|
|
||
| if (jobInfo.Status == BackgroundJobStatus.Cancelled) | ||
| { | ||
| logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).", | ||
| logger.LogWarning( | ||
| "Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).", | ||
| handlerName, jobId); | ||
| return true; | ||
| } | ||
|
|
@@ -104,61 +134,26 @@ private async Task<bool> IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa | |
| } | ||
|
|
||
| /// <summary> | ||
| /// Updates job status within a UoW transaction. | ||
| /// Helper method to reduce code duplication. | ||
| /// Marks job status within a separate UoW to ensure status update is persisted | ||
| /// even if the main transaction failed. | ||
| /// </summary> | ||
| private async Task UpdateStatusWithinUowAsync( | ||
| private async Task MarkJobStatusAsync( | ||
| IUnitOfWorkManager uowManager, | ||
| IJobStore jobStore, | ||
| Guid jobId, | ||
| BackgroundJobStatus status, | ||
| string? error = null, | ||
| CancellationToken cancellationToken = default) | ||
| { | ||
| await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); | ||
| try | ||
| { | ||
| await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, error, cancellationToken); | ||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to update job status to {Status}", status); | ||
| await uow.RollbackAsync(cancellationToken); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Handles job cancellation by updating status within the existing UoW. | ||
| /// </summary> | ||
| private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) | ||
| { | ||
| try | ||
| { | ||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled, | ||
| clock.UtcNow, "Job was cancelled", cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to update job status to Cancelled"); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Handles job failure by updating status within the existing UoW. | ||
| /// </summary> | ||
| private async Task HandleJobFailureAsync(Guid jobId, Exception exception, | ||
| string? errorMessage, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| try | ||
| { | ||
| var errorMessage = $"{exception.GetType().Name}: {exception.Message}"; | ||
| errorMessage = errorMessage.Truncate(4000); | ||
|
|
||
| await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, | ||
| clock.UtcNow, errorMessage, cancellationToken); | ||
| await using var uow = await uowManager.BeginRequiresNew(cancellationToken); | ||
| await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, errorMessage, cancellationToken); | ||
| await uow.CommitAsync(cancellationToken); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| logger.LogError(ex, "Failed to update job status to Failed"); | ||
| logger.LogError(ex, "Failed to mark job {JobId} as {Status}", jobId, status); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -167,7 +162,8 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, | |
| /// Generic type TArgs was closed at registration time (startup), not at runtime. | ||
| /// This method is completely type-safe and fast. | ||
| /// </summary> | ||
| private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory<byte> jobPayload, | ||
| private async Task InvokeHandlerAsync(IServiceProvider scopedProvider, string handlerName, | ||
| ReadOnlyMemory<byte> jobPayload, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| // Get pre-created invoker from options (generic already closed at startup) | ||
|
|
@@ -177,6 +173,6 @@ private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory<byte> j | |
| } | ||
|
|
||
| // Invoke handler - completely type-safe, no reflection at runtime | ||
| await invoker.InvokeAsync(scopeFactory, eventSerializer, jobPayload, cancellationToken); | ||
| await invoker.InvokeAsync(scopedProvider, eventSerializer, jobPayload, cancellationToken); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: The parameter name
scopeFactoryis misleading now that the type isIServiceProvider.In
BackgroundJobInvoker<TArgs>this parameter is already namedserviceProvider. Please align the name here with the implementation to reflect the actual type and avoid confusion for interface consumers during future changes.Suggested implementation:
IBackgroundJobInvoker.InvokeAsyncto rename the corresponding parameter fromscopeFactorytoserviceProvider.InvokeAsyncthat are using named arguments (if any) to useserviceProviderinstead ofscopeFactory.