Skip to content

Commit 25bc973

Browse files
authored
feat: instrument HTTP protocol detection metrics (#3722)
This change updates the DetectHttp middleware to record metrics about HTTP protocol detection. Specifically, it records the counts of results and a very coarse histogram of the time taken to detect the protocol. The inbound, outbound, and admin (via inbound) stacks are updated to record metrics against the main registry.
1 parent e7c2afd commit 25bc973

File tree

18 files changed

+429
-61
lines changed

18 files changed

+429
-61
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1725,6 +1725,7 @@ dependencies = [
17251725
"linkerd-io",
17261726
"linkerd-stack",
17271727
"linkerd-tracing",
1728+
"prometheus-client",
17281729
"thiserror 2.0.12",
17291730
"tokio",
17301731
"tokio-test",

linkerd/app/admin/src/stack.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ impl Config {
122122
.push_on_service(http::BoxResponse::layer())
123123
.arc_new_clone_http();
124124

125+
let inbound::DetectMetrics(detect_metrics) = metrics.detect.clone();
125126
let tcp = http
126127
.unlift_new()
127128
.push(http::NewServeHttp::layer({
@@ -177,9 +178,12 @@ impl Config {
177178
)
178179
.arc_new_tcp()
179180
.lift_new_with_target()
180-
.push(http::NewDetect::layer(svc::stack::CloneParam::from(
181-
http::DetectParams { read_timeout: DETECT_TIMEOUT }
182-
)))
181+
.push(http::NewDetect::layer(move |tcp: &Tcp| {
182+
http::DetectParams {
183+
read_timeout: DETECT_TIMEOUT,
184+
metrics: detect_metrics.metrics(tcp.policy.server_label())
185+
}
186+
}))
183187
.push(transport::metrics::NewServer::layer(metrics.proxy.transport))
184188
.push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
185189
Tcp {

linkerd/app/inbound/src/accept.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,11 @@ mod tests {
182182
}
183183

184184
fn inbound() -> Inbound<()> {
185-
Inbound::new(test_util::default_config(), test_util::runtime().0)
185+
Inbound::new(
186+
test_util::default_config(),
187+
test_util::runtime().0,
188+
&mut Default::default(),
189+
)
186190
}
187191

188192
fn new_panic<T>(msg: &'static str) -> svc::ArcNewTcp<T, io::DuplexStream> {

linkerd/app/inbound/src/detect.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
};
55
use linkerd_app_core::{
66
identity, io,
7-
metrics::ServerLabel,
7+
metrics::{prom, ServerLabel},
88
proxy::http,
99
svc, tls,
1010
transport::{
@@ -20,6 +20,10 @@ use tracing::info;
2020
#[cfg(test)]
2121
mod tests;
2222

23+
#[derive(Clone, Debug)]
24+
pub struct MetricsFamilies(pub HttpDetectMetrics);
25+
pub type HttpDetectMetrics = http::DetectMetricsFamilies<ServerLabel>;
26+
2327
#[derive(Clone, Debug, PartialEq, Eq)]
2428
pub(crate) struct Forward {
2529
client_addr: Remote<ClientAddr>,
@@ -61,7 +65,11 @@ type TlsIo<I> = tls::server::Io<identity::ServerIo<tls::server::DetectIo<I>>, I>
6165
impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
6266
/// Builds a stack that terminates mesh TLS and detects whether the traffic is HTTP (as hinted
6367
/// by policy).
64-
pub(crate) fn push_detect<T, I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<T, I>>
68+
pub(crate) fn push_detect<T, I, F, FSvc>(
69+
self,
70+
MetricsFamilies(metrics): MetricsFamilies,
71+
forward: F,
72+
) -> Inbound<svc::ArcNewTcp<T, I>>
6573
where
6674
T: svc::Param<OrigDstAddr> + svc::Param<Remote<ClientAddr>> + svc::Param<AllowPolicy>,
6775
T: Clone + Send + 'static,
@@ -72,14 +80,18 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
7280
FSvc::Error: Into<Error>,
7381
FSvc::Future: Send,
7482
{
75-
self.push_detect_http(forward.clone())
83+
self.push_detect_http(metrics, forward.clone())
7684
.push_detect_tls(forward)
7785
}
7886

7987
/// Builds a stack that handles HTTP detection once TLS detection has been performed. If the
8088
/// connection is determined to be HTTP, the inner stack is used; otherwise the connection is
8189
/// passed to the provided 'forward' stack.
82-
fn push_detect_http<I, F, FSvc>(self, forward: F) -> Inbound<svc::ArcNewTcp<Tls, I>>
90+
fn push_detect_http<I, F, FSvc>(
91+
self,
92+
metrics: HttpDetectMetrics,
93+
forward: F,
94+
) -> Inbound<svc::ArcNewTcp<Tls, I>>
8395
where
8496
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr,
8597
I: Debug + Send + Sync + Unpin + 'static,
@@ -153,11 +165,12 @@ impl Inbound<svc::ArcNewTcp<Http, io::BoxedIo>> {
153165
forward.into_inner(),
154166
)
155167
.lift_new_with_target()
156-
.push(http::NewDetect::layer(|Detect { timeout, .. }: &Detect| {
157-
http::DetectParams {
168+
.push(http::NewDetect::layer(
169+
move |Detect { timeout, tls }: &Detect| http::DetectParams {
158170
read_timeout: *timeout,
159-
}
160-
}))
171+
metrics: metrics.metrics(tls.policy.server_label()),
172+
},
173+
))
161174
.arc_new_tcp();
162175

163176
http.push_on_service(svc::MapTargetLayer::new(io::BoxedIo::new))
@@ -445,3 +458,13 @@ impl<T> svc::InsertParam<tls::ConditionalServerTls, T> for TlsParams {
445458
(tls, target)
446459
}
447460
}
461+
462+
// === impl MetricsFamilies ===
463+
464+
impl MetricsFamilies {
465+
pub fn register(reg: &mut prom::Registry) -> Self {
466+
Self(http::DetectMetricsFamilies::register(
467+
reg.sub_registry_with_prefix("http"),
468+
))
469+
}
470+
}

linkerd/app/inbound/src/detect/tests.rs

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ const HTTP1: &[u8] = b"GET / HTTP/1.1\r\nhost: example.com\r\n\r\n";
1313
const HTTP2: &[u8] = b"PRI * HTTP/2.0\r\n";
1414
const NOT_HTTP: &[u8] = b"foo\r\nbar\r\nblah\r\n";
1515

16+
const RESULTS_NOT_HTTP: &str = "results_total{result=\"not_http\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
17+
const RESULTS_HTTP1: &str = "results_total{result=\"http/1\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
18+
const RESULTS_HTTP2: &str = "results_total{result=\"http/2\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
19+
const RESULTS_READ_TIMEOUT: &str = "results_total{result=\"read_timeout\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
20+
const RESULTS_ERROR: &str = "results_total{result=\"error\",srv_group=\"policy.linkerd.io\",srv_kind=\"server\",srv_name=\"testsrv\",srv_port=\"1000\"}";
21+
1622
fn authzs() -> Arc<[Authorization]> {
1723
Arc::new([Authorization {
1824
authentication: Authentication::Unauthenticated,
@@ -41,6 +47,35 @@ fn allow(protocol: Protocol) -> AllowPolicy {
4147
allow
4248
}
4349

50+
macro_rules! assert_contains_metric {
51+
($registry:expr, $metric:expr, $value:expr) => {{
52+
let mut buf = String::new();
53+
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
54+
let lines = buf.split_terminator('\n').collect::<Vec<_>>();
55+
assert_eq!(
56+
lines.iter().find(|l| l.starts_with($metric)),
57+
Some(&&*format!("{} {}", $metric, $value)),
58+
"metric '{}' not found in:\n{:?}",
59+
$metric,
60+
buf
61+
);
62+
}};
63+
}
64+
65+
macro_rules! assert_not_contains_metric {
66+
($registry:expr, $pattern:expr) => {{
67+
let mut buf = String::new();
68+
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
69+
let lines = buf.split_terminator('\n').collect::<Vec<_>>();
70+
assert!(
71+
!lines.iter().any(|l| l.starts_with($pattern)),
72+
"metric '{}' found in:\n{:?}",
73+
$pattern,
74+
buf
75+
);
76+
}};
77+
}
78+
4479
#[tokio::test(flavor = "current_thread")]
4580
async fn detect_tls_opaque() {
4681
let _trace = trace::test::trace_init();
@@ -77,14 +112,21 @@ async fn detect_http_non_http() {
77112
let (ior, mut iow) = io::duplex(100);
78113
iow.write_all(NOT_HTTP).await.unwrap();
79114

115+
let mut registry = prom::Registry::default();
80116
inbound()
81117
.with_stack(new_panic("http stack must not be used"))
82-
.push_detect_http(new_ok())
118+
.push_detect_http(super::HttpDetectMetrics::register(&mut registry), new_ok())
83119
.into_inner()
84120
.new_service(target)
85121
.oneshot(ior)
86122
.await
87123
.expect("should succeed");
124+
125+
assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 1);
126+
assert_contains_metric!(&registry, RESULTS_HTTP1, 0);
127+
assert_contains_metric!(&registry, RESULTS_HTTP2, 0);
128+
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
129+
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
88130
}
89131

90132
#[tokio::test(flavor = "current_thread")]
@@ -108,14 +150,24 @@ async fn detect_http() {
108150
let (ior, mut iow) = io::duplex(100);
109151
iow.write_all(HTTP1).await.unwrap();
110152

153+
let mut registry = prom::Registry::default();
111154
inbound()
112155
.with_stack(new_ok())
113-
.push_detect_http(new_panic("tcp stack must not be used"))
156+
.push_detect_http(
157+
super::HttpDetectMetrics::register(&mut registry),
158+
new_panic("tcp stack must not be used"),
159+
)
114160
.into_inner()
115161
.new_service(target)
116162
.oneshot(ior)
117163
.await
118164
.expect("should succeed");
165+
166+
assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 0);
167+
assert_contains_metric!(&registry, RESULTS_HTTP1, 1);
168+
assert_contains_metric!(&registry, RESULTS_HTTP2, 0);
169+
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
170+
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
119171
}
120172

121173
#[tokio::test(flavor = "current_thread")]
@@ -134,14 +186,24 @@ async fn hinted_http1() {
134186
let (ior, mut iow) = io::duplex(100);
135187
iow.write_all(HTTP1).await.unwrap();
136188

189+
let mut registry = prom::Registry::default();
137190
inbound()
138191
.with_stack(new_ok())
139-
.push_detect_http(new_panic("tcp stack must not be used"))
192+
.push_detect_http(
193+
super::HttpDetectMetrics::register(&mut registry),
194+
new_panic("tcp stack must not be used"),
195+
)
140196
.into_inner()
141197
.new_service(target)
142198
.oneshot(ior)
143199
.await
144200
.expect("should succeed");
201+
202+
assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 0);
203+
assert_contains_metric!(&registry, RESULTS_HTTP1, 1);
204+
assert_contains_metric!(&registry, RESULTS_HTTP2, 0);
205+
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
206+
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
145207
}
146208

147209
#[tokio::test(flavor = "current_thread")]
@@ -160,14 +222,24 @@ async fn hinted_http1_supports_http2() {
160222
let (ior, mut iow) = io::duplex(100);
161223
iow.write_all(HTTP2).await.unwrap();
162224

225+
let mut registry = prom::Registry::default();
163226
inbound()
164227
.with_stack(new_ok())
165-
.push_detect_http(new_panic("tcp stack must not be used"))
228+
.push_detect_http(
229+
super::HttpDetectMetrics::register(&mut registry),
230+
new_panic("tcp stack must not be used"),
231+
)
166232
.into_inner()
167233
.new_service(target)
168234
.oneshot(ior)
169235
.await
170236
.expect("should succeed");
237+
238+
assert_contains_metric!(&registry, RESULTS_NOT_HTTP, 0);
239+
assert_contains_metric!(&registry, RESULTS_HTTP1, 0);
240+
assert_contains_metric!(&registry, RESULTS_HTTP2, 1);
241+
assert_contains_metric!(&registry, RESULTS_READ_TIMEOUT, 0);
242+
assert_contains_metric!(&registry, RESULTS_ERROR, 0);
171243
}
172244

173245
#[tokio::test(flavor = "current_thread")]
@@ -185,14 +257,25 @@ async fn hinted_http2() {
185257

186258
let (ior, _) = io::duplex(100);
187259

260+
let mut registry = prom::Registry::default();
188261
inbound()
189262
.with_stack(new_ok())
190-
.push_detect_http(new_panic("tcp stack must not be used"))
263+
.push_detect_http(
264+
super::HttpDetectMetrics::register(&mut registry),
265+
new_panic("tcp stack must not be used"),
266+
)
191267
.into_inner()
192268
.new_service(target)
193269
.oneshot(ior)
194270
.await
195271
.expect("should succeed");
272+
273+
// No detection is performed when HTTP/2 is hinted, so no metrics are recorded.
274+
assert_not_contains_metric!(&registry, RESULTS_NOT_HTTP);
275+
assert_not_contains_metric!(&registry, RESULTS_HTTP1);
276+
assert_not_contains_metric!(&registry, RESULTS_HTTP2);
277+
assert_not_contains_metric!(&registry, RESULTS_READ_TIMEOUT);
278+
assert_not_contains_metric!(&registry, RESULTS_ERROR);
196279
}
197280

198281
fn client_id() -> tls::ClientId {
@@ -210,7 +293,11 @@ fn orig_dst_addr() -> OrigDstAddr {
210293
}
211294

212295
fn inbound() -> Inbound<()> {
213-
Inbound::new(test_util::default_config(), test_util::runtime().0)
296+
Inbound::new(
297+
test_util::default_config(),
298+
test_util::runtime().0,
299+
&mut Default::default(),
300+
)
214301
}
215302

216303
fn new_panic<T, I: 'static>(msg: &'static str) -> svc::ArcNewTcp<T, I> {

linkerd/app/inbound/src/http/tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ fn build_server<I>(
3333
where
3434
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
3535
{
36-
Inbound::new(cfg, rt)
36+
Inbound::new(cfg, rt, &mut Default::default())
3737
.with_stack(connect)
3838
.map_stack(|cfg, _, s| {
3939
s.push_map_target(|t| Param::<Remote<ServerAddr>>::param(&t))

linkerd/app/inbound/src/lib.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,15 @@ pub mod test_util;
2020

2121
#[cfg(fuzzing)]
2222
pub use self::http::fuzz as http_fuzz;
23-
pub use self::{metrics::InboundMetrics, policy::DefaultPolicy};
23+
pub use self::{
24+
detect::MetricsFamilies as DetectMetrics, metrics::InboundMetrics, policy::DefaultPolicy,
25+
};
2426
use linkerd_app_core::{
2527
config::{ConnectConfig, ProxyConfig, QueueConfig},
2628
drain,
2729
http_tracing::SpanSink,
2830
identity, io,
31+
metrics::prom,
2932
proxy::{tap, tcp},
3033
svc,
3134
transport::{self, Remote, ServerAddr},
@@ -148,9 +151,9 @@ impl<S> Inbound<S> {
148151
}
149152

150153
impl Inbound<()> {
151-
pub fn new(config: Config, runtime: ProxyRuntime) -> Self {
154+
pub fn new(config: Config, runtime: ProxyRuntime, prom: &mut prom::Registry) -> Self {
152155
let runtime = Runtime {
153-
metrics: InboundMetrics::new(runtime.metrics),
156+
metrics: InboundMetrics::new(runtime.metrics, prom),
154157
identity: runtime.identity,
155158
tap: runtime.tap,
156159
span_sink: runtime.span_sink,
@@ -166,7 +169,11 @@ impl Inbound<()> {
166169
#[cfg(any(test, feature = "test-util"))]
167170
pub fn for_test() -> (Self, drain::Signal) {
168171
let (rt, drain) = test_util::runtime();
169-
let this = Self::new(test_util::default_config(), rt);
172+
let this = Self::new(
173+
test_util::default_config(),
174+
rt,
175+
&mut prom::Registry::default(),
176+
);
170177
(this, drain)
171178
}
172179

linkerd/app/inbound/src/metrics.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,22 @@ pub struct InboundMetrics {
2525
/// Holds metrics that are common to both inbound and outbound proxies. These metrics are
2626
/// reported separately
2727
pub proxy: Proxy,
28+
29+
pub detect: crate::detect::MetricsFamilies,
2830
}
2931

3032
impl InboundMetrics {
31-
pub(crate) fn new(proxy: Proxy) -> Self {
33+
pub(crate) fn new(proxy: Proxy, reg: &mut prom::Registry) -> Self {
34+
let detect =
35+
crate::detect::MetricsFamilies::register(reg.sub_registry_with_prefix("tcp_detect"));
36+
3237
Self {
3338
http_authz: authz::HttpAuthzMetrics::default(),
3439
http_errors: error::HttpErrorMetrics::default(),
3540
tcp_authz: authz::TcpAuthzMetrics::default(),
3641
tcp_errors: error::TcpErrorMetrics::default(),
3742
proxy,
43+
detect,
3844
}
3945
}
4046
}

0 commit comments

Comments
 (0)