Skip to content

Commit 28f3514

Browse files
committed
protocols/autonat: Add basic behaviour logic
1 parent bccf94c commit 28f3514

File tree

3 files changed

+175
-66
lines changed

3 files changed

+175
-66
lines changed

protocols/autonat/src/autonat.rs renamed to protocols/autonat/src/behaviour.rs

Lines changed: 121 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,49 +18,61 @@
1818
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
1919
// DEALINGS IN THE SOFTWARE.
2020

21-
pub use crate::protocol::DialResponse;
22-
use crate::protocol::{AutoNatCodec, AutoNatProtocol};
21+
use crate::protocol::{AutoNatCodec, AutoNatProtocol, DialRequest, DialResponse, ResponseError};
2322
use libp2p_core::{
2423
connection::{ConnectionId, ListenerId},
2524
ConnectedPoint, Multiaddr, PeerId,
2625
};
2726
use libp2p_request_response::{
2827
handler::RequestResponseHandlerEvent, ProtocolSupport, RequestResponse, RequestResponseConfig,
29-
RequestResponseEvent, RequestResponseMessage,
28+
RequestResponseEvent, RequestResponseMessage, ResponseChannel,
3029
};
3130
use libp2p_swarm::{
32-
IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
31+
AddressScore, DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour,
32+
NetworkBehaviourAction, PollParameters,
3333
};
3434
use std::{
35+
collections::{HashMap, HashSet, VecDeque},
3536
iter,
3637
task::{Context, Poll},
3738
};
3839

39-
#[derive(Clone, Debug, Eq, PartialEq)]
40-
pub enum AutoNatEvent {
41-
DialResponse(DialResponse),
42-
}
40+
type FiniteAddrScore = u32;
4341

4442
pub struct AutoNat {
4543
inner: RequestResponse<AutoNatCodec>,
44+
local_addresses: HashMap<Multiaddr, FiniteAddrScore>,
45+
pending_inbound: HashMap<PeerId, ResponseChannel<DialResponse>>,
46+
pending_outbound: HashSet<PeerId>,
47+
send_request: VecDeque<PeerId>,
4648
}
4749

48-
impl AutoNat {
49-
pub fn new() -> Self {
50+
impl Default for AutoNat {
51+
fn default() -> Self {
5052
let protocols = iter::once((AutoNatProtocol, ProtocolSupport::Full));
5153
let cfg = RequestResponseConfig::default();
5254
let inner = RequestResponse::new(AutoNatCodec, protocols, cfg);
53-
Self { inner }
55+
Self {
56+
inner,
57+
local_addresses: HashMap::default(),
58+
pending_inbound: HashMap::default(),
59+
pending_outbound: HashSet::default(),
60+
send_request: VecDeque::default(),
61+
}
5462
}
63+
}
5564

56-
pub fn add_address(&mut self, peer_id: &PeerId, addr: Multiaddr) {
57-
self.inner.add_address(peer_id, addr)
65+
impl AutoNat {
66+
pub fn add_local_address(&mut self, address: Multiaddr) {
67+
if self.local_addresses.get(&address).is_none() {
68+
self.local_addresses.insert(address, 1);
69+
}
5870
}
5971
}
6072

6173
impl NetworkBehaviour for AutoNat {
6274
type ProtocolsHandler = <RequestResponse<AutoNatCodec> as NetworkBehaviour>::ProtocolsHandler;
63-
type OutEvent = AutoNatEvent;
75+
type OutEvent = ();
6476

6577
fn new_handler(&mut self) -> Self::ProtocolsHandler {
6678
self.inner.new_handler()
@@ -85,7 +97,17 @@ impl NetworkBehaviour for AutoNat {
8597
endpoint: &ConnectedPoint,
8698
) {
8799
self.inner
88-
.inject_connection_established(peer, conn, endpoint)
100+
.inject_connection_established(peer, conn, endpoint);
101+
if !self.pending_outbound.contains(peer) {
102+
self.send_request.push_back(*peer);
103+
}
104+
if let ConnectedPoint::Dialer { address } = endpoint {
105+
if let Some(channel) = self.pending_inbound.remove(peer) {
106+
let _ = self
107+
.inner
108+
.send_response(channel, DialResponse::Ok(address.clone()));
109+
}
110+
}
89111
}
90112

91113
fn inject_connection_closed(
@@ -96,7 +118,10 @@ impl NetworkBehaviour for AutoNat {
96118
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
97119
) {
98120
self.inner
99-
.inject_connection_closed(peer, conn, endpoint, handler)
121+
.inject_connection_closed(peer, conn, endpoint, handler);
122+
// Channel can be dropped, as the underlying substream already closed.
123+
self.pending_inbound.remove(peer);
124+
self.send_request.retain(|p| p != peer);
100125
}
101126

102127
fn inject_address_change(
@@ -106,16 +131,34 @@ impl NetworkBehaviour for AutoNat {
106131
old: &ConnectedPoint,
107132
new: &ConnectedPoint,
108133
) {
109-
self.inner.inject_address_change(peer, conn, old, new)
134+
self.inner.inject_address_change(peer, conn, old, new);
135+
if let ConnectedPoint::Listener {
136+
local_addr: old_addr,
137+
..
138+
} = old
139+
{
140+
match new {
141+
ConnectedPoint::Listener {
142+
local_addr: new_addr,
143+
..
144+
} if old_addr != new_addr => {
145+
self.local_addresses.remove(old_addr);
146+
if !self.local_addresses.contains_key(new_addr) {
147+
self.local_addresses.insert(new_addr.clone(), 1);
148+
}
149+
}
150+
_ => {}
151+
}
152+
}
110153
}
111154

112155
fn inject_event(
113156
&mut self,
114-
peer: PeerId,
157+
peer_id: PeerId,
115158
conn: ConnectionId,
116159
event: RequestResponseHandlerEvent<AutoNatCodec>,
117160
) {
118-
self.inner.inject_event(peer, conn, event)
161+
self.inner.inject_event(peer_id, conn, event)
119162
}
120163

121164
fn inject_addr_reach_failure(
@@ -133,7 +176,12 @@ impl NetworkBehaviour for AutoNat {
133176
handler: Self::ProtocolsHandler,
134177
error: libp2p_swarm::DialError,
135178
) {
136-
self.inner.inject_dial_failure(peer_id, handler, error)
179+
self.inner.inject_dial_failure(peer_id, handler, error);
180+
if let Some(channel) = self.pending_inbound.remove(peer_id) {
181+
let _ = self
182+
.inner
183+
.send_response(channel, DialResponse::Err(ResponseError::DialError));
184+
}
137185
}
138186

139187
fn inject_listen_failure(
@@ -151,11 +199,15 @@ impl NetworkBehaviour for AutoNat {
151199
}
152200

153201
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
154-
self.inner.inject_new_listen_addr(id, addr)
202+
self.inner.inject_new_listen_addr(id, addr);
203+
if !self.local_addresses.contains_key(addr) {
204+
self.local_addresses.insert(addr.clone(), 0);
205+
}
155206
}
156207

157208
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
158-
self.inner.inject_expired_listen_addr(id, addr)
209+
self.inner.inject_expired_listen_addr(id, addr);
210+
self.local_addresses.remove(addr);
159211
}
160212

161213
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
@@ -167,11 +219,19 @@ impl NetworkBehaviour for AutoNat {
167219
}
168220

169221
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
170-
self.inner.inject_new_external_addr(addr)
222+
self.inner.inject_new_external_addr(addr);
223+
match self.local_addresses.get_mut(addr) {
224+
Some(score) if *score == 0 => *score = 1,
225+
Some(_) => {},
226+
None => {
227+
self.local_addresses.insert(addr.clone(), 1);
228+
}
229+
}
171230
}
172231

173232
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
174-
self.inner.inject_expired_external_addr(addr)
233+
self.inner.inject_expired_external_addr(addr);
234+
self.local_addresses.remove(addr);
175235
}
176236

177237
fn poll(
@@ -180,46 +240,62 @@ impl NetworkBehaviour for AutoNat {
180240
params: &mut impl PollParameters,
181241
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
182242
loop {
243+
if let Some(peer_id) = self.send_request.pop_front() {
244+
let mut scores: Vec<(Multiaddr, FiniteAddrScore)> =
245+
self.local_addresses.clone().into_iter().collect();
246+
// Sort so that the address with the highest score will be dialed first by the remote.
247+
scores.sort_by(|(_, score_a), (_, score_b)| score_b.cmp(score_a));
248+
let addrs = scores.into_iter().map(|(a, _)| a).collect();
249+
self.inner
250+
.send_request(&peer_id, DialRequest { peer_id, addrs });
251+
}
183252
match self.inner.poll(cx, params) {
184253
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
185254
RequestResponseEvent::Message { peer, message },
186255
)) => match message {
187256
RequestResponseMessage::Request {
188-
request_id,
189-
request,
257+
request_id: _,
258+
request: DialRequest { peer_id, addrs },
190259
channel,
191260
} => {
192-
println!("{} {:?} {:?} {:?}", peer, request_id, request, channel);
261+
for addr in addrs {
262+
self.inner.add_address(&peer, addr)
263+
}
264+
// TODO: Handle if there is already a pending request.
265+
self.pending_inbound.insert(peer_id, channel);
266+
return Poll::Ready(NetworkBehaviourAction::DialPeer {
267+
peer_id: peer,
268+
handler: self.inner.new_handler(),
269+
condition: DialPeerCondition::Always,
270+
});
193271
}
194272
RequestResponseMessage::Response {
195-
request_id,
273+
request_id: _,
196274
response,
197275
} => {
198-
println!("{} {:?} {:?}", peer, request_id, response);
276+
self.pending_outbound.remove(&peer);
277+
if let DialResponse::Ok(address) = response {
278+
let score = self.local_addresses.entry(address.clone()).or_insert(1);
279+
*score += 1;
280+
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
281+
address,
282+
score: AddressScore::Finite(*score),
283+
});
284+
}
199285
}
200286
},
201287
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
202-
RequestResponseEvent::ResponseSent { peer, request_id },
203-
)) => {
204-
println!("response sent {} {:?}", peer, request_id);
205-
}
288+
RequestResponseEvent::ResponseSent { .. },
289+
)) => {}
206290
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
207-
RequestResponseEvent::OutboundFailure {
208-
peer,
209-
request_id,
210-
error,
211-
},
291+
RequestResponseEvent::OutboundFailure { peer, .. },
212292
)) => {
213-
println!("outbound failure {} {:?} {:?}", peer, request_id, error);
293+
self.pending_outbound.remove(&peer);
214294
}
215295
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
216-
RequestResponseEvent::InboundFailure {
217-
peer,
218-
error,
219-
request_id,
220-
},
296+
RequestResponseEvent::InboundFailure { peer, .. },
221297
)) => {
222-
println!("inbound failure {} {:?} {:?}", peer, request_id, error);
298+
self.pending_inbound.remove(&peer);
223299
}
224300
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
225301
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })

protocols/autonat/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
//! Implementation of the [AutoNAT] protocol.
2222
23-
pub use self::autonat::{AutoNat, AutoNatEvent, DialResponse};
23+
pub use self::behaviour::AutoNat;
2424

25-
mod autonat;
25+
mod behaviour;
2626
mod protocol;
2727

2828
mod structs_proto {

0 commit comments

Comments
 (0)