Skip to content

Commit 0f79297

Browse files
committed
worker done
1 parent df4a0e4 commit 0f79297

2 files changed

Lines changed: 280 additions & 109 deletions

File tree

azuremanaged/worker/src/main/java/com/microsoft/durabletask/worker/azuremanaged/DurableTaskSchedulerWorkerExtensions.java

Lines changed: 133 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -19,84 +19,153 @@
1919
import io.grpc.CallOptions;
2020

2121
import java.util.Objects;
22+
import javax.annotation.Nullable;
2223

2324
/**
24-
* Extension methods for creating DurableTaskGrpcWorker instances that connect to Azure-managed Durable Task Scheduler.
25+
* Extension methods for creating DurableTaskWorker instances that connect to Azure-managed Durable Task Scheduler.
26+
* This class provides various methods to create and configure workers using either connection strings or explicit parameters.
2527
*/
26-
public class DurableTaskSchedulerWorkerExtensions {
28+
public static class DurableTaskSchedulerWorkerExtensions {
29+
/**
30+
* Creates a DurableTaskWorker using a connection string.
31+
*
32+
* @param connectionString The connection string for Azure-managed Durable Task Scheduler.
33+
* @return A new DurableTaskWorker instance.
34+
*/
35+
public static DurableTaskWorker createWorker(String connectionString) {
36+
return createWorker(connectionString, null);
37+
}
2738

2839
/**
29-
* Creates a DurableTaskGrpcWorkerBuilder that connects to Azure-managed Durable Task Scheduler.
40+
* Creates a DurableTaskWorker using a connection string and token credential.
3041
*
31-
* @param options The options for connecting to Azure-managed Durable Task Scheduler.
32-
* @return A new DurableTaskGrpcWorkerBuilder instance.
42+
* @param connectionString The connection string for Azure-managed Durable Task Scheduler.
43+
* @param tokenCredential The token credential for authentication, or null to use connection string credentials.
44+
* @return A new DurableTaskWorker instance.
45+
* @throws NullPointerException if connectionString is null
3346
*/
34-
public static DurableTaskGrpcWorkerBuilder createWorkerBuilder(DurableTaskSchedulerWorkerOptions options) {
35-
Objects.requireNonNull(options, "options must not be null");
36-
options.validate();
47+
public static DurableTaskWorker createWorker(String connectionString, @Nullable TokenCredential tokenCredential) {
48+
Objects.requireNonNull(connectionString, "connectionString must not be null");
49+
return createWorkerFromOptions(
50+
DurableTaskSchedulerWorkerOptions.fromConnectionString(connectionString, tokenCredential));
51+
}
3752

38-
// Create the access token cache if credentials are provided
39-
AccessTokenCache tokenCache = null;
40-
TokenCredential credential = options.getTokenCredential();
41-
if (credential != null) {
42-
TokenRequestContext context = new TokenRequestContext();
43-
context.addScopes(new String[] { options.getResourceId() + "/.default" });
44-
tokenCache = new AccessTokenCache(credential, context, options.getTokenRefreshMargin());
45-
}
53+
/**
54+
* Creates a DurableTaskWorker using explicit endpoint and task hub parameters.
55+
*
56+
* @param endpoint The endpoint address for Azure-managed Durable Task Scheduler.
57+
* @param taskHubName The name of the task hub to connect to.
58+
* @param tokenCredential The token credential for authentication, or null for anonymous access.
59+
* @return A new DurableTaskWorker instance.
60+
* @throws NullPointerException if endpoint or taskHubName is null
61+
*/
62+
public static DurableTaskWorker createWorker(
63+
String endpoint,
64+
String taskHubName,
65+
@Nullable TokenCredential tokenCredential) {
66+
Objects.requireNonNull(endpoint, "endpoint must not be null");
67+
Objects.requireNonNull(taskHubName, "taskHubName must not be null");
68+
69+
return createWorkerFromOptions(new DurableTaskSchedulerWorkerOptions()
70+
.setEndpointAddress(endpoint)
71+
.setTaskHubName(taskHubName)
72+
.setCredential(tokenCredential));
73+
}
4674

47-
// Create the gRPC channel
48-
Channel grpcChannel = createGrpcChannel(options, tokenCache);
75+
/**
76+
* Configures a DurableTaskGrpcWorkerBuilder to use Azure-managed Durable Task Scheduler with a connection string.
77+
*
78+
* @param builder The builder to configure.
79+
* @param connectionString The connection string for Azure-managed Durable Task Scheduler.
80+
* @param tokenCredential The token credential for authentication, or null to use connection string credentials.
81+
* @throws NullPointerException if builder or connectionString is null
82+
*/
83+
public static void useDurableTaskScheduler(
84+
DurableTaskGrpcWorkerBuilder builder,
85+
String connectionString,
86+
@Nullable TokenCredential tokenCredential) {
87+
Objects.requireNonNull(builder, "builder must not be null");
88+
Objects.requireNonNull(connectionString, "connectionString must not be null");
89+
90+
configureBuilder(builder,
91+
DurableTaskSchedulerWorkerOptions.fromConnectionString(connectionString, tokenCredential));
92+
}
4993

50-
// Create and return the worker builder
51-
return new DurableTaskGrpcWorkerBuilder()
52-
.grpcChannel(grpcChannel);
94+
/**
95+
* Configures a DurableTaskGrpcWorkerBuilder to use Azure-managed Durable Task Scheduler with explicit parameters.
96+
*
97+
* @param builder The builder to configure.
98+
* @param endpoint The endpoint address for Azure-managed Durable Task Scheduler.
99+
* @param taskHubName The name of the task hub to connect to.
100+
* @param tokenCredential The token credential for authentication, or null for anonymous access.
101+
* @throws NullPointerException if builder, endpoint, or taskHubName is null
102+
*/
103+
public static void useDurableTaskScheduler(
104+
DurableTaskGrpcWorkerBuilder builder,
105+
String endpoint,
106+
String taskHubName,
107+
@Nullable TokenCredential tokenCredential) {
108+
Objects.requireNonNull(builder, "builder must not be null");
109+
Objects.requireNonNull(endpoint, "endpoint must not be null");
110+
Objects.requireNonNull(taskHubName, "taskHubName must not be null");
111+
112+
configureBuilder(builder, new DurableTaskSchedulerWorkerOptions()
113+
.setEndpointAddress(endpoint)
114+
.setTaskHubName(taskHubName)
115+
.setCredential(tokenCredential));
53116
}
54117

55-
private static Channel createGrpcChannel(DurableTaskSchedulerWorkerOptions options, AccessTokenCache tokenCache) {
56-
// Normalize the endpoint URL and add DNS scheme for gRPC name resolution
57-
String endpoint = "dns:///" + options.getEndpoint();
118+
/**
119+
* Creates a DurableTaskGrpcWorkerBuilder configured for Azure-managed Durable Task Scheduler using a connection string.
120+
*
121+
* @param connectionString The connection string for Azure-managed Durable Task Scheduler.
122+
* @param tokenCredential The token credential for authentication, or null to use connection string credentials.
123+
* @return A new configured DurableTaskGrpcWorkerBuilder instance.
124+
* @throws NullPointerException if connectionString is null
125+
*/
126+
public static DurableTaskGrpcWorkerBuilder createWorkerBuilder(
127+
String connectionString,
128+
@Nullable TokenCredential tokenCredential) {
129+
Objects.requireNonNull(connectionString, "connectionString must not be null");
130+
return createBuilderFromOptions(
131+
DurableTaskSchedulerWorkerOptions.fromConnectionString(connectionString, tokenCredential));
132+
}
58133

59-
// Create metadata interceptor to add task hub name and auth token
60-
ClientInterceptor metadataInterceptor = new ClientInterceptor() {
61-
@Override
62-
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
63-
MethodDescriptor<ReqT, RespT> method,
64-
CallOptions callOptions,
65-
Channel next) {
66-
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
67-
next.newCall(method, callOptions)) {
68-
@Override
69-
public void start(ClientCall.Listener<RespT> responseListener, Metadata headers) {
70-
headers.put(
71-
Metadata.Key.of("taskhub", Metadata.ASCII_STRING_MARSHALLER),
72-
options.getTaskHubName()
73-
);
74-
75-
// Add authorization token if credentials are configured
76-
if (tokenCache != null) {
77-
String token = tokenCache.getToken().getToken();
78-
headers.put(
79-
Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER),
80-
"Bearer " + token
81-
);
82-
}
83-
84-
super.start(responseListener, headers);
85-
}
86-
};
87-
}
88-
};
134+
/**
135+
* Creates a DurableTaskGrpcWorkerBuilder configured for Azure-managed Durable Task Scheduler using explicit parameters.
136+
*
137+
* @param endpoint The endpoint address for Azure-managed Durable Task Scheduler.
138+
* @param taskHubName The name of the task hub to connect to.
139+
* @param tokenCredential The token credential for authentication, or null for anonymous access.
140+
* @return A new configured DurableTaskGrpcWorkerBuilder instance.
141+
* @throws NullPointerException if endpoint or taskHubName is null
142+
*/
143+
public static DurableTaskGrpcWorkerBuilder createWorkerBuilder(
144+
String endpoint,
145+
String taskHubName,
146+
@Nullable TokenCredential tokenCredential) {
147+
Objects.requireNonNull(endpoint, "endpoint must not be null");
148+
Objects.requireNonNull(taskHubName, "taskHubName must not be null");
89149

90-
// Build the channel with appropriate security settings
91-
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(endpoint)
92-
.intercept(metadataInterceptor);
93-
94-
if (!options.isAllowInsecure()) {
95-
builder.useTransportSecurity();
96-
} else {
97-
builder.usePlaintext();
98-
}
150+
return createBuilderFromOptions(new DurableTaskSchedulerWorkerOptions()
151+
.setEndpointAddress(endpoint)
152+
.setTaskHubName(taskHubName)
153+
.setCredential(tokenCredential));
154+
}
155+
156+
// Private helper methods to reduce code duplication
157+
158+
private static DurableTaskWorker createWorkerFromOptions(DurableTaskSchedulerWorkerOptions options) {
159+
return createBuilderFromOptions(options).build();
160+
}
161+
162+
private static DurableTaskGrpcWorkerBuilder createBuilderFromOptions(DurableTaskSchedulerWorkerOptions options) {
163+
Channel grpcChannel = options.createGrpcChannel();
164+
return new DurableTaskGrpcWorkerBuilder().grpcChannel(grpcChannel);
165+
}
99166

100-
return builder.build();
167+
private static void configureBuilder(DurableTaskGrpcWorkerBuilder builder, DurableTaskSchedulerWorkerOptions options) {
168+
Channel grpcChannel = options.createGrpcChannel();
169+
builder.grpcChannel(grpcChannel);
101170
}
102171
}

0 commit comments

Comments
 (0)