diff --git a/Cargo.lock b/Cargo.lock index 4d071a552..8d4486042 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2711,6 +2711,7 @@ dependencies = [ "mountpoint-s3-client", "mountpoint-s3-fuser", "nix 0.30.1", + "once_cell", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", diff --git a/mountpoint-s3-client/src/metrics.rs b/mountpoint-s3-client/src/metrics.rs index 5e0fc1ee0..eec70b636 100644 --- a/mountpoint-s3-client/src/metrics.rs +++ b/mountpoint-s3-client/src/metrics.rs @@ -1,6 +1,6 @@ // S3 metric name constants pub const S3_REQUEST_COUNT: &str = "s3.request_count"; -pub const S3_REQUEST_FAILURE: &str = "s3.request_failure"; +pub const S3_REQUEST_ERRORS: &str = "s3.request_errors"; pub const S3_REQUEST_CANCELED: &str = "s3.request_canceled"; pub const S3_REQUEST_TOTAL_LATENCY: &str = "s3.request_total_latency"; pub const S3_REQUEST_FIRST_BYTE_LATENCY: &str = "s3.request_first_byte_latency"; diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index fd990b9e4..0d3937f02 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -45,7 +45,7 @@ use crate::endpoint_config::EndpointError; use crate::endpoint_config::{self, EndpointConfig}; use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata}; use crate::metrics::{ - ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_FAILURE, + ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS, S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY, }; use crate::object_client::*; @@ -611,7 +611,7 @@ impl S3CrtClientInner { metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => request_type).record(duration.as_micros() as f64); metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => request_type).increment(1); if request_failure { - metrics::counter!(S3_REQUEST_FAILURE, ATTR_S3_REQUEST => request_type, ATTR_HTTP_STATUS => 
http_status.unwrap_or(-1).to_string()).increment(1); + metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => request_type, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1); } else if request_canceled { metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => request_type).increment(1); } diff --git a/mountpoint-s3-client/tests/metrics.rs b/mountpoint-s3-client/tests/metrics.rs index 4d787a9f0..2315117ef 100644 --- a/mountpoint-s3-client/tests/metrics.rs +++ b/mountpoint-s3-client/tests/metrics.rs @@ -21,7 +21,7 @@ use metrics::{ }; use mountpoint_s3_client::error::ObjectClientError; use mountpoint_s3_client::metrics::{ - ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_FAILURE, S3_REQUEST_FIRST_BYTE_LATENCY, + ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_ERRORS, S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY, }; use mountpoint_s3_client::types::{GetObjectParams, HeadObjectParams}; @@ -272,7 +272,7 @@ async fn test_get_object_metrics_403() { // Verify S3 request failure metrics let (key, request_failures) = metrics - .get(S3_REQUEST_FAILURE, Some(ATTR_S3_REQUEST), Some("GetObject")) + .get(S3_REQUEST_ERRORS, Some(ATTR_S3_REQUEST), Some("GetObject")) .expect("request failures metric should exist"); // Verify HTTP status code attribute exists and is correct diff --git a/mountpoint-s3-fs/Cargo.toml b/mountpoint-s3-fs/Cargo.toml index 0d0412af0..fb2c04c5d 100644 --- a/mountpoint-s3-fs/Cargo.toml +++ b/mountpoint-s3-fs/Cargo.toml @@ -81,6 +81,7 @@ test-case = "3.3.1" tokio = { version = "1.47.1", features = ["rt", "macros"] } walkdir = "2.5.0" wiremock = "0.6.5" +once_cell = "1.21.3" [features] # Unreleased and/or experimental features: not enabled in the release binary and may be dropped in future diff --git a/mountpoint-s3-fs/src/fuse.rs b/mountpoint-s3-fs/src/fuse.rs index 54526521f..724ec5c0f 100644 --- a/mountpoint-s3-fs/src/fuse.rs +++ b/mountpoint-s3-fs/src/fuse.rs @@ -9,6 +9,7 @@ use 
time::OffsetDateTime; use tracing::{Instrument, field, instrument}; use crate::fs::{DirectoryEntry, DirectoryReplier, InodeNo, S3Filesystem, ToErrno}; +use crate::metrics::defs::{ATTR_FUSE_REQUEST, FUSE_IO_SIZE, FUSE_REQUEST_ERRORS}; #[cfg(target_os = "macos")] use fuser::ReplyXTimes; use fuser::{ @@ -48,7 +49,7 @@ macro_rules! fuse_error { ($name:literal, $reply:expr, $err:expr, $fs:expr, $request:expr) => {{ let err = $err; event!(err.level, "{} failed with errno {}: {:#}", $name, err.to_errno(), err); - ::metrics::counter!("fuse.op_failures", "op" => $name).increment(1); + ::metrics::counter!(FUSE_REQUEST_ERRORS, ATTR_FUSE_REQUEST => $name).increment(1); if let Some(error_logger) = $fs.error_logger.as_ref() { error_logger.error(&err, $name, $request.unique()); } @@ -60,7 +61,7 @@ macro_rules! fuse_error { macro_rules! fuse_unsupported { ($name:literal, $reply:expr, $err:expr, $level:expr) => {{ event!($level, "{} failed: operation not supported by Mountpoint", $name); - ::metrics::counter!("fuse.op_failures", "op" => $name).increment(1); + ::metrics::counter!(FUSE_REQUEST_ERRORS, ATTR_FUSE_REQUEST => $name).increment(1); ::metrics::counter!("fuse.op_unimplemented","op" => $name).increment(1); $reply.error($err); }}; @@ -152,7 +153,7 @@ where } metrics::counter!("fuse.total_bytes", "type" => "read").increment(bytes_sent as u64); - metrics::histogram!("fuse.io_size", "type" => "read").record(bytes_sent as f64); + metrics::histogram!(FUSE_IO_SIZE, ATTR_FUSE_REQUEST => "read").record(bytes_sent as f64); } #[instrument(level="warn", skip_all, fields(req=req.unique(), ino=parent, name=field::Empty))] @@ -334,7 +335,7 @@ where Ok(bytes_written) => { reply.written(bytes_written); metrics::counter!("fuse.total_bytes", "type" => "write").increment(bytes_written as u64); - metrics::histogram!("fuse.io_size", "type" => "write").record(bytes_written as f64); + metrics::histogram!(FUSE_IO_SIZE, ATTR_FUSE_REQUEST => "write").record(bytes_written as f64); } Err(e) => 
fuse_error!("write", reply, e, self, req), } diff --git a/mountpoint-s3-fs/src/fuse/session.rs b/mountpoint-s3-fs/src/fuse/session.rs index 9cdebe955..494c12312 100644 --- a/mountpoint-s3-fs/src/fuse/session.rs +++ b/mountpoint-s3-fs/src/fuse/session.rs @@ -1,13 +1,13 @@ use std::io; use anyhow::Context; -use const_format::formatcp; #[cfg(target_os = "linux")] use fuser::MountOption; use fuser::{Filesystem, Session, SessionUnmounter}; use tracing::{debug, error, trace, warn}; use super::config::{FuseSessionConfig, MountPoint}; +use crate::metrics::defs::{FUSE_IDLE_THREADS, FUSE_TOTAL_THREADS}; use crate::sync::Arc; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::mpsc::{self, Sender}; @@ -194,10 +194,6 @@ struct WorkerPool { max_workers: usize, } -const METRIC_NAME_PREFIX_WORKERS: &str = "fuse.mp_workers"; -const METRIC_NAME_FUSE_WORKERS_TOTAL: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.total_count"); -const METRIC_NAME_FUSE_WORKERS_IDLE: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.idle_count"); - #[derive(Debug)] struct WorkerPoolState { work: W, @@ -248,8 +244,8 @@ impl WorkerPool { let new_count = old_count + 1; let idle_worker_count = self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst) + 1; - metrics::gauge!(METRIC_NAME_FUSE_WORKERS_TOTAL).set(new_count as f64); - metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).set(idle_worker_count as f64); + metrics::gauge!(FUSE_TOTAL_THREADS).set(new_count as f64); + metrics::histogram!(FUSE_IDLE_THREADS).record(idle_worker_count as f64); let worker_index = old_count; let clone = (*self).clone(); @@ -267,7 +263,7 @@ impl WorkerPool { self.state.work.run( || { let previous_idle_count = self.state.idle_worker_count.fetch_sub(1, Ordering::SeqCst); - metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).decrement(1); + metrics::histogram!(FUSE_IDLE_THREADS).record((previous_idle_count - 1) as f64); if previous_idle_count == 1 { // This was the only idle thread, try to spawn a new one. 
if let Err(error) = self.try_add_worker() { @@ -276,8 +272,8 @@ impl WorkerPool { } }, || { - self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst); - metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).increment(1); + let idle_worker_count = self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst); + metrics::histogram!(FUSE_IDLE_THREADS).record((idle_worker_count + 1) as f64); }, ) } diff --git a/mountpoint-s3-fs/src/metrics.rs b/mountpoint-s3-fs/src/metrics.rs index 4baee1b01..fdb1c9530 100644 --- a/mountpoint-s3-fs/src/metrics.rs +++ b/mountpoint-s3-fs/src/metrics.rs @@ -14,6 +14,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use dashmap::DashMap; +use defs::PROCESS_MEMORY_USAGE; use metrics::{Key, Metadata, Recorder}; use sysinfo::{MemoryRefreshKind, ProcessRefreshKind, ProcessesToUpdate, System, get_current_pid}; @@ -98,7 +99,7 @@ fn poll_process_metrics(sys: &mut System) { if let Some(process) = sys.process(pid) { // update the metrics only when there is some change, otherwise it will be too spammy. 
if last_mem != process.memory() { - metrics::gauge!("process.memory_usage").set(process.memory() as f64); + metrics::gauge!(PROCESS_MEMORY_USAGE).set(process.memory() as f64); metrics::gauge!("system.available_memory").set(sys.available_memory() as f64); } } @@ -412,13 +413,14 @@ mod tests { mod test_otlp_metrics { use super::*; use crate::metrics::data::Metric; - use crate::metrics::defs::{ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_FAILURE}; + use crate::metrics::defs::{ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_ERRORS}; use crate::metrics_otel::{OtlpConfig, OtlpMetricsExporter}; use metrics::{Key, Unit}; use opentelemetry::metrics::MeterProvider as _; use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData, ResourceMetrics}; use opentelemetry_sdk::metrics::in_memory_exporter::InMemoryMetricExporter; use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; + use std::sync::Arc; struct TestContext { exporter: InMemoryMetricExporter, provider: SdkMeterProvider, @@ -522,7 +524,7 @@ mod test_otlp_metrics { let ctx = TestContext::new(); let key = Key::from_parts( - "s3.request_failure", + S3_REQUEST_ERRORS, vec![ metrics::Label::new(ATTR_S3_REQUEST, "GetObject"), metrics::Label::new(ATTR_HTTP_STATUS, "403"), @@ -530,7 +532,7 @@ mod test_otlp_metrics { ], ); - let config = defs::lookup_config(S3_REQUEST_FAILURE); + let config = defs::lookup_config(S3_REQUEST_ERRORS); let counter = Metric::counter_otlp(&ctx.otlp_exporter, &key, &config); counter.as_counter().increment(1); @@ -542,7 +544,7 @@ mod test_otlp_metrics { let scope_metrics: Vec<_> = resource_metrics.scope_metrics().collect(); let metric = scope_metrics[0] .metrics() - .find(|m| m.name() == "s3.request_failure") + .find(|m| m.name() == S3_REQUEST_ERRORS) .unwrap(); match metric.data() { diff --git a/mountpoint-s3-fs/src/metrics/defs.rs b/mountpoint-s3-fs/src/metrics/defs.rs index 4a0a871a6..6a1c57ff3 100644 --- a/mountpoint-s3-fs/src/metrics/defs.rs +++ 
b/mountpoint-s3-fs/src/metrics/defs.rs @@ -1,6 +1,6 @@ use metrics::Unit; pub use mountpoint_s3_client::metrics::{ - ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_FAILURE, S3_REQUEST_FIRST_BYTE_LATENCY, + ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_ERRORS, S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY, }; @@ -21,11 +21,14 @@ pub struct MetricConfig { // Metric name constants pub const FUSE_REQUEST_LATENCY: &str = "fuse.request_latency"; pub const FUSE_IO_SIZE: &str = "fuse.io_size"; -pub const FUSE_REQUEST_FAILURE: &str = "fuse.request_failure"; +pub const FUSE_REQUEST_ERRORS: &str = "fuse.request_errors"; pub const FUSE_IDLE_THREADS: &str = "fuse.idle_threads"; +pub const FUSE_TOTAL_THREADS: &str = "fuse.total_threads"; pub const PROCESS_MEMORY_USAGE: &str = "process.memory_usage"; +pub const PREFETCH_RESET_STATE: &str = "prefetch.reset_state"; + // Attribute constants pub const ATTR_FUSE_REQUEST: &str = "fuse_request"; @@ -41,14 +44,19 @@ pub fn lookup_config(name: &str) -> MetricConfig { stability: MetricStability::Stable, otlp_attributes: &[ATTR_FUSE_REQUEST], }, - FUSE_REQUEST_FAILURE => MetricConfig { + FUSE_REQUEST_ERRORS => MetricConfig { unit: Unit::Count, stability: MetricStability::Stable, otlp_attributes: &[ATTR_FUSE_REQUEST], }, FUSE_IDLE_THREADS => MetricConfig { unit: Unit::Count, - stability: MetricStability::Stable, + stability: MetricStability::Experimental, + otlp_attributes: &[], + }, + FUSE_TOTAL_THREADS => MetricConfig { + unit: Unit::Count, + stability: MetricStability::Experimental, otlp_attributes: &[], }, S3_REQUEST_COUNT => MetricConfig { @@ -56,7 +64,7 @@ pub fn lookup_config(name: &str) -> MetricConfig { stability: MetricStability::Stable, otlp_attributes: &[ATTR_S3_REQUEST], }, - S3_REQUEST_FAILURE => MetricConfig { + S3_REQUEST_ERRORS => MetricConfig { unit: Unit::Count, stability: MetricStability::Stable, otlp_attributes: &[ATTR_S3_REQUEST, ATTR_HTTP_STATUS], @@ -76,6 +84,11 @@ pub fn 
lookup_config(name: &str) -> MetricConfig { stability: MetricStability::Stable, otlp_attributes: &[], }, + PREFETCH_RESET_STATE => MetricConfig { + unit: Unit::Count, + stability: MetricStability::Experimental, + otlp_attributes: &[], + }, // Treat everything else as count metrics _ => MetricConfig { unit: Unit::Count, diff --git a/mountpoint-s3-fs/src/metrics/tracing_span.rs b/mountpoint-s3-fs/src/metrics/tracing_span.rs index 3d52eb84e..1547d66d6 100644 --- a/mountpoint-s3-fs/src/metrics/tracing_span.rs +++ b/mountpoint-s3-fs/src/metrics/tracing_span.rs @@ -1,5 +1,6 @@ use std::time::Instant; +use crate::metrics::defs::{ATTR_FUSE_REQUEST, FUSE_REQUEST_LATENCY}; use metrics::histogram; use tracing::span::Attributes; use tracing::{Id, Level, Subscriber}; @@ -46,7 +47,8 @@ where if Self::should_instrument_request_time(ctx.span(&id)) { let data = ctx.span(&id).unwrap(); let RequestTime(start_time) = *data.extensions().get::().unwrap(); - histogram!("fuse.op_latency_us", "op" => data.name()).record(start_time.elapsed().as_micros() as f64); + histogram!(FUSE_REQUEST_LATENCY, ATTR_FUSE_REQUEST => data.name()) + .record(start_time.elapsed().as_micros() as f64); } } } diff --git a/mountpoint-s3-fs/src/prefetch.rs b/mountpoint-s3-fs/src/prefetch.rs index 72ff664fe..daab021d8 100644 --- a/mountpoint-s3-fs/src/prefetch.rs +++ b/mountpoint-s3-fs/src/prefetch.rs @@ -42,6 +42,7 @@ use tracing::trace; use crate::checksums::{ChecksummedBytes, IntegrityError}; use crate::data_cache::DataCache; use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT}; +use crate::metrics::defs::PREFETCH_RESET_STATE; use crate::object::ObjectId; mod backpressure_controller; @@ -306,7 +307,7 @@ where actual = offset, "out-of-order read, resetting prefetch" ); - counter!("prefetch.out_of_order").increment(1); + counter!(PREFETCH_RESET_STATE).increment(1); // This is an approximation, tolerating some seeking caused by concurrent readahead. 
 self.record_contiguous_read_metric();
diff --git a/mountpoint-s3-fs/tests/common/metrics.rs b/mountpoint-s3-fs/tests/common/metrics.rs
new file mode 100644
index 000000000..76b83cea6
--- /dev/null
+++ b/mountpoint-s3-fs/tests/common/metrics.rs
@@ -0,0 +1,170 @@
+//! Test utilities for metrics validation
+
+use metrics::{
+    Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, Recorder, SharedString, Unit,
+};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+#[derive(Debug, Default, Clone)]
+pub struct TestRecorder {
+    metrics: Arc<Mutex<HashMap<Key, Arc<Metric>>>>,
+}
+
+impl TestRecorder {
+    pub fn clear(&self) {
+        let mut metrics = self.metrics.lock().unwrap();
+        metrics.clear();
+    }
+
+    pub fn get(&self, name: &str, labels: &[(&str, &str)]) -> Option<Arc<Metric>> {
+        let metrics = self.metrics.lock().unwrap();
+        metrics
+            .iter()
+            .find(|(key, _)| {
+                if key.name() != name {
+                    return false;
+                }
+
+                let actual_labels: Vec<_> = key.labels().map(|l| (l.key(), l.value())).collect();
+
+                // Must have exact same number of labels
+                if actual_labels.len() != labels.len() {
+                    return false;
+                }
+
+                // Every expected label must be in the actual labels
+                labels.iter().all(|(k, v)| actual_labels.contains(&(*k, *v)))
+            })
+            .map(|(_, metric)| Arc::clone(metric))
+    }
+}
+
+#[derive(Debug)]
+pub enum Metric {
+    Histogram(Mutex<Vec<f64>>),
+    Counter(Mutex<u64>),
+    Gauge(Mutex<f64>),
+}
+
+impl Metric {
+    pub fn gauge(&self) -> f64 {
+        match self {
+            Metric::Gauge(g) => *g.lock().unwrap(),
+            _ => panic!("expected gauge"),
+        }
+    }
+
+    pub fn counter(&self) -> u64 {
+        match self {
+            Metric::Counter(c) => *c.lock().unwrap(),
+            _ => panic!("expected counter"),
+        }
+    }
+
+    pub fn histogram(&self) -> Vec<f64> {
+        match self {
+            Metric::Histogram(h) => h.lock().unwrap().clone(),
+            _ => panic!("expected histogram"),
+        }
+    }
+}
+
+impl Recorder for TestRecorder {
+    fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {}
+    fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {}
+    fn describe_histogram(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {}
+
+    fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
+        let mut metrics = self.metrics.lock().unwrap();
+        let metric = metrics
+            .entry(key.clone())
+            .or_insert(Arc::new(Metric::Counter(Default::default())));
+        Counter::from_arc(metric.clone())
+    }
+
+    fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
+        let mut metrics = self.metrics.lock().unwrap();
+        let metric = metrics
+            .entry(key.clone())
+            .or_insert(Arc::new(Metric::Gauge(Default::default())));
+        Gauge::from_arc(metric.clone())
+    }
+
+    fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
+        let mut metrics = self.metrics.lock().unwrap();
+        let metric = metrics
+            .entry(key.clone())
+            .or_insert(Arc::new(Metric::Histogram(Default::default())));
+        Histogram::from_arc(metric.clone())
+    }
+}
+
+// Implement the metric traits
+impl CounterFn for Metric {
+    fn increment(&self, value: u64) {
+        let Metric::Counter(counter) = self else {
+            panic!("expected counter");
+        };
+        *counter.lock().unwrap() += value;
+    }
+
+    fn absolute(&self, value: u64) {
+        let Metric::Counter(counter) = self else {
+            panic!("expected counter");
+        };
+        *counter.lock().unwrap() = value;
+    }
+}
+
+impl GaugeFn for Metric {
+    fn increment(&self, value: f64) {
+        let Metric::Gauge(gauge) = self else {
+            panic!("expected gauge");
+        };
+        *gauge.lock().unwrap() += value;
+    }
+
+    fn decrement(&self, value: f64) {
+        let Metric::Gauge(gauge) = self else {
+            panic!("expected gauge");
+        };
+        *gauge.lock().unwrap() -= value;
+    }
+
+    fn set(&self, value: f64) {
+        let Metric::Gauge(gauge) = self else {
+            panic!("expected gauge");
+        };
+
+        *gauge.lock().unwrap() = value;
+    }
+}
+
+impl HistogramFn for Metric {
+    fn record(&self, value: f64) {
+        let Metric::Histogram(histogram) = self else {
+            panic!("expected histogram");
+        };
+        histogram.lock().unwrap().push(value);
+    }
+}
+
+/// Helper functions for test assertions
+pub fn get_metric(recorder: &TestRecorder, name: &str, labels: &[(&str, &str)]) -> Arc<Metric> {
+    recorder
+        .get(name, labels)
+        .unwrap_or_else(|| panic!("Expected metric '{name}' with labels {labels:?} to exist"))
+}
+
+pub fn get_histogram(recorder: &TestRecorder, name: &str, labels: &[(&str, &str)]) -> Vec<f64> {
+    let metric = get_metric(recorder, name, labels);
+    match metric.as_ref() {
+        Metric::Histogram(_) => metric.histogram(),
+        _ => panic!("Metric '{name}' is not a histogram"),
+    }
+}
+
+pub fn assert_metric_exists(recorder: &TestRecorder, name: &str, labels: &[(&str, &str)]) {
+    let _ = get_metric(recorder, name, labels);
+}
\ No newline at end of file
diff --git a/mountpoint-s3-fs/tests/common/mod.rs b/mountpoint-s3-fs/tests/common/mod.rs
index 096e84f03..a02a92bf7 100644
--- a/mountpoint-s3-fs/tests/common/mod.rs
+++ b/mountpoint-s3-fs/tests/common/mod.rs
@@ -15,6 +15,8 @@ pub mod s3;
 #[cfg(all(test, feature = "manifest"))]
 pub mod manifest;
 
+pub mod test_recorder;
+
 use aws_credential_types::Credentials;
 use fuser::{FileAttr, FileType};
 use futures::executor::ThreadPool;
@@ -25,12 +27,16 @@ use mountpoint_s3_client::config::{
 use mountpoint_s3_client::mock_client::MockClient;
 use mountpoint_s3_fs::fs::{DirectoryEntry, DirectoryReplier};
 use mountpoint_s3_fs::memory::PagedPool;
+use mountpoint_s3_fs::metrics::metrics_tracing_span_layer;
 use mountpoint_s3_fs::prefetch::Prefetcher;
 use mountpoint_s3_fs::s3::{Bucket, Prefix, S3Path};
 use mountpoint_s3_fs::{Runtime, S3Filesystem, S3FilesystemConfig, Superblock, SuperblockConfig};
 use std::collections::VecDeque;
 use std::future::Future;
 use std::sync::Arc;
+use tracing_subscriber::EnvFilter;
+use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
 
 pub fn make_test_filesystem(
     bucket: &str,
@@ -136,7 +142,14 @@ pub fn get_crt_client_auth_config(credentials: 
Credentials) -> S3ClientAuthConfi
 #[ctor::ctor]
 fn init_tracing_subscriber() {
     let _ = RustLogAdapter::try_init();
-    let _ = tracing_subscriber::fmt::try_init();
+
+    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("warn"));
+
+    let _ = tracing_subscriber::registry()
+        .with(env_filter)
+        .with(tracing_subscriber::fmt::layer())
+        .with(metrics_tracing_span_layer())
+        .try_init();
 }
 
 #[ctor::ctor]
diff --git a/mountpoint-s3-fs/tests/common/test_recorder.rs b/mountpoint-s3-fs/tests/common/test_recorder.rs
new file mode 100644
index 000000000..d76ef226c
--- /dev/null
+++ b/mountpoint-s3-fs/tests/common/test_recorder.rs
@@ -0,0 +1,171 @@
+//! Test utilities for metrics validation
+
+use metrics::{
+    Counter, CounterFn, Gauge, GaugeFn, Histogram, HistogramFn, Key, KeyName, Metadata, Recorder, SharedString, Unit,
+};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+#[derive(Debug, Default, Clone)]
+pub struct TestRecorder {
+    metrics: Arc<Mutex<HashMap<Key, Arc<Metric>>>>,
+}
+
+impl TestRecorder {
+    pub fn clear(&self) {
+        let mut metrics = self.metrics.lock().unwrap();
+        metrics.clear();
+    }
+
+    pub fn print_metrics(&self) {
+        let metrics = self.metrics.lock().unwrap();
+        for (key, metric) in metrics.iter() {
+            match metric.as_ref() {
+                Metric::Histogram(h) => {
+                    let h = h.lock().unwrap();
+                    println!("{key}: {h:?}");
+                }
+                Metric::Counter(c) => {
+                    let c = c.lock().unwrap();
+                    println!("{key}: {c}");
+                }
+                Metric::Gauge(g) => {
+                    let g = g.lock().unwrap();
+                    println!("{key}: {g}");
+                }
+            }
+        }
+    }
+
+    pub fn get(&self, name: &str, labels: &[(&str, &str)]) -> Option<Arc<Metric>> {
+        let metrics = self.metrics.lock().unwrap();
+        metrics
+            .iter()
+            .find(|(key, _)| {
+                if key.name() != name {
+                    return false;
+                }
+
+                let actual_labels: Vec<_> = key.labels().map(|l| (l.key(), l.value())).collect();
+
+                // Must have exact same number of labels
+                if actual_labels.len() != labels.len() {
+                    return false;
+                }
+
+                // Every expected label must be in the actual labels
+                labels.iter().all(|(k, v)| actual_labels.contains(&(*k, *v)))
+            })
+            .map(|(_, metric)| Arc::clone(metric))
+    }
+}
+
+#[derive(Debug)]
+pub enum Metric {
+    Histogram(Mutex<Vec<f64>>),
+    Counter(Mutex<u64>),
+    Gauge(Mutex<f64>),
+}
+
+impl Metric {
+    pub fn gauge(&self) -> f64 {
+        match self {
+            Metric::Gauge(g) => *g.lock().unwrap(),
+            _ => panic!("expected gauge"),
+        }
+    }
+
+    pub fn counter(&self) -> u64 {
+        match self {
+            Metric::Counter(c) => *c.lock().unwrap(),
+            _ => panic!("expected counter"),
+        }
+    }
+
+    pub fn histogram(&self) -> Vec<f64> {
+        match self {
+            Metric::Histogram(h) => h.lock().unwrap().clone(),
+            _ => panic!("expected histogram"),
+        }
+    }
+}
+
+impl Recorder for TestRecorder {
+    fn describe_counter(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {}
+    fn describe_gauge(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {}
+    fn describe_histogram(&self, _key: KeyName, _unit: Option<Unit>, _description: SharedString) {}
+
+    fn register_counter(&self, key: &Key, _metadata: &Metadata<'_>) -> Counter {
+        let mut metrics = self.metrics.lock().unwrap();
+        let metric = metrics
+            .entry(key.clone())
+            .or_insert(Arc::new(Metric::Counter(Default::default())));
+        Counter::from_arc(metric.clone())
+    }
+
+    fn register_gauge(&self, key: &Key, _metadata: &Metadata<'_>) -> Gauge {
+        let mut metrics = self.metrics.lock().unwrap();
+        let metric = metrics
+            .entry(key.clone())
+            .or_insert(Arc::new(Metric::Gauge(Default::default())));
+        Gauge::from_arc(metric.clone())
+    }
+
+    fn register_histogram(&self, key: &Key, _metadata: &Metadata<'_>) -> Histogram {
+        let mut metrics = self.metrics.lock().unwrap();
+        let metric = metrics
+            .entry(key.clone())
+            .or_insert(Arc::new(Metric::Histogram(Default::default())));
+        Histogram::from_arc(metric.clone())
+    }
+}
+
+// Implement the metric traits
+impl CounterFn for Metric {
+    fn increment(&self, value: u64) {
+        let Metric::Counter(counter) = self else {
+            panic!("expected counter");
+        };
+        *counter.lock().unwrap() += value;
+    }
+
+    fn absolute(&self, value: u64) {
+        let Metric::Counter(counter) = self else {
+            panic!("expected counter");
+        };
+        *counter.lock().unwrap() = value;
+    }
+}
+
+impl GaugeFn for Metric {
+    fn increment(&self, value: f64) {
+        let Metric::Gauge(gauge) = self else {
+            panic!("expected gauge");
+        };
+        *gauge.lock().unwrap() += value;
+    }
+
+    fn decrement(&self, value: f64) {
+        let Metric::Gauge(gauge) = self else {
+            panic!("expected gauge");
+        };
+        *gauge.lock().unwrap() -= value;
+    }
+
+    fn set(&self, value: f64) {
+        let Metric::Gauge(gauge) = self else {
+            panic!("expected gauge");
+        };
+
+        *gauge.lock().unwrap() = value;
+    }
+}
+
+impl HistogramFn for Metric {
+    fn record(&self, value: f64) {
+        let Metric::Histogram(histogram) = self else {
+            panic!("expected histogram");
+        };
+        histogram.lock().unwrap().push(value);
+    }
+}
diff --git a/mountpoint-s3-fs/tests/fuse_tests/metrics_test.rs b/mountpoint-s3-fs/tests/fuse_tests/metrics_test.rs
new file mode 100644
index 000000000..02229c438
--- /dev/null
+++ b/mountpoint-s3-fs/tests/fuse_tests/metrics_test.rs
@@ -0,0 +1,268 @@
+use crate::common::fuse::{TestSessionConfig, mock_session, read_dir_to_entry_names};
+use crate::common::test_recorder::{Metric, TestRecorder};
+use mountpoint_s3_fs::S3FilesystemConfig;
+use mountpoint_s3_fs::metrics::defs::{
+    ATTR_FUSE_REQUEST, FUSE_IO_SIZE, FUSE_REQUEST_ERRORS, FUSE_REQUEST_LATENCY, PREFETCH_RESET_STATE,
+};
+use once_cell::sync::OnceCell;
+use std::fs::{File, read_dir};
+use std::io::{Read, Seek, Write};
+use std::sync::Arc;
+use std::sync::Once;
+use std::time::Duration;
+
+static RECORDER: OnceCell<TestRecorder> = OnceCell::new();
+
+#[cfg(feature = "s3_tests")]
+#[cfg_attr(test, serial_test::serial)]
+mod tests {
+    use super::*;
+
+    struct TestContext {
+        recorder: TestRecorder,
+    }
+
+    static INIT: Once = Once::new();
+
+    impl TestContext {
+        fn new() -> Self {
+            INIT.call_once(|| {
+                let recorder = TestRecorder::default();
+
+                metrics::set_global_recorder(recorder.clone()).expect("Failed to set global recorder");
+
+                RECORDER.set(recorder).unwrap();
+            });
+
+            let recorder = RECORDER.get().unwrap().clone();
+            TestContext { recorder }
+        }
+
+        fn setup(&self) {
+            self.recorder.clear();
+        }
+    }
+
+    fn setup() -> TestContext {
+        let context = TestContext::new();
+        context.setup();
+        context
+    }
+
+    fn get_metric(recorder: &TestRecorder, name: &str, labels: &[(&str, &str)]) -> Arc<Metric> {
+        recorder
+            .get(name, labels)
+            .unwrap_or_else(|| panic!("Expected metric '{name}' with labels {labels:?} to exist"))
+    }
+
+    fn get_histogram(recorder: &TestRecorder, name: &str, labels: &[(&str, &str)]) -> Vec<f64> {
+        let metric = get_metric(recorder, name, labels);
+        match metric.as_ref() {
+            Metric::Histogram(_) => metric.histogram(),
+            _ => panic!("Metric '{name}' is not a histogram"),
+        }
+    }
+
+    fn assert_metric_exists(recorder: &TestRecorder, name: &str, labels: &[(&str, &str)]) {
+        let _ = get_metric(recorder, name, labels);
+    }
+
+    #[test]
+    fn test_fuse_write_metrics() {
+        let context = setup();
+
+        let filesystem_config = S3FilesystemConfig {
+            allow_overwrite: true,
+            ..Default::default()
+        };
+        let config = TestSessionConfig {
+            filesystem_config,
+            ..Default::default()
+        };
+
+        let test_session = mock_session::new("write_metrics", config);
+        let path = test_session.mount_path().join("test.txt");
+
+        // Test write
+        let content = vec![b'a'; 1024];
+        let mut file = File::create(&path).unwrap();
+        file.write_all(&content).unwrap();
+        file.sync_all().unwrap();
+
+        // Write to multiple file handles and check all samples are recorded
+        let path2 = test_session.mount_path().join("test2.txt");
+        let mut file2 = File::create(&path2).unwrap();
+        file2.write_all(&content).unwrap();
+        file2.sync_all().unwrap();
+
+        let path3 = test_session.mount_path().join("test3.txt");
+        let mut file3 = File::create(&path3).unwrap();
+        file3.write_all(&content).unwrap();
+        file3.sync_all().unwrap();
+
+        let write_handle = 
get_metric(&context.recorder, "fs.current_handles", &[("type", "write")]); + assert!(write_handle.gauge() >= 3.0, "should have at least 3 write handles"); + + for f in [file, file2, file3] { + drop(f); + } + + std::thread::sleep(Duration::from_millis(100)); + + let write_io_size = get_histogram(&context.recorder, FUSE_IO_SIZE, &[(ATTR_FUSE_REQUEST, "write")]); + assert_eq!(write_io_size.len(), 3, "should have 3 write operations"); + + let write_latency = get_histogram(&context.recorder, FUSE_REQUEST_LATENCY, &[(ATTR_FUSE_REQUEST, "write")]); + assert_eq!(write_latency.len(), 3, "should have 3 write operations"); + + // List files + let read_dir_iter = read_dir(test_session.mount_path()).unwrap(); + let dir_entries = read_dir_to_entry_names(read_dir_iter); + assert_eq!(dir_entries, vec!["test.txt", "test2.txt", "test3.txt"]); + + let read_dir_latency = get_histogram( + &context.recorder, + FUSE_REQUEST_LATENCY, + &[(ATTR_FUSE_REQUEST, "readdirplus")], + ); + assert!(!read_dir_latency.is_empty(), "should have 1 readdirplus operation"); + assert!( + context + .recorder + .get(FUSE_IO_SIZE, &[(ATTR_FUSE_REQUEST, "readdirplus")]) + .is_none(), + "io size should not be recorded for non read/write operations" + ); + + assert_metric_exists(&context.recorder, "fuse.total_bytes", &[("type", "write")]); + + verify_common_metrics(context); + } + + #[test] + fn test_fuse_read_metrics() { + let context = setup(); + + let filesystem_config = S3FilesystemConfig { + allow_overwrite: true, + ..Default::default() + }; + let config = TestSessionConfig { + filesystem_config, + ..Default::default() + }; + + let test_session = mock_session::new("read_metrics", config); + let path = test_session.mount_path().join("test.txt"); + + // Create file first + let content = vec![b'a'; 1024]; + let mut file = File::create(&path).unwrap(); + file.write_all(&content).unwrap(); + file.sync_all().unwrap(); + drop(file); + + let mut read_buf = vec![0; 1024]; + let mut read_file = 
File::open(&path).unwrap(); + let bytes_read = read_file.read(&mut read_buf).unwrap(); + read_file.sync_all().unwrap(); + assert_eq!(bytes_read, 1024); + assert_eq!(read_buf, content); + + let read_handle = get_metric(&context.recorder, "fs.current_handles", &[("type", "read")]); + assert!(read_handle.gauge() >= 1.0, "should have at least 1 read handle"); + + drop(read_file); + + std::thread::sleep(Duration::from_millis(100)); + + let read_io_size = get_histogram(&context.recorder, FUSE_IO_SIZE, &[(ATTR_FUSE_REQUEST, "read")]); + assert!(read_io_size.contains(&1024.0)); + assert!(!read_io_size.is_empty(), "should have at least 1 io_size metric"); + + let read_latency = get_histogram(&context.recorder, FUSE_REQUEST_LATENCY, &[(ATTR_FUSE_REQUEST, "read")]); + assert!(!read_latency.is_empty(), "should have at least 1 read latency metric"); + + assert_metric_exists(&context.recorder, "fuse.total_bytes", &[("type", "read")]); + verify_common_metrics(context); + } + + #[test] + fn test_fuse_request_metric_errors() { + let context = setup(); + + let test_session = mock_session::new("error_metrics", TestSessionConfig::default()); + + // Try to read a non-existent file + let non_existent = test_session.mount_path().join("does_not_exist.txt"); + assert!(File::open(&non_existent).is_err()); + + let failure = context + .recorder + .get(FUSE_REQUEST_ERRORS, &[(ATTR_FUSE_REQUEST, "lookup")]) + .unwrap_or_else(|| panic!("failure metric for lookup should exist")); + + std::thread::sleep(Duration::from_millis(100)); + assert!( + failure.counter() >= 1, + "should have at least one failed lookup operation" + ); + } + + fn verify_common_metrics(context: TestContext) { + // Mostly sanity checks for other requests we expect + for request in ["open", "getattr", "fsync"] { + assert_metric_exists(&context.recorder, FUSE_REQUEST_LATENCY, &[(ATTR_FUSE_REQUEST, request)]); + } + + assert_metric_exists(&context.recorder, "fs.inodes", &[]); + assert_metric_exists(&context.recorder, 
"fs.inode_kinds", &[("kind", "file")]); + assert_metric_exists(&context.recorder, "fs.inode_kinds", &[("kind", "directory")]); + assert_metric_exists(&context.recorder, "fuse.op_unimplemented", &[("op", "getxattr")]); + } + + #[test] + fn test_random_access_triggers_out_of_order() { + let context = setup(); + + let test_session = mock_session::new("random_test", TestSessionConfig::default()); + let path = test_session.mount_path().join("large_file.txt"); + + // Create a large file for random reading + let content = vec![b'y'; 64 * 1024 * 1024]; + let mut file = File::create(&path).unwrap(); + file.write_all(&content).unwrap(); + file.sync_all().unwrap(); + drop(file); + + let mut file = File::open(&path).unwrap(); + let mut buffer = vec![0u8; 1024]; + + for _ in 0..10 { + file.read_exact(&mut buffer).unwrap(); + } + + let initial_out_of_order = context + .recorder + .get(PREFETCH_RESET_STATE, &[]) + .map_or(0, |m| m.counter()); + + // Jump around the file randomly to trigger out-of-order reads + for offset in [2 * 1024 * 1024, 8 * 1024 * 1024, 0, 32 * 1024 * 1024] { + file.seek(std::io::SeekFrom::Start(offset)).unwrap(); + file.read_exact(&mut buffer).unwrap(); + } + + drop(file); + + std::thread::sleep(Duration::from_millis(100)); + let final_out_of_order = context + .recorder + .get(PREFETCH_RESET_STATE, &[]) + .map_or(0, |m| m.counter()); + + assert!( + final_out_of_order > initial_out_of_order, + "Random access should increment out_of_order metric" + ); + } +} diff --git a/mountpoint-s3-fs/tests/fuse_tests/mod.rs b/mountpoint-s3-fs/tests/fuse_tests/mod.rs index 664e46aa9..d405cfc30 100644 --- a/mountpoint-s3-fs/tests/fuse_tests/mod.rs +++ b/mountpoint-s3-fs/tests/fuse_tests/mod.rs @@ -8,6 +8,7 @@ mod iam_perm_test; mod lookup_test; #[cfg(feature = "manifest")] mod manifest_test; +mod metrics_test; mod mkdir_test; mod prefetch_test; mod read_test;