Skip to content

Commit 6d44f23

Browse files
authored
Add non monotonic counter support to prometheus (#385)
1 parent 232a87e commit 6d44f23

File tree

13 files changed

+156
-84
lines changed

13 files changed

+156
-84
lines changed

examples/basic-otlp/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
6161

6262
let value_recorder_two = meter.f64_value_recorder("ex.com.two").init();
6363

64-
let _correlations =
64+
let _baggage =
6565
Context::current_with_baggage(vec![FOO_KEY.string("foo1"), BAR_KEY.string("bar1")])
6666
.attach();
6767

examples/basic/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
6767

6868
let value_recorder_two = meter.f64_value_recorder("ex.com.two").init();
6969

70-
let _correlations =
70+
let _baggage =
7171
Context::current_with_baggage(vec![FOO_KEY.string("foo1"), BAR_KEY.string("bar1")])
7272
.attach();
7373

opentelemetry-prometheus/src/lib.rs

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@
6666

6767
use opentelemetry::global;
6868
use opentelemetry::sdk::{
69-
export::metrics::{CheckpointSet, ExportKind, Histogram, Record, Sum},
69+
export::metrics::{CheckpointSet, ExportKindSelector, Histogram, LastValue, Record, Sum},
7070
metrics::{
71-
aggregators::{HistogramAggregator, SumAggregator},
71+
aggregators::{HistogramAggregator, LastValueAggregator, SumAggregator},
7272
controllers,
7373
selectors::simple::Selector,
7474
PullController,
@@ -77,9 +77,7 @@ use opentelemetry::sdk::{
7777
};
7878
use opentelemetry::{
7979
labels,
80-
metrics::{
81-
registry::RegistryMeterProvider, Descriptor, InstrumentKind, MetricsError, NumberKind,
82-
},
80+
metrics::{registry::RegistryMeterProvider, MetricsError, NumberKind},
8381
KeyValue,
8482
};
8583
use std::collections::HashMap;
@@ -92,7 +90,8 @@ use sanitize::sanitize;
9290

9391
/// Cache disabled by default.
9492
const DEFAULT_CACHE_PERIOD: Duration = Duration::from_secs(0);
95-
const EXPORT_KIND: ExportKind = ExportKind::Cumulative;
93+
94+
const EXPORT_KIND_SELECTOR: ExportKindSelector = ExportKindSelector::Cumulative;
9695

9796
/// Create a new prometheus exporter builder.
9897
pub fn exporter() -> ExporterBuilder {
@@ -181,7 +180,7 @@ impl ExporterBuilder {
181180
.default_histogram_boundaries
182181
.unwrap_or_else(|| vec![0.5, 0.9, 0.99]);
183182
let selector = Box::new(Selector::Histogram(default_histogram_boundaries.clone()));
184-
let mut controller_builder = controllers::pull(selector, Box::new(EXPORT_KIND))
183+
let mut controller_builder = controllers::pull(selector, Box::new(EXPORT_KIND_SELECTOR))
185184
.with_cache_period(self.cache_period.unwrap_or(DEFAULT_CACHE_PERIOD))
186185
.with_memory(true);
187186
if let Some(resource) = self.resource {
@@ -256,17 +255,6 @@ impl PrometheusExporter {
256255
.map_err(Into::into)
257256
.map(|locked| locked.provider())
258257
}
259-
260-
/// Determine the export kind this exporter should use for a given instrument
261-
/// and descriptor.
262-
pub fn export_kind_for(&self, _descriptor: &Descriptor, _kind: &InstrumentKind) -> ExportKind {
263-
// NOTE: Summary values should use Delta aggregation, then be
264-
// combined into a sliding window, see the TODO below.
265-
// NOTE: Prometheus also supports a "GaugeDelta" exposition format,
266-
// which is expressed as a delta histogram. Need to understand if this
267-
// should be a default behavior for ValueRecorder/ValueObserver.
268-
EXPORT_KIND
269-
}
270258
}
271259

272260
#[derive(Debug)]
@@ -296,9 +284,10 @@ impl prometheus::core::Collector for Collector {
296284
return metrics;
297285
}
298286

299-
if let Err(err) = controller.try_for_each(&EXPORT_KIND, &mut |record| {
287+
if let Err(err) = controller.try_for_each(&EXPORT_KIND_SELECTOR, &mut |record| {
300288
let agg = record.aggregator().ok_or(MetricsError::NoDataCollected)?;
301289
let number_kind = record.descriptor().number_kind();
290+
let instrument_kind = record.descriptor().instrument_kind();
302291

303292
let mut label_keys = Vec::new();
304293
let mut label_values = Vec::new();
@@ -309,7 +298,15 @@ impl prometheus::core::Collector for Collector {
309298
if let Some(hist) = agg.as_any().downcast_ref::<HistogramAggregator>() {
310299
metrics.push(build_histogram(hist, number_kind, desc, label_values)?);
311300
} else if let Some(sum) = agg.as_any().downcast_ref::<SumAggregator>() {
312-
metrics.push(build_counter(sum, number_kind, desc, label_values)?);
301+
let counter = if instrument_kind.monotonic() {
302+
build_monotonic_counter(sum, number_kind, desc, label_values)?
303+
} else {
304+
build_non_monotonic_counter(sum, number_kind, desc, label_values)?
305+
};
306+
307+
metrics.push(counter);
308+
} else if let Some(last) = agg.as_any().downcast_ref::<LastValueAggregator>() {
309+
metrics.push(build_last_value(last, number_kind, desc, label_values)?);
313310
}
314311

315312
Ok(())
@@ -324,7 +321,59 @@ impl prometheus::core::Collector for Collector {
324321
}
325322
}
326323

327-
fn build_counter(
324+
fn build_last_value(
325+
lv: &LastValueAggregator,
326+
kind: &NumberKind,
327+
desc: prometheus::core::Desc,
328+
labels: Vec<KeyValue>,
329+
) -> Result<prometheus::proto::MetricFamily, MetricsError> {
330+
let (last_value, _) = lv.last_value()?;
331+
332+
let mut g = prometheus::proto::Gauge::default();
333+
g.set_value(last_value.to_f64(kind));
334+
335+
let mut m = prometheus::proto::Metric::default();
336+
m.set_label(protobuf::RepeatedField::from_vec(
337+
labels.into_iter().map(build_label_pair).collect(),
338+
));
339+
m.set_gauge(g);
340+
341+
let mut mf = prometheus::proto::MetricFamily::default();
342+
mf.set_name(desc.fq_name);
343+
mf.set_help(desc.help);
344+
mf.set_field_type(prometheus::proto::MetricType::GAUGE);
345+
mf.set_metric(protobuf::RepeatedField::from_vec(vec![m]));
346+
347+
Ok(mf)
348+
}
349+
350+
fn build_non_monotonic_counter(
351+
sum: &SumAggregator,
352+
kind: &NumberKind,
353+
desc: prometheus::core::Desc,
354+
labels: Vec<KeyValue>,
355+
) -> Result<prometheus::proto::MetricFamily, MetricsError> {
356+
let sum = sum.sum()?;
357+
358+
let mut g = prometheus::proto::Gauge::default();
359+
g.set_value(sum.to_f64(kind));
360+
361+
let mut m = prometheus::proto::Metric::default();
362+
m.set_label(protobuf::RepeatedField::from_vec(
363+
labels.into_iter().map(build_label_pair).collect(),
364+
));
365+
m.set_gauge(g);
366+
367+
let mut mf = prometheus::proto::MetricFamily::default();
368+
mf.set_name(desc.fq_name);
369+
mf.set_help(desc.help);
370+
mf.set_field_type(prometheus::proto::MetricType::GAUGE);
371+
mf.set_metric(protobuf::RepeatedField::from_vec(vec![m]));
372+
373+
Ok(mf)
374+
}
375+
376+
fn build_monotonic_counter(
328377
sum: &SumAggregator,
329378
kind: &NumberKind,
330379
desc: prometheus::core::Desc,

opentelemetry-prometheus/tests/integration_test.rs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use opentelemetry::sdk::Resource;
2-
use opentelemetry::{metrics::MeterProvider, KeyValue};
2+
use opentelemetry::{
3+
metrics::{MeterProvider, ObserverResult},
4+
KeyValue,
5+
};
36
use opentelemetry_prometheus::PrometheusExporter;
47
use prometheus::{Encoder, TextEncoder};
58

@@ -12,6 +15,7 @@ fn test_add() {
1215

1316
let meter = exporter.provider().unwrap().meter("test");
1417

18+
let up_down_counter = meter.f64_up_down_counter("updowncounter").init();
1519
let counter = meter.f64_counter("counter").init();
1620
let value_recorder = meter.f64_value_recorder("value_recorder").init();
1721

@@ -22,18 +26,32 @@ fn test_add() {
2226
counter.add(10.0, &labels);
2327
counter.add(5.3, &labels);
2428

25-
expected.push("counter{A=\"B\",C=\"D\",R=\"V\"} 15.3");
29+
expected.push(r#"counter{A="B",C="D",R="V"} 15.3"#);
30+
31+
let cb_labels = labels.clone();
32+
let _observer = meter
33+
.i64_value_observer("intobserver", move |result: ObserverResult<i64>| {
34+
result.observe(1, cb_labels.as_ref())
35+
})
36+
.init();
37+
38+
expected.push(r#"intobserver{A="B",C="D",R="V"} 1"#);
2639

2740
value_recorder.record(-0.6, &labels);
2841
value_recorder.record(-0.4, &labels);
2942
value_recorder.record(0.6, &labels);
3043
value_recorder.record(20.0, &labels);
3144

32-
expected.push("value_recorder_bucket{A=\"B\",C=\"D\",R=\"V\",le=\"+Inf\"} 4");
33-
expected.push("value_recorder_bucket{A=\"B\",C=\"D\",R=\"V\",le=\"-0.5\"} 1");
34-
expected.push("value_recorder_bucket{A=\"B\",C=\"D\",R=\"V\",le=\"1\"} 3");
35-
expected.push("value_recorder_count{A=\"B\",C=\"D\",R=\"V\"} 4");
36-
expected.push("value_recorder_sum{A=\"B\",C=\"D\",R=\"V\"} 19.6");
45+
expected.push(r#"value_recorder_bucket{A="B",C="D",R="V",le="+Inf"} 4"#);
46+
expected.push(r#"value_recorder_bucket{A="B",C="D",R="V",le="-0.5"} 1"#);
47+
expected.push(r#"value_recorder_bucket{A="B",C="D",R="V",le="1"} 3"#);
48+
expected.push(r#"value_recorder_count{A="B",C="D",R="V"} 4"#);
49+
expected.push(r#"value_recorder_sum{A="B",C="D",R="V"} 19.6"#);
50+
51+
up_down_counter.add(10.0, &labels);
52+
up_down_counter.add(-3.2, &labels);
53+
54+
expected.push(r#"updowncounter{A="B",C="D",R="V"} 6.8"#);
3755

3856
compare_export(&exporter, expected)
3957
}

opentelemetry/src/api/metrics/async_instrument.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ where
6969
}
7070
}
7171

72-
/// Observe captures a single integer value from the associated instrument
73-
/// callback, with the given labels.
72+
/// Observe captures a single value from the associated instrument callback,
73+
/// with the given labels.
7474
pub fn observe(&self, value: T, labels: &[KeyValue]) {
7575
(self.f)(
7676
labels,

opentelemetry/src/api/metrics/kind.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,12 @@ impl InstrumentKind {
5151

5252
/// Whether this kind of instrument groups its inputs (as opposed to adding).
5353
pub fn grouping(&self) -> bool {
54-
matches!(
55-
self,
56-
InstrumentKind::Counter
57-
| InstrumentKind::UpDownCounter
58-
| InstrumentKind::SumObserver
59-
| InstrumentKind::UpDownSumObserver
60-
)
54+
!self.adding()
55+
}
56+
57+
/// Whether this kind of instrument exposes a non-decreasing sum.
58+
pub fn monotonic(&self) -> bool {
59+
matches!(self, InstrumentKind::Counter | InstrumentKind::SumObserver)
6160
}
6261

6362
/// Whether this kind of instrument receives precomputed sums.

opentelemetry/src/api/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#[cfg(feature = "metrics")]
22
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
33
use crate::api::metrics::MetricsError;
4+
#[cfg(feature = "trace")]
5+
#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
46
use crate::api::trace::TraceError;
57
use std::sync::PoisonError;
68

opentelemetry/src/global/error_handler.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub fn handle_error<T: Into<Error>>(err: T) {
2121
#[cfg(feature = "trace")]
2222
#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
2323
Error::Trace(err) => eprintln!("OpenTelemetry trace error occurred {:?}", err),
24-
Error::Other(err_msg) => println!("OpenTelemetry error occurred {}", err_msg),
24+
Error::Other(err_msg) => eprintln!("OpenTelemetry error occurred {}", err_msg),
2525
},
2626
}
2727
}

opentelemetry/src/sdk/export/metrics/mod.rs

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ pub trait Subtractor {
168168
/// Exporter handles presentation of the checkpoint of aggregate metrics. This
169169
/// is the final stage of a metrics export pipeline, where metric data are
170170
/// formatted for a specific system.
171-
pub trait Exporter: ExportKindSelector {
171+
pub trait Exporter: ExportKindFor {
172172
/// Export is called immediately after completing a collection pass in the SDK.
173173
///
174174
/// The CheckpointSet interface refers to the Processor that just completed
@@ -179,7 +179,7 @@ pub trait Exporter: ExportKindSelector {
179179
/// ExportKindSelector is a sub-interface of Exporter used to indicate
180180
/// whether the Processor should compute Delta or Cumulative
181181
/// Aggregations.
182-
pub trait ExportKindSelector: fmt::Debug {
182+
pub trait ExportKindFor: fmt::Debug {
183183
/// Determines the correct `ExportKind` that should be used when exporting data
184184
/// for the given metric instrument.
185185
fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind;
@@ -202,7 +202,7 @@ pub trait CheckpointSet: fmt::Debug {
202202
/// return the error to the caller.
203203
fn try_for_each(
204204
&mut self,
205-
export_selector: &dyn ExportKindSelector,
205+
export_selector: &dyn ExportKindFor,
206206
f: &mut dyn FnMut(&Record<'_>) -> Result<()>,
207207
) -> Result<()>;
208208
}
@@ -373,15 +373,22 @@ impl<'a> Accumulation<'a> {
373373
#[derive(Clone, Debug)]
374374
pub enum ExportKind {
375375
/// Indicates that the `Exporter` expects a cumulative `Aggregation`.
376-
Cumulative = 1, // e.g., Prometheus
376+
Cumulative = 1,
377377

378378
/// Indicates that the `Exporter` expects a delta `Aggregation`.
379-
Delta = 2, // e.g., StatsD
379+
Delta = 2,
380+
}
380381

381-
/// Indicates that the `Exporter` expects either a cumulative or a delta
382-
/// `Aggregation`, whichever does not require maintaining state for the
383-
/// given instrument.
384-
PassThrough = 4, // e.g., OTLP
382+
/// Strategies for selecting which export kind is used for an instrument.
383+
#[derive(Debug)]
384+
pub enum ExportKindSelector {
385+
/// A selector that always returns [`ExportKind::Cumulative`].
386+
Cumulative,
387+
/// A selector that always returns [`ExportKind::Delta`].
388+
Delta,
389+
/// A selector that returns cumulative or delta based on a given instrument
390+
/// kind.
391+
Stateless,
385392
}
386393

387394
impl ExportKind {
@@ -409,8 +416,18 @@ impl ExportKind {
409416
}
410417
}
411418

412-
impl ExportKindSelector for ExportKind {
413-
fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind {
414-
self.clone()
419+
impl ExportKindFor for ExportKindSelector {
420+
fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind {
421+
match self {
422+
ExportKindSelector::Cumulative => ExportKind::Cumulative,
423+
ExportKindSelector::Delta => ExportKind::Delta,
424+
ExportKindSelector::Stateless => {
425+
if descriptor.instrument_kind().precomputed_sum() {
426+
ExportKind::Cumulative
427+
} else {
428+
ExportKind::Delta
429+
}
430+
}
431+
}
415432
}
416433
}

opentelemetry/src/sdk/export/metrics/stdout.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
use crate::global;
33
use crate::sdk::{
44
export::metrics::{
5-
CheckpointSet, Count, ExportKind, ExportKindSelector, Exporter, LastValue, Max, Min,
6-
Quantile, Sum,
5+
CheckpointSet, Count, ExportKind, ExportKindFor, ExportKindSelector, Exporter, LastValue,
6+
Max, Min, Quantile, Sum,
77
},
88
metrics::{
99
aggregators::{
@@ -235,16 +235,16 @@ where
235235
}
236236
}
237237

238-
impl<W> ExportKindSelector for StdoutExporter<W>
238+
impl<W> ExportKindFor for StdoutExporter<W>
239239
where
240240
W: fmt::Debug + io::Write,
241241
{
242-
fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind {
243-
ExportKind::PassThrough
242+
fn export_kind_for(&self, descriptor: &Descriptor) -> ExportKind {
243+
ExportKindSelector::Stateless.export_kind_for(descriptor)
244244
}
245245
}
246246

247-
/// A formatter for user-defined batch serilization.
247+
/// A formatter for user-defined batch serialization.
248248
pub struct Formatter(Box<dyn Fn(ExportBatch) -> Result<String> + Send + Sync>);
249249
impl fmt::Debug for Formatter {
250250
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -361,7 +361,7 @@ where
361361
let (spawn, interval, exporter) = self.try_build()?;
362362
let mut push_builder = controllers::push(
363363
simple::Selector::Exact,
364-
ExportKind::PassThrough,
364+
ExportKindSelector::Stateless,
365365
exporter,
366366
spawn,
367367
interval,

0 commit comments

Comments
 (0)