Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.

Commit a54c545

Browse files
committed
fix(swarmapi): connect using DialPeer not -Address
while libp2p might soon gain the ability to dial peer addresses directly, this will be a stopgap until that. updated a bunch of comments and adapted to the since fixed missing events on banning the peer, which is still used to disconnect peer at will.
1 parent 98ef7d7 commit a54c545

File tree

2 files changed

+194
-54
lines changed

2 files changed

+194
-54
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1510,7 +1510,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
15101510
let _ = ret.send(peers);
15111511
}
15121512
IpfsEvent::FindPeer(peer_id, local_only, ret) => {
1513-
let swarm_addrs = self.swarm.swarm.addresses_of_peer(&peer_id);
1513+
let swarm_addrs = self.swarm.swarm.connections_to(&peer_id);
15141514
let locally_known_addrs = if !swarm_addrs.is_empty() {
15151515
swarm_addrs
15161516
} else {

src/p2p/swarm.rs

Lines changed: 193 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
55
use libp2p::swarm::protocols_handler::{
66
DummyProtocolsHandler, IntoProtocolsHandler, ProtocolsHandler,
77
};
8-
use libp2p::swarm::{self, NetworkBehaviour, PollParameters, Swarm};
9-
use std::collections::{HashMap, HashSet, VecDeque};
8+
use libp2p::swarm::{self, DialPeerCondition, NetworkBehaviour, PollParameters, Swarm};
9+
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
1010
use std::convert::{TryFrom, TryInto};
1111
use std::time::Duration;
1212

@@ -38,11 +38,23 @@ type NetworkBehaviourAction = swarm::NetworkBehaviourAction<<<<SwarmApi as Netwo
3838
#[derive(Debug, Default)]
3939
pub struct SwarmApi {
4040
events: VecDeque<NetworkBehaviourAction>,
41+
42+
// FIXME: anything related to this is probably wrong, and doesn't behave as one would expect
43+
// from the method names
4144
peers: HashSet<PeerId>,
4245
connect_registry: SubscriptionRegistry<(), String>,
4346
connections: HashMap<MultiaddrWithoutPeerId, PeerId>,
4447
roundtrip_times: HashMap<PeerId, Duration>,
4548
connected_peers: HashMap<PeerId, Vec<MultiaddrWithoutPeerId>>,
49+
50+
/// The connections which have been requested, but the swarm/network is yet to ask for
51+
/// addresses; currently filled in the order of adding, with the default size of one.
52+
pending_addresses: HashMap<PeerId, Vec<Multiaddr>>,
53+
54+
/// The connections which have been requested, and the swarm/network has requested the
55+
/// addresses of. Used to keep finishing all of the subscriptions.
56+
pending_connections: HashMap<PeerId, Vec<Multiaddr>>,
57+
4658
pub(crate) bootstrappers: HashSet<MultiaddrWithPeerId>,
4759
}
4860

@@ -77,12 +89,18 @@ impl SwarmApi {
7789
}
7890

7991
pub fn set_rtt(&mut self, peer_id: &PeerId, rtt: Duration) {
80-
// FIXME: this is for any connection
92+
// NOTE: this is for any connection
8193
self.roundtrip_times.insert(*peer_id, rtt);
8294
}
8395

8496
pub fn connect(&mut self, addr: MultiaddrWithPeerId) -> Option<SubscriptionFuture<(), String>> {
85-
if self.connections.contains_key(&addr.multiaddr) {
97+
let connected_already = self
98+
.connected_peers
99+
.get(&addr.peer_id)
100+
.map(|conns| conns.iter().any(|wo| wo == &addr.multiaddr))
101+
.unwrap_or(false);
102+
103+
if connected_already {
86104
return None;
87105
}
88106

@@ -94,37 +112,38 @@ impl SwarmApi {
94112

95113
// libp2p currently doesn't support dialing with the P2p protocol, so only consider the
96114
// "bare" Multiaddr
97-
let MultiaddrWithPeerId { multiaddr, .. } = addr;
115+
let MultiaddrWithPeerId { multiaddr, peer_id } = addr;
98116

99-
self.events.push_back(NetworkBehaviourAction::DialAddress {
100-
address: multiaddr.into(),
117+
self.events.push_back(NetworkBehaviourAction::DialPeer {
118+
peer_id,
119+
// rationale: this is sort of explicit command, perhaps the old address is no longer
120+
// valid. Always would be even better but it's bugged at the moment.
121+
condition: DialPeerCondition::NotDialing,
101122
});
102123

124+
self.pending_addresses
125+
.entry(peer_id)
126+
.or_insert_with(|| Vec::with_capacity(1))
127+
.push(multiaddr.into());
128+
103129
Some(subscription)
104130
}
105131

106132
pub fn disconnect(&mut self, addr: MultiaddrWithPeerId) -> Option<Disconnector> {
107-
trace!("disconnect {}", addr);
108-
// FIXME: closing a single specific connection would be allowed for ProtocolHandlers
109-
if let Some(peer_id) = self.connections.remove(&addr.multiaddr) {
110-
// wasted some time wondering if the peer should be removed here or not; it should. the
111-
// API is a bit ackward since we can't tolerate the Disconnector::disconnect **not**
112-
// being called.
113-
//
114-
// there are currently no events being fired from the closing of connections to banned
115-
// peer, so we need to modify the accounting even before the banning happens.
116-
self.mark_disconnected(&peer_id);
133+
trace!("request to disconnect {}", addr);
134+
if let Some(&peer_id) = self.connections.get(&addr.multiaddr) {
117135
Some(Disconnector { peer_id })
118136
} else {
119137
None
120138
}
121139
}
122140

123-
fn mark_disconnected(&mut self, peer_id: &PeerId) {
124-
for address in self.connected_peers.remove(peer_id).into_iter().flatten() {
125-
self.connections.remove(&address);
126-
}
127-
self.roundtrip_times.remove(peer_id);
141+
pub fn connections_to(&self, peer_id: &PeerId) -> Vec<Multiaddr> {
142+
self.connected_peers
143+
.get(peer_id)
144+
.cloned()
145+
.map(|addrs| addrs.into_iter().map(From::from).collect())
146+
.unwrap_or_default()
128147
}
129148
}
130149

@@ -133,17 +152,22 @@ impl NetworkBehaviour for SwarmApi {
133152
type OutEvent = void::Void;
134153

135154
fn new_handler(&mut self) -> Self::ProtocolsHandler {
136-
trace!("new_handler");
137155
Default::default()
138156
}
139157

140158
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
141-
trace!("addresses_of_peer {}", peer_id);
142-
self.connected_peers
143-
.get(peer_id)
144-
.cloned()
145-
.map(|addrs| addrs.into_iter().map(From::from).collect())
146-
.unwrap_or_default()
159+
// when libp2p starts dialing, it'll collect these from all of known addresses for the peer
160+
// from the behaviour and dial them all through, ending with calls to inject_connected or
161+
// inject_addr_reach_failure.
162+
let addresses = self.pending_addresses.remove(peer_id).unwrap_or_default();
163+
164+
// store the "given out" addresses as we have created the subscriptions for them
165+
self.pending_connections
166+
.entry(*peer_id)
167+
.or_default()
168+
.extend(addresses.iter().cloned());
169+
170+
addresses
147171
}
148172

149173
fn inject_connection_established(
@@ -153,28 +177,82 @@ impl NetworkBehaviour for SwarmApi {
153177
cp: &ConnectedPoint,
154178
) {
155179
// TODO: could be that the connection is not yet fully established at this point
156-
trace!("inject_connected {} {:?}", peer_id, cp);
180+
trace!("inject_connection_established {} {:?}", peer_id, cp);
157181
let addr: MultiaddrWithoutPeerId = connection_point_addr(cp).to_owned().try_into().unwrap();
158182

159183
self.peers.insert(*peer_id);
160184
let connections = self.connected_peers.entry(*peer_id).or_default();
161185
connections.push(addr.clone());
162186

163-
self.connections.insert(addr.clone(), *peer_id);
187+
let prev = self.connections.insert(addr.clone(), *peer_id);
164188

165-
if let ConnectedPoint::Dialer { .. } = cp {
166-
let addr = MultiaddrWithPeerId {
167-
multiaddr: addr,
168-
peer_id: *peer_id,
169-
};
189+
if let Some(prev) = prev {
190+
error!(
191+
"tracked connection was replaced from {} => {}: {}",
192+
prev, peer_id, addr
193+
);
194+
}
170195

171-
self.connect_registry
172-
.finish_subscription(addr.into(), Ok(()));
196+
if let ConnectedPoint::Dialer { address } = cp {
197+
// we dialed to the `address`
198+
match self.pending_connections.entry(*peer_id) {
199+
Entry::Occupied(mut oe) => {
200+
let addresses = oe.get_mut();
201+
let just_connected = addresses.iter().position(|x| x == address);
202+
if let Some(just_connected) = just_connected {
203+
addresses.swap_remove(just_connected);
204+
if addresses.is_empty() {
205+
oe.remove();
206+
}
207+
208+
let addr = MultiaddrWithoutPeerId::try_from(address.clone())
209+
.expect("dialed address did not contain peerid in libp2p 0.34")
210+
.with(*peer_id);
211+
212+
self.connect_registry
213+
.finish_subscription(addr.into(), Ok(()));
214+
}
215+
}
216+
Entry::Vacant(_) => {
217+
// we not connecting to this peer through this api, must be libp2p_kad or
218+
// something else.
219+
}
220+
}
173221
}
174222
}
175223

176-
fn inject_connected(&mut self, _peer_id: &PeerId) {
224+
fn inject_connected(&mut self, peer_id: &PeerId) {
177225
// we have at least one fully open connection and handler is running
226+
//
227+
// just finish all of the subscriptions that remain.
228+
trace!("inject connected {}", peer_id);
229+
230+
let all_subs = self
231+
.pending_addresses
232+
.remove(peer_id)
233+
.unwrap_or_default()
234+
.into_iter()
235+
.chain(
236+
self.pending_connections
237+
.remove(peer_id)
238+
.unwrap_or_default()
239+
.into_iter(),
240+
);
241+
242+
for addr in all_subs {
243+
let addr = MultiaddrWithoutPeerId::try_from(addr)
244+
.expect("peerid has been stripped earlier")
245+
.with(*peer_id);
246+
247+
// fail the other than already connected subscriptions in
248+
// inject_connection_established. while the whole swarmapi is quite unclear on the
249+
// actual use cases, assume that connecting one is good enough for all outstanding
250+
// connection requests.
251+
self.connect_registry.finish_subscription(
252+
addr.into(),
253+
Err("finished connecting to another address".into()),
254+
);
255+
}
178256
}
179257

180258
fn inject_connection_closed(
@@ -202,7 +280,14 @@ impl NetworkBehaviour for SwarmApi {
202280

203281
Entry::Vacant(_) => {}
204282
}
205-
self.connections.remove(&closed_addr);
283+
284+
let removed = self.connections.remove(&closed_addr);
285+
286+
debug_assert!(
287+
removed.is_some(),
288+
"connection was not tracked but it should had been: {}",
289+
closed_addr
290+
);
206291

207292
if let ConnectedPoint::Dialer { .. } = cp {
208293
let addr = MultiaddrWithPeerId::from((closed_addr, peer_id.to_owned()));
@@ -213,32 +298,87 @@ impl NetworkBehaviour for SwarmApi {
213298
}
214299

215300
fn inject_disconnected(&mut self, peer_id: &PeerId) {
216-
// in rust-libp2p 0.19 this at least will not be invoked for a peer we boot by banning it.
217301
trace!("inject_disconnected: {}", peer_id);
218-
self.mark_disconnected(peer_id);
302+
assert!(!self.connected_peers.contains_key(peer_id));
303+
self.roundtrip_times.remove(peer_id);
304+
305+
let failed = self
306+
.pending_addresses
307+
.remove(peer_id)
308+
.unwrap_or_default()
309+
.into_iter()
310+
.chain(
311+
self.pending_connections
312+
.remove(peer_id)
313+
.unwrap_or_default()
314+
.into_iter(),
315+
);
316+
317+
for addr in failed {
318+
let addr = MultiaddrWithoutPeerId::try_from(addr)
319+
.expect("peerid has been stripped earlier")
320+
.with(*peer_id);
321+
322+
self.connect_registry
323+
.finish_subscription(addr.into(), Err("disconnected".into()));
324+
}
219325
}
220326

221327
fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}
222328

329+
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
330+
trace!("inject_dial_failure: {}", peer_id);
331+
if self.pending_addresses.contains_key(peer_id) {
332+
// it is possible that these addresses have not been tried yet; they will be asked
333+
// for soon.
334+
self.events
335+
.push_back(swarm::NetworkBehaviourAction::DialPeer {
336+
peer_id: *peer_id,
337+
condition: DialPeerCondition::NotDialing,
338+
});
339+
}
340+
341+
// this should not be executed once, but probably will be in case unsupported addresses
342+
// happen
343+
for failed in self.pending_connections.remove(peer_id).unwrap_or_default() {
344+
let addr = MultiaddrWithoutPeerId::try_from(failed)
345+
.expect("peerid has been stripped earlier")
346+
.with(*peer_id);
347+
348+
self.connect_registry
349+
.finish_subscription(addr.into(), Err("addresses exhausted".into()));
350+
}
351+
}
352+
223353
fn inject_addr_reach_failure(
224354
&mut self,
225355
peer_id: Option<&PeerId>,
226356
addr: &Multiaddr,
227357
error: &dyn std::error::Error,
228358
) {
229359
trace!("inject_addr_reach_failure {} {}", addr, error);
360+
230361
if let Some(peer_id) = peer_id {
231-
let addr: MultiaddrWithPeerId = if let Ok(addr) = addr.to_owned().try_into() {
232-
addr
233-
} else {
234-
(
235-
MultiaddrWithoutPeerId::try_from(addr.to_owned()).unwrap(),
236-
peer_id.to_owned(),
237-
)
238-
.into()
239-
};
240-
self.connect_registry
241-
.finish_subscription(addr.into(), Err(error.to_string()));
362+
match self.pending_connections.entry(*peer_id) {
363+
Entry::Occupied(mut oe) => {
364+
let addresses = oe.get_mut();
365+
let pos = addresses.iter().position(|a| a == addr);
366+
367+
if let Some(pos) = pos {
368+
addresses.swap_remove(pos);
369+
let addr = MultiaddrWithoutPeerId::try_from(addr.clone())
370+
.expect("multiaddr didn't contain peer id in libp2p 0.34")
371+
.with(*peer_id);
372+
self.connect_registry
373+
.finish_subscription(addr.into(), Err(error.to_string()));
374+
}
375+
376+
if addresses.is_empty() {
377+
oe.remove();
378+
}
379+
}
380+
Entry::Vacant(_) => {}
381+
}
242382
}
243383
}
244384

0 commit comments

Comments
 (0)