Skip to content

Commit e0a719c

Browse files
committed
io_uring: transfer_monitor implementation
1 parent ec11a3c commit e0a719c

File tree

1 file changed

+50
-5
lines changed

1 file changed

+50
-5
lines changed

src/io_uring.rs

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
66
use std::sync::Arc;
77
use std::time::{Duration, Instant};
88
use tokio::sync::Notify;
9-
use tokio::time::timeout;
9+
use tokio::time::{sleep, timeout};
1010
use tokio_uring::buf::BoundedBuf;
1111
use tokio_uring::buf::BoundedBufMut;
1212
use tokio_uring::fs::File;
@@ -131,12 +131,57 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
131131

132132
async fn transfer_monitor(
133133
stats_interval: Option<Duration>,
134-
_: Arc<AtomicUsize>,
135-
_: Arc<AtomicUsize>,
134+
usb_bytes_written: Arc<AtomicUsize>,
135+
tcp_bytes_written: Arc<AtomicUsize>,
136136
) -> Result<(), std::io::Error> {
137-
//TODO: implementation
137+
let mut usb_bytes_out_last: usize = 0;
138+
let mut tcp_bytes_out_last: usize = 0;
139+
let mut report_time = Instant::now();
140+
141+
loop {
142+
// Stats printing
143+
if stats_interval.is_some() && report_time.elapsed() > stats_interval.unwrap() {
144+
// load current total transfer from AtomicUsize:
145+
let usb_bytes_out = usb_bytes_written.load(Ordering::Relaxed);
146+
let tcp_bytes_out = tcp_bytes_written.load(Ordering::Relaxed);
147+
148+
// compute USB transfer
149+
usb_bytes_out_last = usb_bytes_out - usb_bytes_out_last;
150+
let usb_transferred_total = ByteSize::b(usb_bytes_out.try_into().unwrap());
151+
let usb_transferred_last = ByteSize::b(usb_bytes_out_last.try_into().unwrap());
152+
let usb_speed: u64 =
153+
(usb_bytes_out_last as f64 / report_time.elapsed().as_secs_f64()).round() as u64;
154+
let usb_speed = ByteSize::b(usb_speed);
155+
156+
// compute TCP transfer
157+
tcp_bytes_out_last = tcp_bytes_out - tcp_bytes_out_last;
158+
let tcp_transferred_total = ByteSize::b(tcp_bytes_out.try_into().unwrap());
159+
let tcp_transferred_last = ByteSize::b(tcp_bytes_out_last.try_into().unwrap());
160+
let tcp_speed: u64 =
161+
(tcp_bytes_out_last as f64 / report_time.elapsed().as_secs_f64()).round() as u64;
162+
let tcp_speed = ByteSize::b(tcp_speed);
163+
164+
info!(
165+
"{} {} {: >9} ({: >9}/s), {: >9} total | {} {: >9} ({: >9}/s), {: >9} total",
166+
NAME,
167+
"phone -> car 🔺",
168+
usb_transferred_last.to_string_as(true),
169+
usb_speed.to_string_as(true),
170+
usb_transferred_total.to_string_as(true),
171+
"car -> phone 🔻",
172+
tcp_transferred_last.to_string_as(true),
173+
tcp_speed.to_string_as(true),
174+
tcp_transferred_total.to_string_as(true),
175+
);
138176

139-
Ok(())
177+
// save values for next iteration
178+
report_time = Instant::now();
179+
usb_bytes_out_last = usb_bytes_out;
180+
tcp_bytes_out_last = tcp_bytes_out;
181+
}
182+
183+
sleep(Duration::from_millis(100)).await;
184+
}
140185
}
141186

142187
pub async fn io_loop(

0 commit comments

Comments
 (0)