Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## vNext

- Added new preview packages, `Microsoft.DurableTask.Client.AzureManaged` and `Microsoft.DurableTask.Worker.AzureManaged`

### Microsoft.DurableTask.Client

- Add new `IDurableTaskClientBuilder AddDurableTaskClient(IServiceCollection, string?)` API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static void UseDurableTaskScheduler(
options.EndpointAddress = connectionOptions.EndpointAddress;
options.TaskHubName = connectionOptions.TaskHubName;
options.Credential = connectionOptions.Credential;
options.AllowInsecureCredentials = connectionOptions.AllowInsecureCredentials;
},
configure);
}
Expand Down
9 changes: 7 additions & 2 deletions src/Client/AzureManaged/DurableTaskSchedulerClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,17 @@ public static DurableTaskSchedulerClientOptions FromConnectionString(string conn
/// <param name="connectionString">The connection string to parse.</param>
/// <returns>A new instance of <see cref="DurableTaskSchedulerClientOptions"/>.</returns>
internal static DurableTaskSchedulerClientOptions FromConnectionString(
DurableTaskSchedulerConnectionString connectionString) => new()
DurableTaskSchedulerConnectionString connectionString)
{
TokenCredential? credential = GetCredentialFromConnectionString(connectionString);
return new DurableTaskSchedulerClientOptions()
{
EndpointAddress = connectionString.Endpoint,
TaskHubName = connectionString.TaskHubName,
Credential = GetCredentialFromConnectionString(connectionString),
Credential = credential,
AllowInsecureCredentials = credential is null,
};
}

/// <summary>
/// Creates a gRPC channel for communicating with the Durable Task Scheduler service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static void UseDurableTaskScheduler(
options.EndpointAddress = connectionOptions.EndpointAddress;
options.TaskHubName = connectionOptions.TaskHubName;
options.Credential = connectionOptions.Credential;
options.AllowInsecureCredentials = connectionOptions.AllowInsecureCredentials;
},
configure);
}
Expand Down
9 changes: 7 additions & 2 deletions src/Worker/AzureManaged/DurableTaskSchedulerWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,17 @@ public static DurableTaskSchedulerWorkerOptions FromConnectionString(string conn
/// <param name="connectionString">The connection string to parse.</param>
/// <returns>A new instance of <see cref="DurableTaskSchedulerWorkerOptions"/>.</returns>
internal static DurableTaskSchedulerWorkerOptions FromConnectionString(
DurableTaskSchedulerConnectionString connectionString) => new()
DurableTaskSchedulerConnectionString connectionString)
{
TokenCredential? credential = GetCredentialFromConnectionString(connectionString);
return new DurableTaskSchedulerWorkerOptions()
{
EndpointAddress = connectionString.Endpoint,
TaskHubName = connectionString.TaskHubName,
Credential = GetCredentialFromConnectionString(connectionString),
Credential = credential,
AllowInsecureCredentials = credential is null,
};
}

/// <summary>
/// Creates a gRPC channel for communicating with the Durable Task Scheduler service.
Expand Down
9 changes: 6 additions & 3 deletions src/Worker/Grpc/GrpcDurableTaskWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public GrpcDurableTaskWorker(
/// <inheritdoc />
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker);
this.logger.StartingTaskHubWorker();
await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address);
this.logger.StartingTaskHubWorker(address);
await new Processor(this, new(callInvoker)).ExecuteAsync(stoppingToken);
}

Expand Down Expand Up @@ -75,22 +75,25 @@ static GrpcChannel GetChannel(string? address)
}
#endif

AsyncDisposable GetCallInvoker(out CallInvoker callInvoker)
AsyncDisposable GetCallInvoker(out CallInvoker callInvoker, out string address)
{
if (this.grpcOptions.Channel is GrpcChannel c)
{
callInvoker = c.CreateCallInvoker();
address = c.Target;
return default;
}

if (this.grpcOptions.CallInvoker is CallInvoker invoker)
{
callInvoker = invoker;
address = "(unspecified)";
return default;
}

c = GetChannel(this.grpcOptions.Address);
callInvoker = c.CreateCallInvoker();
address = c.Target;
return new AsyncDisposable(() => new(c.ShutdownAsync()));
}
}
4 changes: 2 additions & 2 deletions src/Worker/Grpc/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Microsoft.DurableTask.Worker.Grpc
/// </remarks>
static partial class Logs
{
[LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "Durable Task gRPC worker starting.")]
public static partial void StartingTaskHubWorker(this ILogger logger);
[LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "Durable Task gRPC worker starting and connecting to {endpoint}.")]
public static partial void StartingTaskHubWorker(this ILogger logger, string endpoint);

[LoggerMessage(EventId = 2, Level = LogLevel.Information, Message = "Durable Task gRPC worker has disconnected from gRPC server.")]
public static partial void SidecarDisconnected(this ILogger logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,32 @@ public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectl
clientOptions.Credential.Should().BeOfType<DefaultAzureCredential>();
}

[Fact]
public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigureCorrectly()
{
// Arrange
ServiceCollection services = new();
Mock<IDurableTaskClientBuilder> mockBuilder = new();
mockBuilder.Setup(b => b.Services).Returns(services);
string connectionString = $"Endpoint=http://localhost;Authentication=None;TaskHub={ValidTaskHub}";

// Act
mockBuilder.Object.UseDurableTaskScheduler(connectionString);

// Assert
ServiceProvider provider = services.BuildServiceProvider();
IOptions<GrpcDurableTaskClientOptions>? options = provider.GetService<IOptions<GrpcDurableTaskClientOptions>>();
options.Should().NotBeNull();

// Validate the configured options
var workerOptions = provider.GetRequiredService<IOptions<DurableTaskSchedulerClientOptions>>().Value;
workerOptions.EndpointAddress.Should().Be("http://localhost");
workerOptions.TaskHubName.Should().Be(ValidTaskHub);
workerOptions.Credential.Should().BeNull();
workerOptions.ResourceId.Should().Be("https://durabletask.io");
workerOptions.AllowInsecureCredentials.Should().BeTrue();
}

[Theory]
[InlineData(null, "testhub")]
[InlineData("myaccount.westus3.durabletask.io", null)]
Expand Down Expand Up @@ -124,7 +150,7 @@ public void UseDurableTaskScheduler_WithInvalidConnectionString_ShouldThrowArgum

// Assert
action.Should().Throw<ArgumentNullException>()
.WithMessage("Value cannot be null. (Parameter 'Endpoint')");
.WithMessage("Value cannot be null. (Parameter '*')");
}

[Theory]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,32 @@ public void UseDurableTaskScheduler_WithConnectionString_ShouldConfigureCorrectl
workerOptions.AllowInsecureCredentials.Should().BeFalse();
}

[Fact]
public void UseDurableTaskScheduler_WithLocalhostConnectionString_ShouldConfigureCorrectly()
{
// Arrange
ServiceCollection services = new();
Mock<IDurableTaskWorkerBuilder> mockBuilder = new();
mockBuilder.Setup(b => b.Services).Returns(services);
string connectionString = $"Endpoint=http://localhost;Authentication=None;TaskHub={ValidTaskHub}";

// Act
mockBuilder.Object.UseDurableTaskScheduler(connectionString);

// Assert
ServiceProvider provider = services.BuildServiceProvider();
IOptions<GrpcDurableTaskWorkerOptions>? options = provider.GetService<IOptions<GrpcDurableTaskWorkerOptions>>();
options.Should().NotBeNull();

// Validate the configured options
var workerOptions = provider.GetRequiredService<IOptions<DurableTaskSchedulerWorkerOptions>>().Value;
workerOptions.EndpointAddress.Should().Be("http://localhost");
workerOptions.TaskHubName.Should().Be(ValidTaskHub);
workerOptions.Credential.Should().BeNull();
workerOptions.ResourceId.Should().Be("https://durabletask.io");
workerOptions.AllowInsecureCredentials.Should().BeTrue();
}

[Theory]
[InlineData(null, "testhub")]
[InlineData("myaccount.westus3.durabletask.io", null)]
Expand Down Expand Up @@ -129,7 +155,7 @@ public void UseDurableTaskScheduler_WithInvalidConnectionString_ShouldThrowArgum

// Assert
action.Should().Throw<ArgumentNullException>()
.WithMessage("Value cannot be null. (Parameter 'Endpoint')");
.WithMessage("Value cannot be null. (Parameter '*')");
}

[Theory]
Expand Down
Loading