-
Notifications
You must be signed in to change notification settings - Fork 656
Description
Before we had Sink::poll_ready
, senders always had to have an item ready to send before interacting with the Sink
. This sometimes led to unnecessary buffering. The most trivial example of this is the current implementation of StreamExt::forward
, which still adheres to the "have an item before you access the sink" model. It calls Stream::poll_next
, and then calls Sink::poll_ready
when it gets an element. Assuming poll_ready
returns Poll::Ready
, the element is sent, and all is good. The issue arises if Sink::poll_ready
returns Poll::Pending
. The implementation must now buffer that element somewhere, and re-try Sink::poll_ready
on the next call to poll
before it attempts to receive another element from the Stream
(this is the buffered_item
field on Forward
). The upside of this approach is that when we poll_ready
returns Poll::Ready
, we almost immediately call .start_send
and "consume" the slot that poll_ready
told us was available.
The alternative approach that Sink::poll_ready
enabled is one that does not require any buffering, and it can be written out pretty easily:
loop {
ready!(self.sink.poll_ready());
if let Some(item) = ready!(self.stream.poll_recv()) {
self.sink.start_send(item);
} else {
break;
}
}
Here, we never need to buffer, since we only ever accept another item if we know we can send it immediately (I use this technique in tokio-tower
for example).
Unfortunately, this example highlights a problem with Sink
as it stands: we may now take a long time (potentially forever) between when we get Poll::Ready
from poll_ready
and when we call start_send
. It might be that the client is simply not sending us any more requests. But, the Sink
has promised us that we have a slot available to send, which we are still holding up. In the context of something like a bounded channel, this may mean that we are holding up one of the few slots in the channel, even though we will never use it (see #1312 and #984 (comment) for related discussion). If we have multiple of these loops (e.g., if you are forwarding multiple streams to clones of the same bounded mpsc::Sender
), then you can easily end up in a livelock: all the slots of the bounded channel are held up by forwards for streams that do not have any elements to send, and as a result forward for streams that do have elements to send are unable to forward them since poll_ready
returns Poll::Pending
for them.
This is something we've also run into in tower
, whose Service
trait features a similar poll_ready
mechanism. What is needed here is some way to "undo" a Sink::poll_ready
that returned Poll::Ready
. In tower
we've had two different proposals come up. The first (tower-rs/tower#408) is to add a disarm_ready
method. The idea here is that implementations of poll
that discover that they won't use the slot given to them by poll_ready
can "disarm"/give up that slot by calling the disarm_ready
method. The second (tower-rs/tower#412) is to have poll_ready
return some kind of "token" that must then be passed to start_send
(tower
's equivalent is called call
). This makes the API more misuse-resistant (you can't call start_send
without calling poll_ready
), and also gives us a way to give up a poll_ready
slot: just drop the token.
Neither of these proposals are necessarily the right one, but I figured I would start discussion here since this is definitely an issue with Sink
(and similar traits that include a poll_ready
method), and one that must be fixed before the trait is considered finished.