Skip to content

feat(proxy/http): recover abandoned http requests #3903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,7 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
source = "git+https://github.com/cratelyn/hyper.git?rev=390a65c2#390a65c2965f0abaca01ed05f172d294ac047654"
dependencies = [
"bytes",
"futures-channel",
Expand Down
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,11 @@ features = ["tokio", "tracing"]

[workspace.dependencies.linkerd2-proxy-api]
version = "0.16.0"

# TODO(kate): this is a temporary patch of hyper, until hyperium/hyper#3883
# is released. see:
# - https://github.com/hyperium/hyper/pull/3883
# - https://github.com/cratelyn/hyper/commit/0edd1c80ed3576705423877d6906684df261cc45
[patch.crates-io.hyper]
git = "https://github.com/cratelyn/hyper.git"
rev = "390a65c2"
18 changes: 12 additions & 6 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

use crate::{h1, h2, orig_proto};
use futures::prelude::*;
use hyper::client::conn::TrySendError;
use linkerd_error::{Error, Result};
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, ExtractParam, MakeConnection, Service, ServiceExt};
Expand Down Expand Up @@ -134,9 +135,12 @@
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self {
Self::H2(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx),
Self::Http1(_) => Poll::Ready(Ok(())),
Self::H2(ref mut svc) => svc
.poll_ready(cx)
.map_err(TrySendError::into_error)
.map_err(Error::from),
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from),

Check failure on line 142 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: `B` cannot be shared between threads safely --> linkerd/proxy/http/src/client.rs:142:79 | 142 | Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from), | ^^^^^ `B` cannot be shared between threads safely | note: required because it appears within the type `http::Request<B>` --> /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/http-1.3.1/src/request.rs:158:12 | 158 | pub struct Request<T> { | ^^^^^^^ note: required because it appears within the type `Option<http::Request<B>>` --> /rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/option.rs:571:10 note: required because it appears within the type `hyper::client::conn::TrySendError<http::Request<B>>` --> /usr/local/cargo/git/checkouts/hyper-a94b23502fa55416/390a65c/src/client/dispatch.rs:26:12 | 26 | pub struct TrySendError<T> { | ^^^^^^^^^^^^ note: required because it appears within the type `orig_proto::Error<B>` --> linkerd/proxy/http/src/orig_proto.rs:51:10 | 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `From<orig_proto::Error<B>>` help: consider further restricting this bound | 127 | B: crate::Body + Send + Unpin + 'static + std::marker::Sync, | +++++++++++++++++++

Check failure on line 142 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: `B` doesn't implement `Debug` --> linkerd/proxy/http/src/client.rs:142:79 | 142 | Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from), | ^^^^^ `B` cannot be formatted using `{:?}` because it doesn't implement `Debug` | note: required for `orig_proto::Error<B>` to implement `Debug` --> linkerd/proxy/http/src/orig_proto.rs:50:10 | 50 | #[derive(Debug, Error)] | ^^^^^ unsatisfied trait bound introduced in this `derive` macro note: required for `orig_proto::Error<B>` to implement `std::error::Error` --> linkerd/proxy/http/src/orig_proto.rs:50:17 | 50 | #[derive(Debug, Error)] | ^^^^^ | | | unsatisfied trait bound introduced in this `derive` macro | in this derive macro expansion 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `From<orig_proto::Error<B>>` = note: this error originates in the derive macro `Debug` which comes from the expansion of the derive macro `Error` (in Nightly builds, run with -Z macro-backtrace for more info) help: consider further restricting this bound | 127 | B: crate::Body + Send + Unpin + 'static + std::fmt::Debug, | +++++++++++++++++

Check failure on line 142 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: the trait bound `hyper::client::conn::TrySendError<http::Request<B>>: std::error::Error` is not satisfied --> linkerd/proxy/http/src/client.rs:142:79 | 142 | Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from), | ^^^^^ the trait `std::error::Error` is not implemented for `hyper::client::conn::TrySendError<http::Request<B>>`, which is required by `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>: From<orig_proto::Error<B>>` | = help: the following other types implement trait `From<T>`: `Box<CStr>` implements `From<&CStr>` `Box<CStr>` implements `From<CString>` `Box<CStr>` implements `From<Cow<'_, CStr>>` `Box<OsStr>` implements `From<&OsStr>` `Box<OsStr>` implements `From<Cow<'_, OsStr>>` `Box<OsStr>` implements `From<OsString>` `Box<Path>` implements `From<&Path>` `Box<Path>` implements `From<Cow<'_, Path>>` and 20 others note: required for `orig_proto::Error<B>` to implement `std::error::Error` --> linkerd/proxy/http/src/orig_proto.rs:50:17 | 50 | #[derive(Debug, Error)] | ^^^^^ | | | unsatisfied trait bound introduced in this `derive` macro | in this derive macro expansion 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `From<orig_proto::Error<B>>` = note: this error originates in the derive macro `Error` (in Nightly builds, run with -Z macro-backtrace for more info)

Check failure on line 142 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: `B` cannot be shared between threads safely --> linkerd/proxy/http/src/client.rs:142:52 | 142 | Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `B` cannot be shared between threads safely | note: required because it appears within the type `http::Request<B>` --> /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/http-1.3.1/src/request.rs:158:12 | 158 | pub struct Request<T> { | ^^^^^^^ note: required because it appears within the type `Option<http::Request<B>>` --> /rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/option.rs:571:10 note: required because it appears within the type `hyper::client::conn::TrySendError<http::Request<B>>` --> /usr/local/cargo/git/checkouts/hyper-a94b23502fa55416/390a65c/src/client/dispatch.rs:26:12 | 26 | pub struct TrySendError<T> { | ^^^^^^^^^^^^ note: required because it appears within the type `orig_proto::Error<B>` --> linkerd/proxy/http/src/orig_proto.rs:51:10 | 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<dyn std::error::Error + std::marker::Send + Sync>` to implement `From<orig_proto::Error<B>>` help: consider further restricting this bound | 127 | B: crate::Body + Send + Unpin + 'static + std::marker::Sync, | +++++++++++++++++++

Check failure on line 142 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: `B` doesn't implement `Debug` --> linkerd/proxy/http/src/client.rs:142:52 | 142 | Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `B` cannot be formatted using `{:?}` because it doesn't implement `Debug` | note: required for `orig_proto::Error<B>` to implement `Debug` --> linkerd/proxy/http/src/orig_proto.rs:50:10 | 50 | #[derive(Debug, Error)] | ^^^^^ unsatisfied trait bound introduced in this `derive` macro note: required for `orig_proto::Error<B>` to implement `std::error::Error` --> linkerd/proxy/http/src/orig_proto.rs:50:17 | 50 | #[derive(Debug, Error)] | ^^^^^ | | | unsatisfied trait bound introduced in this `derive` macro | in this derive macro expansion 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<dyn std::error::Error + std::marker::Send + Sync>` to implement `From<orig_proto::Error<B>>` = note: this error originates in the derive macro `Debug` which comes from the expansion of the derive macro `Error` (in Nightly builds, run with -Z macro-backtrace for more info) help: consider further restricting this bound | 127 | B: crate::Body + Send + Unpin + 'static + std::fmt::Debug, | +++++++++++++++++

Check failure on line 142 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: the trait bound `hyper::client::conn::TrySendError<http::Request<B>>: std::error::Error` is not satisfied --> linkerd/proxy/http/src/client.rs:142:52 | 142 | Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::error::Error` is not implemented for `hyper::client::conn::TrySendError<http::Request<B>>`, which is required by `Box<dyn std::error::Error + std::marker::Send + Sync>: From<orig_proto::Error<B>>` | = help: the following other types implement trait `From<T>`: `Box<CStr>` implements `From<&CStr>` `Box<CStr>` implements `From<CString>` `Box<CStr>` implements `From<Cow<'_, CStr>>` `Box<OsStr>` implements `From<&OsStr>` `Box<OsStr>` implements `From<Cow<'_, OsStr>>` `Box<OsStr>` implements `From<OsString>` `Box<Path>` implements `From<&Path>` `Box<Path>` implements `From<Cow<'_, Path>>` and 20 others note: required for `orig_proto::Error<B>` to implement `std::error::Error` --> linkerd/proxy/http/src/orig_proto.rs:50:17 | 50 | #[derive(Debug, Error)] | ^^^^^ | | | unsatisfied trait bound introduced in this `derive` macro | in this derive macro expansion 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<dyn std::error::Error + std::marker::Send + Sync>` to implement `From<orig_proto::Error<B>>` = note: this error originates in the derive macro `Error` (in Nightly builds, run with -Z macro-backtrace for more info)
Self::Http1(ref mut svc) => svc.poll_ready(cx),
}
}

Expand All @@ -155,11 +159,13 @@
debug!(headers = ?req.headers());

match self {
Self::Http1(ref mut h1) => h1.request(req),
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req),
Self::Http1(ref mut svc) => svc.call(req),
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(),

Check failure on line 163 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: the trait bound `hyper::client::conn::TrySendError<http::Request<B>>: std::error::Error` is not satisfied --> linkerd/proxy/http/src/client.rs:163:56 | 163 | Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::error::Error` is not implemented for `hyper::client::conn::TrySendError<http::Request<B>>`, which is required by `orig_proto::Error<B>: Into<_>` | note: required for `orig_proto::Error<B>` to implement `std::error::Error` --> linkerd/proxy/http/src/orig_proto.rs:50:17 | 50 | #[derive(Debug, Error)] | ^^^^^ | | | unsatisfied trait bound introduced in this `derive` macro | in this derive macro expansion 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<dyn std::error::Error + std::marker::Send + Sync>` to implement `From<orig_proto::Error<B>>` = note: required for `orig_proto::Error<B>` to implement `Into<Box<dyn std::error::Error + std::marker::Send + Sync>>` = note: this error originates in the derive macro `Error` (in Nightly builds, run with -Z macro-backtrace for more info)

Check failure on line 163 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: `B` cannot be shared between threads safely --> linkerd/proxy/http/src/client.rs:163:56 | 163 | Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `B` cannot be shared between threads safely | note: required because it appears within the type `http::Request<B>` --> /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/http-1.3.1/src/request.rs:158:12 | 158 | pub struct Request<T> { | ^^^^^^^ note: required because it appears within the type `Option<http::Request<B>>` --> /rustc/90b35a6239c3d8bdabc530a6a0816f7ff89a0aaf/library/core/src/option.rs:571:10 note: required because it appears within the type `hyper::client::conn::TrySendError<http::Request<B>>` --> /usr/local/cargo/git/checkouts/hyper-a94b23502fa55416/390a65c/src/client/dispatch.rs:26:12 | 26 | pub struct TrySendError<T> { | ^^^^^^^^^^^^ note: required because it appears within the type `orig_proto::Error<B>` --> linkerd/proxy/http/src/orig_proto.rs:51:10 | 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `From<orig_proto::Error<B>>` = note: required for `orig_proto::Error<B>` to implement `Into<Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>>` help: consider further restricting this bound | 127 | B: crate::Body + Send + Unpin + 'static + std::marker::Sync, | +++++++++++++++++++

Check failure on line 163 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: `B` doesn't implement `Debug` --> linkerd/proxy/http/src/client.rs:163:56 | 163 | Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `B` cannot be formatted using `{:?}` because it doesn't implement `Debug` | note: required for `orig_proto::Error<B>` to implement `Debug` --> linkerd/proxy/http/src/orig_proto.rs:50:10 | 50 | #[derive(Debug, Error)] | ^^^^^ unsatisfied trait bound introduced in this `derive` macro note: required for `orig_proto::Error<B>` to implement `std::error::Error` --> linkerd/proxy/http/src/orig_proto.rs:50:17 | 50 | #[derive(Debug, Error)] | ^^^^^ | | | unsatisfied trait bound introduced in this `derive` macro | in this derive macro expansion 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `From<orig_proto::Error<B>>` = note: required for `orig_proto::Error<B>` to implement `Into<Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>>` = note: this error originates in the derive macro `Debug` which comes from the expansion of the derive macro `Error` (in Nightly builds, run with -Z macro-backtrace for more info) help: consider further restricting this bound | 127 | B: crate::Body + Send + Unpin + 'static + std::fmt::Debug, | +++++++++++++++++

Check failure on line 163 in linkerd/proxy/http/src/client.rs

View workflow job for this annotation

GitHub Actions / package (amd64, linux, gnu)

error[E0277]: the trait bound `hyper::client::conn::TrySendError<http::Request<B>>: std::error::Error` is not satisfied --> linkerd/proxy/http/src/client.rs:163:56 | 163 | Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(), | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::error::Error` is not implemented for `hyper::client::conn::TrySendError<http::Request<B>>`, which is required by `orig_proto::Error<B>: Into<_>` | note: required for `orig_proto::Error<B>` to implement `std::error::Error` --> linkerd/proxy/http/src/orig_proto.rs:50:17 | 50 | #[derive(Debug, Error)] | ^^^^^ | | | unsatisfied trait bound introduced in this `derive` macro | in this derive macro expansion 51 | pub enum Error<B> { | ^^^^^ = note: required for `Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>` to implement `From<orig_proto::Error<B>>` = note: required for `orig_proto::Error<B>` to implement `Into<Box<(dyn std::error::Error + std::marker::Send + Sync + 'static)>>` = note: this error originates in the derive macro `Error` (in Nightly builds, run with -Z macro-backtrace for more info)
Self::H2(ref mut svc) => Box::pin(
svc.call(req)
.err_into::<Error>()
// TODO(kate): recover abandoned requests from the http/2 client.
.map_err(TrySendError::into_error)
.map_err(Error::from)
.map_ok(|rsp| rsp.map(BoxBody::new)),
) as RspFuture,
}
Expand Down
12 changes: 10 additions & 2 deletions linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<C: Clone, T: Clone, B> Clone for Client<C, T, B> {

type RspFuture = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;

impl<C, T, B> Client<C, T, B>
impl<C, T, B> tower::Service<http::Request<B>> for Client<C, T, B>
where
T: Clone + Send + Sync + 'static,
C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static,
Expand All @@ -71,7 +71,15 @@ where
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
{
pub(crate) fn request(&mut self, mut req: http::Request<B>) -> RspFuture {
type Response = http::Response<BoxBody>;
type Error = Error;
type Future = RspFuture;

fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll<Result<()>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, mut req: http::Request<B>) -> RspFuture {
// Marked by `upgrade`.
let upgrade = req.extensions_mut().remove::<Http11Upgrade>();
let is_http_connect = req.method() == http::Method::CONNECT;
Expand Down
6 changes: 3 additions & 3 deletions linkerd/proxy/http/src/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ where
B::Error: Into<Error> + Send + Sync,
{
type Response = http::Response<hyper::body::Incoming>;
type Error = hyper::Error;
type Error = hyper::client::conn::TrySendError<http::Request<B>>;
type Future = Pin<Box<dyn Send + Future<Output = Result<Self::Response, Self::Error>>>>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tx.poll_ready(cx).map_err(From::from)
self.tx.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
Expand All @@ -172,6 +172,6 @@ where
*req.version_mut() = http::Version::HTTP_11;
}

self.tx.send_request(req).boxed()
self.tx.try_send_request(req).boxed()
}
}
141 changes: 93 additions & 48 deletions linkerd/proxy/http/src/orig_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{h1, h2, Body};
use futures::prelude::*;
use http::header::{HeaderValue, TRANSFER_ENCODING};
use http_body::Frame;
use linkerd_error::{Error, Result};
use linkerd_error::Result;
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, MakeConnection, Service};
use std::{
Expand Down Expand Up @@ -43,6 +43,20 @@ pub struct Downgrade<S> {
#[derive(Clone, Debug)]
pub struct WasUpgrade(());

/// An error returned by the [`Upgrade`] client.
///
/// This can represent an error presented by either of the underlying HTTP/1 or HTTP/2 clients,
/// or a "downgraded" HTTP/2 error.
#[derive(Debug, Error)]
pub enum Error<B> {
#[error("{0}")]
Downgraded(#[from] DowngradedH2Error),
#[error(transparent)]
H1(linkerd_error::Error),
#[error(transparent)]
H2(hyper::client::conn::TrySendError<http::Request<B>>),
}

// === impl Upgrade ===

impl<C, T, B> Upgrade<C, T, B> {
Expand All @@ -59,15 +73,24 @@ where
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
B::Error: Into<linkerd_error::Error> + Send + Sync,
{
type Response = http::Response<BoxBody>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;
type Error = Error<B>;
type Future = Pin<
Box<dyn Future<Output = Result<http::Response<BoxBody>, Self::Error>> + Send + 'static>,
>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.h2.poll_ready(cx).map_err(downgrade_h2_error)
let Self { http1, h2 } = self;

match http1.poll_ready(cx).map_err(Error::H1) {
Poll::Ready(Ok(())) => {}
poll => return poll,
}

h2.poll_ready(cx).map_err(Error::h2)
}

fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
Expand All @@ -78,7 +101,12 @@ where
.is_some()
{
debug!("Skipping orig-proto upgrade due to HTTP/1.1 upgrade");
return Box::pin(self.http1.request(req).map_ok(|rsp| rsp.map(BoxBody::new)));
return Box::pin(
self.http1
.call(req)
.map_ok(|rsp| rsp.map(BoxBody::new))
.map_err(Error::H1),
);
}

let orig_version = req.version();
Expand Down Expand Up @@ -106,97 +134,113 @@ where

*req.version_mut() = http::Version::HTTP_2;

Box::pin(
self.h2
.call(req)
.map_err(downgrade_h2_error)
.map_ok(move |mut rsp| {
let version = rsp
.headers_mut()
.remove(L5D_ORIG_PROTO)
.and_then(|orig_proto| {
if orig_proto == "HTTP/1.1" {
Some(http::Version::HTTP_11)
} else if orig_proto == "HTTP/1.0" {
Some(http::Version::HTTP_10)
} else {
None
}
})
.unwrap_or(orig_version);
trace!(?version, "Downgrading response");
*rsp.version_mut() = version;
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
}),
)
Box::pin(self.h2.call(req).map_err(Error::h2).map_ok(move |mut rsp| {
let version = rsp
.headers_mut()
.remove(L5D_ORIG_PROTO)
.and_then(|orig_proto| {
if orig_proto == "HTTP/1.1" {
Some(http::Version::HTTP_11)
} else if orig_proto == "HTTP/1.0" {
Some(http::Version::HTTP_10)
} else {
None
}
})
.unwrap_or(orig_version);
trace!(?version, "Downgrading response");
*rsp.version_mut() = version;
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
}))
}
}

// === impl Error ===

impl<B> Error<B> {
fn h2(err: hyper::client::conn::TrySendError<http::Request<B>>) -> Self {
if let Some(downgraded) = downgrade_h2_error(err.error()) {
return Self::Downgraded(downgraded);
}

Self::H2(err)
}
}

/// Handles HTTP/2 client errors for HTTP/1.1 requests by wrapping the error type. This
/// simplifies error handling elsewhere so that HTTP/2 errors can only be encountered when the
/// original request was HTTP/2.
fn downgrade_h2_error<E: std::error::Error + Send + Sync + 'static>(orig: E) -> Error {
fn downgrade_h2_error<E: std::error::Error + Send + Sync + 'static>(
orig: &E,
) -> Option<DowngradedH2Error> {
#[inline]
fn reason(e: &(dyn std::error::Error + 'static)) -> Option<h2::Reason> {
e.downcast_ref::<h2::H2Error>()?.reason()
}

// If the provided error was an H2 error, wrap it as a downgraded error.
if let Some(reason) = reason(&orig) {
return DowngradedH2Error(reason).into();
if let Some(reason) = reason(orig) {
return Some(DowngradedH2Error(reason));
}

// Otherwise, check the source chain to see if its original error was an H2 error.
let mut cause = orig.source();
while let Some(error) = cause {
if let Some(reason) = reason(error) {
return DowngradedH2Error(reason).into();
return Some(DowngradedH2Error(reason));
}

cause = error.source();
}

// If the error was not an H2 error, return the original error (boxed).
orig.into()
// If the error was not an H2 error, return None.
None
}

#[cfg(test)]
#[test]
fn test_downgrade_h2_error() {
assert!(
downgrade_h2_error(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is::<DowngradedH2Error>(),
downgrade_h2_error(&h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is_some(),
"h2 errors must be downgraded"
);

#[derive(Debug, Error)]
#[error("wrapped h2 error: {0}")]
struct WrapError(#[source] Error);
struct WrapError(#[source] linkerd_error::Error);
assert!(
downgrade_h2_error(WrapError(
downgrade_h2_error(&WrapError(
h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into()
))
.is::<DowngradedH2Error>(),
.is_some(),
"wrapped h2 errors must be downgraded"
);

assert!(
downgrade_h2_error(WrapError(
downgrade_h2_error(&WrapError(
WrapError(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into()).into()
))
.is::<DowngradedH2Error>(),
.is_some(),
"double-wrapped h2 errors must be downgraded"
);

assert!(
!downgrade_h2_error(std::io::Error::new(
downgrade_h2_error(&std::io::Error::new(
std::io::ErrorKind::Other,
"non h2 error"
))
.is::<DowngradedH2Error>(),
.is_none(),
"other h2 errors must not be downgraded"
);
}

#[cfg(test)]
#[test]
fn test_downgrade_error_source() {
let err = Error::Downgraded(DowngradedH2Error(h2::Reason::PROTOCOL_ERROR));
assert!(linkerd_error::is_caused_by::<DowngradedH2Error>(&err));
}

// === impl UpgradeResponseBody ===

impl<B> Body for UpgradeResponseBody<B>
Expand All @@ -205,7 +249,7 @@ where
B::Error: std::error::Error + Send + Sync + 'static,
{
type Data = B::Data;
type Error = Error;
type Error = linkerd_error::Error;

#[inline]
fn is_end_stream(&self) -> bool {
Expand All @@ -216,10 +260,11 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project()
.inner
.poll_frame(cx)
.map_err(downgrade_h2_error)
self.project().inner.poll_frame(cx).map_err(|err| {
downgrade_h2_error(&err)
.map(Into::into)
.unwrap_or_else(|| err.into())
})
}

#[inline]
Expand Down
Loading