diff --git a/src/io/mod.rs b/src/io/mod.rs index 87a8544a2..9a9067e6c 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -27,8 +27,8 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments"; /// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key. -pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE: &str = ""; -pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_KEY: &str = "rgs_latest_sync_timestamp"; +pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = ""; +pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp"; /// Provides an interface that allows to store and retrieve persisted values that are associated /// with given keys. diff --git a/src/io/utils.rs b/src/io/utils.rs index 95ad56b06..cd058d475 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -174,12 +174,12 @@ where Ok(res) } -pub(crate) fn read_rgs_latest_sync_timestamp(kv_store: K) -> Result +pub(crate) fn read_latest_rgs_sync_timestamp(kv_store: K) -> Result where K::Target: KVStore, { let mut reader = - kv_store.read(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)?; + kv_store.read(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)?; u32::read(&mut reader).map_err(|_| { std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -188,7 +188,7 @@ where }) } -pub(crate) fn write_rgs_latest_sync_timestamp( +pub(crate) fn write_latest_rgs_sync_timestamp( updated_timestamp: u32, kv_store: K, logger: L, ) -> Result<(), Error> where @@ -196,13 +196,13 @@ where L::Target: Logger, { let mut writer = kv_store - .write(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY) + .write(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY) .map_err(|e| { log_error!( logger, "Getting writer for key {}/{} failed due to: {}", - RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, - RGS_LATEST_SYNC_TIMESTAMP_KEY, + LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, + LATEST_RGS_SYNC_TIMESTAMP_KEY, e ); Error::PersistenceFailed @@ -211,8 +211,8 @@ where log_error!( logger, "Writing data to key {}/{} failed due to: {}", - RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, - RGS_LATEST_SYNC_TIMESTAMP_KEY, + LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, + LATEST_RGS_SYNC_TIMESTAMP_KEY, e ); Error::PersistenceFailed @@ -221,8 +221,8 @@ where log_error!( logger, "Committing data to key {}/{} failed due to: {}", - RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, - RGS_LATEST_SYNC_TIMESTAMP_KEY, + LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, + LATEST_RGS_SYNC_TIMESTAMP_KEY, e ); Error::PersistenceFailed diff --git a/src/lib.rs b/src/lib.rs index b8b13ab1b..7b3f96d66 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -172,6 +172,9 @@ const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); // The time in between peer reconnection attempts. const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10); +// The time in-between RGS sync attempts. +const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60); + // The length in bytes of our wallets' keys seed. const WALLET_KEYS_SEED_LEN: usize = 64; @@ -547,7 +550,7 @@ impl Builder { )); // Reset the RGS sync timestamp in case we somehow switch gossip sources - io::utils::write_rgs_latest_sync_timestamp( + io::utils::write_latest_rgs_sync_timestamp( 0, Arc::clone(&kv_store), Arc::clone(&logger), @@ -557,7 +560,7 @@ impl Builder { } GossipSourceConfig::RapidGossipSync(rgs_server) => { let latest_sync_timestamp = - io::utils::read_rgs_latest_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0); + io::utils::read_latest_rgs_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0); Arc::new(GossipSource::new_rgs( rgs_server.clone(), latest_sync_timestamp, @@ -631,11 +634,12 @@ impl Builder { } }; - let stop_running = Arc::new(AtomicBool::new(false)); + let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); Arc::new(Node { runtime, - stop_running, + stop_sender, + stop_receiver, config, wallet, tx_sync, @@ -660,7 +664,8 @@ impl Builder { /// Needs to be initialized and instantiated through [`Builder::build`]. pub struct Node { runtime: Arc>>, - stop_running: Arc, + stop_sender: tokio::sync::watch::Sender<()>, + stop_receiver: tokio::sync::watch::Receiver<()>, config: Arc, wallet: Arc>, tx_sync: Arc>>, @@ -694,8 +699,6 @@ impl Node { let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); - let stop_running = Arc::new(AtomicBool::new(false)); - let event_handler = Arc::new(EventHandler::new( Arc::clone(&self.wallet), Arc::clone(&self.event_queue), @@ -714,31 +717,36 @@ impl Node { let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_logger = Arc::clone(&self.logger); - let stop_sync = Arc::clone(&stop_running); + let mut stop_sync = self.stop_receiver.clone(); std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - if stop_sync.load(Ordering::Acquire) { - return; - } let now = Instant::now(); - match wallet.sync().await { - Ok(()) => log_info!( - sync_logger, - "Background sync of on-chain wallet finished in {}ms.", - now.elapsed().as_millis() - ), - Err(err) => { - log_error!( - sync_logger, - "Background sync of on-chain wallet failed: {}", - err - ) + tokio::select! { + _ = stop_sync.changed() => { + return; + } + _ = interval.tick() => { + match wallet.sync().await { + Ok(()) => log_info!( + sync_logger, + "Background sync of on-chain wallet finished in {}ms.", + now.elapsed().as_millis() + ), + Err(err) => { + log_error!( + sync_logger, + "Background sync of on-chain wallet failed: {}", + err + ) + } + } } } - tokio::time::sleep(Duration::from_secs(20)).await; } }, ); @@ -748,72 +756,78 @@ impl Node { let gossip_source = Arc::clone(&self.gossip_source); let gossip_sync_store = Arc::clone(&self.kv_store); let gossip_sync_logger = Arc::clone(&self.logger); - let stop_gossip_sync = Arc::clone(&stop_running); + let mut stop_gossip_sync = self.stop_receiver.clone(); runtime.spawn(async move { + let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); loop { - let gossip_sync_logger = Arc::clone(&gossip_sync_logger); - let stop_gossip_sync = Arc::clone(&stop_gossip_sync); - if stop_gossip_sync.load(Ordering::Acquire) { - return; - } - - let now = Instant::now(); - match gossip_source.update_rgs_snapshot().await { - Ok(updated_timestamp) => { - log_info!( - gossip_sync_logger, - "Background sync of RGS gossip data finished in {}ms.", - now.elapsed().as_millis() - ); - io::utils::write_rgs_latest_sync_timestamp( - updated_timestamp, - Arc::clone(&gossip_sync_store), - Arc::clone(&gossip_sync_logger), - ) - .expect("Persistence failed"); + tokio::select! { + _ = stop_gossip_sync.changed() => { + return; + } + _ = interval.tick() => { + let gossip_sync_logger = Arc::clone(&gossip_sync_logger); + let now = Instant::now(); + match gossip_source.update_rgs_snapshot().await { + Ok(updated_timestamp) => { + log_info!( + gossip_sync_logger, + "Background sync of RGS gossip data finished in {}ms.", + now.elapsed().as_millis() + ); + io::utils::write_latest_rgs_sync_timestamp( + updated_timestamp, + Arc::clone(&gossip_sync_store), + Arc::clone(&gossip_sync_logger), + ) + .expect("Persistence failed"); + } + Err(e) => log_error!( + gossip_sync_logger, + "Background sync of RGS gossip data failed: {}", + e + ), + } } - Err(e) => log_error!( - gossip_sync_logger, - "Background sync of RGS gossip data failed: {}", - e - ), } - - tokio::time::sleep(Duration::from_secs(60 * 60)).await; } }); } let sync_logger = Arc::clone(&self.logger); - let stop_sync = Arc::clone(&stop_running); + let mut stop_sync = self.stop_receiver.clone(); runtime.spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(10)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { - if stop_sync.load(Ordering::Acquire) { - return; - } let now = Instant::now(); - let confirmables = vec![ - &*sync_cman as &(dyn Confirm + Sync + Send), - &*sync_cmon as &(dyn Confirm + Sync + Send), - ]; - match tx_sync.sync(confirmables).await { - Ok(()) => log_info!( - sync_logger, - "Background sync of Lightning wallet finished in {}ms.", - now.elapsed().as_millis() - ), - Err(e) => { - log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + tokio::select! { + _ = stop_sync.changed() => { + return; + } + _ = interval.tick() => { + let confirmables = vec![ + &*sync_cman as &(dyn Confirm + Sync + Send), + &*sync_cmon as &(dyn Confirm + Sync + Send), + ]; + match tx_sync.sync(confirmables).await { + Ok(()) => log_info!( + sync_logger, + "Background sync of Lightning wallet finished in {}ms.", + now.elapsed().as_millis() + ), + Err(e) => { + log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e) + } + } } } - tokio::time::sleep(Duration::from_secs(5)).await; } }); if let Some(listening_address) = &self.config.listening_address { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); - let stop_listen = Arc::clone(&stop_running); + let mut stop_listen = self.stop_receiver.clone(); let listening_address = listening_address.clone(); let bind_addr = listening_address @@ -828,18 +842,22 @@ impl Node { "Failed to bind to listen address/port - is something else already listening on it?", ); loop { - if stop_listen.load(Ordering::Acquire) { - return; - } let peer_mgr = Arc::clone(&peer_manager_connection_handler); - let tcp_stream = listener.accept().await.unwrap().0; - tokio::spawn(async move { - lightning_net_tokio::setup_inbound( - Arc::clone(&peer_mgr), - tcp_stream.into_std().unwrap(), - ) - .await; - }); + tokio::select! { + _ = stop_listen.changed() => { + return; + } + res = listener.accept() => { + let tcp_stream = res.unwrap().0; + tokio::spawn(async move { + lightning_net_tokio::setup_inbound( + Arc::clone(&peer_mgr), + tcp_stream.into_std().unwrap(), + ) + .await; + }); + } + } } }); } @@ -849,35 +867,38 @@ impl Node { let connect_pm = Arc::clone(&self.peer_manager); let connect_logger = Arc::clone(&self.logger); let connect_peer_store = Arc::clone(&self.peer_store); - let stop_connect = Arc::clone(&stop_running); + let mut stop_connect = self.stop_receiver.clone(); runtime.spawn(async move { let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL); loop { - if stop_connect.load(Ordering::Acquire) { - return; - } - let pm_peers = connect_pm - .get_peer_node_ids() - .iter() - .map(|(peer, _addr)| *peer) - .collect::>(); - for node_id in connect_cm - .list_channels() - .iter() - .map(|chan| chan.counterparty.node_id) - .filter(|id| !pm_peers.contains(id)) - { - if let Some(peer_info) = connect_peer_store.get_peer(&node_id) { - let _ = do_connect_peer( - peer_info.node_id, - peer_info.address, - Arc::clone(&connect_pm), - Arc::clone(&connect_logger), - ) - .await; - } + tokio::select! { + _ = stop_connect.changed() => { + return; + } + _ = interval.tick() => { + let pm_peers = connect_pm + .get_peer_node_ids() + .iter() + .map(|(peer, _addr)| *peer) + .collect::>(); + for node_id in connect_cm + .list_channels() + .iter() + .map(|chan| chan.counterparty.node_id) + .filter(|id| !pm_peers.contains(id)) + { + if let Some(peer_info) = connect_peer_store.get_peer(&node_id) { + let _ = do_connect_peer( + peer_info.node_id, + peer_info.address, + Arc::clone(&connect_pm), + Arc::clone(&connect_logger), + ) + .await; + } + } + } } - interval.tick().await; } }); @@ -890,15 +911,17 @@ impl Node { let background_peer_man = Arc::clone(&self.peer_manager); let background_logger = Arc::clone(&self.logger); let background_scorer = Arc::clone(&self.scorer); - let stop_background_processing = Arc::clone(&stop_running); + let stop_bp = self.stop_receiver.clone(); let sleeper = move |d| { - let stop = Arc::clone(&stop_background_processing); + let mut stop = stop_bp.clone(); Box::pin(async move { - if stop.load(Ordering::Acquire) { - true - } else { - tokio::time::sleep(d).await; - false + tokio::select! { + _ = stop.changed() => { + true + } + _ = tokio::time::sleep(d) => { + false + } } }) }; @@ -930,7 +953,17 @@ impl Node { pub fn stop(&self) -> Result<(), Error> { let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?; // Stop the runtime. - self.stop_running.store(true, Ordering::Release); + match self.stop_sender.send(()) { + Ok(_) => (), + Err(e) => { + log_error!( + self.logger, + "Failed to send shutdown signal. This should never happen: {}", + e + ); + debug_assert!(false); + } + } // Stop disconnect peers. self.peer_manager.disconnect_all_peers();