From 716c43f6ed84c24a0ed04c03f88ff3f3e076bf2b Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 26 Jun 2025 17:12:05 +0200 Subject: [PATCH 1/4] Reorganize object store metrics --- quickwit/quickwit-storage/src/metrics.rs | 84 ++---- .../src/object_storage/azure_blob_storage.rs | 28 +- .../src/object_storage/error.rs | 7 +- .../src/object_storage/metrics_wrappers.rs | 264 ++++++++++++++++++ .../src/object_storage/mod.rs | 2 + .../object_storage/s3_compatible_storage.rs | 50 +--- 6 files changed, 318 insertions(+), 117 deletions(-) create mode 100644 quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 43ef588e192..ff62b373a4a 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -16,7 +16,7 @@ use once_cell::sync::Lazy; use quickwit_common::metrics::{ - GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, + GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec, new_gauge, new_histogram_vec, }; @@ -30,19 +30,13 @@ pub struct StorageMetrics { pub searcher_split_cache: CacheMetrics, pub get_slice_timeout_successes: [IntCounter; 3], pub get_slice_timeout_all_timeouts: IntCounter, - pub object_storage_get_total: IntCounter, - pub object_storage_get_errors_total: IntCounterVec<1>, + pub object_storage_requests_total: IntCounterVec<2>, + pub object_storage_request_duration: HistogramVec<2>, pub object_storage_get_slice_in_flight_count: IntGauge, pub object_storage_get_slice_in_flight_num_bytes: IntGauge, - pub object_storage_put_total: IntCounter, - pub object_storage_put_parts: IntCounter, pub object_storage_download_num_bytes: IntCounter, + pub object_storage_download_errors: IntCounterVec<1>, pub object_storage_upload_num_bytes: IntCounter, - - pub object_storage_delete_requests_total: IntCounter, - pub object_storage_bulk_delete_requests_total: IntCounter, - pub object_storage_delete_request_duration: Histogram, - pub object_storage_bulk_delete_request_duration: Histogram, } impl Default for StorageMetrics { @@ -63,31 +57,6 @@ impl Default for StorageMetrics { let get_slice_timeout_all_timeouts = get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]); - let object_storage_requests_total = new_counter_vec( - "object_storage_requests_total", - "Total number of object storage requests performed.", - "storage", - &[], - ["action"], - ); - let object_storage_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_object"]); - let object_storage_bulk_delete_requests_total = - object_storage_requests_total.with_label_values(["delete_objects"]); - - let object_storage_request_duration = new_histogram_vec( - "object_storage_request_duration_seconds", - "Duration of object storage requests in seconds.", - "storage", - &[], - ["action"], - vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], - ); - let object_storage_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_object"]); - let object_storage_bulk_delete_request_duration = - object_storage_request_duration.with_label_values(["delete_objects"]); - StorageMetrics { fast_field_cache: CacheMetrics::for_component("fastfields"), fd_cache_metrics: CacheMetrics::for_component("fd"), @@ -97,19 +66,23 @@ impl Default for StorageMetrics { split_footer_cache: CacheMetrics::for_component("splitfooter"), get_slice_timeout_successes, 
get_slice_timeout_all_timeouts, - object_storage_get_total: new_counter( - "object_storage_gets_total", - "Number of objects fetched. Might be lower than get_slice_timeout_outcome if \ - queries are debounced.", + object_storage_requests_total: new_counter_vec( + "object_storage_requests_total", + "Number of requests to the object store, by action and status. Requests are \ + recorded when the response headers are returned, download failures will not \ + appear as errors.", "storage", &[], + ["action", "status"], ), - object_storage_get_errors_total: new_counter_vec::<1>( - "object_storage_get_errors_total", - "Number of GetObject errors.", + object_storage_request_duration: new_histogram_vec( + "object_storage_request_duration", + "Durations until the response headers are returned from the object store, by \ + action and status. This does not measure the download time.", "storage", &[], - ["code"], + ["action", "status"], + vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0], ), object_storage_get_slice_in_flight_count: new_gauge( "object_storage_get_slice_in_flight_count", @@ -124,35 +97,26 @@ impl Default for StorageMetrics { "storage", &[], ), - object_storage_put_total: new_counter( - "object_storage_puts_total", - "Number of objects uploaded. May differ from object_storage_requests_parts due to \ - multipart upload.", - "storage", - &[], - ), - object_storage_put_parts: new_counter( - "object_storage_puts_parts", - "Number of object parts uploaded.", - "", - &[], - ), object_storage_download_num_bytes: new_counter( "object_storage_download_num_bytes", "Amount of data downloaded from an object storage.", "storage", &[], ), + object_storage_download_errors: new_counter_vec( + "object_storage_download_errors", + "Number of download requests that received successfull response headers but \ + failed during download.", + "storage", + &[], + ["status"], + ), object_storage_upload_num_bytes: new_counter( "object_storage_upload_num_bytes", "Amount of data uploaded to an object storage.", "storage", &[], ), - object_storage_delete_requests_total, - object_storage_bulk_delete_requests_total, - object_storage_delete_request_duration, - object_storage_bulk_delete_request_duration, } } } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index b21776fa69f..726c35be37f 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -45,10 +45,11 @@ use tracing::{instrument, warn}; use crate::debouncer::DebouncedStorage; use crate::metrics::object_storage_get_slice_in_flight_guards; +use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage, - StorageError, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError, + StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, }; /// Azure object storage resolver. 
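For reference, the shape of the new instrumentation above: one request outcome feeds both vectors with the same (action, status) pair. A minimal sketch, not part of the patch, assuming the `quickwit_common::metrics` wrappers keep the `with_label_values([&str; N])` API used throughout this diff:

    use quickwit_common::metrics::{HistogramVec, IntCounterVec};

    // One observation per request, e.g. ("get_object", "success") or
    // ("get_object", "NoSuchKey").
    fn record_request(
        requests_total: &IntCounterVec<2>,
        request_duration: &HistogramVec<2>,
        action: &'static str,
        status: &str,
        elapsed_secs: f64,
    ) {
        let labels = [action, status];
        requests_total.with_label_values(labels).inc();
        request_duration
            .with_label_values(labels)
            .observe(elapsed_secs);
    }

The `RequestMetricsWrapper` introduced later in this patch automates exactly this, including a "cancelled" status for futures dropped mid-flight.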
@@ -225,7 +226,6 @@ impl AzureBlobStorage { name: &'a str, payload: Box, ) -> StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_parts.inc(); crate::STORAGE_METRICS .object_storage_upload_num_bytes .inc_by(payload.len()); @@ -237,6 +237,7 @@ impl AzureBlobStorage { .put_block_blob(data) .hash(hash) .into_future() + .with_count_metric("put_block_blob") .await?; Result::<(), AzureErrorWrapper>::Ok(()) }) @@ -261,7 +262,6 @@ impl AzureBlobStorage { .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); crate::STORAGE_METRICS .object_storage_upload_num_bytes .inc_by(range.end - range.start); @@ -276,6 +276,7 @@ impl AzureBlobStorage { .put_block(block_id.clone(), data) .hash(hash) .into_future() + .with_count_metric("put_block") .await?; Result::<_, AzureErrorWrapper>::Ok(block_id) }) @@ -299,6 +300,7 @@ impl AzureBlobStorage { blob_client .put_block_list(block_list) .into_future() + .with_count_metric("put_block_list") .await .map_err(AzureErrorWrapper::from)?; @@ -327,7 +329,6 @@ impl Storage for AzureBlobStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let name = self.blob_name(path); let total_len = payload.len(); let part_num_bytes = self.multipart_policy.part_num_bytes(total_len); @@ -345,7 +346,7 @@ impl Storage for AzureBlobStorage { let name = self.blob_name(path); let mut output_stream = self.container_client.blob_client(name).get().into_stream(); - while let Some(chunk_result) = output_stream.next().await { + while let Some(chunk_result) = output_stream.next().with_count_metric("get_blob").await { let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?; let chunk_response_body_stream = chunk_response .data @@ -353,10 +354,7 @@ impl Storage for AzureBlobStorage { .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } output.flush().await?; Ok(()) @@ -369,6 +367,7 @@ impl Storage for AzureBlobStorage { .blob_client(blob_name) .delete() .into_future() + .with_count_metric("delete_blob") .await .map_err(|err| AzureErrorWrapper::from(err).into()); ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?; @@ -513,7 +512,7 @@ async fn extract_range_data_and_hash( .await? 
.into_async_read(); let mut buf: Vec = Vec::with_capacity(range.count()); - tokio::io::copy(&mut reader, &mut buf).await?; + tokio::io::copy_buf(&mut reader, &mut buf).await?; let data = Bytes::from(buf); let hash = md5::compute(&data[..]); Ok((data, hash)) @@ -544,7 +543,7 @@ async fn download_all( output: &mut Vec, ) -> Result<(), AzureErrorWrapper> { output.clear(); - while let Some(chunk_result) = chunk_stream.next().await { + while let Some(chunk_result) = chunk_stream.next().with_count_metric("get_blob").await { let chunk_response = chunk_result?; let chunk_response_body_stream = chunk_response .data @@ -552,10 +551,7 @@ async fn download_all( .into_async_read() .compat(); let mut body_stream_reader = BufReader::new(chunk_response_body_stream); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - crate::STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; } // When calling `get_all`, the Vec capacity is not properly set. output.shrink_to_fit(); diff --git a/quickwit/quickwit-storage/src/object_storage/error.rs b/quickwit/quickwit-storage/src/object_storage/error.rs index 5f60fe1f944..8a7efc13332 100644 --- a/quickwit/quickwit-storage/src/object_storage/error.rs +++ b/quickwit/quickwit-storage/src/object_storage/error.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError}; +use aws_sdk_s3::error::{DisplayErrorContext, SdkError}; use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError; use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError; use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError; @@ -62,11 +62,6 @@ pub trait ToStorageErrorKind { impl ToStorageErrorKind for GetObjectError { fn to_storage_error_kind(&self) -> StorageErrorKind { - let error_code = self.code().unwrap_or("unknown"); - crate::STORAGE_METRICS - .object_storage_get_errors_total - .with_label_values([error_code]) - .inc(); match self { GetObjectError::InvalidObjectState(_) => StorageErrorKind::Service, GetObjectError::NoSuchKey(_) => StorageErrorKind::NotFound, diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs new file mode 100644 index 00000000000..2347ceee9c7 --- /dev/null +++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs @@ -0,0 +1,264 @@ +use std::borrow::Cow; +use std::io; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll, ready}; +use std::time::Instant; + +use pin_project::{pin_project, pinned_drop}; +use tokio::io::{AsyncBufRead, AsyncWrite}; + +use crate::STORAGE_METRICS; + +pub enum Status { + Pending, + Done, + Ready(String), +} + +/// Converts an object store client SDK Result<> to the [Status] that should be +/// recorded in the metrics. +/// +/// The `Marker` type is necessary to avoid conflicting implementations of the +/// trait. +pub trait AsStatus { + fn as_status(&self) -> Status; +} + +/// Wrapper around object store requests to record metrics, including cancellation. 
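The "including cancellation" part relies on recording from `Drop` rather than from the happy path: if the wrapped future is dropped mid-flight, its status is still `Pending`, and the drop hook books it as "cancelled". A reduced, self-contained sketch of the same trick, using a `Box`-pinned inner future instead of `pin_project` and a stand-in for the counter (names hypothetical):

    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct Tracked<F: Future> {
        inner: Pin<Box<F>>,
        done: bool,
    }

    impl<F: Future> Future for Tracked<F> {
        type Output = F::Output;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            match self.inner.as_mut().poll(cx) {
                Poll::Ready(output) => {
                    // Completion observed: the drop hook must stay silent.
                    self.done = true;
                    Poll::Ready(output)
                }
                Poll::Pending => Poll::Pending,
            }
        }
    }

    impl<F: Future> Drop for Tracked<F> {
        fn drop(&mut self) {
            if !self.done {
                // The real wrapper increments object_storage_requests_total
                // with status="cancelled" at this point.
                eprintln!("request cancelled before completion");
            }
        }
    }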
+#[pin_project(PinnedDrop)] +pub struct RequestMetricsWrapper +where + F: Future, + F::Output: AsStatus, +{ + #[pin] + tracked: F, + action: &'static str, + start: Option, + uploaded_bytes: Option, + status: Status, + _marker: PhantomData, +} + +#[pinned_drop] +impl PinnedDrop for RequestMetricsWrapper +where + F: Future, + F::Output: AsStatus, +{ + fn drop(self: Pin<&mut Self>) { + let status = match &self.status { + Status::Pending => "cancelled", + Status::Done => return, + Status::Ready(s) => s.as_str(), + }; + let label_values = [self.action, status]; + STORAGE_METRICS + .object_storage_requests_total + .with_label_values(label_values) + .inc(); + if let Some(start) = self.start { + STORAGE_METRICS + .object_storage_request_duration + .with_label_values(label_values) + .observe(start.elapsed().as_secs_f64()); + } + if let Some(bytes) = self.uploaded_bytes { + STORAGE_METRICS + .object_storage_upload_num_bytes + .inc_by(bytes); + } + } +} + +impl Future for RequestMetricsWrapper +where + F: Future, + F::Output: AsStatus, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.status = response.as_status(); + + Poll::Ready(response) + } +} + +pub trait S3MetricsWrapperExt +where + F: Future, + F::Output: AsStatus, +{ + fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper; + + fn with_count_and_duration_metrics( + self, + action: &'static str, + ) -> RequestMetricsWrapper; + + fn with_count_and_upload_metrics( + self, + action: &'static str, + bytes: u64, + ) -> RequestMetricsWrapper; +} + +impl S3MetricsWrapperExt for F +where + F: Future, + F::Output: AsStatus, +{ + fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: Status::Pending, + start: None, + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn with_count_and_duration_metrics( + self, + action: &'static str, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: Status::Pending, + start: Some(Instant::now()), + uploaded_bytes: None, + _marker: PhantomData, + } + } + + fn with_count_and_upload_metrics( + self, + action: &'static str, + bytes: u64, + ) -> RequestMetricsWrapper { + RequestMetricsWrapper { + tracked: self, + action, + status: Status::Pending, + start: None, + uploaded_bytes: Some(bytes), + _marker: PhantomData, + } + } +} + +pub struct S3Marker; + +impl AsStatus for Result +where E: aws_sdk_s3::error::ProvideErrorMetadata +{ + fn as_status(&self) -> Status { + let status_str = match self { + Ok(_) => "success".to_string(), + Err(e) => e.meta().code().unwrap_or("unknown").to_string(), + }; + Status::Ready(status_str) + } +} + +pub struct AzureMarker; + +impl AsStatus for Result { + fn as_status(&self) -> Status { + let Err(err) = self else { + return Status::Ready("success".to_string()); + }; + let err_status_str = match err.kind() { + azure_storage::ErrorKind::HttpResponse { status, .. } => status.to_string(), + azure_storage::ErrorKind::Credential => "credential".to_string(), + azure_storage::ErrorKind::Io => "io".to_string(), + azure_storage::ErrorKind::DataConversion => "data_conversion".to_string(), + _ => "unknown".to_string(), + }; + Status::Ready(err_status_str) + } +} + +// The Azure SDK get_blob request returns Option because it chunks +// the download into a stream of get requests. 
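To make the chunking comment concrete: `StreamExt::next()` returns a future whose output is `Option<Item>`, so the same adapter that wraps one-shot SDK futures also wraps each per-chunk Azure request, and the terminal `None` becomes `Status::Done`, which the drop hook skips so that stream exhaustion is never counted as a request. A hedged sketch of that shape (illustrative item types, `futures` crate assumed):

    use futures::stream::{self, StreamExt};

    // Each `next()` is one wrapped future: Some(Ok(..)) or Some(Err(..))
    // would record one sample, the final None records nothing.
    async fn drain(chunks: Vec<Result<u64, String>>) {
        let mut chunk_stream = stream::iter(chunks);
        while let Some(chunk) = chunk_stream.next().await {
            match chunk {
                Ok(_num_bytes) => { /* status = "success" */ }
                Err(_err) => { /* status derived from the error */ }
            }
        }
        // The `next()` that returned None maps to Status::Done and is
        // dropped without incrementing any counter.
    }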
+impl AsStatus for Option> { + fn as_status(&self) -> Status { + match self { + None => Status::Done, + Some(res) => res.as_status(), + } + } +} + +/// Track io errors during downloads. +/// +/// Downloads are a bit different from other requests because the request might +/// fail while getting the bytes from the response body, long after getting a +/// successful response header. +#[pin_project(PinnedDrop)] +struct DownloadMetricsWrapper +where F: Future> +{ + #[pin] + tracked: F, + result: Option>, +} + +#[pinned_drop] +impl PinnedDrop for DownloadMetricsWrapper +where F: Future> +{ + fn drop(self: Pin<&mut Self>) { + let status = match &self.result { + None => Cow::Borrowed("cancelled"), + Some(Err(e)) => Cow::Owned(format!("{e:?}")), + Some(Ok(downloaded_bytes)) => { + STORAGE_METRICS + .object_storage_download_num_bytes + .inc_by(*downloaded_bytes); + return; + } + }; + STORAGE_METRICS + .object_storage_download_errors + .with_label_values([status.as_ref()]) + .inc(); + } +} + +impl Future for DownloadMetricsWrapper +where F: Future> +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.result = match &response { + Ok(s) => Some(Ok(*s)), + Err(e) => Some(Err(e.kind())), + }; + Poll::Ready(response) + } +} + +pub async fn copy_with_download_metrics<'a, R, W>( + reader: &'a mut R, + writer: &'a mut W, +) -> io::Result +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + DownloadMetricsWrapper { + tracked: tokio::io::copy_buf(reader, writer), + result: None, + } + .await +} diff --git a/quickwit/quickwit-storage/src/object_storage/mod.rs b/quickwit/quickwit-storage/src/object_storage/mod.rs index e914c107291..cee3bacd338 100644 --- a/quickwit/quickwit-storage/src/object_storage/mod.rs +++ b/quickwit/quickwit-storage/src/object_storage/mod.rs @@ -14,6 +14,8 @@ mod error; +mod metrics_wrappers; + mod s3_compatible_storage; pub use self::s3_compatible_storage::S3CompatibleObjectStorage; pub use self::s3_compatible_storage_resolver::S3CompatibleObjectStorageFactory; diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 6a7105fb8f1..361c8b33c52 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -46,10 +46,11 @@ use tracing::{info, instrument, warn}; use crate::metrics::object_storage_get_slice_in_flight_guards; use crate::object_storage::MultiPartPolicy; +use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; use crate::storage::SendableAsync; use crate::{ - BulkDeleteError, DeleteFailure, OwnedBytes, STORAGE_METRICS, Storage, StorageError, - StorageErrorKind, StorageResolverError, StorageResult, + BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind, + StorageResolverError, StorageResult, }; /// Semaphore to limit the number of concurrent requests to the object store. 
Some object stores @@ -286,11 +287,6 @@ impl S3CompatibleObjectStorage { .await .map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?; - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(len); - self.s3_client .put_object() .bucket(bucket) @@ -298,6 +294,7 @@ impl S3CompatibleObjectStorage { .body(body) .content_length(len as i64) .send() + .with_count_and_upload_metrics("put_object", len) .await .map_err(|sdk_error| { if sdk_error.is_retryable() { @@ -332,6 +329,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .key(key) .send() + .with_count_metric("create_multipart_upload") .await }) .await? @@ -421,11 +419,6 @@ impl S3CompatibleObjectStorage { .map_err(Retry::Permanent)?; let md5 = BASE64_STANDARD.encode(part.md5.0); - crate::STORAGE_METRICS.object_storage_put_parts.inc(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(part.len()); - let upload_part_output = self .s3_client .upload_part() @@ -437,6 +430,7 @@ impl S3CompatibleObjectStorage { .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() + .with_count_and_upload_metrics("upload_part", part.len()) .await .map_err(|s3_err| { if s3_err.is_retryable() { @@ -516,6 +510,7 @@ impl S3CompatibleObjectStorage { .multipart_upload(completed_upload.clone()) .upload_id(upload_id) .send() + .with_count_metric("complete_multipart_upload") .await }) .await?; @@ -530,6 +525,7 @@ impl S3CompatibleObjectStorage { .key(key) .upload_id(upload_id) .send() + .with_count_metric("abort_multipart_upload") .await }) .await?; @@ -544,8 +540,6 @@ impl S3CompatibleObjectStorage { let key = self.key(path); let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1)); - crate::STORAGE_METRICS.object_storage_get_total.inc(); - let get_object_output = self .s3_client .get_object() @@ -553,6 +547,7 @@ impl S3CompatibleObjectStorage { .key(key) .set_range(range_str) .send() + .with_count_metric("get_object") .await?; Ok(get_object_output) } @@ -640,17 +635,12 @@ impl S3CompatibleObjectStorage { for (path_chunk, delete) in &mut delete_requests_it { let delete_objects_res: StorageResult = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_bulk_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_bulk_delete_request_duration - .start_timer(); self.s3_client .delete_objects() .bucket(self.bucket.clone()) .delete(delete.clone()) .send() + .with_count_and_duration_metrics("delete_objects") .await }) .await @@ -716,10 +706,7 @@ impl S3CompatibleObjectStorage { async fn download_all(byte_stream: ByteStream, output: &mut Vec) -> io::Result<()> { output.clear(); let mut body_stream_reader = BufReader::new(byte_stream.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_stream_reader, output).await?; // When calling `get_all`, the Vec capacity is not properly set. 
output.shrink_to_fit(); Ok(()) @@ -735,6 +722,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .max_keys(1) .send() + .with_count_metric("list_objects_v2") .await?; Ok(()) } @@ -744,7 +732,6 @@ impl Storage for S3CompatibleObjectStorage { path: &Path, payload: Box, ) -> crate::StorageResult<()> { - crate::STORAGE_METRICS.object_storage_put_total.inc(); let _permit = REQUEST_SEMAPHORE.acquire().await; let key = self.key(path); let total_len = payload.len(); @@ -763,10 +750,7 @@ impl Storage for S3CompatibleObjectStorage { let get_object_output = aws_retry(&self.retry_params, || self.get_object(path, None)).await?; let mut body_read = BufReader::new(get_object_output.body.into_async_read()); - let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?; - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(num_bytes_copied); + copy_with_download_metrics(&mut body_read, output).await?; output.flush().await?; Ok(()) } @@ -776,17 +760,12 @@ impl Storage for S3CompatibleObjectStorage { let bucket = self.bucket.clone(); let key = self.key(path); let delete_res = aws_retry(&self.retry_params, || async { - crate::STORAGE_METRICS - .object_storage_delete_requests_total - .inc(); - let _timer = crate::STORAGE_METRICS - .object_storage_delete_request_duration - .start_timer(); self.s3_client .delete_object() .bucket(&bucket) .key(&key) .send() + .with_count_and_duration_metrics("delete_object") .await }) .await; @@ -867,6 +846,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(&bucket) .key(&key) .send() + .with_count_metric("head_object") .await }) .await?; From 88528bfefe2293075e77ff3e749b97bd8a71fdac Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Wed, 2 Jul 2025 12:08:24 +0200 Subject: [PATCH 2/4] Fix feature flags and license headers --- .../src/object_storage/metrics_wrappers.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs index 2347ceee9c7..0371756091b 100644 --- a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs +++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs @@ -1,3 +1,17 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::borrow::Cow; use std::io; use std::marker::PhantomData; @@ -12,6 +26,7 @@ use crate::STORAGE_METRICS; pub enum Status { Pending, + #[allow(dead_code)] Done, Ready(String), } @@ -167,8 +182,10 @@ where E: aws_sdk_s3::error::ProvideErrorMetadata } } +#[cfg(feature = "azure")] pub struct AzureMarker; +#[cfg(feature = "azure")] impl AsStatus for Result { fn as_status(&self) -> Status { let Err(err) = self else { @@ -187,6 +204,7 @@ impl AsStatus for Result { // The Azure SDK get_blob request returns Option because it chunks // the download into a stream of get requests. 
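A side effect of this gating explains the `#[allow(dead_code)]` added above: in a build without the `azure` feature, the `Option` impl below is compiled out, and with it the only constructor of `Status::Done`, so rustc would otherwise warn that the variant is never constructed. In miniature (the constructor function is a stand-in; in the patch the sole constructor is this azure-gated impl):

    pub enum Status {
        Pending,
        #[allow(dead_code)] // no constructor left when `azure` is disabled
        Done,
        Ready(String),
    }

    #[cfg(feature = "azure")]
    pub fn stream_exhausted() -> Status {
        Status::Done
    }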
+#[cfg(feature = "azure")] impl AsStatus for Option> { fn as_status(&self) -> Status { match self { From cb7eb486b8ef8aeaabd77f8e79697baea21c8d91 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 15 Jul 2025 16:01:26 +0200 Subject: [PATCH 3/4] Improve download metric - Use explicit label values - Track download at the copy level Unified label values for object store actions. --- quickwit/quickwit-storage/src/metrics.rs | 19 +- .../src/object_storage/azure_blob_storage.rs | 35 ++- .../src/object_storage/metrics_wrappers.rs | 268 ++++++++++++++---- .../object_storage/s3_compatible_storage.rs | 24 +- 4 files changed, 252 insertions(+), 94 deletions(-) diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index ff62b373a4a..5ea00df485c 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -34,9 +34,9 @@ pub struct StorageMetrics { pub object_storage_request_duration: HistogramVec<2>, pub object_storage_get_slice_in_flight_count: IntGauge, pub object_storage_get_slice_in_flight_num_bytes: IntGauge, - pub object_storage_download_num_bytes: IntCounter, + pub object_storage_download_num_bytes: IntCounterVec<1>, pub object_storage_download_errors: IntCounterVec<1>, - pub object_storage_upload_num_bytes: IntCounter, + pub object_storage_upload_num_bytes: IntCounterVec<1>, } impl Default for StorageMetrics { @@ -97,25 +97,28 @@ impl Default for StorageMetrics { "storage", &[], ), - object_storage_download_num_bytes: new_counter( + object_storage_download_num_bytes: new_counter_vec( "object_storage_download_num_bytes", - "Amount of data downloaded from an object storage.", + "Amount of data downloaded from object storage.", "storage", &[], + ["status"], ), object_storage_download_errors: new_counter_vec( "object_storage_download_errors", - "Number of download requests that received successfull response headers but \ - failed during download.", + "Number of download requests that received successful response headers but failed \ + during download.", "storage", &[], ["status"], ), - object_storage_upload_num_bytes: new_counter( + object_storage_upload_num_bytes: new_counter_vec( "object_storage_upload_num_bytes", - "Amount of data uploaded to an object storage.", + "Amount of data uploaded to object storage. 
The value recorded for failed and \ + aborted uploads is the full payload size.", "storage", &[], + ["status"], ), } } diff --git a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs index 726c35be37f..3bb6d9711dd 100644 --- a/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs @@ -45,7 +45,9 @@ use tracing::{instrument, warn}; use crate::debouncer::DebouncedStorage; use crate::metrics::object_storage_get_slice_in_flight_guards; -use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; +use crate::object_storage::metrics_wrappers::{ + ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics, +}; use crate::storage::SendableAsync; use crate::{ BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError, @@ -226,9 +228,6 @@ impl AzureBlobStorage { name: &'a str, payload: Box, ) -> StorageResult<()> { - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(payload.len()); retry(&self.retry_params, || async { let data = Bytes::from(payload.read_all().await?.to_vec()); let hash = azure_storage_blobs::prelude::Hash::from(md5::compute(&data[..]).0); @@ -237,7 +236,7 @@ impl AzureBlobStorage { .put_block_blob(data) .hash(hash) .into_future() - .with_count_metric("put_block_blob") + .with_count_and_upload_metrics(ActionLabel::PutObject, payload.len()) .await?; Result::<(), AzureErrorWrapper>::Ok(()) }) @@ -262,9 +261,6 @@ impl AzureBlobStorage { .map(|(num, range)| { let moved_blob_client = blob_client.clone(); let moved_payload = payload.clone(); - crate::STORAGE_METRICS - .object_storage_upload_num_bytes - .inc_by(range.end - range.start); async move { retry(&self.retry_params, || async { let block_id = format!("block:{num}"); @@ -276,7 +272,10 @@ impl AzureBlobStorage { .put_block(block_id.clone(), data) .hash(hash) .into_future() - .with_count_metric("put_block") + .with_count_and_upload_metrics( + ActionLabel::UploadPart, + range.end - range.start, + ) .await?; Result::<_, AzureErrorWrapper>::Ok(block_id) }) @@ -300,7 +299,7 @@ impl AzureBlobStorage { blob_client .put_block_list(block_list) .into_future() - .with_count_metric("put_block_list") + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await .map_err(AzureErrorWrapper::from)?; @@ -317,6 +316,7 @@ impl Storage for AzureBlobStorage { .max_results(NonZeroU32::new(1u32).expect("1 is always non-zero.")) .into_stream() .next() + .with_count_metric(ActionLabel::ListObjects) .await { let _ = first_blob_result?; @@ -346,7 +346,11 @@ impl Storage for AzureBlobStorage { let name = self.blob_name(path); let mut output_stream = self.container_client.blob_client(name).get().into_stream(); - while let Some(chunk_result) = output_stream.next().with_count_metric("get_blob").await { + while let Some(chunk_result) = output_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?; let chunk_response_body_stream = chunk_response .data @@ -367,7 +371,7 @@ impl Storage for AzureBlobStorage { .blob_client(blob_name) .delete() .into_future() - .with_count_metric("delete_blob") + .with_count_metric(ActionLabel::DeleteObject) .await .map_err(|err| AzureErrorWrapper::from(err).into()); ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?; @@ -490,6 +494,7 @@ impl Storage for AzureBlobStorage { 
.blob_client(name) .get_properties() .into_future() + .with_count_metric(ActionLabel::HeadObject) .await; match properties_result { Ok(response) => Ok(response.blob.properties.content_length), @@ -543,7 +548,11 @@ async fn download_all( output: &mut Vec, ) -> Result<(), AzureErrorWrapper> { output.clear(); - while let Some(chunk_result) = chunk_stream.next().with_count_metric("get_blob").await { + while let Some(chunk_result) = chunk_stream + .next() + .with_count_metric(ActionLabel::GetObject) + .await + { let chunk_response = chunk_result?; let chunk_response_body_stream = chunk_response .data diff --git a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs index 0371756091b..f2d92991984 100644 --- a/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs +++ b/quickwit/quickwit-storage/src/object_storage/metrics_wrappers.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::Cow; use std::io; use std::marker::PhantomData; use std::pin::Pin; @@ -24,8 +23,39 @@ use tokio::io::{AsyncBufRead, AsyncWrite}; use crate::STORAGE_METRICS; -pub enum Status { +pub enum ActionLabel { + AbortMultipartUpload, + CompleteMultipartUpload, + CreateMultipartUpload, + DeleteObject, + DeleteObjects, + GetObject, + HeadObject, + ListObjects, + PutObject, + UploadPart, +} + +impl ActionLabel { + fn as_str(&self) -> &'static str { + match self { + ActionLabel::AbortMultipartUpload => "abort_multipart_upload", + ActionLabel::CompleteMultipartUpload => "complete_multipart_upload", + ActionLabel::CreateMultipartUpload => "create_multipart_upload", + ActionLabel::DeleteObject => "delete_object", + ActionLabel::DeleteObjects => "delete_objects", + ActionLabel::GetObject => "get_object", + ActionLabel::HeadObject => "head_object", + ActionLabel::ListObjects => "list_objects", + ActionLabel::PutObject => "put_object", + ActionLabel::UploadPart => "upload_part", + } + } +} + +pub enum RequestStatus { Pending, + // only useful on feature="azure" #[allow(dead_code)] Done, Ready(String), @@ -36,8 +66,8 @@ pub enum Status { /// /// The `Marker` type is necessary to avoid conflicting implementations of the /// trait. -pub trait AsStatus { - fn as_status(&self) -> Status; +pub trait AsRequestStatus { + fn as_status(&self) -> RequestStatus; } /// Wrapper around object store requests to record metrics, including cancellation. 
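The rename keeps the `Marker` parameter, which is load-bearing: the S3 impl is blanket over every `E: ProvideErrorMetadata` while the Azure impl targets the concrete `azure_storage::Error`, and trait coherence cannot prove that the Azure error type will never implement the S3 metadata trait, so without two distinct marker types rustc rejects the pair with E0119 (conflicting implementations). The conflict in miniature, with stand-in traits and types:

    // `ErrorMetadata` plays the role of the SDK's `ProvideErrorMetadata`,
    // `AzureError` the concrete Azure error type.
    trait ErrorMetadata {
        fn code(&self) -> &str;
    }

    trait AsRequestStatus<Marker> {
        fn as_status(&self) -> String;
    }

    struct S3Marker;
    struct AzureMarker;
    struct AzureError;

    // Blanket over every metadata-carrying error type...
    impl<T, E: ErrorMetadata> AsRequestStatus<S3Marker> for Result<T, E> {
        fn as_status(&self) -> String {
            match self {
                Ok(_) => "success".to_string(),
                Err(e) => e.code().to_string(),
            }
        }
    }

    // ...coexisting with a concrete impl. Remove the two marker parameters
    // and rustc rejects the pair as potentially overlapping.
    impl<T> AsRequestStatus<AzureMarker> for Result<T, AzureError> {
        fn as_status(&self) -> String {
            "azure_status".to_string()
        }
    }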
@@ -45,14 +75,14 @@ pub trait AsStatus { pub struct RequestMetricsWrapper where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { #[pin] tracked: F, - action: &'static str, + action: ActionLabel, start: Option, uploaded_bytes: Option, - status: Status, + status: RequestStatus, _marker: PhantomData, } @@ -60,15 +90,15 @@ where impl PinnedDrop for RequestMetricsWrapper where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { fn drop(self: Pin<&mut Self>) { let status = match &self.status { - Status::Pending => "cancelled", - Status::Done => return, - Status::Ready(s) => s.as_str(), + RequestStatus::Pending => "cancelled", + RequestStatus::Done => return, + RequestStatus::Ready(s) => s.as_str(), }; - let label_values = [self.action, status]; + let label_values = [self.action.as_str(), status]; STORAGE_METRICS .object_storage_requests_total .with_label_values(label_values) @@ -82,6 +112,7 @@ where if let Some(bytes) = self.uploaded_bytes { STORAGE_METRICS .object_storage_upload_num_bytes + .with_label_values([status]) .inc_by(bytes); } } @@ -90,7 +121,7 @@ where impl Future for RequestMetricsWrapper where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { type Output = F::Output; @@ -103,35 +134,35 @@ where } } -pub trait S3MetricsWrapperExt +pub trait RequestMetricsWrapperExt where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { - fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper; + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper; fn with_count_and_duration_metrics( self, - action: &'static str, + action: ActionLabel, ) -> RequestMetricsWrapper; fn with_count_and_upload_metrics( self, - action: &'static str, + action: ActionLabel, bytes: u64, ) -> RequestMetricsWrapper; } -impl S3MetricsWrapperExt for F +impl RequestMetricsWrapperExt for F where F: Future, - F::Output: AsStatus, + F::Output: AsRequestStatus, { - fn with_count_metric(self, action: &'static str) -> RequestMetricsWrapper { + fn with_count_metric(self, action: ActionLabel) -> RequestMetricsWrapper { RequestMetricsWrapper { tracked: self, action, - status: Status::Pending, + status: RequestStatus::Pending, start: None, uploaded_bytes: None, _marker: PhantomData, @@ -140,12 +171,12 @@ where fn with_count_and_duration_metrics( self, - action: &'static str, + action: ActionLabel, ) -> RequestMetricsWrapper { RequestMetricsWrapper { tracked: self, action, - status: Status::Pending, + status: RequestStatus::Pending, start: Some(Instant::now()), uploaded_bytes: None, _marker: PhantomData, @@ -154,13 +185,13 @@ where fn with_count_and_upload_metrics( self, - action: &'static str, + action: ActionLabel, bytes: u64, ) -> RequestMetricsWrapper { RequestMetricsWrapper { tracked: self, action, - status: Status::Pending, + status: RequestStatus::Pending, start: None, uploaded_bytes: Some(bytes), _marker: PhantomData, @@ -170,15 +201,15 @@ where pub struct S3Marker; -impl AsStatus for Result +impl AsRequestStatus for Result where E: aws_sdk_s3::error::ProvideErrorMetadata { - fn as_status(&self) -> Status { + fn as_status(&self) -> RequestStatus { let status_str = match self { Ok(_) => "success".to_string(), Err(e) => e.meta().code().unwrap_or("unknown").to_string(), }; - Status::Ready(status_str) + RequestStatus::Ready(status_str) } } @@ -186,10 +217,10 @@ where E: aws_sdk_s3::error::ProvideErrorMetadata pub struct AzureMarker; #[cfg(feature = "azure")] -impl AsStatus for Result { - fn as_status(&self) -> Status { +impl 
AsRequestStatus for Result { + fn as_status(&self) -> RequestStatus { let Err(err) = self else { - return Status::Ready("success".to_string()); + return RequestStatus::Ready("success".to_string()); }; let err_status_str = match err.kind() { azure_storage::ErrorKind::HttpResponse { status, .. } => status.to_string(), @@ -198,69 +229,84 @@ impl AsStatus for Result { azure_storage::ErrorKind::DataConversion => "data_conversion".to_string(), _ => "unknown".to_string(), }; - Status::Ready(err_status_str) + RequestStatus::Ready(err_status_str) } } // The Azure SDK get_blob request returns Option because it chunks // the download into a stream of get requests. #[cfg(feature = "azure")] -impl AsStatus for Option> { - fn as_status(&self) -> Status { +impl AsRequestStatus for Option> { + fn as_status(&self) -> RequestStatus { match self { - None => Status::Done, + None => RequestStatus::Done, Some(res) => res.as_status(), } } } +pub enum DownloadStatus { + InProgress, + Done, + Failed(&'static str), +} + /// Track io errors during downloads. /// /// Downloads are a bit different from other requests because the request might /// fail while getting the bytes from the response body, long after getting a /// successful response header. #[pin_project(PinnedDrop)] -struct DownloadMetricsWrapper -where F: Future> +struct DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { #[pin] - tracked: F, - result: Option>, + tracked: copy_buf::CopyBuf<'a, R, W>, + status: DownloadStatus, } #[pinned_drop] -impl PinnedDrop for DownloadMetricsWrapper -where F: Future> +impl<'a, R, W> PinnedDrop for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { fn drop(self: Pin<&mut Self>) { - let status = match &self.result { - None => Cow::Borrowed("cancelled"), - Some(Err(e)) => Cow::Owned(format!("{e:?}")), - Some(Ok(downloaded_bytes)) => { - STORAGE_METRICS - .object_storage_download_num_bytes - .inc_by(*downloaded_bytes); - return; - } + let error_opt = match &self.status { + DownloadStatus::InProgress => Some("cancelled"), + DownloadStatus::Failed(e) => Some(*e), + DownloadStatus::Done => None, }; + STORAGE_METRICS - .object_storage_download_errors - .with_label_values([status.as_ref()]) - .inc(); + .object_storage_download_num_bytes + .with_label_values([error_opt.unwrap_or("success")]) + .inc_by(self.tracked.amt); + + if let Some(error) = error_opt { + STORAGE_METRICS + .object_storage_download_errors + .with_label_values([error]) + .inc(); + } } } -impl Future for DownloadMetricsWrapper -where F: Future> +impl<'a, R, W> Future for DownloadMetricsWrapper<'a, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, { - type Output = F::Output; + type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); let response = ready!(this.tracked.poll(cx)); - *this.result = match &response { - Ok(s) => Some(Ok(*s)), - Err(e) => Some(Err(e.kind())), + *this.status = match &response { + Ok(_) => DownloadStatus::Done, + Err(e) => DownloadStatus::Failed(io_error_as_label(e.kind())), }; Poll::Ready(response) } @@ -275,8 +321,106 @@ where W: AsyncWrite + Unpin + ?Sized, { DownloadMetricsWrapper { - tracked: tokio::io::copy_buf(reader, writer), - result: None, + tracked: copy_buf::CopyBuf { + reader, + writer, + amt: 0, + }, + status: DownloadStatus::InProgress, } .await } + +/// This is a fork of `tokio::io::copy_buf` that enables tracking 
the number of +/// bytes transferred. This estimate should be accurate as long as the network +/// is the bottleneck. +mod copy_buf { + + use std::future::Future; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll, ready}; + + use tokio::io::{AsyncBufRead, AsyncWrite}; + + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + pub reader: &'a mut R, + pub writer: &'a mut W, + pub amt: u64, + } + + impl Future for CopyBuf<'_, R, W> + where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, + { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let me = &mut *self; + let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut self.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); + } + self.amt += i as u64; + Pin::new(&mut *self.reader).consume(i); + } + } + } +} + +fn io_error_as_label(error: io::ErrorKind) -> &'static str { + use io::ErrorKind::*; + // most of these variants are not expected to happen + match error { + AddrInUse => "addr_in_use", + AddrNotAvailable => "addr_not_available", + AlreadyExists => "already_exists", + ArgumentListTooLong => "argument_list_too_long", + BrokenPipe => "broken_pipe", + ConnectionAborted => "connection_aborted", + ConnectionRefused => "connection_refused", + ConnectionReset => "connection_reset", + CrossesDevices => "crosses_devices", + Deadlock => "deadlock", + DirectoryNotEmpty => "directory_not_empty", + ExecutableFileBusy => "executable_file_busy", + FileTooLarge => "file_too_large", + HostUnreachable => "host_unreachable", + Interrupted => "interrupted", + InvalidData => "invalid_data", + InvalidFilename => "invalid_filename", + InvalidInput => "invalid_input", + IsADirectory => "is_a_directory", + NetworkDown => "network_down", + NetworkUnreachable => "network_unreachable", + NotADirectory => "not_a_directory", + NotConnected => "not_connected", + NotFound => "not_found", + NotSeekable => "not_seekable", + Other => "other", + OutOfMemory => "out_of_memory", + PermissionDenied => "permission_denied", + QuotaExceeded => "quota_exceeded", + ReadOnlyFilesystem => "read_only_filesystem", + ResourceBusy => "resource_busy", + StaleNetworkFileHandle => "stale_network_file_handle", + StorageFull => "storage_full", + TimedOut => "timed_out", + TooManyLinks => "too_many_links", + UnexpectedEof => "unexpected_eof", + Unsupported => "unsupported", + WouldBlock => "would_block", + WriteZero => "write_zero", + _ => "uncategorized", + } +} diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs index 361c8b33c52..9d6d376205e 100644 --- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs +++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs @@ -46,7 +46,9 @@ use tracing::{info, instrument, warn}; use crate::metrics::object_storage_get_slice_in_flight_guards; use crate::object_storage::MultiPartPolicy; -use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics}; +use crate::object_storage::metrics_wrappers::{ + ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics, +}; use 
crate::storage::SendableAsync; use crate::{ BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind, @@ -294,7 +296,7 @@ impl S3CompatibleObjectStorage { .body(body) .content_length(len as i64) .send() - .with_count_and_upload_metrics("put_object", len) + .with_count_and_upload_metrics(ActionLabel::PutObject, len) .await .map_err(|sdk_error| { if sdk_error.is_retryable() { @@ -329,7 +331,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .key(key) .send() - .with_count_metric("create_multipart_upload") + .with_count_metric(ActionLabel::CreateMultipartUpload) .await }) .await? @@ -430,7 +432,7 @@ impl S3CompatibleObjectStorage { .part_number(part.part_number as i32) .upload_id(upload_id.0) .send() - .with_count_and_upload_metrics("upload_part", part.len()) + .with_count_and_upload_metrics(ActionLabel::UploadPart, part.len()) .await .map_err(|s3_err| { if s3_err.is_retryable() { @@ -510,7 +512,7 @@ impl S3CompatibleObjectStorage { .multipart_upload(completed_upload.clone()) .upload_id(upload_id) .send() - .with_count_metric("complete_multipart_upload") + .with_count_metric(ActionLabel::CompleteMultipartUpload) .await }) .await?; @@ -525,7 +527,7 @@ impl S3CompatibleObjectStorage { .key(key) .upload_id(upload_id) .send() - .with_count_metric("abort_multipart_upload") + .with_count_metric(ActionLabel::AbortMultipartUpload) .await }) .await?; @@ -547,7 +549,7 @@ impl S3CompatibleObjectStorage { .key(key) .set_range(range_str) .send() - .with_count_metric("get_object") + .with_count_metric(ActionLabel::GetObject) .await?; Ok(get_object_output) } @@ -640,7 +642,7 @@ impl S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .delete(delete.clone()) .send() - .with_count_and_duration_metrics("delete_objects") + .with_count_and_duration_metrics(ActionLabel::DeleteObjects) .await }) .await @@ -722,7 +724,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(self.bucket.clone()) .max_keys(1) .send() - .with_count_metric("list_objects_v2") + .with_count_metric(ActionLabel::ListObjects) .await?; Ok(()) } @@ -765,7 +767,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(&bucket) .key(&key) .send() - .with_count_and_duration_metrics("delete_object") + .with_count_and_duration_metrics(ActionLabel::DeleteObject) .await }) .await; @@ -846,7 +848,7 @@ impl Storage for S3CompatibleObjectStorage { .bucket(&bucket) .key(&key) .send() - .with_count_metric("head_object") + .with_count_metric(ActionLabel::HeadObject) .await }) .await?; From 1553498b0d4da4ef2ad3ca00ca662909ca2c8309 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 15 Jul 2025 17:12:58 +0200 Subject: [PATCH 4/4] Improve metric descriptions --- quickwit/quickwit-storage/src/metrics.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 5ea00df485c..064448e0270 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -78,7 +78,7 @@ impl Default for StorageMetrics { object_storage_request_duration: new_histogram_vec( "object_storage_request_duration", "Durations until the response headers are returned from the object store, by \ - action and status. This does not measure the download time.", + action and status. 
This does not measure the download time for the body content.", "storage", &[], ["action", "status"], @@ -86,14 +86,14 @@ impl Default for StorageMetrics { ), object_storage_get_slice_in_flight_count: new_gauge( "object_storage_get_slice_in_flight_count", - "Number of GetObject for which the memory was allocated but the download is still \ - in progress.", + "Number of get_object for which the memory was allocated but the download is \ + still in progress.", "storage", &[], ), object_storage_get_slice_in_flight_num_bytes: new_gauge( "object_storage_get_slice_in_flight_num_bytes", - "Memory allocated for GetObject requests that are still in progress.", + "Memory allocated for get_object requests that are still in progress.", "storage", &[], ), @@ -106,6 +106,9 @@ impl Default for StorageMetrics { ), object_storage_download_errors: new_counter_vec( "object_storage_download_errors", + // Download errors are recorded separately because the associated + // get_object requests were already recorded as successful in + // object_storage_requests_total "Number of download requests that received successful response headers but failed \ during download.", "storage",
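Taken together with the wrappers, the comment above pins down the end-state accounting (label values illustrative). A get_object whose response headers arrive but whose body transfer then fails moves three series:

    object_storage_requests_total{action="get_object", status="success"}    +1
    object_storage_download_num_bytes{status="connection_reset"}            +bytes copied so far
    object_storage_download_errors{status="connection_reset"}               +1

whereas a request refused outright (for example S3's NoSuchKey) moves only

    object_storage_requests_total{action="get_object", status="NoSuchKey"}  +1

and touches no download series.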