From 5226c7fe1952790f00bf8f78f0f7f261321f5324 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 20 Feb 2026 13:31:11 +0100 Subject: [PATCH 1/2] fix: retry CreateSession also when waitForMinSessions is zero The creation of a session at startup would only be retried if waitForMinSessions was set. This changed introduces retries for CreateSession at startup if the error code is potentially transient. The number of retries is set to maximum 10. --- .../MultiplexedSessionDatabaseClient.java | 18 +- .../spanner/ExcludeFromChangeStreamTest.java | 299 ++++++++++++++++++ ...edSessionDatabaseClientMockServerTest.java | 31 ++ 3 files changed, 345 insertions(+), 3 deletions(-) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/ExcludeFromChangeStreamTest.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 867786be84..6162d73f90 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -226,14 +226,18 @@ public void close() { final SettableApiFuture initialSessionReferenceFuture = SettableApiFuture.create(); this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture); - asyncCreateMultiplexedSession(initialSessionReferenceFuture); + + Duration waitDuration = + sessionClient.getSpanner().getOptions().getSessionPoolOptions().getWaitForMinSessions(); + int initialAttempts = waitDuration == null || waitDuration.isZero() ? 10 : 1; + asyncCreateMultiplexedSession(initialSessionReferenceFuture, initialAttempts); maybeWaitForSessionCreation( sessionClient.getSpanner().getOptions().getSessionPoolOptions(), initialSessionReferenceFuture); } private void asyncCreateMultiplexedSession( - SettableApiFuture sessionReferenceFuture) { + SettableApiFuture sessionReferenceFuture, int remainingAttempts) { this.sessionClient.asyncCreateMultiplexedSession( new SessionConsumer() { @Override @@ -263,7 +267,15 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount MultiplexedSessionDatabaseClient.this.resourceNotFoundException.set( (ResourceNotFoundException) spannerException); } + // Set the exception to trigger an error for all waiters. + // Then retry the session creation if the error is (potentially) transient. sessionReferenceFuture.setException(t); + if (remainingAttempts > 1 + && RETRYABLE_ERROR_CODES.contains(spannerException.getErrorCode())) { + final SettableApiFuture future = SettableApiFuture.create(); + MultiplexedSessionDatabaseClient.this.multiplexedSessionReference.set(future); + asyncCreateMultiplexedSession(future, remainingAttempts - 1); + } } }); } @@ -283,7 +295,7 @@ private void maybeWaitForSessionCreation( // If any exception is thrown, then retry the multiplexed session creation if (sessionReferenceFuture == null) { sessionReferenceFuture = SettableApiFuture.create(); - asyncCreateMultiplexedSession(sessionReferenceFuture); + asyncCreateMultiplexedSession(sessionReferenceFuture, 1); this.multiplexedSessionReference.set(sessionReferenceFuture); } try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ExcludeFromChangeStreamTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ExcludeFromChangeStreamTest.java new file mode 100644 index 0000000000..498e2fab10 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ExcludeFromChangeStreamTest.java @@ -0,0 +1,299 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; +import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.connection.AbstractMockServerTest; +import com.google.cloud.spanner.connection.RandomResultSetGenerator; +import com.google.common.collect.ImmutableList; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.ReadRequest; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ExcludeFromChangeStreamTest extends AbstractMockServerTest { + + @BeforeClass + public static void setupReadResult() { + RandomResultSetGenerator generator = new RandomResultSetGenerator(10); + mockSpanner.putStatementResult( + StatementResult.query( + Statement.of("SELECT my-column FROM my-table WHERE 1=1"), generator.generate())); + } + + private Spanner createSpanner() { + return SpannerOptions.newBuilder() + .setProjectId("fake-project") + .setHost("http://localhost:" + getPort()) + .setCredentials(NoCredentials.getInstance()) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build() + .getService(); + } + + @Test + public void testStandardTransaction() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest readRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(readRequest.hasTransaction()); + assertTrue(readRequest.getTransaction().hasBegin()); + assertTrue(readRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(readRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testTransactionAbortedDuringRead() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + AtomicBoolean hasAborted = new AtomicBoolean(false); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + if (hasAborted.compareAndSet(false, true)) { + mockSpanner.abortNextStatement(); + } + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(1, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + BeginTransactionRequest beginRequest = + mockSpanner.getRequestsOfType(BeginTransactionRequest.class).get(0); + assertTrue(beginRequest.getOptions().hasReadWrite()); + assertTrue(beginRequest.getOptions().getExcludeTxnFromChangeStreams()); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasId()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testTransactionAbortedDuringCommit() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + AtomicBoolean hasAborted = new AtomicBoolean(false); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + if (hasAborted.compareAndSet(false, true)) { + mockSpanner.abortNextStatement(); + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasBegin()); + assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + for (CommitRequest commitRequest : mockSpanner.getRequestsOfType(CommitRequest.class)) { + assertNotNull(commitRequest.getTransactionId()); + } + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testReadReturnsUnavailable() { + + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException())); + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasBegin()); + assertTrue(secondReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(secondReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } + + @Test + public void testReadReturnsUnavailableHalfway() { + try (Spanner spanner = createSpanner()) { + for (int i = 0; i < 10; i++) { + mockSpanner.setStreamingReadExecutionTime( + SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 2)); + + DatabaseClient client = + spanner.getDatabaseClient( + DatabaseId.of("fake-project", "fake-instance", "fake-database")); + client + .readWriteTransaction(Options.tag("some-tag"), Options.excludeTxnFromChangeStreams()) + .run( + transaction -> { + try (ResultSet resultSet = + transaction.read("my-table", KeySet.all(), ImmutableList.of("my-column"))) { + while (resultSet.next()) {} + } + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("my-table") + .set("my-column") + .to(1L) + .build()); + return null; + }); + assertEquals(0, mockSpanner.countRequestsOfType(BeginTransactionRequest.class)); + assertEquals(2, mockSpanner.countRequestsOfType(ReadRequest.class)); + assertEquals(1, mockSpanner.countRequestsOfType(CommitRequest.class)); + + ReadRequest firstReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(0); + assertTrue(firstReadRequest.hasTransaction()); + assertTrue(firstReadRequest.getTransaction().hasBegin()); + assertTrue(firstReadRequest.getTransaction().getBegin().hasReadWrite()); + assertTrue(firstReadRequest.getTransaction().getBegin().getExcludeTxnFromChangeStreams()); + + ReadRequest secondReadRequest = mockSpanner.getRequestsOfType(ReadRequest.class).get(1); + assertTrue(secondReadRequest.hasTransaction()); + assertTrue(secondReadRequest.getTransaction().hasId()); + + CommitRequest commitRequest = mockSpanner.getRequestsOfType(CommitRequest.class).get(0); + assertNotNull(commitRequest.getTransactionId()); + + mockSpanner.clearRequests(); + } + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 720b088f14..e0c99fb16c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -101,6 +101,37 @@ public void createSpannerInstance() { .getService(); } + @Test + public void testCreateSessionDeadlineExceeded() { + // Simulate a problem with the CreateSession RPC making it slow. + mockSpanner.setCreateSessionExecutionTime( + SimulatedExecutionTime.ofException(Status.DEADLINE_EXCEEDED.asRuntimeException())); + + Spanner testSpanner = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .build() + .getService(); + DatabaseClient client = testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + // The first attempt should lead to a DEADLINE_EXCEEDED error being propagated from the + // CreateSession attempt. + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + SpannerException exception = assertThrows(SpannerException.class, resultSet::next); + assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode()); + } + + // Remove the simulated problem on the mock server. + // The next attempt should then succeed. + mockSpanner.removeAllExecutionTimes(); + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) {} + } + } + @Test public void testMultiUseReadOnlyTransactionUsesSameSession() { // Execute two queries using the same transaction. Both queries should use the same From b3742d985f54da2d008452e175be431e1e9d0c36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Fri, 20 Feb 2026 14:05:48 +0100 Subject: [PATCH 2/2] chore: address review comment + fix test --- .../spanner/MultiplexedSessionDatabaseClient.java | 12 +++++++++++- ...tiplexedSessionDatabaseClientMockServerTest.java | 13 +++++++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 6162d73f90..1021e1c469 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -53,6 +53,15 @@ * transactions. */ final class MultiplexedSessionDatabaseClient extends AbstractMultiplexedSessionDatabaseClient { + /** + * The maximum number of attempts that the client will try to execute CreateSession for the + * initial multiplexed session. This value is only used for the very first multiplexed session + * that is created, and it is only used if the application has not set a waitForMinSessions value. + * If waitForMinSessions has been set, then the client will retry until the duration in + * waitForMinSessions has been reached. + */ + private static final int MAX_INITIAL_CREATE_SESSION_ATTEMPTS = 10; + @VisibleForTesting static final Statement DETERMINE_DIALECT_STATEMENT = Statement.newBuilder( @@ -229,7 +238,8 @@ public void close() { Duration waitDuration = sessionClient.getSpanner().getOptions().getSessionPoolOptions().getWaitForMinSessions(); - int initialAttempts = waitDuration == null || waitDuration.isZero() ? 10 : 1; + int initialAttempts = + waitDuration == null || waitDuration.isZero() ? MAX_INITIAL_CREATE_SESSION_ATTEMPTS : 1; asyncCreateMultiplexedSession(initialSessionReferenceFuture, initialAttempts); maybeWaitForSessionCreation( sessionClient.getSpanner().getOptions().getSessionPoolOptions(), diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index e0c99fb16c..629b561186 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -377,12 +377,21 @@ public void testRetryWithNoSessionCreationWaitTime() { }); assertEquals(ErrorCode.DEADLINE_EXCEEDED, spannerException.getErrorCode()); + // The CreateSession RPC will be retried, and as the exception is removed by the first call, + // the second attempt will succeed. + try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + List createSessionRequests = mockSpanner.getRequestsOfType(CreateSessionRequest.class); - assertEquals(1, createSessionRequests.size()); + assertEquals(2, createSessionRequests.size()); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); - assertEquals(0, requests.size()); + assertEquals(1, requests.size()); testSpanner.close(); }