diff --git a/Cargo.lock b/Cargo.lock index e20ac11d47a..4e91e242510 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2310,9 +2310,9 @@ dependencies = [ [[package]] name = "dyn-clone" -version = "1.0.18" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feeef44e73baff3a26d371801df019877a9866a8c493d315ab00177843314f35" +checksum = "1c7a8fb8a9fbf66c1f703fe16184d10ca0ee9d23be5b4436400408ba54a95005" [[package]] name = "easy-addr" @@ -5020,6 +5020,7 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.12", + "time", "tokio", "tokio-stream", "tokio-util", @@ -5636,9 +5637,11 @@ dependencies = [ name = "nym-credential-verification" version = "0.1.0" dependencies = [ + "async-trait", "bs58", "cosmwasm-std", "cw-utils", + "dyn-clone", "futures", "nym-api-requests 0.1.0", "nym-credentials", @@ -6022,8 +6025,10 @@ dependencies = [ name = "nym-gateway-storage" version = "0.1.0" dependencies = [ + "async-trait", "bincode", "defguard_wireguard_rs", + "dyn-clone", "nym-credentials-interface 0.1.0", "nym-gateway-requests", "nym-sphinx 0.1.0", @@ -7964,11 +7969,13 @@ dependencies = [ name = "nym-wireguard" version = "0.1.0" dependencies = [ + "async-trait", "base64 0.22.1", "bincode", "chrono", "dashmap", "defguard_wireguard_rs", + "dyn-clone", "futures", "ip_network", "log", diff --git a/Cargo.toml b/Cargo.toml index 8b3404c545e..73c26a7e270 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -234,6 +234,7 @@ digest = "0.10.7" dirs = "5.0" doc-comment = "0.3" dotenvy = "0.15.6" +dyn-clone = "1.0.19" ecdsa = "0.16" ed25519-dalek = "2.1" encoding_rs = "0.8.35" diff --git a/common/authenticator-requests/src/v5/registration.rs b/common/authenticator-requests/src/v5/registration.rs index 28aadf9c6de..3c51776ca05 100644 --- a/common/authenticator-requests/src/v5/registration.rs +++ b/common/authenticator-requests/src/v5/registration.rs @@ -28,8 +28,6 @@ pub type HmacSha256 = Hmac; pub type Nonce = u64; pub type Taken = Option; -pub const BANDWIDTH_CAP_PER_DAY: u64 = 250 * 1024 * 1024 * 1024; // 250 GB - #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct IpPair { pub ipv4: Ipv4Addr, diff --git a/common/credential-verification/Cargo.toml b/common/credential-verification/Cargo.toml index a86b7da3824..e85589eaaec 100644 --- a/common/credential-verification/Cargo.toml +++ b/common/credential-verification/Cargo.toml @@ -11,9 +11,11 @@ rust-version.workspace = true readme.workspace = true [dependencies] +async-trait = { workspace = true } bs58 = { workspace = true } cosmwasm-std = { workspace = true } cw-utils = { workspace = true } +dyn-clone = { workspace = true } futures = { workspace = true } rand = { workspace = true } si-scale = { workspace = true } diff --git a/common/credential-verification/src/bandwidth_storage_manager.rs b/common/credential-verification/src/bandwidth_storage_manager.rs index 5d510913464..17f89a59fb0 100644 --- a/common/credential-verification/src/bandwidth_storage_manager.rs +++ b/common/credential-verification/src/bandwidth_storage_manager.rs @@ -7,25 +7,36 @@ use crate::ClientBandwidth; use nym_credentials::ecash::utils::ecash_today; use nym_credentials_interface::Bandwidth; use nym_gateway_requests::ServerResponse; -use nym_gateway_storage::GatewayStorage; +use nym_gateway_storage::traits::BandwidthGatewayStorage; use si_scale::helpers::bibytes2; use time::OffsetDateTime; use tracing::*; const FREE_TESTNET_BANDWIDTH_VALUE: Bandwidth = Bandwidth::new_unchecked(64 * 1024 * 1024 * 1024); // 64GB -#[derive(Clone)] pub struct BandwidthStorageManager { - pub(crate) storage: GatewayStorage, + pub(crate) storage: Box, pub(crate) client_bandwidth: ClientBandwidth, pub(crate) client_id: i64, pub(crate) bandwidth_cfg: BandwidthFlushingBehaviourConfig, pub(crate) only_coconut_credentials: bool, } +impl Clone for BandwidthStorageManager { + fn clone(&self) -> Self { + Self { + storage: dyn_clone::clone_box(&*self.storage), + client_bandwidth: self.client_bandwidth.clone(), + client_id: self.client_id, + bandwidth_cfg: self.bandwidth_cfg, + only_coconut_credentials: self.only_coconut_credentials, + } + } +} + impl BandwidthStorageManager { pub fn new( - storage: GatewayStorage, + storage: Box, client_bandwidth: ClientBandwidth, client_id: i64, bandwidth_cfg: BandwidthFlushingBehaviourConfig, diff --git a/common/credential-verification/src/ecash/mod.rs b/common/credential-verification/src/ecash/mod.rs index 71d0d993ffe..107ac452997 100644 --- a/common/credential-verification/src/ecash/mod.rs +++ b/common/credential-verification/src/ecash/mod.rs @@ -2,12 +2,14 @@ // SPDX-License-Identifier: GPL-3.0-only use crate::Error; +use async_trait::async_trait; use credential_sender::CredentialHandler; use credential_sender::CredentialHandlerConfig; use error::EcashTicketError; use futures::channel::mpsc::{self, UnboundedSender}; use nym_credentials::CredentialSpendingData; use nym_credentials_interface::{ClientTicket, CompactEcashError, NymPayInfo, VerificationKeyAuth}; +use nym_gateway_storage::traits::BandwidthGatewayStorage; use nym_gateway_storage::GatewayStorage; use nym_validator_client::nym_api::EpochId; use nym_validator_client::DirectSigningHttpRpcNyxdClient; @@ -20,6 +22,7 @@ pub mod credential_sender; pub mod error; mod helpers; mod state; +pub mod traits; pub const TIME_RANGE_SEC: i64 = 30; @@ -31,44 +34,21 @@ pub struct EcashManager { cred_sender: UnboundedSender, } -impl EcashManager { - pub async fn new( - credential_handler_cfg: CredentialHandlerConfig, - nyxd_client: DirectSigningHttpRpcNyxdClient, - pk_bytes: [u8; 32], - shutdown: nym_task::TaskClient, - storage: GatewayStorage, - ) -> Result { - let shared_state = SharedState::new(nyxd_client, storage).await?; - - let (cred_sender, cred_receiver) = mpsc::unbounded(); - - let cs = - CredentialHandler::new(credential_handler_cfg, cred_receiver, shared_state.clone()) - .await?; - cs.start(shutdown); - - Ok(EcashManager { - shared_state, - pk_bytes, - pay_infos: Default::default(), - cred_sender, - }) - } - - pub async fn verification_key( +#[async_trait] +impl traits::EcashManager for EcashManager { + async fn verification_key( &self, epoch_id: EpochId, ) -> Result, EcashTicketError> { self.shared_state.verification_key(epoch_id).await } - pub fn storage(&self) -> &GatewayStorage { - &self.shared_state.storage + fn storage(&self) -> Box { + dyn_clone::clone_box(&*self.shared_state.storage) } //Check for duplicate pay_info, then check the payment, then insert pay_info if everything succeeded - pub async fn check_payment( + async fn check_payment( &self, credential: &CredentialSpendingData, aggregated_verification_key: &VerificationKeyAuth, @@ -88,6 +68,40 @@ impl EcashManager { .await } + fn async_verify(&self, ticket: ClientTicket) { + // TODO: I guess do something for shutdowns + let _ = self + .cred_sender + .unbounded_send(ticket) + .inspect_err(|_| error!("failed to send the client ticket for verification task")); + } +} + +impl EcashManager { + pub async fn new( + credential_handler_cfg: CredentialHandlerConfig, + nyxd_client: DirectSigningHttpRpcNyxdClient, + pk_bytes: [u8; 32], + shutdown: nym_task::TaskClient, + storage: GatewayStorage, + ) -> Result { + let shared_state = SharedState::new(nyxd_client, Box::new(storage)).await?; + + let (cred_sender, cred_receiver) = mpsc::unbounded(); + + let cs = + CredentialHandler::new(credential_handler_cfg, cred_receiver, shared_state.clone()) + .await?; + cs.start(shutdown); + + Ok(EcashManager { + shared_state, + pk_bytes, + pay_infos: Default::default(), + cred_sender, + }) + } + pub async fn verify_pay_info(&self, pay_info: NymPayInfo) -> Result { //Public key check if pay_info.pk() != self.pk_bytes { @@ -152,12 +166,86 @@ impl EcashManager { inner.insert(index, pay_info); Ok(()) } +} - pub fn async_verify(&self, ticket: ClientTicket) { - // TODO: I guess do something for shutdowns - let _ = self - .cred_sender - .unbounded_send(ticket) - .inspect_err(|_| error!("failed to send the client ticket for verification task")); +pub struct MockEcashManager { + verfication_key: tokio::sync::RwLock, + storage: Box, +} + +impl MockEcashManager { + pub fn new(storage: Box) -> Self { + Self { + verfication_key: tokio::sync::RwLock::new( + VerificationKeyAuth::from_bytes(&[ + 129, 187, 76, 12, 1, 51, 46, 26, 132, 205, 148, 109, 140, 131, 50, 119, 45, + 128, 51, 218, 106, 70, 181, 74, 244, 38, 162, 62, 42, 12, 5, 100, 7, 136, 32, + 155, 18, 219, 195, 182, 3, 56, 168, 16, 93, 154, 249, 230, 16, 202, 90, 134, + 246, 25, 98, 6, 175, 215, 188, 239, 71, 84, 66, 1, 43, 66, 197, 180, 216, 80, + 55, 185, 140, 216, 14, 48, 244, 214, 20, 68, 106, 41, 48, 252, 188, 181, 231, + 170, 23, 211, 215, 12, 91, 147, 47, 7, 4, 0, 0, 0, 0, 0, 0, 0, 174, 31, 237, + 215, 159, 183, 71, 125, 90, 147, 84, 78, 49, 216, 66, 232, 92, 206, 41, 230, + 239, 209, 211, 166, 131, 190, 148, 36, 225, 194, 146, 6, 120, 34, 194, 5, 154, + 155, 234, 41, 191, 119, 227, 51, 91, 128, 151, 240, 129, 208, 253, 171, 234, + 170, 71, 139, 251, 78, 49, 35, 218, 16, 77, 150, 177, 204, 83, 210, 67, 147, + 66, 162, 58, 25, 96, 168, 61, 180, 92, 21, 18, 78, 194, 98, 176, 123, 122, 176, + 81, 150, 187, 20, 64, 69, 0, 134, 142, 3, 84, 108, 3, 55, 107, 111, 73, 31, 46, + 51, 225, 248, 202, 173, 194, 24, 104, 96, 31, 61, 24, 140, 220, 31, 176, 200, + 30, 217, 66, 58, 11, 181, 158, 196, 179, 199, 177, 7, 210, 4, 119, 142, 149, + 59, 3, 186, 145, 27, 230, 125, 230, 246, 197, 196, 119, 70, 239, 115, 99, 215, + 63, 205, 63, 74, 108, 201, 42, 226, 150, 137, 3, 157, 45, 25, 163, 54, 107, + 153, 61, 141, 64, 207, 139, 41, 203, 39, 36, 97, 181, 72, 206, 235, 221, 178, + 171, 60, 4, 6, 170, 181, 213, 10, 216, 53, 28, 32, 33, 41, 224, 60, 247, 206, + 137, 108, 251, 229, 234, 112, 65, 145, 124, 212, 125, 116, 154, 114, 2, 125, + 202, 24, 25, 196, 219, 104, 200, 131, 133, 180, 39, 21, 144, 204, 8, 151, 218, + 99, 64, 209, 47, 5, 42, 13, 214, 139, 54, 112, 224, 53, 238, 250, 56, 42, 105, + 15, 21, 238, 99, 225, 79, 121, 104, 155, 230, 243, 133, 47, 39, 147, 98, 45, + 113, 137, 200, 102, 151, 122, 174, 9, 250, 17, 138, 191, 129, 202, 244, 107, + 75, 48, 141, 136, 89, 168, 124, 88, 174, 251, 17, 35, 146, 88, 76, 134, 102, + 105, 204, 16, 176, 214, 63, 13, 170, 225, 250, 112, 7, 237, 161, 160, 15, 71, + 10, 130, 137, 69, 186, 64, 223, 188, 5, 5, 228, 57, 214, 134, 247, 20, 171, + 140, 43, 230, 57, 29, 127, 136, 169, 80, 14, 137, 130, 200, 205, 222, 81, 143, + 40, 77, 68, 197, 91, 142, 91, 84, 164, 15, 133, 242, 149, 255, 173, 201, 108, + 208, 23, 188, 230, 158, 146, 54, 198, 52, 148, 123, 202, 52, 222, 50, 4, 62, + 211, 208, 176, 61, 104, 151, 227, 192, 224, 200, 132, 53, 187, 240, 254, 150, + 60, 30, 140, 11, 63, 71, 12, 30, 233, 255, 144, 250, 16, 81, 38, 33, 9, 185, + 195, 214, 0, 119, 117, 94, 100, 103, 144, 10, 189, 65, 113, 114, 192, 11, 177, + 214, 223, 218, 36, 139, 183, 2, 206, 247, 245, 88, 62, 231, 183, 50, 46, 95, + 202, 152, 82, 244, 80, 173, 192, 147, 51, 248, 46, 181, 194, 205, 233, 67, 144, + 155, 250, 142, 124, 71, 9, 136, 142, 88, 29, 99, 222, 43, 181, 172, 120, 187, + 179, 172, 240, 231, 57, 236, 195, 158, 182, 203, 19, 49, 220, 180, 212, 101, + 105, 239, 58, 215, 0, 50, 100, 172, 29, 236, 170, 108, 129, 150, 5, 64, 238, + 59, 50, 4, 21, 131, 197, 142, 191, 76, 101, 140, 133, 112, 38, 235, 113, 203, + 22, 161, 204, 84, 73, 125, 219, 70, 62, 67, 119, 52, 130, 208, 180, 231, 78, + 141, 181, 13, 207, 196, 126, 159, 70, 34, 195, 70, + ]) + .unwrap(), + ), + storage: dyn_clone::clone_box(&*storage), + } } } + +#[async_trait] +impl traits::EcashManager for MockEcashManager { + async fn verification_key( + &self, + _epoch_id: EpochId, + ) -> Result, EcashTicketError> { + Ok(self.verfication_key.read().await) + } + + fn storage(&self) -> Box { + dyn_clone::clone_box(&*self.storage) + } + + async fn check_payment( + &self, + _credential: &CredentialSpendingData, + _aggregated_verification_key: &VerificationKeyAuth, + ) -> Result<(), EcashTicketError> { + Ok(()) + } + + fn async_verify(&self, _ticket: ClientTicket) {} +} diff --git a/common/credential-verification/src/ecash/state.rs b/common/credential-verification/src/ecash/state.rs index 2c937fd6923..c16db2111fa 100644 --- a/common/credential-verification/src/ecash/state.rs +++ b/common/credential-verification/src/ecash/state.rs @@ -6,7 +6,7 @@ use crate::Error; use cosmwasm_std::{from_json, CosmosMsg, WasmMsg}; use nym_credentials_interface::VerificationKeyAuth; use nym_ecash_contract_common::msg::ExecuteMsg; -use nym_gateway_storage::GatewayStorage; +use nym_gateway_storage::traits::BandwidthGatewayStorage; use nym_validator_client::coconut::all_ecash_api_clients; use nym_validator_client::nym_api::EpochId; use nym_validator_client::nyxd::contract_traits::{ @@ -22,18 +22,28 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::{error, trace, warn}; // state shared by different subtasks dealing with credentials -#[derive(Clone)] pub(crate) struct SharedState { pub(crate) nyxd_client: Arc>, pub(crate) address: AccountId, pub(crate) epoch_data: Arc>>, - pub(crate) storage: GatewayStorage, + pub(crate) storage: Box, +} + +impl Clone for SharedState { + fn clone(&self) -> Self { + Self { + nyxd_client: self.nyxd_client.clone(), + address: self.address.clone(), + epoch_data: self.epoch_data.clone(), + storage: dyn_clone::clone_box(&*self.storage), + } + } } impl SharedState { pub(crate) async fn new( nyxd_client: DirectSigningHttpRpcNyxdClient, - storage: GatewayStorage, + storage: Box, ) -> Result { let address = nyxd_client.address(); diff --git a/common/credential-verification/src/ecash/traits.rs b/common/credential-verification/src/ecash/traits.rs new file mode 100644 index 00000000000..a7e81976812 --- /dev/null +++ b/common/credential-verification/src/ecash/traits.rs @@ -0,0 +1,23 @@ +use async_trait::async_trait; +use nym_credentials::CredentialSpendingData; +use nym_credentials_interface::{ClientTicket, VerificationKeyAuth}; +use nym_gateway_storage::traits::BandwidthGatewayStorage; +use nym_validator_client::nym_api::EpochId; +use tokio::sync::RwLockReadGuard; + +use crate::ecash::error::EcashTicketError; + +#[async_trait] +pub trait EcashManager { + async fn verification_key( + &self, + epoch_id: EpochId, + ) -> Result, EcashTicketError>; + fn storage(&self) -> Box; + async fn check_payment( + &self, + credential: &CredentialSpendingData, + aggregated_verification_key: &VerificationKeyAuth, + ) -> Result<(), EcashTicketError>; + fn async_verify(&self, ticket: ClientTicket); +} diff --git a/common/credential-verification/src/lib.rs b/common/credential-verification/src/lib.rs index 50d6db1e05d..2164c255145 100644 --- a/common/credential-verification/src/lib.rs +++ b/common/credential-verification/src/lib.rs @@ -1,8 +1,8 @@ // Copyright 2024 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +use crate::ecash::traits::EcashManager; use bandwidth_storage_manager::BandwidthStorageManager; -use ecash::EcashManager; use nym_credentials::ecash::utils::{cred_exp_date, ecash_today, EcashTime}; use nym_credentials_interface::{Bandwidth, ClientTicket, TicketType}; use nym_gateway_requests::models::CredentialSpendingRequest; @@ -20,14 +20,14 @@ pub mod error; pub struct CredentialVerifier { credential: CredentialSpendingRequest, - ecash_verifier: Arc, + ecash_verifier: Arc, bandwidth_storage_manager: BandwidthStorageManager, } impl CredentialVerifier { pub fn new( credential: CredentialSpendingRequest, - ecash_verifier: Arc, + ecash_verifier: Arc, bandwidth_storage_manager: BandwidthStorageManager, ) -> Self { CredentialVerifier { diff --git a/common/gateway-storage/Cargo.toml b/common/gateway-storage/Cargo.toml index 810697f2587..4ead02b9393 100644 --- a/common/gateway-storage/Cargo.toml +++ b/common/gateway-storage/Cargo.toml @@ -9,8 +9,10 @@ edition.workspace = true license.workspace = true [dependencies] +async-trait = { workspace = true } bincode = { workspace = true } defguard_wireguard_rs = { workspace = true } +dyn-clone = { workspace = true } sqlx = { workspace = true, features = [ "runtime-tokio-rustls", "sqlite", @@ -21,6 +23,7 @@ sqlx = { workspace = true, features = [ ] } time = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync"], optional = true } tracing = { workspace = true } nym-credentials-interface = { path = "../credentials-interface" } @@ -35,3 +38,7 @@ sqlx = { workspace = true, features = [ "macros", "migrate", ] } + +[features] +default = [] +mock = ["tokio"] \ No newline at end of file diff --git a/common/gateway-storage/migrations/20250605120000_trim_wireguard_peer_data.sql b/common/gateway-storage/migrations/20250605120000_trim_wireguard_peer_data.sql new file mode 100644 index 00000000000..cd627850bfc --- /dev/null +++ b/common/gateway-storage/migrations/20250605120000_trim_wireguard_peer_data.sql @@ -0,0 +1,19 @@ +/* + * Copyright 2025 - Nym Technologies SA + * SPDX-License-Identifier: Apache-2.0 + */ + +DELETE FROM wireguard_peer WHERE client_id IS NULL; + +CREATE TABLE wireguard_peer_new +( + public_key TEXT NOT NULL PRIMARY KEY UNIQUE, + allowed_ips BLOB NOT NULL, + client_id INTEGER REFERENCES clients(id) NOT NULL +); + +INSERT INTO wireguard_peer_new (public_key, allowed_ips, client_id) +SELECT public_key, allowed_ips, client_id FROM wireguard_peer; + +DROP TABLE wireguard_peer; +ALTER TABLE wireguard_peer_new RENAME TO wireguard_peer; \ No newline at end of file diff --git a/common/gateway-storage/src/clients.rs b/common/gateway-storage/src/clients.rs index 8bee61e04b8..83365f8cfd2 100644 --- a/common/gateway-storage/src/clients.rs +++ b/common/gateway-storage/src/clients.rs @@ -3,6 +3,8 @@ use std::str::FromStr; +use nym_credentials_interface::TicketType; + use crate::models::Client; #[derive(Debug, PartialEq, sqlx::Type)] @@ -15,6 +17,17 @@ pub enum ClientType { ExitWireguard, } +impl From for ClientType { + fn from(value: TicketType) -> Self { + match value { + TicketType::V1MixnetEntry => ClientType::EntryMixnet, + TicketType::V1MixnetExit => ClientType::ExitMixnet, + TicketType::V1WireguardEntry => ClientType::EntryWireguard, + TicketType::V1WireguardExit => ClientType::ExitWireguard, + } + } +} + impl FromStr for ClientType { type Err = &'static str; diff --git a/common/gateway-storage/src/error.rs b/common/gateway-storage/src/error.rs index 272d86b5571..26004517513 100644 --- a/common/gateway-storage/src/error.rs +++ b/common/gateway-storage/src/error.rs @@ -20,6 +20,18 @@ pub enum GatewayStorageError { #[error("the stored data associated with ticket {ticket_id} is malformed!")] MalformedStoredTicketData { ticket_id: i64 }, - #[error("Failed to convert from type of database: {0}")] - TypeConversion(String), + #[error("Failed to convert from type of database: {field_key}")] + TypeConversion { field_key: &'static str }, + + #[error("Serialization failure for {field_key}")] + Serialize { + field_key: &'static str, + source: bincode::Error, + }, + + #[error("Deserialization failure for {field_key}")] + Deserialize { + field_key: &'static str, + source: bincode::Error, + }, } diff --git a/common/gateway-storage/src/lib.rs b/common/gateway-storage/src/lib.rs index ea534b8c1bc..e10fdc3c1ee 100644 --- a/common/gateway-storage/src/lib.rs +++ b/common/gateway-storage/src/lib.rs @@ -1,6 +1,7 @@ // Copyright 2020 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only +use async_trait::async_trait; use bandwidth::BandwidthManager; use clients::{ClientManager, ClientType}; use models::{ @@ -15,10 +16,10 @@ use sqlx::{ sqlite::{SqliteAutoVacuum, SqliteSynchronous}, ConnectOptions, }; -use std::path::Path; +use std::{path::Path, time::Duration}; use tickets::TicketStorageManager; use time::OffsetDateTime; -use tracing::{debug, error}; +use tracing::{debug, error, log::LevelFilter}; pub mod bandwidth; mod clients; @@ -27,11 +28,21 @@ mod inboxes; pub mod models; mod shared_keys; mod tickets; +pub mod traits; mod wireguard_peers; pub use error::GatewayStorageError; pub use inboxes::InboxManager; +use crate::traits::{BandwidthGatewayStorage, InboxGatewayStorage, SharedKeyGatewayStorage}; + +fn make_bincode_serializer() -> impl bincode::Options { + use bincode::Options; + bincode::DefaultOptions::new() + .with_big_endian() + .with_varint_encoding() +} + // note that clone here is fine as upon cloning the same underlying pool will be used #[derive(Clone)] pub struct GatewayStorage { @@ -71,6 +82,21 @@ impl GatewayStorage { &self.wireguard_peer_manager } + pub async fn handle_forget_me( + &self, + client_address: DestinationAddressBytes, + ) -> Result<(), GatewayStorageError> { + let client_id = self.get_mixnet_client_id(client_address).await?; + self.inbox_manager() + .remove_messages_for_client(&client_address.as_base58_string()) + .await?; + self.bandwidth_manager().remove_client(client_id).await?; + self.shared_key_manager() + .remove_shared_keys(&client_address.as_base58_string()) + .await?; + Ok(()) + } + /// Initialises `PersistentStorage` using the provided path. /// /// # Arguments @@ -92,6 +118,7 @@ impl GatewayStorage { .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) .synchronous(SqliteSynchronous::Normal) .auto_vacuum(SqliteAutoVacuum::Incremental) + .log_slow_statements(LevelFilter::Warn, Duration::from_millis(250)) .filename(database_path) .create_if_missing(true) .disable_statement_logging(); @@ -123,8 +150,9 @@ impl GatewayStorage { } } -impl GatewayStorage { - pub async fn get_mixnet_client_id( +#[async_trait] +impl SharedKeyGatewayStorage for GatewayStorage { + async fn get_mixnet_client_id( &self, client_address: DestinationAddressBytes, ) -> Result { @@ -134,22 +162,7 @@ impl GatewayStorage { .await?) } - pub async fn handle_forget_me( - &self, - client_address: DestinationAddressBytes, - ) -> Result<(), GatewayStorageError> { - let client_id = self.get_mixnet_client_id(client_address).await?; - self.inbox_manager() - .remove_messages_for_client(&client_address.as_base58_string()) - .await?; - self.bandwidth_manager().remove_client(client_id).await?; - self.shared_key_manager() - .remove_shared_keys(&client_address.as_base58_string()) - .await?; - Ok(()) - } - - pub async fn insert_shared_keys( + async fn insert_shared_keys( &self, client_address: DestinationAddressBytes, shared_keys: &SharedGatewayKey, @@ -178,7 +191,7 @@ impl GatewayStorage { Ok(client_id) } - pub async fn get_shared_keys( + async fn get_shared_keys( &self, client_address: DestinationAddressBytes, ) -> Result, GatewayStorageError> { @@ -190,7 +203,7 @@ impl GatewayStorage { } #[allow(dead_code)] - pub async fn remove_shared_keys( + async fn remove_shared_keys( &self, client_address: DestinationAddressBytes, ) -> Result<(), GatewayStorageError> { @@ -200,7 +213,7 @@ impl GatewayStorage { Ok(()) } - pub async fn update_last_used_authentication_timestamp( + async fn update_last_used_authentication_timestamp( &self, client_id: i64, last_used_authentication_timestamp: OffsetDateTime, @@ -214,12 +227,15 @@ impl GatewayStorage { Ok(()) } - pub async fn get_client(&self, client_id: i64) -> Result, GatewayStorageError> { + async fn get_client(&self, client_id: i64) -> Result, GatewayStorageError> { let client = self.client_manager.get_client(client_id).await?; Ok(client) } +} - pub async fn store_message( +#[async_trait] +impl InboxGatewayStorage for GatewayStorage { + async fn store_message( &self, client_address: DestinationAddressBytes, message: Vec, @@ -230,7 +246,7 @@ impl GatewayStorage { Ok(()) } - pub async fn retrieve_messages( + async fn retrieve_messages( &self, client_address: DestinationAddressBytes, start_after: Option, @@ -242,19 +258,22 @@ impl GatewayStorage { Ok(messages) } - pub async fn remove_messages(&self, ids: Vec) -> Result<(), GatewayStorageError> { + async fn remove_messages(&self, ids: Vec) -> Result<(), GatewayStorageError> { for id in ids { self.inbox_manager.remove_message(id).await?; } Ok(()) } +} - pub async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> { +#[async_trait] +impl BandwidthGatewayStorage for GatewayStorage { + async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> { self.bandwidth_manager.insert_new_client(client_id).await?; Ok(()) } - pub async fn set_expiration( + async fn set_expiration( &self, client_id: i64, expiration: OffsetDateTime, @@ -265,12 +284,12 @@ impl GatewayStorage { Ok(()) } - pub async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> { + async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> { self.bandwidth_manager.reset_bandwidth(client_id).await?; Ok(()) } - pub async fn get_available_bandwidth( + async fn get_available_bandwidth( &self, client_id: i64, ) -> Result, GatewayStorageError> { @@ -280,7 +299,7 @@ impl GatewayStorage { .await?) } - pub async fn increase_bandwidth( + async fn increase_bandwidth( &self, client_id: i64, amount: i64, @@ -291,7 +310,7 @@ impl GatewayStorage { .await?) } - pub async fn revoke_ticket_bandwidth( + async fn revoke_ticket_bandwidth( &self, ticket_id: i64, amount: i64, @@ -302,7 +321,7 @@ impl GatewayStorage { .await?) } - pub async fn decrease_bandwidth( + async fn decrease_bandwidth( &self, client_id: i64, amount: i64, @@ -313,7 +332,7 @@ impl GatewayStorage { .await?) } - pub async fn insert_epoch_signers( + async fn insert_epoch_signers( &self, epoch_id: i64, signer_ids: Vec, @@ -324,7 +343,7 @@ impl GatewayStorage { Ok(()) } - pub async fn insert_received_ticket( + async fn insert_received_ticket( &self, client_id: i64, received_at: OffsetDateTime, @@ -344,11 +363,11 @@ impl GatewayStorage { Ok(ticket_id) } - pub async fn contains_ticket(&self, serial_number: &[u8]) -> Result { + async fn contains_ticket(&self, serial_number: &[u8]) -> Result { Ok(self.ticket_manager.has_ticket_data(serial_number).await?) } - pub async fn insert_ticket_verification( + async fn insert_ticket_verification( &self, ticket_id: i64, signer_id: i64, @@ -361,7 +380,7 @@ impl GatewayStorage { Ok(()) } - pub async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> { + async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> { // set the ticket as rejected self.ticket_manager.set_rejected_ticket(ticket_id).await?; @@ -372,7 +391,7 @@ impl GatewayStorage { Ok(()) } - pub async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> { + async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> { // 1. insert into verified table self.ticket_manager .insert_verified_ticket(ticket_id) @@ -386,7 +405,7 @@ impl GatewayStorage { Ok(()) } - pub async fn remove_verified_ticket_binary_data( + async fn remove_verified_ticket_binary_data( &self, ticket_id: i64, ) -> Result<(), GatewayStorageError> { @@ -396,7 +415,7 @@ impl GatewayStorage { Ok(()) } - pub async fn get_all_verified_tickets_with_sn( + async fn get_all_verified_tickets_with_sn( &self, ) -> Result, GatewayStorageError> { Ok(self @@ -405,7 +424,7 @@ impl GatewayStorage { .await?) } - pub async fn get_all_proposed_tickets_with_sn( + async fn get_all_proposed_tickets_with_sn( &self, proposal_id: u32, ) -> Result, GatewayStorageError> { @@ -415,7 +434,7 @@ impl GatewayStorage { .await?) } - pub async fn insert_redemption_proposal( + async fn insert_redemption_proposal( &self, tickets: &[VerifiedTicket], proposal_id: u32, @@ -438,7 +457,7 @@ impl GatewayStorage { Ok(()) } - pub async fn clear_post_proposal_data( + async fn clear_post_proposal_data( &self, proposal_id: u32, resolved_at: OffsetDateTime, @@ -462,13 +481,11 @@ impl GatewayStorage { Ok(()) } - pub async fn latest_proposal(&self) -> Result, GatewayStorageError> { + async fn latest_proposal(&self) -> Result, GatewayStorageError> { Ok(self.ticket_manager.get_latest_redemption_proposal().await?) } - pub async fn get_all_unverified_tickets( - &self, - ) -> Result, GatewayStorageError> { + async fn get_all_unverified_tickets(&self) -> Result, GatewayStorageError> { self.ticket_manager .get_unverified_tickets() .await? @@ -477,21 +494,21 @@ impl GatewayStorage { .collect() } - pub async fn get_all_unresolved_proposals(&self) -> Result, GatewayStorageError> { + async fn get_all_unresolved_proposals(&self) -> Result, GatewayStorageError> { Ok(self .ticket_manager .get_all_unresolved_redemption_proposal_ids() .await?) } - pub async fn get_votes(&self, ticket_id: i64) -> Result, GatewayStorageError> { + async fn get_votes(&self, ticket_id: i64) -> Result, GatewayStorageError> { Ok(self .ticket_manager .get_verification_votes(ticket_id) .await?) } - pub async fn get_signers(&self, epoch_id: i64) -> Result, GatewayStorageError> { + async fn get_signers(&self, epoch_id: i64) -> Result, GatewayStorageError> { Ok(self.ticket_manager.get_epoch_signers(epoch_id).await?) } @@ -500,34 +517,20 @@ impl GatewayStorage { /// # Arguments /// /// * `peer`: wireguard peer data to be stored - /// * `with_client_id`: if the peer should have a corresponding client_id - /// (created with entry wireguard ticket) or live without one (or with an - /// exiting one), for temporary backwards compatibility. - pub async fn insert_wireguard_peer( + async fn insert_wireguard_peer( &self, peer: &defguard_wireguard_rs::host::Peer, - with_client_id: bool, - ) -> Result, GatewayStorageError> { + client_type: ClientType, + ) -> Result { let client_id = match self .wireguard_peer_manager .retrieve_peer(&peer.public_key.to_string()) .await? { Some(peer) => peer.client_id, - _ => { - if with_client_id { - Some( - self.client_manager - .insert_client(ClientType::EntryWireguard) - .await?, - ) - } else { - None - } - } + None => self.client_manager.insert_client(client_type).await?, }; - let mut peer = WireguardPeer::from(peer.clone()); - peer.client_id = client_id; + let peer = WireguardPeer::from_defguard_peer(peer.clone(), client_id)?; self.wireguard_peer_manager.insert_peer(&peer).await?; Ok(client_id) } @@ -537,7 +540,7 @@ impl GatewayStorage { /// # Arguments /// /// * `peer_public_key`: wireguard public key of the peer to be retrieved. - pub async fn get_wireguard_peer( + async fn get_wireguard_peer( &self, peer_public_key: &str, ) -> Result, GatewayStorageError> { @@ -549,7 +552,7 @@ impl GatewayStorage { } /// Retrieves all wireguard peers. - pub async fn get_all_wireguard_peers(&self) -> Result, GatewayStorageError> { + async fn get_all_wireguard_peers(&self) -> Result, GatewayStorageError> { let ret = self.wireguard_peer_manager.retrieve_all_peers().await?; Ok(ret) } @@ -559,7 +562,7 @@ impl GatewayStorage { /// # Arguments /// /// * `peer_public_key`: wireguard public key of the peer to be removed. - pub async fn remove_wireguard_peer( + async fn remove_wireguard_peer( &self, peer_public_key: &str, ) -> Result<(), GatewayStorageError> { diff --git a/common/gateway-storage/src/models.rs b/common/gateway-storage/src/models.rs index 531ed355de2..32b12b80a50 100644 --- a/common/gateway-storage/src/models.rs +++ b/common/gateway-storage/src/models.rs @@ -1,9 +1,7 @@ // Copyright 2021-2024 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only -use std::time::SystemTime; - -use crate::error::GatewayStorageError; +use crate::{error::GatewayStorageError, make_bincode_serializer}; use nym_credentials_interface::{AvailableBandwidth, ClientTicket, CredentialSpendingData}; use nym_gateway_requests::shared_key::{LegacySharedKeys, SharedGatewayKey, SharedSymmetricKey}; use sqlx::FromRow; @@ -112,35 +110,24 @@ impl TryFrom for ClientTicket { #[derive(Debug, Clone, FromRow)] pub struct WireguardPeer { pub public_key: String, - pub preshared_key: Option, - pub protocol_version: Option, - pub endpoint: Option, - pub last_handshake: Option, - pub tx_bytes: i64, - pub rx_bytes: i64, - pub persistent_keepalive_interval: Option, pub allowed_ips: Vec, - pub client_id: Option, + pub client_id: i64, } -impl From for WireguardPeer { - fn from(value: defguard_wireguard_rs::host::Peer) -> Self { - WireguardPeer { +impl WireguardPeer { + pub fn from_defguard_peer( + value: defguard_wireguard_rs::host::Peer, + client_id: i64, + ) -> Result { + Ok(WireguardPeer { public_key: value.public_key.to_string(), - preshared_key: value.preshared_key.as_ref().map(|k| k.to_string()), - protocol_version: value.protocol_version.map(|v| v as i64), - endpoint: value.endpoint.map(|e| e.to_string()), - last_handshake: value.last_handshake.map(OffsetDateTime::from), - tx_bytes: value.tx_bytes as i64, - rx_bytes: value.rx_bytes as i64, - persistent_keepalive_interval: value.persistent_keepalive_interval.map(|v| v as i64), - allowed_ips: bincode::Options::serialize( - bincode::DefaultOptions::new(), - &value.allowed_ips, - ) - .unwrap_or_default(), - client_id: None, - } + allowed_ips: bincode::Options::serialize(make_bincode_serializer(), &value.allowed_ips) + .map_err(|source| crate::error::GatewayStorageError::Serialize { + field_key: "allowed_ips", + source, + })?, + client_id, + }) } } @@ -149,49 +136,20 @@ impl TryFrom for defguard_wireguard_rs::host::Peer { fn try_from(value: WireguardPeer) -> Result { Ok(Self { - public_key: value - .public_key - .as_str() - .try_into() - .map_err(|e| Self::Error::TypeConversion(format!("public key {e}")))?, - preshared_key: value - .preshared_key - .as_deref() - .map(TryFrom::try_from) - .transpose() - .map_err(|e| Self::Error::TypeConversion(format!("preshared key {e}")))?, - protocol_version: value - .protocol_version - .map(TryFrom::try_from) - .transpose() - .map_err(|e| Self::Error::TypeConversion(format!("protocol version {e}")))?, - endpoint: value - .endpoint - .as_deref() - .map(|e| e.parse()) - .transpose() - .map_err(|e| Self::Error::TypeConversion(format!("endpoint {e}")))?, - last_handshake: value.last_handshake.map(SystemTime::from), - tx_bytes: value - .tx_bytes - .try_into() - .map_err(|e| Self::Error::TypeConversion(format!("tx bytes {e}")))?, - rx_bytes: value - .rx_bytes - .try_into() - .map_err(|e| Self::Error::TypeConversion(format!("rx bytes {e}")))?, - persistent_keepalive_interval: value - .persistent_keepalive_interval - .map(TryFrom::try_from) - .transpose() - .map_err(|e| { - Self::Error::TypeConversion(format!("persistent keepalive interval {e}")) - })?, + public_key: value.public_key.as_str().try_into().map_err(|_| { + Self::Error::TypeConversion { + field_key: "public_key", + } + })?, allowed_ips: bincode::Options::deserialize( bincode::DefaultOptions::new(), &value.allowed_ips, ) - .map_err(|e| Self::Error::TypeConversion(format!("allowed ips {e}")))?, + .map_err(|source| Self::Error::Deserialize { + field_key: "allowed_ips", + source, + })?, + ..Default::default() }) } } diff --git a/common/gateway-storage/src/traits.rs b/common/gateway-storage/src/traits.rs new file mode 100644 index 00000000000..aa7e01e4343 --- /dev/null +++ b/common/gateway-storage/src/traits.rs @@ -0,0 +1,511 @@ +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use async_trait::async_trait; +use nym_credentials_interface::ClientTicket; +use nym_gateway_requests::SharedGatewayKey; +use nym_sphinx::DestinationAddressBytes; +use time::OffsetDateTime; + +use crate::{ + clients::ClientType, + models::{ + Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage, + VerifiedTicket, WireguardPeer, + }, + GatewayStorageError, +}; + +#[async_trait] +pub trait SharedKeyGatewayStorage { + async fn get_mixnet_client_id( + &self, + client_address: DestinationAddressBytes, + ) -> Result; + async fn insert_shared_keys( + &self, + client_address: DestinationAddressBytes, + shared_keys: &SharedGatewayKey, + ) -> Result; + async fn get_shared_keys( + &self, + client_address: DestinationAddressBytes, + ) -> Result, GatewayStorageError>; + #[allow(dead_code)] + async fn remove_shared_keys( + &self, + client_address: DestinationAddressBytes, + ) -> Result<(), GatewayStorageError>; + async fn update_last_used_authentication_timestamp( + &self, + client_id: i64, + last_used_authentication_timestamp: OffsetDateTime, + ) -> Result<(), GatewayStorageError>; + async fn get_client(&self, client_id: i64) -> Result, GatewayStorageError>; +} + +#[async_trait] +pub trait InboxGatewayStorage { + async fn store_message( + &self, + client_address: DestinationAddressBytes, + message: Vec, + ) -> Result<(), GatewayStorageError>; + async fn retrieve_messages( + &self, + client_address: DestinationAddressBytes, + start_after: Option, + ) -> Result<(Vec, Option), GatewayStorageError>; + async fn remove_messages(&self, ids: Vec) -> Result<(), GatewayStorageError>; +} + +#[async_trait] +pub trait BandwidthGatewayStorage: dyn_clone::DynClone { + async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError>; + async fn set_expiration( + &self, + client_id: i64, + expiration: OffsetDateTime, + ) -> Result<(), GatewayStorageError>; + async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError>; + async fn get_available_bandwidth( + &self, + client_id: i64, + ) -> Result, GatewayStorageError>; + async fn increase_bandwidth( + &self, + client_id: i64, + amount: i64, + ) -> Result; + async fn revoke_ticket_bandwidth( + &self, + ticket_id: i64, + amount: i64, + ) -> Result<(), GatewayStorageError>; + async fn decrease_bandwidth( + &self, + client_id: i64, + amount: i64, + ) -> Result; + + async fn insert_epoch_signers( + &self, + epoch_id: i64, + signer_ids: Vec, + ) -> Result<(), GatewayStorageError>; + async fn insert_received_ticket( + &self, + client_id: i64, + received_at: OffsetDateTime, + serial_number: Vec, + data: Vec, + ) -> Result; + async fn contains_ticket(&self, serial_number: &[u8]) -> Result; + async fn insert_ticket_verification( + &self, + ticket_id: i64, + signer_id: i64, + verified_at: OffsetDateTime, + accepted: bool, + ) -> Result<(), GatewayStorageError>; + async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError>; + async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError>; + async fn remove_verified_ticket_binary_data( + &self, + ticket_id: i64, + ) -> Result<(), GatewayStorageError>; + async fn get_all_verified_tickets_with_sn( + &self, + ) -> Result, GatewayStorageError>; + async fn get_all_proposed_tickets_with_sn( + &self, + proposal_id: u32, + ) -> Result, GatewayStorageError>; + async fn insert_redemption_proposal( + &self, + tickets: &[VerifiedTicket], + proposal_id: u32, + created_at: OffsetDateTime, + ) -> Result<(), GatewayStorageError>; + async fn clear_post_proposal_data( + &self, + proposal_id: u32, + resolved_at: OffsetDateTime, + rejected: bool, + ) -> Result<(), GatewayStorageError>; + async fn latest_proposal(&self) -> Result, GatewayStorageError>; + async fn get_all_unverified_tickets(&self) -> Result, GatewayStorageError>; + async fn get_all_unresolved_proposals(&self) -> Result, GatewayStorageError>; + async fn get_votes(&self, ticket_id: i64) -> Result, GatewayStorageError>; + async fn get_signers(&self, epoch_id: i64) -> Result, GatewayStorageError>; + + /// Insert a wireguard peer in the storage. + /// + /// # Arguments + /// + /// * `peer`: wireguard peer data to be stored + async fn insert_wireguard_peer( + &self, + peer: &defguard_wireguard_rs::host::Peer, + client_type: ClientType, + ) -> Result; + + /// Tries to retrieve a particular peer with the given public key. + /// + /// # Arguments + /// + /// * `peer_public_key`: wireguard public key of the peer to be retrieved. + async fn get_wireguard_peer( + &self, + peer_public_key: &str, + ) -> Result, GatewayStorageError>; + + /// Retrieves all wireguard peers. + async fn get_all_wireguard_peers(&self) -> Result, GatewayStorageError>; + + /// Remove a wireguard peer from the storage. + /// + /// # Arguments + /// + /// * `peer_public_key`: wireguard public key of the peer to be removed. + async fn remove_wireguard_peer(&self, peer_public_key: &str) + -> Result<(), GatewayStorageError>; +} + +#[cfg(feature = "mock")] +pub mod mock { + use std::{collections::HashMap, sync::Arc}; + + use tokio::sync::RwLock; + + use super::*; + + struct EcashSigner { + _epoch_id: i64, + _signer_id: i64, + } + + struct ReceivedTicket { + client_id: i64, + _received_at: OffsetDateTime, + rejected: Option, + } + + struct TicketData { + serial_number: Vec, + data: Option>, + } + + struct TicketVerification { + _ticket_id: i64, + _signer_id: i64, + _verified_at: OffsetDateTime, + _accepted: bool, + } + + #[derive(Default)] + pub struct MockGatewayStorage { + available_bandwidth: HashMap, + ecash_signers: Vec, + received_ticket: HashMap, + ticket_data: HashMap, + ticket_verification: HashMap, + verified_tickets: Vec, + wireguard_peers: HashMap, + clients: HashMap, + } + + #[async_trait] + impl BandwidthGatewayStorage for Arc> { + async fn create_bandwidth_entry(&self, client_id: i64) -> Result<(), GatewayStorageError> { + self.write().await.available_bandwidth.insert( + client_id, + PersistedBandwidth { + client_id, + available: 0, + expiration: Some(OffsetDateTime::UNIX_EPOCH), + }, + ); + Ok(()) + } + + async fn set_expiration( + &self, + client_id: i64, + expiration: OffsetDateTime, + ) -> Result<(), GatewayStorageError> { + if let Some(bw) = self.write().await.available_bandwidth.get_mut(&client_id) { + bw.expiration = Some(expiration); + } + Ok(()) + } + + async fn reset_bandwidth(&self, client_id: i64) -> Result<(), GatewayStorageError> { + if let Some(bw) = self.write().await.available_bandwidth.get_mut(&client_id) { + bw.available = 0; + bw.expiration = Some(OffsetDateTime::UNIX_EPOCH); + } + Ok(()) + } + + async fn get_available_bandwidth( + &self, + client_id: i64, + ) -> Result, GatewayStorageError> { + Ok(self + .read() + .await + .available_bandwidth + .get(&client_id) + .cloned()) + } + + async fn increase_bandwidth( + &self, + client_id: i64, + amount: i64, + ) -> Result { + self.write() + .await + .available_bandwidth + .get_mut(&client_id) + .map(|bw| { + bw.available += amount; + bw.available + }) + .ok_or(GatewayStorageError::InternalDatabaseError( + sqlx::Error::RowNotFound, + )) + } + + async fn revoke_ticket_bandwidth( + &self, + ticket_id: i64, + amount: i64, + ) -> Result<(), GatewayStorageError> { + let mut guard = self.write().await; + if let Some(client_id) = guard + .received_ticket + .get(&ticket_id) + .map(|ticket| ticket.client_id) + { + if let Some(bw) = guard.available_bandwidth.get_mut(&client_id) { + bw.available -= amount; + } + } + Ok(()) + } + + async fn decrease_bandwidth( + &self, + client_id: i64, + amount: i64, + ) -> Result { + self.write() + .await + .available_bandwidth + .get_mut(&client_id) + .map(|bw| { + bw.available -= amount; + bw.available + }) + .ok_or(GatewayStorageError::InternalDatabaseError( + sqlx::Error::RowNotFound, + )) + } + + async fn insert_epoch_signers( + &self, + _epoch_id: i64, + signer_ids: Vec, + ) -> Result<(), GatewayStorageError> { + self.write() + .await + .ecash_signers + .extend(signer_ids.iter().map(|signer_id| EcashSigner { + _epoch_id, + _signer_id: *signer_id, + })); + Ok(()) + } + + async fn insert_received_ticket( + &self, + client_id: i64, + _received_at: OffsetDateTime, + serial_number: Vec, + data: Vec, + ) -> Result { + let mut guard = self.write().await; + let ticket_id = guard.received_ticket.len() as i64; + guard.received_ticket.insert( + ticket_id, + ReceivedTicket { + client_id, + _received_at, + rejected: None, + }, + ); + guard.ticket_data.insert( + ticket_id, + TicketData { + serial_number, + data: Some(data), + }, + ); + Ok(ticket_id) + } + + async fn contains_ticket(&self, serial_number: &[u8]) -> Result { + Ok(self + .read() + .await + .ticket_data + .values() + .any(|ticket_data| ticket_data.serial_number == serial_number)) + } + + async fn insert_ticket_verification( + &self, + _ticket_id: i64, + _signer_id: i64, + _verified_at: OffsetDateTime, + _accepted: bool, + ) -> Result<(), GatewayStorageError> { + self.write().await.ticket_verification.insert( + _ticket_id, + TicketVerification { + _ticket_id, + _signer_id, + _verified_at, + _accepted, + }, + ); + Ok(()) + } + + async fn update_rejected_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> { + let mut guard = self.write().await; + if let Some(ticket) = guard.received_ticket.get_mut(&ticket_id) { + ticket.rejected = Some(true); + } + guard.ticket_data.remove(&ticket_id); + Ok(()) + } + + async fn update_verified_ticket(&self, ticket_id: i64) -> Result<(), GatewayStorageError> { + let mut guard = self.write().await; + guard.verified_tickets.push(ticket_id); + guard.ticket_verification.remove(&ticket_id); + Ok(()) + } + + async fn remove_verified_ticket_binary_data( + &self, + ticket_id: i64, + ) -> Result<(), GatewayStorageError> { + if let Some(ticket) = self.write().await.ticket_data.get_mut(&ticket_id) { + ticket.data = None; + } + Ok(()) + } + + async fn get_all_verified_tickets_with_sn( + &self, + ) -> Result, GatewayStorageError> { + todo!() + } + + async fn get_all_proposed_tickets_with_sn( + &self, + _proposal_id: u32, + ) -> Result, GatewayStorageError> { + todo!() + } + + async fn insert_redemption_proposal( + &self, + _tickets: &[VerifiedTicket], + _proposal_id: u32, + _created_at: OffsetDateTime, + ) -> Result<(), GatewayStorageError> { + todo!() + } + + async fn clear_post_proposal_data( + &self, + _proposal_id: u32, + _resolved_at: OffsetDateTime, + _rejected: bool, + ) -> Result<(), GatewayStorageError> { + todo!() + } + + async fn latest_proposal(&self) -> Result, GatewayStorageError> { + todo!() + } + + async fn get_all_unverified_tickets( + &self, + ) -> Result, GatewayStorageError> { + todo!() + } + + async fn get_all_unresolved_proposals(&self) -> Result, GatewayStorageError> { + todo!() + } + + async fn get_votes(&self, _ticket_id: i64) -> Result, GatewayStorageError> { + todo!() + } + + async fn get_signers(&self, _epoch_id: i64) -> Result, GatewayStorageError> { + todo!() + } + + async fn insert_wireguard_peer( + &self, + peer: &defguard_wireguard_rs::host::Peer, + client_type: ClientType, + ) -> Result { + let mut guard = self.write().await; + let client_id = + if let Some(peer) = guard.wireguard_peers.get(&peer.public_key.to_string()) { + peer.client_id + } else { + let client_id = guard.clients.len() as i64; + guard.clients.insert(client_id, client_type.to_string()); + client_id + }; + guard.wireguard_peers.insert( + peer.public_key.to_string(), + WireguardPeer::from_defguard_peer(peer.clone(), client_id)?, + ); + Ok(client_id) + } + + async fn get_wireguard_peer( + &self, + peer_public_key: &str, + ) -> Result, GatewayStorageError> { + Ok(self + .read() + .await + .wireguard_peers + .get(peer_public_key) + .cloned()) + } + + async fn get_all_wireguard_peers(&self) -> Result, GatewayStorageError> { + todo!() + } + + async fn remove_wireguard_peer( + &self, + peer_public_key: &str, + ) -> Result<(), GatewayStorageError> { + self.write().await.wireguard_peers.remove(peer_public_key); + Ok(()) + } + } +} diff --git a/common/gateway-storage/src/wireguard_peers.rs b/common/gateway-storage/src/wireguard_peers.rs index 00c41483be3..22bf178d991 100644 --- a/common/gateway-storage/src/wireguard_peers.rs +++ b/common/gateway-storage/src/wireguard_peers.rs @@ -27,15 +27,18 @@ impl WgPeerManager { pub(crate) async fn insert_peer(&self, peer: &WireguardPeer) -> Result<(), sqlx::Error> { sqlx::query!( r#" - INSERT OR IGNORE INTO wireguard_peer(public_key, preshared_key, protocol_version, endpoint, last_handshake, tx_bytes, rx_bytes, persistent_keepalive_interval, allowed_ips, client_id) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?); + INSERT OR IGNORE INTO wireguard_peer(public_key, allowed_ips, client_id) + VALUES (?, ?, ?); UPDATE wireguard_peer - SET preshared_key = ?, protocol_version = ?, endpoint = ?, last_handshake = ?, tx_bytes = ?, rx_bytes = ?, persistent_keepalive_interval = ?, allowed_ips = ?, client_id = ? + SET allowed_ips = ?, client_id = ? WHERE public_key = ? "#, - peer.public_key, peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id, - peer.preshared_key, peer.protocol_version, peer.endpoint, peer.last_handshake, peer.tx_bytes, peer.rx_bytes, peer.persistent_keepalive_interval, peer.allowed_ips, peer.client_id, + peer.public_key, + peer.allowed_ips, + peer.client_id, + peer.allowed_ips, + peer.client_id, peer.public_key, ) .execute(&self.connection_pool) diff --git a/common/wireguard/Cargo.toml b/common/wireguard/Cargo.toml index c999861cd6d..cfa82a8bda2 100644 --- a/common/wireguard/Cargo.toml +++ b/common/wireguard/Cargo.toml @@ -11,11 +11,13 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = { workspace = true } base64 = { workspace = true } bincode = { workspace = true } chrono = { workspace = true } dashmap = { workspace = true } defguard_wireguard_rs = { workspace = true } +dyn-clone = { workspace = true } futures = { workspace = true } # The latest version on crates.io at the time of writing this (6.0.0) has a # version mismatch with x25519-dalek/curve25519-dalek that is resolved in the @@ -37,3 +39,11 @@ nym-network-defaults = { path = "../network-defaults" } nym-task = { path = "../task" } nym-wireguard-types = { path = "../wireguard-types" } nym-node-metrics = { path = "../../nym-node/nym-node-metrics" } + +[dev-dependencies] +nym-gateway-storage = { path = "../gateway-storage", features = ["mock"] } + +[features] +default = [] +mock = ["nym-gateway-storage/mock"] + diff --git a/common/wireguard/src/error.rs b/common/wireguard/src/error.rs index d2fd0e79562..1353fba0239 100644 --- a/common/wireguard/src/error.rs +++ b/common/wireguard/src/error.rs @@ -3,18 +3,18 @@ #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("traffic byte data needs to be increasing")] - InconsistentConsumedBytes, - #[error("{0}")] Defguard(#[from] defguard_wireguard_rs::error::WireguardInterfaceError), #[error("internal {0}")] Internal(String), - #[error("storage should have the requested bandwidht entry")] + #[error("storage should have the requested bandwidth entry")] MissingClientBandwidthEntry, + #[error("kernel should have the requested client entry: {0}")] + MissingClientKernelEntry(String), + #[error("{0}")] GatewayStorage(#[from] nym_gateway_storage::error::GatewayStorageError), diff --git a/common/wireguard/src/lib.rs b/common/wireguard/src/lib.rs index 6b83c648d23..2706e179823 100644 --- a/common/wireguard/src/lib.rs +++ b/common/wireguard/src/lib.rs @@ -6,16 +6,15 @@ // #![warn(clippy::expect_used)] // #![warn(clippy::unwrap_used)] -use defguard_wireguard_rs::WGApi; +use defguard_wireguard_rs::{host::Peer, key::Key, net::IpAddrMask, WGApi, WireguardInterfaceApi}; use nym_crypto::asymmetric::x25519::KeyPair; +#[cfg(target_os = "linux")] +use nym_gateway_storage::GatewayStorage; use nym_wireguard_types::Config; use peer_controller::PeerControlRequest; use std::sync::Arc; use tokio::sync::mpsc::{self, Receiver, Sender}; -#[cfg(target_os = "linux")] -use defguard_wireguard_rs::{host::Peer, key::Key, net::IpAddrMask}; - #[cfg(target_os = "linux")] use nym_network_defaults::constants::WG_TUN_BASE_NAME; @@ -28,6 +27,81 @@ pub struct WgApiWrapper { inner: WGApi, } +impl WireguardInterfaceApi for WgApiWrapper { + fn create_interface( + &self, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.create_interface() + } + + fn assign_address( + &self, + address: &IpAddrMask, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.assign_address(address) + } + + fn configure_peer_routing( + &self, + peers: &[Peer], + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.configure_peer_routing(peers) + } + + #[cfg(not(target_os = "windows"))] + fn configure_interface( + &self, + config: &defguard_wireguard_rs::InterfaceConfiguration, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.configure_interface(config) + } + + #[cfg(target_os = "windows")] + fn configure_interface( + &self, + config: &defguard_wireguard_rs::InterfaceConfiguration, + dns: &[std::net::IpAddr], + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.configure_interface(config, dns) + } + + fn remove_interface( + &self, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.remove_interface() + } + + fn configure_peer( + &self, + peer: &Peer, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.configure_peer(peer) + } + + fn remove_peer( + &self, + peer_pubkey: &Key, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.remove_peer(peer_pubkey) + } + + fn read_interface_data( + &self, + ) -> Result< + defguard_wireguard_rs::host::Host, + defguard_wireguard_rs::error::WireguardInterfaceError, + > { + self.inner.read_interface_data() + } + + fn configure_dns( + &self, + dns: &[std::net::IpAddr], + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.inner.configure_dns(dns) + } +} + impl WgApiWrapper { pub fn new(wg_api: WGApi) -> Self { WgApiWrapper { inner: wg_api } @@ -84,9 +158,9 @@ pub struct WireguardData { /// Start wireguard device #[cfg(target_os = "linux")] pub async fn start_wireguard( - storage: nym_gateway_storage::GatewayStorage, + storage: GatewayStorage, metrics: nym_node_metrics::NymNodeMetrics, - all_peers: Vec, + peers: Vec, task_client: nym_task::TaskClient, wireguard_data: WireguardData, ) -> Result, Box> { @@ -100,29 +174,13 @@ pub async fn start_wireguard( let ifname = String::from(WG_TUN_BASE_NAME); let wg_api = defguard_wireguard_rs::WGApi::new(ifname.clone(), false)?; - let mut peer_bandwidth_managers = HashMap::with_capacity(all_peers.len()); - let peers = all_peers - .into_iter() - .map(Peer::try_from) - .collect::, _>>()? - .into_iter() - .map(|mut peer| { - // since WGApi doesn't set those values on init, let's set them to 0 - peer.rx_bytes = 0; - peer.tx_bytes = 0; - peer - }) - .collect::>(); + let mut peer_bandwidth_managers = HashMap::with_capacity(peers.len()); + for peer in peers.iter() { - let bandwidth_manager = - PeerController::generate_bandwidth_manager(storage.clone(), &peer.public_key) - .await? - .map(|bw_m| Arc::new(RwLock::new(bw_m))); - // Update storage with *x_bytes set to 0, as in kernel peers we can't set those values - // so we need to restart counting. Hopefully the bandwidth was counted in available_bandwidth - storage - .insert_wireguard_peer(peer, bandwidth_manager.is_some()) - .await?; + let bandwidth_manager = Arc::new(RwLock::new( + PeerController::generate_bandwidth_manager(Box::new(storage.clone()), &peer.public_key) + .await?, + )); peer_bandwidth_managers.insert(peer.public_key.clone(), (bandwidth_manager, peer.clone())); } @@ -175,7 +233,7 @@ pub async fn start_wireguard( let host = wg_api.read_interface_data()?; let wg_api = std::sync::Arc::new(WgApiWrapper::new(wg_api)); let mut controller = PeerController::new( - storage, + Box::new(storage), metrics, wg_api.clone(), host, diff --git a/common/wireguard/src/peer_controller.rs b/common/wireguard/src/peer_controller.rs index 0718d233abd..c7b8d5e32ba 100644 --- a/common/wireguard/src/peer_controller.rs +++ b/common/wireguard/src/peer_controller.rs @@ -8,14 +8,11 @@ use defguard_wireguard_rs::{ }; use futures::channel::oneshot; use log::info; -use nym_authenticator_requests::latest::registration::{ - RemainingBandwidthData, BANDWIDTH_CAP_PER_DAY, -}; use nym_credential_verification::{ bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig, ClientBandwidth, }; -use nym_gateway_storage::GatewayStorage; +use nym_gateway_storage::traits::BandwidthGatewayStorage; use nym_node_metrics::NymNodeMetrics; use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK; use std::time::{Duration, SystemTime}; @@ -23,14 +20,12 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::{mpsc, RwLock}; use tokio_stream::{wrappers::IntervalStream, StreamExt}; -use crate::WgApiWrapper; use crate::{error::Error, peer_handle::SharedBandwidthStorageManager}; -use crate::{peer_handle::PeerHandle, peer_storage_manager::PeerStorageManager}; +use crate::{peer_handle::PeerHandle, peer_storage_manager::CachedPeerManager}; pub enum PeerControlRequest { AddPeer { peer: Peer, - client_id: Option, response_tx: oneshot::Sender, }, RemovePeer { @@ -41,10 +36,6 @@ pub enum PeerControlRequest { key: Key, response_tx: oneshot::Sender, }, - QueryBandwidth { - key: Key, - response_tx: oneshot::Sender, - }, GetClientBandwidth { key: Key, response_tx: oneshot::Sender, @@ -64,17 +55,12 @@ pub struct QueryPeerControlResponse { pub peer: Option, } -pub struct QueryBandwidthControlResponse { - pub success: bool, - pub bandwidth_data: Option, -} - pub struct GetClientBandwidthControlResponse { pub client_bandwidth: Option, } pub struct PeerController { - storage: GatewayStorage, + storage: Box, // we have "all" metrics of a node, but they're behind a single Arc pointer, // so the overhead is minimal @@ -83,9 +69,9 @@ pub struct PeerController { // used to receive commands from individual handles too request_tx: mpsc::Sender, request_rx: mpsc::Receiver, - wg_api: Arc, + wg_api: Arc, host_information: Arc>, - bw_storage_managers: HashMap>, + bw_storage_managers: HashMap, timeout_check_interval: IntervalStream, task_client: nym_task::TaskClient, } @@ -93,11 +79,11 @@ pub struct PeerController { impl PeerController { #[allow(clippy::too_many_arguments)] pub fn new( - storage: GatewayStorage, + storage: Box, metrics: NymNodeMetrics, - wg_api: Arc, + wg_api: Arc, initial_host_information: Host, - bw_storage_managers: HashMap, Peer)>, + bw_storage_managers: HashMap, request_tx: mpsc::Sender, request_rx: mpsc::Receiver, task_client: nym_task::TaskClient, @@ -107,15 +93,11 @@ impl PeerController { ); let host_information = Arc::new(RwLock::new(initial_host_information)); for (public_key, (bandwidth_storage_manager, peer)) in bw_storage_managers.iter() { - let peer_storage_manager = PeerStorageManager::new( - storage.clone(), - peer.clone(), - bandwidth_storage_manager.is_some(), - ); + let cached_peer_manager = CachedPeerManager::new(peer); let mut handle = PeerHandle::new( public_key.clone(), host_information.clone(), - peer_storage_manager, + cached_peer_manager, bandwidth_storage_manager.clone(), request_tx.clone(), &task_client, @@ -144,32 +126,11 @@ impl PeerController { } } - // Function that should be used for peer insertion, to handle both storage and kernel interaction - pub async fn add_peer(&self, peer: &Peer, client_id: Option) -> Result<(), Error> { - if client_id.is_none() { - self.storage.insert_wireguard_peer(peer, false).await?; - } - let ret: Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> = - self.wg_api.inner.configure_peer(peer); - if client_id.is_none() && ret.is_err() { - // Try to revert the insertion in storage - if self - .storage - .remove_wireguard_peer(&peer.public_key.to_string()) - .await - .is_err() - { - log::error!("The storage has been corrupted. Wireguard peer {} will persist in storage indefinitely.", peer.public_key); - } - } - Ok(ret?) - } - // Function that should be used for peer removal, to handle both storage and kernel interaction pub async fn remove_peer(&mut self, key: &Key) -> Result<(), Error> { self.storage.remove_wireguard_peer(&key.to_string()).await?; self.bw_storage_managers.remove(key); - let ret = self.wg_api.inner.remove_peer(key); + let ret = self.wg_api.remove_peer(key); if ret.is_err() { log::error!("Wireguard peer could not be removed from wireguard kernel module. Process should be restarted so that the interface is reset."); } @@ -177,50 +138,43 @@ impl PeerController { } pub async fn generate_bandwidth_manager( - storage: GatewayStorage, + storage: Box, public_key: &Key, - ) -> Result, Error> { - if let Some(client_id) = storage + ) -> Result { + let client_id = storage .get_wireguard_peer(&public_key.to_string()) .await? .ok_or(Error::MissingClientBandwidthEntry)? - .client_id - { - let bandwidth = storage - .get_available_bandwidth(client_id) - .await? - .ok_or(Error::MissingClientBandwidthEntry)?; - Ok(Some(BandwidthStorageManager::new( - storage, - ClientBandwidth::new(bandwidth.into()), - client_id, - BandwidthFlushingBehaviourConfig::default(), - true, - ))) - } else { - Ok(None) - } + .client_id; + + let bandwidth = storage + .get_available_bandwidth(client_id) + .await? + .ok_or(Error::MissingClientBandwidthEntry)?; + + Ok(BandwidthStorageManager::new( + storage, + ClientBandwidth::new(bandwidth.into()), + client_id, + BandwidthFlushingBehaviourConfig::default(), + true, + )) } - async fn handle_add_request( - &mut self, - peer: &Peer, - client_id: Option, - ) -> Result<(), Error> { - self.add_peer(peer, client_id).await?; - let bandwidth_storage_manager = - Self::generate_bandwidth_manager(self.storage.clone(), &peer.public_key) - .await? - .map(|bw_m| Arc::new(RwLock::new(bw_m))); - let peer_storage_manager = PeerStorageManager::new( - self.storage.clone(), - peer.clone(), - bandwidth_storage_manager.is_some(), - ); + async fn handle_add_request(&mut self, peer: &Peer) -> Result<(), Error> { + self.wg_api.configure_peer(peer)?; + let bandwidth_storage_manager = Arc::new(RwLock::new( + Self::generate_bandwidth_manager( + dyn_clone::clone_box(&*self.storage), + &peer.public_key, + ) + .await?, + )); + let cached_peer_manager = CachedPeerManager::new(peer); let mut handle = PeerHandle::new( peer.public_key.clone(), self.host_information.clone(), - peer_storage_manager, + cached_peer_manager, bandwidth_storage_manager.clone(), self.request_tx.clone(), &self.task_client, @@ -228,7 +182,7 @@ impl PeerController { self.bw_storage_managers .insert(peer.public_key.clone(), bandwidth_storage_manager); // try to immediately update the host information, to eliminate races - if let Ok(host_information) = self.wg_api.inner.read_interface_data() { + if let Ok(host_information) = self.wg_api.read_interface_data() { *self.host_information.write().await = host_information; } let public_key = peer.public_key.clone(); @@ -248,35 +202,8 @@ impl PeerController { .transpose()?) } - async fn handle_query_bandwidth( - &self, - key: &Key, - ) -> Result, Error> { - let Some(bandwidth_storage_manager) = self.bw_storage_managers.get(key) else { - return Ok(None); - }; - let available_bandwidth = if let Some(bandwidth_storage_manager) = bandwidth_storage_manager - { - bandwidth_storage_manager - .read() - .await - .available_bandwidth() - .await - } else { - let Some(peer) = self.host_information.read().await.peers.get(key).cloned() else { - // host information not updated yet - return Ok(None); - }; - BANDWIDTH_CAP_PER_DAY.saturating_sub(peer.rx_bytes + peer.tx_bytes) as i64 - }; - - Ok(Some(RemainingBandwidthData { - available_bandwidth, - })) - } - async fn handle_get_client_bandwidth(&self, key: &Key) -> Option { - if let Some(Some(bandwidth_storage_manager)) = self.bw_storage_managers.get(key) { + if let Some(bandwidth_storage_manager) = self.bw_storage_managers.get(key) { Some(bandwidth_storage_manager.read().await.client_bandwidth()) } else { None @@ -362,7 +289,7 @@ impl PeerController { loop { tokio::select! { _ = self.timeout_check_interval.next() => { - let Ok(host) = self.wg_api.inner.read_interface_data() else { + let Ok(host) = self.wg_api.read_interface_data() else { log::error!("Can't read wireguard kernel data"); continue; }; @@ -376,8 +303,8 @@ impl PeerController { } msg = self.request_rx.recv() => { match msg { - Some(PeerControlRequest::AddPeer { peer, client_id, response_tx }) => { - let ret = self.handle_add_request(&peer, client_id).await; + Some(PeerControlRequest::AddPeer { peer, response_tx }) => { + let ret = self.handle_add_request(&peer).await; if ret.is_ok() { response_tx.send(AddPeerControlResponse { success: true }).ok(); } else { @@ -396,14 +323,6 @@ impl PeerController { response_tx.send(QueryPeerControlResponse { success: false, peer: None }).ok(); } } - Some(PeerControlRequest::QueryBandwidth { key, response_tx }) => { - let ret = self.handle_query_bandwidth(&key).await; - if let Ok(bandwidth_data) = ret { - response_tx.send(QueryBandwidthControlResponse { success: true, bandwidth_data }).ok(); - } else { - response_tx.send(QueryBandwidthControlResponse { success: false, bandwidth_data: None }).ok(); - } - } Some(PeerControlRequest::GetClientBandwidth { key, response_tx }) => { let client_bandwidth = self.handle_get_client_bandwidth(&key).await; response_tx.send(GetClientBandwidthControlResponse { client_bandwidth }).ok(); @@ -419,3 +338,135 @@ impl PeerController { } } } + +#[cfg(feature = "mock")] +#[derive(Default)] +struct MockWgApi { + peers: std::sync::RwLock>, +} + +#[cfg(feature = "mock")] +impl WireguardInterfaceApi for MockWgApi { + fn create_interface( + &self, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + todo!() + } + + fn assign_address( + &self, + _address: &defguard_wireguard_rs::net::IpAddrMask, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + todo!() + } + + fn configure_peer_routing( + &self, + _peers: &[Peer], + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + todo!() + } + + #[cfg(not(target_os = "windows"))] + fn configure_interface( + &self, + _config: &defguard_wireguard_rs::InterfaceConfiguration, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + todo!() + } + + #[cfg(target_os = "windows")] + fn configure_interface( + &self, + _config: &defguard_wireguard_rs::InterfaceConfiguration, + _dns: &[std::net::IpAddr], + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + todo!() + } + + fn remove_interface( + &self, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + todo!() + } + + fn configure_peer( + &self, + peer: &Peer, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.peers + .write() + .unwrap() + .insert(peer.public_key.clone(), peer.clone()); + Ok(()) + } + + fn remove_peer( + &self, + peer_pubkey: &Key, + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + self.peers.write().unwrap().remove(peer_pubkey); + Ok(()) + } + + fn read_interface_data( + &self, + ) -> Result { + let mut host = Host::default(); + host.peers = self.peers.read().unwrap().clone(); + Ok(host) + } + + fn configure_dns( + &self, + _dns: &[std::net::IpAddr], + ) -> Result<(), defguard_wireguard_rs::error::WireguardInterfaceError> { + todo!() + } +} + +#[cfg(feature = "mock")] +pub fn start_controller( + request_tx: mpsc::Sender, + request_rx: mpsc::Receiver, +) -> ( + Arc>, + nym_task::TaskManager, +) { + let storage = Arc::new(RwLock::new( + nym_gateway_storage::traits::mock::MockGatewayStorage::default(), + )); + let wg_api = Arc::new(MockWgApi::default()); + let task_manager = nym_task::TaskManager::default(); + let mut peer_controller = PeerController::new( + Box::new(storage.clone()), + Default::default(), + wg_api, + Default::default(), + Default::default(), + request_tx, + request_rx, + task_manager.subscribe(), + ); + tokio::spawn(async move { peer_controller.run().await }); + + (storage, task_manager) +} + +#[cfg(feature = "mock")] +pub async fn stop_controller(mut task_manager: nym_task::TaskManager) { + task_manager.signal_shutdown().unwrap(); + task_manager.wait_for_shutdown().await; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn start_and_stop() { + let (request_tx, request_rx) = mpsc::channel(1); + let (_, task_manager) = start_controller(request_tx.clone(), request_rx); + stop_controller(task_manager).await; + } +} diff --git a/common/wireguard/src/peer_handle.rs b/common/wireguard/src/peer_handle.rs index e4de154ee6a..f6c4673e21f 100644 --- a/common/wireguard/src/peer_handle.rs +++ b/common/wireguard/src/peer_handle.rs @@ -3,13 +3,10 @@ use crate::error::Error; use crate::peer_controller::PeerControlRequest; -use crate::peer_storage_manager::PeerStorageManager; -use defguard_wireguard_rs::host::Peer; +use crate::peer_storage_manager::{CachedPeerManager, PeerInformation}; use defguard_wireguard_rs::{host::Host, key::Key}; use futures::channel::oneshot; -use nym_authenticator_requests::latest::registration::BANDWIDTH_CAP_PER_DAY; use nym_credential_verification::bandwidth_storage_manager::BandwidthStorageManager; -use nym_gateway_storage::models::WireguardPeer; use nym_task::TaskClient; use nym_wireguard_types::DEFAULT_PEER_TIMEOUT_CHECK; use std::sync::Arc; @@ -21,8 +18,8 @@ pub(crate) type SharedBandwidthStorageManager = Arc>, - peer_storage_manager: PeerStorageManager, - bandwidth_storage_manager: Option, + cached_peer: CachedPeerManager, + bandwidth_storage_manager: SharedBandwidthStorageManager, request_tx: mpsc::Sender, timeout_check_interval: IntervalStream, task_client: TaskClient, @@ -32,8 +29,8 @@ impl PeerHandle { pub fn new( public_key: Key, host_information: Arc>, - peer_storage_manager: PeerStorageManager, - bandwidth_storage_manager: Option, + cached_peer: CachedPeerManager, + bandwidth_storage_manager: SharedBandwidthStorageManager, request_tx: mpsc::Sender, task_client: &TaskClient, ) -> Self { @@ -45,7 +42,7 @@ impl PeerHandle { PeerHandle { public_key, host_information, - peer_storage_manager, + cached_peer, bandwidth_storage_manager, request_tx, timeout_check_interval, @@ -69,14 +66,10 @@ impl PeerHandle { Ok(success) } - fn compute_spent_bandwidth(kernel_peer: &Peer, storage_peer: &WireguardPeer) -> Option { - let storage_peer_rx_bytes = u64::try_from(storage_peer.rx_bytes) - .inspect_err(|e| tracing::error!("Storage rx bytes could not be converted: {e}")) - .ok()?; - let storage_peer_tx_bytes = u64::try_from(storage_peer.tx_bytes) - .inspect_err(|e| tracing::error!("Storage tx bytes could not be converted: {e}")) - .ok()?; - + fn compute_spent_bandwidth( + kernel_peer: PeerInformation, + cached_peer: PeerInformation, + ) -> Option { let kernel_total = kernel_peer .rx_bytes .checked_add(kernel_peer.tx_bytes) @@ -88,21 +81,26 @@ impl PeerHandle { ); None })?; - let storage_total = storage_peer_rx_bytes - .checked_add(storage_peer_tx_bytes) + let cached_total = cached_peer + .rx_bytes + .checked_add(cached_peer.tx_bytes) .or_else(|| { - tracing::error!("Overflow on storage adding bytes: {storage_peer_rx_bytes} + {storage_peer_tx_bytes}"); + tracing::error!( + "Overflow on cached adding bytes: {} + {}", + cached_peer.rx_bytes, + cached_peer.tx_bytes + ); None })?; - kernel_total.checked_sub(storage_total).or_else(|| { - tracing::error!("Overflow on spent bandwidth subtraction: kernel - storage = {kernel_total} - {storage_total}"); + kernel_total.checked_sub(cached_total).or_else(|| { + tracing::error!("Overflow on spent bandwidth subtraction: kernel - cached = {kernel_total} - {cached_total}"); None }) } - async fn active_peer(&mut self, kernel_peer: &Peer) -> Result { - let Some(storage_peer) = self.peer_storage_manager.get_peer() else { + async fn active_peer(&mut self, kernel_peer: PeerInformation) -> Result { + let Some(cached_peer) = self.cached_peer.get_peer() else { log::debug!( "Peer {:?} not in storage anymore, shutting down handle", self.public_key @@ -110,76 +108,51 @@ impl PeerHandle { return Ok(false); }; - if let Some(bandwidth_manager) = &self.bandwidth_storage_manager { - let spent_bandwidth = Self::compute_spent_bandwidth(kernel_peer, &storage_peer) - .unwrap_or_else(|| { - // if gateway restarted, the kernel values restart from 0 - // and we should restart from 0 in storage as well - if let Some(peer_information) = - self.peer_storage_manager.peer_information.as_mut() - { - peer_information.force_sync = true; - peer_information.peer.rx_bytes = kernel_peer.rx_bytes; - peer_information.peer.tx_bytes = kernel_peer.tx_bytes; - } - 0 - }) - .try_into() - .map_err(|_| Error::InconsistentConsumedBytes)?; - if spent_bandwidth > 0 { - self.peer_storage_manager.update_trx(kernel_peer); - if bandwidth_manager - .write() - .await - .try_use_bandwidth(spent_bandwidth) - .await - .is_err() - { - tracing::debug!( - "Peer {} is out of bandwidth, removing it", - kernel_peer.public_key.to_string() - ); - let success = self.remove_peer().await?; - self.peer_storage_manager.remove_peer(); - return Ok(!success); - } - } - } else { - let spent_bandwidth = kernel_peer.rx_bytes + kernel_peer.tx_bytes; - if spent_bandwidth >= BANDWIDTH_CAP_PER_DAY { - log::debug!( - "Peer {} doesn't have bandwidth anymore, removing it", - self.public_key - ); - let success = self.remove_peer().await?; - return Ok(!success); - } + let spent_bandwidth = Self::compute_spent_bandwidth(kernel_peer, cached_peer) + .unwrap_or_default() + .try_into() + .inspect_err(|err| tracing::error!("Could not convert from u64 to i64: {err:?}")) + .unwrap_or_default(); + + self.cached_peer.update(kernel_peer); + + if spent_bandwidth > 0 + && self + .bandwidth_storage_manager + .write() + .await + .try_use_bandwidth(spent_bandwidth) + .await + .is_err() + { + tracing::debug!( + "Peer {} is out of bandwidth, removing it", + self.public_key.to_string() + ); + let success = self.remove_peer().await?; + self.cached_peer.remove_peer(); + return Ok(!success); } Ok(true) } async fn continue_checking(&mut self) -> Result { - let Some(kernel_peer) = self + let kernel_peer = self .host_information .read() .await .peers .get(&self.public_key) - .cloned() - else { - // the host information hasn't beed updated yet - return Ok(true); - }; - if !self.active_peer(&kernel_peer).await? { + .ok_or(Error::MissingClientKernelEntry(self.public_key.to_string()))? + .into(); + if !self.active_peer(kernel_peer).await? { log::debug!( "Peer {:?} is not active anymore, shutting down handle", self.public_key ); Ok(false) } else { - // Update storage values - self.peer_storage_manager.sync_storage_peer().await?; Ok(true) } } @@ -208,11 +181,10 @@ impl PeerHandle { _ = self.task_client.recv() => { log::trace!("PeerHandle: Received shutdown"); - if let Some(bandwidth_manager) = &self.bandwidth_storage_manager { - if let Err(e) = bandwidth_manager.write().await.sync_storage_bandwidth().await { - log::error!("Storage sync failed - {e}, unaccounted bandwidth might have been consumed"); - } + if let Err(e) = self.bandwidth_storage_manager.write().await.sync_storage_bandwidth().await { + log::error!("Storage sync failed - {e}, unaccounted bandwidth might have been consumed"); } + log::trace!("PeerHandle: Finished shutdown"); } } diff --git a/common/wireguard/src/peer_storage_manager.rs b/common/wireguard/src/peer_storage_manager.rs index 97a74635330..1675cf6b2f8 100644 --- a/common/wireguard/src/peer_storage_manager.rs +++ b/common/wireguard/src/peer_storage_manager.rs @@ -1,12 +1,8 @@ // Copyright 2024 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::error::Error; use defguard_wireguard_rs::host::Peer; -use nym_gateway_storage::models::WireguardPeer; -use nym_gateway_storage::GatewayStorage; use std::time::Duration; -use time::OffsetDateTime; const DEFAULT_PEER_MAX_FLUSHING_RATE: Duration = Duration::from_secs(60 * 60 * 24); // 24h const DEFAULT_PEER_MAX_DELTA_FLUSHING_AMOUNT: u64 = 512 * 1024 * 1024; // 512MB @@ -29,116 +25,50 @@ impl Default for PeerFlushingBehaviourConfig { } } -pub struct PeerStorageManager { - pub(crate) storage: GatewayStorage, +pub struct CachedPeerManager { pub(crate) peer_information: Option, - pub(crate) cfg: PeerFlushingBehaviourConfig, - pub(crate) with_client_id: bool, } -impl PeerStorageManager { - pub(crate) fn new(storage: GatewayStorage, peer: Peer, with_client_id: bool) -> Self { - let peer_information = Some(PeerInformation::new(peer)); +impl CachedPeerManager { + pub(crate) fn new(peer: &Peer) -> Self { Self { - storage, - peer_information, - cfg: PeerFlushingBehaviourConfig::default(), - with_client_id, + peer_information: Some(peer.into()), } } - pub(crate) fn get_peer(&self) -> Option { + pub(crate) fn get_peer(&self) -> Option { self.peer_information - .as_ref() - .map(|p| p.peer.clone().into()) } pub(crate) fn remove_peer(&mut self) { self.peer_information = None; } - pub(crate) fn update_trx(&mut self, kernel_peer: &Peer) { + pub(crate) fn update(&mut self, kernel_peer: PeerInformation) { if let Some(peer_information) = self.peer_information.as_mut() { - peer_information.update_trx_bytes(kernel_peer.tx_bytes, kernel_peer.rx_bytes); + peer_information.update_trx_bytes(kernel_peer); } } - - pub(crate) async fn sync_storage_peer(&mut self) -> Result<(), Error> { - let Some(peer_information) = self.peer_information.as_mut() else { - return Ok(()); - }; - if !peer_information.should_sync(self.cfg) { - return Ok(()); - } - if self - .storage - .get_wireguard_peer(&peer_information.peer().public_key.to_string()) - .await? - .is_none() - { - self.peer_information = None; - return Ok(()); - } - self.storage - .insert_wireguard_peer(peer_information.peer(), self.with_client_id) - .await?; - - peer_information.resync_peer_with_storage(); - - Ok(()) - } } -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub(crate) struct PeerInformation { - pub(crate) peer: Peer, - pub(crate) last_synced: OffsetDateTime, - - pub(crate) bytes_delta_since_sync: u64, - pub(crate) force_sync: bool, + pub(crate) tx_bytes: u64, + pub(crate) rx_bytes: u64, } -impl PeerInformation { - pub fn new(peer: Peer) -> PeerInformation { - PeerInformation { - peer, - last_synced: OffsetDateTime::now_utc(), - bytes_delta_since_sync: 0, - force_sync: false, - } - } - - pub(crate) fn should_sync(&self, cfg: PeerFlushingBehaviourConfig) -> bool { - if self.force_sync { - return true; - } - if self.bytes_delta_since_sync >= cfg.peer_max_delta_flushing_amount { - return true; - } - - if self.last_synced + cfg.peer_max_flushing_rate < OffsetDateTime::now_utc() - && self.bytes_delta_since_sync != 0 - { - return true; +impl From<&Peer> for PeerInformation { + fn from(value: &Peer) -> Self { + Self { + tx_bytes: value.tx_bytes, + rx_bytes: value.rx_bytes, } - - false - } - - pub(crate) fn peer(&self) -> &Peer { - &self.peer - } - - pub(crate) fn update_trx_bytes(&mut self, tx_bytes: u64, rx_bytes: u64) { - self.bytes_delta_since_sync += tx_bytes.saturating_sub(self.peer.tx_bytes) - + rx_bytes.saturating_sub(self.peer.rx_bytes); - self.peer.tx_bytes = tx_bytes; - self.peer.rx_bytes = rx_bytes; } +} - pub(crate) fn resync_peer_with_storage(&mut self) { - self.bytes_delta_since_sync = 0; - self.last_synced = OffsetDateTime::now_utc(); - self.force_sync = false; +impl PeerInformation { + pub(crate) fn update_trx_bytes(&mut self, peer: PeerInformation) { + self.tx_bytes = peer.tx_bytes; + self.rx_bytes = peer.rx_bytes; } } diff --git a/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs b/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs index e3b560ec94e..a152092ef74 100644 --- a/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs +++ b/gateway/src/node/client_handling/websocket/connection_handler/authenticated.rs @@ -24,6 +24,8 @@ use nym_gateway_requests::{ SimpleGatewayRequestsError, }; use nym_gateway_storage::error::GatewayStorageError; +use nym_gateway_storage::traits::BandwidthGatewayStorage; +use nym_gateway_storage::traits::SharedKeyGatewayStorage; use nym_node_metrics::events::MetricsEvent; use nym_sphinx::forwarding::packet::MixPacket; use nym_statistics_common::{gateways::GatewaySessionEvent, types::SessionType}; @@ -190,7 +192,7 @@ impl AuthenticatedHandler { let handler = AuthenticatedHandler { bandwidth_storage_manager: BandwidthStorageManager::new( - fresh.shared_state.storage.clone(), + Box::new(fresh.shared_state.storage.clone()), ClientBandwidth::new(bandwidth.into()), client.id, fresh.shared_state.cfg.bandwidth, diff --git a/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs b/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs index 8841f82d9f5..8392cafba9a 100644 --- a/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs +++ b/gateway/src/node/client_handling/websocket/connection_handler/fresh.rs @@ -27,6 +27,9 @@ use nym_gateway_requests::{ INITIAL_PROTOCOL_VERSION, }; use nym_gateway_storage::error::GatewayStorageError; +use nym_gateway_storage::traits::BandwidthGatewayStorage; +use nym_gateway_storage::traits::InboxGatewayStorage; +use nym_gateway_storage::traits::SharedKeyGatewayStorage; use nym_node_metrics::events::MetricsEvent; use nym_sphinx::DestinationAddressBytes; use nym_task::TaskClient; diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index c13df9f2413..08c5bc886b9 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -13,7 +13,6 @@ use nym_credential_verification::ecash::{ credential_sender::CredentialHandlerConfig, EcashManager, }; use nym_crypto::asymmetric::ed25519; -use nym_gateway_storage::models::WireguardPeer; use nym_ip_packet_router::IpPacketRouter; use nym_mixnet_client::forwarder::MixForwardingSender; use nym_network_defaults::NymNetworkDetails; @@ -38,7 +37,11 @@ mod stale_data_cleaner; use crate::node::stale_data_cleaner::StaleMessagesCleaner; pub use client_handling::active_clients::ActiveClientsStore; pub use nym_gateway_stats_storage::PersistentStatsStorage; -pub use nym_gateway_storage::{error::GatewayStorageError, GatewayStorage}; +pub use nym_gateway_storage::{ + error::GatewayStorageError, + traits::{BandwidthGatewayStorage, InboxGatewayStorage}, + GatewayStorage, +}; use nym_node_metrics::NymNodeMetrics; pub use nym_sdk::{NymApiTopologyProvider, NymApiTopologyProviderConfig, UserAgent}; @@ -93,7 +96,7 @@ pub struct GatewayTasksBuilder { // populated and cached as necessary ecash_manager: Option>, - wireguard_peers: Option>, + wireguard_peers: Option>, wireguard_networks: Option>, } @@ -357,12 +360,12 @@ impl GatewayTasksBuilder { async fn build_wireguard_peers_and_networks( &self, - ) -> Result<(Vec, Vec), GatewayError> { + ) -> Result<(Vec, Vec), GatewayError> { let mut used_private_network_ips = vec![]; let mut all_peers = vec![]; for wireguard_peer in self.storage.get_all_wireguard_peers().await?.into_iter() { let mut peer = defguard_wireguard_rs::host::Peer::try_from(wireguard_peer.clone())?; - let Some(peer) = peer.allowed_ips.pop() else { + let Some(allowed_ip) = peer.allowed_ips.pop() else { let peer_identity = &peer.public_key; warn!("Peer {peer_identity} has empty allowed ips. It will be removed",); self.storage @@ -370,8 +373,8 @@ impl GatewayTasksBuilder { .await?; continue; }; - used_private_network_ips.push(peer.ip); - all_peers.push(wireguard_peer); + used_private_network_ips.push(allowed_ip.ip); + all_peers.push(peer); } Ok((all_peers, used_private_network_ips)) @@ -379,7 +382,9 @@ impl GatewayTasksBuilder { // only used under linux #[allow(dead_code)] - async fn get_wireguard_peers(&mut self) -> Result, GatewayError> { + async fn get_wireguard_peers( + &mut self, + ) -> Result, GatewayError> { if let Some(cached) = self.wireguard_peers.take() { return Ok(cached); } @@ -432,8 +437,8 @@ impl GatewayTasksBuilder { opts.config.clone(), wireguard_data.inner.clone(), used_private_network_ips, + ecash_manager, ) - .with_ecash_verifier(ecash_manager) .with_custom_gateway_transceiver(transceiver) .with_shutdown(self.shutdown.fork("authenticator_sp")) .with_wait_for_gateway(true) diff --git a/nym-node/src/node/mixnet/shared/final_hop.rs b/nym-node/src/node/mixnet/shared/final_hop.rs index d38fc99d9fc..cb6bc0ea64c 100644 --- a/nym-node/src/node/mixnet/shared/final_hop.rs +++ b/nym-node/src/node/mixnet/shared/final_hop.rs @@ -1,7 +1,9 @@ // Copyright 2024 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only -use nym_gateway::node::{ActiveClientsStore, GatewayStorage, GatewayStorageError}; +use nym_gateway::node::{ + ActiveClientsStore, GatewayStorage, GatewayStorageError, InboxGatewayStorage, +}; use nym_sphinx_types::DestinationAddressBytes; use tracing::debug; diff --git a/service-providers/authenticator/Cargo.toml b/service-providers/authenticator/Cargo.toml index 0c4347bb8db..85e4dc70214 100644 --- a/service-providers/authenticator/Cargo.toml +++ b/service-providers/authenticator/Cargo.toml @@ -54,3 +54,6 @@ nym-wireguard-types = { path = "../../common/wireguard-types" } [dev-dependencies] mock_instant = "0.5.3" +time = { workspace = true } + +nym-wireguard = { path = "../../common/wireguard", features = ["mock"] } diff --git a/service-providers/authenticator/src/authenticator.rs b/service-providers/authenticator/src/authenticator.rs index 268d96c1efa..2790ddddcbc 100644 --- a/service-providers/authenticator/src/authenticator.rs +++ b/service-providers/authenticator/src/authenticator.rs @@ -31,7 +31,7 @@ pub struct Authenticator { custom_topology_provider: Option>, custom_gateway_transceiver: Option>, wireguard_gateway_data: WireguardGatewayData, - ecash_verifier: Option>, + ecash_verifier: Arc, used_private_network_ips: Vec, shutdown: Option, on_start: Option>, @@ -42,13 +42,14 @@ impl Authenticator { config: Config, wireguard_gateway_data: WireguardGatewayData, used_private_network_ips: Vec, + ecash_verifier: Arc, ) -> Self { Self { config, wait_for_gateway: false, custom_topology_provider: None, custom_gateway_transceiver: None, - ecash_verifier: None, + ecash_verifier, wireguard_gateway_data, used_private_network_ips, shutdown: None, @@ -56,13 +57,6 @@ impl Authenticator { } } - #[must_use] - #[allow(unused)] - pub fn with_ecash_verifier(mut self, ecash_verifier: Arc) -> Self { - self.ecash_verifier = Some(ecash_verifier); - self - } - #[must_use] #[allow(unused)] pub fn with_shutdown(mut self, shutdown: TaskClient) -> Self { diff --git a/service-providers/authenticator/src/cli/add_gateway.rs b/service-providers/authenticator/src/cli/add_gateway.rs deleted file mode 100644 index 397a8be01e3..00000000000 --- a/service-providers/authenticator/src/cli/add_gateway.rs +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_bin_common::output_format::OutputFormat; -use nym_client_core::cli_helpers::client_add_gateway::{add_gateway, CommonClientAddGatewayArgs}; - -#[derive(clap::Args)] -pub(crate) struct Args { - #[command(flatten)] - common_args: CommonClientAddGatewayArgs, - - #[arg(short, long, default_value_t = OutputFormat::default())] - output: OutputFormat, -} - -impl AsRef for Args { - fn as_ref(&self) -> &CommonClientAddGatewayArgs { - &self.common_args - } -} - -pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> { - let output = args.output; - let res = add_gateway::(args, None).await?; - - println!("{}", output.format(&res)); - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/build_info.rs b/service-providers/authenticator/src/cli/build_info.rs deleted file mode 100644 index 73517531345..00000000000 --- a/service-providers/authenticator/src/cli/build_info.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use clap::Args; -use nym_bin_common::bin_info_owned; -use nym_bin_common::output_format::OutputFormat; - -#[derive(Args)] -pub(crate) struct BuildInfo { - #[arg(short, long, default_value_t = OutputFormat::default())] - output: OutputFormat, -} - -pub(crate) fn execute(args: BuildInfo) { - println!("{}", args.output.format(&bin_info_owned!())) -} diff --git a/service-providers/authenticator/src/cli/ecash/import_coin_index_signatures.rs b/service-providers/authenticator/src/cli/ecash/import_coin_index_signatures.rs deleted file mode 100644 index c3b1aa922e0..00000000000 --- a/service-providers/authenticator/src/cli/ecash/import_coin_index_signatures.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_client_core::cli_helpers::client_import_coin_index_signatures::{ - import_coin_index_signatures, CommonClientImportCoinIndexSignaturesArgs, -}; - -pub(crate) async fn execute( - args: CommonClientImportCoinIndexSignaturesArgs, -) -> Result<(), AuthenticatorError> { - import_coin_index_signatures::(args).await?; - println!("successfully imported coin index signatures!"); - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/ecash/import_credential.rs b/service-providers/authenticator/src/cli/ecash/import_credential.rs deleted file mode 100644 index e525eac5184..00000000000 --- a/service-providers/authenticator/src/cli/ecash/import_credential.rs +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_client_core::cli_helpers::client_import_credential::{ - import_credential, CommonClientImportTicketBookArgs, -}; - -pub async fn execute(args: CommonClientImportTicketBookArgs) -> Result<(), AuthenticatorError> { - import_credential::(args).await?; - println!("successfully imported credential!"); - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/ecash/import_expiration_date_signatures.rs b/service-providers/authenticator/src/cli/ecash/import_expiration_date_signatures.rs deleted file mode 100644 index 7db61d52388..00000000000 --- a/service-providers/authenticator/src/cli/ecash/import_expiration_date_signatures.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_client_core::cli_helpers::client_import_expiration_date_signatures::{ - import_expiration_date_signatures, CommonClientImportExpirationDateSignaturesArgs, -}; - -pub(crate) async fn execute( - args: CommonClientImportExpirationDateSignaturesArgs, -) -> Result<(), AuthenticatorError> { - import_expiration_date_signatures::(args).await?; - println!("successfully imported expiration date signatures!"); - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/ecash/import_master_verification_key.rs b/service-providers/authenticator/src/cli/ecash/import_master_verification_key.rs deleted file mode 100644 index d53ddacf4c2..00000000000 --- a/service-providers/authenticator/src/cli/ecash/import_master_verification_key.rs +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_client_core::cli_helpers::client_import_master_verification_key::{ - import_master_verification_key, CommonClientImportMasterVerificationKeyArgs, -}; - -pub(crate) async fn execute( - args: CommonClientImportMasterVerificationKeyArgs, -) -> Result<(), AuthenticatorError> { - import_master_verification_key::(args).await?; - println!("successfully imported master verification key!"); - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/ecash/mod.rs b/service-providers/authenticator/src/cli/ecash/mod.rs deleted file mode 100644 index 9272ed013cd..00000000000 --- a/service-providers/authenticator/src/cli/ecash/mod.rs +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use clap::{Args, Subcommand}; -use nym_authenticator::error::AuthenticatorError; -use nym_client_core::cli_helpers::client_import_coin_index_signatures::CommonClientImportCoinIndexSignaturesArgs; -use nym_client_core::cli_helpers::client_import_credential::CommonClientImportTicketBookArgs; -use nym_client_core::cli_helpers::client_import_expiration_date_signatures::CommonClientImportExpirationDateSignaturesArgs; -use nym_client_core::cli_helpers::client_import_master_verification_key::CommonClientImportMasterVerificationKeyArgs; - -pub(crate) mod import_coin_index_signatures; -pub(crate) mod import_credential; -pub(crate) mod import_expiration_date_signatures; -pub(crate) mod import_master_verification_key; -pub(crate) mod show_ticketbooks; - -#[derive(Args)] -#[clap(args_conflicts_with_subcommands = true, subcommand_required = true)] -pub struct Ecash { - #[clap(subcommand)] - pub command: EcashCommands, -} - -impl Ecash { - pub async fn execute(self) -> Result<(), AuthenticatorError> { - match self.command { - EcashCommands::ShowTicketBooks(args) => show_ticketbooks::execute(args).await?, - EcashCommands::ImportTicketBook(args) => import_credential::execute(args).await?, - EcashCommands::ImportCoinIndexSignatures(args) => { - import_coin_index_signatures::execute(args).await? - } - EcashCommands::ImportExpirationDateSignatures(args) => { - import_expiration_date_signatures::execute(args).await? - } - EcashCommands::ImportMasterVerificationKey(args) => { - import_master_verification_key::execute(args).await? - } - } - Ok(()) - } -} - -#[derive(Subcommand)] -pub enum EcashCommands { - /// Display information associated with the imported ticketbooks, - ShowTicketBooks(show_ticketbooks::Args), - - /// Import a pre-generated ticketbook - ImportTicketBook(CommonClientImportTicketBookArgs), - - /// Import coin index signatures needed for ticketbooks - ImportCoinIndexSignatures(CommonClientImportCoinIndexSignaturesArgs), - - /// Import expiration date signatures needed for ticketbooks - ImportExpirationDateSignatures(CommonClientImportExpirationDateSignaturesArgs), - - /// Import master verification key needed for ticketbooks - ImportMasterVerificationKey(CommonClientImportMasterVerificationKeyArgs), -} diff --git a/service-providers/authenticator/src/cli/ecash/show_ticketbooks.rs b/service-providers/authenticator/src/cli/ecash/show_ticketbooks.rs deleted file mode 100644 index ca31d398020..00000000000 --- a/service-providers/authenticator/src/cli/ecash/show_ticketbooks.rs +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_bin_common::output_format::OutputFormat; -use nym_client_core::cli_helpers::client_show_ticketbooks::{ - show_ticketbooks, CommonShowTicketbooksArgs, -}; - -#[derive(clap::Args)] -pub(crate) struct Args { - #[command(flatten)] - common_args: CommonShowTicketbooksArgs, - - #[arg(short, long, default_value_t = OutputFormat::default())] - output: OutputFormat, -} - -impl AsRef for Args { - fn as_ref(&self) -> &CommonShowTicketbooksArgs { - &self.common_args - } -} - -pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> { - let output = args.output; - let res = show_ticketbooks::(args).await?; - - println!("{}", output.format(&res)); - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/init.rs b/service-providers/authenticator/src/cli/init.rs deleted file mode 100644 index 217c3160d0b..00000000000 --- a/service-providers/authenticator/src/cli/init.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::{override_config, CliAuthenticatorClient, OverrideConfig}; -use clap::Args; -use nym_authenticator::{ - config::{default_config_directory, default_config_filepath, default_data_directory, Config}, - error::AuthenticatorError, -}; -use nym_bin_common::output_format::OutputFormat; -use nym_client_core::cli_helpers::client_init::{ - initialise_client, CommonClientInitArgs, InitResultsWithConfig, InitialisableClient, -}; -use serde::Serialize; -use std::{fmt::Display, fs, path::PathBuf}; - -impl InitialisableClient for CliAuthenticatorClient { - type InitArgs = Init; - - fn initialise_storage_paths(id: &str) -> Result<(), Self::Error> { - fs::create_dir_all(default_data_directory(id))?; - fs::create_dir_all(default_config_directory(id))?; - Ok(()) - } - - fn default_config_path(id: &str) -> PathBuf { - default_config_filepath(id) - } - - fn construct_config(init_args: &Self::InitArgs) -> Self::Config { - override_config( - Config::new(&init_args.common_args.id), - OverrideConfig::from(init_args.clone()), - ) - } -} - -#[derive(Args, Clone, Debug)] -pub(crate) struct Init { - #[command(flatten)] - common_args: CommonClientInitArgs, - - #[clap(short, long, default_value_t = OutputFormat::default())] - output: OutputFormat, -} - -impl From for OverrideConfig { - fn from(init_config: Init) -> Self { - OverrideConfig { - nym_apis: init_config.common_args.nym_apis, - nyxd_urls: init_config.common_args.nyxd_urls, - enabled_credentials_mode: init_config.common_args.enabled_credentials_mode, - } - } -} - -impl AsRef for Init { - fn as_ref(&self) -> &CommonClientInitArgs { - &self.common_args - } -} - -#[derive(Debug, Serialize)] -pub struct InitResults { - #[serde(flatten)] - client_core: nym_client_core::init::types::InitResults, - client_address: String, -} - -impl InitResults { - fn new(res: InitResultsWithConfig) -> Self { - Self { - client_address: res.init_results.address.to_string(), - client_core: res.init_results, - } - } -} - -impl Display for InitResults { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - writeln!(f, "{}", self.client_core)?; - write!(f, "Address of this authenticator: {}", self.client_address) - } -} - -pub(crate) async fn execute(args: Init) -> Result<(), AuthenticatorError> { - eprintln!("Initialising client..."); - - let output = args.output; - let res = initialise_client::(args, None).await?; - - let init_results = InitResults::new(res); - println!("{}", output.format(&init_results)); - - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/list_gateways.rs b/service-providers/authenticator/src/cli/list_gateways.rs deleted file mode 100644 index 9297d5cd5b6..00000000000 --- a/service-providers/authenticator/src/cli/list_gateways.rs +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_bin_common::output_format::OutputFormat; -use nym_client_core::cli_helpers::client_list_gateways::{ - list_gateways, CommonClientListGatewaysArgs, -}; - -#[derive(clap::Args)] -pub(crate) struct Args { - #[command(flatten)] - common_args: CommonClientListGatewaysArgs, - - #[arg(short, long, default_value_t = OutputFormat::default())] - output: OutputFormat, -} - -impl AsRef for Args { - fn as_ref(&self) -> &CommonClientListGatewaysArgs { - &self.common_args - } -} - -pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> { - let output = args.output; - let res = list_gateways::(args).await?; - - println!("{}", output.format(&res)); - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/mod.rs b/service-providers/authenticator/src/cli/mod.rs deleted file mode 100644 index 08aca6f720e..00000000000 --- a/service-providers/authenticator/src/cli/mod.rs +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::ecash::Ecash; -use clap::{CommandFactory, Parser, Subcommand}; -use log::error; -use nym_authenticator::{ - config::{helpers::try_upgrade_config, BaseClientConfig, Config}, - error::AuthenticatorError, -}; -use nym_bin_common::bin_info; -use nym_bin_common::completions::{fig_generate, ArgShell}; -use nym_client_core::cli_helpers::CliClient; -use std::sync::OnceLock; - -mod add_gateway; -mod build_info; -pub mod ecash; -mod init; -mod list_gateways; -mod peer_handler; -mod request; -mod run; -mod sign; -mod switch_gateway; - -pub(crate) struct CliAuthenticatorClient; - -impl CliClient for CliAuthenticatorClient { - const NAME: &'static str = "authenticator"; - type Error = AuthenticatorError; - type Config = Config; - - async fn try_upgrade_outdated_config(id: &str) -> Result<(), Self::Error> { - try_upgrade_config(id).await - } - - async fn try_load_current_config(id: &str) -> Result { - try_load_current_config(id).await - } -} - -fn pretty_build_info_static() -> &'static str { - static PRETTY_BUILD_INFORMATION: OnceLock = OnceLock::new(); - PRETTY_BUILD_INFORMATION.get_or_init(|| bin_info!().pretty_print()) -} - -#[derive(Parser)] -#[command(author = "Nymtech", version, about, long_version = pretty_build_info_static())] -pub(crate) struct Cli { - /// Path pointing to an env file that configures the client. - #[arg(short, long)] - pub(crate) config_env_file: Option, - - /// Flag used for disabling the printed banner in tty. - #[arg(long)] - pub(crate) no_banner: bool, - - #[command(subcommand)] - command: Commands, -} - -#[allow(clippy::large_enum_variant)] -#[derive(Subcommand)] -pub(crate) enum Commands { - /// Initialize an authenticator. Do this first! - Init(init::Init), - - /// Run the authenticator with the provided configuration and optionally override - /// parameters. - Run(run::Run), - - /// Make a dummy request to a running authenticator - Request(request::Request), - - /// Ecash-related functionalities - Ecash(Ecash), - - /// List all registered with gateways - ListGateways(list_gateways::Args), - - /// Add new gateway to this client - AddGateway(add_gateway::Args), - - /// Change the currently active gateway. Note that you must have already registered with the new gateway! - SwitchGateway(switch_gateway::Args), - - /// Sign to prove ownership of this authenticator - Sign(sign::Sign), - - /// Show build information of this binary - BuildInfo(build_info::BuildInfo), - - /// Generate shell completions - Completions(ArgShell), - - /// Generate Fig specification - GenerateFigSpec, -} - -// Configuration that can be overridden. -pub(crate) struct OverrideConfig { - nym_apis: Option>, - nyxd_urls: Option>, - enabled_credentials_mode: Option, -} - -pub(crate) fn override_config(config: Config, args: OverrideConfig) -> Config { - config - .with_optional_base_custom_env( - BaseClientConfig::with_custom_nym_apis, - args.nym_apis, - nym_network_defaults::var_names::NYM_API, - nym_config::parse_urls, - ) - .with_optional_base_custom_env( - BaseClientConfig::with_custom_nyxd, - args.nyxd_urls, - nym_network_defaults::var_names::NYXD, - nym_config::parse_urls, - ) - .with_optional_base( - BaseClientConfig::with_disabled_credentials, - args.enabled_credentials_mode.map(|b| !b), - ) -} - -pub(crate) async fn execute(args: Cli) -> Result<(), AuthenticatorError> { - let bin_name = "nym-authenticator"; - - match args.command { - Commands::Init(m) => init::execute(m).await?, - Commands::Run(m) => run::execute(&m).await?, - Commands::Request(r) => request::execute(&r).await?, - Commands::Ecash(ecash) => ecash.execute().await?, - Commands::ListGateways(args) => list_gateways::execute(args).await?, - Commands::AddGateway(args) => add_gateway::execute(args).await?, - Commands::SwitchGateway(args) => switch_gateway::execute(args).await?, - Commands::Sign(m) => sign::execute(&m).await?, - Commands::BuildInfo(m) => build_info::execute(m), - Commands::Completions(s) => s.generate(&mut Cli::command(), bin_name), - Commands::GenerateFigSpec => fig_generate(&mut Cli::command(), bin_name), - } - Ok(()) -} - -async fn try_load_current_config(id: &str) -> Result { - // try to load the config as is - if let Ok(cfg) = Config::read_from_default_path(id) { - return if !cfg.validate() { - Err(AuthenticatorError::ConfigValidationFailure) - } else { - Ok(cfg) - }; - } - - // we couldn't load it - try upgrading it from older revisions - try_upgrade_config(id).await?; - - let config = match Config::read_from_default_path(id) { - Ok(cfg) => cfg, - Err(err) => { - error!("Failed to load config for {id}. Are you sure you have run `init` before? (Error was: {err})"); - return Err(AuthenticatorError::FailedToLoadConfig(id.to_string())); - } - }; - - if !config.validate() { - return Err(AuthenticatorError::ConfigValidationFailure); - } - - Ok(config) -} diff --git a/service-providers/authenticator/src/cli/peer_handler.rs b/service-providers/authenticator/src/cli/peer_handler.rs deleted file mode 100644 index 402c09815a6..00000000000 --- a/service-providers/authenticator/src/cli/peer_handler.rs +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use nym_sdk::TaskClient; -use nym_wireguard::peer_controller::{ - AddPeerControlResponse, GetClientBandwidthControlResponse, PeerControlRequest, - QueryBandwidthControlResponse, QueryPeerControlResponse, RemovePeerControlResponse, -}; -use tokio::sync::mpsc; - -pub struct DummyHandler { - peer_rx: mpsc::Receiver, - task_client: TaskClient, -} - -impl DummyHandler { - pub fn new(peer_rx: mpsc::Receiver, task_client: TaskClient) -> Self { - DummyHandler { - peer_rx, - task_client, - } - } - - pub async fn run(mut self) { - while !self.task_client.is_shutdown() { - tokio::select! { - msg = self.peer_rx.recv() => { - if let Some(msg) = msg { - match msg { - PeerControlRequest::AddPeer { peer, client_id, response_tx } => { - log::info!("[DUMMY] Adding peer {peer:?} with client id {client_id:?}"); - response_tx.send(AddPeerControlResponse { success: true }).ok(); - } - PeerControlRequest::RemovePeer { key, response_tx } => { - log::info!("[DUMMY] Removing peer {key:?}"); - response_tx.send(RemovePeerControlResponse { success: true }).ok(); - } - PeerControlRequest::QueryPeer{key, response_tx} => { - log::info!("[DUMMY] Querying peer {key:?}"); - response_tx.send(QueryPeerControlResponse { success: true, peer: None }).ok(); - } - PeerControlRequest::QueryBandwidth{key, response_tx} => { - log::info!("[DUMMY] Querying bandwidth for peer {key:?}"); - response_tx.send(QueryBandwidthControlResponse { success: true, bandwidth_data: None }).ok(); - } - PeerControlRequest::GetClientBandwidth{key, response_tx} => { - log::info!("[DUMMY] Getting client bandwidth for peer {key:?}"); - response_tx.send(GetClientBandwidthControlResponse {client_bandwidth: None }).ok(); - } - } - } else { - break; - } - } - - _ = self.task_client.recv() => { - log::trace!("DummyHandler: Received shutdown"); - } - } - } - log::debug!("DummyHandler: Exiting"); - } -} diff --git a/service-providers/authenticator/src/cli/request.rs b/service-providers/authenticator/src/cli/request.rs deleted file mode 100644 index 9ceb3f61438..00000000000 --- a/service-providers/authenticator/src/cli/request.rs +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::try_load_current_config; -use crate::cli::AuthenticatorError; -use crate::cli::{override_config, OverrideConfig}; -use clap::{Args, Subcommand}; -use nym_authenticator_requests::latest::{ - registration::{ClientMac, FinalMessage, GatewayClient, InitMessage, IpPair}, - request::{AuthenticatorRequest, AuthenticatorRequestData}, -}; -use nym_client_core::cli_helpers::client_run::CommonClientRunArgs; -use nym_sdk::mixnet::{MixnetMessageSender, Recipient, TransmissionLane}; -use nym_task::TaskHandle; -use nym_wireguard_types::PeerPublicKey; -use std::net::{Ipv4Addr, Ipv6Addr}; -use std::str::FromStr; -use std::time::Duration; -use tokio::time::sleep; - -#[allow(clippy::struct_excessive_bools)] -#[derive(Args, Clone)] -pub(crate) struct Request { - #[command(flatten)] - common_args: CommonClientRunArgs, - - #[command(subcommand)] - request: RequestType, - - authenticator_recipient: String, -} - -impl From for OverrideConfig { - fn from(request_config: Request) -> Self { - OverrideConfig { - nym_apis: None, - nyxd_urls: request_config.common_args.nyxd_urls, - enabled_credentials_mode: request_config.common_args.enabled_credentials_mode, - } - } -} - -#[derive(Clone, Subcommand)] -pub(crate) enum RequestType { - Initial(Initial), - Final(Final), - QueryBandwidth(QueryBandwidth), -} - -#[derive(Args, Clone, Debug)] -pub(crate) struct Initial { - pub_key: String, -} - -#[derive(Args, Clone, Debug)] -pub(crate) struct Final { - pub_key: String, - private_ipv4: String, - private_ipv6: String, - mac: String, -} - -#[derive(Args, Clone, Debug)] -pub(crate) struct QueryBandwidth { - pub_key: String, -} - -impl TryFrom for AuthenticatorRequestData { - type Error = AuthenticatorError; - - fn try_from(value: RequestType) -> Result { - let ret = match value { - RequestType::Initial(req) => AuthenticatorRequestData::Initial(InitMessage::new( - PeerPublicKey::from_str(&req.pub_key)?, - )), - RequestType::Final(req) => AuthenticatorRequestData::Final(Box::new(FinalMessage { - gateway_client: GatewayClient { - pub_key: PeerPublicKey::from_str(&req.pub_key)?, - private_ips: IpPair::new( - Ipv4Addr::from_str(&req.private_ipv4)?, - Ipv6Addr::from_str(&req.private_ipv6)?, - ), - mac: ClientMac::from_str(&req.mac)?, - }, - credential: None, - })), - RequestType::QueryBandwidth(req) => { - AuthenticatorRequestData::QueryBandwidth(PeerPublicKey::from_str(&req.pub_key)?) - } - }; - Ok(ret) - } -} - -pub(crate) async fn execute(args: &Request) -> Result<(), AuthenticatorError> { - let mut config = try_load_current_config(&args.common_args.id).await?; - config = override_config(config, OverrideConfig::from(args.clone())); - - let shutdown = TaskHandle::default(); - let mixnet_client = nym_authenticator::mixnet_client::create_mixnet_client( - &config.base, - shutdown.get_handle().named("nym_sdk::MixnetClient"), - None, - None, - false, - &config.storage_paths.common_paths, - ) - .await?; - - let request_data = AuthenticatorRequestData::try_from(args.request.clone())?; - let authenticator_recipient = Recipient::from_str(&args.authenticator_recipient)?; - let (request, _) = match request_data { - AuthenticatorRequestData::Initial(init_message) => { - AuthenticatorRequest::new_initial_request(init_message) - } - AuthenticatorRequestData::Final(final_message) => { - AuthenticatorRequest::new_final_request(*final_message) - } - AuthenticatorRequestData::QueryBandwidth(query_message) => { - AuthenticatorRequest::new_query_request(query_message) - } - AuthenticatorRequestData::TopUpBandwidth(top_up_message) => { - AuthenticatorRequest::new_topup_request(*top_up_message) - } - }; - mixnet_client - .split_sender() - .send(nym_sdk::mixnet::InputMessage::new_regular( - authenticator_recipient, - request.to_bytes().unwrap(), - TransmissionLane::General, - None, - )) - .await - .map_err(|source| AuthenticatorError::FailedToSendPacketToMixnet { - source: Box::new(source), - })?; - - log::info!("Sent request, sleeping 60 seconds or until killed"); - sleep(Duration::from_secs(60)).await; - - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/run.rs b/service-providers/authenticator/src/cli/run.rs deleted file mode 100644 index d6b273f5b1d..00000000000 --- a/service-providers/authenticator/src/cli/run.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::peer_handler::DummyHandler; -use crate::cli::try_load_current_config; -use crate::cli::{override_config, OverrideConfig}; -use clap::Args; -use nym_authenticator::error::AuthenticatorError; -use nym_client_core::cli_helpers::client_run::CommonClientRunArgs; -use nym_crypto::asymmetric::x25519::KeyPair; -use nym_task::TaskHandle; -use nym_wireguard::WireguardGatewayData; -use rand::rngs::OsRng; -use std::sync::Arc; - -#[allow(clippy::struct_excessive_bools)] -#[derive(Args, Clone)] -pub(crate) struct Run { - #[command(flatten)] - common_args: CommonClientRunArgs, -} - -impl From for OverrideConfig { - fn from(run_config: Run) -> Self { - OverrideConfig { - nym_apis: None, - nyxd_urls: run_config.common_args.nyxd_urls, - enabled_credentials_mode: run_config.common_args.enabled_credentials_mode, - } - } -} - -pub(crate) async fn execute(args: &Run) -> Result<(), AuthenticatorError> { - let mut config = try_load_current_config(&args.common_args.id).await?; - config = override_config(config, OverrideConfig::from(args.clone())); - log::debug!("Using config: {config:#?}"); - - log::info!("Starting authenticator service provider"); - let (wireguard_gateway_data, peer_rx) = WireguardGatewayData::new( - config.authenticator.clone().into(), - Arc::new(KeyPair::new(&mut OsRng)), - ); - let task_handler = TaskHandle::default(); - let handler = DummyHandler::new(peer_rx, task_handler.fork("peer_handler")); - tokio::spawn(async move { - handler.run().await; - }); - - let mut server = nym_authenticator::Authenticator::new(config, wireguard_gateway_data, vec![]); - if let Some(custom_mixnet) = &args.common_args.custom_mixnet { - server = server.with_stored_topology(custom_mixnet)? - } - - server.run_service_provider().await -} diff --git a/service-providers/authenticator/src/cli/sign.rs b/service-providers/authenticator/src/cli/sign.rs deleted file mode 100644 index 30bd8ed1d13..00000000000 --- a/service-providers/authenticator/src/cli/sign.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::try_load_current_config; -use clap::Args; -use nym_authenticator::error::AuthenticatorError; -use nym_bin_common::output_format::OutputFormat; -use nym_client_core::client::key_manager::persistence::OnDiskKeys; -use nym_client_core::error::ClientCoreError; -use nym_crypto::asymmetric::ed25519; -use nym_types::helpers::ConsoleSigningOutput; - -#[derive(Args, Clone)] -pub(crate) struct Sign { - /// The id of the mixnode you want to sign with - #[arg(long)] - id: String, - - /// Signs a transaction-specific payload, that is going to be sent to the smart contract, with your identity key - #[arg(long)] - contract_msg: String, - - #[arg(short, long, default_value_t = OutputFormat::default())] - output: OutputFormat, -} - -fn print_signed_contract_msg( - private_key: &ed25519::PrivateKey, - raw_msg: &str, - output: OutputFormat, -) { - let trimmed = raw_msg.trim(); - eprintln!(">>> attempting to sign {trimmed}"); - - let Ok(decoded) = bs58::decode(trimmed).into_vec() else { - println!("it seems you have incorrectly copied the message to sign. Make sure you didn't accidentally skip any characters"); - return; - }; - - eprintln!(">>> decoding the message..."); - - // we don't really care about what particular information is embedded inside of it, - // we just want to know if user correctly copied the string, i.e. whether it's a valid bs58 encoded json - if serde_json::from_slice::(&decoded).is_err() { - println!("it seems you have incorrectly copied the message to sign. Make sure you didn't accidentally skip any characters"); - return; - }; - - // if this is a valid json, it MUST be a valid string - let decoded_string = String::from_utf8(decoded.clone()).unwrap(); - let signature = private_key.sign(&decoded).to_base58_string(); - - let sign_output = ConsoleSigningOutput::new(decoded_string, signature); - println!("{}", output.format(&sign_output)); -} - -pub(crate) async fn execute(args: &Sign) -> Result<(), AuthenticatorError> { - let config = try_load_current_config(&args.id).await?; - - let key_store = OnDiskKeys::new(config.storage_paths.common_paths.keys); - let identity_keypair = key_store.load_identity_keypair().map_err(|source| { - AuthenticatorError::ClientCoreError(ClientCoreError::KeyStoreError { - source: Box::new(source), - }) - })?; - - print_signed_contract_msg( - identity_keypair.private_key(), - &args.contract_msg, - args.output, - ); - - Ok(()) -} diff --git a/service-providers/authenticator/src/cli/switch_gateway.rs b/service-providers/authenticator/src/cli/switch_gateway.rs deleted file mode 100644 index 24ce8d48e23..00000000000 --- a/service-providers/authenticator/src/cli/switch_gateway.rs +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -use crate::cli::CliAuthenticatorClient; -use nym_authenticator::error::AuthenticatorError; -use nym_client_core::cli_helpers::client_switch_gateway::{ - switch_gateway, CommonClientSwitchGatewaysArgs, -}; - -#[derive(clap::Args, Clone, Debug)] -pub struct Args { - #[command(flatten)] - common_args: CommonClientSwitchGatewaysArgs, -} - -impl AsRef for Args { - fn as_ref(&self) -> &CommonClientSwitchGatewaysArgs { - &self.common_args - } -} - -pub(crate) async fn execute(args: Args) -> Result<(), AuthenticatorError> { - switch_gateway::(args).await -} diff --git a/service-providers/authenticator/src/error.rs b/service-providers/authenticator/src/error.rs index d6672a4076c..7774fc1e48a 100644 --- a/service-providers/authenticator/src/error.rs +++ b/service-providers/authenticator/src/error.rs @@ -17,6 +17,9 @@ pub enum AuthenticatorError { #[error("{0}")] CredentialVerificationError(#[from] nym_credential_verification::Error), + #[error("invalid credential type")] + InvalidCredentialType, + #[error("the entity wrapping the network requester has disconnected")] DisconnectedParent, @@ -77,13 +80,7 @@ pub enum AuthenticatorError { #[error("peers can't be interacted with anymore")] PeerInteractionStopped, - #[error("operation is not supported")] - UnsupportedOperation, - - #[error("operation unavailable for older client")] - OldClient, - - #[error("storage should have the requested bandwidht entry")] + #[error("storage should have the requested bandwidth entry")] MissingClientBandwidthEntry, #[error("unknown version number")] @@ -103,6 +100,9 @@ pub enum AuthenticatorError { #[error("{0}")] RecipientFormatting(#[from] nym_sdk::mixnet::RecipientFormattingError), + + #[error("no credential received")] + NoCredentialReceived, } pub type Result = std::result::Result; diff --git a/service-providers/authenticator/src/main.rs b/service-providers/authenticator/src/main.rs deleted file mode 100644 index 0883e64a995..00000000000 --- a/service-providers/authenticator/src/main.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 - -mod cli; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - use clap::Parser; - - let args = cli::Cli::parse(); - nym_bin_common::logging::setup_tracing_logger(); - nym_network_defaults::setup_env(args.config_env_file.as_ref()); - - if !args.no_banner { - nym_bin_common::logging::maybe_print_banner(clap::crate_name!(), clap::crate_version!()); - } - - cli::execute(args).await?; - Ok(()) -} diff --git a/service-providers/authenticator/src/mixnet_listener.rs b/service-providers/authenticator/src/mixnet_listener.rs index b28df229822..b8a7221d2ef 100644 --- a/service-providers/authenticator/src/mixnet_listener.rs +++ b/service-providers/authenticator/src/mixnet_listener.rs @@ -23,13 +23,15 @@ use nym_authenticator_requests::{ }, v1, v2, v3, v4, v5, CURRENT_VERSION, }; +use nym_credential_verification::ecash::traits::EcashManager; use nym_credential_verification::{ - bandwidth_storage_manager::BandwidthStorageManager, ecash::EcashManager, - BandwidthFlushingBehaviourConfig, ClientBandwidth, CredentialVerifier, + bandwidth_storage_manager::BandwidthStorageManager, BandwidthFlushingBehaviourConfig, + ClientBandwidth, CredentialVerifier, }; -use nym_credentials_interface::CredentialSpendingData; +use nym_credentials_interface::{CredentialSpendingData, TicketType}; use nym_crypto::asymmetric::x25519::KeyPair; use nym_gateway_requests::models::CredentialSpendingRequest; +use nym_gateway_storage::models::PersistedBandwidth; use nym_sdk::mixnet::{ AnonymousSenderTag, InputMessage, MixnetMessageSender, Recipient, TransmissionLane, }; @@ -74,7 +76,7 @@ pub(crate) struct MixnetListener { pub(crate) peer_manager: PeerManager, - pub(crate) ecash_verifier: Option>, + pub(crate) ecash_verifier: Arc, pub(crate) timeout_check_interval: IntervalStream, @@ -88,7 +90,7 @@ impl MixnetListener { wireguard_gateway_data: WireguardGatewayData, mixnet_client: nym_sdk::mixnet::MixnetClient, task_handle: TaskHandle, - ecash_verifier: Option>, + ecash_verifier: Arc, ) -> Self { let timeout_check_interval = IntervalStream::new(tokio::time::interval(DEFAULT_REGISTRATION_TIMEOUT_CHECK)); @@ -500,38 +502,37 @@ impl MixnetListener { 128, )); - // If gateway does ecash verification and client sends a credential, we do the additional - // credential verification. Later this will become mandatory. - if let (Some(ecash_verifier), Some(credential)) = - (self.ecash_verifier.clone(), final_message.credential()) + let Some(credential) = final_message.credential() else { + return Err(AuthenticatorError::NoCredentialReceived); + }; + let client_id = self + .ecash_verifier + .storage() + .insert_wireguard_peer( + &peer, + TicketType::try_from_encoded(credential.payment.t_type) + .map_err(|_| AuthenticatorError::InvalidCredentialType)? + .into(), + ) + .await?; + if let Err(e) = + credential_verification(self.ecash_verifier.clone(), credential, client_id).await { - let client_id = ecash_verifier + self.ecash_verifier .storage() - .insert_wireguard_peer(&peer, true) - .await? - .ok_or(AuthenticatorError::InternalError( - "peer with ticket shouldn't have been used before without a ticket".to_string(), - ))?; - if let Err(e) = - Self::credential_verification(ecash_verifier.clone(), credential, client_id).await - { - ecash_verifier - .storage() - .remove_wireguard_peer(&peer.public_key.to_string()) - .await?; - return Err(e); - } - let public_key = peer.public_key.to_string(); - if let Err(e) = self.peer_manager.add_peer(peer, Some(client_id)).await { - ecash_verifier - .storage() - .remove_wireguard_peer(&public_key) - .await?; - return Err(e); - } - } else { - self.peer_manager.add_peer(peer, None).await?; + .remove_wireguard_peer(&peer.public_key.to_string()) + .await?; + return Err(e); + } + let public_key = peer.public_key.to_string(); + if let Err(e) = self.peer_manager.add_peer(peer).await { + self.ecash_verifier + .storage() + .remove_wireguard_peer(&public_key) + .await?; + return Err(e); } + registred_and_free .registration_in_progres .remove(&final_message.pub_key()); @@ -596,37 +597,6 @@ impl MixnetListener { Ok((bytes, reply_to)) } - async fn credential_verification( - ecash_verifier: Arc, - credential: CredentialSpendingData, - client_id: i64, - ) -> Result { - ecash_verifier - .storage() - .create_bandwidth_entry(client_id) - .await?; - let bandwidth = ecash_verifier - .storage() - .get_available_bandwidth(client_id) - .await? - .ok_or(AuthenticatorError::InternalError( - "bandwidth entry should have just been created".to_string(), - ))?; - let client_bandwidth = ClientBandwidth::new(bandwidth.into()); - let mut verifier = CredentialVerifier::new( - CredentialSpendingRequest::new(credential), - ecash_verifier.clone(), - BandwidthStorageManager::new( - ecash_verifier.storage().clone(), - client_bandwidth, - client_id, - BandwidthFlushingBehaviourConfig::default(), - true, - ), - ); - Ok(verifier.verify().await?) - } - async fn on_query_bandwidth_request( &mut self, msg: Box, @@ -634,12 +604,12 @@ impl MixnetListener { request_id: u64, reply_to: Option, ) -> AuthenticatorHandleResult { - let bandwidth_data = self.peer_manager.query_bandwidth(msg).await?; + let bandwidth_data = self.peer_manager.query_bandwidth(msg.pub_key()).await?; let bytes = match AuthenticatorVersion::from(protocol) { AuthenticatorVersion::V1 => { v1::response::AuthenticatorResponse::new_remaining_bandwidth( bandwidth_data.map(|data| v1::registration::RemainingBandwidthData { - available_bandwidth: data.available_bandwidth as u64, + available_bandwidth: data as u64, suspended: false, }), reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?, @@ -652,8 +622,10 @@ impl MixnetListener { } AuthenticatorVersion::V2 => { v2::response::AuthenticatorResponse::new_remaining_bandwidth( - bandwidth_data.map(|data| v2::registration::RemainingBandwidthData { - available_bandwidth: data.available_bandwidth, + bandwidth_data.map(|available_bandwidth| { + v2::registration::RemainingBandwidthData { + available_bandwidth, + } }), reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?, request_id, @@ -665,8 +637,10 @@ impl MixnetListener { } AuthenticatorVersion::V3 => { v3::response::AuthenticatorResponse::new_remaining_bandwidth( - bandwidth_data.map(|data| v3::registration::RemainingBandwidthData { - available_bandwidth: data.available_bandwidth, + bandwidth_data.map(|available_bandwidth| { + v3::registration::RemainingBandwidthData { + available_bandwidth, + } }), reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?, request_id, @@ -678,8 +652,10 @@ impl MixnetListener { } AuthenticatorVersion::V4 => { v4::response::AuthenticatorResponse::new_remaining_bandwidth( - bandwidth_data.map(|data| v4::registration::RemainingBandwidthData { - available_bandwidth: data.available_bandwidth, + bandwidth_data.map(|available_bandwidth| { + v4::registration::RemainingBandwidthData { + available_bandwidth, + } }), reply_to.ok_or(AuthenticatorError::MissingReplyToForOldClient)?, request_id, @@ -691,8 +667,10 @@ impl MixnetListener { } AuthenticatorVersion::V5 => { v5::response::AuthenticatorResponse::new_remaining_bandwidth( - bandwidth_data.map(|data| v5::registration::RemainingBandwidthData { - available_bandwidth: data.available_bandwidth, + bandwidth_data.map(|available_bandwidth| { + v5::registration::RemainingBandwidthData { + available_bandwidth, + } }), request_id, ) @@ -713,17 +691,13 @@ impl MixnetListener { request_id: u64, reply_to: Option, ) -> AuthenticatorHandleResult { - let Some(ecash_verifier) = self.ecash_verifier.clone() else { - return Err(AuthenticatorError::UnsupportedOperation); - }; - - let client_id = ecash_verifier + let client_id = self + .ecash_verifier .storage() .get_wireguard_peer(&msg.pub_key().to_string()) .await? .ok_or(AuthenticatorError::MissingClientBandwidthEntry)? - .client_id - .ok_or(AuthenticatorError::OldClient)?; + .client_id; let client_bandwidth = self .peer_manager .query_client_bandwidth(msg.pub_key()) @@ -737,9 +711,9 @@ impl MixnetListener { let credential = msg.credential(); let mut verifier = CredentialVerifier::new( CredentialSpendingRequest::new(credential.clone()), - ecash_verifier.clone(), + self.ecash_verifier.clone(), BandwidthStorageManager::new( - ecash_verifier.storage().clone(), + self.ecash_verifier.storage(), client_bandwidth, client_id, BandwidthFlushingBehaviourConfig::default(), @@ -907,6 +881,45 @@ impl MixnetListener { } } +pub async fn credential_storage_preparation( + ecash_verifier: Arc, + client_id: i64, +) -> Result { + ecash_verifier + .storage() + .create_bandwidth_entry(client_id) + .await?; + let bandwidth = ecash_verifier + .storage() + .get_available_bandwidth(client_id) + .await? + .ok_or(AuthenticatorError::InternalError( + "bandwidth entry should have just been created".to_string(), + ))?; + Ok(bandwidth) +} + +async fn credential_verification( + ecash_verifier: Arc, + credential: CredentialSpendingData, + client_id: i64, +) -> Result { + let bandwidth = credential_storage_preparation(ecash_verifier.clone(), client_id).await?; + let client_bandwidth = ClientBandwidth::new(bandwidth.into()); + let mut verifier = CredentialVerifier::new( + CredentialSpendingRequest::new(credential), + ecash_verifier.clone(), + BandwidthStorageManager::new( + ecash_verifier.storage(), + client_bandwidth, + client_id, + BandwidthFlushingBehaviourConfig::default(), + true, + ), + ); + Ok(verifier.verify().await?) +} + fn deserialize_request(reconstructed: &ReconstructedMessage) -> Result { let request_version = *reconstructed .message diff --git a/service-providers/authenticator/src/peer_manager.rs b/service-providers/authenticator/src/peer_manager.rs index 2cb5ac137ba..3d4c86a95d6 100644 --- a/service-providers/authenticator/src/peer_manager.rs +++ b/service-providers/authenticator/src/peer_manager.rs @@ -4,15 +4,11 @@ use crate::error::*; use defguard_wireguard_rs::{host::Peer, key::Key}; use futures::channel::oneshot; -use nym_authenticator_requests::{ - latest::registration::{GatewayClient, RemainingBandwidthData}, - traits::QueryBandwidthMessage, -}; use nym_credential_verification::ClientBandwidth; use nym_wireguard::{ peer_controller::{ AddPeerControlResponse, GetClientBandwidthControlResponse, PeerControlRequest, - QueryBandwidthControlResponse, QueryPeerControlResponse, RemovePeerControlResponse, + QueryPeerControlResponse, RemovePeerControlResponse, }, WireguardGatewayData, }; @@ -28,13 +24,9 @@ impl PeerManager { wireguard_gateway_data, } } - pub async fn add_peer(&mut self, peer: Peer, client_id: Option) -> Result<()> { + pub async fn add_peer(&mut self, peer: Peer) -> Result<()> { let (response_tx, response_rx) = oneshot::channel(); - let msg = PeerControlRequest::AddPeer { - peer, - client_id, - response_tx, - }; + let msg = PeerControlRequest::AddPeer { peer, response_tx }; self.wireguard_gateway_data .peer_tx() .send(msg) @@ -52,8 +44,8 @@ impl PeerManager { Ok(()) } - pub async fn _remove_peer(&mut self, client: &GatewayClient) -> Result<()> { - let key = Key::new(client.pub_key().to_bytes()); + pub async fn _remove_peer(&mut self, pub_key: PeerPublicKey) -> Result<()> { + let key = Key::new(pub_key.to_bytes()); let (response_tx, response_rx) = oneshot::channel(); let msg = PeerControlRequest::RemovePeer { key, response_tx }; self.wireguard_gateway_data @@ -63,7 +55,7 @@ impl PeerManager { .map_err(|_| AuthenticatorError::PeerInteractionStopped)?; let RemovePeerControlResponse { success } = response_rx.await.map_err(|_| { - AuthenticatorError::InternalError("no response for add peer".to_string()) + AuthenticatorError::InternalError("no response for remove peer".to_string()) })?; if !success { return Err(AuthenticatorError::InternalError( @@ -94,31 +86,13 @@ impl PeerManager { Ok(peer) } - pub async fn query_bandwidth( - &mut self, - msg: Box, - ) -> Result> { - let key = Key::new(msg.pub_key().to_bytes()); - let (response_tx, response_rx) = oneshot::channel(); - let msg = PeerControlRequest::QueryBandwidth { key, response_tx }; - self.wireguard_gateway_data - .peer_tx() - .send(msg) - .await - .map_err(|_| AuthenticatorError::PeerInteractionStopped)?; - - let QueryBandwidthControlResponse { - success, - bandwidth_data, - } = response_rx.await.map_err(|_| { - AuthenticatorError::InternalError("no response for query bandwidth".to_string()) - })?; - if !success { - return Err(AuthenticatorError::InternalError( - "querying bandwidth could not be performed".to_string(), - )); - } - Ok(bandwidth_data) + pub async fn query_bandwidth(&mut self, public_key: PeerPublicKey) -> Result> { + let res = if let Some(client_bandwidth) = self.query_client_bandwidth(public_key).await? { + Some(client_bandwidth.available().await) + } else { + None + }; + Ok(res) } pub async fn query_client_bandwidth( @@ -143,3 +117,243 @@ impl PeerManager { Ok(client_bandwidth) } } + +#[cfg(test)] +mod tests { + use std::{str::FromStr, sync::Arc}; + + use nym_credential_verification::{ + bandwidth_storage_manager::BandwidthStorageManager, ecash::MockEcashManager, + }; + use nym_credentials_interface::Bandwidth; + use nym_crypto::asymmetric::x25519::KeyPair; + use nym_gateway_storage::traits::{mock::MockGatewayStorage, BandwidthGatewayStorage}; + use nym_wireguard::peer_controller::{start_controller, stop_controller}; + use rand::rngs::OsRng; + use time::{Duration, OffsetDateTime}; + use tokio::sync::RwLock; + + use crate::{config::Authenticator, mixnet_listener::credential_storage_preparation}; + + use super::*; + + #[tokio::test] + async fn add_peer() { + let (wireguard_data, request_rx) = WireguardGatewayData::new( + Authenticator::default().into(), + Arc::new(KeyPair::new(&mut OsRng)), + ); + let mut peer_manager = PeerManager::new(wireguard_data); + let (storage, task_manager) = start_controller( + peer_manager.wireguard_gateway_data.peer_tx().clone(), + request_rx, + ); + let peer = Peer::default(); + let ecash_manager = MockEcashManager::new(Box::new(storage.clone())); + + assert!(peer_manager.add_peer(peer.clone()).await.is_err()); + + let client_id = storage + .insert_wireguard_peer(&peer, FromStr::from_str("entry_wireguard").unwrap()) + .await + .unwrap(); + assert!(peer_manager.add_peer(peer.clone()).await.is_err()); + + credential_storage_preparation(Arc::new(ecash_manager), client_id) + .await + .unwrap(); + peer_manager.add_peer(peer.clone()).await.unwrap(); + + stop_controller(task_manager).await; + } + + async fn helper_add_peer( + storage: &Arc>, + peer_manager: &mut PeerManager, + ) -> i64 { + let peer = Peer::default(); + let ecash_manager = MockEcashManager::new(Box::new(storage.clone())); + let client_id = storage + .insert_wireguard_peer(&peer, FromStr::from_str("entry_wireguard").unwrap()) + .await + .unwrap(); + credential_storage_preparation(Arc::new(ecash_manager), client_id) + .await + .unwrap(); + peer_manager.add_peer(peer.clone()).await.unwrap(); + + client_id + } + + #[tokio::test] + async fn remove_peer() { + let (wireguard_data, request_rx) = WireguardGatewayData::new( + Authenticator::default().into(), + Arc::new(KeyPair::new(&mut OsRng)), + ); + let mut peer_manager = PeerManager::new(wireguard_data); + let key = Key::default(); + let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); + let (storage, task_manager) = start_controller( + peer_manager.wireguard_gateway_data.peer_tx().clone(), + request_rx, + ); + + helper_add_peer(&storage, &mut peer_manager).await; + peer_manager._remove_peer(public_key).await.unwrap(); + + stop_controller(task_manager).await; + } + + #[tokio::test] + async fn query_peer() { + let (wireguard_data, request_rx) = WireguardGatewayData::new( + Authenticator::default().into(), + Arc::new(KeyPair::new(&mut OsRng)), + ); + let mut peer_manager = PeerManager::new(wireguard_data); + let key = Key::default(); + let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); + let (storage, task_manager) = start_controller( + peer_manager.wireguard_gateway_data.peer_tx().clone(), + request_rx, + ); + + assert!(peer_manager.query_peer(public_key).await.unwrap().is_none()); + + helper_add_peer(&storage, &mut peer_manager).await; + let peer = peer_manager.query_peer(public_key).await.unwrap().unwrap(); + assert_eq!(peer.public_key, key); + + stop_controller(task_manager).await; + } + + #[tokio::test] + async fn query_bandwidth() { + let (wireguard_data, request_rx) = WireguardGatewayData::new( + Authenticator::default().into(), + Arc::new(KeyPair::new(&mut OsRng)), + ); + let mut peer_manager = PeerManager::new(wireguard_data); + let key = Key::default(); + let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); + let (storage, task_manager) = start_controller( + peer_manager.wireguard_gateway_data.peer_tx().clone(), + request_rx, + ); + + assert!(peer_manager + .query_bandwidth(public_key) + .await + .unwrap() + .is_none()); + + helper_add_peer(&storage, &mut peer_manager).await; + let available_bandwidth = peer_manager + .query_bandwidth(public_key) + .await + .unwrap() + .unwrap(); + assert_eq!(available_bandwidth, 0); + + stop_controller(task_manager).await; + } + + #[tokio::test] + async fn query_client_bandwidth() { + let (wireguard_data, request_rx) = WireguardGatewayData::new( + Authenticator::default().into(), + Arc::new(KeyPair::new(&mut OsRng)), + ); + let mut peer_manager = PeerManager::new(wireguard_data); + let key = Key::default(); + let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); + let (storage, task_manager) = start_controller( + peer_manager.wireguard_gateway_data.peer_tx().clone(), + request_rx, + ); + + assert!(peer_manager + .query_client_bandwidth(public_key) + .await + .unwrap() + .is_none()); + + helper_add_peer(&storage, &mut peer_manager).await; + let available_bandwidth = peer_manager + .query_client_bandwidth(public_key) + .await + .unwrap() + .unwrap() + .available() + .await; + assert_eq!(available_bandwidth, 0); + + stop_controller(task_manager).await; + } + + #[tokio::test] + async fn increase_decrease_bandwidth() { + let (wireguard_data, request_rx) = WireguardGatewayData::new( + Authenticator::default().into(), + Arc::new(KeyPair::new(&mut OsRng)), + ); + let mut peer_manager = PeerManager::new(wireguard_data); + let key = Key::default(); + let public_key = PeerPublicKey::from_str(&key.to_string()).unwrap(); + let top_up = 42; + let consume = 4; + let (storage, task_manager) = start_controller( + peer_manager.wireguard_gateway_data.peer_tx().clone(), + request_rx, + ); + + let client_id = helper_add_peer(&storage, &mut peer_manager).await; + let client_bandwidth = peer_manager + .query_client_bandwidth(public_key) + .await + .unwrap() + .unwrap(); + + let mut bw_manager = BandwidthStorageManager::new( + Box::new(storage), + client_bandwidth.clone(), + client_id, + Default::default(), + true, + ); + bw_manager + .increase_bandwidth( + Bandwidth::new_unchecked(top_up as u64), + OffsetDateTime::now_utc() + .checked_add(Duration::minutes(1)) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(client_bandwidth.available().await, top_up); + assert_eq!( + peer_manager + .query_bandwidth(public_key) + .await + .unwrap() + .unwrap(), + top_up + ); + + bw_manager.try_use_bandwidth(consume).await.unwrap(); + let remaining = top_up - consume; + assert_eq!(client_bandwidth.available().await, remaining); + assert_eq!( + peer_manager + .query_bandwidth(public_key) + .await + .unwrap() + .unwrap(), + remaining + ); + + stop_controller(task_manager).await; + } +}