Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions crates/shadowsocks-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1342,13 +1342,6 @@ impl LocalConfig {
return Err(err);
}
}
#[cfg(feature = "local-tunnel")]
ProtocolType::Tunnel => {
if self.forward_addr.is_none() {
let err = Error::new(ErrorKind::MissingField, "missing `forward_addr` in configuration", None);
return Err(err);
}
}

#[cfg(feature = "local-http")]
ProtocolType::Http => {
Expand Down
10 changes: 6 additions & 4 deletions crates/shadowsocks-service/src/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,12 @@ impl Server {
None => return Err(io::Error::other("tunnel requires local address")),
};

let forward_addr = local_config.forward_addr.expect("tunnel requires forward address");

let mut server_builder =
TunnelBuilder::with_context(context.clone(), forward_addr.clone(), client_addr, balancer);
let mut server_builder = match local_config.forward_addr {
Some(forward_addr) => {
TunnelBuilder::with_context(context.clone(), forward_addr, client_addr, balancer)
}
None => TunnelBuilder::with_context_dynamic(context.clone(), client_addr, balancer),
};

if let Some(c) = config.udp_max_associations {
server_builder.set_udp_capacity(c);
Expand Down
22 changes: 21 additions & 1 deletion crates/shadowsocks-service/src/local/tunnel/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{

pub struct TunnelBuilder {
context: Arc<ServiceContext>,
forward_addr: Address,
forward_addr: Option<Address>,
mode: Mode,
udp_expiry_duration: Option<Duration>,
udp_capacity: Option<usize>,
Expand All @@ -34,12 +34,32 @@ impl TunnelBuilder {
Self::with_context(Arc::new(context), forward_addr, client_addr, balancer)
}

/// Create a new dynamic Tunnel server
pub fn new_dynamic(client_addr: ServerAddr, balancer: PingBalancer) -> Self {
let context = ServiceContext::new();
Self::with_context_dynamic(Arc::new(context), client_addr, balancer)
}

/// Create a new Tunnel server with context
pub fn with_context(
context: Arc<ServiceContext>,
forward_addr: Address,
client_addr: ServerAddr,
balancer: PingBalancer,
) -> Self {
Self::with_context_forward_addr(context, Some(forward_addr), client_addr, balancer)
}

/// Create a new dynamic Tunnel server with context
pub fn with_context_dynamic(context: Arc<ServiceContext>, client_addr: ServerAddr, balancer: PingBalancer) -> Self {
Self::with_context_forward_addr(context, None, client_addr, balancer)
}

fn with_context_forward_addr(
context: Arc<ServiceContext>,
forward_addr: Option<Address>,
client_addr: ServerAddr,
balancer: PingBalancer,
) -> Self {
Self {
context,
Expand Down
44 changes: 31 additions & 13 deletions crates/shadowsocks-service/src/local/tunnel/tcprelay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct TunnelTcpServerBuilder {
context: Arc<ServiceContext>,
client_config: ServerAddr,
balancer: PingBalancer,
forward_addr: Address,
forward_addr: Option<Address>,
#[cfg(target_os = "macos")]
launchd_socket_name: Option<String>,
}
Expand All @@ -27,7 +27,7 @@ impl TunnelTcpServerBuilder {
context: Arc<ServiceContext>,
client_config: ServerAddr,
balancer: PingBalancer,
forward_addr: Address,
forward_addr: Option<Address>,
) -> Self {
Self {
context,
Expand Down Expand Up @@ -79,7 +79,7 @@ pub struct TunnelTcpServer {
context: Arc<ServiceContext>,
listener: ShadowTcpListener,
balancer: PingBalancer,
forward_addr: Address,
forward_addr: Option<Address>,
}

impl TunnelTcpServer {
Expand All @@ -90,9 +90,13 @@ impl TunnelTcpServer {

/// Start serving
pub async fn run(self) -> io::Result<()> {
info!("shadowsocks TCP tunnel listening on {}", self.listener.local_addr()?);
if let Some(ref addr) = self.forward_addr {
info!("shadowsocks TCP tunnel listening on {}, forward to {}", self.listener.local_addr()?, addr);
} else {
info!("shadowsocks TCP tunnel listening on {}, dynamic forward", self.listener.local_addr()?);
}

let forward_addr = Arc::new(self.forward_addr);
let forward_addr = self.forward_addr.map(Arc::new);
loop {
let (stream, peer_addr) = match self.listener.accept().await {
Ok(s) => s,
Expand All @@ -119,29 +123,43 @@ async fn handle_tcp_client(
mut stream: TcpStream,
balancer: PingBalancer,
peer_addr: SocketAddr,
forward_addr: Arc<Address>,
forward_addr: Option<Arc<Address>>,
) -> io::Result<()> {
let forward_addr: &Address = &forward_addr;
let owned_addr: Address;
let target_addr: &Address = match forward_addr {
Some(ref a) => a,
None => {
owned_addr = match Address::read_from(&mut stream).await {
Ok(addr) => addr,
Err(err) => {
error!("received invalid TCP tunnel connection from {}: {}", peer_addr, err);
return Err(io::Error::new(io::ErrorKind::InvalidData, format!("read target address: {}", err)));
}
};
trace!("dynamic tunnel {} -> {}", peer_addr, owned_addr);
&owned_addr
}
};

if balancer.is_empty() {
trace!("establishing tcp tunnel {} <-> {} direct", peer_addr, forward_addr);
trace!("establishing tcp tunnel {} <-> {} direct", peer_addr, target_addr);

let mut remote = AutoProxyClientStream::connect_bypassed(context, forward_addr).await?;
return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, forward_addr).await;
let mut remote = AutoProxyClientStream::connect_bypassed(context, target_addr).await?;
return establish_tcp_tunnel_bypassed(&mut stream, &mut remote, peer_addr, target_addr).await;
}

let server = balancer.best_tcp_server();
let svr_cfg = server.server_config();
trace!(
"establishing tcp tunnel {} <-> {} through server {} (outbound: {})",
peer_addr,
forward_addr,
target_addr,
svr_cfg.tcp_external_addr(),
svr_cfg.addr(),
);

let mut remote =
AutoProxyClientStream::connect_proxied_with_opts(context, &server, forward_addr, server.connect_opts_ref())
AutoProxyClientStream::connect_proxied_with_opts(context, &server, target_addr, server.connect_opts_ref())
.await?;
establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, forward_addr).await
establish_tcp_tunnel(svr_cfg, &mut stream, &mut remote, peer_addr, target_addr).await
}
149 changes: 126 additions & 23 deletions crates/shadowsocks-service/src/local/tunnel/udprelay.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
//! UDP Tunnel server

use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use std::{
io,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Duration,
};

use log::{debug, error, info};
use bytes::BytesMut;
use log::{debug, error, info, trace};
use shadowsocks::{
ServerAddr,
relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE},
};
use tokio::{net::UdpSocket, time};

use crate::local::{
context::ServiceContext,
loadbalancing::PingBalancer,
net::{UdpAssociationManager, UdpInboundWrite, udp::listener::create_standard_udp_listener},
use crate::{
local::{
context::ServiceContext,
loadbalancing::PingBalancer,
net::{UdpAssociationManager, UdpInboundWrite, udp::listener::create_standard_udp_listener},
},
net::utils::to_ipv4_mapped,
};

pub struct TunnelUdpServerBuilder {
Expand All @@ -21,7 +30,7 @@ pub struct TunnelUdpServerBuilder {
time_to_live: Option<Duration>,
capacity: Option<usize>,
balancer: PingBalancer,
forward_addr: Address,
forward_addr: Option<Address>,
#[cfg(target_os = "macos")]
launchd_socket_name: Option<String>,
}
Expand All @@ -33,7 +42,7 @@ impl TunnelUdpServerBuilder {
time_to_live: Option<Duration>,
capacity: Option<usize>,
balancer: PingBalancer,
forward_addr: Address,
forward_addr: Option<Address>,
) -> Self {
Self {
context,
Expand Down Expand Up @@ -84,23 +93,77 @@ impl TunnelUdpServerBuilder {
}

#[derive(Clone)]
struct TunnelUdpInboundWriter {
struct StaticTunnelUdpInboundWriter {
inbound: Arc<UdpSocket>,
}

impl UdpInboundWrite for TunnelUdpInboundWriter {
impl UdpInboundWrite for StaticTunnelUdpInboundWriter {
async fn send_to(&self, peer_addr: SocketAddr, _remote_addr: &Address, data: &[u8]) -> io::Result<()> {
self.inbound.send_to(data, peer_addr).await.map(|_| ())
}
}

#[derive(Clone)]
struct DynamicTunnelUdpInboundWriter {
inbound: Arc<UdpSocket>,
}

impl UdpInboundWrite for DynamicTunnelUdpInboundWriter {
async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()> {
let remote_addr = match remote_addr {
Address::SocketAddress(sa) => {
// Try to convert IPv4 mapped IPv6 address if server is running on dual-stack mode
let saddr = match *sa {
SocketAddr::V4(..) => *sa,
SocketAddr::V6(ref v6) => match to_ipv4_mapped(v6.ip()) {
Some(v4) => SocketAddr::new(IpAddr::from(v4), v6.port()),
None => *sa,
},
};

Address::SocketAddress(saddr)
}
daddr => daddr.clone(),
};

let addr_len = remote_addr.serialized_len();
let mut buf = BytesMut::with_capacity(addr_len + data.len());
remote_addr.write_to_buf(&mut buf);
buf.extend_from_slice(data);
self.inbound.send_to(&buf, peer_addr).await.map(|_| ())
}
}

struct ParsedPacket {
target_addr: Address,
payload_start: usize,
}

fn parse_dynamic_packet(peer_addr: SocketAddr, data: &[u8]) -> Option<ParsedPacket> {
let mut cursor = io::Cursor::new(data);
let target_addr = match Address::read_cursor(&mut cursor) {
Ok(addr) => addr,
Err(err) => {
error!("received invalid UDP tunnel packet from {}: {}", peer_addr, err);
return None;
}
};

trace!("dynamic tunnel {} -> {}", peer_addr, target_addr);

Some(ParsedPacket {
target_addr,
payload_start: cursor.position() as usize,
})
}

pub struct TunnelUdpServer {
context: Arc<ServiceContext>,
time_to_live: Option<Duration>,
capacity: Option<usize>,
listener: Arc<UdpSocket>,
balancer: PingBalancer,
forward_addr: Address,
forward_addr: Option<Address>,
}

impl TunnelUdpServer {
Expand All @@ -110,14 +173,53 @@ impl TunnelUdpServer {
}

/// Start serving
pub async fn run(self) -> io::Result<()> {
info!("shadowsocks UDP tunnel listening on {}", self.listener.local_addr()?);
pub async fn run(mut self) -> io::Result<()> {
match self.forward_addr.take() {
Some(addr) => self.run_static(addr).await,
None => self.run_dynamic().await,
}
}

async fn run_static(self, forward_addr: Address) -> io::Result<()> {
let mode_desc = format!("forward to {}", forward_addr);
let inbound = self.listener.clone();
self.run_with_packet_parser(
StaticTunnelUdpInboundWriter { inbound },
&mode_desc,
move |_peer_addr, _data| {
Some(ParsedPacket {
target_addr: forward_addr.clone(),
payload_start: 0,
})
},
)
.await
}

async fn run_dynamic(self) -> io::Result<()> {
let inbound = self.listener.clone();
self.run_with_packet_parser(
DynamicTunnelUdpInboundWriter { inbound },
"dynamic forward",
parse_dynamic_packet,
)
.await
}

async fn run_with_packet_parser<W, F>(self, writer: W, mode_desc: &str, mut parse_packet: F) -> io::Result<()>
where
W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static,
F: FnMut(SocketAddr, &[u8]) -> Option<ParsedPacket> + Send,
{
info!(
"shadowsocks UDP tunnel listening on {}, {}",
self.listener.local_addr()?,
mode_desc
);

let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new(
self.context.clone(),
TunnelUdpInboundWriter {
inbound: self.listener.clone(),
},
writer,
self.time_to_live,
self.capacity,
self.balancer,
Expand Down Expand Up @@ -160,15 +262,16 @@ impl TunnelUdpServer {
}

let data = &buffer[..n];
if let Err(err) = manager.send_to(peer_addr, self.forward_addr.clone(), data)
.await
{
let packet = match parse_packet(peer_addr, data) {
Some(packet) => packet,
None => continue,
};
let payload = &data[packet.payload_start..];

if let Err(err) = manager.send_to(peer_addr, packet.target_addr.clone(), payload).await {
debug!(
"udp packet relay {} -> {} with {} bytes failed, error: {}",
peer_addr,
self.forward_addr,
data.len(),
err
peer_addr, packet.target_addr, payload.len(), err
);
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/shadowsocks/src/relay/socks5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ impl Address {
}
consts::SOCKS5_ADDR_TYPE_DOMAIN_NAME => {
let domain_len = cur.get_u8() as usize;
if cur.remaining() < domain_len {
// domain_len + 2: leave room for PORT, otherwise get_u16() below panics.
if cur.remaining() < domain_len + 2 {
return Err(Error::AddressDomainInvalidEncoding);
}
let mut buf = vec![0u8; domain_len];
Expand Down
Loading
Loading