Skip to content

Commit 9872591

Browse files
committed
refactor(body): move channel based incoming body implementation to ChanBody type
1 parent f6e33a7 commit 9872591

File tree

2 files changed

+208
-183
lines changed

2 files changed

+208
-183
lines changed

src/body/incoming/channel.rs

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
use std::fmt;
2+
use std::future::Future;
3+
use std::pin::Pin;
4+
use std::task::{Context, Poll};
5+
6+
use bytes::Bytes;
7+
use futures_channel::{mpsc, oneshot};
8+
use futures_util::ready;
9+
use futures_util::{stream::FusedStream, Stream}; // for mpsc::Receiver
10+
use http::HeaderMap;
11+
use http_body::{Frame, SizeHint};
12+
13+
use crate::body::DecodedLength;
14+
use crate::common::watch;
15+
16+
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
17+
type TrailersSender = oneshot::Sender<HeaderMap>;
18+
19+
const WANT_PENDING: usize = 1;
20+
const WANT_READY: usize = 2;
21+
22+
pub(super) struct ChanBody {
23+
content_length: DecodedLength,
24+
want_tx: watch::Sender,
25+
data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
26+
trailers_rx: oneshot::Receiver<HeaderMap>,
27+
}
28+
29+
impl ChanBody {
30+
pub(super) fn new(content_length: DecodedLength, wanter: bool) -> (Sender, Self) {
31+
let (data_tx, data_rx) = mpsc::channel(0);
32+
let (trailers_tx, trailers_rx) = oneshot::channel();
33+
34+
// If wanter is true, `Sender::poll_ready()` won't becoming ready
35+
// until the `Body` has been polled for data once.
36+
let want = if wanter { WANT_PENDING } else { WANT_READY };
37+
38+
let (want_tx, want_rx) = watch::channel(want);
39+
40+
let tx = Sender {
41+
want_rx,
42+
data_tx,
43+
trailers_tx: Some(trailers_tx),
44+
};
45+
let rx = Self {
46+
content_length,
47+
want_tx,
48+
data_rx,
49+
trailers_rx,
50+
};
51+
52+
(tx, rx)
53+
}
54+
55+
pub(super) fn poll_frame(
56+
&mut self,
57+
cx: &mut Context<'_>,
58+
) -> Poll<Option<Result<Frame<Bytes>, crate::Error>>> {
59+
let Self {
60+
content_length: ref mut len,
61+
ref mut data_rx,
62+
ref mut want_tx,
63+
ref mut trailers_rx,
64+
} = self;
65+
66+
want_tx.send(WANT_READY);
67+
68+
if !data_rx.is_terminated() {
69+
if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
70+
len.sub_if(chunk.len() as u64);
71+
return Poll::Ready(Some(Ok(Frame::data(chunk))));
72+
}
73+
}
74+
75+
// check trailers after data is terminated
76+
match ready!(Pin::new(trailers_rx).poll(cx)) {
77+
Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
78+
Err(_) => Poll::Ready(None),
79+
}
80+
}
81+
82+
pub(super) fn is_end_stream(&self) -> bool {
83+
self.content_length == DecodedLength::ZERO
84+
}
85+
86+
pub(super) fn size_hint(&self) -> SizeHint {
87+
super::opt_len(self.content_length)
88+
}
89+
}
90+
91+
/// A sender half created through [`Body::channel()`].
92+
///
93+
/// Useful when wanting to stream chunks from another thread.
94+
///
95+
/// ## Body Closing
96+
///
97+
/// Note that the request body will always be closed normally when the sender is dropped (meaning
98+
/// that the empty terminating chunk will be sent to the remote). If you desire to close the
99+
/// connection with an incomplete response (e.g. in the case of an error during asynchronous
100+
/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
101+
///
102+
/// [`Body::channel()`]: struct.Body.html#method.channel
103+
/// [`Sender::abort()`]: struct.Sender.html#method.abort
104+
#[must_use = "Sender does nothing unless sent on"]
105+
pub(crate) struct Sender {
106+
want_rx: watch::Receiver,
107+
data_tx: BodySender,
108+
trailers_tx: Option<TrailersSender>,
109+
}
110+
111+
impl Sender {
112+
/// Check to see if this `Sender` can send more data.
113+
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
114+
// Check if the receiver end has tried polling for the body yet
115+
ready!(self.poll_want(cx)?);
116+
self.data_tx
117+
.poll_ready(cx)
118+
.map_err(|_| crate::Error::new_closed())
119+
}
120+
121+
pub(crate) fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
122+
match self.want_rx.load(cx) {
123+
WANT_READY => Poll::Ready(Ok(())),
124+
WANT_PENDING => Poll::Pending,
125+
watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
126+
unexpected => unreachable!("want_rx value: {}", unexpected),
127+
}
128+
}
129+
130+
/// Send trailers on trailers channel.
131+
#[allow(unused)]
132+
pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
133+
let tx = match self.trailers_tx.take() {
134+
Some(tx) => tx,
135+
None => return Err(crate::Error::new_closed()),
136+
};
137+
tx.send(trailers).map_err(|_| crate::Error::new_closed())
138+
}
139+
140+
/// Try to send data on this channel.
141+
///
142+
/// # Errors
143+
///
144+
/// Returns `Err(Bytes)` if the channel could not (currently) accept
145+
/// another `Bytes`.
146+
///
147+
/// # Note
148+
///
149+
/// This is mostly useful for when trying to send from some other thread
150+
/// that doesn't have an async context. If in an async context, prefer
151+
/// `send_data()` instead.
152+
pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
153+
self.data_tx
154+
.try_send(Ok(chunk))
155+
.map_err(|err| err.into_inner().expect("just sent Ok"))
156+
}
157+
158+
pub(crate) fn send_error(&mut self, err: crate::Error) {
159+
let _ = self
160+
.data_tx
161+
// clone so the send works even if buffer is full
162+
.clone()
163+
.try_send(Err(err));
164+
}
165+
}
166+
167+
impl fmt::Debug for Sender {
168+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169+
#[derive(Debug)]
170+
struct Open;
171+
#[derive(Debug)]
172+
struct Closed;
173+
174+
let mut builder = f.debug_tuple("Sender");
175+
match self.want_rx.peek() {
176+
watch::CLOSED => builder.field(&Closed),
177+
_ => builder.field(&Open),
178+
};
179+
180+
builder.finish()
181+
}
182+
}

0 commit comments

Comments
 (0)