Skip to content

Commit f474bb9

Browse files
committed
feat(dispatcher): allow dispatch_blocking to return value
1 parent 7fd47c4 commit f474bb9

File tree

1 file changed

+39
-16
lines changed

1 file changed

+39
-16
lines changed

compio-dispatcher/src/lib.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ use std::{
1111
thread::{available_parallelism, JoinHandle},
1212
};
1313

14-
use compio_driver::{AsyncifyPool, ProactorBuilder};
14+
use compio_driver::{AsyncifyPool, DispatchError, Dispatchable, ProactorBuilder};
1515
use compio_runtime::{event::Event, JoinHandle as CompioJoinHandle, Runtime};
1616
use flume::{unbounded, SendError, Sender};
1717
use futures_channel::oneshot;
1818

19-
type Dispatching = Box<dyn Dispatchable + Send>;
19+
type Spawning = Box<dyn Spawnable + Send>;
2020

21-
trait Dispatchable {
21+
trait Spawnable {
2222
fn spawn(self: Box<Self>, handle: &Runtime) -> CompioJoinHandle<()>;
2323
}
2424

@@ -28,7 +28,14 @@ struct Concrete<F, R> {
2828
func: F,
2929
}
3030

31-
impl<F, Fut, R> Dispatchable for Concrete<F, R>
31+
impl<F, R> Concrete<F, R> {
32+
pub fn new(func: F) -> (Self, oneshot::Receiver<R>) {
33+
let (tx, rx) = oneshot::channel();
34+
(Self { callback: tx, func }, rx)
35+
}
36+
}
37+
38+
impl<F, Fut, R> Spawnable for Concrete<F, R>
3239
where
3340
F: FnOnce() -> Fut + Send + 'static,
3441
Fut: Future<Output = R>,
@@ -43,10 +50,22 @@ where
4350
}
4451
}
4552

46-
#[derive(Debug)]
53+
impl<F, R> Dispatchable for Concrete<F, R>
54+
where
55+
F: FnOnce() -> R + Send + 'static,
56+
R: Send + 'static,
57+
{
58+
fn run(self: Box<Self>) {
59+
let Concrete { callback, func } = *self;
60+
let res = func();
61+
callback.send(res).ok();
62+
}
63+
}
64+
4765
/// The dispatcher. It manages the threads and dispatches the tasks.
66+
#[derive(Debug)]
4867
pub struct Dispatcher {
49-
sender: Sender<Dispatching>,
68+
sender: Sender<Spawning>,
5069
threads: Vec<JoinHandle<()>>,
5170
pool: AsyncifyPool,
5271
}
@@ -57,7 +76,7 @@ impl Dispatcher {
5776
let mut proactor_builder = builder.proactor_builder;
5877
proactor_builder.force_reuse_thread_pool();
5978
let pool = proactor_builder.create_or_get_thread_pool();
60-
let (sender, receiver) = unbounded::<Dispatching>();
79+
let (sender, receiver) = unbounded::<Spawning>();
6180

6281
let threads = (0..builder.nthreads)
6382
.map({
@@ -129,11 +148,8 @@ impl Dispatcher {
129148
Fut: Future<Output = R> + 'static,
130149
R: Send + 'static,
131150
{
132-
let (tx, rx) = oneshot::channel();
133-
let concrete: Concrete<Fn, R> = Concrete {
134-
callback: tx,
135-
func: f,
136-
};
151+
let (concrete, rx) = Concrete::new(f);
152+
137153
match self.sender.send(Box::new(concrete)) {
138154
Ok(_) => Ok(rx),
139155
Err(err) => {
@@ -157,11 +173,18 @@ impl Dispatcher {
157173
/// return an error with the original closure. The limit can be configured
158174
/// with [`DispatcherBuilder::proactor_builder`] and
159175
/// [`ProactorBuilder::thread_pool_limit`].
160-
pub fn dispatch_blocking<Fn>(&self, f: Fn) -> Result<(), SendError<Fn>>
176+
pub fn dispatch_blocking<Fn, R>(&self, f: Fn) -> Result<oneshot::Receiver<R>, DispatchError<Fn>>
161177
where
162-
Fn: FnOnce() + Send + 'static,
178+
Fn: FnOnce() -> R + Send + 'static,
179+
R: Send + 'static,
163180
{
164-
self.pool.dispatch(f).map_err(|f| SendError(f))
181+
let (concrete, rx) = Concrete::new(f);
182+
183+
self.pool
184+
.dispatch(concrete)
185+
.map_err(|e| DispatchError(e.0.func))?;
186+
187+
Ok(rx)
165188
}
166189

167190
/// Stop the dispatcher and wait for the threads to complete. If there is a
@@ -182,7 +205,7 @@ impl Dispatcher {
182205
handle.notify();
183206
}
184207
}) {
185-
std::thread::spawn(f);
208+
std::thread::spawn(f.0);
186209
}
187210
event.wait().await;
188211
let mut guard = results.lock().unwrap();

0 commit comments

Comments
 (0)