Skip to content

Commit 440e74e

Browse files
authored
Add the SinkExt::feed combinator (#2155)
Like send, except no flushing is done at the end. This allows sequentially feeding the sink with items in async code without intermittent flushing. Reuse code of Feed internally in the Send future.
1 parent d1acd71 commit 440e74e

File tree

4 files changed

+75
-17
lines changed

4 files changed

+75
-17
lines changed

futures-util/src/sink/feed.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use core::pin::Pin;
2+
use futures_core::future::Future;
3+
use futures_core::ready;
4+
use futures_core::task::{Context, Poll};
5+
use futures_sink::Sink;
6+
7+
/// Future for the [`feed`](super::SinkExt::feed) method.
8+
#[derive(Debug)]
9+
#[must_use = "futures do nothing unless you `.await` or poll them"]
10+
pub struct Feed<'a, Si: ?Sized, Item> {
11+
sink: &'a mut Si,
12+
item: Option<Item>,
13+
}
14+
15+
// Pinning is never projected to children
16+
impl<Si: Unpin + ?Sized, Item> Unpin for Feed<'_, Si, Item> {}
17+
18+
impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Feed<'a, Si, Item> {
19+
pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
20+
Feed {
21+
sink,
22+
item: Some(item),
23+
}
24+
}
25+
26+
pub(super) fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
27+
Pin::new(self.sink)
28+
}
29+
30+
pub(super) fn is_item_pending(&self) -> bool {
31+
self.item.is_some()
32+
}
33+
}
34+
35+
impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Feed<'_, Si, Item> {
36+
type Output = Result<(), Si::Error>;
37+
38+
fn poll(
39+
self: Pin<&mut Self>,
40+
cx: &mut Context<'_>,
41+
) -> Poll<Self::Output> {
42+
let this = self.get_mut();
43+
let mut sink = Pin::new(&mut this.sink);
44+
ready!(sink.as_mut().poll_ready(cx))?;
45+
let item = this.item.take().expect("polled Feed after completion");
46+
sink.as_mut().start_send(item)?;
47+
Poll::Ready(Ok(()))
48+
}
49+
}

futures-util/src/sink/mod.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ pub use self::drain::{drain, Drain};
2626
mod fanout;
2727
pub use self::fanout::Fanout;
2828

29+
mod feed;
30+
pub use self::feed::Feed;
31+
2932
mod flush;
3033
pub use self::flush::Flush;
3134

@@ -212,15 +215,27 @@ pub trait SinkExt<Item>: Sink<Item> {
212215
/// into the sink, including flushing.
213216
///
214217
/// Note that, **because of the flushing requirement, it is usually better
215-
/// to batch together items to send via `send_all`, rather than flushing
216-
/// between each item.**
218+
/// to batch together items to send via `feed` or `send_all`,
219+
/// rather than flushing between each item.**
217220
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
218221
where
219222
Self: Unpin,
220223
{
221224
Send::new(self, item)
222225
}
223226

227+
/// A future that completes after the given item has been received
228+
/// by the sink.
229+
///
230+
/// Unlike `send`, the returned future does not flush the sink.
231+
/// It is the caller's responsibility to ensure all pending items
232+
/// are processed, which can be done via `flush` or `close`.
233+
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
234+
where Self: Unpin,
235+
{
236+
Feed::new(self, item)
237+
}
238+
224239
/// A future that completes after the given stream has been fully processed
225240
/// into the sink, including flushing.
226241
///

futures-util/src/sink/send.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use super::Feed;
12
use core::pin::Pin;
23
use futures_core::future::Future;
34
use futures_core::ready;
@@ -8,8 +9,7 @@ use futures_sink::Sink;
89
#[derive(Debug)]
910
#[must_use = "futures do nothing unless you `.await` or poll them"]
1011
pub struct Send<'a, Si: ?Sized, Item> {
11-
sink: &'a mut Si,
12-
item: Option<Item>,
12+
feed: Feed<'a, Si, Item>,
1313
}
1414

1515
// Pinning is never projected to children
@@ -18,8 +18,7 @@ impl<Si: Unpin + ?Sized, Item> Unpin for Send<'_, Si, Item> {}
1818
impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> {
1919
pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
2020
Self {
21-
sink,
22-
item: Some(item),
21+
feed: Feed::new(sink, item),
2322
}
2423
}
2524
}
@@ -32,20 +31,15 @@ impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {
3231
cx: &mut Context<'_>,
3332
) -> Poll<Self::Output> {
3433
let this = &mut *self;
35-
if let Some(item) = this.item.take() {
36-
let mut sink = Pin::new(&mut this.sink);
37-
match sink.as_mut().poll_ready(cx)? {
38-
Poll::Ready(()) => sink.as_mut().start_send(item)?,
39-
Poll::Pending => {
40-
this.item = Some(item);
41-
return Poll::Pending;
42-
}
43-
}
34+
35+
if this.feed.is_item_pending() {
36+
ready!(Pin::new(&mut this.feed).poll(cx))?;
37+
debug_assert!(!this.feed.is_item_pending());
4438
}
4539

4640
// we're done sending the item, but want to block on flushing the
4741
// sink
48-
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
42+
ready!(this.feed.sink_pin_mut().poll_flush(cx))?;
4943

5044
Poll::Ready(Ok(()))
5145
}

futures/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ pub mod sink {
417417
pub use futures_sink::Sink;
418418

419419
pub use futures_util::sink::{
420-
Close, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
420+
Close, Feed, Flush, Send, SendAll, SinkErrInto, SinkMapErr, With,
421421
SinkExt, Fanout, Drain, drain, Unfold, unfold,
422422
WithFlatMap,
423423
};

0 commit comments

Comments
 (0)