diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java index c181401c..fce379de 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java @@ -169,11 +169,8 @@ private AggregateIterable getAggregateIterable(Map exe ExecuteObject executeObject = new ExecuteObject(executeObj); String database = executeObject.getDatabase(); String collection = executeObject.getCollection(); - if (collection == null || collection.isEmpty()) { - throw new RuntimeException(String.format("Process execute %s failed, collection name cannot be blank", executeObject)); - } List pipelines = executeObject.getPipeline(); - if (pipelines.isEmpty()) { + if (pipelines == null || pipelines.isEmpty()) { throw new RuntimeException(String.format("Process execute %s failed, pipeline cannot be blank", executeObject)); } boolean allowDiskUse = true; @@ -189,7 +186,11 @@ private AggregateIterable getAggregateIterable(Map exe // ignored } - return mongoClient.getDatabase(database).getCollection(collection).aggregate(pipelines).allowDiskUse(allowDiskUse); + MongoDatabase mongoDatabase = mongoClient.getDatabase(database); + if (collection == null || collection.isEmpty()) { + return mongoDatabase.aggregate(pipelines).allowDiskUse(allowDiskUse); + } + return mongoDatabase.getCollection(collection).aggregate(pipelines).allowDiskUse(allowDiskUse); } public Object aggregate(Map executeObj, MongoClient mongoClient) {