From 1abb4b4bf275e0367b0ee169047b1a26f4894b31 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 25 Mar 2026 16:39:20 -0700 Subject: [PATCH 01/15] Add checks for dependency corruption --- sdks/go/pkg/beam/artifact/materialize.go | 16 ++++++++-- sdks/go/pkg/beam/artifact/materialize_test.go | 28 +++++++++++++++++ .../runners/dataflow/internal/apiclient.py | 7 +++-- .../dataflow/internal/apiclient_test.py | 31 ++++++++++++------- 4 files changed, 64 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 624e30efcd2b..aef6fae99829 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -131,6 +131,7 @@ func newMaterializeWithClient(ctx context.Context, client jobpb.ArtifactRetrieva RoleUrn: URNStagingTo, RolePayload: rolePayload, }, + expectedSha256: filePayload.Sha256, }) } @@ -183,8 +184,9 @@ func MustExtractFilePayload(artifact *pipepb.ArtifactInformation) (string, strin } type artifact struct { - client jobpb.ArtifactRetrievalServiceClient - dep *pipepb.ArtifactInformation + client jobpb.ArtifactRetrievalServiceClient + dep *pipepb.ArtifactInformation + expectedSha256 string } func (a artifact) retrieve(ctx context.Context, dest string) error { @@ -231,7 +233,15 @@ func (a artifact) retrieve(ctx context.Context, dest string) error { stat, _ := fd.Stat() log.Printf("Downloaded: %v (sha256: %v, size: %v)", filename, sha256Hash, stat.Size()) - return fd.Close() + if err := fd.Close(); err != nil { + return err + } + + if a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { + return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) + } + + return nil } func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, error) { diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index 31890ed045cc..d1a3cc5d506f 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -266,6 +266,19 @@ func TestNewRetrieveWithResolution(t *testing.T) { checkStagedFiles(mds, dest, expected, t) } +func TestNewRetrieveWithBadShaFails(t *testing.T) { + expected := map[string]string{"a.txt": "a"} + client := &fakeRetrievalService{artifacts: expected} + dest := makeTempDir(t) + defer os.RemoveAll(dest) + ctx := grpcx.WriteWorkerID(context.Background(), "worker") + + _, err := newMaterializeWithClient(ctx, client, client.fileArtifactsWithBadSha(), dest) + if err == nil { + t.Fatalf("expected materialization to fail due to bad sha256 mismatch") + } +} + func checkStagedFiles(mds []*pipepb.ArtifactInformation, dest string, expected map[string]string, t *testing.T) { if len(mds) != len(expected) { t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected)) @@ -323,6 +336,21 @@ func (fake *fakeRetrievalService) fileArtifactsWithoutStagingTo() []*pipepb.Arti return artifacts } +func (fake *fakeRetrievalService) fileArtifactsWithBadSha() []*pipepb.ArtifactInformation { + var artifacts []*pipepb.ArtifactInformation + for name := range fake.artifacts { + payload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{ + Path: filepath.Join("/tmp", name), + Sha256: "badhash", + }) + artifacts = append(artifacts, &pipepb.ArtifactInformation{ + TypeUrn: URNFileArtifact, + TypePayload: payload, + }) + } + return artifacts +} + func (fake *fakeRetrievalService) urlArtifactsWithoutStagingTo() []*pipepb.ArtifactInformation { var artifacts []*pipepb.ArtifactInformation for name := range fake.artifacts { diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 164ace532b23..193061868f0a 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -595,11 +595,12 @@ def _stage_resources(self, pipeline, options): else: remote_name = os.path.basename(type_payload.path) is_staged_role = False - - if self._enable_caching and not type_payload.sha256: + # compute sha256 even if caching is disabled. + # This is used to check the payload integrity along with caching. + if not type_payload.sha256: type_payload.sha256 = self._compute_sha256(type_payload.path) - if type_payload.sha256 and type_payload.sha256 in staged_hashes: + if self._enable_caching and type_payload.sha256 and type_payload.sha256 in staged_hashes: _LOGGER.info( 'Found duplicated artifact sha256: %s (%s)', type_payload.path, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index b767cef86b2e..8d1efd893b5b 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,13 +1340,14 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient._LegacyDataflowStager, - 'stage_job_resources') as mock_stager: - client._stage_resources(pipeline, pipeline_options) + with mock.patch.object(apiclient.DataflowApplicationClient, '_compute_sha256', return_value='dummy_hash'): + with mock.patch.object(apiclient._LegacyDataflowStager, + 'stage_job_resources') as mock_stager: + client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', ''), ('/tmp/bar1', 'bar1', ''), - ('/tmp/baz', 'baz1', ''), ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', ''), ('/tmp/bar2', 'bar2', '')], + [('/tmp/foo1', 'foo1', 'dummy_hash'), ('/tmp/bar1', 'bar1', 'dummy_hash'), + ('/tmp/baz', 'baz1', 'dummy_hash'), ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'dummy_hash'), ('/tmp/bar2', 'bar2', 'dummy_hash')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1357,7 +1358,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo1' + url='gs://test-location/staging/foo1', + sha256='dummy_hash' ).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1366,7 +1368,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar1'). + url='gs://test-location/staging/bar1', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1375,7 +1378,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1396,7 +1400,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo2'). + url='gs://test-location/staging/foo2', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1405,7 +1410,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar2'). + url='gs://test-location/staging/bar2', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1414,7 +1420,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='dummy_hash'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. From 106c037b4cc5d53e954663dca72d3189fe7b1f72 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 08:13:12 -0700 Subject: [PATCH 02/15] Add experiment to disable checks --- sdks/go/container/boot.go | 3 ++ sdks/go/pkg/beam/artifact/materialize.go | 25 +++++++++- sdks/go/pkg/beam/artifact/materialize_test.go | 47 +++++++++++++++++++ sdks/java/container/boot.go | 3 ++ sdks/python/container/boot.go | 3 ++ sdks/typescript/container/boot.go | 3 ++ 6 files changed, 83 insertions(+), 1 deletion(-) diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index b75201520f39..4e93f01b9d15 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -158,6 +158,9 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // (2) Retrieve the staged files. // // The Go SDK harness downloads the worker binary and invokes diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index aef6fae99829..2a787a1adbcb 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -39,6 +39,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" "google.golang.org/protobuf/proto" + structpb "google.golang.org/protobuf/types/known/structpb" ) // TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto @@ -237,7 +238,7 @@ func (a artifact) retrieve(ctx context.Context, dest string) error { return err } - if a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { + if isArtifactValidationEnabled(ctx) && a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) } @@ -521,3 +522,25 @@ func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata { } return ret } + +type contextKey string + +const pipelineOptionsKey contextKey = "pipeline_options" + +// Returns a new context carrying the full pipeline options struct. +func WithPipelineOptions(ctx context.Context, options *structpb.Struct) context.Context { + return context.WithValue(ctx, pipelineOptionsKey, options) +} + +// Parses pipeline options to check if "disable_integrity_checks" is enabled. +func isArtifactValidationEnabled(ctx context.Context) bool { + options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) + if options != nil { + for _, v := range options.GetFields()["options"].GetStructValue().GetFields()["experiments"].GetListValue().GetValues() { + if v.GetStringValue() == "disable_integrity_checks" { + return false + } + } + } + return true +} diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index d1a3cc5d506f..7901675e0f01 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" + structpb "google.golang.org/protobuf/types/known/structpb" ) // TestRetrieve tests that we can successfully retrieve fresh files. @@ -266,6 +267,23 @@ func TestNewRetrieveWithResolution(t *testing.T) { checkStagedFiles(mds, dest, expected, t) } +func TestIsArtifactValidationEnabled(t *testing.T) { + ctx := context.Background() + if !isArtifactValidationEnabled(ctx) { + t.Errorf("empty context should have validation enabled") + } + + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"disable_integrity_checks"}, + }, + }) + ctx2 := WithPipelineOptions(ctx, options) + if isArtifactValidationEnabled(ctx2) { + t.Errorf("populated context should have validation disabled") + } +} + func TestNewRetrieveWithBadShaFails(t *testing.T) { expected := map[string]string{"a.txt": "a"} client := &fakeRetrievalService{artifacts: expected} @@ -279,6 +297,35 @@ func TestNewRetrieveWithBadShaFails(t *testing.T) { } } +func TestNewRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { + expected := map[string]string{"a.txt": "a"} + client := &fakeRetrievalService{artifacts: expected} + dest := makeTempDir(t) + defer os.RemoveAll(dest) + + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"disable_integrity_checks"}, + }, + }) + ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "worker"), options) + + mds, err := newMaterializeWithClient(ctx, client, client.fileArtifactsWithBadSha(), dest) + if err != nil { + t.Fatalf("materialize failed but should have succeeded because validation was disabled via experiment: %v", err) + } + + generated := make(map[string]string) + for _, md := range mds { + name, _ := MustExtractFilePayload(md) + payload, _ := proto.Marshal(&pipepb.ArtifactStagingToRolePayload{ + StagedName: name}) + generated[name] = string(payload) + } + + checkStagedFiles(mds, dest, generated, t) +} + func checkStagedFiles(mds []*pipepb.ArtifactInformation, dest string, expected map[string]string, t *testing.T) { if len(mds) != len(expected) { t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected)) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index f6c33b635d3c..a5005e4dc320 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -105,6 +105,9 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // (2) Retrieve the staged user jars. We ignore any disk limit, // because the staged jars are mandatory. diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 7c0f22675daf..e0d2fd3d0937 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -184,6 +184,9 @@ func launchSDKProcess() error { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + experiments := getExperiments(options) pipNoBuildIsolation = false if slices.Contains(experiments, "pip_no_build_isolation") { diff --git a/sdks/typescript/container/boot.go b/sdks/typescript/container/boot.go index 44f94f804330..70c512d62b04 100644 --- a/sdks/typescript/container/boot.go +++ b/sdks/typescript/container/boot.go @@ -91,6 +91,9 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } + // Inject pipeline options into context + ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // (2) Retrieve and install the staged packages. dir := filepath.Join(*semiPersistDir, *id, "staged") From 082e3db68d828e1c9eea0903fae73cc8c8532a12 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 09:50:08 -0700 Subject: [PATCH 03/15] Add flag for legacy function too --- sdks/go/pkg/beam/artifact/materialize.go | 2 +- sdks/go/pkg/beam/artifact/materialize_test.go | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 2a787a1adbcb..4d117b049956 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -453,7 +453,7 @@ func retrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceCl } // Artifact Sha256 hash is an optional field in metadata so we should only validate when its present. - if a.Sha256 != "" && sha256Hash != a.Sha256 { + if isArtifactValidationEnabled(ctx) && a.Sha256 != "" && sha256Hash != a.Sha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256) } return nil diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index 7901675e0f01..9d527569d1bc 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -83,6 +83,57 @@ func TestMultiRetrieve(t *testing.T) { } } +func TestRetrieveWithBadShaFails(t *testing.T) { + cc := startServer(t) + defer cc.Close() + + ctx := grpcx.WriteWorkerID(context.Background(), "idA") + keys := []string{"foo"} + st := "whatever" + rt, artifacts := populate(ctx, cc, t, keys, 300, st) + + dst := makeTempDir(t) + defer os.RemoveAll(dst) + + client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc) + for _, a := range artifacts { + a.Sha256 = "badhash" // mutate hash + if err := Retrieve(ctx, client, a, rt, dst); err == nil { + t.Errorf("expected materialization to fail due to bad sha256 mismatch") + } + } +} + +func TestRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { + cc := startServer(t) + defer cc.Close() + + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"disable_integrity_checks"}, + }, + }) + ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "idA"), options) + keys := []string{"foo"} + st := "whatever" + rt, artifacts := populate(ctx, cc, t, keys, 300, st) + + dst := makeTempDir(t) + defer os.RemoveAll(dst) + + client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc) + for _, a := range artifacts { + originalHash := a.Sha256 + a.Sha256 = "badhash" // mutate hash + filename := makeFilename(dst, a.Name) + if err := Retrieve(ctx, client, a, rt, dst); err != nil { + t.Errorf("materialize failed but should have succeeded because validation was disabled via experiment: %v", err) + continue + } + verifySHA256(t, filename, originalHash) + } +} + // populate stages a set of artifacts with the given keys, each with // slightly different sizes and chucksizes. func populate(ctx context.Context, cc *grpc.ClientConn, t *testing.T, keys []string, size int, st string) (string, []*jobpb.ArtifactMetadata) { From 3ad52e7687cef4611280daf60b5013093032120f Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 10:49:01 -0700 Subject: [PATCH 04/15] fix lint --- sdks/go/pkg/beam/artifact/materialize.go | 4 +-- .../dataflow/internal/apiclient_test.py | 31 +++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 4d117b049956..edd59a26da17 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -527,12 +527,12 @@ type contextKey string const pipelineOptionsKey contextKey = "pipeline_options" -// Returns a new context carrying the full pipeline options struct. +// WithPipelineOptions returns a new context carrying the full pipeline options struct. func WithPipelineOptions(ctx context.Context, options *structpb.Struct) context.Context { return context.WithValue(ctx, pipelineOptionsKey, options) } -// Parses pipeline options to check if "disable_integrity_checks" is enabled. +// isArtifactValidationEnabled parses pipeline options to check if "disable_integrity_checks" is enabled. func isArtifactValidationEnabled(ctx context.Context) bool { options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) if options != nil { diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 8d1efd893b5b..c6cecc3c66f6 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,14 +1340,19 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient.DataflowApplicationClient, '_compute_sha256', return_value='dummy_hash'): + with mock.patch.object(apiclient.DataflowApplicationClient, + '_compute_sha256', + return_value='dummy_hash'): with mock.patch.object(apiclient._LegacyDataflowStager, 'stage_job_resources') as mock_stager: client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', 'dummy_hash'), ('/tmp/bar1', 'bar1', 'dummy_hash'), - ('/tmp/baz', 'baz1', 'dummy_hash'), ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', 'dummy_hash'), ('/tmp/bar2', 'bar2', 'dummy_hash')], + [('/tmp/foo1', 'foo1', 'dummy_hash'), + ('/tmp/bar1', 'bar1', 'dummy_hash'), + ('/tmp/baz', 'baz1', 'dummy_hash'), + ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'dummy_hash'), + ('/tmp/bar2', 'bar2', 'dummy_hash')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1359,8 +1364,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo1', - sha256='dummy_hash' - ).SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1369,8 +1373,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar1', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1379,8 +1382,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1401,8 +1403,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo2', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1411,8 +1412,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar2', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1421,8 +1421,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='dummy_hash'). - SerializeToString(), + sha256='dummy_hash').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( From d0b0fbbb590a48b2c0f87e43a4783eb43466fae4 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 12:38:50 -0700 Subject: [PATCH 05/15] fix python flag --- .../runners/dataflow/internal/apiclient.py | 2 +- .../dataflow/internal/apiclient_test.py | 42 ++++++++----------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 193061868f0a..fe36f56df427 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -600,7 +600,7 @@ def _stage_resources(self, pipeline, options): if not type_payload.sha256: type_payload.sha256 = self._compute_sha256(type_payload.path) - if self._enable_caching and type_payload.sha256 and type_payload.sha256 in staged_hashes: + if type_payload.sha256 and type_payload.sha256 in staged_hashes: _LOGGER.info( 'Found duplicated artifact sha256: %s (%s)', type_payload.path, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index c6cecc3c66f6..b767cef86b2e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,19 +1340,13 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient.DataflowApplicationClient, - '_compute_sha256', - return_value='dummy_hash'): - with mock.patch.object(apiclient._LegacyDataflowStager, - 'stage_job_resources') as mock_stager: - client._stage_resources(pipeline, pipeline_options) + with mock.patch.object(apiclient._LegacyDataflowStager, + 'stage_job_resources') as mock_stager: + client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', 'dummy_hash'), - ('/tmp/bar1', 'bar1', 'dummy_hash'), - ('/tmp/baz', 'baz1', 'dummy_hash'), - ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', 'dummy_hash'), - ('/tmp/bar2', 'bar2', 'dummy_hash')], + [('/tmp/foo1', 'foo1', ''), ('/tmp/bar1', 'bar1', ''), + ('/tmp/baz', 'baz1', ''), ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', ''), ('/tmp/bar2', 'bar2', '')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1363,8 +1357,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/foo1' + ).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1372,8 +1366,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/bar1'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1381,8 +1375,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/baz1'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1402,8 +1396,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo2', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/foo2'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1411,8 +1405,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar2', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/bar2'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1420,8 +1414,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1', - sha256='dummy_hash').SerializeToString(), + url='gs://test-location/staging/baz1'). + SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( From 6581d90e321d48fcfb1a0e1fe64a827292b8ef6e Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 13:21:04 -0700 Subject: [PATCH 06/15] fix test --- .../dataflow/internal/apiclient_test.py | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index b767cef86b2e..adb1feab53f7 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1340,13 +1340,21 @@ def test_stage_resources(self): ]) })) client = apiclient.DataflowApplicationClient(pipeline_options) - with mock.patch.object(apiclient._LegacyDataflowStager, - 'stage_job_resources') as mock_stager: - client._stage_resources(pipeline, pipeline_options) + with mock.patch.object(apiclient.DataflowApplicationClient, + '_compute_sha256', + side_effect=lambda path: 'hash' + path): + with mock.patch.object(apiclient._LegacyDataflowStager, + 'stage_job_resources') as mock_stager: + client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [('/tmp/foo1', 'foo1', ''), ('/tmp/bar1', 'bar1', ''), - ('/tmp/baz', 'baz1', ''), ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', ''), ('/tmp/bar2', 'bar2', '')], + [ + ('/tmp/foo1', 'foo1', 'hash/tmp/foo1'), + ('/tmp/bar1', 'bar1', 'hash/tmp/bar1'), + ('/tmp/baz', 'baz1', 'hash/tmp/baz'), + ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'hash/tmp/foo2'), + ('/tmp/bar2', 'bar2', 'hash/tmp/bar2') + ], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1357,7 +1365,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo1' + url='gs://test-location/staging/foo1', + sha256='hash/tmp/foo1' ).SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1366,7 +1375,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar1'). + url='gs://test-location/staging/bar1', + sha256='hash/tmp/bar1'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1375,7 +1385,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='hash/tmp/baz'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1396,7 +1407,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/foo2'). + url='gs://test-location/staging/foo2', + sha256='hash/tmp/foo2'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1405,7 +1417,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/bar2'). + url='gs://test-location/staging/bar2', + sha256='hash/tmp/bar2'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. @@ -1414,7 +1427,8 @@ def test_stage_resources(self): beam_runner_api_pb2.ArtifactInformation( type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( - url='gs://test-location/staging/baz1'). + url='gs://test-location/staging/baz1', + sha256='hash/tmp/baz'). SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. From 643fe5a7e73afb4ce5d6ea02012f45972691b8cb Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 26 Mar 2026 13:49:27 -0700 Subject: [PATCH 07/15] fix formatter --- .../dataflow/internal/apiclient_test.py | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index adb1feab53f7..51f4264d3e45 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -1347,14 +1347,12 @@ def test_stage_resources(self): 'stage_job_resources') as mock_stager: client._stage_resources(pipeline, pipeline_options) mock_stager.assert_called_once_with( - [ - ('/tmp/foo1', 'foo1', 'hash/tmp/foo1'), - ('/tmp/bar1', 'bar1', 'hash/tmp/bar1'), - ('/tmp/baz', 'baz1', 'hash/tmp/baz'), - ('/tmp/renamed1', 'renamed1', 'abcdefg'), - ('/tmp/foo2', 'foo2', 'hash/tmp/foo2'), - ('/tmp/bar2', 'bar2', 'hash/tmp/bar2') - ], + [('/tmp/foo1', 'foo1', 'hash/tmp/foo1'), + ('/tmp/bar1', 'bar1', 'hash/tmp/bar1'), + ('/tmp/baz', 'baz1', 'hash/tmp/baz'), + ('/tmp/renamed1', 'renamed1', 'abcdefg'), + ('/tmp/foo2', 'foo2', 'hash/tmp/foo2'), + ('/tmp/bar2', 'bar2', 'hash/tmp/bar2')], staging_location='gs://test-location/staging') pipeline_expected = beam_runner_api_pb2.Pipeline( @@ -1366,8 +1364,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo1', - sha256='hash/tmp/foo1' - ).SerializeToString(), + sha256='hash/tmp/foo1').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1376,8 +1373,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar1', - sha256='hash/tmp/bar1'). - SerializeToString(), + sha256='hash/tmp/bar1').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1386,8 +1382,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='hash/tmp/baz'). - SerializeToString(), + sha256='hash/tmp/baz').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1408,8 +1403,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/foo2', - sha256='hash/tmp/foo2'). - SerializeToString(), + sha256='hash/tmp/foo2').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1418,8 +1412,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/bar2', - sha256='hash/tmp/bar2'). - SerializeToString(), + sha256='hash/tmp/bar2').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( @@ -1428,8 +1421,7 @@ def test_stage_resources(self): type_urn=common_urns.artifact_types.URL.urn, type_payload=beam_runner_api_pb2.ArtifactUrlPayload( url='gs://test-location/staging/baz1', - sha256='hash/tmp/baz'). - SerializeToString(), + sha256='hash/tmp/baz').SerializeToString(), role_urn=common_urns.artifact_roles.STAGING_TO.urn, role_payload=beam_runner_api_pb2. ArtifactStagingToRolePayload( From 1b114e2e6d06a8041e98fac056b414ab8231d53d Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 30 Apr 2026 15:26:51 -0700 Subject: [PATCH 08/15] update flag from server --- sdks/go/pkg/beam/artifact/materialize.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index edd59a26da17..261c794add5e 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -537,7 +537,7 @@ func isArtifactValidationEnabled(ctx context.Context) bool { options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) if options != nil { for _, v := range options.GetFields()["options"].GetStructValue().GetFields()["experiments"].GetListValue().GetValues() { - if v.GetStringValue() == "disable_integrity_checks" { + if v.GetStringValue() == "disable_staged_file_integrity_checks" { return false } } From b6d6133e84990be46c1ec75248bc7cca4d302255 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 30 Apr 2026 16:35:16 -0700 Subject: [PATCH 09/15] fix tests --- sdks/go/container/boot.go | 1 + sdks/go/pkg/beam/artifact/materialize_test.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index 4e93f01b9d15..4bcad03150f3 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -149,6 +149,7 @@ func main() { log.Fatalf("Endpoint not set: %v", err) } logger := &tools.Logger{Endpoint: *loggingEndpoint} + log.SetOutput(tools.NewBufferedLoggerWithFlushInterval(ctx, logger, 0)) logger.Printf(ctx, "Initializing Go harness: %v", strings.Join(os.Args, " ")) // (1) Obtain the pipeline options diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index 9d527569d1bc..f1957c9cf0d0 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -110,7 +110,7 @@ func TestRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { options, _ := structpb.NewStruct(map[string]interface{}{ "options": map[string]interface{}{ - "experiments": []interface{}{"disable_integrity_checks"}, + "experiments": []interface{}{"disable_staged_file_integrity_checks"}, }, }) ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "idA"), options) @@ -326,7 +326,7 @@ func TestIsArtifactValidationEnabled(t *testing.T) { options, _ := structpb.NewStruct(map[string]interface{}{ "options": map[string]interface{}{ - "experiments": []interface{}{"disable_integrity_checks"}, + "experiments": []interface{}{"disable_staged_file_integrity_checks"}, }, }) ctx2 := WithPipelineOptions(ctx, options) @@ -356,7 +356,7 @@ func TestNewRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { options, _ := structpb.NewStruct(map[string]interface{}{ "options": map[string]interface{}{ - "experiments": []interface{}{"disable_integrity_checks"}, + "experiments": []interface{}{"disable_staged_file_integrity_checks"}, }, }) ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "worker"), options) From 00d67bfb7673d8427155f81700a72c2ee5eae568 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 4 May 2026 11:37:34 -0700 Subject: [PATCH 10/15] Add log for feature rollout validation --- sdks/go/pkg/beam/artifact/materialize.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 261c794add5e..cd2f5cb7e397 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -238,8 +238,11 @@ func (a artifact) retrieve(ctx context.Context, dest string) error { return err } - if isArtifactValidationEnabled(ctx) && a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { - return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) + if isArtifactValidationEnabled(ctx) { + if a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { + return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) + } + log.Printf("Sha256 validation done for file: %v, with sha256 %v", filename, a.expectedSha256) } return nil @@ -453,8 +456,11 @@ func retrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceCl } // Artifact Sha256 hash is an optional field in metadata so we should only validate when its present. - if isArtifactValidationEnabled(ctx) && a.Sha256 != "" && sha256Hash != a.Sha256 { - return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256) + if isArtifactValidationEnabled(ctx) { + if a.Sha256 != "" && sha256Hash != a.Sha256 { + return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256) + } + log.Printf("Sha256 validation done for file: %v, with sha256 %v", filename, a.Sha256) } return nil } From cf1c46b6c4347b1e49980f9491428c95c117148a Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 4 May 2026 13:20:00 -0700 Subject: [PATCH 11/15] Add PipelineOptions util --- sdks/go/pkg/beam/artifact/materialize.go | 20 +------ sdks/go/pkg/beam/artifact/options.go | 69 +++++++++++++++++++++++ sdks/go/pkg/beam/artifact/options_test.go | 69 +++++++++++++++++++++++ 3 files changed, 139 insertions(+), 19 deletions(-) create mode 100644 sdks/go/pkg/beam/artifact/options.go create mode 100644 sdks/go/pkg/beam/artifact/options_test.go diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index cd2f5cb7e397..a36e30357189 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -39,7 +39,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx" "google.golang.org/protobuf/proto" - structpb "google.golang.org/protobuf/types/known/structpb" ) // TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto @@ -529,24 +528,7 @@ func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata { return ret } -type contextKey string - -const pipelineOptionsKey contextKey = "pipeline_options" - -// WithPipelineOptions returns a new context carrying the full pipeline options struct. -func WithPipelineOptions(ctx context.Context, options *structpb.Struct) context.Context { - return context.WithValue(ctx, pipelineOptionsKey, options) -} - // isArtifactValidationEnabled parses pipeline options to check if "disable_integrity_checks" is enabled. func isArtifactValidationEnabled(ctx context.Context) bool { - options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) - if options != nil { - for _, v := range options.GetFields()["options"].GetStructValue().GetFields()["experiments"].GetListValue().GetValues() { - if v.GetStringValue() == "disable_staged_file_integrity_checks" { - return false - } - } - } - return true + return !HasExperiment(PipelineOptions(ctx), "disable_staged_file_integrity_checks") } diff --git a/sdks/go/pkg/beam/artifact/options.go b/sdks/go/pkg/beam/artifact/options.go new file mode 100644 index 000000000000..35d07ebc5370 --- /dev/null +++ b/sdks/go/pkg/beam/artifact/options.go @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package artifact + +import ( + "context" + + structpb "google.golang.org/protobuf/types/known/structpb" +) + +type contextKey string + +const pipelineOptionsKey contextKey = "pipeline_options" + +// WithPipelineOptions returns a new context carrying the full pipeline options struct. +func WithPipelineOptions(ctx context.Context, options *structpb.Struct) context.Context { + return context.WithValue(ctx, pipelineOptionsKey, options) +} + +// PipelineOptions returns the pipeline options from the context if present. +func PipelineOptions(ctx context.Context) *structpb.Struct { + options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) + return options +} + +// GetExperiments extracts a list of experiments from the pipeline options. +func GetExperiments(options *structpb.Struct) []string { + if options == nil { + return nil + } + + // Try legacy style first + var exps []string + for _, v := range options.GetFields()["options"].GetStructValue().GetFields()["experiments"].GetListValue().GetValues() { + exps = append(exps, v.GetStringValue()) + } + if len(exps) > 0 { + return exps + } + + // Try URN style + for _, v := range options.GetFields()["beam:option:experiments:v1"].GetListValue().GetValues() { + exps = append(exps, v.GetStringValue()) + } + return exps +} + +// HasExperiment checks if a specific experiment is enabled in the pipeline options. +func HasExperiment(options *structpb.Struct, experiment string) bool { + for _, exp := range GetExperiments(options) { + if exp == experiment { + return true + } + } + return false +} diff --git a/sdks/go/pkg/beam/artifact/options_test.go b/sdks/go/pkg/beam/artifact/options_test.go new file mode 100644 index 000000000000..429c18d0eb39 --- /dev/null +++ b/sdks/go/pkg/beam/artifact/options_test.go @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package artifact + +import ( + "context" + "testing" + + structpb "google.golang.org/protobuf/types/known/structpb" +) + +func TestPipelineOptionsAndExperiments(t *testing.T) { + ctx := context.Background() + if PipelineOptions(ctx) != nil { + t.Errorf("empty context should have nil pipeline options") + } + if len(GetExperiments(PipelineOptions(ctx))) != 0 { + t.Errorf("empty context should have no experiments") + } + + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"exp1", "exp2"}, + }, + }) + ctx2 := WithPipelineOptions(ctx, options) + if PipelineOptions(ctx2) == nil { + t.Errorf("context should have pipeline options") + } + exps := GetExperiments(options) + if len(exps) != 2 || exps[0] != "exp1" || exps[1] != "exp2" { + t.Errorf("GetExperiments() = %v, want [exp1 exp2]", exps) + } + if !HasExperiment(options, "exp1") { + t.Errorf("HasExperiment(exp1) = false, want true") + } + if HasExperiment(options, "exp3") { + t.Errorf("HasExperiment(exp3) = true, want false") + } + + // Test URN Style + urnOptions, _ := structpb.NewStruct(map[string]interface{}{ + "beam:option:experiments:v1": []interface{}{"expA", "expB"}, + }) + ctx3 := WithPipelineOptions(ctx, urnOptions) + if PipelineOptions(ctx3) == nil { + t.Errorf("context should have pipeline options") + } + expsURN := GetExperiments(urnOptions) + if len(expsURN) != 2 || expsURN[0] != "expA" || expsURN[1] != "expB" { + t.Errorf("GetExperiments() = %v, want [expA expB]", expsURN) + } + if !HasExperiment(urnOptions, "expB") { + t.Errorf("HasExperiment(expB) = false, want true") + } +} From 22c874c785058127d05712010e581909f28acf0b Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 4 May 2026 13:29:07 -0700 Subject: [PATCH 12/15] refactor test --- sdks/go/pkg/beam/artifact/options_test.go | 55 ++++++++++++++--------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/options_test.go b/sdks/go/pkg/beam/artifact/options_test.go index 429c18d0eb39..ffec731ff818 100644 --- a/sdks/go/pkg/beam/artifact/options_test.go +++ b/sdks/go/pkg/beam/artifact/options_test.go @@ -22,48 +22,63 @@ import ( structpb "google.golang.org/protobuf/types/known/structpb" ) -func TestPipelineOptionsAndExperiments(t *testing.T) { +func TestPipelineOptionsContext(t *testing.T) { ctx := context.Background() if PipelineOptions(ctx) != nil { t.Errorf("empty context should have nil pipeline options") } - if len(GetExperiments(PipelineOptions(ctx))) != 0 { - t.Errorf("empty context should have no experiments") - } options, _ := structpb.NewStruct(map[string]interface{}{ "options": map[string]interface{}{ - "experiments": []interface{}{"exp1", "exp2"}, + "experiments": []interface{}{"exp1"}, }, }) - ctx2 := WithPipelineOptions(ctx, options) - if PipelineOptions(ctx2) == nil { + ctxWithOpts := WithPipelineOptions(ctx, options) + got := PipelineOptions(ctxWithOpts) + if got == nil { t.Errorf("context should have pipeline options") } +} + +func TestGetExperiments_Nil(t *testing.T) { + if got := GetExperiments(nil); got != nil { + t.Errorf("GetExperiments(nil) = %v, want nil", got) + } +} + +func TestGetExperiments_Legacy(t *testing.T) { + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"exp1", "exp2"}, + }, + }) exps := GetExperiments(options) if len(exps) != 2 || exps[0] != "exp1" || exps[1] != "exp2" { t.Errorf("GetExperiments() = %v, want [exp1 exp2]", exps) } - if !HasExperiment(options, "exp1") { - t.Errorf("HasExperiment(exp1) = false, want true") - } - if HasExperiment(options, "exp3") { - t.Errorf("HasExperiment(exp3) = true, want false") - } +} - // Test URN Style +func TestGetExperiments_URN(t *testing.T) { urnOptions, _ := structpb.NewStruct(map[string]interface{}{ "beam:option:experiments:v1": []interface{}{"expA", "expB"}, }) - ctx3 := WithPipelineOptions(ctx, urnOptions) - if PipelineOptions(ctx3) == nil { - t.Errorf("context should have pipeline options") - } expsURN := GetExperiments(urnOptions) if len(expsURN) != 2 || expsURN[0] != "expA" || expsURN[1] != "expB" { t.Errorf("GetExperiments() = %v, want [expA expB]", expsURN) } - if !HasExperiment(urnOptions, "expB") { - t.Errorf("HasExperiment(expB) = false, want true") +} + +func TestHasExperiment(t *testing.T) { + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"exp1", "exp2"}, + }, + }) + + if !HasExperiment(options, "exp1") { + t.Errorf("HasExperiment(exp1) = false, want true") + } + if HasExperiment(options, "exp3") { + t.Errorf("HasExperiment(exp3) = true, want false") } } From 9a5da8a9a184f916362cbc749bff65f4c0296c8d Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 4 May 2026 20:15:53 -0700 Subject: [PATCH 13/15] Fix suggestions --- sdks/go/container/boot.go | 4 +-- sdks/go/pkg/beam/artifact/materialize.go | 33 +++++++++++++++---- sdks/go/pkg/beam/artifact/materialize_test.go | 24 +++----------- sdks/go/pkg/beam/artifact/options.go | 23 +------------ sdks/go/pkg/beam/artifact/options_test.go | 32 ++++++++---------- sdks/java/container/boot.go | 4 +-- sdks/python/container/boot.go | 4 +-- sdks/typescript/container/boot.go | 4 +-- 8 files changed, 52 insertions(+), 76 deletions(-) diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index 4bcad03150f3..3d56e10abc16 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -159,8 +159,8 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } - // Inject pipeline options into context - ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // Inject artifact validation enabled state into context + ctx = artifact.WithArtifactValidation(ctx, !artifact.HasExperiment(info.GetPipelineOptions(), "disable_staged_file_integrity_checks")) // (2) Retrieve the staged files. // diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index a36e30357189..761f70101499 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -34,6 +34,7 @@ import ( "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx" @@ -51,6 +52,28 @@ const ( NoArtifactsStaged = "__no_artifacts_staged__" ) +type validationKey string + +const artifactValidationKey validationKey = "artifact_validation_enabled" + +// WithArtifactValidation returns a new context carrying the artifact validation enabled state. +func WithArtifactValidation(ctx context.Context, enabled bool) context.Context { + return context.WithValue(ctx, artifactValidationKey, enabled) +} + +// ArtifactValidation returns the artifact validation enabled state from the context. +func ArtifactValidation(ctx context.Context) bool { + if val, ok := ctx.Value(artifactValidationKey).(bool); ok { + return val + } + return true +} + +// isArtifactValidationEnabled parses pipeline options to check if "disable_integrity_checks" is enabled. +func isArtifactValidationEnabled(ctx context.Context) bool { + return ArtifactValidation(ctx) +} + // Materialize is a convenience helper for ensuring that all artifacts are // present and uncorrupted. It interprets each artifact name as a relative // path under the dest directory. It does not retrieve valid artifacts already @@ -456,10 +479,11 @@ func retrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceCl // Artifact Sha256 hash is an optional field in metadata so we should only validate when its present. if isArtifactValidationEnabled(ctx) { - if a.Sha256 != "" && sha256Hash != a.Sha256 { + if a.Sha256 == "" { + beamlog.Warnf(ctx, "Artifact validation skipped for file: %v", filename) + } else if sha256Hash != a.Sha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256) } - log.Printf("Sha256 validation done for file: %v, with sha256 %v", filename, a.Sha256) } return nil } @@ -527,8 +551,3 @@ func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata { } return ret } - -// isArtifactValidationEnabled parses pipeline options to check if "disable_integrity_checks" is enabled. -func isArtifactValidationEnabled(ctx context.Context) bool { - return !HasExperiment(PipelineOptions(ctx), "disable_staged_file_integrity_checks") -} diff --git a/sdks/go/pkg/beam/artifact/materialize_test.go b/sdks/go/pkg/beam/artifact/materialize_test.go index f1957c9cf0d0..bf27e13e8a89 100644 --- a/sdks/go/pkg/beam/artifact/materialize_test.go +++ b/sdks/go/pkg/beam/artifact/materialize_test.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" - structpb "google.golang.org/protobuf/types/known/structpb" ) // TestRetrieve tests that we can successfully retrieve fresh files. @@ -108,12 +107,7 @@ func TestRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { cc := startServer(t) defer cc.Close() - options, _ := structpb.NewStruct(map[string]interface{}{ - "options": map[string]interface{}{ - "experiments": []interface{}{"disable_staged_file_integrity_checks"}, - }, - }) - ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "idA"), options) + ctx := WithArtifactValidation(grpcx.WriteWorkerID(context.Background(), "idA"), false) keys := []string{"foo"} st := "whatever" rt, artifacts := populate(ctx, cc, t, keys, 300, st) @@ -324,14 +318,9 @@ func TestIsArtifactValidationEnabled(t *testing.T) { t.Errorf("empty context should have validation enabled") } - options, _ := structpb.NewStruct(map[string]interface{}{ - "options": map[string]interface{}{ - "experiments": []interface{}{"disable_staged_file_integrity_checks"}, - }, - }) - ctx2 := WithPipelineOptions(ctx, options) + ctx2 := WithArtifactValidation(ctx, false) if isArtifactValidationEnabled(ctx2) { - t.Errorf("populated context should have validation disabled") + t.Errorf("context with validation disabled should have validation disabled") } } @@ -354,12 +343,7 @@ func TestNewRetrieveWithBadShaAndExperimentSucceeds(t *testing.T) { dest := makeTempDir(t) defer os.RemoveAll(dest) - options, _ := structpb.NewStruct(map[string]interface{}{ - "options": map[string]interface{}{ - "experiments": []interface{}{"disable_staged_file_integrity_checks"}, - }, - }) - ctx := WithPipelineOptions(grpcx.WriteWorkerID(context.Background(), "worker"), options) + ctx := WithArtifactValidation(grpcx.WriteWorkerID(context.Background(), "worker"), false) mds, err := newMaterializeWithClient(ctx, client, client.fileArtifactsWithBadSha(), dest) if err != nil { diff --git a/sdks/go/pkg/beam/artifact/options.go b/sdks/go/pkg/beam/artifact/options.go index 35d07ebc5370..47356433161c 100644 --- a/sdks/go/pkg/beam/artifact/options.go +++ b/sdks/go/pkg/beam/artifact/options.go @@ -16,41 +16,20 @@ package artifact import ( - "context" - structpb "google.golang.org/protobuf/types/known/structpb" ) -type contextKey string - -const pipelineOptionsKey contextKey = "pipeline_options" - -// WithPipelineOptions returns a new context carrying the full pipeline options struct. -func WithPipelineOptions(ctx context.Context, options *structpb.Struct) context.Context { - return context.WithValue(ctx, pipelineOptionsKey, options) -} - -// PipelineOptions returns the pipeline options from the context if present. -func PipelineOptions(ctx context.Context) *structpb.Struct { - options, _ := ctx.Value(pipelineOptionsKey).(*structpb.Struct) - return options -} - // GetExperiments extracts a list of experiments from the pipeline options. func GetExperiments(options *structpb.Struct) []string { if options == nil { return nil } - // Try legacy style first var exps []string + // Try legacy style for _, v := range options.GetFields()["options"].GetStructValue().GetFields()["experiments"].GetListValue().GetValues() { exps = append(exps, v.GetStringValue()) } - if len(exps) > 0 { - return exps - } - // Try URN style for _, v := range options.GetFields()["beam:option:experiments:v1"].GetListValue().GetValues() { exps = append(exps, v.GetStringValue()) diff --git a/sdks/go/pkg/beam/artifact/options_test.go b/sdks/go/pkg/beam/artifact/options_test.go index ffec731ff818..a9f0e4bb7e35 100644 --- a/sdks/go/pkg/beam/artifact/options_test.go +++ b/sdks/go/pkg/beam/artifact/options_test.go @@ -16,30 +16,11 @@ package artifact import ( - "context" "testing" structpb "google.golang.org/protobuf/types/known/structpb" ) -func TestPipelineOptionsContext(t *testing.T) { - ctx := context.Background() - if PipelineOptions(ctx) != nil { - t.Errorf("empty context should have nil pipeline options") - } - - options, _ := structpb.NewStruct(map[string]interface{}{ - "options": map[string]interface{}{ - "experiments": []interface{}{"exp1"}, - }, - }) - ctxWithOpts := WithPipelineOptions(ctx, options) - got := PipelineOptions(ctxWithOpts) - if got == nil { - t.Errorf("context should have pipeline options") - } -} - func TestGetExperiments_Nil(t *testing.T) { if got := GetExperiments(nil); got != nil { t.Errorf("GetExperiments(nil) = %v, want nil", got) @@ -82,3 +63,16 @@ func TestHasExperiment(t *testing.T) { t.Errorf("HasExperiment(exp3) = true, want false") } } + +func TestGetExperiments_Combined(t *testing.T) { + options, _ := structpb.NewStruct(map[string]interface{}{ + "options": map[string]interface{}{ + "experiments": []interface{}{"exp1", "exp2"}, + }, + "beam:option:experiments:v1": []interface{}{"expA", "expB"}, + }) + exps := GetExperiments(options) + if len(exps) != 4 || exps[0] != "exp1" || exps[1] != "exp2" || exps[2] != "expA" || exps[3] != "expB" { + t.Errorf("GetExperiments() = %v, want [exp1 exp2 expA expB]", exps) + } +} diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index f04c2918b002..3ce79e4927eb 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -105,8 +105,8 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } - // Inject pipeline options into context - ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // Inject artifact validation enabled state into context + ctx = artifact.WithArtifactValidation(ctx, !artifact.HasExperiment(info.GetPipelineOptions(), "disable_staged_file_integrity_checks")) // (2) Retrieve the staged user jars. We ignore any disk limit, // because the staged jars are mandatory. diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index e0d2fd3d0937..647ea6408053 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -184,8 +184,8 @@ func launchSDKProcess() error { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } - // Inject pipeline options into context - ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // Inject artifact validation enabled state into context + ctx = artifact.WithArtifactValidation(ctx, !artifact.HasExperiment(info.GetPipelineOptions(), "disable_staged_file_integrity_checks")) experiments := getExperiments(options) pipNoBuildIsolation = false diff --git a/sdks/typescript/container/boot.go b/sdks/typescript/container/boot.go index 70c512d62b04..95e26124facc 100644 --- a/sdks/typescript/container/boot.go +++ b/sdks/typescript/container/boot.go @@ -91,8 +91,8 @@ func main() { logger.Fatalf(ctx, "Failed to convert pipeline options: %v", err) } - // Inject pipeline options into context - ctx = artifact.WithPipelineOptions(ctx, info.GetPipelineOptions()) + // Inject artifact validation enabled state into context + ctx = artifact.WithArtifactValidation(ctx, !artifact.HasExperiment(info.GetPipelineOptions(), "disable_staged_file_integrity_checks")) // (2) Retrieve and install the staged packages. From 5d278ce48e36fd5e4c2426484d8e1440b15b5dba Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 4 May 2026 20:41:15 -0700 Subject: [PATCH 14/15] add log --- sdks/go/pkg/beam/artifact/materialize.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index 761f70101499..a0fcdf7bf231 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -34,7 +34,6 @@ import ( "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" - beamlog "github.com/apache/beam/sdks/v2/go/pkg/beam/log" jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx" @@ -261,7 +260,9 @@ func (a artifact) retrieve(ctx context.Context, dest string) error { } if isArtifactValidationEnabled(ctx) { - if a.expectedSha256 != "" && sha256Hash != a.expectedSha256 { + if a.expectedSha256 == "" { + log.Printf("WARN: Artifact validation skipped for file: %v", filename) + } else if sha256Hash != a.expectedSha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) } log.Printf("Sha256 validation done for file: %v, with sha256 %v", filename, a.expectedSha256) @@ -480,7 +481,7 @@ func retrieve(ctx context.Context, client jobpb.LegacyArtifactRetrievalServiceCl // Artifact Sha256 hash is an optional field in metadata so we should only validate when its present. if isArtifactValidationEnabled(ctx) { if a.Sha256 == "" { - beamlog.Warnf(ctx, "Artifact validation skipped for file: %v", filename) + log.Printf("WARN: Artifact validation skipped for file: %v", filename) } else if sha256Hash != a.Sha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256) } From a72d19e34cff604e1aea9af3399a96dff5423fa7 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 5 May 2026 07:58:49 -0700 Subject: [PATCH 15/15] remove log --- sdks/go/pkg/beam/artifact/materialize.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go index a0fcdf7bf231..db624f3776af 100644 --- a/sdks/go/pkg/beam/artifact/materialize.go +++ b/sdks/go/pkg/beam/artifact/materialize.go @@ -60,19 +60,14 @@ func WithArtifactValidation(ctx context.Context, enabled bool) context.Context { return context.WithValue(ctx, artifactValidationKey, enabled) } -// ArtifactValidation returns the artifact validation enabled state from the context. -func ArtifactValidation(ctx context.Context) bool { +// isArtifactValidationEnabled parses pipeline options to check if "disable_integrity_checks" is enabled. +func isArtifactValidationEnabled(ctx context.Context) bool { if val, ok := ctx.Value(artifactValidationKey).(bool); ok { return val } return true } -// isArtifactValidationEnabled parses pipeline options to check if "disable_integrity_checks" is enabled. -func isArtifactValidationEnabled(ctx context.Context) bool { - return ArtifactValidation(ctx) -} - // Materialize is a convenience helper for ensuring that all artifacts are // present and uncorrupted. It interprets each artifact name as a relative // path under the dest directory. It does not retrieve valid artifacts already @@ -265,7 +260,6 @@ func (a artifact) retrieve(ctx context.Context, dest string) error { } else if sha256Hash != a.expectedSha256 { return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.expectedSha256) } - log.Printf("Sha256 validation done for file: %v, with sha256 %v", filename, a.expectedSha256) } return nil