Skip to content

Commit ec11a3c

Browse files
committed
io_uring: introduce transfer monitoring thread
1 parent 230c87d commit ec11a3c

File tree

1 file changed

+23
-1
lines changed

1 file changed

+23
-1
lines changed

src/io_uring.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::TCP_SERVER_PORT;
22
use bytesize::ByteSize;
33
use simplelog::*;
44
use std::rc::Rc;
5+
use std::sync::atomic::{AtomicUsize, Ordering};
56
use std::sync::Arc;
67
use std::time::{Duration, Instant};
78
use tokio::sync::Notify;
@@ -62,6 +63,7 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
6263
dbg_name: &'static str,
6364
direction: &'static str,
6465
stats_interval: Option<Duration>,
66+
bytes_written: Arc<AtomicUsize>,
6567
) -> Result<(), std::io::Error> {
6668
// For statistics
6769
let mut bytes_out: usize = 0;
@@ -115,6 +117,7 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
115117
debug!("{}: after write, {} bytes", dbg_name, n);
116118
// Increment byte counters for statistics
117119
if stats_interval.is_some() {
120+
bytes_written.fetch_add(n, Ordering::Relaxed);
118121
bytes_out += n;
119122
bytes_out_last += n;
120123
}
@@ -126,6 +129,16 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
126129
}
127130
}
128131

132+
async fn transfer_monitor(
133+
stats_interval: Option<Duration>,
134+
_: Arc<AtomicUsize>,
135+
_: Arc<AtomicUsize>,
136+
) -> Result<(), std::io::Error> {
137+
//TODO: implementation
138+
139+
Ok(())
140+
}
141+
129142
pub async fn io_loop(
130143
stats_interval: Option<Duration>,
131144
need_restart: Arc<Notify>,
@@ -180,7 +193,9 @@ pub async fn io_loop(
180193
// tokio-uring runtime is single-threaded, we can use `Rc` instead of
181194
// `Arc`.
182195
let file = Rc::new(usb);
196+
let file_bytes = Arc::new(AtomicUsize::new(0));
183197
let stream = Rc::new(stream);
198+
let stream_bytes = Arc::new(AtomicUsize::new(0));
184199

185200
// We need to copy in both directions...
186201
let mut from_file = tokio_uring::spawn(copy(
@@ -189,17 +204,23 @@ pub async fn io_loop(
189204
"USB",
190205
"📲 car to phone",
191206
stats_interval,
207+
stream_bytes.clone(),
192208
));
193209
let mut from_stream = tokio_uring::spawn(copy(
194210
stream.clone(),
195211
file.clone(),
196212
"TCP",
197213
"📱 phone to car",
198214
stats_interval,
215+
file_bytes.clone(),
199216
));
200217

218+
// Thread for monitoring transfer
219+
let mut monitor =
220+
tokio_uring::spawn(transfer_monitor(stats_interval, file_bytes, stream_bytes));
221+
201222
// Stop as soon as one of them errors
202-
let res = tokio::try_join!(&mut from_file, &mut from_stream);
223+
let res = tokio::try_join!(&mut from_file, &mut from_stream, &mut monitor);
203224
if let Err(e) = res {
204225
error!("{} Connection error: {}", NAME, e);
205226
}
@@ -208,6 +229,7 @@ pub async fn io_loop(
208229
// for each direction)
209230
from_file.abort();
210231
from_stream.abort();
232+
monitor.abort();
211233

212234
// stream(s) closed, notify main loop to restart
213235
need_restart.notify_one();

0 commit comments

Comments
 (0)