Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 42 additions & 45 deletions tests/nexus_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures
}

testFn := func(s *NexusApiTestSuite, tc testcase, dispatchOnlyByEndpoint bool) {
env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster())
env := newNexusTestEnv(s.T(), useTemporalFailures)
endpoint := env.createNexusEndpoint(s.T(), tc.endpointName, testcore.RandomizeStr("task-queue"))
var dispatchURL string
if dispatchOnlyByEndpoint {
Expand All @@ -287,8 +287,7 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures
HTTPCaller: httpCaller,
})
s.NoError(err)
capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
capture := env.StartNamespaceMetricCapture()

pollerErrCh := env.nexusTaskPoller(ctx, s.T(), endpoint.Spec.Target.GetWorker().TaskQueue, tc.handler)

Expand Down Expand Up @@ -316,21 +315,21 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Outcomes(useTemporalFailures
tc.assertion(s, result, err, headerCapture.lastHeaders)
s.NoError(<-pollerErrCh)

snap := capture.Snapshot()

s.Len(snap["nexus_requests"], 1)
s.Subset(snap["nexus_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome})
s.Contains(snap["nexus_requests"][0].Tags, "nexus_endpoint")
s.Equal(int64(1), snap["nexus_requests"][0].Value)
s.Equal(metrics.MetricUnit(""), snap["nexus_requests"][0].Unit)
requests := capture.Metric("nexus_requests")
s.Len(requests, 1)
s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome})
s.Contains(requests[0].Tags, "nexus_endpoint")
s.Equal(int64(1), requests[0].Value)
s.Equal(metrics.MetricUnit(""), requests[0].Unit)

s.Len(snap["nexus_latency"], 1)
s.Subset(snap["nexus_latency"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome})
s.Contains(snap["nexus_latency"][0].Tags, "nexus_endpoint")
latency := capture.Metric("nexus_latency")
s.Len(latency, 1)
s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.outcome})
s.Contains(latency[0].Tags, "nexus_endpoint")

// Ensure that StartOperation request is tracked as part of normal service telemetry metrics
s.Condition(func() bool {
for _, m := range snap["service_requests"] {
for _, m := range capture.Metric("service_requests") {
if opTag, ok := m.Tags["operation"]; ok && opTag == "StartNexusOperation" {
return true
}
Expand All @@ -356,30 +355,30 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b
name string
header nexus.Header
handler nexusTaskHandler
assertion func(*NexusApiTestSuite, *nexusrpc.ClientStartOperationResponse[string], error, map[string][]*metricstest.CapturedRecording)
assertion func(*NexusApiTestSuite, *nexusrpc.ClientStartOperationResponse[string], error, []*metricstest.CapturedRecording)
}
testCases := []testcase{
{
name: "no header",
assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, snap map[string][]*metricstest.CapturedRecording) {
assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, preprocessErrors []*metricstest.CapturedRecording) {
var handlerErr *nexus.HandlerError
s.ErrorAs(err, &handlerErr)
s.Equal(nexus.HandlerErrorTypeUnauthorized, handlerErr.Type)
s.Equal("permission denied", handlerErr.Message)
s.Empty(snap["nexus_request_preprocess_errors"])
s.Empty(preprocessErrors)
},
},
{
name: "invalid bearer",
header: nexus.Header{
"authorization": "Bearer invalid",
},
assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, snap map[string][]*metricstest.CapturedRecording) {
assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, preprocessErrors []*metricstest.CapturedRecording) {
var handlerErr *nexus.HandlerError
s.ErrorAs(err, &handlerErr)
s.Equal(nexus.HandlerErrorTypeUnauthenticated, handlerErr.Type)
s.Equal("unauthorized", handlerErr.Message)
s.Len(snap["nexus_request_preprocess_errors"], 1)
s.Len(preprocessErrors, 1)
},
},
{
Expand All @@ -388,15 +387,16 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b
"authorization": "Bearer test",
},
handler: nexusEchoHandler,
assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, snap map[string][]*metricstest.CapturedRecording) {
assertion: func(s *NexusApiTestSuite, res *nexusrpc.ClientStartOperationResponse[string], err error, preprocessErrors []*metricstest.CapturedRecording) {
s.NoError(err)
s.Equal("input", res.Successful)
s.Empty(snap["nexus_request_preprocess_errors"])
s.Empty(preprocessErrors)
},
},
}

testFn := func(s *NexusApiTestSuite, tc testcase, dispatchOnlyByEndpoint bool) {
// This still needs a dedicated cluster because of SetOnAuthorize/SetOnGetClaims.
env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster())
env.GetTestCluster().Host().SetOnAuthorize(func(ctx context.Context, c *authorization.Claims, ct *authorization.CallTarget) (authorization.Result, error) {
if ct.APIName == configs.DispatchNexusTaskByNamespaceAndTaskQueueAPIName && (c == nil || c.Subject != "test") {
Expand Down Expand Up @@ -436,14 +436,13 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_Claims(useTemporalFailures b
pollerErrCh = env.nexusTaskPoller(ctx, s.T(), taskQueue, tc.handler)
}

capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
capture := env.StartGlobalMetricCapture()
result, err := nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{
Header: tc.header,
})
snap := capture.Snapshot()
env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
preprocessErrors := capture.Metric("nexus_request_preprocess_errors")

tc.assertion(s, result, err, snap)
tc.assertion(s, result, err, preprocessErrors)
if pollerErrCh != nil {
s.NoError(<-pollerErrCh)
}
Expand Down Expand Up @@ -533,7 +532,7 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure
}

testFn := func(s *NexusApiTestSuite, tc testcase, dispatchOnlyByEndpoint bool) {
env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster())
env := newNexusTestEnv(s.T(), useTemporalFailures)
endpoint := env.createNexusEndpoint(s.T(), tc.endpointName, testcore.RandomizeStr("task-queue"))
var dispatchURL string
if dispatchOnlyByEndpoint {
Expand All @@ -551,8 +550,7 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure
HTTPCaller: httpCaller,
})
s.NoError(err)
capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
capture := env.StartNamespaceMetricCapture()

pollerErrCh := env.nexusTaskPoller(ctx, s.T(), endpoint.Spec.Target.GetWorker().TaskQueue, tc.handler)

Expand All @@ -576,21 +574,21 @@ func (s *NexusApiTestSuite) TestNexusCancelOperation_Outcomes(useTemporalFailure
tc.assertion(s, err, headerCapture.lastHeaders)
s.NoError(<-pollerErrCh)

snap := capture.Snapshot()

s.Len(snap["nexus_requests"], 1)
s.Subset(snap["nexus_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome})
s.Contains(snap["nexus_requests"][0].Tags, "nexus_endpoint")
s.Equal(int64(1), snap["nexus_requests"][0].Value)
s.Equal(metrics.MetricUnit(""), snap["nexus_requests"][0].Unit)
requests := capture.Metric("nexus_requests")
s.Len(requests, 1)
s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome})
s.Contains(requests[0].Tags, "nexus_endpoint")
s.Equal(int64(1), requests[0].Value)
s.Equal(metrics.MetricUnit(""), requests[0].Unit)

s.Len(snap["nexus_latency"], 1)
s.Subset(snap["nexus_latency"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome})
s.Contains(snap["nexus_latency"][0].Tags, "nexus_endpoint")
latency := capture.Metric("nexus_latency")
s.Len(latency, 1)
s.Subset(latency[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "CancelNexusOperation", "outcome": tc.outcome})
s.Contains(latency[0].Tags, "nexus_endpoint")

// Ensure that CancelOperation request is tracked as part of normal service telemetry metrics
s.Condition(func() bool {
for _, m := range snap["service_requests"] {
for _, m := range capture.Metric("service_requests") {
if opTag, ok := m.Tags["operation"]; ok && opTag == "CancelNexusOperation" {
return true
}
Expand Down Expand Up @@ -666,13 +664,12 @@ func (s *NexusApiTestSuite) TestNexusStartOperation_WithNamespaceAndTaskQueue_Su
// with client-name in gRPC metadata, the matching service emits nexus_task_requests with a
// client_name tag. This proves the header propagates e2e: SDK → frontend → matching.
func (s *NexusApiTestSuite) TestNexusClientNameMetricPropagation(useTemporalFailures bool) {
env := newNexusTestEnv(s.T(), useTemporalFailures, testcore.WithDedicatedCluster())
env := newNexusTestEnv(s.T(), useTemporalFailures)
const expectedClientName = "temporal-go"
taskQueue := testcore.RandomizeStr("tq")
endpoint := env.createNexusEndpoint(s.T(), testcore.RandomizeStr("endpoint"), taskQueue)

capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
capture := env.StartNamespaceMetricCapture()

ctx, cancel := context.WithCancel(testcore.NewContext())
defer cancel()
Expand Down Expand Up @@ -704,16 +701,16 @@ func (s *NexusApiTestSuite) TestNexusClientNameMetricPropagation(useTemporalFail
s.NoError(<-pollerErrCh)

// Verify that the matching service emitted nexus_task_requests with client_name tag.
snap := capture.Snapshot()
var found bool
for _, rec := range snap["nexus_task_requests"] {
requests := capture.Metric("nexus_task_requests")
for _, rec := range requests {
if rec.Tags["client_name"] == expectedClientName {
found = true
break
}
}
s.True(found, "expected nexus_task_requests metric with client_name=%s, got entries: %v",
expectedClientName, snap["nexus_task_requests"])
expectedClientName, requests)
}

func nexusEchoHandler(_ *testing.T, res *workflowservice.PollNexusTaskQueueResponse) (*nexusTaskResponse, error) {
Expand Down
50 changes: 19 additions & 31 deletions tests/nexus_api_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,25 @@ func TestNexusAPIValidationTestSuite(t *testing.T) {
}

func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTaskQueue_NamespaceNotFound() {
env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster())
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed as using StartNamespaceMetricCapture

env := newNexusTestEnv(s.T(), false)
// Also use this test to verify that namespaces are unescaped in the path.
taskQueue := testcore.RandomizeStr("task-queue")
namespace := "namespace not/found"
u := getDispatchByNsAndTqURL(env.HttpAPIAddress(), namespace, taskQueue)
client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: u, Service: "test-service"})
s.NoError(err)
ctx := testcore.NewContext()
capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
capture := env.StartNamespaceMetricCaptureFor(namespace)
_, err = nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{})
var handlerError *nexus.HandlerError
s.ErrorAs(err, &handlerError)
s.Equal(nexus.HandlerErrorTypeNotFound, handlerError.Type)
s.Equal(fmt.Sprintf("namespace not found: %q", namespace), handlerError.Message)

snap := capture.Snapshot()

s.Len(snap["nexus_requests"], 1)
s.Equal(map[string]string{"namespace": namespace, "method": "StartNexusOperation", "outcome": "namespace_not_found", "nexus_endpoint": "_unknown_"}, snap["nexus_requests"][0].Tags)
s.Equal(int64(1), snap["nexus_requests"][0].Value)
requests := capture.Metric("nexus_requests")
s.Len(requests, 1)
s.Equal(map[string]string{"namespace": namespace, "method": "StartNexusOperation", "outcome": "namespace_not_found", "nexus_endpoint": "_unknown_"}, requests[0].Tags)
s.Equal(int64(1), requests[0].Value)
}

func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTaskQueue_NamespaceTooLong() {
Expand All @@ -67,18 +65,15 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_WithNamespaceAndTa
client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: u, Service: "test-service"})
s.NoError(err)
ctx := testcore.NewContext()
capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
capture := env.StartGlobalMetricCapture()
_, err = nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{})
var handlerErr *nexus.HandlerError
s.ErrorAs(err, &handlerErr)
s.Equal(nexus.HandlerErrorTypeBadRequest, handlerErr.Type)
// I wish we'd never put periods in error messages :(
s.Equal("Namespace length exceeds limit.", handlerErr.Message)

snap := capture.Snapshot()

s.Len(snap["nexus_request_preprocess_errors"], 1)
s.Len(capture.Metric("nexus_request_preprocess_errors"), 1)
}

func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_Forbidden() {
Expand Down Expand Up @@ -205,8 +200,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_Forbidden() {
s.NoError(err)
ctx := testcore.NewContext()

capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
capture := env.StartNamespaceMetricCapture()

// Wait until the endpoint is loaded into the registry.
s.Eventually(func() bool {
Expand All @@ -219,11 +213,10 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_Forbidden() {
s.ErrorAs(err, &handlerErr)
tc.checkFailure(s, handlerErr)

snap := capture.Snapshot()

s.Len(snap["nexus_requests"], 1)
s.Subset(snap["nexus_requests"][0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.expectedOutcomeMetric})
s.Equal(int64(1), snap["nexus_requests"][0].Value)
requests := capture.Metric("nexus_requests")
s.Len(requests, 1)
s.Subset(requests[0].Tags, map[string]string{"namespace": env.Namespace().String(), "method": "StartNexusOperation", "outcome": tc.expectedOutcomeMetric})
s.Equal(int64(1), requests[0].Value)
}

for _, tc := range testCases {
Expand All @@ -240,7 +233,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_PayloadSizeLimit()
input := strings.Repeat("a", (2*1024*1024)-10)

testFn := func(s *NexusAPIValidationTestSuite, dispatchOnlyByEndpoint bool) {
env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster())
env := newNexusTestEnv(s.T(), false)
taskQueue := testcore.RandomizeStr("task-queue")
testEndpoint := env.createNexusEndpoint(s.T(), testcore.RandomizeStr("test-endpoint"), taskQueue)

Expand All @@ -256,9 +249,6 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_PayloadSizeLimit()

client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: dispatchURL, Service: "test-service"})
s.NoError(err)
capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)

var result *nexusrpc.ClientStartOperationResponse[string]

// Wait until the endpoint is loaded into the registry.
Expand All @@ -283,7 +273,7 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_PayloadSizeLimit()
}

func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskMethods_VerifiesTaskTokenMatchesRequestNamespace() {
env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster())
env := newNexusTestEnv(s.T(), false)
ctx := testcore.NewContext()

tt := tokenspb.NexusTask{
Expand Down Expand Up @@ -312,7 +302,7 @@ func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskMethods_Verifies
}

func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskCompleted_ValidateOperationTokenLength() {
env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster())
env := newNexusTestEnv(s.T(), false)
ctx := testcore.NewContext()

tt := tokenspb.NexusTask{
Expand Down Expand Up @@ -345,7 +335,7 @@ func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskCompleted_Valida
}

func (s *NexusAPIValidationTestSuite) TestNexus_RespondNexusTaskMethods_ValidateFailureDetailsJSON() {
env := newNexusTestEnv(s.T(), false, testcore.WithDedicatedCluster())
env := newNexusTestEnv(s.T(), false)
ctx := testcore.NewContext()

tt := tokenspb.NexusTask{
Expand Down Expand Up @@ -399,13 +389,11 @@ func (s *NexusAPIValidationTestSuite) TestNexusStartOperation_ByEndpoint_Endpoin
client, err := nexusrpc.NewHTTPClient(nexusrpc.HTTPClientOptions{BaseURL: u, Service: "test-service"})
s.NoError(err)
ctx := testcore.NewContext()
capture := env.GetTestCluster().Host().CaptureMetricsHandler().StartCapture()
defer env.GetTestCluster().Host().CaptureMetricsHandler().StopCapture(capture)
capture := env.StartGlobalMetricCapture()
_, err = nexusrpc.StartOperation(ctx, client, op, "input", nexus.StartOperationOptions{})
var handlerErr *nexus.HandlerError
s.ErrorAs(err, &handlerErr)
s.Equal(nexus.HandlerErrorTypeNotFound, handlerErr.Type)
s.Equal("nexus endpoint not found", handlerErr.Message)
snap := capture.Snapshot()
s.Len(snap["nexus_request_preprocess_errors"], 1)
s.Len(capture.Metric("nexus_request_preprocess_errors"), 1)
}
Loading
Loading