diff --git a/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs b/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs
index e94309f..8c34254 100644
--- a/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs
+++ b/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs
@@ -22,7 +22,7 @@ public interface IBackgroundJobInvoker
/// Cancellation token
/// A task representing the asynchronous operation
Task InvokeAsync(
- IServiceScopeFactory scopeFactory,
+ IServiceProvider scopeFactory,
IEventSerializer eventSerializer,
ReadOnlyMemory payload,
CancellationToken cancellationToken);
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs
index 4dea794..7aebe3f 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs
@@ -15,14 +15,14 @@ namespace BBT.Aether.BackgroundJob;
internal sealed class BackgroundJobInvoker : IBackgroundJobInvoker
{
public async Task InvokeAsync(
- IServiceScopeFactory scopeFactory,
+ IServiceProvider serviceProvider,
IEventSerializer eventSerializer,
ReadOnlyMemory payload,
CancellationToken cancellationToken)
{
// Resolve dependencies from DI
- await using var scope = scopeFactory.CreateAsyncScope();
- var handler = scope.ServiceProvider.GetRequiredService>();
+ // await using var scope = scopeFactory.CreateAsyncScope();
+ var handler = serviceProvider.GetRequiredService>();
var args = eventSerializer.Deserialize(payload.Span);
if (args == null)
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
index eb5b752..d43ae19 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs
@@ -1,11 +1,10 @@
using System;
-using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using BBT.Aether.Domain.Repositories;
using BBT.Aether.Events;
using BBT.Aether.MultiSchema;
-using BBT.Aether.Uow;
+using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
namespace BBT.Aether.BackgroundJob.Dapr;
@@ -13,15 +12,13 @@ namespace BBT.Aether.BackgroundJob.Dapr;
///
/// Dapr-specific implementation of IJobExecutionBridge.
/// Bridges Dapr's job execution callback to Aether's JobDispatcher.
-/// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher with the handler name.
-/// Extracts schema context from CloudEventEnvelope and sets it before job execution.
+/// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher.
+/// Extracts schema context from CloudEventEnvelope before jobStore access for multi-tenant support.
///
public sealed class DaprJobExecutionBridge(
+ IServiceScopeFactory scopeFactory,
IJobDispatcher jobDispatcher,
- IJobStore jobStore,
- ICurrentSchema currentSchema,
IEventSerializer eventSerializer,
- IUnitOfWorkManager unitOfWorkManager,
ILogger logger)
: IJobExecutionBridge
{
@@ -32,44 +29,28 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can
try
{
- var envelope = TryParseEnvelope(payload.ToArray());
- ReadOnlyMemory argsPayload;
+ await using var scope = scopeFactory.CreateAsyncScope();
- if (envelope != null)
- {
- if (!string.IsNullOrWhiteSpace(envelope.Schema))
- {
- currentSchema.Set(envelope.Schema);
- }
+ // Parse envelope and set schema context before jobStore access (multi-tenant support)
+ var dataPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, payload, out var envelope);
- var argsBytes = eventSerializer.Serialize(envelope.Data);
- argsPayload = new ReadOnlyMemory(argsBytes);
- }
- else
+ if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema))
{
- argsPayload = payload;
+ var currentSchema = scope.ServiceProvider.GetRequiredService();
+ currentSchema.Set(envelope.Schema);
}
- await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken);
- try
- {
- var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
- if (jobInfo == null)
- {
- logger.LogError("Job with name '{JobName}' not found in store", jobName);
- throw new InvalidOperationException($"Job with name '{jobName}' not found in store.");
- }
+ var jobStore = scope.ServiceProvider.GetRequiredService();
+ var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken);
- // Dispatch to handler using the handler name from job entity
- await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken);
-
- await uow.CommitAsync(cancellationToken);
- }
- catch (Exception)
+ if (jobInfo == null)
{
- await uow.RollbackAsync(cancellationToken);
- throw;
+ logger.LogError("Job with name '{JobName}' not found in store", jobName);
+ return;
}
+
+ // Dispatch to handler with extracted data payload
+ await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, dataPayload, cancellationToken);
}
catch (Exception ex)
{
@@ -77,29 +58,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can
throw;
}
}
-
- ///
- /// Attempts to parse the payload as a CloudEventEnvelope.
- /// Returns null if the payload is not in envelope format (old format).
- ///
- private CloudEventEnvelope? TryParseEnvelope(byte[] payload)
- {
- try
- {
- var envelope = eventSerializer.Deserialize(payload);
-
- // Validate it's actually an envelope by checking required properties
- if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
- {
- return envelope;
- }
-
- return null;
- }
- catch (Exception ex)
- {
- logger.LogDebug(ex, "Payload is not in CloudEventEnvelope format, treating as legacy format");
- return null;
- }
- }
}
\ No newline at end of file
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
index 28c38c9..e3fa0db 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs
@@ -5,6 +5,7 @@
using BBT.Aether.Domain.Entities;
using BBT.Aether.Domain.Repositories;
using BBT.Aether.Events;
+using BBT.Aether.MultiSchema;
using BBT.Aether.Uow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
@@ -14,8 +15,6 @@ namespace BBT.Aether.BackgroundJob;
///
public class JobDispatcher(
IServiceScopeFactory scopeFactory,
- IJobStore jobStore,
- IUnitOfWorkManager uowManager,
BackgroundJobOptions options,
IClock clock,
IEventSerializer eventSerializer,
@@ -34,43 +33,72 @@ public virtual async Task DispatchAsync(
if (string.IsNullOrWhiteSpace(handlerName))
throw new ArgumentNullException(nameof(handlerName));
+
+ await using var scope = scopeFactory.CreateAsyncScope();
- if (await IsJobAlreadyProcessedAsync(jobId, handlerName, cancellationToken))
- return;
-
- if (!options.Invokers.ContainsKey(handlerName))
+ var argsPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, jobPayload, out var envelope);
+
+ if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema))
{
- logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName, jobId);
- await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed,
- "No handler found for handler type", cancellationToken);
- return;
+ var currentSchema = scope.ServiceProvider.GetRequiredService();
+ currentSchema.Set(envelope.Schema);
}
-
- try
+
+ var uowManager = scope.ServiceProvider.GetRequiredService();
+ var jobStore = scope.ServiceProvider.GetRequiredService();
+
+ // First UoW: Check idempotency and update status to Running
+ await using (var uow = await uowManager.BeginRequiresNew(cancellationToken))
{
+ if (await IsJobAlreadyProcessedAsync(jobStore, jobId, handlerName, cancellationToken))
+ {
+ await uow.CommitAsync(cancellationToken);
+ return;
+ }
+
+ if (!options.Invokers.ContainsKey(handlerName))
+ {
+ logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName,
+ jobId);
+ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, clock.UtcNow,
+ "No handler found for handler type", cancellationToken);
+ await uow.CommitAsync(cancellationToken);
+ return;
+ }
+
// Update status to Running
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running,
cancellationToken: cancellationToken);
+ await uow.CommitAsync(cancellationToken);
+ }
- await InvokeHandlerAsync(handlerName, jobPayload, cancellationToken);
+ // Second UoW: Execute handler and mark as completed
+ try
+ {
+ await using var handlerUow = await uowManager.BeginRequiresNew(cancellationToken);
+
+ await InvokeHandlerAsync(scope.ServiceProvider, handlerName, argsPayload, cancellationToken);
// Update status to Completed
await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed,
clock.UtcNow, cancellationToken: cancellationToken);
+ logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName,
+ jobId);
- logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId);
+ await handlerUow.CommitAsync(cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId);
- await HandleJobCancellationAsync(jobId, cancellationToken);
- throw;
+ await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled,
+ "Job was cancelled", cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId);
- await HandleJobFailureAsync(jobId, ex, cancellationToken);
- throw;
+ var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000);
+ await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed,
+ errorMessage, cancellationToken);
}
}
@@ -78,7 +106,8 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed,
/// Checks if a job has already been processed (idempotency check).
/// Returns true if job is in Completed or Cancelled state.
///
- private async Task IsJobAlreadyProcessedAsync(Guid jobId, string handlerName, CancellationToken cancellationToken)
+ private async Task IsJobAlreadyProcessedAsync(IJobStore jobStore, Guid jobId, string handlerName,
+ CancellationToken cancellationToken)
{
var jobInfo = await jobStore.GetAsync(jobId, cancellationToken);
if (jobInfo == null)
@@ -95,7 +124,8 @@ private async Task IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa
if (jobInfo.Status == BackgroundJobStatus.Cancelled)
{
- logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).",
+ logger.LogWarning(
+ "Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).",
handlerName, jobId);
return true;
}
@@ -104,61 +134,26 @@ private async Task IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa
}
///
- /// Updates job status within a UoW transaction.
- /// Helper method to reduce code duplication.
+ /// Marks job status within a separate UoW to ensure status update is persisted
+ /// even if the main transaction failed.
///
- private async Task UpdateStatusWithinUowAsync(
+ private async Task MarkJobStatusAsync(
+ IUnitOfWorkManager uowManager,
+ IJobStore jobStore,
Guid jobId,
BackgroundJobStatus status,
- string? error = null,
- CancellationToken cancellationToken = default)
- {
- await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken);
- try
- {
- await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, error, cancellationToken);
- await uow.CommitAsync(cancellationToken);
- }
- catch (Exception ex)
- {
- logger.LogError(ex, "Failed to update job status to {Status}", status);
- await uow.RollbackAsync(cancellationToken);
- }
- }
-
- ///
- /// Handles job cancellation by updating status within the existing UoW.
- ///
- private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken)
- {
- try
- {
- await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled,
- clock.UtcNow, "Job was cancelled", cancellationToken);
- }
- catch (Exception ex)
- {
- logger.LogError(ex, "Failed to update job status to Cancelled");
- }
- }
-
- ///
- /// Handles job failure by updating status within the existing UoW.
- ///
- private async Task HandleJobFailureAsync(Guid jobId, Exception exception,
+ string? errorMessage,
CancellationToken cancellationToken)
{
try
{
- var errorMessage = $"{exception.GetType().Name}: {exception.Message}";
- errorMessage = errorMessage.Truncate(4000);
-
- await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed,
- clock.UtcNow, errorMessage, cancellationToken);
+ await using var uow = await uowManager.BeginRequiresNew(cancellationToken);
+ await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, errorMessage, cancellationToken);
+ await uow.CommitAsync(cancellationToken);
}
catch (Exception ex)
{
- logger.LogError(ex, "Failed to update job status to Failed");
+ logger.LogError(ex, "Failed to mark job {JobId} as {Status}", jobId, status);
}
}
@@ -167,7 +162,8 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed,
/// Generic type TArgs was closed at registration time (startup), not at runtime.
/// This method is completely type-safe and fast.
///
- private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory jobPayload,
+ private async Task InvokeHandlerAsync(IServiceProvider scopedProvider, string handlerName,
+ ReadOnlyMemory jobPayload,
CancellationToken cancellationToken)
{
// Get pre-created invoker from options (generic already closed at startup)
@@ -177,6 +173,6 @@ private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory j
}
// Invoke handler - completely type-safe, no reflection at runtime
- await invoker.InvokeAsync(scopeFactory, eventSerializer, jobPayload, cancellationToken);
+ await invoker.InvokeAsync(scopedProvider, eventSerializer, jobPayload, cancellationToken);
}
}
\ No newline at end of file
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs
new file mode 100644
index 0000000..c1e2de7
--- /dev/null
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs
@@ -0,0 +1,62 @@
+using System;
+
+namespace BBT.Aether.Events;
+
+///
+/// Helper class for parsing and extracting data from CloudEventEnvelope.
+/// Provides common functionality used by job dispatchers and event processors.
+///
+internal static class CloudEventEnvelopeHelper
+{
+ ///
+ /// Attempts to parse the payload as a CloudEventEnvelope.
+ /// Returns null if the payload is not in envelope format (legacy format).
+ ///
+ /// The event serializer to use for deserialization.
+ /// The raw payload bytes to parse.
+ /// The parsed envelope or null if parsing fails or payload is not in envelope format.
+ public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload)
+ {
+ try
+ {
+ var envelope = eventSerializer.Deserialize(payload);
+
+ // Validate it's actually an envelope by checking required properties
+ if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type))
+ {
+ return envelope;
+ }
+
+ return null;
+ }
+ catch
+ {
+ return null;
+ }
+ }
+
+ ///
+ /// Extracts the data payload from a CloudEventEnvelope, serializing it back to bytes.
+ /// If payload is not in envelope format, returns the original payload.
+ ///
+ /// The event serializer to use.
+ /// The raw payload bytes.
+ /// Output: the parsed envelope if successful, null otherwise.
+ /// The data payload bytes (either from envelope.Data or original payload).
+ public static ReadOnlyMemory ExtractDataPayload(
+ IEventSerializer eventSerializer,
+ ReadOnlyMemory payload,
+ out CloudEventEnvelope? envelope)
+ {
+ envelope = TryParseEnvelope(eventSerializer, payload.ToArray());
+
+ if (envelope != null)
+ {
+ var argsBytes = eventSerializer.Serialize(envelope.Data);
+ return new ReadOnlyMemory(argsBytes);
+ }
+
+ return payload;
+ }
+}
+
diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
index 7a9d648..22d917d 100644
--- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
+++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs
@@ -125,7 +125,7 @@ private async Task ProcessSingleEventAsync(
// Begin a new UoW for handler execution + marking as processed
await using var handlerUow = await unitOfWorkManager.BeginRequiresNew(cancellationToken);
-
+
// Invoke the handler
await invoker.InvokeAsync(scopedServiceProvider, inboxMessage.EventData, cancellationToken);