From 4756648e65694eeadc0acb0933b5fe790058d069 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 29 Mar 2023 15:41:46 +0200 Subject: [PATCH] Fix missing SUBSCRIBE and refactor Depend on latest commits on https://github.com/libp2p/rust-libp2p/pull/3625 more specifically https://github.com/libp2p/rust-libp2p/pull/3625/commits/b5728952464f570c5636f5e6a4fa2448e28f3cbe resolving the missing SUBSCRIBE message previously not send by a rust-libp2p server. Additional refactorings: - Use clap to parse command line arguments. Among other things gives us nice help text and better error messages. - Depend on `libp2p-webrtc` directly as we will remove the re-export from `libp2p`. - Use `futures-timer` to send `Hello World` on a 2 second interval instead of on each `SwarmEvent`. - Redo logging on various levels (warn, info, debug). --- rust-server/Cargo.lock | 102 ++++++++++++++++++++++++++---------- rust-server/Cargo.toml | 4 ++ rust-server/src/main.rs | 111 ++++++++++++++++++++++++++++------------ 3 files changed, 155 insertions(+), 62 deletions(-) diff --git a/rust-server/Cargo.lock b/rust-server/Cargo.lock index 0e18e011..0efa71a5 100644 --- a/rust-server/Cargo.lock +++ b/rust-server/Cargo.lock @@ -278,7 +278,7 @@ checksum = "86ea188f25f0255d8f92797797c97ebf5631fa88178beb1a46fdf5622c9a00e4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.4", + "syn 2.0.10", ] [[package]] @@ -494,6 +494,42 @@ dependencies = [ "inout", ] +[[package]] +name = "clap" +version = "4.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c911b090850d79fc64fe9ea01e28e465f65e821e08813ced95bced72f7a8a9b" +dependencies = [ + "bitflags", + "clap_derive", + "clap_lex", + "is-terminal", + "once_cell", + "strsim", + "termcolor", +] + +[[package]] +name = "clap_derive" +version = "4.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a932373bab67b984c790ddf2c9ca295d8e3af3b7ef92de5a5bacdccdee4b09b" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.10", +] + +[[package]] +name = "clap_lex" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "033f6b7a4acb1f358c742aaca805c939ee73b4c6209ae4318ec7aca81c42e646" +dependencies = [ + "os_str_bytes", +] + [[package]] name = "concurrent-queue" version = "2.1.0" @@ -1434,7 +1470,7 @@ checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a" [[package]] name = "libp2p" version = "0.51.2" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "bytes", "futures", @@ -1463,7 +1499,7 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" version = "0.1.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "libp2p-core", "libp2p-identity", @@ -1474,7 +1510,7 @@ dependencies = [ [[package]] name = "libp2p-connection-limits" version = "0.1.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "libp2p-core", "libp2p-identity", @@ -1485,7 +1521,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.39.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "either", "fnv", @@ -1512,7 +1548,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.39.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "futures", "libp2p-core", @@ -1526,7 +1562,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.44.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "asynchronous-codec", "base64 0.21.0", @@ -1556,7 +1592,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.42.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "asynchronous-codec", "either", @@ -1577,7 +1613,7 @@ dependencies = [ [[package]] name = "libp2p-identity" version = "0.1.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "asn1_der", "bs58", @@ -1596,7 +1632,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.43.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "arrayvec", "asynchronous-codec", @@ -1623,7 +1659,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.43.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "data-encoding", "futures", @@ -1643,7 +1679,7 @@ dependencies = [ [[package]] name = "libp2p-metrics" version = "0.12.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "libp2p-core", "libp2p-gossipsub", @@ -1658,7 +1694,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.42.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "bytes", "curve25519-dalek 3.2.0", @@ -1680,7 +1716,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.42.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "either", "futures", @@ -1697,7 +1733,7 @@ dependencies = [ [[package]] name = "libp2p-quic" version = "0.7.0-alpha.3" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "bytes", "futures", @@ -1718,7 +1754,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.42.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "either", "fnv", @@ -1738,17 +1774,17 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" version = "0.32.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "heck", "quote", - "syn 2.0.4", + "syn 2.0.10", ] [[package]] name = "libp2p-tcp" version = "0.39.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "futures", "futures-timer", @@ -1764,7 +1800,7 @@ dependencies = [ [[package]] name = "libp2p-tls" version = "0.1.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "futures", "futures-rustls", @@ -1782,7 +1818,7 @@ dependencies = [ [[package]] name = "libp2p-webrtc" version = "0.4.0-alpha.4" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "async-trait", "asynchronous-codec", @@ -1971,7 +2007,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.12.1" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "bytes", "futures", @@ -2139,6 +2175,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "os_str_bytes" +version = "6.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267" + [[package]] name = "p256" version = "0.11.1" @@ -2467,7 +2509,7 @@ dependencies = [ [[package]] name = "quick-protobuf-codec" version = "0.1.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "asynchronous-codec", "bytes", @@ -2706,9 +2748,13 @@ name = "rust-libp2p-webrtc-server" version = "0.1.0" dependencies = [ "anyhow", + "clap", "env_logger", "futures", + "futures-timer", "libp2p", + "libp2p-webrtc", + "log", "rand 0.8.5", "tokio", "tokio-util", @@ -2781,7 +2827,7 @@ dependencies = [ [[package]] name = "rw-stream-sink" version = "0.3.0" -source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#e28af538318c146d5b19ab7098d9e0017c356ec8" +source = "git+https://github.com/vnermolaev/rust-libp2p.git?branch=deprecate/gossipsub-close-event#b5728952464f570c5636f5e6a4fa2448e28f3cbe" dependencies = [ "futures", "pin-project", @@ -2869,7 +2915,7 @@ checksum = "e801c1712f48475582b7696ac71e0ca34ebb30e09338425384269d9717c62cad" dependencies = [ "proc-macro2", "quote", - "syn 2.0.4", + "syn 2.0.10", ] [[package]] @@ -3067,9 +3113,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.4" +version = "2.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c622ae390c9302e214c31013517c2061ecb2699935882c60a9b37f82f8625ae" +checksum = "5aad1363ed6d37b84299588d62d3a7d95b5a5c2d9aad5c85609fda12afaa1f40" dependencies = [ "proc-macro2", "quote", @@ -3135,7 +3181,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.4", + "syn 2.0.10", ] [[package]] diff --git a/rust-server/Cargo.toml b/rust-server/Cargo.toml index 43143c3f..eb87bb95 100644 --- a/rust-server/Cargo.toml +++ b/rust-server/Cargo.toml @@ -7,9 +7,13 @@ edition = "2021" [dependencies] anyhow = "1.0" +clap = { version = "4.1.11", features = ["derive"] } env_logger = "0.10.0" futures = "0.3.27" +futures-timer = "3.0.2" libp2p = {git = "https://github.com/vnermolaev/rust-libp2p.git", branch = "deprecate/gossipsub-close-event", version="0.51.2", features = ["identify", "ping", "tokio", "gossipsub", "webrtc", "macros", "kad", "rsa", "ed25519"]} +libp2p-webrtc = {git = "https://github.com/vnermolaev/rust-libp2p.git", branch = "deprecate/gossipsub-close-event", version="0.4.0-alpha.4", features = ["tokio"] } +log = "0.4.17" rand = "0.8.5" tokio = { version= "1.26.0", features=["full"] } tokio-util = { version = "0.7", features = ["full"] } diff --git a/rust-server/src/main.rs b/rust-server/src/main.rs index 4a732cf5..9bb9753f 100644 --- a/rust-server/src/main.rs +++ b/rust-server/src/main.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use clap::Parser; use futures::StreamExt; use libp2p::{ core::muxing::StreamMuxerBox, @@ -6,51 +7,93 @@ use libp2p::{ multiaddr::Protocol, ping, swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}, - webrtc, Multiaddr, PeerId, Transport, + Multiaddr, PeerId, Transport, }; +use libp2p_webrtc as webrtc; +use log::{debug, error, info, warn}; use rand::thread_rng; -use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; -use std::io; -use std::time::{Duration, Instant}; +use std::time::Instant; +use std::{collections::hash_map::DefaultHasher, time::Duration}; +const TICK_INTERVAL: Duration = Duration::from_secs(5); + +#[derive(Debug, Parser)] +#[clap(name = "universal connectivity rust server")] +struct Opt { + /// Address of a remote peer to connect to. + #[clap(long)] + remote_address: Option, +} /// An example WebRTC server that will accept connections #[tokio::main] async fn main() -> Result<()> { - env_logger::init(); + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + let opt = Opt::parse(); let mut swarm = create_swarm()?; swarm.listen_on(format!("/ip4/127.0.0.1/udp/0/webrtc").parse()?)?; + if let Some(remote_address) = opt.remote_address { + swarm.dial(remote_address).unwrap(); + } + + let mut tick = futures_timer::Delay::new(TICK_INTERVAL); + let now = Instant::now(); loop { - let event = swarm.next().await.unwrap(); - eprintln!("New event: {event:?}"); - match event { - SwarmEvent::NewListenAddr { address, .. } => { - let p2p_address = address.with(Protocol::P2p((*swarm.local_peer_id()).into())); - eprintln!("p2p address: {p2p_address:?}") + match futures::future::select(swarm.next(), &mut tick).await { + futures::future::Either::Left((event, _)) => match event.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + let p2p_address = address.with(Protocol::P2p((*swarm.local_peer_id()).into())); + info!("Listen address: {p2p_address:?}") + } + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + info!("Connected to {peer_id}"); + } + SwarmEvent::OutgoingConnectionError { peer_id, error } => { + warn!("Failed to dial {peer_id:?}: {error}"); + } + SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { + warn!("Connection to {peer_id} closed: {cause:?}"); + } + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( + libp2p::gossipsub::Event::Message { + message_id: _, + propagation_source: _, + message, + }, + )) => { + info!( + "Received message from {:?}: {}", + message.source, + String::from_utf8(message.data).unwrap() + ); + } + SwarmEvent::Behaviour(BehaviourEvent::Gossipsub( + libp2p::gossipsub::Event::Subscribed { peer_id, topic }, + )) => { + info!("{peer_id} subscribed to {topic}"); + } + event => { + debug!("{event:?}"); + } }, - _ => {} + futures::future::Either::Right(_) => { + tick = futures_timer::Delay::new(TICK_INTERVAL); + + let message = format!("Hello world! Sent at: {:4}s", now.elapsed().as_secs_f64()); + + if let Err(err) = swarm.behaviour_mut().gossipsub.publish( + gossipsub::IdentTopic::new("universal-connectivity"), + message.as_bytes(), + ) { + error!("Failed to publish periodic message: {err}") + } + } } - // let peers: Vec<_> = swarm.behaviour().gossipsub.all_peers().collect(); - // eprintln!("Peers: {peers:?}"); - // let peers: Vec<_> = swarm.behaviour().gossipsub.all_mesh_peers().collect(); - // eprintln!("Mesh peers: {peers:?}"); - - - let elapsed_secs = now.elapsed().as_secs(); - // eprintln!("elapsed seconds: {}", elapsed_secs); - - let message = "Hello world! sent at : ".to_owned() + &elapsed_secs.clone().to_string() + " seconds."; - - // if elapsed_secs % 2 == 0 { - swarm - .behaviour_mut() - .gossipsub - .publish(gossipsub::IdentTopic::new("universal-connectivity"), message.as_bytes()); - // } } } @@ -66,7 +109,7 @@ struct Behaviour { fn create_swarm() -> Result> { let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); - println!("Local peer id: {local_peer_id}"); + debug!("Local peer id: {local_peer_id}"); // To content-address message, we can take the hash of message and use it as an ID. let message_id_fn = |message: &gossipsub::Message| { @@ -103,10 +146,10 @@ fn create_swarm() -> Result> { webrtc::tokio::Certificate::generate(&mut thread_rng())?, ); - let identify_config = identify::Behaviour::new(identify::Config::new( - "/ipfs/0.1.0".into(), - local_key.public().clone(), - )); + let identify_config = identify::Behaviour::new( + identify::Config::new("/ipfs/0.1.0".into(), local_key.public().clone()) + .with_initial_delay(Duration::ZERO), + ); let transport = transport .map(|(local_peer_id, conn), _| (local_peer_id, StreamMuxerBox::new(conn)))