Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 173 additions & 8 deletions mountpoint-s3-fs/src/data_cache/cache_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::io;
use std::os::unix::ffi::OsStrExt as _;
use std::os::unix::fs::DirBuilderExt;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

use thiserror::Error;

Expand All @@ -39,6 +40,15 @@ pub enum ManagedCacheDirError {
}

impl ManagedCacheDir {
// This is the sub-directory where Mountpoint will store cache contents for the current Mountpoint instance.
const MOUNTPOINT_CACHE_DIR_NAME: &str = "mountpoint-cache";
// This is the sub-directory prefix where Mountpoint will rename any leftover cache sub-directory from any previous Mountpoint instances
// and then will attempt to clean up in a background thread.
const MOUNTPOINT_OLD_CACHE_DIR_PREFIX: &str = "old-mountpoint-cache.";
// This is the maximum number of retries Mountpoint will try to clean up the old cache folder in the background thread.
// Mountpoint will log any errors if clean up fails.
const MOUNTPOINT_OLD_CACHE_DIR_CLEANUP_MAX_RETRY: usize = 3;

/// Create a new directory inside the provided parent path.
///
/// If `should_cleanup` is `true` and `<parent_path>/mountpoint-cache` already exists,
Expand All @@ -53,7 +63,7 @@ impl ManagedCacheDir {
cache_key: Option<&OsStr>,
should_cleanup: bool,
) -> Result<Self, ManagedCacheDirError> {
let mountpoint_cache_path = parent_path.as_ref().join("mountpoint-cache");
let mountpoint_cache_path = parent_path.as_ref().join(ManagedCacheDir::MOUNTPOINT_CACHE_DIR_NAME);
let managed_cache_path = match cache_key {
None => mountpoint_cache_path.clone(),
Some(cache_key) => mountpoint_cache_path.join(hash_cache_key(cache_key.as_bytes())),
Expand All @@ -65,7 +75,9 @@ impl ManagedCacheDir {
};

if should_cleanup {
managed_cache_dir.remove()?;
managed_cache_dir
.remove_in_background()
.map_err(ManagedCacheDirError::CleanupFailure)?;
}
Self::create_dir(&managed_cache_dir.mountpoint_cache_path)?;
if cache_key.is_some() {
Expand All @@ -77,16 +89,61 @@ impl ManagedCacheDir {
/// Remove the cache sub-directory, along with its contents if any
fn remove(&self) -> Result<(), ManagedCacheDirError> {
tracing::debug!(cache_subdirectory = ?self.mountpoint_cache_path, "removing the cache sub-directory and any contents");
if let Err(remove_dir_err) = fs::remove_dir_all(&self.mountpoint_cache_path) {
match remove_dir_err.kind() {
io::ErrorKind::NotFound => (),
_kind => return Err(ManagedCacheDirError::CleanupFailure(remove_dir_err)),
}
}
remove_dir_all_ignore_not_found(&self.mountpoint_cache_path).map_err(ManagedCacheDirError::CleanupFailure)?;
tracing::trace!(cache_subdirectory = ?self.mountpoint_cache_path, "cache sub-directory removal complete");
Ok(())
}

/// Remove the cache sub-directory in a background thread.
/// This method first renames `<parent_path>/mountpoint-cache` into `<parent_path>/old-mountpoint-cache.<EPOCH_NS>` in the current thread,
/// and then spawns a detached background thread to clean up the old cache folder.
fn remove_in_background(&self) -> io::Result<()> {
let exists = self.mountpoint_cache_path.try_exists()?;
if !exists {
// Nothing to do
return Ok(());
}

let epoch_ns = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or_default();
let renamed_cache_path = self.mountpoint_cache_path.with_file_name(format!(
"{}{}",
ManagedCacheDir::MOUNTPOINT_OLD_CACHE_DIR_PREFIX,
epoch_ns
));

tracing::debug!(
cache_subdirectory = ?self.mountpoint_cache_path,
renamed_cache_subdirectory = ?renamed_cache_path,
"renaming the cache sub-directory to clean up in a background thread");
fs::rename(&self.mountpoint_cache_path, &renamed_cache_path)?;

std::thread::spawn(move || {
for attempt in 1..=ManagedCacheDir::MOUNTPOINT_OLD_CACHE_DIR_CLEANUP_MAX_RETRY {
match remove_dir_all_ignore_not_found(&renamed_cache_path) {
Ok(()) => {
tracing::debug!(
attempt = attempt,
renamed_cache_subdirectory = ?renamed_cache_path,
"cache sub-directory removal complete");
return;
}
Err(err) => {
tracing::error!(
attempt = attempt,
renamed_cache_subdirectory = ?renamed_cache_path,
error = ?err,
"failed to remove cache sub-directory in background");
}
}
}
});

Ok(())
}

/// Create a directory, assuming the parent path exists.
fn create_dir(path: &Path) -> Result<(), ManagedCacheDirError> {
let mkdir_result = fs::DirBuilder::new().mode(0o700).create(path);
Expand Down Expand Up @@ -130,6 +187,16 @@ fn hash_cache_key(cache_key: &[u8]) -> String {
hex::encode(hashed_key)
}

/// Removes given directory after removing all its contents.
/// This function is a wrapper around [fs::remove_dir_all] and just ignores "not found" errors.
fn remove_dir_all_ignore_not_found(path: &Path) -> io::Result<()> {
match fs::remove_dir_all(path) {
Ok(()) => Ok(()),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err),
}
}

#[cfg(test)]
mod tests {
use test_case::test_matrix;
Expand All @@ -140,6 +207,7 @@ mod tests {
use std::fs;
use std::os::unix::ffi::OsStrExt as _;
use std::os::unix::fs::{DirBuilderExt, PermissionsExt};
use std::time::{Duration, Instant};

const EXPECTED_DIR_MODE: u32 = 0o700;

Expand Down Expand Up @@ -298,4 +366,101 @@ mod tests {

temp_dir.close().unwrap();
}

// This test is disabled by default as it takes 10m+ to run.
// #[test_matrix([50_000, 100_000, 250_000, 500_000])]
#[allow(unused)]
fn test_big_folder_cleaned_in_background(num_entries: usize) {
let temp_dir = tempfile::tempdir().unwrap();
let expected_path = temp_dir.path().join("mountpoint-cache");

// Here we're trying to create a folder structure similar to how Mountpoint stores local-cache.
// Cache folder layout of a Mountpoint instance would look like:
//
// $ tree -L 3 --filelimit=256 /tmp/mp-cache/mountpoint-cache
// /tmp/mp-cache/mountpoint-cache
// └── V2
// ├── 00 [418 entries exceeds filelimit, not opening dir]
// ├── 01 [425 entries exceeds filelimit, not opening dir]
// ├── 02 [410 entries exceeds filelimit, not opening dir]
// ├── 03 [454 entries exceeds filelimit, not opening dir]
// ├── 04 [415 entries exceeds filelimit, not opening dir]
// ├── 05 [423 entries exceeds filelimit, not opening dir]
// ├── 06 [404 entries exceeds filelimit, not opening dir]
// ├── 07 [424 entries exceeds filelimit, not opening dir]
// ├── 08 [423 entries exceeds filelimit, not opening dir]
// ├── 09 [396 entries exceeds filelimit, not opening dir]
// ├── 0a [456 entries exceeds filelimit, not opening dir]
// ...
// 256 folders in total
let start = std::time::Instant::now();
const NUM_PARTITIONS: usize = 256;
for n in 0..num_entries {
let dir = expected_path.join("V2").join(format!("{}", n % NUM_PARTITIONS));
fs::DirBuilder::new()
.recursive(true)
.mode(0o775) // something that isn't the expected `0o700`
.create(&dir)
.unwrap();
fs::File::create(dir.join(format!("{n}.txt"))).unwrap();
}
println!(
"created cache directory with {} entries in {:?}",
num_entries,
start.elapsed()
);

let start = Instant::now();
let managed_dir = ManagedCacheDir::new_from_parent_with_cache_key(temp_dir.path(), None, SHOULD_CLEANUP)
.expect("creating managed dir should succeed");
println!("managed dir constructed in {:?}", start.elapsed());

// `<parent_path>/mountpoint-cache` should exists with correct permissions and without any entries
assert_dir_exists_with_permissions!(&expected_path);
assert_eq!(
0,
fs::read_dir(&expected_path).unwrap().count(),
"directory should be empty"
);

// Ensure old cache directory exists...
let old_cache_dir = fs::read_dir(temp_dir.path())
.unwrap()
.filter_map(|res| match res {
Ok(entry) => {
let path = entry.path();
let file_name = path.file_name().unwrap().to_str().unwrap();
if file_name.starts_with("old-mountpoint-cache.") {
Some(entry)
} else {
None
}
}
Err(_) => None,
})
.next()
.expect("missing old cache directory");
assert!(old_cache_dir.path().exists(), "old cache directory must exists");
// ... but it should be cleaned in the background, and eventually should disappear in 5 mins
for _ in 0..300 {
std::thread::sleep(Duration::from_secs(1));
if !old_cache_dir.path().exists() {
break;
}
}
assert!(
!old_cache_dir.path().exists(),
"old cache directory must disappear after 5 mins"
);
println!(
"old cache directory {:?} cleaned up in {:?}",
old_cache_dir.path(),
start.elapsed()
);

drop(managed_dir);
assert_dir_does_not_exist!(&expected_path, "cache folder should be cleaned on drop");

temp_dir.close().unwrap();
}
}
Loading