Skip to content

Commit 8f1bc5d

Browse files
committed
fix: repeat submit on iopoll without sqpoll
tokio-rs#297 Fixes: tokio-rs#296
2 parents af928fe + 6aeca26 commit 8f1bc5d

File tree

4 files changed

+55
-4
lines changed

4 files changed

+55
-4
lines changed

src/runtime/driver/handle.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,32 @@ pub(crate) struct WeakHandle {
3434
inner: Weak<RefCell<Driver>>,
3535
}
3636

37+
struct ThreadParker;
38+
impl std::future::Future for ThreadParker {
39+
type Output = ();
40+
fn poll(
41+
self: std::pin::Pin<&mut Self>,
42+
ctx: &mut std::task::Context<'_>,
43+
) -> std::task::Poll<<Self as std::future::Future>::Output> {
44+
ctx.waker().clone().wake();
45+
std::task::Poll::Pending
46+
}
47+
}
48+
3749
impl Handle {
38-
pub(crate) fn new(b: &crate::Builder) -> io::Result<Self> {
50+
pub(crate) fn new(
51+
b: &crate::Builder,
52+
tokio_rt: &tokio::runtime::Runtime,
53+
local: &tokio::task::LocalSet,
54+
) -> io::Result<Self> {
55+
let driver = Driver::new(b)?;
56+
let params = driver.uring.params();
57+
if params.is_setup_iopoll() && !params.is_setup_sqpoll() {
58+
let _guard = tokio_rt.enter();
59+
local.spawn_local(ThreadParker {});
60+
}
3961
Ok(Self {
40-
inner: Rc::new(RefCell::new(Driver::new(b)?)),
62+
inner: Rc::new(RefCell::new(driver)),
4163
})
4264
}
4365

src/runtime/driver/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub(crate) struct Driver {
1919
ops: Ops,
2020

2121
/// IoUring bindings
22-
uring: IoUring,
22+
pub(crate) uring: IoUring,
2323

2424
/// Reference to the currently registered buffers.
2525
/// Ensures that the buffers are not dropped until
@@ -40,6 +40,8 @@ impl Driver {
4040
pub(crate) fn new(b: &crate::Builder) -> io::Result<Driver> {
4141
let uring = b.urb.build(b.entries)?;
4242

43+
if uring.params().is_setup_iopoll() && !uring.params().is_setup_sqpoll() {}
44+
4345
Ok(Driver {
4446
ops: Ops::new(),
4547
uring,

src/runtime/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl Runtime {
8181

8282
let tokio_rt = ManuallyDrop::new(rt);
8383
let local = ManuallyDrop::new(LocalSet::new());
84-
let driver = driver::Handle::new(b)?;
84+
let driver = driver::Handle::new(b, &tokio_rt, &local)?;
8585

8686
start_uring_wakes_task(&tokio_rt, &local, driver.clone());
8787

tests/fs_file.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,33 @@ fn basic_fallocate() {
315315
});
316316
}
317317

318+
#[test]
319+
fn iopoll_without_sqpoll() {
320+
use std::os::unix::fs::OpenOptionsExt;
321+
let mut builder = tokio_uring::builder();
322+
builder.uring_builder(&tokio_uring::uring_builder().setup_iopoll());
323+
let runtime = tokio_uring::Runtime::new(&builder).unwrap();
324+
let tmp = tempfile();
325+
runtime.block_on(async {
326+
let file = std::fs::OpenOptions::new()
327+
.write(true)
328+
.custom_flags(libc::O_DIRECT)
329+
.open(tmp.path())
330+
.unwrap();
331+
let file = tokio_uring::fs::File::from_std(file);
332+
333+
let layout = std::alloc::Layout::from_size_align(512, 512).unwrap();
334+
let buf = unsafe {
335+
let raw = std::alloc::alloc(layout);
336+
std::ptr::copy("asdf".as_ptr(), raw, 4);
337+
std::slice::from_raw_parts(raw, 512)
338+
};
339+
340+
let res = file.write_at(buf, 0).submit().await.0.unwrap();
341+
assert_eq!(res, 512);
342+
});
343+
}
344+
318345
fn tempfile() -> NamedTempFile {
319346
NamedTempFile::new().unwrap()
320347
}

0 commit comments

Comments
 (0)