From b3b7ed0ba2edd87f55727bec4e727965aec09c31 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 7 May 2024 04:29:07 +0000 Subject: [PATCH 01/16] feat: initial implementation for request read timeout --- crates/base/src/lib.rs | 1 + crates/base/src/server.rs | 14 ++- crates/base/src/timeout.rs | 237 +++++++++++++++++++++++++++++++++++++ 3 files changed, 250 insertions(+), 2 deletions(-) create mode 100644 crates/base/src/timeout.rs diff --git a/crates/base/src/lib.rs b/crates/base/src/lib.rs index 205761517..02158ed8c 100644 --- a/crates/base/src/lib.rs +++ b/crates/base/src/lib.rs @@ -9,6 +9,7 @@ pub mod snapshot; pub mod utils; mod inspector_server; +mod timeout; pub use inspector_server::InspectorOption; pub use sb_graph::DecoratorType; diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 107b0697a..4a148ae2a 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -487,7 +487,7 @@ impl Server { let _ = stream.set_nodelay(true); } - accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone()) + accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone(), None) } Err(e) => error!("socket error: {}", e) } @@ -507,7 +507,7 @@ impl Server { let _ = stream.get_ref().0.set_nodelay(true); } - accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone()); + accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone(), None); } Err(e) => error!("socket error: {}", e) } @@ -675,6 +675,7 @@ fn accept_stream( event_tx: Option>, metric_src: SharedMetricSource, graceful_exit_token: CancellationToken, + maybe_req_idle_timeout_dur: Option, ) where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -682,6 +683,15 @@ fn accept_stream( tokio::task::spawn({ async move { let (service, cancel) = WorkerService::new(metric_src.clone(), req_tx); + let (timeout_tx, timeout_rx) = mpsc::unbounded_channel(); + + let io = crate::timeout::Stream::new( + io, + maybe_req_idle_timeout_dur.unwrap_or(Duration::MAX), + timeout_rx, + ); + + let service = crate::timeout::Service::new(service, timeout_tx); let _guard = cancel.drop_guard(); let _active_io_count_guard = scopeguard::guard(metric_src, |it| { diff --git a/crates/base/src/timeout.rs b/crates/base/src/timeout.rs new file mode 100644 index 000000000..4a8fee24a --- /dev/null +++ b/crates/base/src/timeout.rs @@ -0,0 +1,237 @@ +// This implementation originated from the link below: +// https://gist.github.com/programatik29/36d371c657392fd7f322e7342957b6d1 + +use std::{ + pin::Pin, + task::{ready, Poll}, + time::Duration, +}; + +use futures_util::Future; +use pin_project::pin_project; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, + time::{sleep, Instant, Sleep}, +}; + +pub(super) enum State { + Wait, + Reset, +} + +pub struct Stream { + inner: S, + sleep: Pin>, + duration: Duration, + waiting: bool, + finished: bool, + state: UnboundedReceiver, +} + +impl Stream { + pub(super) fn new(inner: S, duration: Duration, rx: UnboundedReceiver) -> Self { + Self { + inner, + sleep: Box::pin(sleep(duration)), + duration, + waiting: false, + finished: false, + state: rx, + } + } +} + +impl AsyncRead for Stream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if !self.finished { + match Pin::new(&mut self.state).poll_recv(cx) { + Poll::Ready(Some(State::Reset)) => { + self.waiting = false; + + let deadline = Instant::now() + self.duration; + + self.sleep.as_mut().reset(deadline); + } + + // enter waiting mode (for response body last chunk) + Poll::Ready(Some(State::Wait)) => self.waiting = true, + Poll::Ready(None) => self.finished = true, + Poll::Pending => (), + } + } + + if !self.waiting { + // return error if timer is elapsed + if let Poll::Ready(()) = self.sleep.as_mut().poll(cx) { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "request header read timed out", + ))); + } + } + + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +pub struct Service { + inner: S, + tx: UnboundedSender, +} + +impl Service { + pub(super) fn new(inner: S, tx: UnboundedSender) -> Self { + Self { inner, tx } + } +} + +impl hyper::service::Service for Service +where + S: hyper::service::Service>, +{ + type Response = hyper::Response>; + type Error = S::Error; + type Future = ServiceFuture; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + // send timer wait signal + let _ = self.tx.send(State::Wait); + + ServiceFuture::new(self.inner.call(req), self.tx.clone()) + } +} + +#[pin_project] +pub struct ServiceFuture { + #[pin] + inner: F, + tx: Option>, +} + +impl ServiceFuture { + fn new(inner: F, tx: UnboundedSender) -> Self { + Self { + inner, + tx: Some(tx), + } + } +} + +impl Future for ServiceFuture +where + F: Future, Error>>, +{ + type Output = Result>, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + + this.inner.poll(cx).map(|result| { + result.map(|response| { + response + .map(|body| Body::new(body, this.tx.take().expect("future polled after ready"))) + }) + }) + } +} + +#[pin_project] +pub struct Body { + #[pin] + inner: B, + tx: UnboundedSender, +} + +impl Body { + fn new(inner: B, tx: UnboundedSender) -> Self { + Self { inner, tx } + } +} + +impl hyper::body::HttpBody for Body +where + B: hyper::body::HttpBody, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll>> { + let this = self.project(); + let option = ready!(this.inner.poll_data(cx)); + + if option.is_none() { + let _ = this.tx.send(State::Reset); + } + + Poll::Ready(option) + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll, Self::Error>> { + self.project().inner.poll_trailers(cx) + } + + fn is_end_stream(&self) -> bool { + let is_end_stream = self.inner.is_end_stream(); + + if is_end_stream { + let _ = self.tx.send(State::Reset); + } + + is_end_stream + } + + fn size_hint(&self) -> hyper::body::SizeHint { + self.inner.size_hint() + } +} From 625dca14d81df25f4bdf50d54bf1db7f7334c7f8 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 7 May 2024 22:08:32 +0000 Subject: [PATCH 02/16] style: do format rust files correctly --- crates/base/src/rt_worker/worker_ctx.rs | 7 +-- crates/base/tests/integration_tests.rs | 11 +++-- crates/cpu_timer/src/lib.rs | 9 +--- crates/node/errors.rs | 20 +++------ crates/npm/managed/mod.rs | 5 ++- crates/npm/managed/tarball.rs | 59 ++++++++++++------------- crates/sb_core/cache/fetch_cacher.rs | 19 ++++---- crates/sb_fs/virtual_fs.rs | 26 ++++++----- crates/sb_graph/graph_resolver.rs | 11 +++-- 9 files changed, 78 insertions(+), 89 deletions(-) diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index b9e1b0be7..34cd3c137 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -253,7 +253,7 @@ pub fn create_supervisor( debug!("Hard memory limit triggered"); if memory_limit_tx.send(()).is_err() { - error!("failed to send memory limit reached notification - isolate may already be terminating"); + error!("failed to send memory limit reached notification - isolate may already be terminating"); } true @@ -263,10 +263,7 @@ pub fn create_supervisor( worker_runtime.js_runtime.add_near_heap_limit_callback({ let memory_limit_tx = memory_limit_tx.clone(); move |cur, _| { - debug!( - "Low memory alert triggered: {}", - bytes_to_display(cur as u64), - ); + debug!("Low memory alert triggered: {}", bytes_to_display(cur as u64),); if memory_limit_tx.send(()).is_err() { error!("failed to send memory limit reached notification - isolate may already be terminating"); diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index e64fa4117..91c65621b 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -1310,12 +1310,11 @@ async fn test_decorators(ty: Option) { if is_disabled { assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); - assert!( - resp.text().await.unwrap().starts_with( - "{\"msg\":\"InvalidWorkerCreation: worker boot error Uncaught SyntaxError: Invalid or unexpected token" - ), - - ); + assert!(resp + .text() + .await + .unwrap() + .starts_with("{\"msg\":\"InvalidWorkerCreation: worker boot error Uncaught SyntaxError: Invalid or unexpected token"),); } else { assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.text().await.unwrap().as_str(), "meow?"); diff --git a/crates/cpu_timer/src/lib.rs b/crates/cpu_timer/src/lib.rs index f68c08bf4..9d3e40d5a 100644 --- a/crates/cpu_timer/src/lib.rs +++ b/crates/cpu_timer/src/lib.rs @@ -212,14 +212,9 @@ fn register_sigalrm() { std::thread::Builder::new() .name("sb-cpu-timer".into()) .spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - + let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); let sig_receiver_handle = rt.spawn(async move { - let mut signals = - SignalsInfo::with_exfiltrator([signal::SIGALRM], raw::WithRawSiginfo).unwrap(); + let mut signals = SignalsInfo::with_exfiltrator([signal::SIGALRM], raw::WithRawSiginfo).unwrap(); while let Some(siginfo) = signals.next().await { let _ = sig_timer_id_tx.send(unsafe { siginfo.si_value().sival_ptr as usize }); diff --git a/crates/node/errors.rs b/crates/node/errors.rs index 16b6b59bb..352b5e3c8 100644 --- a/crates/node/errors.rs +++ b/crates/node/errors.rs @@ -147,13 +147,13 @@ pub fn err_package_import_not_defined( } pub fn err_unsupported_dir_import(path: &str, base: &str) -> AnyError { - generic_error(format!("[ERR_UNSUPPORTED_DIR_IMPORT] Directory import '{path}' is not supported resolving ES modules imported from {base}")) + generic_error(format!( + "[ERR_UNSUPPORTED_DIR_IMPORT] Directory import '{path}' is not supported resolving ES modules imported from {base}" + )) } pub fn err_unsupported_esm_url_scheme(url: &Url) -> AnyError { - let mut msg = - "[ERR_UNSUPPORTED_ESM_URL_SCHEME] Only file and data URLS are supported by the default ESM loader" - .to_string(); + let mut msg = "[ERR_UNSUPPORTED_ESM_URL_SCHEME] Only file and data URLS are supported by the default ESM loader".to_string(); if cfg!(window) && url.scheme().len() == 2 { msg = format!("{msg}. On Windows, absolute path must be valid file:// URLs"); @@ -181,14 +181,8 @@ mod test { format!("[ERR_PACKAGE_PATH_NOT_EXPORTED] Package subpath './jsx-runtime' is not defined for types by \"exports\" in 'test_path{separator_char}package.json'") ); assert_eq!( - err_package_path_not_exported( - "test_path".to_string(), - ".", - None, - NodeResolutionMode::Types, - ) - .to_string(), - format!("[ERR_PACKAGE_PATH_NOT_EXPORTED] No \"exports\" main defined for types in 'test_path{separator_char}package.json'") - ); + err_package_path_not_exported("test_path".to_string(), ".", None, NodeResolutionMode::Types,).to_string(), + format!("[ERR_PACKAGE_PATH_NOT_EXPORTED] No \"exports\" main defined for types in 'test_path{separator_char}package.json'") + ); } } diff --git a/crates/npm/managed/mod.rs b/crates/npm/managed/mod.rs index b6bb9699b..540d36454 100644 --- a/crates/npm/managed/mod.rs +++ b/crates/npm/managed/mod.rs @@ -378,7 +378,10 @@ impl ManagedCliNpmResolver { Ok(nv) => NpmPackageReqResolution::Ok(nv), Err(err) => { if self.api.mark_force_reload() { - log::debug!("Restarting npm specifier resolution to check for new registry information. Error: {:#}", err); + log::debug!( + "Restarting npm specifier resolution to check for new registry information. Error: {:#}", + err + ); NpmPackageReqResolution::ReloadRegistryInfo(err.into()) } else { NpmPackageReqResolution::Err(err.into()) diff --git a/crates/npm/managed/tarball.rs b/crates/npm/managed/tarball.rs index 585d34c59..bfc946848 100644 --- a/crates/npm/managed/tarball.rs +++ b/crates/npm/managed/tarball.rs @@ -75,11 +75,11 @@ fn verify_tarball_integrity( if tarball_checksum != *expected_checksum { bail!( - "Tarball checksum did not match what was provided by npm registry for {}.\n\nExpected: {}\nActual: {}", - package, - expected_checksum, - tarball_checksum, - ) + "Tarball checksum did not match what was provided by npm registry for {}.\n\nExpected: {}\nActual: {}", + package, + expected_checksum, + tarball_checksum, + ) } Ok(()) } @@ -159,8 +159,7 @@ mod test { name: "package".to_string(), version: Version::parse_from_npm("1.0.0").unwrap(), }; - let actual_checksum = - "z4PhNX7vuL3xVChQ1m2AB9Yg5AULVxXcg/SpIdNs6c5H0NE8XYXysP+DGNKHfuwvY7kxvUdBeoGlODJ6+SfaPg=="; + let actual_checksum = "z4PhNX7vuL3xVChQ1m2AB9Yg5AULVxXcg/SpIdNs6c5H0NE8XYXysP+DGNKHfuwvY7kxvUdBeoGlODJ6+SfaPg=="; assert_eq!( verify_tarball_integrity( &package, @@ -196,23 +195,25 @@ mod test { .unwrap_err() .to_string(), concat!( - "Tarball checksum did not match what was provided by npm ", - "registry for package@1.0.0.\n\nExpected: test\nActual: 2jmj7l5rSw0yVb/vlWAYkK/YBwk=", - ), + "Tarball checksum did not match what was provided by npm ", + "registry for package@1.0.0.\n\nExpected: test\nActual: 2jmj7l5rSw0yVb/vlWAYkK/YBwk=", + ), ); assert_eq!( - verify_tarball_integrity( - &package, - &Vec::new(), - &NpmPackageVersionDistInfoIntegrity::Integrity { - algorithm: "sha512", - base64_hash: "test" - } - ) - .unwrap_err() - .to_string(), - format!("Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_checksum}"), - ); + verify_tarball_integrity( + &package, + &Vec::new(), + &NpmPackageVersionDistInfoIntegrity::Integrity { + algorithm: "sha512", + base64_hash: "test" + } + ) + .unwrap_err() + .to_string(), + format!( + "Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_checksum}" + ), + ); assert!(verify_tarball_integrity( &package, &Vec::new(), @@ -224,15 +225,11 @@ mod test { .is_ok()); let actual_hex = "da39a3ee5e6b4b0d3255bfef95601890afd80709"; assert_eq!( - verify_tarball_integrity( - &package, - &Vec::new(), - &NpmPackageVersionDistInfoIntegrity::LegacySha1Hex("test"), - ) - .unwrap_err() - .to_string(), - format!("Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_hex}"), - ); + verify_tarball_integrity(&package, &Vec::new(), &NpmPackageVersionDistInfoIntegrity::LegacySha1Hex("test"),) + .unwrap_err() + .to_string(), + format!("Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_hex}"), + ); assert!(verify_tarball_integrity( &package, &Vec::new(), diff --git a/crates/sb_core/cache/fetch_cacher.rs b/crates/sb_core/cache/fetch_cacher.rs index e660691ba..9a21768b1 100644 --- a/crates/sb_core/cache/fetch_cacher.rs +++ b/crates/sb_core/cache/fetch_cacher.rs @@ -178,8 +178,8 @@ impl Loader for FetchCacher { LoaderCacheSetting::Reload => { if matches!(file_fetcher.cache_setting(), CacheSetting::Only) { return Err(deno_core::anyhow::anyhow!( - "Failed to resolve version constraint. Try running again without --cached-only" - )); + "Failed to resolve version constraint. Try running again without --cached-only" + )); } Some(CacheSetting::ReloadAll) } @@ -194,15 +194,12 @@ impl Loader for FetchCacher { }) .await .map(|file| { - let maybe_headers = - match (file.maybe_headers, file_header_overrides.get(&specifier)) { - (Some(headers), Some(overrides)) => { - Some(headers.into_iter().chain(overrides.clone()).collect()) - } - (Some(headers), None) => Some(headers), - (None, Some(overrides)) => Some(overrides.clone()), - (None, None) => None, - }; + let maybe_headers = match (file.maybe_headers, file_header_overrides.get(&specifier)) { + (Some(headers), Some(overrides)) => Some(headers.into_iter().chain(overrides.clone()).collect()), + (Some(headers), None) => Some(headers), + (None, Some(overrides)) => Some(overrides.clone()), + (None, None) => None, + }; Ok(Some(LoadResponse::Module { specifier: file.specifier, maybe_headers, diff --git a/crates/sb_fs/virtual_fs.rs b/crates/sb_fs/virtual_fs.rs index de14b469d..a533b3087 100644 --- a/crates/sb_fs/virtual_fs.rs +++ b/crates/sb_fs/virtual_fs.rs @@ -96,22 +96,22 @@ impl VfsBuilder { if target.is_file() { // this may change behavior, so warn the user about it log::warn!( - "Symlink target is outside '{}'. Inlining symlink at '{}' to '{}' as file.", - self.root_path.display(), - path.display(), - target.display(), - ); + "Symlink target is outside '{}'. Inlining symlink at '{}' to '{}' as file.", + self.root_path.display(), + path.display(), + target.display(), + ); // inline the symlink and make the target file let file_bytes = std::fs::read(&target) .with_context(|| format!("Reading {}", path.display()))?; self.add_file(&path, file_bytes)?; } else { log::warn!( - "Symlink target is outside '{}'. Excluding symlink at '{}' with target '{}'.", - self.root_path.display(), - path.display(), - target.display(), - ); + "Symlink target is outside '{}'. Excluding symlink at '{}' with target '{}'.", + self.root_path.display(), + path.display(), + target.display(), + ); } } } @@ -527,7 +527,11 @@ impl FileBackedVfsFile { if offset >= 0 { *current_pos += offset as u64; } else if -offset as u64 > *current_pos { - return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "An attempt was made to move the file pointer before the beginning of the file.").into()); + return Err(std::io::Error::new( + std::io::ErrorKind::PermissionDenied, + "An attempt was made to move the file pointer before the beginning of the file.", + ) + .into()); } else { *current_pos -= -offset as u64; } diff --git a/crates/sb_graph/graph_resolver.rs b/crates/sb_graph/graph_resolver.rs index 330ebe21b..43f47e62d 100644 --- a/crates/sb_graph/graph_resolver.rs +++ b/crates/sb_graph/graph_resolver.rs @@ -49,9 +49,10 @@ fn resolve_package_json_dep( if specifier.starts_with(bare_specifier) { let path = &specifier[bare_specifier.len()..]; if path.is_empty() || path.starts_with('/') { - let req = req_result.as_ref().map_err(|_err| { - anyhow!("Parsing version constraints in the application-level package.json is more strict at the moment.") - })?; + let req = req_result + .as_ref() + .map_err(|_err| anyhow!("Parsing version constraints in the application-level package.json is more strict at the moment."))?; + return Ok(Some(ModuleSpecifier::parse(&format!("npm:{req}{path}"))?)); } } @@ -234,7 +235,9 @@ impl Resolver for CliGraphResolver { if let Some(vendor_specifier) = &self.maybe_vendor_specifier { if let Ok(specifier) = &result { if specifier.as_str().starts_with(vendor_specifier.as_str()) { - return Err(ResolveError::Other(anyhow!("Importing from the vendor directory is not permitted. Use a remote specifier instead or disable vendoring."))); + return Err(ResolveError::Other(anyhow!( + "Importing from the vendor directory is not permitted. Use a remote specifier instead or disable vendoring." + ))); } } } From ba6aefe1ab3a3a4ce79b70388d1e52b45001c96a Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 7 May 2024 22:08:59 +0000 Subject: [PATCH 03/16] chore(cli): fill `description` field --- crates/cli/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index fffc676c8..67d6cf6dc 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -2,6 +2,7 @@ name = "cli" version = "0.1.0" edition = "2021" +description = "A server based on Deno runtime, capable of running JavaScript, TypeScript, and WASM services" [[bin]] name = "edge-runtime" From beb8684680e25896b2b1d3d3189f5de544ba734a Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Tue, 7 May 2024 22:20:36 +0000 Subject: [PATCH 04/16] stamp: move cli flags into a separated module --- crates/cli/src/flags.rs | 222 ++++++++++++++++++++++++++++++++++++++++ crates/cli/src/main.rs | 186 +-------------------------------- 2 files changed, 226 insertions(+), 182 deletions(-) create mode 100644 crates/cli/src/flags.rs diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs new file mode 100644 index 000000000..83f308cd5 --- /dev/null +++ b/crates/cli/src/flags.rs @@ -0,0 +1,222 @@ +use std::{net::SocketAddr, path::PathBuf}; + +use clap::{ + arg, + builder::{BoolishValueParser, FalseyValueParser, TypedValueParser}, + crate_version, value_parser, ArgAction, ArgGroup, Command, +}; + +pub(super) fn get_cli() -> Command { + Command::new(env!("CARGO_BIN_NAME")) + .about(env!("CARGO_PKG_DESCRIPTION")) + .version(format!( + "{}\ndeno {} ({}, {})", + crate_version!(), + env!("DENO_VERSION"), + env!("PROFILE"), + env!("TARGET") + )) + .arg_required_else_help(true) + .arg( + arg!(-v - -verbose) + .help("Use verbose output") + .conflicts_with("quiet") + .global(true) + .action(ArgAction::SetTrue), + ) + .arg( + arg!(-q - -quiet) + .help("Do not print any log messages") + .conflicts_with("verbose") + .global(true) + .action(ArgAction::SetTrue), + ) + .arg( + arg!(--"log-source") + .help("Include source file and line in log messages") + .global(true) + .action(ArgAction::SetTrue), + ) + .subcommand(get_start_command()) + .subcommand(get_bundle_command()) + .subcommand(get_unbundle_command()) +} + +fn get_start_command() -> Command { + Command::new("start") + .about("Start the server") + .arg(arg!(-i --ip ).help("Host IP address to listen on").default_value("0.0.0.0")) + .arg( + arg!(-p --port ) + .help("Port to listen on") + .env("EDGE_RUNTIME_PORT") + .default_value("9000") + .value_parser(value_parser!(u16)), + ) + .arg( + arg!(--tls[PORT]) + .env("EDGE_RUNTIME_TLS") + .num_args(0..=1) + .default_missing_value("443") + .value_parser(value_parser!(u16)) + .requires("key") + .requires("cert"), + ) + .arg( + arg!(--key ) + .help("Path to PEM-encoded key to be used to TLS") + .env("EDGE_RUNTIME_TLS_KEY_PATH") + .value_parser(value_parser!(PathBuf)), + ) + .arg( + arg!(--cert ) + .help("Path to PEM-encoded X.509 certificate to be used to TLS") + .env("EDGE_RUNTIME_TLS_CERT_PATH") + .value_parser(value_parser!(PathBuf)), + ) + .arg( + arg!(--"main-service" ) + .help("Path to main service directory or eszip") + .default_value("examples/main"), + ) + .arg( + arg!(--"disable-module-cache") + .help("Disable using module cache") + .default_value("false") + .value_parser(FalseyValueParser::new()), + ) + .arg(arg!(--"import-map" ).help("Path to import map file")) + .arg(arg!(--"event-worker" ).help("Path to event worker directory")) + .arg(arg!(--"main-entrypoint" ).help("Path to entrypoint in main service (only for eszips)")) + .arg(arg!(--"events-entrypoint" ).help("Path to entrypoint in events worker (only for eszips)")) + .arg( + arg!(--"policy" ) + .help("Policy to enforce in the worker pool") + .default_value("per_worker") + .value_parser(["per_worker", "per_request", "oneshot"]), + ) + .arg( + arg!(--"decorator" ) + .help(concat!( + "Type of decorator to use on the main worker and event worker. ", + "If not specified, the decorator feature is disabled." + )) + .value_parser(["tc39", "typescript", "typescript_with_metadata"]), + ) + .arg( + arg!(--"graceful-exit-timeout"[SECONDS]) + .help(concat!( + "Maximum time in seconds that can wait for workers before terminating forcibly. ", + "If providing zero value, the runtime will not try a graceful exit." + )) + // NOTE(Nyannyacha): Default timeout value follows the + // value[1] defined in moby. + // + // [1]: https://github.com/moby/moby/blob/master/daemon/config/config.go#L45-L47 + .default_value("15") + .value_parser(value_parser!(u64).range(..u64::MAX)), + ) + .arg( + arg!( + --"experimental-graceful-exit-keepalive-deadline-ratio" + + ) + .help(concat!( + "(Experimental) Maximum period of time that incoming requests can be processed over a", + " pre-established keep-alive HTTP connection. ", + "This is specified as a percentage of the `--graceful-exit-timeout` value. ", + "The percentage cannot be greater than 95." + )) + .value_parser(value_parser!(u64).range(..=95)), + ) + .arg( + arg!(--"max-parallelism" ) + .help("Maximum count of workers that can exist in the worker pool simultaneously") + .value_parser( + // NOTE: Acceptable bounds were chosen arbitrarily. + value_parser!(u32).range(1..9999).map(|it| -> usize { it as usize }), + ), + ) + .arg( + arg!(--"request-wait-timeout" ) + .help("Maximum time in milliseconds that can wait to establish a connection with a worker") + .value_parser(value_parser!(u64)), + ) + .arg( + arg!(--"inspect"[HOST_AND_PORT]) + .help("Activate inspector on host:port") + .num_args(0..=1) + .value_parser(value_parser!(SocketAddr)) + .require_equals(true) + .default_missing_value("127.0.0.1:9229"), + ) + .arg( + arg!(--"inspect-brk"[HOST_AND_PORT]) + .help("Activate inspector on host:port, wait for debugger to connect and break at the start of user script") + .num_args(0..=1) + .value_parser(value_parser!(SocketAddr)) + .require_equals(true) + .default_missing_value("127.0.0.1:9229"), + ) + .arg( + arg!(--"inspect-wait"[HOST_AND_PORT]) + .help("Activate inspector on host:port and wait for debugger to connect before running user code") + .num_args(0..=1) + .value_parser(value_parser!(SocketAddr)) + .require_equals(true) + .default_missing_value("127.0.0.1:9229"), + ) + .group(ArgGroup::new("inspector").args(["inspect", "inspect-brk", "inspect-wait"])) + .arg( + arg!(--"inspect-main") + .help("Allow creating inspector for main worker") + .requires("inspector") + .action(ArgAction::SetTrue), + ) + .arg(arg!(--"static" ).help("Glob pattern for static files to be included")) + .arg( + arg!(--"tcp-nodelay"[BOOL]) + .help("Disables Nagle's algorithm") + .num_args(0..=1) + .value_parser(BoolishValueParser::new()) + .require_equals(true) + .default_value("true") + .default_missing_value("true"), + ) +} + +fn get_bundle_command() -> Command { + Command::new("bundle") + .about(concat!( + "Creates an 'eszip' file that can be executed by the EdgeRuntime. ", + "Such file contains all the modules in contained in a single binary." + )) + .arg(arg!(--"output" ).help("Path to output eszip file").default_value("bin.eszip")) + .arg( + arg!(--"entrypoint" ) + .help("Path to entrypoint to bundle as an eszip") + .required(true), + ) + .arg(arg!(--"static" ).help("Glob pattern for static files to be included")) + .arg(arg!(--"import-map" ).help("Path to import map file")) + .arg( + arg!(--"decorator" ) + .help("Type of decorator to use when bundling. If not specified, the decorator feature is disabled.") + .value_parser(["tc39", "typescript", "typescript_with_metadata"]), + ) +} + +fn get_unbundle_command() -> Command { + Command::new("unbundle") + .about("Unbundles an .eszip file into the specified directory") + .arg( + arg!(--"output" ) + .help("Path to extract the ESZIP content") + .default_value("./"), + ) + .arg( + arg!(--"eszip" ) + .help("Path of eszip to extract") + .required(true), + ) +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 26779826e..4f962ae48 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -1,3 +1,4 @@ +mod flags; mod logger; use anyhow::{anyhow, bail, Error}; @@ -6,9 +7,9 @@ use base::deno_runtime::MAYBE_DENO_VERSION; use base::rt_worker::worker_pool::{SupervisorPolicy, WorkerPoolPolicy}; use base::server::{ServerFlags, Tls, WorkerEntrypoints}; use base::{DecoratorType, InspectorOption}; -use clap::builder::{BoolishValueParser, FalseyValueParser, TypedValueParser}; -use clap::{arg, crate_version, value_parser, ArgAction, ArgGroup, ArgMatches, Command}; +use clap::ArgMatches; use deno_core::url::Url; +use flags::get_cli; use log::warn; use sb_graph::emitter::EmitterFactory; use sb_graph::import_map::load_import_map; @@ -21,185 +22,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -fn cli() -> Command { - Command::new("edge-runtime") - .about("A server based on Deno runtime, capable of running JavaScript, TypeScript, and WASM services") - .version(format!( - "{}\ndeno {} ({}, {})", - crate_version!(), - env!("DENO_VERSION"), - env!("PROFILE"), - env!("TARGET") - )) - .arg_required_else_help(true) - .arg( - arg!(-v --verbose "Use verbose output") - .conflicts_with("quiet") - .global(true) - .action(ArgAction::SetTrue), - ) - .arg( - arg!(-q --quiet "Do not print any log messages") - .conflicts_with("verbose") - .global(true) - .action(ArgAction::SetTrue), - ) - .arg( - arg!(--"log-source" "Include source file and line in log messages") - .global(true) - .action(ArgAction::SetTrue), - ) - .subcommand( - Command::new("start") - .about("Start the server") - .arg(arg!(-i --ip "Host IP address to listen on").default_value("0.0.0.0")) - .arg( - arg!(-p --port "Port to listen on") - .env("EDGE_RUNTIME_PORT") - .default_value("9000") - .value_parser(value_parser!(u16)) - ) - .arg( - arg!(--tls [PORT]) - .env("EDGE_RUNTIME_TLS") - .num_args(0..=1) - .default_missing_value("443") - .value_parser(value_parser!(u16)) - .requires("key") - .requires("cert") - ) - .arg( - arg!(--key "Path to PEM-encoded key to be used to TLS") - .env("EDGE_RUNTIME_TLS_KEY_PATH") - .value_parser(value_parser!(PathBuf)) - ) - .arg( - arg!(--cert "Path to PEM-encoded X.509 certificate to be used to TLS") - .env("EDGE_RUNTIME_TLS_CERT_PATH") - .value_parser(value_parser!(PathBuf)) - ) - .arg(arg!(--"main-service" "Path to main service directory or eszip").default_value("examples/main")) - .arg(arg!(--"disable-module-cache" "Disable using module cache").default_value("false").value_parser(FalseyValueParser::new())) - .arg(arg!(--"import-map" "Path to import map file")) - .arg(arg!(--"event-worker" "Path to event worker directory")) - .arg(arg!(--"main-entrypoint" "Path to entrypoint in main service (only for eszips)")) - .arg(arg!(--"events-entrypoint" "Path to entrypoint in events worker (only for eszips)")) - .arg( - arg!(--"policy" "Policy to enforce in the worker pool") - .default_value("per_worker") - .value_parser(["per_worker", "per_request", "oneshot"]) - ) - .arg( - arg!(--"decorator" "Type of decorator to use on the main worker and event worker. If not specified, the decorator feature is disabled.") - .value_parser(["tc39", "typescript", "typescript_with_metadata"]) - ) - .arg( - arg!(--"graceful-exit-timeout" [SECONDS]) - .help( - concat!( - "Maximum time in seconds that can wait for workers before terminating forcibly. ", - "If providing zero value, the runtime will not try a graceful exit." - ) - ) - // NOTE(Nyannyacha): Default timeout value follows the - // value[1] defined in moby. - // - // [1]: https://github.com/moby/moby/blob/master/daemon/config/config.go#L45-L47 - .default_value("15") - .value_parser( - value_parser!(u64) - .range(..u64::MAX) - ) - ) - .arg( - arg!( - --"experimental-graceful-exit-keepalive-deadline-ratio" - - ) - .help( - concat!( - "(Experimental) Maximum period of time that incoming requests can be processed over a pre-established ", - "keep-alive HTTP connection. ", - "This is specified as a percentage of the `--graceful-exit-timeout` value. ", - "The percentage cannot be greater than 95." - ) - ) - .value_parser( - value_parser!(u64) - .range(..=95) - ) - ) - .arg( - arg!(--"max-parallelism" "Maximum count of workers that can exist in the worker pool simultaneously") - .value_parser( - // NOTE: Acceptable bounds were chosen arbitrarily. - value_parser!(u32) - .range(1..9999) - .map(|it| -> usize { it as usize }) - ) - ) - .arg( - arg!(--"request-wait-timeout" "Maximum time in milliseconds that can wait to establish a connection with a worker") - .value_parser(value_parser!(u64)) - ) - .arg( - arg!(--"inspect" [HOST_AND_PORT] "Activate inspector on host:port (default: 127.0.0.1:9229)") - .num_args(0..=1) - .value_parser(value_parser!(SocketAddr)) - .require_equals(true) - .default_missing_value("127.0.0.1:9229") - ) - .arg( - arg!(--"inspect-brk" [HOST_AND_PORT] "Activate inspector on host:port, wait for debugger to connect and break at the start of user script") - .num_args(0..=1) - .value_parser(value_parser!(SocketAddr)) - .require_equals(true) - .default_missing_value("127.0.0.1:9229") - ) - .arg( - arg!(--"inspect-wait" [HOST_AND_PORT] "Activate inspector on host:port and wait for debugger to connect before running user code") - .num_args(0..=1) - .value_parser(value_parser!(SocketAddr)) - .require_equals(true) - .default_missing_value("127.0.0.1:9229") - ) - .group( - ArgGroup::new("inspector") - .args(["inspect", "inspect-brk", "inspect-wait"]) - ) - .arg( - arg!(--"inspect-main" "Allow creating inspector for main worker") - .requires("inspector") - .action(ArgAction::SetTrue) - ) - .arg(arg!(--"static" "Glob pattern for static files to be included")) - .arg(arg!(--"tcp-nodelay" [BOOL] "Disables Nagle's algorithm") - .num_args(0..=1) - .value_parser(BoolishValueParser::new()) - .require_equals(true) - .default_value("true") - .default_missing_value("true") - ) - ) - .subcommand( - Command::new("bundle") - .about("Creates an 'eszip' file that can be executed by the EdgeRuntime. Such file contains all the modules in contained in a single binary.") - .arg(arg!(--"output" "Path to output eszip file").default_value("bin.eszip")) - .arg(arg!(--"entrypoint" "Path to entrypoint to bundle as an eszip").required(true)) - .arg(arg!(--"static" "Glob pattern for static files to be included")) - .arg(arg!(--"import-map" "Path to import map file")) - .arg( - arg!(--"decorator" "Type of decorator to use when bundling. If not specified, the decorator feature is disabled.") - .value_parser(["tc39", "typescript", "typescript_with_metadata"]) - ) - ).subcommand( - Command::new("unbundle") - .about("Unbundles an .eszip file into the specified directory") - .arg(arg!(--"output" "Path to extract the ESZIP content").default_value("./")) - .arg(arg!(--"eszip" "Path of eszip to extract").required(true)) - ) -} - fn main() -> Result<(), anyhow::Error> { MAYBE_DENO_VERSION.get_or_init(|| env!("DENO_VERSION").to_string()); @@ -212,7 +34,7 @@ fn main() -> Result<(), anyhow::Error> { // TODO: Tokio runtime shouldn't be needed here (Address later) let local = tokio::task::LocalSet::new(); let res: Result<(), Error> = local.block_on(&runtime, async { - let matches = cli().get_matches(); + let matches = get_cli().get_matches(); if !matches.get_flag("quiet") { let verbose = matches.get_flag("verbose"); From 43e0011631e254dcc04ee6b08670ba89ac7a1d28 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 02:48:28 +0000 Subject: [PATCH 05/16] stamp: update `Cargo.lock` --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index f969be055..516933a48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6224,7 +6224,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "rand", "static_assertions", ] From 057ece3b9ed0c0016484137aca6b865b6a69ca3b Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 02:48:46 +0000 Subject: [PATCH 06/16] stamp: polishing --- Cargo.toml | 3 ++- crates/base/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bb5f671f7..5b992dadb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ deno_webidl = { version = "0.136.0" } deno_web = { version = "0.167.0" } deno_websocket = { version = "0.141.0" } deno_webstorage = { version = "0.131.0" } +deno_lockfile = { version = "0.18.0" } enum-as-inner = "0.6.0" serde = { version = "1.0.149", features = ["derive"] } hyper = "0.14.26" @@ -52,7 +53,6 @@ tokio = { version = "1.35", features = ["full"] } bytes = { version = "1.4.0" } once_cell = "1.17.1" thiserror = "1.0.40" -deno_lockfile = "0.18.0" async-trait = "0.1.73" indexmap = { version = "2.0.0", features = ["serde"] } flate2 = "=1.0.26" @@ -78,6 +78,7 @@ glob = "0.3.1" httparse = "1.8" http = "0.2" faster-hex = "0.9.0" + # DEBUG #[patch.crates-io] #deno_core = { path = "/your/path/to/deno_core/core" } diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index b092725b6..98319d5a7 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -44,7 +44,7 @@ tokio-rustls = { version = "0.25.0" } rustls-pemfile = { version = "2.1.0" } futures-util = { workspace = true } url.workspace = true -event_worker ={ version = "0.1.0", path = "../event_worker" } +event_worker = { version = "0.1.0", path = "../event_worker" } sb_workers = { version = "0.1.0", path = "../sb_workers" } sb_env = { version = "0.1.0", path = "../sb_env" } sb_core = { version = "0.1.0", path = "../sb_core" } @@ -109,7 +109,7 @@ reqwest.workspace = true serde = { workspace = true, features = ["derive"] } tokio.workspace = true url.workspace = true -event_worker ={ version = "0.1.0", path = "../event_worker" } +event_worker = { version = "0.1.0", path = "../event_worker" } deno_broadcast_channel.workspace = true deno_core.workspace = true deno_canvas.workspace = true From 1b29d8351842ed57b83be2a18d2d6cd72e53be32 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 03:16:45 +0000 Subject: [PATCH 07/16] stamp: add a flag for read timeout for request --- crates/base/src/server.rs | 3 +++ crates/cli/src/flags.rs | 5 +++++ crates/cli/src/main.rs | 3 +++ 3 files changed, 11 insertions(+) diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 4a148ae2a..bf6d1e88b 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -244,6 +244,7 @@ pub struct ServerFlags { pub tcp_nodelay: bool, pub graceful_exit_deadline_sec: u64, pub graceful_exit_keepalive_deadline_ms: Option, + pub request_idle_timeout_ms: Option, } #[derive(Debug)] @@ -467,11 +468,13 @@ impl Server { let ServerFlags { tcp_nodelay, + request_idle_timeout_ms, mut graceful_exit_deadline_sec, mut graceful_exit_keepalive_deadline_ms, .. } = flags; + let request_idle_timeout_dur = request_idle_timeout_ms.map(Duration::from_millis); let mut terminate_signal_fut = get_termination_signal(); loop { diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs index 83f308cd5..47c865309 100644 --- a/crates/cli/src/flags.rs +++ b/crates/cli/src/flags.rs @@ -142,6 +142,11 @@ fn get_start_command() -> Command { .help("Maximum time in milliseconds that can wait to establish a connection with a worker") .value_parser(value_parser!(u64)), ) + .arg( + arg!(--"request-idle-timeout" ) + .help("Maximum time that can be waited from when the connection is accepted until the request body is fully read") + .value_parser(value_parser!(u64)), + ) .arg( arg!(--"inspect"[HOST_AND_PORT]) .help("Activate inspector on host:port") diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 4f962ae48..e31ed8493 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -122,6 +122,8 @@ fn main() -> Result<(), anyhow::Error> { sub_matches.get_one::("max-parallelism").cloned(); let maybe_request_wait_timeout = sub_matches.get_one::("request-wait-timeout").cloned(); + let maybe_request_idle_timeout = + sub_matches.get_one::("request-idle-timeout").cloned(); let static_patterns = if let Some(val_ref) = sub_matches.get_many::("static") { val_ref.map(|s| s.as_str()).collect::>() @@ -194,6 +196,7 @@ fn main() -> Result<(), anyhow::Error> { tcp_nodelay, graceful_exit_deadline_sec, graceful_exit_keepalive_deadline_ms, + request_idle_timeout_ms: maybe_request_idle_timeout, }, None, WorkerEntrypoints { From 6c4710760764c2e0427a3c0c153bb09a9fb44287 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 03:18:00 +0000 Subject: [PATCH 08/16] stamp: niche optimization --- crates/base/src/server.rs | 21 ++--- crates/base/src/timeout.rs | 153 +++++++++++++++++++++++-------------- 2 files changed, 106 insertions(+), 68 deletions(-) diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index bf6d1e88b..f6be7bbf5 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -686,23 +686,26 @@ fn accept_stream( tokio::task::spawn({ async move { let (service, cancel) = WorkerService::new(metric_src.clone(), req_tx); - let (timeout_tx, timeout_rx) = mpsc::unbounded_channel(); + let (io, maybe_timeout_tx) = if let Some(timeout_dur) = maybe_req_idle_timeout_dur { + let (timeout_tx, timeout_rx) = mpsc::unbounded_channel(); - let io = crate::timeout::Stream::new( - io, - maybe_req_idle_timeout_dur.unwrap_or(Duration::MAX), - timeout_rx, - ); - - let service = crate::timeout::Service::new(service, timeout_tx); + ( + crate::timeout::Stream::with_timeout(io, timeout_dur, timeout_rx), + Some(timeout_tx), + ) + } else { + (crate::timeout::Stream::with_bypass(io), None) + }; let _guard = cancel.drop_guard(); let _active_io_count_guard = scopeguard::guard(metric_src, |it| { it.decl_active_io(); }); - let conn_fut = Http::new().serve_connection(io, service).with_upgrades(); let mut shutting_down = false; + let conn_fut = Http::new() + .serve_connection(io, crate::timeout::Service::new(service, maybe_timeout_tx)) + .with_upgrades(); pin!(conn_fut); diff --git a/crates/base/src/timeout.rs b/crates/base/src/timeout.rs index 4a8fee24a..571348190 100644 --- a/crates/base/src/timeout.rs +++ b/crates/base/src/timeout.rs @@ -20,25 +20,43 @@ pub(super) enum State { Reset, } +enum StreamKind { + UseTimeout { + sleep: Pin>, + duration: Duration, + waiting: bool, + finished: bool, + state: UnboundedReceiver, + }, + + Bypass, +} + pub struct Stream { inner: S, - sleep: Pin>, - duration: Duration, - waiting: bool, - finished: bool, - state: UnboundedReceiver, + kind: StreamKind, } impl Stream { - pub(super) fn new(inner: S, duration: Duration, rx: UnboundedReceiver) -> Self { - Self { + fn new(inner: S, kind: StreamKind) -> Self { + Self { inner, kind } + } + + pub(super) fn with_timeout(inner: S, duration: Duration, rx: UnboundedReceiver) -> Self { + Self::new( inner, - sleep: Box::pin(sleep(duration)), - duration, - waiting: false, - finished: false, - state: rx, - } + StreamKind::UseTimeout { + sleep: Box::pin(sleep(duration)), + duration, + waiting: false, + finished: false, + state: rx, + }, + ) + } + + pub(super) fn with_bypass(inner: S) -> Self { + Self::new(inner, StreamKind::Bypass) } } @@ -48,31 +66,43 @@ impl AsyncRead for Stream { cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> Poll> { - if !self.finished { - match Pin::new(&mut self.state).poll_recv(cx) { - Poll::Ready(Some(State::Reset)) => { - self.waiting = false; - - let deadline = Instant::now() + self.duration; - - self.sleep.as_mut().reset(deadline); + match &mut self.kind { + StreamKind::UseTimeout { + sleep, + duration, + waiting, + finished, + state, + } => { + if !*finished { + match Pin::new(state).poll_recv(cx) { + Poll::Ready(Some(State::Reset)) => { + *waiting = false; + + let deadline = Instant::now() + *duration; + + sleep.as_mut().reset(deadline); + } + + // enter waiting mode (for response body last chunk) + Poll::Ready(Some(State::Wait)) => *waiting = true, + Poll::Ready(None) => *finished = true, + Poll::Pending => (), + } } - // enter waiting mode (for response body last chunk) - Poll::Ready(Some(State::Wait)) => self.waiting = true, - Poll::Ready(None) => self.finished = true, - Poll::Pending => (), + if !*waiting { + // return error if timer is elapsed + if let Poll::Ready(()) = sleep.as_mut().poll(cx) { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "request header read timed out", + ))); + } + } } - } - if !self.waiting { - // return error if timer is elapsed - if let Poll::Ready(()) = self.sleep.as_mut().poll(cx) { - return Poll::Ready(Err(std::io::Error::new( - std::io::ErrorKind::TimedOut, - "request header read timed out", - ))); - } + StreamKind::Bypass => {} } Pin::new(&mut self.inner).poll_read(cx, buf) @@ -117,11 +147,11 @@ impl AsyncWrite for Stream { pub struct Service { inner: S, - tx: UnboundedSender, + tx: Option>, } impl Service { - pub(super) fn new(inner: S, tx: UnboundedSender) -> Self { + pub(super) fn new(inner: S, tx: Option>) -> Self { Self { inner, tx } } } @@ -139,8 +169,10 @@ where } fn call(&mut self, req: Request) -> Self::Future { - // send timer wait signal - let _ = self.tx.send(State::Wait); + if let Some(tx) = self.tx.as_ref() { + // send timer wait signal + let _ = tx.send(State::Wait); + } ServiceFuture::new(self.inner.call(req), self.tx.clone()) } @@ -154,11 +186,8 @@ pub struct ServiceFuture { } impl ServiceFuture { - fn new(inner: F, tx: UnboundedSender) -> Self { - Self { - inner, - tx: Some(tx), - } + fn new(inner: F, tx: Option>) -> Self { + Self { inner, tx } } } @@ -172,10 +201,7 @@ where let this = self.project(); this.inner.poll(cx).map(|result| { - result.map(|response| { - response - .map(|body| Body::new(body, this.tx.take().expect("future polled after ready"))) - }) + result.map(|response| response.map(|body| Body::new(body, this.tx.take()))) }) } } @@ -184,11 +210,11 @@ where pub struct Body { #[pin] inner: B, - tx: UnboundedSender, + tx: Option>, } impl Body { - fn new(inner: B, tx: UnboundedSender) -> Self { + fn new(inner: B, tx: Option>) -> Self { Self { inner, tx } } } @@ -205,13 +231,18 @@ where cx: &mut std::task::Context<'_>, ) -> Poll>> { let this = self.project(); - let option = ready!(this.inner.poll_data(cx)); - if option.is_none() { - let _ = this.tx.send(State::Reset); - } + if let Some(tx) = this.tx.as_ref() { + let option = ready!(this.inner.poll_data(cx)); - Poll::Ready(option) + if option.is_none() { + let _ = tx.send(State::Reset); + } + + Poll::Ready(option) + } else { + this.inner.poll_data(cx) + } } fn poll_trailers( @@ -222,13 +253,17 @@ where } fn is_end_stream(&self) -> bool { - let is_end_stream = self.inner.is_end_stream(); + if let Some(tx) = self.tx.as_ref() { + let is_end_stream = self.inner.is_end_stream(); - if is_end_stream { - let _ = self.tx.send(State::Reset); - } + if is_end_stream { + let _ = tx.send(State::Reset); + } - is_end_stream + is_end_stream + } else { + self.inner.is_end_stream() + } } fn size_hint(&self) -> hyper::body::SizeHint { From 181f461667b90b7b1d7c9dc84a4f2e1fe6031bef Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 03:21:47 +0000 Subject: [PATCH 09/16] stamp: apply timeout passed from cli --- crates/base/src/server.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index f6be7bbf5..615d6468b 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -490,7 +490,14 @@ impl Server { let _ = stream.set_nodelay(true); } - accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone(), None) + accept_stream( + stream, + main_worker_req_tx, + event_tx, + metric_src, + graceful_exit_token.clone(), + request_idle_timeout_dur + ) } Err(e) => error!("socket error: {}", e) } @@ -510,7 +517,14 @@ impl Server { let _ = stream.get_ref().0.set_nodelay(true); } - accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone(), None); + accept_stream( + stream, + main_worker_req_tx, + event_tx, + metric_src, + graceful_exit_token.clone(), + request_idle_timeout_dur + ) } Err(e) => error!("socket error: {}", e) } From ac3ee13cdc27e0abb50ed30aad4811a77f35c0a0 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 04:03:50 +0000 Subject: [PATCH 10/16] stamp: update integration test macro --- crates/base/src/macros/test_macros.rs | 32 +++++++++++++++++++++------ 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/crates/base/src/macros/test_macros.rs b/crates/base/src/macros/test_macros.rs index 13ca42c6b..300cf388c 100644 --- a/crates/base/src/macros/test_macros.rs +++ b/crates/base/src/macros/test_macros.rs @@ -1,6 +1,6 @@ #[macro_export] -macro_rules! integration_test { - ($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { +macro_rules! integration_test_with_server_flag { + ($flag:expr, $main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { use futures_util::FutureExt; use $crate::macros::test_macros::__private; @@ -11,12 +11,12 @@ macro_rules! integration_test { let schema = if tls.is_some() { "https" } else { "http" }; let signal = tokio::spawn(async move { while let Some(base::server::ServerHealth::Listening(event_rx, metric_src)) = rx.recv().await { - integration_test!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+)); + $crate::integration_test_with_server_flag!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+)); } None }); - let token = integration_test!(@term $(, $($token)+)?); + let token = $crate::integration_test_with_server_flag!(@term $(, $($token)+)?); let mut listen_fut = base::commands::start_server( "0.0.0.0", $port, @@ -26,7 +26,7 @@ macro_rules! integration_test { None, $policy, $import_map, - $crate::server::ServerFlags::default(), + $flag, Some(tx.clone()), $crate::server::WorkerEntrypoints { main: None, @@ -43,7 +43,7 @@ macro_rules! integration_test { if let Ok(maybe_response_from_server) = resp { // then, after checking the response... (2) let resp = maybe_response_from_server.unwrap(); - integration_test!(@resp resp, ($($function)+)).await; + $crate::integration_test_with_server_flag!(@resp resp, ($($function)+)).await; } else { panic!("Request thread had a heart attack"); } @@ -62,7 +62,7 @@ macro_rules! integration_test { let _ = listen_fut.await; }); - integration_test!(@term_cleanup $($($token)+)?, token, join_fut); + $crate::integration_test_with_server_flag!(@term_cleanup $($($token)+)?, token, join_fut); } }; @@ -136,6 +136,24 @@ macro_rules! integration_test { }; } +#[macro_export] +macro_rules! integration_test { + ($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { + $crate::integration_test_with_server_flag!( + $crate::server::ServerFlags::default(), + $main_file, + $port, + $url, + $policy, + $import_map, + $req_builder, + $tls, + ($($function)+) + $(,$($token)+)? + ) + }; +} + #[doc(hidden)] pub mod __private { use std::future::Future; From cd9b42d6a6380b4495f5e55e1a2383662a6747fb Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 04:51:58 +0000 Subject: [PATCH 11/16] stamp: rustfmt --- crates/cli/src/flags.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs index 47c865309..81581e55d 100644 --- a/crates/cli/src/flags.rs +++ b/crates/cli/src/flags.rs @@ -18,15 +18,13 @@ pub(super) fn get_cli() -> Command { )) .arg_required_else_help(true) .arg( - arg!(-v - -verbose) - .help("Use verbose output") + arg!(-v --verbose "Use verbose output") .conflicts_with("quiet") .global(true) .action(ArgAction::SetTrue), ) .arg( - arg!(-q - -quiet) - .help("Do not print any log messages") + arg!(-q --quiet "Do not print any log messages") .conflicts_with("verbose") .global(true) .action(ArgAction::SetTrue), @@ -54,7 +52,7 @@ fn get_start_command() -> Command { .value_parser(value_parser!(u16)), ) .arg( - arg!(--tls[PORT]) + arg!(--tls [PORT]) .env("EDGE_RUNTIME_TLS") .num_args(0..=1) .default_missing_value("443") @@ -104,7 +102,7 @@ fn get_start_command() -> Command { .value_parser(["tc39", "typescript", "typescript_with_metadata"]), ) .arg( - arg!(--"graceful-exit-timeout"[SECONDS]) + arg!(--"graceful-exit-timeout" [SECONDS]) .help(concat!( "Maximum time in seconds that can wait for workers before terminating forcibly. ", "If providing zero value, the runtime will not try a graceful exit." @@ -148,7 +146,7 @@ fn get_start_command() -> Command { .value_parser(value_parser!(u64)), ) .arg( - arg!(--"inspect"[HOST_AND_PORT]) + arg!(--"inspect" [HOST_AND_PORT]) .help("Activate inspector on host:port") .num_args(0..=1) .value_parser(value_parser!(SocketAddr)) @@ -156,7 +154,7 @@ fn get_start_command() -> Command { .default_missing_value("127.0.0.1:9229"), ) .arg( - arg!(--"inspect-brk"[HOST_AND_PORT]) + arg!(--"inspect-brk" [HOST_AND_PORT]) .help("Activate inspector on host:port, wait for debugger to connect and break at the start of user script") .num_args(0..=1) .value_parser(value_parser!(SocketAddr)) @@ -164,7 +162,7 @@ fn get_start_command() -> Command { .default_missing_value("127.0.0.1:9229"), ) .arg( - arg!(--"inspect-wait"[HOST_AND_PORT]) + arg!(--"inspect-wait" [HOST_AND_PORT]) .help("Activate inspector on host:port and wait for debugger to connect before running user code") .num_args(0..=1) .value_parser(value_parser!(SocketAddr)) @@ -180,7 +178,7 @@ fn get_start_command() -> Command { ) .arg(arg!(--"static" ).help("Glob pattern for static files to be included")) .arg( - arg!(--"tcp-nodelay"[BOOL]) + arg!(--"tcp-nodelay" [BOOL]) .help("Disables Nagle's algorithm") .num_args(0..=1) .value_parser(BoolishValueParser::new()) From ce45522ec46df8acacaf67a645d85c9fb8cc2b1b Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Wed, 8 May 2024 21:22:37 +0000 Subject: [PATCH 12/16] stamp: update integration test macro (2) --- crates/base/src/macros/test_macros.rs | 50 +++++++++++++++++++-------- 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/crates/base/src/macros/test_macros.rs b/crates/base/src/macros/test_macros.rs index 300cf388c..d74c544aa 100644 --- a/crates/base/src/macros/test_macros.rs +++ b/crates/base/src/macros/test_macros.rs @@ -1,3 +1,33 @@ +#[macro_export] +macro_rules! integration_test_listen_fut { + ($port:expr, $tls:expr, $main_file:expr, $policy:expr, $import_map:expr, $flag:expr, $tx:expr, $token:expr) => {{ + use futures_util::FutureExt; + + let tls: Option = $tls.clone(); + + base::commands::start_server( + "0.0.0.0", + $port, + tls, + String::from($main_file), + None, + None, + $policy, + $import_map, + $flag, + Some($tx.clone()), + $crate::server::WorkerEntrypoints { + main: None, + events: None, + }, + $token.clone(), + vec![], + None, + ) + .boxed() + }}; +} + #[macro_export] macro_rules! integration_test_with_server_flag { ($flag:expr, $main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { @@ -17,26 +47,16 @@ macro_rules! integration_test_with_server_flag { }); let token = $crate::integration_test_with_server_flag!(@term $(, $($token)+)?); - let mut listen_fut = base::commands::start_server( - "0.0.0.0", + let mut listen_fut = $crate::integration_test_listen_fut!( $port, tls, - String::from($main_file), - None, - None, + $main_file, $policy, $import_map, $flag, - Some(tx.clone()), - $crate::server::WorkerEntrypoints { - main: None, - events: None, - }, - token.clone(), - vec![], - None - ) - .boxed(); + tx, + token + ); tokio::select! { resp = signal => { From 0f1fcc225f6af864a2211cb423bd3c5e7d93aff0 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 9 May 2024 02:21:24 +0000 Subject: [PATCH 13/16] stamp: downcast the downstream correctly --- crates/base/src/rt_worker/worker_ctx.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index 34cd3c137..bc58fa2fb 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -1,5 +1,6 @@ use crate::deno_runtime::DenoRuntime; use crate::inspector_server::Inspector; +use crate::timeout; use crate::utils::send_event_if_event_worker_available; use crate::utils::units::bytes_to_display; @@ -184,7 +185,7 @@ async fn relay_upgraded_request_and_response( match copy_bidirectional(&mut upstream, &mut downstream).await { Ok(_) => {} Err(err) if err.kind() == ErrorKind::UnexpectedEof => { - let Ok(_) = downstream.downcast::>() else { + let Ok(_) = downstream.downcast::>>() else { // TODO(Nyannyacha): It would be better if we send // `close_notify` before shutdown an upstream if downstream is a // TLS stream. From c7e7c35697ead4ed8ce504b4f237f7b67d50887c Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 9 May 2024 02:22:57 +0000 Subject: [PATCH 14/16] stamp: add tests for slowloris attack --- crates/base/tests/integration_tests.rs | 288 ++++++++++++++++++++++++- 1 file changed, 284 insertions(+), 4 deletions(-) diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 91c65621b..5acdb319b 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -1,18 +1,26 @@ #[path = "../src/utils/integration_test_helper.rs"] mod integration_test_helper; -use std::{borrow::Cow, collections::HashMap, path::Path, time::Duration}; +use std::{ + borrow::Cow, + collections::HashMap, + io::{self, Cursor}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::Path, + sync::Arc, + time::Duration, +}; use anyhow::Context; use async_tungstenite::WebSocketStream; use base::{ - integration_test, + integration_test, integration_test_listen_fut, rt_worker::worker_ctx::{create_user_worker_pool, create_worker, TerminationToken}, - server::{ServerEvent, Tls}, + server::{ServerEvent, ServerFlags, ServerHealth, Tls}, DecoratorType, }; use deno_core::serde_json; -use futures_util::{Future, SinkExt, StreamExt}; +use futures_util::{future::BoxFuture, Future, FutureExt, SinkExt, StreamExt}; use http::{Method, Request, StatusCode}; use http_utils::utils::get_upgrade_type; use hyper::{body::to_bytes, Body}; @@ -27,10 +35,16 @@ use sb_workers::context::{MainWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRu use serde::Deserialize; use serial_test::serial; use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, join, + net::TcpStream, sync::{mpsc, oneshot}, time::{sleep, timeout}, }; +use tokio_rustls::{ + rustls::{pki_types::ServerName, ClientConfig, RootCertStore}, + TlsConnector, +}; use tokio_util::compat::TokioAsyncReadCompatExt; use tungstenite::Message; use urlencoding::encode; @@ -1414,10 +1428,228 @@ async fn oak_with_jsr_specifier() { ); } +async fn test_slowloris(request_idle_timeout_ms: u64, maybe_tls: Option, test_fn: F) +where + F: (FnOnce(Box) -> R) + Send + 'static, + R: Future + Send, +{ + let token = TerminationToken::new(); + + let (health_tx, mut health_rx) = mpsc::channel(1); + let (tx, rx) = oneshot::channel(); + + let mut listen_fut = integration_test_listen_fut!( + NON_SECURE_PORT, + maybe_tls, + "./test_cases/main", + None, + None, + ServerFlags { + request_idle_timeout_ms: Some(request_idle_timeout_ms), + ..Default::default() + }, + health_tx, + Some(token.clone()) + ); + + let req_fut = { + let token = token.clone(); + async move { + assert_eq!(test_fn(maybe_tls.stream().await).await, true); + + if timeout(Duration::from_secs(10), token.cancel_and_wait()) + .await + .is_err() + { + panic!("failed to terminate server within 10 seconds"); + } + + tx.send(()).unwrap(); + } + }; + + let join_fut = tokio::spawn(async move { + while let Some(ServerHealth::Listening(..)) = health_rx.recv().await { + break; + } + + req_fut.await; + }); + + tokio::select! { + _ = join_fut => {} + _ = &mut listen_fut => {} + }; + + if timeout(Duration::from_secs(10), rx).await.is_err() { + panic!("failed to check within 10 seconds"); + } +} + +async fn test_slowloris_no_prompt_timeout(maybe_tls: Option, invert: bool) { + test_slowloris( + if invert { u64::MAX } else { 5000 }, + maybe_tls, + move |mut io| async move { + static HEADER: &[u8] = b"GET /oak-with-jsr HTTP/1.1\r\nHost: localhost\r\n\r\n"; + + let check_io_kind_fn = move |err: std::io::Error| { + if invert { + return true; + } + + match err.kind() { + io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof => { + return true; + } + + _ => return false, + } + }; + + // > 5000ms + sleep(Duration::from_secs(10)).await; + + if let Err(err) = io.write_all(HEADER).await { + return check_io_kind_fn(err); + } + + if let Err(err) = io.flush().await { + return check_io_kind_fn(err); + } + + let mut buf = vec![0; 1_048_576]; + + match io.read(&mut buf).await { + Ok(nread) => { + if invert { + nread > 0 + } else { + nread == 0 + } + } + + Err(err) => check_io_kind_fn(err), + } + }, + ) + .await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_no_prompt_timeout_non_secure() { + test_slowloris_no_prompt_timeout(new_localhost_tls(false), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow"] +async fn test_slowloris_no_prompt_timeout_non_secure_inverted() { + test_slowloris_no_prompt_timeout(new_localhost_tls(false), true).await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_no_prompt_timeout_secure() { + test_slowloris_no_prompt_timeout(new_localhost_tls(true), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow"] +async fn test_slowloris_no_prompt_timeout_secure_inverted() { + test_slowloris_no_prompt_timeout(new_localhost_tls(true), true).await; +} + +async fn test_slowloris_slow_header_timedout(maybe_tls: Option, invert: bool) { + test_slowloris( + if invert { u64::MAX } else { 5000 }, + maybe_tls, + move |mut io| async move { + static HEADER: &[u8] = b"GET /oak-with-jsr HTTP/1.1\r\nHost: localhost\r\n\r\n"; + + let check_io_kind_fn = move |err: std::io::Error| { + if invert { + return true; + } + + match err.kind() { + io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof => { + return true; + } + + _ => return false, + } + }; + + // takes 1000ms per each character (ie. > 5000ms) + for &b in HEADER { + if let Err(err) = io.write(&[b]).await { + return check_io_kind_fn(err); + } + + if let Err(err) = io.flush().await { + return check_io_kind_fn(err); + } + + sleep(Duration::from_secs(1)).await; + } + + let mut buf = vec![0; 1_048_576]; + + match io.read(&mut buf).await { + Ok(nread) => { + if invert { + nread > 0 + } else { + nread == 0 + } + } + + Err(err) => check_io_kind_fn(err), + } + }, + ) + .await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_slow_header_timedout_non_secure() { + test_slowloris_slow_header_timedout(new_localhost_tls(false), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow 2x"] +async fn test_slowloris_slow_header_timedout_non_secure_inverted() { + test_slowloris_slow_header_timedout(new_localhost_tls(false), true).await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_slow_header_timedout_secure() { + test_slowloris_slow_header_timedout(new_localhost_tls(true), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow 2x"] +async fn test_slowloris_slow_header_timedout_secure_inverted() { + test_slowloris_slow_header_timedout(new_localhost_tls(true), true).await; +} + +trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + Unpin {} + +impl AsyncReadWrite for T where T: AsyncRead + AsyncWrite + Send + Unpin {} + trait TlsExt { fn client(&self) -> Client; fn schema(&self) -> &'static str; + fn sock_addr(&self) -> SocketAddr; fn port(&self) -> u16; + fn stream(&self) -> BoxFuture<'static, Box>; } impl TlsExt for Option { @@ -1440,6 +1672,20 @@ impl TlsExt for Option { } } + fn sock_addr(&self) -> SocketAddr { + const SOCK_ADDR_SECURE: SocketAddr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), SECURE_PORT); + + const SOCK_ADDR_NON_SECURE: SocketAddr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), NON_SECURE_PORT); + + if self.is_some() { + SOCK_ADDR_SECURE + } else { + SOCK_ADDR_NON_SECURE + } + } + fn port(&self) -> u16 { if self.is_some() { SECURE_PORT @@ -1447,6 +1693,40 @@ impl TlsExt for Option { NON_SECURE_PORT } } + + fn stream(&self) -> BoxFuture<'static, Box> { + let use_tls = self.is_some(); + let sock_addr = self.sock_addr(); + + async move { + if use_tls { + let mut cursor = Cursor::new(Vec::from(TLS_LOCALHOST_ROOT_CA)); + let certs = rustls_pemfile::certs(&mut cursor) + .collect::, _>>() + .unwrap(); + + let mut root_cert_store = RootCertStore::empty(); + let _ = root_cert_store.add_parsable_certificates(certs); + + let config = ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + + let connector = TlsConnector::from(Arc::new(config)); + let dnsname = ServerName::try_from("localhost").unwrap(); + + let stream = TcpStream::connect(sock_addr).await.unwrap(); + let stream = connector.connect(dnsname, stream).await.unwrap(); + + Box::new(stream) as Box + } else { + let stream = TcpStream::connect(sock_addr).await.unwrap(); + + Box::new(stream) as Box + } + } + .boxed() + } } fn new_localhost_tls(secure: bool) -> Option { From e2ba7da720f866bd2352ba2f52f4716fb30cc1ff Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Thu, 9 May 2024 02:32:41 +0000 Subject: [PATCH 15/16] stamp: makes clippy happy --- crates/base/tests/integration_tests.rs | 30 +++++++++++--------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 5acdb319b..3b45c913b 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -1455,7 +1455,7 @@ where let req_fut = { let token = token.clone(); async move { - assert_eq!(test_fn(maybe_tls.stream().await).await, true); + assert!(test_fn(maybe_tls.stream().await).await); if timeout(Duration::from_secs(10), token.cancel_and_wait()) .await @@ -1469,8 +1469,10 @@ where }; let join_fut = tokio::spawn(async move { - while let Some(ServerHealth::Listening(..)) = health_rx.recv().await { - break; + loop { + if let Some(ServerHealth::Listening(..)) = health_rx.recv().await { + break; + } } req_fut.await; @@ -1498,13 +1500,10 @@ async fn test_slowloris_no_prompt_timeout(maybe_tls: Option, invert: bool) return true; } - match err.kind() { - io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof => { - return true; - } - - _ => return false, - } + matches!( + err.kind(), + io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof + ) }; // > 5000ms @@ -1574,13 +1573,10 @@ async fn test_slowloris_slow_header_timedout(maybe_tls: Option, invert: boo return true; } - match err.kind() { - io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof => { - return true; - } - - _ => return false, - } + matches!( + err.kind(), + io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof + ) }; // takes 1000ms per each character (ie. > 5000ms) From ad65a0da1cc64e98c98b79bcaf2e8c683dc7b261 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Fri, 10 May 2024 05:40:34 +0000 Subject: [PATCH 16/16] stamp: polishing --- crates/base/src/server.rs | 23 ++++++--------- crates/base/src/timeout.rs | 40 +++++++++++++++----------- crates/base/tests/integration_tests.rs | 4 +-- crates/cli/src/flags.rs | 4 +-- crates/cli/src/main.rs | 6 ++-- 5 files changed, 40 insertions(+), 37 deletions(-) diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 615d6468b..885e6ff57 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -244,7 +244,7 @@ pub struct ServerFlags { pub tcp_nodelay: bool, pub graceful_exit_deadline_sec: u64, pub graceful_exit_keepalive_deadline_ms: Option, - pub request_idle_timeout_ms: Option, + pub request_read_timeout_ms: Option, } #[derive(Debug)] @@ -468,13 +468,13 @@ impl Server { let ServerFlags { tcp_nodelay, - request_idle_timeout_ms, + request_read_timeout_ms, mut graceful_exit_deadline_sec, mut graceful_exit_keepalive_deadline_ms, .. } = flags; - let request_idle_timeout_dur = request_idle_timeout_ms.map(Duration::from_millis); + let request_read_timeout_dur = request_read_timeout_ms.map(Duration::from_millis); let mut terminate_signal_fut = get_termination_signal(); loop { @@ -496,7 +496,7 @@ impl Server { event_tx, metric_src, graceful_exit_token.clone(), - request_idle_timeout_dur + request_read_timeout_dur ) } Err(e) => error!("socket error: {}", e) @@ -523,7 +523,7 @@ impl Server { event_tx, metric_src, graceful_exit_token.clone(), - request_idle_timeout_dur + request_read_timeout_dur ) } Err(e) => error!("socket error: {}", e) @@ -692,7 +692,7 @@ fn accept_stream( event_tx: Option>, metric_src: SharedMetricSource, graceful_exit_token: CancellationToken, - maybe_req_idle_timeout_dur: Option, + maybe_req_read_timeout_dur: Option, ) where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -700,15 +700,10 @@ fn accept_stream( tokio::task::spawn({ async move { let (service, cancel) = WorkerService::new(metric_src.clone(), req_tx); - let (io, maybe_timeout_tx) = if let Some(timeout_dur) = maybe_req_idle_timeout_dur { - let (timeout_tx, timeout_rx) = mpsc::unbounded_channel(); - - ( - crate::timeout::Stream::with_timeout(io, timeout_dur, timeout_rx), - Some(timeout_tx), - ) + let (io, maybe_timeout_tx) = if let Some(timeout_dur) = maybe_req_read_timeout_dur { + crate::timeout::Stream::with_timeout(io, timeout_dur) } else { - (crate::timeout::Stream::with_bypass(io), None) + crate::timeout::Stream::with_bypass(io) }; let _guard = cancel.drop_guard(); diff --git a/crates/base/src/timeout.rs b/crates/base/src/timeout.rs index 571348190..4fac82bff 100644 --- a/crates/base/src/timeout.rs +++ b/crates/base/src/timeout.rs @@ -11,7 +11,7 @@ use futures_util::Future; use pin_project::pin_project; use tokio::{ io::{AsyncRead, AsyncWrite}, - sync::mpsc::{UnboundedReceiver, UnboundedSender}, + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, time::{sleep, Instant, Sleep}, }; @@ -26,7 +26,7 @@ enum StreamKind { duration: Duration, waiting: bool, finished: bool, - state: UnboundedReceiver, + rx: UnboundedReceiver, }, Bypass, @@ -42,21 +42,29 @@ impl Stream { Self { inner, kind } } - pub(super) fn with_timeout(inner: S, duration: Duration, rx: UnboundedReceiver) -> Self { - Self::new( - inner, - StreamKind::UseTimeout { - sleep: Box::pin(sleep(duration)), - duration, - waiting: false, - finished: false, - state: rx, - }, + pub(super) fn with_timeout( + inner: S, + duration: Duration, + ) -> (Self, Option>) { + let (tx, rx) = mpsc::unbounded_channel(); + + ( + Self::new( + inner, + StreamKind::UseTimeout { + sleep: Box::pin(sleep(duration)), + duration, + waiting: false, + finished: false, + rx, + }, + ), + Some(tx), ) } - pub(super) fn with_bypass(inner: S) -> Self { - Self::new(inner, StreamKind::Bypass) + pub(super) fn with_bypass(inner: S) -> (Self, Option>) { + (Self::new(inner, StreamKind::Bypass), None) } } @@ -72,10 +80,10 @@ impl AsyncRead for Stream { duration, waiting, finished, - state, + rx, } => { if !*finished { - match Pin::new(state).poll_recv(cx) { + match Pin::new(rx).poll_recv(cx) { Poll::Ready(Some(State::Reset)) => { *waiting = false; diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 3b45c913b..e24f82fbb 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -1428,7 +1428,7 @@ async fn oak_with_jsr_specifier() { ); } -async fn test_slowloris(request_idle_timeout_ms: u64, maybe_tls: Option, test_fn: F) +async fn test_slowloris(request_read_timeout_ms: u64, maybe_tls: Option, test_fn: F) where F: (FnOnce(Box) -> R) + Send + 'static, R: Future + Send, @@ -1445,7 +1445,7 @@ where None, None, ServerFlags { - request_idle_timeout_ms: Some(request_idle_timeout_ms), + request_read_timeout_ms: Some(request_read_timeout_ms), ..Default::default() }, health_tx, diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs index 81581e55d..298bad2b2 100644 --- a/crates/cli/src/flags.rs +++ b/crates/cli/src/flags.rs @@ -141,8 +141,8 @@ fn get_start_command() -> Command { .value_parser(value_parser!(u64)), ) .arg( - arg!(--"request-idle-timeout" ) - .help("Maximum time that can be waited from when the connection is accepted until the request body is fully read") + arg!(--"request-read-timeout" ) + .help("Maximum time in milliseconds that can be waited from when the connection is accepted until the request body is fully read") .value_parser(value_parser!(u64)), ) .arg( diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index e31ed8493..e44687577 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -122,8 +122,8 @@ fn main() -> Result<(), anyhow::Error> { sub_matches.get_one::("max-parallelism").cloned(); let maybe_request_wait_timeout = sub_matches.get_one::("request-wait-timeout").cloned(); - let maybe_request_idle_timeout = - sub_matches.get_one::("request-idle-timeout").cloned(); + let maybe_request_read_timeout = + sub_matches.get_one::("request-read-timeout").cloned(); let static_patterns = if let Some(val_ref) = sub_matches.get_many::("static") { val_ref.map(|s| s.as_str()).collect::>() @@ -196,7 +196,7 @@ fn main() -> Result<(), anyhow::Error> { tcp_nodelay, graceful_exit_deadline_sec, graceful_exit_keepalive_deadline_ms, - request_idle_timeout_ms: maybe_request_idle_timeout, + request_read_timeout_ms: maybe_request_read_timeout, }, None, WorkerEntrypoints {