diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbUtil.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbUtil.java index a25a25586..4f6389e8e 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbUtil.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbUtil.java @@ -34,9 +34,9 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -286,6 +286,28 @@ public static String replicaSetUsedIn(String addresses) { return addresses.substring(0, index); } + private static final String[][] DEFAULT_HA_TIMEOUT_OPTIONS = { + {"serverSelectionTimeoutMS", "15000"}, + {"socketTimeoutMS", "15000"}, + {"maxIdleTimeMS", "30000"} + }; + + public static String appendDefaultHaTimeoutOptions(String mongodbUri) { + if (EmptyKit.isBlank(mongodbUri)) { + return mongodbUri; + } + StringBuilder result = new StringBuilder(mongodbUri); + for (String[] kv : DEFAULT_HA_TIMEOUT_OPTIONS) { + String key = kv[0]; + String value = kv[1]; + Pattern pattern = Pattern.compile("[?&]" + Pattern.quote(key) + "=", Pattern.CASE_INSENSITIVE); + if (!pattern.matcher(result).find()) { + result.append(result.indexOf("?") >= 0 ? '&' : '?').append(key).append('=').append(value); + } + } + return result.toString(); + } + public static String getMongoDBURIOptions(String databaseUri) { String options = null; try { @@ -479,6 +501,7 @@ public static MongoClient createMongoClient(MongodbConfig mongodbConfig) { } public static MongoClientSettings.Builder getMongoClientSettingsBuilder(String mongodbUri, MongodbConfig mongodbConfig) { + mongodbUri = appendDefaultHaTimeoutOptions(mongodbUri); CodecRegistry defaultCodecRegistry = MongoClientSettings.getDefaultCodecRegistry(); CodecRegistry codecRegistry = CodecRegistries.fromRegistries(CodecRegistries.fromCodecs( new TapdataBigDecimalCodec(), diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java index 500240fa6..af0a455de 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java @@ -222,7 +222,7 @@ private void readFromOplog(TapConnectorContext connectorContext, String replicaS final Bson fromMigrateFilter = Filters.exists("fromMigrate", false); - try (MongoClient mongoclient = MongoClients.create(mongodbURI)) { + try (MongoClient mongoclient = MongoClients.create(MongodbUtil.appendDefaultHaTimeoutOptions(mongodbURI))) { final MongoCollection oplogCollection = mongoclient.getDatabase(LOCAL_DATABASE).getCollection(OPLOG_COLLECTION); try (final MongoCursor mongoCursor = oplogCollection.find(fromMigrateFilter).sort(new Document("$natural", 1)).limit(1).cursorType(CursorType.TailableAwait) .noCursorTimeout(true).iterator()) { diff --git a/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbUtilTest.java b/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbUtilTest.java index 2314b5f83..961132463 100644 --- a/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbUtilTest.java +++ b/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbUtilTest.java @@ -72,4 +72,56 @@ void test_load_oplog(){ verify(callback,times(1)).accept(any()); } } + + @Nested + class appendDefaultHaTimeoutOptionsTest { + @Test + void noQueryString_appendsAllThree() { + String uri = "mongodb://host:27017/db"; + String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri); + Assertions.assertEquals( + "mongodb://host:27017/db?serverSelectionTimeoutMS=15000&socketTimeoutMS=15000&maxIdleTimeMS=30000", + result); + } + + @Test + void existingAuthSource_appendsWithAmpersand() { + String uri = "mongodb://u:p@host/db?authSource=admin"; + String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri); + Assertions.assertEquals( + "mongodb://u:p@host/db?authSource=admin&serverSelectionTimeoutMS=15000&socketTimeoutMS=15000&maxIdleTimeMS=30000", + result); + } + + @Test + void userSocketTimeoutPreserved_otherTwoAdded() { + String uri = "mongodb://host/db?socketTimeoutMS=5000"; + String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri); + Assertions.assertTrue(result.contains("socketTimeoutMS=5000")); + Assertions.assertFalse(result.contains("socketTimeoutMS=15000")); + Assertions.assertTrue(result.contains("serverSelectionTimeoutMS=15000")); + Assertions.assertTrue(result.contains("maxIdleTimeMS=30000")); + } + + @Test + void allThreeAlreadySet_returnsUnchanged() { + String uri = "mongodb://host/db?serverSelectionTimeoutMS=20000&socketTimeoutMS=10000&maxIdleTimeMS=45000"; + String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri); + Assertions.assertEquals(uri, result); + } + + @Test + void caseInsensitiveKey_notDuplicated() { + String uri = "mongodb://host/db?SocketTimeoutMS=5000"; + String result = MongodbUtil.appendDefaultHaTimeoutOptions(uri); + Assertions.assertTrue(result.contains("SocketTimeoutMS=5000")); + Assertions.assertFalse(result.contains("socketTimeoutMS=15000")); + } + + @Test + void blankUri_returnedAsIs() { + Assertions.assertNull(MongodbUtil.appendDefaultHaTimeoutOptions(null)); + Assertions.assertEquals("", MongodbUtil.appendDefaultHaTimeoutOptions("")); + } + } }