From de8528965aa231757362fbef538bd2518c0bd52d Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 29 Apr 2026 13:03:47 -0700 Subject: [PATCH 01/11] Add Pipeline Options --- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 3 +++ .../runners/dataflow/options/DataflowPipelineOptions.java | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index b375de661885..5e8b022ab01c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1368,6 +1368,9 @@ public DataflowPipelineJob run(Pipeline pipeline) { options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME); dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); + String pipelineProtoHash = Hashing.sha256().hashBytes(serializedProtoPipeline).toString(); + dataflowOptions.setPipelineProtoHash(pipelineProtoHash); + if (useUnifiedWorker(options)) { LOG.info("Skipping v1 transform replacements since job will run on v2."); } else { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 57f927d73073..e79eecca9c17 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -185,6 +185,12 @@ public interface DataflowPipelineOptions void setPipelineUrl(String urlString); + /** The SHA-256 hash of the staged portable pipeline proto. */ + @Description("The SHA-256 hash of the staged portable pipeline proto") + String getPipelineProtoHash(); + + void setPipelineProtoHash(String hash); + @Description("The customized dataflow worker jar") String getDataflowWorkerJar(); From b8963a9dd5ec1594c4e92ca79c29aa3ed559f78e Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 30 Apr 2026 11:59:15 -0700 Subject: [PATCH 02/11] Add Pipeline Hash along with URL --- .../beam/runners/dataflow/dataflowlib/job.go | 27 +++++++--- .../runners/dataflow/dataflowlib/job_test.go | 54 +++++++++++++++++++ .../runners/dataflow/internal/apiclient.py | 4 ++ .../dataflow/internal/apiclient_test.py | 35 ++++++++++++ 4 files changed, 112 insertions(+), 8 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index db83992fbebc..5dfb13cf7972 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -17,6 +17,8 @@ package dataflowlib import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "strings" "time" @@ -131,6 +133,13 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker return nil, err } + serializedPipeline, err := proto.Marshal(p) + if err != nil { + return nil, err + } + hash := sha256.Sum256(serializedPipeline) + pipelineProtoHash := hex.EncodeToString(hash[:]) + packages := []*df.Package{{ Name: "worker", Location: workerURL, @@ -176,10 +185,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker SdkPipelineOptions: newMsg(pipelineOptions{ DisplayData: printOptions(opts, images), Options: dataflowOptions{ - PipelineURL: modelURL, - Region: opts.Region, - Experiments: opts.Experiments, - TempLocation: opts.TempLocation, + PipelineURL: modelURL, + PipelineProtoHash: pipelineProtoHash, + Region: opts.Region, + Experiments: opts.Experiments, + TempLocation: opts.TempLocation, }, GoOptions: opts.Options, }), @@ -350,10 +360,11 @@ func GetMetrics(ctx context.Context, client *df.Service, project, region, jobID // pipeline options that are communicated to cross-language SDK harnesses, so any pipeline options // needed for cross-language transforms in Dataflow must be declared here. type dataflowOptions struct { - Experiments []string `json:"experiments,omitempty"` - PipelineURL string `json:"pipelineUrl"` - Region string `json:"region"` - TempLocation string `json:"tempLocation"` + Experiments []string `json:"experiments,omitempty"` + PipelineURL string `json:"pipelineUrl"` + PipelineProtoHash string `json:"pipelineProtoHash,omitempty"` + Region string `json:"region"` + TempLocation string `json:"tempLocation"` } func printOptions(opts *JobOptions, images []string) []*displayData { diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index fb44ff9c0133..96be696f803b 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -21,12 +21,18 @@ import ( "reflect" "testing" + "crypto/sha256" + "encoding/hex" + "encoding/json" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" df "google.golang.org/api/dataflow/v1b3" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" ) @@ -280,3 +286,51 @@ func Test_containerImages(t *testing.T) { } } + +func TestTranslate(t *testing.T) { + p := &pipepb.Pipeline{ + Components: &pipepb.Components{ + Environments: map[string]*pipepb.Environment{ + "env1": { + Payload: protox.MustEncode(&pipepb.DockerPayload{ + ContainerImage: "dummy_image", + }), + }, + }, + }, + } + opts := &JobOptions{ + Name: "test-job", + Project: "test-project", + Region: "test-region", + Options: runtime.RawOptions{ + Options: make(map[string]string), + }, + } + + job, err := Translate(context.Background(), p, opts, "worker-url", "model-url") + if err != nil { + t.Fatalf("Translate failed: %v", err) + } + + // Verify PipelineProtoHash + var recoveredOptions struct { + Options struct { + PipelineURL string `json:"pipelineUrl"` + PipelineProtoHash string `json:"pipelineProtoHash"` + } `json:"options"` + } + + rawOpts := job.Environment.SdkPipelineOptions + if err := json.Unmarshal(rawOpts, &recoveredOptions); err != nil { + t.Fatalf("Failed to unmarshal SdkPipelineOptions: %v", err) + } + + serializedPipeline, _ := proto.Marshal(p) + hash := sha256.Sum256(serializedPipeline) + expectedHashStr := hex.EncodeToString(hash[:]) + + if recoveredOptions.Options.PipelineProtoHash != expectedHashStr { + t.Errorf("Expected PipelineProtoHash %v, got %v", expectedHashStr, recoveredOptions.Options.PipelineProtoHash) + } +} diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 29cb36071488..40d619a78184 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -279,6 +279,10 @@ def __init__( for k, v in sdk_pipeline_options.items() if v is not None } options_dict["pipelineUrl"] = proto_pipeline_staged_url + if self._proto_pipeline: + serialized_pipeline = self._proto_pipeline.SerializeToString() + options_dict["pipelineProtoHash"] = hashlib.sha256( + serialized_pipeline).hexdigest() # Don't pass impersonate_service_account through to the harness. # Though impersonation should start a job, the workers should # not try to modify their credentials. diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 66b1c8e1e5bb..404e543ca3f4 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -97,6 +97,41 @@ def test_pipeline_url(self): self.assertEqual(pipeline_url.string_value, FAKE_PIPELINE_URL) + def test_pipeline_proto_hash(self): + pipeline_options = PipelineOptions([ + '--temp_location', + 'gs://any-location/temp' + ]) + proto_pipeline = beam_runner_api_pb2.Pipeline() + proto_pipeline.components.transforms['dummy'].unique_name = 'dummy' + + env = apiclient.Environment( + [], + pipeline_options, + '2.0.0', + FAKE_PIPELINE_URL, + proto_pipeline) + + recovered_options = None + for additionalProperty in env.proto.sdkPipelineOptions.additionalProperties: + if additionalProperty.key == 'options': + recovered_options = additionalProperty.value + break + else: + self.fail('No pipeline options found') + + pipeline_proto_hash = None + for property in recovered_options.object_value.properties: + if property.key == 'pipelineProtoHash': + pipeline_proto_hash = property.value + break + else: + self.fail('No pipelineProtoHash found') + + import hashlib + expected_hash = hashlib.sha256(proto_pipeline.SerializeToString()).hexdigest() + self.assertEqual(pipeline_proto_hash.string_value, expected_hash) + def test_set_network(self): pipeline_options = PipelineOptions([ '--network', From c0391e9e1983602290b0437298ae86ce89a0ad36 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 1 May 2026 10:47:13 -0700 Subject: [PATCH 03/11] fix go lang staging --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 8 +++++++- .../pkg/beam/runners/dataflow/dataflowlib/execute.go | 9 +++++++-- sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go | 11 +---------- .../pkg/beam/runners/dataflow/dataflowlib/job_test.go | 10 +++++----- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 101441dbcb56..6b722153c1ac 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -24,6 +24,8 @@ package dataflow import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "flag" "fmt" @@ -40,6 +42,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts" @@ -235,7 +238,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error) log.Info(ctx, "Dry-run: not submitting job!") log.Info(ctx, model.String()) - job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL) + modelBytes := protox.MustEncode(model) + hash := sha256.Sum256(modelBytes) + pipelineProtoHash := hex.EncodeToString(hash[:]) + job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL, pipelineProtoHash) if err != nil { return nil, err } diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go index 806b8940ae99..396eefab7318 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go @@ -19,6 +19,8 @@ package dataflowlib import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "os" "strings" @@ -83,14 +85,17 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker // (2) Upload model to GCS log.Info(ctx, raw.String()) - if err := StageModel(ctx, opts.Project, modelURL, protox.MustEncode(raw)); err != nil { + modelBytes := protox.MustEncode(raw) + modelHash := sha256.Sum256(modelBytes) + pipelineProtoHash := hex.EncodeToString(modelHash[:]) + if err := StageModel(ctx, opts.Project, modelURL, modelBytes); err != nil { return presult, err } log.Infof(ctx, "Staged model pipeline: %v", modelURL) // (3) Translate to v1b3 and submit - job, err := Translate(ctx, raw, opts, workerURL, modelURL) + job, err := Translate(ctx, raw, opts, workerURL, modelURL, pipelineProtoHash) if err != nil { return presult, err } diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index 5dfb13cf7972..44a68fa69d6c 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -17,8 +17,6 @@ package dataflowlib import ( "context" - "crypto/sha256" - "encoding/hex" "fmt" "strings" "time" @@ -117,7 +115,7 @@ func containerImages(p *pipepb.Pipeline) ([]*df.SdkHarnessContainerImage, []stri } // Translate translates a pipeline to a Dataflow job. -func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string) (*df.Job, error) { +func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string, pipelineProtoHash string) (*df.Job, error) { // (1) Translate pipeline to v1b3 speak. jobType := "JOB_TYPE_BATCH" @@ -133,13 +131,6 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker return nil, err } - serializedPipeline, err := proto.Marshal(p) - if err != nil { - return nil, err - } - hash := sha256.Sum256(serializedPipeline) - pipelineProtoHash := hex.EncodeToString(hash[:]) - packages := []*df.Package{{ Name: "worker", Location: workerURL, diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index 96be696f803b..c293d24e908c 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -308,7 +308,11 @@ func TestTranslate(t *testing.T) { }, } - job, err := Translate(context.Background(), p, opts, "worker-url", "model-url") + serializedPipeline, _ := proto.Marshal(p) + hash := sha256.Sum256(serializedPipeline) + expectedHashStr := hex.EncodeToString(hash[:]) + + job, err := Translate(context.Background(), p, opts, "worker-url", "model-url", expectedHashStr) if err != nil { t.Fatalf("Translate failed: %v", err) } @@ -326,10 +330,6 @@ func TestTranslate(t *testing.T) { t.Fatalf("Failed to unmarshal SdkPipelineOptions: %v", err) } - serializedPipeline, _ := proto.Marshal(p) - hash := sha256.Sum256(serializedPipeline) - expectedHashStr := hex.EncodeToString(hash[:]) - if recoveredOptions.Options.PipelineProtoHash != expectedHashStr { t.Errorf("Expected PipelineProtoHash %v, got %v", expectedHashStr, recoveredOptions.Options.PipelineProtoHash) } From d00793d7617bcd3a46faacf780624b525ec1a6be Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 1 May 2026 11:28:24 -0700 Subject: [PATCH 04/11] Fix formatting --- .../options/DataflowPipelineOptions.java | 4 ++-- .../runners/dataflow/dataflowlib/job_test.go | 7 +----- .../dataflow/internal/apiclient_test.py | 22 +++++++++---------- 3 files changed, 13 insertions(+), 20 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index e79eecca9c17..f1e92695be8c 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -185,8 +185,8 @@ public interface DataflowPipelineOptions void setPipelineUrl(String urlString); - /** The SHA-256 hash of the staged portable pipeline proto. */ - @Description("The SHA-256 hash of the staged portable pipeline proto") + /** The hex-encoded SHA256 hash of the staged portable pipeline proto. */ + @Description("The hex-encoded SHA256 hash of the staged portable pipeline proto") String getPipelineProtoHash(); void setPipelineProtoHash(String hash); diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index c293d24e908c..a93a322d1301 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -21,8 +21,6 @@ import ( "reflect" "testing" - "crypto/sha256" - "encoding/hex" "encoding/json" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" @@ -32,7 +30,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" df "google.golang.org/api/dataflow/v1b3" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" ) @@ -308,9 +305,7 @@ func TestTranslate(t *testing.T) { }, } - serializedPipeline, _ := proto.Marshal(p) - hash := sha256.Sum256(serializedPipeline) - expectedHashStr := hex.EncodeToString(hash[:]) + expectedHashStr := "dummy-hash-12345" job, err := Translate(context.Background(), p, opts, "worker-url", "model-url", expectedHashStr) if err != nil { diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 404e543ca3f4..5fed491f019d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -98,19 +98,16 @@ def test_pipeline_url(self): self.assertEqual(pipeline_url.string_value, FAKE_PIPELINE_URL) def test_pipeline_proto_hash(self): - pipeline_options = PipelineOptions([ - '--temp_location', - 'gs://any-location/temp' - ]) + pipeline_options = PipelineOptions( + ['--temp_location', 'gs://any-location/temp']) proto_pipeline = beam_runner_api_pb2.Pipeline() proto_pipeline.components.transforms['dummy'].unique_name = 'dummy' - - env = apiclient.Environment( - [], - pipeline_options, - '2.0.0', - FAKE_PIPELINE_URL, - proto_pipeline) + + env = apiclient.Environment([], + pipeline_options, + '2.0.0', + FAKE_PIPELINE_URL, + proto_pipeline) recovered_options = None for additionalProperty in env.proto.sdkPipelineOptions.additionalProperties: @@ -129,7 +126,8 @@ def test_pipeline_proto_hash(self): self.fail('No pipelineProtoHash found') import hashlib - expected_hash = hashlib.sha256(proto_pipeline.SerializeToString()).hexdigest() + expected_hash = hashlib.sha256( + proto_pipeline.SerializeToString()).hexdigest() self.assertEqual(pipeline_proto_hash.string_value, expected_hash) def test_set_network(self): From bf527924c46794adffbf56a384f151d8e29b553a Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 1 May 2026 12:56:36 -0700 Subject: [PATCH 05/11] backmerge master --- .../cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock | 0 .../cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock | 0 .gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock | 0 .gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock | 0 .gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock | 0 .../cache/download/google.golang.org/protobuf/@v/v1.36.11.lock | 0 .../cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock | 0 .../cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock | 0 .../cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock | 0 .../cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock | 0 .../cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock | 0 .../cache/download/github.com/google/go-cmp/@v/v0.7.0.lock | 0 .../.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock | 0 .../golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock | 0 .../golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock | 0 .../.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock | 0 sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock | 0 .../cache/download/google.golang.org/api/@v/v0.257.0.lock | 0 .../cache/download/google.golang.org/grpc/@v/v1.80.0.lock | 0 .../cache/download/google.golang.org/protobuf/@v/v1.36.11.lock | 0 20 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 .gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock create mode 100644 .gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock create mode 100644 .gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock create mode 100644 .gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock create mode 100644 .gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock create mode 100644 .gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock create mode 100644 sdks/go/.gocache/cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock create mode 100644 sdks/go/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock create mode 100644 sdks/go/.gocache/cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock create mode 100644 sdks/go/.gocache/cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock create mode 100644 sdks/go/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock create mode 100644 sdks/go/.gocache/cache/download/github.com/google/go-cmp/@v/v0.7.0.lock create mode 100644 sdks/go/.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock create mode 100644 sdks/go/.gocache/cache/download/golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock create mode 100644 sdks/go/.gocache/cache/download/golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock create mode 100644 sdks/go/.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock create mode 100644 sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock create mode 100644 sdks/go/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock create mode 100644 sdks/go/.gocache/cache/download/google.golang.org/grpc/@v/v1.80.0.lock create mode 100644 sdks/go/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock diff --git a/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock b/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock b/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock b/.gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock b/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock b/.gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock b/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock b/sdks/go/.gocache/cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock b/sdks/go/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock b/sdks/go/.gocache/cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock b/sdks/go/.gocache/cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock b/sdks/go/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/github.com/google/go-cmp/@v/v0.7.0.lock b/sdks/go/.gocache/cache/download/github.com/google/go-cmp/@v/v0.7.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock b/sdks/go/.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock b/sdks/go/.gocache/cache/download/golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock b/sdks/go/.gocache/cache/download/golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock b/sdks/go/.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock b/sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock b/sdks/go/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/google.golang.org/grpc/@v/v1.80.0.lock b/sdks/go/.gocache/cache/download/google.golang.org/grpc/@v/v1.80.0.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/go/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock b/sdks/go/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock new file mode 100644 index 000000000000..e69de29bb2d1 From 1b083693bc8212786ed8da2d2c546aafc186f090 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 1 May 2026 12:58:18 -0700 Subject: [PATCH 06/11] remove cache --- .../cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock | 0 .../cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock | 0 .gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock | 0 .gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock | 0 .gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock | 0 .../cache/download/google.golang.org/protobuf/@v/v1.36.11.lock | 0 .../cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock | 0 .../cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock | 0 .../cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock | 0 .../cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock | 0 .../cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock | 0 .../cache/download/github.com/google/go-cmp/@v/v0.7.0.lock | 0 .../.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock | 0 .../golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock | 0 .../golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock | 0 .../.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock | 0 sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock | 0 .../cache/download/google.golang.org/api/@v/v0.257.0.lock | 0 .../cache/download/google.golang.org/grpc/@v/v1.80.0.lock | 0 .../cache/download/google.golang.org/protobuf/@v/v1.36.11.lock | 0 20 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 .gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock delete mode 100644 .gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock delete mode 100644 .gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock delete mode 100644 .gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock delete mode 100644 .gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock delete mode 100644 .gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock delete mode 100644 sdks/go/.gocache/cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock delete mode 100644 sdks/go/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock delete mode 100644 sdks/go/.gocache/cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock delete mode 100644 sdks/go/.gocache/cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock delete mode 100644 sdks/go/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock delete mode 100644 sdks/go/.gocache/cache/download/github.com/google/go-cmp/@v/v0.7.0.lock delete mode 100644 sdks/go/.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock delete mode 100644 sdks/go/.gocache/cache/download/golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock delete mode 100644 sdks/go/.gocache/cache/download/golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock delete mode 100644 sdks/go/.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock delete mode 100644 sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock delete mode 100644 sdks/go/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock delete mode 100644 sdks/go/.gocache/cache/download/google.golang.org/grpc/@v/v1.80.0.lock delete mode 100644 sdks/go/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock diff --git a/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock b/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock b/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/.gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock b/.gocache/cache/download/golang.org/x/net/@v/v0.50.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock b/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/.gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock b/.gocache/cache/download/google.golang.org/grpc/@v/v1.78.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock b/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock b/sdks/go/.gocache/cache/download/cloud.google.com/go/profiler/@v/v0.4.3.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock b/sdks/go/.gocache/cache/download/cloud.google.com/go/storage/@v/v1.59.2.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock b/sdks/go/.gocache/cache/download/github.com/avast/retry-go/v4/@v/v4.7.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock b/sdks/go/.gocache/cache/download/github.com/dustin/go-humanize/@v/v1.0.1.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock b/sdks/go/.gocache/cache/download/github.com/golang-cz/devslog/@v/v0.0.15.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/github.com/google/go-cmp/@v/v0.7.0.lock b/sdks/go/.gocache/cache/download/github.com/google/go-cmp/@v/v0.7.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock b/sdks/go/.gocache/cache/download/github.com/google/uuid/@v/v1.6.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock b/sdks/go/.gocache/cache/download/golang.org/toolchain/@v/v0.0.1-go1.26.1.darwin-arm64.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock b/sdks/go/.gocache/cache/download/golang.org/x/exp/@v/v0.0.0-20250106191152-7588d65b2ba8.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock b/sdks/go/.gocache/cache/download/golang.org/x/oauth2/@v/v0.35.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock b/sdks/go/.gocache/cache/download/golang.org/x/sync/@v/v0.20.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock b/sdks/go/.gocache/cache/download/google.golang.org/api/@v/v0.257.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/google.golang.org/grpc/@v/v1.80.0.lock b/sdks/go/.gocache/cache/download/google.golang.org/grpc/@v/v1.80.0.lock deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/sdks/go/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock b/sdks/go/.gocache/cache/download/google.golang.org/protobuf/@v/v1.36.11.lock deleted file mode 100644 index e69de29bb2d1..000000000000 From d967d4894c9a10ace691f3e50e3947c7c556248e Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 1 May 2026 13:02:55 -0700 Subject: [PATCH 07/11] fix test --- sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index a819c53d3d7b..901adb6c7b72 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -296,7 +296,7 @@ func TestTranslate(t *testing.T) { workerURL := "gs://any-location/temp" modelURL := "gs://any-location/temp" - job, err := Translate(ctx, p, opts, workerURL, modelURL) + job, err := Translate(ctx, p, opts, workerURL, modelURL, "dummy-hash-12345") if err != nil { t.Fatalf("Translate(...) error = %v, want nil", err) } @@ -314,7 +314,7 @@ func TestTranslate(t *testing.T) { } } -func TestTranslate(t *testing.T) { +func TestTranslateWithPipelineHash(t *testing.T) { p := &pipepb.Pipeline{ Components: &pipepb.Components{ Environments: map[string]*pipepb.Environment{ From 68250091da49cbbc9d6b8ebc7c41efcf40e227d7 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 1 May 2026 14:21:14 -0700 Subject: [PATCH 08/11] fix proto hash creation --- .../runners/dataflow/internal/apiclient.py | 17 ++++++++++------- .../runners/dataflow/internal/apiclient_test.py | 10 ++++++---- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 40d619a78184..f38a2ee34bbf 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -97,7 +97,8 @@ def __init__( options, environment_version, proto_pipeline_staged_url, - proto_pipeline=None): + proto_pipeline=None, + pipeline_proto_hash=None): self.standard_options = options.view_as(StandardOptions) self.google_cloud_options = options.view_as(GoogleCloudOptions) self.worker_options = options.view_as(WorkerOptions) @@ -279,10 +280,8 @@ def __init__( for k, v in sdk_pipeline_options.items() if v is not None } options_dict["pipelineUrl"] = proto_pipeline_staged_url - if self._proto_pipeline: - serialized_pipeline = self._proto_pipeline.SerializeToString() - options_dict["pipelineProtoHash"] = hashlib.sha256( - serialized_pipeline).hexdigest() + if pipeline_proto_hash: + options_dict["pipelineProtoHash"] = pipeline_proto_hash # Don't pass impersonate_service_account through to the harness. # Though impersonation should start a job, the workers should # not try to modify their credentials. @@ -835,10 +834,13 @@ def create_job_description(self, job): resources = self._stage_resources(job.proto_pipeline, job.options) # Stage proto pipeline. + serialized_pipeline = job.proto_pipeline.SerializeToString() + pipeline_proto_hash = hashlib.sha256(serialized_pipeline).hexdigest() + self.stage_file_with_retry( job.google_cloud_options.staging_location, shared_names.STAGED_PIPELINE_FILENAME, - io.BytesIO(job.proto_pipeline.SerializeToString())) + io.BytesIO(serialized_pipeline)) job.proto.environment = Environment( proto_pipeline_staged_url=FileSystems.join( @@ -847,7 +849,8 @@ def create_job_description(self, job): packages=resources, options=job.options, environment_version=self.environment_version, - proto_pipeline=job.proto_pipeline).proto + proto_pipeline=job.proto_pipeline, + pipeline_proto_hash=pipeline_proto_hash).proto _LOGGER.debug('JOB: %s', job) @retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 5fed491f019d..e6f858d6d9f5 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -22,6 +22,7 @@ import io import itertools import json +import hashlib import logging import os import sys @@ -103,11 +104,15 @@ def test_pipeline_proto_hash(self): proto_pipeline = beam_runner_api_pb2.Pipeline() proto_pipeline.components.transforms['dummy'].unique_name = 'dummy' + expected_hash = hashlib.sha256( + proto_pipeline.SerializeToString()).hexdigest() + env = apiclient.Environment([], pipeline_options, '2.0.0', FAKE_PIPELINE_URL, - proto_pipeline) + proto_pipeline, + pipeline_proto_hash=expected_hash) recovered_options = None for additionalProperty in env.proto.sdkPipelineOptions.additionalProperties: @@ -125,9 +130,6 @@ def test_pipeline_proto_hash(self): else: self.fail('No pipelineProtoHash found') - import hashlib - expected_hash = hashlib.sha256( - proto_pipeline.SerializeToString()).hexdigest() self.assertEqual(pipeline_proto_hash.string_value, expected_hash) def test_set_network(self): From 70464a92db1a01b24210ce8c1abc4faad86e6987 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Fri, 1 May 2026 14:32:29 -0700 Subject: [PATCH 09/11] fix lint --- .../apache_beam/runners/dataflow/internal/apiclient_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index e6f858d6d9f5..43f4c8a21513 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -19,10 +19,10 @@ # pytype: skip-file +import hashlib import io import itertools import json -import hashlib import logging import os import sys From ddaa1ae0f2b69849e2ea0f7b3d24b10ca94c552d Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 4 May 2026 11:33:04 -0700 Subject: [PATCH 10/11] move pipeline hash to sdkharness --- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 2 +- .../runners/dataflow/options/DataflowPipelineOptions.java | 6 ------ .../org/apache/beam/sdk/options/SdkHarnessOptions.java | 7 +++++++ 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5e8b022ab01c..ecc231ab825e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1369,7 +1369,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); String pipelineProtoHash = Hashing.sha256().hashBytes(serializedProtoPipeline).toString(); - dataflowOptions.setPipelineProtoHash(pipelineProtoHash); + options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash); if (useUnifiedWorker(options)) { LOG.info("Skipping v1 transform replacements since job will run on v2."); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index f1e92695be8c..57f927d73073 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -185,12 +185,6 @@ public interface DataflowPipelineOptions void setPipelineUrl(String urlString); - /** The hex-encoded SHA256 hash of the staged portable pipeline proto. */ - @Description("The hex-encoded SHA256 hash of the staged portable pipeline proto") - String getPipelineProtoHash(); - - void setPipelineProtoHash(String hash); - @Description("The customized dataflow worker jar") String getDataflowWorkerJar(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java index 831dd69ec95f..435e7e73d3a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java @@ -481,4 +481,11 @@ public OpenTelemetry create(PipelineOptions options) { return GlobalOpenTelemetry.get(); } } + + /** The hex-encoded SHA256 hash of the staged portable pipeline proto. */ + @Description("The hex-encoded SHA256 hash of the staged portable pipeline proto") + String getPipelineProtoHash(); + + void setPipelineProtoHash(String hash); } + From 852eb3a9006b1285586516e355dc10cd92221f1f Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 4 May 2026 13:32:19 -0700 Subject: [PATCH 11/11] fix spotless --- .../main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java index 435e7e73d3a6..7267dda9ed0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java @@ -488,4 +488,3 @@ public OpenTelemetry create(PipelineOptions options) { void setPipelineProtoHash(String hash); } -