diff --git a/mountpoint-s3-fs/examples/prefetch_benchmark.rs b/mountpoint-s3-fs/examples/prefetch_benchmark.rs index de6de9c9e..96addd924 100644 --- a/mountpoint-s3-fs/examples/prefetch_benchmark.rs +++ b/mountpoint-s3-fs/examples/prefetch_benchmark.rs @@ -1,14 +1,16 @@ +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::thread; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::Context; -use clap::{value_parser, Parser}; +use clap::{value_parser, Parser, ValueEnum}; use futures::executor::block_on; use mountpoint_s3_client::config::{EndpointConfig, RustLogAdapter, S3ClientConfig}; use mountpoint_s3_client::types::HeadObjectParams; use mountpoint_s3_client::{ObjectClient, S3CrtClient}; +use mountpoint_s3_fs::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, DEFAULT_CACHE_BLOCK_SIZE}; use mountpoint_s3_fs::mem_limiter::MemoryLimiter; use mountpoint_s3_fs::object::ObjectId; use mountpoint_s3_fs::prefetch::{Prefetcher, PrefetcherConfig}; @@ -33,6 +35,10 @@ fn init_tracing_subscriber() { .expect("should succeed as first and only subscriber init call"); } +const CLIENT_OPTIONS_HEADER: &str = "S3 client options"; +const BENCHMARK_OPTIONS_HEADER: &str = "Benchmark options"; +const PREFETCHER_OPTIONS_HEADER: &str = "Prefetcher options"; + #[derive(Parser, Debug)] #[clap( name = "Mountpoint Prefetcher Benchmark", @@ -56,18 +62,25 @@ pub struct CliArgs { #[clap( long, help = "Target throughput in gibibits per second", - value_name = "N", + help_heading = CLIENT_OPTIONS_HEADER, + value_name = "Gib/s", value_parser = value_parser!(u64).range(1..), alias = "throughput-target-gbps", )] pub maximum_throughput_gbps: Option, - #[arg(long, help = "Override value for CRT memory limit in gibibytes", value_name = "GiB")] + #[arg( + long, + help = "Override value for CRT memory limit in gibibytes", + help_heading = CLIENT_OPTIONS_HEADER, + value_name = "GiB", + )] pub crt_memory_limit_gib: Option, #[clap( long, help = "Maximum memory usage target for Mountpoint's memory limiter [default: 95% of total system memory]", + help_heading = PREFETCHER_OPTIONS_HEADER, value_name = "MiB", value_parser = value_parser!(u64).range(512..), )] @@ -76,6 +89,7 @@ pub struct CliArgs { #[clap( long, help = "Part size for multi-part GET in bytes", + help_heading = CLIENT_OPTIONS_HEADER, value_name = "BYTES", value_parser = value_parser!(u64).range(1..usize::MAX as u64), alias = "read-part-size", @@ -85,23 +99,83 @@ pub struct CliArgs { #[arg( long, help = "Size of read requests requests to the prefetcher", + help_heading = PREFETCHER_OPTIONS_HEADER, default_value_t = 128 * 1024, value_name = "BYTES", )] read_size: usize, - #[arg(long, help = "Number of times to download the S3 object", default_value_t = 1)] + #[arg( + long, + help = "Number of times to download the S3 object", + help_heading = BENCHMARK_OPTIONS_HEADER, + default_value_t = 1, + value_name = "N", + )] iterations: usize, - #[arg(long, help = "Number of concurrent downloads", default_value_t = 1, value_name = "N")] + #[arg( + long, + help = "Number of concurrent downloads", + help_heading = BENCHMARK_OPTIONS_HEADER, + default_value_t = 1, + value_name = "N", + )] downloads: usize, #[clap( long, help = "One or more network interfaces to use when accessing S3. Requires Linux 5.7+ or running as root.", - value_name = "NETWORK_INTERFACE" + help_heading = CLIENT_OPTIONS_HEADER, + value_name = "NETWORK_INTERFACE", )] pub bind: Option>, + + #[clap( + long, + help = "Enable caching of object content to the given directory. \ + The disk cache has no max size for this benchmark, so you must ensure there is enough available space.", + help_heading = PREFETCHER_OPTIONS_HEADER, + value_name = "DIRECTORY", + visible_alias = "cache", + )] + pub disk_cache: Option, + + #[clap( + long, + help = "Configure how the benchmark cleans the data cache directory.", + help_heading = PREFETCHER_OPTIONS_HEADER, + default_value = "every-iteration", + value_name = "MODE", + )] + pub disk_cache_cleanup: CacheCleanupMode, +} + +#[derive(Clone, Debug)] +pub enum CacheCleanupMode { + /// Never try to clean at start or end of benchmark overall or for iterations. + /// + /// This can allow effectively a 'warm' cache for testing. + /// This tool doesn't offer to perform warm up in-process as this can impact the process itself (e.g. memory usage). + /// You should warm up the cache with a separate process (such as this benchmark, but an earlier invocation). + Never, + /// Try to clean at start and end of every benchmark iteration. + /// + /// This can allow effectively a 'cold' cache for testing. + EveryIteration, +} + +impl ValueEnum for CacheCleanupMode { + fn value_variants<'a>() -> &'a [Self] { + &[CacheCleanupMode::Never, CacheCleanupMode::EveryIteration] + } + + fn to_possible_value(&self) -> Option { + match self { + CacheCleanupMode::Never => Some(clap::builder::PossibleValue::new("never")), + CacheCleanupMode::EveryIteration => Some(clap::builder::PossibleValue::new("every-iteration")), + } + } } fn main() -> anyhow::Result<()> { @@ -134,11 +208,36 @@ fn main() -> anyhow::Result<()> { let runtime = Runtime::new(client.event_loop_group()); for iteration in 0..args.iterations { - let manager = Prefetcher::default_builder(client.clone()).build( - runtime.clone(), - mem_limiter.clone(), - PrefetcherConfig::default(), - ); + let manager = { + let client = client.clone(); + let builder = match &args.disk_cache { + None => Prefetcher::default_builder(client), + Some(cache_directory) => { + if matches!(args.disk_cache_cleanup, CacheCleanupMode::EveryIteration) { + tracing::trace!( + iteration, + ?cache_directory, + "removing contents of cache directory between iterations", + ); + // Wait a little to avoid files existing in the direcotry. + // Presumably, this is waiting for any tasks writing blocks on runtime to complete. + // TODO: Fix this sillyness? + std::thread::sleep(Duration::from_millis(2000)); + remove_all_entries(cache_directory.as_path()) + .context("failed to cleanup cache directory between iterations")?; + } + + let config = DiskDataCacheConfig { + cache_directory: cache_directory.clone(), + block_size: DEFAULT_CACHE_BLOCK_SIZE, + limit: CacheLimit::Unbounded, + }; + let disk_cache = DiskDataCache::new(config); + Prefetcher::caching_builder(disk_cache, client) + } + }; + builder.build(runtime.clone(), mem_limiter.clone(), PrefetcherConfig::default()) + }; let received_bytes = Arc::new(AtomicU64::new(0)); let start = Instant::now(); @@ -177,7 +276,7 @@ fn main() -> anyhow::Result<()> { Ok(()) } -fn make_s3_client_from_args(args: &CliArgs) -> Result { +fn make_s3_client_from_args(args: &CliArgs) -> anyhow::Result { let initial_read_window_size = 1024 * 1024 + 128 * 1024; let mut client_config = S3ClientConfig::new() .read_backpressure(true) @@ -195,5 +294,19 @@ fn make_s3_client_from_args(args: &CliArgs) -> Result std::io::Result<()> { + for entry in std::fs::read_dir(directory)? { + let path = entry?.path(); + match (path.is_file(), path.is_dir()) { + (true, false) => std::fs::remove_file(&path)?, + (false, true) => std::fs::remove_dir_all(&path)?, + _ => unreachable!("entry should be exactly one of file or directory"), + } + } + Ok(()) } diff --git a/mountpoint-s3-fs/src/data_cache.rs b/mountpoint-s3-fs/src/data_cache.rs index 415527299..10ae49abd 100644 --- a/mountpoint-s3-fs/src/data_cache.rs +++ b/mountpoint-s3-fs/src/data_cache.rs @@ -25,6 +25,9 @@ use crate::object::ObjectId; /// Indexes blocks within a given object. pub type BlockIndex = u64; +/// Default block size for data caches. +pub const DEFAULT_CACHE_BLOCK_SIZE: u64 = 1024 * 1024; + /// Errors returned by operations on a [DataCache] #[derive(Debug, Error)] pub enum DataCacheError { diff --git a/mountpoint-s3-fs/src/data_cache/disk_data_cache.rs b/mountpoint-s3-fs/src/data_cache/disk_data_cache.rs index b0cbc9bd4..7d0d520c5 100644 --- a/mountpoint-s3-fs/src/data_cache/disk_data_cache.rs +++ b/mountpoint-s3-fs/src/data_cache/disk_data_cache.rs @@ -39,7 +39,7 @@ pub struct DiskDataCache { } /// Configuration for a [DiskDataCache]. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct DiskDataCacheConfig { pub cache_directory: PathBuf, /// Size of data blocks. @@ -49,7 +49,7 @@ pub struct DiskDataCacheConfig { } /// Limit the cache size. -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum CacheLimit { Unbounded, TotalSize { max_size: usize }, diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs index 459a95423..1033e84a1 100644 --- a/mountpoint-s3/src/cli.rs +++ b/mountpoint-s3/src/cli.rs @@ -7,7 +7,9 @@ use clap::{value_parser, ArgGroup, Parser, ValueEnum}; use mountpoint_s3_client::config::{AddressingStyle, S3ClientAuthConfig, AWSCRT_LOG_TARGET}; use mountpoint_s3_client::instance_info::InstanceInfo; use mountpoint_s3_client::user_agent::UserAgent; -use mountpoint_s3_fs::data_cache::{CacheLimit, DataCacheConfig, DiskDataCacheConfig, ExpressDataCacheConfig}; +use mountpoint_s3_fs::data_cache::{ + CacheLimit, DataCacheConfig, DiskDataCacheConfig, ExpressDataCacheConfig, DEFAULT_CACHE_BLOCK_SIZE, +}; use mountpoint_s3_fs::fs::{CacheConfig, ServerSideEncryption, TimeToLive}; use mountpoint_s3_fs::fuse::config::{FuseOptions, FuseSessionConfig, MountPoint}; use mountpoint_s3_fs::logging::{prepare_log_file_name, LoggingConfig}; @@ -528,7 +530,7 @@ impl CliArgs { if let Some(kib) = self.cache_block_size { return kib * 1024; } - 1024 * 1024 // 1 MiB block size - default for disk cache and for express cache + DEFAULT_CACHE_BLOCK_SIZE // default for both disk cache and express cache } fn cache_config(&self) -> CacheConfig {