@@ -718,46 +718,62 @@ async fn handle_execute_inner(
718
718
let mut reported = false ;
719
719
720
720
let status = loop {
721
- tokio:: select! {
722
- status = & mut task => break status,
723
-
724
- stdin = rx. recv( ) , if stdin_tx. is_some( ) => {
725
- match stdin {
726
- Some ( stdin) => {
727
- stdin_tx
728
- . as_ref( )
729
- . unwrap( /* This is a precondition */ )
730
- . send( stdin)
731
- . await
732
- . drop_error_details( )
733
- . context( StdinSnafu ) ?;
734
- }
735
- None => {
736
- let stdin_tx = stdin_tx. take( ) ;
737
- drop( stdin_tx) ; // Signal closed
738
- }
739
- }
721
+ enum Event {
722
+ Stdin ( Option < String > ) ,
723
+ Stdout ( String ) ,
724
+ Stderr ( String ) ,
725
+ Status ( coordinator:: ExecuteStatus ) ,
726
+ }
727
+ use Event :: * ;
728
+
729
+ let event = tokio:: select! {
730
+ response = & mut task => break response,
731
+
732
+ stdin = rx. recv( ) , if stdin_tx. is_some( ) => Stdin ( stdin) ,
733
+
734
+ Some ( stdout) = stdout_rx. recv( ) => Stdout ( stdout) ,
735
+
736
+ Some ( stderr) = stderr_rx. recv( ) => Stderr ( stderr) ,
737
+
738
+ Some ( status) = status_rx. next( ) => Status ( status)
739
+ } ;
740
+
741
+ match event {
742
+ Stdin ( Some ( stdin) ) => {
743
+ stdin_tx
744
+ . as_ref ( )
745
+ . unwrap ( /* This is a precondition */ )
746
+ . send ( stdin)
747
+ . await
748
+ . drop_error_details ( )
749
+ . context ( StdinSnafu ) ?;
750
+ }
751
+ Stdin ( None ) => {
752
+ let stdin_tx = stdin_tx. take ( ) ;
753
+ drop ( stdin_tx) ; // Signal closed
740
754
}
741
755
742
- Some ( stdout) = stdout_rx . recv ( ) => {
756
+ Stdout ( stdout) => {
743
757
let sent = send_stdout ( stdout) . await ;
744
758
abandon_if_closed ! ( sent) ;
745
- } ,
759
+ }
746
760
747
- Some ( stderr) = stderr_rx . recv ( ) => {
761
+ Stderr ( stderr) => {
748
762
let sent = send_stderr ( stderr) . await ;
749
763
abandon_if_closed ! ( sent) ;
750
- } ,
764
+ }
751
765
752
- Some ( status) = status_rx . next ( ) => {
766
+ Status ( status) => {
753
767
if !reported && status. total_time_secs > 60.0 {
754
768
error ! ( "Request consumed more than 60s of CPU time: {req:?}" ) ;
755
769
reported = true ;
756
770
}
757
771
758
772
let payload = status. into ( ) ;
759
773
let meta = meta. clone ( ) ;
760
- let sent = tx. send( Ok ( MessageResponse :: ExecuteStatus { payload, meta } ) ) . await ;
774
+ let sent = tx
775
+ . send ( Ok ( MessageResponse :: ExecuteStatus { payload, meta } ) )
776
+ . await ;
761
777
abandon_if_closed ! ( sent) ;
762
778
}
763
779
}
0 commit comments