@@ -8,14 +8,15 @@ extern crate std;
8
8
use core:: sync:: atomic:: { AtomicBool , Ordering } ;
9
9
use std:: boxed:: Box ;
10
10
use std:: collections:: HashMap ;
11
+ use std:: ffi:: c_void;
11
12
use std:: fmt:: { self , Debug , Display } ;
12
13
use std:: future:: Future ;
13
14
use std:: mem;
14
15
use std:: pin:: Pin ;
15
16
use std:: ptr;
16
17
use std:: string:: String ;
17
18
use std:: sync:: Arc ;
18
- use std:: task:: { Context , Poll , Wake , Waker } ;
19
+ use std:: task:: { Context , Poll , Wake } ;
19
20
use std:: vec:: Vec ;
20
21
21
22
use futures:: channel:: oneshot;
@@ -36,6 +37,7 @@ macro_rules! rtdebug {
36
37
}
37
38
38
39
mod abi_buffer;
40
+ mod cabi;
39
41
mod future_support;
40
42
mod stream_support;
41
43
mod waitable;
@@ -60,21 +62,32 @@ struct FutureState {
60
62
tasks : Option < FuturesUnordered < BoxFuture > > ,
61
63
/// The waitable set containing waitables created by this task, if any.
62
64
waitable_set : Option < u32 > ,
63
- /// A map of waitables to the corresponding waker and completion code.
64
- ///
65
- /// This is primarily filled in and managed by `WaitableOperation<S>`. The
66
- /// waker here comes straight from `std::task::Context` and the pointer is
67
- /// otherwise stored within the `WaitableOperation<S>` The raw pointer here
68
- /// has a disconnected lifetime with each future but the management of the
69
- /// internal states with respect to drop should always ensure that this is
70
- /// only ever pointing to active waitable operations.
71
- ///
72
- /// When a waitable notification is received the corresponding entry in this
73
- /// map is removed, the status code is filled in, and the waker is notified.
74
- wakers : HashMap < u32 , ( Waker , * mut Option < u32 > ) > ,
65
+
66
+ /// State of all waitables in `waitable_set`, and the ptr/callback they're
67
+ /// associated with.
68
+ waitables : HashMap < u32 , ( * mut c_void , unsafe extern "C" fn ( * mut c_void , u32 ) ) > ,
69
+
70
+ /// Raw structure used to pass to `cabi::wasip3_task_set`
71
+ wasip3_task : cabi:: wasip3_task ,
75
72
}
76
73
77
74
impl FutureState {
75
+ fn new ( future : BoxFuture ) -> FutureState {
76
+ FutureState {
77
+ todo : 0 ,
78
+ tasks : Some ( [ future] . into_iter ( ) . collect ( ) ) ,
79
+ waitable_set : None ,
80
+ waitables : HashMap :: new ( ) ,
81
+ wasip3_task : cabi:: wasip3_task {
82
+ // This pointer is filled in before calling `wasip3_task_set`.
83
+ ptr : ptr:: null_mut ( ) ,
84
+ version : cabi:: WASIP3_TASK_V1 ,
85
+ waitable_register,
86
+ waitable_unregister,
87
+ } ,
88
+ }
89
+ }
90
+
78
91
fn get_or_create_waitable_set ( & mut self ) -> u32 {
79
92
* self . waitable_set . get_or_insert_with ( waitable_set_new)
80
93
}
@@ -88,7 +101,32 @@ impl FutureState {
88
101
}
89
102
90
103
fn remaining_work ( & self ) -> bool {
91
- self . todo > 0 || !self . wakers . is_empty ( )
104
+ self . todo > 0 || !self . waitables . is_empty ( )
105
+ }
106
+ }
107
+
108
+ unsafe extern "C" fn waitable_register (
109
+ ptr : * mut c_void ,
110
+ waitable : u32 ,
111
+ callback : unsafe extern "C" fn ( * mut c_void , u32 ) ,
112
+ callback_ptr : * mut c_void ,
113
+ ) -> * mut c_void {
114
+ let ptr = ptr. cast :: < FutureState > ( ) ;
115
+ assert ! ( !ptr. is_null( ) ) ;
116
+ ( * ptr) . add_waitable ( waitable) ;
117
+ match ( * ptr) . waitables . insert ( waitable, ( callback_ptr, callback) ) {
118
+ Some ( ( prev, _) ) => prev,
119
+ None => ptr:: null_mut ( ) ,
120
+ }
121
+ }
122
+
123
+ unsafe extern "C" fn waitable_unregister ( ptr : * mut c_void , waitable : u32 ) -> * mut c_void {
124
+ let ptr = ptr. cast :: < FutureState > ( ) ;
125
+ assert ! ( !ptr. is_null( ) ) ;
126
+ ( * ptr) . remove_waitable ( waitable) ;
127
+ match ( * ptr) . waitables . remove ( & waitable) {
128
+ Some ( ( prev, _) ) => prev,
129
+ None => ptr:: null_mut ( ) ,
92
130
}
93
131
}
94
132
@@ -145,6 +183,22 @@ unsafe fn poll(state: *mut FutureState) -> Poll<()> {
145
183
}
146
184
}
147
185
186
+ // Finish our `wasip3_task` by initializing its self-referential pointer,
187
+ // and then register it for the duration of this function with
188
+ // `wasip3_task_set`. The previous value of `wasip3_task_set` will get
189
+ // restored when this function returns.
190
+ struct ResetTask ( * mut cabi:: wasip3_task ) ;
191
+ impl Drop for ResetTask {
192
+ fn drop ( & mut self ) {
193
+ unsafe {
194
+ cabi:: wasip3_task_set ( self . 0 ) ;
195
+ }
196
+ }
197
+ }
198
+ ( * state) . wasip3_task . ptr = state. cast ( ) ;
199
+ let prev = cabi:: wasip3_task_set ( & mut ( * state) . wasip3_task ) ;
200
+ let _reset = ResetTask ( prev) ;
201
+
148
202
loop {
149
203
if let Some ( futures) = ( * state) . tasks . as_mut ( ) {
150
204
let old = CURRENT ;
@@ -191,16 +245,9 @@ pub fn first_poll<T: 'static>(
191
245
future : impl Future < Output = T > + ' static ,
192
246
fun : impl FnOnce ( & T ) + ' static ,
193
247
) -> i32 {
194
- let state = Box :: into_raw ( Box :: new ( FutureState {
195
- todo : 0 ,
196
- tasks : Some (
197
- [ Box :: pin ( future. map ( |v| fun ( & v) ) ) as BoxFuture ]
198
- . into_iter ( )
199
- . collect ( ) ,
200
- ) ,
201
- waitable_set : None ,
202
- wakers : HashMap :: new ( ) ,
203
- } ) ) ;
248
+ let state = Box :: into_raw ( Box :: new ( FutureState :: new ( Box :: pin (
249
+ future. map ( |v| fun ( & v) ) ,
250
+ ) ) ) ) ;
204
251
let done = unsafe { poll ( state) . is_ready ( ) } ;
205
252
unsafe { callback_code ( state, done) }
206
253
}
@@ -339,9 +386,8 @@ unsafe fn callback_with_state(
339
386
"EVENT_{{STREAM,FUTURE}}_{{READ,WRITE}}({event0:#x}, {event1:#x}, {event2:#x})"
340
387
) ;
341
388
( * state) . remove_waitable ( event1 as u32 ) ;
342
- let ( waker, code) = ( * state) . wakers . remove ( & ( event1 as u32 ) ) . unwrap ( ) ;
343
- * code = Some ( event2 as u32 ) ;
344
- waker. wake ( ) ;
389
+ let ( ptr, callback) = ( * state) . waitables . remove ( & ( event1 as u32 ) ) . unwrap ( ) ;
390
+ callback ( ptr, event2 as u32 ) ;
345
391
346
392
let done = poll ( state) . is_ready ( ) ;
347
393
callback_code ( state, done)
@@ -469,16 +515,7 @@ pub fn spawn(future: impl Future<Output = ()> + 'static) {
469
515
// TODO: refactor so `'static` bounds aren't necessary
470
516
pub fn block_on < T : ' static > ( future : impl Future < Output = T > + ' static ) -> T {
471
517
let ( tx, mut rx) = oneshot:: channel ( ) ;
472
- let state = & mut FutureState {
473
- todo : 0 ,
474
- tasks : Some (
475
- [ Box :: pin ( future. map ( move |v| drop ( tx. send ( v) ) ) ) as BoxFuture ]
476
- . into_iter ( )
477
- . collect ( ) ,
478
- ) ,
479
- waitable_set : None ,
480
- wakers : HashMap :: new ( ) ,
481
- } ;
518
+ let state = & mut FutureState :: new ( Box :: pin ( future. map ( move |v| drop ( tx. send ( v) ) ) ) as BoxFuture ) ;
482
519
loop {
483
520
match unsafe { poll ( state) } {
484
521
Poll :: Ready ( ( ) ) => break rx. try_recv ( ) . unwrap ( ) . unwrap ( ) ,
0 commit comments