From 3c0df3565fe8545e779502846273c6c9c9eb2a5c Mon Sep 17 00:00:00 2001 From: Adam Seering Date: Tue, 24 Feb 2026 17:28:43 +0000 Subject: [PATCH 1/2] Address race condition --- google/cloud/spanner_v1/snapshot.py | 4 ++++ google/cloud/spanner_v1/transaction.py | 6 ++++++ 2 files changed, 10 insertions(+) diff --git a/google/cloud/spanner_v1/snapshot.py b/google/cloud/spanner_v1/snapshot.py index a7abcdaaa3..e83018b587 100644 --- a/google/cloud/spanner_v1/snapshot.py +++ b/google/cloud/spanner_v1/snapshot.py @@ -616,6 +616,10 @@ def _get_streamed_result_set( if self._transaction_id is None: is_inline_begin = True self._lock.acquire() + if self._transaction_id is not None: + is_inline_begin = False + self._lock.release() + request.transaction = TransactionSelector(id=self._transaction_id) iterator = _restart_on_unavailable( method=method, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 413ac0af1f..ac0486f9fa 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -497,6 +497,9 @@ def execute_update( if self._transaction_id is None: is_inline_begin = True self._lock.acquire() + if self._transaction_id is not None: + is_inline_begin = False + self._lock.release() execute_sql_request = ExecuteSqlRequest( session=session.name, @@ -651,6 +654,9 @@ def batch_update( if self._transaction_id is None: is_inline_begin = True self._lock.acquire() + if self._transaction_id is not None: + is_inline_begin = False + self._lock.release() execute_batch_dml_request = ExecuteBatchDmlRequest( session=session.name, From 24214727ebe39604bc04c400cc3073775bacfa02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 25 Feb 2026 15:50:16 +0100 Subject: [PATCH 2/2] test: do not reset request count for concurrent tests --- tests/unit/test_spanner.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_spanner.py b/tests/unit/test_spanner.py index ecd7d4fd86..d980cd0e4a 100644 --- a/tests/unit/test_spanner.py +++ b/tests/unit/test_spanner.py @@ -333,6 +333,7 @@ def _read_helper( count=0, partition=None, directed_read_options=None, + concurrent=False, ): VALUES = [["bharney", 31], ["phred", 32]] VALUE_PBS = [[_make_value_pb(item) for item in row] for row in VALUES] @@ -359,7 +360,8 @@ def _read_helper( result_sets[i].values.extend(VALUE_PBS[i]) api.streaming_read.return_value = _MockIterator(*result_sets) - transaction._read_request_count = count + if not concurrent: + transaction._read_request_count = count if partition is not None: # 'limit' and 'partition' incompatible result_set = transaction.read( @@ -386,7 +388,8 @@ def _read_helper( directed_read_options=directed_read_options, ) - self.assertEqual(transaction._read_request_count, count + 1) + if not concurrent: + self.assertEqual(transaction._read_request_count, count + 1) self.assertEqual(list(result_set), VALUES) self.assertEqual(result_set.metadata, metadata_pb) @@ -1105,13 +1108,13 @@ def test_transaction_for_concurrent_statement_should_begin_one_transaction_with_ threads.append( threading.Thread( target=self._read_helper, - kwargs={"transaction": transaction, "api": api}, + kwargs={"transaction": transaction, "api": api, "concurrent": True}, ) ) threads.append( threading.Thread( target=self._read_helper, - kwargs={"transaction": transaction, "api": api}, + kwargs={"transaction": transaction, "api": api, "concurrent": True}, ) ) for thread in threads: