From b8aaae931ebf941fa956cb17087cbf249755dca3 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 13 Apr 2026 14:09:30 -0700 Subject: [PATCH 1/4] NamespaceMetricCapture in Nexus tests --- tests/nexus_api_test.go | 86 ++++++------- tests/nexus_api_validation_test.go | 50 +++----- tests/nexus_workflow_test.go | 172 ++++++++++++++------------ tests/testcore/metric_capture.go | 8 ++ tests/testcore/metric_capture_test.go | 12 ++ 5 files changed, 173 insertions(+), 155 deletions(-) diff --git a/tests/nexus_api_test.go b/tests/nexus_api_test.go index 4a71b350d58..d958faffeb5 100644 --- a/tests/nexus_api_test.go +++ b/tests/nexus_api_test.go @@ -269,7 +269,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures } testFn := func(s *NexusApiTestSuite, tc testcase, dispatchOnlyByEndpoint bool) { - env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), useTemporalFailures) endpoint := env.createNexusEndpoint(s.T(), tc.endpointName, testcore.RandomizeStr("task-queue")) var dispatchURL string if dispatchOnlyByEndpoint { @@ -287,8 +287,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures HTTPCaller: httpCaller, }) s.NoError(err) - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartNamespaceMetricCapture() pollerErrCh := env.nexusTaskPoller(ctx, s.T(), endpoint.Spec.Target.GetWorker().TaskQueue, tc.handler) @@ -316,21 +315,21 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures tc.assertion(s, result, err, headerCapture.lastHeaders) s.NoError(<-pollerErrCh) - snap := capture.Snapshot() - - s.Len(snap["nexus_requests"], 1) - s.Subset(snap["nexus_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome}) - s.Contains(snap["nexus_requests"][0].Tags, "nexus_endpoint") - s.Equal(int64(1), snap["nexus_requests"][0].Value) - s.Equal(metrics.MetricUnit(""), snap["nexus_requests"][0].Unit) + requests := capture.Metric("nexus_requests") + s.Len(requests, 1) + s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome}) + s.Contains(requests[0].Tags, "nexus_endpoint") + s.Equal(int64(1), requests[0].Value) + s.Equal(metrics.MetricUnit(""), requests[0].Unit) - s.Len(snap["nexus_latency"], 1) - s.Subset(snap["nexus_latency"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome}) - s.Contains(snap["nexus_latency"][0].Tags, "nexus_endpoint") + latency := capture.Metric("nexus_latency") + s.Len(latency, 1) + s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome}) + s.Contains(latency[0].Tags, "nexus_endpoint") // Ensure that StartOperation request is tracked as part of normal service telemetry metrics s.Condition(func() bool { - for _, m := range snap["service_requests"] { + for _, m := range capture.Metric("service_requests") { if opTag, ok := m.Tags["operation"]; ok && opTag == "StartNexusOperation" { return true } @@ -356,17 +355,17 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b name string header nexus.Header handler nexusTaskHandler - assertion func(*NexusApiTestSuite, *nexusrpc.ClientStartOperationResponse[string], error, map[string][]*metricstest.CapturedRecording) + assertion func(*NexusApiTestSuite, *nexusrpc.ClientStartOperationResponse[string], error, []*metricstest.CapturedRecording) } testCases := []testcase{ { name: "no header", - assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, snap map[string][]*metricstest.CapturedRecording) { + assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, preprocessErrors []*metricstest.CapturedRecording) { var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeUnauthorized, handlerErr.Type) s.Equal("permission denied", handlerErr.Message) - s.Empty(snap["nexus_request_preprocess_errors"]) + s.Empty(preprocessErrors) }, }, { @@ -374,12 +373,12 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b header: nexus.Header{ "authorization": "Bearer invalid", }, - assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, snap map[string][]*metricstest.CapturedRecording) { + assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, preprocessErrors []*metricstest.CapturedRecording) { var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeUnauthenticated, handlerErr.Type) s.Equal("unauthorized", handlerErr.Message) - s.Len(snap["nexus_request_preprocess_errors"], 1) + s.Len(preprocessErrors, 1) }, }, { @@ -388,10 +387,10 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b "authorization": "Bearer test", }, handler: nexusEchoHandler, - assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, snap map[string][]*metricstest.CapturedRecording) { + assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, preprocessErrors []*metricstest.CapturedRecording) { s.NoError(err) s.Equal("input", res.Successful) - s.Empty(snap["nexus_request_preprocess_errors"]) + s.Empty(preprocessErrors) }, }, } @@ -436,14 +435,13 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b pollerErrCh = env.nexusTaskPoller(ctx, s.T(), taskQueue, tc.handler) } - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() + capture := env.StartGlobalMetricCapture() result, err := nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{ Header: tc.header, }) - snap := capture.Snapshot() - env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + preprocessErrors := capture.Metric("nexus_request_preprocess_errors") - tc.assertion(s, result, err, snap) + tc.assertion(s, result, err, preprocessErrors) if pollerErrCh != nil { s.NoError(<-pollerErrCh) } @@ -533,7 +531,7 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure } testFn := func(s *NexusApiTestSuite, tc testcase, dispatchOnlyByEndpoint bool) { - env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), useTemporalFailures) endpoint := env.createNexusEndpoint(s.T(), tc.endpointName, testcore.RandomizeStr("task-queue")) var dispatchURL string if dispatchOnlyByEndpoint { @@ -551,8 +549,7 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure HTTPCaller: httpCaller, }) s.NoError(err) - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartNamespaceMetricCapture() pollerErrCh := env.nexusTaskPoller(ctx, s.T(), endpoint.Spec.Target.GetWorker().TaskQueue, tc.handler) @@ -576,21 +573,21 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure tc.assertion(s, err, headerCapture.lastHeaders) s.NoError(<-pollerErrCh) - snap := capture.Snapshot() - - s.Len(snap["nexus_requests"], 1) - s.Subset(snap["nexus_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome}) - s.Contains(snap["nexus_requests"][0].Tags, "nexus_endpoint") - s.Equal(int64(1), snap["nexus_requests"][0].Value) - s.Equal(metrics.MetricUnit(""), snap["nexus_requests"][0].Unit) + requests := capture.Metric("nexus_requests") + s.Len(requests, 1) + s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome}) + s.Contains(requests[0].Tags, "nexus_endpoint") + s.Equal(int64(1), requests[0].Value) + s.Equal(metrics.MetricUnit(""), requests[0].Unit) - s.Len(snap["nexus_latency"], 1) - s.Subset(snap["nexus_latency"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome}) - s.Contains(snap["nexus_latency"][0].Tags, "nexus_endpoint") + latency := capture.Metric("nexus_latency") + s.Len(latency, 1) + s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome}) + s.Contains(latency[0].Tags, "nexus_endpoint") // Ensure that CancelOperation request is tracked as part of normal service telemetry metrics s.Condition(func() bool { - for _, m := range snap["service_requests"] { + for _, m := range capture.Metric("service_requests") { if opTag, ok := m.Tags["operation"]; ok && opTag == "CancelNexusOperation" { return true } @@ -666,13 +663,12 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_WithNamespaceAndTaskQueue_Su // with client-name in gRPC metadata, the matching service emits nexus_task_requests with a // client_name tag. This proves the header propagates e2e: SDK → frontend → matching. func (s *NexusApiTestSuite) TestNexusClientNameMetricPropagation(useTemporalFailures bool) { - env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), useTemporalFailures) const expectedClientName = "temporal-go" taskQueue := testcore.RandomizeStr("tq") endpoint := env.createNexusEndpoint(s.T(), testcore.RandomizeStr("endpoint"), taskQueue) - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartNamespaceMetricCapture() ctx, cancel := context.WithCancel(testcore.NewContext()) defer cancel() @@ -704,16 +700,16 @@ func (s *NexusApiTestSuite) TestNexusClientNameMetricPropagation(useTemporalFail s.NoError(<-pollerErrCh) // Verify that the matching service emitted nexus_task_requests with client_name tag. - snap := capture.Snapshot() var found bool - for _, rec := range snap["nexus_task_requests"] { + requests := capture.Metric("nexus_task_requests") + for _, rec := range requests { if rec.Tags["client_name"] == expectedClientName { found = true break } } s.True(found, "expected nexus_task_requests metric with client_name=%s, got entries: %v", - expectedClientName, snap["nexus_task_requests"]) + expectedClientName, requests) } func nexusEchoHandler(_ *testing.T, res *workflowservice.PollNexusTaskQueueResponse) (*nexusTaskResponse, error) { diff --git a/tests/nexus_api_validation_test.go b/tests/nexus_api_validation_test.go index bcd26ea96e5..2a4a88abb39 100644 --- a/tests/nexus_api_validation_test.go +++ b/tests/nexus_api_validation_test.go @@ -31,7 +31,7 @@ func TestNexusAPIValidationTestSuite(t *testing.T) { } func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTaskQueue_NamespaceNotFound() { - env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), false) // Also use this test to verify that namespaces are unescaped in the path. taskQueue := testcore.RandomizeStr("task-queue") namespace := "namespace not/found" @@ -39,19 +39,17 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTa client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: u, Service: "test-service"}) s.NoError(err) ctx := testcore.NewContext() - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartNamespaceMetricCapture().ForNamespace(namespace) _, err = nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{}) var handlerError *nexus.HandlerError s.ErrorAs(err, &handlerError) s.Equal(nexus.HandlerErrorTypeNotFound, handlerError.Type) s.Equal(fmt.Sprintf("namespace not found: %q", namespace), handlerError.Message) - snap := capture.Snapshot() - - s.Len(snap["nexus_requests"], 1) - s.Equal(map[string]string{"namespace": namespace, "method": "StartNexusOperation", "outcome": "namespace_not_found", "nexus_endpoint": "_unknown_"}, snap["nexus_requests"][0].Tags) - s.Equal(int64(1), snap["nexus_requests"][0].Value) + requests := capture.Metric("nexus_requests") + s.Len(requests, 1) + s.Equal(map[string]string{"namespace": namespace, "method": "StartNexusOperation", "outcome": "namespace_not_found", "nexus_endpoint": "_unknown_"}, requests[0].Tags) + s.Equal(int64(1), requests[0].Value) } func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTaskQueue_NamespaceTooLong() { @@ -67,8 +65,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTa client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: u, Service: "test-service"}) s.NoError(err) ctx := testcore.NewContext() - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartGlobalMetricCapture() _, err = nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{}) var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) @@ -76,9 +73,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTa // I wish we'd never put periods in error messages :( s.Equal("Namespace length exceeds limit.", handlerErr.Message) - snap := capture.Snapshot() - - s.Len(snap["nexus_request_preprocess_errors"], 1) + s.Len(capture.Metric("nexus_request_preprocess_errors"), 1) } func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_Forbidden() { @@ -205,8 +200,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_Forbidden() { s.NoError(err) ctx := testcore.NewContext() - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartNamespaceMetricCapture() // Wait until the endpoint is loaded into the registry. s.Eventually(func() bool { @@ -219,11 +213,10 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_Forbidden() { s.ErrorAs(err, &handlerErr) tc.checkFailure(s, handlerErr) - snap := capture.Snapshot() - - s.Len(snap["nexus_requests"], 1) - s.Subset(snap["nexus_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.expectedOutcomeMetric}) - s.Equal(int64(1), snap["nexus_requests"][0].Value) + requests := capture.Metric("nexus_requests") + s.Len(requests, 1) + s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.expectedOutcomeMetric}) + s.Equal(int64(1), requests[0].Value) } for _, tc := range testCases { @@ -240,7 +233,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_PayloadSizeLimit() input := strings.Repeat("a", (2*1024*1024)-10) testFn := func(s *NexusAPIValidationTestSuite, dispatchOnlyByEndpoint bool) { - env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), false) taskQueue := testcore.RandomizeStr("task-queue") testEndpoint := env.createNexusEndpoint(s.T(), testcore.RandomizeStr("test-endpoint"), taskQueue) @@ -256,9 +249,6 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_PayloadSizeLimit() client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: dispatchURL, Service: "test-service"}) s.NoError(err) - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) - var result *nexusrpc.ClientStartOperationResponse[string] // Wait until the endpoint is loaded into the registry. @@ -283,7 +273,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_PayloadSizeLimit() } func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskMethods_VerifiesTaskTokenMatchesRequestNamespace() { - env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), false) ctx := testcore.NewContext() tt := tokenspb.NexusTask{ @@ -312,7 +302,7 @@ func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskMethods_Verifies } func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskCompleted_ValidateOperationTokenLength() { - env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), false) ctx := testcore.NewContext() tt := tokenspb.NexusTask{ @@ -345,7 +335,7 @@ func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskCompleted_Valida } func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskMethods_ValidateFailureDetailsJSON() { - env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster()) + env := newNexusTestEnv(s.T(), false) ctx := testcore.NewContext() tt := tokenspb.NexusTask{ @@ -399,13 +389,11 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_ByEndpoint_Endpoin client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: u, Service: "test-service"}) s.NoError(err) ctx := testcore.NewContext() - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartGlobalMetricCapture() _, err = nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{}) var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeNotFound, handlerErr.Type) s.Equal("nexus endpoint not found", handlerErr.Message) - snap := capture.Snapshot() - s.Len(snap["nexus_request_preprocess_errors"], 1) + s.Len(capture.Metric("nexus_request_preprocess_errors"), 1) } diff --git a/tests/nexus_workflow_test.go b/tests/nexus_workflow_test.go index 726f4e12f3b..bc061c3958e 100644 --- a/tests/nexus_workflow_test.go +++ b/tests/nexus_workflow_test.go @@ -685,12 +685,15 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion(chasmEnabled Header: nexus.Header{commonnexus.CallbackTokenHeader: callbackToken}, } s.NoError(err) - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, largeCompletion) + + capture := env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, largeCompletion) + completionRequests := capture.Metric("nexus_completion_requests") var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_bad_request"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_bad_request"}) invalidNamespace := testcore.RandomizeStr("ns") _, err = env.FrontendClient().RegisterNamespace(ctx, &workflowservice.RegisterNamespaceRequest{ @@ -706,7 +709,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion(chasmEnabled Result: testcore.MustToPayload(s.T(), "result"), Header: nexus.Header{commonnexus.CallbackTokenHeader: callbackToken}, } - _, err = s.sendNexusCompletionRequest(ctx, env, invalidCallbackURL, completion) + err = s.sendNexusCompletionRequest(ctx, invalidCallbackURL, completion) // Verify we get the correct error response s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) @@ -726,11 +729,13 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion(chasmEnabled s.NoError(err) completion.Header = nexus.Header{commonnexus.CallbackTokenHeader: callbackToken} - snap, err = s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture = env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests = capture.Metric("nexus_completion_requests") s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeNotFound, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_not_found"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_not_found"}) // Request fails if the state machine reference is stale. staleToken := common.CloneProto(completionToken) @@ -739,33 +744,40 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletion(chasmEnabled s.NoError(err) completion.Header = nexus.Header{commonnexus.CallbackTokenHeader: callbackToken} - snap, err = s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture = env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests = capture.Metric("nexus_completion_requests") s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeNotFound, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_not_found"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_not_found"}) callbackToken, err = gen.Tokenize(completionToken) s.NoError(err) completion.Header = nexus.Header{commonnexus.CallbackTokenHeader: callbackToken} - snap, err = s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture = env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests = capture.Metric("nexus_completion_requests") + serviceRequests := capture.Metric("service_requests") s.NoError(err) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "success"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "success"}) // Ensure that CompleteOperation request is tracked as part of normal service telemetry metrics - idx := slices.IndexFunc(snap["service_requests"], func(m *metricstest.CapturedRecording) bool { + idx := slices.IndexFunc(serviceRequests, func(m *metricstest.CapturedRecording) bool { opTag, ok := m.Tags["operation"] return ok && opTag == "CompleteNexusOperation" }) s.Greater(idx, -1) // Resend the request and verify we get a not found error since the operation has already completed. - snap, err = s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture = env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests = capture.Metric("nexus_completion_requests") s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeNotFound, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_not_found"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_not_found"}) // Poll again and verify the completion is recorded and triggers workflow progress. pollResp, err = env.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ @@ -1204,10 +1216,12 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncFailure(chasmEnabled boo Error: nexus.NewOperationFailedErrorf("test operation failed"), Header: nexus.Header{commonnexus.CallbackTokenHeader: callbackToken}, } - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture := env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests := capture.Metric("nexus_completion_requests") s.NoError(err) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "success"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "success"}) // Poll again and verify the completion is recorded and triggers workflow progress. pollResp, err = env.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ @@ -1263,15 +1277,16 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn Result: testcore.MustToPayload(s.T(), "result"), Header: nexus.Header{commonnexus.CallbackTokenHeader: tokenWithBadNamespace}, } - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture := env.StartGlobalMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeNotFound, handlerErr.Type) - s.Len(snap["nexus_completion_request_preprocess_errors"], 1) + s.Len(capture.Metric("nexus_completion_request_preprocess_errors"), 1) }) s.Run("NamespaceNotFoundNoIdentifier", func(s *NexusWorkflowTestSuite) { - env := s.newNexusWorkflowTestEnv(chasmEnabled) + env := s.newNexusWorkflowTestEnv(chasmEnabled, testcore.WithDedicatedCluster()) // Generate a token with a non-existent namespace ID tokenWithBadNamespace, err := s.generateValidCallbackToken("namespace-doesnt-exist-id", testcore.RandomizeStr("workflow"), uuid.NewString()) s.NoError(err) @@ -1281,15 +1296,16 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn Result: testcore.MustToPayload(s.T(), "result"), Header: nexus.Header{commonnexus.CallbackTokenHeader: tokenWithBadNamespace}, } - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture := env.StartGlobalMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeNotFound, handlerErr.Type) - s.Len(snap["nexus_completion_request_preprocess_errors"], 1) + s.Len(capture.Metric("nexus_completion_request_preprocess_errors"), 1) }) s.Run("OperationTokenTooLong", func(s *NexusWorkflowTestSuite) { - env := s.newNexusWorkflowTestEnv(chasmEnabled) + env := s.newNexusWorkflowTestEnv(chasmEnabled, testcore.WithDedicatedCluster()) publicCallbackURL := "http://" + env.HttpAPIAddress() + "/" + commonnexus.RouteCompletionCallback.Path(env.Namespace().String()) // Generate a valid callback token to get past initial validation @@ -1302,17 +1318,20 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn Header: nexus.Header{commonnexus.CallbackTokenHeader: validToken}, } - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture := env.StartGlobalMetricCapture() + namespaceCapture := env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests := namespaceCapture.Metric("nexus_completion_requests") var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) - s.Empty(snap["nexus_completion_request_preprocess_errors"]) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_bad_request"}) + s.Empty(capture.Metric("nexus_completion_request_preprocess_errors")) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_bad_request"}) }) s.Run("OperationTokenTooLongNoIdentifier", func(s *NexusWorkflowTestSuite) { - env := s.newNexusWorkflowTestEnv(chasmEnabled) + env := s.newNexusWorkflowTestEnv(chasmEnabled, testcore.WithDedicatedCluster()) publicCallbackURL := "http://" + env.HttpAPIAddress() + commonnexus.PathCompletionCallbackNoIdentifier // Generate a valid callback token to get past initial validation namespaceID := env.NamespaceID().String() @@ -1325,13 +1344,16 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn Header: nexus.Header{commonnexus.CallbackTokenHeader: validToken}, } - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture := env.StartGlobalMetricCapture() + namespaceCapture := env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests := namespaceCapture.Metric("nexus_completion_requests") var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) - s.Empty(snap["nexus_completion_request_preprocess_errors"]) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_bad_request"}) + s.Empty(capture.Metric("nexus_completion_request_preprocess_errors")) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "error_bad_request"}) }) s.Run("InvalidCallbackToken", func(s *NexusWorkflowTestSuite) { @@ -1342,7 +1364,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn publicCallbackURL := "http://" + env.HttpAPIAddress() + "/" + commonnexus.RouteCompletionCallback.Path(env.Namespace().String()) // metrics collection is not initialized before callback validation // Send request without callback token, helper does not add token if blank - _, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + err := s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) // Verify we get the correct error response var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) @@ -1358,7 +1380,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn publicCallbackURL := "http://" + env.HttpAPIAddress() + commonnexus.PathCompletionCallbackNoIdentifier // metrics collection is not initialized before callback validation // Send request without callback token, helper does not add token if blank - _, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + err := s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) // Verify we get the correct error response var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) @@ -1369,8 +1391,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn s.Run("InvalidClientVersion", func(s *NexusWorkflowTestSuite) { env := s.newNexusWorkflowTestEnv(chasmEnabled) publicCallbackURL := "http://" + env.HttpAPIAddress() + "/" + commonnexus.RouteCompletionCallback.Path(env.Namespace().String()) - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartNamespaceMetricCapture() // Generate a valid callback token to get past initial validation namespaceID := env.NamespaceID().String() @@ -1388,19 +1409,18 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn Serializer: commonnexus.PayloadSerializer, }) err = client.CompleteOperation(ctx, publicCallbackURL, completion) - snap := capture.Snapshot() + completionRequests := capture.Metric("nexus_completion_requests") var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unsupported_client"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unsupported_client"}) }) s.Run("InvalidClientVersionNoIdentifier", func(s *NexusWorkflowTestSuite) { env := s.newNexusWorkflowTestEnv(chasmEnabled) publicCallbackURL := "http://" + env.HttpAPIAddress() + commonnexus.PathCompletionCallbackNoIdentifier - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) + capture := env.StartNamespaceMetricCapture() // Generate a valid callback token to get past initial validation namespaceID := env.NamespaceID().String() @@ -1419,12 +1439,12 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionErrors(chasmEn Serializer: commonnexus.PayloadSerializer, }) err = client.CompleteOperation(ctx, publicCallbackURL, completion) - snap := capture.Snapshot() + completionRequests := capture.Metric("nexus_completion_requests") var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unsupported_client"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unsupported_client"}) }) } @@ -1452,12 +1472,14 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAuthErrors(cha } publicCallbackURL := "http://" + env.HttpAPIAddress() + "/" + commonnexus.RouteCompletionCallback.Path(env.Namespace().String()) - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture := env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests := capture.Metric("nexus_completion_requests") var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeUnauthorized, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unauthorized"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unauthorized"}) } func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAuthErrorsNoIdentifier(chasmEnabled bool) { @@ -1483,12 +1505,14 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAuthErrorsNoId Header: nexus.Header{commonnexus.CallbackTokenHeader: callbackToken}, } publicCallbackURL := "http://" + env.HttpAPIAddress() + commonnexus.PathCompletionCallbackNoIdentifier - snap, err := s.sendNexusCompletionRequest(ctx, env, publicCallbackURL, completion) + capture := env.StartNamespaceMetricCapture() + err = s.sendNexusCompletionRequest(ctx, publicCallbackURL, completion) + completionRequests := capture.Metric("nexus_completion_requests") var handlerErr *nexus.HandlerError s.ErrorAs(err, &handlerErr) s.Equal(nexus.HandlerErrorTypeUnauthorized, handlerErr.Type) - s.Len(snap["nexus_completion_requests"], 1) - s.Subset(snap["nexus_completion_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unauthorized"}) + s.Len(completionRequests, 1) + s.Subset(completionRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "outcome": "unauthorized"}) } func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionInternalAuth(chasmEnabled bool) { @@ -1935,7 +1959,7 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationAsyncCompletionAfterReset(cha Result: testcore.MustToPayload(s.T(), "result"), Header: nexus.Header{commonnexus.CallbackTokenHeader: callbackToken}, } - _, err = s.sendNexusCompletionRequest(ctx, env, publicCallbackUrl, completion) + err = s.sendNexusCompletionRequest(ctx, publicCallbackUrl, completion) s.NoError(err) // Poll again and verify the completion is recorded and triggers workflow progress. @@ -2210,7 +2234,7 @@ func (s *NexusWorkflowTestSuite) TestNexusSyncOperationErrorRehydration(chasmEna s.NoError(w.Start()) defer w.Stop() - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() + capture := env.StartNamespaceMetricCapture() run, err := env.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ TaskQueue: taskQueue, }, callerWF, tc.outcome) @@ -2230,7 +2254,6 @@ func (s *NexusWorkflowTestSuite) TestNexusSyncOperationErrorRehydration(chasmEna } } }, 10*time.Second, 100*time.Millisecond) - env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) if !chasmEnabled { // TODO: Assert pending Nexus operation rehydration for CHASM once DescribeWorkflowExecution.PendingNexusOperations is implemented there. tc.checkPendingError(s, converter.FailureToError(f)) @@ -2238,16 +2261,15 @@ func (s *NexusWorkflowTestSuite) TestNexusSyncOperationErrorRehydration(chasmEna s.NoError(env.SdkClient().TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "test cleanup")) } else { wfErr := run.Get(ctx, nil) - env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) tc.checkWorkflowError(s, wfErr) } - snap := capture.Snapshot() if !chasmEnabled { // TODO: Assert this for CHASM once sync error rehydration emits nexus_outbound_requests metrics there. - s.Len(snap["nexus_outbound_requests"], 1) + outboundRequests := capture.Metric("nexus_outbound_requests") + s.Len(outboundRequests, 1) s.Subset( - snap["nexus_outbound_requests"][0].Tags, + outboundRequests[0].Tags, map[string]string{ "namespace": env.Namespace().String(), "method": "StartOperation", @@ -2409,19 +2431,18 @@ func (s *NexusWorkflowTestSuite) TestNexusAsyncOperationErrorRehydration(chasmEn s.NoError(w.Start()) defer w.Stop() - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() + capture := env.StartNamespaceMetricCapture() run, err := env.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ TaskQueue: taskQueue, }, callerWF, tc.outcome, tc.action) s.NoError(err) wfErr := run.Get(ctx, nil) - env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) tc.checkWorkflowError(s, wfErr) - snap := capture.Snapshot() - s.GreaterOrEqual(len(snap["nexus_outbound_requests"]), 1) - s.Subset(snap["nexus_outbound_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartOperation", "failure_source": "_unknown_", "outcome": "pending"}) + outboundRequests := capture.Metric("nexus_outbound_requests") + s.GreaterOrEqual(len(outboundRequests), 1) + s.Subset(outboundRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartOperation", "failure_source": "_unknown_", "outcome": "pending"}) } for _, tc := range cases { @@ -2571,14 +2592,12 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationSyncNexusFailure(chasmEnabled s.NoError(w.Start()) s.T().Cleanup(w.Stop) - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() + capture := env.StartNamespaceMetricCapture() run, err := env.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ TaskQueue: taskQueue, }, callerWF) s.NoError(err) wfErr := run.Get(ctx, nil) - env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) - var handlerErr *nexus.HandlerError s.ErrorAs(wfErr, &handlerErr) s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type) @@ -2593,10 +2612,10 @@ func (s *NexusWorkflowTestSuite) TestNexusOperationSyncNexusFailure(chasmEnabled s.NoError(json.Unmarshal(failure.Details, &details)) s.Equal("details", details) - snap := capture.Snapshot() - s.Len(snap["nexus_outbound_requests"], 1) - // Confirming that requests which do not go through our frontend are not tagged with `failure_source` - s.Subset(snap["nexus_outbound_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartOperation", "failure_source": "_unknown_", "outcome": "handler-error:BAD_REQUEST"}) + outboundRequests := capture.Metric("nexus_outbound_requests") + s.Len(outboundRequests, 1) + // Confirming that outbound requests which do not go through our frontend are not tagged with `failure_source` + s.Subset(outboundRequests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartOperation", "failure_source": "_unknown_", "outcome": "handler-error:BAD_REQUEST"}) } func (s *NexusWorkflowTestSuite) TestNexusAsyncOperationWithMultipleCallers(chasmEnabled bool) { @@ -3158,18 +3177,13 @@ func (s *NexusWorkflowTestSuite) generateValidCallbackToken(namespaceID, workflo func (s *NexusWorkflowTestSuite) sendNexusCompletionRequest( ctx context.Context, - env *NexusTestEnv, url string, completion nexusrpc.CompleteOperationOptions, -) (map[string][]*metricstest.CapturedRecording, error) { - capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture() - defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture) - +) error { c := nexusrpc.NewCompletionHTTPClient(nexusrpc.CompletionHTTPClientOptions{ Serializer: commonnexus.PayloadSerializer, }) - err := c.CompleteOperation(ctx, url, completion) - return capture.Snapshot(), err + return c.CompleteOperation(ctx, url, completion) } // NOTE: This test cannot use the SDK workflow package because there is a restriction that prevents setting the diff --git a/tests/testcore/metric_capture.go b/tests/testcore/metric_capture.go index 65946dcdb33..d6ca2237485 100644 --- a/tests/testcore/metric_capture.go +++ b/tests/testcore/metric_capture.go @@ -103,6 +103,14 @@ func newNamespaceMetricCapture(capture *metricstest.Capture, namespace string) * } } +// ForNamespace returns a view of the same capture filtered to the given namespace. +func (c *NamespaceMetricCapture) ForNamespace(namespace string) *NamespaceMetricCapture { + return &NamespaceMetricCapture{ + capture: c.capture, + namespace: namespace, + } +} + func (c *NamespaceMetricCapture) Metric(name string) []*metricstest.CapturedRecording { return c.collectMetric(name, nil) } diff --git a/tests/testcore/metric_capture_test.go b/tests/testcore/metric_capture_test.go index c865a2e1574..5f97cd2cbc9 100644 --- a/tests/testcore/metric_capture_test.go +++ b/tests/testcore/metric_capture_test.go @@ -37,6 +37,18 @@ func (s *MetricCaptureSuite) TestNamespaceMetricCapture() { s.Equal("test-ns", recordings[0].Tags["namespace"]) }) + s.Run("can filter to a different namespace", func(s *MetricCaptureSuite) { + const metricName = "namespaced_metric_other" + handler.WithTags(metrics.NamespaceTag("test-ns")).Counter(metricName).Record(1) + handler.WithTags(metrics.NamespaceTag("other-ns")).Counter(metricName).Record(1) + + namespaceCapture := newNamespaceMetricCapture(capture, "test-ns").ForNamespace("other-ns") + + recordings := namespaceCapture.Metric(metricName) + s.Len(recordings, 1) + s.Equal("other-ns", recordings[0].Tags["namespace"]) + }) + s.Run("panics when the metric is not namespace-scoped", func(s *MetricCaptureSuite) { // Record the metric without a namespace tag. handler.Counter("cluster_metric").Record(1) From 8ce98880a45a3300e347b229f54ab6d3b8d49ed8 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 14 Apr 2026 11:22:47 -0700 Subject: [PATCH 2/4] add comment --- tests/nexus_api_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/nexus_api_test.go b/tests/nexus_api_test.go index d958faffeb5..e45fddfdf30 100644 --- a/tests/nexus_api_test.go +++ b/tests/nexus_api_test.go @@ -396,6 +396,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b } testFn := func(s *NexusApiTestSuite, tc testcase, dispatchOnlyByEndpoint bool) { + // This still needs a dedicated cluster because of SetOnAuthorize/SetOnGetClaims. env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster()) env.GetTestCluster().Host().SetOnAuthorize(func(ctx context.Context, c *authorization.Claims, ct *authorization.CallTarget) (authorization.Result, error) { if ct.APIName == configs.DispatchNexusTaskByNamespaceAndTaskQueueAPIName && (c == nil || c.Subject != "test") { From 257af8260ca41e9af5e8ac5745f4a582ee33e17a Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 14 Apr 2026 11:24:12 -0700 Subject: [PATCH 3/4] add StartNamespaceMetricCaptureFor --- tests/nexus_api_validation_test.go | 2 +- tests/testcore/metric_capture.go | 8 -------- tests/testcore/metric_capture_test.go | 4 ++-- tests/testcore/test_env.go | 9 +++++++-- 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/tests/nexus_api_validation_test.go b/tests/nexus_api_validation_test.go index 2a4a88abb39..457e4916e07 100644 --- a/tests/nexus_api_validation_test.go +++ b/tests/nexus_api_validation_test.go @@ -39,7 +39,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTa client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: u, Service: "test-service"}) s.NoError(err) ctx := testcore.NewContext() - capture := env.StartNamespaceMetricCapture().ForNamespace(namespace) + capture := env.StartNamespaceMetricCaptureFor(namespace) _, err = nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{}) var handlerError *nexus.HandlerError s.ErrorAs(err, &handlerError) diff --git a/tests/testcore/metric_capture.go b/tests/testcore/metric_capture.go index d6ca2237485..65946dcdb33 100644 --- a/tests/testcore/metric_capture.go +++ b/tests/testcore/metric_capture.go @@ -103,14 +103,6 @@ func newNamespaceMetricCapture(capture *metricstest.Capture, namespace string) * } } -// ForNamespace returns a view of the same capture filtered to the given namespace. -func (c *NamespaceMetricCapture) ForNamespace(namespace string) *NamespaceMetricCapture { - return &NamespaceMetricCapture{ - capture: c.capture, - namespace: namespace, - } -} - func (c *NamespaceMetricCapture) Metric(name string) []*metricstest.CapturedRecording { return c.collectMetric(name, nil) } diff --git a/tests/testcore/metric_capture_test.go b/tests/testcore/metric_capture_test.go index 5f97cd2cbc9..36ce9f18f1a 100644 --- a/tests/testcore/metric_capture_test.go +++ b/tests/testcore/metric_capture_test.go @@ -37,12 +37,12 @@ func (s *MetricCaptureSuite) TestNamespaceMetricCapture() { s.Equal("test-ns", recordings[0].Tags["namespace"]) }) - s.Run("can filter to a different namespace", func(s *MetricCaptureSuite) { + s.Run("can capture for an explicit namespace", func(s *MetricCaptureSuite) { const metricName = "namespaced_metric_other" handler.WithTags(metrics.NamespaceTag("test-ns")).Counter(metricName).Record(1) handler.WithTags(metrics.NamespaceTag("other-ns")).Counter(metricName).Record(1) - namespaceCapture := newNamespaceMetricCapture(capture, "test-ns").ForNamespace("other-ns") + namespaceCapture := newNamespaceMetricCapture(capture, "other-ns") recordings := namespaceCapture.Metric(metricName) s.Len(recordings, 1) diff --git a/tests/testcore/test_env.go b/tests/testcore/test_env.go index 43923a7d3a8..9513cb79b24 100644 --- a/tests/testcore/test_env.go +++ b/tests/testcore/test_env.go @@ -376,10 +376,15 @@ func (e *TestEnv) StartGlobalMetricCapture() *GlobalMetricCapture { return globalCapture } -// StartNamespaceMetricCapture starts a metrics capture that only allows safe per-metric namespace filtering. +// StartNamespaceMetricCapture starts a metrics capture scoped to this test's namespace. // Namespace captures are safe on shared clusters because reads are restricted to // per-metric namespace-filtered iteration and reject non-namespaced metrics. func (e *TestEnv) StartNamespaceMetricCapture() *NamespaceMetricCapture { + return e.StartNamespaceMetricCaptureFor(e.Namespace().String()) +} + +// StartNamespaceMetricCaptureFor starts a metrics capture scoped to the provided namespace. +func (e *TestEnv) StartNamespaceMetricCaptureFor(namespace string) *NamespaceMetricCapture { handler := e.cluster.host.CaptureMetricsHandler() if handler == nil { e.t.Fatal("StartNamespaceMetricCapture is unavailable because metrics capture is not enabled on this cluster") @@ -389,7 +394,7 @@ func (e *TestEnv) StartNamespaceMetricCapture() *NamespaceMetricCapture { e.t.Cleanup(func() { handler.StopCapture(capture) }) - return newNamespaceMetricCapture(capture, e.Namespace().String()) + return newNamespaceMetricCapture(capture, namespace) } func canBeNamespaceScoped(p dynamicconfig.Precedence) bool { From 3dc9e60615cce3771291cbb62f1f1ce5411fb776 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Tue, 14 Apr 2026 11:36:10 -0700 Subject: [PATCH 4/4] fix lint --- tests/testcore/test_env.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/testcore/test_env.go b/tests/testcore/test_env.go index 9513cb79b24..dff78294b6b 100644 --- a/tests/testcore/test_env.go +++ b/tests/testcore/test_env.go @@ -384,7 +384,7 @@ func (e *TestEnv) StartNamespaceMetricCapture() *NamespaceMetricCapture { } // StartNamespaceMetricCaptureFor starts a metrics capture scoped to the provided namespace. -func (e *TestEnv) StartNamespaceMetricCaptureFor(namespace string) *NamespaceMetricCapture { +func (e *TestEnv) StartNamespaceMetricCaptureFor(namespaceName string) *NamespaceMetricCapture { handler := e.cluster.host.CaptureMetricsHandler() if handler == nil { e.t.Fatal("StartNamespaceMetricCapture is unavailable because metrics capture is not enabled on this cluster") @@ -394,7 +394,7 @@ func (e *TestEnv) StartNamespaceMetricCaptureFor(namespace string) *NamespaceMet e.t.Cleanup(func() { handler.StopCapture(capture) }) - return newNamespaceMetricCapture(capture, namespace) + return newNamespaceMetricCapture(capture, namespaceName) } func canBeNamespaceScoped(p dynamicconfig.Precedence) bool {