@@ -12,7 +12,7 @@ use crate::{
12
12
Endpoint , NodeAddr , NodeId ,
13
13
} ;
14
14
use anyhow:: anyhow;
15
- use futures_lite:: future:: Boxed as BoxFuture ;
15
+ use futures_lite:: { future:: Boxed as BoxFuture , Stream } ;
16
16
use tokio:: task:: JoinSet ;
17
17
use tokio_util:: sync:: CancellationToken ;
18
18
use tracing:: error;
@@ -21,9 +21,17 @@ use tracing::error;
21
21
///
22
22
/// This wraps a [`Dialer`] and exposes a function to also push accepted connections.
23
23
/// When pushing accepted connections, it is ensured that only a single connection between
24
- /// ourselves and a peer will prevail. If dials from both sides happen concurrently, only one of
25
- /// them will prevail. This is implemented by sorting our node id and the peer's node id, and only
26
- /// keeping the connection with the higher sorted dialing node.
24
+ /// ourselves and a peer will prevail.
25
+ ///
26
+ /// The [`ConnManager`] does not accept connections from the endpoint by itself. Instead, you
27
+ /// should run an accept loop yourself, and push connections with a matching ALPN into the manager
28
+ /// with [`ConnManager::accept`]. The connection will be dropped if we already have a connection to
29
+ /// that node. If we are currently dialing the node, the connection will only be accepted if the
30
+ /// peer's node id sorts lower than our node id. Through this, it is ensured that we will not get
31
+ /// double connections with a node if both we and them dial each other at the same time.
32
+ ///
33
+ /// The [`ConnManager`] implements [`Stream`]. It will yield new connections, both from dialing and
34
+ /// accepting.
27
35
#[ derive( Debug ) ]
28
36
pub struct ConnManager {
29
37
dialer : Dialer ,
@@ -45,12 +53,20 @@ impl ConnManager {
45
53
}
46
54
47
55
/// Push a newly accepted connection into the manager.
48
- pub fn push_accept ( & mut self , conn : quinn:: Connection ) -> anyhow:: Result < ( ) > {
56
+ ///
57
+ /// This does not check the connection's ALPN, so you should make sure that the ALPN matches
58
+ /// the [`ConnManager`]'s execpected ALPN before passing the connection to [`Self::accept].
59
+ ///
60
+ /// If we are currently dialing the node, the connection will be dropped if the peer's node id
61
+ /// sorty higher than our node id.
62
+ ///
63
+ /// Returns an error if getting the peer's node id from the TLS certificate fails.
64
+ pub fn accept ( & mut self , conn : quinn:: Connection ) -> anyhow:: Result < ( ) > {
49
65
let node_id = get_remote_node_id ( & conn) ?;
50
66
if self . is_connected ( & node_id) {
51
67
return Ok ( ( ) ) ;
52
68
}
53
- // if we are also dialing this node, only accept if node id is greater than ours.
69
+ // If we are also dialing this node, only accept if node id is greater than ours.
54
70
// this deduplicates connections.
55
71
if !self . dialer . is_pending ( & node_id) || node_id > self . our_node_id ( ) {
56
72
self . dialer . abort_dial ( & node_id) ;
@@ -60,13 +76,11 @@ impl ConnManager {
60
76
Ok ( ( ) )
61
77
}
62
78
63
- fn our_node_id ( & self ) -> NodeId {
64
- self . dialer . endpoint . node_id ( )
65
- }
66
-
67
- /// Remove a connection to a node, if it exists.
79
+ /// Remove the connection to a node.
68
80
///
69
81
/// Also aborts pending dials to the node, if existing.
82
+ ///
83
+ /// Returns the connection if it existed.
70
84
pub fn remove ( & mut self , node_id : & NodeId ) -> Option < Connection > {
71
85
self . dialer . abort_dial ( node_id) ;
72
86
self . active . remove ( node_id)
@@ -85,7 +99,7 @@ impl ConnManager {
85
99
if self . is_dialing ( & node_id) || self . is_connected ( & node_id) {
86
100
return ;
87
101
}
88
- self . dialer . queue_dial ( node_id, & self . alpn ) ;
102
+ self . dialer . queue_dial ( node_id, self . alpn ) ;
89
103
}
90
104
91
105
/// Returns `true` if we are currently dialing the node.
@@ -97,9 +111,13 @@ impl ConnManager {
97
111
pub fn is_connected ( & self , node_id : & NodeId ) -> bool {
98
112
self . active . contains_key ( node_id)
99
113
}
114
+
115
+ fn our_node_id ( & self ) -> NodeId {
116
+ self . dialer . endpoint . node_id ( )
117
+ }
100
118
}
101
119
102
- impl futures_lite :: Stream for ConnManager {
120
+ impl Stream for ConnManager {
103
121
type Item = NewConnection ;
104
122
105
123
fn poll_next (
@@ -241,7 +259,7 @@ impl Dialer {
241
259
}
242
260
}
243
261
244
- impl futures_lite :: Stream for Dialer {
262
+ impl Stream for Dialer {
245
263
type Item = ( PublicKey , anyhow:: Result < quinn:: Connection > ) ;
246
264
247
265
fn poll_next (
0 commit comments