1
+ #![ feature( async_await) ]
1
2
#![ feature( test) ]
2
- #![ deny( warnings) ]
3
+ // #![deny(warnings)]
3
4
4
- extern crate futures;
5
- extern crate hyper;
6
- extern crate pretty_env_logger;
7
5
extern crate test;
8
- extern crate tokio;
9
6
10
7
use std:: io:: { Read , Write } ;
11
8
use std:: net:: { TcpListener , TcpStream } ;
12
9
use std:: sync:: mpsc;
10
+ use std:: time:: Duration ;
13
11
14
- use futures:: { stream, Future , Stream } ;
15
- use futures:: sync:: oneshot;
12
+ use futures_util:: { stream, StreamExt } ;
13
+ use tokio:: runtime:: current_thread;
14
+ use tokio:: sync:: oneshot;
16
15
17
16
use hyper:: { Body , Response , Server } ;
18
- use hyper:: service:: service_fn_ok ;
17
+ use hyper:: service:: { make_service_fn , service_fn } ;
19
18
20
19
macro_rules! bench_server {
21
20
( $b: ident, $header: expr, $body: expr) => ( {
22
21
let _ = pretty_env_logger:: try_init( ) ;
23
22
let ( _until_tx, until_rx) = oneshot:: channel:: <( ) >( ) ;
24
23
let addr = {
25
24
let ( addr_tx, addr_rx) = mpsc:: channel( ) ;
26
- :: std:: thread:: spawn( move || {
25
+ std:: thread:: spawn( move || {
27
26
let addr = "127.0.0.1:0" . parse( ) . unwrap( ) ;
27
+ let make_svc = make_service_fn( |_| async {
28
+ Ok :: <_, hyper:: Error >( service_fn( |_| async {
29
+ Ok :: <_, hyper:: Error >( Response :: builder( )
30
+ . header( $header. 0 , $header. 1 )
31
+ . header( "content-type" , "text/plain" )
32
+ . body( $body( ) )
33
+ . unwrap( )
34
+ )
35
+ } ) )
36
+ } ) ;
28
37
let srv = Server :: bind( & addr)
29
- . serve( || {
30
- let header = $header;
31
- let body = $body;
32
- service_fn_ok( move |_| {
33
- Response :: builder( )
34
- . header( header. 0 , header. 1 )
35
- . header( "content-type" , "text/plain" )
36
- . body( body( ) )
37
- . unwrap( )
38
- } )
39
- } ) ;
38
+ . serve( make_svc) ;
39
+
40
40
addr_tx. send( srv. local_addr( ) ) . unwrap( ) ;
41
- let fut = srv
42
- . map_err( |e| panic!( "server error: {}" , e) )
43
- . select( until_rx. then( |_| Ok ( ( ) ) ) )
44
- . then( |_| Ok ( ( ) ) ) ;
45
- let mut rt = tokio:: runtime:: current_thread:: Runtime :: new( ) . unwrap( ) ;
46
- rt. spawn( fut) ;
41
+
42
+ let graceful = srv
43
+ . with_graceful_shutdown( async {
44
+ until_rx. await . ok( ) ;
45
+ } ) ;
46
+ let mut rt = current_thread:: Runtime :: new( ) . unwrap( ) ;
47
+ rt. spawn( async {
48
+ if let Err ( e) = graceful. await {
49
+ panic!( "server error: {}" , e) ;
50
+ }
51
+ } ) ;
47
52
rt. run( ) . unwrap( ) ;
48
53
} ) ;
49
54
@@ -58,7 +63,7 @@ macro_rules! bench_server {
58
63
} ;
59
64
60
65
let mut tcp = TcpStream :: connect( addr) . unwrap( ) ;
61
- tcp. set_read_timeout( Some ( :: std :: time :: Duration :: from_secs( 3 ) ) ) . unwrap( ) ;
66
+ tcp. set_read_timeout( Some ( Duration :: from_secs( 3 ) ) ) . unwrap( ) ;
62
67
let mut buf = [ 0u8 ; 8192 ] ;
63
68
64
69
$b. bytes = 35 + total_bytes as u64 ;
@@ -91,7 +96,7 @@ fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
91
96
fn throughput_fixedsize_many_chunks ( b : & mut test:: Bencher ) {
92
97
bench_server ! ( b, ( "content-length" , "1000000" ) , || {
93
98
static S : & ' static [ & ' static [ u8 ] ] = & [ & [ b'x' ; 1_000 ] as & [ u8 ] ; 1_000 ] as _;
94
- Body :: wrap_stream( stream:: iter_ok :: <_ , String > ( S . iter( ) ) . map( |& s| s ) )
99
+ Body :: wrap_stream( stream:: iter ( S . iter( ) ) . map( |& s| Ok :: <_ , String > ( s ) ) )
95
100
} )
96
101
}
97
102
@@ -109,7 +114,7 @@ fn throughput_chunked_large_payload(b: &mut test::Bencher) {
109
114
fn throughput_chunked_many_chunks ( b : & mut test:: Bencher ) {
110
115
bench_server ! ( b, ( "transfer-encoding" , "chunked" ) , || {
111
116
static S : & ' static [ & ' static [ u8 ] ] = & [ & [ b'x' ; 1_000 ] as & [ u8 ] ; 1_000 ] as _;
112
- Body :: wrap_stream( stream:: iter_ok :: <_ , String > ( S . iter( ) ) . map( |& s| s ) )
117
+ Body :: wrap_stream( stream:: iter ( S . iter( ) ) . map( |& s| Ok :: <_ , String > ( s ) ) )
113
118
} )
114
119
}
115
120
@@ -118,7 +123,7 @@ fn raw_tcp_throughput_small_payload(b: &mut test::Bencher) {
118
123
let ( tx, rx) = mpsc:: channel ( ) ;
119
124
let listener = TcpListener :: bind ( "127.0.0.1:0" ) . unwrap ( ) ;
120
125
let addr = listener. local_addr ( ) . unwrap ( ) ;
121
- :: std:: thread:: spawn ( move || {
126
+ std:: thread:: spawn ( move || {
122
127
let mut sock = listener. accept ( ) . unwrap ( ) . 0 ;
123
128
124
129
let mut buf = [ 0u8 ; 8192 ] ;
@@ -160,7 +165,7 @@ fn raw_tcp_throughput_large_payload(b: &mut test::Bencher) {
160
165
Date: Fri, 12 May 2017 18:21:45 GMT\r \n \
161
166
\r \n \
162
167
";
163
- :: std:: thread:: spawn ( move || {
168
+ std:: thread:: spawn ( move || {
164
169
let mut sock = listener. accept ( ) . unwrap ( ) . 0 ;
165
170
166
171
let mut buf = [ 0u8 ; 8192 ] ;
@@ -190,4 +195,3 @@ fn raw_tcp_throughput_large_payload(b: &mut test::Bencher) {
190
195
} ) ;
191
196
tx. send ( ( ) ) . unwrap ( ) ;
192
197
}
193
-
0 commit comments