Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/metrics.rs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already said that logged metrics aren't stable in docs, but I think it would be worth adding a note to the changelog to cover how we expect the metrics to change. (Any log line format change, metric name changes, etc..)

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// S3 metric name constants
pub const S3_REQUEST_COUNT: &str = "s3.request_count";
pub const S3_REQUEST_FAILURE: &str = "s3.request_failure";
pub const S3_REQUEST_ERRORS: &str = "s3.request_errors";
pub const S3_REQUEST_CANCELED: &str = "s3.request_canceled";
pub const S3_REQUEST_TOTAL_LATENCY: &str = "s3.request_total_latency";
pub const S3_REQUEST_FIRST_BYTE_LATENCY: &str = "s3.request_first_byte_latency";
Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::endpoint_config::EndpointError;
use crate::endpoint_config::{self, EndpointConfig};
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use crate::metrics::{
ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_FAILURE,
ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_CANCELED, S3_REQUEST_COUNT, S3_REQUEST_ERRORS,
S3_REQUEST_FIRST_BYTE_LATENCY, S3_REQUEST_TOTAL_LATENCY,
};
use crate::object_client::*;
Expand Down Expand Up @@ -611,7 +611,7 @@ impl S3CrtClientInner {
metrics::histogram!(S3_REQUEST_TOTAL_LATENCY, ATTR_S3_REQUEST => request_type).record(duration.as_micros() as f64);
metrics::counter!(S3_REQUEST_COUNT, ATTR_S3_REQUEST => request_type).increment(1);
if request_failure {
metrics::counter!(S3_REQUEST_FAILURE, ATTR_S3_REQUEST => request_type, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
metrics::counter!(S3_REQUEST_ERRORS, ATTR_S3_REQUEST => request_type, ATTR_HTTP_STATUS => http_status.unwrap_or(-1).to_string()).increment(1);
} else if request_canceled {
metrics::counter!(S3_REQUEST_CANCELED, ATTR_S3_REQUEST => request_type).increment(1);
}
Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3-client/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use metrics::{
};
use mountpoint_s3_client::error::ObjectClientError;
use mountpoint_s3_client::metrics::{
ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_FAILURE, S3_REQUEST_FIRST_BYTE_LATENCY,
ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_ERRORS, S3_REQUEST_FIRST_BYTE_LATENCY,
S3_REQUEST_TOTAL_LATENCY,
};
use mountpoint_s3_client::types::{GetObjectParams, HeadObjectParams};
Expand Down Expand Up @@ -272,7 +272,7 @@ async fn test_get_object_metrics_403() {

// Verify S3 request failure metrics
let (key, request_failures) = metrics
.get(S3_REQUEST_FAILURE, Some(ATTR_S3_REQUEST), Some("GetObject"))
.get(S3_REQUEST_ERRORS, Some(ATTR_S3_REQUEST), Some("GetObject"))
.expect("request failures metric should exist");

// Verify HTTP status code attribute exists and is correct
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ test-case = "3.3.1"
tokio = { version = "1.47.1", features = ["rt", "macros"] }
walkdir = "2.5.0"
wiremock = "0.6.5"
once_cell = "1.21.3"

[features]
# Unreleased and/or experimental features: not enabled in the release binary and may be dropped in future
Expand Down
9 changes: 5 additions & 4 deletions mountpoint-s3-fs/src/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use time::OffsetDateTime;
use tracing::{Instrument, field, instrument};

use crate::fs::{DirectoryEntry, DirectoryReplier, InodeNo, S3Filesystem, ToErrno};
use crate::metrics::defs::{ATTR_FUSE_REQUEST, FUSE_IO_SIZE, FUSE_REQUEST_ERRORS};
#[cfg(target_os = "macos")]
use fuser::ReplyXTimes;
use fuser::{
Expand Down Expand Up @@ -48,7 +49,7 @@ macro_rules! fuse_error {
($name:literal, $reply:expr, $err:expr, $fs:expr, $request:expr) => {{
let err = $err;
event!(err.level, "{} failed with errno {}: {:#}", $name, err.to_errno(), err);
::metrics::counter!("fuse.op_failures", "op" => $name).increment(1);
::metrics::counter!(FUSE_REQUEST_ERRORS, ATTR_FUSE_REQUEST => $name).increment(1);
if let Some(error_logger) = $fs.error_logger.as_ref() {
error_logger.error(&err, $name, $request.unique());
}
Expand All @@ -60,7 +61,7 @@ macro_rules! fuse_error {
macro_rules! fuse_unsupported {
($name:literal, $reply:expr, $err:expr, $level:expr) => {{
event!($level, "{} failed: operation not supported by Mountpoint", $name);
::metrics::counter!("fuse.op_failures", "op" => $name).increment(1);
::metrics::counter!(FUSE_REQUEST_ERRORS, ATTR_FUSE_REQUEST => $name).increment(1);
::metrics::counter!("fuse.op_unimplemented","op" => $name).increment(1);
$reply.error($err);
}};
Expand Down Expand Up @@ -152,7 +153,7 @@ where
}

metrics::counter!("fuse.total_bytes", "type" => "read").increment(bytes_sent as u64);
metrics::histogram!("fuse.io_size", "type" => "read").record(bytes_sent as f64);
metrics::histogram!(FUSE_IO_SIZE, ATTR_FUSE_REQUEST => "read").record(bytes_sent as f64);
}

#[instrument(level="warn", skip_all, fields(req=req.unique(), ino=parent, name=field::Empty))]
Expand Down Expand Up @@ -334,7 +335,7 @@ where
Ok(bytes_written) => {
reply.written(bytes_written);
metrics::counter!("fuse.total_bytes", "type" => "write").increment(bytes_written as u64);
metrics::histogram!("fuse.io_size", "type" => "write").record(bytes_written as f64);
metrics::histogram!(FUSE_IO_SIZE, ATTR_FUSE_REQUEST => "write").record(bytes_written as f64);
}
Err(e) => fuse_error!("write", reply, e, self, req),
}
Expand Down
16 changes: 6 additions & 10 deletions mountpoint-s3-fs/src/fuse/session.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::io;

use anyhow::Context;
use const_format::formatcp;
#[cfg(target_os = "linux")]
use fuser::MountOption;
use fuser::{Filesystem, Session, SessionUnmounter};
use tracing::{debug, error, trace, warn};

use super::config::{FuseSessionConfig, MountPoint};
use crate::metrics::defs::{FUSE_IDLE_THREADS, FUSE_TOTAL_THREADS};
use crate::sync::Arc;
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::mpsc::{self, Sender};
Expand Down Expand Up @@ -194,10 +194,6 @@ struct WorkerPool<W: Work> {
max_workers: usize,
}

const METRIC_NAME_PREFIX_WORKERS: &str = "fuse.mp_workers";
const METRIC_NAME_FUSE_WORKERS_TOTAL: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.total_count");
const METRIC_NAME_FUSE_WORKERS_IDLE: &str = formatcp!("{METRIC_NAME_PREFIX_WORKERS}.idle_count");

#[derive(Debug)]
struct WorkerPoolState<W: Work> {
work: W,
Expand Down Expand Up @@ -248,8 +244,8 @@ impl<W: Work> WorkerPool<W> {

let new_count = old_count + 1;
let idle_worker_count = self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst) + 1;
metrics::gauge!(METRIC_NAME_FUSE_WORKERS_TOTAL).set(new_count as f64);
metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).set(idle_worker_count as f64);
metrics::gauge!(FUSE_TOTAL_THREADS).set(new_count as f64);
metrics::histogram!(FUSE_IDLE_THREADS).record(idle_worker_count as f64);

let worker_index = old_count;
let clone = (*self).clone();
Expand All @@ -267,7 +263,7 @@ impl<W: Work> WorkerPool<W> {
self.state.work.run(
|| {
let previous_idle_count = self.state.idle_worker_count.fetch_sub(1, Ordering::SeqCst);
metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).decrement(1);
metrics::histogram!(FUSE_IDLE_THREADS).record((previous_idle_count - 1) as f64);
if previous_idle_count == 1 {
// This was the only idle thread, try to spawn a new one.
if let Err(error) = self.try_add_worker() {
Expand All @@ -276,8 +272,8 @@ impl<W: Work> WorkerPool<W> {
}
},
|| {
self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst);
metrics::gauge!(METRIC_NAME_FUSE_WORKERS_IDLE).increment(1);
let idle_worker_count = self.state.idle_worker_count.fetch_add(1, Ordering::SeqCst);
metrics::histogram!(FUSE_IDLE_THREADS).record((idle_worker_count + 1) as f64);
},
)
}
Expand Down
12 changes: 7 additions & 5 deletions mountpoint-s3-fs/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::thread::{self, JoinHandle};
use std::time::Duration;

use dashmap::DashMap;
use defs::PROCESS_MEMORY_USAGE;
use metrics::{Key, Metadata, Recorder};
use sysinfo::{MemoryRefreshKind, ProcessRefreshKind, ProcessesToUpdate, System, get_current_pid};

Expand Down Expand Up @@ -98,7 +99,7 @@ fn poll_process_metrics(sys: &mut System) {
if let Some(process) = sys.process(pid) {
// update the metrics only when there is some change, otherwise it will be too spammy.
if last_mem != process.memory() {
metrics::gauge!("process.memory_usage").set(process.memory() as f64);
metrics::gauge!(PROCESS_MEMORY_USAGE).set(process.memory() as f64);
metrics::gauge!("system.available_memory").set(sys.available_memory() as f64);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not also for system memory now?

}
}
Expand Down Expand Up @@ -412,13 +413,14 @@ mod tests {
mod test_otlp_metrics {
use super::*;
use crate::metrics::data::Metric;
use crate::metrics::defs::{ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_FAILURE};
use crate::metrics::defs::{ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_ERRORS};
use crate::metrics_otel::{OtlpConfig, OtlpMetricsExporter};
use metrics::{Key, Unit};
use opentelemetry::metrics::MeterProvider as _;
use opentelemetry_sdk::metrics::data::{AggregatedMetrics, MetricData, ResourceMetrics};
use opentelemetry_sdk::metrics::in_memory_exporter::InMemoryMetricExporter;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use std::sync::Arc;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: No newline

struct TestContext {
exporter: InMemoryMetricExporter,
provider: SdkMeterProvider,
Expand Down Expand Up @@ -522,15 +524,15 @@ mod test_otlp_metrics {
let ctx = TestContext::new();

let key = Key::from_parts(
"s3.request_failure",
S3_REQUEST_ERRORS,
vec![
metrics::Label::new(ATTR_S3_REQUEST, "GetObject"),
metrics::Label::new(ATTR_HTTP_STATUS, "403"),
metrics::Label::new("some-attribute", "some-value"),
],
);

let config = defs::lookup_config(S3_REQUEST_FAILURE);
let config = defs::lookup_config(S3_REQUEST_ERRORS);
let counter = Metric::counter_otlp(&ctx.otlp_exporter, &key, &config);
counter.as_counter().increment(1);

Expand All @@ -542,7 +544,7 @@ mod test_otlp_metrics {
let scope_metrics: Vec<_> = resource_metrics.scope_metrics().collect();
let metric = scope_metrics[0]
.metrics()
.find(|m| m.name() == "s3.request_failure")
.find(|m| m.name() == S3_REQUEST_ERRORS)
.unwrap();

match metric.data() {
Expand Down
23 changes: 18 additions & 5 deletions mountpoint-s3-fs/src/metrics/defs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use metrics::Unit;
pub use mountpoint_s3_client::metrics::{
ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_FAILURE, S3_REQUEST_FIRST_BYTE_LATENCY,
ATTR_HTTP_STATUS, ATTR_S3_REQUEST, S3_REQUEST_COUNT, S3_REQUEST_ERRORS, S3_REQUEST_FIRST_BYTE_LATENCY,
S3_REQUEST_TOTAL_LATENCY,
};

Expand All @@ -21,11 +21,14 @@ pub struct MetricConfig {
// Metric name constants
pub const FUSE_REQUEST_LATENCY: &str = "fuse.request_latency";
pub const FUSE_IO_SIZE: &str = "fuse.io_size";
pub const FUSE_REQUEST_FAILURE: &str = "fuse.request_failure";
pub const FUSE_REQUEST_ERRORS: &str = "fuse.request_errors";
pub const FUSE_IDLE_THREADS: &str = "fuse.idle_threads";
pub const FUSE_TOTAL_THREADS: &str = "fuse.total_threads";

pub const PROCESS_MEMORY_USAGE: &str = "process.memory_usage";

pub const PREFETCH_RESET_STATE: &str = "prefetch.reset_state";

// Attribute constants
pub const ATTR_FUSE_REQUEST: &str = "fuse_request";

Expand All @@ -41,22 +44,27 @@ pub fn lookup_config(name: &str) -> MetricConfig {
stability: MetricStability::Stable,
otlp_attributes: &[ATTR_FUSE_REQUEST],
},
FUSE_REQUEST_FAILURE => MetricConfig {
FUSE_REQUEST_ERRORS => MetricConfig {
unit: Unit::Count,
stability: MetricStability::Stable,
otlp_attributes: &[ATTR_FUSE_REQUEST],
},
FUSE_IDLE_THREADS => MetricConfig {
unit: Unit::Count,
stability: MetricStability::Stable,
stability: MetricStability::Experimental,
otlp_attributes: &[],
},
FUSE_TOTAL_THREADS => MetricConfig {
unit: Unit::Count,
stability: MetricStability::Experimental,
otlp_attributes: &[],
},
S3_REQUEST_COUNT => MetricConfig {
unit: Unit::Count,
stability: MetricStability::Stable,
otlp_attributes: &[ATTR_S3_REQUEST],
},
S3_REQUEST_FAILURE => MetricConfig {
S3_REQUEST_ERRORS => MetricConfig {
unit: Unit::Count,
stability: MetricStability::Stable,
otlp_attributes: &[ATTR_S3_REQUEST, ATTR_HTTP_STATUS],
Expand All @@ -76,6 +84,11 @@ pub fn lookup_config(name: &str) -> MetricConfig {
stability: MetricStability::Stable,
otlp_attributes: &[],
},
PREFETCH_RESET_STATE => MetricConfig {
unit: Unit::Count,
stability: MetricStability::Experimental,
otlp_attributes: &[],
},
// Treat everything else as count metrics
_ => MetricConfig {
unit: Unit::Count,
Expand Down
4 changes: 3 additions & 1 deletion mountpoint-s3-fs/src/metrics/tracing_span.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Instant;

use crate::metrics::defs::{ATTR_FUSE_REQUEST, FUSE_REQUEST_LATENCY};
use metrics::histogram;
use tracing::span::Attributes;
use tracing::{Id, Level, Subscriber};
Expand Down Expand Up @@ -46,7 +47,8 @@ where
if Self::should_instrument_request_time(ctx.span(&id)) {
let data = ctx.span(&id).unwrap();
let RequestTime(start_time) = *data.extensions().get::<RequestTime>().unwrap();
histogram!("fuse.op_latency_us", "op" => data.name()).record(start_time.elapsed().as_micros() as f64);
histogram!(FUSE_REQUEST_LATENCY, ATTR_FUSE_REQUEST => data.name())
.record(start_time.elapsed().as_micros() as f64);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion mountpoint-s3-fs/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use tracing::trace;
use crate::checksums::{ChecksummedBytes, IntegrityError};
use crate::data_cache::DataCache;
use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
use crate::metrics::defs::PREFETCH_RESET_STATE;
use crate::object::ObjectId;

mod backpressure_controller;
Expand Down Expand Up @@ -306,7 +307,7 @@ where
actual = offset,
"out-of-order read, resetting prefetch"
);
counter!("prefetch.out_of_order").increment(1);
counter!(PREFETCH_RESET_STATE).increment(1);

// This is an approximation, tolerating some seeking caused by concurrent readahead.
self.record_contiguous_read_metric();
Expand Down
Loading
Loading