diff --git a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go index 61e0bd2bc..201eeb4b2 100644 --- a/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go +++ b/apis/fluentbit/v1alpha2/plugins/output/kafka_types.go @@ -2,6 +2,7 @@ package output import ( "fmt" + "strings" "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins" "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params" @@ -12,8 +13,12 @@ import ( // Kafka output plugin allows to ingest your records into an Apache Kafka service.
// **For full documentation, refer to https://docs.fluentbit.io/manual/pipeline/outputs/kafka** type Kafka struct { - // Specify data format, options available: json, msgpack. + // Specify data format, options available: json, msgpack, raw. + // +kubebuilder:validation:Enum:=json;msgpack;raw Format string `json:"format,omitempty"` + // When using format: raw, the value of the record field specified by rawLogKey + // (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + RawLogKey string `json:"rawLogKey,omitempty"` // Optional key to store the message MessageKey string `json:"messageKey,omitempty"` // If set, the value of Message_Key_Field in the record will indicate the message key. @@ -60,6 +65,11 @@ func (k *Kafka) Params(_ plugins.SecretLoader) (*params.KVs, error) { if k.Format != "" { kvs.Insert("Format", k.Format) } + if k.RawLogKey != "" { + kvs.Insert("Raw_Log_Key", k.RawLogKey) + } else if strings.EqualFold(k.Format, "raw") { + return nil, fmt.Errorf("rawLogKey is required when format is raw") + } if k.MessageKey != "" { kvs.Insert("Message_Key", k.MessageKey) } diff --git a/apis/fluentbit/v1alpha2/plugins/output/kafka_types_test.go b/apis/fluentbit/v1alpha2/plugins/output/kafka_types_test.go new file mode 100644 index 000000000..bc60b81bd --- /dev/null +++ b/apis/fluentbit/v1alpha2/plugins/output/kafka_types_test.go @@ -0,0 +1,46 @@ +package output + +import ( + "testing" + + "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins" + "github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins/params" + . "github.com/onsi/gomega" +) + +func TestOutput_Kafka_Params(t *testing.T) { + g := NewWithT(t) + + sl := plugins.NewSecretLoader(nil, "test namespace") + + kafka := Kafka{ + Format: "raw", + RawLogKey: "message", + Brokers: "kafka:9092", + Topics: "logs", + } + + expected := params.NewKVs() + expected.Insert("Format", "raw") + expected.Insert("Raw_Log_Key", "message") + expected.Insert("Brokers", "kafka:9092") + expected.Insert("Topics", "logs") + + kvs, err := kafka.Params(sl) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(kvs).To(Equal(expected)) +} + +func TestOutput_Kafka_ParamsRequiresRawLogKeyForRawFormat(t *testing.T) { + g := NewWithT(t) + + sl := plugins.NewSecretLoader(nil, "test namespace") + + kafka := Kafka{ + Format: "RAW", + } + + kvs, err := kafka.Params(sl) + g.Expect(err).To(MatchError("rawLogKey is required when format is raw")) + g.Expect(kvs).To(BeNil()) +} diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml index 214a9c96b..abac964c1 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_clusteroutputs.yaml @@ -2258,7 +2258,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -2277,6 +2282,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml index 76088d713..f5a769c33 100644 --- a/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator-fluent-bit-crds/templates/fluentbit.fluent.io_outputs.yaml @@ -2258,7 +2258,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -2277,6 +2282,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml index 2abf993df..e338a9533 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_clusteroutputs.yaml @@ -2256,7 +2256,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -2275,6 +2280,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml index dcee8f101..15b167f82 100644 --- a/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml +++ b/charts/fluent-operator/crds/fluentbit.fluent.io_outputs.yaml @@ -2256,7 +2256,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -2275,6 +2280,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml index 8841ee8f2..5d01242a8 100644 --- a/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml @@ -2257,7 +2257,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -2276,6 +2281,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/config/crd/bases/fluentbit.fluent.io_outputs.yaml b/config/crd/bases/fluentbit.fluent.io_outputs.yaml index 34c4f25fa..6c67ff454 100644 --- a/config/crd/bases/fluentbit.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentbit.fluent.io_outputs.yaml @@ -2257,7 +2257,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -2276,6 +2281,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string diff --git a/docs/plugins/fluentbit/output/kafka.md b/docs/plugins/fluentbit/output/kafka.md index 65ac6917a..c85a25f04 100644 --- a/docs/plugins/fluentbit/output/kafka.md +++ b/docs/plugins/fluentbit/output/kafka.md @@ -5,7 +5,8 @@ Kafka output plugin allows to ingest your records into an Apache Kafka service. | Field | Description | Scheme | | ----- | ----------- | ------ | -| format | Specify data format, options available: json, msgpack. | string | +| format | Specify data format, options available: json, msgpack, raw. | string | +| rawLogKey | When using format: raw, the value of the record field specified by rawLogKey (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. | string | | messageKey | Optional key to store the message | string | | messageKeyField | If set, the value of Message_Key_Field in the record will indicate the message key. If not set nor found in the record, Message_Key will be used (if set). | string | | timestampKey | Set the key to store the record timestamp | string | diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index fa044f4a2..31d3eae62 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -6457,7 +6457,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -6476,6 +6481,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string @@ -37123,7 +37133,12 @@ spec: So in Topics only a default topic needs to be configured type: boolean format: - description: 'Specify data format, options available: json, msgpack.' + description: 'Specify data format, options available: json, msgpack, + raw.' + enum: + - json + - msgpack + - raw type: string messageKey: description: Optional key to store the message @@ -37142,6 +37157,11 @@ spec: Setting the queue_full_retries value to 0 set's an unlimited number of retries. format: int64 type: integer + rawLogKey: + description: |- + When using format: raw, the value of the record field specified by rawLogKey + (Fluent Bit option: Raw_Log_Key) is sent to Kafka as the payload. + type: string rdkafka: additionalProperties: type: string