Skip to content

Commit fd694b8

Browse files
tmiaskocramertj
authored andcommitted
With: Ensure inner sink is ready before sending item
Previously the `With` implementation never consulted the inner sink `poll_ready` before initiating a send operation. After changes the `With` combinator is considered ready when processing of previous item has been completed and the inner sink is ready. Additionally remove buffered state since it isn't actually used. Issue: #1834
1 parent 5d49d43 commit fd694b8

File tree

1 file changed

+17
-45
lines changed

1 file changed

+17
-45
lines changed

futures-util/src/sink/with.rs

Lines changed: 17 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use core::fmt;
22
use core::marker::PhantomData;
3-
use core::mem;
43
use core::pin::Pin;
54
use futures_core::future::Future;
65
use futures_core::stream::Stream;
@@ -13,8 +12,8 @@ use pin_utils::{unsafe_pinned, unsafe_unpinned};
1312
pub struct With<Si, Item, U, Fut, F> {
1413
sink: Si,
1514
f: F,
16-
state: State<Fut, Item>,
17-
_phantom: PhantomData<fn(U)>,
15+
state: Option<Fut>,
16+
_phantom: PhantomData<fn(U) -> Item>,
1817
}
1918

2019
impl<Si, Item, U, Fut, F> Unpin for With<Si, Item, U, Fut, F>
@@ -27,7 +26,6 @@ impl<Si, Item, U, Fut, F> fmt::Debug for With<Si, Item, U, Fut, F>
2726
where
2827
Si: fmt::Debug,
2928
Fut: fmt::Debug,
30-
Item: fmt::Debug,
3129
{
3230
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3331
f.debug_struct("With")
@@ -44,45 +42,22 @@ where Si: Sink<Item>,
4442
{
4543
unsafe_pinned!(sink: Si);
4644
unsafe_unpinned!(f: F);
47-
unsafe_pinned!(state: State<Fut, Item>);
45+
unsafe_pinned!(state: Option<Fut>);
4846

4947
pub(super) fn new<E>(sink: Si, f: F) -> Self
5048
where
5149
Fut: Future<Output = Result<Item, E>>,
5250
E: From<Si::Error>,
5351
{
5452
With {
55-
state: State::Empty,
53+
state: None,
5654
sink,
5755
f,
5856
_phantom: PhantomData,
5957
}
6058
}
6159
}
6260

63-
#[derive(Debug)]
64-
enum State<Fut, T> {
65-
Empty,
66-
Process(Fut),
67-
Buffered(T),
68-
}
69-
70-
impl<Fut, T> State<Fut, T> {
71-
#[allow(clippy::wrong_self_convention)]
72-
fn as_pin_mut(self: Pin<&mut Self>) -> State<Pin<&mut Fut>, Pin<&mut T>> {
73-
unsafe {
74-
match self.get_unchecked_mut() {
75-
State::Empty =>
76-
State::Empty,
77-
State::Process(fut) =>
78-
State::Process(Pin::new_unchecked(fut)),
79-
State::Buffered(item) =>
80-
State::Buffered(Pin::new_unchecked(item)),
81-
}
82-
}
83-
}
84-
}
85-
8661
// Forwarding impl of Stream from the underlying sink
8762
impl<S, Item, U, Fut, F> Stream for With<S, Item, U, Fut, F>
8863
where S: Stream + Sink<Item>,
@@ -132,23 +107,18 @@ impl<Si, Item, U, Fut, F, E> With<Si, Item, U, Fut, F>
132107
self.sink
133108
}
134109

110+
/// Completes the processing of previous item if any.
135111
fn poll(
136112
mut self: Pin<&mut Self>,
137113
cx: &mut Context<'_>,
138114
) -> Poll<Result<(), E>> {
139-
let buffered = match self.as_mut().state().as_pin_mut() {
140-
State::Empty => return Poll::Ready(Ok(())),
141-
State::Process(fut) => Some(ready!(fut.poll(cx))?),
142-
State::Buffered(_) => None,
115+
let item = match self.as_mut().state().as_pin_mut() {
116+
None => return Poll::Ready(Ok(())),
117+
Some(fut) => ready!(fut.poll(cx))?,
143118
};
144-
if let Some(buffered) = buffered {
145-
self.as_mut().state().set(State::Buffered(buffered));
146-
}
147-
if let State::Buffered(item) = unsafe { mem::replace(self.as_mut().state().get_unchecked_mut(), State::Empty) } {
148-
Poll::Ready(self.as_mut().sink().start_send(item).map_err(Into::into))
149-
} else {
150-
unreachable!()
151-
}
119+
self.as_mut().state().set(None);
120+
self.as_mut().sink().start_send(item)?;
121+
Poll::Ready(Ok(()))
152122
}
153123
}
154124

@@ -161,18 +131,20 @@ impl<Si, Item, U, Fut, F, E> Sink<U> for With<Si, Item, U, Fut, F>
161131
type Error = E;
162132

163133
fn poll_ready(
164-
self: Pin<&mut Self>,
134+
mut self: Pin<&mut Self>,
165135
cx: &mut Context<'_>,
166136
) -> Poll<Result<(), Self::Error>> {
167-
self.poll(cx)
137+
ready!(self.as_mut().poll(cx))?;
138+
ready!(self.as_mut().sink().poll_ready(cx)?);
139+
Poll::Ready(Ok(()))
168140
}
169141

170142
fn start_send(
171143
mut self: Pin<&mut Self>,
172144
item: U,
173145
) -> Result<(), Self::Error> {
174-
let item = (self.as_mut().f())(item);
175-
self.as_mut().state().set(State::Process(item));
146+
let future = (self.as_mut().f())(item);
147+
self.as_mut().state().set(Some(future));
176148
Ok(())
177149
}
178150

0 commit comments

Comments
 (0)