Skip to content

Support float and duration metrics #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions temporalio/bridge/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,46 @@ def record(self, value: int, attrs: MetricAttributes) -> None:
self._ref.record(value, attrs._ref)


class MetricHistogramFloat:
"""Metric histogram using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize histogram."""
self._ref = meter._ref.new_histogram_float(name, description, unit)

def record(self, value: float, attrs: MetricAttributes) -> None:
"""Record value on histogram."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.record(value, attrs._ref)


class MetricHistogramDuration:
"""Metric histogram using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize histogram."""
self._ref = meter._ref.new_histogram_duration(name, description, unit)

def record(self, value_ms: int, attrs: MetricAttributes) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass through timedelta here too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the most trivial to create these types of objects on the Rust side and there's a cost to creating them on the Rust side (have to capture GIL, etc). I considered converting back to timedelta closer to the user (and I do it in .NET), but I would have to decide whether to create it on the Rust side or Python side (today each getter of a Python buffered metric value is actually backed by Rust values which is neat). The Rust side is a pain, and the Python side is expensive and unnecessary since the user can do it themselves.

But I have left open this possibility. I have an enum of MetricBufferDurationFormat and if we want to add a third TIMEDELTA enum we can (like we did in .NET).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

K, that's reasonable. Being able to record them as timedeltas is the main thing that matters and that works.

"""Record value on histogram."""
if value_ms < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.record(value_ms, attrs._ref)


class MetricGauge:
"""Metric gauge using SDK Core."""

Expand All @@ -95,6 +135,26 @@ def set(self, value: int, attrs: MetricAttributes) -> None:
self._ref.set(value, attrs._ref)


class MetricGaugeFloat:
"""Metric gauge using SDK Core."""

def __init__(
self,
meter: MetricMeter,
name: str,
description: Optional[str],
unit: Optional[str],
) -> None:
"""Initialize gauge."""
self._ref = meter._ref.new_gauge_float(name, description, unit)

def set(self, value: float, attrs: MetricAttributes) -> None:
"""Set value on gauge."""
if value < 0:
raise ValueError("Metric value must be non-negative value")
self._ref.set(value, attrs._ref)


class MetricAttributes:
"""Metric attributes using SDK Core."""

Expand Down
4 changes: 2 additions & 2 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ def __init__(self, *, telemetry: TelemetryConfig) -> None:
"""Create SDK Core runtime."""
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)

def retrieve_buffered_metrics(self) -> Sequence[Any]:
def retrieve_buffered_metrics(self, durations_as_seconds: bool) -> Sequence[Any]:
"""Get buffered metrics."""
return self._ref.retrieve_buffered_metrics()
return self._ref.retrieve_buffered_metrics(durations_as_seconds)

def write_test_info_log(self, message: str, extra_data: str) -> None:
"""Write a test core log at INFO level."""
Expand Down
6 changes: 3 additions & 3 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use std::str::FromStr;
use std::time::Duration;
use temporal_client::{
ClientKeepAliveConfig as CoreClientKeepAliveConfig, ClientOptions, ClientOptionsBuilder,
ConfiguredClient, HealthService, OperatorService, RetryClient, RetryConfig,
TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService, HttpConnectProxyOptions,
ConfiguredClient, HealthService, HttpConnectProxyOptions, OperatorService, RetryClient,
RetryConfig, TemporalServiceClientWithMetrics, TestService, TlsConfig, WorkflowService,
};
use tonic::metadata::MetadataKey;
use url::Url;
Expand Down Expand Up @@ -467,4 +467,4 @@ impl From<ClientHttpConnectProxyConfig> for HttpConnectProxyOptions {
basic_auth: conf.basic_auth,
}
}
}
}
3 changes: 3 additions & 0 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<metric::MetricAttributesRef>()?;
m.add_class::<metric::MetricCounterRef>()?;
m.add_class::<metric::MetricHistogramRef>()?;
m.add_class::<metric::MetricHistogramFloatRef>()?;
m.add_class::<metric::MetricHistogramDurationRef>()?;
m.add_class::<metric::MetricGaugeRef>()?;
m.add_class::<metric::MetricGaugeFloatRef>()?;
m.add_class::<metric::BufferedMetricUpdate>()?;
m.add_class::<metric::BufferedMetric>()?;
m.add_function(wrap_pyfunction!(new_metric_meter, m)?)?;
Expand Down
131 changes: 119 additions & 12 deletions temporalio/bridge/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::any::Any;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};

use pyo3::prelude::*;
Expand Down Expand Up @@ -32,11 +33,26 @@ pub struct MetricHistogramRef {
histogram: Arc<dyn metrics::Histogram>,
}

#[pyclass]
pub struct MetricHistogramFloatRef {
histogram: Arc<dyn metrics::HistogramF64>,
}

#[pyclass]
pub struct MetricHistogramDurationRef {
histogram: Arc<dyn metrics::HistogramDuration>,
}

#[pyclass]
pub struct MetricGaugeRef {
gauge: Arc<dyn metrics::Gauge>,
}

#[pyclass]
pub struct MetricGaugeFloatRef {
gauge: Arc<dyn metrics::GaugeF64>,
}

pub fn new_metric_meter(runtime_ref: &runtime::RuntimeRef) -> Option<MetricMeterRef> {
runtime_ref
.runtime
Expand Down Expand Up @@ -84,6 +100,36 @@ impl MetricMeterRef {
}
}

fn new_histogram_float(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricHistogramFloatRef {
MetricHistogramFloatRef {
histogram: self.meter.inner.histogram_f64(build_metric_parameters(
name,
description,
unit,
)),
}
}

fn new_histogram_duration(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricHistogramDurationRef {
MetricHistogramDurationRef {
histogram: self.meter.inner.histogram_duration(build_metric_parameters(
name,
description,
unit,
)),
}
}

fn new_gauge(
&self,
name: String,
Expand All @@ -97,6 +143,20 @@ impl MetricMeterRef {
.gauge(build_metric_parameters(name, description, unit)),
}
}

fn new_gauge_float(
&self,
name: String,
description: Option<String>,
unit: Option<String>,
) -> MetricGaugeFloatRef {
MetricGaugeFloatRef {
gauge: self
.meter
.inner
.gauge_f64(build_metric_parameters(name, description, unit)),
}
}
}

#[pymethods]
Expand All @@ -113,13 +173,35 @@ impl MetricHistogramRef {
}
}

#[pymethods]
impl MetricHistogramFloatRef {
fn record(&self, value: f64, attrs_ref: &MetricAttributesRef) {
self.histogram.record(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricHistogramDurationRef {
fn record(&self, value_ms: u64, attrs_ref: &MetricAttributesRef) {
self.histogram
.record(Duration::from_millis(value_ms), &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricGaugeRef {
fn set(&self, value: u64, attrs_ref: &MetricAttributesRef) {
self.gauge.record(value, &attrs_ref.attrs);
}
}

#[pymethods]
impl MetricGaugeFloatRef {
fn set(&self, value: f64, attrs_ref: &MetricAttributesRef) {
self.gauge.record(value, &attrs_ref.attrs);
}
}

fn build_metric_parameters(
name: String,
description: Option<String>,
Expand Down Expand Up @@ -192,16 +274,18 @@ pub struct BufferedMetricUpdate {
}

#[derive(Clone)]
pub struct BufferedMetricUpdateValue(metrics::MetricUpdateVal);
pub enum BufferedMetricUpdateValue {
U64(u64),
U128(u128),
F64(f64),
}

impl IntoPy<PyObject> for BufferedMetricUpdateValue {
fn into_py(self, py: Python) -> PyObject {
match self.0 {
metrics::MetricUpdateVal::Delta(v) => v.into_py(py),
metrics::MetricUpdateVal::DeltaF64(v) => v.into_py(py),
metrics::MetricUpdateVal::Value(v) => v.into_py(py),
metrics::MetricUpdateVal::ValueF64(v) => v.into_py(py),
metrics::MetricUpdateVal::Duration(v) => v.as_millis().into_py(py),
match self {
BufferedMetricUpdateValue::U64(v) => v.into_py(py),
BufferedMetricUpdateValue::U128(v) => v.into_py(py),
BufferedMetricUpdateValue::F64(v) => v.into_py(py),
}
}
}
Expand Down Expand Up @@ -236,16 +320,18 @@ impl CustomMetricAttributes for BufferedMetricAttributes {
pub fn convert_metric_events<'p>(
py: Python<'p>,
events: Vec<MetricEvent<BufferedMetricRef>>,
durations_as_seconds: bool,
) -> Vec<BufferedMetricUpdate> {
events
.into_iter()
.filter_map(|e| convert_metric_event(py, e))
.filter_map(|e| convert_metric_event(py, e, durations_as_seconds))
.collect()
}

fn convert_metric_event<'p>(
py: Python<'p>,
event: MetricEvent<BufferedMetricRef>,
durations_as_seconds: bool,
) -> Option<BufferedMetricUpdate> {
match event {
// Create the metric and put it on the lazy ref
Expand All @@ -262,9 +348,19 @@ fn convert_metric_event<'p>(
description: Some(params.description)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
unit: Some(params.unit)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
unit: if matches!(kind, metrics::MetricKind::HistogramDuration)
&& params.unit == "duration"
{
if durations_as_seconds {
Some("s".to_owned())
} else {
Some("ms".to_owned())
}
} else if params.unit.is_empty() {
None
} else {
Some(params.unit.to_string())
},
kind: match kind {
metrics::MetricKind::Counter => 0,
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => 1,
Expand Down Expand Up @@ -324,7 +420,18 @@ fn convert_metric_event<'p>(
update,
} => Some(BufferedMetricUpdate {
metric: instrument.get().clone().0.clone(),
value: BufferedMetricUpdateValue(update),
value: match update {
metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => {
BufferedMetricUpdateValue::F64(v.as_secs_f64())
}
metrics::MetricUpdateVal::Duration(v) => {
BufferedMetricUpdateValue::U128(v.as_millis())
}
metrics::MetricUpdateVal::Delta(v) => BufferedMetricUpdateValue::U64(v),
metrics::MetricUpdateVal::DeltaF64(v) => BufferedMetricUpdateValue::F64(v),
metrics::MetricUpdateVal::Value(v) => BufferedMetricUpdateValue::U64(v),
metrics::MetricUpdateVal::ValueF64(v) => BufferedMetricUpdateValue::F64(v),
},
attributes: attributes
.get()
.clone()
Expand Down
7 changes: 6 additions & 1 deletion temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,19 @@ impl Drop for Runtime {

#[pymethods]
impl RuntimeRef {
fn retrieve_buffered_metrics<'p>(&self, py: Python<'p>) -> Vec<BufferedMetricUpdate> {
fn retrieve_buffered_metrics<'p>(
&self,
py: Python<'p>,
durations_as_seconds: bool,
) -> Vec<BufferedMetricUpdate> {
convert_metric_events(
py,
self.runtime
.metrics_call_buffer
.as_ref()
.expect("Attempting to retrieve buffered metrics without buffer")
.retrieve(),
durations_as_seconds,
)
}

Expand Down
Loading
Loading