Skip to content

Commit 716c43f

Browse files
committed
Reorganize object store metrics
1 parent 42f0c58 commit 716c43f

File tree

6 files changed

+318
-117
lines changed

6 files changed

+318
-117
lines changed

quickwit/quickwit-storage/src/metrics.rs

Lines changed: 24 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
use once_cell::sync::Lazy;
1818
use quickwit_common::metrics::{
19-
GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec,
19+
GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec,
2020
new_gauge, new_histogram_vec,
2121
};
2222

@@ -30,19 +30,13 @@ pub struct StorageMetrics {
3030
pub searcher_split_cache: CacheMetrics,
3131
pub get_slice_timeout_successes: [IntCounter; 3],
3232
pub get_slice_timeout_all_timeouts: IntCounter,
33-
pub object_storage_get_total: IntCounter,
34-
pub object_storage_get_errors_total: IntCounterVec<1>,
33+
pub object_storage_requests_total: IntCounterVec<2>,
34+
pub object_storage_request_duration: HistogramVec<2>,
3535
pub object_storage_get_slice_in_flight_count: IntGauge,
3636
pub object_storage_get_slice_in_flight_num_bytes: IntGauge,
37-
pub object_storage_put_total: IntCounter,
38-
pub object_storage_put_parts: IntCounter,
3937
pub object_storage_download_num_bytes: IntCounter,
38+
pub object_storage_download_errors: IntCounterVec<1>,
4039
pub object_storage_upload_num_bytes: IntCounter,
41-
42-
pub object_storage_delete_requests_total: IntCounter,
43-
pub object_storage_bulk_delete_requests_total: IntCounter,
44-
pub object_storage_delete_request_duration: Histogram,
45-
pub object_storage_bulk_delete_request_duration: Histogram,
4640
}
4741

4842
impl Default for StorageMetrics {
@@ -63,31 +57,6 @@ impl Default for StorageMetrics {
6357
let get_slice_timeout_all_timeouts =
6458
get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);
6559

66-
let object_storage_requests_total = new_counter_vec(
67-
"object_storage_requests_total",
68-
"Total number of object storage requests performed.",
69-
"storage",
70-
&[],
71-
["action"],
72-
);
73-
let object_storage_delete_requests_total =
74-
object_storage_requests_total.with_label_values(["delete_object"]);
75-
let object_storage_bulk_delete_requests_total =
76-
object_storage_requests_total.with_label_values(["delete_objects"]);
77-
78-
let object_storage_request_duration = new_histogram_vec(
79-
"object_storage_request_duration_seconds",
80-
"Duration of object storage requests in seconds.",
81-
"storage",
82-
&[],
83-
["action"],
84-
vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
85-
);
86-
let object_storage_delete_request_duration =
87-
object_storage_request_duration.with_label_values(["delete_object"]);
88-
let object_storage_bulk_delete_request_duration =
89-
object_storage_request_duration.with_label_values(["delete_objects"]);
90-
9160
StorageMetrics {
9261
fast_field_cache: CacheMetrics::for_component("fastfields"),
9362
fd_cache_metrics: CacheMetrics::for_component("fd"),
@@ -97,19 +66,23 @@ impl Default for StorageMetrics {
9766
split_footer_cache: CacheMetrics::for_component("splitfooter"),
9867
get_slice_timeout_successes,
9968
get_slice_timeout_all_timeouts,
100-
object_storage_get_total: new_counter(
101-
"object_storage_gets_total",
102-
"Number of objects fetched. Might be lower than get_slice_timeout_outcome if \
103-
queries are debounced.",
69+
object_storage_requests_total: new_counter_vec(
70+
"object_storage_requests_total",
71+
"Number of requests to the object store, by action and status. Requests are \
72+
recorded when the response headers are returned, download failures will not \
73+
appear as errors.",
10474
"storage",
10575
&[],
76+
["action", "status"],
10677
),
107-
object_storage_get_errors_total: new_counter_vec::<1>(
108-
"object_storage_get_errors_total",
109-
"Number of GetObject errors.",
78+
object_storage_request_duration: new_histogram_vec(
79+
"object_storage_request_duration",
80+
"Durations until the response headers are returned from the object store, by \
81+
action and status. This does not measure the download time.",
11082
"storage",
11183
&[],
112-
["code"],
84+
["action", "status"],
85+
vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
11386
),
11487
object_storage_get_slice_in_flight_count: new_gauge(
11588
"object_storage_get_slice_in_flight_count",
@@ -124,35 +97,26 @@ impl Default for StorageMetrics {
12497
"storage",
12598
&[],
12699
),
127-
object_storage_put_total: new_counter(
128-
"object_storage_puts_total",
129-
"Number of objects uploaded. May differ from object_storage_requests_parts due to \
130-
multipart upload.",
131-
"storage",
132-
&[],
133-
),
134-
object_storage_put_parts: new_counter(
135-
"object_storage_puts_parts",
136-
"Number of object parts uploaded.",
137-
"",
138-
&[],
139-
),
140100
object_storage_download_num_bytes: new_counter(
141101
"object_storage_download_num_bytes",
142102
"Amount of data downloaded from an object storage.",
143103
"storage",
144104
&[],
145105
),
106+
object_storage_download_errors: new_counter_vec(
107+
"object_storage_download_errors",
108+
"Number of download requests that received successfull response headers but \
109+
failed during download.",
110+
"storage",
111+
&[],
112+
["status"],
113+
),
146114
object_storage_upload_num_bytes: new_counter(
147115
"object_storage_upload_num_bytes",
148116
"Amount of data uploaded to an object storage.",
149117
"storage",
150118
&[],
151119
),
152-
object_storage_delete_requests_total,
153-
object_storage_bulk_delete_requests_total,
154-
object_storage_delete_request_duration,
155-
object_storage_bulk_delete_request_duration,
156120
}
157121
}
158122
}

quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ use tracing::{instrument, warn};
4545

4646
use crate::debouncer::DebouncedStorage;
4747
use crate::metrics::object_storage_get_slice_in_flight_guards;
48+
use crate::object_storage::metrics_wrappers::{S3MetricsWrapperExt, copy_with_download_metrics};
4849
use crate::storage::SendableAsync;
4950
use crate::{
50-
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage,
51-
StorageError, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
51+
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError,
52+
StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
5253
};
5354

5455
/// Azure object storage resolver.
@@ -225,7 +226,6 @@ impl AzureBlobStorage {
225226
name: &'a str,
226227
payload: Box<dyn crate::PutPayload>,
227228
) -> StorageResult<()> {
228-
crate::STORAGE_METRICS.object_storage_put_parts.inc();
229229
crate::STORAGE_METRICS
230230
.object_storage_upload_num_bytes
231231
.inc_by(payload.len());
@@ -237,6 +237,7 @@ impl AzureBlobStorage {
237237
.put_block_blob(data)
238238
.hash(hash)
239239
.into_future()
240+
.with_count_metric("put_block_blob")
240241
.await?;
241242
Result::<(), AzureErrorWrapper>::Ok(())
242243
})
@@ -261,7 +262,6 @@ impl AzureBlobStorage {
261262
.map(|(num, range)| {
262263
let moved_blob_client = blob_client.clone();
263264
let moved_payload = payload.clone();
264-
crate::STORAGE_METRICS.object_storage_put_parts.inc();
265265
crate::STORAGE_METRICS
266266
.object_storage_upload_num_bytes
267267
.inc_by(range.end - range.start);
@@ -276,6 +276,7 @@ impl AzureBlobStorage {
276276
.put_block(block_id.clone(), data)
277277
.hash(hash)
278278
.into_future()
279+
.with_count_metric("put_block")
279280
.await?;
280281
Result::<_, AzureErrorWrapper>::Ok(block_id)
281282
})
@@ -299,6 +300,7 @@ impl AzureBlobStorage {
299300
blob_client
300301
.put_block_list(block_list)
301302
.into_future()
303+
.with_count_metric("put_block_list")
302304
.await
303305
.map_err(AzureErrorWrapper::from)?;
304306

@@ -327,7 +329,6 @@ impl Storage for AzureBlobStorage {
327329
path: &Path,
328330
payload: Box<dyn crate::PutPayload>,
329331
) -> crate::StorageResult<()> {
330-
crate::STORAGE_METRICS.object_storage_put_total.inc();
331332
let name = self.blob_name(path);
332333
let total_len = payload.len();
333334
let part_num_bytes = self.multipart_policy.part_num_bytes(total_len);
@@ -345,18 +346,15 @@ impl Storage for AzureBlobStorage {
345346
let name = self.blob_name(path);
346347
let mut output_stream = self.container_client.blob_client(name).get().into_stream();
347348

348-
while let Some(chunk_result) = output_stream.next().await {
349+
while let Some(chunk_result) = output_stream.next().with_count_metric("get_blob").await {
349350
let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?;
350351
let chunk_response_body_stream = chunk_response
351352
.data
352353
.map_err(FutureError::other)
353354
.into_async_read()
354355
.compat();
355356
let mut body_stream_reader = BufReader::new(chunk_response_body_stream);
356-
let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
357-
STORAGE_METRICS
358-
.object_storage_download_num_bytes
359-
.inc_by(num_bytes_copied);
357+
copy_with_download_metrics(&mut body_stream_reader, output).await?;
360358
}
361359
output.flush().await?;
362360
Ok(())
@@ -369,6 +367,7 @@ impl Storage for AzureBlobStorage {
369367
.blob_client(blob_name)
370368
.delete()
371369
.into_future()
370+
.with_count_metric("delete_blob")
372371
.await
373372
.map_err(|err| AzureErrorWrapper::from(err).into());
374373
ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?;
@@ -513,7 +512,7 @@ async fn extract_range_data_and_hash(
513512
.await?
514513
.into_async_read();
515514
let mut buf: Vec<u8> = Vec::with_capacity(range.count());
516-
tokio::io::copy(&mut reader, &mut buf).await?;
515+
tokio::io::copy_buf(&mut reader, &mut buf).await?;
517516
let data = Bytes::from(buf);
518517
let hash = md5::compute(&data[..]);
519518
Ok((data, hash))
@@ -544,18 +543,15 @@ async fn download_all(
544543
output: &mut Vec<u8>,
545544
) -> Result<(), AzureErrorWrapper> {
546545
output.clear();
547-
while let Some(chunk_result) = chunk_stream.next().await {
546+
while let Some(chunk_result) = chunk_stream.next().with_count_metric("get_blob").await {
548547
let chunk_response = chunk_result?;
549548
let chunk_response_body_stream = chunk_response
550549
.data
551550
.map_err(FutureError::other)
552551
.into_async_read()
553552
.compat();
554553
let mut body_stream_reader = BufReader::new(chunk_response_body_stream);
555-
let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
556-
crate::STORAGE_METRICS
557-
.object_storage_download_num_bytes
558-
.inc_by(num_bytes_copied);
554+
copy_with_download_metrics(&mut body_stream_reader, output).await?;
559555
}
560556
// When calling `get_all`, the Vec capacity is not properly set.
561557
output.shrink_to_fit();

quickwit/quickwit-storage/src/object_storage/error.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError};
15+
use aws_sdk_s3::error::{DisplayErrorContext, SdkError};
1616
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
1717
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
1818
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
@@ -62,11 +62,6 @@ pub trait ToStorageErrorKind {
6262

6363
impl ToStorageErrorKind for GetObjectError {
6464
fn to_storage_error_kind(&self) -> StorageErrorKind {
65-
let error_code = self.code().unwrap_or("unknown");
66-
crate::STORAGE_METRICS
67-
.object_storage_get_errors_total
68-
.with_label_values([error_code])
69-
.inc();
7065
match self {
7166
GetObjectError::InvalidObjectState(_) => StorageErrorKind::Service,
7267
GetObjectError::NoSuchKey(_) => StorageErrorKind::NotFound,

0 commit comments

Comments
 (0)