Undoing a Sink::poll_ready #2109

@jonhoo


Before we had Sink::poll_ready, senders always had to have an item ready to send before interacting with the Sink. This sometimes led to unnecessary buffering. The most trivial example of this is the current implementation of StreamExt::forward, which still adheres to the "have an item before you access the sink" model. It calls Stream::poll_next, and then calls Sink::poll_ready when it gets an element. Assuming poll_ready returns Poll::Ready, the element is sent, and all is good. The issue arises if Sink::poll_ready returns Poll::Pending. The implementation must now buffer that element somewhere, and retry Sink::poll_ready on the next call to poll before it attempts to receive another element from the Stream (this is the buffered_item field on Forward). The upside of this approach is that when poll_ready returns Poll::Ready, we almost immediately call start_send and "consume" the slot that poll_ready told us was available.
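To make the buffering concrete, here is a minimal sketch of the "item first, then poll_ready" model. It is not the actual Forward implementation: ToySink and BufferedForward are hypothetical stand-ins that use only std, the toy stream is a VecDeque that is never pending, and wakers and errors are left out entirely.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical stand-in for a sink with limited room (not the futures `Sink` trait).
struct ToySink {
    free_slots: usize,
    sent: Vec<u32>,
}

impl ToySink {
    fn poll_ready(&mut self) -> Poll<()> {
        if self.free_slots > 0 { Poll::Ready(()) } else { Poll::Pending }
    }

    fn start_send(&mut self, item: u32) {
        self.free_slots -= 1;
        self.sent.push(item);
    }
}

/// Hypothetical forwarder that takes an item *before* asking the sink for room.
struct BufferedForward {
    stream: VecDeque<u32>,
    sink: ToySink,
    buffered_item: Option<u32>,
}

impl BufferedForward {
    fn new(items: Vec<u32>, free_slots: usize) -> Self {
        BufferedForward {
            stream: items.into(),
            sink: ToySink { free_slots, sent: Vec::new() },
            buffered_item: None,
        }
    }

    fn poll(&mut self) -> Poll<()> {
        loop {
            // Retry a parked item first; only then pull a new one from the stream.
            let item = match self.buffered_item.take().or_else(|| self.stream.pop_front()) {
                Some(item) => item,
                None => return Poll::Ready(()), // stream exhausted
            };
            if self.sink.poll_ready().is_pending() {
                // The item is already out of the stream, so it must be parked here.
                self.buffered_item = Some(item);
                return Poll::Pending;
            }
            // poll_ready just said yes, so the slot is consumed right away.
            self.sink.start_send(item);
        }
    }
}
```

With three items and room for two, the first poll sends two items and parks the third in buffered_item; once the sink has room again, the next poll sends the parked item before touching the stream.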

The alternative approach that Sink::poll_ready enabled is one that does not require any buffering, and it can be written out pretty easily:

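loop {
    // Reserve room in the sink before pulling anything from the stream.
    ready!(self.sink.poll_ready(cx))?;
    if let Some(item) = ready!(self.stream.poll_next(cx)) {
        // A slot is already reserved, so the item can be sent immediately.
        self.sink.start_send(item)?;
    } else {
        // The stream is exhausted.
        break;
    }
}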

Here, we never need to buffer, since we only ever accept another item if we know we can send it immediately (I use this technique in tokio-tower for example).

Unfortunately, this example highlights a problem with Sink as it stands: we may now take a long time (potentially forever) between when we get Poll::Ready from poll_ready and when we call start_send. It might be that the client is simply not sending us any more requests. But the Sink has promised us a slot to send into, and we are still holding on to it. In the context of something like a bounded channel, this may mean that we are holding up one of the few slots in the channel, even though we will never use it (see #1312 and #984 (comment) for related discussion). If we have several of these loops (e.g., if you are forwarding multiple streams to clones of the same bounded mpsc::Sender), then you can easily end up in a livelock: all the slots of the bounded channel are held by forwards for streams that have no elements to send, and as a result the forwards for streams that do have elements to send cannot make progress, since poll_ready returns Poll::Pending for them.
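The starvation scenario can be reproduced with a toy model. This sketch assumes reservation semantics, i.e. that a successful poll_ready takes a slot out of the channel and keeps it until start_send; Channel and Sender here are hypothetical std-only types, not mpsc::Sender, and whether a particular channel implementation reserves slots this way is an assumption.

```rust
use std::task::Poll;

/// Hypothetical bounded channel with a fixed number of slots.
struct Channel {
    free_slots: usize,
    queue: Vec<u32>,
}

/// Hypothetical sender handle; `has_slot` records a reservation made by poll_ready.
struct Sender {
    has_slot: bool,
}

impl Sender {
    fn poll_ready(&mut self, ch: &mut Channel) -> Poll<()> {
        if self.has_slot {
            return Poll::Ready(());
        }
        if ch.free_slots == 0 {
            return Poll::Pending;
        }
        // Assumed semantics: readiness reserves a slot for this sender.
        ch.free_slots -= 1;
        self.has_slot = true;
        Poll::Ready(())
    }

    fn start_send(&mut self, ch: &mut Channel, item: u32) {
        assert!(self.has_slot, "start_send without a reserved slot");
        // The reserved slot now holds the queued item.
        self.has_slot = false;
        ch.queue.push(item);
    }
}

/// Two idle forwards grab both slots; a third forward with an item is stuck.
/// Returns whether the busy sender was refused and how many items were queued.
fn starved_sender_demo() -> (bool, usize) {
    let mut ch = Channel { free_slots: 2, queue: Vec::new() };
    let mut idle_a = Sender { has_slot: false };
    let mut idle_b = Sender { has_slot: false };
    let mut busy = Sender { has_slot: false };

    // Each idle forward calls poll_ready first, then waits on an empty stream.
    let _ = idle_a.poll_ready(&mut ch);
    let _ = idle_b.poll_ready(&mut ch);

    // The busy forward has an item in hand but cannot get a slot.
    let item = 7;
    let refused = match busy.poll_ready(&mut ch) {
        Poll::Ready(()) => {
            busy.start_send(&mut ch, item);
            false
        }
        Poll::Pending => true,
    };
    (refused, ch.queue.len())
}
```

In this model nothing ever frees the two reserved slots, because the idle forwards never reach start_send and have no way to give the reservation back.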

This is something we've also run into in tower, whose Service trait features a similar poll_ready mechanism. What is needed here is some way to "undo" a Sink::poll_ready that returned Poll::Ready. In tower we've had two different proposals come up. The first (tower-rs/tower#408) is to add a disarm_ready method. The idea here is that implementations of poll that discover that they won't use the slot given to them by poll_ready can "disarm"/give up that slot by calling the disarm_ready method. The second (tower-rs/tower#412) is to have poll_ready return some kind of "token" that must then be passed to start_send (tower's equivalent is called call). This makes the API more misuse-resistant (you can't call start_send without calling poll_ready), and also gives us a way to give up a poll_ready slot: just drop the token.
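Both proposals can be sketched against a toy slot pool. Everything below is illustrative: Slots, DisarmSender, ReadyToken, and TokenSender are hypothetical names, the signatures are guesses at the shape of the idea rather than the APIs proposed in the tower issues, and the same reservation semantics for poll_ready are assumed.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::task::Poll;

/// Hypothetical pool of free slots shared by every sender of one bounded channel.
type Slots = Rc<Cell<usize>>;

/// Try to take one slot out of the pool.
fn reserve(slots: &Slots) -> bool {
    let free = slots.get();
    if free == 0 {
        return false;
    }
    slots.set(free - 1);
    true
}

/// Put one slot back into the pool.
fn release(slots: &Slots) {
    slots.set(slots.get() + 1);
}

// Proposal 1 (sketch): an explicit disarm_ready method.
struct DisarmSender {
    slots: Slots,
    armed: bool,
}

impl DisarmSender {
    fn poll_ready(&mut self) -> Poll<()> {
        if self.armed || reserve(&self.slots) {
            self.armed = true;
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }

    /// Give up a slot obtained from poll_ready without sending anything.
    fn disarm_ready(&mut self) {
        if self.armed {
            self.armed = false;
            release(&self.slots);
        }
    }
}

// Proposal 2 (sketch): poll_ready hands out a token that start_send consumes.
struct ReadyToken {
    slots: Slots,
    used: bool,
}

impl Drop for ReadyToken {
    fn drop(&mut self) {
        // Dropping an unused token is the "undo".
        if !self.used {
            release(&self.slots);
        }
    }
}

struct TokenSender {
    slots: Slots,
    sent: Vec<u32>,
}

impl TokenSender {
    fn poll_ready(&mut self) -> Poll<ReadyToken> {
        if reserve(&self.slots) {
            Poll::Ready(ReadyToken { slots: Rc::clone(&self.slots), used: false })
        } else {
            Poll::Pending
        }
    }

    /// Cannot be called without a token, i.e. without a successful poll_ready.
    fn start_send(&mut self, mut token: ReadyToken, item: u32) {
        token.used = true; // the slot stays taken by the queued item
        self.sent.push(item);
    }
}

/// Returns the free-slot count: while armed, after disarm, after token drop, after send.
fn undo_demo() -> (usize, usize, usize, usize) {
    let slots: Slots = Rc::new(Cell::new(1));

    let mut a = DisarmSender { slots: Rc::clone(&slots), armed: false };
    let _ = a.poll_ready();
    let while_armed = slots.get();
    a.disarm_ready();
    let after_disarm = slots.get();

    let mut b = TokenSender { slots: Rc::clone(&slots), sent: Vec::new() };
    if let Poll::Ready(token) = b.poll_ready() {
        drop(token);
    }
    let after_drop = slots.get();
    if let Poll::Ready(token) = b.poll_ready() {
        b.start_send(token, 7);
    }
    (while_armed, after_disarm, after_drop, slots.get())
}
```

The token variant moves the bookkeeping into the type system: the sender keeps no armed flag, and forgetting to give the slot back is only possible by leaking the token.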

Neither of these proposals is necessarily the right one, but I figured I would start discussion here since this is definitely an issue with Sink (and similar traits that include a poll_ready method), and one that must be fixed before the trait is considered finished.

Labels: A-sink (Area: futures::sink), S-needs-api-design (Status: Before implementing this, a discussion or decision on the new API is needed.)