diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index ea7d17973..26945446a 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -111,18 +111,39 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExportHistory.Tests", "test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistributedTracingSample", "samples\DistributedTracingSample\DistributedTracingSample.csproj", "{4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{21303FBF-2A2B-17C2-D2DF-3E924022E940}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Plugins", "src\Extensions\Plugins\Plugins.csproj", "{464EF328-1A43-417C-BC9D-C1F808D2C3B8}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions.Plugins.Tests", "test\Extensions.Plugins.Tests\Extensions.Plugins.Tests.csproj", "{09D76001-410E-4308-9156-01A9E7F5400B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PluginsSample", "samples\PluginsSample\PluginsSample.csproj", "{BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x64.ActiveCfg = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x64.Build.0 = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x86.ActiveCfg = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x86.Build.0 = Debug|Any CPU {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|Any CPU.ActiveCfg = Release|Any CPU {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|Any CPU.Build.0 = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x64.ActiveCfg = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x64.Build.0 = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x86.ActiveCfg = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x86.Build.0 = Release|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x64.ActiveCfg = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x64.Build.0 = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x86.ActiveCfg = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x86.Build.0 = Debug|Any CPU @@ -660,6 +681,42 @@ Global {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}.Release|x64.Build.0 = Release|Any CPU {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}.Release|x86.ActiveCfg = Release|Any CPU {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}.Release|x86.Build.0 = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x64.ActiveCfg = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x64.Build.0 = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x86.ActiveCfg = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x86.Build.0 = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|Any CPU.Build.0 = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x64.ActiveCfg = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x64.Build.0 = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x86.ActiveCfg = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x86.Build.0 = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x64.ActiveCfg = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x64.Build.0 = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x86.ActiveCfg = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x86.Build.0 = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|Any CPU.Build.0 = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x64.ActiveCfg = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x64.Build.0 = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x86.ActiveCfg = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x86.Build.0 = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x64.ActiveCfg = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x64.Build.0 = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x86.ActiveCfg = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x86.Build.0 = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|Any CPU.Build.0 = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x64.ActiveCfg = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x64.Build.0 = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.ActiveCfg = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -715,6 +772,10 @@ Global {354CE69B-78DB-9B29-C67E-0DBB862C7A65} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B} {05C9EBA6-7221-D458-47D6-DA457C2F893B} = {E5637F81-2FB9-4CD7-900D-455363B142A7} {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} + {21303FBF-2A2B-17C2-D2DF-3E924022E940} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B} + {464EF328-1A43-417C-BC9D-C1F808D2C3B8} = {21303FBF-2A2B-17C2-D2DF-3E924022E940} + {09D76001-410E-4308-9156-01A9E7F5400B} = {E5637F81-2FB9-4CD7-900D-455363B142A7} + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/samples/PluginsSample/PluginsSample.csproj b/samples/PluginsSample/PluginsSample.csproj new file mode 100644 index 000000000..d9085f186 --- /dev/null +++ b/samples/PluginsSample/PluginsSample.csproj @@ -0,0 +1,22 @@ + + + + Exe + net8.0;net10.0 + enable + + + + + + + + + + + + + + + + diff --git a/samples/PluginsSample/Program.cs b/samples/PluginsSample/Program.cs new file mode 100644 index 000000000..8d1956491 --- /dev/null +++ b/samples/PluginsSample/Program.cs @@ -0,0 +1,220 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +// This sample demonstrates the Durable Task Plugin system, which is inspired by Temporal's +// plugin/interceptor pattern. It shows how to use the 5 built-in plugins: +// 1. LoggingPlugin - Structured logging for orchestration and activity lifecycle events +// 2. MetricsPlugin - Execution counts, durations, and success/failure tracking +// 3. AuthorizationPlugin - Input-based authorization checks before execution +// 4. ValidationPlugin - Input validation before task execution +// 5. RateLimitingPlugin - Token-bucket rate limiting for activity dispatches + +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Testing; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +// Create a shared metrics store so we can read metrics after the orchestration completes. +MetricsStore metricsStore = new(); + +// Use the in-process test host (no external sidecar needed for demonstration). +// In production, replace with .UseGrpc() or .UseDurableTaskScheduler(). +await using DurableTaskTestHost testHost = await DurableTaskTestHost.StartAsync( + registry => + { + // Register orchestration and activities. + registry.AddOrchestratorFunc("GreetingOrchestration", async context => + { + List greetings = new() + { + await context.CallActivityAsync("SayHello", "Tokyo"), + await context.CallActivityAsync("SayHello", "London"), + await context.CallActivityAsync("SayHello", "Seattle"), + }; + + return greetings; + }); + + registry.AddActivityFunc("SayHello", (context, city) => + { + return $"Hello, {city}!"; + }); + }); + +DurableTaskClient client = testHost.Client; + +// Schedule a new orchestration instance. +string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("GreetingOrchestration"); +Console.WriteLine($"Started orchestration: {instanceId}"); + +// Wait for the orchestration to complete. +OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true); +Console.WriteLine($"Orchestration completed with status: {result.RuntimeStatus}"); +Console.WriteLine($"Output: {result.SerializedOutput}"); + +// --- Demonstrate Plugin APIs --- +Console.WriteLine("\n=== Plugin Demonstrations ==="); + +// Demo 1: LoggingPlugin +Console.WriteLine("\n--- 1. LoggingPlugin ---"); +Console.WriteLine("The LoggingPlugin emits structured ILogger events for lifecycle events."); +Console.WriteLine("It would be registered on a worker builder like this:"); +Console.WriteLine(" builder.Services.AddDurableTaskWorker().UseLoggingPlugin().UseGrpc();"); + +ILoggerFactory loggerFactory = LoggerFactory.Create(b => b.AddConsole().SetMinimumLevel(LogLevel.Information)); +LoggingPlugin loggingPlugin = new(loggerFactory); +Console.WriteLine($"Plugin name: {loggingPlugin.Name}"); +Console.WriteLine($"Orchestration interceptors: {loggingPlugin.OrchestrationInterceptors.Count}"); +Console.WriteLine($"Activity interceptors: {loggingPlugin.ActivityInterceptors.Count}"); + +// Demo 2: MetricsPlugin +Console.WriteLine("\n--- 2. MetricsPlugin ---"); +MetricsPlugin metricsPlugin = new(metricsStore); + +// Simulate lifecycle events +var orchCtx = new Microsoft.DurableTask.Plugins.OrchestrationInterceptorContext("Demo", "test-1", false, null); +await metricsPlugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(orchCtx); +await metricsPlugin.OrchestrationInterceptors[0].OnOrchestrationCompletedAsync(orchCtx, "done"); + +var actCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("DemoActivity", "test-1", "input"); +await metricsPlugin.ActivityInterceptors[0].OnActivityStartingAsync(actCtx); +await metricsPlugin.ActivityInterceptors[0].OnActivityCompletedAsync(actCtx, "result"); +await metricsPlugin.ActivityInterceptors[0].OnActivityStartingAsync(actCtx); +await metricsPlugin.ActivityInterceptors[0].OnActivityFailedAsync(actCtx, new Exception("test failure")); + +Console.WriteLine("Orchestration metrics:"); +foreach (var (name, metrics) in metricsStore.GetAllOrchestrationMetrics()) +{ + Console.WriteLine($" '{name}': Started={metrics.Started}, Completed={metrics.Completed}, Failed={metrics.Failed}"); +} + +Console.WriteLine("Activity metrics:"); +foreach (var (name, metrics) in metricsStore.GetAllActivityMetrics()) +{ + Console.WriteLine($" '{name}': Started={metrics.Started}, Completed={metrics.Completed}, Failed={metrics.Failed}"); +} + +// Demo 3: AuthorizationPlugin +Console.WriteLine("\n--- 3. AuthorizationPlugin ---"); +AuthorizationPlugin authPlugin = new(new AllowAllAuthorizationHandler()); +var authOrcCtx = new Microsoft.DurableTask.Plugins.OrchestrationInterceptorContext("SecureOrch", "secure-1", false, null); +await authPlugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(authOrcCtx); + +// Demo 4: ValidationPlugin +Console.WriteLine("\n--- 4. ValidationPlugin ---"); +ValidationPlugin validationPlugin = new(new CityNameValidator()); +var validCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("SayHello", "val-1", "Tokyo"); +await validationPlugin.ActivityInterceptors[0].OnActivityStartingAsync(validCtx); +Console.WriteLine(" Validation passed for input 'Tokyo'"); + +try +{ + var invalidCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("SayHello", "val-1", ""); + await validationPlugin.ActivityInterceptors[0].OnActivityStartingAsync(invalidCtx); +} +catch (ArgumentException ex) +{ + Console.WriteLine($" Validation correctly rejected empty input: {ex.Message}"); +} + +// Demo 5: RateLimitingPlugin +Console.WriteLine("\n--- 5. RateLimitingPlugin ---"); +RateLimitingPlugin rateLimitPlugin = new(new RateLimitingOptions +{ + MaxTokens = 3, + RefillRate = 0, + RefillInterval = TimeSpan.FromHours(1), +}); + +var rlCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("LimitedAction", "rl-1", "data"); +int allowed = 0; +int denied = 0; +for (int i = 0; i < 5; i++) +{ + try + { + await rateLimitPlugin.ActivityInterceptors[0].OnActivityStartingAsync(rlCtx); + allowed++; + } + catch (RateLimitExceededException) + { + denied++; + } +} + +Console.WriteLine($" Rate limit (max 3): Allowed={allowed}, Denied={denied}"); + +// Demo: SimplePlugin builder with built-in activities +Console.WriteLine("\n--- SimplePlugin Builder (with built-in activities) ---"); +Console.WriteLine("Plugins can provide reusable activities that auto-register when added to a worker."); +Console.WriteLine("Example: A 'StringUtils' plugin that ships pre-built string activities."); + +// This is how a plugin author would package reusable activities. +// Users just call .UsePlugin(stringUtilsPlugin) and the activities become available. +var stringUtilsPlugin = Microsoft.DurableTask.Plugins.SimplePlugin.NewBuilder("MyOrg.StringUtils") + .AddTasks(registry => + { + registry.AddActivityFunc("StringUtils.ToUpper", (ctx, input) => input.ToUpperInvariant()); + registry.AddActivityFunc("StringUtils.Reverse", (ctx, input) => + new string(input.Reverse().ToArray())); + registry.AddActivityFunc("StringUtils.WordCount", (ctx, input) => + input.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length); + }) + .AddOrchestrationInterceptor(loggingPlugin.OrchestrationInterceptors[0]) + .Build(); + +// Verify the plugin registers its tasks into a registry +DurableTaskRegistry testRegistry = new(); +stringUtilsPlugin.RegisterTasks(testRegistry); +Console.WriteLine($" Plugin '{stringUtilsPlugin.Name}' registered activities into the worker."); +Console.WriteLine($" Orchestration interceptors: {stringUtilsPlugin.OrchestrationInterceptors.Count}"); +Console.WriteLine(""); +Console.WriteLine(" Usage in production:"); +Console.WriteLine(" builder.Services.AddDurableTaskWorker()"); +Console.WriteLine(" .UsePlugin(stringUtilsPlugin) // auto-registers StringUtils.* activities"); +Console.WriteLine(" .UseGrpc();"); +Console.WriteLine(""); +Console.WriteLine(" Then in an orchestration:"); +Console.WriteLine(" string upper = await context.CallActivityAsync(\"StringUtils.ToUpper\", \"hello\");"); + +Console.WriteLine("\n=== All plugin demonstrations completed successfully! ==="); + +// --- Helper classes for the sample --- + +/// +/// A simple authorization handler that allows all tasks to execute. +/// In a real application, this would check user claims, roles, or other policies. +/// +sealed class AllowAllAuthorizationHandler : IAuthorizationHandler +{ + public Task AuthorizeAsync(AuthorizationContext context) + { + Console.WriteLine($" [Auth] Authorized {context.TargetType} '{context.Name}' for instance '{context.InstanceId}'"); + return Task.FromResult(true); + } +} + +/// +/// A validator that ensures city names passed to the SayHello activity are non-empty strings. +/// +sealed class CityNameValidator : IInputValidator +{ + public Task ValidateAsync(TaskName taskName, object? input) + { + // Only validate the SayHello activity. + if (taskName.Name == "SayHello") + { + if (input is not string city || string.IsNullOrWhiteSpace(city)) + { + return Task.FromResult(ValidationResult.Failure("City name must be a non-empty string.")); + } + } + + return Task.FromResult(ValidationResult.Success); + } +} + diff --git a/samples/PluginsSample/README.md b/samples/PluginsSample/README.md new file mode 100644 index 000000000..7418a8917 --- /dev/null +++ b/samples/PluginsSample/README.md @@ -0,0 +1,69 @@ +# Plugins Sample + +This sample demonstrates the **Durable Task Plugin system**, which is inspired by +[Temporal's plugin pattern](https://docs.temporal.io/develop/plugins). + +## What Plugins Can Do + +Temporal-style plugins serve **two purposes**: + +### 1. Reusable Activities and Orchestrations + +Plugins can ship pre-built activities and orchestrations that users get automatically +when they register the plugin. This is the "import and use" pattern: + +```csharp +// A plugin author creates a package with reusable activities +var stringUtilsPlugin = SimplePlugin.NewBuilder("MyOrg.StringUtils") + .AddTasks(registry => + { + registry.AddActivityFunc("StringUtils.ToUpper", + (ctx, input) => input.ToUpperInvariant()); + registry.AddActivityFunc("StringUtils.Reverse", + (ctx, input) => new string(input.Reverse().ToArray())); + }) + .Build(); + +// Users just register the plugin — activities are available immediately +builder.Services.AddDurableTaskWorker() + .UsePlugin(stringUtilsPlugin) + .UseGrpc(); + +// Then call the plugin's activities from any orchestration +string upper = await context.CallActivityAsync("StringUtils.ToUpper", "hello"); +``` + +### 2. Cross-Cutting Interceptors + +Plugins can add lifecycle interceptors for concerns like logging, metrics, auth, etc. + +## Built-in Cross-Cutting Plugins + +| Plugin | Description | +|--------|-------------| +| **LoggingPlugin** | Structured `ILogger` events for orchestration/activity lifecycle | +| **MetricsPlugin** | Execution counts, durations, success/failure tracking | +| **AuthorizationPlugin** | `IAuthorizationHandler` checks before execution | +| **ValidationPlugin** | `IInputValidator` input validation before execution | +| **RateLimitingPlugin** | Token-bucket rate limiting for activity dispatches | + +## Prerequisites + +- .NET 8.0 or later + +## Running the Sample + +```bash +dotnet run +``` + +(Uses the in-process test host — no external sidecar needed.) + +## Plugin Architecture + +The plugin system follows these key design principles: + +- **Dual-purpose** — Plugins can provide reusable tasks AND/OR cross-cutting interceptors. +- **Composable** — Multiple plugins can be registered and they execute in registration order. +- **Auto-registering** — Plugin tasks are automatically registered into the worker's task registry. +- **Temporal-aligned** — The `SimplePlugin` builder pattern mirrors Temporal's `SimplePlugin`. diff --git a/src/Abstractions/Abstractions.csproj b/src/Abstractions/Abstractions.csproj index db8be76ab..40ad6f6bb 100644 --- a/src/Abstractions/Abstractions.csproj +++ b/src/Abstractions/Abstractions.csproj @@ -22,6 +22,7 @@ + diff --git a/src/Extensions/Plugins/ActivityInterceptorContext.cs b/src/Extensions/Plugins/ActivityInterceptorContext.cs new file mode 100644 index 000000000..0c497a629 --- /dev/null +++ b/src/Extensions/Plugins/ActivityInterceptorContext.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Context provided to activity interceptors during lifecycle events. +/// +public sealed class ActivityInterceptorContext +{ + /// + /// Initializes a new instance of the class. + /// + /// The name of the activity. + /// The orchestration instance ID that scheduled this activity. + /// The activity input. + public ActivityInterceptorContext(TaskName name, string instanceId, object? input) + { + this.Name = name; + this.InstanceId = instanceId; + this.Input = input; + } + + /// + /// Gets the name of the activity. + /// + public TaskName Name { get; } + + /// + /// Gets the orchestration instance ID that scheduled this activity. + /// + public string InstanceId { get; } + + /// + /// Gets the activity input. + /// + public object? Input { get; } + + /// + /// Gets a dictionary that can be used to pass data between interceptors during a single execution. + /// + public IDictionary Properties { get; } = new Dictionary(); +} diff --git a/src/Extensions/Plugins/BuiltIn/AuthorizationContext.cs b/src/Extensions/Plugins/BuiltIn/AuthorizationContext.cs new file mode 100644 index 000000000..6500762d9 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/AuthorizationContext.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Context for authorization decisions. +/// +public sealed class AuthorizationContext +{ + /// + /// Initializes a new instance of the class. + /// + /// The task name. + /// The orchestration instance ID. + /// The type of target (orchestration or activity). + /// The task input. + public AuthorizationContext(TaskName name, string instanceId, AuthorizationTargetType targetType, object? input) + { + this.Name = name; + this.InstanceId = instanceId; + this.TargetType = targetType; + this.Input = input; + } + + /// + /// Gets the task name. + /// + public TaskName Name { get; } + + /// + /// Gets the orchestration instance ID. + /// + public string InstanceId { get; } + + /// + /// Gets the type of target being authorized. + /// + public AuthorizationTargetType TargetType { get; } + + /// + /// Gets the task input. + /// + public object? Input { get; } +} diff --git a/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs b/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs new file mode 100644 index 000000000..1eb9cb248 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs @@ -0,0 +1,108 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that performs authorization checks before orchestrations and activities +/// are executed. Users define authorization rules via . +/// If a rule denies access, a is thrown. +/// +public sealed class AuthorizationPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Authorization"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The authorization handler to evaluate. + public AuthorizationPlugin(IAuthorizationHandler handler) + { + Check.NotNull(handler); + this.orchestrationInterceptors = new List + { + new AuthorizationOrchestrationInterceptor(handler), + }; + this.activityInterceptors = new List + { + new AuthorizationActivityInterceptor(handler), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Authorization plugin is cross-cutting only; it does not register any tasks. + } + + sealed class AuthorizationOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly IAuthorizationHandler handler; + + public AuthorizationOrchestrationInterceptor(IAuthorizationHandler handler) => this.handler = handler; + + public async Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + AuthorizationContext authContext = new( + context.Name, + context.InstanceId, + AuthorizationTargetType.Orchestration, + context.Input); + + if (!await this.handler.AuthorizeAsync(authContext)) + { + throw new UnauthorizedAccessException( + $"Authorization denied for orchestration '{context.Name}' (instance '{context.InstanceId}')."); + } + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) => + Task.CompletedTask; + } + + sealed class AuthorizationActivityInterceptor : IActivityInterceptor + { + readonly IAuthorizationHandler handler; + + public AuthorizationActivityInterceptor(IAuthorizationHandler handler) => this.handler = handler; + + public async Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + AuthorizationContext authContext = new( + context.Name, + context.InstanceId, + AuthorizationTargetType.Activity, + context.Input); + + if (!await this.handler.AuthorizeAsync(authContext)) + { + throw new UnauthorizedAccessException( + $"Authorization denied for activity '{context.Name}' (instance '{context.InstanceId}')."); + } + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) => + Task.CompletedTask; + } +} diff --git a/src/Extensions/Plugins/BuiltIn/AuthorizationTargetType.cs b/src/Extensions/Plugins/BuiltIn/AuthorizationTargetType.cs new file mode 100644 index 000000000..ea0659fb5 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/AuthorizationTargetType.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// The type of authorization target. +/// +public enum AuthorizationTargetType +{ + /// + /// The target is an orchestration. + /// + Orchestration, + + /// + /// The target is an activity. + /// + Activity, +} diff --git a/src/Extensions/Plugins/BuiltIn/IAuthorizationHandler.cs b/src/Extensions/Plugins/BuiltIn/IAuthorizationHandler.cs new file mode 100644 index 000000000..53b10f82d --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/IAuthorizationHandler.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Handler for evaluating authorization rules on task executions. +/// +public interface IAuthorizationHandler +{ + /// + /// Evaluates whether the given task execution should be authorized. + /// + /// The authorization context. + /// true if execution is authorized; false otherwise. + Task AuthorizeAsync(AuthorizationContext context); +} diff --git a/src/Extensions/Plugins/BuiltIn/IInputValidator.cs b/src/Extensions/Plugins/BuiltIn/IInputValidator.cs new file mode 100644 index 000000000..16a439096 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/IInputValidator.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Interface for validating task inputs. +/// +public interface IInputValidator +{ + /// + /// Validates the input for the specified task. + /// + /// The name of the task being validated. + /// The input to validate. + /// The validation result. + Task ValidateAsync(TaskName taskName, object? input); +} diff --git a/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs b/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs new file mode 100644 index 000000000..441100255 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs @@ -0,0 +1,120 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that provides structured logging for orchestration and activity lifecycle events. +/// Logs are emitted at appropriate levels (Information for start/complete, Error for failures) +/// and include contextual information such as instance IDs and task names. +/// +public sealed class LoggingPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Logging"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The logger factory for creating loggers. + public LoggingPlugin(ILoggerFactory loggerFactory) + { + Check.NotNull(loggerFactory); + LoggingOrchestrationInterceptor orchestrationInterceptor = new(loggerFactory.CreateLogger("DurableTask.Orchestration")); + LoggingActivityInterceptor activityInterceptor = new(loggerFactory.CreateLogger("DurableTask.Activity")); + this.orchestrationInterceptors = new List { orchestrationInterceptor }; + this.activityInterceptors = new List { activityInterceptor }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Logging plugin is cross-cutting only; it does not register any tasks. + } + + sealed class LoggingOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly ILogger logger; + + public LoggingOrchestrationInterceptor(ILogger logger) => this.logger = logger; + + public Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + this.logger.LogInformation( + "Orchestration '{Name}' started. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) + { + this.logger.LogInformation( + "Orchestration '{Name}' completed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) + { + this.logger.LogError( + exception, + "Orchestration '{Name}' failed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + } + + sealed class LoggingActivityInterceptor : IActivityInterceptor + { + readonly ILogger logger; + + public LoggingActivityInterceptor(ILogger logger) => this.logger = logger; + + public Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + this.logger.LogInformation( + "Activity '{Name}' started. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) + { + this.logger.LogInformation( + "Activity '{Name}' completed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) + { + this.logger.LogError( + exception, + "Activity '{Name}' failed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + } +} diff --git a/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs b/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs new file mode 100644 index 000000000..5650b0fd4 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs @@ -0,0 +1,237 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that tracks execution metrics for orchestrations and activities, +/// including counts (started, completed, failed) and durations. +/// +public sealed class MetricsPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Metrics"; + + readonly MetricsStore store; + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + public MetricsPlugin() + : this(new MetricsStore()) + { + } + + /// + /// Initializes a new instance of the class with a shared store. + /// + /// The metrics store to use. + public MetricsPlugin(MetricsStore store) + { + Check.NotNull(store); + this.store = store; + this.orchestrationInterceptors = new List + { + new MetricsOrchestrationInterceptor(store), + }; + this.activityInterceptors = new List + { + new MetricsActivityInterceptor(store), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Metrics plugin is cross-cutting only; it does not register any tasks. + } + + /// + /// Gets the metrics store used by this plugin. + /// + public MetricsStore Store => this.store; + + sealed class MetricsOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly MetricsStore store; + + public MetricsOrchestrationInterceptor(MetricsStore store) => this.store = store; + + public Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + this.store.IncrementOrchestrationStarted(context.Name); + context.Properties["_metrics_stopwatch"] = Stopwatch.StartNew(); + return Task.CompletedTask; + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) + { + this.store.IncrementOrchestrationCompleted(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordOrchestrationDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) + { + this.store.IncrementOrchestrationFailed(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordOrchestrationDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + } + + sealed class MetricsActivityInterceptor : IActivityInterceptor + { + readonly MetricsStore store; + + public MetricsActivityInterceptor(MetricsStore store) => this.store = store; + + public Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + this.store.IncrementActivityStarted(context.Name); + context.Properties["_metrics_stopwatch"] = Stopwatch.StartNew(); + return Task.CompletedTask; + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) + { + this.store.IncrementActivityCompleted(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordActivityDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) + { + this.store.IncrementActivityFailed(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordActivityDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + } +} + +/// +/// Thread-safe store for orchestration and activity execution metrics. +/// +public sealed class MetricsStore +{ + readonly ConcurrentDictionary orchestrationMetrics = new(); + readonly ConcurrentDictionary activityMetrics = new(); + + /// + /// Gets metrics for a specific orchestration by name. + /// + /// The orchestration name. + /// The metrics for the specified orchestration. + public TaskMetrics GetOrchestrationMetrics(string name) => + this.orchestrationMetrics.GetOrAdd(name, _ => new TaskMetrics()); + + /// + /// Gets metrics for a specific activity by name. + /// + /// The activity name. + /// The metrics for the specified activity. + public TaskMetrics GetActivityMetrics(string name) => + this.activityMetrics.GetOrAdd(name, _ => new TaskMetrics()); + + /// + /// Gets all orchestration metrics. + /// + /// A read-only dictionary of orchestration name to metrics. + public IReadOnlyDictionary GetAllOrchestrationMetrics() => this.orchestrationMetrics; + + /// + /// Gets all activity metrics. + /// + /// A read-only dictionary of activity name to metrics. + public IReadOnlyDictionary GetAllActivityMetrics() => this.activityMetrics; + + internal void IncrementOrchestrationStarted(TaskName name) => this.GetOrchestrationMetrics(name).IncrementStarted(); + + internal void IncrementOrchestrationCompleted(TaskName name) => this.GetOrchestrationMetrics(name).IncrementCompleted(); + + internal void IncrementOrchestrationFailed(TaskName name) => this.GetOrchestrationMetrics(name).IncrementFailed(); + + internal void RecordOrchestrationDuration(TaskName name, TimeSpan duration) => this.GetOrchestrationMetrics(name).RecordDuration(duration); + + internal void IncrementActivityStarted(TaskName name) => this.GetActivityMetrics(name).IncrementStarted(); + + internal void IncrementActivityCompleted(TaskName name) => this.GetActivityMetrics(name).IncrementCompleted(); + + internal void IncrementActivityFailed(TaskName name) => this.GetActivityMetrics(name).IncrementFailed(); + + internal void RecordActivityDuration(TaskName name, TimeSpan duration) => this.GetActivityMetrics(name).RecordDuration(duration); +} + +/// +/// Thread-safe metrics for a single task (orchestration or activity). +/// +public sealed class TaskMetrics +{ + long started; + long completed; + long failed; + long totalDurationTicks; + + /// + /// Gets the number of times this task was started. + /// + public long Started => Interlocked.Read(ref this.started); + + /// + /// Gets the number of times this task completed successfully. + /// + public long Completed => Interlocked.Read(ref this.completed); + + /// + /// Gets the number of times this task failed. + /// + public long Failed => Interlocked.Read(ref this.failed); + + /// + /// Gets the total accumulated duration across all executions. + /// + public TimeSpan TotalDuration => TimeSpan.FromTicks(Interlocked.Read(ref this.totalDurationTicks)); + + internal void IncrementStarted() => Interlocked.Increment(ref this.started); + + internal void IncrementCompleted() => Interlocked.Increment(ref this.completed); + + internal void IncrementFailed() => Interlocked.Increment(ref this.failed); + + internal void RecordDuration(TimeSpan duration) => Interlocked.Add(ref this.totalDurationTicks, duration.Ticks); +} diff --git a/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs b/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs new file mode 100644 index 000000000..6d6313a5c --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs @@ -0,0 +1,171 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Concurrent; + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that applies token-bucket rate limiting to activity executions. +/// When the rate limit is exceeded, a is thrown. +/// Rate limiting is applied per activity name. +/// +public sealed class RateLimitingPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.RateLimiting"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The rate limiting options. + public RateLimitingPlugin(RateLimitingOptions options) + { + Check.NotNull(options); + this.orchestrationInterceptors = Array.Empty(); + this.activityInterceptors = new List + { + new RateLimitingActivityInterceptor(options), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Rate limiting plugin is cross-cutting only; it does not register any tasks. + } + + sealed class RateLimitingActivityInterceptor : IActivityInterceptor + { + readonly RateLimitingOptions options; + readonly ConcurrentDictionary buckets = new(); + + public RateLimitingActivityInterceptor(RateLimitingOptions options) => this.options = options; + + public Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + string key = context.Name; + TokenBucket bucket = this.buckets.GetOrAdd(key, _ => new TokenBucket( + this.options.MaxTokens, + this.options.RefillRate, + this.options.RefillInterval)); + + if (!bucket.TryConsume()) + { + throw new RateLimitExceededException( + $"Rate limit exceeded for activity '{context.Name}'. " + + $"Max {this.options.MaxTokens} executions per {this.options.RefillInterval}."); + } + + return Task.CompletedTask; + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) => + Task.CompletedTask; + } +} + +/// +/// Options for the rate limiting plugin. +/// +public sealed class RateLimitingOptions +{ + /// + /// Gets or sets the maximum number of tokens (burst capacity). + /// + public int MaxTokens { get; set; } = 100; + + /// + /// Gets or sets the number of tokens to refill per interval. + /// + public int RefillRate { get; set; } = 10; + + /// + /// Gets or sets the interval between token refills. + /// + public TimeSpan RefillInterval { get; set; } = TimeSpan.FromSeconds(1); +} + +/// +/// Exception thrown when a rate limit is exceeded. +/// +public sealed class RateLimitExceededException : InvalidOperationException +{ + /// + /// Initializes a new instance of the class. + /// + /// The error message. + public RateLimitExceededException(string message) + : base(message) + { + } +} + +/// +/// Thread-safe token bucket implementation for rate limiting. +/// +internal sealed class TokenBucket +{ + readonly int maxTokens; + readonly int refillRate; + readonly TimeSpan refillInterval; + readonly object syncLock = new(); + int tokens; + DateTime lastRefillTime; + + public TokenBucket(int maxTokens, int refillRate, TimeSpan refillInterval) + { + this.maxTokens = maxTokens; + this.refillRate = refillRate; + this.refillInterval = refillInterval; + this.tokens = maxTokens; + this.lastRefillTime = DateTime.UtcNow; + } + + public bool TryConsume() + { + lock (this.syncLock) + { + this.Refill(); + + if (this.tokens > 0) + { + this.tokens--; + return true; + } + + return false; + } + } + + void Refill() + { + DateTime now = DateTime.UtcNow; + TimeSpan elapsed = now - this.lastRefillTime; + + if (elapsed >= this.refillInterval) + { + int intervalsElapsed = (int)(elapsed.Ticks / this.refillInterval.Ticks); + int tokensToAdd = intervalsElapsed * this.refillRate; + this.tokens = Math.Min(this.maxTokens, this.tokens + tokensToAdd); + this.lastRefillTime = now; + } + } +} diff --git a/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs b/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs new file mode 100644 index 000000000..8fbf513ee --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that validates input data before orchestrations and activities execute. +/// Users register validation rules via implementations. +/// If validation fails, an is thrown before the task runs. +/// +public sealed class ValidationPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Validation"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The input validators to use. + public ValidationPlugin(params IInputValidator[] validators) + { + Check.NotNull(validators); + this.orchestrationInterceptors = new List + { + new ValidationOrchestrationInterceptor(validators), + }; + this.activityInterceptors = new List + { + new ValidationActivityInterceptor(validators), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Validation plugin is cross-cutting only; it does not register any tasks. + } + + sealed class ValidationOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly IInputValidator[] validators; + + public ValidationOrchestrationInterceptor(IInputValidator[] validators) => this.validators = validators; + + public async Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + foreach (IInputValidator validator in this.validators) + { + ValidationResult result = await validator.ValidateAsync(context.Name, context.Input); + if (!result.IsValid) + { + throw new ArgumentException( + $"Input validation failed for orchestration '{context.Name}': {result.ErrorMessage}"); + } + } + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) => + Task.CompletedTask; + } + + sealed class ValidationActivityInterceptor : IActivityInterceptor + { + readonly IInputValidator[] validators; + + public ValidationActivityInterceptor(IInputValidator[] validators) => this.validators = validators; + + public async Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + foreach (IInputValidator validator in this.validators) + { + ValidationResult result = await validator.ValidateAsync(context.Name, context.Input); + if (!result.IsValid) + { + throw new ArgumentException( + $"Input validation failed for activity '{context.Name}': {result.ErrorMessage}"); + } + } + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) => + Task.CompletedTask; + } +} diff --git a/src/Extensions/Plugins/BuiltIn/ValidationResult.cs b/src/Extensions/Plugins/BuiltIn/ValidationResult.cs new file mode 100644 index 000000000..a2b53542a --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/ValidationResult.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Result of an input validation check. +/// +public readonly struct ValidationResult +{ + /// + /// A successful validation result. + /// + public static readonly ValidationResult Success = new(true, null); + + ValidationResult(bool isValid, string? errorMessage) + { + this.IsValid = isValid; + this.ErrorMessage = errorMessage; + } + + /// + /// Gets a value indicating whether the validation passed. + /// + public bool IsValid { get; } + + /// + /// Gets the error message if validation failed. + /// + public string? ErrorMessage { get; } + + /// + /// Creates a failed validation result. + /// + /// The error message. + /// A failed validation result. + public static ValidationResult Failure(string errorMessage) => new(false, errorMessage); +} diff --git a/src/Extensions/Plugins/DependencyInjection/DurableTaskBuiltInPluginExtensions.cs b/src/Extensions/Plugins/DependencyInjection/DurableTaskBuiltInPluginExtensions.cs new file mode 100644 index 000000000..3c647151d --- /dev/null +++ b/src/Extensions/Plugins/DependencyInjection/DurableTaskBuiltInPluginExtensions.cs @@ -0,0 +1,119 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask; + +/// +/// Convenience extension methods for adding built-in plugins to the Durable Task worker builder. +/// +public static class DurableTaskBuiltInPluginExtensions +{ + /// + /// Adds the logging plugin that emits structured log events for orchestration and activity lifecycle. + /// + /// The worker builder. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseLoggingPlugin(this IDurableTaskWorkerBuilder builder) + { + Check.NotNull(builder); + + // Defer plugin creation to when the service provider is available. + builder.Services.AddSingleton(sp => + { + ILoggerFactory loggerFactory = sp.GetRequiredService(); + return new LoggingPlugin(loggerFactory); + }); + + builder.Services.TryAddPluginPipeline(); + return builder; + } + + /// + /// Adds the metrics plugin that tracks execution counts and durations. + /// + /// The worker builder. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseMetricsPlugin(this IDurableTaskWorkerBuilder builder) + { + return builder.UseMetricsPlugin(new MetricsStore()); + } + + /// + /// Adds the metrics plugin with a shared metrics store. + /// + /// The worker builder. + /// The metrics store to use. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseMetricsPlugin( + this IDurableTaskWorkerBuilder builder, + MetricsStore store) + { + Check.NotNull(builder); + Check.NotNull(store); + + builder.Services.AddSingleton(store); + return builder.UsePlugin(new MetricsPlugin(store)); + } + + /// + /// Adds the authorization plugin with the specified handler. + /// + /// The worker builder. + /// The authorization handler. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseAuthorizationPlugin( + this IDurableTaskWorkerBuilder builder, + IAuthorizationHandler handler) + { + Check.NotNull(builder); + Check.NotNull(handler); + return builder.UsePlugin(new AuthorizationPlugin(handler)); + } + + /// + /// Adds the validation plugin with the specified validators. + /// + /// The worker builder. + /// The input validators to use. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseValidationPlugin( + this IDurableTaskWorkerBuilder builder, + params IInputValidator[] validators) + { + Check.NotNull(builder); + Check.NotNull(validators); + return builder.UsePlugin(new ValidationPlugin(validators)); + } + + /// + /// Adds the rate limiting plugin with the specified options. + /// + /// The worker builder. + /// Configuration callback for rate limiting options. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseRateLimitingPlugin( + this IDurableTaskWorkerBuilder builder, + Action? configure = null) + { + Check.NotNull(builder); + RateLimitingOptions options = new(); + configure?.Invoke(options); + return builder.UsePlugin(new RateLimitingPlugin(options)); + } + + static void TryAddPluginPipeline(this IServiceCollection services) + { + services.TryAddSingleton(sp => + { + IEnumerable plugins = sp.GetServices(); + return new PluginPipeline(plugins); + }); + } +} diff --git a/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs new file mode 100644 index 000000000..3a99d1776 --- /dev/null +++ b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask; + +/// +/// Extension methods for adding plugins to the Durable Task worker builder. +/// +public static class DurableTaskWorkerBuilderExtensionsPlugins +{ + static readonly object WrappingRegistered = new(); + + /// + /// Adds a plugin to the Durable Task worker. All orchestration and activity interceptors + /// from the plugin will be invoked during execution, and the plugin's built-in activities + /// and orchestrations will be auto-registered into the worker's task registry. + /// + /// The worker builder. + /// The plugin to add. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UsePlugin( + this IDurableTaskWorkerBuilder builder, + IDurableTaskPlugin plugin) + { + Check.NotNull(builder); + Check.NotNull(plugin); + + builder.Services.AddSingleton(plugin); + builder.Services.TryAddSingleton(sp => + { + IEnumerable plugins = sp.GetServices(); + return new PluginPipeline(plugins); + }); + + // Auto-register the plugin's built-in activities and orchestrations. + builder.Services.Configure(builder.Name, registry => + { + plugin.RegisterTasks(registry); + }); + + // Register the PostConfigure that wraps all factories with plugin interceptors. + // TryAddEnumerable ensures this only runs once per builder name even if UsePlugin is called multiple times. + builder.Services.TryAddEnumerable( + ServiceDescriptor.Singleton>( + new PluginRegistryPostConfigure(builder.Name))); + + return builder; + } + + /// + /// Adds multiple plugins to the Durable Task worker. + /// + /// The worker builder. + /// The plugins to add. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UsePlugins( + this IDurableTaskWorkerBuilder builder, + params IDurableTaskPlugin[] plugins) + { + Check.NotNull(builder); + Check.NotNull(plugins); + + foreach (IDurableTaskPlugin plugin in plugins) + { + builder.UsePlugin(plugin); + } + + return builder; + } +} diff --git a/src/Extensions/Plugins/DependencyInjection/PluginRegistryPostConfigure.cs b/src/Extensions/Plugins/DependencyInjection/PluginRegistryPostConfigure.cs new file mode 100644 index 000000000..7a2fff1e5 --- /dev/null +++ b/src/Extensions/Plugins/DependencyInjection/PluginRegistryPostConfigure.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Plugins; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask; + +/// +/// Post-configures a to wrap all orchestrator and activity +/// factories with and . +/// This ensures plugin interceptors run transparently for every orchestration and activity execution. +/// +sealed class PluginRegistryPostConfigure : IPostConfigureOptions +{ + readonly string name; + + public PluginRegistryPostConfigure(string name) + { + this.name = name; + } + + /// + public void PostConfigure(string? name, DurableTaskRegistry registry) + { + if (!string.Equals(name, this.name, StringComparison.Ordinal)) + { + return; + } + + // Wrap all orchestrator factories so interceptors run on every execution. + List>> orchestrators = new(registry.Orchestrators); + foreach (KeyValuePair> entry in orchestrators) + { + Func original = entry.Value; + registry.Orchestrators[entry.Key] = sp => + { + ITaskOrchestrator inner = original(sp); + PluginPipeline? pipeline = sp.GetService(typeof(PluginPipeline)) as PluginPipeline; + return pipeline is not null && pipeline.HasOrchestrationInterceptors + ? new PluginOrchestrationWrapper(inner, pipeline) + : inner; + }; + } + + // Wrap all activity factories so interceptors run on every execution. + List>> activities = new(registry.Activities); + foreach (KeyValuePair> entry in activities) + { + Func original = entry.Value; + registry.Activities[entry.Key] = sp => + { + ITaskActivity inner = original(sp); + PluginPipeline? pipeline = sp.GetService(typeof(PluginPipeline)) as PluginPipeline; + return pipeline is not null && pipeline.HasActivityInterceptors + ? new PluginActivityWrapper(inner, pipeline) + : inner; + }; + } + } +} diff --git a/src/Extensions/Plugins/IActivityInterceptor.cs b/src/Extensions/Plugins/IActivityInterceptor.cs new file mode 100644 index 000000000..fbdcf98de --- /dev/null +++ b/src/Extensions/Plugins/IActivityInterceptor.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Interceptor for activity lifecycle events. +/// Implementations can run logic before and after activity execution. +/// +public interface IActivityInterceptor +{ + /// + /// Called before an activity begins execution. + /// + /// The activity interceptor context. + /// A task that completes when the interceptor logic is finished. + Task OnActivityStartingAsync(ActivityInterceptorContext context); + + /// + /// Called after an activity completes execution successfully. + /// + /// The activity interceptor context. + /// The activity result, if any. + /// A task that completes when the interceptor logic is finished. + Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result); + + /// + /// Called after an activity fails with an exception. + /// + /// The activity interceptor context. + /// The exception that caused the failure. + /// A task that completes when the interceptor logic is finished. + Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception); +} diff --git a/src/Extensions/Plugins/IDurableTaskPlugin.cs b/src/Extensions/Plugins/IDurableTaskPlugin.cs new file mode 100644 index 000000000..ebbafc14b --- /dev/null +++ b/src/Extensions/Plugins/IDurableTaskPlugin.cs @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Defines a plugin that can provide reusable activities, orchestrations, and cross-cutting +/// interceptors. Inspired by Temporal's plugin pattern, plugins serve two purposes: +/// +/// Provide built-in activities and orchestrations that users import and register +/// automatically when adding the plugin to a worker. +/// Add cross-cutting interceptors for concerns like logging, metrics, authorization, +/// validation, and rate limiting. +/// +/// +public interface IDurableTaskPlugin +{ + /// + /// Gets the unique name of this plugin. + /// + string Name { get; } + + /// + /// Gets the orchestration interceptors provided by this plugin. + /// + IReadOnlyList OrchestrationInterceptors { get; } + + /// + /// Gets the activity interceptors provided by this plugin. + /// + IReadOnlyList ActivityInterceptors { get; } + + /// + /// Registers the plugin's built-in orchestrations and activities into the given registry. + /// This is called automatically when the plugin is added to a worker via UsePlugin(). + /// + /// The task registry to register activities and orchestrations into. + void RegisterTasks(DurableTaskRegistry registry); +} diff --git a/src/Extensions/Plugins/IOrchestrationInterceptor.cs b/src/Extensions/Plugins/IOrchestrationInterceptor.cs new file mode 100644 index 000000000..9d646221d --- /dev/null +++ b/src/Extensions/Plugins/IOrchestrationInterceptor.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Interceptor for orchestration lifecycle events. +/// Implementations can run logic before and after orchestration execution. +/// +public interface IOrchestrationInterceptor +{ + /// + /// Called before an orchestration begins execution (including replays). + /// + /// The orchestration context. + /// A task that completes when the interceptor logic is finished. + Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context); + + /// + /// Called after an orchestration completes execution successfully. + /// + /// The orchestration context. + /// The orchestration result, if any. + /// A task that completes when the interceptor logic is finished. + Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result); + + /// + /// Called after an orchestration fails with an exception. + /// + /// The orchestration context. + /// The exception that caused the failure. + /// A task that completes when the interceptor logic is finished. + Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception); +} diff --git a/src/Extensions/Plugins/Internal/Check.cs b/src/Extensions/Plugins/Internal/Check.cs new file mode 100644 index 000000000..54200eae1 --- /dev/null +++ b/src/Extensions/Plugins/Internal/Check.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// Minimal argument validation helpers for the Plugins extension package. +/// +static class Check +{ + public static T NotNull(T? argument, string? name = null) + where T : class + { + if (argument is null) + { + throw new ArgumentNullException(name); + } + + return argument; + } + + public static string NotNullOrEmpty(string? argument, string? name = null) + { + if (string.IsNullOrEmpty(argument)) + { + throw new ArgumentException("Value cannot be null or empty.", name); + } + + return argument!; + } +} diff --git a/src/Extensions/Plugins/OrchestrationInterceptorContext.cs b/src/Extensions/Plugins/OrchestrationInterceptorContext.cs new file mode 100644 index 000000000..4cd150c50 --- /dev/null +++ b/src/Extensions/Plugins/OrchestrationInterceptorContext.cs @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Context provided to orchestration interceptors during lifecycle events. +/// +public sealed class OrchestrationInterceptorContext +{ + /// + /// Initializes a new instance of the class. + /// + /// The name of the orchestration. + /// The instance ID of the orchestration. + /// Whether this is a replay execution. + /// The orchestration input. + public OrchestrationInterceptorContext(TaskName name, string instanceId, bool isReplaying, object? input) + { + this.Name = name; + this.InstanceId = instanceId; + this.IsReplaying = isReplaying; + this.Input = input; + } + + /// + /// Gets the name of the orchestration. + /// + public TaskName Name { get; } + + /// + /// Gets the instance ID of the orchestration. + /// + public string InstanceId { get; } + + /// + /// Gets a value indicating whether this execution is a replay. + /// + public bool IsReplaying { get; } + + /// + /// Gets the orchestration input. + /// + public object? Input { get; } + + /// + /// Gets a dictionary that can be used to pass data between interceptors during a single execution. + /// + public IDictionary Properties { get; } = new Dictionary(); +} diff --git a/src/Extensions/Plugins/PluginActivityWrapper.cs b/src/Extensions/Plugins/PluginActivityWrapper.cs new file mode 100644 index 000000000..d1165f9b1 --- /dev/null +++ b/src/Extensions/Plugins/PluginActivityWrapper.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Wraps an with the plugin pipeline, invoking +/// interceptors before and after the inner activity runs. +/// +internal sealed class PluginActivityWrapper : ITaskActivity +{ + readonly ITaskActivity inner; + readonly PluginPipeline pipeline; + + /// + /// Initializes a new instance of the class. + /// + /// The original activity to wrap. + /// The plugin pipeline. + public PluginActivityWrapper(ITaskActivity inner, PluginPipeline pipeline) + { + this.inner = inner; + this.pipeline = pipeline; + } + + /// + public Type InputType => this.inner.InputType; + + /// + public Type OutputType => this.inner.OutputType; + + /// + public async Task RunAsync(TaskActivityContext context, object? input) + { + ActivityInterceptorContext interceptorContext = new( + context.Name, + context.InstanceId, + input); + + await this.pipeline.ExecuteActivityStartingAsync(interceptorContext); + + try + { + object? result = await this.inner.RunAsync(context, input); + await this.pipeline.ExecuteActivityCompletedAsync(interceptorContext, result); + return result; + } + catch (Exception ex) + { + await this.pipeline.ExecuteActivityFailedAsync(interceptorContext, ex); + throw; + } + } +} diff --git a/src/Extensions/Plugins/PluginOrchestrationWrapper.cs b/src/Extensions/Plugins/PluginOrchestrationWrapper.cs new file mode 100644 index 000000000..433fce9c7 --- /dev/null +++ b/src/Extensions/Plugins/PluginOrchestrationWrapper.cs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Wraps an with the plugin pipeline, invoking +/// interceptors before and after the inner orchestrator runs. +/// +internal sealed class PluginOrchestrationWrapper : ITaskOrchestrator +{ + readonly ITaskOrchestrator inner; + readonly PluginPipeline pipeline; + + /// + /// Initializes a new instance of the class. + /// + /// The original orchestrator to wrap. + /// The plugin pipeline. + public PluginOrchestrationWrapper(ITaskOrchestrator inner, PluginPipeline pipeline) + { + this.inner = inner; + this.pipeline = pipeline; + } + + /// + public Type InputType => this.inner.InputType; + + /// + public Type OutputType => this.inner.OutputType; + + /// + public async Task RunAsync(TaskOrchestrationContext context, object? input) + { + OrchestrationInterceptorContext interceptorContext = new( + context.Name, + context.InstanceId, + context.IsReplaying, + input); + + // Only run non-replay interceptors for starting/completing events to avoid duplication. + if (!context.IsReplaying) + { + await this.pipeline.ExecuteOrchestrationStartingAsync(interceptorContext); + } + + try + { + object? result = await this.inner.RunAsync(context, input); + + if (!context.IsReplaying) + { + await this.pipeline.ExecuteOrchestrationCompletedAsync(interceptorContext, result); + } + + return result; + } + catch (Exception ex) + { + if (!context.IsReplaying) + { + await this.pipeline.ExecuteOrchestrationFailedAsync(interceptorContext, ex); + } + + throw; + } + } +} diff --git a/src/Extensions/Plugins/PluginPipeline.cs b/src/Extensions/Plugins/PluginPipeline.cs new file mode 100644 index 000000000..f360edbff --- /dev/null +++ b/src/Extensions/Plugins/PluginPipeline.cs @@ -0,0 +1,137 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Manages the pipeline of registered plugins and executes interceptors in order. +/// +public sealed class PluginPipeline +{ + readonly IReadOnlyList plugins; + + /// + /// Initializes a new instance of the class. + /// + /// The plugins to include in the pipeline. + public PluginPipeline(IEnumerable plugins) + { + Check.NotNull(plugins); + this.plugins = plugins.ToList(); + } + + /// + /// Gets the registered plugins. + /// + public IReadOnlyList Plugins => this.plugins; + + /// + /// Gets a value indicating whether any registered plugin has orchestration interceptors. + /// + public bool HasOrchestrationInterceptors => this.plugins.Any(p => p.OrchestrationInterceptors.Count > 0); + + /// + /// Gets a value indicating whether any registered plugin has activity interceptors. + /// + public bool HasActivityInterceptors => this.plugins.Any(p => p.ActivityInterceptors.Count > 0); + + /// + /// Executes all orchestration-starting interceptors in registration order. + /// + /// The orchestration interceptor context. + /// A task that completes when all interceptors have run. + public async Task ExecuteOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IOrchestrationInterceptor interceptor in plugin.OrchestrationInterceptors) + { + await interceptor.OnOrchestrationStartingAsync(context); + } + } + } + + /// + /// Executes all orchestration-completed interceptors in registration order. + /// + /// The orchestration interceptor context. + /// The orchestration result. + /// A task that completes when all interceptors have run. + public async Task ExecuteOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IOrchestrationInterceptor interceptor in plugin.OrchestrationInterceptors) + { + await interceptor.OnOrchestrationCompletedAsync(context, result); + } + } + } + + /// + /// Executes all orchestration-failed interceptors in registration order. + /// + /// The orchestration interceptor context. + /// The exception that caused the failure. + /// A task that completes when all interceptors have run. + public async Task ExecuteOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IOrchestrationInterceptor interceptor in plugin.OrchestrationInterceptors) + { + await interceptor.OnOrchestrationFailedAsync(context, exception); + } + } + } + + /// + /// Executes all activity-starting interceptors in registration order. + /// + /// The activity interceptor context. + /// A task that completes when all interceptors have run. + public async Task ExecuteActivityStartingAsync(ActivityInterceptorContext context) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IActivityInterceptor interceptor in plugin.ActivityInterceptors) + { + await interceptor.OnActivityStartingAsync(context); + } + } + } + + /// + /// Executes all activity-completed interceptors in registration order. + /// + /// The activity interceptor context. + /// The activity result. + /// A task that completes when all interceptors have run. + public async Task ExecuteActivityCompletedAsync(ActivityInterceptorContext context, object? result) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IActivityInterceptor interceptor in plugin.ActivityInterceptors) + { + await interceptor.OnActivityCompletedAsync(context, result); + } + } + } + + /// + /// Executes all activity-failed interceptors in registration order. + /// + /// The activity interceptor context. + /// The exception that caused the failure. + /// A task that completes when all interceptors have run. + public async Task ExecuteActivityFailedAsync(ActivityInterceptorContext context, Exception exception) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IActivityInterceptor interceptor in plugin.ActivityInterceptors) + { + await interceptor.OnActivityFailedAsync(context, exception); + } + } + } +} diff --git a/src/Extensions/Plugins/Plugins.csproj b/src/Extensions/Plugins/Plugins.csproj new file mode 100644 index 000000000..ba8d7a7a3 --- /dev/null +++ b/src/Extensions/Plugins/Plugins.csproj @@ -0,0 +1,32 @@ + + + + netstandard2.0;net6.0;net8.0;net10.0 + Plugin system for Microsoft Durable Task SDK. Includes built-in plugins for logging, metrics, authorization, validation, and rate limiting. + Microsoft.DurableTask.Extensions.Plugins + Microsoft.DurableTask + true + + + + + + + + + + + + + + + + + + + diff --git a/src/Extensions/Plugins/SimplePlugin.cs b/src/Extensions/Plugins/SimplePlugin.cs new file mode 100644 index 000000000..38f9fa928 --- /dev/null +++ b/src/Extensions/Plugins/SimplePlugin.cs @@ -0,0 +1,135 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// A simple plugin implementation that can provide both reusable activities/orchestrations +/// and cross-cutting interceptors. This is the recommended way to build plugins, following +/// Temporal's SimplePlugin pattern. +/// +/// Use this to create plugins that ship as NuGet packages, providing pre-built activities +/// and orchestrations that users get automatically when they register the plugin, as well +/// as interceptors for cross-cutting concerns like logging or metrics. +/// +/// +public sealed class SimplePlugin : IDurableTaskPlugin +{ + readonly List orchestrationInterceptors; + readonly List activityInterceptors; + readonly Action? registerTasks; + + SimplePlugin( + string name, + List orchestrationInterceptors, + List activityInterceptors, + Action? registerTasks) + { + this.Name = name; + this.orchestrationInterceptors = orchestrationInterceptors; + this.activityInterceptors = activityInterceptors; + this.registerTasks = registerTasks; + } + + /// + public string Name { get; } + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + this.registerTasks?.Invoke(registry); + } + + /// + /// Creates a new for constructing a . + /// + /// The unique name of the plugin. + /// A new builder instance. + public static Builder NewBuilder(string name) + { + Check.NotNullOrEmpty(name); + return new Builder(name); + } + + /// + /// Builder for constructing instances. + /// + public sealed class Builder + { + readonly string name; + readonly List orchestrationInterceptors = new(); + readonly List activityInterceptors = new(); + Action? registerTasks; + + /// + /// Initializes a new instance of the class. + /// + /// The plugin name. + internal Builder(string name) + { + this.name = name; + } + + /// + /// Adds an orchestration interceptor to the plugin. + /// + /// The interceptor to add. + /// This builder, for call chaining. + public Builder AddOrchestrationInterceptor(IOrchestrationInterceptor interceptor) + { + Check.NotNull(interceptor); + this.orchestrationInterceptors.Add(interceptor); + return this; + } + + /// + /// Adds an activity interceptor to the plugin. + /// + /// The interceptor to add. + /// This builder, for call chaining. + public Builder AddActivityInterceptor(IActivityInterceptor interceptor) + { + Check.NotNull(interceptor); + this.activityInterceptors.Add(interceptor); + return this; + } + + /// + /// Sets a callback that registers the plugin's built-in orchestrations and activities. + /// These tasks are automatically registered when the plugin is added to a worker. + /// + /// A callback that registers tasks into the . + /// This builder, for call chaining. + public Builder AddTasks(Action configure) + { + Check.NotNull(configure); + Action? previous = this.registerTasks; + this.registerTasks = registry => + { + previous?.Invoke(registry); + configure(registry); + }; + + return this; + } + + /// + /// Builds the instance. + /// + /// A new . + public SimplePlugin Build() + { + return new SimplePlugin( + this.name, + new List(this.orchestrationInterceptors), + new List(this.activityInterceptors), + this.registerTasks); + } + } +} diff --git a/test/Extensions.Plugins.E2ETests/DtsFixture.cs b/test/Extensions.Plugins.E2ETests/DtsFixture.cs new file mode 100644 index 000000000..cbb6630d5 --- /dev/null +++ b/test/Extensions.Plugins.E2ETests/DtsFixture.cs @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask.Extensions.Plugins.E2ETests; + +/// +/// Shared fixture that sets up a DTS-connected worker and client for E2E tests. +/// Reads the connection string from DTS_CONNECTION_STRING environment variable. +/// +public sealed class DtsFixture : IAsyncDisposable +{ + IHost? host; + + public DurableTaskClient Client { get; private set; } = null!; + + public MetricsStore MetricsStore { get; } = new(); + + public List AuthorizationLog { get; } = new(); + + public static string? GetConnectionString() => + Environment.GetEnvironmentVariable("DTS_CONNECTION_STRING"); + + public async Task StartAsync( + Action configureTasks, + Action? configureWorker = null) + { + string connectionString = GetConnectionString() + ?? throw new InvalidOperationException( + "DTS_CONNECTION_STRING environment variable is required. " + + "Format: Endpoint=https://...;Authentication=DefaultAzure;TaskHub=..."); + + HostApplicationBuilder builder = Host.CreateApplicationBuilder(); + builder.Logging.SetMinimumLevel(LogLevel.Warning); + + builder.Services.AddDurableTaskClient(clientBuilder => + { + clientBuilder.UseDurableTaskScheduler(connectionString); + }); + + builder.Services.AddDurableTaskWorker(workerBuilder => + { + workerBuilder.AddTasks(configureTasks); + workerBuilder.UseDurableTaskScheduler(connectionString); + + // Apply custom plugin configuration. + configureWorker?.Invoke(workerBuilder); + }); + + this.host = builder.Build(); + await this.host.StartAsync(); + + this.Client = this.host.Services.GetRequiredService(); + } + + public async ValueTask DisposeAsync() + { + if (this.host is not null) + { + await this.host.StopAsync(); + this.host.Dispose(); + } + } +} diff --git a/test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj b/test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj new file mode 100644 index 000000000..64307abe1 --- /dev/null +++ b/test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj @@ -0,0 +1,19 @@ + + + + net10.0 + + + + + + + + + + + + + + + diff --git a/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs b/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs new file mode 100644 index 000000000..287ec9cfd --- /dev/null +++ b/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs @@ -0,0 +1,424 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.Logging; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Extensions.Plugins.E2ETests; + +/// +/// End-to-end tests for the plugin system against a real Durable Task Scheduler. +/// Requires the DTS_CONNECTION_STRING environment variable to be set. +/// +[Collection("DTS")] +[Trait("Category", "E2E")] +public class PluginE2ETests : IAsyncLifetime +{ + readonly ITestOutputHelper output; + readonly DtsFixture fixture = new(); + readonly string testId = Guid.NewGuid().ToString("N")[..8]; + + public PluginE2ETests(ITestOutputHelper output) + { + this.output = output; + } + + public Task InitializeAsync() => Task.CompletedTask; + + public async Task DisposeAsync() => await this.fixture.DisposeAsync(); + + /// + /// Returns true if DTS_CONNECTION_STRING is set; tests should return early if false. + /// + bool HasConnectionString() + { + string? cs = DtsFixture.GetConnectionString(); + if (string.IsNullOrEmpty(cs)) + { + this.output.WriteLine("SKIPPED: DTS_CONNECTION_STRING not set."); + return false; + } + + return true; + } + + // ─── Test 1: Metrics plugin tracks orchestration and activity counts ─── + + [Fact] + public async Task MetricsPlugin_TracksExecutionCounts_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"MetricsOrch_{this.testId}"; + string actName = $"MetricsAct_{this.testId}"; + MetricsStore store = new(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + string r = await ctx.CallActivityAsync(actName, "hello"); + return r; + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"echo:{input}"); + }, + worker => worker.UseMetricsPlugin(store)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + this.output.WriteLine($"Scheduled: {instanceId}"); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + this.output.WriteLine($"Output: {result.SerializedOutput}"); + + // Metrics plugin interceptors should have been invoked via the wrapper. + store.GetOrchestrationMetrics(orchName).Started.Should().BeGreaterOrEqualTo(1); + store.GetOrchestrationMetrics(orchName).Completed.Should().BeGreaterOrEqualTo(1); + store.GetActivityMetrics(actName).Started.Should().BeGreaterOrEqualTo(1); + store.GetActivityMetrics(actName).Completed.Should().BeGreaterOrEqualTo(1); + store.GetOrchestrationMetrics(orchName).Failed.Should().Be(0); + store.GetActivityMetrics(actName).Failed.Should().Be(0); + + this.output.WriteLine($"Orch started={store.GetOrchestrationMetrics(orchName).Started}, completed={store.GetOrchestrationMetrics(orchName).Completed}"); + this.output.WriteLine($"Activity started={store.GetActivityMetrics(actName).Started}, completed={store.GetActivityMetrics(actName).Completed}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 2: Logging plugin fires interceptors without errors ─── + + [Fact] + public async Task LoggingPlugin_DoesNotBreakExecution_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"LogOrch_{this.testId}"; + string actName = $"LogAct_{this.testId}"; + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "world"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"Hi {input}"); + }, + worker => worker.UseLoggingPlugin()); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + this.output.WriteLine($"Scheduled: {instanceId}"); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("Hi world"); + this.output.WriteLine($"Completed with output: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 3: Authorization plugin blocks unauthorized execution ─── + + [Fact] + public async Task AuthorizationPlugin_BlocksUnauthorized_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"AuthBlockOrch_{this.testId}"; + string actName = $"AuthBlockAct_{this.testId}"; + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "data"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => input); + }, + worker => worker.UseAuthorizationPlugin(new DenyAllHandler())); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + this.output.WriteLine($"Scheduled: {instanceId}"); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + // The orchestration should fail because the authorization interceptor denies it. + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Failed); + result.FailureDetails.Should().NotBeNull(); + result.FailureDetails!.ErrorMessage.Should().Contain("Authorization denied"); + this.output.WriteLine($"Failed as expected: {result.FailureDetails.ErrorMessage}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 4: Authorization plugin allows authorized execution ─── + + [Fact] + public async Task AuthorizationPlugin_AllowsAuthorized_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"AuthAllowOrch_{this.testId}"; + string actName = $"AuthAllowAct_{this.testId}"; + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "allowed"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"ok:{input}"); + }, + worker => worker.UseAuthorizationPlugin(new AllowAllHandler())); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("ok:allowed"); + this.output.WriteLine($"Authorized OK: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 5: Validation plugin rejects invalid input ─── + + [Fact] + public async Task ValidationPlugin_RejectsInvalidInput_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"ValRejectOrch_{this.testId}"; + string actName = $"ValRejectAct_{this.testId}"; + + // Validator rejects null/empty inputs for the activity. + IInputValidator validator = new NonEmptyStringValidator(actName); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + // Pass empty string — should be rejected by validation. + return await ctx.CallActivityAsync(actName, string.Empty); + }); + tasks.AddActivityFunc(actName, (ctx, input) => input); + }, + worker => worker.UseValidationPlugin(validator)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + // Orchestration completes but the activity call should fail with TaskFailedException + // because the validation interceptor throws ArgumentException. + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Failed); + this.output.WriteLine($"Validation rejected: {result.FailureDetails?.ErrorMessage}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 6: Validation plugin passes valid input ─── + + [Fact] + public async Task ValidationPlugin_AcceptsValidInput_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"ValAcceptOrch_{this.testId}"; + string actName = $"ValAcceptAct_{this.testId}"; + + IInputValidator validator = new NonEmptyStringValidator(actName); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "valid-data"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"processed:{input}"); + }, + worker => worker.UseValidationPlugin(validator)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("processed:valid-data"); + this.output.WriteLine($"Valid OK: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 7: Plugin-provided activities are callable ─── + + [Fact] + public async Task PluginRegisteredActivity_IsCallable_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"PluginTaskOrch_{this.testId}"; + string pluginActName = $"PluginAct_{this.testId}"; + + SimplePlugin taskPlugin = SimplePlugin.NewBuilder("E2E.TaskPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc(pluginActName, (ctx, input) => $"plugin:{input}"); + }) + .Build(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + // Call the activity that was registered by the plugin, not by the user. + return await ctx.CallActivityAsync(pluginActName, "from-orch"); + }); + }, + worker => worker.UsePlugin(taskPlugin)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("plugin:from-orch"); + this.output.WriteLine($"Plugin activity result: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 8: Multiple plugins all fire interceptors ─── + + [Fact] + public async Task MultiplePlugins_AllInterceptorsFire_E2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"MultiPlugOrch_{this.testId}"; + string actName = $"MultiPlugAct_{this.testId}"; + MetricsStore store = new(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "multi"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"done:{input}"); + }, + worker => + { + worker.UseLoggingPlugin(); + worker.UseMetricsPlugin(store); + worker.UseAuthorizationPlugin(new AllowAllHandler()); + }); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("done:multi"); + + // Metrics plugin should have recorded counts because all plugins fire. + store.GetOrchestrationMetrics(orchName).Started.Should().BeGreaterOrEqualTo(1); + store.GetActivityMetrics(actName).Started.Should().BeGreaterOrEqualTo(1); + this.output.WriteLine($"Multiple plugins OK. Orch started={store.GetOrchestrationMetrics(orchName).Started}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 9: Plugin with both tasks and interceptors works E2E ─── + + [Fact] + public async Task PluginWithTasksAndInterceptors_WorksE2E() + { + if (!this.HasConnectionString()) return; + string orchName = $"FullPlugOrch_{this.testId}"; + string pluginActName = $"FullPlugAct_{this.testId}"; + MetricsStore store = new(); + + SimplePlugin fullPlugin = SimplePlugin.NewBuilder("E2E.FullPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc(pluginActName, (ctx, input) => $"full:{input}"); + }) + .Build(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(pluginActName, "test"); + }); + }, + worker => + { + worker.UsePlugin(fullPlugin); + worker.UseMetricsPlugin(store); + }); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("full:test"); + + // Metrics interceptors wrapped the plugin-provided activity too. + store.GetActivityMetrics(pluginActName).Started.Should().BeGreaterOrEqualTo(1); + this.output.WriteLine($"Full plugin OK. Activity started={store.GetActivityMetrics(pluginActName).Started}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Helper classes ─── + + sealed class DenyAllHandler : IAuthorizationHandler + { + public Task AuthorizeAsync(AuthorizationContext context) => Task.FromResult(false); + } + + sealed class AllowAllHandler : IAuthorizationHandler + { + public Task AuthorizeAsync(AuthorizationContext context) => Task.FromResult(true); + } + + sealed class NonEmptyStringValidator : IInputValidator + { + readonly string targetActivity; + + public NonEmptyStringValidator(string targetActivity) => this.targetActivity = targetActivity; + + public Task ValidateAsync(TaskName taskName, object? input) + { + if (taskName.Name == this.targetActivity && input is string s && string.IsNullOrEmpty(s)) + { + return Task.FromResult(ValidationResult.Failure("Input must be non-empty.")); + } + + return Task.FromResult(ValidationResult.Success); + } + } +} + + diff --git a/test/Extensions.Plugins.Tests/AuthorizationPluginTests.cs b/test/Extensions.Plugins.Tests/AuthorizationPluginTests.cs new file mode 100644 index 000000000..81f2acae7 --- /dev/null +++ b/test/Extensions.Plugins.Tests/AuthorizationPluginTests.cs @@ -0,0 +1,90 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Moq; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class AuthorizationPluginTests +{ + [Fact] + public async Task AuthorizationPlugin_AllowsAuthorizedOrchestration() + { + // Arrange + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .ReturnsAsync(true); + + AuthorizationPlugin plugin = new(handler.Object); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().NotThrowAsync(); + } + + [Fact] + public async Task AuthorizationPlugin_DeniesUnauthorizedOrchestration() + { + // Arrange + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .ReturnsAsync(false); + + AuthorizationPlugin plugin = new(handler.Object); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*TestOrch*"); + } + + [Fact] + public async Task AuthorizationPlugin_DeniesUnauthorizedActivity() + { + // Arrange + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .ReturnsAsync(false); + + AuthorizationPlugin plugin = new(handler.Object); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act & Assert + await plugin.ActivityInterceptors[0].Invoking( + i => i.OnActivityStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*TestActivity*"); + } + + [Fact] + public async Task AuthorizationPlugin_PassesCorrectContextToHandler() + { + // Arrange + AuthorizationContext? capturedContext = null; + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .Callback(c => capturedContext = c) + .ReturnsAsync(true); + + AuthorizationPlugin plugin = new(handler.Object); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "some-input"); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + + // Assert + capturedContext.Should().NotBeNull(); + capturedContext!.Name.Should().Be(new TaskName("TestOrch")); + capturedContext.InstanceId.Should().Be("instance1"); + capturedContext.TargetType.Should().Be(AuthorizationTargetType.Orchestration); + capturedContext.Input.Should().Be("some-input"); + } +} diff --git a/test/Extensions.Plugins.Tests/Extensions.Plugins.Tests.csproj b/test/Extensions.Plugins.Tests/Extensions.Plugins.Tests.csproj new file mode 100644 index 000000000..69d426aa2 --- /dev/null +++ b/test/Extensions.Plugins.Tests/Extensions.Plugins.Tests.csproj @@ -0,0 +1,11 @@ + + + + net10.0 + + + + + + + diff --git a/test/Extensions.Plugins.Tests/LoggingPluginTests.cs b/test/Extensions.Plugins.Tests/LoggingPluginTests.cs new file mode 100644 index 000000000..c0c06c294 --- /dev/null +++ b/test/Extensions.Plugins.Tests/LoggingPluginTests.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class LoggingPluginTests +{ + [Fact] + public void LoggingPlugin_HasCorrectName() + { + // Arrange + Mock loggerFactory = new(); + loggerFactory.Setup(f => f.CreateLogger(It.IsAny())).Returns(Mock.Of()); + + // Act + LoggingPlugin plugin = new(loggerFactory.Object); + + // Assert + plugin.Name.Should().Be(LoggingPlugin.DefaultName); + } + + [Fact] + public void LoggingPlugin_HasOneOrchestrationInterceptor() + { + // Arrange + Mock loggerFactory = new(); + loggerFactory.Setup(f => f.CreateLogger(It.IsAny())).Returns(Mock.Of()); + + // Act + LoggingPlugin plugin = new(loggerFactory.Object); + + // Assert + plugin.OrchestrationInterceptors.Should().HaveCount(1); + plugin.ActivityInterceptors.Should().HaveCount(1); + } +} diff --git a/test/Extensions.Plugins.Tests/MetricsPluginTests.cs b/test/Extensions.Plugins.Tests/MetricsPluginTests.cs new file mode 100644 index 000000000..0100cb776 --- /dev/null +++ b/test/Extensions.Plugins.Tests/MetricsPluginTests.cs @@ -0,0 +1,88 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class MetricsPluginTests +{ + [Fact] + public void MetricsPlugin_HasCorrectName() + { + // Arrange & Act + MetricsPlugin plugin = new(); + + // Assert + plugin.Name.Should().Be(MetricsPlugin.DefaultName); + } + + [Fact] + public async Task MetricsPlugin_TracksOrchestrationStarted() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + + // Assert + store.GetOrchestrationMetrics("TestOrch").Started.Should().Be(1); + } + + [Fact] + public async Task MetricsPlugin_TracksOrchestrationCompleted() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + await plugin.OrchestrationInterceptors[0].OnOrchestrationCompletedAsync(context, "result"); + + // Assert + store.GetOrchestrationMetrics("TestOrch").Completed.Should().Be(1); + store.GetOrchestrationMetrics("TestOrch").TotalDuration.Should().BeGreaterThan(TimeSpan.Zero); + } + + [Fact] + public async Task MetricsPlugin_TracksOrchestrationFailed() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + Exception exception = new InvalidOperationException("test"); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + await plugin.OrchestrationInterceptors[0].OnOrchestrationFailedAsync(context, exception); + + // Assert + store.GetOrchestrationMetrics("TestOrch").Failed.Should().Be(1); + } + + [Fact] + public async Task MetricsPlugin_TracksActivityLifecycle() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context); + await plugin.ActivityInterceptors[0].OnActivityCompletedAsync(context, "result"); + + // Assert + store.GetActivityMetrics("TestActivity").Started.Should().Be(1); + store.GetActivityMetrics("TestActivity").Completed.Should().Be(1); + } +} diff --git a/test/Extensions.Plugins.Tests/PluginPipelineTests.cs b/test/Extensions.Plugins.Tests/PluginPipelineTests.cs new file mode 100644 index 000000000..ae78e7a9e --- /dev/null +++ b/test/Extensions.Plugins.Tests/PluginPipelineTests.cs @@ -0,0 +1,132 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Moq; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class PluginPipelineTests +{ + [Fact] + public async Task ExecuteOrchestrationStarting_InvokesAllInterceptors() + { + // Arrange + Mock interceptor1 = new(); + Mock interceptor2 = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddOrchestrationInterceptor(interceptor1.Object) + .AddOrchestrationInterceptor(interceptor2.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationStartingAsync(context); + + // Assert + interceptor1.Verify(i => i.OnOrchestrationStartingAsync(context), Times.Once); + interceptor2.Verify(i => i.OnOrchestrationStartingAsync(context), Times.Once); + } + + [Fact] + public async Task ExecuteActivityStarting_InvokesAllInterceptors() + { + // Arrange + Mock interceptor1 = new(); + Mock interceptor2 = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddActivityInterceptor(interceptor1.Object) + .AddActivityInterceptor(interceptor2.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act + await pipeline.ExecuteActivityStartingAsync(context); + + // Assert + interceptor1.Verify(i => i.OnActivityStartingAsync(context), Times.Once); + interceptor2.Verify(i => i.OnActivityStartingAsync(context), Times.Once); + } + + [Fact] + public async Task ExecuteOrchestrationCompleted_InvokesAllInterceptors() + { + // Arrange + Mock interceptor = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddOrchestrationInterceptor(interceptor.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationCompletedAsync(context, "result"); + + // Assert + interceptor.Verify(i => i.OnOrchestrationCompletedAsync(context, "result"), Times.Once); + } + + [Fact] + public async Task ExecuteOrchestrationFailed_InvokesAllInterceptors() + { + // Arrange + Mock interceptor = new(); + Exception exception = new InvalidOperationException("test error"); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddOrchestrationInterceptor(interceptor.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationFailedAsync(context, exception); + + // Assert + interceptor.Verify(i => i.OnOrchestrationFailedAsync(context, exception), Times.Once); + } + + [Fact] + public async Task Pipeline_WithMultiplePlugins_InvokesInOrder() + { + // Arrange + List sequence = new(); + Mock interceptor1 = new(); + interceptor1.Setup(i => i.OnOrchestrationStartingAsync(It.IsAny())) + .Callback(() => sequence.Add("plugin1")) + .Returns(Task.CompletedTask); + + Mock interceptor2 = new(); + interceptor2.Setup(i => i.OnOrchestrationStartingAsync(It.IsAny())) + .Callback(() => sequence.Add("plugin2")) + .Returns(Task.CompletedTask); + + SimplePlugin plugin1 = SimplePlugin.NewBuilder("plugin1") + .AddOrchestrationInterceptor(interceptor1.Object) + .Build(); + SimplePlugin plugin2 = SimplePlugin.NewBuilder("plugin2") + .AddOrchestrationInterceptor(interceptor2.Object) + .Build(); + + PluginPipeline pipeline = new(new IDurableTaskPlugin[] { plugin1, plugin2 }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationStartingAsync(context); + + // Assert + sequence.Should().ContainInOrder("plugin1", "plugin2"); + } +} diff --git a/test/Extensions.Plugins.Tests/RateLimitingPluginTests.cs b/test/Extensions.Plugins.Tests/RateLimitingPluginTests.cs new file mode 100644 index 000000000..792170001 --- /dev/null +++ b/test/Extensions.Plugins.Tests/RateLimitingPluginTests.cs @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class RateLimitingPluginTests +{ + [Fact] + public async Task RateLimitingPlugin_AllowsWithinLimit() + { + // Arrange + RateLimitingPlugin plugin = new(new RateLimitingOptions { MaxTokens = 5 }); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act & Assert - should allow 5 calls + for (int i = 0; i < 5; i++) + { + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context)) + .Should().NotThrowAsync(); + } + } + + [Fact] + public async Task RateLimitingPlugin_DeniesExceedingLimit() + { + // Arrange + RateLimitingPlugin plugin = new(new RateLimitingOptions + { + MaxTokens = 2, + RefillRate = 0, + RefillInterval = TimeSpan.FromHours(1), + }); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act - exhaust the tokens + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context); + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context); + + // Assert - should now throw + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*Rate limit exceeded*"); + } + + [Fact] + public async Task RateLimitingPlugin_PerActivityName() + { + // Arrange + RateLimitingPlugin plugin = new(new RateLimitingOptions + { + MaxTokens = 1, + RefillRate = 0, + RefillInterval = TimeSpan.FromHours(1), + }); + ActivityInterceptorContext context1 = new("Activity1", "instance1", "input"); + ActivityInterceptorContext context2 = new("Activity2", "instance1", "input"); + + // Act - consume token for Activity1 + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context1); + + // Assert - Activity2 should still have its own bucket + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context2)) + .Should().NotThrowAsync(); + + // Assert - Activity1 is exhausted + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context1)) + .Should().ThrowAsync(); + } + + [Fact] + public void RateLimitingPlugin_HasNoOrchestrationInterceptors() + { + // Arrange & Act + RateLimitingPlugin plugin = new(new RateLimitingOptions()); + + // Assert + plugin.OrchestrationInterceptors.Should().BeEmpty(); + plugin.ActivityInterceptors.Should().HaveCount(1); + } +} diff --git a/test/Extensions.Plugins.Tests/SimplePluginTests.cs b/test/Extensions.Plugins.Tests/SimplePluginTests.cs new file mode 100644 index 000000000..bd541efbc --- /dev/null +++ b/test/Extensions.Plugins.Tests/SimplePluginTests.cs @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class SimplePluginTests +{ + [Fact] + public void SimplePlugin_HasCorrectName() + { + // Arrange & Act + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.TestPlugin").Build(); + + // Assert + plugin.Name.Should().Be("MyOrg.TestPlugin"); + } + + [Fact] + public void SimplePlugin_RegistersTasks_DoesNotThrow() + { + // Arrange + bool activityRegistered = false; + bool orchestratorRegistered = false; + + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.TaskPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc("PluginActivity", (ctx, input) => + { + activityRegistered = true; + return $"processed: {input}"; + }); + registry.AddOrchestratorFunc("PluginOrchestration", ctx => + { + orchestratorRegistered = true; + return Task.FromResult("done"); + }); + }) + .Build(); + + DurableTaskRegistry registry = new(); + + // Act — should not throw + plugin.Invoking(p => p.RegisterTasks(registry)).Should().NotThrow(); + + // The registry accepted the registrations (no duplicate errors) + // Registering the same names again should throw (proving first call succeeded) + plugin.Invoking(p => p.RegisterTasks(registry)).Should().Throw(); + } + + [Fact] + public void SimplePlugin_NoTasks_RegisterTasksIsNoOp() + { + // Arrange + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.InterceptorOnly").Build(); + DurableTaskRegistry registry = new(); + + // Act — no-op, should not throw + plugin.Invoking(p => p.RegisterTasks(registry)).Should().NotThrow(); + } + + [Fact] + public void SimplePlugin_CombinesTasksAndInterceptors() + { + // Arrange + Moq.Mock interceptor = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.FullPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc("PluginActivity", (ctx, input) => input); + }) + .AddOrchestrationInterceptor(interceptor.Object) + .Build(); + + DurableTaskRegistry registry = new(); + + // Act + plugin.RegisterTasks(registry); + + // Assert — interceptors are separate from tasks + plugin.OrchestrationInterceptors.Should().HaveCount(1); + + // Tasks were registered (attempting again should throw) + plugin.Invoking(p => p.RegisterTasks(registry)) + .Should().Throw() + .WithMessage("*PluginActivity*"); + } + + [Fact] + public void SimplePlugin_MultipleAddTasks_RegistersAll() + { + // Arrange + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.MultiPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc("Activity1", (ctx, input) => input); + }) + .AddTasks(registry => + { + registry.AddActivityFunc("Activity2", (ctx, input) => input); + }) + .Build(); + + DurableTaskRegistry registry = new(); + + // Act + plugin.RegisterTasks(registry); + + // Assert — both activities are registered (re-registering would throw for both) + Action reRegister = () => plugin.RegisterTasks(registry); + reRegister.Should().Throw().WithMessage("*Activity1*"); + } +} diff --git a/test/Extensions.Plugins.Tests/ValidationPluginTests.cs b/test/Extensions.Plugins.Tests/ValidationPluginTests.cs new file mode 100644 index 000000000..40654f863 --- /dev/null +++ b/test/Extensions.Plugins.Tests/ValidationPluginTests.cs @@ -0,0 +1,83 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class ValidationPluginTests +{ + [Fact] + public async Task ValidationPlugin_PassesValidInput() + { + // Arrange + IInputValidator validator = new AlwaysValidValidator(); + ValidationPlugin plugin = new(validator); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "valid-input"); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().NotThrowAsync(); + } + + [Fact] + public async Task ValidationPlugin_RejectsInvalidInput() + { + // Arrange + IInputValidator validator = new AlwaysInvalidValidator(); + ValidationPlugin plugin = new(validator); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "bad"); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*validation failed*"); + } + + [Fact] + public async Task ValidationPlugin_RejectsInvalidActivityInput() + { + // Arrange + IInputValidator validator = new AlwaysInvalidValidator(); + ValidationPlugin plugin = new(validator); + ActivityInterceptorContext context = new("TestActivity", "instance1", "bad"); + + // Act & Assert + await plugin.ActivityInterceptors[0].Invoking( + i => i.OnActivityStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*validation failed*"); + } + + [Fact] + public async Task ValidationPlugin_RunsMultipleValidators() + { + // Arrange + IInputValidator validator1 = new AlwaysValidValidator(); + IInputValidator validator2 = new AlwaysInvalidValidator(); + ValidationPlugin plugin = new(validator1, validator2); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "input"); + + // Act & Assert - second validator should cause failure + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().ThrowAsync(); + } + + sealed class AlwaysValidValidator : IInputValidator + { + public Task ValidateAsync(TaskName taskName, object? input) => + Task.FromResult(ValidationResult.Success); + } + + sealed class AlwaysInvalidValidator : IInputValidator + { + public Task ValidateAsync(TaskName taskName, object? input) => + Task.FromResult(ValidationResult.Failure("Input is invalid")); + } +}