Skip to content

op options: fixed file table slots #256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/fs/open_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub struct OpenOptions {
truncate: bool,
create: bool,
create_new: bool,
pub(crate) fixed_file_auto_select: bool,
pub(crate) mode: libc::mode_t,
pub(crate) custom_flags: libc::c_int,
}
Expand Down Expand Up @@ -95,6 +96,7 @@ impl OpenOptions {
truncate: false,
create: false,
create_new: false,
fixed_file_auto_select: false,
mode: 0o666,
custom_flags: 0,
}
Expand Down Expand Up @@ -217,6 +219,40 @@ impl OpenOptions {
self
}

/// Sets whether the file is opened into the io_uring fixed file table
/// instead of the kernel's process file descriptor table.
///
/// When enabled, the open operation requests an automatically selected
/// fixed file table slot, and the resulting file is tracked by that slot
/// number. The regular file descriptor, often referred to as the raw fd,
/// will not be available for such a file.
///
/// This option defaults to `false`.
///
/// # Examples
///
/// ```no_run
/// use tokio_uring::fs::OpenOptions;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
///     tokio_uring::start(async {
///         let file = OpenOptions::new()
///             .write(true)
///             .truncate(true)
///             .fixed_file_auto_select(true)
///             .open("foo.txt")
///             .await?;
///
///         // Write to the file here, then close it.
///
///         // Close, returning the close result.
///         file.close().await?;
///         Ok(())
///     })
/// }
/// ```
// NOTE(review): presumably the ring must already have a fixed file table
// registered for such an open to succeed - confirm against the runtime setup.
pub fn fixed_file_auto_select(&mut self, fixed_file_auto_select: bool) -> &mut OpenOptions {
self.fixed_file_auto_select = fixed_file_auto_select;
self
}

/// Sets the option to create a new file, or open it if it already exists.
///
/// In order for the file to be created, [`OpenOptions::write`] or
Expand Down
31 changes: 20 additions & 11 deletions src/io/close.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
use crate::io::shared_fd::CommonFd;
use crate::runtime::driver::op;
use crate::runtime::driver::op::{Completable, Op};
use crate::runtime::CONTEXT;
use std::io;
use std::os::unix::io::RawFd;

pub(crate) struct Close {
fd: RawFd,
}
pub(crate) struct Close {}

impl Op<Close> {
pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> {
pub(crate) fn close(fd: CommonFd) -> io::Result<Op<Close>> {
use io_uring::{opcode, types};

CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Close { fd }, |close| {
opcode::Close::new(types::Fd(close.fd)).build()
match fd {
CommonFd::Raw(raw) => {
let fd = types::Fd(raw);
CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Close {}, |_close| opcode::Close::new(fd).build())
})
}
CommonFd::Fixed(fixed) => {
let fd = types::Fixed(fixed);
CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Close {}, |_close| opcode::Close::new(fd).build())
})
})
}
}
}
}

Expand Down
35 changes: 28 additions & 7 deletions src/io/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::ffi::CString;
use std::io;
use std::os::unix::io::RawFd;
use std::path::Path;

/// Open a file
#[allow(dead_code)]
pub(crate) struct Open {
pub(crate) path: CString,
pub(crate) flags: libc::c_int,
path: CString,
flags: libc::c_int,
fixed_table_auto_select: bool,
}

impl Op<Open> {
Expand All @@ -24,10 +26,20 @@ impl Op<Open> {
| options.creation_mode()?
| (options.custom_flags & !libc::O_ACCMODE);

let (file_index, fixed_table_auto_select) = if options.fixed_file_auto_select {
(Some(types::DestinationSlot::auto_target()), true)
} else {
(None, false)
};

CONTEXT.with(|x| {
x.handle()
.expect("Not in a runtime context")
.submit_op(Open { path, flags }, |open| {
x.handle().expect("Not in a runtime context").submit_op(
Open {
path,
flags,
fixed_table_auto_select,
},
|open| {
// Get a reference to the memory. The string will be held by the
// operation state and will not be accessed again until the operation
// completes.
Expand All @@ -36,8 +48,10 @@ impl Op<Open> {
opcode::OpenAt::new(types::Fd(libc::AT_FDCWD), p_ref)
.flags(flags)
.mode(options.mode)
.file_index(file_index)
.build()
})
},
)
})
}
}
Expand All @@ -46,6 +60,13 @@ impl Completable for Open {
type Output = io::Result<File>;

/// Converts the completion of an open operation into a [`File`].
///
/// The successful completion value is treated as a fixed file table slot
/// when the open requested automatic slot selection, and as a regular file
/// descriptor otherwise.
///
/// # Errors
///
/// Returns the error carried by the completion result, if any.
fn complete(self, cqe: CqeResult) -> Self::Output {
    let result = cqe.result?;

    // The same completion value means two different things depending on how
    // the open was submitted, so pick the matching SharedFd constructor.
    let shared_fd = if self.fixed_table_auto_select {
        SharedFd::new_fixed(result)
    } else {
        SharedFd::new(result as RawFd)
    };

    Ok(File::from_shared_fd(shared_fd))
}
}
18 changes: 15 additions & 3 deletions src/io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::BufResult;

use crate::io::shared_fd::CommonFd;
use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
use std::io;
Expand Down Expand Up @@ -30,9 +31,20 @@ impl<T: BoundedBufMut> Op<Read<T>> {
// Get raw buffer info
let ptr = read.buf.stable_mut_ptr();
let len = read.buf.bytes_total();
opcode::Read::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build()
match read.fd.common_fd() {
CommonFd::Raw(raw) => {
let fd = types::Fd(raw);
opcode::Read::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
CommonFd::Fixed(fixed) => {
let fd = types::Fixed(fixed);
opcode::Read::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
}
},
)
})
Expand Down
81 changes: 76 additions & 5 deletions src/io/shared_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
};

use crate::runtime::driver::op::Op;
use crate::runtime::CONTEXT;

// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
Expand All @@ -23,9 +24,15 @@ pub(crate) struct SharedFd {
inner: Rc<Inner>,
}

/// A descriptor in one of the two forms an io_uring operation can target.
///
/// The value is small and `Copy`, so it is passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum CommonFd {
    /// A regular file descriptor.
    Raw(RawFd),
    /// A slot number in the io_uring fixed file table.
    Fixed(u32),
}

struct Inner {
// Open file descriptor
fd: RawFd,
fd: CommonFd,

// Track the sharing state of the file descriptor:
// normal, being waited on to allow a close by the parent's owner, or already closed.
Expand All @@ -45,6 +52,14 @@ enum State {

impl SharedFd {
/// Creates a `SharedFd` that wraps a regular (raw) file descriptor.
pub(crate) fn new(fd: RawFd) -> SharedFd {
// Tag the descriptor as the raw variant and share the common constructor.
Self::_new(CommonFd::Raw(fd))
}

/// Creates a `SharedFd` that refers to a slot in the io_uring fixed file
/// table instead of a regular file descriptor.
pub(crate) fn new_fixed(slot: u32) -> SharedFd {
// Tag the slot number as the fixed variant and share the common constructor.
Self::_new(CommonFd::Fixed(slot))
}

fn _new(fd: CommonFd) -> SharedFd {
SharedFd {
inner: Rc::new(Inner {
fd,
Expand All @@ -53,8 +68,26 @@ impl SharedFd {
}
}

/// Returns the RawFd.
///
/// This function name won't make sense when the fixed file feature is fully
/// fleshed out.
///
/// # Panics
///
/// Panics if this descriptor is a fixed file table slot, because the calling
/// operation has not been taught to use fixed files yet.
// TODO remove this function completely once all the uring opcodes that accept
// a fixed file table slot have been modified. For now, we have to keep it to
// avoid too many file changes all at once.
pub(crate) fn raw_fd(&self) -> RawFd {
    match self.inner.fd {
        CommonFd::Raw(raw) => raw,
        // This arm can be reached by any operation that still asks for a raw
        // fd on a fixed file, so report it as missing functionality rather
        // than as an impossible state.
        CommonFd::Fixed(_fixed) => {
            unimplemented!("fixed file support not yet added for this call stack")
        }
    }
}

/// Returns the common fd, either a RawFd or the fixed fd slot number.
///
/// Unlike `raw_fd`, this is usable for both kinds of descriptor; the caller
/// matches on the returned variant.
pub(crate) fn common_fd(&self) -> CommonFd {
self.inner.fd
}

Expand Down Expand Up @@ -147,14 +180,52 @@ impl Drop for SharedFd {

impl Drop for Inner {
    /// Closes the descriptor if the user never completed `close().await`.
    ///
    /// A regular file descriptor is closed synchronously. A fixed file table
    /// slot has no synchronous close, so an io_uring close operation is
    /// spawned as a task, and only while the driver's context is still set.
    fn drop(&mut self) {
        // If the inner state is already `Closed`, the user has called
        // close().await and there is nothing left to do. Only a read of the
        // state is needed, so a shared borrow is enough.
        if matches!(*self.state.borrow(), State::Closed) {
            return;
        }

        // Perform one form of close or the other.
        match self.fd {
            CommonFd::Raw(raw) => {
                // SAFETY: the state is not `Closed`, so this descriptor has not
                // been closed through the async path, and `Inner` is being
                // dropped, so nothing else will use it afterwards. Dropping the
                // temporary `File` closes it exactly once.
                drop(unsafe { std::fs::File::from_raw_fd(raw) });
            }

            CommonFd::Fixed(fixed) => {
                // As there is no synchronous close for a fixed file table slot, we have to
                // resort to the async close provided by the io_uring device. If we knew the
                // fixed file table had been unregistered, this wouldn't be necessary either.

                // If the driver is gone, nothing to do. The fixed table has already been
                // taken down by the device anyway.
                if !matches!(CONTEXT.try_with(|cx| cx.is_set()), Ok(true)) {
                    return;
                }

                // TODO Investigate the idea from the liburing team of replacing the one slot
                // with a -1 by using the register/files_update synchronous command. If the
                // current scheme that uses a spawn is initially acceptable, probably leave it
                // like this for now and wait to be able to benchmark once we have streaming
                // tcp sockets.

                crate::spawn(async move {
                    // The context may have been torn down between the spawn and the first
                    // poll of this task, so check again before submitting.
                    if let Ok(true) = CONTEXT.try_with(|cx| cx.is_set()) {
                        if let Ok(op) = Op::close(CommonFd::Fixed(fixed)) {
                            let _ = op.await;
                        }
                        // Else, should warn or panic if the Op::Close can't be built? It
                        // would mean the fixed value was out of reach which would not be
                        // expected at this point.
                    }
                });
            }
        }
    }
}
26 changes: 20 additions & 6 deletions src/io/write.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot};
use crate::{
buf::BoundedBuf,
io::{shared_fd::CommonFd, SharedFd},
BufResult, OneshotOutputTransform, UnsubmittedOneshot,
};
use io_uring::cqueue::Entry;
use std::io;
use std::marker::PhantomData;
use std::{io, marker::PhantomData};

/// An unsubmitted write operation.
///
/// Alias for [`UnsubmittedOneshot`] specialised with [`WriteData`] as the
/// operation data and [`WriteTransform`] as the output transform for a buffer
/// of type `T`.
pub type UnsubmittedWrite<T> = UnsubmittedOneshot<WriteData<T>, WriteTransform<T>>;
Expand Down Expand Up @@ -51,9 +54,20 @@ impl<T: BoundedBuf> UnsubmittedWrite<T> {
WriteTransform {
_phantom: PhantomData::default(),
},
opcode::Write::new(types::Fd(fd.raw_fd()), ptr, len as _)
.offset(offset as _)
.build(),
match fd.common_fd() {
CommonFd::Raw(raw) => {
let fd = types::Fd(raw);
opcode::Write::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
CommonFd::Fixed(fixed) => {
let fd = types::Fixed(fixed);
opcode::Write::new(fd, ptr, len as _)
.offset(offset as _)
.build()
}
},
)
}
}