Skip to content

Commit b9dd6cf

Browse files
ebkalderoncramertj
authored andcommitted
Implement try_join_all() and TryJoinAll in futures-util
1 parent e8bacf2 commit b9dd6cf

File tree

4 files changed

+190
-3
lines changed

4 files changed

+190
-3
lines changed

futures-util/src/future/join_all.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
5858
/// A future which takes a list of futures and resolves with a vector of the
5959
/// completed values.
6060
///
61-
/// This future is created with the `join_all` method.
61+
/// This future is created with the `join_all` function.
6262
#[must_use = "futures do nothing unless polled"]
6363
pub struct JoinAll<F>
6464
where

futures-util/src/try_future/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ mod select_all;
2121
#[cfg(feature = "std")]
2222
mod select_ok;
2323
#[cfg(feature = "std")]
24-
pub use self::join_all::{join_all, JoinAll};
25-
#[cfg(feature = "std")]
2624
pub use self::select_all::{SelectAll, SelectAllNext, select_all};
2725
#[cfg(feature = "std")]
2826
pub use self::select_ok::{SelectOk, select_ok};
@@ -56,6 +54,12 @@ pub use self::or_else::OrElse;
5654
mod unwrap_or_else;
5755
pub use self::unwrap_or_else::UnwrapOrElse;
5856

57+
#[cfg(feature = "std")]
58+
mod join_all;
59+
60+
#[cfg(feature = "std")]
61+
pub use self::join_all::{join_all, JoinAll};
62+
5963
// Implementation details
6064
mod try_chain;
6165
pub(crate) use self::try_chain::{TryChain, TryChainAction};
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
//! Definition of the `TryJoinAll` combinator, waiting for all of a list of
2+
//! futures to finish with either success or error.
3+
4+
use std::fmt;
5+
use std::future::Future;
6+
use std::iter::FromIterator;
7+
use std::mem;
8+
use std::pin::Pin;
9+
use std::prelude::v1::*;
10+
use std::task::Poll;
11+
use super::TryFuture;
12+
13+
#[derive(Debug)]
14+
enum ElemState<F>
15+
where
16+
F: TryFuture,
17+
{
18+
Pending(F),
19+
Done(Option<Result<F::Ok, F::Error>>),
20+
}
21+
22+
impl<F> ElemState<F>
23+
where
24+
F: TryFuture,
25+
{
26+
fn pending_pin_mut<'a>(self: Pin<&'a mut Self>) -> Option<Pin<&'a mut F>> {
27+
// Safety: Basic enum pin projection, no drop + optionally Unpin based
28+
// on the type of this variant
29+
match unsafe { self.get_unchecked_mut() } {
30+
ElemState::Pending(f) => Some(unsafe { Pin::new_unchecked(f) }),
31+
ElemState::Done(_) => None,
32+
}
33+
}
34+
35+
fn take_done(self: Pin<&mut Self>) -> Option<Result<F::Ok, F::Error>> {
36+
// Safety: Going from pin to a variant we never pin-project
37+
match unsafe { self.get_unchecked_mut() } {
38+
ElemState::Pending(_) => None,
39+
ElemState::Done(output) => output.take(),
40+
}
41+
}
42+
}
43+
44+
impl<F> Unpin for ElemState<F> where F: TryFuture + Unpin {}
45+
46+
fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
47+
// Safety: `std` _could_ make this unsound if it were to decide Pin's
48+
// invariants aren't required to transmit through slices. Otherwise this has
49+
// the same safety as a normal field pin projection.
50+
unsafe { slice.get_unchecked_mut() }
51+
.iter_mut()
52+
.map(|t| unsafe { Pin::new_unchecked(t) })
53+
}
54+
55+
#[derive(Debug)]
56+
enum FinalState<E = ()> {
57+
Pending,
58+
AllDone,
59+
Error(E)
60+
}
61+
62+
/// A future which takes a list of futures and resolves with a vector of the
63+
/// completed values or an error.
64+
///
65+
/// This future is created with the `try_join_all` function.
66+
#[must_use = "futures do nothing unless polled"]
67+
pub struct TryJoinAll<F>
68+
where
69+
F: TryFuture,
70+
{
71+
elems: Pin<Box<[ElemState<F>]>>,
72+
}
73+
74+
impl<F> fmt::Debug for TryJoinAll<F>
75+
where
76+
F: TryFuture + fmt::Debug,
77+
F::Ok: fmt::Debug,
78+
F::Error: fmt::Debug,
79+
{
80+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
81+
fmt.debug_struct("TryJoinAll")
82+
.field("elems", &self.elems)
83+
.finish()
84+
}
85+
}
86+
87+
/// Creates a future which represents either a collection of the results of the
88+
/// futures given or an error.
89+
///
90+
/// The returned future will drive execution for all of its underlying futures,
91+
/// collecting the results into a destination `Vec<T>` in the same order as they
92+
/// were provided.
93+
///
94+
/// If any future returns an error then all other futures will be canceled and
95+
/// an error will be returned immediately. If all futures complete successfully,
96+
/// however, then the returned future will succeed with a `Vec` of all the
97+
/// successful results.
98+
///
99+
/// # Examples
100+
///
101+
/// ```
102+
/// #![feature(async_await, await_macro, futures_api)]
103+
/// # futures::executor::block_on(async {
104+
/// use futures::future::{self, try_join_all};
105+
///
106+
/// let futures = vec![
107+
/// future::ok::<u32, u32>(1),
108+
/// future::ok::<u32, u32>(2),
109+
/// future::ok::<u32, u32>(3),
110+
/// ];
111+
///
112+
/// assert_eq!(await!(try_join_all(futures)), Ok(vec![1, 2, 3]));
113+
///
114+
/// let futures = vec![
115+
/// future::ok::<u32, u32>(1),
116+
/// future::err::<u32, u32>(2),
117+
/// future::ok::<u32, u32>(3),
118+
/// ];
119+
///
120+
/// assert_eq!(await!(try_join_all(futures)), Err(2));
121+
/// # });
122+
/// ```
123+
pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item>
124+
where
125+
I: IntoIterator,
126+
I::Item: TryFuture,
127+
{
128+
let elems: Box<[_]> = i.into_iter().map(ElemState::Pending).collect();
129+
TryJoinAll {
130+
elems: Box::into_pin(elems),
131+
}
132+
}
133+
134+
impl<F> Future for TryJoinAll<F>
135+
where
136+
F: TryFuture,
137+
{
138+
type Output = Result<Vec<F::Ok>, F::Error>;
139+
140+
fn poll(
141+
mut self: Pin<&mut Self>,
142+
lw: &::std::task::LocalWaker,
143+
) -> Poll<Self::Output> {
144+
let mut state = FinalState::AllDone;
145+
146+
for mut elem in iter_pin_mut(self.elems.as_mut()) {
147+
if let Some(pending) = elem.as_mut().pending_pin_mut() {
148+
match pending.try_poll(lw) {
149+
Poll::Pending => state = FinalState::Pending,
150+
Poll::Ready(output) => match output {
151+
Ok(item) => elem.set(ElemState::Done(Some(Ok(item)))),
152+
Err(e) => {
153+
state = FinalState::Error(e);
154+
break;
155+
}
156+
}
157+
}
158+
}
159+
}
160+
161+
match state {
162+
FinalState::Pending => Poll::Pending,
163+
FinalState::AllDone => {
164+
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
165+
let result = iter_pin_mut(elems.as_mut())
166+
.map(|e| e.take_done().unwrap())
167+
.collect();
168+
Poll::Ready(result)
169+
},
170+
FinalState::Error(e) => {
171+
let _ = mem::replace(&mut self.elems, Box::pin([]));
172+
Poll::Ready(Err(e))
173+
},
174+
}
175+
}
176+
}
177+
178+
impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> {
179+
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
180+
try_join_all(iter)
181+
}
182+
}

futures/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ pub mod future {
206206
AndThen, ErrInto, FlattenSink, IntoFuture, MapErr, MapOk, OrElse,
207207
UnwrapOrElse,
208208
TryJoin, TryJoin3, TryJoin4, TryJoin5,
209+
try_join_all, TryJoinAll,
209210
};
210211
}
211212

0 commit comments

Comments
 (0)