diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 2f16a64b0d76..85754aca9441 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -158,7 +158,7 @@ public interface BigQueryOptions + " mode. This is recommended if your write operation is creating 20+ connections. When using multiplexing, consider tuning " + "the number of connections created by the connection pool with minConnectionPoolConnections and maxConnectionPoolConnections. " + "For more information, see https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management") - @Default.Boolean(false) + @Default.Boolean(true) Boolean getUseStorageApiConnectionPool(); void setUseStorageApiConnectionPool(Boolean value); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 7c428917503f..2dd278706be8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -35,6 +35,7 @@ import io.grpc.StatusRuntimeException; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -48,6 +49,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -69,12 +71,23 @@ public class CreateTableHelpers { static void createTableWrapper(Callable action, Callable tryCreateTable) throws Exception { BackOff backoff = BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff()); - RuntimeException lastException = null; + Exception lastException = null; do { try { action.call(); return; - } catch (ApiException | StatusRuntimeException e) { + } catch (Exception e) { + // The Storage Write library can wrap errors in UncheckedExecutionException + Optional handledCause = + Throwables.getCausalChain(e).stream() + .filter( + cause -> + (cause instanceof ApiException || cause instanceof StatusRuntimeException)) + .findAny(); + if (!handledCause.isPresent()) { + throw e; + } + lastException = e; // TODO: Once BigQuery reliably returns a consistent error on table not found, we should // only try creating diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 2dfc8b2f1c00..f06aedccd371 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -164,9 +164,6 @@ public StorageApiWriteUnshardedRecords( public PCollectionTuple expand(PCollection> input) { String operationName = input.getName() + "/" + getName(); BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); - org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument( - !options.getUseStorageApiConnectionPool(), - "useStorageApiConnectionPool only supported " + "when using STORAGE_API_AT_LEAST_ONCE"); TupleTagList tupleTagList = TupleTagList.of(failedRowsTag); if (successfulRowsTag != null) { tupleTagList = tupleTagList.and(successfulRowsTag); @@ -1139,7 +1136,7 @@ DestinationState createDestinationState( writeStreamService, useDefaultStream, streamAppendClientCount, - bigQueryOptions.getUseStorageApiConnectionPool(), + useDefaultStream ? bigQueryOptions.getUseStorageApiConnectionPool() : false, bigQueryOptions.getStorageWriteApiMaxRequestSize(), tryCreateTable, useCdc);