Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ build:

.PHONY: fmt
fmt:
@rustfmt --config-path ./.rustfmt.toml --check ./crates/rproxy/**/*
@cargo +nightly fmt --check

.PHONY: lint
lint:
@cargo +nightly clippy --all-features -- -D warnings

.PHONY: help
help:
Expand Down
2 changes: 0 additions & 2 deletions crates/rproxy/src/circuit_breaker/mod.rs

This file was deleted.

75 changes: 42 additions & 33 deletions crates/rproxy/src/config/config.rs → crates/rproxy/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
mod config_authrpc;
pub(crate) use config_authrpc::*;

mod config_circuit_breaker;
pub(crate) use config_circuit_breaker::*;

mod config_flashblocks;
pub(crate) use config_flashblocks::*;

mod config_logging;
pub(crate) use config_logging::*;

mod config_metrics;
pub(crate) use config_metrics::*;

mod config_proxy_http;
pub(crate) use config_proxy_http::*;

mod config_proxy_ws;
pub(crate) use config_proxy_ws::*;

mod config_rpc;
pub(crate) use config_rpc::*;

mod config_tls;
use std::{process, sync::LazyLock};

use clap::Parser;
pub(crate) use config_tls::*;
use thiserror::Error;

use crate::config::{
ConfigAuthrpc,
ConfigAuthrpcError,
ConfigCircuitBreaker,
ConfigCircuitBreakerError,
ConfigFlashblocks,
ConfigFlashblocksError,
ConfigLogError,
ConfigLogging,
ConfigMetrics,
ConfigMetricsError,
ConfigRpc,
ConfigRpcError,
ConfigTls,
ConfigTlsError,
};

pub(crate) const ALREADY_VALIDATED: &str = "parameter must have been validated already";

pub(crate) static PARALLELISM: LazyLock<usize> =
Expand Down Expand Up @@ -76,10 +85,10 @@ impl Config {
let mut errs: Vec<ConfigError> = vec![];

// authrpc proxy
if self.rpc.enabled {
if let Some(_errs) = self.authrpc.validate() {
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}
if self.rpc.enabled &&
let Some(_errs) = self.authrpc.validate()
{
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}

// circuit-breaker
Expand All @@ -88,10 +97,10 @@ impl Config {
}

// flashblocks proxy
if self.flashblocks.enabled {
if let Some(_errs) = self.flashblocks.validate() {
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}
if self.flashblocks.enabled &&
let Some(_errs) = self.flashblocks.validate()
{
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}

// logging
Expand All @@ -105,17 +114,17 @@ impl Config {
}

// rpc proxy
if self.rpc.enabled {
if let Some(_errs) = self.rpc.validate() {
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}
if self.rpc.enabled &&
let Some(_errs) = self.rpc.validate()
{
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}

// tls
if self.tls.certificate != "" || self.tls.key != "" {
if let Some(_errs) = self.tls.validate() {
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}
if (!self.tls.certificate.is_empty() || !self.tls.key.is_empty()) &&
let Some(_errs) = self.tls.validate()
{
errs.append(&mut _errs.into_iter().map(|err| err.into()).collect());
}

if !self.authrpc.enabled && !self.flashblocks.enabled && !self.rpc.enabled {
Expand Down
8 changes: 4 additions & 4 deletions crates/rproxy/src/config/config_authrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ConfigAuthrpc {
// backend_url
match Url::parse(&self.backend_url) {
Ok(url) => {
if let None = url.host() {
if url.host().is_none() {
errs.push(ConfigAuthrpcError::BackendUrlMissesHost {
url: self.backend_url.clone(),
});
Expand All @@ -194,9 +194,9 @@ impl ConfigAuthrpc {

// mirroring_peer_urls
for peer_url in self.mirroring_peer_urls.iter() {
match Url::parse(&peer_url) {
match Url::parse(peer_url) {
Ok(url) => {
if let None = url.host() {
if url.host().is_none() {
errs.push(ConfigAuthrpcError::PeerUrlMissesHost { url: peer_url.clone() });
}
}
Expand Down Expand Up @@ -235,7 +235,7 @@ impl ConfigAuthrpc {
let local_ips = get_all_local_ip_addresses();

self.mirroring_peer_urls.retain(|url| {
let peer_url = Url::parse(&url).expect(ALREADY_VALIDATED);
let peer_url = Url::parse(url).expect(ALREADY_VALIDATED);
let peer_host = peer_url.host_str().expect(ALREADY_VALIDATED);

if !peer_url.port().eq(&backend_url.port()) {
Expand Down
4 changes: 2 additions & 2 deletions crates/rproxy/src/config/config_circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ impl ConfigCircuitBreaker {
}

// url
if self.url != "" {
if !self.url.is_empty() {
match Url::parse(&self.url) {
Ok(url) => {
if let None = url.host() {
if url.host().is_none() {
errs.push(ConfigCircuitBreakerError::UrlMissesHost {
url: self.url.clone(),
});
Expand Down
4 changes: 2 additions & 2 deletions crates/rproxy/src/config/config_flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ impl ConfigFlashblocks {
// backend_url
match self.backend_url.parse::<Uri>() {
Ok(uri) => {
if let None = uri.authority() {
if uri.authority().is_none() {
errs.push(ConfigFlashblocksError::BackendUrlMissesHost {
url: self.backend_url.clone(),
});
}

if let None = uri.host() {
if uri.host().is_none() {
errs.push(ConfigFlashblocksError::BackendUrlMissesHost {
url: self.backend_url.clone(),
});
Expand Down
4 changes: 2 additions & 2 deletions crates/rproxy/src/config/config_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl ConfigLogging {

pub(crate) fn setup_logging(&self) {
match self.format {
ConfigLogFormat::JSON => {
ConfigLogFormat::Json => {
tracing_subscriber::registry()
.with(EnvFilter::from(self.level.clone()))
.with(fmt::layer().json().flatten_event(true))
Expand All @@ -66,7 +66,7 @@ impl ConfigLogging {

#[derive(Clone, Debug, clap::ValueEnum)]
pub(crate) enum ConfigLogFormat {
JSON,
Json,
Text,
}

Expand Down
8 changes: 4 additions & 4 deletions crates/rproxy/src/config/config_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl ConfigRpc {
// backend_url
match Url::parse(&self.backend_url) {
Ok(url) => {
if let None = url.host() {
if url.host().is_none() {
errs.push(ConfigRpcError::BackendUrlMissesHost {
url: self.backend_url.clone(),
});
Expand All @@ -205,9 +205,9 @@ impl ConfigRpc {

// mirroring_peer_urls
for peer_url in self.mirroring_peer_urls.iter() {
match Url::parse(&peer_url) {
match Url::parse(peer_url) {
Ok(url) => {
if let None = url.host() {
if url.host().is_none() {
errs.push(ConfigRpcError::PeerUrlMissesHost { url: peer_url.clone() });
}
}
Expand Down Expand Up @@ -246,7 +246,7 @@ impl ConfigRpc {
let local_ips = get_all_local_ip_addresses();

self.mirroring_peer_urls.retain(|url| {
let peer_url = Url::parse(&url).expect(ALREADY_VALIDATED);
let peer_url = Url::parse(url).expect(ALREADY_VALIDATED);
let peer_host = peer_url.host_str().expect(ALREADY_VALIDATED);

if !peer_url.port().eq(&backend_url.port()) {
Expand Down
33 changes: 14 additions & 19 deletions crates/rproxy/src/config/config_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ impl ConfigTls {

// certificate
{
if self.certificate == "" && self.key != "" {
if self.certificate.is_empty() && !self.key.is_empty() {
errs.push(ConfigTlsError::MissingCertificate);
}

if self.certificate != "" {
if !self.certificate.is_empty() {
match File::open(self.certificate.clone()) {
Err(err) => {
errs.push(ConfigTlsError::InvalidCertificateFile {
Expand Down Expand Up @@ -104,11 +104,11 @@ impl ConfigTls {

// key
{
if self.certificate != "" && self.key == "" {
if !self.certificate.is_empty() && self.key.is_empty() {
errs.push(ConfigTlsError::MissingKey);
}

if self.key != "" {
if !self.key.is_empty() {
match File::open(self.key.clone()) {
Err(err) => {
errs.push(ConfigTlsError::InvalidKeyFile {
Expand Down Expand Up @@ -155,20 +155,15 @@ impl ConfigTls {

// certificate + key
{
match (cert, key) {
(Some(cert), Some(key)) => {
if let Err(err) =
ServerConfig::builder().with_no_client_auth().with_single_cert(cert, key)
{
errs.push(ConfigTlsError::InvalidPair {
path_cert: self.certificate.clone(),
path_key: self.key.clone(),
err: err.to_string(),
});
}
}

(_, _) => {}
if let (Some(cert), Some(key)) = (cert, key) &&
let Err(err) =
ServerConfig::builder().with_no_client_auth().with_single_cert(cert, key)
{
errs.push(ConfigTlsError::InvalidPair {
path_cert: self.certificate.clone(),
path_key: self.key.clone(),
err: err.to_string(),
});
}
}

Expand All @@ -179,7 +174,7 @@ impl ConfigTls {
}

pub(crate) fn enabled(&self) -> bool {
self.certificate != "" && self.key != ""
!self.certificate.is_empty() && !self.key.is_empty()
}

pub(crate) fn key(&self) -> &PrivateKeyDer<'static> {
Expand Down
30 changes: 0 additions & 30 deletions crates/rproxy/src/config/mod.rs

This file was deleted.

File renamed without changes.
2 changes: 0 additions & 2 deletions crates/rproxy/src/jrpc/mod.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
mod metrics_candlestick;
pub(crate) use metrics_candlestick::Candlestick;

mod metrics_labels;
use std::{net::TcpListener, sync::Arc, time::Duration};

use actix_web::{
Expand All @@ -9,23 +13,15 @@ use actix_web::{
web,
};
use awc::http::Method;
pub(crate) use metrics_labels::*;
use prometheus_client::{
metrics::{counter::Counter, family::Family, gauge::Gauge},
registry::{Registry, Unit},
};
use socket2::{SockAddr, Socket, TcpKeepalive};
use tracing::{error, info};

use crate::{
config::ConfigMetrics,
metrics::{
Candlestick,
LabelsProxy,
LabelsProxyClientInfo,
LabelsProxyHttpJrpc,
LabelsProxyWs,
},
};
use crate::config::ConfigMetrics;

// Metrics -------------------------------------------------------------

Expand Down Expand Up @@ -269,7 +265,7 @@ impl Metrics {
self: Arc<Self>,
canceller: tokio_util::sync::CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send>> {
let listen_address = self.config.listen_address().clone();
let listen_address = self.config.listen_address();

let listener = match self.listen() {
Ok(listener) => listener,
Expand Down
8 changes: 0 additions & 8 deletions crates/rproxy/src/metrics/mod.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Drop for ProxyConnectionGuard {
fn drop(&mut self) {
let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1;

let metric_labels = LabelsProxy { proxy: &self.proxy_name };
let metric_labels = LabelsProxy { proxy: self.proxy_name };

self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val);
self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc();
Expand Down
2 changes: 0 additions & 2 deletions crates/rproxy/src/proxy/mod.rs

This file was deleted.

Loading