3
3
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>
4
4
5
5
use futures;
6
- use futures:: executor :: { Executor , SpawnError } ;
6
+ use futures:: future :: { FutureObj , LocalFutureObj } ;
7
7
use futures:: prelude:: * ;
8
- use futures:: task:: { LocalMap , UnsafeWake , Waker } ;
9
- use futures:: { Async , Future , Never } ;
8
+ use futures:: task:: {
9
+ Context , LocalSpawn , Poll , RawWaker , RawWakerVTable , Spawn , SpawnError , Waker ,
10
+ } ;
10
11
use get_thread_id;
11
12
use glib_sys;
12
13
use std:: mem;
13
14
use std:: ptr;
14
15
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
15
16
use translate:: { from_glib_full, from_glib_none, mut_override, ToGlib } ;
17
+
16
18
use MainContext ;
17
19
use MainLoop ;
18
20
use Priority ;
@@ -28,27 +30,48 @@ const DONE: usize = 3;
28
30
#[ repr( C ) ]
29
31
struct TaskSource {
30
32
source : glib_sys:: GSource ,
31
- future : Option < ( Box < Future < Item = ( ) , Error = Never > > , Box < LocalMap > ) > ,
33
+ future : Option < FutureObj < ' static , ( ) > > ,
32
34
thread : Option < usize > ,
33
35
state : AtomicUsize ,
34
36
}
35
37
36
- unsafe impl UnsafeWake for TaskSource {
37
- unsafe fn clone_raw ( & self ) -> Waker {
38
- Waker :: new ( glib_sys:: g_source_ref ( mut_override ( & self . source ) ) as * const TaskSource )
39
- }
38
+ static TASK_SOURCE_WAKER_VTABLE : RawWakerVTable = RawWakerVTable :: new (
39
+ TaskSource :: clone_raw,
40
+ TaskSource :: wake_raw,
41
+ TaskSource :: wake_by_ref_raw,
42
+ TaskSource :: drop_raw,
43
+ ) ;
40
44
41
- unsafe fn drop_raw ( & self ) {
42
- glib_sys:: g_source_unref ( mut_override ( & self . source ) ) ;
45
+ impl TaskSource {
46
+ unsafe fn clone_raw ( waker : * const ( ) ) -> RawWaker {
47
+ let waker = & * ( waker as * const TaskSource ) ;
48
+ glib_sys:: g_source_ref ( mut_override ( & waker. source ) ) ;
49
+ RawWaker :: new ( waker as * const Self as * const ( ) , & TASK_SOURCE_WAKER_VTABLE )
50
+ }
51
+ unsafe fn wake_raw ( waker : * const ( ) ) {
52
+ Self :: wake_by_ref_raw ( waker) ;
53
+ Self :: drop_raw ( waker) ;
43
54
}
44
55
45
- unsafe fn wake ( & self ) {
46
- if self . state
47
- . compare_and_swap ( NOT_READY , READY , Ordering :: SeqCst ) == NOT_READY
56
+ unsafe fn wake_by_ref_raw ( waker : * const ( ) ) {
57
+ let waker = & * ( waker as * const TaskSource ) ;
58
+ if waker
59
+ . state
60
+ . compare_and_swap ( NOT_READY , READY , Ordering :: SeqCst )
61
+ == NOT_READY
48
62
{
49
- glib_sys:: g_source_set_ready_time ( mut_override ( & self . source ) , 0 ) ;
63
+ glib_sys:: g_source_set_ready_time ( mut_override ( & waker . source ) , 0 ) ;
50
64
}
51
65
}
66
+
67
+ unsafe fn drop_raw ( waker : * const ( ) ) {
68
+ let waker = & * ( waker as * const TaskSource ) ;
69
+ glib_sys:: g_source_unref ( mut_override ( & waker. source ) ) ;
70
+ }
71
+
72
+ fn as_waker ( & self ) -> Waker {
73
+ unsafe { Waker :: from_raw ( Self :: clone_raw ( self as * const Self as * const ( ) ) ) }
74
+ }
52
75
}
53
76
54
77
unsafe extern "C" fn prepare (
@@ -66,7 +89,7 @@ unsafe extern "C" fn prepare(
66
89
// XXX: This is not actually correct, we should not dispatch the
67
90
// GSource here already but we need to know its current status so
68
91
// that if it is not ready yet something can register to the waker
69
- if let Async :: Ready ( ( ) ) = source. poll ( ) {
92
+ if let Poll :: Ready ( ( ) ) = source. poll ( ) {
70
93
source. state . store ( DONE , Ordering :: SeqCst ) ;
71
94
cur = DONE ;
72
95
} else {
@@ -105,7 +128,7 @@ unsafe extern "C" fn dispatch(
105
128
. state
106
129
. compare_and_swap ( READY , NOT_READY , Ordering :: SeqCst ) ;
107
130
if cur == READY {
108
- if let Async :: Ready ( ( ) ) = source. poll ( ) {
131
+ if let Poll :: Ready ( ( ) ) = source. poll ( ) {
109
132
source. state . store ( DONE , Ordering :: SeqCst ) ;
110
133
cur = DONE ;
111
134
} else {
@@ -134,39 +157,31 @@ static SOURCE_FUNCS: glib_sys::GSourceFuncs = glib_sys::GSourceFuncs {
134
157
closure_marshal : None ,
135
158
} ;
136
159
160
+ unsafe impl Send for TaskSource { }
161
+ unsafe impl Sync for TaskSource { }
162
+
137
163
impl TaskSource {
138
164
#[ allow( clippy:: new_ret_no_self) ]
139
- fn new (
140
- priority : Priority ,
141
- future : Box < Future < Item = ( ) , Error = Never > + ' static + Send > ,
142
- ) -> Source {
143
- unsafe { Self :: new_unsafe ( priority, None , future) }
144
- }
145
-
146
- // NOTE: This does not have the Send bound and requires to be called from the same
147
- // thread where the main context is running
148
- unsafe fn new_unsafe (
149
- priority : Priority ,
150
- thread : Option < usize > ,
151
- future : Box < Future < Item = ( ) , Error = Never > + ' static > ,
152
- ) -> Source {
153
- let source = glib_sys:: g_source_new (
154
- mut_override ( & SOURCE_FUNCS ) ,
155
- mem:: size_of :: < TaskSource > ( ) as u32 ,
156
- ) ;
157
- {
158
- let source = & mut * ( source as * mut TaskSource ) ;
159
- ptr:: write ( & mut source. future , Some ( ( future, Box :: new ( LocalMap :: new ( ) ) ) ) ) ;
160
- source. thread = thread;
161
- source. state = AtomicUsize :: new ( INIT ) ;
162
- }
165
+ fn new ( priority : Priority , thread : Option < usize > , future : FutureObj < ' static , ( ) > ) -> Source {
166
+ unsafe {
167
+ let source = glib_sys:: g_source_new (
168
+ mut_override ( & SOURCE_FUNCS ) ,
169
+ mem:: size_of :: < TaskSource > ( ) as u32 ,
170
+ ) ;
171
+ {
172
+ let source = & mut * ( source as * mut TaskSource ) ;
173
+ ptr:: write ( & mut source. future , Some ( future) ) ;
174
+ source. thread = thread;
175
+ source. state = AtomicUsize :: new ( INIT ) ;
176
+ }
163
177
164
- glib_sys:: g_source_set_priority ( source, priority. to_glib ( ) ) ;
178
+ glib_sys:: g_source_set_priority ( source, priority. to_glib ( ) ) ;
165
179
166
- from_glib_full ( source)
180
+ from_glib_full ( source)
181
+ }
167
182
}
168
183
169
- fn poll ( & mut self ) -> Async < ( ) > {
184
+ fn poll ( & mut self ) -> Poll < ( ) > {
170
185
// Make sure that the first time we're polled that the current thread is remembered
171
186
// and from there one we ensure that we're always polled from exactly the same thread.
172
187
//
@@ -178,32 +193,34 @@ impl TaskSource {
178
193
* thread = Some ( get_thread_id ( ) ) ;
179
194
}
180
195
& mut Some ( thread_id) => {
181
- assert_eq ! ( get_thread_id( ) , thread_id,
182
- "Task polled on a different thread than before" ) ;
196
+ assert_eq ! (
197
+ get_thread_id( ) ,
198
+ thread_id,
199
+ "Task polled on a different thread than before"
200
+ ) ;
183
201
}
184
202
}
185
203
186
- let waker = unsafe { self . clone_raw ( ) } ;
204
+ let waker = self . as_waker ( ) ;
187
205
let source = & self . source as * const _ ;
188
206
if let Some ( ref mut future) = self . future {
189
- let ( ref mut future, ref mut local_map) = * future;
190
-
191
- let mut executor: MainContext = unsafe {
192
- from_glib_none ( glib_sys:: g_source_get_context ( mut_override ( source) ) )
193
- } ;
207
+ let mut executor: MainContext =
208
+ unsafe { from_glib_none ( glib_sys:: g_source_get_context ( mut_override ( source) ) ) } ;
194
209
195
- assert ! ( executor. is_owner( ) , "Polling futures only allowed if the thread is owning the MainContext" ) ;
210
+ assert ! (
211
+ executor. is_owner( ) ,
212
+ "Polling futures only allowed if the thread is owning the MainContext"
213
+ ) ;
196
214
197
215
// Clone that we store in the task local data so that
198
216
// it can be retrieved as needed
199
217
executor. push_thread_default ( ) ;
200
218
201
219
let res = {
202
220
let enter = futures:: executor:: enter ( ) . unwrap ( ) ;
203
- let mut context =
204
- futures:: task:: Context :: new ( local_map, & waker, & mut executor) ;
221
+ let mut context = Context :: from_waker ( & waker) ;
205
222
206
- let res = future. poll ( & mut context) . unwrap_or ( Async :: Ready ( ( ) ) ) ;
223
+ let res = future. poll_unpin ( & mut context) ;
207
224
208
225
drop ( enter) ;
209
226
@@ -213,7 +230,7 @@ impl TaskSource {
213
230
executor. pop_thread_default ( ) ;
214
231
res
215
232
} else {
216
- Async :: Ready ( ( ) )
233
+ Poll :: Ready ( ( ) )
217
234
}
218
235
}
219
236
}
@@ -223,7 +240,7 @@ impl MainContext {
223
240
///
224
241
/// This can be called from any thread and will execute the future from the thread
225
242
/// where main context is running, e.g. via a `MainLoop`.
226
- pub fn spawn < F : Future < Item = ( ) , Error = Never > + Send + ' static > ( & self , f : F ) {
243
+ pub fn spawn < F : Future < Output = ( ) > + Send + ' static > ( & self , f : F ) {
227
244
self . spawn_with_priority ( :: PRIORITY_DEFAULT , f) ;
228
245
}
229
246
@@ -234,17 +251,21 @@ impl MainContext {
234
251
/// This can be called only from the thread where the main context is running, e.g.
235
252
/// from any other `Future` that is executed on this main context, or after calling
236
253
/// `push_thread_default` or `acquire` on the main context.
237
- pub fn spawn_local < F : Future < Item = ( ) , Error = Never > + ' static > ( & self , f : F ) {
254
+ pub fn spawn_local < F : Future < Output = ( ) > + ' static > ( & self , f : F ) {
238
255
self . spawn_local_with_priority ( :: PRIORITY_DEFAULT , f) ;
239
256
}
240
257
241
258
/// Spawn a new infallible `Future` on the main context, with a non-default priority.
242
259
///
243
260
/// This can be called from any thread and will execute the future from the thread
244
261
/// where main context is running, e.g. via a `MainLoop`.
245
- pub fn spawn_with_priority < F : Future < Item = ( ) , Error = Never > + Send + ' static > ( & self , priority : Priority , f : F ) {
246
- let f = Box :: new ( f) ;
247
- let source = TaskSource :: new ( priority, f) ;
262
+ pub fn spawn_with_priority < F : Future < Output = ( ) > + Send + ' static > (
263
+ & self ,
264
+ priority : Priority ,
265
+ f : F ,
266
+ ) {
267
+ let f = FutureObj :: new ( Box :: new ( f) ) ;
268
+ let source = TaskSource :: new ( priority, None , f) ;
248
269
source. attach ( Some ( & * self ) ) ;
249
270
}
250
271
@@ -255,13 +276,23 @@ impl MainContext {
255
276
/// This can be called only from the thread where the main context is running, e.g.
256
277
/// from any other `Future` that is executed on this main context, or after calling
257
278
/// `push_thread_default` or `acquire` on the main context.
258
- pub fn spawn_local_with_priority < F : Future < Item = ( ) , Error = Never > + ' static > ( & self , priority : Priority , f : F ) {
259
- assert ! ( self . is_owner( ) , "Spawning local futures only allowed on the thread owning the MainContext" ) ;
260
- let f = Box :: new ( f) ;
279
+ pub fn spawn_local_with_priority < F : Future < Output = ( ) > + ' static > (
280
+ & self ,
281
+ priority : Priority ,
282
+ f : F ,
283
+ ) {
284
+ assert ! (
285
+ self . is_owner( ) ,
286
+ "Spawning local futures only allowed on the thread owning the MainContext"
287
+ ) ;
261
288
unsafe {
262
- // Ensure that this task is never polled on another thread
263
- // than this one where it was spawned now.
264
- let source = TaskSource :: new_unsafe ( priority, Some ( get_thread_id ( ) ) , f) ;
289
+ let f = LocalFutureObj :: new ( Box :: new ( f) ) ;
290
+ // We ensure here that we only ever run the future on this very task
291
+ // and that the futures executor is running on this task. Otherwise
292
+ // we will panic later.
293
+ // As such we can add the Send impl here safely
294
+ let f = f. into_future_obj ( ) ;
295
+ let source = TaskSource :: new ( priority, Some ( get_thread_id ( ) ) , f) ;
265
296
source. attach ( Some ( & * self ) ) ;
266
297
}
267
298
}
@@ -274,7 +305,7 @@ impl MainContext {
274
305
/// This must only be called if no `MainLoop` or anything else is running on this specific main
275
306
/// context.
276
307
#[ allow( clippy:: transmute_ptr_to_ptr) ]
277
- pub fn block_on < F : Future > ( & self , f : F ) -> Result < F :: Item , F :: Error > {
308
+ pub fn block_on < F : Future > ( & self , f : F ) -> F :: Output {
278
309
let mut res = None ;
279
310
let l = MainLoop :: new ( Some ( & * self ) , false ) ;
280
311
let l_clone = l. clone ( ) ;
@@ -283,17 +314,17 @@ impl MainContext {
283
314
let f = f. then ( |r| {
284
315
res = Some ( r) ;
285
316
l_clone. quit ( ) ;
286
- Ok :: < ( ) , Never > ( ( ) )
317
+ future :: ready ( ( ) )
287
318
} ) ;
288
319
289
- let f: * mut Future < Item = ( ) , Error = Never > = Box :: into_raw ( Box :: new ( f) ) ;
290
- // XXX: Transmute to get a 'static lifetime here, super unsafe
291
- let f: * mut ( Future < Item = ( ) , Error = Never > + ' static ) = mem:: transmute ( f) ;
292
- let f: Box < Future < Item = ( ) , Error = Never > + ' static > = Box :: from_raw ( f) ;
320
+ // Super-unsafe: We transmute here to get rid of the 'static lifetime
321
+ let f = LocalFutureObj :: new ( Box :: new ( f) ) ;
322
+ let f: ( LocalFutureObj < ' static , ( ) > ) = mem:: transmute ( f) ;
323
+
324
+ // And ensure that we are only ever running on this very thread.
325
+ let f = f. into_future_obj ( ) ;
293
326
294
- // Ensure that this task is never polled on another thread
295
- // than this one where it was spawned now.
296
- let source = TaskSource :: new_unsafe ( :: PRIORITY_DEFAULT , Some ( get_thread_id ( ) ) , f) ;
327
+ let source = TaskSource :: new ( :: PRIORITY_DEFAULT , Some ( get_thread_id ( ) ) , f) ;
297
328
source. attach ( Some ( & * self ) ) ;
298
329
}
299
330
@@ -303,10 +334,19 @@ impl MainContext {
303
334
}
304
335
}
305
336
306
- impl Executor for MainContext {
307
- fn spawn ( & mut self , f : Box < Future < Item = ( ) , Error = Never > + Send > ) -> Result < ( ) , SpawnError > {
308
- let f = Box :: new ( f) ;
309
- let source = TaskSource :: new ( :: PRIORITY_DEFAULT , f) ;
337
+ impl Spawn for MainContext {
338
+ fn spawn_obj ( & mut self , f : FutureObj < ' static , ( ) > ) -> Result < ( ) , SpawnError > {
339
+ let source = TaskSource :: new ( :: PRIORITY_DEFAULT , None , f) ;
340
+ source. attach ( Some ( & * self ) ) ;
341
+ Ok ( ( ) )
342
+ }
343
+ }
344
+
345
+ impl LocalSpawn for MainContext {
346
+ fn spawn_local_obj ( & mut self , f : LocalFutureObj < ' static , ( ) > ) -> Result < ( ) , SpawnError > {
347
+ let source = TaskSource :: new ( :: PRIORITY_DEFAULT , Some ( get_thread_id ( ) ) , unsafe {
348
+ f. into_future_obj ( )
349
+ } ) ;
310
350
source. attach ( Some ( & * self ) ) ;
311
351
Ok ( ( ) )
312
352
}
@@ -315,10 +355,9 @@ impl Executor for MainContext {
315
355
#[ cfg( test) ]
316
356
mod tests {
317
357
use super :: * ;
318
- use std:: thread;
319
- use std:: sync:: mpsc;
320
- use futures:: future;
321
358
use futures:: channel:: oneshot;
359
+ use std:: sync:: mpsc;
360
+ use std:: thread;
322
361
323
362
#[ test]
324
363
fn test_spawn ( ) {
@@ -329,14 +368,15 @@ mod tests {
329
368
let ( o_sender, o_receiver) = oneshot:: channel ( ) ;
330
369
331
370
let l_clone = l. clone ( ) ;
332
- c. spawn ( o_receiver
333
- . map_err ( |_| unimplemented ! ( ) )
371
+ c. spawn (
372
+ o_receiver
334
373
. and_then ( move |( ) | {
335
374
sender. send ( ( ) ) . unwrap ( ) ;
336
375
l_clone. quit ( ) ;
337
376
338
- Ok ( ( ) )
377
+ future :: ok ( ( ) )
339
378
} )
379
+ . then ( |res| future:: ready ( res. unwrap ( ) ) ) ,
340
380
) ;
341
381
342
382
thread:: spawn ( move || {
@@ -357,8 +397,6 @@ mod tests {
357
397
let l_clone = l. clone ( ) ;
358
398
c. spawn_local ( future:: lazy ( move |_ctx| {
359
399
l_clone. quit ( ) ;
360
-
361
- Ok ( ( ) )
362
400
} ) ) ;
363
401
364
402
l. run ( ) ;
0 commit comments