Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

104 changes: 101 additions & 3 deletions service/src/fusedev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
use std::any::Any;
use std::ffi::{CStr, CString};
use std::fs::metadata;
use std::io::{Error, ErrorKind, Result};
use std::io::{Error, ErrorKind, Result, Write};
use std::ops::Deref;
use std::os::fd::AsRawFd;
#[cfg(target_os = "linux")]
use std::os::linux::fs::MetadataExt;
#[cfg(target_os = "linux")]
use std::os::unix::ffi::OsStrExt;
#[cfg(target_os = "macos")]
use std::os::unix::fs::MetadataExt;
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::{
atomic::{AtomicI32, AtomicU64, Ordering},
mpsc::{channel, Receiver, Sender},
Expand All @@ -29,7 +30,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use fuse_backend_rs::abi::fuse_abi::{InHeader, OutHeader};
use fuse_backend_rs::api::server::{MetricsHook, Server};
use fuse_backend_rs::api::Vfs;
use fuse_backend_rs::transport::{FuseChannel, FuseSession};
use fuse_backend_rs::transport::{FuseChannel, FuseDevWriter, FuseSession};
use mio::Waker;
#[cfg(target_os = "linux")]
use nix::sys::stat::{major, minor};
Expand Down Expand Up @@ -145,6 +146,86 @@ impl FuseServer {
}
}

struct FusedevNotifier<'a> {
session: &'a Mutex<FuseSession>,
server: &'a Arc<Server<Arc<Vfs>>>,
}

impl<'a> FusedevNotifier<'a> {
fn new(session: &'a Mutex<FuseSession>, server: &'a Arc<Server<Arc<Vfs>>>) -> Self {
FusedevNotifier { session, server }
}

fn notify_resend(&self) -> NydusResult<()> {
let session = self.session.lock().unwrap();
let file = session
.get_fuse_file()
.ok_or(NydusError::FuseDeviceNotFound)?;

let fd = file.as_raw_fd();
let mut buf = vec![0u8; session.bufsize()];
let writer: FuseDevWriter = FuseDevWriter::new(fd, &mut buf)?;

self.server
.notify_resend(writer)
.map_err(NydusError::FuseWriteError)?;
Ok(())
}
}

struct FuseSysfsNotifier<'a> {
conn: &'a AtomicU64,
}

impl<'a> FuseSysfsNotifier<'a> {
fn new(conn: &'a AtomicU64) -> Self {
Self { conn }
}

fn get_possible_base_paths() -> Vec<&'static str> {
vec!["/proc/sys/fs/fuse/connections", "/sys/fs/fuse/connections"]
}

fn try_notify_with_path(&self, base_path: &str, event: &str) -> NydusResult<()> {
let path = PathBuf::from(base_path)
.join(self.conn.load(Ordering::Acquire).to_string())
.join(event);

let mut file = std::fs::OpenOptions::new()
.write(true)
.open(&path)
.map_err(NydusError::SysfsOpenError)?;

file.write_all(b"1").map_err(NydusError::SysfsWriteError)?;
Ok(())
}

fn notify(&self, event: &str) -> NydusResult<()> {
let paths = Self::get_possible_base_paths();

for (idx, path) in paths.iter().enumerate() {
match self.try_notify_with_path(path, event) {
Ok(()) => return Ok(()),
Err(e) => {
if !matches!(e, NydusError::SysfsOpenError(_)) || idx == paths.len() - 1 {
return Err(e);
}
}
}
}

Ok(())
}

fn notify_resend(&self) -> NydusResult<()> {
self.notify("resend")
}

fn notify_flush(&self) -> NydusResult<()> {
self.notify("flush")
}
}

#[allow(dead_code)]
pub struct FusedevFsService {
/// Fuse connection ID which usually equals to `st_dev`
Expand Down Expand Up @@ -205,6 +286,23 @@ impl FusedevFsService {
session.wake().map_err(NydusError::SessionShutdown)?;
Ok(())
}

pub fn drain_fuse_requests(&self) -> NydusResult<()> {
let fusedev_notifier = FusedevNotifier::new(&self.session, &self.server);
let sysfs_notifier = FuseSysfsNotifier::new(&self.conn);

match self.failover_policy {
FailoverPolicy::None => Ok(()),
FailoverPolicy::Flush => sysfs_notifier.notify_flush(),
FailoverPolicy::Resend => fusedev_notifier.notify_resend().or_else(|e| {
error!(
"Failed to notify resend by /dev/fuse, {:?}. Trying to do it by sysfs",
e
);
sysfs_notifier.notify_resend()
}),
}
}
}

impl FsService for FusedevFsService {
Expand Down
8 changes: 8 additions & 0 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ pub enum Error {
// Fuse session has been shutdown.
#[error("FUSE session has been shut down, {0}")]
SessionShutdown(FuseTransportError),
#[error("FUSE device not found")]
FuseDeviceNotFound,
#[error("Fuse write error, {0}")]
FuseWriteError(#[source] FuseError),
#[error("Sysfs file open error, {0}")]
SysfsOpenError(#[source] io::Error),
#[error("Sysfs write error, {0}")]
SysfsWriteError(#[source] io::Error),

// virtio-fs
#[error("failed to handle event other than input event")]
Expand Down
28 changes: 20 additions & 8 deletions service/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ impl From<UpgradeMgrError> for Error {
/// FUSE fail-over policies.
#[derive(PartialEq, Eq, Debug)]
pub enum FailoverPolicy {
/// Do nothing.
None,
/// Flush pending requests.
Flush,
/// Resend pending requests.
Expand All @@ -60,6 +62,7 @@ impl TryFrom<&str> for FailoverPolicy {

fn try_from(p: &str) -> std::result::Result<Self, Self::Error> {
match p {
"none" => Ok(FailoverPolicy::None),
"flush" => Ok(FailoverPolicy::Flush),
"resend" => Ok(FailoverPolicy::Resend),
x => Err(einval!(format!("invalid FUSE fail-over mode {}", x))),
Expand Down Expand Up @@ -480,13 +483,13 @@ pub mod fusedev_upgrade {

// restore fuse fd
if let Some(f) = mgr.return_file() {
svc.as_any()
.downcast_ref::<FusedevFsService>()
.unwrap()
.session
.lock()
.unwrap()
.set_fuse_file(f);
let fuse_svc = svc.as_any().downcast_ref::<FusedevFsService>().unwrap();
fuse_svc.session.lock().unwrap().set_fuse_file(f);

// drain fuse requests
if let Err(e) = fuse_svc.drain_fuse_requests() {
warn!("Failed to drain fuse requests: {}", e);
}
}

// restore vfs
Expand Down Expand Up @@ -522,6 +525,10 @@ mod tests {

#[test]
fn test_failover_policy() {
assert_eq!(
FailoverPolicy::try_from("none").unwrap(),
FailoverPolicy::None
);
assert_eq!(
FailoverPolicy::try_from("flush").unwrap(),
FailoverPolicy::Flush
Expand All @@ -531,11 +538,16 @@ mod tests {
FailoverPolicy::Resend
);

let strs = vec!["flash", "Resend"];
let strs = vec!["null", "flash", "Resend"];
for s in strs.clone().into_iter() {
assert!(FailoverPolicy::try_from(s).is_err());
}

let str = String::from("none");
assert_eq!(
FailoverPolicy::try_from(&str).unwrap(),
FailoverPolicy::None
);
let str = String::from("flush");
assert_eq!(
FailoverPolicy::try_from(&str).unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion src/bin/nydusd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ fn append_fuse_options(app: Command) -> Command {
.long("failover-policy")
.default_value("resend")
.help("FUSE server failover policy")
.value_parser(["resend", "flush"])
.value_parser(["none", "resend", "flush"])
.required(false),
)
.arg(
Expand Down
Loading