Skip to content

Commit 7fd47c4

Browse files
committed
feat(driver): allow asyncify pool to spawn things other than FnOnce
1 parent 8a41ec2 commit 7fd47c4

File tree

1 file changed

+59
-9
lines changed

1 file changed

+59
-9
lines changed

compio-driver/src/asyncify.rs

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
fmt,
23
sync::{
34
atomic::{AtomicUsize, Ordering},
45
Arc,
@@ -8,7 +9,51 @@ use std::{
89

910
use crossbeam_channel::{bounded, Receiver, Sender, TrySendError};
1011

11-
type BoxClosure = Box<dyn FnOnce() + Send>;
12+
/// An error that may be emitted when all worker threads are busy. It simply
13+
/// returns the dispatchable value with a convenient [`fmt::Debug`] and
14+
/// [`fmt::Display`] implementation.
15+
#[derive(Copy, Clone, PartialEq, Eq)]
16+
pub struct DispatchError<T>(pub T);
17+
18+
impl<T> DispatchError<T> {
19+
/// Consume the error, yielding the dispatchable that failed to be sent.
20+
pub fn into_inner(self) -> T {
21+
self.0
22+
}
23+
}
24+
25+
impl<T> fmt::Debug for DispatchError<T> {
26+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27+
"DispatchError(..)".fmt(f)
28+
}
29+
}
30+
31+
impl<T> fmt::Display for DispatchError<T> {
32+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33+
"all threads are busy".fmt(f)
34+
}
35+
}
36+
37+
impl<T> std::error::Error for DispatchError<T> {}
38+
39+
type BoxedDispatchable = Box<dyn Dispatchable + Send>;
40+
41+
/// A trait for dispatching a closure. It's implemented for all `FnOnce() + Send
42+
/// + 'static` but may also be implemented for any other types that are `Send`
43+
/// and `'static`.
44+
pub trait Dispatchable: Send + 'static {
45+
/// Run the dispatchable
46+
fn run(self: Box<Self>);
47+
}
48+
49+
impl<F> Dispatchable for F
50+
where
51+
F: FnOnce() + Send + 'static,
52+
{
53+
fn run(self: Box<Self>) {
54+
(*self)()
55+
}
56+
}
1257

1358
struct CounterGuard(Arc<AtomicUsize>);
1459

@@ -19,24 +64,24 @@ impl Drop for CounterGuard {
1964
}
2065

2166
fn worker(
22-
receiver: Receiver<BoxClosure>,
67+
receiver: Receiver<BoxedDispatchable>,
2368
counter: Arc<AtomicUsize>,
2469
timeout: Duration,
2570
) -> impl FnOnce() {
2671
move || {
2772
counter.fetch_add(1, Ordering::AcqRel);
2873
let _guard = CounterGuard(counter);
2974
while let Ok(f) = receiver.recv_timeout(timeout) {
30-
f();
75+
f.run();
3176
}
3277
}
3378
}
3479

3580
/// A thread pool to perform blocking operations in other threads.
3681
#[derive(Debug, Clone)]
3782
pub struct AsyncifyPool {
38-
sender: Sender<BoxClosure>,
39-
receiver: Receiver<BoxClosure>,
83+
sender: Sender<BoxedDispatchable>,
84+
receiver: Receiver<BoxedDispatchable>,
4085
counter: Arc<AtomicUsize>,
4186
thread_limit: usize,
4287
recv_timeout: Duration,
@@ -56,15 +101,20 @@ impl AsyncifyPool {
56101
}
57102
}
58103

59-
/// Send a closure to another thread. Usually the user should not use it.
60-
pub fn dispatch<F: FnOnce() + Send + 'static>(&self, f: F) -> Result<(), F> {
61-
match self.sender.try_send(Box::new(f) as BoxClosure) {
104+
/// Send a dispatchable, usually a closure, to another thread. Usually the
105+
/// user should not use it. When all threads are busy and thread number
106+
/// limit has been reached, it will return an error with the original
107+
/// dispatchable.
108+
pub fn dispatch<D: Dispatchable>(&self, f: D) -> Result<(), DispatchError<D>> {
109+
match self.sender.try_send(Box::new(f) as BoxedDispatchable) {
62110
Ok(_) => Ok(()),
63111
Err(e) => match e {
64112
TrySendError::Full(f) => {
65113
if self.counter.load(Ordering::Acquire) >= self.thread_limit {
66114
// Safety: we can ensure the type
67-
Err(*unsafe { Box::from_raw(Box::into_raw(f).cast()) })
115+
Err(DispatchError(*unsafe {
116+
Box::from_raw(Box::into_raw(f).cast())
117+
}))
68118
} else {
69119
std::thread::spawn(worker(
70120
self.receiver.clone(),

0 commit comments

Comments
 (0)