Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,10 @@ static Map<String, PreparedStatementValueObject<?>> getPkColumnValues(
try {
spannerColName = schemaMapper.getSpannerColumnName("", sourceTable.name(), sourceColName);
} catch (NoSuchElementException e) {
continue;
LOG.warn(
"The corresponding spanner column for {} was not found in schema mapping",
sourceColName);
return null;
}
if (spannerColName == null || spannerColName == "") {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ public static Map<String, String> getPkColumnValues(
try {
spannerColName = schemaMapper.getSpannerColumnName("", sourceTable.name(), sourceColName);
} catch (NoSuchElementException e) {
continue;
LOG.warn(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should restore this change , what if there are 2 pks in mysql , 1 in spanner and the 2nd one in mysql has default set , then the insert statement would be perfectly valid, i tested:

create table test1(id int, id1 int default 1, name varchar(1000), PRIMARY key (id,id1));


insert into test1(id) values (1);

"The corresponding spanner column for {} was not found in schema mapping",
sourceColName);
return null;
}
if (spannerColName == null) {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public static void setConnectionHelperMap(Map<String, IConnectionHelper> connect
connectionHelperMap = connectionHelper;
}

static Map<String, IConnectionHelper> getConnectionHelperMap() {
return connectionHelperMap;
}
Comment thread
darshan-sj marked this conversation as resolved.

/**
* Creates a SourceProcessor instance for the specified source type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1231,4 +1231,175 @@ public void testGetPkColumnValues_MissingValueInJson() {

assertNull(response);
}

@Test
public void testGetDMLStatement_PkColSpannerColNameThrowsNoSuchElement() {
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
Ddl spannerDdl = Ddl.builder().build();
spannerDdl = Mockito.spy(spannerDdl);
Table spannerTable = Mockito.mock(Table.class);
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
SourceTable sourceTable = Mockito.mock(SourceTable.class);
SourceColumn sourceCol = Mockito.mock(SourceColumn.class);

Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol);
Mockito.when(sourceCol.type()).thenReturn("int");
Mockito.when(sourceTable.name()).thenReturn("Singers");

Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId"))
.thenThrow(new NoSuchElementException("Not found"));

DMLGeneratorRequest request =
new DMLGeneratorRequest.Builder(
"INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00")
.setSchemaMapper(schemaMapper)
.setDdl(spannerDdl)
.setSourceSchema(sourceSchema)
.setCommitTimestamp(Timestamp.now())
.build();

assertThrows(
InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request));
}

@Test
public void testGetDMLStatement_PkColSpannerColNameNull() {
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
Ddl spannerDdl = Ddl.builder().build();
spannerDdl = Mockito.spy(spannerDdl);
Table spannerTable = Mockito.mock(Table.class);
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
SourceTable sourceTable = Mockito.mock(SourceTable.class);
SourceColumn sourceCol = Mockito.mock(SourceColumn.class);

Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol);
Mockito.when(sourceCol.type()).thenReturn("int");
Mockito.when(sourceTable.name()).thenReturn("Singers");

Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId")).thenReturn(null);

DMLGeneratorRequest request =
new DMLGeneratorRequest.Builder(
"INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00")
.setSchemaMapper(schemaMapper)
.setDdl(spannerDdl)
.setSourceSchema(sourceSchema)
.setCommitTimestamp(Timestamp.now())
.build();

assertThrows(
InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request));
}

@Test
public void testGetDMLStatement_PkColSpannerColDefNull() {
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
Ddl spannerDdl = Ddl.builder().build();
spannerDdl = Mockito.spy(spannerDdl);
Table spannerTable = Mockito.mock(Table.class);
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
SourceTable sourceTable = Mockito.mock(SourceTable.class);
SourceColumn sourceCol = Mockito.mock(SourceColumn.class);

Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");
Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
Mockito.when(sourceTable.column("SingerId")).thenReturn(sourceCol);
Mockito.when(sourceCol.type()).thenReturn("int");
Mockito.when(sourceTable.name()).thenReturn("Singers");

Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId"))
.thenReturn("SingerId");
Mockito.when(spannerTable.column("SingerId")).thenReturn(null);

DMLGeneratorRequest request =
new DMLGeneratorRequest.Builder(
"INSERT", "Singers", new JSONObject(), new JSONObject(), "+00:00")
.setSchemaMapper(schemaMapper)
.setDdl(spannerDdl)
.setSourceSchema(sourceSchema)
.setCommitTimestamp(Timestamp.now())
.build();

assertThrows(
InvalidDMLGenerationException.class, () -> cassandraDMLGenerator.getDMLStatement(request));
}

@Test
public void testGetDMLStatement_ColValuesSpannerColDefNullAndKeyValuesHasNonPk() {
ISchemaMapper schemaMapper = Mockito.mock(ISchemaMapper.class);
Table spannerTable = Mockito.mock(Table.class);
SourceSchema sourceSchema = Mockito.mock(SourceSchema.class);
SourceTable sourceTable = Mockito.mock(SourceTable.class);
SourceColumn pkCol = Mockito.mock(SourceColumn.class);
SourceColumn nonPkCol = Mockito.mock(SourceColumn.class);
SourceColumn missingCol = Mockito.mock(SourceColumn.class);
Column spannerPkCol = Mockito.mock(Column.class);
Column spannerNonPkCol = Mockito.mock(Column.class);

Ddl spannerDdl = Ddl.builder().build();
spannerDdl = Mockito.spy(spannerDdl);
Mockito.doReturn(spannerTable).when(spannerDdl).table("Singers");

Mockito.when(schemaMapper.getSourceTableName("", "Singers")).thenReturn("Singers");
Mockito.when(sourceSchema.table("Singers")).thenReturn(sourceTable);
Mockito.when(sourceTable.primaryKeyColumns()).thenReturn(ImmutableList.of("SingerId"));
Mockito.when(sourceTable.column("SingerId")).thenReturn(pkCol);
Mockito.when(pkCol.type()).thenReturn("int");
Mockito.when(sourceTable.name()).thenReturn("Singers");

Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "SingerId"))
.thenReturn("SingerId");
Mockito.when(spannerTable.column("SingerId")).thenReturn(spannerPkCol);
Mockito.when(spannerPkCol.name()).thenReturn("SingerId");
Mockito.when(spannerPkCol.type()).thenReturn(Type.int64());

Mockito.when(sourceTable.columns()).thenReturn(ImmutableList.of(pkCol, nonPkCol, missingCol));
Mockito.when(nonPkCol.name()).thenReturn("LastName");
Mockito.when(nonPkCol.type()).thenReturn("varchar");
Mockito.when(missingCol.name()).thenReturn("MissingCol");
Mockito.when(missingCol.type()).thenReturn("varchar");

Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "LastName"))
.thenReturn("LastName");
Mockito.when(spannerTable.column("LastName")).thenReturn(spannerNonPkCol);
Mockito.when(spannerNonPkCol.name()).thenReturn("LastName");
Mockito.when(spannerNonPkCol.type()).thenReturn(Type.string());

Mockito.when(schemaMapper.getSpannerColumnName("", "Singers", "MissingCol"))
.thenReturn("MissingCol");
Mockito.when(spannerTable.column("MissingCol")).thenReturn(null);

JSONObject keyValuesJson = new JSONObject();
keyValuesJson.put("SingerId", "999");
keyValuesJson.put("LastName", "Smith");

DMLGeneratorRequest request =
new DMLGeneratorRequest.Builder(
"INSERT", "Singers", new JSONObject(), keyValuesJson, "+00:00")
.setSchemaMapper(schemaMapper)
.setDdl(spannerDdl)
.setSourceSchema(sourceSchema)
.setCommitTimestamp(Timestamp.now())
.build();

DMLGeneratorResponse response = cassandraDMLGenerator.getDMLStatement(request);

assertTrue(response instanceof PreparedStatementGeneratedResponse);
PreparedStatementGeneratedResponse prepResponse = (PreparedStatementGeneratedResponse) response;

assertTrue(prepResponse.getDmlStatement().contains("\"LastName\""));
assertTrue(!prepResponse.getDmlStatement().contains("\"MissingCol\""));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to use assertFalse here for readbility


assertEquals(3, prepResponse.getValues().size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1803,4 +1803,146 @@ public void testParseAndCastToCassandraType_Map_JSONObject() {
assertEquals(1, map.size());
assertEquals("John", map.get("name"));
}

@Test
public void testNullClassToString() {
assertEquals("NULL_CLASS", CassandraTypeHandler.NullClass.INSTANCE.toString());
}

@Test
public void testGetColumnValueByTypeForInvalidDuration() {
String columnValue = "invalid_duration";
String columnName = "total_time";
Ddl ddl =
Ddl.builder()
.createTable(TEST_TABLE)
.column(columnName)
.string()
.max()
.endColumn()
.endTable()
.build();
Column spannerCol = ddl.table(TEST_TABLE).column(columnName);
SourceColumn sourceCol =
SourceColumn.builder(SourceDatabaseType.CASSANDRA)
.name(columnName)
.type("duration")
.build();

JSONObject valuesJson = new JSONObject();
valuesJson.put(columnName, columnValue);

PreparedStatementValueObject result =
getColumnValueByType(spannerCol, sourceCol, valuesJson, null);
assertThrows(
IllegalArgumentException.class,
() -> CassandraTypeHandler.castToExpectedType(result.dataType(), result.value()));
}

@Test
public void testGetColumnValueByTypeForInvalidIpAddress() {
String columnValue = "invalid_ip";
String columnName = "ipAddress";
Ddl ddl =
Ddl.builder()
.createTable(TEST_TABLE)
.column(columnName)
.string()
.max()
.endColumn()
.endTable()
.build();
Column spannerCol = ddl.table(TEST_TABLE).column(columnName);
SourceColumn sourceCol =
SourceColumn.builder(SourceDatabaseType.CASSANDRA).name(columnName).type("inet").build();

JSONObject valuesJson = new JSONObject();
valuesJson.put(columnName, columnValue);

PreparedStatementValueObject result =
getColumnValueByType(spannerCol, sourceCol, valuesJson, null);
assertThrows(
IllegalArgumentException.class,
() -> CassandraTypeHandler.castToExpectedType(result.dataType(), result.value()));
}

@Test
public void testGetColumnValueByTypeForEmptyBytes() {
String columnName = "data";
String columnValue = "";
Ddl ddl =
Ddl.builder()
.createTable(TEST_TABLE)
.column(columnName)
.bytes()
.max()
.endColumn()
.endTable()
.build();
Column spannerCol = ddl.table(TEST_TABLE).column(columnName);
SourceColumn sourceCol =
SourceColumn.builder(SourceDatabaseType.CASSANDRA).name(columnName).type("blob").build();

JSONObject valuesJson = new JSONObject();
valuesJson.put(columnName, columnValue);

PreparedStatementValueObject result =
getColumnValueByType(spannerCol, sourceCol, valuesJson, null);
assertEquals("blob", result.dataType());
assertEquals(CassandraTypeHandler.NullClass.INSTANCE, result.value());
}

@Test
public void testGetColumnValueByType_InvalidHexBytesException() {
String columnName = "data";
String columnValue = "invalid_hex_$$";
Ddl ddl =
Ddl.builder()
.createTable(TEST_TABLE)
.column(columnName)
.bytes()
.max()
.endColumn()
.endTable()
.build();
Column spannerCol = ddl.table(TEST_TABLE).column(columnName);
SourceColumn sourceCol =
SourceColumn.builder(SourceDatabaseType.CASSANDRA).name(columnName).type("blob").build();

JSONObject valuesJson = new JSONObject();
valuesJson.put(columnName, columnValue);

assertThrows(
IllegalArgumentException.class,
() -> getColumnValueByType(spannerCol, sourceCol, valuesJson, null));
}

@Test
public void testGetColumnValueByType_InvalidTimestampException() {
String columnName = "created_on";
String columnValue = "invalid_timestamp";
Ddl ddl =
Ddl.builder()
.createTable(TEST_TABLE)
.column(columnName)
.date()
.endColumn()
.endTable()
.build();
Column spannerCol = ddl.table(TEST_TABLE).column(columnName);
SourceColumn sourceCol =
SourceColumn.builder(SourceDatabaseType.CASSANDRA)
.name(columnName)
.type("timestamp")
.build();

JSONObject valuesJson = new JSONObject();
valuesJson.put(columnName, columnValue);

PreparedStatementValueObject result =
getColumnValueByType(spannerCol, sourceCol, valuesJson, null);
assertThrows(
IllegalArgumentException.class,
() -> CassandraTypeHandler.castToExpectedType(result.dataType(), result.value()));
}
}
Loading
Loading