From 2d45339dbe9d1766aaff114c97a181ef7b86a150 Mon Sep 17 00:00:00 2001 From: tianz Date: Tue, 7 Apr 2026 19:56:05 +0000 Subject: [PATCH 1/3] use-local-beam --- .../SpannerChangeStreamsToBigQueryOptions.java | 11 +++++++++++ .../v2/options/SpannerChangeStreamsToGcsOptions.java | 11 +++++++++++ .../options/SpannerChangeStreamsToPubSubOptions.java | 11 +++++++++++ .../v2/templates/SpannerChangeStreamsToGcs.java | 10 +++++++++- .../v2/templates/SpannerChangeStreamsToPubSub.java | 10 +++++++++- .../SpannerChangeStreamsToBigQuery.java | 9 +++++++++ 6 files changed, 60 insertions(+), 2 deletions(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java index d92075fb06..df6d61ee88 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java @@ -247,4 +247,15 @@ public interface SpannerChangeStreamsToBigQueryOptions Boolean getDisableDlqRetries(); void setDisableDlqRetries(Boolean value); + + @TemplateParameter.Text( + order = 20, + optional = true, + groupName = "Source", + description = "Colon separated list of change streams TVF names to query and union", + helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java index 11adfdd2bf..ef6cbdf93a 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java @@ -186,4 +186,15 @@ public interface SpannerChangeStreamsToGcsOptions RpcPriority getRpcPriority(); void setRpcPriority(RpcPriority rpcPriority); + + @TemplateParameter.Text( + order = 15, + optional = true, + groupName = "Source", + description = "Colon separated list of change streams TVF names to query and union", + helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java index a28ab475ad..210c6ae97c 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java @@ -250,4 +250,15 @@ public interface SpannerChangeStreamsToPubSubOptions extends DataflowPipelineOpt Boolean getUseSpannerEmulatorHost(); void setUseSpannerEmulatorHost(Boolean value); + + @TemplateParameter.Text( + order = 20, + optional = true, + groupName = "Source", + description = "Colon separated list of change streams TVF names to query and union", + helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + @Default.String("") + String getSpannerChangeStreamTvfNameList(); + + void setSpannerChangeStreamTvfNameList(String value); } diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java index 99e8ae23f9..24982c64d6 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java @@ -24,6 +24,7 @@ import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreams; import com.google.cloud.teleport.v2.utils.DurationUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -137,6 +138,12 @@ public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) { ? null : options.getSpannerMetadataTableName(); + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + List tvfNameList = null; + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + tvfNameList = Arrays.asList(tvfNameListString.split(":")); + } + final RpcPriority rpcPriority = options.getRpcPriority(); SpannerConfig spannerConfig = SpannerConfig.create() @@ -162,7 +169,8 @@ public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) { .withInclusiveStartAt(startTimestamp) .withInclusiveEndAt(endTimestamp) .withRpcPriority(rpcPriority) - .withMetadataTable(metadataTableName)) + .withMetadataTable(metadataTableName) + .withTvfNameList(tvfNameList)) .apply( "Creating " + options.getWindowDuration() + " Window", Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java index 412f75851d..379469216c 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java @@ -24,6 +24,7 @@ import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -155,6 +156,12 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) { ? null : options.getSpannerMetadataTableName(); + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + List tvfNameList = null; + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + tvfNameList = Arrays.asList(tvfNameListString.split(":")); + } + final RpcPriority rpcPriority = options.getRpcPriority(); SpannerConfig spannerConfig = SpannerConfig.create() @@ -187,7 +194,8 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) { .withInclusiveStartAt(startTimestamp) .withInclusiveEndAt(endTimestamp) .withRpcPriority(rpcPriority) - .withMetadataTable(metadataTableName)) + .withMetadataTable(metadataTableName) + .withTvfNameList(tvfNameList)) .apply( "Convert each record to a PubsubMessage", FileFormatFactorySpannerChangeStreamsToPubSub.newBuilder() diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java index 222aef3d52..7f225df261 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -324,6 +325,14 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName); } + String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); + if (tvfNameListString != null && !tvfNameListString.isEmpty()) { + List tvfNameList = Arrays.asList(tvfNameListString.split(":")); + if (tvfNameList != null && !tvfNameList.isEmpty()) { + readChangeStream = readChangeStream.withTvfNameList(tvfNameList); + } + } + PCollection dataChangeRecord = pipeline .apply("Read from Spanner Change Streams", readChangeStream) From 92a50ac1e0e90ff22e610548314c0cacc5efc3ae Mon Sep 17 00:00:00 2001 From: tianz101 <169101161+tianz101@users.noreply.github.com> Date: Wed, 20 May 2026 15:14:25 -0700 Subject: [PATCH 2/3] Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../SpannerChangeStreamsToBigQuery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java index 7f225df261..4a68f9ccd4 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java @@ -328,7 +328,7 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); if (tvfNameListString != null && !tvfNameListString.isEmpty()) { List tvfNameList = Arrays.asList(tvfNameListString.split(":")); - if (tvfNameList != null && !tvfNameList.isEmpty()) { + if (!tvfNameList.isEmpty()) { readChangeStream = readChangeStream.withTvfNameList(tvfNameList); } } From c01c44014be3ac151e6d8b749999f8d317a9f41e Mon Sep 17 00:00:00 2001 From: tianz Date: Tue, 7 Apr 2026 19:56:05 +0000 Subject: [PATCH 3/3] use-local-beam --- .../v2/options/SpannerChangeStreamsToBigQueryOptions.java | 7 ++++--- .../v2/options/SpannerChangeStreamsToGcsOptions.java | 5 +++-- .../v2/options/SpannerChangeStreamsToPubSubOptions.java | 5 +++-- .../teleport/v2/templates/SpannerChangeStreamsToGcs.java | 6 +++++- .../v2/templates/SpannerChangeStreamsToPubSub.java | 6 +++++- .../SpannerChangeStreamsToBigQuery.java | 5 ++++- 6 files changed, 24 insertions(+), 10 deletions(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java index df6d61ee88..39c0f75c9f 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToBigQueryOptions.java @@ -248,12 +248,13 @@ public interface SpannerChangeStreamsToBigQueryOptions void setDisableDlqRetries(Boolean value); - @TemplateParameter.Text( + @TemplateParameter.Text( order = 20, optional = true, groupName = "Source", - description = "Colon separated list of change streams TVF names to query and union", - helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + description = + "Semicolon-separated list of Spanner Change Stream TVF names to query and union.", + helpText = "Semicolon-separated list of Spanner Change Stream TVF names to query and union.") @Default.String("") String getSpannerChangeStreamTvfNameList(); diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java index ef6cbdf93a..efb24e70f7 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToGcsOptions.java @@ -191,8 +191,9 @@ public interface SpannerChangeStreamsToGcsOptions order = 15, optional = true, groupName = "Source", - description = "Colon separated list of change streams TVF names to query and union", - helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + description = + "Semicolon-separated list of Spanner Change Stream TVF names to query and union.", + helpText = "Semicolon-separated list of Spanner Change Stream TVF names to query and union.") @Default.String("") String getSpannerChangeStreamTvfNameList(); diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java index 210c6ae97c..a4c078dbe1 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/SpannerChangeStreamsToPubSubOptions.java @@ -255,8 +255,9 @@ public interface SpannerChangeStreamsToPubSubOptions extends DataflowPipelineOpt order = 20, optional = true, groupName = "Source", - description = "Colon separated list of change streams TVF names to query and union", - helpText = "Colon separated list of change streams TVF names to query and union. Note that using colon instead of comma because gcloud does not allow comma in the parameter value.") + description = + "Semicolon-separated list of Spanner Change Stream TVF names to query and union.", + helpText = "Semicolon-separated list of Spanner Change Stream TVF names to query and union.") @Default.String("") String getSpannerChangeStreamTvfNameList(); diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java index 24982c64d6..891bb0ea1e 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToGcs.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -141,7 +142,10 @@ public static PipelineResult run(SpannerChangeStreamsToGcsOptions options) { String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); List tvfNameList = null; if (tvfNameListString != null && !tvfNameListString.isEmpty()) { - tvfNameList = Arrays.asList(tvfNameListString.split(":")); + tvfNameList = + Arrays.stream(tvfNameListString.split(";")) + .filter(name -> !name.trim().isEmpty()) + .collect(Collectors.toList()); } final RpcPriority rpcPriority = options.getRpcPriority(); diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java index 379469216c..84624af019 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamsToPubSub.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; @@ -159,7 +160,10 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) { String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); List tvfNameList = null; if (tvfNameListString != null && !tvfNameListString.isEmpty()) { - tvfNameList = Arrays.asList(tvfNameListString.split(":")); + tvfNameList = + Arrays.stream(tvfNameListString.split(";")) + .filter(name -> !name.trim().isEmpty()) + .collect(Collectors.toList()); } final RpcPriority rpcPriority = options.getRpcPriority(); diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java index 4a68f9ccd4..5cfb8dae53 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/SpannerChangeStreamsToBigQuery.java @@ -327,7 +327,10 @@ public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) String tvfNameListString = options.getSpannerChangeStreamTvfNameList(); if (tvfNameListString != null && !tvfNameListString.isEmpty()) { - List tvfNameList = Arrays.asList(tvfNameListString.split(":")); + List tvfNameList = + Arrays.stream(tvfNameListString.split(";")) + .filter(name -> !name.trim().isEmpty()) + .collect(Collectors.toList()); if (!tvfNameList.isEmpty()) { readChangeStream = readChangeStream.withTvfNameList(tvfNameList); }