Skip to content

Commit 1f8497a

Browse files
authored
Merge pull request #244 from George-Miao/feat/concurrent-dispatcher
feat(dispatcher): Make concurrency
2 parents c307b71 + 3828594 commit 1f8497a

File tree

6 files changed

+129
-74
lines changed

6 files changed

+129
-74
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@
33
/.vscode
44
/.cargo
55
/.idea
6+
/.direnv
7+
.envrc

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ compio-dispatcher = { path = "./compio-dispatcher", version = "0.2.0" }
3535
compio-log = { path = "./compio-log", version = "0.1.0" }
3636
compio-tls = { path = "./compio-tls", version = "0.2.0", default-features = false }
3737

38+
flume = "0.11.0"
3839
cfg-if = "1.0.0"
3940
criterion = "0.5.1"
4041
crossbeam-channel = "0.5.8"

compio-dispatcher/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ repository = { workspace = true }
1313
[dependencies]
1414
# Workspace dependencies
1515
compio-driver = { workspace = true }
16-
compio-runtime = { workspace = true, features = ["event"] }
16+
compio-runtime = { workspace = true, features = ["event", "time"] }
1717

18-
crossbeam-channel = { workspace = true }
18+
flume = { workspace = true }
1919
futures-util = { workspace = true }
2020

2121
[dev-dependencies]

compio-dispatcher/src/lib.rs

Lines changed: 118 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,23 @@
33
#![warn(missing_docs)]
44

55
use std::{
6+
any::Any,
67
future::Future,
78
io,
89
num::NonZeroUsize,
9-
panic::{resume_unwind, UnwindSafe},
10+
panic::resume_unwind,
11+
pin::Pin,
1012
sync::{Arc, Mutex},
1113
thread::{available_parallelism, JoinHandle},
1214
};
1315

1416
use compio_driver::{AsyncifyPool, ProactorBuilder};
15-
use compio_runtime::{
16-
event::{Event, EventHandle},
17-
Runtime,
18-
};
19-
use crossbeam_channel::{unbounded, Sender};
20-
use futures_util::{future::LocalBoxFuture, FutureExt};
17+
use compio_runtime::{event::Event, Runtime};
18+
use flume::{unbounded, SendError, Sender};
2119

2220
/// The dispatcher. It manages the threads and dispatches the tasks.
2321
pub struct Dispatcher {
24-
sender: Sender<DispatcherClosure>,
22+
sender: Sender<Box<Closure>>,
2523
threads: Vec<JoinHandle<()>>,
2624
pool: AsyncifyPool,
2725
}
@@ -32,13 +30,12 @@ impl Dispatcher {
3230
let mut proactor_builder = builder.proactor_builder;
3331
proactor_builder.force_reuse_thread_pool();
3432
let pool = proactor_builder.create_or_get_thread_pool();
33+
let (sender, receiver) = unbounded::<Box<Closure>>();
3534

36-
let (sender, receiver) = unbounded::<DispatcherClosure>();
3735
let threads = (0..builder.nthreads)
3836
.map({
3937
|index| {
4038
let proactor_builder = proactor_builder.clone();
41-
4239
let receiver = receiver.clone();
4340

4441
let thread_builder = std::thread::Builder::new();
@@ -54,17 +51,21 @@ impl Dispatcher {
5451
};
5552

5653
thread_builder.spawn(move || {
57-
let runtime = Runtime::builder()
54+
Runtime::builder()
5855
.with_proactor(proactor_builder)
5956
.build()
60-
.expect("cannot create compio runtime");
61-
let _guard = runtime.enter();
62-
while let Ok(f) = receiver.recv() {
63-
*f.result.lock().unwrap() = Some(std::panic::catch_unwind(|| {
64-
Runtime::current().block_on((f.func)());
65-
}));
66-
f.handle.notify();
67-
}
57+
.expect("cannot create compio runtime")
58+
.block_on(async move {
59+
let rt = Runtime::current();
60+
while let Ok(f) = receiver.recv_async().await {
61+
let fut = (f)();
62+
if builder.concurrent {
63+
rt.spawn(fut).detach()
64+
} else {
65+
fut.await
66+
}
67+
}
68+
})
6869
})
6970
}
7071
})
@@ -86,30 +87,75 @@ impl Dispatcher {
8687
DispatcherBuilder::default()
8788
}
8889

89-
/// Dispatch a task to the threads.
90+
fn prepare<Fut, Fn, R>(&self, f: Fn) -> (Executing<R>, Box<Closure>)
91+
where
92+
Fn: (FnOnce() -> Fut) + Send + 'static,
93+
Fut: Future<Output = R> + 'static,
94+
R: Any + Send + 'static,
95+
{
96+
let event = Event::new();
97+
let handle = event.handle();
98+
let res = Arc::new(Mutex::new(None));
99+
let dispatched = Executing {
100+
event,
101+
result: res.clone(),
102+
};
103+
let closure = Box::new(|| {
104+
Box::pin(async move {
105+
*res.lock().unwrap() = Some(f().await);
106+
handle.notify();
107+
}) as BoxFuture<()>
108+
});
109+
(dispatched, closure)
110+
}
111+
112+
/// Spawn a boxed closure to the threads.
113+
///
114+
/// If all threads have panicked, this method will return an error with the
115+
/// sent closure.
116+
pub fn spawn(&self, closure: Box<Closure>) -> Result<(), SendError<Box<Closure>>> {
117+
self.sender.send(closure)
118+
}
119+
120+
/// Dispatch a task to the threads
90121
///
91122
/// The provided `f` should be [`Send`] because it will be send to another
92123
/// thread before calling. The return [`Future`] need not to be [`Send`]
93124
/// because it will be executed on only one thread.
94-
pub fn dispatch<
95-
F: Future<Output = ()> + 'static,
96-
Fn: (FnOnce() -> F) + Send + UnwindSafe + 'static,
97-
>(
98-
&self,
99-
f: Fn,
100-
) -> io::Result<DispatcherJoinHandle> {
101-
let event = Event::new();
102-
let handle = event.handle();
103-
let join_handle = DispatcherJoinHandle::new(event);
104-
let closure = DispatcherClosure {
105-
handle,
106-
result: join_handle.result.clone(),
107-
func: Box::new(|| f().boxed_local()),
108-
};
109-
self.sender
110-
.send(closure)
111-
.expect("the channel should not be disconnected");
112-
Ok(join_handle)
125+
///
126+
/// # Error
127+
///
128+
/// If all threads have panicked, this method will return an error with the
129+
/// sent closure. Notice that the returned closure is not the same as the
130+
/// argument and cannot be simply transmuted back to `Fn`.
131+
pub fn dispatch<Fut, Fn>(&self, f: Fn) -> Result<(), SendError<Box<Closure>>>
132+
where
133+
Fn: (FnOnce() -> Fut) + Send + 'static,
134+
Fut: Future<Output = ()> + 'static,
135+
{
136+
self.spawn(Box::new(|| Box::pin(f()) as BoxFuture<()>))
137+
}
138+
139+
/// Execute a task on the threads and retrieve its returned value.
140+
///
141+
/// The provided `f` should be [`Send`] because it will be send to another
142+
/// thread before calling. The return [`Future`] need not to be [`Send`]
143+
/// because it will be executed on only one thread.
144+
///
145+
/// # Error
146+
///
147+
/// If all threads have panicked, this method will return an error with the
148+
/// sent closure. Notice that the returned closure is not the same as the
149+
/// argument and cannot be simply transmuted back to `Fn`.
150+
pub fn execute<Fut, Fn, R>(&self, f: Fn) -> Result<Executing<R>, SendError<Box<Closure>>>
151+
where
152+
Fn: (FnOnce() -> Fut) + Send + 'static,
153+
Fut: Future<Output = R> + 'static,
154+
R: Any + Send + 'static,
155+
{
156+
let (dispatched, closure) = self.prepare(f);
157+
self.spawn(closure)?;
158+
Ok(dispatched)
113159
}
114160

115161
/// Stop the dispatcher and wait for the threads to complete. If there is a
@@ -135,7 +181,6 @@ impl Dispatcher {
135181
event.wait().await;
136182
let mut guard = results.lock().unwrap();
137183
for res in std::mem::take::<Vec<std::thread::Result<()>>>(guard.as_mut()) {
138-
// The thread should not panic.
139184
res.unwrap_or_else(|e| resume_unwind(e));
140185
}
141186
Ok(())
@@ -145,6 +190,7 @@ impl Dispatcher {
145190
/// A builder for [`Dispatcher`].
146191
pub struct DispatcherBuilder {
147192
nthreads: usize,
193+
concurrent: bool,
148194
stack_size: Option<usize>,
149195
names: Option<Box<dyn FnMut(usize) -> String>>,
150196
proactor_builder: ProactorBuilder,
@@ -155,12 +201,22 @@ impl DispatcherBuilder {
155201
pub fn new() -> Self {
156202
Self {
157203
nthreads: available_parallelism().map(|n| n.get()).unwrap_or(1),
204+
concurrent: true,
158205
stack_size: None,
159206
names: None,
160207
proactor_builder: ProactorBuilder::new(),
161208
}
162209
}
163210

211+
/// If execute tasks concurrently. Default to be `true`.
212+
///
213+
/// When set to `false`, tasks are executed sequentially without any
214+
/// concurrency within the thread.
215+
pub fn concurrent(mut self, concurrent: bool) -> Self {
216+
self.concurrent = concurrent;
217+
self
218+
}
219+
164220
/// Set the number of worker threads of the dispatcher. The default value is
165221
/// the CPU number. If the CPU number could not be retrieved, the
166222
/// default value is 1.
@@ -199,36 +255,36 @@ impl Default for DispatcherBuilder {
199255
}
200256
}
201257

202-
type Closure<'a> = dyn (FnOnce() -> LocalBoxFuture<'a, ()>) + Send + UnwindSafe;
258+
type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
259+
type Closure = dyn (FnOnce() -> BoxFuture<()>) + Send;
203260

204-
struct DispatcherClosure {
205-
handle: EventHandle,
206-
result: Arc<Mutex<Option<std::thread::Result<()>>>>,
207-
func: Box<Closure<'static>>,
208-
}
209-
210-
/// The join handle for dispatched task.
211-
pub struct DispatcherJoinHandle {
261+
/// The join handle for an executing task. It can be used to wait for the
262+
/// task's returned value.
263+
pub struct Executing<R> {
212264
event: Event,
213-
result: Arc<Mutex<Option<std::thread::Result<()>>>>,
265+
result: Arc<Mutex<Option<R>>>,
214266
}
215267

216-
impl DispatcherJoinHandle {
217-
pub(crate) fn new(event: Event) -> Self {
218-
Self {
219-
event,
220-
result: Arc::new(Mutex::new(None)),
268+
impl<R: 'static> Executing<R> {
269+
fn take(val: &Mutex<Option<R>>) -> R {
270+
val.lock()
271+
.unwrap()
272+
.take()
273+
.expect("the result should be set")
274+
}
275+
276+
/// Try to wait for the task to complete without blocking.
277+
pub fn try_join(self) -> Result<R, Self> {
278+
if self.event.notified() {
279+
Ok(Self::take(&self.result))
280+
} else {
281+
Err(self)
221282
}
222283
}
223284

224285
/// Wait for the task to complete.
225-
pub async fn join(self) -> io::Result<std::thread::Result<()>> {
286+
pub async fn join(self) -> R {
226287
self.event.wait().await;
227-
Ok(self
228-
.result
229-
.lock()
230-
.unwrap()
231-
.take()
232-
.expect("the result should be set"))
288+
Self::take(&self.result)
233289
}
234290
}

compio-dispatcher/tests/listener.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{num::NonZeroUsize, panic::resume_unwind};
1+
use std::num::NonZeroUsize;
22

33
use compio_buf::arrayvec::ArrayVec;
44
use compio_dispatcher::Dispatcher;
@@ -29,16 +29,14 @@ async fn listener_dispatch() {
2929
for _i in 0..CLIENT_NUM {
3030
let (mut srv, _) = listener.accept().await.unwrap();
3131
let handle = dispatcher
32-
.dispatch(move || async move {
32+
.execute(move || async move {
3333
let (_, buf) = srv.read_exact(ArrayVec::<u8, 12>::new()).await.unwrap();
3434
assert_eq!(buf.as_slice(), b"Hello world!");
3535
})
3636
.unwrap();
3737
handles.push(handle.join());
3838
}
39-
while let Some(res) = handles.next().await {
40-
res.unwrap().unwrap_or_else(|e| resume_unwind(e));
41-
}
39+
while handles.next().await.is_some() {}
4240
let (_, results) = futures_util::join!(task, dispatcher.join());
4341
results.unwrap();
4442
}

compio/examples/dispatcher.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{num::NonZeroUsize, panic::resume_unwind};
1+
use std::num::NonZeroUsize;
22

33
use compio::{
44
dispatcher::Dispatcher,
@@ -35,16 +35,14 @@ async fn main() {
3535
for _i in 0..CLIENT_NUM {
3636
let (mut srv, _) = listener.accept().await.unwrap();
3737
let handle = dispatcher
38-
.dispatch(move || async move {
38+
.execute(move || async move {
3939
let BufResult(res, buf) = srv.read(Vec::with_capacity(20)).await;
4040
res.unwrap();
4141
println!("{}", std::str::from_utf8(&buf).unwrap());
4242
})
4343
.unwrap();
4444
handles.push(handle.join());
4545
}
46-
while let Some(res) = handles.next().await {
47-
res.unwrap().unwrap_or_else(|e| resume_unwind(e));
48-
}
46+
while handles.next().await.is_some() {}
4947
dispatcher.join().await.unwrap();
5048
}

0 commit comments

Comments
 (0)