diff --git a/cmd/parity-check/main.go b/cmd/parity-check/main.go new file mode 100644 index 00000000..b145cdbc --- /dev/null +++ b/cmd/parity-check/main.go @@ -0,0 +1,342 @@ +// Command parity-check is the command-line wrapper around the parity package. It +// reads the configuration the extension server intended to program, reads the +// proxy's live configuration, compares them, prints the result as JSON, and +// exits non-zero on any mismatch. +// +// The proxy and extension server can be reached either by a direct URL or, when +// their containers ship without any shell or HTTP client, by forwarding a port +// to the pod and fetching through it from this process. The flags below select +// which. +package main + +import ( + "bufio" + "context" + "encoding/json" + "flag" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "strings" + "time" + + "go.datum.net/network-services-operator/test/parity" +) + +func main() { + cfg := parseFlags() + if err := run(cfg); err != nil { + fmt.Fprintf(os.Stderr, "parity-check: %v\n", err) + os.Exit(2) // 2 = could not fetch or parse; 1 = mismatch found. + } +} + +type config struct { + corazaFilter string + timeout time.Duration + + adminURL string + admin execTarget + + extURL string + ext execTarget + // extSelector matches all extension server replicas. Only one replica + // actually builds configuration at a time, so the check must query every + // replica and use the authoritative one. Mutually exclusive with + // --ext-exec-pod / --ext-url. + extSelector string + + // expectMinBuildID, when set, requires the build count to be at least this + // value, proving a fresh build happened. Capture the count before a change, + // then require a higher one after. + expectMinBuildID uint64 + jsonOut bool + + // printBuildID, when true, resolves the authoritative replica and prints only + // its build count, then exits without comparing anything. Used to capture the + // count before a change and to wait for configuration to settle. + printBuildID bool +} + +type execTarget struct { + pod string + namespace string + container string + kubeCtx string +} + +func (e execTarget) set() bool { return e.pod != "" } + +func parseFlags() config { + var c config + flag.StringVar(&c.corazaFilter, "coraza-filter", "coraza-waf", + "Coraza HTTP filter name (identifies WAF HCM injection)") + flag.DurationVar(&c.timeout, "timeout", 30*time.Second, "overall fetch timeout") + + flag.StringVar(&c.adminURL, "admin-url", "", + "Envoy admin base URL (e.g. http://127.0.0.1:19000); use this OR --admin-exec-pod") + flag.StringVar(&c.admin.pod, "admin-exec-pod", "", "proxy pod to kubectl-exec into for admin access") + flag.StringVar(&c.admin.namespace, "admin-exec-namespace", "envoy-gateway-system", "namespace of the proxy pod") + flag.StringVar(&c.admin.container, "admin-exec-container", "envoy", "container in the proxy pod") + flag.StringVar(&c.admin.kubeCtx, "admin-exec-context", "", "kubeconfig context for admin exec (optional)") + + flag.StringVar(&c.extURL, "ext-url", "", + "ext-server health base URL (e.g. http://127.0.0.1:8080); use this OR --ext-exec-pod / --ext-exec-selector") + flag.StringVar(&c.ext.pod, "ext-exec-pod", "", "single ext-server pod to kubectl-exec into") + flag.StringVar(&c.extSelector, "ext-exec-selector", "", + "label selector for ALL ext-server replicas; picks the authoritative one (preferred over --ext-exec-pod)") + flag.StringVar(&c.ext.namespace, "ext-exec-namespace", "", "namespace of the ext-server pod(s)") + flag.StringVar(&c.ext.container, "ext-exec-container", "", "container in the ext-server pod") + flag.StringVar(&c.ext.kubeCtx, "ext-exec-context", "", "kubeconfig context for ext exec (optional)") + + flag.Uint64Var(&c.expectMinBuildID, "expect-min-build-id", 0, "require programmed-set BuildID >= this (STALE oracle)") + flag.BoolVar(&c.jsonOut, "json", true, "emit the ParityReport as JSON") + flag.BoolVar(&c.printBuildID, "print-build-id", false, + "resolve the authoritative ext-server replica and print ONLY its BuildID, then exit") + flag.Parse() + return c +} + +func run(c config) error { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Resolve the authoritative replica and emit only its build count, without + // fetching or comparing the proxy's configuration. + if c.printBuildID { + expected, _, err := resolveExpected(ctx, c) + if err != nil { + return err + } + fmt.Printf("%d\n", expected.BuildID) + return nil + } + + adminFetcher, err := buildFetcher(c.adminURL, c.admin, true) + if err != nil { + return fmt.Errorf("admin fetcher: %w", err) + } + + // Resolve the intended configuration and the fetcher for the authoritative + // replica. With a selector this queries every replica and picks the active + // one; otherwise it uses the single configured endpoint. + expected, extFetcher, err := resolveExpected(ctx, c) + if err != nil { + return err + } + if c.expectMinBuildID > 0 && expected.BuildID < c.expectMinBuildID { + return fmt.Errorf("STALE: programmed-set BuildID %d < required %d (ext-server did not re-translate)", + expected.BuildID, c.expectMinBuildID) + } + + droppedSecrets := expected.Keys[parity.FamilyTLSPrune] + actual, _, err := parity.FetchActual(ctx, adminFetcher, extFetcher, c.corazaFilter, droppedSecrets) + if err != nil { + return err + } + + report := parity.Compare(expected, actual) + + if c.jsonOut { + enc := json.NewEncoder(os.Stdout) + enc.SetIndent("", " ") + _ = enc.Encode(report) + } + fmt.Fprintln(os.Stderr, report.String()) + + if !report.OK() { + os.Exit(1) // mismatch found — distinct from a fetch or parse failure (exit 2). + } + return nil +} + +// resolveExpected returns the intended configuration and the fetcher for the +// authoritative replica. With a selector it queries every replica and picks the +// active one; otherwise it uses the single configured endpoint. The chosen +// fetcher is reused later so other signals also come from the active replica. +func resolveExpected(ctx context.Context, c config) (parity.Expected, parity.Fetcher, error) { + if c.extSelector != "" { + pods, err := resolvePods(ctx, c.ext.kubeCtx, c.ext.namespace, c.extSelector) + if err != nil { + return parity.Expected{}, nil, fmt.Errorf("resolve ext-server pods: %w", err) + } + if len(pods) == 0 { + return parity.Expected{}, nil, fmt.Errorf( + "no ext-server pods matched selector %q in namespace %q", c.extSelector, c.ext.namespace) + } + fetchers := make(map[string]parity.Fetcher, len(pods)) + for _, pod := range pods { + et := c.ext + et.pod = pod + fetchers[pod] = &pfFetcher{target: et, remotePort: "8080"} + } + src, perReplicaErrs, err := parity.FetchExpectedFromMany(ctx, fetchers) + if err != nil { + return parity.Expected{}, nil, err + } + // Report per-replica fetch errors and the chosen replica for diagnosis; + // they are not fatal as long as one replica answered. + for pod, e := range perReplicaErrs { + fmt.Fprintf(os.Stderr, "parity-check: ext replica %s unreachable: %v\n", pod, e) + } + fmt.Fprintf(os.Stderr, "parity-check: authoritative ext replica %s (BuildID %d) of %d\n", + src.Replica, src.Expected.BuildID, len(pods)) + return src.Expected, fetchers[src.Replica], nil + } + + // Single-endpoint fallback. + extFetcher, err := buildFetcher(c.extURL, c.ext, false) + if err != nil { + return parity.Expected{}, nil, fmt.Errorf("ext fetcher: %w", err) + } + exp, err := parity.FetchExpected(ctx, extFetcher) + if err != nil { + return parity.Expected{}, nil, err + } + return exp, extFetcher, nil +} + +// resolvePods returns the names of the running pods matching selector, used to +// enumerate the extension server replicas. +func resolvePods(ctx context.Context, kubeCtx, namespace, selector string) ([]string, error) { + args := []string{} + if kubeCtx != "" { + args = append(args, "--context", kubeCtx) + } + if namespace != "" { + args = append(args, "-n", namespace) + } + args = append(args, "get", "pods", "-l", selector, + "--field-selector=status.phase=Running", + "-o", "jsonpath={.items[*].metadata.name}") + + cmd := exec.CommandContext(ctx, "kubectl", args...) + var stderr strings.Builder + cmd.Stderr = &stderr + out, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("kubectl get pods -l %s: %w: %s", selector, err, strings.TrimSpace(stderr.String())) + } + return strings.Fields(string(out)), nil +} + +// buildFetcher returns a direct HTTP fetcher when a URL is given, otherwise a +// port-forward fetcher to the given pod. admin selects which remote port to +// reach. Port-forwarding is used rather than running a command inside the pod +// because the containers ship without a shell or HTTP client. +func buildFetcher(url string, et execTarget, admin bool) (parity.Fetcher, error) { + switch { + case url != "": + return parity.HTTPFetcher{BaseURL: url}, nil + case et.set(): + port := "8080" + if admin { + port = "19000" + } + return &pfFetcher{target: et, remotePort: port}, nil + default: + return nil, fmt.Errorf("either a URL or an exec pod must be provided") + } +} + +// pfFetcher fetches a path by forwarding a local port to the pod and making the +// request from this process. The forward is set up and torn down per call, on a +// fresh local port each time so concurrent fetches don't collide. +type pfFetcher struct { + target execTarget + remotePort string +} + +func (p *pfFetcher) Fetch(ctx context.Context, path string) ([]byte, error) { + localPort, err := freeLocalPort() + if err != nil { + return nil, fmt.Errorf("allocate local port: %w", err) + } + + args := []string{} + if p.target.kubeCtx != "" { + args = append(args, "--context", p.target.kubeCtx) + } + if p.target.namespace != "" { + args = append(args, "-n", p.target.namespace) + } + args = append(args, "port-forward", "pod/"+p.target.pod, + fmt.Sprintf("%d:%s", localPort, p.remotePort)) + + // The forward must outlive the single request but be torn down right after. + pfCtx, cancel := context.WithCancel(ctx) + defer cancel() + cmd := exec.CommandContext(pfCtx, "kubectl", args...) + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + var stderr strings.Builder + cmd.Stderr = &stderr + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("start port-forward: %w", err) + } + defer func() { cancel(); _ = cmd.Wait() }() + + // Wait until the forward is listening before fetching. + if err := waitForwardReady(pfCtx, stdout); err != nil { + return nil, fmt.Errorf("port-forward not ready: %w (%s)", err, strings.TrimSpace(stderr.String())) + } + + url := fmt.Sprintf("http://127.0.0.1:%d%s", localPort, path) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("GET %s via port-forward: %w", url, err) + } + defer func() { _ = resp.Body.Close() }() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read %s: %w", url, err) + } + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("GET %s: status %d", url, resp.StatusCode) + } + return body, nil +} + +// freeLocalPort asks the operating system for an unused port. Another process +// could claim it in the brief gap before we bind, but that window is small and +// the caller retries. +func freeLocalPort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + defer func() { _ = l.Close() }() + return l.Addr().(*net.TCPAddr).Port, nil +} + +// waitForwardReady blocks until the forward reports that its local socket is +// listening, or the context is done. +func waitForwardReady(ctx context.Context, stdout io.Reader) error { + ready := make(chan struct{}, 1) + go func() { + sc := bufio.NewScanner(stdout) + for sc.Scan() { + if strings.Contains(sc.Text(), "Forwarding from") { + ready <- struct{}{} + return + } + } + }() + select { + case <-ready: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-time.After(15 * time.Second): + return fmt.Errorf("timed out waiting for port-forward") + } +} diff --git a/cmd/parity-check/main_test.go b/cmd/parity-check/main_test.go new file mode 100644 index 00000000..6f94911c --- /dev/null +++ b/cmd/parity-check/main_test.go @@ -0,0 +1,42 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.datum.net/network-services-operator/test/parity" +) + +func TestBuildFetcher_URL(t *testing.T) { + f, err := buildFetcher("http://127.0.0.1:19000", execTarget{}, true) + require.NoError(t, err) + h, ok := f.(parity.HTTPFetcher) + require.True(t, ok) + assert.Equal(t, "http://127.0.0.1:19000", h.BaseURL) +} + +func TestBuildFetcher_PortForward(t *testing.T) { + f, err := buildFetcher("", execTarget{pod: "envoy-abc", namespace: "envoy-gateway-system", container: "envoy"}, true) + require.NoError(t, err) + e, ok := f.(*pfFetcher) + require.True(t, ok) + assert.Equal(t, "19000", e.remotePort, "admin fetcher must port-forward the admin port") + + fExt, err := buildFetcher("", execTarget{pod: "ext-abc", namespace: "nso"}, false) + require.NoError(t, err) + eExt := fExt.(*pfFetcher) + assert.Equal(t, "8080", eExt.remotePort, "ext fetcher must port-forward the health port") +} + +func TestFreeLocalPort(t *testing.T) { + p, err := freeLocalPort() + require.NoError(t, err) + assert.Greater(t, p, 0) +} + +func TestBuildFetcher_NeitherErrors(t *testing.T) { + _, err := buildFetcher("", execTarget{}, true) + assert.Error(t, err) +} diff --git a/test/parity/README.md b/test/parity/README.md new file mode 100644 index 00000000..43d3ea5b --- /dev/null +++ b/test/parity/README.md @@ -0,0 +1,58 @@ +# The parity check + +The parity check answers one question the customer ultimately cares about: + +> **Is the edge actually doing what it was told to do?** + +It compares two things — what the platform *intended* to program onto the edge, +and what the running proxy is *actually* serving — and reports any difference. +It's the tie-breaker described in [`docs/testing/README.md`](../../docs/testing/README.md), +backing up the real-traffic guarantees in +[`test/e2e/README.md`](../e2e/README.md). + +## Why real traffic isn't enough on its own + +A successful response is reassuring but ambiguous. If a firewall is supposed to +block an attack and the attack instead succeeds, the response alone can't tell +you *why*: is the firewall switched off, did the rule not match, did the edge +never pick up the new configuration, or was the configuration applied to the +wrong place? Worse, the most dangerous version of this failure is silent — a +firewall that protects nothing still lets every ordinary request through, so +everything looks healthy right up until a real attack arrives in production. + +The parity check removes the ambiguity. It looks past the response at the edge's +actual live configuration, so a surprising result becomes a specific, named +diagnosis instead of a guess. + +## What it catches + +It sorts any gap between intended and actual into the categories that map to +real incidents: + +| Diagnosis | What it means for the customer | +|---|---| +| **Missing** | The edge was told to add protection but carries none — the rule is inert. | +| **Incomplete** | Only part of what was intended made it — partial protection. | +| **Misplaced** | The right amount of configuration, applied to the wrong place. | +| **Rejected** | The edge refused the update outright and is serving stale config. | +| **Overflowed** | The configuration was too large to deliver and was silently dropped. | + +## How it fits the suite + +- **Real traffic is the verdict.** Did the edge behave correctly? That's the + pass/fail. +- **Parity is the diagnosis.** When the verdict surprises, parity says which of + the failures above you're in — and it catches the silent, protects-nothing + case that ordinary traffic can't reveal. + +The rule the suite holds to: parity supports a real-traffic test, it never +replaces one. A scenario that could send the attack and watch it be blocked does +exactly that; parity is added on top, never in place of the behavior itself. + +## Using it + +The check ships as a small command, `parity-check` (see +[`cmd/parity-check`](../../cmd/parity-check)), which the end-to-end suite runs +automatically as a shared step. It can also be pointed at a live edge on its own +to ask, "does this proxy match what it was told?" — useful when diagnosing an +edge that's misbehaving. diff --git a/test/parity/configdump.go b/test/parity/configdump.go new file mode 100644 index 00000000..aa2741be --- /dev/null +++ b/test/parity/configdump.go @@ -0,0 +1,147 @@ +package parity + +import ( + "fmt" + + adminv3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3" + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" +) + +// ConfigDump is the parsed view of the proxy's live configuration, with each +// resource decoded into its concrete type. +type ConfigDump struct { + Clusters []*clusterv3.Cluster + Routes []*routev3.RouteConfiguration + Listeners []*listenerv3.Listener + // SecretNames are the TLS certificates present, used to confirm that + // certificates we expected to be removed are actually gone. + SecretNames []string + // ErrorStates holds, per kind of configuration, the reasons the proxy gave + // for rejecting an individual resource. + ErrorStates map[string][]string +} + +// ParseConfigDump decodes the proxy's live configuration into typed resources. +// We decode against the real schema rather than poking at raw JSON so that a +// shape we don't recognize is a hard error, not a silent miss. +func ParseConfigDump(raw []byte) (*ConfigDump, error) { + var dump adminv3.ConfigDump + // Tolerate fields newer than this binary knows about, so a proxy running a + // newer version doesn't fail the parse. + if err := (protojson.UnmarshalOptions{DiscardUnknown: true}).Unmarshal(raw, &dump); err != nil { + return nil, fmt.Errorf("unmarshal config_dump envelope: %w", err) + } + + out := &ConfigDump{ErrorStates: map[string][]string{}} + + for i, cfgAny := range dump.GetConfigs() { + msg, err := cfgAny.UnmarshalNew() + if err != nil { + return nil, fmt.Errorf("unmarshal config_dump section %d (%s): %w", i, cfgAny.GetTypeUrl(), err) + } + switch section := msg.(type) { + case *adminv3.ClustersConfigDump: + if err := out.absorbClusters(section); err != nil { + return nil, err + } + case *adminv3.RoutesConfigDump: + if err := out.absorbRoutes(section); err != nil { + return nil, err + } + case *adminv3.ListenersConfigDump: + if err := out.absorbListeners(section); err != nil { + return nil, err + } + case *adminv3.SecretsConfigDump: + out.absorbSecrets(section) + default: + // Other sections aren't needed by the scanners. + } + } + return out, nil +} + +func (d *ConfigDump) absorbClusters(s *adminv3.ClustersConfigDump) error { + for _, dc := range s.GetDynamicActiveClusters() { + if es := dc.GetErrorState(); es != nil { + d.ErrorStates["cluster"] = append(d.ErrorStates["cluster"], es.GetDetails()) + } + cl := &clusterv3.Cluster{} + if err := unwrap(dc.GetCluster(), cl); err != nil { + return fmt.Errorf("unwrap dynamic cluster: %w", err) + } + if cl.GetName() != "" { + d.Clusters = append(d.Clusters, cl) + } + } + // Clusters still being applied may carry rejection details too. + for _, dc := range s.GetDynamicWarmingClusters() { + if es := dc.GetErrorState(); es != nil { + d.ErrorStates["cluster"] = append(d.ErrorStates["cluster"], es.GetDetails()) + } + } + return nil +} + +func (d *ConfigDump) absorbRoutes(s *adminv3.RoutesConfigDump) error { + for _, dr := range s.GetDynamicRouteConfigs() { + if es := dr.GetErrorState(); es != nil { + d.ErrorStates["route"] = append(d.ErrorStates["route"], es.GetDetails()) + } + rc := &routev3.RouteConfiguration{} + if err := unwrap(dr.GetRouteConfig(), rc); err != nil { + return fmt.Errorf("unwrap dynamic route config: %w", err) + } + if rc.GetName() != "" { + d.Routes = append(d.Routes, rc) + } + } + return nil +} + +func (d *ConfigDump) absorbListeners(s *adminv3.ListenersConfigDump) error { + for _, dl := range s.GetDynamicListeners() { + if es := dl.GetErrorState(); es != nil { + d.ErrorStates["listener"] = append(d.ErrorStates["listener"], es.GetDetails()) + } + // Scan the listener the proxy is currently serving, not one still being + // applied. + active := dl.GetActiveState() + if active == nil { + continue + } + l := &listenerv3.Listener{} + if err := unwrap(active.GetListener(), l); err != nil { + return fmt.Errorf("unwrap dynamic listener: %w", err) + } + if l.GetName() != "" { + d.Listeners = append(d.Listeners, l) + } + } + return nil +} + +func (d *ConfigDump) absorbSecrets(s *adminv3.SecretsConfigDump) { + for _, ds := range s.GetDynamicActiveSecrets() { + if es := ds.GetErrorState(); es != nil { + d.ErrorStates["secret"] = append(d.ErrorStates["secret"], es.GetDetails()) + } + if n := ds.GetName(); n != "" { + d.SecretNames = append(d.SecretNames, n) + } + } +} + +// unwrap decodes a wrapped resource into dst. A nil input is not an error (the +// resource is simply absent); dst is left zero. +func unwrap(a *anypb.Any, dst proto.Message) error { + if a == nil { + return nil + } + return a.UnmarshalTo(dst) +} diff --git a/test/parity/configdump_test.go b/test/parity/configdump_test.go new file mode 100644 index 00000000..68085fdc --- /dev/null +++ b/test/parity/configdump_test.go @@ -0,0 +1,185 @@ +package parity + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + adminv3 "github.com/envoyproxy/go-control-plane/envoy/admin/v3" + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + hcmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + internalupstreamv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/internal_upstream/v3" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" +) + +func mustAny(t *testing.T, m proto.Message) *anypb.Any { + t.Helper() + a, err := anypb.New(m) + require.NoError(t, err) + return a +} + +// buildSyntheticDump builds a proxy configuration that looks like the real +// thing, so parsing and scanning can be exercised without a live cluster. +func buildSyntheticDump(t *testing.T, corazaFilter string) []byte { + t.Helper() + + // --- WAF-governed route + CONNECT route --- + wafMeta, err := structpb.NewStruct(map[string]any{ + "resources": []any{map[string]any{ + "kind": "TrafficProtectionPolicy", "namespace": "proj-ns", "name": "test-tpp", "mode": "Observe", + }}, + }) + require.NoError(t, err) + rc := &routev3.RouteConfiguration{ + Name: "gw/https", + VirtualHosts: []*routev3.VirtualHost{{ + Name: "vh", + Domains: []string{"app.example.com", "vh.connector.internal"}, + Routes: []*routev3.Route{ + { + Name: "connector-connect-vh", + Action: &routev3.Route_Route{Route: &routev3.RouteAction{ + UpgradeConfigs: []*routev3.RouteAction_UpgradeConfig{{UpgradeType: "CONNECT"}}, + }}, + }, + { + Name: "fwd", + Metadata: &corev3.Metadata{FilterMetadata: map[string]*structpb.Struct{datumGatewayMetaKey: wafMeta}}, + }, + }, + }}, + } + + // --- replaced connector cluster --- + ts := mustAny(t, &internalupstreamv3.InternalUpstreamTransport{}) + cl := &clusterv3.Cluster{ + Name: "httproute/proj-ns/proxy/rule/0", + ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_STATIC}, + LoadAssignment: &endpointv3.ClusterLoadAssignment{ClusterName: "httproute/proj-ns/proxy/rule/0"}, + TransportSocket: &corev3.TransportSocket{ + Name: connectorInternalTransport, + ConfigType: &corev3.TransportSocket_TypedConfig{TypedConfig: ts}, + }, + } + infraCl := &clusterv3.Cluster{ + Name: "infra", + ClusterDiscoveryType: &clusterv3.Cluster_Type{Type: clusterv3.Cluster_EDS}, + } + + // --- listener with the firewall and a custom error page --- + hcm := &hcmv3.HttpConnectionManager{ + RouteSpecifier: &hcmv3.HttpConnectionManager_Rds{Rds: &hcmv3.Rds{RouteConfigName: "gw/https"}}, + HttpFilters: []*hcmv3.HttpFilter{{Name: corazaFilter, Disabled: true}, {Name: "envoy.filters.http.router"}}, + LocalReplyConfig: &hcmv3.LocalReplyConfig{}, + } + l := &listenerv3.Listener{ + Name: "gw/https", + FilterChains: []*listenerv3.FilterChain{{ + Name: "fc", + Filters: []*listenerv3.Filter{{ + Name: hcmNetworkFilterName, + ConfigType: &listenerv3.Filter_TypedConfig{TypedConfig: mustAny(t, hcm)}, + }}, + }}, + } + + dump := &adminv3.ConfigDump{ + Configs: []*anypb.Any{ + mustAny(t, &adminv3.ClustersConfigDump{ + DynamicActiveClusters: []*adminv3.ClustersConfigDump_DynamicCluster{ + {Cluster: mustAny(t, cl)}, + {Cluster: mustAny(t, infraCl)}, + }, + }), + mustAny(t, &adminv3.RoutesConfigDump{ + DynamicRouteConfigs: []*adminv3.RoutesConfigDump_DynamicRouteConfig{{RouteConfig: mustAny(t, rc)}}, + }), + mustAny(t, &adminv3.ListenersConfigDump{ + DynamicListeners: []*adminv3.ListenersConfigDump_DynamicListener{ + {Name: "gw/https", ActiveState: &adminv3.ListenersConfigDump_DynamicListenerState{Listener: mustAny(t, l)}}, + }, + }), + mustAny(t, &adminv3.SecretsConfigDump{ + DynamicActiveSecrets: []*adminv3.SecretsConfigDump_DynamicSecret{{Name: "good-cert"}}, + }), + }, + } + + raw, err := protojson.Marshal(dump) + require.NoError(t, err) + return raw +} + +func TestParseAndScan_EndToEnd(t *testing.T) { + const corazaFilter = "coraza-waf" + raw := buildSyntheticDump(t, corazaFilter) + + dump, err := ParseConfigDump(raw) + require.NoError(t, err) + + assert.Len(t, dump.Clusters, 2) + assert.Len(t, dump.Routes, 1) + assert.Len(t, dump.Listeners, 1) + assert.Equal(t, []string{"good-cert"}, dump.SecretNames) + + act := ScanActual(dump, corazaFilter, nil) + assert.Equal(t, []string{wafRouteKey("gw/https", "vh", "fwd", "proj-ns", "test-tpp", "Observe")}, + normalize(act.Keys[FamilyWAFRoute])) + assert.Equal(t, []string{"httproute/proj-ns/proxy/rule/0"}, normalize(act.Keys[FamilyConnectorCluster])) + assert.Equal(t, []string{connectorRouteKey("gw/https", "vh", "connector-connect-vh")}, + normalize(act.Keys[FamilyConnectorRoute])) + assert.Equal(t, []string{listenerChainKey("gw/https", "fc")}, normalize(act.Keys[FamilyWAFHCM])) + assert.Equal(t, []string{listenerChainKey("gw/https", "fc")}, normalize(act.Keys[FamilyLocalReply])) +} + +// TestParseAndScan_FullParity is the integration shape: the same key formats are +// produced by the (simulated) ext-server side and the dump side, so Compare +// returns OK — proving the two sides are in lockstep. +func TestParseAndScan_FullParity(t *testing.T) { + const corazaFilter = "coraza-waf" + raw := buildSyntheticDump(t, corazaFilter) + dump, err := ParseConfigDump(raw) + require.NoError(t, err) + act := ScanActual(dump, corazaFilter, nil) + + // Expected derived from the same artifact set (as the programmed-set endpoint + // would report it). + expected := exp(map[Family][]string{ + FamilyWAFRoute: {wafRouteKey("gw/https", "vh", "fwd", "proj-ns", "test-tpp", "Observe")}, + FamilyConnectorCluster: {"httproute/proj-ns/proxy/rule/0"}, + FamilyConnectorRoute: {connectorRouteKey("gw/https", "vh", "connector-connect-vh")}, + FamilyWAFHCM: {listenerChainKey("gw/https", "fc")}, + FamilyLocalReply: {listenerChainKey("gw/https", "fc")}, + }) + + rep := Compare(expected, act) + rep.AssertOK(t) +} + +// TestParseConfigDump_NACKErrorState covers a single resource the proxy +// rejected while keeping the rest. +func TestParseConfigDump_NACKErrorState(t *testing.T) { + dump := &adminv3.ConfigDump{Configs: []*anypb.Any{ + mustAny(t, &adminv3.ListenersConfigDump{ + DynamicListeners: []*adminv3.ListenersConfigDump_DynamicListener{ + {Name: "bad", ErrorState: &adminv3.UpdateFailureState{Details: "KEY_VALUES_MISMATCH"}}, + }, + }), + }} + raw, err := protojson.Marshal(dump) + require.NoError(t, err) + + parsed, err := ParseConfigDump(raw) + require.NoError(t, err) + require.Contains(t, parsed.ErrorStates, "listener") + assert.Contains(t, parsed.ErrorStates["listener"][0], "KEY_VALUES_MISMATCH") +} diff --git a/test/parity/fetch.go b/test/parity/fetch.go new file mode 100644 index 00000000..95da96f4 --- /dev/null +++ b/test/parity/fetch.go @@ -0,0 +1,272 @@ +package parity + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sort" + "strconv" + "strings" +) + +// Fetcher retrieves a response body from an endpoint. It is an interface because +// different environments reach the proxy differently: some can make a plain HTTP +// request, others must tunnel into the proxy pod first. +type Fetcher interface { + Fetch(ctx context.Context, path string) ([]byte, error) +} + +// HTTPFetcher fetches over plain HTTP from a base URL, for endpoints reachable +// directly without tunneling into a pod. +type HTTPFetcher struct { + BaseURL string // e.g. "http://127.0.0.1:19000" + Client *http.Client +} + +func (h HTTPFetcher) Fetch(ctx context.Context, path string) ([]byte, error) { + client := h.Client + if client == nil { + client = http.DefaultClient + } + url := strings.TrimRight(h.BaseURL, "/") + path + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("GET %s: %w", url, err) + } + defer func() { _ = resp.Body.Close() }() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read %s: %w", url, err) + } + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("GET %s: status %d: %s", url, resp.StatusCode, snippet(body)) + } + return body, nil +} + +// FetchExpected reads the configuration the extension server says it intended to +// program, which is the reference we compare the live proxy against. +func FetchExpected(ctx context.Context, extFetcher Fetcher) (Expected, error) { + body, err := extFetcher.Fetch(ctx, ProgrammedSetPath) + if err != nil { + return Expected{}, fmt.Errorf("fetch programmed-set: %w", err) + } + var exp Expected + if err := json.Unmarshal(body, &exp); err != nil { + return Expected{}, fmt.Errorf("decode programmed-set JSON: %w", err) + } + if exp.Keys == nil { + exp.Keys = map[Family][]string{} + } + if exp.Counts == nil { + exp.Counts = map[Family]int{} + } + return exp, nil +} + +// ProgrammedSetPath is duplicated from the extension server rather than imported +// so this package stays free of any internal/ dependency; the two MUST stay in sync. +const ProgrammedSetPath = "/debug/programmed-set" + +// ExpectedSource pairs a fetched Expected set with the replica it came from, so +// the caller can report which replica was authoritative. +type ExpectedSource struct { + Replica string + Expected Expected +} + +// FetchExpectedFromMany queries every extension server replica and returns the +// authoritative one. Only one replica actually builds configuration at a time, +// so querying a fixed replica can hit an idle one and falsely report missing or +// stale configuration. +// +// The replica with the highest build count wins, since that count only ever +// increases on the replica doing the work and idle replicas sit at zero. A tie +// (no replica has built yet) resolves to the first replica so the caller still +// gets a valid empty result instead of an error. +// +// An unreachable replica is skipped rather than fatal, as long as at least one +// replica answered; its error is returned for diagnosis. +func FetchExpectedFromMany(ctx context.Context, fetchers map[string]Fetcher) (ExpectedSource, map[string]error, error) { + if len(fetchers) == 0 { + return ExpectedSource{}, nil, fmt.Errorf("no ext-server replicas to query") + } + + errs := map[string]error{} + var best *ExpectedSource + // Stable order so ties resolve deterministically. + for _, replica := range sortedKeys(fetchers) { + exp, err := FetchExpected(ctx, fetchers[replica]) + if err != nil { + errs[replica] = err + continue + } + src := ExpectedSource{Replica: replica, Expected: exp} + if best == nil || exp.BuildID > best.Expected.BuildID { + s := src + best = &s + } + } + + if best == nil { + return ExpectedSource{}, errs, fmt.Errorf("all %d ext-server replicas failed to answer", len(fetchers)) + } + return *best, errs, nil +} + +// sortedKeys returns the map keys sorted, for deterministic iteration. +func sortedKeys(m map[string]Fetcher) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +// FetchActual reads the proxy's live configuration and scans it for the +// artifacts of each change family. corazaFilterName identifies the firewall +// filter; droppedSecrets names the TLS certificates that should have been +// removed, so we can confirm none are still present. +// +// adminFetcher reaches the proxy; extFetcher reaches the extension server to +// check whether any update was too large to deliver. Pass a nil extFetcher to +// skip that check. +func FetchActual( + ctx context.Context, + adminFetcher Fetcher, + extFetcher Fetcher, + corazaFilterName string, + droppedSecrets []string, +) (Actual, *ConfigDump, error) { + dumpRaw, err := adminFetcher.Fetch(ctx, "/config_dump") + if err != nil { + return Actual{}, nil, fmt.Errorf("fetch config_dump: %w", err) + } + dump, err := ParseConfigDump(dumpRaw) + if err != nil { + return Actual{}, nil, err + } + + act := ScanActual(dump, corazaFilterName, droppedSecrets) + + // Counts of updates the proxy rejected. + statsRaw, err := adminFetcher.Fetch(ctx, "/stats?format=prometheus") + if err != nil { + // Best-effort: the comparison still works without these counts, so a + // failure here yields an empty map rather than aborting. + statsRaw = nil + } + act.XDSRejected = scrapeXDSRejected(statsRaw) + + // Whether any update was too large to deliver. + if extFetcher != nil { + if metricsRaw, mErr := extFetcher.Fetch(ctx, "/metrics"); mErr == nil { + act.ResourceExhausted = scrapeResourceExhausted(metricsRaw) + } + } + return act, dump, nil +} + +// snippet returns the first line of b for error messages. +func snippet(b []byte) string { + s := string(b) + if i := strings.IndexByte(s, '\n'); i >= 0 { + s = s[:i] + } + if len(s) > 200 { + s = s[:200] + "..." + } + return s +} + +// scrapeXDSRejected sums the proxy's rejected-update counters, grouped by the +// kind of configuration. Route counters are per-route so they are matched by +// substring rather than exact name. +func scrapeXDSRejected(stats []byte) map[string]int { + out := map[string]int{} + if len(stats) == 0 { + return out + } + for line := range strings.SplitSeq(string(stats), "\n") { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if !strings.Contains(line, "update_rejected") { + continue + } + name, val := splitMetricLine(line) + if val == 0 { + continue + } + fam := classifyXDS(name) + out[fam] += val + } + return out +} + +// classifyXDS groups a metric by the kind of configuration it counts. The +// proxy rewrites dots to underscores in metric names, so we match on substrings. +func classifyXDS(name string) string { + switch { + case strings.Contains(name, "lds"): + return "lds" + case strings.Contains(name, "cds"): + return "cds" + case strings.Contains(name, "rds"): + return "rds" + case strings.Contains(name, "sds"): + return "sds" + default: + return "other" + } +} + +// scrapeResourceExhausted counts how often the extension server failed to +// deliver an update because it was too large. A non-zero result means +// configuration was truncated. +func scrapeResourceExhausted(metrics []byte) int { + if len(metrics) == 0 { + return 0 + } + total := 0 + for line := range strings.SplitSeq(string(metrics), "\n") { + line = strings.TrimSpace(line) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + if !strings.HasPrefix(line, "grpc_server_handled_total") { + continue + } + if !strings.Contains(line, `grpc_code="ResourceExhausted"`) { + continue + } + _, val := splitMetricLine(line) + total += val + } + return total +} + +// splitMetricLine parses one metrics line "name{labels} value" into the name +// (without labels) and the integer value (floats are truncated; non-numeric +// yields 0). +func splitMetricLine(line string) (name string, value int) { + fields := strings.Fields(line) + if len(fields) < 2 { + return "", 0 + } + valStr := fields[len(fields)-1] + nameWithLabels := strings.Join(fields[:len(fields)-1], " ") + name, _, _ = strings.Cut(nameWithLabels, "{") + if f, err := strconv.ParseFloat(valStr, 64); err == nil { + value = int(f) + } + return name, value +} diff --git a/test/parity/fetch_test.go b/test/parity/fetch_test.go new file mode 100644 index 00000000..2ec8d1ea --- /dev/null +++ b/test/parity/fetch_test.go @@ -0,0 +1,90 @@ +package parity + +import ( + "context" + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// stubFetcher returns a fixed programmed-set JSON body (or error) for the +// programmed-set path. Models one ext-server replica. +type stubFetcher struct { + buildID int + err error +} + +func (s stubFetcher) Fetch(_ context.Context, path string) ([]byte, error) { + if s.err != nil { + return nil, s.err + } + if path != ProgrammedSetPath { + return nil, fmt.Errorf("unexpected path %s", path) + } + body := fmt.Sprintf(`{"buildID":%d,"keys":{"waf_route":["k%d"]},"counts":{"waf_route":1}}`, + s.buildID, s.buildID) + return []byte(body), nil +} + +// TestFetchExpectedFromMany_PicksHighestBuildID is the core multi-replica rule: +// EG pins its gRPC connection to one replica, so only that pod has the real +// (highest-BuildID) programmed-set; the idle replica sits at 0. +func TestFetchExpectedFromMany_PicksHighestBuildID(t *testing.T) { + fetchers := map[string]Fetcher{ + "ext-replica-a": stubFetcher{buildID: 0}, // idle replica + "ext-replica-b": stubFetcher{buildID: 7}, // active replica + } + src, errs, err := FetchExpectedFromMany(context.Background(), fetchers) + require.NoError(t, err) + assert.Empty(t, errs) + assert.Equal(t, "ext-replica-b", src.Replica, "must pick the active (highest-BuildID) replica") + assert.Equal(t, uint64(7), src.Expected.BuildID) + assert.Equal(t, []string{"k7"}, src.Expected.Keys[FamilyWAFRoute]) +} + +// TestFetchExpectedFromMany_OneUnreachable: one replica down is not fatal as +// long as another answers; the error is surfaced, not returned. +func TestFetchExpectedFromMany_OneUnreachable(t *testing.T) { + fetchers := map[string]Fetcher{ + "ext-replica-a": stubFetcher{err: errors.New("connection refused")}, + "ext-replica-b": stubFetcher{buildID: 3}, + } + src, errs, err := FetchExpectedFromMany(context.Background(), fetchers) + require.NoError(t, err) + assert.Equal(t, "ext-replica-b", src.Replica) + require.Contains(t, errs, "ext-replica-a") + assert.ErrorContains(t, errs["ext-replica-a"], "connection refused") +} + +// TestFetchExpectedFromMany_AllFail returns an error only when no replica answers. +func TestFetchExpectedFromMany_AllFail(t *testing.T) { + fetchers := map[string]Fetcher{ + "a": stubFetcher{err: errors.New("down")}, + "b": stubFetcher{err: errors.New("down")}, + } + _, errs, err := FetchExpectedFromMany(context.Background(), fetchers) + require.Error(t, err) + assert.Len(t, errs, 2) +} + +// TestFetchExpectedFromMany_TieResolvesDeterministically: both at 0 (no build +// anywhere yet) resolves to the first replica in sorted order, returning a +// well-formed empty Expected rather than an error. +func TestFetchExpectedFromMany_TieResolvesDeterministically(t *testing.T) { + fetchers := map[string]Fetcher{ + "ext-z": stubFetcher{buildID: 0}, + "ext-a": stubFetcher{buildID: 0}, + } + src, _, err := FetchExpectedFromMany(context.Background(), fetchers) + require.NoError(t, err) + assert.Equal(t, "ext-a", src.Replica, "ties resolve to the first replica in sorted order") + assert.Equal(t, uint64(0), src.Expected.BuildID) +} + +func TestFetchExpectedFromMany_Empty(t *testing.T) { + _, _, err := FetchExpectedFromMany(context.Background(), nil) + require.Error(t, err) +} diff --git a/test/parity/parity.go b/test/parity/parity.go new file mode 100644 index 00000000..26ee8198 --- /dev/null +++ b/test/parity/parity.go @@ -0,0 +1,313 @@ +// Package parity answers one question: is the edge proxy actually running the +// configuration the platform tried to give it? A successful-looking request can +// hide configuration that never applied, applied only partly, or applied in the +// wrong place — so we compare what was intended against what the proxy is live +// serving and name the gap, turning a surprising result into a cause instead of +// a guess. +package parity + +import ( + "fmt" + "sort" + "strings" + "testing" +) + +// Family groups one kind of edge change so intended and live can be compared +// kind by kind. The values match the names the extension server reports, so its +// output decodes straight into these. +type Family string + +const ( + FamilyWAFRoute Family = "waf_route" + FamilyWAFHCM Family = "waf_hcm" + FamilyLocalReply Family = "local_reply" + FamilyConnectorCluster Family = "connector_cluster" + FamilyConnectorRoute Family = "connector_route" + FamilyConnectorOffline Family = "connector_offline" + FamilyTLSPrune Family = "tls_prune" +) + +// AllFamilies is the canonical iteration order for reports. +var AllFamilies = []Family{ + FamilyWAFRoute, FamilyWAFHCM, FamilyLocalReply, + FamilyConnectorCluster, FamilyConnectorRoute, FamilyConnectorOffline, + FamilyTLSPrune, +} + +// FailureClass names how a comparison differed. The values are stable because +// reports and logs are read by people and tooling outside this package. +type FailureClass string + +const ( + ClassOK FailureClass = "OK" + ClassMissing FailureClass = "MISSING" + ClassCountMismatch FailureClass = "COUNT_MISMATCH" + ClassWrongKeyed FailureClass = "WRONG_KEYED" + ClassNACK FailureClass = "NACK" + ClassCeilingTruncation FailureClass = "CEILING_TRUNCATION" +) + +// Expected is the set of changes the extension server reports it last applied. +type Expected struct { + BuildID uint64 `json:"buildID"` + Keys map[Family][]string `json:"keys"` + Counts map[Family]int `json:"counts"` + TLSPrunedChains int `json:"tlsPrunedChains"` + TLSPrunedSecrets int `json:"tlsPrunedSecrets"` + TLSListenersLeftIntact int `json:"tlsListenersLeftIntact"` +} + +// Actual is what the edge proxy is really running, read back from its live +// configuration and its record of rejected or undelivered updates. +type Actual struct { + // Keys must be formatted exactly as the extension server formats them, so the + // two sides can be compared as sets. + Keys map[Family][]string + // Certificates the extension server says it removed that are still present on + // the proxy — a removal that didn't take effect. + TLSDroppedSecretsStillPresent []string + // Updates the proxy rejected, by kind. Any rejection means the proxy kept + // older configuration instead. + XDSRejected map[string]int + // Count of updates too large to deliver, which the proxy silently never sees. + ResourceExhausted int +} + +// FamilyResult is one family's comparison outcome. +type FamilyResult struct { + Family Family `json:"family"` + Class FailureClass `json:"class"` + ExpectedCount int `json:"expectedCount"` + ActualCount int `json:"actualCount"` + // Expected but absent from the proxy. + MissingKeys []string `json:"missingKeys,omitempty"` + // Present on the proxy but not expected — the other side of a wrong-place match. + UnexpectedKeys []string `json:"unexpectedKeys,omitempty"` + // Plain-language explanation for differences that aren't about keys. + Detail string `json:"detail,omitempty"` +} + +// ParityReport is the full result of a comparison — never just pass/fail, so a +// reader can see which change differed, how, and the specific keys involved. +type ParityReport struct { + ExpectedBuildID uint64 `json:"expectedBuildID"` + Families []FamilyResult `json:"families"` + // Update kinds the proxy rejected. + XDSRejections map[string]int `json:"xdsRejections,omitempty"` + // Non-zero when updates were too large to deliver. + ResourceExhausted int `json:"resourceExhausted,omitempty"` +} + +// OK reports whether everything matched and the proxy accepted every update. +func (r ParityReport) OK() bool { + if len(r.XDSRejections) > 0 || r.ResourceExhausted > 0 { + return false + } + for _, f := range r.Families { + if f.Class != ClassOK { + return false + } + } + return true +} + +// Failures lists every problem — mismatched changes plus any rejected or +// undelivered updates — so a caller can show them all at once. +func (r ParityReport) Failures() []FamilyResult { + var out []FamilyResult + for _, f := range r.Families { + if f.Class != ClassOK { + out = append(out, f) + } + } + for xds, n := range r.XDSRejections { + out = append(out, FamilyResult{ + Class: ClassNACK, + Detail: fmt.Sprintf("xDS %s update_rejected=%d (Envoy rejected the snapshot)", xds, n), + }) + } + if r.ResourceExhausted > 0 { + out = append(out, FamilyResult{ + Class: ClassCeilingTruncation, + Detail: fmt.Sprintf("ext-server gRPC ResourceExhausted=%d (message-ceiling truncation)", r.ResourceExhausted), + }) + } + return out +} + +// AssertOK fails the test with the full report when the comparison did not pass. +func (r ParityReport) AssertOK(t testing.TB) { + t.Helper() + if !r.OK() { + t.Errorf("config-dump parity gate FAILED:\n%s", r.String()) + } +} + +// String renders a compact, readable summary. +func (r ParityReport) String() string { + var b strings.Builder + fmt.Fprintf(&b, "parity report (expected build %d): %s\n", r.ExpectedBuildID, statusWord(r.OK())) + for _, f := range r.Families { + fmt.Fprintf(&b, " %-18s %-15s expected=%d actual=%d", f.Family, f.Class, f.ExpectedCount, f.ActualCount) + if len(f.MissingKeys) > 0 { + fmt.Fprintf(&b, " missing=%v", truncateKeys(f.MissingKeys)) + } + if len(f.UnexpectedKeys) > 0 { + fmt.Fprintf(&b, " unexpected=%v", truncateKeys(f.UnexpectedKeys)) + } + if f.Detail != "" { + fmt.Fprintf(&b, " (%s)", f.Detail) + } + b.WriteByte('\n') + } + for xds, n := range r.XDSRejections { + fmt.Fprintf(&b, " NACK: xDS %s update_rejected=%d\n", xds, n) + } + if r.ResourceExhausted > 0 { + fmt.Fprintf(&b, " CEILING_TRUNCATION: gRPC ResourceExhausted=%d\n", r.ResourceExhausted) + } + return b.String() +} + +func statusWord(ok bool) string { + if ok { + return "PASS" + } + return "FAIL" +} + +// truncateKeys caps a key list for log readability. +func truncateKeys(keys []string) []string { + const max = 5 + if len(keys) <= max { + return keys + } + out := append([]string{}, keys[:max]...) + return append(out, fmt.Sprintf("...(+%d more)", len(keys)-max)) +} + +// Compare diffs intended against live and classifies each kind of change. The +// key-by-key match is the important one: it is the only way to catch a change +// that quietly applied to the wrong place. +func Compare(exp Expected, act Actual) ParityReport { + report := ParityReport{ExpectedBuildID: exp.BuildID} + + for _, fam := range AllFamilies { + report.Families = append(report.Families, compareFamily(fam, exp, act)) + } + + // Record any updates the proxy rejected. + for xds, n := range act.XDSRejected { + if n > 0 { + if report.XDSRejections == nil { + report.XDSRejections = map[string]int{} + } + report.XDSRejections[xds] = n + } + } + report.ResourceExhausted = act.ResourceExhausted + return report +} + +// compareFamily classifies one kind of change. Removed certificates are the +// exception: success there means the certificate is gone, so its "actual" is how +// many that should be absent are still present. +func compareFamily(fam Family, exp Expected, act Actual) FamilyResult { + if fam == FamilyTLSPrune { + return compareTLSPrune(exp, act) + } + + expKeys := normalize(exp.Keys[fam]) + actKeys := normalize(act.Keys[fam]) + res := FamilyResult{ + Family: fam, + ExpectedCount: len(expKeys), + ActualCount: len(actKeys), + } + + switch { + case len(expKeys) == 0: + // Nothing expected — any actual is not this gate's concern (the gate + // asserts what the ext-server INTENDED is present, not the inverse). + res.Class = ClassOK + case len(actKeys) == 0: + res.Class = ClassMissing + res.MissingKeys = expKeys + res.Detail = "expected mutations but the config_dump carries none (inert / never-applied)" + case len(actKeys) < len(expKeys): + res.Class = ClassCountMismatch + res.MissingKeys = setDiff(expKeys, actKeys) + res.Detail = "config_dump carries fewer than expected (truncated / partial publish)" + default: + missing := setDiff(expKeys, actKeys) + if len(missing) == 0 { + res.Class = ClassOK + } else { + // Right number of changes, but not the ones we expected — something + // applied in the wrong place. + res.Class = ClassWrongKeyed + res.MissingKeys = missing + res.UnexpectedKeys = setDiff(actKeys, expKeys) + res.Detail = "expected key set not present in the config_dump (applied to wrong route/gateway)" + } + } + return res +} + +// compareTLSPrune checks the opposite of the others: a certificate the extension +// server says it removed must be gone from the proxy. One still present means the +// removal never took effect. +func compareTLSPrune(exp Expected, act Actual) FamilyResult { + res := FamilyResult{ + Family: FamilyTLSPrune, + ExpectedCount: exp.TLSPrunedSecrets, + ActualCount: len(act.TLSDroppedSecretsStillPresent), + } + if exp.TLSPrunedSecrets == 0 { + res.Class = ClassOK + return res + } + if len(act.TLSDroppedSecretsStillPresent) > 0 { + res.Class = ClassMissing // a removal that didn't take effect + res.UnexpectedKeys = normalize(act.TLSDroppedSecretsStillPresent) + res.Detail = "pruned TLS secret(s) still present in the config_dump (prune did not take effect)" + return res + } + res.Class = ClassOK + return res +} + +// normalize returns a sorted, de-duplicated copy of keys (nil-safe). +func normalize(keys []string) []string { + if len(keys) == 0 { + return nil + } + seen := make(map[string]struct{}, len(keys)) + out := make([]string, 0, len(keys)) + for _, k := range keys { + if _, ok := seen[k]; ok { + continue + } + seen[k] = struct{}{} + out = append(out, k) + } + sort.Strings(out) + return out +} + +// setDiff returns the elements of a not present in b. Both inputs must be +// normalized (sorted, deduped). +func setDiff(a, b []string) []string { + bset := make(map[string]struct{}, len(b)) + for _, x := range b { + bset[x] = struct{}{} + } + var out []string + for _, x := range a { + if _, ok := bset[x]; !ok { + out = append(out, x) + } + } + return out +} diff --git a/test/parity/parity_test.go b/test/parity/parity_test.go new file mode 100644 index 00000000..762e9ef4 --- /dev/null +++ b/test/parity/parity_test.go @@ -0,0 +1,154 @@ +package parity + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func exp(keys map[Family][]string) Expected { + e := Expected{BuildID: 1, Keys: keys, Counts: map[Family]int{}} + for f, k := range keys { + e.Counts[f] = len(k) + } + return e +} + +func actual(keys map[Family][]string) Actual { + return Actual{Keys: keys} +} + +func TestCompare_OK(t *testing.T) { + keys := map[Family][]string{ + FamilyWAFRoute: {"rc##vh##fwd##ns/tpp/Observe"}, + FamilyConnectorCluster: {"httproute/ns/proxy/rule/0"}, + } + rep := Compare(exp(keys), actual(keys)) + assert.True(t, rep.OK(), rep.String()) +} + +// TestCompare_WAFDisabledIsNotMissing covers the case where the firewall is +// turned off: the extension server intended no firewall changes and the proxy +// has none. That must pass — the comparison asks whether what was intended is +// present, and nothing was intended. +func TestCompare_WAFDisabledIsNotMissing(t *testing.T) { + want := map[Family][]string{ + FamilyConnectorCluster: {"httproute/ns/proxy/rule/0"}, + FamilyLocalReply: {"l##fc"}, + } + got := map[Family][]string{ + FamilyConnectorCluster: {"httproute/ns/proxy/rule/0"}, + FamilyLocalReply: {"l##fc"}, + } + rep := Compare(exp(want), actual(got)) + assert.True(t, rep.OK(), rep.String()) + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyWAFRoute).Class, "zero-expected WAF route must be OK, not MISSING") + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyWAFHCM).Class, "zero-expected WAF HCM must be OK, not MISSING") + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyConnectorCluster).Class) + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyLocalReply).Class) +} + +func TestCompare_Missing(t *testing.T) { + want := map[Family][]string{FamilyWAFRoute: {"rc##vh##fwd##ns/tpp/Observe"}} + rep := Compare(exp(want), actual(nil)) + r := familyResult(t, rep, FamilyWAFRoute) + assert.Equal(t, ClassMissing, r.Class) + assert.Equal(t, want[FamilyWAFRoute], r.MissingKeys) + assert.False(t, rep.OK()) +} + +func TestCompare_CountMismatch(t *testing.T) { + want := map[Family][]string{FamilyWAFRoute: {"a", "b", "c"}} + got := map[Family][]string{FamilyWAFRoute: {"a", "b"}} + rep := Compare(exp(want), actual(got)) + r := familyResult(t, rep, FamilyWAFRoute) + assert.Equal(t, ClassCountMismatch, r.Class) + assert.Equal(t, []string{"c"}, r.MissingKeys) +} + +// TestCompare_WrongKeyed covers the case only this comparison can catch: the +// right number of changes, but applied to the wrong place. +func TestCompare_WrongKeyed(t *testing.T) { + want := map[Family][]string{FamilyWAFRoute: {"rc##vh##fwd##nsA/tpp/Observe"}} + got := map[Family][]string{FamilyWAFRoute: {"rc##vh##fwd##nsB/tpp/Observe"}} + rep := Compare(exp(want), actual(got)) + r := familyResult(t, rep, FamilyWAFRoute) + assert.Equal(t, ClassWrongKeyed, r.Class) + assert.Equal(t, r.ExpectedCount, r.ActualCount, "counts equal — count-only gate would pass") + assert.Equal(t, []string{"rc##vh##fwd##nsA/tpp/Observe"}, r.MissingKeys) + assert.Equal(t, []string{"rc##vh##fwd##nsB/tpp/Observe"}, r.UnexpectedKeys) +} + +func TestCompare_NACK(t *testing.T) { + a := actual(nil) + a.XDSRejected = map[string]int{"lds": 1, "rds": 0} + rep := Compare(exp(nil), a) + assert.False(t, rep.OK()) + require.Contains(t, rep.XDSRejections, "lds") + assert.NotContains(t, rep.XDSRejections, "rds", "zero-delta types must not be reported") +} + +func TestCompare_CeilingTruncation(t *testing.T) { + a := actual(nil) + a.ResourceExhausted = 3 + rep := Compare(exp(nil), a) + assert.False(t, rep.OK()) + assert.Equal(t, 3, rep.ResourceExhausted) + classes := map[FailureClass]bool{} + for _, f := range rep.Failures() { + classes[f.Class] = true + } + assert.True(t, classes[ClassCeilingTruncation]) +} + +func TestCompareTLSPrune_NegativeAssertion(t *testing.T) { + e := Expected{BuildID: 1, Keys: map[Family][]string{}, Counts: map[Family]int{}, TLSPrunedSecrets: 2} + + // Pass: no pruned secret remains in the dump. + rep := Compare(e, Actual{Keys: map[Family][]string{}}) + assert.Equal(t, ClassOK, familyResult(t, rep, FamilyTLSPrune).Class) + + // Fail: a pruned secret is still present. + a := Actual{Keys: map[Family][]string{}, TLSDroppedSecretsStillPresent: []string{"bad-cert"}} + rep2 := Compare(e, a) + r := familyResult(t, rep2, FamilyTLSPrune) + assert.Equal(t, ClassMissing, r.Class) + assert.Equal(t, []string{"bad-cert"}, r.UnexpectedKeys) +} + +func familyResult(t *testing.T, rep ParityReport, fam Family) FamilyResult { + t.Helper() + for _, f := range rep.Families { + if f.Family == fam { + return f + } + } + t.Fatalf("family %s not in report", fam) + return FamilyResult{} +} + +// --- stats scrape --- + +func TestScrapeXDSRejected(t *testing.T) { + stats := []byte(` +# HELP envoy_listener_manager_lds_update_rejected +envoy_listener_manager_lds_update_rejected 2 +envoy_cluster_manager_cds_update_rejected 0 +envoy_http_foo_rds_bar_update_rejected 1 +envoy_listener_manager_lds_update_success 40 +`) + got := scrapeXDSRejected(stats) + assert.Equal(t, 2, got["lds"]) + assert.Equal(t, 1, got["rds"]) + _, hasCDS := got["cds"] + assert.False(t, hasCDS, "zero-value counters must be skipped") +} + +func TestScrapeResourceExhausted(t *testing.T) { + metrics := []byte(` +grpc_server_handled_total{grpc_code="OK",grpc_method="PostTranslateModify"} 100 +grpc_server_handled_total{grpc_code="ResourceExhausted",grpc_method="PostTranslateModify"} 4 +`) + assert.Equal(t, 4, scrapeResourceExhausted(metrics)) +} diff --git a/test/parity/register.go b/test/parity/register.go new file mode 100644 index 00000000..eb2ef36b --- /dev/null +++ b/test/parity/register.go @@ -0,0 +1,21 @@ +package parity + +// These message types can appear nested inside the proxy's configuration, and +// each must be linked into the binary or parsing that configuration fails. Add +// to this list whenever a new dump surfaces a type the parser can't resolve. +import ( + _ "github.com/cncf/xds/go/xds/type/v3" + _ "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/filters/http/golang/v3alpha" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/health_check/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/least_request/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/resource_monitors/downstream_connections/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/internal_upstream/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/raw_buffer/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3" +) diff --git a/test/parity/scan.go b/test/parity/scan.go new file mode 100644 index 00000000..a5eaf59d --- /dev/null +++ b/test/parity/scan.go @@ -0,0 +1,188 @@ +package parity + +import ( + "strings" + + clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + listenerv3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + hcmv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" +) + +// This file scans the proxy's live configuration for each kind of change and +// emits an identity for every one it finds. These identity strings must match +// the ones the extension server produces exactly, or the comparison is +// meaningless; keep both sides in lockstep. + +const ( + keySep = "##" + + // Well-known marker strings, mirrored from where the configuration is written. + datumGatewayMetaKey = "datum-gateway" + connectorInternalTransport = "envoy.transport_sockets.internal_upstream" + hcmNetworkFilterName = "envoy.filters.network.http_connection_manager" + offlineBodyMarker = "Tunnel not online" +) + +// ScanActual scans the proxy's live configuration and assembles what it +// actually found. corazaFilterName is the firewall filter name used to spot +// firewall injection; droppedSecrets names the TLS certificates that should have +// been removed. +func ScanActual(dump *ConfigDump, corazaFilterName string, droppedSecrets []string) Actual { + act := Actual{Keys: map[Family][]string{}} + + for _, rc := range dump.Routes { + scanRouteConfig(rc, &act) + } + for _, cl := range dump.Clusters { + if isReplacedConnectorCluster(cl) { + act.Keys[FamilyConnectorCluster] = append(act.Keys[FamilyConnectorCluster], cl.GetName()) + } + } + for _, l := range dump.Listeners { + scanListener(l, corazaFilterName, &act) + } + + // Flag any certificate that should have been removed but is still present. + if len(droppedSecrets) > 0 { + present := make(map[string]struct{}, len(dump.SecretNames)) + for _, n := range dump.SecretNames { + present[n] = struct{}{} + } + for _, dropped := range droppedSecrets { + if _, ok := present[dropped]; ok { + act.TLSDroppedSecretsStillPresent = append(act.TLSDroppedSecretsStillPresent, dropped) + } + } + } + return act +} + +func scanRouteConfig(rc *routev3.RouteConfiguration, act *Actual) { + rcName := rc.GetName() + for _, vh := range rc.GetVirtualHosts() { + vhName := vh.GetName() + for _, rt := range vh.GetRoutes() { + if ns, name, mode, ok := datumGatewayTPP(rt); ok { + act.Keys[FamilyWAFRoute] = append(act.Keys[FamilyWAFRoute], + wafRouteKey(rcName, vhName, rt.GetName(), ns, name, mode)) + } + if isConnectRoute(rt) { + act.Keys[FamilyConnectorRoute] = append(act.Keys[FamilyConnectorRoute], + connectorRouteKey(rcName, vhName, rt.GetName())) + } + if isOfflineDirectResponse(rt) { + act.Keys[FamilyConnectorOffline] = append(act.Keys[FamilyConnectorOffline], + connectorRouteKey(rcName, vhName, rt.GetName())) + } + } + } +} + +func scanListener(l *listenerv3.Listener, corazaFilterName string, act *Actual) { + lName := l.GetName() + eachHCM(l, func(chainName string, hcm *hcmv3.HttpConnectionManager) { + if corazaFilterName != "" && hcmHasFilterAtZero(hcm, corazaFilterName) { + act.Keys[FamilyWAFHCM] = append(act.Keys[FamilyWAFHCM], listenerChainKey(lName, chainName)) + } + if hcm.GetLocalReplyConfig() != nil { + act.Keys[FamilyLocalReply] = append(act.Keys[FamilyLocalReply], listenerChainKey(lName, chainName)) + } + }) +} + +// --- identity formats (MUST match the extension server) --- + +func wafRouteKey(rc, vh, rt, tppNS, tppName, mode string) string { + return strings.Join([]string{rc, vh, rt, tppNS + "/" + tppName + "/" + mode}, keySep) +} + +func connectorRouteKey(rc, vh, rt string) string { + return strings.Join([]string{rc, vh, rt}, keySep) +} + +func listenerChainKey(listener, chain string) string { + return strings.Join([]string{listener, chain}, keySep) +} + +// --- detectors (mirror, in reverse, how the configuration is written) --- + +func datumGatewayTPP(rt *routev3.Route) (ns, name, mode string, ok bool) { + md := rt.GetMetadata() + if md == nil { + return "", "", "", false + } + dg := md.GetFilterMetadata()[datumGatewayMetaKey] + if dg == nil { + return "", "", "", false + } + res := dg.GetFields()["resources"].GetListValue() + if res == nil || len(res.GetValues()) == 0 { + return "", "", "", false + } + f := res.GetValues()[0].GetStructValue().GetFields() + return f["namespace"].GetStringValue(), + f["name"].GetStringValue(), + f["mode"].GetStringValue(), + true +} + +func isConnectRoute(rt *routev3.Route) bool { + ra := rt.GetRoute() + if ra == nil { + return false + } + for _, uc := range ra.GetUpgradeConfigs() { + if strings.EqualFold(uc.GetUpgradeType(), "CONNECT") { + return true + } + } + return false +} + +func isOfflineDirectResponse(rt *routev3.Route) bool { + dr := rt.GetDirectResponse() + if dr == nil { + return false + } + if dr.GetStatus() != 503 { + return false + } + return dr.GetBody().GetInlineString() == offlineBodyMarker +} + +func isReplacedConnectorCluster(cl *clusterv3.Cluster) bool { + if cl.GetType() != clusterv3.Cluster_STATIC { + return false + } + return cl.GetTransportSocket().GetName() == connectorInternalTransport +} + +func eachHCM(l *listenerv3.Listener, fn func(chainName string, hcm *hcmv3.HttpConnectionManager)) { + chains := make([]*listenerv3.FilterChain, 0, len(l.GetFilterChains())+1) + chains = append(chains, l.GetFilterChains()...) + if dfc := l.GetDefaultFilterChain(); dfc != nil { + chains = append(chains, dfc) + } + for _, fc := range chains { + for _, f := range fc.GetFilters() { + if f.GetName() != hcmNetworkFilterName { + continue + } + tc := f.GetTypedConfig() + if tc == nil { + continue + } + hcm := &hcmv3.HttpConnectionManager{} + if err := tc.UnmarshalTo(hcm); err != nil { + continue + } + fn(fc.GetName(), hcm) + } + } +} + +func hcmHasFilterAtZero(hcm *hcmv3.HttpConnectionManager, filterName string) bool { + fs := hcm.GetHttpFilters() + return len(fs) > 0 && fs[0].GetName() == filterName +}