@@ -6,6 +6,7 @@ use crate::zend::Function;
6
6
7
7
use std:: cell:: RefCell ;
8
8
use std:: fs:: File ;
9
+ use std:: future:: Future ;
9
10
use std:: io;
10
11
use std:: os:: fd:: { RawFd , FromRawFd } ;
11
12
use std:: sync:: mpsc:: { Sender , Receiver , channel} ;
@@ -18,6 +19,10 @@ lazy_static! {
18
19
pub static ref RUNTIME : Runtime = Runtime :: new( ) . expect( "Could not allocate runtime" ) ;
19
20
}
20
21
22
+ thread_local ! {
23
+ static EVENTLOOP : RefCell <Option <EventLoop >> = RefCell :: new( None ) ;
24
+ }
25
+
21
26
#[ cfg( any( target_os = "linux" , target_os = "solaris" ) ) ]
22
27
fn sys_pipe ( ) -> io:: Result < ( RawFd , RawFd ) > {
23
28
let mut pipefd = [ 0 ; 2 ] ;
@@ -43,7 +48,50 @@ pub struct EventLoop {
43
48
}
44
49
45
50
impl EventLoop {
46
- pub fn new ( ) -> PhpResult < Self > {
51
+ pub fn init ( ) -> PhpResult < u64 > {
52
+ EVENTLOOP . with_borrow_mut ( |e| {
53
+ Ok (
54
+ match e {
55
+ None => e. insert ( Self :: new ( ) ?) ,
56
+ Some ( ev) => ev
57
+ } . notify_receiver . as_raw_fd ( ) as u64
58
+ )
59
+ } )
60
+ }
61
+
62
+ pub fn suspend_on < T : Send + ' static , F : Future < Output = T > + Send + ' static > ( future : F ) -> T {
63
+ let future = EVENTLOOP . with_borrow_mut ( move |c| {
64
+ let c = c. as_mut ( ) . unwrap ( ) ;
65
+ let idx = c. fibers . len ( ) as u64 ;
66
+ c. fibers . insert_at_index ( idx, call_user_func ! ( c. get_current_suspension) . unwrap ( ) ) . unwrap ( ) ;
67
+
68
+ let sender = c. sender . clone ( ) ;
69
+ let mut notifier = c. notify_sender . try_clone ( ) . unwrap ( ) ;
70
+
71
+ RUNTIME . spawn ( async move {
72
+ let res = future. await ;
73
+ sender. send ( idx) . unwrap ( ) ;
74
+ :: std:: io:: Write :: write_all ( & mut notifier, & [ 0 ] ) . unwrap ( ) ;
75
+ res
76
+ } )
77
+ } ) ;
78
+
79
+ call_user_func ! ( Function :: from_method( "\\ Revolt\\ EventLoop" , "getSuspension" ) ) . unwrap ( ) . try_call_method ( "suspend" , vec ! [ ] ) . unwrap ( ) ;
80
+
81
+ return RUNTIME . block_on ( future) . unwrap ( ) ;
82
+ }
83
+
84
+ pub fn wakeup ( ) -> PhpResult < ( ) > {
85
+ EVENTLOOP . with_borrow_mut ( |c| {
86
+ c. as_mut ( ) . unwrap ( ) . wakeup_internal ( )
87
+ } )
88
+ }
89
+
90
+ pub fn shutdown ( ) {
91
+ EVENTLOOP . set ( None )
92
+ }
93
+
94
+ fn new ( ) -> PhpResult < Self > {
47
95
let ( sender, receiver) = channel ( ) ;
48
96
let ( notify_receiver, notify_sender) =
49
97
sys_pipe ( ) . map_err ( |err| format ! ( "Could not create pipe: {}" , err) ) ?;
@@ -62,11 +110,7 @@ impl EventLoop {
62
110
} )
63
111
}
64
112
65
- pub fn get_event_fd ( & self ) ->u64 {
66
- self . notify_receiver . as_raw_fd ( ) as u64
67
- }
68
-
69
- pub fn wakeup_internal ( & mut self ) -> PhpResult < ( ) > {
113
+ fn wakeup_internal ( & mut self ) -> PhpResult < ( ) > {
70
114
self . notify_receiver . read_exact ( & mut self . dummy ) . unwrap ( ) ;
71
115
72
116
for fiber_id in self . receiver . try_iter ( ) {
@@ -77,26 +121,11 @@ impl EventLoop {
77
121
}
78
122
Ok ( ( ) )
79
123
}
80
-
81
- pub fn prepare_resume ( & mut self ) -> u64 {
82
- let idx = self . fibers . len ( ) as u64 ;
83
- self . fibers . insert_at_index ( idx, call_user_func ! ( self . get_current_suspension) . unwrap ( ) ) . unwrap ( ) ;
84
-
85
- idx
86
- }
87
-
88
- pub fn suspend ( ) {
89
- call_user_func ! ( Function :: from_method( "\\ Revolt\\ EventLoop" , "getSuspension" ) ) . unwrap ( ) . try_call_method ( "suspend" , vec ! [ ] ) . unwrap ( ) ;
90
- }
91
-
92
- pub fn get_sender ( & self ) -> Sender < u64 > {
93
- self . sender . clone ( )
94
- }
95
- pub fn get_notify_sender ( & self ) -> File {
96
- self . notify_sender . try_clone ( ) . unwrap ( )
97
- }
98
124
}
99
125
100
- thread_local ! {
101
- pub static EVENTLOOP : RefCell <Option <EventLoop >> = RefCell :: new( None ) ;
102
- }
126
+ impl Drop for EventLoop {
127
+ fn drop ( & mut self ) {
128
+ unsafe { libc:: close ( self . notify_receiver . as_raw_fd ( ) ) } ;
129
+ unsafe { libc:: close ( self . notify_sender . as_raw_fd ( ) ) } ;
130
+ }
131
+ }
0 commit comments