Skip to content

Commit 64c608c

Browse files
authored
feat(inbound): record metrics about transport header usage (#3723)
Inbound proxies may receive meshed traffic directly on the proxy's inbound port with a transport header, informing inbound routing behavior. This change updates the inbound proxy to record metrics about the usage of transport headers, including the total number of requests with a transport header by session protocol and target port.
1 parent 25bc973 commit 64c608c

File tree

4 files changed

+217
-0
lines changed

4 files changed

+217
-0
lines changed

linkerd/app/inbound/src/direct.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ use std::fmt::Debug;
1515
use thiserror::Error;
1616
use tracing::{debug_span, info_span};
1717

18+
mod metrics;
19+
20+
pub use self::metrics::MetricsFamilies;
21+
1822
/// Creates I/O errors when a connection cannot be forwarded because no transport
1923
/// header was present.
2024
#[derive(Debug, Default)]
@@ -108,6 +112,7 @@ impl<N> Inbound<N> {
108112
{
109113
self.map_stack(|config, rt, inner| {
110114
let detect_timeout = config.proxy.detect_protocol_timeout;
115+
let metrics = rt.metrics.direct.clone();
111116

112117
let identity = rt
113118
.identity
@@ -211,6 +216,7 @@ impl<N> Inbound<N> {
211216
)
212217
.check_new_service::<(TransportHeader, ClientInfo), _>()
213218
// Use ALPN to determine whether a transport header should be read.
219+
.push(metrics::NewRecord::layer(metrics))
214220
.push(svc::ArcNewService::layer())
215221
.push(NewTransportHeaderServer::layer(detect_timeout))
216222
.check_new_service::<ClientInfo, _>()
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use super::ClientInfo;
2+
use linkerd_app_core::{
3+
metrics::prom::{self, EncodeLabelSetMut},
4+
svc, tls,
5+
transport_header::{SessionProtocol, TransportHeader},
6+
};
7+
8+
#[cfg(test)]
9+
mod tests;
10+
11+
#[derive(Clone, Debug)]
12+
pub struct NewRecord<N> {
13+
inner: N,
14+
metrics: MetricsFamilies,
15+
}
16+
17+
#[derive(Clone, Debug, Default)]
18+
pub struct MetricsFamilies {
19+
connections: prom::Family<Labels, prom::Counter>,
20+
}
21+
22+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
23+
struct Labels {
24+
header: TransportHeader,
25+
client_id: tls::ClientId,
26+
}
27+
28+
impl MetricsFamilies {
29+
pub fn register(reg: &mut prom::Registry) -> Self {
30+
let connections = prom::Family::default();
31+
reg.register(
32+
"connections",
33+
"TCP connections with transport headers",
34+
connections.clone(),
35+
);
36+
37+
Self { connections }
38+
}
39+
}
40+
41+
impl<N> NewRecord<N> {
42+
pub fn layer(metrics: MetricsFamilies) -> impl svc::layer::Layer<N, Service = Self> + Clone {
43+
svc::layer::mk(move |inner| Self {
44+
inner,
45+
metrics: metrics.clone(),
46+
})
47+
}
48+
}
49+
50+
impl<N> svc::NewService<(TransportHeader, ClientInfo)> for NewRecord<N>
51+
where
52+
N: svc::NewService<(TransportHeader, ClientInfo)>,
53+
{
54+
type Service = N::Service;
55+
56+
fn new_service(&self, (header, client): (TransportHeader, ClientInfo)) -> Self::Service {
57+
self.metrics
58+
.connections
59+
.get_or_create(&Labels {
60+
header: header.clone(),
61+
client_id: client.client_id.clone(),
62+
})
63+
.inc();
64+
65+
self.inner.new_service((header, client))
66+
}
67+
}
68+
69+
impl prom::EncodeLabelSetMut for Labels {
70+
fn encode_label_set(&self, enc: &mut prom::encoding::LabelSetEncoder<'_>) -> std::fmt::Result {
71+
use prom::encoding::EncodeLabel;
72+
(
73+
"session_protocol",
74+
self.header.protocol.as_ref().map(|p| match p {
75+
SessionProtocol::Http1 => "http/1",
76+
SessionProtocol::Http2 => "http/2",
77+
}),
78+
)
79+
.encode(enc.encode_label())?;
80+
("target_port", self.header.port).encode(enc.encode_label())?;
81+
("target_name", self.header.name.as_deref()).encode(enc.encode_label())?;
82+
("client_id", self.client_id.to_str()).encode(enc.encode_label())?;
83+
Ok(())
84+
}
85+
}
86+
87+
impl prom::encoding::EncodeLabelSet for Labels {
88+
fn encode(&self, mut enc: prom::encoding::LabelSetEncoder<'_>) -> Result<(), std::fmt::Error> {
89+
self.encode_label_set(&mut enc)
90+
}
91+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
use super::*;
2+
use crate::direct::ClientInfo;
3+
use futures::future;
4+
use linkerd_app_core::{
5+
io,
6+
metrics::prom,
7+
svc, tls,
8+
transport::addrs::{ClientAddr, OrigDstAddr, Remote},
9+
transport_header::{SessionProtocol, TransportHeader},
10+
Error,
11+
};
12+
use std::str::FromStr;
13+
14+
fn new_ok<T>() -> svc::ArcNewTcp<T, io::BoxedIo> {
15+
svc::ArcNewService::new(|_| svc::BoxService::new(svc::mk(|_| future::ok::<(), Error>(()))))
16+
}
17+
18+
macro_rules! assert_counted {
19+
($registry:expr, $proto:expr, $port:expr, $name:expr, $value:expr) => {{
20+
let mut buf = String::new();
21+
prom::encoding::text::encode_registry(&mut buf, $registry).expect("encode registry failed");
22+
let metric = format!("connections_total{{session_protocol=\"{}\",target_port=\"{}\",target_name=\"{}\",client_id=\"test.client\"}}", $proto, $port, $name);
23+
assert_eq!(
24+
buf.split_terminator('\n')
25+
.find(|l| l.starts_with(&*metric)),
26+
Some(&*format!("{metric} {}", $value)),
27+
"metric '{metric}' not found in:\n{buf}"
28+
);
29+
}};
30+
}
31+
32+
// Added helper to setup and run the test
33+
fn run_metric_test(header: TransportHeader) -> prom::Registry {
34+
let mut registry = prom::Registry::default();
35+
let families = MetricsFamilies::register(&mut registry);
36+
let new_record = svc::layer::Layer::layer(&NewRecord::layer(families.clone()), new_ok());
37+
// common client info
38+
let client_id = tls::ClientId::from_str("test.client").unwrap();
39+
let client_addr = Remote(ClientAddr(([127, 0, 0, 1], 40000).into()));
40+
let local_addr = OrigDstAddr(([127, 0, 0, 1], 4143).into());
41+
let client_info = ClientInfo {
42+
client_id: client_id.clone(),
43+
alpn: Some(tls::NegotiatedProtocol("transport.l5d.io/v1".into())),
44+
client_addr,
45+
local_addr,
46+
};
47+
let _svc = svc::NewService::new_service(&new_record, (header.clone(), client_info.clone()));
48+
registry
49+
}
50+
51+
#[test]
52+
fn records_metrics_http1_local() {
53+
let header = TransportHeader {
54+
port: 8080,
55+
name: None,
56+
protocol: Some(SessionProtocol::Http1),
57+
};
58+
let registry = run_metric_test(header);
59+
assert_counted!(&registry, "http/1", 8080, "", 1);
60+
}
61+
62+
#[test]
63+
fn records_metrics_http2_local() {
64+
let header = TransportHeader {
65+
port: 8081,
66+
name: None,
67+
protocol: Some(SessionProtocol::Http2),
68+
};
69+
let registry = run_metric_test(header);
70+
assert_counted!(&registry, "http/2", 8081, "", 1);
71+
}
72+
73+
#[test]
74+
fn records_metrics_opaq_local() {
75+
let header = TransportHeader {
76+
port: 8082,
77+
name: None,
78+
protocol: None,
79+
};
80+
let registry = run_metric_test(header);
81+
assert_counted!(&registry, "", 8082, "", 1);
82+
}
83+
84+
#[test]
85+
fn records_metrics_http1_gateway() {
86+
let header = TransportHeader {
87+
port: 8080,
88+
name: Some("mysvc.myns.svc.cluster.local".parse().unwrap()),
89+
protocol: Some(SessionProtocol::Http1),
90+
};
91+
let registry = run_metric_test(header);
92+
assert_counted!(&registry, "http/1", 8080, "mysvc.myns.svc.cluster.local", 1);
93+
}
94+
95+
#[test]
96+
fn records_metrics_http2_gateway() {
97+
let header = TransportHeader {
98+
port: 8081,
99+
name: Some("mysvc.myns.svc.cluster.local".parse().unwrap()),
100+
protocol: Some(SessionProtocol::Http2),
101+
};
102+
let registry = run_metric_test(header);
103+
assert_counted!(&registry, "http/2", 8081, "mysvc.myns.svc.cluster.local", 1);
104+
}
105+
106+
#[test]
107+
fn records_metrics_opaq_gateway() {
108+
let header = TransportHeader {
109+
port: 8082,
110+
name: Some("mysvc.myns.svc.cluster.local".parse().unwrap()),
111+
protocol: None,
112+
};
113+
let registry = run_metric_test(header);
114+
assert_counted!(&registry, "", 8082, "mysvc.myns.svc.cluster.local", 1);
115+
}

linkerd/app/inbound/src/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@ pub struct InboundMetrics {
2727
pub proxy: Proxy,
2828

2929
pub detect: crate::detect::MetricsFamilies,
30+
pub direct: crate::direct::MetricsFamilies,
3031
}
3132

3233
impl InboundMetrics {
3334
pub(crate) fn new(proxy: Proxy, reg: &mut prom::Registry) -> Self {
3435
let detect =
3536
crate::detect::MetricsFamilies::register(reg.sub_registry_with_prefix("tcp_detect"));
37+
let direct = crate::direct::MetricsFamilies::register(
38+
reg.sub_registry_with_prefix("tcp_transport_header"),
39+
);
3640

3741
Self {
3842
http_authz: authz::HttpAuthzMetrics::default(),
@@ -41,6 +45,7 @@ impl InboundMetrics {
4145
tcp_errors: error::TcpErrorMetrics::default(),
4246
proxy,
4347
detect,
48+
direct,
4449
}
4550
}
4651
}

0 commit comments

Comments
 (0)