@@ -12,20 +12,39 @@ use core::task::{Context, Poll};
12
12
13
13
use super :: { assert_future, MaybeDone } ;
14
14
15
+ #[ cfg( not( futures_no_atomic_cas) ) ]
16
+ use crate :: stream:: { Collect , FuturesOrdered , StreamExt } ;
17
+
15
18
fn iter_pin_mut < T > ( slice : Pin < & mut [ T ] > ) -> impl Iterator < Item = Pin < & mut T > > {
16
19
// Safety: `std` _could_ make this unsound if it were to decide Pin's
17
20
// invariants aren't required to transmit through slices. Otherwise this has
18
21
// the same safety as a normal field pin projection.
19
22
unsafe { slice. get_unchecked_mut ( ) } . iter_mut ( ) . map ( |t| unsafe { Pin :: new_unchecked ( t) } )
20
23
}
21
24
22
- /// Future for the [`join_all`] function.
23
25
#[ must_use = "futures do nothing unless you `.await` or poll them" ]
26
+ /// Future for the [`join_all`] function.
24
27
pub struct JoinAll < F >
25
28
where
26
29
F : Future ,
27
30
{
28
- elems : Pin < Box < [ MaybeDone < F > ] > > ,
31
+ kind : JoinAllKind < F > ,
32
+ }
33
+
34
+ #[ cfg( not( futures_no_atomic_cas) ) ]
35
+ const SMALL : usize = 30 ;
36
+
37
+ pub ( crate ) enum JoinAllKind < F >
38
+ where
39
+ F : Future ,
40
+ {
41
+ Small {
42
+ elems : Pin < Box < [ MaybeDone < F > ] > > ,
43
+ } ,
44
+ #[ cfg( not( futures_no_atomic_cas) ) ]
45
+ Big {
46
+ fut : Collect < FuturesOrdered < F > , Vec < F :: Output > > ,
47
+ } ,
29
48
}
30
49
31
50
impl < F > fmt:: Debug for JoinAll < F >
34
53
F :: Output : fmt:: Debug ,
35
54
{
36
55
fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
37
- f. debug_struct ( "JoinAll" ) . field ( "elems" , & self . elems ) . finish ( )
56
+ match self . kind {
57
+ JoinAllKind :: Small { ref elems } => {
58
+ f. debug_struct ( "JoinAll" ) . field ( "elems" , elems) . finish ( )
59
+ }
60
+ #[ cfg( not( futures_no_atomic_cas) ) ]
61
+ JoinAllKind :: Big { ref fut, .. } => fmt:: Debug :: fmt ( fut, f) ,
62
+ }
38
63
}
39
64
}
40
65
50
75
///
51
76
/// # See Also
52
77
///
53
- /// This is purposefully a very simple API for basic use-cases. In a lot of
54
- /// cases you will want to use the more powerful
55
- /// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
56
- /// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
78
+ /// `join_all` will switch to the more powerful [`FuturesOrdered`] for performance
79
+ /// reasons if the number of futures is large. You may want to look into using it or
80
+ /// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
57
81
///
58
82
/// Some examples for additional functionality provided by these are:
59
83
///
@@ -75,13 +99,33 @@ where
75
99
/// assert_eq!(join_all(futures).await, [1, 2, 3]);
76
100
/// # });
77
101
/// ```
78
- pub fn join_all < I > ( i : I ) -> JoinAll < I :: Item >
102
+ pub fn join_all < I > ( iter : I ) -> JoinAll < I :: Item >
79
103
where
80
104
I : IntoIterator ,
81
105
I :: Item : Future ,
82
106
{
83
- let elems: Box < [ _ ] > = i. into_iter ( ) . map ( MaybeDone :: Future ) . collect ( ) ;
84
- assert_future :: < Vec < <I :: Item as Future >:: Output > , _ > ( JoinAll { elems : elems. into ( ) } )
107
+ #[ cfg( futures_no_atomic_cas) ]
108
+ {
109
+ let elems = iter. into_iter ( ) . map ( MaybeDone :: Future ) . collect :: < Box < [ _ ] > > ( ) . into ( ) ;
110
+ let kind = JoinAllKind :: Small { elems } ;
111
+ assert_future :: < Vec < <I :: Item as Future >:: Output > , _ > ( JoinAll { kind } )
112
+ }
113
+ #[ cfg( not( futures_no_atomic_cas) ) ]
114
+ {
115
+ let iter = iter. into_iter ( ) ;
116
+ let kind = match iter. size_hint ( ) . 1 {
117
+ None => JoinAllKind :: Big { fut : iter. collect :: < FuturesOrdered < _ > > ( ) . collect ( ) } ,
118
+ Some ( max) => {
119
+ if max <= SMALL {
120
+ let elems = iter. map ( MaybeDone :: Future ) . collect :: < Box < [ _ ] > > ( ) . into ( ) ;
121
+ JoinAllKind :: Small { elems }
122
+ } else {
123
+ JoinAllKind :: Big { fut : iter. collect :: < FuturesOrdered < _ > > ( ) . collect ( ) }
124
+ }
125
+ }
126
+ } ;
127
+ assert_future :: < Vec < <I :: Item as Future >:: Output > , _ > ( JoinAll { kind } )
128
+ }
85
129
}
86
130
87
131
impl < F > Future for JoinAll < F >
@@ -91,20 +135,27 @@ where
91
135
type Output = Vec < F :: Output > ;
92
136
93
137
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
94
- let mut all_done = true ;
138
+ match & mut self . kind {
139
+ JoinAllKind :: Small { elems } => {
140
+ let mut all_done = true ;
95
141
96
- for elem in iter_pin_mut ( self . elems . as_mut ( ) ) {
97
- if elem. poll ( cx) . is_pending ( ) {
98
- all_done = false ;
99
- }
100
- }
142
+ for elem in iter_pin_mut ( elems. as_mut ( ) ) {
143
+ if elem. poll ( cx) . is_pending ( ) {
144
+ all_done = false ;
145
+ }
146
+ }
101
147
102
- if all_done {
103
- let mut elems = mem:: replace ( & mut self . elems , Box :: pin ( [ ] ) ) ;
104
- let result = iter_pin_mut ( elems. as_mut ( ) ) . map ( |e| e. take_output ( ) . unwrap ( ) ) . collect ( ) ;
105
- Poll :: Ready ( result)
106
- } else {
107
- Poll :: Pending
148
+ if all_done {
149
+ let mut elems = mem:: replace ( elems, Box :: pin ( [ ] ) ) ;
150
+ let result =
151
+ iter_pin_mut ( elems. as_mut ( ) ) . map ( |e| e. take_output ( ) . unwrap ( ) ) . collect ( ) ;
152
+ Poll :: Ready ( result)
153
+ } else {
154
+ Poll :: Pending
155
+ }
156
+ }
157
+ #[ cfg( not( futures_no_atomic_cas) ) ]
158
+ JoinAllKind :: Big { fut } => Pin :: new ( fut) . poll ( cx) ,
108
159
}
109
160
}
110
161
}
0 commit comments