Skip to content

Commit 93ce4eb

Browse files
committed
Improve download metric
- Use explicit label values - Track download at the copy level Unified label values for object store actions.
1 parent 88528bf commit 93ce4eb

File tree

4 files changed

+253
-94
lines changed

4 files changed

+253
-94
lines changed

quickwit/quickwit-storage/src/metrics.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ pub struct StorageMetrics {
3434
pub object_storage_request_duration: HistogramVec<2>,
3535
pub object_storage_get_slice_in_flight_count: IntGauge,
3636
pub object_storage_get_slice_in_flight_num_bytes: IntGauge,
37-
pub object_storage_download_num_bytes: IntCounter,
37+
pub object_storage_download_num_bytes: IntCounterVec<1>,
3838
pub object_storage_download_errors: IntCounterVec<1>,
39-
pub object_storage_upload_num_bytes: IntCounter,
39+
pub object_storage_upload_num_bytes: IntCounterVec<1>,
4040
}
4141

4242
impl Default for StorageMetrics {
@@ -97,25 +97,28 @@ impl Default for StorageMetrics {
9797
"storage",
9898
&[],
9999
),
100-
object_storage_download_num_bytes: new_counter(
100+
object_storage_download_num_bytes: new_counter_vec(
101101
"object_storage_download_num_bytes",
102-
"Amount of data downloaded from an object storage.",
102+
"Amount of data downloaded from object storage.",
103103
"storage",
104104
&[],
105+
["status"],
105106
),
106107
object_storage_download_errors: new_counter_vec(
107108
"object_storage_download_errors",
108-
"Number of download requests that received successfull response headers but \
109-
failed during download.",
109+
"Number of download requests that received successful response headers but failed \
110+
during download.",
110111
"storage",
111112
&[],
112113
["status"],
113114
),
114-
object_storage_upload_num_bytes: new_counter(
115+
object_storage_upload_num_bytes: new_counter_vec(
115116
"object_storage_upload_num_bytes",
116-
"Amount of data uploaded to an object storage.",
117+
"Amount of data uploaded to object storage. The value recorded for failed and \
118+
aborted uploads is the full payload size.",
117119
"storage",
118120
&[],
121+
["status"],
119122
),
120123
}
121124
}

quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ use tracing::{instrument, warn};
4545

4646
use crate::debouncer::DebouncedStorage;
4747
use crate::metrics::object_storage_get_slice_in_flight_guards;
48-
use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics};
48+
use crate::object_storage::metrics_wrappers::{
49+
ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics,
50+
};
4951
use crate::storage::SendableAsync;
5052
use crate::{
5153
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError,
@@ -226,9 +228,6 @@ impl AzureBlobStorage {
226228
name: &'a str,
227229
payload: Box<dyn crate::PutPayload>,
228230
) -> StorageResult<()> {
229-
crate::STORAGE_METRICS
230-
.object_storage_upload_num_bytes
231-
.inc_by(payload.len());
232231
retry(&self.retry_params, || async {
233232
let data = Bytes::from(payload.read_all().await?.to_vec());
234233
let hash = azure_storage_blobs::prelude::Hash::from(md5::compute(&data[..]).0);
@@ -237,7 +236,7 @@ impl AzureBlobStorage {
237236
.put_block_blob(data)
238237
.hash(hash)
239238
.into_future()
240-
.with_count_metric("put_block_blob")
239+
.with_count_and_upload_metrics(ActionLabel::PutObject, payload.len())
241240
.await?;
242241
Result::<(), AzureErrorWrapper>::Ok(())
243242
})
@@ -262,9 +261,6 @@ impl AzureBlobStorage {
262261
.map(|(num, range)| {
263262
let moved_blob_client = blob_client.clone();
264263
let moved_payload = payload.clone();
265-
crate::STORAGE_METRICS
266-
.object_storage_upload_num_bytes
267-
.inc_by(range.end - range.start);
268264
async move {
269265
retry(&self.retry_params, || async {
270266
let block_id = format!("block:{num}");
@@ -276,7 +272,10 @@ impl AzureBlobStorage {
276272
.put_block(block_id.clone(), data)
277273
.hash(hash)
278274
.into_future()
279-
.with_count_metric("put_block")
275+
.with_count_and_upload_metrics(
276+
ActionLabel::UploadPart,
277+
range.end - range.start,
278+
)
280279
.await?;
281280
Result::<_, AzureErrorWrapper>::Ok(block_id)
282281
})
@@ -300,7 +299,7 @@ impl AzureBlobStorage {
300299
blob_client
301300
.put_block_list(block_list)
302301
.into_future()
303-
.with_count_metric("put_block_list")
302+
.with_count_metric(ActionLabel::CompleteMultipartUpload)
304303
.await
305304
.map_err(AzureErrorWrapper::from)?;
306305

@@ -317,6 +316,7 @@ impl Storage for AzureBlobStorage {
317316
.max_results(NonZeroU32::new(1u32).expect("1 is always non-zero."))
318317
.into_stream()
319318
.next()
319+
.with_count_metric(ActionLabel::ListObjects)
320320
.await
321321
{
322322
let _ = first_blob_result?;
@@ -346,7 +346,11 @@ impl Storage for AzureBlobStorage {
346346
let name = self.blob_name(path);
347347
let mut output_stream = self.container_client.blob_client(name).get().into_stream();
348348

349-
while let Some(chunk_result) = output_stream.next().with_count_metric("get_blob").await {
349+
while let Some(chunk_result) = output_stream
350+
.next()
351+
.with_count_metric(ActionLabel::GetObject)
352+
.await
353+
{
350354
let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?;
351355
let chunk_response_body_stream = chunk_response
352356
.data
@@ -367,7 +371,7 @@ impl Storage for AzureBlobStorage {
367371
.blob_client(blob_name)
368372
.delete()
369373
.into_future()
370-
.with_count_metric("delete_blob")
374+
.with_count_metric(ActionLabel::DeleteObject)
371375
.await
372376
.map_err(|err| AzureErrorWrapper::from(err).into());
373377
ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?;
@@ -490,6 +494,7 @@ impl Storage for AzureBlobStorage {
490494
.blob_client(name)
491495
.get_properties()
492496
.into_future()
497+
.with_count_metric(ActionLabel::HeadObject)
493498
.await;
494499
match properties_result {
495500
Ok(response) => Ok(response.blob.properties.content_length),
@@ -543,7 +548,11 @@ async fn download_all(
543548
output: &mut Vec<u8>,
544549
) -> Result<(), AzureErrorWrapper> {
545550
output.clear();
546-
while let Some(chunk_result) = chunk_stream.next().with_count_metric("get_blob").await {
551+
while let Some(chunk_result) = chunk_stream
552+
.next()
553+
.with_count_metric(ActionLabel::GetObject)
554+
.await
555+
{
547556
let chunk_response = chunk_result?;
548557
let chunk_response_body_stream = chunk_response
549558
.data

0 commit comments

Comments
 (0)