diff --git a/async-stream-impl/src/lib.rs b/async-stream-impl/src/lib.rs index db4e0fd..a71b7ad 100644 --- a/async-stream-impl/src/lib.rs +++ b/async-stream-impl/src/lib.rs @@ -1,12 +1,12 @@ use proc_macro::TokenStream; -use proc_macro2::{Group, TokenStream as TokenStream2, TokenTree}; +use proc_macro2::{Group, Ident, Span, TokenStream as TokenStream2, TokenTree}; use quote::quote; use syn::parse::{Parse, ParseStream, Parser, Result}; use syn::visit_mut::VisitMut; struct Scrub<'a> { - /// Whether the stream is a try stream. - is_try: bool, + /// The identifier of the local `Stream` variable. + stream: &'a Ident, /// The unit expression, `()`. unit: Box, has_yielded: bool, @@ -24,9 +24,9 @@ fn parse_input(input: TokenStream) -> syn::Result<(TokenStream2, Vec) } impl<'a> Scrub<'a> { - fn new(is_try: bool, crate_path: &'a TokenStream2) -> Self { + fn new(stream: &'a Ident, crate_path: &'a TokenStream2) -> Self { Self { - is_try, + stream, unit: syn::parse_quote!(()), has_yielded: false, crate_path, @@ -118,28 +118,10 @@ impl VisitMut for Scrub<'_> { let value_expr = yield_expr.expr.as_ref().unwrap_or(&self.unit); - // let ident = &self.yielder; - - *i = if self.is_try { - syn::parse_quote! { __yield_tx.send(::core::result::Result::Ok(#value_expr)).await } - } else { - syn::parse_quote! { __yield_tx.send(#value_expr).await } - }; - } - syn::Expr::Try(try_expr) => { - syn::visit_mut::visit_expr_try_mut(self, try_expr); - // let ident = &self.yielder; - let e = &try_expr.expr; - - *i = syn::parse_quote! { - match #e { - ::core::result::Result::Ok(v) => v, - ::core::result::Result::Err(e) => { - __yield_tx.send(::core::result::Result::Err(e.into())).await; - return; - } - } - }; + let stream = &self.stream; + *i = syn::Expr::Verbatim(quote! { + #stream.yield_item(#value_expr).await + }) } syn::Expr::Closure(_) | syn::Expr::Async(_) => { // Don't transform inner closures or async blocks. @@ -212,7 +194,9 @@ pub fn stream_inner(input: TokenStream) -> TokenStream { Err(e) => return e.to_compile_error().into(), }; - let mut scrub = Scrub::new(false, &crate_path); + let stream = Ident::new("stream", Span::mixed_site()); + + let mut scrub = Scrub::new(&stream, &crate_path); for stmt in &mut stmts { scrub.visit_stmt_mut(stmt); @@ -222,13 +206,12 @@ pub fn stream_inner(input: TokenStream) -> TokenStream { None } else { Some(quote!(if false { - __yield_tx.send(()).await; + #stream.yield_item(()).await; })) }; quote!({ - let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair(); - #crate_path::AsyncStream::new(__yield_rx, async move { + #crate_path::stream(move |mut #stream| async move { #dummy_yield #(#stmts)* }) @@ -246,7 +229,9 @@ pub fn try_stream_inner(input: TokenStream) -> TokenStream { Err(e) => return e.to_compile_error().into(), }; - let mut scrub = Scrub::new(true, &crate_path); + let stream = Ident::new("stream", Span::mixed_site()); + + let mut scrub = Scrub::new(&stream, &crate_path); for stmt in &mut stmts { scrub.visit_stmt_mut(stmt); @@ -256,15 +241,18 @@ pub fn try_stream_inner(input: TokenStream) -> TokenStream { None } else { Some(quote!(if false { - __yield_tx.send(()).await; + #stream.yield_item(()).await; })) }; quote!({ - let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair(); - #crate_path::AsyncStream::new(__yield_rx, async move { + #crate_path::try_stream(move |mut #stream| async move { #dummy_yield - #(#stmts)* + let () = { + #(#stmts)* + }; + #[allow(unreachable_code)] + Ok(()) }) }) .into() diff --git a/async-stream/Cargo.toml b/async-stream/Cargo.toml index df2dbe5..a14ad10 100644 --- a/async-stream/Cargo.toml +++ b/async-stream/Cargo.toml @@ -11,10 +11,13 @@ authors = ["Carl Lerche "] description = "Asynchronous streams using async & await notation" repository = "https://github.com/tokio-rs/async-stream" +[features] +macro = ["async-stream-impl"] + [dependencies] -async-stream-impl = { version = "=0.3.3", path = "../async-stream-impl" } +async-stream-impl = { version = "=0.3.3", path = "../async-stream-impl", optional = true } futures-core = "0.3" -pin-project-lite = "0.2" +pin-project-lite = "0.2.8" [dev-dependencies] futures-util = "0.3" @@ -22,3 +25,7 @@ rustversion = "1" tokio = { version = "1", features = ["full"] } tokio-test = "0.4" trybuild = "1" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "doc_nightly"] diff --git a/async-stream/README.md b/async-stream/README.md index ed28ea8..3e62789 100644 --- a/async-stream/README.md +++ b/async-stream/README.md @@ -2,18 +2,18 @@ Asynchronous stream of elements. -Provides two macros, `stream!` and `try_stream!`, allowing the caller to +Provides two functions, `stream` and `try_stream`, allowing the caller to define asynchronous streams of elements. These are implemented using `async` & `await` notation. This crate works without unstable features. -The `stream!` macro returns an anonymous type implementing the [`Stream`] +The `stream` function returns an anonymous type implementing the [`Stream`] trait. The `Item` associated type is the type of the values yielded from the -stream. The `try_stream!` also returns an anonymous type implementing the +stream. The `try_stream` also returns an anonymous type implementing the [`Stream`] trait, but the `Item` associated type is `Result`. The -`try_stream!` macro supports using `?` notation as part of the +`try_stream` function supports using `?` notation as part of the implementation. -## Usage +## Function Usage A basic stream yielding numbers. Values are yielded using the `yield` keyword. The stream block must return `()`. @@ -26,11 +26,11 @@ use futures_util::stream::StreamExt; #[tokio::main] async fn main() { - let s = stream! { + let s = stream(|mut stream| async move { for i in 0..3 { - yield i; + stream.yield_item(i).await; } - }; + }); pin_mut!(s); // needed for iteration @@ -50,11 +50,11 @@ use futures_util::pin_mut; use futures_util::stream::StreamExt; fn zero_to_three() -> impl Stream { - stream! { + stream(|mut stream| async move { for i in 0..3 { - yield i; + stream.yield_item(i).await; } - } + }) } #[tokio::main] @@ -68,8 +68,57 @@ async fn main() { } ``` -Streams may be implemented in terms of other streams - `async-stream` provides `for await` -syntax to assist with this: +Rust try notation (`?`) can be used with the `try_stream` function. The +`Item` of the returned stream is `Result` with `Ok` being the value yielded +and `Err` the error type returned by `?`. + +```rust +use tokio::net::{TcpListener, TcpStream}; + +use async_stream::try_stream; +use futures_core::stream::Stream; + +use std::io; +use std::net::SocketAddr; + +fn bind_and_accept(addr: SocketAddr) + -> impl Stream> +{ + try_stream(move |mut stream| async move { + let mut listener = TcpListener::bind(addr).await?; + + loop { + let (tcp_stream, addr) = listener.accept().await?; + println!("received on {:?}", addr); + stream.yield_item(tcp_stream).await; + } + }) +} +``` + +## Macro usage + +When the `macro` Cargo feature flag is enabled, this crate additionally +provides two macros, `stream!` and `try_stream!`, that are identical to +their equivalent functions but have a slightly nicer syntax where you can +write `yield x` instead of `stream.yield_item(x).await`. For example: + +```rust +use async_stream::stream; + +use futures_core::stream::Stream; + +fn zero_to_three() -> impl Stream { + stream! { + for i in 0..3 { + yield i; + } + } +} +``` + +Streams may be implemented in terms of other streams - the macros provide +`for await` syntax to assist with this: ```rust use async_stream::stream; @@ -107,52 +156,26 @@ async fn main() { } ``` -Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` -of the returned stream is `Result` with `Ok` being the value yielded and -`Err` the error type returned by `?`. - -```rust -use tokio::net::{TcpListener, TcpStream}; - -use async_stream::try_stream; -use futures_core::stream::Stream; - -use std::io; -use std::net::SocketAddr; - -fn bind_and_accept(addr: SocketAddr) - -> impl Stream> -{ - try_stream! { - let mut listener = TcpListener::bind(addr).await?; - - loop { - let (stream, addr) = listener.accept().await?; - println!("received on {:?}", addr); - yield stream; - } - } -} -``` - ## Implementation -The `stream!` and `try_stream!` macros are implemented using proc macros. -The macro searches the syntax tree for instances of `sender.send($expr)` and -transforms them into `sender.send($expr).await`. - -The stream uses a lightweight sender to send values from the stream +The streams use a lightweight sender to send values from the stream implementation to the caller. When entering the stream, an `Option` is -stored on the stack. A pointer to the cell is stored in a thread local and -`poll` is called on the async block. When `poll` returns. -`sender.send(value)` stores the value that cell and yields back to the -caller. +stored on the stack. A pointer to the slot is stored in a thread local and +`poll` is called on the future. When `yield` is called, the value is stored +in there and the stream yields back to the caller. -[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html +The `stream!` and `try_stream!` macros are implemented using proc macros. +The macro searches the syntax tree for instances of `yield $expr` and +transforms them into `stream.yield_item($expr).await`. ## Supported Rust Versions -`async-stream` is built against the latest stable release. The minimum supported version is 1.45 due to [function-like procedural macros in expression, pattern, and statement positions](https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements). +`async-stream` is built against the latest stable release. The minimum +supported version is 1.45 due to [function-like procedural macros in +expression, pattern, and statement positions][1.45]. + +[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html +[1.45]: https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements ## License diff --git a/async-stream/examples/tcp_accept.rs b/async-stream/examples/tcp_accept.rs index 1b69bda..ad214ce 100644 --- a/async-stream/examples/tcp_accept.rs +++ b/async-stream/examples/tcp_accept.rs @@ -7,12 +7,12 @@ use tokio::net::TcpListener; async fn main() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let incoming = stream! { + let incoming = stream(|mut stream| async move { loop { let (socket, _) = listener.accept().await.unwrap(); - yield socket; + stream.yield_item(socket).await; } - }; + }); pin_mut!(incoming); while let Some(v) = incoming.next().await { diff --git a/async-stream/src/async_stream.rs b/async-stream/src/async_stream.rs deleted file mode 100644 index ff408ab..0000000 --- a/async-stream/src/async_stream.rs +++ /dev/null @@ -1,79 +0,0 @@ -use crate::yielder::Receiver; - -use futures_core::{FusedStream, Stream}; -use pin_project_lite::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -pin_project! { - #[doc(hidden)] - #[derive(Debug)] - pub struct AsyncStream { - rx: Receiver, - done: bool, - #[pin] - generator: U, - } -} - -impl AsyncStream { - #[doc(hidden)] - pub fn new(rx: Receiver, generator: U) -> AsyncStream { - AsyncStream { - rx, - done: false, - generator, - } - } -} - -impl FusedStream for AsyncStream -where - U: Future, -{ - fn is_terminated(&self) -> bool { - self.done - } -} - -impl Stream for AsyncStream -where - U: Future, -{ - type Item = T; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = self.project(); - - if *me.done { - return Poll::Ready(None); - } - - let mut dst = None; - let res = { - let _enter = me.rx.enter(&mut dst); - me.generator.poll(cx) - }; - - *me.done = res.is_ready(); - - if dst.is_some() { - return Poll::Ready(dst.take()); - } - - if *me.done { - Poll::Ready(None) - } else { - Poll::Pending - } - } - - fn size_hint(&self) -> (usize, Option) { - if self.done { - (0, Some(0)) - } else { - (0, None) - } - } -} diff --git a/async-stream/src/functions.rs b/async-stream/src/functions.rs new file mode 100644 index 0000000..d191d4d --- /dev/null +++ b/async-stream/src/functions.rs @@ -0,0 +1,352 @@ +use pin_project_lite::pin_project; +use std::cell::Cell; +use std::fmt; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::future::Future; +use std::marker::PhantomData; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; +use std::task; +use std::task::Poll; + +/// Create an asynchronous stream. +/// +/// See [crate](index.html) documentation for more details. +/// +/// # Examples +/// +/// ``` +/// use async_stream::stream; +/// +/// use futures_util::pin_mut; +/// use futures_util::stream::StreamExt; +/// +/// #[tokio::main] +/// async fn main() { +/// let s = stream(|mut stream| async move { +/// for i in 0..3 { +/// stream.yield_item(i).await; +/// } +/// }); +/// +/// pin_mut!(s); // needed for iteration +/// +/// while let Some(value) = s.next().await { +/// println!("got {}", value); +/// } +/// } +/// ``` +pub fn stream(function: F) -> impl futures_core::FusedStream +where + F: FnOnce(Stream) -> Fut, + Fut: Future, +{ + AsyncStream::Initial { function } +} + +/// Create a fallible asynchronous stream. +/// +/// See [crate](index.html) documentation for more details. +/// +/// # Examples +/// +/// ``` +/// use tokio::net::{TcpListener, TcpStream}; +/// +/// use async_stream::try_stream; +/// use futures_core::stream::Stream; +/// +/// use std::io; +/// use std::net::SocketAddr; +/// +/// fn bind_and_accept(addr: SocketAddr) +/// -> impl Stream> +/// { +/// try_stream(move |mut stream| async move { +/// let mut listener = TcpListener::bind(addr).await?; +/// +/// loop { +/// let (tcp_stream, addr) = listener.accept().await?; +/// println!("received on {:?}", addr); +/// stream.yield_item(tcp_stream).await; +/// } +/// }) +/// } +/// ``` +pub fn try_stream(function: F) -> impl futures_core::FusedStream> +where + F: FnOnce(Stream) -> Fut, + Fut: Future>, +{ + AsyncStream::Initial { function } +} + +/// Either `()` or `Result<(), E>`. This is used to share the code behind `stream` and `try_stream`. +/// The `T` generic parameter represents the type yielded by the inner future (the type parameter +/// `T` of `Stream`). +trait FutureOutput { + /// The item type that should be yielded by the containing stream. + type Item; + + /// Convert each `T` into an item type for yielding. + fn yield_item(item: T) -> Self::Item; + + /// Obtain the final item that may or may not be yielded by the stream after the future has + /// completed. + fn into_item(self) -> Option; +} + +impl FutureOutput for () { + type Item = T; + + fn yield_item(item: T) -> Self::Item { + item + } + + fn into_item(self) -> Option { + None + } +} + +impl FutureOutput for Result<(), E> { + type Item = Result; + + fn yield_item(item: T) -> Self::Item { + Ok(item) + } + + fn into_item(self) -> Option { + match self { + Ok(()) => None, + Err(e) => Some(Err(e)), + } + } +} + +pin_project! { + /// The actual implementation behind `stream` and `try_stream`. + #[project = Proj] + #[project_replace = ProjReplace] + enum AsyncStream { + /// The stream hasn't started yet (before first poll). + Initial { + function: F, + }, + /// The stream is currently running the inner future. + Running { + #[pin] + future: Fut, + }, + /// The stream has completed and will only yield `Poll::Ready(None)` from now on. + Complete, + /// This variant is never created and only exists for various phantom things. + Phantoms { + // The address of the `AsyncStream` needs to remain stable so that the yielder can + // assert that it is sending the item to the correct stream. + #[pin] + _pinned: PhantomPinned, + + // Covariance because we contain a `FnOnce(Yielder)` which is semantically a + // `fn(fn(T))` that is covariant (as the contravariances of `fn(T)` cancel out). + _covariant_input: Option T>, + }, + } +} + +// SAFETY: You cannot do anything with an `&AsyncStream`. +unsafe impl Sync for AsyncStream {} + +impl futures_core::Stream for AsyncStream +where + F: FnOnce(Stream) -> Fut, + Fut: Future, + Fut::Output: FutureOutput, +{ + type Item = >::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + // Strict provenance-wise, this is identical to a call to `.addr()` since it is only used + // for equality checking from this point onward (and never converted back into a pointer). + let addr = self.as_ref().get_ref() as *const Self as usize; + + if let Proj::Initial { .. } = self.as_mut().project() { + let function = match self.as_mut().project_replace(AsyncStream::Complete) { + ProjReplace::Initial { function } => function, + _ => unreachable!(), + }; + let future = function(Stream { + addr, + _contravariant: PhantomData, + }); + self.set(AsyncStream::Running { future }); + } + + let future = match self.as_mut().project() { + Proj::Running { future } => future, + Proj::Complete => return Poll::Ready(None), + // Not possible as the above `if` just checked for it and replaced it. + Proj::Initial { .. } => unreachable!(), + // We never create this variant so this can't happen. + Proj::Phantoms { .. } => unreachable!(), + }; + + let mut slot: Option = None; + + let poll = CURRENT_CONTAINING_STREAM.with(|containing_stream| { + struct Guard<'tls> { + containing_stream: &'tls Cell>, + old: Option, + } + impl<'tls> Drop for Guard<'tls> { + fn drop(&mut self) { + self.containing_stream.set(self.old.take()); + } + } + + let mut guard = Guard { + containing_stream, + old: containing_stream.take(), + }; + containing_stream.set(Some(ContainingStream { + addr, + slot: NonNull::from(&mut slot).cast::<()>(), + next: NonNull::from(&mut guard.old), + })); + + future.poll(cx) + }); + + match poll { + Poll::Ready(output) => { + self.set(AsyncStream::Complete); + assert!(slot.is_none(), "yielded item as stream completed"); + Poll::Ready(output.into_item()) + } + Poll::Pending => { + if let Some(item) = slot { + Poll::Ready(Some(>::yield_item(item))) + } else { + Poll::Pending + } + } + } + } + + fn size_hint(&self) -> (usize, Option) { + match self { + Self::Complete => (0, Some(0)), + _ => (0, None), + } + } +} + +impl futures_core::FusedStream for AsyncStream +where + F: FnOnce(Stream) -> Fut, + Fut: Future, + Fut::Output: FutureOutput, +{ + fn is_terminated(&self) -> bool { + matches!(self, Self::Complete) + } +} + +/// Pointer to a stream that is currently being polled. +#[derive(Clone, Copy)] +struct ContainingStream { + /// The address of the stream, used to verify that the stream is the correct one. This is never + /// turned back into a pointer. + addr: usize, + + /// A type-erased pointer to the `Option` that holds the item about to be yielded. + slot: NonNull<()>, + + /// A pointer to the containing `ContainingStream` in case multiple streams are being polled + /// recursively. This forms a temporary stack-allocated linked list. + next: NonNull>, +} + +thread_local! { + static CURRENT_CONTAINING_STREAM: Cell> = Cell::new(None); +} + +/// The internal interface of an async stream. +pub struct Stream { + /// The address of the containing `AsyncStream`, used to ensure that we don't ever yield to the + /// wrong stream. + addr: usize, + + /// Contravariance because we are semantically a `fn(T)`. + _contravariant: PhantomData, +} + +impl Stream { + /// Yield an item to the containing stream. + /// + /// # Panics + /// + /// - Panics if this is called outside a stream's context. + /// - Panics if an item has already been yielded in the stream's context. + /// - Will result in further panics if the containing future completes before this has finished + /// `.await`ing. + pub async fn yield_item(&mut self, item: T) { + const OUTSIDE_CONTEXT_PANIC: &str = "called `yield_item` outside of stream context"; + const DOUBLE_YIELD_PANIC: &str = "called `yield_item` twice in the same context"; + + // Use a scope to ensure the surrounding future can still be `Send + Sync`. + { + // Loop through the stack of streams that are currently being polled to try and find our + // one. Although this is `O(n)` other than in pathological cases `n` will be very small. + let mut option_stream = CURRENT_CONTAINING_STREAM.with(Cell::get); + let mut slot = loop { + let stream = option_stream.expect(OUTSIDE_CONTEXT_PANIC); + if stream.addr == self.addr { + break stream.slot.cast::>(); + } + option_stream = unsafe { *stream.next.as_ref() }; + }; + + let slot = unsafe { slot.as_mut() }; + assert!(slot.is_none(), "{}", DOUBLE_YIELD_PANIC); + *slot = Some(item); + } + + PendOnce::default().await; + } +} + +// Intentionally `Clone` but not `Copy` (even though we could be) to prevent misuse (e.g. calling +// `yield_item` on one `Stream` in multiple arms of a `join!`). +impl Clone for Stream { + fn clone(&self) -> Self { + Self { + addr: self.addr, + _contravariant: PhantomData, + } + } +} + +impl Debug for Stream { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.pad("Stream") + } +} + +#[derive(Default)] +struct PendOnce(bool); + +impl Future for PendOnce { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll { + match self.0 { + false => { + self.0 = true; + Poll::Pending + } + true => Poll::Ready(()), + } + } +} diff --git a/async-stream/src/lib.rs b/async-stream/src/lib.rs index bab0954..62e815a 100644 --- a/async-stream/src/lib.rs +++ b/async-stream/src/lib.rs @@ -5,21 +5,22 @@ unreachable_pub )] #![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))] +#![cfg_attr(doc_nightly, feature(doc_cfg))] //! Asynchronous stream of elements. //! -//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to +//! Provides two functions, `stream` and `try_stream`, allowing the caller to //! define asynchronous streams of elements. These are implemented using `async` //! & `await` notation. This crate works without unstable features. //! -//! The `stream!` macro returns an anonymous type implementing the [`Stream`] +//! The `stream` function returns an anonymous type implementing the [`Stream`] //! trait. The `Item` associated type is the type of the values yielded from the -//! stream. The `try_stream!` also returns an anonymous type implementing the +//! stream. The `try_stream` also returns an anonymous type implementing the //! [`Stream`] trait, but the `Item` associated type is `Result`. The -//! `try_stream!` macro supports using `?` notation as part of the +//! `try_stream` function supports using `?` notation as part of the //! implementation. //! -//! # Usage +//! # Function Usage //! //! A basic stream yielding numbers. Values are yielded using the `yield` //! keyword. The stream block must return `()`. @@ -32,11 +33,11 @@ //! //! #[tokio::main] //! async fn main() { -//! let s = stream! { +//! let s = stream(|mut stream| async move { //! for i in 0..3 { -//! yield i; +//! stream.yield_item(i).await; //! } -//! }; +//! }); //! //! pin_mut!(s); // needed for iteration //! @@ -56,11 +57,11 @@ //! use futures_util::stream::StreamExt; //! //! fn zero_to_three() -> impl Stream { -//! stream! { +//! stream(|mut stream| async move { //! for i in 0..3 { -//! yield i; +//! stream.yield_item(i).await; //! } -//! } +//! }) //! } //! //! #[tokio::main] @@ -74,8 +75,57 @@ //! } //! ``` //! -//! Streams may be implemented in terms of other streams - `async-stream` provides `for await` -//! syntax to assist with this: +//! Rust try notation (`?`) can be used with the `try_stream` function. The +//! `Item` of the returned stream is `Result` with `Ok` being the value yielded +//! and `Err` the error type returned by `?`. +//! +//! ```rust +//! use tokio::net::{TcpListener, TcpStream}; +//! +//! use async_stream::try_stream; +//! use futures_core::stream::Stream; +//! +//! use std::io; +//! use std::net::SocketAddr; +//! +//! fn bind_and_accept(addr: SocketAddr) +//! -> impl Stream> +//! { +//! try_stream(move |mut stream| async move { +//! let mut listener = TcpListener::bind(addr).await?; +//! +//! loop { +//! let (tcp_stream, addr) = listener.accept().await?; +//! println!("received on {:?}", addr); +//! stream.yield_item(tcp_stream).await; +//! } +//! }) +//! } +//! ``` +//! +//! # Macro usage +//! +//! When the `macro` Cargo feature flag is enabled, this crate additionally +//! provides two macros, `stream!` and `try_stream!`, that are identical to +//! their equivalent functions but have a slightly nicer syntax where you can +//! write `yield x` instead of `stream.yield_item(x).await`. For example: +//! +//! ``` +//! use async_stream::stream; +//! +//! use futures_core::stream::Stream; +//! +//! fn zero_to_three() -> impl Stream { +//! stream! { +//! for i in 0..3 { +//! yield i; +//! } +//! } +//! } +//! ``` +//! +//! Streams may be implemented in terms of other streams - the macros provide +//! `for await` syntax to assist with this: //! //! ```rust //! use async_stream::stream; @@ -113,133 +163,34 @@ //! } //! ``` //! -//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` -//! of the returned stream is `Result` with `Ok` being the value yielded and -//! `Err` the error type returned by `?`. -//! -//! ```rust -//! use tokio::net::{TcpListener, TcpStream}; -//! -//! use async_stream::try_stream; -//! use futures_core::stream::Stream; -//! -//! use std::io; -//! use std::net::SocketAddr; -//! -//! fn bind_and_accept(addr: SocketAddr) -//! -> impl Stream> -//! { -//! try_stream! { -//! let mut listener = TcpListener::bind(addr).await?; -//! -//! loop { -//! let (stream, addr) = listener.accept().await?; -//! println!("received on {:?}", addr); -//! yield stream; -//! } -//! } -//! } -//! ``` -//! //! # Implementation //! +//! The streams use a lightweight sender to send values from the stream +//! implementation to the caller. When entering the stream, an `Option` is +//! stored on the stack. A pointer to the slot is stored in a thread local and +//! `poll` is called on the future. When `yield` is called, the value is stored +//! in there and the stream yields back to the caller. +//! //! The `stream!` and `try_stream!` macros are implemented using proc macros. -//! The macro searches the syntax tree for instances of `sender.send($expr)` and -//! transforms them into `sender.send($expr).await`. +//! The macro searches the syntax tree for instances of `yield $expr` and +//! transforms them into `stream.yield_item($expr).await`. //! -//! The stream uses a lightweight sender to send values from the stream -//! implementation to the caller. When entering the stream, an `Option` is -//! stored on the stack. A pointer to the cell is stored in a thread local and -//! `poll` is called on the async block. When `poll` returns. -//! `sender.send(value)` stores the value that cell and yields back to the -//! caller. +//! # Supported Rust Versions +//! +//! `async-stream` is built against the latest stable release. The minimum +//! supported version is 1.45 due to [function-like procedural macros in +//! expression, pattern, and statement positions][1.45]. //! //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html +//! [1.45]: https://blog.rust-lang.org/2020/07/16/Rust-1.45.0.html#stabilizing-function-like-procedural-macros-in-expressions-patterns-and-statements -mod async_stream; -mod next; -#[doc(hidden)] -pub mod yielder; +#[cfg(feature = "macro")] +mod macros; // Used by the macro, but not intended to be accessed publicly. +#[cfg(feature = "macro")] #[doc(hidden)] -pub use crate::async_stream::AsyncStream; +pub use {::async_stream_impl, macros::reexport}; -#[doc(hidden)] -pub use async_stream_impl; - -/// Asynchronous stream -/// -/// See [crate](index.html) documentation for more details. -/// -/// # Examples -/// -/// ``` -/// use async_stream::stream; -/// -/// use futures_util::pin_mut; -/// use futures_util::stream::StreamExt; -/// -/// #[tokio::main] -/// async fn main() { -/// let s = stream! { -/// for i in 0..3 { -/// yield i; -/// } -/// }; -/// -/// pin_mut!(s); // needed for iteration -/// -/// while let Some(value) = s.next().await { -/// println!("got {}", value); -/// } -/// } -/// ``` -#[macro_export] -macro_rules! stream { - ($($tt:tt)*) => { - $crate::async_stream_impl::stream_inner!(($crate) $($tt)*) - } -} - -/// Asynchronous fallible stream -/// -/// See [crate](index.html) documentation for more details. -/// -/// # Examples -/// -/// ``` -/// use tokio::net::{TcpListener, TcpStream}; -/// -/// use async_stream::try_stream; -/// use futures_core::stream::Stream; -/// -/// use std::io; -/// use std::net::SocketAddr; -/// -/// fn bind_and_accept(addr: SocketAddr) -/// -> impl Stream> -/// { -/// try_stream! { -/// let mut listener = TcpListener::bind(addr).await?; -/// -/// loop { -/// let (stream, addr) = listener.accept().await?; -/// println!("received on {:?}", addr); -/// yield stream; -/// } -/// } -/// } -/// ``` -#[macro_export] -macro_rules! try_stream { - ($($tt:tt)*) => { - $crate::async_stream_impl::try_stream_inner!(($crate) $($tt)*) - } -} - -#[doc(hidden)] -pub mod reexport { - #[doc(hidden)] - pub use crate::next::next; -} +mod functions; +pub use functions::{stream, try_stream, Stream}; diff --git a/async-stream/src/macros/mod.rs b/async-stream/src/macros/mod.rs new file mode 100644 index 0000000..dc280b6 --- /dev/null +++ b/async-stream/src/macros/mod.rs @@ -0,0 +1,78 @@ +mod next; + +/// Asynchronous stream +/// +/// See [crate](index.html) documentation for more details. +/// +/// # Examples +/// +/// ``` +/// use async_stream::stream; +/// +/// use futures_util::pin_mut; +/// use futures_util::stream::StreamExt; +/// +/// #[tokio::main] +/// async fn main() { +/// let s = stream! { +/// for i in 0..3 { +/// yield i; +/// } +/// }; +/// +/// pin_mut!(s); // needed for iteration +/// +/// while let Some(value) = s.next().await { +/// println!("got {}", value); +/// } +/// } +/// ``` +#[cfg_attr(doc_nightly, doc(cfg(feature = "macro")))] +#[macro_export] +macro_rules! stream { + ($($tt:tt)*) => { + $crate::async_stream_impl::stream_inner!(($crate) $($tt)*) + } +} + +/// Asynchronous fallible stream +/// +/// See [crate](index.html) documentation for more details. +/// +/// # Examples +/// +/// ``` +/// use tokio::net::{TcpListener, TcpStream}; +/// +/// use async_stream::try_stream; +/// use futures_core::stream::Stream; +/// +/// use std::io; +/// use std::net::SocketAddr; +/// +/// fn bind_and_accept(addr: SocketAddr) +/// -> impl Stream> +/// { +/// try_stream! { +/// let mut listener = TcpListener::bind(addr).await?; +/// +/// loop { +/// let (stream, addr) = listener.accept().await?; +/// println!("received on {:?}", addr); +/// yield stream; +/// } +/// } +/// } +/// ``` +#[cfg_attr(doc_nightly, doc(cfg(feature = "macro")))] +#[macro_export] +macro_rules! try_stream { + ($($tt:tt)*) => { + $crate::async_stream_impl::try_stream_inner!(($crate) $($tt)*) + } +} + +#[doc(hidden)] +pub mod reexport { + pub use super::next::next; +} diff --git a/async-stream/src/macros/next.rs b/async-stream/src/macros/next.rs new file mode 100644 index 0000000..7b1e046 --- /dev/null +++ b/async-stream/src/macros/next.rs @@ -0,0 +1,32 @@ +use futures_core::Stream; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +// This is equivalent to the `futures::StreamExt::next` method. +// But we want to make this crate dependency as small as possible, so we define our `next` function. +#[doc(hidden)] +pub fn next(stream: &mut S) -> impl Future> + '_ +where + S: Stream + Unpin, +{ + Next { stream } +} + +#[derive(Debug)] +struct Next<'a, S> { + stream: &'a mut S, +} + +impl Unpin for Next<'_, S> where S: Unpin {} + +impl Future for Next<'_, S> +where + S: Stream + Unpin, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.stream).poll_next(cx) + } +} diff --git a/async-stream/src/yielder.rs b/async-stream/src/yielder.rs deleted file mode 100644 index 2482260..0000000 --- a/async-stream/src/yielder.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::cell::Cell; -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::ptr; -use std::task::{Context, Poll}; - -#[derive(Debug)] -pub struct Sender { - _p: PhantomData, -} - -#[derive(Debug)] -pub struct Receiver { - _p: PhantomData, -} - -pub(crate) struct Enter<'a, T> { - _rx: &'a mut Receiver, - prev: *mut (), -} - -pub fn pair() -> (Sender, Receiver) { - let tx = Sender { _p: PhantomData }; - let rx = Receiver { _p: PhantomData }; - (tx, rx) -} - -// Tracks the pointer to `Option`. -// -// TODO: Ensure wakers match? -thread_local!(static STORE: Cell<*mut ()> = Cell::new(ptr::null_mut())); - -// ===== impl Sender ===== - -impl Sender { - pub fn send(&mut self, value: T) -> impl Future { - Send { value: Some(value) } - } -} - -struct Send { - value: Option, -} - -impl Unpin for Send {} - -impl Future for Send { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { - if self.value.is_none() { - return Poll::Ready(()); - } - - STORE.with(|cell| { - let ptr = cell.get() as *mut Option; - let option_ref = unsafe { ptr.as_mut() }.expect("invalid usage"); - - if option_ref.is_none() { - *option_ref = self.value.take(); - } - - Poll::Pending - }) - } -} - -// ===== impl Receiver ===== - -impl Receiver { - pub(crate) fn enter<'a>(&'a mut self, dst: &'a mut Option) -> Enter<'a, T> { - let prev = STORE.with(|cell| { - let prev = cell.get(); - cell.set(dst as *mut _ as *mut ()); - prev - }); - - Enter { _rx: self, prev } - } -} - -// ===== impl Enter ===== - -impl<'a, T> Drop for Enter<'a, T> { - fn drop(&mut self) { - STORE.with(|cell| cell.set(self.prev)); - } -} diff --git a/async-stream/tests/for_await.rs b/async-stream/tests/for_await.rs index 590ffbd..dc8be7c 100644 --- a/async-stream/tests/for_await.rs +++ b/async-stream/tests/for_await.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "macro")] + use async_stream::stream; use futures_util::stream::StreamExt; diff --git a/async-stream/tests/recursive.rs b/async-stream/tests/recursive.rs new file mode 100644 index 0000000..d3e9eb8 --- /dev/null +++ b/async-stream/tests/recursive.rs @@ -0,0 +1,22 @@ +use async_stream::stream; +use futures_util::stream::StreamExt; +use tokio::pin; + +#[tokio::test] +async fn test() { + let stream_1 = stream(|mut stream_1| async move { + let mut stream_1_clone = stream_1.clone(); + let stream_2 = stream(|mut stream_2| async move { + stream_1_clone.yield_item(1).await; + stream_2.yield_item(2).await; + }); + pin!(stream_2); + assert_eq!(stream_2.next().await, Some(2)); + stream_1.yield_item(11).await; + assert_eq!(stream_2.next().await, None); + }); + pin!(stream_1); + assert_eq!(stream_1.next().await, Some(1)); + assert_eq!(stream_1.next().await, Some(11)); + assert_eq!(stream_1.next().await, None); +} diff --git a/async-stream/tests/reexporter/Cargo.toml b/async-stream/tests/reexporter/Cargo.toml index 5dbd2f2..a8b1b5c 100644 --- a/async-stream/tests/reexporter/Cargo.toml +++ b/async-stream/tests/reexporter/Cargo.toml @@ -6,4 +6,4 @@ edition = "2018" publish = false [dependencies] -async-stream = { path = "../.." } +async-stream = { path = "../..", features = ["macro"] } diff --git a/async-stream/tests/send_sync.rs b/async-stream/tests/send_sync.rs new file mode 100644 index 0000000..68f2852 --- /dev/null +++ b/async-stream/tests/send_sync.rs @@ -0,0 +1,34 @@ +use async_stream::stream; +use async_stream::try_stream; +use std::cell::Cell; + +fn assert_send_sync(_: T) {} + +#[test] +fn functions() { + let stream = stream(|mut stream| async move { + stream.yield_item(Cell::new(0)).await; + }); + assert_send_sync(stream); + + let stream = try_stream(|mut stream| async move { + stream.yield_item(Cell::new(0)).await; + Err(Cell::new(0)) + }); + assert_send_sync(stream); +} + +#[cfg(feature = "macro")] +#[test] +fn macros() { + let stream = stream! { + yield Cell::new(0); + }; + assert_send_sync(stream); + + let stream = try_stream! { + yield Cell::new(0); + return Err(Cell::new(0)); + }; + assert_send_sync(stream); +} diff --git a/async-stream/tests/stream.rs b/async-stream/tests/stream.rs index 4e26a3d..6002635 100644 --- a/async-stream/tests/stream.rs +++ b/async-stream/tests/stream.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "macro")] + use async_stream::stream; use futures_core::stream::{FusedStream, Stream}; diff --git a/async-stream/tests/try_stream.rs b/async-stream/tests/try_stream.rs index c404e62..6bbb06b 100644 --- a/async-stream/tests/try_stream.rs +++ b/async-stream/tests/try_stream.rs @@ -1,7 +1,10 @@ +#![cfg(feature = "macro")] + use async_stream::try_stream; use futures_core::stream::Stream; use futures_util::stream::StreamExt; +use tokio::pin; #[tokio::test] async fn single_err() { @@ -85,3 +88,22 @@ fn issue_65() -> impl Stream> { yield Err(())?; } } + +macro_rules! try_macro { + ($e:expr) => { + $e? + }; +} + +#[tokio::test] +async fn try_in_macro() { + let s = try_stream! { + yield "hi"; + try_macro!(Err("bye")); + }; + pin!(s); + + assert_eq!(s.next().await, Some(Ok("hi"))); + assert_eq!(s.next().await, Some(Err("bye"))); + assert_eq!(s.next().await, None); +}