Skip to content

Commit 0c12744

Browse files
Sqlx-core: code for async-global-executor
1 parent 31553e7 commit 0c12744

File tree

5 files changed

+178
-0
lines changed

5 files changed

+178
-0
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
use std::{
2+
future::Future,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
use async_global_executor::Task;
8+
9+
pub struct JoinHandle<T> {
10+
pub task: Option<Task<T>>,
11+
}
12+
13+
impl<T> Drop for JoinHandle<T> {
14+
fn drop(&mut self) {
15+
if let Some(task) = self.task.take() {
16+
task.detach();
17+
}
18+
}
19+
}
20+
21+
impl<T> Future for JoinHandle<T> {
22+
type Output = T;
23+
24+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
25+
match self.task.as_mut() {
26+
Some(task) => Future::poll(Pin::new(task), cx),
27+
None => unreachable!("JoinHandle polled after dropping"),
28+
}
29+
}
30+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
mod join_handle;
2+
pub use join_handle::*;
3+
4+
mod socket;
5+
6+
mod timeout;
7+
pub use timeout::*;
8+
9+
pub mod yield_now;
10+
pub use yield_now::*;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use crate::net::Socket;
2+
3+
use std::io;
4+
use std::io::{Read, Write};
5+
use std::net::{Shutdown, TcpStream};
6+
7+
use std::task::{Context, Poll};
8+
9+
use crate::io::ReadBuf;
10+
use async_io_global_executor::Async;
11+
12+
impl Socket for Async<TcpStream> {
13+
fn try_read(&mut self, buf: &mut dyn ReadBuf) -> io::Result<usize> {
14+
unsafe { self.get_mut().read(buf.init_mut()) }
15+
}
16+
17+
fn try_write(&mut self, buf: &[u8]) -> io::Result<usize> {
18+
unsafe { self.get_mut().write(buf) }
19+
}
20+
21+
fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
22+
self.poll_readable(cx)
23+
}
24+
25+
fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
26+
self.poll_writable(cx)
27+
}
28+
29+
fn poll_shutdown(&mut self, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
30+
unsafe { Poll::Ready(self.get_mut().shutdown(Shutdown::Both)) }
31+
}
32+
}
33+
34+
#[cfg(unix)]
35+
impl Socket for Async<std::os::unix::net::UnixStream> {
36+
fn try_read(&mut self, buf: &mut dyn ReadBuf) -> io::Result<usize> {
37+
unsafe { self.get_mut().read(buf.init_mut()) }
38+
}
39+
40+
fn try_write(&mut self, buf: &[u8]) -> io::Result<usize> {
41+
unsafe { self.get_mut().write(buf) }
42+
}
43+
44+
fn poll_read_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
45+
self.poll_readable(cx)
46+
}
47+
48+
fn poll_write_ready(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
49+
self.poll_writable(cx)
50+
}
51+
52+
fn poll_shutdown(&mut self, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
53+
unsafe { Poll::Ready(self.get_mut().shutdown(Shutdown::Both)) }
54+
}
55+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use std::{
2+
future::Future,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
time::Duration,
6+
};
7+
8+
use crate::rt::TimeoutError;
9+
10+
pub async fn sleep(duration: Duration) {
11+
timeout_future(duration).await;
12+
}
13+
14+
pub fn timeout<F: Future>(
15+
duration: Duration,
16+
future: F,
17+
) -> impl Future<Output = Result<F::Output, TimeoutError>> {
18+
TimeoutFuture::new(future, timeout_future(duration))
19+
}
20+
21+
fn timeout_future(duration: Duration) -> impl Future {
22+
async_io_global_executor::Timer::after(duration)
23+
}
24+
25+
pub struct TimeoutFuture<F, D> {
26+
future: F,
27+
delay: D,
28+
}
29+
30+
impl<F, D> TimeoutFuture<F, D> {
31+
fn new(future: F, delay: D) -> TimeoutFuture<F, D> {
32+
TimeoutFuture { future, delay }
33+
}
34+
}
35+
36+
impl<F: Future, D: Future> Future for TimeoutFuture<F, D> {
37+
type Output = Result<F::Output, TimeoutError>;
38+
39+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40+
let future_polled = {
41+
let future = unsafe { self.as_mut().map_unchecked_mut(|s| &mut s.future) }; // safe, as self is Pin
42+
future.poll(cx)
43+
};
44+
match future_polled {
45+
Poll::Ready(v) => Poll::Ready(Ok(v)),
46+
Poll::Pending => {
47+
let delay = unsafe { self.map_unchecked_mut(|s| &mut s.delay) }; // safe, as self is Pin
48+
match delay.poll(cx) {
49+
Poll::Ready(_) => Poll::Ready(Err(TimeoutError)),
50+
Poll::Pending => Poll::Pending,
51+
}
52+
}
53+
}
54+
}
55+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use std::{
2+
future::Future,
3+
pin::Pin,
4+
task::{Context, Poll},
5+
};
6+
7+
pub fn yield_now() -> impl Future<Output = ()> {
8+
YieldNow(false)
9+
}
10+
11+
struct YieldNow(bool);
12+
13+
impl Future for YieldNow {
14+
type Output = ();
15+
16+
// The futures executor is implemented as a FIFO queue, so all this future
17+
// does is re-schedule the future back to the end of the queue, giving room
18+
// for other futures to progress.
19+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
20+
if !self.0 {
21+
self.0 = true;
22+
cx.waker().wake_by_ref();
23+
Poll::Pending
24+
} else {
25+
Poll::Ready(())
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)