Skip to content

Commit a683ff5

Browse files
authored
feat(outbound): instrument per-parent connection protocol counts (#3733)
The outbound proxy makes protocol decisions based on the discovery response, keyed on a "parent" reference. This change adds a `protocol::metrics` middleware that records connection counts by parent reference.
1 parent ec168c3 commit a683ff5

File tree

4 files changed

+362
-2
lines changed

4 files changed

+362
-2
lines changed

linkerd/app/outbound/src/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub struct OutboundMetrics {
3636

3737
#[derive(Clone, Debug, Default)]
3838
pub(crate) struct PromMetrics {
39+
pub(crate) protocol: crate::protocol::MetricsFamilies,
3940
pub(crate) http_detect: crate::http::DetectMetricsFamilies<ParentRef>,
4041
pub(crate) http: crate::http::HttpMetrics,
4142
pub(crate) opaq: crate::opaq::OpaqMetrics,
@@ -89,6 +90,9 @@ where
8990

9091
impl PromMetrics {
9192
pub fn register(registry: &mut prom::Registry) -> Self {
93+
let protocol = crate::protocol::MetricsFamilies::register(
94+
registry.sub_registry_with_prefix("tcp_protocol"),
95+
);
9296
let http_detect = crate::http::DetectMetricsFamilies::register(
9397
// Scoped consistently with the inbound metrics.
9498
registry.sub_registry_with_prefix("tcp_detect_http"),
@@ -103,6 +107,7 @@ impl PromMetrics {
103107
let tls = crate::tls::TlsMetrics::register(registry.sub_registry_with_prefix("tls"));
104108

105109
Self {
110+
protocol,
106111
http_detect,
107112
http,
108113
opaq,

linkerd/app/outbound/src/protocol.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@ use crate::{http, Outbound, ParentRef};
22
use linkerd_app_core::{io, svc, Error, Infallible};
33
use std::{fmt::Debug, hash::Hash};
44

5+
mod metrics;
6+
#[cfg(test)]
7+
mod tests;
8+
9+
pub use self::metrics::MetricsFamilies;
10+
511
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
612
pub struct Http<T> {
713
version: http::Variant,
814
parent: T,
915
}
1016

1117
/// Parameter type indicating how the proxy should handle a connection.
12-
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
18+
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
1319
pub enum Protocol {
1420
Http1,
1521
Http2,
@@ -97,7 +103,7 @@ impl<N> Outbound<N> {
97103
.arc_new_tcp()
98104
});
99105

100-
http.map_stack(|_, _, http| {
106+
http.map_stack(|_, rt, http| {
101107
// First separate traffic that needs protocol detection. Then switch
102108
// between traffic that is known to be HTTP or opaque.
103109
let known = http.push_switch(
@@ -131,6 +137,7 @@ impl<N> Outbound<N> {
131137
},
132138
detect.into_inner(),
133139
)
140+
.push(metrics::NewRecord::layer(rt.metrics.prom.protocol.clone()))
134141
.arc_new_tcp()
135142
})
136143
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use super::Protocol;
2+
use crate::ParentRef;
3+
use linkerd_app_core::{
4+
metrics::prom::{self, EncodeLabelSetMut},
5+
svc,
6+
};
7+
8+
#[derive(Clone, Debug)]
9+
pub struct NewRecord<N> {
10+
inner: N,
11+
metrics: MetricsFamilies,
12+
}
13+
14+
#[derive(Clone, Debug)]
15+
pub struct Record<S> {
16+
inner: S,
17+
counter: prom::Counter,
18+
}
19+
20+
#[derive(Clone, Debug, Default)]
21+
pub struct MetricsFamilies {
22+
connections: prom::Family<Labels, prom::Counter>,
23+
}
24+
25+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
26+
struct Labels {
27+
protocol: Protocol,
28+
parent_ref: ParentRef,
29+
}
30+
31+
// === impl MetricsFamilies ===
32+
33+
impl MetricsFamilies {
34+
pub fn register(reg: &mut prom::Registry) -> Self {
35+
let connections = prom::Family::default();
36+
reg.register(
37+
"connections",
38+
"Outbound TCP connections by protocol configuration",
39+
connections.clone(),
40+
);
41+
42+
Self { connections }
43+
}
44+
}
45+
46+
// === impl NewRecord ===
47+
48+
impl<N> NewRecord<N> {
49+
pub fn layer(metrics: MetricsFamilies) -> impl svc::layer::Layer<N, Service = Self> + Clone {
50+
svc::layer::mk(move |inner| Self {
51+
inner,
52+
metrics: metrics.clone(),
53+
})
54+
}
55+
}
56+
57+
impl<T, N> svc::NewService<T> for NewRecord<N>
58+
where
59+
T: svc::Param<Protocol>,
60+
T: svc::Param<ParentRef>,
61+
N: svc::NewService<T>,
62+
{
63+
type Service = Record<N::Service>;
64+
65+
fn new_service(&self, target: T) -> Self::Service {
66+
let counter = (*self.metrics.connections.get_or_create(&Labels {
67+
protocol: target.param(),
68+
parent_ref: target.param(),
69+
}))
70+
.clone();
71+
72+
let inner = self.inner.new_service(target);
73+
Record { inner, counter }
74+
}
75+
}
76+
77+
// === impl Record ===
78+
79+
impl<S, I> svc::Service<I> for Record<S>
80+
where
81+
S: svc::Service<I>,
82+
{
83+
type Response = S::Response;
84+
type Error = S::Error;
85+
type Future = S::Future;
86+
87+
fn poll_ready(
88+
&mut self,
89+
cx: &mut std::task::Context<'_>,
90+
) -> std::task::Poll<Result<(), Self::Error>> {
91+
self.inner.poll_ready(cx)
92+
}
93+
94+
fn call(&mut self, io: I) -> Self::Future {
95+
self.counter.inc();
96+
self.inner.call(io)
97+
}
98+
}
99+
100+
// === impl Labels ===
101+
102+
impl prom::EncodeLabelSetMut for Labels {
103+
fn encode_label_set(&self, enc: &mut prom::encoding::LabelSetEncoder<'_>) -> std::fmt::Result {
104+
use prom::encoding::EncodeLabel;
105+
106+
let protocol = match self.protocol {
107+
Protocol::Http1 => "http/1",
108+
Protocol::Http2 => "http/2",
109+
Protocol::Detect => "detect",
110+
Protocol::Opaque => "opaq",
111+
Protocol::Tls => "tls",
112+
};
113+
114+
("protocol", protocol).encode(enc.encode_label())?;
115+
self.parent_ref.encode_label_set(enc)?;
116+
117+
Ok(())
118+
}
119+
}
120+
121+
impl prom::encoding::EncodeLabelSet for Labels {
122+
fn encode(&self, mut enc: prom::encoding::LabelSetEncoder<'_>) -> Result<(), std::fmt::Error> {
123+
self.encode_label_set(&mut enc)
124+
}
125+
}

0 commit comments

Comments
 (0)