diff --git a/Cargo.toml b/Cargo.toml index 85ce27dd..d6768e9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ keywords = ["async", "fs", "io-uring"] tokio = { version = "1.2", features = ["net", "rt", "sync"] } slab = "0.4.2" libc = "0.2.80" -io-uring = "0.6.0" -socket2 = { version = "0.4.4", features = ["all"] } +io-uring = "0.7" +socket2 = { version = "0.5", features = ["all"] } bytes = { version = "1.0", optional = true } futures-util = { version = "0.3.26", default-features = false, features = ["std"] } @@ -29,10 +29,10 @@ futures-util = { version = "0.3.26", default-features = false, features = ["std" tempfile = "3.2.0" tokio-test = "0.4.2" iai = "0.1.1" -criterion = "0.4.0" +criterion = "0.5" # we use joinset in our tests tokio = "1.21.2" -nix = "0.26.1" +nix = "0.30" [package.metadata.docs.rs] all-features = true diff --git a/benches/lai/no_op.rs b/benches/lai/no_op.rs index 9a4b4b3e..0e203b0f 100644 --- a/benches/lai/no_op.rs +++ b/benches/lai/no_op.rs @@ -4,6 +4,7 @@ use tokio::task::JoinSet; #[derive(Clone)] struct Options { iterations: usize, + #[allow(dead_code)] concurrency: usize, sq_size: usize, cq_size: usize, @@ -60,20 +61,26 @@ fn no_op_x1() -> Result<(), Box> { } fn no_op_x32() -> Result<(), Box> { - let mut opts = Options::default(); - opts.concurrency = 32; + let opts = Options { + concurrency: 32, + ..Options::default() + }; run_no_ops(black_box(opts)) } fn no_op_x64() -> Result<(), Box> { - let mut opts = Options::default(); - opts.concurrency = 64; + let opts = Options { + concurrency: 64, + ..Options::default() + }; run_no_ops(black_box(opts)) } fn no_op_x256() -> Result<(), Box> { - let mut opts = Options::default(); - opts.concurrency = 256; + let opts = Options { + concurrency: 256, + ..Options::default() + }; run_no_ops(black_box(opts)) } diff --git a/examples/tcp_listener.rs b/examples/tcp_listener.rs index 3121f406..79d414b5 100644 --- a/examples/tcp_listener.rs +++ b/examples/tcp_listener.rs @@ -24,7 +24,7 @@ fn main() { use tokio_uring::buf::BoundedBuf; // for slice() - println!("{} connected", socket_addr); + println!("{socket_addr} connected"); let mut n = 0; let mut buf = vec![0u8; 4096]; @@ -33,14 +33,14 @@ fn main() { buf = nbuf; let read = result.unwrap(); if read == 0 { - println!("{} closed, {} total ping-ponged", socket_addr, n); + println!("{socket_addr} closed, {n} total ping-ponged"); break; } let (res, slice) = stream.write_all(buf.slice(..read)).await; res.unwrap(); buf = slice.into_inner(); - println!("{} all {} bytes ping-ponged", socket_addr, read); + println!("{socket_addr} all {read} bytes ping-ponged"); n += read; } }); diff --git a/examples/tcp_listener_fixed_buffers.rs b/examples/tcp_listener_fixed_buffers.rs index 30faaeae..d66f5b95 100644 --- a/examples/tcp_listener_fixed_buffers.rs +++ b/examples/tcp_listener_fixed_buffers.rs @@ -34,7 +34,7 @@ async fn accept_loop(listen_addr: SocketAddr) { ); // Other iterators may be passed to FixedBufRegistry::new also. - let registry = FixedBufRegistry::new(iter::repeat(vec![0; 4096]).take(POOL_SIZE)); + let registry = FixedBufRegistry::new(iter::repeat_n(vec![0; 4096], POOL_SIZE)); // Register the buffers with the kernel, asserting the syscall passed. @@ -55,7 +55,7 @@ async fn echo_handler( peer: SocketAddr, registry: FixedBufRegistry, ) { - println!("peer {} connected", peer); + println!("peer {peer} connected"); // Get one of the two fixed buffers. // If neither is unavailable, print reason and return immediately, dropping this connection; @@ -68,7 +68,7 @@ async fn echo_handler( }; if fbuf.is_none() { let _ = stream.shutdown(std::net::Shutdown::Write); - println!("peer {} closed, no fixed buffers available", peer); + println!("peer {peer} closed, no fixed buffers available"); return; }; @@ -90,7 +90,7 @@ async fn echo_handler( let (res, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await; res.unwrap(); - println!("peer {} all {} bytes ping-ponged", peer, read); + println!("peer {peer} all {read} bytes ping-ponged"); n += read; // Important. One of the points of this example. @@ -98,5 +98,5 @@ async fn echo_handler( }; } let _ = stream.shutdown(std::net::Shutdown::Write); - println!("peer {} closed, {} total ping-ponged", peer, n); + println!("peer {peer} closed, {n} total ping-ponged"); } diff --git a/examples/test_create_dir_all.rs b/examples/test_create_dir_all.rs index 3404de8e..a5a14857 100644 --- a/examples/test_create_dir_all.rs +++ b/examples/test_create_dir_all.rs @@ -163,10 +163,9 @@ async fn main1() -> io::Result<()> { if unexpected == 0 { Ok(()) } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("{unexpected} unexpected result(s)"), - )) + Err(std::io::Error::other(format!( + "{unexpected} unexpected result(s)" + ))) } } @@ -208,10 +207,9 @@ async fn matches_mode>(path: P, want_mode: u16) -> io::Result<()> if want_mode == got_mode { Ok(()) } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - format!("want mode {want_mode:#o}, got mode {got_mode:#o}"), - )) + Err(std::io::Error::other(format!( + "want mode {want_mode:#o}, got mode {got_mode:#o}" + ))) } } @@ -231,10 +229,7 @@ async fn is_regfile>(path: P) -> io::Result<()> { if is_regfile { Ok(()) } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "not regular file", - )) + Err(std::io::Error::other("not regular file")) } } @@ -244,17 +239,14 @@ async fn is_dir>(path: P) -> io::Result<()> { if is_dir { Ok(()) } else { - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "not directory", - )) + Err(std::io::Error::other("not directory")) } } fn main() { tokio_uring::start(async { if let Err(e) = main1().await { - println!("error: {}", e); + println!("error: {e}"); } }); } diff --git a/examples/wrk-bench.rs b/examples/wrk-bench.rs index 3b4905fc..8555c80f 100644 --- a/examples/wrk-bench.rs +++ b/examples/wrk-bench.rs @@ -24,7 +24,7 @@ fn main() -> io::Result<()> { let (result, _) = stream.write(RESPONSE).submit().await; if let Err(err) = result { - eprintln!("Client connection failed: {}", err); + eprintln!("Client connection failed: {err}"); } }); } diff --git a/src/fs/create_dir_all.rs b/src/fs/create_dir_all.rs index 413ef83d..8aac3c2a 100644 --- a/src/fs/create_dir_all.rs +++ b/src/fs/create_dir_all.rs @@ -124,7 +124,7 @@ impl DirBuilder { // recursion when only the first level of the directory needs to be built. For now, this serves // its purpose. - fn recurse_create_dir_all<'a>(&'a self, path: &'a Path) -> LocalBoxFuture> { + fn recurse_create_dir_all<'a>(&'a self, path: &'a Path) -> LocalBoxFuture<'a, io::Result<()>> { Box::pin(async move { if path == Path::new("") { return Ok(()); @@ -139,10 +139,7 @@ impl DirBuilder { match path.parent() { Some(p) => self.recurse_create_dir_all(p).await?, None => { - return Err(std::io::Error::new( - std::io::ErrorKind::Other, - "failed to create whole tree", - )); + return Err(std::io::Error::other("failed to create whole tree")); /* TODO build own allocation free error some day like the std library does. return Err(io::const_io_error!( io::ErrorKind::Uncategorized, diff --git a/src/io/accept.rs b/src/io/accept.rs index d4e19d58..bdbc673a 100644 --- a/src/io/accept.rs +++ b/src/io/accept.rs @@ -46,7 +46,7 @@ impl Completable for Accept { let fd = SharedFd::new(fd as i32); let socket = Socket { fd }; let (_, addr) = unsafe { - socket2::SockAddr::init(move |addr_storage, len| { + socket2::SockAddr::try_init(move |addr_storage, len| { self.socketaddr.0.clone_into(&mut *addr_storage); *len = self.socketaddr.1; Ok(()) diff --git a/src/io/noop.rs b/src/io/noop.rs index 8ad28e7e..95d7664b 100644 --- a/src/io/noop.rs +++ b/src/io/noop.rs @@ -32,7 +32,7 @@ mod test { use crate as tokio_uring; #[test] - fn perform_no_op() -> () { + fn perform_no_op() { tokio_uring::start(async { tokio_uring::no_op().await.unwrap(); }) diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index e9b360ca..e20ff2bf 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -24,7 +24,7 @@ impl Op> { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) })]; - let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 }); + let socket_addr = Box::new(unsafe { SockAddr::try_init(|_, _| Ok(()))?.1 }); let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); msghdr.msg_iov = io_slices.as_mut_ptr().cast(); diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 3cae2e50..db2c6e7e 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -28,7 +28,7 @@ impl Op> { })); } - let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 }); + let socket_addr = Box::new(unsafe { SockAddr::try_init(|_, _| Ok(()))?.1 }); let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); msghdr.msg_iov = io_slices.as_mut_ptr().cast(); diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..cea3c148 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -9,7 +9,7 @@ use crate::{ use std::{ io, net::SocketAddr, - os::unix::io::{AsRawFd, IntoRawFd, RawFd}, + os::unix::io::{AsFd, AsRawFd, BorrowedFd, IntoRawFd, RawFd}, path::Path, }; @@ -285,3 +285,9 @@ impl AsRawFd for Socket { self.fd.raw_fd() } } + +impl AsFd for Socket { + fn as_fd(&self) -> BorrowedFd<'_> { + unsafe { BorrowedFd::borrow_raw(self.fd.raw_fd()) } + } +} diff --git a/src/lib.rs b/src/lib.rs index 819eebf7..aca43dd4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,7 +57,7 @@ //! call `close()`. #![warn(missing_docs)] -#![allow(clippy::thread_local_initializer_can_be_made_const)] +#![allow(clippy::missing_const_for_thread_local)] macro_rules! syscall { ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 2435c61b..cd9ddf44 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -129,9 +129,8 @@ impl TcpListener { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { let (socket, socket_addr) = self.inner.accept().await?; let stream = TcpStream { inner: socket }; - let socket_addr = socket_addr.ok_or_else(|| { - io::Error::new(io::ErrorKind::Other, "Could not get socket IP address") - })?; + let socket_addr = + socket_addr.ok_or_else(|| io::Error::other("Could not get socket IP address"))?; Ok((stream, socket_addr)) } } diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index f57605d6..352b8710 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -118,8 +118,7 @@ impl Driver { return Ok(()); } } - Err(io::Error::new( - io::ErrorKind::Other, + Err(io::Error::other( "fixed buffers are not currently registered", )) } diff --git a/src/runtime/driver/op/slab_list.rs b/src/runtime/driver/op/slab_list.rs index ca5169bf..ae48267c 100644 --- a/src/runtime/driver/op/slab_list.rs +++ b/src/runtime/driver/op/slab_list.rs @@ -111,7 +111,7 @@ impl<'a, T> SlabList<'a, T> { } } -impl<'a, T> Drop for SlabList<'a, T> { +impl Drop for SlabList<'_, T> { fn drop(&mut self) { while !self.is_empty() { let removed = self.slab.remove(self.index.start); @@ -120,7 +120,7 @@ impl<'a, T> Drop for SlabList<'a, T> { } } -impl<'a, T> Iterator for SlabList<'a, T> { +impl Iterator for SlabList<'_, T> { type Item = T; fn next(&mut self) -> Option { diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 369c060b..c4df2d0f 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -177,13 +177,13 @@ mod test { #[test] fn block_on() { let rt = Runtime::new(&builder()).unwrap(); - rt.block_on(async move { () }); + rt.block_on(async move {}); } #[test] fn block_on_twice() { let rt = Runtime::new(&builder()).unwrap(); - rt.block_on(async move { () }); - rt.block_on(async move { () }); + rt.block_on(async move {}); + rt.block_on(async move {}); } } diff --git a/tests/fixed_buf.rs b/tests/fixed_buf.rs index a5442217..55a5218d 100644 --- a/tests/fixed_buf.rs +++ b/tests/fixed_buf.rs @@ -183,7 +183,7 @@ fn pool_next_as_concurrency_limit() { mem::drop(file); let mut content = String::new(); tempfile.read_to_string(&mut content).unwrap(); - println!("{}", content); + println!("{content}"); }) } diff --git a/tests/fs_file.rs b/tests/fs_file.rs index 096565e6..861979d5 100644 --- a/tests/fs_file.rs +++ b/tests/fs_file.rs @@ -1,7 +1,4 @@ -use std::{ - io::prelude::*, - os::unix::io::{AsRawFd, FromRawFd, RawFd}, -}; +use std::io::prelude::*; use tempfile::NamedTempFile; @@ -129,26 +126,11 @@ fn cancel_read() { }); } -#[test] -fn explicit_close() { - let mut tempfile = tempfile(); - tempfile.write_all(HELLO).unwrap(); - - tokio_uring::start(async { - let file = File::open(tempfile.path()).await.unwrap(); - let fd = file.as_raw_fd(); - - file.close().await.unwrap(); - - assert_invalid_fd(fd); - }) -} - #[test] fn drop_open() { tokio_uring::start(async { let tempfile = tempfile(); - let _ = File::create(tempfile.path()); + _ = File::create(tempfile.path()); // Do something else let file = File::create(tempfile.path()).await.unwrap(); @@ -160,19 +142,6 @@ fn drop_open() { }); } -#[test] -fn drop_off_runtime() { - let file = tokio_uring::start(async { - let tempfile = tempfile(); - File::open(tempfile.path()).await.unwrap() - }); - - let fd = file.as_raw_fd(); - drop(file); - - assert_invalid_fd(fd); -} - #[test] fn sync_doesnt_kill_anything() { let tempfile = tempfile(); @@ -319,7 +288,6 @@ fn tempfile() -> NamedTempFile { async fn poll_once(future: impl std::future::Future) { use std::future::poll_fn; - // use std::future::Future; use std::task::Poll; use tokio::pin; @@ -331,15 +299,3 @@ async fn poll_once(future: impl std::future::Future) { }) .await; } - -fn assert_invalid_fd(fd: RawFd) { - use std::fs::File; - - let mut f = unsafe { File::from_raw_fd(fd) }; - let mut buf = vec![]; - - match f.read_to_end(&mut buf) { - Err(ref e) if e.raw_os_error() == Some(libc::EBADF) => {} - res => panic!("assert_invalid_fd finds for fd {:?}, res = {:?}", fd, res), - } -}