Skip to content

Commit f97ffd9

Browse files
committed
perf(net): move named pipe bench and add unix bench
1 parent ca9b2dd commit f97ffd9

File tree

3 files changed

+163
-105
lines changed

3 files changed

+163
-105
lines changed

compio/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,3 @@ harness = false
159159
[[bench]]
160160
name = "net"
161161
harness = false
162-
163-
[[bench]]
164-
name = "named_pipe"
165-
harness = false

compio/benches/named_pipe.rs

Lines changed: 0 additions & 74 deletions
This file was deleted.

compio/benches/net.rs

Lines changed: 163 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,24 @@ criterion_main!(net);
1414
const BUFFER_SIZE: usize = 4096;
1515
const BUFFER_COUNT: usize = 1024;
1616

17-
fn echo_tokio(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
18-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
17+
fn echo_tokio_tcp(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
18+
use tokio::{
19+
io::{AsyncReadExt, AsyncWriteExt},
20+
net::{TcpListener, TcpStream},
21+
};
1922

2023
let runtime = tokio::runtime::Builder::new_current_thread()
2124
.enable_all()
2225
.build()
2326
.unwrap();
2427
b.to_async(&runtime).iter_custom(|iter| async move {
25-
let listener = tokio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
26-
.await
27-
.unwrap();
28+
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
2829
let addr = listener.local_addr().unwrap();
2930

3031
let start = Instant::now();
3132
for _i in 0..iter {
3233
let (mut tx, (mut rx, _)) =
33-
tokio::try_join!(tokio::net::TcpStream::connect(addr), listener.accept()).unwrap();
34+
tokio::try_join!(TcpStream::connect(addr), listener.accept()).unwrap();
3435

3536
let client = async move {
3637
let mut buffer = [0u8; BUFFER_SIZE];
@@ -52,25 +53,23 @@ fn echo_tokio(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
5253
})
5354
}
5455

55-
fn echo_compio(b: &mut Bencher, content: Rc<[u8; BUFFER_SIZE]>) {
56-
use compio_io::{AsyncReadExt, AsyncWriteExt};
56+
fn echo_compio_tcp(b: &mut Bencher, content: Rc<[u8; BUFFER_SIZE]>) {
57+
use compio::{
58+
io::{AsyncReadExt, AsyncWriteExt},
59+
net::{TcpListener, TcpStream},
60+
};
5761

5862
let runtime = compio::runtime::Runtime::new().unwrap();
5963
b.to_async(&runtime).iter_custom(|iter| {
6064
let content = content.clone();
6165
async move {
62-
let listener = compio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
63-
.await
64-
.unwrap();
66+
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).await.unwrap();
6567
let addr = listener.local_addr().unwrap();
6668

6769
let start = Instant::now();
6870
for _i in 0..iter {
69-
let (mut tx, (mut rx, _)) = futures_util::try_join!(
70-
compio::net::TcpStream::connect(addr),
71-
listener.accept()
72-
)
73-
.unwrap();
71+
let (mut tx, (mut rx, _)) =
72+
futures_util::try_join!(TcpStream::connect(addr), listener.accept()).unwrap();
7473

7574
let client = async {
7675
let mut content = content.clone();
@@ -95,23 +94,23 @@ fn echo_compio(b: &mut Bencher, content: Rc<[u8; BUFFER_SIZE]>) {
9594
}
9695

9796
#[cfg(target_os = "linux")]
98-
fn echo_monoio(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
99-
use monoio::io::{AsyncReadRentExt, AsyncWriteRentExt};
97+
fn echo_monoio_tcp(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
98+
use monoio::{
99+
io::{AsyncReadRentExt, AsyncWriteRentExt},
100+
net::{TcpListener, TcpStream},
101+
};
100102

101103
let runtime = MonoioRuntime::new();
102104
b.to_async(&runtime).iter_custom(|iter| {
103105
let content = Box::new(*content);
104106
async move {
105-
let listener = monoio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
107+
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
106108
let addr = listener.local_addr().unwrap();
107109

108110
let start = Instant::now();
109111
for _i in 0..iter {
110-
let (mut tx, (mut rx, _)) = futures_util::try_join!(
111-
monoio::net::TcpStream::connect(addr),
112-
listener.accept()
113-
)
114-
.unwrap();
112+
let (mut tx, (mut rx, _)) =
113+
futures_util::try_join!(TcpStream::connect(addr), listener.accept()).unwrap();
115114

116115
let client = async {
117116
let mut content = content.clone();
@@ -141,6 +140,134 @@ fn echo_monoio(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
141140
})
142141
}
143142

143+
#[cfg(windows)]
144+
fn echo_tokio_win_named_pipe(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
145+
use tokio::{
146+
io::{AsyncReadExt, AsyncWriteExt},
147+
net::windows::named_pipe::{ClientOptions, ServerOptions},
148+
};
149+
150+
let runtime = tokio::runtime::Builder::new_current_thread()
151+
.enable_all()
152+
.build()
153+
.unwrap();
154+
b.to_async(&runtime).iter_custom(|iter| async move {
155+
const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe";
156+
157+
let start = Instant::now();
158+
for _i in 0..iter {
159+
let mut rx = ServerOptions::new().create(PIPE_NAME).unwrap();
160+
let mut tx = ClientOptions::new().open(PIPE_NAME).unwrap();
161+
162+
rx.connect().await.unwrap();
163+
164+
let client = async {
165+
let mut buffer = [0u8; BUFFER_SIZE];
166+
for _i in 0..BUFFER_COUNT {
167+
tx.write_all(content).await.unwrap();
168+
tx.read_exact(&mut buffer).await.unwrap();
169+
}
170+
};
171+
let server = async {
172+
let mut buffer = [0u8; BUFFER_SIZE];
173+
for _i in 0..BUFFER_COUNT {
174+
rx.read_exact(&mut buffer).await.unwrap();
175+
rx.write_all(&buffer).await.unwrap();
176+
}
177+
};
178+
tokio::join!(client, server);
179+
}
180+
start.elapsed()
181+
})
182+
}
183+
184+
#[cfg(windows)]
185+
fn echo_compio_win_named_pipe(b: &mut Bencher, content: Rc<[u8; BUFFER_SIZE]>) {
186+
use compio::{
187+
fs::named_pipe::{ClientOptions, ServerOptions},
188+
io::{AsyncReadExt, AsyncWriteExt},
189+
};
190+
191+
let runtime = compio::runtime::Runtime::new().unwrap();
192+
b.to_async(&runtime).iter_custom(|iter| {
193+
let content = content.clone();
194+
async move {
195+
const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe";
196+
197+
let start = Instant::now();
198+
let options = ClientOptions::new();
199+
for _i in 0..iter {
200+
let mut rx = ServerOptions::new().create(PIPE_NAME).unwrap();
201+
let (mut tx, ()) =
202+
futures_util::try_join!(options.open(PIPE_NAME), rx.connect()).unwrap();
203+
204+
let client = async {
205+
let mut content = content.clone();
206+
let mut buffer = Box::new([0u8; BUFFER_SIZE]);
207+
for _i in 0..BUFFER_COUNT {
208+
(_, content) = tx.write_all(content).await.unwrap();
209+
(_, buffer) = tx.read_exact(buffer).await.unwrap();
210+
}
211+
};
212+
let server = async {
213+
let mut buffer = Box::new([0u8; BUFFER_SIZE]);
214+
for _i in 0..BUFFER_COUNT {
215+
(_, buffer) = rx.read_exact(buffer).await.unwrap();
216+
(_, buffer) = rx.write_all(buffer).await.unwrap();
217+
}
218+
};
219+
tokio::join!(client, server);
220+
}
221+
start.elapsed()
222+
}
223+
})
224+
}
225+
226+
fn echo_compio_unix(b: &mut Bencher, content: Rc<[u8; BUFFER_SIZE]>) {
227+
use compio::{
228+
io::{AsyncReadExt, AsyncWriteExt},
229+
net::{UnixListener, UnixStream},
230+
};
231+
232+
let runtime = compio::runtime::Runtime::new().unwrap();
233+
b.to_async(&runtime).iter_custom(|iter| {
234+
let content = content.clone();
235+
async move {
236+
let dir = tempfile::Builder::new()
237+
.prefix("compio-uds")
238+
.tempdir()
239+
.unwrap();
240+
let sock_path = dir.path().join("connect.sock");
241+
let listener = UnixListener::bind(&sock_path).await.unwrap();
242+
243+
let start = Instant::now();
244+
for _i in 0..iter {
245+
let (mut tx, (mut rx, _)) =
246+
futures_util::try_join!(UnixStream::connect(&sock_path), listener.accept())
247+
.unwrap();
248+
249+
let client = async {
250+
let mut content = content.clone();
251+
let mut buffer = Box::new([0u8; BUFFER_SIZE]);
252+
for _i in 0..BUFFER_COUNT {
253+
(_, content) = tx.write_all(content).await.unwrap();
254+
(_, buffer) = tx.read_exact(buffer).await.unwrap();
255+
}
256+
};
257+
let server = async move {
258+
let mut buffer = Box::new([0u8; BUFFER_SIZE]);
259+
for _i in 0..BUFFER_COUNT {
260+
(_, buffer) = rx.read_exact(buffer).await.unwrap();
261+
(_, buffer) = rx.write_all(buffer).await.unwrap();
262+
}
263+
};
264+
futures_util::join!(client, server);
265+
}
266+
start.elapsed()
267+
}
268+
})
269+
}
270+
144271
fn echo(c: &mut Criterion) {
145272
let mut rng = thread_rng();
146273

@@ -151,10 +278,19 @@ fn echo(c: &mut Criterion) {
151278
let mut group = c.benchmark_group("echo");
152279
group.throughput(Throughput::Bytes((BUFFER_SIZE * BUFFER_COUNT * 2) as u64));
153280

154-
group.bench_function("tokio", |b| echo_tokio(b, &content));
155-
group.bench_function("compio", |b| echo_compio(b, content.clone()));
281+
group.bench_function("tokio-tcp", |b| echo_tokio_tcp(b, &content));
282+
group.bench_function("compio-tcp", |b| echo_compio_tcp(b, content.clone()));
156283
#[cfg(target_os = "linux")]
157-
group.bench_function("monoio", |b| echo_monoio(b, &content));
284+
group.bench_function("monoio-tcp", |b| echo_monoio_tcp(b, &content));
285+
286+
#[cfg(windows)]
287+
group.bench_function("tokio-pipe", |b| echo_tokio_win_named_pipe(b, &content));
288+
#[cfg(windows)]
289+
group.bench_function("compio-pipe", |b| {
290+
echo_compio_win_named_pipe(b, content.clone())
291+
});
292+
293+
group.bench_function("compio-unix", |b| echo_compio_unix(b, content.clone()));
158294

159295
group.finish();
160296
}

0 commit comments

Comments
 (0)