@@ -546,8 +546,8 @@ mod tests {
546
546
/// publish to. The relay and DNS servers share their state.
547
547
#[ cfg( test) ]
548
548
mod test_dns_pkarr {
549
- use std:: future:: Future ;
550
549
use std:: net:: SocketAddr ;
550
+ use std:: { future:: Future , time:: Duration } ;
551
551
552
552
use anyhow:: Result ;
553
553
use hickory_resolver:: { config:: NameServerConfig , AsyncResolver , TokioAsyncResolver } ;
@@ -601,7 +601,7 @@ mod test_dns_pkarr {
601
601
let cancel = CancellationToken :: new ( ) ;
602
602
let origin = "testdns.example" . to_string ( ) ;
603
603
let ( nameserver, pkarr_url, _state, task) =
604
- spawn_dns_and_pkarr ( origin. clone ( ) , cancel. clone ( ) ) . await ?;
604
+ run_dns_and_pkarr_servers ( origin. clone ( ) , cancel. clone ( ) ) . await ?;
605
605
606
606
let secret_key = SecretKey :: generate ( ) ;
607
607
let node_id = secret_key. public ( ) ;
@@ -636,19 +636,20 @@ mod test_dns_pkarr {
636
636
637
637
let cancel = CancellationToken :: new ( ) ;
638
638
let origin = "testdns.example" . to_string ( ) ;
639
+ let timeout = Duration :: from_secs ( 1 ) ;
640
+
639
641
let ( nameserver, pkarr_url, state, task) =
640
- spawn_dns_and_pkarr ( origin. clone ( ) , cancel. clone ( ) ) . await ?;
642
+ run_dns_and_pkarr_servers ( & origin, cancel. clone ( ) ) . await ?;
643
+ let ( relay_map, _relay_url, _relay_guard) = run_relay_server ( ) . await ?;
641
644
642
- let ( relay_map, _relay_url, _relay_guard) = run_relay_server ( ) . await . unwrap ( ) ;
643
645
let ep1 = ep_with_discovery ( relay_map. clone ( ) , nameserver, & origin, & pkarr_url) . await ?;
644
646
let ep2 = ep_with_discovery ( relay_map, nameserver, & origin, & pkarr_url) . await ?;
645
647
646
648
// wait until our shared state received the update from pkarr publishing
647
- state. on_update ( ) . await ;
649
+ state. on_node ( & ep1 . node_id ( ) , timeout ) . await ? ;
648
650
649
651
// we connect only by node id!
650
- let ep2_node_id = ep2. node_id ( ) ;
651
- let res = ep1. connect ( ep2_node_id. into ( ) , TEST_ALPN ) . await ;
652
+ let res = ep2. connect ( ep1. node_id ( ) . into ( ) , TEST_ALPN ) . await ;
652
653
assert ! ( res. is_ok( ) , "connection established" ) ;
653
654
cancel. cancel ( ) ;
654
655
task. await ??;
@@ -703,11 +704,11 @@ mod test_dns_pkarr {
703
704
( node_info, signed_packet)
704
705
}
705
706
706
- async fn spawn_dns_and_pkarr (
707
- origin : String ,
707
+ async fn run_dns_and_pkarr_servers (
708
+ origin : impl ToString ,
708
709
cancel : CancellationToken ,
709
710
) -> Result < ( SocketAddr , Url , State , JoinHandle < Result < ( ) > > ) > {
710
- let state = State :: new ( origin) ;
711
+ let state = State :: new ( origin. to_string ( ) ) ;
711
712
let ( nameserver, dns_task) = run_dns_server ( state. clone ( ) , cancel. clone ( ) ) . await ?;
712
713
let ( pkarr_url, pkarr_task) = run_pkarr_relay ( state. clone ( ) , cancel. clone ( ) ) . await ?;
713
714
let join_handle = tokio:: task:: spawn ( async move {
@@ -720,12 +721,14 @@ mod test_dns_pkarr {
720
721
721
722
mod state {
722
723
use crate :: NodeId ;
724
+ use anyhow:: { anyhow, Result } ;
723
725
use parking_lot:: { Mutex , MutexGuard } ;
724
726
use pkarr:: SignedPacket ;
725
727
use std:: {
726
728
collections:: { hash_map, HashMap } ,
727
729
ops:: Deref ,
728
730
sync:: Arc ,
731
+ time:: Duration ,
729
732
} ;
730
733
731
734
#[ derive( Debug , Clone ) ]
@@ -748,6 +751,20 @@ mod test_dns_pkarr {
748
751
self . notify . notified ( )
749
752
}
750
753
754
+ pub async fn on_node ( & self , node : & NodeId , timeout : Duration ) -> Result < ( ) > {
755
+ let timeout = tokio:: time:: sleep ( timeout) ;
756
+ tokio:: pin!( timeout) ;
757
+ loop {
758
+ if self . get ( node) . is_some ( ) {
759
+ return Ok ( ( ) ) ;
760
+ }
761
+ tokio:: select! {
762
+ _ = & mut timeout => return Err ( anyhow!( "timeout" ) ) ,
763
+ _ = self . on_update( ) => { }
764
+ }
765
+ }
766
+ }
767
+
751
768
pub fn upsert ( & self , signed_packet : SignedPacket ) -> anyhow:: Result < bool > {
752
769
let node_id = NodeId :: from_bytes ( & signed_packet. public_key ( ) . to_bytes ( ) ) ?;
753
770
let mut map = self . packets . lock ( ) ;
@@ -771,7 +788,6 @@ mod test_dns_pkarr {
771
788
Ok ( updated)
772
789
}
773
790
pub fn get ( & self , node_id : & NodeId ) -> Option < impl Deref < Target = SignedPacket > + ' _ > {
774
- println ! ( "GET {node_id}" ) ;
775
791
let map = self . packets . lock ( ) ;
776
792
if map. contains_key ( node_id) {
777
793
let guard = MutexGuard :: map ( map, |state| state. get_mut ( node_id) . unwrap ( ) ) ;
0 commit comments