@@ -33,13 +33,26 @@ use {
33
33
std:: {
34
34
iter:: repeat_with,
35
35
net:: { IpAddr , SocketAddr , UdpSocket } ,
36
+ // CAUTION: be careful not to introduce any awaits while holding an RwLock.
36
37
sync:: {
37
38
atomic:: { AtomicBool , AtomicU64 , Ordering } ,
38
- Arc , Mutex , MutexGuard , RwLock ,
39
+ Arc , RwLock ,
39
40
} ,
40
41
time:: { Duration , Instant } ,
41
42
} ,
42
- tokio:: { task:: JoinHandle , time:: timeout} ,
43
+ tokio:: {
44
+ // CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
45
+ // This is done so that sync code can also access the stake table.
46
+ // Make sure we don't hold a sync lock across an await - including the await to
47
+ // lock an async Mutex. This does not happen now and should not happen as long as we
48
+ // don't hold an async Mutex and sync RwLock at the same time (currently true)
49
+ // but if we do, the scope of the RwLock must always be a subset of the async Mutex
50
+ // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
51
+ // introduce any other awaits while holding the RwLock.
52
+ sync:: { Mutex , MutexGuard } ,
53
+ task:: JoinHandle ,
54
+ time:: timeout,
55
+ } ,
43
56
} ;
44
57
45
58
const WAIT_FOR_STREAM_TIMEOUT : Duration = Duration :: from_millis ( 100 ) ;
@@ -384,7 +397,7 @@ fn handle_and_cache_new_connection(
384
397
}
385
398
}
386
399
387
- fn prune_unstaked_connections_and_add_new_connection (
400
+ async fn prune_unstaked_connections_and_add_new_connection (
388
401
connection : Connection ,
389
402
connection_table : Arc < Mutex < ConnectionTable > > ,
390
403
max_connections : usize ,
@@ -395,7 +408,7 @@ fn prune_unstaked_connections_and_add_new_connection(
395
408
let stats = params. stats . clone ( ) ;
396
409
if max_connections > 0 {
397
410
let connection_table_clone = connection_table. clone ( ) ;
398
- let mut connection_table = connection_table. lock ( ) . unwrap ( ) ;
411
+ let mut connection_table = connection_table. lock ( ) . await ;
399
412
prune_unstaked_connection_table ( & mut connection_table, max_connections, stats) ;
400
413
handle_and_cache_new_connection (
401
414
connection,
@@ -505,7 +518,8 @@ async fn setup_connection(
505
518
506
519
match params. peer_type {
507
520
ConnectionPeerType :: Staked ( stake) => {
508
- let mut connection_table_l = staked_connection_table. lock ( ) . unwrap ( ) ;
521
+ let mut connection_table_l = staked_connection_table. lock ( ) . await ;
522
+
509
523
if connection_table_l. total_size >= max_staked_connections {
510
524
let num_pruned =
511
525
connection_table_l. prune_random ( PRUNE_RANDOM_SAMPLE_SIZE , stake) ;
@@ -536,7 +550,9 @@ async fn setup_connection(
536
550
& params,
537
551
wait_for_chunk_timeout,
538
552
stream_load_ema. clone ( ) ,
539
- ) {
553
+ )
554
+ . await
555
+ {
540
556
stats
541
557
. connection_added_from_staked_peer
542
558
. fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -558,7 +574,9 @@ async fn setup_connection(
558
574
& params,
559
575
wait_for_chunk_timeout,
560
576
stream_load_ema. clone ( ) ,
561
- ) {
577
+ )
578
+ . await
579
+ {
562
580
stats
563
581
. connection_added_from_unstaked_peer
564
582
. fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -801,7 +819,7 @@ async fn handle_connection(
801
819
}
802
820
}
803
821
804
- let removed_connection_count = connection_table. lock ( ) . unwrap ( ) . remove_connection (
822
+ let removed_connection_count = connection_table. lock ( ) . await . remove_connection (
805
823
ConnectionTableKey :: new ( remote_addr. ip ( ) , params. remote_pubkey ) ,
806
824
remote_addr. port ( ) ,
807
825
stable_id,
0 commit comments