@@ -292,8 +292,33 @@ impl NetworkBehaviour for SwarmApi {
292
292
if let ConnectedPoint :: Dialer { .. } = cp {
293
293
let addr = MultiaddrWithPeerId :: from ( ( closed_addr, peer_id. to_owned ( ) ) ) ;
294
294
295
- self . connect_registry
296
- . finish_subscription ( addr. into ( ) , Err ( "Connection reset by peer" . to_owned ( ) ) ) ;
295
+ match self . pending_connections . entry ( * peer_id) {
296
+ Entry :: Occupied ( mut oe) => {
297
+ let connections = oe. get_mut ( ) ;
298
+ let pos = connections. iter ( ) . position ( |x| addr. multiaddr == * x) ;
299
+
300
+ if let Some ( pos) = pos {
301
+ connections. swap_remove ( pos) ;
302
+
303
+ // this needs to be guarded, so that the connect test case doesn't cause a
304
+ // panic following inject_connection_established, inject_connection_closed
305
+ // if there's only the DummyProtocolsHandler, which doesn't open a
306
+ // substream and closes up immediatedly.
307
+ self . connect_registry . finish_subscription (
308
+ addr. into ( ) ,
309
+ Err ( "Connection reset by peer" . to_owned ( ) ) ,
310
+ ) ;
311
+ }
312
+
313
+ if connections. is_empty ( ) {
314
+ oe. remove ( ) ;
315
+ }
316
+ }
317
+ Entry :: Vacant ( _) => { }
318
+ }
319
+ } else {
320
+ // we were not dialing to the peer, thus we cannot have a pending subscription to
321
+ // finish.
297
322
}
298
323
}
299
324
@@ -338,8 +363,8 @@ impl NetworkBehaviour for SwarmApi {
338
363
} ) ;
339
364
}
340
365
341
- // this should not be executed once, but probably will be in case unsupported addresses
342
- // happen
366
+ // this should not be executed once, but probably will be in case unsupported addresses or something
367
+ // surprising happens.
343
368
for failed in self . pending_connections . remove ( peer_id) . unwrap_or_default ( ) {
344
369
let addr = MultiaddrWithoutPeerId :: try_from ( failed)
345
370
. expect ( "peerid has been stripped earlier" )
@@ -405,36 +430,188 @@ fn connection_point_addr(cp: &ConnectedPoint) -> &Multiaddr {
405
430
#[ cfg( test) ]
406
431
mod tests {
407
432
use super :: * ;
408
- use crate :: p2p:: transport:: { build_transport, TTransport } ;
433
+ use crate :: p2p:: transport:: build_transport;
434
+ use futures:: {
435
+ stream:: { StreamExt , TryStreamExt } ,
436
+ TryFutureExt ,
437
+ } ;
409
438
use libp2p:: identity:: Keypair ;
410
- use libp2p:: { multiaddr:: Protocol , multihash:: Multihash , swarm:: Swarm } ;
439
+ use libp2p:: swarm:: SwarmEvent ;
440
+ use libp2p:: { multiaddr:: Protocol , multihash:: Multihash , swarm:: Swarm , swarm:: SwarmBuilder } ;
411
441
use std:: convert:: TryInto ;
412
442
413
443
#[ tokio:: test]
414
444
async fn swarm_api ( ) {
415
- let ( peer1_id, trans) = mk_transport ( ) ;
416
- let mut swarm1 = Swarm :: new ( trans, SwarmApi :: default ( ) , peer1_id) ;
417
-
418
- let ( peer2_id, trans) = mk_transport ( ) ;
419
- let mut swarm2 = Swarm :: new ( trans, SwarmApi :: default ( ) , peer2_id) ;
445
+ let ( peer1_id, mut swarm1) = build_swarm ( ) ;
446
+ let ( peer2_id, mut swarm2) = build_swarm ( ) ;
420
447
421
448
Swarm :: listen_on ( & mut swarm1, "/ip4/127.0.0.1/tcp/0" . parse ( ) . unwrap ( ) ) . unwrap ( ) ;
422
449
423
- for l in Swarm :: listeners ( & swarm1) {
424
- let mut addr = l. to_owned ( ) ;
450
+ loop {
451
+ if let SwarmEvent :: NewListenAddr ( _) = swarm1. next_event ( ) . await {
452
+ break ;
453
+ }
454
+ }
455
+
456
+ let listeners = Swarm :: listeners ( & swarm1) . cloned ( ) . collect :: < Vec < _ > > ( ) ;
457
+
458
+ for mut addr in listeners {
425
459
addr. push ( Protocol :: P2p (
426
460
Multihash :: from_bytes ( & peer1_id. to_bytes ( ) ) . unwrap ( ) ,
427
461
) ) ;
428
- if let Some ( fut) = swarm2. connect ( addr. try_into ( ) . unwrap ( ) ) {
429
- fut. await . unwrap ( ) ;
462
+
463
+ let mut sub = swarm2. connect ( addr. try_into ( ) . unwrap ( ) ) . unwrap ( ) ;
464
+
465
+ loop {
466
+ tokio:: select! {
467
+ _ = ( & mut swarm1) . next_event( ) => { } ,
468
+ _ = ( & mut swarm2) . next_event( ) => { } ,
469
+ res = ( & mut sub) => {
470
+ // this is currently a success even though the connection is never really
471
+ // established, the DummyProtocolsHandler doesn't do anything nor want the
472
+ // connection to be kept alive and thats it.
473
+ //
474
+ // it could be argued that this should be `Err("keepalive disconnected")`
475
+ // or something and I'd agree, but I also agree this can be an `Ok(())`;
476
+ // it's the sort of difficulty with the cli functionality in general: what
477
+ // does it mean to connect to a peer? one way to look at it would be to
478
+ // make the peer a "pinned peer" or "friend" and to keep the connection
479
+ // alive at all costs. perhaps that is something for the next round.
480
+ // another aspect would be to fail this future because there was no
481
+ // `inject_connected`, only `inject_connection_established`. taking that
482
+ // route would be good; it does however leave the special case of adding
483
+ // another connection, which does add even more complexity than it exists
484
+ // at the present.
485
+ res. unwrap( ) ;
486
+
487
+ // just to confirm that there are no connections.
488
+ assert_eq!( Vec :: <Multiaddr >:: new( ) , swarm1. connections_to( & peer2_id) ) ;
489
+ break ;
490
+ }
491
+ }
430
492
}
431
493
}
432
494
}
433
495
434
- fn mk_transport ( ) -> ( PeerId , TTransport ) {
496
+ #[ tokio:: test]
497
+ async fn wrong_peerid ( ) {
498
+ let ( _, mut swarm1) = build_swarm ( ) ;
499
+ let ( _, mut swarm2) = build_swarm ( ) ;
500
+
501
+ let peer3_id = Keypair :: generate_ed25519 ( ) . public ( ) . into_peer_id ( ) ;
502
+
503
+ Swarm :: listen_on ( & mut swarm1, "/ip4/127.0.0.1/tcp/0" . parse ( ) . unwrap ( ) ) . unwrap ( ) ;
504
+
505
+ let address;
506
+
507
+ loop {
508
+ if let SwarmEvent :: NewListenAddr ( addr) = swarm1. next_event ( ) . await {
509
+ // wonder if there should be a timeout?
510
+ address = addr;
511
+ break ;
512
+ }
513
+ }
514
+
515
+ let mut fut = swarm2
516
+ . connect (
517
+ MultiaddrWithoutPeerId :: try_from ( address)
518
+ . unwrap ( )
519
+ . with ( peer3_id) ,
520
+ )
521
+ . unwrap ( )
522
+ // remove the private type wrapper
523
+ . map_err ( |e| e. into_inner ( ) ) ;
524
+
525
+ loop {
526
+ tokio:: select! {
527
+ _ = swarm1. next_event( ) => { } ,
528
+ _ = swarm2. next_event( ) => { } ,
529
+ res = & mut fut => {
530
+ assert_eq!( res. unwrap_err( ) , Some ( "Pending connection: Invalid peer ID." . into( ) ) ) ;
531
+ return ;
532
+ }
533
+ }
534
+ }
535
+ }
536
+
537
+ #[ tokio:: test]
538
+ async fn racy_connecting_attempts ( ) {
539
+ let ( peer1_id, mut swarm1) = build_swarm ( ) ;
540
+ let ( _, mut swarm2) = build_swarm ( ) ;
541
+
542
+ Swarm :: listen_on ( & mut swarm1, "/ip4/127.0.0.1/tcp/0" . parse ( ) . unwrap ( ) ) . unwrap ( ) ;
543
+ Swarm :: listen_on ( & mut swarm1, "/ip4/127.0.0.1/tcp/0" . parse ( ) . unwrap ( ) ) . unwrap ( ) ;
544
+
545
+ let mut addresses = Vec :: with_capacity ( 2 ) ;
546
+
547
+ while addresses. len ( ) < 2 {
548
+ if let SwarmEvent :: NewListenAddr ( addr) = swarm1. next_event ( ) . await {
549
+ addresses. push ( addr) ;
550
+ }
551
+ }
552
+
553
+ let targets = (
554
+ MultiaddrWithoutPeerId :: try_from ( addresses[ 0 ] . clone ( ) )
555
+ . unwrap ( )
556
+ . with ( peer1_id) ,
557
+ MultiaddrWithoutPeerId :: try_from ( addresses[ 1 ] . clone ( ) )
558
+ . unwrap ( )
559
+ . with ( peer1_id) ,
560
+ ) ;
561
+
562
+ let mut connections = futures:: stream:: FuturesOrdered :: new ( ) ;
563
+ // these two should be attempted in parallel. since we know both of them work, and they are
564
+ // given in this order, we know that in libp2p 0.34 only the first should win, however
565
+ // both should always be finished.
566
+ connections. push ( swarm2. connect ( targets. 0 ) . unwrap ( ) ) ;
567
+ connections. push ( swarm2. connect ( targets. 1 ) . unwrap ( ) ) ;
568
+ let ready = connections
569
+ // turn the private error type into Option
570
+ . map_err ( |e| e. into_inner ( ) )
571
+ . collect :: < Vec < _ > > ( ) ;
572
+
573
+ tokio:: pin!( ready) ;
574
+
575
+ loop {
576
+ tokio:: select! {
577
+ _ = swarm1. next_event( ) => { }
578
+ _ = swarm2. next_event( ) => { }
579
+ res = & mut ready => {
580
+
581
+ assert_eq!(
582
+ res,
583
+ vec![
584
+ Ok ( ( ) ) ,
585
+ Err ( Some ( "finished connecting to another address" . into( ) ) )
586
+ ] ) ;
587
+
588
+ break ;
589
+ }
590
+ }
591
+ }
592
+ }
593
+
594
+ fn build_swarm ( ) -> ( PeerId , libp2p:: swarm:: Swarm < SwarmApi > ) {
435
595
let key = Keypair :: generate_ed25519 ( ) ;
436
596
let peer_id = key. public ( ) . into_peer_id ( ) ;
437
597
let transport = build_transport ( key) . unwrap ( ) ;
438
- ( peer_id, transport)
598
+
599
+ let swarm = SwarmBuilder :: new ( transport, SwarmApi :: default ( ) , peer_id)
600
+ . executor ( Box :: new ( ThreadLocalTokio ) )
601
+ . build ( ) ;
602
+ ( peer_id, swarm)
603
+ }
604
+
605
+ use std:: future:: Future ;
606
+ use std:: pin:: Pin ;
607
+
608
+ // can only be used from within tokio context. this is required since otherwise libp2p-tcp will
609
+ // use tokio, but from a futures-executor threadpool, which is outside of tokio context.
610
+ struct ThreadLocalTokio ;
611
+
612
+ impl libp2p:: core:: Executor for ThreadLocalTokio {
613
+ fn exec ( & self , future : Pin < Box < dyn Future < Output = ( ) > + Send + ' static > > ) {
614
+ tokio:: task:: spawn ( future) ;
615
+ }
439
616
}
440
617
}
0 commit comments