diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java index 6966ae6f..2af0e3cd 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/writer/MongodbMergeOperate.java @@ -124,7 +124,8 @@ public static void recursiveMerge( mergeFilter, topLevel, 1, - updateKeyDiffInfos + updateKeyDiffInfos, + extractMainTableKeys(mergeBundle, topLevel) ); } @@ -142,6 +143,39 @@ public static void recursiveMerge( int topLevel, int loopTime, List updateKeyDiffInfos + ) { + recursiveMerge(mergeBundle, properties, mergeResults, mergeLookupResults, mergeResult, unsetResult, + updateJoinKeys, sharedJoinKeys, parentProperties, mergeFilter, topLevel, loopTime, + updateKeyDiffInfos, extractMainTableKeys(mergeBundle, topLevel)); + } + + private static Set extractMainTableKeys(MergeBundle mergeBundle, int topLevel) { + if (topLevel != 1 || mergeBundle == null) { + return Collections.emptySet(); + } + Map after = mergeBundle.getAfter(); + Map source = MapUtils.isNotEmpty(after) ? after : mergeBundle.getBefore(); + if (MapUtils.isEmpty(source)) { + return Collections.emptySet(); + } + return new HashSet<>(source.keySet()); + } + + private static void recursiveMerge( + MergeBundle mergeBundle, + MergeTableProperties properties, + List mergeResults, + List mergeLookupResults, + MergeResult mergeResult, + MergeResult unsetResult, + Map updateJoinKeys, + Set sharedJoinKeys, + MergeTableProperties parentProperties, + MergeFilter mergeFilter, + int topLevel, + int loopTime, + List updateKeyDiffInfos, + Set mainTableKeys ) { boolean unsetResultNull = null == unsetResult; MergeResult updateKeyDiff = null; @@ -153,7 +187,7 @@ public static void recursiveMerge( } break; case updateWrite: - unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel); + unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel, mainTableKeys); if (unsetResultNull) { addUnsetMerge(mergeResults, unsetResult); } @@ -198,7 +232,8 @@ public static void recursiveMerge( mergeFilter, topLevel, privateLoopTime, - null + null, + mainTableKeys ); recursiveOnce = true; } @@ -266,7 +301,8 @@ private static MergeResult addMergeResults(List mergeResults, Merge private static MergeResult updateWriteUnsetMerge( MergeBundle mergeBundle, MergeTableProperties currentProperty, Map updateJoinKeys, - MergeResult mergeResult, Set sharedJoinKeys, MergeFilter mergeFilter, int topLevel) { + MergeResult mergeResult, Set sharedJoinKeys, MergeFilter mergeFilter, int topLevel, + Set mainTableKeys) { if (null == currentProperty) { return mergeResult; } @@ -314,7 +350,15 @@ private static MergeResult updateWriteUnsetMerge( if (null == mergeResult.getOperation()) { mergeResult.setOperation(MergeResult.Operation.UPDATE); } - Document unsetDoc = buildUnsetDocument(sharedJoinKeys, after, targetPath, isArray, firstMergeResult); + Set effectiveSharedJoinKeys = sharedJoinKeys; + if (EmptyKit.isEmpty(targetPath) && CollectionUtils.isNotEmpty(mainTableKeys)) { + effectiveSharedJoinKeys = new HashSet<>(); + if (sharedJoinKeys != null) { + effectiveSharedJoinKeys.addAll(sharedJoinKeys); + } + effectiveSharedJoinKeys.addAll(mainTableKeys); + } + Document unsetDoc = buildUnsetDocument(effectiveSharedJoinKeys, after, targetPath, isArray, firstMergeResult); Document update = mergeResult.getUpdate(); if (update.containsKey(UNSET_KEY)) { update.get(UNSET_KEY, Document.class).putAll(unsetDoc); @@ -854,12 +898,7 @@ protected static Document unsetFilter(Map before, Map joinKey : joinKeys) { String key = joinKey.get("target"); - Object value; - if (topLevel == 1) { - value = MapUtil.getValueByKey(after, key); - } else { - value = MapUtil.getValueByKey(before, key); - } + Object value = MapUtil.getValueByKey(before, key); document.put(key, value); } return document;