From 6a4b388dd8389079a67f3395d62436d974a00b25 Mon Sep 17 00:00:00 2001 From: provoiceservices Date: Tue, 7 Apr 2026 08:06:09 -0500 Subject: [PATCH] feat: add WebSocket proxy support WebSocket upgrade requests (Upgrade: websocket) are now proxied through to upstream backends instead of being rejected. Previously, the Connection and Upgrade headers were unconditionally stripped as hop-by-hop headers before forwarding. While this is correct per RFC 7230, RFC 6455 Section 4.1 requires these headers for the WebSocket handshake. Changes: - Detect WebSocket upgrade requests before header stripping - Preserve Connection and Upgrade headers for WS requests - Force HTTP/1.1 for WS requests (HTTP/2 does not support Upgrade) - On 101 from upstream, set up bidirectional byte relay via tokio::io::copy_bidirectional - Forward Sec-WebSocket-Accept and other upgrade headers from upstream Works because the HTTP listener already uses hyper serve_connection_with_upgrades. Fixes #23 --- pingoo/services/http_proxy_service.rs | 113 ++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 5 deletions(-) diff --git a/pingoo/services/http_proxy_service.rs b/pingoo/services/http_proxy_service.rs index 00b4ee6..18dc2b2 100644 --- a/pingoo/services/http_proxy_service.rs +++ b/pingoo/services/http_proxy_service.rs @@ -1,12 +1,13 @@ use std::{sync::Arc, time::Duration}; use bytes::Bytes; -use http::{HeaderName, HeaderValue, Request, Response, header}; -use http_body_util::combinators::BoxBody; +use http::{HeaderName, HeaderValue, Request, Response, StatusCode, header}; +use http_body_util::{BodyExt, combinators::BoxBody}; +use http_body_util::Empty; use hyper_rustls::ConfigBuilderExt; use hyper_util::{ client::legacy::{Client, connect::HttpConnector}, - rt::TokioExecutor, + rt::{TokioExecutor, TokioIo}, }; use rand::{rng, seq::IndexedRandom}; use tracing::{debug, error, warn}; @@ -20,6 +21,15 @@ use crate::{ }, }; +/// Check if the request is a WebSocket upgrade. +fn is_websocket_upgrade(req: &Request) -> bool { + req.headers() + .get("upgrade") + .and_then(|v| v.to_str().ok()) + .map(|v| v.eq_ignore_ascii_case("websocket")) + .unwrap_or(false) +} + // headers that need to be removed from the client request // https://github.com/golang/go/blob/c39abe065886f62791f41240eef6ca03d452a17b/src/net/http/httputil/reverseproxy.go#L302 const HOP_HEADERS: &[&str] = &[ @@ -98,6 +108,8 @@ impl HttpService for HttpProxyService { &self, mut req: Request, ) -> Response> { + let is_ws = is_websocket_upgrade(&req); + let upstreams = self.service_registry.get_upstreams(&self.name).await; if upstreams.is_empty() { debug!("[{}]: no upstream available", self.name); @@ -111,17 +123,29 @@ impl HttpService for HttpProxyService { .0 .clone(); + // For WebSocket requests, preserve Connection and Upgrade headers. + // These are hop-by-hop headers per RFC 7230, but the WebSocket protocol + // (RFC 6455 Section 4.1) requires them to be forwarded for the upgrade handshake. for header in HOP_HEADERS { + if is_ws && (*header == "Connection" || *header == "Upgrade") { + continue; + } req.headers_mut().remove(*header); } let upstream = upstreams.choose(&mut rng()).unwrap(); let path_and_query = req.uri().path_and_query().map(|x| x.as_str()).unwrap_or("/"); - let mut upstream_tls_version = http::Version::HTTP_11; + // WebSocket upgrade must use HTTP/1.1 — HTTP/2 does not support the Upgrade mechanism. + let upstream_tls_version = if is_ws { + http::Version::HTTP_11 + } else if upstream.tls { + http::Version::HTTP_2 + } else { + http::Version::HTTP_11 + }; let uri_str = if upstream.tls { // TODO: use upstream socketAddress and correct SNI - upstream_tls_version = http::Version::HTTP_2; format!("https://{}{path_and_query}", &upstream.hostname) } else { format!("http://{}{path_and_query}", &upstream.socket_address) @@ -189,6 +213,85 @@ impl HttpService for HttpProxyService { }; } + // ── WebSocket upgrade path ────────────────────────────────────────── + // For WebSocket requests, we set up the client-side upgrade future before + // forwarding, then relay bytes bidirectionally after both sides complete + // the 101 Switching Protocols handshake. + if is_ws { + // Prepare the client-side upgrade future. This must be done before the + // request is consumed by the HTTP client. + let client_upgrade = hyper::upgrade::on(&mut req); + + let mut res = match self.http_client.request(req).await { + Ok(res) => res, + Err(err) => { + error!("WebSocket upstream request failed: {err}"); + return new_bad_gateway_error(); + } + }; + + if res.status() != StatusCode::SWITCHING_PROTOCOLS { + debug!("WebSocket upstream did not return 101, got {}", res.status()); + // Fall through to return whatever the upstream sent (e.g. 400, 404) + for header in REPONSE_HEADERS_TO_REEMOVE { + res.headers_mut().remove(*header); + } + res.headers_mut().insert("server", HeaderValue::from_static("pingoo")); + let (parts, body) = res.into_parts(); + return Response::from_parts(parts, BoxBody::new(body)); + } + + // Upstream returned 101 — set up the bidirectional relay. + let upstream_upgrade = hyper::upgrade::on(&mut res); + + // Build the 101 response to send back to the client, preserving + // the Sec-WebSocket-Accept and other upgrade headers from upstream. + let mut client_response = Response::builder() + .status(StatusCode::SWITCHING_PROTOCOLS); + + // Copy relevant headers from upstream 101 response + for (name, value) in res.headers() { + // Forward all headers except the ones we manage + if name != "server" { + client_response = client_response.header(name.clone(), value.clone()); + } + } + client_response = client_response.header("server", "pingoo"); + + let service_name = self.name.clone(); + + // Spawn a background task to relay bytes between client and upstream + // once both sides have completed the protocol upgrade. + tokio::spawn(async move { + let (client_io, upstream_io) = match tokio::try_join!(client_upgrade, upstream_upgrade) { + Ok(io) => io, + Err(err) => { + error!("[{}]: WebSocket upgrade handshake failed: {err}", service_name); + return; + } + }; + + let mut client_io = TokioIo::new(client_io); + let mut upstream_io = TokioIo::new(upstream_io); + + match tokio::io::copy_bidirectional(&mut client_io, &mut upstream_io).await { + Ok((client_to_upstream, upstream_to_client)) => { + debug!( + "[{}]: WebSocket closed (client→upstream: {} bytes, upstream→client: {} bytes)", + service_name, client_to_upstream, upstream_to_client + ); + } + Err(err) => { + debug!("[{}]: WebSocket relay ended: {err}", service_name); + } + } + }); + + let body: BoxBody = BoxBody::new(Empty::new().map_err(|never| match never {})); + return client_response.body(body).unwrap(); + } + + // ── Normal HTTP proxy path ────────────────────────────────────────── let mut res = match self.http_client.request(req).await { Ok(res) => res, Err(_) => return new_bad_gateway_error(),