Skip to content

Commit 25d6bd7

Browse files
committed
io_uring: use generic function and remove old functions
1 parent a7bd58a commit 25d6bd7

File tree

1 file changed

+6
-136
lines changed

1 file changed

+6
-136
lines changed

src/io_uring.rs

Lines changed: 6 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -122,140 +122,6 @@ async fn copy<A: Endpoint<A>, B: Endpoint<B>>(
122122
}
123123
}
124124

125-
async fn copy_file_to_stream(
126-
from: Rc<tokio_uring::fs::File>,
127-
to: Rc<tokio_uring::net::TcpStream>,
128-
stats_interval: Option<Duration>,
129-
) -> Result<(), std::io::Error> {
130-
// For statistics
131-
let mut bytes_out: usize = 0;
132-
let mut bytes_out_last: usize = 0;
133-
let mut report_time = Instant::now();
134-
135-
let mut buf = vec![0u8; BUFFER_LEN];
136-
loop {
137-
// Handle stats printing
138-
if stats_interval.is_some() && report_time.elapsed() > stats_interval.unwrap() {
139-
let transferred_total = ByteSize::b(bytes_out.try_into().unwrap());
140-
let transferred_last = ByteSize::b(bytes_out_last.try_into().unwrap());
141-
142-
let speed: u64 =
143-
(bytes_out_last as f64 / report_time.elapsed().as_secs_f64()).round() as u64;
144-
let speed = ByteSize::b(speed);
145-
146-
info!(
147-
"{} 📲 car to phone transfer: {:#} ({:#}/s), {:#} total",
148-
NAME,
149-
transferred_last.to_string_as(true),
150-
speed.to_string_as(true),
151-
transferred_total.to_string_as(true),
152-
);
153-
154-
report_time = Instant::now();
155-
bytes_out_last = 0;
156-
}
157-
158-
// things look weird: we pass ownership of the buffer to `read`, and we get
159-
// it back, _even if there was an error_. There's a whole trait for that,
160-
// which `Vec<u8>` implements!
161-
debug!("USB: before read");
162-
let retval = from.read_at(buf, 0);
163-
let (res, buf_read) = timeout(READ_TIMEOUT, retval).await?;
164-
// Propagate errors, see how many bytes we read
165-
let n = res?;
166-
debug!("USB: after read, {} bytes", n);
167-
if n == 0 {
168-
// A read of size zero signals EOF (end of file), finish gracefully
169-
return Ok(());
170-
}
171-
172-
// The `slice` method here is implemented in an extension trait: it
173-
// returns an owned slice of our `Vec<u8>`, which we later turn back
174-
// into the full `Vec<u8>`
175-
debug!("USB: before write");
176-
let (res, buf_write) = to.write(buf_read.slice(..n)).submit().await;
177-
let n = res?;
178-
debug!("USB: after write, {} bytes", n);
179-
// Increment byte counters for statistics
180-
if stats_interval.is_some() {
181-
bytes_out += n;
182-
bytes_out_last += n;
183-
}
184-
185-
// Later is now, we want our full buffer back.
186-
// That's why we declared our binding `mut` way back at the start of `copy`,
187-
// even though we moved it into the very first `TcpStream::read` call.
188-
buf = buf_write.into_inner();
189-
}
190-
}
191-
192-
async fn copy_stream_to_file(
193-
from: Rc<tokio_uring::net::TcpStream>,
194-
to: Rc<tokio_uring::fs::File>,
195-
stats_interval: Option<Duration>,
196-
) -> Result<(), std::io::Error> {
197-
// For statistics
198-
let mut bytes_out: usize = 0;
199-
let mut bytes_out_last: usize = 0;
200-
let mut report_time = Instant::now();
201-
202-
let mut buf = vec![0u8; BUFFER_LEN];
203-
loop {
204-
// Handle stats printing
205-
if stats_interval.is_some() && report_time.elapsed() > stats_interval.unwrap() {
206-
let transferred_total = ByteSize::b(bytes_out.try_into().unwrap());
207-
let transferred_last = ByteSize::b(bytes_out_last.try_into().unwrap());
208-
209-
let speed: u64 =
210-
(bytes_out_last as f64 / report_time.elapsed().as_secs_f64()).round() as u64;
211-
let speed = ByteSize::b(speed);
212-
213-
info!(
214-
"{} 📱 phone to car transfer: {:#} ({:#}/s), {:#} total",
215-
NAME,
216-
transferred_last.to_string_as(true),
217-
speed.to_string_as(true),
218-
transferred_total.to_string_as(true),
219-
);
220-
221-
report_time = Instant::now();
222-
bytes_out_last = 0;
223-
}
224-
225-
// things look weird: we pass ownership of the buffer to `read`, and we get
226-
// it back, _even if there was an error_. There's a whole trait for that,
227-
// which `Vec<u8>` implements!
228-
debug!("TCP: before read");
229-
let retval = from.read(buf);
230-
let (res, buf_read) = timeout(READ_TIMEOUT, retval).await?;
231-
// Propagate errors, see how many bytes we read
232-
let n = res?;
233-
debug!("TCP: after read, {} bytes", n);
234-
if n == 0 {
235-
// A read of size zero signals EOF (end of file), finish gracefully
236-
return Ok(());
237-
}
238-
239-
// The `slice` method here is implemented in an extension trait: it
240-
// returns an owned slice of our `Vec<u8>`, which we later turn back
241-
// into the full `Vec<u8>`
242-
debug!("TCP: before write");
243-
let (res, buf_write) = to.write_at(buf_read.slice(..n), 0).submit().await;
244-
let n = res?;
245-
debug!("TCP: after write, {} bytes", n);
246-
// Increment byte counters for statistics
247-
if stats_interval.is_some() {
248-
bytes_out += n;
249-
bytes_out_last += n;
250-
}
251-
252-
// Later is now, we want our full buffer back.
253-
// That's why we declared our binding `mut` way back at the start of `copy`,
254-
// even though we moved it into the very first `TcpStream::read` call.
255-
buf = buf_write.into_inner();
256-
}
257-
}
258-
259125
pub async fn io_loop(
260126
stats_interval: Option<Duration>,
261127
need_restart: Arc<Notify>,
@@ -317,14 +183,18 @@ pub async fn io_loop(
317183
let stream = Rc::new(stream);
318184

319185
// We need to copy in both directions...
320-
let mut from_file = tokio_uring::spawn(copy_file_to_stream(
186+
let mut from_file = tokio_uring::spawn(copy(
321187
file.clone(),
322188
stream.clone(),
189+
"USB",
190+
"📲 car to phone",
323191
stats_interval,
324192
));
325-
let mut from_stream = tokio_uring::spawn(copy_stream_to_file(
193+
let mut from_stream = tokio_uring::spawn(copy(
326194
stream.clone(),
327195
file.clone(),
196+
"TCP",
197+
"📱 phone to car",
328198
stats_interval,
329199
));
330200

0 commit comments

Comments
 (0)