From b351408e55643a48d43d4272784a3b8cb05f2d6b Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Mon, 16 Sep 2019 05:11:50 +0900 Subject: [PATCH] Remove unsafe pin-projections --- Cargo.toml | 1 + README.md | 2 +- runtime-tokio/src/lib.rs | 1 + runtime-tokio/src/tcp.rs | 2 +- runtime-tokio/src/udp.rs | 4 +-- src/lib.rs | 1 + src/time/ext.rs | 68 +++++++++++++++------------------------- 7 files changed, 33 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 78aefa31..7ee3b72b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ futures-preview = "0.3.0-alpha.18" runtime-attributes = { path = "runtime-attributes", version = "0.3.0-alpha.6", default-features = false } runtime-raw = { path = "runtime-raw", version = "0.3.0-alpha.5" } runtime-native = { path = "runtime-native", version = "0.3.0-alpha.6", optional = true } +pin-project = "0.4.0-alpha.11" [dev-dependencies] failure = "0.1.5" diff --git a/README.md b/README.md index 0d81c0a1..68a1985a 100644 --- a/README.md +++ b/README.md @@ -213,7 +213,7 @@ Getting things right takes time. But if you'd like to move the state of async fo you to get involved! ## Safety -This crate uses `unsafe` in a few places to construct pin projections. +This crate uses ``#![deny(unsafe_code)]`` to ensure everything is implemented in 100% Safe Rust. ## Contributing Want to join us? Check out our [The "Contributing" section of the diff --git a/runtime-tokio/src/lib.rs b/runtime-tokio/src/lib.rs index 0b996719..be3a9680 100644 --- a/runtime-tokio/src/lib.rs +++ b/runtime-tokio/src/lib.rs @@ -2,6 +2,7 @@ //! [Runtime](https://github.com/rustasync/runtime). See the [Runtime //! documentation](https://docs.rs/runtime) for more details. +#![deny(unsafe_code)] #![warn( missing_debug_implementations, missing_docs, diff --git a/runtime-tokio/src/tcp.rs b/runtime-tokio/src/tcp.rs index 4a34b1f9..a049a6ed 100644 --- a/runtime-tokio/src/tcp.rs +++ b/runtime-tokio/src/tcp.rs @@ -98,7 +98,7 @@ impl runtime_raw::TcpListener for TcpListener { self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>>> { - let listener = unsafe { &mut self.get_unchecked_mut().tokio_listener }; + let listener = &mut self.get_mut().tokio_listener; match listener.poll_accept()? { futures01::Async::Ready((tokio_stream, _)) => { let stream = Box::pin(TcpStream { tokio_stream }); diff --git a/runtime-tokio/src/udp.rs b/runtime-tokio/src/udp.rs index 69fd1c9c..595821a2 100644 --- a/runtime-tokio/src/udp.rs +++ b/runtime-tokio/src/udp.rs @@ -21,7 +21,7 @@ impl runtime_raw::UdpSocket for UdpSocket { buf: &[u8], receiver: &SocketAddr, ) -> Poll> { - let socket = unsafe { &mut self.get_unchecked_mut().tokio_socket }; + let socket = &mut self.get_mut().tokio_socket; match socket.poll_send_to(&buf, &receiver)? { futures01::Async::Ready(size) => Poll::Ready(Ok(size)), futures01::Async::NotReady => Poll::Pending, @@ -33,7 +33,7 @@ impl runtime_raw::UdpSocket for UdpSocket { _cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - let socket = unsafe { &mut self.get_unchecked_mut().tokio_socket }; + let socket = &mut self.get_mut().tokio_socket; match socket.poll_recv_from(buf)? { futures01::Async::Ready((size, addr)) => Poll::Ready(Ok((size, addr))), futures01::Async::NotReady => Poll::Pending, diff --git a/src/lib.rs b/src/lib.rs index 176223a4..36193499 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,6 +83,7 @@ //! - [Runtime Tokio](https://docs.rs/runtime-tokio) provides a thread pool, bindings to the OS, and //! a work-stealing scheduler. +#![deny(unsafe_code)] #![warn( missing_debug_implementations, missing_docs, diff --git a/src/time/ext.rs b/src/time/ext.rs index 1a9796b0..b32ce0e3 100644 --- a/src/time/ext.rs +++ b/src/time/ext.rs @@ -1,5 +1,6 @@ //! Extensions for Futures types. +use pin_project::pin_project; use std::future::Future; use std::io; use std::pin::Pin; @@ -13,31 +14,26 @@ use super::Delay; /// A future returned by methods in the [`FutureExt`] trait. /// /// [`FutureExt.timeout`]: trait.FutureExt.html +#[pin_project] #[derive(Debug)] pub struct Timeout { + #[pin] future: F, + #[pin] delay: Delay, } impl Future for Timeout { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // This pinning projection is safe because: - // 1. `Timeout` is only Unpin when `F` is Unpin. (Ok for default auto impl) - // 2. `drop` never moves out of `F`. (No manual `Drop` impl and no `#[repr(packed)]`) - // 3. `drop` on `F` must be called before overwritten or deallocated. (No manual `Drop` impl) - // 4. No other operation provided for moving out `F`. (Ok) - let (future, delay) = unsafe { - let Timeout { future, delay } = self.get_unchecked_mut(); - (Pin::new_unchecked(future), Pin::new(delay)) - }; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); - if let Poll::Ready(t) = future.poll(cx) { + if let Poll::Ready(t) = this.future.poll(cx) { return Poll::Ready(Ok(t)); } - delay + this.delay .poll(cx) .map(|_| Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out"))) } @@ -139,35 +135,29 @@ impl FutureExt for T {} /// A stream returned by methods in the [`StreamExt`] trait. /// /// [`StreamExt`]: trait.StreamExt.html +#[pin_project] #[derive(Debug)] pub struct TimeoutStream { + #[pin] timeout: Delay, dur: Duration, + #[pin] stream: S, } impl Stream for TimeoutStream { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // This pinning projection is safe. - // See detail in `Timeout::poll`. - let (mut timeout, dur, stream) = unsafe { - let TimeoutStream { - timeout, - dur, - stream, - } = self.get_unchecked_mut(); - (Pin::new(timeout), Pin::new(dur), Pin::new_unchecked(stream)) - }; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); - if let Poll::Ready(s) = stream.poll_next(cx) { - timeout.set(Delay::new(*dur)); + if let Poll::Ready(s) = this.stream.as_mut().poll_next(cx) { + this.timeout.set(Delay::new(*this.dur)); return Poll::Ready(Ok(s).transpose()); } - Pin::new(&mut *timeout).poll(cx).map(|_| { - timeout.set(Delay::new(*dur)); + this.timeout.as_mut().poll(cx).map(|_| { + this.timeout.set(Delay::new(*this.dur)); Some(Err(io::Error::new( io::ErrorKind::TimedOut, "future timed out", @@ -223,37 +213,31 @@ impl StreamExt for S {} /// A stream returned by methods in the [`StreamExt`] trait. /// /// [`StreamExt`]: trait.StreamExt.html +#[pin_project] #[derive(Debug)] pub struct TimeoutAsyncRead { + #[pin] timeout: Delay, dur: Duration, + #[pin] stream: S, } impl AsyncRead for TimeoutAsyncRead { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - // This pinning projection is safe. - // See detail in `Timeout::poll`. - let (mut timeout, dur, stream) = unsafe { - let TimeoutAsyncRead { - timeout, - dur, - stream, - } = self.get_unchecked_mut(); - (Pin::new(timeout), Pin::new(dur), Pin::new_unchecked(stream)) - }; + let mut this = self.project(); - if let Poll::Ready(s) = stream.poll_read(cx, buf) { - timeout.set(Delay::new(*dur)); + if let Poll::Ready(s) = this.stream.as_mut().poll_read(cx, buf) { + this.timeout.set(Delay::new(*this.dur)); return Poll::Ready(s); } - Pin::new(&mut *timeout).poll(cx).map(|_| { - timeout.set(Delay::new(*dur)); + this.timeout.as_mut().poll(cx).map(|_| { + this.timeout.set(Delay::new(*this.dur)); Err(io::Error::new(io::ErrorKind::TimedOut, "future timed out")) }) }