Skip to content

Allow specifying R/W flags for reads and writes on tokio_uring::fs::File #326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
542 changes: 528 additions & 14 deletions src/fs/file.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,4 @@ mod write_fixed;
mod writev;

mod writev_all;
pub(crate) use writev_all::writev_at_all;
pub(crate) use writev_all::writev_at_all;
3 changes: 2 additions & 1 deletion src/io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub(crate) struct Read<T> {
}

impl<T: BoundedBufMut> Op<Read<T>> {
pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result<Op<Read<T>>> {
pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64, flags: io_uring::types::RwFlags) -> io::Result<Op<Read<T>>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
Expand All @@ -32,6 +32,7 @@ impl<T: BoundedBufMut> Op<Read<T>> {
let len = read.buf.bytes_total();
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.rw_flags(flags)
.build()
},
)
Expand Down
2 changes: 2 additions & 0 deletions src/io/read_fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ where
fd: &SharedFd,
buf: T,
offset: u64,
flags: io_uring::types::RwFlags
) -> io::Result<Op<ReadFixed<T>>> {
use io_uring::{opcode, types};

Expand All @@ -41,6 +42,7 @@ where
let buf_index = read_fixed.buf.get_buf().buf_index();
opcode::ReadFixed::new(types::Fd(fd.raw_fd()), ptr, len as _, buf_index)
.offset(offset as _)
.rw_flags(flags)
.build()
},
)
Expand Down
2 changes: 2 additions & 0 deletions src/io/readv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl<T: BoundedBufMut> Op<Readv<T>> {
fd: &SharedFd,
mut bufs: Vec<T>,
offset: u64,
flags: io_uring::types::RwFlags,
) -> io::Result<Op<Readv<T>>> {
use io_uring::{opcode, types};

Expand Down Expand Up @@ -51,6 +52,7 @@ impl<T: BoundedBufMut> Op<Readv<T>> {
read.iovs.len() as u32,
)
.offset(offset as _)
.rw_flags(flags)
.build()
},
)
Expand Down
10 changes: 5 additions & 5 deletions src/io/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Socket {
}

pub(crate) fn write<T: BoundedBuf>(&self, buf: T) -> UnsubmittedWrite<T> {
UnsubmittedOneshot::write_at(&self.fd, buf, 0)
UnsubmittedOneshot::write_at(&self.fd, buf, 0, 0)
}

pub async fn write_all<T: BoundedBuf>(&self, buf: T) -> crate::BufResult<(), T> {
Expand Down Expand Up @@ -85,7 +85,7 @@ impl Socket {
where
T: BoundedBuf<Buf = FixedBuf>,
{
let op = Op::write_fixed_at(&self.fd, buf, 0).unwrap();
let op = Op::write_fixed_at(&self.fd, buf, 0, 0).unwrap();
op.await
}

Expand Down Expand Up @@ -130,7 +130,7 @@ impl Socket {
}

pub async fn writev<T: BoundedBuf>(&self, buf: Vec<T>) -> crate::BufResult<usize, Vec<T>> {
let op = Op::writev_at(&self.fd, buf, 0).unwrap();
let op = Op::writev_at(&self.fd, buf, 0, 0).unwrap();
op.await
}

Expand Down Expand Up @@ -169,15 +169,15 @@ impl Socket {
}

pub(crate) async fn read<T: BoundedBufMut>(&self, buf: T) -> crate::BufResult<usize, T> {
let op = Op::read_at(&self.fd, buf, 0).unwrap();
let op = Op::read_at(&self.fd, buf, 0, 0).unwrap();
op.await
}

pub(crate) async fn read_fixed<T>(&self, buf: T) -> crate::BufResult<usize, T>
where
T: BoundedBufMut<BufMut = FixedBuf>,
{
let op = Op::read_fixed_at(&self.fd, buf, 0).unwrap();
let op = Op::read_fixed_at(&self.fd, buf, 0, 0).unwrap();
op.await
}

Expand Down
3 changes: 2 additions & 1 deletion src/io/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<T> OneshotOutputTransform for WriteTransform<T> {
}

impl<T: BoundedBuf> UnsubmittedWrite<T> {
pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64) -> Self {
pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64, flags: io_uring::types::RwFlags) -> Self {
use io_uring::{opcode, types};

// Get raw buffer info
Expand All @@ -53,6 +53,7 @@ impl<T: BoundedBuf> UnsubmittedWrite<T> {
},
opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.rw_flags(flags)
.build(),
)
}
Expand Down
2 changes: 2 additions & 0 deletions src/io/write_fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ where
fd: &SharedFd,
buf: T,
offset: u64,
flags: io_uring::types::RwFlags,
) -> io::Result<Op<WriteFixed<T>>> {
use io_uring::{opcode, types};

Expand All @@ -40,6 +41,7 @@ where
let buf_index = write_fixed.buf.get_buf().buf_index();
opcode::WriteFixed::new(types::Fd(fd.raw_fd()), ptr, len as _, buf_index)
.offset(offset as _)
.rw_flags(flags)
.build()
},
)
Expand Down
4 changes: 3 additions & 1 deletion src/io/writev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ pub(crate) struct Writev<T> {
iovs: Vec<iovec>,
}

impl<T: BoundedBuf> Op<Writev<T>> {
impl<T: BoundedBuf> Op<Writev<T>> {
pub(crate) fn writev_at(
fd: &SharedFd,
mut bufs: Vec<T>,
offset: u64,
flags: io_uring::types::RwFlags,
) -> io::Result<Op<Writev<T>>> {
use io_uring::{opcode, types};

Expand All @@ -47,6 +48,7 @@ impl<T: BoundedBuf> Op<Writev<T>> {
write.iovs.len() as u32,
)
.offset(offset as _)
.rw_flags(flags)
.build()
},
)
Expand Down
5 changes: 4 additions & 1 deletion src/io/writev_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub(crate) async fn writev_at_all<T: BoundedBuf>(
fd: &SharedFd,
mut bufs: Vec<T>,
offset: Option<u64>,
flags: io_uring::types::RwFlags,
) -> crate::BufResult<usize, Vec<T>> {
// TODO decide if the function should return immediately if all the buffer lengths
// were to sum to zero. That would save an allocation and one call into writev.
Expand Down Expand Up @@ -50,7 +51,7 @@ pub(crate) async fn writev_at_all<T: BoundedBuf>(
};

// Call the Op that is internal to this module.
let op = Op::writev_at_all2(fd, bufs, iovs, iovs_ptr, iovs_len, o).unwrap();
let op = Op::writev_at_all2(fd, bufs, iovs, iovs_ptr, iovs_len, o, flags).unwrap();
let res;
(res, fd, bufs, iovs) = op.await;

Expand Down Expand Up @@ -125,6 +126,7 @@ impl<T: BoundedBuf> Op<WritevAll<T>> {
iovs_ptr: *const iovec,
iovs_len: u32,
offset: u64,
flags: io_uring::types::RwFlags,
) -> io::Result<Op<WritevAll<T>>> {
use io_uring::{opcode, types};

Expand All @@ -135,6 +137,7 @@ impl<T: BoundedBuf> Op<WritevAll<T>> {
|write| {
opcode::Writev::new(types::Fd(write.fd.raw_fd()), iovs_ptr, iovs_len)
.offset(offset as _)
.rw_flags(flags)
.build()
},
)
Expand Down