Skip to content

Commit 200466e

Browse files
authored
Support reexporting from anywhere by wrapping in a declarative macro (#46)
* Refactor Scrub * Wrap macro in a declarative macro and use $crate As a downside we get slightly worse error messages as the filename and line number will point to the macro definition site instead of the call site.
1 parent 80c03da commit 200466e

File tree

9 files changed

+183
-131
lines changed

9 files changed

+183
-131
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@
22
members = [
33
"async-stream",
44
"async-stream-impl",
5+
"async-stream/tests/reexporter",
6+
"async-stream/tests/test-reexport",
57
]

async-stream-impl/src/lib.rs

Lines changed: 63 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,51 @@
11
use proc_macro::TokenStream;
2-
use proc_macro2::{Delimiter, Group, TokenStream as TokenStream2, TokenTree};
2+
use proc_macro2::{Group, TokenStream as TokenStream2, TokenTree};
33
use quote::quote;
4+
use syn::parse::Parser;
45
use syn::visit_mut::VisitMut;
56

6-
struct Scrub {
7-
is_xforming: bool,
7+
struct Scrub<'a> {
8+
/// Whether the stream is a try stream.
89
is_try: bool,
10+
/// The unit expression, `()`.
911
unit: Box<syn::Expr>,
10-
num_yield: u32,
12+
has_yielded: bool,
13+
crate_path: &'a TokenStream2,
1114
}
1215

13-
fn parse_input(input: TokenStream) -> syn::Result<Vec<syn::Stmt>> {
14-
let input = replace_for_await(input.into());
15-
// syn does not provide a way to parse `Vec<Stmt>` directly from `TokenStream`,
16-
// so wrap input in a brace and then parse it as a block.
17-
let input = TokenStream2::from(TokenTree::Group(Group::new(Delimiter::Brace, input)));
18-
let syn::Block { stmts, .. } = syn::parse2(input)?;
19-
20-
Ok(stmts)
16+
fn parse_input(input: TokenStream) -> syn::Result<(TokenStream2, Vec<syn::Stmt>)> {
17+
let mut input = TokenStream2::from(input).into_iter();
18+
let crate_path = match input.next().unwrap() {
19+
TokenTree::Group(group) => group.stream(),
20+
_ => panic!(),
21+
};
22+
let stmts = syn::Block::parse_within.parse2(replace_for_await(input))?;
23+
Ok((crate_path, stmts))
2124
}
2225

23-
impl VisitMut for Scrub {
24-
fn visit_expr_mut(&mut self, i: &mut syn::Expr) {
25-
if !self.is_xforming {
26-
syn::visit_mut::visit_expr_mut(self, i);
27-
return;
26+
impl<'a> Scrub<'a> {
27+
fn new(is_try: bool, crate_path: &'a TokenStream2) -> Self {
28+
Self {
29+
is_try,
30+
unit: syn::parse_quote!(()),
31+
has_yielded: false,
32+
crate_path,
2833
}
34+
}
35+
}
2936

37+
impl VisitMut for Scrub<'_> {
38+
fn visit_expr_mut(&mut self, i: &mut syn::Expr) {
3039
match i {
3140
syn::Expr::Yield(yield_expr) => {
32-
self.num_yield += 1;
41+
self.has_yielded = true;
3342

34-
let value_expr = if let Some(ref e) = yield_expr.expr {
35-
e
36-
} else {
37-
&self.unit
38-
};
43+
let value_expr = yield_expr.expr.as_ref().unwrap_or(&self.unit);
3944

4045
// let ident = &self.yielder;
4146

4247
*i = if self.is_try {
43-
syn::parse_quote! { __yield_tx.send(Ok(#value_expr)).await }
48+
syn::parse_quote! { __yield_tx.send(::core::result::Result::Ok(#value_expr)).await }
4449
} else {
4550
syn::parse_quote! { __yield_tx.send(#value_expr).await }
4651
};
@@ -52,19 +57,16 @@ impl VisitMut for Scrub {
5257

5358
*i = syn::parse_quote! {
5459
match #e {
55-
Ok(v) => v,
56-
Err(e) => {
57-
__yield_tx.send(Err(e.into())).await;
60+
::core::result::Result::Ok(v) => v,
61+
::core::result::Result::Err(e) => {
62+
__yield_tx.send(::core::result::Result::Err(e.into())).await;
5863
return;
5964
}
6065
}
6166
};
6267
}
6368
syn::Expr::Closure(_) | syn::Expr::Async(_) => {
64-
let prev = self.is_xforming;
65-
self.is_xforming = false;
66-
syn::visit_mut::visit_expr_mut(self, i);
67-
self.is_xforming = prev;
69+
// Don't transform inner closures or async blocks.
6870
}
6971
syn::Expr::ForLoop(expr) => {
7072
syn::visit_mut::visit_expr_for_loop_mut(self, expr);
@@ -87,14 +89,15 @@ impl VisitMut for Scrub {
8789
return;
8890
}
8991

92+
let crate_path = self.crate_path;
9093
*i = syn::parse_quote! {{
9194
let mut __pinned = #expr;
9295
let mut __pinned = unsafe {
9396
::core::pin::Pin::new_unchecked(&mut __pinned)
9497
};
9598
#label
9699
loop {
97-
let #pat = match ::async_stream::reexport::next(&mut __pinned).await {
100+
let #pat = match #crate_path::reexport::next(&mut __pinned).await {
98101
::core::option::Option::Some(e) => e,
99102
::core::option::Option::None => break,
100103
};
@@ -106,143 +109,81 @@ impl VisitMut for Scrub {
106109
}
107110
}
108111

109-
fn visit_item_mut(&mut self, i: &mut syn::Item) {
110-
let prev = self.is_xforming;
111-
self.is_xforming = false;
112-
syn::visit_mut::visit_item_mut(self, i);
113-
self.is_xforming = prev;
112+
fn visit_item_mut(&mut self, _: &mut syn::Item) {
113+
// Don't transform inner items.
114114
}
115115
}
116116

117-
/// Asynchronous stream
118-
///
119-
/// See [crate](index.html) documentation for more details.
120-
///
121-
/// # Examples
122-
///
123-
/// ```rust
124-
/// use async_stream::stream;
125-
///
126-
/// use futures_util::pin_mut;
127-
/// use futures_util::stream::StreamExt;
128-
///
129-
/// #[tokio::main]
130-
/// async fn main() {
131-
/// let s = stream! {
132-
/// for i in 0..3 {
133-
/// yield i;
134-
/// }
135-
/// };
136-
///
137-
/// pin_mut!(s); // needed for iteration
138-
///
139-
/// while let Some(value) = s.next().await {
140-
/// println!("got {}", value);
141-
/// }
142-
/// }
143-
/// ```
117+
/// The first token tree in the stream must be a group containing the path to the `async-stream`
118+
/// crate.
144119
#[proc_macro]
145-
pub fn stream(input: TokenStream) -> TokenStream {
146-
let mut stmts = match parse_input(input) {
120+
#[doc(hidden)]
121+
pub fn stream_inner(input: TokenStream) -> TokenStream {
122+
let (crate_path, mut stmts) = match parse_input(input) {
147123
Ok(x) => x,
148124
Err(e) => return e.to_compile_error().into(),
149125
};
150126

151-
let mut scrub = Scrub {
152-
is_xforming: true,
153-
is_try: false,
154-
unit: syn::parse_quote!(()),
155-
num_yield: 0,
156-
};
127+
let mut scrub = Scrub::new(false, &crate_path);
157128

158-
for mut stmt in &mut stmts[..] {
129+
for mut stmt in &mut stmts {
159130
scrub.visit_stmt_mut(&mut stmt);
160131
}
161132

162-
let dummy_yield = if scrub.num_yield == 0 {
133+
let dummy_yield = if scrub.has_yielded {
134+
None
135+
} else {
163136
Some(quote!(if false {
164137
__yield_tx.send(()).await;
165138
}))
166-
} else {
167-
None
168139
};
169140

170141
quote!({
171-
let (mut __yield_tx, __yield_rx) = ::async_stream::yielder::pair();
172-
::async_stream::AsyncStream::new(__yield_rx, async move {
142+
let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
143+
#crate_path::AsyncStream::new(__yield_rx, async move {
173144
#dummy_yield
174145
#(#stmts)*
175146
})
176147
})
177148
.into()
178149
}
179150

180-
/// Asynchronous fallible stream
181-
///
182-
/// See [crate](index.html) documentation for more details.
183-
///
184-
/// # Examples
185-
///
186-
/// ```rust
187-
/// use tokio::net::{TcpListener, TcpStream};
188-
///
189-
/// use async_stream::try_stream;
190-
/// use futures_core::stream::Stream;
191-
///
192-
/// use std::io;
193-
/// use std::net::SocketAddr;
194-
///
195-
/// fn bind_and_accept(addr: SocketAddr)
196-
/// -> impl Stream<Item = io::Result<TcpStream>>
197-
/// {
198-
/// try_stream! {
199-
/// let mut listener = TcpListener::bind(addr).await?;
200-
///
201-
/// loop {
202-
/// let (stream, addr) = listener.accept().await?;
203-
/// println!("received on {:?}", addr);
204-
/// yield stream;
205-
/// }
206-
/// }
207-
/// }
208-
/// ```
151+
/// The first token tree in the stream must be a group containing the path to the `async-stream`
152+
/// crate.
209153
#[proc_macro]
210-
pub fn try_stream(input: TokenStream) -> TokenStream {
211-
let mut stmts = match parse_input(input) {
154+
#[doc(hidden)]
155+
pub fn try_stream_inner(input: TokenStream) -> TokenStream {
156+
let (crate_path, mut stmts) = match parse_input(input) {
212157
Ok(x) => x,
213158
Err(e) => return e.to_compile_error().into(),
214159
};
215160

216-
let mut scrub = Scrub {
217-
is_xforming: true,
218-
is_try: true,
219-
unit: syn::parse_quote!(()),
220-
num_yield: 0,
221-
};
161+
let mut scrub = Scrub::new(true, &crate_path);
222162

223-
for mut stmt in &mut stmts[..] {
163+
for mut stmt in &mut stmts {
224164
scrub.visit_stmt_mut(&mut stmt);
225165
}
226166

227-
let dummy_yield = if scrub.num_yield == 0 {
167+
let dummy_yield = if scrub.has_yielded {
168+
None
169+
} else {
228170
Some(quote!(if false {
229171
__yield_tx.send(()).await;
230172
}))
231-
} else {
232-
None
233173
};
234174

235175
quote!({
236-
let (mut __yield_tx, __yield_rx) = ::async_stream::yielder::pair();
237-
::async_stream::AsyncStream::new(__yield_rx, async move {
176+
let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
177+
#crate_path::AsyncStream::new(__yield_rx, async move {
238178
#dummy_yield
239179
#(#stmts)*
240180
})
241181
})
242182
.into()
243183
}
244184

245-
fn replace_for_await(input: TokenStream2) -> TokenStream2 {
185+
/// Replace `for await` with `#[await] for`, which will be later transformed into a `next` loop.
186+
fn replace_for_await(input: impl IntoIterator<Item = TokenTree>) -> TokenStream2 {
246187
let mut input = input.into_iter().peekable();
247188
let mut tokens = Vec::new();
248189

async-stream/src/lib.rs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,82 @@ mod next;
162162
#[doc(hidden)]
163163
pub mod yielder;
164164

165-
// Used by the macro, but not intended to be accessed publically.
165+
// Used by the macro, but not intended to be accessed publicly.
166166
#[doc(hidden)]
167167
pub use crate::async_stream::AsyncStream;
168168

169-
pub use async_stream_impl::{stream, try_stream};
169+
#[doc(hidden)]
170+
pub use async_stream_impl;
171+
172+
/// Asynchronous stream
173+
///
174+
/// See [crate](index.html) documentation for more details.
175+
///
176+
/// # Examples
177+
///
178+
/// ```
179+
/// use async_stream::stream;
180+
///
181+
/// use futures_util::pin_mut;
182+
/// use futures_util::stream::StreamExt;
183+
///
184+
/// #[tokio::main]
185+
/// async fn main() {
186+
/// let s = stream! {
187+
/// for i in 0..3 {
188+
/// yield i;
189+
/// }
190+
/// };
191+
///
192+
/// pin_mut!(s); // needed for iteration
193+
///
194+
/// while let Some(value) = s.next().await {
195+
/// println!("got {}", value);
196+
/// }
197+
/// }
198+
/// ```
199+
#[macro_export]
200+
macro_rules! stream {
201+
($($tt:tt)*) => {
202+
$crate::async_stream_impl::stream_inner!(($crate) $($tt)*)
203+
}
204+
}
205+
206+
/// Asynchronous fallible stream
207+
///
208+
/// See [crate](index.html) documentation for more details.
209+
///
210+
/// # Examples
211+
///
212+
/// ```
213+
/// use tokio::net::{TcpListener, TcpStream};
214+
///
215+
/// use async_stream::try_stream;
216+
/// use futures_core::stream::Stream;
217+
///
218+
/// use std::io;
219+
/// use std::net::SocketAddr;
220+
///
221+
/// fn bind_and_accept(addr: SocketAddr)
222+
/// -> impl Stream<Item = io::Result<TcpStream>>
223+
/// {
224+
/// try_stream! {
225+
/// let mut listener = TcpListener::bind(addr).await?;
226+
///
227+
/// loop {
228+
/// let (stream, addr) = listener.accept().await?;
229+
/// println!("received on {:?}", addr);
230+
/// yield stream;
231+
/// }
232+
/// }
233+
/// }
234+
/// ```
235+
#[macro_export]
236+
macro_rules! try_stream {
237+
($($tt:tt)*) => {
238+
$crate::async_stream_impl::try_stream_inner!(($crate) $($tt)*)
239+
}
240+
}
170241

171242
#[doc(hidden)]
172243
pub mod reexport {
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "reexporter"
3+
version = "0.0.0"
4+
authors = []
5+
edition = "2018"
6+
publish = false
7+
8+
[dependencies]
9+
async-stream = { path = "../.." }
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
//! A crate to test reexporting the `stream!` and `try_stream!` macros.
2+
3+
pub use async_stream::{stream, try_stream};

0 commit comments

Comments
 (0)