From 230fa1e70254f277ffd0a2f0c0c6f0353f0f98f7 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Tue, 7 Apr 2026 06:58:40 -0700 Subject: [PATCH] Moves finalize ids out of WorkItem proto and into AssembledWorkItem class. (#37954) * Moves applied finalize ids out of windmill WorkItem proto since that field isn't present in internal version. Moves it to AssembledWorkItem. Also adds internal GetWorkResponse.applied_finalize_ids, which is used by Appliance, so now validates runner tests pass. * Reserves previously used proto field and manually triggers post submits. --- ...Commit_Java_ValidatesRunner_Dataflow_Streaming.json | 2 +- ...Java_ValidatesRunner_Dataflow_Streaming_Engine.json | 2 +- runners/google-cloud-dataflow-java/build.gradle | 2 -- .../dataflow/worker/StreamingDataflowWorker.java | 2 ++ .../streaming/harness/SingleSourceWorkerHarness.java | 10 ++++++++++ .../client/grpc/GetWorkResponseChunkAssembler.java | 7 +++++-- .../windmill/client/grpc/GrpcDirectGetWorkStream.java | 1 + .../worker/windmill/client/grpc/GrpcGetWorkStream.java | 1 + .../worker/windmill/work/WorkItemReceiver.java | 1 + .../worker/windmill/work/WorkItemScheduler.java | 2 ++ .../work/processing/StreamingWorkScheduler.java | 6 +++++- .../runners/dataflow/worker/FakeWindmillServer.java | 1 + .../FanOutStreamingEngineWorkerHarnessTest.java | 1 + .../streaming/harness/WindmillStreamSenderTest.java | 1 + .../client/grpc/GrpcDirectGetWorkStreamTest.java | 5 +++++ .../windmill/client/grpc/GrpcWindmillServerTest.java | 2 ++ .../worker/windmill/src/main/proto/windmill.proto | 6 +++--- 17 files changed, 42 insertions(+), 10 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index 0c41d2bcf2fe..090751435f20 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 6, + "modification": 7, } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json index e623d3373a93..50d17c108f2e 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 1, + "modification": 2, } diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index b7c5ce078db3..d06a7be7a082 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -491,8 +491,6 @@ task validatesRunnerStreaming { description "Validates Dataflow runner forcing streaming mode" dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [ name: 'validatesRunnerLegacyWorkerTestStreaming', - // Streaming appliance currently fails bundle finalizer tests. - excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ], ])) } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 7f379625b733..1dcedf4370d7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -401,12 +401,14 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> checkNotNull(computationStateCache) .get(processingContext.computationId()) .ifPresent( computationState -> { memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); streamingWorkScheduler.scheduleWork( computationState, workItem, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java index af7746d69028..f41223310385 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java @@ -158,12 +158,16 @@ private void streamingEngineDispatchLoop( drainMode, workItem, serializedWorkItemSize, + appliedFinalizeIds, getWorkStreamLatencies) -> computationStateFetcher .apply(computationId) .ifPresent( computationState -> { waitForResources.run(); + if (!appliedFinalizeIds.isEmpty()) { + streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); + } streamingWorkScheduler.scheduleWork( computationState, workItem, @@ -214,6 +218,12 @@ private void applianceDispatchLoop(Supplier getWorkFn) sleepUninterruptibly(backoff, TimeUnit.MILLISECONDS); backoff = Math.min(1000, backoff * 2); } while (isRunning.get()); + ImmutableList appliedFinalizeIds = + ImmutableList.copyOf( + Preconditions.checkNotNull(workResponse).getAppliedFinalizeIdsList()); + if (!appliedFinalizeIds.isEmpty()) { + streamingWorkScheduler.queueAppliedFinalizeIds(appliedFinalizeIds); + } for (Windmill.ComputationWorkItems computationWork : Preconditions.checkNotNull(workResponse).getWorkList()) { String computationId = computationWork.getComputationId(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java index 4afee8a70df1..15acee239aa6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java @@ -100,12 +100,12 @@ List append(Windmill.StreamingGetWorkResponseChunk chunk) { private Optional flushToWorkItem() { try { workItemBuilder.mergeFrom(data); - workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds); return Optional.of( AssembledWorkItem.create( workItemBuilder.build(), Preconditions.checkNotNull(metadata), workTimingInfosTracker.getLatencyAttributions(), + ImmutableList.copyOf(appliedFinalizeIds), bufferedSize)); } catch (IOException e) { LOG.error("Failed to parse work item from stream: ", e); @@ -149,9 +149,10 @@ private static AssembledWorkItem create( WorkItem workItem, ComputationMetadata computationMetadata, ImmutableList latencyAttributions, + ImmutableList appliedFinalizeIds, long size) { return new AutoValue_GetWorkResponseChunkAssembler_AssembledWorkItem( - workItem, computationMetadata, latencyAttributions, size); + workItem, computationMetadata, latencyAttributions, appliedFinalizeIds, size); } abstract WorkItem workItem(); @@ -160,6 +161,8 @@ private static AssembledWorkItem create( abstract ImmutableList latencyAttributions(); + abstract ImmutableList appliedFinalizeIds(); + abstract long bufferedSize(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java index 8eb4c51a2b49..de8ebf14b709 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java @@ -282,6 +282,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) { createWatermarks(workItem, metadata), createProcessingContext(metadata.computationId()), metadata.drainMode(), + assembledWorkItem.appliedFinalizeIds(), assembledWorkItem.latencyAttributions()); budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize()); GetWorkBudget extension = budgetTracker.computeBudgetExtension(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java index 58407ad8147f..af0d3e907e60 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java @@ -206,6 +206,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) { assembledWorkItem.computationMetadata().drainMode(), assembledWorkItem.workItem(), assembledWorkItem.bufferedSize(), + assembledWorkItem.appliedFinalizeIds(), assembledWorkItem.latencyAttributions()); // Record the fact that there are now fewer outstanding messages and bytes on the stream. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java index 71e524a308af..a45e82cb23ed 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java @@ -33,5 +33,6 @@ void receiveWork( boolean drainMode, Windmill.WorkItem workItem, long serializedWorkItemSize, + ImmutableList appliedFinalizeIds, ImmutableList getWorkStreamLatencies); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java index 4121aa758ba7..a2dfa50a0d63 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java @@ -36,6 +36,7 @@ public interface WorkItemScheduler { * @param watermarks processing watermarks for the workItem. * @param processingContext for processing the workItem. * @param drainMode is job is draining. + * @param appliedFinalizeIds Any applied finalize ids that should have their callbacks run. * @param getWorkStreamLatencies Latencies per processing stage for the WorkItem for reporting * back to Streaming Engine backend. */ @@ -45,5 +46,6 @@ void scheduleWork( Watermarks watermarks, Work.ProcessingContext processingContext, boolean drainMode, + ImmutableList appliedFinalizeIds, ImmutableList getWorkStreamLatencies); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 6d53b1f5cb78..4dd8ee3d0c27 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -219,6 +219,11 @@ public void scheduleWork( work -> processWork(computationState, work, getWorkStreamLatencies))); } + /** Adds any applied finalize ids to the commit finalizer to have their callbacks executed. */ + public void queueAppliedFinalizeIds(ImmutableList appliedFinalizeIds) { + commitFinalizer.finalizeCommits(appliedFinalizeIds); + } + /** * Executes the user DoFns processing {@link Work} then queues the {@link Commit}(s) to be sent to * backing persistent store to mark that the {@link Work} has finished processing. May retry @@ -246,7 +251,6 @@ private void processWork(ComputationState computationState, Work work) { // Before any processing starts, call any pending OnCommit callbacks. Nothing that requires // cleanup should be done before this, since we might exit early here. commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); - commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList()); if (workItem.getSourceState().getOnlyFinalize()) { Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem); outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true)); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index 1c5f7504bf32..5be8ec0a6c72 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -278,6 +278,7 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep computationWork.getDrainMode(), workItem, workItem.getSerializedSize(), + ImmutableList.of(), ImmutableList.of( LatencyAttribution.newBuilder() .setState(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index 5afb9cf93c5f..dc4472728c54 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -126,6 +126,7 @@ private static WorkItemScheduler noOpProcessWorkItemFn() { watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> {}; } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java index 3217c736adb1..457f75593e23 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java @@ -69,6 +69,7 @@ public class WindmillStreamSenderTest { watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> {}; @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private ManagedChannel inProcessChannel; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java index 76883bebdac0..71e1300d90cf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java @@ -71,6 +71,7 @@ public class GrpcDirectGetWorkStreamTest { watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> {}; private static final Windmill.JobHeader TEST_JOB_HEADER = Windmill.JobHeader.newBuilder() @@ -285,6 +286,7 @@ public void testConsumedWorkItem_computesAndSendsCorrectExtension() throws Inter watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> { scheduledWorkItems.add(work); }); @@ -334,6 +336,7 @@ public void testConsumedWorkItem_doesNotSendExtensionIfOutstandingBudgetHigh() watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> scheduledWorkItems.add(work)); Windmill.WorkItem workItem = Windmill.WorkItem.newBuilder() @@ -372,6 +375,7 @@ public void testConsumedWorkItems() throws InterruptedException { watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> { scheduledWorkItems.add(work); }); @@ -416,6 +420,7 @@ public void testConsumedWorkItems_itemsSplitAcrossResponses() throws Interrupted watermarks, processingContext, drainMode, + appliedFinalizeIds, getWorkStreamLatencies) -> { scheduledWorkItems.add(work); }); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index d68f9a1e2167..130359212beb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -339,6 +339,7 @@ public void onCompleted() { boolean drainMode, WorkItem workItem, long serializedWorkItemSize, + ImmutableList appliedFinalizeIds, ImmutableList getWorkStreamLatencies) -> { latch.countDown(); assertEquals(inputDataWatermark, new Instant(18)); @@ -474,6 +475,7 @@ public void onCompleted() { boolean drainMode, WorkItem workItem, long serializedWorkItemSize, + ImmutableList appliedFinalizeIds, ImmutableList getWorkStreamLatencies) -> { assertEquals(inputDataWatermark, new Instant(18)); assertEquals(synchronizedProcessingTime, new Instant(17)); diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index 0a4ba5f4e147..b7579cbacb8e 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -448,9 +448,7 @@ message WorkItem { // present, this field includes metadata associated with any hot key. optional HotKeyInfo hot_key_info = 11; - repeated int64 applied_finalize_ids = 16; - - reserved 12, 13, 14, 15; + reserved 12, 13, 14, 15, 16; } message ComputationWorkItems { @@ -481,6 +479,8 @@ message GetWorkRequest { message GetWorkResponse { repeated ComputationWorkItems work = 1; + // Finalize ids associated with successfully applied work from this worker + repeated int64 applied_finalize_ids = 2 [packed = true]; } // GetData