From 519f489ab2002f43c3a428a97cafe9097f94d212 Mon Sep 17 00:00:00 2001 From: Darshan Siddesh Jagaluru Date: Wed, 3 Jun 2026 06:43:19 +0000 Subject: [PATCH 1/3] [SpannerToSourceDb] Fixing flaky UT and also improving coverage --- .../dbutils/dml/CassandraDMLGenerator.java | 5 +- .../dbutils/dml/DMLGeneratorUtils.java | 5 +- .../dml/CassandraDMLGeneratorTest.java | 171 ++++++++++++++++++ .../dbutils/dml/CassandraTypeHandlerTest.java | 142 +++++++++++++++ .../dbutils/dml/DMLGeneratorUtilsTest.java | 109 +++++++++++ .../processor/SourceProcessorFactoryTest.java | 20 ++ .../transforms/SourceWriterFnTest.java | 61 ------- .../utils/ShadowTableCreatorTest.java | 21 +++ 8 files changed, 471 insertions(+), 63 deletions(-) diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java index 151a03d1f9..457c46ca6f 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGenerator.java @@ -432,7 +432,10 @@ static Map> getPkColumnValues( try { spannerColName = schemaMapper.getSpannerColumnName("", sourceTable.name(), sourceColName); } catch (NoSuchElementException e) { - continue; + LOG.warn( + "The corresponding spanner column for {} was not found in schema mapping", + sourceColName); + return null; } if (spannerColName == null || spannerColName == "") { LOG.warn( diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtils.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtils.java index c040eee771..1f9a55df98 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtils.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtils.java @@ -172,7 +172,10 @@ public static Map getPkColumnValues( try { spannerColName = schemaMapper.getSpannerColumnName("", sourceTable.name(), sourceColName); } catch (NoSuchElementException e) { - continue; + LOG.warn( + "The corresponding spanner column for {} was not found in schema mapping", + sourceColName); + return null; } if (spannerColName == null) { LOG.warn( diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java index d7e427454b..ee44afd6f9 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraDMLGeneratorTest.java @@ -1231,4 +1231,175 @@ public void testGetPkColumnValues_MissingValueInJson() { assertNull(response); } + + @Test + public void testGetDMLStatement_PkColSpannerColNameThrowsNoSuchElement() { + ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class); + Ddl spannerDdl = Ddl.builder().build(); + spannerDdl = Mockito.spy(spannerDdl); + Table spannerTable = Mockito.mock(Table.class); + SourceSchema sourceSchema = Mockito.mock(SourceSchema.class); + SourceTable sourceTable = Mockito.mock(SourceTable.class); + SourceColumn sourceCol = Mockito.mock(SourceColumn.class); + + Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers"); + Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers"); + Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable); + Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId")); + Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol); + Mockito.when(sourceCol.type()).thenReturn("int"); + Mockito.when(sourceTable.name()).thenReturn("Singers"); + + Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId")) + .thenThrow(new NoSuchElementException("Not found")); + + DMLGeneratorRequest request = + new DMLGeneratorRequest.Builder( + "INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00") + .setSchemaMapper(schemaMapper) + .setDdl(spannerDdl) + .setSourceSchema(sourceSchema) + .setCommitTimestamp(Timestamp.now()) + .build(); + + assertThrows( + InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request)); + } + + @Test + public void testGetDMLStatement_PkColSpannerColNameNull() { + ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class); + Ddl spannerDdl = Ddl.builder().build(); + spannerDdl = Mockito.spy(spannerDdl); + Table spannerTable = Mockito.mock(Table.class); + SourceSchema sourceSchema = Mockito.mock(SourceSchema.class); + SourceTable sourceTable = Mockito.mock(SourceTable.class); + SourceColumn sourceCol = Mockito.mock(SourceColumn.class); + + Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers"); + Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers"); + Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable); + Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId")); + Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol); + Mockito.when(sourceCol.type()).thenReturn("int"); + Mockito.when(sourceTable.name()).thenReturn("Singers"); + + Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId")).thenReturn(null); + + DMLGeneratorRequest request = + new DMLGeneratorRequest.Builder( + "INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00") + .setSchemaMapper(schemaMapper) + .setDdl(spannerDdl) + .setSourceSchema(sourceSchema) + .setCommitTimestamp(Timestamp.now()) + .build(); + + assertThrows( + InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request)); + } + + @Test + public void testGetDMLStatement_PkColSpannerColDefNull() { + ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class); + Ddl spannerDdl = Ddl.builder().build(); + spannerDdl = Mockito.spy(spannerDdl); + Table spannerTable = Mockito.mock(Table.class); + SourceSchema sourceSchema = Mockito.mock(SourceSchema.class); + SourceTable sourceTable = Mockito.mock(SourceTable.class); + SourceColumn sourceCol = Mockito.mock(SourceColumn.class); + + Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers"); + Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers"); + Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable); + Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId")); + Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol); + Mockito.when(sourceCol.type()).thenReturn("int"); + Mockito.when(sourceTable.name()).thenReturn("Singers"); + + Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId")) + .thenReturn("SingerId"); + Mockito.when(spannerTable.column("SingerId")).thenReturn(null); + + DMLGeneratorRequest request = + new DMLGeneratorRequest.Builder( + "INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00") + .setSchemaMapper(schemaMapper) + .setDdl(spannerDdl) + .setSourceSchema(sourceSchema) + .setCommitTimestamp(Timestamp.now()) + .build(); + + assertThrows( + InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request)); + } + + @Test + public void testGetDMLStatement_ColValuesSpannerColDefNullAndKeyValuesHasNonPk() { + ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class); + Table spannerTable = Mockito.mock(Table.class); + SourceSchema sourceSchema = Mockito.mock(SourceSchema.class); + SourceTable sourceTable = Mockito.mock(SourceTable.class); + SourceColumn pkCol = Mockito.mock(SourceColumn.class); + SourceColumn nonPkCol = Mockito.mock(SourceColumn.class); + SourceColumn missingCol = Mockito.mock(SourceColumn.class); + Column spannerPkCol = Mockito.mock(Column.class); + Column spannerNonPkCol = Mockito.mock(Column.class); + + Ddl spannerDdl = Ddl.builder().build(); + spannerDdl = Mockito.spy(spannerDdl); + Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers"); + + Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers"); + Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable); + Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId")); + Mockito.when(sourceTable.column("SingerId")).thenReturn(pkCol); + Mockito.when(pkCol.type()).thenReturn("int"); + Mockito.when(sourceTable.name()).thenReturn("Singers"); + + Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId")) + .thenReturn("SingerId"); + Mockito.when(spannerTable.column("SingerId")).thenReturn(spannerPkCol); + Mockito.when(spannerPkCol.name()).thenReturn("SingerId"); + Mockito.when(spannerPkCol.type()).thenReturn(Type.int64()); + + Mockito.when(sourceTable.columns()).thenReturn(ImmutableList.of(pkCol, nonPkCol, missingCol)); + Mockito.when(nonPkCol.name()).thenReturn("LastName"); + Mockito.when(nonPkCol.type()).thenReturn("varchar"); + Mockito.when(missingCol.name()).thenReturn("MissingCol"); + Mockito.when(missingCol.type()).thenReturn("varchar"); + + Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "LastName")) + .thenReturn("LastName"); + Mockito.when(spannerTable.column("LastName")).thenReturn(spannerNonPkCol); + Mockito.when(spannerNonPkCol.name()).thenReturn("LastName"); + Mockito.when(spannerNonPkCol.type()).thenReturn(Type.string()); + + Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "MissingCol")) + .thenReturn("MissingCol"); + Mockito.when(spannerTable.column("MissingCol")).thenReturn(null); + + JSONObject keyValuesJson = new JSONObject(); + keyValuesJson.put("SingerId", "999"); + keyValuesJson.put("LastName", "Smith"); + + DMLGeneratorRequest request = + new DMLGeneratorRequest.Builder( + "INSERT", "Singers", new JSONObject(), keyValuesJson, "+00:00") + .setSchemaMapper(schemaMapper) + .setDdl(spannerDdl) + .setSourceSchema(sourceSchema) + .setCommitTimestamp(Timestamp.now()) + .build(); + + DMLGeneratorResponse response = cassandraDMLGenerator.getDMLStatement(request); + + assertTrue(response instanceof PreparedStatementGeneratedResponse); + PreparedStatementGeneratedResponse prepResponse = (PreparedStatementGeneratedResponse) response; + + assertTrue(prepResponse.getDmlStatement().contains("\"LastName\"")); + assertTrue(!prepResponse.getDmlStatement().contains("\"MissingCol\"")); + + assertEquals(3, prepResponse.getValues().size()); + } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandlerTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandlerTest.java index 48b699de6c..9423cdf84b 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandlerTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/CassandraTypeHandlerTest.java @@ -1803,4 +1803,146 @@ public void testParseAndCastToCassandraType_Map_JSONObject() { assertEquals(1, map.size()); assertEquals("John", map.get("name")); } + + @Test + public void testNullClassToString() { + assertEquals("NULL_CLASS", CassandraTypeHandler.NullClass.INSTANCE.toString()); + } + + @Test + public void testGetColumnValueByTypeForInvalidDuration() { + String columnValue = "invalid_duration"; + String columnName = "total_time"; + Ddl ddl = + Ddl.builder() + .createTable(TEST_TABLE) + .column(columnName) + .string() + .max() + .endColumn() + .endTable() + .build(); + Column spannerCol = ddl.table(TEST_TABLE).column(columnName); + SourceColumn sourceCol = + SourceColumn.builder(SourceDatabaseType.CASSANDRA) + .name(columnName) + .type("duration") + .build(); + + JSONObject valuesJson = new JSONObject(); + valuesJson.put(columnName, columnValue); + + PreparedStatementValueObject result = + getColumnValueByType(spannerCol, sourceCol, valuesJson, null); + assertThrows( + IllegalArgumentException.class, + () -> CassandraTypeHandler.castToExpectedType(result.dataType(), result.value())); + } + + @Test + public void testGetColumnValueByTypeForInvalidIpAddress() { + String columnValue = "invalid_ip"; + String columnName = "ipAddress"; + Ddl ddl = + Ddl.builder() + .createTable(TEST_TABLE) + .column(columnName) + .string() + .max() + .endColumn() + .endTable() + .build(); + Column spannerCol = ddl.table(TEST_TABLE).column(columnName); + SourceColumn sourceCol = + SourceColumn.builder(SourceDatabaseType.CASSANDRA).name(columnName).type("inet").build(); + + JSONObject valuesJson = new JSONObject(); + valuesJson.put(columnName, columnValue); + + PreparedStatementValueObject result = + getColumnValueByType(spannerCol, sourceCol, valuesJson, null); + assertThrows( + IllegalArgumentException.class, + () -> CassandraTypeHandler.castToExpectedType(result.dataType(), result.value())); + } + + @Test + public void testGetColumnValueByTypeForEmptyBytes() { + String columnName = "data"; + String columnValue = ""; + Ddl ddl = + Ddl.builder() + .createTable(TEST_TABLE) + .column(columnName) + .bytes() + .max() + .endColumn() + .endTable() + .build(); + Column spannerCol = ddl.table(TEST_TABLE).column(columnName); + SourceColumn sourceCol = + SourceColumn.builder(SourceDatabaseType.CASSANDRA).name(columnName).type("blob").build(); + + JSONObject valuesJson = new JSONObject(); + valuesJson.put(columnName, columnValue); + + PreparedStatementValueObject result = + getColumnValueByType(spannerCol, sourceCol, valuesJson, null); + assertEquals("blob", result.dataType()); + assertEquals(CassandraTypeHandler.NullClass.INSTANCE, result.value()); + } + + @Test + public void testGetColumnValueByType_InvalidHexBytesException() { + String columnName = "data"; + String columnValue = "invalid_hex_$$"; + Ddl ddl = + Ddl.builder() + .createTable(TEST_TABLE) + .column(columnName) + .bytes() + .max() + .endColumn() + .endTable() + .build(); + Column spannerCol = ddl.table(TEST_TABLE).column(columnName); + SourceColumn sourceCol = + SourceColumn.builder(SourceDatabaseType.CASSANDRA).name(columnName).type("blob").build(); + + JSONObject valuesJson = new JSONObject(); + valuesJson.put(columnName, columnValue); + + assertThrows( + IllegalArgumentException.class, + () -> getColumnValueByType(spannerCol, sourceCol, valuesJson, null)); + } + + @Test + public void testGetColumnValueByType_InvalidTimestampException() { + String columnName = "created_on"; + String columnValue = "invalid_timestamp"; + Ddl ddl = + Ddl.builder() + .createTable(TEST_TABLE) + .column(columnName) + .date() + .endColumn() + .endTable() + .build(); + Column spannerCol = ddl.table(TEST_TABLE).column(columnName); + SourceColumn sourceCol = + SourceColumn.builder(SourceDatabaseType.CASSANDRA) + .name(columnName) + .type("timestamp") + .build(); + + JSONObject valuesJson = new JSONObject(); + valuesJson.put(columnName, columnValue); + + PreparedStatementValueObject result = + getColumnValueByType(spannerCol, sourceCol, valuesJson, null); + assertThrows( + IllegalArgumentException.class, + () -> CassandraTypeHandler.castToExpectedType(result.dataType(), result.value())); + } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtilsTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtilsTest.java index f1e2dca473..1ceb153d8d 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtilsTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dml/DMLGeneratorUtilsTest.java @@ -146,4 +146,113 @@ public void testGetPkColumnValues_HappyPath() { assertEquals(1, response.size()); assertEquals("mapped_val", response.get("col1")); } + + @Test + public void testGetPkColumnValues_SourceColDefNull() { + ISchemaMapper schemaMapper = mock(ISchemaMapper.class); + Table spannerTable = mock(Table.class); + SourceTable sourceTable = mock(SourceTable.class); + + when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("col1")); + when(sourceTable.column("col1")).thenReturn(null); + + Map response = + DMLGeneratorUtils.getPkColumnValues( + schemaMapper, + spannerTable, + sourceTable, + new JSONObject(), + new JSONObject(), + "+00:00", + null, + null); + + assertNull(response); + } + + @Test + public void testGetPkColumnValues_SpannerColNameThrowsNoSuchElement() { + ISchemaMapper schemaMapper = mock(ISchemaMapper.class); + Table spannerTable = mock(Table.class); + SourceTable sourceTable = mock(SourceTable.class); + SourceColumn sourceCol = mock(SourceColumn.class); + + when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("col1")); + when(sourceTable.column("col1")).thenReturn(sourceCol); + when(sourceCol.isGenerated()).thenReturn(false); + when(sourceTable.name()).thenReturn("src_table"); + + when(schemaMapper.getSpannerColumnName("", "src_table", "col1")) + .thenThrow(new java.util.NoSuchElementException("Not found")); + + Map response = + DMLGeneratorUtils.getPkColumnValues( + schemaMapper, + spannerTable, + sourceTable, + new JSONObject(), + new JSONObject(), + "+00:00", + null, + null); + + assertNull(response); + } + + @Test + public void testGetPkColumnValues_SpannerColNameNull() { + ISchemaMapper schemaMapper = mock(ISchemaMapper.class); + Table spannerTable = mock(Table.class); + SourceTable sourceTable = mock(SourceTable.class); + SourceColumn sourceCol = mock(SourceColumn.class); + + when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("col1")); + when(sourceTable.column("col1")).thenReturn(sourceCol); + when(sourceCol.isGenerated()).thenReturn(false); + when(sourceTable.name()).thenReturn("src_table"); + + when(schemaMapper.getSpannerColumnName("", "src_table", "col1")).thenReturn(null); + + Map response = + DMLGeneratorUtils.getPkColumnValues( + schemaMapper, + spannerTable, + sourceTable, + new JSONObject(), + new JSONObject(), + "+00:00", + null, + null); + + assertNull(response); + } + + @Test + public void testGetPkColumnValues_SpannerColDefNull() { + ISchemaMapper schemaMapper = mock(ISchemaMapper.class); + Table spannerTable = mock(Table.class); + SourceTable sourceTable = mock(SourceTable.class); + SourceColumn sourceCol = mock(SourceColumn.class); + + when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("col1")); + when(sourceTable.column("col1")).thenReturn(sourceCol); + when(sourceCol.isGenerated()).thenReturn(false); + when(sourceTable.name()).thenReturn("src_table"); + + when(schemaMapper.getSpannerColumnName("", "src_table", "col1")).thenReturn("spanner_col1"); + when(spannerTable.column("spanner_col1")).thenReturn(null); + + Map response = + DMLGeneratorUtils.getPkColumnValues( + schemaMapper, + spannerTable, + sourceTable, + new JSONObject(), + new JSONObject(), + "+00:00", + null, + null); + + assertNull(response); + } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java index c101b3d710..4e6f4e6415 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java @@ -18,6 +18,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; +import com.google.cloud.teleport.v2.spanner.migrations.connection.IConnectionHelper; import com.google.cloud.teleport.v2.spanner.migrations.connection.JdbcConnectionHelper; import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard; import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; @@ -29,9 +30,12 @@ import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator; import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; +import org.junit.After; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -39,6 +43,22 @@ @RunWith(JUnit4.class) public class SourceProcessorFactoryTest { + + private static Map originalConnectionHelperMap; + + @BeforeClass + public static void setUpBeforeClass() { + originalConnectionHelperMap = new HashMap<>(); + originalConnectionHelperMap.put(Constants.SOURCE_MYSQL, new JdbcConnectionHelper()); + originalConnectionHelperMap.put(Constants.SOURCE_CASSANDRA, new CassandraConnectionHelper()); + originalConnectionHelperMap.put(Constants.SOURCE_POSTGRESQL, new JdbcConnectionHelper()); + } + + @After + public void tearDown() { + SourceProcessorFactory.setConnectionHelperMap(originalConnectionHelperMap); + } + @Test public void testCreateSourceProcessor_validSource() throws Exception { List shards = diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFnTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFnTest.java index 7028b2e3db..c14c8ace6f 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFnTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/transforms/SourceWriterFnTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -69,7 +68,6 @@ import java.sql.SQLSyntaxErrorException; import java.util.HashMap; import java.util.Map; -import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ModType; @@ -80,7 +78,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.junit.Before; import org.junit.FixMethodOrder; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runners.MethodSorters; @@ -1403,62 +1400,4 @@ static Ddl testDdlForNullDML() { .build(); return ddl; } - - @Ignore("Skipping until test issue is resolved") - @Test - public void testSetup_NullSessionFilePath() throws Exception { - SourceWriterFn sourceWriterFn = - new SourceWriterFn( - ImmutableList.of(testShard), - mockSpannerConfig, - testSourceDbTimezoneOffset, - testSourceSchema, - "shadow_", - "skip", - 500, - "mysql", - null, - mockDdlView, - mockShadowTableDdlView, - null, - "", - "", - ""); - - try (MockedStatic mockedSpannerAccessor = mockStatic(SpannerAccessor.class)) { - mockedSpannerAccessor - .when(() -> SpannerAccessor.getOrCreate(any())) - .thenReturn(mock(SpannerAccessor.class)); - sourceWriterFn.setup(); - } - } - - @Ignore("Skipping until test issue is resolved") - @Test - public void testSetup_EmptySessionFilePath() throws Exception { - SourceWriterFn sourceWriterFn = - new SourceWriterFn( - ImmutableList.of(testShard), - mockSpannerConfig, - testSourceDbTimezoneOffset, - testSourceSchema, - "shadow_", - "skip", - 500, - "mysql", - null, - mockDdlView, - mockShadowTableDdlView, - "", - "", - "", - ""); - - try (MockedStatic mockedSpannerAccessor = mockStatic(SpannerAccessor.class)) { - mockedSpannerAccessor - .when(() -> SpannerAccessor.getOrCreate(any())) - .thenReturn(mock(SpannerAccessor.class)); - sourceWriterFn.setup(); - } - } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java index e1b637b478..2e459febff 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java @@ -16,6 +16,7 @@ package com.google.cloud.teleport.v2.templates.utils; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -32,6 +33,7 @@ import com.google.common.collect.ImmutableList; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -319,4 +321,23 @@ public void testCreateShadowTables_NoNewTables() { // Verify that updateDatabaseDdl was NOT called! verify(mockDatabaseClient, never()).updateDatabaseDdl(any(), any(), any(), any()); } + + @Test + public void testCreateShadowTables_Exception() throws Exception { + Ddl primaryDbDdl = getPrimaryDbDdl(); + Ddl metadataDbDdl = getMetadataDbDdl(); + ShadowTableCreator shadowTableCreator = + new ShadowTableCreator( + Dialect.GOOGLE_STANDARD_SQL, + "shadow_", + primaryDbDdl, + metadataDbDdl, + mockSpannerAccessor, + testSpannerConfig); + + when(mockUpdateDatabaseDdlFuture.get(5, TimeUnit.MINUTES)) + .thenThrow(new ExecutionException(new RuntimeException("Spanner error"))); + + assertThrows(RuntimeException.class, () -> shadowTableCreator.createShadowTablesInSpanner()); + } } From dcf33cca08c52c764da60da5500645f635e696bf Mon Sep 17 00:00:00 2001 From: Darshan Siddesh Jagaluru Date: Wed, 3 Jun 2026 06:58:47 +0000 Subject: [PATCH 2/3] Addressing comment --- .../teleport/v2/templates/utils/ShadowTableCreatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java index 2e459febff..0ccdc23d98 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/utils/ShadowTableCreatorTest.java @@ -71,7 +71,7 @@ public void doBeforeEachTest() throws Exception { when(mockSpannerAccessor.getDatabaseAdminClient()).thenReturn(mockDatabaseClient); when(mockDatabaseClient.updateDatabaseDdl(any(), any(), any(), any())) .thenReturn(mockUpdateDatabaseDdlFuture); - when(mockUpdateDatabaseDdlFuture.get(15, TimeUnit.MINUTES)).thenReturn(null); + when(mockUpdateDatabaseDdlFuture.get(5, TimeUnit.MINUTES)).thenReturn(null); } @Test From 8ecd28878064102e385fb11ad8566504a6bc01c9 Mon Sep 17 00:00:00 2001 From: Darshan Siddesh Jagaluru Date: Wed, 3 Jun 2026 07:10:10 +0000 Subject: [PATCH 3/3] Addressing comments --- .../templates/dbutils/processor/SourceProcessorFactory.java | 4 ++++ .../dbutils/processor/SourceProcessorFactoryTest.java | 6 +----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java index 67ff6702f8..0f410f259a 100644 --- a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java +++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactory.java @@ -126,6 +126,10 @@ public static void setConnectionHelperMap(Map connect connectionHelperMap = connectionHelper; } + static Map getConnectionHelperMap() { + return connectionHelperMap; + } + /** * Creates a SourceProcessor instance for the specified source type. * diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java index 4e6f4e6415..a2dce414f8 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/processor/SourceProcessorFactoryTest.java @@ -30,7 +30,6 @@ import com.google.cloud.teleport.v2.templates.dbutils.dml.MySQLDMLGenerator; import com.google.cloud.teleport.v2.templates.exceptions.UnsupportedSourceException; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.junit.After; @@ -48,10 +47,7 @@ public class SourceProcessorFactoryTest { @BeforeClass public static void setUpBeforeClass() { - originalConnectionHelperMap = new HashMap<>(); - originalConnectionHelperMap.put(Constants.SOURCE_MYSQL, new JdbcConnectionHelper()); - originalConnectionHelperMap.put(Constants.SOURCE_CASSANDRA, new CassandraConnectionHelper()); - originalConnectionHelperMap.put(Constants.SOURCE_POSTGRESQL, new JdbcConnectionHelper()); + originalConnectionHelperMap = SourceProcessorFactory.getConnectionHelperMap(); } @After