Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21,749 changes: 11,155 additions & 10,594 deletions generate/zz_filesystem_generated.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/functions/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type Function struct {

// Invoke defines hints for use when invoking this function.
// See Client.Invoke for usage.
Invoke string `yaml:"invoke,omitempty" jsonschema:"enum=http,enum=cloudevent"`
Invoke string `yaml:"invoke,omitempty" jsonschema:"enum=http,enum=cloudevent,enum=kafka"`

// Build defines the build properties for a function
Build BuildSpec `yaml:"build,omitempty"`
Expand Down
10 changes: 8 additions & 2 deletions pkg/scaffolding/signatures.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ const (
UnknownSignature Signature = iota
InstancedHTTP
InstancedCloudevents
InstancedKafka
StaticHTTP
StaticCloudevents
StaticKafka
)

func (s Signature) String() string {
return []string{
"unknown",
"instanced-http",
"instanced-cloudevents",
"instanced-kafka",
"static-http",
"static-cloudevents",
"static-kafka",
}[s]
}

Expand All @@ -27,10 +31,12 @@ func (s Signature) String() string {
var signatureMap = map[bool]map[string]Signature{
true: {
"http": InstancedHTTP,
"cloudevent": InstancedCloudevents},
"cloudevent": InstancedCloudevents,
"kafka": InstancedKafka},
false: {
"http": StaticHTTP,
"cloudevent": StaticCloudevents},
"cloudevent": StaticCloudevents,
"kafka": StaticKafka},
}

// toSignature converts an instanced boolean and invocation hint into
Expand Down
2 changes: 2 additions & 0 deletions pkg/scaffolding/signatures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func TestSignatures(t *testing.T) {
{false, "", StaticHTTP, "static-http"},
{false, "http", StaticHTTP, "static-http"},
{false, "cloudevent", StaticCloudevents, "static-cloudevents"},
{true, "kafka", InstancedKafka, "instanced-kafka"},
{false, "kafka", StaticKafka, "static-kafka"},
{true, "invalid", UnknownSignature, "unknown"},
{false, "invalid", UnknownSignature, "unknown"},
}
Expand Down
32 changes: 32 additions & 0 deletions templates/go/.instanced-kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kafka Function Instance

Welcome to your new Go Function! The boilerplate function code can be found in
[`function.go`](function.go). This Function consumes messages from Kafka topics.

## How it works

Your `Handle` method is called once for each Kafka message. Return `nil` to
indicate successful processing — the message offset will be committed
automatically. Return an error to skip the message (the error is logged and
the consumer moves on to the next message).

## Delivery guarantees

Messages are delivered **at-least-once** and processed **in order per
partition**. If the consumer crashes after processing a message but before the
offset is committed, the message will be redelivered. There is no built-in
deduplication — if your function cannot safely process the same message twice,
you should implement idempotency in your `Handle` method (for example by
tracking previously seen message keys or offsets).

## Development

Develop new features by adding a test to [`function_test.go`](function_test.go) for
each feature, and confirm it works with `go test`.

Once your function is passing tests, deploy it using `func deploy`. The
`func` CLI also offers several other testing and development commands; see
`func --help` for more.

For more, see [the complete documentation]('https://github.com/knative/func/tree/main/docs')

85 changes: 85 additions & 0 deletions templates/go/.instanced-kafka/function.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// package function is an example of a Kafka Function implementation.
//
// This package name can be changed when using the "host" builder
// (as can the module name in go.mod)
package function

import (
"context"
"fmt"

"knative.dev/func-go/kafka"
)

// MyFunction is the function provided by this library.
// This structure name can be changed.
type MyFunction struct{}

// New constructs an instance of your function. It is called each time a new
// instance of the function service is created. This function must be named
// "New", accept no arguments, and return a structure which exports at least
// a Handle method (and optionally any of the additional methods described
// in the comments below).
func New() *MyFunction {
return &MyFunction{}
}

// Handle a Kafka message.
//
// Returning nil signals successful processing and the message offset is
// committed. Returning an error logs the error and the message is skipped
// (not retried).
func (f *MyFunction) Handle(ctx context.Context, msg kafka.Message) error {
fmt.Printf("Received message: topic=%s partition=%d offset=%d key=%s value=%s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
return nil
}

// Start is called whenever a function instance is started.
//
// Provided to this start method are all arguments and environment variables
// which apply to this function. For better function portability, testability
// and robustness, it is encouraged to use this method for accessing function
// configuration rather than looking for environment variables or flags.
// func (f *MyFunction) Start(ctx context.Context, args map[string]string) error {
// fmt.Println("Function Started")
// return nil
// }

// Stop is called whenever a function is stopped.
//
// This may happen for reasons such as being rescheduled onto a different node,
// being updated with a newer version, or if the number of function instances
// is being scaled down due to low load. This is a good place to cleanup and
// realease any resources which expect to be manually released.
//
// func (f *Function) Stop(ctx context.Context) error { return nil }

// Alive is an optional method which allows you to more deeply indicate that
// your function is alive. The default liveness implementation returns true
// if the function process is not deadlocked and able to respond. A custom
// implementation of this method may be useful when a function should not be
// considered alive if any dependent services are alive, or other more
// complex logic.
//
// func (f *Function) Alive(ctx context.Context) (bool, error) {
// return true, nil
// }

// Ready is an optional method which, when implemented, will ensure that
// requests are not made to the Function's request handler until this method
// reports true.
//
// func (f *Function) Ready(ctx context.Context) (bool, error) {
// return true, nil
// }

// Handle is an optional method which can be used to implement simple functions
// with little or no state, and minimal testing requirements. By implementing
// this package static function, one can forego the constructor and struct
// outlined above. Note that if this method is defined, the system will ignore
// the instanced function constructor if it is defined.
//
// func Handle(ctx context.Context, msg kafka.Message) error {
// /* Your Static Handler Code Here */
// }
23 changes: 23 additions & 0 deletions templates/go/.instanced-kafka/function_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package function

import (
"context"
"testing"

"knative.dev/func-go/kafka"
)

// TestHandle ensures that the constructor returns an object which handles
// a Kafka message without error.
func TestHandle(t *testing.T) {
msg := kafka.Message{
Key: []byte("test-key"),
Value: []byte("test-value"),
Topic: "test-topic",
}

err := New().Handle(context.Background(), msg)
if err != nil {
t.Fatal(err)
}
}
5 changes: 5 additions & 0 deletions templates/go/.instanced-kafka/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module function

go 1.21

require knative.dev/func-go v0.21.3
1 change: 1 addition & 0 deletions templates/go/.instanced-kafka/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
knative.dev/func-go v0.21.3/go.mod h1:YAUlPi4bY5OQb7n9424zm9GtRigOQ1/IBtqkLqC29Dw=
5 changes: 5 additions & 0 deletions templates/go/.instanced-kafka/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# optional. Invocation defines hints for how Functions created using this
# template can be invoked. These settings can be updated on the resultant
# Function as development progresses to ensure 'invoke' can always trigger the
# execution of a running Function instance for testing and development.
invoke: "kafka"
32 changes: 32 additions & 0 deletions templates/go/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Go Kafka Function

Welcome to your new Go Function! The boilerplate function code can be found in
[`handle.go`](handle.go). This Function consumes messages from Kafka topics.

## How it works

Your `Handle` function is called once for each Kafka message. Return `nil` to
indicate successful processing — the message offset will be committed
automatically. Return an error to skip the message (the error is logged and
the consumer moves on to the next message).

## Delivery guarantees

Messages are delivered **at-least-once** and processed **in order per
partition**. If the consumer crashes after processing a message but before the
offset is committed, the message will be redelivered. There is no built-in
deduplication — if your function cannot safely process the same message twice,
you should implement idempotency in your `Handle` function (for example by
tracking previously seen message keys or offsets).

## Development

Develop new features by adding a test to [`handle_test.go`](handle_test.go) for
each feature, and confirm it works with `go test`.

Update the running analog of the function using the `func` CLI or client
library. The function will consume messages from the Kafka topics configured
via the `KAFKA_TOPICS` environment variable.

For more, see [the complete documentation]('https://github.com/knative/func/tree/main/docs')

5 changes: 5 additions & 0 deletions templates/go/kafka/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module function

go 1.21

require knative.dev/func-go v0.21.3
1 change: 1 addition & 0 deletions templates/go/kafka/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
knative.dev/func-go v0.21.3/go.mod h1:YAUlPi4bY5OQb7n9424zm9GtRigOQ1/IBtqkLqC29Dw=
25 changes: 25 additions & 0 deletions templates/go/kafka/handle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package function

import (
"context"
"fmt"

"knative.dev/func-go/kafka"
)

// Handle a Kafka message.
//
// Returning nil signals successful processing and the message offset is
// committed. Returning an error logs the error and the message is skipped
// (not retried).
func Handle(ctx context.Context, msg kafka.Message) error {
/*
* YOUR CODE HERE
*
* Try running `go test`. Add more test as you code in `handle_test.go`.
*/

fmt.Printf("Received message: topic=%s partition=%d offset=%d key=%s value=%s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
return nil
}
22 changes: 22 additions & 0 deletions templates/go/kafka/handle_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package function

import (
"context"
"testing"

"knative.dev/func-go/kafka"
)

// TestHandle ensures that Handle accepts a valid Kafka message without error.
func TestHandle(t *testing.T) {
msg := kafka.Message{
Key: []byte("test-key"),
Value: []byte("test-value"),
Topic: "test-topic",
}

err := Handle(context.Background(), msg)
if err != nil {
t.Fatal(err)
}
}
5 changes: 5 additions & 0 deletions templates/go/kafka/manifest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# optional. Invocation defines hints for how Functions created using this
# template can be invoked. These settings can be updated on the resultant
# Function as development progresses to ensure 'invoke' can always trigger the
# execution of a running Function instance for testing and development.
invoke: "kafka"
1 change: 1 addition & 0 deletions templates/go/scaffolding/instanced-kafka/f
31 changes: 31 additions & 0 deletions templates/go/scaffolding/instanced-kafka/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module s

replace function => ./f

go 1.25.0

require (
function v0.0.0-00010101000000-000000000000
knative.dev/func-go v0.21.3
)

require (
github.com/IBM/sarama v1.50.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.18.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pierrec/lz4/v4 v4.1.27 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
github.com/rs/zerolog v1.32.0 // indirect
golang.org/x/crypto v0.53.0 // indirect
golang.org/x/net v0.56.0 // indirect
golang.org/x/sys v0.46.0 // indirect
)
Loading