-
Hi there, I'm having a few weird problems with my P2P chat implementation built on libp2p and would like some help.
Problems
How to run the code
main.rs
use futures::stream::StreamExt;
use libp2p::{gossipsub, mdns, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use std::io::Write;
use tokio::{io, io::AsyncBufReadExt, select};
use tracing_subscriber::EnvFilter;
use std::collections::HashSet;
/// Custom network behaviour that combines Gossipsub and mDNS.
///
/// The event loop in `main` matches on `MyBehaviourEvent`, which is not
/// declared by hand in this file; it comes from the `NetworkBehaviour` derive.
#[derive(NetworkBehaviour)]
struct MyBehaviour {
    /// Publish/subscribe behaviour used to send and receive chat messages.
    gossipsub: gossipsub::Behaviour,
    /// mDNS behaviour (tokio flavour) that reports discovered and expired peers.
    mdns: mdns::tokio::Behaviour,
}
/// Runs the peer-to-peer chat client.
///
/// Builds a libp2p swarm with TCP and QUIC transports and a combined
/// Gossipsub + mDNS behaviour, prompts for a username and a topic, subscribes
/// to that topic, and then loops forever: each line read from stdin is
/// published to the topic, and each received Gossipsub message is printed to
/// stdout.
///
/// # Errors
///
/// Returns an error if the swarm cannot be built, if a prompt cannot be
/// written or read, if subscribing to the topic fails, or if a listen address
/// cannot be parsed or bound. Publish failures inside the loop are only
/// printed; they do not end the program.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // The result of `try_init` is deliberately discarded.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .try_init();

    let mut swarm = libp2p::SwarmBuilder::with_new_identity()
        .with_tokio()
        .with_tcp(
            tcp::Config::default(),
            noise::Config::new,
            yamux::Config::default,
        )?
        .with_quic()
        .with_behaviour(|key| {
            // To content-address messages, take the hash of the message data
            // and use it as the message ID.
            let message_id_fn = |message: &gossipsub::Message| {
                let mut s = DefaultHasher::new();
                message.data.hash(&mut s);
                gossipsub::MessageId::from(s.finish().to_string())
            };

            // Set a custom gossipsub configuration.
            let gossipsub_config = gossipsub::ConfigBuilder::default()
                // A long heartbeat keeps the debug log from being cluttered.
                .heartbeat_interval(Duration::from_secs(10))
                // Strict validation enforces message signing.
                .validation_mode(gossipsub::ValidationMode::Strict)
                // Content-addressed IDs: no two messages with the same
                // content will be propagated.
                .message_id_fn(message_id_fn)
                .build()
                // Temporary hack because `build` does not return a proper
                // `std::error::Error`.
                .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?;

            // Build the gossipsub network behaviour.
            let gossipsub = gossipsub::Behaviour::new(
                gossipsub::MessageAuthenticity::Signed(key.clone()),
                gossipsub_config,
            )?;

            let mdns =
                mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?;

            Ok(MyBehaviour { gossipsub, mdns })
        })?
        .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
        .build();

    // Prompt the user to enter a username. These prompts use blocking std
    // I/O; the swarm is not being polled yet at this point.
    print!("Please enter a username: ");
    std::io::stdout().flush()?;
    let mut username = String::new();
    std::io::stdin().read_line(&mut username)?;
    // Trim once so the trailing newline never ends up in output.
    let username = username.trim().to_owned();

    // Prompt the user to enter a Gossipsub topic to join.
    print!("Please enter a topic to join: ");
    std::io::stdout().flush()?;
    let mut topic = String::new();
    std::io::stdin().read_line(&mut topic)?;

    // Create the Gossipsub topic and subscribe to it.
    let topic = gossipsub::IdentTopic::new(topic.trim());
    swarm.behaviour_mut().gossipsub.subscribe(&topic)?;

    println!("\x1b[32mSuccessfully joined topic {} as user {}\x1b[0m", topic, username);
    println!("---------------------------");

    // Read full lines from stdin asynchronously from here on.
    let mut stdin = io::BufReader::new(io::stdin()).lines();

    // Listen on all interfaces and whatever port the OS assigns.
    swarm.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
    swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

    // Peers currently known through mDNS; used to announce each peer once.
    let mut discovered_peers = HashSet::new();

    // Kick it off.
    loop {
        select! {
            Ok(Some(line)) = stdin.next_line() => {
                let message = format!("\x1b[36m{}:\x1b[0m {}", username, line.trim());
                if let Err(e) = swarm
                    .behaviour_mut()
                    .gossipsub
                    .publish(topic.clone(), message.as_bytes())
                {
                    println!("\x1b[31mPublish error: {e:?}\x1b[0m");
                }
            }
            event = swarm.select_next_some() => match event {
                SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
                    for (peer_id, _multiaddr) in list {
                        // Only print the message if the peer is new.
                        if discovered_peers.insert(peer_id) {
                            println!(
                                "\x1b[32mDiscovered a new peer: {}\x1b[0m",
                                short_peer_id(&peer_id)
                            );
                        }
                        swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
                    }
                },
                SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
                    for (peer_id, _multiaddr) in list {
                        println!(
                            "\x1b[33mmDNS discover peer has expired: {}\x1b[0m",
                            short_peer_id(&peer_id)
                        );
                        swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
                        // Forget the peer so that a later re-discovery is
                        // announced again instead of being silently ignored.
                        discovered_peers.remove(&peer_id);
                    }
                },
                SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
                    propagation_source: _peer_id,
                    message_id: _id,
                    message,
                })) => println!("{}", String::from_utf8_lossy(&message.data)),
                _ => {}
            }
        }
    }
}

/// Returns the first 14 characters of `peer_id`'s textual form, for compact
/// log output.
fn short_peer_id(peer_id: &impl std::fmt::Display) -> String {
    peer_id.to_string().chars().take(14).collect()
}
Cargo.toml
[package]
name = "chat-app"
version = "0.1.0"
edition = "2021"
publish = false
license = "MIT"
[package.metadata.release]
release = false
[dependencies]
tokio = { version = "1.35", features = ["full"] }
async-trait = "0.1"
futures = "0.3.29"
libp2p = { version = "0.53.2", features = [ "tokio", "gossipsub", "mdns", "noise", "macros", "tcp", "yamux", "quic"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
mDNS only works if your machines (and local network) support it. Sometimes a firewall will block it, in which case it has to be allowed on both ends. As for why disconnecting and reconnecting over the internet doesn't work: the code does not attempt to dial the peer again after a disconnection (assuming this is over WAN; on an mDNS discovery event, |
Beta Was this translation helpful? Give feedback.
mDNS only works if your machines (and local network) support it. Sometimes a firewall will block it, in which case it has to be allowed on both ends.
As for why disconnecting and reconnecting over the internet doesn't work: the code does not attempt to dial the peer again after a disconnection (assuming this is over WAN; on an mDNS discovery event,
`Behaviour::add_explicit_peer`
is called, which internally dials the peer if the peer isn't connected). The best solution would be to retry the connection: listen for `SwarmEvent::ConnectionClosed`
, add that peer to a list of peers, and at some interval attempt to reconnect to those peers; if that fails, just wait for th…