@@ -39,7 +39,7 @@ use futures::prelude::*;
39
39
use log:: { debug, error, trace} ;
40
40
use serde_json:: json;
41
41
use std:: { collections:: HashMap , pin:: Pin , task:: { Context , Poll } , time:: Duration } ;
42
- use wasm_timer:: Instant ;
42
+ use wasm_timer:: { Delay , Instant } ;
43
43
use sp_utils:: mpsc:: { tracing_unbounded, TracingUnboundedSender , TracingUnboundedReceiver } ;
44
44
45
45
pub use libp2p:: PeerId ;
@@ -252,6 +252,9 @@ pub struct Peerset {
252
252
created : Instant ,
253
253
/// Last time when we updated the reputations of connected nodes.
254
254
latest_time_update : Instant ,
255
+ /// Next time to do a periodic call to `alloc_slots` with all sets. This is done once per
256
+ /// second, to match the period of the reputation updates.
257
+ next_periodic_alloc_slots : Delay ,
255
258
}
256
259
257
260
impl Peerset {
@@ -279,6 +282,7 @@ impl Peerset {
279
282
message_queue : VecDeque :: new ( ) ,
280
283
created : now,
281
284
latest_time_update : now,
285
+ next_periodic_alloc_slots : Delay :: new ( Duration :: new ( 0 , 0 ) ) ,
282
286
}
283
287
} ;
284
288
@@ -699,6 +703,14 @@ impl Stream for Peerset {
699
703
return Poll :: Ready ( Some ( message) ) ;
700
704
}
701
705
706
+ if let Poll :: Ready ( _) = Future :: poll ( Pin :: new ( & mut self . next_periodic_alloc_slots ) , cx) {
707
+ self . next_periodic_alloc_slots = Delay :: new ( Duration :: new ( 1 , 0 ) ) ;
708
+
709
+ for set_index in 0 ..self . data . num_sets ( ) {
710
+ self . alloc_slots ( SetId ( set_index) ) ;
711
+ }
712
+ }
713
+
702
714
let action = match Stream :: poll_next ( Pin :: new ( & mut self . rx ) , cx) {
703
715
Poll :: Pending => return Poll :: Pending ,
704
716
Poll :: Ready ( Some ( event) ) => event,
@@ -907,4 +919,45 @@ mod tests {
907
919
908
920
futures:: executor:: block_on ( fut) ;
909
921
}
922
+
923
+ #[ test]
924
+ fn test_relloc_after_banned ( ) {
925
+ let ( mut peerset, handle) = Peerset :: from_config ( PeersetConfig {
926
+ sets : vec ! [ SetConfig {
927
+ in_peers: 25 ,
928
+ out_peers: 25 ,
929
+ bootnodes: vec![ ] ,
930
+ reserved_nodes: Default :: default ( ) ,
931
+ reserved_only: false ,
932
+ } ] ,
933
+ } ) ;
934
+
935
+ // We ban a node by setting its reputation under the threshold.
936
+ let peer_id = PeerId :: random ( ) ;
937
+ handle. report_peer ( peer_id. clone ( ) , ReputationChange :: new ( BANNED_THRESHOLD - 1 , "" ) ) ;
938
+
939
+ let fut = futures:: future:: poll_fn ( move |cx| {
940
+ // We need one polling for the message to be processed.
941
+ assert_eq ! ( Stream :: poll_next( Pin :: new( & mut peerset) , cx) , Poll :: Pending ) ;
942
+
943
+ // Check that an incoming connection from that node gets refused.
944
+ // This is already tested in other tests, but it is done again here because it doesn't
945
+ // hurt.
946
+ peerset. incoming ( SetId :: from ( 0 ) , peer_id. clone ( ) , IncomingIndex ( 1 ) ) ;
947
+ if let Poll :: Ready ( msg) = Stream :: poll_next ( Pin :: new ( & mut peerset) , cx) {
948
+ assert_eq ! ( msg. unwrap( ) , Message :: Reject ( IncomingIndex ( 1 ) ) ) ;
949
+ } else {
950
+ panic ! ( )
951
+ }
952
+
953
+ // Wait for the peerset to change its mind and actually connect to it.
954
+ while let Poll :: Ready ( msg) = Stream :: poll_next ( Pin :: new ( & mut peerset) , cx) {
955
+ assert_eq ! ( msg. unwrap( ) , Message :: Connect { set_id: SetId :: from( 0 ) , peer_id } ) ;
956
+ }
957
+
958
+ Poll :: Ready ( ( ) )
959
+ } ) ;
960
+
961
+ futures:: executor:: block_on ( fut) ;
962
+ }
910
963
}
0 commit comments