Skip to content

Commit b0fe0ef

Browse files
committed
Remove leaderElection config
1 parent 1d44585 commit b0fe0ef

5 files changed

Lines changed: 128 additions & 30 deletions

File tree

helm/dataflow-operator/templates/deployment.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ spec:
5555
args:
5656
- --metrics-bind-address=:{{ .Values.metrics.port }}
5757
- --health-probe-bind-address=:{{ .Values.health.probePort }}
58-
{{- if .Values.leaderElection.enabled }}
59-
- --leader-elect
60-
{{- end }}
6158
ports:
6259
- name: metrics
6360
containerPort: {{ .Values.metrics.port }}

helm/dataflow-operator/values.yaml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,6 @@ rbac:
5656
# Specifies whether RBAC resources should be created
5757
create: true
5858

59-
# Leader election configuration
60-
leaderElection:
61-
enabled: false
62-
# LeaderElectionID is the name of the resource that leader election
63-
# will use for holding the leader lock.
64-
leaderElectionID: "dataflow-operator.dataflow.io"
65-
6659
# Metrics configuration
6760
metrics:
6861
enabled: true

internal/transformers/router.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,31 +18,39 @@ package transformers
1818

1919
import (
2020
"context"
21-
"fmt"
2221
"strings"
2322

2423
v1 "github.com/dataflow-operator/dataflow/api/v1"
2524
"github.com/dataflow-operator/dataflow/internal/types"
25+
"github.com/go-logr/logr"
2626
"github.com/tidwall/gjson"
2727
)
2828

2929
// RouterTransformer routes messages to different sinks based on conditions
3030
type RouterTransformer struct {
3131
config *v1.RouterTransformation
32+
logger logr.Logger
3233
}
3334

3435
// NewRouterTransformer creates a new router transformer
3536
func NewRouterTransformer(config *v1.RouterTransformation) *RouterTransformer {
3637
return &RouterTransformer{
3738
config: config,
39+
logger: logr.Discard(),
3840
}
3941
}
4042

43+
// SetLogger sets the logger for the transformer (used by processor to inject logr)
44+
func (r *RouterTransformer) SetLogger(logger logr.Logger) {
45+
r.logger = logger
46+
}
47+
4148
// Transform routes messages based on conditions
4249
// Returns messages with routing metadata
4350
func (r *RouterTransformer) Transform(ctx context.Context, message *types.Message) ([]*types.Message, error) {
44-
fmt.Printf("DEBUG Router: Processing message, routes count: %d\n", len(r.config.Routes))
45-
fmt.Printf("DEBUG Router: Message data: %s\n", string(message.Data))
51+
r.logger.V(1).Info("Router processing message",
52+
"routesCount", len(r.config.Routes),
53+
"dataSize", len(message.Data))
4654

4755
for i, route := range r.config.Routes {
4856
// Check if condition contains comparison operator (==)
@@ -51,19 +59,24 @@ func (r *RouterTransformer) Transform(ctx context.Context, message *types.Messag
5159
var expectedValue string
5260
var isComparison bool
5361

54-
fmt.Printf("DEBUG Router: Checking route %d, condition: '%s'\n", i, condition)
62+
r.logger.V(1).Info("Router checking route",
63+
"routeIndex", i,
64+
"condition", condition)
5565

5666
// Parse condition like "$.type == 'order'" or "$.type"
5767
if idx := findComparisonOperator(condition); idx >= 0 {
5868
// Trim spaces from field path
5969
fieldPath = strings.TrimSpace(condition[:idx])
6070
expectedValue = extractStringValue(condition[idx:])
6171
isComparison = true
62-
fmt.Printf("DEBUG Router: Parsed comparison - fieldPath: '%s', expectedValue: '%s'\n", fieldPath, expectedValue)
72+
r.logger.V(1).Info("Router parsed comparison",
73+
"fieldPath", fieldPath,
74+
"expectedValue", expectedValue)
6375
} else {
6476
fieldPath = condition
6577
isComparison = false
66-
fmt.Printf("DEBUG Router: No comparison operator found, using fieldPath: '%s'\n", fieldPath)
78+
r.logger.V(1).Info("Router using field path (no comparison)",
79+
"fieldPath", fieldPath)
6780
}
6881

6982
// Remove $. prefix if present (gjson doesn't need it for root fields)
@@ -77,19 +90,25 @@ func (r *RouterTransformer) Transform(ctx context.Context, message *types.Messag
7790
result := gjson.GetBytes(message.Data, fieldPath)
7891

7992
if !result.Exists() {
80-
fmt.Printf("DEBUG Router: Field '%s' does not exist in message\n", fieldPath)
93+
r.logger.V(1).Info("Router field does not exist",
94+
"fieldPath", fieldPath)
8195
continue
8296
}
8397

84-
fmt.Printf("DEBUG Router: Field '%s' exists, value: '%s' (raw: %v)\n", fieldPath, result.String(), result.Value())
98+
r.logger.V(1).Info("Router field exists",
99+
"fieldPath", fieldPath,
100+
"value", result.String())
85101

86102
// Check if condition is true
87103
var isTrue bool
88104
if isComparison {
89105
// For comparison, check if value matches expected
90106
value := result.String()
91107
isTrue = value == expectedValue
92-
fmt.Printf("DEBUG Router: Comparison result: value='%s' == expected='%s' = %v\n", value, expectedValue, isTrue)
108+
r.logger.V(1).Info("Router comparison result",
109+
"value", value,
110+
"expectedValue", expectedValue,
111+
"match", isTrue)
93112
} else {
94113
// For simple existence check, use truthiness
95114
value := result.Value()
@@ -116,14 +135,16 @@ func (r *RouterTransformer) Transform(ctx context.Context, message *types.Messag
116135
}
117136
newMsg.Metadata["routed_condition"] = route.Condition
118137
newMsg.Timestamp = message.Timestamp
119-
// Debug: log routing decision
120-
fmt.Printf("DEBUG Router: Message routed to condition '%s', value='%s', expected='%s'\n",
121-
route.Condition, result.String(), expectedValue)
138+
r.logger.V(1).Info("Router message routed",
139+
"condition", route.Condition,
140+
"value", result.String(),
141+
"expectedValue", expectedValue)
122142
return []*types.Message{newMsg}, nil
123143
} else if isComparison {
124-
// Debug: log why condition didn't match
125-
fmt.Printf("DEBUG Router: Condition '%s' didn't match: value='%s', expected='%s'\n",
126-
route.Condition, result.String(), expectedValue)
144+
r.logger.V(1).Info("Router condition did not match",
145+
"condition", route.Condition,
146+
"value", result.String(),
147+
"expectedValue", expectedValue)
127148
}
128149
}
129150

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
Copyright 2024.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package transformers
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"testing"
23+
24+
v1 "github.com/dataflow-operator/dataflow/api/v1"
25+
"github.com/dataflow-operator/dataflow/internal/types"
26+
"github.com/go-logr/logr"
27+
"github.com/stretchr/testify/assert"
28+
"github.com/stretchr/testify/require"
29+
)
30+
31+
// Ensure RouterTransformer implements SetLogger (used by processor).
32+
var _ interface{ SetLogger(logr.Logger) } = (*RouterTransformer)(nil)
33+
34+
func TestRouterTransformer_SetLogger_DoesNotPanic(t *testing.T) {
35+
transformer := NewRouterTransformer(&v1.RouterTransformation{
36+
Routes: []v1.RouteRule{
37+
{Condition: "$.type == 'order'", Sink: v1.SinkSpec{Type: "kafka"}},
38+
},
39+
})
40+
transformer.SetLogger(logr.Discard())
41+
42+
msg := types.NewMessage([]byte(`{"type":"order"}`))
43+
result, err := transformer.Transform(context.Background(), msg)
44+
require.NoError(t, err)
45+
require.Len(t, result, 1)
46+
assert.Contains(t, result[0].Metadata, "routed_condition")
47+
assert.Equal(t, "$.type == 'order'", result[0].Metadata["routed_condition"])
48+
}
49+
50+
func TestRouterTransformer_Transform_WithLogger_SameBehavior(t *testing.T) {
51+
transformer := NewRouterTransformer(&v1.RouterTransformation{
52+
Routes: []v1.RouteRule{
53+
{Condition: "type == 'user'", Sink: v1.SinkSpec{Type: "kafka"}},
54+
{Condition: "type == 'order'", Sink: v1.SinkSpec{Type: "kafka"}},
55+
},
56+
})
57+
transformer.SetLogger(logr.Discard())
58+
ctx := context.Background()
59+
60+
t.Run("match first route", func(t *testing.T) {
61+
data, _ := json.Marshal(map[string]interface{}{"id": 1, "type": "user", "name": "alice"})
62+
msg := types.NewMessage(data)
63+
result, err := transformer.Transform(ctx, msg)
64+
require.NoError(t, err)
65+
require.Len(t, result, 1)
66+
assert.Equal(t, "type == 'user'", result[0].Metadata["routed_condition"])
67+
assert.Equal(t, data, result[0].Data)
68+
})
69+
70+
t.Run("match second route", func(t *testing.T) {
71+
data, _ := json.Marshal(map[string]interface{}{"id": 2, "type": "order", "amount": 100})
72+
msg := types.NewMessage(data)
73+
result, err := transformer.Transform(ctx, msg)
74+
require.NoError(t, err)
75+
require.Len(t, result, 1)
76+
assert.Equal(t, "type == 'order'", result[0].Metadata["routed_condition"])
77+
assert.Equal(t, data, result[0].Data)
78+
})
79+
80+
t.Run("no match returns original message", func(t *testing.T) {
81+
data, _ := json.Marshal(map[string]interface{}{"id": 3, "type": "unknown"})
82+
msg := types.NewMessage(data)
83+
msg.Metadata["custom"] = "value"
84+
result, err := transformer.Transform(ctx, msg)
85+
require.NoError(t, err)
86+
require.Len(t, result, 1)
87+
assert.NotContains(t, result[0].Metadata, "routed_condition")
88+
assert.Equal(t, data, result[0].Data)
89+
assert.Equal(t, "value", result[0].Metadata["custom"])
90+
})
91+
}

main.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,10 @@ func levelFromEnvOrOptions(envLevel string, optsLevel zapcore.LevelEnabler) zapc
7171

7272
func main() {
7373
var metricsAddr string
74-
var enableLeaderElection bool
7574
var probeAddr string
7675
var logFile string
7776
flag.StringVar(&metricsAddr, "metrics-bind-address", ":9090", "The address the metric endpoint binds to.")
7877
flag.StringVar(&probeAddr, "health-probe-bind-address", ":9091", "The address the probe endpoint binds to.")
79-
flag.BoolVar(&enableLeaderElection, "leader-elect", false,
80-
"Enable leader election for controller manager. "+
81-
"Enabling this will ensure there is only one active controller manager.")
8278
flag.StringVar(&logFile, "log-file", "", "Path to log file. If empty, logs will be written to stdout.")
8379
opts := zaprctrl.Options{
8480
Development: true,
@@ -143,7 +139,7 @@ func main() {
143139
Port: 9443,
144140
}),
145141
HealthProbeBindAddress: probeAddr,
146-
LeaderElection: enableLeaderElection,
142+
LeaderElection: true, // Always HA-ready: only one active controller across replicas
147143
LeaderElectionID: "dataflow-operator.dataflow.io",
148144
})
149145
if err != nil {

0 commit comments

Comments
 (0)