From 2608ae5e92db6cee8b267677afa8b8d2e864af78 Mon Sep 17 00:00:00 2001
From: hardy <2545433047@qq.com>
Date: Fri, 24 Apr 2026 14:50:23 +0800
Subject: [PATCH] fix: TAP-11273 In the master-slave merge and flatten scenario,
the foreign key of the sub-table is not unset when the associated key in the
main table is modified.
---
connectors/mongodb-connector/pom.xml | 2 +-
.../io/tapdata/mongodb/MongodbConnector.java | 125 ++++++------------
.../mongodb/writer/MongodbMergeOperate.java | 61 +++++++--
3 files changed, 92 insertions(+), 96 deletions(-)
diff --git a/connectors/mongodb-connector/pom.xml b/connectors/mongodb-connector/pom.xml
index 011b1c32..5de905f5 100644
--- a/connectors/mongodb-connector/pom.xml
+++ b/connectors/mongodb-connector/pom.xml
@@ -13,7 +13,7 @@
8
- 2.0.7-SNAPSHOT
+ 2.0.6-SNAPSHOT
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
index cba8d300..4051d322 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
@@ -162,7 +162,6 @@ protected MongoCollection getMongoRawCollection(String table) {
public void discoverSchema(TapConnectionContext connectionContext, List tables, int tableSize, Consumer> consumer) throws Throwable {
final String database = mongoConfig.getDatabase();
final String version = MongodbUtil.getVersionString(mongoClient, database);
- Map collectionTypeMap = getCollectionTypeMap(tables);
MongoIterable collectionNames = mongoDatabase.listCollectionNames();
TableFieldTypesGenerator tableFieldTypesGenerator = InstanceFactory.instance(TableFieldTypesGenerator.class);
this.stringTypeValueMap = new HashMap<>();
@@ -196,14 +195,8 @@ public void discoverSchema(TapConnectionContext connectionContext, List
//List all the tables under the database.
List list = list();
nameList.forEach(name -> {
- String collectionType = collectionTypeMap.get(name);
- boolean isView = isViewCollection(collectionType);
TapTable table = new TapTable(name);
- if(!isView){
- table.defaultPrimaryKeys("_id");
- }else{
- table.setType(collectionType);
- }
+ table.defaultPrimaryKeys("_id");
MongoCollection> collection = documentMap.get(name);
try {
MongodbUtil.sampleDataRow(collection, sampleSizeBatchSize, (dataRow) -> {
@@ -222,48 +215,46 @@ public void discoverSchema(TapConnectionContext connectionContext, List
MongodbUtil.maskUriPassword(mongoConfig.getUri()), name, e.getMessage(), e);
}
- if(!isView){
- collection.listIndexes().forEach((index) -> {
- Object keyObj = index.get("key");
- if (!(keyObj instanceof Document)) {
- return;
- }
- Document keys = (Document) keyObj;
-
- TapIndex tapIndex = new TapIndex();
- // TODO: TapIndex struct not enough to represent index, so we encode index info in name
- tapIndex.setName("__t__" + ((Document) index).toJson());
-
- AtomicBoolean haveOid = new AtomicBoolean();
- AtomicInteger keyCounter = new AtomicInteger();
- keys.forEach((k, v) -> {
- TapIndexField tapIndexField = new TapIndexField().name(k);
- if (v instanceof Integer) {
- tapIndexField.fieldAsc(v.equals(1));
- } else {
- tapIndexField.fieldAsc(true);
- }
- tapIndex.indexField(tapIndexField);
- if (k.equals("_id")) {
- haveOid.set(true);
- }
- keyCounter.incrementAndGet();
- });
- if (Boolean.TRUE.equals(index.get(UNIQUE_KEY))) {
- tapIndex.unique(true);
+ collection.listIndexes().forEach((index) -> {
+ Object keyObj = index.get("key");
+ if (!(keyObj instanceof Document)) {
+ return;
+ }
+ Document keys = (Document) keyObj;
+
+ TapIndex tapIndex = new TapIndex();
+ // TODO: TapIndex struct not enough to represent index, so we encode index info in name
+ tapIndex.setName("__t__" + ((Document) index).toJson());
+
+ AtomicBoolean haveOid = new AtomicBoolean();
+ AtomicInteger keyCounter = new AtomicInteger();
+ keys.forEach((k, v) -> {
+ TapIndexField tapIndexField = new TapIndexField().name(k);
+ if (v instanceof Integer) {
+ tapIndexField.fieldAsc(v.equals(1));
} else {
- tapIndex.unique(false);
+ tapIndexField.fieldAsc(true);
}
- if (haveOid.get() && keyCounter.get() == 1) {
- tapIndex.unique(true);
+ tapIndex.indexField(tapIndexField);
+ if (k.equals("_id")) {
+ haveOid.set(true);
}
- TapLogger.info(TAG, "MongodbConnector discoverSchema table: {} index {}", name, ((Document) index).toJson());
- table.add(tapIndex);
+ keyCounter.incrementAndGet();
});
- Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
- MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
- MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
- }
+ if (Boolean.TRUE.equals(index.get(UNIQUE_KEY))) {
+ tapIndex.unique(true);
+ } else {
+ tapIndex.unique(false);
+ }
+ if (haveOid.get() && keyCounter.get() == 1) {
+ tapIndex.unique(true);
+ }
+ TapLogger.info(TAG, "MongodbConnector discoverSchema table: {} index {}", name, ((Document) index).toJson());
+ table.add(tapIndex);
+ });
+ Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
+ MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
+ MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
if (!Objects.isNull(table.getNameFieldMap()) && !table.getNameFieldMap().isEmpty()) {
list.add(table);
}
@@ -287,14 +278,8 @@ public void discoverSchema(TapConnectionContext connectionContext, List
//List all the tables under the database.
List list = list();
nameList.forEach(name -> {
- String collectionType = collectionTypeMap.get(name);
- boolean isView = isViewCollection(collectionType);
TapTable table = new TapTable(name);
- if(!isView){
- table.defaultPrimaryKeys(singletonList(COLLECTION_ID_FIELD));
- }else{
- table.setType(collectionType);
- }
+ table.defaultPrimaryKeys(singletonList(COLLECTION_ID_FIELD));
// save collection info which include capped info
try (MongoCursor cursor = documentMap.get(name).find().iterator()) {
while (cursor.hasNext()) {
@@ -307,11 +292,9 @@ public void discoverSchema(TapConnectionContext connectionContext, List
break;
}
}
- if(!isView){
- Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
- MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
- MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
- }
+ Map sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
+ MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
+ MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
if (!Objects.isNull(table.getNameFieldMap()) && !table.getNameFieldMap().isEmpty()) {
list.add(table);
}
@@ -1932,30 +1915,4 @@ protected void rollbackTransaction(TapConnectorContext connectorContext) {
return null;
});
}
-
- private Map getCollectionTypeMap(List tables) {
- Map collectionTypeMap = new HashMap<>();
- ListCollectionsIterable listCollections;
- if (CollectionUtils.isNotEmpty(tables)) {
- listCollections = mongoDatabase.listCollections().filter(new Document("name", new Document("$in", tables)));
- } else {
- listCollections = mongoDatabase.listCollections();
- }
- for (Document collection : listCollections) {
- String name = collection.getString("name");
- if (StringUtils.isBlank(name) || name.startsWith("system.")) {
- continue;
- }
- String type = collection.getString("type");
- if (StringUtils.isBlank(type)) {
- type = "collection";
- }
- collectionTypeMap.put(name, type);
- }
- return collectionTypeMap;
- }
-
- private boolean isViewCollection(String collectionType) {
- return "view".equalsIgnoreCase(collectionType);
- }
}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java
index 6966ae6f..2af0e3cd 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java
@@ -124,7 +124,8 @@ public static void recursiveMerge(
mergeFilter,
topLevel,
1,
- updateKeyDiffInfos
+ updateKeyDiffInfos,
+ extractMainTableKeys(mergeBundle, topLevel)
);
}
@@ -142,6 +143,39 @@ public static void recursiveMerge(
int topLevel,
int loopTime,
List updateKeyDiffInfos
+ ) {
+ recursiveMerge(mergeBundle, properties, mergeResults, mergeLookupResults, mergeResult, unsetResult,
+ updateJoinKeys, sharedJoinKeys, parentProperties, mergeFilter, topLevel, loopTime,
+ updateKeyDiffInfos, extractMainTableKeys(mergeBundle, topLevel));
+ }
+
+ private static Set extractMainTableKeys(MergeBundle mergeBundle, int topLevel) {
+ // Keys are collected only when topLevel is 1 and an event bundle is present.
+ if (mergeBundle == null || topLevel != 1) {
+ return Collections.emptySet();
+ }
+ // Prefer the after image; fall back to the before image when after is null or empty.
+ Map afterImage = mergeBundle.getAfter();
+ Map row = MapUtils.isEmpty(afterImage) ? mergeBundle.getBefore() : afterImage;
+ // Return a copy of the key set rather than the map's own key view.
+ return MapUtils.isEmpty(row) ? Collections.emptySet() : new HashSet<>(row.keySet());
+ }
+
+ private static void recursiveMerge(
+ MergeBundle mergeBundle,
+ MergeTableProperties properties,
+ List mergeResults,
+ List mergeLookupResults,
+ MergeResult mergeResult,
+ MergeResult unsetResult,
+ Map updateJoinKeys,
+ Set sharedJoinKeys,
+ MergeTableProperties parentProperties,
+ MergeFilter mergeFilter,
+ int topLevel,
+ int loopTime,
+ List updateKeyDiffInfos,
+ Set mainTableKeys
) {
boolean unsetResultNull = null == unsetResult;
MergeResult updateKeyDiff = null;
@@ -153,7 +187,7 @@ public static void recursiveMerge(
}
break;
case updateWrite:
- unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel);
+ unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel, mainTableKeys);
if (unsetResultNull) {
addUnsetMerge(mergeResults, unsetResult);
}
@@ -198,7 +232,8 @@ public static void recursiveMerge(
mergeFilter,
topLevel,
privateLoopTime,
- null
+ null,
+ mainTableKeys
);
recursiveOnce = true;
}
@@ -266,7 +301,8 @@ private static MergeResult addMergeResults(List mergeResults, Merge
private static MergeResult updateWriteUnsetMerge(
MergeBundle mergeBundle, MergeTableProperties currentProperty,
Map updateJoinKeys,
- MergeResult mergeResult, Set sharedJoinKeys, MergeFilter mergeFilter, int topLevel) {
+ MergeResult mergeResult, Set sharedJoinKeys, MergeFilter mergeFilter, int topLevel,
+ Set mainTableKeys) {
if (null == currentProperty) {
return mergeResult;
}
@@ -314,7 +350,15 @@ private static MergeResult updateWriteUnsetMerge(
if (null == mergeResult.getOperation()) {
mergeResult.setOperation(MergeResult.Operation.UPDATE);
}
- Document unsetDoc = buildUnsetDocument(sharedJoinKeys, after, targetPath, isArray, firstMergeResult);
+ Set effectiveSharedJoinKeys = sharedJoinKeys;
+ if (EmptyKit.isEmpty(targetPath) && CollectionUtils.isNotEmpty(mainTableKeys)) {
+ effectiveSharedJoinKeys = new HashSet<>();
+ if (sharedJoinKeys != null) {
+ effectiveSharedJoinKeys.addAll(sharedJoinKeys);
+ }
+ effectiveSharedJoinKeys.addAll(mainTableKeys);
+ }
+ Document unsetDoc = buildUnsetDocument(effectiveSharedJoinKeys, after, targetPath, isArray, firstMergeResult);
Document update = mergeResult.getUpdate();
if (update.containsKey(UNSET_KEY)) {
update.get(UNSET_KEY, Document.class).putAll(unsetDoc);
@@ -854,12 +898,7 @@ protected static Document unsetFilter(Map before, Map joinKey : joinKeys) {
String key = joinKey.get("target");
- Object value;
- if (topLevel == 1) {
- value = MapUtil.getValueByKey(after, key);
- } else {
- value = MapUtil.getValueByKey(before, key);
- }
+ Object value = MapUtil.getValueByKey(before, key);
document.put(key, value);
}
return document;