diff --git a/.cargo/config.toml b/.cargo/config.toml index 49ba7442..84077c82 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,5 @@ [target.'cfg(all())'] -rustflags = ["-D", "warnings", "--cfg", "tokio_unstable"] +rustflags = ["-D", "warnings"] [target.'cfg(target_family = "windows")'] rustflags = ["-C", "target-feature=+crt-static"] diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b1c6e692..295e450d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,8 +35,8 @@ jobs: - name: Build docs run: cargo +nightly doc --lib --no-deps --all-features --document-private-items env: - RUSTDOCFLAGS: --cfg docsrs --cfg tokio_unstable -Dwarnings - RUSTFLAGS: --cfg tokio_unstable -Dwarnings + RUSTDOCFLAGS: --cfg docsrs -Dwarnings + RUSTFLAGS: -Dwarnings - name: Run doctests run: cargo test --doc diff --git a/Cargo.toml b/Cargo.toml index a6d2fa6a..cf8459dc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,4 @@ rstest_reuse = { version = "0.7", default-features = false } opt-level = 3 [workspace.lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = [ - 'cfg(tokio_unstable)', - 'cfg(coverage)', -] } +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage)'] } diff --git a/mtorrent-cli/README.md b/mtorrent-cli/README.md index 25f3603a..66cdb470 100644 --- a/mtorrent-cli/README.md +++ b/mtorrent-cli/README.md @@ -7,9 +7,7 @@ Lightweight CLI Bittorrent client in Rust. Blazingly fast, incredibly robust and # Installation Download the latest pre-compiled binary for Linux or Windows here: https://github.com/DanglingPointer/mtorrent/releases/latest -Alternatively, compile locally using the following commands: -- Linux: `RUSTFLAGS="--cfg=tokio_unstable" cargo install mtorrent-cli` -- Windows: `$env:RUSTFLAGS="--cfg=tokio_unstable"; cargo install mtorrent-cli` +Alternatively, compile locally and install using `cargo install mtorrent-cli`. # Features - Peer Wire Protocol over IPv4 and IPv6 diff --git a/mtorrent-core/Cargo.toml b/mtorrent-core/Cargo.toml index 846881a9..6db5b210 100644 --- a/mtorrent-core/Cargo.toml +++ b/mtorrent-core/Cargo.toml @@ -45,5 +45,4 @@ workspace = true [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] -rustc-args = ["--cfg", "tokio_unstable"] +rustdoc-args = ["--cfg", "docsrs"] diff --git a/mtorrent-dht/Cargo.toml b/mtorrent-dht/Cargo.toml index 5312f9c7..ccb7f2f2 100644 --- a/mtorrent-dht/Cargo.toml +++ b/mtorrent-dht/Cargo.toml @@ -39,5 +39,4 @@ workspace = true [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] -rustc-args = ["--cfg", "tokio_unstable"] +rustdoc-args = ["--cfg", "docsrs"] diff --git a/mtorrent-utils/Cargo.toml b/mtorrent-utils/Cargo.toml index 32834ad1..f0526786 100644 --- a/mtorrent-utils/Cargo.toml +++ b/mtorrent-utils/Cargo.toml @@ -42,5 +42,4 @@ workspace = true [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] -rustc-args = ["--cfg", "tokio_unstable"] +rustdoc-args = ["--cfg", "docsrs"] diff --git a/mtorrent-utils/src/lib.rs b/mtorrent-utils/src/lib.rs index 2d2643b6..4953f492 100644 --- a/mtorrent-utils/src/lib.rs +++ b/mtorrent-utils/src/lib.rs @@ -28,8 +28,7 @@ pub mod peer_id; /// Trait for splitting a bidirectional stream into read/write halves. pub mod split_stream; -/// Logging stopwatch macros at various log levels. -pub mod stopwatch; +mod stopwatch; /// UPnP/IGD port forwarding helpers. pub mod upnp; diff --git a/mtorrent-utils/src/split_stream.rs b/mtorrent-utils/src/split_stream.rs index 2b4ddfe8..775c84e8 100644 --- a/mtorrent-utils/src/split_stream.rs +++ b/mtorrent-utils/src/split_stream.rs @@ -4,13 +4,19 @@ use tokio::net::{TcpStream, tcp}; /// Helper trait for efficiently splitting a stream into read and write halves. pub trait SplitStream: Unpin { + /// Associated type for the read half of the stream. These can be a non-owning + /// reference to avoid unnecessary heap allocations and reference counting. type Ingress<'i>: AsyncRead + Unpin where Self: 'i; + + /// Associated type for the write half of the stream. These can be a non-owning + /// reference to avoid unnecessary heap allocations and reference counting. type Egress<'e>: AsyncWrite + Unpin where Self: 'e; + /// Split the stream into non-owning read and write halves. fn split(&mut self) -> (Self::Ingress<'_>, Self::Egress<'_>); } diff --git a/mtorrent-utils/src/stopwatch.rs b/mtorrent-utils/src/stopwatch.rs index 4c9eae06..ea5716e5 100644 --- a/mtorrent-utils/src/stopwatch.rs +++ b/mtorrent-utils/src/stopwatch.rs @@ -1,3 +1,4 @@ +#[doc(hidden)] #[macro_export] macro_rules! debug_stopwatch { ($($arg:tt)+) => { @@ -5,6 +6,7 @@ macro_rules! debug_stopwatch { }; } +#[doc(hidden)] #[macro_export] macro_rules! trace_stopwatch { ($($arg:tt)+) => { @@ -12,6 +14,7 @@ macro_rules! trace_stopwatch { }; } +#[doc(hidden)] #[macro_export] macro_rules! info_stopwatch { ($($arg:tt)+) => { @@ -19,6 +22,7 @@ macro_rules! info_stopwatch { }; } +#[doc(hidden)] #[macro_export] macro_rules! warn_stopwatch { ($($arg:tt)+) => { @@ -26,6 +30,7 @@ macro_rules! warn_stopwatch { }; } +#[doc(hidden)] #[macro_export] macro_rules! error_stopwatch { ($($arg:tt)+) => { diff --git a/mtorrent-utils/src/upnp.rs b/mtorrent-utils/src/upnp.rs index 6071bdda..df77fad8 100644 --- a/mtorrent-utils/src/upnp.rs +++ b/mtorrent-utils/src/upnp.rs @@ -10,6 +10,9 @@ type BlockingGateway = igd_next::Gateway; pub use igd_next::PortMappingProtocol; +/// Utility for creating and maintaining a port mapping on the local gateway via UPnP. The mapping +/// is valid for `PORT_LEASE_DURATION_SEC` seconds, but automatic renewal can be enabled by calling +/// `run_continuous_renewal()`. The mapping is removed when the `PortOpener` is dropped. pub struct PortOpener { gateway: AsyncGateway, internal_addr: SocketAddr, @@ -19,12 +22,15 @@ pub struct PortOpener { impl PortOpener { /// Recommended lease duration from . - const LEASE_DURATION_SEC: u32 = 3600; + pub const PORT_LEASE_DURATION_SEC: u32 = 3600; - /// Create a port mapping on the local gateway and return a `PortOpener` that maintains it. + /// Create a TCP or UDP port mapping that will be valid for + /// `PORT_LEASE_DURATION_SEC` seconds and return a `PortOpener` that maintains it. /// - /// The mapping will be automatically removed when the `PortOpener` is dropped, but it will not - /// be renewed unless `run_continuous_renewal()` is called. + /// If `desired_external_port` is not specified, the gateway will assign an arbitrary external + /// port number. + /// If `interface` is not specified, the first active network adapter with a non-loopback IPv4 + /// address will be used. pub async fn new( proto: PortMappingProtocol, internal_port: u16, @@ -54,11 +60,13 @@ impl PortOpener { let public_ip = gateway.get_external_ip().await?; let public_port = if let Some(desired_port) = desired_external_port { gateway - .add_port(proto, desired_port, internal_addr, Self::LEASE_DURATION_SEC, "") + .add_port(proto, desired_port, internal_addr, Self::PORT_LEASE_DURATION_SEC, "") .await?; desired_port } else { - gateway.add_any_port(proto, internal_addr, Self::LEASE_DURATION_SEC, "").await? + gateway + .add_any_port(proto, internal_addr, Self::PORT_LEASE_DURATION_SEC, "") + .await? }; let external_addr = SocketAddr::new(public_ip, public_port); @@ -76,12 +84,12 @@ impl PortOpener { self.external_addr } - /// Start continuous renewal of the port mapping. The mapping will be automatically removed when - /// the returned future is dropped. - pub async fn run_continuous_renewal(self) -> igd_next::Result<()> { + /// Continuously renew the port mapping every `PORT_LEASE_DURATION_SEC` seconds. This function + /// never returns unless an error occurs. + pub async fn run_continuous_renewal(&mut self) -> igd_next::Result<()> { // renewal interval must be slightly higher than the lease duration because renewing a // mapping that hasn't expired yet has no effect - let renewal_interval = sec!(Self::LEASE_DURATION_SEC as u64) + millisec!(500); + let renewal_interval = sec!(Self::PORT_LEASE_DURATION_SEC as u64) + millisec!(500); loop { sleep(renewal_interval).await; @@ -90,7 +98,7 @@ impl PortOpener { self.proto, self.external_addr.port(), self.internal_addr, - Self::LEASE_DURATION_SEC, + Self::PORT_LEASE_DURATION_SEC, "", ) .await?; @@ -106,7 +114,7 @@ impl PortOpener { impl Drop for PortOpener { /// Remove the port mapping when the `PortOpener` is dropped. Note that this is a blocking - /// operation because [`AsyncDrop`](https://doc.rust-lang.org/std/future/trait.AsyncDrop.html) is experimental. + /// operation since [`AsyncDrop`](https://doc.rust-lang.org/std/future/trait.AsyncDrop.html) is still experimental. fn drop(&mut self) { let gateway = BlockingGateway { addr: self.gateway.addr, diff --git a/mtorrent-utils/src/worker.rs b/mtorrent-utils/src/worker.rs index 512de63c..243b8b7b 100644 --- a/mtorrent-utils/src/worker.rs +++ b/mtorrent-utils/src/worker.rs @@ -2,9 +2,7 @@ use std::sync::Arc; use std::time::Duration; use std::{io, thread}; use tokio::runtime; -use tokio::sync::Notify; -#[cfg(tokio_unstable)] -use tokio::sync::oneshot; +use tokio::sync::{Notify, oneshot}; pub mod simple { use super::*; @@ -152,8 +150,6 @@ pub fn with_runtime(config: rt::Config) -> io::Result { } /// Create a worker thread running a [`tokio::runtime::LocalRuntime`](https://docs.rs/tokio/latest/tokio/runtime/struct.LocalRuntime.html) with the specified configuration. -#[cfg(tokio_unstable)] -#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub fn with_local_runtime(config: rt::Config) -> io::Result { let mut builder = runtime::Builder::new_current_thread(); builder.max_blocking_threads(config.max_blocking_threads); diff --git a/mtorrent/Cargo.toml b/mtorrent/Cargo.toml index b92aff7e..37196d7f 100644 --- a/mtorrent/Cargo.toml +++ b/mtorrent/Cargo.toml @@ -37,5 +37,4 @@ workspace = true [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] -rustc-args = ["--cfg", "tokio_unstable"] +rustdoc-args = ["--cfg", "docsrs"] diff --git a/mtorrent/src/app/dht.rs b/mtorrent/src/app/dht.rs index b28d0486..288f3b53 100644 --- a/mtorrent/src/app/dht.rs +++ b/mtorrent/src/app/dht.rs @@ -54,7 +54,7 @@ pub fn launch_dht_node_runtime(cfg: Config) -> io::Result<(worker::rt::Handle, d async fn start_upnp(local_port: u16, interface: Option<&str>) -> io::Result<()> { // try create a port mapping with the same port number - let port_opener = upnp::PortOpener::new( + let mut port_opener = upnp::PortOpener::new( upnp::PortMappingProtocol::UDP, local_port, Some(local_port), diff --git a/mtorrent/src/app/main.rs b/mtorrent/src/app/main.rs index 73dacffa..69fca92b 100644 --- a/mtorrent/src/app/main.rs +++ b/mtorrent/src/app/main.rs @@ -64,10 +64,10 @@ async fn start_upnp( internal_port: u16, desired_external_port: Option, proto: upnp::PortMappingProtocol, - interface: Option<&str>, + interface: Option, ) -> u16 { - let Ok(port_opener) = - upnp::PortOpener::new(proto, internal_port, desired_external_port, interface) + let Ok(mut port_opener) = + upnp::PortOpener::new(proto, internal_port, desired_external_port, interface.as_deref()) .await .inspect_err(|e| log::error!("UPnP: {proto:?} port mapping failed: {e}")) else { @@ -111,22 +111,21 @@ pub async fn single_torrent( // create port mappings and get external port to send correct listening port to trackers and // peers later let external_pwp_port = if cfg.use_upnp { - let _g = ctx.pwp_runtime.enter(); - let (_external_tcp_port, external_udp_port) = join!( - start_upnp( - internal_pwp_port, - cfg.pwp_port, - upnp::PortMappingProtocol::TCP, - cfg.bind_interface.as_deref() - ), - start_upnp( - internal_pwp_port, - cfg.pwp_port, - upnp::PortMappingProtocol::UDP, - cfg.bind_interface.as_deref() - ), - ); - external_udp_port + // UPnP tasks must be spawned on the pwp runtime + let tcp_task = ctx.pwp_runtime.spawn(start_upnp( + internal_pwp_port, + cfg.pwp_port, + upnp::PortMappingProtocol::TCP, + cfg.bind_interface.clone(), + )); + let utp_task = ctx.pwp_runtime.spawn(start_upnp( + internal_pwp_port, + cfg.pwp_port, + upnp::PortMappingProtocol::UDP, + cfg.bind_interface.clone(), + )); + let (_external_tcp_port, external_udp_port) = join!(tcp_task, utp_task); + external_udp_port.unwrap_or(internal_pwp_port) } else { internal_pwp_port }; diff --git a/mtorrent/src/ops/connections.rs b/mtorrent/src/ops/connections.rs index 4e68b414..9f63000b 100644 --- a/mtorrent/src/ops/connections.rs +++ b/mtorrent/src/ops/connections.rs @@ -450,7 +450,6 @@ mod tests { use std::future::pending; use std::net::Ipv4Addr; use std::sync::Arc; - use tokio::runtime::UnhandledPanic; use tokio::time::{sleep, sleep_until}; fn addr(i: u16) -> SocketAddr { @@ -463,7 +462,6 @@ mod tests { macro_rules! run_in_local_set { ($($tokens:tt)+) => {{ task::LocalSet::new() - .unhandled_panic(UnhandledPanic::ShutdownRuntime) .run_until(async move { $($tokens)+