Skip to content

Commit da2ceaf

Browse files
committed
Kafka add securityProtocol
1 parent 1b5dc92 commit da2ceaf

5 files changed

Lines changed: 352 additions & 8 deletions

File tree

api/v1/dataflow_types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,12 @@ type KafkaSourceSpec struct {
173173
// +optional
174174
SASL *SASLConfig `json:"sasl,omitempty"`
175175

176+
// SecurityProtocol maps to Kafka client property security.protocol.
177+
// Supported: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
178+
// If empty, TLS/SASL enable flags are inferred from tls/sasl sections (legacy behavior).
179+
// +optional
180+
SecurityProtocol string `json:"securityProtocol,omitempty"`
181+
176182
// BrokersSecretRef references a Kubernetes secret for brokers (comma-separated)
177183
// +optional
178184
BrokersSecretRef *SecretRef `json:"brokersSecretRef,omitempty"`
@@ -705,6 +711,12 @@ type KafkaSinkSpec struct {
705711
// +optional
706712
SASL *SASLConfig `json:"sasl,omitempty"`
707713

714+
// SecurityProtocol maps to Kafka client property security.protocol.
715+
// Supported: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
716+
// If empty, TLS/SASL enable flags are inferred from tls/sasl sections (legacy behavior).
717+
// +optional
718+
SecurityProtocol string `json:"securityProtocol,omitempty"`
719+
708720
// BrokersSecretRef references a Kubernetes secret for brokers (comma-separated)
709721
// +optional
710722
BrokersSecretRef *SecretRef `json:"brokersSecretRef,omitempty"`

api/v1/dataflow_validation.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ func validateKafkaSource(k *KafkaSourceSpec, f *field.Path) field.ErrorList {
151151
if k.TopicSecretRef != nil {
152152
all = append(all, validateSecretRef(k.TopicSecretRef, f.Child("topicSecretRef"))...)
153153
}
154+
all = append(all, validateKafkaSecurityProtocol(k.SecurityProtocol, k.TLS, k.SASL, f.Child("securityProtocol"))...)
154155
all = append(all, validateKafkaConsumerTiming(k, f)...)
155156
return all
156157
}
@@ -460,9 +461,78 @@ func validateKafkaSink(k *KafkaSinkSpec, f *field.Path) field.ErrorList {
460461
if k.TopicSecretRef != nil {
461462
all = append(all, validateSecretRef(k.TopicSecretRef, f.Child("topicSecretRef"))...)
462463
}
464+
all = append(all, validateKafkaSecurityProtocol(k.SecurityProtocol, k.TLS, k.SASL, f.Child("securityProtocol"))...)
463465
return all
464466
}
465467

468+
func validateKafkaSecurityProtocol(protocol string, tls *TLSConfig, sasl *SASLConfig, f *field.Path) field.ErrorList {
469+
if protocol == "" {
470+
return nil
471+
}
472+
var all field.ErrorList
473+
normalized, err := normalizeKafkaSecurityProtocol(protocol)
474+
if err != nil {
475+
all = append(all, field.NotSupported(f, protocol, []string{"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}))
476+
return all
477+
}
478+
hasTLS := tls != nil
479+
hasSASL := kafkaSASLConfigured(sasl)
480+
481+
switch normalized {
482+
case "PLAINTEXT":
483+
if hasSASL {
484+
all = append(all, field.Forbidden(f, fmt.Sprintf("%s cannot be used with sasl configuration", protocol)))
485+
}
486+
if hasTLS {
487+
all = append(all, field.Forbidden(f, fmt.Sprintf("%s cannot be used with tls configuration", protocol)))
488+
}
489+
case "SSL":
490+
if hasSASL {
491+
all = append(all, field.Forbidden(f, fmt.Sprintf("%s cannot be used with sasl configuration", protocol)))
492+
}
493+
if !hasTLS {
494+
all = append(all, field.Required(f, "tls configuration is required for securityProtocol SSL"))
495+
}
496+
case "SASL_PLAINTEXT":
497+
if hasTLS {
498+
all = append(all, field.Forbidden(f, fmt.Sprintf("%s cannot be used with tls configuration", protocol)))
499+
}
500+
if !hasSASL {
501+
all = append(all, field.Required(f, "sasl configuration is required for securityProtocol SASL_PLAINTEXT"))
502+
}
503+
case "SASL_SSL":
504+
if !hasTLS {
505+
all = append(all, field.Required(f, "tls configuration is required for securityProtocol SASL_SSL"))
506+
}
507+
if !hasSASL {
508+
all = append(all, field.Required(f, "sasl configuration is required for securityProtocol SASL_SSL"))
509+
}
510+
}
511+
return all
512+
}
513+
514+
func normalizeKafkaSecurityProtocol(protocol string) (string, error) {
515+
if protocol == "" {
516+
return "", nil
517+
}
518+
normalized := strings.ToUpper(strings.ReplaceAll(strings.ReplaceAll(protocol, "-", "_"), " ", "_"))
519+
switch normalized {
520+
case "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL":
521+
return normalized, nil
522+
default:
523+
return "", fmt.Errorf("unsupported security protocol: %s", protocol)
524+
}
525+
}
526+
527+
func kafkaSASLConfigured(sasl *SASLConfig) bool {
528+
if sasl == nil {
529+
return false
530+
}
531+
hasUser := sasl.Username != "" || sasl.UsernameSecretRef != nil
532+
hasPass := sasl.Password != "" || sasl.PasswordSecretRef != nil
533+
return hasUser && hasPass
534+
}
535+
466536
func validatePostgreSQLSink(p *PostgreSQLSinkSpec, f *field.Path) field.ErrorList {
467537
var all field.ErrorList
468538
hasConn := p.ConnectionString != "" || p.ConnectionStringSecretRef != nil
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
Copyright 2024.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package v1
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/apimachinery/pkg/util/validation/field"
23+
)
24+
25+
func TestValidateKafkaSecurityProtocol(t *testing.T) {
26+
path := field.NewPath("spec").Child("source").Child("config").Child("securityProtocol")
27+
sasl := &SASLConfig{
28+
Mechanism: "scram-sha-256",
29+
Username: "user",
30+
Password: "pass",
31+
}
32+
tls := &TLSConfig{InsecureSkipVerify: true}
33+
34+
tests := []struct {
35+
name string
36+
protocol string
37+
tls *TLSConfig
38+
sasl *SASLConfig
39+
wantErr bool
40+
}{
41+
{"empty protocol", "", nil, sasl, false},
42+
{"SASL_PLAINTEXT valid", "SASL_PLAINTEXT", nil, sasl, false},
43+
{"sasl-plaintext normalized", "sasl-plaintext", nil, sasl, false},
44+
{"SASL_SSL valid", "SASL_SSL", tls, sasl, false},
45+
{"SSL valid", "SSL", tls, nil, false},
46+
{"PLAINTEXT valid", "PLAINTEXT", nil, nil, false},
47+
{"unknown protocol", "WSS", nil, nil, true},
48+
{"SASL_PLAINTEXT without sasl", "SASL_PLAINTEXT", nil, nil, true},
49+
{"SASL_PLAINTEXT with tls", "SASL_PLAINTEXT", tls, sasl, true},
50+
{"SASL_SSL without tls", "SASL_SSL", nil, sasl, true},
51+
{"SASL_SSL without sasl", "SASL_SSL", tls, nil, true},
52+
{"SSL without tls", "SSL", nil, nil, true},
53+
{"SSL with sasl", "SSL", tls, sasl, true},
54+
{"PLAINTEXT with sasl", "PLAINTEXT", nil, sasl, true},
55+
{"PLAINTEXT with tls", "PLAINTEXT", tls, nil, true},
56+
}
57+
58+
for _, tt := range tests {
59+
t.Run(tt.name, func(t *testing.T) {
60+
errs := validateKafkaSecurityProtocol(tt.protocol, tt.tls, tt.sasl, path)
61+
if tt.wantErr && len(errs) == 0 {
62+
t.Fatal("expected validation error")
63+
}
64+
if !tt.wantErr && len(errs) != 0 {
65+
t.Fatalf("expected no errors, got %v", errs)
66+
}
67+
})
68+
}
69+
}
70+
71+
func TestValidateKafkaSource_SecurityProtocol(t *testing.T) {
72+
path := field.NewPath("spec").Child("source").Child("config")
73+
spec := &KafkaSourceSpec{
74+
Brokers: []string{"broker:9092"},
75+
Topic: "t",
76+
SecurityProtocol: "SASL_PLAINTEXT",
77+
SASL: &SASLConfig{
78+
Mechanism: "scram-sha-256",
79+
Username: "user",
80+
Password: "pass",
81+
},
82+
}
83+
if errs := validateKafkaSource(spec, path); len(errs) != 0 {
84+
t.Fatalf("expected no errors, got %v", errs)
85+
}
86+
}
87+
88+
func TestValidateKafkaSink_SecurityProtocol(t *testing.T) {
89+
path := field.NewPath("spec").Child("sink").Child("config")
90+
spec := &KafkaSinkSpec{
91+
Brokers: []string{"broker:9092"},
92+
Topic: "t",
93+
SecurityProtocol: "SASL_PLAINTEXT",
94+
SASL: &SASLConfig{
95+
Mechanism: "scram-sha-256",
96+
Username: "user",
97+
Password: "pass",
98+
},
99+
}
100+
if errs := validateKafkaSink(spec, path); len(errs) != 0 {
101+
t.Fatalf("expected no errors, got %v", errs)
102+
}
103+
}

internal/connectors/kafka.go

Lines changed: 90 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,7 @@ func (k *KafkaSourceConnector) Connect(ctx context.Context) error {
127127
saramaConfig.Metadata.Full = true // Required for Yandex Cloud Kafka
128128
saramaConfig.ClientID = "dataflow-operator" // Required for SASL authentication
129129

130-
if err := applyKafkaTLS(k.config.TLS, saramaConfig, k.logger); err != nil {
131-
return err
132-
}
133-
if err := applyKafkaSASL(k.config.SASL, saramaConfig, k.logger); err != nil {
130+
if err := applyKafkaNetworkConfig(k.config.TLS, k.config.SASL, k.config.SecurityProtocol, saramaConfig, k.logger); err != nil {
134131
return err
135132
}
136133
if err := applyKafkaConsumerConfig(k.config, saramaConfig); err != nil {
@@ -955,10 +952,7 @@ func (k *KafkaSinkConnector) Connect(ctx context.Context) error {
955952
saramaConfig.Net.MaxOpenRequests = 1 // Required for idempotent producer ordering
956953
saramaConfig.ClientID = "dataflow-operator" // Required for SASL authentication
957954

958-
if err := applyKafkaTLS(k.config.TLS, saramaConfig, k.logger); err != nil {
959-
return err
960-
}
961-
if err := applyKafkaSASL(k.config.SASL, saramaConfig, k.logger); err != nil {
955+
if err := applyKafkaNetworkConfig(k.config.TLS, k.config.SASL, k.config.SecurityProtocol, saramaConfig, k.logger); err != nil {
962956
return err
963957
}
964958

@@ -1104,6 +1098,94 @@ func getRouteFromMessage(msg *types.Message) string {
11041098
return "default"
11051099
}
11061100

1101+
func applyKafkaNetworkConfig(tls *v1.TLSConfig, sasl *v1.SASLConfig, securityProtocol string, saramaConfig *sarama.Config, logger logr.Logger) error {
1102+
if err := applyKafkaTLS(tls, saramaConfig, logger); err != nil {
1103+
return err
1104+
}
1105+
if err := applyKafkaSASL(sasl, saramaConfig, logger); err != nil {
1106+
return err
1107+
}
1108+
return enforceKafkaSecurityProtocol(tls, sasl, securityProtocol, saramaConfig)
1109+
}
1110+
1111+
func normalizeKafkaSecurityProtocol(protocol string) (string, error) {
1112+
if protocol == "" {
1113+
return "", nil
1114+
}
1115+
normalized := strings.ToUpper(strings.ReplaceAll(strings.ReplaceAll(protocol, "-", "_"), " ", "_"))
1116+
switch normalized {
1117+
case "PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL":
1118+
return normalized, nil
1119+
default:
1120+
return "", fmt.Errorf("unsupported security protocol: %s (supported: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL)", protocol)
1121+
}
1122+
}
1123+
1124+
func kafkaHasTLSConfigured(tls *v1.TLSConfig) bool {
1125+
return tls != nil
1126+
}
1127+
1128+
func kafkaHasSASLConfigured(sasl *v1.SASLConfig) bool {
1129+
if sasl == nil {
1130+
return false
1131+
}
1132+
hasUser := sasl.Username != "" || sasl.UsernameSecretRef != nil
1133+
hasPass := sasl.Password != "" || sasl.PasswordSecretRef != nil
1134+
return hasUser && hasPass
1135+
}
1136+
1137+
func enforceKafkaSecurityProtocol(tls *v1.TLSConfig, sasl *v1.SASLConfig, securityProtocol string, saramaConfig *sarama.Config) error {
1138+
if securityProtocol == "" {
1139+
return nil
1140+
}
1141+
protocol, err := normalizeKafkaSecurityProtocol(securityProtocol)
1142+
if err != nil {
1143+
return err
1144+
}
1145+
hasTLS := kafkaHasTLSConfigured(tls)
1146+
hasSASL := kafkaHasSASLConfigured(sasl)
1147+
1148+
switch protocol {
1149+
case "PLAINTEXT":
1150+
if hasSASL {
1151+
return fmt.Errorf("securityProtocol PLAINTEXT cannot be used with sasl configuration")
1152+
}
1153+
if hasTLS {
1154+
return fmt.Errorf("securityProtocol PLAINTEXT cannot be used with tls configuration")
1155+
}
1156+
saramaConfig.Net.TLS.Enable = false
1157+
saramaConfig.Net.SASL.Enable = false
1158+
case "SSL":
1159+
if hasSASL {
1160+
return fmt.Errorf("securityProtocol SSL cannot be used with sasl configuration")
1161+
}
1162+
if !hasTLS {
1163+
return fmt.Errorf("securityProtocol SSL requires tls configuration")
1164+
}
1165+
saramaConfig.Net.TLS.Enable = true
1166+
saramaConfig.Net.SASL.Enable = false
1167+
case "SASL_PLAINTEXT":
1168+
if hasTLS {
1169+
return fmt.Errorf("securityProtocol SASL_PLAINTEXT cannot be used with tls configuration")
1170+
}
1171+
if !hasSASL {
1172+
return fmt.Errorf("securityProtocol SASL_PLAINTEXT requires sasl configuration")
1173+
}
1174+
saramaConfig.Net.TLS.Enable = false
1175+
saramaConfig.Net.SASL.Enable = true
1176+
case "SASL_SSL":
1177+
if !hasTLS {
1178+
return fmt.Errorf("securityProtocol SASL_SSL requires tls configuration")
1179+
}
1180+
if !hasSASL {
1181+
return fmt.Errorf("securityProtocol SASL_SSL requires sasl configuration")
1182+
}
1183+
saramaConfig.Net.TLS.Enable = true
1184+
saramaConfig.Net.SASL.Enable = true
1185+
}
1186+
return nil
1187+
}
1188+
11071189
// applyKafkaTLS configures TLS on sarama config from TLSConfig.
11081190
func applyKafkaTLS(tlsConfig *v1.TLSConfig, saramaConfig *sarama.Config, logger logr.Logger) error {
11091191
if tlsConfig == nil {

0 commit comments

Comments
 (0)