diff --git a/connectors-common/cdc-core/pom.xml b/connectors-common/cdc-core/pom.xml new file mode 100644 index 000000000..56e0f6626 --- /dev/null +++ b/connectors-common/cdc-core/pom.xml @@ -0,0 +1,29 @@ + + + 4.0.0 + + io.tapdata + connectors-common + 1.0-SNAPSHOT + + + cdc-core + + + 17 + 17 + UTF-8 + 2.0.5-SNAPSHOT + + + + + io.tapdata + tapdata-pdk-api + ${tapdata.pdk.api.verison} + + + + \ No newline at end of file diff --git a/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/Acceptor.java b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/Acceptor.java new file mode 100644 index 000000000..453f04dd1 --- /dev/null +++ b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/Acceptor.java @@ -0,0 +1,29 @@ +package io.tapdata.cdc; + +import io.tapdata.entity.event.TapEvent; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; + +import java.util.List; + +public interface Acceptor> { + + void accept(EType e); + + void accept(List e, Object offset); + + Acc setConsumer(Consumer consumer); + + Acc setBatchSize(int size); + + Acc setBatchSizeTimeout(long ms); + + void streamReadStarted(); + + void streamReadEnded(); + + default int getBatchSize() { + return 1; + } + + Consumer getConsumer(); +} diff --git a/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/CustomAbstractAccepter.java b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/CustomAbstractAccepter.java new file mode 100644 index 000000000..65e716396 --- /dev/null +++ b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/CustomAbstractAccepter.java @@ -0,0 +1,13 @@ +package io.tapdata.cdc; + +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 15:03 Create + * @description + */ +public abstract class CustomAbstractAccepter, Consumer extends TapStreamReadConsumer> + implements Acceptor { +} diff --git 
a/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/EventAbstractAccepter.java b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/EventAbstractAccepter.java new file mode 100644 index 000000000..ef270ac1e --- /dev/null +++ b/connectors-common/cdc-core/src/main/java/io/tapdata/cdc/EventAbstractAccepter.java @@ -0,0 +1,13 @@ +package io.tapdata.cdc; + +import io.tapdata.entity.event.TapEvent; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 15:00 Create + * @description + */ +public abstract class EventAbstractAccepter, Consumer extends TapStreamReadConsumer> extends CustomAbstractAccepter { +} \ No newline at end of file diff --git a/connectors-common/pom.xml b/connectors-common/pom.xml index fb3509acf..8e729463c 100644 --- a/connectors-common/pom.xml +++ b/connectors-common/pom.xml @@ -22,6 +22,7 @@ js-connector-core-plus read-partition hive-core + cdc-core diff --git a/connectors-common/postgres-core/pom.xml b/connectors-common/postgres-core/pom.xml index 24fd0f92b..9e0c817bd 100644 --- a/connectors-common/postgres-core/pom.xml +++ b/connectors-common/postgres-core/pom.xml @@ -21,7 +21,7 @@ 1.0-SNAPSHOT 1.5.4.Final 1.0-SNAPSHOT - 2.0.1-SNAPSHOT + 2.0.5-SNAPSHOT 1.2.83 @@ -124,6 +124,11 @@ commons-compress 1.27.1 + + io.tapdata + cdc-core + 1.0-SNAPSHOT + diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java index bf81703d5..e28568fec 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/AbstractWalLogMiner.java @@ -1,8 +1,11 @@ package io.tapdata.connector.postgres.cdc; import com.google.common.collect.Lists; +import 
io.tapdata.cdc.CustomAbstractAccepter; import io.tapdata.common.sqlparser.ResultDO; import io.tapdata.connector.postgres.PostgresJdbcContext; +import io.tapdata.connector.postgres.cdc.accept.LogMinerProBatchAccepter; +import io.tapdata.connector.postgres.cdc.accept.LogMinerProOneByOneAccepter; import io.tapdata.connector.postgres.config.PostgresConfig; import io.tapdata.entity.event.TapEvent; import io.tapdata.entity.event.dml.TapDeleteRecordEvent; @@ -18,6 +21,8 @@ import io.tapdata.kit.EmptyKit; import io.tapdata.kit.StringKit; import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; import java.math.BigDecimal; import java.sql.ResultSet; @@ -25,7 +30,14 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalTime; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -35,8 +47,8 @@ public abstract class AbstractWalLogMiner { protected final PostgresJdbcContext postgresJdbcContext; protected final Log tapLogger; - protected StreamReadConsumer consumer; - protected int recordSize; + //protected StreamReadConsumer consumer; + protected CustomAbstractAccepter consumer; protected List tableList; protected boolean filterSchema; private Map dataTypeMap; @@ -95,9 +107,26 @@ public AbstractWalLogMiner withWalLogDirectory(String walLogDirectory) { public abstract void startMiner(Supplier isAlive) throws Throwable; + /** + * @deprecated + * */ public AbstractWalLogMiner registerConsumer(StreamReadConsumer consumer, int recordSize) { - this.consumer = consumer; - this.recordSize = recordSize; + return 
registerCdcConsumer(consumer, recordSize); + } + + public AbstractWalLogMiner registerCdcConsumer(TapStreamReadConsumer consumer, int recordSize) { + if (consumer instanceof StreamReadConsumer) { + this.consumer = new LogMinerProBatchAccepter() + .setConsumer((StreamReadConsumer) consumer) + .setBatchSize(recordSize) + .setEventCreator(this::createEvent); + } else if (consumer instanceof StreamReadOneByOneConsumer) { + this.consumer = new LogMinerProOneByOneAccepter() + .setConsumer((StreamReadOneByOneConsumer) consumer) + .setEventCreator(this::createEvent); + } else { + throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName()); + } return this; } diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java index f3f51840e..cbb00f90c 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/PostgresCdcRunner.java @@ -6,11 +6,12 @@ import io.debezium.engine.DebeziumEngine; import io.debezium.engine.StopEngineException; import io.tapdata.connector.postgres.PostgresJdbcContext; +import io.tapdata.connector.postgres.cdc.accept.DebeziumBatchAccepter; +import io.tapdata.connector.postgres.cdc.accept.DebeziumOneByOneAccepter; +import io.tapdata.connector.postgres.cdc.accept.PGEventAbstractAccepter; import io.tapdata.connector.postgres.cdc.config.PostgresDebeziumConfig; import io.tapdata.connector.postgres.cdc.offset.PostgresOffset; -import io.tapdata.connector.postgres.cdc.offset.PostgresOffsetStorage; import io.tapdata.connector.postgres.config.PostgresConfig; -import io.tapdata.entity.event.TapEvent; import io.tapdata.entity.event.control.HeartbeatEvent; import io.tapdata.entity.event.dml.TapDeleteRecordEvent; import 
io.tapdata.entity.event.dml.TapInsertRecordEvent; @@ -19,7 +20,6 @@ import io.tapdata.entity.logger.TapLogger; import io.tapdata.entity.schema.TapTable; import io.tapdata.entity.schema.partition.TapSubPartitionTableInfo; -import io.tapdata.entity.simplify.TapSimplify; import io.tapdata.entity.utils.DataMap; import io.tapdata.entity.utils.cache.Entry; import io.tapdata.entity.utils.cache.Iterator; @@ -27,6 +27,8 @@ import io.tapdata.kit.EmptyKit; import io.tapdata.kit.NumberKit; import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; import io.tapdata.pdk.apis.context.TapConnectorContext; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; @@ -39,8 +41,16 @@ import java.time.LocalDate; import java.time.LocalTime; import java.time.ZoneOffset; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TimeZone; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -57,8 +67,8 @@ public class PostgresCdcRunner extends DebeziumCdcRunner { private final TapConnectorContext connectorContext; private PostgresDebeziumConfig postgresDebeziumConfig; private PostgresOffset postgresOffset; - private int recordSize; - private StreamReadConsumer consumer; + //private StreamReadConsumer consumer; + protected PGEventAbstractAccepter consumer; private final AtomicReference throwableAtomicReference = new AtomicReference<>(); protected TimeZone timeZone; private String dropTransactionId = null; @@ -145,9 +155,27 @@ public AtomicReference getThrowable() { return throwableAtomicReference; } + /** + * @deprecated + * */ public void registerConsumer(StreamReadConsumer consumer, 
int recordSize) { - this.recordSize = recordSize; - this.consumer = consumer; + registerCdcConsumer(consumer, recordSize); + } + + public void registerCdcConsumer(TapStreamReadConsumer consumer, int recordSize) { + Supplier batchSizeGetter; + if (consumer instanceof StreamReadConsumer) { + this.consumer = new DebeziumBatchAccepter() + .setConsumer((StreamReadConsumer) consumer) + .setBatchSize(recordSize); + batchSizeGetter = () -> recordSize; + } else if (consumer instanceof StreamReadOneByOneConsumer) { + this.consumer = new DebeziumOneByOneAccepter() + .setConsumer((StreamReadOneByOneConsumer) consumer); + batchSizeGetter = () -> ((StreamReadOneByOneConsumer) consumer).getBatchSize(); + } else { + throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName()); + } //build debezium engine this.engine = (EmbeddedEngine) EmbeddedEngine.create() .using(postgresDebeziumConfig.create()) @@ -167,8 +195,12 @@ public void taskStopped() { // .using(this.getClass().getClassLoader()) // .using(Clock.SYSTEM) // .notifying(this::consumeRecord) - .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) -> - numberOfMessagesSinceLastCommit >= recordSize || timeSinceLastCommit.getSeconds() >= 5) + .using((numberOfMessagesSinceLastCommit, timeSinceLastCommit) -> { + int size = Math.min(Math.max(1, batchSizeGetter.get()), 10000); + //超时时间最小1秒,最大5秒 + int timeout = Math.min(Math.max(1, size / 100), 5); + return numberOfMessagesSinceLastCommit >= size || timeSinceLastCommit.getSeconds() >= timeout; + }) .notifying(this::consumeRecords).using((result, message, throwable) -> { if (result) { if (StringUtils.isNotBlank(message)) { @@ -195,7 +227,7 @@ public void taskStopped() { @Override public void consumeRecords(List sourceRecords, DebeziumEngine.RecordCommitter committer) throws InterruptedException { super.consumeRecords(sourceRecords, committer); - List eventList = TapSimplify.list(); + //List eventList = TapSimplify.list(); Map offset = 
null; for (SourceRecord sr : sourceRecords) { try { @@ -207,7 +239,8 @@ public void consumeRecords(List sourceRecords, DebeziumEngine.Reco continue; } if ("io.debezium.connector.common.Heartbeat".equals(sr.valueSchema().name())) { - eventList.add(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms"))); + consumer.updateOffset(offset) + .accept(new HeartbeatEvent().init().referenceTime(((Struct) sr.value()).getInt64("ts_ms"))); continue; } else if (EmptyKit.isNull(sr.valueSchema().field("op"))) { continue; @@ -260,22 +293,12 @@ public void consumeRecords(List sourceRecords, DebeziumEngine.Reco event.setNamespaces(Lists.newArrayList(schema, table)); } } - eventList.add(event); - if (eventList.size() >= recordSize) { - PostgresOffset postgresOffset = new PostgresOffset(); - postgresOffset.setSourceOffset(TapSimplify.toJson(offset)); - consumer.accept(eventList, postgresOffset); - eventList = TapSimplify.list(); - } + consumer.updateOffset(offset).accept(event); } catch (StopConnectorException | StopEngineException ex) { throw ex; } } - if (EmptyKit.isNotEmpty(eventList)) { - PostgresOffset postgresOffset = new PostgresOffset(); - postgresOffset.setSourceOffset(TapSimplify.toJson(offset)); - consumer.accept(eventList, postgresOffset); - } + consumer.updateOffset(offset).accept(null); } private DataMap getMapFromStruct(Struct struct) { diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java index 38d91ffb1..9311754f4 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMiner.java @@ -3,22 +3,25 @@ import io.tapdata.common.concurrent.ConcurrentProcessor; import io.tapdata.common.concurrent.TapExecutors; import 
io.tapdata.connector.postgres.PostgresJdbcContext; -import io.tapdata.entity.event.TapEvent; +import io.tapdata.connector.postgres.cdc.accept.LogMinerBatchAccepter; +import io.tapdata.connector.postgres.cdc.accept.LogMinerOneByOneAccepter; import io.tapdata.entity.logger.Log; import io.tapdata.entity.simplify.TapSimplify; import io.tapdata.exception.TapPdkOffsetOutOfLogEx; import io.tapdata.kit.EmptyKit; +import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; -import java.sql.*; -import java.util.ArrayList; -import java.util.List; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.sql.Timestamp; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; -import static io.tapdata.base.ConnectorBase.list; - public class WalLogMiner extends AbstractWalLogMiner { private String walFile; @@ -29,6 +32,22 @@ public WalLogMiner(PostgresJdbcContext postgresJdbcContext, Log tapLogger) { super(postgresJdbcContext, tapLogger); } + public AbstractWalLogMiner registerCdcConsumer(TapStreamReadConsumer consumer, int recordSize) { + if (consumer instanceof StreamReadConsumer) { + this.consumer = new LogMinerBatchAccepter() + .setConsumer((StreamReadConsumer) consumer) + .setBatchSize(recordSize) + .setEventCreator(this::createEvent); + } else if (consumer instanceof StreamReadOneByOneConsumer) { + this.consumer = new LogMinerOneByOneAccepter() + .setConsumer((StreamReadOneByOneConsumer) consumer) + .setEventCreator(this::createEvent); + } else { + throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName()); + } + return this; + } + public WalLogMiner offset(Object offsetState) { if (offsetState instanceof String) { String[] fileAndLsn = 
((String) offsetState).split(","); @@ -64,24 +83,10 @@ public void startMiner(Supplier isAlive) throws Throwable { } Thread t = new Thread(() -> { consumer.streamReadStarted(); - NormalRedo lastRedo = null; - AtomicReference> events = new AtomicReference<>(list()); while (isAlive.get()) { try { NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS); - if (EmptyKit.isNotNull(redo)) { - lastRedo = redo; - events.get().add(createEvent(redo)); - if (events.get().size() >= recordSize) { - consumer.accept(events.get(), redo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } else { - if (events.get().size() > 0) { - consumer.accept(events.get(), lastRedo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } + consumer.accept(redo); } catch (Exception e) { threadException.set(e); return; diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java index e467fbd56..3acd0993f 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV2.java @@ -3,8 +3,6 @@ import io.tapdata.common.concurrent.ConcurrentProcessor; import io.tapdata.common.concurrent.TapExecutors; import io.tapdata.connector.postgres.PostgresJdbcContext; -import io.tapdata.entity.event.TapEvent; -import io.tapdata.entity.event.control.HeartbeatEvent; import io.tapdata.entity.logger.Log; import io.tapdata.entity.simplify.TapSimplify; import io.tapdata.kit.EmptyKit; @@ -21,8 +19,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static io.tapdata.base.ConnectorBase.list; - public class WalLogMinerV2 extends AbstractWalLogMiner { private String startLsn; @@ -46,28 +42,10 @@ public void startMiner(Supplier isAlive) throws Throwable { ConcurrentProcessor 
concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner"); Thread t = new Thread(() -> { consumer.streamReadStarted(); - NormalRedo lastRedo = null; - AtomicReference> events = new AtomicReference<>(list()); while (isAlive.get()) { try { NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS); - if (EmptyKit.isNotNull(redo)) { - if (EmptyKit.isNotNull(redo.getOperation())) { - lastRedo = redo; - events.get().add(createEvent(redo)); - if (events.get().size() >= recordSize) { - consumer.accept(events.get(), redo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } else { - consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr()); - } - } else { - if (events.get().size() > 0) { - consumer.accept(events.get(), lastRedo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } + consumer.accept(redo); } catch (Exception e) { threadException.set(e); return; diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java index 605eee1ce..9bfeca62e 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalLogMinerV3.java @@ -3,8 +3,6 @@ import io.tapdata.common.concurrent.ConcurrentProcessor; import io.tapdata.common.concurrent.TapExecutors; import io.tapdata.connector.postgres.PostgresJdbcContext; -import io.tapdata.entity.event.TapEvent; -import io.tapdata.entity.event.control.HeartbeatEvent; import io.tapdata.entity.logger.Log; import io.tapdata.entity.simplify.TapSimplify; import io.tapdata.kit.EmptyKit; @@ -15,16 +13,11 @@ import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import 
java.util.Collections; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; -import static io.tapdata.base.ConnectorBase.list; - public class WalLogMinerV3 extends AbstractWalLogMiner { private String timestamp; @@ -42,28 +35,10 @@ public void startMiner(Supplier isAlive) throws Throwable { ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner"); Thread t = new Thread(() -> { consumer.streamReadStarted(); - NormalRedo lastRedo = null; - AtomicReference> events = new AtomicReference<>(list()); while (isAlive.get()) { try { NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS); - if (EmptyKit.isNotNull(redo)) { - if (EmptyKit.isNotNull(redo.getOperation())) { - lastRedo = redo; - events.get().add(createEvent(redo)); - if (events.get().size() >= recordSize) { - consumer.accept(events.get(), redo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } else { - consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr()); - } - } else { - if (events.get().size() > 0) { - consumer.accept(events.get(), lastRedo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } + consumer.accept(redo); } catch (Exception e) { threadException.set(e); return; diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java index 1a278bbc6..4c882722d 100644 --- a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/WalPgtoMiner.java @@ -1,24 +1,19 @@ package io.tapdata.connector.postgres.cdc; import com.alibaba.fastjson.JSONObject; -import 
io.tapdata.base.ConnectorBase; import io.tapdata.common.concurrent.ConcurrentProcessor; import io.tapdata.common.concurrent.TapExecutors; import io.tapdata.common.sqlparser.ResultDO; import io.tapdata.connector.postgres.PostgresJdbcContext; -import io.tapdata.entity.event.TapEvent; -import io.tapdata.entity.event.control.HeartbeatEvent; import io.tapdata.entity.logger.Log; import io.tapdata.entity.simplify.TapSimplify; import io.tapdata.kit.EmptyKit; import io.tapdata.kit.HttpKit; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; public class WalPgtoMiner extends AbstractWalLogMiner { @@ -65,28 +60,10 @@ public void startMiner(Supplier isAlive) throws Throwable { try (ConcurrentProcessor concurrentProcessor = TapExecutors.createSimple(8, 32, "wal-miner")) { Thread t = new Thread(() -> { consumer.streamReadStarted(); - NormalRedo lastRedo = null; - AtomicReference> events = new AtomicReference<>(ConnectorBase.list()); while (isAlive.get()) { try { NormalRedo redo = concurrentProcessor.get(2, TimeUnit.SECONDS); - if (EmptyKit.isNotNull(redo)) { - if (EmptyKit.isNotNull(redo.getOperation())) { - lastRedo = redo; - events.get().add(createEvent(redo)); - if (events.get().size() >= recordSize) { - consumer.accept(events.get(), redo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } else { - consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr()); - } - } else { - if (!events.get().isEmpty()) { - consumer.accept(events.get(), lastRedo.getCdcSequenceStr()); - events.set(new ArrayList<>()); - } - } + consumer.accept(redo); } catch (Exception e) { threadException.set(e); return; diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumBatchAccepter.java 
b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumBatchAccepter.java new file mode 100644 index 000000000..384f76970 --- /dev/null +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumBatchAccepter.java @@ -0,0 +1,83 @@ +package io.tapdata.connector.postgres.cdc.accept; + +import io.tapdata.connector.postgres.cdc.offset.PostgresOffset; +import io.tapdata.entity.event.TapEvent; +import io.tapdata.entity.simplify.TapSimplify; +import io.tapdata.pdk.apis.consumer.StreamReadConsumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 14:19 Create + * @description + */ +public class DebeziumBatchAccepter + extends PGEventAbstractAccepter { + List eventList = new ArrayList<>(); + StreamReadConsumer consumer; + int batchSize = 1; + long batchSizeTimeout = 1000L; + + @Override + public void accept(TapEvent e) { + if (null != e) { + eventList.add(e); + } else { + if (!eventList.isEmpty()) { + acceptOnce(); + } + } + if (eventList.size() >= getBatchSize()) { + acceptOnce(); + } + } + + void acceptOnce() { + PostgresOffset postgresOffset = new PostgresOffset(); + postgresOffset.setSourceOffset(TapSimplify.toJson(offset)); + getConsumer().accept(eventList, postgresOffset); + eventList = TapSimplify.list(); + } + + @Override + public void accept(List e, Object offset) { + getConsumer().accept(e, offset); + } + + @Override + public DebeziumBatchAccepter setConsumer(StreamReadConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public DebeziumBatchAccepter setBatchSize(int size) { + this.batchSize = size; + return this; + } + + @Override + public DebeziumBatchAccepter setBatchSizeTimeout(long ms) { + this.batchSizeTimeout = ms; + return this; + } + + @Override + public void streamReadStarted() { + this.consumer.streamReadStarted(); + } + + @Override 
+ public void streamReadEnded() { + this.consumer.streamReadEnded(); + } + + @Override + public StreamReadConsumer getConsumer() { + return this.consumer; + } +} diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumOneByOneAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumOneByOneAccepter.java new file mode 100644 index 000000000..af6ae2757 --- /dev/null +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/DebeziumOneByOneAccepter.java @@ -0,0 +1,86 @@ +package io.tapdata.connector.postgres.cdc.accept; + +import io.tapdata.connector.postgres.cdc.offset.PostgresOffset; +import io.tapdata.entity.event.TapEvent; +import io.tapdata.entity.simplify.TapSimplify; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; + +import java.util.List; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 14:20 Create + * @description + */ +public class DebeziumOneByOneAccepter + extends PGEventAbstractAccepter { + int acceptCount = 0; + boolean first = true; + StreamReadOneByOneConsumer consumer; + int batchSize = 1; + long batchSizeTimeout = 1000L; + + @Override + public void accept(TapEvent e) { + if (null == e) { + return; + } + acceptCount++; + boolean calcOffset = first || acceptCount >= getBatchSize(); + PostgresOffset postgresOffset = null; + if (calcOffset) { + postgresOffset = new PostgresOffset(); + postgresOffset.setSourceOffset(TapSimplify.toJson(offset)); + acceptCount = 0; + } + if (first) { + first = false; + } + getConsumer().accept(e, postgresOffset); + } + + + @Override + public void accept(List e, Object offset) { + getConsumer().accept(e, offset); + } + + @Override + public DebeziumOneByOneAccepter setConsumer(StreamReadOneByOneConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public DebeziumOneByOneAccepter setBatchSize(int size) { + 
this.batchSize = size; + return this; + } + + @Override + public DebeziumOneByOneAccepter setBatchSizeTimeout(long ms) { + this.batchSizeTimeout = ms; + return this; + } + + @Override + public void streamReadStarted() { + getConsumer().streamReadStarted(); + } + + @Override + public void streamReadEnded() { + getConsumer().streamReadEnded(); + } + + @Override + public StreamReadOneByOneConsumer getConsumer() { + return this.consumer; + } + + @Override + public int getBatchSize() { + return getConsumer().getBatchSize(); + } +} diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerBatchAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerBatchAccepter.java new file mode 100644 index 000000000..96b418c51 --- /dev/null +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerBatchAccepter.java @@ -0,0 +1,91 @@ +package io.tapdata.connector.postgres.cdc.accept; + +import io.tapdata.cdc.CustomAbstractAccepter; +import io.tapdata.connector.postgres.cdc.NormalRedo; +import io.tapdata.entity.event.TapEvent; +import io.tapdata.kit.EmptyKit; +import io.tapdata.pdk.apis.consumer.StreamReadConsumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 14:19 Create + * @description + */ +public class LogMinerBatchAccepter + extends CustomAbstractAccepter { + protected NormalRedo lastRedo; + protected List events = new ArrayList<>(); + protected Function eventCreator; + protected StreamReadConsumer consumer; + protected int batchSize = 1; + protected long batchSizeTimeout = 1000L; + + @Override + public void accept(NormalRedo redo) { + if (EmptyKit.isNotNull(redo)) { + lastRedo = redo; + TapEvent e = eventCreator.apply(redo); + if (null == e) { + return; + } + events.add(e); + if (events.size() >= 
getBatchSize()) { + getConsumer().accept(events, redo.getCdcSequenceStr()); + events = new ArrayList<>(); + } + } else { + if (!events.isEmpty()) { + getConsumer().accept(events, lastRedo.getCdcSequenceStr()); + events = new ArrayList<>(); + } + } + } + + @Override + public void accept(List e, Object offset) { + getConsumer().accept(e, offset); + } + + @Override + public LogMinerBatchAccepter setConsumer(StreamReadConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public LogMinerBatchAccepter setBatchSize(int size) { + this.batchSize = size; + return this; + } + + @Override + public LogMinerBatchAccepter setBatchSizeTimeout(long ms) { + this.batchSizeTimeout = ms; + return this; + } + + @Override + public void streamReadStarted() { + this.consumer.streamReadStarted(); + } + + @Override + public void streamReadEnded() { + this.consumer.streamReadEnded(); + } + + @Override + public StreamReadConsumer getConsumer() { + return this.consumer; + } + + public LogMinerBatchAccepter setEventCreator(Function eventCreator) { + this.eventCreator = eventCreator; + return this; + } +} diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerOneByOneAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerOneByOneAccepter.java new file mode 100644 index 000000000..8d023ce5b --- /dev/null +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerOneByOneAccepter.java @@ -0,0 +1,84 @@ +package io.tapdata.connector.postgres.cdc.accept; + +import io.tapdata.cdc.CustomAbstractAccepter; +import io.tapdata.connector.postgres.cdc.NormalRedo; +import io.tapdata.entity.event.TapEvent; +import io.tapdata.kit.EmptyKit; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; + +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +/** + * @author Gavin'Xiao + * @author 
Gavin'Xiao + * @version v1.0 2025/12/10 14:19 Create + * @description + */ +public class LogMinerOneByOneAccepter + extends CustomAbstractAccepter { + protected Function eventCreator; + protected StreamReadOneByOneConsumer consumer; + protected long batchSizeTimeout = 1000L; + protected int batchSize = 1; + + @Override + public void accept(NormalRedo redo) { + if (EmptyKit.isNotNull(redo)) { + TapEvent e = eventCreator.apply(redo); + if (null == e) { + return; + } + getConsumer().accept(e, redo.getCdcSequenceStr()); + } + } + + @Override + public void accept(List e, Object offset) { + getConsumer().accept(e, offset); + } + + @Override + public LogMinerOneByOneAccepter setConsumer(StreamReadOneByOneConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public LogMinerOneByOneAccepter setBatchSize(int size) { + this.batchSize = size; + return this; + } + + @Override + public LogMinerOneByOneAccepter setBatchSizeTimeout(long ms) { + this.batchSizeTimeout = ms; + return this; + } + + @Override + public void streamReadStarted() { + this.consumer.streamReadStarted(); + } + + @Override + public void streamReadEnded() { + this.consumer.streamReadEnded(); + } + + @Override + public StreamReadOneByOneConsumer getConsumer() { + return this.consumer; + } + + @Override + public int getBatchSize() { + return Optional.ofNullable(getConsumer()).map(StreamReadOneByOneConsumer::getBatchSize).orElse(1); + } + + public LogMinerOneByOneAccepter setEventCreator(Function eventCreator) { + this.eventCreator = eventCreator; + return this; + } +} diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProBatchAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProBatchAccepter.java new file mode 100644 index 000000000..2e7a2793c --- /dev/null +++ 
b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProBatchAccepter.java @@ -0,0 +1,42 @@ +package io.tapdata.connector.postgres.cdc.accept; + +import io.tapdata.connector.postgres.cdc.NormalRedo; +import io.tapdata.entity.event.TapEvent; +import io.tapdata.entity.event.control.HeartbeatEvent; +import io.tapdata.kit.EmptyKit; + +import java.util.ArrayList; +import java.util.Collections; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 16:11 Create + * @description + */ +public class LogMinerProBatchAccepter extends LogMinerBatchAccepter { + @Override + public void accept(NormalRedo redo) { + if (EmptyKit.isNotNull(redo)) { + if (EmptyKit.isNotNull(redo.getOperation())) { + lastRedo = redo; + TapEvent e = eventCreator.apply(redo); + if (null == e) { + return; + } + events.add(e); + if (events.size() >= getBatchSize()) { + getConsumer().accept(events, redo.getCdcSequenceStr()); + events = new ArrayList<>(); + } + } else { + consumer.accept(Collections.singletonList(new HeartbeatEvent().init().referenceTime(System.currentTimeMillis())), redo.getCdcSequenceStr()); + } + } else { + if (!events.isEmpty()) { + getConsumer().accept(events, lastRedo.getCdcSequenceStr()); + events = new ArrayList<>(); + } + } + } +} diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProOneByOneAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProOneByOneAccepter.java new file mode 100644 index 000000000..9681c1894 --- /dev/null +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/LogMinerProOneByOneAccepter.java @@ -0,0 +1,30 @@ +package io.tapdata.connector.postgres.cdc.accept; + +import io.tapdata.connector.postgres.cdc.NormalRedo; +import io.tapdata.entity.event.TapEvent; +import io.tapdata.entity.event.control.HeartbeatEvent; +import 
io.tapdata.kit.EmptyKit; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 16:11 Create + * @description + */ +public class LogMinerProOneByOneAccepter extends LogMinerOneByOneAccepter { + @Override + public void accept(NormalRedo redo) { + if (EmptyKit.isNotNull(redo)) { + TapEvent e = null; + if (EmptyKit.isNotNull(redo.getOperation())) { + e = eventCreator.apply(redo); + } else { + e = new HeartbeatEvent().init().referenceTime(System.currentTimeMillis()); + } + if (null == e) { + return; + } + getConsumer().accept(e, redo.getCdcSequenceStr()); + } + } +} diff --git a/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/PGEventAbstractAccepter.java b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/PGEventAbstractAccepter.java new file mode 100644 index 000000000..97e728028 --- /dev/null +++ b/connectors-common/postgres-core/src/main/java/io/tapdata/connector/postgres/cdc/accept/PGEventAbstractAccepter.java @@ -0,0 +1,21 @@ +package io.tapdata.connector.postgres.cdc.accept; + +import io.tapdata.cdc.EventAbstractAccepter; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; + +import java.util.Map; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/10 17:16 Create + * @description + */ +public abstract class PGEventAbstractAccepter, Consumer extends TapStreamReadConsumer> extends EventAbstractAccepter { + protected Map offset; + + public T updateOffset(Map offset) { + this.offset = offset; + return (T) this; + } +} diff --git a/connectors/aliyun-adb-postgres-connector/pom.xml b/connectors/aliyun-adb-postgres-connector/pom.xml index fde69c624..6b14c53dc 100644 --- a/connectors/aliyun-adb-postgres-connector/pom.xml +++ b/connectors/aliyun-adb-postgres-connector/pom.xml @@ -22,7 +22,7 @@ - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/aliyun-rds-postgres-connector/pom.xml 
b/connectors/aliyun-rds-postgres-connector/pom.xml index cba9f8240..d2882f881 100644 --- a/connectors/aliyun-rds-postgres-connector/pom.xml +++ b/connectors/aliyun-rds-postgres-connector/pom.xml @@ -23,7 +23,7 @@ - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/dws-connector/pom.xml b/connectors/dws-connector/pom.xml index 231865362..351e14beb 100644 --- a/connectors/dws-connector/pom.xml +++ b/connectors/dws-connector/pom.xml @@ -15,7 +15,7 @@ 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/greenplum-connector/pom.xml b/connectors/greenplum-connector/pom.xml index f1948d02e..073777d39 100644 --- a/connectors/greenplum-connector/pom.xml +++ b/connectors/greenplum-connector/pom.xml @@ -15,7 +15,7 @@ UTF-8 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/highgo-connector/pom.xml b/connectors/highgo-connector/pom.xml index 6aec28edf..dbbd66358 100644 --- a/connectors/highgo-connector/pom.xml +++ b/connectors/highgo-connector/pom.xml @@ -15,7 +15,7 @@ 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT 1.5.4.Final diff --git a/connectors/mongodb-connector/pom.xml b/connectors/mongodb-connector/pom.xml index b4fc0b6bb..6e7df67db 100644 --- a/connectors/mongodb-connector/pom.xml +++ b/connectors/mongodb-connector/pom.xml @@ -13,7 +13,7 @@ 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT @@ -129,6 +129,11 @@ nashorn-core 15.4 + + io.tapdata + cdc-core + 1.0-SNAPSHOT + diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java index 916af764b..49e794a71 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/MongodbConnector.java @@ -1,10 +1,44 @@ package io.tapdata.mongodb; import com.alibaba.fastjson.JSONObject; -import com.mongodb.*; +import com.mongodb.CreateIndexCommitQuorum; +import com.mongodb.MongoBulkWriteException; +import 
com.mongodb.MongoClientException; +import com.mongodb.MongoCommandException; +import com.mongodb.MongoConfigurationException; +import com.mongodb.MongoConnectionPoolClearedException; +import com.mongodb.MongoCursorNotFoundException; +import com.mongodb.MongoException; +import com.mongodb.MongoInterruptedException; +import com.mongodb.MongoNodeIsRecoveringException; +import com.mongodb.MongoNotPrimaryException; +import com.mongodb.MongoQueryException; +import com.mongodb.MongoSecurityException; +import com.mongodb.MongoServerUnavailableException; +import com.mongodb.MongoSocketClosedException; +import com.mongodb.MongoSocketException; +import com.mongodb.MongoSocketOpenException; +import com.mongodb.MongoSocketReadException; +import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.MongoSocketWriteException; +import com.mongodb.MongoTimeoutException; +import com.mongodb.MongoWriteConcernException; +import com.mongodb.MongoWriteException; import com.mongodb.bulk.BulkWriteError; -import com.mongodb.client.*; -import com.mongodb.client.model.*; +import com.mongodb.client.ClientSession; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; +import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.CreateIndexOptions; +import com.mongodb.client.model.IndexModel; +import com.mongodb.client.model.IndexOptions; +import com.mongodb.client.model.Sorts; +import com.mongodb.client.model.TimeSeriesGranularity; +import com.mongodb.client.model.TimeSeriesOptions; import io.tapdata.base.ConnectorBase; import io.tapdata.common.CommonDbConfig; import io.tapdata.entity.codec.TapCodecsRegistry; @@ -18,8 +52,21 @@ import io.tapdata.entity.event.dml.TapUpdateRecordEvent; import io.tapdata.entity.logger.Log; import io.tapdata.entity.logger.TapLogger; 
-import io.tapdata.entity.schema.*; -import io.tapdata.entity.schema.value.*; +import io.tapdata.entity.schema.TapField; +import io.tapdata.entity.schema.TapIndex; +import io.tapdata.entity.schema.TapIndexEx; +import io.tapdata.entity.schema.TapIndexField; +import io.tapdata.entity.schema.TapTable; +import io.tapdata.entity.schema.value.ByteData; +import io.tapdata.entity.schema.value.DateTime; +import io.tapdata.entity.schema.value.TapBinaryValue; +import io.tapdata.entity.schema.value.TapDateTimeValue; +import io.tapdata.entity.schema.value.TapDateValue; +import io.tapdata.entity.schema.value.TapNumberValue; +import io.tapdata.entity.schema.value.TapStringValue; +import io.tapdata.entity.schema.value.TapTimeValue; +import io.tapdata.entity.schema.value.TapValue; +import io.tapdata.entity.schema.value.TapYearValue; import io.tapdata.entity.simplify.TapSimplify; import io.tapdata.entity.utils.DataMap; import io.tapdata.entity.utils.InstanceFactory; @@ -40,9 +87,20 @@ import io.tapdata.partition.DatabaseReadPartitionSplitter; import io.tapdata.pdk.apis.annotations.TapConnectorClass; import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; import io.tapdata.pdk.apis.context.TapConnectionContext; import io.tapdata.pdk.apis.context.TapConnectorContext; -import io.tapdata.pdk.apis.entity.*; +import io.tapdata.pdk.apis.entity.Capability; +import io.tapdata.pdk.apis.entity.ConnectionOptions; +import io.tapdata.pdk.apis.entity.ExecuteResult; +import io.tapdata.pdk.apis.entity.FilterResults; +import io.tapdata.pdk.apis.entity.Projection; +import io.tapdata.pdk.apis.entity.QueryOperator; +import io.tapdata.pdk.apis.entity.SortOn; +import io.tapdata.pdk.apis.entity.TapAdvanceFilter; +import io.tapdata.pdk.apis.entity.TapExecuteCommand; +import io.tapdata.pdk.apis.entity.TestItem; +import io.tapdata.pdk.apis.entity.WriteListResult; import io.tapdata.pdk.apis.exception.NotSupportedException; import 
io.tapdata.pdk.apis.exception.TapTestItemException; import io.tapdata.pdk.apis.functions.ConnectorFunctions; @@ -57,16 +115,46 @@ import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; -import org.bson.*; +import org.bson.BsonArray; +import org.bson.BsonBoolean; +import org.bson.BsonDocument; +import org.bson.BsonInt64; +import org.bson.BsonString; +import org.bson.BsonType; +import org.bson.BsonValue; +import org.bson.Document; +import org.bson.RawBsonDocument; import org.bson.conversions.Bson; -import org.bson.types.*; +import org.bson.types.Binary; +import org.bson.types.Code; +import org.bson.types.Decimal128; +import org.bson.types.ObjectId; +import org.bson.types.Symbol; import java.io.Closeable; import java.math.BigDecimal; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; @@ -75,7 +163,12 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static com.mongodb.client.model.Filters.*; +import static com.mongodb.client.model.Filters.and; +import static 
com.mongodb.client.model.Filters.eq; +import static com.mongodb.client.model.Filters.gt; +import static com.mongodb.client.model.Filters.gte; +import static com.mongodb.client.model.Filters.lt; +import static com.mongodb.client.model.Filters.lte; import static java.util.Collections.singletonList; /** @@ -554,6 +647,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec connectorFunctions.supportCreateIndex(this::createIndex); connectorFunctions.supportCreateTableV2(this::createTableV2); connectorFunctions.supportStreamRead(this::streamRead); + connectorFunctions.supportOneByOneStreamRead(this::streamReadOneByOne); connectorFunctions.supportTimestampToStreamOffset(this::streamOffset); connectorFunctions.supportErrorHandleFunction(this::errorHandle); @@ -1640,19 +1734,32 @@ protected Object streamOffset(TapConnectorContext connectorContext, Long offsetS protected void streamRead(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) { int size = tableList.size(); MongoCdcOffset mongoCdcOffset = MongoCdcOffset.fromOffset(offset); - streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer); + streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset()); if (size == tableList.size() || !tableList.isEmpty()) { if (mongodbStreamReader == null) { mongodbStreamReader = createStreamReader(); } - mongodbStreamReader.onStart(mongoConfig); - doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset(), eventBatchSize, consumer); + mongodbStreamReader.initAcceptor(eventBatchSize, consumer).onStart(mongoConfig); + doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset()); + } + } + protected void streamReadOneByOne(TapConnectorContext connectorContext, List tableList, Object offset, StreamReadOneByOneConsumer consumer) { + int size = tableList.size(); + 
MongoCdcOffset mongoCdcOffset = MongoCdcOffset.fromOffset(offset); + streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset()); + if (size == tableList.size() || !tableList.isEmpty()) { + if (mongodbStreamReader == null) { + mongodbStreamReader = createStreamReader(); + } + mongodbStreamReader.initAcceptor(consumer.getBatchSize(), consumer) + .onStart(mongoConfig); + doStreamRead(mongodbStreamReader, connectorContext, tableList, mongoCdcOffset.getCdcOffset()); } } - protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) { + protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContext connectorContext, List tableList, Object offset) { try { - streamReader.read(connectorContext, tableList, offset, eventBatchSize, consumer); + streamReader.read(connectorContext, tableList, offset); } catch (Exception e) { exceptionCollector.collectTerminateByServer(e); exceptionCollector.collectWritePrivileges(e); @@ -1666,7 +1773,7 @@ protected void doStreamRead(MongodbStreamReader streamReader, TapConnectorContex } } - protected void streamReadOpLog(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) { + protected void streamReadOpLog(TapConnectorContext connectorContext, List tableList, Object offset) { String database = mongoConfig.getDatabase(); if (StreamWithOpLogCollection.OP_LOG_DB.equals(database) && tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)) { connectorContext.getLog().info("Start read oplog collection, db: local"); @@ -1676,7 +1783,7 @@ protected void streamReadOpLog(TapConnectorContext connectorContext, List doStreamRead(opLogStreamReader, connectorContext, list(StreamWithOpLogCollection.OP_LOG_COLLECTION), offset, eventBatchSize, consumer)); + sourceRunnerFuture = sourceRunner.submit(() -> doStreamRead(opLogStreamReader, 
connectorContext, list(StreamWithOpLogCollection.OP_LOG_COLLECTION), offset)); } } } diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java index cefd9f403..f03112e1d 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbStreamReader.java @@ -1,7 +1,7 @@ package io.tapdata.mongodb.reader; import io.tapdata.mongodb.entity.MongodbConfig; -import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; import io.tapdata.pdk.apis.context.TapConnectorContext; import java.util.List; @@ -13,8 +13,9 @@ public interface MongodbStreamReader { void onStart(MongodbConfig mongodbConfig); - void read(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) throws Exception; + MongodbStreamReader initAcceptor(int eventBatchSize, TapStreamReadConsumer consumer); + void read(TapConnectorContext connectorContext, List tableList, Object offset) throws Exception; Object streamOffset(Long offsetStartTime); void onDestroy(); diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java index 62d8989de..1a38d2e0a 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/MongodbV4StreamReader.java @@ -23,8 +23,13 @@ import io.tapdata.mongodb.MongodbExceptionCollector; import io.tapdata.mongodb.MongodbUtil; import io.tapdata.mongodb.entity.MongodbConfig; +import io.tapdata.mongodb.reader.cdc.v4.NormalV4Acceptor; +import 
io.tapdata.mongodb.reader.cdc.v4.OneByOneV4Acceptor; +import io.tapdata.mongodb.reader.cdc.v4.V4Accept; import io.tapdata.mongodb.util.MongodbLookupUtil; import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; import io.tapdata.pdk.apis.context.TapConnectorContext; import org.apache.commons.collections4.MapUtils; import org.bson.*; @@ -34,7 +39,6 @@ import org.bson.io.ByteBufferBsonInput; import java.nio.ByteBuffer; -import java.time.Instant; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,6 +68,7 @@ public class MongodbV4StreamReader implements MongodbStreamReader { private MongodbExceptionCollector mongodbExceptionCollector; private String dropTransactionId; private Thread consumeStreamEventThread; + private V4Accept v4Acceptor; public MongodbV4StreamReader setPreImage(boolean isPreImage) { this.isPreImage = isPreImage; @@ -74,6 +79,21 @@ public MongodbV4StreamReader() { this.mongodbExceptionCollector = new MongodbExceptionCollector(); } + @Override + public MongodbV4StreamReader initAcceptor(int batchSize, TapStreamReadConsumer consumer) { + if (consumer instanceof StreamReadConsumer) { + this.v4Acceptor = new NormalV4Acceptor() + .setConsumer((StreamReadConsumer) consumer) + .setBatchSize(batchSize); + } else if (consumer instanceof StreamReadOneByOneConsumer) { + this.v4Acceptor = new OneByOneV4Acceptor() + .setConsumer((StreamReadOneByOneConsumer) consumer); + } else { + throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName()); + } + return this; + } + @Override public void onStart(MongodbConfig mongodbConfig) { this.mongodbConfig = mongodbConfig; @@ -89,7 +109,7 @@ public void onStart(MongodbConfig mongodbConfig) { } @Override - public void read(TapConnectorContext connectorContext, List tableList, Object offset, int 
eventBatchSize, StreamReadConsumer consumer) throws Exception { + public void read(TapConnectorContext connectorContext, List tableList, Object offset) throws Exception { openChangeStreamPreAndPostImages(tableList); if (Boolean.TRUE.equals(mongodbConfig.getDoubleActive())) { tableList.add("_tap_double_active"); @@ -125,38 +145,15 @@ public void read(TapConnectorContext connectorContext, List tableList, O if (isPreImage) { changeStream.fullDocumentBeforeChange(fullDocumentBeforeChangeOption); } - consumer.streamReadStarted(); + this.v4Acceptor.streamReadStarted(); AtomicReference throwableAtomicReference = new AtomicReference<>(); try (final MongoChangeStreamCursor> streamCursor = changeStream.cursor()) { consumeStreamEventThread = new Thread(() -> { - List events = list(); - OffsetEvent lastOffsetEvent = null; - long lastSendTime = System.currentTimeMillis(); - // Calculate time window based on batch size - // If batch size <= 500, use fixed 50ms window - // If batch size > 500, use dynamic window = batch size / 10 (ms) - long timeWindowMs = eventBatchSize <= 500 ? 
50 : eventBatchSize / 10; + while (running.get()) { try { OffsetEvent event = concurrentProcessor.get(10, TimeUnit.MILLISECONDS); - if (EmptyKit.isNotNull(event)) { - lastOffsetEvent = event; - events.add(event.getEvent()); - // Check batch size OR time window - if (events.size() >= eventBatchSize || - (System.currentTimeMillis() - lastSendTime > timeWindowMs)) { - consumer.accept(events, event.getOffset()); - events.clear(); - lastSendTime = System.currentTimeMillis(); - } - } else { - // Send remaining events when queue is empty - if (!events.isEmpty() && lastOffsetEvent != null) { - consumer.accept(events, lastOffsetEvent.getOffset()); - events.clear(); - lastSendTime = System.currentTimeMillis(); - } - } + this.v4Acceptor.accept(event); } catch (Exception e) { throwableAtomicReference.set(e); return; @@ -212,7 +209,7 @@ public void read(TapConnectorContext connectorContext, List tableList, O } } - static class OffsetEvent { + public static class OffsetEvent { private final Object offset; private final TapEvent event; diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/NormalV3Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/NormalV3Acceptor.java new file mode 100644 index 000000000..69d13975b --- /dev/null +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/NormalV3Acceptor.java @@ -0,0 +1,94 @@ +package io.tapdata.mongodb.reader.cdc.v3; + +import io.tapdata.entity.event.TapEvent; +import io.tapdata.mongodb.reader.v3.MongoV3StreamOffset; +import io.tapdata.mongodb.reader.v3.TapEventOffset; +import io.tapdata.pdk.apis.consumer.StreamReadConsumer; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/8 17:30 Create + * @description + */ +public class NormalV3Acceptor 
extends V3Accept { + StreamReadConsumer consumer; + + int batchSize = 1; + + long batchSizeTimeout = TimeUnit.SECONDS.toMillis(1L); + + long lastConsumeTs; + + List tapEvents = new ArrayList<>(); + + // Calculate time window based on batch size + // If batch size <= 500, use fixed 50ms window + // If batch size > 500, use dynamic window = batch size / 10 (ms) + @Override + public void accept(TapEventOffset tapEventOffset) { + if (tapEventOffset != null) { + tapEvents.add(tapEventOffset.getTapEvent()); + this.offset.put(tapEventOffset.getReplicaSetName(), tapEventOffset.getOffset()); + } + + long consumeInterval = System.currentTimeMillis() - lastConsumeTs; + if (tapEvents.size() >= batchSize + || consumeInterval > batchSizeTimeout) { + Map newOffset = new ConcurrentHashMap<>(this.offset); + consumer.accept(tapEvents, newOffset); + tapEvents = new ArrayList<>(batchSize); + lastConsumeTs = System.currentTimeMillis(); + } + } + + @Override + public void accept(List e, Object offset) { + consumer.accept(e, offset); + } + + @Override + public NormalV3Acceptor setConsumer(StreamReadConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public NormalV3Acceptor setBatchSize(int size) { + this.batchSize = size; + return this; + } + + @Override + public NormalV3Acceptor setBatchSizeTimeout(long ms) { + this.batchSizeTimeout = ms; + return this; + } + + @Override + public void streamReadStarted() { + this.lastConsumeTs = System.currentTimeMillis(); + this.consumer.streamReadStarted(); + } + + @Override + public void streamReadEnded() { + this.consumer.streamReadEnded(); + } + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override + public StreamReadConsumer getConsumer() { + return this.consumer; + } +} diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/OneByOneV3Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/OneByOneV3Acceptor.java new file 
mode 100644 index 000000000..012bb6cf3 --- /dev/null +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/OneByOneV3Acceptor.java @@ -0,0 +1,69 @@ +package io.tapdata.mongodb.reader.cdc.v3; + +import io.tapdata.entity.event.TapEvent; +import io.tapdata.mongodb.reader.v3.TapEventOffset; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.HashMap; +import java.util.List; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/8 17:30 Create + * @description + */ +public class OneByOneV3Acceptor extends V3Accept { + StreamReadOneByOneConsumer consumer; + + @Override + public void accept(TapEventOffset e) { + if (e == null || null == e.getTapEvent()) { + return; + } + this.offset.put(e.getReplicaSetName(), e.getOffset()); + consumer.accept(e.getTapEvent(), new HashMap<>(offset)); + } + + @Override + public void accept(List e, Object offset) { + if (CollectionUtils.isEmpty(e)) { + return; + } + consumer.accept(e, offset); + } + + @Override + public OneByOneV3Acceptor setConsumer(StreamReadOneByOneConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public OneByOneV3Acceptor setBatchSize(int size) { + return this; + } + + @Override + public OneByOneV3Acceptor setBatchSizeTimeout(long ms) { + return this; + } + + @Override + public void streamReadStarted() { + this.consumer.streamReadStarted(); + } + + @Override + public void streamReadEnded() { + this.consumer.streamReadEnded(); + } + + @Override + public StreamReadOneByOneConsumer getConsumer() { + return this.consumer; + } + + +} diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/V3Accept.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/V3Accept.java new file mode 100644 index 000000000..5c1a8316c --- /dev/null +++ 
b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v3/V3Accept.java @@ -0,0 +1,25 @@ +package io.tapdata.mongodb.reader.cdc.v3; + +import io.tapdata.cdc.CustomAbstractAccepter; +import io.tapdata.mongodb.reader.v3.MongoV3StreamOffset; +import io.tapdata.mongodb.reader.v3.TapEventOffset; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; + +import java.util.Map; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/9 11:06 Create + * @description + */ +public abstract class V3Accept, Consumer extends TapStreamReadConsumer> + extends CustomAbstractAccepter { + + protected Map offset; + + public void setOffset(Map offset) { + this.offset = offset; + } + +} diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/NormalV4Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/NormalV4Acceptor.java new file mode 100644 index 000000000..8fc86aa8c --- /dev/null +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/NormalV4Acceptor.java @@ -0,0 +1,99 @@ +package io.tapdata.mongodb.reader.cdc.v4; + +import io.tapdata.entity.event.TapEvent; +import io.tapdata.kit.EmptyKit; +import io.tapdata.mongodb.reader.MongodbV4StreamReader; +import io.tapdata.pdk.apis.consumer.StreamReadConsumer; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/8 17:30 Create + * @description + */ +public class NormalV4Acceptor extends V4Accept { + StreamReadConsumer consumer; + + int batchSize = 1; + + long batchSizeTimeout = 1000L; + + MongodbV4StreamReader.OffsetEvent lastOffsetEvent = null; + + long lastSendTime; + + List events = new ArrayList<>(); + + // Calculate time window based on batch size + // If batch size <= 500, use fixed 50ms window + // If batch size > 500, use dynamic window = batch size / 10 (ms) + @Override + public void 
accept(MongodbV4StreamReader.OffsetEvent event) { + if (EmptyKit.isNotNull(event)) { + lastOffsetEvent = event; + events.add(event.getEvent()); + // Check batch size OR time window + if (events.size() >= batchSize || + (System.currentTimeMillis() - lastSendTime > batchSizeTimeout)) { + consumer.accept(events, event.getOffset()); + events.clear(); + lastSendTime = System.currentTimeMillis(); + } + } else { + // Send remaining events when queue is empty + if (!events.isEmpty() && lastOffsetEvent != null) { + consumer.accept(events, lastOffsetEvent.getOffset()); + events.clear(); + lastSendTime = System.currentTimeMillis(); + } + } + } + + @Override + public void accept(List e, Object offset) { + this.consumer.accept(e, offset); + } + + @Override + public NormalV4Acceptor setConsumer(StreamReadConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public NormalV4Acceptor setBatchSize(int size) { + this.batchSize = size; + return this; + } + + @Override + public NormalV4Acceptor setBatchSizeTimeout(long ms) { + this.batchSizeTimeout = ms; + return this; + } + + @Override + public void streamReadStarted() { + this.lastSendTime = System.currentTimeMillis(); + this.consumer.streamReadStarted(); + } + + @Override + public void streamReadEnded() { + this.consumer.streamReadEnded(); + } + + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override + public StreamReadConsumer getConsumer() { + return this.consumer; + } +} diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/OneByOneV4Acceptor.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/OneByOneV4Acceptor.java new file mode 100644 index 000000000..a8fb12c56 --- /dev/null +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/OneByOneV4Acceptor.java @@ -0,0 +1,65 @@ +package io.tapdata.mongodb.reader.cdc.v4; + +import io.tapdata.entity.event.TapEvent; +import 
io.tapdata.mongodb.reader.MongodbV4StreamReader; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.List; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/8 17:30 Create + * @description + */ +public class OneByOneV4Acceptor extends V4Accept { + StreamReadOneByOneConsumer consumer; + + @Override + public void accept(MongodbV4StreamReader.OffsetEvent e) { + if (null == e || e.getEvent() == null) { + return; + } + this.consumer.accept(e.getEvent(), e.getOffset()); + } + + @Override + public void accept(List e, Object offset) { + if (CollectionUtils.isEmpty(e)) { + return; + } + this.consumer.accept(e, offset); + } + + @Override + public OneByOneV4Acceptor setConsumer(StreamReadOneByOneConsumer consumer) { + this.consumer = consumer; + return this; + } + + @Override + public OneByOneV4Acceptor setBatchSize(int size) { + return this; + } + + @Override + public OneByOneV4Acceptor setBatchSizeTimeout(long ms) { + return this; + } + + @Override + public void streamReadStarted() { + this.consumer.streamReadStarted(); + } + + @Override + public void streamReadEnded() { + this.consumer.streamReadEnded(); + } + + @Override + public StreamReadOneByOneConsumer getConsumer() { + return this.consumer; + } +} diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/V4Accept.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/V4Accept.java new file mode 100644 index 000000000..dfa5577b3 --- /dev/null +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/cdc/v4/V4Accept.java @@ -0,0 +1,15 @@ +package io.tapdata.mongodb.reader.cdc.v4; + +import io.tapdata.cdc.CustomAbstractAccepter; +import io.tapdata.mongodb.reader.MongodbV4StreamReader; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; + +/** + * @author Gavin'Xiao + * @author Gavin'Xiao + * @version v1.0 2025/12/9 11:07 
Create + * @description + */ +public abstract class V4Accept, Consumer extends TapStreamReadConsumer> + extends CustomAbstractAccepter { +} diff --git a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java index 16e7e87d2..b9b20bbdc 100644 --- a/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java +++ b/connectors/mongodb-connector/src/main/java/io/tapdata/mongodb/reader/v3/MongodbV3StreamReader.java @@ -9,9 +9,8 @@ import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Filters; import io.tapdata.entity.event.TapBaseEvent; -import io.tapdata.entity.event.TapEvent; -import io.tapdata.entity.event.dml.TapUpdateRecordEvent; import io.tapdata.entity.event.control.HeartbeatEvent; +import io.tapdata.entity.event.dml.TapUpdateRecordEvent; import io.tapdata.entity.logger.TapLogger; import io.tapdata.entity.utils.cache.KVMap; import io.tapdata.exception.TapPdkOffsetOutOfLogEx; @@ -19,9 +18,14 @@ import io.tapdata.mongodb.MongodbUtil; import io.tapdata.mongodb.entity.MongodbConfig; import io.tapdata.mongodb.reader.MongodbStreamReader; +import io.tapdata.mongodb.reader.cdc.v3.NormalV3Acceptor; +import io.tapdata.mongodb.reader.cdc.v3.OneByOneV3Acceptor; +import io.tapdata.mongodb.reader.cdc.v3.V3Accept; import io.tapdata.mongodb.util.IntervalReport; import io.tapdata.mongodb.util.MongodbLookupUtil; import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; import io.tapdata.pdk.apis.context.TapConnectorContext; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -30,14 +34,21 @@ import org.bson.conversions.Bson; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import 
java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static io.tapdata.base.ConnectorBase.*; +import static io.tapdata.base.ConnectorBase.deleteDMLEvent; +import static io.tapdata.base.ConnectorBase.insertRecordEvent; +import static io.tapdata.base.ConnectorBase.updateDMLEvent; /** * @author jackin @@ -49,7 +60,6 @@ public class MongodbV3StreamReader implements MongodbStreamReader { private final static String LOCAL_DATABASE = "local"; private final static String OPLOG_COLLECTION = "oplog.rs"; - public static final long CONSUME_TIME_OUT = TimeUnit.SECONDS.toMillis(1L); private MongodbConfig mongodbConfig; @@ -73,6 +83,8 @@ public class MongodbV3StreamReader implements MongodbStreamReader { private ConnectionString connectionString; + V3Accept acceptor; + @Override public void onStart(MongodbConfig mongodbConfig) { this.mongodbConfig = mongodbConfig; @@ -84,8 +96,25 @@ public void onStart(MongodbConfig mongodbConfig) { replicaSetReadThreadPool = new ThreadPoolExecutor(nodesURI.size(), nodesURI.size(), 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); } + @Override - public void read(TapConnectorContext connectorContext, List tableList, Object offset, int eventBatchSize, StreamReadConsumer consumer) throws Exception { + public MongodbV3StreamReader initAcceptor(int eventBatchSize, TapStreamReadConsumer consumer) { + if (consumer instanceof StreamReadConsumer) { + this.acceptor = new NormalV3Acceptor() + .setConsumer((StreamReadConsumer) consumer) + .setBatchSize(eventBatchSize) + .setBatchSizeTimeout(eventBatchSize <= 500 ? 
50 : eventBatchSize / 10); + } else if (consumer instanceof StreamReadOneByOneConsumer) { + this.acceptor = new OneByOneV3Acceptor() + .setConsumer((StreamReadOneByOneConsumer) consumer); + } else { + throw new IllegalArgumentException("Unsupported consumer type: " + consumer.getClass().getName()); + } + return this; + } + + @Override + public void read(TapConnectorContext connectorContext, List tableList, Object offset) throws Exception { if (CollectionUtils.isNotEmpty(tableList)) { for (String tableName : tableList) { namespaces.add(new StringBuilder(mongodbConfig.getDatabase()).append(".").append(tableName).toString()); @@ -93,7 +122,7 @@ public void read(TapConnectorContext connectorContext, List tableList, O } if (tapEventQueue == null) { - tapEventQueue = new LinkedBlockingDeque<>(eventBatchSize); + tapEventQueue = new LinkedBlockingDeque<>(this.acceptor.getBatchSize()); } if (this.globalStateMap == null) { @@ -105,6 +134,7 @@ public void read(TapConnectorContext connectorContext, List tableList, O } else { this.offset = new ConcurrentHashMap<>(); } + this.acceptor.setOffset(this.offset); if (MapUtils.isNotEmpty(nodesURI)) { int intervalReportMillis = 3000; // interval report offset if no match any table event @@ -136,7 +166,7 @@ protected void report(BsonTimestamp bsonTimestamp) throws InterruptedException { if (running.get()) { try { Thread.currentThread().setName("replicaSet-read-thread-" + replicaSetName); - readFromOplog(connectorContext, replicaSetName, intervalReport, mongodbURI, eventBatchSize, consumer); + readFromOplog(connectorContext, replicaSetName, intervalReport, mongodbURI); } catch (Exception e) { running.compareAndSet(true, false); TapLogger.error(TAG, "read oplog event from {} failed {}", replicaSetName, e.getMessage(), e); @@ -146,25 +176,10 @@ protected void report(BsonTimestamp bsonTimestamp) throws InterruptedException { }); } } - - List tapEvents = new ArrayList<>(eventBatchSize); - long lastConsumeTs = 0L; while (running.get()) { try 
{ final TapEventOffset tapEventOffset = tapEventQueue.poll(3, TimeUnit.SECONDS); - if (tapEventOffset != null) { - tapEvents.add(tapEventOffset.getTapEvent()); - this.offset.put(tapEventOffset.getReplicaSetName(), tapEventOffset.getOffset()); - } - - long consumeInterval = System.currentTimeMillis() - lastConsumeTs; - if (tapEvents.size() >= eventBatchSize - || consumeInterval > CONSUME_TIME_OUT) { - Map newOffset = new ConcurrentHashMap<>(this.offset); - consumer.accept(tapEvents, newOffset); - tapEvents = new ArrayList<>(eventBatchSize); - lastConsumeTs = System.currentTimeMillis(); - } + this.acceptor.accept(tapEventOffset); } catch (InterruptedException e) { TapLogger.info("Stream polling failed: {}", e.getMessage(), e); break; @@ -209,7 +224,7 @@ public void onDestroy() { } } - private void readFromOplog(TapConnectorContext connectorContext, String replicaSetName, IntervalReport intervalReport, String mongodbURI, int eventBatchSize, StreamReadConsumer consumer) { + private void readFromOplog(TapConnectorContext connectorContext, String replicaSetName, IntervalReport intervalReport, String mongodbURI) { Bson filter = null; BsonTimestamp startTs = null; @@ -255,7 +270,7 @@ private void readFromOplog(TapConnectorContext connectorContext, String replicaS .cursorType(CursorType.TailableAwait) .noCursorTimeout(true).iterator()) { - consumer.streamReadStarted(); + this.acceptor.streamReadStarted(); while (running.get()) { if (mongoCursor.hasNext()) { final Document event = mongoCursor.next(); diff --git a/connectors/mongodb-connector/src/main/resources/spec.json b/connectors/mongodb-connector/src/main/resources/spec.json index 2e2a4f7be..d82329c88 100644 --- a/connectors/mongodb-connector/src/main/resources/spec.json +++ b/connectors/mongodb-connector/src/main/resources/spec.json @@ -4,7 +4,17 @@ "icon": "icons/mongodb.png", "doc" : "${doc}", "tags" : ["schema-free","Database","doubleActive"], - "id": "mongodb" + "id": "mongodb", + "autoAccumulateBatch": { + 
"batchRead": { + "open": false, + "maxDelayMs": "1000" + }, + "increaseRead": { + "open": true, + "maxDelayMs": "1000" + } + } }, "configOptions": { "capabilities": [ diff --git a/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbConnectorTest.java b/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbConnectorTest.java deleted file mode 100644 index 95e5795f2..000000000 --- a/connectors/mongodb-connector/src/test/java/io/tapdata/mongodb/MongodbConnectorTest.java +++ /dev/null @@ -1,810 +0,0 @@ -package io.tapdata.mongodb; - -import com.mongodb.client.*; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoCollection; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.model.IndexOptions; -import io.tapdata.entity.event.TapEvent; -import io.tapdata.entity.event.ddl.index.TapCreateIndexEvent; -import io.tapdata.entity.logger.Log; -import io.tapdata.entity.schema.TapIndex; -import io.tapdata.entity.schema.TapIndexField; -import io.tapdata.entity.schema.TapIndexEx; -import io.tapdata.entity.schema.TapTable; -import io.tapdata.mongodb.batch.ErrorHandler; -import io.tapdata.mongodb.batch.MongoBatchReader; -import io.tapdata.mongodb.entity.MongoCdcOffset; -import io.tapdata.mongodb.entity.MongodbConfig; -import io.tapdata.mongodb.entity.ReadParam; -import io.tapdata.mongodb.reader.MongodbOpLogStreamV3Reader; -import io.tapdata.mongodb.reader.MongodbStreamReader; -import io.tapdata.mongodb.reader.StreamWithOpLogCollection; -import io.tapdata.pdk.apis.consumer.StreamReadConsumer; -import io.tapdata.pdk.apis.context.TapConnectionContext; -import io.tapdata.pdk.apis.context.TapConnectorContext; -import io.tapdata.pdk.apis.entity.ExecuteResult; -import io.tapdata.pdk.apis.entity.TapAdvanceFilter; -import io.tapdata.pdk.apis.entity.TapExecuteCommand; -import io.tapdata.pdk.apis.functions.connection.TableInfo; -import org.bson.Document; -import org.bson.conversions.Bson; -import org.junit.jupiter.api.*; 
-import org.mockito.MockedStatic; -import org.mockito.Mockito; -import org.springframework.test.util.ReflectionTestUtils; - -import java.util.*; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Supplier; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.*; -import static org.mockito.Mockito.*; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -class MongodbConnectorTest { - MongodbConnector mongodbConnector; - TapConnectorContext connectorContext; - MongodbConfig mongoConfig; - MongoClient mongoClient; - MongodbStreamReader mongodbStreamReader; - MongodbStreamReader opLogStreamReader; - MongodbExceptionCollector exceptionCollector; - ThreadPoolExecutor sourceRunner; - Future sourceRunnerFuture; - Log log; - @BeforeEach - void init() { - mongodbConnector = mock(MongodbConnector.class); - connectorContext = mock(TapConnectorContext.class); - mongoConfig = mock(MongodbConfig.class); - ReflectionTestUtils.setField(mongodbConnector, "mongoConfig", mongoConfig); - mongoClient = mock(MongoClient.class); - ReflectionTestUtils.setField(mongodbConnector,"mongoClient",mongoClient); - mongodbStreamReader = mock(MongodbStreamReader.class); - ReflectionTestUtils.setField(mongodbConnector, "mongodbStreamReader", 
mongodbStreamReader); - opLogStreamReader = mock(MongodbStreamReader.class); - ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", opLogStreamReader); - exceptionCollector = new MongodbExceptionCollector(); - ReflectionTestUtils.setField(mongodbConnector, "exceptionCollector", exceptionCollector); - sourceRunner = mock(ThreadPoolExecutor.class); - ReflectionTestUtils.setField(mongodbConnector, "sourceRunner", sourceRunner); - sourceRunnerFuture = mock(Future.class); - ReflectionTestUtils.setField(mongodbConnector, "sourceRunnerFuture", sourceRunnerFuture); - log = mock(Log.class); - when(connectorContext.getLog()).thenReturn(log); - } - - @Nested - class BatchReadTest { - TapTable table; - Object offset; - int eventBatchSize; - BiConsumer, Object> tapReadOffsetConsumer; - MongoBatchReader reader; - Exception e; - @BeforeEach - void init() throws Throwable { - e = new Exception(); - reader = mock(MongoBatchReader.class); - table = mock(TapTable.class); - offset = 0L; - eventBatchSize = 100; - tapReadOffsetConsumer = mock(BiConsumer.class); - doNothing().when(mongodbConnector).errorHandle(e, connectorContext); - doAnswer(a -> { - ReadParam param = a.getArgument(0, ReadParam.class); - ErrorHandler errorHandler = param.getErrorHandler(); - errorHandler.doHandle(e); - return null; - }).when(reader).batchReadCollection(any(ReadParam.class)); - doCallRealMethod().when(mongodbConnector).batchRead(connectorContext, table, offset, eventBatchSize, tapReadOffsetConsumer); - } - @Test - void testNormal() throws Throwable { - try(MockedStatic mbr = mockStatic(MongoBatchReader.class)) { - mbr.when(() -> MongoBatchReader.of(any(ReadParam.class))).thenReturn(reader); - mongodbConnector.batchRead(connectorContext, table, offset, eventBatchSize, tapReadOffsetConsumer); - verify(mongodbConnector).errorHandle(e, connectorContext); - } - } - } - - @Nested - class StreamOffsetTest { - Long offsetStartTime; - - @BeforeEach - void init() { - offsetStartTime = 0L; - - 
when(mongoConfig.getDatabase()).thenReturn("test"); - when(opLogStreamReader.streamOffset(offsetStartTime)).thenReturn(0L); - when(mongodbStreamReader.streamOffset(offsetStartTime)).thenReturn(0L); - when(mongodbConnector.createStreamReader()).thenReturn(mongodbStreamReader); - doNothing().when(opLogStreamReader).onStart(mongoConfig); - when(mongodbConnector.streamOffset(connectorContext, offsetStartTime)).thenCallRealMethod(); - } - @Test - void testNormal() { - Object offset = mongodbConnector.streamOffset(connectorContext, offsetStartTime); - Assertions.assertNotNull(offset); - Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName()); - MongoCdcOffset o = MongoCdcOffset.fromOffset(offset); - Assertions.assertEquals(0L, o.getOpLogOffset()); - Assertions.assertEquals(0L, o.getCdcOffset()); - verify(opLogStreamReader).streamOffset(offsetStartTime); - verify(mongoConfig, times(0)).getDatabase(); - verify(mongodbStreamReader).streamOffset(offsetStartTime); - } - @Test - void testMongodbStreamReaderIsNull() { - ReflectionTestUtils.setField(mongodbConnector, "mongodbStreamReader", null); - when(mongodbConnector.createStreamReader()).thenReturn(mongodbStreamReader); - Object offset = mongodbConnector.streamOffset(connectorContext, offsetStartTime); - Assertions.assertNotNull(offset); - Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName()); - MongoCdcOffset o = MongoCdcOffset.fromOffset(offset); - Assertions.assertEquals(0L, o.getOpLogOffset()); - Assertions.assertEquals(0L, o.getCdcOffset()); - verify(opLogStreamReader).streamOffset(offsetStartTime); - verify(mongodbStreamReader).streamOffset(offsetStartTime); - verify(mongoConfig, times(0)).getDatabase(); - verify(mongodbConnector).createStreamReader(); - } - @Test - void testOpLogStreamReaderIsNullButDatabaseNotLocal() { - ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", null); - Object offset = mongodbConnector.streamOffset(connectorContext, 
offsetStartTime); - Assertions.assertNotNull(offset); - Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName()); - MongoCdcOffset o = MongoCdcOffset.fromOffset(offset); - Assertions.assertNull(o.getOpLogOffset()); - Assertions.assertEquals(0L, o.getCdcOffset()); - verify(opLogStreamReader, times(0)).streamOffset(offsetStartTime); - verify(mongodbStreamReader).streamOffset(offsetStartTime); - verify(mongoConfig).getDatabase(); - verify(mongodbConnector, times(0)).createStreamReader(); - } - @Test - void testOpLogStreamReaderIsNullDatabaseIsLocal() { - when(mongoConfig.getDatabase()).thenReturn("local"); - ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", null); - try(MockedStatic mol = mockStatic(MongodbOpLogStreamV3Reader.class)) { - mol.when(MongodbOpLogStreamV3Reader::of).thenReturn(opLogStreamReader); - Object offset = mongodbConnector.streamOffset(connectorContext, offsetStartTime); - Assertions.assertNotNull(offset); - Assertions.assertEquals(HashMap.class.getName(), offset.getClass().getName()); - MongoCdcOffset o = MongoCdcOffset.fromOffset(offset); - Assertions.assertEquals(0L, o.getOpLogOffset()); - Assertions.assertEquals(0L, o.getCdcOffset()); - verify(opLogStreamReader).streamOffset(offsetStartTime); - verify(mongodbStreamReader).streamOffset(offsetStartTime); - verify(mongoConfig).getDatabase(); - verify(mongodbConnector, times(0)).createStreamReader(); - verify(opLogStreamReader).onStart(mongoConfig); - } - } - } - - @Nested - class StreamReadTest { - List tableList; - Object offset; - int eventBatchSize; - StreamReadConsumer consumer; - MongoCdcOffset mongoCdcOffset; - - @BeforeEach - void init() { - mongoCdcOffset = new MongoCdcOffset(0L, 0L); - tableList = mock(List.class); - offset = 0L; - eventBatchSize = 100; - consumer = mock(StreamReadConsumer.class); - when(tableList.size()).thenReturn(1, 1); - - doNothing().when(mongodbConnector).streamReadOpLog(connectorContext, tableList, 
mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer); - when(tableList.isEmpty()).thenReturn(false); - when(mongodbConnector.createStreamReader()).thenReturn(mongodbStreamReader); - doNothing().when(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer); - doCallRealMethod().when(mongodbConnector).streamRead(connectorContext, tableList, offset, eventBatchSize, consumer); - } - @Test - void testNormal() { - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer)); - verify(tableList, times(2)).size(); - verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer); - verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer); - verify(tableList, times(0)).isEmpty(); - verify(mongodbConnector, times(0)).createStreamReader(); - verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer); - } - @Test - void testAfterReadOpLog() { - when(tableList.size()).thenReturn(1, 2); - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer)); - verify(tableList, times(2)).size(); - verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer); - verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer); - verify(tableList).isEmpty(); - verify(mongodbConnector, times(0)).createStreamReader(); - verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer); - } - @Test - void testMongodbStreamReaderIsNull() { - ReflectionTestUtils.setField(mongodbConnector, "mongodbStreamReader", null); - Assertions.assertDoesNotThrow(() -> 
mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer)); - verify(tableList, times(2)).size(); - verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer); - verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer); - verify(tableList, times(0)).isEmpty(); - verify(mongodbConnector, times(1)).createStreamReader(); - verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer); - } - @Test - void testOffsetIsMongoOffset() { - offset = mongoCdcOffset; - doCallRealMethod().when(mongodbConnector).streamRead(connectorContext, tableList, offset, eventBatchSize, consumer); - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer)); - verify(tableList, times(2)).size(); - verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer); - verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer); - verify(tableList, times(0)).isEmpty(); - verify(mongodbConnector, times(0)).createStreamReader(); - verify(mongodbConnector).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer); - } - @Test - void testOnlyReadOpLogCollection() { - when(tableList.size()).thenReturn(1, 2); - when(tableList.isEmpty()).thenReturn(true); - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamRead(connectorContext, tableList, offset, eventBatchSize, consumer)); - verify(tableList, times(2)).size(); - verify(mongodbConnector).streamReadOpLog(connectorContext, tableList, null, eventBatchSize, consumer); - verify(mongodbConnector, times(0)).streamReadOpLog(connectorContext, tableList, mongoCdcOffset.getOpLogOffset(), eventBatchSize, consumer); - 
verify(tableList).isEmpty(); - verify(mongodbConnector, times(0)).createStreamReader(); - verify(mongodbConnector, times(0)).doStreamRead(mongodbStreamReader, connectorContext, tableList, 0L, eventBatchSize, consumer); - } - } - - @Nested - class DoStreamTest { - List tableList; - Object offset; - int eventBatchSize; - StreamReadConsumer consumer; - MongoCdcOffset mongoCdcOffset; - MongodbStreamReader streamReader; - @BeforeEach - void init() throws Exception { - streamReader = mock(MongodbStreamReader.class); - mongoCdcOffset = new MongoCdcOffset(0L, 0L); - tableList = mock(List.class); - offset = 0L; - eventBatchSize = 100; - consumer = mock(StreamReadConsumer.class); - when(tableList.size()).thenReturn(1, 1); - - doNothing().when(streamReader).read(connectorContext, tableList, offset, eventBatchSize, consumer); - doNothing().when(streamReader).onDestroy(); - doNothing().when(mongodbConnector).errorHandle(any(Exception.class), any(TapConnectorContext.class)); - doNothing().when(log).debug(anyString(), anyString()); - - doCallRealMethod().when(mongodbConnector).doStreamRead(streamReader, connectorContext, tableList, offset, eventBatchSize, consumer); - } - @Test - void testNormal() throws Exception { - Assertions.assertDoesNotThrow(() -> mongodbConnector.doStreamRead(streamReader, connectorContext, tableList, offset, eventBatchSize, consumer)); - verify(streamReader, times(1)).read(connectorContext, tableList, offset, eventBatchSize, consumer); - verify(streamReader, times(0)).onDestroy(); - verify(mongodbConnector, times(0)).errorHandle(any(Exception.class), any(TapConnectorContext.class)); - verify(log, times(0)).debug(anyString(), anyString()); - } - @Test - void testException() throws Exception { - doAnswer(a -> { - throw new Exception("error"); - }).when(streamReader).read(connectorContext, tableList, offset, eventBatchSize, consumer); - Assertions.assertDoesNotThrow(() -> mongodbConnector.doStreamRead(streamReader, connectorContext, tableList, offset, 
eventBatchSize, consumer)); - verify(streamReader, times(1)).read(connectorContext, tableList, offset, eventBatchSize, consumer); - verify(streamReader, times(1)).onDestroy(); - verify(mongodbConnector, times(1)).errorHandle(any(Exception.class), any(TapConnectorContext.class)); - verify(log, times(0)).debug(anyString(), anyString()); - } - @Test - void testDestroyException() throws Exception { - doAnswer(a -> { - throw new Exception("error"); - }).when(streamReader).read(connectorContext, tableList, offset, eventBatchSize, consumer); - doAnswer(a -> { - throw new Exception("failed"); - }).when(streamReader).onDestroy(); - Assertions.assertDoesNotThrow(() -> mongodbConnector.doStreamRead(streamReader, connectorContext, tableList, offset, eventBatchSize, consumer)); - verify(streamReader, times(1)).read(connectorContext, tableList, offset, eventBatchSize, consumer); - verify(streamReader, times(1)).onDestroy(); - verify(mongodbConnector, times(1)).errorHandle(any(Exception.class), any(TapConnectorContext.class)); - verify(log, times(1)).debug(anyString(), anyString()); - } - } - - @Nested - class StreamReadOpLogTest { - List tableList; - Object offset; - int eventBatchSize; - StreamReadConsumer consumer; - MongoCdcOffset mongoCdcOffset; - @BeforeEach - void init() { - mongoCdcOffset = new MongoCdcOffset(0L, 0L); - tableList = mock(List.class); - offset = 0L; - eventBatchSize = 100; - consumer = mock(StreamReadConsumer.class); - - when(mongoConfig.getDatabase()).thenReturn("test"); - when(tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(false); - doNothing().when(log).info("Start read oplog collection, db: local"); - when(tableList.remove(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(true); - when(tableList.isEmpty()).thenReturn(true); - doNothing().when(mongodbConnector).doStreamRead( - any(MongodbStreamReader.class), - any(TapConnectorContext.class), - anyList(), - any(), - anyInt(), - any(StreamReadConsumer.class)); - 
doCallRealMethod().when(mongodbConnector).streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer); - } - - @Test - void testNotReadOpLog() { - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer)); - verify(mongoConfig).getDatabase(); - verify(tableList, times(0)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(connectorContext, times(0)).getLog(); - verify(log, times(0)).info("Start read oplog collection, db: local"); - verify(tableList, times(0)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(tableList, times(0)).isEmpty(); - verify(mongodbConnector, times(0)).doStreamRead( - any(MongodbStreamReader.class), - any(TapConnectorContext.class), - anyList(), - any(), - anyInt(), - any(StreamReadConsumer.class)); - } - @Test - void testNotReadOpLog2() { - when(mongoConfig.getDatabase()).thenReturn("local"); - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer)); - verify(mongoConfig).getDatabase(); - verify(tableList, times(1)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(connectorContext, times(0)).getLog(); - verify(log, times(0)).info("Start read oplog collection, db: local"); - verify(tableList, times(0)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(tableList, times(0)).isEmpty(); - verify(mongodbConnector, times(0)).doStreamRead( - any(MongodbStreamReader.class), - any(TapConnectorContext.class), - anyList(), - any(), - anyInt(), - any(StreamReadConsumer.class)); - } - @Test - void testReadOpLog() { - when(mongoConfig.getDatabase()).thenReturn("local"); - when(tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(true); - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer)); - verify(mongoConfig).getDatabase(); - verify(tableList, 
times(1)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(connectorContext, times(1)).getLog(); - verify(log, times(1)).info("Start read oplog collection, db: local"); - verify(tableList, times(1)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(tableList, times(1)).isEmpty(); - verify(mongodbConnector, times(1)).doStreamRead( - any(MongodbStreamReader.class), - any(TapConnectorContext.class), - anyList(), - any(), - anyInt(), - any(StreamReadConsumer.class)); - } - @Test - void testReadOpLogWithNullMongodbOpLogStreamV3Reader() { - try (MockedStatic mol = mockStatic(MongodbOpLogStreamV3Reader.class)) { - mol.when(MongodbOpLogStreamV3Reader::of).thenReturn(opLogStreamReader); - doNothing().when(opLogStreamReader).onStart(mongoConfig); - ReflectionTestUtils.setField(mongodbConnector, "opLogStreamReader", null); - when(mongoConfig.getDatabase()).thenReturn("local"); - when(tableList.contains(StreamWithOpLogCollection.OP_LOG_COLLECTION)).thenReturn(true); - Assertions.assertDoesNotThrow(() -> mongodbConnector.streamReadOpLog(connectorContext, tableList, 0L, eventBatchSize, consumer)); - verify(mongoConfig).getDatabase(); - verify(tableList, times(1)).contains(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(connectorContext, times(1)).getLog(); - verify(log, times(1)).info("Start read oplog collection, db: local"); - verify(tableList, times(1)).remove(StreamWithOpLogCollection.OP_LOG_COLLECTION); - verify(tableList, times(1)).isEmpty(); - verify(opLogStreamReader).onStart(mongoConfig); - verify(mongodbConnector, times(1)).doStreamRead( - any(MongodbStreamReader.class), - any(TapConnectorContext.class), - anyList(), - any(), - anyInt(), - any(StreamReadConsumer.class)); - } - } - } - - @Nested - class CloseOpLogThreadSourceTest { - - @BeforeEach - void init() { - when(sourceRunnerFuture.cancel(true)).thenReturn(true); - - doNothing().when(sourceRunner).shutdown(); - doCallRealMethod().when(mongodbConnector).closeOpLogThreadSource(); - 
} - @Test - void testNormal() { - Assertions.assertDoesNotThrow(mongodbConnector::closeOpLogThreadSource); - verify(sourceRunnerFuture).cancel(true); - verify(sourceRunner).shutdown(); - Assertions.assertNull(mongodbConnector.sourceRunner); - Assertions.assertNull(mongodbConnector.sourceRunnerFuture); - } - @Test - void testThrowException() { - when(sourceRunnerFuture.cancel(true)).thenAnswer(a -> { - throw new Exception("failed"); - }); - doAnswer(a -> { - throw new Exception("failed"); - }).when(sourceRunner).shutdown(); - assertThrows(Exception.class,mongodbConnector::closeOpLogThreadSource); - verify(sourceRunnerFuture).cancel(true); - verify(sourceRunner).shutdown(); - Assertions.assertNull(mongodbConnector.sourceRunner); - Assertions.assertNull(mongodbConnector.sourceRunnerFuture); - } - } - - @Nested - class getMongoCollection{ - private MongoDatabase mongoDatabase; - private String table; - @BeforeEach - void beforeEach(){ - table = "table"; - mongoDatabase = mock(MongoDatabase.class); - ReflectionTestUtils.setField(mongodbConnector,"mongoDatabase",mongoDatabase); - } - @Test - void testGetMongoCollectionWithEx(){ - when(mongoDatabase.getCollection(table)).thenThrow(RuntimeException.class); - when(mongoConfig.getUri()).thenReturn("mongodb://127.0.0.1:27017/test"); - doCallRealMethod().when(mongodbConnector).getMongoCollection(table); - assertThrows(RuntimeException.class,()->mongodbConnector.getMongoCollection(table)); - } - } - @Nested - class executeCommand{ - private TapExecuteCommand tapExecuteCommand; - private Consumer executeResultConsumer; - private MongodbExecuteCommandFunction mongodbExecuteCommandFunction; - @BeforeEach - void beforeEach(){ - tapExecuteCommand = mock(TapExecuteCommand.class); - executeResultConsumer = mock(Consumer.class); - mongodbExecuteCommandFunction = mock(MongodbExecuteCommandFunction.class); - } - @Test - void testExecuteCommandWithEx(){ - Map executeObj = new HashMap<>(); - executeObj.put("database","test"); - 
when(tapExecuteCommand.getParams()).thenReturn(executeObj); - when(tapExecuteCommand.getCommand()).thenReturn("executeQuery"); - doThrow(new RuntimeException()).when(mongodbExecuteCommandFunction).executeQuery(anyMap(),any(MongoClient.class),any(Consumer.class),any(Supplier.class)); - doCallRealMethod().when(mongodbConnector).executeCommand(connectorContext,tapExecuteCommand,executeResultConsumer); - assertThrows(RuntimeException.class,()->mongodbConnector.executeCommand(connectorContext,tapExecuteCommand,executeResultConsumer)); - } - } - @Nested - class queryFieldMinMaxValue{ - private TapTable table; - private TapAdvanceFilter partitionFilter; - private String fieldName; - @BeforeEach - void beforeEach(){ - table = mock(TapTable.class); - partitionFilter = mock(TapAdvanceFilter.class); - fieldName = "field"; - when(table.getId()).thenReturn("table"); - } - @Test - void testQueryFieldMinMaxValueWithEx(){ - MongoCollection collection = mock(MongoCollection.class); - when(mongodbConnector.getMongoCollection("table")).thenReturn(collection); - TapIndexEx partitionIndex = mock(TapIndexEx.class); - when(table.partitionIndex()).thenReturn(partitionIndex); - List indexFields = new ArrayList<>(); - TapIndexField indexField = mock(TapIndexField.class); - when(indexField.getName()).thenReturn(fieldName); - indexFields.add(indexField); - when(partitionIndex.getIndexFields()).thenReturn(indexFields); - when(collection.find(any(Bson.class))).thenThrow(RuntimeException.class); - doCallRealMethod().when(mongodbConnector).queryFieldMinMaxValue(connectorContext,table,partitionFilter,fieldName); - assertThrows(RuntimeException.class,()->mongodbConnector.queryFieldMinMaxValue(connectorContext,table,partitionFilter,fieldName)); - } - } - @Nested - class batchCount{ - private TapTable table; - @Test - void testBatchCountWithEx() throws Throwable { - try (MockedStatic mb = Mockito - .mockStatic(MongodbConnector.class)) { - table = mock(TapTable.class); - 
when(table.getId()).thenReturn("collectionName"); - when(mongoConfig.getDatabase()).thenReturn("database"); - mb.when(()->MongodbConnector.getCollectionNotAggregateCountByTableName(mongoClient,"database","collectionName",null)).thenThrow(RuntimeException.class); - doCallRealMethod().when(mongodbConnector).batchCount(connectorContext,table); - assertThrows(RuntimeException.class,()->mongodbConnector.batchCount(connectorContext,table)); - } - } - } - @Nested - class createIndex{ - private TapTable table; - private List indexList; - private Log log; - private MongoDatabase mongoDatabase; - @BeforeEach - void beforeEach(){ - table = mock(TapTable.class); - when(table.getName()).thenReturn("table"); - indexList = new ArrayList<>(); - TapIndex tapIndex = new TapIndex(); - tapIndex.setName("__t__test"); - indexList.add(tapIndex); - log = mock(Log.class); - mongoDatabase = mock(MongoDatabase.class); - ReflectionTestUtils.setField(mongodbConnector,"mongoDatabase",mongoDatabase); - } - - @Test - void testCreateIndexWithExWhenCreateIndex(){ - try (MockedStatic mb = Mockito - .mockStatic(Document.class)) { - Document document = mock(Document.class); - mb.when(()->Document.parse("test")).thenReturn(document); - MongoCollection targetCollection = mock(MongoCollection.class); - when(mongoDatabase.getCollection("table")).thenReturn(targetCollection); - when(targetCollection.createIndex(any(),any(IndexOptions.class))).thenThrow(RuntimeException.class); - doCallRealMethod().when(mongodbConnector).createIndex(table,indexList,log); - mongodbConnector.createIndex(table,indexList,log); - verify(log).warn(anyString()); - } - } - } - @Nested - class createIndexWithTapCreateIndexEvent{ - private TapTable table; - private TapCreateIndexEvent tapCreateIndexEvent; - private MongoDatabase mongoDatabase; - @BeforeEach - void beforeEach(){ - table = mock(TapTable.class); - when(table.getName()).thenReturn("test"); - tapCreateIndexEvent = mock(TapCreateIndexEvent.class); - mongoDatabase = 
mock(MongoDatabase.class); - ReflectionTestUtils.setField(mongodbConnector,"mongoDatabase",mongoDatabase); - } - @Test - void testCreateIndexEventWithEx(){ - List indexList = new ArrayList<>(); - TapIndex tapIndex = new TapIndex(); - tapIndex.setName("index"); - List indexFields = new ArrayList<>(); - indexFields.add(mock(TapIndexField.class)); - tapIndex.setIndexFields(indexFields); - indexList.add(tapIndex); - when(tapCreateIndexEvent.getIndexList()).thenReturn(indexList); - when(mongoDatabase.getCollection("test")).thenThrow(RuntimeException.class); - when(mongoConfig.getUri()).thenReturn("mongodb://127.0.0.1:27017/test"); - doCallRealMethod().when(mongodbConnector).createIndex(connectorContext,table,tapCreateIndexEvent); - assertThrows(RuntimeException.class,()->mongodbConnector.createIndex(connectorContext,table,tapCreateIndexEvent)); - } - } - @Nested - class createStreamReader{ - - @Test - void testCreateStreamReaderWithEx(){ - when(mongoConfig.getDatabase()).thenReturn("database"); - when(mongoClient.getDatabase("database")).thenThrow(RuntimeException.class); - doCallRealMethod().when(mongodbConnector).createStreamReader(); - assertThrows(RuntimeException.class,()->mongodbConnector.createStreamReader()); - } - } - @Nested - class getTableNames{ - private int batchSize; - private Consumer> listConsumer; - @Test - void testGetTableNamesWithEx() throws Throwable { - when(mongoConfig.getDatabase()).thenReturn("database"); - when(mongoClient.getDatabase("database")).thenThrow(RuntimeException.class); - doCallRealMethod().when(mongodbConnector).getTableNames(connectorContext,batchSize,listConsumer); - assertThrows(RuntimeException.class,()->mongodbConnector.getTableNames(connectorContext,batchSize,listConsumer)); - } - } - @Nested - class onStop{ - @Test - void testOnStopWithEx() throws Throwable { - doThrow(RuntimeException.class).when(mongoClient).close(); - doCallRealMethod().when(mongodbConnector).onStop(connectorContext); - 
assertThrows(RuntimeException.class,()->mongodbConnector.onStop(connectorContext)); - } - } - @Nested - class GetTableInfoTest{ - private TapConnectionContext tapConnectorContext; - private String tableName; - @BeforeEach - void beforeEach(){ - tapConnectorContext = mock(TapConnectorContext.class); - tableName = "table"; - } - @Test - void testGetTableInfoWithEx() throws Throwable { - when(mongoConfig.getDatabase()).thenReturn("test"); - when(mongoClient.getDatabase("test")).thenThrow(RuntimeException.class); - doCallRealMethod().when(mongodbConnector).getTableInfo(tapConnectorContext,tableName); - assertThrows(RuntimeException.class, ()->mongodbConnector.getTableInfo(tapConnectorContext,tableName)); - } - - @Test - void testGetTableInfoSizeOverInteger() throws Throwable { - - when(mongoConfig.getDatabase()).thenReturn("test"); - MongoDatabase mongoDatabase = mock(MongoDatabase.class); - when(mongoClient.getDatabase("test")).thenReturn(mongoDatabase); - Document collStats = new Document(); - Integer count = 26888601; - Double size = 366907867698976.0; - collStats.put("count",count); - collStats.put("size",size); - when(mongoDatabase.runCommand(new Document("collStats", tableName))).thenReturn(collStats); - doCallRealMethod().when(mongodbConnector).getTableInfo(tapConnectorContext,tableName); - TableInfo actualData = mongodbConnector.getTableInfo(tapConnectorContext, tableName); - Assertions.assertTrue(actualData.getNumOfRows().longValue() == count); - Assertions.assertTrue(actualData.getStorageSize().longValue() == size); - - } - } - - @Nested - @DisplayName("Method queryIndexes test") - class QueryIndexesTest { - - private TapConnectorContext tapConnectorContext; - private TapTable tapTable; - - @BeforeEach - void setUp() { - tapConnectorContext = mock(TapConnectorContext.class); - tapTable = new TapTable("test"); - doCallRealMethod().when(mongodbConnector).queryIndexes(any(), any(), any()); - } - - @Test - @DisplayName("test main process") - void test1() { - List 
listIndexes = new ArrayList<>(); - listIndexes.add(new Document("name", "_id_").append("key", new Document("_id", 1))); - listIndexes.add(new Document("name", "uid_1").append("key", new Document("uid", 1))); - listIndexes.add(new Document("name", "sub.sid1_1_sub.sid2_-1").append("key", new Document("sub.sid1", 1).append("sub.sid2", -1))); - Iterator iterator = listIndexes.iterator(); - MongoCursor mongoCursor = mock(MongoCursor.class); - when(mongoCursor.next()).thenAnswer(invocationOnMock -> iterator.next()); - when(mongoCursor.hasNext()).thenAnswer(invocationOnMock -> iterator.hasNext()); - ListIndexesIterable listIndexesIterable = mock(ListIndexesIterable.class); - when(listIndexesIterable.iterator()).thenReturn(mongoCursor); - doCallRealMethod().when(listIndexesIterable).forEach(any(Consumer.class)); - MongoCollection mongoCollection = mock(MongoCollection.class); - when(mongoCollection.listIndexes()).thenReturn(listIndexesIterable); - MongoDatabase mongoDatabase = mock(MongoDatabase.class); - when(mongoDatabase.getCollection(tapTable.getId())).thenReturn(mongoCollection); - ReflectionTestUtils.setField(mongodbConnector, "mongoDatabase", mongoDatabase); - mongodbConnector.queryIndexes(tapConnectorContext, tapTable, indexes -> { - assertEquals(3, indexes.size()); - TapIndex tapIndex = indexes.get(0); - assertEquals(listIndexes.get(0).getString("name"), tapIndex.getName()); - List indexFields = tapIndex.getIndexFields(); - assertEquals(1, indexFields.size()); - assertEquals("_id", indexFields.get(0).getName()); - assertTrue(indexFields.get(0).getFieldAsc()); - tapIndex = indexes.get(2); - assertEquals(listIndexes.get(2).getString("name"), tapIndex.getName()); - indexFields = tapIndex.getIndexFields(); - assertEquals("sub.sid2", indexFields.get(1).getName()); - assertFalse(indexFields.get(1).getFieldAsc()); - }); - } - - @Test - @DisplayName("test other index key value: text") - void test2() { - List listIndexes = new ArrayList<>(); - listIndexes.add(new 
Document("name", "content_text").append("key", new Document("content", "text"))); - Iterator iterator = listIndexes.iterator(); - MongoCursor mongoCursor = mock(MongoCursor.class); - when(mongoCursor.next()).thenAnswer(invocationOnMock -> iterator.next()); - when(mongoCursor.hasNext()).thenAnswer(invocationOnMock -> iterator.hasNext()); - ListIndexesIterable listIndexesIterable = mock(ListIndexesIterable.class); - when(listIndexesIterable.iterator()).thenReturn(mongoCursor); - doCallRealMethod().when(listIndexesIterable).forEach(any(Consumer.class)); - MongoCollection mongoCollection = mock(MongoCollection.class); - when(mongoCollection.listIndexes()).thenReturn(listIndexesIterable); - MongoDatabase mongoDatabase = mock(MongoDatabase.class); - when(mongoDatabase.getCollection(tapTable.getId())).thenReturn(mongoCollection); - ReflectionTestUtils.setField(mongodbConnector, "mongoDatabase", mongoDatabase); - mongodbConnector.queryIndexes(tapConnectorContext, tapTable, indexes -> { - assertEquals(1, indexes.size()); - TapIndex tapIndex = indexes.get(0); - assertEquals(listIndexes.get(0).getString("name"), tapIndex.getName()); - List indexFields = tapIndex.getIndexFields(); - assertEquals(1, indexFields.size()); - assertEquals("content", indexFields.get(0).getName()); - assertTrue(indexFields.get(0).getFieldAsc()); - }); - } - - @Test - @DisplayName("test unique index") - void test3() { - List listIndexes = new ArrayList<>(); - listIndexes.add(new Document("name", "uid_1").append("key", new Document("uid", 1)).append("unique", true)); - listIndexes.add(new Document("name", "uid1_1").append("key", new Document("uid1", 1))); - Iterator iterator = listIndexes.iterator(); - MongoCursor mongoCursor = mock(MongoCursor.class); - when(mongoCursor.next()).thenAnswer(invocationOnMock -> iterator.next()); - when(mongoCursor.hasNext()).thenAnswer(invocationOnMock -> iterator.hasNext()); - ListIndexesIterable listIndexesIterable = mock(ListIndexesIterable.class); - 
when(listIndexesIterable.iterator()).thenReturn(mongoCursor); - doCallRealMethod().when(listIndexesIterable).forEach(any(Consumer.class)); - MongoCollection mongoCollection = mock(MongoCollection.class); - when(mongoCollection.listIndexes()).thenReturn(listIndexesIterable); - MongoDatabase mongoDatabase = mock(MongoDatabase.class); - when(mongoDatabase.getCollection(tapTable.getId())).thenReturn(mongoCollection); - ReflectionTestUtils.setField(mongodbConnector, "mongoDatabase", mongoDatabase); - mongodbConnector.queryIndexes(tapConnectorContext, tapTable, indexes -> { - assertEquals(2, indexes.size()); - assertTrue(indexes.get(0).getUnique()); - assertFalse(indexes.get(1).getUnique()); - }); - } - } -} \ No newline at end of file diff --git a/connectors/mongodb-lower-connector/pom.xml b/connectors/mongodb-lower-connector/pom.xml index 4f52b7ff9..97f7fb1fb 100644 --- a/connectors/mongodb-lower-connector/pom.xml +++ b/connectors/mongodb-lower-connector/pom.xml @@ -13,7 +13,7 @@ 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/opengauss-connector/pom.xml b/connectors/opengauss-connector/pom.xml index f709b1313..8357d757b 100644 --- a/connectors/opengauss-connector/pom.xml +++ b/connectors/opengauss-connector/pom.xml @@ -15,7 +15,7 @@ UTF-8 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/polar-db-postgres-connector/pom.xml b/connectors/polar-db-postgres-connector/pom.xml index 803b5d737..d40830ef9 100644 --- a/connectors/polar-db-postgres-connector/pom.xml +++ b/connectors/polar-db-postgres-connector/pom.xml @@ -23,7 +23,7 @@ 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/postgres-connector/pom.xml b/connectors/postgres-connector/pom.xml index 742ca1dc4..c37f0041a 100644 --- a/connectors/postgres-connector/pom.xml +++ b/connectors/postgres-connector/pom.xml @@ -16,7 +16,7 @@ 8 42.7.5 - 2.0.4-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java 
b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java index 9086fcaf4..a32657efe 100644 --- a/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java +++ b/connectors/postgres-connector/src/main/java/io/tapdata/connector/postgres/PostgresConnector.java @@ -40,6 +40,8 @@ import io.tapdata.kit.*; import io.tapdata.pdk.apis.annotations.TapConnectorClass; import io.tapdata.pdk.apis.consumer.StreamReadConsumer; +import io.tapdata.pdk.apis.consumer.StreamReadOneByOneConsumer; +import io.tapdata.pdk.apis.consumer.TapStreamReadConsumer; import io.tapdata.pdk.apis.context.TapConnectionContext; import io.tapdata.pdk.apis.context.TapConnectorContext; import io.tapdata.pdk.apis.entity.*; @@ -148,6 +150,7 @@ public void registerCapabilities(ConnectorFunctions connectorFunctions, TapCodec connectorFunctions.supportBatchCount(this::batchCount); connectorFunctions.supportBatchRead(this::batchReadWithoutOffset); connectorFunctions.supportStreamRead(this::streamRead); + connectorFunctions.supportOneByOneStreamRead(this::streamReadOneByOne); connectorFunctions.supportTimestampToStreamOffset(this::timestampToStreamOffset); // query connectorFunctions.supportQueryByFilter(this::queryByFilter); @@ -604,27 +607,31 @@ protected void afterInitialSync(TapConnectorContext connectorContext, TapTable t }); } - private void streamRead(TapConnectorContext nodeContext, List tableList, Object offsetState, int recordSize, StreamReadConsumer consumer) throws Throwable { + private void streamReadOneByOne(TapConnectorContext nodeContext, List tableList, Object offsetState, StreamReadOneByOneConsumer consumer) throws Throwable { + streamRead(nodeContext, tableList, offsetState, consumer.getBatchSize(), consumer); + } + + private void streamRead(TapConnectorContext nodeContext, List tableList, Object offsetState, int recordSize, TapStreamReadConsumer consumer) throws Throwable { if 
("walminer".equals(postgresConfig.getLogPluginName())) { if (EmptyKit.isNotEmpty(postgresConfig.getPgtoHost())) { new WalPgtoMiner(postgresJdbcContext, firstConnectorId, tapLogger) .watch(tableList, nodeContext.getTableMap()) .offset(offsetState) - .registerConsumer(consumer, recordSize) + .registerCdcConsumer(consumer, recordSize) .startMiner(this::isAlive); } else { new WalLogMinerV2(postgresJdbcContext, tapLogger) .watch(tableList, nodeContext.getTableMap()) .withWalLogDirectory(getWalDirectory()) .offset(offsetState) - .registerConsumer(consumer, recordSize) + .registerCdcConsumer(consumer, recordSize) .startMiner(this::isAlive); } } else { cdcRunner = new PostgresCdcRunner(postgresJdbcContext, nodeContext); testReplicateIdentity(nodeContext.getTableMap()); buildSlot(nodeContext, true); - cdcRunner.useSlot(slotName.toString()).watch(tableList).offset(offsetState).registerConsumer(consumer, recordSize); + cdcRunner.useSlot(slotName.toString()).watch(tableList).offset(offsetState).registerCdcConsumer(consumer, recordSize); cdcRunner.startCdcRunner(); if (EmptyKit.isNotNull(cdcRunner) && EmptyKit.isNotNull(cdcRunner.getThrowable().get())) { Throwable throwable = ErrorKit.getLastCause(cdcRunner.getThrowable().get()); @@ -684,21 +691,21 @@ private void streamReadMultiConnection(TapConnectorContext nodeContext, List 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT diff --git a/connectors/vastbase-connector/pom.xml b/connectors/vastbase-connector/pom.xml index e0dd296ef..76c6661d0 100644 --- a/connectors/vastbase-connector/pom.xml +++ b/connectors/vastbase-connector/pom.xml @@ -15,7 +15,7 @@ 8 - 2.0.0-SNAPSHOT + 2.0.5-SNAPSHOT 1.5.4.Final