Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion cloudevents/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,18 @@ import (
"net/http"
"os"
"os/signal"
"reflect"
"runtime"
"strings"
"sync"
"syscall"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

const (
Expand All @@ -43,8 +48,29 @@ type Service struct {
stop chan error
}

var otelOnce sync.Once

// New Service which service the given instance.
func New(f any) *Service {
// Register W3C Trace Context propagator so that trace headers injected
// by Knative's queue proxy (traceparent, tracestate) are extracted and
// made available in the context.Context passed to the function's Handle.
otelOnce.Do(func() {
p := otel.GetTextMapPropagator()
if reflect.TypeOf(p).String() != "*global.textMapPropagator" {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
p,
))
} else {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
}
})

svc := &Service{
f: f,
stop: make(chan error),
Expand All @@ -59,7 +85,9 @@ func New(f any) *Service {
mux := http.NewServeMux()
mux.HandleFunc("/health/readiness", svc.Ready)
mux.HandleFunc("/health/liveness", svc.Alive)
mux.Handle("/", newCloudeventHandler(f)) // See implementation note
// Wrap with otelhttp so W3C trace context is extracted from HTTP headers
// and propagated into the context before the function handler is called.
mux.Handle("/", otelhttp.NewHandler(newCloudeventHandler(f), "handle"))
svc.Handler = mux
return svc
}
Expand Down
88 changes: 88 additions & 0 deletions cloudevents/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"log"
"net/http"
"os"
"strings"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"go.opentelemetry.io/otel/trace"
"knative.dev/func-go/cloudevents/mock"
)

Expand Down Expand Up @@ -145,10 +147,17 @@ func TestCfg_Static(t *testing.T) {
defer cancel()

// Run test from within a temp dir
wd, err := os.Getwd()
if err != nil {
t.Fatal(err)
}
dir := t.TempDir()
if err := os.Chdir(dir); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
_ = os.Chdir(wd)
})

// Write an example `cfg` file
if err := os.WriteFile("cfg", []byte(`FUNC_VERSION="v1.2.3"`), os.ModePerm); err != nil {
Expand Down Expand Up @@ -394,3 +403,82 @@ func TestAlive_Invoked(t *testing.T) {
t.Fatalf("unexpected http status code: %v", resp.StatusCode)
}
}

// TestHandle_OTelTracePropagation ensures that W3C trace context headers
// injected into the request are correctly parsed and propagated to the
// context.Context of the function's handle function.
func TestHandle_OTelTracePropagation(t *testing.T) {
t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") // use an OS-chosen port

var (
ctx, cancel = context.WithCancel(context.Background())
errCh = make(chan error)
startCh = make(chan any)
timeoutCh = time.After(500 * time.Millisecond)
onStart = func(_ context.Context, _ map[string]string) error {
startCh <- true
return nil
}
traceCheckedCh = make(chan struct{})
onHandle = func(ctx context.Context, event event.Event) (*event.Event, error) {
sc := trace.SpanContextFromContext(ctx)
if !sc.IsValid() {
t.Error("expected a valid span context from propagated trace headers, but got invalid")
}
if sc.TraceID().String() != "4bf92f3577b34da6a3ce929d0e0e4736" {
t.Errorf("expected TraceID '4bf92f3577b34da6a3ce929d0e0e4736', got %v", sc.TraceID().String())
}
// SpanID is not asserted here because otelhttp may start a new server span
// (with a different SpanID) when a tracer provider is configured.
close(traceCheckedCh)
return nil, nil
}
)
defer cancel()

f := &mock.Function{OnStart: onStart, OnHandle: onHandle}
service := New(f)

go func() {
if err := service.Start(ctx); err != nil {
errCh <- err
}
}()

select {
case <-timeoutCh:
t.Fatal("function failed to start")
case err := <-errCh:
t.Fatal(err)
case <-startCh:
}

// Send a request with trace headers
req, err := http.NewRequest("POST", "http://"+service.Addr().String(), strings.NewReader(`{"hello":"world"}`))
if err != nil {
t.Fatal(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("ce-specversion", "1.0")
req.Header.Set("ce-type", "example.type")
req.Header.Set("ce-source", "example/uri")
req.Header.Set("ce-id", "a688ed0b")
req.Header.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected http status code: %v", resp.StatusCode)
}

select {
case <-traceCheckedCh:
// Passed trace context validation
case <-time.After(500 * time.Millisecond):
t.Fatal("timed out waiting for trace context check in handle")
}
}
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
module knative.dev/func-go

go 1.24
go 1.24.0

require (
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/rs/zerolog v1.32.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/trace v1.38.0
knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b
)

require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
27 changes: 21 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand All @@ -31,10 +38,20 @@ github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand All @@ -48,8 +65,6 @@ golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b h1:MvbV2F2BdI8qKrYYUhDwbUZbX0BAYRSIpXM2TOtTvs0=
Expand Down
30 changes: 29 additions & 1 deletion http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ import (
"net/http"
"os"
"os/signal"
"reflect"
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/rs/zerolog/log"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

const (
Expand Down Expand Up @@ -45,8 +50,29 @@ type Service struct {
f Handler
}

var otelOnce sync.Once

// New Service which serves the given instance.
func New(f Handler) *Service {
// Register W3C Trace Context propagator so that trace headers injected
// by Knative's queue proxy (traceparent, tracestate) are extracted and
// made available in the context.Context passed to the function's Handle.
otelOnce.Do(func() {
p := otel.GetTextMapPropagator()
if reflect.TypeOf(p).String() != "*global.textMapPropagator" {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
p,
))
} else {
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
))
}
})

svc := &Service{
f: f,
stop: make(chan error),
Expand All @@ -61,7 +87,9 @@ func New(f Handler) *Service {
mux := http.NewServeMux()
mux.HandleFunc("/health/readiness", svc.Ready)
mux.HandleFunc("/health/liveness", svc.Alive)
mux.HandleFunc("/", svc.Handle)
// Wrap with otelhttp so W3C trace context is extracted from HTTP headers
// and propagated into the context before the function handler is called.
mux.Handle("/", otelhttp.NewHandler(http.HandlerFunc(svc.Handle), "handle"))
svc.Handler = mux

// Print some helpful information about which interfaces the function
Expand Down
Loading