Skip to content

Commit 489ec5f

Browse files
committed
io_uring: fix: flatten threads for try_join! macro
The try join was not working as supposed to: "Waits on multiple concurrent branches, returning when all branches complete with Ok(_) or on the first Err(_)." But if a single thread return with Err, try_join was still running. The original code was from this example: https://fasterthanli.me/articles/remote-development-with-rust-on-fly-io?ref=dailydev#a-wonderful-tcp-proxy-with-tokio-uring But the threads was not flattened like an official tokio docs is saying: https://docs.rs/tokio/latest/tokio/macro.try_join.html#examples (second example: "Using try_join! with spawned tasks.") This commit is fixing this problem.
1 parent 0b7fc79 commit 489ec5f

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

src/io_uring.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
66
use std::sync::Arc;
77
use std::time::{Duration, Instant};
88
use tokio::sync::Notify;
9+
use tokio::task::JoinHandle;
910
use tokio::time::{sleep, timeout};
1011
use tokio_uring::buf::BoundedBuf;
1112
use tokio_uring::buf::BoundedBufMut;
@@ -175,6 +176,14 @@ async fn transfer_monitor(
175176
}
176177
}
177178

179+
async fn flatten<T>(handle: &mut JoinHandle<Result<T>>) -> Result<T> {
180+
match handle.await {
181+
Ok(Ok(result)) => Ok(result),
182+
Ok(Err(err)) => Err(err),
183+
Err(_) => Err("handling failed".into()),
184+
}
185+
}
186+
178187
pub async fn io_loop(
179188
stats_interval: Option<Duration>,
180189
need_restart: Arc<Notify>,
@@ -251,7 +260,11 @@ pub async fn io_loop(
251260
let mut monitor = tokio::spawn(transfer_monitor(stats_interval, file_bytes, stream_bytes));
252261

253262
// Stop as soon as one of them errors
254-
let res = tokio::try_join!(&mut from_file, &mut from_stream, &mut monitor);
263+
let res = tokio::try_join!(
264+
flatten(&mut from_file),
265+
flatten(&mut from_stream),
266+
flatten(&mut monitor)
267+
);
255268
if let Err(e) = res {
256269
error!("{} Connection error: {}", NAME, e);
257270
}

0 commit comments

Comments
 (0)