Skip to content

Commit 8da219d

Browse files
authored
Update Otel/Tonic (#698)
And other deps
1 parent c55ba57 commit 8da219d

File tree

18 files changed

+99
-88
lines changed

18 files changed

+99
-88
lines changed

Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ license-file = "LICENSE.txt"
1010
derive_builder = "0.20"
1111
derive_more = { version = "0.99", default-features = false, features = ["constructor", "display", "from", "into"] }
1212
once_cell = "1.16"
13-
tonic = "0.9"
14-
tonic-build = "0.9"
15-
opentelemetry = "0.21"
13+
tonic = "0.11"
14+
tonic-build = "0.11"
15+
opentelemetry = "0.22"
16+
prost = "0.12"
17+
prost-types = "0.12"

client/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ http = "0.2"
2525
once_cell = { workspace = true }
2626
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
2727
parking_lot = "0.12"
28-
prost-types = "0.11"
28+
prost-types = { workspace = true }
2929
slotmap = "1.0"
3030
thiserror = "1.0"
3131
tokio = "1.1"
@@ -43,4 +43,4 @@ path = "../core-api"
4343

4444
[dev-dependencies]
4545
assert_matches = "1"
46-
mockall = "0.11"
46+
mockall = "0.12"

core-api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ async-trait = "0.1"
2020
derive_builder = { workspace = true }
2121
derive_more = { workspace = true }
2222
opentelemetry = { workspace = true, optional = true }
23-
prost-types = "0.11"
23+
prost-types = { workspace = true }
2424
serde = { version = "1.0", default_features = false, features = ["derive"] }
2525
serde_json = "1.0"
2626
thiserror = "1.0"

core/Cargo.toml

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ default = ["otel"]
1818
# record WF input data, we can build them a custom SDK or they can build - it adds significant extra
1919
# code size in the form of [de]serializers.
2020
save_wf_inputs = ["rmp-serde", "temporal-sdk-core-protos/serde_serialize"]
21-
otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "dep:opentelemetry-prometheus"]
21+
otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp",
22+
"dep:opentelemetry-prometheus", "dep:hyper", "dep:hyper-util", "dep:http-body-util"]
2223
tokio-console = ["console-subscriber"]
2324
ephemeral-server = ["dep:flate2", "dep:nix", "dep:reqwest", "dep:tar", "dep:zip"]
2425

2526
[dependencies]
2627
anyhow = "1.0"
27-
arc-swap = "1.3"
2828
async-trait = "0.1"
2929
base64 = "0.21"
30-
console-subscriber = { version = "0.1", optional = true }
30+
console-subscriber = { version = "0.2", optional = true }
3131
crossbeam-channel = "0.5"
3232
crossbeam-queue = "0.3"
3333
dashmap = "5.5"
@@ -39,22 +39,23 @@ flate2 = { version = "1.0", optional = true }
3939
futures = "0.3"
4040
futures-util = "0.3"
4141
governor = "0.6"
42-
http = "0.2"
43-
hyper = "0.14"
42+
http-body-util = { version = "0.1", optional = true }
43+
hyper = { version = "1.2", optional = true }
44+
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"], optional = true }
4445
itertools = "0.12"
45-
lru = "0.11"
46-
mockall = "0.11"
46+
lru = "0.12"
47+
mockall = "0.12"
4748
nix = { version = "0.28", optional = true, features = ["process", "signal"] }
4849
once_cell = { workspace = true }
4950
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
50-
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio", "metrics"], optional = true }
51-
opentelemetry-otlp = { version = "0.14", features = ["tokio", "metrics"], optional = true }
52-
opentelemetry-prometheus = { version = "0.14", optional = true }
51+
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio", "metrics"], optional = true }
52+
opentelemetry-otlp = { version = "0.15", features = ["tokio", "metrics"], optional = true }
53+
opentelemetry-prometheus = { version = "0.15", optional = true }
5354
parking_lot = { version = "0.12", features = ["send_guard"] }
5455
pin-project = "1.0"
5556
prometheus = "0.13"
56-
prost = "0.11"
57-
prost-types = { version = "0.4", package = "prost-wkt-types" }
57+
prost = { workspace = true }
58+
prost-types = { version = "0.5", package = "prost-wkt-types" }
5859
rand = "0.8.3"
5960
reqwest = { version = "0.11", features = ["json", "stream", "rustls-tls", "tokio-rustls"], default-features = false, optional = true }
6061
ringbuf = "0.3"
@@ -70,7 +71,6 @@ tokio-util = { version = "0.7", features = ["io", "io-util"] }
7071
tokio-stream = "0.1"
7172
tonic = { workspace = true, features = ["tls", "tls-roots"] }
7273
tracing = "0.1"
73-
tracing-futures = "0.2"
7474
tracing-subscriber = { version = "0.3", features = ["parking_lot", "env-filter", "registry"] }
7575
url = "2.2"
7676
uuid = { version = "1.1", features = ["v4"] }

core/src/protosext/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ impl ValidScheduleLA {
406406
.clone()
407407
.try_into_or_none()
408408
.unwrap_or_else(|| Duration::from_secs(60));
409-
let cancellation_type = ActivityCancellationType::from_i32(v.cancellation_type)
409+
let cancellation_type = ActivityCancellationType::try_from(v.cancellation_type)
410410
.unwrap_or(ActivityCancellationType::WaitCancellationCompleted);
411411
Ok(ValidScheduleLA {
412412
seq: v.seq,

core/src/telemetry/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ mod prometheus_server;
1010

1111
#[cfg(feature = "otel")]
1212
pub use metrics::{default_buckets_for, MetricsCallBuffer};
13-
1413
#[cfg(feature = "otel")]
1514
pub use otel::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
1615

core/src/telemetry/otel.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ use opentelemetry_sdk::{
2121
data::Temporality,
2222
new_view,
2323
reader::{AggregationSelector, DefaultAggregationSelector, TemporalitySelector},
24-
Aggregation, Instrument, InstrumentKind, MeterProvider, MeterProviderBuilder,
25-
PeriodicReader, View,
24+
Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader, View,
2625
},
2726
runtime, AttributeSet, Resource,
2827
};
@@ -152,7 +151,7 @@ pub fn build_otlp_metric_exporter(
152151
.with_interval(opts.metric_periodicity)
153152
.build();
154153
let mp = augment_meter_provider_with_defaults(
155-
MeterProvider::builder().with_reader(reader),
154+
MeterProviderBuilder::default().with_reader(reader),
156155
&opts.global_tags,
157156
)?
158157
.build();
@@ -167,16 +166,18 @@ pub struct StartedPromServer {
167166

168167
/// Builds and runs a prometheus endpoint which can be scraped by prom instances for metrics export.
169168
/// Returns the meter that can be used as a [CoreMeter].
169+
///
170+
/// Requires a Tokio runtime to exist, and will block briefly while binding the server endpoint.
170171
pub fn start_prometheus_metric_exporter(
171172
opts: PrometheusExporterOptions,
172173
) -> Result<StartedPromServer, anyhow::Error> {
173174
let (srv, exporter) = PromServer::new(&opts, SDKAggSelector::default())?;
174175
let meter_provider = augment_meter_provider_with_defaults(
175-
MeterProvider::builder().with_reader(exporter),
176+
MeterProviderBuilder::default().with_reader(exporter),
176177
&opts.global_tags,
177178
)?
178179
.build();
179-
let bound_addr = srv.bound_addr();
180+
let bound_addr = srv.bound_addr()?;
180181
let handle = tokio::spawn(async move { srv.run().await });
181182
Ok(StartedPromServer {
182183
meter: Arc::new(CoreOtelMeter(meter_provider.meter(TELEM_SERVICE_NAME))),

core/src/telemetry/prometheus_server.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
use hyper::{
2-
header::CONTENT_TYPE,
3-
server::conn::AddrIncoming,
4-
service::{make_service_fn, service_fn},
5-
Body, Method, Request, Response, Server,
1+
use http_body_util::Full;
2+
use hyper::{body::Bytes, header::CONTENT_TYPE, service::service_fn, Method, Request, Response};
3+
use hyper_util::{
4+
rt::{TokioExecutor, TokioIo},
5+
server::conn::auto,
66
};
77
use opentelemetry_prometheus::PrometheusExporter;
88
use opentelemetry_sdk::metrics::reader::AggregationSelector;
99
use prometheus::{Encoder, Registry, TextEncoder};
10-
use std::{convert::Infallible, net::SocketAddr};
10+
use std::net::{SocketAddr, TcpListener};
1111
use temporal_sdk_core_api::telemetry::PrometheusExporterOptions;
12+
use tokio::io;
1213

1314
/// Exposes prometheus metrics for scraping
1415
pub(super) struct PromServer {
15-
bound_addr: AddrIncoming,
16+
listener: TcpListener,
1617
registry: Registry,
1718
}
1819

@@ -36,37 +37,49 @@ impl PromServer {
3637
} else {
3738
exporter
3839
};
39-
let bound_addr = AddrIncoming::bind(&opts.socket_addr)?;
4040
Ok((
4141
Self {
42-
bound_addr,
42+
listener: TcpListener::bind(opts.socket_addr)?,
4343
registry,
4444
},
4545
exporter.build()?,
4646
))
4747
}
4848

49-
pub async fn run(self) -> hyper::Result<()> {
49+
pub async fn run(self) -> Result<(), anyhow::Error> {
5050
// Spin up hyper server to serve metrics for scraping. We use hyper since we already depend
5151
// on it via Tonic.
52-
let svc = make_service_fn(move |_conn| {
52+
self.listener.set_nonblocking(true)?;
53+
let listener = tokio::net::TcpListener::from_std(self.listener)?;
54+
loop {
55+
let (stream, _) = listener.accept().await?;
56+
let io = TokioIo::new(stream);
5357
let regclone = self.registry.clone();
54-
async move { Ok::<_, Infallible>(service_fn(move |req| metrics_req(req, regclone.clone()))) }
55-
});
56-
let server = Server::builder(self.bound_addr).serve(svc);
57-
server.await
58+
tokio::task::spawn(async move {
59+
let server = auto::Builder::new(TokioExecutor::new());
60+
if let Err(e) = server
61+
.serve_connection(
62+
io,
63+
service_fn(move |req| metrics_req(req, regclone.clone())),
64+
)
65+
.await
66+
{
67+
warn!("Error serving metrics connection: {:?}", e);
68+
}
69+
});
70+
}
5871
}
5972

60-
pub fn bound_addr(&self) -> SocketAddr {
61-
self.bound_addr.local_addr()
73+
pub fn bound_addr(&self) -> io::Result<SocketAddr> {
74+
self.listener.local_addr()
6275
}
6376
}
6477

6578
/// Serves prometheus metrics in the expected format for scraping
6679
async fn metrics_req(
67-
req: Request<Body>,
80+
req: Request<hyper::body::Incoming>,
6881
registry: Registry,
69-
) -> Result<Response<Body>, hyper::Error> {
82+
) -> Result<Response<Full<Bytes>>, hyper::Error> {
7083
let response = match (req.method(), req.uri().path()) {
7184
(&Method::GET, "/metrics") => {
7285
let mut buffer = vec![];
@@ -77,12 +90,12 @@ async fn metrics_req(
7790
Response::builder()
7891
.status(200)
7992
.header(CONTENT_TYPE, encoder.format_type())
80-
.body(Body::from(buffer))
93+
.body(buffer.into())
8194
.unwrap()
8295
}
8396
_ => Response::builder()
8497
.status(404)
85-
.body(Body::empty())
98+
.body(vec![].into())
8699
.expect("Can't fail to construct empty resp"),
87100
};
88101
Ok(response)

core/src/worker/activities/local_activities.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl LocalActivityExecutionResult {
9797
..
9898
}),
9999
..
100-
}) => TimeoutType::from_i32(*timeout_type),
100+
}) => TimeoutType::try_from(*timeout_type).ok(),
101101
_ => None,
102102
}
103103
}

core/src/worker/workflow/machines/activity_state_machine.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ impl ActivityMachine {
117117
let mut s = Self::from_parts(
118118
Created {}.into(),
119119
SharedState {
120-
cancellation_type: ActivityCancellationType::from_i32(attrs.cancellation_type)
121-
.unwrap(),
120+
cancellation_type: ActivityCancellationType::try_from(attrs.cancellation_type)
121+
.unwrap_or(ActivityCancellationType::TryCancel),
122122
attrs,
123123
internal_flags,
124124
scheduled_event_id: 0,

0 commit comments

Comments
 (0)