From c2fe4e8d8f7df1a8bae6c38fd6c605afbd96333e Mon Sep 17 00:00:00 2001 From: Jose Szychowski Date: Thu, 18 Jun 2026 11:50:43 -0300 Subject: [PATCH 1/3] feat: integrate Vector-based billing usage collector via OTLP access log export --- .../downstream-gateway.yaml | 30 + .../kustomization.yaml | 258 ++++++ .../billing-usage-collector/namespace.yaml | 7 + internal/extensionserver/cache/index.go | 16 +- internal/extensionserver/cache/index_test.go | 7 +- internal/extensionserver/cache/types.go | 10 + internal/extensionserver/mutate/tpp.go | 47 +- internal/extensionserver/mutate/tpp_test.go | 14 +- test/e2e/billing/chainsaw-test.yaml | 766 ++++++++++++++++++ 9 files changed, 1136 insertions(+), 19 deletions(-) create mode 100644 config/tools/billing-usage-collector/kustomization.yaml create mode 100644 config/tools/billing-usage-collector/namespace.yaml create mode 100644 test/e2e/billing/chainsaw-test.yaml diff --git a/config/dev/downstream_resources/downstream-gateway.yaml b/config/dev/downstream_resources/downstream-gateway.yaml index 29b2ecb8..5654da34 100644 --- a/config/dev/downstream_resources/downstream-gateway.yaml +++ b/config/dev/downstream_resources/downstream-gateway.yaml @@ -96,6 +96,36 @@ spec: disable: false enableVirtualHostStats: true enablePerEndpointStats: true + accessLog: + settings: + - format: + type: JSON + json: + start_time: "%START_TIME%" + method: "%REQ(:METHOD)%" + path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%" + protocol: "%PROTOCOL%" + response_code: "%RESPONSE_CODE%" + bytes_received: "%BYTES_RECEIVED%" + bytes_sent: "%BYTES_SENT%" + duration: "%DURATION%" + route_name: "%ROUTE_NAME%" + project_name: "%METADATA(ROUTE:datum-gateway:project_name)%" + # Push access logs straight to the node-local Vector agent over OTLP. + # The JSON fields above are delivered as OTLP log-record attributes, + # which Vector's `envoy_access_logs` opentelemetry source parses into + # billing CloudEvents. 
internalTrafficPolicy: Local on the Vector + # Service keeps this hop on-node. + sinks: + - type: OpenTelemetry + openTelemetry: + host: billing-usage-collector-vector.billing-system.svc.cluster.local + port: 4317 + # OTLP has no custom URL path; identify this stream via an OTel + # resource attribute instead. Surfaces in Vector under + # `.resources` and can be used to route/filter signals. + resources: + service.name: nso-httproute-signals --- apiVersion: gateway.networking.k8s.io/v1 kind: GatewayClass diff --git a/config/tools/billing-usage-collector/kustomization.yaml b/config/tools/billing-usage-collector/kustomization.yaml new file mode 100644 index 00000000..21146531 --- /dev/null +++ b/config/tools/billing-usage-collector/kustomization.yaml @@ -0,0 +1,258 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +namespace: billing-system + +resources: + - namespace.yaml + +helmCharts: + - name: vector + namespace: billing-system + releaseName: billing-usage-collector + version: 0.34.0 + repo: https://helm.vector.dev + valuesInline: + role: Agent + image: + tag: "0.55.0-alpine" + tolerations: + - operator: Exists + serviceAccount: + create: true + name: billing-usage-collector + env: + - name: BILLING_GATEWAY_URL + value: "http://mock-billing-gateway.billing-system.svc.cluster.local:8080" + customConfig: + data_dir: "/vector-data-dir" + sources: + billing_sdk: + type: http_server + address: "0.0.0.0:9880" + path: "/cloudevents" + decoding: + codec: json + nso_logs: + type: http_server + address: "0.0.0.0:9881" + path: "/logs" + decoding: + codec: json + # Envoy Gateway pushes access logs here via its OpenTelemetry access + # log sink (OTLP). The JSON access log fields arrive as OTLP log-record + # attributes (i.e. under `.attributes.*`). This is the real ingestion + # path for live traffic; the `nso_logs` http_server above is only used + # by the e2e test to inject hand-crafted log lines. 
+ envoy_access_logs: + type: opentelemetry + grpc: + address: "0.0.0.0:4317" + http: + address: "0.0.0.0:4318" + transforms: + parse_nso_logs: + type: remap + inputs: + - nso_logs + - envoy_access_logs.logs + source: | + # The OpenTelemetry source (Envoy push) nests the access log fields + # under `.attributes`; the http_server source (e2e test injection) + # delivers them at the top level. Normalize both to top-level fields + # so the rest of this transform is source-agnostic. + if exists(.attributes) { + . = object(.attributes) ?? . + } + + parsed_route, err = parse_regex(.route_name, r'^httproute/(?P<namespace>[^/]+)/(?P<name>[^/]+)') + if err != null || is_null(parsed_route) { + abort + } + + httproute_namespace = parsed_route.namespace + httproute_name = parsed_route.name + + project_id = replace!(to_string(httproute_namespace), r'^ns-', "") + subject = "projects/" + project_id + + # Human-readable project name injected by the gateway access log + # (route metadata datum-gateway:project_name). Absent/empty renders as "" or "-". + project_name = "" + if !is_null(.project_name) { + pn = to_string(.project_name) ?? "" + if pn != "-" { + project_name = pn + } + } + + bytes_sent = to_int(.bytes_sent) ?? 0 + bytes_received = to_int(.bytes_received) ?? 0 + duration_ms = to_int(.duration) ?? 0 + + uuid_clean = replace(uuid_v4(), "-", "") + ulid_base = "0" + upcase(slice(uuid_clean, 1, 26) ?? 
"") + + events = [] + region = "us-east-1" + if !is_null(.region) { + region = .region + } + gateway_name = "test-gateway" + if !is_null(.gateway_name) { + gateway_name = .gateway_name + } + gateway_namespace = "default" + if !is_null(.gateway_namespace) { + gateway_namespace = .gateway_namespace + } + gateway_class = "datum-downstream-gateway" + if !is_null(.gateway_class) { + gateway_class = .gateway_class + } + start_time = .start_time + if is_null(start_time) { + start_time = format_timestamp!(now(), "%Y-%m-%dT%H:%M:%SZ") + } + + req_event = { + "specversion": "1.0", + "id": ulid_base, + "type": "networking.datumapis.com/gateway/requests", + "source": "//networking.datumapis.com/gateway", + "subject": subject, + "time": start_time, + "datacontenttype": "application/json", + "data": { + "value": "1", + "dimensions": { + "region": region, + "gateway": gateway_name, + "gateway_namespace": gateway_namespace, + "gateway_class": gateway_class, + "httproute_name": httproute_name, + "httproute_namespace": httproute_namespace, + "project_name": project_name + } + } + } + events = push(events, req_event) + + if bytes_sent > 0 { + uuid_clean_egress = replace(uuid_v4(), "-", "") + ulid_egress = "0" + upcase(slice(uuid_clean_egress, 1, 26) ?? "") + egress_event = req_event + egress_event.id = ulid_egress + egress_event.type = "networking.datumapis.com/gateway/egress-bytes" + egress_event.data.value = to_string(bytes_sent) + events = push(events, egress_event) + } + + if bytes_received > 0 { + uuid_clean_ingress = replace(uuid_v4(), "-", "") + ulid_ingress = "0" + upcase(slice(uuid_clean_ingress, 1, 26) ?? 
"") + ingress_event = req_event + ingress_event.id = ulid_ingress + ingress_event.type = "networking.datumapis.com/gateway/ingress-bytes" + ingress_event.data.value = to_string(bytes_received) + events = push(events, ingress_event) + } + + if duration_ms > 0 { + duration_sec = to_float(duration_ms) / 1000.0 + if duration_sec > 0 { + uuid_clean_conn = replace(uuid_v4(), "-", "") + ulid_conn = "0" + upcase(slice(uuid_clean_conn, 1, 26) ?? "") + conn_event = req_event + conn_event.id = ulid_conn + conn_event.type = "networking.datumapis.com/gateway/connection-seconds" + conn_event.data.value = to_string(duration_sec) + events = push(events, conn_event) + } + } + + . = events + clean_cloudevents: + type: remap + inputs: + - billing_sdk + - parse_nso_logs + source: | + del(.timestamp) + del(.host) + del(.source_type) + sinks: + billing_gateway: + type: http + inputs: + - clean_cloudevents + uri: "${BILLING_GATEWAY_URL}/v1/usage/events:batchIngest" + method: post + encoding: + codec: json + batch: + max_events: 100 + timeout_secs: 1 + tls: + verify_certificate: false + buffer: + type: memory + when_full: block + request: + retry_policy: + max_retries: 30 + initial_backoff_secs: 1 + max_backoff_secs: 60 + jitter_mode: full + initContainers: + - name: init-data + image: busybox:1.36 + securityContext: + runAsUser: 0 + runAsNonRoot: false + privileged: true + command: + - sh + - -c + - chown -R 1000:1000 /var/lib/vector + volumeMounts: + - name: data + mountPath: /var/lib/vector + podSecurityContext: + runAsUser: 1000 + runAsGroup: 1000 + runAsNonRoot: true + securityContext: + allowPrivilegeEscalation: false + resources: + requests: + cpu: 50m + memory: 64Mi + limits: + cpu: 500m + memory: 256Mi + service: + enabled: true + type: ClusterIP + internalTrafficPolicy: Local + ports: + - name: cloudevents + port: 9880 + protocol: TCP + targetPort: 9880 + - name: nso-logs + port: 9881 + protocol: TCP + targetPort: 9881 + # OTLP endpoints for Envoy Gateway's OpenTelemetry access 
log sink. + # internalTrafficPolicy: Local keeps each Envoy pod talking to the + # Vector agent on its own node. + - name: otlp-grpc + port: 4317 + protocol: TCP + targetPort: 4317 + - name: otlp-http + port: 4318 + protocol: TCP + targetPort: 4318 + podMonitor: + enabled: true diff --git a/config/tools/billing-usage-collector/namespace.yaml b/config/tools/billing-usage-collector/namespace.yaml new file mode 100644 index 00000000..d53bf36a --- /dev/null +++ b/config/tools/billing-usage-collector/namespace.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: billing-system + labels: + app.kubernetes.io/name: billing + app.kubernetes.io/managed-by: kustomize diff --git a/internal/extensionserver/cache/index.go b/internal/extensionserver/cache/index.go index 7cf4498e..5542530d 100644 --- a/internal/extensionserver/cache/index.go +++ b/internal/extensionserver/cache/index.go @@ -6,6 +6,7 @@ import ( "net/url" "sort" "strconv" + "strings" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" @@ -29,9 +30,10 @@ import ( // they are prepended to every policy's per-rule directive list. func BuildPolicyIndexFromClient(ctx context.Context, cl client.Client, baseDirectives []string) (*PolicyIndex, error) { idx := &PolicyIndex{ - DStoUS: make(map[string]string), - TPPs: make(map[string][]TPPInfo), - Connectors: make(map[ConnectorKey]ConnectorInfo), + DStoUS: make(map[string]string), + ProjectNames: make(map[string]string), + TPPs: make(map[string][]TPPInfo), + Connectors: make(map[ConnectorKey]ConnectorInfo), } if err := populateFromClient(ctx, cl, idx, baseDirectives); err != nil { return nil, err @@ -66,6 +68,14 @@ func populateFromClient(ctx context.Context, cl client.Client, idx *PolicyIndex, // in filter_metadata, so dsNS == ns.Name. Identity entry. idx.DStoUS[ns.Name] = ns.Name } + + // Derive project name from the upstream cluster name label. The label + // value is "cluster-<project-name>" (written by mappednamespace.go). 
In + // single-cluster deployments where no cluster name is configured the + // label is absent and ProjectNames[ns.Name] is left empty. + if clusterLabel := ns.Labels[downstreamclient.UpstreamOwnerClusterNameLabel]; clusterLabel != "" { + idx.ProjectNames[ns.Name] = strings.TrimPrefix(clusterLabel, "cluster-") + } } // --- TrafficProtectionPolicies --- diff --git a/internal/extensionserver/cache/index_test.go b/internal/extensionserver/cache/index_test.go index 5f870fe0..42d7dab9 100644 --- a/internal/extensionserver/cache/index_test.go +++ b/internal/extensionserver/cache/index_test.go @@ -985,9 +985,10 @@ func TestPopulateFromClient_NamespaceNameCollision_LatentRisk(t *testing.T) { // Simulate what BuildPolicyIndex does: call populateFromClient once per cluster. idx := &PolicyIndex{ - DStoUS: make(map[string]string), - TPPs: make(map[string][]TPPInfo), - Connectors: make(map[ConnectorKey]ConnectorInfo), + DStoUS: make(map[string]string), + ProjectNames: make(map[string]string), + TPPs: make(map[string][]TPPInfo), + Connectors: make(map[ConnectorKey]ConnectorInfo), } require.NoError(t, populateFromClient(context.Background(), clA, idx, nil)) require.NoError(t, populateFromClient(context.Background(), clB, idx, nil)) diff --git a/internal/extensionserver/cache/types.go b/internal/extensionserver/cache/types.go index eb0262e2..b76ccf9b 100644 --- a/internal/extensionserver/cache/types.go +++ b/internal/extensionserver/cache/types.go @@ -36,6 +36,16 @@ type PolicyIndex struct { // Values are upstream namespace names; they key into TPPs and Connectors. DStoUS map[string]string + // ProjectNames maps downstream namespace names to the human-readable + // project name for that namespace. Derived from + // meta.datumapis.com/upstream-cluster-name on replica namespaces (value + // format: "cluster-<project-name>"). Empty string when the label is absent + // (e.g. single-cluster dev with no cluster name configured). 
+ // + // TODO: replace with resourcemanager.miloapis.com/project-name once that + // label is available on project namespaces. + ProjectNames map[string]string + // TPPs maps upstream namespace names to the list of // TrafficProtectionPolicies in that namespace, sorted by creation // timestamp then name to match NSO reconciler precedence order. diff --git a/internal/extensionserver/mutate/tpp.go b/internal/extensionserver/mutate/tpp.go index b876e4ed..e6486b7d 100644 --- a/internal/extensionserver/mutate/tpp.go +++ b/internal/extensionserver/mutate/tpp.go @@ -155,10 +155,11 @@ func InjectCorazaListenerFilters(l *listenerv3.Listener, cfg *CorazaConfig) (int // TrafficProtectionPolicy. For each VirtualHost it: // 1. Extracts the EG filter_metadata["envoy-gateway"] gateway resource ref. // 2. Resolves the upstream namespace via idx.DStoUS. -// 3. Finds the governing TPP from idx.TPPs (route-level wins over gateway-level). -// 4. Writes typed_per_filter_config and datum-gateway metadata on governed routes. +// 3. Stamps project_name into datum-gateway route metadata on every NSO-owned route. +// 4. Finds the governing TPP from idx.TPPs (route-level wins over gateway-level). +// 5. Writes typed_per_filter_config and datum-gateway metadata on governed routes. // -// Returns the number of routes mutated. +// Returns the number of routes mutated (WAF-configured routes only). func ApplyTPPRouteConfig( rc *routev3.RouteConfiguration, idx *extcache.PolicyIndex, @@ -184,12 +185,19 @@ func ApplyTPPRouteConfig( continue } + projectName := idx.ProjectNames[dsNS] tpps := idx.TPPs[upstreamNS] // Gateway-level governing TPP (no SectionName scoping in P1; see design §2.2 C5). gwTPP := findGatewayTPP(tpps, gwName) for _, rt := range vh.GetRoutes() { + // Stamp project_name on every NSO-owned route so the Envoy access log + // can emit it via %METADATA(ROUTE:datum-gateway:project_name)%. 
+ // applyRouteWAFConfig overwrites this entry for TPP-governed routes, + // so it also includes project_name in the metadata it builds. + injectProjectNameMetadata(rt, projectName) + // Check for a route-level TPP (HTTPRoute targeting) — takes precedence. _, _, routeName, _ := extractEGResource(rt.GetMetadata()) routeTPP := findRouteTPP(tpps, routeName) @@ -202,7 +210,7 @@ func ApplyTPPRouteConfig( continue } - if err := applyRouteWAFConfig(rt, governing, cfg); err != nil { + if err := applyRouteWAFConfig(rt, governing, projectName, cfg); err != nil { return mutated, fmt.Errorf("apply WAF config to route %q: %w", rt.GetName(), err) } mutated++ @@ -211,10 +219,34 @@ func ApplyTPPRouteConfig( return mutated, nil } +// injectProjectNameMetadata writes project_name into the datum-gateway +// filter_metadata of a route. Called for every NSO-owned route regardless of +// whether a TrafficProtectionPolicy governs it, so that +// %METADATA(ROUTE:datum-gateway:project_name)% is always available in the +// Envoy access log format. +func injectProjectNameMetadata(rt *routev3.Route, projectName string) { + if rt.Metadata == nil { + rt.Metadata = &corev3.Metadata{} + } + if rt.Metadata.FilterMetadata == nil { + rt.Metadata.FilterMetadata = make(map[string]*structpb.Struct) + } + existing := rt.Metadata.FilterMetadata[datumGatewayMetadataKey] + if existing == nil { + fields := map[string]*structpb.Value{"project_name": structpb.NewStringValue(projectName)} + rt.Metadata.FilterMetadata[datumGatewayMetadataKey] = &structpb.Struct{Fields: fields} + return + } + if existing.Fields == nil { + existing.Fields = make(map[string]*structpb.Value) + } + existing.Fields["project_name"] = structpb.NewStringValue(projectName) +} + // applyRouteWAFConfig writes the datum-gateway filter_metadata and Coraza // typed_per_filter_config onto a single route. 
-func applyRouteWAFConfig(rt *routev3.Route, tpp *extcache.TPPInfo, cfg *CorazaConfig) error { - meta, err := buildDatumGatewayMetadata(tpp) +func applyRouteWAFConfig(rt *routev3.Route, tpp *extcache.TPPInfo, projectName string, cfg *CorazaConfig) error { + meta, err := buildDatumGatewayMetadata(tpp, projectName) if err != nil { return fmt.Errorf("build datum-gateway metadata: %w", err) } @@ -368,8 +400,9 @@ func corazaPluginConfigAny(directives []string, cfg *CorazaConfig) (*anypb.Any, // buildDatumGatewayMetadata builds the filter_metadata["datum-gateway"] struct // for a governed route. Matches the metadata contract in STATE.md. -func buildDatumGatewayMetadata(tpp *extcache.TPPInfo) (*structpb.Struct, error) { +func buildDatumGatewayMetadata(tpp *extcache.TPPInfo, projectName string) (*structpb.Struct, error) { s, err := structpb.NewStruct(map[string]any{ + "project_name": projectName, egMetaFieldResources: []any{ map[string]any{ egMetaFieldKind: "TrafficProtectionPolicy", diff --git a/internal/extensionserver/mutate/tpp_test.go b/internal/extensionserver/mutate/tpp_test.go index 22183f0a..60c1747a 100644 --- a/internal/extensionserver/mutate/tpp_test.go +++ b/internal/extensionserver/mutate/tpp_test.go @@ -145,9 +145,10 @@ func tppTargetingHTTPRoute(upstreamNS, tppName, routeName string) extcache.TPPIn // ("test-project") are fixed — all callers in this package use those values. 
func policyIndex(tpps ...extcache.TPPInfo) *extcache.PolicyIndex { return &extcache.PolicyIndex{ - DStoUS: map[string]string{"ns-abc-123": "test-project"}, - TPPs: map[string][]extcache.TPPInfo{"test-project": tpps}, - Connectors: make(map[extcache.ConnectorKey]extcache.ConnectorInfo), + DStoUS: map[string]string{"ns-abc-123": "test-project"}, + ProjectNames: map[string]string{"ns-abc-123": "test-project"}, + TPPs: map[string][]extcache.TPPInfo{"test-project": tpps}, + Connectors: make(map[extcache.ConnectorKey]extcache.ConnectorInfo), } } @@ -342,9 +343,10 @@ func TestApplyTPPRouteConfig_UnknownDSNamespace_Skipped(t *testing.T) { cfg := testCorazaConfig() // idx does NOT contain a mapping for dsNS. idx := &extcache.PolicyIndex{ - DStoUS: map[string]string{"ns-other": "other-project"}, - TPPs: make(map[string][]extcache.TPPInfo), - Connectors: make(map[extcache.ConnectorKey]extcache.ConnectorInfo), + DStoUS: map[string]string{"ns-other": "other-project"}, + ProjectNames: make(map[string]string), + TPPs: make(map[string][]extcache.TPPInfo), + Connectors: make(map[extcache.ConnectorKey]extcache.ConnectorInfo), } vh := buildVHWithGatewayMeta( diff --git a/test/e2e/billing/chainsaw-test.yaml b/test/e2e/billing/chainsaw-test.yaml new file mode 100644 index 00000000..3cf21816 --- /dev/null +++ b/test/e2e/billing/chainsaw-test.yaml @@ -0,0 +1,766 @@ +# yaml-language-server: $schema=https://raw.githubusercontent.com/kyverno/chainsaw/main/.schemas/json/test-chainsaw-v1alpha1.json +apiVersion: chainsaw.kyverno.io/v1alpha1 +kind: Test +metadata: + name: billing-http-metering +spec: + bindings: + - name: clusterIssuerName + value: (join('-', ['e2e-billing', $namespace])) + - name: gatewayClassName + value: (join('-', ['e2e-billing', $namespace])) + cluster: nso-standard + steps: + - name: Create CA + try: + - create: + cluster: nso-infra + resource: + apiVersion: cert-manager.io/v1 + kind: ClusterIssuer + metadata: + name: (join('-', [$clusterIssuerName, 'issuer'])) + spec: + 
selfSigned: {} + + - create: + cluster: nso-infra + resource: + apiVersion: cert-manager.io/v1 + kind: Certificate + metadata: + name: (join('-', [$clusterIssuerName, 'ca'])) + namespace: cert-manager + spec: + isCA: true + commonName: (join('-', [$clusterIssuerName, 'ca'])) + secretName: ($clusterIssuerName) + privateKey: + algorithm: ECDSA + size: 256 + issuerRef: + name: (join('-', [$clusterIssuerName, 'issuer'])) + kind: ClusterIssuer + group: cert-manager.io + + - create: + cluster: nso-infra + resource: + apiVersion: cert-manager.io/v1 + kind: ClusterIssuer + metadata: + name: ($clusterIssuerName) + spec: + ca: + secretName: ($clusterIssuerName) + + - script: + cluster: nso-infra + env: + - name: CLUSTER_ISSUER_NAME + value: ($clusterIssuerName) + content: | + kubectl get secret -n cert-manager $CLUSTER_ISSUER_NAME -o yaml | \ + sed 's/namespace: .*/namespace: envoy-gateway-system/' | \ + kubectl apply -n envoy-gateway-system -f - + + - name: Deploy Mock Billing Gateway + try: + - create: + cluster: nso-infra + resource: + apiVersion: v1 + kind: Service + metadata: + name: mock-billing-gateway + namespace: billing-system + spec: + ports: + - name: http + port: 8080 + targetPort: 8080 + selector: + app: mock-billing-gateway + - create: + cluster: nso-infra + resource: + apiVersion: apps/v1 + kind: Deployment + metadata: + name: mock-billing-gateway + namespace: billing-system + spec: + replicas: 1 + selector: + matchLabels: + app: mock-billing-gateway + template: + metadata: + labels: + app: mock-billing-gateway + spec: + containers: + - name: mock-billing-gateway + image: python:3.11-alpine + command: ["python", "-c", "import socket; s=socket.socket(); s.bind(('0.0.0.0', 8080)); s.listen(5); print('Started');\nwhile True:\n conn, addr = s.accept(); req = conn.recv(4096); print(req.decode('utf-8', errors='ignore'), flush=True); conn.sendall(b'HTTP/1.1 200 OK\\r\\nContent-Length: 0\\r\\n\\r\\n'); conn.close()"] + resources: + requests: + cpu: 10m + memory: 16Mi 
+ limits: + cpu: 50m + memory: 32Mi + - assert: + cluster: nso-infra + resource: + apiVersion: apps/v1 + kind: Deployment + metadata: + name: mock-billing-gateway + namespace: billing-system + status: + readyReplicas: 1 + + - name: Reset Vector + try: + - script: + timeout: 90s + cluster: nso-infra + content: | + kubectl rollout restart daemonset/billing-usage-collector-vector -n billing-system + kubectl rollout status daemonset/billing-usage-collector-vector -n billing-system --timeout=60s + + - name: Create GatewayClasses + try: + - create: + cluster: nso-standard + resource: + apiVersion: gateway.networking.k8s.io/v1 + kind: GatewayClass + metadata: + name: ($gatewayClassName) + spec: + controllerName: gateway.networking.datumapis.com/external-global-proxy-controller + + # Ensure the downstream environment's standard gateway class and config exists + - apply: + cluster: nso-infra + resource: + apiVersion: gateway.envoyproxy.io/v1alpha1 + kind: EnvoyProxy + metadata: + name: custom-proxy-config + namespace: envoy-gateway-system + spec: + telemetry: + accessLog: + settings: + - sinks: + # File sink: the test still scrapes stdout to derive the + # expected metric values (bytes, duration, project_name). + - type: File + file: + path: /dev/stdout + # OpenTelemetry sink: Envoy pushes the access log to the + # node-local Vector agent automatically, exercising the + # real production ingestion path (no manual forwarding). 
+ - type: OpenTelemetry + openTelemetry: + host: billing-usage-collector-vector.billing-system.svc.cluster.local + port: 4317 + resources: + service.name: nso-httproute-signals + format: + type: JSON + json: + start_time: "%START_TIME%" + method: "%REQ(:METHOD)%" + path: "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%" + protocol: "%PROTOCOL%" + response_code: "%RESPONSE_CODE%" + bytes_received: "%BYTES_RECEIVED%" + bytes_sent: "%BYTES_SENT%" + duration: "%DURATION%" + route_name: "%ROUTE_NAME%" + project_name: "%METADATA(ROUTE:datum-gateway:project_name)%" + provider: + type: Kubernetes + kubernetes: + envoyService: + type: ClusterIP + patch: + type: StrategicMerge + value: + spec: + ipFamilyPolicy: RequireDualStack + envoyDeployment: + patch: + type: StrategicMerge + value: + spec: + template: + spec: + containers: + - name: envoy + volumeMounts: + - mountPath: /etc/ssl/certs/ + name: selfsigned-ca + readOnly: true + volumes: + - name: selfsigned-ca + secret: + secretName: ($clusterIssuerName) + items: + - key: ca.crt + path: ca-certificates.crt + mergeGateways: true + + - apply: + cluster: nso-infra + resource: + apiVersion: gateway.networking.k8s.io/v1 + kind: GatewayClass + metadata: + name: datum-downstream-gateway-e2e + spec: + controllerName: gateway.envoyproxy.io/gatewayclass-controller + parametersRef: + group: gateway.envoyproxy.io + kind: EnvoyProxy + name: custom-proxy-config + namespace: envoy-gateway-system + + - apply: + cluster: nso-infra + resource: + apiVersion: v1 + kind: Namespace + metadata: + name: datum-downstream-gateway-hostnames + + # Get downstream namespace name by retrieving standard namespace UID + - script: + cluster: nso-standard + content: | + kubectl get ns $NAMESPACE -o json + outputs: + - name: downstreamNamespaceName + value: (join('-', ['ns', json_parse($stdout).metadata.uid])) + + - name: Create namespace on infra cluster + try: + - script: + cluster: nso-infra + content: | + kubectl create namespace $NAMESPACE --dry-run=client -o yaml | 
kubectl apply -f - + cleanup: + - script: + timeout: 60s + cluster: nso-infra + content: | + kubectl delete namespace $NAMESPACE --ignore-not-found=true + + - name: Create backend pod and service + try: + - create: + cluster: nso-infra + resource: + apiVersion: cert-manager.io/v1 + kind: Certificate + metadata: + name: backend-pod + spec: + dnsNames: + - (join('.', ['backend-service', $namespace, 'svc.cluster.local'])) + issuerRef: + name: ($clusterIssuerName) + kind: ClusterIssuer + secretName: backend-cert + - create: + cluster: nso-infra + resource: + apiVersion: v1 + kind: Pod + metadata: + name: backend-pod + labels: + app: backend + spec: + containers: + - name: backend + image: ghcr.io/mccutchen/go-httpbin:2.18.1 + command: ["/bin/go-httpbin"] + args: + - -host + - "0.0.0.0" + - -port + - "8080" + resources: + requests: + cpu: 10m + memory: 16Mi + limits: + cpu: 50m + memory: 64Mi + - create: + cluster: nso-infra + resource: + apiVersion: v1 + kind: Service + metadata: + name: backend-service + spec: + ports: + - name: http + port: 8080 + targetPort: 8080 + selector: + app: backend + + - name: Provision Domain + try: + - create: + cluster: nso-standard + resource: + apiVersion: networking.datumapis.com/v1alpha + kind: Domain + metadata: + name: test-domain + spec: + domainName: e2e.env.datum.net + - script: + content: | + kubectl -n $NAMESPACE patch domain test-domain \ + --subresource=status --type=merge \ + -p '{"status":{"conditions":[{"type": "Verified", "status": "True", "reason": "Test", "message": "test", "lastTransitionTime": "2025-02-24T23:59:09Z"}]}}' + + - name: Provision Gateway + try: + - script: + cluster: nso-standard + skipCommandOutput: true + skipLogOutput: true + content: | + kubectl get ns $NAMESPACE -o json + outputs: + - name: downstreamNamespaceName + value: (join('-', ['ns', json_parse($stdout).metadata.uid])) + - create: + resource: + apiVersion: gateway.networking.k8s.io/v1 + kind: Gateway + metadata: + name: test-gateway + spec: + 
gatewayClassName: ($gatewayClassName) + listeners: + - name: default-http + port: 80 + protocol: HTTP + allowedRoutes: + namespaces: + from: Same + - assert: + resource: + apiVersion: gateway.networking.k8s.io/v1 + kind: Gateway + metadata: + name: test-gateway + status: + conditions: + - type: Accepted + status: "True" + - type: Programmed + status: "True" + - assert: + cluster: nso-infra + resource: + apiVersion: gateway.networking.k8s.io/v1 + kind: Gateway + metadata: + name: test-gateway + namespace: ($downstreamNamespaceName) + + - name: Provision HTTPRoute + try: + - script: + cluster: nso-standard + skipCommandOutput: true + skipLogOutput: true + content: | + kubectl get ns $NAMESPACE -o json + outputs: + - name: downstreamNamespaceName + value: (join('-', ['ns', json_parse($stdout).metadata.uid])) + - create: + resource: + apiVersion: discovery.k8s.io/v1 + kind: EndpointSlice + metadata: + name: test-slice-1 + labels: + kubernetes.io/service-name: backend-service + addressType: FQDN + ports: + - name: http + port: 8080 + endpoints: + - addresses: + - (join('.', ['backend-service', $namespace, 'svc.cluster.local'])) + conditions: + ready: true + serving: true + terminating: false + - create: + resource: + apiVersion: gateway.networking.k8s.io/v1 + kind: HTTPRoute + metadata: + name: test-route + spec: + parentRefs: + - name: test-gateway + rules: + - matches: + - path: + type: PathPrefix + value: / + backendRefs: + - group: discovery.k8s.io + kind: EndpointSlice + name: test-slice-1 + port: 8080 + - assert: + cluster: nso-infra + resource: + apiVersion: gateway.networking.k8s.io/v1 + kind: HTTPRoute + metadata: + name: test-route + namespace: ($downstreamNamespaceName) + + - name: Provision Pod to test connectivity + try: + - script: + cluster: nso-standard + skipCommandOutput: true + skipLogOutput: true + content: | + kubectl get ns $NAMESPACE -o json + outputs: + - name: downstreamNamespaceName + value: (join('-', ['ns', json_parse($stdout).metadata.uid])) 
+ - script: + timeout: 45s + skipCommandOutput: true + skipLogOutput: true + cluster: nso-infra + content: | + # Wait up to 10s for the envoy service to be created + for i in $(seq 1 10); do + SVC=$(kubectl get svc -n envoy-gateway-system -l gateway.envoyproxy.io/owning-gatewayclass=datum-downstream-gateway-e2e -o json || true) + if [ ! -z "$SVC" ] && echo "$SVC" | grep -q "metadata"; then + echo "$SVC" + exit 0 + fi + sleep 1 + done + echo "ERROR: Envoy service not found for namespace ($downstreamNamespaceName)" >&2 + exit 1 + outputs: + - name: gatewayService + value: (json_parse($stdout).items | [0]) + + - create: + cluster: nso-infra + resource: + apiVersion: v1 + kind: Pod + metadata: + name: test-pod + spec: + containers: + - name: test-pod + image: alpine:3.19 + command: ["sleep", "3600"] + env: + - name: GATEWAY_SERVICE_NAME + value: ($gatewayService.metadata.name) + - name: GATEWAY_SERVICE_NAMESPACE + value: envoy-gateway-system + lifecycle: + postStart: + exec: + command: ["apk", "add", "curl"] + terminationGracePeriodSeconds: 0 + + - assert: + cluster: nso-infra + resource: + apiVersion: v1 + kind: Pod + metadata: + name: backend-pod + status: + phase: Running + + - assert: + cluster: nso-infra + resource: + apiVersion: v1 + kind: Pod + metadata: + name: test-pod + status: + phase: Running + + - sleep: + duration: 10s + + # Get primary hostname on nso-standard + - script: + cluster: nso-standard + content: | + PRIMARY_HOSTNAME=$(kubectl -n $NAMESPACE get gateway test-gateway -o jsonpath='{.status.addresses[0].value}' || true) + if [ -z "$PRIMARY_HOSTNAME" ]; then + PRIMARY_HOSTNAME="test-domain.prism.e2e.env.datum.net" + fi + printf "%s" "$PRIMARY_HOSTNAME" + outputs: + - name: primaryHostname + value: ($stdout) + + # Send request with a known request/response size and capture it on nso-infra + - script: + cluster: nso-infra + env: + - name: STANDARD_NAMESPACE + value: ($namespace) + - name: PRIMARY_HOSTNAME + value: ($primaryHostname) + content: | + # 
Send request with 11 bytes of request body ("hello world") to /delay/2 to ensure duration >= 2s + kubectl -n $STANDARD_NAMESPACE exec -i test-pod -- sh -c " \ + set -x; \ + curl -kvf -H \"Host: ${PRIMARY_HOSTNAME}\" -d 'hello world' http://\${GATEWAY_SERVICE_NAME}.\${GATEWAY_SERVICE_NAMESPACE}.svc.cluster.local/delay/2; \ + " + + # Verify Vector emitted the CloudEvents. Envoy pushed the access log to + # Vector automatically via its OpenTelemetry sink when the request above + # was served, so this step only derives the expected values and polls the + # mock billing gateway for the result. + - script: + timeout: 45s + cluster: nso-infra + env: + - name: DOWNSTREAM_NAMESPACE + value: ($downstreamNamespaceName) + content: | + # ===================================================================== + # Validates the full HTTP metering pipeline, end to end: + # + # Envoy access log --(OTLP push)--> Vector opentelemetry source + # | | + # | (also written to stdout for | (VRL transform fans the + # | capturing expected values) | log record out into + # v v CloudEvents) + # captured here (STEP 1-2) mock-billing-gateway (STEP 4) + # + # Envoy pushes the access log to Vector automatically (OpenTelemetry + # sink on custom-proxy-config -> billing-usage-collector-vector:4317) + # when the request is served -- the test does NOT forward it manually. 
+ # + # A single access log line is expected to produce FOUR CloudEvents, + # one per usage dimension, all sharing the same subject + # ("projects/<project>"): + # + # type value + # ------------------------------------------------ ------------------- + # networking.datumapis.com/gateway/requests "1" + # networking.datumapis.com/gateway/ingress-bytes bytes_received ("11") + # networking.datumapis.com/gateway/egress-bytes bytes_sent + # networking.datumapis.com/gateway/connection-seconds duration_ms / 1000 + # + # Every CloudEvent also carries a "dimensions" object that must include + # the project_name (stamped by the extension server onto route + # filter_metadata["datum-gateway"]["project_name"] at xDS build time + # and surfaced in the access log via + # %METADATA(ROUTE:datum-gateway:project_name)%). STEP 4 asserts both + # the per-type values AND that the project_name dimension is correct. + # ===================================================================== + + sleep 3 + # --- STEP 1: capture the Envoy access log line for our HTTPRoute ----- + # The route_name embeds the downstream namespace, so grepping for + # 'httproute/<downstream-namespace>/' isolates this test's request. + ENVOY_LOG=$(kubectl -n envoy-gateway-system logs -l gateway.envoyproxy.io/owning-gatewayclass=datum-downstream-gateway-e2e -c envoy --tail=500 | grep "httproute/$DOWNSTREAM_NAMESPACE/" | tail -n 1 || true) + if [ -z "$ENVOY_LOG" ]; then + echo "ERROR: No Envoy access log found containing 'httproute/'" >&2 + exit 1 + fi + echo "Found Envoy access log: $ENVOY_LOG" + + # --- STEP 2: derive the expected CloudEvent values from that log ---- + # These are the source-of-truth values Vector should reproduce in the + # CloudEvents it emits. They are parsed from the captured log rather + # than hardcoded so the assertions track the real request.
+ BYTES_SENT=$(python3 -c "import sys, json; print(json.loads(sys.argv[1])['bytes_sent'])" "$ENVOY_LOG") + DURATION_MS=$(python3 -c "import sys, json; print(json.loads(sys.argv[1])['duration'])" "$ENVOY_LOG") + EXPECTED_DURATION=$(( DURATION_MS / 1000 )) + # project_name is stamped by the extension server onto route + # filter_metadata["datum-gateway"]["project_name"] at xDS build + # time and read by Envoy via %METADATA(ROUTE:datum-gateway:project_name)%. + # A "-" or empty value means the extension server did not inject the + # metadata (extension server not running, or %METADATA()% not + # supported in this EG version for JSON access logs). + PROJECT_NAME=$(python3 -c "import sys, json; print(json.loads(sys.argv[1]).get('project_name', ''))" "$ENVOY_LOG") + if [ -z "$PROJECT_NAME" ] || [ "$PROJECT_NAME" = "-" ]; then + echo "ERROR: Envoy access log has no project_name; extension server did not inject datum-gateway route metadata, or %METADATA(ROUTE:...)% is not supported in this EG version for JSON access logs. Log: $ENVOY_LOG" >&2 + exit 1 + fi + + echo "Parsed Envoy Log metrics:" + echo " - Ingress bytes (expected): 11" + echo " - Egress bytes (expected): $BYTES_SENT" + echo " - Duration ms (expected): $DURATION_MS ($EXPECTED_DURATION seconds)" + echo " - Project name (expected): $PROJECT_NAME" + + # --- STEP 3: (no manual forwarding) -------------------------------- + # The request above already caused Envoy to push this access log to + # Vector automatically via its OpenTelemetry sink (custom-proxy-config + # -> billing-usage-collector-vector:4317). Vector's VRL transform + # parses it and emits the four CloudEvents to the mock-billing-gateway + # sink, so there is nothing to POST here -- we just poll for the result. 
+ + # --- STEP 4: verify the CloudEvents the mock billing gateway received + # Poll the mock gateway's request log (Vector may take a moment to + # flush) and assert that, for our subject, all four event types are + # present with the expected values AND the correct project_name + # dimension. Give up after the deadline and dump diagnostics. + DEADLINE=$(($(date +%s) + 30)) + while [ "$(date +%s)" -lt "$DEADLINE" ]; do + MOCK_LOGS=$(kubectl logs -n billing-system -l app=mock-billing-gateway --tail=100 || true) + + # Define Python verification script as a bash variable (indented safely in YAML) + PYTHON_VERIFY=" + import sys, json + expected_bytes_sent = float($BYTES_SENT) + expected_duration_sec = float($DURATION_MS) / 1000.0 + expected_project_name = \"$PROJECT_NAME\" + logs = sys.stdin.read() + diagnostics = [] + def check_events(events): + # Returns True once a single subject has the full, correct set + # of CloudEvents for our request. Records why each subject was + # rejected in 'diagnostics' for failure output. + diagnostics.clear() + # Group events by subject (projects/<project>); a valid run has all + # four event types under one subject. + by_subject = {} + for e in events: + subj = e.get('subject', '') + by_subject.setdefault(subj, []).append(e) + if not by_subject: + diagnostics.append(\"No events found in this log chunk.\") + return False + for subj, subj_events in by_subject.items(): + # Index this subject's events by type so each of the four + # expected usage events can be looked up (a repeated type keeps the last).
+ types = {e.get('type'): e for e in subj_events} + req = types.get('networking.datumapis.com/gateway/requests') + ingress = types.get('networking.datumapis.com/gateway/ingress-bytes') + egress = types.get('networking.datumapis.com/gateway/egress-bytes') + conn = types.get('networking.datumapis.com/gateway/connection-seconds') + missing = [] + if not req: missing.append('requests') + if not ingress: missing.append('ingress-bytes') + if not egress: missing.append('egress-bytes') + if not conn: missing.append('connection-seconds') + if missing: + diagnostics.append(f\"Subject '{subj}': missing event types {missing} (found: {list(types.keys())})\") + continue + r_val = req['data'].get('value') + i_val = ingress['data'].get('value') + e_val = egress['data'].get('value') + c_val = conn['data'].get('value') + if r_val != '1': + diagnostics.append(f\"Subject '{subj}': req value expected '1', got '{r_val}'\") + continue + if i_val != '11': + diagnostics.append(f\"Subject '{subj}': ingress value expected '11', got '{i_val}'\") + continue + try: + e_val_f = float(e_val) + if e_val_f != expected_bytes_sent: + diagnostics.append(f\"Subject '{subj}': egress value expected {expected_bytes_sent}, got {e_val_f}\") + continue + except Exception as ex: + diagnostics.append(f\"Subject '{subj}': egress value '{e_val}' could not be parsed as float: {ex}\") + continue + try: + c_val_f = float(c_val) + diff = abs(c_val_f - expected_duration_sec) + if diff > 0.001: + diagnostics.append(f\"Subject '{subj}': conn value expected {expected_duration_sec} (diff <= 0.001), got {c_val_f} (diff {diff})\") + continue + except Exception as ex: + diagnostics.append(f\"Subject '{subj}': conn value '{c_val}' could not be parsed as float: {ex}\") + continue + # All per-type values matched; finally assert the shared + # project_name dimension (carried on every event; checked + # here on the 'requests' event) resolves to the project. 
+ dims = req['data'].get('dimensions', {}) + p_val = dims.get('project_name') + if p_val != expected_project_name: + diagnostics.append(f\"Subject '{subj}': project_name dimension expected '{expected_project_name}', got '{p_val}'\") + continue + # This subject passed every check. + return True + return False + + # Try JSON array format (e.g. future codec: json_array) + found = False + for line in logs.splitlines(): + stripped = line.strip() + if stripped.startswith('[') and stripped.endswith(']'): + try: + if check_events(json.loads(stripped)): + found = True + break + except Exception: + pass + + # Try NDJSON format (Vector default: one JSON object per line) + if not found: + ndjson_events = [] + for line in logs.splitlines(): + stripped = line.strip() + if stripped.startswith('{') and stripped.endswith('}'): + try: + e = json.loads(stripped) + if 'type' in e and 'data' in e: + ndjson_events.append(e) + except Exception: + pass + if ndjson_events: + found = check_events(ndjson_events) + + if not found: + print(\"Validation Failure Details:\", file=sys.stderr) + for d in diagnostics: + print(f\" - {d}\", file=sys.stderr) + sys.exit(0 if found else 1) + " + + # Verify request count, ingress-bytes, egress-bytes, and connection-seconds CloudEvents using python + if echo "$MOCK_LOGS" | python3 -c "import sys, textwrap; exec(textwrap.dedent(sys.argv[1]))" "$PYTHON_VERIFY"; then + echo "SUCCESS: mock-billing-gateway received correct requests (1), ingress (11), egress ($BYTES_SENT), connection-seconds (equivalent to $DURATION_MS ms) and project_name ($PROJECT_NAME) CloudEvents!" + exit 0 + fi + sleep 2 + done + echo "FAIL: mock-billing-gateway did not receive correct CloudEvent metrics. 
Logs:" >&2 + kubectl logs -n billing-system -l app=mock-billing-gateway --tail=100 >&2 + exit 1 + + From 34706ecbb2b2efd42e173c9e8ff25ba147a38c20 Mon Sep 17 00:00:00 2001 From: Jose Szychowski Date: Thu, 18 Jun 2026 16:12:54 -0300 Subject: [PATCH 2/3] feat: enable E2E testing for the envoy-gateway extension server and implement selective WAF policy injection based on configuration. --- Makefile | 29 ++++++- .../client-cert-patch.yaml | 12 +++ .../deployment-patch.yaml | 41 +++++++++ .../extension-server-e2e/kustomization.yaml | 28 +++++++ config/extension-server-e2e/namespace.yaml | 4 + config/extension-server-e2e/pdb-patch.yaml | 8 ++ .../extension-server-e2e/server-config.yaml | 17 ++++ config/extension-server-e2e/tls.yaml | 59 +++++++++++++ ...nvoy-gateway-extension-manager-config.yaml | 83 +++++++++++++++++++ .../overlays/e2e/kustomization.yaml | 5 ++ internal/extensionserver/mutate/tpp.go | 12 +-- internal/extensionserver/mutate/tpp_test.go | 16 ++-- test/e2e/billing/chainsaw-test.yaml | 3 +- 13 files changed, 302 insertions(+), 15 deletions(-) create mode 100644 config/extension-server-e2e/client-cert-patch.yaml create mode 100644 config/extension-server-e2e/deployment-patch.yaml create mode 100644 config/extension-server-e2e/kustomization.yaml create mode 100644 config/extension-server-e2e/namespace.yaml create mode 100644 config/extension-server-e2e/pdb-patch.yaml create mode 100644 config/extension-server-e2e/server-config.yaml create mode 100644 config/extension-server-e2e/tls.yaml create mode 100644 config/tools/envoy-gateway/overlays/e2e/envoy-gateway-extension-manager-config.yaml create mode 100644 config/tools/envoy-gateway/overlays/e2e/kustomization.yaml diff --git a/Makefile b/Makefile index fb65611d..c042168f 100644 --- a/Makefile +++ b/Makefile @@ -81,7 +81,8 @@ test-e2e: chainsaw } $(KIND) get kubeconfig --name nso-standard > $(TMPDIR)/.kind-nso-standard.yaml $(KIND) get kubeconfig --name nso-infra > $(TMPDIR)/.kind-nso-infra.yaml - $(CHAINSAW) 
test ./test/e2e \ + $(CHAINSAW) test $(or $(TEST_DIR),./test/e2e) \ + --parallel 1 \ --cluster nso-standard=$(TMPDIR)/.kind-nso-standard.yaml \ --cluster nso-infra=$(TMPDIR)/.kind-nso-infra.yaml @@ -153,7 +154,7 @@ set-image-controller: manifests kustomize cd config/manager && $(KUSTOMIZE) edit set image ghcr.io/datum-cloud/network-services-operator=${IMG} .PHONY: prepare-infra-cluster -prepare-infra-cluster: cert-manager envoy-gateway external-dns downstream-crds +prepare-infra-cluster: cert-manager envoy-gateway external-dns downstream-crds billing-usage-collector load-image-nso-infra extension-server configure-eg-extension-manager .PHONY: downstream-crds downstream-crds: ## Install NSO CRDs on the downstream (infra) cluster that the replicator mirrors into it. @@ -168,12 +169,16 @@ prepare-e2e: chainsaw set-image-controller cert-manager load-image-all deploy-e2 prepare-dev: chainsaw set-image-controller cert-manager install .PHONY: load-image-all -load-image-all: load-image-operator +load-image-all: load-image-operator load-image-nso-infra .PHONY: load-image-operator load-image-operator: docker-build kind $(KIND) load docker-image $(IMG) -n nso-standard +.PHONY: load-image-nso-infra +load-image-nso-infra: docker-build kind ## Load operator image into nso-infra kind cluster (needed by the extension server). 
+ $(KIND) load docker-image $(IMG) -n nso-infra + .PHONY: cert-manager cert-manager: cmctl $(KUSTOMIZE) build --enable-helm config/tools/cert-manager | kubectl apply --server-side=true --force-conflicts -f - @@ -187,6 +192,24 @@ envoy-gateway: external-dns: $(KUSTOMIZE) build --enable-helm config/tools/external-dns | kubectl apply --server-side=true --force-conflicts -f - +.PHONY: billing-usage-collector +billing-usage-collector: + $(KUSTOMIZE) build --enable-helm config/tools/billing-usage-collector | kubectl apply --server-side=true --force-conflicts -f - + +.PHONY: extension-server +extension-server: ## Deploy the NSO extension server to the infra cluster (e2e overlay with cert-manager-issued TLS). + $(KUSTOMIZE) build --enable-helm config/extension-server-e2e | kubectl apply --server-side=true --force-conflicts -f - + kubectl rollout restart deployment/network-services-operator-envoy-gateway-extension-server \ + -n network-services-operator-system + kubectl rollout status deployment/network-services-operator-envoy-gateway-extension-server \ + -n network-services-operator-system --timeout=5m + +.PHONY: configure-eg-extension-manager +configure-eg-extension-manager: ## Patch the EG ConfigMap to enable extensionManager and restart the EG controller. 
+ $(KUSTOMIZE) build --enable-helm config/tools/envoy-gateway/overlays/e2e | kubectl apply --server-side=true --force-conflicts -f - + kubectl rollout restart deployment/envoy-gateway -n envoy-gateway-system + kubectl rollout status deployment/envoy-gateway -n envoy-gateway-system --timeout=3m + .PHONY: kind-standard-cluster kind-standard-cluster: kind $(KIND) create cluster --config=config/tools/kind/standard-cluster.yaml diff --git a/config/extension-server-e2e/client-cert-patch.yaml b/config/extension-server-e2e/client-cert-patch.yaml new file mode 100644 index 00000000..d0cd85f8 --- /dev/null +++ b/config/extension-server-e2e/client-cert-patch.yaml @@ -0,0 +1,12 @@ +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: envoy-gateway-extension-server-eg-client-tls +spec: + issuerRef: + # Switch from the production placeholder ClusterIssuer to the e2e CA Issuer. + # kustomize propagates the namePrefix so this resolves to + # network-services-operator-nso-es-ca-issuer at apply time. + name: nso-es-ca-issuer + kind: Issuer + group: cert-manager.io diff --git a/config/extension-server-e2e/deployment-patch.yaml b/config/extension-server-e2e/deployment-patch.yaml new file mode 100644 index 00000000..8be324ab --- /dev/null +++ b/config/extension-server-e2e/deployment-patch.yaml @@ -0,0 +1,41 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: envoy-gateway-extension-server +spec: + # Single-node kind clusters can't satisfy the base DoNotSchedule topology constraint. + replicas: 1 + template: + spec: + # Remove the hostname spread constraint — only 1 node in kind. + topologySpreadConstraints: null + volumes: + # Replace cert-manager CSI driver mount with a regular Secret volume. + # cert-manager Certificate nso-es-tls populates this Secret. 
+ - name: tls + csi: null + secret: + secretName: nso-extension-server-tls + # Replace ConfigMap CA bundle with the same Secret; the ca.crt key holds + # the issuing CA cert that the extension server uses to verify EG's client cert. + - name: tls-ca + configMap: null + secret: + secretName: nso-extension-server-tls + items: + - key: ca.crt + path: ca.crt + # Extension server operator config — disables Coraza WAF injection + # so standard (non-contrib) Envoy images work in e2e. + - name: server-config + configMap: + name: extension-server-config + containers: + - name: envoy-gateway-extension-server + env: + - name: SERVER_CONFIG + value: /server-config/config.yaml + volumeMounts: + - name: server-config + mountPath: /server-config + readOnly: true diff --git a/config/extension-server-e2e/kustomization.yaml b/config/extension-server-e2e/kustomization.yaml new file mode 100644 index 00000000..001934da --- /dev/null +++ b/config/extension-server-e2e/kustomization.yaml @@ -0,0 +1,28 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: network-services-operator-system +namePrefix: network-services-operator- + +resources: + - namespace.yaml + - ../extension-server + - tls.yaml + - server-config.yaml + +patches: + # Replace CSI + ConfigMap volumes with Secret-based mounts; reduce replicas to 1 for single-node kind. + - path: deployment-patch.yaml + target: + kind: Deployment + name: envoy-gateway-extension-server + # Remove the PDB minAvailable constraint — kind clusters have only 1 node so minAvailable:1 prevents eviction. + - path: pdb-patch.yaml + target: + kind: PodDisruptionBudget + name: envoy-gateway-extension-server-pdb + # Switch the EG client cert issuer from the placeholder to the e2e CA. 
+ - path: client-cert-patch.yaml + target: + kind: Certificate + name: envoy-gateway-extension-server-eg-client-tls diff --git a/config/extension-server-e2e/namespace.yaml b/config/extension-server-e2e/namespace.yaml new file mode 100644 index 00000000..064c2bd4 --- /dev/null +++ b/config/extension-server-e2e/namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: network-services-operator-system diff --git a/config/extension-server-e2e/pdb-patch.yaml b/config/extension-server-e2e/pdb-patch.yaml new file mode 100644 index 00000000..97d2b047 --- /dev/null +++ b/config/extension-server-e2e/pdb-patch.yaml @@ -0,0 +1,8 @@ +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: envoy-gateway-extension-server-pdb +spec: + # replicas=1 in e2e; minAvailable:1 would block any eviction. Set to 0 so kind + # cluster teardown / upgrades don't stall. + minAvailable: 0 diff --git a/config/extension-server-e2e/server-config.yaml b/config/extension-server-e2e/server-config.yaml new file mode 100644 index 00000000..94cb9312 --- /dev/null +++ b/config/extension-server-e2e/server-config.yaml @@ -0,0 +1,17 @@ +--- +# ConfigMap holding the extension server operator config for e2e. +# Disables Coraza WAF injection — e2e uses the standard Envoy image which +# does not have the golang filter compiled in, so injecting coraza-waf causes +# Envoy to reject the listener configuration. +apiVersion: v1 +kind: ConfigMap +metadata: + name: extension-server-config + namespace: network-services-operator-system +data: + config.yaml: | + apiVersion: apiserver.config.datumapis.com/v1alpha1 + kind: NetworkServicesOperator + gateway: + coraza: + disabled: true diff --git a/config/extension-server-e2e/tls.yaml b/config/extension-server-e2e/tls.yaml new file mode 100644 index 00000000..6a9b0b5d --- /dev/null +++ b/config/extension-server-e2e/tls.yaml @@ -0,0 +1,59 @@ +--- +# Self-signed bootstrap issuer; creates the e2e CA cert below. 
+apiVersion: cert-manager.io/v1 +kind: Issuer +metadata: + name: nso-es-selfsigned +spec: + selfSigned: {} +--- +# CA cert. cert-manager places the CA cert+key in Secret nso-extension-server-ca +# (in the same namespace). The CA Issuer below references that Secret. +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: nso-es-ca +spec: + isCA: true + commonName: nso-extension-server-ca + secretName: nso-extension-server-ca + privateKey: + algorithm: ECDSA + size: 256 + issuerRef: + name: nso-es-selfsigned + kind: Issuer + group: cert-manager.io +--- +# CA-backed issuer used by both the server cert and the EG client cert below. +apiVersion: cert-manager.io/v1 +kind: Issuer +metadata: + name: nso-es-ca-issuer +spec: + ca: + secretName: nso-extension-server-ca +--- +# Extension-server TLS cert. cert-manager writes it to Secret nso-extension-server-tls, +# which includes ca.crt = the CA cert — EG reads this field from certificateRef to verify +# the extension server's presented cert. +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: nso-es-tls +spec: + secretName: nso-extension-server-tls + dnsNames: + - network-services-operator-envoy-gateway-extension-server.network-services-operator-system.svc + - network-services-operator-envoy-gateway-extension-server.network-services-operator-system.svc.cluster.local + privateKey: + algorithm: ECDSA + size: 256 + issuerRef: + name: nso-es-ca-issuer + kind: Issuer + group: cert-manager.io + usages: + - server auth + - digital signature + - key encipherment diff --git a/config/tools/envoy-gateway/overlays/e2e/envoy-gateway-extension-manager-config.yaml b/config/tools/envoy-gateway/overlays/e2e/envoy-gateway-extension-manager-config.yaml new file mode 100644 index 00000000..cc35091c --- /dev/null +++ b/config/tools/envoy-gateway/overlays/e2e/envoy-gateway-extension-manager-config.yaml @@ -0,0 +1,83 @@ +--- +# Patches the Helm-managed envoy-gateway-config ConfigMap to enable the +# extensionManager. 
Applied after the EG helm chart with --server-side +# --force-conflicts so the extra field is owned by this apply, not helm. +# +# All other fields (extensionApis, runtimeFlags, provider image pins, etc.) are +# reproduced verbatim from the v1.8.1 helm chart so the full ConfigMap value is +# consistent — the data field is a single YAML string and cannot be merged at +# field granularity by server-side apply. +apiVersion: v1 +kind: ConfigMap +metadata: + name: envoy-gateway-config + namespace: envoy-gateway-system +data: + envoy-gateway.yaml: | + apiVersion: gateway.envoyproxy.io/v1alpha1 + kind: EnvoyGateway + extensionApis: + enableBackend: true + enableEnvoyPatchPolicy: true + gateway: + controllerName: gateway.envoyproxy.io/gatewayclass-controller + logging: + level: + default: info + provider: + kubernetes: + rateLimitDeployment: + container: + image: docker.io/envoyproxy/ratelimit:ff287602 + patch: + type: StrategicMerge + value: + spec: + template: + spec: + containers: + - imagePullPolicy: IfNotPresent + name: envoy-ratelimit + shutdownManager: + image: docker.io/envoyproxy/gateway:v1.8.1 + type: Kubernetes + runtimeFlags: + enabled: + - XDSNameSchemeV2 + extensionManager: + service: + fqdn: + hostname: network-services-operator-envoy-gateway-extension-server.network-services-operator-system.svc.cluster.local + port: 5005 + tls: + # certificateRef points to the Secret created by the nso-es-tls Certificate. + # EG reads ca.crt from this Secret to verify the extension server's TLS cert. + certificateRef: + name: nso-extension-server-tls + namespace: network-services-operator-system + # EG presents this client cert when dialing the extension server. 
+ clientCertificateRef: + name: envoy-gateway-extension-server-eg-client-tls + namespace: network-services-operator-system + retry: + maxAttempts: 4 + initialBackoff: 100ms + maxBackoff: 1s + backoffMultiplier: + numerator: 200 + retryableStatusCodes: + - UNAVAILABLE + hooks: + xdsTranslator: + post: + - Translation + translation: + listener: + includeAll: true + route: + includeAll: true + cluster: + includeAll: true + secret: + includeAll: true + failOpen: false diff --git a/config/tools/envoy-gateway/overlays/e2e/kustomization.yaml b/config/tools/envoy-gateway/overlays/e2e/kustomization.yaml new file mode 100644 index 00000000..655dcf2e --- /dev/null +++ b/config/tools/envoy-gateway/overlays/e2e/kustomization.yaml @@ -0,0 +1,5 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +resources: + - envoy-gateway-extension-manager-config.yaml diff --git a/internal/extensionserver/mutate/tpp.go b/internal/extensionserver/mutate/tpp.go index e6486b7d..8ae17930 100644 --- a/internal/extensionserver/mutate/tpp.go +++ b/internal/extensionserver/mutate/tpp.go @@ -165,11 +165,6 @@ func ApplyTPPRouteConfig( idx *extcache.PolicyIndex, cfg *CorazaConfig, ) (int, error) { - // When disabled, emit no per-route WAF config — matches the no-op in - // InjectCorazaListenerFilters so the two halves are always consistent. - if cfg.Disabled { - return 0, nil - } mutated := 0 for _, vh := range rc.GetVirtualHosts() { // Extract VH-level EG resource reference; expect Kind=Gateway. @@ -198,6 +193,13 @@ func ApplyTPPRouteConfig( // so it also includes project_name in the metadata it builds. injectProjectNameMetadata(rt, projectName) + // WAF per-route config requires the listener-level filter to be present. + // Skip when Coraza is disabled (e.g. standard Envoy image without the + // golang filter) so that project_name is still stamped above. + if cfg.Disabled { + continue + } + // Check for a route-level TPP (HTTPRoute targeting) — takes precedence. 
_, _, routeName, _ := extractEGResource(rt.GetMetadata()) routeTPP := findRouteTPP(tpps, routeName) diff --git a/internal/extensionserver/mutate/tpp_test.go b/internal/extensionserver/mutate/tpp_test.go index 60c1747a..86fc6489 100644 --- a/internal/extensionserver/mutate/tpp_test.go +++ b/internal/extensionserver/mutate/tpp_test.go @@ -488,9 +488,10 @@ func TestInjectCorazaListenerFilters_StaticRouteConfig_Skipped(t *testing.T) { } // TestApplyTPPRouteConfig_Disabled_NoOp verifies that setting -// CorazaConfig.Disabled=true causes ApplyTPPRouteConfig to return 0 mutations -// and leave all routes untouched, even when a governing TPP exists. -func TestApplyTPPRouteConfig_Disabled_NoOp(t *testing.T) { +// CorazaConfig.Disabled=true skips WAF per-route config but still stamps +// project_name into the datum-gateway route metadata so the access log can +// emit it even on standard Envoy images (no golang filter). +func TestApplyTPPRouteConfig_Disabled_StampsProjectName(t *testing.T) { cfg := testCorazaConfig() cfg.Disabled = true @@ -511,13 +512,16 @@ func TestApplyTPPRouteConfig_Disabled_NoOp(t *testing.T) { n, err := ApplyTPPRouteConfig(rc, idx, cfg) require.NoError(t, err) - assert.Equal(t, 0, n, "disabled Coraza must not apply any per-route WAF config") + assert.Equal(t, 0, n, "disabled Coraza must not count any WAF per-route mutations") rt := vh.Routes[0] assert.Nil(t, rt.GetTypedPerFilterConfig(), "route must have no typed_per_filter_config when Coraza is disabled") - assert.Nil(t, rt.GetMetadata().GetFilterMetadata()[datumGatewayMetadataKey], - "route must have no datum-gateway metadata when Coraza is disabled") + // project_name is stamped unconditionally so access logs always carry it. 
+ meta := rt.GetMetadata().GetFilterMetadata()[datumGatewayMetadataKey] + require.NotNil(t, meta, "datum-gateway filter_metadata must be set even when Coraza is disabled") + assert.Equal(t, upstreamNS, meta.GetFields()["project_name"].GetStringValue(), + "project_name must be stamped on NSO-owned routes regardless of Coraza.Disabled") } func TestApplyTPPRouteConfig_RouteLevelTPPWins(t *testing.T) { diff --git a/test/e2e/billing/chainsaw-test.yaml b/test/e2e/billing/chainsaw-test.yaml index 3cf21816..5605d838 100644 --- a/test/e2e/billing/chainsaw-test.yaml +++ b/test/e2e/billing/chainsaw-test.yaml @@ -613,7 +613,8 @@ spec: # A "-" or empty value means the extension server did not inject the # metadata (extension server not running, or %METADATA()% not # supported in this EG version for JSON access logs). - PROJECT_NAME=$(python3 -c "import sys, json; print(json.loads(sys.argv[1]).get('project_name', ''))" "$ENVOY_LOG") + # json null → Python None → print gives "None"; coerce to "" explicitly. + PROJECT_NAME=$(python3 -c "import sys, json; v=json.loads(sys.argv[1]).get('project_name'); print('' if v is None else v)" "$ENVOY_LOG") if [ -z "$PROJECT_NAME" ] || [ "$PROJECT_NAME" = "-" ]; then echo "ERROR: Envoy access log has no project_name; extension server did not inject datum-gateway route metadata, or %METADATA(ROUTE:...)% is not supported in this EG version for JSON access logs. 
Log: $ENVOY_LOG" >&2 exit 1 From 3848da0e198d235e4a38653976ebdf2e96c5ee76 Mon Sep 17 00:00:00 2001 From: Jose Szychowski Date: Mon, 22 Jun 2026 11:45:36 -0300 Subject: [PATCH 3/3] refactor: transition billing-usage-collector from OTLP push to Kubernetes log tailing via Vector's kubernetes_logs source --- .../kustomization.yaml | 68 +++++++++++-------- test/e2e/billing/chainsaw-test.yaml | 59 ++++++++-------- 2 files changed, 70 insertions(+), 57 deletions(-) diff --git a/config/tools/billing-usage-collector/kustomization.yaml b/config/tools/billing-usage-collector/kustomization.yaml index 21146531..fa8ba92f 100644 --- a/config/tools/billing-usage-collector/kustomization.yaml +++ b/config/tools/billing-usage-collector/kustomization.yaml @@ -38,30 +38,46 @@ helmCharts: path: "/logs" decoding: codec: json - # Envoy Gateway pushes access logs here via its OpenTelemetry access - # log sink (OTLP). The JSON access log fields arrive as OTLP log-record - # attributes (i.e. under `.attributes.*`). This is the real ingestion - # path for live traffic; the `nso_logs` http_server above is only used - # by the e2e test to inject hand-crafted log lines. + # Envoy Gateway writes access logs to stdout via its File access log + # sink (path: /dev/stdout), exactly as the production downstream + # gateway EnvoyProxy does. The node-local Vector agent tails the + # container log files through the kubernetes_logs source -- the real, + # file-based ingestion path for live traffic. The JSON access log + # line arrives as a string in `.message`. The `nso_logs` http_server + # above is only used by the e2e test to inject hand-crafted log lines. envoy_access_logs: - type: opentelemetry - grpc: - address: "0.0.0.0:4317" - http: - address: "0.0.0.0:4318" + type: kubernetes_logs + # Only tail the Envoy proxy pods; their access logs live on the + # `envoy` container's stdout. Other namespaces/containers are + # ignored so the agent does not parse unrelated cluster logs. 
+ extra_field_selector: "metadata.namespace=envoy-gateway-system" + # Re-scan for new container log files every 2s instead of the 60s + # default so the agent picks up freshly-created Envoy proxy pods + # quickly -- the e2e test provisions a gateway and verifies within a + # short window. + glob_minimum_cooldown_ms: 2000 transforms: parse_nso_logs: type: remap inputs: - nso_logs - - envoy_access_logs.logs + - envoy_access_logs source: | - # The OpenTelemetry source (Envoy push) nests the access log fields - # under `.attributes`; the http_server source (e2e test injection) - # delivers them at the top level. Normalize both to top-level fields - # so the rest of this transform is source-agnostic. - if exists(.attributes) { - . = object(.attributes) ?? . + # The kubernetes_logs source (Envoy File sink -> stdout) delivers + # the raw access log line as a string in `.message`; parse it as + # JSON and lift the fields to the top level. The http_server source + # (e2e test injection) already delivers them at the top level. + # Normalize both so the rest of this transform is source-agnostic. + # Lines that are not JSON (e.g. Envoy startup logs) are dropped. + if is_string(.message) { + # `.message` is known to be a string inside this guard, so + # string!() never actually aborts (hence Vector's harmless + # "can't abort infallible function" compile warning). + parsed, err = parse_json(string!(.message)) + if err != null { + abort + } + . = object(parsed) ?? {} } parsed_route, err = parse_regex(.route_name, r'^httproute/(?P[^/]+)/(?P[^/]+)') @@ -219,7 +235,12 @@ helmCharts: mountPath: /var/lib/vector podSecurityContext: runAsUser: 1000 - runAsGroup: 1000 + # Primary GID 0 (root group) so the kubernetes_logs source can read the + # Envoy container logs: the kubelet writes them as root:root with + # /var/log/pods at 0750 and each 0.log at 0640 -- both group-readable by + # the root group only. 
We keep runAsUser 1000 / runAsNonRoot: true so the + # process is still a non-root user; only the primary group is root. + runAsGroup: 0 runAsNonRoot: true securityContext: allowPrivilegeEscalation: false @@ -243,16 +264,5 @@ helmCharts: port: 9881 protocol: TCP targetPort: 9881 - # OTLP endpoints for Envoy Gateway's OpenTelemetry access log sink. - # internalTrafficPolicy: Local keeps each Envoy pod talking to the - # Vector agent on its own node. - - name: otlp-grpc - port: 4317 - protocol: TCP - targetPort: 4317 - - name: otlp-http - port: 4318 - protocol: TCP - targetPort: 4318 podMonitor: enabled: true diff --git a/test/e2e/billing/chainsaw-test.yaml b/test/e2e/billing/chainsaw-test.yaml index 5605d838..c08bd116 100644 --- a/test/e2e/billing/chainsaw-test.yaml +++ b/test/e2e/billing/chainsaw-test.yaml @@ -156,20 +156,17 @@ spec: accessLog: settings: - sinks: - # File sink: the test still scrapes stdout to derive the + # File sink only -- identical to the production + # downstream gateway EnvoyProxy. Envoy writes the JSON + # access log to stdout; the node-local Vector agent + # tails the container log file (kubernetes_logs source) + # and emits the CloudEvents. This exercises the real, + # file-based production ingestion path -- no OTLP push. + # The test also scrapes the same stdout to derive the # expected metric values (bytes, duration, project_name). - type: File file: path: /dev/stdout - # OpenTelemetry sink: Envoy pushes the access log to the - # node-local Vector agent automatically, exercising the - # real production ingestion path (no manual forwarding).
- - type: OpenTelemetry - openTelemetry: - host: billing-usage-collector-vector.billing-system.svc.cluster.local - port: 4317 - resources: - service.name: nso-httproute-signals format: type: JSON json: @@ -545,10 +542,11 @@ spec: curl -kvf -H \"Host: ${PRIMARY_HOSTNAME}\" -d 'hello world' http://\${GATEWAY_SERVICE_NAME}.\${GATEWAY_SERVICE_NAMESPACE}.svc.cluster.local/delay/2; \ " - # Verify Vector emitted the CloudEvents. Envoy pushed the access log to - # Vector automatically via its OpenTelemetry sink when the request above - # was served, so this step only derives the expected values and polls the - # mock billing gateway for the result. + # Verify Vector emitted the CloudEvents. Envoy wrote the access log to + # stdout via its File sink when the request above was served, and the + # node-local Vector agent tailed the container log file (kubernetes_logs + # source) automatically, so this step only derives the expected values + # and polls the mock billing gateway for the result. - script: timeout: 45s cluster: nso-infra @@ -559,16 +557,20 @@ spec: # ===================================================================== # Validates the full HTTP metering pipeline, end to end: # - # Envoy access log --(OTLP push)--> Vector opentelemetry source - # | | - # | (also written to stdout for | (VRL transform fans the - # | capturing expected values) | log record out into - # v v CloudEvents) - # captured here (STEP 1-2) mock-billing-gateway (STEP 4) + # Envoy access log --(File sink -> stdout)--> container log file + # | | + # | (captured here for | (Vector agent + # | expected values) | kubernetes_logs + # | | source tails it, + # | | VRL transform fans + # v v it into CloudEvents) + # captured here (STEP 1-2) mock-billing-gateway (STEP 4) # - # Envoy pushes the access log to Vector automatically (OpenTelemetry - # sink on custom-proxy-config -> billing-usage-collector-vector:4317) - # when the request is served -- the test does NOT forward it manually. 
+ # Envoy writes the access log to stdout (File sink on + # custom-proxy-config, path: /dev/stdout) when the request is + # served; the node-local Vector agent tails the container log file + # automatically -- the test does NOT forward it manually. This is + # the same file-based ingestion path as the production gateway. # # A single access log line is expected to produce FOUR CloudEvents, # one per usage dimension, all sharing the same subject @@ -627,11 +629,12 @@ spec: echo " - Project name (expected): $PROJECT_NAME" # --- STEP 3: (no manual forwarding) -------------------------------- - # The request above already caused Envoy to push this access log to - # Vector automatically via its OpenTelemetry sink (custom-proxy-config - # -> billing-usage-collector-vector:4317). Vector's VRL transform - # parses it and emits the four CloudEvents to the mock-billing-gateway - # sink, so there is nothing to POST here -- we just poll for the result. + # The request above already caused Envoy to write this access log to + # stdout via its File sink (custom-proxy-config, path: /dev/stdout). + # The node-local Vector agent tails the Envoy container log file via + # its kubernetes_logs source, and Vector's VRL transform parses it + # and emits the four CloudEvents to the mock-billing-gateway sink, so + # there is nothing to POST here -- we just poll for the result. # --- STEP 4: verify the CloudEvents the mock billing gateway received # Poll the mock gateway's request log (Vector may take a moment to