Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion connectors/mongodb-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

<properties>
<java.version>8</java.version>
<tapdata.pdk.api.version>2.0.7-SNAPSHOT</tapdata.pdk.api.version>
<tapdata.pdk.api.version>2.0.6-SNAPSHOT</tapdata.pdk.api.version>
</properties>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ protected MongoCollection<RawBsonDocument> getMongoRawCollection(String table) {
public void discoverSchema(TapConnectionContext connectionContext, List<String> tables, int tableSize, Consumer<List<TapTable>> consumer) throws Throwable {
final String database = mongoConfig.getDatabase();
final String version = MongodbUtil.getVersionString(mongoClient, database);
Map<String, String> collectionTypeMap = getCollectionTypeMap(tables);
MongoIterable<String> collectionNames = mongoDatabase.listCollectionNames();
TableFieldTypesGenerator tableFieldTypesGenerator = InstanceFactory.instance(TableFieldTypesGenerator.class);
this.stringTypeValueMap = new HashMap<>();
Expand Down Expand Up @@ -196,14 +195,8 @@ public void discoverSchema(TapConnectionContext connectionContext, List<String>
//List all the tables under the database.
List<TapTable> list = list();
nameList.forEach(name -> {
String collectionType = collectionTypeMap.get(name);
boolean isView = isViewCollection(collectionType);
TapTable table = new TapTable(name);
if(!isView){
table.defaultPrimaryKeys("_id");
}else{
table.setType(collectionType);
}
table.defaultPrimaryKeys("_id");
MongoCollection<?> collection = documentMap.get(name);
try {
MongodbUtil.sampleDataRow(collection, sampleSizeBatchSize, (dataRow) -> {
Expand All @@ -222,48 +215,46 @@ public void discoverSchema(TapConnectionContext connectionContext, List<String>
MongodbUtil.maskUriPassword(mongoConfig.getUri()), name, e.getMessage(), e);
}

if(!isView){
collection.listIndexes().forEach((index) -> {
Object keyObj = index.get("key");
if (!(keyObj instanceof Document)) {
return;
}
Document keys = (Document) keyObj;

TapIndex tapIndex = new TapIndex();
// TODO: TapIndex struct not enough to represent index, so we encode index info in name
tapIndex.setName("__t__" + ((Document) index).toJson());

AtomicBoolean haveOid = new AtomicBoolean();
AtomicInteger keyCounter = new AtomicInteger();
keys.forEach((k, v) -> {
TapIndexField tapIndexField = new TapIndexField().name(k);
if (v instanceof Integer) {
tapIndexField.fieldAsc(v.equals(1));
} else {
tapIndexField.fieldAsc(true);
}
tapIndex.indexField(tapIndexField);
if (k.equals("_id")) {
haveOid.set(true);
}
keyCounter.incrementAndGet();
});
if (Boolean.TRUE.equals(index.get(UNIQUE_KEY))) {
tapIndex.unique(true);
collection.listIndexes().forEach((index) -> {
Object keyObj = index.get("key");
if (!(keyObj instanceof Document)) {
return;
}
Document keys = (Document) keyObj;

TapIndex tapIndex = new TapIndex();
// TODO: TapIndex struct not enough to represent index, so we encode index info in name
tapIndex.setName("__t__" + ((Document) index).toJson());

AtomicBoolean haveOid = new AtomicBoolean();
AtomicInteger keyCounter = new AtomicInteger();
keys.forEach((k, v) -> {
TapIndexField tapIndexField = new TapIndexField().name(k);
if (v instanceof Integer) {
tapIndexField.fieldAsc(v.equals(1));
} else {
tapIndex.unique(false);
tapIndexField.fieldAsc(true);
}
if (haveOid.get() && keyCounter.get() == 1) {
tapIndex.unique(true);
tapIndex.indexField(tapIndexField);
if (k.equals("_id")) {
haveOid.set(true);
}
TapLogger.info(TAG, "MongodbConnector discoverSchema table: {} index {}", name, ((Document) index).toJson());
table.add(tapIndex);
keyCounter.incrementAndGet();
});
Map<String, Object> sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
}
if (Boolean.TRUE.equals(index.get(UNIQUE_KEY))) {
tapIndex.unique(true);
} else {
tapIndex.unique(false);
}
if (haveOid.get() && keyCounter.get() == 1) {
tapIndex.unique(true);
}
TapLogger.info(TAG, "MongodbConnector discoverSchema table: {} index {}", name, ((Document) index).toJson());
table.add(tapIndex);
});
Map<String, Object> sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
if (!Objects.isNull(table.getNameFieldMap()) && !table.getNameFieldMap().isEmpty()) {
list.add(table);
}
Expand All @@ -287,14 +278,8 @@ public void discoverSchema(TapConnectionContext connectionContext, List<String>
//List all the tables under the database.
List<TapTable> list = list();
nameList.forEach(name -> {
String collectionType = collectionTypeMap.get(name);
boolean isView = isViewCollection(collectionType);
TapTable table = new TapTable(name);
if(!isView){
table.defaultPrimaryKeys(singletonList(COLLECTION_ID_FIELD));
}else{
table.setType(collectionType);
}
table.defaultPrimaryKeys(singletonList(COLLECTION_ID_FIELD));
// save collection info which include capped info
try (MongoCursor<BsonDocument> cursor = documentMap.get(name).find().iterator()) {
while (cursor.hasNext()) {
Expand All @@ -307,11 +292,9 @@ public void discoverSchema(TapConnectionContext connectionContext, List<String>
break;
}
}
if(!isView){
Map<String, Object> sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
}
Map<String, Object> sharkedKeys = MongodbUtil.getCollectionSharkedKeys(mongoClient, database, name);
MongoShardUtil.saveCollectionStats(table, MongodbUtil.getCollectionStatus(mongoClient, database, name), sharkedKeys);
MongodbUtil.getTimeSeriesCollectionStatus(mongoClient, database, name,table);
if (!Objects.isNull(table.getNameFieldMap()) && !table.getNameFieldMap().isEmpty()) {
list.add(table);
}
Expand Down Expand Up @@ -1932,30 +1915,4 @@ protected void rollbackTransaction(TapConnectorContext connectorContext) {
return null;
});
}

/**
 * Builds a map from collection name to its MongoDB collection type as reported
 * by {@code listCollections} (e.g. "collection" or "view"), used during schema
 * discovery to tell views apart from regular collections.
 *
 * @param tables optional list of collection names to restrict the lookup to;
 *               when null or empty, every collection in the database is listed
 * @return map of collection name to type; entries with a blank name or a name
 *         starting with "system." are skipped, and a blank type defaults to
 *         "collection"
 */
private Map<String, String> getCollectionTypeMap(List<String> tables) {
Map<String, String> collectionTypeMap = new HashMap<>();
ListCollectionsIterable<Document> listCollections;
if (CollectionUtils.isNotEmpty(tables)) {
// Restrict the server-side listing to the requested tables via a $in filter on "name".
listCollections = mongoDatabase.listCollections().filter(new Document("name", new Document("$in", tables)));
} else {
listCollections = mongoDatabase.listCollections();
}
for (Document collection : listCollections) {
String name = collection.getString("name");
// Internal "system.*" collections are never part of the discovered schema.
if (StringUtils.isBlank(name) || name.startsWith("system.")) {
continue;
}
String type = collection.getString("type");
// Older servers may omit "type"; treat absence as an ordinary collection.
if (StringUtils.isBlank(type)) {
type = "collection";
}
collectionTypeMap.put(name, type);
}
return collectionTypeMap;
}

/**
 * Reports whether the given collection type (as returned by listCollections)
 * identifies a MongoDB view rather than a regular collection.
 *
 * @param collectionType collection type string; may be null
 * @return true when the type equals "view", ignoring case; false otherwise
 */
private boolean isViewCollection(String collectionType) {
    final String viewTypeName = "view";
    // equalsIgnoreCase on the constant is null-safe for collectionType.
    return viewTypeName.equalsIgnoreCase(collectionType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public static void recursiveMerge(
mergeFilter,
topLevel,
1,
updateKeyDiffInfos
updateKeyDiffInfos,
extractMainTableKeys(mergeBundle, topLevel)
);
}

Expand All @@ -142,6 +143,39 @@ public static void recursiveMerge(
int topLevel,
int loopTime,
List<MapDiffUtil.KeyDiffInfo> updateKeyDiffInfos
) {
recursiveMerge(mergeBundle, properties, mergeResults, mergeLookupResults, mergeResult, unsetResult,
updateJoinKeys, sharedJoinKeys, parentProperties, mergeFilter, topLevel, loopTime,
updateKeyDiffInfos, extractMainTableKeys(mergeBundle, topLevel));
}

/**
 * Collects the field names of the main (top-level) table's record so they can
 * be treated as shared keys during an unset merge.
 *
 * @param mergeBundle bundle carrying the before/after images of the record
 * @param topLevel    level marker; only level 1 (the main table) yields keys
 * @return the key set of the after image when non-empty, otherwise of the
 *         before image; an empty set when neither is available or the call is
 *         not for the top level
 */
private static Set<String> extractMainTableKeys(MergeBundle mergeBundle, int topLevel) {
    // Only the main table at the top level contributes keys.
    if (mergeBundle == null || topLevel != 1) {
        return Collections.emptySet();
    }
    // Prefer the after image; fall back to the before image when after is empty.
    Map<String, Object> after = mergeBundle.getAfter();
    Map<String, Object> candidate;
    if (MapUtils.isNotEmpty(after)) {
        candidate = after;
    } else {
        candidate = mergeBundle.getBefore();
    }
    return MapUtils.isEmpty(candidate) ? Collections.emptySet() : new HashSet<>(candidate.keySet());
}

private static void recursiveMerge(
MergeBundle mergeBundle,
MergeTableProperties properties,
List<MergeResult> mergeResults,
List<MergeLookupResult> mergeLookupResults,
MergeResult mergeResult,
MergeResult unsetResult,
Map<String, MergeInfo.UpdateJoinKey> updateJoinKeys,
Set<String> sharedJoinKeys,
MergeTableProperties parentProperties,
MergeFilter mergeFilter,
int topLevel,
int loopTime,
List<MapDiffUtil.KeyDiffInfo> updateKeyDiffInfos,
Set<String> mainTableKeys
) {
boolean unsetResultNull = null == unsetResult;
MergeResult updateKeyDiff = null;
Expand All @@ -153,7 +187,7 @@ public static void recursiveMerge(
}
break;
case updateWrite:
unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel);
unsetResult = updateWriteUnsetMerge(mergeBundle, properties, updateJoinKeys, unsetResult, sharedJoinKeys, mergeFilter, topLevel, mainTableKeys);
if (unsetResultNull) {
addUnsetMerge(mergeResults, unsetResult);
}
Expand Down Expand Up @@ -198,7 +232,8 @@ public static void recursiveMerge(
mergeFilter,
topLevel,
privateLoopTime,
null
null,
mainTableKeys
);
recursiveOnce = true;
}
Expand Down Expand Up @@ -266,7 +301,8 @@ private static MergeResult addMergeResults(List<MergeResult> mergeResults, Merge
private static MergeResult updateWriteUnsetMerge(
MergeBundle mergeBundle, MergeTableProperties currentProperty,
Map<String, MergeInfo.UpdateJoinKey> updateJoinKeys,
MergeResult mergeResult, Set<String> sharedJoinKeys, MergeFilter mergeFilter, int topLevel) {
MergeResult mergeResult, Set<String> sharedJoinKeys, MergeFilter mergeFilter, int topLevel,
Set<String> mainTableKeys) {
if (null == currentProperty) {
return mergeResult;
}
Expand Down Expand Up @@ -314,7 +350,15 @@ private static MergeResult updateWriteUnsetMerge(
if (null == mergeResult.getOperation()) {
mergeResult.setOperation(MergeResult.Operation.UPDATE);
}
Document unsetDoc = buildUnsetDocument(sharedJoinKeys, after, targetPath, isArray, firstMergeResult);
Set<String> effectiveSharedJoinKeys = sharedJoinKeys;
if (EmptyKit.isEmpty(targetPath) && CollectionUtils.isNotEmpty(mainTableKeys)) {
effectiveSharedJoinKeys = new HashSet<>();
if (sharedJoinKeys != null) {
effectiveSharedJoinKeys.addAll(sharedJoinKeys);
}
effectiveSharedJoinKeys.addAll(mainTableKeys);
}
Document unsetDoc = buildUnsetDocument(effectiveSharedJoinKeys, after, targetPath, isArray, firstMergeResult);
Document update = mergeResult.getUpdate();
if (update.containsKey(UNSET_KEY)) {
update.get(UNSET_KEY, Document.class).putAll(unsetDoc);
Expand Down Expand Up @@ -854,12 +898,7 @@ protected static Document unsetFilter(Map<String, Object> before, Map<String, Ob
Document document = new Document();
for (Map<String, String> joinKey : joinKeys) {
String key = joinKey.get("target");
Object value;
if (topLevel == 1) {
value = MapUtil.getValueByKey(after, key);
} else {
value = MapUtil.getValueByKey(before, key);
}
Object value = MapUtil.getValueByKey(before, key);
document.put(key, value);
}
return document;
Expand Down