diff --git a/cmd/fkafka/main.go b/cmd/fkafka/main.go new file mode 100644 index 0000000..0fc568b --- /dev/null +++ b/cmd/fkafka/main.go @@ -0,0 +1,45 @@ +package main + +import ( + "context" + "fmt" + "os" + + "knative.dev/func-go/kafka" +) + +// Main illustrates how scaffolding works to wrap a user's function. +func main() { + // Instanced Example + // (in scaffolding 'New()' will be in module 'f') + if err := kafka.Start(New()); err != nil { + fmt.Fprintln(os.Stderr, err.Error()) + os.Exit(1) + } + + // Static Example + // (in scaffolding 'Handle' will be in the module 'f') + // if err := kafka.Start(kafka.DefaultHandler{Handler: Handle}); err != nil { + // fmt.Fprintln(os.Stderr, err.Error()) + // os.Exit(1) + // } +} + +// Handle is an example static function implementation. +func Handle(ctx context.Context, msg kafka.Message) error { + fmt.Println("Static Kafka Handler invoked") + return nil +} + +// MyFunction is an example instanced Kafka function implementation. +type MyFunction struct{} + +func New() *MyFunction { + return &MyFunction{} +} + +func (f *MyFunction) Handle(ctx context.Context, msg kafka.Message) error { + fmt.Printf("Received message: topic=%s partition=%d offset=%d key=%s value=%s\n", + msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) + return nil +} diff --git a/go.mod b/go.mod index 343d999..e48e3fd 100644 --- a/go.mod +++ b/go.mod @@ -3,19 +3,33 @@ module knative.dev/func-go go 1.25.0 require ( + github.com/IBM/sarama v1.50.3 github.com/cloudevents/sdk-go/v2 v2.15.2 github.com/rs/zerolog v1.32.0 knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b ) require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.18.6 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pierrec/lz4/v4 v4.1.27 // indirect + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/crypto v0.53.0 // indirect + golang.org/x/net v0.56.0 // indirect + golang.org/x/sys v0.46.0 // indirect ) diff --git a/go.sum b/go.sum index d18bdd9..f716086 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,40 @@ +github.com/IBM/sarama v1.50.3 h1:zpY2iZYmt+z+0Bo3aYF+cD48OBt2hIgiDPZUuZKTXcc= +github.com/IBM/sarama v1.50.3/go.mod h1:Jo4MSfdDT3ycmQj7/ab8eLZwnvwCKZm/8H7SCbtyo8U= github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao= +github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -23,33 +46,82 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pierrec/lz4/v4 v4.1.27 h1:+PhzhWDrjRj89TH2sw43nE3+4+W8lSxIuQadEHZyjUk= +github.com/pierrec/lz4/v4 v4.1.27/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.53.0 h1:QZ4Muo8THX6CizN2vPPd5fBGHyogrdK9fG4wLPFUsto= +golang.org/x/crypto v0.53.0/go.mod h1:DNLU434OwVakk9PzuwV8w62mAJpRJL3vsgcfp4Qnsio= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.56.0 h1:Rw8j/hFzGvJUZwNBXnAtf5sVDVt+65SK2C7IxCxZt5o= +golang.org/x/net v0.56.0/go.mod h1:D3Ku6r+V6JROoZK144D2XfMHFcMq/0zSfLelVTCFKec= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.21.0 h1:HLII4xRRTtCRkxYp4HNFF0Js/Og6q2i++KXbg0gHCwM= +golang.org/x/sync v0.21.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.46.0 h1:noSf2Fq6F8DBgS+LysIkx7rIExoNHJsxOAtPp4rthXw= +golang.org/x/sys v0.46.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= knative.dev/hack v0.0.0-20260428014158-b2a37f1b6e7b h1:MvbV2F2BdI8qKrYYUhDwbUZbX0BAYRSIpXM2TOtTvs0= diff --git a/kafka/instance.go b/kafka/instance.go new file mode 100644 index 0000000..b283b7a --- /dev/null +++ b/kafka/instance.go @@ -0,0 +1,42 @@ +package kafka + +import "context" + +// Handler is a function instance which can handle a Kafka message. +type Handler interface { + // Handle a Kafka message. + Handle(context.Context, Message) error +} + +// Starter is an instance which has defined the Start hook. +type Starter interface { + // Start instance event hook. + Start(context.Context, map[string]string) error +} + +// Stopper is an instance which has defined the Stop hook. +type Stopper interface { + // Stop instance event hook. + Stop(context.Context) error +} + +// ReadinessReporter is an instance which reports its readiness. +type ReadinessReporter interface { + // Ready to be invoked or not. + Ready(context.Context) (bool, error) +} + +// LivenessReporter is an instance which reports it is alive. +type LivenessReporter interface { + // Alive allows the instance to report its liveness status. + Alive(context.Context) (bool, error) +} + +// DefaultHandler wraps a static function for use with the Kafka middleware. +type DefaultHandler struct { + Handler func(context.Context, Message) error +} + +func (f DefaultHandler) Handle(ctx context.Context, msg Message) error { + return f.Handler(ctx, msg) +} \ No newline at end of file diff --git a/kafka/logs.go b/kafka/logs.go new file mode 100644 index 0000000..f2c45e3 --- /dev/null +++ b/kafka/logs.go @@ -0,0 +1,23 @@ +package kafka + +import "github.com/rs/zerolog" + +func init() { + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + SetLogLevel(DefaultLogLevel) +} + +type logLevel zerolog.Level + +const ( + LogDebug = logLevel(zerolog.DebugLevel) + LogInfo = logLevel(zerolog.InfoLevel) + LogWarn = logLevel(zerolog.WarnLevel) + LogDisabled = logLevel(zerolog.Disabled) +) + +// SetLogLevel to LogDebug, LogInfo, LogWarn, or LogDisabled +// Errors are always returned as values. +func SetLogLevel(l logLevel) { + zerolog.SetGlobalLevel(zerolog.Level(l)) +} diff --git a/kafka/message.go b/kafka/message.go new file mode 100644 index 0000000..fbf4f4b --- /dev/null +++ b/kafka/message.go @@ -0,0 +1,20 @@ +package kafka + +import "time" + +// Message represents a Kafka message delivered to the function's handler. +type Message struct { + Key []byte + Value []byte + Headers []Header + Topic string + Partition int32 + Offset int64 + Timestamp time.Time +} + +// Header is a key-value pair attached to a Kafka message. +type Header struct { + Key string + Value []byte +} \ No newline at end of file diff --git a/kafka/mock/function.go b/kafka/mock/function.go new file mode 100644 index 0000000..7b547f6 --- /dev/null +++ b/kafka/mock/function.go @@ -0,0 +1,25 @@ +package mock + +import "context" + +// Function is a mock for testing lifecycle hooks (Start/Stop). +// It does not implement kafka.Handler to avoid import cycles. +// Tests that need a full kafka.Handler should define one inline. +type Function struct { + OnStart func(context.Context, map[string]string) error + OnStop func(context.Context) error +} + +func (f *Function) Start(ctx context.Context, cfg map[string]string) error { + if f.OnStart != nil { + return f.OnStart(ctx, cfg) + } + return nil +} + +func (f *Function) Stop(ctx context.Context) error { + if f.OnStop != nil { + return f.OnStop(ctx) + } + return nil +} \ No newline at end of file diff --git a/kafka/service.go b/kafka/service.go new file mode 100644 index 0000000..5dbfd29 --- /dev/null +++ b/kafka/service.go @@ -0,0 +1,443 @@ +// Package kafka implements a Functions Kafka middleware for use by +// scaffolding which exposes a function as a service that consumes +// messages from Kafka topics. +package kafka + +import ( + "bufio" + "context" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "runtime" + "strings" + "sync/atomic" + "syscall" + "time" + + "github.com/IBM/sarama" + "github.com/rs/zerolog/log" +) + +const ( + DefaultLogLevel = LogDebug + DefaultListenAddress = "[::]:8080" +) + +const ( + ServerShutdownTimeout = 30 * time.Second + InstanceStopTimeout = 30 * time.Second +) + +// Start an instance using a new Service. +func Start(f Handler) error { + log.Debug().Msg("func runtime creating function instance") + return New(f).Start(context.Background()) +} + +// Service exposes a Function Instance as a Kafka consumer with HTTP health +// endpoints. +type Service struct { + http.Server + listener net.Listener + stop chan error + f Handler + ready atomic.Bool +} + +// New Service which serves the given instance. +func New(f Handler) *Service { + svc := &Service{ + f: f, + stop: make(chan error, 1), + Server: http.Server{ + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 30 * time.Second, + MaxHeaderBytes: 1 << 20, + ReadHeaderTimeout: 2 * time.Second, + }, + } + mux := http.NewServeMux() + mux.HandleFunc("/health/readiness", svc.Ready) + mux.HandleFunc("/health/liveness", svc.Alive) + svc.Handler = mux + + logImplements(f) + + return svc +} + +func logImplements(f any) { + if _, ok := f.(Starter); ok { + log.Info().Msg("Function implements Start") + } + if _, ok := f.(Stopper); ok { + log.Info().Msg("Function implements Stop") + } + if _, ok := f.(ReadinessReporter); ok { + log.Info().Msg("Function implements Ready") + } + if _, ok := f.(LivenessReporter); ok { + log.Info().Msg("Function implements Alive") + } +} + +// Start the service. Blocks until the context is canceled, a runtime error +// occurs, or an OS interrupt/kill signal is received. +func (s *Service) Start(ctx context.Context) (err error) { + addr := listenAddress() + log.Debug().Str("address", addr).Msg("function starting") + + if s.listener, err = net.Listen("tcp", addr); err != nil { + return + } + + if err = s.startInstance(ctx); err != nil { + return + } + + s.handleSignals() + + // Start HTTP health server + go func() { + if err := s.Serve(s.listener); err != http.ErrServerClosed { + log.Error().Err(err).Msg("http server exited with unexpected error") + s.stop <- err + } + }() + + // Start Kafka consumer + consumerCtx, consumerCancel := context.WithCancel(ctx) + defer consumerCancel() + go func() { + if err := s.consumeLoop(consumerCtx); err != nil { + log.Error().Err(err).Msg("kafka consumer exited with error") + s.stop <- err + } + }() + + log.Debug().Msg("waiting for stop signals or errors") + select { + case err = <-s.stop: + if err != nil { + log.Error().Err(err).Msg("function error") + } + case <-ctx.Done(): + log.Debug().Msg("function canceled") + } + consumerCancel() + return s.shutdown(err) +} + +// Addr returns the address upon which the service is listening if started; +// nil otherwise. +func (s *Service) Addr() net.Addr { + if s.listener == nil { + return nil + } + return s.listener.Addr() +} + +// Ready handles readiness checks. +func (s *Service) Ready(w http.ResponseWriter, r *http.Request) { + if i, ok := s.f.(ReadinessReporter); ok { + ready, err := i.Ready(r.Context()) + if err != nil { + log.Debug().Err(err).Msg("error checking readiness") + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, "error checking readiness: ", err.Error()) + return + } + if !ready { + log.Debug().Msg("function not yet ready") + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "function not yet ready") + return + } + } else if !s.ready.Load() { + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "kafka consumer not yet ready") + return + } + fmt.Fprintf(w, "READY") +} + +// Alive handles liveness checks. +func (s *Service) Alive(w http.ResponseWriter, r *http.Request) { + if i, ok := s.f.(LivenessReporter); ok { + alive, err := i.Alive(r.Context()) + if err != nil { + log.Err(err).Msg("error checking liveness") + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprint(w, "error checking liveness: ", err.Error()) + return + } + if !alive { + log.Debug().Msg("function not alive") + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte("function not alive")) + return + } + } + fmt.Fprintf(w, "ALIVE") +} + +func (s *Service) startInstance(ctx context.Context) error { + if i, ok := s.f.(Starter); ok { + cfg, err := newCfg() + if err != nil { + return err + } + go func() { + if err := i.Start(ctx, cfg); err != nil { + s.stop <- err + } + }() + } else { + log.Debug().Msg("function does not implement Start. Skipping") + } + return nil +} + +func (s *Service) handleSignals() { + sigs := make(chan os.Signal, 2) + signal.Notify(sigs) + go func() { + for { + sig := <-sigs + if sig == syscall.SIGINT || sig == syscall.SIGTERM { + log.Debug().Any("signal", sig).Msg("signal received") + s.stop <- nil + } else if runtime.GOOS == "linux" && sig == syscall.Signal(0x17) { + // Ignore SIGURG; signal 23 (0x17) + } + } + }() +} + +func (s *Service) shutdown(sourceErr error) (err error) { + log.Debug().Msg("function stopping") + var runtimeErr, instanceErr error + + ctx, cancel := context.WithTimeout(context.Background(), ServerShutdownTimeout) + defer cancel() + runtimeErr = s.Shutdown(ctx) + + if i, ok := s.f.(Stopper); ok { + ctx, cancel = context.WithTimeout(context.Background(), InstanceStopTimeout) + defer cancel() + instanceErr = i.Stop(ctx) + } + + return collapseErrors("shutdown error", sourceErr, instanceErr, runtimeErr) +} + +// consumeLoop connects to Kafka and consumes messages, calling the function +// handler for each message. +func (s *Service) consumeLoop(ctx context.Context) error { + brokers := kafkaBrokers() + topics := kafkaTopics() + group := kafkaConsumerGroup() + + if len(brokers) == 0 { + return fmt.Errorf("KAFKA_BROKERS environment variable is required") + } + if len(topics) == 0 { + return fmt.Errorf("KAFKA_TOPICS environment variable is required") + } + if group == "" { + return fmt.Errorf("KAFKA_CONSUMER_GROUP environment variable is required") + } + + log.Info(). + Strs("brokers", brokers). + Strs("topics", topics). + Str("group", group). + Msg("connecting to kafka") + + config := sarama.NewConfig() + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{ + sarama.NewBalanceStrategyRoundRobin(), + } + config.Consumer.Offsets.Initial = sarama.OffsetNewest + + client, err := sarama.NewConsumerGroup(brokers, group, config) + if err != nil { + return fmt.Errorf("creating consumer group: %w", err) + } + defer client.Close() + + handler := &consumerGroupHandler{ + f: s.f, + ready: &s.ready, + } + + for { + if err := client.Consume(ctx, topics, handler); err != nil { + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("consumer error: %w", err) + } + if ctx.Err() != nil { + return nil + } + // Rebalance happened; loop to rejoin. + s.ready.Store(false) + } +} + +// consumerGroupHandler implements sarama.ConsumerGroupHandler. +// +// TODO: support exactly-once semantics via transactional consumer/producer +// TODO: add optional deduplication (e.g. by message key or offset tracking) +type consumerGroupHandler struct { + f Handler + ready *atomic.Bool +} + +func (h *consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { + h.ready.Store(true) + log.Info().Msg("kafka consumer ready (partitions assigned)") + return nil +} + +func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + h.ready.Store(false) + log.Info().Msg("kafka consumer partitions revoked") + return nil +} + +func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + m := Message{ + Key: msg.Key, + Value: msg.Value, + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + Timestamp: msg.Timestamp, + } + for _, h := range msg.Headers { + if h != nil { + m.Headers = append(m.Headers, Header{ + Key: string(h.Key), + Value: h.Value, + }) + } + } + + // TODO: add retry support (backoff, max attempts, DLQ) + // TODO: a failed message is effectively lost if a later message in the + // same partition succeeds, because MarkMessage on a higher offset + // implicitly commits the earlier one. Consider stopping the partition + // on error or tracking failed offsets separately. + if err := h.f.Handle(session.Context(), m); err != nil { + log.Error().Err(err). + Str("topic", msg.Topic). + Int32("partition", msg.Partition). + Int64("offset", msg.Offset). + Msg("error handling kafka message") + continue + } + session.MarkMessage(msg, "") + } + return nil +} + +func kafkaBrokers() []string { + v := os.Getenv("KAFKA_BROKERS") + if v == "" { + return nil + } + return strings.Split(v, ",") +} + +func kafkaTopics() []string { + v := os.Getenv("KAFKA_TOPICS") + if v == "" { + return nil + } + return strings.Split(v, ",") +} + +func kafkaConsumerGroup() string { + return os.Getenv("KAFKA_CONSUMER_GROUP") +} + +func listenAddress() string { + listenAddress := os.Getenv("LISTEN_ADDRESS") + if listenAddress != "" { + return listenAddress + } + + address := os.Getenv("ADDRESS") + port := os.Getenv("PORT") + if address != "" || port != "" { + if address != "" { + log.Warn().Msg("Environment variable ADDRESS is deprecated and support will be removed in future versions. Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.") + } else { + address = "127.0.0.1" + } + if port != "" { + log.Warn().Msg("Environment variable PORT is deprecated and support will be removed in future version.s Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.") + } else { + port = "8080" + } + return address + ":" + port + } + + return DefaultListenAddress +} + +func readCfg() (map[string]string, error) { + cfg := map[string]string{} + + f, err := os.Open("cfg") + if err != nil { + log.Debug().Msg("no static config") + return cfg, nil + } + defer f.Close() + + scanner := bufio.NewScanner(f) + i := 0 + for scanner.Scan() { + i++ + line := scanner.Text() + parts := strings.SplitN(line, "=", 2) + if len(parts) != 2 { + return cfg, fmt.Errorf("config line %v invalid: %v", i, line) + } + cfg[strings.TrimSpace(parts[0])] = strings.Trim(strings.TrimSpace(parts[1]), "\"") + } + return cfg, scanner.Err() +} + +func newCfg() (cfg map[string]string, err error) { + if cfg, err = readCfg(); err != nil { + return + } + + for _, e := range os.Environ() { + pair := strings.SplitN(e, "=", 2) + cfg[pair[0]] = pair[1] + } + return +} + +func collapseErrors(msg string, ee ...error) (err error) { + for _, e := range ee { + if e != nil { + if err == nil { + err = e + } else { + log.Error().Err(e).Msg(msg) + } + } + } + return +} \ No newline at end of file diff --git a/kafka/service_test.go b/kafka/service_test.go new file mode 100644 index 0000000..db72db7 --- /dev/null +++ b/kafka/service_test.go @@ -0,0 +1,556 @@ +package kafka + +import ( + "context" + "fmt" + "io" + "net" + "net/http" + "os" + "testing" + "time" +) + +// testFunction is a mock function used in tests. Defined inline to avoid +// import cycles with the kafka/mock package. +type testFunction struct { + onStart func(context.Context, map[string]string) error + onStop func(context.Context) error + onHandle func(context.Context, Message) error +} + +func (f *testFunction) Start(ctx context.Context, cfg map[string]string) error { + if f.onStart != nil { + return f.onStart(ctx, cfg) + } + return nil +} + +func (f *testFunction) Stop(ctx context.Context) error { + if f.onStop != nil { + return f.onStop(ctx) + } + return nil +} + +func (f *testFunction) Handle(ctx context.Context, msg Message) error { + if f.onHandle != nil { + return f.onHandle(ctx, msg) + } + return nil +} + +// TestStart_Invoked ensures that the Start method of a function is invoked +// if it is implemented by the function instance. +func TestStart_Invoked(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + ) + defer cancel() + + f := &testFunction{onStart: onStart} + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + // Consumer loop will error because KAFKA_BROKERS is not set, + // but Start should still be invoked before that. + _ = err + case <-startCh: + t.Log("start signal received") + } + cancel() +} + +// TestStart_Static checks that static method Start(f) is a convenience method +// for New(f).Start(). +func TestStart_Static(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + ) + + f := &testFunction{onStart: onStart} + + go func() { + if err := Start(f); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + case <-startCh: + t.Log("start signal received") + } +} + +// TestStart_CfgEnvs ensures that the function's Start method receives a map +// containing all available environment variables as a parameter. +func TestStart_CfgEnvs(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, cfg map[string]string) error { + v := cfg["TEST_ENV"] + if v != "example_value" { + t.Fatalf("did not receive TEST_ENV. got %v", cfg["TEST_ENV"]) + } else { + t.Log("expected value received") + } + startCh <- true + return nil + } + ) + defer cancel() + + f := &testFunction{onStart: onStart} + + t.Setenv("TEST_ENV", "example_value") + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + case <-startCh: + t.Log("start signal received") + } +} + +// TestCfg_Static ensures that additional static "environment variables" +// built into the container as cfg are correctly read. +func TestCfg_Static(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + ) + defer cancel() + + dir := t.TempDir() + if err := os.Chdir(dir); err != nil { + t.Fatal(err) + } + + if err := os.WriteFile("cfg", []byte(`FUNC_VERSION="v1.2.3"`), os.ModePerm); err != nil { + t.Fatal(err) + } + + f := &testFunction{onStart: func(_ context.Context, cfg map[string]string) error { + v := cfg["FUNC_VERSION"] + if v != "v1.2.3" { + t.Fatalf("FUNC_VERSION not received. Expected 'v1.2.3', got '%v'", + cfg["FUNC_VERSION"]) + } else { + t.Log("expected value received") + } + startCh <- true + return nil + }} + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + case <-startCh: + t.Log("start signal received") + } +} + +// TestStop_Invoked ensures the Stop method of a function is invoked on context +// cancellation if it is implemented by the function instance. +func TestStop_Invoked(t *testing.T) { + t.Setenv("LISTEN_ADDRESS", "127.0.0.1:") + + var ( + ctx, cancel = context.WithCancel(context.Background()) + startCh = make(chan any) + stopCh = make(chan any) + errCh = make(chan error) + timeoutCh = time.After(500 * time.Millisecond) + onStart = func(_ context.Context, _ map[string]string) error { + startCh <- true + return nil + } + onStop = func(_ context.Context) error { + stopCh <- true + return nil + } + ) + + f := &testFunction{onStart: onStart, onStop: onStop} + + go func() { + if err := New(f).Start(ctx); err != nil { + errCh <- err + } + }() + + select { + case <-timeoutCh: + t.Fatal("function failed to notify of start") + case err := <-errCh: + _ = err + return + case <-startCh: + t.Log("start signal received") + } + + cancel() + + select { + case <-time.After(500 * time.Millisecond): + t.Fatal("function failed to notify of stop") + case err := <-errCh: + _ = err + case <-stopCh: + t.Log("stop signal received") + } +} + +// TestReady_Invoked ensures the default Ready endpoint returns OK when +// the consumer is marked as ready. +func TestReady_Invoked(t *testing.T) { + f := &testFunction{} + service := New(f) + service.ready.Store(true) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected http status code: %v", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if string(body) != "READY" { + t.Fatalf("unexpected body: %v", string(body)) + } +} + +// TestReady_NotReady ensures the readiness endpoint returns 503 when the +// consumer has not yet joined the group. +func TestReady_NotReady(t *testing.T) { + f := &testFunction{} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %v", resp.StatusCode) + } +} + +// TestAlive_Invoked ensures the default Alive endpoint returns OK. +func TestAlive_Invoked(t *testing.T) { + f := &testFunction{} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/liveness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected http status code: %v", resp.StatusCode) + } +} + +// TestHandle_Direct ensures the handler is invoked correctly when called +// directly (without a real Kafka broker). +func TestHandle_Direct(t *testing.T) { + handled := false + + f := &testFunction{ + onHandle: func(_ context.Context, msg Message) error { + if string(msg.Value) != "test-value" { + t.Fatalf("unexpected message value: %s", msg.Value) + } + if string(msg.Key) != "test-key" { + t.Fatalf("unexpected message key: %s", msg.Key) + } + if msg.Topic != "test-topic" { + t.Fatalf("unexpected topic: %s", msg.Topic) + } + if len(msg.Headers) != 1 || msg.Headers[0].Key != "h1" { + t.Fatalf("unexpected headers: %v", msg.Headers) + } + handled = true + return nil + }, + } + + msg := Message{ + Key: []byte("test-key"), + Value: []byte("test-value"), + Topic: "test-topic", + Partition: 0, + Offset: 42, + Headers: []Header{ + {Key: "h1", Value: []byte("v1")}, + }, + } + + err := f.Handle(context.Background(), msg) + if err != nil { + t.Fatal(err) + } + if !handled { + t.Fatal("handler was not invoked") + } +} + +// TestDefaultHandler ensures the DefaultHandler wrapper works correctly +// for static function implementations. +func TestDefaultHandler(t *testing.T) { + handled := false + + staticFn := func(_ context.Context, msg Message) error { + handled = true + if string(msg.Value) != "hello" { + t.Fatalf("unexpected value: %s", msg.Value) + } + return nil + } + + dh := DefaultHandler{Handler: staticFn} + err := dh.Handle(context.Background(), Message{Value: []byte("hello")}) + if err != nil { + t.Fatal(err) + } + if !handled { + t.Fatal("static handler was not invoked") + } +} + +// TestHandle_Error ensures that a handler returning an error does not +// cause a panic or unexpected behavior. +func TestHandle_Error(t *testing.T) { + expectedErr := "processing failed" + f := &testFunction{ + onHandle: func(_ context.Context, _ Message) error { + return fmt.Errorf("%s", expectedErr) + }, + } + + err := f.Handle(context.Background(), Message{Value: []byte("bad")}) + if err == nil { + t.Fatal("expected error, got nil") + } + if err.Error() != expectedErr { + t.Fatalf("expected %q, got %q", expectedErr, err.Error()) + } +} + +// TestReady_CustomReporter ensures the readiness endpoint delegates to +// the function's Ready method when it implements ReadinessReporter. +func TestReady_CustomReporter(t *testing.T) { + f := &readyFunction{ready: true} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %v", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatal(err) + } + if string(body) != "READY" { + t.Fatalf("expected READY, got %q", string(body)) + } +} + +// TestReady_CustomReporterNotReady ensures the readiness endpoint returns +// 503 when the function's Ready method returns false. +func TestReady_CustomReporterNotReady(t *testing.T) { + f := &readyFunction{ready: false} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/readiness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %v", resp.StatusCode) + } +} + +// TestAlive_CustomReporter ensures the liveness endpoint delegates to +// the function's Alive method when it implements LivenessReporter. +func TestAlive_CustomReporter(t *testing.T) { + f := &aliveFunction{alive: true} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/liveness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("expected 200, got %v", resp.StatusCode) + } +} + +// TestAlive_CustomReporterNotAlive ensures the liveness endpoint returns +// 503 when the function's Alive method returns false. +func TestAlive_CustomReporterNotAlive(t *testing.T) { + f := &aliveFunction{alive: false} + service := New(f) + + ln, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + go service.Serve(ln) + defer service.Close() + + resp, err := http.Get("http://" + ln.Addr().String() + "/health/liveness") + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("expected 503, got %v", resp.StatusCode) + } +} + +// readyFunction implements Handler and ReadinessReporter for testing. +type readyFunction struct { + ready bool +} + +func (f *readyFunction) Handle(_ context.Context, _ Message) error { return nil } +func (f *readyFunction) Ready(_ context.Context) (bool, error) { return f.ready, nil } + +// aliveFunction implements Handler and LivenessReporter for testing. +type aliveFunction struct { + alive bool +} + +func (f *aliveFunction) Handle(_ context.Context, _ Message) error { return nil } +func (f *aliveFunction) Alive(_ context.Context) (bool, error) { return f.alive, nil } \ No newline at end of file