Skip to content

Commit d1e11bf

Browse files
committed
Merge branch 'igonovg/icx-proxy-fix' into 'master'
BOUN-777: fix icx-proxy HTTP/2 GOAWAY handling

The goal is to make `icx-proxy` retry a request when it receives a GOAWAY from nginx. Since this error is actually emitted by the client itself, the request never reaches the server, so we can transparently retry it.

* Implement request retry logic for errors caused by an HTTP/2 GOAWAY frame
* Update `clap` to 4.0
* Remove code that depends on the `cargo` feature of `clap`, since Bazel does not include it

See merge request dfinity-lab/public/ic!12615
2 parents adbea6b + fd7e5ba commit d1e11bf

File tree

6 files changed

+90
-37
lines changed

6 files changed

+90
-37
lines changed

Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rs/boundary_node/icx_proxy/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@ DEPENDENCIES = [
77
"@crate_index//:axum",
88
"@crate_index//:base64",
99
"@crate_index//:candid",
10-
"@crate_index//:clap",
10+
"@crate_index//:clap_4_0_0",
1111
"@crate_index//:flate2",
1212
"@crate_index//:form_urlencoded",
1313
"@crate_index//:futures",
14+
"@crate_index//:h2",
1415
"@crate_index//:hex",
1516
"@crate_index//:http-body",
1617
"@crate_index//:hyper",

rs/boundary_node/icx_proxy/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@ clap = { version = "4", features = ["cargo", "derive"] }
2626
flate2 = "1"
2727
form_urlencoded = "1"
2828
futures = "0.3"
29+
h2 = "0.3.19"
2930
hex = "0.4"
3031
http-body = "0.4"
31-
hyper = { version = "0.14.11", features = ["client", "http2", "http1"] }
32+
hyper = { version = "0.14.26", features = ["client", "http2", "http1"] }
3233
hyper-rustls = { version = "0.24.0", features = [ "http2" ] }
3334
itertools = "0.10"
34-
ic-agent = { workspace = true, default-features = false, features = ["hyper"] }
35+
ic-agent = { workspace = true, default-features = false, features = ["hyper", "reqwest"] }
3536
ic-utils = { workspace = true, features = ["raw"] }
3637
lazy-regex = "2"
3738
opentelemetry = "0.17"

rs/boundary_node/icx_proxy/src/logging.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{fs::File, io::stderr, path::PathBuf};
22

33
use axum::Router;
4-
use clap::{crate_version, ArgAction::Count, Args, ValueEnum};
4+
use clap::{ArgAction::Count, Args, ValueEnum};
55
use tower_http::trace::TraceLayer;
66
use tracing::{
77
info, info_span, level_filters::LevelFilter, span::EnteredSpan, subscriber::set_global_default,
@@ -139,7 +139,7 @@ pub fn setup(opts: LoggingOpts) -> EnteredSpan {
139139
}
140140
.expect("Failed to setup tracing.");
141141

142-
let span = info_span!(target: "icx_proxy", "icx-proxy", version = crate_version!()).entered();
142+
let span = info_span!(target: "icx_proxy", "icx-proxy").entered();
143143
info!(target: "icx_proxy", "Log Level: {filter}");
144144
span
145145
}

rs/boundary_node/icx_proxy/src/main.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use std::{net::SocketAddr, path::PathBuf};
66

77
use anyhow::Context;
8-
use clap::{builder::ValueParser, crate_authors, crate_version, Parser};
8+
use clap::{builder::ValueParser, Parser};
99
use futures::try_join;
1010
use tracing::{error, Instrument};
1111

@@ -46,11 +46,6 @@ impl<R, E> InspectErr for Result<R, E> {
4646
}
4747

4848
#[derive(Parser)]
49-
#[clap(
50-
version = crate_version!(),
51-
author = crate_authors!(),
52-
propagate_version = true,
53-
)]
5449
struct Opts {
5550
/// The address to bind to.
5651
#[clap(long, default_value = "127.0.0.1:3000")]

rs/boundary_node/icx_proxy/src/proxy/agent.rs

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
22
borrow::Borrow,
3+
error::Error,
34
net::{IpAddr, SocketAddr},
45
sync::{
56
atomic::{AtomicUsize, Ordering},
@@ -20,14 +21,15 @@ use ic_utils::{
2021
call::{AsyncCall, SyncCall},
2122
interfaces::http_request::HttpRequestCanister,
2223
};
23-
use tracing::{enabled, info, instrument, trace, Level};
24+
use tracing::{enabled, error, info, instrument, trace, Level};
2425

2526
use crate::error::ErrorFactory;
27+
use crate::http;
2628
use crate::http::request::HttpRequest;
2729
use crate::http::response::{AgentResponseAny, HttpResponse};
2830
use crate::{
2931
canister_id,
30-
proxy::{AppState, HandleError, HyperService},
32+
proxy::{AppState, HandleError, HyperService, REQUEST_BODY_SIZE_LIMIT},
3133
validate::Validate,
3234
};
3335

@@ -125,9 +127,24 @@ fn remove_hop_headers(headers: &HeaderMap<HeaderValue>) -> HeaderMap<HeaderValue
125127
.collect()
126128
}
127129

130+
// Dive into the error chain and figure out if the underlying error was caused by an HTTP2 GOAWAY frame
131+
fn is_h2_goaway(e: &anyhow::Error) -> bool {
132+
if let Some(AgentError::TransportError(e)) = e.downcast_ref::<AgentError>() {
133+
if let Some(e) = e.downcast_ref::<hyper::Error>() {
134+
let def_err = h2::Error::from(h2::Reason::INTERNAL_ERROR);
135+
136+
if let Some(e) = e.source().unwrap_or(&def_err).downcast_ref::<h2::Error>() {
137+
return e.is_go_away();
138+
}
139+
}
140+
}
141+
142+
false
143+
}
144+
128145
#[instrument(level = "info", skip_all, fields(addr = display(addr), replica = display(&*args.replica_uri)))]
129146
pub async fn handler<V: Validate, C: HyperService<Body>>(
130-
State(args): State<Args<V, C>>,
147+
State(mut args): State<Args<V, C>>,
131148
ConnectInfo(addr): ConnectInfo<SocketAddr>,
132149
uri_canister_id: Option<canister_id::UriHost>,
133150
host_canister_id: Option<canister_id::HostHeader>,
@@ -137,28 +154,65 @@ pub async fn handler<V: Validate, C: HyperService<Body>>(
137154
let uri_canister_id = uri_canister_id.map(|v| v.0);
138155
let host_canister_id = host_canister_id.map(|v| v.0);
139156
let query_param_canister_id = query_param_canister_id.map(|v| v.0);
140-
process_request_inner(
141-
request,
142-
addr,
143-
args.agent,
144-
&args.replica_uri,
145-
args.validator,
146-
args.client,
147-
uri_canister_id
148-
.or(host_canister_id)
149-
.or(query_param_canister_id),
150-
)
151-
.await
152-
.handle_error(args.debug)
157+
158+
// Read the request body into a Vec
159+
let (parts, body) = request.into_parts();
160+
let body = match http::body::read_streaming_body(body, REQUEST_BODY_SIZE_LIMIT).await {
161+
Err(e) => {
162+
error!("Unable to read body: {}", e);
163+
return Response::builder()
164+
.status(500)
165+
.body("Error reading body".into())
166+
.unwrap();
167+
}
168+
Ok(b) => b,
169+
};
170+
171+
let mut retries = 3;
172+
loop {
173+
// Create a new request based on the incoming one
174+
let mut request_new = Request::new(Body::from(body.clone()));
175+
*request_new.headers_mut() = parts.headers.clone();
176+
*request_new.method_mut() = parts.method.clone();
177+
*request_new.uri_mut() = parts.uri.clone();
178+
179+
let res = process_request_inner(
180+
request_new,
181+
addr,
182+
&args.agent,
183+
&args.replica_uri,
184+
&args.validator,
185+
&mut args.client,
186+
uri_canister_id
187+
.or(host_canister_id)
188+
.or(query_param_canister_id),
189+
)
190+
.await;
191+
192+
// If we have retries left - check if the underlying reason is a GOAWAY and retry if that's the case.
193+
// GOAWAY is issued when the server is gracefully shutting down and it will not execute the request.
194+
// So we can safely retry the request even if it's not idempotent since it was never worked on in case of GOAWAY.
195+
if retries > 0 {
196+
if let Err(e) = &res {
197+
if is_h2_goaway(e) {
198+
retries -= 1;
199+
info!("HTTP GOAWAY received, retrying request");
200+
continue;
201+
}
202+
}
203+
}
204+
205+
return res.handle_error(args.debug);
206+
}
153207
}
154208

155209
async fn process_request_inner(
156210
request: Request<Body>,
157211
addr: SocketAddr,
158-
agent: Agent,
212+
agent: &Agent,
159213
replica_uri: &Uri,
160-
validator: impl Validate,
161-
mut client: impl HyperService<Body>,
214+
validator: &impl Validate,
215+
client: &mut impl HyperService<Body>,
162216
canister_id: Option<Principal>,
163217
) -> Result<Response<Body>, anyhow::Error> {
164218
let canister_id = match canister_id {
@@ -217,11 +271,12 @@ async fn process_request_inner(
217271
);
218272
}
219273

220-
let canister = HttpRequestCanister::create(&agent, canister_id);
274+
let canister = HttpRequestCanister::create(agent, canister_id);
221275
let header_fields = http_request
222276
.headers
223277
.iter()
224278
.map(|(name, value)| HeaderField(name.into(), value.into()));
279+
225280
let query_result = canister
226281
.http_request_custom(
227282
&http_request.method,
@@ -258,7 +313,7 @@ async fn process_request_inner(
258313
agent_response
259314
};
260315

261-
let http_response = HttpResponse::from((&agent, agent_response));
316+
let http_response = HttpResponse::from((agent, agent_response));
262317
let mut response_builder =
263318
Response::builder().status(StatusCode::from_u16(http_response.status_code)?);
264319
for (name, value) in &http_response.headers {
@@ -270,7 +325,7 @@ async fn process_request_inner(
270325
// and this could cause memory issues and possibly create DOS attack vectors.
271326
let should_validate = !http_response.has_streaming_body && !is_update_call;
272327
if should_validate {
273-
let validation = validator.validate(&agent, &canister_id, &http_request, &http_response);
328+
let validation = validator.validate(agent, &canister_id, &http_request, &http_response);
274329

275330
if validation.is_err() {
276331
return Ok(Response::builder()

0 commit comments

Comments
 (0)