@@ -16,7 +16,6 @@ use std::{
16
16
collections:: BTreeMap ,
17
17
convert:: TryFrom ,
18
18
mem,
19
- ops:: ControlFlow ,
20
19
pin:: pin,
21
20
sync:: {
22
21
atomic:: { AtomicU64 , Ordering } ,
@@ -477,16 +476,16 @@ async fn handle_core(
477
476
. collect ( ) ;
478
477
}
479
478
480
- IdleTimeout => {
481
- if handle_idle ( & mut manager , & tx ) . await . is_break ( ) {
482
- break ;
479
+ IdleTimeout | IdleRequest => {
480
+ if let IdleRequest = event {
481
+ info ! ( "Container requested to idle" ) ;
483
482
}
484
- }
485
483
486
- IdleRequest => {
487
- info ! ( "Container requested to idle" ) ;
484
+ let idled = manager . idle ( ) . await . context ( StreamingCoordinatorIdleSnafu ) ;
485
+ let Err ( error ) = idled else { continue } ;
488
486
489
- if handle_idle ( & mut manager, & tx) . await . is_break ( ) {
487
+ if tx. send ( Err ( ( error, None ) ) ) . await . is_err ( ) {
488
+ // We can't send a response
490
489
break ;
491
490
}
492
491
}
@@ -539,21 +538,6 @@ fn response_to_message(response: MessageResponse) -> Message {
539
538
Message :: Text ( resp. into ( ) )
540
539
}
541
540
542
- async fn handle_idle ( manager : & mut CoordinatorManager , tx : & ResponseTx ) -> ControlFlow < ( ) > {
543
- let idled = manager. idle ( ) . await . context ( StreamingCoordinatorIdleSnafu ) ;
544
-
545
- let Err ( error) = idled else {
546
- return ControlFlow :: Continue ( ( ) ) ;
547
- } ;
548
-
549
- if tx. send ( Err ( ( error, None ) ) ) . await . is_err ( ) {
550
- // We can't send a response
551
- return ControlFlow :: Break ( ( ) ) ;
552
- }
553
-
554
- ControlFlow :: Continue ( ( ) )
555
- }
556
-
557
541
type ActiveExecutionInfo = ( DropGuard , Option < mpsc:: Sender < String > > ) ;
558
542
559
543
async fn handle_msg (
0 commit comments