Skip to content

Commit 0c4d6a3

Browse files
authored
Add FuturesUnordered::clear (#2415)
1 parent 0273188 commit 0c4d6a3

File tree

3 files changed

+69
-20
lines changed

3 files changed

+69
-20
lines changed

futures-util/src/stream/futures_unordered/mod.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ impl<Fut> FuturesUnordered<Fut> {
236236
(task, len)
237237
}
238238

239-
/// Releases the task. It destorys the future inside and either drops
239+
/// Releases the task. It destroys the future inside and either drops
240240
/// the `Arc<Task>` or transfers ownership to the ready to run queue.
241241
/// The task this method is called on must have been unlinked before.
242242
fn release_task(&mut self, task: Arc<Task<Fut>>) {
@@ -553,19 +553,33 @@ impl<Fut> Debug for FuturesUnordered<Fut> {
553553
}
554554
}
555555

556+
impl<Fut> FuturesUnordered<Fut> {
557+
/// Clears the set, removing all futures.
558+
pub fn clear(&mut self) {
559+
self.clear_head_all();
560+
561+
// we just cleared all the tasks, and we have &mut self, so this is safe.
562+
unsafe { self.ready_to_run_queue.clear() };
563+
564+
self.is_terminated.store(false, Relaxed);
565+
}
566+
567+
fn clear_head_all(&mut self) {
568+
while !self.head_all.get_mut().is_null() {
569+
let head = *self.head_all.get_mut();
570+
let task = unsafe { self.unlink(head) };
571+
self.release_task(task);
572+
}
573+
}
574+
}
575+
556576
impl<Fut> Drop for FuturesUnordered<Fut> {
557577
fn drop(&mut self) {
558578
// When a `FuturesUnordered` is dropped we want to drop all futures
559579
// associated with it. At the same time though there may be tons of
560580
// wakers flying around which contain `Task<Fut>` references
561581
// inside them. We'll let those naturally get deallocated.
562-
unsafe {
563-
while !self.head_all.get_mut().is_null() {
564-
let head = *self.head_all.get_mut();
565-
let task = self.unlink(head);
566-
self.release_task(task);
567-
}
568-
}
582+
self.clear_head_all();
569583

570584
// Note that at this point we could still have a bunch of tasks in the
571585
// ready to run queue. None of those tasks, however, have futures

futures-util/src/stream/futures_unordered/ready_to_run_queue.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -85,25 +85,38 @@ impl<Fut> ReadyToRunQueue<Fut> {
8585
pub(super) fn stub(&self) -> *const Task<Fut> {
8686
&*self.stub
8787
}
88+
89+
// Clear the queue of tasks.
90+
//
91+
// Note that each task has a strong reference count associated with it
92+
// which is owned by the ready to run queue. This method just pulls out
93+
// tasks and drops their refcounts.
94+
//
95+
// # Safety
96+
//
97+
// - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear)
98+
// - The caller **must** guarantee unique access to `self`
99+
pub(crate) unsafe fn clear(&self) {
100+
loop {
101+
// SAFETY: We have the guarantee of mutual exclusion required by `dequeue`.
102+
match self.dequeue() {
103+
Dequeue::Empty => break,
104+
Dequeue::Inconsistent => abort("inconsistent in drop"),
105+
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
106+
}
107+
}
108+
}
88109
}
89110

90111
impl<Fut> Drop for ReadyToRunQueue<Fut> {
91112
fn drop(&mut self) {
92113
// Once we're in the destructor for `Inner<Fut>` we need to clear out
93114
// the ready to run queue of tasks if there's anything left in there.
94-
//
95-
// Note that each task has a strong reference count associated with it
96-
// which is owned by the ready to run queue. All tasks should have had
97-
// their futures dropped already by the `FuturesUnordered` destructor
98-
// above, so we're just pulling out tasks and dropping their refcounts.
115+
116+
// All tasks have had their futures dropped already by the `FuturesUnordered`
117+
// destructor above, and we have &mut self, so this is safe.
99118
unsafe {
100-
loop {
101-
match self.dequeue() {
102-
Dequeue::Empty => break,
103-
Dequeue::Inconsistent => abort("inconsistent in drop"),
104-
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
105-
}
106-
}
119+
self.clear();
107120
}
108121
}
109122
}

futures/tests/stream_futures_unordered.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,3 +345,25 @@ fn polled_only_once_at_most_per_iteration() {
345345
let mut tasks = FuturesUnordered::<F>::new();
346346
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
347347
}
348+
349+
#[test]
350+
fn clear() {
351+
let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);
352+
353+
assert_eq!(block_on(tasks.next()), Some(1));
354+
assert!(!tasks.is_empty());
355+
356+
tasks.clear();
357+
assert!(tasks.is_empty());
358+
359+
tasks.push(future::ready(3));
360+
assert!(!tasks.is_empty());
361+
362+
tasks.clear();
363+
assert!(tasks.is_empty());
364+
365+
assert_eq!(block_on(tasks.next()), None);
366+
assert!(tasks.is_terminated());
367+
tasks.clear();
368+
assert!(!tasks.is_terminated());
369+
}

0 commit comments

Comments
 (0)