diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java index 9503893e2cf1..417b16fa3655 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java @@ -226,12 +226,15 @@ public void onResponse(StreamingGetDataResponse chunk) { @Override public boolean hasPendingRequests() { + // Note the batchesSizeSupplier may reflect batches that could be sent on another physical + // stream. However we treat them as possibly pending on all physical streams to ensure that we + // recreate streams to send them. return !pending.isEmpty() || batchesSizeSupplier.get() > 0; } @Override public void onDone(Status status) { - if (status.isOk() && hasPendingRequests()) { + if (status.isOk() && !pending.isEmpty()) { LOG.warn("Pending requests not expected on successful GetData stream flushing."); } for (AppendableInputStream responseStream : pending.values()) {