diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java index d92075fb06..39c0f75c9f 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java @@ -247,4 +247,16 @@ public interface SpannerChangeStreamsToBigQueryOptions Boolean getDisableDlqRetries(); void setDisableDlqRetries(Boolean value); + + @TemplateParameter.Text( + order = 20, + optional = true, + groupName = "Source", + description = + "Semicolon-separated list of Spanner Change Stream TVF names to query and union.", + helpText = "Semicolon-separated list of Spanner Change Stream TVF names to query and union.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java index 11adfdd2bf..efb24e70f7 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java @@ -186,4 +186,16 @@ public interface SpannerChangeStreamsToGcsOptions RpcPriority getRpcPriority(); void setRpcPriority(RpcPriority rpcPriority); + + @TemplateParameter.Text( + order = 15, + optional = true, + groupName = "Source", + description = + "Semicolon-separated list of Spanner Change Stream TVF names to query and union.", + helpText = "Semicolon-separated list of Spanner Change Stream TVF names to query and union.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java index a28ab475ad..a4c078dbe1 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java @@ -250,4 +250,16 @@ public interface SpannerChangeStreamsToPubSubOptions extends DataflowPipelineOpt Boolean getUseSpannerEmulatorHost(); void setUseSpannerEmulatorHost(Boolean value); + + @TemplateParameter.Text( + order = 20, + optional = true, + groupName = "Source", + description = + "Semicolon-separated list of Spanner Change Stream TVF names to query and union.", + helpText = "Semicolon-separated list of Spanner Change Stream TVF names to query and union.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java index 99e8ae23f9..891bb0ea1e 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java @@ -24,7 +24,9 @@ import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams; import com.google.cloud.teleport.v2.utils.DurationUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -137,6 +139,15 @@ public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) { ? null : options.getSpannerMetadataTableName(); + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + List tvfNameList = null; + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + tvfNameList = + Arrays.stream(tvfNameListString.split(";")) + .filter(name -> !name.trim().isEmpty()) + .collect(Collectors.toList()); + } + final RpcPriority rpcPriority = options.getRpcPriority(); SpannerConfig spannerConfig = SpannerConfig.create() @@ -162,7 +173,8 @@ public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) { .withInclusiveStartAt(startTimestamp) .withInclusiveEndAt(endTimestamp) .withRpcPriority(rpcPriority) - .withMetadataTable(metadataTableName)) + .withMetadataTable(metadataTableName) + .withTvfNameList(tvfNameList)) .apply( "Creating " + options.getWindowDuration() + " Window", Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java index 412f75851d..84624af019 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java @@ -24,7 +24,9 @@ import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -155,6 +157,15 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) { ? null : options.getSpannerMetadataTableName(); + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + List tvfNameList = null; + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + tvfNameList = + Arrays.stream(tvfNameListString.split(";")) + .filter(name -> !name.trim().isEmpty()) + .collect(Collectors.toList()); + } + final RpcPriority rpcPriority = options.getRpcPriority(); SpannerConfig spannerConfig = SpannerConfig.create() @@ -187,7 +198,8 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) { .withInclusiveStartAt(startTimestamp) .withInclusiveEndAt(endTimestamp) .withRpcPriority(rpcPriority) - .withMetadataTable(metadataTableName)) + .withMetadataTable(metadataTableName) + .withTvfNameList(tvfNameList)) .apply( "Convert each record to a PubsubMessage", FileFormatFactorySpannerChangeStreamsToPubSub.newBuilder() diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java index 222aef3d52..5cfb8dae53 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -324,6 +325,17 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName); } + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + List tvfNameList = + Arrays.stream(tvfNameListString.split(";")) + .filter(name -> !name.trim().isEmpty()) + .collect(Collectors.toList()); + if (!tvfNameList.isEmpty()) { + readChangeStream = readChangeStream.withTvfNameList(tvfNameList); + } + } + PCollection dataChangeRecord = pipeline .apply("Read from Spanner Change Streams", readChangeStream)