1
+ #![ feature( async_await) ]
1
2
#![ feature( test) ]
2
3
#![ deny( warnings) ]
3
4
4
- extern crate futures;
5
- extern crate hyper;
6
- extern crate pretty_env_logger;
7
5
extern crate test;
8
- extern crate tokio;
9
6
10
7
use std:: net:: SocketAddr ;
11
8
12
- use futures :: { Future , Stream } ;
9
+ use futures_util :: future :: join_all ;
13
10
use tokio:: runtime:: current_thread:: Runtime ;
14
11
15
12
use hyper:: { Body , Method , Request , Response , Server } ;
@@ -176,11 +173,12 @@ impl Opts {
176
173
177
174
fn bench ( self , b : & mut test:: Bencher ) {
178
175
let _ = pretty_env_logger:: try_init ( ) ;
176
+ // Create a runtime of current thread.
179
177
let mut rt = Runtime :: new ( ) . unwrap ( ) ;
180
178
181
179
b. bytes = self . response_body . len ( ) as u64 + self . request_body . map ( |b| b. len ( ) ) . unwrap_or ( 0 ) as u64 ;
182
180
183
- let addr = spawn_hello ( & mut rt, & self ) ;
181
+ let addr = spawn_server ( & mut rt, & self ) ;
184
182
185
183
let connector = HttpConnector :: new ( 1 ) ;
186
184
let client = hyper:: Client :: builder ( )
@@ -198,61 +196,60 @@ impl Opts {
198
196
. unwrap_or_else ( || Body :: empty ( ) ) ;
199
197
let mut req = Request :: new ( body) ;
200
198
* req. method_mut ( ) = self . request_method . clone ( ) ;
199
+ * req. uri_mut ( ) = url. clone ( ) ;
201
200
req
202
201
} ;
203
202
203
+ let send_request = |req : Request < Body > | {
204
+ let fut = client. request ( req) ;
205
+ async {
206
+ let res = fut. await . expect ( "client wait" ) ;
207
+ let mut body = res. into_body ( ) ;
208
+ while let Some ( _chunk) = body. next ( ) . await { }
209
+ }
210
+ } ;
211
+
204
212
if self . parallel_cnt == 1 {
205
- b. iter ( move || {
206
- let mut req = make_request ( ) ;
207
- * req. uri_mut ( ) = url. clone ( ) ;
208
- rt. block_on ( client. request ( req) . and_then ( |res| {
209
- res. into_body ( ) . for_each ( |_chunk| {
210
- Ok ( ( ) )
211
- } )
212
- } ) ) . expect ( "client wait" ) ;
213
+ b. iter ( || {
214
+ let req = make_request ( ) ;
215
+ rt. block_on ( send_request ( req) ) ;
213
216
} ) ;
217
+
214
218
} else {
215
219
b. iter ( || {
216
220
let futs = ( 0 ..self . parallel_cnt )
217
- . into_iter ( )
218
221
. map ( |_| {
219
- let mut req = make_request ( ) ;
220
- * req. uri_mut ( ) = url. clone ( ) ;
221
- client. request ( req) . and_then ( |res| {
222
- res. into_body ( ) . for_each ( |_chunk| {
223
- Ok ( ( ) )
224
- } )
225
- } ) . map_err ( |e| panic ! ( "client error: {}" , e) )
222
+ let req = make_request ( ) ;
223
+ send_request ( req)
226
224
} ) ;
227
- let _ = rt. block_on ( :: futures:: future:: join_all ( futs) ) ;
225
+ // Await all spawned futures becoming completed.
226
+ rt. block_on ( join_all ( futs) ) ;
228
227
} ) ;
229
228
}
230
229
}
231
230
}
232
231
233
- fn spawn_hello ( rt : & mut Runtime , opts : & Opts ) -> SocketAddr {
234
- use hyper:: service:: { service_fn} ;
232
+ fn spawn_server ( rt : & mut Runtime , opts : & Opts ) -> SocketAddr {
233
+ use hyper:: service:: { make_service_fn , service_fn} ;
235
234
let addr = "127.0.0.1:0" . parse ( ) . unwrap ( ) ;
236
235
237
236
let body = opts. response_body ;
238
237
let srv = Server :: bind ( & addr)
239
238
. http2_only ( opts. http2 )
240
239
. http2_initial_stream_window_size_ ( opts. http2_stream_window )
241
240
. http2_initial_connection_window_size_ ( opts. http2_conn_window )
242
- . serve ( move || {
243
- service_fn ( move |req : Request < Body > | {
244
- req
245
- . into_body ( )
246
- . for_each ( |_chunk| {
247
- Ok ( ( ) )
248
- } )
249
- . map ( move |_| {
250
- Response :: new ( Body :: from ( body) )
251
- } )
252
- } )
253
- } ) ;
241
+ . serve ( make_service_fn ( move |_| async move {
242
+ Ok :: < _ , hyper:: Error > ( service_fn ( move |req : Request < Body > | async move {
243
+ let mut req_body = req. into_body ( ) ;
244
+ while let Some ( _chunk) = req_body. next ( ) . await { }
245
+ Ok :: < _ , hyper:: Error > ( Response :: new ( Body :: from ( body) ) )
246
+ } ) )
247
+ } ) ) ;
254
248
let addr = srv. local_addr ( ) ;
255
- let fut = srv. map_err ( |err| panic ! ( "server error: {}" , err) ) ;
256
- rt. spawn ( fut) ;
249
+ rt. spawn ( async {
250
+ if let Err ( err) = srv. await {
251
+ panic ! ( "server error: {}" , err) ;
252
+ }
253
+ } ) ;
257
254
return addr
258
255
}
0 commit comments