[BigQueryIO] Improve sharding for batch pipelines #38783
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request optimizes sharding behavior for bounded PCollections within the BigQuery Storage API write path. By introducing a key-based redistribution strategy for bounded inputs, it ensures that data destined for the same table is grouped effectively, which improves connection efficiency and resource utilization when writing to dynamic destinations.
Code Review
This pull request updates the sharding logic in StorageApiLoads.java to handle bounded and unbounded PCollections differently when numShards is specified. For bounded collections, it introduces a custom sharding mechanism based on the hash of the destination table's URN. The review feedback points out a critical bug where using Math.abs with modulo can result in a negative shard index if the value is Integer.MIN_VALUE, and suggests using Math.floorMod instead to ensure a non-negative shard index.
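The overflow case flagged above can be reproduced in isolation. This is a minimal sketch, assuming the original pattern was `Math.abs(hash) % numShards`; the class and method names are hypothetical and are not taken from the PR.

```java
// Hypothetical demo class; names are illustrative and not part of StorageApiLoads.
class ShardIndexDemo {
    // Assumed buggy pattern: Math.abs(Integer.MIN_VALUE) overflows and stays negative,
    // so the remainder can be negative as well.
    static int absShard(int hash, int numShards) {
        return Math.abs(hash) % numShards;
    }

    // Suggested fix: Math.floorMod returns a value in [0, numShards) for a positive modulus.
    static int floorModShard(int hash, int numShards) {
        return Math.floorMod(hash, numShards);
    }

    public static void main(String[] args) {
        int numShards = 3;
        System.out.println(absShard(Integer.MIN_VALUE, numShards));      // prints -2
        System.out.println(floorModShard(Integer.MIN_VALUE, numShards)); // prints 1
    }
}
```

Only `Integer.MIN_VALUE` triggers the negative result, which is why this kind of bug tends to survive ordinary testing.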
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
0361f51 to c3a5d94
/gemini review
Code Review
This pull request updates StorageApiLoads to handle sharding differently for bounded and unbounded inputs when numShards > 0. For bounded inputs, it applies a custom sharding key based on the table URN hash and a random integer before redistributing. However, using WithKeys.of to call dynamicDestinations.getTable(...) can fail at runtime if the implementation relies on side inputs. It is recommended to use a custom DoFn with ParDo to properly support side inputs.
b3ae7eb to d94b56a
/gemini review
Code Review
This pull request updates StorageApiLoads to handle sharding differently for bounded and unbounded PCollections when numShards > 0. For bounded inputs, it introduces a new AddShardKeyFn to assign shard keys based on the table URN hash and a random factor, redistributes by key, and then removes the shard key. The feedback points out that AddShardKeyFn must be declared as a static inner class to prevent serialization issues at runtime, as non-static inner classes maintain an implicit reference to their enclosing class.
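The serialization concern can be illustrated with plain Java serialization, without any Beam dependency. This is a sketch under assumptions: `Enclosing` is a hypothetical stand-in for a non-serializable outer class, and `InnerFn` and `StaticFn` are illustrative names rather than the PR's `AddShardKeyFn`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo; none of these classes come from the PR.
class InnerClassSerializationDemo {
    // Stand-in for an enclosing class that is not Serializable.
    static class Enclosing {
        int numShards = 4;

        // Non-static inner class: carries an implicit reference to its Enclosing instance.
        class InnerFn implements Serializable {
            int shards() {
                return numShards; // reads a field of the enclosing instance
            }
        }

        // Static nested class: no reference to any Enclosing instance.
        static class StaticFn implements Serializable {
            final int numShards;

            StaticFn(int numShards) {
                this.numShards = numShards;
            }
        }

        InnerFn newInner() {
            return new InnerFn();
        }
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Enclosing().newInner()));    // prints false
        System.out.println(canSerialize(new Enclosing.StaticFn(4)));     // prints true
    }
}
```

Passing the needed values through the constructor, as `StaticFn` does here, is the usual way to keep such a function self-contained.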
4a26812 to a8b014c
/gemini review
Code Review
This pull request updates StorageApiLoads.java to handle sharding differently for bounded and unbounded inputs. For bounded inputs, it introduces a new AddShardKeyFn to assign shard keys before redistributing. Feedback on this implementation points out that AddShardKeyFn should be static to avoid serialization issues, and that its sharding logic should use a per-bundle random offset rather than a per-element random number to properly group elements by destination.
a8b014c to 32f0844
/gemini review
Code Review
This pull request introduces sharding for bounded inputs in 'StorageApiLoads' when 'numShards > 0' by adding a new 'AddShardKeyFn' to assign shard keys. The review feedback highlights two critical issues in 'AddShardKeyFn': first, using XOR with 'ThreadLocalRandom' leads to non-uniform shard distribution when 'numShards' is not a power of two; second, keying solely by the shard key interleaves multiple destinations, causing connection thrashing downstream. The reviewer recommends using addition instead of XOR and a composite key 'KV<DestT, Integer>' to ensure uniform distribution and efficient connection management.
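The non-uniformity claim can be checked by enumerating every possible random offset instead of sampling. This is a sketch assuming the XOR variant had the form `floorMod(hash ^ r, numShards)` with `r` drawn from `[0, numShards)`; the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical demo; the exact expression used in the PR is an assumption.
class XorVsAddDemo {
    // Counts how often each shard is selected as r sweeps 0..numShards-1, combining with XOR.
    static int[] xorCounts(int hash, int numShards) {
        int[] counts = new int[numShards];
        for (int r = 0; r < numShards; r++) {
            counts[Math.floorMod(hash ^ r, numShards)]++;
        }
        return counts;
    }

    // Same sweep, combining with addition: each r maps to a distinct residue.
    static int[] addCounts(int hash, int numShards) {
        int[] counts = new int[numShards];
        for (int r = 0; r < numShards; r++) {
            counts[Math.floorMod(hash + r, numShards)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(xorCounts(1, 3))); // prints [2, 1, 0]
        System.out.println(Arrays.toString(addCounts(1, 3))); // prints [1, 1, 1]
    }
}
```

With `hash = 1` and three shards, XOR never reaches shard 2, because `1 ^ 2 = 3` wraps back to shard 0, while addition covers every shard exactly once.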
32f0844 to 5a118eb
Assigning reviewers:
R: @chamikaramj for label java.
The PR bot will only process comments in the main thread (not review comments).
    int hash = Hashing.murmur3_32_fixed().hashString(tableUrn, StandardCharsets.UTF_8).asInt();

    int shardKey =
        Math.floorMod(hash + ThreadLocalRandom.current().nextInt(numShards), numShards);
Can you add a comment on what you are trying to do?
As is, I don't see what the hashing is doing, because we add a random number to it that has the same range as the modulus.
If you want a given table to go to some particular key, then we shouldn't have the randomness. If you want it to go to some subset of keys, then it seems like the random nextInt bound should be less than numShards.
It would be good to have a test verifying the behavior.
Thanks Sam. Refactored, updated the comment, and also tested. The goal is to have affinity for elements sharing the same table, plus sharding if we want to increase parallelism per table at the cost of an increased connection count.
The number of concurrent connections is then at most number_of_tables * numShards.
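The connection bound described above can be sketched with a composite (table, shard) key. This is an illustration only: the refactored code is not shown in this thread, so the string key format, the seeded `Random`, and all names below are assumptions.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical demo of per-table affinity plus sharding; not the PR's implementation.
class CompositeShardKeyDemo {
    // The key keeps the table identity and adds a random shard in [0, numShards).
    static String shardKey(String tableUrn, int numShards, Random random) {
        return tableUrn + "#" + random.nextInt(numShards);
    }

    // Assigns keys to elementsPerTable elements for each table and collects the distinct keys.
    static Set<String> distinctKeys(
            List<String> tableUrns, int numShards, int elementsPerTable, long seed) {
        Random random = new Random(seed);
        Set<String> keys = new HashSet<>();
        for (String tableUrn : tableUrns) {
            for (int i = 0; i < elementsPerTable; i++) {
                keys.add(shardKey(tableUrn, numShards, random));
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("project:dataset.t1", "project:dataset.t2");
        Set<String> keys = distinctKeys(tables, 3, 1000, 42L);
        // The number of distinct keys can never exceed tables.size() * numShards.
        System.out.println(keys.size() <= tables.size() * 3); // prints true
    }
}
```

Because every key embeds its table, elements for different destinations never share a key, and the number of distinct keys is capped at number_of_tables * numShards.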
      convertMessagesResult.get(successfulConvertedRowsTag);

-    if (numShards > 0) {
+    if (numShards > 0 && input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
I think this should check if the pipeline is in streaming mode or not instead, as streaming pipelines can process bounded pcollections.
thanks! should be correct now.
5a118eb to 22350a6
22350a6 to c4d81ee
Improve sharding for bounded PCollections to better control concurrent connections. This keeps elements for the same destination close to each other and shards them. For a single-table write the behaviour is unchanged; for dynamic destinations it reduces the number of connections used.
This is a batch-only change. A streaming improvement will need more thought, as that would be a breaking change.