Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/extension-server/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ rules:
- connectors/status
verbs:
- get
# The edge re-translation controller patches a trigger annotation onto
# Gateways to make Envoy Gateway re-translate when a Connector's liveness
# changes.
- apiGroups:
- gateway.networking.k8s.io
resources:
- gateways
verbs:
- get
- patch
3 changes: 1 addition & 2 deletions internal/cmd/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,7 @@ func NewCommand(build BuildInfo) *cobra.Command {
}

if err := (&controller.ConnectorReconciler{
Config: serverConfig,
DownstreamCluster: downstreamCluster,
Config: serverConfig,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Connector")
os.Exit(1)
Expand Down
150 changes: 0 additions & 150 deletions internal/controller/connector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,20 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
mcbuilder "sigs.k8s.io/multicluster-runtime/pkg/builder"
mchandler "sigs.k8s.io/multicluster-runtime/pkg/handler"
mcmanager "sigs.k8s.io/multicluster-runtime/pkg/manager"
"sigs.k8s.io/multicluster-runtime/pkg/multicluster"
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"

networkingv1alpha "go.datum.net/network-services-operator/api/v1alpha"
networkingv1alpha1 "go.datum.net/network-services-operator/api/v1alpha1"
"go.datum.net/network-services-operator/internal/config"
downstreamclient "go.datum.net/network-services-operator/internal/downstreamclient"
)

// connectorReadyAnnotationKey is patched onto downstream Gateways when a
// Connector's Ready condition flips online↔offline. Envoy Gateway's
// AnnotationChangedPredicate on Gateway fires a full re-translation so the
// extension server can re-apply the correct routing config from its cache.
//
// This is the Mode-B (eppEmissionEnabled:false) replacement for the trigger
// that EnvoyPatchPolicy objects provided in Mode A: the EPP itself was the
// EG-watched resource; in Mode B we touch a Gateway annotation instead.
// Only used when r.DownstreamCluster is set and EPP emission is disabled.
const connectorReadyAnnotationKey = "networking.datumapis.com/connector-ready-generation"

// ConnectorReconciler reconciles a Connector object
type ConnectorReconciler struct {
mgr mcmanager.Manager
Config config.NetworkServicesOperator

// DownstreamCluster is the edge/infra cluster where Gateways are
// materialized and where Envoy Gateway runs. When non-nil and EPP
// emission is disabled (Mode B), the reconciler touches a trigger
// annotation on affected downstream Gateways whenever a Connector's
// Ready condition flips, causing EG to re-translate via its
// AnnotationChangedPredicate on the Gateway resource.
DownstreamCluster cluster.Cluster
}

// +kubebuilder:rbac:groups=networking.datumapis.com,resources=connectors,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -89,9 +67,6 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
defer logger.Info("reconcile complete")

originalStatus := connector.Status.DeepCopy()
// Capture the Ready condition BEFORE any mutations so we can detect a
// status→Ready flip at the end of this reconcile (Mode B only).
prevReadyCondition := apimeta.FindStatusCondition(originalStatus.Conditions, networkingv1alpha1.ConnectorConditionReady)

readyCondition := apimeta.FindStatusCondition(connector.Status.Conditions, networkingv1alpha1.ConnectorConditionReady)
if readyCondition == nil {
Expand Down Expand Up @@ -192,137 +167,12 @@ func (r *ConnectorReconciler) Reconcile(ctx context.Context, req mcreconcile.Req
}
}

// Mode B (extension server): when the Connector's Ready condition flips
// (Lease expiry → offline, Lease renewal → online), touch a trigger
// annotation on each affected downstream Gateway so EG's
// AnnotationChangedPredicate fires a full re-translation. This replaces
// the trigger that EPP objects provided in Mode A.
//
// The extensionManager.resources watch (TPP + Connector) already handles
// spec-change triggers via GenerationChangedPredicate; status-only
// transitions (this path) require the Gateway annotation touch because
// status writes do NOT increment metadata.generation.
if !r.Config.Gateway.IsEPPEmissionEnabled() && r.DownstreamCluster != nil {
newReadyCondition := apimeta.FindStatusCondition(connector.Status.Conditions, networkingv1alpha1.ConnectorConditionReady)
if connectorReadyStatusChanged(prevReadyCondition, newReadyCondition) {
if annotationErr := r.touchDownstreamGatewayAnnotations(ctx, cl.GetClient(), string(req.ClusterName), &connector); annotationErr != nil {
// Log and continue — annotation touch is best-effort; a
// missed touch means Envoy holds last-known-good config
// until the next EG rebuild rather than crashing.
logger.Error(annotationErr, "failed to touch downstream gateway annotations for connector ready state transition")
}
}
}

if leaseStatus.requeueAfter != nil {
return ctrl.Result{RequeueAfter: *leaseStatus.requeueAfter}, nil
}
return ctrl.Result{}, nil
}

// connectorReadyStatusChanged returns true when the Ready condition's Status
// has changed between the previous and current condition. Nil → non-nil (first
// time the condition is set) is also treated as a change.
func connectorReadyStatusChanged(prev, next *metav1.Condition) bool {
if prev == nil && next == nil {
return false
}
if prev == nil || next == nil {
return true
}
return prev.Status != next.Status
}

// connectorReadyAnnotationValue returns a deterministic string encoding of
// the Connector's Ready condition for use as the annotation value.
// The value changes only when the Ready Status changes (True ↔ False), so
// patching it onto a downstream Gateway is idempotent across reconciles that
// leave Ready in the same state.
func connectorReadyAnnotationValue(readyCondition *metav1.Condition) string {
if readyCondition == nil {
return "Unknown/Unknown"
}
return fmt.Sprintf("%s/%s", readyCondition.Status, readyCondition.Reason)
}

// touchDownstreamGatewayAnnotations patches a trigger annotation on every
// downstream Gateway backed by an HTTPProxy that references this Connector.
// The annotation change fires EG's AnnotationChangedPredicate on the Gateway,
// which enqueues a full re-translation so the extension server can re-apply
// the correct online/offline routing config from its informer cache.
func (r *ConnectorReconciler) touchDownstreamGatewayAnnotations(
ctx context.Context,
upstreamClient client.Client,
clusterName string,
connector *networkingv1alpha1.Connector,
) error {
logger := log.FromContext(ctx)

// List all HTTPProxies in the Connector's namespace; filter to those that
// reference this Connector. The HTTPProxy controller creates one Gateway
// per HTTPProxy with the same name and namespace, so Gateway affinity
// follows directly from the HTTPProxy→Connector reference.
var httpProxies networkingv1alpha.HTTPProxyList
if err := upstreamClient.List(ctx, &httpProxies, client.InNamespace(connector.Namespace)); err != nil {
return fmt.Errorf("list HTTPProxies in %s: %w", connector.Namespace, err)
}

// Resolve the downstream namespace once for the whole batch.
strategy := downstreamclient.NewMappedNamespaceResourceStrategy(
clusterName,
upstreamClient,
r.DownstreamCluster.GetClient(),
)
downstreamNS, err := strategy.GetDownstreamNamespaceNameForUpstreamNamespace(ctx, connector.Namespace)
if err != nil {
return fmt.Errorf("get downstream namespace for %s: %w", connector.Namespace, err)
}

readyCondition := apimeta.FindStatusCondition(connector.Status.Conditions, networkingv1alpha1.ConnectorConditionReady)
annotationValue := connectorReadyAnnotationValue(readyCondition)
downstreamCl := r.DownstreamCluster.GetClient()

for i := range httpProxies.Items {
hp := &httpProxies.Items[i]
if !httpProxyReferencesConnector(hp, connector.Name) {
continue
}

// Gateway name == HTTPProxy name (see HTTPProxyReconciler.collectDesiredResources).
gwKey := client.ObjectKey{Namespace: downstreamNS, Name: hp.Name}
var gw gatewayv1.Gateway
if err := downstreamCl.Get(ctx, gwKey, &gw); err != nil {
if apierrors.IsNotFound(err) {
// Gateway not yet created or already deleted — skip.
logger.V(1).Info("downstream gateway not found; skipping annotation touch",
"gateway", gwKey)
continue
}
return fmt.Errorf("get downstream gateway %s: %w", gwKey, err)
}

// Idempotent: skip Patch if the value hasn't changed.
if gw.Annotations[connectorReadyAnnotationKey] == annotationValue {
continue
}

patch := client.MergeFrom(gw.DeepCopy())
if gw.Annotations == nil {
gw.Annotations = make(map[string]string)
}
gw.Annotations[connectorReadyAnnotationKey] = annotationValue
if err := downstreamCl.Patch(ctx, &gw, patch); err != nil {
return fmt.Errorf("patch downstream gateway %s annotation: %w", gwKey, err)
}
logger.Info("touched downstream gateway annotation for connector ready state change",
"gateway", gwKey,
"connector", connector.Name,
"annotationValue", annotationValue)
}

return nil
}

func (r *ConnectorReconciler) connectorLeaseDurationSeconds() int32 {
if r.Config.Connector.LeaseDurationSeconds > 0 {
return r.Config.Connector.LeaseDurationSeconds
Expand Down
Loading
Loading