1
- extern crate proc_macro;
2
-
3
- use proc_macro:: { TokenStream , TokenTree } ;
4
- use proc_macro2:: { Group , Span , TokenStream as TokenStream2 , TokenTree as TokenTree2 } ;
1
+ use proc_macro:: TokenStream ;
2
+ use proc_macro2:: { Delimiter , Group , TokenStream as TokenStream2 , TokenTree } ;
5
3
use quote:: quote;
6
4
use syn:: visit_mut:: VisitMut ;
7
5
@@ -12,48 +10,14 @@ struct Scrub {
12
10
num_yield : u32 ,
13
11
}
14
12
15
- #[ derive( Debug ) ]
16
- struct AsyncStreamEnumHack {
17
- macro_ident : syn:: Ident ,
18
- stmts : Vec < syn:: Stmt > ,
19
- }
20
-
21
- impl AsyncStreamEnumHack {
22
- fn parse ( input : TokenStream ) -> syn:: Result < Self > {
23
- macro_rules! n {
24
- ( $i: ident) => {
25
- $i. next( ) . unwrap( )
26
- } ;
27
- }
28
-
29
- let mut input = input. into_iter ( ) ;
30
- n ! ( input) ; // enum
31
- n ! ( input) ; // ident
32
-
33
- let mut braces = match n ! ( input) {
34
- TokenTree :: Group ( group) => group. stream ( ) . into_iter ( ) ,
35
- _ => unreachable ! ( ) ,
36
- } ;
37
-
38
- n ! ( braces) ; // Dummy
39
- n ! ( braces) ; // =
40
- n ! ( braces) ; // $crate
41
- n ! ( braces) ; // :
42
- n ! ( braces) ; // :
43
- n ! ( braces) ; // scrub
44
- n ! ( braces) ; // !
45
-
46
- let inner = n ! ( braces) ;
47
- let inner = replace_for_await ( TokenStream2 :: from ( TokenStream :: from ( inner) ) ) ;
48
- let syn:: Block { stmts, .. } = syn:: parse2 ( inner. clone ( ) ) ?;
49
-
50
- let macro_ident = syn:: Ident :: new (
51
- & format ! ( "stream_{}" , count_bangs( inner. into( ) ) ) ,
52
- Span :: call_site ( ) ,
53
- ) ;
13
+ fn parse_input ( input : TokenStream ) -> syn:: Result < Vec < syn:: Stmt > > {
14
+ let input = replace_for_await ( input. into ( ) ) ;
15
+ // syn does not provide a way to parse `Vec<Stmt>` directly from `TokenStream`,
16
+ // so wrap input in a brace and then parse it as a block.
17
+ let input = TokenStream2 :: from ( TokenTree :: Group ( Group :: new ( Delimiter :: Brace , input) ) ) ;
18
+ let syn:: Block { stmts, .. } = syn:: parse2 ( input) ?;
54
19
55
- Ok ( AsyncStreamEnumHack { stmts, macro_ident } )
56
- }
20
+ Ok ( stmts)
57
21
}
58
22
59
23
impl VisitMut for Scrub {
@@ -150,12 +114,36 @@ impl VisitMut for Scrub {
150
114
}
151
115
}
152
116
153
- #[ proc_macro_derive( AsyncStreamHack ) ]
154
- pub fn async_stream_impl ( input : TokenStream ) -> TokenStream {
155
- let AsyncStreamEnumHack {
156
- macro_ident,
157
- mut stmts,
158
- } = match AsyncStreamEnumHack :: parse ( input) {
117
+ /// Asynchronous stream
118
+ ///
119
+ /// See [crate](index.html) documentation for more details.
120
+ ///
121
+ /// # Examples
122
+ ///
123
+ /// ```rust
124
+ /// use async_stream::stream;
125
+ ///
126
+ /// use futures_util::pin_mut;
127
+ /// use futures_util::stream::StreamExt;
128
+ ///
129
+ /// #[tokio::main]
130
+ /// async fn main() {
131
+ /// let s = stream! {
132
+ /// for i in 0..3 {
133
+ /// yield i;
134
+ /// }
135
+ /// };
136
+ ///
137
+ /// pin_mut!(s); // needed for iteration
138
+ ///
139
+ /// while let Some(value) = s.next().await {
140
+ /// println!("got {}", value);
141
+ /// }
142
+ /// }
143
+ /// ```
144
+ #[ proc_macro]
145
+ pub fn stream ( input : TokenStream ) -> TokenStream {
146
+ let mut stmts = match parse_input ( input) {
159
147
Ok ( x) => x,
160
148
Err ( e) => return e. to_compile_error ( ) . into ( ) ,
161
149
} ;
@@ -171,33 +159,56 @@ pub fn async_stream_impl(input: TokenStream) -> TokenStream {
171
159
scrub. visit_stmt_mut ( & mut stmt) ;
172
160
}
173
161
174
- if scrub. num_yield == 0 {
175
- quote ! ( macro_rules! #macro_ident {
176
- ( ) => { {
177
- if false {
178
- __yield_tx. send( ( ) ) . await ;
179
- }
180
-
181
- #( #stmts) *
182
- } } ;
183
- } )
184
- . into ( )
162
+ let dummy_yield = if scrub. num_yield == 0 {
163
+ Some ( quote ! ( if false {
164
+ __yield_tx. send( ( ) ) . await ;
165
+ } ) )
185
166
} else {
186
- quote ! ( macro_rules! #macro_ident {
187
- ( ) => { {
188
- #( #stmts) *
189
- } } ;
167
+ None
168
+ } ;
169
+
170
+ quote ! ( {
171
+ let ( mut __yield_tx, __yield_rx) = :: async_stream:: yielder:: pair( ) ;
172
+ :: async_stream:: AsyncStream :: new( __yield_rx, async move {
173
+ #dummy_yield
174
+ #( #stmts) *
190
175
} )
191
- . into ( )
192
- }
176
+ } )
177
+ . into ( )
193
178
}
194
179
195
- #[ proc_macro_derive( AsyncTryStreamHack ) ]
196
- pub fn async_try_stream_impl ( input : TokenStream ) -> TokenStream {
197
- let AsyncStreamEnumHack {
198
- macro_ident,
199
- mut stmts,
200
- } = match AsyncStreamEnumHack :: parse ( input) {
180
+ /// Asynchronous fallible stream
181
+ ///
182
+ /// See [crate](index.html) documentation for more details.
183
+ ///
184
+ /// # Examples
185
+ ///
186
+ /// ```rust
187
+ /// use tokio::net::{TcpListener, TcpStream};
188
+ ///
189
+ /// use async_stream::try_stream;
190
+ /// use futures_core::stream::Stream;
191
+ ///
192
+ /// use std::io;
193
+ /// use std::net::SocketAddr;
194
+ ///
195
+ /// fn bind_and_accept(addr: SocketAddr)
196
+ /// -> impl Stream<Item = io::Result<TcpStream>>
197
+ /// {
198
+ /// try_stream! {
199
+ /// let mut listener = TcpListener::bind(addr).await?;
200
+ ///
201
+ /// loop {
202
+ /// let (stream, addr) = listener.accept().await?;
203
+ /// println!("received on {:?}", addr);
204
+ /// yield stream;
205
+ /// }
206
+ /// }
207
+ /// }
208
+ /// ```
209
+ #[ proc_macro]
210
+ pub fn try_stream ( input : TokenStream ) -> TokenStream {
211
+ let mut stmts = match parse_input ( input) {
201
212
Ok ( x) => x,
202
213
Err ( e) => return e. to_compile_error ( ) . into ( ) ,
203
214
} ;
@@ -213,45 +224,22 @@ pub fn async_try_stream_impl(input: TokenStream) -> TokenStream {
213
224
scrub. visit_stmt_mut ( & mut stmt) ;
214
225
}
215
226
216
- if scrub. num_yield == 0 {
217
- quote ! ( macro_rules! #macro_ident {
218
- ( ) => { {
219
- if false {
220
- __yield_tx. send( ( ) ) . await ;
221
- }
222
-
223
- #( #stmts) *
224
- } } ;
225
- } )
226
- . into ( )
227
+ let dummy_yield = if scrub. num_yield == 0 {
228
+ Some ( quote ! ( if false {
229
+ __yield_tx. send( ( ) ) . await ;
230
+ } ) )
227
231
} else {
228
- quote ! ( macro_rules! #macro_ident {
229
- ( ) => { {
230
- #( #stmts) *
231
- } } ;
232
- } )
233
- . into ( )
234
- }
235
- }
236
-
237
- fn count_bangs ( input : TokenStream ) -> usize {
238
- let mut count = 0 ;
239
-
240
- for token in input {
241
- match token {
242
- TokenTree :: Punct ( punct) => {
243
- if punct. as_char ( ) == '!' {
244
- count += 1 ;
245
- }
246
- }
247
- TokenTree :: Group ( group) => {
248
- count += count_bangs ( group. stream ( ) ) ;
249
- }
250
- _ => { }
251
- }
252
- }
232
+ None
233
+ } ;
253
234
254
- count
235
+ quote ! ( {
236
+ let ( mut __yield_tx, __yield_rx) = :: async_stream:: yielder:: pair( ) ;
237
+ :: async_stream:: AsyncStream :: new( __yield_rx, async move {
238
+ #dummy_yield
239
+ #( #stmts) *
240
+ } )
241
+ } )
242
+ . into ( )
255
243
}
256
244
257
245
fn replace_for_await ( input : TokenStream2 ) -> TokenStream2 {
@@ -260,17 +248,17 @@ fn replace_for_await(input: TokenStream2) -> TokenStream2 {
260
248
261
249
while let Some ( token) = input. next ( ) {
262
250
match token {
263
- TokenTree2 :: Ident ( ident) => {
251
+ TokenTree :: Ident ( ident) => {
264
252
match input. peek ( ) {
265
- Some ( TokenTree2 :: Ident ( next) ) if ident == "for" && next == "await" => {
253
+ Some ( TokenTree :: Ident ( next) ) if ident == "for" && next == "await" => {
266
254
tokens. extend ( quote ! ( #[ #next] ) ) ;
267
255
let _ = input. next ( ) ;
268
256
}
269
257
_ => { }
270
258
}
271
259
tokens. push ( ident. into ( ) ) ;
272
260
}
273
- TokenTree2 :: Group ( group) => {
261
+ TokenTree :: Group ( group) => {
274
262
let stream = replace_for_await ( group. stream ( ) ) ;
275
263
tokens. push ( Group :: new ( group. delimiter ( ) , stream) . into ( ) ) ;
276
264
}
0 commit comments