From 2608ae5e92db6cee8b267677afa8b8d2e864af78 Mon Sep 17 00:00:00 2001 From: hardy <2545433047@qq.com> Date: Fri, 24 Apr 2026 14:50:23 +0800 Subject: [PATCH] fix:TAP-11273 In the master-slave merge and flatten scenario, the foreign key of the sub-table is not unset when the associated key in the main table is modified. --- connectors/mongodb-connector/pom.xml | 2 +- .../io/tapdata/mongodb/MongodbConnector.java | 125 ++++++------------ .../mongodb/writer/MongodbMergeOperate.java | 61 +++++++-- 3 files changed, 92 insertions(+), 96 deletions(-) diff --git a/connectors/mongodb-connector/pom.xml b/connectors/mongodb-connector/pom.xml index 011b1c32..5de905f5 100644 --- a/connectors/mongodb-connector/pom.xml +++ b/connectors/mongodb-connector/pom.xml @@ -13,7 +13,7 @@ 8 - 2.0.7-SNAPSHOT + 2.0.6-SNAPSHOT diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java index cba8d300..4051d322 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java @@ -162,7 +162,6 @@ protected MongoCollection getMongoRawCollection(String table) { public void discoverSchema(TapConnectionContext connectionContext, List tables, int tableSize, Consumer> consumer) throws Throwable { final String database = mongoConfig.getDatabase(); final String version = MongodbUtil.getVersionString(mongoClient, database); - Map collectionTypeMap = getCollectionTypeMap(tables); MongoIterable collectionNames = mongoDatabase.listCollectionNames(); TableFieldTypesGenerator tableFieldTypesGenerator = InstanceFactory.instance(TableFieldTypesGenerator.class); this.stringTypeValueMap = new HashMap<>(); @@ -196,14 +195,8 @@ public void discoverSchema(TapConnectionContext connectionContext, List //List all the tables under the database. List list = list(); nameList.forEach(name -> { - String collectionType = collectionTypeMap.get(name); - boolean isView = isViewCollection(collectionType); TapTable table = new TapTable(name); - if(!isView){ - table.defaultPrimaryKeys("_id"); - }else{ - table.setType(collectionType); - } + table.defaultPrimaryKeys("_id"); MongoCollection collection = documentMap.get(name); try { MongodbUtil.sampleDataRow(collection, sampleSizeBatchSize, (dataRow) -> { @@ -222,48 +215,46 @@ public void discoverSchema(TapConnectionContext connectionContext, List MongodbUtil.maskUriPassword(mongoConfig.getUri()), name, e.getMessage(), e); } - if(!isView){ - collection.listIndexes().forEach((index) -> { - Object keyObj = index.get("key"); - if (!(keyObj instanceof Document)) { - return; - } - Document keys = (Document) keyObj; - - TapIndex tapIndex = new TapIndex(); - // TODO: TapIndex struct not enough to represent index, so we encode index info in name - tapIndex.setName("__t__" + ((Document) index).toJson()); - - AtomicBoolean haveOid = new AtomicBoolean(); - AtomicInteger keyCounter = new AtomicInteger(); - keys.forEach((k, v) -> { - TapIndexField tapIndexField = new TapIndexField().name(k); - if (v instanceof Integer) { - tapIndexField.fieldAsc(v.equals(1)); - } else { - tapIndexField.fieldAsc(true); - } - tapIndex.indexField(tapIndexField); - if (k.equals("_id")) { - haveOid.set(true); - } - keyCounter.incrementAndGet(); - }); - if (Boolean.TRUE.equals(index.get(UNIQUE_KEY))) { - tapIndex.unique(true); + collection.listIndexes().forEach((index) -> { + Object keyObj = index.get("key"); + if (!(keyObj instanceof Document)) { + return; + } + Document keys = (Document) keyObj; + + TapIndex tapIndex = new TapIndex(); + // TODO: TapIndex struct not enough to represent index, so we encode index info in name + tapIndex.setName("__t__" + ((Document) index).toJson()); + + AtomicBoolean haveOid = new AtomicBoolean(); + AtomicInteger keyCounter = new AtomicInteger(); + keys.forEach((k, v) -> { + TapIndexField tapIndexField = new TapIndexField().name(k); + if (v instanceof Integer) { + tapIndexField.fieldAsc(v.equals(1)); } else { - tapIndex.unique(false); + tapIndexField.fieldAsc(true); } - if (haveOid.get() && keyCounter.get() == 1) { - tapIndex.unique(true); + tapIndex.indexField(tapIndexField); + if (k.equals("_id")) { + haveOid.set(true); } - TapLogger.info(TAG, "MongodbConnector discoverSchema table: {} index {}", name, ((Document) index).toJson()); - table.add(tapIndex); + keyCounter.incrementAndGet(); }); - Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name); - MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys); - MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table); - } + if (Boolean.TRUE.equals(index.get(UNIQUE_KEY))) { + tapIndex.unique(true); + } else { + tapIndex.unique(false); + } + if (haveOid.get() && keyCounter.get() == 1) { + tapIndex.unique(true); + } + TapLogger.info(TAG, "MongodbConnector discoverSchema table: {} index {}", name, ((Document) index).toJson()); + table.add(tapIndex); + }); + Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name); + MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys); + MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table); if (!Objects.isNull(table.getNameFieldMap()) && !table.getNameFieldMap().isEmpty()) { list.add(table); } @@ -287,14 +278,8 @@ public void discoverSchema(TapConnectionContext connectionContext, List //List all the tables under the database. List list = list(); nameList.forEach(name -> { - String collectionType = collectionTypeMap.get(name); - boolean isView = isViewCollection(collectionType); TapTable table = new TapTable(name); - if(!isView){ - table.defaultPrimaryKeys(singletonList(COLLECTION_ID_FIELD)); - }else{ - table.setType(collectionType); - } + table.defaultPrimaryKeys(singletonList(COLLECTION_ID_FIELD)); // save collection info which include capped info try (MongoCursor cursor = documentMap.get(name).find().iterator()) { while (cursor.hasNext()) { @@ -307,11 +292,9 @@ public void discoverSchema(TapConnectionContext connectionContext, List break; } } - if(!isView){ - Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name); - MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys); - MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table); - } + Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name); + MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys); + MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table); if (!Objects.isNull(table.getNameFieldMap()) && !table.getNameFieldMap().isEmpty()) { list.add(table); } @@ -1932,30 +1915,4 @@ protected void rollbackTransaction(TapConnectorContext connectorContext) { return null; }); } - - private Map getCollectionTypeMap(List tables) { - Map collectionTypeMap = new HashMap<>(); - ListCollectionsIterable listCollections; - if (CollectionUtils.isNotEmpty(tables)) { - listCollections = mongoDatabase.listCollections().filter(new Document("name", new Document("$in", tables))); - } else { - listCollections = mongoDatabase.listCollections(); - } - for (Document collection : listCollections) { - String name = collection.getString("name"); - if (StringUtils.isBlank(name) || name.startsWith("system.")) { - continue; - } - String type = collection.getString("type"); - if (StringUtils.isBlank(type)) { - type = "collection"; - } - collectionTypeMap.put(name, type); - } - return collectionTypeMap; - } - - private boolean isViewCollection(String collectionType) { - return "view".equalsIgnoreCase(collectionType); - } } diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java index 6966ae6f..2af0e3cd 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java @@ -124,7 +124,8 @@ public static void recursiveMerge( mergeFilter, topLevel, 1, - updateKeyDiffInfos + updateKeyDiffInfos, + extractMainTableKeys(mergeBundle, topLevel) ); } @@ -142,6 +143,39 @@ public static void recursiveMerge( int topLevel, int loopTime, List updateKeyDiffInfos + ) { + recursiveMerge(mergeBundle, properties, mergeResults, mergeLookupResults, mergeResult, unsetResult, + updateJoinKeys, sharedJoinKeys, parentProperties, mergeFilter, topLevel, loopTime, + updateKeyDiffInfos, extractMainTableKeys(mergeBundle, topLevel)); + } + + private static Set extractMainTableKeys(MergeBundle mergeBundle, int topLevel) { + if (topLevel != 1 || mergeBundle == null) { + return Collections.emptySet(); + } + Map after = mergeBundle.getAfter(); + Map source = MapUtils.isNotEmpty(after) ? after : mergeBundle.getBefore(); + if (MapUtils.isEmpty(source)) { + return Collections.emptySet(); + } + return new HashSet<>(source.keySet()); + } + + private static void recursiveMerge( + MergeBundle mergeBundle, + MergeTableProperties properties, + List mergeResults, + List mergeLookupResults, + MergeResult mergeResult, + MergeResult unsetResult, + Map updateJoinKeys, + Set sharedJoinKeys, + MergeTableProperties parentProperties, + MergeFilter mergeFilter, + int topLevel, + int loopTime, + List updateKeyDiffInfos, + Set mainTableKeys ) { boolean unsetResultNull = null == unsetResult; MergeResult updateKeyDiff = null; @@ -153,7 +187,7 @@ public static void recursiveMerge( } break; case updateWrite: - unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel); + unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel, mainTableKeys); if (unsetResultNull) { addUnsetMerge(mergeResults, unsetResult); } @@ -198,7 +232,8 @@ public static void recursiveMerge( mergeFilter, topLevel, privateLoopTime, - null + null, + mainTableKeys ); recursiveOnce = true; } @@ -266,7 +301,8 @@ private static MergeResult addMergeResults(List mergeResults, Merge private static MergeResult updateWriteUnsetMerge( MergeBundle mergeBundle, MergeTableProperties currentProperty, Map updateJoinKeys, - MergeResult mergeResult, Set sharedJoinKeys, MergeFilter mergeFilter, int topLevel) { + MergeResult mergeResult, Set sharedJoinKeys, MergeFilter mergeFilter, int topLevel, + Set mainTableKeys) { if (null == currentProperty) { return mergeResult; } @@ -314,7 +350,15 @@ private static MergeResult updateWriteUnsetMerge( if (null == mergeResult.getOperation()) { mergeResult.setOperation(MergeResult.Operation.UPDATE); } - Document unsetDoc = buildUnsetDocument(sharedJoinKeys, after, targetPath, isArray, firstMergeResult); + Set effectiveSharedJoinKeys = sharedJoinKeys; + if (EmptyKit.isEmpty(targetPath) && CollectionUtils.isNotEmpty(mainTableKeys)) { + effectiveSharedJoinKeys = new HashSet<>(); + if (sharedJoinKeys != null) { + effectiveSharedJoinKeys.addAll(sharedJoinKeys); + } + effectiveSharedJoinKeys.addAll(mainTableKeys); + } + Document unsetDoc = buildUnsetDocument(effectiveSharedJoinKeys, after, targetPath, isArray, firstMergeResult); Document update = mergeResult.getUpdate(); if (update.containsKey(UNSET_KEY)) { update.get(UNSET_KEY, Document.class).putAll(unsetDoc); @@ -854,12 +898,7 @@ protected static Document unsetFilter(Map before, Map joinKey : joinKeys) { String key = joinKey.get("target"); - Object value; - if (topLevel == 1) { - value = MapUtil.getValueByKey(after, key); - } else { - value = MapUtil.getValueByKey(before, key); - } + Object value = MapUtil.getValueByKey(before, key); document.put(key, value); } return document;