diff --git a/consilens-cli/README.md b/consilens-cli/README.md index 3839318..69b305c 100644 --- a/consilens-cli/README.md +++ b/consilens-cli/README.md @@ -111,18 +111,20 @@ consilens [options] ```yaml source: type: mysql - url: jdbc:mysql://localhost:3306/source_db - username: user1 - password: password1 + connection: + url: jdbc:mysql://localhost:3306/source_db + username: user1 + password: password1 resource: type: table name: orders target: type: postgresql - url: jdbc:postgresql://localhost:5432/target_db?currentSchema=public - username: user2 - password: password2 + connection: + url: jdbc:postgresql://localhost:5432/target_db?currentSchema=public + username: user2 + password: password2 resource: type: table name: orders @@ -154,18 +156,20 @@ result: ```yaml source: type: mysql - url: jdbc:mysql://localhost:3306/database1?useSSL=false&serverTimezone=UTC - username: user1 - password: password1 + connection: + url: jdbc:mysql://localhost:3306/database1?useSSL=false&serverTimezone=UTC + username: user1 + password: password1 resource: type: table name: users target: type: postgresql - url: jdbc:postgresql://localhost:5432/database2?currentSchema=public&ssl=false&ApplicationName=consilens - username: user2 - password: password2 + connection: + url: jdbc:postgresql://localhost:5432/database2?currentSchema=public&ssl=false&ApplicationName=consilens + username: user2 + password: password2 resource: type: sql path: | @@ -194,6 +198,7 @@ comparison: - updated_at filters: source: "status = 'active'" + target: "status = 'active'" strategy: mode: checksum diff --git a/consilens-cli/src/main/java/com/consilens/cli/command/ConfigValidateCommand.java b/consilens-cli/src/main/java/com/consilens/cli/command/ConfigValidateCommand.java index 6c7ed95..ba64c11 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/command/ConfigValidateCommand.java +++ b/consilens-cli/src/main/java/com/consilens/cli/command/ConfigValidateCommand.java @@ -98,9 +98,9 @@ private void printVerboseDetails(ConfigurationManager configurationManager, Stri System.out.println(" Source fields: " + nvl(config.getComparison().getFields().getSource())); System.out.println(" Target fields: " + nvl(config.getComparison().getFields().getTarget())); } - if (config.getComparison().getWhere() != null) { - System.out.println(" Source where : " + nvl(config.getComparison().getWhere().getSource())); - System.out.println(" Target where : " + nvl(config.getComparison().getWhere().getTarget())); + if (config.getComparison().getFilters() != null) { + System.out.println(" Source filters: " + nvl(config.getComparison().getFilters().getSource())); + System.out.println(" Target filters: " + nvl(config.getComparison().getFilters().getTarget())); } } @@ -208,9 +208,9 @@ private String describeResource(com.consilens.cli.model.ConnectionConfig connect return "(not set)"; } com.consilens.cli.model.ConnectionConfig.ResourceConfig resource = connectionConfig.getResource(); - String location = resource.getName() != null && !resource.getName().isBlank() - ? resource.getName() - : resource.getPath(); + String location = "sql".equalsIgnoreCase(resource.getType()) + ? resource.getPath() + : resource.getName(); return nvl(resource.getType()) + ":" + nvl(location); } } diff --git a/consilens-cli/src/main/java/com/consilens/cli/command/DiffCommand.java b/consilens-cli/src/main/java/com/consilens/cli/command/DiffCommand.java index f340c65..1acc5cb 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/command/DiffCommand.java +++ b/consilens-cli/src/main/java/com/consilens/cli/command/DiffCommand.java @@ -4,7 +4,6 @@ import com.consilens.cli.model.CliConfiguration; import com.consilens.cli.model.CliDiffResult; import com.consilens.cli.service.DiffService; -import com.consilens.cli.service.RealtimeCompareRunner; import lombok.extern.slf4j.Slf4j; import picocli.CommandLine.Command; @@ -66,9 +65,6 @@ public void run() { if (dryRun) { log.info("Performing dry run (validation only)..."); result = diffService.performDryRun(config); - } else if (config.getRealtime() != null && Boolean.TRUE.equals(config.getRealtime().getEnabled())) { - log.info("Starting realtime long-running compare loop..."); - result = new RealtimeCompareRunner().runLoop(config); } else { log.info("Starting diff operation..."); log.info("This may take a while depending on table sizes and strategy chosen."); @@ -124,9 +120,9 @@ private String resourceDisplay(com.consilens.cli.model.ConnectionConfig connecti return "(not set)"; } com.consilens.cli.model.ConnectionConfig.ResourceConfig resource = connectionConfig.getResource(); - String location = resource.getName() != null && !resource.getName().isBlank() - ? resource.getName() - : resource.getPath(); + String location = "sql".equalsIgnoreCase(resource.getType()) + ? resource.getPath() + : resource.getName(); return resource.getType() + ":" + location; } } diff --git a/consilens-cli/src/main/java/com/consilens/cli/model/CheckpointStoreConfig.java b/consilens-cli/src/main/java/com/consilens/cli/model/CheckpointStoreConfig.java deleted file mode 100644 index 5552859..0000000 --- a/consilens-cli/src/main/java/com/consilens/cli/model/CheckpointStoreConfig.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.consilens.cli.model; - -import com.fasterxml.jackson.annotation.JsonAnyGetter; -import com.fasterxml.jackson.annotation.JsonAnySetter; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.LinkedHashMap; -import java.util.Map; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -@JsonInclude(JsonInclude.Include.NON_DEFAULT) -public class CheckpointStoreConfig { - - @JsonProperty("type") - private String type; - - @JsonProperty("name") - private String name; - - @JsonIgnore - @Builder.Default - private Map options = new LinkedHashMap<>(); - - @JsonAnySetter - public void putOption(String key, Object value) { - options.put(key, value); - } - - @JsonAnyGetter - public Map getOptions() { - return options; - } -} diff --git a/consilens-cli/src/main/java/com/consilens/cli/model/CliConfiguration.java b/consilens-cli/src/main/java/com/consilens/cli/model/CliConfiguration.java index a7b45c5..168190e 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/model/CliConfiguration.java +++ b/consilens-cli/src/main/java/com/consilens/cli/model/CliConfiguration.java @@ -58,9 +58,6 @@ public class CliConfiguration { @JsonProperty("normalization") private NormalizationConfig normalization; - @JsonProperty("realtime") - private RealtimeConfig realtime; - @JsonProperty("result") private ResultConfig result; @@ -271,10 +268,6 @@ public void validate() throws ValidationException { if (normalization != null) { normalization.validate(); } - if (realtime != null) { - realtime.validate("realtime"); - } - validateResultSinks(); } diff --git a/consilens-cli/src/main/java/com/consilens/cli/model/CompareMappingConfig.java b/consilens-cli/src/main/java/com/consilens/cli/model/CompareMappingConfig.java index b9cf21c..8fccae9 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/model/CompareMappingConfig.java +++ b/consilens-cli/src/main/java/com/consilens/cli/model/CompareMappingConfig.java @@ -17,9 +17,6 @@ public class CompareMappingConfig { @JsonProperty("name") private String name; - @JsonProperty("type") - private String type; - @JsonProperty("source") private FieldExpressionConfig source; @@ -31,7 +28,4 @@ public class CompareMappingConfig { @JsonProperty("compare") private Boolean compare; - - @JsonProperty("ordinal") - private Integer ordinal; } diff --git a/consilens-cli/src/main/java/com/consilens/cli/model/ComparisonConfig.java b/consilens-cli/src/main/java/com/consilens/cli/model/ComparisonConfig.java index 3f9de7c..fd6627d 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/model/ComparisonConfig.java +++ b/consilens-cli/src/main/java/com/consilens/cli/model/ComparisonConfig.java @@ -2,7 +2,6 @@ import com.consilens.core.validation.ValidationException; import com.consilens.core.validation.ValidationFramework; -import com.fasterxml.jackson.annotation.JsonAlias; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; @@ -42,12 +41,8 @@ public class ComparisonConfig { private List extraColumns; @JsonProperty("filters") - @JsonAlias("where") private StringPairConfig filters; - @JsonProperty("updateColumn") - private String updateColumn; - public void validate() throws ValidationException { ValidationFramework.forContext("comparison") .validate(keys, "comparison.keys", Objects::nonNull, "comparison.keys 配置不能为空") @@ -87,14 +82,6 @@ public void validate() throws ValidationException { } } - public StringPairConfig getWhere() { - return filters; - } - - public void setWhere(StringPairConfig where) { - this.filters = where; - } - private void validateMappings() { if (mappings == null || mappings.isEmpty()) { return; diff --git a/consilens-cli/src/main/java/com/consilens/cli/model/ConnectionConfig.java b/consilens-cli/src/main/java/com/consilens/cli/model/ConnectionConfig.java index dd995de..967574c 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/model/ConnectionConfig.java +++ b/consilens-cli/src/main/java/com/consilens/cli/model/ConnectionConfig.java @@ -33,15 +33,6 @@ public class ConnectionConfig { @JsonProperty("name") private String name; - @JsonProperty("url") - private String url; - - @JsonProperty("username") - private String username; - - @JsonProperty("password") - private String password; - @JsonProperty("connection") private ConnectorConnectionProperties connection; @@ -57,30 +48,21 @@ public Map toConnectionMap() { if (connection != null) { result.putAll(connection.asMap()); } - putIfPresent(result, "url", url); - putIfPresent(result, "username", username); - putIfPresent(result, "password", password); return result; } + @JsonIgnore public String getUrl() { - if (url != null && !url.trim().isEmpty()) { - return url; - } return connection != null ? connection.getUrl() : null; } + @JsonIgnore public String getUsername() { - if (username != null && !username.trim().isEmpty()) { - return username; - } return connection != null ? connection.getUsername() : null; } + @JsonIgnore public String getPassword() { - if (password != null && !password.trim().isEmpty()) { - return password; - } return connection != null ? connection.getPassword() : null; } @@ -94,9 +76,9 @@ public void validate(String fieldName) throws ValidationException { if (requiresJdbcValidation()) { ValidationFramework.forContext(fieldName) - .notEmpty(getUrl(), fieldName + ".url") - .validJdbcUrl(getUrl(), fieldName + ".url") - .notEmpty(getUsername(), fieldName + ".username") + .notEmpty(getUrl(), fieldName + ".connection.url") + .validJdbcUrl(getUrl(), fieldName + ".connection.url") + .notEmpty(getUsername(), fieldName + ".connection.username") .throwIfInvalid(); return; } @@ -131,12 +113,6 @@ private boolean requiresJdbcValidation() { } } - private void putIfPresent(Map result, String key, String value) { - if (value != null && !value.trim().isEmpty()) { - result.put(key, value); - } - } - @Data @Builder @NoArgsConstructor @@ -222,9 +198,13 @@ public void validate(String fieldName) throws ValidationException { String normalizedType = type.trim().toLowerCase(Locale.ROOT); switch (normalizedType) { case "table": - if (isBlank(name) && isBlank(path)) { + if (isBlank(name)) { + throw ValidationException.simple("CONFIGURATION_VALIDATION", + fieldName + " type=table 时必须配置 name"); + } + if (!isBlank(path)) { throw ValidationException.simple("CONFIGURATION_VALIDATION", - fieldName + " type=table 时必须配置 name 或 path"); + fieldName + " type=table 时不应配置 path"); } break; case "sql": diff --git a/consilens-cli/src/main/java/com/consilens/cli/model/RealtimeConfig.java b/consilens-cli/src/main/java/com/consilens/cli/model/RealtimeConfig.java deleted file mode 100644 index 6ea8536..0000000 --- a/consilens-cli/src/main/java/com/consilens/cli/model/RealtimeConfig.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.consilens.cli.model; - -import com.consilens.core.validation.ValidationException; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.Duration; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -@JsonInclude(JsonInclude.Include.NON_DEFAULT) -public class RealtimeConfig { - - @JsonProperty("enabled") - private Boolean enabled; - - @JsonProperty("updateColumns") - private StringPairConfig updateColumns; - - @JsonProperty("watermarkDelay") - private String watermarkDelay; - - @JsonProperty("windowSize") - private String windowSize; - - @JsonProperty("overlap") - private String overlap; - - @JsonProperty("checkpointStore") - private CheckpointStoreConfig checkpointStore; - - @JsonProperty("interval") - private String interval; - - public void validate(String fieldName) { - if (!Boolean.TRUE.equals(enabled)) { - return; - } - if (updateColumns == null) { - throw ValidationException.simple("CONFIGURATION_VALIDATION", fieldName + ".updateColumns 配置不能为空"); - } - updateColumns.validate(fieldName + ".updateColumns"); - parseDuration(fieldName + ".watermarkDelay", watermarkDelay); - parseDuration(fieldName + ".windowSize", windowSize); - parseDuration(fieldName + ".overlap", overlap); - if (interval != null && !interval.isBlank()) { - parseDuration(fieldName + ".interval", interval); - } - if (checkpointStore == null || checkpointStore.getType() == null || checkpointStore.getType().isBlank()) { - throw ValidationException.simple("CONFIGURATION_VALIDATION", - fieldName + ".checkpointStore.type 不能为空"); - } - if ("table".equalsIgnoreCase(checkpointStore.getType()) - && (checkpointStore.getName() == null || checkpointStore.getName().isBlank())) { - throw ValidationException.simple("CONFIGURATION_VALIDATION", - fieldName + ".checkpointStore.name 不能为空"); - } - } - - private void parseDuration(String fieldName, String value) { - if (value == null || value.isBlank()) { - throw ValidationException.simple("CONFIGURATION_VALIDATION", fieldName + " 不能为空"); - } - try { - Duration.parse(value.trim()); - } catch (Exception e) { - throw ValidationException.simple("CONFIGURATION_VALIDATION", fieldName + " 不是合法的 ISO-8601 duration"); - } - } -} diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/CheckpointStore.java b/consilens-cli/src/main/java/com/consilens/cli/service/CheckpointStore.java deleted file mode 100644 index 1331aa7..0000000 --- a/consilens-cli/src/main/java/com/consilens/cli/service/CheckpointStore.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.consilens.cli.service; - -import java.time.Instant; -import java.util.Optional; - -public interface CheckpointStore extends AutoCloseable { - - Optional load(String taskId) throws Exception; - - default boolean tryMarkRunning(String taskId, Instant start, Instant end, String owner, Instant leaseUntil) throws Exception { - markRunning(taskId, start, end); - return true; - } - - void markRunning(String taskId, Instant start, Instant end) throws Exception; - - void markSucceeded(String taskId, Instant watermark, Instant start, Instant end) throws Exception; - - void markFailed(String taskId, Instant start, Instant end, Throwable error) throws Exception; - - @Override - default void close() throws Exception { - } -} diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/CompareCheckpoint.java b/consilens-cli/src/main/java/com/consilens/cli/service/CompareCheckpoint.java deleted file mode 100644 index 2acf8a1..0000000 --- a/consilens-cli/src/main/java/com/consilens/cli/service/CompareCheckpoint.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.consilens.cli.service; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.Instant; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class CompareCheckpoint { - - private String taskId; - - private Instant watermark; - - private Instant lastStart; - - private Instant lastEnd; - - private String status; - - private String owner; - - private Instant leaseUntil; -} diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/CompareRequestFactory.java b/consilens-cli/src/main/java/com/consilens/cli/service/CompareRequestFactory.java index 40c7497..f8bf096 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/service/CompareRequestFactory.java +++ b/consilens-cli/src/main/java/com/consilens/cli/service/CompareRequestFactory.java @@ -1,12 +1,10 @@ package com.consilens.cli.service; -import com.consilens.cli.model.CheckpointStoreConfig; import com.consilens.cli.model.CliConfiguration; import com.consilens.cli.model.CompareMappingConfig; import com.consilens.cli.model.ComparisonConfig; import com.consilens.cli.model.ConnectionConfig; import com.consilens.cli.model.FieldExpressionConfig; -import com.consilens.cli.model.RealtimeConfig; import com.consilens.cli.model.normalization.NormalizationConfig; import com.consilens.cli.model.normalization.TypeNormalizationRule; import com.consilens.connector.api.config.ConnectorConfig; @@ -21,16 +19,13 @@ import com.consilens.connector.api.planner.ComparePlanTypes; import com.consilens.connector.api.planner.CompareRequest; import com.consilens.connector.api.planner.CompareStrategyPreference; -import com.consilens.connector.api.planner.RealtimeSpec; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.UUID; public class CompareRequestFactory { @@ -53,7 +48,6 @@ public CompareRequest create(CliConfiguration config) { .targetComparisons(buildResult.targetComparisons) .sourceFilter(buildResult.sourceFilter) .targetFilter(buildResult.targetFilter) - .realtimeSpec(toRealtimeSpec(config)) .normalizationSpec(toNormalizationSpec(config.getNormalization())) .strategyPreference(toStrategyPreference(config)) .executionOptions(toExecutionOptions(config)) @@ -188,7 +182,7 @@ private ConnectorConfig compileMappingsToSqlResource(ConnectorConfig config, String sql = buildMappedSql(resource, comparison, sourceSide); ResourceLocator sqlResource = ResourceLocator.builder() .type("sql") - .name(resource.getName() != null ? resource.getName() : resource.getPath()) + .name(resource.getName()) .path(sql) .options(resource.getOptions()) .build(); @@ -255,9 +249,6 @@ private String toSqlExpression(FieldExpressionConfig expression) { } private String tableRef(ResourceLocator resource) { - if (resource.getPath() != null && !resource.getPath().trim().isEmpty()) { - return resource.getPath().trim(); - } return resource.getName().trim(); } @@ -326,46 +317,6 @@ private PredicateSpec toPredicateSpec(String expression) { .build(); } - private RealtimeSpec toRealtimeSpec(CliConfiguration config) { - RealtimeConfig realtime = config.getRealtime(); - if (realtime == null) { - return null; - } - CheckpointStoreConfig checkpointStore = realtime.getCheckpointStore(); - return RealtimeSpec.builder() - .taskId(buildTaskId(config)) - .enabled(Boolean.TRUE.equals(realtime.getEnabled())) - .watermarkDelay(realtime.getWatermarkDelay()) - .windowSize(realtime.getWindowSize()) - .overlap(realtime.getOverlap()) - .checkpointStoreType(checkpointStore != null ? checkpointStore.getType() : null) - .checkpointStoreName(checkpointStore != null ? checkpointStore.getName() : null) - .build(); - } - - private String buildTaskId(CliConfiguration config) { - String raw = String.join("|", - String.valueOf(config.getSource() != null ? config.getSource().getName() : null), - String.valueOf(config.getTarget() != null ? config.getTarget().getName() : null), - String.valueOf(config.getSource() != null && config.getSource().getResource() != null - ? resourceIdentity(config.getSource().getResource()) - : null), - String.valueOf(config.getTarget() != null && config.getTarget().getResource() != null - ? resourceIdentity(config.getTarget().getResource()) - : null)); - return UUID.nameUUIDFromBytes(raw.getBytes(StandardCharsets.UTF_8)).toString(); - } - - private String resourceIdentity(com.consilens.cli.model.ConnectionConfig.ResourceConfig resource) { - if (resource == null) { - return null; - } - if (resource.getName() != null && !resource.getName().isBlank()) { - return resource.getName(); - } - return resource.getPath(); - } - private CompareStrategyPreference toStrategyPreference(CliConfiguration config) { List preferredPlans; boolean allowFallback; diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/ConnectorProbeService.java b/consilens-cli/src/main/java/com/consilens/cli/service/ConnectorProbeService.java index 6a49d79..a2612e5 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/service/ConnectorProbeService.java +++ b/consilens-cli/src/main/java/com/consilens/cli/service/ConnectorProbeService.java @@ -7,7 +7,6 @@ import com.consilens.connector.api.model.KeySpec; import com.consilens.connector.api.model.PredicateSpec; import com.consilens.connector.api.model.SchemaDescriptor; -import com.consilens.connector.api.model.UpdateWindow; import com.consilens.connector.api.planner.CompareSegment; import com.consilens.connector.api.record.CloseableIterator; import com.consilens.connector.api.spi.ConnectorAdapter; @@ -37,8 +36,7 @@ public long countRows(ConnectorConfig config, KeySpec keySpec, ComparisonSpec comparisons, PredicateSpec filter, - String side, - UpdateWindow updateWindow) throws Exception { + String side) throws Exception { try (ConnectorAdapter adapter = connectorRegistry.create(config); com.consilens.connector.api.dataset.DatasetHandle dataset = adapter.openDataset(config.getResource(), config.getReadOptions())) { SchemaDescriptor schema = dataset.getSchema(); @@ -50,7 +48,6 @@ public long countRows(ConnectorConfig config, .filter(filter) .schema(schema) .side(side) - .updateWindow(updateWindow) .build(); if (dataset.getHashProvider().isPresent()) { diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java b/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java index a2e4297..778c3c5 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java +++ b/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java @@ -294,10 +294,7 @@ private boolean isTableResource(ConnectionConfig connectionConfig) { } private TablePath resolveTablePath(ConnectionConfig.ResourceConfig resource) { - String value = resource.getName() != null && !resource.getName().isBlank() - ? resource.getName() - : resource.getPath(); - return value != null ? TablePath.of(value) : null; + return resource != null && resource.getName() != null ? TablePath.of(resource.getName()) : null; } private String resourceDisplayName(ConnectionConfig connectionConfig) { @@ -305,8 +302,8 @@ private String resourceDisplayName(ConnectionConfig connectionConfig) { return ""; } ConnectionConfig.ResourceConfig resource = connectionConfig.getResource(); - if (resource.getName() != null && !resource.getName().isBlank()) { - return resource.getName(); + if ("table".equalsIgnoreCase(resource.getType())) { + return resource.getName() != null ? resource.getName() : ""; } return resource.getPath() != null ? resource.getPath() : ""; } @@ -468,15 +465,13 @@ public CliDiffResult performDryRun(CliConfiguration config) throws Exception { request.getSourceKeySpec(), request.getSourceComparisons(), request.getSourceFilter(), - "source", - request.getRealtimeSpec() != null ? request.getRealtimeSpec().getSourceWindow() : null); + "source"); long targetRowCount = probeService.countRows( request.getTarget(), request.getTargetKeySpec(), request.getTargetComparisons(), request.getTargetFilter(), - "target", - request.getRealtimeSpec() != null ? request.getRealtimeSpec().getTargetWindow() : null); + "target"); log.info("Dry run completed - Source rows: {}, Target rows: {}", sourceRowCount, targetRowCount); diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/InMemoryCheckpointStore.java b/consilens-cli/src/main/java/com/consilens/cli/service/InMemoryCheckpointStore.java deleted file mode 100644 index 1f0c38a..0000000 --- a/consilens-cli/src/main/java/com/consilens/cli/service/InMemoryCheckpointStore.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.consilens.cli.service; - -import java.time.Instant; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; - -public class InMemoryCheckpointStore implements CheckpointStore { - - private final Map checkpoints = new ConcurrentHashMap<>(); - - @Override - public Optional load(String taskId) { - return Optional.ofNullable(checkpoints.get(taskId)); - } - - @Override - public boolean tryMarkRunning(String taskId, Instant start, Instant end, String owner, Instant leaseUntil) { - CompareCheckpoint existing = checkpoints.get(taskId); - if (existing != null - && "running".equalsIgnoreCase(existing.getStatus()) - && existing.getLeaseUntil() != null - && existing.getLeaseUntil().isAfter(Instant.now())) { - return false; - } - markRunning(taskId, start, end, owner, leaseUntil); - return true; - } - - @Override - public void markRunning(String taskId, Instant start, Instant end) { - markRunning(taskId, start, end, null, null); - } - - private void markRunning(String taskId, Instant start, Instant end, String owner, Instant leaseUntil) { - checkpoints.put(taskId, CompareCheckpoint.builder() - .taskId(taskId) - .watermark(checkpoints.containsKey(taskId) ? checkpoints.get(taskId).getWatermark() : null) - .lastStart(start) - .lastEnd(end) - .status("running") - .owner(owner) - .leaseUntil(leaseUntil) - .build()); - } - - @Override - public void markSucceeded(String taskId, Instant watermark, Instant start, Instant end) { - checkpoints.put(taskId, CompareCheckpoint.builder() - .taskId(taskId) - .watermark(watermark) - .lastStart(start) - .lastEnd(end) - .status("succeeded") - .build()); - } - - @Override - public void markFailed(String taskId, Instant start, Instant end, Throwable error) { - checkpoints.put(taskId, CompareCheckpoint.builder() - .taskId(taskId) - .watermark(checkpoints.containsKey(taskId) ? checkpoints.get(taskId).getWatermark() : null) - .lastStart(start) - .lastEnd(end) - .status("failed") - .build()); - } -} diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/JdbcCheckpointStore.java b/consilens-cli/src/main/java/com/consilens/cli/service/JdbcCheckpointStore.java deleted file mode 100644 index cb16324..0000000 --- a/consilens-cli/src/main/java/com/consilens/cli/service/JdbcCheckpointStore.java +++ /dev/null @@ -1,206 +0,0 @@ -package com.consilens.cli.service; - -import lombok.extern.slf4j.Slf4j; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.Timestamp; -import java.time.Instant; -import java.util.Optional; -import java.util.Properties; - -@Slf4j -public class JdbcCheckpointStore implements CheckpointStore { - - private final String jdbcUrl; - private final Properties properties; - private final String tableName; - - public JdbcCheckpointStore(String jdbcUrl, Properties properties, String tableName, String driverClassName) { - this.jdbcUrl = jdbcUrl; - this.properties = properties != null ? properties : new Properties(); - this.tableName = tableName; - if (driverClassName != null && !driverClassName.isBlank()) { - try { - Class.forName(driverClassName); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("Checkpoint store driver not found: " + driverClassName, e); - } - } - ensureTable(); - } - - @Override - public Optional load(String taskId) throws Exception { - String sql = "SELECT task_id, watermark, last_start, last_end, status, owner, lease_until FROM " + tableName + " WHERE task_id = ?"; - try (Connection connection = openConnection(); - PreparedStatement statement = connection.prepareStatement(sql)) { - statement.setString(1, taskId); - try (ResultSet resultSet = statement.executeQuery()) { - if (!resultSet.next()) { - return Optional.empty(); - } - return Optional.of(CompareCheckpoint.builder() - .taskId(resultSet.getString(1)) - .watermark(toInstant(resultSet.getTimestamp(2))) - .lastStart(toInstant(resultSet.getTimestamp(3))) - .lastEnd(toInstant(resultSet.getTimestamp(4))) - .status(resultSet.getString(5)) - .owner(resultSet.getString(6)) - .leaseUntil(toInstant(resultSet.getTimestamp(7))) - .build()); - } - } - } - - @Override - public boolean tryMarkRunning(String taskId, Instant start, Instant end, String owner, Instant leaseUntil) throws Exception { - Instant now = Instant.now(); - if (tryUpdateRunning(taskId, start, end, owner, leaseUntil, now)) { - return true; - } - try { - insert(taskId, null, start, end, "running", owner, leaseUntil, null); - return true; - } catch (Exception e) { - return tryUpdateRunning(taskId, start, end, owner, leaseUntil, now); - } - } - - @Override - public void markRunning(String taskId, Instant start, Instant end) throws Exception { - upsert(taskId, null, start, end, "running", null, null, null); - } - - @Override - public void markSucceeded(String taskId, Instant watermark, Instant start, Instant end) throws Exception { - upsert(taskId, watermark, start, end, "succeeded", null, null, null); - } - - @Override - public void markFailed(String taskId, Instant start, Instant end, Throwable error) throws Exception { - upsert(taskId, null, start, end, "failed", null, null, error != null ? error.getMessage() : null); - } - - private void ensureTable() { - String ddl = "CREATE TABLE IF NOT EXISTS " + tableName + " (" - + "task_id VARCHAR(128) PRIMARY KEY, " - + "watermark TIMESTAMP NULL, " - + "last_start TIMESTAMP NULL, " - + "last_end TIMESTAMP NULL, " - + "status VARCHAR(32) NOT NULL, " - + "owner VARCHAR(128) NULL, " - + "lease_until TIMESTAMP NULL, " - + "updated_at TIMESTAMP NOT NULL, " - + "attributes TEXT NULL)"; - try (Connection connection = openConnection(); - PreparedStatement statement = connection.prepareStatement(ddl)) { - statement.executeUpdate(); - addColumnIfMissing(connection, "owner VARCHAR(128) NULL"); - addColumnIfMissing(connection, "lease_until TIMESTAMP NULL"); - } catch (Exception e) { - throw new IllegalStateException("Failed to initialize checkpoint table " + tableName, e); - } - } - - private void addColumnIfMissing(Connection connection, String columnDefinition) { - try (PreparedStatement statement = connection.prepareStatement( - "ALTER TABLE " + tableName + " ADD COLUMN " + columnDefinition)) { - statement.executeUpdate(); - } catch (Exception ignored) { - // Column already exists or the database does not support this ALTER form. - } - } - - private void upsert(String taskId, - Instant watermark, - Instant start, - Instant end, - String status, - String owner, - Instant leaseUntil, - String attributes) throws Exception { - Optional existing = load(taskId); - if (existing.isPresent()) { - String sql = "UPDATE " + tableName - + " SET watermark = ?, last_start = ?, last_end = ?, status = ?, owner = ?, lease_until = ?, updated_at = ?, attributes = ? WHERE task_id = ?"; - try (Connection connection = openConnection(); - PreparedStatement statement = connection.prepareStatement(sql)) { - statement.setTimestamp(1, watermark != null ? Timestamp.from(watermark) - : existing.get().getWatermark() != null ? Timestamp.from(existing.get().getWatermark()) : null); - statement.setTimestamp(2, start != null ? Timestamp.from(start) : null); - statement.setTimestamp(3, end != null ? Timestamp.from(end) : null); - statement.setString(4, status); - statement.setString(5, owner); - statement.setTimestamp(6, leaseUntil != null ? Timestamp.from(leaseUntil) : null); - statement.setTimestamp(7, Timestamp.from(Instant.now())); - statement.setString(8, attributes); - statement.setString(9, taskId); - statement.executeUpdate(); - } - return; - } - - insert(taskId, watermark, start, end, status, owner, leaseUntil, attributes); - } - - private boolean tryUpdateRunning(String taskId, - Instant start, - Instant end, - String owner, - Instant leaseUntil, - Instant now) throws Exception { - String sql = "UPDATE " + tableName - + " SET last_start = ?, last_end = ?, status = ?, owner = ?, lease_until = ?, updated_at = ?, attributes = ? " - + "WHERE task_id = ? AND (status <> ? OR lease_until IS NULL OR lease_until <= ?)"; - try (Connection connection = openConnection(); - PreparedStatement statement = connection.prepareStatement(sql)) { - statement.setTimestamp(1, start != null ? Timestamp.from(start) : null); - statement.setTimestamp(2, end != null ? Timestamp.from(end) : null); - statement.setString(3, "running"); - statement.setString(4, owner); - statement.setTimestamp(5, leaseUntil != null ? Timestamp.from(leaseUntil) : null); - statement.setTimestamp(6, Timestamp.from(now)); - statement.setString(7, null); - statement.setString(8, taskId); - statement.setString(9, "running"); - statement.setTimestamp(10, Timestamp.from(now)); - return statement.executeUpdate() > 0; - } - } - - private void insert(String taskId, - Instant watermark, - Instant start, - Instant end, - String status, - String owner, - Instant leaseUntil, - String attributes) throws Exception { - String sql = "INSERT INTO " + tableName - + " (task_id, watermark, last_start, last_end, status, owner, lease_until, updated_at, attributes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"; - try (Connection connection = openConnection(); - PreparedStatement statement = connection.prepareStatement(sql)) { - statement.setString(1, taskId); - statement.setTimestamp(2, watermark != null ? Timestamp.from(watermark) : null); - statement.setTimestamp(3, start != null ? Timestamp.from(start) : null); - statement.setTimestamp(4, end != null ? Timestamp.from(end) : null); - statement.setString(5, status); - statement.setString(6, owner); - statement.setTimestamp(7, leaseUntil != null ? Timestamp.from(leaseUntil) : null); - statement.setTimestamp(8, Timestamp.from(Instant.now())); - statement.setString(9, attributes); - statement.executeUpdate(); - } - } - - private Connection openConnection() throws Exception { - return DriverManager.getConnection(jdbcUrl, properties); - } - - private Instant toInstant(Timestamp timestamp) { - return timestamp != null ? timestamp.toInstant() : null; - } -} diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/RealtimeCompareRunner.java b/consilens-cli/src/main/java/com/consilens/cli/service/RealtimeCompareRunner.java deleted file mode 100644 index fc300be..0000000 --- a/consilens-cli/src/main/java/com/consilens/cli/service/RealtimeCompareRunner.java +++ /dev/null @@ -1,232 +0,0 @@ -package com.consilens.cli.service; - -import com.consilens.cli.model.CheckpointStoreConfig; -import com.consilens.cli.model.CliConfiguration; -import com.consilens.cli.model.CliDiffResult; -import com.consilens.cli.model.ComparisonConfig; -import com.consilens.cli.model.StringPairConfig; -import com.consilens.cli.model.TableMetadata; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.UUID; - -@Slf4j -public class RealtimeCompareRunner { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final DateTimeFormatter SQL_UTC_FORMATTER = - DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC); - - private final DiffService diffService; - private final CompareRequestFactory requestFactory; - private final Clock clock; - private final CheckpointStoreFactory checkpointStoreFactory; - - public RealtimeCompareRunner() { - this(new DiffService(), new CompareRequestFactory(), Clock.systemUTC(), null); - } - - RealtimeCompareRunner(DiffService diffService, CompareRequestFactory requestFactory, Clock clock) { - this(diffService, requestFactory, clock, null); - } - - RealtimeCompareRunner(DiffService diffService, - CompareRequestFactory requestFactory, - Clock clock, - CheckpointStoreFactory checkpointStoreFactory) { - this.diffService = diffService; - this.requestFactory = requestFactory; - this.clock = clock; - this.checkpointStoreFactory = checkpointStoreFactory != null ? checkpointStoreFactory : this::createCheckpointStoreInternal; - } - - public CliDiffResult runLoop(CliConfiguration config) throws Exception { - if (config.getRealtime() == null || !Boolean.TRUE.equals(config.getRealtime().getEnabled())) { - throw new IllegalStateException("realtime.enabled must be true"); - } - Duration interval = config.getRealtime().getInterval() != null && !config.getRealtime().getInterval().isBlank() - ? Duration.parse(config.getRealtime().getInterval()) - : Duration.parse(config.getRealtime().getWindowSize()); - CliDiffResult lastResult = null; - try (CheckpointStore checkpointStore = checkpointStoreFactory.create(config)) { - while (!Thread.currentThread().isInterrupted()) { - try { - lastResult = runIteration(config, checkpointStore); - } catch (Exception e) { - log.error("Realtime compare iteration failed; next iteration will continue after interval", e); - } - try { - sleep(interval); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - return lastResult != null ? lastResult : emptyResult(config); - } - - private CliDiffResult runIteration(CliConfiguration config, CheckpointStore checkpointStore) throws Exception { - if (config.getRealtime() == null || !Boolean.TRUE.equals(config.getRealtime().getEnabled())) { - throw new IllegalStateException("realtime.enabled must be true"); - } - - Duration watermarkDelay = Duration.parse(config.getRealtime().getWatermarkDelay()); - Duration windowSize = Duration.parse(config.getRealtime().getWindowSize()); - Duration overlap = Duration.parse(config.getRealtime().getOverlap()); - String taskId = requestFactory.create(config).getRealtimeSpec().getTaskId(); - String owner = UUID.randomUUID().toString(); - Instant leaseUntil = clock.instant().plus(maxDuration(windowSize, watermarkDelay)); - - Optional checkpoint = checkpointStore.load(taskId); - Instant safeEnd = clock.instant().minus(watermarkDelay); - Instant start = checkpoint.map(value -> value.getWatermark().minus(overlap)).orElse(safeEnd.minus(windowSize)); - Instant end = safeEnd; - if (!end.isAfter(start)) { - return emptyResult(config); - } - - if (!checkpointStore.tryMarkRunning(taskId, start, end, owner, leaseUntil)) { - return emptyResult(config); - } - try { - CliConfiguration effectiveConfig = cloneConfig(config); - applyWindow(effectiveConfig, start, end); - CliDiffResult result = diffService.performDiff(effectiveConfig); - checkpointStore.markSucceeded(taskId, end, start, end); - return result; - } catch (Exception e) { - checkpointStore.markFailed(taskId, start, end, e); - throw e; - } - } - - private CheckpointStore createCheckpointStoreInternal(CliConfiguration config) { - CheckpointStoreConfig checkpointStore = config.getRealtime().getCheckpointStore(); - if ("memory".equalsIgnoreCase(checkpointStore.getType())) { - return new InMemoryCheckpointStore(); - } - if (!"table".equalsIgnoreCase(checkpointStore.getType())) { - throw new IllegalStateException("Unsupported checkpointStore.type: " + checkpointStore.getType()); - } - Map options = checkpointStore.getOptions() != null - ? new LinkedHashMap<>(checkpointStore.getOptions()) - : new LinkedHashMap<>(); - String jdbcUrl = stringValue(options.remove("url")); - String username = stringValue(options.remove("username")); - String password = stringValue(options.remove("password")); - String driver = stringValue(options.remove("driver")); - if (jdbcUrl == null) { - jdbcUrl = config.getTarget().getUrl(); - } - if (username == null) { - username = config.getTarget().getUsername(); - } - if (password == null) { - password = config.getTarget().getPassword(); - } - Properties properties = new Properties(); - if (username != null) { - properties.setProperty("user", username); - } - if (password != null) { - properties.setProperty("password", password); - } - for (Map.Entry entry : options.entrySet()) { - if (entry.getValue() != null) { - properties.setProperty(entry.getKey(), String.valueOf(entry.getValue())); - } - } - return new JdbcCheckpointStore(jdbcUrl, properties, checkpointStore.getName(), driver); - } - - private void applyWindow(CliConfiguration config, Instant start, Instant end) { - ComparisonConfig comparison = config.getComparison(); - String sourceWindow = mergeWindowPredicate( - comparison.getFilters() != null ? comparison.getFilters().getSource() : null, - config.getRealtime().getUpdateColumns().getSource(), - start, - end); - String targetWindow = mergeWindowPredicate( - comparison.getFilters() != null ? comparison.getFilters().getTarget() : null, - config.getRealtime().getUpdateColumns().getTarget(), - start, - end); - comparison.setFilters(StringPairConfig.builder() - .source(sourceWindow) - .target(targetWindow) - .build()); - } - - private String mergeWindowPredicate(String baseFilter, String updateColumn, Instant start, Instant end) { - String window = updateColumn + " >= '" + SQL_UTC_FORMATTER.format(start) + "' AND " - + updateColumn + " < '" + SQL_UTC_FORMATTER.format(end) + "'"; - if (baseFilter == null || baseFilter.isBlank()) { - return window; - } - return "(" + baseFilter.trim() + ") AND (" + window + ")"; - } - - private Duration maxDuration(Duration left, Duration right) { - Duration max = left.compareTo(right) >= 0 ? left : right; - return max.multipliedBy(2); - } - - private void sleep(Duration interval) throws InterruptedException { - long millis = Math.max(1000L, interval.toMillis()); - Thread.sleep(millis); - } - - private CliConfiguration cloneConfig(CliConfiguration config) { - return OBJECT_MAPPER.convertValue(config, CliConfiguration.class); - } - - private CliDiffResult emptyResult(CliConfiguration config) { - return CliDiffResult.builder() - .strategy(config.getStrategyMode()) - .sourceMissingCount(0) - .targetMissingCount(0) - .mismatchCount(0) - .totalDifferences(0) - .sourceRowCount(0) - .targetRowCount(0) - .differences(new java.util.ArrayList<>()) - .tableMetadata(TableMetadata.builder() - .sourceTable(resourceDisplay(config.getSource())) - .targetTable(resourceDisplay(config.getTarget())) - .sourceColumns(requestFactory.sourceColumns(config)) - .targetColumns(requestFactory.targetColumns(config)) - .build()) - .build(); - } - - private String resourceDisplay(com.consilens.cli.model.ConnectionConfig connectionConfig) { - if (connectionConfig == null || connectionConfig.getResource() == null) { - return ""; - } - com.consilens.cli.model.ConnectionConfig.ResourceConfig resource = connectionConfig.getResource(); - if (resource.getName() != null && !resource.getName().isBlank()) { - return resource.getName(); - } - return resource.getPath() != null ? resource.getPath() : ""; - } - - private String stringValue(Object value) { - return value instanceof String ? (String) value : null; - } - - @FunctionalInterface - interface CheckpointStoreFactory { - CheckpointStore create(CliConfiguration config) throws Exception; - } -} diff --git a/consilens-cli/src/test/java/com/consilens/cli/config/ConfigurationManagerTest.java b/consilens-cli/src/test/java/com/consilens/cli/config/ConfigurationManagerTest.java index 218f881..508cd60 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/config/ConfigurationManagerTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/config/ConfigurationManagerTest.java @@ -21,12 +21,14 @@ class ConfigurationManagerTest { @Test void shouldLoadJsonConfigurationFile() throws Exception { Path configFile = tempDir.resolve("diff-config.json"); - Files.writeString(configFile, "{\n" + + Files.writeString(configFile, "{\n" + " \"source\": {\n" + " \"type\": \"mysql\",\n" + - " \"url\": \"jdbc:mysql://localhost:3306/test\",\n" + - " \"username\": \"root\",\n" + - " \"password\": \"123456\",\n" + + " \"connection\": {\n" + + " \"url\": \"jdbc:mysql://localhost:3306/test\",\n" + + " \"username\": \"root\",\n" + + " \"password\": \"123456\"\n" + + " },\n" + " \"resource\": {\n" + " \"type\": \"table\",\n" + " \"name\": \"performance_test_table\"\n" + @@ -34,9 +36,11 @@ void shouldLoadJsonConfigurationFile() throws Exception { " },\n" + " \"target\": {\n" + " \"type\": \"postgresql\",\n" + - " \"url\": \"jdbc:postgresql://localhost:5432/postgres?currentSchema=public\",\n" + - " \"username\": \"postgres\",\n" + - " \"password\": \"123456\",\n" + + " \"connection\": {\n" + + " \"url\": \"jdbc:postgresql://localhost:5432/postgres?currentSchema=public\",\n" + + " \"username\": \"postgres\",\n" + + " \"password\": \"123456\"\n" + + " },\n" + " \"resource\": {\n" + " \"type\": \"table\",\n" + " \"name\": \"performance_test_table\"\n" + @@ -110,12 +114,14 @@ void shouldLoadJsonConfigurationFile() throws Exception { @Test void shouldResolveEnvironmentPlaceholdersInJsonConfiguration() throws Exception { Path configFile = tempDir.resolve("diff-config-env.json"); - Files.writeString(configFile, "{\n" + + Files.writeString(configFile, "{\n" + " \"source\": {\n" + " \"type\": \"mysql\",\n" + - " \"url\": \"${env.SOURCE_URL}\",\n" + - " \"username\": \"${env.DB_USER}\",\n" + - " \"password\": \"${env.DB_PASSWORD}\",\n" + + " \"connection\": {\n" + + " \"url\": \"${env.SOURCE_URL}\",\n" + + " \"username\": \"${env.DB_USER}\",\n" + + " \"password\": \"${env.DB_PASSWORD}\"\n" + + " },\n" + " \"resource\": {\n" + " \"type\": \"table\",\n" + " \"name\": \"${env.TABLE_NAME}\"\n" + @@ -123,9 +129,11 @@ void shouldResolveEnvironmentPlaceholdersInJsonConfiguration() throws Exception " },\n" + " \"target\": {\n" + " \"type\": \"postgresql\",\n" + - " \"url\": \"${env.TARGET_URL}\",\n" + - " \"username\": \"${env.DB_USER}\",\n" + - " \"password\": \"${env.DB_PASSWORD}\",\n" + + " \"connection\": {\n" + + " \"url\": \"${env.TARGET_URL}\",\n" + + " \"username\": \"${env.DB_USER}\",\n" + + " \"password\": \"${env.DB_PASSWORD}\"\n" + + " },\n" + " \"resource\": {\n" + " \"type\": \"table\",\n" + " \"name\": \"${env.TABLE_NAME}\"\n" + @@ -205,17 +213,19 @@ void shouldResolveEnvironmentPlaceholdersInYamlConfiguration() throws Exception Path configFile = tempDir.resolve("diff-config-env.yaml"); Files.writeString(configFile, "source:\n" + " type: mysql\n" + - " url: ${env.SOURCE_URL}\n" + - " username: ${env.DB_USER}\n" + - " password: ${env.DB_PASSWORD}\n" + + " connection:\n" + + " url: ${env.SOURCE_URL}\n" + + " username: ${env.DB_USER}\n" + + " password: ${env.DB_PASSWORD}\n" + " resource:\n" + " type: table\n" + " name: ${env.TABLE_NAME}\n" + "target:\n" + " type: postgresql\n" + - " url: ${env.TARGET_URL}\n" + - " username: ${env.DB_USER}\n" + - " password: ${env.DB_PASSWORD}\n" + + " connection:\n" + + " url: ${env.TARGET_URL}\n" + + " username: ${env.DB_USER}\n" + + " password: ${env.DB_PASSWORD}\n" + " resource:\n" + " type: table\n" + " name: ${env.TABLE_NAME}\n" + @@ -260,17 +270,19 @@ void shouldFailWhenRequiredEnvironmentVariableIsMissing() throws Exception { Path configFile = tempDir.resolve("diff-config-missing-env.yaml"); Files.writeString(configFile, "source:\n" + " type: mysql\n" + - " url: ${env.SOURCE_URL}\n" + - " username: root\n" + - " password: 123456\n" + + " connection:\n" + + " url: ${env.SOURCE_URL}\n" + + " username: root\n" + + " password: 123456\n" + " resource:\n" + " type: table\n" + " name: performance_test_table\n" + "target:\n" + " type: postgresql\n" + - " url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public\n" + - " username: postgres\n" + - " password: 123456\n" + + " connection:\n" + + " url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public\n" + + " username: postgres\n" + + " password: 123456\n" + " resource:\n" + " type: table\n" + " name: performance_test_table\n" + @@ -297,17 +309,19 @@ void shouldRejectUnsupportedTableSinkDatabase() throws Exception { Path configFile = tempDir.resolve("diff-config-unsupported-sink.yaml"); Files.writeString(configFile, "source:\n" + " type: mysql\n" + - " url: jdbc:mysql://localhost:3306/test\n" + - " username: root\n" + - " password: 123456\n" + + " connection:\n" + + " url: jdbc:mysql://localhost:3306/test\n" + + " username: root\n" + + " password: 123456\n" + " resource:\n" + " type: table\n" + " name: performance_test_table\n" + "target:\n" + " type: postgresql\n" + - " url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public\n" + - " username: postgres\n" + - " password: 123456\n" + + " connection:\n" + + " url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public\n" + + " username: postgres\n" + + " password: 123456\n" + " resource:\n" + " type: table\n" + " name: performance_test_table\n" + @@ -342,17 +356,19 @@ void shouldRejectTableSinkWithoutExplicitDatabaseType() throws Exception { Path configFile = tempDir.resolve("diff-config-missing-sink-type.yaml"); Files.writeString(configFile, "source:\n" + " type: mysql\n" + - " url: jdbc:mysql://localhost:3306/test\n" + - " username: root\n" + - " password: 123456\n" + + " connection:\n" + + " url: jdbc:mysql://localhost:3306/test\n" + + " username: root\n" + + " password: 123456\n" + " resource:\n" + " type: table\n" + " name: performance_test_table\n" + "target:\n" + " type: postgresql\n" + - " url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public\n" + - " username: postgres\n" + - " password: 123456\n" + + " connection:\n" + + " url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public\n" + + " username: postgres\n" + + " password: 123456\n" + " resource:\n" + " type: table\n" + " name: performance_test_table\n" + diff --git a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java index 26721a0..783184c 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/config/ExampleConfigurationCompatibilityTest.java @@ -21,8 +21,6 @@ class ExampleConfigurationCompatibilityTest { "examples/performance-test-mysql-vs-postgres.yaml", "examples/performance-test-mysql-vs-starrocks.yaml", "examples/custom-sql-mysql-vs-postgres-checksum.yaml", - "examples/realtime-table-mysql-vs-postgres-loop.yaml", - "examples/realtime-custom-sql-mysql-vs-postgres-checksum.yaml", "examples/mysql-to-doris-partitioned-checksum.yaml", "examples/detail-to-aggregate-custom-sql.yaml", "examples/performance-test-mysql-vs-postgres.json" diff --git a/consilens-cli/src/test/java/com/consilens/cli/model/ConnectionConfigTest.java b/consilens-cli/src/test/java/com/consilens/cli/model/ConnectionConfigTest.java index e1defd8..343ac40 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/model/ConnectionConfigTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/model/ConnectionConfigTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; class ConnectionConfigTest { @@ -30,23 +31,46 @@ void shouldAllowNonJdbcConnectorValidationWhenConnectionMapIsPresent() { } @Test - void shouldMergeGenericConnectionPropertiesWithTopLevelFields() { + void shouldExposeConnectionPropertiesFromNestedConnectionBlock() { ConnectionConfig.ConnectorConnectionProperties properties = ConnectionConfig.ConnectorConnectionProperties.builder() .url("jdbc:mysql://localhost:3306/orders") .username("connector-user") + .password("secret") .build(); properties.addProperty("sslMode", "DISABLED"); ConnectionConfig config = ConnectionConfig.builder() .type("mysql") - .username("top-level-user") .connection(properties) .build(); Map connectionMap = config.toConnectionMap(); assertEquals("jdbc:mysql://localhost:3306/orders", connectionMap.get("url")); - assertEquals("top-level-user", connectionMap.get("username")); + assertEquals("connector-user", connectionMap.get("username")); + assertEquals("secret", connectionMap.get("password")); assertEquals("DISABLED", connectionMap.get("sslMode")); + assertEquals("jdbc:mysql://localhost:3306/orders", config.getUrl()); + assertEquals("connector-user", config.getUsername()); + assertEquals("secret", config.getPassword()); + } + + @Test + void shouldRejectPathForTableResource() { + ConnectionConfig config = ConnectionConfig.builder() + .type("mysql") + .connection(ConnectionConfig.ConnectorConnectionProperties.builder() + .url("jdbc:mysql://localhost:3306/orders") + .username("root") + .password("secret") + .build()) + .resource(ConnectionConfig.ResourceConfig.builder() + .type("table") + .name("orders") + .path("orders") + .build()) + .build(); + + assertThrows(Exception.class, () -> config.validate("source")); } } diff --git a/consilens-cli/src/test/java/com/consilens/cli/service/CompareRequestFactoryTest.java b/consilens-cli/src/test/java/com/consilens/cli/service/CompareRequestFactoryTest.java index 9c69506..2edcac9 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/service/CompareRequestFactoryTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/service/CompareRequestFactoryTest.java @@ -28,13 +28,11 @@ void shouldCompileMappingsToSqlResources() { .mappings(List.of( CompareMappingConfig.builder() .name("order_amount") - .type("decimal") .source(FieldExpressionConfig.builder().column("amount").build()) .target(FieldExpressionConfig.builder().expression("actual_amount + discount_amount").build()) .build(), CompareMappingConfig.builder() .name("order_status") - .type("string") .source(FieldExpressionConfig.builder().column("status").build()) .target(FieldExpressionConfig.builder() .expression("CASE WHEN state = 1 THEN 'PAID' ELSE 'UNPAID' END") @@ -102,16 +100,20 @@ private CliConfiguration baseConfig() { return CliConfiguration.builder() .source(ConnectionConfig.builder() .type("mysql") - .url("jdbc:mysql://localhost:3306/source") - .username("root") - .password("secret") + .connection(ConnectionConfig.ConnectorConnectionProperties.builder() + .url("jdbc:mysql://localhost:3306/source") + .username("root") + .password("secret") + .build()) .resource(ConnectionConfig.ResourceConfig.builder().type("table").name("source_table").build()) .build()) .target(ConnectionConfig.builder() .type("mysql") - .url("jdbc:mysql://localhost:3306/target") - .username("root") - .password("secret") + .connection(ConnectionConfig.ConnectorConnectionProperties.builder() + .url("jdbc:mysql://localhost:3306/target") + .username("root") + .password("secret") + .build()) .resource(ConnectionConfig.ResourceConfig.builder().type("table").name("target_table").build()) .build()) .strategy(StrategyConfig.builder() diff --git a/consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java b/consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java index b120bf3..04b6c4c 100644 --- a/consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java +++ b/consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java @@ -102,16 +102,20 @@ private CliConfiguration createConfig() { return CliConfiguration.builder() .source(ConnectionConfig.builder() .type("mysql") - .url("jdbc:mysql://localhost:3306/source_db") - .username("user") - .password("pwd") + .connection(ConnectionConfig.ConnectorConnectionProperties.builder() + .url("jdbc:mysql://localhost:3306/source_db") + .username("user") + .password("pwd") + .build()) .resource(ConnectionConfig.ResourceConfig.builder().type("table").name("source_table").build()) .build()) .target(ConnectionConfig.builder() .type("mysql") - .url("jdbc:mysql://localhost:3306/target_db") - .username("user") - .password("pwd") + .connection(ConnectionConfig.ConnectorConnectionProperties.builder() + .url("jdbc:mysql://localhost:3306/target_db") + .username("user") + .password("pwd") + .build()) .resource(ConnectionConfig.ResourceConfig.builder().type("table").name("target_table").build()) .build()) .comparison(ComparisonConfig.builder() diff --git a/consilens-cli/src/test/java/com/consilens/cli/service/JdbcCheckpointStoreTest.java b/consilens-cli/src/test/java/com/consilens/cli/service/JdbcCheckpointStoreTest.java deleted file mode 100644 index 8312006..0000000 --- a/consilens-cli/src/test/java/com/consilens/cli/service/JdbcCheckpointStoreTest.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.consilens.cli.service; - -import org.junit.jupiter.api.Test; - -import java.time.Instant; -import java.util.Properties; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -class JdbcCheckpointStoreTest { - - @Test - void shouldRespectActiveLeaseAndPreserveWatermarkWhenAcquiring() throws Exception { - String tableName = "checkpoint_" + System.nanoTime(); - try (JdbcCheckpointStore store = new JdbcCheckpointStore( - "jdbc:h2:mem:" + tableName + ";DB_CLOSE_DELAY=-1", - new Properties(), - tableName, - "org.h2.Driver")) { - Instant watermark = Instant.now().minusSeconds(120); - Instant start = watermark.minusSeconds(60); - Instant end = watermark; - store.markSucceeded("task-1", watermark, start, end); - - assertTrue(store.tryMarkRunning( - "task-1", - Instant.now().minusSeconds(30), - Instant.now(), - "owner-1", - Instant.now().plusSeconds(60))); - assertFalse(store.tryMarkRunning( - "task-1", - Instant.now().minusSeconds(30), - Instant.now(), - "owner-2", - Instant.now().plusSeconds(60))); - - CompareCheckpoint running = store.load("task-1").orElseThrow(); - assertEquals(watermark, running.getWatermark()); - assertEquals("owner-1", running.getOwner()); - } - } -} diff --git a/consilens-cli/src/test/java/com/consilens/cli/service/RealtimeCompareRunnerTest.java b/consilens-cli/src/test/java/com/consilens/cli/service/RealtimeCompareRunnerTest.java deleted file mode 100644 index 8da4f49..0000000 --- a/consilens-cli/src/test/java/com/consilens/cli/service/RealtimeCompareRunnerTest.java +++ /dev/null @@ -1,165 +0,0 @@ -package com.consilens.cli.service; - -import com.consilens.cli.model.CheckpointStoreConfig; -import com.consilens.cli.model.CliConfiguration; -import com.consilens.cli.model.CliDiffResult; -import com.consilens.cli.model.ComparisonConfig; -import com.consilens.cli.model.ConnectionConfig; -import com.consilens.cli.model.ListPairConfig; -import com.consilens.cli.model.LocalCompareConfig; -import com.consilens.cli.model.RealtimeConfig; -import com.consilens.cli.model.StrategyConfig; -import com.consilens.cli.model.StringPairConfig; -import org.junit.jupiter.api.Test; - -import java.time.Clock; -import java.time.Instant; -import java.time.ZoneOffset; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -class RealtimeCompareRunnerTest { - - @Test - void shouldAdvanceCheckpointOnSuccessfulRun() throws Exception { - InMemoryCheckpointStore checkpointStore = new InMemoryCheckpointStore(); - RecordingDiffService diffService = new RecordingDiffService(false, true); - RealtimeCompareRunner runner = new RealtimeCompareRunner( - diffService, - new CompareRequestFactory(), - Clock.fixed(Instant.parse("2026-05-03T10:00:00Z"), ZoneOffset.UTC), - config -> checkpointStore); - - runner.runLoop(baseConfig()); - - CompareCheckpoint checkpoint = checkpointStore.load(new CompareRequestFactory().create(baseConfig()).getRealtimeSpec().getTaskId()).orElseThrow(); - assertEquals(Instant.parse("2026-05-03T09:55:00Z"), checkpoint.getWatermark()); - assertTrue(diffService.lastSourceFilter.contains("updated_at >=")); - assertTrue(diffService.lastTargetFilter.contains("updated_at <")); - assertTrue(diffService.lastSourceFilter.contains("'2026-05-03 09:45:00'")); - assertTrue(diffService.lastSourceFilter.contains("'2026-05-03 09:55:00'")); - } - - @Test - void shouldKeepPreviousWatermarkOnFailure() throws Exception { - InMemoryCheckpointStore checkpointStore = new InMemoryCheckpointStore(); - String taskId = new CompareRequestFactory().create(baseConfig()).getRealtimeSpec().getTaskId(); - checkpointStore.markSucceeded(taskId, - Instant.parse("2026-05-03T09:40:00Z"), - Instant.parse("2026-05-03T09:10:00Z"), - Instant.parse("2026-05-03T09:40:00Z")); - RealtimeCompareRunner runner = new RealtimeCompareRunner( - new RecordingDiffService(true, true), - new CompareRequestFactory(), - Clock.fixed(Instant.parse("2026-05-03T10:00:00Z"), ZoneOffset.UTC), - config -> checkpointStore); - - CliDiffResult result = runner.runLoop(baseConfig()); - - CompareCheckpoint checkpoint = checkpointStore.load(taskId).orElseThrow(); - assertEquals(0, result.getTotalDifferences()); - assertEquals(Instant.parse("2026-05-03T09:40:00Z"), checkpoint.getWatermark()); - assertEquals("failed", checkpoint.getStatus()); - } - - @Test - void loopShouldKeepProcessAliveWhenIterationFails() throws Exception { - InMemoryCheckpointStore checkpointStore = new InMemoryCheckpointStore(); - RecordingDiffService diffService = new RecordingDiffService(true, true); - RealtimeCompareRunner runner = new RealtimeCompareRunner( - diffService, - new CompareRequestFactory(), - Clock.fixed(Instant.parse("2026-05-03T10:00:00Z"), ZoneOffset.UTC), - config -> checkpointStore); - - try { - CliDiffResult result = runner.runLoop(baseConfig()); - - assertEquals(0, result.getTotalDifferences()); - assertEquals(1, diffService.invocations); - } finally { - Thread.interrupted(); - } - } - - private CliConfiguration baseConfig() { - return CliConfiguration.builder() - .source(ConnectionConfig.builder() - .type("mysql") - .url("jdbc:mysql://localhost:3306/source") - .username("root") - .password("secret") - .resource(ConnectionConfig.ResourceConfig.builder().type("table").name("orders_src").build()) - .build()) - .target(ConnectionConfig.builder() - .type("mysql") - .url("jdbc:mysql://localhost:3306/target") - .username("root") - .password("secret") - .resource(ConnectionConfig.ResourceConfig.builder().type("table").name("orders_tgt").build()) - .build()) - .comparison(ComparisonConfig.builder() - .keys(ListPairConfig.builder().source(List.of("id")).target(List.of("id")).build()) - .fields(ListPairConfig.builder().source(List.of("amount")).target(List.of("amount")).build()) - .build()) - .strategy(StrategyConfig.builder() - .mode("checksum") - .algorithm("xor") - .bisectionFactor(4) - .bisectionThreshold(1000L) - .localCompare(LocalCompareConfig.builder().mode("full").build()) - .build()) - .realtime(RealtimeConfig.builder() - .enabled(true) - .updateColumns(StringPairConfig.builder().source("updated_at").target("updated_at").build()) - .watermarkDelay("PT5M") - .windowSize("PT10M") - .overlap("PT30M") - .checkpointStore(CheckpointStoreConfig.builder().type("memory").name("ignored").build()) - .build()) - .build(); - } - - private static final class RecordingDiffService extends DiffService { - - private final boolean fail; - private final boolean interruptAfterCall; - private String lastSourceFilter; - private String lastTargetFilter; - private int invocations; - - private RecordingDiffService(boolean fail) { - this(fail, false); - } - - private RecordingDiffService(boolean fail, boolean interruptAfterCall) { - this.fail = fail; - this.interruptAfterCall = interruptAfterCall; - } - - @Override - public CliDiffResult performDiff(CliConfiguration config) throws Exception { - invocations++; - lastSourceFilter = config.getComparison().getFilters().getSource(); - lastTargetFilter = config.getComparison().getFilters().getTarget(); - if (interruptAfterCall) { - Thread.currentThread().interrupt(); - } - if (fail) { - throw new Exception("boom"); - } - return CliDiffResult.builder() - .strategy(config.getStrategyMode()) - .sourceMissingCount(0) - .targetMissingCount(0) - .mismatchCount(0) - .totalDifferences(0) - .sourceRowCount(1) - .targetRowCount(1) - .differences(List.of()) - .build(); - } - } -} diff --git a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/model/UpdateWindow.java b/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/model/UpdateWindow.java deleted file mode 100644 index d7ba5db..0000000 --- a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/model/UpdateWindow.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.consilens.connector.api.model; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.Instant; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class UpdateWindow { - - private String column; - - private Instant start; - - private Instant end; -} diff --git a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareRequest.java b/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareRequest.java index 7e90d83..49ed886 100644 --- a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareRequest.java +++ b/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareRequest.java @@ -32,8 +32,6 @@ public class CompareRequest { private PredicateSpec targetFilter; - private RealtimeSpec realtimeSpec; - private NormalizationSpec normalizationSpec; private CompareStrategyPreference strategyPreference; diff --git a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareSegment.java b/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareSegment.java index 148349d..0db08d7 100644 --- a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareSegment.java +++ b/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/CompareSegment.java @@ -7,7 +7,6 @@ import com.consilens.connector.api.model.PredicateSpec; import com.consilens.connector.api.model.ResourceLocator; import com.consilens.connector.api.model.SchemaDescriptor; -import com.consilens.connector.api.model.UpdateWindow; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -36,6 +35,4 @@ public class CompareSegment { private SchemaDescriptor schema; private String side; - - private UpdateWindow updateWindow; } diff --git a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/RealtimeSpec.java b/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/RealtimeSpec.java deleted file mode 100644 index 87f667d..0000000 --- a/consilens-connector/consilens-connector-api/src/main/java/com/consilens/connector/api/planner/RealtimeSpec.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.consilens.connector.api.planner; - -import com.consilens.connector.api.model.UpdateWindow; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class RealtimeSpec { - - private String taskId; - - private boolean enabled; - - private String watermarkDelay; - - private String windowSize; - - private String overlap; - - private UpdateWindow sourceWindow; - - private UpdateWindow targetWindow; - - private String checkpointStoreType; - - private String checkpointStoreName; -} diff --git a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java index 1252a8f..8b9b081 100644 --- a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java +++ b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/JdbcDatasetHandle.java @@ -727,7 +727,6 @@ private String buildWhereClause(CompareSegment segment, List keyColumns) } return new WhereClauseBuilder(dialect) .addBaseFilter(segment.getFilter()) - .addUpdateWindow(segment.getUpdateWindow()) .addSplit(segment.getSplit(), keyColumns) .build(); } diff --git a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilder.java b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilder.java index cb7bc80..742f4eb 100644 --- a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilder.java +++ b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/main/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilder.java @@ -3,11 +3,9 @@ import com.consilens.connector.api.DatabaseDialect; import com.consilens.connector.api.ConnectorException; import com.consilens.connector.api.model.PredicateSpec; -import com.consilens.connector.api.model.UpdateWindow; import com.consilens.connector.api.planner.KeyRangeSplit; import com.consilens.connector.api.planner.SegmentSplit; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; @@ -27,23 +25,6 @@ public WhereClauseBuilder addBaseFilter(PredicateSpec filter) { return this; } - public WhereClauseBuilder addUpdateWindow(UpdateWindow window) { - if (window == null || window.getColumn() == null || window.getColumn().isBlank()) { - return this; - } - List parts = new ArrayList<>(); - if (window.getStart() != null) { - parts.add(quote(window.getColumn()) + " >= " + dialect.getSqlQueryGenerator().formatValue(Timestamp.from(window.getStart()))); - } - if (window.getEnd() != null) { - parts.add(quote(window.getColumn()) + " < " + dialect.getSqlQueryGenerator().formatValue(Timestamp.from(window.getEnd()))); - } - if (!parts.isEmpty()) { - predicates.add("(" + String.join(" AND ", parts) + ")"); - } - return this; - } - public WhereClauseBuilder addSplit(SegmentSplit split, List keyColumns) { if (split == null) { return this; diff --git a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilderTest.java b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilderTest.java index cccb849..818d151 100644 --- a/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilderTest.java +++ b/consilens-connector/consilens-connector-plugins/consilens-connector-base/src/test/java/com/consilens/conncetor/base/jdbc/WhereClauseBuilderTest.java @@ -4,12 +4,10 @@ import com.consilens.connector.api.DatabaseDialect; import com.consilens.connector.api.SqlQueryGenerator; import com.consilens.connector.api.model.PredicateSpec; -import com.consilens.connector.api.model.UpdateWindow; import com.consilens.connector.api.planner.KeyRangeSplit; import org.junit.jupiter.api.Test; import java.lang.reflect.Proxy; -import java.time.Instant; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -18,22 +16,14 @@ class WhereClauseBuilderTest { @Test void shouldMergePredicatesInStableOrder() { - String startTimestamp = java.sql.Timestamp.from(Instant.parse("2026-05-03T09:00:00Z")).toString(); - String endTimestamp = java.sql.Timestamp.from(Instant.parse("2026-05-03T10:00:00Z")).toString(); String whereClause = new WhereClauseBuilder(dialect()) .addBaseFilter(PredicateSpec.builder().expression("status = 1").build()) - .addUpdateWindow(UpdateWindow.builder() - .column("updated_at") - .start(Instant.parse("2026-05-03T09:00:00Z")) - .end(Instant.parse("2026-05-03T10:00:00Z")) - .build()) .addSplit(new KeyRangeSplit(List.of(1L), List.of(10L)), List.of("id")) .addKeyPredicate(List.of("id"), List.of(List.of(1L), List.of(2L))) .build(); assertEquals("" + "(status = 1) AND " - + "(`updated_at` >= '" + startTimestamp + "' AND `updated_at` < '" + endTimestamp + "') AND " + "(`id` >= 1 AND `id` < 10) AND " + "(`id` IN (1, 2))", whereClause); diff --git a/consilens-core/src/main/java/com/consilens/core/compare/DefaultCompareRuntime.java b/consilens-core/src/main/java/com/consilens/core/compare/DefaultCompareRuntime.java index e94520a..88b5ea2 100644 --- a/consilens-core/src/main/java/com/consilens/core/compare/DefaultCompareRuntime.java +++ b/consilens-core/src/main/java/com/consilens/core/compare/DefaultCompareRuntime.java @@ -7,7 +7,6 @@ import com.consilens.connector.api.model.KeySpec; import com.consilens.connector.api.model.PredicateSpec; import com.consilens.connector.api.model.SchemaDescriptor; -import com.consilens.connector.api.model.UpdateWindow; import com.consilens.connector.api.config.ReadOptions; import com.consilens.connector.api.normalization.DefaultNormalizationSpecValidator; import com.consilens.connector.api.planner.CompareRequest; @@ -74,16 +73,14 @@ public DiffResult execute(CompareRequest request) throws Exception { request.getSourceKeySpec(), request.getSourceComparisons(), request.getSourceFilter(), - "source", - request.getRealtimeSpec() != null ? request.getRealtimeSpec().getSourceWindow() : null); + "source"); CompareSegment targetSegment = buildSegment( targetDataset, request.getTarget(), request.getTargetKeySpec(), request.getTargetComparisons(), request.getTargetFilter(), - "target", - request.getRealtimeSpec() != null ? request.getRealtimeSpec().getTargetWindow() : null); + "target"); ComparePlan plan = comparePlanner.plan(request, sourceDataset, targetDataset); log.info("Selected compare plan: {}", plan.getPlanType()); @@ -112,8 +109,7 @@ private CompareSegment buildSegment(DatasetHandle dataset, KeySpec keySpec, ComparisonSpec comparisons, PredicateSpec filter, - String side, - UpdateWindow updateWindow) { + String side) { SchemaDescriptor schema = dataset.getSchema(); return CompareSegment.builder() .dataset(dataset) @@ -123,7 +119,6 @@ private CompareSegment buildSegment(DatasetHandle dataset, .filter(filter) .schema(schema) .side(side) - .updateWindow(updateWindow) .snapshot(dataset.getSnapshotProvider() .map(provider -> provider.createSnapshot(config.getReadOptions())) .orElse(null)) diff --git a/consilens-core/src/main/java/com/consilens/core/compare/relational/RelationalCompareSegmentAdapter.java b/consilens-core/src/main/java/com/consilens/core/compare/relational/RelationalCompareSegmentAdapter.java index 0699bf2..bdc8a27 100644 --- a/consilens-core/src/main/java/com/consilens/core/compare/relational/RelationalCompareSegmentAdapter.java +++ b/consilens-core/src/main/java/com/consilens/core/compare/relational/RelationalCompareSegmentAdapter.java @@ -10,7 +10,6 @@ import com.consilens.connector.api.model.ResourceLocator; import com.consilens.connector.api.model.SchemaDescriptor; import com.consilens.connector.api.model.TablePath; -import com.consilens.connector.api.model.UpdateWindow; import com.consilens.connector.api.planner.CompareSegment; import com.consilens.connector.api.planner.KeyRangeSplit; import com.consilens.connector.api.planner.OffsetLimitSplit; @@ -24,7 +23,6 @@ import java.sql.Connection; import java.sql.SQLException; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -67,7 +65,7 @@ public static PreparedTableSegment toTableSegment(CompareSegment segment, Compar .relationSource(sqlResource ? sqlRelationSource(resource) : null) .keyColumns(keyColumns) .extraColumns(extraColumns) - .whereClause(resolveWhereClause(segment.getFilter(), segment.getUpdateWindow(), support)) + .whereClause(resolveWhereClause(segment.getFilter())) .caseSensitive(false) .schema(Optional.ofNullable(RelationalSchemaAdapter.toLegacySchema(schema, tablePath))); @@ -106,35 +104,16 @@ private static void applySplit(TableSegment.TableSegmentBuilder builder, Segment throw new ConnectorException("Unsupported relational split type: " + split.getClass().getSimpleName()); } - private static Optional resolveWhereClause(PredicateSpec filter, - UpdateWindow updateWindow, - RelationalDatasetSupport support) { - String whereClause = buildWhereClause(filter, updateWindow, support); + private static Optional resolveWhereClause(PredicateSpec filter) { + String whereClause = buildWhereClause(filter); return whereClause == null || whereClause.isBlank() ? Optional.empty() : Optional.of(whereClause); } - private static String buildWhereClause(PredicateSpec filter, - UpdateWindow updateWindow, - RelationalDatasetSupport support) { + private static String buildWhereClause(PredicateSpec filter) { List predicates = new ArrayList<>(); if (filter != null && filter.getExpression() != null && !filter.getExpression().trim().isEmpty()) { predicates.add("(" + filter.getExpression().trim() + ")"); } - if (updateWindow != null && updateWindow.getColumn() != null && !updateWindow.getColumn().isBlank()) { - List window = new ArrayList<>(); - String column = updateWindow.getColumn(); - if (updateWindow.getStart() != null) { - window.add(column + " >= " + support.getDialect().getSqlQueryGenerator() - .formatValue(Timestamp.from(updateWindow.getStart()))); - } - if (updateWindow.getEnd() != null) { - window.add(column + " < " + support.getDialect().getSqlQueryGenerator() - .formatValue(Timestamp.from(updateWindow.getEnd()))); - } - if (!window.isEmpty()) { - predicates.add("(" + String.join(" AND ", window) + ")"); - } - } return predicates.isEmpty() ? null : String.join(" AND ", predicates); } diff --git a/consilens-sink/consilens-sink-api/src/main/java/com/consilens/sink/api/model/ResultConfig.java b/consilens-sink/consilens-sink-api/src/main/java/com/consilens/sink/api/model/ResultConfig.java index 67211da..1a7587b 100644 --- a/consilens-sink/consilens-sink-api/src/main/java/com/consilens/sink/api/model/ResultConfig.java +++ b/consilens-sink/consilens-sink-api/src/main/java/com/consilens/sink/api/model/ResultConfig.java @@ -24,9 +24,6 @@ public class ResultConfig { @Builder.Default private List sinks = new ArrayList<>(); - @Builder.Default - private int parallelism = 4; - @Builder.Default private boolean failOnSinkError = false; } diff --git a/examples/realtime-custom-sql-mysql-vs-postgres-checksum.yaml b/examples/realtime-custom-sql-mysql-vs-postgres-checksum.yaml deleted file mode 100644 index 8652988..0000000 --- a/examples/realtime-custom-sql-mysql-vs-postgres-checksum.yaml +++ /dev/null @@ -1,83 +0,0 @@ -# 实时自定义 SQL 比对配置 - SQL 结果集走 checksum -# 启动方式:./consilens diff --config examples/realtime-custom-sql-mysql-vs-postgres-checksum.yaml - -source: - type: mysql - name: source-mysql-realtime-sql - connection: - url: jdbc:mysql://localhost:3306/test - username: ${env.MYSQL_USER} - password: ${env.MYSQL_PASSWORD} - resource: - type: sql - path: | - SELECT record_id, - amount, - status, - updated_at - FROM performance_test_table - WHERE deleted = 0 - -target: - type: postgresql - name: target-postgresql-realtime-sql - connection: - url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public - username: ${env.PG_USER} - password: ${env.PG_PASSWORD} - resource: - type: sql - path: | - SELECT record_id, - amount_cents / 100.0 AS amount, - order_status AS status, - updated_time AS updated_at - FROM performance_test_table - WHERE is_deleted = false - -comparison: - keys: - source: - - record_id - target: - - record_id - - fields: - source: - - amount - - status - - updated_at - target: - - amount - - status - - updated_at - -realtime: - enabled: true - updateColumns: - source: updated_at - target: updated_at - watermarkDelay: PT5M - windowSize: PT15M - overlap: PT1H - interval: PT2M - checkpointStore: - type: table - name: consilens_compare_checkpoint - url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public - username: ${env.PG_USER} - password: ${env.PG_PASSWORD} - driver: org.postgresql.Driver - -strategy: - mode: checksum - algorithm: xor - bisectionFactor: 8 - bisectionThreshold: 8000 - batchSize: 1000 - enableProfiling: true - -result: - sinks: - - format: console - type: result diff --git a/examples/realtime-table-mysql-vs-postgres-loop.yaml b/examples/realtime-table-mysql-vs-postgres-loop.yaml deleted file mode 100644 index fb92d47..0000000 --- a/examples/realtime-table-mysql-vs-postgres-loop.yaml +++ /dev/null @@ -1,80 +0,0 @@ -# 实时长驻比对配置 - MySQL 表 vs PostgreSQL 表 -# 启动方式:./consilens diff --config examples/realtime-table-mysql-vs-postgres-loop.yaml -# 目标:按 updated_at 滚动窗口持续比对,checkpoint 持久化到 PostgreSQL 表。 - -source: - type: mysql - name: source-mysql-realtime - connection: - url: jdbc:mysql://localhost:3306/test - username: ${env.MYSQL_USER} - password: ${env.MYSQL_PASSWORD} - resource: - type: table - name: performance_test_table - -target: - type: postgresql - name: target-postgresql-realtime - connection: - url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public - username: ${env.PG_USER} - password: ${env.PG_PASSWORD} - resource: - type: table - name: performance_test_table - -comparison: - keys: - source: - - record_id - target: - - record_id - - fields: - source: - - col_int - - col_decimal - - amount - - status - - updated_at - target: - - col_int - - col_decimal - - amount - - status - - updated_at - - filters: - source: "created_at >= '2026-01-01 00:00:00'" - target: "created_at >= '2026-01-01 00:00:00'" - -realtime: - enabled: true - updateColumns: - source: updated_at - target: updated_at - watermarkDelay: PT5M - windowSize: PT10M - overlap: PT30M - interval: PT1M - checkpointStore: - type: table - name: consilens_compare_checkpoint - url: jdbc:postgresql://localhost:5432/postgres?currentSchema=public - username: ${env.PG_USER} - password: ${env.PG_PASSWORD} - driver: org.postgresql.Driver - -strategy: - mode: checksum - algorithm: xor - bisectionFactor: 8 - bisectionThreshold: 8000 - batchSize: 1000 - enableProfiling: true - -result: - sinks: - - format: console - type: result