Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface IBackgroundJobInvoker
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>A task representing the asynchronous operation</returns>
Task InvokeAsync(
IServiceScopeFactory scopeFactory,
IServiceProvider scopeFactory,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: The parameter name scopeFactory is misleading now that the type is IServiceProvider.

In BackgroundJobInvoker<TArgs> this parameter is already named serviceProvider. Please align the name here with the implementation to reflect the actual type and avoid confusion for interface consumers during future changes.

Suggested implementation:

    /// <param name="cancellationToken">Cancellation token</param>
    /// <returns>A task representing the asynchronous operation</returns>
    Task InvokeAsync(
        IServiceProvider serviceProvider,
        IEventSerializer eventSerializer,
        ReadOnlyMemory<byte> payload,
        CancellationToken cancellationToken);
  1. Update all implementations of IBackgroundJobInvoker.InvokeAsync to rename the corresponding parameter from scopeFactory to serviceProvider.
  2. Update all call sites of InvokeAsync that are using named arguments (if any) to use serviceProvider instead of scopeFactory.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The parameter scopeFactory had its type changed from IServiceScopeFactory to IServiceProvider, but its name was not updated. This is misleading and doesn't reflect the new type. Renaming it to serviceProvider would improve clarity.

        IServiceProvider serviceProvider,

IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload,
CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ namespace BBT.Aether.BackgroundJob;
internal sealed class BackgroundJobInvoker<TArgs> : IBackgroundJobInvoker
{
public async Task InvokeAsync(
IServiceScopeFactory scopeFactory,
IServiceProvider serviceProvider,
IEventSerializer eventSerializer,
ReadOnlyMemory<byte> payload,
CancellationToken cancellationToken)
{
// Resolve dependencies from DI
await using var scope = scopeFactory.CreateAsyncScope();
var handler = scope.ServiceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();
// await using var scope = scopeFactory.CreateAsyncScope();
var handler = serviceProvider.GetRequiredService<IBackgroundJobHandler<TArgs>>();

var args = eventSerializer.Deserialize<TArgs>(payload.Span);
if (args == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using BBT.Aether.Domain.Repositories;
using BBT.Aether.Events;
using BBT.Aether.MultiSchema;
using BBT.Aether.Uow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace BBT.Aether.BackgroundJob.Dapr;

/// <summary>
/// Dapr-specific implementation of IJobExecutionBridge.
/// Bridges Dapr's job execution callback to Aether's JobDispatcher.
/// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher with the handler name.
/// Extracts schema context from CloudEventEnvelope and sets it before job execution.
/// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher.
/// Extracts schema context from CloudEventEnvelope before jobStore access for multi-tenant support.
/// </summary>
public sealed class DaprJobExecutionBridge(
IServiceScopeFactory scopeFactory,
IJobDispatcher jobDispatcher,
IJobStore jobStore,
ICurrentSchema currentSchema,
IEventSerializer eventSerializer,
IUnitOfWorkManager unitOfWorkManager,
ILogger<DaprJobExecutionBridge> logger)
: IJobExecutionBridge
{
Expand All @@ -32,74 +29,33 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory<byte> payload, Can

try
{
var envelope = TryParseEnvelope(payload.ToArray());
ReadOnlyMemory<byte> argsPayload;
await using var scope = scopeFactory.CreateAsyncScope();

if (envelope != null)
{
if (!string.IsNullOrWhiteSpace(envelope.Schema))
{
currentSchema.Set(envelope.Schema);
}
// Parse envelope and set schema context before jobStore access (multi-tenant support)
var dataPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, payload, out var envelope);

var argsBytes = eventSerializer.Serialize(envelope.Data);
argsPayload = new ReadOnlyMemory<byte>(argsBytes);
}
else
if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema))
{
argsPayload = payload;
var currentSchema = scope.ServiceProvider.GetRequiredService<ICurrentSchema>();
currentSchema.Set(envelope.Schema);
}

await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
try
{
var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
if (jobInfo == null)
{
logger.LogError("Job with name '{JobName}' not found in store", jobName);
throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
}
var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>();
var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);

// Dispatch to handler using the handler name from job entity
await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);

await uow.CommitAsync(cancellationToken);
}
catch (Exception)
if (jobInfo == null)
{
await uow.RollbackAsync(cancellationToken);
throw;
logger.LogError("Job with name '{JobName}' not found in store", jobName);
return;
}
Comment on lines +46 to 50

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When a job is not found in the store, the method now logs an error and returns. This will likely result in a successful HTTP status code (e.g., 200 OK) being returned to Dapr. The previous implementation threw an exception, which correctly signaled a failure to Dapr and allowed its retry policies to engage. Silently returning on this critical error path could lead to jobs being lost without proper processing or retries. I recommend throwing an exception to signal failure to the calling infrastructure.

            if (jobInfo == null)
            {
                logger.LogError("Job with name '{JobName}' not found in store", jobName);
                throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
            }


// Dispatch to handler with extracted data payload
Comment on lines +43 to +52
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question (bug_risk): Silently returning when a job is missing changes the contract vs. throwing; this may hide configuration or consistency issues.

Previously, a missing job (GetByJobNameAsync returns null) caused an InvalidOperationException after logging; now it only logs and returns, so callers of ExecuteAsync will see this as success unless they parse logs.

If upstream systems (e.g., Dapr, schedulers, monitoring) depend on failures to surface misconfiguration or race conditions (e.g., job deleted before execution), this behavior change may hide those issues. Please either keep throwing after logging or return an explicit failure signal if the integration supports it, rather than silently returning.

await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, dataPayload, cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to execute Dapr job '{JobName}' through execution bridge", jobName);
throw;
}
}

/// <summary>
/// Attempts to parse the payload as a CloudEventEnvelope.
/// Returns null if the payload is not in envelope format (old format).
/// </summary>
private CloudEventEnvelope? TryParseEnvelope(byte[] payload)
{
try
{
var envelope = eventSerializer.Deserialize<CloudEventEnvelope>(payload);

// Validate it's actually an envelope by checking required properties
if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
{
return envelope;
}

return null;
}
catch (Exception ex)
{
logger.LogDebug(ex, "Payload is not in CloudEventEnvelope format, treating as legacy format");
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using BBT.Aether.Domain.Entities;
using BBT.Aether.Domain.Repositories;
using BBT.Aether.Events;
using BBT.Aether.MultiSchema;
using BBT.Aether.Uow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand All @@ -14,8 +15,6 @@ namespace BBT.Aether.BackgroundJob;
/// <inheritdoc />
public class JobDispatcher(
IServiceScopeFactory scopeFactory,
IJobStore jobStore,
IUnitOfWorkManager uowManager,
BackgroundJobOptions options,
IClock clock,
IEventSerializer eventSerializer,
Expand All @@ -34,51 +33,81 @@ public virtual async Task DispatchAsync(

if (string.IsNullOrWhiteSpace(handlerName))
throw new ArgumentNullException(nameof(handlerName));

await using var scope = scopeFactory.CreateAsyncScope();

if (await IsJobAlreadyProcessedAsync(jobId, handlerName, cancellationToken))
return;

if (!options.Invokers.ContainsKey(handlerName))
var argsPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, jobPayload, out var envelope);

if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema))
{
logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName, jobId);
await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed,
"No handler found for handler type", cancellationToken);
return;
var currentSchema = scope.ServiceProvider.GetRequiredService<ICurrentSchema>();
currentSchema.Set(envelope.Schema);
}

try

var uowManager = scope.ServiceProvider.GetRequiredService<IUnitOfWorkManager>();
var jobStore = scope.ServiceProvider.GetRequiredService<IJobStore>();

// First UoW: Check idempotency and update status to Running
await using (var uow = await uowManager.BeginRequiresNew(cancellationToken))
{
if (await IsJobAlreadyProcessedAsync(jobStore, jobId, handlerName, cancellationToken))
{
await uow.CommitAsync(cancellationToken);
return;
}

if (!options.Invokers.ContainsKey(handlerName))
{
logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName,
jobId);
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, clock.UtcNow,
"No handler found for handler type", cancellationToken);
await uow.CommitAsync(cancellationToken);
return;
}

// Update status to Running
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running,
cancellationToken: cancellationToken);
await uow.CommitAsync(cancellationToken);
}

await InvokeHandlerAsync(handlerName, jobPayload, cancellationToken);
// Second UoW: Execute handler and mark as completed
try
{
await using var handlerUow = await uowManager.BeginRequiresNew(cancellationToken);

await InvokeHandlerAsync(scope.ServiceProvider, handlerName, argsPayload, cancellationToken);

// Update status to Completed
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed,
clock.UtcNow, cancellationToken: cancellationToken);
logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName,
jobId);

logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
await handlerUow.CommitAsync(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
await HandleJobCancellationAsync(jobId, cancellationToken);
throw;
await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
"Job was cancelled", cancellationToken);
Comment on lines 90 to +94
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Cancellation and failure are no longer propagated to callers, which may break upstream retry/error flows.

Previously, both OperationCanceledException and general Exception were rethrown after updating job status; now both catch blocks swallow the exception, so DispatchAsync appears to succeed even on cancellation or failure. Callers (e.g., Dapr bridge, schedulers, orchestration) that rely on exceptions to drive retries/DLQ, compensating actions, or failure metrics will no longer see these signals.

If the intent is to make dispatch effectively fire-and-forget once status is updated, this needs to be an explicit contract change at this boundary. Otherwise, please rethrow from both catch blocks after MarkJobStatusAsync so callers still observe the failure/cancellation while status is persisted.

}
catch (Exception ex)
{
logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
await HandleJobFailureAsync(jobId, ex, cancellationToken);
throw;
var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
errorMessage, cancellationToken);
}
Comment on lines 90 to 102

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The catch blocks now handle exceptions by updating the job status but no longer re-throw the exception. This prevents the caller (e.g., DaprJobExecutionBridge) from knowing that the job execution failed. As a result, a success status will likely be returned to the job scheduler (like Dapr), which can prevent its built-in retry mechanisms from working correctly. To ensure the calling infrastructure is aware of the failure, the exception should be re-thrown after the job status has been marked as failed or cancelled.

        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
            await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
                "Job was cancelled", cancellationToken);
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
            var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
            await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
                errorMessage, cancellationToken);
            throw;
        }

}

/// <summary>
/// Checks if a job has already been processed (idempotency check).
/// Returns true if job is in Completed or Cancelled state.
/// </summary>
private async Task<bool> IsJobAlreadyProcessedAsync(Guid jobId, string handlerName, CancellationToken cancellationToken)
private async Task<bool> IsJobAlreadyProcessedAsync(IJobStore jobStore, Guid jobId, string handlerName,
CancellationToken cancellationToken)
{
var jobInfo = await jobStore.GetAsync(jobId, cancellationToken);
if (jobInfo == null)
Expand All @@ -95,7 +124,8 @@ private async Task<bool> IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa

if (jobInfo.Status == BackgroundJobStatus.Cancelled)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).",
logger.LogWarning(
"Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).",
handlerName, jobId);
return true;
}
Expand All @@ -104,61 +134,26 @@ private async Task<bool> IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa
}

/// <summary>
/// Updates job status within a UoW transaction.
/// Helper method to reduce code duplication.
/// Marks job status within a separate UoW to ensure status update is persisted
/// even if the main transaction failed.
/// </summary>
private async Task UpdateStatusWithinUowAsync(
private async Task MarkJobStatusAsync(
IUnitOfWorkManager uowManager,
IJobStore jobStore,
Guid jobId,
BackgroundJobStatus status,
string? error = null,
CancellationToken cancellationToken = default)
{
await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
try
{
await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, error, cancellationToken);
await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to {Status}", status);
await uow.RollbackAsync(cancellationToken);
}
}

/// <summary>
/// Handles job cancellation by updating status within the existing UoW.
/// </summary>
private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
{
try
{
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled,
clock.UtcNow, "Job was cancelled", cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Cancelled");
}
}

/// <summary>
/// Handles job failure by updating status within the existing UoW.
/// </summary>
private async Task HandleJobFailureAsync(Guid jobId, Exception exception,
string? errorMessage,
CancellationToken cancellationToken)
{
try
{
var errorMessage = $"{exception.GetType().Name}: {exception.Message}";
errorMessage = errorMessage.Truncate(4000);

await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed,
clock.UtcNow, errorMessage, cancellationToken);
await using var uow = await uowManager.BeginRequiresNew(cancellationToken);
await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, errorMessage, cancellationToken);
await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to update job status to Failed");
logger.LogError(ex, "Failed to mark job {JobId} as {Status}", jobId, status);
}
}

Expand All @@ -167,7 +162,8 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed,
/// Generic type TArgs was closed at registration time (startup), not at runtime.
/// This method is completely type-safe and fast.
/// </summary>
private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory<byte> jobPayload,
private async Task InvokeHandlerAsync(IServiceProvider scopedProvider, string handlerName,
ReadOnlyMemory<byte> jobPayload,
CancellationToken cancellationToken)
{
// Get pre-created invoker from options (generic already closed at startup)
Expand All @@ -177,6 +173,6 @@ private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory<byte> j
}

// Invoke handler - completely type-safe, no reflection at runtime
await invoker.InvokeAsync(scopeFactory, eventSerializer, jobPayload, cancellationToken);
await invoker.InvokeAsync(scopedProvider, eventSerializer, jobPayload, cancellationToken);
}
}
Loading