diff --git a/.github/workflows/java-pr.yml b/.github/workflows/java-pr.yml index a777e95712..9191bb3901 100644 --- a/.github/workflows/java-pr.yml +++ b/.github/workflows/java-pr.yml @@ -79,7 +79,7 @@ jobs: java_build: name: Build timeout-minutes: 60 - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -94,7 +94,7 @@ jobs: name: Unit Tests needs: [java_build] timeout-minutes: 60 - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -126,7 +126,7 @@ jobs: needs: [spotless_check, checkstyle_check, java_build, java_unit_tests] timeout-minutes: 60 # Run on any runner that matches all the specified runs-on values. - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -134,7 +134,7 @@ jobs: id: setup-env uses: ./.github/actions/setup-env - name: Run Integration Smoke Tests - run: | + run: | ./cicd/run-it-smoke-tests \ --modules-to-build="DEFAULT" \ --it-region="us-central1" \ @@ -156,7 +156,7 @@ jobs: needs: [java_integration_smoke_tests_templates] timeout-minutes: 240 # Run on any runner that matches all the specified runs-on values. - runs-on: [self-hosted, it] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 @@ -164,7 +164,7 @@ jobs: id: setup-env uses: ./.github/actions/setup-env - name: Run Integration Tests - run: | + run: | ./cicd/run-it-tests \ --modules-to-build="DEFAULT" \ --it-region="us-central1" \ @@ -187,7 +187,7 @@ jobs: needs: [spotless_check, checkstyle_check, java_build, java_unit_tests, java_integration_tests_templates] timeout-minutes: 600 # Run on any runner that matches all the specified runs-on values. - runs-on: [self-hosted, perf] + runs-on: ubuntu-latest steps: - name: Checkout Code uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0 diff --git a/cloudbuild/cloudbuild.java.googlecloud.pr.yaml b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml new file mode 100644 index 0000000000..fad12fef5d --- /dev/null +++ b/cloudbuild/cloudbuild.java.googlecloud.pr.yaml @@ -0,0 +1,46 @@ +steps: + + # --------- + # Update the flex template + # --------- + + - id: "update flex-template" + waitFor: ['-'] + name: "gcr.io/cloud-builders/mvn" + args: [ + "mvn clean package -PtemplatesRun", + "-DskipTests", + "-DprojectId=${_TARGET_PROJECT_ID}", + "-DbucketName=${_TEMPLATE_BUCKET}", + "-Dregion=us-central1", + "-DjobName=cloud-pubsub-to-gcs-text-flex-job", + "-DtemplateName=Cloud_PubSub_to_GCS_Text_Flex_w_attrs", + "-Dparameters=inputTopic={$_INPUT_TOPIC},inputSubscription={$_INPUT_SUBSCRIPTION},outputDirectory=${_OUTPUT_DIRECTORY},userTempLocation=${_USER_TEMP_LOCATION},outputFilenamePrefix=${_OUTPUT_FILENAME_PREFIX},outputShardTemplate=${_OUTPUT_SHARD_TEMPLATE},numShards=${_NUM_SHARDS},windowDuration=${_WINDOW_DURATION},yearPattern=${_YEAR_PATTERN},monthPattern=${_MONTH_PATTERN},dayPattern=${_DAY_PATTERN},hourPattern=${_HOUR_PATTERN},minutePattern=${_MINUTE_PATTERN}", + "-f v2/googlecloud-to-googlecloud" + ] + +timeout: 1200s + +substitutions: + ### Required + _OUTPUT_DIRECTORY: gs://analytics_events_warehouse/errors_w_attrs + + ### Optional + _INPUT_TOPIC: dataflow-failure + _INPUT_SUBSCRIPTION: failed_dataflow_transformation_develop + _USER_TEMP_LOCATION: gs://dataflow-staging-us-central1-739390938599/tmp + _OUTPUT_FILENAME_PREFIX: error + _OUTPUT_FILENAME_SUFFIX: "" + _OUTPUT_SHARD_TEMPLATE: W-P-SS-of-NN + _NUM_SHARDS: "0" + _WINDOW_DURATION: 5m + _YEAR_PATTERN: YYYY + _MONTH_PATTERN: MM + _DAY_PATTERN: dd + _HOUR_PATTERN: HH + _MINUTE_PATTERN: mm + + _TARGET_PROJECT_ID: onx-dw-raw + _TEMPLATE_BUCKET: gs://onx-flex-templates +options: + dynamic_substitutions: true diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java index 0b66afd011..fa4e78580d 100644 --- a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java +++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/pubsubtotext/PubsubToText.java @@ -24,19 +24,25 @@ import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options; import com.google.cloud.teleport.v2.utils.DurationUtils; import com.google.common.base.Strings; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.Validation.Required; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; /** * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into @@ -151,6 +157,18 @@ public interface Options String getOutputFilenameSuffix(); void setOutputFilenameSuffix(String value); + + @TemplateParameter.Text( + order = 7, + groupName = "Target", + optional = true, + description = "Optionally include attributes in pubsub pull", + helpText = + "If specified, pull the message and the attributes from the topic or subscription", + example = "True,False") + String getAttributeFlag(); + + void setAttributeFlag(String value); } /** @@ -176,6 +194,8 @@ public static void main(String[] args) { */ public static PipelineResult run(Options options) { boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription()); + boolean pullAttributes = !Strings.isNullOrEmpty(options.getAttributeFlag()); + boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic()); if (useInputSubscription == useInputTopic) { throw new IllegalArgumentException( @@ -193,11 +213,49 @@ public static PipelineResult run(Options options) { * 2) Window the messages into minute intervals specified by the executor. * 3) Output the windowed files to GCS */ + if (useInputSubscription) { - messages = - pipeline.apply( - "Read PubSub Events", - PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + if (pullAttributes) { + PCollection messagesAttr = + pipeline.apply( + "Read PubSub Events", + PubsubIO.readMessagesWithAttributes() + .fromSubscription(options.getInputSubscription())); + messages = + messagesAttr.apply( + "ExtractPayloadAndAttributesToString", + MapElements.into(TypeDescriptor.of(String.class)) + .via( + (PubsubMessage message) -> { + // Get the message payload as a string + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + // Get the message attributes and convert them to a JSON-like string + // format + Map attributes = message.getAttributeMap(); + String attributesString = + attributes.entrySet().stream() + .map( + entry -> + "\"" + + entry.getKey() + + "\": \"" + + entry.getValue() + + "\"") + .collect(Collectors.joining(", ", "{", "}")); + + // Return the concatenated string with both the payload and attributes + return "{ \"payload\": \"" + + payload + + "\", \"attributes\": " + + attributesString + + " }"; + })); + } else { + messages = + pipeline.apply( + "Read PubSub Events", + PubsubIO.readStrings().fromSubscription(options.getInputSubscription())); + } } else { messages = pipeline.apply(