Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Commit 38b3974

Browse files
committed
feat: introduce the means to connect by PeerId
Signed-off-by: ljedrz <ljedrz@gmail.com>
1 parent e929cdf commit 38b3974

File tree

6 files changed

+88
-37
lines changed

6 files changed

+88
-37
lines changed

src/lib.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,8 @@ use self::dag::IpldDag;
4747
pub use self::error::Error;
4848
use self::ipns::Ipns;
4949
pub use self::p2p::pubsub::{PubsubMessage, SubscriptionStream};
50-
pub use self::p2p::Connection;
51-
pub use self::p2p::SwarmTypes;
5250
use self::p2p::{create_swarm, SwarmOptions, TSwarm};
51+
pub use self::p2p::{Connection, ConnectionTarget, SwarmTypes};
5352
pub use self::path::IpfsPath;
5453
pub use self::repo::RepoTypes;
5554
use self::repo::{create_repo, Repo, RepoEvent, RepoOptions};
@@ -244,7 +243,10 @@ type FutureSubscription<T, E> = SubscriptionFuture<Result<T, E>>;
244243
#[derive(Debug)]
245244
enum IpfsEvent {
246245
/// Connect
247-
Connect(Multiaddr, OneshotSender<FutureSubscription<(), String>>),
246+
Connect(
247+
ConnectionTarget,
248+
OneshotSender<FutureSubscription<(), String>>,
249+
),
248250
/// Addresses
249251
Addresses(Channel<Vec<(PeerId, Vec<Multiaddr>)>>),
250252
/// Local addresses
@@ -467,13 +469,13 @@ impl<Types: IpfsTypes> Ipfs<Types> {
467469
self.ipns().cancel(key).instrument(self.span.clone()).await
468470
}
469471

470-
pub async fn connect(&self, addr: Multiaddr) -> Result<(), Error> {
472+
pub async fn connect<T: Into<ConnectionTarget>>(&self, target: T) -> Result<(), Error> {
471473
self.span
472474
.in_scope(|| async {
473475
let (tx, rx) = oneshot_channel();
474476
self.to_task
475477
.clone()
476-
.send(IpfsEvent::Connect(addr, tx))
478+
.send(IpfsEvent::Connect(target.into(), tx))
477479
.await?;
478480
let subscription = rx.await?;
479481
subscription.await?.map_err(|e| format_err!("{}", e))
@@ -892,8 +894,8 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
892894
};
893895

894896
match inner {
895-
IpfsEvent::Connect(addr, ret) => {
896-
ret.send(self.swarm.connect(addr)).ok();
897+
IpfsEvent::Connect(target, ret) => {
898+
ret.send(self.swarm.connect(target)).ok();
897899
}
898900
IpfsEvent::Addresses(ret) => {
899901
let addrs = self.swarm.addrs();

src/p2p/behaviour.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::pubsub::Pubsub;
2-
use super::swarm::{Connection, Disconnector, SwarmApi};
2+
use super::swarm::{Connection, ConnectionTarget, Disconnector, SwarmApi};
33
use crate::p2p::{SwarmOptions, SwarmTypes};
44
use crate::repo::BlockPut;
55
use crate::subscription::{SubscriptionFuture, SubscriptionRegistry};
@@ -411,8 +411,8 @@ impl<Types: IpfsTypes> Behaviour<Types> {
411411
self.swarm.connections()
412412
}
413413

414-
pub fn connect(&mut self, addr: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
415-
self.swarm.connect(addr)
414+
pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<Result<(), String>> {
415+
self.swarm.connect(target)
416416
}
417417

418418
pub fn disconnect(&mut self, addr: Multiaddr) -> Option<Disconnector> {

src/p2p/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub(crate) mod pubsub;
1313
mod swarm;
1414
mod transport;
1515

16-
pub use swarm::Connection;
16+
pub use swarm::{Connection, ConnectionTarget};
1717

1818
pub type TSwarm<T> = Swarm<behaviour::Behaviour<T>>;
1919

src/p2p/swarm.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
44
use libp2p::swarm::protocols_handler::{
55
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
66
};
7-
use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm};
7+
use libp2p::swarm::{self, DialPeerCondition, NetworkBehaviour, PollParameters, Swarm};
88
use std::collections::{HashMap, HashSet, VecDeque};
99
use std::time::Duration;
1010

@@ -19,6 +19,24 @@ pub struct Connection {
1919
pub rtt: Option<Duration>,
2020
}
2121

22+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23+
pub enum ConnectionTarget {
24+
Addr(Multiaddr),
25+
PeerId(PeerId),
26+
}
27+
28+
impl From<Multiaddr> for ConnectionTarget {
29+
fn from(addr: Multiaddr) -> Self {
30+
Self::Addr(addr)
31+
}
32+
}
33+
34+
impl From<PeerId> for ConnectionTarget {
35+
fn from(peer_id: PeerId) -> Self {
36+
Self::PeerId(peer_id)
37+
}
38+
}
39+
2240
/// Disconnected will use banning to disconnect a node. Disconnecting a single peer connection is
2341
/// not supported at the moment.
2442
pub struct Disconnector {
@@ -87,13 +105,21 @@ impl SwarmApi {
87105
self.roundtrip_times.insert(peer_id.clone(), rtt);
88106
}
89107

90-
pub fn connect(&mut self, address: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
91-
trace!("starting to connect to {}", address);
92-
self.events.push_back(NetworkBehaviourAction::DialAddress {
93-
address: address.clone(),
108+
pub fn connect(&mut self, target: ConnectionTarget) -> SubscriptionFuture<Result<(), String>> {
109+
trace!("Connecting to {:?}", target);
110+
111+
self.events.push_back(match target {
112+
ConnectionTarget::Addr(ref addr) => NetworkBehaviourAction::DialAddress {
113+
address: addr.clone(),
114+
},
115+
ConnectionTarget::PeerId(ref id) => NetworkBehaviourAction::DialPeer {
116+
peer_id: id.clone(),
117+
condition: DialPeerCondition::Disconnected,
118+
},
94119
});
120+
95121
self.connect_registry
96-
.create_subscription(address.into(), None)
122+
.create_subscription(target.into(), None)
97123
}
98124

99125
pub fn disconnect(&mut self, address: Multiaddr) -> Option<Disconnector> {
@@ -157,6 +183,8 @@ impl NetworkBehaviour for SwarmApi {
157183
self.connections.insert(addr.clone(), peer_id.clone());
158184
self.connect_registry
159185
.finish_subscription(addr.clone().into(), Ok(()));
186+
self.connect_registry
187+
.finish_subscription(peer_id.clone().into(), Ok(()));
160188
}
161189

162190
fn inject_connected(&mut self, _peer_id: &PeerId) {
@@ -262,7 +290,7 @@ mod tests {
262290
};
263291

264292
let peer2 = async move {
265-
let future = swarm2.connect(rx.next().await.unwrap());
293+
let future = swarm2.connect(rx.next().await.unwrap().into());
266294

267295
let poll_swarm = async move {
268296
loop {

src/subscription.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! that contains them. `SubscriptionFuture` is the `Future` bound to pending `Subscription`s and
44
//! sharing the same unique numeric identifier, the `SubscriptionId`.
55
6-
use crate::RepoEvent;
6+
use crate::{p2p::ConnectionTarget, RepoEvent};
77
use async_std::future::Future;
88
use async_std::task::{self, Context, Poll, Waker};
99
use core::fmt::Debug;
@@ -12,7 +12,7 @@ use core::pin::Pin;
1212
use futures::channel::mpsc::Sender;
1313
use futures::lock::Mutex;
1414
use libipld::Cid;
15-
use libp2p::{kad::QueryId, Multiaddr};
15+
use libp2p::{kad::QueryId, Multiaddr, PeerId};
1616
use std::collections::HashMap;
1717
use std::convert::TryFrom;
1818
use std::fmt;
@@ -29,8 +29,8 @@ static GLOBAL_REQ_COUNT: AtomicU64 = AtomicU64::new(0);
2929
/// The type of a request for subscription.
3030
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
3131
pub enum RequestKind {
32-
/// A request to connect to the given `Multiaddr`.
33-
Connect(Multiaddr),
32+
/// A request to connect to the given `Multiaddr` or `PeerId`.
33+
Connect(ConnectionTarget),
3434
/// A request to obtain a `Block` with a specific `Cid`.
3535
GetBlock(Cid),
3636
/// A DHT request to Kademlia.
@@ -41,7 +41,19 @@ pub enum RequestKind {
4141

4242
impl From<Multiaddr> for RequestKind {
4343
fn from(addr: Multiaddr) -> Self {
44-
Self::Connect(addr)
44+
Self::Connect(ConnectionTarget::Addr(addr))
45+
}
46+
}
47+
48+
impl From<PeerId> for RequestKind {
49+
fn from(peer_id: PeerId) -> Self {
50+
Self::Connect(ConnectionTarget::PeerId(peer_id))
51+
}
52+
}
53+
54+
impl From<ConnectionTarget> for RequestKind {
55+
fn from(target: ConnectionTarget) -> Self {
56+
Self::Connect(target)
4557
}
4658
}
4759

@@ -60,7 +72,7 @@ impl From<QueryId> for RequestKind {
6072
impl fmt::Display for RequestKind {
6173
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
6274
match self {
63-
Self::Connect(addr) => write!(fmt, "Connect to {}", addr),
75+
Self::Connect(tgt) => write!(fmt, "Connect to {:?}", tgt),
6476
Self::GetBlock(cid) => write!(fmt, "Obtain block {}", cid),
6577
Self::KadQuery(id) => write!(fmt, "Kad request {:?}", id),
6678
#[cfg(test)]

tests/connect_two.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,40 @@
11
use ipfs::Node;
2+
use libp2p::PeerId;
23

34
// Make sure two instances of ipfs can be connected.
45
#[async_std::test]
5-
async fn connect_two_nodes() {
6+
async fn connect_two_nodes_by_addr() {
67
let node_a = Node::new("a").await;
78
let node_b = Node::new("b").await;
89

910
let (_, b_addrs) = node_b.identity().await.unwrap();
1011
assert!(!b_addrs.is_empty());
1112

12-
let mut connected = None;
13+
let mut connected = false;
1314

1415
for addr in b_addrs {
15-
println!("trying {}", addr);
16-
match node_a.connect(addr.clone()).await {
17-
Ok(_) => {
18-
connected = Some(addr);
19-
break;
20-
}
21-
Err(e) => {
22-
println!("Failed connecting to {}: {}", addr, e);
23-
}
16+
if node_a.connect(addr.clone()).await.is_ok() {
17+
connected = true;
18+
break;
2419
}
2520
}
2621

27-
let connected = connected.expect("Failed to connect to anything");
28-
println!("connected to {}", connected);
22+
assert!(connected);
23+
}
24+
25+
// Make sure two instances of ipfs can be connected by `PeerId`.
26+
#[async_std::test]
27+
async fn connect_two_nodes_by_peer_id() {
28+
let node_a = Node::new("a").await;
29+
let node_b = Node::new("b").await;
30+
31+
let (b_key, mut b_addrs) = node_b.identity().await.unwrap();
32+
let b_id = PeerId::from_public_key(b_key);
33+
34+
while let Some(addr) = b_addrs.pop() {
35+
node_a.add_peer(b_id.clone(), addr).await.unwrap();
36+
}
37+
node_a.connect(b_id).await.unwrap();
2938
}
3039

3140
// More complicated one to the above; first node will have two listening addresses and the second

0 commit comments

Comments
 (0)