diff --git a/Cargo.lock b/Cargo.lock index e8183e2567..491bbae0ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1085,8 +1085,7 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +source = "git+https://github.com/cratelyn/hyper.git?rev=390a65c2#390a65c2965f0abaca01ed05f172d294ac047654" dependencies = [ "bytes", "futures-channel", diff --git a/Cargo.toml b/Cargo.toml index c0c45d3c00..c5bde6e659 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,3 +136,11 @@ features = ["tokio", "tracing"] [workspace.dependencies.linkerd2-proxy-api] version = "0.16.0" + +# TODO(kate): this is a temporary patch of hyper, until hyperium/hyper#3883 +# is released. see: +# - https://github.com/hyperium/hyper/pull/3883 +# - https://github.com/cratelyn/hyper/commit/0edd1c80ed3576705423877d6906684df261cc45 +[patch.crates-io.hyper] +git = "https://github.com/cratelyn/hyper.git" +rev = "390a65c2" diff --git a/linkerd/proxy/http/src/client.rs b/linkerd/proxy/http/src/client.rs index 88f413c6a4..09cb13450c 100644 --- a/linkerd/proxy/http/src/client.rs +++ b/linkerd/proxy/http/src/client.rs @@ -7,6 +7,7 @@ use crate::{h1, h2, orig_proto}; use futures::prelude::*; +use hyper::client::conn::TrySendError; use linkerd_error::{Error, Result}; use linkerd_http_box::BoxBody; use linkerd_stack::{layer, ExtractParam, MakeConnection, Service, ServiceExt}; @@ -134,9 +135,12 @@ where #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self { - Self::H2(ref mut svc) => svc.poll_ready(cx).map_err(Into::into), - Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx), - Self::Http1(_) => Poll::Ready(Ok(())), + Self::H2(ref mut svc) => svc + .poll_ready(cx) + .map_err(TrySendError::into_error) + .map_err(Error::from), + Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Error::from), + Self::Http1(ref mut svc) => svc.poll_ready(cx), } } @@ -155,11 +159,13 @@ where debug!(headers = ?req.headers()); match self { - Self::Http1(ref mut h1) => h1.request(req), - Self::OrigProtoUpgrade(ref mut svc) => svc.call(req), + Self::Http1(ref mut svc) => svc.call(req), + Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(), Self::H2(ref mut svc) => Box::pin( svc.call(req) - .err_into::() + // TODO(kate): recover abandoned requests from the http/2 client. + .map_err(TrySendError::into_error) + .map_err(Error::from) .map_ok(|rsp| rsp.map(BoxBody::new)), ) as RspFuture, } diff --git a/linkerd/proxy/http/src/h1.rs b/linkerd/proxy/http/src/h1.rs index 9f51ac9f17..8f3419a9f3 100644 --- a/linkerd/proxy/http/src/h1.rs +++ b/linkerd/proxy/http/src/h1.rs @@ -61,7 +61,7 @@ impl Clone for Client { type RspFuture = Pin>> + Send + 'static>>; -impl Client +impl tower::Service> for Client where T: Clone + Send + Sync + 'static, C: MakeConnection<(crate::Variant, T)> + Clone + Send + Sync + 'static, @@ -71,7 +71,15 @@ where B::Data: Send, B::Error: Into + Send + Sync, { - pub(crate) fn request(&mut self, mut req: http::Request) -> RspFuture { + type Response = http::Response; + type Error = Error; + type Future = RspFuture; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, mut req: http::Request) -> RspFuture { // Marked by `upgrade`. let upgrade = req.extensions_mut().remove::(); let is_http_connect = req.method() == http::Method::CONNECT; diff --git a/linkerd/proxy/http/src/h2.rs b/linkerd/proxy/http/src/h2.rs index 62c0ed58db..2b9621ea19 100644 --- a/linkerd/proxy/http/src/h2.rs +++ b/linkerd/proxy/http/src/h2.rs @@ -149,12 +149,12 @@ where B::Error: Into + Send + Sync, { type Response = http::Response; - type Error = hyper::Error; + type Error = hyper::client::conn::TrySendError>; type Future = Pin>>>; #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.tx.poll_ready(cx).map_err(From::from) + self.tx.poll_ready(cx).map_err(Into::into) } fn call(&mut self, mut req: http::Request) -> Self::Future { @@ -172,6 +172,6 @@ where *req.version_mut() = http::Version::HTTP_11; } - self.tx.send_request(req).boxed() + self.tx.try_send_request(req).boxed() } } diff --git a/linkerd/proxy/http/src/orig_proto.rs b/linkerd/proxy/http/src/orig_proto.rs index c9c31c7861..6eabe5eca5 100644 --- a/linkerd/proxy/http/src/orig_proto.rs +++ b/linkerd/proxy/http/src/orig_proto.rs @@ -2,7 +2,7 @@ use super::{h1, h2, Body}; use futures::prelude::*; use http::header::{HeaderValue, TRANSFER_ENCODING}; use http_body::Frame; -use linkerd_error::{Error, Result}; +use linkerd_error::Result; use linkerd_http_box::BoxBody; use linkerd_stack::{layer, MakeConnection, Service}; use std::{ @@ -43,6 +43,20 @@ pub struct Downgrade { #[derive(Clone, Debug)] pub struct WasUpgrade(()); +/// An error returned by the [`Upgrade`] client. +/// +/// This can represent an error presented by either of the underlying HTTP/1 or HTTP/2 clients, +/// or a "downgraded" HTTP/2 error. +#[derive(Debug, Error)] +pub enum Error { + #[error("{0}")] + Downgraded(#[from] DowngradedH2Error), + #[error(transparent)] + H1(linkerd_error::Error), + #[error(transparent)] + H2(hyper::client::conn::TrySendError>), +} + // === impl Upgrade === impl Upgrade { @@ -59,15 +73,24 @@ where C::Future: Unpin + Send + 'static, B: crate::Body + Send + Unpin + 'static, B::Data: Send, - B::Error: Into + Send + Sync, + B::Error: Into + Send + Sync, { type Response = http::Response; - type Error = Error; - type Future = Pin>> + Send + 'static>>; + type Error = Error; + type Future = Pin< + Box, Self::Error>> + Send + 'static>, + >; #[inline] fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.h2.poll_ready(cx).map_err(downgrade_h2_error) + let Self { http1, h2 } = self; + + match http1.poll_ready(cx).map_err(Error::H1) { + Poll::Ready(Ok(())) => {} + poll => return poll, + } + + h2.poll_ready(cx).map_err(Error::h2) } fn call(&mut self, mut req: http::Request) -> Self::Future { @@ -78,7 +101,12 @@ where .is_some() { debug!("Skipping orig-proto upgrade due to HTTP/1.1 upgrade"); - return Box::pin(self.http1.request(req).map_ok(|rsp| rsp.map(BoxBody::new))); + return Box::pin( + self.http1 + .call(req) + .map_ok(|rsp| rsp.map(BoxBody::new)) + .map_err(Error::H1), + ); } let orig_version = req.version(); @@ -106,97 +134,113 @@ where *req.version_mut() = http::Version::HTTP_2; - Box::pin( - self.h2 - .call(req) - .map_err(downgrade_h2_error) - .map_ok(move |mut rsp| { - let version = rsp - .headers_mut() - .remove(L5D_ORIG_PROTO) - .and_then(|orig_proto| { - if orig_proto == "HTTP/1.1" { - Some(http::Version::HTTP_11) - } else if orig_proto == "HTTP/1.0" { - Some(http::Version::HTTP_10) - } else { - None - } - }) - .unwrap_or(orig_version); - trace!(?version, "Downgrading response"); - *rsp.version_mut() = version; - rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner })) - }), - ) + Box::pin(self.h2.call(req).map_err(Error::h2).map_ok(move |mut rsp| { + let version = rsp + .headers_mut() + .remove(L5D_ORIG_PROTO) + .and_then(|orig_proto| { + if orig_proto == "HTTP/1.1" { + Some(http::Version::HTTP_11) + } else if orig_proto == "HTTP/1.0" { + Some(http::Version::HTTP_10) + } else { + None + } + }) + .unwrap_or(orig_version); + trace!(?version, "Downgrading response"); + *rsp.version_mut() = version; + rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner })) + })) + } +} + +// === impl Error === + +impl Error { + fn h2(err: hyper::client::conn::TrySendError>) -> Self { + if let Some(downgraded) = downgrade_h2_error(err.error()) { + return Self::Downgraded(downgraded); + } + + Self::H2(err) } } /// Handles HTTP/2 client errors for HTTP/1.1 requests by wrapping the error type. This /// simplifies error handling elsewhere so that HTTP/2 errors can only be encountered when the /// original request was HTTP/2. -fn downgrade_h2_error(orig: E) -> Error { +fn downgrade_h2_error( + orig: &E, +) -> Option { #[inline] fn reason(e: &(dyn std::error::Error + 'static)) -> Option { e.downcast_ref::()?.reason() } // If the provided error was an H2 error, wrap it as a downgraded error. - if let Some(reason) = reason(&orig) { - return DowngradedH2Error(reason).into(); + if let Some(reason) = reason(orig) { + return Some(DowngradedH2Error(reason)); } // Otherwise, check the source chain to see if its original error was an H2 error. let mut cause = orig.source(); while let Some(error) = cause { if let Some(reason) = reason(error) { - return DowngradedH2Error(reason).into(); + return Some(DowngradedH2Error(reason)); } cause = error.source(); } - // If the error was not an H2 error, return the original error (boxed). - orig.into() + // If the error was not an H2 error, return None. + None } #[cfg(test)] #[test] fn test_downgrade_h2_error() { assert!( - downgrade_h2_error(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is::(), + downgrade_h2_error(&h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is_some(), "h2 errors must be downgraded" ); #[derive(Debug, Error)] #[error("wrapped h2 error: {0}")] - struct WrapError(#[source] Error); + struct WrapError(#[source] linkerd_error::Error); assert!( - downgrade_h2_error(WrapError( + downgrade_h2_error(&WrapError( h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into() )) - .is::(), + .is_some(), "wrapped h2 errors must be downgraded" ); assert!( - downgrade_h2_error(WrapError( + downgrade_h2_error(&WrapError( WrapError(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into()).into() )) - .is::(), + .is_some(), "double-wrapped h2 errors must be downgraded" ); assert!( - !downgrade_h2_error(std::io::Error::new( + downgrade_h2_error(&std::io::Error::new( std::io::ErrorKind::Other, "non h2 error" )) - .is::(), + .is_none(), "other h2 errors must not be downgraded" ); } +#[cfg(test)] +#[test] +fn test_downgrade_error_source() { + let err = Error::Downgraded(DowngradedH2Error(h2::Reason::PROTOCOL_ERROR)); + assert!(linkerd_error::is_caused_by::(&err)); +} + // === impl UpgradeResponseBody === impl Body for UpgradeResponseBody @@ -205,7 +249,7 @@ where B::Error: std::error::Error + Send + Sync + 'static, { type Data = B::Data; - type Error = Error; + type Error = linkerd_error::Error; #[inline] fn is_end_stream(&self) -> bool { @@ -216,10 +260,11 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { - self.project() - .inner - .poll_frame(cx) - .map_err(downgrade_h2_error) + self.project().inner.poll_frame(cx).map_err(|err| { + downgrade_h2_error(&err) + .map(Into::into) + .unwrap_or_else(|| err.into()) + }) } #[inline]