Skip to content

Commit 42cc51a

Browse files
authored
💥 Replace OTel Prometheus Exporter (#942)
1 parent f2e056c commit 42cc51a

File tree

27 files changed

+2594
-707
lines changed

27 files changed

+2594
-707
lines changed

AGENTS.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ cargo integ-test # integration tests (starts ephemeral server by default)
4040
cargo test --test heavy_tests # load tests -- agents do not need to run this and should not
4141
```
4242

43+
Rust compilation can take some time. Do not interrupt builds or tests unless they are taking more
44+
than 10 minutes.
45+
4346
Additional checks:
4447

4548
```bash
@@ -63,6 +66,8 @@ Documentation can be generated with `cargo doc`.
6366
Reviewers will look for:
6467

6568
- All builds, tests, and lints passing in CI
69+
- Note that some tests cause intentional panics. That does not mean the test failed. You should
70+
only consider tests that have failed according to the harness to be a real problem.
6671
- New tests covering behavior changes
6772
- Clear and concise code following existing style (see `README.md` for error handling guidance)
6873
- Documentation updates for any public API changes

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ license-file = "LICENSE.txt"
88

99
[workspace.dependencies]
1010
derive_builder = "0.20"
11-
derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug"] }
11+
derive_more = { version = "2.0", features = ["constructor", "display", "from", "into", "debug", "try_into"] }
1212
thiserror = "2"
13-
tonic = "0.12"
14-
tonic-build = "0.12"
15-
opentelemetry = { version = "0.29", features = ["metrics"] }
13+
tonic = "0.13"
14+
tonic-build = "0.13"
15+
opentelemetry = { version = "0.30", features = ["metrics"] }
1616
prost = "0.13"
1717
prost-types = "0.13"
1818

client/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ backoff = "0.4"
2020
base64 = "0.22"
2121
derive_builder = { workspace = true }
2222
derive_more = { workspace = true }
23+
bytes = "1.10"
2324
futures-util = { version = "0.3", default-features = false }
2425
futures-retry = "0.6.0"
25-
http = "1.1.0"
26+
http = "1.1"
2627
http-body-util = "0.1"
2728
hyper = { version = "1.4.1" }
2829
hyper-util = "0.1.6"
@@ -31,7 +32,7 @@ parking_lot = "0.12"
3132
slotmap = "1.0"
3233
thiserror = { workspace = true }
3334
tokio = "1.1"
34-
tonic = { workspace = true, features = ["tls", "tls-roots"] }
35+
tonic = { workspace = true, features = ["tls-ring", "tls-native-roots"] }
3536
tower = { version = "0.5", features = ["util"] }
3637
tracing = "0.1"
3738
url = "2.2"

client/src/lib.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ use temporal_sdk_core_protos::{
7272
};
7373
use tonic::{
7474
Code,
75-
body::BoxBody,
75+
body::Body,
7676
client::GrpcService,
7777
codegen::InterceptedService,
7878
metadata::{MetadataKey, MetadataMap, MetadataValue},
@@ -595,7 +595,7 @@ fn get_decode_max_size() -> usize {
595595
impl<T> TemporalServiceClient<T>
596596
where
597597
T: Clone,
598-
T: GrpcService<BoxBody> + Send + Clone + 'static,
598+
T: GrpcService<Body> + Send + Clone + 'static,
599599
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
600600
T::Error: Into<tonic::codegen::StdError>,
601601
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
@@ -1175,13 +1175,13 @@ impl From<common::v1::Priority> for Priority {
11751175
impl<T> WorkflowClientTrait for T
11761176
where
11771177
T: RawClientLike + NamespacedClient + Clone + Send + Sync + 'static,
1178-
<Self as RawClientLike>::SvcType: GrpcService<BoxBody> + Send + Clone + 'static,
1179-
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody:
1178+
<Self as RawClientLike>::SvcType: GrpcService<Body> + Send + Clone + 'static,
1179+
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody:
11801180
tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
1181-
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Error:
1181+
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Error:
11821182
Into<tonic::codegen::StdError>,
1183-
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Future: Send,
1184-
<<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody
1183+
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Future: Send,
1184+
<<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody
11851185
as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
11861186
{
11871187
async fn start_workflow(
@@ -1673,6 +1673,15 @@ impl<T> RequestExt for tonic::Request<T> {
16731673
}
16741674
}
16751675

1676+
macro_rules! dbg_panic {
1677+
($($arg:tt)*) => {
1678+
use tracing::error;
1679+
error!($($arg)*);
1680+
debug_assert!(false, $($arg)*);
1681+
};
1682+
}
1683+
pub(crate) use dbg_panic;
1684+
16761685
#[cfg(test)]
16771686
mod tests {
16781687
use super::*;

client/src/metrics.rs

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
use crate::{AttachMetricLabels, CallType};
1+
use crate::{AttachMetricLabels, CallType, dbg_panic};
22
use futures_util::{FutureExt, future::BoxFuture};
33
use std::{
44
sync::Arc,
55
task::{Context, Poll},
66
time::{Duration, Instant},
77
};
88
use temporal_sdk_core_api::telemetry::metrics::{
9-
CoreMeter, Counter, HistogramDuration, MetricAttributes, MetricKeyValue, MetricParameters,
10-
TemporalMeter,
9+
CoreMeter, Counter, CounterBase, HistogramDuration, HistogramDurationBase, MetricAttributable,
10+
MetricAttributes, MetricKeyValue, MetricParameters, TemporalMeter,
1111
};
12-
use tonic::{Code, body::BoxBody, transport::Channel};
12+
use tonic::{Code, body::Body, transport::Channel};
1313
use tower::Service;
1414

1515
/// The string name (which may be prefixed) for this metric
@@ -26,22 +26,24 @@ pub(crate) struct MetricsContext {
2626
meter: Arc<dyn CoreMeter>,
2727
kvs: MetricAttributes,
2828
poll_is_long: bool,
29+
instruments: Instruments,
30+
}
31+
#[derive(Clone)]
32+
struct Instruments {
33+
svc_request: Counter,
34+
svc_request_failed: Counter,
35+
long_svc_request: Counter,
36+
long_svc_request_failed: Counter,
2937

30-
svc_request: Arc<dyn Counter>,
31-
svc_request_failed: Arc<dyn Counter>,
32-
long_svc_request: Arc<dyn Counter>,
33-
long_svc_request_failed: Arc<dyn Counter>,
34-
35-
svc_request_latency: Arc<dyn HistogramDuration>,
36-
long_svc_request_latency: Arc<dyn HistogramDuration>,
38+
svc_request_latency: HistogramDuration,
39+
long_svc_request_latency: HistogramDuration,
3740
}
3841

3942
impl MetricsContext {
4043
pub(crate) fn new(tm: TemporalMeter) -> Self {
4144
let meter = tm.inner;
42-
Self {
43-
kvs: meter.new_attributes(tm.default_attribs),
44-
poll_is_long: false,
45+
let kvs = meter.new_attributes(tm.default_attribs);
46+
let instruments = Instruments {
4547
svc_request: meter.counter(MetricParameters {
4648
name: "request".into(),
4749
description: "Count of client request successes by rpc name".into(),
@@ -72,6 +74,11 @@ impl MetricsContext {
7274
unit: "duration".into(),
7375
description: "Histogram of client long-poll request latencies".into(),
7476
}),
77+
};
78+
Self {
79+
kvs,
80+
poll_is_long: false,
81+
instruments,
7582
meter,
7683
}
7784
}
@@ -81,6 +88,33 @@ impl MetricsContext {
8188
self.kvs = self
8289
.meter
8390
.extend_attributes(self.kvs.clone(), new_kvs.into());
91+
92+
let _ = self
93+
.instruments
94+
.svc_request
95+
.with_attributes(&self.kvs)
96+
.and_then(|v| {
97+
self.instruments.svc_request = v;
98+
self.instruments.long_svc_request.with_attributes(&self.kvs)
99+
})
100+
.and_then(|v| {
101+
self.instruments.long_svc_request = v;
102+
self.instruments
103+
.svc_request_latency
104+
.with_attributes(&self.kvs)
105+
})
106+
.and_then(|v| {
107+
self.instruments.svc_request_latency = v;
108+
self.instruments
109+
.long_svc_request_latency
110+
.with_attributes(&self.kvs)
111+
})
112+
.map(|v| {
113+
self.instruments.long_svc_request_latency = v;
114+
})
115+
.inspect_err(|e| {
116+
dbg_panic!("Failed to extend client metrics attributes: {:?}", e);
117+
});
84118
}
85119

86120
pub(crate) fn set_is_long_poll(&mut self) {
@@ -90,9 +124,9 @@ impl MetricsContext {
90124
/// A request to the temporal service was made
91125
pub(crate) fn svc_request(&self) {
92126
if self.poll_is_long {
93-
self.long_svc_request.add(1, &self.kvs);
127+
self.instruments.long_svc_request.adds(1);
94128
} else {
95-
self.svc_request.add(1, &self.kvs);
129+
self.instruments.svc_request.adds(1);
96130
}
97131
}
98132

@@ -108,18 +142,18 @@ impl MetricsContext {
108142
&self.kvs
109143
};
110144
if self.poll_is_long {
111-
self.long_svc_request_failed.add(1, kvs);
145+
self.instruments.long_svc_request_failed.add(1, kvs);
112146
} else {
113-
self.svc_request_failed.add(1, kvs);
147+
self.instruments.svc_request_failed.add(1, kvs);
114148
}
115149
}
116150

117151
/// Record service request latency
118152
pub(crate) fn record_svc_req_latency(&self, dur: Duration) {
119153
if self.poll_is_long {
120-
self.long_svc_request_latency.record(dur, &self.kvs);
154+
self.instruments.long_svc_request_latency.records(dur);
121155
} else {
122-
self.svc_request_latency.record(dur, &self.kvs);
156+
self.instruments.svc_request_latency.records(dur);
123157
}
124158
}
125159
}
@@ -177,16 +211,16 @@ pub struct GrpcMetricSvc {
177211
pub(crate) disable_errcode_label: bool,
178212
}
179213

180-
impl Service<http::Request<BoxBody>> for GrpcMetricSvc {
181-
type Response = http::Response<BoxBody>;
214+
impl Service<http::Request<Body>> for GrpcMetricSvc {
215+
type Response = http::Response<Body>;
182216
type Error = tonic::transport::Error;
183217
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
184218

185219
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
186220
self.inner.poll_ready(cx).map_err(Into::into)
187221
}
188222

189-
fn call(&mut self, mut req: http::Request<BoxBody>) -> Self::Future {
223+
fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
190224
let metrics = self
191225
.metrics
192226
.clone()

client/src/raw.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use temporal_sdk_core_protos::{
2424
};
2525
use tonic::{
2626
Request, Response, Status,
27-
body::BoxBody,
27+
body::Body,
2828
client::GrpcService,
2929
metadata::{AsciiMetadataValue, KeyAndValueRef},
3030
};
@@ -166,7 +166,7 @@ where
166166
impl<T> RawClientLike for TemporalServiceClient<T>
167167
where
168168
T: Send + Sync + Clone + 'static,
169-
T: GrpcService<BoxBody> + Send + Clone + 'static,
169+
T: GrpcService<Body> + Send + Clone + 'static,
170170
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
171171
T::Error: Into<tonic::codegen::StdError>,
172172
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
@@ -221,7 +221,7 @@ where
221221
impl<T> RawClientLike for ConfiguredClient<TemporalServiceClient<T>>
222222
where
223223
T: Send + Sync + Clone + 'static,
224-
T: GrpcService<BoxBody> + Send + Clone + 'static,
224+
T: GrpcService<Body> + Send + Clone + 'static,
225225
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
226226
T::Error: Into<tonic::codegen::StdError>,
227227
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
@@ -373,7 +373,7 @@ pub(super) struct IsUserLongPoll;
373373
impl<RC, T> WorkflowService for RC
374374
where
375375
RC: RawClientLike<SvcType = T>,
376-
T: GrpcService<BoxBody> + Send + Clone + 'static,
376+
T: GrpcService<Body> + Send + Clone + 'static,
377377
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
378378
T::Error: Into<tonic::codegen::StdError>,
379379
T::Future: Send,
@@ -383,7 +383,7 @@ where
383383
impl<RC, T> OperatorService for RC
384384
where
385385
RC: RawClientLike<SvcType = T>,
386-
T: GrpcService<BoxBody> + Send + Clone + 'static,
386+
T: GrpcService<Body> + Send + Clone + 'static,
387387
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
388388
T::Error: Into<tonic::codegen::StdError>,
389389
T::Future: Send,
@@ -393,7 +393,7 @@ where
393393
impl<RC, T> CloudService for RC
394394
where
395395
RC: RawClientLike<SvcType = T>,
396-
T: GrpcService<BoxBody> + Send + Clone + 'static,
396+
T: GrpcService<Body> + Send + Clone + 'static,
397397
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
398398
T::Error: Into<tonic::codegen::StdError>,
399399
T::Future: Send,
@@ -403,7 +403,7 @@ where
403403
impl<RC, T> TestService for RC
404404
where
405405
RC: RawClientLike<SvcType = T>,
406-
T: GrpcService<BoxBody> + Send + Clone + 'static,
406+
T: GrpcService<Body> + Send + Clone + 'static,
407407
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
408408
T::Error: Into<tonic::codegen::StdError>,
409409
T::Future: Send,
@@ -413,7 +413,7 @@ where
413413
impl<RC, T> HealthService for RC
414414
where
415415
RC: RawClientLike<SvcType = T>,
416-
T: GrpcService<BoxBody> + Send + Clone + 'static,
416+
T: GrpcService<Body> + Send + Clone + 'static,
417417
T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
418418
T::Error: Into<tonic::codegen::StdError>,
419419
T::Future: Send,
@@ -483,13 +483,13 @@ macro_rules! proxier {
483483
pub trait $trait_name: RawClientLike
484484
where
485485
// Yo this is wild
486-
<Self as RawClientLike>::SvcType: GrpcService<BoxBody> + Send + Clone + 'static,
487-
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody:
486+
<Self as RawClientLike>::SvcType: GrpcService<Body> + Send + Clone + 'static,
487+
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody:
488488
tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
489-
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Error:
489+
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Error:
490490
Into<tonic::codegen::StdError>,
491-
<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::Future: Send,
492-
<<<Self as RawClientLike>::SvcType as GrpcService<BoxBody>>::ResponseBody
491+
<<Self as RawClientLike>::SvcType as GrpcService<Body>>::Future: Send,
492+
<<<Self as RawClientLike>::SvcType as GrpcService<Body>>::ResponseBody
493493
as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
494494
{
495495
$(

core-api/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@ categories = ["development-tools"]
1414

1515
[features]
1616
otel_impls = ["dep:opentelemetry"]
17-
envconfig = ["dep:toml", "dep:serde", "dep:dirs", "dep:anyhow"]
17+
envconfig = ["dep:toml", "dep:serde", "dep:dirs"]
1818

1919
[dependencies]
20-
anyhow = { version = "1.0", optional = true }
2120
async-trait = "0.1"
2221
dirs = { version = "5.0", optional = true }
2322
derive_builder = { workspace = true }
@@ -29,6 +28,7 @@ serde_json = "1.0"
2928
thiserror = { workspace = true }
3029
toml = { version = "0.8", optional = true }
3130
tonic = { workspace = true }
31+
tracing = "0.1"
3232
tracing-core = "0.1"
3333
url = "2.3"
3434

core-api/src/envconfig.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ pub enum ConfigError {
6969
InvalidConfig(String),
7070

7171
#[error("Configuration loading error: {0}")]
72-
LoadError(anyhow::Error),
72+
LoadError(Box<dyn std::error::Error>),
7373
}
7474

7575
impl From<std::str::Utf8Error> for ConfigError {

core-api/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,3 +138,12 @@ pub trait Worker: Send + Sync {
138138
/// functions have returned `ShutDown` errors.
139139
async fn finalize_shutdown(self);
140140
}
141+
142+
macro_rules! dbg_panic {
143+
($($arg:tt)*) => {
144+
use tracing::error;
145+
error!($($arg)*);
146+
debug_assert!(false, $($arg)*);
147+
};
148+
}
149+
pub(crate) use dbg_panic;

0 commit comments

Comments
 (0)