Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions mtorrent-core/src/trackers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ mod http;
mod udp;
mod url;

use futures_util::TryFutureExt;
use local_async_utils::sec;
use mtorrent_utils::net;
use mtorrent_utils::peer_id::PeerId;
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::Duration;
use std::{io, iter};
use tokio::net::{UdpSocket, lookup_host};
use tokio::net::lookup_host;
use tokio::sync::{mpsc, oneshot};
use tokio::task;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -336,17 +335,14 @@ async fn new_udp_client(
local_ipv4: Ipv4Addr,
local_ipv6: Ipv6Addr,
) -> io::Result<udp::TrackerConnection> {
async fn bind_and_connect_socket(
async fn bind_and_connect(
bind_addr: &SocketAddr,
remote_addr: &SocketAddr,
interface: Option<&str>,
) -> io::Result<UdpSocket> {
let socket = UdpSocket::bind(bind_addr).await?;
if let Some(iface) = interface {
net::bind_to_interface(&socket, iface)?;
}
) -> io::Result<udp::TrackerConnection> {
let socket = net::bound_udp_socket(*bind_addr, interface)?;
socket.connect(&remote_addr).await?;
Ok(socket)
udp::TrackerConnection::from_connected_socket(socket).await
}

for tracker_addr in lookup_host(tracker_addr_str).await? {
Expand All @@ -355,10 +351,7 @@ async fn new_udp_client(
SocketAddr::V6(_) => local_ipv6.into(),
};
let local_addr = SocketAddr::new(local_ip, 0);
if let Ok(client) = bind_and_connect_socket(&local_addr, &tracker_addr, interface)
.and_then(udp::TrackerConnection::from_connected_socket)
.await
{
if let Ok(client) = bind_and_connect(&local_addr, &tracker_addr, interface).await {
return Ok(client);
}
}
Expand Down Expand Up @@ -427,6 +420,7 @@ async fn do_udp_scrape(
#[cfg(test)]
mod tests {
use super::*;
use tokio::net::UdpSocket;
use tokio::time;

fn init_loopback() -> (Client, Manager) {
Expand Down
81 changes: 77 additions & 4 deletions mtorrent-utils/src/net.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use bytes::Buf;
use socket2::SockRef;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::time::Duration;
use std::{io, ops};
use tokio::net::{TcpSocket, UdpSocket};

#[cfg(not(windows))]
pub(crate) fn get_local_addr(mut predicate: impl FnMut(&IpAddr) -> bool) -> Option<IpAddr> {
Expand Down Expand Up @@ -101,6 +103,77 @@ pub fn get_bind_addr_v6(interface: Option<&str>) -> Ipv6Addr {

// ------------------------------------------------------------------------------------------------

/// Create a UDP socket bound to the specified local address and network interface (if any).
pub fn bound_udp_socket(local_addr: SocketAddr, interface: Option<&str>) -> io::Result<UdpSocket> {
let socket = socket2::Socket::new(
match local_addr {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
},
socket2::Type::DGRAM,
None,
)?;
if local_addr.is_ipv6() {
socket.set_only_v6(true)?;
}
socket.set_nonblocking(true)?;
if let Some(interface) = interface {
bind_to_interface(&socket, interface)?;
}
socket.bind(&local_addr.into())?;
let std_socket = std::net::UdpSocket::from(socket);
UdpSocket::from_std(std_socket)
}

/// Create a TCP socket bound to the specified local address and network interface (if any).
/// The following socket options are set on the created socket:
/// - SO_REUSEADDR (on all platforms) and SO_REUSEPORT (on Linux)
/// - SO_LINGER with 0 timeout, to avoid putting socket into TIME_WAIT when disconnecting someone
/// - TCP_NODELAY
pub fn bound_tcp_socket(local_addr: SocketAddr, interface: Option<&str>) -> io::Result<TcpSocket> {
let socket = socket2::Socket::new(
match local_addr {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
},
socket2::Type::STREAM,
None,
)?;
if local_addr.is_ipv6() {
socket.set_only_v6(true)?;
}

// To use the same local addr and port for outgoing PWP connections and for TCP listener,
// (in order to deal with endpoint-independent NAT mappings, https://www.rfc-editor.org/rfc/rfc5128#section-2.3)
// we need to set SO_REUSEADDR on Windows, and SO_REUSEADDR and SO_REUSEPORT on Linux.
// See https://stackoverflow.com/a/14388707/4432988 for details.
socket.set_reuse_address(true)?;
#[cfg(not(windows))]
socket.set_reuse_port(true)?;
// To avoid putting socket into TIME_WAIT when disconnecting someone, enable SO_LINGER with 0
// timeout See https://stackoverflow.com/a/71975993
socket.set_linger(Some(Duration::ZERO))?;
socket.set_tcp_nodelay(true)?;

socket.set_nonblocking(true)?;
if let Some(interface) = interface {
bind_to_interface(&socket, interface)?;
}
socket.bind(&local_addr.into())?;
#[cfg(any(unix, all(target_os = "wasi", not(target_env = "p1"))))]
unsafe {
use std::os::fd::{FromRawFd, IntoRawFd};
Ok(FromRawFd::from_raw_fd(socket.into_raw_fd()))
}
#[cfg(windows)]
unsafe {
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
Ok(FromRawSocket::from_raw_socket(socket.into_raw_socket()))
}
}

// ------------------------------------------------------------------------------------------------

#[doc(hidden)]
pub fn set_so_sndbuf_internal<'s>(socket: impl Into<SockRef<'s>>, value: usize, module: &str) {
if let Err(e) = socket.into().set_send_buffer_size(value) {
Expand Down Expand Up @@ -135,14 +208,14 @@ macro_rules! set_so_rcvbuf {

/// Bind a socket to a specific network interface. Does nothing on Windows.
#[cfg(target_os = "windows")]
pub fn bind_to_interface<'s>(_socket: impl Into<SockRef<'s>>, _interface: &str) -> io::Result<()> {
fn bind_to_interface<'s>(_socket: impl Into<SockRef<'s>>, _interface: &str) -> io::Result<()> {
// not supported on Windows
Ok(())
}

/// Bind a socket to a specific network interface.
#[cfg(any(target_os = "android", target_os = "fuchsia", target_os = "linux"))]
pub fn bind_to_interface<'s>(socket: impl Into<SockRef<'s>>, interface: &str) -> io::Result<()> {
fn bind_to_interface<'s>(socket: impl Into<SockRef<'s>>, interface: &str) -> io::Result<()> {
let socket = socket.into();

socket.bind_device(Some(interface.as_bytes()))?;
Expand All @@ -159,7 +232,7 @@ pub fn bind_to_interface<'s>(socket: impl Into<SockRef<'s>>, interface: &str) ->
target_os = "visionos",
target_os = "watchos",
))]
pub fn bind_to_interface<'s>(socket: impl Into<SockRef<'s>>, interface: &str) -> io::Result<()> {
fn bind_to_interface<'s>(socket: impl Into<SockRef<'s>>, interface: &str) -> io::Result<()> {
let socket = socket.into();

let interface = std::ffi::CString::new(interface)?;
Expand Down
4 changes: 2 additions & 2 deletions mtorrent-utils/src/upnp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl PortOpener {
}

/// Get the external socket address that was mapped to the internal port.
pub fn external_ip(&self) -> SocketAddr {
pub fn external_addr(&self) -> SocketAddr {
self.external_addr
}

Expand Down Expand Up @@ -146,7 +146,7 @@ mod tests {
let port_opener = PortOpener::new(PortMappingProtocol::TCP, internal_port, None, None)
.await
.unwrap_or_else(|e| panic!("Failed to create PortOpener: {e}"));
log::info!("port opener created, external ip: {}", port_opener.external_ip());
log::info!("port opener created, external ip: {}", port_opener.external_addr());
time::sleep(sec!(1)).await;
drop(port_opener);
log::info!("port opener dropped");
Expand Down
29 changes: 10 additions & 19 deletions mtorrent/src/app/dht.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use mtorrent_dht as dht;
use mtorrent_utils::{info_stopwatch, net, upnp, worker};
use std::io;
use std::net::SocketAddrV4;
use std::path::PathBuf;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::{join, task};

/// Startup configuration for the DHT system.
Expand Down Expand Up @@ -65,7 +63,7 @@ async fn start_upnp(local_port: u16, interface: Option<&str>) -> io::Result<()>
.await
.map_err(io::Error::other)?;

log::info!("UPnP for DHT succeeded, public ip: {}", port_opener.external_ip());
log::info!("UPnP for DHT succeeded, public ip: {}", port_opener.external_addr());

// start periodic renewal of port mapping. It will stop and remove the mapping
// automatically once the DHT runtime shuts down
Expand All @@ -90,22 +88,15 @@ async fn dht_main(
) {
let _sw = info_stopwatch!("DHT");

let socket = match UdpSocket::bind(SocketAddrV4::new(
net::get_bind_addr_v4(bind_interface.as_deref()),
local_port,
))
.await
{
Err(e) => return log::error!("Failed to create a UDP socket for DHT: {e}"),
Ok(socket) => socket,
};

if let Some(interface) = &bind_interface
&& let Err(e) = net::bind_to_interface(&socket, interface)
{
log::error!("Failed to bind DHT UDP socket to interface {interface}: {e}");
return;
}
let local_ipv4 = net::get_bind_addr_v4(bind_interface.as_deref());
let socket =
match net::bound_udp_socket((local_ipv4, local_port).into(), bind_interface.as_deref()) {
Err(e) => {
log::error!("Failed to create a UDP socket for DHT: {e}");
return;
}
Ok(socket) => socket,
};

if use_upnp && let Err(e) = start_upnp(local_port, bind_interface.as_deref()).await {
log::error!("UPnP for DHT failed: {e}");
Expand Down
Loading
Loading