Skip to content

Commit 8650b97

Browse files
committed
protocols/autonat/behaviour: smaller fixes, docs
1 parent 1cc2fe1 commit 8650b97

File tree

1 file changed

+58
-51
lines changed

1 file changed

+58
-51
lines changed

protocols/autonat/src/behaviour.rs

Lines changed: 58 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use std::{
4040

4141
type FiniteAddrScore = u32;
4242

43+
/// Config for the [`Behaviour`].
4344
pub struct Config {
45+
// Timeout for requests.
4446
timeout: Duration,
4547
}
4648

@@ -53,18 +55,26 @@ impl Default for Config {
5355
}
5456

5557
impl Config {
58+
/// Set the timeout for dial-requests.
5659
pub fn with_timeout(mut self, timeout: Duration) -> Self {
5760
self.timeout = timeout;
5861
self
5962
}
6063
}
6164

65+
/// Network Behaviour for AutoNAT.
6266
pub struct Behaviour {
67+
// Inner protocol for sending requests and receiving the response.
6368
inner: RequestResponse<AutoNatCodec>,
64-
local_addresses: HashMap<Multiaddr, FiniteAddrScore>,
65-
pending_inbound: HashMap<PeerId, ResponseChannel<DialResponse>>,
66-
pending_outbound: HashSet<PeerId>,
67-
send_request: VecDeque<PeerId>,
69+
// Local listening addresses with a score indicating their reachability.
70+
// The score increases each time a remote peer successfully dials this address.
71+
addresses: HashMap<Multiaddr, FiniteAddrScore>,
72+
// Ongoing inbound requests, where no response has been sent back to the remote yet.
73+
ongoing_inbound: HashMap<PeerId, ResponseChannel<DialResponse>>,
74+
// Ongoing outbound dial-requests, where no response has been received from the remote yet.
75+
ongoing_outbound: HashSet<PeerId>,
76+
// Recently connected peers to which we want to send a dial-request.
77+
pending_requests: VecDeque<PeerId>,
6878
}
6979

7080
impl Default for Behaviour {
@@ -81,18 +91,27 @@ impl Behaviour {
8191
let inner = RequestResponse::new(AutoNatCodec, protocols, cfg);
8292
Self {
8393
inner,
84-
local_addresses: HashMap::default(),
85-
pending_inbound: HashMap::default(),
86-
pending_outbound: HashSet::default(),
87-
send_request: VecDeque::default(),
94+
addresses: HashMap::default(),
95+
ongoing_inbound: HashMap::default(),
96+
ongoing_outbound: HashSet::default(),
97+
pending_requests: VecDeque::default(),
8898
}
8999
}
90100

101+
/// Add a new address to the address list that is send to remote peers in a dial-request.
91102
pub fn add_local_address(&mut self, address: Multiaddr) {
92-
if self.local_addresses.get(&address).is_none() {
93-
self.local_addresses.insert(address, 1);
103+
if self.addresses.get(&address).is_none() {
104+
self.addresses.insert(address, 1);
94105
}
95106
}
107+
108+
/// Get the list of local addresses with their current score.
109+
///
110+
/// The score of an address increases each time a remote peer successfully dialed us via this address.
111+
/// Therefore higher scores indicate a higher reachability.
112+
pub fn address_list(&self) -> impl Iterator<Item = (&Multiaddr, &FiniteAddrScore)> {
113+
self.addresses.iter()
114+
}
96115
}
97116

98117
impl NetworkBehaviour for Behaviour {
@@ -112,7 +131,8 @@ impl NetworkBehaviour for Behaviour {
112131
}
113132

114133
fn inject_disconnected(&mut self, peer: &PeerId) {
115-
self.inner.inject_disconnected(peer)
134+
self.inner.inject_disconnected(peer);
135+
self.ongoing_inbound.remove(peer);
116136
}
117137

118138
fn inject_connection_established(
@@ -123,11 +143,16 @@ impl NetworkBehaviour for Behaviour {
123143
) {
124144
self.inner
125145
.inject_connection_established(peer, conn, endpoint);
126-
if !self.pending_outbound.contains(peer) {
127-
self.send_request.push_back(*peer);
146+
147+
// Initiate a new dial request if there is none pending.
148+
if !self.ongoing_outbound.contains(peer) {
149+
self.pending_requests.push_back(*peer);
128150
}
151+
129152
if let ConnectedPoint::Dialer { address } = endpoint {
130-
if let Some(channel) = self.pending_inbound.remove(peer) {
153+
if let Some(channel) = self.ongoing_inbound.remove(peer) {
154+
// Successfully dialed one of the addresses from the remote peer.
155+
// TODO: Check if the address was part of the list received in the dial-request.
131156
let _ = self
132157
.inner
133158
.send_response(channel, DialResponse::Ok(address.clone()));
@@ -144,9 +169,6 @@ impl NetworkBehaviour for Behaviour {
144169
) {
145170
self.inner
146171
.inject_connection_closed(peer, conn, endpoint, handler);
147-
// Channel can be dropped, as the underlying substream already closed.
148-
self.pending_inbound.remove(peer);
149-
self.send_request.retain(|p| p != peer);
150172
}
151173

152174
fn inject_address_change(
@@ -157,24 +179,6 @@ impl NetworkBehaviour for Behaviour {
157179
new: &ConnectedPoint,
158180
) {
159181
self.inner.inject_address_change(peer, conn, old, new);
160-
if let ConnectedPoint::Listener {
161-
local_addr: old_addr,
162-
..
163-
} = old
164-
{
165-
match new {
166-
ConnectedPoint::Listener {
167-
local_addr: new_addr,
168-
..
169-
} if old_addr != new_addr => {
170-
self.local_addresses.remove(old_addr);
171-
if !self.local_addresses.contains_key(new_addr) {
172-
self.local_addresses.insert(new_addr.clone(), 1);
173-
}
174-
}
175-
_ => {}
176-
}
177-
}
178182
}
179183

180184
fn inject_event(
@@ -202,7 +206,8 @@ impl NetworkBehaviour for Behaviour {
202206
error: libp2p_swarm::DialError,
203207
) {
204208
self.inner.inject_dial_failure(peer_id, handler, error);
205-
if let Some(channel) = self.pending_inbound.remove(peer_id) {
209+
if let Some(channel) = self.ongoing_inbound.remove(peer_id) {
210+
// Failed to dial any of the addresses sent by the remote peer in their dial-request.
206211
let _ = self
207212
.inner
208213
.send_response(channel, DialResponse::Err(ResponseError::DialError));
@@ -225,14 +230,14 @@ impl NetworkBehaviour for Behaviour {
225230

226231
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
227232
self.inner.inject_new_listen_addr(id, addr);
228-
if !self.local_addresses.contains_key(addr) {
229-
self.local_addresses.insert(addr.clone(), 0);
233+
if !self.addresses.contains_key(addr) {
234+
self.addresses.insert(addr.clone(), 0);
230235
}
231236
}
232237

233238
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
234239
self.inner.inject_expired_listen_addr(id, addr);
235-
self.local_addresses.remove(addr);
240+
self.addresses.remove(addr);
236241
}
237242

238243
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
@@ -245,18 +250,19 @@ impl NetworkBehaviour for Behaviour {
245250

246251
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
247252
self.inner.inject_new_external_addr(addr);
248-
match self.local_addresses.get_mut(addr) {
253+
// Add the address to the local address list.
254+
match self.addresses.get_mut(addr) {
249255
Some(score) if *score == 0 => *score = 1,
250256
Some(_) => {}
251257
None => {
252-
self.local_addresses.insert(addr.clone(), 1);
258+
self.addresses.insert(addr.clone(), 1);
253259
}
254260
}
255261
}
256262

257263
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
258264
self.inner.inject_expired_external_addr(addr);
259-
self.local_addresses.remove(addr);
265+
self.addresses.remove(addr);
260266
}
261267

262268
fn poll(
@@ -265,9 +271,8 @@ impl NetworkBehaviour for Behaviour {
265271
params: &mut impl PollParameters,
266272
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
267273
loop {
268-
if let Some(peer_id) = self.send_request.pop_front() {
269-
let mut scores: Vec<(Multiaddr, FiniteAddrScore)> =
270-
self.local_addresses.clone().into_iter().collect();
274+
if let Some(peer_id) = self.pending_requests.pop_front() {
275+
let mut scores: Vec<_> = self.addresses.clone().into_iter().collect();
271276
// Sort so that the address with the highest score will be dialed first by the remote.
272277
scores.sort_by(|(_, score_a), (_, score_b)| score_b.cmp(score_a));
273278
let addrs = scores.into_iter().map(|(a, _)| a).collect();
@@ -283,11 +288,12 @@ impl NetworkBehaviour for Behaviour {
283288
request: DialRequest { peer_id, addrs },
284289
channel,
285290
} => {
291+
// Add all addresses to the address book.
286292
for addr in addrs {
287293
self.inner.add_address(&peer, addr)
288294
}
289-
// TODO: Handle if there is already a pending request.
290-
self.pending_inbound.insert(peer_id, channel);
295+
// TODO: Handle if there is already a ongoing request.
296+
self.ongoing_inbound.insert(peer_id, channel);
291297
return Poll::Ready(NetworkBehaviourAction::DialPeer {
292298
peer_id: peer,
293299
handler: self.inner.new_handler(),
@@ -298,9 +304,10 @@ impl NetworkBehaviour for Behaviour {
298304
request_id: _,
299305
response,
300306
} => {
301-
self.pending_outbound.remove(&peer);
307+
self.ongoing_outbound.remove(&peer);
302308
if let DialResponse::Ok(address) = response {
303-
let score = self.local_addresses.entry(address.clone()).or_insert(1);
309+
// Increase score of the successfully dialed address.
310+
let score = self.addresses.entry(address.clone()).or_insert(1);
304311
*score += 1;
305312
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
306313
address,
@@ -315,12 +322,12 @@ impl NetworkBehaviour for Behaviour {
315322
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
316323
RequestResponseEvent::OutboundFailure { peer, .. },
317324
)) => {
318-
self.pending_outbound.remove(&peer);
325+
self.ongoing_outbound.remove(&peer);
319326
}
320327
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
321328
RequestResponseEvent::InboundFailure { peer, .. },
322329
)) => {
323-
self.pending_inbound.remove(&peer);
330+
self.ongoing_inbound.remove(&peer);
324331
}
325332
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
326333
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })

0 commit comments

Comments
 (0)