Skip to content

Commit 8af39a1

Browse files
authored
Revert "Let the ? operator work natively in try_stream!." (#55)
This reverts commit 623c556.
1 parent 623c556 commit 8af39a1

File tree

4 files changed

+29
-98
lines changed

4 files changed

+29
-98
lines changed

async-stream-impl/src/lib.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use syn::parse::Parser;
55
use syn::visit_mut::VisitMut;
66

77
struct Scrub<'a> {
8+
/// Whether the stream is a try stream.
9+
is_try: bool,
810
/// The unit expression, `()`.
911
unit: Box<syn::Expr>,
1012
has_yielded: bool,
@@ -22,8 +24,9 @@ fn parse_input(input: TokenStream) -> syn::Result<(TokenStream2, Vec<syn::Stmt>)
2224
}
2325

2426
impl<'a> Scrub<'a> {
25-
fn new(crate_path: &'a TokenStream2) -> Self {
27+
fn new(is_try: bool, crate_path: &'a TokenStream2) -> Self {
2628
Self {
29+
is_try,
2730
unit: syn::parse_quote!(()),
2831
has_yielded: false,
2932
crate_path,
@@ -41,7 +44,26 @@ impl VisitMut for Scrub<'_> {
4144

4245
// let ident = &self.yielder;
4346

44-
*i = syn::parse_quote! { __yield_tx.send(#value_expr).await };
47+
*i = if self.is_try {
48+
syn::parse_quote! { __yield_tx.send(::core::result::Result::Ok(#value_expr)).await }
49+
} else {
50+
syn::parse_quote! { __yield_tx.send(#value_expr).await }
51+
};
52+
}
53+
syn::Expr::Try(try_expr) => {
54+
syn::visit_mut::visit_expr_try_mut(self, try_expr);
55+
// let ident = &self.yielder;
56+
let e = &try_expr.expr;
57+
58+
*i = syn::parse_quote! {
59+
match #e {
60+
::core::result::Result::Ok(v) => v,
61+
::core::result::Result::Err(e) => {
62+
__yield_tx.send(::core::result::Result::Err(e.into())).await;
63+
return;
64+
}
65+
}
66+
};
4567
}
4668
syn::Expr::Closure(_) | syn::Expr::Async(_) => {
4769
// Don't transform inner closures or async blocks.
@@ -102,7 +124,7 @@ pub fn stream_inner(input: TokenStream) -> TokenStream {
102124
Err(e) => return e.to_compile_error().into(),
103125
};
104126

105-
let mut scrub = Scrub::new(&crate_path);
127+
let mut scrub = Scrub::new(false, &crate_path);
106128

107129
for mut stmt in &mut stmts {
108130
scrub.visit_stmt_mut(&mut stmt);
@@ -136,7 +158,7 @@ pub fn try_stream_inner(input: TokenStream) -> TokenStream {
136158
Err(e) => return e.to_compile_error().into(),
137159
};
138160

139-
let mut scrub = Scrub::new(&crate_path);
161+
let mut scrub = Scrub::new(true, &crate_path);
140162

141163
for mut stmt in &mut stmts {
142164
scrub.visit_stmt_mut(&mut stmt);
@@ -152,13 +174,9 @@ pub fn try_stream_inner(input: TokenStream) -> TokenStream {
152174

153175
quote!({
154176
let (mut __yield_tx, __yield_rx) = #crate_path::yielder::pair();
155-
#crate_path::AsyncTryStream::new(__yield_rx, async move {
177+
#crate_path::AsyncStream::new(__yield_rx, async move {
156178
#dummy_yield
157-
let () = {
158-
#(#stmts)*
159-
};
160-
#[allow(unreachable_code)]
161-
Ok(())
179+
#(#stmts)*
162180
})
163181
})
164182
.into()

async-stream/src/async_stream.rs

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -75,70 +75,3 @@ where
7575
}
7676
}
7777
}
78-
79-
#[doc(hidden)]
80-
#[derive(Debug)]
81-
pub struct AsyncTryStream<T, U> {
82-
rx: Receiver<T>,
83-
done: bool,
84-
generator: U,
85-
}
86-
87-
impl<T, U> AsyncTryStream<T, U> {
88-
#[doc(hidden)]
89-
pub fn new(rx: Receiver<T>, generator: U) -> AsyncTryStream<T, U> {
90-
AsyncTryStream {
91-
rx,
92-
done: false,
93-
generator,
94-
}
95-
}
96-
}
97-
98-
impl<T, U, E> FusedStream for AsyncTryStream<T, U>
99-
where
100-
U: Future<Output = Result<(), E>>,
101-
{
102-
fn is_terminated(&self) -> bool {
103-
self.done
104-
}
105-
}
106-
107-
impl<T, U, E> Stream for AsyncTryStream<T, U>
108-
where
109-
U: Future<Output = Result<(), E>>,
110-
{
111-
type Item = Result<T, E>;
112-
113-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114-
unsafe {
115-
let me = Pin::get_unchecked_mut(self);
116-
117-
if me.done {
118-
return Poll::Ready(None);
119-
}
120-
121-
let mut dst = None;
122-
let res = {
123-
let _enter = me.rx.enter(&mut dst);
124-
Pin::new_unchecked(&mut me.generator).poll(cx)
125-
};
126-
127-
me.done = res.is_ready();
128-
129-
if let Poll::Ready(Err(e)) = res {
130-
return Poll::Ready(Some(Err(e)));
131-
}
132-
133-
if let Some(val) = dst.take() {
134-
return Poll::Ready(Some(Ok(val)));
135-
}
136-
137-
if me.done {
138-
Poll::Ready(None)
139-
} else {
140-
Poll::Pending
141-
}
142-
}
143-
}
144-
}

async-stream/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ pub mod yielder;
164164

165165
// Used by the macro, but not intended to be accessed publicly.
166166
#[doc(hidden)]
167-
pub use crate::async_stream::{AsyncStream, AsyncTryStream};
167+
pub use crate::async_stream::AsyncStream;
168168

169169
#[doc(hidden)]
170170
pub use async_stream_impl;

async-stream/tests/try_stream.rs

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use async_stream::try_stream;
22

33
use futures_core::stream::Stream;
4-
use futures_util::pin_mut;
54
use futures_util::stream::StreamExt;
65

76
#[tokio::test]
@@ -79,22 +78,3 @@ async fn multi_try() {
7978
values
8079
);
8180
}
82-
83-
macro_rules! try_macro {
84-
($e:expr) => {
85-
$e?
86-
};
87-
}
88-
89-
#[tokio::test]
90-
async fn try_in_macro() {
91-
let s = try_stream! {
92-
yield "hi";
93-
try_macro!(Err("bye"));
94-
};
95-
pin_mut!(s);
96-
97-
assert_eq!(s.next().await, Some(Ok("hi")));
98-
assert_eq!(s.next().await, Some(Err("bye")));
99-
assert_eq!(s.next().await, None);
100-
}

0 commit comments

Comments
 (0)