-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Dataflow Streaming][Multikey] Support MultiKey commits in windmill clients #38768
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -35,6 +35,7 @@ | |||||||||||||||||||||||||||||||||
| import java.util.function.Function; | ||||||||||||||||||||||||||||||||||
| import javax.annotation.Nullable; | ||||||||||||||||||||||||||||||||||
| import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair; | ||||||||||||||||||||||||||||||||||
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill; | ||||||||||||||||||||||||||||||||||
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitStatus; | ||||||||||||||||||||||||||||||||||
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; | ||||||||||||||||||||||||||||||||||
| import org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingCommitRequestChunk; | ||||||||||||||||||||||||||||||||||
|
|
@@ -270,7 +271,7 @@ private void flushInternal(Map<Long, PendingRequest> requests) | |||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| if (requests.size() == 1) { | ||||||||||||||||||||||||||||||||||
| Map.Entry<Long, PendingRequest> elem = requests.entrySet().iterator().next(); | ||||||||||||||||||||||||||||||||||
| if (elem.getValue().request().getSerializedSize() | ||||||||||||||||||||||||||||||||||
| if (elem.getValue().serializedCommit().size() | ||||||||||||||||||||||||||||||||||
| > AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE) { | ||||||||||||||||||||||||||||||||||
| issueMultiChunkRequest(elem.getKey(), elem.getValue()); | ||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||
|
|
@@ -289,6 +290,7 @@ private void issueSingleRequest(long id, PendingRequest pendingRequest) | |||||||||||||||||||||||||||||||||
| .setComputationId(pendingRequest.computationId()) | ||||||||||||||||||||||||||||||||||
| .setRequestId(id) | ||||||||||||||||||||||||||||||||||
| .setShardingKey(pendingRequest.shardingKey()) | ||||||||||||||||||||||||||||||||||
| .setCommitType(pendingRequest.commitType()) | ||||||||||||||||||||||||||||||||||
| .setSerializedWorkItemCommit(pendingRequest.serializedCommit()); | ||||||||||||||||||||||||||||||||||
| StreamingCommitWorkRequest chunk = requestBuilder.build(); | ||||||||||||||||||||||||||||||||||
| synchronized (this) { | ||||||||||||||||||||||||||||||||||
|
|
@@ -318,7 +320,8 @@ private void issueBatchedRequest(Map<Long, PendingRequest> requests) | |||||||||||||||||||||||||||||||||
| chunkBuilder | ||||||||||||||||||||||||||||||||||
| .setRequestId(entry.getKey()) | ||||||||||||||||||||||||||||||||||
| .setShardingKey(request.shardingKey()) | ||||||||||||||||||||||||||||||||||
| .setSerializedWorkItemCommit(request.serializedCommit()); | ||||||||||||||||||||||||||||||||||
| .setSerializedWorkItemCommit(request.serializedCommit()) | ||||||||||||||||||||||||||||||||||
| .setCommitType(request.commitType()); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| StreamingCommitWorkRequest request = requestBuilder.build(); | ||||||||||||||||||||||||||||||||||
| synchronized (this) { | ||||||||||||||||||||||||||||||||||
|
|
@@ -360,7 +363,8 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) | |||||||||||||||||||||||||||||||||
| .setRequestId(id) | ||||||||||||||||||||||||||||||||||
| .setSerializedWorkItemCommit(chunk) | ||||||||||||||||||||||||||||||||||
| .setComputationId(pendingRequest.computationId()) | ||||||||||||||||||||||||||||||||||
| .setShardingKey(pendingRequest.shardingKey()); | ||||||||||||||||||||||||||||||||||
| .setShardingKey(pendingRequest.shardingKey()) | ||||||||||||||||||||||||||||||||||
| .setCommitType(pendingRequest.commitType()); | ||||||||||||||||||||||||||||||||||
| int remaining = serializedCommit.size() - end; | ||||||||||||||||||||||||||||||||||
| if (remaining > 0) { | ||||||||||||||||||||||||||||||||||
| chunkBuilder.setRemainingBytesForWorkItem(remaining); | ||||||||||||||||||||||||||||||||||
|
|
@@ -378,34 +382,34 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) | |||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @AutoValue | ||||||||||||||||||||||||||||||||||
| abstract static class PendingRequest { | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private static PendingRequest create( | ||||||||||||||||||||||||||||||||||
| String computationId, WorkItemCommitRequest request, Consumer<CommitStatus> onDone) { | ||||||||||||||||||||||||||||||||||
| return new AutoValue_GrpcCommitWorkStream_PendingRequest(computationId, request, onDone); | ||||||||||||||||||||||||||||||||||
| static PendingRequest create( | ||||||||||||||||||||||||||||||||||
| String computationId, | ||||||||||||||||||||||||||||||||||
| long shardingKey, | ||||||||||||||||||||||||||||||||||
| ByteString serializedCommit, | ||||||||||||||||||||||||||||||||||
| StreamingCommitRequestChunk.CommitType commitType, | ||||||||||||||||||||||||||||||||||
| Consumer<CommitStatus> onDone) { | ||||||||||||||||||||||||||||||||||
| return new AutoValue_GrpcCommitWorkStream_PendingRequest( | ||||||||||||||||||||||||||||||||||
| computationId, shardingKey, serializedCommit, commitType, onDone); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| abstract String computationId(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| abstract WorkItemCommitRequest request(); | ||||||||||||||||||||||||||||||||||
| abstract long shardingKey(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| abstract ByteString serializedCommit(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| abstract StreamingCommitRequestChunk.CommitType commitType(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| abstract Consumer<CommitStatus> onDone(); | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private long getBytes() { | ||||||||||||||||||||||||||||||||||
| return (long) request().getSerializedSize() + computationId().length(); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private ByteString serializedCommit() { | ||||||||||||||||||||||||||||||||||
| return request().toByteString(); | ||||||||||||||||||||||||||||||||||
| return (long) serializedCommit().size() + computationId().length(); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private void completeWithStatus(CommitStatus commitStatus) { | ||||||||||||||||||||||||||||||||||
| onDone().accept(commitStatus); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private long shardingKey() { | ||||||||||||||||||||||||||||||||||
| return request().getShardingKey(); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| private void abort() { | ||||||||||||||||||||||||||||||||||
| completeWithStatus(CommitStatus.ABORTED); | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
@@ -462,7 +466,34 @@ public boolean commitWorkItem( | |||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| PendingRequest request = PendingRequest.create(computation, commitRequest, onDone); | ||||||||||||||||||||||||||||||||||
| PendingRequest request = | ||||||||||||||||||||||||||||||||||
| PendingRequest.create( | ||||||||||||||||||||||||||||||||||
| computation, | ||||||||||||||||||||||||||||||||||
| commitRequest.getShardingKey(), | ||||||||||||||||||||||||||||||||||
| commitRequest.toByteString(), | ||||||||||||||||||||||||||||||||||
| StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_SINGLE_KEY, | ||||||||||||||||||||||||||||||||||
| onDone); | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+469
to
+475
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pass the
Suggested change
|
||||||||||||||||||||||||||||||||||
| add(idGenerator.incrementAndGet(), request); | ||||||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||
| public boolean commitMultiKeyWorkItem( | ||||||||||||||||||||||||||||||||||
| String computation, | ||||||||||||||||||||||||||||||||||
| Windmill.MultiKeyWorkItemCommitRequest commitRequest, | ||||||||||||||||||||||||||||||||||
| Consumer<CommitStatus> onDone) { | ||||||||||||||||||||||||||||||||||
| if (!canAccept(commitRequest.getSerializedSize() + computation.length())) { | ||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| Preconditions.checkArgument(commitRequest.getRequestsCount() > 0); | ||||||||||||||||||||||||||||||||||
| PendingRequest request = | ||||||||||||||||||||||||||||||||||
| PendingRequest.create( | ||||||||||||||||||||||||||||||||||
| computation, | ||||||||||||||||||||||||||||||||||
| // Any key in the batch for routing | ||||||||||||||||||||||||||||||||||
| commitRequest.getRequests(0).getShardingKey(), | ||||||||||||||||||||||||||||||||||
| commitRequest.toByteString(), | ||||||||||||||||||||||||||||||||||
| StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_MULTI_KEY, | ||||||||||||||||||||||||||||||||||
| onDone); | ||||||||||||||||||||||||||||||||||
|
Comment on lines
+489
to
+496
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pass the
Suggested change
|
||||||||||||||||||||||||||||||||||
| add(idGenerator.incrementAndGet(), request); | ||||||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Performance Optimization: Defer Serialization to the Stream Writer Thread
Currently,
PendingRequesteagerly serializes the commit request to aByteStringon the committer thread (insidecommitWorkItemandcommitMultiKeyWorkItem). Eager serialization on the committer thread can become a performance bottleneck under high throughput.By holding the
MessageLite(or the vendored equivalentorg.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.MessageLite) directly inPendingRequestand using@AutoValue.Memoizedto lazily serialize it, we can:serializedCommit()is actually called).serializedCommit()is accessed multiple times.