Skip to content

Commit d01e8e6

Browse files
authored
Merge pull request #196 from Berrysoft/atomic-waker
2 parents 4c38ff5 + e433505 commit d01e8e6

File tree

11 files changed

+161
-289
lines changed

11 files changed

+161
-289
lines changed

compio-dispatcher/src/lib.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ impl Dispatcher {
6363
*f.result.lock().unwrap() = Some(std::panic::catch_unwind(|| {
6464
Runtime::current().block_on((f.func)());
6565
}));
66-
f.handle.notify().ok();
66+
f.handle.notify();
6767
}
6868
})
6969
}
@@ -98,8 +98,8 @@ impl Dispatcher {
9898
&self,
9999
f: Fn,
100100
) -> io::Result<DispatcherJoinHandle> {
101-
let event = Event::new()?;
102-
let handle = event.handle()?;
101+
let event = Event::new();
102+
let handle = event.handle();
103103
let join_handle = DispatcherJoinHandle::new(event);
104104
let closure = DispatcherClosure {
105105
handle,
@@ -117,8 +117,8 @@ impl Dispatcher {
117117
pub async fn join(self) -> io::Result<()> {
118118
drop(self.sender);
119119
let results = Arc::new(Mutex::new(vec![]));
120-
let event = Event::new()?;
121-
let handle = event.handle()?;
120+
let event = Event::new();
121+
let handle = event.handle();
122122
if let Err(f) = self.pool.dispatch({
123123
let results = results.clone();
124124
move || {
@@ -127,12 +127,12 @@ impl Dispatcher {
127127
.into_iter()
128128
.map(|thread| thread.join())
129129
.collect();
130-
handle.notify().ok();
130+
handle.notify();
131131
}
132132
}) {
133133
std::thread::spawn(f);
134134
}
135-
event.wait().await?;
135+
event.wait().await;
136136
let mut guard = results.lock().unwrap();
137137
for res in std::mem::take::<Vec<std::thread::Result<()>>>(guard.as_mut()) {
138138
// The thread should not panic.
@@ -223,7 +223,7 @@ impl DispatcherJoinHandle {
223223

224224
/// Wait for the task to complete.
225225
pub async fn join(self) -> io::Result<std::thread::Result<()>> {
226-
self.event.wait().await?;
226+
self.event.wait().await;
227227
Ok(self
228228
.result
229229
.lock()

compio-net/src/resolve/windows.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl AsyncResolver {
4747
let overlapped_ptr = lpoverlapped.cast::<GAIOverlapped>().cast_mut();
4848
if let Some(overlapped) = overlapped_ptr.as_mut() {
4949
if let Some(handle) = overlapped.handle.take() {
50-
handle.notify().ok();
50+
handle.notify();
5151
}
5252
}
5353
}
@@ -151,14 +151,14 @@ pub async fn resolve_sock_addrs(
151151
hints.ai_socktype = SOCK_STREAM;
152152
hints.ai_protocol = IPPROTO_TCP;
153153

154-
let event = Event::new()?;
155-
let handle = event.handle()?;
154+
let event = Event::new();
155+
let handle = event.handle();
156156
match unsafe { resolver.call(&hints, handle) } {
157157
Poll::Ready(res) => {
158158
res?;
159159
}
160160
Poll::Pending => {
161-
event.wait().await?;
161+
event.wait().await;
162162
}
163163
}
164164

compio-runtime/src/event/eventfd.rs

Lines changed: 0 additions & 68 deletions
This file was deleted.

compio-runtime/src/event/iocp.rs

Lines changed: 0 additions & 75 deletions
This file was deleted.

compio-runtime/src/event/mod.rs

Lines changed: 108 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,111 @@
11
//! Asynchronous events.
2-
//!
3-
//! Only for waking up the driver.
4-
5-
cfg_if::cfg_if! {
6-
if #[cfg(windows)] {
7-
mod iocp;
8-
pub use iocp::*;
9-
} else if #[cfg(any(
10-
target_os = "android",
11-
target_os = "freebsd",
12-
target_os = "illumos",
13-
target_os = "linux",
14-
))] {
15-
mod eventfd;
16-
pub use eventfd::*;
17-
} else if #[cfg(unix)] {
18-
mod pipe;
19-
pub use pipe::*;
2+
3+
use std::{
4+
pin::Pin,
5+
sync::{
6+
atomic::{AtomicBool, Ordering},
7+
Arc,
8+
},
9+
task::{Context, Poll},
10+
};
11+
12+
use futures_util::{task::AtomicWaker, Future};
13+
14+
#[derive(Debug)]
15+
struct Inner {
16+
waker: AtomicWaker,
17+
set: AtomicBool,
18+
}
19+
20+
#[derive(Debug, Clone)]
21+
struct Flag(Arc<Inner>);
22+
23+
impl Flag {
24+
pub fn new() -> Self {
25+
Self(Arc::new(Inner {
26+
waker: AtomicWaker::new(),
27+
set: AtomicBool::new(false),
28+
}))
29+
}
30+
31+
pub fn notify(&self) {
32+
self.0.set.store(true, Ordering::Relaxed);
33+
self.0.waker.wake();
34+
}
35+
36+
pub fn notified(&self) -> bool {
37+
self.0.set.load(Ordering::Relaxed)
38+
}
39+
}
40+
41+
impl Future for Flag {
42+
type Output = ();
43+
44+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
45+
// quick check to avoid registration if already done.
46+
if self.0.set.load(Ordering::Relaxed) {
47+
return Poll::Ready(());
48+
}
49+
50+
self.0.waker.register(cx.waker());
51+
52+
// Need to check condition **after** `register` to avoid a race
53+
// condition that would result in lost notifications.
54+
if self.0.set.load(Ordering::Relaxed) {
55+
Poll::Ready(())
56+
} else {
57+
Poll::Pending
58+
}
59+
}
60+
}
61+
62+
/// An event that won't wake until [`EventHandle::notify`] is called
63+
/// successfully.
64+
#[derive(Debug)]
65+
pub struct Event {
66+
flag: Flag,
67+
}
68+
69+
impl Default for Event {
70+
fn default() -> Self {
71+
Self::new()
72+
}
73+
}
74+
75+
impl Event {
76+
/// Create [`Event`].
77+
pub fn new() -> Self {
78+
Self { flag: Flag::new() }
79+
}
80+
81+
/// Get a notify handle.
82+
pub fn handle(&self) -> EventHandle {
83+
EventHandle::new(self.flag.clone())
84+
}
85+
86+
/// Get if the event has been notified.
87+
pub fn notified(&self) -> bool {
88+
self.flag.notified()
89+
}
90+
91+
/// Wait for [`EventHandle::notify`] called.
92+
pub async fn wait(self) {
93+
self.flag.await
94+
}
95+
}
96+
97+
/// A wake up handle to [`Event`].
98+
pub struct EventHandle {
99+
flag: Flag,
100+
}
101+
102+
impl EventHandle {
103+
fn new(flag: Flag) -> Self {
104+
Self { flag }
105+
}
106+
107+
/// Notify the event.
108+
pub fn notify(self) {
109+
self.flag.notify()
20110
}
21111
}

0 commit comments

Comments
 (0)