diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c6b3f01d7..71fe0e63a 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -6,6 +6,9 @@ dictionary Config { Network network; NetAddress? listening_address; u32 default_cltv_expiry_delta; + u64 onchain_wallet_sync_interval_secs; + u64 wallet_sync_interval_secs; + u64 fee_rate_cache_update_interval_secs; LogLevel log_level; }; @@ -157,6 +160,7 @@ dictionary ChannelDetails { u64 channel_value_satoshis; u64? unspendable_punishment_reserve; UserChannelId user_channel_id; + u32 feerate_sat_per_1000_weight; u64 balance_msat; u64 outbound_capacity_msat; u64 inbound_capacity_msat; diff --git a/src/lib.rs b/src/lib.rs index 25c7ab7ac..ca0600781 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,7 +118,7 @@ use types::{ pub use types::{ChannelDetails, ChannelId, PeerDetails, UserChannelId}; use wallet::Wallet; -use logger::{log_error, log_info, FilesystemLogger, Logger}; +use logger::{log_error, log_info, log_trace, FilesystemLogger, Logger}; use lightning::chain::keysinterface::EntropySource; use lightning::chain::{chainmonitor, BestBlock, Confirm, Watch}; @@ -161,7 +161,6 @@ use std::convert::TryInto; use std::default::Default; use std::fs; use std::net::ToSocketAddrs; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime}; @@ -173,7 +172,9 @@ const DEFAULT_STORAGE_DIR_PATH: &str = "/tmp/ldk_node/"; const DEFAULT_NETWORK: Network = Network::Bitcoin; const DEFAULT_LISTENING_ADDR: &str = "0.0.0.0:9735"; const DEFAULT_CLTV_EXPIRY_DELTA: u32 = 144; -const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; +const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 60; +const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 20; +const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Debug; // The 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold @@ -183,6 +184,9 @@ const BDK_CLIENT_STOP_GAP: usize = 20; // The number of concurrent requests made against the API provider. const BDK_CLIENT_CONCURRENCY: u8 = 8; +// The default Esplora server we're using. +const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; + // The timeout after which we abandon retrying failed payments. const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -195,6 +199,9 @@ const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); // The time in-between node announcement broadcast attempts. const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60); +// The lower limit which we apply to any configured wallet sync intervals. +const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10; + // The length in bytes of our wallets' keys seed. const WALLET_KEYS_SEED_LEN: usize = 64; @@ -203,13 +210,16 @@ const WALLET_KEYS_SEED_LEN: usize = 64; /// /// ### Defaults /// -/// | Parameter | Value | -/// |-----------------------------|------------------| -/// | `storage_dir_path` | /tmp/ldk_node/ | -/// | `network` | `Bitcoin` | -/// | `listening_address` | 0.0.0.0:9735 | -/// | `default_cltv_expiry_delta` | 144 | -/// | `log_level` | `Debug` | +/// | Parameter | Value | +/// |----------------------------------------|------------------| +/// | `storage_dir_path` | /tmp/ldk_node/ | +/// | `network` | `Bitcoin | +/// | `listening_address` | 0.0.0.0:9735 | +/// | `default_cltv_expiry_delta` | 144 | +/// | `onchain_wallet_sync_interval_secs` | 60 | +/// | `wallet_sync_interval_secs` | 20 | +/// | `fee_rate_cache_update_interval_secs` | 600 | +/// | `log_level` | `Debug` | /// pub struct Config { /// The path where the underlying LDK and BDK persist their data. @@ -220,6 +230,18 @@ pub struct Config { pub listening_address: Option, /// The default CLTV expiry delta to be used for payments. pub default_cltv_expiry_delta: u32, + /// The time in-between background sync attempts of the onchain wallet, in seconds. + /// + /// **Note:** A minimum of 10 seconds is always enforced. + pub onchain_wallet_sync_interval_secs: u64, + /// The time in-between background sync attempts of the LDK wallet, in seconds. + /// + /// **Note:** A minimum of 10 seconds is always enforced. + pub wallet_sync_interval_secs: u64, + /// The time in-between background update attempts to our fee rate cache, in seconds. + /// + /// **Note:** A minimum of 10 seconds is always enforced. + pub fee_rate_cache_update_interval_secs: u64, /// The level at which we log messages. /// /// Any messages below this level will be excluded from the logs. @@ -233,6 +255,9 @@ impl Default for Config { network: DEFAULT_NETWORK, listening_address: Some(DEFAULT_LISTENING_ADDR.parse().unwrap()), default_cltv_expiry_delta: DEFAULT_CLTV_EXPIRY_DELTA, + onchain_wallet_sync_interval_secs: DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS, + wallet_sync_interval_secs: DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS, + fee_rate_cache_update_interval_secs: DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS, log_level: DEFAULT_LOG_LEVEL, } } @@ -766,40 +791,57 @@ impl Node { let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); - let event_handler = Arc::new(EventHandler::new( - Arc::clone(&self.wallet), - Arc::clone(&self.event_queue), - Arc::clone(&self.channel_manager), - Arc::clone(&self.network_graph), - Arc::clone(&self.keys_manager), - Arc::clone(&self.payment_store), - Arc::clone(&self.runtime), - Arc::clone(&self.logger), - Arc::clone(&self.config), - )); + // Block to ensure we update our fee rate cache once on startup + let wallet = Arc::clone(&self.wallet); + let sync_logger = Arc::clone(&self.logger); + runtime.block_on(async move { + let now = Instant::now(); + match wallet.update_fee_estimates().await { + Ok(()) => { + log_info!( + sync_logger, + "Initial fee rate cache update finished in {}ms.", + now.elapsed().as_millis() + ); + Ok(()) + } + Err(e) => { + log_error!(sync_logger, "Initial fee rate cache update failed: {}", e,); + Err(e) + } + } + })?; // Setup wallet sync let wallet = Arc::clone(&self.wallet); - let tx_sync = Arc::clone(&self.tx_sync); - let sync_cman = Arc::clone(&self.channel_manager); - let sync_cmon = Arc::clone(&self.chain_monitor); let sync_logger = Arc::clone(&self.logger); let mut stop_sync = self.stop_receiver.clone(); - + let onchain_wallet_sync_interval_secs = + self.config.onchain_wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + let fee_rate_cache_update_interval_secs = + self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( async move { - let mut interval = tokio::time::interval(Duration::from_secs(30)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut onchain_wallet_sync_interval = tokio::time::interval( + Duration::from_secs(onchain_wallet_sync_interval_secs), + ); + onchain_wallet_sync_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut fee_rate_update_interval = tokio::time::interval(Duration::from_secs( + fee_rate_cache_update_interval_secs, + )); + // We just blocked on updating, so skip the first tick. + fee_rate_update_interval.reset(); loop { - let now = Instant::now(); tokio::select! { _ = stop_sync.changed() => { return; } - _ = interval.tick() => { + _ = onchain_wallet_sync_interval.tick() => { + let now = Instant::now(); match wallet.sync().await { - Ok(()) => log_info!( + Ok(()) => log_trace!( sync_logger, "Background sync of on-chain wallet finished in {}ms.", now.elapsed().as_millis() @@ -813,12 +855,66 @@ impl Node { } } } + _ = fee_rate_update_interval.tick() => { + let now = Instant::now(); + match wallet.update_fee_estimates().await { + Ok(()) => log_trace!( + sync_logger, + "Background update of fee rate cache finished in {}ms.", + now.elapsed().as_millis() + ), + Err(err) => { + log_error!( + sync_logger, + "Background update of fee rate cache failed: {}", + err + ) + } + } + } } } }, ); }); + let tx_sync = Arc::clone(&self.tx_sync); + let sync_cman = Arc::clone(&self.channel_manager); + let sync_cmon = Arc::clone(&self.chain_monitor); + let sync_logger = Arc::clone(&self.logger); + let mut stop_sync = self.stop_receiver.clone(); + let wallet_sync_interval_secs = + self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS); + runtime.spawn(async move { + let mut wallet_sync_interval = + tokio::time::interval(Duration::from_secs(wallet_sync_interval_secs)); + wallet_sync_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_sync.changed() => { + return; + } + _ = wallet_sync_interval.tick() => { + let confirmables = vec![ + &*sync_cman as &(dyn Confirm + Sync + Send), + &*sync_cmon as &(dyn Confirm + Sync + Send), + ]; + let now = Instant::now(); + match tx_sync.sync(confirmables).await { + Ok(()) => log_trace!( + sync_logger, + "Background sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ), + Err(e) => { + log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + } + } + } + } + } + }); + if self.gossip_source.is_rgs() { let gossip_source = Arc::clone(&self.gossip_source); let gossip_sync_store = Arc::clone(&self.kv_store); @@ -836,7 +932,7 @@ impl Node { let now = Instant::now(); match gossip_source.update_rgs_snapshot().await { Ok(updated_timestamp) => { - log_info!( + log_trace!( gossip_sync_logger, "Background sync of RGS gossip data finished in {}ms.", now.elapsed().as_millis() @@ -860,37 +956,6 @@ impl Node { }); } - let sync_logger = Arc::clone(&self.logger); - let mut stop_sync = self.stop_receiver.clone(); - runtime.spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(10)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - let now = Instant::now(); - tokio::select! { - _ = stop_sync.changed() => { - return; - } - _ = interval.tick() => { - let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync + Send), - &*sync_cmon as &(dyn Confirm + Sync + Send), - ]; - match tx_sync.sync(confirmables).await { - Ok(()) => log_info!( - sync_logger, - "Background sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ), - Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) - } - } - } - } - } - }); - if let Some(listening_address) = &self.config.listening_address { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); @@ -1024,6 +1089,18 @@ impl Node { } }); + let event_handler = Arc::new(EventHandler::new( + Arc::clone(&self.wallet), + Arc::clone(&self.event_queue), + Arc::clone(&self.channel_manager), + Arc::clone(&self.network_graph), + Arc::clone(&self.keys_manager), + Arc::clone(&self.payment_store), + Arc::clone(&self.runtime), + Arc::clone(&self.logger), + Arc::clone(&self.config), + )); + // Setup background processing let background_persister = Arc::clone(&self.kv_store); let background_event_handler = Arc::clone(&event_handler); @@ -1208,22 +1285,16 @@ impl Node { let con_node_id = peer_info.node_id; let con_addr = peer_info.address.clone(); - let con_success = Arc::new(AtomicBool::new(false)); - let con_success_cloned = Arc::clone(&con_success); let con_logger = Arc::clone(&self.logger); let con_pm = Arc::clone(&self.peer_manager); + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. tokio::task::block_in_place(move || { runtime.block_on(async move { - let res = - connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await; - con_success_cloned.store(res.is_ok(), Ordering::Release); + connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await }) - }); - - if !con_success.load(Ordering::Acquire) { - return Err(Error::ConnectionFailed); - } + })?; log_info!(self.logger, "Connected to peer {}@{}. ", peer_info.node_id, peer_info.address); @@ -1286,22 +1357,16 @@ impl Node { let con_node_id = peer_info.node_id; let con_addr = peer_info.address.clone(); - let con_success = Arc::new(AtomicBool::new(false)); - let con_success_cloned = Arc::clone(&con_success); let con_logger = Arc::clone(&self.logger); let con_pm = Arc::clone(&self.peer_manager); + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. tokio::task::block_in_place(move || { runtime.block_on(async move { - let res = - connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await; - con_success_cloned.store(res.is_ok(), Ordering::Release); + connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await }) - }); - - if !con_success.load(Ordering::Acquire) { - return Err(Error::ConnectionFailed); - } + })?; let user_config = UserConfig { channel_handshake_limits: ChannelHandshakeLimits { @@ -1342,15 +1407,17 @@ impl Node { } } - /// Sync the LDK and BDK wallets with the current chain state. + /// Manually sync the LDK and BDK wallets with the current chain state. /// - /// Note that the wallets will be also synced regularly in the background. + /// **Note:** The wallets are regularly synced in the background, which is configurable via + /// [`Config::onchain_wallet_sync_interval_secs`] and [`Config::wallet_sync_interval_secs`]. + /// Therefore, using this blocking sync method is almost always redudant and should be avoided + /// where possible. pub fn sync_wallets(&self) -> Result<(), Error> { let rt_lock = self.runtime.read().unwrap(); if rt_lock.is_none() { return Err(Error::NotRunning); } - let runtime = rt_lock.as_ref().unwrap(); let wallet = Arc::clone(&self.wallet); let tx_sync = Arc::clone(&self.tx_sync); @@ -1373,39 +1440,31 @@ impl Node { "Sync of on-chain wallet finished in {}ms.", now.elapsed().as_millis() ); - Ok(()) } Err(e) => { log_error!(sync_logger, "Sync of on-chain wallet failed: {}", e); - Err(e) + return Err(e); + } + }; + + let now = Instant::now(); + match tx_sync.sync(confirmables).await { + Ok(()) => { + log_info!( + sync_logger, + "Sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); + Ok(()) + } + Err(e) => { + log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e); + Err(e.into()) } } }, ) - })?; - - let sync_logger = Arc::clone(&self.logger); - tokio::task::block_in_place(move || { - runtime.block_on(async move { - let now = Instant::now(); - match tx_sync.sync(confirmables).await { - Ok(()) => { - log_info!( - sync_logger, - "Sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - Ok(()) - } - Err(e) => { - log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e); - Err(e) - } - } - }) - })?; - - Ok(()) + }) } /// Close a previously opened channel. diff --git a/src/test/functional_tests.rs b/src/test/functional_tests.rs index ba92dfc78..4df14a54d 100644 --- a/src/test/functional_tests.rs +++ b/src/test/functional_tests.rs @@ -399,7 +399,7 @@ fn onchain_spend_receive() { #[test] fn sign_verify_msg() { - let (_, electrsd) = setup_bitcoind_and_electrsd(); + let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); let config = random_config(); let builder = Builder::from_config(config.clone()); diff --git a/src/types.rs b/src/types.rs index f164eb70d..13f2a5391 100644 --- a/src/types.rs +++ b/src/types.rs @@ -159,6 +159,9 @@ pub struct ChannelDetails { pub unspendable_punishment_reserve: Option, /// The local `user_channel_id` of this channel. pub user_channel_id: UserChannelId, + /// The currently negotiated fee rate denominated in satoshi per 1000 weight units, + /// which is applied to commitment and HTLC transactions. + pub feerate_sat_per_1000_weight: u32, /// Total balance of the channel. This is the amount that will be returned to the user if the /// channel is closed. /// @@ -214,6 +217,7 @@ impl From for ChannelDetails { unspendable_punishment_reserve: value.unspendable_punishment_reserve, user_channel_id: UserChannelId(value.user_channel_id), balance_msat: value.balance_msat, + feerate_sat_per_1000_weight: value.feerate_sat_per_1000_weight.unwrap(), outbound_capacity_msat: value.outbound_capacity_msat, inbound_capacity_msat: value.inbound_capacity_msat, confirmations_required: value.confirmations_required, diff --git a/src/wallet.rs b/src/wallet.rs index bd6996ebe..c6362486d 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -73,14 +73,6 @@ where } }; - match self.update_fee_estimates().await { - Ok(()) => (), - Err(e) => { - log_error!(self.logger, "Fee estimation error: {}", e); - return Err(e); - } - } - let sync_options = SyncOptions { progress: None }; let wallet_lock = self.inner.lock().unwrap(); let res = match wallet_lock.sync(&self.blockchain, sync_options).await {