Skip to content

Commit 8750caa

Browse files
authored
Fixed buffers to support ReadFixed and WriteFixed ops (#54)
Add infrastructure to manage pre-registered buffers, and operation methods `File::read_fixed_at` and `File::write_fixed_at` to make use of them. Exclusive access to buffer data between the application and in-flight ops is controlled at runtime. This is initial API to enable fixed buffers. Future developments may include: - ✅ Improved parameter polymorphism with ~#53~ #172; - An internal linked list of free buffers, to be able to check out the next available buffer from `FixedBufRegistry` without enumerating or keeping track of indices; - Support `IORING_REGISTER_BUFFERS2`/`IORING_REGISTER_BUFFERS_UPDATE`.
1 parent 0e1c987 commit 8750caa

File tree

17 files changed

+946
-1
lines changed

17 files changed

+946
-1
lines changed

src/buf/fixed/buffers.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use libc::{iovec, UIO_MAXIOV};
2+
use std::cmp;
3+
use std::mem;
4+
use std::ptr;
5+
use std::slice;
6+
7+
// Internal state shared by FixedBufRegistry and FixedBuf handles.
8+
pub(crate) struct FixedBuffers {
9+
// Pointer to an allocated array of iovec records referencing
10+
// the allocated buffers. The number of initialized records is the
11+
// same as the length of the states array.
12+
raw_bufs: ptr::NonNull<iovec>,
13+
// State information on the buffers. Indices in this array correspond to
14+
// the indices in the array at raw_bufs.
15+
states: Vec<BufState>,
16+
// Original capacity of raw_bufs as a Vec.
17+
orig_cap: usize,
18+
}
19+
20+
// State information of a buffer in the registry,
21+
enum BufState {
22+
// The buffer is not in use.
23+
// The field records the length of the initialized part.
24+
Free { init_len: usize },
25+
// The buffer is checked out.
26+
// Its data are logically owned by the FixedBuf handle,
27+
// which also keeps track of the length of the initialized part.
28+
CheckedOut,
29+
}
30+
31+
impl FixedBuffers {
32+
pub(crate) fn new(bufs: impl Iterator<Item = Vec<u8>>) -> Self {
33+
let bufs = bufs.take(cmp::min(UIO_MAXIOV as usize, 65_536));
34+
let (size_hint, _) = bufs.size_hint();
35+
let mut iovecs = Vec::with_capacity(size_hint);
36+
let mut states = Vec::with_capacity(size_hint);
37+
for mut buf in bufs {
38+
iovecs.push(iovec {
39+
iov_base: buf.as_mut_ptr() as *mut _,
40+
iov_len: buf.capacity(),
41+
});
42+
states.push(BufState::Free {
43+
init_len: buf.len(),
44+
});
45+
mem::forget(buf);
46+
}
47+
debug_assert_eq!(iovecs.len(), states.len());
48+
// Safety: Vec::as_mut_ptr never returns null
49+
let raw_bufs = unsafe { ptr::NonNull::new_unchecked(iovecs.as_mut_ptr()) };
50+
let orig_cap = iovecs.capacity();
51+
mem::forget(iovecs);
52+
FixedBuffers {
53+
raw_bufs,
54+
states,
55+
orig_cap,
56+
}
57+
}
58+
59+
// If the indexed buffer is free, changes its state to checked out and
60+
// returns its data. If the buffer is already checked out, returns None.
61+
pub(crate) fn check_out(&mut self, index: usize) -> Option<(iovec, usize)> {
62+
let iovecs_ptr = self.raw_bufs;
63+
self.states.get_mut(index).and_then(|state| match *state {
64+
BufState::Free { init_len } => {
65+
*state = BufState::CheckedOut;
66+
// Safety: the allocated array under the pointer is valid
67+
// for the lifetime of self, the index is inside the array
68+
// as checked by Vec::get_mut above, called on the array of
69+
// states that has the same length.
70+
let iovec = unsafe { iovecs_ptr.as_ptr().add(index).read() };
71+
Some((iovec, init_len))
72+
}
73+
BufState::CheckedOut => None,
74+
})
75+
}
76+
77+
// Sets the indexed buffer's state to free and records the updated length
78+
// of its initialized part. The buffer addressed must be in the checked out
79+
// state, otherwise this function may panic.
80+
pub(crate) fn check_in(&mut self, index: usize, init_len: usize) {
81+
let state = self.states.get_mut(index).expect("invalid buffer index");
82+
debug_assert!(
83+
matches!(state, BufState::CheckedOut),
84+
"the buffer must be checked out"
85+
);
86+
*state = BufState::Free { init_len };
87+
}
88+
89+
pub(crate) fn iovecs(&self) -> &[iovec] {
90+
// Safety: the raw_bufs pointer is valid for the lifetime of self,
91+
// the slice length is valid by construction.
92+
unsafe { slice::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len()) }
93+
}
94+
}
95+
96+
impl Drop for FixedBuffers {
97+
fn drop(&mut self) {
98+
let iovecs = unsafe {
99+
Vec::from_raw_parts(self.raw_bufs.as_ptr(), self.states.len(), self.orig_cap)
100+
};
101+
for (i, iovec) in iovecs.iter().enumerate() {
102+
match self.states[i] {
103+
BufState::Free { init_len } => {
104+
let ptr = iovec.iov_base as *mut u8;
105+
let cap = iovec.iov_len;
106+
let v = unsafe { Vec::from_raw_parts(ptr, init_len, cap) };
107+
mem::drop(v);
108+
}
109+
BufState::CheckedOut => unreachable!("all buffers must be checked in"),
110+
}
111+
}
112+
}
113+
}

src/buf/fixed/handle.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use super::FixedBuffers;
2+
use crate::buf::{IoBuf, IoBufMut};
3+
4+
use libc::iovec;
5+
use std::cell::RefCell;
6+
use std::fmt::{self, Debug};
7+
use std::mem::ManuallyDrop;
8+
use std::ops::{Deref, DerefMut};
9+
use std::rc::Rc;
10+
11+
/// A unique handle to a memory buffer that can be pre-registered with
12+
/// the kernel for `io-uring` operations.
13+
///
14+
/// `FixedBuf` handles can be obtained from a [`FixedBufRegistry`] collection.
15+
/// For each buffer, only a single `FixedBuf` handle can be either used by the
16+
/// application code or owned by an I/O operation at any given time,
17+
/// thus avoiding data races between `io-uring` operations in flight and
18+
/// the application accessing buffer data.
19+
///
20+
/// [`FixedBufRegistry`]: super::FixedBufRegistry
21+
///
22+
pub struct FixedBuf {
23+
registry: Rc<RefCell<FixedBuffers>>,
24+
buf: ManuallyDrop<Vec<u8>>,
25+
index: u16,
26+
}
27+
28+
impl Drop for FixedBuf {
29+
fn drop(&mut self) {
30+
let mut registry = self.registry.borrow_mut();
31+
debug_assert_eq!(
32+
registry.iovecs()[self.index as usize].iov_base as *const u8,
33+
self.buf.as_ptr()
34+
);
35+
debug_assert_eq!(
36+
registry.iovecs()[self.index as usize].iov_len,
37+
self.buf.capacity()
38+
);
39+
registry.check_in(self.index as usize, self.buf.len());
40+
}
41+
}
42+
43+
impl FixedBuf {
44+
pub(super) unsafe fn new(
45+
registry: Rc<RefCell<FixedBuffers>>,
46+
iovec: iovec,
47+
init_len: usize,
48+
index: u16,
49+
) -> Self {
50+
let buf = Vec::from_raw_parts(iovec.iov_base as _, init_len, iovec.iov_len);
51+
FixedBuf {
52+
registry,
53+
buf: ManuallyDrop::new(buf),
54+
index,
55+
}
56+
}
57+
58+
pub(crate) fn buf_index(&self) -> u16 {
59+
self.index
60+
}
61+
}
62+
63+
unsafe impl IoBuf for FixedBuf {
64+
fn stable_ptr(&self) -> *const u8 {
65+
self.buf.as_ptr()
66+
}
67+
68+
fn bytes_init(&self) -> usize {
69+
self.buf.len()
70+
}
71+
72+
fn bytes_total(&self) -> usize {
73+
self.buf.capacity()
74+
}
75+
}
76+
77+
unsafe impl IoBufMut for FixedBuf {
78+
fn stable_mut_ptr(&mut self) -> *mut u8 {
79+
self.buf.as_mut_ptr()
80+
}
81+
82+
unsafe fn set_init(&mut self, pos: usize) {
83+
if self.buf.len() < pos {
84+
self.buf.set_len(pos)
85+
}
86+
}
87+
}
88+
89+
impl Deref for FixedBuf {
90+
type Target = [u8];
91+
92+
fn deref(&self) -> &[u8] {
93+
&self.buf
94+
}
95+
}
96+
97+
impl DerefMut for FixedBuf {
98+
fn deref_mut(&mut self) -> &mut [u8] {
99+
&mut self.buf
100+
}
101+
}
102+
103+
impl Debug for FixedBuf {
104+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105+
f.debug_struct("FixedBuf")
106+
.field("buf", &*self.buf) // deref ManuallyDrop
107+
.field("index", &self.index)
108+
.finish_non_exhaustive()
109+
}
110+
}

src/buf/fixed/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
//! Buffers pre-registered with the kernel.
2+
//!
3+
//! This module provides facilities for registering in-memory buffers with
4+
//! the `tokio-uring` runtime. Operations like [`File::read_fixed_at`][rfa] and
5+
//! [`File::write_fixed_at`][wfa] make use of buffers pre-mapped by
6+
//! the kernel to reduce per-I/O overhead.
7+
//! The [`FixedBufRegistry::register`] method is used to register a collection of
8+
//! buffers with the kernel; it must be called before any of the [`FixedBuf`]
9+
//! handles to the collection's buffers can be used with I/O operations.
10+
//!
11+
//! [rfa]: crate::fs::File::read_fixed_at
12+
//! [wfa]: crate::fs::File::write_fixed_at
13+
14+
mod buffers;
15+
pub(crate) use self::buffers::FixedBuffers;
16+
17+
mod handle;
18+
pub use handle::FixedBuf;
19+
20+
mod registry;
21+
pub use registry::FixedBufRegistry;

src/buf/fixed/registry.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
use super::{buffers::FixedBuffers, FixedBuf};
2+
3+
use std::cell::RefCell;
4+
use std::io;
5+
use std::rc::Rc;
6+
7+
/// An indexed collection of I/O buffers pre-registered with the kernel.
8+
///
9+
/// `FixedBufRegistry` allows the application to manage a collection of buffers
10+
/// allocated in memory, that can be registered in the current `tokio-uring`
11+
/// context using the [`register`] method.
12+
///
13+
/// A `FixedBufRegistry` value is a lightweight handle for a collection of
14+
/// allocated buffers. Cloning of a `FixedBufRegistry` creates a new reference to
15+
/// the same collection of buffers.
16+
///
17+
/// The buffers of the collection are not deallocated until:
18+
/// - all `FixedBufRegistry` references to the collection have been dropped;
19+
/// - all [`FixedBuf`] handles to individual buffers in the collection have
20+
/// been dropped, including the buffer handles owned by any I/O operations
21+
/// in flight;
22+
/// - The `tokio-uring` [`Runtime`] the buffers are registered with
23+
/// has been dropped.
24+
///
25+
/// [`register`]: Self::register
26+
/// [`Runtime`]: crate::Runtime
27+
#[derive(Clone)]
28+
pub struct FixedBufRegistry {
29+
inner: Rc<RefCell<FixedBuffers>>,
30+
}
31+
32+
impl FixedBufRegistry {
33+
/// Creates a new collection of buffers from the provided allocated vectors.
34+
///
35+
/// The buffers are assigned 0-based indices in the order of the iterable
36+
/// input parameter. The returned collection takes up to [`UIO_MAXIOV`]
37+
/// buffers from the input. Any items in excess of that amount are silently
38+
/// dropped, unless the input iterator produces the vectors lazily.
39+
///
40+
/// [`UIO_MAXIOV`]: libc::UIO_MAXIOV
41+
///
42+
/// # Examples
43+
///
44+
/// ```
45+
/// use tokio_uring::buf::fixed::FixedBufRegistry;
46+
/// use std::iter;
47+
///
48+
/// let registry = FixedBufRegistry::new(iter::repeat(vec![0; 4096]).take(10));
49+
/// ```
50+
pub fn new(bufs: impl IntoIterator<Item = Vec<u8>>) -> Self {
51+
FixedBufRegistry {
52+
inner: Rc::new(RefCell::new(FixedBuffers::new(bufs.into_iter()))),
53+
}
54+
}
55+
56+
/// Registers the buffers with the kernel.
57+
///
58+
/// This method must be called in the context of a `tokio-uring` runtime.
59+
/// The registration persists for the lifetime of the runtime, unless
60+
/// revoked by the [`unregister`] method. Dropping the
61+
/// `FixedBufRegistry` instance this method has been called on does not revoke
62+
/// the registration or deallocate the buffers.
63+
///
64+
/// [`unregister`]: Self::unregister
65+
///
66+
/// This call can be blocked in the kernel to complete any operations
67+
/// in-flight on the same `io-uring` instance. The application is
68+
/// recommended to register buffers before starting any I/O operations.
69+
///
70+
/// # Errors
71+
///
72+
/// If a collection of buffers is currently registered in the context
73+
/// of the `tokio-uring` runtime this call is made in, the function returns
74+
/// an error.
75+
pub fn register(&self) -> io::Result<()> {
76+
crate::io::register_buffers(&self.inner)
77+
}
78+
79+
/// Unregisters this collection of buffers.
80+
///
81+
/// This method must be called in the context of a `tokio-uring` runtime,
82+
/// where the buffers should have been previously registered.
83+
///
84+
/// This operation invalidates any `FixedBuf` handles checked out from
85+
/// this registry instance. Continued use of such handles in I/O
86+
/// operations may result in an error.
87+
///
88+
/// # Errors
89+
///
90+
/// If another collection of buffers is currently registered in the context
91+
/// of the `tokio-uring` runtime this call is made in, the function returns
92+
/// an error. Calling `unregister` when no `FixedBufRegistry` is currently
93+
/// registered on this runtime also returns an error.
94+
pub fn unregister(&self) -> io::Result<()> {
95+
crate::io::unregister_buffers(&self.inner)
96+
}
97+
98+
/// Returns a buffer identified by the specified index for use by the
99+
/// application, unless the buffer is already in use.
100+
///
101+
/// The buffer is released to be available again once the
102+
/// returned `FixedBuf` handle has been dropped. An I/O operation
103+
/// using the buffer takes ownership of it and returns it once completed,
104+
/// preventing shared use of the buffer while the operation is in flight.
105+
pub fn check_out(&self, index: usize) -> Option<FixedBuf> {
106+
let mut inner = self.inner.borrow_mut();
107+
inner.check_out(index).map(|(iovec, init_len)| {
108+
debug_assert!(index <= u16::MAX as usize);
109+
// Safety: the validity of iovec and init_len is ensured by
110+
// FixedBuffers::check_out
111+
unsafe { FixedBuf::new(Rc::clone(&self.inner), iovec, init_len, index as u16) }
112+
})
113+
}
114+
}

src/buf/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
//! crate defines [`IoBuf`] and [`IoBufMut`] traits which are implemented by buffer
55
//! types that respect the `io-uring` contract.
66
7+
pub mod fixed;
8+
79
mod io_buf;
810
pub use io_buf::IoBuf;
911

0 commit comments

Comments
 (0)