[Dataflow Streaming][Multikey] Support MultiKey commits in windmill clients#38768
arunpandianp wants to merge 1 commit into
arunpandianp commented on Jun 2, 2026
- Add MultiKeyWorkItemCommitRequest to windmill.proto.
- Support MultiKey commits in Commit model and StreamingEngineWorkCommitter.
- Update GrpcCommitWorkStream to batch and stream MultiKey commit requests.
R: @scwhittle
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces support for MultiKey commits in the Dataflow streaming engine. By defining a new MultiKeyWorkItemCommitRequest and updating the commit orchestration logic, the client can batch multiple work items into a single commit request, which is intended to improve efficiency and throughput for streaming pipelines.
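To make the batching idea concrete, here is a minimal sketch of greedy, size-bounded batching of per-key commits. It is not the PR's implementation: `KeyCommit`, `MultiKeyBatcher`, and the `maxBatchBytes` limit are hypothetical names introduced for illustration, and the real batcher in the Windmill client may use different limits and flush rules.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one per-key commit; not a Windmill type.
final class KeyCommit {
  final long shardingKey;
  final int sizeBytes;

  KeyCommit(long shardingKey, int sizeBytes) {
    this.shardingKey = shardingKey;
    this.sizeBytes = sizeBytes;
  }
}

// Hypothetical batcher: packs commits, in arrival order, into multi-key batches.
final class MultiKeyBatcher {
  // Starts a new batch whenever adding the next commit would exceed maxBatchBytes.
  // A single commit larger than the limit ends up in a batch of its own.
  static List<List<KeyCommit>> batch(List<KeyCommit> commits, long maxBatchBytes) {
    List<List<KeyCommit>> batches = new ArrayList<>();
    List<KeyCommit> current = new ArrayList<>();
    long currentBytes = 0;
    for (KeyCommit commit : commits) {
      if (!current.isEmpty() && currentBytes + commit.sizeBytes > maxBatchBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(commit);
      currentBytes += commit.sizeBytes;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

Each resulting batch would correspond to one multi-key commit request; how the real client maps batches onto MultiKeyWorkItemCommitRequest is not shown here.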
Code Review
This pull request introduces support for multi-key work item commits in the Windmill client, adding the MultiKeyWorkItemCommitRequest proto message and updating the committers, batchers, and gRPC streams to handle multi-key commit batches. The review feedback suggests a valuable performance optimization in GrpcCommitWorkStream.PendingRequest to defer serialization from the committer thread to the stream writer thread by holding the protobuf message directly and using @AutoValue.Memoized for lazy serialization.
    static PendingRequest create(
        String computationId,
        long shardingKey,
        ByteString serializedCommit,
        StreamingCommitRequestChunk.CommitType commitType,
        Consumer<CommitStatus> onDone) {
      return new AutoValue_GrpcCommitWorkStream_PendingRequest(
          computationId, shardingKey, serializedCommit, commitType, onDone);
    }

    abstract String computationId();

    abstract WorkItemCommitRequest request();
    abstract long shardingKey();

    abstract ByteString serializedCommit();

    abstract StreamingCommitRequestChunk.CommitType commitType();

    abstract Consumer<CommitStatus> onDone();

    private long getBytes() {
      return (long) request().getSerializedSize() + computationId().length();
    }

    private ByteString serializedCommit() {
      return request().toByteString();
      return (long) serializedCommit().size() + computationId().length();
    }
Performance Optimization: Defer Serialization to the Stream Writer Thread
Currently, PendingRequest eagerly serializes the commit request to a ByteString on the committer thread (inside commitWorkItem and commitMultiKeyWorkItem). Eager serialization on the committer thread can become a performance bottleneck under high throughput.
By holding the MessageLite (or the vendored equivalent org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.MessageLite) directly in PendingRequest and using @AutoValue.Memoized to lazily serialize it, we can:
- Defer the serialization overhead to the stream writer thread (when serializedCommit() is actually called).
- Ensure that serialization is only performed once and cached, even if serializedCommit() is accessed multiple times.
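The lazy, cache-once behavior described above can be sketched in plain Java without AutoValue or protobuf. This is an illustration only: `LazyPendingRequest` and its `Supplier<byte[]>` serializer are hypothetical stand-ins for PendingRequest and `request().toByteString()`, and the double-checked locking shown here is one common way to get memoization, not a copy of any generated code.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for PendingRequest; serializes on first access, then caches.
final class LazyPendingRequest {
  private final Supplier<byte[]> serializer;
  // Stays null until the first call to serializedCommit().
  private volatile byte[] serialized;

  LazyPendingRequest(Supplier<byte[]> serializer) {
    this.serializer = serializer;
  }

  // Runs the serializer at most once, on whichever thread asks first.
  byte[] serializedCommit() {
    byte[] result = serialized;
    if (result == null) {
      synchronized (this) {
        result = serialized;
        if (result == null) {
          result = serializer.get();
          serialized = result;
        }
      }
    }
    return result;
  }

  // True once serialization has happened; useful for checking where the cost lands.
  boolean isSerialized() {
    return serialized != null;
  }
}
```

With this shape, constructing the request on the committer thread does no serialization work; the cost is paid by the first caller of `serializedCommit()`. Note that any size accounting that calls `serializedCommit()` early would also trigger serialization on the calling thread.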
    static PendingRequest create(
        String computationId,
        long shardingKey,
        com.google.protobuf.MessageLite request,
        StreamingCommitRequestChunk.CommitType commitType,
        Consumer<CommitStatus> onDone) {
      return new AutoValue_GrpcCommitWorkStream_PendingRequest(
          computationId, shardingKey, request, commitType, onDone);
    }

    abstract String computationId();

    abstract long shardingKey();

    abstract com.google.protobuf.MessageLite request();

    abstract StreamingCommitRequestChunk.CommitType commitType();

    abstract Consumer<CommitStatus> onDone();

    private long getBytes() {
      return (long) serializedCommit().size() + computationId().length();
    }

    @AutoValue.Memoized
    ByteString serializedCommit() {
      return request().toByteString();
    }

    PendingRequest request =
        PendingRequest.create(
            computation,
            commitRequest.getShardingKey(),
            commitRequest.toByteString(),
            StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_SINGLE_KEY,
            onDone);
Pass the commitRequest directly to PendingRequest.create to support lazy serialization.
    - PendingRequest request =
    -     PendingRequest.create(
    -         computation,
    -         commitRequest.getShardingKey(),
    -         commitRequest.toByteString(),
    -         StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_SINGLE_KEY,
    -         onDone);
    + PendingRequest request =
    +     PendingRequest.create(
    +         computation,
    +         commitRequest.getShardingKey(),
    +         commitRequest,
    +         StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_SINGLE_KEY,
    +         onDone);

    PendingRequest request =
        PendingRequest.create(
            computation,
            // Any key in the batch for routing
            commitRequest.getRequests(0).getShardingKey(),
            commitRequest.toByteString(),
            StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_MULTI_KEY,
            onDone);
Pass the commitRequest directly to PendingRequest.create to support lazy serialization.
    - PendingRequest request =
    -     PendingRequest.create(
    -         computation,
    -         // Any key in the batch for routing
    -         commitRequest.getRequests(0).getShardingKey(),
    -         commitRequest.toByteString(),
    -         StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_MULTI_KEY,
    -         onDone);
    + PendingRequest request =
    +     PendingRequest.create(
    +         computation,
    +         // Any key in the batch for routing
    +         commitRequest.getRequests(0).getShardingKey(),
    +         commitRequest,
    +         StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_MULTI_KEY,
    +         onDone);