Skip to content

Commit 63b67ae

Browse files
authored
util: Add BodyStream. (#91)
1 parent c9f1337 commit 63b67ae

File tree

2 files changed

+98
-2
lines changed

2 files changed

+98
-2
lines changed

http-body-util/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub use self::either::Either;
2929
pub use self::empty::Empty;
3030
pub use self::full::Full;
3131
pub use self::limited::{LengthLimitError, Limited};
32-
pub use self::stream::StreamBody;
32+
pub use self::stream::{BodyStream, StreamBody};
3333

3434
/// An extension trait for [`http_body::Body`] adding various combinators and adapters
3535
pub trait BodyExt: http_body::Body {

http-body-util/src/stream.rs

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,57 @@ impl<S: Stream> Stream for StreamBody<S> {
5555
}
5656
}
5757

58+
pin_project! {
59+
/// A stream created from a [`Body`].
60+
#[derive(Clone, Copy, Debug)]
61+
pub struct BodyStream<B> {
62+
#[pin]
63+
body: B,
64+
}
65+
}
66+
67+
impl<B> BodyStream<B> {
68+
/// Create a new `BodyStream`.
69+
pub fn new(body: B) -> Self {
70+
Self { body }
71+
}
72+
}
73+
74+
impl<B> Body for BodyStream<B>
75+
where
76+
B: Body,
77+
{
78+
type Data = B::Data;
79+
type Error = B::Error;
80+
81+
fn poll_frame(
82+
self: Pin<&mut Self>,
83+
cx: &mut Context<'_>,
84+
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
85+
self.project().body.poll_frame(cx)
86+
}
87+
}
88+
89+
impl<B> Stream for BodyStream<B>
90+
where
91+
B: Body,
92+
{
93+
type Item = Result<Frame<B::Data>, B::Error>;
94+
95+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
96+
match self.project().body.poll_frame(cx) {
97+
Poll::Ready(Some(frame)) => Poll::Ready(Some(frame)),
98+
Poll::Ready(None) => Poll::Ready(None),
99+
Poll::Pending => Poll::Pending,
100+
}
101+
}
102+
}
103+
58104
#[cfg(test)]
59105
mod tests {
60-
use crate::{BodyExt, StreamBody};
106+
use crate::{BodyExt, BodyStream, StreamBody};
61107
use bytes::Bytes;
108+
use futures_util::StreamExt;
62109
use http_body::Frame;
63110
use std::convert::Infallible;
64111

@@ -105,4 +152,53 @@ mod tests {
105152

106153
assert!(body.frame().await.is_none());
107154
}
155+
156+
#[tokio::test]
157+
async fn stream_from_body() {
158+
let chunks: Vec<Result<_, Infallible>> = vec![
159+
Ok(Frame::data(Bytes::from(vec![1]))),
160+
Ok(Frame::data(Bytes::from(vec![2]))),
161+
Ok(Frame::data(Bytes::from(vec![3]))),
162+
];
163+
let stream = futures_util::stream::iter(chunks);
164+
let body = StreamBody::new(stream);
165+
166+
let mut stream = BodyStream::new(body);
167+
168+
assert_eq!(
169+
stream
170+
.next()
171+
.await
172+
.unwrap()
173+
.unwrap()
174+
.into_data()
175+
.unwrap()
176+
.as_ref(),
177+
[1]
178+
);
179+
assert_eq!(
180+
stream
181+
.next()
182+
.await
183+
.unwrap()
184+
.unwrap()
185+
.into_data()
186+
.unwrap()
187+
.as_ref(),
188+
[2]
189+
);
190+
assert_eq!(
191+
stream
192+
.next()
193+
.await
194+
.unwrap()
195+
.unwrap()
196+
.into_data()
197+
.unwrap()
198+
.as_ref(),
199+
[3]
200+
);
201+
202+
assert!(stream.next().await.is_none());
203+
}
108204
}

0 commit comments

Comments
 (0)