Skip to content

Commit ef35ebd

Browse files
committed
Add debeziumUnwrap transformers
1 parent 117bafb commit ef35ebd

13 files changed

Lines changed: 544 additions & 12 deletions

api/v1/dataflow_types.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ type SASLConfig struct {
808808
// TransformationSpec defines a transformation to apply (type + config).
809809
// +kubebuilder:pruning:PreserveUnknownFields
810810
type TransformationSpec struct {
811-
// Type of transformation: timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase
811+
// Type of transformation: timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase, debeziumUnwrap
812812
Type string `json:"type"`
813813

814814
// Config holds transformation configuration. Structure depends on type.
@@ -862,6 +862,11 @@ func (t *TransformationSpec) GetCamelCaseConfig() (*CamelCaseTransformation, err
862862
return getTypedConfig[CamelCaseTransformation](t.Config)
863863
}
864864

865+
// GetDebeziumUnwrapConfig returns DebeziumUnwrap transformation config.
866+
func (t *TransformationSpec) GetDebeziumUnwrapConfig() (*DebeziumUnwrapTransformation, error) {
867+
return getTypedConfig[DebeziumUnwrapTransformation](t.Config)
868+
}
869+
865870
// TimestampTransformation adds a timestamp field
866871
type TimestampTransformation struct {
867872
// FieldName is the name of the timestamp field (default: created_at)
@@ -940,6 +945,21 @@ type CamelCaseTransformation struct {
940945
Deep bool `json:"deep,omitempty"`
941946
}
942947

948+
// DebeziumUnwrapTransformation unwraps Debezium envelope messages into row payloads.
949+
type DebeziumUnwrapTransformation struct {
950+
// InferDeleteFromTombstone converts Kafka tombstone records into operation=delete messages using metadata.key JSON.
951+
// +optional
952+
InferDeleteFromTombstone bool `json:"inferDeleteFromTombstone,omitempty"`
953+
954+
// IncludeSourceInMetadata copies payload.source fields into metadata with source_ prefix.
955+
// +optional
956+
IncludeSourceInMetadata bool `json:"includeSourceInMetadata,omitempty"`
957+
958+
// SnapshotOperation defines operation for Debezium snapshot records (op="r"): insert (default) or update.
959+
// +optional
960+
SnapshotOperation string `json:"snapshotOperation,omitempty"`
961+
}
962+
943963
// DataFlowStatus defines the observed state of DataFlow
944964
type DataFlowStatus struct {
945965
// Phase represents the current phase of the data flow

api/v1/dataflow_validation.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,17 @@ func validateTransformations(transformations []TransformationSpec, f *field.Path
540540
} else {
541541
all = append(all, field.Required(idx.Child("config"), "camelCase transformation configuration is required"))
542542
}
543+
case transformtypes.DebeziumUnwrap:
544+
if hasConfig {
545+
var cfg DebeziumUnwrapTransformation
546+
if err := json.Unmarshal(t.Config.Raw, &cfg); err != nil {
547+
all = append(all, field.Invalid(idx.Child("config"), string(t.Config.Raw), "invalid debeziumUnwrap config: "+err.Error()))
548+
} else if cfg.SnapshotOperation != "" && cfg.SnapshotOperation != "insert" && cfg.SnapshotOperation != "update" {
549+
all = append(all, field.NotSupported(idx.Child("config", "snapshotOperation"), cfg.SnapshotOperation, []string{"insert", "update"}))
550+
}
551+
} else {
552+
all = append(all, field.Required(idx.Child("config"), "debeziumUnwrap transformation configuration is required"))
553+
}
543554
}
544555
}
545556
return all
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Copyright 2024.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1
18+
19+
import (
20+
"encoding/json"
21+
"strings"
22+
"testing"
23+
24+
"k8s.io/apimachinery/pkg/runtime"
25+
)
26+
27+
func mustRawConfigForValidation(v interface{}) *runtime.RawExtension {
28+
b, _ := json.Marshal(v)
29+
return &runtime.RawExtension{Raw: b}
30+
}
31+
32+
func TestValidateTransformations_DebeziumUnwrap(t *testing.T) {
33+
baseSpec := DataFlowSpec{
34+
Source: SourceSpec{
35+
Type: "kafka",
36+
Config: mustRawConfigForValidation(KafkaSourceSpec{Brokers: []string{"broker:9092"}, Topic: "src"}),
37+
},
38+
Sink: SinkSpec{
39+
Type: "kafka",
40+
Config: mustRawConfigForValidation(KafkaSinkSpec{Brokers: []string{"broker:9092"}, Topic: "dst"}),
41+
},
42+
}
43+
44+
t.Run("valid config", func(t *testing.T) {
45+
spec := baseSpec
46+
spec.Transformations = []TransformationSpec{{
47+
Type: "debeziumUnwrap",
48+
Config: mustRawConfigForValidation(DebeziumUnwrapTransformation{
49+
InferDeleteFromTombstone: true,
50+
IncludeSourceInMetadata: true,
51+
SnapshotOperation: "update",
52+
}),
53+
}}
54+
errs := ValidateDataFlowSpec(&spec)
55+
if len(errs) != 0 {
56+
t.Fatalf("expected no validation errors, got %v", errs)
57+
}
58+
})
59+
60+
t.Run("missing config", func(t *testing.T) {
61+
spec := baseSpec
62+
spec.Transformations = []TransformationSpec{{Type: "debeziumUnwrap"}}
63+
errs := ValidateDataFlowSpec(&spec)
64+
if len(errs) == 0 {
65+
t.Fatal("expected validation error for missing config")
66+
}
67+
if !strings.Contains(errs.ToAggregate().Error(), "debeziumUnwrap transformation configuration is required") {
68+
t.Fatalf("unexpected error: %v", errs.ToAggregate())
69+
}
70+
})
71+
72+
t.Run("invalid snapshot operation", func(t *testing.T) {
73+
spec := baseSpec
74+
spec.Transformations = []TransformationSpec{{
75+
Type: "debeziumUnwrap",
76+
Config: mustRawConfigForValidation(DebeziumUnwrapTransformation{
77+
SnapshotOperation: "noop",
78+
}),
79+
}}
80+
errs := ValidateDataFlowSpec(&spec)
81+
if len(errs) == 0 {
82+
t.Fatal("expected validation error for snapshotOperation")
83+
}
84+
if !strings.Contains(errs.ToAggregate().Error(), "Unsupported value") {
85+
t.Fatalf("unexpected error: %v", errs.ToAggregate())
86+
}
87+
})
88+
}

api/v1/dataflow_webhook_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/json"
2222
"testing"
2323

24+
"github.com/dataflow-operator/dataflow/pkg/transformtypes"
2425
"k8s.io/apimachinery/pkg/runtime"
2526
)
2627

@@ -98,3 +99,29 @@ func TestDataFlow_ValidateDelete(t *testing.T) {
9899
t.Errorf("ValidateDelete: unexpected warnings: %v", warnings)
99100
}
100101
}
102+
103+
func TestDataFlow_ValidateCreate_DebeziumUnwrapInvalidSnapshotOperation(t *testing.T) {
104+
df := &DataFlow{}
105+
df.Spec = DataFlowSpec{
106+
Source: SourceSpec{
107+
Type: "kafka",
108+
Config: mustConfig(KafkaSourceSpec{Brokers: []string{"b"}, Topic: "t"}),
109+
},
110+
Sink: SinkSpec{
111+
Type: "kafka",
112+
Config: mustConfig(KafkaSinkSpec{Brokers: []string{"b"}, Topic: "out"}),
113+
},
114+
Transformations: []TransformationSpec{
115+
{
116+
Type: transformtypes.DebeziumUnwrap,
117+
Config: mustConfig(DebeziumUnwrapTransformation{
118+
SnapshotOperation: "delete",
119+
}),
120+
},
121+
},
122+
}
123+
_, err := df.ValidateCreate(context.Background(), df)
124+
if err == nil {
125+
t.Fatal("ValidateCreate: expected validation error for invalid snapshotOperation")
126+
}
127+
}

api/v1/zz_generated.deepcopy.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/dataflow.dataflow.io_dataflowcrons.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1181,7 +1181,7 @@ spec:
11811181
x-kubernetes-preserve-unknown-fields: true
11821182
type:
11831183
description: 'Type of transformation: timestamp, flatten, filter,
1184-
mask, router, select, remove, snakeCase, camelCase'
1184+
mask, router, select, remove, snakeCase, camelCase, debeziumUnwrap'
11851185
type: string
11861186
required:
11871187
- type

config/crd/bases/dataflow.dataflow.io_dataflows.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1166,7 +1166,7 @@ spec:
11661166
x-kubernetes-preserve-unknown-fields: true
11671167
type:
11681168
description: 'Type of transformation: timestamp, flatten, filter,
1169-
mask, router, select, remove, snakeCase, camelCase'
1169+
mask, router, select, remove, snakeCase, camelCase, debeziumUnwrap'
11701170
type: string
11711171
required:
11721172
- type
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
apiVersion: dataflow.dataflow.io/v1
2+
kind: DataFlow
3+
metadata:
4+
name: kafka-debezium-to-postgres
5+
spec:
6+
source:
7+
type: kafka
8+
config:
9+
brokers:
10+
- kafka:9092
11+
topic: dbserver1.inventory.customers
12+
consumerGroup: dataflow-debezium-group
13+
transformations:
14+
- type: debeziumUnwrap
15+
config:
16+
inferDeleteFromTombstone: true
17+
includeSourceInMetadata: true
18+
snapshotOperation: insert
19+
sink:
20+
type: postgresql
21+
config:
22+
connectionString: "postgres://dataflow:dataflow@postgres:5432/dataflow?sslmode=disable"
23+
table: customers
24+
upsertMode: true
25+
conflictKey: id
26+
softDeleteColumn: deleted_at

0 commit comments

Comments
 (0)