Skip to content

feat: support read timeout for inbound HTTP requests #337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ deno_webidl = { version = "0.136.0" }
deno_web = { version = "0.167.0" }
deno_websocket = { version = "0.141.0" }
deno_webstorage = { version = "0.131.0" }
deno_lockfile = { version = "0.18.0" }
enum-as-inner = "0.6.0"
serde = { version = "1.0.149", features = ["derive"] }
hyper = "0.14.26"
tokio = { version = "1.35", features = ["full"] }
bytes = { version = "1.4.0" }
once_cell = "1.17.1"
thiserror = "1.0.40"
deno_lockfile = "0.18.0"
async-trait = "0.1.73"
indexmap = { version = "2.0.0", features = ["serde"] }
flate2 = "=1.0.26"
Expand All @@ -78,6 +78,7 @@ glob = "0.3.1"
httparse = "1.8"
http = "0.2"
faster-hex = "0.9.0"

# DEBUG
#[patch.crates-io]
#deno_core = { path = "/your/path/to/deno_core/core" }
Expand Down
4 changes: 2 additions & 2 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ tokio-rustls = { version = "0.25.0" }
rustls-pemfile = { version = "2.1.0" }
futures-util = { workspace = true }
url.workspace = true
event_worker ={ version = "0.1.0", path = "../event_worker" }
event_worker = { version = "0.1.0", path = "../event_worker" }
sb_workers = { version = "0.1.0", path = "../sb_workers" }
sb_env = { version = "0.1.0", path = "../sb_env" }
sb_core = { version = "0.1.0", path = "../sb_core" }
Expand Down Expand Up @@ -109,7 +109,7 @@ reqwest.workspace = true
serde = { workspace = true, features = ["derive"] }
tokio.workspace = true
url.workspace = true
event_worker ={ version = "0.1.0", path = "../event_worker" }
event_worker = { version = "0.1.0", path = "../event_worker" }
deno_broadcast_channel.workspace = true
deno_core.workspace = true
deno_canvas.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod snapshot;
pub mod utils;

mod inspector_server;
mod timeout;

pub use inspector_server::InspectorOption;
pub use sb_graph::DecoratorType;
82 changes: 60 additions & 22 deletions crates/base/src/macros/test_macros.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,36 @@
#[macro_export]
macro_rules! integration_test {
($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => {
macro_rules! integration_test_listen_fut {
($port:expr, $tls:expr, $main_file:expr, $policy:expr, $import_map:expr, $flag:expr, $tx:expr, $token:expr) => {{
use futures_util::FutureExt;

let tls: Option<base::server::Tls> = $tls.clone();

base::commands::start_server(
"0.0.0.0",
$port,
tls,
String::from($main_file),
None,
None,
$policy,
$import_map,
$flag,
Some($tx.clone()),
$crate::server::WorkerEntrypoints {
main: None,
events: None,
},
$token.clone(),
vec![],
None,
)
.boxed()
}};
}

#[macro_export]
macro_rules! integration_test_with_server_flag {
($flag:expr, $main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => {
use futures_util::FutureExt;
use $crate::macros::test_macros::__private;

Expand All @@ -11,39 +41,29 @@ macro_rules! integration_test {
let schema = if tls.is_some() { "https" } else { "http" };
let signal = tokio::spawn(async move {
while let Some(base::server::ServerHealth::Listening(event_rx, metric_src)) = rx.recv().await {
integration_test!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+));
$crate::integration_test_with_server_flag!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+));
}
None
});

let token = integration_test!(@term $(, $($token)+)?);
let mut listen_fut = base::commands::start_server(
"0.0.0.0",
let token = $crate::integration_test_with_server_flag!(@term $(, $($token)+)?);
let mut listen_fut = $crate::integration_test_listen_fut!(
$port,
tls,
String::from($main_file),
None,
None,
$main_file,
$policy,
$import_map,
$crate::server::ServerFlags::default(),
Some(tx.clone()),
$crate::server::WorkerEntrypoints {
main: None,
events: None,
},
token.clone(),
vec![],
None
)
.boxed();
$flag,
tx,
token
);

tokio::select! {
resp = signal => {
if let Ok(maybe_response_from_server) = resp {
// then, after checking the response... (2)
let resp = maybe_response_from_server.unwrap();
integration_test!(@resp resp, ($($function)+)).await;
$crate::integration_test_with_server_flag!(@resp resp, ($($function)+)).await;
} else {
panic!("Request thread had a heart attack");
}
Expand All @@ -62,7 +82,7 @@ macro_rules! integration_test {
let _ = listen_fut.await;
});

integration_test!(@term_cleanup $($($token)+)?, token, join_fut);
$crate::integration_test_with_server_flag!(@term_cleanup $($($token)+)?, token, join_fut);
}
};

Expand Down Expand Up @@ -136,6 +156,24 @@ macro_rules! integration_test {
};
}

#[macro_export]
macro_rules! integration_test {
($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => {
$crate::integration_test_with_server_flag!(
$crate::server::ServerFlags::default(),
$main_file,
$port,
$url,
$policy,
$import_map,
$req_builder,
$tls,
($($function)+)
$(,$($token)+)?
)
};
}

#[doc(hidden)]
pub mod __private {
use std::future::Future;
Expand Down
10 changes: 4 additions & 6 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::deno_runtime::DenoRuntime;
use crate::inspector_server::Inspector;
use crate::timeout;
use crate::utils::send_event_if_event_worker_available;
use crate::utils::units::bytes_to_display;

Expand Down Expand Up @@ -184,7 +185,7 @@ async fn relay_upgraded_request_and_response(
match copy_bidirectional(&mut upstream, &mut downstream).await {
Ok(_) => {}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
let Ok(_) = downstream.downcast::<TlsStream<TcpStream>>() else {
let Ok(_) = downstream.downcast::<timeout::Stream<TlsStream<TcpStream>>>() else {
// TODO(Nyannyacha): It would be better if we send
// `close_notify` before shutdown an upstream if downstream is a
// TLS stream.
Expand Down Expand Up @@ -253,7 +254,7 @@ pub fn create_supervisor(
debug!("Hard memory limit triggered");

if memory_limit_tx.send(()).is_err() {
error!("failed to send memory limit reached notification - isolate may already be terminating");
error!("failed to send memory limit reached notification - isolate may already be terminating");
}

true
Expand All @@ -263,10 +264,7 @@ pub fn create_supervisor(
worker_runtime.js_runtime.add_near_heap_limit_callback({
let memory_limit_tx = memory_limit_tx.clone();
move |cur, _| {
debug!(
"Low memory alert triggered: {}",
bytes_to_display(cur as u64),
);
debug!("Low memory alert triggered: {}", bytes_to_display(cur as u64),);

if memory_limit_tx.send(()).is_err() {
error!("failed to send memory limit reached notification - isolate may already be terminating");
Expand Down
31 changes: 28 additions & 3 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ pub struct ServerFlags {
pub tcp_nodelay: bool,
pub graceful_exit_deadline_sec: u64,
pub graceful_exit_keepalive_deadline_ms: Option<u64>,
pub request_read_timeout_ms: Option<u64>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -467,11 +468,13 @@ impl Server {

let ServerFlags {
tcp_nodelay,
request_read_timeout_ms,
mut graceful_exit_deadline_sec,
mut graceful_exit_keepalive_deadline_ms,
..
} = flags;

let request_read_timeout_dur = request_read_timeout_ms.map(Duration::from_millis);
let mut terminate_signal_fut = get_termination_signal();

loop {
Expand All @@ -487,7 +490,14 @@ impl Server {
let _ = stream.set_nodelay(true);
}

accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone())
accept_stream(
stream,
main_worker_req_tx,
event_tx,
metric_src,
graceful_exit_token.clone(),
request_read_timeout_dur
)
}
Err(e) => error!("socket error: {}", e)
}
Expand All @@ -507,7 +517,14 @@ impl Server {
let _ = stream.get_ref().0.set_nodelay(true);
}

accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone());
accept_stream(
stream,
main_worker_req_tx,
event_tx,
metric_src,
graceful_exit_token.clone(),
request_read_timeout_dur
)
}
Err(e) => error!("socket error: {}", e)
}
Expand Down Expand Up @@ -675,21 +692,29 @@ fn accept_stream<I>(
event_tx: Option<UnboundedSender<ServerEvent>>,
metric_src: SharedMetricSource,
graceful_exit_token: CancellationToken,
maybe_req_read_timeout_dur: Option<Duration>,
) where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
metric_src.incl_active_io();
tokio::task::spawn({
async move {
let (service, cancel) = WorkerService::new(metric_src.clone(), req_tx);
let (io, maybe_timeout_tx) = if let Some(timeout_dur) = maybe_req_read_timeout_dur {
crate::timeout::Stream::with_timeout(io, timeout_dur)
} else {
crate::timeout::Stream::with_bypass(io)
};

let _guard = cancel.drop_guard();
let _active_io_count_guard = scopeguard::guard(metric_src, |it| {
it.decl_active_io();
});

let conn_fut = Http::new().serve_connection(io, service).with_upgrades();
let mut shutting_down = false;
let conn_fut = Http::new()
.serve_connection(io, crate::timeout::Service::new(service, maybe_timeout_tx))
.with_upgrades();

pin!(conn_fut);

Expand Down
Loading
Loading