Skip to content

Commit 9a7f188

Browse files
authored
Merge pull request #260 from George-Miao/feat/dispatcher-blocking
2 parents 277f551 + 1ec893f commit 9a7f188

File tree

2 files changed

+117
-25
lines changed

2 files changed

+117
-25
lines changed

compio-dispatcher/src/lib.rs

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ use std::{
1111
thread::{available_parallelism, JoinHandle},
1212
};
1313

14-
use compio_driver::{AsyncifyPool, ProactorBuilder};
14+
use compio_driver::{AsyncifyPool, DispatchError, Dispatchable, ProactorBuilder};
1515
use compio_runtime::{event::Event, JoinHandle as CompioJoinHandle, Runtime};
16-
use flume::{unbounded, SendError, Sender};
16+
use flume::{unbounded, Sender};
1717
use futures_channel::oneshot;
1818

19-
type Dispatching = Box<dyn Dispatchable + Send>;
19+
type Spawning = Box<dyn Spawnable + Send>;
2020

21-
trait Dispatchable {
21+
trait Spawnable {
2222
fn spawn(self: Box<Self>, handle: &Runtime) -> CompioJoinHandle<()>;
2323
}
2424

@@ -28,7 +28,14 @@ struct Concrete<F, R> {
2828
func: F,
2929
}
3030

31-
impl<F, Fut, R> Dispatchable for Concrete<F, R>
31+
impl<F, R> Concrete<F, R> {
32+
pub fn new(func: F) -> (Self, oneshot::Receiver<R>) {
33+
let (tx, rx) = oneshot::channel();
34+
(Self { callback: tx, func }, rx)
35+
}
36+
}
37+
38+
impl<F, Fut, R> Spawnable for Concrete<F, R>
3239
where
3340
F: FnOnce() -> Fut + Send + 'static,
3441
Fut: Future<Output = R>,
@@ -43,10 +50,22 @@ where
4350
}
4451
}
4552

46-
#[derive(Debug)]
53+
impl<F, R> Dispatchable for Concrete<F, R>
54+
where
55+
F: FnOnce() -> R + Send + 'static,
56+
R: Send + 'static,
57+
{
58+
fn run(self: Box<Self>) {
59+
let Concrete { callback, func } = *self;
60+
let res = func();
61+
callback.send(res).ok();
62+
}
63+
}
64+
4765
/// The dispatcher. It manages the threads and dispatches the tasks.
66+
#[derive(Debug)]
4867
pub struct Dispatcher {
49-
sender: Sender<Dispatching>,
68+
sender: Sender<Spawning>,
5069
threads: Vec<JoinHandle<()>>,
5170
pool: AsyncifyPool,
5271
}
@@ -57,7 +76,7 @@ impl Dispatcher {
5776
let mut proactor_builder = builder.proactor_builder;
5877
proactor_builder.force_reuse_thread_pool();
5978
let pool = proactor_builder.create_or_get_thread_pool();
60-
let (sender, receiver) = unbounded::<Dispatching>();
79+
let (sender, receiver) = unbounded::<Spawning>();
6180

6281
let threads = (0..builder.nthreads)
6382
.map({
@@ -123,28 +142,51 @@ impl Dispatcher {
123142
///
124143
/// If all threads have panicked, this method will return an error with the
125144
/// sent closure.
126-
pub fn dispatch<Fut, Fn, R>(&self, f: Fn) -> Result<oneshot::Receiver<R>, SendError<Fn>>
145+
pub fn dispatch<Fn, Fut, R>(&self, f: Fn) -> Result<oneshot::Receiver<R>, DispatchError<Fn>>
127146
where
128147
Fn: (FnOnce() -> Fut) + Send + 'static,
129148
Fut: Future<Output = R> + 'static,
130149
R: Send + 'static,
131150
{
132-
let (tx, rx) = oneshot::channel();
133-
let concrete: Concrete<Fn, R> = Concrete {
134-
callback: tx,
135-
func: f,
136-
};
151+
let (concrete, rx) = Concrete::new(f);
152+
137153
match self.sender.send(Box::new(concrete)) {
138154
Ok(_) => Ok(rx),
139155
Err(err) => {
140156
// SAFETY: We know the dispatchable we sent has type `Concrete<Fn, R>`
141157
let recovered =
142158
unsafe { Box::from_raw(Box::into_raw(err.0) as *mut Concrete<Fn, R>) };
143-
Err(SendError(recovered.func))
159+
Err(DispatchError(recovered.func))
144160
}
145161
}
146162
}
147163

164+
/// Dispatch a blocking task to the threads.
165+
///
166+
/// Blocking pool of the dispatcher will be obtained from the proactor
167+
/// builder. So any configuration of the proactor's blocking pool will be
168+
/// applied to the dispatcher.
169+
///
170+
/// # Error
171+
///
172+
/// If all threads are busy and the thread pool is full, this method will
173+
/// return an error with the original closure. The limit can be configured
174+
/// with [`DispatcherBuilder::proactor_builder`] and
175+
/// [`ProactorBuilder::thread_pool_limit`].
176+
pub fn dispatch_blocking<Fn, R>(&self, f: Fn) -> Result<oneshot::Receiver<R>, DispatchError<Fn>>
177+
where
178+
Fn: FnOnce() -> R + Send + 'static,
179+
R: Send + 'static,
180+
{
181+
let (concrete, rx) = Concrete::new(f);
182+
183+
self.pool
184+
.dispatch(concrete)
185+
.map_err(|e| DispatchError(e.0.func))?;
186+
187+
Ok(rx)
188+
}
189+
148190
/// Stop the dispatcher and wait for the threads to complete. If there is a
149191
/// thread panicked, this method will resume the panic.
150192
pub async fn join(self) -> io::Result<()> {
@@ -163,7 +205,7 @@ impl Dispatcher {
163205
handle.notify();
164206
}
165207
}) {
166-
std::thread::spawn(f);
208+
std::thread::spawn(f.0);
167209
}
168210
event.wait().await;
169211
let mut guard = results.lock().unwrap();

compio-driver/src/asyncify.rs

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
fmt,
23
sync::{
34
atomic::{AtomicUsize, Ordering},
45
Arc,
@@ -8,7 +9,51 @@ use std::{
89

910
use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
1011

11-
type BoxClosure = Box<dyn FnOnce() + Send>;
12+
/// An error that may be emitted when all worker threads are busy. It simply
13+
/// returns the dispatchable value with a convenient [`fmt::Debug`] and
14+
/// [`fmt::Display`] implementation.
15+
#[derive(Copy, Clone, PartialEq, Eq)]
16+
pub struct DispatchError<T>(pub T);
17+
18+
impl<T> DispatchError<T> {
19+
/// Consume the error, yielding the dispatchable that failed to be sent.
20+
pub fn into_inner(self) -> T {
21+
self.0
22+
}
23+
}
24+
25+
impl<T> fmt::Debug for DispatchError<T> {
26+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27+
"DispatchError(..)".fmt(f)
28+
}
29+
}
30+
31+
impl<T> fmt::Display for DispatchError<T> {
32+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33+
"all threads are busy".fmt(f)
34+
}
35+
}
36+
37+
impl<T> std::error::Error for DispatchError<T> {}
38+
39+
type BoxedDispatchable = Box<dyn Dispatchable + Send>;
40+
41+
/// A trait for dispatching a closure. It's implemented for all `FnOnce() + Send
42+
/// + 'static` but may also be implemented for any other types that are `Send`
43+
/// and `'static`.
44+
pub trait Dispatchable: Send + 'static {
45+
/// Run the dispatchable
46+
fn run(self: Box<Self>);
47+
}
48+
49+
impl<F> Dispatchable for F
50+
where
51+
F: FnOnce() + Send + 'static,
52+
{
53+
fn run(self: Box<Self>) {
54+
(*self)()
55+
}
56+
}
1257

1358
struct CounterGuard(Arc<AtomicUsize>);
1459

@@ -19,24 +64,24 @@ impl Drop for CounterGuard {
1964
}
2065

2166
fn worker(
22-
receiver: Receiver<BoxClosure>,
67+
receiver: Receiver<BoxedDispatchable>,
2368
counter: Arc<AtomicUsize>,
2469
timeout: Duration,
2570
) -> impl FnOnce() {
2671
move || {
2772
counter.fetch_add(1, Ordering::AcqRel);
2873
let _guard = CounterGuard(counter);
2974
while let Ok(f) = receiver.recv_timeout(timeout) {
30-
f();
75+
f.run();
3176
}
3277
}
3378
}
3479

3580
/// A thread pool to perform blocking operations in other threads.
3681
#[derive(Debug, Clone)]
3782
pub struct AsyncifyPool {
38-
sender: Sender<BoxClosure>,
39-
receiver: Receiver<BoxClosure>,
83+
sender: Sender<BoxedDispatchable>,
84+
receiver: Receiver<BoxedDispatchable>,
4085
counter: Arc<AtomicUsize>,
4186
thread_limit: usize,
4287
recv_timeout: Duration,
@@ -56,15 +101,20 @@ impl AsyncifyPool {
56101
}
57102
}
58103

59-
/// Send a closure to another thread. Usually the user should not use it.
60-
pub fn dispatch<F: FnOnce() + Send + 'static>(&self, f: F) -> Result<(), F> {
61-
match self.sender.try_send(Box::new(f) as BoxClosure) {
104+
/// Send a dispatchable, usually a closure, to another thread. Usually the
105+
/// user should not use it. When all threads are busy and thread number
106+
/// limit has been reached, it will return an error with the original
107+
/// dispatchable.
108+
pub fn dispatch<D: Dispatchable>(&self, f: D) -> Result<(), DispatchError<D>> {
109+
match self.sender.try_send(Box::new(f) as BoxedDispatchable) {
62110
Ok(_) => Ok(()),
63111
Err(e) => match e {
64112
TrySendError::Full(f) => {
65113
if self.counter.load(Ordering::Acquire) >= self.thread_limit {
66114
// Safety: we can ensure the type
67-
Err(*unsafe { Box::from_raw(Box::into_raw(f).cast()) })
115+
Err(DispatchError(*unsafe {
116+
Box::from_raw(Box::into_raw(f).cast())
117+
}))
68118
} else {
69119
std::thread::spawn(worker(
70120
self.receiver.clone(),

0 commit comments

Comments
 (0)