From 873083862a2396e84599869d33ea89f8e9917f20 Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Wed, 25 Sep 2024 11:22:05 -0700 Subject: [PATCH 01/10] pubsub to text with attributes --- .../templates/pubsubtotext/PubsubToText.java | 57 +++++++++++++++++-- 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java index 0b66afd011..19a42bce80 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java @@ -24,19 +24,25 @@ import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options; import com.google.cloud.teleport.v2.utils.DurationUtils; import com.google.common.base.Strings; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation.Required; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; /** * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into @@ -65,6 +71,7 @@ }, streaming = true, supportsAtLeastOnce = true) + public class PubsubToText { /** @@ -72,6 +79,7 @@ public class PubsubToText { * *

Inherits standard configuration options. */ + public interface Options extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions { @@ -151,6 +159,18 @@ public interface Options String getOutputFilenameSuffix(); void setOutputFilenameSuffix(String value); + + @TemplateParameter.Text( + order = 7, + groupName = "Target", + optional = true, + description = "Include attributes in pull", + helpText = "If specified, pull the message and the attributes from the topic or subscription", + example = "True,False" + ) + String getAttributeFlag(); + + void setAttributeFlag(String value); } /** @@ -176,6 +196,8 @@ public static void main(String[] args) { */ public static PipelineResult run(Options options) { boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription()); + boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag()); + boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic()); if (useInputSubscription == useInputTopic) { throw new IllegalArgumentException( @@ -187,17 +209,40 @@ public static PipelineResult run(Options options) { PCollection messages = null; - /* + /* * Steps: * 1) Read string messages from PubSub * 2) Window the messages into minute intervals specified by the executor. * 3) Output the windowed files to GCS */ + if (useInputSubscription) { - messages = - pipeline.apply( - "Read PubSub Events", - PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + if (pullAttributes){ + PCollection messagesAttr = pipeline.apply( + "Read PubSub Events", + PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())); + messages = messagesAttr.apply( + "ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class)) + .via((PubsubMessage message) -> { + // Get the message payload as a string + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + // Get the message attributes and convert them to a JSON-like string format + Map attributes = message.getAttributeMap(); + String attributesString = attributes.entrySet() + .stream() + .map(entry -> "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"") + .collect(Collectors.joining(", ", "{", "}")); + + // Return the concatenated string with both the payload and attributes + return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }"; + }) + ); + }else{ + messages = + pipeline.apply( + "Read PubSub Events", + PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + } } else { messages = pipeline.apply( @@ -245,4 +290,4 @@ public static PipelineResult run(Options options) { private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) { return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation; } -} +} \ No newline at end of file From be44aaf67af38ec5876260ce189332edc56b9826 Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Thu, 3 Oct 2024 13:35:59 -0700 Subject: [PATCH 02/10] wip --- .../cloudbuild.java.googlecloud.pr.yaml | 32 +++++++++++++++++++ .../templates/pubsubtotext/PubsubToText.java | 2 +- 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 cloudbuild/cloudbuild.java.googlecloud.pr.yaml diff --git a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml new file mode 100644 index 0000000000..b568341873 --- /dev/null +++ b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml @@ -0,0 +1,32 @@ +steps: + + # --------- + # Update the flex template + # --------- + + - id: "update flex-template" + waitFor: ['-'] + name: "gcr.io/cloud-builders/gcloud" + args: [ + "mvn clean package -PtemplatesRun" + "-DskipTests" + "-DprojectId=$PROJECT" + "-DbucketName=${_TEMPLATE_BUCKET}" + "-Dregion=us-central1" + "-DjobName=cloud-pubsub-to-gcs-text-flex-job" + "-DtemplateName=Cloud_PubSub_to_GCS_Text_Flex_w_attrs" + "-Dparameters=inputTopic={$_INPUT_TOPIC},inputSubscription={$_INPUT_SUBSCRIPTION},outputDirectory=${_OUTPUT_DIRECTORY},userTempLocation=${_USER_TEMP_LOCATION},outputFilenamePrefix=${_OUTPUT_FILENAME_PREFIX}" + "-f v2/googlecloud-to-googlecloud" + ] + +timeout: 1200s + +substitutions: + _TARGET_PROJECT_ID: onx-dw-raw + _TEMPLATE_BUCKET: gs://onx-flex-templates + _TEMPLATE_PATH: /templates/dev/${_PR_NUMBER}/onx_dw_beam_pipelines.json + _TEMPLATE_IMAGE: us-central1-docker.pkg.dev/${_TARGET_PROJECT_ID}/onx-dw-beam-pipelines/dev/flex:${_PR_NUMBER} + _SDK_IMAGE: us-central1-docker.pkg.dev/${_TARGET_PROJECT_ID}/onx-dw-beam-pipelines/dev/sdk:${_PR_NUMBER} + _CORE_IMAGE: us-central1-docker.pkg.dev/${_TARGET_PROJECT_ID}/onx-dw-beam-pipelines/dev/core:${_PR_NUMBER} +options: + dynamic_substitutions: true diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java index 19a42bce80..27ebfae287 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java @@ -164,7 +164,7 @@ public interface Options order = 7, groupName = "Target", optional = true, - description = "Include attributes in pull", + description = "Optionally include attributes in pubsub pull", helpText = "If specified, pull the message and the attributes from the topic or subscription", example = "True,False" ) From 7b21288bd9b079ce95559b0b0ff68e89f9eb163c Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Thu, 3 Oct 2024 15:00:07 -0700 Subject: [PATCH 03/10] add cloudbuild for template --- .../cloudbuild.java.googlecloud.pr.yaml | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml index b568341873..3778d232f7 100644 --- a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml +++ b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml @@ -10,23 +10,37 @@ steps: args: [ "mvn clean package -PtemplatesRun" "-DskipTests" - "-DprojectId=$PROJECT" + "-DprojectId=${_TARGET_PROJECT_ID}" "-DbucketName=${_TEMPLATE_BUCKET}" "-Dregion=us-central1" "-DjobName=cloud-pubsub-to-gcs-text-flex-job" "-DtemplateName=Cloud_PubSub_to_GCS_Text_Flex_w_attrs" - "-Dparameters=inputTopic={$_INPUT_TOPIC},inputSubscription={$_INPUT_SUBSCRIPTION},outputDirectory=${_OUTPUT_DIRECTORY},userTempLocation=${_USER_TEMP_LOCATION},outputFilenamePrefix=${_OUTPUT_FILENAME_PREFIX}" + "-Dparameters=inputTopic={$_INPUT_TOPIC},inputSubscription={$_INPUT_SUBSCRIPTION},outputDirectory=${_OUTPUT_DIRECTORY},userTempLocation=${_USER_TEMP_LOCATION},outputFilenamePrefix=${_OUTPUT_FILENAME_PREFIX},outputShardTemplate=${_OUTPUT_SHARD_TEMPLATE},numShards=${_NUM_SHARDS},windowDuration=${_WINDOW_DURATION},yearPattern=${_YEAR_PATTERN},monthPattern=${_MONTH_PATTERN},dayPattern=${_DAY_PATTERN},hourPattern=${_HOUR_PATTERN},minutePattern=${_MINUTE_PATTERN}" "-f v2/googlecloud-to-googlecloud" ] timeout: 1200s substitutions: + ### Required + _OUTPUT_DIRECTORY: gs://analytics_events_warehouse/errors_w_attrs + + ### Optional + _INPUT_TOPIC: dataflow-failure + _INPUT_SUBSCRIPTION: failed_dataflow_transformation_develop + _USER_TEMP_LOCATION: gs://dataflow-staging-us-central1-739390938599/tmp + _OUTPUT_FILENAME_PREFIX: error + _OUTPUT_FILENAME_SUFFIX: "" + _OUTPUT_SHARD_TEMPLATE: W-P-SS-of-NN + _NUM_SHARDS: 0 + _WINDOW_DURATION: 5m + _YEAR_PATTERN: YYYY + _MONTH_PATTERN: MM + _DAY_PATTERN: dd + _HOUR_PATTERN: HH + _MINUTE_PATTERN: mm + _TARGET_PROJECT_ID: onx-dw-raw _TEMPLATE_BUCKET: gs://onx-flex-templates - _TEMPLATE_PATH: /templates/dev/${_PR_NUMBER}/onx_dw_beam_pipelines.json - _TEMPLATE_IMAGE: us-central1-docker.pkg.dev/${_TARGET_PROJECT_ID}/onx-dw-beam-pipelines/dev/flex:${_PR_NUMBER} - _SDK_IMAGE: us-central1-docker.pkg.dev/${_TARGET_PROJECT_ID}/onx-dw-beam-pipelines/dev/sdk:${_PR_NUMBER} - _CORE_IMAGE: us-central1-docker.pkg.dev/${_TARGET_PROJECT_ID}/onx-dw-beam-pipelines/dev/core:${_PR_NUMBER} options: dynamic_substitutions: true From b7364d915745ed8c38e78840e26b04aa9390e7bb Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Mon, 7 Oct 2024 13:46:30 -0700 Subject: [PATCH 04/10] wip --- cloudbuild/cloudbuild.java.googlecloud.pr.yaml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml index 3778d232f7..de8b925771 100644 --- a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml +++ b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml @@ -8,14 +8,14 @@ steps: waitFor: ['-'] name: "gcr.io/cloud-builders/gcloud" args: [ - "mvn clean package -PtemplatesRun" - "-DskipTests" - "-DprojectId=${_TARGET_PROJECT_ID}" - "-DbucketName=${_TEMPLATE_BUCKET}" - "-Dregion=us-central1" - "-DjobName=cloud-pubsub-to-gcs-text-flex-job" - "-DtemplateName=Cloud_PubSub_to_GCS_Text_Flex_w_attrs" - "-Dparameters=inputTopic={$_INPUT_TOPIC},inputSubscription={$_INPUT_SUBSCRIPTION},outputDirectory=${_OUTPUT_DIRECTORY},userTempLocation=${_USER_TEMP_LOCATION},outputFilenamePrefix=${_OUTPUT_FILENAME_PREFIX},outputShardTemplate=${_OUTPUT_SHARD_TEMPLATE},numShards=${_NUM_SHARDS},windowDuration=${_WINDOW_DURATION},yearPattern=${_YEAR_PATTERN},monthPattern=${_MONTH_PATTERN},dayPattern=${_DAY_PATTERN},hourPattern=${_HOUR_PATTERN},minutePattern=${_MINUTE_PATTERN}" + "mvn clean package -PtemplatesRun", + "-DskipTests", + "-DprojectId=${_TARGET_PROJECT_ID}", + "-DbucketName=${_TEMPLATE_BUCKET}", + "-Dregion=us-central1", + "-DjobName=cloud-pubsub-to-gcs-text-flex-job", + "-DtemplateName=Cloud_PubSub_to_GCS_Text_Flex_w_attrs", + "-Dparameters=inputTopic={$_INPUT_TOPIC},inputSubscription={$_INPUT_SUBSCRIPTION},outputDirectory=${_OUTPUT_DIRECTORY},userTempLocation=${_USER_TEMP_LOCATION},outputFilenamePrefix=${_OUTPUT_FILENAME_PREFIX},outputShardTemplate=${_OUTPUT_SHARD_TEMPLATE},numShards=${_NUM_SHARDS},windowDuration=${_WINDOW_DURATION},yearPattern=${_YEAR_PATTERN},monthPattern=${_MONTH_PATTERN},dayPattern=${_DAY_PATTERN},hourPattern=${_HOUR_PATTERN},minutePattern=${_MINUTE_PATTERN}", "-f v2/googlecloud-to-googlecloud" ] From 22541f7c461a11f42ef823cf66fea9a91c8f911b Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Mon, 7 Oct 2024 13:49:15 -0700 Subject: [PATCH 05/10] formatting --- .../templates/pubsubtotext/PubsubToText.java | 399 +++++++++--------- 1 file changed, 200 insertions(+), 199 deletions(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java index 27ebfae287..3c31ed7f58 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java @@ -57,237 +57,238 @@ category = TemplateCategory.STREAMING, displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage", description = - "The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records " - + "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.", + "The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records " + + "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.", optionsClass = Options.class, flexContainerName = "pubsub-to-text", documentation = - "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text", + "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text", contactInformation = "https://cloud.google.com/support", requirements = { - "The Pub/Sub topic or subscription must exist prior to execution.", - "The messages published to the topic must be in text format.", - "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file." + "The Pub/Sub topic or subscription must exist prior to execution.", + "The messages published to the topic must be in text format.", + "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file." }, streaming = true, supportsAtLeastOnce = true) public class PubsubToText { - /** - * Options supported by the pipeline. - * - *

Inherits standard configuration options. - */ - - public interface Options - extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions { - - @TemplateParameter.PubsubTopic( - order = 1, - groupName = "Source", - optional = true, - description = "Pub/Sub input topic", - helpText = - "The Pub/Sub topic to read the input from. The topic name should be in the format " - + "`projects//topics/`. If this parameter is provided " - + "don't use `inputSubscription`.", - example = "projects/your-project-id/topics/your-topic-name") - String getInputTopic(); - - void setInputTopic(String value); - - @TemplateParameter.PubsubSubscription( - order = 2, - groupName = "Source", - optional = true, - description = "Pub/Sub input subscription", - helpText = - "The Pub/Sub subscription to read the input from. The subscription name uses the format " - + "`projects//subscription/`. If this parameter is " - + "provided, don't use `inputTopic`.", - example = "projects/your-project-id/subscriptions/your-subscription-name") - String getInputSubscription(); - - void setInputSubscription(String value); - - @TemplateParameter.GcsWriteFolder( - order = 3, - groupName = "Target", - description = "Output file directory in Cloud Storage", - helpText = - "The path and filename prefix to write write output files to. " - + "This value must end in a slash.", - example = "gs://your-bucket/your-path") - @Required - String getOutputDirectory(); - - void setOutputDirectory(String value); - - @TemplateParameter.GcsWriteFolder( - order = 4, - optional = true, - description = "User provided temp location", - helpText = + /** + * Options supported by the pipeline. + * + *

Inherits standard configuration options. + */ + + public interface Options + extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions { + + @TemplateParameter.PubsubTopic( + order = 1, + groupName = "Source", + optional = true, + description = "Pub/Sub input topic", + helpText = + "The Pub/Sub topic to read the input from. The topic name should be in the format " + + "`projects//topics/`. If this parameter is provided " + + "don't use `inputSubscription`.", + example = "projects/your-project-id/topics/your-topic-name") + String getInputTopic(); + + void setInputTopic(String value); + + @TemplateParameter.PubsubSubscription( + order = 2, + groupName = "Source", + optional = true, + description = "Pub/Sub input subscription", + helpText = + "The Pub/Sub subscription to read the input from. The subscription name uses the format " + + "`projects//subscription/`. If this parameter is " + + "provided, don't use `inputTopic`.", + example = "projects/your-project-id/subscriptions/your-subscription-name") + String getInputSubscription(); + + void setInputSubscription(String value); + + @TemplateParameter.GcsWriteFolder( + order = 3, + groupName = "Target", + description = "Output file directory in Cloud Storage", + helpText = + "The path and filename prefix to write write output files to. " + + "This value must end in a slash.", + example = "gs://your-bucket/your-path") + @Required + String getOutputDirectory(); + + void setOutputDirectory(String value); + + @TemplateParameter.GcsWriteFolder( + order = 4, + optional = true, + description = "User provided temp location", + helpText = "The user provided directory to output temporary files to. Must end with a slash.") - String getUserTempLocation(); - - void setUserTempLocation(String value); - - @TemplateParameter.Text( - order = 5, - groupName = "Target", - optional = true, - description = "Output filename prefix of the files to write", - helpText = "The prefix to place on each windowed file.", - example = "output-") - @Default.String("output") - @Required - String getOutputFilenamePrefix(); - - void setOutputFilenamePrefix(String value); - - @TemplateParameter.Text( - order = 6, - groupName = "Target", - optional = true, - description = "Output filename suffix of the files to write", - helpText = + String getUserTempLocation(); + + void setUserTempLocation(String value); + + @TemplateParameter.Text( + order = 5, + groupName = "Target", + optional = true, + description = "Output filename prefix of the files to write", + helpText = "The prefix to place on each windowed file.", + example = "output-") + @Default.String("output") + @Required + String getOutputFilenamePrefix(); + + void setOutputFilenamePrefix(String value); + + @TemplateParameter.Text( + order = 6, + groupName = "Target", + optional = true, + description = "Output filename suffix of the files to write", + helpText = "The suffix to place on each windowed file, typically a file extension such as `.txt` or `.csv`.", - example = ".txt") - @Default.String("") - String getOutputFilenameSuffix(); - - void setOutputFilenameSuffix(String value); - - @TemplateParameter.Text( - order = 7, - groupName = "Target", - optional = true, - description = "Optionally include attributes in pubsub pull", - helpText = "If specified, pull the message and the attributes from the topic or subscription", - example = "True,False" - ) - String getAttributeFlag(); - - void setAttributeFlag(String value); - } - - /** - * Main entry point for executing the pipeline. - * - * @param args The command-line arguments to the pipeline. - */ - public static void main(String[] args) { - UncaughtExceptionLogger.register(); - - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - options.setStreaming(true); - - run(options); - } - - /** - * Runs the pipeline with the supplied options. - * - * @param options The execution parameters to the pipeline. - * @return The result of the pipeline execution. - */ - public static PipelineResult run(Options options) { - boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription()); - boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag()); - - boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic()); - if (useInputSubscription == useInputTopic) { - throw new IllegalArgumentException( - "Either input topic or input subscription must be provided, but not both."); + example = ".txt") + @Default.String("") + String getOutputFilenameSuffix(); + + void setOutputFilenameSuffix(String value); + + @TemplateParameter.Text( + order = 7, + groupName = "Target", + optional = true, + description = "Optionally include attributes in pubsub pull", + helpText = "If specified, pull the message and the attributes from the topic or subscription", + example = "True,False" + ) + String getAttributeFlag(); + + void setAttributeFlag(String value); } - // Create the pipeline - Pipeline pipeline = Pipeline.create(options); + /** + * Main entry point for executing the pipeline. + * + * @param args The command-line arguments to the pipeline. + */ + public static void main(String[] args) { + UncaughtExceptionLogger.register(); - PCollection messages = null; + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - /* - * Steps: - * 1) Read string messages from PubSub - * 2) Window the messages into minute intervals specified by the executor. - * 3) Output the windowed files to GCS + options.setStreaming(true); + + run(options); + } + + /** + * Runs the pipeline with the supplied options. + * + * @param options The execution parameters to the pipeline. + * @return The result of the pipeline execution. */ + public static PipelineResult run(Options options) { + boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription()); + boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag()); + + boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic()); + if (useInputSubscription == useInputTopic) { + throw new IllegalArgumentException( + "Either input topic or input subscription must be provided, but not both."); + } + + // Create the pipeline + Pipeline pipeline = Pipeline.create(options); + + PCollection < String > messages = null; - if (useInputSubscription) { - if (pullAttributes){ - PCollection messagesAttr = pipeline.apply( - "Read PubSub Events", - PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())); - messages = messagesAttr.apply( - "ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class)) - .via((PubsubMessage message) -> { - // Get the message payload as a string - String payload = new String(message.getPayload(), StandardCharsets.UTF_8); - // Get the message attributes and convert them to a JSON-like string format - Map attributes = message.getAttributeMap(); - String attributesString = attributes.entrySet() + /* + * Steps: + * 1) Read string messages from PubSub + * 2) Window the messages into minute intervals specified by the executor. + * 3) Output the windowed files to GCS + */ + + if (useInputSubscription) { + if (pullAttributes) { + PCollection < PubsubMessage > messagesAttr = pipeline.apply( + "Read PubSub Events", + PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())); + messages = messagesAttr.apply( + "ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class)) + .via((PubsubMessage message) - > { + // Get the message payload as a string + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + // Get the message attributes and convert them to a JSON-like string format + Map < String, + String > attributes = message.getAttributeMap(); + String attributesString = attributes.entrySet() .stream() - .map(entry -> "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"") + .map(entry - > "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"") .collect(Collectors.joining(", ", "{", "}")); - // Return the concatenated string with both the payload and attributes - return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }"; - }) - ); - }else{ + // Return the concatenated string with both the payload and attributes + return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }"; + }) + ); + } else { + messages = + pipeline.apply( + "Read PubSub Events", + PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + } + } else { messages = pipeline.apply( - "Read PubSub Events", - PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic())); } - } else { - messages = - pipeline.apply( - "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic())); - } - messages - .apply( - options.getWindowDuration() + " Window", - Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))) - - // Apply windowed file writes - .apply( - "Write File(s)", - TextIO.write() + messages + .apply( + options.getWindowDuration() + " Window", + Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))) + + // Apply windowed file writes + .apply( + "Write File(s)", + TextIO.write() .withWindowedWrites() .withNumShards(options.getNumShards()) .to( WindowedFilenamePolicy.writeWindowedFiles() - .withOutputDirectory(options.getOutputDirectory()) - .withOutputFilenamePrefix(options.getOutputFilenamePrefix()) - .withShardTemplate(options.getOutputShardTemplate()) - .withSuffix(options.getOutputFilenameSuffix()) - .withYearPattern(options.getYearPattern()) - .withMonthPattern(options.getMonthPattern()) - .withDayPattern(options.getDayPattern()) - .withHourPattern(options.getHourPattern()) - .withMinutePattern(options.getMinutePattern())) + .withOutputDirectory(options.getOutputDirectory()) + .withOutputFilenamePrefix(options.getOutputFilenamePrefix()) + .withShardTemplate(options.getOutputShardTemplate()) + .withSuffix(options.getOutputFilenameSuffix()) + .withYearPattern(options.getYearPattern()) + .withMonthPattern(options.getMonthPattern()) + .withDayPattern(options.getDayPattern()) + .withHourPattern(options.getHourPattern()) + .withMinutePattern(options.getMinutePattern())) .withTempDirectory( FileBasedSink.convertToFileResourceIfPossible( maybeUseUserTempLocation( options.getUserTempLocation(), options.getOutputDirectory())))); - // Execute the pipeline and return the result. - return pipeline.run(); - } - - /** - * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful - * when output bucket is locked and temporary data cannot be deleted. - * - * @param userTempLocation user provided temp location - * @param outputLocation user provided outputDirectory to be used as the default temp location - * @return userTempLocation if available, otherwise outputLocation is returned. - */ - private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) { - return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation; - } + // Execute the pipeline and return the result. + return pipeline.run(); + } + + /** + * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful + * when output bucket is locked and temporary data cannot be deleted. + * + * @param userTempLocation user provided temp location + * @param outputLocation user provided outputDirectory to be used as the default temp location + * @return userTempLocation if available, otherwise outputLocation is returned. + */ + private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) { + return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation; + } } \ No newline at end of file From 51e2d34956cafb3cb16a562d5f877cc9172b606e Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Mon, 7 Oct 2024 13:50:43 -0700 Subject: [PATCH 06/10] wip --- cloudbuild/cloudbuild.java.googlecloud.pr.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml index de8b925771..89a5055099 100644 --- a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml +++ b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml @@ -32,7 +32,7 @@ substitutions: _OUTPUT_FILENAME_PREFIX: error _OUTPUT_FILENAME_SUFFIX: "" _OUTPUT_SHARD_TEMPLATE: W-P-SS-of-NN - _NUM_SHARDS: 0 + _NUM_SHARDS: "0" _WINDOW_DURATION: 5m _YEAR_PATTERN: YYYY _MONTH_PATTERN: MM From 65586993fbea643d26b1aabe8ac301094a1e1eee Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Mon, 7 Oct 2024 13:55:57 -0700 Subject: [PATCH 07/10] wip --- cloudbuild/cloudbuild.java.googlecloud.pr.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml index 89a5055099..fad12fef5d 100644 --- a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml +++ b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml @@ -6,7 +6,7 @@ steps: - id: "update flex-template" waitFor: ['-'] - name: "gcr.io/cloud-builders/gcloud" + name: "gcr.io/cloud-builders/mvn" args: [ "mvn clean package -PtemplatesRun", "-DskipTests", From 52cca7f1c30aa504d1e49b08b75033158f5881f7 Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Mon, 7 Oct 2024 13:57:39 -0700 Subject: [PATCH 08/10] wip --- .../templates/pubsubtotext/PubsubToText.java | 399 +++++++++--------- 1 file changed, 199 insertions(+), 200 deletions(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java index 3c31ed7f58..60cfd9f7cc 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java @@ -57,238 +57,237 @@ category = TemplateCategory.STREAMING, displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage", description = - "The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records " + - "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.", + "The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records " + + "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.", optionsClass = Options.class, flexContainerName = "pubsub-to-text", documentation = - "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text", + "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text", contactInformation = "https://cloud.google.com/support", requirements = { - "The Pub/Sub topic or subscription must exist prior to execution.", - "The messages published to the topic must be in text format.", - "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file." + "The Pub/Sub topic or subscription must exist prior to execution.", + "The messages published to the topic must be in text format.", + "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file." }, streaming = true, supportsAtLeastOnce = true) public class PubsubToText { - /** - * Options supported by the pipeline. - * - *

Inherits standard configuration options. - */ - - public interface Options - extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions { - - @TemplateParameter.PubsubTopic( - order = 1, - groupName = "Source", - optional = true, - description = "Pub/Sub input topic", - helpText = - "The Pub/Sub topic to read the input from. The topic name should be in the format " + - "`projects//topics/`. If this parameter is provided " + - "don't use `inputSubscription`.", - example = "projects/your-project-id/topics/your-topic-name") - String getInputTopic(); - - void setInputTopic(String value); - - @TemplateParameter.PubsubSubscription( - order = 2, - groupName = "Source", - optional = true, - description = "Pub/Sub input subscription", - helpText = - "The Pub/Sub subscription to read the input from. The subscription name uses the format " + - "`projects//subscription/`. If this parameter is " + - "provided, don't use `inputTopic`.", - example = "projects/your-project-id/subscriptions/your-subscription-name") - String getInputSubscription(); - - void setInputSubscription(String value); - - @TemplateParameter.GcsWriteFolder( - order = 3, - groupName = "Target", - description = "Output file directory in Cloud Storage", - helpText = - "The path and filename prefix to write write output files to. " + - "This value must end in a slash.", - example = "gs://your-bucket/your-path") - @Required - String getOutputDirectory(); - - void setOutputDirectory(String value); - - @TemplateParameter.GcsWriteFolder( - order = 4, - optional = true, - description = "User provided temp location", - helpText = + /** + * Options supported by the pipeline. + * + *

Inherits standard configuration options. + */ + + public interface Options + extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions { + + @TemplateParameter.PubsubTopic( + order = 1, + groupName = "Source", + optional = true, + description = "Pub/Sub input topic", + helpText = + "The Pub/Sub topic to read the input from. The topic name should be in the format " + + "`projects//topics/`. If this parameter is provided " + + "don't use `inputSubscription`.", + example = "projects/your-project-id/topics/your-topic-name") + String getInputTopic(); + + void setInputTopic(String value); + + @TemplateParameter.PubsubSubscription( + order = 2, + groupName = "Source", + optional = true, + description = "Pub/Sub input subscription", + helpText = + "The Pub/Sub subscription to read the input from. The subscription name uses the format " + + "`projects//subscription/`. If this parameter is " + + "provided, don't use `inputTopic`.", + example = "projects/your-project-id/subscriptions/your-subscription-name") + String getInputSubscription(); + + void setInputSubscription(String value); + + @TemplateParameter.GcsWriteFolder( + order = 3, + groupName = "Target", + description = "Output file directory in Cloud Storage", + helpText = + "The path and filename prefix to write write output files to. " + + "This value must end in a slash.", + example = "gs://your-bucket/your-path") + @Required + String getOutputDirectory(); + + void setOutputDirectory(String value); + + @TemplateParameter.GcsWriteFolder( + order = 4, + optional = true, + description = "User provided temp location", + helpText = "The user provided directory to output temporary files to. Must end with a slash.") - String getUserTempLocation(); - - void setUserTempLocation(String value); - - @TemplateParameter.Text( - order = 5, - groupName = "Target", - optional = true, - description = "Output filename prefix of the files to write", - helpText = "The prefix to place on each windowed file.", - example = "output-") - @Default.String("output") - @Required - String getOutputFilenamePrefix(); - - void setOutputFilenamePrefix(String value); - - @TemplateParameter.Text( - order = 6, - groupName = "Target", - optional = true, - description = "Output filename suffix of the files to write", - helpText = + String getUserTempLocation(); + + void setUserTempLocation(String value); + + @TemplateParameter.Text( + order = 5, + groupName = "Target", + optional = true, + description = "Output filename prefix of the files to write", + helpText = "The prefix to place on each windowed file.", + example = "output-") + @Default.String("output") + @Required + String getOutputFilenamePrefix(); + + void setOutputFilenamePrefix(String value); + + @TemplateParameter.Text( + order = 6, + groupName = "Target", + optional = true, + description = "Output filename suffix of the files to write", + helpText = "The suffix to place on each windowed file, typically a file extension such as `.txt` or `.csv`.", - example = ".txt") - @Default.String("") - String getOutputFilenameSuffix(); - - void setOutputFilenameSuffix(String value); - - @TemplateParameter.Text( - order = 7, - groupName = "Target", - optional = true, - description = "Optionally include attributes in pubsub pull", - helpText = "If specified, pull the message and the attributes from the topic or subscription", - example = "True,False" - ) - String getAttributeFlag(); - - void setAttributeFlag(String value); + example = ".txt") + @Default.String("") + String getOutputFilenameSuffix(); + + void setOutputFilenameSuffix(String value); + + @TemplateParameter.Text( + order = 7, + groupName = "Target", + optional = true, + description = "Optionally include attributes in pubsub pull", + helpText = "If specified, pull the message and the attributes from the topic or subscription", + example = "True,False" + ) + String getAttributeFlag(); + + void setAttributeFlag(String value); + } + + /** + * Main entry point for executing the pipeline. + * + * @param args The command-line arguments to the pipeline. + */ + public static void main(String[] args) { + UncaughtExceptionLogger.register(); + + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + + options.setStreaming(true); + + run(options); + } + + /** + * Runs the pipeline with the supplied options. + * + * @param options The execution parameters to the pipeline. + * @return The result of the pipeline execution. + */ + public static PipelineResult run(Options options) { + boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription()); + boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag()); + + boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic()); + if (useInputSubscription == useInputTopic) { + throw new IllegalArgumentException( + "Either input topic or input subscription must be provided, but not both."); } - /** - * Main entry point for executing the pipeline. - * - * @param args The command-line arguments to the pipeline. - */ - public static void main(String[] args) { - UncaughtExceptionLogger.register(); + // Create the pipeline + Pipeline pipeline = Pipeline.create(options); - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + PCollection messages = null; - options.setStreaming(true); - - run(options); - } - - /** - * Runs the pipeline with the supplied options. - * - * @param options The execution parameters to the pipeline. - * @return The result of the pipeline execution. + /* + * Steps: + * 1) Read string messages from PubSub + * 2) Window the messages into minute intervals specified by the executor. + * 3) Output the windowed files to GCS */ - public static PipelineResult run(Options options) { - boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription()); - boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag()); - - boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic()); - if (useInputSubscription == useInputTopic) { - throw new IllegalArgumentException( - "Either input topic or input subscription must be provided, but not both."); - } - - // Create the pipeline - Pipeline pipeline = Pipeline.create(options); - - PCollection < String > messages = null; - /* - * Steps: - * 1) Read string messages from PubSub - * 2) Window the messages into minute intervals specified by the executor. - * 3) Output the windowed files to GCS - */ - - if (useInputSubscription) { - if (pullAttributes) { - PCollection < PubsubMessage > messagesAttr = pipeline.apply( - "Read PubSub Events", - PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())); - messages = messagesAttr.apply( - "ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class)) - .via((PubsubMessage message) - > { - // Get the message payload as a string - String payload = new String(message.getPayload(), StandardCharsets.UTF_8); - // Get the message attributes and convert them to a JSON-like string format - Map < String, - String > attributes = message.getAttributeMap(); - String attributesString = attributes.entrySet() + if (useInputSubscription) { + if (pullAttributes){ + PCollection messagesAttr = pipeline.apply( + "Read PubSub Events", + PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())); + messages = messagesAttr.apply( + "ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class)) + .via((PubsubMessage message) -> { + // Get the message payload as a string + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + // Get the message attributes and convert them to a JSON-like string format + Map attributes = message.getAttributeMap(); + String attributesString = attributes.entrySet() .stream() - .map(entry - > "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"") + .map(entry -> "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"") .collect(Collectors.joining(", ", "{", "}")); - // Return the concatenated string with both the payload and attributes - return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }"; - }) - ); - } else { - messages = - pipeline.apply( - "Read PubSub Events", - PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); - } - } else { + // Return the concatenated string with both the payload and attributes + return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }"; + }) + ); + }else{ messages = pipeline.apply( - "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic())); + "Read PubSub Events", + PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); } - messages - .apply( - options.getWindowDuration() + " Window", - Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))) - - // Apply windowed file writes - .apply( - "Write File(s)", - TextIO.write() + } else { + messages = + pipeline.apply( + "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic())); + } + messages + .apply( + options.getWindowDuration() + " Window", + Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration())))) + + // Apply windowed file writes + .apply( + "Write File(s)", + TextIO.write() .withWindowedWrites() .withNumShards(options.getNumShards()) .to( WindowedFilenamePolicy.writeWindowedFiles() - .withOutputDirectory(options.getOutputDirectory()) - .withOutputFilenamePrefix(options.getOutputFilenamePrefix()) - .withShardTemplate(options.getOutputShardTemplate()) - .withSuffix(options.getOutputFilenameSuffix()) - .withYearPattern(options.getYearPattern()) - .withMonthPattern(options.getMonthPattern()) - .withDayPattern(options.getDayPattern()) - .withHourPattern(options.getHourPattern()) - .withMinutePattern(options.getMinutePattern())) + .withOutputDirectory(options.getOutputDirectory()) + .withOutputFilenamePrefix(options.getOutputFilenamePrefix()) + .withShardTemplate(options.getOutputShardTemplate()) + .withSuffix(options.getOutputFilenameSuffix()) + .withYearPattern(options.getYearPattern()) + .withMonthPattern(options.getMonthPattern()) + .withDayPattern(options.getDayPattern()) + .withHourPattern(options.getHourPattern()) + .withMinutePattern(options.getMinutePattern())) .withTempDirectory( FileBasedSink.convertToFileResourceIfPossible( maybeUseUserTempLocation( options.getUserTempLocation(), options.getOutputDirectory())))); - // Execute the pipeline and return the result. - return pipeline.run(); - } - - /** - * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful - * when output bucket is locked and temporary data cannot be deleted. - * - * @param userTempLocation user provided temp location - * @param outputLocation user provided outputDirectory to be used as the default temp location - * @return userTempLocation if available, otherwise outputLocation is returned. - */ - private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) { - return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation; - } + // Execute the pipeline and return the result. + return pipeline.run(); + } + + /** + * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful + * when output bucket is locked and temporary data cannot be deleted. + * + * @param userTempLocation user provided temp location + * @param outputLocation user provided outputDirectory to be used as the default temp location + * @return userTempLocation if available, otherwise outputLocation is returned. + */ + private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) { + return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation; + } } \ No newline at end of file From 942d9457e3bb8f064e0dee46b9bf461adb2d74f3 Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Mon, 7 Oct 2024 14:02:57 -0700 Subject: [PATCH 09/10] fix spotless --- .../templates/pubsubtotext/PubsubToText.java | 77 +++++++++++-------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java index 60cfd9f7cc..fa4e78580d 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java @@ -71,7 +71,6 @@ }, streaming = true, supportsAtLeastOnce = true) - public class PubsubToText { /** @@ -79,7 +78,6 @@ public class PubsubToText { * *

Inherits standard configuration options. */ - public interface Options extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions { @@ -165,9 +163,9 @@ public interface Options groupName = "Target", optional = true, description = "Optionally include attributes in pubsub pull", - helpText = "If specified, pull the message and the attributes from the topic or subscription", - example = "True,False" - ) + helpText = + "If specified, pull the message and the attributes from the topic or subscription", + example = "True,False") String getAttributeFlag(); void setAttributeFlag(String value); @@ -209,7 +207,7 @@ public static PipelineResult run(Options options) { PCollection messages = null; - /* + /* * Steps: * 1) Read string messages from PubSub * 2) Window the messages into minute intervals specified by the executor. @@ -217,32 +215,47 @@ public static PipelineResult run(Options options) { */ if (useInputSubscription) { - if (pullAttributes){ - PCollection messagesAttr = pipeline.apply( + if (pullAttributes) { + PCollection messagesAttr = + pipeline.apply( "Read PubSub Events", - PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())); - messages = messagesAttr.apply( - "ExtractPayloadAndAttributesToString", MapElements.into(TypeDescriptor.of(String.class)) - .via((PubsubMessage message) -> { - // Get the message payload as a string - String payload = new String(message.getPayload(), StandardCharsets.UTF_8); - // Get the message attributes and convert them to a JSON-like string format - Map attributes = message.getAttributeMap(); - String attributesString = attributes.entrySet() - .stream() - .map(entry -> "\"" + entry.getKey() + "\": \"" + entry.getValue() + "\"") - .collect(Collectors.joining(", ", "{", "}")); - - // Return the concatenated string with both the payload and attributes - return "{ \"payload\": \"" + payload + "\", \"attributes\": " + attributesString + " }"; - }) - ); - }else{ - messages = - pipeline.apply( - "Read PubSub Events", - PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); - } + PubsubIO.readMessagesWithAttributes() + .fromSubscription(options.getInputSubscription())); + messages = + messagesAttr.apply( + "ExtractPayloadAndAttributesToString", + MapElements.into(TypeDescriptor.of(String.class)) + .via( + (PubsubMessage message) -> { + // Get the message payload as a string + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + // Get the message attributes and convert them to a JSON-like string + // format + Map attributes = message.getAttributeMap(); + String attributesString = + attributes.entrySet().stream() + .map( + entry -> + "\"" + + entry.getKey() + + "\": \"" + + entry.getValue() + + "\"") + .collect(Collectors.joining(", ", "{", "}")); + + // Return the concatenated string with both the payload and attributes + return "{ \"payload\": \"" + + payload + + "\", \"attributes\": " + + attributesString + + " }"; + })); + } else { + messages = + pipeline.apply( + "Read PubSub Events", + PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + } } else { messages = pipeline.apply( @@ -290,4 +303,4 @@ public static PipelineResult run(Options options) { private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) { return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation; } -} \ No newline at end of file +} From e5f6e7f13e4e39939ce66a57f10c740deaebb28f Mon Sep 17 00:00:00 2001 From: Nialls Chavez Date: Mon, 7 Oct 2024 14:08:14 -0700 Subject: [PATCH 10/10] try stuff --- .github/workflows/java-pr.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/java-pr.yml b/.github/workflows/java-pr.yml index a777e95712..9191bb3901 100644 --- a/.github/workflows/java-pr.yml +++ b/.github/workflows/java-pr.yml @@ -79,7 +79,7 @@ jobs: java_build: name: Build timeout-minutes: 60 - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -94,7 +94,7 @@ jobs: name: Unit Tests needs: [java_build] timeout-minutes: 60 - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -126,7 +126,7 @@ jobs: needs: [spotless_check, checkstyle_check, java_build, java_unit_tests] timeout-minutes: 60 # Run on any runner that matches all the specified runs-on values. - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -134,7 +134,7 @@ jobs: id: setup-env uses: ./.github/actions/setup-env - name: Run Integration Smoke Tests - run: | + run: | ./cicd/run-it-smoke-tests \ --modules-to-build="DEFAULT" \ --it-region="us-central1" \ @@ -156,7 +156,7 @@ jobs: needs: [java_integration_smoke_tests_templates] timeout-minutes: 240 # Run on any runner that matches all the specified runs-on values. - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -164,7 +164,7 @@ jobs: id: setup-env uses: ./.github/actions/setup-env - name: Run Integration Tests - run: | + run: | ./cicd/run-it-tests \ --modules-to-build="DEFAULT" \ --it-region="us-central1" \ @@ -187,7 +187,7 @@ jobs: needs: [spotless_check, checkstyle_check, java_build, java_unit_tests, java_integration_tests_templates] timeout-minutes: 600 # Run on any runner that matches all the specified runs-on values. - runs-on: [self-hosted, perf] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0