Skip to content

Commit d63ee8e

Browse files
Nemo157cramertj
authored andcommitted
Allow joining !Unpin futures in JoinAll
1 parent 7fad7ae commit d63ee8e

File tree

2 files changed

+75
-34
lines changed

2 files changed

+75
-34
lines changed

futures-util/src/future/join_all.rs

Lines changed: 74 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,44 @@ where
1515
F: Future,
1616
{
1717
Pending(F),
18-
Done(F::Output),
18+
Done(Option<F::Output>),
19+
}
20+
21+
impl<F> ElemState<F>
22+
where
23+
F: Future,
24+
{
25+
fn pending_pin_mut<'a>(self: Pin<&'a mut Self>) -> Option<Pin<&'a mut F>> {
26+
// Safety: Basic enum pin projection, no drop + optionally Unpin based
27+
// on the type of this variant
28+
match unsafe { self.get_unchecked_mut() } {
29+
ElemState::Pending(f) => Some(unsafe { Pin::new_unchecked(f) }),
30+
ElemState::Done(_) => None,
31+
}
32+
}
33+
34+
fn take_done(self: Pin<&mut Self>) -> Option<F::Output> {
35+
// Safety: Going from pin to a variant we never pin-project
36+
match unsafe { self.get_unchecked_mut() } {
37+
ElemState::Pending(_) => None,
38+
ElemState::Done(output) => output.take(),
39+
}
40+
}
41+
}
42+
43+
impl<F> Unpin for ElemState<F>
44+
where
45+
F: Future + Unpin,
46+
{
47+
}
48+
49+
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
50+
// Safety: `std` _could_ make this unsound if it were to decide Pin's
51+
// invariants aren't required to transmit through slices. Otherwise this has
52+
// the same safety as a normal field pin projection.
53+
unsafe { slice.get_unchecked_mut() }
54+
.iter_mut()
55+
.map(|t| unsafe { Pin::new_unchecked(t) })
1956
}
2057

2158
/// A future which takes a list of futures and resolves with a vector of the
@@ -27,7 +64,7 @@ pub struct JoinAll<F>
2764
where
2865
F: Future,
2966
{
30-
elems: Vec<ElemState<F>>,
67+
elems: Pin<Box<[ElemState<F>]>>,
3168
}
3269

3370
impl<F> fmt::Debug for JoinAll<F>
@@ -49,33 +86,44 @@ where
4986
/// collecting the results into a destination `Vec<T>` in the same order as they
5087
/// were provided.
5188
///
89+
/// # See Also
90+
///
91+
/// This is purposefully a very simple API for basic use-cases. In a lot of
92+
/// cases you will want to use the more powerful
93+
/// [`FuturesUnordered`][crate::stream::FuturesUnordered] APIs, some
94+
/// examples of additional functionality that provides:
95+
///
96+
/// * Adding new futures to the set even after it has been started.
97+
///
98+
/// * Only polling the specific futures that have been woken. In cases where
99+
/// you have a lot of futures this will result in much more efficient polling.
100+
///
52101
/// # Examples
53102
///
54103
/// ```
55-
/// use futures_util::future::{FutureExt, join_all, ready};
104+
/// #![feature(async_await, await_macro, futures_api)]
105+
/// # futures::executor::block_on(async {
106+
/// use futures::future::{join_all};
107+
///
108+
/// async fn foo(i: u32) -> u32 { i }
56109
///
57-
/// let f = join_all(vec![
58-
/// ready::<u32>(1),
59-
/// ready::<u32>(2),
60-
/// ready::<u32>(3),
61-
/// ]);
62-
/// let f = f.map(|x| {
63-
/// assert_eq!(x, [1, 2, 3]);
64-
/// });
110+
/// let futures = vec![foo(1), foo(2), foo(3)];
111+
///
112+
/// assert_eq!(await!(join_all(futures)), [1, 2, 3]);
113+
/// # });
65114
/// ```
66115
pub fn join_all<I>(i: I) -> JoinAll<I::Item>
67116
where
68117
I: IntoIterator,
69118
I::Item: Future,
70119
{
71-
let elems = i.into_iter().map(ElemState::Pending).collect();
72-
JoinAll { elems }
120+
let elems: Box<[_]> = i.into_iter().map(ElemState::Pending).collect();
121+
JoinAll { elems: Box::into_pin(elems) }
73122
}
74123

75124
impl<F> Future for JoinAll<F>
76125
where
77-
F: Future + Unpin,
78-
F::Output: Unpin,
126+
F: Future,
79127
{
80128
type Output = Vec<F::Output>;
81129

@@ -85,27 +133,20 @@ where
85133
) -> Poll<Self::Output> {
86134
let mut all_done = true;
87135

88-
for elem in self.as_mut().elems.iter_mut() {
89-
match elem {
90-
ElemState::Pending(ref mut t) => match Pin::new(t).poll(lw) {
91-
Poll::Ready(v) => *elem = ElemState::Done(v),
92-
Poll::Pending => {
93-
all_done = false;
94-
continue;
95-
}
96-
},
97-
ElemState::Done(ref mut _v) => (),
98-
};
136+
for mut elem in iter_pin_mut(self.elems.as_mut()) {
137+
if let Some(pending) = elem.as_mut().pending_pin_mut() {
138+
if let Poll::Ready(output) = pending.poll(lw) {
139+
elem.set(ElemState::Done(Some(output)));
140+
} else {
141+
all_done = false;
142+
}
143+
}
99144
}
100145

101146
if all_done {
102-
let elems = mem::replace(&mut self.elems, Vec::new());
103-
let result = elems
104-
.into_iter()
105-
.map(|e| match e {
106-
ElemState::Done(t) => t,
107-
_ => unreachable!(),
108-
})
147+
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
148+
let result = iter_pin_mut(elems.as_mut())
149+
.map(|e| e.take_done().unwrap())
109150
.collect();
110151
Poll::Ready(result)
111152
} else {

futures-util/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
22
//! and the `AsyncRead` and `AsyncWrite` traits.
33
4-
#![feature(futures_api)]
4+
#![feature(futures_api, box_into_pin)]
55
#![cfg_attr(feature = "std", feature(async_await, await_macro))]
66
#![cfg_attr(feature = "nightly", feature(cfg_target_has_atomic))]
77

0 commit comments

Comments
 (0)