Skip to content

Commit d11fd00

Browse files
khollbachtaiki-e
authored andcommitted
Remove TryStreamExt::into_async_read Unpin bound (#2599)
1 parent 628881f commit d11fd00

File tree

2 files changed

+57
-61
lines changed

2 files changed

+57
-61
lines changed

futures-util/src/stream/try_stream/into_async_read.rs

Lines changed: 51 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,26 @@
1-
use crate::stream::TryStreamExt;
21
use core::pin::Pin;
32
use futures_core::ready;
43
use futures_core::stream::TryStream;
54
use futures_core::task::{Context, Poll};
65
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
6+
use pin_project_lite::pin_project;
77
use std::cmp;
88
use std::io::{Error, Result};
99

10-
/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
11-
#[derive(Debug)]
12-
#[must_use = "readers do nothing unless polled"]
13-
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
14-
pub struct IntoAsyncRead<St>
15-
where
16-
St: TryStream<Error = Error> + Unpin,
17-
St::Ok: AsRef<[u8]>,
18-
{
19-
stream: St,
20-
state: ReadState<St::Ok>,
21-
}
22-
23-
impl<St> Unpin for IntoAsyncRead<St>
24-
where
25-
St: TryStream<Error = Error> + Unpin,
26-
St::Ok: AsRef<[u8]>,
27-
{
10+
pin_project! {
11+
/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
12+
#[derive(Debug)]
13+
#[must_use = "readers do nothing unless polled"]
14+
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
15+
pub struct IntoAsyncRead<St>
16+
where
17+
St: TryStream<Error = Error>,
18+
St::Ok: AsRef<[u8]>,
19+
{
20+
#[pin]
21+
stream: St,
22+
state: ReadState<St::Ok>,
23+
}
2824
}
2925

3026
#[derive(Debug)]
@@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {
3632

3733
impl<St> IntoAsyncRead<St>
3834
where
39-
St: TryStream<Error = Error> + Unpin,
35+
St: TryStream<Error = Error>,
4036
St::Ok: AsRef<[u8]>,
4137
{
4238
pub(super) fn new(stream: St) -> Self {
@@ -46,16 +42,18 @@ where
4642

4743
impl<St> AsyncRead for IntoAsyncRead<St>
4844
where
49-
St: TryStream<Error = Error> + Unpin,
45+
St: TryStream<Error = Error>,
5046
St::Ok: AsRef<[u8]>,
5147
{
5248
fn poll_read(
53-
mut self: Pin<&mut Self>,
49+
self: Pin<&mut Self>,
5450
cx: &mut Context<'_>,
5551
buf: &mut [u8],
5652
) -> Poll<Result<usize>> {
53+
let mut this = self.project();
54+
5755
loop {
58-
match &mut self.state {
56+
match this.state {
5957
ReadState::Ready { chunk, chunk_start } => {
6058
let chunk = chunk.as_ref();
6159
let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
@@ -64,23 +62,23 @@ where
6462
*chunk_start += len;
6563

6664
if chunk.len() == *chunk_start {
67-
self.state = ReadState::PendingChunk;
65+
*this.state = ReadState::PendingChunk;
6866
}
6967

7068
return Poll::Ready(Ok(len));
7169
}
72-
ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
70+
ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
7371
Some(Ok(chunk)) => {
7472
if !chunk.as_ref().is_empty() {
75-
self.state = ReadState::Ready { chunk, chunk_start: 0 };
73+
*this.state = ReadState::Ready { chunk, chunk_start: 0 };
7674
}
7775
}
7876
Some(Err(err)) => {
79-
self.state = ReadState::Eof;
77+
*this.state = ReadState::Eof;
8078
return Poll::Ready(Err(err));
8179
}
8280
None => {
83-
self.state = ReadState::Eof;
81+
*this.state = ReadState::Eof;
8482
return Poll::Ready(Ok(0));
8583
}
8684
},
@@ -94,51 +92,52 @@ where
9492

9593
impl<St> AsyncWrite for IntoAsyncRead<St>
9694
where
97-
St: TryStream<Error = Error> + AsyncWrite + Unpin,
95+
St: TryStream<Error = Error> + AsyncWrite,
9896
St::Ok: AsRef<[u8]>,
9997
{
100-
fn poll_write(
101-
mut self: Pin<&mut Self>,
102-
cx: &mut Context<'_>,
103-
buf: &[u8],
104-
) -> Poll<Result<usize>> {
105-
Pin::new(&mut self.stream).poll_write(cx, buf)
98+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
99+
let this = self.project();
100+
this.stream.poll_write(cx, buf)
106101
}
107102

108-
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
109-
Pin::new(&mut self.stream).poll_flush(cx)
103+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
104+
let this = self.project();
105+
this.stream.poll_flush(cx)
110106
}
111107

112-
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
113-
Pin::new(&mut self.stream).poll_close(cx)
108+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
109+
let this = self.project();
110+
this.stream.poll_close(cx)
114111
}
115112
}
116113

117114
impl<St> AsyncBufRead for IntoAsyncRead<St>
118115
where
119-
St: TryStream<Error = Error> + Unpin,
116+
St: TryStream<Error = Error>,
120117
St::Ok: AsRef<[u8]>,
121118
{
122-
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
123-
while let ReadState::PendingChunk = self.state {
124-
match ready!(self.stream.try_poll_next_unpin(cx)) {
119+
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
120+
let mut this = self.project();
121+
122+
while let ReadState::PendingChunk = this.state {
123+
match ready!(this.stream.as_mut().try_poll_next(cx)) {
125124
Some(Ok(chunk)) => {
126125
if !chunk.as_ref().is_empty() {
127-
self.state = ReadState::Ready { chunk, chunk_start: 0 };
126+
*this.state = ReadState::Ready { chunk, chunk_start: 0 };
128127
}
129128
}
130129
Some(Err(err)) => {
131-
self.state = ReadState::Eof;
130+
*this.state = ReadState::Eof;
132131
return Poll::Ready(Err(err));
133132
}
134133
None => {
135-
self.state = ReadState::Eof;
134+
*this.state = ReadState::Eof;
136135
return Poll::Ready(Ok(&[]));
137136
}
138137
}
139138
}
140139

141-
if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
140+
if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
142141
let chunk = chunk.as_ref();
143142
return Poll::Ready(Ok(&chunk[chunk_start..]));
144143
}
@@ -147,16 +146,18 @@ where
147146
Poll::Ready(Ok(&[]))
148147
}
149148

150-
fn consume(mut self: Pin<&mut Self>, amount: usize) {
149+
fn consume(self: Pin<&mut Self>, amount: usize) {
150+
let this = self.project();
151+
151152
// https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
152153
if amount == 0 {
153154
return;
154155
}
155-
if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
156+
if let ReadState::Ready { chunk, chunk_start } = this.state {
156157
*chunk_start += amount;
157158
debug_assert!(*chunk_start <= chunk.as_ref().len());
158159
if *chunk_start >= chunk.as_ref().len() {
159-
self.state = ReadState::PendingChunk;
160+
*this.state = ReadState::PendingChunk;
160161
}
161162
} else {
162163
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");

futures-util/src/stream/try_stream/mod.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,12 +1031,7 @@ pub trait TryStreamExt: TryStream {
10311031
Compat::new(self)
10321032
}
10331033

1034-
/// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
1035-
///
1036-
/// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
1037-
/// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
1038-
/// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
1039-
/// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
1034+
/// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
10401035
///
10411036
/// This method is only available when the `std` feature of this
10421037
/// library is activated, and it is activated by default.
@@ -1048,20 +1043,20 @@ pub trait TryStreamExt: TryStream {
10481043
/// use futures::stream::{self, TryStreamExt};
10491044
/// use futures::io::AsyncReadExt;
10501045
///
1051-
/// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
1046+
/// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
10521047
/// let mut reader = stream.into_async_read();
1053-
/// let mut buf = Vec::new();
10541048
///
1055-
/// assert!(reader.read_to_end(&mut buf).await.is_ok());
1056-
/// assert_eq!(buf, &[1, 2, 3, 4, 5]);
1049+
/// let mut buf = Vec::new();
1050+
/// reader.read_to_end(&mut buf).await.unwrap();
1051+
/// assert_eq!(buf, [1, 2, 3, 4, 5]);
10571052
/// # })
10581053
/// ```
10591054
#[cfg(feature = "io")]
10601055
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
10611056
#[cfg(feature = "std")]
10621057
fn into_async_read(self) -> IntoAsyncRead<Self>
10631058
where
1064-
Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
1059+
Self: Sized + TryStreamExt<Error = std::io::Error>,
10651060
Self::Ok: AsRef<[u8]>,
10661061
{
10671062
crate::io::assert_read(IntoAsyncRead::new(self))

0 commit comments

Comments
 (0)