Skip to content

Commit 8d0cca3

Browse files
committed
perf(net): remove udp and rewrite tcp as echo bench
1 parent 51fcf25 commit 8d0cca3

File tree

1 file changed

+89
-111
lines changed

1 file changed

+89
-111
lines changed

compio/benches/net.rs

Lines changed: 89 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -1,129 +1,107 @@
1-
use criterion::{criterion_group, criterion_main, Criterion};
1+
use std::{net::Ipv4Addr, rc::Rc, time::Instant};
22

3-
criterion_group!(net, tcp, udp);
3+
use criterion::{criterion_group, criterion_main, Bencher, Criterion, Throughput};
4+
use rand::{thread_rng, RngCore};
5+
6+
criterion_group!(net, echo);
47
criterion_main!(net);
58

6-
fn tcp(c: &mut Criterion) {
7-
const PACKET_LEN: usize = 1048576;
8-
static PACKET: &[u8] = &[1u8; PACKET_LEN];
9+
const BUFFER_SIZE: usize = 4096;
10+
const BUFFER_COUNT: usize = 1024;
911

10-
let mut group = c.benchmark_group("tcp");
12+
fn echo_tokio(b: &mut Bencher, content: &[u8; BUFFER_SIZE]) {
13+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
1114

12-
group.bench_function("tokio", |b| {
13-
let runtime = tokio::runtime::Builder::new_current_thread()
14-
.enable_all()
15-
.build()
15+
let runtime = tokio::runtime::Builder::new_current_thread()
16+
.enable_all()
17+
.build()
18+
.unwrap();
19+
b.to_async(&runtime).iter_custom(|iter| async move {
20+
let listener = tokio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
21+
.await
1622
.unwrap();
17-
b.to_async(&runtime).iter(|| async {
18-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
19-
20-
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
21-
let addr = listener.local_addr().unwrap();
22-
let tx = tokio::net::TcpStream::connect(addr);
23-
let rx = listener.accept();
24-
let (mut tx, (mut rx, _)) = tokio::try_join!(tx, rx).unwrap();
25-
tx.write_all(PACKET).await.unwrap();
26-
let mut buffer = Vec::with_capacity(PACKET_LEN);
27-
while buffer.len() < PACKET_LEN {
28-
rx.read_buf(&mut buffer).await.unwrap();
29-
}
30-
buffer
31-
})
32-
});
23+
let addr = listener.local_addr().unwrap();
24+
25+
let start = Instant::now();
26+
for _i in 0..iter {
27+
let (mut tx, (mut rx, _)) =
28+
futures_util::try_join!(tokio::net::TcpStream::connect(addr), listener.accept())
29+
.unwrap();
30+
31+
let client = async move {
32+
let mut buffer = [0u8; BUFFER_SIZE];
33+
for _i in 0..BUFFER_COUNT {
34+
tx.write_all(content).await.unwrap();
35+
tx.read_exact(&mut buffer).await.unwrap();
36+
}
37+
};
38+
let server = async move {
39+
let mut buffer = [0u8; BUFFER_SIZE];
40+
for _i in 0..BUFFER_COUNT {
41+
rx.read_exact(&mut buffer).await.unwrap();
42+
rx.write_all(&buffer).await.unwrap();
43+
}
44+
};
45+
futures_util::join!(client, server);
46+
}
47+
start.elapsed()
48+
})
49+
}
3350

34-
group.bench_function("compio", |b| {
35-
let runtime = compio::runtime::Runtime::new().unwrap();
36-
b.to_async(&runtime).iter(|| async {
37-
use compio::io::{AsyncReadExt, AsyncWriteExt};
51+
fn echo_compio(b: &mut Bencher, content: Rc<[u8; BUFFER_SIZE]>) {
52+
use compio_io::{AsyncReadExt, AsyncWriteExt};
3853

39-
let listener = compio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
54+
let runtime = compio::runtime::Runtime::new().unwrap();
55+
b.to_async(&runtime).iter_custom(|iter| {
56+
let content = content.clone();
57+
async move {
58+
let listener = compio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, 0))
59+
.await
60+
.unwrap();
4061
let addr = listener.local_addr().unwrap();
41-
let tx = compio::net::TcpStream::connect(addr);
42-
let rx = listener.accept();
43-
let (mut tx, (mut rx, _)) = futures_util::try_join!(tx, rx).unwrap();
44-
tx.write_all(PACKET).await.0.unwrap();
45-
let buffer = Vec::with_capacity(PACKET_LEN);
46-
let (_, buffer) = rx.read_exact(buffer).await.unwrap();
47-
buffer
48-
})
49-
});
5062

51-
group.finish();
63+
let start = Instant::now();
64+
for _i in 0..iter {
65+
let (mut tx, (mut rx, _)) = futures_util::try_join!(
66+
compio::net::TcpStream::connect(addr),
67+
listener.accept()
68+
)
69+
.unwrap();
70+
71+
let client = async {
72+
let mut content = content.clone();
73+
let mut buffer = Box::new([0u8; BUFFER_SIZE]);
74+
for _i in 0..BUFFER_COUNT {
75+
(_, content) = tx.write_all(content).await.unwrap();
76+
(_, buffer) = tx.read_exact(buffer).await.unwrap();
77+
}
78+
};
79+
let server = async move {
80+
let mut buffer = Box::new([0u8; BUFFER_SIZE]);
81+
for _i in 0..BUFFER_COUNT {
82+
(_, buffer) = rx.read_exact(buffer).await.unwrap();
83+
(_, buffer) = rx.write_all(buffer).await.unwrap();
84+
}
85+
};
86+
futures_util::join!(client, server);
87+
}
88+
start.elapsed()
89+
}
90+
})
5291
}
5392

54-
fn udp(c: &mut Criterion) {
55-
const PACKET_LEN: usize = 1024;
56-
static PACKET: &[u8] = &[1u8; PACKET_LEN];
93+
fn echo(c: &mut Criterion) {
94+
let mut rng = thread_rng();
5795

58-
let mut group = c.benchmark_group("udp");
96+
let mut content = [0u8; BUFFER_SIZE];
97+
rng.fill_bytes(&mut content);
98+
let content = Rc::new(content);
5999

60-
// The socket may be dropped by firewall when the number is too large.
61-
#[cfg(target_os = "linux")]
62-
group
63-
.sample_size(16)
64-
.measurement_time(std::time::Duration::from_millis(2))
65-
.warm_up_time(std::time::Duration::from_millis(2));
100+
let mut group = c.benchmark_group("echo");
101+
group.throughput(Throughput::Bytes((BUFFER_SIZE * BUFFER_COUNT * 2) as u64));
66102

67-
group.bench_function("tokio", |b| {
68-
let runtime = tokio::runtime::Builder::new_current_thread()
69-
.enable_all()
70-
.build()
71-
.unwrap();
72-
b.to_async(&runtime).iter(|| async {
73-
let rx = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
74-
let addr_rx = rx.local_addr().unwrap();
75-
let tx = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
76-
let addr_tx = tx.local_addr().unwrap();
77-
78-
rx.connect(addr_tx).await.unwrap();
79-
tx.connect(addr_rx).await.unwrap();
80-
81-
{
82-
let mut pos = 0;
83-
while pos < PACKET_LEN {
84-
let res = tx.send(&PACKET[pos..]).await;
85-
pos += res.unwrap();
86-
}
87-
}
88-
{
89-
let mut buffer = vec![0; PACKET_LEN];
90-
let mut pos = 0;
91-
while pos < PACKET_LEN {
92-
let res = rx.recv(&mut buffer[pos..]).await;
93-
pos += res.unwrap();
94-
}
95-
buffer
96-
}
97-
})
98-
});
99-
100-
group.bench_function("compio", |b| {
101-
let runtime = compio::runtime::Runtime::new().unwrap();
102-
b.to_async(&runtime).iter(|| async {
103-
let rx = compio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
104-
let addr_rx = rx.local_addr().unwrap();
105-
let tx = compio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
106-
let addr_tx = tx.local_addr().unwrap();
107-
108-
rx.connect(addr_tx).await.unwrap();
109-
tx.connect(addr_rx).await.unwrap();
110-
111-
{
112-
let mut pos = 0;
113-
while pos < PACKET_LEN {
114-
let (res, _) = tx.send(&PACKET[pos..]).await.unwrap();
115-
pos += res;
116-
}
117-
}
118-
{
119-
let mut buffer = Vec::with_capacity(PACKET_LEN);
120-
while buffer.len() < PACKET_LEN {
121-
(_, buffer) = rx.recv(buffer).await.unwrap();
122-
}
123-
buffer
124-
}
125-
})
126-
});
103+
group.bench_function("tokio", |b| echo_tokio(b, &content));
104+
group.bench_function("compio", |b| echo_compio(b, content.clone()));
127105

128106
group.finish();
129107
}

0 commit comments

Comments
 (0)