Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[target.'cfg(all())']
rustflags = ["-D", "warnings", "--cfg", "tokio_unstable"]
rustflags = ["-D", "warnings"]

[target.'cfg(target_family = "windows")']
rustflags = ["-C", "target-feature=+crt-static"]
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ jobs:
- name: Build docs
run: cargo +nightly doc --lib --no-deps --all-features --document-private-items
env:
RUSTDOCFLAGS: --cfg docsrs --cfg tokio_unstable -Dwarnings
RUSTFLAGS: --cfg tokio_unstable -Dwarnings
RUSTDOCFLAGS: --cfg docsrs -Dwarnings
RUSTFLAGS: -Dwarnings
- name: Run doctests
run: cargo test --doc

Expand Down
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,4 @@ rstest_reuse = { version = "0.7", default-features = false }
opt-level = 3

[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(tokio_unstable)',
'cfg(coverage)',
] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage)'] }
4 changes: 1 addition & 3 deletions mtorrent-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ Lightweight CLI Bittorrent client in Rust. Blazingly fast, incredibly robust and
# Installation
Download the latest pre-compiled binary for Linux or Windows here: https://github.com/DanglingPointer/mtorrent/releases/latest

Alternatively, compile locally using the following commands:
- Linux: `RUSTFLAGS="--cfg=tokio_unstable" cargo install mtorrent-cli`
- Windows: `$env:RUSTFLAGS="--cfg=tokio_unstable"; cargo install mtorrent-cli`
Alternatively, compile locally and install using `cargo install mtorrent-cli`.

# Features
- Peer Wire Protocol over IPv4 and IPv6
Expand Down
3 changes: 1 addition & 2 deletions mtorrent-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,4 @@ workspace = true

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"]
rustc-args = ["--cfg", "tokio_unstable"]
rustdoc-args = ["--cfg", "docsrs"]
3 changes: 1 addition & 2 deletions mtorrent-dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ workspace = true

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"]
rustc-args = ["--cfg", "tokio_unstable"]
rustdoc-args = ["--cfg", "docsrs"]
3 changes: 1 addition & 2 deletions mtorrent-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,4 @@ workspace = true

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"]
rustc-args = ["--cfg", "tokio_unstable"]
rustdoc-args = ["--cfg", "docsrs"]
3 changes: 1 addition & 2 deletions mtorrent-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ pub mod peer_id;
/// Trait for splitting a bidirectional stream into read/write halves.
pub mod split_stream;

/// Logging stopwatch macros at various log levels.
pub mod stopwatch;
mod stopwatch;

/// UPnP/IGD port forwarding helpers.
pub mod upnp;
Expand Down
6 changes: 6 additions & 0 deletions mtorrent-utils/src/split_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ use tokio::net::{TcpStream, tcp};

/// Helper trait for efficiently splitting a stream into read and write halves.
pub trait SplitStream: Unpin {
/// Associated type for the read half of the stream. These can be a non-owning
/// reference to avoid unnecessary heap allocations and reference counting.
type Ingress<'i>: AsyncRead + Unpin
where
Self: 'i;

/// Associated type for the write half of the stream. These can be a non-owning
/// reference to avoid unnecessary heap allocations and reference counting.
type Egress<'e>: AsyncWrite + Unpin
where
Self: 'e;

/// Split the stream into non-owning read and write halves.
fn split(&mut self) -> (Self::Ingress<'_>, Self::Egress<'_>);
}

Expand Down
5 changes: 5 additions & 0 deletions mtorrent-utils/src/stopwatch.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,36 @@
#[doc(hidden)]
#[macro_export]
macro_rules! debug_stopwatch {
($($arg:tt)+) => {
local_async_utils::debug_stopwatch!(local_async_utils::millisec!(1), $($arg)+)
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! trace_stopwatch {
($($arg:tt)+) => {
local_async_utils::trace_stopwatch!(std::time::Duration::ZERO, $($arg)+)
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! info_stopwatch {
($($arg:tt)+) => {
local_async_utils::info_stopwatch!(local_async_utils::millisec!(1), $($arg)+)
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! warn_stopwatch {
($($arg:tt)+) => {
local_async_utils::warn_stopwatch!(local_async_utils::sec!(1), $($arg)+)
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! error_stopwatch {
($($arg:tt)+) => {
Expand Down
32 changes: 20 additions & 12 deletions mtorrent-utils/src/upnp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type BlockingGateway = igd_next::Gateway;

pub use igd_next::PortMappingProtocol;

/// Utility for creating and maintaining a port mapping on the local gateway via UPnP. The mapping
/// is valid for `PORT_LEASE_DURATION_SEC` seconds, but automatic renewal can be enabled by calling
/// `run_continuous_renewal()`. The mapping is removed when the `PortOpener` is dropped.
pub struct PortOpener {
gateway: AsyncGateway,
internal_addr: SocketAddr,
Expand All @@ -19,12 +22,15 @@ pub struct PortOpener {

impl PortOpener {
/// Recommended lease duration from <https://upnp.org/specs/gw/UPnP-gw-WANIPConnection-v2-Service.pdf>.
const LEASE_DURATION_SEC: u32 = 3600;
pub const PORT_LEASE_DURATION_SEC: u32 = 3600;

/// Create a port mapping on the local gateway and return a `PortOpener` that maintains it.
/// Create a TCP or UDP port mapping that will be valid for
/// `PORT_LEASE_DURATION_SEC` seconds and return a `PortOpener` that maintains it.
///
/// The mapping will be automatically removed when the `PortOpener` is dropped, but it will not
/// be renewed unless `run_continuous_renewal()` is called.
/// If `desired_external_port` is not specified, the gateway will assign an arbitrary external
/// port number.
/// If `interface` is not specified, the first active network adapter with a non-loopback IPv4
/// address will be used.
pub async fn new(
proto: PortMappingProtocol,
internal_port: u16,
Expand Down Expand Up @@ -54,11 +60,13 @@ impl PortOpener {
let public_ip = gateway.get_external_ip().await?;
let public_port = if let Some(desired_port) = desired_external_port {
gateway
.add_port(proto, desired_port, internal_addr, Self::LEASE_DURATION_SEC, "")
.add_port(proto, desired_port, internal_addr, Self::PORT_LEASE_DURATION_SEC, "")
.await?;
desired_port
} else {
gateway.add_any_port(proto, internal_addr, Self::LEASE_DURATION_SEC, "").await?
gateway
.add_any_port(proto, internal_addr, Self::PORT_LEASE_DURATION_SEC, "")
.await?
};
let external_addr = SocketAddr::new(public_ip, public_port);

Expand All @@ -76,12 +84,12 @@ impl PortOpener {
self.external_addr
}

/// Start continuous renewal of the port mapping. The mapping will be automatically removed when
/// the returned future is dropped.
pub async fn run_continuous_renewal(self) -> igd_next::Result<()> {
/// Continuously renew the port mapping every `PORT_LEASE_DURATION_SEC` seconds. This function
/// never returns unless an error occurs.
pub async fn run_continuous_renewal(&mut self) -> igd_next::Result<()> {
// renewal interval must be slightly higher than the lease duration because renewing a
// mapping that hasn't expired yet has no effect
let renewal_interval = sec!(Self::LEASE_DURATION_SEC as u64) + millisec!(500);
let renewal_interval = sec!(Self::PORT_LEASE_DURATION_SEC as u64) + millisec!(500);
loop {
sleep(renewal_interval).await;

Expand All @@ -90,7 +98,7 @@ impl PortOpener {
self.proto,
self.external_addr.port(),
self.internal_addr,
Self::LEASE_DURATION_SEC,
Self::PORT_LEASE_DURATION_SEC,
"",
)
.await?;
Expand All @@ -106,7 +114,7 @@ impl PortOpener {

impl Drop for PortOpener {
/// Remove the port mapping when the `PortOpener` is dropped. Note that this is a blocking
/// operation because [`AsyncDrop`](https://doc.rust-lang.org/std/future/trait.AsyncDrop.html) is experimental.
/// operation since [`AsyncDrop`](https://doc.rust-lang.org/std/future/trait.AsyncDrop.html) is still experimental.
fn drop(&mut self) {
let gateway = BlockingGateway {
addr: self.gateway.addr,
Expand Down
6 changes: 1 addition & 5 deletions mtorrent-utils/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::sync::Arc;
use std::time::Duration;
use std::{io, thread};
use tokio::runtime;
use tokio::sync::Notify;
#[cfg(tokio_unstable)]
use tokio::sync::oneshot;
use tokio::sync::{Notify, oneshot};

pub mod simple {
use super::*;
Expand Down Expand Up @@ -152,8 +150,6 @@ pub fn with_runtime(config: rt::Config) -> io::Result<rt::Handle> {
}

/// Create a worker thread running a [`tokio::runtime::LocalRuntime`](https://docs.rs/tokio/latest/tokio/runtime/struct.LocalRuntime.html) with the specified configuration.
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn with_local_runtime(config: rt::Config) -> io::Result<rt::Handle> {
let mut builder = runtime::Builder::new_current_thread();
builder.max_blocking_threads(config.max_blocking_threads);
Expand Down
3 changes: 1 addition & 2 deletions mtorrent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,4 @@ workspace = true

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"]
rustc-args = ["--cfg", "tokio_unstable"]
rustdoc-args = ["--cfg", "docsrs"]
2 changes: 1 addition & 1 deletion mtorrent/src/app/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn launch_dht_node_runtime(cfg: Config) -> io::Result<(worker::rt::Handle, d

async fn start_upnp(local_port: u16, interface: Option<&str>) -> io::Result<()> {
// try create a port mapping with the same port number
let port_opener = upnp::PortOpener::new(
let mut port_opener = upnp::PortOpener::new(
upnp::PortMappingProtocol::UDP,
local_port,
Some(local_port),
Expand Down
37 changes: 18 additions & 19 deletions mtorrent/src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ async fn start_upnp(
internal_port: u16,
desired_external_port: Option<u16>,
proto: upnp::PortMappingProtocol,
interface: Option<&str>,
interface: Option<String>,
) -> u16 {
let Ok(port_opener) =
upnp::PortOpener::new(proto, internal_port, desired_external_port, interface)
let Ok(mut port_opener) =
upnp::PortOpener::new(proto, internal_port, desired_external_port, interface.as_deref())
.await
.inspect_err(|e| log::error!("UPnP: {proto:?} port mapping failed: {e}"))
else {
Expand Down Expand Up @@ -111,22 +111,21 @@ pub async fn single_torrent(
// create port mappings and get external port to send correct listening port to trackers and
// peers later
let external_pwp_port = if cfg.use_upnp {
let _g = ctx.pwp_runtime.enter();
let (_external_tcp_port, external_udp_port) = join!(
start_upnp(
internal_pwp_port,
cfg.pwp_port,
upnp::PortMappingProtocol::TCP,
cfg.bind_interface.as_deref()
),
start_upnp(
internal_pwp_port,
cfg.pwp_port,
upnp::PortMappingProtocol::UDP,
cfg.bind_interface.as_deref()
),
);
external_udp_port
// UPnP tasks must be spawned on the pwp runtime
let tcp_task = ctx.pwp_runtime.spawn(start_upnp(
internal_pwp_port,
cfg.pwp_port,
upnp::PortMappingProtocol::TCP,
cfg.bind_interface.clone(),
));
let utp_task = ctx.pwp_runtime.spawn(start_upnp(
internal_pwp_port,
cfg.pwp_port,
upnp::PortMappingProtocol::UDP,
cfg.bind_interface.clone(),
));
let (_external_tcp_port, external_udp_port) = join!(tcp_task, utp_task);
external_udp_port.unwrap_or(internal_pwp_port)
} else {
internal_pwp_port
};
Expand Down
2 changes: 0 additions & 2 deletions mtorrent/src/ops/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,6 @@ mod tests {
use std::future::pending;
use std::net::Ipv4Addr;
use std::sync::Arc;
use tokio::runtime::UnhandledPanic;
use tokio::time::{sleep, sleep_until};

fn addr(i: u16) -> SocketAddr {
Expand All @@ -463,7 +462,6 @@ mod tests {
macro_rules! run_in_local_set {
($($tokens:tt)+) => {{
task::LocalSet::new()
.unhandled_panic(UnhandledPanic::ShutdownRuntime)
.run_until(async move {
$($tokens)+

Expand Down
Loading