From baa8c740cd8003b1af0341df289d20cdbe7dafbd Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 14:40:03 -0500 Subject: [PATCH 01/21] Add MappedFutures --- futures-util/Cargo.toml | 1 + .../futures_unordered/futures_mapped/mod.rs | 23 + .../src/stream/futures_unordered/iter.rs | 28 + .../src/stream/futures_unordered/mod.rs | 539 ++++++++++++++++++ futures-util/src/stream/mod.rs | 3 + 5 files changed, 594 insertions(+) create mode 100644 futures-util/src/stream/futures_unordered/futures_mapped/mod.rs diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index d2876cc7ce..ff995ec0e3 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -48,6 +48,7 @@ spin = { version = "0.10.0", optional = true } futures = { path = "../futures", features = ["async-await", "thread-pool"] } futures-test = { path = "../futures-test" } tokio = "0.1.11" +futures-timer = "3.0.3" [package.metadata.docs.rs] all-features = true diff --git a/futures-util/src/stream/futures_unordered/futures_mapped/mod.rs b/futures-util/src/stream/futures_unordered/futures_mapped/mod.rs new file mode 100644 index 0000000000..9927813b7a --- /dev/null +++ b/futures-util/src/stream/futures_unordered/futures_mapped/mod.rs @@ -0,0 +1,23 @@ +//! An unbounded map of futures. +//! +//! This module is only available when the `std` or `alloc` feature of this +//! library is activated, and it is activated by default. + +use crate::task::AtomicWaker; +use alloc::sync::{Arc, Weak}; +use core::cell::UnsafeCell; +use core::fmt::{self, Debug}; +use core::iter::FromIterator; +use core::marker::PhantomData; +use core::mem; +use core::pin::Pin; +use core::ptr; +use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; +use core::sync::atomic::{AtomicBool, AtomicPtr}; +use futures_core::future::Future; +use futures_core::stream::{FusedStream, Stream}; +use futures_core::task::{Context, Poll}; +use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +use std::collections::{HashMap, HashSet}; + +use super::FuturesUnordered; diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 20248c70fe..686b56d821 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -1,5 +1,6 @@ use super::task::Task; use super::FuturesUnordered; +use core::hash::Hash; use core::marker::PhantomData; use core::pin::Pin; use core::ptr; @@ -13,6 +14,19 @@ pub struct IterPinMut<'a, Fut> { pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, } +/// Mutable iterator over all keys and futures in the map. +// #[derive(Debug)] +// pub struct MapIterPinMut<'a, K, Fut> { +// pub(super) inner: IterMut<'a, K, Fut>, +// // pub(super) task: *const Task, +// // pub(super) len: usize, +// // pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, +// } + +/// Immutable iterator over all the keys and futures in the map. +// #[derive(Debug)] +// pub struct IterMap<'a, K: Hash + Eq, Fut: Unpin>(pub(super) IterPinRef<'a, K, Fut>); + /// Mutable iterator over all futures in the unordered set. #[derive(Debug)] pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); @@ -26,6 +40,20 @@ pub struct IterPinRef<'a, Fut> { pub(super) _marker: PhantomData<&'a FuturesUnordered>, } +/// Immutable iterator over all keys and futures in the map. +// #[derive(Debug)] +// pub struct MapIterPinRef<'a, K, Fut> { +// inner: HashMapIter<'a, K, *const Task>, +// // pub(super) task: *const Task, +// // pub(super) len: usize, +// // pub(super) pending_next_all: *mut Task, +// // pub(super) _marker: PhantomData<&'a FuturesUnordered>, +// } + +/// Immutable iterator over all keys and futures in the map. +// #[derive(Debug)] +// pub struct MapIter<'a, K, Fut>(pub(super) MapIterPinRef<'a, K, Fut>); + /// Immutable iterator over all the futures in the unordered set. #[derive(Debug)] pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 913e260fda..27966256be 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -5,8 +5,10 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; +use core::borrow::Borrow; use core::cell::UnsafeCell; use core::fmt::{self, Debug}; +use core::hash::{Hash, Hasher}; use core::iter::FromIterator; use core::marker::PhantomData; use core::mem; @@ -18,6 +20,8 @@ use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +use std::collections::HashSet; +use std::println; mod abort; @@ -163,6 +167,10 @@ impl FuturesUnordered { /// ensure that [`FuturesUnordered::poll_next`](Stream::poll_next) is called /// in order to receive wake-up notifications for the given future. pub fn push(&self, future: Fut) { + self.push_inner(future); + } + + fn push_inner(&self, future: Fut) -> *const Task { let task = Arc::new(Task { future: UnsafeCell::new(Some(future)), next_all: AtomicPtr::new(self.pending_next_all()), @@ -173,6 +181,7 @@ impl FuturesUnordered { ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), woken: AtomicBool::new(false), }); + println!("Task creation, ref count {:?}", Arc::strong_count(&task)); // Reset the `is_terminated` flag if we've previously marked ourselves // as terminated. @@ -188,6 +197,7 @@ impl FuturesUnordered { // futures are ready. To do that we unconditionally enqueue it for // polling here. self.ready_to_run_queue.enqueue(ptr); + ptr } /// Returns an iterator that allows inspecting each future in the set. @@ -671,3 +681,532 @@ impl Extend for FuturesUnordered { } } } + +/// A map of futures which may complete in any order. +/// +/// This structure is optimized to manage a large number of futures. +/// Futures managed by [`MappedFutures`] will only be polled when they +/// generate wake-up notifications. This reduces the required amount of work +/// needed to poll large numbers of futures. +/// +/// [`MappedFutures`] can be filled by [`collect`](Iterator::collect)ing an +/// iterator of futures into a [`MappedFutures`], or by +/// [`insert`](MappedFutures::insert)ing futures onto an existing +/// [`MappedFutures`]. When new futures are added, +/// [`poll_next`](Stream::poll_next) must be called in order to begin receiving +/// wake-ups for new futures. +/// +/// Note that you can create a ready-made [`MappedFutures`] via the +/// [`collect`](Iterator::collect) method, or you can start with an empty set +/// with the [`MappedFutures::new`] constructor. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct MappedFutures { + // inner: FuturesUnordered>, + task_set: HashSet>>, + futures: FuturesUnordered>, +} + +#[derive(Debug)] +struct HashTask { + inner: *const Task, + key: Arc, +} + +impl Borrow for HashTask> { + fn borrow(&self) -> &K { + // DO NOT CALL THIS + // This is required for the hash set to use a K to get the HashTask. + // let key = &unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().key; + + // panic!(); + // key.as_ref() + &self.key + } +} + +impl PartialEq for HashTask { + fn eq(&self, other: &Self) -> bool { + println!("Comparing eq keys {:?} with {:?}", self.inner, other.inner); + self.inner == other.inner + } +} + +impl Eq for HashTask {} + +impl Hash for HashTask> { + fn hash(&self, state: &mut H) { + let key = unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().key(); + // println!("Comparing hash keys {:?} ", key); + // unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().hash(state) + // unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().hash(state) + key.hash(state) + } +} + +#[derive(Debug)] +struct HashFut { + key: Arc, + future: Fut, +} + +impl HashFut { + fn key(&self) -> &K { + self.key.as_ref() + } +} + +impl PartialEq for HashFut { + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} + +impl Eq for HashFut {} + +impl Hash for HashFut { + fn hash(&self, state: &mut H) { + self.key.hash(state); + } +} + +impl Future for HashFut { + type Output = (Arc, Fut::Output); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = unsafe { Pin::into_inner_unchecked(self) }; + let res = std::task::ready!(unsafe { Pin::new_unchecked(&mut inner.future) }.poll(cx)); + // let key = inner.key; + Poll::Ready((inner.key.clone(), res)) + } +} + +unsafe impl Send for MappedFutures {} +unsafe impl Sync for MappedFutures {} +impl Unpin for MappedFutures {} + +impl Default for MappedFutures { + fn default() -> Self { + Self::new() + } +} + +impl MappedFutures { + /// Constructs a new, empty [`MappedFutures`]. + /// + /// The returned [`MappedFutures`] does not contain any futures. + /// In this state, [`MappedFutures::poll_next`](Stream::poll_next) will + /// return [`Poll::Ready(None)`](Poll::Ready). + pub fn new() -> Self { + Self { task_set: HashSet::new(), futures: FuturesUnordered::new() } + } + + /// Returns the number of futures contained in the set. + /// + /// This represents the total number of in-flight futures. + pub fn len(&self) -> usize { + self.futures.len() + } + + /// Returns `true` if the set contains no futures. + pub fn is_empty(&self) -> bool { + // Relaxed ordering can be used here since we don't need to read from + // the head pointer, only check whether it is null. + self.futures.is_empty() + } + + // /// This is a code stank + // fn set(&mut self) -> &mut HashSet> { + // &mut self.inner.inner.inner + // } + + /// Insert a future into the set. + /// + /// This method adds the given future to the set. This method will not + /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must + /// ensure that [`MappedFutures::poll_next`](Stream::poll_next) is called + /// in order to receive wake-up notifications for the given future. + /// + /// This method will remove and drop a future that is already mapped to the provided key. + /// Returns true if another future was not removed to make room for the provided future. + pub fn insert(&mut self, key: K, future: Fut) -> bool { + let replacing = self.cancel(&key); + let arc_key = Arc::new(key); + let hash_fut = HashFut { key: arc_key.clone(), future }; + let task = self.futures.push_inner(hash_fut); + let arc = unsafe { Arc::from_raw(task) }; + println!("Task inserted, ref count {:?}", Arc::strong_count(&arc)); + Arc::into_raw(arc); + // self.inner.self.hash_set.insert(task.into()); + self.task_set.insert(HashTask { key: arc_key, inner: task }); + !replacing + } + + /// Insert a future into the set and return the displaced future, if there was one. + /// + /// This method adds the given future to the set. This method will not + /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must + /// ensure that [`MappedFutures::poll_next`](Stream::poll_next) is called + /// in order to receive wake-up notifications for the given future. + /// Returns true if another future was ma + pub fn replace(&mut self, key: K, future: Fut) -> Option + where + Fut: Unpin, + { + let replacing = self.remove(&key); + self.insert(key, future); + replacing + } + + /// Remove a future from the set, dropping it. + /// + /// Returns true if a future was cancelled. + pub fn cancel(&mut self, key: &K) -> bool { + // if let Some(task) = self.hash_set.get(key) { + if let Some(task) = self.task_set.take(key) { + unsafe { + let task_arc = Arc::from_raw(task.inner); + if (*task_arc.future.get()).is_some() { + // self.futures.unlink(task.inner); + let unlinked_task = self.futures.unlink(task.inner); + self.futures.release_task(unlinked_task); + Arc::into_raw(task_arc); + return true; + } + Arc::into_raw(task_arc); + } + } + false + } + + /// Remove a future from the set and return it. + pub fn remove(&mut self, key: &K) -> Option + where + Fut: Unpin, + { + // if let Some(task) = self.hash_set.get(key) { + if let Some(task) = self.task_set.take(key) { + unsafe { + let arc_task = Arc::from_raw(task.inner); + let fut = (*arc_task.future.get()).take().unwrap(); + let unlinked_task = self.futures.unlink(task.inner); + self.futures.release_task(unlinked_task); + Arc::into_raw(arc_task); + return Some(fut.future); + } + } + None + } + + /// Returns `true` if the map contains a future for the specified key. + pub fn contains(&mut self, key: &K) -> bool { + self.task_set.contains(key) + } + + /// Get a pinned mutable reference to the mapped future. + pub fn get_pin_mut(&mut self, key: &K) -> Option> { + // if let Some(task_ref) = self.hash_set.get(key) { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + // let mut fut = ; + // if let Some(mut fut) = fut { + if let Some(ref mut fut) = *Arc::from_raw(task_ref.inner).future.get() { + return Some(Pin::new_unchecked(&mut fut.future)); + } + } + } + None + } + + /// Get a pinned mutable reference to the mapped future. + pub fn get_mut(&mut self, key: &K) -> Option<&mut Fut> + where + Fut: Unpin, + { + // if let Some(task_ref) = self.hash_set.get(key) { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + // if let Some(fut) = &mut *task_ref.inner.future.get() { + let task = Arc::from_raw(task_ref.inner); + if let Some(ref mut fut) = *task.future.get() { + println!("Future found"); + Arc::into_raw(task); + return Some(&mut fut.future); + } + println!("Task get_mut, ref count {:?}", Arc::strong_count(&task)); + + Arc::into_raw(task); + } + } + None + } + + /// Get a shared reference to the mapped future. + pub fn get(&mut self, key: &K) -> Option<&Fut> { + // if let Some(task_ref) = self.hash_set.get(key) { + // if let Some(task_ref) = self.set().get(key) { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + // if let Some(fut) = &*task_ref.inner.future.get() { + let task = Arc::from_raw(task_ref.inner); + if let Some(ref mut fut) = *task.future.get() { + return Some(&fut.future); + } + Arc::into_raw(task); + } + } + None + } + + /// Get a pinned shared reference to the mapped future. + pub fn get_pin(&mut self, key: &K) -> Option> { + // if let Some(task_ref) = self.hash_set.get(key) { + // if let Some(task_ref) = self.set().get(key) { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + // if let Some(fut) = &*task_ref.future.get() { + let task = Arc::from_raw(task_ref.inner); + if let Some(ref mut fut) = *task.future.get() { + return Some(Pin::new_unchecked(&fut.future)); + } + Arc::into_raw(task); + } + } + None + } + + /// Returns an iterator of keys in the mapping. + pub fn keys_pin<'a>(self: Pin<&'a Self>) -> KeysPin<'a, K, Fut> { + KeysPin { + // inner: self.hash_set.iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), + // inner: self.set().iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), + // inner: self.set().keys(), + inner: unsafe { self.map_unchecked(|f| &f.futures) }.iter_pin_ref(), + } + } + + /// Returns an iterator of keys in the mapping. + pub fn keys<'a>(&'a self) -> Keys<'a, K, Fut> + where + K: Unpin, + Fut: Unpin, + { + Keys { + // inner: self.hash_set.iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), + // inner: self.set().iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), + // inner: self.set().keys(), + inner: Pin::new(self).keys_pin(), + } + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter(&self) -> MapIter<'_, K, Fut> + where + Fut: Unpin, + K: Unpin, + { + MapIter(Pin::new(self).iter_pin_ref()) + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter_pin_ref(self: Pin<&Self>) -> MapIterPinRef<'_, K, Fut> { + MapIterPinRef { inner: unsafe { self.map_unchecked(|f| &f.futures) }.iter_pin_ref() } + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> MapIterMut<'_, K, Fut> + where + Fut: Unpin, + { + MapIterMut(Pin::new(self).iter_pin_mut()) + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_pin_mut(self: Pin<&mut Self>) -> MapIterPinMut<'_, K, Fut> { + // `head_all` can be accessed directly and we don't need to spin on + // `Task::next_all` since we have exclusive access to the set. + + // IterPinMut { inner: Pin::new(&mut self.inner).iter_pin_mut() } + MapIterPinMut(unsafe { self.map_unchecked_mut(|thing| &mut thing.futures) }.iter_pin_mut()) + // IterPinMut { task, len, _marker: PhantomData } + } +} + +impl Stream for MappedFutures { + type Item = (K, Fut::Output); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match std::task::ready!(Pin::new(&mut self.futures).poll_next(cx)) { + Some(output) => { + let key = output.0; + self.task_set.remove(key.as_ref()); + + println!("Task complete, ref count {:?}", Arc::strong_count(&key)); + Poll::Ready(Some((Arc::into_inner(key).unwrap(), output.1))) + } + None => Poll::Ready(None), + } + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } +} + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct KeysPin<'a, K: Hash + Eq + Unpin, Fut> { + pub(super) inner: IterPinRef<'a, HashFut>, +} + +impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for KeysPin<'a, K, Fut> { + type Item = &'a K; + + fn next(&mut self) -> Option { + Some(&(*self.inner.next().as_ref()?).get_ref().key()) + } +} + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct Keys<'a, K: Hash + Eq + Unpin, Fut: Unpin> { + pub(super) inner: KeysPin<'a, K, Fut>, +} + +impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for Keys<'a, K, Fut> { + type Item = &'a K; + + fn next(&mut self) -> Option { + self.inner.next() + } +} + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct MapIterPinRef<'a, K: Hash + Eq + Unpin, Fut> { + pub(super) inner: IterPinRef<'a, HashFut>, +} + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct MapIterPinMut<'a, K: Hash + Eq + Unpin, Fut>(pub(super) IterPinMut<'a, HashFut>); + +#[derive(Debug)] +pub struct MapIterMut<'a, K: Hash + Eq + Unpin, Fut>(pub(super) MapIterPinMut<'a, K, Fut>); + +#[derive(Debug)] +pub struct MapIter<'a, K: Hash + Eq + Unpin, Fut>(pub(super) MapIterPinRef<'a, K, Fut>); + +impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIterMut<'a, K, Fut> { + type Item = (&'a K, &'a mut Fut); + + fn next(&mut self) -> Option { + let next = self.0.next()?; + Some((&next.0, Pin::into_inner(next.1))) + } +} + +impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinMut<'a, K, Fut> { + type Item = (&'a K, Pin<&'a mut Fut>); + + fn next(&mut self) -> Option { + let next = unsafe { Pin::into_inner_unchecked(self.0.next()?) }; + Some((&next.key.as_ref(), unsafe { Pin::new_unchecked(&mut next.future) })) + } +} + +impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinRef<'a, K, Fut> { + type Item = (&'a K, Pin<&'a Fut>); + + fn next(&mut self) -> Option { + let next = self.inner.next()?; + let fut = unsafe { next.map_unchecked(|f| &f.future) }; + // Some((&next.get_ref().key.as_ref().unwrap(), fut)) + Some((next.get_ref().key(), fut)) + } +} + +impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIter<'a, K, Fut> { + type Item = (&'a K, &'a Fut); + + fn next(&mut self) -> Option { + let next = self.0.next()?; + let key = next.0; + // let fut = Pin::get_ref(next.1.as_ref()); + Some((key, Pin::into_inner(next.1))) + } +} + +#[cfg(test)] +pub mod tests { + use crate::stream::*; + use futures::executor::block_on; + use futures::future::LocalBoxFuture; + use futures_timer::Delay; + use futures_unordered::MappedFutures; + // use futures_util::StreamExt; + // use crate::StreamExt; + use std::boxed::Box; + use std::println; + use std::time::Duration; + + fn insert_millis(futs: &mut MappedFutures, key: u32, millis: u64) { + futs.insert(key, Delay::new(Duration::from_millis(millis))); + } + + fn insert_millis_pinned( + futs: &mut MappedFutures>, + key: u32, + millis: u64, + ) { + futs.insert(key, Box::pin(Delay::new(Duration::from_millis(millis)))); + } + + #[test] + fn mf_map_futures() { + let mut futures: MappedFutures = MappedFutures::new(); + insert_millis(&mut futures, 1, 50); + insert_millis(&mut futures, 2, 75); + insert_millis(&mut futures, 3, 150); + insert_millis(&mut futures, 4, 200); + + assert_eq!(block_on(futures.next()).unwrap().0, 1); + assert_eq!(futures.cancel(&3), true); + assert_eq!(block_on(futures.next()).unwrap().0, 2); + assert_eq!(block_on(futures.next()).unwrap().0, 4); + assert_eq!(block_on(futures.next()), None); + } + + #[test] + fn mf_remove_pinned() { + let mut futures: MappedFutures> = MappedFutures::new(); + insert_millis_pinned(&mut futures, 1, 50); + insert_millis_pinned(&mut futures, 3, 150); + insert_millis_pinned(&mut futures, 4, 200); + + assert_eq!(block_on(futures.next()).unwrap().0, 1); + assert_eq!(block_on(futures.remove(&3).unwrap()), ()); + insert_millis_pinned(&mut futures, 2, 60); + assert_eq!(block_on(futures.next()).unwrap().0, 4); + assert_eq!(block_on(futures.next()).unwrap().0, 2); + assert_eq!(block_on(futures.next()), None); + } + + #[test] + fn mf_mutate() { + let mut futures: MappedFutures = MappedFutures::new(); + insert_millis(&mut futures, 1, 500); + println!("{:?}", futures); + println!("{:?}", futures.get_mut(&1)); + insert_millis(&mut futures, 2, 1000); + insert_millis(&mut futures, 3, 1500); + insert_millis(&mut futures, 4, 2000); + + assert_eq!(block_on(futures.next()).unwrap().0, 1); + futures.get_mut(&3).unwrap().reset(Duration::from_millis(300)); + assert_eq!(block_on(futures.next()).unwrap().0, 3); + assert_eq!(block_on(futures.next()).unwrap().0, 2); + assert_eq!(block_on(futures.next()).unwrap().0, 4); + assert_eq!(block_on(futures.next()), None); + } +} diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 789e1ad221..4b9e086e07 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -115,6 +115,9 @@ mod futures_ordered; #[cfg(feature = "alloc")] pub use self::futures_ordered::FuturesOrdered; +#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] +#[cfg(feature = "alloc")] +pub mod futures_mapped; #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub mod futures_unordered; From d4f33d6b2dc073059abd12652a546082e94a24ea Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 15:23:22 -0500 Subject: [PATCH 02/21] extract MappedFutures to module nested in FuturesUnordered; clean code --- .../futures_unordered/futures_mapped/mod.rs | 23 - .../src/stream/futures_unordered/iter.rs | 1 - .../futures_unordered/mapped_futures/mod.rs | 485 ++++++++++++++++ .../src/stream/futures_unordered/mod.rs | 536 +----------------- futures-util/src/stream/mod.rs | 3 - 5 files changed, 487 insertions(+), 561 deletions(-) delete mode 100644 futures-util/src/stream/futures_unordered/futures_mapped/mod.rs create mode 100644 futures-util/src/stream/futures_unordered/mapped_futures/mod.rs diff --git a/futures-util/src/stream/futures_unordered/futures_mapped/mod.rs b/futures-util/src/stream/futures_unordered/futures_mapped/mod.rs deleted file mode 100644 index 9927813b7a..0000000000 --- a/futures-util/src/stream/futures_unordered/futures_mapped/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -//! An unbounded map of futures. -//! -//! This module is only available when the `std` or `alloc` feature of this -//! library is activated, and it is activated by default. - -use crate::task::AtomicWaker; -use alloc::sync::{Arc, Weak}; -use core::cell::UnsafeCell; -use core::fmt::{self, Debug}; -use core::iter::FromIterator; -use core::marker::PhantomData; -use core::mem; -use core::pin::Pin; -use core::ptr; -use core::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst}; -use core::sync::atomic::{AtomicBool, AtomicPtr}; -use futures_core::future::Future; -use futures_core::stream::{FusedStream, Stream}; -use futures_core::task::{Context, Poll}; -use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; -use std::collections::{HashMap, HashSet}; - -use super::FuturesUnordered; diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 686b56d821..6eb3d6e6cd 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -1,6 +1,5 @@ use super::task::Task; use super::FuturesUnordered; -use core::hash::Hash; use core::marker::PhantomData; use core::pin::Pin; use core::ptr; diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs new file mode 100644 index 0000000000..dde5a36928 --- /dev/null +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -0,0 +1,485 @@ +//! An unbounded map of futures. +//! +//! This module is only available when the `std` or `alloc` feature of this +//! library is activated, and it is activated by default. + +use super::task::Task; +use alloc::sync::Arc; +use core::borrow::Borrow; +use core::fmt::Debug; +use core::hash::{Hash, Hasher}; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll}; +use std::collections::HashSet; + +use super::{FuturesUnordered, IterPinMut, IterPinRef}; + +/// A map of futures which may complete in any order. +/// +/// This structure is optimized to manage a large number of futures. +/// Futures managed by [`MappedFutures`] will only be polled when they +/// generate wake-up notifications. This reduces the required amount of work +/// needed to poll large numbers of futures. +/// +/// [`MappedFutures`] can be filled by [`collect`](Iterator::collect)ing an +/// iterator of futures into a [`MappedFutures`], or by +/// [`insert`](MappedFutures::insert)ing futures onto an existing +/// [`MappedFutures`]. When new futures are added, +/// [`poll_next`](Stream::poll_next) must be called in order to begin receiving +/// wake-ups for new futures. +/// +/// Note that you can create a ready-made [`MappedFutures`] via the +/// [`collect`](Iterator::collect) method, or you can start with an empty set +/// with the [`MappedFutures::new`] constructor. +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct MappedFutures { + task_set: HashSet>>, + futures: FuturesUnordered>, +} + +#[derive(Debug)] +struct HashTask { + inner: *const Task, + key: Arc, +} + +impl Borrow for HashTask> { + fn borrow(&self) -> &K { + &self.key + } +} + +impl PartialEq for HashTask { + fn eq(&self, other: &Self) -> bool { + self.inner == other.inner + } +} + +impl Eq for HashTask {} + +impl Hash for HashTask> { + fn hash(&self, state: &mut H) { + let key = unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().key(); + key.hash(state) + } +} + +#[derive(Debug)] +struct HashFut { + key: Arc, + future: Fut, +} + +impl HashFut { + fn key(&self) -> &K { + self.key.as_ref() + } +} + +impl PartialEq for HashFut { + fn eq(&self, other: &Self) -> bool { + self.key == other.key + } +} + +impl Eq for HashFut {} + +impl Hash for HashFut { + fn hash(&self, state: &mut H) { + self.key.hash(state); + } +} + +impl Future for HashFut { + type Output = (Arc, Fut::Output); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = unsafe { Pin::into_inner_unchecked(self) }; + let res = std::task::ready!(unsafe { Pin::new_unchecked(&mut inner.future) }.poll(cx)); + Poll::Ready((inner.key.clone(), res)) + } +} + +unsafe impl Send for MappedFutures {} +unsafe impl Sync for MappedFutures {} +impl Unpin for MappedFutures {} + +impl Default for MappedFutures { + fn default() -> Self { + Self::new() + } +} + +impl MappedFutures { + /// Constructs a new, empty [`MappedFutures`]. + /// + /// The returned [`MappedFutures`] does not contain any futures. + /// In this state, [`MappedFutures::poll_next`](Stream::poll_next) will + /// return [`Poll::Ready(None)`](Poll::Ready). + pub fn new() -> Self { + Self { task_set: HashSet::new(), futures: FuturesUnordered::new() } + } + + /// Returns the number of futures contained in the set. + /// + /// This represents the total number of in-flight futures. + pub fn len(&self) -> usize { + self.futures.len() + } + + /// Returns `true` if the set contains no futures. + pub fn is_empty(&self) -> bool { + // Relaxed ordering can be used here since we don't need to read from + // the head pointer, only check whether it is null. + self.futures.is_empty() + } + + /// Insert a future into the map. + /// + /// This method adds the given future to the set. This method will not + /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must + /// ensure that [`MappedFutures::poll_next`](Stream::poll_next) is called + /// in order to receive wake-up notifications for the given future. + /// + /// This method will remove and drop a future that is already mapped to the provided key. + /// Returns true if another future was not removed to make room for the provided future. + pub fn insert(&mut self, key: K, future: Fut) -> bool { + let replacing = self.cancel(&key); + let arc_key = Arc::new(key); + let hash_fut = HashFut { key: arc_key.clone(), future }; + let task = self.futures.push_inner(hash_fut); + let arc = unsafe { Arc::from_raw(task) }; + let _ = Arc::into_raw(arc); + self.task_set.insert(HashTask { key: arc_key, inner: task }); + !replacing + } + + /// Insert a future into the set and return the displaced future, if there was one. + /// + /// This method adds the given future to the set. This method will not + /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must + /// ensure that [`MappedFutures::poll_next`](Stream::poll_next) is called + /// in order to receive wake-up notifications for the given future. + /// Returns true if another future was ma + pub fn replace(&mut self, key: K, future: Fut) -> Option + where + Fut: Unpin, + { + let replacing = self.remove(&key); + self.insert(key, future); + replacing + } + + /// Remove a future from the set, dropping it. + /// + /// Returns true if a future was cancelled. + pub fn cancel(&mut self, key: &K) -> bool { + if let Some(task) = self.task_set.take(key) { + unsafe { + let task_arc = Arc::from_raw(task.inner); + if (*task_arc.future.get()).is_some() { + let unlinked_task = self.futures.unlink(task.inner); + self.futures.release_task(unlinked_task); + let _ = Arc::into_raw(task_arc); + return true; + } + let _ = Arc::into_raw(task_arc); + } + } + false + } + + /// Remove a future from the set and return it. + pub fn remove(&mut self, key: &K) -> Option + where + Fut: Unpin, + { + // if let Some(task) = self.hash_set.get(key) { + if let Some(task) = self.task_set.take(key) { + unsafe { + let arc_task = Arc::from_raw(task.inner); + let fut = (*arc_task.future.get()).take().unwrap(); + let unlinked_task = self.futures.unlink(task.inner); + self.futures.release_task(unlinked_task); + let _ = Arc::into_raw(arc_task); + return Some(fut.future); + } + } + None + } + + /// Returns `true` if the map contains a future for the specified key. + pub fn contains(&mut self, key: &K) -> bool { + self.task_set.contains(key) + } + + /// Get a pinned mutable reference to the mapped future. + pub fn get_pin_mut(&mut self, key: &K) -> Option> { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + if let Some(ref mut fut) = *Arc::from_raw(task_ref.inner).future.get() { + return Some(Pin::new_unchecked(&mut fut.future)); + } + } + } + None + } + + /// Get a pinned mutable reference to the mapped future. + pub fn get_mut(&mut self, key: &K) -> Option<&mut Fut> + where + Fut: Unpin, + { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + let task = Arc::from_raw(task_ref.inner); + if let Some(ref mut fut) = *task.future.get() { + let _ = Arc::into_raw(task); + return Some(&mut fut.future); + } + + let _ = Arc::into_raw(task); + } + } + None + } + + /// Get a shared reference to the mapped future. + pub fn get(&mut self, key: &K) -> Option<&Fut> { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + let task = Arc::from_raw(task_ref.inner); + if let Some(ref mut fut) = *task.future.get() { + return Some(&fut.future); + } + let _ = Arc::into_raw(task); + } + } + None + } + + /// Get a pinned shared reference to the mapped future. + pub fn get_pin(&mut self, key: &K) -> Option> { + if let Some(task_ref) = self.task_set.get(key) { + unsafe { + let task = Arc::from_raw(task_ref.inner); + if let Some(ref mut fut) = *task.future.get() { + return Some(Pin::new_unchecked(&fut.future)); + } + let _ = Arc::into_raw(task); + } + } + None + } + + /// Returns an iterator of keys in the mapping. + pub fn keys_pin<'a>(self: Pin<&'a Self>) -> KeysPin<'a, K, Fut> { + KeysPin(unsafe { self.map_unchecked(|f| &f.futures) }.iter_pin_ref()) + } + + /// Returns an iterator of keys in the mapping. + pub fn keys<'a>(&'a self) -> Keys<'a, K, Fut> + where + K: Unpin, + Fut: Unpin, + { + Keys(Pin::new(self).keys_pin()) + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter(&self) -> MapIter<'_, K, Fut> + where + Fut: Unpin, + K: Unpin, + { + MapIter(Pin::new(self).iter_pin_ref()) + } + + /// Returns an iterator that allows inspecting each future in the set. + pub fn iter_pin_ref(self: Pin<&Self>) -> MapIterPinRef<'_, K, Fut> { + MapIterPinRef(unsafe { self.map_unchecked(|f| &f.futures) }.iter_pin_ref()) + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> MapIterMut<'_, K, Fut> + where + Fut: Unpin, + { + MapIterMut(Pin::new(self).iter_pin_mut()) + } + + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_pin_mut(self: Pin<&mut Self>) -> MapIterPinMut<'_, K, Fut> { + MapIterPinMut(unsafe { self.map_unchecked_mut(|thing| &mut thing.futures) }.iter_pin_mut()) + } +} + +impl Stream for MappedFutures { + type Item = (K, Fut::Output); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match std::task::ready!(Pin::new(&mut self.futures).poll_next(cx)) { + Some(output) => { + let key = output.0; + self.task_set.remove(key.as_ref()); + Poll::Ready(Some((Arc::into_inner(key).unwrap(), output.1))) + } + None => Poll::Ready(None), + } + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } +} + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct KeysPin<'a, K: Hash + Eq + Unpin, Fut>(IterPinRef<'a, HashFut>); + +impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for KeysPin<'a, K, Fut> { + type Item = &'a K; + + fn next(&mut self) -> Option { + Some(&(*self.0.next().as_ref()?).get_ref().key()) + } +} + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct Keys<'a, K: Hash + Eq + Unpin, Fut: Unpin>(KeysPin<'a, K, Fut>); + +impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for Keys<'a, K, Fut> { + type Item = &'a K; + + fn next(&mut self) -> Option { + self.0.next() + } +} + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct MapIterPinRef<'a, K: Hash + Eq + Unpin, Fut>(IterPinRef<'a, HashFut>); + +/// Immutable iterator over all keys in the mapping. +#[derive(Debug)] +pub struct MapIterPinMut<'a, K: Hash + Eq + Unpin, Fut>(IterPinMut<'a, HashFut>); + +/// Mutable iterator over all keys and futures in the map. +#[derive(Debug)] +pub struct MapIterMut<'a, K: Hash + Eq + Unpin, Fut>(MapIterPinMut<'a, K, Fut>); + +/// Immutable iterator over all the keys and futures in the map. +#[derive(Debug)] +pub struct MapIter<'a, K: Hash + Eq + Unpin, Fut>(MapIterPinRef<'a, K, Fut>); + +impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIterMut<'a, K, Fut> { + type Item = (&'a K, &'a mut Fut); + + fn next(&mut self) -> Option { + let next = self.0.next()?; + Some((&next.0, Pin::into_inner(next.1))) + } +} + +impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinMut<'a, K, Fut> { + type Item = (&'a K, Pin<&'a mut Fut>); + + fn next(&mut self) -> Option { + let next = unsafe { Pin::into_inner_unchecked(self.0.next()?) }; + Some((&next.key.as_ref(), unsafe { Pin::new_unchecked(&mut next.future) })) + } +} + +impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinRef<'a, K, Fut> { + type Item = (&'a K, Pin<&'a Fut>); + + fn next(&mut self) -> Option { + let next = self.0.next()?; + let fut = unsafe { next.map_unchecked(|f| &f.future) }; + Some((next.get_ref().key(), fut)) + } +} + +impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIter<'a, K, Fut> { + type Item = (&'a K, &'a Fut); + + fn next(&mut self) -> Option { + let next = self.0.next()?; + let key = next.0; + Some((key, Pin::into_inner(next.1))) + } +} + +/// Tests for MappedFutures +#[cfg(test)] +pub mod tests { + use crate::stream::*; + use futures::executor::block_on; + use futures::future::LocalBoxFuture; + use futures_timer::Delay; + use futures_unordered::mapped_futures::MappedFutures; + use std::boxed::Box; + use std::time::Duration; + + fn insert_millis(futs: &mut MappedFutures, key: u32, millis: u64) { + futs.insert(key, Delay::new(Duration::from_millis(millis))); + } + + fn insert_millis_pinned( + futs: &mut MappedFutures>, + key: u32, + millis: u64, + ) { + futs.insert(key, Box::pin(Delay::new(Duration::from_millis(millis)))); + } + + #[test] + fn mf_map_futures() { + let mut futures: MappedFutures = MappedFutures::new(); + insert_millis(&mut futures, 1, 50); + insert_millis(&mut futures, 2, 75); + insert_millis(&mut futures, 3, 150); + insert_millis(&mut futures, 4, 200); + + assert_eq!(block_on(futures.next()).unwrap().0, 1); + assert_eq!(futures.cancel(&3), true); + assert_eq!(block_on(futures.next()).unwrap().0, 2); + assert_eq!(block_on(futures.next()).unwrap().0, 4); + assert_eq!(block_on(futures.next()), None); + } + + #[test] + fn mf_remove_pinned() { + let mut futures: MappedFutures> = MappedFutures::new(); + insert_millis_pinned(&mut futures, 1, 50); + insert_millis_pinned(&mut futures, 3, 150); + insert_millis_pinned(&mut futures, 4, 200); + + assert_eq!(block_on(futures.next()).unwrap().0, 1); + assert_eq!(block_on(futures.remove(&3).unwrap()), ()); + insert_millis_pinned(&mut futures, 2, 60); + assert_eq!(block_on(futures.next()).unwrap().0, 4); + assert_eq!(block_on(futures.next()).unwrap().0, 2); + assert_eq!(block_on(futures.next()), None); + } + + #[test] + fn mf_mutate() { + let mut futures: MappedFutures = MappedFutures::new(); + insert_millis(&mut futures, 1, 500); + insert_millis(&mut futures, 2, 1000); + insert_millis(&mut futures, 3, 1500); + insert_millis(&mut futures, 4, 2000); + + assert_eq!(block_on(futures.next()).unwrap().0, 1); + futures.get_mut(&3).unwrap().reset(Duration::from_millis(300)); + assert_eq!(block_on(futures.next()).unwrap().0, 3); + assert_eq!(block_on(futures.next()).unwrap().0, 2); + assert_eq!(block_on(futures.next()).unwrap().0, 4); + assert_eq!(block_on(futures.next()), None); + } +} diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 27966256be..f8efedbc78 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -5,10 +5,8 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; -use core::borrow::Borrow; use core::cell::UnsafeCell; use core::fmt::{self, Debug}; -use core::hash::{Hash, Hasher}; use core::iter::FromIterator; use core::marker::PhantomData; use core::mem; @@ -20,8 +18,8 @@ use futures_core::future::Future; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; -use std::collections::HashSet; -use std::println; + +pub mod mapped_futures; mod abort; @@ -181,7 +179,6 @@ impl FuturesUnordered { ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), woken: AtomicBool::new(false), }); - println!("Task creation, ref count {:?}", Arc::strong_count(&task)); // Reset the `is_terminated` flag if we've previously marked ourselves // as terminated. @@ -681,532 +678,3 @@ impl Extend for FuturesUnordered { } } } - -/// A map of futures which may complete in any order. -/// -/// This structure is optimized to manage a large number of futures. -/// Futures managed by [`MappedFutures`] will only be polled when they -/// generate wake-up notifications. This reduces the required amount of work -/// needed to poll large numbers of futures. -/// -/// [`MappedFutures`] can be filled by [`collect`](Iterator::collect)ing an -/// iterator of futures into a [`MappedFutures`], or by -/// [`insert`](MappedFutures::insert)ing futures onto an existing -/// [`MappedFutures`]. When new futures are added, -/// [`poll_next`](Stream::poll_next) must be called in order to begin receiving -/// wake-ups for new futures. -/// -/// Note that you can create a ready-made [`MappedFutures`] via the -/// [`collect`](Iterator::collect) method, or you can start with an empty set -/// with the [`MappedFutures::new`] constructor. -#[derive(Debug)] -#[must_use = "streams do nothing unless polled"] -pub struct MappedFutures { - // inner: FuturesUnordered>, - task_set: HashSet>>, - futures: FuturesUnordered>, -} - -#[derive(Debug)] -struct HashTask { - inner: *const Task, - key: Arc, -} - -impl Borrow for HashTask> { - fn borrow(&self) -> &K { - // DO NOT CALL THIS - // This is required for the hash set to use a K to get the HashTask. - // let key = &unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().key; - - // panic!(); - // key.as_ref() - &self.key - } -} - -impl PartialEq for HashTask { - fn eq(&self, other: &Self) -> bool { - println!("Comparing eq keys {:?} with {:?}", self.inner, other.inner); - self.inner == other.inner - } -} - -impl Eq for HashTask {} - -impl Hash for HashTask> { - fn hash(&self, state: &mut H) { - let key = unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().key(); - // println!("Comparing hash keys {:?} ", key); - // unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().hash(state) - // unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().hash(state) - key.hash(state) - } -} - -#[derive(Debug)] -struct HashFut { - key: Arc, - future: Fut, -} - -impl HashFut { - fn key(&self) -> &K { - self.key.as_ref() - } -} - -impl PartialEq for HashFut { - fn eq(&self, other: &Self) -> bool { - self.key == other.key - } -} - -impl Eq for HashFut {} - -impl Hash for HashFut { - fn hash(&self, state: &mut H) { - self.key.hash(state); - } -} - -impl Future for HashFut { - type Output = (Arc, Fut::Output); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let inner = unsafe { Pin::into_inner_unchecked(self) }; - let res = std::task::ready!(unsafe { Pin::new_unchecked(&mut inner.future) }.poll(cx)); - // let key = inner.key; - Poll::Ready((inner.key.clone(), res)) - } -} - -unsafe impl Send for MappedFutures {} -unsafe impl Sync for MappedFutures {} -impl Unpin for MappedFutures {} - -impl Default for MappedFutures { - fn default() -> Self { - Self::new() - } -} - -impl MappedFutures { - /// Constructs a new, empty [`MappedFutures`]. - /// - /// The returned [`MappedFutures`] does not contain any futures. - /// In this state, [`MappedFutures::poll_next`](Stream::poll_next) will - /// return [`Poll::Ready(None)`](Poll::Ready). - pub fn new() -> Self { - Self { task_set: HashSet::new(), futures: FuturesUnordered::new() } - } - - /// Returns the number of futures contained in the set. - /// - /// This represents the total number of in-flight futures. - pub fn len(&self) -> usize { - self.futures.len() - } - - /// Returns `true` if the set contains no futures. - pub fn is_empty(&self) -> bool { - // Relaxed ordering can be used here since we don't need to read from - // the head pointer, only check whether it is null. - self.futures.is_empty() - } - - // /// This is a code stank - // fn set(&mut self) -> &mut HashSet> { - // &mut self.inner.inner.inner - // } - - /// Insert a future into the set. - /// - /// This method adds the given future to the set. This method will not - /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must - /// ensure that [`MappedFutures::poll_next`](Stream::poll_next) is called - /// in order to receive wake-up notifications for the given future. - /// - /// This method will remove and drop a future that is already mapped to the provided key. - /// Returns true if another future was not removed to make room for the provided future. - pub fn insert(&mut self, key: K, future: Fut) -> bool { - let replacing = self.cancel(&key); - let arc_key = Arc::new(key); - let hash_fut = HashFut { key: arc_key.clone(), future }; - let task = self.futures.push_inner(hash_fut); - let arc = unsafe { Arc::from_raw(task) }; - println!("Task inserted, ref count {:?}", Arc::strong_count(&arc)); - Arc::into_raw(arc); - // self.inner.self.hash_set.insert(task.into()); - self.task_set.insert(HashTask { key: arc_key, inner: task }); - !replacing - } - - /// Insert a future into the set and return the displaced future, if there was one. - /// - /// This method adds the given future to the set. This method will not - /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must - /// ensure that [`MappedFutures::poll_next`](Stream::poll_next) is called - /// in order to receive wake-up notifications for the given future. - /// Returns true if another future was ma - pub fn replace(&mut self, key: K, future: Fut) -> Option - where - Fut: Unpin, - { - let replacing = self.remove(&key); - self.insert(key, future); - replacing - } - - /// Remove a future from the set, dropping it. - /// - /// Returns true if a future was cancelled. - pub fn cancel(&mut self, key: &K) -> bool { - // if let Some(task) = self.hash_set.get(key) { - if let Some(task) = self.task_set.take(key) { - unsafe { - let task_arc = Arc::from_raw(task.inner); - if (*task_arc.future.get()).is_some() { - // self.futures.unlink(task.inner); - let unlinked_task = self.futures.unlink(task.inner); - self.futures.release_task(unlinked_task); - Arc::into_raw(task_arc); - return true; - } - Arc::into_raw(task_arc); - } - } - false - } - - /// Remove a future from the set and return it. - pub fn remove(&mut self, key: &K) -> Option - where - Fut: Unpin, - { - // if let Some(task) = self.hash_set.get(key) { - if let Some(task) = self.task_set.take(key) { - unsafe { - let arc_task = Arc::from_raw(task.inner); - let fut = (*arc_task.future.get()).take().unwrap(); - let unlinked_task = self.futures.unlink(task.inner); - self.futures.release_task(unlinked_task); - Arc::into_raw(arc_task); - return Some(fut.future); - } - } - None - } - - /// Returns `true` if the map contains a future for the specified key. - pub fn contains(&mut self, key: &K) -> bool { - self.task_set.contains(key) - } - - /// Get a pinned mutable reference to the mapped future. - pub fn get_pin_mut(&mut self, key: &K) -> Option> { - // if let Some(task_ref) = self.hash_set.get(key) { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - // let mut fut = ; - // if let Some(mut fut) = fut { - if let Some(ref mut fut) = *Arc::from_raw(task_ref.inner).future.get() { - return Some(Pin::new_unchecked(&mut fut.future)); - } - } - } - None - } - - /// Get a pinned mutable reference to the mapped future. - pub fn get_mut(&mut self, key: &K) -> Option<&mut Fut> - where - Fut: Unpin, - { - // if let Some(task_ref) = self.hash_set.get(key) { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - // if let Some(fut) = &mut *task_ref.inner.future.get() { - let task = Arc::from_raw(task_ref.inner); - if let Some(ref mut fut) = *task.future.get() { - println!("Future found"); - Arc::into_raw(task); - return Some(&mut fut.future); - } - println!("Task get_mut, ref count {:?}", Arc::strong_count(&task)); - - Arc::into_raw(task); - } - } - None - } - - /// Get a shared reference to the mapped future. - pub fn get(&mut self, key: &K) -> Option<&Fut> { - // if let Some(task_ref) = self.hash_set.get(key) { - // if let Some(task_ref) = self.set().get(key) { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - // if let Some(fut) = &*task_ref.inner.future.get() { - let task = Arc::from_raw(task_ref.inner); - if let Some(ref mut fut) = *task.future.get() { - return Some(&fut.future); - } - Arc::into_raw(task); - } - } - None - } - - /// Get a pinned shared reference to the mapped future. - pub fn get_pin(&mut self, key: &K) -> Option> { - // if let Some(task_ref) = self.hash_set.get(key) { - // if let Some(task_ref) = self.set().get(key) { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - // if let Some(fut) = &*task_ref.future.get() { - let task = Arc::from_raw(task_ref.inner); - if let Some(ref mut fut) = *task.future.get() { - return Some(Pin::new_unchecked(&fut.future)); - } - Arc::into_raw(task); - } - } - None - } - - /// Returns an iterator of keys in the mapping. - pub fn keys_pin<'a>(self: Pin<&'a Self>) -> KeysPin<'a, K, Fut> { - KeysPin { - // inner: self.hash_set.iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), - // inner: self.set().iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), - // inner: self.set().keys(), - inner: unsafe { self.map_unchecked(|f| &f.futures) }.iter_pin_ref(), - } - } - - /// Returns an iterator of keys in the mapping. - pub fn keys<'a>(&'a self) -> Keys<'a, K, Fut> - where - K: Unpin, - Fut: Unpin, - { - Keys { - // inner: self.hash_set.iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), - // inner: self.set().iter().map(Box::new(|hash_task| HashTask::key_unwrap(hash_task))), - // inner: self.set().keys(), - inner: Pin::new(self).keys_pin(), - } - } - - /// Returns an iterator that allows inspecting each future in the set. - pub fn iter(&self) -> MapIter<'_, K, Fut> - where - Fut: Unpin, - K: Unpin, - { - MapIter(Pin::new(self).iter_pin_ref()) - } - - /// Returns an iterator that allows inspecting each future in the set. - pub fn iter_pin_ref(self: Pin<&Self>) -> MapIterPinRef<'_, K, Fut> { - MapIterPinRef { inner: unsafe { self.map_unchecked(|f| &f.futures) }.iter_pin_ref() } - } - - /// Returns an iterator that allows modifying each future in the set. - pub fn iter_mut(&mut self) -> MapIterMut<'_, K, Fut> - where - Fut: Unpin, - { - MapIterMut(Pin::new(self).iter_pin_mut()) - } - - /// Returns an iterator that allows modifying each future in the set. - pub fn iter_pin_mut(self: Pin<&mut Self>) -> MapIterPinMut<'_, K, Fut> { - // `head_all` can be accessed directly and we don't need to spin on - // `Task::next_all` since we have exclusive access to the set. - - // IterPinMut { inner: Pin::new(&mut self.inner).iter_pin_mut() } - MapIterPinMut(unsafe { self.map_unchecked_mut(|thing| &mut thing.futures) }.iter_pin_mut()) - // IterPinMut { task, len, _marker: PhantomData } - } -} - -impl Stream for MappedFutures { - type Item = (K, Fut::Output); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match std::task::ready!(Pin::new(&mut self.futures).poll_next(cx)) { - Some(output) => { - let key = output.0; - self.task_set.remove(key.as_ref()); - - println!("Task complete, ref count {:?}", Arc::strong_count(&key)); - Poll::Ready(Some((Arc::into_inner(key).unwrap(), output.1))) - } - None => Poll::Ready(None), - } - } - - fn size_hint(&self) -> (usize, Option) { - let len = self.len(); - (len, Some(len)) - } -} - -/// Immutable iterator over all keys in the mapping. -#[derive(Debug)] -pub struct KeysPin<'a, K: Hash + Eq + Unpin, Fut> { - pub(super) inner: IterPinRef<'a, HashFut>, -} - -impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for KeysPin<'a, K, Fut> { - type Item = &'a K; - - fn next(&mut self) -> Option { - Some(&(*self.inner.next().as_ref()?).get_ref().key()) - } -} - -/// Immutable iterator over all keys in the mapping. -#[derive(Debug)] -pub struct Keys<'a, K: Hash + Eq + Unpin, Fut: Unpin> { - pub(super) inner: KeysPin<'a, K, Fut>, -} - -impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for Keys<'a, K, Fut> { - type Item = &'a K; - - fn next(&mut self) -> Option { - self.inner.next() - } -} - -/// Immutable iterator over all keys in the mapping. -#[derive(Debug)] -pub struct MapIterPinRef<'a, K: Hash + Eq + Unpin, Fut> { - pub(super) inner: IterPinRef<'a, HashFut>, -} - -/// Immutable iterator over all keys in the mapping. -#[derive(Debug)] -pub struct MapIterPinMut<'a, K: Hash + Eq + Unpin, Fut>(pub(super) IterPinMut<'a, HashFut>); - -#[derive(Debug)] -pub struct MapIterMut<'a, K: Hash + Eq + Unpin, Fut>(pub(super) MapIterPinMut<'a, K, Fut>); - -#[derive(Debug)] -pub struct MapIter<'a, K: Hash + Eq + Unpin, Fut>(pub(super) MapIterPinRef<'a, K, Fut>); - -impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIterMut<'a, K, Fut> { - type Item = (&'a K, &'a mut Fut); - - fn next(&mut self) -> Option { - let next = self.0.next()?; - Some((&next.0, Pin::into_inner(next.1))) - } -} - -impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinMut<'a, K, Fut> { - type Item = (&'a K, Pin<&'a mut Fut>); - - fn next(&mut self) -> Option { - let next = unsafe { Pin::into_inner_unchecked(self.0.next()?) }; - Some((&next.key.as_ref(), unsafe { Pin::new_unchecked(&mut next.future) })) - } -} - -impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinRef<'a, K, Fut> { - type Item = (&'a K, Pin<&'a Fut>); - - fn next(&mut self) -> Option { - let next = self.inner.next()?; - let fut = unsafe { next.map_unchecked(|f| &f.future) }; - // Some((&next.get_ref().key.as_ref().unwrap(), fut)) - Some((next.get_ref().key(), fut)) - } -} - -impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIter<'a, K, Fut> { - type Item = (&'a K, &'a Fut); - - fn next(&mut self) -> Option { - let next = self.0.next()?; - let key = next.0; - // let fut = Pin::get_ref(next.1.as_ref()); - Some((key, Pin::into_inner(next.1))) - } -} - -#[cfg(test)] -pub mod tests { - use crate::stream::*; - use futures::executor::block_on; - use futures::future::LocalBoxFuture; - use futures_timer::Delay; - use futures_unordered::MappedFutures; - // use futures_util::StreamExt; - // use crate::StreamExt; - use std::boxed::Box; - use std::println; - use std::time::Duration; - - fn insert_millis(futs: &mut MappedFutures, key: u32, millis: u64) { - futs.insert(key, Delay::new(Duration::from_millis(millis))); - } - - fn insert_millis_pinned( - futs: &mut MappedFutures>, - key: u32, - millis: u64, - ) { - futs.insert(key, Box::pin(Delay::new(Duration::from_millis(millis)))); - } - - #[test] - fn mf_map_futures() { - let mut futures: MappedFutures = MappedFutures::new(); - insert_millis(&mut futures, 1, 50); - insert_millis(&mut futures, 2, 75); - insert_millis(&mut futures, 3, 150); - insert_millis(&mut futures, 4, 200); - - assert_eq!(block_on(futures.next()).unwrap().0, 1); - assert_eq!(futures.cancel(&3), true); - assert_eq!(block_on(futures.next()).unwrap().0, 2); - assert_eq!(block_on(futures.next()).unwrap().0, 4); - assert_eq!(block_on(futures.next()), None); - } - - #[test] - fn mf_remove_pinned() { - let mut futures: MappedFutures> = MappedFutures::new(); - insert_millis_pinned(&mut futures, 1, 50); - insert_millis_pinned(&mut futures, 3, 150); - insert_millis_pinned(&mut futures, 4, 200); - - assert_eq!(block_on(futures.next()).unwrap().0, 1); - assert_eq!(block_on(futures.remove(&3).unwrap()), ()); - insert_millis_pinned(&mut futures, 2, 60); - assert_eq!(block_on(futures.next()).unwrap().0, 4); - assert_eq!(block_on(futures.next()).unwrap().0, 2); - assert_eq!(block_on(futures.next()), None); - } - - #[test] - fn mf_mutate() { - let mut futures: MappedFutures = MappedFutures::new(); - insert_millis(&mut futures, 1, 500); - println!("{:?}", futures); - println!("{:?}", futures.get_mut(&1)); - insert_millis(&mut futures, 2, 1000); - insert_millis(&mut futures, 3, 1500); - insert_millis(&mut futures, 4, 2000); - - assert_eq!(block_on(futures.next()).unwrap().0, 1); - futures.get_mut(&3).unwrap().reset(Duration::from_millis(300)); - assert_eq!(block_on(futures.next()).unwrap().0, 3); - assert_eq!(block_on(futures.next()).unwrap().0, 2); - assert_eq!(block_on(futures.next()).unwrap().0, 4); - assert_eq!(block_on(futures.next()), None); - } -} diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 4b9e086e07..789e1ad221 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -115,9 +115,6 @@ mod futures_ordered; #[cfg(feature = "alloc")] pub use self::futures_ordered::FuturesOrdered; -#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] -#[cfg(feature = "alloc")] -pub mod futures_mapped; #[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))] #[cfg(feature = "alloc")] pub mod futures_unordered; From 9db37c6b05b1560cdadaa1de7729ebca7ec65ea0 Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 16:06:14 -0500 Subject: [PATCH 03/21] resolve lints and add std feature requirement for MappedFutures --- .../src/stream/futures_unordered/iter.rs | 27 ------------------- .../futures_unordered/mapped_futures/mod.rs | 2 +- .../src/stream/futures_unordered/mod.rs | 1 + 3 files changed, 2 insertions(+), 28 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/iter.rs b/futures-util/src/stream/futures_unordered/iter.rs index 6eb3d6e6cd..20248c70fe 100644 --- a/futures-util/src/stream/futures_unordered/iter.rs +++ b/futures-util/src/stream/futures_unordered/iter.rs @@ -13,19 +13,6 @@ pub struct IterPinMut<'a, Fut> { pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, } -/// Mutable iterator over all keys and futures in the map. -// #[derive(Debug)] -// pub struct MapIterPinMut<'a, K, Fut> { -// pub(super) inner: IterMut<'a, K, Fut>, -// // pub(super) task: *const Task, -// // pub(super) len: usize, -// // pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, -// } - -/// Immutable iterator over all the keys and futures in the map. -// #[derive(Debug)] -// pub struct IterMap<'a, K: Hash + Eq, Fut: Unpin>(pub(super) IterPinRef<'a, K, Fut>); - /// Mutable iterator over all futures in the unordered set. #[derive(Debug)] pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); @@ -39,20 +26,6 @@ pub struct IterPinRef<'a, Fut> { pub(super) _marker: PhantomData<&'a FuturesUnordered>, } -/// Immutable iterator over all keys and futures in the map. -// #[derive(Debug)] -// pub struct MapIterPinRef<'a, K, Fut> { -// inner: HashMapIter<'a, K, *const Task>, -// // pub(super) task: *const Task, -// // pub(super) len: usize, -// // pub(super) pending_next_all: *mut Task, -// // pub(super) _marker: PhantomData<&'a FuturesUnordered>, -// } - -/// Immutable iterator over all keys and futures in the map. -// #[derive(Debug)] -// pub struct MapIter<'a, K, Fut>(pub(super) MapIterPinRef<'a, K, Fut>); - /// Immutable iterator over all the futures in the unordered set. #[derive(Debug)] pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index dde5a36928..fb8a0128a6 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -280,7 +280,7 @@ impl MappedFutures { } /// Returns an iterator of keys in the mapping. - pub fn keys<'a>(&'a self) -> Keys<'a, K, Fut> + pub fn keys(&self) -> Keys<'_, K, Fut> where K: Unpin, Fut: Unpin, diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index f8efedbc78..6a48adc6bd 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -19,6 +19,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +#[cfg(feature = "std")] pub mod mapped_futures; mod abort; From fbe6f207de4b46acdb4c58dffe64304600620e82 Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 16:28:57 -0500 Subject: [PATCH 04/21] fix lint; remove unstable ready! macro; remove 1.70 Arc::into_inner --- .../futures_unordered/mapped_futures/mod.rs | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index fb8a0128a6..9f9f5f9316 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -97,8 +97,10 @@ impl Future for HashFut { type Output = (Arc, Fut::Output); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let inner = unsafe { Pin::into_inner_unchecked(self) }; - let res = std::task::ready!(unsafe { Pin::new_unchecked(&mut inner.future) }.poll(cx)); - Poll::Ready((inner.key.clone(), res)) + match unsafe { Pin::new_unchecked(&mut inner.future) }.poll(cx) { + Poll::Ready(res) => Poll::Ready((inner.key.clone(), res)), + Poll::Pending => Poll::Pending, + } } } @@ -320,13 +322,20 @@ impl Stream for MappedFutures { type Item = (K, Fut::Output); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match std::task::ready!(Pin::new(&mut self.futures).poll_next(cx)) { - Some(output) => { + match Pin::new(&mut self.futures).poll_next(cx) { + Poll::Ready(Some(output)) => { let key = output.0; self.task_set.remove(key.as_ref()); - Poll::Ready(Some((Arc::into_inner(key).unwrap(), output.1))) + // Arc::into_inner() only available in >=1.70.0 + // Poll::Ready(Some((Arc::into_inner(key).unwrap(), output.1))) + // + // Arc::try_unwrap() is acceptable because keys are only kept 1) in the HashSet, + // and 2) in the HashFut. The complete future has already been dropped here, + // so the remaining Arc will always have a strong ref count of 1 + Poll::Ready(Some((Arc::try_unwrap(key).ok().unwrap(), output.1))) } - None => Poll::Ready(None), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } } @@ -446,7 +455,7 @@ pub mod tests { insert_millis(&mut futures, 4, 200); assert_eq!(block_on(futures.next()).unwrap().0, 1); - assert_eq!(futures.cancel(&3), true); + assert!(futures.cancel(&3)); assert_eq!(block_on(futures.next()).unwrap().0, 2); assert_eq!(block_on(futures.next()).unwrap().0, 4); assert_eq!(block_on(futures.next()), None); From 344b41f658fd69d612bdf0e53e115495cc5df44f Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 16:33:45 -0500 Subject: [PATCH 05/21] fix lint --- futures-util/src/stream/futures_unordered/mapped_futures/mod.rs | 2 +- futures-util/src/stream/futures_unordered/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 9f9f5f9316..26c4c34f70 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -469,7 +469,7 @@ pub mod tests { insert_millis_pinned(&mut futures, 4, 200); assert_eq!(block_on(futures.next()).unwrap().0, 1); - assert_eq!(block_on(futures.remove(&3).unwrap()), ()); + block_on(futures.remove(&3).unwrap()); insert_millis_pinned(&mut futures, 2, 60); assert_eq!(block_on(futures.next()).unwrap().0, 4); assert_eq!(block_on(futures.next()).unwrap().0, 2); diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 6a48adc6bd..cfbf57bf3d 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -19,6 +19,7 @@ use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +// mapped_futures uses std::collections::HashSet #[cfg(feature = "std")] pub mod mapped_futures; From 96bf4da6e7a097a6573c4efbffbc09f44f9b747a Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 17:59:44 -0500 Subject: [PATCH 06/21] fix unsoundness; switch futures_timer to tokio::time; add additional newer tokio depenedency --- futures-util/Cargo.toml | 4 + .../futures_unordered/mapped_futures/mod.rs | 80 ++++++++++++------- 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index ff995ec0e3..6104824815 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -48,7 +48,11 @@ spin = { version = "0.10.0", optional = true } futures = { path = "../futures", features = ["async-await", "thread-pool"] } futures-test = { path = "../futures-test" } tokio = "0.1.11" +# MappedFutures requires tokio::time::sleep, which is unavailable in the above tokio version +tokio_new = {package = "tokio", git = "https://github.com/tokio-rs/tokio", version = "1.44.1", features = ["time", "rt", "macros"]} + futures-timer = "3.0.3" +futures-time = "3.0.0" [package.metadata.docs.rs] all-features = true diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 26c4c34f70..5e42b3931f 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -221,9 +221,12 @@ impl MappedFutures { pub fn get_pin_mut(&mut self, key: &K) -> Option> { if let Some(task_ref) = self.task_set.get(key) { unsafe { - if let Some(ref mut fut) = *Arc::from_raw(task_ref.inner).future.get() { + let arc_task = Arc::from_raw(task_ref.inner); + if let Some(ref mut fut) = *arc_task.future.get() { + let _ = Arc::into_raw(arc_task); return Some(Pin::new_unchecked(&mut fut.future)); } + let _ = Arc::into_raw(arc_task); } } None @@ -254,6 +257,7 @@ impl MappedFutures { unsafe { let task = Arc::from_raw(task_ref.inner); if let Some(ref mut fut) = *task.future.get() { + let _ = Arc::into_raw(task); return Some(&fut.future); } let _ = Arc::into_raw(task); @@ -268,6 +272,7 @@ impl MappedFutures { unsafe { let task = Arc::from_raw(task_ref.inner); if let Some(ref mut fut) = *task.future.get() { + let _ = Arc::into_raw(task); return Some(Pin::new_unchecked(&fut.future)); } let _ = Arc::into_raw(task); @@ -427,15 +432,16 @@ impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIter<'a, K, Fut> { #[cfg(test)] pub mod tests { use crate::stream::*; - use futures::executor::block_on; use futures::future::LocalBoxFuture; - use futures_timer::Delay; use futures_unordered::mapped_futures::MappedFutures; use std::boxed::Box; - use std::time::Duration; + use tokio::time::{sleep, Duration, Instant, Sleep}; + // Two tokio versions available, use the right one + use tokio_new as tokio; - fn insert_millis(futs: &mut MappedFutures, key: u32, millis: u64) { - futs.insert(key, Delay::new(Duration::from_millis(millis))); + fn insert_millis(futs: &mut MappedFutures, key: u32, millis: u64) { + // futs.insert(key, Delay::new(Duration::from_millis(millis))); + futs.insert(key, sleep(Duration::from_millis(millis))); } fn insert_millis_pinned( @@ -443,52 +449,66 @@ pub mod tests { key: u32, millis: u64, ) { - futs.insert(key, Box::pin(Delay::new(Duration::from_millis(millis)))); + // futs.insert(key, Box::pin(Delay::new(Duration::from_millis(millis)))); + futs.insert(key, Box::pin(sleep(Duration::from_millis(millis)))); + } + + #[tokio::test] + async fn mf_test_delay() { + // let mut futures: MappedFutures = MappedFutures::new(); + // insert_millis(&mut futures, 1, 50); + // assert_eq!(futures.next().await.unwrap().0, 2); + sleep(Duration::from_millis(500)).await; } - #[test] - fn mf_map_futures() { - let mut futures: MappedFutures = MappedFutures::new(); + #[tokio::test] + async fn mf_map_futures() { + let mut futures: MappedFutures = MappedFutures::new(); insert_millis(&mut futures, 1, 50); insert_millis(&mut futures, 2, 75); insert_millis(&mut futures, 3, 150); insert_millis(&mut futures, 4, 200); - assert_eq!(block_on(futures.next()).unwrap().0, 1); + assert_eq!(futures.next().await.unwrap().0, 1); assert!(futures.cancel(&3)); - assert_eq!(block_on(futures.next()).unwrap().0, 2); - assert_eq!(block_on(futures.next()).unwrap().0, 4); - assert_eq!(block_on(futures.next()), None); + assert_eq!(futures.next().await.unwrap().0, 2); + assert_eq!(futures.next().await.unwrap().0, 4); + assert_eq!(futures.next().await, None); } - #[test] - fn mf_remove_pinned() { + #[tokio::test] + async fn mf_remove_pinned() { let mut futures: MappedFutures> = MappedFutures::new(); insert_millis_pinned(&mut futures, 1, 50); insert_millis_pinned(&mut futures, 3, 150); insert_millis_pinned(&mut futures, 4, 200); - assert_eq!(block_on(futures.next()).unwrap().0, 1); - block_on(futures.remove(&3).unwrap()); + assert_eq!(futures.next().await.unwrap().0, 1); + futures.remove(&3).unwrap().await; insert_millis_pinned(&mut futures, 2, 60); - assert_eq!(block_on(futures.next()).unwrap().0, 4); - assert_eq!(block_on(futures.next()).unwrap().0, 2); - assert_eq!(block_on(futures.next()), None); + assert_eq!(futures.next().await.unwrap().0, 4); + assert_eq!(futures.next().await.unwrap().0, 2); + assert_eq!(futures.next().await, None); } - #[test] - fn mf_mutate() { - let mut futures: MappedFutures = MappedFutures::new(); + #[tokio::test] + async fn mf_mutate() { + let mut futures: MappedFutures = MappedFutures::new(); insert_millis(&mut futures, 1, 500); insert_millis(&mut futures, 2, 1000); insert_millis(&mut futures, 3, 1500); insert_millis(&mut futures, 4, 2000); - assert_eq!(block_on(futures.next()).unwrap().0, 1); - futures.get_mut(&3).unwrap().reset(Duration::from_millis(300)); - assert_eq!(block_on(futures.next()).unwrap().0, 3); - assert_eq!(block_on(futures.next()).unwrap().0, 2); - assert_eq!(block_on(futures.next()).unwrap().0, 4); - assert_eq!(block_on(futures.next()), None); + assert_eq!(futures.next().await.unwrap().0, 1); + // futures.get_mut(&3).unwrap().reset(Duration::from_millis(300)); + futures + .get_pin_mut(&3) + .unwrap() + .as_mut() + .reset(Instant::now() + Duration::from_millis(300)); + assert_eq!(futures.next().await.unwrap().0, 3); + assert_eq!(futures.next().await.unwrap().0, 2); + assert_eq!(futures.next().await.unwrap().0, 4); + assert_eq!(futures.next().await, None); } } From 1f58094d3e03269cd583e39962f881be045e5d63 Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 18:05:44 -0500 Subject: [PATCH 07/21] downgrade new tokio --- futures-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 6104824815..de2544e93e 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -49,7 +49,7 @@ futures = { path = "../futures", features = ["async-await", "thread-pool"] } futures-test = { path = "../futures-test" } tokio = "0.1.11" # MappedFutures requires tokio::time::sleep, which is unavailable in the above tokio version -tokio_new = {package = "tokio", git = "https://github.com/tokio-rs/tokio", version = "1.44.1", features = ["time", "rt", "macros"]} +tokio_new = {package = "tokio", git = "https://github.com/tokio-rs/tokio", version = "^1.1.0", features = ["time", "rt", "macros"]} futures-timer = "3.0.3" futures-time = "3.0.0" From 9a4b9ef2ad0602a78691ec94545456e9964fe7f1 Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 18:17:08 -0500 Subject: [PATCH 08/21] use most recent cargo2018-compatiable tokio version --- futures-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index de2544e93e..b85f821f56 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -49,7 +49,7 @@ futures = { path = "../futures", features = ["async-await", "thread-pool"] } futures-test = { path = "../futures-test" } tokio = "0.1.11" # MappedFutures requires tokio::time::sleep, which is unavailable in the above tokio version -tokio_new = {package = "tokio", git = "https://github.com/tokio-rs/tokio", version = "^1.1.0", features = ["time", "rt", "macros"]} +tokio_new = {package = "tokio", git = "https://github.com/tokio-rs/tokio", branch = "tokio-1.25.x", features = ["time", "rt", "macros"]} futures-timer = "3.0.3" futures-time = "3.0.0" From eb361a1fe841658bca8159751144e5f3e0fd470b Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 20:20:35 -0500 Subject: [PATCH 09/21] avoid unsoundness by pointerizing arc before release of task --- .../src/stream/futures_unordered/mapped_futures/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 5e42b3931f..3842f498c1 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -152,8 +152,6 @@ impl MappedFutures { let arc_key = Arc::new(key); let hash_fut = HashFut { key: arc_key.clone(), future }; let task = self.futures.push_inner(hash_fut); - let arc = unsafe { Arc::from_raw(task) }; - let _ = Arc::into_raw(arc); self.task_set.insert(HashTask { key: arc_key, inner: task }); !replacing } @@ -182,9 +180,9 @@ impl MappedFutures { unsafe { let task_arc = Arc::from_raw(task.inner); if (*task_arc.future.get()).is_some() { + let _ = Arc::into_raw(task_arc); let unlinked_task = self.futures.unlink(task.inner); self.futures.release_task(unlinked_task); - let _ = Arc::into_raw(task_arc); return true; } let _ = Arc::into_raw(task_arc); @@ -203,9 +201,9 @@ impl MappedFutures { unsafe { let arc_task = Arc::from_raw(task.inner); let fut = (*arc_task.future.get()).take().unwrap(); + let _ = Arc::into_raw(arc_task); let unlinked_task = self.futures.unlink(task.inner); self.futures.release_task(unlinked_task); - let _ = Arc::into_raw(arc_task); return Some(fut.future); } } From 3eedd7075779f1d36b8acc6e7616ec38e12dbe24 Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 20:46:26 -0500 Subject: [PATCH 10/21] remove timer deps --- futures-util/Cargo.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index b85f821f56..01ce86be3b 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -51,9 +51,6 @@ tokio = "0.1.11" # MappedFutures requires tokio::time::sleep, which is unavailable in the above tokio version tokio_new = {package = "tokio", git = "https://github.com/tokio-rs/tokio", branch = "tokio-1.25.x", features = ["time", "rt", "macros"]} -futures-timer = "3.0.3" -futures-time = "3.0.0" - [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] From 6cc507a892240be8512773a5e2980d84103a4b8a Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 21:13:25 -0500 Subject: [PATCH 11/21] remove Unpin key req, add Send and Sync impls, add ExactSizeIterator traits to iterators --- .../futures_unordered/mapped_futures/mod.rs | 102 ++++++++++-------- 1 file changed, 57 insertions(+), 45 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 3842f498c1..366de98341 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -35,7 +35,7 @@ use super::{FuturesUnordered, IterPinMut, IterPinRef}; /// with the [`MappedFutures::new`] constructor. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] -pub struct MappedFutures { +pub struct MappedFutures { task_set: HashSet>>, futures: FuturesUnordered>, } @@ -46,7 +46,7 @@ struct HashTask { key: Arc, } -impl Borrow for HashTask> { +impl Borrow for HashTask> { fn borrow(&self) -> &K { &self.key } @@ -60,7 +60,7 @@ impl PartialEq for HashTask { impl Eq for HashTask {} -impl Hash for HashTask> { +impl Hash for HashTask> { fn hash(&self, state: &mut H) { let key = unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().key(); key.hash(state) @@ -68,32 +68,32 @@ impl Hash for HashTask> { } #[derive(Debug)] -struct HashFut { +struct HashFut { key: Arc, future: Fut, } -impl HashFut { +impl HashFut { fn key(&self) -> &K { self.key.as_ref() } } -impl PartialEq for HashFut { +impl PartialEq for HashFut { fn eq(&self, other: &Self) -> bool { self.key == other.key } } -impl Eq for HashFut {} +impl Eq for HashFut {} -impl Hash for HashFut { +impl Hash for HashFut { fn hash(&self, state: &mut H) { self.key.hash(state); } } -impl Future for HashFut { +impl Future for HashFut { type Output = (Arc, Fut::Output); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let inner = unsafe { Pin::into_inner_unchecked(self) }; @@ -104,17 +104,17 @@ impl Future for HashFut { } } -unsafe impl Send for MappedFutures {} -unsafe impl Sync for MappedFutures {} -impl Unpin for MappedFutures {} +unsafe impl Send for MappedFutures {} +unsafe impl Sync for MappedFutures {} +impl Unpin for MappedFutures {} -impl Default for MappedFutures { +impl Default for MappedFutures { fn default() -> Self { Self::new() } } -impl MappedFutures { +impl MappedFutures { /// Constructs a new, empty [`MappedFutures`]. /// /// The returned [`MappedFutures`] does not contain any futures. @@ -279,18 +279,13 @@ impl MappedFutures { None } - /// Returns an iterator of keys in the mapping. - pub fn keys_pin<'a>(self: Pin<&'a Self>) -> KeysPin<'a, K, Fut> { - KeysPin(unsafe { self.map_unchecked(|f| &f.futures) }.iter_pin_ref()) - } - /// Returns an iterator of keys in the mapping. pub fn keys(&self) -> Keys<'_, K, Fut> where K: Unpin, - Fut: Unpin, { - Keys(Pin::new(self).keys_pin()) + Keys(self.task_set.iter()) + // Keys(Pin::new(self).keys_pin()) } /// Returns an iterator that allows inspecting each future in the set. @@ -321,7 +316,7 @@ impl MappedFutures { } } -impl Stream for MappedFutures { +impl Stream for MappedFutures { type Item = (K, Fut::Output); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -350,63 +345,61 @@ impl Stream for MappedFutures { /// Immutable iterator over all keys in the mapping. #[derive(Debug)] -pub struct KeysPin<'a, K: Hash + Eq + Unpin, Fut>(IterPinRef<'a, HashFut>); - -impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for KeysPin<'a, K, Fut> { - type Item = &'a K; - - fn next(&mut self) -> Option { - Some(&(*self.0.next().as_ref()?).get_ref().key()) - } -} - -/// Immutable iterator over all keys in the mapping. -#[derive(Debug)] -pub struct Keys<'a, K: Hash + Eq + Unpin, Fut: Unpin>(KeysPin<'a, K, Fut>); +pub struct Keys<'a, K: Hash + Eq, Fut>( + std::collections::hash_set::Iter<'a, HashTask>>, +); -impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for Keys<'a, K, Fut> { +impl<'a, K: Hash + Eq, Fut> Iterator for Keys<'a, K, Fut> { type Item = &'a K; fn next(&mut self) -> Option { - self.0.next() + self.0.next().map(|hash_task| hash_task.key.as_ref()) } } /// Immutable iterator over all keys in the mapping. #[derive(Debug)] -pub struct MapIterPinRef<'a, K: Hash + Eq + Unpin, Fut>(IterPinRef<'a, HashFut>); +pub struct MapIterPinRef<'a, K: Hash + Eq, Fut>(IterPinRef<'a, HashFut>); /// Immutable iterator over all keys in the mapping. #[derive(Debug)] -pub struct MapIterPinMut<'a, K: Hash + Eq + Unpin, Fut>(IterPinMut<'a, HashFut>); +pub struct MapIterPinMut<'a, K: Hash + Eq, Fut>(IterPinMut<'a, HashFut>); /// Mutable iterator over all keys and futures in the map. #[derive(Debug)] -pub struct MapIterMut<'a, K: Hash + Eq + Unpin, Fut>(MapIterPinMut<'a, K, Fut>); +pub struct MapIterMut<'a, K: Hash + Eq, Fut>(MapIterPinMut<'a, K, Fut>); /// Immutable iterator over all the keys and futures in the map. #[derive(Debug)] -pub struct MapIter<'a, K: Hash + Eq + Unpin, Fut>(MapIterPinRef<'a, K, Fut>); +pub struct MapIter<'a, K: Hash + Eq, Fut>(MapIterPinRef<'a, K, Fut>); -impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIterMut<'a, K, Fut> { +impl<'a, K: Hash + Eq, Fut: Unpin> Iterator for MapIterMut<'a, K, Fut> { type Item = (&'a K, &'a mut Fut); fn next(&mut self) -> Option { let next = self.0.next()?; Some((&next.0, Pin::into_inner(next.1))) } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } } -impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinMut<'a, K, Fut> { +impl<'a, K: Hash + Eq, Fut> Iterator for MapIterPinMut<'a, K, Fut> { type Item = (&'a K, Pin<&'a mut Fut>); fn next(&mut self) -> Option { let next = unsafe { Pin::into_inner_unchecked(self.0.next()?) }; Some((&next.key.as_ref(), unsafe { Pin::new_unchecked(&mut next.future) })) } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } } -impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinRef<'a, K, Fut> { +impl<'a, K: Hash + Eq, Fut> Iterator for MapIterPinRef<'a, K, Fut> { type Item = (&'a K, Pin<&'a Fut>); fn next(&mut self) -> Option { @@ -414,9 +407,13 @@ impl<'a, K: Hash + Eq + Unpin, Fut> Iterator for MapIterPinRef<'a, K, Fut> { let fut = unsafe { next.map_unchecked(|f| &f.future) }; Some((next.get_ref().key(), fut)) } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } } -impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIter<'a, K, Fut> { +impl<'a, K: Hash + Eq, Fut: Unpin> Iterator for MapIter<'a, K, Fut> { type Item = (&'a K, &'a Fut); fn next(&mut self) -> Option { @@ -424,7 +421,22 @@ impl<'a, K: Hash + Eq + Unpin, Fut: Unpin> Iterator for MapIter<'a, K, Fut> { let key = next.0; Some((key, Pin::into_inner(next.1))) } + + fn size_hint(&self) -> (usize, Option) { + self.0.size_hint() + } } +impl ExactSizeIterator for MapIter<'_, K, Fut> {} +impl ExactSizeIterator for MapIterPinRef<'_, K, Fut> {} +impl ExactSizeIterator for MapIterMut<'_, K, Fut> {} +impl ExactSizeIterator for MapIterPinMut<'_, K, Fut> {} +impl ExactSizeIterator for Keys<'_, K, Fut> {} + +unsafe impl Send for MapIterPinMut<'_, K, Fut> {} +unsafe impl Sync for MapIterPinMut<'_, K, Fut> {} + +unsafe impl Send for MapIterPinRef<'_, K, Fut> {} +unsafe impl Sync for MapIterPinRef<'_, K, Fut> {} /// Tests for MappedFutures #[cfg(test)] From 2a7c0c110e103b5bcd95cc6149306d247be1703a Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 22:40:27 -0500 Subject: [PATCH 12/21] extract task ref counting --- .../futures_unordered/mapped_futures/mod.rs | 82 +++++++------------ 1 file changed, 28 insertions(+), 54 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 366de98341..a663c482ed 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -172,20 +172,27 @@ impl MappedFutures { replacing } + fn get_task_future(task: &HashTask>) -> Option<&mut HashFut> { + unsafe { + let arc_task = Arc::from_raw(task.inner); + if let Some(ref mut fut) = *arc_task.future.get() { + let _ = Arc::into_raw(arc_task); + return Some(fut); + } + let _ = Arc::into_raw(arc_task); + None + } + } + /// Remove a future from the set, dropping it. /// /// Returns true if a future was cancelled. pub fn cancel(&mut self, key: &K) -> bool { if let Some(task) = self.task_set.take(key) { - unsafe { - let task_arc = Arc::from_raw(task.inner); - if (*task_arc.future.get()).is_some() { - let _ = Arc::into_raw(task_arc); - let unlinked_task = self.futures.unlink(task.inner); - self.futures.release_task(unlinked_task); - return true; - } - let _ = Arc::into_raw(task_arc); + if Self::get_task_future(&task).is_some() { + let unlinked_task = unsafe { self.futures.unlink(task.inner) }; + self.futures.release_task(unlinked_task); + return true; } } false @@ -217,17 +224,11 @@ impl MappedFutures { /// Get a pinned mutable reference to the mapped future. pub fn get_pin_mut(&mut self, key: &K) -> Option> { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - let arc_task = Arc::from_raw(task_ref.inner); - if let Some(ref mut fut) = *arc_task.future.get() { - let _ = Arc::into_raw(arc_task); - return Some(Pin::new_unchecked(&mut fut.future)); - } - let _ = Arc::into_raw(arc_task); - } - } - None + self.task_set + .get(key) + .map(Self::get_task_future) + .flatten() + .map(|f| unsafe { Pin::new_unchecked(&mut f.future) }) } /// Get a pinned mutable reference to the mapped future. @@ -235,48 +236,21 @@ impl MappedFutures { where Fut: Unpin, { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - let task = Arc::from_raw(task_ref.inner); - if let Some(ref mut fut) = *task.future.get() { - let _ = Arc::into_raw(task); - return Some(&mut fut.future); - } - - let _ = Arc::into_raw(task); - } - } - None + Self::get_task_future(self.task_set.get(key)?).map(|f| &mut f.future) } /// Get a shared reference to the mapped future. pub fn get(&mut self, key: &K) -> Option<&Fut> { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - let task = Arc::from_raw(task_ref.inner); - if let Some(ref mut fut) = *task.future.get() { - let _ = Arc::into_raw(task); - return Some(&fut.future); - } - let _ = Arc::into_raw(task); - } - } - None + self.task_set.get(key).map(Self::get_task_future).flatten().map(|f| &f.future) } /// Get a pinned shared reference to the mapped future. pub fn get_pin(&mut self, key: &K) -> Option> { - if let Some(task_ref) = self.task_set.get(key) { - unsafe { - let task = Arc::from_raw(task_ref.inner); - if let Some(ref mut fut) = *task.future.get() { - let _ = Arc::into_raw(task); - return Some(Pin::new_unchecked(&fut.future)); - } - let _ = Arc::into_raw(task); - } - } - None + self.task_set + .get(key) + .map(Self::get_task_future) + .flatten() + .map(|f| unsafe { Pin::new_unchecked(&f.future) }) } /// Returns an iterator of keys in the mapping. From 4a69aaaeb5b18cbde24263e4d421f3e49ec4c871 Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 22:49:20 -0500 Subject: [PATCH 13/21] apply clippy suggestions --- .../src/stream/futures_unordered/mapped_futures/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index a663c482ed..a6c985b8c1 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -226,8 +226,7 @@ impl MappedFutures { pub fn get_pin_mut(&mut self, key: &K) -> Option> { self.task_set .get(key) - .map(Self::get_task_future) - .flatten() + .and_then(Self::get_task_future) .map(|f| unsafe { Pin::new_unchecked(&mut f.future) }) } @@ -241,15 +240,14 @@ impl MappedFutures { /// Get a shared reference to the mapped future. pub fn get(&mut self, key: &K) -> Option<&Fut> { - self.task_set.get(key).map(Self::get_task_future).flatten().map(|f| &f.future) + self.task_set.get(key).and_then(Self::get_task_future).map(|f| &f.future) } /// Get a pinned shared reference to the mapped future. pub fn get_pin(&mut self, key: &K) -> Option> { self.task_set .get(key) - .map(Self::get_task_future) - .flatten() + .and_then(Self::get_task_future) .map(|f| unsafe { Pin::new_unchecked(&f.future) }) } From a662a83538846286c48f65ef16b5fb51848b300c Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 23:47:11 -0500 Subject: [PATCH 14/21] clarify cargo comment --- futures-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 01ce86be3b..2809b22431 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -48,7 +48,7 @@ spin = { version = "0.10.0", optional = true } futures = { path = "../futures", features = ["async-await", "thread-pool"] } futures-test = { path = "../futures-test" } tokio = "0.1.11" -# MappedFutures requires tokio::time::sleep, which is unavailable in the above tokio version +# MappedFutures's test cases require tokio::time::sleep, which is unavailable in the above tokio version tokio_new = {package = "tokio", git = "https://github.com/tokio-rs/tokio", branch = "tokio-1.25.x", features = ["time", "rt", "macros"]} [package.metadata.docs.rs] From 5968e416a4f8cd8484c96f7950b3eeda1862d4b4 Mon Sep 17 00:00:00 2001 From: Xian Date: Fri, 4 Apr 2025 23:58:18 -0500 Subject: [PATCH 15/21] remove hash trait from HashFut --- .../futures_unordered/mapped_futures/mod.rs | 61 ++++++------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index a6c985b8c1..3075dfbfa1 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -41,53 +41,38 @@ pub struct MappedFutures { } #[derive(Debug)] -struct HashTask { - inner: *const Task, +struct HashFut { key: Arc, + future: Fut, } -impl Borrow for HashTask> { - fn borrow(&self) -> &K { - &self.key - } -} - -impl PartialEq for HashTask { - fn eq(&self, other: &Self) -> bool { - self.inner == other.inner - } -} - -impl Eq for HashTask {} - -impl Hash for HashTask> { - fn hash(&self, state: &mut H) { - let key = unsafe { (*self.inner).future.get().as_ref() }.unwrap().as_ref().unwrap().key(); - key.hash(state) +impl HashFut { + fn key(&self) -> &K { + self.key.as_ref() } } #[derive(Debug)] -struct HashFut { +struct HashTask { + inner: *const Task, key: Arc, - future: Fut, } -impl HashFut { - fn key(&self) -> &K { - self.key.as_ref() +impl Borrow for HashTask> { + fn borrow(&self) -> &K { + &self.key } } -impl PartialEq for HashFut { +impl PartialEq for HashTask { fn eq(&self, other: &Self) -> bool { self.key == other.key } } -impl Eq for HashFut {} +impl Eq for HashTask {} -impl Hash for HashFut { +impl Hash for HashTask> { fn hash(&self, state: &mut H) { self.key.hash(state); } @@ -106,6 +91,7 @@ impl Future for HashFut { unsafe impl Send for MappedFutures {} unsafe impl Sync for MappedFutures {} + impl Unpin for MappedFutures {} impl Default for MappedFutures { @@ -226,7 +212,8 @@ impl MappedFutures { pub fn get_pin_mut(&mut self, key: &K) -> Option> { self.task_set .get(key) - .and_then(Self::get_task_future) + .map(Self::get_task_future) + .flatten() .map(|f| unsafe { Pin::new_unchecked(&mut f.future) }) } @@ -240,14 +227,15 @@ impl MappedFutures { /// Get a shared reference to the mapped future. pub fn get(&mut self, key: &K) -> Option<&Fut> { - self.task_set.get(key).and_then(Self::get_task_future).map(|f| &f.future) + self.task_set.get(key).map(Self::get_task_future).flatten().map(|f| &f.future) } /// Get a pinned shared reference to the mapped future. pub fn get_pin(&mut self, key: &K) -> Option> { self.task_set .get(key) - .and_then(Self::get_task_future) + .map(Self::get_task_future) + .flatten() .map(|f| unsafe { Pin::new_unchecked(&f.future) }) } @@ -422,7 +410,6 @@ pub mod tests { use tokio_new as tokio; fn insert_millis(futs: &mut MappedFutures, key: u32, millis: u64) { - // futs.insert(key, Delay::new(Duration::from_millis(millis))); futs.insert(key, sleep(Duration::from_millis(millis))); } @@ -431,18 +418,9 @@ pub mod tests { key: u32, millis: u64, ) { - // futs.insert(key, Box::pin(Delay::new(Duration::from_millis(millis)))); futs.insert(key, Box::pin(sleep(Duration::from_millis(millis)))); } - #[tokio::test] - async fn mf_test_delay() { - // let mut futures: MappedFutures = MappedFutures::new(); - // insert_millis(&mut futures, 1, 50); - // assert_eq!(futures.next().await.unwrap().0, 2); - sleep(Duration::from_millis(500)).await; - } - #[tokio::test] async fn mf_map_futures() { let mut futures: MappedFutures = MappedFutures::new(); @@ -482,7 +460,6 @@ pub mod tests { insert_millis(&mut futures, 4, 2000); assert_eq!(futures.next().await.unwrap().0, 1); - // futures.get_mut(&3).unwrap().reset(Duration::from_millis(300)); futures .get_pin_mut(&3) .unwrap() From c0ecfa00682ff8e5159f743c237a58c6390c17a7 Mon Sep 17 00:00:00 2001 From: Xian Date: Sat, 5 Apr 2025 01:03:12 -0500 Subject: [PATCH 16/21] add more iterator traits, add comments --- .../futures_unordered/mapped_futures/mod.rs | 89 ++++++++++++++++++- 1 file changed, 87 insertions(+), 2 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 3075dfbfa1..9273f96e1a 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -8,13 +8,16 @@ use alloc::sync::Arc; use core::borrow::Borrow; use core::fmt::Debug; use core::hash::{Hash, Hasher}; +use core::iter::FromIterator; use core::pin::Pin; +use core::sync::atomic::Ordering::Relaxed; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; +use futures_core::FusedStream; use std::collections::HashSet; -use super::{FuturesUnordered, IterPinMut, IterPinRef}; +use super::{FuturesUnordered, IntoIter, IterPinMut, IterPinRef}; /// A map of futures which may complete in any order. /// @@ -40,6 +43,8 @@ pub struct MappedFutures { futures: FuturesUnordered>, } +// Wraps the user-provided Future. Output is associated with a key, partly so that we'll know which +// HashTask to remove from the Set. #[derive(Debug)] struct HashFut { key: Arc, @@ -52,6 +57,11 @@ impl HashFut { } } +// Wraps the task; but contains a raw pointer, so we need to ensure soundness by: +// - always decrementing strong count or using Arc::into_raw() any time we re-create the Arc +// - ensure that the strong count is exactly 1 when the task is dropped in release_task() +// Aside from that, HashTask is used to access Task using a key, such as in get_mut, remove, +// cancel, etc. #[derive(Debug)] struct HashTask { inner: *const Task, @@ -78,6 +88,11 @@ impl Hash for HashTask> { } } +// SAFETY: +// - the use of Pin::into_inner_unchecked() unchecked is safe because we are only accessing the owned +// future, which is not moved, and its reference is immediaely pinned +// - the other field, of type Arc, is Unpin, and can be moved safely +// - Pin::new_unchecked() is safe because the &mut points to a value that was just also pinned impl Future for HashFut { type Output = (Arc, Fut::Output); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -158,6 +173,10 @@ impl MappedFutures { replacing } + // Extracts some of the unsafety. + // Get the &mut to the future of the task. + // The "future not found" case should never occur; the future is removed from task just before + // task is dropped; consider putting a debug invariant in this function. fn get_task_future(task: &HashTask>) -> Option<&mut HashFut> { unsafe { let arc_task = Arc::from_raw(task.inner); @@ -252,7 +271,6 @@ impl MappedFutures { pub fn iter(&self) -> MapIter<'_, K, Fut> where Fut: Unpin, - K: Unpin, { MapIter(Pin::new(self).iter_pin_ref()) } @@ -303,6 +321,52 @@ impl Stream for MappedFutures { } } +impl<'a, K: Hash + Eq, Fut: Unpin> IntoIterator for &'a MappedFutures { + type Item = (&'a K, &'a Fut); + type IntoIter = MapIter<'a, K, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, K: Hash + Eq, Fut: Unpin> IntoIterator for &'a mut MappedFutures { + type Item = (&'a K, &'a mut Fut); + type IntoIter = MapIterMut<'a, K, Fut>; + + fn into_iter(self) -> Self::IntoIter { + self.iter_mut() + } +} + +impl IntoIterator for MappedFutures { + type Item = (K, Fut); + type IntoIter = MapIntoIter; + + fn into_iter(self) -> Self::IntoIter { + MapIntoIter { task_set: self.task_set, inner: self.futures.into_iter() } + } +} + +impl FromIterator<(K, Fut)> for MappedFutures { + fn from_iter(iter: I) -> Self + where + I: IntoIterator, + { + let acc = Self::new(); + iter.into_iter().fold(acc, |mut acc, (key, fut)| { + acc.insert(key, fut); + acc + }) + } +} + +impl FusedStream for MappedFutures { + fn is_terminated(&self) -> bool { + self.futures.is_terminated.load(Relaxed) + } +} + /// Immutable iterator over all keys in the mapping. #[derive(Debug)] pub struct Keys<'a, K: Hash + Eq, Fut>( @@ -333,6 +397,27 @@ pub struct MapIterMut<'a, K: Hash + Eq, Fut>(MapIterPinMut<'a, K, Fut>); #[derive(Debug)] pub struct MapIter<'a, K: Hash + Eq, Fut>(MapIterPinRef<'a, K, Fut>); +/// Owned iterator over all keys and futures in the map. +#[derive(Debug)] +pub struct MapIntoIter { + task_set: HashSet>>, + inner: IntoIter>, +} + +impl Iterator for MapIntoIter { + type Item = (K, Fut); + + fn next(&mut self) -> Option { + let hash_fut = self.inner.next()?; + self.task_set.remove(hash_fut.key()); + Some((Arc::try_unwrap(hash_fut.key).ok().unwrap(), hash_fut.future)) + } + + fn size_hint(&self) -> (usize, Option) { + (self.inner.len, Some(self.inner.len)) + } +} + impl<'a, K: Hash + Eq, Fut: Unpin> Iterator for MapIterMut<'a, K, Fut> { type Item = (&'a K, &'a mut Fut); From 5d1803ca71bb6752c218e59111af7c4f0a7e4e52 Mon Sep 17 00:00:00 2001 From: Xian Date: Sat, 5 Apr 2025 08:41:40 -0500 Subject: [PATCH 17/21] re-apply overwritten clippy fixes --- .../src/stream/futures_unordered/mapped_futures/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 9273f96e1a..fec8b0b696 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -231,8 +231,7 @@ impl MappedFutures { pub fn get_pin_mut(&mut self, key: &K) -> Option> { self.task_set .get(key) - .map(Self::get_task_future) - .flatten() + .and_then(Self::get_task_future) .map(|f| unsafe { Pin::new_unchecked(&mut f.future) }) } @@ -246,15 +245,14 @@ impl MappedFutures { /// Get a shared reference to the mapped future. pub fn get(&mut self, key: &K) -> Option<&Fut> { - self.task_set.get(key).map(Self::get_task_future).flatten().map(|f| &f.future) + self.task_set.get(key).and_then(Self::get_task_future).map(|f| &f.future) } /// Get a pinned shared reference to the mapped future. pub fn get_pin(&mut self, key: &K) -> Option> { self.task_set .get(key) - .map(Self::get_task_future) - .flatten() + .and_then(Self::get_task_future) .map(|f| unsafe { Pin::new_unchecked(&f.future) }) } From f217aa93fc0f0837dd56751de8b131cba140ff6a Mon Sep 17 00:00:00 2001 From: Xian Date: Sat, 5 Apr 2025 09:15:21 -0500 Subject: [PATCH 18/21] remove &muts for &; refactor out all Arc::into_raw() --- .../futures_unordered/mapped_futures/mod.rs | 42 ++++++++----------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index fec8b0b696..1bb7b50e51 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -57,9 +57,8 @@ impl HashFut { } } -// Wraps the task; but contains a raw pointer, so we need to ensure soundness by: -// - always decrementing strong count or using Arc::into_raw() any time we re-create the Arc -// - ensure that the strong count is exactly 1 when the task is dropped in release_task() +// Wraps the task; but contains a raw pointer, so we need to ensure soundness by ensuring the Task +// is only ever dropped inside release_task(), and is never used after calling release_task() // Aside from that, HashTask is used to access Task using a key, such as in get_mut, remove, // cancel, etc. #[derive(Debug)] @@ -178,15 +177,7 @@ impl MappedFutures { // The "future not found" case should never occur; the future is removed from task just before // task is dropped; consider putting a debug invariant in this function. fn get_task_future(task: &HashTask>) -> Option<&mut HashFut> { - unsafe { - let arc_task = Arc::from_raw(task.inner); - if let Some(ref mut fut) = *arc_task.future.get() { - let _ = Arc::into_raw(arc_task); - return Some(fut); - } - let _ = Arc::into_raw(arc_task); - None - } + unsafe { (*(*task.inner).future.get()).as_mut() } } /// Remove a future from the set, dropping it. @@ -208,22 +199,23 @@ impl MappedFutures { where Fut: Unpin, { - // if let Some(task) = self.hash_set.get(key) { - if let Some(task) = self.task_set.take(key) { - unsafe { - let arc_task = Arc::from_raw(task.inner); - let fut = (*arc_task.future.get()).take().unwrap(); - let _ = Arc::into_raw(arc_task); + self.task_set + .take(key) + .and_then(|task| unsafe { + // SAFETY: + // - Must remove the future from task before releasing task + // - Derefernce must be safe; if the task had been released then it would have been + // removed from the set already + let fut = (*(*task.inner).future.get()).take(); let unlinked_task = self.futures.unlink(task.inner); self.futures.release_task(unlinked_task); - return Some(fut.future); - } - } - None + fut + }) + .map(|f| f.future) } /// Returns `true` if the map contains a future for the specified key. - pub fn contains(&mut self, key: &K) -> bool { + pub fn contains(&self, key: &K) -> bool { self.task_set.contains(key) } @@ -244,12 +236,12 @@ impl MappedFutures { } /// Get a shared reference to the mapped future. - pub fn get(&mut self, key: &K) -> Option<&Fut> { + pub fn get(&self, key: &K) -> Option<&Fut> { self.task_set.get(key).and_then(Self::get_task_future).map(|f| &f.future) } /// Get a pinned shared reference to the mapped future. - pub fn get_pin(&mut self, key: &K) -> Option> { + pub fn get_pin(&self, key: &K) -> Option> { self.task_set .get(key) .and_then(Self::get_task_future) From 98eedf358210f0efcbd69013b27b429baa50339d Mon Sep 17 00:00:00 2001 From: Xian Date: Sat, 5 Apr 2025 10:25:12 -0500 Subject: [PATCH 19/21] add safety comment on Sync impl --- .../src/stream/futures_unordered/mapped_futures/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 1bb7b50e51..692322b10e 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -103,6 +103,11 @@ impl Future for HashFut { } } +// SAFETY: +// - all task pointers are owned within MappedFutures, either in FuturesUnordered or in the task +// HashSet +// - so its not possible for a task to be written to while a ready/write is happening, since the +// former op would require &mut access to MappedFutures unsafe impl Send for MappedFutures {} unsafe impl Sync for MappedFutures {} From 784b07619fe2e059987ecd4cb79fc840e35d2b19 Mon Sep 17 00:00:00 2001 From: Xian Date: Sat, 5 Apr 2025 12:49:01 -0500 Subject: [PATCH 20/21] extract HashTask::get_future() --- .../futures_unordered/mapped_futures/mod.rs | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 692322b10e..570bad5062 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -87,6 +87,16 @@ impl Hash for HashTask> { } } +impl HashTask> { + // Extracts some of the unsafety. + // Get the &mut to the future of the task. + // The "future not found" case should never occur; the future is removed from task just before + // task is dropped; consider putting a debug invariant in this function. + fn get_future(&self) -> Option<&mut HashFut> { + unsafe { (*(*self.inner).future.get()).as_mut() } + } +} + // SAFETY: // - the use of Pin::into_inner_unchecked() unchecked is safe because we are only accessing the owned // future, which is not moved, and its reference is immediaely pinned @@ -181,8 +191,8 @@ impl MappedFutures { // Get the &mut to the future of the task. // The "future not found" case should never occur; the future is removed from task just before // task is dropped; consider putting a debug invariant in this function. - fn get_task_future(task: &HashTask>) -> Option<&mut HashFut> { - unsafe { (*(*task.inner).future.get()).as_mut() } + fn get_task_future(&self, key: &K) -> Option<&mut HashFut> { + self.task_set.get(key).and_then(|t| t.get_future()) } /// Remove a future from the set, dropping it. @@ -190,11 +200,13 @@ impl MappedFutures { /// Returns true if a future was cancelled. pub fn cancel(&mut self, key: &K) -> bool { if let Some(task) = self.task_set.take(key) { - if Self::get_task_future(&task).is_some() { - let unlinked_task = unsafe { self.futures.unlink(task.inner) }; - self.futures.release_task(unlinked_task); - return true; - } + // if task.get_future().is_some() { // unnecessary + // Should be impossible to get here without task having a future + // If the future was removed, then task is not in set + // If future was completed, then it was removed and task dropped + let unlinked_task = unsafe { self.futures.unlink(task.inner) }; + self.futures.release_task(unlinked_task); + return true; } false } @@ -208,7 +220,7 @@ impl MappedFutures { .take(key) .and_then(|task| unsafe { // SAFETY: - // - Must remove the future from task before releasing task + // - If removing the future from task, must do so before releasing task // - Derefernce must be safe; if the task had been released then it would have been // removed from the set already let fut = (*(*task.inner).future.get()).take(); @@ -226,10 +238,7 @@ impl MappedFutures { /// Get a pinned mutable reference to the mapped future. pub fn get_pin_mut(&mut self, key: &K) -> Option> { - self.task_set - .get(key) - .and_then(Self::get_task_future) - .map(|f| unsafe { Pin::new_unchecked(&mut f.future) }) + self.get_task_future(key).map(|f| unsafe { Pin::new_unchecked(&mut f.future) }) } /// Get a pinned mutable reference to the mapped future. @@ -237,20 +246,17 @@ impl MappedFutures { where Fut: Unpin, { - Self::get_task_future(self.task_set.get(key)?).map(|f| &mut f.future) + self.get_task_future(key).map(|f| &mut f.future) } /// Get a shared reference to the mapped future. pub fn get(&self, key: &K) -> Option<&Fut> { - self.task_set.get(key).and_then(Self::get_task_future).map(|f| &f.future) + self.get_task_future(key).map(|f| &f.future) } /// Get a pinned shared reference to the mapped future. pub fn get_pin(&self, key: &K) -> Option> { - self.task_set - .get(key) - .and_then(Self::get_task_future) - .map(|f| unsafe { Pin::new_unchecked(&f.future) }) + self.get_task_future(key).map(|f| unsafe { Pin::new_unchecked(&f.future) }) } /// Returns an iterator of keys in the mapping. From eaaeb0881ec25839c765cae314cdf0c43706cd3f Mon Sep 17 00:00:00 2001 From: Xian Date: Sun, 6 Apr 2025 14:56:07 -0500 Subject: [PATCH 21/21] add safety doc comment, and add debug_assert!() for non-nullable task futures --- .../stream/futures_unordered/mapped_futures/mod.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs index 570bad5062..76a0976054 100644 --- a/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs +++ b/futures-util/src/stream/futures_unordered/mapped_futures/mod.rs @@ -92,8 +92,17 @@ impl HashTask> { // Get the &mut to the future of the task. // The "future not found" case should never occur; the future is removed from task just before // task is dropped; consider putting a debug invariant in this function. + // SAFETY: + // - we are returning an &mut to the HashFut, requiring only a & ref + // - this could be used to have multiple mutable references at the same time + // - use of this function is sound only if there never exists >=2 mut refs to the same future + // - so, consuming code can either themselves require &mut MappedFutures, and not themselves + // create multiple mut refs, or can require &MappedFutures, and cast the returned + // &mut HashFut to a &HashFut fn get_future(&self) -> Option<&mut HashFut> { - unsafe { (*(*self.inner).future.get()).as_mut() } + let fut_opt = unsafe { (*(*self.inner).future.get()).as_mut() }; + debug_assert!(fut_opt.is_some()); + fut_opt } } @@ -224,6 +233,7 @@ impl MappedFutures { // - Derefernce must be safe; if the task had been released then it would have been // removed from the set already let fut = (*(*task.inner).future.get()).take(); + debug_assert!(fut.is_some()); let unlinked_task = self.futures.unlink(task.inner); self.futures.release_task(unlinked_task); fut