From 39d642702e87b5543e5ed2a282d9352a44824600 Mon Sep 17 00:00:00 2001 From: Scot Wells Date: Thu, 25 Jun 2026 13:28:25 -0500 Subject: [PATCH] feat(parity): confirm the edge is running the config it was given Adds a check that compares what the platform intended for the edge against what the edge is actually serving, and names any gap in plain terms (missing, wrong count, wrong place, rejected, or dropped at a limit). Turns a healthy-looking response into real evidence the configuration took hold. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01JbCy8vy66RdNYzGSgqH6P6 --- cmd/parity-check/main.go | 342 +++++++++++++++++++++++++++++++++ cmd/parity-check/main_test.go | 42 ++++ test/parity/README.md | 58 ++++++ test/parity/configdump.go | 147 ++++++++++++++ test/parity/configdump_test.go | 185 ++++++++++++++++++ test/parity/fetch.go | 272 ++++++++++++++++++++++++++ test/parity/fetch_test.go | 90 +++++++++ test/parity/parity.go | 313 ++++++++++++++++++++++++++++++ test/parity/parity_test.go | 154 +++++++++++++++ test/parity/register.go | 21 ++ test/parity/scan.go | 188 ++++++++++++++++++ 11 files changed, 1812 insertions(+) create mode 100644 cmd/parity-check/main.go create mode 100644 cmd/parity-check/main_test.go create mode 100644 test/parity/README.md create mode 100644 test/parity/configdump.go create mode 100644 test/parity/configdump_test.go create mode 100644 test/parity/fetch.go create mode 100644 test/parity/fetch_test.go create mode 100644 test/parity/parity.go create mode 100644 test/parity/parity_test.go create mode 100644 test/parity/register.go create mode 100644 test/parity/scan.go diff --git a/cmd/parity-check/main.go b/cmd/parity-check/main.go new file mode 100644 index 00000000..b145cdbc --- /dev/null +++ b/cmd/parity-check/main.go @@ -0,0 +1,342 @@ +// Command parity-check is the command-line wrapper around the parity package. 
It +// reads the configuration the extension server intended to program, reads the +// proxy's live configuration, compares them, prints the result as JSON, and +// exits non-zero on any mismatch. +// +// The proxy and extension server can be reached either by a direct URL or, when +// their containers ship without any shell or HTTP client, by forwarding a port +// to the pod and fetching through it from this process. The flags below select +// which. +package main + +import ( + "bufio" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "strings" + "time" + + "go.datum.net/network-services-operator/test/parity" +) + +func main() { + cfg := parseFlags() + if err := run(cfg); err != nil { + fmt.Fprintf(os.Stderr, "parity-check: %v\n", err) + os.Exit(2) // 2 = could not fetch or parse; 1 = mismatch found. + } +} + +type config struct { + corazaFilter string + timeout time.Duration + + adminURL string + admin execTarget + + extURL string + ext execTarget + // extSelector matches all extension server replicas. Only one replica + // actually builds configuration at a time, so the check must query every + // replica and use the authoritative one. Mutually exclusive with + // --ext-exec-pod / --ext-url. + extSelector string + + // expectMinBuildID, when set, requires the build count to be at least this + // value, proving a fresh build happened. Capture the count before a change, + // then require a higher one after. + expectMinBuildID uint64 + jsonOut bool + + // printBuildID, when true, resolves the authoritative replica and prints only + // its build count, then exits without comparing anything. Used to capture the + // count before a change and to wait for configuration to settle. 
+ printBuildID bool +} + +type execTarget struct { + pod string + namespace string + container string + kubeCtx string +} + +func (e execTarget) set() bool { return e.pod != "" } + +func parseFlags() config { + var c config + flag.StringVar(&c.corazaFilter, "coraza-filter", "coraza-waf", + "Coraza HTTP filter name (identifies WAF HCM injection)") + flag.DurationVar(&c.timeout, "timeout", 30*time.Second, "overall fetch timeout") + + flag.StringVar(&c.adminURL, "admin-url", "", + "Envoy admin base URL (e.g. http://127.0.0.1:19000); use this OR --admin-exec-pod") + flag.StringVar(&c.admin.pod, "admin-exec-pod", "", "proxy pod to kubectl-exec into for admin access") + flag.StringVar(&c.admin.namespace, "admin-exec-namespace", "envoy-gateway-system", "namespace of the proxy pod") + flag.StringVar(&c.admin.container, "admin-exec-container", "envoy", "container in the proxy pod") + flag.StringVar(&c.admin.kubeCtx, "admin-exec-context", "", "kubeconfig context for admin exec (optional)") + + flag.StringVar(&c.extURL, "ext-url", "", + "ext-server health base URL (e.g. 
http://127.0.0.1:8080); use this OR --ext-exec-pod / --ext-exec-selector") + flag.StringVar(&c.ext.pod, "ext-exec-pod", "", "single ext-server pod to kubectl-exec into") + flag.StringVar(&c.extSelector, "ext-exec-selector", "", + "label selector for ALL ext-server replicas; picks the authoritative one (preferred over --ext-exec-pod)") + flag.StringVar(&c.ext.namespace, "ext-exec-namespace", "", "namespace of the ext-server pod(s)") + flag.StringVar(&c.ext.container, "ext-exec-container", "", "container in the ext-server pod") + flag.StringVar(&c.ext.kubeCtx, "ext-exec-context", "", "kubeconfig context for ext exec (optional)") + + flag.Uint64Var(&c.expectMinBuildID, "expect-min-build-id", 0, "require programmed-set BuildID >= this (STALE oracle)") + flag.BoolVar(&c.jsonOut, "json", true, "emit the ParityReport as JSON") + flag.BoolVar(&c.printBuildID, "print-build-id", false, + "resolve the authoritative ext-server replica and print ONLY its BuildID, then exit") + flag.Parse() + return c +} + +func run(c config) error { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Resolve the authoritative replica and emit only its build count, without + // fetching or comparing the proxy's configuration. + if c.printBuildID { + expected, _, err := resolveExpected(ctx, c) + if err != nil { + return err + } + fmt.Printf("%d\n", expected.BuildID) + return nil + } + + adminFetcher, err := buildFetcher(c.adminURL, c.admin, true) + if err != nil { + return fmt.Errorf("admin fetcher: %w", err) + } + + // Resolve the intended configuration and the fetcher for the authoritative + // replica. With a selector this queries every replica and picks the active + // one; otherwise it uses the single configured endpoint. 
+ expected, extFetcher, err := resolveExpected(ctx, c) + if err != nil { + return err + } + if c.expectMinBuildID > 0 && expected.BuildID < c.expectMinBuildID { + return fmt.Errorf("STALE: programmed-set BuildID %d < required %d (ext-server did not re-translate)", + expected.BuildID, c.expectMinBuildID) + } + + droppedSecrets := expected.Keys[parity.FamilyTLSPrune] + actual, _, err := parity.FetchActual(ctx, adminFetcher, extFetcher, c.corazaFilter, droppedSecrets) + if err != nil { + return err + } + + report := parity.Compare(expected, actual) + + if c.jsonOut { + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(report) + } + fmt.Fprintln(os.Stderr, report.String()) + + if !report.OK() { + os.Exit(1) // mismatch found — distinct from a fetch or parse failure (exit 2). + } + return nil +} + +// resolveExpected returns the intended configuration and the fetcher for the +// authoritative replica. With a selector it queries every replica and picks the +// active one; otherwise it uses the single configured endpoint. The chosen +// fetcher is reused later so other signals also come from the active replica. 
+func resolveExpected(ctx context.Context, c config) (parity.Expected, parity.Fetcher, error) {
+	if c.extSelector != "" {
+		pods, err := resolvePods(ctx, c.ext.kubeCtx, c.ext.namespace, c.extSelector)
+		if err != nil {
+			return parity.Expected{}, nil, fmt.Errorf("resolve ext-server pods: %w", err)
+		}
+		if len(pods) == 0 {
+			return parity.Expected{}, nil, fmt.Errorf(
+				"no ext-server pods matched selector %q in namespace %q", c.extSelector, c.ext.namespace)
+		}
+		// One port-forward fetcher per replica, each aimed at that pod's
+		// health port; everything but the pod name comes from the ext flags.
+		fetchers := make(map[string]parity.Fetcher, len(pods))
+		for _, pod := range pods {
+			et := c.ext
+			et.pod = pod
+			fetchers[pod] = &pfFetcher{target: et, remotePort: "8080"}
+		}
+		src, perReplicaErrs, err := parity.FetchExpectedFromMany(ctx, fetchers)
+		// Report per-replica fetch errors before acting on err: when every
+		// replica fails, these are the only clue as to why. Walk pods rather
+		// than the map so the output order is stable from run to run.
+		for _, pod := range pods {
+			if e, failed := perReplicaErrs[pod]; failed {
+				fmt.Fprintf(os.Stderr, "parity-check: ext replica %s unreachable: %v\n", pod, e)
+			}
+		}
+		if err != nil {
+			return parity.Expected{}, nil, err
+		}
+		// Individual failures are not fatal as long as one replica answered;
+		// name the chosen replica for diagnosis.
+		fmt.Fprintf(os.Stderr, "parity-check: authoritative ext replica %s (BuildID %d) of %d\n",
+			src.Replica, src.Expected.BuildID, len(pods))
+		return src.Expected, fetchers[src.Replica], nil
+	}
+
+	// Single-endpoint fallback.
+	extFetcher, err := buildFetcher(c.extURL, c.ext, false)
+	if err != nil {
+		return parity.Expected{}, nil, fmt.Errorf("ext fetcher: %w", err)
+	}
+	exp, err := parity.FetchExpected(ctx, extFetcher)
+	if err != nil {
+		return parity.Expected{}, nil, err
+	}
+	return exp, extFetcher, nil
+}
+
+// resolvePods returns the names of the running pods matching selector, used to
+// enumerate the extension server replicas.
+func resolvePods(ctx context.Context, kubeCtx, namespace, selector string) ([]string, error) {
+	// Optional global flags go first, followed by the fixed query.
+	var kubectlArgs []string
+	if kubeCtx != "" {
+		kubectlArgs = append(kubectlArgs, "--context", kubeCtx)
+	}
+	if namespace != "" {
+		kubectlArgs = append(kubectlArgs, "-n", namespace)
+	}
+	kubectlArgs = append(kubectlArgs,
+		"get", "pods",
+		"-l", selector,
+		"--field-selector=status.phase=Running",
+		"-o", "jsonpath={.items[*].metadata.name}",
+	)
+
+	// Capture stderr ourselves so a failure can quote kubectl's own message.
+	var errOut strings.Builder
+	kubectl := exec.CommandContext(ctx, "kubectl", kubectlArgs...)
+	kubectl.Stderr = &errOut
+	names, runErr := kubectl.Output()
+	if runErr == nil {
+		// The pod names are split on whitespace.
+		return strings.Fields(string(names)), nil
+	}
+	return nil, fmt.Errorf("kubectl get pods -l %s: %w: %s", selector, runErr, strings.TrimSpace(errOut.String()))
+}
+
+// buildFetcher picks how an endpoint is reached. A non-empty url yields a direct
+// HTTP fetcher; otherwise, when et names a pod, the result is a port-forward
+// fetcher to that pod, aimed at port 19000 when admin is true and 8080 when it
+// is false. With neither a URL nor a pod it returns an error. Port-forwarding is
+// used rather than running a command inside the pod because the containers ship
+// without a shell or HTTP client.
+func buildFetcher(url string, et execTarget, admin bool) (parity.Fetcher, error) {
+	if url != "" {
+		return parity.HTTPFetcher{BaseURL: url}, nil
+	}
+	if !et.set() {
+		return nil, fmt.Errorf("either a URL or an exec pod must be provided")
+	}
+	remote := "8080"
+	if admin {
+		remote = "19000"
+	}
+	return &pfFetcher{target: et, remotePort: remote}, nil
+}
+
+// pfFetcher fetches a path by forwarding a local port to the pod and making the
+// request from this process. The forward is set up and torn down per call, on a
+// fresh local port each time so concurrent fetches don't collide.
+type pfFetcher struct {
+	// target names the pod to forward to, plus the optional namespace and
+	// kubeconfig context passed to kubectl.
+	target execTarget
+	// remotePort is the port on the pod that the local port is forwarded to.
+	remotePort string
+}
+
+// Fetch starts `kubectl port-forward` to the target pod on a freshly allocated
+// local port, waits for the forward to report that it is listening, issues a
+// GET for path through it, and returns the response body. The forward is torn
+// down before Fetch returns. A non-2xx status is returned as an error.
+func (p *pfFetcher) Fetch(ctx context.Context, path string) ([]byte, error) {
+	localPort, err := freeLocalPort()
+	if err != nil {
+		return nil, fmt.Errorf("allocate local port: %w", err)
+	}
+
+	// Global kubectl flags are added only when set, then the port-forward
+	// subcommand with the <local>:<remote> port pair.
+	args := []string{}
+	if p.target.kubeCtx != "" {
+		args = append(args, "--context", p.target.kubeCtx)
+	}
+	if p.target.namespace != "" {
+		args = append(args, "-n", p.target.namespace)
+	}
+	args = append(args, "port-forward", "pod/"+p.target.pod,
+		fmt.Sprintf("%d:%s", localPort, p.remotePort))
+
+	// The forward must outlive the single request but be torn down right after.
+	pfCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	cmd := exec.CommandContext(pfCtx, "kubectl", args...)
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return nil, err
+	}
+	// stderr is buffered so a readiness failure can include kubectl's message.
+	var stderr strings.Builder
+	cmd.Stderr = &stderr
+	if err := cmd.Start(); err != nil {
+		return nil, fmt.Errorf("start port-forward: %w", err)
+	}
+	// Cancelling pfCtx stops kubectl; Wait then reaps the process. Its error is
+	// ignored because the process was stopped on purpose.
+	defer func() { cancel(); _ = cmd.Wait() }()
+
+	// Wait until the forward is listening before fetching.
+	if err := waitForwardReady(pfCtx, stdout); err != nil {
+		return nil, fmt.Errorf("port-forward not ready: %w (%s)", err, strings.TrimSpace(stderr.String()))
+	}
+
+	// The request is bound to the caller's ctx, so the overall deadline applies.
+	url := fmt.Sprintf("http://127.0.0.1:%d%s", localPort, path)
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("GET %s via port-forward: %w", url, err)
+	}
+	defer func() { _ = resp.Body.Close() }()
+	// The body is read in full before the status is checked.
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("read %s: %w", url, err)
+	}
+	if resp.StatusCode/100 != 2 {
+		return nil, fmt.Errorf("GET %s: status %d", url, resp.StatusCode)
+	}
+	return body, nil
+}
+
+// freeLocalPort asks the operating system for an unused port. 
Another process
+// could claim it in the brief gap before the port-forward binds. That window is
+// small; Fetch does not retry, so a collision surfaces as a port-forward failure.
+func freeLocalPort() (int, error) {
+	l, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		return 0, err
+	}
+	defer func() { _ = l.Close() }()
+	return l.Addr().(*net.TCPAddr).Port, nil
+}
+
+// waitForwardReady blocks until the forward reports that its local socket is
+// listening. It returns nil once a line containing "Forwarding from" is read
+// from stdout. It returns an error when stdout ends or fails before that line
+// appears, when ctx is done, or after 15 seconds.
+func waitForwardReady(ctx context.Context, stdout io.Reader) error {
+	// result carries nil on readiness or the reason the output stopped. It is
+	// buffered so the scanner goroutine can always finish, even when the select
+	// below has already returned.
+	result := make(chan error, 1)
+	go func() {
+		sc := bufio.NewScanner(stdout)
+		for sc.Scan() {
+			if strings.Contains(sc.Text(), "Forwarding from") {
+				result <- nil
+				return
+			}
+		}
+		// The output stopped without the readiness line. Say so now rather
+		// than leaving the caller to sit out the full timeout.
+		if err := sc.Err(); err != nil {
+			result <- fmt.Errorf("read port-forward output: %w", err)
+			return
+		}
+		result <- fmt.Errorf("port-forward output ended before it reported listening")
+	}()
+
+	timeout := time.NewTimer(15 * time.Second)
+	defer timeout.Stop()
+	select {
+	case err := <-result:
+		return err
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timeout.C:
+		return fmt.Errorf("timed out waiting for port-forward")
+	}
+}
diff --git a/cmd/parity-check/main_test.go b/cmd/parity-check/main_test.go new file mode 100644 index 00000000..6f94911c --- /dev/null +++ b/cmd/parity-check/main_test.go @@ -0,0 +1,42 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.datum.net/network-services-operator/test/parity" +) + +func TestBuildFetcher_URL(t *testing.T) { + f, err := buildFetcher("http://127.0.0.1:19000", execTarget{}, true) + require.NoError(t, err) + h, ok := f.(parity.HTTPFetcher) + require.True(t, ok) + assert.Equal(t, "http://127.0.0.1:19000", h.BaseURL) +} + +func TestBuildFetcher_PortForward(t *testing.T) { + f, err := buildFetcher("", execTarget{pod: "envoy-abc", namespace: "envoy-gateway-system", container: "envoy"}, true) + require.NoError(t, err) + e, ok := f.(*pfFetcher) + require.True(t, ok) + assert.Equal(t, "19000", e.remotePort, "admin fetcher must port-forward the admin port") + + fExt, err := buildFetcher("", execTarget{pod: "ext-abc", namespace: "nso"}, false) + require.NoError(t, err) + eExt := fExt.(*pfFetcher) + 
assert.Equal(t, "8080", eExt.remotePort, "ext fetcher must port-forward the health port") +} + +func TestFreeLocalPort(t *testing.T) { + p, err := freeLocalPort() + require.NoError(t, err) + assert.Greater(t, p, 0) +} + +func TestBuildFetcher_NeitherErrors(t *testing.T) { + _, err := buildFetcher("", execTarget{}, true) + assert.Error(t, err) +} diff --git a/test/parity/README.md b/test/parity/README.md new file mode 100644 index 00000000..43d3ea5b --- /dev/null +++ b/test/parity/README.md @@ -0,0 +1,58 @@ +# The parity check + +The parity check answers one question the customer ultimately cares about: + +> **Is the edge actually doing what it was told to do?** + +It compares two things — what the platform *intended* to program onto the edge, +and what the running proxy is *actually* serving — and reports any difference. +It's the tie-breaker described in [`docs/testing/README.md`](../../docs/testing/README.md), +backing up the real-traffic guarantees in +[`test/e2e/README.md`](../e2e/README.md). + +## Why real traffic isn't enough on its own + +A successful response is reassuring but ambiguous. If a firewall is supposed to +block an attack and the attack instead succeeds, the response alone can't tell +you *why*: is the firewall switched off, did the rule not match, did the edge +never pick up the new configuration, or was the configuration applied to the +wrong place? Worse, the most dangerous version of this failure is silent — a +firewall that protects nothing still lets every ordinary request through, so +everything looks healthy right up until a real attack arrives in production. + +The parity check removes the ambiguity. It looks past the response at the edge's +actual live configuration, so a surprising result becomes a specific, named +diagnosis instead of a guess. 
+ +## What it catches + +It sorts any gap between intended and actual into the categories that map to +real incidents: + +| Diagnosis | What it means for the customer | +|---|---| +| **Missing** | The edge was told to add protection but carries none — the rule is inert. | +| **Incomplete** | Only part of what was intended made it — partial protection. | +| **Misplaced** | The right amount of configuration, applied to the wrong place. | +| **Rejected** | The edge refused the update outright and is serving stale config. | +| **Overflowed** | The configuration was too large to deliver and was silently dropped. | + +## How it fits the suite + +- **Real traffic is the verdict.** Did the edge behave correctly? That's the + pass/fail. +- **Parity is the diagnosis.** When the verdict surprises, parity says which of + the failures above you're in — and it catches the silent, protects-nothing + case that ordinary traffic can't reveal. + +The rule the suite holds to: parity supports a real-traffic test, it never +replaces one. A scenario that could send the attack and watch it be blocked does +exactly that; parity is added on top, never in place of the behavior itself. + +## Using it + +The check ships as a small command, `parity-check` (see +[`cmd/parity-check`](../../cmd/parity-check)), which the end-to-end suite runs +automatically as a shared step. It can also be pointed at a live edge on its own +to ask, "does this proxy match what it was told?" — useful when diagnosing an +edge that's misbehaving. 
diff --git a/test/parity/configdump.go b/test/parity/configdump.go new file mode 100644 index 00000000..aa2741be --- /dev/null +++ b/test/parity/configdump.go @@ -0,0 +1,147 @@ +package parity + +import ( + "fmt" + + adminv3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3" + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +// ConfigDump is the parsed view of the proxy's live configuration, with each +// resource decoded into its concrete type. +type ConfigDump struct { + Clusters []*clusterv3.Cluster + Routes []*routev3.RouteConfiguration + Listeners []*listenerv3.Listener + // SecretNames are the TLS certificates present, used to confirm that + // certificates we expected to be removed are actually gone. + SecretNames []string + // ErrorStates holds, per kind of configuration, the reasons the proxy gave + // for rejecting an individual resource. + ErrorStates map[string][]string +} + +// ParseConfigDump decodes the proxy's live configuration into typed resources. +// We decode against the real schema rather than poking at raw JSON so that a +// shape we don't recognize is a hard error, not a silent miss. +func ParseConfigDump(raw []byte) (*ConfigDump, error) { + var dump adminv3.ConfigDump + // Tolerate fields newer than this binary knows about, so a proxy running a + // newer version doesn't fail the parse. 
+ if err := (protojson.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(raw, &dump); err != nil { + return nil, fmt.Errorf("unmarshal config_dump envelope: %w", err) + } + + out := &ConfigDump{ErrorStates: map[string][]string{}} + + for i, cfgAny := range dump.GetConfigs() { + msg, err := cfgAny.UnmarshalNew() + if err != nil { + return nil, fmt.Errorf("unmarshal config_dump section %d (%s): %w", i, cfgAny.GetTypeUrl(), err) + } + switch section := msg.(type) { + case *adminv3.ClustersConfigDump: + if err := out.absorbClusters(section); err != nil { + return nil, err + } + case *adminv3.RoutesConfigDump: + if err := out.absorbRoutes(section); err != nil { + return nil, err + } + case *adminv3.ListenersConfigDump: + if err := out.absorbListeners(section); err != nil { + return nil, err + } + case *adminv3.SecretsConfigDump: + out.absorbSecrets(section) + default: + // Other sections aren't needed by the scanners. + } + } + return out, nil +} + +func (d *ConfigDump) absorbClusters(s *adminv3.ClustersConfigDump) error { + for _, dc := range s.GetDynamicActiveClusters() { + if es := dc.GetErrorState(); es != nil { + d.ErrorStates["cluster"] = append(d.ErrorStates["cluster"], es.GetDetails()) + } + cl := &clusterv3.Cluster{} + if err := unwrap(dc.GetCluster(), cl); err != nil { + return fmt.Errorf("unwrap dynamic cluster: %w", err) + } + if cl.GetName() != "" { + d.Clusters = append(d.Clusters, cl) + } + } + // Clusters still being applied may carry rejection details too. 
+ for _, dc := range s.GetDynamicWarmingClusters() { + if es := dc.GetErrorState(); es != nil { + d.ErrorStates["cluster"] = append(d.ErrorStates["cluster"], es.GetDetails()) + } + } + return nil +} + +func (d *ConfigDump) absorbRoutes(s *adminv3.RoutesConfigDump) error { + for _, dr := range s.GetDynamicRouteConfigs() { + if es := dr.GetErrorState(); es != nil { + d.ErrorStates["route"] = append(d.ErrorStates["route"], es.GetDetails()) + } + rc := &routev3.RouteConfiguration{} + if err := unwrap(dr.GetRouteConfig(), rc); err != nil { + return fmt.Errorf("unwrap dynamic route config: %w", err) + } + if rc.GetName() != "" { + d.Routes = append(d.Routes, rc) + } + } + return nil +} + +func (d *ConfigDump) absorbListeners(s *adminv3.ListenersConfigDump) error { + for _, dl := range s.GetDynamicListeners() { + if es := dl.GetErrorState(); es != nil { + d.ErrorStates["listener"] = append(d.ErrorStates["listener"], es.GetDetails()) + } + // Scan the listener the proxy is currently serving, not one still being + // applied. + active := dl.GetActiveState() + if active == nil { + continue + } + l := &listenerv3.Listener{} + if err := unwrap(active.GetListener(), l); err != nil { + return fmt.Errorf("unwrap dynamic listener: %w", err) + } + if l.GetName() != "" { + d.Listeners = append(d.Listeners, l) + } + } + return nil +} + +func (d *ConfigDump) absorbSecrets(s *adminv3.SecretsConfigDump) { + for _, ds := range s.GetDynamicActiveSecrets() { + if es := ds.GetErrorState(); es != nil { + d.ErrorStates["secret"] = append(d.ErrorStates["secret"], es.GetDetails()) + } + if n := ds.GetName(); n != "" { + d.SecretNames = append(d.SecretNames, n) + } + } +} + +// unwrap decodes a wrapped resource into dst. A nil input is not an error (the +// resource is simply absent); dst is left zero. 
+func unwrap(a *anypb.Any, dst proto.Message) error { + if a == nil { + return nil + } + return a.UnmarshalTo(dst) +} diff --git a/test/parity/configdump_test.go b/test/parity/configdump_test.go new file mode 100644 index 00000000..68085fdc --- /dev/null +++ b/test/parity/configdump_test.go @@ -0,0 +1,185 @@ +package parity + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + adminv3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3" + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + hcmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + internalupstreamv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/internal_upstream/v3" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" +) + +func mustAny(t *testing.T, m proto.Message) *anypb.Any { + t.Helper() + a, err := anypb.New(m) + require.NoError(t, err) + return a +} + +// buildSyntheticDump builds a proxy configuration that looks like the real +// thing, so parsing and scanning can be exercised without a live cluster. 
+func buildSyntheticDump(t *testing.T, corazaFilter string) []byte { + t.Helper() + + // --- WAF-governed route + CONNECT route --- + wafMeta, err := structpb.NewStruct(map[string]any{ + "resources": []any{map[string]any{ + "kind": "TrafficProtectionPolicy", "namespace": "proj-ns", "name": "test-tpp", "mode": "Observe", + }}, + }) + require.NoError(t, err) + rc := &routev3.RouteConfiguration{ + Name: "gw/https", + VirtualHosts: []*routev3.VirtualHost{{ + Name: "vh", + Domains: []string{"app.example.com", "vh.connector.internal"}, + Routes: []*routev3.Route{ + { + Name: "connector-connect-vh", + Action: &routev3.Route_Route{Route: &routev3.RouteAction{ + UpgradeConfigs: []*routev3.RouteAction_UpgradeConfig{{UpgradeType: "CONNECT"}}, + }}, + }, + { + Name: "fwd", + Metadata: &corev3.Metadata{FilterMetadata: map[string]*structpb.Struct{datumGatewayMetaKey: wafMeta}}, + }, + }, + }}, + } + + // --- replaced connector cluster --- + ts := mustAny(t, &internalupstreamv3.InternalUpstreamTransport{}) + cl := &clusterv3.Cluster{ + Name: "httproute/proj-ns/proxy/rule/0", + ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_STATIC}, + LoadAssignment: &endpointv3.ClusterLoadAssignment{ClusterName: "httproute/proj-ns/proxy/rule/0"}, + TransportSocket: &corev3.TransportSocket{ + Name: connectorInternalTransport, + ConfigType: &corev3.TransportSocket_TypedConfig{TypedConfig: ts}, + }, + } + infraCl := &clusterv3.Cluster{ + Name: "infra", + ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS}, + } + + // --- listener with the firewall and a custom error page --- + hcm := &hcmv3.HttpConnectionManager{ + RouteSpecifier: &hcmv3.HttpConnectionManager_Rds{Rds: &hcmv3.Rds{RouteConfigName: "gw/https"}}, + HttpFilters: []*hcmv3.HttpFilter{{Name: corazaFilter, Disabled: true}, {Name: "envoy.filters.http.router"}}, + LocalReplyConfig: &hcmv3.LocalReplyConfig{}, + } + l := &listenerv3.Listener{ + Name: "gw/https", + FilterChains: 
[]*listenerv3.FilterChain{{ + Name: "fc", + Filters: []*listenerv3.Filter{{ + Name: hcmNetworkFilterName, + ConfigType: &listenerv3.Filter_TypedConfig{TypedConfig: mustAny(t, hcm)}, + }}, + }}, + } + + dump := &adminv3.ConfigDump{ + Configs: []*anypb.Any{ + mustAny(t, &adminv3.ClustersConfigDump{ + DynamicActiveClusters: []*adminv3.ClustersConfigDump_DynamicCluster{ + {Cluster: mustAny(t, cl)}, + {Cluster: mustAny(t, infraCl)}, + }, + }), + mustAny(t, &adminv3.RoutesConfigDump{ + DynamicRouteConfigs: []*adminv3.RoutesConfigDump_DynamicRouteConfig{{RouteConfig: mustAny(t, rc)}}, + }), + mustAny(t, &adminv3.ListenersConfigDump{ + DynamicListeners: []*adminv3.ListenersConfigDump_DynamicListener{ + {Name: "gw/https", ActiveState: &adminv3.ListenersConfigDump_DynamicListenerState{Listener: mustAny(t, l)}}, + }, + }), + mustAny(t, &adminv3.SecretsConfigDump{ + DynamicActiveSecrets: []*adminv3.SecretsConfigDump_DynamicSecret{{Name: "good-cert"}}, + }), + }, + } + + raw, err := protojson.Marshal(dump) + require.NoError(t, err) + return raw +} + +func TestParseAndScan_EndToEnd(t *testing.T) { + const corazaFilter = "coraza-waf" + raw := buildSyntheticDump(t, corazaFilter) + + dump, err := ParseConfigDump(raw) + require.NoError(t, err) + + assert.Len(t, dump.Clusters, 2) + assert.Len(t, dump.Routes, 1) + assert.Len(t, dump.Listeners, 1) + assert.Equal(t, []string{"good-cert"}, dump.SecretNames) + + act := ScanActual(dump, corazaFilter, nil) + assert.Equal(t, []string{wafRouteKey("gw/https", "vh", "fwd", "proj-ns", "test-tpp", "Observe")}, + normalize(act.Keys[FamilyWAFRoute])) + assert.Equal(t, []string{"httproute/proj-ns/proxy/rule/0"}, normalize(act.Keys[FamilyConnectorCluster])) + assert.Equal(t, []string{connectorRouteKey("gw/https", "vh", "connector-connect-vh")}, + normalize(act.Keys[FamilyConnectorRoute])) + assert.Equal(t, []string{listenerChainKey("gw/https", "fc")}, normalize(act.Keys[FamilyWAFHCM])) + assert.Equal(t, []string{listenerChainKey("gw/https", "fc")}, 
normalize(act.Keys[FamilyLocalReply])) +} + +// TestParseAndScan_FullParity is the integration shape: the same key formats are +// produced by the (simulated) ext-server side and the dump side, so Compare +// returns OK — proving the two sides are in lockstep. +func TestParseAndScan_FullParity(t *testing.T) { + const corazaFilter = "coraza-waf" + raw := buildSyntheticDump(t, corazaFilter) + dump, err := ParseConfigDump(raw) + require.NoError(t, err) + act := ScanActual(dump, corazaFilter, nil) + + // Expected derived from the same artifact set (as the programmed-set endpoint + // would report it). + expected := exp(map[Family][]string{ + FamilyWAFRoute: {wafRouteKey("gw/https", "vh", "fwd", "proj-ns", "test-tpp", "Observe")}, + FamilyConnectorCluster: {"httproute/proj-ns/proxy/rule/0"}, + FamilyConnectorRoute: {connectorRouteKey("gw/https", "vh", "connector-connect-vh")}, + FamilyWAFHCM: {listenerChainKey("gw/https", "fc")}, + FamilyLocalReply: {listenerChainKey("gw/https", "fc")}, + }) + + rep := Compare(expected, act) + rep.AssertOK(t) +} + +// TestParseConfigDump_NACKErrorState covers a single resource the proxy +// rejected while keeping the rest. 
+func TestParseConfigDump_NACKErrorState(t *testing.T) { + dump := &adminv3.ConfigDump{Configs: []*anypb.Any{ + mustAny(t, &adminv3.ListenersConfigDump{ + DynamicListeners: []*adminv3.ListenersConfigDump_DynamicListener{ + {Name: "bad", ErrorState: &adminv3.UpdateFailureState{Details: "KEY_VALUES_MISMATCH"}}, + }, + }), + }} + raw, err := protojson.Marshal(dump) + require.NoError(t, err) + + parsed, err := ParseConfigDump(raw) + require.NoError(t, err) + require.Contains(t, parsed.ErrorStates, "listener") + assert.Contains(t, parsed.ErrorStates["listener"][0], "KEY_VALUES_MISMATCH") +} diff --git a/test/parity/fetch.go b/test/parity/fetch.go new file mode 100644 index 00000000..95da96f4 --- /dev/null +++ b/test/parity/fetch.go @@ -0,0 +1,272 @@ +package parity + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sort" + "strconv" + "strings" +) + +// Fetcher retrieves a response body from an endpoint. It is an interface because +// different environments reach the proxy differently: some can make a plain HTTP +// request, others must tunnel into the proxy pod first. +type Fetcher interface { + Fetch(ctx context.Context, path string) ([]byte, error) +} + +// HTTPFetcher fetches over plain HTTP from a base URL, for endpoints reachable +// directly without tunneling into a pod. +type HTTPFetcher struct { + BaseURL string // e.g. 
"http://127.0.0.1:19000" + Client *http.Client +} + +func (h HTTPFetcher) Fetch(ctx context.Context, path string) ([]byte, error) { + client := h.Client + if client == nil { + client = http.DefaultClient + } + url := strings.TrimRight(h.BaseURL, "/") + path + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("GET %s: %w", url, err) + } + defer func() { _ = resp.Body.Close() }() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read %s: %w", url, err) + } + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("GET %s: status %d: %s", url, resp.StatusCode, snippet(body)) + } + return body, nil +} + +// FetchExpected reads the configuration the extension server says it intended to +// program, which is the reference we compare the live proxy against. +func FetchExpected(ctx context.Context, extFetcher Fetcher) (Expected, error) { + body, err := extFetcher.Fetch(ctx, ProgrammedSetPath) + if err != nil { + return Expected{}, fmt.Errorf("fetch programmed-set: %w", err) + } + var exp Expected + if err := json.Unmarshal(body, &exp); err != nil { + return Expected{}, fmt.Errorf("decode programmed-set JSON: %w", err) + } + if exp.Keys == nil { + exp.Keys = map[Family][]string{} + } + if exp.Counts == nil { + exp.Counts = map[Family]int{} + } + return exp, nil +} + +// ProgrammedSetPath is duplicated from the extension server rather than imported +// so this package stays free of any internal/ dependency; the two MUST stay in sync. +const ProgrammedSetPath = "/debug/programmed-set" + +// ExpectedSource pairs a fetched Expected set with the replica it came from, so +// the caller can report which replica was authoritative. +type ExpectedSource struct { + Replica string + Expected Expected +} + +// FetchExpectedFromMany queries every extension server replica and returns the +// authoritative one. 
Only one replica actually builds configuration at a time, +// so querying a fixed replica can hit an idle one and falsely report missing or +// stale configuration. +// +// The replica with the highest build count wins, since that count only ever +// increases on the replica doing the work and idle replicas sit at zero. A tie +// (no replica has built yet) resolves to the first replica so the caller still +// gets a valid empty result instead of an error. +// +// An unreachable replica is skipped rather than fatal, as long as at least one +// replica answered; its error is returned for diagnosis. +func FetchExpectedFromMany(ctx context.Context, fetchers map[string]Fetcher) (ExpectedSource, map[string]error, error) { + if len(fetchers) == 0 { + return ExpectedSource{}, nil, fmt.Errorf("no ext-server replicas to query") + } + + errs := map[string]error{} + var best *ExpectedSource + // Stable order so ties resolve deterministically. + for _, replica := range sortedKeys(fetchers) { + exp, err := FetchExpected(ctx, fetchers[replica]) + if err != nil { + errs[replica] = err + continue + } + src := ExpectedSource{Replica: replica, Expected: exp} + if best == nil || exp.BuildID > best.Expected.BuildID { + s := src + best = &s + } + } + + if best == nil { + return ExpectedSource{}, errs, fmt.Errorf("all %d ext-server replicas failed to answer", len(fetchers)) + } + return *best, errs, nil +} + +// sortedKeys returns the map keys sorted, for deterministic iteration. +func sortedKeys(m map[string]Fetcher) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +// FetchActual reads the proxy's live configuration and scans it for the +// artifacts of each change family. corazaFilterName identifies the firewall +// filter; droppedSecrets names the TLS certificates that should have been +// removed, so we can confirm none are still present. 
// snippet returns a short excerpt of b for use in error messages: the first
// line only, capped at 200 bytes with "..." appended when it was cut.
//
// The cap is applied on a UTF-8 boundary. Slicing at a fixed byte offset could
// split a multi-byte character and leave invalid UTF-8 in the message, so the
// cut point is moved back (by at most three bytes, the longest possible run of
// continuation bytes) until it no longer sits inside a sequence.
func snippet(b []byte) string {
	const maxLen = 200

	line, _, _ := strings.Cut(string(b), "\n")
	if len(line) <= maxLen {
		return line
	}

	cut := maxLen
	// 0b10xxxxxx marks a UTF-8 continuation byte; line[cut] is the first byte
	// that would be dropped, so it must not be one.
	for back := 0; back < 3 && cut > 0 && line[cut]&0xC0 == 0x80; back++ {
		cut--
	}
	return line[:cut] + "..."
}
// scrapeXDSRejected sums the proxy's rejected-update counters found in a
// Prometheus-format stats body, grouped by the kind of configuration (see
// classifyXDS). Only lines mentioning "update_rejected" are considered, and
// counters whose value is zero are left out of the result entirely. The
// returned map is never nil.
func scrapeXDSRejected(stats []byte) map[string]int {
	out := map[string]int{}
	forEachMetricLine(stats, func(line string) {
		if !strings.Contains(line, "update_rejected") {
			return
		}
		name, val := splitMetricLine(line)
		if val == 0 {
			return
		}
		out[classifyXDS(name)] += val
	})
	return out
}

// classifyXDS groups a metric by the kind of configuration it counts: "lds",
// "cds", "rds", "sds", or "other".
//
// The proxy rewrites dots to underscores in metric names, and route counters
// embed a per-route component, so the kind cannot be matched by exact name.
// It is matched as a whole underscore-delimited segment instead of a bare
// substring, so an unrelated name that merely contains the letters (for
// example "..._worlds_...") is not mistaken for a listener counter.
func classifyXDS(name string) string {
	padded := "_" + name + "_"
	for _, kind := range [...]string{"lds", "cds", "rds", "sds"} {
		if strings.Contains(padded, "_"+kind+"_") {
			return kind
		}
	}
	return "other"
}

// scrapeResourceExhausted counts how often the extension server failed to
// deliver an update because it was too large, by summing every
// grpc_server_handled_total sample labelled grpc_code="ResourceExhausted". A
// non-zero result means configuration was truncated.
func scrapeResourceExhausted(metrics []byte) int {
	total := 0
	forEachMetricLine(metrics, func(line string) {
		if !strings.HasPrefix(line, "grpc_server_handled_total") {
			return
		}
		if !strings.Contains(line, `grpc_code="ResourceExhausted"`) {
			return
		}
		_, val := splitMetricLine(line)
		total += val
	})
	return total
}

// forEachMetricLine calls fn for every sample line in raw, with surrounding
// whitespace removed. Blank lines and "#" comment lines are skipped.
func forEachMetricLine(raw []byte, fn func(line string)) {
	rest := string(raw)
	for rest != "" {
		var line string
		line, rest, _ = strings.Cut(rest, "\n")
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		fn(line)
	}
}

// splitMetricLine parses one metrics line, "name{labels} value [timestamp]",
// into the name (without labels) and the integer value. It returns ("", 0)
// when the line carries no value.
//
// Label values are quoted and may contain spaces, so the label set is removed
// by its braces before the remainder is split on whitespace. The value is the
// first field after the labels, which keeps an optional trailing timestamp
// from being mistaken for it.
func splitMetricLine(line string) (name string, value int) {
	var sample []string // value, optionally followed by a timestamp

	open := strings.IndexByte(line, '{')
	end := strings.LastIndexByte(line, '}')
	if open >= 0 && end > open {
		name = strings.TrimSpace(line[:open])
		sample = strings.Fields(line[end+1:])
	} else if fields := strings.Fields(line); len(fields) > 0 {
		name, _, _ = strings.Cut(fields[0], "{")
		sample = fields[1:]
	}

	if len(sample) == 0 {
		return "", 0
	}
	return name, metricValueToInt(sample[0])
}

// metricValueToInt converts a sample value to an int. Fractions are
// truncated; non-numeric, NaN and infinite values yield 0; finite values too
// large for an int saturate instead of relying on an out-of-range conversion,
// whose result Go leaves implementation-defined.
func metricValueToInt(s string) int {
	f, err := strconv.ParseFloat(s, 64)
	// f-f is NaN (and therefore != 0) exactly when f is NaN or infinite.
	if err != nil || f-f != 0 {
		return 0
	}
	const maxInt = int(^uint(0) >> 1)
	switch {
	case f >= float64(maxInt):
		return maxInt
	case f <= -float64(maxInt):
		return -maxInt
	}
	return int(f)
}
+func TestFetchExpectedFromMany_PicksHighestBuildID(t *testing.T) { + fetchers := map[string]Fetcher{ + "ext-replica-a": stubFetcher{buildID: 0}, // idle replica + "ext-replica-b": stubFetcher{buildID: 7}, // active replica + } + src, errs, err := FetchExpectedFromMany(context.Background(), fetchers) + require.NoError(t, err) + assert.Empty(t, errs) + assert.Equal(t, "ext-replica-b", src.Replica, "must pick the active (highest-BuildID) replica") + assert.Equal(t, uint64(7), src.Expected.BuildID) + assert.Equal(t, []string{"k7"}, src.Expected.Keys[FamilyWAFRoute]) +} + +// TestFetchExpectedFromMany_OneUnreachable: one replica down is not fatal as +// long as another answers; the error is surfaced, not returned. +func TestFetchExpectedFromMany_OneUnreachable(t *testing.T) { + fetchers := map[string]Fetcher{ + "ext-replica-a": stubFetcher{err: errors.New("connection refused")}, + "ext-replica-b": stubFetcher{buildID: 3}, + } + src, errs, err := FetchExpectedFromMany(context.Background(), fetchers) + require.NoError(t, err) + assert.Equal(t, "ext-replica-b", src.Replica) + require.Contains(t, errs, "ext-replica-a") + assert.ErrorContains(t, errs["ext-replica-a"], "connection refused") +} + +// TestFetchExpectedFromMany_AllFail returns an error only when no replica answers. +func TestFetchExpectedFromMany_AllFail(t *testing.T) { + fetchers := map[string]Fetcher{ + "a": stubFetcher{err: errors.New("down")}, + "b": stubFetcher{err: errors.New("down")}, + } + _, errs, err := FetchExpectedFromMany(context.Background(), fetchers) + require.Error(t, err) + assert.Len(t, errs, 2) +} + +// TestFetchExpectedFromMany_TieResolvesDeterministically: both at 0 (no build +// anywhere yet) resolves to the first replica in sorted order, returning a +// well-formed empty Expected rather than an error. 
// Family groups one kind of edge change so intended and live configuration can
// be compared kind by kind. The values match the names the extension server
// reports, so its output decodes straight into these.
type Family string

const (
	FamilyWAFRoute         Family = "waf_route"
	FamilyWAFHCM           Family = "waf_hcm"
	FamilyLocalReply       Family = "local_reply"
	FamilyConnectorCluster Family = "connector_cluster"
	FamilyConnectorRoute   Family = "connector_route"
	FamilyConnectorOffline Family = "connector_offline"
	FamilyTLSPrune         Family = "tls_prune"
)

// AllFamilies is the canonical iteration order for reports.
var AllFamilies = []Family{
	FamilyWAFRoute, FamilyWAFHCM, FamilyLocalReply,
	FamilyConnectorCluster, FamilyConnectorRoute, FamilyConnectorOffline,
	FamilyTLSPrune,
}

// FailureClass names how a comparison differed. The values are stable because
// reports and logs are read by people and tooling outside this package.
type FailureClass string

const (
	ClassOK                FailureClass = "OK"
	ClassMissing           FailureClass = "MISSING"
	ClassCountMismatch     FailureClass = "COUNT_MISMATCH"
	ClassWrongKeyed        FailureClass = "WRONG_KEYED"
	ClassNACK              FailureClass = "NACK"
	ClassCeilingTruncation FailureClass = "CEILING_TRUNCATION"
)

// Expected is the set of changes the extension server reports it last applied.
type Expected struct {
	BuildID                uint64              `json:"buildID"`
	Keys                   map[Family][]string `json:"keys"`
	Counts                 map[Family]int      `json:"counts"`
	TLSPrunedChains        int                 `json:"tlsPrunedChains"`
	TLSPrunedSecrets       int                 `json:"tlsPrunedSecrets"`
	TLSListenersLeftIntact int                 `json:"tlsListenersLeftIntact"`
}

// Actual is what the edge proxy is really running, read back from its live
// configuration and its record of rejected or undelivered updates.
type Actual struct {
	// Keys must be formatted exactly as the extension server formats them, so the
	// two sides can be compared as sets.
	Keys map[Family][]string
	// Certificates the extension server says it removed that are still present on
	// the proxy — a removal that didn't take effect.
	TLSDroppedSecretsStillPresent []string
	// Updates the proxy rejected, by kind. Any rejection means the proxy kept
	// older configuration instead.
	XDSRejected map[string]int
	// Count of updates too large to deliver, which the proxy silently never sees.
	ResourceExhausted int
}

// FamilyResult is one family's comparison outcome.
type FamilyResult struct {
	Family        Family       `json:"family"`
	Class         FailureClass `json:"class"`
	ExpectedCount int          `json:"expectedCount"`
	ActualCount   int          `json:"actualCount"`
	// Expected but absent from the proxy.
	MissingKeys []string `json:"missingKeys,omitempty"`
	// Present on the proxy but not expected — the other side of a wrong-place match.
	UnexpectedKeys []string `json:"unexpectedKeys,omitempty"`
	// Plain-language explanation for differences that aren't about keys.
	Detail string `json:"detail,omitempty"`
}

// ParityReport is the full result of a comparison — never just pass/fail, so a
// reader can see which change differed, how, and the specific keys involved.
type ParityReport struct {
	ExpectedBuildID uint64         `json:"expectedBuildID"`
	Families        []FamilyResult `json:"families"`
	// Update kinds the proxy rejected.
	XDSRejections map[string]int `json:"xdsRejections,omitempty"`
	// Non-zero when updates were too large to deliver.
	ResourceExhausted int `json:"resourceExhausted,omitempty"`
}

// OK reports whether everything matched and the proxy accepted every update.
func (r ParityReport) OK() bool {
	if len(r.XDSRejections) > 0 || r.ResourceExhausted > 0 {
		return false
	}
	for _, f := range r.Families {
		if f.Class != ClassOK {
			return false
		}
	}
	return true
}

// Failures lists every problem — mismatched changes plus any rejected or
// undelivered updates — so a caller can show them all at once. The order is
// stable: family mismatches in report order, then rejections sorted by kind,
// then the undelivered-update entry.
func (r ParityReport) Failures() []FamilyResult {
	var out []FamilyResult
	for _, f := range r.Families {
		if f.Class != ClassOK {
			out = append(out, f)
		}
	}
	// Map iteration order is unspecified, so walk the kinds sorted; otherwise
	// the same report would list its rejections differently on every call.
	for _, xds := range sortedRejectionKinds(r.XDSRejections) {
		out = append(out, FamilyResult{
			Class:  ClassNACK,
			Detail: fmt.Sprintf("xDS %s update_rejected=%d (Envoy rejected the snapshot)", xds, r.XDSRejections[xds]),
		})
	}
	if r.ResourceExhausted > 0 {
		out = append(out, FamilyResult{
			Class:  ClassCeilingTruncation,
			Detail: fmt.Sprintf("ext-server gRPC ResourceExhausted=%d (message-ceiling truncation)", r.ResourceExhausted),
		})
	}
	return out
}

// AssertOK fails the test with the full report when the comparison did not pass.
func (r ParityReport) AssertOK(t testing.TB) {
	t.Helper()
	if !r.OK() {
		t.Errorf("config-dump parity gate FAILED:\n%s", r.String())
	}
}

// String renders a compact, readable summary: one line per family, then one
// line per rejected update kind (sorted by kind, so the text is reproducible),
// then the undelivered-update count when it is non-zero.
func (r ParityReport) String() string {
	var b strings.Builder
	fmt.Fprintf(&b, "parity report (expected build %d): %s\n", r.ExpectedBuildID, statusWord(r.OK()))
	for _, f := range r.Families {
		fmt.Fprintf(&b, " %-18s %-15s expected=%d actual=%d", f.Family, f.Class, f.ExpectedCount, f.ActualCount)
		if len(f.MissingKeys) > 0 {
			fmt.Fprintf(&b, " missing=%v", truncateKeys(f.MissingKeys))
		}
		if len(f.UnexpectedKeys) > 0 {
			fmt.Fprintf(&b, " unexpected=%v", truncateKeys(f.UnexpectedKeys))
		}
		if f.Detail != "" {
			fmt.Fprintf(&b, " (%s)", f.Detail)
		}
		b.WriteByte('\n')
	}
	for _, xds := range sortedRejectionKinds(r.XDSRejections) {
		fmt.Fprintf(&b, " NACK: xDS %s update_rejected=%d\n", xds, r.XDSRejections[xds])
	}
	if r.ResourceExhausted > 0 {
		fmt.Fprintf(&b, " CEILING_TRUNCATION: gRPC ResourceExhausted=%d\n", r.ResourceExhausted)
	}
	return b.String()
}

// sortedRejectionKinds returns the keys of m in ascending order.
func sortedRejectionKinds(m map[string]int) []string {
	kinds := make([]string, 0, len(m))
	for kind := range m {
		kinds = append(kinds, kind)
	}
	sort.Strings(kinds)
	return kinds
}

// statusWord maps a pass/fail flag to the word printed in the report header.
func statusWord(ok bool) string {
	if ok {
		return "PASS"
	}
	return "FAIL"
}

// truncateKeys caps a key list for log readability: at most five keys are
// kept, followed by a marker saying how many were left out. Short lists are
// returned as-is; longer ones are copied, so the input is never modified.
func truncateKeys(keys []string) []string {
	const limit = 5
	if len(keys) <= limit {
		return keys
	}
	out := append([]string{}, keys[:limit]...)
	return append(out, fmt.Sprintf("...(+%d more)", len(keys)-limit))
}

// Compare diffs intended against live and classifies each kind of change. The
// key-by-key match is the important one: it is the only way to catch a change
// that quietly applied to the wrong place.
//
// The report holds one entry per family in AllFamilies order, the non-zero
// rejection counters, and the undelivered-update count.
func Compare(exp Expected, act Actual) ParityReport {
	report := ParityReport{ExpectedBuildID: exp.BuildID}

	for _, fam := range AllFamilies {
		report.Families = append(report.Families, compareFamily(fam, exp, act))
	}

	// Record any updates the proxy rejected. The map is only allocated when
	// there is something to record, so a clean report omits the field.
	for xds, n := range act.XDSRejected {
		if n > 0 {
			if report.XDSRejections == nil {
				report.XDSRejections = map[string]int{}
			}
			report.XDSRejections[xds] = n
		}
	}
	report.ResourceExhausted = act.ResourceExhausted
	return report
}

// compareFamily classifies one kind of change by comparing the expected and
// live key sets (both de-duplicated first). Removed certificates are the
// exception: success there means the certificate is gone, so that family is
// handed to compareTLSPrune.
func compareFamily(fam Family, exp Expected, act Actual) FamilyResult {
	if fam == FamilyTLSPrune {
		return compareTLSPrune(exp, act)
	}

	expKeys := normalize(exp.Keys[fam])
	actKeys := normalize(act.Keys[fam])
	res := FamilyResult{
		Family:        fam,
		ExpectedCount: len(expKeys),
		ActualCount:   len(actKeys),
	}

	switch {
	case len(expKeys) == 0:
		// Nothing expected — any actual is not this gate's concern (the gate
		// asserts what the ext-server INTENDED is present, not the inverse).
		res.Class = ClassOK
	case len(actKeys) == 0:
		res.Class = ClassMissing
		res.MissingKeys = expKeys
		res.Detail = "expected mutations but the config_dump carries none (inert / never-applied)"
	case len(actKeys) < len(expKeys):
		res.Class = ClassCountMismatch
		res.MissingKeys = setDiff(expKeys, actKeys)
		res.Detail = "config_dump carries fewer than expected (truncated / partial publish)"
	default:
		missing := setDiff(expKeys, actKeys)
		if len(missing) == 0 {
			res.Class = ClassOK
		} else {
			// The proxy carries at least as many changes as expected, yet
			// some expected keys are absent — something applied in the
			// wrong place.
			res.Class = ClassWrongKeyed
			res.MissingKeys = missing
			res.UnexpectedKeys = setDiff(actKeys, expKeys)
			res.Detail = "expected key set not present in the config_dump (applied to wrong route/gateway)"
		}
	}
	return res
}

// compareTLSPrune checks the opposite of the others: a certificate the extension
// server says it removed must be gone from the proxy. One still present means the
// removal never took effect. ExpectedCount is the number of pruned secrets and
// ActualCount the number of those still present.
func compareTLSPrune(exp Expected, act Actual) FamilyResult {
	res := FamilyResult{
		Family:        FamilyTLSPrune,
		ExpectedCount: exp.TLSPrunedSecrets,
		ActualCount:   len(act.TLSDroppedSecretsStillPresent),
	}
	if exp.TLSPrunedSecrets == 0 {
		res.Class = ClassOK
		return res
	}
	if len(act.TLSDroppedSecretsStillPresent) > 0 {
		res.Class = ClassMissing // a removal that didn't take effect
		res.UnexpectedKeys = normalize(act.TLSDroppedSecretsStillPresent)
		res.Detail = "pruned TLS secret(s) still present in the config_dump (prune did not take effect)"
		return res
	}
	res.Class = ClassOK
	return res
}

// normalize returns a sorted, de-duplicated copy of keys. An empty or nil
// input yields nil.
func normalize(keys []string) []string {
	if len(keys) == 0 {
		return nil
	}
	seen := make(map[string]struct{}, len(keys))
	out := make([]string, 0, len(keys))
	for _, k := range keys {
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

// setDiff returns the elements of a not present in b, in a's order. Both
// inputs must be normalized (sorted, deduped).
func setDiff(a, b []string) []string {
	bset := make(map[string]struct{}, len(b))
	for _, x := range b {
		bset[x] = struct{}{}
	}
	var out []string
	for _, x := range a {
		if _, ok := bset[x]; !ok {
			out = append(out, x)
		}
	}
	return out
}
+func TestCompare_WAFDisabledIsNotMissing(t *testing.T) { + want := map[Family][]string{ + FamilyConnectorCluster: {"httproute/ns/proxy/rule/0"}, + FamilyLocalReply: {"l##fc"}, + } + got := map[Family][]string{ + FamilyConnectorCluster: {"httproute/ns/proxy/rule/0"}, + FamilyLocalReply: {"l##fc"}, + } + rep := Compare(exp(want), actual(got)) + assert.True(t, rep.OK(), rep.String()) + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyWAFRoute).Class, "zero-expected WAF route must be OK, not MISSING") + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyWAFHCM).Class, "zero-expected WAF HCM must be OK, not MISSING") + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyConnectorCluster).Class) + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyLocalReply).Class) +} + +func TestCompare_Missing(t *testing.T) { + want := map[Family][]string{FamilyWAFRoute: {"rc##vh##fwd##ns/tpp/Observe"}} + rep := Compare(exp(want), actual(nil)) + r := familyResult(t, rep, FamilyWAFRoute) + assert.Equal(t, ClassMissing, r.Class) + assert.Equal(t, want[FamilyWAFRoute], r.MissingKeys) + assert.False(t, rep.OK()) +} + +func TestCompare_CountMismatch(t *testing.T) { + want := map[Family][]string{FamilyWAFRoute: {"a", "b", "c"}} + got := map[Family][]string{FamilyWAFRoute: {"a", "b"}} + rep := Compare(exp(want), actual(got)) + r := familyResult(t, rep, FamilyWAFRoute) + assert.Equal(t, ClassCountMismatch, r.Class) + assert.Equal(t, []string{"c"}, r.MissingKeys) +} + +// TestCompare_WrongKeyed covers the case only this comparison can catch: the +// right number of changes, but applied to the wrong place. 
+func TestCompare_WrongKeyed(t *testing.T) { + want := map[Family][]string{FamilyWAFRoute: {"rc##vh##fwd##nsA/tpp/Observe"}} + got := map[Family][]string{FamilyWAFRoute: {"rc##vh##fwd##nsB/tpp/Observe"}} + rep := Compare(exp(want), actual(got)) + r := familyResult(t, rep, FamilyWAFRoute) + assert.Equal(t, ClassWrongKeyed, r.Class) + assert.Equal(t, r.ExpectedCount, r.ActualCount, "counts equal — count-only gate would pass") + assert.Equal(t, []string{"rc##vh##fwd##nsA/tpp/Observe"}, r.MissingKeys) + assert.Equal(t, []string{"rc##vh##fwd##nsB/tpp/Observe"}, r.UnexpectedKeys) +} + +func TestCompare_NACK(t *testing.T) { + a := actual(nil) + a.XDSRejected = map[string]int{"lds": 1, "rds": 0} + rep := Compare(exp(nil), a) + assert.False(t, rep.OK()) + require.Contains(t, rep.XDSRejections, "lds") + assert.NotContains(t, rep.XDSRejections, "rds", "zero-delta types must not be reported") +} + +func TestCompare_CeilingTruncation(t *testing.T) { + a := actual(nil) + a.ResourceExhausted = 3 + rep := Compare(exp(nil), a) + assert.False(t, rep.OK()) + assert.Equal(t, 3, rep.ResourceExhausted) + classes := map[FailureClass]bool{} + for _, f := range rep.Failures() { + classes[f.Class] = true + } + assert.True(t, classes[ClassCeilingTruncation]) +} + +func TestCompareTLSPrune_NegativeAssertion(t *testing.T) { + e := Expected{BuildID: 1, Keys: map[Family][]string{}, Counts: map[Family]int{}, TLSPrunedSecrets: 2} + + // Pass: no pruned secret remains in the dump. + rep := Compare(e, Actual{Keys: map[Family][]string{}}) + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyTLSPrune).Class) + + // Fail: a pruned secret is still present. 
+ a := Actual{Keys: map[Family][]string{}, TLSDroppedSecretsStillPresent: []string{"bad-cert"}} + rep2 := Compare(e, a) + r := familyResult(t, rep2, FamilyTLSPrune) + assert.Equal(t, ClassMissing, r.Class) + assert.Equal(t, []string{"bad-cert"}, r.UnexpectedKeys) +} + +func familyResult(t *testing.T, rep ParityReport, fam Family) FamilyResult { + t.Helper() + for _, f := range rep.Families { + if f.Family == fam { + return f + } + } + t.Fatalf("family %s not in report", fam) + return FamilyResult{} +} + +// --- stats scrape --- + +func TestScrapeXDSRejected(t *testing.T) { + stats := []byte(` +# HELP envoy_listener_manager_lds_update_rejected +envoy_listener_manager_lds_update_rejected 2 +envoy_cluster_manager_cds_update_rejected 0 +envoy_http_foo_rds_bar_update_rejected 1 +envoy_listener_manager_lds_update_success 40 +`) + got := scrapeXDSRejected(stats) + assert.Equal(t, 2, got["lds"]) + assert.Equal(t, 1, got["rds"]) + _, hasCDS := got["cds"] + assert.False(t, hasCDS, "zero-value counters must be skipped") +} + +func TestScrapeResourceExhausted(t *testing.T) { + metrics := []byte(` +grpc_server_handled_total{grpc_code="OK",grpc_method="PostTranslateModify"} 100 +grpc_server_handled_total{grpc_code="ResourceExhausted",grpc_method="PostTranslateModify"} 4 +`) + assert.Equal(t, 4, scrapeResourceExhausted(metrics)) +} diff --git a/test/parity/register.go b/test/parity/register.go new file mode 100644 index 00000000..eb2ef36b --- /dev/null +++ b/test/parity/register.go @@ -0,0 +1,21 @@ +package parity + +// These message types can appear nested inside the proxy's configuration, and +// each must be linked into the binary or parsing that configuration fails. Add +// to this list whenever a new dump surfaces a type the parser can't resolve. 
+import ( + _ "github.com/cncf/xds/go/xds/type/v3" + _ "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/filters/http/golang/v3alpha" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/health_check/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/resource_monitors/downstream_connections/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/internal_upstream/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/raw_buffer/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3" +) diff --git a/test/parity/scan.go b/test/parity/scan.go new file mode 100644 index 00000000..a5eaf59d --- /dev/null +++ b/test/parity/scan.go @@ -0,0 +1,188 @@ +package parity + +import ( + "strings" + + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + hcmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" +) + +// This file scans the proxy's live configuration for each kind of change and +// emits an identity for every one it finds. 
These identity strings must match +// the ones the extension server produces exactly, or the comparison is +// meaningless; keep both sides in lockstep. + +const ( + keySep = "##" + + // Well-known marker strings, mirrored from where the configuration is written. + datumGatewayMetaKey = "datum-gateway" + connectorInternalTransport = "envoy.transport_sockets.internal_upstream" + hcmNetworkFilterName = "envoy.filters.network.http_connection_manager" + offlineBodyMarker = "Tunnel not online" +) + +// ScanActual scans the proxy's live configuration and assembles what it +// actually found. corazaFilterName is the firewall filter name used to spot +// firewall injection; droppedSecrets names the TLS certificates that should have +// been removed. +func ScanActual(dump *ConfigDump, corazaFilterName string, droppedSecrets []string) Actual { + act := Actual{Keys: map[Family][]string{}} + + for _, rc := range dump.Routes { + scanRouteConfig(rc, &act) + } + for _, cl := range dump.Clusters { + if isReplacedConnectorCluster(cl) { + act.Keys[FamilyConnectorCluster] = append(act.Keys[FamilyConnectorCluster], cl.GetName()) + } + } + for _, l := range dump.Listeners { + scanListener(l, corazaFilterName, &act) + } + + // Flag any certificate that should have been removed but is still present. 
+ if len(droppedSecrets) > 0 { + present := make(map[string]struct{}, len(dump.SecretNames)) + for _, n := range dump.SecretNames { + present[n] = struct{}{} + } + for _, dropped := range droppedSecrets { + if _, ok := present[dropped]; ok { + act.TLSDroppedSecretsStillPresent = append(act.TLSDroppedSecretsStillPresent, dropped) + } + } + } + return act +} + +func scanRouteConfig(rc *routev3.RouteConfiguration, act *Actual) { + rcName := rc.GetName() + for _, vh := range rc.GetVirtualHosts() { + vhName := vh.GetName() + for _, rt := range vh.GetRoutes() { + if ns, name, mode, ok := datumGatewayTPP(rt); ok { + act.Keys[FamilyWAFRoute] = append(act.Keys[FamilyWAFRoute], + wafRouteKey(rcName, vhName, rt.GetName(), ns, name, mode)) + } + if isConnectRoute(rt) { + act.Keys[FamilyConnectorRoute] = append(act.Keys[FamilyConnectorRoute], + connectorRouteKey(rcName, vhName, rt.GetName())) + } + if isOfflineDirectResponse(rt) { + act.Keys[FamilyConnectorOffline] = append(act.Keys[FamilyConnectorOffline], + connectorRouteKey(rcName, vhName, rt.GetName())) + } + } + } +} + +func scanListener(l *listenerv3.Listener, corazaFilterName string, act *Actual) { + lName := l.GetName() + eachHCM(l, func(chainName string, hcm *hcmv3.HttpConnectionManager) { + if corazaFilterName != "" && hcmHasFilterAtZero(hcm, corazaFilterName) { + act.Keys[FamilyWAFHCM] = append(act.Keys[FamilyWAFHCM], listenerChainKey(lName, chainName)) + } + if hcm.GetLocalReplyConfig() != nil { + act.Keys[FamilyLocalReply] = append(act.Keys[FamilyLocalReply], listenerChainKey(lName, chainName)) + } + }) +} + +// --- identity formats (MUST match the extension server) --- + +func wafRouteKey(rc, vh, rt, tppNS, tppName, mode string) string { + return strings.Join([]string{rc, vh, rt, tppNS + "/" + tppName + "/" + mode}, keySep) +} + +func connectorRouteKey(rc, vh, rt string) string { + return strings.Join([]string{rc, vh, rt}, keySep) +} + +func listenerChainKey(listener, chain string) string { + return 
strings.Join([]string{listener, chain}, keySep) +} + +// --- detectors (mirror, in reverse, how the configuration is written) --- + +func datumGatewayTPP(rt *routev3.Route) (ns, name, mode string, ok bool) { + md := rt.GetMetadata() + if md == nil { + return "", "", "", false + } + dg := md.GetFilterMetadata()[datumGatewayMetaKey] + if dg == nil { + return "", "", "", false + } + res := dg.GetFields()["resources"].GetListValue() + if res == nil || len(res.GetValues()) == 0 { + return "", "", "", false + } + f := res.GetValues()[0].GetStructValue().GetFields() + return f["namespace"].GetStringValue(), + f["name"].GetStringValue(), + f["mode"].GetStringValue(), + true +} + +func isConnectRoute(rt *routev3.Route) bool { + ra := rt.GetRoute() + if ra == nil { + return false + } + for _, uc := range ra.GetUpgradeConfigs() { + if strings.EqualFold(uc.GetUpgradeType(), "CONNECT") { + return true + } + } + return false +} + +func isOfflineDirectResponse(rt *routev3.Route) bool { + dr := rt.GetDirectResponse() + if dr == nil { + return false + } + if dr.GetStatus() != 503 { + return false + } + return dr.GetBody().GetInlineString() == offlineBodyMarker +} + +func isReplacedConnectorCluster(cl *clusterv3.Cluster) bool { + if cl.GetType() != clusterv3.Cluster_STATIC { + return false + } + return cl.GetTransportSocket().GetName() == connectorInternalTransport +} + +func eachHCM(l *listenerv3.Listener, fn func(chainName string, hcm *hcmv3.HttpConnectionManager)) { + chains := make([]*listenerv3.FilterChain, 0, len(l.GetFilterChains())+1) + chains = append(chains, l.GetFilterChains()...) 
+ if dfc := l.GetDefaultFilterChain(); dfc != nil { + chains = append(chains, dfc) + } + for _, fc := range chains { + for _, f := range fc.GetFilters() { + if f.GetName() != hcmNetworkFilterName { + continue + } + tc := f.GetTypedConfig() + if tc == nil { + continue + } + hcm := &hcmv3.HttpConnectionManager{} + if err := tc.UnmarshalTo(hcm); err != nil { + continue + } + fn(fc.GetName(), hcm) + } + } +} + +func hcmHasFilterAtZero(hcm *hcmv3.HttpConnectionManager, filterName string) bool { + fs := hcm.GetHttpFilters() + return len(fs) > 0 && fs[0].GetName() == filterName +}