3
3
#![ warn( missing_docs) ]
4
4
5
5
use std:: {
6
+ any:: Any ,
6
7
future:: Future ,
7
8
io,
8
9
num:: NonZeroUsize ,
9
- panic:: { resume_unwind, UnwindSafe } ,
10
+ panic:: resume_unwind,
11
+ pin:: Pin ,
10
12
sync:: { Arc , Mutex } ,
11
13
thread:: { available_parallelism, JoinHandle } ,
12
14
} ;
13
15
14
16
use compio_driver:: { AsyncifyPool , ProactorBuilder } ;
15
- use compio_runtime:: {
16
- event:: { Event , EventHandle } ,
17
- Runtime ,
18
- } ;
19
- use crossbeam_channel:: { unbounded, Sender } ;
20
- use futures_util:: { future:: LocalBoxFuture , FutureExt } ;
17
+ use compio_runtime:: { event:: Event , Runtime } ;
18
+ use flume:: { unbounded, SendError , Sender } ;
21
19
22
20
/// The dispatcher. It manages the threads and dispatches the tasks.
23
21
pub struct Dispatcher {
24
- sender : Sender < DispatcherClosure > ,
22
+ sender : Sender < Box < Closure > > ,
25
23
threads : Vec < JoinHandle < ( ) > > ,
26
24
pool : AsyncifyPool ,
27
25
}
@@ -32,13 +30,12 @@ impl Dispatcher {
32
30
let mut proactor_builder = builder. proactor_builder ;
33
31
proactor_builder. force_reuse_thread_pool ( ) ;
34
32
let pool = proactor_builder. create_or_get_thread_pool ( ) ;
33
+ let ( sender, receiver) = unbounded :: < Box < Closure > > ( ) ;
35
34
36
- let ( sender, receiver) = unbounded :: < DispatcherClosure > ( ) ;
37
35
let threads = ( 0 ..builder. nthreads )
38
36
. map ( {
39
37
|index| {
40
38
let proactor_builder = proactor_builder. clone ( ) ;
41
-
42
39
let receiver = receiver. clone ( ) ;
43
40
44
41
let thread_builder = std:: thread:: Builder :: new ( ) ;
@@ -54,17 +51,21 @@ impl Dispatcher {
54
51
} ;
55
52
56
53
thread_builder. spawn ( move || {
57
- let runtime = Runtime :: builder ( )
54
+ Runtime :: builder ( )
58
55
. with_proactor ( proactor_builder)
59
56
. build ( )
60
- . expect ( "cannot create compio runtime" ) ;
61
- let _guard = runtime. enter ( ) ;
62
- while let Ok ( f) = receiver. recv ( ) {
63
- * f. result . lock ( ) . unwrap ( ) = Some ( std:: panic:: catch_unwind ( || {
64
- Runtime :: current ( ) . block_on ( ( f. func ) ( ) ) ;
65
- } ) ) ;
66
- f. handle . notify ( ) ;
67
- }
57
+ . expect ( "cannot create compio runtime" )
58
+ . block_on ( async move {
59
+ let rt = Runtime :: current ( ) ;
60
+ while let Ok ( f) = receiver. recv_async ( ) . await {
61
+ let fut = ( f) ( ) ;
62
+ if builder. concurrent {
63
+ rt. spawn ( fut) . detach ( )
64
+ } else {
65
+ fut. await
66
+ }
67
+ }
68
+ } )
68
69
} )
69
70
}
70
71
} )
@@ -86,30 +87,73 @@ impl Dispatcher {
86
87
DispatcherBuilder :: default ( )
87
88
}
88
89
89
- /// Dispatch a task to the threads.
90
+ fn prepare < Fut , Fn , R > ( & self , f : Fn ) -> ( Executing < R > , Box < Closure > )
91
+ where
92
+ Fn : ( FnOnce ( ) -> Fut ) + Send + ' static ,
93
+ Fut : Future < Output = R > + ' static ,
94
+ R : Any + Send + ' static ,
95
+ {
96
+ let event = Event :: new ( ) ;
97
+ let handle = event. handle ( ) ;
98
+ let res = Arc :: new ( Mutex :: new ( None ) ) ;
99
+ let dispatched = Executing {
100
+ event,
101
+ result : res. clone ( ) ,
102
+ } ;
103
+ let closure = Box :: new ( || {
104
+ Box :: pin ( async move {
105
+ * res. lock ( ) . unwrap ( ) = Some ( f ( ) . await ) ;
106
+ handle. notify ( ) ;
107
+ } ) as BoxFuture < ( ) >
108
+ } ) ;
109
+ ( dispatched, closure)
110
+ }
111
+
112
+ /// Spawn a boxed closure to the threads.
113
+ ///
114
+ /// If all threads have panicked, this method will return an error with the
115
+ /// sent closure.
116
+ pub fn spawn ( & self , closure : Box < Closure > ) -> Result < ( ) , SendError < Box < Closure > > > {
117
+ self . sender . send ( closure)
118
+ }
119
+
120
+ /// Dispatch a task to the threads
90
121
///
91
122
/// The provided `f` should be [`Send`] because it will be send to another
92
123
/// thread before calling. The return [`Future`] need not to be [`Send`]
93
124
/// because it will be executed on only one thread.
94
- pub fn dispatch <
95
- F : Future < Output = ( ) > + ' static ,
96
- Fn : ( FnOnce ( ) -> F ) + Send + UnwindSafe + ' static ,
97
- > (
98
- & self ,
99
- f : Fn ,
100
- ) -> io:: Result < DispatcherJoinHandle > {
101
- let event = Event :: new ( ) ;
102
- let handle = event. handle ( ) ;
103
- let join_handle = DispatcherJoinHandle :: new ( event) ;
104
- let closure = DispatcherClosure {
105
- handle,
106
- result : join_handle. result . clone ( ) ,
107
- func : Box :: new ( || f ( ) . boxed_local ( ) ) ,
108
- } ;
109
- self . sender
110
- . send ( closure)
111
- . expect ( "the channel should not be disconnected" ) ;
112
- Ok ( join_handle)
125
+ ///
126
+ /// # Error
127
+ ///
128
+ /// If all threads have panicked, this method will return an error with the
129
+ /// sent closure.
130
+ pub fn dispatch < Fut , Fn > ( & self , f : Fn ) -> Result < ( ) , SendError < Box < Closure > > >
131
+ where
132
+ Fn : ( FnOnce ( ) -> Fut ) + Send + ' static ,
133
+ Fut : Future < Output = ( ) > + ' static ,
134
+ {
135
+ self . spawn ( Box :: new ( || Box :: pin ( f ( ) ) as BoxFuture < ( ) > ) )
136
+ }
137
+
138
+ /// Execute a task on the threads and retrieve its returned value.
139
+ ///
140
+ /// The provided `f` should be [`Send`] because it will be send to another
141
+ /// thread before calling. The return [`Future`] need not to be [`Send`]
142
+ /// because it will be executed on only one thread.
143
+ ///
144
+ /// # Error
145
+ ///
146
+ /// If all threads have panicked, this method will return an error with the
147
+ /// sent closure.
148
+ pub fn execute < Fut , Fn , R > ( & self , f : Fn ) -> Result < Executing < R > , SendError < Box < Closure > > >
149
+ where
150
+ Fn : ( FnOnce ( ) -> Fut ) + Send + ' static ,
151
+ Fut : Future < Output = R > + ' static ,
152
+ R : Any + Send + ' static ,
153
+ {
154
+ let ( dispatched, closure) = self . prepare ( f) ;
155
+ self . spawn ( closure) ?;
156
+ Ok ( dispatched)
113
157
}
114
158
115
159
/// Stop the dispatcher and wait for the threads to complete. If there is a
@@ -135,7 +179,6 @@ impl Dispatcher {
135
179
event. wait ( ) . await ;
136
180
let mut guard = results. lock ( ) . unwrap ( ) ;
137
181
for res in std:: mem:: take :: < Vec < std:: thread:: Result < ( ) > > > ( guard. as_mut ( ) ) {
138
- // The thread should not panic.
139
182
res. unwrap_or_else ( |e| resume_unwind ( e) ) ;
140
183
}
141
184
Ok ( ( ) )
@@ -145,6 +188,7 @@ impl Dispatcher {
145
188
/// A builder for [`Dispatcher`].
146
189
pub struct DispatcherBuilder {
147
190
nthreads : usize ,
191
+ concurrent : bool ,
148
192
stack_size : Option < usize > ,
149
193
names : Option < Box < dyn FnMut ( usize ) -> String > > ,
150
194
proactor_builder : ProactorBuilder ,
@@ -155,12 +199,22 @@ impl DispatcherBuilder {
155
199
pub fn new ( ) -> Self {
156
200
Self {
157
201
nthreads : available_parallelism ( ) . map ( |n| n. get ( ) ) . unwrap_or ( 1 ) ,
202
+ concurrent : true ,
158
203
stack_size : None ,
159
204
names : None ,
160
205
proactor_builder : ProactorBuilder :: new ( ) ,
161
206
}
162
207
}
163
208
209
+ /// If execute tasks concurrently. Default to be `true`.
210
+ ///
211
+ /// When set to `false`, tasks are executed sequentially without any
212
+ /// concurrency within the thread.
213
+ pub fn concurrent ( mut self , concurrent : bool ) -> Self {
214
+ self . concurrent = concurrent;
215
+ self
216
+ }
217
+
164
218
/// Set the number of worker threads of the dispatcher. The default value is
165
219
/// the CPU number. If the CPU number could not be retrieved, the
166
220
/// default value is 1.
@@ -199,36 +253,36 @@ impl Default for DispatcherBuilder {
199
253
}
200
254
}
201
255
202
- type Closure < ' a > = dyn ( FnOnce ( ) -> LocalBoxFuture < ' a , ( ) > ) + Send + UnwindSafe ;
256
+ type BoxFuture < T > = Pin < Box < dyn Future < Output = T > > > ;
257
+ type Closure = dyn ( FnOnce ( ) -> BoxFuture < ( ) > ) + Send ;
203
258
204
- struct DispatcherClosure {
205
- handle : EventHandle ,
206
- result : Arc < Mutex < Option < std:: thread:: Result < ( ) > > > > ,
207
- func : Box < Closure < ' static > > ,
208
- }
209
-
210
- /// The join handle for dispatched task.
211
- pub struct DispatcherJoinHandle {
259
+ /// The join handle for an executing task. It can be used to wait for the
260
+ /// task's returned value.
261
+ pub struct Executing < R > {
212
262
event : Event ,
213
- result : Arc < Mutex < Option < std :: thread :: Result < ( ) > > > > ,
263
+ result : Arc < Mutex < Option < R > > > ,
214
264
}
215
265
216
- impl DispatcherJoinHandle {
217
- pub ( crate ) fn new ( event : Event ) -> Self {
218
- Self {
219
- event,
220
- result : Arc :: new ( Mutex :: new ( None ) ) ,
266
+ impl < R : ' static > Executing < R > {
267
+ fn take ( val : & Mutex < Option < R > > ) -> R {
268
+ val. lock ( )
269
+ . unwrap ( )
270
+ . take ( )
271
+ . expect ( "the result should be set" )
272
+ }
273
+
274
+ /// Try to wait for the task to complete without blocking.
275
+ pub fn try_join ( self ) -> Result < R , Self > {
276
+ if self . event . notified ( ) {
277
+ Ok ( Self :: take ( & self . result ) )
278
+ } else {
279
+ Err ( self )
221
280
}
222
281
}
223
282
224
283
/// Wait for the task to complete.
225
- pub async fn join ( self ) -> io :: Result < std :: thread :: Result < ( ) > > {
284
+ pub async fn join ( self ) -> R {
226
285
self . event . wait ( ) . await ;
227
- Ok ( self
228
- . result
229
- . lock ( )
230
- . unwrap ( )
231
- . take ( )
232
- . expect ( "the result should be set" ) )
286
+ Self :: take ( & self . result )
233
287
}
234
288
}
0 commit comments