diff --git a/crates/shadowsocks-service/src/config.rs b/crates/shadowsocks-service/src/config.rs index 8aa742b6558e..0f40c145cec5 100644 --- a/crates/shadowsocks-service/src/config.rs +++ b/crates/shadowsocks-service/src/config.rs @@ -1342,13 +1342,6 @@ impl LocalConfig { return Err(err); } } - #[cfg(feature = "local-tunnel")] - ProtocolType::Tunnel => { - if self.forward_addr.is_none() { - let err = Error::new(ErrorKind::MissingField, "missing `forward_addr` in configuration", None); - return Err(err); - } - } #[cfg(feature = "local-http")] ProtocolType::Http => { diff --git a/crates/shadowsocks-service/src/local/mod.rs b/crates/shadowsocks-service/src/local/mod.rs index f2095950b17a..ccccf3b5a72b 100644 --- a/crates/shadowsocks-service/src/local/mod.rs +++ b/crates/shadowsocks-service/src/local/mod.rs @@ -324,10 +324,12 @@ impl Server { None => return Err(io::Error::other("tunnel requires local address")), }; - let forward_addr = local_config.forward_addr.expect("tunnel requires forward address"); - - let mut server_builder = - TunnelBuilder::with_context(context.clone(), forward_addr.clone(), client_addr, balancer); + let mut server_builder = match local_config.forward_addr { + Some(forward_addr) => { + TunnelBuilder::with_context(context.clone(), forward_addr, client_addr, balancer) + } + None => TunnelBuilder::with_context_dynamic(context.clone(), client_addr, balancer), + }; if let Some(c) = config.udp_max_associations { server_builder.set_udp_capacity(c); diff --git a/crates/shadowsocks-service/src/local/tunnel/server.rs b/crates/shadowsocks-service/src/local/tunnel/server.rs index f25b276eec55..b0b6d141a195 100644 --- a/crates/shadowsocks-service/src/local/tunnel/server.rs +++ b/crates/shadowsocks-service/src/local/tunnel/server.rs @@ -14,7 +14,7 @@ use super::{ pub struct TunnelBuilder { context: Arc, - forward_addr: Address, + forward_addr: Option
, mode: Mode, udp_expiry_duration: Option, udp_capacity: Option, @@ -34,12 +34,32 @@ impl TunnelBuilder { Self::with_context(Arc::new(context), forward_addr, client_addr, balancer) } + /// Create a new dynamic Tunnel server + pub fn new_dynamic(client_addr: ServerAddr, balancer: PingBalancer) -> Self { + let context = ServiceContext::new(); + Self::with_context_dynamic(Arc::new(context), client_addr, balancer) + } + /// Create a new Tunnel server with context pub fn with_context( context: Arc, forward_addr: Address, client_addr: ServerAddr, balancer: PingBalancer, + ) -> Self { + Self::with_context_forward_addr(context, Some(forward_addr), client_addr, balancer) + } + + /// Create a new dynamic Tunnel server with context + pub fn with_context_dynamic(context: Arc, client_addr: ServerAddr, balancer: PingBalancer) -> Self { + Self::with_context_forward_addr(context, None, client_addr, balancer) + } + + fn with_context_forward_addr( + context: Arc, + forward_addr: Option
, + client_addr: ServerAddr, + balancer: PingBalancer, ) -> Self { Self { context, diff --git a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs index 55c410fce455..bdaaddd23e82 100644 --- a/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/tcprelay.rs @@ -17,7 +17,7 @@ pub struct TunnelTcpServerBuilder { context: Arc, client_config: ServerAddr, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, #[cfg(target_os = "macos")] launchd_socket_name: Option, } @@ -27,7 +27,7 @@ impl TunnelTcpServerBuilder { context: Arc, client_config: ServerAddr, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, ) -> Self { Self { context, @@ -79,7 +79,7 @@ pub struct TunnelTcpServer { context: Arc, listener: ShadowTcpListener, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, } impl TunnelTcpServer { @@ -90,9 +90,13 @@ impl TunnelTcpServer { /// Start serving pub async fn run(self) -> io::Result<()> { - info!("shadowsocks TCP tunnel listening on {}", self.listener.local_addr()?); + if let Some(ref addr) = self.forward_addr { + info!("shadowsocks TCP tunnel listening on {}, forward to {}", self.listener.local_addr()?, addr); + } else { + info!("shadowsocks TCP tunnel listening on {}, dynamic forward", self.listener.local_addr()?); + } - let forward_addr = Arc::new(self.forward_addr); + let forward_addr = self.forward_addr.map(Arc::new); loop { let (stream, peer_addr) = match self.listener.accept().await { Ok(s) => s, @@ -119,15 +123,29 @@ async fn handle_tcp_client( mut stream: TcpStream, balancer: PingBalancer, peer_addr: SocketAddr, - forward_addr: Arc
, + forward_addr: Option>, ) -> io::Result<()> { - let forward_addr: &Address = &forward_addr; + let owned_addr: Address; + let target_addr: &Address = match forward_addr { + Some(ref a) => a, + None => { + owned_addr = match Address::read_from(&mut stream).await { + Ok(addr) => addr, + Err(err) => { + error!("received invalid TCP tunnel connection from {}: {}", peer_addr, err); + return Err(io::Error::new(io::ErrorKind::InvalidData, format!("read target address: {}", err))); + } + }; + trace!("dynamic tunnel {} -> {}", peer_addr, owned_addr); + &owned_addr + } + }; if balancer.is_empty() { - trace!("establishing tcp tunnel {} <-> {} direct", peer_addr, forward_addr); + trace!("establishing tcp tunnel {} <-> {} direct", peer_addr, target_addr); - let mut remote = AutoProxyClientStream::connect_bypassed(context, forward_addr).await?; - return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, forward_addr).await; + let mut remote = AutoProxyClientStream::connect_bypassed(context, target_addr).await?; + return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, target_addr).await; } let server = balancer.best_tcp_server(); @@ -135,13 +153,13 @@ async fn handle_tcp_client( trace!( "establishing tcp tunnel {} <-> {} through server {} (outbound: {})", peer_addr, - forward_addr, + target_addr, svr_cfg.tcp_external_addr(), svr_cfg.addr(), ); let mut remote = - AutoProxyClientStream::connect_proxied_with_opts(context, &server, forward_addr, server.connect_opts_ref()) + AutoProxyClientStream::connect_proxied_with_opts(context, &server, target_addr, server.connect_opts_ref()) .await?; - establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, forward_addr).await + establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, target_addr).await } diff --git a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs index bb9166cef956..d8d096816274 100644 --- a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs @@ -1,18 +1,27 @@ //! UDP Tunnel server -use std::{io, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + io, + net::{IpAddr, SocketAddr}, + sync::Arc, + time::Duration, +}; -use log::{debug, error, info}; +use bytes::BytesMut; +use log::{debug, error, info, trace}; use shadowsocks::{ ServerAddr, relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE}, }; use tokio::{net::UdpSocket, time}; -use crate::local::{ - context::ServiceContext, - loadbalancing::PingBalancer, - net::{UdpAssociationManager, UdpInboundWrite, udp::listener::create_standard_udp_listener}, +use crate::{ + local::{ + context::ServiceContext, + loadbalancing::PingBalancer, + net::{UdpAssociationManager, UdpInboundWrite, udp::listener::create_standard_udp_listener}, + }, + net::utils::to_ipv4_mapped, }; pub struct TunnelUdpServerBuilder { @@ -21,7 +30,7 @@ pub struct TunnelUdpServerBuilder { time_to_live: Option, capacity: Option, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, #[cfg(target_os = "macos")] launchd_socket_name: Option, } @@ -33,7 +42,7 @@ impl TunnelUdpServerBuilder { time_to_live: Option, capacity: Option, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, ) -> Self { Self { context, @@ -84,23 +93,77 @@ impl TunnelUdpServerBuilder { } #[derive(Clone)] -struct TunnelUdpInboundWriter { +struct StaticTunnelUdpInboundWriter { inbound: Arc, } -impl UdpInboundWrite for TunnelUdpInboundWriter { +impl UdpInboundWrite for StaticTunnelUdpInboundWriter { async fn send_to(&self, peer_addr: SocketAddr, _remote_addr: &Address, data: &[u8]) -> io::Result<()> { self.inbound.send_to(data, peer_addr).await.map(|_| ()) } } +#[derive(Clone)] +struct DynamicTunnelUdpInboundWriter { + inbound: Arc, +} + +impl UdpInboundWrite for DynamicTunnelUdpInboundWriter { + async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()> { + let remote_addr = match remote_addr { + Address::SocketAddress(sa) => { + // Try to convert IPv4 mapped IPv6 address if server is running on dual-stack mode + let saddr = match *sa { + SocketAddr::V4(..) => *sa, + SocketAddr::V6(ref v6) => match to_ipv4_mapped(v6.ip()) { + Some(v4) => SocketAddr::new(IpAddr::from(v4), v6.port()), + None => *sa, + }, + }; + + Address::SocketAddress(saddr) + } + daddr => daddr.clone(), + }; + + let addr_len = remote_addr.serialized_len(); + let mut buf = BytesMut::with_capacity(addr_len + data.len()); + remote_addr.write_to_buf(&mut buf); + buf.extend_from_slice(data); + self.inbound.send_to(&buf, peer_addr).await.map(|_| ()) + } +} + +struct ParsedPacket { + target_addr: Address, + payload_start: usize, +} + +fn parse_dynamic_packet(peer_addr: SocketAddr, data: &[u8]) -> Option { + let mut cursor = io::Cursor::new(data); + let target_addr = match Address::read_cursor(&mut cursor) { + Ok(addr) => addr, + Err(err) => { + error!("received invalid UDP tunnel packet from {}: {}", peer_addr, err); + return None; + } + }; + + trace!("dynamic tunnel {} -> {}", peer_addr, target_addr); + + Some(ParsedPacket { + target_addr, + payload_start: cursor.position() as usize, + }) +} + pub struct TunnelUdpServer { context: Arc, time_to_live: Option, capacity: Option, listener: Arc, balancer: PingBalancer, - forward_addr: Address, + forward_addr: Option
, } impl TunnelUdpServer { @@ -110,14 +173,53 @@ impl TunnelUdpServer { } /// Start serving - pub async fn run(self) -> io::Result<()> { - info!("shadowsocks UDP tunnel listening on {}", self.listener.local_addr()?); + pub async fn run(mut self) -> io::Result<()> { + match self.forward_addr.take() { + Some(addr) => self.run_static(addr).await, + None => self.run_dynamic().await, + } + } + + async fn run_static(self, forward_addr: Address) -> io::Result<()> { + let mode_desc = format!("forward to {}", forward_addr); + let inbound = self.listener.clone(); + self.run_with_packet_parser( + StaticTunnelUdpInboundWriter { inbound }, + &mode_desc, + move |_peer_addr, _data| { + Some(ParsedPacket { + target_addr: forward_addr.clone(), + payload_start: 0, + }) + }, + ) + .await + } + + async fn run_dynamic(self) -> io::Result<()> { + let inbound = self.listener.clone(); + self.run_with_packet_parser( + DynamicTunnelUdpInboundWriter { inbound }, + "dynamic forward", + parse_dynamic_packet, + ) + .await + } + + async fn run_with_packet_parser(self, writer: W, mode_desc: &str, mut parse_packet: F) -> io::Result<()> + where + W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static, + F: FnMut(SocketAddr, &[u8]) -> Option + Send, + { + info!( + "shadowsocks UDP tunnel listening on {}, {}", + self.listener.local_addr()?, + mode_desc + ); let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new( self.context.clone(), - TunnelUdpInboundWriter { - inbound: self.listener.clone(), - }, + writer, self.time_to_live, self.capacity, self.balancer, @@ -160,15 +262,16 @@ impl TunnelUdpServer { } let data = &buffer[..n]; - if let Err(err) = manager.send_to(peer_addr, self.forward_addr.clone(), data) - .await - { + let packet = match parse_packet(peer_addr, data) { + Some(packet) => packet, + None => continue, + }; + let payload = &data[packet.payload_start..]; + + if let Err(err) = manager.send_to(peer_addr, packet.target_addr.clone(), payload).await { debug!( "udp packet relay {} -> {} with {} bytes failed, error: {}", - peer_addr, - self.forward_addr, - data.len(), - err + peer_addr, packet.target_addr, payload.len(), err ); } } diff --git a/crates/shadowsocks/src/relay/socks5.rs b/crates/shadowsocks/src/relay/socks5.rs index 8aa21724ae74..b2a3ff08fe84 100644 --- a/crates/shadowsocks/src/relay/socks5.rs +++ b/crates/shadowsocks/src/relay/socks5.rs @@ -236,7 +236,8 @@ impl Address { } consts::SOCKS5_ADDR_TYPE_DOMAIN_NAME => { let domain_len = cur.get_u8() as usize; - if cur.remaining() < domain_len { + // domain_len + 2: leave room for PORT, otherwise get_u16() below panics. + if cur.remaining() < domain_len + 2 { return Err(Error::AddressDomainInvalidEncoding); } let mut buf = vec![0u8; domain_len]; diff --git a/tests/tunnel.rs b/tests/tunnel.rs index c4f5a10c7bf3..4e11a6af016d 100644 --- a/tests/tunnel.rs +++ b/tests/tunnel.rs @@ -163,3 +163,161 @@ async fn udp_tunnel() { assert_eq!(MESSAGE, recv_payload); } + +#[tokio::test] +async fn tcp_dynamic_tunnel() { + let _ = env_logger::try_init(); + + let (local_port, server_port) = random_local_tcp_port_pair(); + let local_config = Config::load_from_str( + &format!( + r#"{{ + "locals": [ + {{ + "local_port": {local_port}, + "local_address": "127.0.0.1", + "protocol": "tunnel" + }} + ], + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm" + }}"# + ), + ConfigType::Local, + ) + .unwrap(); + + let server_config = Config::load_from_str( + &format!( + r#"{{ + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm" + }}"# + ), + ConfigType::Server, + ) + .unwrap(); + + tokio::spawn(run_local(local_config)); + tokio::spawn(run_server(server_config)); + + time::sleep(Duration::from_secs(5)).await; + + // Dynamic tunnel: prepend ATYP+DOMAIN+PORT header on the TCP stream. + let mut stream = TcpStream::connect(("127.0.0.1", local_port)).await.unwrap(); + + const HOST: &[u8] = b"detectportal.firefox.com"; + let mut header = Vec::with_capacity(2 + HOST.len() + 2); + header.push(0x03); // ATYP = DOMAIN + header.push(HOST.len() as u8); + header.extend_from_slice(HOST); + header.extend_from_slice(&80u16.to_be_bytes()); + stream.write_all(&header).await.unwrap(); + + let req = b"GET /success.txt HTTP/1.0\r\nHost: detectportal.firefox.com\r\nAccept: */*\r\n\r\n"; + stream.write_all(req).await.unwrap(); + stream.flush().await.unwrap(); + + let mut r = BufReader::new(stream); + + let mut buf = Vec::new(); + r.read_until(b'\n', &mut buf).await.unwrap(); + + let http_status = b"HTTP/1.0 200 OK\r\n"; + assert!(buf.starts_with(http_status)); +} + +#[tokio::test] +async fn udp_dynamic_tunnel() { + let _ = env_logger::try_init(); + + let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let socket_local_addr = socket.local_addr().unwrap(); + let echo_server_port = socket_local_addr.port(); + + // A UDP echo server + tokio::spawn(async move { + debug!("UDP echo server listening on {socket_local_addr}"); + + let mut buffer = [0u8; 65536]; + loop { + let (n, peer_addr) = socket.recv_from(&mut buffer).await.unwrap(); + debug!("UDP echo server received {} bytes from {}, echoing", n, peer_addr); + socket.send_to(&buffer[..n], peer_addr).await.unwrap(); + } + }); + + time::sleep(Duration::from_secs(1)).await; + + let (local_port, server_port) = random_local_tcp_port_pair(); + let local_config = Config::load_from_str( + &format!( + r#"{{ + "locals": [ + {{ + "local_port": {local_port}, + "local_address": "127.0.0.1", + "protocol": "tunnel" + }} + ], + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm", + "mode": "tcp_and_udp" + }}"# + ), + ConfigType::Local, + ) + .unwrap(); + + let server_config = Config::load_from_str( + &format!( + r#"{{ + "server": "127.0.0.1", + "server_port": {server_port}, + "password": "password", + "method": "aes-256-gcm", + "mode": "udp_only" + }}"# + ), + ConfigType::Server, + ) + .unwrap(); + + tokio::spawn(run_local(local_config)); + tokio::spawn(run_server(server_config)); + + time::sleep(Duration::from_secs(5)).await; + + const MESSAGE: &[u8] = b"hello shadowsocks"; + + let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); + + // Malformed header (ATYP=DOMAIN, LEN=3, "abc", missing 2-byte PORT) must + // be rejected without taking down the dynamic UDP listener. + let malformed: &[u8] = &[0x03, 0x03, b'a', b'b', b'c']; + socket.send_to(malformed, ("127.0.0.1", local_port)).await.unwrap(); + + // Dynamic tunnel: prepend ATYP+IPv4+PORT header on each UDP packet. + let port_be = echo_server_port.to_be_bytes(); + let mut packet = Vec::with_capacity(7 + MESSAGE.len()); + packet.extend_from_slice(&[0x01, 127, 0, 0, 1, port_be[0], port_be[1]]); + packet.extend_from_slice(MESSAGE); + + socket.send_to(&packet, ("127.0.0.1", local_port)).await.unwrap(); + + let mut buf = vec![0u8; 65536]; + let n = socket.recv(&mut buf).await.unwrap(); + + let recv_payload = &buf[..n]; + println!("Got reply from server: {:?}", ByteStr::new(recv_payload)); + + // Response is prefixed with the same ATYP+ADDR+PORT header (7 bytes for IPv4). + assert_eq!(recv_payload[0], 0x01); + assert_eq!(&recv_payload[7..], MESSAGE); +}