From 55272839037511b3363c49022cd457d0739f3f0b Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 28 Apr 2026 19:10:54 -0700 Subject: [PATCH 1/5] enable connection pooling by default --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 2f16a64b0d76..85754aca9441 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -158,7 +158,7 @@ public interface BigQueryOptions + " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning " + "the number of connections created by the connection pool with minConnectionPoolConnections and maxConnectionPoolConnections. " + "For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management") - @Default.Boolean(false) + @Default.Boolean(true) Boolean getUseStorageApiConnectionPool(); void setUseStorageApiConnectionPool(Boolean value); From 571493ca1d47c1a5fd6189463afa879fb9361e02 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 26 May 2026 16:50:40 -0700 Subject: [PATCH 2/5] fix assertion failure --- .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 2dfc8b2f1c00..f06aedccd371 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -164,9 +164,6 @@ public StorageApiWriteUnshardedRecords( public PCollectionTuple expand(PCollection> input) { String operationName = input.getName() + "/" + getName(); BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); - org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument( - !options.getUseStorageApiConnectionPool(), - "useStorageApiConnectionPool only supported " + "when using STORAGE_API_AT_LEAST_ONCE"); TupleTagList tupleTagList = TupleTagList.of(failedRowsTag); if (successfulRowsTag != null) { tupleTagList = tupleTagList.and(successfulRowsTag); @@ -1139,7 +1136,7 @@ DestinationState createDestinationState( writeStreamService, useDefaultStream, streamAppendClientCount, - bigQueryOptions.getUseStorageApiConnectionPool(), + useDefaultStream ? bigQueryOptions.getUseStorageApiConnectionPool() : false, bigQueryOptions.getStorageWriteApiMaxRequestSize(), tryCreateTable, useCdc); From 19c419f158e9d5ce6ca3b5afb39da96bd16ed56c Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 2 Jun 2026 16:43:38 -0700 Subject: [PATCH 3/5] fix exception handling --- .../beam/sdk/io/gcp/bigquery/CreateTableHelpers.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 7c428917503f..41c617ef2088 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -32,6 +32,7 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.Map; @@ -74,7 +75,14 @@ static void createTableWrapper(Callable action, Callable tryCreat try { action.call(); return; - } catch (ApiException | StatusRuntimeException e) { + } catch (ApiException | StatusRuntimeException | UncheckedExecutionException e) { + // The Storage Write library can wrap errors in UncheckedExecutionException + if (e instanceof UncheckedExecutionException) { + Throwable cause = e.getCause(); + if (!(cause instanceof ApiException || cause instanceof StatusRuntimeException)) { + throw e; + } + } lastException = e; // TODO: Once BigQuery reliably returns a consistent error on table not found, we should // only try creating From 736608cdfddc6dc8be9bd73be3487b8d25ba16d0 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 2 Jun 2026 17:06:09 -0700 Subject: [PATCH 4/5] generalize --- .../io/gcp/bigquery/CreateTableHelpers.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 41c617ef2088..837a820798b5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -32,10 +32,11 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; -import com.google.common.util.concurrent.UncheckedExecutionException; +import com.google.common.base.Throwables; import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -70,19 +71,23 @@ public class CreateTableHelpers { static void createTableWrapper(Callable action, Callable tryCreateTable) throws Exception { BackOff backoff = BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff()); - RuntimeException lastException = null; + Exception lastException = null; do { try { action.call(); return; - } catch (ApiException | StatusRuntimeException | UncheckedExecutionException e) { + } catch (Exception e) { // The Storage Write library can wrap errors in UncheckedExecutionException - if (e instanceof UncheckedExecutionException) { - Throwable cause = e.getCause(); - if (!(cause instanceof ApiException || cause instanceof StatusRuntimeException)) { - throw e; - } + Optional handledCause = + Throwables.getCausalChain(e).stream() + .filter( + cause -> + (cause instanceof ApiException || cause instanceof StatusRuntimeException)) + .findAny(); + if (!handledCause.isPresent()) { + throw e; } + lastException = e; // TODO: Once BigQuery reliably returns a consistent error on table not found, we should // only try creating From 947f14e46d47543f537ede00eebf3516a5a8199b Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 2 Jun 2026 18:47:31 -0700 Subject: [PATCH 5/5] fix import --- .../org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 837a820798b5..2dd278706be8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -32,7 +32,6 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; -import com.google.common.base.Throwables; import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.Map; @@ -50,6 +49,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration;