Skip to content

Commit 89945c2

Browse files
authored
Buffered metric support (#240)
Fixes #176
1 parent 7f746cf commit 89945c2

File tree

12 files changed

+651
-83
lines changed

12 files changed

+651
-83
lines changed

temporalio/ext/src/client.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,15 @@ impl Client {
176176
.await?;
177177
Ok(core)
178178
},
179-
move |_, result: Result<CoreClient, ClientInitError>| match result {
180-
Ok(core) => callback.push(Client {
181-
core,
182-
runtime_handle,
183-
}),
184-
Err(err) => callback.push(new_error!("Failed client connect: {}", err)),
179+
move |ruby, result: Result<CoreClient, ClientInitError>| match result {
180+
Ok(core) => callback.push(
181+
&ruby,
182+
Client {
183+
core,
184+
runtime_handle,
185+
},
186+
),
187+
Err(err) => callback.push(&ruby, new_error!("Failed client connect: {}", err)),
185188
},
186189
);
187190
Ok(())
@@ -325,12 +328,12 @@ where
325328
};
326329
res.map(|msg| msg.get_ref().encode_to_vec())
327330
},
328-
move |_, result| {
331+
move |ruby, result| {
329332
match result {
330333
// TODO(cretz): Any reasonable way to prevent byte copy that is just going to get decoded into proto
331334
// object?
332-
Ok(val) => callback.push(RString::from_slice(&val)),
333-
Err(status) => callback.push(RpcFailure { status }),
335+
Ok(val) => callback.push(&ruby, RString::from_slice(&val)),
336+
Err(status) => callback.push(&ruby, RpcFailure { status }),
334337
}
335338
},
336339
);

temporalio/ext/src/metric.rs

Lines changed: 229 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
1-
use std::{sync::Arc, time::Duration};
1+
use std::{any::Any, sync::Arc, time::Duration};
22

33
use magnus::{
4-
class, function, method,
4+
class, function,
5+
gc::register_mark_object,
6+
method,
57
prelude::*,
68
r_hash::ForEach,
7-
value::{IntoId, Qfalse, Qtrue},
8-
DataTypeFunctions, Error, Float, Integer, RHash, RString, Ruby, Symbol, TryConvert, TypedData,
9-
Value,
9+
value::{IntoId, Lazy, Qfalse, Qtrue},
10+
DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol,
11+
Symbol, TryConvert, TypedData, Value,
12+
};
13+
use temporal_sdk_core_api::telemetry::metrics::{
14+
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
1015
};
11-
use temporal_sdk_core_api::telemetry::metrics;
1216

13-
use crate::{error, id, runtime::Runtime, ROOT_MOD};
17+
use crate::{error, id, runtime::Runtime, util::SendSyncBoxValue, ROOT_MOD};
1418

1519
pub fn init(ruby: &Ruby) -> Result<(), Error> {
1620
let root_mod = ruby.get_inner(&ROOT_MOD);
@@ -268,3 +272,221 @@ fn metric_key_value(k: Value, v: Value) -> Result<metrics::MetricKeyValue, Error
268272
};
269273
Ok(metrics::MetricKeyValue::new(key, val))
270274
}
275+
276+
#[derive(Clone, Debug)]
277+
pub struct BufferedMetricRef {
278+
value: Arc<SendSyncBoxValue<Value>>,
279+
}
280+
281+
impl BufferInstrumentRef for BufferedMetricRef {}
282+
283+
#[derive(Debug)]
284+
struct BufferedMetricAttributes {
285+
value: SendSyncBoxValue<RHash>,
286+
}
287+
288+
impl CustomMetricAttributes for BufferedMetricAttributes {
289+
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
290+
self as Arc<dyn Any + Send + Sync>
291+
}
292+
}
293+
294+
static METRIC_BUFFER_UPDATE: Lazy<RClass> = Lazy::new(|ruby| {
295+
let cls = ruby
296+
.class_object()
297+
.const_get::<_, RModule>("Temporalio")
298+
.unwrap()
299+
.const_get::<_, RClass>("Runtime")
300+
.unwrap()
301+
.const_get::<_, RClass>("MetricBuffer")
302+
.unwrap()
303+
.const_get("Update")
304+
.unwrap();
305+
// Make sure class is not GC'd
306+
register_mark_object(cls);
307+
cls
308+
});
309+
310+
static METRIC_BUFFER_METRIC: Lazy<RClass> = Lazy::new(|ruby| {
311+
let cls = ruby
312+
.class_object()
313+
.const_get::<_, RModule>("Temporalio")
314+
.unwrap()
315+
.const_get::<_, RClass>("Runtime")
316+
.unwrap()
317+
.const_get::<_, RClass>("MetricBuffer")
318+
.unwrap()
319+
.const_get("Metric")
320+
.unwrap();
321+
// Make sure class is not GC'd
322+
register_mark_object(cls);
323+
cls
324+
});
325+
326+
static METRIC_KIND_COUNTER: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("counter"));
327+
static METRIC_KIND_GAUGE: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("gauge"));
328+
static METRIC_KIND_HISTOGRAM: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("histogram"));
329+
330+
pub fn convert_metric_events(
331+
ruby: &Ruby,
332+
events: Vec<MetricEvent<BufferedMetricRef>>,
333+
durations_as_seconds: bool,
334+
) -> Result<Vec<Value>, Error> {
335+
let temp: Result<Vec<Option<Value>>, Error> = events
336+
.into_iter()
337+
.map(|e| convert_metric_event(ruby, e, durations_as_seconds))
338+
.collect();
339+
Ok(temp?.into_iter().flatten().collect())
340+
}
341+
342+
fn convert_metric_event(
343+
ruby: &Ruby,
344+
event: MetricEvent<BufferedMetricRef>,
345+
durations_as_seconds: bool,
346+
) -> Result<Option<Value>, Error> {
347+
match event {
348+
// Create the metric and put it on the lazy ref
349+
MetricEvent::Create {
350+
params,
351+
populate_into,
352+
kind,
353+
} => {
354+
let cls = ruby.get_inner(&METRIC_BUFFER_METRIC);
355+
let val: Value = cls.funcall(
356+
"new",
357+
(
358+
// Name
359+
params.name.to_string(),
360+
// Description
361+
Some(params.description)
362+
.filter(|s| !s.is_empty())
363+
.map(|s| s.to_string()),
364+
// Unit
365+
if matches!(kind, metrics::MetricKind::HistogramDuration)
366+
&& params.unit == "duration"
367+
{
368+
if durations_as_seconds {
369+
Some("s".to_owned())
370+
} else {
371+
Some("ms".to_owned())
372+
}
373+
} else if params.unit.is_empty() {
374+
None
375+
} else {
376+
Some(params.unit.to_string())
377+
},
378+
// Kind
379+
match kind {
380+
metrics::MetricKind::Counter => ruby.get_inner(&METRIC_KIND_COUNTER),
381+
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => {
382+
ruby.get_inner(&METRIC_KIND_GAUGE)
383+
}
384+
metrics::MetricKind::Histogram
385+
| metrics::MetricKind::HistogramF64
386+
| metrics::MetricKind::HistogramDuration => {
387+
ruby.get_inner(&METRIC_KIND_HISTOGRAM)
388+
}
389+
},
390+
),
391+
)?;
392+
// Put on lazy ref
393+
populate_into
394+
.set(Arc::new(BufferedMetricRef {
395+
value: Arc::new(SendSyncBoxValue::new(val)),
396+
}))
397+
.map_err(|_| error!("Failed setting metric ref"))?;
398+
Ok(None)
399+
}
400+
// Create the attributes and put it on the lazy ref
401+
MetricEvent::CreateAttributes {
402+
populate_into,
403+
append_from,
404+
attributes,
405+
} => {
406+
// Create a hash (from existing or new)
407+
let hash: RHash = match append_from {
408+
Some(existing) => {
409+
let attrs = existing
410+
.get()
411+
.clone()
412+
.as_any()
413+
.downcast::<BufferedMetricAttributes>()
414+
.map_err(|_| {
415+
error!("Unable to downcast to expected buffered metric attributes")
416+
})?
417+
.value
418+
.value(ruby);
419+
attrs.funcall("dup", ())?
420+
}
421+
None => ruby.hash_new_capa(attributes.len()),
422+
};
423+
// Add attributes
424+
for kv in attributes.into_iter() {
425+
match kv.value {
426+
metrics::MetricValue::String(v) => hash.aset(kv.key, v)?,
427+
metrics::MetricValue::Int(v) => hash.aset(kv.key, v)?,
428+
metrics::MetricValue::Float(v) => hash.aset(kv.key, v)?,
429+
metrics::MetricValue::Bool(v) => hash.aset(kv.key, v)?,
430+
};
431+
}
432+
hash.freeze();
433+
// Put on lazy ref
434+
populate_into
435+
.set(Arc::new(BufferedMetricAttributes {
436+
value: SendSyncBoxValue::new(hash),
437+
}))
438+
.map_err(|_| error!("Failed setting metric attrs"))?;
439+
Ok(None)
440+
}
441+
// Convert to Ruby metric update
442+
MetricEvent::Update {
443+
instrument,
444+
attributes,
445+
update,
446+
} => {
447+
let cls = ruby.get_inner(&METRIC_BUFFER_UPDATE);
448+
Ok(Some(
449+
cls.funcall(
450+
"new",
451+
(
452+
// Metric
453+
instrument.get().clone().value.clone().value(ruby),
454+
// Value
455+
match update {
456+
metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => {
457+
ruby.into_value(v.as_secs_f64())
458+
}
459+
metrics::MetricUpdateVal::Duration(v) => {
460+
// As of this writing, https://github.com/matsadler/magnus/pull/136 not released, so we will do
461+
// the logic ourselves
462+
let val = v.as_millis();
463+
if val <= u64::MAX as u128 {
464+
ruby.into_value(val as u64)
465+
} else {
466+
ruby.module_kernel()
467+
.funcall("Integer", (val.to_string(),))
468+
.unwrap()
469+
}
470+
}
471+
metrics::MetricUpdateVal::Delta(v) => ruby.into_value(v),
472+
metrics::MetricUpdateVal::DeltaF64(v) => ruby.into_value(v),
473+
metrics::MetricUpdateVal::Value(v) => ruby.into_value(v),
474+
metrics::MetricUpdateVal::ValueF64(v) => ruby.into_value(v),
475+
},
476+
// Attributes
477+
attributes
478+
.get()
479+
.clone()
480+
.as_any()
481+
.downcast::<BufferedMetricAttributes>()
482+
.map_err(|_| {
483+
error!("Unable to downcast to expected buffered metric attributes")
484+
})?
485+
.value
486+
.value(ruby),
487+
),
488+
)?,
489+
))
490+
}
491+
}
492+
}

temporalio/ext/src/runtime.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
use super::{error, id, ROOT_MOD};
2+
use crate::metric::{convert_metric_events, BufferedMetricRef};
23
use crate::util::{without_gvl, Struct};
34
use magnus::{
4-
class, function, method, prelude::*, DataTypeFunctions, Error, Ruby, TypedData, Value,
5+
class, function, method, prelude::*, DataTypeFunctions, Error, RArray, Ruby, TypedData, Value,
56
};
67
use std::collections::HashMap;
78
use std::net::SocketAddr;
89
use std::str::FromStr;
910
use std::sync::mpsc::{channel, Receiver, Sender};
1011
use std::time::Duration;
1112
use std::{future::Future, sync::Arc};
12-
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
13+
use temporal_sdk_core::telemetry::{
14+
build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer,
15+
};
1316
use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder};
1417
use temporal_sdk_core_api::telemetry::{
15-
Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol,
16-
PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
18+
metrics::MetricCallBufferer, Logger, MetricTemporality, OtelCollectorOptionsBuilder,
19+
OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
1720
};
1821
use tracing::error as log_error;
1922
use url::Url;
@@ -24,6 +27,10 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
2427
.define_class("Runtime", class::object())?;
2528
class.define_singleton_method("new", function!(Runtime::new, 1))?;
2629
class.define_method("run_command_loop", method!(Runtime::run_command_loop, 0))?;
30+
class.define_method(
31+
"retrieve_buffered_metrics",
32+
method!(Runtime::retrieve_buffered_metrics, 1),
33+
)?;
2734
Ok(())
2835
}
2936

@@ -33,6 +40,7 @@ pub struct Runtime {
3340
/// Separate cloneable handle that can be referenced in other Rust objects.
3441
pub(crate) handle: RuntimeHandle,
3542
async_command_rx: Receiver<AsyncCommand>,
43+
metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
3644
}
3745

3846
#[derive(Clone)]
@@ -94,9 +102,10 @@ impl Runtime {
94102
.map_err(|err| error!("Failed initializing telemetry: {}", err))?;
95103

96104
// Create metrics (created after Core runtime since it needs Tokio handle)
105+
let mut metrics_call_buffer = None;
97106
if let Some(metrics) = telemetry.child(id!("metrics"))? {
98107
let _guard = core.tokio_handle().enter();
99-
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.child(id!("buffered_with_size"))?) {
108+
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.member::<Option<usize>>(id!("buffered_with_size"))?) {
100109
// Build OTel
101110
(Some(opentelemetry), None, None) => {
102111
let mut opts_build = OtelCollectorOptionsBuilder::default();
@@ -148,8 +157,11 @@ impl Runtime {
148157
|err| error!("Failed building starting Prometheus exporter: {}", err),
149158
)?.meter);
150159
},
151-
// TODO(cretz): Metric buffering
152-
(None, None, Some(_buffer_size)) => return Err(error!("Metric buffering not yet supported")),
160+
(None, None, Some(buffer_size)) => {
161+
let buffer = Arc::new(MetricsCallBuffer::new(buffer_size));
162+
core.telemetry_mut().attach_late_init_metrics(buffer.clone());
163+
metrics_call_buffer = Some(buffer);
164+
},
153165
_ => return Err(error!("One and only one of opentelemetry, prometheus, or buffered_with_size must be set"))
154166
};
155167
}
@@ -163,6 +175,7 @@ impl Runtime {
163175
async_command_tx,
164176
},
165177
async_command_rx,
178+
metrics_call_buffer,
166179
})
167180
}
168181

@@ -193,6 +206,16 @@ impl Runtime {
193206
}
194207
}
195208
}
209+
210+
pub fn retrieve_buffered_metrics(&self, durations_as_seconds: bool) -> Result<RArray, Error> {
211+
let ruby = Ruby::get().expect("Not in Ruby thread");
212+
let buff = self
213+
.metrics_call_buffer
214+
.clone()
215+
.expect("Attempting to retrieve buffered metrics without buffer");
216+
let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?;
217+
Ok(ruby.ary_new_from_values(&updates))
218+
}
196219
}
197220

198221
impl RuntimeHandle {

0 commit comments

Comments
 (0)