@@ -33,13 +33,26 @@ use {
33
33
std:: {
34
34
iter:: repeat_with,
35
35
net:: { IpAddr , SocketAddr , UdpSocket } ,
36
+ // CAUTION: be careful not to introduce any awaits while holding an RwLock.
36
37
sync:: {
37
38
atomic:: { AtomicBool , AtomicU64 , Ordering } ,
38
- Arc , Mutex , MutexGuard , RwLock ,
39
+ Arc , RwLock ,
39
40
} ,
40
41
time:: { Duration , Instant } ,
41
42
} ,
42
- tokio:: { task:: JoinHandle , time:: timeout} ,
43
+ tokio:: {
44
+ // CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
45
+ // This is done so that sync code can also access the stake table.
46
+ // Make sure we don't hold a sync lock across an await - including the await to
47
+ // lock an async Mutex. This does not happen now and should not happen as long as we
48
+ // don't hold an async Mutex and sync RwLock at the same time (currently true)
49
+ // but if we do, the scope of the RwLock must always be a subset of the async Mutex
50
+ // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
51
+ // introduce any other awaits while holding the RwLock.
52
+ sync:: { Mutex , MutexGuard } ,
53
+ task:: JoinHandle ,
54
+ time:: timeout,
55
+ } ,
43
56
} ;
44
57
45
58
const WAIT_FOR_STREAM_TIMEOUT : Duration = Duration :: from_millis ( 100 ) ;
@@ -383,7 +396,7 @@ fn handle_and_cache_new_connection(
383
396
}
384
397
}
385
398
386
- fn prune_unstaked_connections_and_add_new_connection (
399
+ async fn prune_unstaked_connections_and_add_new_connection (
387
400
connection : Connection ,
388
401
connection_table : Arc < Mutex < ConnectionTable > > ,
389
402
max_connections : usize ,
@@ -394,7 +407,7 @@ fn prune_unstaked_connections_and_add_new_connection(
394
407
let stats = params. stats . clone ( ) ;
395
408
if max_connections > 0 {
396
409
let connection_table_clone = connection_table. clone ( ) ;
397
- let mut connection_table = connection_table. lock ( ) . unwrap ( ) ;
410
+ let mut connection_table = connection_table. lock ( ) . await ;
398
411
prune_unstaked_connection_table ( & mut connection_table, max_connections, stats) ;
399
412
handle_and_cache_new_connection (
400
413
connection,
@@ -504,7 +517,8 @@ async fn setup_connection(
504
517
505
518
match params. peer_type {
506
519
ConnectionPeerType :: Staked ( stake) => {
507
- let mut connection_table_l = staked_connection_table. lock ( ) . unwrap ( ) ;
520
+ let mut connection_table_l = staked_connection_table. lock ( ) . await ;
521
+
508
522
if connection_table_l. total_size >= max_staked_connections {
509
523
let num_pruned =
510
524
connection_table_l. prune_random ( PRUNE_RANDOM_SAMPLE_SIZE , stake) ;
@@ -535,7 +549,9 @@ async fn setup_connection(
535
549
& params,
536
550
wait_for_chunk_timeout,
537
551
stream_load_ema. clone ( ) ,
538
- ) {
552
+ )
553
+ . await
554
+ {
539
555
stats
540
556
. connection_added_from_staked_peer
541
557
. fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -557,7 +573,9 @@ async fn setup_connection(
557
573
& params,
558
574
wait_for_chunk_timeout,
559
575
stream_load_ema. clone ( ) ,
560
- ) {
576
+ )
577
+ . await
578
+ {
561
579
stats
562
580
. connection_added_from_unstaked_peer
563
581
. fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -800,7 +818,7 @@ async fn handle_connection(
800
818
}
801
819
}
802
820
803
- let removed_connection_count = connection_table. lock ( ) . unwrap ( ) . remove_connection (
821
+ let removed_connection_count = connection_table. lock ( ) . await . remove_connection (
804
822
ConnectionTableKey :: new ( remote_addr. ip ( ) , params. remote_pubkey ) ,
805
823
remote_addr. port ( ) ,
806
824
stable_id,
0 commit comments