diff --git a/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs b/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs index e94309f..8c34254 100644 --- a/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs +++ b/framework/src/BBT.Aether.Core/BBT/Aether/BackgroundJob/IBackgroundJobInvoker.cs @@ -22,7 +22,7 @@ public interface IBackgroundJobInvoker /// Cancellation token /// A task representing the asynchronous operation Task InvokeAsync( - IServiceScopeFactory scopeFactory, + IServiceProvider scopeFactory, IEventSerializer eventSerializer, ReadOnlyMemory payload, CancellationToken cancellationToken); diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs index 4dea794..7aebe3f 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/BackgroundJobInvoker.cs @@ -15,14 +15,14 @@ namespace BBT.Aether.BackgroundJob; internal sealed class BackgroundJobInvoker : IBackgroundJobInvoker { public async Task InvokeAsync( - IServiceScopeFactory scopeFactory, + IServiceProvider serviceProvider, IEventSerializer eventSerializer, ReadOnlyMemory payload, CancellationToken cancellationToken) { // Resolve dependencies from DI - await using var scope = scopeFactory.CreateAsyncScope(); - var handler = scope.ServiceProvider.GetRequiredService>(); + // await using var scope = scopeFactory.CreateAsyncScope(); + var handler = serviceProvider.GetRequiredService>(); var args = eventSerializer.Deserialize(payload.Span); if (args == null) diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs index eb5b752..d43ae19 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/Dapr/DaprJobExecutionBridge.cs @@ -1,11 +1,10 @@ using System; -using System.Text.Json; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Domain.Repositories; using BBT.Aether.Events; using BBT.Aether.MultiSchema; -using BBT.Aether.Uow; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; namespace BBT.Aether.BackgroundJob.Dapr; @@ -13,15 +12,13 @@ namespace BBT.Aether.BackgroundJob.Dapr; /// /// Dapr-specific implementation of IJobExecutionBridge. /// Bridges Dapr's job execution callback to Aether's JobDispatcher. -/// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher with the handler name. -/// Extracts schema context from CloudEventEnvelope and sets it before job execution. +/// Looks up job entity by job name (Dapr's job identifier) and delegates to dispatcher. +/// Extracts schema context from CloudEventEnvelope before jobStore access for multi-tenant support. /// public sealed class DaprJobExecutionBridge( + IServiceScopeFactory scopeFactory, IJobDispatcher jobDispatcher, - IJobStore jobStore, - ICurrentSchema currentSchema, IEventSerializer eventSerializer, - IUnitOfWorkManager unitOfWorkManager, ILogger logger) : IJobExecutionBridge { @@ -32,44 +29,28 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can try { - var envelope = TryParseEnvelope(payload.ToArray()); - ReadOnlyMemory argsPayload; + await using var scope = scopeFactory.CreateAsyncScope(); - if (envelope != null) - { - if (!string.IsNullOrWhiteSpace(envelope.Schema)) - { - currentSchema.Set(envelope.Schema); - } + // Parse envelope and set schema context before jobStore access (multi-tenant support) + var dataPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, payload, out var envelope); - var argsBytes = eventSerializer.Serialize(envelope.Data); - argsPayload = new ReadOnlyMemory(argsBytes); - } - else + if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema)) { - argsPayload = payload; + var currentSchema = scope.ServiceProvider.GetRequiredService(); + currentSchema.Set(envelope.Schema); } - await using var uow = await unitOfWorkManager.BeginAsync(cancellationToken: cancellationToken); - try - { - var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); - if (jobInfo == null) - { - logger.LogError("Job with name '{JobName}' not found in store", jobName); - throw new InvalidOperationException($"Job with name '{jobName}' not found in store."); - } + var jobStore = scope.ServiceProvider.GetRequiredService(); + var jobInfo = await jobStore.GetByJobNameAsync(jobName, cancellationToken); - // Dispatch to handler using the handler name from job entity - await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, argsPayload, cancellationToken); - - await uow.CommitAsync(cancellationToken); - } - catch (Exception) + if (jobInfo == null) { - await uow.RollbackAsync(cancellationToken); - throw; + logger.LogError("Job with name '{JobName}' not found in store", jobName); + return; } + + // Dispatch to handler with extracted data payload + await jobDispatcher.DispatchAsync(jobInfo.Id, jobInfo.HandlerName, dataPayload, cancellationToken); } catch (Exception ex) { @@ -77,29 +58,4 @@ public async Task ExecuteAsync(string jobName, ReadOnlyMemory payload, Can throw; } } - - /// - /// Attempts to parse the payload as a CloudEventEnvelope. - /// Returns null if the payload is not in envelope format (old format). - /// - private CloudEventEnvelope? TryParseEnvelope(byte[] payload) - { - try - { - var envelope = eventSerializer.Deserialize(payload); - - // Validate it's actually an envelope by checking required properties - if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type)) - { - return envelope; - } - - return null; - } - catch (Exception ex) - { - logger.LogDebug(ex, "Payload is not in CloudEventEnvelope format, treating as legacy format"); - return null; - } - } } \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs index 28c38c9..e3fa0db 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/BackgroundJob/JobDispatcher.cs @@ -5,6 +5,7 @@ using BBT.Aether.Domain.Entities; using BBT.Aether.Domain.Repositories; using BBT.Aether.Events; +using BBT.Aether.MultiSchema; using BBT.Aether.Uow; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -14,8 +15,6 @@ namespace BBT.Aether.BackgroundJob; /// public class JobDispatcher( IServiceScopeFactory scopeFactory, - IJobStore jobStore, - IUnitOfWorkManager uowManager, BackgroundJobOptions options, IClock clock, IEventSerializer eventSerializer, @@ -34,43 +33,72 @@ public virtual async Task DispatchAsync( if (string.IsNullOrWhiteSpace(handlerName)) throw new ArgumentNullException(nameof(handlerName)); + + await using var scope = scopeFactory.CreateAsyncScope(); - if (await IsJobAlreadyProcessedAsync(jobId, handlerName, cancellationToken)) - return; - - if (!options.Invokers.ContainsKey(handlerName)) + var argsPayload = CloudEventEnvelopeHelper.ExtractDataPayload(eventSerializer, jobPayload, out var envelope); + + if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Schema)) { - logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName, jobId); - await UpdateStatusWithinUowAsync(jobId, BackgroundJobStatus.Failed, - "No handler found for handler type", cancellationToken); - return; + var currentSchema = scope.ServiceProvider.GetRequiredService(); + currentSchema.Set(envelope.Schema); } - - try + + var uowManager = scope.ServiceProvider.GetRequiredService(); + var jobStore = scope.ServiceProvider.GetRequiredService(); + + // First UoW: Check idempotency and update status to Running + await using (var uow = await uowManager.BeginRequiresNew(cancellationToken)) { + if (await IsJobAlreadyProcessedAsync(jobStore, jobId, handlerName, cancellationToken)) + { + await uow.CommitAsync(cancellationToken); + return; + } + + if (!options.Invokers.ContainsKey(handlerName)) + { + logger.LogWarning("No handler found for handler name '{HandlerName}' with job id '{JobId}'", handlerName, + jobId); + await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, clock.UtcNow, + "No handler found for handler type", cancellationToken); + await uow.CommitAsync(cancellationToken); + return; + } + // Update status to Running await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Running, cancellationToken: cancellationToken); + await uow.CommitAsync(cancellationToken); + } - await InvokeHandlerAsync(handlerName, jobPayload, cancellationToken); + // Second UoW: Execute handler and mark as completed + try + { + await using var handlerUow = await uowManager.BeginRequiresNew(cancellationToken); + + await InvokeHandlerAsync(scope.ServiceProvider, handlerName, argsPayload, cancellationToken); // Update status to Completed await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, clock.UtcNow, cancellationToken: cancellationToken); + logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, + jobId); - logger.LogInformation("Successfully completed handler '{HandlerName}' for job id '{JobId}'", handlerName, jobId); + await handlerUow.CommitAsync(cancellationToken); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled", handlerName, jobId); - await HandleJobCancellationAsync(jobId, cancellationToken); - throw; + await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Cancelled, + "Job was cancelled", cancellationToken); } catch (Exception ex) { logger.LogError(ex, "Handler '{HandlerName}' for job id '{JobId}' failed", handlerName, jobId); - await HandleJobFailureAsync(jobId, ex, cancellationToken); - throw; + var errorMessage = $"{ex.GetType().Name}: {ex.Message}".Truncate(4000); + await MarkJobStatusAsync(uowManager, jobStore, jobId, BackgroundJobStatus.Failed, + errorMessage, cancellationToken); } } @@ -78,7 +106,8 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Completed, /// Checks if a job has already been processed (idempotency check). /// Returns true if job is in Completed or Cancelled state. /// - private async Task IsJobAlreadyProcessedAsync(Guid jobId, string handlerName, CancellationToken cancellationToken) + private async Task IsJobAlreadyProcessedAsync(IJobStore jobStore, Guid jobId, string handlerName, + CancellationToken cancellationToken) { var jobInfo = await jobStore.GetAsync(jobId, cancellationToken); if (jobInfo == null) @@ -95,7 +124,8 @@ private async Task IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa if (jobInfo.Status == BackgroundJobStatus.Cancelled) { - logger.LogWarning("Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).", + logger.LogWarning( + "Handler '{HandlerName}' for job id '{JobId}' was cancelled. Skipping reprocessing (idempotency).", handlerName, jobId); return true; } @@ -104,61 +134,26 @@ private async Task IsJobAlreadyProcessedAsync(Guid jobId, string handlerNa } /// - /// Updates job status within a UoW transaction. - /// Helper method to reduce code duplication. + /// Marks job status within a separate UoW to ensure status update is persisted + /// even if the main transaction failed. /// - private async Task UpdateStatusWithinUowAsync( + private async Task MarkJobStatusAsync( + IUnitOfWorkManager uowManager, + IJobStore jobStore, Guid jobId, BackgroundJobStatus status, - string? error = null, - CancellationToken cancellationToken = default) - { - await using var uow = await uowManager.BeginAsync(cancellationToken: cancellationToken); - try - { - await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, error, cancellationToken); - await uow.CommitAsync(cancellationToken); - } - catch (Exception ex) - { - logger.LogError(ex, "Failed to update job status to {Status}", status); - await uow.RollbackAsync(cancellationToken); - } - } - - /// - /// Handles job cancellation by updating status within the existing UoW. - /// - private async Task HandleJobCancellationAsync(Guid jobId, CancellationToken cancellationToken) - { - try - { - await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Cancelled, - clock.UtcNow, "Job was cancelled", cancellationToken); - } - catch (Exception ex) - { - logger.LogError(ex, "Failed to update job status to Cancelled"); - } - } - - /// - /// Handles job failure by updating status within the existing UoW. - /// - private async Task HandleJobFailureAsync(Guid jobId, Exception exception, + string? errorMessage, CancellationToken cancellationToken) { try { - var errorMessage = $"{exception.GetType().Name}: {exception.Message}"; - errorMessage = errorMessage.Truncate(4000); - - await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, - clock.UtcNow, errorMessage, cancellationToken); + await using var uow = await uowManager.BeginRequiresNew(cancellationToken); + await jobStore.UpdateStatusAsync(jobId, status, clock.UtcNow, errorMessage, cancellationToken); + await uow.CommitAsync(cancellationToken); } catch (Exception ex) { - logger.LogError(ex, "Failed to update job status to Failed"); + logger.LogError(ex, "Failed to mark job {JobId} as {Status}", jobId, status); } } @@ -167,7 +162,8 @@ await jobStore.UpdateStatusAsync(jobId, BackgroundJobStatus.Failed, /// Generic type TArgs was closed at registration time (startup), not at runtime. /// This method is completely type-safe and fast. /// - private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory jobPayload, + private async Task InvokeHandlerAsync(IServiceProvider scopedProvider, string handlerName, + ReadOnlyMemory jobPayload, CancellationToken cancellationToken) { // Get pre-created invoker from options (generic already closed at startup) @@ -177,6 +173,6 @@ private async Task InvokeHandlerAsync(string handlerName, ReadOnlyMemory j } // Invoke handler - completely type-safe, no reflection at runtime - await invoker.InvokeAsync(scopeFactory, eventSerializer, jobPayload, cancellationToken); + await invoker.InvokeAsync(scopedProvider, eventSerializer, jobPayload, cancellationToken); } } \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs new file mode 100644 index 0000000..c1e2de7 --- /dev/null +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/CloudEventEnvelopeHelper.cs @@ -0,0 +1,62 @@ +using System; + +namespace BBT.Aether.Events; + +/// +/// Helper class for parsing and extracting data from CloudEventEnvelope. +/// Provides common functionality used by job dispatchers and event processors. +/// +internal static class CloudEventEnvelopeHelper +{ + /// + /// Attempts to parse the payload as a CloudEventEnvelope. + /// Returns null if the payload is not in envelope format (legacy format). + /// + /// The event serializer to use for deserialization. + /// The raw payload bytes to parse. + /// The parsed envelope or null if parsing fails or payload is not in envelope format. + public static CloudEventEnvelope? TryParseEnvelope(IEventSerializer eventSerializer, byte[] payload) + { + try + { + var envelope = eventSerializer.Deserialize(payload); + + // Validate it's actually an envelope by checking required properties + if (envelope != null && !string.IsNullOrWhiteSpace(envelope.Type)) + { + return envelope; + } + + return null; + } + catch + { + return null; + } + } + + /// + /// Extracts the data payload from a CloudEventEnvelope, serializing it back to bytes. + /// If payload is not in envelope format, returns the original payload. + /// + /// The event serializer to use. + /// The raw payload bytes. + /// Output: the parsed envelope if successful, null otherwise. + /// The data payload bytes (either from envelope.Data or original payload). + public static ReadOnlyMemory ExtractDataPayload( + IEventSerializer eventSerializer, + ReadOnlyMemory payload, + out CloudEventEnvelope? envelope) + { + envelope = TryParseEnvelope(eventSerializer, payload.ToArray()); + + if (envelope != null) + { + var argsBytes = eventSerializer.Serialize(envelope.Data); + return new ReadOnlyMemory(argsBytes); + } + + return payload; + } +} + diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs index 7a9d648..22d917d 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/InboxProcessor.cs @@ -125,7 +125,7 @@ private async Task ProcessSingleEventAsync( // Begin a new UoW for handler execution + marking as processed await using var handlerUow = await unitOfWorkManager.BeginRequiresNew(cancellationToken); - + // Invoke the handler await invoker.InvokeAsync(scopedServiceProvider, inboxMessage.EventData, cancellationToken);