Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 127 additions & 14 deletions mountpoint-s3-fs/examples/prefetch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use std::time::{Duration, Instant};

use anyhow::Context;
use clap::{value_parser, Parser};
use clap::{value_parser, Parser, ValueEnum};
use futures::executor::block_on;
use mountpoint_s3_client::config::{EndpointConfig, RustLogAdapter, S3ClientConfig};
use mountpoint_s3_client::types::HeadObjectParams;
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use mountpoint_s3_fs::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, DEFAULT_CACHE_BLOCK_SIZE};
use mountpoint_s3_fs::mem_limiter::MemoryLimiter;
use mountpoint_s3_fs::object::ObjectId;
use mountpoint_s3_fs::prefetch::{Prefetcher, PrefetcherConfig};
Expand All @@ -33,6 +35,10 @@ fn init_tracing_subscriber() {
.expect("should succeed as first and only subscriber init call");
}

const CLIENT_OPTIONS_HEADER: &str = "S3 client options";
const BENCHMARK_OPTIONS_HEADER: &str = "Benchmark options";
const PREFETCHER_OPTIONS_HEADER: &str = "Prefetcher options";

#[derive(Parser, Debug)]
#[clap(
name = "Mountpoint Prefetcher Benchmark",
Expand All @@ -56,18 +62,25 @@ pub struct CliArgs {
#[clap(
long,
help = "Target throughput in gibibits per second",
value_name = "N",
help_heading = CLIENT_OPTIONS_HEADER,
value_name = "Gib/s",
value_parser = value_parser!(u64).range(1..),
alias = "throughput-target-gbps",
)]
pub maximum_throughput_gbps: Option<u64>,

#[arg(long, help = "Override value for CRT memory limit in gibibytes", value_name = "GiB")]
#[arg(
long,
help = "Override value for CRT memory limit in gibibytes",
help_heading = CLIENT_OPTIONS_HEADER,
value_name = "GiB",
)]
pub crt_memory_limit_gib: Option<u64>,

#[clap(
long,
help = "Maximum memory usage target for Mountpoint's memory limiter [default: 95% of total system memory]",
help_heading = PREFETCHER_OPTIONS_HEADER,
value_name = "MiB",
value_parser = value_parser!(u64).range(512..),
)]
Expand All @@ -76,6 +89,7 @@ pub struct CliArgs {
#[clap(
long,
help = "Part size for multi-part GET in bytes",
help_heading = CLIENT_OPTIONS_HEADER,
value_name = "BYTES",
value_parser = value_parser!(u64).range(1..usize::MAX as u64),
alias = "read-part-size",
Expand All @@ -85,23 +99,83 @@ pub struct CliArgs {
#[arg(
long,
help = "Size of read requests requests to the prefetcher",
help_heading = PREFETCHER_OPTIONS_HEADER,
default_value_t = 128 * 1024,
value_name = "BYTES",
)]
read_size: usize,

#[arg(long, help = "Number of times to download the S3 object", default_value_t = 1)]
#[arg(
long,
help = "Number of times to download the S3 object",
help_heading = BENCHMARK_OPTIONS_HEADER,
default_value_t = 1,
value_name = "N",
)]
iterations: usize,

#[arg(long, help = "Number of concurrent downloads", default_value_t = 1, value_name = "N")]
#[arg(
long,
help = "Number of concurrent downloads",
help_heading = BENCHMARK_OPTIONS_HEADER,
default_value_t = 1,
value_name = "N",
)]
downloads: usize,

#[clap(
long,
help = "One or more network interfaces to use when accessing S3. Requires Linux 5.7+ or running as root.",
value_name = "NETWORK_INTERFACE"
help_heading = CLIENT_OPTIONS_HEADER,
value_name = "NETWORK_INTERFACE",
)]
pub bind: Option<Vec<String>>,

#[clap(
long,
help = "Enable caching of object content to the given directory. \
The disk cache has no max size for this benchmark, so you must ensure there is enough available space.",
help_heading = PREFETCHER_OPTIONS_HEADER,
value_name = "DIRECTORY",
visible_alias = "cache",
)]
pub disk_cache: Option<PathBuf>,

#[clap(
long,
help = "Configure how the benchmark cleans the data cache directory.",
help_heading = PREFETCHER_OPTIONS_HEADER,
default_value = "every-iteration",
value_name = "MODE",
)]
pub disk_cache_cleanup: CacheCleanupMode,
}

/// Controls when the benchmark cleans the on-disk data cache directory.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CacheCleanupMode {
    /// Never try to clean at start or end of benchmark overall or for iterations.
    ///
    /// This effectively allows testing with a 'warm' cache.
    /// This tool doesn't offer to perform warm up in-process as this can impact the process itself (e.g. memory usage).
    /// You should warm up the cache with a separate process (such as this benchmark, but an earlier invocation).
    Never,
    /// Try to clean at start and end of every benchmark iteration.
    ///
    /// This effectively allows testing with a 'cold' cache.
    EveryIteration,
}

impl ValueEnum for CacheCleanupMode {
    /// All selectable cleanup modes, in declaration order.
    fn value_variants<'a>() -> &'a [Self] {
        const VARIANTS: &[CacheCleanupMode] = &[CacheCleanupMode::Never, CacheCleanupMode::EveryIteration];
        VARIANTS
    }

    /// Maps each variant to the kebab-case name accepted on the command line.
    fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> {
        let name = match self {
            CacheCleanupMode::Never => "never",
            CacheCleanupMode::EveryIteration => "every-iteration",
        };
        Some(clap::builder::PossibleValue::new(name))
    }
}

fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -134,11 +208,36 @@ fn main() -> anyhow::Result<()> {
let runtime = Runtime::new(client.event_loop_group());

for iteration in 0..args.iterations {
let manager = Prefetcher::default_builder(client.clone()).build(
runtime.clone(),
mem_limiter.clone(),
PrefetcherConfig::default(),
);
let manager = {
let client = client.clone();
let builder = match &args.disk_cache {
None => Prefetcher::default_builder(client),
Some(cache_directory) => {
if matches!(args.disk_cache_cleanup, CacheCleanupMode::EveryIteration) {
tracing::trace!(
iteration,
?cache_directory,
"removing contents of cache directory between iterations",
);
// Wait a little to avoid files still existing in the directory.
// Presumably, this is waiting for any tasks writing blocks on runtime to complete.
// TODO: Fix this silliness?
std::thread::sleep(Duration::from_millis(2000));
remove_all_entries(cache_directory.as_path())
.context("failed to cleanup cache directory between iterations")?;
}

let config = DiskDataCacheConfig {
cache_directory: cache_directory.clone(),
block_size: DEFAULT_CACHE_BLOCK_SIZE,
limit: CacheLimit::Unbounded,
};
let disk_cache = DiskDataCache::new(config);
Prefetcher::caching_builder(disk_cache, client)
}
};
builder.build(runtime.clone(), mem_limiter.clone(), PrefetcherConfig::default())
};

let received_bytes = Arc::new(AtomicU64::new(0));
let start = Instant::now();
Expand Down Expand Up @@ -177,7 +276,7 @@ fn main() -> anyhow::Result<()> {
Ok(())
}

fn make_s3_client_from_args(args: &CliArgs) -> Result<S3CrtClient, impl std::error::Error> {
fn make_s3_client_from_args(args: &CliArgs) -> anyhow::Result<S3CrtClient> {
let initial_read_window_size = 1024 * 1024 + 128 * 1024;
let mut client_config = S3ClientConfig::new()
.read_backpressure(true)
Expand All @@ -195,5 +294,19 @@ fn make_s3_client_from_args(args: &CliArgs) -> Result<S3CrtClient, impl std::err
if let Some(interfaces) = &args.bind {
client_config = client_config.network_interface_names(interfaces.clone());
}
S3CrtClient::new(client_config)
let client = S3CrtClient::new(client_config)?;
Ok(client)
}

/// Remove all directory entries (files, directories, and symlinks) recursively under `directory`.
///
/// `directory` itself is left in place; only its contents are deleted.
fn remove_all_entries(directory: &Path) -> std::io::Result<()> {
    for entry in std::fs::read_dir(directory)? {
        let entry = entry?;
        // Use the entry's own file type, which does NOT follow symlinks, instead of
        // `Path::is_file`/`Path::is_dir` (which do). With the old check a symlink —
        // in particular a broken one — reported neither file nor directory and hit
        // an `unreachable!` panic; a symlink to a directory would have recursed
        // through the link via `remove_dir_all`.
        if entry.file_type()?.is_dir() {
            std::fs::remove_dir_all(entry.path())?;
        } else {
            // Regular files and symlinks are both removed with `remove_file`.
            std::fs::remove_file(entry.path())?;
        }
    }
    Ok(())
}
3 changes: 3 additions & 0 deletions mountpoint-s3-fs/src/data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use crate::object::ObjectId;
/// Indexes blocks within a given object.
pub type BlockIndex = u64;

/// Default block size for data caches.
pub const DEFAULT_CACHE_BLOCK_SIZE: u64 = 1024 * 1024;

/// Errors returned by operations on a [DataCache]
#[derive(Debug, Error)]
pub enum DataCacheError {
Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3-fs/src/data_cache/disk_data_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct DiskDataCache {
}

/// Configuration for a [DiskDataCache].
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct DiskDataCacheConfig {
pub cache_directory: PathBuf,
/// Size of data blocks.
Expand All @@ -49,7 +49,7 @@ pub struct DiskDataCacheConfig {
}

/// Limit the cache size.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum CacheLimit {
Unbounded,
TotalSize { max_size: usize },
Expand Down
6 changes: 4 additions & 2 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use clap::{value_parser, ArgGroup, Parser, ValueEnum};
use mountpoint_s3_client::config::{AddressingStyle, S3ClientAuthConfig, AWSCRT_LOG_TARGET};
use mountpoint_s3_client::instance_info::InstanceInfo;
use mountpoint_s3_client::user_agent::UserAgent;
use mountpoint_s3_fs::data_cache::{CacheLimit, DataCacheConfig, DiskDataCacheConfig, ExpressDataCacheConfig};
use mountpoint_s3_fs::data_cache::{
CacheLimit, DataCacheConfig, DiskDataCacheConfig, ExpressDataCacheConfig, DEFAULT_CACHE_BLOCK_SIZE,
};
use mountpoint_s3_fs::fs::{CacheConfig, ServerSideEncryption, TimeToLive};
use mountpoint_s3_fs::fuse::config::{FuseOptions, FuseSessionConfig, MountPoint};
use mountpoint_s3_fs::logging::{prepare_log_file_name, LoggingConfig};
Expand Down Expand Up @@ -528,7 +530,7 @@ impl CliArgs {
if let Some(kib) = self.cache_block_size {
return kib * 1024;
}
1024 * 1024 // 1 MiB block size - default for disk cache and for express cache
DEFAULT_CACHE_BLOCK_SIZE // default for both disk cache and express cache
}

fn cache_config(&self) -> CacheConfig {
Expand Down
Loading