From ed327a74efe8046ecb45a76afd31c0ea9dd20e8c Mon Sep 17 00:00:00 2001 From: hardy <2545433047@qq.com> Date: Tue, 21 Apr 2026 18:51:16 +0800 Subject: [PATCH] feat:TAP-11093 Aggregation node optimization --- .../mongodb/MongodbExecuteCommandFunction.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java index c181401c9..fce379deb 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbExecuteCommandFunction.java @@ -169,11 +169,8 @@ private AggregateIterable getAggregateIterable(Map exe ExecuteObject executeObject = new ExecuteObject(executeObj); String database = executeObject.getDatabase(); String collection = executeObject.getCollection(); - if (collection == null || collection.isEmpty()) { - throw new RuntimeException(String.format("Process execute %s failed, collection name cannot be blank", executeObject)); - } List pipelines = executeObject.getPipeline(); - if (pipelines.isEmpty()) { + if (pipelines == null || pipelines.isEmpty()) { throw new RuntimeException(String.format("Process execute %s failed, pipeline cannot be blank", executeObject)); } boolean allowDiskUse = true; @@ -189,7 +186,11 @@ private AggregateIterable getAggregateIterable(Map exe // ignored } - return mongoClient.getDatabase(database).getCollection(collection).aggregate(pipelines).allowDiskUse(allowDiskUse); + MongoDatabase mongoDatabase = mongoClient.getDatabase(database); + if (collection == null || collection.isEmpty()) { + return mongoDatabase.aggregate(pipelines).allowDiskUse(allowDiskUse); + } + return mongoDatabase.getCollection(collection).aggregate(pipelines).allowDiskUse(allowDiskUse); } public Object aggregate(Map executeObj, MongoClient mongoClient) {