Skip to content

Commit 4d6d3f1

Browse files
authored
Merge pull request #255 from George-Miao/refactor/dispatcher
refactor(dispatcher): reduces alloc and API change
2 parents 8429dc7 + f1822c0 commit 4d6d3f1

File tree

6 files changed

+63
-106
lines changed

6 files changed

+63
-106
lines changed

compio-dispatcher/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ compio-driver = { workspace = true }
1616
compio-runtime = { workspace = true, features = ["event", "time"] }
1717

1818
flume = { workspace = true }
19-
futures-util = { workspace = true }
19+
futures-channel = { workspace = true }
2020

2121
[dev-dependencies]
2222
compio-buf = { workspace = true }
2323
compio-io = { workspace = true }
2424
compio-net = { workspace = true }
2525
compio-macros = { workspace = true }
26+
27+
futures-util = { workspace = true }

compio-dispatcher/src/lib.rs

Lines changed: 53 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,49 @@
33
#![warn(missing_docs)]
44

55
use std::{
6-
any::Any,
76
future::Future,
87
io,
98
num::NonZeroUsize,
109
panic::resume_unwind,
11-
pin::Pin,
1210
sync::{Arc, Mutex},
1311
thread::{available_parallelism, JoinHandle},
1412
};
1513

1614
use compio_driver::{AsyncifyPool, ProactorBuilder};
17-
use compio_runtime::{event::Event, Runtime};
15+
use compio_runtime::{event::Event, JoinHandle as CompioJoinHandle, Runtime};
1816
use flume::{unbounded, SendError, Sender};
17+
use futures_channel::oneshot;
18+
19+
type Dispatching = Box<dyn Dispatchable + Send>;
20+
21+
trait Dispatchable {
22+
fn spawn(self: Box<Self>, handle: &Runtime) -> CompioJoinHandle<()>;
23+
}
24+
25+
/// Concrete type for the closure we're sending to worker threads
26+
struct Concrete<F, R> {
27+
callback: oneshot::Sender<R>,
28+
func: F,
29+
}
30+
31+
impl<F, Fut, R> Dispatchable for Concrete<F, R>
32+
where
33+
F: FnOnce() -> Fut + Send + 'static,
34+
Fut: Future<Output = R>,
35+
R: Send + 'static,
36+
{
37+
fn spawn(self: Box<Self>, handle: &Runtime) -> CompioJoinHandle<()> {
38+
let Concrete { callback, func } = *self;
39+
handle.spawn(async move {
40+
let res = func().await;
41+
callback.send(res).ok();
42+
})
43+
}
44+
}
1945

2046
/// The dispatcher. It manages the threads and dispatches the tasks.
2147
pub struct Dispatcher {
22-
sender: Sender<Box<Closure>>,
48+
sender: Sender<Dispatching>,
2349
threads: Vec<JoinHandle<()>>,
2450
pool: AsyncifyPool,
2551
}
@@ -30,7 +56,7 @@ impl Dispatcher {
3056
let mut proactor_builder = builder.proactor_builder;
3157
proactor_builder.force_reuse_thread_pool();
3258
let pool = proactor_builder.create_or_get_thread_pool();
33-
let (sender, receiver) = unbounded::<Box<Closure>>();
59+
let (sender, receiver) = unbounded::<Dispatching>();
3460

3561
let threads = (0..builder.nthreads)
3662
.map({
@@ -57,14 +83,14 @@ impl Dispatcher {
5783
.expect("cannot create compio runtime")
5884
.block_on(async move {
5985
while let Ok(f) = receiver.recv_async().await {
60-
let fut = (f)();
86+
let task = Runtime::with_current(|rt| f.spawn(rt));
6187
if builder.concurrent {
62-
compio_runtime::spawn(fut).detach()
88+
task.detach()
6389
} else {
64-
fut.await
90+
task.await.ok();
6591
}
6692
}
67-
})
93+
});
6894
})
6995
}
7096
})
@@ -86,75 +112,36 @@ impl Dispatcher {
86112
DispatcherBuilder::default()
87113
}
88114

89-
fn prepare<Fut, Fn, R>(&self, f: Fn) -> (Executing<R>, Box<Closure>)
90-
where
91-
Fn: (FnOnce() -> Fut) + Send + 'static,
92-
Fut: Future<Output = R> + 'static,
93-
R: Any + Send + 'static,
94-
{
95-
let event = Event::new();
96-
let handle = event.handle();
97-
let res = Arc::new(Mutex::new(None));
98-
let dispatched = Executing {
99-
event,
100-
result: res.clone(),
101-
};
102-
let closure = Box::new(|| {
103-
Box::pin(async move {
104-
*res.lock().unwrap() = Some(f().await);
105-
handle.notify();
106-
}) as BoxFuture<()>
107-
});
108-
(dispatched, closure)
109-
}
110-
111-
/// Spawn a boxed closure to the threads.
112-
///
113-
/// If all threads have panicked, this method will return an error with the
114-
/// sent closure.
115-
pub fn spawn(&self, closure: Box<Closure>) -> Result<(), SendError<Box<Closure>>> {
116-
self.sender.send(closure)
117-
}
118-
119115
/// Dispatch a task to the threads
120116
///
121117
/// The provided `f` should be [`Send`] because it will be send to another
122-
/// thread before calling. The return [`Future`] need not to be [`Send`]
118+
/// thread before calling. The returned [`Future`] need not to be [`Send`]
123119
/// because it will be executed on only one thread.
124120
///
125121
/// # Error
126122
///
127123
/// If all threads have panicked, this method will return an error with the
128-
/// sent closure. Notice that the returned closure is not the same as the
129-
/// argument and cannot be simply transmuted back to `Fn`.
130-
pub fn dispatch<Fut, Fn>(&self, f: Fn) -> Result<(), SendError<Box<Closure>>>
131-
where
132-
Fn: (FnOnce() -> Fut) + Send + 'static,
133-
Fut: Future<Output = ()> + 'static,
134-
{
135-
self.spawn(Box::new(|| Box::pin(f()) as BoxFuture<()>))
136-
}
137-
138-
/// Execute a task on the threads and retrieve its returned value.
139-
///
140-
/// The provided `f` should be [`Send`] because it will be send to another
141-
/// thread before calling. The return [`Future`] need not to be [`Send`]
142-
/// because it will be executed on only one thread.
143-
///
144-
/// # Error
145-
///
146-
/// If all threads have panicked, this method will return an error with the
147-
/// sent closure. Notice that the returned closure is not the same as the
148-
/// argument and cannot be simply transmuted back to `Fn`.
149-
pub fn execute<Fut, Fn, R>(&self, f: Fn) -> Result<Executing<R>, SendError<Box<Closure>>>
124+
/// sent closure.
125+
pub fn dispatch<Fut, Fn, R>(&self, f: Fn) -> Result<oneshot::Receiver<R>, SendError<Fn>>
150126
where
151127
Fn: (FnOnce() -> Fut) + Send + 'static,
152128
Fut: Future<Output = R> + 'static,
153-
R: Any + Send + 'static,
129+
R: Send + 'static,
154130
{
155-
let (dispatched, closure) = self.prepare(f);
156-
self.spawn(closure)?;
157-
Ok(dispatched)
131+
let (tx, rx) = oneshot::channel();
132+
let concrete: Concrete<Fn, R> = Concrete {
133+
callback: tx,
134+
func: f,
135+
};
136+
match self.sender.send(Box::new(concrete)) {
137+
Ok(_) => Ok(rx),
138+
Err(err) => {
139+
// SAFETY: We know the dispatchable we sent has type `Concrete<Fn, R>`
140+
let recovered =
141+
unsafe { Box::from_raw(Box::into_raw(err.0) as *mut Concrete<Fn, R>) };
142+
Err(SendError(recovered.func))
143+
}
144+
}
158145
}
159146

160147
/// Stop the dispatcher and wait for the threads to complete. If there is a
@@ -253,37 +240,3 @@ impl Default for DispatcherBuilder {
253240
Self::new()
254241
}
255242
}
256-
257-
type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
258-
type Closure = dyn (FnOnce() -> BoxFuture<()>) + Send;
259-
260-
/// The join handle for an executing task. It can be used to wait for the
261-
/// task's returned value.
262-
pub struct Executing<R> {
263-
event: Event,
264-
result: Arc<Mutex<Option<R>>>,
265-
}
266-
267-
impl<R: 'static> Executing<R> {
268-
fn take(val: &Mutex<Option<R>>) -> R {
269-
val.lock()
270-
.unwrap()
271-
.take()
272-
.expect("the result should be set")
273-
}
274-
275-
/// Try to wait for the task to complete without blocking.
276-
pub fn try_join(self) -> Result<R, Self> {
277-
if self.event.notified() {
278-
Ok(Self::take(&self.result))
279-
} else {
280-
Err(self)
281-
}
282-
}
283-
284-
/// Wait for the task to complete.
285-
pub async fn join(self) -> R {
286-
self.event.wait().await;
287-
Self::take(&self.result)
288-
}
289-
}

compio-dispatcher/tests/listener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ async fn listener_dispatch() {
2929
for _i in 0..CLIENT_NUM {
3030
let (mut srv, _) = listener.accept().await.unwrap();
3131
let handle = dispatcher
32-
.execute(move || async move {
32+
.dispatch(move || async move {
3333
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
3434
assert_eq!(buf.as_slice(), b"Hello world!");
3535
})
3636
.unwrap();
37-
handles.push(handle.join());
37+
handles.push(handle);
3838
}
3939
while handles.next().await.is_some() {}
4040
let (_, results) = futures_util::join!(task, dispatcher.join());

compio-runtime/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ pub mod time;
2222
pub use async_task::Task;
2323
pub use attacher::*;
2424
use compio_buf::BufResult;
25-
pub use runtime::{spawn, spawn_blocking, submit, Runtime, RuntimeBuilder};
25+
pub use runtime::{spawn, spawn_blocking, submit, JoinHandle, Runtime, RuntimeBuilder};

compio-runtime/src/runtime/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ use crate::{runtime::op::OpFuture, BufResult};
3333

3434
scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);
3535

36+
/// Type alias for `Task<Result<T, Box<dyn Any + Send>>>`, which resolves to an
37+
/// `Err` when the spawned future panicked.
3638
pub type JoinHandle<T> = Task<Result<T, Box<dyn Any + Send>>>;
3739

3840
/// The async runtime of compio. It is a thread local runtime, and cannot be

compio/examples/dispatcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ async fn main() {
3535
for _i in 0..CLIENT_NUM {
3636
let (mut srv, _) = listener.accept().await.unwrap();
3737
let handle = dispatcher
38-
.execute(move || async move {
38+
.dispatch(move || async move {
3939
let BufResult(res, buf) = srv.read(Vec::with_capacity(20)).await;
4040
res.unwrap();
4141
println!("{}", std::str::from_utf8(&buf).unwrap());
4242
})
4343
.unwrap();
44-
handles.push(handle.join());
44+
handles.push(handle);
4545
}
4646
while handles.next().await.is_some() {}
4747
dispatcher.join().await.unwrap();

0 commit comments

Comments
 (0)