1
- use std:: {
2
- sync:: { Arc , Mutex } ,
3
- time:: Instant ,
4
- } ;
5
-
6
- use anyhow:: { Context , Result } ;
1
+ use anyhow:: Result ;
7
2
use clap:: Parser ;
8
- use iroh_net:: {
9
- endpoint:: { self , Connection } ,
10
- Endpoint , NodeAddr ,
11
- } ;
12
- use tokio:: sync:: Semaphore ;
13
- use tracing:: { info, trace} ;
14
-
15
- use iroh_net_bench:: {
16
- configure_tracing_subscriber, connect_client, drain_stream, rt, send_data_on_stream,
17
- server_endpoint,
18
- stats:: { Stats , TransferResult } ,
19
- Opt ,
20
- } ;
3
+
4
+ use iroh_net_bench:: { configure_tracing_subscriber, iroh, quinn, rt, s2n, Commands , Opt } ;
21
5
22
6
fn main ( ) {
23
- let opt = Opt :: parse ( ) ;
7
+ let cmd = Commands :: parse ( ) ;
24
8
configure_tracing_subscriber ( ) ;
25
9
10
+ match cmd {
11
+ Commands :: Iroh ( opt) => {
12
+ if let Err ( e) = run_iroh ( opt) {
13
+ eprintln ! ( "failed: {e:#}" ) ;
14
+ }
15
+ }
16
+ Commands :: Quinn ( opt) => {
17
+ if let Err ( e) = run_quinn ( opt) {
18
+ eprintln ! ( "failed: {e:#}" ) ;
19
+ }
20
+ }
21
+ Commands :: S2n ( opt) => {
22
+ if let Err ( e) = run_s2n ( opt) {
23
+ eprintln ! ( "failed: {e:#}" ) ;
24
+ }
25
+ }
26
+ }
27
+ }
28
+
29
+ pub fn run_iroh ( opt : Opt ) -> Result < ( ) > {
26
30
let server_span = tracing:: error_span!( "server" ) ;
27
31
let runtime = rt ( ) ;
28
32
let ( server_addr, endpoint) = {
29
33
let _guard = server_span. enter ( ) ;
30
- server_endpoint ( & runtime, & opt)
34
+ iroh :: server_endpoint ( & runtime, & opt)
31
35
} ;
32
36
33
37
let server_thread = std:: thread:: spawn ( move || {
34
38
let _guard = server_span. entered ( ) ;
35
- if let Err ( e) = runtime. block_on ( server ( endpoint, opt) ) {
39
+ if let Err ( e) = runtime. block_on ( iroh :: server ( endpoint, opt) ) {
36
40
eprintln ! ( "server failed: {e:#}" ) ;
37
41
}
38
42
} ) ;
@@ -43,7 +47,7 @@ fn main() {
43
47
handles. push ( std:: thread:: spawn ( move || {
44
48
let _guard = tracing:: error_span!( "client" , id) . entered ( ) ;
45
49
let runtime = rt ( ) ;
46
- match runtime. block_on ( client ( server_addr, opt) ) {
50
+ match runtime. block_on ( iroh :: client ( server_addr, opt) ) {
47
51
Ok ( stats) => Ok ( stats) ,
48
52
Err ( e) => {
49
53
eprintln ! ( "client failed: {e:#}" ) ;
@@ -62,153 +66,53 @@ fn main() {
62
66
}
63
67
64
68
server_thread. join ( ) . expect ( "server thread" ) ;
65
- }
66
-
67
- async fn server ( endpoint : Endpoint , opt : Opt ) -> Result < ( ) > {
68
- let mut server_tasks = Vec :: new ( ) ;
69
-
70
- // Handle only the expected amount of clients
71
- for _ in 0 ..opt. clients {
72
- let handshake = endpoint. accept ( ) . await . unwrap ( ) ;
73
- let connection = handshake. await . context ( "handshake failed" ) ?;
74
-
75
- server_tasks. push ( tokio:: spawn ( async move {
76
- loop {
77
- let ( mut send_stream, mut recv_stream) = match connection. accept_bi ( ) . await {
78
- Err ( endpoint:: ConnectionError :: ApplicationClosed ( _) ) => break ,
79
- Err ( e) => {
80
- eprintln ! ( "accepting stream failed: {e:?}" ) ;
81
- break ;
82
- }
83
- Ok ( stream) => stream,
84
- } ;
85
- trace ! ( "stream established" ) ;
86
-
87
- tokio:: spawn ( async move {
88
- drain_stream ( & mut recv_stream, opt. read_unordered ) . await ?;
89
- send_data_on_stream ( & mut send_stream, opt. download_size ) . await ?;
90
- Ok :: < _ , anyhow:: Error > ( ( ) )
91
- } ) ;
92
- }
93
-
94
- if opt. stats {
95
- println ! ( "\n Server connection stats:\n {:#?}" , connection. stats( ) ) ;
96
- }
97
- } ) ) ;
98
- }
99
-
100
- // Await all the tasks. We have to do this to prevent the runtime getting dropped
101
- // and all server tasks to be cancelled
102
- for handle in server_tasks {
103
- if let Err ( e) = handle. await {
104
- eprintln ! ( "Server task error: {e:?}" ) ;
105
- } ;
106
- }
107
69
108
70
Ok ( ( ) )
109
71
}
110
72
111
- async fn client ( server_addr : NodeAddr , opt : Opt ) -> Result < ClientStats > {
112
- let ( endpoint, connection) = connect_client ( server_addr, opt) . await ?;
113
-
114
- let start = Instant :: now ( ) ;
115
-
116
- let connection = Arc :: new ( connection) ;
117
-
118
- let mut stats = ClientStats :: default ( ) ;
119
- let mut first_error = None ;
120
-
121
- let sem = Arc :: new ( Semaphore :: new ( opt. max_streams ) ) ;
122
- let results = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
123
- for _ in 0 ..opt. streams {
124
- let permit = sem. clone ( ) . acquire_owned ( ) . await . unwrap ( ) ;
125
- let results = results. clone ( ) ;
126
- let connection = connection. clone ( ) ;
127
- tokio:: spawn ( async move {
128
- let result =
129
- handle_client_stream ( connection, opt. upload_size , opt. read_unordered ) . await ;
130
- info ! ( "stream finished: {:?}" , result) ;
131
- results. lock ( ) . unwrap ( ) . push ( result) ;
132
- drop ( permit) ;
133
- } ) ;
134
- }
73
+ pub fn run_quinn ( opt : Opt ) -> Result < ( ) > {
74
+ let server_span = tracing:: error_span!( "server" ) ;
75
+ let runtime = rt ( ) ;
76
+ let ( server_addr, endpoint) = {
77
+ let _guard = server_span. enter ( ) ;
78
+ quinn:: server_endpoint ( & runtime, & opt)
79
+ } ;
135
80
136
- // Wait for remaining streams to finish
137
- let _ = sem. acquire_many ( opt. max_streams as u32 ) . await . unwrap ( ) ;
81
+ let server_thread = std:: thread:: spawn ( move || {
82
+ let _guard = server_span. entered ( ) ;
83
+ if let Err ( e) = runtime. block_on ( quinn:: server ( endpoint, opt) ) {
84
+ eprintln ! ( "server failed: {e:#}" ) ;
85
+ }
86
+ } ) ;
138
87
139
- for result in results. lock ( ) . unwrap ( ) . drain ( ..) {
140
- match result {
141
- Ok ( ( upload_result, download_result) ) => {
142
- stats. upload_stats . stream_finished ( upload_result) ;
143
- stats. download_stats . stream_finished ( download_result) ;
144
- }
145
- Err ( e) => {
146
- if first_error. is_none ( ) {
147
- first_error = Some ( e) ;
88
+ let mut handles = Vec :: new ( ) ;
89
+ for id in 0 ..opt. clients {
90
+ handles. push ( std:: thread:: spawn ( move || {
91
+ let _guard = tracing:: error_span!( "client" , id) . entered ( ) ;
92
+ let runtime = rt ( ) ;
93
+ match runtime. block_on ( quinn:: client ( server_addr, opt) ) {
94
+ Ok ( stats) => Ok ( stats) ,
95
+ Err ( e) => {
96
+ eprintln ! ( "client failed: {e:#}" ) ;
97
+ Err ( e)
148
98
}
149
99
}
150
- }
151
- }
152
-
153
- stats. upload_stats . total_duration = start. elapsed ( ) ;
154
- stats. download_stats . total_duration = start. elapsed ( ) ;
155
-
156
- // Explicit close of the connection, since handles can still be around due
157
- // to `Arc`ing them
158
- connection. close ( 0u32 . into ( ) , b"Benchmark done" ) ;
159
-
160
- endpoint. close ( 0u32 . into ( ) , b"" ) . await ?;
161
-
162
- if opt. stats {
163
- println ! ( "\n Client connection stats:\n {:#?}" , connection. stats( ) ) ;
100
+ } ) ) ;
164
101
}
165
102
166
- match first_error {
167
- None => Ok ( stats) ,
168
- Some ( e) => Err ( e) ,
103
+ for ( id, handle) in handles. into_iter ( ) . enumerate ( ) {
104
+ // We print all stats at the end of the test sequentially to avoid
105
+ // them being garbled due to being printed concurrently
106
+ if let Ok ( stats) = handle. join ( ) . expect ( "client thread" ) {
107
+ stats. print ( id) ;
108
+ }
169
109
}
170
- }
171
-
172
- async fn handle_client_stream (
173
- connection : Arc < Connection > ,
174
- upload_size : u64 ,
175
- read_unordered : bool ,
176
- ) -> Result < ( TransferResult , TransferResult ) > {
177
- let start = Instant :: now ( ) ;
178
-
179
- let ( mut send_stream, mut recv_stream) = connection
180
- . open_bi ( )
181
- . await
182
- . context ( "failed to open stream" ) ?;
183
-
184
- send_data_on_stream ( & mut send_stream, upload_size) . await ?;
185
-
186
- let upload_result = TransferResult :: new ( start. elapsed ( ) , upload_size) ;
187
-
188
- let start = Instant :: now ( ) ;
189
- let size = drain_stream ( & mut recv_stream, read_unordered) . await ?;
190
- let download_result = TransferResult :: new ( start. elapsed ( ) , size as u64 ) ;
191
110
192
- Ok ( ( upload_result, download_result) )
193
- }
111
+ server_thread. join ( ) . expect ( "server thread" ) ;
194
112
195
- #[ derive( Default ) ]
196
- struct ClientStats {
197
- upload_stats : Stats ,
198
- download_stats : Stats ,
113
+ Ok ( ( ) )
199
114
}
200
115
201
- impl ClientStats {
202
- pub fn print ( & self , client_id : usize ) {
203
- println ! ( ) ;
204
- println ! ( "Client {client_id} stats:" ) ;
205
-
206
- if self . upload_stats . total_size != 0 {
207
- self . upload_stats . print ( "upload" ) ;
208
- }
209
-
210
- if self . download_stats . total_size != 0 {
211
- self . download_stats . print ( "download" ) ;
212
- }
213
- }
116
+ pub fn run_s2n ( _opt : s2n:: Opt ) -> Result < ( ) > {
117
+ unimplemented ! ( )
214
118
}
0 commit comments