@@ -23,14 +23,14 @@ use libp2p::core::identity;
23
23
use libp2p:: core:: PeerId ;
24
24
use libp2p:: multiaddr:: Protocol ;
25
25
use libp2p:: ping:: { Ping , PingConfig , PingEvent , PingSuccess } ;
26
- use libp2p:: swarm:: Swarm ;
27
26
use libp2p:: swarm:: SwarmEvent ;
27
+ use libp2p:: Swarm ;
28
28
use libp2p:: { development_transport, rendezvous, Multiaddr } ;
29
29
use std:: time:: Duration ;
30
30
31
31
const NAMESPACE : & str = "rendezvous" ;
32
32
33
- #[ async_std :: main]
33
+ #[ tokio :: main]
34
34
async fn main ( ) {
35
35
env_logger:: init ( ) ;
36
36
@@ -44,7 +44,11 @@ async fn main() {
44
44
development_transport ( identity. clone ( ) ) . await . unwrap ( ) ,
45
45
MyBehaviour {
46
46
rendezvous : rendezvous:: client:: Behaviour :: new ( identity. clone ( ) ) ,
47
- ping : Ping :: new ( PingConfig :: new ( ) . with_interval ( Duration :: from_secs ( 1 ) ) ) ,
47
+ ping : Ping :: new (
48
+ PingConfig :: new ( )
49
+ . with_interval ( Duration :: from_secs ( 1 ) )
50
+ . with_keep_alive ( true ) ,
51
+ ) ,
48
52
} ,
49
53
PeerId :: from ( identity. public ( ) ) ,
50
54
) ;
@@ -53,62 +57,77 @@ async fn main() {
53
57
54
58
let _ = swarm. dial_addr ( rendezvous_point_address. clone ( ) ) ;
55
59
56
- while let Some ( event) = swarm. next ( ) . await {
57
- match event {
58
- SwarmEvent :: ConnectionEstablished { peer_id, .. } if peer_id == rendezvous_point => {
59
- log:: info!(
60
- "Connected to rendezvous point, discovering nodes in '{}' namespace ..." ,
61
- NAMESPACE
62
- ) ;
60
+ let mut discover_tick = tokio:: time:: interval ( Duration :: from_secs ( 30 ) ) ;
61
+ let mut cookie = None ;
63
62
63
+ loop {
64
+ tokio:: select! {
65
+ event = swarm. select_next_some( ) => match event {
66
+ SwarmEvent :: ConnectionEstablished { peer_id, .. } if peer_id == rendezvous_point => {
67
+ log:: info!(
68
+ "Connected to rendezvous point, discovering nodes in '{}' namespace ..." ,
69
+ NAMESPACE
70
+ ) ;
71
+
72
+ swarm. behaviour_mut( ) . rendezvous. discover(
73
+ Some ( rendezvous:: Namespace :: new( NAMESPACE . to_string( ) ) . unwrap( ) ) ,
74
+ None ,
75
+ None ,
76
+ rendezvous_point,
77
+ ) ;
78
+ }
79
+ SwarmEvent :: UnreachableAddr { error, address, .. }
80
+ | SwarmEvent :: UnknownPeerUnreachableAddr { error, address, .. }
81
+ if address == rendezvous_point_address =>
82
+ {
83
+ log:: error!(
84
+ "Failed to connect to rendezvous point at {}: {}" ,
85
+ address,
86
+ error
87
+ ) ;
88
+ return ;
89
+ }
90
+ SwarmEvent :: Behaviour ( MyEvent :: Rendezvous ( rendezvous:: client:: Event :: Discovered {
91
+ registrations,
92
+ cookie: new_cookie,
93
+ ..
94
+ } ) ) => {
95
+ cookie. replace( new_cookie) ;
96
+
97
+ for registration in registrations {
98
+ for address in registration. record. addresses( ) {
99
+ let peer = registration. record. peer_id( ) ;
100
+ log:: info!( "Discovered peer {} at {}" , peer, address) ;
101
+
102
+ let p2p_suffix = Protocol :: P2p ( * peer. as_ref( ) ) ;
103
+ let address_with_p2p =
104
+ if !address. ends_with( & Multiaddr :: empty( ) . with( p2p_suffix. clone( ) ) ) {
105
+ address. clone( ) . with( p2p_suffix)
106
+ } else {
107
+ address. clone( )
108
+ } ;
109
+
110
+ swarm. dial_addr( address_with_p2p) . unwrap( )
111
+ }
112
+ }
113
+ }
114
+ SwarmEvent :: Behaviour ( MyEvent :: Ping ( PingEvent {
115
+ peer,
116
+ result: Ok ( PingSuccess :: Ping { rtt } ) ,
117
+ } ) ) if peer != rendezvous_point => {
118
+ log:: info!( "Ping to {} is {}ms" , peer, rtt. as_millis( ) )
119
+ }
120
+ other => {
121
+ log:: debug!( "Unhandled {:?}" , other) ;
122
+ }
123
+ } ,
124
+ _ = discover_tick. tick( ) , if cookie. is_some( ) =>
64
125
swarm. behaviour_mut( ) . rendezvous. discover(
65
126
Some ( rendezvous:: Namespace :: new( NAMESPACE . to_string( ) ) . unwrap( ) ) ,
127
+ cookie. clone( ) ,
66
128
None ,
67
- None ,
68
- rendezvous_point,
69
- ) ;
70
- }
71
- SwarmEvent :: UnreachableAddr { error, address, .. }
72
- | SwarmEvent :: UnknownPeerUnreachableAddr { error, address, .. }
73
- if address == rendezvous_point_address =>
74
- {
75
- log:: error!(
76
- "Failed to connect to rendezvous point at {}: {}" ,
77
- address,
78
- error
79
- ) ;
80
- return ;
81
- }
82
- SwarmEvent :: Behaviour ( MyEvent :: Rendezvous ( rendezvous:: client:: Event :: Discovered {
83
- registrations,
84
- ..
85
- } ) ) => {
86
- for registration in registrations {
87
- for address in registration. record . addresses ( ) {
88
- let peer = registration. record . peer_id ( ) ;
89
- log:: info!( "Discovered peer {} at {}" , peer, address) ;
90
-
91
- let p2p_suffix = Protocol :: P2p ( * peer. as_ref ( ) ) ;
92
- let address_with_p2p =
93
- if !address. ends_with ( & Multiaddr :: empty ( ) . with ( p2p_suffix. clone ( ) ) ) {
94
- address. clone ( ) . with ( p2p_suffix)
95
- } else {
96
- address. clone ( )
97
- } ;
98
-
99
- swarm. dial_addr ( address_with_p2p) . unwrap ( )
100
- }
101
- }
102
- }
103
- SwarmEvent :: Behaviour ( MyEvent :: Ping ( PingEvent {
104
- peer,
105
- result : Ok ( PingSuccess :: Ping { rtt } ) ,
106
- } ) ) if peer != rendezvous_point => {
107
- log:: info!( "Ping to {} is {}ms" , peer, rtt. as_millis( ) )
108
- }
109
- other => {
110
- log:: debug!( "Unhandled {:?}" , other) ;
111
- }
129
+ rendezvous_point
130
+ )
112
131
}
113
132
}
114
133
}
0 commit comments