From 882ea20efdb34be3e0a46785bfd4dd2759bb2c2e Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 17:39:52 -0800 Subject: [PATCH 01/13] Fix GrpcChannel handle leak in AzureManaged backend - Add GrpcChannelCache for thread-safe channel caching by endpoint - Update Client/Worker extensions to use shared cache - Ensure channels are disposed when ServiceProvider disposes - Add comprehensive unit and integration tests --- .../DurableTaskSchedulerClientExtensions.cs | 20 +- src/Shared/AzureManaged/GrpcChannelCache.cs | 173 +++++++ .../DurableTaskSchedulerWorkerExtensions.cs | 20 +- ...rableTaskSchedulerClientExtensionsTests.cs | 53 +++ .../GrpcChannelCacheTests.cs | 427 ++++++++++++++++++ ...rableTaskSchedulerWorkerExtensionsTests.cs | 54 +++ 6 files changed, 741 insertions(+), 6 deletions(-) create mode 100644 src/Shared/AzureManaged/GrpcChannelCache.cs create mode 100644 test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index b11f450a6..bdcb1246a 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -91,6 +92,10 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); + // Register the channel cache as a singleton to ensure channels are reused + // and properly disposed when the service provider is disposed. + builder.Services.TryAddSingleton(); + builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -101,7 +106,10 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : + /// Cache for gRPC channels to ensure reuse and proper disposal. + class ConfigureGrpcChannel( + IOptionsMonitor schedulerOptions, + GrpcChannelCache channelCache) : IConfigureNamedOptions { /// @@ -117,8 +125,14 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskClientOptions options) { - DurableTaskSchedulerClientOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerClientOptions source = schedulerOptions.Get(optionsName); + + // Create a cache key based on the options name, endpoint, and task hub. + // This ensures channels are reused for the same configuration + // but separate channels are created for different configurations. + string cacheKey = $"client:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); } } } diff --git a/src/Shared/AzureManaged/GrpcChannelCache.cs b/src/Shared/AzureManaged/GrpcChannelCache.cs new file mode 100644 index 000000000..3235ae828 --- /dev/null +++ b/src/Shared/AzureManaged/GrpcChannelCache.cs @@ -0,0 +1,173 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Concurrent; +using Grpc.Net.Client; + +namespace Microsoft.DurableTask; + +/// +/// Thread-safe cache for gRPC channels that ensures channels are reused across retries/calls +/// and properly disposed when replaced or evicted. +/// +sealed class GrpcChannelCache : IDisposable +{ + readonly ConcurrentDictionary channels = new(); + readonly object syncLock = new(); + volatile bool disposed; + + /// + /// Gets or creates a cached gRPC channel for the specified key. + /// If a channel already exists for the key, it is returned. + /// If the factory creates a new channel, any existing channel for the key is disposed. + /// + /// The cache key (typically endpoint + taskhub combination). + /// Factory function to create a new channel if needed. + /// The cached or newly created gRPC channel. + public GrpcChannel GetOrCreate(string key, Func channelFactory) + { + Check.NotNullOrEmpty(key); + Check.NotNull(channelFactory); + + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + // Fast path: return existing channel + if (this.channels.TryGetValue(key, out GrpcChannel? existingChannel)) + { + return existingChannel; + } + + // Slow path: create new channel with synchronization to avoid creating multiple channels + lock (this.syncLock) + { + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + // Double-check after acquiring lock + if (this.channels.TryGetValue(key, out existingChannel)) + { + return existingChannel; + } + + GrpcChannel newChannel = channelFactory(); + this.channels[key] = newChannel; + return newChannel; + } + } + + /// + /// Replaces an existing channel for the specified key with a new one, + /// disposing the old channel if it exists. + /// + /// The cache key. + /// The new channel to cache. + public void Replace(string key, GrpcChannel newChannel) + { + Check.NotNullOrEmpty(key); + Check.NotNull(newChannel); + + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + GrpcChannel? oldChannel = null; + + lock (this.syncLock) + { + if (this.disposed) + { + throw new ObjectDisposedException(nameof(GrpcChannelCache)); + } + + if (this.channels.TryGetValue(key, out oldChannel)) + { + // Only replace if it's actually a different channel + if (ReferenceEquals(oldChannel, newChannel)) + { + return; + } + } + + this.channels[key] = newChannel; + } + + // Dispose the old channel outside the lock to avoid potential deadlocks + DisposeChannelAsync(oldChannel); + } + + /// + /// Removes and disposes a channel for the specified key. + /// + /// The cache key. + /// True if a channel was removed; otherwise, false. + public bool TryRemove(string key) + { + Check.NotNullOrEmpty(key); + + if (this.channels.TryRemove(key, out GrpcChannel? channel)) + { + DisposeChannelAsync(channel); + return true; + } + + return false; + } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + lock (this.syncLock) + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (KeyValuePair kvp in this.channels) + { + DisposeChannelAsync(kvp.Value); + } + + this.channels.Clear(); + } + } + + static void DisposeChannelAsync(GrpcChannel? channel) + { + if (channel == null) + { + return; + } + + // ShutdownAsync is the graceful way to close a gRPC channel + // We fire-and-forget but ensure the channel is eventually disposed + _ = Task.Run(async () => + { + try + { + await channel.ShutdownAsync(); + } + catch + { + // Ignore shutdown errors during disposal + } + finally + { + channel.Dispose(); + } + }); + } +} diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 7a3baa414..7360dd124 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using Azure.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; using Microsoft.DurableTask.Worker.Grpc.Internal; using Microsoft.Extensions.DependencyInjection; @@ -93,6 +94,10 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); + // Register the channel cache as a singleton to ensure channels are reused + // and properly disposed when the service provider is disposed. + builder.Services.TryAddSingleton(); + builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -103,7 +108,10 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// /// Monitor for accessing the current scheduler options configuration. - class ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) : + /// Cache for gRPC channels to ensure reuse and proper disposal. + class ConfigureGrpcChannel( + IOptionsMonitor schedulerOptions, + GrpcChannelCache channelCache) : IConfigureNamedOptions { /// @@ -119,8 +127,14 @@ class ConfigureGrpcChannel(IOptionsMonitor sc /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { - DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(name ?? Options.DefaultName); - options.Channel = source.CreateChannel(); + string optionsName = name ?? Options.DefaultName; + DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(optionsName); + + // Create a cache key based on the options name, endpoint, and task hub. + // This ensures channels are reused for the same configuration + // but separate channels are created for different configurations. + string cacheKey = $"worker:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); options.ConfigureForAzureManaged(); } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 3c760ddf6..77e64055e 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -280,4 +280,57 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo clientOptions.RetryOptions.RetryableStatusCodes.Should().Contain(StatusCode.Unknown); } } + + [Fact] + public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named clients with different endpoints + mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); + } } diff --git a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs new file mode 100644 index 000000000..bb5b7cd7e --- /dev/null +++ b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs @@ -0,0 +1,427 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using FluentAssertions; +using Grpc.Net.Client; +using Xunit; + +namespace Microsoft.DurableTask.Tests; + +public class GrpcChannelCacheTests +{ + const string TestEndpoint = "http://localhost:5000"; + + [Fact] + public void GetOrCreate_SameKey_ReturnsSameChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "test-key"; + int factoryCallCount = 0; + GrpcChannel Factory() + { + factoryCallCount++; + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act + GrpcChannel channel1 = cache.GetOrCreate(key, Factory); + GrpcChannel channel2 = cache.GetOrCreate(key, Factory); + + // Assert + channel1.Should().BeSameAs(channel2); + factoryCallCount.Should().Be(1, "factory should only be called once for the same key"); + } + + [Fact] + public void GetOrCreate_DifferentKeys_ReturnsDifferentChannels() + { + // Arrange + using GrpcChannelCache cache = new(); + string key1 = "key1"; + string key2 = "key2"; + + // Act + GrpcChannel channel1 = cache.GetOrCreate(key1, () => GrpcChannel.ForAddress(TestEndpoint)); + GrpcChannel channel2 = cache.GetOrCreate(key2, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + channel1.Should().NotBeSameAs(channel2); + } + + [Fact] + public void GetOrCreate_ConcurrentAccess_CreatesSingleChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "concurrent-key"; + int factoryCallCount = 0; + object countLock = new(); + GrpcChannel Factory() + { + lock (countLock) + { + factoryCallCount++; + } + + // Add small delay to increase chance of race conditions + Thread.Sleep(10); + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act + GrpcChannel[] channels = new GrpcChannel[10]; + Parallel.For(0, 10, i => + { + channels[i] = cache.GetOrCreate(key, Factory); + }); + + // Assert + factoryCallCount.Should().Be(1, "factory should only be called once even with concurrent access"); + channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue("all channels should be the same instance"); + } + + [Fact] + public void Replace_ExistingChannel_DisposesOldChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "replace-key"; + GrpcChannel oldChannel = GrpcChannel.ForAddress(TestEndpoint); + GrpcChannel newChannel = GrpcChannel.ForAddress(TestEndpoint); + cache.GetOrCreate(key, () => oldChannel); + + // Act + cache.Replace(key, newChannel); + GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); + + // Assert + retrievedChannel.Should().BeSameAs(newChannel); + retrievedChannel.Should().NotBeSameAs(oldChannel); + } + + [Fact] + public void Replace_SameChannel_DoesNothing() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "same-channel-key"; + GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); + cache.GetOrCreate(key, () => channel); + + // Act & Assert - should not throw or change anything + cache.Replace(key, channel); + GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); + retrievedChannel.Should().BeSameAs(channel); + } + + [Fact] + public void Replace_NonExistingKey_AddsChannel() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "new-key"; + GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); + + // Act + cache.Replace(key, channel); + GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); + + // Assert + retrievedChannel.Should().BeSameAs(channel); + } + + [Fact] + public void TryRemove_ExistingKey_RemovesAndReturnsTrue() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "remove-key"; + cache.GetOrCreate(key, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Act + bool result = cache.TryRemove(key); + + // Assert + result.Should().BeTrue(); + + // Verify the key is removed by checking that a new channel is created + int factoryCallCount = 0; + cache.GetOrCreate(key, () => + { + factoryCallCount++; + return GrpcChannel.ForAddress(TestEndpoint); + }); + factoryCallCount.Should().Be(1, "a new channel should be created after removal"); + } + + [Fact] + public void TryRemove_NonExistingKey_ReturnsFalse() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "non-existing-key"; + + // Act + bool result = cache.TryRemove(key); + + // Assert + result.Should().BeFalse(); + } + + [Fact] + public void Dispose_DisposesAllChannels() + { + // Arrange + GrpcChannelCache cache = new(); + cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); + cache.GetOrCreate("key2", () => GrpcChannel.ForAddress(TestEndpoint)); + cache.GetOrCreate("key3", () => GrpcChannel.ForAddress(TestEndpoint)); + + // Act + cache.Dispose(); + + // Assert - attempting to use the cache after disposal should throw + Action action = () => cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); + action.Should().Throw(); + } + + [Fact] + public void Dispose_MultipleCalls_DoesNotThrow() + { + // Arrange + GrpcChannelCache cache = new(); + cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); + + // Act & Assert - multiple dispose calls should not throw + cache.Dispose(); + cache.Dispose(); + cache.Dispose(); + } + + [Fact] + public void GetOrCreate_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + GrpcChannelCache cache = new(); + cache.Dispose(); + + // Act + Action action = () => cache.GetOrCreate("key", () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw() + .WithMessage("*GrpcChannelCache*"); + } + + [Fact] + public void Replace_AfterDispose_ThrowsObjectDisposedException() + { + // Arrange + GrpcChannelCache cache = new(); + cache.Dispose(); + + // Act + Action action = () => cache.Replace("key", GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw() + .WithMessage("*GrpcChannelCache*"); + } + + [Fact] + public void GetOrCreate_WithNullKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.GetOrCreate(null!, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void GetOrCreate_WithEmptyKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.GetOrCreate(string.Empty, () => GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void GetOrCreate_WithNullFactory_ThrowsArgumentNullException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.GetOrCreate("key", null!); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void Replace_WithNullKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.Replace(null!, GrpcChannel.ForAddress(TestEndpoint)); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void Replace_WithNullChannel_ThrowsArgumentNullException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.Replace("key", null!); + + // Assert + action.Should().Throw(); + } + + [Fact] + public void TryRemove_WithNullKey_ThrowsArgumentException() + { + // Arrange + using GrpcChannelCache cache = new(); + + // Act + Action action = () => cache.TryRemove(null!); + + // Assert + action.Should().Throw(); + } + + /// + /// This test verifies the core fix for the handle leak issue. + /// Without the cache, each call to configure options would create a new GrpcChannel, + /// causing handle count to grow unbounded when the service is unreachable. + /// With the cache, repeated calls reuse the same channel, preventing handle leaks. + /// + [Fact] + public void GetOrCreate_SimulatesRetryScenario_DoesNotCreateMultipleChannels() + { + // Arrange + using GrpcChannelCache cache = new(); + string key = "client:default:myendpoint.durabletask.io:myhub"; + int factoryCallCount = 0; + + GrpcChannel CreateChannel() + { + factoryCallCount++; + // Each GrpcChannel creates HttpClient + SocketsHttpHandler internally, + // which allocates socket handles. Without caching, this would leak handles. + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act - Simulate what happens during retries when service is unreachable: + // The options configuration callback may be invoked multiple times + const int retryAttempts = 100; + GrpcChannel[] channels = new GrpcChannel[retryAttempts]; + for (int i = 0; i < retryAttempts; i++) + { + channels[i] = cache.GetOrCreate(key, CreateChannel); + } + + // Assert - The factory should only be called ONCE, not 100 times + // This is the key behavior that prevents handle accumulation + factoryCallCount.Should().Be(1, + "the channel factory should only be called once regardless of how many times GetOrCreate is called - " + + "this is what prevents handle leaks when the service is unreachable"); + + // All returned channels should be the exact same instance + channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue( + "all calls should return the same cached channel instance"); + } + + /// + /// Verifies that the old behavior (without cache) would create multiple channels. + /// This demonstrates what the cache prevents. + /// + [Fact] + public void WithoutCache_MultipleCallsCreateMultipleChannels() + { + // Arrange - simulate old behavior without cache + int factoryCallCount = 0; + List channels = new(); + + GrpcChannel CreateChannelWithoutCache() + { + factoryCallCount++; + return GrpcChannel.ForAddress(TestEndpoint); + } + + // Act - Without caching, each "retry" creates a new channel + const int retryAttempts = 10; + for (int i = 0; i < retryAttempts; i++) + { + // This simulates the OLD behavior before the fix + channels.Add(CreateChannelWithoutCache()); + } + + // Assert - Each call creates a new channel (the problematic behavior we fixed) + factoryCallCount.Should().Be(retryAttempts, + "without caching, each call creates a new channel - this causes handle leaks"); + + // All channels are different instances + channels.Distinct().Count().Should().Be(retryAttempts, + "without caching, each channel is a unique instance with its own handles"); + + // Cleanup + foreach (var channel in channels) + { + channel.Dispose(); + } + } + + /// + /// Verifies channels are properly disposed when the cache is disposed, + /// which releases the associated handles. + /// + [Fact] + public async Task Dispose_ReleasesChannelResources() + { + // Arrange + GrpcChannelCache cache = new(); + List createdChannels = new(); + + // Create multiple channels through the cache + for (int i = 0; i < 5; i++) + { + string key = $"key{i}"; + GrpcChannel channel = cache.GetOrCreate(key, () => + { + var c = GrpcChannel.ForAddress(TestEndpoint); + createdChannels.Add(c); + return c; + }); + } + + createdChannels.Count.Should().Be(5); + + // Act - Dispose the cache (this should dispose all channels) + cache.Dispose(); + + // Wait a bit for async disposal to complete + await Task.Delay(100); + + // Assert - The cache should be disposed and unusable + Action action = () => cache.GetOrCreate("new-key", () => GrpcChannel.ForAddress(TestEndpoint)); + action.Should().Throw(); + } +} diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index a661b53d3..1510c7e56 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -198,4 +198,58 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() options.ResourceId.Should().Be("https://durabletask.io"); options.AllowInsecureCredentials.Should().BeFalse(); } + + [Fact] + public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options multiple times to trigger channel configuration + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get(Options.DefaultName); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get(Options.DefaultName); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().BeSameAs(options2.Channel, "same configuration should reuse the same channel"); + } + + [Fact] + public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two different named workers with different endpoints + mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); + } } + From 387c960168bf90b136e4e24f781e0d33d661a06b Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 17:53:23 -0800 Subject: [PATCH 02/13] Address PR review comments and fix potential deadlock - Move channel factory call outside lock to prevent deadlock - Combine nested if statements in Replace method - Use 'using' statement for channel disposal - Catch Exception instead of bare catch - Remove unused variable in test --- src/Shared/AzureManaged/GrpcChannelCache.cs | 40 ++++++++++--------- .../GrpcChannelCacheTests.cs | 4 +- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/Shared/AzureManaged/GrpcChannelCache.cs b/src/Shared/AzureManaged/GrpcChannelCache.cs index 3235ae828..13694aeb9 100644 --- a/src/Shared/AzureManaged/GrpcChannelCache.cs +++ b/src/Shared/AzureManaged/GrpcChannelCache.cs @@ -40,21 +40,26 @@ public GrpcChannel GetOrCreate(string key, Func channelFactory) return existingChannel; } - // Slow path: create new channel with synchronization to avoid creating multiple channels + // Create channel outside lock to avoid potential deadlock if factory calls back into cache + GrpcChannel newChannel = channelFactory(); + lock (this.syncLock) { if (this.disposed) { + // Cache was disposed while we were creating the channel - dispose and throw + DisposeChannelAsync(newChannel); throw new ObjectDisposedException(nameof(GrpcChannelCache)); } - // Double-check after acquiring lock + // Check if another thread added a channel while we were creating ours if (this.channels.TryGetValue(key, out existingChannel)) { + // Dispose our duplicate and return the existing one + DisposeChannelAsync(newChannel); return existingChannel; } - GrpcChannel newChannel = channelFactory(); this.channels[key] = newChannel; return newChannel; } @@ -85,13 +90,11 @@ public void Replace(string key, GrpcChannel newChannel) throw new ObjectDisposedException(nameof(GrpcChannelCache)); } - if (this.channels.TryGetValue(key, out oldChannel)) + // Only replace if it's actually a different channel + if (this.channels.TryGetValue(key, out oldChannel) && + ReferenceEquals(oldChannel, newChannel)) { - // Only replace if it's actually a different channel - if (ReferenceEquals(oldChannel, newChannel)) - { - return; - } + return; } this.channels[key] = newChannel; @@ -156,17 +159,16 @@ static void DisposeChannelAsync(GrpcChannel? channel) // We fire-and-forget but ensure the channel is eventually disposed _ = Task.Run(async () => { - try - { - await channel.ShutdownAsync(); - } - catch + using (channel) { - // Ignore shutdown errors during disposal - } - finally - { - channel.Dispose(); + try + { + await channel.ShutdownAsync(); + } + catch (Exception) + { + // Ignore shutdown errors during disposal + } } }); } diff --git a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs index bb5b7cd7e..eb483f00c 100644 --- a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs +++ b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs @@ -404,9 +404,9 @@ public async Task Dispose_ReleasesChannelResources() for (int i = 0; i < 5; i++) { string key = $"key{i}"; - GrpcChannel channel = cache.GetOrCreate(key, () => + cache.GetOrCreate(key, () => { - var c = GrpcChannel.ForAddress(TestEndpoint); + GrpcChannel c = GrpcChannel.ForAddress(TestEndpoint); createdChannels.Add(c); return c; }); From 9ef84a9242632ba4d63ddbcd0e293e05215c0fdd Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 18:01:56 -0800 Subject: [PATCH 03/13] Simplify channel caching per review feedback - Remove separate GrpcChannelCache class - Inline channel caching directly in ConfigureGrpcChannel using ConcurrentDictionary> - Make ConfigureGrpcChannel implement IDisposable for proper channel disposal - Remove unused Replace() and TryRemove() methods - Add disposal verification tests - Reduces complexity from 170+ LOC to ~40 LOC per extension --- .../DurableTaskSchedulerClientExtensions.cs | 84 +++- src/Shared/AzureManaged/GrpcChannelCache.cs | 175 ------- .../DurableTaskSchedulerWorkerExtensions.cs | 84 +++- ...rableTaskSchedulerClientExtensionsTests.cs | 35 ++ .../GrpcChannelCacheTests.cs | 427 ------------------ ...rableTaskSchedulerWorkerExtensionsTests.cs | 35 ++ 6 files changed, 212 insertions(+), 628 deletions(-) delete mode 100644 src/Shared/AzureManaged/GrpcChannelCache.cs delete mode 100644 test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index bdcb1246a..f57458c00 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; @@ -92,10 +93,6 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); - // Register the channel cache as a singleton to ensure channels are reused - // and properly disposed when the service provider is disposed. - builder.Services.TryAddSingleton(); - builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -104,14 +101,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for client options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - /// Cache for gRPC channels to ensure reuse and proper disposal. - class ConfigureGrpcChannel( - IOptionsMonitor schedulerOptions, - GrpcChannelCache channelCache) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -125,14 +131,66 @@ class ConfigureGrpcChannel( /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskClientOptions options) { +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed, this); +#else + if (this.disposed) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + string optionsName = name ?? Options.DefaultName; - DurableTaskSchedulerClientOptions source = schedulerOptions.Get(optionsName); + DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName); // Create a cache key based on the options name, endpoint, and task hub. // This ensures channels are reused for the same configuration // but separate channels are created for different configurations. - string cacheKey = $"client:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; - options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); + string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; + } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (KeyValuePair> kvp in this.channels) + { + if (kvp.Value.IsValueCreated) + { + DisposeChannel(kvp.Value.Value); + } + } + + this.channels.Clear(); + } + + static void DisposeChannel(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + // Fire-and-forget but ensure the channel is eventually disposed. + _ = Task.Run(async () => + { + using (channel) + { + try + { + await channel.ShutdownAsync(); + } + catch (Exception) + { + // Ignore shutdown errors during disposal + } + } + }); } } } diff --git a/src/Shared/AzureManaged/GrpcChannelCache.cs b/src/Shared/AzureManaged/GrpcChannelCache.cs deleted file mode 100644 index 13694aeb9..000000000 --- a/src/Shared/AzureManaged/GrpcChannelCache.cs +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Collections.Concurrent; -using Grpc.Net.Client; - -namespace Microsoft.DurableTask; - -/// -/// Thread-safe cache for gRPC channels that ensures channels are reused across retries/calls -/// and properly disposed when replaced or evicted. -/// -sealed class GrpcChannelCache : IDisposable -{ - readonly ConcurrentDictionary channels = new(); - readonly object syncLock = new(); - volatile bool disposed; - - /// - /// Gets or creates a cached gRPC channel for the specified key. - /// If a channel already exists for the key, it is returned. - /// If the factory creates a new channel, any existing channel for the key is disposed. - /// - /// The cache key (typically endpoint + taskhub combination). - /// Factory function to create a new channel if needed. - /// The cached or newly created gRPC channel. - public GrpcChannel GetOrCreate(string key, Func channelFactory) - { - Check.NotNullOrEmpty(key); - Check.NotNull(channelFactory); - - if (this.disposed) - { - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - // Fast path: return existing channel - if (this.channels.TryGetValue(key, out GrpcChannel? existingChannel)) - { - return existingChannel; - } - - // Create channel outside lock to avoid potential deadlock if factory calls back into cache - GrpcChannel newChannel = channelFactory(); - - lock (this.syncLock) - { - if (this.disposed) - { - // Cache was disposed while we were creating the channel - dispose and throw - DisposeChannelAsync(newChannel); - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - // Check if another thread added a channel while we were creating ours - if (this.channels.TryGetValue(key, out existingChannel)) - { - // Dispose our duplicate and return the existing one - DisposeChannelAsync(newChannel); - return existingChannel; - } - - this.channels[key] = newChannel; - return newChannel; - } - } - - /// - /// Replaces an existing channel for the specified key with a new one, - /// disposing the old channel if it exists. - /// - /// The cache key. - /// The new channel to cache. - public void Replace(string key, GrpcChannel newChannel) - { - Check.NotNullOrEmpty(key); - Check.NotNull(newChannel); - - if (this.disposed) - { - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - GrpcChannel? oldChannel = null; - - lock (this.syncLock) - { - if (this.disposed) - { - throw new ObjectDisposedException(nameof(GrpcChannelCache)); - } - - // Only replace if it's actually a different channel - if (this.channels.TryGetValue(key, out oldChannel) && - ReferenceEquals(oldChannel, newChannel)) - { - return; - } - - this.channels[key] = newChannel; - } - - // Dispose the old channel outside the lock to avoid potential deadlocks - DisposeChannelAsync(oldChannel); - } - - /// - /// Removes and disposes a channel for the specified key. - /// - /// The cache key. - /// True if a channel was removed; otherwise, false. - public bool TryRemove(string key) - { - Check.NotNullOrEmpty(key); - - if (this.channels.TryRemove(key, out GrpcChannel? channel)) - { - DisposeChannelAsync(channel); - return true; - } - - return false; - } - - /// - public void Dispose() - { - if (this.disposed) - { - return; - } - - lock (this.syncLock) - { - if (this.disposed) - { - return; - } - - this.disposed = true; - - foreach (KeyValuePair kvp in this.channels) - { - DisposeChannelAsync(kvp.Value); - } - - this.channels.Clear(); - } - } - - static void DisposeChannelAsync(GrpcChannel? channel) - { - if (channel == null) - { - return; - } - - // ShutdownAsync is the graceful way to close a gRPC channel - // We fire-and-forget but ensure the channel is eventually disposed - _ = Task.Run(async () => - { - using (channel) - { - try - { - await channel.ShutdownAsync(); - } - catch (Exception) - { - // Ignore shutdown errors during disposal - } - } - }); - } -} diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 7360dd124..21d5137f2 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +using System.Collections.Concurrent; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; @@ -94,10 +95,6 @@ static void ConfigureSchedulerOptions( options.EnableEntitySupport = true; }); - // Register the channel cache as a singleton to ensure channels are reused - // and properly disposed when the service provider is disposed. - builder.Services.TryAddSingleton(); - builder.Services.TryAddEnumerable( ServiceDescriptor.Singleton, ConfigureGrpcChannel>()); builder.UseGrpc(_ => { }); @@ -106,14 +103,23 @@ static void ConfigureSchedulerOptions( /// /// Configuration class that sets up gRPC channels for worker options /// using the provided Durable Task Scheduler options. + /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - /// Monitor for accessing the current scheduler options configuration. - /// Cache for gRPC channels to ensure reuse and proper disposal. - class ConfigureGrpcChannel( - IOptionsMonitor schedulerOptions, - GrpcChannelCache channelCache) : - IConfigureNamedOptions + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable { + readonly IOptionsMonitor schedulerOptions; + readonly ConcurrentDictionary> channels = new(); + volatile bool disposed; + + /// + /// Initializes a new instance of the class. + /// + /// Monitor for accessing the current scheduler options configuration. + public ConfigureGrpcChannel(IOptionsMonitor schedulerOptions) + { + this.schedulerOptions = schedulerOptions; + } + /// /// Configures the default named options instance. /// @@ -127,15 +133,67 @@ class ConfigureGrpcChannel( /// The options instance to configure. public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { +#if NET7_0_OR_GREATER + ObjectDisposedException.ThrowIf(this.disposed, this); +#else + if (this.disposed) + { + throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); + } +#endif + string optionsName = name ?? Options.DefaultName; - DurableTaskSchedulerWorkerOptions source = schedulerOptions.Get(optionsName); + DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName); // Create a cache key based on the options name, endpoint, and task hub. // This ensures channels are reused for the same configuration // but separate channels are created for different configurations. - string cacheKey = $"worker:{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; - options.Channel = channelCache.GetOrCreate(cacheKey, () => source.CreateChannel()); + string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + options.Channel = this.channels.GetOrAdd( + cacheKey, + _ => new Lazy(source.CreateChannel)).Value; options.ConfigureForAzureManaged(); } + + /// + public void Dispose() + { + if (this.disposed) + { + return; + } + + this.disposed = true; + + foreach (KeyValuePair> kvp in this.channels) + { + if (kvp.Value.IsValueCreated) + { + DisposeChannel(kvp.Value.Value); + } + } + + this.channels.Clear(); + } + + static void DisposeChannel(GrpcChannel channel) + { + // ShutdownAsync is the graceful way to close a gRPC channel. + // Fire-and-forget but ensure the channel is eventually disposed. + _ = Task.Run(async () => + { + using (channel) + { + try + { + await channel.ShutdownAsync(); + } + catch (Exception) + { + // Ignore shutdown errors during disposal + } + } + }); + } } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 77e64055e..2b7fbdd22 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -333,4 +333,39 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() options2.Channel.Should().NotBeNull(); options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); } + + [Fact] + public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + + // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels + provider.Dispose(); + + // Assert - after disposal, creating a new provider and getting options should work + // (this verifies the old provider was properly cleaned up) + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + } } diff --git a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs b/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs deleted file mode 100644 index eb483f00c..000000000 --- a/test/Shared/AzureManaged.Tests/GrpcChannelCacheTests.cs +++ /dev/null @@ -1,427 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using FluentAssertions; -using Grpc.Net.Client; -using Xunit; - -namespace Microsoft.DurableTask.Tests; - -public class GrpcChannelCacheTests -{ - const string TestEndpoint = "http://localhost:5000"; - - [Fact] - public void GetOrCreate_SameKey_ReturnsSameChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "test-key"; - int factoryCallCount = 0; - GrpcChannel Factory() - { - factoryCallCount++; - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - GrpcChannel channel1 = cache.GetOrCreate(key, Factory); - GrpcChannel channel2 = cache.GetOrCreate(key, Factory); - - // Assert - channel1.Should().BeSameAs(channel2); - factoryCallCount.Should().Be(1, "factory should only be called once for the same key"); - } - - [Fact] - public void GetOrCreate_DifferentKeys_ReturnsDifferentChannels() - { - // Arrange - using GrpcChannelCache cache = new(); - string key1 = "key1"; - string key2 = "key2"; - - // Act - GrpcChannel channel1 = cache.GetOrCreate(key1, () => GrpcChannel.ForAddress(TestEndpoint)); - GrpcChannel channel2 = cache.GetOrCreate(key2, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - channel1.Should().NotBeSameAs(channel2); - } - - [Fact] - public void GetOrCreate_ConcurrentAccess_CreatesSingleChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "concurrent-key"; - int factoryCallCount = 0; - object countLock = new(); - GrpcChannel Factory() - { - lock (countLock) - { - factoryCallCount++; - } - - // Add small delay to increase chance of race conditions - Thread.Sleep(10); - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - GrpcChannel[] channels = new GrpcChannel[10]; - Parallel.For(0, 10, i => - { - channels[i] = cache.GetOrCreate(key, Factory); - }); - - // Assert - factoryCallCount.Should().Be(1, "factory should only be called once even with concurrent access"); - channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue("all channels should be the same instance"); - } - - [Fact] - public void Replace_ExistingChannel_DisposesOldChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "replace-key"; - GrpcChannel oldChannel = GrpcChannel.ForAddress(TestEndpoint); - GrpcChannel newChannel = GrpcChannel.ForAddress(TestEndpoint); - cache.GetOrCreate(key, () => oldChannel); - - // Act - cache.Replace(key, newChannel); - GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); - - // Assert - retrievedChannel.Should().BeSameAs(newChannel); - retrievedChannel.Should().NotBeSameAs(oldChannel); - } - - [Fact] - public void Replace_SameChannel_DoesNothing() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "same-channel-key"; - GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); - cache.GetOrCreate(key, () => channel); - - // Act & Assert - should not throw or change anything - cache.Replace(key, channel); - GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); - retrievedChannel.Should().BeSameAs(channel); - } - - [Fact] - public void Replace_NonExistingKey_AddsChannel() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "new-key"; - GrpcChannel channel = GrpcChannel.ForAddress(TestEndpoint); - - // Act - cache.Replace(key, channel); - GrpcChannel retrievedChannel = cache.GetOrCreate(key, () => throw new InvalidOperationException("Should not be called")); - - // Assert - retrievedChannel.Should().BeSameAs(channel); - } - - [Fact] - public void TryRemove_ExistingKey_RemovesAndReturnsTrue() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "remove-key"; - cache.GetOrCreate(key, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Act - bool result = cache.TryRemove(key); - - // Assert - result.Should().BeTrue(); - - // Verify the key is removed by checking that a new channel is created - int factoryCallCount = 0; - cache.GetOrCreate(key, () => - { - factoryCallCount++; - return GrpcChannel.ForAddress(TestEndpoint); - }); - factoryCallCount.Should().Be(1, "a new channel should be created after removal"); - } - - [Fact] - public void TryRemove_NonExistingKey_ReturnsFalse() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "non-existing-key"; - - // Act - bool result = cache.TryRemove(key); - - // Assert - result.Should().BeFalse(); - } - - [Fact] - public void Dispose_DisposesAllChannels() - { - // Arrange - GrpcChannelCache cache = new(); - cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); - cache.GetOrCreate("key2", () => GrpcChannel.ForAddress(TestEndpoint)); - cache.GetOrCreate("key3", () => GrpcChannel.ForAddress(TestEndpoint)); - - // Act - cache.Dispose(); - - // Assert - attempting to use the cache after disposal should throw - Action action = () => cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); - action.Should().Throw(); - } - - [Fact] - public void Dispose_MultipleCalls_DoesNotThrow() - { - // Arrange - GrpcChannelCache cache = new(); - cache.GetOrCreate("key1", () => GrpcChannel.ForAddress(TestEndpoint)); - - // Act & Assert - multiple dispose calls should not throw - cache.Dispose(); - cache.Dispose(); - cache.Dispose(); - } - - [Fact] - public void GetOrCreate_AfterDispose_ThrowsObjectDisposedException() - { - // Arrange - GrpcChannelCache cache = new(); - cache.Dispose(); - - // Act - Action action = () => cache.GetOrCreate("key", () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw() - .WithMessage("*GrpcChannelCache*"); - } - - [Fact] - public void Replace_AfterDispose_ThrowsObjectDisposedException() - { - // Arrange - GrpcChannelCache cache = new(); - cache.Dispose(); - - // Act - Action action = () => cache.Replace("key", GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw() - .WithMessage("*GrpcChannelCache*"); - } - - [Fact] - public void GetOrCreate_WithNullKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.GetOrCreate(null!, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void GetOrCreate_WithEmptyKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.GetOrCreate(string.Empty, () => GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void GetOrCreate_WithNullFactory_ThrowsArgumentNullException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.GetOrCreate("key", null!); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void Replace_WithNullKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.Replace(null!, GrpcChannel.ForAddress(TestEndpoint)); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void Replace_WithNullChannel_ThrowsArgumentNullException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.Replace("key", null!); - - // Assert - action.Should().Throw(); - } - - [Fact] - public void TryRemove_WithNullKey_ThrowsArgumentException() - { - // Arrange - using GrpcChannelCache cache = new(); - - // Act - Action action = () => cache.TryRemove(null!); - - // Assert - action.Should().Throw(); - } - - /// - /// This test verifies the core fix for the handle leak issue. - /// Without the cache, each call to configure options would create a new GrpcChannel, - /// causing handle count to grow unbounded when the service is unreachable. - /// With the cache, repeated calls reuse the same channel, preventing handle leaks. - /// - [Fact] - public void GetOrCreate_SimulatesRetryScenario_DoesNotCreateMultipleChannels() - { - // Arrange - using GrpcChannelCache cache = new(); - string key = "client:default:myendpoint.durabletask.io:myhub"; - int factoryCallCount = 0; - - GrpcChannel CreateChannel() - { - factoryCallCount++; - // Each GrpcChannel creates HttpClient + SocketsHttpHandler internally, - // which allocates socket handles. Without caching, this would leak handles. - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - Simulate what happens during retries when service is unreachable: - // The options configuration callback may be invoked multiple times - const int retryAttempts = 100; - GrpcChannel[] channels = new GrpcChannel[retryAttempts]; - for (int i = 0; i < retryAttempts; i++) - { - channels[i] = cache.GetOrCreate(key, CreateChannel); - } - - // Assert - The factory should only be called ONCE, not 100 times - // This is the key behavior that prevents handle accumulation - factoryCallCount.Should().Be(1, - "the channel factory should only be called once regardless of how many times GetOrCreate is called - " + - "this is what prevents handle leaks when the service is unreachable"); - - // All returned channels should be the exact same instance - channels.All(c => ReferenceEquals(c, channels[0])).Should().BeTrue( - "all calls should return the same cached channel instance"); - } - - /// - /// Verifies that the old behavior (without cache) would create multiple channels. - /// This demonstrates what the cache prevents. - /// - [Fact] - public void WithoutCache_MultipleCallsCreateMultipleChannels() - { - // Arrange - simulate old behavior without cache - int factoryCallCount = 0; - List channels = new(); - - GrpcChannel CreateChannelWithoutCache() - { - factoryCallCount++; - return GrpcChannel.ForAddress(TestEndpoint); - } - - // Act - Without caching, each "retry" creates a new channel - const int retryAttempts = 10; - for (int i = 0; i < retryAttempts; i++) - { - // This simulates the OLD behavior before the fix - channels.Add(CreateChannelWithoutCache()); - } - - // Assert - Each call creates a new channel (the problematic behavior we fixed) - factoryCallCount.Should().Be(retryAttempts, - "without caching, each call creates a new channel - this causes handle leaks"); - - // All channels are different instances - channels.Distinct().Count().Should().Be(retryAttempts, - "without caching, each channel is a unique instance with its own handles"); - - // Cleanup - foreach (var channel in channels) - { - channel.Dispose(); - } - } - - /// - /// Verifies channels are properly disposed when the cache is disposed, - /// which releases the associated handles. - /// - [Fact] - public async Task Dispose_ReleasesChannelResources() - { - // Arrange - GrpcChannelCache cache = new(); - List createdChannels = new(); - - // Create multiple channels through the cache - for (int i = 0; i < 5; i++) - { - string key = $"key{i}"; - cache.GetOrCreate(key, () => - { - GrpcChannel c = GrpcChannel.ForAddress(TestEndpoint); - createdChannels.Add(c); - return c; - }); - } - - createdChannels.Count.Should().Be(5); - - // Act - Dispose the cache (this should dispose all channels) - cache.Dispose(); - - // Wait a bit for async disposal to complete - await Task.Delay(100); - - // Assert - The cache should be disposed and unusable - Action action = () => cache.GetOrCreate("new-key", () => GrpcChannel.ForAddress(TestEndpoint)); - action.Should().Throw(); - } -} diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index 1510c7e56..370924a4c 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -251,5 +251,40 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() options2.Channel.Should().NotBeNull(); options1.Channel.Should().NotBeSameAs(options2.Channel, "different configurations should use different channels"); } + + [Fact] + public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + + // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels + provider.Dispose(); + + // Assert - after disposal, creating a new provider and getting options should work + // (this verifies the old provider was properly cleaned up) + ServiceCollection services2 = new ServiceCollection(); + Mock mockBuilder2 = new Mock(); + mockBuilder2.Setup(b => b.Services).Returns(services2); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider2 = services2.BuildServiceProvider(); + + IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); + GrpcDurableTaskWorkerOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); + newOptions.Channel.Should().NotBeNull(); + newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + } } From 19f521f1371672458deab4ff0a1a9fc5b130131c Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Mon, 26 Jan 2026 18:10:06 -0800 Subject: [PATCH 04/13] Address remaining CodeQL comments - Use LINQ Where() instead of if inside foreach for filtering channels - Narrow catch (Exception) to specific types (OperationCanceledException, ObjectDisposedException) --- .../DurableTaskSchedulerClientExtensions.cs | 12 +++++------- .../DurableTaskSchedulerWorkerExtensions.cs | 12 +++++------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index f57458c00..c92fc15fa 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Collections.Concurrent; +using System.Linq; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; @@ -162,12 +163,9 @@ public void Dispose() this.disposed = true; - foreach (KeyValuePair> kvp in this.channels) + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - if (kvp.Value.IsValueCreated) - { - DisposeChannel(kvp.Value.Value); - } + DisposeChannel(channel.Value); } this.channels.Clear(); @@ -185,9 +183,9 @@ static void DisposeChannel(GrpcChannel channel) { await channel.ShutdownAsync(); } - catch (Exception) + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) { - // Ignore shutdown errors during disposal + // Ignore expected shutdown/disposal errors } } }); diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 21d5137f2..42d5f4f70 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Collections.Concurrent; +using System.Linq; using Azure.Core; using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; @@ -165,12 +166,9 @@ public void Dispose() this.disposed = true; - foreach (KeyValuePair> kvp in this.channels) + foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - if (kvp.Value.IsValueCreated) - { - DisposeChannel(kvp.Value.Value); - } + DisposeChannel(channel.Value); } this.channels.Clear(); @@ -188,9 +186,9 @@ static void DisposeChannel(GrpcChannel channel) { await channel.ShutdownAsync(); } - catch (Exception) + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) { - // Ignore shutdown errors during disposal + // Ignore expected shutdown/disposal errors } } }); From 9b6703204dc25f5c88d9414a02c22a0ea664de27 Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Tue, 27 Jan 2026 14:45:13 -0800 Subject: [PATCH 05/13] Refactor gRPC channel disposal to use IAsyncDisposable and improve exception handling --- .../DurableTaskSchedulerClientExtensions.cs | 53 +++++++++++-------- .../DurableTaskSchedulerWorkerExtensions.cs | 53 +++++++++++-------- ...rableTaskSchedulerClientExtensionsTests.cs | 41 +++++++++++--- ...rableTaskSchedulerWorkerExtensionsTests.cs | 41 +++++++++++--- 4 files changed, 132 insertions(+), 56 deletions(-) diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index c92fc15fa..d2f81fcd5 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -104,11 +104,11 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IAsyncDisposable { readonly IOptionsMonitor schedulerOptions; readonly ConcurrentDictionary> channels = new(); - volatile bool disposed; + int disposed; /// /// Initializes a new instance of the class. @@ -133,9 +133,9 @@ public ConfigureGrpcChannel(IOptionsMonitor s public void Configure(string? name, GrpcDurableTaskClientOptions options) { #if NET7_0_OR_GREATER - ObjectDisposedException.ThrowIf(this.disposed, this); + ObjectDisposedException.ThrowIf(this.disposed == 1, this); #else - if (this.disposed) + if (this.disposed == 1) { throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); } @@ -154,41 +154,50 @@ public void Configure(string? name, GrpcDurableTaskClientOptions options) } /// - public void Dispose() + public async ValueTask DisposeAsync() { - if (this.disposed) + if (Interlocked.Exchange(ref this.disposed, 1) == 1) { return; } - this.disposed = true; - + List? exceptions = null; foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - DisposeChannel(channel.Value); + try + { + await DisposeChannelAsync(channel.Value).ConfigureAwait(false); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } } this.channels.Clear(); + GC.SuppressFinalize(this); + + if (exceptions is { Count: > 0 }) + { + throw new AggregateException(exceptions); + } } - static void DisposeChannel(GrpcChannel channel) + static async Task DisposeChannelAsync(GrpcChannel channel) { // ShutdownAsync is the graceful way to close a gRPC channel. - // Fire-and-forget but ensure the channel is eventually disposed. - _ = Task.Run(async () => + using (channel) { - using (channel) + try { - try - { - await channel.ShutdownAsync(); - } - catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) - { - // Ignore expected shutdown/disposal errors - } + await channel.ShutdownAsync().ConfigureAwait(false); } - }); + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } } } } diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 42d5f4f70..d2534f9be 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -106,11 +106,11 @@ static void ConfigureSchedulerOptions( /// using the provided Durable Task Scheduler options. /// Channels are cached per configuration key and disposed when the service provider is disposed. /// - sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IDisposable + sealed class ConfigureGrpcChannel : IConfigureNamedOptions, IAsyncDisposable { readonly IOptionsMonitor schedulerOptions; readonly ConcurrentDictionary> channels = new(); - volatile bool disposed; + int disposed; /// /// Initializes a new instance of the class. @@ -135,9 +135,9 @@ public ConfigureGrpcChannel(IOptionsMonitor s public void Configure(string? name, GrpcDurableTaskWorkerOptions options) { #if NET7_0_OR_GREATER - ObjectDisposedException.ThrowIf(this.disposed, this); + ObjectDisposedException.ThrowIf(this.disposed == 1, this); #else - if (this.disposed) + if (this.disposed == 1) { throw new ObjectDisposedException(nameof(ConfigureGrpcChannel)); } @@ -157,41 +157,50 @@ public void Configure(string? name, GrpcDurableTaskWorkerOptions options) } /// - public void Dispose() + public async ValueTask DisposeAsync() { - if (this.disposed) + if (Interlocked.Exchange(ref this.disposed, 1) == 1) { return; } - this.disposed = true; - + List? exceptions = null; foreach (Lazy channel in this.channels.Values.Where(lazy => lazy.IsValueCreated)) { - DisposeChannel(channel.Value); + try + { + await DisposeChannelAsync(channel.Value).ConfigureAwait(false); + } + catch (Exception ex) + { + exceptions ??= new List(); + exceptions.Add(ex); + } } this.channels.Clear(); + GC.SuppressFinalize(this); + + if (exceptions is { Count: > 0 }) + { + throw new AggregateException(exceptions); + } } - static void DisposeChannel(GrpcChannel channel) + static async Task DisposeChannelAsync(GrpcChannel channel) { // ShutdownAsync is the graceful way to close a gRPC channel. - // Fire-and-forget but ensure the channel is eventually disposed. - _ = Task.Run(async () => + using (channel) { - using (channel) + try { - try - { - await channel.ShutdownAsync(); - } - catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) - { - // Ignore expected shutdown/disposal errors - } + await channel.ShutdownAsync().ConfigureAwait(false); } - }); + catch (Exception ex) when (ex is OperationCanceledException or ObjectDisposedException) + { + // Ignore expected shutdown/disposal errors + } + } } } } diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 2b7fbdd22..e64485637 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -5,6 +5,7 @@ using Azure.Identity; using FluentAssertions; using Grpc.Core; +using Grpc.Net.Client; using Microsoft.DurableTask.Client.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -335,7 +336,7 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() } [Fact] - public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -351,21 +352,49 @@ public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); GrpcDurableTaskClientOptions options = optionsMonitor.Get(Options.DefaultName); options.Channel.Should().NotBeNull(); + GrpcChannel channel = options.Channel!; // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels - provider.Dispose(); + await provider.DisposeAsync(); - // Assert - after disposal, creating a new provider and getting options should work - // (this verifies the old provider was properly cleaned up) + // Assert - verify the channel was disposed by checking it throws ObjectDisposedException + Action action = () => channel.CreateCallInvoker(); + action.Should().Throw("channel should be disposed after provider disposal"); + + // Also verify that creating a new provider and getting options still works ServiceCollection services2 = new ServiceCollection(); Mock mockBuilder2 = new Mock(); mockBuilder2.Setup(b => b.Services).Returns(services2); mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider2 = services2.BuildServiceProvider(); + await using ServiceProvider provider2 = services2.BuildServiceProvider(); IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); GrpcDurableTaskClientOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); newOptions.Channel.Should().NotBeNull(); - newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options monitor before disposal + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + + // Dispose the service provider + await provider.DisposeAsync(); + + // Assert - attempting to get options after disposal should throw + Action action = () => optionsMonitor.Get(Options.DefaultName); + action.Should().Throw("configuring options after disposal should throw"); } } diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index 370924a4c..591ffaebf 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -4,6 +4,7 @@ using Azure.Core; using Azure.Identity; using FluentAssertions; +using Grpc.Net.Client; using Microsoft.DurableTask.Worker.Grpc; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; @@ -253,7 +254,7 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() } [Fact] - public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() + public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -269,22 +270,50 @@ public void UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannels() IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); options.Channel.Should().NotBeNull(); + GrpcChannel channel = options.Channel!; // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels - provider.Dispose(); + await provider.DisposeAsync(); - // Assert - after disposal, creating a new provider and getting options should work - // (this verifies the old provider was properly cleaned up) + // Assert - verify the channel was disposed by checking it throws ObjectDisposedException + Action action = () => channel.CreateCallInvoker(); + action.Should().Throw("channel should be disposed after provider disposal"); + + // Also verify that creating a new provider and getting options still works ServiceCollection services2 = new ServiceCollection(); Mock mockBuilder2 = new Mock(); mockBuilder2.Setup(b => b.Services).Returns(services2); mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider2 = services2.BuildServiceProvider(); + await using ServiceProvider provider2 = services2.BuildServiceProvider(); IOptionsMonitor newOptionsMonitor = provider2.GetRequiredService>(); GrpcDurableTaskWorkerOptions newOptions = newOptionsMonitor.Get(Options.DefaultName); newOptions.Channel.Should().NotBeNull(); - newOptions.Channel.Should().NotBeSameAs(options.Channel, "new provider should create a new channel"); + newOptions.Channel.Should().NotBeSameAs(channel, "new provider should create a new channel"); + } + + [Fact] + public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisposedException() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder = new Mock(); + mockBuilder.Setup(b => b.Services).Returns(services); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act + mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); + ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options monitor before disposal + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + + // Dispose the service provider + await provider.DisposeAsync(); + + // Assert - attempting to get options after disposal should throw + Action action = () => optionsMonitor.Get(Options.DefaultName); + action.Should().Throw("configuring options after disposal should throw"); } } From f3079028cf31425edb1357fd88d2cfb30889e6d7 Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Wed, 28 Jan 2026 07:36:43 -0800 Subject: [PATCH 06/13] Update test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../DurableTaskSchedulerClientExtensionsTests.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index e64485637..09f79427f 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -293,12 +293,12 @@ public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() // Act mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider = services.BuildServiceProvider(); + using ServiceProvider provider = services.BuildServiceProvider(); // Resolve options multiple times to trigger channel configuration - IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); - GrpcDurableTaskClientOptions options1 = optionsMonitor.Get(Options.DefaultName); - GrpcDurableTaskClientOptions options2 = optionsMonitor.Get(Options.DefaultName); + IOptionsFactory optionsFactory = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsFactory.Create(Options.DefaultName); + GrpcDurableTaskClientOptions options2 = optionsFactory.Create(Options.DefaultName); // Assert options1.Channel.Should().NotBeNull(); From 95bc2d027e768df059e3fe4821eff0a5c561ef4b Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Wed, 28 Jan 2026 07:37:03 -0800 Subject: [PATCH 07/13] Update src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../AzureManaged/DurableTaskSchedulerWorkerExtensions.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index d2534f9be..d409ca274 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -149,7 +149,8 @@ public void Configure(string? name, GrpcDurableTaskWorkerOptions options) // Create a cache key based on the options name, endpoint, and task hub. // This ensures channels are reused for the same configuration // but separate channels are created for different configurations. - string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + // Use a delimiter character (\u001F) that will not appear in typical endpoint URIs. + string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}"; options.Channel = this.channels.GetOrAdd( cacheKey, _ => new Lazy(source.CreateChannel)).Value; From a585f2c06ead1f919fdcde262f8fe46a165a2b4d Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Wed, 28 Jan 2026 07:37:40 -0800 Subject: [PATCH 08/13] Update test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../DurableTaskSchedulerWorkerExtensionsTests.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index 591ffaebf..e503cc594 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -237,10 +237,10 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() mockBuilder2.Setup(b => b.Name).Returns("worker2"); DefaultAzureCredential credential = new DefaultAzureCredential(); - // Act - configure two different named workers with different endpoints - mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); - mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); - ServiceProvider provider = services.BuildServiceProvider(); + // Act - configure two different named workers with the same endpoint and task hub + mockBuilder1.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential); + mockBuilder2.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential); + using ServiceProvider provider = services.BuildServiceProvider(); // Resolve options for both named workers IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); From b6630f2f96cdade61121b440fae103e82434ebc4 Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Wed, 28 Jan 2026 07:37:56 -0800 Subject: [PATCH 09/13] Update test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../DurableTaskSchedulerWorkerExtensionsTests.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index e503cc594..9695391ea 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -211,12 +211,12 @@ public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() // Act mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider = services.BuildServiceProvider(); + using ServiceProvider provider = services.BuildServiceProvider(); - // Resolve options multiple times to trigger channel configuration - IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); - GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get(Options.DefaultName); - GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get(Options.DefaultName); + // Resolve options multiple times to trigger channel configuration via new options instances + IOptionsFactory optionsFactory = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsFactory.Create(Options.DefaultName); + GrpcDurableTaskWorkerOptions options2 = optionsFactory.Create(Options.DefaultName); // Assert options1.Channel.Should().NotBeNull(); From 5ec4cd9dae8b291942dc2b25607931c6aa149c5e Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 08:01:49 -0800 Subject: [PATCH 10/13] Fix gRPC channel cache key to include all channel-affecting properties (#626) --- .../DurableTaskSchedulerClientExtensions.cs | 11 +- .../DurableTaskSchedulerWorkerExtensions.cs | 7 +- ...rableTaskSchedulerClientExtensionsTests.cs | 147 +++++++++++++++++- ...rableTaskSchedulerWorkerExtensionsTests.cs | 141 ++++++++++++++++- 4 files changed, 292 insertions(+), 14 deletions(-) diff --git a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs index d2f81fcd5..5cd4fc94d 100644 --- a/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs +++ b/src/Client/AzureManaged/DurableTaskSchedulerClientExtensions.cs @@ -144,10 +144,15 @@ public void Configure(string? name, GrpcDurableTaskClientOptions options) string optionsName = name ?? Options.DefaultName; DurableTaskSchedulerClientOptions source = this.schedulerOptions.Get(optionsName); - // Create a cache key based on the options name, endpoint, and task hub. + // Create a cache key that includes all properties that affect CreateChannel behavior. // This ensures channels are reused for the same configuration - // but separate channels are created for different configurations. - string cacheKey = $"{optionsName}:{source.EndpointAddress}:{source.TaskHubName}"; + // but separate channels are created when any relevant property changes. + // Use a delimiter character (\u001F) that will not appear in typical endpoint URIs. + string credentialType = source.Credential?.GetType().FullName ?? "null"; + string retryOptionsKey = source.RetryOptions != null + ? $"{source.RetryOptions.MaxRetries}|{source.RetryOptions.InitialBackoffMs}|{source.RetryOptions.MaxBackoffMs}|{source.RetryOptions.BackoffMultiplier}|{(source.RetryOptions.RetryableStatusCodes != null ? string.Join(",", source.RetryOptions.RetryableStatusCodes) : string.Empty)}" + : "null"; + string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{retryOptionsKey}"; options.Channel = this.channels.GetOrAdd( cacheKey, _ => new Lazy(source.CreateChannel)).Value; diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index d409ca274..946bc4ed7 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -146,11 +146,12 @@ public void Configure(string? name, GrpcDurableTaskWorkerOptions options) string optionsName = name ?? Options.DefaultName; DurableTaskSchedulerWorkerOptions source = this.schedulerOptions.Get(optionsName); - // Create a cache key based on the options name, endpoint, and task hub. + // Create a cache key that includes all properties that affect CreateChannel behavior. // This ensures channels are reused for the same configuration - // but separate channels are created for different configurations. + // but separate channels are created when any relevant property changes. // Use a delimiter character (\u001F) that will not appear in typical endpoint URIs. - string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}"; + string credentialType = source.Credential?.GetType().FullName ?? "null"; + string cacheKey = $"{optionsName}\u001F{source.EndpointAddress}\u001F{source.TaskHubName}\u001F{source.ResourceId}\u001F{credentialType}\u001F{source.AllowInsecureCredentials}\u001F{source.WorkerId}"; options.Channel = this.channels.GetOrAdd( cacheKey, _ => new Lazy(source.CreateChannel)).Value; diff --git a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs index 09f79427f..1fe5b9f6f 100644 --- a/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs +++ b/test/Client/AzureManaged.Tests/DurableTaskSchedulerClientExtensionsTests.cs @@ -283,7 +283,7 @@ public void UseDurableTaskScheduler_WithConnectionStringAndRetryOptions_ShouldCo } [Fact] - public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + public async Task UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -293,7 +293,7 @@ public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() // Act mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - using ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); // Resolve options multiple times to trigger channel configuration IOptionsFactory optionsFactory = provider.GetRequiredService>(); @@ -307,7 +307,7 @@ public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() } [Fact] - public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + public async Task UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -322,7 +322,7 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() // Act - configure two different named clients with different endpoints mockBuilder1.Object.UseDurableTaskScheduler("endpoint1.westus3.durabletask.io", ValidTaskHub, credential); mockBuilder2.Object.UseDurableTaskScheduler("endpoint2.westus3.durabletask.io", ValidTaskHub, credential); - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); // Resolve options for both named clients IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); @@ -397,4 +397,143 @@ public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisp Action action = () => optionsMonitor.Get(Options.DefaultName); action.Should().Throw("configuring options after disposal should throw"); } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentResourceId_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two clients with the same endpoint/taskhub but different ResourceId + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://durabletask.io"; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://custom.durabletask.io"; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different ResourceId should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentCredentialType_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + + // Act - configure two clients with the same endpoint/taskhub but different credential types + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new DefaultAzureCredential()); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new AzureCliCredential()); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different credential type should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentAllowInsecureCredentials_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two clients with the same endpoint/taskhub but different AllowInsecureCredentials + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = false; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = true; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different AllowInsecureCredentials should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentRetryOptions_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("client1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("client2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two clients with the same endpoint/taskhub but different RetryOptions + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.RetryOptions = new DurableTaskSchedulerClientOptions.ClientRetryOptions + { + MaxRetries = 3 + }; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.RetryOptions = new DurableTaskSchedulerClientOptions.ClientRetryOptions + { + MaxRetries = 5 + }; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named clients + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskClientOptions options1 = optionsMonitor.Get("client1"); + GrpcDurableTaskClientOptions options2 = optionsMonitor.Get("client2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different RetryOptions should use different channels"); + } } diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index 9695391ea..1db552db2 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -201,7 +201,7 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() } [Fact] - public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() + public async Task UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -211,7 +211,7 @@ public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() // Act mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - using ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); // Resolve options multiple times to trigger channel configuration via new options instances IOptionsFactory optionsFactory = provider.GetRequiredService>(); @@ -225,7 +225,7 @@ public void UseDurableTaskScheduler_SameConfiguration_ReusesSameChannel() } [Fact] - public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() + public async Task UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -240,7 +240,7 @@ public void UseDurableTaskScheduler_DifferentNamedOptions_UsesSeparateChannels() // Act - configure two different named workers with the same endpoint and task hub mockBuilder1.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential); mockBuilder2.Object.UseDurableTaskScheduler("endpoint.westus3.durabletask.io", ValidTaskHub, credential); - using ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); // Resolve options for both named workers IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); @@ -315,5 +315,138 @@ public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisp Action action = () => optionsMonitor.Get(Options.DefaultName); action.Should().Throw("configuring options after disposal should throw"); } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentResourceId_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two workers with the same endpoint/taskhub but different ResourceId + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://durabletask.io"; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.ResourceId = "https://custom.durabletask.io"; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different ResourceId should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentCredentialType_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + + // Act - configure two workers with the same endpoint/taskhub but different credential types + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new DefaultAzureCredential()); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, new AzureCliCredential()); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different credential type should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentAllowInsecureCredentials_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two workers with the same endpoint/taskhub but different AllowInsecureCredentials + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = false; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.AllowInsecureCredentials = true; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different AllowInsecureCredentials should use different channels"); + } + + [Fact] + public async Task UseDurableTaskScheduler_DifferentWorkerId_UsesSeparateChannels() + { + // Arrange + ServiceCollection services = new ServiceCollection(); + Mock mockBuilder1 = new Mock(); + Mock mockBuilder2 = new Mock(); + mockBuilder1.Setup(b => b.Services).Returns(services); + mockBuilder1.Setup(b => b.Name).Returns("worker1"); + mockBuilder2.Setup(b => b.Services).Returns(services); + mockBuilder2.Setup(b => b.Name).Returns("worker2"); + DefaultAzureCredential credential = new DefaultAzureCredential(); + + // Act - configure two workers with the same endpoint/taskhub but different WorkerId + mockBuilder1.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.WorkerId = "worker-id-1"; + }); + mockBuilder2.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential, options => + { + options.WorkerId = "worker-id-2"; + }); + await using ServiceProvider provider = services.BuildServiceProvider(); + + // Resolve options for both named workers + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options1 = optionsMonitor.Get("worker1"); + GrpcDurableTaskWorkerOptions options2 = optionsMonitor.Get("worker2"); + + // Assert + options1.Channel.Should().NotBeNull(); + options2.Channel.Should().NotBeNull(); + options1.Channel.Should().NotBeSameAs(options2.Channel, "different WorkerId should use different channels"); + } } From 609df5d671cc6b43cc8bd75ff58913a58cf2428a Mon Sep 17 00:00:00 2001 From: Tomer Rosenthal Date: Wed, 28 Jan 2026 08:17:23 -0800 Subject: [PATCH 11/13] Potential fix for pull request finding 'Generic catch clause' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs index 946bc4ed7..985d010ed 100644 --- a/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs +++ b/src/Worker/AzureManaged/DurableTaskSchedulerWorkerExtensions.cs @@ -173,7 +173,7 @@ public async ValueTask DisposeAsync() { await DisposeChannelAsync(channel.Value).ConfigureAwait(false); } - catch (Exception ex) + catch (Exception ex) when (ex is not OperationCanceledException and not ObjectDisposedException) { exceptions ??= new List(); exceptions.Add(ex); From d5d1151f1bf8ce05b0df6ba6030b552867129da0 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 08:31:18 -0800 Subject: [PATCH 12/13] Fix resource leaks in DurableTaskSchedulerWorkerExtensionsTests (#627) --- ...rableTaskSchedulerWorkerExtensionsTests.cs | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs index 1db552db2..6e08347ee 100644 --- a/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs +++ b/test/Worker/AzureManaged.Tests/DurableTaskSchedulerWorkerExtensionsTests.cs @@ -19,7 +19,7 @@ public class DurableTaskSchedulerWorkerExtensionsTests const string ValidTaskHub = "testhub"; [Fact] - public void UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -31,7 +31,7 @@ public void UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCor mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptions? options = provider.GetService>(); options.Should().NotBeNull(); @@ -45,7 +45,7 @@ public void UseDurableTaskScheduler_WithEndpointAndCredential_ShouldConfigureCor } [Fact] - public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -57,7 +57,7 @@ public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectl mockBuilder.Object.UseDurableTaskScheduler(connectionString); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptions? options = provider.GetService>(); options.Should().NotBeNull(); @@ -71,7 +71,7 @@ public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectl } [Fact] - public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new(); @@ -83,7 +83,7 @@ public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigur mockBuilder.Object.UseDurableTaskScheduler(connectionString); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptions? options = provider.GetService>(); options.Should().NotBeNull(); @@ -99,7 +99,7 @@ public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigur [Theory] [InlineData(null, "testhub")] [InlineData("myaccount.westus3.durabletask.io", null)] - public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidationException(string? endpoint, string? taskHub) + public async Task UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidationException(string? endpoint, string? taskHub) { // Arrange ServiceCollection services = new ServiceCollection(); @@ -109,7 +109,7 @@ public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidat // Act mockBuilder.Object.UseDurableTaskScheduler(endpoint!, taskHub!, credential); - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); // Assert var action = () => provider.GetRequiredService>().Value; @@ -120,7 +120,7 @@ public void UseDurableTaskScheduler_WithNullParameters_ShouldThrowOptionsValidat } [Fact] - public void UseDurableTaskScheduler_WithNullCredential_ShouldSucceed() + public async Task UseDurableTaskScheduler_WithNullCredential_ShouldSucceed() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -133,7 +133,7 @@ public void UseDurableTaskScheduler_WithNullCredential_ShouldSucceed() action.Should().NotThrow(); // Validate the configured options - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); var workerOptions = provider.GetRequiredService>().Value; workerOptions.EndpointAddress.Should().Be(ValidEndpoint); workerOptions.TaskHubName.Should().Be(ValidTaskHub); @@ -175,7 +175,7 @@ public void UseDurableTaskScheduler_WithNullOrEmptyConnectionString_ShouldThrowA } [Fact] - public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() + public async Task UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() { // Arrange ServiceCollection services = new ServiceCollection(); @@ -188,7 +188,7 @@ public void UseDurableTaskScheduler_WithNamedOptions_ShouldConfigureCorrectly() mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); // Assert - ServiceProvider provider = services.BuildServiceProvider(); + await using ServiceProvider provider = services.BuildServiceProvider(); IOptionsMonitor? optionsMonitor = provider.GetService>(); optionsMonitor.Should().NotBeNull(); DurableTaskSchedulerWorkerOptions options = optionsMonitor!.Get("CustomName"); @@ -264,16 +264,16 @@ public async Task UseDurableTaskScheduler_ServiceProviderDispose_DisposesChannel // Act mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider = services.BuildServiceProvider(); - - // Resolve options to trigger channel creation - IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); - GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); - options.Channel.Should().NotBeNull(); - GrpcChannel channel = options.Channel!; - - // Dispose the service provider - this should dispose the ConfigureGrpcChannel which disposes channels - await provider.DisposeAsync(); + + GrpcChannel channel; + await using (ServiceProvider provider = services.BuildServiceProvider()) + { + // Resolve options to trigger channel creation + IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); + GrpcDurableTaskWorkerOptions options = optionsMonitor.Get(Options.DefaultName); + options.Channel.Should().NotBeNull(); + channel = options.Channel!; + } // Assert - verify the channel was disposed by checking it throws ObjectDisposedException Action action = () => channel.CreateCallInvoker(); @@ -303,13 +303,13 @@ public async Task UseDurableTaskScheduler_ConfigureAfterDispose_ThrowsObjectDisp // Act mockBuilder.Object.UseDurableTaskScheduler(ValidEndpoint, ValidTaskHub, credential); - ServiceProvider provider = services.BuildServiceProvider(); - - // Resolve options monitor before disposal - IOptionsMonitor optionsMonitor = provider.GetRequiredService>(); - - // Dispose the service provider - await provider.DisposeAsync(); + + IOptionsMonitor optionsMonitor; + await using (ServiceProvider provider = services.BuildServiceProvider()) + { + // Resolve options monitor before disposal + optionsMonitor = provider.GetRequiredService>(); + } // Assert - attempting to get options after disposal should throw Action action = () => optionsMonitor.Get(Options.DefaultName); From 9a005e59a848c61e2e7940f4fff5d070af32232a Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 10:06:53 -0800 Subject: [PATCH 13/13] Initial plan (#628)