From 80d24c399dfd54ca5c5fad0cfcf5ef2a50052e87 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 18 Mar 2024 11:41:20 +0100 Subject: [PATCH 01/13] Introduce `balance_cache` to avoid blocking on retrieving balances Unfortunately BDK's current wallet design requires us to have it live in a `Mutex` that is locked for long periods of time during syncing. This is especially painful for short-lived operations that just operate locally, such as retrieving the current balance, which we now do in several places to be able to check Anchor channel limitations, e.g., in event handling. In order to avoid blocking during balance retrieval, we introduce a `balance` cache that will be refreshed whenever we're done with syncing *or* when we can successfully get the wallet lock. Otherwise, we'll just return the cached value, allowing us to make progress even though a background sync of the wallet might be in-progress. --- src/wallet.rs | 59 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 12 deletions(-) diff --git a/src/wallet.rs b/src/wallet.rs index d970b52a6..5ade04481 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -17,7 +17,7 @@ use lightning::util::message_signing; use bdk::blockchain::EsploraBlockchain; use bdk::database::BatchDatabase; use bdk::wallet::AddressIndex; -use bdk::FeeRate; +use bdk::{Balance, FeeRate}; use bdk::{SignOptions, SyncOptions}; use bitcoin::address::{Payload, WitnessVersion}; @@ -34,7 +34,7 @@ use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing}; use bitcoin::{ScriptBuf, Transaction, TxOut, Txid}; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::Duration; pub struct Wallet @@ -52,6 +52,8 @@ where broadcaster: B, fee_estimator: E, sync_lock: (Mutex<()>, Condvar), + // TODO: Drop this workaround after BDK 1.0 upgrade. 
+ balance_cache: RwLock, logger: L, } @@ -66,9 +68,17 @@ where blockchain: EsploraBlockchain, wallet: bdk::Wallet, broadcaster: B, fee_estimator: E, logger: L, ) -> Self { + let start_balance = wallet.get_balance().unwrap_or(Balance { + immature: 0, + trusted_pending: 0, + untrusted_pending: 0, + confirmed: 0, + }); + let inner = Mutex::new(wallet); let sync_lock = (Mutex::new(()), Condvar::new()); - Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, logger } + let balance_cache = RwLock::new(start_balance); + Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, balance_cache, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { @@ -88,10 +98,19 @@ where let sync_options = SyncOptions { progress: None }; let wallet_lock = self.inner.lock().unwrap(); let res = match wallet_lock.sync(&self.blockchain, sync_options).await { - Ok(()) => Ok(()), + Ok(()) => { + // TODO: Drop this workaround after BDK 1.0 upgrade. + // Update balance cache after syncing. + if let Ok(balance) = wallet_lock.get_balance() { + *self.balance_cache.write().unwrap() = balance; + } + Ok(()) + }, Err(e) => match e { bdk::Error::Esplora(ref be) => match **be { bdk::blockchain::esplora::EsploraError::Reqwest(_) => { + // Drop lock, sleep for a second, retry. 
+ drop(wallet_lock); tokio::time::sleep(Duration::from_secs(1)).await; log_error!( self.logger, @@ -99,7 +118,12 @@ where e ); let sync_options = SyncOptions { progress: None }; - wallet_lock.sync(&self.blockchain, sync_options).await.map_err(From::from) + self.inner + .lock() + .unwrap() + .sync(&self.blockchain, sync_options) + .await + .map_err(From::from) }, _ => { log_error!(self.logger, "Sync failed due to Esplora error: {}", e); @@ -175,13 +199,24 @@ where pub(crate) fn get_balances( &self, total_anchor_channels_reserve_sats: u64, ) -> Result<(u64, u64), Error> { - let wallet_lock = self.inner.lock().unwrap(); - let (total, spendable) = wallet_lock.get_balance().map(|bal| { - ( - bal.get_total(), - bal.get_spendable().saturating_sub(total_anchor_channels_reserve_sats), - ) - })?; + // TODO: Drop this workaround after BDK 1.0 upgrade. + // We get the balance and update our cache if we can do so without blocking on the wallet + // Mutex. Otherwise, we return a cached value. + let balance = match self.inner.try_lock() { + Ok(wallet_lock) => { + // Update balance cache if we can. + let balance = wallet_lock.get_balance()?; + *self.balance_cache.write().unwrap() = balance.clone(); + balance + }, + Err(_) => self.balance_cache.read().unwrap().clone(), + }; + + let (total, spendable) = ( + balance.get_total(), + balance.get_spendable().saturating_sub(total_anchor_channels_reserve_sats), + ); + Ok((total, spendable)) } From e3279d7ee831d89dd3fdeb99ccda81524fc63b0b Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Apr 2024 14:00:30 +0200 Subject: [PATCH 02/13] Drop potentially dangerous `sync_lock` `Condvar` and use pub/sub model Using a `Condvar` could be potentially dangerous in async contexts as `wait`ing on it might block the current thread potentially hosting more than one task. Here, we drop the `Condvar` and adopt a pub/sub scheme instead, similar to the one we already implemented in `ConnectionManager`. 
--- src/wallet.rs | 164 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 110 insertions(+), 54 deletions(-) diff --git a/src/wallet.rs b/src/wallet.rs index 5ade04481..23679e141 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -33,10 +33,15 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing}; use bitcoin::{ScriptBuf, Transaction, TxOut, Txid}; -use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; +enum WalletSyncStatus { + Completed, + InProgress { subscribers: tokio::sync::broadcast::Sender<Result<(), Error>> }, +} + pub struct Wallet where D: BatchDatabase, @@ -51,7 +56,8 @@ where // A cache storing the most recently retrieved fee rate estimations. broadcaster: B, fee_estimator: E, - sync_lock: (Mutex<()>, Condvar), + // A Mutex holding the current sync status. + sync_status: Mutex<WalletSyncStatus>, // TODO: Drop this workaround after BDK 1.0 upgrade. 
balance_cache: RwLock, logger: L, @@ -76,69 +82,66 @@ where }); let inner = Mutex::new(wallet); - let sync_lock = (Mutex::new(()), Condvar::new()); + let sync_status = Mutex::new(WalletSyncStatus::Completed); let balance_cache = RwLock::new(start_balance); - Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, balance_cache, logger } + Self { blockchain, inner, broadcaster, fee_estimator, sync_status, balance_cache, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { - let (lock, cvar) = &self.sync_lock; - - let guard = match lock.try_lock() { - Ok(guard) => guard, - Err(_) => { - log_info!(self.logger, "Sync in progress, skipping."); - let guard = cvar.wait(lock.lock().unwrap()); - drop(guard); - cvar.notify_all(); - return Ok(()); - }, - }; + if let Some(mut sync_receiver) = self.register_or_subscribe_pending_sync() { + log_info!(self.logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet sync result: {:?}", e); + log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e); + Error::WalletOperationFailed + })?; + } - let sync_options = SyncOptions { progress: None }; - let wallet_lock = self.inner.lock().unwrap(); - let res = match wallet_lock.sync(&self.blockchain, sync_options).await { - Ok(()) => { - // TODO: Drop this workaround after BDK 1.0 upgrade. - // Update balance cache after syncing. - if let Ok(balance) = wallet_lock.get_balance() { - *self.balance_cache.write().unwrap() = balance; - } - Ok(()) - }, - Err(e) => match e { - bdk::Error::Esplora(ref be) => match **be { - bdk::blockchain::esplora::EsploraError::Reqwest(_) => { - // Drop lock, sleep for a second, retry. 
- drop(wallet_lock); - tokio::time::sleep(Duration::from_secs(1)).await; - log_error!( - self.logger, - "Sync failed due to HTTP connection error, retrying: {}", - e - ); - let sync_options = SyncOptions { progress: None }; - self.inner - .lock() - .unwrap() - .sync(&self.blockchain, sync_options) - .await - .map_err(From::from) + let res = { + let wallet_lock = self.inner.lock().unwrap(); + match wallet_lock.sync(&self.blockchain, SyncOptions { progress: None }).await { + Ok(()) => { + // TODO: Drop this workaround after BDK 1.0 upgrade. + // Update balance cache after syncing. + if let Ok(balance) = wallet_lock.get_balance() { + *self.balance_cache.write().unwrap() = balance; + } + Ok(()) + }, + Err(e) => match e { + bdk::Error::Esplora(ref be) => match **be { + bdk::blockchain::esplora::EsploraError::Reqwest(_) => { + // Drop lock, sleep for a second, retry. + drop(wallet_lock); + tokio::time::sleep(Duration::from_secs(1)).await; + log_error!( + self.logger, + "Sync failed due to HTTP connection error, retrying: {}", + e + ); + let sync_options = SyncOptions { progress: None }; + self.inner + .lock() + .unwrap() + .sync(&self.blockchain, sync_options) + .await + .map_err(From::from) + }, + _ => { + log_error!(self.logger, "Sync failed due to Esplora error: {}", e); + Err(From::from(e)) + }, }, _ => { - log_error!(self.logger, "Sync failed due to Esplora error: {}", e); + log_error!(self.logger, "Wallet sync error: {}", e); Err(From::from(e)) }, }, - _ => { - log_error!(self.logger, "Wallet sync error: {}", e); - Err(From::from(e)) - }, - }, + } }; - drop(guard); - cvar.notify_all(); + self.propagate_result_to_subscribers(res); + res } @@ -303,6 +306,59 @@ where Ok(txid) } + + fn register_or_subscribe_pending_sync( + &self, + ) -> Option<tokio::sync::broadcast::Receiver<Result<(), Error>>> { + let mut sync_status_lock = self.sync_status.lock().unwrap(); + match sync_status_lock.deref_mut() { + WalletSyncStatus::Completed => { + // We're first to register for a sync. 
+ let (tx, _) = tokio::sync::broadcast::channel(1); + *sync_status_lock = WalletSyncStatus::InProgress { subscribers: tx }; + None + }, + WalletSyncStatus::InProgress { subscribers } => { + // A sync is in-progress, we subscribe. + let rx = subscribers.subscribe(); + Some(rx) + }, + } + } + + fn propagate_result_to_subscribers(&self, res: Result<(), Error>) { + // Send the notification to any other tasks that might be waiting on it by now. + { + let mut sync_status_lock = self.sync_status.lock().unwrap(); + match sync_status_lock.deref_mut() { + WalletSyncStatus::Completed => { + // No sync in-progress, do nothing. + return; + }, + WalletSyncStatus::InProgress { subscribers } => { + // A sync is in-progress, we notify subscribers. + if subscribers.receiver_count() > 0 { + match subscribers.send(res) { + Ok(_) => (), + Err(e) => { + debug_assert!( + false, + "Failed to send wallet sync result to subscribers: {:?}", + e + ); + log_error!( + self.logger, + "Failed to send wallet sync result to subscribers: {:?}", + e + ); + }, + } + } + *sync_status_lock = WalletSyncStatus::Completed; + }, + } + } + } } impl WalletSource for Wallet From fd4b33f627c71a79777be1a60524d22290631157 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 12 Apr 2024 14:16:53 +0200 Subject: [PATCH 03/13] Drop immediate-retry logic in `wallet` It's not super clear that it achieves much in the face of a rate-limited Esplora server, and having a custom sleep there is just awkward. So we drop it and hope we still get a chance to sync our on-chain wallet now and then. 
--- src/wallet.rs | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/wallet.rs b/src/wallet.rs index 23679e141..95b3d9008 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -35,7 +35,6 @@ use bitcoin::{ScriptBuf, Transaction, TxOut, Txid}; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, RwLock}; -use std::time::Duration; enum WalletSyncStatus { Completed, @@ -111,21 +110,12 @@ where Err(e) => match e { bdk::Error::Esplora(ref be) => match **be { bdk::blockchain::esplora::EsploraError::Reqwest(_) => { - // Drop lock, sleep for a second, retry. - drop(wallet_lock); - tokio::time::sleep(Duration::from_secs(1)).await; log_error!( self.logger, - "Sync failed due to HTTP connection error, retrying: {}", + "Sync failed due to HTTP connection error: {}", e ); - let sync_options = SyncOptions { progress: None }; - self.inner - .lock() - .unwrap() - .sync(&self.blockchain, sync_options) - .await - .map_err(From::from) + Err(From::from(e)) }, _ => { log_error!(self.logger, "Sync failed due to Esplora error: {}", e); From 82ab9ac425250e2624bc3756bc11d82afc82492f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 18 Jun 2024 10:45:56 +0200 Subject: [PATCH 04/13] Drop immediate-retry logic in `tx_broadcaster` .. as we're not sure it actually increases reliability. We now only log failures, ignoring HTTP 400 as this is bitcoind's error code for "transaction already in mempool". 
--- src/tx_broadcaster.rs | 52 +++++++++++++++++-------------------------- 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 40483f578..2019db28c 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -1,4 +1,4 @@ -use crate::logger::{log_bytes, log_debug, log_error, log_trace, Logger}; +use crate::logger::{log_bytes, log_error, log_trace, Logger}; use lightning::chain::chaininterface::BroadcasterInterface; use lightning::util::ser::Writeable; @@ -7,11 +7,11 @@ use esplora_client::AsyncClient as EsploraClient; use bitcoin::Transaction; +use reqwest::StatusCode; use tokio::sync::mpsc; use tokio::sync::Mutex; use std::ops::Deref; -use std::time::Duration; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -43,36 +43,24 @@ where log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); }, Err(e) => match e { - esplora_client::Error::Reqwest(_) => { - // Wait 500 ms and retry in case we get a `Reqwest` error (typically - // 429) - tokio::time::sleep(Duration::from_millis(500)).await; - log_error!( - self.logger, - "Sync failed due to HTTP connection error, retrying: {}", - e - ); - match self.esplora_client.broadcast(tx).await { - Ok(()) => { - log_debug!( - self.logger, - "Successfully broadcast transaction {}", - tx.txid() - ); - }, - Err(e) => { - log_error!( - self.logger, - "Failed to broadcast transaction {}: {}", - tx.txid(), - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); - }, + esplora_client::Error::Reqwest(err) => { + if err.status() == StatusCode::from_u16(400).ok() { + // Ignore 400, as this just means bitcoind already knows the + // transaction. + // FIXME: We can further differentiate here based on the error + // message which will be available with rust-esplora-client 0.7 and + // later. 
+ } else { + log_error!( + self.logger, + "Failed to broadcast due to HTTP connection error: {}", + err + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); } }, _ => { From f58f00f13b10ff3897d5a22ed26d60e35887b400 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 15 May 2024 11:24:06 +0200 Subject: [PATCH 05/13] Add timeout for on-chain syncing .. to make progress and unblock the `Mutex` even if BDK's wallet `sync` would never return. --- bindings/ldk_node.udl | 1 + src/config.rs | 3 +++ src/error.rs | 3 +++ src/wallet.rs | 60 ++++++++++++++++++++++++++----------------- 4 files changed, 44 insertions(+), 23 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 26ab55994..188fefd09 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -166,6 +166,7 @@ enum NodeError { "PersistenceFailed", "FeerateEstimationUpdateFailed", "WalletOperationFailed", + "WalletOperationTimeout", "OnchainTxSigningFailed", "MessageSigningFailed", "TxSyncFailed", diff --git a/src/config.rs b/src/config.rs index 7adae2d4b..4b4ecc272 100644 --- a/src/config.rs +++ b/src/config.rs @@ -46,6 +46,9 @@ pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60 // The lower limit which we apply to any configured wallet sync intervals. pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10; +// The timeout after which we abort a wallet syncing operation. +pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/error.rs b/src/error.rs index 63ec0ad84..8cc3950ec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,6 +35,8 @@ pub enum Error { FeerateEstimationUpdateFailed, /// A wallet operation failed. WalletOperationFailed, + /// A wallet operation timed out. + WalletOperationTimeout, /// A signing operation for transaction failed. 
OnchainTxSigningFailed, /// A signing operation for message failed. @@ -112,6 +114,7 @@ impl fmt::Display for Error { write!(f, "Failed to update fee rate estimates.") }, Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."), + Self::WalletOperationTimeout => write!(f, "A wallet operation timed out."), Self::OnchainTxSigningFailed => write!(f, "Failed to sign given transaction."), Self::MessageSigningFailed => write!(f, "Failed to sign given message."), Self::TxSyncFailed => write!(f, "Failed to sync transactions."), diff --git a/src/wallet.rs b/src/wallet.rs index 95b3d9008..0da3f6db8 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -1,5 +1,6 @@ use crate::logger::{log_error, log_info, log_trace, Logger}; +use crate::config::BDK_WALLET_SYNC_TIMEOUT_SECS; use crate::Error; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; @@ -35,6 +36,7 @@ use bitcoin::{ScriptBuf, Transaction, TxOut, Txid}; use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; enum WalletSyncStatus { Completed, @@ -98,34 +100,46 @@ where let res = { let wallet_lock = self.inner.lock().unwrap(); - match wallet_lock.sync(&self.blockchain, SyncOptions { progress: None }).await { - Ok(()) => { - // TODO: Drop this workaround after BDK 1.0 upgrade. - // Update balance cache after syncing. 
- if let Ok(balance) = wallet_lock.get_balance() { - *self.balance_cache.write().unwrap() = balance; - } - Ok(()) - }, - Err(e) => match e { - bdk::Error::Esplora(ref be) => match **be { - bdk::blockchain::esplora::EsploraError::Reqwest(_) => { - log_error!( - self.logger, - "Sync failed due to HTTP connection error: {}", - e - ); - Err(From::from(e)) + + let wallet_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + wallet_lock.sync(&self.blockchain, SyncOptions { progress: None }), + ); + + match wallet_sync_timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + // TODO: Drop this workaround after BDK 1.0 upgrade. + // Update balance cache after syncing. + if let Ok(balance) = wallet_lock.get_balance() { + *self.balance_cache.write().unwrap() = balance; + } + Ok(()) + }, + Err(e) => match e { + bdk::Error::Esplora(ref be) => match **be { + bdk::blockchain::esplora::EsploraError::Reqwest(_) => { + log_error!( + self.logger, + "Sync failed due to HTTP connection error: {}", + e + ); + Err(From::from(e)) + }, + _ => { + log_error!(self.logger, "Sync failed due to Esplora error: {}", e); + Err(From::from(e)) + }, }, _ => { - log_error!(self.logger, "Sync failed due to Esplora error: {}", e); + log_error!(self.logger, "Wallet sync error: {}", e); Err(From::from(e)) }, }, - _ => { - log_error!(self.logger, "Wallet sync error: {}", e); - Err(From::from(e)) - }, + }, + Err(e) => { + log_error!(self.logger, "On-chain wallet sync timed out: {}", e); + Err(Error::WalletOperationTimeout) }, } }; From 746014c4bce62e359a106d656da11f138d3ab210 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 15 May 2024 11:38:48 +0200 Subject: [PATCH 06/13] Add timeout for Lightning syncing .. even though we don't expect this to block, we're better safe than sorry and start to introduce timeouts for any calls we make to remote servers. 
--- bindings/ldk_node.udl | 1 + src/config.rs | 3 ++ src/error.rs | 3 ++ src/lib.rs | 98 ++++++++++++++++++++++++++----------------- 4 files changed, 66 insertions(+), 39 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 188fefd09..3bd2f8b08 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -170,6 +170,7 @@ enum NodeError { "OnchainTxSigningFailed", "MessageSigningFailed", "TxSyncFailed", + "TxSyncTimeout", "GossipUpdateFailed", "LiquidityRequestFailed", "InvalidAddress", diff --git a/src/config.rs b/src/config.rs index 4b4ecc272..dd7372528 100644 --- a/src/config.rs +++ b/src/config.rs @@ -49,6 +49,9 @@ pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10; // The timeout after which we abort a wallet syncing operation. pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90; +// The timeout after which we abort a wallet syncing operation. +pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/error.rs b/src/error.rs index 8cc3950ec..3d427279c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -43,6 +43,8 @@ pub enum Error { MessageSigningFailed, /// A transaction sync operation failed. TxSyncFailed, + /// A transaction sync operation timed out. + TxSyncTimeout, /// A gossip updating operation failed. GossipUpdateFailed, /// A liquidity request operation failed. 
@@ -118,6 +120,7 @@ impl fmt::Display for Error { Self::OnchainTxSigningFailed => write!(f, "Failed to sign given transaction."), Self::MessageSigningFailed => write!(f, "Failed to sign given message."), Self::TxSyncFailed => write!(f, "Failed to sync transactions."), + Self::TxSyncTimeout => write!(f, "Syncing transactions timed out."), Self::GossipUpdateFailed => write!(f, "Failed to update gossip data."), Self::LiquidityRequestFailed => write!(f, "Failed to request inbound liquidity."), Self::InvalidAddress => write!(f, "The given address is invalid."), diff --git a/src/lib.rs b/src/lib.rs index 8c504ed51..45bb11059 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,8 +123,8 @@ pub use builder::BuildError; pub use builder::NodeBuilder as Builder; use config::{ - default_user_config, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, - RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, RGS_SYNC_INTERVAL, + default_user_config, LDK_WALLET_SYNC_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, + PEER_RECONNECTION_INTERVAL, RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use connection::ConnectionManager; @@ -377,25 +377,31 @@ impl Node { &*sync_sweeper as &(dyn Confirm + Sync + Send), ]; let now = Instant::now(); - match tx_sync.sync(confirmables).await { - Ok(()) => { - log_trace!( - sync_logger, - "Background sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - - periodically_archive_fully_resolved_monitors( - Arc::clone(&archive_cman), - Arc::clone(&archive_cmon), - Arc::clone(&sync_monitor_archival_height) - ); + let timeout_fut = tokio::time::timeout(Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), tx_sync.sync(confirmables)); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_trace!( + sync_logger, + "Background 
sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + + periodically_archive_fully_resolved_monitors( + Arc::clone(&archive_cman), + Arc::clone(&archive_cmon), + Arc::clone(&sync_monitor_archival_height) + ); + } + Err(e) => { + log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + } } Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + log_error!(sync_logger, "Background sync of Lightning wallet timed out: {}", e) } } } @@ -1167,6 +1173,8 @@ impl Node { tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( async move { let now = Instant::now(); + // We don't add an additional timeout here, as `Wallet::sync` already returns + // after a timeout. match wallet.sync().await { Ok(()) => { log_info!( @@ -1187,6 +1195,8 @@ impl Node { }; let now = Instant::now(); + // We don't add an additional timeout here, as + // `FeeEstimator::update_fee_estimates` already returns after a timeout. 
match fee_estimator.update_fee_estimates().await { Ok(()) => { log_info!( @@ -1207,30 +1217,40 @@ impl Node { } let now = Instant::now(); - match tx_sync.sync(confirmables).await { - Ok(()) => { - log_info!( - sync_logger, - "Sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ); + let tx_sync_timeout_fut = tokio::time::timeout( + Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS), + tx_sync.sync(confirmables), + ); + match tx_sync_timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_info!( + sync_logger, + "Sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + *sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - periodically_archive_fully_resolved_monitors( - archive_cman, - archive_cmon, - sync_monitor_archival_height, - ); - Ok(()) + periodically_archive_fully_resolved_monitors( + archive_cman, + archive_cmon, + sync_monitor_archival_height, + ); + Ok(()) + }, + Err(e) => { + log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e); + Err(e.into()) + }, }, Err(e) => { - log_error!(sync_logger, "Sync of Lightning wallet failed: {}", e); - Err(e.into()) + log_error!(sync_logger, "Sync of Lightning wallet timed out: {}", e); + Err(Error::TxSyncTimeout) }, } }, From 02e4b3f69af410c171b8645fce3e4b205574a7cd Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 15 May 2024 11:46:21 +0200 Subject: [PATCH 07/13] Add timeout for fee rate cache updates .. even though we don't expect this to block, we're better safe than sorry and start to introduce timeouts for any calls we make to remote servers. 
--- bindings/ldk_node.udl | 1 + src/config.rs | 3 +++ src/error.rs | 5 +++++ src/fee_estimator.rs | 18 +++++++++++++++++- 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 3bd2f8b08..fff7ced89 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -165,6 +165,7 @@ enum NodeError { "ChannelConfigUpdateFailed", "PersistenceFailed", "FeerateEstimationUpdateFailed", + "FeerateEstimationUpdateTimeout", "WalletOperationFailed", "WalletOperationTimeout", "OnchainTxSigningFailed", diff --git a/src/config.rs b/src/config.rs index dd7372528..365795175 100644 --- a/src/config.rs +++ b/src/config.rs @@ -52,6 +52,9 @@ pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90; // The timeout after which we abort a wallet syncing operation. pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30; +// The timeout after which we abort a fee rate cache update operation. +pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/error.rs b/src/error.rs index 3d427279c..8ffdc53ec 100644 --- a/src/error.rs +++ b/src/error.rs @@ -33,6 +33,8 @@ pub enum Error { PersistenceFailed, /// A fee rate estimation update failed. FeerateEstimationUpdateFailed, + /// A fee rate estimation update timed out. + FeerateEstimationUpdateTimeout, /// A wallet operation failed. WalletOperationFailed, /// A wallet operation timed out. 
@@ -115,6 +117,9 @@ impl fmt::Display for Error { Self::FeerateEstimationUpdateFailed => { write!(f, "Failed to update fee rate estimates.") }, + Self::FeerateEstimationUpdateTimeout => { + write!(f, "Updating fee rate estimates timed out.") + }, Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."), Self::WalletOperationTimeout => write!(f, "A wallet operation timed out."), Self::OnchainTxSigningFailed => write!(f, "Failed to sign given transaction."), diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs index 74518227f..f1fa7e43b 100644 --- a/src/fee_estimator.rs +++ b/src/fee_estimator.rs @@ -1,3 +1,4 @@ +use crate::config::FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS; use crate::logger::{log_error, log_trace, Logger}; use crate::{Config, Error}; @@ -14,6 +15,7 @@ use bitcoin::Network; use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, RwLock}; +use std::time::Duration; pub(crate) struct OnchainFeeEstimator where @@ -55,7 +57,21 @@ where ConfirmationTarget::OutputSpendingFee => 12, }; - let estimates = self.esplora_client.get_fee_estimates().await.map_err(|e| { + let estimates = tokio::time::timeout( + Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), + self.esplora_client.get_fee_estimates(), + ) + .await + .map_err(|e| { + log_error!( + self.logger, + "Updating fee rate estimates for {:?} timed out: {}", + target, + e + ); + Error::FeerateEstimationUpdateTimeout + })? + .map_err(|e| { log_error!( self.logger, "Failed to retrieve fee rate estimates for {:?}: {}", From b0a1dfcb7663582f95a817243f0661a5c19ea8d2 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 15 May 2024 11:53:06 +0200 Subject: [PATCH 08/13] Add timeout for RGS updates .. even though we don't expect this to block, we're better safe than sorry and start to introduce timeouts for any calls we make to remote servers. 
--- bindings/ldk_node.udl | 1 + src/config.rs | 3 +++ src/error.rs | 3 +++ src/gossip.rs | 14 +++++++++++++- 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index fff7ced89..2723db573 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -173,6 +173,7 @@ enum NodeError { "TxSyncFailed", "TxSyncTimeout", "GossipUpdateFailed", + "GossipUpdateTimeout", "LiquidityRequestFailed", "InvalidAddress", "InvalidSocketAddress", diff --git a/src/config.rs b/src/config.rs index 365795175..91543ed87 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,6 +55,9 @@ pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30; // The timeout after which we abort a fee rate cache update operation. pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5; +// The timeout after which we abort a RGS sync operation. +pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; + // The length in bytes of our wallets' keys seed. pub(crate) const WALLET_KEYS_SEED_LEN: usize = 64; diff --git a/src/error.rs b/src/error.rs index 8ffdc53ec..a8671d9a7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -49,6 +49,8 @@ pub enum Error { TxSyncTimeout, /// A gossip updating operation failed. GossipUpdateFailed, + /// A gossip updating operation timed out. + GossipUpdateTimeout, /// A liquidity request operation failed. LiquidityRequestFailed, /// The given address is invalid. 
@@ -127,6 +129,7 @@ impl fmt::Display for Error { Self::TxSyncFailed => write!(f, "Failed to sync transactions."), Self::TxSyncTimeout => write!(f, "Syncing transactions timed out."), Self::GossipUpdateFailed => write!(f, "Failed to update gossip data."), + Self::GossipUpdateTimeout => write!(f, "Updating gossip data timed out."), Self::LiquidityRequestFailed => write!(f, "Failed to request inbound liquidity."), Self::InvalidAddress => write!(f, "The given address is invalid."), Self::InvalidSocketAddress => write!(f, "The given network address is invalid."), diff --git a/src/gossip.rs b/src/gossip.rs index 5a41bf51c..1241b0cdc 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -1,3 +1,4 @@ +use crate::config::RGS_SYNC_TIMEOUT_SECS; use crate::logger::{log_trace, FilesystemLogger, Logger}; use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; use crate::Error; @@ -6,6 +7,7 @@ use lightning::routing::utxo::UtxoLookup; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use std::time::Duration; pub(crate) enum GossipSource { P2PNetwork { @@ -55,7 +57,17 @@ impl GossipSource { Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => { let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire); let query_url = format!("{}/{}", server_url, query_timestamp); - let response = reqwest::get(query_url).await.map_err(|e| { + + let response = tokio::time::timeout( + Duration::from_secs(RGS_SYNC_TIMEOUT_SECS), + reqwest::get(query_url), + ) + .await + .map_err(|e| { + log_trace!(logger, "Retrieving RGS gossip update timed out: {}", e); + Error::GossipUpdateTimeout + })? 
+ .map_err(|e| { log_trace!(logger, "Failed to retrieve RGS gossip update: {}", e); Error::GossipUpdateFailed })?; From d67a3af47d81502c9c1044673c928788d575c80e Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 18 Jun 2024 12:01:42 +0200 Subject: [PATCH 09/13] Add timeout for broadcasting transactions --- src/config.rs | 3 ++ src/tx_broadcaster.rs | 83 ++++++++++++++++++++++++++++--------------- 2 files changed, 57 insertions(+), 29 deletions(-) diff --git a/src/config.rs b/src/config.rs index 91543ed87..12f3bd5ea 100644 --- a/src/config.rs +++ b/src/config.rs @@ -55,6 +55,9 @@ pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30; // The timeout after which we abort a fee rate cache update operation. pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5; +// The timeout after which we abort a transaction broadcast operation. +pub(crate) const TX_BROADCAST_TIMEOUT_SECS: u64 = 5; + // The timeout after which we abort a RGS sync operation. pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; diff --git a/src/tx_broadcaster.rs b/src/tx_broadcaster.rs index 2019db28c..4492bcfc6 100644 --- a/src/tx_broadcaster.rs +++ b/src/tx_broadcaster.rs @@ -1,3 +1,4 @@ +use crate::config::TX_BROADCAST_TIMEOUT_SECS; use crate::logger::{log_bytes, log_error, log_trace, Logger}; use lightning::chain::chaininterface::BroadcasterInterface; @@ -12,6 +13,7 @@ use tokio::sync::mpsc; use tokio::sync::Mutex; use std::ops::Deref; +use std::time::Duration; const BCAST_PACKAGE_QUEUE_SIZE: usize = 50; @@ -38,45 +40,68 @@ where let mut receiver = self.queue_receiver.lock().await; while let Some(next_package) = receiver.recv().await { for tx in &next_package { - match self.esplora_client.broadcast(tx).await { - Ok(()) => { - log_trace!(self.logger, "Successfully broadcast transaction {}", tx.txid()); - }, - Err(e) => match e { - esplora_client::Error::Reqwest(err) => { - if err.status() == StatusCode::from_u16(400).ok() { - // Ignore 400, as this just means bitcoind already knows the - // 
transaction. - // FIXME: We can further differentiate here based on the error - // message which will be available with rust-esplora-client 0.7 and - // later. - } else { + let timeout_fut = tokio::time::timeout( + Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), + self.esplora_client.broadcast(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(()) => { + log_trace!( + self.logger, + "Successfully broadcast transaction {}", + tx.txid() + ); + }, + Err(e) => match e { + esplora_client::Error::Reqwest(err) => { + if err.status() == StatusCode::from_u16(400).ok() { + // Ignore 400, as this just means bitcoind already knows the + // transaction. + // FIXME: We can further differentiate here based on the error + // message which will be available with rust-esplora-client 0.7 and + // later. + } else { + log_error!( + self.logger, + "Failed to broadcast due to HTTP connection error: {}", + err + ); + } + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + _ => { log_error!( self.logger, - "Failed to broadcast due to HTTP connection error: {}", - err + "Failed to broadcast transaction {}: {}", + tx.txid(), + e ); log_trace!( self.logger, "Failed broadcast transaction bytes: {}", log_bytes!(tx.encode()) ); - } - }, - _ => { - log_error!( - self.logger, - "Failed to broadcast transaction {}: {}", - tx.txid(), - e - ); - log_trace!( - self.logger, - "Failed broadcast transaction bytes: {}", - log_bytes!(tx.encode()) - ); + }, }, }, + Err(e) => { + log_error!( + self.logger, + "Failed to broadcast transaction due to timeout {}: {}", + tx.txid(), + e + ); + log_trace!( + self.logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, } } } From de69c7588951c53ea30384086d1ebcc90bd97342 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 15 May 2024 12:20:02 +0200 Subject: [PATCH 10/13] Log shutdowns of background tasks --- src/lib.rs | 46 
++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 45bb11059..369072d48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -275,6 +275,10 @@ impl Node { loop { tokio::select! { _ = stop_sync.changed() => { + log_trace!( + sync_logger, + "Stopping background syncing on-chain wallet.", + ); return; } _ = onchain_wallet_sync_interval.tick() => { @@ -321,6 +325,10 @@ impl Node { loop { tokio::select! { _ = stop_fee_updates.changed() => { + log_trace!( + fee_update_logger, + "Stopping background updates of fee rate cache.", + ); return; } _ = fee_rate_update_interval.tick() => { @@ -368,6 +376,10 @@ impl Node { loop { tokio::select! { _ = stop_sync.changed() => { + log_trace!( + sync_logger, + "Stopping background syncing Lightning wallet.", + ); return; } _ = wallet_sync_interval.tick() => { @@ -420,6 +432,10 @@ impl Node { loop { tokio::select! { _ = stop_gossip_sync.changed() => { + log_trace!( + gossip_sync_logger, + "Stopping background syncing RGS gossip data.", + ); return; } _ = interval.tick() => { @@ -495,6 +511,10 @@ impl Node { let peer_mgr = Arc::clone(&peer_manager_connection_handler); tokio::select! { _ = stop_listen.changed() => { + log_trace!( + listening_logger, + "Stopping listening to inbound connections.", + ); break; } res = listener.accept() => { @@ -527,6 +547,10 @@ impl Node { loop { tokio::select! { _ = stop_connect.changed() => { + log_trace!( + connect_logger, + "Stopping reconnecting known peers.", + ); return; } _ = interval.tick() => { @@ -572,6 +596,10 @@ impl Node { loop { tokio::select! 
{ _ = stop_bcast.changed() => { + log_trace!( + bcast_logger, + "Stopping broadcasting node announcements.", + ); return; } _ = interval.tick() => { @@ -628,6 +656,7 @@ impl Node { let mut stop_tx_bcast = self.stop_sender.subscribe(); let tx_bcaster = Arc::clone(&self.tx_broadcaster); + let tx_bcast_logger = Arc::clone(&self.logger); runtime.spawn(async move { // Every second we try to clear our broadcasting queue. let mut interval = tokio::time::interval(Duration::from_secs(1)); @@ -635,6 +664,10 @@ impl Node { loop { tokio::select! { _ = stop_tx_bcast.changed() => { + log_trace!( + tx_bcast_logger, + "Stopping broadcasting transactions.", + ); return; } _ = interval.tick() => { @@ -677,11 +710,17 @@ impl Node { let background_error_logger = Arc::clone(&self.logger); let background_scorer = Arc::clone(&self.scorer); let stop_bp = self.stop_sender.subscribe(); + let sleeper_logger = Arc::clone(&self.logger); let sleeper = move |d| { let mut stop = stop_bp.clone(); + let sleeper_logger = Arc::clone(&sleeper_logger); Box::pin(async move { tokio::select! { _ = stop.changed() => { + log_trace!( + sleeper_logger, + "Stopping processing events.", + ); true } _ = tokio::time::sleep(d) => { @@ -691,6 +730,7 @@ impl Node { }) }; + let background_stop_logger = Arc::clone(&self.logger); runtime.spawn(async move { process_events_async( background_persister, @@ -710,15 +750,21 @@ impl Node { log_error!(background_error_logger, "Failed to process events: {}", e); panic!("Failed to process events"); }); + log_trace!(background_stop_logger, "Events processing stopped.",); }); if let Some(liquidity_source) = self.liquidity_source.as_ref() { let mut stop_liquidity_handler = self.stop_sender.subscribe(); let liquidity_handler = Arc::clone(&liquidity_source); + let liquidity_logger = Arc::clone(&self.logger); runtime.spawn(async move { loop { tokio::select! 
{ _ = stop_liquidity_handler.changed() => { + log_trace!( + liquidity_logger, + "Stopping processing liquidity events.", + ); return; } _ = liquidity_handler.handle_next_event() => {} From 0a0ccb10edd780a00e49fff9112af9702213b9b7 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 15 May 2024 13:12:42 +0200 Subject: [PATCH 11/13] Shutdown: Wait for event processing to fully stop .. before initiating the Runtime shutdown. --- src/builder.rs | 2 ++ src/lib.rs | 62 +++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/src/builder.rs b/src/builder.rs index fa6e573b7..6aece0a96 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -959,6 +959,7 @@ fn build_with_store_internal( }; let (stop_sender, _) = tokio::sync::watch::channel(()); + let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); let is_listening = Arc::new(AtomicBool::new(false)); let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None)); @@ -971,6 +972,7 @@ fn build_with_store_internal( Ok(Node { runtime, stop_sender, + event_handling_stopped_sender, config, wallet, tx_sync, diff --git a/src/lib.rs b/src/lib.rs index 369072d48..de2a0badf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -173,6 +173,7 @@ uniffi::include_scaffolding!("ldk_node"); pub struct Node { runtime: Arc>>, stop_sender: tokio::sync::watch::Sender<()>, + event_handling_stopped_sender: tokio::sync::watch::Sender<()>, config: Arc, wallet: Arc, tx_sync: Arc>>, @@ -731,6 +732,7 @@ impl Node { }; let background_stop_logger = Arc::clone(&self.logger); + let event_handling_stopped_sender = self.event_handling_stopped_sender.clone(); runtime.spawn(async move { process_events_async( background_persister, @@ -751,6 +753,18 @@ impl Node { panic!("Failed to process events"); }); log_trace!(background_stop_logger, "Events processing stopped.",); + + match event_handling_stopped_sender.send(()) { + Ok(_) => (), + Err(e) => { + log_error!( + background_stop_logger, + "Failed 
to send 'events handling stopped' signal. This should never happen: {}", + e + ); + debug_assert!(false); + }, + } }); if let Some(liquidity_source) = self.liquidity_source.as_ref() { @@ -800,9 +814,55 @@ impl Node { }, } - // Stop disconnect peers. + // Disconnect all peers. self.peer_manager.disconnect_all_peers(); + // Wait until event handling stopped, at least until a timeout is reached. + let event_handling_stopped_logger = Arc::clone(&self.logger); + let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe(); + + // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow + // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We + // should drop this considerably post upgrading to BDK 1.0. + let timeout_res = runtime.block_on(async { + tokio::time::timeout( + Duration::from_secs(100), + event_handling_stopped_receiver.changed(), + ) + .await + }); + + match timeout_res { + Ok(stop_res) => match stop_res { + Ok(()) => {}, + Err(e) => { + log_error!( + event_handling_stopped_logger, + "Stopping event handling failed. This should never happen: {}", + e + ); + panic!("Stopping event handling failed. This should never happen."); + }, + }, + Err(e) => { + log_error!( + event_handling_stopped_logger, + "Stopping event handling timed out: {}", + e + ); + }, + } + + #[cfg(tokio_unstable)] + { + log_trace!( + self.logger, + "Active runtime tasks left prior to shutdown: {}", + runtime.metrics().active_tasks_count() + ); + } + + // Shutdown our runtime. By now ~no or only very few tasks should be left. runtime.shutdown_timeout(Duration::from_secs(10)); log_info!(self.logger, "Shutdown complete."); From 5095d4237c3da7258f8237ed61c2d56855a38f88 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 15 May 2024 15:15:59 +0200 Subject: [PATCH 12/13] Bump `tokio` version to 1.37 .. as we use `Clone` for `tokio::sync::watch::Sender`, which was only introduced with 1.37. 
--- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4c4422461..6661497c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ bip39 = "2.0.0" rand = "0.8.5" chrono = { version = "0.4", default-features = false, features = ["clock"] } -tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] } +tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync" ] } esplora-client = { version = "0.6", default-features = false } libc = "0.2" uniffi = { version = "0.26.0", features = ["build"], optional = true } From f839015caa41d6d0331725d1671712a7bddd1b4f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 18 Jun 2024 12:26:25 +0200 Subject: [PATCH 13/13] Also apply a general 10 second socket timeout for the Esplora client --- src/builder.rs | 15 ++++++++++----- src/config.rs | 3 +++ 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 6aece0a96..a2a93aa79 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,6 +1,6 @@ use crate::config::{ default_user_config, Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, - DEFAULT_ESPLORA_SERVER_URL, WALLET_KEYS_SEED_LEN, + DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL, WALLET_KEYS_SEED_LEN, }; use crate::connection::ConnectionManager; use crate::event::EventQueue; @@ -558,10 +558,15 @@ fn build_with_store_internal( let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora(server_url)) => { - let tx_sync = Arc::new(EsploraSyncClient::new(server_url.clone(), Arc::clone(&logger))); - let blockchain = - EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) - .with_concurrency(BDK_CLIENT_CONCURRENCY); + let mut client_builder = esplora_client::Builder::new(&server_url.clone()); + client_builder = 
client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); + let esplora_client = client_builder.build_async().unwrap(); + let tx_sync = Arc::new(EsploraSyncClient::from_client( + esplora_client.clone(), + Arc::clone(&logger), + )); + let blockchain = EsploraBlockchain::from_client(esplora_client, BDK_CLIENT_STOP_GAP) + .with_concurrency(BDK_CLIENT_CONCURRENCY); let tx_broadcaster = Arc::new(TransactionBroadcaster::new( tx_sync.client().clone(), Arc::clone(&logger), diff --git a/src/config.rs b/src/config.rs index 12f3bd5ea..d0e72080f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,6 +28,9 @@ pub(crate) const BDK_CLIENT_CONCURRENCY: u8 = 4; // The default Esplora server we're using. pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; +// The default Esplora client timeout we're using. +pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; + // The timeout after which we abandon retrying failed payments. pub(crate) const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);