From 938e76926f0459eff6e3f840961378b348d88eb5 Mon Sep 17 00:00:00 2001 From: Sidong Yang Date: Fri, 27 Dec 2024 15:30:53 +0900 Subject: [PATCH 1/2] Introduce UnsubmittedRead for unsubmitted_read_at() This patch introduces UnsubmittedRead like read version of UnsubmittedWrite. And replace old version Op::read_at() to UnsubmittedOneshot::read_at() Signed-off-by: Sidong Yang --- src/fs/file.rs | 6 +-- src/io/mod.rs | 2 +- src/io/read.rs | 91 +++++++++++++++++++++--------------------- src/io/socket.rs | 6 +-- src/lib.rs | 2 + src/net/tcp/stream.rs | 5 +-- src/net/udp.rs | 2 +- src/net/unix/stream.rs | 2 +- 8 files changed, 58 insertions(+), 58 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..4a67889f 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -177,9 +177,9 @@ impl File { /// } /// ``` pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { - // Submit the read operation - let op = Op::read_at(&self.fd, buf, pos).unwrap(); - op.await + UnsubmittedOneshot::read_at(&self.fd, buf, pos) + .submit() + .await } /// Read some bytes at the specified offset from the file into the specified diff --git a/src/io/mod.rs b/src/io/mod.rs index 1afcef22..ce834687 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -15,7 +15,7 @@ pub(crate) use noop::NoOp; mod open; -mod read; +pub(crate) mod read; mod read_fixed; diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..65331d54 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,64 +1,63 @@ +use io_uring::cqueue::Entry; + use crate::buf::BoundedBufMut; use crate::io::SharedFd; -use crate::BufResult; +use crate::{BufResult, OneshotOutputTransform, UnsubmittedOneshot}; -use crate::runtime::driver::op::{Completable, CqeResult, Op}; -use crate::runtime::CONTEXT; use std::io; +use std::marker::PhantomData; + +/// An unsubmitted read operation. +pub type UnsubmittedRead = UnsubmittedOneshot, ReadTransform>; -pub(crate) struct Read { +#[allow(missing_docs)] +pub struct ReadData { /// Holds a strong ref to the FD, preventing the file from being closed /// while the operation is in-flight. - #[allow(dead_code)] - fd: SharedFd, + _fd: SharedFd, - /// Reference to the in-flight buffer. - pub(crate) buf: T, + buf: T, } -impl Op> { - pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { - use io_uring::{opcode, types}; - - CONTEXT.with(|x| { - x.handle().expect("Not in a runtime context").submit_op( - Read { - fd: fd.clone(), - buf, - }, - |read| { - // Get raw buffer info - let ptr = read.buf.stable_mut_ptr(); - let len = read.buf.bytes_total(); - opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) - .offset(offset as _) - .build() - }, - ) - }) - } +#[allow(missing_docs)] +pub struct ReadTransform { + _phantom: PhantomData, } -impl Completable for Read -where - T: BoundedBufMut, -{ +impl OneshotOutputTransform for ReadTransform { type Output = BufResult; + type StoredData = ReadData; - fn complete(self, cqe: CqeResult) -> Self::Output { - // Convert the operation result to `usize` - let res = cqe.result.map(|v| v as usize); - // Recover the buffer - let mut buf = self.buf; + fn transform_oneshot_output(self, data: Self::StoredData, cqe: Entry) -> Self::Output { + let res = if cqe.result() >= 0 { + Ok(cqe.result() as usize) + } else { + Err(io::Error::from_raw_os_error(-cqe.result())) + }; - // If the operation was successful, advance the initialized cursor. - if let Ok(n) = res { - // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { - buf.set_init(n); - } - } + (res, data.buf) + } +} + +impl UnsubmittedRead { + pub(crate) fn read_at(fd: &SharedFd, mut buf: T, offset: u64) -> Self { + use io_uring::{opcode, types}; - (res, buf) + // Get raw buffer info + let ptr = buf.stable_mut_ptr(); + let len = buf.bytes_total(); + + Self::new( + ReadData { + _fd: fd.clone(), + buf, + }, + ReadTransform { + _phantom: PhantomData, + }, + opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _) + .offset(offset as _) + .build(), + ) } } diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..13f43ffa 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -1,5 +1,6 @@ use crate::io::write::UnsubmittedWrite; use crate::runtime::driver::op::Op; +use crate::UnsubmittedRead; use crate::{ buf::fixed::FixedBuf, buf::{BoundedBuf, BoundedBufMut, IoBuf, Slice}, @@ -168,9 +169,8 @@ impl Socket { op.await } - pub(crate) async fn read(&self, buf: T) -> crate::BufResult { - let op = Op::read_at(&self.fd, buf, 0).unwrap(); - op.await + pub(crate) fn read(&self, buf: T) -> UnsubmittedRead { + UnsubmittedOneshot::read_at(&self.fd, buf, 0) } pub(crate) async fn read_fixed(&self, buf: T) -> crate::BufResult diff --git a/src/lib.rs b/src/lib.rs index 819eebf7..cf35a608 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,9 @@ pub mod buf; pub mod fs; pub mod net; +pub use io::read::*; pub use io::write::*; + pub use runtime::driver::op::{InFlightOneshot, OneshotOutputTransform, UnsubmittedOneshot}; pub use runtime::spawn; pub use runtime::Runtime; diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 2450dcb9..054dd7c3 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -5,8 +5,7 @@ use std::{ }; use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut}, io::{SharedFd, Socket}, UnsubmittedWrite, }; @@ -75,7 +74,7 @@ impl TcpStream { /// /// Returns the original buffer and quantity of data read. pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await + self.inner.read(buf).submit().await } /// Read some data from the stream into a registered buffer. diff --git a/src/net/udp.rs b/src/net/udp.rs index cb0cef66..3df18fe5 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -317,7 +317,7 @@ impl UdpSocket { /// /// Returns the original buffer and quantity of data read. pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await + self.inner.read(buf).submit().await } /// Receives a single datagram message into a registered buffer. diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index 40e7ddc5..a2e141b7 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -76,7 +76,7 @@ impl UnixStream { /// Read some data from the stream into the buffer, returning the original buffer and /// quantity of data read. pub async fn read(&self, buf: T) -> crate::BufResult { - self.inner.read(buf).await + self.inner.read(buf).submit().await } /// Like [`read`], but using a pre-mapped buffer From 7eb37379ba7cd80fd48ecdcd3b1041485558d4d4 Mon Sep 17 00:00:00 2001 From: Sidong Yang Date: Fri, 27 Dec 2024 17:22:07 +0900 Subject: [PATCH 2/2] Introduce unsubmitted_read[_at]() for file, stream This patch introduce unsubmitted_read[_at]() that returns UnsubmittedRead. This function could be used for modifying operations before submit like setting flags or link operations. Signed-off-by: Sidong Yang --- src/fs/file.rs | 16 +++++++++++++++- src/net/tcp/stream.rs | 11 ++++++++++- src/net/udp.rs | 14 +++++++++++--- src/net/unix/stream.rs | 14 +++++++++++--- 4 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index 4a67889f..2c47010a 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -4,7 +4,7 @@ use crate::fs::OpenOptions; use crate::io::SharedFd; use crate::runtime::driver::op::Op; -use crate::{UnsubmittedOneshot, UnsubmittedWrite}; +use crate::{UnsubmittedOneshot, UnsubmittedRead, UnsubmittedWrite}; use std::fmt; use std::io; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -182,6 +182,20 @@ impl File { .await } + /// Read some bytes at the specified offset from the file into the specified + /// buffer, returning how many bytes were read. + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub async fn unsubmitted_read_at( + &self, + buf: T, + pos: u64, + ) -> UnsubmittedRead { + UnsubmittedOneshot::read_at(&self.fd, buf, pos) + } + /// Read some bytes at the specified offset from the file into the specified /// array of buffers, returning how many bytes were read. /// diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 054dd7c3..fe3fcbae 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -7,7 +7,7 @@ use std::{ use crate::{ buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut}, io::{SharedFd, Socket}, - UnsubmittedWrite, + UnsubmittedRead, UnsubmittedWrite, }; /// A TCP stream between a local and a remote socket. @@ -77,6 +77,15 @@ impl TcpStream { self.inner.read(buf).submit().await } + /// Read some data from the stream + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub fn unsubmitted_read(&self, buf: T) -> UnsubmittedRead { + self.inner.read(buf) + } + /// Read some data from the stream into a registered buffer. /// /// Like [`read`], but using a pre-mapped buffer diff --git a/src/net/udp.rs b/src/net/udp.rs index 3df18fe5..3f513f02 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,8 +1,7 @@ use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut}, io::{SharedFd, Socket}, - UnsubmittedWrite, + UnsubmittedRead, UnsubmittedWrite, }; use socket2::SockAddr; use std::{ @@ -320,6 +319,15 @@ impl UdpSocket { self.inner.read(buf).submit().await } + /// Read some data from the stream + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub fn unsubmitted_read(&self, buf: T) -> UnsubmittedRead { + self.inner.read(buf) + } + /// Receives a single datagram message into a registered buffer. /// /// Like [`read`], but using a pre-mapped buffer diff --git a/src/net/unix/stream.rs b/src/net/unix/stream.rs index a2e141b7..a82b5a99 100644 --- a/src/net/unix/stream.rs +++ b/src/net/unix/stream.rs @@ -1,8 +1,7 @@ use crate::{ - buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{fixed::FixedBuf, BoundedBuf, BoundedBufMut}, io::{SharedFd, Socket}, - UnsubmittedWrite, + UnsubmittedRead, UnsubmittedWrite, }; use socket2::SockAddr; use std::{ @@ -79,6 +78,15 @@ impl UnixStream { self.inner.read(buf).submit().await } + /// Read some data from the stream + /// + /// Like [`read`], but returns unsubmitted. + /// + /// Returns an UnsubmittedRead could be submitted. + pub fn unsubmitted_read(&self, buf: T) -> UnsubmittedRead { + self.inner.read(buf) + } + /// Like [`read`], but using a pre-mapped buffer /// registered with [`FixedBufRegistry`]. ///