From 121bd95138a4287910d0e4bee8d486d5f6251507 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 14 May 2026 00:03:52 +0700 Subject: [PATCH 1/2] feat(webhook): proxy-aware error handling for forward proxies (#899) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * docs(webhook): document forward proxy feature and Envoy support Adds a Features page covering the existing DESTINATIONS_WEBHOOK_PROXY_URL configuration and introduces DESTINATIONS_WEBHOOK_PROXY_TYPE=envoy as an opt-in for Envoy-aware error handling: infra-error detection via x-envoy-response-flags and the proxyconnect error path, transparent redelivery via nack, and x-envoy-*/server header sanitization. Includes required Envoy configuration, a minimal reference envoy.yaml, and queue retry policy guidance for sufficient redelivery runway during proxy outages. Default behavior for existing users is unchanged. Co-Authored-By: Claude * docs(webhook): narrow nack semantics to proxy infra only Adds an explicit error-attribution table. Nack is reserved for cases where the proxy itself is the proximate cause (auth failure, proxy unreachable). Destination failures that Envoy merely reports (upstream DNS / connect / timeout) are recorded as normal failed delivery attempts with destination-attributed codes; the wrapper sanitizes the response data so Envoy is not exposed, but the customer still sees a destination failure. This avoids the infinite-nack failure mode for genuinely broken destinations. Co-Authored-By: Claude * docs(webhook): clarify dead-letter window phrasing "DLQ runway" replaced with an explicit statement of what the number means: the maximum time a single nacked message is retried before landing in the dead-letter queue. Co-Authored-By: Claude * docs(webhook): drop PROXY_TYPE knob, auto-detect Envoy signals Removes DESTINATIONS_WEBHOOK_PROXY_TYPE. 
Envoy-specific signals (x-envoy-response-flags, x-envoy-* / server: envoy headers) are detected automatically from response headers when present; nothing to configure. The proxy-infra nack path (proxyconnect 407, dial to proxy fails) is generic and applies to any forward proxy, since the discriminator is "the proxy is broken or misconfigured," not "the proxy is Envoy." Co-Authored-By: Claude * docs(webhook): split error table into generic and Envoy-specific Reorganizes the error handling section so generic-proxy behavior and Envoy-specific signals are in separate tables. Adding support for other proxy implementations (Squid, HAProxy, nginx) in the future would extend the Envoy support pattern. Co-Authored-By: Claude * docs(webhook): clarify sanitization scope and proxy security trade-offs Two clarifications: 1. Response sanitization is best-effort and currently complete only for Envoy. For arbitrary proxies, Outpost can rewrite error messages but cannot reliably strip proxy-identifying content from response bodies or headers. 2. Production proxy auth + TLS recommendation now depends on topology: not required if Outpost and the proxy share a private network; strongly recommended if the proxy is reachable over the internet to prevent open-relay abuse. Co-Authored-By: Claude * docs(webhook): demote production notes to a short paragraph Trim the production-deployment guidance to a single sentence on the VPC-vs-internet trade-off, instead of a subsection with specific configuration suggestions. Co-Authored-By: Claude * feat(destregistry): add proxy error sentinel types Defines ErrProxyInfra (signals nack-on-infra-error) and ErrProxyDestination (signals record-as-destination-error with mapped code) along with MapEnvoyResponseFlag for translating Envoy response flags into existing destination error codes (connection_refused, timeout, dns_error, network_unreachable). 
No consumers yet; subsequent commits add the transport wrapper and integration into the webhook delivery path. Co-Authored-By: Claude * feat(destwebhook): nack proxy infra errors, classify destination errors Wraps the proxy transport when a proxy URL is configured. Detects two classes of proxy-originated failures: - Proxy infrastructure (407 / 401 / 403 on CONNECT, dial-to-proxy failure): wrapped as ErrProxyInfra. ExecuteHTTPRequest returns Delivery: nil so the message handler nacks via the existing nil-attempt path (registry.go:179-195), preserving the destination's retry budget across proxy outages. - Destination errors that the proxy reports (5xx on CONNECT, etc.): wrapped as ErrProxyDestination with a classification code mapped from the response. Falls through to the standard failed-attempt path with the explicit code instead of substring-matching the underlying error. CONNECT-time detection uses http.Transport.OnProxyConnectResponse which exposes the full response (status + headers) before Go discards it, sidestepping the fragility of parsing internal error strings. The wrapper still detects dial-to-proxy failures via the "proxyconnect" prefix that Go's net.OpError keeps for that case. Co-Authored-By: Claude * feat(destwebhook): Envoy response-flag detection and header sanitization Extends proxyTransport with Envoy-specific signals layered on top of the generic proxy handling: - Plain-HTTP forwarding path: responses carrying a non-empty/non-"-" x-envoy-response-flags header are treated as Envoy-synthesized failures, converted to ErrProxyDestination with the flag mapped to a destination error code, and their bodies dropped so they don't reach the delivery record. - CONNECT path: onProxyConnectResponse reads the same flag header on non-200 responses to refine the generic "connection_refused" classification (e.g. flag DC -> dns_error, UT -> timeout). 
- Successful responses are stripped of x-envoy-* headers and a "server: envoy" header before being returned, so the proxy is not fingerprinted in the destination's recorded response. Co-Authored-By: Claude * test(destregistry): pin Go stdlib "proxyconnect" error wording Adds a snapshot test asserting that net/http still emits "proxyconnect" in dial-to-proxy failure errors. The proxyTransport detector relies on this internal stdlib convention; if a Go upgrade changes the wording, this test fails in CI rather than letting the detector silently stop matching and causing proxy outages to be misclassified as destination errors. Co-Authored-By: Claude * docs(webhook): align tables with implementation Two small adjustments after the code landed: - Drop the implementation-specific "Outpost sees" column from the generic table; describe observed behavior instead. Proxy auth-style failures cover 407, 401, and 403, not just 407. - Frame the Envoy table around the response-flag signal directly, rather than listing every flag-to-code combination (those are documented by the MapEnvoyResponseFlag mapping). Co-Authored-By: Claude * docs(webhook): move response_headers_to_add to RouteConfiguration Envoy rejects response_headers_to_add at the HttpConnectionManager level (confirmed on Envoy 1.31). The field belongs on RouteConfiguration. Updates both the standalone snippet and the reference envoy.yaml. Co-Authored-By: Claude * fix(destwebhook): treat HTTPS responses as byte-transparent Destinations commonly sit behind their own Envoy at the edge, which emits server: envoy and x-envoy-response-flags on the response. For HTTPS destinations, that response travels through an established CONNECT tunnel — the forward proxy is byte-blind to TLS-encrypted bytes and cannot have synthesized or modified anything. 
Previously the wrapper treated those headers as if our forward proxy had set them, which: - converted a real destination 503 from a downed app into a misleading "connection_refused" with no body - stripped the destination's own envoy observability headers After this change, HTTPS responses (post-successful-CONNECT) are returned unchanged. Proxy-originated HTTPS failures all happen at CONNECT time and remain handled in onProxyConnectResponse. The plain-HTTP forwarding path still applies envoy detection and header stripping because the forward proxy is in the byte path on the return. The residual case where this over-strips — a plain-HTTP destination that is itself behind Envoy — is documented as a limitation; attribution remains correct because x-envoy-response-flags is overwritten by the forward Envoy via OVERWRITE_IF_EXISTS_OR_ADD. Top-of-file doc comment on proxyTransport now describes the two surfaces (CONNECT-time vs plain-HTTP-forwarding) so the dual-path design is discoverable. Co-Authored-By: Claude * chore(destwebhook): apply review nits to proxy transport - httphelper: skip ClassifyNetworkError when ErrProxyDestination sentinel matches - proxytransport: drop strings.ToLower; http.Header keys are canonicalized - proxytransport: drop impossible nil-guard on req/req.URL in RoundTrip - proxytransport: note unhandled Envoy flags fall through to network_error - proxytransport: cross-reference pin test from proxyconnect detector - proxytransport_test: drop unreachable empty-flag row from map test Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(destregistry): extract NewHTTPClient as free function MakeHTTPClient never touched BaseProvider state — it was a free helper pretending to be a method. Move it to httpclient.go and update the three HTTP-based providers (desthookdeck, destwebhook, destwebhookstandard) to call destregistry.NewHTTPClient directly. BaseProvider keeps only the metadata/validation concerns it actually owns. No behavior change. 
Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(destwebhook): scope proxy transport to webhook package Move proxytransport.go, the Envoy flag mapping, and all sentinels (ErrProxyInfra, ErrProxyDestination, IsProxyInfraError) into the destwebhook package — the only place they're actually used. destregistry keeps a generic WrapTransport hook on HTTPClientConfig; destwebhook exports a WrapTransport function and the two webhook providers (destwebhook, destwebhookstandard) plug it in when constructing their HTTP client. destregistry/ no longer has any proxy-specific code. The wrapper, the Envoy flag table, the CONNECT-response callback, and all four test files move with the sentinels. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) * chore: gofmt baseprovider Co-Authored-By: Claude Opus 4.7 (1M context) * docs(webhook-proxy): clarify Pub/Sub redelivery runway with OSS defaults Replace the "set 5s/600s/10 attempts for ~30min runway" recommendation with the actual behavior of the default provisioning in internal/mqinfra/gcppubsub.go (10s/120s/6 attempts ~ 5min) and note that operators expecting longer proxy outages should tune MinRetryBackoff/MaxRetryBackoff/RetryLimit. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(destwebhook): map Envoy DF (DNS failure), not DC DC (Downstream Connection Termination) is the wrong flag — it means the client closed the connection, not DNS resolution. The actual flag emitted by Envoy's dynamic_forward_proxy when a host fails to resolve is DF (DNS Failure). Verified against a local Envoy reference proxy (build/dev/envoy/envoy.yaml): nonexistent.invalid → DF → dns_error. Also adds a local-dev Envoy service (build/dev/compose.yml + envoy.yaml) matching the documented config — `%RESPONSE_FLAGS%` on RouteConfiguration with OVERWRITE_IF_EXISTS_OR_ADD — so the QA suite at qa/suites/outpost/ webhook-proxy.md can be run against a real proxy locally. 
Co-Authored-By: Claude Opus 4.7 (1M context) * feat(destwebhook): expand Envoy flag mapping + capture diagnostics Address review feedback that the recorded error code is too coarse and gives operators nothing to diagnose with. Mapping: align with ClassifyNetworkError vocabulary so customers see the same codes whether or not a proxy is in path. - UC, UR, LR → connection_reset (split out of connection_refused) - UPE, DPE → protocol_error (new bucket; previously network_error) - UMSDR → timeout - network_error stays as the catch-all; operator-visible signal that a new flag has shown up and the table should be expanded. Diagnostics: capture raw x-envoy-response-flags and the new x-envoy-response-code-details (stage{reason}) on ErrProxyDestination and attach them to the publish-attempt error payload as proxy_flag / proxy_details. Logged operator-side; never written to the customer- visible attempt response_data — same model EG uses. Ref Envoy config emits both headers with OVERWRITE_IF_EXISTS_OR_ADD so destinations can't spoof them. Co-Authored-By: Claude Opus 4.7 (1M context) * refactor(destwebhook): generic Diagnostics map on ErrProxyDestination Replace Flag/Details fields with a free-form Diagnostics map[string]string. The previous shape baked Envoy-specific terminology into a struct that classifies errors from any forward proxy — even though only Envoy populates the fields today. Generic map keeps the contract honest: whoever populates a key owns its naming (envoy_flag, envoy_details for Envoy; future Squid/HAProxy paths would add their own). Error() sorts keys for deterministic log output. httphelper splats the map into the publish-attempt Data payload. Behavior unchanged — same log strings, same customer-invisible surface. 
Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude --- build/dev/compose.yml | 6 + build/dev/envoy/envoy.yaml | 66 +++ docs/content/features/webhook-proxy.mdoc | 188 +++++++++ docs/content/nav.json | 3 +- internal/destregistry/baseprovider.go | 55 --- internal/destregistry/baseprovider_test.go | 14 +- internal/destregistry/httpclient.go | 71 ++++ .../providers/desthookdeck/desthookdeck.go | 2 +- .../providers/destwebhook/destwebhook.go | 7 +- .../providers/destwebhook/httphelper.go | 56 ++- .../providers/destwebhook/proxytransport.go | 391 ++++++++++++++++++ .../proxytransport_internal_test.go | 102 +++++ .../destwebhook/proxytransport_pin_test.go | 62 +++ .../destwebhook/proxytransport_test.go | 378 +++++++++++++++++ .../destwebhookstandard.go | 7 +- 15 files changed, 1329 insertions(+), 79 deletions(-) create mode 100644 build/dev/envoy/envoy.yaml create mode 100644 docs/content/features/webhook-proxy.mdoc create mode 100644 internal/destregistry/httpclient.go create mode 100644 internal/destregistry/providers/destwebhook/proxytransport.go create mode 100644 internal/destregistry/providers/destwebhook/proxytransport_internal_test.go create mode 100644 internal/destregistry/providers/destwebhook/proxytransport_pin_test.go create mode 100644 internal/destregistry/providers/destwebhook/proxytransport_test.go diff --git a/build/dev/compose.yml b/build/dev/compose.yml index 16f59a4ed..fbdc46b71 100644 --- a/build/dev/compose.yml +++ b/build/dev/compose.yml @@ -42,6 +42,12 @@ services: environment: SERVICE: log + envoy: + image: envoyproxy/envoy:v1.31-latest + volumes: + - ./envoy/envoy.yaml:/etc/envoy/envoy.yaml:ro + command: ["envoy", "-c", "/etc/envoy/envoy.yaml", "--log-level", "info"] + portal: build: context: ../../ diff --git a/build/dev/envoy/envoy.yaml b/build/dev/envoy/envoy.yaml new file mode 100644 index 000000000..c68bff5a9 --- /dev/null +++ b/build/dev/envoy/envoy.yaml @@ -0,0 +1,66 @@ +admin: + address: + socket_address: { 
address: 0.0.0.0, port_value: 9901 } + +static_resources: + listeners: + - name: forward_proxy + address: + socket_address: { address: 0.0.0.0, port_value: 10000 } + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: forward_proxy + codec_type: AUTO + access_log: + - name: envoy.access_loggers.stdout + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog + http_filters: + - name: envoy.filters.http.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upgrade_configs: + - upgrade_type: CONNECT + route_config: + name: proxy_route + response_headers_to_add: + - header: + key: "x-envoy-response-flags" + value: "%RESPONSE_FLAGS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + - header: + key: "x-envoy-response-code-details" + value: "%RESPONSE_CODE_DETAILS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + virtual_hosts: + - name: proxy + domains: ["*"] + routes: + - match: { connect_matcher: {} } + route: + cluster: dfp_cluster + upgrade_configs: + - upgrade_type: CONNECT + connect_config: {} + - match: { prefix: "/" } + route: { cluster: dfp_cluster } + + clusters: + - name: dfp_cluster + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY diff --git a/docs/content/features/webhook-proxy.mdoc b/docs/content/features/webhook-proxy.mdoc new file mode 100644 
index 000000000..6dbb6342a --- /dev/null +++ b/docs/content/features/webhook-proxy.mdoc @@ -0,0 +1,188 @@ +--- +title: "Webhook Forward Proxy" +description: "Route outgoing webhook deliveries through an HTTP forward proxy for static-IP egress or network isolation." +--- + +Outpost can route outgoing webhook traffic through an HTTP forward proxy. Common reasons: + +- **Static-IP egress** — destinations that allowlist a specific source IP +- **Network isolation** — keep delivery workers off the public internet +- **Centralized egress policy** — single chokepoint for outbound traffic + +Proxy support is operator-configured and applies to every webhook destination served by the deployment. + +## Configuration + +| Env var | Description | +|---------|-------------| +| `DESTINATIONS_WEBHOOK_PROXY_URL` | Proxy URL, e.g. `http://user:pass@proxy.example.com:8080`. Supports basic auth. | + +When `DESTINATIONS_WEBHOOK_PROXY_URL` is set, Outpost installs an HTTP proxy on the webhook publisher's transport. HTTPS destinations use the standard `CONNECT` tunneling flow; HTTP destinations are forwarded request-by-request. + +## Error handling + +Putting a forward proxy in the delivery path introduces a new failure surface that should not be charged to the destination. A proxy-auth misconfiguration or a proxy outage isn't the destination's fault, and recording it as a failed delivery attempt burns retry budget on a problem the destination cannot resolve. + +Outpost distinguishes **proxy infrastructure failures** from **destination failures** (including destination failures that the proxy merely *reports* on the destination's behalf) and applies the right behavior to each. The base behavior below applies to any forward proxy; Envoy-specific signals are picked up automatically when present (see [Envoy support](#envoy-support)). 
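The split between proxy infrastructure failures and destination failures can be sketched in Go. This is a minimal illustration under stated assumptions, not the code in `proxytransport.go`: the `attribution` type and the `classifyConnectStatus` helper are hypothetical names, and the sketch only covers non-2xx statuses returned by the proxy on `CONNECT`.

```go
package main

import (
	"fmt"
	"net/http"
)

// attribution names who is charged for a failed CONNECT (hypothetical type).
type attribution string

const (
	attrInfra       attribution = "infra"       // nack: the queue redelivers, no attempt recorded
	attrDestination attribution = "destination" // record a failed delivery attempt
)

// classifyConnectStatus is a hypothetical helper for non-2xx CONNECT responses.
// Auth-style statuses point at proxy misconfiguration; anything else is treated
// conservatively as the proxy failing to reach the destination.
func classifyConnectStatus(status int) (attribution, string) {
	switch status {
	case http.StatusProxyAuthRequired, http.StatusUnauthorized, http.StatusForbidden:
		return attrInfra, ""
	default:
		return attrDestination, "connection_refused"
	}
}

func main() {
	for _, status := range []int{407, 401, 403, 502, 503} {
		attr, code := classifyConnectStatus(status)
		fmt.Printf("CONNECT %d -> attribution=%s code=%q\n", status, attr, code)
	}
}
```

The default branch deliberately never returns `attrInfra`, which matches the rule that Outpost does not nack on speculation.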
+ +### General behavior + +| Scenario | Attribution | Behavior | +|---|---|---| +| Proxy returns 407 / 401 / 403 on `CONNECT` | Infra | **Nack** — operator misconfiguration of proxy credentials | +| Proxy unreachable (TCP dial to proxy fails) | Infra | **Nack** — proxy infrastructure outage | +| `CONNECT` succeeds, destination returns real 4xx/5xx (HTTPS) | Destination | Record attempt with the actual status code | +| `CONNECT` succeeds, destination TLS handshake fails (HTTPS) | Destination | Record attempt (`tls_error`) | +| `CONNECT` succeeds, destination times out | Destination | Record attempt (`timeout`) | +| Proxy returns other 5xx on `CONNECT` (cannot reach destination) | Destination | Record attempt (`connection_refused`) | +| Real upstream response passed through (plain-HTTP) | Destination | Record as today | +| Proxy itself overloaded or misbehaving (rare) | Destination (conservative) | Record attempt (`network_error`) — never nack on speculation | + +**Key principle:** when the proxy reports a failure that originated at the destination (DNS, connect refused, upstream timeout), the customer still sees it as a destination failure. Outpost rewrites the message so the response data is destination-attributed, not proxy-attributed. Nacking is reserved for cases where the proxy itself is the proximate cause. + +**HTTPS responses are byte-transparent.** Once the `CONNECT` tunnel opens, TLS runs end-to-end between Outpost and the destination; the forward proxy can no longer read or modify response bytes. Outpost therefore does not inspect or sanitize HTTPS response payloads — they are recorded as the destination sent them. Proxy-originated HTTPS failures (auth, unreachable, can't connect upstream) all happen at `CONNECT` time and are handled before the tunnel exists. + +Response-body and response-header sanitization on the plain-HTTP forwarding path is best-effort and depends on the proxy implementation being recognized. 
For an arbitrary forward proxy, Outpost can rewrite error messages but cannot reliably strip proxy-identifying response content. Sanitization is currently complete only for Envoy — see [Envoy support](#envoy-support). + +## Envoy support + +When the proxy is [Envoy](https://www.envoyproxy.io/), Outpost picks up Envoy-specific signals automatically — no configuration toggle required. These are additive on top of the general behavior above. + +### Additional behaviors + +Envoy-specific handling fires on two surfaces: the `CONNECT` response (HTTPS, where the proxy's response is visible to Outpost before the tunnel opens), and proxied plain-HTTP responses (where the proxy is in the byte path on the way back). Responses that arrived through an established `CONNECT` tunnel are not inspected — see the byte-transparency note above. + +| Scenario | Signal | Attribution | Behavior | +|---|---|---|---| +| Envoy `CONNECT` failure with response-flag header | `x-envoy-response-flags` on the `CONNECT` response | Destination | Record attempt with code refined from the flag (e.g. `DF` → `dns_error`, `UT` → `timeout`) instead of the generic `connection_refused` | +| Envoy synthesizes 5xx response (plain-HTTP path) | `x-envoy-response-flags: UF` / `UC` / etc. 
on the response | Destination | Record attempt with code mapped from the flag; response body is dropped | +| Real upstream response passed through (plain-HTTP) | `x-envoy-response-flags: -` or empty | Destination | Record as today | +| Successful plain-HTTP response served via Envoy | `x-envoy-*` and `server: envoy` headers present | Destination | Record; headers stripped before storage | +| HTTPS response (any status, any headers, post-`CONNECT`) | — | Destination | Pass-through unchanged — bytes never touched the forward proxy | + +Two Envoy-specific behaviors are layered on top, both scoped to surfaces where the forward proxy actually contributed to the bytes: + +- **Response-flag mapping** — `x-envoy-response-flags`, when present and non-empty, refines the destination error code. Mapping aligns with Outpost's non-proxy error vocabulary (`ClassifyNetworkError`), so customers see the same codes whether or not a proxy is in path: + + | Envoy flag | Outpost code | Meaning | + |---|---|---| + | `UF`, `UH`, `LH` | `connection_refused` | TCP dial failed / no healthy upstream / failed health check | + | `UC`, `UR`, `LR` | `connection_reset` | Established connection dropped / remote or local reset | + | `UT`, `SI`, `DT`, `UMSDR` | `timeout` | Upstream / stream-idle / duration / max-stream timeout | + | `DF` | `dns_error` | DNS resolution failure (`dynamic_forward_proxy`) | + | `NR`, `NC` | `network_unreachable` | No route / no cluster | + | `UPE`, `DPE` | `protocol_error` | Upstream / downstream protocol error | + | (any other flag) | `network_error` | Unmapped — operator-visible signal to expand the table | + + Applies to `CONNECT` responses and plain-HTTP responses, not to bytes returned through an HTTPS tunnel. + +- **Operator diagnostics** — when a flag fires, the raw `x-envoy-response-flags` value and `x-envoy-response-code-details` (the `stage{reason}` string, e.g. 
`upstream_reset_before_response_started{connection_timeout}`) are written into a generic proxy-diagnostics map on the error as `envoy_flag` / `envoy_details`. These surface in the underlying error message and on the publish-attempt error payload, visible in `consumer handler error` logs — but never written to the customer-visible attempt `response_data`, which mirrors a normal network failure (no status, no body). The map is intentionally untyped so other proxies (Squid, HAProxy) can populate their own keys without colliding. To enable this, the Envoy ref config emits the details header alongside the flag header. + +- **Header and body sanitization** — `x-envoy-*` and `server: envoy` headers are stripped from plain-HTTP responses; Envoy-synthesized plain-HTTP response bodies are replaced with a normalized message that does not leak Envoy. HTTPS responses are not sanitized. + +Support for other forward proxies (Squid, HAProxy, nginx, ...) can be added the same way — by detecting proxy-specific response signals and mapping them to destination error codes. None are currently implemented. + +#### Limitation: plain-HTTP destinations that are themselves behind Envoy + +If a destination is reached over plain HTTP and the destination itself sits behind its own Envoy edge, the destination's `x-envoy-*` headers (e.g. `x-envoy-upstream-service-time`, `server: envoy`) pass through the forward proxy and Outpost strips them. The destination's `x-envoy-response-flags` is overwritten by the forward Envoy's value, so attribution is still correct — the customer never sees a destination failure misattributed to the proxy — but some destination-side observability headers are lost on this code path. + +This does not affect HTTPS destinations: HTTPS responses are byte-transparent (see above), so any `x-envoy-*` headers from the destination's Envoy reach Outpost untouched. 
+ +### Required Envoy configuration + +For Outpost to reliably distinguish Envoy-synthesized responses from real upstream responses, Envoy must emit its response flags as a response header. The response-code-details header is optional but recommended — without it, Outpost still classifies via the flag, but operators lose the precise stage/reason in logs. Add to the route configuration (Envoy rejects these fields on the HTTP connection manager — they belong on `RouteConfiguration`): + +```yaml +route_config: + response_headers_to_add: + - header: + key: "x-envoy-response-flags" + value: "%RESPONSE_FLAGS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + - header: + key: "x-envoy-response-code-details" + value: "%RESPONSE_CODE_DETAILS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + virtual_hosts: + # ... +``` + +`OVERWRITE_IF_EXISTS_OR_ADD` is important — it prevents a misbehaving destination from spoofing either header to confuse Outpost's classification or pollute operator diagnostics. + +### Minimal reference Envoy + +A minimal forward-proxy Envoy listener with response-flag reporting enabled: + +```yaml +admin: + address: + socket_address: { address: 127.0.0.1, port_value: 9901 } + +static_resources: + listeners: + - name: forward_proxy + address: + socket_address: { address: 0.0.0.0, port_value: 10000 } + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: forward_proxy + codec_type: AUTO + http_filters: + - name: envoy.filters.http.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upgrade_configs: + - 
upgrade_type: CONNECT + route_config: + response_headers_to_add: + - header: + key: "x-envoy-response-flags" + value: "%RESPONSE_FLAGS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + - header: + key: "x-envoy-response-code-details" + value: "%RESPONSE_CODE_DETAILS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + virtual_hosts: + - name: proxy + domains: ["*"] + routes: + - match: { connect_matcher: {} } + route: + cluster: dfp_cluster + upgrade_configs: + - upgrade_type: CONNECT + connect_config: {} + - match: { prefix: "/" } + route: { cluster: dfp_cluster } + + clusters: + - name: dfp_cluster + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY +``` + +The example above is a minimal listener. Whether you need proxy authentication and TLS depends on your network topology: if Outpost and the proxy share a private network, neither is strictly required; if the proxy is reachable over the public internet, both are strongly recommended to prevent the proxy being used as an open relay. + +## Queue retry behavior + +When a proxy infrastructure error is nacked, the underlying message queue redelivers the event. Because nacks only fire for true infra failures (proxy auth or proxy unreachable), the redelivery rate is bounded by the proxy outage duration, not by destination behavior. + +Outpost's default GCP Pub/Sub provisioning (`internal/mqinfra/gcppubsub.go`) uses `retryPolicy: {minimumBackoff: 10s, maximumBackoff: 120s}` with `maxDeliveryAttempts: 6` (default `RetryLimit` + 1). That gives roughly **5 minutes of redelivery runway** before a nacked message lands in the dead-letter topic. A proxy outage shorter than this window is transparent to the destination; longer outages require manual replay from the DLQ. 
+ +If you expect longer proxy outages, raise `MinRetryBackoff` / `MaxRetryBackoff` (config) and `RetryLimit` (policy) so the redelivery window covers your worst-case outage duration. RabbitMQ / SQS / Kafka have equivalent knobs on their delivery queues. diff --git a/docs/content/nav.json b/docs/content/nav.json index aaa5da67f..b653a7f67 100644 --- a/docs/content/nav.json +++ b/docs/content/nav.json @@ -48,7 +48,8 @@ }, { "slug": "features/tenant-user-portal", "title": "Tenant Portal" }, { "slug": "features/metrics", "title": "Metrics" }, - { "slug": "features/opentelemetry", "title": "OpenTelemetry" } + { "slug": "features/opentelemetry", "title": "OpenTelemetry" }, + { "slug": "features/webhook-proxy", "title": "Webhook Forward Proxy" } ] ] }, diff --git a/internal/destregistry/baseprovider.go b/internal/destregistry/baseprovider.go index 49d633438..52927f89f 100644 --- a/internal/destregistry/baseprovider.go +++ b/internal/destregistry/baseprovider.go @@ -3,12 +3,9 @@ package destregistry import ( "context" "fmt" - "net/http" - "net/url" "regexp" "strconv" "strings" - "time" "github.com/hookdeck/outpost/internal/destregistry/metadata" "github.com/hookdeck/outpost/internal/models" @@ -199,55 +196,3 @@ func validateField(field metadata.FieldSchema, value string, path string) *Valid func (p *BaseProvider) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *PreprocessDestinationOpts) error { return nil } - -type HTTPClientConfig struct { - Timeout *time.Duration - UserAgent *string - ProxyURL *string -} - -func (p *BaseProvider) MakeHTTPClient(config HTTPClientConfig) (*http.Client, error) { - client := &http.Client{} - - if config.Timeout != nil { - client.Timeout = *config.Timeout - } - - // Configure transport with proxy and/or user agent if needed - if config.ProxyURL != nil || config.UserAgent != nil { - // Start with default transport settings - transport := http.DefaultTransport.(*http.Transport).Clone() - - // 
Configure proxy if provided - if config.ProxyURL != nil && *config.ProxyURL != "" { - proxyURLParsed, err := url.Parse(*config.ProxyURL) - if err != nil { - return nil, fmt.Errorf("invalid proxy URL: %w", err) - } - transport.Proxy = http.ProxyURL(proxyURLParsed) - } - - // Wrap transport with user agent if needed - if config.UserAgent != nil { - client.Transport = &userAgentTransport{ - userAgent: *config.UserAgent, - transport: transport, - } - } else { - client.Transport = transport - } - } - - return client, nil -} - -// userAgentTransport wraps an http.RoundTripper to inject a User-Agent header -type userAgentTransport struct { - userAgent string - transport http.RoundTripper -} - -func (t *userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) { - req.Header.Set("User-Agent", t.userAgent) - return t.transport.RoundTrip(req) -} diff --git a/internal/destregistry/baseprovider_test.go b/internal/destregistry/baseprovider_test.go index 58c209537..a335d434c 100644 --- a/internal/destregistry/baseprovider_test.go +++ b/internal/destregistry/baseprovider_test.go @@ -14,9 +14,6 @@ import ( func TestMakeHTTPClient_UserAgent(t *testing.T) { t.Parallel() - provider, err := newMockProvider() - require.NoError(t, err) - t.Run("sets user agent on requests", func(t *testing.T) { t.Parallel() @@ -30,7 +27,7 @@ func TestMakeHTTPClient_UserAgent(t *testing.T) { // Create client with user agent userAgent := "TestAgent/1.0" - client, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ UserAgent: &userAgent, }) require.NoError(t, err) @@ -49,7 +46,7 @@ func TestMakeHTTPClient_UserAgent(t *testing.T) { t.Parallel() emptyUserAgent := "" - client, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ UserAgent: &emptyUserAgent, }) require.NoError(t, err) @@ -63,9 +60,6 @@ func 
TestMakeHTTPClient_UserAgent(t *testing.T) { func TestMakeHTTPClient_Proxy(t *testing.T) { t.Parallel() - provider, err := newMockProvider() - require.NoError(t, err) - t.Run("routes requests through proxy", func(t *testing.T) { t.Parallel() @@ -90,7 +84,7 @@ func TestMakeHTTPClient_Proxy(t *testing.T) { defer targetServer.Close() // Create client with proxy configured - client, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ ProxyURL: &proxyServer.URL, }) require.NoError(t, err) @@ -109,7 +103,7 @@ func TestMakeHTTPClient_Proxy(t *testing.T) { t.Parallel() invalidProxy := "://invalid-url" - _, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + _, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ ProxyURL: &invalidProxy, }) diff --git a/internal/destregistry/httpclient.go b/internal/destregistry/httpclient.go new file mode 100644 index 000000000..a1bfc580f --- /dev/null +++ b/internal/destregistry/httpclient.go @@ -0,0 +1,71 @@ +package destregistry + +import ( + "fmt" + "net/http" + "net/url" + "time" +) + +type HTTPClientConfig struct { + Timeout *time.Duration + UserAgent *string + ProxyURL *string + // WrapTransport, if set, is invoked after a proxy URL has been installed + // on the *http.Transport. Callers can use it to attach proxy-specific + // concerns (e.g. OnProxyConnectResponse callbacks, response classifiers) + // without bleeding those concerns into destregistry itself. Receives the + // underlying transport plus the parsed proxy URL; returns the + // RoundTripper to use thereafter. + WrapTransport func(*http.Transport, *url.URL) http.RoundTripper +} + +// NewHTTPClient builds an *http.Client from config. Free function — no +// provider state is involved. 
+func NewHTTPClient(config HTTPClientConfig) (*http.Client, error) { + client := &http.Client{} + + if config.Timeout != nil { + client.Timeout = *config.Timeout + } + + if config.ProxyURL == nil && config.UserAgent == nil { + return client, nil + } + + transport := http.DefaultTransport.(*http.Transport).Clone() + + var rt http.RoundTripper = transport + + if config.ProxyURL != nil && *config.ProxyURL != "" { + proxyURLParsed, err := url.Parse(*config.ProxyURL) + if err != nil { + return nil, fmt.Errorf("invalid proxy URL: %w", err) + } + transport.Proxy = http.ProxyURL(proxyURLParsed) + if config.WrapTransport != nil { + rt = config.WrapTransport(transport, proxyURLParsed) + } + } + + if config.UserAgent != nil { + rt = &userAgentTransport{ + userAgent: *config.UserAgent, + transport: rt, + } + } + + client.Transport = rt + return client, nil +} + +// userAgentTransport wraps an http.RoundTripper to inject a User-Agent header +type userAgentTransport struct { + userAgent string + transport http.RoundTripper +} + +func (t *userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set("User-Agent", t.userAgent) + return t.transport.RoundTrip(req) +} diff --git a/internal/destregistry/providers/desthookdeck/desthookdeck.go b/internal/destregistry/providers/desthookdeck/desthookdeck.go index 9780b191a..af9b01d1c 100644 --- a/internal/destregistry/providers/desthookdeck/desthookdeck.go +++ b/internal/destregistry/providers/desthookdeck/desthookdeck.go @@ -150,7 +150,7 @@ func (p *HookdeckProvider) CreatePublisher(ctx context.Context, destination *mod client = p.httpClient } else { var err error - client, err = p.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err = destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ UserAgent: &p.userAgent, }) if err != nil { diff --git a/internal/destregistry/providers/destwebhook/destwebhook.go b/internal/destregistry/providers/destwebhook/destwebhook.go index 
42e9ad21f..21ebef459 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook.go +++ b/internal/destregistry/providers/destwebhook/destwebhook.go @@ -342,9 +342,10 @@ func (d *WebhookDestination) CreatePublisher(ctx context.Context, destination *m proxyURL = &d.proxyURL } - httpClient, err := d.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ - UserAgent: &d.userAgent, - ProxyURL: proxyURL, + httpClient, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ + UserAgent: &d.userAgent, + ProxyURL: proxyURL, + WrapTransport: WrapTransport, }) if err != nil { return nil, err diff --git a/internal/destregistry/providers/destwebhook/httphelper.go b/internal/destregistry/providers/destwebhook/httphelper.go index 25f9c9960..e1978c935 100644 --- a/internal/destregistry/providers/destwebhook/httphelper.go +++ b/internal/destregistry/providers/destwebhook/httphelper.go @@ -2,6 +2,7 @@ package destwebhook import ( "context" + "errors" "fmt" "io" "net/http" @@ -21,20 +22,63 @@ type HTTPRequestResult struct { } // ExecuteHTTPRequest executes an HTTP request and classifies the result. -// All errors return a Delivery object with a classified error code. +// +// Most errors return a Delivery object with a classified error code so the +// caller can record a failed attempt. +// +// Proxy *infrastructure* errors (ErrProxyInfra) return Delivery: nil so the +// caller signals the queue to nack the message instead of recording a +// customer-visible attempt. See registry.go for the nil-attempt handling. +// // See: https://github.com/hookdeck/outpost/issues/571 func ExecuteHTTPRequest(ctx context.Context, client *http.Client, req *http.Request, provider string) *HTTPRequestResult { resp, err := client.Do(req) if err != nil { + // Proxy infrastructure error: nack via nil Delivery so the customer's + // retry budget is not charged for our infra outage. 
+ var infraErr *ErrProxyInfra + if errors.As(err, &infraErr) { + return &HTTPRequestResult{ + Delivery: nil, + Error: destregistry.NewErrDestinationPublishAttempt(err, provider, map[string]interface{}{ + "error": "proxy_infrastructure", + "message": infraErr.Error(), + }), + Response: nil, + } + } + + // Proxy-attributed destination error: use the explicit Code instead of + // substring-matching the underlying error. + var code string + var destErr *ErrProxyDestination + if errors.As(err, &destErr) { + code = destErr.Code + } else { + code = ClassifyNetworkError(err) + } + + data := map[string]interface{}{ + "error": "request_failed", + "message": err.Error(), + } + // Attach raw proxy diagnostics (e.g. Envoy flag + response-code-details) + // to the publish-attempt error so operators can grep them in logs. + // Keys are owned by whichever proxy populated them. Not placed in the + // delivery's ResponseData — customer-visible attempt stays free of + // proxy details. + if destErr != nil { + for k, v := range destErr.Diagnostics { + data[k] = v + } + } + return &HTTPRequestResult{ Delivery: &destregistry.Delivery{ Status: "failed", - Code: ClassifyNetworkError(err), + Code: code, }, - Error: destregistry.NewErrDestinationPublishAttempt(err, provider, map[string]interface{}{ - "error": "request_failed", - "message": err.Error(), - }), + Error: destregistry.NewErrDestinationPublishAttempt(err, provider, data), Response: nil, } } diff --git a/internal/destregistry/providers/destwebhook/proxytransport.go b/internal/destregistry/providers/destwebhook/proxytransport.go new file mode 100644 index 000000000..a956a0170 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport.go @@ -0,0 +1,391 @@ +package destwebhook + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "sort" + "strings" +) + +// WrapTransport is the destregistry.HTTPClientConfig.WrapTransport hook for +// webhook destinations. 
It installs OnProxyConnectResponse on the underlying +// transport and wraps it in proxyTransport so request- and response-time +// proxy failures get translated into ErrProxyInfra / ErrProxyDestination. +func WrapTransport(transport *http.Transport, proxyURL *url.URL) http.RoundTripper { + transport.OnProxyConnectResponse = onProxyConnectResponse + return newProxyTransport(transport, proxyURL) +} + +// ErrProxyInfra signals that a webhook delivery failed at the proxy layer +// (proxy auth misconfiguration, proxy unreachable, etc.). The delivery result +// is nacked so the underlying message queue redelivers without recording a +// customer-visible failed attempt. +type ErrProxyInfra struct { + Underlying error + DestHost string +} + +func (e *ErrProxyInfra) Error() string { + if e.DestHost == "" { + return "proxy infrastructure error" + } + return fmt.Sprintf("proxy infrastructure error reaching %s", e.DestHost) +} + +func (e *ErrProxyInfra) Unwrap() error { + return e.Underlying +} + +// ErrProxyDestination signals that the proxy reported a failure originating at +// the destination (e.g. upstream DNS lookup failed, upstream connection +// refused, upstream timeout). The delivery result is recorded as a normal +// failed attempt using Code as the classification, with response data +// rewritten so the customer sees a destination-attributed failure rather than +// proxy-attributed details. +// +// Diagnostics is a free-form key/value map of proxy-specific signals the +// classification path picked up (e.g. for Envoy, "envoy_flag" and +// "envoy_details"). It is operator-side metadata only: surfaced in error +// logs and on the publish-attempt error payload, never written to the +// customer-visible attempt record. Whichever proxy emitted the data owns +// the key naming so heterogeneous proxies can coexist without colliding. 
+type ErrProxyDestination struct { + Underlying error + Code string + DestHost string + Diagnostics map[string]string +} + +func (e *ErrProxyDestination) Error() string { + msg := e.Code + if e.DestHost != "" { + msg = fmt.Sprintf("%s connecting to %s", e.Code, e.DestHost) + } + // Append proxy diagnostics so they surface in zap.Error(err) logs at the + // consumer boundary. Customer-visible attempt code is still just e.Code. + if len(e.Diagnostics) == 0 { + return msg + } + keys := make([]string, 0, len(e.Diagnostics)) + for k := range e.Diagnostics { + keys = append(keys, k) + } + sort.Strings(keys) + parts := make([]string, 0, len(keys)) + for _, k := range keys { + parts = append(parts, fmt.Sprintf("%s=%s", k, e.Diagnostics[k])) + } + return fmt.Sprintf("%s (%s)", msg, strings.Join(parts, ", ")) +} + +func (e *ErrProxyDestination) Unwrap() error { + return e.Underlying +} + +// IsProxyInfraError reports whether err is or wraps an ErrProxyInfra. +func IsProxyInfraError(err error) bool { + var pe *ErrProxyInfra + return errors.As(err, &pe) +} + +// MapEnvoyResponseFlag returns the destination error code corresponding to an +// Envoy response flag. The output vocabulary matches ClassifyNetworkError +// (httphelper.go) so customers see the same codes whether or not a proxy is +// in path. Unhandled flags fall through to "network_error" — operators should +// watch for that code paired with a non-empty flag in the attempt error +// payload as a signal that the mapping needs expansion. 
+// +// Envoy response flag reference: +// https://www.envoyproxy.io/docs/envoy/latest/configuration/observability/access_log/usage#config-access-log-format-response-flags +func MapEnvoyResponseFlag(flag string) string { + switch flag { + case "UF", "UH", "LH": + // UF: upstream connection failure (TCP dial failed) + // UH: no healthy upstream + // LH: failed local health check + return "connection_refused" + case "UC", "UR", "LR": + // UC: upstream connection termination (established then dropped) + // UR: upstream remote reset + // LR: local reset + return "connection_reset" + case "UT", "SI", "DT", "UMSDR": + // UT: upstream request timeout + // SI: stream idle timeout + // DT: downstream global duration timeout + // UMSDR: upstream max stream duration reached + return "timeout" + case "DF": + // DF: DNS resolution failure (emitted by dynamic_forward_proxy when + // the upstream host cannot be resolved). Verified empirically against + // the reference Envoy config in build/dev/envoy/envoy.yaml. + return "dns_error" + case "NR", "NC": + // NR: no route configured + // NC: upstream cluster not found + return "network_unreachable" + case "UPE", "DPE": + // UPE: upstream protocol error + // DPE: downstream protocol error + return "protocol_error" + default: + return "network_error" + } +} + +// proxyTransport wraps an http.RoundTripper to translate proxy-originated +// failures into ErrProxyInfra / ErrProxyDestination so the delivery pipeline +// can attribute them correctly. +// +// There are two distinct surfaces where the forward proxy can affect the +// transaction, and the wrapper handles them in two different places: +// +// 1. CONNECT-time (HTTPS path) — Go's transport sends a CONNECT request to +// the proxy. Whatever the proxy responds with is visible to Outpost via +// onProxyConnectResponse, installed on the underlying http.Transport. 
+// This is where proxy auth (407/401/403), proxy-can't-reach-upstream +// (5xx with Envoy response flags), and similar are classified. +// +// Once CONNECT succeeds, a raw TCP tunnel is open. TLS runs end-to-end +// between Outpost and the destination; the forward proxy is byte-blind +// to everything that follows. The response we read from base.RoundTrip +// therefore came entirely from the destination side and is not inspected +// or sanitized by this wrapper — see the scheme check in RoundTrip. +// +// 2. Plain-HTTP forwarding path — the proxy reads the request, makes the +// upstream call, and writes the response back to Outpost. The proxy is +// in the byte path on the return, so we can detect Envoy-synthesized +// responses (via x-envoy-response-flags) and strip x-envoy-* / server +// fingerprints. +// +// Dial-to-proxy failures (TCP to the proxy itself fails) are detected here +// from the wrapped *net.OpError whose Op == "proxyconnect" (still emitted by +// Go's stdlib; see proxytransport_pin_test.go for the snapshot guarding the +// wording). +type proxyTransport struct { + base http.RoundTripper + proxyURL *url.URL +} + +func newProxyTransport(base http.RoundTripper, proxyURL *url.URL) *proxyTransport { + return &proxyTransport{base: base, proxyURL: proxyURL} +} + +func (t *proxyTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := t.base.RoundTrip(req) + if err != nil { + if mapped := t.classifyTransportError(err, req); mapped != nil { + return nil, mapped + } + return nil, err + } + + // HTTPS response: bytes came through an established CONNECT tunnel, end- + // to-end TLS between Outpost and the destination. The forward proxy + // physically cannot have read, modified, or synthesized any of these + // bytes. Any x-envoy-* / server: envoy headers we see belong to the + // destination (often itself behind Envoy at its edge), not to us. 
+ // Treating them as ours would destroy destination observability and + // misattribute genuine destination failures as proxy-reported. Proxy- + // originated HTTPS failures all happen at CONNECT time and are handled + // in onProxyConnectResponse before we ever reach this branch. + if req.URL.Scheme == "https" { + return resp, nil + } + + // Plain-HTTP forwarding path: the forward proxy was in the byte path on + // the return and may have synthesized this response itself. + + // Envoy-synthesized response: x-envoy-response-flags carries a non-empty, + // non-placeholder value (the placeholder "-" means "no flag fired", + // i.e. a real upstream response was passed through). Convert to a + // destination-attributed error so the synthesized body never reaches the + // delivery record. + if flag := envoyResponseFlag(resp.Header); flag != "" { + details := envoyResponseDetails(resp.Header) + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + return nil, &ErrProxyDestination{ + Underlying: synthesizedErr(flag, details, resp.StatusCode), + Code: MapEnvoyResponseFlag(flag), + DestHost: destHostFromRequest(req), + Diagnostics: envoyDiagnostics(flag, details), + } + } + + // Real upstream response forwarded by the proxy: strip proxy fingerprints + // so they aren't persisted in the delivery record. Note: if the + // destination itself sits behind its own Envoy, this can over-strip the + // destination's x-envoy-* observability headers on the plain-HTTP path. + // HTTPS destinations are unaffected because of the byte-transparency + // branch above. 
+ sanitizeEnvoyHeaders(resp.Header) + return resp, nil +} + +func destHostFromRequest(req *http.Request) string { + if req == nil || req.URL == nil { + return "" + } + host := req.URL.Host + if h, _, err := net.SplitHostPort(host); err == nil { + host = h + } + return host +} + +// sanitizeEnvoyHeaders removes Envoy-fingerprinting headers from a response so +// the destination's identity in the delivery record is not contaminated with +// proxy details. Safe no-op when no envoy headers are present. +func sanitizeEnvoyHeaders(h http.Header) { + for k := range h { + // http.Header keys are canonicalized, so the prefix is fixed-case. + if strings.HasPrefix(k, "X-Envoy-") { + h.Del(k) + } + } + if strings.EqualFold(h.Get("Server"), "envoy") { + h.Del("Server") + } +} + +func (t *proxyTransport) classifyTransportError(err error, req *http.Request) error { + // CONNECT-time errors arrive pre-typed via onProxyConnectResponse — pass + // through unchanged. + var infraErr *ErrProxyInfra + if errors.As(err, &infraErr) { + return nil + } + var destErr *ErrProxyDestination + if errors.As(err, &destErr) { + return nil + } + + destHost := "" + if req != nil && req.URL != nil { + destHost = req.URL.Host + } + + // Dial-to-proxy failure: Go wraps these in &net.OpError{Op: "proxyconnect"} + // even in current versions. The wrap is the most reliable signal that the + // proxy itself is unreachable (vs. an arbitrary network error en route to + // the destination, which would be a destination problem). The wording is + // pinned by TestProxyTransport_PinProxyconnectWording in + // proxytransport_pin_test.go — update both together if it ever changes. + if strings.Contains(err.Error(), "proxyconnect") { + return &ErrProxyInfra{Underlying: err, DestHost: destHost} + } + + return nil +} + +// onProxyConnectResponse is installed on the underlying http.Transport and +// fires for every CONNECT response from the proxy. 
Non-200 responses are +// translated into the appropriate sentinel here, where the full response +// (status code + headers, including x-envoy-response-flags) is still +// available. +func onProxyConnectResponse(ctx context.Context, proxyURL *url.URL, connectReq *http.Request, resp *http.Response) error { + if resp.StatusCode == http.StatusOK { + return nil + } + + // CONNECT requests set the target as Request.Host (and URL.Opaque), not + // URL.Host. See net/http transport.go: connectReq is built with + // URL: &url.URL{Opaque: targetAddr}, Host: targetAddr. + destHost := "" + if connectReq != nil { + switch { + case connectReq.Host != "": + destHost = connectReq.Host + case connectReq.URL != nil: + destHost = connectReq.URL.Host + } + } + if h, _, err := net.SplitHostPort(destHost); err == nil { + destHost = h + } + + switch resp.StatusCode { + case http.StatusProxyAuthRequired, + http.StatusUnauthorized, + http.StatusForbidden: + // Auth-related failures are operator misconfiguration of proxy + // credentials — proxy infrastructure problem, not destination. + return &ErrProxyInfra{ + Underlying: fmt.Errorf("proxy returned %s", resp.Status), + DestHost: destHost, + } + } + + // Other non-200 statuses indicate the proxy could not establish the tunnel + // to the destination. Attribute to destination; refine the code from the + // Envoy response flag when present. + code := "connection_refused" + flag := envoyResponseFlag(resp.Header) + details := envoyResponseDetails(resp.Header) + if flag != "" { + code = MapEnvoyResponseFlag(flag) + } + + return &ErrProxyDestination{ + Underlying: fmt.Errorf("proxy returned %s", resp.Status), + Code: code, + DestHost: destHost, + Diagnostics: envoyDiagnostics(flag, details), + } +} + +// envoyResponseFlag returns the meaningful value of the x-envoy-response-flags +// header, or "" if the header is absent / placeholder "-" / empty. 
+func envoyResponseFlag(h http.Header) string { + v := strings.TrimSpace(h.Get("x-envoy-response-flags")) + if v == "" || v == "-" { + return "" + } + return v +} + +// envoyResponseDetails returns the meaningful value of the +// x-envoy-response-code-details header (stage{reason} form when both are +// present, stage-only otherwise), or "" if the header is absent / empty / +// placeholder "-". Captured for operator-side diagnostics; never inspected +// for classification. +func envoyResponseDetails(h http.Header) string { + v := strings.TrimSpace(h.Get("x-envoy-response-code-details")) + if v == "" || v == "-" { + return "" + } + return v +} + +// envoyDiagnostics returns the diagnostics map for an Envoy-attributed +// destination error. Returns nil when both inputs are empty so the caller +// gets a properly-zero Diagnostics field (len-0 nil map). +func envoyDiagnostics(flag, details string) map[string]string { + if flag == "" && details == "" { + return nil + } + d := make(map[string]string, 2) + if flag != "" { + d["envoy_flag"] = flag + } + if details != "" { + d["envoy_details"] = details + } + return d +} + +// synthesizedErr builds the underlying error wrapped inside ErrProxyDestination +// for an Envoy-synthesized plain-HTTP response. Details are included when +// present so the operator-side log line carries the Envoy reason string. 
+func synthesizedErr(flag, details string, status int) error { + if details != "" { + return fmt.Errorf("proxy synthesized response (flag %s, details %s, status %d)", flag, details, status) + } + return fmt.Errorf("proxy synthesized response (flag %s, status %d)", flag, status) +} diff --git a/internal/destregistry/providers/destwebhook/proxytransport_internal_test.go b/internal/destregistry/providers/destwebhook/proxytransport_internal_test.go new file mode 100644 index 000000000..fba207652 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport_internal_test.go @@ -0,0 +1,102 @@ +package destwebhook + +import ( + "io" + "net/http" + "net/url" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } + +// TestProxyTransport_HTTPSResponse_PassesEnvoyHeadersThrough is the core +// regression test for destinations that themselves sit behind Envoy. After +// CONNECT succeeds the forward proxy is byte-blind to the response, so any +// envoy headers we see belong to the destination. The wrapper must not +// touch them. +// +// Constructed via stubbed RoundTripper rather than a real TLS-capable proxy +// because the only contract under test is "do not modify the response when +// scheme is https". 
+func TestProxyTransport_HTTPSResponse_PassesEnvoyHeadersThrough(t *testing.T) { + t.Parallel() + + stub := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set("server", "envoy") + h.Set("x-envoy-response-flags", "UF") + h.Set("x-envoy-upstream-service-time", "42") + return &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: h, + Body: io.NopCloser(strings.NewReader("destination's real 503 body")), + Request: req, + }, nil + }) + proxyURL, _ := url.Parse("http://example.proxy:8080") + wrapper := newProxyTransport(stub, proxyURL) + + req, err := http.NewRequest(http.MethodPost, "https://api.dest.example/hook", nil) + require.NoError(t, err) + + resp, err := wrapper.RoundTrip(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + assert.Equal(t, "destination's real 503 body", string(body), + "destination's response body must reach Outpost unchanged") + + assert.Equal(t, "envoy", resp.Header.Get("server"), + "destination's server header must not be stripped") + assert.Equal(t, "UF", resp.Header.Get("x-envoy-response-flags"), + "destination's response-flag header must survive (it's their observability data)") + assert.Equal(t, "42", resp.Header.Get("x-envoy-upstream-service-time"), + "destination's upstream-service-time must survive") +} + +// TestProxyTransport_PlainHTTPResponse_StillStripsEnvoyHeaders asserts the +// counterpart: on the plain-HTTP forwarding path our forward proxy *is* in +// the byte path on the response, so envoy-fingerprint stripping still +// applies. This documents the residual limitation noted in +// webhook-proxy.mdoc: a plain-HTTP destination that is itself behind Envoy +// will have some observability headers stripped here. Attribution stays +// correct because x-envoy-response-flags is overwritten by the forward +// Envoy via OVERWRITE_IF_EXISTS_OR_ADD. 
+func TestProxyTransport_PlainHTTPResponse_StillStripsEnvoyHeaders(t *testing.T) { + t.Parallel() + + stub := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set("server", "envoy") + h.Set("x-envoy-response-flags", "-") // pass-through placeholder (set by our forward Envoy) + h.Set("x-envoy-upstream-service-time", "42") + return &http.Response{ + StatusCode: http.StatusOK, + Header: h, + Body: io.NopCloser(strings.NewReader("ok")), + Request: req, + }, nil + }) + proxyURL, _ := url.Parse("http://example.proxy:8080") + wrapper := newProxyTransport(stub, proxyURL) + + req, err := http.NewRequest(http.MethodPost, "http://api.dest.example/hook", nil) + require.NoError(t, err) + + resp, err := wrapper.RoundTrip(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Empty(t, resp.Header.Get("server")) + assert.Empty(t, resp.Header.Get("x-envoy-response-flags")) + assert.Empty(t, resp.Header.Get("x-envoy-upstream-service-time")) +} diff --git a/internal/destregistry/providers/destwebhook/proxytransport_pin_test.go b/internal/destregistry/providers/destwebhook/proxytransport_pin_test.go new file mode 100644 index 000000000..588d9a436 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport_pin_test.go @@ -0,0 +1,62 @@ +package destwebhook_test + +import ( + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" + "github.com/stretchr/testify/require" +) + +// TestProxyTransport_PinProxyconnectWording guards a load-bearing assumption +// in proxytransport.go: Go's net/http transport wraps dial-to-proxy failures +// in a *net.OpError whose Op field is "proxyconnect", and that token appears +// in the resulting error's Error() string. This is not a public/typed API — +// it's an internal stdlib convention that has been stable for many Go +// versions but could change. 
+// +// If this test fails after a Go upgrade, the proxyTransport.classifyTransportError +// detector for proxy-unreachable failures needs to be updated to whatever the +// new wording is. Without this pin, that breakage would be silent and result +// in proxy outages no longer being attributed to infra (instead they would +// fall through and be classified as ordinary network errors against the +// destination). +// +// Pinned for the Go version recorded in go.mod. +func TestProxyTransport_PinProxyconnectWording(t *testing.T) { + t.Parallel() + + // Bind a port, then close it: very likely (not guaranteed) to stay unbound for the test. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + unreachable := srv.URL + srv.Close() + + // Bypass the wrapper: build a raw transport so we observe the stdlib + // error directly. + rawTransport, ok := http.DefaultTransport.(*http.Transport) + require.True(t, ok) + rt := rawTransport.Clone() + proxyURL, err := url.Parse(unreachable) + require.NoError(t, err) + rt.Proxy = http.ProxyURL(proxyURL) + client := &http.Client{Transport: rt} + + _, err = client.Get("https://example.invalid/") + require.Error(t, err) + require.True(t, + strings.Contains(err.Error(), "proxyconnect"), + "stdlib changed CONNECT-failure wording; update proxyTransport detector. err = %q", err.Error(), + ) + + // Also pin: the wrapper still classifies this as ErrProxyInfra. 
+ wrappedClient := makeProxiedClient(t, unreachable) + _, err = wrappedClient.Get("https://example.invalid/") + require.Error(t, err) + require.True(t, destwebhook.IsProxyInfraError(err), + "wrapper failed to identify proxy-unreachable as infra; err = %v", err) +} diff --git a/internal/destregistry/providers/destwebhook/proxytransport_test.go b/internal/destregistry/providers/destwebhook/proxytransport_test.go new file mode 100644 index 000000000..0106a9734 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport_test.go @@ -0,0 +1,378 @@ +package destwebhook_test + +import ( + "errors" + "io" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newCONNECTRejectingProxy returns an httptest server that responds to any +// CONNECT request with the given status code. Non-CONNECT requests get 200. +func newCONNECTRejectingProxy(t *testing.T, status int) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodConnect { + w.WriteHeader(status) + return + } + w.WriteHeader(http.StatusOK) + })) +} + +// makeProxiedClient constructs an HTTP client routed through proxyURL using +// the proxyTransport wrapper, matching the NewHTTPClient flow. 
+func makeProxiedClient(t *testing.T, proxyURL string) *http.Client { + t.Helper() + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ + ProxyURL: &proxyURL, + WrapTransport: destwebhook.WrapTransport, + }) + require.NoError(t, err) + return client +} + +func TestProxyTransport_ConnectAuthFailure_ReturnsInfraError(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusProxyAuthRequired) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + // HTTPS target triggers CONNECT. + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var infraErr *destwebhook.ErrProxyInfra + require.True(t, errors.As(err, &infraErr), + "expected ErrProxyInfra for proxy 407, got: %v", err) + assert.Equal(t, "example.invalid", infraErr.DestHost) +} + +func TestProxyTransport_ConnectBadGateway_ReturnsDestinationError(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusBadGateway) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination for proxy 502, got: %v", err) + assert.Equal(t, "connection_refused", destErr.Code) + assert.Equal(t, "example.invalid", destErr.DestHost) + + // Must not be ErrProxyInfra (would cause incorrect nack). 
+ var infraErr *destwebhook.ErrProxyInfra + assert.False(t, errors.As(err, &infraErr), + "5xx from proxy must not be infra error") +} + +func TestProxyTransport_ConnectServiceUnavailable_ReturnsDestinationError(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusServiceUnavailable) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination for proxy 503, got: %v", err) + assert.Equal(t, "connection_refused", destErr.Code) +} + +func TestProxyTransport_ProxyUnreachable_ReturnsInfraError(t *testing.T) { + t.Parallel() + + // Bind a port, then close it: the address is very likely (not guaranteed) to stay unbound + // for the rest of the test (avoids hard-coded "unlikely" ports). + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + unreachableURL := srv.URL + srv.Close() + + client := makeProxiedClient(t, unreachableURL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var infraErr *destwebhook.ErrProxyInfra + require.True(t, errors.As(err, &infraErr), + "expected ErrProxyInfra when proxy is unreachable, got: %v", err) +} + +func TestProxyTransport_SuccessfulRequest_PassesThrough(t *testing.T) { + t.Parallel() + + // Plain-HTTP target so we exercise the proxy's forward path (no CONNECT). + target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("X-Test", "real-upstream") + w.WriteHeader(http.StatusOK) + })) + defer target.Close() + + // Minimal forward proxy: the client sends absolute-URI requests, so r.URL + // is the actual target. We just re-dispatch. 
+ proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + outReq, err := http.NewRequestWithContext(r.Context(), r.Method, r.URL.String(), r.Body) + if err != nil { + w.WriteHeader(http.StatusBadGateway) + return + } + for k, vs := range r.Header { + for _, v := range vs { + outReq.Header.Add(k, v) + } + } + resp, err := http.DefaultClient.Do(outReq) + if err != nil { + w.WriteHeader(http.StatusBadGateway) + return + } + defer resp.Body.Close() + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + resp, err := client.Get(target.URL + "/somepath") + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "real-upstream", resp.Header.Get("X-Test")) +} + +func TestProxyTransport_ErrProxyInfra_DoesNotLeakProxyDetails(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusProxyAuthRequired) + defer proxy.Close() + + proxyURL, _ := url.Parse(proxy.URL) + + client := makeProxiedClient(t, proxy.URL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var infraErr *destwebhook.ErrProxyInfra + require.True(t, errors.As(err, &infraErr)) + + // Sanitized message must not contain the proxy host/port. + assert.NotContains(t, infraErr.Error(), proxyURL.Host, + "ErrProxyInfra.Error() must not leak proxy address") + assert.Contains(t, infraErr.Error(), "example.invalid", + "ErrProxyInfra.Error() should reference the destination host") +} + +// newEnvoySynthesizedProxy returns a forwarding proxy that, instead of +// forwarding, responds with an Envoy-synthesized 5xx and the configured +// response flag (plus optional response-code-details). 
+func newEnvoySynthesizedProxy(t *testing.T, status int, flag, details string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("server", "envoy") + w.Header().Set("x-envoy-response-flags", flag) + if details != "" { + w.Header().Set("x-envoy-response-code-details", details) + } + w.WriteHeader(status) + _, _ = w.Write([]byte("upstream connect error or disconnect/reset before headers")) + })) +} + +func TestProxyTransport_EnvoySynthesizedResponse_UF_ReturnsConnectionRefused(t *testing.T) { + t.Parallel() + + proxy := newEnvoySynthesizedProxy(t, http.StatusBadGateway, "UF", "") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination for envoy UF flag, got: %v", err) + assert.Equal(t, "connection_refused", destErr.Code) + assert.Equal(t, "example.invalid", destErr.DestHost) +} + +func TestProxyTransport_EnvoySynthesizedResponse_UT_ReturnsTimeout(t *testing.T) { + t.Parallel() + + proxy := newEnvoySynthesizedProxy(t, http.StatusGatewayTimeout, "UT", "") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr)) + assert.Equal(t, "timeout", destErr.Code) +} + +func TestProxyTransport_EnvoySynthesizedResponse_DF_ReturnsDNSError(t *testing.T) { + t.Parallel() + + proxy := newEnvoySynthesizedProxy(t, http.StatusServiceUnavailable, "DF", "dns_resolution_failure") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, 
&destErr)) + assert.Equal(t, "dns_error", destErr.Code) + assert.Equal(t, "DF", destErr.Diagnostics["envoy_flag"], "envoy_flag captured for operator diagnostics") + assert.Equal(t, "dns_resolution_failure", destErr.Diagnostics["envoy_details"], "envoy_details captured for operator diagnostics") +} + +func TestProxyTransport_EnvoySynthesizedResponse_UC_ReturnsConnectionReset(t *testing.T) { + t.Parallel() + + // UC = upstream connection termination (established then dropped) — must + // classify as connection_reset, distinct from UF/connection_refused. + proxy := newEnvoySynthesizedProxy(t, http.StatusBadGateway, "UC", "upstream_reset_after_response_started{connection_termination}") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr)) + assert.Equal(t, "connection_reset", destErr.Code) + assert.Equal(t, "UC", destErr.Diagnostics["envoy_flag"]) + assert.Equal(t, "upstream_reset_after_response_started{connection_termination}", destErr.Diagnostics["envoy_details"]) +} + +func TestProxyTransport_EnvoyPlaceholderFlag_PassesResponseThrough(t *testing.T) { + t.Parallel() + + // "-" is the placeholder Envoy emits when no flag fired (pass-through + // success path). Must not be treated as synthesized. + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("x-envoy-response-flags", "-") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("real upstream body")) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + resp, err := client.Get("http://example.invalid/hook") + require.NoError(t, err) + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "real upstream body", string(body)) + // Header stripped on success. 
+ assert.Empty(t, resp.Header.Get("x-envoy-response-flags")) +} + +func TestProxyTransport_StripsEnvoyHeaders_OnSuccess(t *testing.T) { + t.Parallel() + + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("server", "envoy") + w.Header().Set("x-envoy-upstream-service-time", "12") + w.Header().Set("x-envoy-attempt-count", "1") + w.Header().Set("x-real-header", "kept") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + resp, err := client.Get("http://example.invalid/hook") + require.NoError(t, err) + defer resp.Body.Close() + + assert.Empty(t, resp.Header.Get("server")) + assert.Empty(t, resp.Header.Get("x-envoy-upstream-service-time")) + assert.Empty(t, resp.Header.Get("x-envoy-attempt-count")) + assert.Equal(t, "kept", resp.Header.Get("x-real-header")) +} + +func TestProxyTransport_EnvoyConnectFlag_RefinesInfraErrorCode(t *testing.T) { + t.Parallel() + + // Envoy CONNECT failure with a response-flag header. The + // OnProxyConnectResponse callback sees the headers and refines the + // destination error code from the flag instead of the generic default. 
+ proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodConnect { + w.Header().Set("x-envoy-response-flags", "DF") + w.WriteHeader(http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination, got: %v", err) + assert.Equal(t, "dns_error", destErr.Code, + "envoy response-flag DF should refine generic connection_refused to dns_error") +} + +func TestMapEnvoyResponseFlag(t *testing.T) { + t.Parallel() + + cases := map[string]string{ + "UF": "connection_refused", + "UH": "connection_refused", + "LH": "connection_refused", + "UC": "connection_reset", + "UR": "connection_reset", + "LR": "connection_reset", + "UT": "timeout", + "SI": "timeout", + "DT": "timeout", + "UMSDR": "timeout", + "DF": "dns_error", + "NR": "network_unreachable", + "NC": "network_unreachable", + "UPE": "protocol_error", + "DPE": "protocol_error", + "unknown": "network_error", + } + for flag, want := range cases { + t.Run(flag, func(t *testing.T) { + t.Parallel() + assert.Equal(t, want, destwebhook.MapEnvoyResponseFlag(flag)) + }) + } +} diff --git a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go index 00648ca9f..62acf0fb5 100644 --- a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go +++ b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go @@ -229,9 +229,10 @@ func (d *StandardWebhookDestination) CreatePublisher(ctx context.Context, destin proxyURL = &d.proxyURL } - httpClient, err := d.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ - UserAgent: &d.userAgent, - ProxyURL: proxyURL, + 
httpClient, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ + UserAgent: &d.userAgent, + ProxyURL: proxyURL, + WrapTransport: destwebhook.WrapTransport, }) if err != nil { return nil, err From 2aa4cec1574caaa599ff2d62cb38ed3ab69816bb Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 14 May 2026 00:04:08 +0700 Subject: [PATCH 2/2] fix: increase consumer error tolerance for transient infra outages (#900) Previously the consumer gave up after 5 consecutive receive errors with a 5s backoff cap (~3s total tolerance), permanently killing the worker with no recovery path. A brief broker hiccup (e.g. GCP OAuth/DNS blip, managed broker restart) was enough to take down logmq/deliverymq workers across deployments until containers were manually restarted. Mirrors the same fix applied to the retrymq scheduler in #881. Increase to 10 errors with 15s backoff cap (~1 min tolerance window). Co-authored-by: Claude Opus 4.7 (1M context) --- internal/consumer/consumer.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index b734e2410..1bbc6401c 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -12,10 +12,30 @@ import ( "go.uber.org/zap" ) +// Error retry schedule (with 200ms initial backoff): +// +// Error Backoff Cumulative +// 1 200ms 0.2s +// 2 400ms 0.6s +// 3 800ms 1.4s ← 3 retries within ~1.5s +// 4 1.6s 3.0s +// 5 3.2s 6.2s +// 6 6.4s 12.6s +// 7 12.8s 25.4s +// 8 15s (cap) 40.4s +// 9 15s (cap) 55.4s +// 10 15s (cap) 70.4s ← worker dies (~1 min total) +// +// Backoff formula: initialBackoff * 2^(attempt-1), capped at maxBackoff. +// After maxConsecutiveErrors the worker dies permanently (supervisor does +// not restart it), so these values must tolerate transient infra outages +// (e.g. brief MQ broker restarts, GCP OAuth/DNS blips) without killing the +// worker. 
~1 min is sufficient for managed broker recovery from routine +// restarts or short network blips. const ( - defaultMaxConsecutiveErrors = 5 + defaultMaxConsecutiveErrors = 10 defaultInitialBackoff = 200 * time.Millisecond - defaultMaxBackoff = 5 * time.Second + defaultMaxBackoff = 15 * time.Second ) type Consumer interface {