Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 108 additions & 5 deletions pingoo/services/http_proxy_service.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use http::{HeaderName, HeaderValue, Request, Response, header};
use http_body_util::combinators::BoxBody;
use http::{HeaderName, HeaderValue, Request, Response, StatusCode, header};
use http_body_util::{BodyExt, combinators::BoxBody};
use http_body_util::Empty;
use hyper_rustls::ConfigBuilderExt;
use hyper_util::{
client::legacy::{Client, connect::HttpConnector},
rt::TokioExecutor,
rt::{TokioExecutor, TokioIo},
};
use rand::{rng, seq::IndexedRandom};
use tracing::{debug, error, warn};
Expand All @@ -20,6 +21,15 @@ use crate::{
},
};

/// Check if the request is a WebSocket upgrade.
fn is_websocket_upgrade(req: &Request<hyper::body::Incoming>) -> bool {
req.headers()
.get("upgrade")
.and_then(|v| v.to_str().ok())
.map(|v| v.eq_ignore_ascii_case("websocket"))
.unwrap_or(false)
}

// headers that need to be removed from the client request
// https://github.com/golang/go/blob/c39abe065886f62791f41240eef6ca03d452a17b/src/net/http/httputil/reverseproxy.go#L302
const HOP_HEADERS: &[&str] = &[
Expand Down Expand Up @@ -98,6 +108,8 @@ impl HttpService for HttpProxyService {
&self,
mut req: Request<hyper::body::Incoming>,
) -> Response<BoxBody<Bytes, hyper::Error>> {
let is_ws = is_websocket_upgrade(&req);

let upstreams = self.service_registry.get_upstreams(&self.name).await;
if upstreams.is_empty() {
debug!("[{}]: no upstream available", self.name);
Expand All @@ -111,17 +123,29 @@ impl HttpService for HttpProxyService {
.0
.clone();

// For WebSocket requests, preserve Connection and Upgrade headers.
// These are hop-by-hop headers per RFC 7230, but the WebSocket protocol
// (RFC 6455 Section 4.1) requires them to be forwarded for the upgrade handshake.
for header in HOP_HEADERS {
if is_ws && (*header == "Connection" || *header == "Upgrade") {
continue;
}
req.headers_mut().remove(*header);
}

let upstream = upstreams.choose(&mut rng()).unwrap();
let path_and_query = req.uri().path_and_query().map(|x| x.as_str()).unwrap_or("/");
let mut upstream_tls_version = http::Version::HTTP_11;
// WebSocket upgrade must use HTTP/1.1 — HTTP/2 does not support the Upgrade mechanism.
let upstream_tls_version = if is_ws {
http::Version::HTTP_11
} else if upstream.tls {
http::Version::HTTP_2
} else {
http::Version::HTTP_11
};

let uri_str = if upstream.tls {
// TODO: use upstream socketAddress and correct SNI
upstream_tls_version = http::Version::HTTP_2;
format!("https://{}{path_and_query}", &upstream.hostname)
} else {
format!("http://{}{path_and_query}", &upstream.socket_address)
Expand Down Expand Up @@ -189,6 +213,85 @@ impl HttpService for HttpProxyService {
};
}

// ── WebSocket upgrade path ──────────────────────────────────────────
// For WebSocket requests, we set up the client-side upgrade future before
// forwarding, then relay bytes bidirectionally after both sides complete
// the 101 Switching Protocols handshake.
if is_ws {
// Prepare the client-side upgrade future. This must be done before the
// request is consumed by the HTTP client.
let client_upgrade = hyper::upgrade::on(&mut req);

let mut res = match self.http_client.request(req).await {
Ok(res) => res,
Err(err) => {
error!("WebSocket upstream request failed: {err}");
return new_bad_gateway_error();
}
};

if res.status() != StatusCode::SWITCHING_PROTOCOLS {
debug!("WebSocket upstream did not return 101, got {}", res.status());
// Fall through to return whatever the upstream sent (e.g. 400, 404)
for header in REPONSE_HEADERS_TO_REEMOVE {
res.headers_mut().remove(*header);
}
res.headers_mut().insert("server", HeaderValue::from_static("pingoo"));
let (parts, body) = res.into_parts();
return Response::from_parts(parts, BoxBody::new(body));
}

// Upstream returned 101 — set up the bidirectional relay.
let upstream_upgrade = hyper::upgrade::on(&mut res);

// Build the 101 response to send back to the client, preserving
// the Sec-WebSocket-Accept and other upgrade headers from upstream.
let mut client_response = Response::builder()
.status(StatusCode::SWITCHING_PROTOCOLS);

// Copy relevant headers from upstream 101 response
for (name, value) in res.headers() {
// Forward all headers except the ones we manage
if name != "server" {
client_response = client_response.header(name.clone(), value.clone());
}
}
client_response = client_response.header("server", "pingoo");

let service_name = self.name.clone();

// Spawn a background task to relay bytes between client and upstream
// once both sides have completed the protocol upgrade.
tokio::spawn(async move {
let (client_io, upstream_io) = match tokio::try_join!(client_upgrade, upstream_upgrade) {
Ok(io) => io,
Err(err) => {
error!("[{}]: WebSocket upgrade handshake failed: {err}", service_name);
return;
}
};

let mut client_io = TokioIo::new(client_io);
let mut upstream_io = TokioIo::new(upstream_io);

match tokio::io::copy_bidirectional(&mut client_io, &mut upstream_io).await {
Ok((client_to_upstream, upstream_to_client)) => {
debug!(
"[{}]: WebSocket closed (client→upstream: {} bytes, upstream→client: {} bytes)",
service_name, client_to_upstream, upstream_to_client
);
}
Err(err) => {
debug!("[{}]: WebSocket relay ended: {err}", service_name);
}
}
});

let body: BoxBody<Bytes, hyper::Error> = BoxBody::new(Empty::new().map_err(|never| match never {}));
return client_response.body(body).unwrap();
}

// ── Normal HTTP proxy path ──────────────────────────────────────────
let mut res = match self.http_client.request(req).await {
Ok(res) => res,
Err(_) => return new_bad_gateway_error(),
Expand Down