diff --git a/connectors-common/cdc-core/pom.xml b/connectors-common/cdc-core/pom.xml
new file mode 100644
index 000000000..56e0f6626
--- /dev/null
+++ b/connectors-common/cdc-core/pom.xml
@@ -0,0 +1,29 @@
+
+
+ 4.0.0
+
+ io.tapdata
+ connectors-common
+ 1.0-SNAPSHOT
+
+
+ cdc-core
+
+
+ 17
+ 17
+ UTF-8
+ 2.0.5-SNAPSHOT
+
+
+
+
+ io.tapdata
+ tapdata-pdk-api
+ ${tapdata.pdk.api.verison}
+
+
+
+
\ No newline at end of file
diff --git a/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/Acceptor.java b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/Acceptor.java
new file mode 100644
index 000000000..453f04dd1
--- /dev/null
+++ b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/Acceptor.java
@@ -0,0 +1,29 @@
+package io.tapdata.cdc;
+
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
+
+import java.util.List;
+
+public interface Acceptor> {
+
+ void accept(EType e);
+
+ void accept(List e, Object offset);
+
+ Acc setConsumer(Consumer consumer);
+
+ Acc setBatchSize(int size);
+
+ Acc setBatchSizeTimeout(long ms);
+
+ void streamReadStarted();
+
+ void streamReadEnded();
+
+ default int getBatchSize() {
+ return 1;
+ }
+
+ Consumer getConsumer();
+}
diff --git a/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/CustomAbstractAccepter.java b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/CustomAbstractAccepter.java
new file mode 100644
index 000000000..65e716396
--- /dev/null
+++ b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/CustomAbstractAccepter.java
@@ -0,0 +1,13 @@
+package io.tapdata.cdc;
+
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 15:03 Create
+ * @description
+ */
+public abstract class CustomAbstractAccepter, Consumer extends TapStreamReadConsumer, Object>>
+ implements Acceptor {
+}
diff --git a/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/EventAbstractAccepter.java b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/EventAbstractAccepter.java
new file mode 100644
index 000000000..ef270ac1e
--- /dev/null
+++ b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/EventAbstractAccepter.java
@@ -0,0 +1,13 @@
+package io.tapdata.cdc;
+
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 15:00 Create
+ * @description
+ */
+public abstract class EventAbstractAccepter, Consumer extends TapStreamReadConsumer, Object>> extends CustomAbstractAccepter {
+}
\ No newline at end of file
diff --git a/connectors-common/pom.xml b/connectors-common/pom.xml
index fb3509acf..8e729463c 100644
--- a/connectors-common/pom.xml
+++ b/connectors-common/pom.xml
@@ -22,6 +22,7 @@
js-connector-core-plus
read-partition
hive-core
+ cdc-core
diff --git a/connectors-common/postgres-core/pom.xml b/connectors-common/postgres-core/pom.xml
index 24fd0f92b..9e0c817bd 100644
--- a/connectors-common/postgres-core/pom.xml
+++ b/connectors-common/postgres-core/pom.xml
@@ -21,7 +21,7 @@
1.0-SNAPSHOT
1.5.4.Final
1.0-SNAPSHOT
- 2.0.1-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.2.83
@@ -124,6 +124,11 @@
commons-compress
1.27.1
+
+ io.tapdata
+ cdc-core
+ 1.0-SNAPSHOT
+
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java
index bf81703d5..e28568fec 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java
@@ -1,8 +1,11 @@
package io.tapdata.connector.postgres.cdc;
import com.google.common.collect.Lists;
+import io.tapdata.cdc.CustomAbstractAccepter;
import io.tapdata.common.sqlparser.ResultDO;
import io.tapdata.connector.postgres.PostgresJdbcContext;
+import io.tapdata.connector.postgres.cdc.accept.LogMinerProBatchAccepter;
+import io.tapdata.connector.postgres.cdc.accept.LogMinerProOneByOneAccepter;
import io.tapdata.connector.postgres.config.PostgresConfig;
import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.dml.TapDeleteRecordEvent;
@@ -18,6 +21,8 @@
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.StringKit;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import java.math.BigDecimal;
import java.sql.ResultSet;
@@ -25,7 +30,14 @@
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -35,8 +47,8 @@ public abstract class AbstractWalLogMiner {
protected final PostgresJdbcContext postgresJdbcContext;
protected final Log tapLogger;
- protected StreamReadConsumer consumer;
- protected int recordSize;
+ //protected StreamReadConsumer consumer;
+ protected CustomAbstractAccepter consumer;
protected List tableList;
protected boolean filterSchema;
private Map dataTypeMap;
@@ -95,9 +107,26 @@ public AbstractWalLogMiner withWalLogDirectory(String walLogDirectory) {
public abstract void startMiner(Supplier isAlive) throws Throwable;
+ /**
+ * @deprecated
+ * */
public AbstractWalLogMiner registerConsumer(StreamReadConsumer consumer, int recordSize) {
- this.consumer = consumer;
- this.recordSize = recordSize;
+ return registerCdcConsumer(consumer, recordSize);
+ }
+
+ public AbstractWalLogMiner registerCdcConsumer(TapStreamReadConsumer, Object> consumer, int recordSize) {
+ if (consumer instanceof StreamReadConsumer) {
+ this.consumer = new LogMinerProBatchAccepter()
+ .setConsumer((StreamReadConsumer) consumer)
+ .setBatchSize(recordSize)
+ .setEventCreator(this::createEvent);
+ } else if (consumer instanceof StreamReadOneByOneConsumer) {
+ this.consumer = new LogMinerProOneByOneAccepter()
+ .setConsumer((StreamReadOneByOneConsumer) consumer)
+ .setEventCreator(this::createEvent);
+ } else {
+ throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
+ }
return this;
}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java
index f3f51840e..cbb00f90c 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java
@@ -6,11 +6,12 @@
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.tapdata.connector.postgres.PostgresJdbcContext;
+import io.tapdata.connector.postgres.cdc.accept.DebeziumBatchAccepter;
+import io.tapdata.connector.postgres.cdc.accept.DebeziumOneByOneAccepter;
+import io.tapdata.connector.postgres.cdc.accept.PGEventAbstractAccepter;
import io.tapdata.connector.postgres.cdc.config.PostgresDebeziumConfig;
import io.tapdata.connector.postgres.cdc.offset.PostgresOffset;
-import io.tapdata.connector.postgres.cdc.offset.PostgresOffsetStorage;
import io.tapdata.connector.postgres.config.PostgresConfig;
-import io.tapdata.entity.event.TapEvent;
import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.event.dml.TapDeleteRecordEvent;
import io.tapdata.entity.event.dml.TapInsertRecordEvent;
@@ -19,7 +20,6 @@
import io.tapdata.entity.logger.TapLogger;
import io.tapdata.entity.schema.TapTable;
import io.tapdata.entity.schema.partition.TapSubPartitionTableInfo;
-import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.entity.utils.DataMap;
import io.tapdata.entity.utils.cache.Entry;
import io.tapdata.entity.utils.cache.Iterator;
@@ -27,6 +27,8 @@
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.NumberKit;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -39,8 +41,16 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneOffset;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@@ -57,8 +67,8 @@ public class PostgresCdcRunner extends DebeziumCdcRunner {
private final TapConnectorContext connectorContext;
private PostgresDebeziumConfig postgresDebeziumConfig;
private PostgresOffset postgresOffset;
- private int recordSize;
- private StreamReadConsumer consumer;
+ //private StreamReadConsumer consumer;
+ protected PGEventAbstractAccepter, ?> consumer;
private final AtomicReference throwableAtomicReference = new AtomicReference<>();
protected TimeZone timeZone;
private String dropTransactionId = null;
@@ -145,9 +155,27 @@ public AtomicReference getThrowable() {
return throwableAtomicReference;
}
+ /**
+ * @deprecated
+ * */
public void registerConsumer(StreamReadConsumer consumer, int recordSize) {
- this.recordSize = recordSize;
- this.consumer = consumer;
+ registerCdcConsumer(consumer, recordSize);
+ }
+
+ public void registerCdcConsumer(TapStreamReadConsumer, Object> consumer, int recordSize) {
+ Supplier batchSizeGetter;
+ if (consumer instanceof StreamReadConsumer) {
+ this.consumer = new DebeziumBatchAccepter()
+ .setConsumer((StreamReadConsumer) consumer)
+ .setBatchSize(recordSize);
+ batchSizeGetter = () -> recordSize;
+ } else if (consumer instanceof StreamReadOneByOneConsumer) {
+ this.consumer = new DebeziumOneByOneAccepter()
+ .setConsumer((StreamReadOneByOneConsumer) consumer);
+ batchSizeGetter = () -> ((StreamReadOneByOneConsumer) consumer).getBatchSize();
+ } else {
+ throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
+ }
//build debezium engine
this.engine = (EmbeddedEngine) EmbeddedEngine.create()
.using(postgresDebeziumConfig.create())
@@ -167,8 +195,12 @@ public void taskStopped() {
// .using(this.getClass().getClassLoader())
// .using(Clock.SYSTEM)
// .notifying(this::consumeRecord)
- .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) ->
- numberOfMessagesSinceLastCommit >= recordSize || timeSinceLastCommit.getSeconds() >= 5)
+ .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) -> {
+ int size = Math.min(Math.max(1, batchSizeGetter.get()), 10000);
+ //超时时间最小1秒,最大10秒
+ int timeout = Math.min(Math.max(1, size / 100), 5);
+ return numberOfMessagesSinceLastCommit >= size || timeSinceLastCommit.getSeconds() >= timeout;
+ })
.notifying(this::consumeRecords).using((result, message, throwable) -> {
if (result) {
if (StringUtils.isNotBlank(message)) {
@@ -195,7 +227,7 @@ public void taskStopped() {
@Override
public void consumeRecords(List sourceRecords, DebeziumEngine.RecordCommitter committer) throws InterruptedException {
super.consumeRecords(sourceRecords, committer);
- List eventList = TapSimplify.list();
+ //List eventList = TapSimplify.list();
Map offset = null;
for (SourceRecord sr : sourceRecords) {
try {
@@ -207,7 +239,8 @@ public void consumeRecords(List sourceRecords, DebeziumEngine.Reco
continue;
}
if ("io.debezium.connector.common.Heartbeat".equals(sr.valueSchema().name())) {
- eventList.add(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms")));
+ consumer.updateOffset(offset)
+ .accept(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms")));
continue;
} else if (EmptyKit.isNull(sr.valueSchema().field("op"))) {
continue;
@@ -260,22 +293,12 @@ public void consumeRecords(List sourceRecords, DebeziumEngine.Reco
event.setNamespaces(Lists.newArrayList(schema, table));
}
}
- eventList.add(event);
- if (eventList.size() >= recordSize) {
- PostgresOffset postgresOffset = new PostgresOffset();
- postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
- consumer.accept(eventList, postgresOffset);
- eventList = TapSimplify.list();
- }
+ consumer.updateOffset(offset).accept(event);
} catch (StopConnectorException | StopEngineException ex) {
throw ex;
}
}
- if (EmptyKit.isNotEmpty(eventList)) {
- PostgresOffset postgresOffset = new PostgresOffset();
- postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
- consumer.accept(eventList, postgresOffset);
- }
+ consumer.updateOffset(offset).accept(null);
}
private DataMap getMapFromStruct(Struct struct) {
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java
index 38d91ffb1..9311754f4 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java
@@ -3,22 +3,25 @@
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
+import io.tapdata.connector.postgres.cdc.accept.LogMinerBatchAccepter;
+import io.tapdata.connector.postgres.cdc.accept.LogMinerOneByOneAccepter;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.exception.TapPdkOffsetOutOfLogEx;
import io.tapdata.kit.EmptyKit;
+import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import static io.tapdata.base.ConnectorBase.list;
-
public class WalLogMiner extends AbstractWalLogMiner {
private String walFile;
@@ -29,6 +32,22 @@ public WalLogMiner(PostgresJdbcContext postgresJdbcContext, Log tapLogger) {
super(postgresJdbcContext, tapLogger);
}
+ public AbstractWalLogMiner registerCdcConsumer(TapStreamReadConsumer, Object> consumer, int recordSize) {
+ if (consumer instanceof StreamReadConsumer) {
+ this.consumer = new LogMinerBatchAccepter()
+ .setConsumer((StreamReadConsumer) consumer)
+ .setBatchSize(recordSize)
+ .setEventCreator(this::createEvent);
+ } else if (consumer instanceof StreamReadOneByOneConsumer) {
+ this.consumer = new LogMinerOneByOneAccepter()
+ .setConsumer((StreamReadOneByOneConsumer) consumer)
+ .setEventCreator(this::createEvent);
+ } else {
+ throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
+ }
+ return this;
+ }
+
public WalLogMiner offset(Object offsetState) {
if (offsetState instanceof String) {
String[] fileAndLsn = ((String) offsetState).split(",");
@@ -64,24 +83,10 @@ public void startMiner(Supplier isAlive) throws Throwable {
}
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference> events = new AtomicReference<>(list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
- if (EmptyKit.isNotNull(redo)) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- } else {
- if (events.get().size() > 0) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- }
+ consumer.accept(redo);
} catch (Exception e) {
threadException.set(e);
return;
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java
index e467fbd56..3acd0993f 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java
@@ -3,8 +3,6 @@
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
-import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.kit.EmptyKit;
@@ -21,8 +19,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import static io.tapdata.base.ConnectorBase.list;
-
public class WalLogMinerV2 extends AbstractWalLogMiner {
private String startLsn;
@@ -46,28 +42,10 @@ public void startMiner(Supplier isAlive) throws Throwable {
ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner");
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference> events = new AtomicReference<>(list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
- if (EmptyKit.isNotNull(redo)) {
- if (EmptyKit.isNotNull(redo.getOperation())) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- } else {
- consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr());
- }
- } else {
- if (events.get().size() > 0) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- }
+ consumer.accept(redo);
} catch (Exception e) {
threadException.set(e);
return;
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java
index 605eee1ce..9bfeca62e 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java
@@ -3,8 +3,6 @@
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
-import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.kit.EmptyKit;
@@ -15,16 +13,11 @@
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import static io.tapdata.base.ConnectorBase.list;
-
public class WalLogMinerV3 extends AbstractWalLogMiner {
private String timestamp;
@@ -42,28 +35,10 @@ public void startMiner(Supplier isAlive) throws Throwable {
ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner");
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference> events = new AtomicReference<>(list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
- if (EmptyKit.isNotNull(redo)) {
- if (EmptyKit.isNotNull(redo.getOperation())) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- } else {
- consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr());
- }
- } else {
- if (events.get().size() > 0) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- }
+ consumer.accept(redo);
} catch (Exception e) {
threadException.set(e);
return;
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java
index 1a278bbc6..4c882722d 100644
--- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java
@@ -1,24 +1,19 @@
package io.tapdata.connector.postgres.cdc;
import com.alibaba.fastjson.JSONObject;
-import io.tapdata.base.ConnectorBase;
import io.tapdata.common.concurrent.ConcurrentProcessor;
import io.tapdata.common.concurrent.TapExecutors;
import io.tapdata.common.sqlparser.ResultDO;
import io.tapdata.connector.postgres.PostgresJdbcContext;
-import io.tapdata.entity.event.TapEvent;
-import io.tapdata.entity.event.control.HeartbeatEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.kit.EmptyKit;
import io.tapdata.kit.HttpKit;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class WalPgtoMiner extends AbstractWalLogMiner {
@@ -65,28 +60,10 @@ public void startMiner(Supplier isAlive) throws Throwable {
try (ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner")) {
Thread t = new Thread(() -> {
consumer.streamReadStarted();
- NormalRedo lastRedo = null;
- AtomicReference> events = new AtomicReference<>(ConnectorBase.list());
while (isAlive.get()) {
try {
NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS);
- if (EmptyKit.isNotNull(redo)) {
- if (EmptyKit.isNotNull(redo.getOperation())) {
- lastRedo = redo;
- events.get().add(createEvent(redo));
- if (events.get().size() >= recordSize) {
- consumer.accept(events.get(), redo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- } else {
- consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr());
- }
- } else {
- if (!events.get().isEmpty()) {
- consumer.accept(events.get(), lastRedo.getCdcSequenceStr());
- events.set(new ArrayList<>());
- }
- }
+ consumer.accept(redo);
} catch (Exception e) {
threadException.set(e);
return;
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumBatchAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumBatchAccepter.java
new file mode 100644
index 000000000..384f76970
--- /dev/null
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumBatchAccepter.java
@@ -0,0 +1,83 @@
+package io.tapdata.connector.postgres.cdc.accept;
+
+import io.tapdata.connector.postgres.cdc.offset.PostgresOffset;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.entity.simplify.TapSimplify;
+import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 14:19 Create
+ * @description
+ */
+public class DebeziumBatchAccepter
+ extends PGEventAbstractAccepter {
+ List eventList = new ArrayList<>();
+ StreamReadConsumer consumer;
+ int batchSize = 1;
+ long batchSizeTimeout = 1000L;
+
+ @Override
+ public void accept(TapEvent e) {
+ if (null != e) {
+ eventList.add(e);
+ } else {
+ if (!eventList.isEmpty()) {
+ acceptOnce();
+ }
+ }
+ if (eventList.size() >= getBatchSize()) {
+ acceptOnce();
+ }
+ }
+
+ void acceptOnce() {
+ PostgresOffset postgresOffset = new PostgresOffset();
+ postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
+ getConsumer().accept(eventList, postgresOffset);
+ eventList = TapSimplify.list();
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ getConsumer().accept(e, offset);
+ }
+
+ @Override
+ public DebeziumBatchAccepter setConsumer(StreamReadConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public DebeziumBatchAccepter setBatchSize(int size) {
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public DebeziumBatchAccepter setBatchSizeTimeout(long ms) {
+ this.batchSizeTimeout = ms;
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public StreamReadConsumer getConsumer() {
+ return this.consumer;
+ }
+}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumOneByOneAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumOneByOneAccepter.java
new file mode 100644
index 000000000..af6ae2757
--- /dev/null
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumOneByOneAccepter.java
@@ -0,0 +1,86 @@
+package io.tapdata.connector.postgres.cdc.accept;
+
+import io.tapdata.connector.postgres.cdc.offset.PostgresOffset;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.entity.simplify.TapSimplify;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+
+import java.util.List;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 14:20 Create
+ * @description
+ */
+public class DebeziumOneByOneAccepter
+ extends PGEventAbstractAccepter {
+ int acceptCount = 0;
+ boolean first = true;
+ StreamReadOneByOneConsumer consumer;
+ int batchSize = 1;
+ long batchSizeTimeout = 1000L;
+
+ @Override
+ public void accept(TapEvent e) {
+ if (null == e) {
+ return;
+ }
+ acceptCount++;
+ boolean calcOffset = first || acceptCount >= getBatchSize();
+ PostgresOffset postgresOffset = null;
+ if (calcOffset) {
+ postgresOffset = new PostgresOffset();
+ postgresOffset.setSourceOffset(TapSimplify.toJson(offset));
+ acceptCount = 0;
+ }
+ if (first) {
+ first = false;
+ }
+ getConsumer().accept(e, postgresOffset);
+ }
+
+
+ @Override
+ public void accept(List e, Object offset) {
+ getConsumer().accept(e, offset);
+ }
+
+ @Override
+ public DebeziumOneByOneAccepter setConsumer(StreamReadOneByOneConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public DebeziumOneByOneAccepter setBatchSize(int size) {
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public DebeziumOneByOneAccepter setBatchSizeTimeout(long ms) {
+ this.batchSizeTimeout = ms;
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ getConsumer().streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ getConsumer().streamReadEnded();
+ }
+
+ @Override
+ public StreamReadOneByOneConsumer getConsumer() {
+ return this.consumer;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return getConsumer().getBatchSize();
+ }
+}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerBatchAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerBatchAccepter.java
new file mode 100644
index 000000000..96b418c51
--- /dev/null
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerBatchAccepter.java
@@ -0,0 +1,91 @@
+package io.tapdata.connector.postgres.cdc.accept;
+
+import io.tapdata.cdc.CustomAbstractAccepter;
+import io.tapdata.connector.postgres.cdc.NormalRedo;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.kit.EmptyKit;
+import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 14:19 Create
+ * @description
+ */
+public class LogMinerBatchAccepter
+ extends CustomAbstractAccepter {
+ protected NormalRedo lastRedo;
+ protected List events = new ArrayList<>();
+ protected Function eventCreator;
+ protected StreamReadConsumer consumer;
+ protected int batchSize = 1;
+ protected long batchSizeTimeout = 1000L;
+
+ @Override
+ public void accept(NormalRedo redo) {
+ if (EmptyKit.isNotNull(redo)) {
+ lastRedo = redo;
+ TapEvent e = eventCreator.apply(redo);
+ if (null == e) {
+ return;
+ }
+ events.add(e);
+ if (events.size() >= getBatchSize()) {
+ getConsumer().accept(events, redo.getCdcSequenceStr());
+ events = new ArrayList<>();
+ }
+ } else {
+ if (!events.isEmpty()) {
+ getConsumer().accept(events, lastRedo.getCdcSequenceStr());
+ events = new ArrayList<>();
+ }
+ }
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ getConsumer().accept(e, offset);
+ }
+
+ @Override
+ public LogMinerBatchAccepter setConsumer(StreamReadConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public LogMinerBatchAccepter setBatchSize(int size) {
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public LogMinerBatchAccepter setBatchSizeTimeout(long ms) {
+ this.batchSizeTimeout = ms;
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public StreamReadConsumer getConsumer() {
+ return this.consumer;
+ }
+
+ public LogMinerBatchAccepter setEventCreator(Function eventCreator) {
+ this.eventCreator = eventCreator;
+ return this;
+ }
+}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerOneByOneAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerOneByOneAccepter.java
new file mode 100644
index 000000000..8d023ce5b
--- /dev/null
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerOneByOneAccepter.java
@@ -0,0 +1,84 @@
+package io.tapdata.connector.postgres.cdc.accept;
+
+import io.tapdata.cdc.CustomAbstractAccepter;
+import io.tapdata.connector.postgres.cdc.NormalRedo;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.kit.EmptyKit;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 14:19 Create
+ * @description
+ */
+public class LogMinerOneByOneAccepter
+ extends CustomAbstractAccepter {
+ protected Function eventCreator;
+ protected StreamReadOneByOneConsumer consumer;
+ protected long batchSizeTimeout = 1000L;
+ protected int batchSize = 1;
+
+ @Override
+ public void accept(NormalRedo redo) {
+ if (EmptyKit.isNotNull(redo)) {
+ TapEvent e = eventCreator.apply(redo);
+ if (null == e) {
+ return;
+ }
+ getConsumer().accept(e, redo.getCdcSequenceStr());
+ }
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ getConsumer().accept(e, offset);
+ }
+
+ @Override
+ public LogMinerOneByOneAccepter setConsumer(StreamReadOneByOneConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public LogMinerOneByOneAccepter setBatchSize(int size) {
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public LogMinerOneByOneAccepter setBatchSizeTimeout(long ms) {
+ this.batchSizeTimeout = ms;
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public StreamReadOneByOneConsumer getConsumer() {
+ return this.consumer;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return Optional.ofNullable(getConsumer()).map(StreamReadOneByOneConsumer::getBatchSize).orElse(1);
+ }
+
+ public LogMinerOneByOneAccepter setEventCreator(Function eventCreator) {
+ this.eventCreator = eventCreator;
+ return this;
+ }
+}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProBatchAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProBatchAccepter.java
new file mode 100644
index 000000000..2e7a2793c
--- /dev/null
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProBatchAccepter.java
@@ -0,0 +1,42 @@
+package io.tapdata.connector.postgres.cdc.accept;
+
+import io.tapdata.connector.postgres.cdc.NormalRedo;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.entity.event.control.HeartbeatEvent;
+import io.tapdata.kit.EmptyKit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 16:11 Create
+ * @description
+ */
+public class LogMinerProBatchAccepter extends LogMinerBatchAccepter {
+ @Override
+ public void accept(NormalRedo redo) {
+ if (EmptyKit.isNotNull(redo)) {
+ if (EmptyKit.isNotNull(redo.getOperation())) {
+ lastRedo = redo;
+ TapEvent e = eventCreator.apply(redo);
+ if (null == e) {
+ return;
+ }
+ events.add(e);
+ if (events.size() >= getBatchSize()) {
+ getConsumer().accept(events, redo.getCdcSequenceStr());
+ events = new ArrayList<>();
+ }
+ } else {
+ consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr());
+ }
+ } else {
+ if (!events.isEmpty()) {
+ getConsumer().accept(events, lastRedo.getCdcSequenceStr());
+ events = new ArrayList<>();
+ }
+ }
+ }
+}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProOneByOneAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProOneByOneAccepter.java
new file mode 100644
index 000000000..9681c1894
--- /dev/null
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProOneByOneAccepter.java
@@ -0,0 +1,30 @@
+package io.tapdata.connector.postgres.cdc.accept;
+
+import io.tapdata.connector.postgres.cdc.NormalRedo;
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.entity.event.control.HeartbeatEvent;
+import io.tapdata.kit.EmptyKit;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 16:11 Create
+ * @description
+ */
+public class LogMinerProOneByOneAccepter extends LogMinerOneByOneAccepter {
+ @Override
+ public void accept(NormalRedo redo) {
+ if (EmptyKit.isNotNull(redo)) {
+ TapEvent e = null;
+ if (EmptyKit.isNotNull(redo.getOperation())) {
+ e = eventCreator.apply(redo);
+ } else {
+ e = new HeartbeatEvent().init().referenceTime(System.currentTimeMillis());
+ }
+ if (null == e) {
+ return;
+ }
+ getConsumer().accept(e, redo.getCdcSequenceStr());
+ }
+ }
+}
diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/PGEventAbstractAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/PGEventAbstractAccepter.java
new file mode 100644
index 000000000..97e728028
--- /dev/null
+++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/PGEventAbstractAccepter.java
@@ -0,0 +1,21 @@
+package io.tapdata.connector.postgres.cdc.accept;
+
+import io.tapdata.cdc.EventAbstractAccepter;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
+
+import java.util.Map;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/10 17:16 Create
+ * @description
+ */
+public abstract class PGEventAbstractAccepter, Consumer extends TapStreamReadConsumer, Object>> extends EventAbstractAccepter {
+ protected Map offset;
+
+ public T updateOffset(Map offset) {
+ this.offset = offset;
+ return (T) this;
+ }
+}
diff --git a/connectors/aliyun-adb-postgres-connector/pom.xml b/connectors/aliyun-adb-postgres-connector/pom.xml
index fde69c624..6b14c53dc 100644
--- a/connectors/aliyun-adb-postgres-connector/pom.xml
+++ b/connectors/aliyun-adb-postgres-connector/pom.xml
@@ -22,7 +22,7 @@
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/aliyun-rds-postgres-connector/pom.xml b/connectors/aliyun-rds-postgres-connector/pom.xml
index cba9f8240..d2882f881 100644
--- a/connectors/aliyun-rds-postgres-connector/pom.xml
+++ b/connectors/aliyun-rds-postgres-connector/pom.xml
@@ -23,7 +23,7 @@
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/dws-connector/pom.xml b/connectors/dws-connector/pom.xml
index 231865362..351e14beb 100644
--- a/connectors/dws-connector/pom.xml
+++ b/connectors/dws-connector/pom.xml
@@ -15,7 +15,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/greenplum-connector/pom.xml b/connectors/greenplum-connector/pom.xml
index f1948d02e..073777d39 100644
--- a/connectors/greenplum-connector/pom.xml
+++ b/connectors/greenplum-connector/pom.xml
@@ -15,7 +15,7 @@
UTF-8
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/highgo-connector/pom.xml b/connectors/highgo-connector/pom.xml
index 6aec28edf..dbbd66358 100644
--- a/connectors/highgo-connector/pom.xml
+++ b/connectors/highgo-connector/pom.xml
@@ -15,7 +15,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.5.4.Final
diff --git a/connectors/mongodb-connector/pom.xml b/connectors/mongodb-connector/pom.xml
index b4fc0b6bb..6e7df67db 100644
--- a/connectors/mongodb-connector/pom.xml
+++ b/connectors/mongodb-connector/pom.xml
@@ -13,7 +13,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
@@ -129,6 +129,11 @@
nashorn-core
15.4
+
+ io.tapdata
+ cdc-core
+ 1.0-SNAPSHOT
+
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
index 916af764b..49e794a71 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java
@@ -1,10 +1,44 @@
package io.tapdata.mongodb;
import com.alibaba.fastjson.JSONObject;
-import com.mongodb.*;
+import com.mongodb.CreateIndexCommitQuorum;
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.MongoClientException;
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoConfigurationException;
+import com.mongodb.MongoConnectionPoolClearedException;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.MongoException;
+import com.mongodb.MongoInterruptedException;
+import com.mongodb.MongoNodeIsRecoveringException;
+import com.mongodb.MongoNotPrimaryException;
+import com.mongodb.MongoQueryException;
+import com.mongodb.MongoSecurityException;
+import com.mongodb.MongoServerUnavailableException;
+import com.mongodb.MongoSocketClosedException;
+import com.mongodb.MongoSocketException;
+import com.mongodb.MongoSocketOpenException;
+import com.mongodb.MongoSocketReadException;
+import com.mongodb.MongoSocketReadTimeoutException;
+import com.mongodb.MongoSocketWriteException;
+import com.mongodb.MongoTimeoutException;
+import com.mongodb.MongoWriteConcernException;
+import com.mongodb.MongoWriteException;
import com.mongodb.bulk.BulkWriteError;
-import com.mongodb.client.*;
-import com.mongodb.client.model.*;
+import com.mongodb.client.ClientSession;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoCursor;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import com.mongodb.client.model.CreateCollectionOptions;
+import com.mongodb.client.model.CreateIndexOptions;
+import com.mongodb.client.model.IndexModel;
+import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.Sorts;
+import com.mongodb.client.model.TimeSeriesGranularity;
+import com.mongodb.client.model.TimeSeriesOptions;
import io.tapdata.base.ConnectorBase;
import io.tapdata.common.CommonDbConfig;
import io.tapdata.entity.codec.TapCodecsRegistry;
@@ -18,8 +52,21 @@
import io.tapdata.entity.event.dml.TapUpdateRecordEvent;
import io.tapdata.entity.logger.Log;
import io.tapdata.entity.logger.TapLogger;
-import io.tapdata.entity.schema.*;
-import io.tapdata.entity.schema.value.*;
+import io.tapdata.entity.schema.TapField;
+import io.tapdata.entity.schema.TapIndex;
+import io.tapdata.entity.schema.TapIndexEx;
+import io.tapdata.entity.schema.TapIndexField;
+import io.tapdata.entity.schema.TapTable;
+import io.tapdata.entity.schema.value.ByteData;
+import io.tapdata.entity.schema.value.DateTime;
+import io.tapdata.entity.schema.value.TapBinaryValue;
+import io.tapdata.entity.schema.value.TapDateTimeValue;
+import io.tapdata.entity.schema.value.TapDateValue;
+import io.tapdata.entity.schema.value.TapNumberValue;
+import io.tapdata.entity.schema.value.TapStringValue;
+import io.tapdata.entity.schema.value.TapTimeValue;
+import io.tapdata.entity.schema.value.TapValue;
+import io.tapdata.entity.schema.value.TapYearValue;
import io.tapdata.entity.simplify.TapSimplify;
import io.tapdata.entity.utils.DataMap;
import io.tapdata.entity.utils.InstanceFactory;
@@ -40,9 +87,20 @@
import io.tapdata.partition.DatabaseReadPartitionSplitter;
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.context.TapConnectorContext;
-import io.tapdata.pdk.apis.entity.*;
+import io.tapdata.pdk.apis.entity.Capability;
+import io.tapdata.pdk.apis.entity.ConnectionOptions;
+import io.tapdata.pdk.apis.entity.ExecuteResult;
+import io.tapdata.pdk.apis.entity.FilterResults;
+import io.tapdata.pdk.apis.entity.Projection;
+import io.tapdata.pdk.apis.entity.QueryOperator;
+import io.tapdata.pdk.apis.entity.SortOn;
+import io.tapdata.pdk.apis.entity.TapAdvanceFilter;
+import io.tapdata.pdk.apis.entity.TapExecuteCommand;
+import io.tapdata.pdk.apis.entity.TestItem;
+import io.tapdata.pdk.apis.entity.WriteListResult;
import io.tapdata.pdk.apis.exception.NotSupportedException;
import io.tapdata.pdk.apis.exception.TapTestItemException;
import io.tapdata.pdk.apis.functions.ConnectorFunctions;
@@ -57,16 +115,46 @@
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
-import org.bson.*;
+import org.bson.BsonArray;
+import org.bson.BsonBoolean;
+import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonType;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.bson.RawBsonDocument;
import org.bson.conversions.Bson;
-import org.bson.types.*;
+import org.bson.types.Binary;
+import org.bson.types.Code;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
+import org.bson.types.Symbol;
import java.io.Closeable;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
@@ -75,7 +163,12 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import static com.mongodb.client.model.Filters.*;
+import static com.mongodb.client.model.Filters.and;
+import static com.mongodb.client.model.Filters.eq;
+import static com.mongodb.client.model.Filters.gt;
+import static com.mongodb.client.model.Filters.gte;
+import static com.mongodb.client.model.Filters.lt;
+import static com.mongodb.client.model.Filters.lte;
import static java.util.Collections.singletonList;
/**
@@ -554,6 +647,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportCreateIndex(this::createIndex);
connectorFunctions.supportCreateTableV2(this::createTableV2);
connectorFunctions.supportStreamRead(this::streamRead);
+ connectorFunctions.supportOneByOneStreamRead(this::streamReadOneByOne);
connectorFunctions.supportTimestampToStreamOffset(this::streamOffset);
connectorFunctions.supportErrorHandleFunction(this::errorHandle);
@@ -1640,19 +1734,32 @@ protected Object streamOffset(TapConnectorContext connectorContext, Long offsetS
protected void streamRead(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) {
int size = tableList.size();
MongoCdcOffset mongoCdcOffset = MongoCdcOffset.fromOffset(offset);
- streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
+ streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset());
if (size == tableList.size() || !tableList.isEmpty()) {
if (mongodbStreamReader == null) {
mongodbStreamReader = createStreamReader();
}
- mongodbStreamReader.onStart(mongoConfig);
- doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset(), eventBatchSize, consumer);
+ mongodbStreamReader.initAcceptor(eventBatchSize, consumer).onStart(mongoConfig);
+ doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset());
+ }
+ }
+ protected void streamReadOneByOne(TapConnectorContext connectorContext, List tableList, Object offset, StreamReadOneByOneConsumer consumer) {
+ int size = tableList.size();
+ MongoCdcOffset mongoCdcOffset = MongoCdcOffset.fromOffset(offset);
+ streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset());
+ if (size == tableList.size() || !tableList.isEmpty()) {
+ if (mongodbStreamReader == null) {
+ mongodbStreamReader = createStreamReader();
+ }
+ mongodbStreamReader.initAcceptor(consumer.getBatchSize(), consumer)
+ .onStart(mongoConfig);
+ doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset());
}
}
- protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) {
+ protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContext connectorContext, List tableList, Object offset) {
try {
- streamReader.read(connectorContext, tableList, offset, eventBatchSize, consumer);
+ streamReader.read(connectorContext, tableList, offset);
} catch (Exception e) {
exceptionCollector.collectTerminateByServer(e);
exceptionCollector.collectWritePrivileges(e);
@@ -1666,7 +1773,7 @@ protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContex
}
}
- protected void streamReadOpLog(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) {
+ protected void streamReadOpLog(TapConnectorContext connectorContext, List tableList, Object offset) {
String database = mongoConfig.getDatabase();
if (StreamWithOpLogCollection.OP_LOG_DB.equals(database) && tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)) {
connectorContext.getLog().info("Start read oplog collection, db: local");
@@ -1676,7 +1783,7 @@ protected void streamReadOpLog(TapConnectorContext connectorContext, List doStreamRead(opLogStreamReader, connectorContext, list(StreamWithOpLogCollection.OP_LOG_COLLECTION), offset, eventBatchSize, consumer));
+ sourceRunnerFuture = sourceRunner.submit(() -> doStreamRead(opLogStreamReader, connectorContext, list(StreamWithOpLogCollection.OP_LOG_COLLECTION), offset));
}
}
}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java
index cefd9f403..f03112e1d 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java
@@ -1,7 +1,7 @@
package io.tapdata.mongodb.reader;
import io.tapdata.mongodb.entity.MongodbConfig;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import java.util.List;
@@ -13,8 +13,9 @@ public interface MongodbStreamReader {
void onStart(MongodbConfig mongodbConfig);
- void read(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) throws Exception;
+ MongodbStreamReader initAcceptor(int eventBatchSize, TapStreamReadConsumer, Object> consumer);
+ void read(TapConnectorContext connectorContext, List tableList, Object offset) throws Exception;
Object streamOffset(Long offsetStartTime);
void onDestroy();
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java
index 62d8989de..1a38d2e0a 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java
@@ -23,8 +23,13 @@
import io.tapdata.mongodb.MongodbExceptionCollector;
import io.tapdata.mongodb.MongodbUtil;
import io.tapdata.mongodb.entity.MongodbConfig;
+import io.tapdata.mongodb.reader.cdc.v4.NormalV4Acceptor;
+import io.tapdata.mongodb.reader.cdc.v4.OneByOneV4Acceptor;
+import io.tapdata.mongodb.reader.cdc.v4.V4Accept;
import io.tapdata.mongodb.util.MongodbLookupUtil;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import org.apache.commons.collections4.MapUtils;
import org.bson.*;
@@ -34,7 +39,6 @@
import org.bson.io.ByteBufferBsonInput;
import java.nio.ByteBuffer;
-import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,6 +68,7 @@ public class MongodbV4StreamReader implements MongodbStreamReader {
private MongodbExceptionCollector mongodbExceptionCollector;
private String dropTransactionId;
private Thread consumeStreamEventThread;
+ private V4Accept, ?> v4Acceptor;
public MongodbV4StreamReader setPreImage(boolean isPreImage) {
this.isPreImage = isPreImage;
@@ -74,6 +79,21 @@ public MongodbV4StreamReader() {
this.mongodbExceptionCollector = new MongodbExceptionCollector();
}
+ @Override
+ public MongodbV4StreamReader initAcceptor(int batchSize, TapStreamReadConsumer, Object> consumer) {
+ if (consumer instanceof StreamReadConsumer) {
+ this.v4Acceptor = new NormalV4Acceptor()
+ .setConsumer((StreamReadConsumer) consumer)
+ .setBatchSize(batchSize);
+ } else if (consumer instanceof StreamReadOneByOneConsumer) {
+ this.v4Acceptor = new OneByOneV4Acceptor()
+ .setConsumer((StreamReadOneByOneConsumer) consumer);
+ } else {
+ throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
+ }
+ return this;
+ }
+
@Override
public void onStart(MongodbConfig mongodbConfig) {
this.mongodbConfig = mongodbConfig;
@@ -89,7 +109,7 @@ public void onStart(MongodbConfig mongodbConfig) {
}
@Override
- public void read(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) throws Exception {
+ public void read(TapConnectorContext connectorContext, List tableList, Object offset) throws Exception {
openChangeStreamPreAndPostImages(tableList);
if (Boolean.TRUE.equals(mongodbConfig.getDoubleActive())) {
tableList.add("_tap_double_active");
@@ -125,38 +145,15 @@ public void read(TapConnectorContext connectorContext, List tableList, O
if (isPreImage) {
changeStream.fullDocumentBeforeChange(fullDocumentBeforeChangeOption);
}
- consumer.streamReadStarted();
+ this.v4Acceptor.streamReadStarted();
AtomicReference throwableAtomicReference = new AtomicReference<>();
try (final MongoChangeStreamCursor> streamCursor = changeStream.cursor()) {
consumeStreamEventThread = new Thread(() -> {
- List events = list();
- OffsetEvent lastOffsetEvent = null;
- long lastSendTime = System.currentTimeMillis();
- // Calculate time window based on batch size
- // If batch size <= 500, use fixed 50ms window
- // If batch size > 500, use dynamic window = batch size / 10 (ms)
- long timeWindowMs = eventBatchSize <= 500 ? 50 : eventBatchSize / 10;
+
while (running.get()) {
try {
OffsetEvent event = concurrentProcessor.get(10, TimeUnit.MILLISECONDS);
- if (EmptyKit.isNotNull(event)) {
- lastOffsetEvent = event;
- events.add(event.getEvent());
- // Check batch size OR time window
- if (events.size() >= eventBatchSize ||
- (System.currentTimeMillis() - lastSendTime > timeWindowMs)) {
- consumer.accept(events, event.getOffset());
- events.clear();
- lastSendTime = System.currentTimeMillis();
- }
- } else {
- // Send remaining events when queue is empty
- if (!events.isEmpty() && lastOffsetEvent != null) {
- consumer.accept(events, lastOffsetEvent.getOffset());
- events.clear();
- lastSendTime = System.currentTimeMillis();
- }
- }
+ this.v4Acceptor.accept(event);
} catch (Exception e) {
throwableAtomicReference.set(e);
return;
@@ -212,7 +209,7 @@ public void read(TapConnectorContext connectorContext, List tableList, O
}
}
- static class OffsetEvent {
+ public static class OffsetEvent {
private final Object offset;
private final TapEvent event;
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/NormalV3Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/NormalV3Acceptor.java
new file mode 100644
index 000000000..69d13975b
--- /dev/null
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/NormalV3Acceptor.java
@@ -0,0 +1,94 @@
+package io.tapdata.mongodb.reader.cdc.v3;
+
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.mongodb.reader.v3.MongoV3StreamOffset;
+import io.tapdata.mongodb.reader.v3.TapEventOffset;
+import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/8 17:30 Create
+ * @description
+ */
+public class NormalV3Acceptor extends V3Accept {
+ StreamReadConsumer consumer;
+
+ int batchSize = 1;
+
+ long batchSizeTimeout = TimeUnit.SECONDS.toMillis(1L);
+
+ long lastConsumeTs;
+
+ List tapEvents = new ArrayList<>();
+
+ // Calculate time window based on batch size
+ // If batch size <= 500, use fixed 50ms window
+ // If batch size > 500, use dynamic window = batch size / 10 (ms)
+ @Override
+ public void accept(TapEventOffset tapEventOffset) {
+ if (tapEventOffset != null) {
+ tapEvents.add(tapEventOffset.getTapEvent());
+ this.offset.put(tapEventOffset.getReplicaSetName(), tapEventOffset.getOffset());
+ }
+
+ long consumeInterval = System.currentTimeMillis() - lastConsumeTs;
+ if (tapEvents.size() >= batchSize
+ || consumeInterval > batchSizeTimeout) {
+ Map newOffset = new ConcurrentHashMap<>(this.offset);
+ consumer.accept(tapEvents, newOffset);
+ tapEvents = new ArrayList<>(batchSize);
+ lastConsumeTs = System.currentTimeMillis();
+ }
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ consumer.accept(e, offset);
+ }
+
+ @Override
+ public NormalV3Acceptor setConsumer(StreamReadConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public NormalV3Acceptor setBatchSize(int size) {
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public NormalV3Acceptor setBatchSizeTimeout(long ms) {
+ this.batchSizeTimeout = ms;
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.lastConsumeTs = System.currentTimeMillis();
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public StreamReadConsumer getConsumer() {
+ return this.consumer;
+ }
+}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/OneByOneV3Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/OneByOneV3Acceptor.java
new file mode 100644
index 000000000..012bb6cf3
--- /dev/null
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/OneByOneV3Acceptor.java
@@ -0,0 +1,69 @@
+package io.tapdata.mongodb.reader.cdc.v3;
+
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.mongodb.reader.v3.TapEventOffset;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/8 17:30 Create
+ * @description
+ */
+public class OneByOneV3Acceptor extends V3Accept {
+ StreamReadOneByOneConsumer consumer;
+
+ @Override
+ public void accept(TapEventOffset e) {
+ if (e == null || null == e.getTapEvent()) {
+ return;
+ }
+ this.offset.put(e.getReplicaSetName(), e.getOffset());
+ consumer.accept(e.getTapEvent(), new HashMap<>(offset));
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ if (CollectionUtils.isEmpty(e)) {
+ return;
+ }
+ consumer.accept(e, offset);
+ }
+
+ @Override
+ public OneByOneV3Acceptor setConsumer(StreamReadOneByOneConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public OneByOneV3Acceptor setBatchSize(int size) {
+ return this;
+ }
+
+ @Override
+ public OneByOneV3Acceptor setBatchSizeTimeout(long ms) {
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public StreamReadOneByOneConsumer getConsumer() {
+ return this.consumer;
+ }
+
+
+}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/V3Accept.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/V3Accept.java
new file mode 100644
index 000000000..5c1a8316c
--- /dev/null
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/V3Accept.java
@@ -0,0 +1,25 @@
+package io.tapdata.mongodb.reader.cdc.v3;
+
+import io.tapdata.cdc.CustomAbstractAccepter;
+import io.tapdata.mongodb.reader.v3.MongoV3StreamOffset;
+import io.tapdata.mongodb.reader.v3.TapEventOffset;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
+
+import java.util.Map;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/9 11:06 Create
+ * @description
+ */
+public abstract class V3Accept, Consumer extends TapStreamReadConsumer, Object>>
+ extends CustomAbstractAccepter {
+
+ protected Map offset;
+
+ public void setOffset(Map offset) {
+ this.offset = offset;
+ }
+
+}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/NormalV4Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/NormalV4Acceptor.java
new file mode 100644
index 000000000..8fc86aa8c
--- /dev/null
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/NormalV4Acceptor.java
@@ -0,0 +1,99 @@
+package io.tapdata.mongodb.reader.cdc.v4;
+
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.kit.EmptyKit;
+import io.tapdata.mongodb.reader.MongodbV4StreamReader;
+import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/8 17:30 Create
+ * @description
+ */
+public class NormalV4Acceptor extends V4Accept {
+ StreamReadConsumer consumer;
+
+ int batchSize = 1;
+
+ long batchSizeTimeout = 1000L;
+
+ MongodbV4StreamReader.OffsetEvent lastOffsetEvent = null;
+
+ long lastSendTime;
+
+ List events = new ArrayList<>();
+
+ // Calculate time window based on batch size
+ // If batch size <= 500, use fixed 50ms window
+ // If batch size > 500, use dynamic window = batch size / 10 (ms)
+ @Override
+ public void accept(MongodbV4StreamReader.OffsetEvent event) {
+ if (EmptyKit.isNotNull(event)) {
+ lastOffsetEvent = event;
+ events.add(event.getEvent());
+ // Check batch size OR time window
+ if (events.size() >= batchSize ||
+ (System.currentTimeMillis() - lastSendTime > batchSizeTimeout)) {
+ consumer.accept(events, event.getOffset());
+ events.clear();
+ lastSendTime = System.currentTimeMillis();
+ }
+ } else {
+ // Send remaining events when queue is empty
+ if (!events.isEmpty() && lastOffsetEvent != null) {
+ consumer.accept(events, lastOffsetEvent.getOffset());
+ events.clear();
+ lastSendTime = System.currentTimeMillis();
+ }
+ }
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ this.consumer.accept(e, offset);
+ }
+
+ @Override
+ public NormalV4Acceptor setConsumer(StreamReadConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public NormalV4Acceptor setBatchSize(int size) {
+ this.batchSize = size;
+ return this;
+ }
+
+ @Override
+ public NormalV4Acceptor setBatchSizeTimeout(long ms) {
+ this.batchSizeTimeout = ms;
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.lastSendTime = System.currentTimeMillis();
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public StreamReadConsumer getConsumer() {
+ return this.consumer;
+ }
+}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/OneByOneV4Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/OneByOneV4Acceptor.java
new file mode 100644
index 000000000..a8fb12c56
--- /dev/null
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/OneByOneV4Acceptor.java
@@ -0,0 +1,65 @@
+package io.tapdata.mongodb.reader.cdc.v4;
+
+import io.tapdata.entity.event.TapEvent;
+import io.tapdata.mongodb.reader.MongodbV4StreamReader;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/8 17:30 Create
+ * @description
+ */
+public class OneByOneV4Acceptor extends V4Accept {
+ StreamReadOneByOneConsumer consumer;
+
+ @Override
+ public void accept(MongodbV4StreamReader.OffsetEvent e) {
+ if (null == e || e.getEvent() == null) {
+ return;
+ }
+ this.consumer.accept(e.getEvent(), e.getOffset());
+ }
+
+ @Override
+ public void accept(List e, Object offset) {
+ if (CollectionUtils.isEmpty(e)) {
+ return;
+ }
+ this.consumer.accept(e, offset);
+ }
+
+ @Override
+ public OneByOneV4Acceptor setConsumer(StreamReadOneByOneConsumer consumer) {
+ this.consumer = consumer;
+ return this;
+ }
+
+ @Override
+ public OneByOneV4Acceptor setBatchSize(int size) {
+ return this;
+ }
+
+ @Override
+ public OneByOneV4Acceptor setBatchSizeTimeout(long ms) {
+ return this;
+ }
+
+ @Override
+ public void streamReadStarted() {
+ this.consumer.streamReadStarted();
+ }
+
+ @Override
+ public void streamReadEnded() {
+ this.consumer.streamReadEnded();
+ }
+
+ @Override
+ public StreamReadOneByOneConsumer getConsumer() {
+ return this.consumer;
+ }
+}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/V4Accept.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/V4Accept.java
new file mode 100644
index 000000000..dfa5577b3
--- /dev/null
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/V4Accept.java
@@ -0,0 +1,15 @@
+package io.tapdata.mongodb.reader.cdc.v4;
+
+import io.tapdata.cdc.CustomAbstractAccepter;
+import io.tapdata.mongodb.reader.MongodbV4StreamReader;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
+
+/**
+ * @author Gavin'Xiao
+ * @author Gavin'Xiao
+ * @version v1.0 2025/12/9 11:07 Create
+ * @description
+ */
+public abstract class V4Accept, Consumer extends TapStreamReadConsumer, Object>>
+ extends CustomAbstractAccepter {
+}
diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java
index 16e7e87d2..b9b20bbdc 100644
--- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java
+++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java
@@ -9,9 +9,8 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import io.tapdata.entity.event.TapBaseEvent;
-import io.tapdata.entity.event.TapEvent;
-import io.tapdata.entity.event.dml.TapUpdateRecordEvent;
import io.tapdata.entity.event.control.HeartbeatEvent;
+import io.tapdata.entity.event.dml.TapUpdateRecordEvent;
import io.tapdata.entity.logger.TapLogger;
import io.tapdata.entity.utils.cache.KVMap;
import io.tapdata.exception.TapPdkOffsetOutOfLogEx;
@@ -19,9 +18,14 @@
import io.tapdata.mongodb.MongodbUtil;
import io.tapdata.mongodb.entity.MongodbConfig;
import io.tapdata.mongodb.reader.MongodbStreamReader;
+import io.tapdata.mongodb.reader.cdc.v3.NormalV3Acceptor;
+import io.tapdata.mongodb.reader.cdc.v3.OneByOneV3Acceptor;
+import io.tapdata.mongodb.reader.cdc.v3.V3Accept;
import io.tapdata.mongodb.util.IntervalReport;
import io.tapdata.mongodb.util.MongodbLookupUtil;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
@@ -30,14 +34,21 @@
import org.bson.conversions.Bson;
import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static io.tapdata.base.ConnectorBase.*;
+import static io.tapdata.base.ConnectorBase.deleteDMLEvent;
+import static io.tapdata.base.ConnectorBase.insertRecordEvent;
+import static io.tapdata.base.ConnectorBase.updateDMLEvent;
/**
* @author jackin
@@ -49,7 +60,6 @@ public class MongodbV3StreamReader implements MongodbStreamReader {
private final static String LOCAL_DATABASE = "local";
private final static String OPLOG_COLLECTION = "oplog.rs";
- public static final long CONSUME_TIME_OUT = TimeUnit.SECONDS.toMillis(1L);
private MongodbConfig mongodbConfig;
@@ -73,6 +83,8 @@ public class MongodbV3StreamReader implements MongodbStreamReader {
private ConnectionString connectionString;
+ V3Accept, ?> acceptor;
+
@Override
public void onStart(MongodbConfig mongodbConfig) {
this.mongodbConfig = mongodbConfig;
@@ -84,8 +96,25 @@ public void onStart(MongodbConfig mongodbConfig) {
replicaSetReadThreadPool = new ThreadPoolExecutor(nodesURI.size(), nodesURI.size(), 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
}
+
@Override
- public void read(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) throws Exception {
+ public MongodbV3StreamReader initAcceptor(int eventBatchSize, TapStreamReadConsumer, Object> consumer) {
+ if (consumer instanceof StreamReadConsumer) {
+ this.acceptor = new NormalV3Acceptor()
+ .setConsumer((StreamReadConsumer) consumer)
+ .setBatchSize(eventBatchSize)
+ .setBatchSizeTimeout(eventBatchSize <= 500 ? 50 : eventBatchSize / 10);
+ } else if (consumer instanceof StreamReadOneByOneConsumer) {
+ this.acceptor = new OneByOneV3Acceptor()
+ .setConsumer((StreamReadOneByOneConsumer) consumer);
+ } else {
+ throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName());
+ }
+ return this;
+ }
+
+ @Override
+ public void read(TapConnectorContext connectorContext, List tableList, Object offset) throws Exception {
if (CollectionUtils.isNotEmpty(tableList)) {
for (String tableName : tableList) {
namespaces.add(new StringBuilder(mongodbConfig.getDatabase()).append(".").append(tableName).toString());
@@ -93,7 +122,7 @@ public void read(TapConnectorContext connectorContext, List tableList, O
}
if (tapEventQueue == null) {
- tapEventQueue = new LinkedBlockingDeque<>(eventBatchSize);
+ tapEventQueue = new LinkedBlockingDeque<>(this.acceptor.getBatchSize());
}
if (this.globalStateMap == null) {
@@ -105,6 +134,7 @@ public void read(TapConnectorContext connectorContext, List tableList, O
} else {
this.offset = new ConcurrentHashMap<>();
}
+ this.acceptor.setOffset(this.offset);
if (MapUtils.isNotEmpty(nodesURI)) {
int intervalReportMillis = 3000; // interval report offset if no match any table event
@@ -136,7 +166,7 @@ protected void report(BsonTimestamp bsonTimestamp) throws InterruptedException {
if (running.get()) {
try {
Thread.currentThread().setName("replicaSet-read-thread-" + replicaSetName);
- readFromOplog(connectorContext, replicaSetName, intervalReport, mongodbURI, eventBatchSize, consumer);
+ readFromOplog(connectorContext, replicaSetName, intervalReport, mongodbURI);
} catch (Exception e) {
running.compareAndSet(true, false);
TapLogger.error(TAG, "read oplog event from {} failed {}", replicaSetName, e.getMessage(), e);
@@ -146,25 +176,10 @@ protected void report(BsonTimestamp bsonTimestamp) throws InterruptedException {
});
}
}
-
- List tapEvents = new ArrayList<>(eventBatchSize);
- long lastConsumeTs = 0L;
while (running.get()) {
try {
final TapEventOffset tapEventOffset = tapEventQueue.poll(3, TimeUnit.SECONDS);
- if (tapEventOffset != null) {
- tapEvents.add(tapEventOffset.getTapEvent());
- this.offset.put(tapEventOffset.getReplicaSetName(), tapEventOffset.getOffset());
- }
-
- long consumeInterval = System.currentTimeMillis() - lastConsumeTs;
- if (tapEvents.size() >= eventBatchSize
- || consumeInterval > CONSUME_TIME_OUT) {
- Map newOffset = new ConcurrentHashMap<>(this.offset);
- consumer.accept(tapEvents, newOffset);
- tapEvents = new ArrayList<>(eventBatchSize);
- lastConsumeTs = System.currentTimeMillis();
- }
+ this.acceptor.accept(tapEventOffset);
} catch (InterruptedException e) {
TapLogger.info("Stream polling failed: {}", e.getMessage(), e);
break;
@@ -209,7 +224,7 @@ public void onDestroy() {
}
}
- private void readFromOplog(TapConnectorContext connectorContext, String replicaSetName, IntervalReport intervalReport, String mongodbURI, int eventBatchSize, StreamReadConsumer consumer) {
+ private void readFromOplog(TapConnectorContext connectorContext, String replicaSetName, IntervalReport intervalReport, String mongodbURI) {
Bson filter = null;
BsonTimestamp startTs = null;
@@ -255,7 +270,7 @@ private void readFromOplog(TapConnectorContext connectorContext, String replicaS
.cursorType(CursorType.TailableAwait)
.noCursorTimeout(true).iterator()) {
- consumer.streamReadStarted();
+ this.acceptor.streamReadStarted();
while (running.get()) {
if (mongoCursor.hasNext()) {
final Document event = mongoCursor.next();
diff --git a/connectors/mongodb-connector/src/main/resources/spec.json b/connectors/mongodb-connector/src/main/resources/spec.json
index 2e2a4f7be..d82329c88 100644
--- a/connectors/mongodb-connector/src/main/resources/spec.json
+++ b/connectors/mongodb-connector/src/main/resources/spec.json
@@ -4,7 +4,17 @@
"icon": "icons/mongodb.png",
"doc" : "${doc}",
"tags" : ["schema-free","Database","doubleActive"],
- "id": "mongodb"
+ "id": "mongodb",
+ "autoAccumulateBatch": {
+ "batchRead": {
+ "open": false,
+ "maxDelayMs": "1000"
+ },
+ "increaseRead": {
+ "open": true,
+ "maxDelayMs": "1000"
+ }
+ }
},
"configOptions": {
"capabilities": [
diff --git a/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbConnectorTest.java b/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbConnectorTest.java
deleted file mode 100644
index 95e5795f2..000000000
--- a/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbConnectorTest.java
+++ /dev/null
@@ -1,810 +0,0 @@
-package io.tapdata.mongodb;
-
-import com.mongodb.client.*;
-import com.mongodb.client.MongoClient;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.IndexOptions;
-import io.tapdata.entity.event.TapEvent;
-import io.tapdata.entity.event.ddl.index.TapCreateIndexEvent;
-import io.tapdata.entity.logger.Log;
-import io.tapdata.entity.schema.TapIndex;
-import io.tapdata.entity.schema.TapIndexField;
-import io.tapdata.entity.schema.TapIndexEx;
-import io.tapdata.entity.schema.TapTable;
-import io.tapdata.mongodb.batch.ErrorHandler;
-import io.tapdata.mongodb.batch.MongoBatchReader;
-import io.tapdata.mongodb.entity.MongoCdcOffset;
-import io.tapdata.mongodb.entity.MongodbConfig;
-import io.tapdata.mongodb.entity.ReadParam;
-import io.tapdata.mongodb.reader.MongodbOpLogStreamV3Reader;
-import io.tapdata.mongodb.reader.MongodbStreamReader;
-import io.tapdata.mongodb.reader.StreamWithOpLogCollection;
-import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
-import io.tapdata.pdk.apis.context.TapConnectionContext;
-import io.tapdata.pdk.apis.context.TapConnectorContext;
-import io.tapdata.pdk.apis.entity.ExecuteResult;
-import io.tapdata.pdk.apis.entity.TapAdvanceFilter;
-import io.tapdata.pdk.apis.entity.TapExecuteCommand;
-import io.tapdata.pdk.apis.functions.connection.TableInfo;
-import org.bson.Document;
-import org.bson.conversions.Bson;
-import org.junit.jupiter.api.*;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
-import org.springframework.test.util.ReflectionTestUtils;
-
-import java.util.*;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-class MongodbConnectorTest {
- MongodbConnector mongodbConnector;
- TapConnectorContext connectorContext;
- MongodbConfig mongoConfig;
- MongoClient mongoClient;
- MongodbStreamReader mongodbStreamReader;
- MongodbStreamReader opLogStreamReader;
- MongodbExceptionCollector exceptionCollector;
- ThreadPoolExecutor sourceRunner;
- Future> sourceRunnerFuture;
- Log log;
- @BeforeEach
- void init() {
- mongodbConnector = mock(MongodbConnector.class);
- connectorContext = mock(TapConnectorContext.class);
- mongoConfig = mock(MongodbConfig.class);
- ReflectionTestUtils.setField(mongodbConnector, "mongoConfig", mongoConfig);
- mongoClient = mock(MongoClient.class);
- ReflectionTestUtils.setField(mongodbConnector,"mongoClient",mongoClient);
- mongodbStreamReader = mock(MongodbStreamReader.class);
- ReflectionTestUtils.setField(mongodbConnector, "mongodbStreamReader", mongodbStreamReader);
- opLogStreamReader = mock(MongodbStreamReader.class);
- ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", opLogStreamReader);
- exceptionCollector = new MongodbExceptionCollector();
- ReflectionTestUtils.setField(mongodbConnector, "exceptionCollector", exceptionCollector);
- sourceRunner = mock(ThreadPoolExecutor.class);
- ReflectionTestUtils.setField(mongodbConnector, "sourceRunner", sourceRunner);
- sourceRunnerFuture = mock(Future.class);
- ReflectionTestUtils.setField(mongodbConnector, "sourceRunnerFuture", sourceRunnerFuture);
- log = mock(Log.class);
- when(connectorContext.getLog()).thenReturn(log);
- }
-
- @Nested
- class BatchReadTest {
- TapTable table;
- Object offset;
- int eventBatchSize;
- BiConsumer, Object> tapReadOffsetConsumer;
- MongoBatchReader reader;
- Exception e;
- @BeforeEach
- void init() throws Throwable {
- e = new Exception();
- reader = mock(MongoBatchReader.class);
- table = mock(TapTable.class);
- offset = 0L;
- eventBatchSize = 100;
- tapReadOffsetConsumer = mock(BiConsumer.class);
- doNothing().when(mongodbConnector).errorHandle(e, connectorContext);
- doAnswer(a -> {
- ReadParam param = a.getArgument(0, ReadParam.class);
- ErrorHandler errorHandler = param.getErrorHandler();
- errorHandler.doHandle(e);
- return null;
- }).when(reader).batchReadCollection(any(ReadParam.class));
- doCallRealMethod().when(mongodbConnector).batchRead(connectorContext, table, offset, eventBatchSize, tapReadOffsetConsumer);
- }
- @Test
- void testNormal() throws Throwable {
- try(MockedStatic mbr = mockStatic(MongoBatchReader.class)) {
- mbr.when(() -> MongoBatchReader.of(any(ReadParam.class))).thenReturn(reader);
- mongodbConnector.batchRead(connectorContext, table, offset, eventBatchSize, tapReadOffsetConsumer);
- verify(mongodbConnector).errorHandle(e, connectorContext);
- }
- }
- }
-
- @Nested
- class StreamOffsetTest {
- Long offsetStartTime;
-
- @BeforeEach
- void init() {
- offsetStartTime = 0L;
-
- when(mongoConfig.getDatabase()).thenReturn("test");
- when(opLogStreamReader.streamOffset(offsetStartTime)).thenReturn(0L);
- when(mongodbStreamReader.streamOffset(offsetStartTime)).thenReturn(0L);
- when(mongodbConnector.createStreamReader()).thenReturn(mongodbStreamReader);
- doNothing().when(opLogStreamReader).onStart(mongoConfig);
- when(mongodbConnector.streamOffset(connectorContext, offsetStartTime)).thenCallRealMethod();
- }
- @Test
- void testNormal() {
- Object offset = mongodbConnector.streamOffset(connectorContext, offsetStartTime);
- Assertions.assertNotNull(offset);
- Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName());
- MongoCdcOffset o = MongoCdcOffset.fromOffset(offset);
- Assertions.assertEquals(0L, o.getOpLogOffset());
- Assertions.assertEquals(0L, o.getCdcOffset());
- verify(opLogStreamReader).streamOffset(offsetStartTime);
- verify(mongoConfig, times(0)).getDatabase();
- verify(mongodbStreamReader).streamOffset(offsetStartTime);
- }
- @Test
- void testMongodbStreamReaderIsNull() {
- ReflectionTestUtils.setField(mongodbConnector, "mongodbStreamReader", null);
- when(mongodbConnector.createStreamReader()).thenReturn(mongodbStreamReader);
- Object offset = mongodbConnector.streamOffset(connectorContext, offsetStartTime);
- Assertions.assertNotNull(offset);
- Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName());
- MongoCdcOffset o = MongoCdcOffset.fromOffset(offset);
- Assertions.assertEquals(0L, o.getOpLogOffset());
- Assertions.assertEquals(0L, o.getCdcOffset());
- verify(opLogStreamReader).streamOffset(offsetStartTime);
- verify(mongodbStreamReader).streamOffset(offsetStartTime);
- verify(mongoConfig, times(0)).getDatabase();
- verify(mongodbConnector).createStreamReader();
- }
- @Test
- void testOpLogStreamReaderIsNullButDatabaseNotLocal() {
- ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", null);
- Object offset = mongodbConnector.streamOffset(connectorContext, offsetStartTime);
- Assertions.assertNotNull(offset);
- Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName());
- MongoCdcOffset o = MongoCdcOffset.fromOffset(offset);
- Assertions.assertNull(o.getOpLogOffset());
- Assertions.assertEquals(0L, o.getCdcOffset());
- verify(opLogStreamReader, times(0)).streamOffset(offsetStartTime);
- verify(mongodbStreamReader).streamOffset(offsetStartTime);
- verify(mongoConfig).getDatabase();
- verify(mongodbConnector, times(0)).createStreamReader();
- }
- @Test
- void testOpLogStreamReaderIsNullDatabaseIsLocal() {
- when(mongoConfig.getDatabase()).thenReturn("local");
- ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", null);
- try(MockedStatic mol = mockStatic(MongodbOpLogStreamV3Reader.class)) {
- mol.when(MongodbOpLogStreamV3Reader::of).thenReturn(opLogStreamReader);
- Object offset = mongodbConnector.streamOffset(connectorContext, offsetStartTime);
- Assertions.assertNotNull(offset);
- Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName());
- MongoCdcOffset o = MongoCdcOffset.fromOffset(offset);
- Assertions.assertEquals(0L, o.getOpLogOffset());
- Assertions.assertEquals(0L, o.getCdcOffset());
- verify(opLogStreamReader).streamOffset(offsetStartTime);
- verify(mongodbStreamReader).streamOffset(offsetStartTime);
- verify(mongoConfig).getDatabase();
- verify(mongodbConnector, times(0)).createStreamReader();
- verify(opLogStreamReader).onStart(mongoConfig);
- }
- }
- }
-
- @Nested
- class StreamReadTest {
- List tableList;
- Object offset;
- int eventBatchSize;
- StreamReadConsumer consumer;
- MongoCdcOffset mongoCdcOffset;
-
- @BeforeEach
- void init() {
- mongoCdcOffset = new MongoCdcOffset(0L, 0L);
- tableList = mock(List.class);
- offset = 0L;
- eventBatchSize = 100;
- consumer = mock(StreamReadConsumer.class);
- when(tableList.size()).thenReturn(1, 1);
-
- doNothing().when(mongodbConnector).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
- when(tableList.isEmpty()).thenReturn(false);
- when(mongodbConnector.createStreamReader()).thenReturn(mongodbStreamReader);
- doNothing().when(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer);
- doCallRealMethod().when(mongodbConnector).streamRead(connectorContext, tableList, offset, eventBatchSize, consumer);
- }
- @Test
- void testNormal() {
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(tableList, times(2)).size();
- verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer);
- verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
- verify(tableList, times(0)).isEmpty();
- verify(mongodbConnector, times(0)).createStreamReader();
- verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer);
- }
- @Test
- void testAfterReadOpLog() {
- when(tableList.size()).thenReturn(1, 2);
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(tableList, times(2)).size();
- verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer);
- verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
- verify(tableList).isEmpty();
- verify(mongodbConnector, times(0)).createStreamReader();
- verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer);
- }
- @Test
- void testMongodbStreamReaderIsNull() {
- ReflectionTestUtils.setField(mongodbConnector, "mongodbStreamReader", null);
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(tableList, times(2)).size();
- verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer);
- verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
- verify(tableList, times(0)).isEmpty();
- verify(mongodbConnector, times(1)).createStreamReader();
- verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer);
- }
- @Test
- void testOffsetIsMongoOffset() {
- offset = mongoCdcOffset;
- doCallRealMethod().when(mongodbConnector).streamRead(connectorContext, tableList, offset, eventBatchSize, consumer);
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(tableList, times(2)).size();
- verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer);
- verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
- verify(tableList, times(0)).isEmpty();
- verify(mongodbConnector, times(0)).createStreamReader();
- verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer);
- }
- @Test
- void testOnlyReadOpLogCollection() {
- when(tableList.size()).thenReturn(1, 2);
- when(tableList.isEmpty()).thenReturn(true);
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(tableList, times(2)).size();
- verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer);
- verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer);
- verify(tableList).isEmpty();
- verify(mongodbConnector, times(0)).createStreamReader();
- verify(mongodbConnector, times(0)).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer);
- }
- }
-
- @Nested
- class DoStreamTest {
- List tableList;
- Object offset;
- int eventBatchSize;
- StreamReadConsumer consumer;
- MongoCdcOffset mongoCdcOffset;
- MongodbStreamReader streamReader;
- @BeforeEach
- void init() throws Exception {
- streamReader = mock(MongodbStreamReader.class);
- mongoCdcOffset = new MongoCdcOffset(0L, 0L);
- tableList = mock(List.class);
- offset = 0L;
- eventBatchSize = 100;
- consumer = mock(StreamReadConsumer.class);
- when(tableList.size()).thenReturn(1, 1);
-
- doNothing().when(streamReader).read(connectorContext, tableList, offset, eventBatchSize, consumer);
- doNothing().when(streamReader).onDestroy();
- doNothing().when(mongodbConnector).errorHandle(any(Exception.class), any(TapConnectorContext.class));
- doNothing().when(log).debug(anyString(), anyString());
-
- doCallRealMethod().when(mongodbConnector).doStreamRead(streamReader, connectorContext, tableList, offset, eventBatchSize, consumer);
- }
- @Test
- void testNormal() throws Exception {
- Assertions.assertDoesNotThrow(() -> mongodbConnector.doStreamRead(streamReader, connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(streamReader, times(1)).read(connectorContext, tableList, offset, eventBatchSize, consumer);
- verify(streamReader, times(0)).onDestroy();
- verify(mongodbConnector, times(0)).errorHandle(any(Exception.class), any(TapConnectorContext.class));
- verify(log, times(0)).debug(anyString(), anyString());
- }
- @Test
- void testException() throws Exception {
- doAnswer(a -> {
- throw new Exception("error");
- }).when(streamReader).read(connectorContext, tableList, offset, eventBatchSize, consumer);
- Assertions.assertDoesNotThrow(() -> mongodbConnector.doStreamRead(streamReader, connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(streamReader, times(1)).read(connectorContext, tableList, offset, eventBatchSize, consumer);
- verify(streamReader, times(1)).onDestroy();
- verify(mongodbConnector, times(1)).errorHandle(any(Exception.class), any(TapConnectorContext.class));
- verify(log, times(0)).debug(anyString(), anyString());
- }
- @Test
- void testDestroyException() throws Exception {
- doAnswer(a -> {
- throw new Exception("error");
- }).when(streamReader).read(connectorContext, tableList, offset, eventBatchSize, consumer);
- doAnswer(a -> {
- throw new Exception("failed");
- }).when(streamReader).onDestroy();
- Assertions.assertDoesNotThrow(() -> mongodbConnector.doStreamRead(streamReader, connectorContext, tableList, offset, eventBatchSize, consumer));
- verify(streamReader, times(1)).read(connectorContext, tableList, offset, eventBatchSize, consumer);
- verify(streamReader, times(1)).onDestroy();
- verify(mongodbConnector, times(1)).errorHandle(any(Exception.class), any(TapConnectorContext.class));
- verify(log, times(1)).debug(anyString(), anyString());
- }
- }
-
- @Nested
- class StreamReadOpLogTest {
- List tableList;
- Object offset;
- int eventBatchSize;
- StreamReadConsumer consumer;
- MongoCdcOffset mongoCdcOffset;
- @BeforeEach
- void init() {
- mongoCdcOffset = new MongoCdcOffset(0L, 0L);
- tableList = mock(List.class);
- offset = 0L;
- eventBatchSize = 100;
- consumer = mock(StreamReadConsumer.class);
-
- when(mongoConfig.getDatabase()).thenReturn("test");
- when(tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(false);
- doNothing().when(log).info("Start read oplog collection, db: local");
- when(tableList.remove(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(true);
- when(tableList.isEmpty()).thenReturn(true);
- doNothing().when(mongodbConnector).doStreamRead(
- any(MongodbStreamReader.class),
- any(TapConnectorContext.class),
- anyList(),
- any(),
- anyInt(),
- any(StreamReadConsumer.class));
- doCallRealMethod().when(mongodbConnector).streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer);
- }
-
- @Test
- void testNotReadOpLog() {
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer));
- verify(mongoConfig).getDatabase();
- verify(tableList, times(0)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(connectorContext, times(0)).getLog();
- verify(log, times(0)).info("Start read oplog collection, db: local");
- verify(tableList, times(0)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(tableList, times(0)).isEmpty();
- verify(mongodbConnector, times(0)).doStreamRead(
- any(MongodbStreamReader.class),
- any(TapConnectorContext.class),
- anyList(),
- any(),
- anyInt(),
- any(StreamReadConsumer.class));
- }
- @Test
- void testNotReadOpLog2() {
- when(mongoConfig.getDatabase()).thenReturn("local");
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer));
- verify(mongoConfig).getDatabase();
- verify(tableList, times(1)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(connectorContext, times(0)).getLog();
- verify(log, times(0)).info("Start read oplog collection, db: local");
- verify(tableList, times(0)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(tableList, times(0)).isEmpty();
- verify(mongodbConnector, times(0)).doStreamRead(
- any(MongodbStreamReader.class),
- any(TapConnectorContext.class),
- anyList(),
- any(),
- anyInt(),
- any(StreamReadConsumer.class));
- }
- @Test
- void testReadOpLog() {
- when(mongoConfig.getDatabase()).thenReturn("local");
- when(tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(true);
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer));
- verify(mongoConfig).getDatabase();
- verify(tableList, times(1)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(connectorContext, times(1)).getLog();
- verify(log, times(1)).info("Start read oplog collection, db: local");
- verify(tableList, times(1)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(tableList, times(1)).isEmpty();
- verify(mongodbConnector, times(1)).doStreamRead(
- any(MongodbStreamReader.class),
- any(TapConnectorContext.class),
- anyList(),
- any(),
- anyInt(),
- any(StreamReadConsumer.class));
- }
- @Test
- void testReadOpLogWithNullMongodbOpLogStreamV3Reader() {
- try (MockedStatic mol = mockStatic(MongodbOpLogStreamV3Reader.class)) {
- mol.when(MongodbOpLogStreamV3Reader::of).thenReturn(opLogStreamReader);
- doNothing().when(opLogStreamReader).onStart(mongoConfig);
- ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", null);
- when(mongoConfig.getDatabase()).thenReturn("local");
- when(tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(true);
- Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer));
- verify(mongoConfig).getDatabase();
- verify(tableList, times(1)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(connectorContext, times(1)).getLog();
- verify(log, times(1)).info("Start read oplog collection, db: local");
- verify(tableList, times(1)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION);
- verify(tableList, times(1)).isEmpty();
- verify(opLogStreamReader).onStart(mongoConfig);
- verify(mongodbConnector, times(1)).doStreamRead(
- any(MongodbStreamReader.class),
- any(TapConnectorContext.class),
- anyList(),
- any(),
- anyInt(),
- any(StreamReadConsumer.class));
- }
- }
- }
-
- @Nested
- class CloseOpLogThreadSourceTest {
-
- @BeforeEach
- void init() {
- when(sourceRunnerFuture.cancel(true)).thenReturn(true);
-
- doNothing().when(sourceRunner).shutdown();
- doCallRealMethod().when(mongodbConnector).closeOpLogThreadSource();
- }
- @Test
- void testNormal() {
- Assertions.assertDoesNotThrow(mongodbConnector::closeOpLogThreadSource);
- verify(sourceRunnerFuture).cancel(true);
- verify(sourceRunner).shutdown();
- Assertions.assertNull(mongodbConnector.sourceRunner);
- Assertions.assertNull(mongodbConnector.sourceRunnerFuture);
- }
- @Test
- void testThrowException() {
- when(sourceRunnerFuture.cancel(true)).thenAnswer(a -> {
- throw new Exception("failed");
- });
- doAnswer(a -> {
- throw new Exception("failed");
- }).when(sourceRunner).shutdown();
- assertThrows(Exception.class,mongodbConnector::closeOpLogThreadSource);
- verify(sourceRunnerFuture).cancel(true);
- verify(sourceRunner).shutdown();
- Assertions.assertNull(mongodbConnector.sourceRunner);
- Assertions.assertNull(mongodbConnector.sourceRunnerFuture);
- }
- }
-
- @Nested
- class getMongoCollection{
- private MongoDatabase mongoDatabase;
- private String table;
- @BeforeEach
- void beforeEach(){
- table = "table";
- mongoDatabase = mock(MongoDatabase.class);
- ReflectionTestUtils.setField(mongodbConnector,"mongoDatabase",mongoDatabase);
- }
- @Test
- void testGetMongoCollectionWithEx(){
- when(mongoDatabase.getCollection(table)).thenThrow(RuntimeException.class);
- when(mongoConfig.getUri()).thenReturn("mongodb://127.0.0.1:27017/test");
- doCallRealMethod().when(mongodbConnector).getMongoCollection(table);
- assertThrows(RuntimeException.class,()->mongodbConnector.getMongoCollection(table));
- }
- }
- @Nested
- class executeCommand{
- private TapExecuteCommand tapExecuteCommand;
- private Consumer executeResultConsumer;
- private MongodbExecuteCommandFunction mongodbExecuteCommandFunction;
- @BeforeEach
- void beforeEach(){
- tapExecuteCommand = mock(TapExecuteCommand.class);
- executeResultConsumer = mock(Consumer.class);
- mongodbExecuteCommandFunction = mock(MongodbExecuteCommandFunction.class);
- }
- @Test
- void testExecuteCommandWithEx(){
- Map executeObj = new HashMap<>();
- executeObj.put("database","test");
- when(tapExecuteCommand.getParams()).thenReturn(executeObj);
- when(tapExecuteCommand.getCommand()).thenReturn("executeQuery");
- doThrow(new RuntimeException()).when(mongodbExecuteCommandFunction).executeQuery(anyMap(),any(MongoClient.class),any(Consumer.class),any(Supplier.class));
- doCallRealMethod().when(mongodbConnector).executeCommand(connectorContext,tapExecuteCommand,executeResultConsumer);
- assertThrows(RuntimeException.class,()->mongodbConnector.executeCommand(connectorContext,tapExecuteCommand,executeResultConsumer));
- }
- }
- @Nested
- class queryFieldMinMaxValue{
- private TapTable table;
- private TapAdvanceFilter partitionFilter;
- private String fieldName;
- @BeforeEach
- void beforeEach(){
- table = mock(TapTable.class);
- partitionFilter = mock(TapAdvanceFilter.class);
- fieldName = "field";
- when(table.getId()).thenReturn("table");
- }
- @Test
- void testQueryFieldMinMaxValueWithEx(){
- MongoCollection collection = mock(MongoCollection.class);
- when(mongodbConnector.getMongoCollection("table")).thenReturn(collection);
- TapIndexEx partitionIndex = mock(TapIndexEx.class);
- when(table.partitionIndex()).thenReturn(partitionIndex);
- List indexFields = new ArrayList<>();
- TapIndexField indexField = mock(TapIndexField.class);
- when(indexField.getName()).thenReturn(fieldName);
- indexFields.add(indexField);
- when(partitionIndex.getIndexFields()).thenReturn(indexFields);
- when(collection.find(any(Bson.class))).thenThrow(RuntimeException.class);
- doCallRealMethod().when(mongodbConnector).queryFieldMinMaxValue(connectorContext,table,partitionFilter,fieldName);
- assertThrows(RuntimeException.class,()->mongodbConnector.queryFieldMinMaxValue(connectorContext,table,partitionFilter,fieldName));
- }
- }
- @Nested
- class batchCount{
- private TapTable table;
- @Test
- void testBatchCountWithEx() throws Throwable {
- try (MockedStatic mb = Mockito
- .mockStatic(MongodbConnector.class)) {
- table = mock(TapTable.class);
- when(table.getId()).thenReturn("collectionName");
- when(mongoConfig.getDatabase()).thenReturn("database");
- mb.when(()->MongodbConnector.getCollectionNotAggregateCountByTableName(mongoClient,"database","collectionName",null)).thenThrow(RuntimeException.class);
- doCallRealMethod().when(mongodbConnector).batchCount(connectorContext,table);
- assertThrows(RuntimeException.class,()->mongodbConnector.batchCount(connectorContext,table));
- }
- }
- }
- @Nested
- class createIndex{
- private TapTable table;
- private List indexList;
- private Log log;
- private MongoDatabase mongoDatabase;
- @BeforeEach
- void beforeEach(){
- table = mock(TapTable.class);
- when(table.getName()).thenReturn("table");
- indexList = new ArrayList<>();
- TapIndex tapIndex = new TapIndex();
- tapIndex.setName("__t__test");
- indexList.add(tapIndex);
- log = mock(Log.class);
- mongoDatabase = mock(MongoDatabase.class);
- ReflectionTestUtils.setField(mongodbConnector,"mongoDatabase",mongoDatabase);
- }
-
- @Test
- void testCreateIndexWithExWhenCreateIndex(){
- try (MockedStatic mb = Mockito
- .mockStatic(Document.class)) {
- Document document = mock(Document.class);
- mb.when(()->Document.parse("test")).thenReturn(document);
- MongoCollection targetCollection = mock(MongoCollection.class);
- when(mongoDatabase.getCollection("table")).thenReturn(targetCollection);
- when(targetCollection.createIndex(any(),any(IndexOptions.class))).thenThrow(RuntimeException.class);
- doCallRealMethod().when(mongodbConnector).createIndex(table,indexList,log);
- mongodbConnector.createIndex(table,indexList,log);
- verify(log).warn(anyString());
- }
- }
- }
- @Nested
- class createIndexWithTapCreateIndexEvent{
- private TapTable table;
- private TapCreateIndexEvent tapCreateIndexEvent;
- private MongoDatabase mongoDatabase;
- @BeforeEach
- void beforeEach(){
- table = mock(TapTable.class);
- when(table.getName()).thenReturn("test");
- tapCreateIndexEvent = mock(TapCreateIndexEvent.class);
- mongoDatabase = mock(MongoDatabase.class);
- ReflectionTestUtils.setField(mongodbConnector,"mongoDatabase",mongoDatabase);
- }
- @Test
- void testCreateIndexEventWithEx(){
- List indexList = new ArrayList<>();
- TapIndex tapIndex = new TapIndex();
- tapIndex.setName("index");
- List indexFields = new ArrayList<>();
- indexFields.add(mock(TapIndexField.class));
- tapIndex.setIndexFields(indexFields);
- indexList.add(tapIndex);
- when(tapCreateIndexEvent.getIndexList()).thenReturn(indexList);
- when(mongoDatabase.getCollection("test")).thenThrow(RuntimeException.class);
- when(mongoConfig.getUri()).thenReturn("mongodb://127.0.0.1:27017/test");
- doCallRealMethod().when(mongodbConnector).createIndex(connectorContext,table,tapCreateIndexEvent);
- assertThrows(RuntimeException.class,()->mongodbConnector.createIndex(connectorContext,table,tapCreateIndexEvent));
- }
- }
- @Nested
- class createStreamReader{
-
- @Test
- void testCreateStreamReaderWithEx(){
- when(mongoConfig.getDatabase()).thenReturn("database");
- when(mongoClient.getDatabase("database")).thenThrow(RuntimeException.class);
- doCallRealMethod().when(mongodbConnector).createStreamReader();
- assertThrows(RuntimeException.class,()->mongodbConnector.createStreamReader());
- }
- }
- @Nested
- class getTableNames{
- private int batchSize;
- private Consumer> listConsumer;
- @Test
- void testGetTableNamesWithEx() throws Throwable {
- when(mongoConfig.getDatabase()).thenReturn("database");
- when(mongoClient.getDatabase("database")).thenThrow(RuntimeException.class);
- doCallRealMethod().when(mongodbConnector).getTableNames(connectorContext,batchSize,listConsumer);
- assertThrows(RuntimeException.class,()->mongodbConnector.getTableNames(connectorContext,batchSize,listConsumer));
- }
- }
- @Nested
- class onStop{
- @Test
- void testOnStopWithEx() throws Throwable {
- doThrow(RuntimeException.class).when(mongoClient).close();
- doCallRealMethod().when(mongodbConnector).onStop(connectorContext);
- assertThrows(RuntimeException.class,()->mongodbConnector.onStop(connectorContext));
- }
- }
- @Nested
- class GetTableInfoTest{
- private TapConnectionContext tapConnectorContext;
- private String tableName;
- @BeforeEach
- void beforeEach(){
- tapConnectorContext = mock(TapConnectorContext.class);
- tableName = "table";
- }
- @Test
- void testGetTableInfoWithEx() throws Throwable {
- when(mongoConfig.getDatabase()).thenReturn("test");
- when(mongoClient.getDatabase("test")).thenThrow(RuntimeException.class);
- doCallRealMethod().when(mongodbConnector).getTableInfo(tapConnectorContext,tableName);
- assertThrows(RuntimeException.class, ()->mongodbConnector.getTableInfo(tapConnectorContext,tableName));
- }
-
- @Test
- void testGetTableInfoSizeOverInteger() throws Throwable {
-
- when(mongoConfig.getDatabase()).thenReturn("test");
- MongoDatabase mongoDatabase = mock(MongoDatabase.class);
- when(mongoClient.getDatabase("test")).thenReturn(mongoDatabase);
- Document collStats = new Document();
- Integer count = 26888601;
- Double size = 366907867698976.0;
- collStats.put("count",count);
- collStats.put("size",size);
- when(mongoDatabase.runCommand(new Document("collStats", tableName))).thenReturn(collStats);
- doCallRealMethod().when(mongodbConnector).getTableInfo(tapConnectorContext,tableName);
- TableInfo actualData = mongodbConnector.getTableInfo(tapConnectorContext, tableName);
- Assertions.assertTrue(actualData.getNumOfRows().longValue() == count);
- Assertions.assertTrue(actualData.getStorageSize().longValue() == size);
-
- }
- }
-
- @Nested
- @DisplayName("Method queryIndexes test")
- class QueryIndexesTest {
-
- private TapConnectorContext tapConnectorContext;
- private TapTable tapTable;
-
- @BeforeEach
- void setUp() {
- tapConnectorContext = mock(TapConnectorContext.class);
- tapTable = new TapTable("test");
- doCallRealMethod().when(mongodbConnector).queryIndexes(any(), any(), any());
- }
-
- @Test
- @DisplayName("test main process")
- void test1() {
- List listIndexes = new ArrayList<>();
- listIndexes.add(new Document("name", "_id_").append("key", new Document("_id", 1)));
- listIndexes.add(new Document("name", "uid_1").append("key", new Document("uid", 1)));
- listIndexes.add(new Document("name", "sub.sid1_1_sub.sid2_-1").append("key", new Document("sub.sid1", 1).append("sub.sid2", -1)));
- Iterator iterator = listIndexes.iterator();
- MongoCursor mongoCursor = mock(MongoCursor.class);
- when(mongoCursor.next()).thenAnswer(invocationOnMock -> iterator.next());
- when(mongoCursor.hasNext()).thenAnswer(invocationOnMock -> iterator.hasNext());
- ListIndexesIterable listIndexesIterable = mock(ListIndexesIterable.class);
- when(listIndexesIterable.iterator()).thenReturn(mongoCursor);
- doCallRealMethod().when(listIndexesIterable).forEach(any(Consumer.class));
- MongoCollection mongoCollection = mock(MongoCollection.class);
- when(mongoCollection.listIndexes()).thenReturn(listIndexesIterable);
- MongoDatabase mongoDatabase = mock(MongoDatabase.class);
- when(mongoDatabase.getCollection(tapTable.getId())).thenReturn(mongoCollection);
- ReflectionTestUtils.setField(mongodbConnector, "mongoDatabase", mongoDatabase);
- mongodbConnector.queryIndexes(tapConnectorContext, tapTable, indexes -> {
- assertEquals(3, indexes.size());
- TapIndex tapIndex = indexes.get(0);
- assertEquals(listIndexes.get(0).getString("name"), tapIndex.getName());
- List indexFields = tapIndex.getIndexFields();
- assertEquals(1, indexFields.size());
- assertEquals("_id", indexFields.get(0).getName());
- assertTrue(indexFields.get(0).getFieldAsc());
- tapIndex = indexes.get(2);
- assertEquals(listIndexes.get(2).getString("name"), tapIndex.getName());
- indexFields = tapIndex.getIndexFields();
- assertEquals("sub.sid2", indexFields.get(1).getName());
- assertFalse(indexFields.get(1).getFieldAsc());
- });
- }
-
- @Test
- @DisplayName("test other index key value: text")
- void test2() {
- List listIndexes = new ArrayList<>();
- listIndexes.add(new Document("name", "content_text").append("key", new Document("content", "text")));
- Iterator iterator = listIndexes.iterator();
- MongoCursor mongoCursor = mock(MongoCursor.class);
- when(mongoCursor.next()).thenAnswer(invocationOnMock -> iterator.next());
- when(mongoCursor.hasNext()).thenAnswer(invocationOnMock -> iterator.hasNext());
- ListIndexesIterable listIndexesIterable = mock(ListIndexesIterable.class);
- when(listIndexesIterable.iterator()).thenReturn(mongoCursor);
- doCallRealMethod().when(listIndexesIterable).forEach(any(Consumer.class));
- MongoCollection mongoCollection = mock(MongoCollection.class);
- when(mongoCollection.listIndexes()).thenReturn(listIndexesIterable);
- MongoDatabase mongoDatabase = mock(MongoDatabase.class);
- when(mongoDatabase.getCollection(tapTable.getId())).thenReturn(mongoCollection);
- ReflectionTestUtils.setField(mongodbConnector, "mongoDatabase", mongoDatabase);
- mongodbConnector.queryIndexes(tapConnectorContext, tapTable, indexes -> {
- assertEquals(1, indexes.size());
- TapIndex tapIndex = indexes.get(0);
- assertEquals(listIndexes.get(0).getString("name"), tapIndex.getName());
- List indexFields = tapIndex.getIndexFields();
- assertEquals(1, indexFields.size());
- assertEquals("content", indexFields.get(0).getName());
- assertTrue(indexFields.get(0).getFieldAsc());
- });
- }
-
- @Test
- @DisplayName("test unique index")
- void test3() {
- List listIndexes = new ArrayList<>();
- listIndexes.add(new Document("name", "uid_1").append("key", new Document("uid", 1)).append("unique", true));
- listIndexes.add(new Document("name", "uid1_1").append("key", new Document("uid1", 1)));
- Iterator iterator = listIndexes.iterator();
- MongoCursor mongoCursor = mock(MongoCursor.class);
- when(mongoCursor.next()).thenAnswer(invocationOnMock -> iterator.next());
- when(mongoCursor.hasNext()).thenAnswer(invocationOnMock -> iterator.hasNext());
- ListIndexesIterable listIndexesIterable = mock(ListIndexesIterable.class);
- when(listIndexesIterable.iterator()).thenReturn(mongoCursor);
- doCallRealMethod().when(listIndexesIterable).forEach(any(Consumer.class));
- MongoCollection mongoCollection = mock(MongoCollection.class);
- when(mongoCollection.listIndexes()).thenReturn(listIndexesIterable);
- MongoDatabase mongoDatabase = mock(MongoDatabase.class);
- when(mongoDatabase.getCollection(tapTable.getId())).thenReturn(mongoCollection);
- ReflectionTestUtils.setField(mongodbConnector, "mongoDatabase", mongoDatabase);
- mongodbConnector.queryIndexes(tapConnectorContext, tapTable, indexes -> {
- assertEquals(2, indexes.size());
- assertTrue(indexes.get(0).getUnique());
- assertFalse(indexes.get(1).getUnique());
- });
- }
- }
-}
\ No newline at end of file
diff --git a/connectors/mongodb-lower-connector/pom.xml b/connectors/mongodb-lower-connector/pom.xml
index 4f52b7ff9..97f7fb1fb 100644
--- a/connectors/mongodb-lower-connector/pom.xml
+++ b/connectors/mongodb-lower-connector/pom.xml
@@ -13,7 +13,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/opengauss-connector/pom.xml b/connectors/opengauss-connector/pom.xml
index f709b1313..8357d757b 100644
--- a/connectors/opengauss-connector/pom.xml
+++ b/connectors/opengauss-connector/pom.xml
@@ -15,7 +15,7 @@
UTF-8
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/polar-db-postgres-connector/pom.xml b/connectors/polar-db-postgres-connector/pom.xml
index 803b5d737..d40830ef9 100644
--- a/connectors/polar-db-postgres-connector/pom.xml
+++ b/connectors/polar-db-postgres-connector/pom.xml
@@ -23,7 +23,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/postgres-connector/pom.xml b/connectors/postgres-connector/pom.xml
index 742ca1dc4..c37f0041a 100644
--- a/connectors/postgres-connector/pom.xml
+++ b/connectors/postgres-connector/pom.xml
@@ -16,7 +16,7 @@
8
42.7.5
- 2.0.4-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java
index 9086fcaf4..a32657efe 100644
--- a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java
+++ b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java
@@ -40,6 +40,8 @@
import io.tapdata.kit.*;
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
import io.tapdata.pdk.apis.consumer.StreamReadConsumer;
+import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer;
+import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer;
import io.tapdata.pdk.apis.context.TapConnectionContext;
import io.tapdata.pdk.apis.context.TapConnectorContext;
import io.tapdata.pdk.apis.entity.*;
@@ -148,6 +150,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec
connectorFunctions.supportBatchCount(this::batchCount);
connectorFunctions.supportBatchRead(this::batchReadWithoutOffset);
connectorFunctions.supportStreamRead(this::streamRead);
+ connectorFunctions.supportOneByOneStreamRead(this::streamReadOneByOne);
connectorFunctions.supportTimestampToStreamOffset(this::timestampToStreamOffset);
// query
connectorFunctions.supportQueryByFilter(this::queryByFilter);
@@ -604,27 +607,31 @@ protected void afterInitialSync(TapConnectorContext connectorContext, TapTable t
});
}
- private void streamRead(TapConnectorContext nodeContext, List tableList, Object offsetState, int recordSize, StreamReadConsumer consumer) throws Throwable {
+ private void streamReadOneByOne(TapConnectorContext nodeContext, List tableList, Object offsetState, StreamReadOneByOneConsumer consumer) throws Throwable {
+ streamRead(nodeContext, tableList, offsetState, consumer.getBatchSize(), consumer);
+ }
+
+ private void streamRead(TapConnectorContext nodeContext, List tableList, Object offsetState, int recordSize, TapStreamReadConsumer, Object> consumer) throws Throwable {
if ("walminer".equals(postgresConfig.getLogPluginName())) {
if (EmptyKit.isNotEmpty(postgresConfig.getPgtoHost())) {
new WalPgtoMiner(postgresJdbcContext, firstConnectorId, tapLogger)
.watch(tableList, nodeContext.getTableMap())
.offset(offsetState)
- .registerConsumer(consumer, recordSize)
+ .registerCdcConsumer(consumer, recordSize)
.startMiner(this::isAlive);
} else {
new WalLogMinerV2(postgresJdbcContext, tapLogger)
.watch(tableList, nodeContext.getTableMap())
.withWalLogDirectory(getWalDirectory())
.offset(offsetState)
- .registerConsumer(consumer, recordSize)
+ .registerCdcConsumer(consumer, recordSize)
.startMiner(this::isAlive);
}
} else {
cdcRunner = new PostgresCdcRunner(postgresJdbcContext, nodeContext);
testReplicateIdentity(nodeContext.getTableMap());
buildSlot(nodeContext, true);
- cdcRunner.useSlot(slotName.toString()).watch(tableList).offset(offsetState).registerConsumer(consumer, recordSize);
+ cdcRunner.useSlot(slotName.toString()).watch(tableList).offset(offsetState).registerCdcConsumer(consumer, recordSize);
cdcRunner.startCdcRunner();
if (EmptyKit.isNotNull(cdcRunner) && EmptyKit.isNotNull(cdcRunner.getThrowable().get())) {
Throwable throwable = ErrorKit.getLastCause(cdcRunner.getThrowable().get());
@@ -684,21 +691,21 @@ private void streamReadMultiConnection(TapConnectorContext nodeContext, List
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
diff --git a/connectors/vastbase-connector/pom.xml b/connectors/vastbase-connector/pom.xml
index e0dd296ef..76c6661d0 100644
--- a/connectors/vastbase-connector/pom.xml
+++ b/connectors/vastbase-connector/pom.xml
@@ -15,7 +15,7 @@
8
- 2.0.0-SNAPSHOT
+ 2.0.5-SNAPSHOT
1.5.4.Final