diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..2c47010a 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,7 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; -use crate::{UnsubmittedOneshot, UnsubmittedWrite}; +use crate::{UnsubmittedOneshot, UnsubmittedRead, UnsubmittedWrite}; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -177,9 +177,23 @@ impl File { /// } /// ``` pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { - // Submit the read operation - let op = Op::read_at(&self.fd, buf, pos).unwrap(); - op.await + UnsubmittedOneshot::read_at(&self.fd, buf, pos) + .submit() + .await + } + + /// Read some bytes at the specified offset from the file into the specified + /// buffer, returning how many bytes were read. + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub async fn unsubmitted_read_at( + &self, + buf: T, + pos: u64, + ) -> UnsubmittedRead { + UnsubmittedOneshot::read_at(&self.fd, buf, pos) } /// Read some bytes at the specified offset from the file into the specified diff --git a/src/io/mod.rs b/src/io/mod.rs index 1afcef22..ce834687 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -15,7 +15,7 @@ pub(crate) use noop::NoOp; mod open; -mod read; +pub(crate) mod read; mod read_fixed; diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..65331d54 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,64 +1,63 @@ +use io_uring::cqueue::Entry; + use crate::buf::BoundedBufMut; use crate::io::SharedFd; -use crate::BufResult; +use crate::{BufResult, OneshotOutputTransform, UnsubmittedOneshot}; -use crate::runtime::driver::op::{Completable, CqeResult, Op}; -use crate::runtime::CONTEXT; use std::io; +use std::marker::PhantomData; + +/// An unsubmitted read operation. +pub type UnsubmittedRead = UnsubmittedOneshot, ReadTransform>; -pub(crate) struct Read { +#[allow(missing_docs)] +pub struct ReadData { /// Holds a strong ref to the FD, preventing the file from being closed /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, + _fd: SharedFd, - /// Reference to the in-flight buffer. - pub(crate) buf: T, + buf: T, } -impl Op> { - pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { - use io_uring::{opcode, types}; - - CONTEXT.with(|x| { - x.handle().expect("Not in a runtime context").submit_op( - Read { - fd: fd.clone(), - buf, - }, - |read| { - // Get raw buffer info - let ptr = read.buf.stable_mut_ptr(); - let len = read.buf.bytes_total(); - opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build() - }, - ) - }) - } +#[allow(missing_docs)] +pub struct ReadTransform { + _phantom: PhantomData, } -impl Completable for Read -where - T: BoundedBufMut, -{ +impl OneshotOutputTransform for ReadTransform { type Output = BufResult; + type StoredData = ReadData; - fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let mut buf = self.buf; + fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { + let res = if cqe.result() >= 0 { + Ok(cqe.result() as usize) + } else { + Err(io::Error::from_raw_os_error(-cqe.result())) + }; - // If the operation was successful, advance the initialized cursor. - if let Ok(n) = res { - // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { - buf.set_init(n); - } - } + (res, data.buf) + } +} + +impl UnsubmittedRead { + pub(crate) fn read_at(fd: &SharedFd, mut buf: T, offset: u64) -> Self { + use io_uring::{opcode, types}; - (res, buf) + // Get raw buffer info + let ptr = buf.stable_mut_ptr(); + let len = buf.bytes_total(); + + Self::new( + ReadData { + _fd: fd.clone(), + buf, + }, + ReadTransform { + _phantom: PhantomData, + }, + opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) + .offset(offset as _) + .build(), + ) } } diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..13f43ffa 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -1,5 +1,6 @@ use crate::io::write::UnsubmittedWrite; use crate::runtime::driver::op::Op; +use crate::UnsubmittedRead; use crate::{ buf::fixed::FixedBuf, buf::{BoundedBuf, BoundedBufMut, IoBuf, Slice}, @@ -168,9 +169,8 @@ impl Socket { op.await } - pub(crate) async fn read(&self, buf: T) -> crate::BufResult { - let op = Op::read_at(&self.fd, buf, 0).unwrap(); - op.await + pub(crate) fn read(&self, buf: T) -> UnsubmittedRead { + UnsubmittedOneshot::read_at(&self.fd, buf, 0) } pub(crate) async fn read_fixed(&self, buf: T) -> crate::BufResult diff --git a/src/lib.rs b/src/lib.rs index 819eebf7..cf35a608 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,9 @@ pub mod buf; pub mod fs; pub mod net; +pub use io::read::*; pub use io::write::*; + pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot}; pub use runtime::spawn; pub use runtime::Runtime; diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 2450dcb9..fe3fcbae 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -5,10 +5,9 @@ use std::{ }; use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut}, io::{SharedFd, Socket}, - UnsubmittedWrite, + UnsubmittedRead, UnsubmittedWrite, }; /// A TCP stream between a local and a remote socket. @@ -75,7 +74,16 @@ impl TcpStream { /// /// Returns the original buffer and quantity of data read. pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await + self.inner.read(buf).submit().await + } + + /// Read some data from the stream + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub fn unsubmitted_read(&self, buf: T) -> UnsubmittedRead { + self.inner.read(buf) } /// Read some data from the stream into a registered buffer. diff --git a/src/net/udp.rs b/src/net/udp.rs index cb0cef66..3f513f02 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,8 +1,7 @@ use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut}, io::{SharedFd, Socket}, - UnsubmittedWrite, + UnsubmittedRead, UnsubmittedWrite, }; use socket2::SockAddr; use std::{ @@ -317,7 +316,16 @@ impl UdpSocket { /// /// Returns the original buffer and quantity of data read. pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await + self.inner.read(buf).submit().await + } + + /// Read some data from the stream + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub fn unsubmitted_read(&self, buf: T) -> UnsubmittedRead { + self.inner.read(buf) } /// Receives a single datagram message into a registered buffer. diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index 40e7ddc5..a82b5a99 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -1,8 +1,7 @@ use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut}, io::{SharedFd, Socket}, - UnsubmittedWrite, + UnsubmittedRead, UnsubmittedWrite, }; use socket2::SockAddr; use std::{ @@ -76,7 +75,16 @@ impl UnixStream { /// Read some data from the stream into the buffer, returning the original buffer and /// quantity of data read. pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await + self.inner.read(buf).submit().await + } + + /// Read some data from the stream + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub fn unsubmitted_read(&self, buf: T) -> UnsubmittedRead { + self.inner.read(buf) } /// Like [`read`], but using a pre-mapped buffer