From a9a7803c3a4be2f02733cc857f3b83be35c3ce8f Mon Sep 17 00:00:00 2001 From: wyzhou <251932032+wyzhou-com@users.noreply.github.com> Date: Fri, 17 Apr 2026 07:35:35 +0800 Subject: [PATCH 1/5] feat(local-tunnel): support dynamic addressing mode (0-RTT in-band relay) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The tunnel protocol previously required a fixed forward_addr at startup, limiting it to a single static destination. This commit makes forward_addr optional: when omitted, the tunnel reads the target address from the head of each client stream/packet using the SOCKS5 address encoding (ATYP + ADDR + PORT), then forwards the remaining payload accordingly. In essence, this is an unencrypted shadowsocks relay running locally. The dynamic tunnel speaks the same wire format as the shadowsocks protocol — ATYP+ADDR+PORT prefix followed by payload — but without the encryption layer. Wire format per connection/packet (no negotiation, no response): TCP: [ATYP(1) ADDR(variable) PORT(2)] [payload stream...] UDP: [ATYP(1) ADDR(variable) PORT(2)] [payload] (response packets are prefixed with the same header) Protocol overhead is exactly the target address itself — typically 7 bytes for IPv4 or 19 bytes for IPv6 — with zero round-trip handshake. By comparison, a SOCKS5 session costs 2+ RTT (4 messages for auth-request, auth-response, proxy-request, proxy-response) before the first byte of payload can flow. This 0-RTT property makes the tunnel ideal as a lightweight glue layer between components that already know the destination address: - Custom transparent proxy adapters: ss-local already ships a built-in redir module for standard tproxy/redirect use cases, but third-party adapters with custom functionality need a way to feed resolved destinations into ss-local. The dynamic tunnel serves as a minimal IPC protocol for these implementations — they prepend the address header and connect, replacing the full SOCKS5 handshake that would otherwise be wasted between two localhost processes. - Chained relay topologies (relay₁ → relay₂ → ... → relayₙ → target): each hop reads the address header once and forwards; no per-hop handshake amplification. N hops cost O(N) address-header bytes total, versus O(N) round-trip latencies with SOCKS5. - forwarders: minimal code surface — a connect() plus a single writev() of header + payload is the complete client implementation; no state machine for multi-message negotiation. - Programmatic proxies and service meshes: any process that can prepend a 7-byte header to its outbound stream gains transparent proxy-chain access, making this a drop-in building block for custom routing layers. Static mode (forward_addr present) is completely unchanged; the dynamic path is only entered when forward_addr is None. Changes: - config.rs: relax forward_addr validation for tunnel protocol - local/mod.rs: pass Option
instead of .expect() - tunnel/server.rs: propagate Option
through builder - tunnel/tcprelay.rs: read ATYP+ADDR+PORT from stream when dynamic - tunnel/udprelay.rs: split into run_static/run_dynamic with per-mode InboundWriter (response header prepend in dynamic mode) Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/shadowsocks-service/src/config.rs | 7 +- crates/shadowsocks-service/src/local/mod.rs | 4 +- .../src/local/tunnel/server.rs | 9 +- .../src/local/tunnel/tcprelay.rs | 42 ++++-- .../src/local/tunnel/udprelay.rs | 136 ++++++++++++++++-- 5 files changed, 160 insertions(+), 38 deletions(-) diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs index 8aa742b6558e..b28c572e3b83 100644 --- a/crates/shadowsocks-service/src/config.rs +++ b/crates/shadowsocks-service/src/config.rs @@ -1344,10 +1344,9 @@ impl LocalConfig { } #[cfg(feature = "local-tunnel")] ProtocolType::Tunnel => { - if self.forward_addr.is_none() { - let err = Error::new(ErrorKind::MissingField, "missing `forward_addr` in configuration", None); - return Err(err); - } + // forward_addr is optional: + // - Some: static tunnel (all connections forwarded to fixed address) + // - None: dynamic tunnel (target address read from each client stream) } #[cfg(feature = "local-http")] diff --git a/crates/shadowsocks-service/src/local/mod.rs b/crates/shadowsocks-service/src/local/mod.rs index f2095950b17a..a6feae95d3d7 100644 --- a/crates/shadowsocks-service/src/local/mod.rs +++ b/crates/shadowsocks-service/src/local/mod.rs @@ -324,10 +324,8 @@ impl Server { None => return Err(io::Error::other("tunnel requires local address")), }; - let forward_addr = local_config.forward_addr.expect("tunnel requires forward address"); - let mut server_builder = - TunnelBuilder::with_context(context.clone(), forward_addr.clone(), client_addr, balancer); + TunnelBuilder::with_context(context.clone(), local_config.forward_addr, client_addr, balancer); if let Some(c) = config.udp_max_associations { server_builder.set_udp_capacity(c); diff --git a/crates/shadowsocks-service/src/local/tunnel/server.rs b/crates/shadowsocks-service/src/local/tunnel/server.rs index f25b276eec55..0684f47ec956 100644 --- a/crates/shadowsocks-service/src/local/tunnel/server.rs +++ b/crates/shadowsocks-service/src/local/tunnel/server.rs @@ -14,7 +14,7 @@ use super::{ pub struct TunnelBuilder { context: Arc, - forward_addr: Address, + forward_addr: Option
, mode: Mode, udp_expiry_duration: Option, udp_capacity: Option, @@ -29,7 +29,10 @@ pub struct TunnelBuilder { impl TunnelBuilder { /// Create a new Tunnel server forwarding to `forward_addr` - pub fn new(forward_addr: Address, client_addr: ServerAddr, balancer: PingBalancer) -> Self { + /// + /// If `forward_addr` is `None`, the tunnel operates in dynamic mode: + /// it reads the target address (ATYP+ADDR+PORT) from each client stream. + pub fn new(forward_addr: Option
, client_addr: ServerAddr, balancer: PingBalancer) -> Self { let context = ServiceContext::new(); Self::with_context(Arc::new(context), forward_addr, client_addr, balancer) } @@ -37,7 +40,7 @@ impl TunnelBuilder { /// Create a new Tunnel server with context pub fn with_context( context: Arc, - forward_addr: Address, + forward_addr: Option
, client_addr: ServerAddr, balancer: PingBalancer, ) -> Self { diff --git a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs index 55c410fce455..8159d3f99c88 100644 --- a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs @@ -17,7 +17,7 @@ pub struct TunnelTcpServerBuilder { context: Arc, client_config: ServerAddr, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, #[cfg(target_os = "macos")] launchd_socket_name: Option, } @@ -27,7 +27,7 @@ impl TunnelTcpServerBuilder { context: Arc, client_config: ServerAddr, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, ) -> Self { Self { context, @@ -79,7 +79,7 @@ pub struct TunnelTcpServer { context: Arc, listener: ShadowTcpListener, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, } impl TunnelTcpServer { @@ -90,9 +90,13 @@ impl TunnelTcpServer { /// Start serving pub async fn run(self) -> io::Result<()> { - info!("shadowsocks TCP tunnel listening on {}", self.listener.local_addr()?); + if let Some(ref addr) = self.forward_addr { + info!("shadowsocks TCP tunnel listening on {}, forward to {}", self.listener.local_addr()?, addr); + } else { + info!("shadowsocks TCP tunnel listening on {}, dynamic forward", self.listener.local_addr()?); + } - let forward_addr = Arc::new(self.forward_addr); + let forward_addr = self.forward_addr.map(Arc::new); loop { let (stream, peer_addr) = match self.listener.accept().await { Ok(s) => s, @@ -119,15 +123,27 @@ async fn handle_tcp_client( mut stream: TcpStream, balancer: PingBalancer, peer_addr: SocketAddr, - forward_addr: Arc
, + forward_addr: Option>, ) -> io::Result<()> { - let forward_addr: &Address = &forward_addr; + // Static mode: use pre-configured forward_addr. + // Dynamic mode: read ATYP+ADDR+PORT from the client stream. + let owned_addr: Address; + let target_addr: &Address = match forward_addr { + Some(ref a) => a, + None => { + owned_addr = Address::read_from(&mut stream) + .await + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("read target address: {}", e)))?; + trace!("dynamic tunnel {} -> {} read target address", peer_addr, owned_addr); + &owned_addr + } + }; if balancer.is_empty() { - trace!("establishing tcp tunnel {} <-> {} direct", peer_addr, forward_addr); + trace!("establishing tcp tunnel {} <-> {} direct", peer_addr, target_addr); - let mut remote = AutoProxyClientStream::connect_bypassed(context, forward_addr).await?; - return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, forward_addr).await; + let mut remote = AutoProxyClientStream::connect_bypassed(context, target_addr).await?; + return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, target_addr).await; } let server = balancer.best_tcp_server(); @@ -135,13 +151,13 @@ async fn handle_tcp_client( trace!( "establishing tcp tunnel {} <-> {} through server {} (outbound: {})", peer_addr, - forward_addr, + target_addr, svr_cfg.tcp_external_addr(), svr_cfg.addr(), ); let mut remote = - AutoProxyClientStream::connect_proxied_with_opts(context, &server, forward_addr, server.connect_opts_ref()) + AutoProxyClientStream::connect_proxied_with_opts(context, &server, target_addr, server.connect_opts_ref()) .await?; - establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, forward_addr).await + establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, target_addr).await } diff --git a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs index bb9166cef956..4f033f51d5f2 100644 --- a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs @@ -2,6 +2,7 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration}; +use bytes::BytesMut; use log::{debug, error, info}; use shadowsocks::{ ServerAddr, @@ -21,7 +22,7 @@ pub struct TunnelUdpServerBuilder { time_to_live: Option, capacity: Option, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, #[cfg(target_os = "macos")] launchd_socket_name: Option, } @@ -33,7 +34,7 @@ impl TunnelUdpServerBuilder { time_to_live: Option, capacity: Option, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, ) -> Self { Self { context, @@ -83,24 +84,42 @@ impl TunnelUdpServerBuilder { } } +/// Static tunnel mode: forward_addr is fixed, responses go directly to client. #[derive(Clone)] -struct TunnelUdpInboundWriter { +struct StaticTunnelUdpInboundWriter { inbound: Arc, } -impl UdpInboundWrite for TunnelUdpInboundWriter { +impl UdpInboundWrite for StaticTunnelUdpInboundWriter { async fn send_to(&self, peer_addr: SocketAddr, _remote_addr: &Address, data: &[u8]) -> io::Result<()> { self.inbound.send_to(data, peer_addr).await.map(|_| ()) } } +/// Dynamic tunnel mode: forward_addr is per-packet, responses are prefixed with ATYP+ADDR+PORT. +#[derive(Clone)] +struct DynamicTunnelUdpInboundWriter { + inbound: Arc, +} + +impl UdpInboundWrite for DynamicTunnelUdpInboundWriter { + async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()> { + // Prepend ATYP+ADDR+PORT header so the client can identify which target sent this response + let addr_len = remote_addr.serialized_len(); + let mut buf = BytesMut::with_capacity(addr_len + data.len()); + remote_addr.write_to_buf(&mut buf); + buf.extend_from_slice(data); + self.inbound.send_to(&buf, peer_addr).await.map(|_| ()) + } +} + pub struct TunnelUdpServer { context: Arc, time_to_live: Option, capacity: Option, listener: Arc, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, } impl TunnelUdpServer { @@ -110,12 +129,24 @@ impl TunnelUdpServer { } /// Start serving - pub async fn run(self) -> io::Result<()> { - info!("shadowsocks UDP tunnel listening on {}", self.listener.local_addr()?); + pub async fn run(mut self) -> io::Result<()> { + match self.forward_addr.take() { + Some(addr) => self.run_static(addr).await, + None => self.run_dynamic().await, + } + } + + /// Static mode: all packets forwarded to a fixed address (original behavior) + async fn run_static(self, forward_addr: Address) -> io::Result<()> { + info!( + "shadowsocks UDP tunnel listening on {}, forward to {}", + self.listener.local_addr()?, + forward_addr + ); let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new( self.context.clone(), - TunnelUdpInboundWriter { + StaticTunnelUdpInboundWriter { inbound: self.listener.clone(), }, self.time_to_live, @@ -160,15 +191,90 @@ impl TunnelUdpServer { } let data = &buffer[..n]; - if let Err(err) = manager.send_to(peer_addr, self.forward_addr.clone(), data) - .await - { + if let Err(err) = manager.send_to(peer_addr, forward_addr.clone(), data).await { + debug!( + "udp packet relay {} -> {} with {} bytes failed, error: {}", + peer_addr, forward_addr, data.len(), err + ); + } + } + } + } + } + + /// Dynamic mode: parse ATYP+ADDR+PORT from each packet, prepend header to responses + async fn run_dynamic(self) -> io::Result<()> { + info!( + "shadowsocks UDP tunnel listening on {}, dynamic forward", + self.listener.local_addr()? + ); + + let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new( + self.context.clone(), + DynamicTunnelUdpInboundWriter { + inbound: self.listener.clone(), + }, + self.time_to_live, + self.capacity, + self.balancer, + ); + + let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; + let mut cleanup_timer = time::interval(cleanup_interval); + + loop { + tokio::select! { + _ = cleanup_timer.tick() => { + // cleanup expired associations. iter() will remove expired elements + manager.cleanup_expired().await; + } + + peer_addr_opt = keepalive_rx.recv() => { + let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectedly"); + manager.keep_alive(&peer_addr).await; + } + + recv_result = self.listener.recv_from(&mut buffer) => { + let (n, peer_addr) = match recv_result { + Ok(s) => s, + Err(err) => { + error!("udp server recv_from failed with error: {}", err); + time::sleep(Duration::from_secs(1)).await; + continue; + } + }; + + if n == 0 { + // For windows, it will generate a ICMP Port Unreachable Message + // https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom + // Which will result in recv_from return 0. + // + // It cannot be solved here, because `WSAGetLastError` is already set. + // + // See `relay::udprelay::utils::create_socket` for more detail. + continue; + } + + // Parse ATYP+ADDR+PORT from packet prefix + let mut cursor = io::Cursor::new(&buffer[..n]); + let target_addr = match Address::read_cursor(&mut cursor) { + Ok(addr) => addr, + Err(err) => { + error!( + "received invalid UDP tunnel packet from {}: {}", + peer_addr, err + ); + continue; + } + }; + + let header_len = cursor.position() as usize; + let payload = &buffer[header_len..n]; + + if let Err(err) = manager.send_to(peer_addr, target_addr.clone(), payload).await { debug!( "udp packet relay {} -> {} with {} bytes failed, error: {}", - peer_addr, - self.forward_addr, - data.len(), - err + peer_addr, target_addr, payload.len(), err ); } } From cc20e86b2fc60a6d0cd31f265b2d424e5b044912 Mon Sep 17 00:00:00 2001 From: wyzhou <251932032+wyzhou-com@users.noreply.github.com> Date: Fri, 17 Apr 2026 08:05:51 +0800 Subject: [PATCH 2/5] fix(local-tunnel): unify dynamic tunnel trace logs across TCP and UDP Remove redundant "read target address" suffix from TCP trace log and add matching trace log to UDP dynamic mode for consistency. --- crates/shadowsocks-service/src/local/tunnel/tcprelay.rs | 2 +- crates/shadowsocks-service/src/local/tunnel/udprelay.rs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs index 8159d3f99c88..62975c77c968 100644 --- a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs @@ -134,7 +134,7 @@ async fn handle_tcp_client( owned_addr = Address::read_from(&mut stream) .await .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("read target address: {}", e)))?; - trace!("dynamic tunnel {} -> {} read target address", peer_addr, owned_addr); + trace!("dynamic tunnel {} -> {}", peer_addr, owned_addr); &owned_addr } }; diff --git a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs index 4f033f51d5f2..9f8a3a8d8ec2 100644 --- a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs @@ -3,7 +3,7 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use bytes::BytesMut; -use log::{debug, error, info}; +use log::{debug, error, info, trace}; use shadowsocks::{ ServerAddr, relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE}, @@ -271,6 +271,8 @@ impl TunnelUdpServer { let header_len = cursor.position() as usize; let payload = &buffer[header_len..n]; + trace!("dynamic tunnel {} -> {}", peer_addr, target_addr); + if let Err(err) = manager.send_to(peer_addr, target_addr.clone(), payload).await { debug!( "udp packet relay {} -> {} with {} bytes failed, error: {}", From 04f52ee41f6d4f015a23ba2ef2609d071d90eb1e Mon Sep 17 00:00:00 2001 From: wyzhou <251932032+wyzhou-com@users.noreply.github.com> Date: Sat, 18 Apr 2026 07:11:05 +0800 Subject: [PATCH 3/5] fix(local-tunnel): log invalid TCP dynamic tunnel handshakes Malformed ATYP+ADDR+PORT from a dynamic TCP tunnel client was mapped to io::Error and returned, but the spawned task result is not joined, so the failure was silently dropped. Emit an error! on the failing match arm, matching the UDP dynamic path which already logs invalid packets. --- .../shadowsocks-service/src/local/tunnel/tcprelay.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs index 62975c77c968..477d72f2e9bf 100644 --- a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs @@ -131,9 +131,13 @@ async fn handle_tcp_client( let target_addr: &Address = match forward_addr { Some(ref a) => a, None => { - owned_addr = Address::read_from(&mut stream) - .await - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("read target address: {}", e)))?; + owned_addr = match Address::read_from(&mut stream).await { + Ok(addr) => addr, + Err(err) => { + error!("received invalid TCP tunnel connection from {}: {}", peer_addr, err); + return Err(io::Error::new(io::ErrorKind::InvalidData, format!("read target address: {}", err))); + } + }; trace!("dynamic tunnel {} -> {}", peer_addr, owned_addr); &owned_addr } From cd031628e8fec0b21d83ca033e6f7ba968f3ff30 Mon Sep 17 00:00:00 2001 From: wyzhou <251932032+wyzhou-com@users.noreply.github.com> Date: Sun, 19 Apr 2026 21:19:56 +0800 Subject: [PATCH 4/5] fix(socks5): reserve PORT room in Address::read_cursor DOMAIN branch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The DOMAIN branch of `Address::read_cursor()` only checked `remaining() < domain_len`, then unconditionally called `cur.get_u16()` to read the 2-byte PORT. When the input ends exactly after the domain bytes (e.g. `03 03 61 62 63` — ATYP=DOMAIN, LEN=3, "abc", no PORT), `get_u16()` panics on buffer underflow instead of returning `Err`. Extend the check to `domain_len + 2` so PORT is validated before it is read, matching the IPv4 / IPv6 branches that already check `4 + 2` and `16 + 2` respectively. This is not limited to the dynamic UDP tunnel that surfaced the bug: `Address::read_cursor()` is also called on every inbound UDP packet from the core relay decoders — AEAD (`udprelay/aead.rs`), AEAD-2022 (`udprelay/aead_2022.rs`), stream ciphers (`udprelay/stream.rs`), and `udprelay/crypto_io.rs`. A crafted packet with a truncated DOMAIN header in any of those paths would abort the UDP task instead of being rejected as malformed. Add an integration test `udp_dynamic_tunnel` in `tests/tunnel.rs` that first sends the truncated `03 03 61 62 63` packet and then a valid IPv4-targeted dynamic packet, asserting the echo reply still comes back — i.e. the listener survived the bad packet. Also add `tcp_dynamic_tunnel` as a happy-path counterpart, styled consistently with the existing `tcp_tunnel` / `udp_tunnel` tests. Co-Authored-By: Claude Opus 4.7 --- crates/shadowsocks/src/relay/socks5.rs | 3 +- tests/tunnel.rs | 158 +++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/crates/shadowsocks/src/relay/socks5.rs b/crates/shadowsocks/src/relay/socks5.rs index 8aa21724ae74..b2a3ff08fe84 100644 --- a/crates/shadowsocks/src/relay/socks5.rs +++ b/crates/shadowsocks/src/relay/socks5.rs @@ -236,7 +236,8 @@ impl Address { } consts::SOCKS5_ADDR_TYPE_DOMAIN_NAME => { let domain_len = cur.get_u8() as usize; - if cur.remaining() < domain_len { + // domain_len + 2: leave room for PORT, otherwise get_u16() below panics. + if cur.remaining() < domain_len + 2 { return Err(Error::AddressDomainInvalidEncoding); } let mut buf = vec![0u8; domain_len]; diff --git a/tests/tunnel.rs b/tests/tunnel.rs index c4f5a10c7bf3..4e11a6af016d 100644 --- a/tests/tunnel.rs +++ b/tests/tunnel.rs @@ -163,3 +163,161 @@ async fn udp_tunnel() { assert_eq!(MESSAGE, recv_payload); } + +#[tokio::test] +async fn tcp_dynamic_tunnel() { + let _ = env_logger::try_init(); + + let (local_port, server_port) = random_local_tcp_port_pair(); + let local_config = Config::load_from_str( + &format!( + r#"{{ + "locals": [ + {{ + "local_port": {local_port}, + "local_address": "127.0.0.1", + "protocol": "tunnel" + }} + ], + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm" + }}"# + ), + ConfigType::Local, + ) + .unwrap(); + + let server_config = Config::load_from_str( + &format!( + r#"{{ + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm" + }}"# + ), + ConfigType::Server, + ) + .unwrap(); + + tokio::spawn(run_local(local_config)); + tokio::spawn(run_server(server_config)); + + time::sleep(Duration::from_secs(5)).await; + + // Dynamic tunnel: prepend ATYP+DOMAIN+PORT header on the TCP stream. + let mut stream = TcpStream::connect(("127.0.0.1", local_port)).await.unwrap(); + + const HOST: &[u8] = b"detectportal.firefox.com"; + let mut header = Vec::with_capacity(2 + HOST.len() + 2); + header.push(0x03); // ATYP = DOMAIN + header.push(HOST.len() as u8); + header.extend_from_slice(HOST); + header.extend_from_slice(&80u16.to_be_bytes()); + stream.write_all(&header).await.unwrap(); + + let req = b"GET /success.txt HTTP/1.0\r\nHost: detectportal.firefox.com\r\nAccept: */*\r\n\r\n"; + stream.write_all(req).await.unwrap(); + stream.flush().await.unwrap(); + + let mut r = BufReader::new(stream); + + let mut buf = Vec::new(); + r.read_until(b'\n', &mut buf).await.unwrap(); + + let http_status = b"HTTP/1.0 200 OK\r\n"; + assert!(buf.starts_with(http_status)); +} + +#[tokio::test] +async fn udp_dynamic_tunnel() { + let _ = env_logger::try_init(); + + let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let socket_local_addr = socket.local_addr().unwrap(); + let echo_server_port = socket_local_addr.port(); + + // A UDP echo server + tokio::spawn(async move { + debug!("UDP echo server listening on {socket_local_addr}"); + + let mut buffer = [0u8; 65536]; + loop { + let (n, peer_addr) = socket.recv_from(&mut buffer).await.unwrap(); + debug!("UDP echo server received {} bytes from {}, echoing", n, peer_addr); + socket.send_to(&buffer[..n], peer_addr).await.unwrap(); + } + }); + + time::sleep(Duration::from_secs(1)).await; + + let (local_port, server_port) = random_local_tcp_port_pair(); + let local_config = Config::load_from_str( + &format!( + r#"{{ + "locals": [ + {{ + "local_port": {local_port}, + "local_address": "127.0.0.1", + "protocol": "tunnel" + }} + ], + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm", + "mode": "tcp_and_udp" + }}"# + ), + ConfigType::Local, + ) + .unwrap(); + + let server_config = Config::load_from_str( + &format!( + r#"{{ + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm", + "mode": "udp_only" + }}"# + ), + ConfigType::Server, + ) + .unwrap(); + + tokio::spawn(run_local(local_config)); + tokio::spawn(run_server(server_config)); + + time::sleep(Duration::from_secs(5)).await; + + const MESSAGE: &[u8] = b"hello shadowsocks"; + + let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); + + // Malformed header (ATYP=DOMAIN, LEN=3, "abc", missing 2-byte PORT) must + // be rejected without taking down the dynamic UDP listener. + let malformed: &[u8] = &[0x03, 0x03, b'a', b'b', b'c']; + socket.send_to(malformed, ("127.0.0.1", local_port)).await.unwrap(); + + // Dynamic tunnel: prepend ATYP+IPv4+PORT header on each UDP packet. + let port_be = echo_server_port.to_be_bytes(); + let mut packet = Vec::with_capacity(7 + MESSAGE.len()); + packet.extend_from_slice(&[0x01, 127, 0, 0, 1, port_be[0], port_be[1]]); + packet.extend_from_slice(MESSAGE); + + socket.send_to(&packet, ("127.0.0.1", local_port)).await.unwrap(); + + let mut buf = vec![0u8; 65536]; + let n = socket.recv(&mut buf).await.unwrap(); + + let recv_payload = &buf[..n]; + println!("Got reply from server: {:?}", ByteStr::new(recv_payload)); + + // Response is prefixed with the same ATYP+ADDR+PORT header (7 bytes for IPv4). + assert_eq!(recv_payload[0], 0x01); + assert_eq!(&recv_payload[7..], MESSAGE); +} From 089798b74cf0609aa0940878c1e04d1e26e03d33 Mon Sep 17 00:00:00 2001 From: wyzhou <251932032+wyzhou-com@users.noreply.github.com> Date: Wed, 22 Apr 2026 08:57:29 +0800 Subject: [PATCH 5/5] fix(local-tunnel): preserve builder API and tidy dynamic UDP relay Restore the public TunnelBuilder static constructors to accept Address instead of Option
, and add explicit dynamic constructors for the new in-band addressing mode. This keeps existing callers source-compatible while still allowing dynamic tunnels to be created intentionally. Also clean up the tunnel relay implementation: - route config construction through static or dynamic builder entry points - keep Option
as an internal implementation detail - normalize IPv4-mapped IPv6 addresses in dynamic UDP responses, matching the SOCKS5 UDP writer behavior - share the UDP receive/association loop between static and dynamic modes while keeping mode-specific packet parsing and response writers separate - remove redundant explanatory comments added around dynamic mode This avoids an API break from the dynamic tunnel feature, keeps UDP response address encoding consistent with existing relay behavior, and reduces duplicate UDP loop logic without changing static tunnel semantics. --- crates/shadowsocks-service/src/config.rs | 6 - crates/shadowsocks-service/src/local/mod.rs | 8 +- .../src/local/tunnel/server.rs | 25 ++- .../src/local/tunnel/tcprelay.rs | 2 - .../src/local/tunnel/udprelay.rs | 183 +++++++++--------- 5 files changed, 116 insertions(+), 108 deletions(-) diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs index b28c572e3b83..0f40c145cec5 100644 --- a/crates/shadowsocks-service/src/config.rs +++ b/crates/shadowsocks-service/src/config.rs @@ -1342,12 +1342,6 @@ impl LocalConfig { return Err(err); } } - #[cfg(feature = "local-tunnel")] - ProtocolType::Tunnel => { - // forward_addr is optional: - // - Some: static tunnel (all connections forwarded to fixed address) - // - None: dynamic tunnel (target address read from each client stream) - } #[cfg(feature = "local-http")] ProtocolType::Http => { diff --git a/crates/shadowsocks-service/src/local/mod.rs b/crates/shadowsocks-service/src/local/mod.rs index a6feae95d3d7..ccccf3b5a72b 100644 --- a/crates/shadowsocks-service/src/local/mod.rs +++ b/crates/shadowsocks-service/src/local/mod.rs @@ -324,8 +324,12 @@ impl Server { None => return Err(io::Error::other("tunnel requires local address")), }; - let mut server_builder = - TunnelBuilder::with_context(context.clone(), local_config.forward_addr, client_addr, balancer); + let mut server_builder = match local_config.forward_addr { + Some(forward_addr) => { + TunnelBuilder::with_context(context.clone(), forward_addr, client_addr, balancer) + } + None => TunnelBuilder::with_context_dynamic(context.clone(), client_addr, balancer), + }; if let Some(c) = config.udp_max_associations { server_builder.set_udp_capacity(c); diff --git a/crates/shadowsocks-service/src/local/tunnel/server.rs b/crates/shadowsocks-service/src/local/tunnel/server.rs index 0684f47ec956..b0b6d141a195 100644 --- a/crates/shadowsocks-service/src/local/tunnel/server.rs +++ b/crates/shadowsocks-service/src/local/tunnel/server.rs @@ -29,16 +29,33 @@ pub struct TunnelBuilder { impl TunnelBuilder { /// Create a new Tunnel server forwarding to `forward_addr` - /// - /// If `forward_addr` is `None`, the tunnel operates in dynamic mode: - /// it reads the target address (ATYP+ADDR+PORT) from each client stream. - pub fn new(forward_addr: Option
, client_addr: ServerAddr, balancer: PingBalancer) -> Self { + pub fn new(forward_addr: Address, client_addr: ServerAddr, balancer: PingBalancer) -> Self { let context = ServiceContext::new(); Self::with_context(Arc::new(context), forward_addr, client_addr, balancer) } + /// Create a new dynamic Tunnel server + pub fn new_dynamic(client_addr: ServerAddr, balancer: PingBalancer) -> Self { + let context = ServiceContext::new(); + Self::with_context_dynamic(Arc::new(context), client_addr, balancer) + } + /// Create a new Tunnel server with context pub fn with_context( + context: Arc, + forward_addr: Address, + client_addr: ServerAddr, + balancer: PingBalancer, + ) -> Self { + Self::with_context_forward_addr(context, Some(forward_addr), client_addr, balancer) + } + + /// Create a new dynamic Tunnel server with context + pub fn with_context_dynamic(context: Arc, client_addr: ServerAddr, balancer: PingBalancer) -> Self { + Self::with_context_forward_addr(context, None, client_addr, balancer) + } + + fn with_context_forward_addr( context: Arc, forward_addr: Option
, client_addr: ServerAddr, diff --git a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs index 477d72f2e9bf..bdaaddd23e82 100644 --- a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs @@ -125,8 +125,6 @@ async fn handle_tcp_client( peer_addr: SocketAddr, forward_addr: Option>, ) -> io::Result<()> { - // Static mode: use pre-configured forward_addr. - // Dynamic mode: read ATYP+ADDR+PORT from the client stream. let owned_addr: Address; let target_addr: &Address = match forward_addr { Some(ref a) => a, diff --git a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs index 9f8a3a8d8ec2..d8d096816274 100644 --- a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs @@ -1,6 +1,11 @@ //! UDP Tunnel server -use std::{io, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + io, + net::{IpAddr, SocketAddr}, + sync::Arc, + time::Duration, +}; use bytes::BytesMut; use log::{debug, error, info, trace}; @@ -10,10 +15,13 @@ use shadowsocks::{ }; use tokio::{net::UdpSocket, time}; -use crate::local::{ - context::ServiceContext, - loadbalancing::PingBalancer, - net::{UdpAssociationManager, UdpInboundWrite, udp::listener::create_standard_udp_listener}, +use crate::{ + local::{ + context::ServiceContext, + loadbalancing::PingBalancer, + net::{UdpAssociationManager, UdpInboundWrite, udp::listener::create_standard_udp_listener}, + }, + net::utils::to_ipv4_mapped, }; pub struct TunnelUdpServerBuilder { @@ -84,7 +92,6 @@ impl TunnelUdpServerBuilder { } } -/// Static tunnel mode: forward_addr is fixed, responses go directly to client. #[derive(Clone)] struct StaticTunnelUdpInboundWriter { inbound: Arc, @@ -96,7 +103,6 @@ impl UdpInboundWrite for StaticTunnelUdpInboundWriter { } } -/// Dynamic tunnel mode: forward_addr is per-packet, responses are prefixed with ATYP+ADDR+PORT. #[derive(Clone)] struct DynamicTunnelUdpInboundWriter { inbound: Arc, @@ -104,7 +110,22 @@ struct DynamicTunnelUdpInboundWriter { impl UdpInboundWrite for DynamicTunnelUdpInboundWriter { async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()> { - // Prepend ATYP+ADDR+PORT header so the client can identify which target sent this response + let remote_addr = match remote_addr { + Address::SocketAddress(sa) => { + // Try to convert IPv4 mapped IPv6 address if server is running on dual-stack mode + let saddr = match *sa { + SocketAddr::V4(..) => *sa, + SocketAddr::V6(ref v6) => match to_ipv4_mapped(v6.ip()) { + Some(v4) => SocketAddr::new(IpAddr::from(v4), v6.port()), + None => *sa, + }, + }; + + Address::SocketAddress(saddr) + } + daddr => daddr.clone(), + }; + let addr_len = remote_addr.serialized_len(); let mut buf = BytesMut::with_capacity(addr_len + data.len()); remote_addr.write_to_buf(&mut buf); @@ -113,6 +134,29 @@ impl UdpInboundWrite for DynamicTunnelUdpInboundWriter { } } +struct ParsedPacket { + target_addr: Address, + payload_start: usize, +} + +fn parse_dynamic_packet(peer_addr: SocketAddr, data: &[u8]) -> Option { + let mut cursor = io::Cursor::new(data); + let target_addr = match Address::read_cursor(&mut cursor) { + Ok(addr) => addr, + Err(err) => { + error!("received invalid UDP tunnel packet from {}: {}", peer_addr, err); + return None; + } + }; + + trace!("dynamic tunnel {} -> {}", peer_addr, target_addr); + + Some(ParsedPacket { + target_addr, + payload_start: cursor.position() as usize, + }) +} + pub struct TunnelUdpServer { context: Arc, time_to_live: Option, @@ -136,84 +180,46 @@ impl TunnelUdpServer { } } - /// Static mode: all packets forwarded to a fixed address (original behavior) async fn run_static(self, forward_addr: Address) -> io::Result<()> { - info!( - "shadowsocks UDP tunnel listening on {}, forward to {}", - self.listener.local_addr()?, - forward_addr - ); - - let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new( - self.context.clone(), - StaticTunnelUdpInboundWriter { - inbound: self.listener.clone(), + let mode_desc = format!("forward to {}", forward_addr); + let inbound = self.listener.clone(); + self.run_with_packet_parser( + StaticTunnelUdpInboundWriter { inbound }, + &mode_desc, + move |_peer_addr, _data| { + Some(ParsedPacket { + target_addr: forward_addr.clone(), + payload_start: 0, + }) }, - self.time_to_live, - self.capacity, - self.balancer, - ); - - let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE]; - let mut cleanup_timer = time::interval(cleanup_interval); - - loop { - tokio::select! { - _ = cleanup_timer.tick() => { - // cleanup expired associations. iter() will remove expired elements - manager.cleanup_expired().await; - } - - peer_addr_opt = keepalive_rx.recv() => { - let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectedly"); - manager.keep_alive(&peer_addr).await; - } - - recv_result = self.listener.recv_from(&mut buffer) => { - let (n, peer_addr) = match recv_result { - Ok(s) => s, - Err(err) => { - error!("udp server recv_from failed with error: {}", err); - time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - if n == 0 { - // For windows, it will generate a ICMP Port Unreachable Message - // https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom - // Which will result in recv_from return 0. - // - // It cannot be solved here, because `WSAGetLastError` is already set. - // - // See `relay::udprelay::utils::create_socket` for more detail. - continue; - } - - let data = &buffer[..n]; - if let Err(err) = manager.send_to(peer_addr, forward_addr.clone(), data).await { - debug!( - "udp packet relay {} -> {} with {} bytes failed, error: {}", - peer_addr, forward_addr, data.len(), err - ); - } - } - } - } + ) + .await } - /// Dynamic mode: parse ATYP+ADDR+PORT from each packet, prepend header to responses async fn run_dynamic(self) -> io::Result<()> { + let inbound = self.listener.clone(); + self.run_with_packet_parser( + DynamicTunnelUdpInboundWriter { inbound }, + "dynamic forward", + parse_dynamic_packet, + ) + .await + } + + async fn run_with_packet_parser(self, writer: W, mode_desc: &str, mut parse_packet: F) -> io::Result<()> + where + W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static, + F: FnMut(SocketAddr, &[u8]) -> Option + Send, + { info!( - "shadowsocks UDP tunnel listening on {}, dynamic forward", - self.listener.local_addr()? + "shadowsocks UDP tunnel listening on {}, {}", + self.listener.local_addr()?, + mode_desc ); let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new( self.context.clone(), - DynamicTunnelUdpInboundWriter { - inbound: self.listener.clone(), - }, + writer, self.time_to_live, self.capacity, self.balancer, @@ -255,28 +261,17 @@ impl TunnelUdpServer { continue; } - // Parse ATYP+ADDR+PORT from packet prefix - let mut cursor = io::Cursor::new(&buffer[..n]); - let target_addr = match Address::read_cursor(&mut cursor) { - Ok(addr) => addr, - Err(err) => { - error!( - "received invalid UDP tunnel packet from {}: {}", - peer_addr, err - ); - continue; - } + let data = &buffer[..n]; + let packet = match parse_packet(peer_addr, data) { + Some(packet) => packet, + None => continue, }; + let payload = &data[packet.payload_start..]; - let header_len = cursor.position() as usize; - let payload = &buffer[header_len..n]; - - trace!("dynamic tunnel {} -> {}", peer_addr, target_addr); - - if let Err(err) = manager.send_to(peer_addr, target_addr.clone(), payload).await { + if let Err(err) = manager.send_to(peer_addr, packet.target_addr.clone(), payload).await { debug!( "udp packet relay {} -> {} with {} bytes failed, error: {}", - peer_addr, target_addr, payload.len(), err + peer_addr, packet.target_addr, payload.len(), err ); } }