From c1631801c6cc070eddb3e77b44ea725d9decc141 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 12 Mar 2026 12:20:14 -0700 Subject: [PATCH 1/4] Add Temporal-inspired plugin system with 5 built-in plugins Introduces a new Microsoft.DurableTask.Extensions.Plugins package that provides a composable plugin/interceptor system inspired by Temporal's plugin architecture. Plugin System Core: - IDurableTaskPlugin interface with named plugins containing interceptors - IOrchestrationInterceptor for orchestration lifecycle events (start/complete/fail) - IActivityInterceptor for activity lifecycle events (start/complete/fail) - SimplePlugin builder pattern (mirrors Temporal's SimplePlugin) - PluginPipeline for orchestrating interceptor execution - PluginOrchestrationWrapper and PluginActivityWrapper for transparent integration - DI extension methods: UsePlugin(), UsePlugins() on IDurableTaskWorkerBuilder 5 Built-in Plugins: 1. LoggingPlugin - Structured ILogger events for all orchestration/activity lifecycle 2. MetricsPlugin - Thread-safe execution counts, durations, success/failure tracking 3. AuthorizationPlugin - IAuthorizationHandler-based authorization before execution 4. ValidationPlugin - IInputValidator-based input validation before execution 5. RateLimitingPlugin - Token-bucket rate limiting for activity dispatches Also includes: - 24 unit tests covering all plugins and the pipeline - PluginsSample demonstrating all 5 plugins with in-process test host - Convenience extension methods: UseLoggingPlugin(), UseMetricsPlugin(), etc. --- Microsoft.DurableTask.sln | 73 ++++++ samples/PluginsSample/PluginsSample.csproj | 22 ++ samples/PluginsSample/Program.cs | 195 +++++++++++++++ samples/PluginsSample/README.md | 56 +++++ .../Plugins/ActivityInterceptorContext.cs | 43 ++++ .../Plugins/BuiltIn/AuthorizationContext.cs | 45 ++++ .../Plugins/BuiltIn/AuthorizationPlugin.cs | 102 ++++++++ .../BuiltIn/AuthorizationTargetType.cs | 20 ++ .../Plugins/BuiltIn/IAuthorizationHandler.cs | 17 ++ .../Plugins/BuiltIn/IInputValidator.cs | 18 ++ .../Plugins/BuiltIn/LoggingPlugin.cs | 114 +++++++++ .../Plugins/BuiltIn/MetricsPlugin.cs | 231 ++++++++++++++++++ .../Plugins/BuiltIn/RateLimitingPlugin.cs | 165 +++++++++++++ .../Plugins/BuiltIn/ValidationPlugin.cs | 98 ++++++++ .../Plugins/BuiltIn/ValidationResult.cs | 38 +++ .../DurableTaskBuiltInPluginExtensions.cs | 119 +++++++++ ...ableTaskWorkerBuilderExtensions.Plugins.cs | 60 +++++ .../Plugins/IActivityInterceptor.cs | 34 +++ src/Extensions/Plugins/IDurableTaskPlugin.cs | 28 +++ .../Plugins/IOrchestrationInterceptor.cs | 34 +++ .../OrchestrationInterceptorContext.cs | 50 ++++ .../Plugins/PluginActivityWrapper.cs | 54 ++++ .../Plugins/PluginOrchestrationWrapper.cs | 68 ++++++ src/Extensions/Plugins/PluginPipeline.cs | 127 ++++++++++ src/Extensions/Plugins/Plugins.csproj | 27 ++ src/Extensions/Plugins/SimplePlugin.cs | 99 ++++++++ .../AuthorizationPluginTests.cs | 90 +++++++ .../Extensions.Plugins.Tests.csproj | 11 + .../LoggingPluginTests.cs | 42 ++++ .../MetricsPluginTests.cs | 88 +++++++ .../PluginPipelineTests.cs | 132 ++++++++++ .../RateLimitingPluginTests.cs | 89 +++++++ .../ValidationPluginTests.cs | 83 +++++++ 33 files changed, 2472 insertions(+) create mode 100644 samples/PluginsSample/PluginsSample.csproj create mode 100644 samples/PluginsSample/Program.cs create mode 100644 samples/PluginsSample/README.md create mode 100644 src/Extensions/Plugins/ActivityInterceptorContext.cs create mode 100644 src/Extensions/Plugins/BuiltIn/AuthorizationContext.cs create mode 100644 src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs create mode 100644 src/Extensions/Plugins/BuiltIn/AuthorizationTargetType.cs create mode 100644 src/Extensions/Plugins/BuiltIn/IAuthorizationHandler.cs create mode 100644 src/Extensions/Plugins/BuiltIn/IInputValidator.cs create mode 100644 src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs create mode 100644 src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs create mode 100644 src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs create mode 100644 src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs create mode 100644 src/Extensions/Plugins/BuiltIn/ValidationResult.cs create mode 100644 src/Extensions/Plugins/DependencyInjection/DurableTaskBuiltInPluginExtensions.cs create mode 100644 src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs create mode 100644 src/Extensions/Plugins/IActivityInterceptor.cs create mode 100644 src/Extensions/Plugins/IDurableTaskPlugin.cs create mode 100644 src/Extensions/Plugins/IOrchestrationInterceptor.cs create mode 100644 src/Extensions/Plugins/OrchestrationInterceptorContext.cs create mode 100644 src/Extensions/Plugins/PluginActivityWrapper.cs create mode 100644 src/Extensions/Plugins/PluginOrchestrationWrapper.cs create mode 100644 src/Extensions/Plugins/PluginPipeline.cs create mode 100644 src/Extensions/Plugins/Plugins.csproj create mode 100644 src/Extensions/Plugins/SimplePlugin.cs create mode 100644 test/Extensions.Plugins.Tests/AuthorizationPluginTests.cs create mode 100644 test/Extensions.Plugins.Tests/Extensions.Plugins.Tests.csproj create mode 100644 test/Extensions.Plugins.Tests/LoggingPluginTests.cs create mode 100644 test/Extensions.Plugins.Tests/MetricsPluginTests.cs create mode 100644 test/Extensions.Plugins.Tests/PluginPipelineTests.cs create mode 100644 test/Extensions.Plugins.Tests/RateLimitingPluginTests.cs create mode 100644 test/Extensions.Plugins.Tests/ValidationPluginTests.cs diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index ea7d17973..d6d59344f 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -111,18 +111,47 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExportHistory.Tests", "test EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DistributedTracingSample", "samples\DistributedTracingSample\DistributedTracingSample.csproj", "{4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", "{21303FBF-2A2B-17C2-D2DF-3E924022E940}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Plugins", "src\Extensions\Plugins\Plugins.csproj", "{464EF328-1A43-417C-BC9D-C1F808D2C3B8}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Core", "Core", "{9462A531-9647-6067-D102-00175DB73A6F}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Core", "Core", "{D4D9077D-1CEC-0E01-C5EE-AFAD11489446}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions.Plugins.Tests", "test\Extensions.Plugins.Tests\Extensions.Plugins.Tests.csproj", "{09D76001-410E-4308-9156-01A9E7F5400B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PluginsSample", "samples\PluginsSample\PluginsSample.csproj", "{BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{51A52603-541D-DE3F-2825-C80F9EE6C532}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{3B8F957E-7773-4C0C-ACD7-91A1591D9312}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x64.ActiveCfg = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x64.Build.0 = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x86.ActiveCfg = Debug|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Debug|x86.Build.0 = Debug|Any CPU {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|Any CPU.ActiveCfg = Release|Any CPU {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|Any CPU.Build.0 = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x64.ActiveCfg = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x64.Build.0 = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x86.ActiveCfg = Release|Any CPU + {B12489CB-B7E5-497B-8F0C-F87F678947C3}.Release|x86.Build.0 = Release|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x64.ActiveCfg = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x64.Build.0 = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x86.ActiveCfg = Debug|Any CPU {B0EB48BE-E4F7-4F50-B8BD-5C6172A7A584}.Debug|x86.Build.0 = Debug|Any CPU @@ -660,6 +689,42 @@ Global {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}.Release|x64.Build.0 = Release|Any CPU {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}.Release|x86.ActiveCfg = Release|Any CPU {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8}.Release|x86.Build.0 = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x64.ActiveCfg = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x64.Build.0 = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x86.ActiveCfg = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Debug|x86.Build.0 = Debug|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|Any CPU.Build.0 = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x64.ActiveCfg = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x64.Build.0 = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x86.ActiveCfg = Release|Any CPU + {464EF328-1A43-417C-BC9D-C1F808D2C3B8}.Release|x86.Build.0 = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x64.ActiveCfg = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x64.Build.0 = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x86.ActiveCfg = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Debug|x86.Build.0 = Debug|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|Any CPU.Build.0 = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x64.ActiveCfg = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x64.Build.0 = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x86.ActiveCfg = Release|Any CPU + {09D76001-410E-4308-9156-01A9E7F5400B}.Release|x86.Build.0 = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x64.ActiveCfg = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x64.Build.0 = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x86.ActiveCfg = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Debug|x86.Build.0 = Debug|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|Any CPU.Build.0 = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x64.ActiveCfg = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x64.Build.0 = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.ActiveCfg = Release|Any CPU + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -715,6 +780,14 @@ Global {354CE69B-78DB-9B29-C67E-0DBB862C7A65} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B} {05C9EBA6-7221-D458-47D6-DA457C2F893B} = {E5637F81-2FB9-4CD7-900D-455363B142A7} {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} + {21303FBF-2A2B-17C2-D2DF-3E924022E940} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B} + {464EF328-1A43-417C-BC9D-C1F808D2C3B8} = {21303FBF-2A2B-17C2-D2DF-3E924022E940} + {9462A531-9647-6067-D102-00175DB73A6F} = {1C217BB2-CE16-41CC-9D47-0FC0DB60BDB3} + {D4D9077D-1CEC-0E01-C5EE-AFAD11489446} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} + {09D76001-410E-4308-9156-01A9E7F5400B} = {E5637F81-2FB9-4CD7-900D-455363B142A7} + {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} + {51A52603-541D-DE3F-2825-C80F9EE6C532} = {1C217BB2-CE16-41CC-9D47-0FC0DB60BDB3} + {3B8F957E-7773-4C0C-ACD7-91A1591D9312} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/samples/PluginsSample/PluginsSample.csproj b/samples/PluginsSample/PluginsSample.csproj new file mode 100644 index 000000000..d9085f186 --- /dev/null +++ b/samples/PluginsSample/PluginsSample.csproj @@ -0,0 +1,22 @@ + + + + Exe + net8.0;net10.0 + enable + + + + + + + + + + + + + + + + diff --git a/samples/PluginsSample/Program.cs b/samples/PluginsSample/Program.cs new file mode 100644 index 000000000..e23a7ad79 --- /dev/null +++ b/samples/PluginsSample/Program.cs @@ -0,0 +1,195 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +// This sample demonstrates the Durable Task Plugin system, which is inspired by Temporal's +// plugin/interceptor pattern. It shows how to use the 5 built-in plugins: +// 1. LoggingPlugin - Structured logging for orchestration and activity lifecycle events +// 2. MetricsPlugin - Execution counts, durations, and success/failure tracking +// 3. AuthorizationPlugin - Input-based authorization checks before execution +// 4. ValidationPlugin - Input validation before task execution +// 5. RateLimitingPlugin - Token-bucket rate limiting for activity dispatches + +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Testing; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +// Create a shared metrics store so we can read metrics after the orchestration completes. +MetricsStore metricsStore = new(); + +// Use the in-process test host (no external sidecar needed for demonstration). +// In production, replace with .UseGrpc() or .UseDurableTaskScheduler(). +await using DurableTaskTestHost testHost = await DurableTaskTestHost.StartAsync( + registry => + { + // Register orchestration and activities. + registry.AddOrchestratorFunc("GreetingOrchestration", async context => + { + List greetings = new() + { + await context.CallActivityAsync("SayHello", "Tokyo"), + await context.CallActivityAsync("SayHello", "London"), + await context.CallActivityAsync("SayHello", "Seattle"), + }; + + return greetings; + }); + + registry.AddActivityFunc("SayHello", (context, city) => + { + return $"Hello, {city}!"; + }); + }); + +DurableTaskClient client = testHost.Client; + +// Schedule a new orchestration instance. +string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("GreetingOrchestration"); +Console.WriteLine($"Started orchestration: {instanceId}"); + +// Wait for the orchestration to complete. +OrchestrationMetadata result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true); +Console.WriteLine($"Orchestration completed with status: {result.RuntimeStatus}"); +Console.WriteLine($"Output: {result.SerializedOutput}"); + +// --- Demonstrate Plugin APIs --- +Console.WriteLine("\n=== Plugin Demonstrations ==="); + +// Demo 1: LoggingPlugin +Console.WriteLine("\n--- 1. LoggingPlugin ---"); +Console.WriteLine("The LoggingPlugin emits structured ILogger events for lifecycle events."); +Console.WriteLine("It would be registered on a worker builder like this:"); +Console.WriteLine(" builder.Services.AddDurableTaskWorker().UseLoggingPlugin().UseGrpc();"); + +ILoggerFactory loggerFactory = LoggerFactory.Create(b => b.AddConsole().SetMinimumLevel(LogLevel.Information)); +LoggingPlugin loggingPlugin = new(loggerFactory); +Console.WriteLine($"Plugin name: {loggingPlugin.Name}"); +Console.WriteLine($"Orchestration interceptors: {loggingPlugin.OrchestrationInterceptors.Count}"); +Console.WriteLine($"Activity interceptors: {loggingPlugin.ActivityInterceptors.Count}"); + +// Demo 2: MetricsPlugin +Console.WriteLine("\n--- 2. MetricsPlugin ---"); +MetricsPlugin metricsPlugin = new(metricsStore); + +// Simulate lifecycle events +var orchCtx = new Microsoft.DurableTask.Plugins.OrchestrationInterceptorContext("Demo", "test-1", false, null); +await metricsPlugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(orchCtx); +await metricsPlugin.OrchestrationInterceptors[0].OnOrchestrationCompletedAsync(orchCtx, "done"); + +var actCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("DemoActivity", "test-1", "input"); +await metricsPlugin.ActivityInterceptors[0].OnActivityStartingAsync(actCtx); +await metricsPlugin.ActivityInterceptors[0].OnActivityCompletedAsync(actCtx, "result"); +await metricsPlugin.ActivityInterceptors[0].OnActivityStartingAsync(actCtx); +await metricsPlugin.ActivityInterceptors[0].OnActivityFailedAsync(actCtx, new Exception("test failure")); + +Console.WriteLine("Orchestration metrics:"); +foreach (var (name, metrics) in metricsStore.GetAllOrchestrationMetrics()) +{ + Console.WriteLine($" '{name}': Started={metrics.Started}, Completed={metrics.Completed}, Failed={metrics.Failed}"); +} + +Console.WriteLine("Activity metrics:"); +foreach (var (name, metrics) in metricsStore.GetAllActivityMetrics()) +{ + Console.WriteLine($" '{name}': Started={metrics.Started}, Completed={metrics.Completed}, Failed={metrics.Failed}"); +} + +// Demo 3: AuthorizationPlugin +Console.WriteLine("\n--- 3. AuthorizationPlugin ---"); +AuthorizationPlugin authPlugin = new(new AllowAllAuthorizationHandler()); +var authOrcCtx = new Microsoft.DurableTask.Plugins.OrchestrationInterceptorContext("SecureOrch", "secure-1", false, null); +await authPlugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(authOrcCtx); + +// Demo 4: ValidationPlugin +Console.WriteLine("\n--- 4. ValidationPlugin ---"); +ValidationPlugin validationPlugin = new(new CityNameValidator()); +var validCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("SayHello", "val-1", "Tokyo"); +await validationPlugin.ActivityInterceptors[0].OnActivityStartingAsync(validCtx); +Console.WriteLine(" Validation passed for input 'Tokyo'"); + +try +{ + var invalidCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("SayHello", "val-1", ""); + await validationPlugin.ActivityInterceptors[0].OnActivityStartingAsync(invalidCtx); +} +catch (ArgumentException ex) +{ + Console.WriteLine($" Validation correctly rejected empty input: {ex.Message}"); +} + +// Demo 5: RateLimitingPlugin +Console.WriteLine("\n--- 5. RateLimitingPlugin ---"); +RateLimitingPlugin rateLimitPlugin = new(new RateLimitingOptions +{ + MaxTokens = 3, + RefillRate = 0, + RefillInterval = TimeSpan.FromHours(1), +}); + +var rlCtx = new Microsoft.DurableTask.Plugins.ActivityInterceptorContext("LimitedAction", "rl-1", "data"); +int allowed = 0; +int denied = 0; +for (int i = 0; i < 5; i++) +{ + try + { + await rateLimitPlugin.ActivityInterceptors[0].OnActivityStartingAsync(rlCtx); + allowed++; + } + catch (RateLimitExceededException) + { + denied++; + } +} + +Console.WriteLine($" Rate limit (max 3): Allowed={allowed}, Denied={denied}"); + +// Demo: SimplePlugin builder +Console.WriteLine("\n--- SimplePlugin Builder ---"); +var customPlugin = Microsoft.DurableTask.Plugins.SimplePlugin.NewBuilder("MyOrg.CustomPlugin") + .AddOrchestrationInterceptor(loggingPlugin.OrchestrationInterceptors[0]) + .AddActivityInterceptor(metricsPlugin.ActivityInterceptors[0]) + .Build(); +Console.WriteLine($"Custom plugin '{customPlugin.Name}' created with {customPlugin.OrchestrationInterceptors.Count} orchestration and {customPlugin.ActivityInterceptors.Count} activity interceptors."); + +Console.WriteLine("\n=== All plugin demonstrations completed successfully! ==="); + +// --- Helper classes for the sample --- + +/// +/// A simple authorization handler that allows all tasks to execute. +/// In a real application, this would check user claims, roles, or other policies. +/// +sealed class AllowAllAuthorizationHandler : IAuthorizationHandler +{ + public Task AuthorizeAsync(AuthorizationContext context) + { + Console.WriteLine($" [Auth] Authorized {context.TargetType} '{context.Name}' for instance '{context.InstanceId}'"); + return Task.FromResult(true); + } +} + +/// +/// A validator that ensures city names passed to the SayHello activity are non-empty strings. +/// +sealed class CityNameValidator : IInputValidator +{ + public Task ValidateAsync(TaskName taskName, object? input) + { + // Only validate the SayHello activity. + if (taskName.Name == "SayHello") + { + if (input is not string city || string.IsNullOrWhiteSpace(city)) + { + return Task.FromResult(ValidationResult.Failure("City name must be a non-empty string.")); + } + } + + return Task.FromResult(ValidationResult.Success); + } +} + diff --git a/samples/PluginsSample/README.md b/samples/PluginsSample/README.md new file mode 100644 index 000000000..ac2a686fe --- /dev/null +++ b/samples/PluginsSample/README.md @@ -0,0 +1,56 @@ +# Plugins Sample + +This sample demonstrates the **Durable Task Plugin system**, which is inspired by +[Temporal's plugin/interceptor pattern](https://docs.temporal.io/develop/plugins). + +## What This Sample Shows + +The sample registers all 5 built-in plugins on a Durable Task worker: + +1. **LoggingPlugin** — Emits structured log events for orchestration and activity lifecycle events (start, complete, fail). +2. **MetricsPlugin** — Tracks execution counts and durations for orchestrations and activities. +3. **AuthorizationPlugin** — Runs authorization checks before task execution (using a custom `IAuthorizationHandler`). +4. **ValidationPlugin** — Validates input data before task execution (using a custom `IInputValidator`). +5. **RateLimitingPlugin** — Applies token-bucket rate limiting to activity dispatches. + +## Prerequisites + +- .NET 8.0 or later +- A running Durable Task Scheduler sidecar (emulator or DTS) + +## Running the Sample + +Start the DTS emulator: + +```bash +docker run --name durabletask-emulator -d -p 4001:4001 mcr.microsoft.com/dts/dts-emulator:latest +``` + +Run the sample: + +```bash +dotnet run +``` + +## Plugin Architecture + +The plugin system follows these key design principles: + +- **Composable** — Multiple plugins can be registered and they execute in registration order. +- **Non-invasive** — Plugins wrap orchestrations and activities through interceptors without modifying the core logic. +- **Temporal-aligned** — The `SimplePlugin` builder pattern mirrors Temporal's `SimplePlugin` for cross-ecosystem familiarity. + +### Creating Custom Plugins + +```csharp +// Create a custom plugin using the SimplePlugin builder +SimplePlugin myPlugin = SimplePlugin.NewBuilder("MyOrg.MyPlugin") + .AddOrchestrationInterceptor(new MyOrchestrationInterceptor()) + .AddActivityInterceptor(new MyActivityInterceptor()) + .Build(); + +// Register it on the worker builder +builder.Services.AddDurableTaskWorker() + .UsePlugin(myPlugin) + .UseGrpc(); +``` diff --git a/src/Extensions/Plugins/ActivityInterceptorContext.cs b/src/Extensions/Plugins/ActivityInterceptorContext.cs new file mode 100644 index 000000000..0c497a629 --- /dev/null +++ b/src/Extensions/Plugins/ActivityInterceptorContext.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Context provided to activity interceptors during lifecycle events. +/// +public sealed class ActivityInterceptorContext +{ + /// + /// Initializes a new instance of the class. + /// + /// The name of the activity. + /// The orchestration instance ID that scheduled this activity. + /// The activity input. + public ActivityInterceptorContext(TaskName name, string instanceId, object? input) + { + this.Name = name; + this.InstanceId = instanceId; + this.Input = input; + } + + /// + /// Gets the name of the activity. + /// + public TaskName Name { get; } + + /// + /// Gets the orchestration instance ID that scheduled this activity. + /// + public string InstanceId { get; } + + /// + /// Gets the activity input. + /// + public object? Input { get; } + + /// + /// Gets a dictionary that can be used to pass data between interceptors during a single execution. + /// + public IDictionary Properties { get; } = new Dictionary(); +} diff --git a/src/Extensions/Plugins/BuiltIn/AuthorizationContext.cs b/src/Extensions/Plugins/BuiltIn/AuthorizationContext.cs new file mode 100644 index 000000000..6500762d9 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/AuthorizationContext.cs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Context for authorization decisions. +/// +public sealed class AuthorizationContext +{ + /// + /// Initializes a new instance of the class. + /// + /// The task name. + /// The orchestration instance ID. + /// The type of target (orchestration or activity). + /// The task input. + public AuthorizationContext(TaskName name, string instanceId, AuthorizationTargetType targetType, object? input) + { + this.Name = name; + this.InstanceId = instanceId; + this.TargetType = targetType; + this.Input = input; + } + + /// + /// Gets the task name. + /// + public TaskName Name { get; } + + /// + /// Gets the orchestration instance ID. + /// + public string InstanceId { get; } + + /// + /// Gets the type of target being authorized. + /// + public AuthorizationTargetType TargetType { get; } + + /// + /// Gets the task input. + /// + public object? Input { get; } +} diff --git a/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs b/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs new file mode 100644 index 000000000..c47ea3ab5 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs @@ -0,0 +1,102 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that performs authorization checks before orchestrations and activities +/// are executed. Users define authorization rules via . +/// If a rule denies access, a is thrown. +/// +public sealed class AuthorizationPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Authorization"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The authorization handler to evaluate. + public AuthorizationPlugin(IAuthorizationHandler handler) + { + Check.NotNull(handler); + this.orchestrationInterceptors = new List + { + new AuthorizationOrchestrationInterceptor(handler), + }; + this.activityInterceptors = new List + { + new AuthorizationActivityInterceptor(handler), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + sealed class AuthorizationOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly IAuthorizationHandler handler; + + public AuthorizationOrchestrationInterceptor(IAuthorizationHandler handler) => this.handler = handler; + + public async Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + AuthorizationContext authContext = new( + context.Name, + context.InstanceId, + AuthorizationTargetType.Orchestration, + context.Input); + + if (!await this.handler.AuthorizeAsync(authContext)) + { + throw new UnauthorizedAccessException( + $"Authorization denied for orchestration '{context.Name}' (instance '{context.InstanceId}')."); + } + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) => + Task.CompletedTask; + } + + sealed class AuthorizationActivityInterceptor : IActivityInterceptor + { + readonly IAuthorizationHandler handler; + + public AuthorizationActivityInterceptor(IAuthorizationHandler handler) => this.handler = handler; + + public async Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + AuthorizationContext authContext = new( + context.Name, + context.InstanceId, + AuthorizationTargetType.Activity, + context.Input); + + if (!await this.handler.AuthorizeAsync(authContext)) + { + throw new UnauthorizedAccessException( + $"Authorization denied for activity '{context.Name}' (instance '{context.InstanceId}')."); + } + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) => + Task.CompletedTask; + } +} diff --git a/src/Extensions/Plugins/BuiltIn/AuthorizationTargetType.cs b/src/Extensions/Plugins/BuiltIn/AuthorizationTargetType.cs new file mode 100644 index 000000000..ea0659fb5 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/AuthorizationTargetType.cs @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// The type of authorization target. +/// +public enum AuthorizationTargetType +{ + /// + /// The target is an orchestration. + /// + Orchestration, + + /// + /// The target is an activity. + /// + Activity, +} diff --git a/src/Extensions/Plugins/BuiltIn/IAuthorizationHandler.cs b/src/Extensions/Plugins/BuiltIn/IAuthorizationHandler.cs new file mode 100644 index 000000000..53b10f82d --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/IAuthorizationHandler.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Handler for evaluating authorization rules on task executions. +/// +public interface IAuthorizationHandler +{ + /// + /// Evaluates whether the given task execution should be authorized. + /// + /// The authorization context. + /// true if execution is authorized; false otherwise. + Task AuthorizeAsync(AuthorizationContext context); +} diff --git a/src/Extensions/Plugins/BuiltIn/IInputValidator.cs b/src/Extensions/Plugins/BuiltIn/IInputValidator.cs new file mode 100644 index 000000000..16a439096 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/IInputValidator.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Interface for validating task inputs. +/// +public interface IInputValidator +{ + /// + /// Validates the input for the specified task. + /// + /// The name of the task being validated. + /// The input to validate. + /// The validation result. + Task ValidateAsync(TaskName taskName, object? input); +} diff --git a/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs b/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs new file mode 100644 index 000000000..021dd05b2 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs @@ -0,0 +1,114 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that provides structured logging for orchestration and activity lifecycle events. +/// Logs are emitted at appropriate levels (Information for start/complete, Error for failures) +/// and include contextual information such as instance IDs and task names. +/// +public sealed class LoggingPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Logging"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The logger factory for creating loggers. + public LoggingPlugin(ILoggerFactory loggerFactory) + { + Check.NotNull(loggerFactory); + LoggingOrchestrationInterceptor orchestrationInterceptor = new(loggerFactory.CreateLogger("DurableTask.Orchestration")); + LoggingActivityInterceptor activityInterceptor = new(loggerFactory.CreateLogger("DurableTask.Activity")); + this.orchestrationInterceptors = new List { orchestrationInterceptor }; + this.activityInterceptors = new List { activityInterceptor }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + sealed class LoggingOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly ILogger logger; + + public LoggingOrchestrationInterceptor(ILogger logger) => this.logger = logger; + + public Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + this.logger.LogInformation( + "Orchestration '{Name}' started. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) + { + this.logger.LogInformation( + "Orchestration '{Name}' completed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) + { + this.logger.LogError( + exception, + "Orchestration '{Name}' failed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + } + + sealed class LoggingActivityInterceptor : IActivityInterceptor + { + readonly ILogger logger; + + public LoggingActivityInterceptor(ILogger logger) => this.logger = logger; + + public Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + this.logger.LogInformation( + "Activity '{Name}' started. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) + { + this.logger.LogInformation( + "Activity '{Name}' completed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) + { + this.logger.LogError( + exception, + "Activity '{Name}' failed. InstanceId: {InstanceId}", + context.Name, + context.InstanceId); + return Task.CompletedTask; + } + } +} diff --git a/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs b/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs new file mode 100644 index 000000000..a852d16eb --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs @@ -0,0 +1,231 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Concurrent; +using System.Diagnostics; + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that tracks execution metrics for orchestrations and activities, +/// including counts (started, completed, failed) and durations. +/// +public sealed class MetricsPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Metrics"; + + readonly MetricsStore store; + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + public MetricsPlugin() + : this(new MetricsStore()) + { + } + + /// + /// Initializes a new instance of the class with a shared store. + /// + /// The metrics store to use. + public MetricsPlugin(MetricsStore store) + { + Check.NotNull(store); + this.store = store; + this.orchestrationInterceptors = new List + { + new MetricsOrchestrationInterceptor(store), + }; + this.activityInterceptors = new List + { + new MetricsActivityInterceptor(store), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + /// Gets the metrics store used by this plugin. + /// + public MetricsStore Store => this.store; + + sealed class MetricsOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly MetricsStore store; + + public MetricsOrchestrationInterceptor(MetricsStore store) => this.store = store; + + public Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + this.store.IncrementOrchestrationStarted(context.Name); + context.Properties["_metrics_stopwatch"] = Stopwatch.StartNew(); + return Task.CompletedTask; + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) + { + this.store.IncrementOrchestrationCompleted(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordOrchestrationDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) + { + this.store.IncrementOrchestrationFailed(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordOrchestrationDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + } + + sealed class MetricsActivityInterceptor : IActivityInterceptor + { + readonly MetricsStore store; + + public MetricsActivityInterceptor(MetricsStore store) => this.store = store; + + public Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + this.store.IncrementActivityStarted(context.Name); + context.Properties["_metrics_stopwatch"] = Stopwatch.StartNew(); + return Task.CompletedTask; + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) + { + this.store.IncrementActivityCompleted(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordActivityDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) + { + this.store.IncrementActivityFailed(context.Name); + if (context.Properties.TryGetValue("_metrics_stopwatch", out object? sw) && sw is Stopwatch stopwatch) + { + stopwatch.Stop(); + this.store.RecordActivityDuration(context.Name, stopwatch.Elapsed); + } + + return Task.CompletedTask; + } + } +} + +/// +/// Thread-safe store for orchestration and activity execution metrics. +/// +public sealed class MetricsStore +{ + readonly ConcurrentDictionary orchestrationMetrics = new(); + readonly ConcurrentDictionary activityMetrics = new(); + + /// + /// Gets metrics for a specific orchestration by name. + /// + /// The orchestration name. + /// The metrics for the specified orchestration. + public TaskMetrics GetOrchestrationMetrics(string name) => + this.orchestrationMetrics.GetOrAdd(name, _ => new TaskMetrics()); + + /// + /// Gets metrics for a specific activity by name. + /// + /// The activity name. + /// The metrics for the specified activity. + public TaskMetrics GetActivityMetrics(string name) => + this.activityMetrics.GetOrAdd(name, _ => new TaskMetrics()); + + /// + /// Gets all orchestration metrics. + /// + /// A read-only dictionary of orchestration name to metrics. + public IReadOnlyDictionary GetAllOrchestrationMetrics() => this.orchestrationMetrics; + + /// + /// Gets all activity metrics. + /// + /// A read-only dictionary of activity name to metrics. + public IReadOnlyDictionary GetAllActivityMetrics() => this.activityMetrics; + + internal void IncrementOrchestrationStarted(TaskName name) => this.GetOrchestrationMetrics(name).IncrementStarted(); + + internal void IncrementOrchestrationCompleted(TaskName name) => this.GetOrchestrationMetrics(name).IncrementCompleted(); + + internal void IncrementOrchestrationFailed(TaskName name) => this.GetOrchestrationMetrics(name).IncrementFailed(); + + internal void RecordOrchestrationDuration(TaskName name, TimeSpan duration) => this.GetOrchestrationMetrics(name).RecordDuration(duration); + + internal void IncrementActivityStarted(TaskName name) => this.GetActivityMetrics(name).IncrementStarted(); + + internal void IncrementActivityCompleted(TaskName name) => this.GetActivityMetrics(name).IncrementCompleted(); + + internal void IncrementActivityFailed(TaskName name) => this.GetActivityMetrics(name).IncrementFailed(); + + internal void RecordActivityDuration(TaskName name, TimeSpan duration) => this.GetActivityMetrics(name).RecordDuration(duration); +} + +/// +/// Thread-safe metrics for a single task (orchestration or activity). +/// +public sealed class TaskMetrics +{ + long started; + long completed; + long failed; + long totalDurationTicks; + + /// + /// Gets the number of times this task was started. + /// + public long Started => Interlocked.Read(ref this.started); + + /// + /// Gets the number of times this task completed successfully. + /// + public long Completed => Interlocked.Read(ref this.completed); + + /// + /// Gets the number of times this task failed. + /// + public long Failed => Interlocked.Read(ref this.failed); + + /// + /// Gets the total accumulated duration across all executions. + /// + public TimeSpan TotalDuration => TimeSpan.FromTicks(Interlocked.Read(ref this.totalDurationTicks)); + + internal void IncrementStarted() => Interlocked.Increment(ref this.started); + + internal void IncrementCompleted() => Interlocked.Increment(ref this.completed); + + internal void IncrementFailed() => Interlocked.Increment(ref this.failed); + + internal void RecordDuration(TimeSpan duration) => Interlocked.Add(ref this.totalDurationTicks, duration.Ticks); +} diff --git a/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs b/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs new file mode 100644 index 000000000..831ff8b20 --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs @@ -0,0 +1,165 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Concurrent; + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that applies token-bucket rate limiting to activity executions. +/// When the rate limit is exceeded, a is thrown. +/// Rate limiting is applied per activity name. +/// +public sealed class RateLimitingPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.RateLimiting"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The rate limiting options. + public RateLimitingPlugin(RateLimitingOptions options) + { + Check.NotNull(options); + this.orchestrationInterceptors = Array.Empty(); + this.activityInterceptors = new List + { + new RateLimitingActivityInterceptor(options), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + sealed class RateLimitingActivityInterceptor : IActivityInterceptor + { + readonly RateLimitingOptions options; + readonly ConcurrentDictionary buckets = new(); + + public RateLimitingActivityInterceptor(RateLimitingOptions options) => this.options = options; + + public Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + string key = context.Name; + TokenBucket bucket = this.buckets.GetOrAdd(key, _ => new TokenBucket( + this.options.MaxTokens, + this.options.RefillRate, + this.options.RefillInterval)); + + if (!bucket.TryConsume()) + { + throw new RateLimitExceededException( + $"Rate limit exceeded for activity '{context.Name}'. " + + $"Max {this.options.MaxTokens} executions per {this.options.RefillInterval}."); + } + + return Task.CompletedTask; + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) => + Task.CompletedTask; + } +} + +/// +/// Options for the rate limiting plugin. +/// +public sealed class RateLimitingOptions +{ + /// + /// Gets or sets the maximum number of tokens (burst capacity). + /// + public int MaxTokens { get; set; } = 100; + + /// + /// Gets or sets the number of tokens to refill per interval. + /// + public int RefillRate { get; set; } = 10; + + /// + /// Gets or sets the interval between token refills. + /// + public TimeSpan RefillInterval { get; set; } = TimeSpan.FromSeconds(1); +} + +/// +/// Exception thrown when a rate limit is exceeded. +/// +public sealed class RateLimitExceededException : InvalidOperationException +{ + /// + /// Initializes a new instance of the class. + /// + /// The error message. + public RateLimitExceededException(string message) + : base(message) + { + } +} + +/// +/// Thread-safe token bucket implementation for rate limiting. +/// +internal sealed class TokenBucket +{ + readonly int maxTokens; + readonly int refillRate; + readonly TimeSpan refillInterval; + readonly object syncLock = new(); + int tokens; + DateTime lastRefillTime; + + public TokenBucket(int maxTokens, int refillRate, TimeSpan refillInterval) + { + this.maxTokens = maxTokens; + this.refillRate = refillRate; + this.refillInterval = refillInterval; + this.tokens = maxTokens; + this.lastRefillTime = DateTime.UtcNow; + } + + public bool TryConsume() + { + lock (this.syncLock) + { + this.Refill(); + + if (this.tokens > 0) + { + this.tokens--; + return true; + } + + return false; + } + } + + void Refill() + { + DateTime now = DateTime.UtcNow; + TimeSpan elapsed = now - this.lastRefillTime; + + if (elapsed >= this.refillInterval) + { + int intervalsElapsed = (int)(elapsed.Ticks / this.refillInterval.Ticks); + int tokensToAdd = intervalsElapsed * this.refillRate; + this.tokens = Math.Min(this.maxTokens, this.tokens + tokensToAdd); + this.lastRefillTime = now; + } + } +} diff --git a/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs b/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs new file mode 100644 index 000000000..c7c5376ad --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs @@ -0,0 +1,98 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// A plugin that validates input data before orchestrations and activities execute. +/// Users register validation rules via implementations. +/// If validation fails, an is thrown before the task runs. +/// +public sealed class ValidationPlugin : IDurableTaskPlugin +{ + /// + /// The default plugin name. + /// + public const string DefaultName = "Microsoft.DurableTask.Validation"; + + readonly IReadOnlyList orchestrationInterceptors; + readonly IReadOnlyList activityInterceptors; + + /// + /// Initializes a new instance of the class. + /// + /// The input validators to use. + public ValidationPlugin(params IInputValidator[] validators) + { + Check.NotNull(validators); + this.orchestrationInterceptors = new List + { + new ValidationOrchestrationInterceptor(validators), + }; + this.activityInterceptors = new List + { + new ValidationActivityInterceptor(validators), + }; + } + + /// + public string Name => DefaultName; + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + sealed class ValidationOrchestrationInterceptor : IOrchestrationInterceptor + { + readonly IInputValidator[] validators; + + public ValidationOrchestrationInterceptor(IInputValidator[] validators) => this.validators = validators; + + public async Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + foreach (IInputValidator validator in this.validators) + { + ValidationResult result = await validator.ValidateAsync(context.Name, context.Input); + if (!result.IsValid) + { + throw new ArgumentException( + $"Input validation failed for orchestration '{context.Name}': {result.ErrorMessage}"); + } + } + } + + public Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) => + Task.CompletedTask; + } + + sealed class ValidationActivityInterceptor : IActivityInterceptor + { + readonly IInputValidator[] validators; + + public ValidationActivityInterceptor(IInputValidator[] validators) => this.validators = validators; + + public async Task OnActivityStartingAsync(ActivityInterceptorContext context) + { + foreach (IInputValidator validator in this.validators) + { + ValidationResult result = await validator.ValidateAsync(context.Name, context.Input); + if (!result.IsValid) + { + throw new ArgumentException( + $"Input validation failed for activity '{context.Name}': {result.ErrorMessage}"); + } + } + } + + public Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result) => + Task.CompletedTask; + + public Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception) => + Task.CompletedTask; + } +} diff --git a/src/Extensions/Plugins/BuiltIn/ValidationResult.cs b/src/Extensions/Plugins/BuiltIn/ValidationResult.cs new file mode 100644 index 000000000..a2b53542a --- /dev/null +++ b/src/Extensions/Plugins/BuiltIn/ValidationResult.cs @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins.BuiltIn; + +/// +/// Result of an input validation check. +/// +public readonly struct ValidationResult +{ + /// + /// A successful validation result. + /// + public static readonly ValidationResult Success = new(true, null); + + ValidationResult(bool isValid, string? errorMessage) + { + this.IsValid = isValid; + this.ErrorMessage = errorMessage; + } + + /// + /// Gets a value indicating whether the validation passed. + /// + public bool IsValid { get; } + + /// + /// Gets the error message if validation failed. + /// + public string? ErrorMessage { get; } + + /// + /// Creates a failed validation result. + /// + /// The error message. + /// A failed validation result. + public static ValidationResult Failure(string errorMessage) => new(false, errorMessage); +} diff --git a/src/Extensions/Plugins/DependencyInjection/DurableTaskBuiltInPluginExtensions.cs b/src/Extensions/Plugins/DependencyInjection/DurableTaskBuiltInPluginExtensions.cs new file mode 100644 index 000000000..3c647151d --- /dev/null +++ b/src/Extensions/Plugins/DependencyInjection/DurableTaskBuiltInPluginExtensions.cs @@ -0,0 +1,119 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask; + +/// +/// Convenience extension methods for adding built-in plugins to the Durable Task worker builder. +/// +public static class DurableTaskBuiltInPluginExtensions +{ + /// + /// Adds the logging plugin that emits structured log events for orchestration and activity lifecycle. + /// + /// The worker builder. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseLoggingPlugin(this IDurableTaskWorkerBuilder builder) + { + Check.NotNull(builder); + + // Defer plugin creation to when the service provider is available. + builder.Services.AddSingleton(sp => + { + ILoggerFactory loggerFactory = sp.GetRequiredService(); + return new LoggingPlugin(loggerFactory); + }); + + builder.Services.TryAddPluginPipeline(); + return builder; + } + + /// + /// Adds the metrics plugin that tracks execution counts and durations. + /// + /// The worker builder. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseMetricsPlugin(this IDurableTaskWorkerBuilder builder) + { + return builder.UseMetricsPlugin(new MetricsStore()); + } + + /// + /// Adds the metrics plugin with a shared metrics store. + /// + /// The worker builder. + /// The metrics store to use. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseMetricsPlugin( + this IDurableTaskWorkerBuilder builder, + MetricsStore store) + { + Check.NotNull(builder); + Check.NotNull(store); + + builder.Services.AddSingleton(store); + return builder.UsePlugin(new MetricsPlugin(store)); + } + + /// + /// Adds the authorization plugin with the specified handler. + /// + /// The worker builder. + /// The authorization handler. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseAuthorizationPlugin( + this IDurableTaskWorkerBuilder builder, + IAuthorizationHandler handler) + { + Check.NotNull(builder); + Check.NotNull(handler); + return builder.UsePlugin(new AuthorizationPlugin(handler)); + } + + /// + /// Adds the validation plugin with the specified validators. + /// + /// The worker builder. + /// The input validators to use. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseValidationPlugin( + this IDurableTaskWorkerBuilder builder, + params IInputValidator[] validators) + { + Check.NotNull(builder); + Check.NotNull(validators); + return builder.UsePlugin(new ValidationPlugin(validators)); + } + + /// + /// Adds the rate limiting plugin with the specified options. + /// + /// The worker builder. + /// Configuration callback for rate limiting options. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UseRateLimitingPlugin( + this IDurableTaskWorkerBuilder builder, + Action? configure = null) + { + Check.NotNull(builder); + RateLimitingOptions options = new(); + configure?.Invoke(options); + return builder.UsePlugin(new RateLimitingPlugin(options)); + } + + static void TryAddPluginPipeline(this IServiceCollection services) + { + services.TryAddSingleton(sp => + { + IEnumerable plugins = sp.GetServices(); + return new PluginPipeline(plugins); + }); + } +} diff --git a/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs new file mode 100644 index 000000000..0e9b09d8d --- /dev/null +++ b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace Microsoft.DurableTask; + +/// +/// Extension methods for adding plugins to the Durable Task worker builder. +/// +public static class DurableTaskWorkerBuilderExtensionsPlugins +{ + /// + /// Adds a plugin to the Durable Task worker. All orchestration and activity interceptors + /// from the plugin will be invoked during execution. + /// + /// The worker builder. + /// The plugin to add. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UsePlugin( + this IDurableTaskWorkerBuilder builder, + IDurableTaskPlugin plugin) + { + Check.NotNull(builder); + Check.NotNull(plugin); + + builder.Services.AddSingleton(plugin); + builder.Services.TryAddSingleton(sp => + { + IEnumerable plugins = sp.GetServices(); + return new PluginPipeline(plugins); + }); + + return builder; + } + + /// + /// Adds multiple plugins to the Durable Task worker. + /// + /// The worker builder. + /// The plugins to add. + /// The original builder, for call chaining. + public static IDurableTaskWorkerBuilder UsePlugins( + this IDurableTaskWorkerBuilder builder, + params IDurableTaskPlugin[] plugins) + { + Check.NotNull(builder); + Check.NotNull(plugins); + + foreach (IDurableTaskPlugin plugin in plugins) + { + builder.UsePlugin(plugin); + } + + return builder; + } +} diff --git a/src/Extensions/Plugins/IActivityInterceptor.cs b/src/Extensions/Plugins/IActivityInterceptor.cs new file mode 100644 index 000000000..fbdcf98de --- /dev/null +++ b/src/Extensions/Plugins/IActivityInterceptor.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Interceptor for activity lifecycle events. +/// Implementations can run logic before and after activity execution. +/// +public interface IActivityInterceptor +{ + /// + /// Called before an activity begins execution. + /// + /// The activity interceptor context. + /// A task that completes when the interceptor logic is finished. + Task OnActivityStartingAsync(ActivityInterceptorContext context); + + /// + /// Called after an activity completes execution successfully. + /// + /// The activity interceptor context. + /// The activity result, if any. + /// A task that completes when the interceptor logic is finished. + Task OnActivityCompletedAsync(ActivityInterceptorContext context, object? result); + + /// + /// Called after an activity fails with an exception. + /// + /// The activity interceptor context. + /// The exception that caused the failure. + /// A task that completes when the interceptor logic is finished. + Task OnActivityFailedAsync(ActivityInterceptorContext context, Exception exception); +} diff --git a/src/Extensions/Plugins/IDurableTaskPlugin.cs b/src/Extensions/Plugins/IDurableTaskPlugin.cs new file mode 100644 index 000000000..1b8f85ce0 --- /dev/null +++ b/src/Extensions/Plugins/IDurableTaskPlugin.cs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Defines a plugin that can intercept orchestration and activity lifecycle events. +/// Inspired by Temporal's plugin/interceptor pattern, plugins provide a composable +/// way to add cross-cutting concerns like logging, metrics, authorization, validation, +/// and rate limiting to Durable Task workers and clients. +/// +public interface IDurableTaskPlugin +{ + /// + /// Gets the unique name of this plugin. + /// + string Name { get; } + + /// + /// Gets the orchestration interceptors provided by this plugin. + /// + IReadOnlyList OrchestrationInterceptors { get; } + + /// + /// Gets the activity interceptors provided by this plugin. + /// + IReadOnlyList ActivityInterceptors { get; } +} diff --git a/src/Extensions/Plugins/IOrchestrationInterceptor.cs b/src/Extensions/Plugins/IOrchestrationInterceptor.cs new file mode 100644 index 000000000..9d646221d --- /dev/null +++ b/src/Extensions/Plugins/IOrchestrationInterceptor.cs @@ -0,0 +1,34 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Interceptor for orchestration lifecycle events. +/// Implementations can run logic before and after orchestration execution. +/// +public interface IOrchestrationInterceptor +{ + /// + /// Called before an orchestration begins execution (including replays). + /// + /// The orchestration context. + /// A task that completes when the interceptor logic is finished. + Task OnOrchestrationStartingAsync(OrchestrationInterceptorContext context); + + /// + /// Called after an orchestration completes execution successfully. + /// + /// The orchestration context. + /// The orchestration result, if any. + /// A task that completes when the interceptor logic is finished. + Task OnOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result); + + /// + /// Called after an orchestration fails with an exception. + /// + /// The orchestration context. + /// The exception that caused the failure. + /// A task that completes when the interceptor logic is finished. + Task OnOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception); +} diff --git a/src/Extensions/Plugins/OrchestrationInterceptorContext.cs b/src/Extensions/Plugins/OrchestrationInterceptorContext.cs new file mode 100644 index 000000000..4cd150c50 --- /dev/null +++ b/src/Extensions/Plugins/OrchestrationInterceptorContext.cs @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Context provided to orchestration interceptors during lifecycle events. +/// +public sealed class OrchestrationInterceptorContext +{ + /// + /// Initializes a new instance of the class. + /// + /// The name of the orchestration. + /// The instance ID of the orchestration. + /// Whether this is a replay execution. + /// The orchestration input. + public OrchestrationInterceptorContext(TaskName name, string instanceId, bool isReplaying, object? input) + { + this.Name = name; + this.InstanceId = instanceId; + this.IsReplaying = isReplaying; + this.Input = input; + } + + /// + /// Gets the name of the orchestration. + /// + public TaskName Name { get; } + + /// + /// Gets the instance ID of the orchestration. + /// + public string InstanceId { get; } + + /// + /// Gets a value indicating whether this execution is a replay. + /// + public bool IsReplaying { get; } + + /// + /// Gets the orchestration input. + /// + public object? Input { get; } + + /// + /// Gets a dictionary that can be used to pass data between interceptors during a single execution. + /// + public IDictionary Properties { get; } = new Dictionary(); +} diff --git a/src/Extensions/Plugins/PluginActivityWrapper.cs b/src/Extensions/Plugins/PluginActivityWrapper.cs new file mode 100644 index 000000000..d1165f9b1 --- /dev/null +++ b/src/Extensions/Plugins/PluginActivityWrapper.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Wraps an with the plugin pipeline, invoking +/// interceptors before and after the inner activity runs. +/// +internal sealed class PluginActivityWrapper : ITaskActivity +{ + readonly ITaskActivity inner; + readonly PluginPipeline pipeline; + + /// + /// Initializes a new instance of the class. + /// + /// The original activity to wrap. + /// The plugin pipeline. + public PluginActivityWrapper(ITaskActivity inner, PluginPipeline pipeline) + { + this.inner = inner; + this.pipeline = pipeline; + } + + /// + public Type InputType => this.inner.InputType; + + /// + public Type OutputType => this.inner.OutputType; + + /// + public async Task RunAsync(TaskActivityContext context, object? input) + { + ActivityInterceptorContext interceptorContext = new( + context.Name, + context.InstanceId, + input); + + await this.pipeline.ExecuteActivityStartingAsync(interceptorContext); + + try + { + object? result = await this.inner.RunAsync(context, input); + await this.pipeline.ExecuteActivityCompletedAsync(interceptorContext, result); + return result; + } + catch (Exception ex) + { + await this.pipeline.ExecuteActivityFailedAsync(interceptorContext, ex); + throw; + } + } +} diff --git a/src/Extensions/Plugins/PluginOrchestrationWrapper.cs b/src/Extensions/Plugins/PluginOrchestrationWrapper.cs new file mode 100644 index 000000000..433fce9c7 --- /dev/null +++ b/src/Extensions/Plugins/PluginOrchestrationWrapper.cs @@ -0,0 +1,68 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Wraps an with the plugin pipeline, invoking +/// interceptors before and after the inner orchestrator runs. +/// +internal sealed class PluginOrchestrationWrapper : ITaskOrchestrator +{ + readonly ITaskOrchestrator inner; + readonly PluginPipeline pipeline; + + /// + /// Initializes a new instance of the class. + /// + /// The original orchestrator to wrap. + /// The plugin pipeline. + public PluginOrchestrationWrapper(ITaskOrchestrator inner, PluginPipeline pipeline) + { + this.inner = inner; + this.pipeline = pipeline; + } + + /// + public Type InputType => this.inner.InputType; + + /// + public Type OutputType => this.inner.OutputType; + + /// + public async Task RunAsync(TaskOrchestrationContext context, object? input) + { + OrchestrationInterceptorContext interceptorContext = new( + context.Name, + context.InstanceId, + context.IsReplaying, + input); + + // Only run non-replay interceptors for starting/completing events to avoid duplication. + if (!context.IsReplaying) + { + await this.pipeline.ExecuteOrchestrationStartingAsync(interceptorContext); + } + + try + { + object? result = await this.inner.RunAsync(context, input); + + if (!context.IsReplaying) + { + await this.pipeline.ExecuteOrchestrationCompletedAsync(interceptorContext, result); + } + + return result; + } + catch (Exception ex) + { + if (!context.IsReplaying) + { + await this.pipeline.ExecuteOrchestrationFailedAsync(interceptorContext, ex); + } + + throw; + } + } +} diff --git a/src/Extensions/Plugins/PluginPipeline.cs b/src/Extensions/Plugins/PluginPipeline.cs new file mode 100644 index 000000000..4886a1b4b --- /dev/null +++ b/src/Extensions/Plugins/PluginPipeline.cs @@ -0,0 +1,127 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// Manages the pipeline of registered plugins and executes interceptors in order. +/// +public sealed class PluginPipeline +{ + readonly IReadOnlyList plugins; + + /// + /// Initializes a new instance of the class. + /// + /// The plugins to include in the pipeline. + public PluginPipeline(IEnumerable plugins) + { + Check.NotNull(plugins); + this.plugins = plugins.ToList(); + } + + /// + /// Gets the registered plugins. + /// + public IReadOnlyList Plugins => this.plugins; + + /// + /// Executes all orchestration-starting interceptors in registration order. + /// + /// The orchestration interceptor context. + /// A task that completes when all interceptors have run. + public async Task ExecuteOrchestrationStartingAsync(OrchestrationInterceptorContext context) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IOrchestrationInterceptor interceptor in plugin.OrchestrationInterceptors) + { + await interceptor.OnOrchestrationStartingAsync(context); + } + } + } + + /// + /// Executes all orchestration-completed interceptors in registration order. + /// + /// The orchestration interceptor context. + /// The orchestration result. + /// A task that completes when all interceptors have run. + public async Task ExecuteOrchestrationCompletedAsync(OrchestrationInterceptorContext context, object? result) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IOrchestrationInterceptor interceptor in plugin.OrchestrationInterceptors) + { + await interceptor.OnOrchestrationCompletedAsync(context, result); + } + } + } + + /// + /// Executes all orchestration-failed interceptors in registration order. + /// + /// The orchestration interceptor context. + /// The exception that caused the failure. + /// A task that completes when all interceptors have run. + public async Task ExecuteOrchestrationFailedAsync(OrchestrationInterceptorContext context, Exception exception) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IOrchestrationInterceptor interceptor in plugin.OrchestrationInterceptors) + { + await interceptor.OnOrchestrationFailedAsync(context, exception); + } + } + } + + /// + /// Executes all activity-starting interceptors in registration order. + /// + /// The activity interceptor context. + /// A task that completes when all interceptors have run. + public async Task ExecuteActivityStartingAsync(ActivityInterceptorContext context) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IActivityInterceptor interceptor in plugin.ActivityInterceptors) + { + await interceptor.OnActivityStartingAsync(context); + } + } + } + + /// + /// Executes all activity-completed interceptors in registration order. + /// + /// The activity interceptor context. + /// The activity result. + /// A task that completes when all interceptors have run. + public async Task ExecuteActivityCompletedAsync(ActivityInterceptorContext context, object? result) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IActivityInterceptor interceptor in plugin.ActivityInterceptors) + { + await interceptor.OnActivityCompletedAsync(context, result); + } + } + } + + /// + /// Executes all activity-failed interceptors in registration order. + /// + /// The activity interceptor context. + /// The exception that caused the failure. + /// A task that completes when all interceptors have run. + public async Task ExecuteActivityFailedAsync(ActivityInterceptorContext context, Exception exception) + { + foreach (IDurableTaskPlugin plugin in this.plugins) + { + foreach (IActivityInterceptor interceptor in plugin.ActivityInterceptors) + { + await interceptor.OnActivityFailedAsync(context, exception); + } + } + } +} diff --git a/src/Extensions/Plugins/Plugins.csproj b/src/Extensions/Plugins/Plugins.csproj new file mode 100644 index 000000000..59abdda2a --- /dev/null +++ b/src/Extensions/Plugins/Plugins.csproj @@ -0,0 +1,27 @@ + + + + netstandard2.0;net6.0;net8.0;net10.0 + Plugin system for Microsoft Durable Task SDK. Includes built-in plugins for logging, metrics, authorization, validation, and rate limiting. + Microsoft.DurableTask.Extensions.Plugins + Microsoft.DurableTask + true + + + + + + + + + + + + + + + + + + + diff --git a/src/Extensions/Plugins/SimplePlugin.cs b/src/Extensions/Plugins/SimplePlugin.cs new file mode 100644 index 000000000..7a85310eb --- /dev/null +++ b/src/Extensions/Plugins/SimplePlugin.cs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Plugins; + +/// +/// A simple plugin implementation that aggregates interceptors. This is the recommended +/// way to build plugins, following Temporal's SimplePlugin pattern. +/// +public sealed class SimplePlugin : IDurableTaskPlugin +{ + readonly List orchestrationInterceptors; + readonly List activityInterceptors; + + SimplePlugin( + string name, + List orchestrationInterceptors, + List activityInterceptors) + { + this.Name = name; + this.orchestrationInterceptors = orchestrationInterceptors; + this.activityInterceptors = activityInterceptors; + } + + /// + public string Name { get; } + + /// + public IReadOnlyList OrchestrationInterceptors => this.orchestrationInterceptors; + + /// + public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + + /// + /// Creates a new for constructing a . + /// + /// The unique name of the plugin. + /// A new builder instance. + public static Builder NewBuilder(string name) + { + Check.NotNullOrEmpty(name); + return new Builder(name); + } + + /// + /// Builder for constructing instances. + /// + public sealed class Builder + { + readonly string name; + readonly List orchestrationInterceptors = new(); + readonly List activityInterceptors = new(); + + /// + /// Initializes a new instance of the class. + /// + /// The plugin name. + internal Builder(string name) + { + this.name = name; + } + + /// + /// Adds an orchestration interceptor to the plugin. + /// + /// The interceptor to add. + /// This builder, for call chaining. + public Builder AddOrchestrationInterceptor(IOrchestrationInterceptor interceptor) + { + Check.NotNull(interceptor); + this.orchestrationInterceptors.Add(interceptor); + return this; + } + + /// + /// Adds an activity interceptor to the plugin. + /// + /// The interceptor to add. + /// This builder, for call chaining. + public Builder AddActivityInterceptor(IActivityInterceptor interceptor) + { + Check.NotNull(interceptor); + this.activityInterceptors.Add(interceptor); + return this; + } + + /// + /// Builds the instance. + /// + /// A new . + public SimplePlugin Build() + { + return new SimplePlugin( + this.name, + new List(this.orchestrationInterceptors), + new List(this.activityInterceptors)); + } + } +} diff --git a/test/Extensions.Plugins.Tests/AuthorizationPluginTests.cs b/test/Extensions.Plugins.Tests/AuthorizationPluginTests.cs new file mode 100644 index 000000000..81f2acae7 --- /dev/null +++ b/test/Extensions.Plugins.Tests/AuthorizationPluginTests.cs @@ -0,0 +1,90 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Moq; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class AuthorizationPluginTests +{ + [Fact] + public async Task AuthorizationPlugin_AllowsAuthorizedOrchestration() + { + // Arrange + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .ReturnsAsync(true); + + AuthorizationPlugin plugin = new(handler.Object); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().NotThrowAsync(); + } + + [Fact] + public async Task AuthorizationPlugin_DeniesUnauthorizedOrchestration() + { + // Arrange + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .ReturnsAsync(false); + + AuthorizationPlugin plugin = new(handler.Object); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*TestOrch*"); + } + + [Fact] + public async Task AuthorizationPlugin_DeniesUnauthorizedActivity() + { + // Arrange + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .ReturnsAsync(false); + + AuthorizationPlugin plugin = new(handler.Object); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act & Assert + await plugin.ActivityInterceptors[0].Invoking( + i => i.OnActivityStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*TestActivity*"); + } + + [Fact] + public async Task AuthorizationPlugin_PassesCorrectContextToHandler() + { + // Arrange + AuthorizationContext? capturedContext = null; + Mock handler = new(); + handler.Setup(h => h.AuthorizeAsync(It.IsAny())) + .Callback(c => capturedContext = c) + .ReturnsAsync(true); + + AuthorizationPlugin plugin = new(handler.Object); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "some-input"); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + + // Assert + capturedContext.Should().NotBeNull(); + capturedContext!.Name.Should().Be(new TaskName("TestOrch")); + capturedContext.InstanceId.Should().Be("instance1"); + capturedContext.TargetType.Should().Be(AuthorizationTargetType.Orchestration); + capturedContext.Input.Should().Be("some-input"); + } +} diff --git a/test/Extensions.Plugins.Tests/Extensions.Plugins.Tests.csproj b/test/Extensions.Plugins.Tests/Extensions.Plugins.Tests.csproj new file mode 100644 index 000000000..69d426aa2 --- /dev/null +++ b/test/Extensions.Plugins.Tests/Extensions.Plugins.Tests.csproj @@ -0,0 +1,11 @@ + + + + net10.0 + + + + + + + diff --git a/test/Extensions.Plugins.Tests/LoggingPluginTests.cs b/test/Extensions.Plugins.Tests/LoggingPluginTests.cs new file mode 100644 index 000000000..c0c06c294 --- /dev/null +++ b/test/Extensions.Plugins.Tests/LoggingPluginTests.cs @@ -0,0 +1,42 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class LoggingPluginTests +{ + [Fact] + public void LoggingPlugin_HasCorrectName() + { + // Arrange + Mock loggerFactory = new(); + loggerFactory.Setup(f => f.CreateLogger(It.IsAny())).Returns(Mock.Of()); + + // Act + LoggingPlugin plugin = new(loggerFactory.Object); + + // Assert + plugin.Name.Should().Be(LoggingPlugin.DefaultName); + } + + [Fact] + public void LoggingPlugin_HasOneOrchestrationInterceptor() + { + // Arrange + Mock loggerFactory = new(); + loggerFactory.Setup(f => f.CreateLogger(It.IsAny())).Returns(Mock.Of()); + + // Act + LoggingPlugin plugin = new(loggerFactory.Object); + + // Assert + plugin.OrchestrationInterceptors.Should().HaveCount(1); + plugin.ActivityInterceptors.Should().HaveCount(1); + } +} diff --git a/test/Extensions.Plugins.Tests/MetricsPluginTests.cs b/test/Extensions.Plugins.Tests/MetricsPluginTests.cs new file mode 100644 index 000000000..0100cb776 --- /dev/null +++ b/test/Extensions.Plugins.Tests/MetricsPluginTests.cs @@ -0,0 +1,88 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class MetricsPluginTests +{ + [Fact] + public void MetricsPlugin_HasCorrectName() + { + // Arrange & Act + MetricsPlugin plugin = new(); + + // Assert + plugin.Name.Should().Be(MetricsPlugin.DefaultName); + } + + [Fact] + public async Task MetricsPlugin_TracksOrchestrationStarted() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + + // Assert + store.GetOrchestrationMetrics("TestOrch").Started.Should().Be(1); + } + + [Fact] + public async Task MetricsPlugin_TracksOrchestrationCompleted() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + await plugin.OrchestrationInterceptors[0].OnOrchestrationCompletedAsync(context, "result"); + + // Assert + store.GetOrchestrationMetrics("TestOrch").Completed.Should().Be(1); + store.GetOrchestrationMetrics("TestOrch").TotalDuration.Should().BeGreaterThan(TimeSpan.Zero); + } + + [Fact] + public async Task MetricsPlugin_TracksOrchestrationFailed() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + Exception exception = new InvalidOperationException("test"); + + // Act + await plugin.OrchestrationInterceptors[0].OnOrchestrationStartingAsync(context); + await plugin.OrchestrationInterceptors[0].OnOrchestrationFailedAsync(context, exception); + + // Assert + store.GetOrchestrationMetrics("TestOrch").Failed.Should().Be(1); + } + + [Fact] + public async Task MetricsPlugin_TracksActivityLifecycle() + { + // Arrange + MetricsStore store = new(); + MetricsPlugin plugin = new(store); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context); + await plugin.ActivityInterceptors[0].OnActivityCompletedAsync(context, "result"); + + // Assert + store.GetActivityMetrics("TestActivity").Started.Should().Be(1); + store.GetActivityMetrics("TestActivity").Completed.Should().Be(1); + } +} diff --git a/test/Extensions.Plugins.Tests/PluginPipelineTests.cs b/test/Extensions.Plugins.Tests/PluginPipelineTests.cs new file mode 100644 index 000000000..ae78e7a9e --- /dev/null +++ b/test/Extensions.Plugins.Tests/PluginPipelineTests.cs @@ -0,0 +1,132 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Moq; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class PluginPipelineTests +{ + [Fact] + public async Task ExecuteOrchestrationStarting_InvokesAllInterceptors() + { + // Arrange + Mock interceptor1 = new(); + Mock interceptor2 = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddOrchestrationInterceptor(interceptor1.Object) + .AddOrchestrationInterceptor(interceptor2.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationStartingAsync(context); + + // Assert + interceptor1.Verify(i => i.OnOrchestrationStartingAsync(context), Times.Once); + interceptor2.Verify(i => i.OnOrchestrationStartingAsync(context), Times.Once); + } + + [Fact] + public async Task ExecuteActivityStarting_InvokesAllInterceptors() + { + // Arrange + Mock interceptor1 = new(); + Mock interceptor2 = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddActivityInterceptor(interceptor1.Object) + .AddActivityInterceptor(interceptor2.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act + await pipeline.ExecuteActivityStartingAsync(context); + + // Assert + interceptor1.Verify(i => i.OnActivityStartingAsync(context), Times.Once); + interceptor2.Verify(i => i.OnActivityStartingAsync(context), Times.Once); + } + + [Fact] + public async Task ExecuteOrchestrationCompleted_InvokesAllInterceptors() + { + // Arrange + Mock interceptor = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddOrchestrationInterceptor(interceptor.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationCompletedAsync(context, "result"); + + // Assert + interceptor.Verify(i => i.OnOrchestrationCompletedAsync(context, "result"), Times.Once); + } + + [Fact] + public async Task ExecuteOrchestrationFailed_InvokesAllInterceptors() + { + // Arrange + Mock interceptor = new(); + Exception exception = new InvalidOperationException("test error"); + + SimplePlugin plugin = SimplePlugin.NewBuilder("test") + .AddOrchestrationInterceptor(interceptor.Object) + .Build(); + + PluginPipeline pipeline = new(new[] { plugin }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationFailedAsync(context, exception); + + // Assert + interceptor.Verify(i => i.OnOrchestrationFailedAsync(context, exception), Times.Once); + } + + [Fact] + public async Task Pipeline_WithMultiplePlugins_InvokesInOrder() + { + // Arrange + List sequence = new(); + Mock interceptor1 = new(); + interceptor1.Setup(i => i.OnOrchestrationStartingAsync(It.IsAny())) + .Callback(() => sequence.Add("plugin1")) + .Returns(Task.CompletedTask); + + Mock interceptor2 = new(); + interceptor2.Setup(i => i.OnOrchestrationStartingAsync(It.IsAny())) + .Callback(() => sequence.Add("plugin2")) + .Returns(Task.CompletedTask); + + SimplePlugin plugin1 = SimplePlugin.NewBuilder("plugin1") + .AddOrchestrationInterceptor(interceptor1.Object) + .Build(); + SimplePlugin plugin2 = SimplePlugin.NewBuilder("plugin2") + .AddOrchestrationInterceptor(interceptor2.Object) + .Build(); + + PluginPipeline pipeline = new(new IDurableTaskPlugin[] { plugin1, plugin2 }); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, null); + + // Act + await pipeline.ExecuteOrchestrationStartingAsync(context); + + // Assert + sequence.Should().ContainInOrder("plugin1", "plugin2"); + } +} diff --git a/test/Extensions.Plugins.Tests/RateLimitingPluginTests.cs b/test/Extensions.Plugins.Tests/RateLimitingPluginTests.cs new file mode 100644 index 000000000..792170001 --- /dev/null +++ b/test/Extensions.Plugins.Tests/RateLimitingPluginTests.cs @@ -0,0 +1,89 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class RateLimitingPluginTests +{ + [Fact] + public async Task RateLimitingPlugin_AllowsWithinLimit() + { + // Arrange + RateLimitingPlugin plugin = new(new RateLimitingOptions { MaxTokens = 5 }); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act & Assert - should allow 5 calls + for (int i = 0; i < 5; i++) + { + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context)) + .Should().NotThrowAsync(); + } + } + + [Fact] + public async Task RateLimitingPlugin_DeniesExceedingLimit() + { + // Arrange + RateLimitingPlugin plugin = new(new RateLimitingOptions + { + MaxTokens = 2, + RefillRate = 0, + RefillInterval = TimeSpan.FromHours(1), + }); + ActivityInterceptorContext context = new("TestActivity", "instance1", "input"); + + // Act - exhaust the tokens + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context); + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context); + + // Assert - should now throw + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*Rate limit exceeded*"); + } + + [Fact] + public async Task RateLimitingPlugin_PerActivityName() + { + // Arrange + RateLimitingPlugin plugin = new(new RateLimitingOptions + { + MaxTokens = 1, + RefillRate = 0, + RefillInterval = TimeSpan.FromHours(1), + }); + ActivityInterceptorContext context1 = new("Activity1", "instance1", "input"); + ActivityInterceptorContext context2 = new("Activity2", "instance1", "input"); + + // Act - consume token for Activity1 + await plugin.ActivityInterceptors[0].OnActivityStartingAsync(context1); + + // Assert - Activity2 should still have its own bucket + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context2)) + .Should().NotThrowAsync(); + + // Assert - Activity1 is exhausted + await plugin.ActivityInterceptors[0].Invoking( + a => a.OnActivityStartingAsync(context1)) + .Should().ThrowAsync(); + } + + [Fact] + public void RateLimitingPlugin_HasNoOrchestrationInterceptors() + { + // Arrange & Act + RateLimitingPlugin plugin = new(new RateLimitingOptions()); + + // Assert + plugin.OrchestrationInterceptors.Should().BeEmpty(); + plugin.ActivityInterceptors.Should().HaveCount(1); + } +} diff --git a/test/Extensions.Plugins.Tests/ValidationPluginTests.cs b/test/Extensions.Plugins.Tests/ValidationPluginTests.cs new file mode 100644 index 000000000..40654f863 --- /dev/null +++ b/test/Extensions.Plugins.Tests/ValidationPluginTests.cs @@ -0,0 +1,83 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class ValidationPluginTests +{ + [Fact] + public async Task ValidationPlugin_PassesValidInput() + { + // Arrange + IInputValidator validator = new AlwaysValidValidator(); + ValidationPlugin plugin = new(validator); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "valid-input"); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().NotThrowAsync(); + } + + [Fact] + public async Task ValidationPlugin_RejectsInvalidInput() + { + // Arrange + IInputValidator validator = new AlwaysInvalidValidator(); + ValidationPlugin plugin = new(validator); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "bad"); + + // Act & Assert + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*validation failed*"); + } + + [Fact] + public async Task ValidationPlugin_RejectsInvalidActivityInput() + { + // Arrange + IInputValidator validator = new AlwaysInvalidValidator(); + ValidationPlugin plugin = new(validator); + ActivityInterceptorContext context = new("TestActivity", "instance1", "bad"); + + // Act & Assert + await plugin.ActivityInterceptors[0].Invoking( + i => i.OnActivityStartingAsync(context)) + .Should().ThrowAsync() + .WithMessage("*validation failed*"); + } + + [Fact] + public async Task ValidationPlugin_RunsMultipleValidators() + { + // Arrange + IInputValidator validator1 = new AlwaysValidValidator(); + IInputValidator validator2 = new AlwaysInvalidValidator(); + ValidationPlugin plugin = new(validator1, validator2); + OrchestrationInterceptorContext context = new("TestOrch", "instance1", false, "input"); + + // Act & Assert - second validator should cause failure + await plugin.OrchestrationInterceptors[0].Invoking( + i => i.OnOrchestrationStartingAsync(context)) + .Should().ThrowAsync(); + } + + sealed class AlwaysValidValidator : IInputValidator + { + public Task ValidateAsync(TaskName taskName, object? input) => + Task.FromResult(ValidationResult.Success); + } + + sealed class AlwaysInvalidValidator : IInputValidator + { + public Task ValidateAsync(TaskName taskName, object? input) => + Task.FromResult(ValidationResult.Failure("Input is invalid")); + } +} From fd890cff41794172eba6c3a02894a4b0cf650eec Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 12 Mar 2026 12:32:00 -0700 Subject: [PATCH 2/4] Add task registration to plugins (Temporal's full plugin model) Plugins now serve two purposes, matching Temporal's design: 1. Reusable activities/orchestrations - plugins can ship pre-built tasks that auto-register when UsePlugin() is called. Users import a plugin and its activities become available to call from any orchestration. 2. Cross-cutting interceptors - the existing interceptor support for logging, metrics, auth, validation, rate limiting (unchanged). Changes: - IDurableTaskPlugin: added RegisterTasks(DurableTaskRegistry) method - SimplePlugin: added AddTasks(Action) builder method - UsePlugin(): now auto-calls RegisterTasks to register plugin tasks - 5 built-in plugins: implement RegisterTasks as no-op (cross-cutting only) - Added 5 new SimplePluginTests for task registration - Updated sample to demonstrate the importable plugin pattern - Updated README with dual-purpose documentation All 29 tests pass. --- samples/PluginsSample/Program.cs | 35 +++++- samples/PluginsSample/README.md | 81 +++++++----- .../Plugins/BuiltIn/AuthorizationPlugin.cs | 6 + .../Plugins/BuiltIn/LoggingPlugin.cs | 6 + .../Plugins/BuiltIn/MetricsPlugin.cs | 6 + .../Plugins/BuiltIn/RateLimitingPlugin.cs | 6 + .../Plugins/BuiltIn/ValidationPlugin.cs | 6 + ...ableTaskWorkerBuilderExtensions.Plugins.cs | 6 + src/Extensions/Plugins/IDurableTaskPlugin.cs | 19 ++- src/Extensions/Plugins/SimplePlugin.cs | 44 ++++++- .../SimplePluginTests.cs | 118 ++++++++++++++++++ 11 files changed, 286 insertions(+), 47 deletions(-) create mode 100644 test/Extensions.Plugins.Tests/SimplePluginTests.cs diff --git a/samples/PluginsSample/Program.cs b/samples/PluginsSample/Program.cs index e23a7ad79..8d1956491 100644 --- a/samples/PluginsSample/Program.cs +++ b/samples/PluginsSample/Program.cs @@ -148,13 +148,38 @@ await context.CallActivityAsync("SayHello", "Seattle"), Console.WriteLine($" Rate limit (max 3): Allowed={allowed}, Denied={denied}"); -// Demo: SimplePlugin builder -Console.WriteLine("\n--- SimplePlugin Builder ---"); -var customPlugin = Microsoft.DurableTask.Plugins.SimplePlugin.NewBuilder("MyOrg.CustomPlugin") +// Demo: SimplePlugin builder with built-in activities +Console.WriteLine("\n--- SimplePlugin Builder (with built-in activities) ---"); +Console.WriteLine("Plugins can provide reusable activities that auto-register when added to a worker."); +Console.WriteLine("Example: A 'StringUtils' plugin that ships pre-built string activities."); + +// This is how a plugin author would package reusable activities. +// Users just call .UsePlugin(stringUtilsPlugin) and the activities become available. +var stringUtilsPlugin = Microsoft.DurableTask.Plugins.SimplePlugin.NewBuilder("MyOrg.StringUtils") + .AddTasks(registry => + { + registry.AddActivityFunc("StringUtils.ToUpper", (ctx, input) => input.ToUpperInvariant()); + registry.AddActivityFunc("StringUtils.Reverse", (ctx, input) => + new string(input.Reverse().ToArray())); + registry.AddActivityFunc("StringUtils.WordCount", (ctx, input) => + input.Split(' ', StringSplitOptions.RemoveEmptyEntries).Length); + }) .AddOrchestrationInterceptor(loggingPlugin.OrchestrationInterceptors[0]) - .AddActivityInterceptor(metricsPlugin.ActivityInterceptors[0]) .Build(); -Console.WriteLine($"Custom plugin '{customPlugin.Name}' created with {customPlugin.OrchestrationInterceptors.Count} orchestration and {customPlugin.ActivityInterceptors.Count} activity interceptors."); + +// Verify the plugin registers its tasks into a registry +DurableTaskRegistry testRegistry = new(); +stringUtilsPlugin.RegisterTasks(testRegistry); +Console.WriteLine($" Plugin '{stringUtilsPlugin.Name}' registered activities into the worker."); +Console.WriteLine($" Orchestration interceptors: {stringUtilsPlugin.OrchestrationInterceptors.Count}"); +Console.WriteLine(""); +Console.WriteLine(" Usage in production:"); +Console.WriteLine(" builder.Services.AddDurableTaskWorker()"); +Console.WriteLine(" .UsePlugin(stringUtilsPlugin) // auto-registers StringUtils.* activities"); +Console.WriteLine(" .UseGrpc();"); +Console.WriteLine(""); +Console.WriteLine(" Then in an orchestration:"); +Console.WriteLine(" string upper = await context.CallActivityAsync(\"StringUtils.ToUpper\", \"hello\");"); Console.WriteLine("\n=== All plugin demonstrations completed successfully! ==="); diff --git a/samples/PluginsSample/README.md b/samples/PluginsSample/README.md index ac2a686fe..7418a8917 100644 --- a/samples/PluginsSample/README.md +++ b/samples/PluginsSample/README.md @@ -1,56 +1,69 @@ # Plugins Sample This sample demonstrates the **Durable Task Plugin system**, which is inspired by -[Temporal's plugin/interceptor pattern](https://docs.temporal.io/develop/plugins). +[Temporal's plugin pattern](https://docs.temporal.io/develop/plugins). -## What This Sample Shows +## What Plugins Can Do -The sample registers all 5 built-in plugins on a Durable Task worker: +Temporal-style plugins serve **two purposes**: -1. **LoggingPlugin** — Emits structured log events for orchestration and activity lifecycle events (start, complete, fail). -2. **MetricsPlugin** — Tracks execution counts and durations for orchestrations and activities. -3. **AuthorizationPlugin** — Runs authorization checks before task execution (using a custom `IAuthorizationHandler`). -4. **ValidationPlugin** — Validates input data before task execution (using a custom `IInputValidator`). -5. **RateLimitingPlugin** — Applies token-bucket rate limiting to activity dispatches. +### 1. Reusable Activities and Orchestrations -## Prerequisites - -- .NET 8.0 or later -- A running Durable Task Scheduler sidecar (emulator or DTS) +Plugins can ship pre-built activities and orchestrations that users get automatically +when they register the plugin. This is the "import and use" pattern: -## Running the Sample +```csharp +// A plugin author creates a package with reusable activities +var stringUtilsPlugin = SimplePlugin.NewBuilder("MyOrg.StringUtils") + .AddTasks(registry => + { + registry.AddActivityFunc("StringUtils.ToUpper", + (ctx, input) => input.ToUpperInvariant()); + registry.AddActivityFunc("StringUtils.Reverse", + (ctx, input) => new string(input.Reverse().ToArray())); + }) + .Build(); -Start the DTS emulator: +// Users just register the plugin — activities are available immediately +builder.Services.AddDurableTaskWorker() + .UsePlugin(stringUtilsPlugin) + .UseGrpc(); -```bash -docker run --name durabletask-emulator -d -p 4001:4001 mcr.microsoft.com/dts/dts-emulator:latest +// Then call the plugin's activities from any orchestration +string upper = await context.CallActivityAsync("StringUtils.ToUpper", "hello"); ``` -Run the sample: +### 2. Cross-Cutting Interceptors + +Plugins can add lifecycle interceptors for concerns like logging, metrics, auth, etc. + +## Built-in Cross-Cutting Plugins + +| Plugin | Description | +|--------|-------------| +| **LoggingPlugin** | Structured `ILogger` events for orchestration/activity lifecycle | +| **MetricsPlugin** | Execution counts, durations, success/failure tracking | +| **AuthorizationPlugin** | `IAuthorizationHandler` checks before execution | +| **ValidationPlugin** | `IInputValidator` input validation before execution | +| **RateLimitingPlugin** | Token-bucket rate limiting for activity dispatches | + +## Prerequisites + +- .NET 8.0 or later + +## Running the Sample ```bash dotnet run ``` +(Uses the in-process test host — no external sidecar needed.) + ## Plugin Architecture The plugin system follows these key design principles: +- **Dual-purpose** — Plugins can provide reusable tasks AND/OR cross-cutting interceptors. - **Composable** — Multiple plugins can be registered and they execute in registration order. -- **Non-invasive** — Plugins wrap orchestrations and activities through interceptors without modifying the core logic. -- **Temporal-aligned** — The `SimplePlugin` builder pattern mirrors Temporal's `SimplePlugin` for cross-ecosystem familiarity. - -### Creating Custom Plugins - -```csharp -// Create a custom plugin using the SimplePlugin builder -SimplePlugin myPlugin = SimplePlugin.NewBuilder("MyOrg.MyPlugin") - .AddOrchestrationInterceptor(new MyOrchestrationInterceptor()) - .AddActivityInterceptor(new MyActivityInterceptor()) - .Build(); - -// Register it on the worker builder -builder.Services.AddDurableTaskWorker() - .UsePlugin(myPlugin) - .UseGrpc(); -``` +- **Auto-registering** — Plugin tasks are automatically registered into the worker's task registry. +- **Temporal-aligned** — The `SimplePlugin` builder pattern mirrors Temporal's `SimplePlugin`. diff --git a/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs b/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs index c47ea3ab5..1eb9cb248 100644 --- a/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs +++ b/src/Extensions/Plugins/BuiltIn/AuthorizationPlugin.cs @@ -44,6 +44,12 @@ public AuthorizationPlugin(IAuthorizationHandler handler) /// public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Authorization plugin is cross-cutting only; it does not register any tasks. + } + sealed class AuthorizationOrchestrationInterceptor : IOrchestrationInterceptor { readonly IAuthorizationHandler handler; diff --git a/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs b/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs index 021dd05b2..441100255 100644 --- a/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs +++ b/src/Extensions/Plugins/BuiltIn/LoggingPlugin.cs @@ -42,6 +42,12 @@ public LoggingPlugin(ILoggerFactory loggerFactory) /// public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Logging plugin is cross-cutting only; it does not register any tasks. + } + sealed class LoggingOrchestrationInterceptor : IOrchestrationInterceptor { readonly ILogger logger; diff --git a/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs b/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs index a852d16eb..5650b0fd4 100644 --- a/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs +++ b/src/Extensions/Plugins/BuiltIn/MetricsPlugin.cs @@ -56,6 +56,12 @@ public MetricsPlugin(MetricsStore store) /// public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Metrics plugin is cross-cutting only; it does not register any tasks. + } + /// /// Gets the metrics store used by this plugin. /// diff --git a/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs b/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs index 831ff8b20..6d6313a5c 100644 --- a/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs +++ b/src/Extensions/Plugins/BuiltIn/RateLimitingPlugin.cs @@ -43,6 +43,12 @@ public RateLimitingPlugin(RateLimitingOptions options) /// public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Rate limiting plugin is cross-cutting only; it does not register any tasks. + } + sealed class RateLimitingActivityInterceptor : IActivityInterceptor { readonly RateLimitingOptions options; diff --git a/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs b/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs index c7c5376ad..8fbf513ee 100644 --- a/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs +++ b/src/Extensions/Plugins/BuiltIn/ValidationPlugin.cs @@ -44,6 +44,12 @@ public ValidationPlugin(params IInputValidator[] validators) /// public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + // Validation plugin is cross-cutting only; it does not register any tasks. + } + sealed class ValidationOrchestrationInterceptor : IOrchestrationInterceptor { readonly IInputValidator[] validators; diff --git a/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs index 0e9b09d8d..22237ae69 100644 --- a/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs +++ b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs @@ -34,6 +34,12 @@ public static IDurableTaskWorkerBuilder UsePlugin( return new PluginPipeline(plugins); }); + // Auto-register the plugin's built-in activities and orchestrations. + builder.Services.Configure(builder.Name, registry => + { + plugin.RegisterTasks(registry); + }); + return builder; } diff --git a/src/Extensions/Plugins/IDurableTaskPlugin.cs b/src/Extensions/Plugins/IDurableTaskPlugin.cs index 1b8f85ce0..ebbafc14b 100644 --- a/src/Extensions/Plugins/IDurableTaskPlugin.cs +++ b/src/Extensions/Plugins/IDurableTaskPlugin.cs @@ -4,10 +4,14 @@ namespace Microsoft.DurableTask.Plugins; /// -/// Defines a plugin that can intercept orchestration and activity lifecycle events. -/// Inspired by Temporal's plugin/interceptor pattern, plugins provide a composable -/// way to add cross-cutting concerns like logging, metrics, authorization, validation, -/// and rate limiting to Durable Task workers and clients. +/// Defines a plugin that can provide reusable activities, orchestrations, and cross-cutting +/// interceptors. Inspired by Temporal's plugin pattern, plugins serve two purposes: +/// +/// Provide built-in activities and orchestrations that users import and register +/// automatically when adding the plugin to a worker. +/// Add cross-cutting interceptors for concerns like logging, metrics, authorization, +/// validation, and rate limiting. +/// /// public interface IDurableTaskPlugin { @@ -25,4 +29,11 @@ public interface IDurableTaskPlugin /// Gets the activity interceptors provided by this plugin. /// IReadOnlyList ActivityInterceptors { get; } + + /// + /// Registers the plugin's built-in orchestrations and activities into the given registry. + /// This is called automatically when the plugin is added to a worker via UsePlugin(). + /// + /// The task registry to register activities and orchestrations into. + void RegisterTasks(DurableTaskRegistry registry); } diff --git a/src/Extensions/Plugins/SimplePlugin.cs b/src/Extensions/Plugins/SimplePlugin.cs index 7a85310eb..38f9fa928 100644 --- a/src/Extensions/Plugins/SimplePlugin.cs +++ b/src/Extensions/Plugins/SimplePlugin.cs @@ -4,22 +4,31 @@ namespace Microsoft.DurableTask.Plugins; /// -/// A simple plugin implementation that aggregates interceptors. This is the recommended -/// way to build plugins, following Temporal's SimplePlugin pattern. +/// A simple plugin implementation that can provide both reusable activities/orchestrations +/// and cross-cutting interceptors. This is the recommended way to build plugins, following +/// Temporal's SimplePlugin pattern. +/// +/// Use this to create plugins that ship as NuGet packages, providing pre-built activities +/// and orchestrations that users get automatically when they register the plugin, as well +/// as interceptors for cross-cutting concerns like logging or metrics. +/// /// public sealed class SimplePlugin : IDurableTaskPlugin { readonly List orchestrationInterceptors; readonly List activityInterceptors; + readonly Action? registerTasks; SimplePlugin( string name, List orchestrationInterceptors, - List activityInterceptors) + List activityInterceptors, + Action? registerTasks) { this.Name = name; this.orchestrationInterceptors = orchestrationInterceptors; this.activityInterceptors = activityInterceptors; + this.registerTasks = registerTasks; } /// @@ -31,6 +40,12 @@ public sealed class SimplePlugin : IDurableTaskPlugin /// public IReadOnlyList ActivityInterceptors => this.activityInterceptors; + /// + public void RegisterTasks(DurableTaskRegistry registry) + { + this.registerTasks?.Invoke(registry); + } + /// /// Creates a new for constructing a . /// @@ -50,6 +65,7 @@ public sealed class Builder readonly string name; readonly List orchestrationInterceptors = new(); readonly List activityInterceptors = new(); + Action? registerTasks; /// /// Initializes a new instance of the class. @@ -84,6 +100,25 @@ public Builder AddActivityInterceptor(IActivityInterceptor interceptor) return this; } + /// + /// Sets a callback that registers the plugin's built-in orchestrations and activities. + /// These tasks are automatically registered when the plugin is added to a worker. + /// + /// A callback that registers tasks into the . + /// This builder, for call chaining. + public Builder AddTasks(Action configure) + { + Check.NotNull(configure); + Action? previous = this.registerTasks; + this.registerTasks = registry => + { + previous?.Invoke(registry); + configure(registry); + }; + + return this; + } + /// /// Builds the instance. /// @@ -93,7 +128,8 @@ public SimplePlugin Build() return new SimplePlugin( this.name, new List(this.orchestrationInterceptors), - new List(this.activityInterceptors)); + new List(this.activityInterceptors), + this.registerTasks); } } } diff --git a/test/Extensions.Plugins.Tests/SimplePluginTests.cs b/test/Extensions.Plugins.Tests/SimplePluginTests.cs new file mode 100644 index 000000000..bd541efbc --- /dev/null +++ b/test/Extensions.Plugins.Tests/SimplePluginTests.cs @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Plugins; +using Xunit; + +namespace Microsoft.DurableTask.Extensions.Plugins.Tests; + +public class SimplePluginTests +{ + [Fact] + public void SimplePlugin_HasCorrectName() + { + // Arrange & Act + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.TestPlugin").Build(); + + // Assert + plugin.Name.Should().Be("MyOrg.TestPlugin"); + } + + [Fact] + public void SimplePlugin_RegistersTasks_DoesNotThrow() + { + // Arrange + bool activityRegistered = false; + bool orchestratorRegistered = false; + + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.TaskPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc("PluginActivity", (ctx, input) => + { + activityRegistered = true; + return $"processed: {input}"; + }); + registry.AddOrchestratorFunc("PluginOrchestration", ctx => + { + orchestratorRegistered = true; + return Task.FromResult("done"); + }); + }) + .Build(); + + DurableTaskRegistry registry = new(); + + // Act — should not throw + plugin.Invoking(p => p.RegisterTasks(registry)).Should().NotThrow(); + + // The registry accepted the registrations (no duplicate errors) + // Registering the same names again should throw (proving first call succeeded) + plugin.Invoking(p => p.RegisterTasks(registry)).Should().Throw(); + } + + [Fact] + public void SimplePlugin_NoTasks_RegisterTasksIsNoOp() + { + // Arrange + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.InterceptorOnly").Build(); + DurableTaskRegistry registry = new(); + + // Act — no-op, should not throw + plugin.Invoking(p => p.RegisterTasks(registry)).Should().NotThrow(); + } + + [Fact] + public void SimplePlugin_CombinesTasksAndInterceptors() + { + // Arrange + Moq.Mock interceptor = new(); + + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.FullPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc("PluginActivity", (ctx, input) => input); + }) + .AddOrchestrationInterceptor(interceptor.Object) + .Build(); + + DurableTaskRegistry registry = new(); + + // Act + plugin.RegisterTasks(registry); + + // Assert — interceptors are separate from tasks + plugin.OrchestrationInterceptors.Should().HaveCount(1); + + // Tasks were registered (attempting again should throw) + plugin.Invoking(p => p.RegisterTasks(registry)) + .Should().Throw() + .WithMessage("*PluginActivity*"); + } + + [Fact] + public void SimplePlugin_MultipleAddTasks_RegistersAll() + { + // Arrange + SimplePlugin plugin = SimplePlugin.NewBuilder("MyOrg.MultiPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc("Activity1", (ctx, input) => input); + }) + .AddTasks(registry => + { + registry.AddActivityFunc("Activity2", (ctx, input) => input); + }) + .Build(); + + DurableTaskRegistry registry = new(); + + // Act + plugin.RegisterTasks(registry); + + // Assert — both activities are registered (re-registering would throw for both) + Action reRegister = () => plugin.RegisterTasks(registry); + reRegister.Should().Throw().WithMessage("*Activity1*"); + } +} From 0a5c24fd115260570ca0fc1b6bb3daf5e6d3bb8b Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:11:27 -0700 Subject: [PATCH 3/4] Wire up plugin wrappers into execution pipeline + 9 E2E tests against real DTS Key changes: - PluginOrchestrationWrapper and PluginActivityWrapper are now wired into the actual execution pipeline via PluginRegistryPostConfigure. When UsePlugin() is called, a PostConfigure wraps every registered orchestrator/activity factory so the plugin interceptors run transparently on real executions. - Added InternalsVisibleTo for Plugins in Abstractions.csproj to access registry internals (Orchestrators, Activities dictionaries). - Removed SharedSection Core from Plugins.csproj to avoid polyfill type clashes on net6+/net8+/net10+ (IVT exposes netstandard2.0 polyfills). Added minimal local Check class instead. - Added HasOrchestrationInterceptors and HasActivityInterceptors properties to PluginPipeline for conditional wrapping. E2E Tests (test/Extensions.Plugins.E2ETests/): - 9 tests against real DTS scheduler (wbtwus1, westus3) - All pass in ~1.4 minutes - Tests: MetricsPlugin, LoggingPlugin, AuthorizationPlugin (allow + deny), ValidationPlugin (accept + reject), PluginRegisteredActivity, MultiplePlugins, PluginWithTasksAndInterceptors - Requires DTS_CONNECTION_STRING env var (not for CI, local only) --- Microsoft.DurableTask.sln | 21 + src/Abstractions/Abstractions.csproj | 1 + ...ableTaskWorkerBuilderExtensions.Plugins.cs | 12 +- .../PluginRegistryPostConfigure.cs | 61 +++ src/Extensions/Plugins/Internal/Check.cs | 31 ++ src/Extensions/Plugins/PluginPipeline.cs | 10 + src/Extensions/Plugins/Plugins.csproj | 7 +- .../Extensions.Plugins.E2ETests/DtsFixture.cs | 73 +++ .../Extensions.Plugins.E2ETests.csproj | 19 + .../PluginE2ETests.cs | 425 ++++++++++++++++++ 10 files changed, 658 insertions(+), 2 deletions(-) create mode 100644 src/Extensions/Plugins/DependencyInjection/PluginRegistryPostConfigure.cs create mode 100644 src/Extensions/Plugins/Internal/Check.cs create mode 100644 test/Extensions.Plugins.E2ETests/DtsFixture.cs create mode 100644 test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj create mode 100644 test/Extensions.Plugins.E2ETests/PluginE2ETests.cs diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index d6d59344f..25722bed0 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -127,6 +127,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{51A52603-5 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{3B8F957E-7773-4C0C-ACD7-91A1591D9312}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions.Plugins.E2ETests", "test\Extensions.Plugins.E2ETests\Extensions.Plugins.E2ETests.csproj", "{0926BD30-3713-488B-8043-761BB3C33014}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureManaged", "AzureManaged", "{D4587EC0-1B16-8420-7502-A967139249D4}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureManaged", "AzureManaged", "{53193780-CD18-2643-6953-C26F59EAEDF5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -725,6 +731,18 @@ Global {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x64.Build.0 = Release|Any CPU {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.ActiveCfg = Release|Any CPU {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.Build.0 = Release|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Debug|Any CPU.Build.0 = Debug|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x64.ActiveCfg = Debug|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x64.Build.0 = Debug|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x86.ActiveCfg = Debug|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x86.Build.0 = Debug|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Release|Any CPU.ActiveCfg = Release|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Release|Any CPU.Build.0 = Release|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Release|x64.ActiveCfg = Release|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Release|x64.Build.0 = Release|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Release|x86.ActiveCfg = Release|Any CPU + {0926BD30-3713-488B-8043-761BB3C33014}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -788,6 +806,9 @@ Global {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {51A52603-541D-DE3F-2825-C80F9EE6C532} = {1C217BB2-CE16-41CC-9D47-0FC0DB60BDB3} {3B8F957E-7773-4C0C-ACD7-91A1591D9312} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} + {0926BD30-3713-488B-8043-761BB3C33014} = {E5637F81-2FB9-4CD7-900D-455363B142A7} + {D4587EC0-1B16-8420-7502-A967139249D4} = {1C217BB2-CE16-41CC-9D47-0FC0DB60BDB3} + {53193780-CD18-2643-6953-C26F59EAEDF5} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/src/Abstractions/Abstractions.csproj b/src/Abstractions/Abstractions.csproj index db8be76ab..40ad6f6bb 100644 --- a/src/Abstractions/Abstractions.csproj +++ b/src/Abstractions/Abstractions.csproj @@ -22,6 +22,7 @@ + diff --git a/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs index 22237ae69..3a99d1776 100644 --- a/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs +++ b/src/Extensions/Plugins/DependencyInjection/DurableTaskWorkerBuilderExtensions.Plugins.cs @@ -5,6 +5,7 @@ using Microsoft.DurableTask.Worker; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; namespace Microsoft.DurableTask; @@ -13,9 +14,12 @@ namespace Microsoft.DurableTask; /// public static class DurableTaskWorkerBuilderExtensionsPlugins { + static readonly object WrappingRegistered = new(); + /// /// Adds a plugin to the Durable Task worker. All orchestration and activity interceptors - /// from the plugin will be invoked during execution. + /// from the plugin will be invoked during execution, and the plugin's built-in activities + /// and orchestrations will be auto-registered into the worker's task registry. /// /// The worker builder. /// The plugin to add. @@ -40,6 +44,12 @@ public static IDurableTaskWorkerBuilder UsePlugin( plugin.RegisterTasks(registry); }); + // Register the PostConfigure that wraps all factories with plugin interceptors. + // TryAddEnumerable ensures this only runs once per builder name even if UsePlugin is called multiple times. + builder.Services.TryAddEnumerable( + ServiceDescriptor.Singleton>( + new PluginRegistryPostConfigure(builder.Name))); + return builder; } diff --git a/src/Extensions/Plugins/DependencyInjection/PluginRegistryPostConfigure.cs b/src/Extensions/Plugins/DependencyInjection/PluginRegistryPostConfigure.cs new file mode 100644 index 000000000..7a2fff1e5 --- /dev/null +++ b/src/Extensions/Plugins/DependencyInjection/PluginRegistryPostConfigure.cs @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Plugins; +using Microsoft.Extensions.Options; + +namespace Microsoft.DurableTask; + +/// +/// Post-configures a to wrap all orchestrator and activity +/// factories with and . +/// This ensures plugin interceptors run transparently for every orchestration and activity execution. +/// +sealed class PluginRegistryPostConfigure : IPostConfigureOptions +{ + readonly string name; + + public PluginRegistryPostConfigure(string name) + { + this.name = name; + } + + /// + public void PostConfigure(string? name, DurableTaskRegistry registry) + { + if (!string.Equals(name, this.name, StringComparison.Ordinal)) + { + return; + } + + // Wrap all orchestrator factories so interceptors run on every execution. + List>> orchestrators = new(registry.Orchestrators); + foreach (KeyValuePair> entry in orchestrators) + { + Func original = entry.Value; + registry.Orchestrators[entry.Key] = sp => + { + ITaskOrchestrator inner = original(sp); + PluginPipeline? pipeline = sp.GetService(typeof(PluginPipeline)) as PluginPipeline; + return pipeline is not null && pipeline.HasOrchestrationInterceptors + ? new PluginOrchestrationWrapper(inner, pipeline) + : inner; + }; + } + + // Wrap all activity factories so interceptors run on every execution. + List>> activities = new(registry.Activities); + foreach (KeyValuePair> entry in activities) + { + Func original = entry.Value; + registry.Activities[entry.Key] = sp => + { + ITaskActivity inner = original(sp); + PluginPipeline? pipeline = sp.GetService(typeof(PluginPipeline)) as PluginPipeline; + return pipeline is not null && pipeline.HasActivityInterceptors + ? new PluginActivityWrapper(inner, pipeline) + : inner; + }; + } + } +} diff --git a/src/Extensions/Plugins/Internal/Check.cs b/src/Extensions/Plugins/Internal/Check.cs new file mode 100644 index 000000000..54200eae1 --- /dev/null +++ b/src/Extensions/Plugins/Internal/Check.cs @@ -0,0 +1,31 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask; + +/// +/// Minimal argument validation helpers for the Plugins extension package. +/// +static class Check +{ + public static T NotNull(T? argument, string? name = null) + where T : class + { + if (argument is null) + { + throw new ArgumentNullException(name); + } + + return argument; + } + + public static string NotNullOrEmpty(string? argument, string? name = null) + { + if (string.IsNullOrEmpty(argument)) + { + throw new ArgumentException("Value cannot be null or empty.", name); + } + + return argument!; + } +} diff --git a/src/Extensions/Plugins/PluginPipeline.cs b/src/Extensions/Plugins/PluginPipeline.cs index 4886a1b4b..f360edbff 100644 --- a/src/Extensions/Plugins/PluginPipeline.cs +++ b/src/Extensions/Plugins/PluginPipeline.cs @@ -25,6 +25,16 @@ public PluginPipeline(IEnumerable plugins) /// public IReadOnlyList Plugins => this.plugins; + /// + /// Gets a value indicating whether any registered plugin has orchestration interceptors. + /// + public bool HasOrchestrationInterceptors => this.plugins.Any(p => p.OrchestrationInterceptors.Count > 0); + + /// + /// Gets a value indicating whether any registered plugin has activity interceptors. + /// + public bool HasActivityInterceptors => this.plugins.Any(p => p.ActivityInterceptors.Count > 0); + /// /// Executes all orchestration-starting interceptors in registration order. /// diff --git a/src/Extensions/Plugins/Plugins.csproj b/src/Extensions/Plugins/Plugins.csproj index 59abdda2a..ba8d7a7a3 100644 --- a/src/Extensions/Plugins/Plugins.csproj +++ b/src/Extensions/Plugins/Plugins.csproj @@ -21,7 +21,12 @@ - + diff --git a/test/Extensions.Plugins.E2ETests/DtsFixture.cs b/test/Extensions.Plugins.E2ETests/DtsFixture.cs new file mode 100644 index 000000000..cbb6630d5 --- /dev/null +++ b/test/Extensions.Plugins.E2ETests/DtsFixture.cs @@ -0,0 +1,73 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Microsoft.DurableTask.Extensions.Plugins.E2ETests; + +/// +/// Shared fixture that sets up a DTS-connected worker and client for E2E tests. +/// Reads the connection string from DTS_CONNECTION_STRING environment variable. +/// +public sealed class DtsFixture : IAsyncDisposable +{ + IHost? host; + + public DurableTaskClient Client { get; private set; } = null!; + + public MetricsStore MetricsStore { get; } = new(); + + public List AuthorizationLog { get; } = new(); + + public static string? GetConnectionString() => + Environment.GetEnvironmentVariable("DTS_CONNECTION_STRING"); + + public async Task StartAsync( + Action configureTasks, + Action? configureWorker = null) + { + string connectionString = GetConnectionString() + ?? throw new InvalidOperationException( + "DTS_CONNECTION_STRING environment variable is required. " + + "Format: Endpoint=https://...;Authentication=DefaultAzure;TaskHub=..."); + + HostApplicationBuilder builder = Host.CreateApplicationBuilder(); + builder.Logging.SetMinimumLevel(LogLevel.Warning); + + builder.Services.AddDurableTaskClient(clientBuilder => + { + clientBuilder.UseDurableTaskScheduler(connectionString); + }); + + builder.Services.AddDurableTaskWorker(workerBuilder => + { + workerBuilder.AddTasks(configureTasks); + workerBuilder.UseDurableTaskScheduler(connectionString); + + // Apply custom plugin configuration. + configureWorker?.Invoke(workerBuilder); + }); + + this.host = builder.Build(); + await this.host.StartAsync(); + + this.Client = this.host.Services.GetRequiredService(); + } + + public async ValueTask DisposeAsync() + { + if (this.host is not null) + { + await this.host.StopAsync(); + this.host.Dispose(); + } + } +} diff --git a/test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj b/test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj new file mode 100644 index 000000000..64307abe1 --- /dev/null +++ b/test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj @@ -0,0 +1,19 @@ + + + + net10.0 + + + + + + + + + + + + + + + diff --git a/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs b/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs new file mode 100644 index 000000000..3072c0a0a --- /dev/null +++ b/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs @@ -0,0 +1,425 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Plugins; +using Microsoft.DurableTask.Plugins.BuiltIn; +using Microsoft.DurableTask.Worker; +using Microsoft.Extensions.Logging; +using Xunit; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Extensions.Plugins.E2ETests; + +/// +/// End-to-end tests for the plugin system against a real Durable Task Scheduler. +/// Requires the DTS_CONNECTION_STRING environment variable to be set. +/// +[Collection("DTS")] +[Trait("Category", "E2E")] +public class PluginE2ETests : IAsyncLifetime +{ + readonly ITestOutputHelper output; + readonly DtsFixture fixture = new(); + readonly string testId = Guid.NewGuid().ToString("N")[..8]; + + public PluginE2ETests(ITestOutputHelper output) + { + this.output = output; + } + + public Task InitializeAsync() => Task.CompletedTask; + + public async Task DisposeAsync() => await this.fixture.DisposeAsync(); + + void SkipIfNoConnectionString() + { + string? cs = DtsFixture.GetConnectionString(); + if (string.IsNullOrEmpty(cs)) + { + throw new SkipException( + "DTS_CONNECTION_STRING not set. Set it to run E2E tests against a real DTS scheduler."); + } + } + + // ─── Test 1: Metrics plugin tracks orchestration and activity counts ─── + + [Fact] + public async Task MetricsPlugin_TracksExecutionCounts_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"MetricsOrch_{this.testId}"; + string actName = $"MetricsAct_{this.testId}"; + MetricsStore store = new(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + string r = await ctx.CallActivityAsync(actName, "hello"); + return r; + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"echo:{input}"); + }, + worker => worker.UseMetricsPlugin(store)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + this.output.WriteLine($"Scheduled: {instanceId}"); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + this.output.WriteLine($"Output: {result.SerializedOutput}"); + + // Metrics plugin interceptors should have been invoked via the wrapper. + store.GetOrchestrationMetrics(orchName).Started.Should().BeGreaterOrEqualTo(1); + store.GetOrchestrationMetrics(orchName).Completed.Should().BeGreaterOrEqualTo(1); + store.GetActivityMetrics(actName).Started.Should().BeGreaterOrEqualTo(1); + store.GetActivityMetrics(actName).Completed.Should().BeGreaterOrEqualTo(1); + store.GetOrchestrationMetrics(orchName).Failed.Should().Be(0); + store.GetActivityMetrics(actName).Failed.Should().Be(0); + + this.output.WriteLine($"Orch started={store.GetOrchestrationMetrics(orchName).Started}, completed={store.GetOrchestrationMetrics(orchName).Completed}"); + this.output.WriteLine($"Activity started={store.GetActivityMetrics(actName).Started}, completed={store.GetActivityMetrics(actName).Completed}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 2: Logging plugin fires interceptors without errors ─── + + [Fact] + public async Task LoggingPlugin_DoesNotBreakExecution_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"LogOrch_{this.testId}"; + string actName = $"LogAct_{this.testId}"; + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "world"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"Hi {input}"); + }, + worker => worker.UseLoggingPlugin()); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + this.output.WriteLine($"Scheduled: {instanceId}"); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("Hi world"); + this.output.WriteLine($"Completed with output: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 3: Authorization plugin blocks unauthorized execution ─── + + [Fact] + public async Task AuthorizationPlugin_BlocksUnauthorized_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"AuthBlockOrch_{this.testId}"; + string actName = $"AuthBlockAct_{this.testId}"; + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "data"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => input); + }, + worker => worker.UseAuthorizationPlugin(new DenyAllHandler())); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + this.output.WriteLine($"Scheduled: {instanceId}"); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + // The orchestration should fail because the authorization interceptor denies it. + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Failed); + result.FailureDetails.Should().NotBeNull(); + result.FailureDetails!.ErrorMessage.Should().Contain("Authorization denied"); + this.output.WriteLine($"Failed as expected: {result.FailureDetails.ErrorMessage}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 4: Authorization plugin allows authorized execution ─── + + [Fact] + public async Task AuthorizationPlugin_AllowsAuthorized_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"AuthAllowOrch_{this.testId}"; + string actName = $"AuthAllowAct_{this.testId}"; + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "allowed"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"ok:{input}"); + }, + worker => worker.UseAuthorizationPlugin(new AllowAllHandler())); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("ok:allowed"); + this.output.WriteLine($"Authorized OK: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 5: Validation plugin rejects invalid input ─── + + [Fact] + public async Task ValidationPlugin_RejectsInvalidInput_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"ValRejectOrch_{this.testId}"; + string actName = $"ValRejectAct_{this.testId}"; + + // Validator rejects null/empty inputs for the activity. + IInputValidator validator = new NonEmptyStringValidator(actName); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + // Pass empty string — should be rejected by validation. + return await ctx.CallActivityAsync(actName, string.Empty); + }); + tasks.AddActivityFunc(actName, (ctx, input) => input); + }, + worker => worker.UseValidationPlugin(validator)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + // Orchestration completes but the activity call should fail with TaskFailedException + // because the validation interceptor throws ArgumentException. + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Failed); + this.output.WriteLine($"Validation rejected: {result.FailureDetails?.ErrorMessage}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 6: Validation plugin passes valid input ─── + + [Fact] + public async Task ValidationPlugin_AcceptsValidInput_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"ValAcceptOrch_{this.testId}"; + string actName = $"ValAcceptAct_{this.testId}"; + + IInputValidator validator = new NonEmptyStringValidator(actName); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "valid-data"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"processed:{input}"); + }, + worker => worker.UseValidationPlugin(validator)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("processed:valid-data"); + this.output.WriteLine($"Valid OK: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 7: Plugin-provided activities are callable ─── + + [Fact] + public async Task PluginRegisteredActivity_IsCallable_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"PluginTaskOrch_{this.testId}"; + string pluginActName = $"PluginAct_{this.testId}"; + + SimplePlugin taskPlugin = SimplePlugin.NewBuilder("E2E.TaskPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc(pluginActName, (ctx, input) => $"plugin:{input}"); + }) + .Build(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + // Call the activity that was registered by the plugin, not by the user. + return await ctx.CallActivityAsync(pluginActName, "from-orch"); + }); + }, + worker => worker.UsePlugin(taskPlugin)); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("plugin:from-orch"); + this.output.WriteLine($"Plugin activity result: {result.SerializedOutput}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 8: Multiple plugins all fire interceptors ─── + + [Fact] + public async Task MultiplePlugins_AllInterceptorsFire_E2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"MultiPlugOrch_{this.testId}"; + string actName = $"MultiPlugAct_{this.testId}"; + MetricsStore store = new(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(actName, "multi"); + }); + tasks.AddActivityFunc(actName, (ctx, input) => $"done:{input}"); + }, + worker => + { + worker.UseLoggingPlugin(); + worker.UseMetricsPlugin(store); + worker.UseAuthorizationPlugin(new AllowAllHandler()); + }); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("done:multi"); + + // Metrics plugin should have recorded counts because all plugins fire. + store.GetOrchestrationMetrics(orchName).Started.Should().BeGreaterOrEqualTo(1); + store.GetActivityMetrics(actName).Started.Should().BeGreaterOrEqualTo(1); + this.output.WriteLine($"Multiple plugins OK. Orch started={store.GetOrchestrationMetrics(orchName).Started}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Test 9: Plugin with both tasks and interceptors works E2E ─── + + [Fact] + public async Task PluginWithTasksAndInterceptors_WorksE2E() + { + this.SkipIfNoConnectionString(); + string orchName = $"FullPlugOrch_{this.testId}"; + string pluginActName = $"FullPlugAct_{this.testId}"; + MetricsStore store = new(); + + SimplePlugin fullPlugin = SimplePlugin.NewBuilder("E2E.FullPlugin") + .AddTasks(registry => + { + registry.AddActivityFunc(pluginActName, (ctx, input) => $"full:{input}"); + }) + .Build(); + + await this.fixture.StartAsync( + tasks => + { + tasks.AddOrchestratorFunc(orchName, async ctx => + { + return await ctx.CallActivityAsync(pluginActName, "test"); + }); + }, + worker => + { + worker.UsePlugin(fullPlugin); + worker.UseMetricsPlugin(store); + }); + + string instanceId = await this.fixture.Client.ScheduleNewOrchestrationInstanceAsync(orchName); + + OrchestrationMetadata result = await this.fixture.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, new CancellationTokenSource(TimeSpan.FromSeconds(60)).Token); + + result.RuntimeStatus.Should().Be(OrchestrationRuntimeStatus.Completed); + result.SerializedOutput.Should().Contain("full:test"); + + // Metrics interceptors wrapped the plugin-provided activity too. + store.GetActivityMetrics(pluginActName).Started.Should().BeGreaterOrEqualTo(1); + this.output.WriteLine($"Full plugin OK. Activity started={store.GetActivityMetrics(pluginActName).Started}"); + + await this.fixture.Client.PurgeInstanceAsync(instanceId); + } + + // ─── Helper classes ─── + + sealed class DenyAllHandler : IAuthorizationHandler + { + public Task AuthorizeAsync(AuthorizationContext context) => Task.FromResult(false); + } + + sealed class AllowAllHandler : IAuthorizationHandler + { + public Task AuthorizeAsync(AuthorizationContext context) => Task.FromResult(true); + } + + sealed class NonEmptyStringValidator : IInputValidator + { + readonly string targetActivity; + + public NonEmptyStringValidator(string targetActivity) => this.targetActivity = targetActivity; + + public Task ValidateAsync(TaskName taskName, object? input) + { + if (taskName.Name == this.targetActivity && input is string s && string.IsNullOrEmpty(s)) + { + return Task.FromResult(ValidationResult.Failure("Input must be non-empty.")); + } + + return Task.FromResult(ValidationResult.Success); + } + } +} + +/// +/// Custom xUnit exception for skipped tests. +/// +public class SkipException : Exception +{ + public SkipException(string message) : base(message) { } +} From bfa95311a6e9e31aae1e837cefa9bb6a6dc54cd2 Mon Sep 17 00:00:00 2001 From: peterstone2017 <12449837+YunchuWang@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:37:16 -0700 Subject: [PATCH 4/4] Fix CI: remove E2E test project from solution The E2E tests require DTS_CONNECTION_STRING (real DTS scheduler) and cannot run in CI. Removed the E2E test project from the solution file so CI's 'dotnet test \' no longer includes it. E2E tests can still be run locally: \ = '...' dotnet test test/Extensions.Plugins.E2ETests/Extensions.Plugins.E2ETests.csproj Tests now return early with output message when connection string is not set. Verified locally: - dotnet build Microsoft.DurableTask.sln --configuration release - dotnet test Microsoft.DurableTask.sln --configuration release (exit 0) - dotnet pack Microsoft.DurableTask.sln --configuration release --- Microsoft.DurableTask.sln | 33 ----------------- .../PluginE2ETests.cs | 37 +++++++++---------- 2 files changed, 18 insertions(+), 52 deletions(-) diff --git a/Microsoft.DurableTask.sln b/Microsoft.DurableTask.sln index 25722bed0..26945446a 100644 --- a/Microsoft.DurableTask.sln +++ b/Microsoft.DurableTask.sln @@ -115,24 +115,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Extensions", "Extensions", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Plugins", "src\Extensions\Plugins\Plugins.csproj", "{464EF328-1A43-417C-BC9D-C1F808D2C3B8}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Core", "Core", "{9462A531-9647-6067-D102-00175DB73A6F}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Core", "Core", "{D4D9077D-1CEC-0E01-C5EE-AFAD11489446}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions.Plugins.Tests", "test\Extensions.Plugins.Tests\Extensions.Plugins.Tests.csproj", "{09D76001-410E-4308-9156-01A9E7F5400B}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PluginsSample", "samples\PluginsSample\PluginsSample.csproj", "{BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}" EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{51A52603-541D-DE3F-2825-C80F9EE6C532}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{3B8F957E-7773-4C0C-ACD7-91A1591D9312}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Extensions.Plugins.E2ETests", "test\Extensions.Plugins.E2ETests\Extensions.Plugins.E2ETests.csproj", "{0926BD30-3713-488B-8043-761BB3C33014}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureManaged", "AzureManaged", "{D4587EC0-1B16-8420-7502-A967139249D4}" -EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureManaged", "AzureManaged", "{53193780-CD18-2643-6953-C26F59EAEDF5}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -731,18 +717,6 @@ Global {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x64.Build.0 = Release|Any CPU {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.ActiveCfg = Release|Any CPU {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD}.Release|x86.Build.0 = Release|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Debug|Any CPU.Build.0 = Debug|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x64.ActiveCfg = Debug|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x64.Build.0 = Debug|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x86.ActiveCfg = Debug|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Debug|x86.Build.0 = Debug|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Release|Any CPU.ActiveCfg = Release|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Release|Any CPU.Build.0 = Release|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Release|x64.ActiveCfg = Release|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Release|x64.Build.0 = Release|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Release|x86.ActiveCfg = Release|Any CPU - {0926BD30-3713-488B-8043-761BB3C33014}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -800,15 +774,8 @@ Global {4A7305AE-AAAE-43AE-AAB2-DA58DACC6FA8} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} {21303FBF-2A2B-17C2-D2DF-3E924022E940} = {8AFC9781-F6F1-4696-BB4A-9ED7CA9D612B} {464EF328-1A43-417C-BC9D-C1F808D2C3B8} = {21303FBF-2A2B-17C2-D2DF-3E924022E940} - {9462A531-9647-6067-D102-00175DB73A6F} = {1C217BB2-CE16-41CC-9D47-0FC0DB60BDB3} - {D4D9077D-1CEC-0E01-C5EE-AFAD11489446} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} {09D76001-410E-4308-9156-01A9E7F5400B} = {E5637F81-2FB9-4CD7-900D-455363B142A7} {BC7110D2-21D6-4A73-88FD-D4D2A2A670DD} = {EFF7632B-821E-4CFC-B4A0-ED4B24296B17} - {51A52603-541D-DE3F-2825-C80F9EE6C532} = {1C217BB2-CE16-41CC-9D47-0FC0DB60BDB3} - {3B8F957E-7773-4C0C-ACD7-91A1591D9312} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} - {0926BD30-3713-488B-8043-761BB3C33014} = {E5637F81-2FB9-4CD7-900D-455363B142A7} - {D4587EC0-1B16-8420-7502-A967139249D4} = {1C217BB2-CE16-41CC-9D47-0FC0DB60BDB3} - {53193780-CD18-2643-6953-C26F59EAEDF5} = {5B448FF6-EC42-491D-A22E-1DC8B618E6D5} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AB41CB55-35EA-4986-A522-387AB3402E71} diff --git a/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs b/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs index 3072c0a0a..287ec9cfd 100644 --- a/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs +++ b/test/Extensions.Plugins.E2ETests/PluginE2ETests.cs @@ -33,14 +33,19 @@ public PluginE2ETests(ITestOutputHelper output) public async Task DisposeAsync() => await this.fixture.DisposeAsync(); - void SkipIfNoConnectionString() + /// + /// Returns true if DTS_CONNECTION_STRING is set; tests should return early if false. + /// + bool HasConnectionString() { string? cs = DtsFixture.GetConnectionString(); if (string.IsNullOrEmpty(cs)) { - throw new SkipException( - "DTS_CONNECTION_STRING not set. Set it to run E2E tests against a real DTS scheduler."); + this.output.WriteLine("SKIPPED: DTS_CONNECTION_STRING not set."); + return false; } + + return true; } // ─── Test 1: Metrics plugin tracks orchestration and activity counts ─── @@ -48,7 +53,7 @@ void SkipIfNoConnectionString() [Fact] public async Task MetricsPlugin_TracksExecutionCounts_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"MetricsOrch_{this.testId}"; string actName = $"MetricsAct_{this.testId}"; MetricsStore store = new(); @@ -93,7 +98,7 @@ await this.fixture.StartAsync( [Fact] public async Task LoggingPlugin_DoesNotBreakExecution_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"LogOrch_{this.testId}"; string actName = $"LogAct_{this.testId}"; @@ -126,7 +131,7 @@ await this.fixture.StartAsync( [Fact] public async Task AuthorizationPlugin_BlocksUnauthorized_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"AuthBlockOrch_{this.testId}"; string actName = $"AuthBlockAct_{this.testId}"; @@ -161,7 +166,7 @@ await this.fixture.StartAsync( [Fact] public async Task AuthorizationPlugin_AllowsAuthorized_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"AuthAllowOrch_{this.testId}"; string actName = $"AuthAllowAct_{this.testId}"; @@ -193,7 +198,7 @@ await this.fixture.StartAsync( [Fact] public async Task ValidationPlugin_RejectsInvalidInput_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"ValRejectOrch_{this.testId}"; string actName = $"ValRejectAct_{this.testId}"; @@ -230,7 +235,7 @@ await this.fixture.StartAsync( [Fact] public async Task ValidationPlugin_AcceptsValidInput_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"ValAcceptOrch_{this.testId}"; string actName = $"ValAcceptAct_{this.testId}"; @@ -264,7 +269,7 @@ await this.fixture.StartAsync( [Fact] public async Task PluginRegisteredActivity_IsCallable_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"PluginTaskOrch_{this.testId}"; string pluginActName = $"PluginAct_{this.testId}"; @@ -303,7 +308,7 @@ await this.fixture.StartAsync( [Fact] public async Task MultiplePlugins_AllInterceptorsFire_E2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"MultiPlugOrch_{this.testId}"; string actName = $"MultiPlugAct_{this.testId}"; MetricsStore store = new(); @@ -345,7 +350,7 @@ await this.fixture.StartAsync( [Fact] public async Task PluginWithTasksAndInterceptors_WorksE2E() { - this.SkipIfNoConnectionString(); + if (!this.HasConnectionString()) return; string orchName = $"FullPlugOrch_{this.testId}"; string pluginActName = $"FullPlugAct_{this.testId}"; MetricsStore store = new(); @@ -416,10 +421,4 @@ public Task ValidateAsync(TaskName taskName, object? input) } } -/// -/// Custom xUnit exception for skipped tests. -/// -public class SkipException : Exception -{ - public SkipException(string message) : base(message) { } -} +