Skip to content

Commit 6aeca26

Browse files
committed
Repeat submit on iopoll without sqpoll (tokio-rs#296)
In iopoll without sqpoll mode, we need to submit repeatedly until we get completion. This patch adds some future that keep calling wake() for parking thread so runtime will submit on parking thread. Signed-off-by: Sidong Yang <sidong.yang@furiosa.ai>
1 parent a69d4bf commit 6aeca26

File tree

4 files changed

+55
-4
lines changed

4 files changed

+55
-4
lines changed

src/runtime/driver/handle.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,32 @@ pub(crate) struct WeakHandle {
3434
inner: Weak<RefCell<Driver>>,
3535
}
3636

37+
struct ThreadParker;
38+
impl std::future::Future for ThreadParker {
39+
type Output = ();
40+
fn poll(
41+
self: std::pin::Pin<&mut Self>,
42+
ctx: &mut std::task::Context<'_>,
43+
) -> std::task::Poll<<Self as std::future::Future>::Output> {
44+
ctx.waker().clone().wake();
45+
std::task::Poll::Pending
46+
}
47+
}
48+
3749
impl Handle {
38-
pub(crate) fn new(b: &crate::Builder) -> io::Result<Self> {
50+
pub(crate) fn new(
51+
b: &crate::Builder,
52+
tokio_rt: &tokio::runtime::Runtime,
53+
local: &tokio::task::LocalSet,
54+
) -> io::Result<Self> {
55+
let driver = Driver::new(b)?;
56+
let params = driver.uring.params();
57+
if params.is_setup_iopoll() && !params.is_setup_sqpoll() {
58+
let _guard = tokio_rt.enter();
59+
local.spawn_local(ThreadParker {});
60+
}
3961
Ok(Self {
40-
inner: Rc::new(RefCell::new(Driver::new(b)?)),
62+
inner: Rc::new(RefCell::new(driver)),
4163
})
4264
}
4365

src/runtime/driver/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub(crate) struct Driver {
1919
ops: Ops,
2020

2121
/// IoUring bindings
22-
uring: IoUring,
22+
pub(crate) uring: IoUring,
2323

2424
/// Reference to the currently registered buffers.
2525
/// Ensures that the buffers are not dropped until
@@ -40,6 +40,8 @@ impl Driver {
4040
pub(crate) fn new(b: &crate::Builder) -> io::Result<Driver> {
4141
let uring = b.urb.build(b.entries)?;
4242

43+
if uring.params().is_setup_iopoll() && !uring.params().is_setup_sqpoll() {}
44+
4345
Ok(Driver {
4446
ops: Ops::new(),
4547
uring,

src/runtime/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl Runtime {
8181

8282
let tokio_rt = ManuallyDrop::new(rt);
8383
let local = ManuallyDrop::new(LocalSet::new());
84-
let driver = driver::Handle::new(b)?;
84+
let driver = driver::Handle::new(b, &tokio_rt, &local)?;
8585

8686
start_uring_wakes_task(&tokio_rt, &local, driver.clone());
8787

tests/fs_file.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,33 @@ fn basic_fallocate() {
315315
});
316316
}
317317

318+
#[test]
319+
fn iopoll_without_sqpoll() {
320+
use std::os::unix::fs::OpenOptionsExt;
321+
let mut builder = tokio_uring::builder();
322+
builder.uring_builder(&tokio_uring::uring_builder().setup_iopoll());
323+
let runtime = tokio_uring::Runtime::new(&builder).unwrap();
324+
let tmp = tempfile();
325+
runtime.block_on(async {
326+
let file = std::fs::OpenOptions::new()
327+
.write(true)
328+
.custom_flags(libc::O_DIRECT)
329+
.open(tmp.path())
330+
.unwrap();
331+
let file = tokio_uring::fs::File::from_std(file);
332+
333+
let layout = std::alloc::Layout::from_size_align(512, 512).unwrap();
334+
let buf = unsafe {
335+
let raw = std::alloc::alloc(layout);
336+
std::ptr::copy("asdf".as_ptr(), raw, 4);
337+
std::slice::from_raw_parts(raw, 512)
338+
};
339+
340+
let res = file.write_at(buf, 0).submit().await.0.unwrap();
341+
assert_eq!(res, 512);
342+
});
343+
}
344+
318345
fn tempfile() -> NamedTempFile {
319346
NamedTempFile::new().unwrap()
320347
}

0 commit comments

Comments
 (0)