Skip to content

Commit b665e04

Browse files
authored
Add Sender::closed future (#102)
Co-authored-by: xmakro <makro@>
1 parent c250f28 commit b665e04

File tree

2 files changed

+105
-0
lines changed

2 files changed

+105
-0
lines changed

src/lib.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ struct Channel<T> {
7878
/// Stream operations while the channel is empty and not closed.
7979
stream_ops: Event,
8080

81+
/// Closed operations while the channel is not closed.
82+
closed_ops: Event,
83+
8184
/// The number of currently active `Sender`s.
8285
sender_count: AtomicUsize,
8386

@@ -97,6 +100,7 @@ impl<T> Channel<T> {
97100
// Notify all receive and stream operations.
98101
self.recv_ops.notify(usize::MAX);
99102
self.stream_ops.notify(usize::MAX);
103+
self.closed_ops.notify(usize::MAX);
100104

101105
true
102106
} else {
@@ -136,6 +140,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
136140
send_ops: Event::new(),
137141
recv_ops: Event::new(),
138142
stream_ops: Event::new(),
143+
closed_ops: Event::new(),
139144
sender_count: AtomicUsize::new(1),
140145
receiver_count: AtomicUsize::new(1),
141146
});
@@ -177,6 +182,7 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
177182
send_ops: Event::new(),
178183
recv_ops: Event::new(),
179184
stream_ops: Event::new(),
185+
closed_ops: Event::new(),
180186
sender_count: AtomicUsize::new(1),
181187
receiver_count: AtomicUsize::new(1),
182188
});
@@ -266,6 +272,29 @@ impl<T> Sender<T> {
266272
})
267273
}
268274

275+
/// Completes when all receivers have dropped.
276+
///
277+
/// This allows the producers to get notified when interest in the produced values is canceled and immediately stop doing work.
278+
///
279+
/// # Examples
280+
///
281+
/// ```
282+
/// # futures_lite::future::block_on(async {
283+
/// use async_channel::{unbounded, SendError};
284+
///
285+
/// let (s, r) = unbounded::<i32>();
286+
/// drop(r);
287+
/// s.closed().await;
288+
/// # });
289+
/// ```
290+
pub fn closed(&self) -> Closed<'_, T> {
291+
Closed::_new(ClosedInner {
292+
sender: self,
293+
listener: None,
294+
_pin: PhantomPinned,
295+
})
296+
}
297+
269298
/// Sends a message into this channel using the blocking strategy.
270299
///
271300
/// If the channel is full, this method will block until there is room.
@@ -1288,6 +1317,59 @@ impl<T> EventListenerFuture for RecvInner<'_, T> {
12881317
}
12891318
}
12901319

1320+
easy_wrapper! {
1321+
/// A future returned by [`Sender::closed()`].
1322+
#[derive(Debug)]
1323+
#[must_use = "futures do nothing unless you `.await` or poll them"]
1324+
pub struct Closed<'a, T>(ClosedInner<'a, T> => ());
1325+
#[cfg(all(feature = "std", not(target_family = "wasm")))]
1326+
pub(crate) wait();
1327+
}
1328+
1329+
pin_project! {
1330+
#[derive(Debug)]
1331+
#[project(!Unpin)]
1332+
struct ClosedInner<'a, T> {
1333+
// Reference to the sender.
1334+
sender: &'a Sender<T>,
1335+
1336+
// Listener waiting on the channel.
1337+
listener: Option<EventListener>,
1338+
1339+
// Keeping this type `!Unpin` enables future optimizations.
1340+
#[pin]
1341+
_pin: PhantomPinned
1342+
}
1343+
}
1344+
1345+
impl<'a, T> EventListenerFuture for ClosedInner<'a, T> {
1346+
type Output = ();
1347+
1348+
/// Run this future with the given `Strategy`.
1349+
fn poll_with_strategy<'x, S: Strategy<'x>>(
1350+
self: Pin<&mut Self>,
1351+
strategy: &mut S,
1352+
cx: &mut S::Context,
1353+
) -> Poll<()> {
1354+
let this = self.project();
1355+
1356+
loop {
1357+
// Check if the channel is closed.
1358+
if this.sender.is_closed() {
1359+
return Poll::Ready(());
1360+
}
1361+
1362+
// Not closed - now start listening for notifications or wait for one.
1363+
if this.listener.is_some() {
1364+
// Poll using the given strategy
1365+
ready!(S::poll(strategy, &mut *this.listener, cx));
1366+
} else {
1367+
*this.listener = Some(this.sender.channel.closed_ops.listen());
1368+
}
1369+
}
1370+
}
1371+
}
1372+
12911373
#[cfg(feature = "std")]
12921374
use std::process::abort;
12931375

tests/bounded.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,29 @@ fn send() {
184184
.run();
185185
}
186186

187+
#[cfg(not(target_family = "wasm"))]
188+
#[test]
189+
fn closed() {
190+
let (s, r) = bounded(1);
191+
192+
Parallel::new()
193+
.add(|| {
194+
future::block_on(s.send(7)).unwrap();
195+
let before = s.closed();
196+
let mut before = std::pin::pin!(before);
197+
assert!(future::block_on(future::poll_once(&mut before)).is_none());
198+
sleep(ms(1000));
199+
assert_eq!(future::block_on(future::poll_once(before)), Some(()));
200+
assert_eq!(future::block_on(future::poll_once(s.closed())), Some(()));
201+
})
202+
.add(|| {
203+
assert_eq!(future::block_on(r.recv()), Ok(7));
204+
sleep(ms(500));
205+
drop(r);
206+
})
207+
.run();
208+
}
209+
187210
#[cfg(not(target_family = "wasm"))]
188211
#[test]
189212
fn force_send() {

0 commit comments

Comments
 (0)