From 400b41d0c679f5b7e760a5f1b86e6800c8996638 Mon Sep 17 00:00:00 2001 From: Akshat Sharma Date: Tue, 2 Jun 2026 21:50:48 +0530 Subject: [PATCH 1/3] Fix BigQuery Storage Write API stream count for bounded writes --- .../BigQueryStorageWriteApiSchemaTransformProvider.java | 9 +++++---- .../bigquery/providers/BigQueryWriteConfiguration.java | 3 +-- sdks/python/apache_beam/io/gcp/bigquery.py | 3 +-- .../en/documentation/io/built-in/google-bigquery.md | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index e3d947235015..478245dfed31 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -179,11 +179,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection inputRows = input.getSinglePCollection(); BigQueryIO.Write write = createStorageWriteApiTransform(inputRows.getSchema()); + int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams(); if (inputRows.isBounded() == IsBounded.UNBOUNDED) { Long triggeringFrequency = configuration.getTriggeringFrequencySeconds(); Boolean autoSharding = configuration.getAutoSharding(); - int numStreams = configuration.getNumStreams() == null ? 0 : configuration.getNumStreams(); boolean useAtLeastOnceSemantics = configuration.getUseAtLeastOnceSemantics() != null @@ -197,12 +197,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { : Duration.standardSeconds(triggeringFrequency)); } // set num streams if specified, otherwise default to autoSharding - if (numStreams > 0) { - write = write.withNumStorageWriteApiStreams(numStreams); - } else if (autoSharding == null || autoSharding) { + if (numStreams <= 0 && (autoSharding == null || autoSharding)) { write = write.withAutoSharding(); } } + if (numStreams > 0) { + write = write.withNumStorageWriteApiStreams(numStreams); + } Schema inputSchema = inputRows.getSchema(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 55d7f7c8d72a..77b07adc31d6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -152,8 +152,7 @@ public static Builder builder() { public abstract Boolean getAutoSharding(); @SchemaFieldDescription( - "Specifies the number of write streams that the Storage API sink will use. " - + "This parameter is only applicable when writing unbounded data.") + "Specifies the number of write streams that the Storage API sink will use.") @Nullable public abstract Integer getNumStreams(); diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index d751d60c905f..a2d17f12569e 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -2155,8 +2155,7 @@ def __init__( all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only applicable to unbounded input. num_storage_api_streams: Specifies the number of write streams that the - Storage API sink will use. This parameter is only applicable when - writing unbounded data. + Storage API sink will use. ignore_unknown_columns: Accept rows that contain values that do not match the schema. The unknown values are ignored. Default is False, which treats unknown values as errors. This option is only valid for diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md index 9c205f092663..00cd1ffbcdef 100644 --- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md +++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md @@ -855,7 +855,7 @@ pipeline uses. You can set it explicitly on the transform via [`withNumStorageWriteApiStreams`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withNumStorageWriteApiStreams-int-) or provide the `numStorageWriteApiStreams` option to the pipeline as defined in [`BigQueryOptions`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html). -Please note this is only supported for streaming pipelines. +Fixed stream counts can be used with both batch and streaming pipelines. Triggering frequency determines how soon the data is visible for querying in BigQuery. You can explicitly set it via From 07868d187837737326a00eb4b44b3ce16db27c4c Mon Sep 17 00:00:00 2001 From: Akshat Sharma Date: Tue, 2 Jun 2026 22:53:01 +0530 Subject: [PATCH 2/3] Address numStreams validation feedback --- .../BigQueryStorageWriteApiSchemaTransformProvider.java | 2 +- .../gcp/bigquery/providers/BigQueryWriteConfiguration.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index 478245dfed31..1d618ba685ed 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -197,7 +197,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { : Duration.standardSeconds(triggeringFrequency)); } // set num streams if specified, otherwise default to autoSharding - if (numStreams <= 0 && (autoSharding == null || autoSharding)) { + if (numStreams == 0 && (autoSharding == null || autoSharding)) { write = write.withAutoSharding(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 77b07adc31d6..7f09feb245cd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -101,6 +101,12 @@ public void validate() { Boolean autoSharding = getAutoSharding(); Integer numStreams = getNumStreams(); + if (numStreams != null) { + checkArgument( + numStreams >= 0, + invalidConfigMessage + "numStreams must be non-negative, but was: %s", + numStreams); + } if (autoSharding != null && autoSharding && numStreams != null) { checkArgument( numStreams == 0, From d90ef1c07729c3a466a4189060cdfdb880049d48 Mon Sep 17 00:00:00 2001 From: Akshat Sharma Date: Wed, 3 Jun 2026 15:41:37 +0530 Subject: [PATCH 3/3] test: cover negative BigQuery numStreams validation --- .../BigQueryStorageWriteApiSchemaTransformProviderTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 584309778286..81789f784255 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -122,7 +122,10 @@ public void testInvalidConfig() { Arrays.asList( BigQueryWriteConfiguration.builder() .setTable("project:dataset.table") - .setCreateDisposition("INVALID_DISPOSITION")); + .setCreateDisposition("INVALID_DISPOSITION"), + BigQueryWriteConfiguration.builder() + .setTable("project:dataset.table") + .setNumStreams(-1)); for (BigQueryWriteConfiguration.Builder config : invalidConfigs) { assertThrows(