diff --git a/cloudevents/service.go b/cloudevents/service.go index 19c0742..9c34d35 100644 --- a/cloudevents/service.go +++ b/cloudevents/service.go @@ -11,13 +11,18 @@ import ( "net/http" "os" "os/signal" + "reflect" "runtime" "strings" + "sync" "syscall" "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/rs/zerolog/log" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) const ( @@ -43,8 +48,29 @@ type Service struct { stop chan error } +var otelOnce sync.Once + // New Service which service the given instance. func New(f any) *Service { + // Register W3C Trace Context propagator so that trace headers injected + // by Knative's queue proxy (traceparent, tracestate) are extracted and + // made available in the context.Context passed to the function's Handle. + otelOnce.Do(func() { + p := otel.GetTextMapPropagator() + if reflect.TypeOf(p).String() != "*global.textMapPropagator" { + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + p, + )) + } else { + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + } + }) + svc := &Service{ f: f, stop: make(chan error), @@ -59,7 +85,9 @@ func New(f any) *Service { mux := http.NewServeMux() mux.HandleFunc("/health/readiness", svc.Ready) mux.HandleFunc("/health/liveness", svc.Alive) - mux.Handle("/", newCloudeventHandler(f)) // See implementation note + // Wrap with otelhttp so W3C trace context is extracted from HTTP headers + // and propagated into the context before the function handler is called. + mux.Handle("/", otelhttp.NewHandler(newCloudeventHandler(f), "handle")) svc.Handler = mux return svc } diff --git a/cloudevents/service_test.go b/cloudevents/service_test.go index 2adf686..9ab679f 100644 --- a/cloudevents/service_test.go +++ b/cloudevents/service_test.go @@ -6,11 +6,13 @@ import ( "log" "net/http" "os" + "strings" "testing" "time" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/event" + "go.opentelemetry.io/otel/trace" "knative.dev/func-go/cloudevents/mock" ) @@ -145,10 +147,17 @@ func TestCfg_Static(t *testing.T) { defer cancel() // Run test from within a temp dir + wd, err := os.Getwd() + if err != nil { + t.Fatal(err) + } dir := t.TempDir() if err := os.Chdir(dir); err != nil { t.Fatal(err) } + t.Cleanup(func() { + _ = os.Chdir(wd) + }) // Write an example `cfg` file if err := os.WriteFile("cfg", []byte(`FUNC_VERSION="v1.2.3"`), os.ModePerm); err != nil { @@ -394,3 +403,82 @@ func TestAlive_Invoked(t *testing.T) { t.Fatalf("unexpected http status code: %v", resp.StatusCode) } } + +// TestHandle_OTelTracePropagation ensures that W3C trace context headers +// injected into the request are correctly parsed and propagated to the +// context.Context of the function's handle function. +func TestHandle_OTelTracePropagation(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") // use an OS-chosen port + + var ( + ctx, cancel = context.WithCancel(context.Background()) + errCh = make(chan error) + startCh = make(chan any) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + traceCheckedCh = make(chan struct{}) + onHandle = func(ctx context.Context, event event.Event) (*event.Event, error) { + sc := trace.SpanContextFromContext(ctx) + if !sc.IsValid() { + t.Error("expected a valid span context from propagated trace headers, but got invalid") + } + if sc.TraceID().String() != "4bf92f3577b34da6a3ce929d0e0e4736" { + t.Errorf("expected TraceID '4bf92f3577b34da6a3ce929d0e0e4736', got %v", sc.TraceID().String()) + } + // SpanID is not asserted here because otelhttp may start a new server span + // (with a different SpanID) when a tracer provider is configured. + close(traceCheckedCh) + return nil, nil + } + ) + defer cancel() + + f := &mock.Function{OnStart: onStart, OnHandle: onHandle} + service := New(f) + + go func() { + if err := service.Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to start") + case err := <-errCh: + t.Fatal(err) + case <-startCh: + } + + // Send a request with trace headers + req, err := http.NewRequest("POST", "http://"+service.Addr().String(), strings.NewReader(`{"hello":"world"}`)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("ce-specversion", "1.0") + req.Header.Set("ce-type", "example.type") + req.Header.Set("ce-source", "example/uri") + req.Header.Set("ce-id", "a688ed0b") + req.Header.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected http status code: %v", resp.StatusCode) + } + + select { + case <-traceCheckedCh: + // Passed trace context validation + case <-time.After(500 * time.Millisecond): + t.Fatal("timed out waiting for trace context check in handle") + } +} diff --git a/go.mod b/go.mod index 343d999..70526cc 100644 --- a/go.mod +++ b/go.mod @@ -5,16 +5,24 @@ go 1.25.0 require ( github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/rs/zerolog v1.32.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 + go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/trace v1.38.0 knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b ) require ( + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/sys v0.18.0 // indirect diff --git a/go.sum b/go.sum index d18bdd9..6b902cc 100644 --- a/go.sum +++ b/go.sum @@ -4,9 +4,16 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= -github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -31,10 +38,20 @@ github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= @@ -48,8 +65,6 @@ golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b h1:MvbV2F2BdI8qKrYYUhDwbUZbX0BAYRSIpXM2TOtTvs0= diff --git a/http/service.go b/http/service.go index 4eb99aa..8f25325 100644 --- a/http/service.go +++ b/http/service.go @@ -11,12 +11,17 @@ import ( "net/http" "os" "os/signal" + "reflect" "runtime" "strings" + "sync" "syscall" "time" "github.com/rs/zerolog/log" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) const ( @@ -45,8 +50,29 @@ type Service struct { f Handler } +var otelOnce sync.Once + // New Service which serves the given instance. func New(f Handler) *Service { + // Register W3C Trace Context propagator so that trace headers injected + // by Knative's queue proxy (traceparent, tracestate) are extracted and + // made available in the context.Context passed to the function's Handle. + otelOnce.Do(func() { + p := otel.GetTextMapPropagator() + if reflect.TypeOf(p).String() != "*global.textMapPropagator" { + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + p, + )) + } else { + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )) + } + }) + svc := &Service{ f: f, stop: make(chan error), @@ -61,7 +87,9 @@ func New(f Handler) *Service { mux := http.NewServeMux() mux.HandleFunc("/health/readiness", svc.Ready) mux.HandleFunc("/health/liveness", svc.Alive) - mux.HandleFunc("/", svc.Handle) + // Wrap with otelhttp so W3C trace context is extracted from HTTP headers + // and propagated into the context before the function handler is called. + mux.Handle("/", otelhttp.NewHandler(http.HandlerFunc(svc.Handle), "handle")) svc.Handler = mux // Print some helpful information about which interfaces the function diff --git a/http/service_test.go b/http/service_test.go index dac6fb2..8849e58 100644 --- a/http/service_test.go +++ b/http/service_test.go @@ -6,9 +6,11 @@ import ( "io" "net/http" "os" + "strings" "testing" "time" + "go.opentelemetry.io/otel/trace" "knative.dev/func-go/http/mock" ) @@ -150,10 +152,17 @@ func TestCfg_Static(t *testing.T) { defer cancel() // Run test from within a temp dir + wd, err := os.Getwd() + if err != nil { + t.Fatal(err) + } dir := t.TempDir() if err := os.Chdir(dir); err != nil { t.Fatal(err) } + t.Cleanup(func() { + _ = os.Chdir(wd) + }) // Write an example `cfg` file if err := os.WriteFile("cfg", []byte(`FUNC_VERSION="v1.2.3"`), os.ModePerm); err != nil { @@ -420,3 +429,78 @@ func TestHandle_WithContext(t *testing.T) { }() t.Log("legacy static handler signature accepted. see func tests for confirmation of invocation") } + +// TestHandle_OTelTracePropagation ensures that W3C trace context headers +// injected into the request are correctly parsed and propagated to the +// context.Context of the function's handle function. +func TestHandle_OTelTracePropagation(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") // use an OS-chosen port + + var ( + ctx, cancel = context.WithCancel(context.Background()) + errCh = make(chan error) + startCh = make(chan any) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + traceCheckedCh = make(chan struct{}) + onHandle = func(w http.ResponseWriter, r *http.Request) { + sc := trace.SpanContextFromContext(r.Context()) + if !sc.IsValid() { + t.Error("expected a valid span context from propagated trace headers, but got invalid") + } + if sc.TraceID().String() != "4bf92f3577b34da6a3ce929d0e0e4736" { + t.Errorf("expected TraceID '4bf92f3577b34da6a3ce929d0e0e4736', got %v", sc.TraceID().String()) + } + // SpanID is not asserted here because otelhttp may start a new server span + // (with a different SpanID) when a tracer provider is configured. + close(traceCheckedCh) + w.WriteHeader(http.StatusOK) + } + ) + defer cancel() + + f := &mock.Function{OnStart: onStart, OnHandle: onHandle} + service := New(f) + + go func() { + if err := service.Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to start") + case err := <-errCh: + t.Fatal(err) + case <-startCh: + } + + // Send a request with trace headers + req, err := http.NewRequest("POST", "http://"+service.Addr().String(), strings.NewReader(`{"hello":"world"}`)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected http status code: %v", resp.StatusCode) + } + + select { + case <-traceCheckedCh: + // Passed trace context validation + case <-time.After(500 * time.Millisecond): + t.Fatal("timed out waiting for trace context check in handle") + } +}