From 18d35160994a7aa19c3e0edaa1bc3821891d2ede Mon Sep 17 00:00:00 2001 From: Scot Wells Date: Tue, 23 Jun 2026 11:18:26 -0500 Subject: [PATCH] fix(connector): re-translate at the edge on connector liveness change (#209) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a connector comes online (Ready False→True), its liveness reaches the edge via the upstream-status annotation, but Envoy Gateway did not re-translate against it. EG watches Connector with a generation-only predicate, so the annotation change is ignored, and the previous project-side Gateway annotation touch raced the annotation's hub→edge propagation. EG could translate while the edge still saw the connector offline, serving 503 with no recovery. Add an edge-local controller in the extension-server process that watches the replicated Connector and touches the owning Gateway when liveness changes — after the new liveness is already in the shared cache — forcing EG to re-translate against fresh data. Remove the racy project-side touch. Co-Authored-By: Claude Opus 4.8 (1M context) --- config/extension-server/rbac/role.yaml | 10 + internal/cmd/manager/manager.go | 3 +- internal/controller/connector_controller.go | 150 --------- .../controller/connector_controllers_test.go | 287 ------------------ ...way_resource_replicator_controller_test.go | 118 +++++++ internal/extensionserver/cache/index.go | 8 + internal/extensionserver/cmd/run.go | 15 +- .../extensionserver/retrigger/retrigger.go | 162 ++++++++++ .../retrigger/retrigger_test.go | 197 ++++++++++++ 9 files changed, 510 insertions(+), 440 deletions(-) create mode 100644 internal/extensionserver/retrigger/retrigger.go create mode 100644 internal/extensionserver/retrigger/retrigger_test.go diff --git a/config/extension-server/rbac/role.yaml b/config/extension-server/rbac/role.yaml index 830ec814..aa17dabf 100644 --- a/config/extension-server/rbac/role.yaml +++ b/config/extension-server/rbac/role.yaml @@ -34,3 +34,13 @@ rules: - connectors/status verbs: - get + # The edge re-translation controller patches a trigger annotation onto + # Gateways to make Envoy Gateway re-translate when a Connector's liveness + # changes. + - apiGroups: + - gateway.networking.k8s.io + resources: + - gateways + verbs: + - get + - patch diff --git a/internal/cmd/manager/manager.go b/internal/cmd/manager/manager.go index 84422d6b..11dd37b3 100644 --- a/internal/cmd/manager/manager.go +++ b/internal/cmd/manager/manager.go @@ -437,8 +437,7 @@ func NewCommand(build BuildInfo) *cobra.Command { } if err := (&controller.ConnectorReconciler{ - Config: serverConfig, - DownstreamCluster: downstreamCluster, + Config: serverConfig, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Connector") os.Exit(1) diff --git a/internal/controller/connector_controller.go b/internal/controller/connector_controller.go index f353cec1..a3f0b2b4 100644 --- a/internal/controller/connector_controller.go +++ b/internal/controller/connector_controller.go @@ -20,42 +20,20 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder" mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler" mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager" "sigs.k8s.io/multicluster-runtime/pkg/multicluster" mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" - networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" "go.datum.net/network-services-operator/internal/config" - downstreamclient "go.datum.net/network-services-operator/internal/downstreamclient" ) -// connectorReadyAnnotationKey is patched onto downstream Gateways when a -// Connector's Ready condition flips online↔offline. Envoy Gateway's -// AnnotationChangedPredicate on Gateway fires a full re-translation so the -// extension server can re-apply the correct routing config from its cache. -// -// This is the Mode-B (eppEmissionEnabled:false) replacement for the trigger -// that EnvoyPatchPolicy objects provided in Mode A: the EPP itself was the -// EG-watched resource; in Mode B we touch a Gateway annotation instead. -// Only used when r.DownstreamCluster is set and EPP emission is disabled. -const connectorReadyAnnotationKey = "networking.datumapis.com/connector-ready-generation" - // ConnectorReconciler reconciles a Connector object type ConnectorReconciler struct { mgr mcmanager.Manager Config config.NetworkServicesOperator - - // DownstreamCluster is the edge/infra cluster where Gateways are - // materialized and where Envoy Gateway runs. When non-nil and EPP - // emission is disabled (Mode B), the reconciler touches a trigger - // annotation on affected downstream Gateways whenever a Connector's - // Ready condition flips, causing EG to re-translate via its - // AnnotationChangedPredicate on the Gateway resource. - DownstreamCluster cluster.Cluster } // +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors,verbs=get;list;watch;create;update;patch;delete @@ -89,9 +67,6 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req mcreconcile.Req defer logger.Info("reconcile complete") originalStatus := connector.Status.DeepCopy() - // Capture the Ready condition BEFORE any mutations so we can detect a - // status→Ready flip at the end of this reconcile (Mode B only). - prevReadyCondition := apimeta.FindStatusCondition(originalStatus.Conditions, networkingv1alpha1.ConnectorConditionReady) readyCondition := apimeta.FindStatusCondition(connector.Status.Conditions, networkingv1alpha1.ConnectorConditionReady) if readyCondition == nil { @@ -192,137 +167,12 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req mcreconcile.Req } } - // Mode B (extension server): when the Connector's Ready condition flips - // (Lease expiry → offline, Lease renewal → online), touch a trigger - // annotation on each affected downstream Gateway so EG's - // AnnotationChangedPredicate fires a full re-translation. This replaces - // the trigger that EPP objects provided in Mode A. - // - // The extensionManager.resources watch (TPP + Connector) already handles - // spec-change triggers via GenerationChangedPredicate; status-only - // transitions (this path) require the Gateway annotation touch because - // status writes do NOT increment metadata.generation. - if !r.Config.Gateway.IsEPPEmissionEnabled() && r.DownstreamCluster != nil { - newReadyCondition := apimeta.FindStatusCondition(connector.Status.Conditions, networkingv1alpha1.ConnectorConditionReady) - if connectorReadyStatusChanged(prevReadyCondition, newReadyCondition) { - if annotationErr := r.touchDownstreamGatewayAnnotations(ctx, cl.GetClient(), string(req.ClusterName), &connector); annotationErr != nil { - // Log and continue — annotation touch is best-effort; a - // missed touch means Envoy holds last-known-good config - // until the next EG rebuild rather than crashing. - logger.Error(annotationErr, "failed to touch downstream gateway annotations for connector ready state transition") - } - } - } - if leaseStatus.requeueAfter != nil { return ctrl.Result{RequeueAfter: *leaseStatus.requeueAfter}, nil } return ctrl.Result{}, nil } -// connectorReadyStatusChanged returns true when the Ready condition's Status -// has changed between the previous and current condition. Nil → non-nil (first -// time the condition is set) is also treated as a change. -func connectorReadyStatusChanged(prev, next *metav1.Condition) bool { - if prev == nil && next == nil { - return false - } - if prev == nil || next == nil { - return true - } - return prev.Status != next.Status -} - -// connectorReadyAnnotationValue returns a deterministic string encoding of -// the Connector's Ready condition for use as the annotation value. -// The value changes only when the Ready Status changes (True ↔ False), so -// patching it onto a downstream Gateway is idempotent across reconciles that -// leave Ready in the same state. -func connectorReadyAnnotationValue(readyCondition *metav1.Condition) string { - if readyCondition == nil { - return "Unknown/Unknown" - } - return fmt.Sprintf("%s/%s", readyCondition.Status, readyCondition.Reason) -} - -// touchDownstreamGatewayAnnotations patches a trigger annotation on every -// downstream Gateway backed by an HTTPProxy that references this Connector. -// The annotation change fires EG's AnnotationChangedPredicate on the Gateway, -// which enqueues a full re-translation so the extension server can re-apply -// the correct online/offline routing config from its informer cache. -func (r *ConnectorReconciler) touchDownstreamGatewayAnnotations( - ctx context.Context, - upstreamClient client.Client, - clusterName string, - connector *networkingv1alpha1.Connector, -) error { - logger := log.FromContext(ctx) - - // List all HTTPProxies in the Connector's namespace; filter to those that - // reference this Connector. The HTTPProxy controller creates one Gateway - // per HTTPProxy with the same name and namespace, so Gateway affinity - // follows directly from the HTTPProxy→Connector reference. - var httpProxies networkingv1alpha.HTTPProxyList - if err := upstreamClient.List(ctx, &httpProxies, client.InNamespace(connector.Namespace)); err != nil { - return fmt.Errorf("list HTTPProxies in %s: %w", connector.Namespace, err) - } - - // Resolve the downstream namespace once for the whole batch. - strategy := downstreamclient.NewMappedNamespaceResourceStrategy( - clusterName, - upstreamClient, - r.DownstreamCluster.GetClient(), - ) - downstreamNS, err := strategy.GetDownstreamNamespaceNameForUpstreamNamespace(ctx, connector.Namespace) - if err != nil { - return fmt.Errorf("get downstream namespace for %s: %w", connector.Namespace, err) - } - - readyCondition := apimeta.FindStatusCondition(connector.Status.Conditions, networkingv1alpha1.ConnectorConditionReady) - annotationValue := connectorReadyAnnotationValue(readyCondition) - downstreamCl := r.DownstreamCluster.GetClient() - - for i := range httpProxies.Items { - hp := &httpProxies.Items[i] - if !httpProxyReferencesConnector(hp, connector.Name) { - continue - } - - // Gateway name == HTTPProxy name (see HTTPProxyReconciler.collectDesiredResources). - gwKey := client.ObjectKey{Namespace: downstreamNS, Name: hp.Name} - var gw gatewayv1.Gateway - if err := downstreamCl.Get(ctx, gwKey, &gw); err != nil { - if apierrors.IsNotFound(err) { - // Gateway not yet created or already deleted — skip. - logger.V(1).Info("downstream gateway not found; skipping annotation touch", - "gateway", gwKey) - continue - } - return fmt.Errorf("get downstream gateway %s: %w", gwKey, err) - } - - // Idempotent: skip Patch if the value hasn't changed. - if gw.Annotations[connectorReadyAnnotationKey] == annotationValue { - continue - } - - patch := client.MergeFrom(gw.DeepCopy()) - if gw.Annotations == nil { - gw.Annotations = make(map[string]string) - } - gw.Annotations[connectorReadyAnnotationKey] = annotationValue - if err := downstreamCl.Patch(ctx, &gw, patch); err != nil { - return fmt.Errorf("patch downstream gateway %s annotation: %w", gwKey, err) - } - logger.Info("touched downstream gateway annotation for connector ready state change", - "gateway", gwKey, - "connector", connector.Name, - "annotationValue", annotationValue) - } - - return nil -} - func (r *ConnectorReconciler) connectorLeaseDurationSeconds() int32 { if r.Config.Connector.LeaseDurationSeconds > 0 { return r.Config.Connector.LeaseDurationSeconds diff --git a/internal/controller/connector_controllers_test.go b/internal/controller/connector_controllers_test.go index e8f95b1a..7e2f2ebb 100644 --- a/internal/controller/connector_controllers_test.go +++ b/internal/controller/connector_controllers_test.go @@ -2,17 +2,14 @@ package controller import ( "context" - "sync" "testing" "time" "github.com/stretchr/testify/assert" coordinationv1 "k8s.io/api/coordination/v1" - corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -20,12 +17,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" - gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" - networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" - "go.datum.net/network-services-operator/internal/config" ) func TestConnectorReconcile(t *testing.T) { @@ -242,284 +236,3 @@ func TestConnectorAdvertisementReconcile(t *testing.T) { }) } } - -// patchCountingClient wraps a fake client and counts how many times Patch has -// been called. Used to verify idempotency in downstream Gateway annotation tests. -type patchCountingClient struct { - client.Client - mu sync.Mutex - patchCount int -} - -func (c *patchCountingClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { - c.mu.Lock() - c.patchCount++ - c.mu.Unlock() - return c.Client.Patch(ctx, obj, patch, opts...) -} - -func (c *patchCountingClient) PatchCount() int { - c.mu.Lock() - defer c.mu.Unlock() - return c.patchCount -} - -// buildConnectorTestScheme assembles the scheme needed for connector annotation tests. -func buildConnectorTestScheme(t *testing.T) *runtime.Scheme { - t.Helper() - s := runtime.NewScheme() - assert.NoError(t, scheme.AddToScheme(s)) - assert.NoError(t, corev1.AddToScheme(s)) - assert.NoError(t, coordinationv1.AddToScheme(s)) - assert.NoError(t, networkingv1alpha1.AddToScheme(s)) - assert.NoError(t, networkingv1alpha.AddToScheme(s)) - assert.NoError(t, gatewayv1.Install(s)) - return s -} - -// TestConnectorReconcile_ReadyFlip_TouchesDownstreamGatewayAnnotation verifies -// that when a Connector's Ready condition flips (nil → True via a valid Lease), -// the reconciler patches the trigger annotation on the affected downstream -// Gateway, causing EG's AnnotationChangedPredicate to fire. -func TestConnectorReconcile_ReadyFlip_TouchesDownstreamGatewayAnnotation(t *testing.T) { - log.SetLogger(zap.New(zap.UseDevMode(true))) - - const ( - upstreamNS = "user-project" - upstreamUID = types.UID("ns-uid-abc-123") - downstreamNS = "ns-ns-uid-abc-123" // "ns-" + upstreamUID - connName = "my-connector" - proxyName = "my-httpproxy" - ) - - testScheme := buildConnectorTestScheme(t) - - connector := &networkingv1alpha1.Connector{ - ObjectMeta: metav1.ObjectMeta{ - Name: connName, - Namespace: upstreamNS, - }, - Spec: networkingv1alpha1.ConnectorSpec{ - ConnectorClassName: "datum-connect", - }, - } - connectorClass := &networkingv1alpha1.ConnectorClass{ - ObjectMeta: metav1.ObjectMeta{Name: "datum-connect"}, - } - // HTTPProxy that references the Connector, creating Gateway affinity. - httpProxy := &networkingv1alpha.HTTPProxy{ - ObjectMeta: metav1.ObjectMeta{Name: proxyName, Namespace: upstreamNS}, - Spec: networkingv1alpha.HTTPProxySpec{ - Rules: []networkingv1alpha.HTTPProxyRule{ - { - Backends: []networkingv1alpha.HTTPProxyRuleBackend{ - { - Connector: &networkingv1alpha.ConnectorReference{Name: connName}, - Endpoint: "http://example.com", - }, - }, - }, - }, - }, - } - // Upstream namespace with a known UID so we can predict the downstream ns name. - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: upstreamNS, - UID: upstreamUID, - }, - } - // Valid lease causes the reconciler to set Ready=True. - lease := &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{Name: connName, Namespace: upstreamNS}, - Spec: coordinationv1.LeaseSpec{ - LeaseDurationSeconds: ptr.To[int32](30), - RenewTime: &metav1.MicroTime{Time: time.Now()}, - }, - } - - upstreamCl := fake.NewClientBuilder(). - WithScheme(testScheme). - WithObjects(connector, connectorClass, httpProxy, ns, lease). - WithStatusSubresource(connector). - Build() - - // Downstream Gateway — same name as the HTTPProxy, in the mapped namespace. - downstreamGW := &gatewayv1.Gateway{ - ObjectMeta: metav1.ObjectMeta{Name: proxyName, Namespace: downstreamNS}, - Spec: gatewayv1.GatewaySpec{ - GatewayClassName: "datum-downstream-gateway", - }, - } - downstreamFakeCl := fake.NewClientBuilder(). - WithScheme(testScheme). - WithObjects(downstreamGW). - Build() - countingDownstreamCl := &patchCountingClient{Client: downstreamFakeCl} - - reconciler := &ConnectorReconciler{ - mgr: &fakeMockManager{cl: upstreamCl}, - Config: config.NetworkServicesOperator{ - Gateway: config.GatewayConfig{ - EPPEmissionEnabled: ptr.To(false), // Mode B - }, - }, - DownstreamCluster: &fakeCluster{cl: countingDownstreamCl}, - } - - req := mcreconcile.Request{ - Request: reconcile.Request{NamespacedName: client.ObjectKeyFromObject(connector)}, - ClusterName: "single", - } - _, err := reconciler.Reconcile(context.Background(), req) - assert.NoError(t, err) - - // Connector should now be Ready=True. - var updatedConnector networkingv1alpha1.Connector - assert.NoError(t, upstreamCl.Get(context.Background(), client.ObjectKeyFromObject(connector), &updatedConnector)) - readyCond := apimeta.FindStatusCondition(updatedConnector.Status.Conditions, networkingv1alpha1.ConnectorConditionReady) - assert.NotNil(t, readyCond) - assert.Equal(t, metav1.ConditionTrue, readyCond.Status) - - // Downstream gateway must have the trigger annotation set. - var updatedGW gatewayv1.Gateway - assert.NoError(t, downstreamFakeCl.Get(context.Background(), client.ObjectKeyFromObject(downstreamGW), &updatedGW)) - assert.NotEmpty(t, updatedGW.Annotations[connectorReadyAnnotationKey], - "trigger annotation should be set on downstream gateway after Ready flip") - - // Exactly one Patch should have been issued (the annotation write). - assert.Equal(t, 1, countingDownstreamCl.PatchCount(), - "should have issued exactly one Patch to set the trigger annotation") -} - -// TestConnectorReconcile_ReadyNoFlip_NoAnnotationWrite verifies that when a -// Connector's Ready condition does NOT change between reconciles (stays True), -// the reconciler does NOT write the trigger annotation again — idempotent. -func TestConnectorReconcile_ReadyNoFlip_NoAnnotationWrite(t *testing.T) { - log.SetLogger(zap.New(zap.UseDevMode(true))) - - const ( - upstreamNS = "user-project" - upstreamUID = types.UID("ns-uid-abc-123") - downstreamNS = "ns-ns-uid-abc-123" - connName = "my-connector" - proxyName = "my-httpproxy" - ) - - testScheme := buildConnectorTestScheme(t) - - // Connector whose status already has Ready=True (set before first reconcile). - // This simulates the connector being stable — no flip this reconcile. - existingReadyCondition := metav1.Condition{ - Type: networkingv1alpha1.ConnectorConditionReady, - Status: metav1.ConditionTrue, - Reason: networkingv1alpha1.ConnectorReasonReady, - ObservedGeneration: 1, - } - connector := &networkingv1alpha1.Connector{ - ObjectMeta: metav1.ObjectMeta{ - Name: connName, - Namespace: upstreamNS, - Generation: 1, - }, - Spec: networkingv1alpha1.ConnectorSpec{ - ConnectorClassName: "datum-connect", - }, - Status: networkingv1alpha1.ConnectorStatus{ - Conditions: []metav1.Condition{ - { - Type: networkingv1alpha1.ConnectorConditionAccepted, - Status: metav1.ConditionTrue, - Reason: networkingv1alpha1.ConnectorReasonAccepted, - ObservedGeneration: 1, - }, - existingReadyCondition, - }, - LeaseRef: &corev1.LocalObjectReference{Name: connName}, - }, - } - connectorClass := &networkingv1alpha1.ConnectorClass{ - ObjectMeta: metav1.ObjectMeta{Name: "datum-connect"}, - } - httpProxy := &networkingv1alpha.HTTPProxy{ - ObjectMeta: metav1.ObjectMeta{Name: proxyName, Namespace: upstreamNS}, - Spec: networkingv1alpha.HTTPProxySpec{ - Rules: []networkingv1alpha.HTTPProxyRule{ - { - Backends: []networkingv1alpha.HTTPProxyRuleBackend{ - { - Connector: &networkingv1alpha.ConnectorReference{Name: connName}, - Endpoint: "http://example.com", - }, - }, - }, - }, - }, - } - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: upstreamNS, UID: upstreamUID}, - } - // Lease still valid → Ready stays True. - lease := &coordinationv1.Lease{ - ObjectMeta: metav1.ObjectMeta{Name: connName, Namespace: upstreamNS}, - Spec: coordinationv1.LeaseSpec{ - LeaseDurationSeconds: ptr.To[int32](30), - RenewTime: &metav1.MicroTime{Time: time.Now()}, - }, - } - - upstreamCl := fake.NewClientBuilder(). - WithScheme(testScheme). - WithObjects(connector, connectorClass, httpProxy, ns, lease). - WithStatusSubresource(connector). - Build() - - // Pre-set the annotation to the value the reconciler would write so we can - // detect whether it tries to re-write it (idempotency). - expectedAnnotationValue := connectorReadyAnnotationValue(&existingReadyCondition) - downstreamGW := &gatewayv1.Gateway{ - ObjectMeta: metav1.ObjectMeta{ - Name: proxyName, - Namespace: downstreamNS, - Annotations: map[string]string{ - connectorReadyAnnotationKey: expectedAnnotationValue, - }, - }, - Spec: gatewayv1.GatewaySpec{ - GatewayClassName: "datum-downstream-gateway", - }, - } - downstreamFakeCl := fake.NewClientBuilder(). - WithScheme(testScheme). - WithObjects(downstreamGW). - Build() - countingDownstreamCl := &patchCountingClient{Client: downstreamFakeCl} - - reconciler := &ConnectorReconciler{ - mgr: &fakeMockManager{cl: upstreamCl}, - Config: config.NetworkServicesOperator{ - Gateway: config.GatewayConfig{ - EPPEmissionEnabled: ptr.To(false), - }, - }, - DownstreamCluster: &fakeCluster{cl: countingDownstreamCl}, - } - - req := mcreconcile.Request{ - Request: reconcile.Request{NamespacedName: client.ObjectKeyFromObject(connector)}, - ClusterName: "single", - } - _, err := reconciler.Reconcile(context.Background(), req) - assert.NoError(t, err) - - // Downstream gateway annotation should remain unchanged. - var updatedGW gatewayv1.Gateway - assert.NoError(t, downstreamFakeCl.Get(context.Background(), client.ObjectKeyFromObject(downstreamGW), &updatedGW)) - assert.Equal(t, expectedAnnotationValue, updatedGW.Annotations[connectorReadyAnnotationKey], - "annotation should be unchanged when Ready does not flip") - - // No Patch should have been issued because Ready status is unchanged AND - // the annotation value is already correct. - assert.Equal(t, 0, countingDownstreamCl.PatchCount(), - "should NOT issue a Patch when Ready does not change") -} diff --git a/internal/controller/gateway_resource_replicator_controller_test.go b/internal/controller/gateway_resource_replicator_controller_test.go index 6ff1b4a8..b67f5842 100644 --- a/internal/controller/gateway_resource_replicator_controller_test.go +++ b/internal/controller/gateway_resource_replicator_controller_test.go @@ -688,6 +688,124 @@ func TestReplicatorConnectorLivenessAnnotationIdempotent(t *testing.T) { "status annotation must remain stable after an idempotent reconcile") } +// TestReplicatorReMirrorsConnectorAfterReadyFlip verifies that after an upstream +// Connector's Ready condition flips False→True (a status-only change), a +// reconcile re-mirrors the new status into the downstream upstream-status +// annotation — the downstream annotation tracks upstream status changes. +func TestReplicatorReMirrorsConnectorAfterReadyFlip(t *testing.T) { + connectorGVK := schema.GroupVersionKind{ + Group: "networking.datumapis.com", Version: "v1alpha1", Kind: "Connector", + } + + scheme := runtime.NewScheme() + assert.NoError(t, corev1.AddToScheme(scheme)) + + ctx := context.Background() + + upstreamNs := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "test-suite", UID: types.UID("ns-uid")}, + } + + // connectionDetails are populated (agent connected) while Ready is still False. + connectionDetails := map[string]any{ + "type": "PublicKey", + "publicKey": map[string]any{ + "id": "378843c806c8c93c5770abaa19bc47e04e9f56977c6e7cc28044a09ef5a1cd23", + }, + } + notReadyStatus := map[string]any{ + "conditions": []any{ + map[string]any{ + "type": "Ready", + "status": "False", + "reason": "ConnectorNotReady", + "message": "Connector lease has expired. Agent may be offline.", + }, + }, + "connectionDetails": connectionDetails, + } + + upstreamStatusTemplate := &unstructured.Unstructured{} + upstreamStatusTemplate.SetGroupVersionKind(connectorGVK) + + upstreamObj := &unstructured.Unstructured{} + upstreamObj.SetGroupVersionKind(connectorGVK) + upstreamObj.SetNamespace(upstreamNs.Name) + upstreamObj.SetName("connector-209") + upstreamObj.SetUID("connector-209-uid") + upstreamObj.Object["spec"] = map[string]any{"connectorClassName": "iroh"} + upstreamObj.Object["status"] = notReadyStatus + + upstreamClient := fake.NewClientBuilder(). + WithScheme(scheme). + WithStatusSubresource(upstreamStatusTemplate). + WithObjects(upstreamNs, upstreamObj.DeepCopy()). + Build() + + downstreamClient := fake.NewClientBuilder().WithScheme(scheme).Build() + + reconciler := newReplicatorForGVKTest(connectorGVK, upstreamClient, downstreamClient, scheme) + + req := GVKRequest{ + GVK: connectorGVK, + Request: mcreconcile.Request{ + ClusterName: "upstream", + Request: reconcile.Request{NamespacedName: client.ObjectKeyFromObject(upstreamObj)}, + }, + } + + // Initial replication while Ready:False (finalizer pass + sync pass). + _, err := reconciler.Reconcile(ctx, req) + assert.NoError(t, err, "first reconcile") + _, err = reconciler.Reconcile(ctx, req) + assert.NoError(t, err, "second reconcile") + + dsKey := client.ObjectKey{Name: "connector-209", Namespace: "ns-ns-uid"} + + var downstream unstructured.Unstructured + downstream.SetGroupVersionKind(connectorGVK) + assert.NoError(t, downstreamClient.Get(ctx, dsKey, &downstream)) + + expectedNotReady, err := json.Marshal(notReadyStatus) + assert.NoError(t, err) + assert.Equal(t, string(expectedNotReady), + downstream.GetAnnotations()[networkingv1alpha1.UpstreamStatusAnnotation], + "initial mirror must capture the Ready:False state") + + // Flip upstream Ready False→True via a status-only update (no spec change). + var upstreamLive unstructured.Unstructured + upstreamLive.SetGroupVersionKind(connectorGVK) + assert.NoError(t, upstreamClient.Get(ctx, client.ObjectKeyFromObject(upstreamObj), &upstreamLive)) + readyStatus := map[string]any{ + "conditions": []any{ + map[string]any{ + "type": "Ready", + "status": "True", + "reason": "ConnectorReady", + "message": "The connector is ready to tunnel traffic.", + }, + }, + "connectionDetails": connectionDetails, + } + upstreamLive.Object["status"] = readyStatus + assert.NoError(t, upstreamClient.Status().Update(ctx, &upstreamLive), + "flip upstream connector to Ready:True via status subresource") + + // Reconcile again, as the upstream status watch would in production. + _, err = reconciler.Reconcile(ctx, req) + assert.NoError(t, err, "reconcile after Ready flip") + + downstream = unstructured.Unstructured{} + downstream.SetGroupVersionKind(connectorGVK) + assert.NoError(t, downstreamClient.Get(ctx, dsKey, &downstream)) + + expectedReady, err := json.Marshal(readyStatus) + assert.NoError(t, err) + assert.Equal(t, string(expectedReady), + downstream.GetAnnotations()[networkingv1alpha1.UpstreamStatusAnnotation], + "after Ready flips True, the replicator must re-mirror the annotation with Ready:True") +} + func newReplicatorForTest(upstream client.Client, downstream client.Client, scheme *runtime.Scheme) *GatewayResourceReplicatorReconciler { return newReplicatorForGVKTest(testGVK, upstream, downstream, scheme) } diff --git a/internal/extensionserver/cache/index.go b/internal/extensionserver/cache/index.go index 5074b502..1ab53da1 100644 --- a/internal/extensionserver/cache/index.go +++ b/internal/extensionserver/cache/index.go @@ -190,6 +190,14 @@ func connectorLiveness(connector *networkingv1alpha1.Connector) (online bool, no return connectorStatusLiveness(&connector.Status) } +// ConnectorLiveness reports whether a Connector is online and, if so, its tunnel +// node id, using the same annotation-first classification the extension server +// applies when programming routes. It is exported so callers can detect liveness +// changes consistently with what the hook will program. +func ConnectorLiveness(connector *networkingv1alpha1.Connector) (online bool, nodeID string) { + return connectorLiveness(connector) +} + // connectorStatusLiveness derives the (online, nodeID) classification from a // ConnectorStatus, regardless of whether that status came from the mirrored // upstream-status annotation or the object's live status. Online is the Ready diff --git a/internal/extensionserver/cmd/run.go b/internal/extensionserver/cmd/run.go index 4f08bcff..2ddae519 100644 --- a/internal/extensionserver/cmd/run.go +++ b/internal/extensionserver/cmd/run.go @@ -32,6 +32,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" @@ -40,6 +41,7 @@ import ( extcache "go.datum.net/network-services-operator/internal/extensionserver/cache" extmetrics "go.datum.net/network-services-operator/internal/extensionserver/metrics" "go.datum.net/network-services-operator/internal/extensionserver/mutate" + "go.datum.net/network-services-operator/internal/extensionserver/retrigger" extserver "go.datum.net/network-services-operator/internal/extensionserver/server" exttls "go.datum.net/network-services-operator/internal/extensionserver/tls" exttracing "go.datum.net/network-services-operator/internal/extensionserver/tracing" @@ -205,11 +207,14 @@ func run(o options) { } // --- Cache scheme --- - // Minimal scheme covering the four types watched by the informer cache. + // Minimal scheme covering the four types watched by the informer cache, + // plus Gateway: the edge re-translation controller patches a trigger + // annotation onto Gateways (it does not cache them — patches are writes). cacheScheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(cacheScheme)) // corev1.Namespace utilruntime.Must(networkingv1alpha.AddToScheme(cacheScheme)) utilruntime.Must(networkingv1alpha1.AddToScheme(cacheScheme)) + utilruntime.Must(gatewayv1.Install(cacheScheme)) // --- Single-cluster read-only cache manager --- // NewManager uses ctrl.GetConfig() (in-cluster config) internally and @@ -221,6 +226,14 @@ func run(o options) { os.Exit(1) } + // --- Edge re-translation controller --- + // Makes Envoy Gateway re-translate when a Connector's liveness changes, so + // the hook re-runs against fresh liveness. See the retrigger package doc. + if err := (&retrigger.Reconciler{Client: mgr.GetClient()}).SetupWithManager(mgr); err != nil { + log.Error("set up gateway re-translation controller", "err", err) + os.Exit(1) + } + // --- mTLS config --- // LoadServerTLSConfig uses GetCertificate (re-reads on each handshake) so // cert-manager rotations are picked up automatically without a pod restart. diff --git a/internal/extensionserver/retrigger/retrigger.go b/internal/extensionserver/retrigger/retrigger.go new file mode 100644 index 00000000..6a37855d --- /dev/null +++ b/internal/extensionserver/retrigger/retrigger.go @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +// Package retrigger contains the edge controller that makes Envoy Gateway +// re-translate when a Connector's liveness changes. +// +// A Connector's online/offline state reaches the edge in the +// networking.datumapis.com/upstream-status annotation (Karmada propagates +// metadata, not status), and the extension server reads it to program tunnels. +// Envoy Gateway, however, only re-runs translation — and so the extension hook — +// when a resource it watches changes through an annotation-aware predicate; it +// does not re-translate on Connector changes. So an online connector's fresh +// liveness sits in the cache while the data plane keeps serving the stale +// (often offline) program. +// +// This controller, co-located with the extension server and sharing its cache, +// watches Connectors and touches the owning Gateway when liveness changes. Envoy +// Gateway does re-translate on Gateway annotation changes, and because the touch +// happens after the new liveness is already in the shared cache, the hook +// re-runs against fresh data. +package retrigger + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" + networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" + extcache "go.datum.net/network-services-operator/internal/extensionserver/cache" +) + +// ConnectorReadyAnnotationKey is the annotation patched onto a Gateway to make +// Envoy Gateway re-translate. Its value encodes the connector's liveness, so EG +// re-translates only when the liveness actually changes. +const ConnectorReadyAnnotationKey = "networking.datumapis.com/connector-ready-generation" + +// Reconciler touches the owning Gateway annotation whenever an edge Connector's +// liveness changes, forcing EG to re-translate and re-run the extension hook. +type Reconciler struct { + Client client.Client +} + +// Reconcile resolves the connector's current liveness and stamps it onto every +// Gateway backed by an HTTPProxy that references the connector. The Gateway +// patch is a merge patch with no preceding Get: if the value is unchanged the +// API server treats it as a no-op (no resourceVersion bump, no EG event), so it +// is naturally idempotent and never triggers a spurious re-translation. +func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + + var connector networkingv1alpha1.Connector + if err := r.Client.Get(ctx, req.NamespacedName, &connector); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + online, nodeID := extcache.ConnectorLiveness(&connector) + value := livenessValue(online, nodeID) + + // The Connector, HTTPProxy, and Gateway share a namespace, and the Gateway is + // named after the HTTPProxy, so the connector→Gateway mapping is local. + var proxies networkingv1alpha.HTTPProxyList + if err := r.Client.List(ctx, &proxies, client.InNamespace(connector.Namespace)); err != nil { + return ctrl.Result{}, fmt.Errorf("list HTTPProxies in %s: %w", connector.Namespace, err) + } + + var firstErr error + for i := range proxies.Items { + proxy := &proxies.Items[i] + if !httpProxyReferencesConnector(proxy, connector.Name) { + continue + } + + gwKey := client.ObjectKey{Namespace: connector.Namespace, Name: proxy.Name} + if err := r.touchGateway(ctx, gwKey, value); err != nil { + logger.Error(err, "failed to touch gateway for connector liveness change", + "gateway", gwKey, "connector", connector.Name) + if firstErr == nil { + firstErr = err + } + continue + } + logger.Info("touched gateway to trigger EG re-translation", + "gateway", gwKey, "connector", connector.Name, "liveness", value) + } + + return ctrl.Result{}, firstErr +} + +// touchGateway applies a merge patch setting the trigger annotation. A +// non-existent Gateway is ignored: EG translates a Gateway when it is created, +// reading the (already fresh) extension cache, so there is nothing to nudge yet. +func (r *Reconciler) touchGateway(ctx context.Context, key client.ObjectKey, value string) error { + gw := &gatewayv1.Gateway{} + gw.Namespace = key.Namespace + gw.Name = key.Name + + body := fmt.Appendf(nil, `{"metadata":{"annotations":{%q:%q}}}`, ConnectorReadyAnnotationKey, value) + if err := r.Client.Patch(ctx, gw, client.RawPatch(types.MergePatchType, body)); err != nil { + return client.IgnoreNotFound(err) + } + return nil +} + +// livenessValue is the trigger annotation value. It includes the node id so a +// change in connectionDetails (e.g. the tunnel endpoint moves) re-translates +// too, not only Ready flips. +func livenessValue(online bool, nodeID string) string { + return fmt.Sprintf("%t/%s", online, nodeID) +} + +// SetupWithManager registers the controller. It reconciles a Connector only when +// its liveness — the (online, nodeID) the extension server keys on — actually +// changes, so heartbeat status churn that does not affect routing is ignored. +func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&networkingv1alpha1.Connector{}, builder.WithPredicates(livenessChangedPredicate())). + Named("extension-server-gateway-retrigger"). + Complete(r) +} + +// livenessChangedPredicate admits creates (so connectors already online when the +// controller starts get their Gateways stamped) and updates that change the +// (online, nodeID) classification. Deletes are ignored: removing a Connector +// tears down its HTTPProxy/Gateway/HTTPRoute, which EG re-translates on its own. +func livenessChangedPredicate() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(event.CreateEvent) bool { return true }, + DeleteFunc: func(event.DeleteEvent) bool { return false }, + UpdateFunc: func(e event.UpdateEvent) bool { + oldC, ok1 := e.ObjectOld.(*networkingv1alpha1.Connector) + newC, ok2 := e.ObjectNew.(*networkingv1alpha1.Connector) + if !ok1 || !ok2 { + return true + } + oOnline, oNode := extcache.ConnectorLiveness(oldC) + nOnline, nNode := extcache.ConnectorLiveness(newC) + return oOnline != nOnline || oNode != nNode + }, + } +} + +// httpProxyReferencesConnector reports whether any of the HTTPProxy's backends +// reference the named Connector. (Kept local to avoid importing the controller +// package's multicluster dependencies into the edge extension server.) +func httpProxyReferencesConnector(httpProxy *networkingv1alpha.HTTPProxy, connectorName string) bool { + for _, rule := range httpProxy.Spec.Rules { + for _, backend := range rule.Backends { + if backend.Connector != nil && backend.Connector.Name == connectorName { + return true + } + } + } + return false +} diff --git a/internal/extensionserver/retrigger/retrigger_test.go b/internal/extensionserver/retrigger/retrigger_test.go new file mode 100644 index 00000000..f902321b --- /dev/null +++ b/internal/extensionserver/retrigger/retrigger_test.go @@ -0,0 +1,197 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package retrigger + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/event" + gatewayv1 "sigs.k8s.io/gateway-api/apis/v1" + + networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha" + networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1" +) + +const ( + testNS = "ns-uid" + testConnector = "connector-1" + testProxy = "proxy-1" // Gateway name == HTTPProxy name +) + +func testScheme(t *testing.T) *runtime.Scheme { + t.Helper() + s := runtime.NewScheme() + require.NoError(t, corev1.AddToScheme(s)) + require.NoError(t, networkingv1alpha.AddToScheme(s)) + require.NoError(t, networkingv1alpha1.AddToScheme(s)) + require.NoError(t, gatewayv1.Install(s)) + return s +} + +// connectorWithUpstreamStatus builds an edge Connector whose liveness is carried +// in the UpstreamStatusAnnotation (the real two-cluster shape), with the given +// Ready value and, when online, a PublicKey node id. +func connectorWithUpstreamStatus(t *testing.T, ready bool, nodeID string) *networkingv1alpha1.Connector { + t.Helper() + readyStatus := metav1.ConditionFalse + if ready { + readyStatus = metav1.ConditionTrue + } + status := networkingv1alpha1.ConnectorStatus{ + Conditions: []metav1.Condition{{ + Type: networkingv1alpha1.ConnectorConditionReady, + Status: readyStatus, + Reason: "Test", + }}, + } + if ready && nodeID != "" { + status.ConnectionDetails = &networkingv1alpha1.ConnectorConnectionDetails{ + Type: networkingv1alpha1.PublicKeyConnectorConnectionType, + PublicKey: &networkingv1alpha1.ConnectorConnectionDetailsPublicKey{Id: nodeID}, + } + } + raw, err := json.Marshal(status) + require.NoError(t, err) + + return &networkingv1alpha1.Connector{ + ObjectMeta: metav1.ObjectMeta{ + Name: testConnector, + Namespace: testNS, + Annotations: map[string]string{networkingv1alpha1.UpstreamStatusAnnotation: string(raw)}, + }, + } +} + +func proxyRefersConnector() *networkingv1alpha.HTTPProxy { + return &networkingv1alpha.HTTPProxy{ + ObjectMeta: metav1.ObjectMeta{Name: testProxy, Namespace: testNS}, + Spec: networkingv1alpha.HTTPProxySpec{ + Rules: []networkingv1alpha.HTTPProxyRule{{ + Backends: []networkingv1alpha.HTTPProxyRuleBackend{{ + Connector: &networkingv1alpha.ConnectorReference{Name: testConnector}, + }}, + }}, + }, + } +} + +func gateway() *gatewayv1.Gateway { + return &gatewayv1.Gateway{ + ObjectMeta: metav1.ObjectMeta{Name: testProxy, Namespace: testNS}, + } +} + +func reconcileConnector(t *testing.T, cl client.Client) { + t.Helper() + r := &Reconciler{Client: cl} + _, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: client.ObjectKey{Namespace: testNS, Name: testConnector}, + }) + require.NoError(t, err) +} + +func gatewayLiveness(t *testing.T, cl client.Client) string { + t.Helper() + var gw gatewayv1.Gateway + require.NoError(t, cl.Get(context.Background(), client.ObjectKey{Namespace: testNS, Name: testProxy}, &gw)) + return gw.Annotations[ConnectorReadyAnnotationKey] +} + +// TestReconcile_OnlineConnector_StampsGatewayWithNodeID: when an online +// connector is reconciled, the owning Gateway gets a trigger annotation encoding +// the live (online, nodeID) so EG re-translates. +func TestReconcile_OnlineConnector_StampsGatewayWithNodeID(t *testing.T) { + scheme := testScheme(t) + cl := fake.NewClientBuilder().WithScheme(scheme). + WithObjects(connectorWithUpstreamStatus(t, true, "node-abc"), proxyRefersConnector(), gateway()). + Build() + + reconcileConnector(t, cl) + + assert.Equal(t, "true/node-abc", gatewayLiveness(t, cl), + "online connector must stamp the Gateway trigger annotation with online/nodeID") +} + +// TestReconcile_OfflineConnector_StampsOffline verifies the offline value so a +// True→False flip changes the annotation and re-translates to the 503 program. +func TestReconcile_OfflineConnector_StampsOffline(t *testing.T) { + scheme := testScheme(t) + cl := fake.NewClientBuilder().WithScheme(scheme). + WithObjects(connectorWithUpstreamStatus(t, false, ""), proxyRefersConnector(), gateway()). + Build() + + reconcileConnector(t, cl) + + assert.Equal(t, "false/", gatewayLiveness(t, cl), + "offline connector must stamp an offline trigger value") +} + +// TestReconcile_NoGateway_NoError verifies a missing Gateway (HTTPProxy exists +// but its Gateway is not yet created) is ignored — EG translates a Gateway on +// creation, so there is nothing to nudge yet. +func TestReconcile_NoGateway_NoError(t *testing.T) { + scheme := testScheme(t) + cl := fake.NewClientBuilder().WithScheme(scheme). + WithObjects(connectorWithUpstreamStatus(t, true, "node-abc"), proxyRefersConnector()). + Build() + + reconcileConnector(t, cl) // require.NoError inside +} + +// TestReconcile_UnrelatedProxy_NotTouched verifies a Gateway whose HTTPProxy +// does not reference the connector is left untouched. +func TestReconcile_UnrelatedProxy_NotTouched(t *testing.T) { + scheme := testScheme(t) + otherProxy := &networkingv1alpha.HTTPProxy{ + ObjectMeta: metav1.ObjectMeta{Name: "other", Namespace: testNS}, + Spec: networkingv1alpha.HTTPProxySpec{ + Rules: []networkingv1alpha.HTTPProxyRule{{ + Backends: []networkingv1alpha.HTTPProxyRuleBackend{{Endpoint: "http://x:80"}}, + }}, + }, + } + otherGW := &gatewayv1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: "other", Namespace: testNS}} + cl := fake.NewClientBuilder().WithScheme(scheme). + WithObjects(connectorWithUpstreamStatus(t, true, "node-abc"), otherProxy, otherGW). + Build() + + reconcileConnector(t, cl) + + var gw gatewayv1.Gateway + require.NoError(t, cl.Get(context.Background(), client.ObjectKey{Namespace: testNS, Name: "other"}, &gw)) + assert.NotContains(t, gw.Annotations, ConnectorReadyAnnotationKey, + "a Gateway whose HTTPProxy does not reference the connector must not be touched") +} + +// TestLivenessChangedPredicate verifies the controller only reconciles on +// changes that affect the (online, nodeID) the extension server keys on — not +// on unrelated status churn (e.g. heartbeat lastTransitionTime updates). +func TestLivenessChangedPredicate(t *testing.T) { + p := livenessChangedPredicate() + + offline := connectorWithUpstreamStatus(t, false, "") + onlineA := connectorWithUpstreamStatus(t, true, "node-a") + onlineB := connectorWithUpstreamStatus(t, true, "node-b") + + assert.True(t, p.Create(event.CreateEvent{Object: onlineA}), + "create is always admitted so already-online connectors stamp their Gateway on startup") + assert.False(t, p.Delete(event.DeleteEvent{Object: onlineA}), + "delete is ignored; EG re-translates on Gateway/route teardown") + + assert.True(t, p.Update(event.UpdateEvent{ObjectOld: offline, ObjectNew: onlineA}), + "Ready False→True must reconcile") + assert.True(t, p.Update(event.UpdateEvent{ObjectOld: onlineA, ObjectNew: onlineB}), + "nodeID change must reconcile") + assert.False(t, p.Update(event.UpdateEvent{ObjectOld: onlineA, ObjectNew: onlineA.DeepCopy()}), + "no liveness change must NOT reconcile") +}