Skip to content

Commit c50c590

Browse files
committed
refactor(proxy/http): a concrete orig_proto error
this commit introduces a concrete error type for the `orig_proto` upgrade layer. this layer is used by the proxy's http client to transparently upgrade outbound http/1 traffic to http/2. rather than boxing errors, we define a concrete error type to facilitate inspecting errors in the future. for now, the top-level http client continues to box errors thrown by the "orig_proto" upgrade client. Signed-off-by: katelyn martin <kate@buoyant.io>
1 parent e49fc6c commit c50c590

File tree

2 files changed

+88
-50
lines changed

2 files changed

+88
-50
lines changed

linkerd/proxy/http/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ where
135135
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
136136
match self {
137137
Self::H2(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
138-
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx),
138+
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
139139
Self::Http1(ref mut svc) => svc.poll_ready(cx),
140140
}
141141
}
@@ -156,7 +156,7 @@ where
156156

157157
match self {
158158
Self::Http1(ref mut svc) => svc.call(req),
159-
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req),
159+
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(),
160160
Self::H2(ref mut svc) => Box::pin(
161161
svc.call(req)
162162
.err_into::<Error>()

linkerd/proxy/http/src/orig_proto.rs

Lines changed: 86 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::{h1, h2, Body};
22
use futures::prelude::*;
33
use http::header::{HeaderValue, TRANSFER_ENCODING};
44
use http_body::Frame;
5-
use linkerd_error::{Error, Result};
5+
use linkerd_error::Result;
66
use linkerd_http_box::BoxBody;
77
use linkerd_stack::{layer, MakeConnection, Service};
88
use std::{
@@ -43,6 +43,20 @@ pub struct Downgrade<S> {
4343
#[derive(Clone, Debug)]
4444
pub struct WasUpgrade(());
4545

46+
/// An error returned by the [`Upgrade`] client.
47+
///
48+
/// This can represent an error presented by either of the underlying HTTP/1 or HTTP/2 clients,
49+
/// or a "downgraded" HTTP/2 error.
50+
#[derive(Debug, Error)]
51+
pub enum Error {
52+
#[error("{0}")]
53+
Downgraded(#[from] DowngradedH2Error),
54+
#[error(transparent)]
55+
H1(linkerd_error::Error),
56+
#[error(transparent)]
57+
H2(hyper::Error),
58+
}
59+
4660
// === impl Upgrade ===
4761

4862
impl<C, T, B> Upgrade<C, T, B> {
@@ -59,22 +73,24 @@ where
5973
C::Future: Unpin + Send + 'static,
6074
B: crate::Body + Send + Unpin + 'static,
6175
B::Data: Send,
62-
B::Error: Into<Error> + Send + Sync,
76+
B::Error: Into<linkerd_error::Error> + Send + Sync,
6377
{
6478
type Response = http::Response<BoxBody>;
6579
type Error = Error;
66-
type Future = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;
80+
type Future = Pin<
81+
Box<dyn Future<Output = Result<http::Response<BoxBody>, Self::Error>> + Send + 'static>,
82+
>;
6783

6884
#[inline]
6985
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
7086
let Self { http1, h2 } = self;
7187

72-
match http1.poll_ready(cx) {
88+
match http1.poll_ready(cx).map_err(Error::H1) {
7389
Poll::Ready(Ok(())) => {}
7490
poll => return poll,
7591
}
7692

77-
h2.poll_ready(cx).map_err(downgrade_h2_error)
93+
h2.poll_ready(cx).map_err(Error::h2)
7894
}
7995

8096
fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
@@ -85,7 +101,12 @@ where
85101
.is_some()
86102
{
87103
debug!("Skipping orig-proto upgrade due to HTTP/1.1 upgrade");
88-
return Box::pin(self.http1.call(req).map_ok(|rsp| rsp.map(BoxBody::new)));
104+
return Box::pin(
105+
self.http1
106+
.call(req)
107+
.map_ok(|rsp| rsp.map(BoxBody::new))
108+
.map_err(Error::H1),
109+
);
89110
}
90111

91112
let orig_version = req.version();
@@ -113,97 +134,113 @@ where
113134

114135
*req.version_mut() = http::Version::HTTP_2;
115136

116-
Box::pin(
117-
self.h2
118-
.call(req)
119-
.map_err(downgrade_h2_error)
120-
.map_ok(move |mut rsp| {
121-
let version = rsp
122-
.headers_mut()
123-
.remove(L5D_ORIG_PROTO)
124-
.and_then(|orig_proto| {
125-
if orig_proto == "HTTP/1.1" {
126-
Some(http::Version::HTTP_11)
127-
} else if orig_proto == "HTTP/1.0" {
128-
Some(http::Version::HTTP_10)
129-
} else {
130-
None
131-
}
132-
})
133-
.unwrap_or(orig_version);
134-
trace!(?version, "Downgrading response");
135-
*rsp.version_mut() = version;
136-
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
137-
}),
138-
)
137+
Box::pin(self.h2.call(req).map_err(Error::h2).map_ok(move |mut rsp| {
138+
let version = rsp
139+
.headers_mut()
140+
.remove(L5D_ORIG_PROTO)
141+
.and_then(|orig_proto| {
142+
if orig_proto == "HTTP/1.1" {
143+
Some(http::Version::HTTP_11)
144+
} else if orig_proto == "HTTP/1.0" {
145+
Some(http::Version::HTTP_10)
146+
} else {
147+
None
148+
}
149+
})
150+
.unwrap_or(orig_version);
151+
trace!(?version, "Downgrading response");
152+
*rsp.version_mut() = version;
153+
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
154+
}))
155+
}
156+
}
157+
158+
// === impl Error ===
159+
160+
impl Error {
161+
fn h2(err: hyper::Error) -> Self {
162+
if let Some(downgraded) = downgrade_h2_error(&err) {
163+
return Self::Downgraded(downgraded);
164+
}
165+
166+
Self::H2(err)
139167
}
140168
}
141169

142170
/// Handles HTTP/2 client errors for HTTP/1.1 requests by wrapping the error type. This
143171
/// simplifies error handling elsewhere so that HTTP/2 errors can only be encountered when the
144172
/// original request was HTTP/2.
145-
fn downgrade_h2_error<E: std::error::Error + Send + Sync + 'static>(orig: E) -> Error {
173+
fn downgrade_h2_error<E: std::error::Error + Send + Sync + 'static>(
174+
orig: &E,
175+
) -> Option<DowngradedH2Error> {
146176
#[inline]
147177
fn reason(e: &(dyn std::error::Error + 'static)) -> Option<h2::Reason> {
148178
e.downcast_ref::<h2::H2Error>()?.reason()
149179
}
150180

151181
// If the provided error was an H2 error, wrap it as a downgraded error.
152-
if let Some(reason) = reason(&orig) {
153-
return DowngradedH2Error(reason).into();
182+
if let Some(reason) = reason(orig) {
183+
return Some(DowngradedH2Error(reason));
154184
}
155185

156186
// Otherwise, check the source chain to see if its original error was an H2 error.
157187
let mut cause = orig.source();
158188
while let Some(error) = cause {
159189
if let Some(reason) = reason(error) {
160-
return DowngradedH2Error(reason).into();
190+
return Some(DowngradedH2Error(reason));
161191
}
162192

163193
cause = error.source();
164194
}
165195

166-
// If the error was not an H2 error, return the original error (boxed).
167-
orig.into()
196+
// If the error was not an H2 error, return None.
197+
None
168198
}
169199

170200
#[cfg(test)]
171201
#[test]
172202
fn test_downgrade_h2_error() {
173203
assert!(
174-
downgrade_h2_error(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is::<DowngradedH2Error>(),
204+
downgrade_h2_error(&h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is_some(),
175205
"h2 errors must be downgraded"
176206
);
177207

178208
#[derive(Debug, Error)]
179209
#[error("wrapped h2 error: {0}")]
180-
struct WrapError(#[source] Error);
210+
struct WrapError(#[source] linkerd_error::Error);
181211
assert!(
182-
downgrade_h2_error(WrapError(
212+
downgrade_h2_error(&WrapError(
183213
h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into()
184214
))
185-
.is::<DowngradedH2Error>(),
215+
.is_some(),
186216
"wrapped h2 errors must be downgraded"
187217
);
188218

189219
assert!(
190-
downgrade_h2_error(WrapError(
220+
downgrade_h2_error(&WrapError(
191221
WrapError(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into()).into()
192222
))
193-
.is::<DowngradedH2Error>(),
223+
.is_some(),
194224
"double-wrapped h2 errors must be downgraded"
195225
);
196226

197227
assert!(
198-
!downgrade_h2_error(std::io::Error::new(
228+
downgrade_h2_error(&std::io::Error::new(
199229
std::io::ErrorKind::Other,
200230
"non h2 error"
201231
))
202-
.is::<DowngradedH2Error>(),
232+
.is_none(),
203233
"other h2 errors must not be downgraded"
204234
);
205235
}
206236

237+
#[cfg(test)]
238+
#[test]
239+
fn test_downgrade_error_source() {
240+
let err = Error::Downgraded(DowngradedH2Error(h2::Reason::PROTOCOL_ERROR));
241+
assert!(linkerd_error::is_caused_by::<DowngradedH2Error>(&err));
242+
}
243+
207244
// === impl UpgradeResponseBody ===
208245

209246
impl<B> Body for UpgradeResponseBody<B>
@@ -212,7 +249,7 @@ where
212249
B::Error: std::error::Error + Send + Sync + 'static,
213250
{
214251
type Data = B::Data;
215-
type Error = Error;
252+
type Error = linkerd_error::Error;
216253

217254
#[inline]
218255
fn is_end_stream(&self) -> bool {
@@ -223,10 +260,11 @@ where
223260
self: Pin<&mut Self>,
224261
cx: &mut Context<'_>,
225262
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
226-
self.project()
227-
.inner
228-
.poll_frame(cx)
229-
.map_err(downgrade_h2_error)
263+
self.project().inner.poll_frame(cx).map_err(|err| {
264+
downgrade_h2_error(&err)
265+
.map(Into::into)
266+
.unwrap_or_else(|| err.into())
267+
})
230268
}
231269

232270
#[inline]

0 commit comments

Comments
 (0)