From a1bc6111241ca35e2d9bf1d5fc2d01d9a61f40f1 Mon Sep 17 00:00:00 2001 From: Jacob Date: Mon, 28 Feb 2022 11:49:24 +0000 Subject: [PATCH 1/2] Expose Floodsub target_peer list. The IPFS facade does not have access to the functions which add and remove nodes to the floodsub target_peer list. As such, messages are never propagated to connected peers. --- src/lib.rs | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index d9991263b..975bee5c2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -268,6 +268,8 @@ enum IpfsEvent { Disconnect(MultiaddrWithPeerId, Channel<()>), /// Request background task to return the listened and external addresses GetAddresses(OneshotSender>), + PubsubAddPeer(PeerId, OneshotSender<()>), + PubsubRemovePeer(PeerId, OneshotSender<()>), PubsubSubscribe(String, OneshotSender>), PubsubUnsubscribe(String, OneshotSender), PubsubPublish(String, Vec, OneshotSender<()>), @@ -755,6 +757,42 @@ impl Ipfs { .await } + /// Add a peer to list of nodes to propagate messages to. + /// + /// Unless a peer is added to the list in this way it will not receive any pubsub messages from this node. + pub async fn pubsub_add_peer(&self, peer_id: PeerId) -> Result<(), Error> { + async move { + let (tx, rx) = oneshot_channel::<()>(); + + self.to_task + .clone() + .send(IpfsEvent::PubsubAddPeer(peer_id, tx)) + .await?; + + Ok(rx.await?) + } + .instrument(self.span.clone()) + .await + } + + /// Remove a peer from the list of nodes that messages are propagated to. + /// + /// This will not stop messages being sent to the specified peers for subscribed topics which have already been communicated. + pub async fn pubsub_remove_peer(&self, peer_id: PeerId) -> Result<(), Error> { + async move { + let (tx, rx) = oneshot_channel::<()>(); + + self.to_task + .clone() + .send(IpfsEvent::PubsubRemovePeer(peer_id, tx)) + .await?; + + Ok(rx.await?) + } + .instrument(self.span.clone()) + .await + } + /// Subscribes to a given topic. Can be done at most once without unsubscribing in the between. /// The subscription can be unsubscribed by dropping the stream or calling /// [`Ipfs::pubsub_unsubscribe`]. @@ -1431,6 +1469,20 @@ impl Future for IpfsFuture { // ignore error, perhaps caller went away already let _ = ret.send(addresses); } + IpfsEvent::PubsubAddPeer(peer_id, ret) => { + self.swarm + .behaviour_mut() + .pubsub() + .add_node_to_partial_view(peer_id); + let _ = ret.send(()); + } + IpfsEvent::PubsubRemovePeer(peer_id, ret) => { + self.swarm + .behaviour_mut() + .pubsub() + .remove_node_from_partial_view(&peer_id); + let _ = ret.send(()); + } IpfsEvent::PubsubSubscribe(topic, ret) => { let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribe(topic)); } @@ -1780,8 +1832,11 @@ mod node { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; use crate::make_ipld; + use futures::{stream::poll_immediate, StreamExt}; use multihash::Sha2_256; #[tokio::test] @@ -1819,4 +1874,50 @@ mod tests { ipfs.remove_pin(&cid, false).await.unwrap(); assert!(!ipfs.is_pinned(&cid).await.unwrap()); } + + #[tokio::test] + async fn test_pubsub_send_and_receive() { + let alice = Node::new("alice").await; + let alice_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10001".parse().unwrap(); + alice.add_listening_address(alice_addr).await.unwrap(); + let bob = Node::new("bob").await; + let bob_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10002".parse().unwrap(); + bob.add_listening_address(bob_addr.clone()).await.unwrap(); + + let topic = String::from("test_topic"); + alice + .connect(bob_addr.with(Protocol::P2p(bob.id.into()))) + .await + .expect("alice failed to connect to bob"); + let _alice_messages = alice.pubsub_subscribe(topic.clone()).await.unwrap(); + let mut bob_messages = poll_immediate(bob.pubsub_subscribe(topic.clone()).await.unwrap()); + + let data = vec![1, 2, 3]; + + alice + .pubsub_publish(topic.clone(), data.clone()) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + assert_eq!(bob_messages.next().await, Some(Poll::Pending)); + + bob.pubsub_add_peer(alice.id).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + assert_eq!(bob_messages.next().await, Some(Poll::Pending)); + + alice + .pubsub_publish(topic.clone(), data.clone()) + .await + .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + + let received_data = bob_messages + .next() + .await + .expect("unexpected end of stream") + .map(|msg| msg.data.clone()); + assert_eq!(received_data, Poll::Ready(data.clone())); + } } From 39c4f1647426913ce70cc9fb4fe51e4ae198876e Mon Sep 17 00:00:00 2001 From: Jacob Date: Mon, 28 Feb 2022 17:53:28 +0000 Subject: [PATCH 2/2] Improve docs wording and simplify test. --- src/lib.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 975bee5c2..44c2a7aa5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -759,7 +759,8 @@ impl Ipfs { /// Add a peer to list of nodes to propagate messages to. /// - /// Unless a peer is added to the list in this way it will not receive any pubsub messages from this node. + /// A peer will not receive any pubsub messages from this node until it is added using this function, + /// unless it has added this node in the same way. pub async fn pubsub_add_peer(&self, peer_id: PeerId) -> Result<(), Error> { async move { let (tx, rx) = oneshot_channel::<()>(); @@ -777,7 +778,7 @@ impl Ipfs { /// Remove a peer from the list of nodes that messages are propagated to. /// - /// This will not stop messages being sent to the specified peers for subscribed topics which have already been communicated. + /// Calling this function will not stop messages being sent to the specified peers for subscribed topics which have already been communicated. pub async fn pubsub_remove_peer(&self, peer_id: PeerId) -> Result<(), Error> { async move { let (tx, rx) = oneshot_channel::<()>(); @@ -1878,11 +1879,8 @@ mod tests { #[tokio::test] async fn test_pubsub_send_and_receive() { let alice = Node::new("alice").await; - let alice_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10001".parse().unwrap(); - alice.add_listening_address(alice_addr).await.unwrap(); let bob = Node::new("bob").await; - let bob_addr: Multiaddr = "/ip4/127.0.0.1/tcp/10002".parse().unwrap(); - bob.add_listening_address(bob_addr.clone()).await.unwrap(); + let bob_addr = bob.addrs_local().await.unwrap()[0].clone(); let topic = String::from("test_topic"); alice