@@ -43,7 +43,7 @@ use n0_future::{
43
43
use snafu:: { Backtrace , Snafu } ;
44
44
use tokio:: sync:: { mpsc, oneshot, Mutex } ;
45
45
use tokio_util:: sync:: CancellationToken ;
46
- use tracing:: { error, info_span, trace, warn, Instrument } ;
46
+ use tracing:: { debug , error, info_span, trace, warn, Instrument } ;
47
47
48
48
use crate :: {
49
49
endpoint:: { Connecting , Connection , RemoteNodeIdError } ,
@@ -310,10 +310,16 @@ impl<P: ProtocolHandler> DynProtocolHandler for P {
310
310
}
311
311
}
312
312
313
- async fn shutdown_timeout ( handler : Arc < dyn DynProtocolHandler > ) -> Option < ( ) > {
314
- time:: timeout ( SHUTDOWN_TIMEOUT , handler. shutdown ( ) )
315
- . await
316
- . ok ( )
313
+ async fn shutdown_timeout ( alpn : Vec < u8 > , handler : Arc < dyn DynProtocolHandler > ) -> Option < ( ) > {
314
+ if let Err ( _elapsed) = time:: timeout ( SHUTDOWN_TIMEOUT , handler. shutdown ( ) ) . await {
315
+ debug ! (
316
+ alpn = String :: from_utf8_lossy( & alpn) . to_string( ) ,
317
+ "Protocol handler exceeded the shutdown timeout and was aborted"
318
+ ) ;
319
+ None
320
+ } else {
321
+ Some ( ( ) )
322
+ }
317
323
}
318
324
319
325
/// A typed map of protocol handlers, mapping them from ALPNs.
@@ -348,15 +354,14 @@ impl ProtocolMap {
348
354
///
349
355
/// Calls and awaits [`ProtocolHandler::shutdown`] for all registered handlers concurrently.
350
356
pub ( crate ) async fn shutdown ( & self ) {
351
- let handlers: Vec < _ > = {
352
- let inner = self . 0 . read ( ) . expect ( "poisoned" ) ;
353
- inner
354
- . values ( )
355
- . cloned ( )
356
- . map ( |handler| shutdown_timeout ( handler) )
357
- . collect ( )
358
- } ;
359
- join_all ( handlers) . await ;
357
+ let mut futures = Vec :: new ( ) ;
358
+ {
359
+ let mut inner = self . 0 . write ( ) . expect ( "poisoned" ) ;
360
+ while let Some ( ( alpn, handler) ) = inner. pop_first ( ) {
361
+ futures. push ( shutdown_timeout ( alpn, handler) ) ;
362
+ }
363
+ }
364
+ join_all ( futures) . await ;
360
365
}
361
366
}
362
367
@@ -499,8 +504,8 @@ impl RouterBuilder {
499
504
Some ( msg) = rx. recv( ) => {
500
505
match msg {
501
506
ToRouterTask :: Accept { alpn, handler, reply } => {
502
- let outcome = if let Some ( previous) = protocols. insert( alpn, handler) {
503
- join_set. spawn( shutdown_timeout( previous) ) ;
507
+ let outcome = if let Some ( previous) = protocols. insert( alpn. clone ( ) , handler) {
508
+ join_set. spawn( shutdown_timeout( alpn , previous) ) ;
504
509
AddProtocolOutcome :: Replaced
505
510
} else {
506
511
AddProtocolOutcome :: Inserted
@@ -510,7 +515,7 @@ impl RouterBuilder {
510
515
}
511
516
ToRouterTask :: StopAccepting { alpn, reply } => {
512
517
if let Some ( handler) = protocols. remove( & alpn) {
513
- join_set. spawn( shutdown_timeout( handler) ) ;
518
+ join_set. spawn( shutdown_timeout( alpn , handler) ) ;
514
519
endpoint. set_alpns( protocols. alpns( ) ) ;
515
520
reply. send( Ok ( ( ) ) ) . ok( ) ;
516
521
} else {
@@ -565,7 +570,7 @@ impl RouterBuilder {
565
570
endpoint. close ( ) . await ;
566
571
// Finally, we abort the remaining accept tasks. This should be a noop because we already cancelled
567
572
// the futures above.
568
- tracing :: debug!( "Shutting down remaining tasks" ) ;
573
+ debug ! ( "Shutting down remaining tasks" ) ;
569
574
join_set. abort_all ( ) ;
570
575
while let Some ( res) = join_set. join_next ( ) . await {
571
576
match res {
0 commit comments