diff --git a/build/dev/compose.yml b/build/dev/compose.yml index 16f59a4e..fbdc46b7 100644 --- a/build/dev/compose.yml +++ b/build/dev/compose.yml @@ -42,6 +42,12 @@ services: environment: SERVICE: log + envoy: + image: envoyproxy/envoy:v1.31-latest + volumes: + - ./envoy/envoy.yaml:/etc/envoy/envoy.yaml:ro + command: ["envoy", "-c", "/etc/envoy/envoy.yaml", "--log-level", "info"] + portal: build: context: ../../ diff --git a/build/dev/envoy/envoy.yaml b/build/dev/envoy/envoy.yaml new file mode 100644 index 00000000..c68bff5a --- /dev/null +++ b/build/dev/envoy/envoy.yaml @@ -0,0 +1,66 @@ +admin: + address: + socket_address: { address: 0.0.0.0, port_value: 9901 } + +static_resources: + listeners: + - name: forward_proxy + address: + socket_address: { address: 0.0.0.0, port_value: 10000 } + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: forward_proxy + codec_type: AUTO + access_log: + - name: envoy.access_loggers.stdout + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog + http_filters: + - name: envoy.filters.http.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upgrade_configs: + - upgrade_type: CONNECT + route_config: + name: proxy_route + response_headers_to_add: + - header: + key: "x-envoy-response-flags" + value: "%RESPONSE_FLAGS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + - header: + key: "x-envoy-response-code-details" + value: "%RESPONSE_CODE_DETAILS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + virtual_hosts: + - name: proxy + domains: ["*"] + routes: + - match: { connect_matcher: {} } + route: + cluster: dfp_cluster + upgrade_configs: + - upgrade_type: CONNECT + connect_config: {} + - match: { prefix: "/" } + route: { cluster: dfp_cluster } + + clusters: + - name: dfp_cluster + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY diff --git a/docs/content/features/webhook-proxy.mdoc b/docs/content/features/webhook-proxy.mdoc new file mode 100644 index 00000000..6dbb6342 --- /dev/null +++ b/docs/content/features/webhook-proxy.mdoc @@ -0,0 +1,188 @@ +--- +title: "Webhook Forward Proxy" +description: "Route outgoing webhook deliveries through an HTTP forward proxy for static-IP egress or network isolation." +--- + +Outpost can route outgoing webhook traffic through an HTTP forward proxy. Common reasons: + +- **Static-IP egress** — destinations that allowlist a specific source IP +- **Network isolation** — keep delivery workers off the public internet +- **Centralized egress policy** — single chokepoint for outbound traffic + +Proxy support is operator-configured and applies to every webhook destination served by the deployment. + +## Configuration + +| Env var | Description | +|---------|-------------| +| `DESTINATIONS_WEBHOOK_PROXY_URL` | Proxy URL, e.g. `http://user:pass@proxy.example.com:8080`. Supports basic auth. | + +When `DESTINATIONS_WEBHOOK_PROXY_URL` is set, Outpost installs an HTTP proxy on the webhook publisher's transport. HTTPS destinations use the standard `CONNECT` tunneling flow; HTTP destinations are forwarded request-by-request. + +## Error handling + +Putting a forward proxy in the delivery path introduces a new failure surface that should not be charged to the destination. A proxy-auth misconfiguration or a proxy outage isn't the destination's fault, and recording it as a failed delivery attempt burns retry budget on a problem the destination cannot resolve. + +Outpost distinguishes **proxy infrastructure failures** from **destination failures** (including destination failures that the proxy merely *reports* on the destination's behalf) and applies the right behavior to each. The base behavior below applies to any forward proxy; Envoy-specific signals are picked up automatically when present (see [Envoy support](#envoy-support)). + +### General behavior + +| Scenario | Attribution | Behavior | +|---|---|---| +| Proxy returns 407 / 401 / 403 on `CONNECT` | Infra | **Nack** — operator misconfiguration of proxy credentials | +| Proxy unreachable (TCP dial to proxy fails) | Infra | **Nack** — proxy infrastructure outage | +| `CONNECT` succeeds, destination returns real 4xx/5xx (HTTPS) | Destination | Record attempt with the actual status code | +| `CONNECT` succeeds, destination TLS handshake fails (HTTPS) | Destination | Record attempt (`tls_error`) | +| `CONNECT` succeeds, destination times out | Destination | Record attempt (`timeout`) | +| Proxy returns other 5xx on `CONNECT` (cannot reach destination) | Destination | Record attempt (`connection_refused`) | +| Real upstream response passed through (plain-HTTP) | Destination | Record as today | +| Proxy itself overloaded or misbehaving (rare) | Destination (conservative) | Record attempt (`network_error`) — never nack on speculation | + +**Key principle:** when the proxy reports a failure that originated at the destination (DNS, connect refused, upstream timeout), the customer still sees it as a destination failure. Outpost rewrites the message so the response data is destination-attributed, not proxy-attributed. Nacking is reserved for cases where the proxy itself is the proximate cause. + +**HTTPS responses are byte-transparent.** Once the `CONNECT` tunnel opens, TLS runs end-to-end between Outpost and the destination; the forward proxy can no longer read or modify response bytes. Outpost therefore does not inspect or sanitize HTTPS response payloads — they are recorded as the destination sent them. Proxy-originated HTTPS failures (auth, unreachable, can't connect upstream) all happen at `CONNECT` time and are handled before the tunnel exists. + +Response-body and response-header sanitization on the plain-HTTP forwarding path is best-effort and depends on the proxy implementation being recognized. For an arbitrary forward proxy, Outpost can rewrite error messages but cannot reliably strip proxy-identifying response content. Sanitization is currently complete only for Envoy — see [Envoy support](#envoy-support). + +## Envoy support + +When the proxy is [Envoy](https://www.envoyproxy.io/), Outpost picks up Envoy-specific signals automatically — no configuration toggle required. These are additive on top of the general behavior above. + +### Additional behaviors + +Envoy-specific handling fires on two surfaces: the `CONNECT` response (HTTPS, where the proxy's response is visible to Outpost before the tunnel opens), and proxied plain-HTTP responses (where the proxy is in the byte path on the way back). Responses that arrived through an established `CONNECT` tunnel are not inspected — see the byte-transparency note above. + +| Scenario | Signal | Attribution | Behavior | +|---|---|---|---| +| Envoy `CONNECT` failure with response-flag header | `x-envoy-response-flags` on the `CONNECT` response | Destination | Record attempt with code refined from the flag (e.g. `DF` → `dns_error`, `UT` → `timeout`) instead of the generic `connection_refused` | +| Envoy synthesizes 5xx response (plain-HTTP path) | `x-envoy-response-flags: UF` / `UC` / etc. on the response | Destination | Record attempt with code mapped from the flag; response body is dropped | +| Real upstream response passed through (plain-HTTP) | `x-envoy-response-flags: -` or empty | Destination | Record as today | +| Successful plain-HTTP response served via Envoy | `x-envoy-*` and `server: envoy` headers present | Destination | Record; headers stripped before storage | +| HTTPS response (any status, any headers, post-`CONNECT`) | — | Destination | Pass-through unchanged — bytes never touched the forward proxy | + +Two Envoy-specific behaviors are layered on top, both scoped to surfaces where the forward proxy actually contributed to the bytes: + +- **Response-flag mapping** — `x-envoy-response-flags`, when present and non-empty, refines the destination error code. Mapping aligns with Outpost's non-proxy error vocabulary (`ClassifyNetworkError`), so customers see the same codes whether or not a proxy is in path: + + | Envoy flag | Outpost code | Meaning | + |---|---|---| + | `UF`, `UH`, `LH` | `connection_refused` | TCP dial failed / no healthy upstream / failed health check | + | `UC`, `UR`, `LR` | `connection_reset` | Established connection dropped / remote or local reset | + | `UT`, `SI`, `DT`, `UMSDR` | `timeout` | Upstream / stream-idle / duration / max-stream timeout | + | `DF` | `dns_error` | DNS resolution failure (`dynamic_forward_proxy`) | + | `NR`, `NC` | `network_unreachable` | No route / no cluster | + | `UPE`, `DPE` | `protocol_error` | Upstream / downstream protocol error | + | (any other flag) | `network_error` | Unmapped — operator-visible signal to expand the table | + + Applies to `CONNECT` responses and plain-HTTP responses, not to bytes returned through an HTTPS tunnel. + +- **Operator diagnostics** — when a flag fires, the raw `x-envoy-response-flags` value and `x-envoy-response-code-details` (the `stage{reason}` string, e.g. `upstream_reset_before_response_started{connection_timeout}`) are written into a generic proxy-diagnostics map on the error as `envoy_flag` / `envoy_details`. These surface in the underlying error message and on the publish-attempt error payload, visible in `consumer handler error` logs — but never written to the customer-visible attempt `response_data`, which mirrors a normal network failure (no status, no body). The map is intentionally untyped so other proxies (Squid, HAProxy) can populate their own keys without colliding. To enable this, the Envoy ref config emits the details header alongside the flag header. + +- **Header and body sanitization** — `x-envoy-*` and `server: envoy` headers are stripped from plain-HTTP responses; Envoy-synthesized plain-HTTP response bodies are replaced with a normalized message that does not leak Envoy. HTTPS responses are not sanitized. + +Support for other forward proxies (Squid, HAProxy, nginx, ...) can be added the same way — by detecting proxy-specific response signals and mapping them to destination error codes. None are currently implemented. + +#### Limitation: plain-HTTP destinations that are themselves behind Envoy + +If a destination is reached over plain HTTP and the destination itself sits behind its own Envoy edge, the destination's `x-envoy-*` headers (e.g. `x-envoy-upstream-service-time`, `server: envoy`) pass through the forward proxy and Outpost strips them. The destination's `x-envoy-response-flags` is overwritten by the forward Envoy's value, so attribution is still correct — the customer never sees a destination failure misattributed to the proxy — but some destination-side observability headers are lost on this code path. + +This does not affect HTTPS destinations: HTTPS responses are byte-transparent (see above), so any `x-envoy-*` headers from the destination's Envoy reach Outpost untouched. + +### Required Envoy configuration + +For Outpost to reliably distinguish Envoy-synthesized responses from real upstream responses, Envoy must emit its response flags as a response header. The response-code-details header is optional but recommended — without it, Outpost still classifies via the flag, but operators lose the precise stage/reason in logs. Add to the route configuration (Envoy rejects these fields on the HTTP connection manager — they belong on `RouteConfiguration`): + +```yaml +route_config: + response_headers_to_add: + - header: + key: "x-envoy-response-flags" + value: "%RESPONSE_FLAGS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + - header: + key: "x-envoy-response-code-details" + value: "%RESPONSE_CODE_DETAILS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + virtual_hosts: + # ... +``` + +`OVERWRITE_IF_EXISTS_OR_ADD` is important — it prevents a misbehaving destination from spoofing either header to confuse Outpost's classification or pollute operator diagnostics. + +### Minimal reference Envoy + +A minimal forward-proxy Envoy listener with response-flag reporting enabled: + +```yaml +admin: + address: + socket_address: { address: 127.0.0.1, port_value: 9901 } + +static_resources: + listeners: + - name: forward_proxy + address: + socket_address: { address: 0.0.0.0, port_value: 10000 } + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: forward_proxy + codec_type: AUTO + http_filters: + - name: envoy.filters.http.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + upgrade_configs: + - upgrade_type: CONNECT + route_config: + response_headers_to_add: + - header: + key: "x-envoy-response-flags" + value: "%RESPONSE_FLAGS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + - header: + key: "x-envoy-response-code-details" + value: "%RESPONSE_CODE_DETAILS%" + append_action: OVERWRITE_IF_EXISTS_OR_ADD + virtual_hosts: + - name: proxy + domains: ["*"] + routes: + - match: { connect_matcher: {} } + route: + cluster: dfp_cluster + upgrade_configs: + - upgrade_type: CONNECT + connect_config: {} + - match: { prefix: "/" } + route: { cluster: dfp_cluster } + + clusters: + - name: dfp_cluster + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: dfp_cache + dns_lookup_family: V4_ONLY +``` + +The example above is a minimal listener. Whether you need proxy authentication and TLS depends on your network topology: if Outpost and the proxy share a private network, neither is strictly required; if the proxy is reachable over the public internet, both are strongly recommended to prevent the proxy being used as an open relay. + +## Queue retry behavior + +When a proxy infrastructure error is nacked, the underlying message queue redelivers the event. Because nacks only fire for true infra failures (proxy auth or proxy unreachable), the redelivery rate is bounded by the proxy outage duration, not by destination behavior. + +Outpost's default GCP Pub/Sub provisioning (`internal/mqinfra/gcppubsub.go`) uses `retryPolicy: {minimumBackoff: 10s, maximumBackoff: 120s}` with `maxDeliveryAttempts: 6` (default `RetryLimit` + 1). That gives roughly **5 minutes of redelivery runway** before a nacked message lands in the dead-letter topic. A proxy outage shorter than this window is transparent to the destination; longer outages require manual replay from the DLQ. + +If you expect longer proxy outages, raise `MinRetryBackoff` / `MaxRetryBackoff` (config) and `RetryLimit` (policy) so the redelivery window covers your worst-case outage duration. RabbitMQ / SQS / Kafka have equivalent knobs on their delivery queues. diff --git a/docs/content/nav.json b/docs/content/nav.json index aaa5da67..b653a7f6 100644 --- a/docs/content/nav.json +++ b/docs/content/nav.json @@ -48,7 +48,8 @@ }, { "slug": "features/tenant-user-portal", "title": "Tenant Portal" }, { "slug": "features/metrics", "title": "Metrics" }, - { "slug": "features/opentelemetry", "title": "OpenTelemetry" } + { "slug": "features/opentelemetry", "title": "OpenTelemetry" }, + { "slug": "features/webhook-proxy", "title": "Webhook Forward Proxy" } ] ] }, diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go index b734e241..1bbc6401 100644 --- a/internal/consumer/consumer.go +++ b/internal/consumer/consumer.go @@ -12,10 +12,30 @@ import ( "go.uber.org/zap" ) +// Error retry schedule (with 200ms initial backoff): +// +// Error Backoff Cumulative +// 1 200ms 0.2s +// 2 400ms 0.6s +// 3 800ms 1.4s ← 3 retries within ~1.5s +// 4 1.6s 3.0s +// 5 3.2s 6.2s +// 6 6.4s 12.6s +// 7 12.8s 25.4s +// 8 15s (cap) 40.4s +// 9 15s (cap) 55.4s +// 10 15s (cap) 70.4s ← worker dies (~1 min total) +// +// Backoff formula: initialBackoff * 2^(attempt-1), capped at maxBackoff. +// After maxConsecutiveErrors the worker dies permanently (supervisor does +// not restart it), so these values must tolerate transient infra outages +// (e.g. brief MQ broker restarts, GCP OAuth/DNS blips) without killing the +// worker. ~1 min is sufficient for managed broker recovery from routine +// restarts or short network blips. const ( - defaultMaxConsecutiveErrors = 5 + defaultMaxConsecutiveErrors = 10 defaultInitialBackoff = 200 * time.Millisecond - defaultMaxBackoff = 5 * time.Second + defaultMaxBackoff = 15 * time.Second ) type Consumer interface { diff --git a/internal/destregistry/baseprovider.go b/internal/destregistry/baseprovider.go index 49d63343..52927f89 100644 --- a/internal/destregistry/baseprovider.go +++ b/internal/destregistry/baseprovider.go @@ -3,12 +3,9 @@ package destregistry import ( "context" "fmt" - "net/http" - "net/url" "regexp" "strconv" "strings" - "time" "github.com/hookdeck/outpost/internal/destregistry/metadata" "github.com/hookdeck/outpost/internal/models" @@ -199,55 +196,3 @@ func validateField(field metadata.FieldSchema, value string, path string) *Valid func (p *BaseProvider) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *PreprocessDestinationOpts) error { return nil } - -type HTTPClientConfig struct { - Timeout *time.Duration - UserAgent *string - ProxyURL *string -} - -func (p *BaseProvider) MakeHTTPClient(config HTTPClientConfig) (*http.Client, error) { - client := &http.Client{} - - if config.Timeout != nil { - client.Timeout = *config.Timeout - } - - // Configure transport with proxy and/or user agent if needed - if config.ProxyURL != nil || config.UserAgent != nil { - // Start with default transport settings - transport := http.DefaultTransport.(*http.Transport).Clone() - - // Configure proxy if provided - if config.ProxyURL != nil && *config.ProxyURL != "" { - proxyURLParsed, err := url.Parse(*config.ProxyURL) - if err != nil { - return nil, fmt.Errorf("invalid proxy URL: %w", err) - } - transport.Proxy = http.ProxyURL(proxyURLParsed) - } - - // Wrap transport with user agent if needed - if config.UserAgent != nil { - client.Transport = &userAgentTransport{ - userAgent: *config.UserAgent, - transport: transport, - } - } else { - client.Transport = transport - } - } - - return client, nil -} - -// userAgentTransport wraps an http.RoundTripper to inject a User-Agent header -type userAgentTransport struct { - userAgent string - transport http.RoundTripper -} - -func (t *userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) { - req.Header.Set("User-Agent", t.userAgent) - return t.transport.RoundTrip(req) -} diff --git a/internal/destregistry/baseprovider_test.go b/internal/destregistry/baseprovider_test.go index 58c20953..a335d434 100644 --- a/internal/destregistry/baseprovider_test.go +++ b/internal/destregistry/baseprovider_test.go @@ -14,9 +14,6 @@ import ( func TestMakeHTTPClient_UserAgent(t *testing.T) { t.Parallel() - provider, err := newMockProvider() - require.NoError(t, err) - t.Run("sets user agent on requests", func(t *testing.T) { t.Parallel() @@ -30,7 +27,7 @@ func TestMakeHTTPClient_UserAgent(t *testing.T) { // Create client with user agent userAgent := "TestAgent/1.0" - client, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ UserAgent: &userAgent, }) require.NoError(t, err) @@ -49,7 +46,7 @@ func TestMakeHTTPClient_UserAgent(t *testing.T) { t.Parallel() emptyUserAgent := "" - client, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ UserAgent: &emptyUserAgent, }) require.NoError(t, err) @@ -63,9 +60,6 @@ func TestMakeHTTPClient_UserAgent(t *testing.T) { func TestMakeHTTPClient_Proxy(t *testing.T) { t.Parallel() - provider, err := newMockProvider() - require.NoError(t, err) - t.Run("routes requests through proxy", func(t *testing.T) { t.Parallel() @@ -90,7 +84,7 @@ func TestMakeHTTPClient_Proxy(t *testing.T) { defer targetServer.Close() // Create client with proxy configured - client, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ ProxyURL: &proxyServer.URL, }) require.NoError(t, err) @@ -109,7 +103,7 @@ func TestMakeHTTPClient_Proxy(t *testing.T) { t.Parallel() invalidProxy := "://invalid-url" - _, err := provider.MakeHTTPClient(destregistry.HTTPClientConfig{ + _, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ ProxyURL: &invalidProxy, }) diff --git a/internal/destregistry/httpclient.go b/internal/destregistry/httpclient.go new file mode 100644 index 00000000..a1bfc580 --- /dev/null +++ b/internal/destregistry/httpclient.go @@ -0,0 +1,71 @@ +package destregistry + +import ( + "fmt" + "net/http" + "net/url" + "time" +) + +type HTTPClientConfig struct { + Timeout *time.Duration + UserAgent *string + ProxyURL *string + // WrapTransport, if set, is invoked after a proxy URL has been installed + // on the *http.Transport. Callers can use it to attach proxy-specific + // concerns (e.g. OnProxyConnectResponse callbacks, response classifiers) + // without bleeding those concerns into destregistry itself. Receives the + // underlying transport plus the parsed proxy URL; returns the + // RoundTripper to use thereafter. + WrapTransport func(*http.Transport, *url.URL) http.RoundTripper +} + +// NewHTTPClient builds an *http.Client from config. Free function — no +// provider state is involved. +func NewHTTPClient(config HTTPClientConfig) (*http.Client, error) { + client := &http.Client{} + + if config.Timeout != nil { + client.Timeout = *config.Timeout + } + + if config.ProxyURL == nil && config.UserAgent == nil { + return client, nil + } + + transport := http.DefaultTransport.(*http.Transport).Clone() + + var rt http.RoundTripper = transport + + if config.ProxyURL != nil && *config.ProxyURL != "" { + proxyURLParsed, err := url.Parse(*config.ProxyURL) + if err != nil { + return nil, fmt.Errorf("invalid proxy URL: %w", err) + } + transport.Proxy = http.ProxyURL(proxyURLParsed) + if config.WrapTransport != nil { + rt = config.WrapTransport(transport, proxyURLParsed) + } + } + + if config.UserAgent != nil { + rt = &userAgentTransport{ + userAgent: *config.UserAgent, + transport: rt, + } + } + + client.Transport = rt + return client, nil +} + +// userAgentTransport wraps an http.RoundTripper to inject a User-Agent header +type userAgentTransport struct { + userAgent string + transport http.RoundTripper +} + +func (t *userAgentTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set("User-Agent", t.userAgent) + return t.transport.RoundTrip(req) +} diff --git a/internal/destregistry/providers/desthookdeck/desthookdeck.go b/internal/destregistry/providers/desthookdeck/desthookdeck.go index 9780b191..af9b01d1 100644 --- a/internal/destregistry/providers/desthookdeck/desthookdeck.go +++ b/internal/destregistry/providers/desthookdeck/desthookdeck.go @@ -150,7 +150,7 @@ func (p *HookdeckProvider) CreatePublisher(ctx context.Context, destination *mod client = p.httpClient } else { var err error - client, err = p.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ + client, err = destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ UserAgent: &p.userAgent, }) if err != nil { diff --git a/internal/destregistry/providers/destwebhook/destwebhook.go b/internal/destregistry/providers/destwebhook/destwebhook.go index 42e9ad21..21ebef45 100644 --- a/internal/destregistry/providers/destwebhook/destwebhook.go +++ b/internal/destregistry/providers/destwebhook/destwebhook.go @@ -342,9 +342,10 @@ func (d *WebhookDestination) CreatePublisher(ctx context.Context, destination *m proxyURL = &d.proxyURL } - httpClient, err := d.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ - UserAgent: &d.userAgent, - ProxyURL: proxyURL, + httpClient, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ + UserAgent: &d.userAgent, + ProxyURL: proxyURL, + WrapTransport: WrapTransport, }) if err != nil { return nil, err diff --git a/internal/destregistry/providers/destwebhook/httphelper.go b/internal/destregistry/providers/destwebhook/httphelper.go index 25f9c996..e1978c93 100644 --- a/internal/destregistry/providers/destwebhook/httphelper.go +++ b/internal/destregistry/providers/destwebhook/httphelper.go @@ -2,6 +2,7 @@ package destwebhook import ( "context" + "errors" "fmt" "io" "net/http" @@ -21,20 +22,63 @@ type HTTPRequestResult struct { } // ExecuteHTTPRequest executes an HTTP request and classifies the result. -// All errors return a Delivery object with a classified error code. +// +// Most errors return a Delivery object with a classified error code so the +// caller can record a failed attempt. +// +// Proxy *infrastructure* errors (ErrProxyInfra) return Delivery: nil so the +// caller signals the queue to nack the message instead of recording a +// customer-visible attempt. See registry.go for the nil-attempt handling. +// // See: https://github.com/hookdeck/outpost/issues/571 func ExecuteHTTPRequest(ctx context.Context, client *http.Client, req *http.Request, provider string) *HTTPRequestResult { resp, err := client.Do(req) if err != nil { + // Proxy infrastructure error: nack via nil Delivery so the customer's + // retry budget is not charged for our infra outage. + var infraErr *ErrProxyInfra + if errors.As(err, &infraErr) { + return &HTTPRequestResult{ + Delivery: nil, + Error: destregistry.NewErrDestinationPublishAttempt(err, provider, map[string]interface{}{ + "error": "proxy_infrastructure", + "message": infraErr.Error(), + }), + Response: nil, + } + } + + // Proxy-attributed destination error: use the explicit Code instead of + // substring-matching the underlying error. + var code string + var destErr *ErrProxyDestination + if errors.As(err, &destErr) { + code = destErr.Code + } else { + code = ClassifyNetworkError(err) + } + + data := map[string]interface{}{ + "error": "request_failed", + "message": err.Error(), + } + // Attach raw proxy diagnostics (e.g. Envoy flag + response-code-details) + // to the publish-attempt error so operators can grep them in logs. + // Keys are owned by whichever proxy populated them. Not placed in the + // delivery's ResponseData — customer-visible attempt stays free of + // proxy details. + if destErr != nil { + for k, v := range destErr.Diagnostics { + data[k] = v + } + } + return &HTTPRequestResult{ Delivery: &destregistry.Delivery{ Status: "failed", - Code: ClassifyNetworkError(err), + Code: code, }, - Error: destregistry.NewErrDestinationPublishAttempt(err, provider, map[string]interface{}{ - "error": "request_failed", - "message": err.Error(), - }), + Error: destregistry.NewErrDestinationPublishAttempt(err, provider, data), Response: nil, } } diff --git a/internal/destregistry/providers/destwebhook/proxytransport.go b/internal/destregistry/providers/destwebhook/proxytransport.go new file mode 100644 index 00000000..a956a017 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport.go @@ -0,0 +1,391 @@ +package destwebhook + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "net/url" + "sort" + "strings" +) + +// WrapTransport is the destregistry.HTTPClientConfig.WrapTransport hook for +// webhook destinations. It installs OnProxyConnectResponse on the underlying +// transport and wraps it in proxyTransport so request- and response-time +// proxy failures get translated into ErrProxyInfra / ErrProxyDestination. +func WrapTransport(transport *http.Transport, proxyURL *url.URL) http.RoundTripper { + transport.OnProxyConnectResponse = onProxyConnectResponse + return newProxyTransport(transport, proxyURL) +} + +// ErrProxyInfra signals that a webhook delivery failed at the proxy layer +// (proxy auth misconfiguration, proxy unreachable, etc.). The delivery result +// is nacked so the underlying message queue redelivers without recording a +// customer-visible failed attempt. +type ErrProxyInfra struct { + Underlying error + DestHost string +} + +func (e *ErrProxyInfra) Error() string { + if e.DestHost == "" { + return "proxy infrastructure error" + } + return fmt.Sprintf("proxy infrastructure error reaching %s", e.DestHost) +} + +func (e *ErrProxyInfra) Unwrap() error { + return e.Underlying +} + +// ErrProxyDestination signals that the proxy reported a failure originating at +// the destination (e.g. upstream DNS lookup failed, upstream connection +// refused, upstream timeout). The delivery result is recorded as a normal +// failed attempt using Code as the classification, with response data +// rewritten so the customer sees a destination-attributed failure rather than +// proxy-attributed details. +// +// Diagnostics is a free-form key/value map of proxy-specific signals the +// classification path picked up (e.g. for Envoy, "envoy_flag" and +// "envoy_details"). It is operator-side metadata only: surfaced in error +// logs and on the publish-attempt error payload, never written to the +// customer-visible attempt record. Whichever proxy emitted the data owns +// the key naming so heterogeneous proxies can coexist without colliding. +type ErrProxyDestination struct { + Underlying error + Code string + DestHost string + Diagnostics map[string]string +} + +func (e *ErrProxyDestination) Error() string { + msg := e.Code + if e.DestHost != "" { + msg = fmt.Sprintf("%s connecting to %s", e.Code, e.DestHost) + } + // Append proxy diagnostics so they surface in zap.Error(err) logs at the + // consumer boundary. Customer-visible attempt code is still just e.Code. + if len(e.Diagnostics) == 0 { + return msg + } + keys := make([]string, 0, len(e.Diagnostics)) + for k := range e.Diagnostics { + keys = append(keys, k) + } + sort.Strings(keys) + parts := make([]string, 0, len(keys)) + for _, k := range keys { + parts = append(parts, fmt.Sprintf("%s=%s", k, e.Diagnostics[k])) + } + return fmt.Sprintf("%s (%s)", msg, strings.Join(parts, ", ")) +} + +func (e *ErrProxyDestination) Unwrap() error { + return e.Underlying +} + +// IsProxyInfraError reports whether err is or wraps an ErrProxyInfra. +func IsProxyInfraError(err error) bool { + var pe *ErrProxyInfra + return errors.As(err, &pe) +} + +// MapEnvoyResponseFlag returns the destination error code corresponding to an +// Envoy response flag. The output vocabulary matches ClassifyNetworkError +// (httphelper.go) so customers see the same codes whether or not a proxy is +// in path. Unhandled flags fall through to "network_error" — operators should +// watch for that code paired with a non-empty flag in the attempt error +// payload as a signal that the mapping needs expansion. +// +// Envoy response flag reference: +// https://www.envoyproxy.io/docs/envoy/latest/configuration/observability/access_log/usage#config-access-log-format-response-flags +func MapEnvoyResponseFlag(flag string) string { + switch flag { + case "UF", "UH", "LH": + // UF: upstream connection failure (TCP dial failed) + // UH: no healthy upstream + // LH: failed local health check + return "connection_refused" + case "UC", "UR", "LR": + // UC: upstream connection termination (established then dropped) + // UR: upstream remote reset + // LR: local reset + return "connection_reset" + case "UT", "SI", "DT", "UMSDR": + // UT: upstream request timeout + // SI: stream idle timeout + // DT: downstream global duration timeout + // UMSDR: upstream max stream duration reached + return "timeout" + case "DF": + // DF: DNS resolution failure (emitted by dynamic_forward_proxy when + // the upstream host cannot be resolved). Verified empirically against + // the reference Envoy config in build/dev/envoy/envoy.yaml. + return "dns_error" + case "NR", "NC": + // NR: no route configured + // NC: upstream cluster not found + return "network_unreachable" + case "UPE", "DPE": + // UPE: upstream protocol error + // DPE: downstream protocol error + return "protocol_error" + default: + return "network_error" + } +} + +// proxyTransport wraps an http.RoundTripper to translate proxy-originated +// failures into ErrProxyInfra / ErrProxyDestination so the delivery pipeline +// can attribute them correctly. +// +// There are two distinct surfaces where the forward proxy can affect the +// transaction, and the wrapper handles them in two different places: +// +// 1. CONNECT-time (HTTPS path) — Go's transport sends a CONNECT request to +// the proxy. Whatever the proxy responds with is visible to Outpost via +// onProxyConnectResponse, installed on the underlying http.Transport. +// This is where proxy auth (407/401/403), proxy-can't-reach-upstream +// (5xx with Envoy response flags), and similar are classified. +// +// Once CONNECT succeeds, a raw TCP tunnel is open. TLS runs end-to-end +// between Outpost and the destination; the forward proxy is byte-blind +// to everything that follows. The response we read from base.RoundTrip +// therefore came entirely from the destination side and is not inspected +// or sanitized by this wrapper — see the scheme check in RoundTrip. +// +// 2. Plain-HTTP forwarding path — the proxy reads the request, makes the +// upstream call, and writes the response back to Outpost. The proxy is +// in the byte path on the return, so we can detect Envoy-synthesized +// responses (via x-envoy-response-flags) and strip x-envoy-* / server +// fingerprints. +// +// Dial-to-proxy failures (TCP to the proxy itself fails) are detected here +// from the wrapped *net.OpError whose Op == "proxyconnect" (still emitted by +// Go's stdlib; see proxytransport_pin_test.go for the snapshot guarding the +// wording). +type proxyTransport struct { + base http.RoundTripper + proxyURL *url.URL +} + +func newProxyTransport(base http.RoundTripper, proxyURL *url.URL) *proxyTransport { + return &proxyTransport{base: base, proxyURL: proxyURL} +} + +func (t *proxyTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := t.base.RoundTrip(req) + if err != nil { + if mapped := t.classifyTransportError(err, req); mapped != nil { + return nil, mapped + } + return nil, err + } + + // HTTPS response: bytes came through an established CONNECT tunnel, end- + // to-end TLS between Outpost and the destination. The forward proxy + // physically cannot have read, modified, or synthesized any of these + // bytes. Any x-envoy-* / server: envoy headers we see belong to the + // destination (often itself behind Envoy at its edge), not to us. + // Treating them as ours would destroy destination observability and + // misattribute genuine destination failures as proxy-reported. Proxy- + // originated HTTPS failures all happen at CONNECT time and are handled + // in onProxyConnectResponse before we ever reach this branch. + if req.URL.Scheme == "https" { + return resp, nil + } + + // Plain-HTTP forwarding path: the forward proxy was in the byte path on + // the return and may have synthesized this response itself. + + // Envoy-synthesized response: x-envoy-response-flags carries a non-empty, + // non-placeholder value (the placeholder "-" means "no flag fired", + // i.e. a real upstream response was passed through). Convert to a + // destination-attributed error so the synthesized body never reaches the + // delivery record. + if flag := envoyResponseFlag(resp.Header); flag != "" { + details := envoyResponseDetails(resp.Header) + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + return nil, &ErrProxyDestination{ + Underlying: synthesizedErr(flag, details, resp.StatusCode), + Code: MapEnvoyResponseFlag(flag), + DestHost: destHostFromRequest(req), + Diagnostics: envoyDiagnostics(flag, details), + } + } + + // Real upstream response forwarded by the proxy: strip proxy fingerprints + // so they aren't persisted in the delivery record. Note: if the + // destination itself sits behind its own Envoy, this can over-strip the + // destination's x-envoy-* observability headers on the plain-HTTP path. + // HTTPS destinations are unaffected because of the byte-transparency + // branch above. + sanitizeEnvoyHeaders(resp.Header) + return resp, nil +} + +func destHostFromRequest(req *http.Request) string { + if req == nil || req.URL == nil { + return "" + } + host := req.URL.Host + if h, _, err := net.SplitHostPort(host); err == nil { + host = h + } + return host +} + +// sanitizeEnvoyHeaders removes Envoy-fingerprinting headers from a response so +// the destination's identity in the delivery record is not contaminated with +// proxy details. Safe no-op when no envoy headers are present. +func sanitizeEnvoyHeaders(h http.Header) { + for k := range h { + // http.Header keys are canonicalized, so the prefix is fixed-case. + if strings.HasPrefix(k, "X-Envoy-") { + h.Del(k) + } + } + if strings.EqualFold(h.Get("Server"), "envoy") { + h.Del("Server") + } +} + +func (t *proxyTransport) classifyTransportError(err error, req *http.Request) error { + // CONNECT-time errors arrive pre-typed via onProxyConnectResponse — pass + // through unchanged. + var infraErr *ErrProxyInfra + if errors.As(err, &infraErr) { + return nil + } + var destErr *ErrProxyDestination + if errors.As(err, &destErr) { + return nil + } + + destHost := "" + if req != nil && req.URL != nil { + destHost = req.URL.Host + } + + // Dial-to-proxy failure: Go wraps these in &net.OpError{Op: "proxyconnect"} + // even in current versions. The wrap is the most reliable signal that the + // proxy itself is unreachable (vs. an arbitrary network error en route to + // the destination, which would be a destination problem). The wording is + // pinned by TestProxyTransport_PinProxyconnectWording in + // proxytransport_pin_test.go — update both together if it ever changes. + if strings.Contains(err.Error(), "proxyconnect") { + return &ErrProxyInfra{Underlying: err, DestHost: destHost} + } + + return nil +} + +// onProxyConnectResponse is installed on the underlying http.Transport and +// fires for every CONNECT response from the proxy. Non-200 responses are +// translated into the appropriate sentinel here, where the full response +// (status code + headers, including x-envoy-response-flags) is still +// available. +func onProxyConnectResponse(ctx context.Context, proxyURL *url.URL, connectReq *http.Request, resp *http.Response) error { + if resp.StatusCode == http.StatusOK { + return nil + } + + // CONNECT requests set the target as Request.Host (and URL.Opaque), not + // URL.Host. See net/http transport.go: connectReq is built with + // URL: &url.URL{Opaque: targetAddr}, Host: targetAddr. + destHost := "" + if connectReq != nil { + switch { + case connectReq.Host != "": + destHost = connectReq.Host + case connectReq.URL != nil: + destHost = connectReq.URL.Host + } + } + if h, _, err := net.SplitHostPort(destHost); err == nil { + destHost = h + } + + switch resp.StatusCode { + case http.StatusProxyAuthRequired, + http.StatusUnauthorized, + http.StatusForbidden: + // Auth-related failures are operator misconfiguration of proxy + // credentials — proxy infrastructure problem, not destination. + return &ErrProxyInfra{ + Underlying: fmt.Errorf("proxy returned %s", resp.Status), + DestHost: destHost, + } + } + + // Other non-200 statuses indicate the proxy could not establish the tunnel + // to the destination. Attribute to destination; refine the code from the + // Envoy response flag when present. + code := "connection_refused" + flag := envoyResponseFlag(resp.Header) + details := envoyResponseDetails(resp.Header) + if flag != "" { + code = MapEnvoyResponseFlag(flag) + } + + return &ErrProxyDestination{ + Underlying: fmt.Errorf("proxy returned %s", resp.Status), + Code: code, + DestHost: destHost, + Diagnostics: envoyDiagnostics(flag, details), + } +} + +// envoyResponseFlag returns the meaningful value of the x-envoy-response-flags +// header, or "" if the header is absent / placeholder "-" / empty. +func envoyResponseFlag(h http.Header) string { + v := strings.TrimSpace(h.Get("x-envoy-response-flags")) + if v == "" || v == "-" { + return "" + } + return v +} + +// envoyResponseDetails returns the meaningful value of the +// x-envoy-response-code-details header (stage{reason} form when both are +// present, stage-only otherwise), or "" if the header is absent / empty / +// placeholder "-". Captured for operator-side diagnostics; never inspected +// for classification. +func envoyResponseDetails(h http.Header) string { + v := strings.TrimSpace(h.Get("x-envoy-response-code-details")) + if v == "" || v == "-" { + return "" + } + return v +} + +// envoyDiagnostics returns the diagnostics map for an Envoy-attributed +// destination error. Returns nil when both inputs are empty so the caller +// gets a properly-zero Diagnostics field (len-0 nil map). +func envoyDiagnostics(flag, details string) map[string]string { + if flag == "" && details == "" { + return nil + } + d := make(map[string]string, 2) + if flag != "" { + d["envoy_flag"] = flag + } + if details != "" { + d["envoy_details"] = details + } + return d +} + +// synthesizedErr builds the underlying error wrapped inside ErrProxyDestination +// for an Envoy-synthesized plain-HTTP response. Details are included when +// present so the operator-side log line carries the Envoy reason string. +func synthesizedErr(flag, details string, status int) error { + if details != "" { + return fmt.Errorf("proxy synthesized response (flag %s, details %s, status %d)", flag, details, status) + } + return fmt.Errorf("proxy synthesized response (flag %s, status %d)", flag, status) +} diff --git a/internal/destregistry/providers/destwebhook/proxytransport_internal_test.go b/internal/destregistry/providers/destwebhook/proxytransport_internal_test.go new file mode 100644 index 00000000..fba20765 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport_internal_test.go @@ -0,0 +1,102 @@ +package destwebhook + +import ( + "io" + "net/http" + "net/url" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) } + +// TestProxyTransport_HTTPSResponse_PassesEnvoyHeadersThrough is the core +// regression test for destinations that themselves sit behind Envoy. After +// CONNECT succeeds the forward proxy is byte-blind to the response, so any +// envoy headers we see belong to the destination. The wrapper must not +// touch them. +// +// Constructed via stubbed RoundTripper rather than a real TLS-capable proxy +// because the only contract under test is "do not modify the response when +// scheme is https". +func TestProxyTransport_HTTPSResponse_PassesEnvoyHeadersThrough(t *testing.T) { + t.Parallel() + + stub := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set("server", "envoy") + h.Set("x-envoy-response-flags", "UF") + h.Set("x-envoy-upstream-service-time", "42") + return &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: h, + Body: io.NopCloser(strings.NewReader("destination's real 503 body")), + Request: req, + }, nil + }) + proxyURL, _ := url.Parse("http://example.proxy:8080") + wrapper := newProxyTransport(stub, proxyURL) + + req, err := http.NewRequest(http.MethodPost, "https://api.dest.example/hook", nil) + require.NoError(t, err) + + resp, err := wrapper.RoundTrip(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + + body, _ := io.ReadAll(resp.Body) + assert.Equal(t, "destination's real 503 body", string(body), + "destination's response body must reach Outpost unchanged") + + assert.Equal(t, "envoy", resp.Header.Get("server"), + "destination's server header must not be stripped") + assert.Equal(t, "UF", resp.Header.Get("x-envoy-response-flags"), + "destination's response-flag header must survive (it's their observability data)") + assert.Equal(t, "42", resp.Header.Get("x-envoy-upstream-service-time"), + "destination's upstream-service-time must survive") +} + +// TestProxyTransport_PlainHTTPResponse_StillStripsEnvoyHeaders asserts the +// counterpart: on the plain-HTTP forwarding path our forward proxy *is* in +// the byte path on the response, so envoy-fingerprint stripping still +// applies. This documents the residual limitation noted in +// webhook-proxy.mdoc: a plain-HTTP destination that is itself behind Envoy +// will have some observability headers stripped here. Attribution stays +// correct because x-envoy-response-flags is overwritten by the forward +// Envoy via OVERWRITE_IF_EXISTS_OR_ADD. +func TestProxyTransport_PlainHTTPResponse_StillStripsEnvoyHeaders(t *testing.T) { + t.Parallel() + + stub := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + h := http.Header{} + h.Set("server", "envoy") + h.Set("x-envoy-response-flags", "-") // pass-through placeholder (set by our forward Envoy) + h.Set("x-envoy-upstream-service-time", "42") + return &http.Response{ + StatusCode: http.StatusOK, + Header: h, + Body: io.NopCloser(strings.NewReader("ok")), + Request: req, + }, nil + }) + proxyURL, _ := url.Parse("http://example.proxy:8080") + wrapper := newProxyTransport(stub, proxyURL) + + req, err := http.NewRequest(http.MethodPost, "http://api.dest.example/hook", nil) + require.NoError(t, err) + + resp, err := wrapper.RoundTrip(req) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Empty(t, resp.Header.Get("server")) + assert.Empty(t, resp.Header.Get("x-envoy-response-flags")) + assert.Empty(t, resp.Header.Get("x-envoy-upstream-service-time")) +} diff --git a/internal/destregistry/providers/destwebhook/proxytransport_pin_test.go b/internal/destregistry/providers/destwebhook/proxytransport_pin_test.go new file mode 100644 index 00000000..588d9a43 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport_pin_test.go @@ -0,0 +1,62 @@ +package destwebhook_test + +import ( + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" + "github.com/stretchr/testify/require" +) + +// TestProxyTransport_PinProxyconnectWording guards a load-bearing assumption +// in proxytransport.go: Go's net/http transport wraps dial-to-proxy failures +// in a *net.OpError whose Op field is "proxyconnect", and that token appears +// in the resulting error's Error() string. This is not a public/typed API — +// it's an internal stdlib convention that has been stable for many Go +// versions but could change. +// +// If this test fails after a Go upgrade, the proxyTransport.classifyTransportError +// detector for proxy-unreachable failures needs to be updated to whatever the +// new wording is. Without this pin, that breakage would be silent and result +// in proxy outages no longer being attributed to infra (instead they would +// fall through and be classified as ordinary network errors against the +// destination). +// +// Pinned for go version recorded in go.mod. +func TestProxyTransport_PinProxyconnectWording(t *testing.T) { + t.Parallel() + + // Bind a port, then close — guaranteed unbound for the rest of the test. + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + unreachable := srv.URL + srv.Close() + + // Bypass the wrapper: build a raw transport so we observe the stdlib + // error directly. + rawTransport, ok := http.DefaultTransport.(*http.Transport) + require.True(t, ok) + rt := rawTransport.Clone() + proxyURL, err := url.Parse(unreachable) + require.NoError(t, err) + rt.Proxy = http.ProxyURL(proxyURL) + client := &http.Client{Transport: rt} + + _, err = client.Get("https://example.invalid/") + require.Error(t, err) + require.True(t, + strings.Contains(err.Error(), "proxyconnect"), + "stdlib changed CONNECT-failure wording; update proxyTransport detector. err = %q", err.Error(), + ) + + // Also pin: the wrapper still classifies this as ErrProxyInfra. + wrappedClient := makeProxiedClient(t, unreachable) + _, err = wrappedClient.Get("https://example.invalid/") + require.Error(t, err) + require.True(t, destwebhook.IsProxyInfraError(err), + "wrapper failed to identify proxy-unreachable as infra; err = %v", err) +} diff --git a/internal/destregistry/providers/destwebhook/proxytransport_test.go b/internal/destregistry/providers/destwebhook/proxytransport_test.go new file mode 100644 index 00000000..0106a973 --- /dev/null +++ b/internal/destregistry/providers/destwebhook/proxytransport_test.go @@ -0,0 +1,378 @@ +package destwebhook_test + +import ( + "errors" + "io" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newCONNECTRejectingProxy returns an httptest server that responds to any +// CONNECT request with the given status code. Non-CONNECT requests get 200. +func newCONNECTRejectingProxy(t *testing.T, status int) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodConnect { + w.WriteHeader(status) + return + } + w.WriteHeader(http.StatusOK) + })) +} + +// makeProxiedClient constructs an HTTP client routed through proxyURL using +// the proxyTransport wrapper, matching the NewHTTPClient flow. +func makeProxiedClient(t *testing.T, proxyURL string) *http.Client { + t.Helper() + client, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ + ProxyURL: &proxyURL, + WrapTransport: destwebhook.WrapTransport, + }) + require.NoError(t, err) + return client +} + +func TestProxyTransport_ConnectAuthFailure_ReturnsInfraError(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusProxyAuthRequired) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + // HTTPS target triggers CONNECT. + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var infraErr *destwebhook.ErrProxyInfra + require.True(t, errors.As(err, &infraErr), + "expected ErrProxyInfra for proxy 407, got: %v", err) + assert.Equal(t, "example.invalid", infraErr.DestHost) +} + +func TestProxyTransport_ConnectBadGateway_ReturnsDestinationError(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusBadGateway) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination for proxy 502, got: %v", err) + assert.Equal(t, "connection_refused", destErr.Code) + assert.Equal(t, "example.invalid", destErr.DestHost) + + // Must not be ErrProxyInfra (would cause incorrect nack). + var infraErr *destwebhook.ErrProxyInfra + assert.False(t, errors.As(err, &infraErr), + "5xx from proxy must not be infra error") +} + +func TestProxyTransport_ConnectServiceUnavailable_ReturnsDestinationError(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusServiceUnavailable) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination for proxy 503, got: %v", err) + assert.Equal(t, "connection_refused", destErr.Code) +} + +func TestProxyTransport_ProxyUnreachable_ReturnsInfraError(t *testing.T) { + t.Parallel() + + // Bind a port, then close — the address is guaranteed reserved-but-unbound + // for the rest of the test (avoids hard-coded "unlikely" ports). + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + unreachableURL := srv.URL + srv.Close() + + client := makeProxiedClient(t, unreachableURL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var infraErr *destwebhook.ErrProxyInfra + require.True(t, errors.As(err, &infraErr), + "expected ErrProxyInfra when proxy is unreachable, got: %v", err) +} + +func TestProxyTransport_SuccessfulRequest_PassesThrough(t *testing.T) { + t.Parallel() + + // Plain-HTTP target so we exercise the proxy's forward path (no CONNECT). + target := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("X-Test", "real-upstream") + w.WriteHeader(http.StatusOK) + })) + defer target.Close() + + // Minimal forward proxy: the client sends absolute-URI requests, so r.URL + // is the actual target. We just re-dispatch. + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + outReq, err := http.NewRequestWithContext(r.Context(), r.Method, r.URL.String(), r.Body) + if err != nil { + w.WriteHeader(http.StatusBadGateway) + return + } + for k, vs := range r.Header { + for _, v := range vs { + outReq.Header.Add(k, v) + } + } + resp, err := http.DefaultClient.Do(outReq) + if err != nil { + w.WriteHeader(http.StatusBadGateway) + return + } + defer resp.Body.Close() + for k, vs := range resp.Header { + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + + resp, err := client.Get(target.URL + "/somepath") + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "real-upstream", resp.Header.Get("X-Test")) +} + +func TestProxyTransport_ErrProxyInfra_DoesNotLeakProxyDetails(t *testing.T) { + t.Parallel() + + proxy := newCONNECTRejectingProxy(t, http.StatusProxyAuthRequired) + defer proxy.Close() + + proxyURL, _ := url.Parse(proxy.URL) + + client := makeProxiedClient(t, proxy.URL) + + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var infraErr *destwebhook.ErrProxyInfra + require.True(t, errors.As(err, &infraErr)) + + // Sanitized message must not contain the proxy host/port. + assert.NotContains(t, infraErr.Error(), proxyURL.Host, + "ErrProxyInfra.Error() must not leak proxy address") + assert.Contains(t, infraErr.Error(), "example.invalid", + "ErrProxyInfra.Error() should reference the destination host") +} + +// newEnvoySynthesizedProxy returns a forwarding proxy that, instead of +// forwarding, responds with an Envoy-synthesized 5xx and the configured +// response flag (plus optional response-code-details). +func newEnvoySynthesizedProxy(t *testing.T, status int, flag, details string) *httptest.Server { + t.Helper() + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("server", "envoy") + w.Header().Set("x-envoy-response-flags", flag) + if details != "" { + w.Header().Set("x-envoy-response-code-details", details) + } + w.WriteHeader(status) + _, _ = w.Write([]byte("upstream connect error or disconnect/reset before headers")) + })) +} + +func TestProxyTransport_EnvoySynthesizedResponse_UF_ReturnsConnectionRefused(t *testing.T) { + t.Parallel() + + proxy := newEnvoySynthesizedProxy(t, http.StatusBadGateway, "UF", "") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination for envoy UF flag, got: %v", err) + assert.Equal(t, "connection_refused", destErr.Code) + assert.Equal(t, "example.invalid", destErr.DestHost) +} + +func TestProxyTransport_EnvoySynthesizedResponse_UT_ReturnsTimeout(t *testing.T) { + t.Parallel() + + proxy := newEnvoySynthesizedProxy(t, http.StatusGatewayTimeout, "UT", "") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr)) + assert.Equal(t, "timeout", destErr.Code) +} + +func TestProxyTransport_EnvoySynthesizedResponse_DF_ReturnsDNSError(t *testing.T) { + t.Parallel() + + proxy := newEnvoySynthesizedProxy(t, http.StatusServiceUnavailable, "DF", "dns_resolution_failure") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr)) + assert.Equal(t, "dns_error", destErr.Code) + assert.Equal(t, "DF", destErr.Diagnostics["envoy_flag"], "envoy_flag captured for operator diagnostics") + assert.Equal(t, "dns_resolution_failure", destErr.Diagnostics["envoy_details"], "envoy_details captured for operator diagnostics") +} + +func TestProxyTransport_EnvoySynthesizedResponse_UC_ReturnsConnectionReset(t *testing.T) { + t.Parallel() + + // UC = upstream connection termination (established then dropped) — must + // classify as connection_reset, distinct from UF/connection_refused. + proxy := newEnvoySynthesizedProxy(t, http.StatusBadGateway, "UC", "upstream_reset_after_response_started{connection_termination}") + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("http://example.invalid/hook") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr)) + assert.Equal(t, "connection_reset", destErr.Code) + assert.Equal(t, "UC", destErr.Diagnostics["envoy_flag"]) + assert.Equal(t, "upstream_reset_after_response_started{connection_termination}", destErr.Diagnostics["envoy_details"]) +} + +func TestProxyTransport_EnvoyPlaceholderFlag_PassesResponseThrough(t *testing.T) { + t.Parallel() + + // "-" is the placeholder Envoy emits when no flag fired (pass-through + // success path). Must not be treated as synthesized. + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("x-envoy-response-flags", "-") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("real upstream body")) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + resp, err := client.Get("http://example.invalid/hook") + require.NoError(t, err) + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "real upstream body", string(body)) + // Header stripped on success. + assert.Empty(t, resp.Header.Get("x-envoy-response-flags")) +} + +func TestProxyTransport_StripsEnvoyHeaders_OnSuccess(t *testing.T) { + t.Parallel() + + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("server", "envoy") + w.Header().Set("x-envoy-upstream-service-time", "12") + w.Header().Set("x-envoy-attempt-count", "1") + w.Header().Set("x-real-header", "kept") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + resp, err := client.Get("http://example.invalid/hook") + require.NoError(t, err) + defer resp.Body.Close() + + assert.Empty(t, resp.Header.Get("server")) + assert.Empty(t, resp.Header.Get("x-envoy-upstream-service-time")) + assert.Empty(t, resp.Header.Get("x-envoy-attempt-count")) + assert.Equal(t, "kept", resp.Header.Get("x-real-header")) +} + +func TestProxyTransport_EnvoyConnectFlag_RefinesInfraErrorCode(t *testing.T) { + t.Parallel() + + // Envoy CONNECT failure with a response-flag header. The + // OnProxyConnectResponse callback sees the headers and refines the + // destination error code from the flag instead of the generic default. + proxy := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodConnect { + w.Header().Set("x-envoy-response-flags", "DF") + w.WriteHeader(http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + })) + defer proxy.Close() + + client := makeProxiedClient(t, proxy.URL) + _, err := client.Get("https://example.invalid/") + require.Error(t, err) + + var destErr *destwebhook.ErrProxyDestination + require.True(t, errors.As(err, &destErr), + "expected ErrProxyDestination, got: %v", err) + assert.Equal(t, "dns_error", destErr.Code, + "envoy response-flag DF should refine generic connection_refused to dns_error") +} + +func TestMapEnvoyResponseFlag(t *testing.T) { + t.Parallel() + + cases := map[string]string{ + "UF": "connection_refused", + "UH": "connection_refused", + "LH": "connection_refused", + "UC": "connection_reset", + "UR": "connection_reset", + "LR": "connection_reset", + "UT": "timeout", + "SI": "timeout", + "DT": "timeout", + "UMSDR": "timeout", + "DF": "dns_error", + "NR": "network_unreachable", + "NC": "network_unreachable", + "UPE": "protocol_error", + "DPE": "protocol_error", + "unknown": "network_error", + } + for flag, want := range cases { + t.Run(flag, func(t *testing.T) { + t.Parallel() + assert.Equal(t, want, destwebhook.MapEnvoyResponseFlag(flag)) + }) + } +} diff --git a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go index 00648ca9..62acf0fb 100644 --- a/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go +++ b/internal/destregistry/providers/destwebhookstandard/destwebhookstandard.go @@ -229,9 +229,10 @@ func (d *StandardWebhookDestination) CreatePublisher(ctx context.Context, destin proxyURL = &d.proxyURL } - httpClient, err := d.BaseProvider.MakeHTTPClient(destregistry.HTTPClientConfig{ - UserAgent: &d.userAgent, - ProxyURL: proxyURL, + httpClient, err := destregistry.NewHTTPClient(destregistry.HTTPClientConfig{ + UserAgent: &d.userAgent, + ProxyURL: proxyURL, + WrapTransport: destwebhook.WrapTransport, }) if err != nil { return nil, err