@@ -30,13 +30,26 @@ use {
30
30
std:: {
31
31
iter:: repeat_with,
32
32
net:: { IpAddr , SocketAddr , UdpSocket } ,
33
+ // CAUTION: be careful not to introduce any awaits while holding an RwLock.
33
34
sync:: {
34
35
atomic:: { AtomicBool , AtomicU64 , Ordering } ,
35
- Arc , Mutex , MutexGuard , RwLock ,
36
+ Arc , RwLock ,
36
37
} ,
37
38
time:: { Duration , Instant } ,
38
39
} ,
39
- tokio:: { task:: JoinHandle , time:: timeout} ,
40
+ tokio:: {
41
+ // CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
42
+ // This is done so that sync code can also access the stake table.
43
+ // Make sure we don't hold a sync lock across an await - including the await to
44
+ // lock an async Mutex. This does not happen now and should not happen as long as we
45
+ // don't hold an async Mutex and sync RwLock at the same time (currently true)
46
+ // but if we do, the scope of the RwLock must always be a subset of the async Mutex
47
+ // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
48
+ // introduce any other awaits while holding the RwLock.
49
+ sync:: { Mutex , MutexGuard } ,
50
+ task:: JoinHandle ,
51
+ time:: timeout,
52
+ } ,
40
53
} ;
41
54
42
55
/// Limit to 500K PPS
@@ -383,7 +396,7 @@ fn handle_and_cache_new_connection(
383
396
}
384
397
}
385
398
386
- fn prune_unstaked_connections_and_add_new_connection (
399
+ async fn prune_unstaked_connections_and_add_new_connection (
387
400
connection : Connection ,
388
401
connection_table : Arc < Mutex < ConnectionTable > > ,
389
402
max_connections : usize ,
@@ -393,7 +406,7 @@ fn prune_unstaked_connections_and_add_new_connection(
393
406
let stats = params. stats . clone ( ) ;
394
407
if max_connections > 0 {
395
408
let connection_table_clone = connection_table. clone ( ) ;
396
- let mut connection_table = connection_table. lock ( ) . unwrap ( ) ;
409
+ let mut connection_table = connection_table. lock ( ) . await ;
397
410
prune_unstaked_connection_table ( & mut connection_table, max_connections, stats) ;
398
411
handle_and_cache_new_connection (
399
412
connection,
@@ -496,7 +509,8 @@ async fn setup_connection(
496
509
) ;
497
510
498
511
if params. stake > 0 {
499
- let mut connection_table_l = staked_connection_table. lock ( ) . unwrap ( ) ;
512
+ let mut connection_table_l = staked_connection_table. lock ( ) . await ;
513
+
500
514
if connection_table_l. total_size >= max_staked_connections {
501
515
let num_pruned =
502
516
connection_table_l. prune_random ( PRUNE_RANDOM_SAMPLE_SIZE , params. stake ) ;
@@ -525,7 +539,9 @@ async fn setup_connection(
525
539
max_unstaked_connections,
526
540
& params,
527
541
wait_for_chunk_timeout,
528
- ) {
542
+ )
543
+ . await
544
+ {
529
545
stats
530
546
. connection_added_from_staked_peer
531
547
. fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -544,7 +560,9 @@ async fn setup_connection(
544
560
max_unstaked_connections,
545
561
& params,
546
562
wait_for_chunk_timeout,
547
- ) {
563
+ )
564
+ . await
565
+ {
548
566
stats
549
567
. connection_added_from_unstaked_peer
550
568
. fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -807,7 +825,7 @@ async fn handle_connection(
807
825
}
808
826
}
809
827
810
- let removed_connection_count = connection_table. lock ( ) . unwrap ( ) . remove_connection (
828
+ let removed_connection_count = connection_table. lock ( ) . await . remove_connection (
811
829
ConnectionTableKey :: new ( remote_addr. ip ( ) , params. remote_pubkey ) ,
812
830
remote_addr. port ( ) ,
813
831
stable_id,
0 commit comments