Skip to content

Commit b57860f

Browse files
ibraheemdevtaiki-e
authored andcommitted
Use FuturesOrdered in join_all (#2412)
1 parent d6e7046 commit b57860f

File tree

1 file changed

+73
-22
lines changed

1 file changed

+73
-22
lines changed

futures-util/src/future/join_all.rs

Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,39 @@ use core::task::{Context, Poll};
1212

1313
use super::{assert_future, MaybeDone};
1414

15+
#[cfg(not(futures_no_atomic_cas))]
16+
use crate::stream::{Collect, FuturesOrdered, StreamExt};
17+
1518
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
1619
// Safety: `std` _could_ make this unsound if it were to decide Pin's
1720
// invariants aren't required to transmit through slices. Otherwise this has
1821
// the same safety as a normal field pin projection.
1922
unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
2023
}
2124

22-
/// Future for the [`join_all`] function.
2325
#[must_use = "futures do nothing unless you `.await` or poll them"]
26+
/// Future for the [`join_all`] function.
2427
pub struct JoinAll<F>
2528
where
2629
F: Future,
2730
{
28-
elems: Pin<Box<[MaybeDone<F>]>>,
31+
kind: JoinAllKind<F>,
32+
}
33+
34+
#[cfg(not(futures_no_atomic_cas))]
35+
const SMALL: usize = 30;
36+
37+
pub(crate) enum JoinAllKind<F>
38+
where
39+
F: Future,
40+
{
41+
Small {
42+
elems: Pin<Box<[MaybeDone<F>]>>,
43+
},
44+
#[cfg(not(futures_no_atomic_cas))]
45+
Big {
46+
fut: Collect<FuturesOrdered<F>, Vec<F::Output>>,
47+
},
2948
}
3049

3150
impl<F> fmt::Debug for JoinAll<F>
@@ -34,7 +53,13 @@ where
3453
F::Output: fmt::Debug,
3554
{
3655
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37-
f.debug_struct("JoinAll").field("elems", &self.elems).finish()
56+
match self.kind {
57+
JoinAllKind::Small { ref elems } => {
58+
f.debug_struct("JoinAll").field("elems", elems).finish()
59+
}
60+
#[cfg(not(futures_no_atomic_cas))]
61+
JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
62+
}
3863
}
3964
}
4065

@@ -50,10 +75,9 @@ where
5075
///
5176
/// # See Also
5277
///
53-
/// This is purposefully a very simple API for basic use-cases. In a lot of
54-
/// cases you will want to use the more powerful
55-
/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
56-
/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
78+
/// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
79+
/// reasons if the number of futures is large. You may want to look into using it or
80+
/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
5781
///
5882
/// Some examples for additional functionality provided by these are:
5983
///
@@ -75,13 +99,33 @@ where
7599
/// assert_eq!(join_all(futures).await, [1, 2, 3]);
76100
/// # });
77101
/// ```
78-
pub fn join_all<I>(i: I) -> JoinAll<I::Item>
102+
pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
79103
where
80104
I: IntoIterator,
81105
I::Item: Future,
82106
{
83-
let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
84-
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
107+
#[cfg(futures_no_atomic_cas)]
108+
{
109+
let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
110+
let kind = JoinAllKind::Small { elems };
111+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
112+
}
113+
#[cfg(not(futures_no_atomic_cas))]
114+
{
115+
let iter = iter.into_iter();
116+
let kind = match iter.size_hint().1 {
117+
None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
118+
Some(max) => {
119+
if max <= SMALL {
120+
let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
121+
JoinAllKind::Small { elems }
122+
} else {
123+
JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
124+
}
125+
}
126+
};
127+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
128+
}
85129
}
86130

87131
impl<F> Future for JoinAll<F>
@@ -91,20 +135,27 @@ where
91135
type Output = Vec<F::Output>;
92136

93137
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
94-
let mut all_done = true;
138+
match &mut self.kind {
139+
JoinAllKind::Small { elems } => {
140+
let mut all_done = true;
95141

96-
for elem in iter_pin_mut(self.elems.as_mut()) {
97-
if elem.poll(cx).is_pending() {
98-
all_done = false;
99-
}
100-
}
142+
for elem in iter_pin_mut(elems.as_mut()) {
143+
if elem.poll(cx).is_pending() {
144+
all_done = false;
145+
}
146+
}
101147

102-
if all_done {
103-
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
104-
let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
105-
Poll::Ready(result)
106-
} else {
107-
Poll::Pending
148+
if all_done {
149+
let mut elems = mem::replace(elems, Box::pin([]));
150+
let result =
151+
iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
152+
Poll::Ready(result)
153+
} else {
154+
Poll::Pending
155+
}
156+
}
157+
#[cfg(not(futures_no_atomic_cas))]
158+
JoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
108159
}
109160
}
110161
}

0 commit comments

Comments
 (0)