From 33a5feb54e81b4ff76fbecbf37d4d7830b092763 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Fri, 10 Jan 2025 00:00:00 +0000 Subject: [PATCH] feat(util): introduce `channel::Sender::try_send()` pr #140 introduced a new channel-backed body. this body provides a new equivalent to the defunct `hyper::Body::channel()` interface that was exposed in hyper 0.14, and removed in the 1.0 major release. the previous `Sender` type also included a useful method, `try_send_data()`, which allows a thread to _synchronously_ attempt to send data _outside_ of an asynchronous context. this commit introduces a loosely equivalent `Sender::try_send()` method to provide users of `http-body-util` with a means to send `Frame` values outside of an asynchronous context, without potentially scheduling the caller to be woken later or yielding should the channel be full. this function accepts a `Frame` rather than a chunk of data, to fit in with the shift towards frame-oriented interfaces in `http-body` and `http-body-util`. Signed-off-by: katelyn martin --- http-body-util/src/channel.rs | 66 +++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/http-body-util/src/channel.rs b/http-body-util/src/channel.rs index e963d19..f132d2a 100644 --- a/http-body-util/src/channel.rs +++ b/http-body-util/src/channel.rs @@ -94,6 +94,26 @@ impl Sender { self.send(Frame::trailers(trailers)).await } + /// Attempts to send a frame on this channel. + /// + /// This function returns the unsent frame back as an `Err(_)` if the channel could not + /// (currently) accept another frame. + /// + /// # Note + /// + /// This is mostly useful for when trying to send a frame from outside of an asynchronous + /// context. If in an async context, prefer [`Sender::send_data()`] instead. + pub fn try_send(&mut self, frame: Frame) -> Result<(), Frame> { + let Self { + tx_frame, + tx_error: _, + } = self; + + tx_frame + .try_send(frame) + .map_err(tokio::sync::mpsc::error::TrySendError::into_inner) + } + /// Aborts the body in an abnormal fashion. pub fn abort(self, error: E) { self.tx_error.send(error).ok(); @@ -193,6 +213,52 @@ mod tests { assert_eq!(collected.to_bytes(), "Hello!"); } + #[tokio::test] + async fn try_send_works() { + let (mut tx, mut body) = Channel::::new(2); + + // Send two messages, filling the channel's buffer. + tx.try_send(Frame::data(Bytes::from("one"))) + .expect("can send one message"); + tx.try_send(Frame::data(Bytes::from("two"))) + .expect("can send two messages"); + + // Sending a value to a full channel should return it back to us. + match tx.try_send(Frame::data(Bytes::from("three"))) { + Err(frame) => assert_eq!(frame.into_data().unwrap(), "three"), + Ok(()) => panic!("synchronously sending a value to a full channel should fail"), + }; + + // Read the messages out of the body. + assert_eq!( + body.frame() + .await + .expect("yields result") + .expect("yields frame") + .into_data() + .expect("yields data"), + "one" + ); + assert_eq!( + body.frame() + .await + .expect("yields result") + .expect("yields frame") + .into_data() + .expect("yields data"), + "two" + ); + + // Drop the body. + drop(body); + + // Sending a value to a closed channel should return it back to us. + match tx.try_send(Frame::data(Bytes::from("closed"))) { + Err(frame) => assert_eq!(frame.into_data().unwrap(), "closed"), + Ok(()) => panic!("synchronously sending a value to a closed channel should fail"), + }; + } + /// A stand-in for an error type, for unit tests. type Error = &'static str; /// An example error message.