diff --git a/Cargo.toml b/Cargo.toml index 4c4422461..6661497c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ bip39 = "2.0.0" rand = "0.8.5" chrono = { version = "0.4", default-features = false, features = ["clock"] } -tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] } +tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] } esplora-client = { version = "0.6", default-features = false } libc = "0.2" uniffi = { version = "0.26.0", features = ["build"], optional = true } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 26ab55994..2723db573 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -165,11 +165,15 @@ enum NodeError { "ChannelConfigUpdateFailed", "PersistenceFailed", "FeerateEstimationUpdateFailed", + "FeerateEstimationUpdateTimeout", "WalletOperationFailed", + "WalletOperationTimeout", "OnchainTxSigningFailed", "MessageSigningFailed", "TxSyncFailed", + "TxSyncTimeout", "GossipUpdateFailed", + "GossipUpdateTimeout", "LiquidityRequestFailed", "InvalidAddress", "InvalidSocketAddress", diff --git a/src/builder.rs b/src/builder.rs index fa6e573b7..a2a93aa79 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,6 +1,6 @@ use crate::config::{ default_user_config, Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, - DEFAULT_ESPLORA_SERVER_URL, WALLET_KEYS_SEED_LEN, + DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL, WALLET_KEYS_SEED_LEN, }; use crate::connection::ConnectionManager; use crate::event::EventQueue; @@ -558,10 +558,15 @@ fn build_with_store_internal( let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora(server_url)) => { - let tx_sync = Arc::new(EsploraSyncClient::new(server_url.clone(), Arc::clone(&logger))); - let blockchain = - EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) - 
.with_concurrency(BDK_CLIENT_CONCURRENCY); + let mut client_builder = esplora_client::Builder::new(&server_url.clone()); + client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); + let esplora_client = client_builder.build_async().unwrap(); + let tx_sync = Arc::new(EsploraSyncClient::from_client( + esplora_client.clone(), + Arc::clone(&logger), + )); + let blockchain = EsploraBlockchain::from_client(esplora_client, BDK_CLIENT_STOP_GAP) + .with_concurrency(BDK_CLIENT_CONCURRENCY); let tx_broadcaster = Arc::new(TransactionBroadcaster::new( tx_sync.client().clone(), Arc::clone(&logger), @@ -959,6 +964,7 @@ fn build_with_store_internal( }; let (stop_sender, _) = tokio::sync::watch::channel(()); + let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); let is_listening = Arc::new(AtomicBool::new(false)); let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None)); @@ -971,6 +977,7 @@ fn build_with_store_internal( Ok(Node { runtime, stop_sender, + event_handling_stopped_sender, config, wallet, tx_sync, diff --git a/src/config.rs b/src/config.rs index 7adae2d4b..d0e72080f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,6 +28,9 @@ pub(crate) const BDK_CLIENT_CONCURRENCY: u8 = 4; // The default Esplora server we're using. pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; +// The default Esplora client timeout we're using. +pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; + // The timeout after which we abandon retrying failed payments. pub(crate) const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); @@ -46,6 +49,21 @@ pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60 // The lower limit which we apply to any configured wallet sync intervals. pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10; +// The timeout after which we abort a wallet syncing operation. 
+pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90; + +// The timeout after which we abort a Lightning wallet syncing operation. +pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30; + +// The timeout after which we abort a fee rate cache update operation. +pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5; + +// The timeout after which we abort a transaction broadcast operation. +pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5; + +// The timeout after which we abort an RGS sync operation. +pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/error.rs b/src/error.rs index 63ec0ad84..a8671d9a7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,16 +33,24 @@ pub enum Error { PersistenceFailed, /// A fee rate estimation update failed. FeerateEstimationUpdateFailed, + /// A fee rate estimation update timed out. + FeerateEstimationUpdateTimeout, /// A wallet operation failed. WalletOperationFailed, + /// A wallet operation timed out. + WalletOperationTimeout, /// A signing operation for transaction failed. OnchainTxSigningFailed, /// A signing operation for message failed. MessageSigningFailed, /// A transaction sync operation failed. TxSyncFailed, + /// A transaction sync operation timed out. + TxSyncTimeout, /// A gossip updating operation failed. GossipUpdateFailed, + /// A gossip updating operation timed out. + GossipUpdateTimeout, /// A liquidity request operation failed. LiquidityRequestFailed, /// The given address is invalid. 
@@ -111,11 +119,17 @@ impl fmt::Display for Error { Self::FeerateEstimationUpdateFailed => { write!(f, "Failed to update fee rate estimates.") }, + Self::FeerateEstimationUpdateTimeout => { + write!(f, "Updating fee rate estimates timed out.") + }, Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."), + Self::WalletOperationTimeout => write!(f, "A wallet operation timed out."), Self::OnchainTxSigningFailed => write!(f, "Failed to sign given transaction."), Self::MessageSigningFailed => write!(f, "Failed to sign given message."), Self::TxSyncFailed => write!(f, "Failed to sync transactions."), + Self::TxSyncTimeout => write!(f, "Syncing transactions timed out."), Self::GossipUpdateFailed => write!(f, "Failed to update gossip data."), + Self::GossipUpdateTimeout => write!(f, "Updating gossip data timed out."), Self::LiquidityRequestFailed => write!(f, "Failed to request inbound liquidity."), Self::InvalidAddress => write!(f, "The given address is invalid."), Self::InvalidSocketAddress => write!(f, "The given network address is invalid."), diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs index 74518227f..f1fa7e43b 100644 --- a/src/fee_estimator.rs +++ b/src/fee_estimator.rs @@ -1,3 +1,4 @@ +use crate::config::FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS; use crate::logger::{log_error, log_trace, Logger}; use crate::{Config, Error}; @@ -14,6 +15,7 @@ use bitcoin::Network; use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, RwLock}; +use std::time::Duration; pub(crate) struct OnchainFeeEstimator where @@ -55,7 +57,21 @@ where ConfirmationTarget::OutputSpendingFee => 12, }; - let estimates = self.esplora_client.get_fee_estimates().await.map_err(|e| { + let estimates = tokio::time::timeout( + Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), + self.esplora_client.get_fee_estimates(), + ) + .await + .map_err(|e| { + log_error!( + self.logger, + "Updating fee rate estimates for {:?} timed out: {}", + target, + e + ); + 
Error::FeerateEstimationUpdateTimeout + })? + .map_err(|e| { log_error!( self.logger, "Failed to retrieve fee rate estimates for {:?}: {}", diff --git a/src/gossip.rs b/src/gossip.rs index 5a41bf51c..1241b0cdc 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -1,3 +1,4 @@ +use crate::config::RGS_SYNC_TIMEOUT_SECS; use crate::logger::{log_trace, FilesystemLogger, Logger}; use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; use crate::Error; @@ -6,6 +7,7 @@ use lightning::routing::utxo::UtxoLookup; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use std::time::Duration; pub(crate) enum GossipSource { P2PNetwork { @@ -55,7 +57,17 @@ impl GossipSource { Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => { let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire); let query_url = format!("{}/{}", server_url, query_timestamp); - let response = reqwest::get(query_url).await.map_err(|e| { + + let response = tokio::time::timeout( + Duration::from_secs(RGS_SYNC_TIMEOUT_SECS), + reqwest::get(query_url), + ) + .await + .map_err(|e| { + log_trace!(logger, "Retrieving RGS gossip update timed out: {}", e); + Error::GossipUpdateTimeout + })? 
+ .map_err(|e| { log_trace!(logger, "Failed to retrieve RGS gossip update: {}", e); Error::GossipUpdateFailed })?; diff --git a/src/lib.rs b/src/lib.rs index 8c504ed51..de2a0badf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,8 +123,8 @@ pub use builder::BuildError; pub use builder::NodeBuilder as Builder; use config::{ - default_user_config, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, - RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, RGS_SYNC_INTERVAL, + default_user_config, LDK_WALLET_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, + PEER_RECONNECTION_INTERVAL, RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use connection::ConnectionManager; @@ -173,6 +173,7 @@ uniffi::include_scaffolding!("ldk_node"); pub struct Node { runtime: Arc>>, stop_sender: tokio::sync::watch::Sender<()>, + event_handling_stopped_sender: tokio::sync::watch::Sender<()>, config: Arc, wallet: Arc, tx_sync: Arc>>, @@ -275,6 +276,10 @@ impl Node { loop { tokio::select! { _ = stop_sync.changed() => { + log_trace!( + sync_logger, + "Stopping background syncing on-chain wallet.", + ); return; } _ = onchain_wallet_sync_interval.tick() => { @@ -321,6 +326,10 @@ impl Node { loop { tokio::select! { _ = stop_fee_updates.changed() => { + log_trace!( + fee_update_logger, + "Stopping background updates of fee rate cache.", + ); return; } _ = fee_rate_update_interval.tick() => { @@ -368,6 +377,10 @@ impl Node { loop { tokio::select! 
{ _ = stop_sync.changed() => { + log_trace!( + sync_logger, + "Stopping background syncing Lightning wallet.", + ); return; } _ = wallet_sync_interval.tick() => { @@ -377,25 +390,31 @@ impl Node { &*sync_sweeper as &(dyn Confirm + Sync + Send), ]; let now = Instant::now(); - match tx_sync.sync(confirmables).await { - Ok(()) => { - log_trace!( - sync_logger, - "Background sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - - periodically_archive_fully_resolved_monitors( - Arc::clone(&archive_cman), - Arc::clone(&archive_cmon), - Arc::clone(&sync_monitor_archival_height) - ); + let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables)); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_trace!( + sync_logger, + "Background sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + + periodically_archive_fully_resolved_monitors( + Arc::clone(&archive_cman), + Arc::clone(&archive_cmon), + Arc::clone(&sync_monitor_archival_height) + ); + } + Err(e) => { + log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + } } Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e) } } } @@ -414,6 +433,10 @@ impl Node { loop { tokio::select! 
{ _ = stop_gossip_sync.changed() => { + log_trace!( + gossip_sync_logger, + "Stopping background syncing RGS gossip data.", + ); return; } _ = interval.tick() => { @@ -489,6 +512,10 @@ impl Node { let peer_mgr = Arc::clone(&peer_manager_connection_handler); tokio::select! { _ = stop_listen.changed() => { + log_trace!( + listening_logger, + "Stopping listening to inbound connections.", + ); break; } res = listener.accept() => { @@ -521,6 +548,10 @@ impl Node { loop { tokio::select! { _ = stop_connect.changed() => { + log_trace!( + connect_logger, + "Stopping reconnecting known peers.", + ); return; } _ = interval.tick() => { @@ -566,6 +597,10 @@ impl Node { loop { tokio::select! { _ = stop_bcast.changed() => { + log_trace!( + bcast_logger, + "Stopping broadcasting node announcements.", + ); return; } _ = interval.tick() => { @@ -622,6 +657,7 @@ impl Node { let mut stop_tx_bcast = self.stop_sender.subscribe(); let tx_bcaster = Arc::clone(&self.tx_broadcaster); + let tx_bcast_logger = Arc::clone(&self.logger); runtime.spawn(async move { // Every second we try to clear our broadcasting queue. let mut interval = tokio::time::interval(Duration::from_secs(1)); @@ -629,6 +665,10 @@ impl Node { loop { tokio::select! { _ = stop_tx_bcast.changed() => { + log_trace!( + tx_bcast_logger, + "Stopping broadcasting transactions.", + ); return; } _ = interval.tick() => { @@ -671,11 +711,17 @@ impl Node { let background_error_logger = Arc::clone(&self.logger); let background_scorer = Arc::clone(&self.scorer); let stop_bp = self.stop_sender.subscribe(); + let sleeper_logger = Arc::clone(&self.logger); let sleeper = move |d| { let mut stop = stop_bp.clone(); + let sleeper_logger = Arc::clone(&sleeper_logger); Box::pin(async move { tokio::select! 
{ _ = stop.changed() => { + log_trace!( + sleeper_logger, + "Stopping processing events.", + ); true } _ = tokio::time::sleep(d) => { @@ -685,6 +731,8 @@ impl Node { }) }; + let background_stop_logger = Arc::clone(&self.logger); + let event_handling_stopped_sender = self.event_handling_stopped_sender.clone(); runtime.spawn(async move { process_events_async( background_persister, @@ -704,15 +752,33 @@ impl Node { log_error!(background_error_logger, "Failed to process events: {}", e); panic!("Failed to process events"); }); + log_trace!(background_stop_logger, "Events processing stopped.",); + + match event_handling_stopped_sender.send(()) { + Ok(_) => (), + Err(e) => { + log_error!( + background_stop_logger, + "Failed to send 'events handling stopped' signal. This should never happen: {}", + e + ); + debug_assert!(false); + }, + } }); if let Some(liquidity_source) = self.liquidity_source.as_ref() { let mut stop_liquidity_handler = self.stop_sender.subscribe(); let liquidity_handler = Arc::clone(&liquidity_source); + let liquidity_logger = Arc::clone(&self.logger); runtime.spawn(async move { loop { tokio::select! { _ = stop_liquidity_handler.changed() => { + log_trace!( + liquidity_logger, + "Stopping processing liquidity events.", + ); return; } _ = liquidity_handler.handle_next_event() => {} @@ -748,9 +814,55 @@ impl Node { }, } - // Stop disconnect peers. + // Disconnect all peers. self.peer_manager.disconnect_all_peers(); + // Wait until event handling stopped, at least until a timeout is reached. + let event_handling_stopped_logger = Arc::clone(&self.logger); + let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe(); + + // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow + // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We + // should drop this considerably post upgrading to BDK 1.0. 
+ let timeout_res = runtime.block_on(async { + tokio::time::timeout( + Duration::from_secs(100), + event_handling_stopped_receiver.changed(), + ) + .await + }); + + match timeout_res { + Ok(stop_res) => match stop_res { + Ok(()) => {}, + Err(e) => { + log_error!( + event_handling_stopped_logger, + "Stopping event handling failed. This should never happen: {}", + e + ); + panic!("Stopping event handling failed. This should never happen."); + }, + }, + Err(e) => { + log_error!( + event_handling_stopped_logger, + "Stopping event handling timed out: {}", + e + ); + }, + } + + #[cfg(tokio_unstable)] + { + log_trace!( + self.logger, + "Active runtime tasks left prior to shutdown: {}", + runtime.metrics().active_tasks_count() + ); + } + + // Shutdown our runtime. By now ~no or only very few tasks should be left. runtime.shutdown_timeout(Duration::from_secs(10)); log_info!(self.logger, "Shutdown complete."); @@ -1167,6 +1279,8 @@ impl Node { tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( async move { let now = Instant::now(); + // We don't add an additional timeout here, as `Wallet::sync` already returns + // after a timeout. match wallet.sync().await { Ok(()) => { log_info!( @@ -1187,6 +1301,8 @@ impl Node { }; let now = Instant::now(); + // We don't add an additional timeout here, as + // `FeeEstimator::update_fee_estimates` already returns after a timeout. 
match fee_estimator.update_fee_estimates().await { Ok(()) => { log_info!( @@ -1207,30 +1323,40 @@ impl Node { } let now = Instant::now(); - match tx_sync.sync(confirmables).await { - Ok(()) => { - log_info!( - sync_logger, - "Sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); + let tx_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), + tx_sync.sync(confirmables), + ); + match tx_sync_timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_info!( + sync_logger, + "Sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - periodically_archive_fully_resolved_monitors( - archive_cman, - archive_cmon, - sync_monitor_archival_height, - ); - Ok(()) + periodically_archive_fully_resolved_monitors( + archive_cman, + archive_cmon, + sync_monitor_archival_height, + ); + Ok(()) + }, + Err(e) => { + log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e); + Err(e.into()) + }, }, Err(e) => { - log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e); - Err(e.into()) + log_error!(sync_logger, "Sync of Lightning wallet timed out: {}", e); + Err(Error::TxSyncTimeout) }, } }, diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 40483f578..4492bcfc6 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -1,4 +1,5 @@ -use crate::logger::{log_bytes, log_debug, log_error, log_trace, Logger}; +use crate::config::TX_BROADCAST_TIMEOUT_SECS; +use crate::logger::{log_bytes, log_error, log_trace, Logger}; use lightning::chain::chaininterface::BroadcasterInterface; use lightning::util::ser::Writeable; @@ -7,6 +8,7 
@@ use esplora_client::AsyncClient as EsploraClient; use bitcoin::Transaction; +use reqwest::StatusCode; use tokio::sync::mpsc; use tokio::sync::Mutex; @@ -38,57 +40,68 @@ where let mut receiver = self.queue_receiver.lock().await; while let Some(next_package) = receiver.recv().await { for tx in &next_package { - match self.esplora_client.broadcast(tx).await { - Ok(()) => { - log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); - }, - Err(e) => match e { - esplora_client::Error::Reqwest(_) => { - // Wait 500 ms and retry in case we get a `Reqwest` error (typically - // 429) - tokio::time::sleep(Duration::from_millis(500)).await; - log_error!( + let timeout_fut = tokio::time::timeout( + Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), + self.esplora_client.broadcast(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_trace!( self.logger, - "Sync failed due to HTTP connection error, retrying: {}", - e + "Successfully broadcast transaction {}", + tx.txid() ); - match self.esplora_client.broadcast(tx).await { - Ok(()) => { - log_debug!( - self.logger, - "Successfully broadcast transaction {}", - tx.txid() - ); - }, - Err(e) => { + }, + Err(e) => match e { + esplora_client::Error::Reqwest(err) => { + if err.status() == StatusCode::from_u16(400).ok() { + // Ignore 400, as this just means bitcoind already knows the + // transaction. + // FIXME: We can further differentiate here based on the error + // message which will be available with rust-esplora-client 0.7 and + // later. 
+ } else { log_error!( self.logger, - "Failed to broadcast transaction {}: {}", - tx.txid(), - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) + "Failed to broadcast due to HTTP connection error: {}", + err ); - }, - } - }, - _ => { - log_error!( - self.logger, - "Failed to broadcast transaction {}: {}", - tx.txid(), - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); + } + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + _ => { + log_error!( + self.logger, + "Failed to broadcast transaction {}: {}", + tx.txid(), + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, }, }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction due to timeout {}: {}", + tx.txid(), + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, } } } diff --git a/src/wallet.rs b/src/wallet.rs index d970b52a6..0da3f6db8 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1,5 +1,6 @@ use crate::logger::{log_error, log_info, log_trace, Logger}; +use crate::config::BDK_WALLET_SYNC_TIMEOUT_SECS; use crate::Error; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; @@ -17,7 +18,7 @@ use lightning::util::message_signing; use bdk::blockchain::EsploraBlockchain; use bdk::database::BatchDatabase; use bdk::wallet::AddressIndex; -use bdk::FeeRate; +use bdk::{Balance, FeeRate}; use bdk::{SignOptions, SyncOptions}; use bitcoin::address::{Payload, WitnessVersion}; @@ -33,10 +34,15 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing}; use bitcoin::{ScriptBuf, Transaction, TxOut, Txid}; -use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex}; +use 
std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; +enum WalletSyncStatus { + Completed, + InProgress { subscribers: tokio::sync::broadcast::Sender> }, +} + pub struct Wallet where D: BatchDatabase, @@ -51,7 +57,10 @@ where // A cache storing the most recently retrieved fee rate estimations. broadcaster: B, fee_estimator: E, - sync_lock: (Mutex<()>, Condvar), + // A Mutex holding the current sync status. + sync_status: Mutex, + // TODO: Drop this workaround after BDK 1.0 upgrade. + balance_cache: RwLock, logger: L, } @@ -66,55 +75,77 @@ where blockchain: EsploraBlockchain, wallet: bdk::Wallet, broadcaster: B, fee_estimator: E, logger: L, ) -> Self { + let start_balance = wallet.get_balance().unwrap_or(Balance { + immature: 0, + trusted_pending: 0, + untrusted_pending: 0, + confirmed: 0, + }); + let inner = Mutex::new(wallet); - let sync_lock = (Mutex::new(()), Condvar::new()); - Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, logger } + let sync_status = Mutex::new(WalletSyncStatus::Completed); + let balance_cache = RwLock::new(start_balance); + Self { blockchain, inner, broadcaster, fee_estimator, sync_status, balance_cache, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { - let (lock, cvar) = &self.sync_lock; - - let guard = match lock.try_lock() { - Ok(guard) => guard, - Err(_) => { - log_info!(self.logger, "Sync in progress, skipping."); - let guard = cvar.wait(lock.lock().unwrap()); - drop(guard); - cvar.notify_all(); - return Ok(()); - }, - }; + if let Some(mut sync_receiver) = self.register_or_subscribe_pending_sync() { + log_info!(self.logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } - let sync_options = SyncOptions { progress: None }; - let wallet_lock = 
self.inner.lock().unwrap(); - let res = match wallet_lock.sync(&self.blockchain, sync_options).await { - Ok(()) => Ok(()), - Err(e) => match e { - bdk::Error::Esplora(ref be) => match **be { - bdk::blockchain::esplora::EsploraError::Reqwest(_) => { - tokio::time::sleep(Duration::from_secs(1)).await; - log_error!( - self.logger, - "Sync failed due to HTTP connection error, retrying: {}", - e - ); - let sync_options = SyncOptions { progress: None }; - wallet_lock.sync(&self.blockchain, sync_options).await.map_err(From::from) + let res = { + let wallet_lock = self.inner.lock().unwrap(); + + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + wallet_lock.sync(&self.blockchain, SyncOptions { progress: None }), + ); + + match wallet_sync_timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + // TODO: Drop this workaround after BDK 1.0 upgrade. + // Update balance cache after syncing. + if let Ok(balance) = wallet_lock.get_balance() { + *self.balance_cache.write().unwrap() = balance; + } + Ok(()) }, - _ => { - log_error!(self.logger, "Sync failed due to Esplora error: {}", e); - Err(From::from(e)) + Err(e) => match e { + bdk::Error::Esplora(ref be) => match **be { + bdk::blockchain::esplora::EsploraError::Reqwest(_) => { + log_error!( + self.logger, + "Sync failed due to HTTP connection error: {}", + e + ); + Err(From::from(e)) + }, + _ => { + log_error!(self.logger, "Sync failed due to Esplora error: {}", e); + Err(From::from(e)) + }, + }, + _ => { + log_error!(self.logger, "Wallet sync error: {}", e); + Err(From::from(e)) + }, }, }, - _ => { - log_error!(self.logger, "Wallet sync error: {}", e); - Err(From::from(e)) + Err(e) => { + log_error!(self.logger, "On-chain wallet sync timed out: {}", e); + Err(Error::WalletOperationTimeout) }, - }, + } }; - drop(guard); - cvar.notify_all(); + self.propagate_result_to_subscribers(res); + res } @@ -175,13 +206,24 @@ where pub(crate) fn get_balances( &self, 
total_anchor_channels_reserve_sats: u64, ) -> Result<(u64, u64), Error> { - let wallet_lock = self.inner.lock().unwrap(); - let (total, spendable) = wallet_lock.get_balance().map(|bal| { - ( - bal.get_total(), - bal.get_spendable().saturating_sub(total_anchor_channels_reserve_sats), - ) - })?; + // TODO: Drop this workaround after BDK 1.0 upgrade. + // We get the balance and update our cache if we can do so without blocking on the wallet + // Mutex. Otherwise, we return a cached value. + let balance = match self.inner.try_lock() { + Ok(wallet_lock) => { + // Update balance cache if we can. + let balance = wallet_lock.get_balance()?; + *self.balance_cache.write().unwrap() = balance.clone(); + balance + }, + Err(_) => self.balance_cache.read().unwrap().clone(), + }; + + let (total, spendable) = ( + balance.get_total(), + balance.get_spendable().saturating_sub(total_anchor_channels_reserve_sats), + ); + Ok((total, spendable)) } @@ -268,6 +310,59 @@ where Ok(txid) } + + fn register_or_subscribe_pending_sync( + &self, + ) -> Option>> { + let mut sync_status_lock = self.sync_status.lock().unwrap(); + match sync_status_lock.deref_mut() { + WalletSyncStatus::Completed => { + // We're first to register for a sync. + let (tx, _) = tokio::sync::broadcast::channel(1); + *sync_status_lock = WalletSyncStatus::InProgress { subscribers: tx }; + None + }, + WalletSyncStatus::InProgress { subscribers } => { + // A sync is in-progress, we subscribe. + let rx = subscribers.subscribe(); + Some(rx) + }, + } + } + + fn propagate_result_to_subscribers(&self, res: Result<(), Error>) { + // Send the notification to any other tasks that might be waiting on it by now. + { + let mut sync_status_lock = self.sync_status.lock().unwrap(); + match sync_status_lock.deref_mut() { + WalletSyncStatus::Completed => { + // No sync in-progress, do nothing. + return; + }, + WalletSyncStatus::InProgress { subscribers } => { + // A sync is in-progress, we notify subscribers. 
+ if subscribers.receiver_count() > 0 { + match subscribers.send(res) { + Ok(_) => (), + Err(e) => { + debug_assert!( + false, + "Failed to send wallet sync result to subscribers: {:?}", + e + ); + log_error!( + self.logger, + "Failed to send wallet sync result to subscribers: {:?}", + e + ); + }, + } + } + *sync_status_lock = WalletSyncStatus::Completed; + }, + } + } + } } impl WalletSource for Wallet