-
Notifications
You must be signed in to change notification settings - Fork 52
Add Sender::closed future #102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@notgull would it be possible to merge this? This feature is very useful in producer/consumer patterns for shutting down the producer without having to attempt a send. For example, when forwarding a stream to an async_channel, we want to shut down as soon as the async_channel was closed. Without this PR, we can only shut down after receiving from the stream and attempting to send to the async_channel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR and sorry for the late review.
src/lib.rs
Outdated
// Check if the channel is closed. | ||
if !this.sender.is_closed() { | ||
// Channel is not closed yet - now start listening for notifications. | ||
*this.listener = Some(this.sender.channel.closed_ops.listen()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have two questions on these lines:
- Why does the previous listener have to be discarded each time and a new one created?
- Wouldn't this order (check -> listen) cause a race condition? (ref)
Maybe the correct way here is to create the listener at Sender::closed and call only S::poll in this branch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the thorough review and pointing out the race. I first tried following your suggestion and creating the listener in closed()
, but if closed()
is called after the last sender dropped, then the listener will not be triggered in this case, and the test fails in this line. Instead, I followed the code pattern in recv with the loop and optional listener creation. It avoids the race and only creates the listener once.
Co-authored-by: Taiki Endo <te316e89@gmail.com>
Co-authored-by: Taiki Endo <te316e89@gmail.com>
Published in 2.5.0. |
This adds
Sender::closed()
similar totokio::sync::mpsc::Sender::closed()
(link).