Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion apis/fluentbit/v1alpha2/plugins/output/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package output

import (
"fmt"
"strings"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params"
Expand All @@ -12,8 +13,12 @@ import (
// Kafka output plugin allows to ingest your records into an Apache Kafka service. <br />
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/outputs/kafka**
type Kafka struct {
// Specify data format, options available: json, msgpack.
// Specify data format, options available: json, msgpack, raw.
// +kubebuilder:validation:Enum:=json;msgpack;raw
Format string `json:"format,omitempty"`
// When using format: raw, the value of the record field specified by rawLogKey
// (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
RawLogKey string `json:"rawLogKey,omitempty"`
// Optional key to store the message
MessageKey string `json:"messageKey,omitempty"`
// If set, the value of Message_Key_Field in the record will indicate the message key.
Expand Down Expand Up @@ -60,6 +65,11 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) {
if k.Format != "" {
kvs.Insert("Format", k.Format)
}
if k.RawLogKey != "" {
kvs.Insert("Raw_Log_Key", k.RawLogKey)
} else if strings.EqualFold(k.Format, "raw") {
return nil, fmt.Errorf("rawLogKey is required when format is raw")
}
Comment thread
Vaibhav-C-S marked this conversation as resolved.
Comment thread
Vaibhav-C-S marked this conversation as resolved.
if k.MessageKey != "" {
kvs.Insert("Message_Key", k.MessageKey)
}
Expand Down
46 changes: 46 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/kafka_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package output

import (
"testing"

"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params"
. "github.com/onsi/gomega"
)

func TestOutput_Kafka_Params(t *testing.T) {
g := NewWithT(t)

sl := plugins.NewSecretLoader(nil, "test namespace")

kafka := Kafka{
Format: "raw",
RawLogKey: "message",
Brokers: "kafka:9092",
Topics: "logs",
}

expected := params.NewKVs()
expected.Insert("Format", "raw")
expected.Insert("Raw_Log_Key", "message")
expected.Insert("Brokers", "kafka:9092")
expected.Insert("Topics", "logs")

kvs, err := kafka.Params(sl)
g.Expect(err).NotTo(HaveOccurred())
g.Expect(kvs).To(Equal(expected))
}

func TestOutput_Kafka_ParamsRequiresRawLogKeyForRawFormat(t *testing.T) {
g := NewWithT(t)

sl := plugins.NewSecretLoader(nil, "test namespace")

kafka := Kafka{
Format: "RAW",
}

kvs, err := kafka.Params(sl)
g.Expect(err).To(MatchError("rawLogKey is required when format is raw"))
g.Expect(kvs).To(BeNil())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2277,6 +2282,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2258,7 +2258,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2277,6 +2282,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2275,6 +2280,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
12 changes: 11 additions & 1 deletion charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2275,6 +2280,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
12 changes: 11 additions & 1 deletion config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2257,7 +2257,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2276,6 +2281,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
12 changes: 11 additions & 1 deletion config/crd/bases/fluentbit.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2257,7 +2257,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -2276,6 +2281,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down
3 changes: 2 additions & 1 deletion docs/plugins/fluentbit/output/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ Kafka output plugin allows to ingest your records into an Apache Kafka service.

| Field | Description | Scheme |
| ----- | ----------- | ------ |
| format | Specify data format, options available: json, msgpack. | string |
| format | Specify data format, options available: json, msgpack, raw. | string |
| rawLogKey | When using format: raw, the value of the record field specified by rawLogKey (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. | string |
| messageKey | Optional key to store the message | string |
| messageKeyField | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | string |
| timestampKey | Set the key to store the record timestamp | string |
Expand Down
24 changes: 22 additions & 2 deletions manifests/setup/setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6457,7 +6457,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -6476,6 +6481,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down Expand Up @@ -37123,7 +37133,12 @@ spec:
So in Topics only a default topic needs to be configured
type: boolean
format:
description: 'Specify data format, options available: json, msgpack.'
description: 'Specify data format, options available: json, msgpack,
raw.'
enum:
- json
- msgpack
- raw
type: string
messageKey:
description: Optional key to store the message
Expand All @@ -37142,6 +37157,11 @@ spec:
Setting the queue_full_retries value to 0 set's an unlimited number of retries.
format: int64
type: integer
rawLogKey:
description: |-
When using format: raw, the value of the record field specified by rawLogKey
(Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload.
type: string
rdkafka:
additionalProperties:
type: string
Expand Down