Skip to content

Commit 46560aa

Browse files
0-o-0cramertj
authored andcommitted
fix mpsc::Sender::poll_flush impl
1 parent 6408473 commit 46560aa

File tree

3 files changed

+105
-35
lines changed

3 files changed

+105
-35
lines changed

futures-channel/benches/sync_mpsc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl Stream for TestSender {
9393
ready!(tx.as_mut().poll_ready(cx)).unwrap();
9494
tx.as_mut().start_send(this.last + 1).unwrap();
9595
this.last += 1;
96-
assert_eq!(Poll::Ready(Ok(())), tx.as_mut().poll_flush(cx));
96+
assert_eq!(Poll::Pending, tx.as_mut().poll_flush(cx));
9797
Poll::Ready(Some(this.last))
9898
}
9999
}

futures-channel/tests/mpsc.rs

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
use futures::channel::{mpsc, oneshot};
44
use futures::executor::{block_on, block_on_stream};
5-
use futures::future::{join, poll_fn};
5+
use futures::future::{FutureExt, poll_fn};
66
use futures::stream::{Stream, StreamExt};
77
use futures::sink::{Sink, SinkExt};
8-
use futures::task::Poll;
9-
use futures_test::task::noop_context;
8+
use futures::task::{Context, Poll};
9+
use futures_test::task::{new_count_waker, noop_context};
1010
use pin_utils::pin_mut;
1111
use std::sync::{Arc, Mutex};
1212
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -92,28 +92,16 @@ fn send_recv_threads() {
9292
#[test]
9393
fn send_recv_threads_no_capacity() {
9494
let (mut tx, rx) = mpsc::channel::<i32>(0);
95-
let mut rx = block_on_stream(rx);
9695

97-
let (readytx, readyrx) = mpsc::channel::<()>(2);
98-
let mut readyrx = block_on_stream(readyrx);
9996
let t = thread::spawn(move || {
100-
let mut readytx = readytx.sink_map_err(|_| panic!());
101-
let (send_res_1, send_res_2) = block_on(join(tx.send(1), readytx.send(())));
102-
send_res_1.unwrap();
103-
send_res_2.unwrap();
104-
block_on(join(tx.send(2), readytx.send(())))
97+
block_on(tx.send(1)).unwrap();
98+
block_on(tx.send(2)).unwrap();
10599
});
106100

107-
readyrx.next();
108-
assert_eq!(rx.next(), Some(1));
109-
readyrx.next();
110-
drop(readyrx);
111-
assert_eq!(rx.next(), Some(2));
112-
drop(rx);
101+
let v: Vec<_> = block_on(rx.collect());
102+
assert_eq!(v, vec![1, 2]);
113103

114-
let (x, y) = t.join().unwrap();
115-
assert!(x.is_ok());
116-
assert!(y.is_ok());
104+
t.join().unwrap();
117105
}
118106

119107
#[test]
@@ -542,3 +530,46 @@ fn same_receiver() {
542530
assert!(!txa1.same_receiver(&txa2));
543531
assert!(txb1.same_receiver(&txb2));
544532
}
533+
534+
#[test]
535+
fn send_backpressure() {
536+
let (waker, counter) = new_count_waker();
537+
let mut cx = Context::from_waker(&waker);
538+
539+
let (mut tx, mut rx) = mpsc::channel(1);
540+
block_on(tx.send(1)).unwrap();
541+
542+
let mut task = tx.send(2);
543+
assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
544+
assert_eq!(counter, 0);
545+
546+
let item = block_on(rx.next()).unwrap();
547+
assert_eq!(item, 1);
548+
assert_eq!(counter, 1);
549+
assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
550+
551+
let item = block_on(rx.next()).unwrap();
552+
assert_eq!(item, 2);
553+
}
554+
555+
#[test]
556+
fn send_backpressure_multi_senders() {
557+
let (waker, counter) = new_count_waker();
558+
let mut cx = Context::from_waker(&waker);
559+
560+
let (mut tx1, mut rx) = mpsc::channel(1);
561+
let mut tx2 = tx1.clone();
562+
block_on(tx1.send(1)).unwrap();
563+
564+
let mut task = tx2.send(2);
565+
assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
566+
assert_eq!(counter, 0);
567+
568+
let item = block_on(rx.next()).unwrap();
569+
assert_eq!(item, 1);
570+
assert_eq!(counter, 1);
571+
assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
572+
573+
let item = block_on(rx.next()).unwrap();
574+
assert_eq!(item, 2);
575+
}

futures-sink/src/channel_impls.rs

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,42 @@
1-
use crate::{Sink, Poll};
1+
use crate::{Poll, Sink};
2+
use futures_channel::mpsc::{SendError, Sender, TrySendError, UnboundedSender};
23
use futures_core::task::Context;
3-
use futures_channel::mpsc::{Sender, SendError, TrySendError, UnboundedSender};
44
use std::pin::Pin;
55

66
impl<T> Sink<T> for Sender<T> {
77
type SinkError = SendError;
88

9-
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
9+
fn poll_ready(
10+
mut self: Pin<&mut Self>,
11+
cx: &mut Context<'_>,
12+
) -> Poll<Result<(), Self::SinkError>> {
1013
(*self).poll_ready(cx)
1114
}
1215

13-
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> {
16+
fn start_send(
17+
mut self: Pin<&mut Self>,
18+
msg: T,
19+
) -> Result<(), Self::SinkError> {
1420
(*self).start_send(msg)
1521
}
1622

17-
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
18-
Poll::Ready(Ok(()))
23+
fn poll_flush(
24+
mut self: Pin<&mut Self>,
25+
cx: &mut Context<'_>,
26+
) -> Poll<Result<(), Self::SinkError>> {
27+
match (*self).poll_ready(cx) {
28+
Poll::Ready(Err(ref e)) if e.is_disconnected() => {
29+
// If the receiver disconnected, we consider the sink to be flushed.
30+
Poll::Ready(Ok(()))
31+
}
32+
x => x,
33+
}
1934
}
2035

21-
fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
36+
fn poll_close(
37+
mut self: Pin<&mut Self>,
38+
_: &mut Context<'_>,
39+
) -> Poll<Result<(), Self::SinkError>> {
2240
self.disconnect();
2341
Poll::Ready(Ok(()))
2442
}
@@ -27,19 +45,31 @@ impl<T> Sink<T> for Sender<T> {
2745
impl<T> Sink<T> for UnboundedSender<T> {
2846
type SinkError = SendError;
2947

30-
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
48+
fn poll_ready(
49+
self: Pin<&mut Self>,
50+
cx: &mut Context<'_>,
51+
) -> Poll<Result<(), Self::SinkError>> {
3152
UnboundedSender::poll_ready(&*self, cx)
3253
}
3354

34-
fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> {
55+
fn start_send(
56+
mut self: Pin<&mut Self>,
57+
msg: T,
58+
) -> Result<(), Self::SinkError> {
3559
UnboundedSender::start_send(&mut *self, msg)
3660
}
3761

38-
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
62+
fn poll_flush(
63+
self: Pin<&mut Self>,
64+
_: &mut Context<'_>,
65+
) -> Poll<Result<(), Self::SinkError>> {
3966
Poll::Ready(Ok(()))
4067
}
4168

42-
fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
69+
fn poll_close(
70+
mut self: Pin<&mut Self>,
71+
_: &mut Context<'_>,
72+
) -> Poll<Result<(), Self::SinkError>> {
4373
self.disconnect();
4474
Poll::Ready(Ok(()))
4575
}
@@ -48,7 +78,10 @@ impl<T> Sink<T> for UnboundedSender<T> {
4878
impl<T> Sink<T> for &UnboundedSender<T> {
4979
type SinkError = SendError;
5080

51-
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
81+
fn poll_ready(
82+
self: Pin<&mut Self>,
83+
cx: &mut Context<'_>,
84+
) -> Poll<Result<(), Self::SinkError>> {
5285
UnboundedSender::poll_ready(*self, cx)
5386
}
5487

@@ -57,11 +90,17 @@ impl<T> Sink<T> for &UnboundedSender<T> {
5790
.map_err(TrySendError::into_send_error)
5891
}
5992

60-
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
93+
fn poll_flush(
94+
self: Pin<&mut Self>,
95+
_: &mut Context<'_>,
96+
) -> Poll<Result<(), Self::SinkError>> {
6197
Poll::Ready(Ok(()))
6298
}
6399

64-
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::SinkError>> {
100+
fn poll_close(
101+
self: Pin<&mut Self>,
102+
_: &mut Context<'_>,
103+
) -> Poll<Result<(), Self::SinkError>> {
65104
self.close_channel();
66105
Poll::Ready(Ok(()))
67106
}

0 commit comments

Comments
 (0)