1
1
// Take a look at the license at the top of the repository in the LICENSE file.
2
2
3
- use std:: { future:: Future , ptr} ;
3
+ use std:: { future:: Future , panic , ptr} ;
4
4
5
5
use futures_channel:: oneshot;
6
6
@@ -13,6 +13,30 @@ pub struct ThreadPool(ptr::NonNull<ffi::GThreadPool>);
13
13
unsafe impl Send for ThreadPool { }
14
14
unsafe impl Sync for ThreadPool { }
15
15
16
+ // rustdoc-stripper-ignore-next
17
+ /// A handle to a thread running on a [`ThreadPool`].
18
+ ///
19
+ /// Like [`std::thread::JoinHandle`] for a GLib thread. The return value from the task can be
20
+ /// retrieved by calling [`ThreadHandle::join`]. Dropping the handle "detaches" the thread,
21
+ /// allowing it to complete but discarding the return value.
22
+ #[ derive( Debug ) ]
23
+ pub struct ThreadHandle < T > {
24
+ rx : std:: sync:: mpsc:: Receiver < std:: thread:: Result < T > > ,
25
+ }
26
+
27
+ impl < T > ThreadHandle < T > {
28
+ // rustdoc-stripper-ignore-next
29
+ /// Waits for the associated thread to finish.
30
+ ///
31
+ /// Blocks until the associated thread returns. Returns `Ok` with the value returned from the
32
+ /// thread, or `Err` if the thread panicked. This function will return immediately if the
33
+ /// associated thread has already finished.
34
+ #[ inline]
35
+ pub fn join ( self ) -> std:: thread:: Result < T > {
36
+ self . rx . recv ( ) . unwrap ( )
37
+ }
38
+ }
39
+
16
40
impl ThreadPool {
17
41
#[ doc( alias = "g_thread_pool_new" ) ]
18
42
pub fn shared ( max_threads : Option < u32 > ) -> Result < Self , crate :: Error > {
@@ -53,9 +77,15 @@ impl ThreadPool {
53
77
}
54
78
55
79
#[ doc( alias = "g_thread_pool_push" ) ]
56
- pub fn push < F : FnOnce ( ) + Send + ' static > ( & self , func : F ) -> Result < ( ) , crate :: Error > {
80
+ pub fn push < T : Send + ' static , F : FnOnce ( ) -> T + Send + ' static > (
81
+ & self ,
82
+ func : F ,
83
+ ) -> Result < ThreadHandle < T > , crate :: Error > {
84
+ let ( tx, rx) = std:: sync:: mpsc:: sync_channel ( 1 ) ;
57
85
unsafe {
58
- let func: Box < dyn FnOnce ( ) + Send + ' static > = Box :: new ( func) ;
86
+ let func: Box < dyn FnOnce ( ) + Send + ' static > = Box :: new ( move || {
87
+ let _ = tx. send ( panic:: catch_unwind ( panic:: AssertUnwindSafe ( func) ) ) ;
88
+ } ) ;
59
89
let func = Box :: new ( func) ;
60
90
let mut err = ptr:: null_mut ( ) ;
61
91
@@ -66,7 +96,7 @@ impl ThreadPool {
66
96
& mut err,
67
97
) ) ;
68
98
if ret {
69
- Ok ( ( ) )
99
+ Ok ( ThreadHandle { rx } )
70
100
} else {
71
101
let _ = Box :: from_raw ( func) ;
72
102
Err ( from_glib_full ( err) )
@@ -77,11 +107,12 @@ impl ThreadPool {
77
107
pub fn push_future < T : Send + ' static , F : FnOnce ( ) -> T + Send + ' static > (
78
108
& self ,
79
109
func : F ,
80
- ) -> Result < impl Future < Output = T > + Send + Sync + ' static , crate :: Error > {
110
+ ) -> Result < impl Future < Output = std:: thread:: Result < T > > + Send + Sync + ' static , crate :: Error >
111
+ {
81
112
let ( sender, receiver) = oneshot:: channel ( ) ;
82
113
83
114
self . push ( move || {
84
- let _ = sender. send ( func ( ) ) ;
115
+ let _ = sender. send ( panic :: catch_unwind ( panic :: AssertUnwindSafe ( func ) ) ) ;
85
116
} ) ?;
86
117
87
118
Ok ( async move { receiver. await . expect ( "Dropped before executing" ) } )
@@ -198,11 +229,14 @@ mod tests {
198
229
let p = ThreadPool :: exclusive ( 1 ) . unwrap ( ) ;
199
230
let ( sender, receiver) = mpsc:: channel ( ) ;
200
231
201
- p. push ( move || {
202
- sender. send ( true ) . unwrap ( ) ;
203
- } )
204
- . unwrap ( ) ;
232
+ let handle = p
233
+ . push ( move || {
234
+ sender. send ( true ) . unwrap ( ) ;
235
+ 123
236
+ } )
237
+ . unwrap ( ) ;
205
238
239
+ assert_eq ! ( handle. join( ) . unwrap( ) , 123 ) ;
206
240
assert_eq ! ( receiver. recv( ) , Ok ( true ) ) ;
207
241
}
208
242
@@ -214,6 +248,6 @@ mod tests {
214
248
let fut = p. push_future ( || true ) . unwrap ( ) ;
215
249
216
250
let res = c. block_on ( fut) ;
217
- assert ! ( res) ;
251
+ assert ! ( res. unwrap ( ) ) ;
218
252
}
219
253
}
0 commit comments