Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -286,6 +286,28 @@ public static String replicaSetUsedIn(String addresses) {
return addresses.substring(0, index);
}

private static final String[][] DEFAULT_HA_TIMEOUT_OPTIONS = {
{"serverSelectionTimeoutMS", "15000"},
{"socketTimeoutMS", "15000"},
{"maxIdleTimeMS", "30000"}
};

public static String appendDefaultHaTimeoutOptions(String mongodbUri) {
if (EmptyKit.isBlank(mongodbUri)) {
return mongodbUri;
}
StringBuilder result = new StringBuilder(mongodbUri);
for (String[] kv : DEFAULT_HA_TIMEOUT_OPTIONS) {
String key = kv[0];
String value = kv[1];
Pattern pattern = Pattern.compile("[?&]" + Pattern.quote(key) + "=", Pattern.CASE_INSENSITIVE);
if (!pattern.matcher(result).find()) {
result.append(result.indexOf("?") >= 0 ? '&' : '?').append(key).append('=').append(value);
}
}
return result.toString();
}

public static String getMongoDBURIOptions(String databaseUri) {
String options = null;
try {
Expand Down Expand Up @@ -479,6 +501,7 @@ public static MongoClient createMongoClient(MongodbConfig mongodbConfig) {
}

public static MongoClientSettings.Builder getMongoClientSettingsBuilder(String mongodbUri, MongodbConfig mongodbConfig) {
mongodbUri = appendDefaultHaTimeoutOptions(mongodbUri);
CodecRegistry defaultCodecRegistry = MongoClientSettings.getDefaultCodecRegistry();
CodecRegistry codecRegistry = CodecRegistries.fromRegistries(CodecRegistries.fromCodecs(
new TapdataBigDecimalCodec(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private void readFromOplog(TapConnectorContext connectorContext, String replicaS

final Bson fromMigrateFilter = Filters.exists("fromMigrate", false);

try (MongoClient mongoclient = MongoClients.create(mongodbURI)) {
try (MongoClient mongoclient = MongoClients.create(MongodbUtil.appendDefaultHaTimeoutOptions(mongodbURI))) {
final MongoCollection<Document> oplogCollection = mongoclient.getDatabase(LOCAL_DATABASE).getCollection(OPLOG_COLLECTION);
try (final MongoCursor<Document> mongoCursor = oplogCollection.find(fromMigrateFilter).sort(new Document("$natural", 1)).limit(1).cursorType(CursorType.TailableAwait)
.noCursorTimeout(true).iterator()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,56 @@ void test_load_oplog(){
verify(callback,times(1)).accept(any());
}
}

@Nested
class appendDefaultHaTimeoutOptionsTest {
@Test
void noQueryString_appendsAllThree() {
String uri = "mongodb://host:27017/db";
String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri);
Assertions.assertEquals(
"mongodb://host:27017/db?serverSelectionTimeoutMS=15000&socketTimeoutMS=15000&maxIdleTimeMS=30000",
result);
}

@Test
void existingAuthSource_appendsWithAmpersand() {
String uri = "mongodb://u:p@host/db?authSource=admin";
String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri);
Assertions.assertEquals(
"mongodb://u:p@host/db?authSource=admin&serverSelectionTimeoutMS=15000&socketTimeoutMS=15000&maxIdleTimeMS=30000",
result);
}

@Test
void userSocketTimeoutPreserved_otherTwoAdded() {
String uri = "mongodb://host/db?socketTimeoutMS=5000";
String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri);
Assertions.assertTrue(result.contains("socketTimeoutMS=5000"));
Assertions.assertFalse(result.contains("socketTimeoutMS=15000"));
Assertions.assertTrue(result.contains("serverSelectionTimeoutMS=15000"));
Assertions.assertTrue(result.contains("maxIdleTimeMS=30000"));
}

@Test
void allThreeAlreadySet_returnsUnchanged() {
String uri = "mongodb://host/db?serverSelectionTimeoutMS=20000&socketTimeoutMS=10000&maxIdleTimeMS=45000";
String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri);
Assertions.assertEquals(uri, result);
}

@Test
void caseInsensitiveKey_notDuplicated() {
String uri = "mongodb://host/db?SocketTimeoutMS=5000";
String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri);
Assertions.assertTrue(result.contains("SocketTimeoutMS=5000"));
Assertions.assertFalse(result.contains("socketTimeoutMS=15000"));
}

@Test
void blankUri_returnedAsIs() {
Assertions.assertNull(MongodbUtil.appendDefaultHaTimeoutOptions(null));
Assertions.assertEquals("", MongodbUtil.appendDefaultHaTimeoutOptions(""));
}
}
}