diff --git a/Makefile b/Makefile index 2830e62..ed54621 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,11 @@ build: .PHONY: fmt fmt: - @rustfmt --config-path ./.rustfmt.toml --check ./crates/rproxy/**/* + @cargo +nightly fmt --check + +.PHONY: lint +lint: + @cargo +nightly clippy --all-features -- -D warnings .PHONY: help help: diff --git a/crates/rproxy/src/circuit_breaker/circuit_breaker.rs b/crates/rproxy/src/circuit_breaker.rs similarity index 100% rename from crates/rproxy/src/circuit_breaker/circuit_breaker.rs rename to crates/rproxy/src/circuit_breaker.rs diff --git a/crates/rproxy/src/circuit_breaker/mod.rs b/crates/rproxy/src/circuit_breaker/mod.rs deleted file mode 100644 index 9a13f67..0000000 --- a/crates/rproxy/src/circuit_breaker/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod circuit_breaker; -pub(crate) use circuit_breaker::CircuitBreaker; diff --git a/crates/rproxy/src/config/config.rs b/crates/rproxy/src/config.rs similarity index 77% rename from crates/rproxy/src/config/config.rs rename to crates/rproxy/src/config.rs index e715e73..8c67aac 100644 --- a/crates/rproxy/src/config/config.rs +++ b/crates/rproxy/src/config.rs @@ -1,25 +1,34 @@ +mod config_authrpc; +pub(crate) use config_authrpc::*; + +mod config_circuit_breaker; +pub(crate) use config_circuit_breaker::*; + +mod config_flashblocks; +pub(crate) use config_flashblocks::*; + +mod config_logging; +pub(crate) use config_logging::*; + +mod config_metrics; +pub(crate) use config_metrics::*; + +mod config_proxy_http; +pub(crate) use config_proxy_http::*; + +mod config_proxy_ws; +pub(crate) use config_proxy_ws::*; + +mod config_rpc; +pub(crate) use config_rpc::*; + +mod config_tls; use std::{process, sync::LazyLock}; use clap::Parser; +pub(crate) use config_tls::*; use thiserror::Error; -use crate::config::{ - ConfigAuthrpc, - ConfigAuthrpcError, - ConfigCircuitBreaker, - ConfigCircuitBreakerError, - ConfigFlashblocks, - ConfigFlashblocksError, - ConfigLogError, - ConfigLogging, - ConfigMetrics, - ConfigMetricsError, - ConfigRpc, - ConfigRpcError, - ConfigTls, - ConfigTlsError, -}; - pub(crate) const ALREADY_VALIDATED: &str = "parameter must have been validated already"; pub(crate) static PARALLELISM: LazyLock = @@ -76,10 +85,10 @@ impl Config { let mut errs: Vec = vec![]; // authrpc proxy - if self.rpc.enabled { - if let Some(_errs) = self.authrpc.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } + if self.rpc.enabled && + let Some(_errs) = self.authrpc.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); } // circuit-breaker @@ -88,10 +97,10 @@ impl Config { } // flashblocks proxy - if self.flashblocks.enabled { - if let Some(_errs) = self.flashblocks.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } + if self.flashblocks.enabled && + let Some(_errs) = self.flashblocks.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); } // logging @@ -105,17 +114,17 @@ impl Config { } // rpc proxy - if self.rpc.enabled { - if let Some(_errs) = self.rpc.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } + if self.rpc.enabled && + let Some(_errs) = self.rpc.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); } // tls - if self.tls.certificate != "" || self.tls.key != "" { - if let Some(_errs) = self.tls.validate() { - errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); - } + if (!self.tls.certificate.is_empty() || !self.tls.key.is_empty()) && + let Some(_errs) = self.tls.validate() + { + errs.append(&mut _errs.into_iter().map(|err| err.into()).collect()); } if !self.authrpc.enabled && !self.flashblocks.enabled && !self.rpc.enabled { diff --git a/crates/rproxy/src/config/config_authrpc.rs b/crates/rproxy/src/config/config_authrpc.rs index e50e7c0..a574cf7 100644 --- a/crates/rproxy/src/config/config_authrpc.rs +++ b/crates/rproxy/src/config/config_authrpc.rs @@ -169,7 +169,7 @@ impl ConfigAuthrpc { // backend_url match Url::parse(&self.backend_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigAuthrpcError::BackendUrlMissesHost { url: self.backend_url.clone(), }); @@ -194,9 +194,9 @@ impl ConfigAuthrpc { // mirroring_peer_urls for peer_url in self.mirroring_peer_urls.iter() { - match Url::parse(&peer_url) { + match Url::parse(peer_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigAuthrpcError::PeerUrlMissesHost { url: peer_url.clone() }); } } @@ -235,7 +235,7 @@ impl ConfigAuthrpc { let local_ips = get_all_local_ip_addresses(); self.mirroring_peer_urls.retain(|url| { - let peer_url = Url::parse(&url).expect(ALREADY_VALIDATED); + let peer_url = Url::parse(url).expect(ALREADY_VALIDATED); let peer_host = peer_url.host_str().expect(ALREADY_VALIDATED); if !peer_url.port().eq(&backend_url.port()) { diff --git a/crates/rproxy/src/config/config_circuit_breaker.rs b/crates/rproxy/src/config/config_circuit_breaker.rs index 44127e1..c8d782a 100644 --- a/crates/rproxy/src/config/config_circuit_breaker.rs +++ b/crates/rproxy/src/config/config_circuit_breaker.rs @@ -90,10 +90,10 @@ impl ConfigCircuitBreaker { } // url - if self.url != "" { + if !self.url.is_empty() { match Url::parse(&self.url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigCircuitBreakerError::UrlMissesHost { url: self.url.clone(), }); diff --git a/crates/rproxy/src/config/config_flashblocks.rs b/crates/rproxy/src/config/config_flashblocks.rs index 929e94f..c693523 100644 --- a/crates/rproxy/src/config/config_flashblocks.rs +++ b/crates/rproxy/src/config/config_flashblocks.rs @@ -98,13 +98,13 @@ impl ConfigFlashblocks { // backend_url match self.backend_url.parse::() { Ok(uri) => { - if let None = uri.authority() { + if uri.authority().is_none() { errs.push(ConfigFlashblocksError::BackendUrlMissesHost { url: self.backend_url.clone(), }); } - if let None = uri.host() { + if uri.host().is_none() { errs.push(ConfigFlashblocksError::BackendUrlMissesHost { url: self.backend_url.clone(), }); diff --git a/crates/rproxy/src/config/config_logging.rs b/crates/rproxy/src/config/config_logging.rs index a46a265..055e6aa 100644 --- a/crates/rproxy/src/config/config_logging.rs +++ b/crates/rproxy/src/config/config_logging.rs @@ -45,7 +45,7 @@ impl ConfigLogging { pub(crate) fn setup_logging(&self) { match self.format { - ConfigLogFormat::JSON => { + ConfigLogFormat::Json => { tracing_subscriber::registry() .with(EnvFilter::from(self.level.clone())) .with(fmt::layer().json().flatten_event(true)) @@ -66,7 +66,7 @@ impl ConfigLogging { #[derive(Clone, Debug, clap::ValueEnum)] pub(crate) enum ConfigLogFormat { - JSON, + Json, Text, } diff --git a/crates/rproxy/src/config/config_rpc.rs b/crates/rproxy/src/config/config_rpc.rs index 3f5d4f6..5b1b578 100644 --- a/crates/rproxy/src/config/config_rpc.rs +++ b/crates/rproxy/src/config/config_rpc.rs @@ -183,7 +183,7 @@ impl ConfigRpc { // backend_url match Url::parse(&self.backend_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigRpcError::BackendUrlMissesHost { url: self.backend_url.clone(), }); @@ -205,9 +205,9 @@ impl ConfigRpc { // mirroring_peer_urls for peer_url in self.mirroring_peer_urls.iter() { - match Url::parse(&peer_url) { + match Url::parse(peer_url) { Ok(url) => { - if let None = url.host() { + if url.host().is_none() { errs.push(ConfigRpcError::PeerUrlMissesHost { url: peer_url.clone() }); } } @@ -246,7 +246,7 @@ impl ConfigRpc { let local_ips = get_all_local_ip_addresses(); self.mirroring_peer_urls.retain(|url| { - let peer_url = Url::parse(&url).expect(ALREADY_VALIDATED); + let peer_url = Url::parse(url).expect(ALREADY_VALIDATED); let peer_host = peer_url.host_str().expect(ALREADY_VALIDATED); if !peer_url.port().eq(&backend_url.port()) { diff --git a/crates/rproxy/src/config/config_tls.rs b/crates/rproxy/src/config/config_tls.rs index fdccd27..4aadf0c 100644 --- a/crates/rproxy/src/config/config_tls.rs +++ b/crates/rproxy/src/config/config_tls.rs @@ -53,11 +53,11 @@ impl ConfigTls { // certificate { - if self.certificate == "" && self.key != "" { + if self.certificate.is_empty() && !self.key.is_empty() { errs.push(ConfigTlsError::MissingCertificate); } - if self.certificate != "" { + if !self.certificate.is_empty() { match File::open(self.certificate.clone()) { Err(err) => { errs.push(ConfigTlsError::InvalidCertificateFile { @@ -104,11 +104,11 @@ impl ConfigTls { // key { - if self.certificate != "" && self.key == "" { + if !self.certificate.is_empty() && self.key.is_empty() { errs.push(ConfigTlsError::MissingKey); } - if self.key != "" { + if !self.key.is_empty() { match File::open(self.key.clone()) { Err(err) => { errs.push(ConfigTlsError::InvalidKeyFile { @@ -155,20 +155,15 @@ impl ConfigTls { // certificate + key { - match (cert, key) { - (Some(cert), Some(key)) => { - if let Err(err) = - ServerConfig::builder().with_no_client_auth().with_single_cert(cert, key) - { - errs.push(ConfigTlsError::InvalidPair { - path_cert: self.certificate.clone(), - path_key: self.key.clone(), - err: err.to_string(), - }); - } - } - - (_, _) => {} + if let (Some(cert), Some(key)) = (cert, key) && + let Err(err) = + ServerConfig::builder().with_no_client_auth().with_single_cert(cert, key) + { + errs.push(ConfigTlsError::InvalidPair { + path_cert: self.certificate.clone(), + path_key: self.key.clone(), + err: err.to_string(), + }); } } @@ -179,7 +174,7 @@ impl ConfigTls { } pub(crate) fn enabled(&self) -> bool { - self.certificate != "" && self.key != "" + !self.certificate.is_empty() && !self.key.is_empty() } pub(crate) fn key(&self) -> &PrivateKeyDer<'static> { diff --git a/crates/rproxy/src/config/mod.rs b/crates/rproxy/src/config/mod.rs deleted file mode 100644 index 4202988..0000000 --- a/crates/rproxy/src/config/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -mod config_authrpc; -pub(crate) use config_authrpc::*; - -mod config_circuit_breaker; -pub(crate) use config_circuit_breaker::*; - -mod config_flashblocks; -pub(crate) use config_flashblocks::*; - -mod config_logging; -pub(crate) use config_logging::*; - -mod config_metrics; -pub(crate) use config_metrics::*; - -mod config_proxy_http; -pub(crate) use config_proxy_http::*; - -mod config_proxy_ws; -pub(crate) use config_proxy_ws::*; - -mod config_rpc; -pub(crate) use config_rpc::*; - -mod config_tls; -pub(crate) use config_tls::*; - -mod config; -pub use config::Config; -pub(crate) use config::*; diff --git a/crates/rproxy/src/jrpc/jrpc.rs b/crates/rproxy/src/jrpc.rs similarity index 100% rename from crates/rproxy/src/jrpc/jrpc.rs rename to crates/rproxy/src/jrpc.rs diff --git a/crates/rproxy/src/jrpc/mod.rs b/crates/rproxy/src/jrpc/mod.rs deleted file mode 100644 index 5742f20..0000000 --- a/crates/rproxy/src/jrpc/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod jrpc; -pub(crate) use jrpc::*; diff --git a/crates/rproxy/src/metrics/metrics.rs b/crates/rproxy/src/metrics.rs similarity index 97% rename from crates/rproxy/src/metrics/metrics.rs rename to crates/rproxy/src/metrics.rs index 223f28b..e4f30c3 100644 --- a/crates/rproxy/src/metrics/metrics.rs +++ b/crates/rproxy/src/metrics.rs @@ -1,3 +1,7 @@ +mod metrics_candlestick; +pub(crate) use metrics_candlestick::Candlestick; + +mod metrics_labels; use std::{net::TcpListener, sync::Arc, time::Duration}; use actix_web::{ @@ -9,6 +13,7 @@ use actix_web::{ web, }; use awc::http::Method; +pub(crate) use metrics_labels::*; use prometheus_client::{ metrics::{counter::Counter, family::Family, gauge::Gauge}, registry::{Registry, Unit}, @@ -16,16 +21,7 @@ use prometheus_client::{ use socket2::{SockAddr, Socket, TcpKeepalive}; use tracing::{error, info}; -use crate::{ - config::ConfigMetrics, - metrics::{ - Candlestick, - LabelsProxy, - LabelsProxyClientInfo, - LabelsProxyHttpJrpc, - LabelsProxyWs, - }, -}; +use crate::config::ConfigMetrics; // Metrics ------------------------------------------------------------- @@ -269,7 +265,7 @@ impl Metrics { self: Arc, canceller: tokio_util::sync::CancellationToken, ) -> Result<(), Box> { - let listen_address = self.config.listen_address().clone(); + let listen_address = self.config.listen_address(); let listener = match self.listen() { Ok(listener) => listener, diff --git a/crates/rproxy/src/metrics/mod.rs b/crates/rproxy/src/metrics/mod.rs deleted file mode 100644 index 3cf2ed5..0000000 --- a/crates/rproxy/src/metrics/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -mod metrics_candlestick; -pub(crate) use metrics_candlestick::Candlestick; - -mod metrics_labels; -pub(crate) use metrics_labels::*; - -mod metrics; -pub(crate) use metrics::Metrics; diff --git a/crates/rproxy/src/proxy/proxy.rs b/crates/rproxy/src/proxy.rs similarity index 98% rename from crates/rproxy/src/proxy/proxy.rs rename to crates/rproxy/src/proxy.rs index b4283cb..e54d90f 100644 --- a/crates/rproxy/src/proxy/proxy.rs +++ b/crates/rproxy/src/proxy.rs @@ -122,7 +122,7 @@ impl Drop for ProxyConnectionGuard { fn drop(&mut self) { let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1; - let metric_labels = LabelsProxy { proxy: &self.proxy_name }; + let metric_labels = LabelsProxy { proxy: self.proxy_name }; self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc(); diff --git a/crates/rproxy/src/proxy/mod.rs b/crates/rproxy/src/proxy/mod.rs deleted file mode 100644 index c677ef9..0000000 --- a/crates/rproxy/src/proxy/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod proxy; -pub(crate) use proxy::{Proxy, ProxyConnectionGuard, ProxyInner}; diff --git a/crates/rproxy/src/proxy_http/proxy_http.rs b/crates/rproxy/src/proxy_http.rs similarity index 93% rename from crates/rproxy/src/proxy_http/proxy_http.rs rename to crates/rproxy/src/proxy_http.rs index 87871b0..ee8249e 100644 --- a/crates/rproxy/src/proxy_http/proxy_http.rs +++ b/crates/rproxy/src/proxy_http.rs @@ -1,3 +1,10 @@ +mod proxy_http_inner_authrpc; +pub(crate) use proxy_http_inner_authrpc::ProxyHttpInnerAuthrpc; + +mod proxy_http_inner_rpc; +pub(crate) use proxy_http_inner_rpc::ProxyHttpInnerRpc; + +mod proxy_http_inner; use std::{ borrow::Cow, fmt::Debug, @@ -38,6 +45,7 @@ use bytes::Bytes; use futures::TryStreamExt; use futures_core::Stream; use pin_project::pin_project; +pub(crate) use proxy_http_inner::ProxyHttpInner; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::sync::broadcast; @@ -51,7 +59,6 @@ use crate::{ jrpc::JrpcRequestMetaMaybeBatch, metrics::{LabelsProxy, LabelsProxyClientInfo, LabelsProxyHttpJrpc, Metrics}, proxy::{Proxy, ProxyConnectionGuard}, - proxy_http::ProxyHttpInner, utils::{Loggable, decompress, is_hop_by_hop_header, raw_transaction_to_hash}, }; @@ -88,7 +95,7 @@ where let backend = ProxyHttpBackendEndpoint::new( inner.clone(), - id.clone(), + id, shared.metrics.clone(), config.backend_url(), connections_limit, @@ -102,7 +109,7 @@ where .map(|peer_url| { ProxyHttpBackendEndpoint::new( shared.inner(), - id.clone(), + id, shared.metrics.clone(), peer_url.to_owned(), config.backend_max_concurrent_requests(), @@ -114,7 +121,7 @@ where ); let postprocessor = ProxyHttpPostprocessor:: { - worker_id: id.clone(), + worker_id: id, inner: inner.clone(), metrics: shared.metrics.clone(), mirroring_peers: peers.clone(), @@ -132,7 +139,7 @@ where canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { - let listen_address = config.listen_address().clone(); + let listen_address = config.listen_address(); let listener = match Self::listen(&config) { Ok(listener) => listener, @@ -215,7 +222,7 @@ where let handler = server.handle(); let mut resetter = resetter.subscribe(); tokio::spawn(async move { - if let Ok(_) = resetter.recv().await { + if resetter.recv().await.is_ok() { info!(proxy = P::name(), "Reset signal received, stopping http-proxy..."); handler.stop(true).await; } @@ -306,8 +313,8 @@ where let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); - let id = info.id.clone(); - let connection_id = info.connection_id.clone(); + let id = info.id; + let connection_id = info.connection_id; let bck_req = this.backend.new_backend_request(&info); let bck_req_body = ProxyHttpRequestBody::new(this.clone(), info, cli_req_body, timestamp); @@ -350,10 +357,10 @@ where } fn postprocess_client_request(&self, req: ProxiedHttpRequest) { - let id = req.info.id.clone(); - let connection_id = req.info.connection_id.clone(); + let id = req.info.id; + let connection_id = req.info.connection_id; - if let Err(_) = self.requests.insert_sync(id, req) { + if self.requests.insert_sync(id, req).is_err() { error!( proxy = P::name(), request_id = %id, @@ -580,16 +587,14 @@ where None => return, }; - let method = match match message.get_key_value("method") { + let method = (match message.get_key_value("method") { Some((_, method)) => method.as_str(), None => None, - } { - Some(method) => method, - None => "", - } + }) + .unwrap_or_default() .to_owned(); - if method != "" { + if !method.is_empty() { // single-shot request let params = match match message.get_mut("params") { @@ -629,7 +634,7 @@ where } "engine_newPayloadV4" => { - if params.len() < 1 { + if params.is_empty() { return; } @@ -654,7 +659,7 @@ where } "eth_sendBundle" => { - if params.len() < 1 { + if params.is_empty() { return; } @@ -698,15 +703,14 @@ where None => return, }; - if let Some(execution_payload) = result.get_mut("executionPayload") { - if let Some(transactions) = execution_payload.get_mut("transactions") { - if let Some(transactions) = transactions.as_array_mut() { - // engine_getPayloadV4 + if let Some(execution_payload) = result.get_mut("executionPayload") && + let Some(transactions) = execution_payload.get_mut("transactions") && + let Some(transactions) = transactions.as_array_mut() + { + // engine_getPayloadV4 - for transaction in transactions { - raw_transaction_to_hash(transaction); - } - } + for transaction in transactions { + raw_transaction_to_hash(transaction); } } } @@ -890,7 +894,7 @@ where fn handle(&mut self, msg: ProxiedHttpCombo, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); - let worker_id = self.worker_id.clone(); + let worker_id = self.worker_id; let mirroring_peers = self.mirroring_peers.clone(); let mut mirroring_peer_round_robin_index = self.mirroring_peer_round_robin_index.load(Ordering::Relaxed); @@ -1000,7 +1004,7 @@ where let start = UtcDateTime::now(); let inner = self.inner.clone(); - let worker_id = self.worker_id.clone(); + let worker_id = self.worker_id; let metrics = self.metrics.clone(); let mrr_req = self.new_backend_request(&cli_req.info); @@ -1100,11 +1104,11 @@ impl ProxyHttpRequestInfo { // append remote ip to x-forwarded-for if let Some(peer_addr) = req.connection_info().peer_addr() { let mut forwarded_for = String::new(); - if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) { - if let Ok(ff) = ff.to_str() { - forwarded_for.push_str(ff); - forwarded_for.push_str(", "); - } + if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) && + let Ok(ff) = ff.to_str() + { + forwarded_for.push_str(ff); + forwarded_for.push_str(", "); } forwarded_for.push_str(peer_addr); if let Ok(forwarded_for) = HeaderValue::from_str(&forwarded_for) { @@ -1113,21 +1117,19 @@ impl ProxyHttpRequestInfo { } // set x-forwarded-proto if it's not already set - if req.connection_info().scheme() != "" { - if None == req.headers().get(header::X_FORWARDED_PROTO) { - if let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) { - headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); - } - } + if req.connection_info().scheme() != "" && + req.headers().get(header::X_FORWARDED_PROTO).is_none() && + let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) + { + headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); } // set x-forwarded-host if it's not already set - if req.connection_info().scheme() != "" { - if None == req.headers().get(header::X_FORWARDED_HOST) { - if let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().scheme()) { - headers.insert(header::X_FORWARDED_HOST, forwarded_host); - } - } + if req.connection_info().scheme() != "" && + req.headers().get(header::X_FORWARDED_HOST).is_none() && + let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().scheme()) + { + headers.insert(header::X_FORWARDED_HOST, forwarded_host); } // remote address from the guard has port, and connection info has ip @@ -1165,12 +1167,12 @@ impl ProxyHttpRequestInfo { #[inline] pub(crate) fn id(&self) -> Uuid { - self.id.clone() + self.id } #[inline] pub(crate) fn connection_id(&self) -> Uuid { - self.connection_id.clone() + self.connection_id } #[inline] @@ -1209,7 +1211,7 @@ impl ProxyHttpResponseInfo { #[inline] pub(crate) fn id(&self) -> Uuid { - self.id.clone() + self.id } fn content_encoding(&self) -> String { @@ -1233,7 +1235,7 @@ where info: Option, start: UtcDateTime, - body: Box>, + body: Vec, #[pin] stream: S, @@ -1255,7 +1257,7 @@ where info: Some(info), stream: body, start: timestamp, - body: Box::new(Vec::new()), // TODO: preallocate reasonable size + body: Vec::new(), // TODO: preallocate reasonable size } } } @@ -1306,12 +1308,7 @@ where if let Some(info) = mem::take(this.info) { let proxy = this.proxy.clone(); - let req = ProxiedHttpRequest::new( - info, - mem::take(this.body), - this.start.clone(), - end, - ); + let req = ProxiedHttpRequest::new(info, mem::take(this.body), *this.start, end); proxy.postprocess_client_request(req); } @@ -1334,7 +1331,7 @@ where info: Option, start: UtcDateTime, - body: Box>, + body: Vec, #[pin] stream: S, @@ -1357,7 +1354,7 @@ where proxy, stream: body, start: timestamp, - body: Box::new(Vec::new()), // TODO: preallocate reasonable size + body: Vec::new(), // TODO: preallocate reasonable size info: Some(ProxyHttpResponseInfo::new(id, status, headers)), } } @@ -1408,12 +1405,8 @@ where if let Some(info) = mem::take(this.info) { let proxy = this.proxy.clone(); - let res = ProxiedHttpResponse::new( - info, - mem::take(this.body), - this.start.clone(), - end, - ); + let res = + ProxiedHttpResponse::new(info, mem::take(this.body), *this.start, end); proxy.postprocess_backend_response(res); } @@ -1441,14 +1434,14 @@ pub(crate) struct ProxiedHttpRequest { impl ProxiedHttpRequest { pub(crate) fn new( info: ProxyHttpRequestInfo, - body: Box>, + body: Vec, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(*body), + body: Bytes::from(body), size, decompressed_body: Bytes::new(), decompressed_size: 0, @@ -1490,14 +1483,14 @@ pub(crate) struct ProxiedHttpResponse { impl ProxiedHttpResponse { pub(crate) fn new( info: ProxyHttpResponseInfo, - body: Box>, + body: Vec, start: UtcDateTime, end: UtcDateTime, ) -> Self { let size = body.len(); Self { info, - body: Bytes::from(*body), + body: Bytes::from(body), size, decompressed_body: Bytes::new(), decompressed_size: 0, diff --git a/crates/rproxy/src/proxy_http/mod.rs b/crates/rproxy/src/proxy_http/mod.rs deleted file mode 100644 index 93d77b6..0000000 --- a/crates/rproxy/src/proxy_http/mod.rs +++ /dev/null @@ -1,16 +0,0 @@ -mod proxy_http_inner_authrpc; -pub(crate) use proxy_http_inner_authrpc::ProxyHttpInnerAuthrpc; - -mod proxy_http_inner_rpc; -pub(crate) use proxy_http_inner_rpc::ProxyHttpInnerRpc; - -mod proxy_http; -pub(crate) use proxy_http::{ - ProxiedHttpRequest, - ProxiedHttpResponse, - ProxyHttp, - ProxyHttpRequestInfo, -}; - -mod proxy_http_inner; -pub(crate) use proxy_http_inner::ProxyHttpInner; diff --git a/crates/rproxy/src/proxy_http/proxy_http_inner_authrpc.rs b/crates/rproxy/src/proxy_http/proxy_http_inner_authrpc.rs index 831d086..8cf9c8c 100644 --- a/crates/rproxy/src/proxy_http/proxy_http_inner_authrpc.rs +++ b/crates/rproxy/src/proxy_http/proxy_http_inner_authrpc.rs @@ -39,14 +39,13 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { fn should_mirror(jrpc_req: &JrpcRequestMeta) -> bool { let method = jrpc_req.method(); - if true && - !method.starts_with("engine_forkchoiceUpdated") && + if !method.starts_with("engine_forkchoiceUpdated") && !method.starts_with("engine_newPayload") && !method.starts_with("miner_setMaxDASize") { return false; } - return true; + true } match jrpc_req { @@ -58,7 +57,7 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { return true; } } - return false; + false } } } diff --git a/crates/rproxy/src/proxy_http/proxy_http_inner_rpc.rs b/crates/rproxy/src/proxy_http/proxy_http_inner_rpc.rs index f5da0c8..9be9051 100644 --- a/crates/rproxy/src/proxy_http/proxy_http_inner_rpc.rs +++ b/crates/rproxy/src/proxy_http/proxy_http_inner_rpc.rs @@ -47,7 +47,7 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { return false; } - return mirror_errored_requests || jrpc_res.error.is_none() + mirror_errored_requests || jrpc_res.error.is_none() } match jrpc_req { @@ -67,11 +67,11 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { } }; - return should_mirror( + should_mirror( jrpc_req_single, &jrpc_res_single, self.config.mirror_errored_requests, - ); + ) } JrpcRequestMetaMaybeBatch::Batch(jrpc_req_batch) => { @@ -106,7 +106,7 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { return true; } } - return false; + false } } } diff --git a/crates/rproxy/src/proxy_ws/proxy_ws.rs b/crates/rproxy/src/proxy_ws.rs similarity index 90% rename from crates/rproxy/src/proxy_ws/proxy_ws.rs rename to crates/rproxy/src/proxy_ws.rs index 72fdfb8..8115521 100644 --- a/crates/rproxy/src/proxy_ws/proxy_ws.rs +++ b/crates/rproxy/src/proxy_ws.rs @@ -1,3 +1,6 @@ +mod proxy_ws_flashblocks; + +mod proxy_ws_inner; use std::{ io::Write, marker::PhantomData, @@ -26,6 +29,8 @@ use futures::{ stream::{SplitSink, SplitStream}, }; use prometheus_client::metrics::gauge::Atomic; +pub(crate) use proxy_ws_flashblocks::ProxyWsInnerFlashblocks; +pub(crate) use proxy_ws_inner::ProxyWsInner; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::{net::TcpStream, sync::broadcast}; @@ -40,19 +45,18 @@ use crate::{ metrics::{LabelsProxyWs, Metrics}, proxy::{Proxy, ProxyConnectionGuard}, proxy_http::ProxyHttpRequestInfo, - proxy_ws::ProxyWsInner, utils::{Loggable, raw_transaction_to_hash}, }; const WS_PING_INTERVAL_SECONDS: u64 = 1; -const WS_CLI_ERROR: &'static str = "client error"; -const WS_BCK_ERROR: &'static str = "backend error"; -const WS_BCK_TIMEOUT: &'static str = "backend error"; -const WS_CLOSE_OK: &'static str = ""; +const WS_CLI_ERROR: &str = "client error"; +const WS_BCK_ERROR: &str = "backend error"; +const WS_BCK_TIMEOUT: &str = "backend error"; +const WS_CLOSE_OK: &str = ""; -const WS_LABEL_BACKEND: &'static str = "backend"; -const WS_LABEL_CLIENT: &'static str = "client"; +const WS_LABEL_BACKEND: &str = "backend"; +const WS_LABEL_CLIENT: &str = "client"; // ProxyWs ------------------------------------------------------------- @@ -90,7 +94,7 @@ where let config = shared.config(); - let backend = ProxyWsBackendEndpoint::new(id.clone(), config.backend_url()); + let backend = ProxyWsBackendEndpoint::new(id, config.backend_url()); let postprocessor = ProxyWsPostprocessor:: { inner: shared.inner.clone(), @@ -125,7 +129,7 @@ where canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { - let listen_address = config.listen_address().clone(); + let listen_address = config.listen_address(); let listener = match Self::listen(&config) { Ok(listener) => listener, @@ -190,7 +194,7 @@ where let handler = proxy.handle(); let mut resetter = resetter.subscribe(); tokio::spawn(async move { - if let Ok(_) = resetter.recv().await { + if resetter.recv().await.is_ok() { info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); handler.stop(true).await; } @@ -582,7 +586,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // text @@ -619,7 +623,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // ping @@ -634,31 +638,28 @@ where ); return Err(WS_CLI_ERROR); } - return Ok(()); + Ok(()) } // pong actix_ws::Message::Pong(bytes) => { - if let Some(pong) = ProxyWsPing::from_bytes(bytes) { - if let Some((_, ping)) = this.pings.remove_sync(&pong.id) { - if pong == ping { - this.ping_balance_cli.dec(); - this.shared - .metrics - .ws_latency_client - .get_or_create(&LabelsProxyWs { - proxy: P::name(), - destination: WS_LABEL_BACKEND, - }) - .record( - (1000000.0 * - (timestamp - pong.timestamp).as_seconds_f64() / - 2.0) - as i64, - ); - return Ok(()); - } - } + if let Some(pong) = ProxyWsPing::from_bytes(bytes) && + let Some((_, ping)) = this.pings.remove_sync(&pong.id) && + pong == ping + { + this.ping_balance_cli.dec(); + this.shared + .metrics + .ws_latency_client + .get_or_create(&LabelsProxyWs { + proxy: P::name(), + destination: WS_LABEL_BACKEND, + }) + .record( + (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / + 2.0) as i64, + ); + return Ok(()); } warn!( proxy = P::name(), @@ -666,7 +667,7 @@ where worker_id = %this.id, "Unexpected websocket pong received from client", ); - return Ok(()); + Ok(()) } // close @@ -691,12 +692,10 @@ where ); return Err(WS_BCK_ERROR); } - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } - _ => { - return Ok(()); - } + _ => Ok(()), } } @@ -708,7 +707,7 @@ where error = ?err, "Client websocket stream error" ); - return Err(WS_CLI_ERROR); + Err(WS_CLI_ERROR) } None => { @@ -718,7 +717,7 @@ where worker_id = %this.id, "Client had closed websocket stream" ); - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } } } @@ -760,7 +759,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // text @@ -789,7 +788,7 @@ where start: timestamp, end: UtcDateTime::now(), }); - return Ok(()); + Ok(()) } // ping @@ -804,31 +803,28 @@ where ); return Err(WS_BCK_ERROR); } - return Ok(()); + Ok(()) } // pong tungstenite::Message::Pong(bytes) => { - if let Some(pong) = ProxyWsPing::from_bytes(bytes) { - if let Some((_, ping)) = this.pings.remove_sync(&pong.id) { - if pong == ping { - this.ping_balance_bck.dec(); - this.shared - .metrics - .ws_latency_backend - .get_or_create(&LabelsProxyWs { - proxy: P::name(), - destination: WS_LABEL_BACKEND, - }) - .record( - (1000000.0 * - (timestamp - pong.timestamp).as_seconds_f64() / - 2.0) - as i64, - ); - return Ok(()); - } - } + if let Some(pong) = ProxyWsPing::from_bytes(bytes) && + let Some((_, ping)) = this.pings.remove_sync(&pong.id) && + pong == ping + { + this.ping_balance_bck.dec(); + this.shared + .metrics + .ws_latency_backend + .get_or_create(&LabelsProxyWs { + proxy: P::name(), + destination: WS_LABEL_BACKEND, + }) + .record( + (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / + 2.0) as i64, + ); + return Ok(()); } warn!( proxy = P::name(), @@ -836,7 +832,7 @@ where worker_id = %this.id, "Unexpected websocket pong received from backend", ); - return Ok(()); + Ok(()) } // close @@ -858,12 +854,10 @@ where ); return Err(WS_CLI_ERROR); } - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } - _ => { - return Ok(()); - } + _ => Ok(()), } } @@ -875,7 +869,7 @@ where error = ?err, "Backend websocket stream error" ); - return Err(WS_BCK_ERROR); + Err(WS_BCK_ERROR) } None => { @@ -885,7 +879,7 @@ where worker_id = %this.id, "Backend had closed websocket stream" ); - return Err(WS_CLOSE_OK); + Err(WS_CLOSE_OK) } } } @@ -909,7 +903,7 @@ where let json_msg = if config.log_backend_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_slice(&msg).unwrap_or_default(), + serde_json::from_slice(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -931,7 +925,7 @@ where let json_msg = if config.log_backend_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_str(&msg).unwrap_or_default(), + serde_json::from_str(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -953,7 +947,7 @@ where let json_msg = if config.log_client_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_slice(&msg).unwrap_or_default(), + serde_json::from_slice(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -975,7 +969,7 @@ where let json_msg = if config.log_client_messages() { Loggable(&Self::maybe_sanitise( config.log_sanitise(), - serde_json::from_str(&msg).unwrap_or_default(), + serde_json::from_str(msg).unwrap_or_default(), )) } else { Loggable(&serde_json::Value::Null) @@ -1000,15 +994,13 @@ where return message; } - if let Some(object) = message.as_object_mut() { - if let Some(diff) = object.get_mut("diff") { - if let Some(transactions) = diff.get_mut("transactions") { - if let Some(transactions) = transactions.as_array_mut() { - for transaction in transactions { - raw_transaction_to_hash(transaction); - } - } - } + if let Some(object) = message.as_object_mut() && + let Some(diff) = object.get_mut("diff") && + let Some(transactions) = diff.get_mut("transactions") && + let Some(transactions) = transactions.as_array_mut() + { + for transaction in transactions { + raw_transaction_to_hash(transaction); } } @@ -1193,7 +1185,7 @@ where fn handle(&mut self, msg: ProxyWsMessage, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); - let worker_id = self.worker_id.clone(); + let worker_id = self.worker_id; ctx.spawn( async move { diff --git a/crates/rproxy/src/proxy_ws/mod.rs b/crates/rproxy/src/proxy_ws/mod.rs deleted file mode 100644 index 770cf87..0000000 --- a/crates/rproxy/src/proxy_ws/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -mod proxy_ws; - -mod proxy_ws_flashblocks; - -mod proxy_ws_inner; -pub(crate) use proxy_ws::ProxyWs; -pub(crate) use proxy_ws_flashblocks::ProxyWsInnerFlashblocks; -pub(crate) use proxy_ws_inner::ProxyWsInner; diff --git a/crates/rproxy/src/server/server.rs b/crates/rproxy/src/server.rs similarity index 94% rename from crates/rproxy/src/server/server.rs rename to crates/rproxy/src/server.rs index 12bf1a9..7315cf7 100644 --- a/crates/rproxy/src/server/server.rs +++ b/crates/rproxy/src/server.rs @@ -46,7 +46,7 @@ impl Server { } // spawn circuit-breaker - if config.circuit_breaker.url != "" { + if !config.circuit_breaker.url.is_empty() { let canceller = canceller.clone(); let resetter = resetter.clone(); @@ -86,7 +86,7 @@ impl Server { let resetter = resetter.clone(); services.push(tokio::spawn(async move { - let res = ProxyHttp::::run( + ProxyHttp::::run( config, tls, metrics, @@ -101,8 +101,7 @@ impl Server { "Failed to start http-proxy, terminating...", ); canceller.cancel(); - }); - res + }) })); } @@ -115,7 +114,7 @@ impl Server { let resetter = resetter.clone(); services.push(tokio::spawn(async move { - let res = ProxyHttp::::run( + ProxyHttp::::run( config, tls, metrics, @@ -130,8 +129,7 @@ impl Server { "Failed to start http-proxy, terminating...", ); canceller.cancel(); - }); - res + }) })); } @@ -144,7 +142,7 @@ impl Server { let resetter = resetter.clone(); services.push(tokio::spawn(async move { - let res = ProxyWs::::run( + ProxyWs::::run( config, tls, metrics, @@ -159,8 +157,7 @@ impl Server { "Failed to start websocket-proxy, terminating...", ); canceller.cancel(); - }); - res + }) })); } diff --git a/crates/rproxy/src/server/mod.rs b/crates/rproxy/src/server/mod.rs deleted file mode 100644 index dc1344f..0000000 --- a/crates/rproxy/src/server/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod server; -pub use server::Server; diff --git a/crates/rproxy/src/utils/mod.rs b/crates/rproxy/src/utils.rs similarity index 100% rename from crates/rproxy/src/utils/mod.rs rename to crates/rproxy/src/utils.rs diff --git a/crates/rproxy/src/utils/utils_compression.rs b/crates/rproxy/src/utils/utils_compression.rs index 3ea8be6..56cdc49 100644 --- a/crates/rproxy/src/utils/utils_compression.rs +++ b/crates/rproxy/src/utils/utils_compression.rs @@ -48,5 +48,5 @@ pub fn decompress(body: Bytes, size: usize, content_encoding: String) -> (Bytes, _ => {} } - return (body.clone(), size); + (body.clone(), size) } diff --git a/crates/rproxy/src/utils/utils_loggable.rs b/crates/rproxy/src/utils/utils_loggable.rs index 65c0931..f41ba4f 100644 --- a/crates/rproxy/src/utils/utils_loggable.rs +++ b/crates/rproxy/src/utils/utils_loggable.rs @@ -53,7 +53,7 @@ impl valuable::Listable for Loggable<'_> { if let serde_json::Value::Array(arr) = &self.0 { return (arr.len(), Some(arr.len())); } - return (0, Some(0)); + (0, Some(0)) } } @@ -62,6 +62,6 @@ impl valuable::Mappable for Loggable<'_> { if let serde_json::Value::Object(obj) = &self.0 { return (obj.len(), Some(obj.len())); } - return (0, Some(0)); + (0, Some(0)) } }