diff --git a/CHANGES.md b/CHANGES.md index 5499cb066476..4072a66e07c6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -65,6 +65,7 @@ ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* DebeziumIO (Java): added `OffsetRetainer` interface and `FileSystemOffsetRetainer` implementation to persist and restore CDC offsets across pipeline restarts, and exposed `withStartOffset` / `withOffsetRetainer` on `DebeziumIO.Read` and the cross-language `ReadBuilder` ([#28248](https://github.com/apache/beam/issues/28248)). ## New Features / Improvements diff --git a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go index 547aba0ceb99..f31a71fbc9d9 100644 --- a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go +++ b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go @@ -77,6 +77,8 @@ type readFromDebeziumSchema struct { MaxNumberOfRecords *int64 MaxTimeToRun *int64 ConnectionProperties []string + StartOffset []string + OffsetStoragePath *string } type debeziumConfig struct { @@ -149,6 +151,44 @@ func ConnectionProperties(cp []string) readOption { } } +// StartOffset specifies the offset from which the connector should resume consuming +// changes. Each entry must be a "key=value" string, where numeric values are encoded +// as their decimal string representation. +// +// Example for PostgreSQL: +// +// debeziumio.StartOffset([]string{"lsn=28160840"}) +// +// Example for MySQL: +// +// debeziumio.StartOffset([]string{"file=binlog.000001", "pos=156"}) +// +// Obtain the offset from the output of a previous pipeline run. Numeric values such +// as LSN or binlog position are automatically parsed to Long on the Java side. +func StartOffset(offset []string) readOption { + return func(cfg *debeziumConfig) { + cfg.readSchema.StartOffset = offset + } +} + +// OffsetStoragePath sets a path where the connector offset is automatically saved after each +// checkpoint and loaded on pipeline startup, allowing the pipeline to resume from where it +// left off without any manual offset management. +// +// The path can be on any filesystem supported by the active Beam runner +// (local disk, GCS, S3, etc.). +// +// Example: +// +// debeziumio.OffsetStoragePath("gs://my-bucket/debezium/orders-offset.json") +// +// When set, takes precedence over StartOffset. +func OffsetStoragePath(path string) readOption { + return func(cfg *debeziumConfig) { + cfg.readSchema.OffsetStoragePath = &path + } +} + // ExpansionAddr sets the expansion service address to use for DebeziumIO cross-langauage transform. func ExpansionAddr(expansionAddr string) readOption { return func(cfg *debeziumConfig) { diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java index ebf91a4a0957..6c31d5a02349 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java @@ -144,6 +144,10 @@ public abstract static class Read extends PTransform> abstract @Nullable Long getPollingTimeout(); + abstract @Nullable Map getStartOffset(); + + abstract @Nullable OffsetRetainer getOffsetRetainer(); + abstract @Nullable Coder getCoder(); abstract Builder toBuilder(); @@ -162,6 +166,10 @@ abstract static class Builder { abstract Builder setPollingTimeout(Long miliseconds); + abstract Builder setStartOffset(Map startOffset); + + abstract Builder setOffsetRetainer(OffsetRetainer retainer); + abstract Read build(); } @@ -230,6 +238,74 @@ public Read withPollingTimeout(Long miliseconds) { return toBuilder().setPollingTimeout(miliseconds).build(); } + /** + * Sets a starting offset so the connector resumes consuming changes from a previously seen + * position rather than from the beginning of the change stream. + * + *

The offset format is connector-specific. You can capture the current offset for each + * processed record inside your {@link SourceRecordMapper} via {@link + * org.apache.kafka.connect.source.SourceRecord#sourceOffset()} and persist it externally (for + * example in Cloud Storage, a database, or a local file). On the next pipeline run, pass the + * last saved offset here. + * + *

Example (PostgreSQL): + * + *

{@code
+     * // Capture the offset inside the SourceRecordMapper:
+     * Map offset = sourceRecord.sourceOffset();
+     * // Persist 'offset' externally, then on restart:
+     * DebeziumIO.read()
+     *     .withConnectorConfiguration(config)
+     *     .withStartOffset(savedOffset)
+     *     .withFormatFunction(myMapper);
+     * }
+ * + * @param startOffset A map representing the resumption point, as returned by {@code + * SourceRecord#sourceOffset()}. + * @return PTransform {@link #read} + */ + public Read withStartOffset(Map startOffset) { + checkArgument(startOffset != null, "startOffset can not be null"); + return toBuilder().setStartOffset(startOffset).build(); + } + + /** + * Sets an {@link OffsetRetainer} that automatically saves and restores the connector offset, + * allowing the pipeline to resume from where it left off after a restart without any manual + * offset management. + * + *

When a retainer is configured: + * + *

    + *
  1. At pipeline startup, {@link OffsetRetainer#loadOffset()} is called. If a saved offset + * is found, the connector resumes from that position; otherwise it starts from the + * beginning of the change stream. + *
  2. After each successful checkpoint ({@code task.commit()}), {@link + * OffsetRetainer#saveOffset(Map)} is called with the latest committed offset. + *
+ * + *

The built-in {@link FileSystemOffsetRetainer} persists the offset as a JSON file on any + * Beam-compatible filesystem (local, GCS, S3, etc.): + * + *

{@code
+     * DebeziumIO.read()
+     *     .withConnectorConfiguration(config)
+     *     .withOffsetRetainer(
+     *         new FileSystemOffsetRetainer("gs://my-bucket/debezium/orders-offset.json"))
+     *     .withFormatFunction(myMapper);
+     * }
+ * + *

When both a retainer and {@link #withStartOffset(Map)} are set, the retainer takes + * precedence. Use {@link #withStartOffset(Map)} alone for a one-time manual override. + * + * @param retainer The {@link OffsetRetainer} to use for loading and saving offsets. + * @return PTransform {@link #read} + */ + public Read withOffsetRetainer(OffsetRetainer retainer) { + checkArgument(retainer != null, "retainer can not be null"); + return toBuilder().setOffsetRetainer(retainer).build(); + } + protected Schema getRecordSchema() { KafkaSourceConsumerFn fn = new KafkaSourceConsumerFn<>(getConnectorConfiguration().getConnectorClass().get(), this); diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java index 22a34ae2654b..4b431ac38ba2 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java @@ -18,6 +18,7 @@ package org.apache.beam.io.debezium; import com.google.auto.service.AutoService; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.sdk.expansion.ExternalTransformRegistrar; @@ -78,6 +79,8 @@ public static class Configuration extends CrossLanguageConfiguration { private @Nullable List connectionProperties; private @Nullable Long maxNumberOfRecords; private @Nullable Long maxTimeToRun; + private @Nullable List startOffset; + private @Nullable String offsetStoragePath; public void setConnectionProperties(@Nullable List connectionProperties) { this.connectionProperties = connectionProperties; @@ -90,6 +93,14 @@ public void setMaxNumberOfRecords(@Nullable Long maxNumberOfRecords) { public void setMaxTimeToRun(@Nullable Long maxTimeToRun) { this.maxTimeToRun = maxTimeToRun; } + + public void setStartOffset(@Nullable List startOffset) { + this.startOffset = startOffset; + } + + public void setOffsetStoragePath(@Nullable String offsetStoragePath) { + this.offsetStoragePath = offsetStoragePath; + } } @Override @@ -123,6 +134,33 @@ public PTransform> buildExternal(Configuration confi readTransform = readTransform.withMaxTimeToRun(configuration.maxTimeToRun); } + if (configuration.startOffset != null) { + Map startOffsetMap = new HashMap<>(); + for (String property : configuration.startOffset) { + String[] parts = property.split("=", 2); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid startOffset entry: \"" + + property + + "\". Expected format is \"key=value\"."); + } + String key = parts[0]; + String value = parts[1]; + try { + startOffsetMap.put(key, Long.parseLong(value)); + } catch (NumberFormatException e) { + startOffsetMap.put(key, value); + } + } + readTransform = readTransform.withStartOffset(startOffsetMap); + } + + if (configuration.offsetStoragePath != null) { + readTransform = + readTransform.withOffsetRetainer( + FileSystemOffsetRetainer.of(configuration.offsetStoragePath)); + } + return readTransform; } } diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java new file mode 100644 index 000000000000..552eccafe3ad --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link OffsetRetainer} that persists the Debezium connector offset as a JSON file using Beam's + * {@link FileSystems} abstraction. + * + *

The {@code path} argument can point to any filesystem supported by the active Beam runner, + * including local disk, Google Cloud Storage, Amazon S3, and others + * + *

On every {@code task.commit()}, the latest offset is serialised to JSON and written to the + * given path (overwriting the previous file). On pipeline startup the file is read back and the + * connector resumes from the stored position. If the file does not yet exist the connector starts + * from the beginning of the change stream. + * + *

Example — resume from GCS: + * + *

{@code
+ * DebeziumIO.read()
+ *     .withConnectorConfiguration(config)
+ *     .withOffsetRetainer(
+ *         FileSystemOffsetRetainer.of("gs://my-bucket/debezium/orders-offset.json"))
+ *     .withFormatFunction(myMapper);
+ * }
+ * + *

Example — local filesystem (useful for testing): + * + *

{@code
+ * DebeziumIO.read()
+ *     .withConnectorConfiguration(config)
+ *     .withOffsetRetainer(FileSystemOffsetRetainer.of("/tmp/debezium-offset.json"))
+ *     .withFormatFunction(myMapper);
+ * }
+ * + *

Note: writes are performed atomically: the offset is first written to a {@code .tmp} + * sibling file and then renamed to the final path, so a mid-write crash leaves the previous offset + * intact. + */ +public class FileSystemOffsetRetainer implements OffsetRetainer { + + private static final Logger LOG = LoggerFactory.getLogger(FileSystemOffsetRetainer.class); + private static final TypeReference> MAP_TYPE = new TypeReference<>() {}; + + private final String path; + + // ObjectMapper is thread-safe after configuration and does not need to be serialised. + private transient @Nullable ObjectMapper objectMapper; + + // Tracks the last successfully saved offset so repeated identical saves are skipped. + private transient @Nullable Map lastSavedOffset; + + private FileSystemOffsetRetainer(String path) { + this.path = path; + } + + /** Creates a new {@code FileSystemOffsetRetainer} that stores the offset at {@code path}. */ + public static FileSystemOffsetRetainer of(String path) { + return new FileSystemOffsetRetainer(path); + } + + private ObjectMapper mapper() { + if (objectMapper == null) { + objectMapper = new ObjectMapper(); + } + return objectMapper; + } + + /** + * Reads the offset JSON file and returns its contents, or {@code null} if the file does not yet + * exist (first run). Throws {@link RuntimeException} if the file exists but cannot be read, to + * prevent silently reprocessing data from the beginning. + */ + @Override + public @Nullable Map loadOffset() { + try { + ResourceId resourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false); + try (ReadableByteChannel channel = FileSystems.open(resourceId); + InputStream stream = Channels.newInputStream(channel)) { + Map offset = mapper().readValue(stream, MAP_TYPE); + LOG.info("OffsetRetainer: loaded offset from {}: {}", path, offset); + return offset; + } + } catch (FileNotFoundException e) { + LOG.info("OffsetRetainer: no offset file found at {}; starting from the beginning.", path); + return null; + } catch (IOException e) { + throw new RuntimeException( + "OffsetRetainer: failed to read offset from " + + path + + ". " + + "Delete the file to restart from the beginning.", + e); + } + } + + /** + * Serialises {@code offset} to JSON and writes it atomically to the configured path. + * + *

If the offset is identical to the last successfully written one, the write is skipped to + * avoid unnecessary I/O on every checkpoint. + * + *

Otherwise the data is first written to a {@code .tmp} sibling file and then renamed to the + * final path, so a mid-write crash leaves the previous offset intact. + * + *

Errors are logged as warnings and swallowed so the pipeline continues. + */ + @Override + public void saveOffset(Map offset) { + if (offset.equals(lastSavedOffset)) { + LOG.debug("OffsetRetainer: offset unchanged, skipping write to {}", path); + return; + } + String tmpPath = path + ".tmp"; + try { + ResourceId tmpResourceId = FileSystems.matchNewResource(tmpPath, /* isDirectory= */ false); + try (WritableByteChannel channel = FileSystems.create(tmpResourceId, "application/json"); + OutputStream stream = Channels.newOutputStream(channel)) { + mapper().writeValue(stream, offset); + } + ResourceId finalResourceId = FileSystems.matchNewResource(path, /* isDirectory= */ false); + FileSystems.rename( + Collections.singletonList(tmpResourceId), Collections.singletonList(finalResourceId)); + lastSavedOffset = offset; + LOG.debug("OffsetRetainer: saved offset to {}: {}", path, offset); + } catch (IOException e) { + LOG.warn( + "OffsetRetainer: failed to save offset to {}." + + " The offset will be lost if the pipeline restarts.", + path, + e); + } + } +} diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java index fb4c2f21458f..d298ddd9cafb 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java @@ -122,7 +122,21 @@ public class KafkaSourceConsumerFn extends DoFn, T> { @GetInitialRestriction public OffsetHolder getInitialRestriction(@Element Map unused) throws IOException { - return new OffsetHolder(null, null, null, spec.getMaxNumberOfRecords(), spec.getMaxTimeToRun()); + Map initialOffset = null; + + // Retainer takes precedence: it reflects the most recently committed position. + OffsetRetainer retainer = spec.getOffsetRetainer(); + if (retainer != null) { + initialOffset = retainer.loadOffset(); + } + + // Fall back to the explicit one-time override when the retainer has no saved offset. + if (initialOffset == null) { + initialOffset = spec.getStartOffset(); + } + + return new OffsetHolder( + initialOffset, null, null, spec.getMaxNumberOfRecords(), spec.getMaxTimeToRun()); } @NewTracker @@ -284,6 +298,16 @@ public ProcessContinuation process( receiver.outputWithTimestamp(json, recordInstant); } task.commit(); + + // Persist the offset after every successful commit so the pipeline can resume + // from this position on restart. + OffsetRetainer retainer = spec.getOffsetRetainer(); + @SuppressWarnings("unchecked") + Map committedOffset = + (Map) tracker.currentRestriction().offset; + if (retainer != null && committedOffset != null) { + retainer.saveOffset(committedOffset); + } } } catch (Exception ex) { throw new RuntimeException("Error occurred when consuming changes from Database. ", ex); diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java new file mode 100644 index 000000000000..b1fe5a58bbe1 --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import java.io.Serializable; +import java.util.Map; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Strategy interface for persisting and restoring Debezium connector offsets across pipeline + * restarts. + * + *

When configured via {@link DebeziumIO.Read#withOffsetRetainer(OffsetRetainer)}, the pipeline + * behaves as follows: + * + *

    + *
  1. On startup, {@link #loadOffset()} is called once. If a non-null offset is returned, the + * Debezium connector resumes from that position; otherwise it starts from the beginning of + * the change stream. + *
  2. After each successful {@code task.commit()}, {@link #saveOffset(Map)} is called with the + * latest committed offset. + *
+ * + *

A ready-to-use filesystem-based implementation is provided by {@link + * FileSystemOffsetRetainer}, which supports any Beam-compatible filesystem (local, GCS, S3, etc.) + * + *

Implementations must be {@link Serializable} because they are embedded inside {@link + * DebeziumIO.Read}, which is a {@link org.apache.beam.sdk.transforms.PTransform} that gets + * serialized and shipped to workers. + */ +public interface OffsetRetainer extends Serializable { + + /** + * Returns the most recently saved offset, or {@code null} if no offset has been saved yet. + * + *

A {@code null} return causes the connector to start from the beginning of the change stream. + * Implementations should handle transient I/O errors gracefully and return {@code null} on + * failure rather than propagating an exception. + */ + @Nullable + Map loadOffset(); + + /** + * Persists the given offset so it can be recovered after a pipeline restart. + * + *

Called after each successful {@code task.commit()} with the latest committed offset. + * Implementations should swallow transient errors rather than throwing, so that a failed save + * does not terminate the pipeline. + * + * @param offset The current connector offset, as returned by {@link + * org.apache.kafka.connect.source.SourceRecord#sourceOffset()}. + */ + void saveOffset(Map offset); +} diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java index 88ecc4fdd906..869fa5d34a55 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.io.debezium; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -24,9 +27,13 @@ import io.debezium.connector.mysql.MySqlConnector; import io.debezium.connector.mysql.MySqlConnectorConfig; import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; import java.util.Map; import org.apache.beam.io.debezium.DebeziumIO.ConnectorConfiguration; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.kafka.common.config.ConfigValue; +import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -95,4 +102,149 @@ public void testSourceConnectorNullUsernameAndPassword() { IllegalArgumentException.class, () -> MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password)); } + + @Test + public void testReadWithStartOffsetStoresOffset() { + Map offset = ImmutableMap.of("file", "mysql-bin.000003", "pos", 156L); + DebeziumIO.Read read = + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withStartOffset(offset); + assertEquals(offset, read.getStartOffset()); + } + + @Test + public void testReadWithoutStartOffsetIsNull() { + DebeziumIO.Read read = + DebeziumIO.read().withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION); + assertNull(read.getStartOffset()); + } + + @Test + public void testReadWithNullStartOffsetThrows() { + assertThrows( + IllegalArgumentException.class, + () -> + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withStartOffset(null)); + } + + @Test + public void testGetInitialRestrictionUsesStartOffset() throws Exception { + Map offset = ImmutableMap.of("file", "mysql-bin.000003", "pos", 156L); + DebeziumIO.Read spec = + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withStartOffset(offset); + KafkaSourceConsumerFn fn = new KafkaSourceConsumerFn<>(MySqlConnector.class, spec); + KafkaSourceConsumerFn.OffsetHolder restriction = fn.getInitialRestriction(null); + assertEquals(offset, restriction.offset); + } + + @Test + public void testGetInitialRestrictionWithoutStartOffsetIsNull() throws Exception { + DebeziumIO.Read spec = + DebeziumIO.read().withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION); + KafkaSourceConsumerFn fn = new KafkaSourceConsumerFn<>(MySqlConnector.class, spec); + KafkaSourceConsumerFn.OffsetHolder restriction = fn.getInitialRestriction(null); + assertNull(restriction.offset); + } + + // ---- OffsetRetainer tests ----------------------------------------------- + + /** Minimal in-memory retainer used only in tests. */ + private static class InMemoryOffsetRetainer implements OffsetRetainer { + private final @Nullable Map loadResult; + private @Nullable Map lastSaved; + + InMemoryOffsetRetainer(@Nullable Map loadResult) { + this.loadResult = loadResult; + } + + @Override + public @Nullable Map loadOffset() { + return loadResult; + } + + @Override + public void saveOffset(Map offset) { + lastSaved = new HashMap<>(offset); + } + } + + @Test + public void testWithOffsetRetainerStoresRetainer() { + InMemoryOffsetRetainer retainer = new InMemoryOffsetRetainer(null); + DebeziumIO.Read read = + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withOffsetRetainer(retainer); + assertNotNull(read.getOffsetRetainer()); + } + + @Test + public void testWithNullOffsetRetainerThrows() { + assertThrows( + IllegalArgumentException.class, + () -> + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withOffsetRetainer(null)); + } + + @Test + public void testGetInitialRestrictionUsesRetainerOffset() throws Exception { + Map savedOffset = ImmutableMap.of("lsn", 28160840L); + DebeziumIO.Read spec = + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withOffsetRetainer(new InMemoryOffsetRetainer(savedOffset)); + KafkaSourceConsumerFn fn = new KafkaSourceConsumerFn<>(MySqlConnector.class, spec); + KafkaSourceConsumerFn.OffsetHolder restriction = fn.getInitialRestriction(null); + assertEquals(savedOffset, restriction.offset); + } + + @Test + public void testRetainerTakesPriorityOverWithStartOffset() throws Exception { + Map retainerOffset = ImmutableMap.of("lsn", 99L); + Map explicitOffset = ImmutableMap.of("lsn", 1L); + DebeziumIO.Read spec = + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withStartOffset(explicitOffset) + .withOffsetRetainer(new InMemoryOffsetRetainer(retainerOffset)); + KafkaSourceConsumerFn fn = new KafkaSourceConsumerFn<>(MySqlConnector.class, spec); + KafkaSourceConsumerFn.OffsetHolder restriction = fn.getInitialRestriction(null); + assertEquals(retainerOffset, restriction.offset); + } + + @Test + public void testRetainerFallsBackToWithStartOffsetWhenLoadReturnsNull() throws Exception { + Map explicitOffset = ImmutableMap.of("lsn", 1L); + DebeziumIO.Read spec = + DebeziumIO.read() + .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION) + .withStartOffset(explicitOffset) + // Retainer has no saved offset yet (returns null). + .withOffsetRetainer(new InMemoryOffsetRetainer(null)); + KafkaSourceConsumerFn fn = new KafkaSourceConsumerFn<>(MySqlConnector.class, spec); + KafkaSourceConsumerFn.OffsetHolder restriction = fn.getInitialRestriction(null); + assertEquals(explicitOffset, restriction.offset); + } + + @Test + public void testBuildExternalThrowsOnMalformedStartOffsetEntry() { + DebeziumTransformRegistrar.ReadBuilder.Configuration config = + new DebeziumTransformRegistrar.ReadBuilder.Configuration(); + config.setUsername("user"); + config.setPassword("pass"); + config.setHost("localhost"); + config.setPort("3306"); + config.setConnectorClass("MySQL"); + config.setStartOffset(Arrays.asList("lsn=100", "no-equals-sign")); + assertThrows( + IllegalArgumentException.class, + () -> new DebeziumTransformRegistrar.ReadBuilder().buildExternal(config)); + } } diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java new file mode 100644 index 000000000000..e3ac0fff5daf --- /dev/null +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link FileSystemOffsetRetainer}. */ +@RunWith(JUnit4.class) +public class FileSystemOffsetRetainerTest { + + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public void setUp() { + FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create()); + } + + @Test + public void testLoadOffsetReturnNullWhenFileIsMissing() { + String path = tmpFolder.getRoot().getAbsolutePath() + "/nonexistent.json"; + assertNull(FileSystemOffsetRetainer.of(path).loadOffset()); + } + + @Test + public void testLoadOffsetThrowsWhenFileIsUnreadable() throws Exception { + String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json"; + FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path); + retainer.saveOffset(ImmutableMap.of("lsn", "100")); + + // Corrupt the file so JSON parsing fails. + Files.newBufferedWriter(Paths.get(path), StandardCharsets.UTF_8).close(); // truncate to empty + assertThrows(RuntimeException.class, retainer::loadOffset); + } + + @Test + public void testSaveAndLoadOffsetRoundTrip() { + String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json"; + FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path); + + retainer.saveOffset(ImmutableMap.of("file", "binlog.000001", "pos", "156")); + + Map loaded = retainer.loadOffset(); + assertNotNull(loaded); + assertEquals("binlog.000001", loaded.get("file")); + assertEquals("156", loaded.get("pos")); + } + + @Test + public void testSaveOffsetSkipsWriteWhenOffsetUnchanged() throws Exception { + String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json"; + FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path); + Map offset = ImmutableMap.of("lsn", "100"); + + retainer.saveOffset(offset); + long modifiedAfterFirstSave = new File(path).lastModified(); + + // Second call with the same offset should not touch the file. + Thread.sleep(10); // ensure mtime would differ if a write occurred + retainer.saveOffset(offset); + assertEquals(modifiedAfterFirstSave, new File(path).lastModified()); + } + + @Test + public void testSaveOffsetLeavesNoTmpFile() { + String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json"; + FileSystemOffsetRetainer.of(path).saveOffset(ImmutableMap.of("lsn", "28160840")); + + assertTrue("Final offset file should exist", new File(path).exists()); + assertFalse("Temp file should not remain after rename", new File(path + ".tmp").exists()); + } + + @Test + public void testSerializedRetainerCanLoadAfterDeserialization() throws Exception { + String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json"; + FileSystemOffsetRetainer original = FileSystemOffsetRetainer.of(path); + original.saveOffset(ImmutableMap.of("lsn", "12345")); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try (ObjectOutputStream oos = new ObjectOutputStream(bos)) { + oos.writeObject(original); + } + FileSystemOffsetRetainer deserialized; + try (ObjectInputStream ois = + new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) { + deserialized = (FileSystemOffsetRetainer) ois.readObject(); + } + + Map loaded = deserialized.loadOffset(); + assertNotNull(loaded); + assertEquals("12345", loaded.get("lsn")); + } +} diff --git a/sdks/python/apache_beam/io/debezium.py b/sdks/python/apache_beam/io/debezium.py index 26516fa4e4b7..d1ca02aa68d1 100644 --- a/sdks/python/apache_beam/io/debezium.py +++ b/sdks/python/apache_beam/io/debezium.py @@ -109,7 +109,9 @@ class DriverClassName(Enum): 'ReadFromDebeziumSchema', [('connector_class', str), ('username', str), ('password', str), ('host', str), ('port', str), ('max_number_of_records', Optional[int]), - ('connection_properties', List[str])]) + ('connection_properties', List[str]), + ('start_offset', Optional[List[str]]), + ('offset_storage_path', Optional[str])]) class ReadFromDebezium(PTransform): @@ -131,6 +133,8 @@ def __init__( port, max_number_of_records=None, connection_properties=None, + start_offset=None, + offset_storage_path=None, expansion_service=None): """ Initializes a read operation from Debezium. @@ -144,8 +148,38 @@ def __init__( to be fetched before stop. :param connection_properties: properties of the debezium connection passed as string - with with format - [propertyName=property;]* + with format [propertyName=property;]* + :param start_offset: starting offset to resume the connector from + a previously seen position. Provided as a list + of "key=value" strings, where numeric values are + encoded as their decimal string representation. + Example for PostgreSQL:: + + start_offset=["lsn=28160840"] + + Example for MySQL:: + + start_offset=["file=binlog.000001", "pos=156"] + + Obtain the offset from the JSON output of a + previous pipeline run (the "metadata" field + contains connector-specific position info) or + via ``SourceRecord.sourceOffset()`` in a custom + Java SourceRecordMapper. + :param offset_storage_path: path to a file where the connector offset + is automatically saved after each checkpoint + and loaded on pipeline startup, allowing the + pipeline to resume from where it left off. + Supports any filesystem available to the + Beam runner (local, GCS, S3, etc.). + Example:: + + offset_storage_path=( + "gs://my-bucket/debezium/offset.json" + ) + + When set, takes precedence over + ``start_offset``. :param expansion_service: The address (host:port) of the ExpansionService. """ @@ -157,7 +191,9 @@ def __init__( host=host, port=str(port), max_number_of_records=max_number_of_records, - connection_properties=connection_properties) + connection_properties=connection_properties, + start_offset=start_offset, + offset_storage_path=offset_storage_path) self.expansion_service = expansion_service or default_io_expansion_service() def expand(self, pbegin):