Skip to content
This repository was archived by the owner on Oct 30, 2019. It is now read-only.

Remove unsafe pin-projections #102

Merged
merged 1 commit into from
Sep 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ futures-preview = "0.3.0-alpha.18"
runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.6", default-features = false }
runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.5" }
runtime-native = { path = "runtime-native", version = "0.3.0-alpha.6", optional = true }
pin-project = "0.4.0-alpha.11"

[dev-dependencies]
failure = "0.1.5"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ Getting things right takes time. But if you'd like to move the state of async fo
you to get involved!

## Safety
This crate uses `unsafe` in a few places to construct pin projections.
This crate uses ``#![deny(unsafe_code)]`` to ensure everything is implemented in 100% Safe Rust.

## Contributing
Want to join us? Check out our [The "Contributing" section of the
Expand Down
1 change: 1 addition & 0 deletions runtime-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! [Runtime](https://github.com/rustasync/runtime). See the [Runtime
//! documentation](https://docs.rs/runtime) for more details.

#![deny(unsafe_code)]
#![warn(
missing_debug_implementations,
missing_docs,
Expand Down
2 changes: 1 addition & 1 deletion runtime-tokio/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl runtime_raw::TcpListener for TcpListener {
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>> {
let listener = unsafe { &mut self.get_unchecked_mut().tokio_listener };
let listener = &mut self.get_mut().tokio_listener;
match listener.poll_accept()? {
futures01::Async::Ready((tokio_stream, _)) => {
let stream = Box::pin(TcpStream { tokio_stream });
Expand Down
4 changes: 2 additions & 2 deletions runtime-tokio/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl runtime_raw::UdpSocket for UdpSocket {
buf: &[u8],
receiver: &SocketAddr,
) -> Poll<io::Result<usize>> {
let socket = unsafe { &mut self.get_unchecked_mut().tokio_socket };
let socket = &mut self.get_mut().tokio_socket;
match socket.poll_send_to(&buf, &receiver)? {
futures01::Async::Ready(size) => Poll::Ready(Ok(size)),
futures01::Async::NotReady => Poll::Pending,
Expand All @@ -33,7 +33,7 @@ impl runtime_raw::UdpSocket for UdpSocket {
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<(usize, SocketAddr)>> {
let socket = unsafe { &mut self.get_unchecked_mut().tokio_socket };
let socket = &mut self.get_mut().tokio_socket;
match socket.poll_recv_from(buf)? {
futures01::Async::Ready((size, addr)) => Poll::Ready(Ok((size, addr))),
futures01::Async::NotReady => Poll::Pending,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
//! - [Runtime Tokio](https://docs.rs/runtime-tokio) provides a thread pool, bindings to the OS, and
//! a work-stealing scheduler.

#![deny(unsafe_code)]
#![warn(
missing_debug_implementations,
missing_docs,
Expand Down
68 changes: 26 additions & 42 deletions src/time/ext.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Extensions for Futures types.

use pin_project::pin_project;
use std::future::Future;
use std::io;
use std::pin::Pin;
Expand All @@ -13,31 +14,26 @@ use super::Delay;
/// A future returned by methods in the [`FutureExt`] trait.
///
/// [`FutureExt.timeout`]: trait.FutureExt.html
#[pin_project]
#[derive(Debug)]
pub struct Timeout<F: Future> {
#[pin]
future: F,
#[pin]
delay: Delay,
}

impl<F: Future> Future for Timeout<F> {
type Output = Result<F::Output, io::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// This pinning projection is safe because:
// 1. `Timeout` is only Unpin when `F` is Unpin. (Ok for default auto impl)
// 2. `drop` never moves out of `F`. (No manual `Drop` impl and no `#[repr(packed)]`)
// 3. `drop` on `F` must be called before overwritten or deallocated. (No manual `Drop` impl)
// 4. No other operation provided for moving out `F`. (Ok)
let (future, delay) = unsafe {
let Timeout { future, delay } = self.get_unchecked_mut();
(Pin::new_unchecked(future), Pin::new(delay))
};
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

if let Poll::Ready(t) = future.poll(cx) {
if let Poll::Ready(t) = this.future.poll(cx) {
return Poll::Ready(Ok(t));
}

delay
this.delay
.poll(cx)
.map(|_| Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out")))
}
Expand Down Expand Up @@ -139,35 +135,29 @@ impl<T: Future> FutureExt for T {}
/// A stream returned by methods in the [`StreamExt`] trait.
///
/// [`StreamExt`]: trait.StreamExt.html
#[pin_project]
#[derive(Debug)]
pub struct TimeoutStream<S: Stream> {
#[pin]
timeout: Delay,
dur: Duration,
#[pin]
stream: S,
}

impl<S: Stream> Stream for TimeoutStream<S> {
type Item = Result<S::Item, io::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// This pinning projection is safe.
// See detail in `Timeout::poll`.
let (mut timeout, dur, stream) = unsafe {
let TimeoutStream {
timeout,
dur,
stream,
} = self.get_unchecked_mut();
(Pin::new(timeout), Pin::new(dur), Pin::new_unchecked(stream))
};
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

if let Poll::Ready(s) = stream.poll_next(cx) {
timeout.set(Delay::new(*dur));
if let Poll::Ready(s) = this.stream.as_mut().poll_next(cx) {
this.timeout.set(Delay::new(*this.dur));
return Poll::Ready(Ok(s).transpose());
}

Pin::new(&mut *timeout).poll(cx).map(|_| {
timeout.set(Delay::new(*dur));
this.timeout.as_mut().poll(cx).map(|_| {
this.timeout.set(Delay::new(*this.dur));
Some(Err(io::Error::new(
io::ErrorKind::TimedOut,
"future timed out",
Expand Down Expand Up @@ -223,37 +213,31 @@ impl<S: Stream> StreamExt for S {}
/// A stream returned by methods in the [`StreamExt`] trait.
///
/// [`StreamExt`]: trait.StreamExt.html
#[pin_project]
#[derive(Debug)]
pub struct TimeoutAsyncRead<S: AsyncRead> {
#[pin]
timeout: Delay,
dur: Duration,
#[pin]
stream: S,
}

impl<S: AsyncRead> AsyncRead for TimeoutAsyncRead<S> {
fn poll_read(
self: Pin<&mut Self>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
// This pinning projection is safe.
// See detail in `Timeout::poll`.
let (mut timeout, dur, stream) = unsafe {
let TimeoutAsyncRead {
timeout,
dur,
stream,
} = self.get_unchecked_mut();
(Pin::new(timeout), Pin::new(dur), Pin::new_unchecked(stream))
};
let mut this = self.project();

if let Poll::Ready(s) = stream.poll_read(cx, buf) {
timeout.set(Delay::new(*dur));
if let Poll::Ready(s) = this.stream.as_mut().poll_read(cx, buf) {
this.timeout.set(Delay::new(*this.dur));
return Poll::Ready(s);
}

Pin::new(&mut *timeout).poll(cx).map(|_| {
timeout.set(Delay::new(*dur));
this.timeout.as_mut().poll(cx).map(|_| {
this.timeout.set(Delay::new(*this.dur));
Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out"))
})
}
Expand Down