Skip to content

Commit 4e7e86c

Browse files
committed
Merge branch 'main' into vabachu/entities
2 parents 7a99ac0 + 24ee90e commit 4e7e86c

File tree

8 files changed

+1012
-12
lines changed

8 files changed

+1012
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## Unreleased
2+
* Add work item filtering support for `DurableTaskGrpcWorker` to enable worker-side filtering of orchestration and activity work items ([#275](https://github.com/microsoft/durabletask-java/pull/275))
23
* Add support for calls to HTTP endpoints ([#271](https://github.com/microsoft/durabletask-java/pull/271))
34
* Add getSuspendPostUri and getResumePostUri getters to HttpManagementPayload ([#264](https://github.com/microsoft/durabletask-java/pull/264))
45

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,15 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4848
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
4949
private final int maxConcurrentEntityWorkItems;
5050
private final ExecutorService workItemExecutor;
51+
52+
private final WorkItemFilter workItemFilter;
53+
private final GetWorkItemsRequest getWorkItemsRequest;
5154
private final PayloadHelper payloadHelper;
5255
private final int chunkSizeBytes;
5356

5457
private final TaskHubSidecarServiceBlockingStub sidecarClient;
5558

56-
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
59+
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder, WorkItemFilter workItemFilter) {
5760
this.orchestrationFactories.putAll(builder.orchestrationFactories);
5861
this.activityFactories.putAll(builder.activityFactories);
5962
this.entityFactories.putAll(builder.entityFactories);
@@ -94,6 +97,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
9497
return t;
9598
},
9699
new ThreadPoolExecutor.CallerRunsPolicy());
100+
this.workItemFilter = workItemFilter;
101+
this.getWorkItemsRequest = buildGetWorkItemsRequest();
97102
this.payloadHelper = builder.payloadStore != null
98103
? new PayloadHelper(builder.payloadStore, builder.largePayloadOptions)
99104
: null;
@@ -173,16 +178,7 @@ public void startAndBlock() {
173178
// TODO: How do we interrupt manually?
174179
while (true) {
175180
try {
176-
GetWorkItemsRequest.Builder requestBuilder = GetWorkItemsRequest.newBuilder();
177-
if (!this.entityFactories.isEmpty()) {
178-
// Signal to the sidecar that this worker can handle entity work items
179-
requestBuilder.setMaxConcurrentEntityWorkItems(this.maxConcurrentEntityWorkItems);
180-
}
181-
if (this.payloadHelper != null) {
182-
requestBuilder.addCapabilities(WorkerCapability.WORKER_CAPABILITY_LARGE_PAYLOADS);
183-
}
184-
GetWorkItemsRequest getWorkItemsRequest = requestBuilder.build();
185-
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
181+
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(this.getWorkItemsRequest);
186182
while (workItemStream.hasNext()) {
187183
WorkItem workItem = workItemStream.next();
188184
RequestCase requestType = workItem.getRequestCase();
@@ -605,6 +601,47 @@ public void stop() {
605601
this.close();
606602
}
607603

604+
/**
605+
* Returns the work item filter configured for this worker, or {@code null} if none.
606+
*/
607+
WorkItemFilter getWorkItemFilter() {
608+
return this.workItemFilter;
609+
}
610+
611+
private GetWorkItemsRequest buildGetWorkItemsRequest() {
612+
GetWorkItemsRequest.Builder builder = GetWorkItemsRequest.newBuilder();
613+
if (!this.entityFactories.isEmpty()) {
614+
// Signal to the sidecar that this worker can handle entity work items
615+
builder.setMaxConcurrentEntityWorkItems(this.maxConcurrentEntityWorkItems);
616+
}
617+
if (this.workItemFilter != null) {
618+
builder.setWorkItemFilters(toProtoWorkItemFilters(this.workItemFilter));
619+
}
620+
if (this.payloadHelper != null) {
621+
builder.addCapabilities(WorkerCapability.WORKER_CAPABILITY_LARGE_PAYLOADS);
622+
}
623+
return builder.build();
624+
}
625+
626+
static WorkItemFilters toProtoWorkItemFilters(WorkItemFilter filter) {
627+
WorkItemFilters.Builder builder = WorkItemFilters.newBuilder();
628+
for (WorkItemFilter.OrchestrationFilter orch : filter.getOrchestrations()) {
629+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.Builder orchBuilder =
630+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.OrchestrationFilter.newBuilder()
631+
.setName(orch.getName());
632+
orchBuilder.addAllVersions(orch.getVersions());
633+
builder.addOrchestrations(orchBuilder.build());
634+
}
635+
for (WorkItemFilter.ActivityFilter activity : filter.getActivities()) {
636+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.Builder actBuilder =
637+
com.microsoft.durabletask.implementation.protobuf.OrchestratorService.ActivityFilter.newBuilder()
638+
.setName(activity.getName());
639+
actBuilder.addAllVersions(activity.getVersions());
640+
builder.addActivities(actBuilder.build());
641+
}
642+
return builder.build();
643+
}
644+
608645
/**
609646
* Sends an orchestrator response, chunking it if it exceeds the configured chunk size.
610647
*/

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@
66

77
import java.lang.reflect.InvocationTargetException;
88
import java.time.Duration;
9+
import java.util.ArrayList;
10+
import java.util.Collections;
911
import java.util.HashMap;
12+
import java.util.List;
1013
import java.util.Locale;
1114

1215
/**
@@ -42,6 +45,8 @@ public final class DurableTaskGrpcWorkerBuilder {
4245
DurableTaskGrpcWorkerVersioningOptions versioningOptions;
4346
int maxConcurrentEntityWorkItems = 1;
4447
int maxWorkItemThreads;
48+
private WorkItemFilter workItemFilter;
49+
private boolean autoGenerateWorkItemFilters;
4550
PayloadStore payloadStore;
4651
LargePayloadOptions largePayloadOptions;
4752
int chunkSizeBytes = DEFAULT_CHUNK_SIZE_BYTES;
@@ -309,6 +314,24 @@ public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersionin
309314
return this;
310315
}
311316

317+
/**
318+
* Sets explicit work item filters for this worker. When set, only work items matching the filters
319+
* will be dispatched to this worker by the backend.
320+
* <p>
321+
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
322+
* only receives work items it can handle. However, if an orchestration calls a task type
323+
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
324+
* the call may hang indefinitely instead of failing with an error.
325+
*
326+
* @param workItemFilter the work item filter to use, or {@code null} to disable filtering
327+
* @return this builder object
328+
*/
329+
public DurableTaskGrpcWorkerBuilder useWorkItemFilters(WorkItemFilter workItemFilter) {
330+
this.workItemFilter = workItemFilter;
331+
this.autoGenerateWorkItemFilters = false;
332+
return this;
333+
}
334+
312335
/**
313336
* Enables large payload externalization with default options.
314337
* <p>
@@ -346,6 +369,27 @@ public DurableTaskGrpcWorkerBuilder useExternalizedPayloads(PayloadStore payload
346369
return this;
347370
}
348371

372+
/**
373+
* Enables automatic work item filtering by generating filters from the registered
374+
* orchestrations and activities. When enabled, the backend will only dispatch work items
375+
* for registered orchestrations and activities to this worker.
376+
* <p>
377+
* Work item filtering can improve efficiency in multi-worker deployments by ensuring each worker
378+
* only receives work items it can handle. However, if an orchestration calls a task type
379+
* (e.g., an activity or sub-orchestrator) that is not registered with any connected worker,
380+
* the call may hang indefinitely instead of failing with an error.
381+
* <p>
382+
* Only use this method when all task types referenced by orchestrations are guaranteed to be
383+
* registered with at least one connected worker.
384+
*
385+
* @return this builder object
386+
*/
387+
public DurableTaskGrpcWorkerBuilder useWorkItemFilters() {
388+
this.autoGenerateWorkItemFilters = true;
389+
this.workItemFilter = null;
390+
return this;
391+
}
392+
349393
/**
350394
* Sets the maximum size in bytes for a single orchestrator response chunk sent over gRPC.
351395
* Responses larger than this will be automatically split into multiple chunks.
@@ -381,6 +425,31 @@ int getChunkSizeBytes() {
381425
* @return a new {@link DurableTaskGrpcWorker} object
382426
*/
383427
public DurableTaskGrpcWorker build() {
384-
return new DurableTaskGrpcWorker(this);
428+
WorkItemFilter resolvedFilter = this.autoGenerateWorkItemFilters
429+
? buildAutoWorkItemFilter()
430+
: this.workItemFilter;
431+
return new DurableTaskGrpcWorker(this, resolvedFilter);
432+
}
433+
434+
private WorkItemFilter buildAutoWorkItemFilter() {
435+
List<String> versions = Collections.emptyList();
436+
if (this.versioningOptions != null
437+
&& this.versioningOptions.getMatchStrategy() == DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.STRICT
438+
&& this.versioningOptions.getVersion() != null) {
439+
versions = Collections.singletonList(this.versioningOptions.getVersion());
440+
}
441+
442+
WorkItemFilter.Builder builder = WorkItemFilter.newBuilder();
443+
List<String> orchestrationNames = new ArrayList<>(this.orchestrationFactories.keySet());
444+
Collections.sort(orchestrationNames);
445+
for (String name : orchestrationNames) {
446+
builder.addOrchestration(name, versions);
447+
}
448+
List<String> activityNames = new ArrayList<>(this.activityFactories.keySet());
449+
Collections.sort(activityNames);
450+
for (String name : activityNames) {
451+
builder.addActivity(name, versions);
452+
}
453+
return builder.build();
385454
}
386455
}

0 commit comments

Comments
 (0)