Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,890 changes: 1,053 additions & 837 deletions content-discovery/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions content-discovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ missing_debug_implementations = "warn"
unused-async = "warn"

[workspace.dependencies]
iroh = { version ="0.94", features = ["discovery-pkarr-dht"] }
iroh-base = "0.94"
iroh-blobs = "0.96"
iroh = { version ="0.96", features = ["address-lookup-pkarr-dht"] }
iroh-base = "0.96"
iroh-blobs = "0.98"
# explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255
tokio = { version = "1.44.1" }
tokio-stream = { version = "0.1.17" }
postcard = { version = "1", default-features = false }
anyhow = { version = "1", default-features = false }
anyhow = "1"
n0-future = { version = "0.3" }
futures-buffered = { version = "0.2.11" }
4 changes: 2 additions & 2 deletions content-discovery/iroh-content-discovery-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> {
/// Create an endpoint that does look up discovery info via DNS or the DHT, but does not
/// announce. The client node id is ephemeral and will not be dialed by anyone.
async fn create_client_endpoint() -> Result<endpoint::Endpoint, BindError> {
let discovery = iroh::discovery::pkarr::dht::DhtDiscovery::builder()
let address_lookup = iroh::address_lookup::DhtAddressLookup::builder()
.dht(true)
.n0_dns_pkarr_relay();
endpoint::Endpoint::builder()
.discovery(discovery)
.address_lookup(address_lookup)
.bind()
.await
}
Expand Down
159 changes: 98 additions & 61 deletions content-discovery/iroh-content-discovery/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{future::Future, result};
use std::result;

use iroh::{
endpoint::{ConnectOptions, Connection},
endpoint::{
ConnectOptions, Connection, ConnectionState, OutgoingZeroRtt, RecvStream, ZeroRttStatus,
},
Endpoint, EndpointId,
};
use n0_future::{BufferedStreamExt, Stream, StreamExt};
Expand All @@ -22,7 +24,7 @@ pub enum Error {

#[snafu(display("Failed connect to tracker using 1-rtt: {}", source))]
Connect1Rtt {
source: iroh::endpoint::ConnectionError,
source: iroh::endpoint::ConnectingError,
backtrace: snafu::Backtrace,
},

Expand Down Expand Up @@ -61,12 +63,6 @@ pub enum Error {
source: postcard::Error,
backtrace: snafu::Backtrace,
},

#[snafu(display("Failed to get remote endpoint id: {}", source))]
RemoteEndpointId {
source: iroh::endpoint::RemoteEndpointIdError,
backtrace: snafu::Backtrace,
},
}

pub type Result<T> = result::Result<T, Error>;
Expand Down Expand Up @@ -107,47 +103,39 @@ pub async fn announce(
.await
.context(ConnectSnafu)?;
match connecting.into_0rtt() {
Ok((connection, zero_rtt_accepted)) => {
Ok(connection) => {
trace!("connected to tracker using possibly 0-rtt: {endpoint_id}");
announce_conn(&connection, signed_announce, zero_rtt_accepted).await?;
let connection = announce_conn_0rtt(connection, signed_announce).await?;
wait_for_session_ticket(connection);
Ok(())
}
Err(connecting) => {
let connection = connecting.await.context(Connect1RttSnafu)?;
trace!("connected to tracker using 1-rtt: {endpoint_id}");
announce_conn(&connection, signed_announce, async { true }).await?;
announce_conn(&connection, signed_announce).await?;
connection.close(0u32.into(), b"");
Ok(())
}
}
}

/// Announce via an existing connection.
///
/// The proceed future can be used to reattempt the send, which can be useful if the connection
/// was established using 0-rtt and the tracker does not support it. If you have an existing
/// 1-rtt connection, you can pass `async { true }` to proceed immediately.
pub async fn announce_conn(
connection: &Connection,
signed_announce: SignedAnnounce,
proceed: impl Future<Output = bool>,
) -> Result<()> {
let (mut send, recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
pub fn create_announce_request(signed_announce: SignedAnnounce) -> Result<Vec<u8>> {
let request = Request::Announce(signed_announce);
let request = postcard::to_stdvec(&request).context(SerializeRequestSnafu)?;
postcard::to_stdvec(&request).context(SerializeRequestSnafu)
}

pub async fn send_announce<S: ConnectionState>(
connection: &Connection<S>,
request: &[u8],
) -> Result<RecvStream> {
let (mut send, recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
trace!("sending announce");
send.write_all(&request).await.context(WriteRequestSnafu)?;
send.write_all(request).await.context(WriteRequestSnafu)?;
send.finish().context(FinishWriteSnafu)?;
let mut recv = if proceed.await {
recv
} else {
let (mut send, recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
trace!("re-sending announce using 1-rtt");
send.write_all(&request).await.context(WriteRequestSnafu)?;
send.finish().context(FinishWriteSnafu)?;
recv
};
Ok(recv)
}

pub async fn recv_response(mut recv: RecvStream) -> Result<()> {
let _response = recv
.read_to_end(REQUEST_SIZE_LIMIT)
.await
Expand All @@ -156,6 +144,38 @@ pub async fn announce_conn(
Ok(())
}

pub async fn announce_conn(connection: &Connection, signed_announce: SignedAnnounce) -> Result<()> {
let request = create_announce_request(signed_announce)?;
let recv = send_announce(connection, &request).await?;
recv_response(recv).await
}

/// Announce via an existing 0rtt connection.
pub async fn announce_conn_0rtt(
connection: Connection<OutgoingZeroRtt>,
signed_announce: SignedAnnounce,
) -> Result<Connection> {
// Send via 0-RTT
let request = create_announce_request(signed_announce)?;
let recv = send_announce(&connection, &request).await?;

// Check if 0-RTT was accepted
let status = connection
.handshake_completed()
.await
.context(Connect1RttSnafu)?;
let (connection, recv) = match status {
ZeroRttStatus::Accepted(conn) => (conn, recv),
ZeroRttStatus::Rejected(conn) => {
// Resend on a new stream
let recv = send_announce(&conn, &request).await?;
(conn, recv)
}
};
recv_response(recv).await?;
Ok(connection)
}

/// A single query to a tracker, using 0-rtt if possible.
pub async fn query(
endpoint: &Endpoint,
Expand All @@ -167,16 +187,47 @@ pub async fn query(
.await
.context(ConnectSnafu)?;
let result = match connecting.into_0rtt() {
Ok((connection, zero_rtt_accepted)) => {
Ok(connection) => {
trace!("connected to tracker using possibly 0-rtt: {endpoint_id}");
let res = query_conn(&connection, args, zero_rtt_accepted).await?;
// Send via 0-RTT
let request = Request::Query(args);
let request = postcard::to_stdvec(&request).context(SerializeRequestSnafu)?;
trace!("connected to {:?}", connection.remote_id());
let (mut send, recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
trace!("sending query");
send.write_all(&request).await.context(WriteRequestSnafu)?;
send.finish().context(FinishWriteSnafu)?;

// Check if 0-RTT was accepted
let status = connection
.handshake_completed()
.await
.context(Connect1RttSnafu)?;
let (connection, mut recv) = match status {
ZeroRttStatus::Accepted(conn) => (conn, recv),
ZeroRttStatus::Rejected(conn) => {
// Resend on a new stream
let (mut send, recv) = conn.open_bi().await.context(OpenStreamSnafu)?;
trace!("sending query again using 1-rtt");
send.write_all(&request).await.context(WriteRequestSnafu)?;
send.finish().context(FinishWriteSnafu)?;
(conn, recv)
}
};
let response = recv
.read_to_end(REQUEST_SIZE_LIMIT)
.await
.context(ReadResponseSnafu)?;
let response =
postcard::from_bytes::<Response>(&response).context(DeserializeResponseSnafu)?;
let Response::QueryResponse(res) = response;
wait_for_session_ticket(connection);
res
}
Err(connecting) => {
let connection = connecting.await.context(Connect1RttSnafu)?;
trace!("connected to tracker using 1-rtt: {endpoint_id}");
let res = query_conn(&connection, args, async { true }).await?;
let res = query_conn(&connection, args).await?;
connection.close(0u32.into(), b"");
res
}
Expand Down Expand Up @@ -210,35 +261,16 @@ pub fn query_all(
}

/// Query via an existing connection.
///
/// The proceed future can be used to reattempt the send, which can be useful if the connection
/// was established using 0-rtt and the tracker does not support it. If you have an existing
/// 1-rtt connection, you can pass `async { true }` to proceed immediately.
pub async fn query_conn(
connection: &Connection,
pub async fn query_conn<S: ConnectionState>(
connection: &Connection<S>,
args: Query,
proceed: impl Future<Output = bool>,
) -> Result<QueryResponse> {
let request = Request::Query(args);
let request = postcard::to_stdvec(&request).context(SerializeRequestSnafu)?;
trace!(
"connected to {:?}",
connection.remote_id().context(RemoteEndpointIdSnafu)?
);
trace!("opened bi stream");
let (mut send, recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
trace!("sending query");
let (mut send, mut recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
send.write_all(&request).await.context(WriteRequestSnafu)?;
send.finish().context(FinishWriteSnafu)?;
let mut recv = if proceed.await {
recv
} else {
let (mut send, recv) = connection.open_bi().await.context(OpenStreamSnafu)?;
trace!("sending query again using 1-rtt");
send.write_all(&request).await.context(WriteRequestSnafu)?;
send.finish().context(FinishWriteSnafu)?;
recv
};
let response = recv
.read_to_end(REQUEST_SIZE_LIMIT)
.await
Expand All @@ -249,11 +281,16 @@ pub async fn query_conn(
})
}

fn wait_for_session_ticket(connection: Connection) {
fn wait_for_session_ticket(connection: Connection<iroh::endpoint::HandshakeCompleted>) {
tokio::spawn(async move {
// todo: use a more precise API for waiting once it is available.
// See https://github.com/quinn-rs/quinn/pull/2257
tokio::time::sleep(connection.rtt() * 2).await;
let wait_time = connection
.to_info()
.selected_path()
.map(|p| p.rtt() * 2)
.unwrap_or(std::time::Duration::from_millis(100));
tokio::time::sleep(wait_time).await;
connection.close(0u32.into(), b"");
});
}
2 changes: 1 addition & 1 deletion content-discovery/iroh-content-tracker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
anyhow = { workspace = true, features = ["backtrace"] }
# needs to keep updated with the dep of iroh-blobs
bao-tree = { version = "0.15.1", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.16", features = ["tokio_fsm"], default-features = false }
bytes = "1"
derive_more = { version = "2", features = ["debug", "display", "from", "try_into"] }
dirs-next = "2"
Expand Down
14 changes: 7 additions & 7 deletions content-discovery/iroh-content-tracker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use clap::Parser;
use iroh::{endpoint::BindError, Endpoint};
use iroh::Endpoint;
use iroh_content_discovery::protocol::ALPN;
use iroh_content_tracker::{
io::{
Expand Down Expand Up @@ -46,22 +46,22 @@ async fn create_endpoint(
key: iroh::SecretKey,
ipv4_addr: Option<SocketAddrV4>,
ipv6_addr: Option<SocketAddrV6>,
) -> Result<Endpoint, BindError> {
) -> anyhow::Result<Endpoint> {
let mut builder = iroh::Endpoint::builder()
.secret_key(key.clone())
.discovery(
iroh::discovery::pkarr::dht::DhtDiscovery::builder()
.address_lookup(
iroh::address_lookup::DhtAddressLookup::builder()
.secret_key(key)
.build()?,
)
.alpns(vec![ALPN.to_vec()]);
if let Some(ipv4_addr) = ipv4_addr {
builder = builder.bind_addr_v4(ipv4_addr);
builder = builder.bind_addr(std::net::SocketAddr::V4(ipv4_addr))?;
}
if let Some(ipv6_addr) = ipv6_addr {
builder = builder.bind_addr_v6(ipv6_addr);
builder = builder.bind_addr(std::net::SocketAddr::V6(ipv6_addr))?;
}
builder.bind().await
Ok(builder.bind().await?)
}

/// Write default options to a sample config file.
Expand Down
21 changes: 11 additions & 10 deletions content-discovery/iroh-content-tracker/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
time::{Duration, Instant},
};

use anyhow::{bail, Context};
use anyhow::Context;
use bao_tree::ChunkNum;
use iroh::{endpoint::Connection, Endpoint, EndpointId};
use iroh_blobs::{
Expand Down Expand Up @@ -898,12 +898,7 @@ impl Tracker {
let connecting = incoming.accept()?;
let tracker = self.clone();
tokio::spawn(async move {
let Ok((conn, _)) = connecting.into_0rtt() else {
// this should never happen, but for now we handle it gracefully.
// hopefully the API will be changed to not return a result.
error!("error converting to 0-RTT connection");
return;
};
let conn = connecting.into_0rtt();
let Some(alpn) = conn.alpn() else {
error!("no ALPN found on connection");
return;
Expand All @@ -912,6 +907,14 @@ impl Tracker {
error!("unexpected ALPN on connection: {:?}", alpn);
return;
}
// Wait for handshake to complete to get a Connection<HandshakeCompleted>
let conn = match conn.handshake_completed().await {
Ok(conn) => conn,
Err(e) => {
error!("handshake failed: {e}");
return;
}
};
if let Err(cause) = tracker.handle_connection(conn).await {
tracing::error!("error handling connection: {}", cause);
}
Expand All @@ -923,9 +926,7 @@ impl Tracker {
/// Handle a single incoming connection on the tracker ALPN.
pub async fn handle_connection(&self, conn: Connection) -> anyhow::Result<()> {
let (mut send, mut recv) = conn.accept_bi().await?;
let Ok(remote_endpoint_id) = conn.remote_id() else {
bail!("error getting remote endpoint id");
};
let remote_endpoint_id = conn.remote_id();
trace!("remote endpoint id: {}", remote_endpoint_id);
let request = recv.read_to_end(REQUEST_SIZE_LIMIT).await?;
let request = postcard::from_bytes::<Request>(&request)?;
Expand Down
4 changes: 2 additions & 2 deletions h3-iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ http-body = { version = "1", optional = true }
http-body-util = { version = "0.1", optional = true }
hyper = { version = "1.5", optional = true }
hyper-util = { version = "0.1", optional = true }
iroh = "0.95"
iroh-tickets = "0.2"
iroh = "0.96"
iroh-tickets = "0.3"
tokio = { version = "1", features = ["io-util"], default-features = false}
tokio-util = "0.7"
tower = { version = "0.5", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion h3-iroh/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> Result<()> {

async fn handle_connection(incoming: Incoming, root: Arc<Option<PathBuf>>) -> Result<()> {
let conn = incoming.accept()?.await?;
let remote_endpoint_id = conn.remote_id()?;
let remote_endpoint_id = conn.remote_id();
let span = Span::current();
span.record(
"remote_endpoint_id",
Expand Down
Loading
Loading