@@ -53,7 +53,6 @@ use crossbeam::channel::bounded;
53
53
use crossbeam:: channel:: unbounded;
54
54
use crossbeam:: channel:: Receiver ;
55
55
use crossbeam:: channel:: Sender ;
56
- use crossbeam:: select;
57
56
use harp:: command:: r_command;
58
57
use harp:: command:: r_home_setup;
59
58
use harp:: environment:: r_ns_env;
@@ -754,6 +753,21 @@ impl RMain {
754
753
}
755
754
}
756
755
756
+ let mut select = crossbeam:: channel:: Select :: new ( ) ;
757
+
758
+ // Cloning is necessary to avoid a double mutable borrow error
759
+ let r_request_rx = self . r_request_rx . clone ( ) ;
760
+ let stdin_reply_rx = self . stdin_reply_rx . clone ( ) ;
761
+ let kernel_request_rx = self . kernel_request_rx . clone ( ) ;
762
+ let tasks_interrupt_rx = self . tasks_interrupt_rx . clone ( ) ;
763
+ let tasks_idle_rx = self . tasks_idle_rx . clone ( ) ;
764
+
765
+ let r_request_index = select. recv ( & r_request_rx) ;
766
+ let stdin_reply_index = select. recv ( & stdin_reply_rx) ;
767
+ let kernel_request_index = select. recv ( & kernel_request_rx) ;
768
+ let tasks_interrupt_index = select. recv ( & tasks_interrupt_rx) ;
769
+ let tasks_idle_index = select. recv ( & tasks_idle_rx) ;
770
+
757
771
loop {
758
772
// If an interrupt was signaled and we are in a user
759
773
// request prompt, e.g. `readline()`, we need to propagate
@@ -780,15 +794,29 @@ impl RMain {
780
794
781
795
// First handle execute requests outside of `select!` to ensure they
782
796
// have priority. `select!` chooses at random.
783
- if let Ok ( req) = self . r_request_rx . try_recv ( ) {
797
+ if let Ok ( req) = r_request_rx. try_recv ( ) {
784
798
if let Some ( input) = self . handle_execute_request ( req, & info, buf, buflen) {
785
799
return input;
786
800
}
787
801
}
788
802
789
- select ! {
790
- // Wait for an execution request from the frontend.
791
- recv( self . r_request_rx) -> req => {
803
+ let oper = select. select_timeout ( Duration :: from_millis ( 200 ) ) ;
804
+
805
+ let Ok ( oper) = oper else {
806
+ // We hit a timeout. Process events because we need to
807
+ // pump the event loop while waiting for console input.
808
+ //
809
+ // Alternatively, we could try to figure out the file
810
+ // descriptors that R has open and select() on those for
811
+ // available data?
812
+ unsafe { Self :: process_events ( ) } ;
813
+ continue ;
814
+ } ;
815
+
816
+ match oper. index ( ) {
817
+ // We've got an execute request from the frontend
818
+ i if i == r_request_index => {
819
+ let req = oper. recv ( & r_request_rx) ;
792
820
let Ok ( req) = req else {
793
821
// The channel is disconnected and empty
794
822
return ConsoleResult :: Disconnected ;
@@ -797,35 +825,33 @@ impl RMain {
797
825
if let Some ( input) = self . handle_execute_request ( req, & info, buf, buflen) {
798
826
return input;
799
827
}
800
- }
828
+ } ,
801
829
802
830
// We've got a reply for readline
803
- recv( self . stdin_reply_rx) -> reply => {
804
- return self . handle_input_reply( reply. unwrap( ) , buf, buflen) ;
805
- }
831
+ i if i == stdin_reply_index => {
832
+ let reply = oper. recv ( & stdin_reply_rx) . unwrap ( ) ;
833
+ return self . handle_input_reply ( reply, buf, buflen) ;
834
+ } ,
806
835
807
836
// We've got a kernel request
808
- recv( self . kernel_request_rx) -> req => {
809
- self . handle_kernel_request( req. unwrap( ) , & info) ;
810
- }
837
+ i if i == kernel_request_index => {
838
+ let req = oper. recv ( & kernel_request_rx) . unwrap ( ) ;
839
+ self . handle_kernel_request ( req, & info) ;
840
+ } ,
811
841
812
- // A task woke us up, start next loop tick to yield to it
813
- recv( self . tasks_interrupt_rx) -> task => {
814
- self . handle_task_interrupt( task. unwrap( ) ) ;
815
- }
816
- recv( self . tasks_idle_rx) -> task => {
817
- self . handle_task( task. unwrap( ) ) ;
818
- }
842
+ // An interrupt task woke us up
843
+ i if i == tasks_interrupt_index => {
844
+ let task = oper. recv ( & tasks_interrupt_rx) . unwrap ( ) ;
845
+ self . handle_task_interrupt ( task) ;
846
+ } ,
819
847
820
- // Wait with a timeout. Necessary because we need to
821
- // pump the event loop while waiting for console input.
822
- //
823
- // Alternatively, we could try to figure out the file
824
- // descriptors that R has open and select() on those for
825
- // available data?
826
- default ( Duration :: from_millis( 200 ) ) => {
827
- unsafe { Self :: process_events( ) } ;
828
- }
848
+ // An idle task woke us up
849
+ i if i == tasks_idle_index => {
850
+ let task = oper. recv ( & tasks_idle_rx) . unwrap ( ) ;
851
+ self . handle_task ( task) ;
852
+ } ,
853
+
854
+ i => log:: error!( "Unexpected index in Select: {i}" ) ,
829
855
}
830
856
}
831
857
}
0 commit comments