Skip to content

Commit d809252

Browse files
committed
Fix create config
1 parent 88ea342 commit d809252

2 files changed

Lines changed: 47 additions & 0 deletions

File tree

config/crd/bases/dataflow.dataflow.io_dataflows.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1171,6 +1171,7 @@ spec:
11711171
- sink
11721172
- source
11731173
type: object
1174+
x-kubernetes-preserve-unknown-fields: true
11741175
status:
11751176
description: DataFlowStatus defines the observed state of DataFlow
11761177
properties:

internal/controller/dataflow_controller.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,47 @@ func genReconcileID() string {
227227
return hex.EncodeToString(b)
228228
}
229229

230+
const lastAppliedAnnotation = "kubectl.kubernetes.io/last-applied-configuration"
231+
232+
// restoreSpecFromLastApplied restores source/sink config from last-applied-configuration
233+
// when the API stripped it (e.g. due to CRD schema pruning). Modifies dataflow in place.
234+
func restoreSpecFromLastApplied(dataflow *dataflowv1.DataFlow) error {
235+
raw, ok := dataflow.Annotations[lastAppliedAnnotation]
236+
if !ok || raw == "" {
237+
return nil
238+
}
239+
var applied struct {
240+
Spec struct {
241+
Source struct {
242+
Config *runtime.RawExtension `json:"config,omitempty"`
243+
} `json:"source"`
244+
Sink struct {
245+
Config *runtime.RawExtension `json:"config,omitempty"`
246+
} `json:"sink"`
247+
Errors *struct {
248+
Config *runtime.RawExtension `json:"config,omitempty"`
249+
} `json:"errors,omitempty"`
250+
} `json:"spec"`
251+
}
252+
if err := json.Unmarshal([]byte(raw), &applied); err != nil {
253+
return err
254+
}
255+
if (dataflow.Spec.Source.Config == nil || len(dataflow.Spec.Source.Config.Raw) == 0) &&
256+
applied.Spec.Source.Config != nil && len(applied.Spec.Source.Config.Raw) > 0 {
257+
dataflow.Spec.Source.Config = applied.Spec.Source.Config
258+
}
259+
if (dataflow.Spec.Sink.Config == nil || len(dataflow.Spec.Sink.Config.Raw) == 0) &&
260+
applied.Spec.Sink.Config != nil && len(applied.Spec.Sink.Config.Raw) > 0 {
261+
dataflow.Spec.Sink.Config = applied.Spec.Sink.Config
262+
}
263+
if dataflow.Spec.Errors != nil && applied.Spec.Errors != nil &&
264+
(dataflow.Spec.Errors.Config == nil || len(dataflow.Spec.Errors.Config.Raw) == 0) &&
265+
applied.Spec.Errors.Config != nil && len(applied.Spec.Errors.Config.Raw) > 0 {
266+
dataflow.Spec.Errors.Config = applied.Spec.Errors.Config
267+
}
268+
return nil
269+
}
270+
230271
//+kubebuilder:rbac:groups=dataflow.dataflow.io,resources=dataflows,verbs=get;list;watch;create;update;patch;delete
231272
//+kubebuilder:rbac:groups=dataflow.dataflow.io,resources=dataflows/status,verbs=get;update;patch
232273
//+kubebuilder:rbac:groups=dataflow.dataflow.io,resources=dataflows/finalizers,verbs=update
@@ -268,6 +309,11 @@ func (r *DataFlowReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
268309

269310
log.Info("Reconciling DataFlow")
270311

312+
// Restore config from last-applied-configuration if API stripped it (CRD pruning).
313+
if err := restoreSpecFromLastApplied(&dataflow); err != nil {
314+
log.V(1).Info("Could not restore spec from last-applied-configuration", "error", err)
315+
}
316+
271317
// Check if DataFlow is being deleted: run cleanup only when our finalizer is present
272318
if !dataflow.DeletionTimestamp.IsZero() {
273319
hasOurFinalizer := false

0 commit comments

Comments
 (0)