From 6ab3092c99b1f6aec2a0f4b6ddc96808e5fbef61 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 25 Apr 2023 14:37:19 +0200 Subject: [PATCH 1/2] Implement `GossipSource` to allow RGS gossip updates Here we implement a `GossipSource` object that allows us to configure P2P or RGS gossip sources without the need to propagate and leak LDK type parameters upwards. To this end, `GossipSource` wraps the corresponding variants and implements a `RoutingMessageHandler` that is delegating or ignoring the incoming messages. --- Cargo.toml | 2 +- bindings/ldk_node.udl | 1 + src/error.rs | 3 + src/gossip.rs | 89 +++++++++++++++++++++++++ src/lib.rs | 146 ++++++++++++++++++++++++++++++++---------- src/types.rs | 26 ++++++-- 6 files changed, 226 insertions(+), 41 deletions(-) create mode 100644 src/gossip.rs diff --git a/Cargo.toml b/Cargo.toml index a0c9d81c4..d1116d28b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ categories = ["cryptography::cryptocurrencies"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] -crate-type = ["staticlib", "cdylib"] +crate-type = ["lib", "staticlib", "cdylib"] name = "ldk_node" [[bin]] diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 0da572529..e214c3412 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -77,6 +77,7 @@ enum NodeError { "WalletOperationFailed", "WalletSigningFailed", "TxSyncFailed", + "GossipUpdateFailed", "InvalidAddress", "InvalidPublicKey", "InvalidPaymentHash", diff --git a/src/error.rs b/src/error.rs index 9a9a9c512..d4cfda407 100644 --- a/src/error.rs +++ b/src/error.rs @@ -29,6 +29,8 @@ pub enum Error { WalletSigningFailed, /// A transaction sync operation failed. TxSyncFailed, + /// A gossip updating operation failed. + GossipUpdateFailed, /// The given address is invalid. InvalidAddress, /// The given public key is invalid. @@ -71,6 +73,7 @@ impl fmt::Display for Error { Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."), Self::WalletSigningFailed => write!(f, "Failed to sign given transaction."), Self::TxSyncFailed => write!(f, "Failed to sync transactions."), + Self::GossipUpdateFailed => write!(f, "Failed to update gossip data."), Self::InvalidAddress => write!(f, "The given address is invalid."), Self::InvalidPublicKey => write!(f, "The given public key is invalid."), Self::InvalidPaymentHash => write!(f, "The given payment hash is invalid."), diff --git a/src/gossip.rs b/src/gossip.rs new file mode 100644 index 000000000..d5bdffaeb --- /dev/null +++ b/src/gossip.rs @@ -0,0 +1,89 @@ +use crate::logger::{log_trace, FilesystemLogger, Logger}; +use crate::types::{GossipSync, NetworkGraph, P2PGossipSync, RapidGossipSync}; +use crate::Error; + +use lightning::routing::utxo::UtxoLookup; + +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +pub(crate) enum GossipSource { + P2PNetwork { + gossip_sync: Arc, + }, + RapidGossipSync { + gossip_sync: Arc, + server_url: String, + latest_sync_timestamp: AtomicU32, + logger: Arc, + }, +} + +impl GossipSource { + pub fn new_p2p(network_graph: Arc, logger: Arc) -> Self { + let gossip_sync = Arc::new(P2PGossipSync::new( + network_graph, + None::>, + logger, + )); + Self::P2PNetwork { gossip_sync } + } + + pub fn new_rgs( + server_url: String, network_graph: Arc, logger: Arc, + ) -> Self { + let gossip_sync = Arc::new(RapidGossipSync::new(network_graph, Arc::clone(&logger))); + let latest_sync_timestamp = AtomicU32::new(0); + Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } + } + + pub fn is_rgs(&self) -> bool { + if let Self::RapidGossipSync { .. } = self { + true + } else { + false + } + } + + pub fn as_gossip_sync(&self) -> GossipSync { + match self { + Self::RapidGossipSync { gossip_sync, .. } => { + GossipSync::Rapid(Arc::clone(&gossip_sync)) + } + Self::P2PNetwork { gossip_sync, .. } => GossipSync::P2P(Arc::clone(&gossip_sync)), + } + } + + pub async fn update_rgs_snapshot(&self) -> Result<(), Error> { + match self { + Self::P2PNetwork { gossip_sync: _ } => Ok(()), + Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => { + let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire); + let query_url = format!("{}/{}", server_url, query_timestamp); + let response = reqwest::get(query_url).await.map_err(|e| { + log_trace!(logger, "Failed to retrieve RGS gossip update: {}", e); + Error::GossipUpdateFailed + })?; + + match response.error_for_status() { + Ok(res) => { + let update_data = res.bytes().await.map_err(|e| { + log_trace!(logger, "Failed to retrieve RGS gossip update: {}", e); + Error::GossipUpdateFailed + })?; + + let new_latest_sync_timestamp = gossip_sync + .update_network_graph(&update_data) + .map_err(|_| Error::GossipUpdateFailed)?; + latest_sync_timestamp.store(new_latest_sync_timestamp, Ordering::Release); + Ok(()) + } + Err(e) => { + log_trace!(logger, "Failed to retrieve RGS gossip update: {}", e); + Err(Error::GossipUpdateFailed) + } + } + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 63d23f69e..493137c5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,6 +76,7 @@ mod error; mod event; +mod gossip; mod hex_utils; mod io; mod logger; @@ -89,6 +90,7 @@ mod wallet; pub use bip39; pub use bitcoin; pub use lightning; +use lightning::ln::msgs::RoutingMessageHandler; pub use lightning_invoice; pub use error::Error as NodeError; @@ -96,6 +98,7 @@ use error::Error; pub use event::Event; use event::{EventHandler, EventQueue}; +use gossip::GossipSource; use io::fs_store::FilesystemStore; use io::{KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE}; use payment_store::PaymentStore; @@ -117,15 +120,12 @@ use lightning::ln::channelmanager::{ }; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; -use lightning::routing::gossip::P2PGossipSync; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}; -use lightning::routing::utxo::UtxoLookup; use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig}; use lightning::util::ser::ReadableArgs; use lightning_background_processor::process_events_async; -use lightning_background_processor::GossipSync as BPGossipSync; use lightning_transaction_sync::EsploraSyncClient; @@ -201,32 +201,41 @@ impl Default for Config { } #[derive(Debug, Clone)] -enum WalletEntropySource { +enum EntropySourceConfig { SeedFile(String), SeedBytes([u8; WALLET_KEYS_SEED_LEN]), Bip39Mnemonic { mnemonic: bip39::Mnemonic, passphrase: Option }, } +#[derive(Debug, Clone)] +enum GossipSourceConfig { + P2PNetwork, + RapidGossipSync(String), +} + /// A builder for an [`Node`] instance, allowing to set some configuration and module choices from /// the getgo. #[derive(Debug, Clone)] pub struct Builder { config: Config, - entropy_source: Option, + entropy_source_config: Option, + gossip_source_config: Option, } impl Builder { /// Creates a new builder instance with the default configuration. pub fn new() -> Self { let config = Config::default(); - let entropy_source = None; - Self { config, entropy_source } + let entropy_source_config = None; + let gossip_source_config = None; + Self { config, entropy_source_config, gossip_source_config } } /// Creates a new builder instance from an [`Config`]. pub fn from_config(config: Config) -> Self { - let entropy_source = None; - Self { config, entropy_source } + let entropy_source_config = None; + let gossip_source_config = None; + Self { config, entropy_source_config, gossip_source_config } } /// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk. @@ -234,13 +243,27 @@ impl Builder { /// If the given file does not exist a new random seed file will be generated and /// stored at the given location. pub fn set_entropy_seed_path(&mut self, seed_path: String) -> &mut Self { - self.entropy_source = Some(WalletEntropySource::SeedFile(seed_path)); + self.entropy_source_config = Some(EntropySourceConfig::SeedFile(seed_path)); self } /// Configures the [`Node`] instance to source its wallet entropy from the given seed bytes. pub fn set_entropy_seed_bytes(&mut self, seed_bytes: [u8; WALLET_KEYS_SEED_LEN]) -> &mut Self { - self.entropy_source = Some(WalletEntropySource::SeedBytes(seed_bytes)); + self.entropy_source_config = Some(EntropySourceConfig::SeedBytes(seed_bytes)); + self + } + + /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer + /// network. + pub fn set_gossip_source_p2p(&mut self) -> &mut Self { + self.gossip_source_config = Some(GossipSourceConfig::P2PNetwork); + self + } + + /// Configures the [`Node`] instance to source its gossip data from the given RapidGossipSync + /// server. + pub fn set_gossip_source_rgs(&mut self, rgs_server_url: String) -> &mut Self { + self.gossip_source_config = Some(GossipSourceConfig::RapidGossipSync(rgs_server_url)); self } @@ -250,7 +273,8 @@ impl Builder { pub fn set_entropy_bip39_mnemonic( &mut self, mnemonic: bip39::Mnemonic, passphrase: Option, ) -> &mut Self { - self.entropy_source = Some(WalletEntropySource::Bip39Mnemonic { mnemonic, passphrase }); + self.entropy_source_config = + Some(EntropySourceConfig::Bip39Mnemonic { mnemonic, passphrase }); self } @@ -303,14 +327,14 @@ impl Builder { let logger = Arc::new(FilesystemLogger::new(log_file_path)); // Initialize the on-chain wallet and chain access - let seed_bytes = if let Some(entropy_source) = &self.entropy_source { + let seed_bytes = if let Some(entropy_source_config) = &self.entropy_source_config { // Use the configured entropy source, if the user set one. - match entropy_source { - WalletEntropySource::SeedBytes(bytes) => bytes.clone(), - WalletEntropySource::SeedFile(seed_path) => { + match entropy_source_config { + EntropySourceConfig::SeedBytes(bytes) => bytes.clone(), + EntropySourceConfig::SeedFile(seed_path) => { io::utils::read_or_generate_seed_file(seed_path) } - WalletEntropySource::Bip39Mnemonic { mnemonic, passphrase } => match passphrase { + EntropySourceConfig::Bip39Mnemonic { mnemonic, passphrase } => match passphrase { Some(passphrase) => mnemonic.to_seed(passphrase), None => mnemonic.to_seed(""), }, @@ -495,13 +519,6 @@ impl Builder { chain_monitor.watch_channel(funding_outpoint, channel_monitor); } - // Initialize the P2PGossipSync - let gossip_sync = Arc::new(P2PGossipSync::new( - Arc::clone(&network_graph), - None::>, - Arc::clone(&logger), - )); - // Initialize the PeerManager let onion_messenger: Arc = Arc::new(OnionMessenger::new( Arc::clone(&keys_manager), @@ -510,17 +527,47 @@ impl Builder { IgnoringMessageHandler {}, )); let ephemeral_bytes: [u8; 32] = keys_manager.get_secure_random_bytes(); - let lightning_msg_handler = MessageHandler { - chan_handler: Arc::clone(&channel_manager), - route_handler: Arc::clone(&gossip_sync), - onion_message_handler: onion_messenger, - }; let cur_time = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .expect("System time error: Clock may have gone backwards"); - let peer_manager: Arc = Arc::new(PeerManager::new( - lightning_msg_handler, + + // Initialize the GossipSource + // Use the configured gossip source, if the user set one, otherwise default to P2PNetwork. + let gossip_source_config = + self.gossip_source_config.as_ref().unwrap_or(&GossipSourceConfig::P2PNetwork); + + let gossip_source = match gossip_source_config { + GossipSourceConfig::P2PNetwork => { + Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger))) + } + GossipSourceConfig::RapidGossipSync(rgs_server) => Arc::new(GossipSource::new_rgs( + rgs_server.clone(), + Arc::clone(&network_graph), + Arc::clone(&logger), + )), + }; + + let msg_handler = match gossip_source.as_gossip_sync() { + GossipSync::P2P(p2p_gossip_sync) => MessageHandler { + chan_handler: Arc::clone(&channel_manager), + route_handler: Arc::clone(&p2p_gossip_sync) + as Arc, + onion_message_handler: onion_messenger, + }, + GossipSync::Rapid(_) => MessageHandler { + chan_handler: Arc::clone(&channel_manager), + route_handler: Arc::new(IgnoringMessageHandler {}) + as Arc, + onion_message_handler: onion_messenger, + }, + GossipSync::None => { + unreachable!("We must always have a gossip sync!"); + } + }; + + let peer_manager = Arc::new(PeerManager::new( + msg_handler, cur_time.as_secs().try_into().expect("System time error"), &ephemeral_bytes, Arc::clone(&logger), @@ -579,7 +626,7 @@ impl Builder { peer_manager, keys_manager, network_graph, - gossip_sync, + gossip_source, kv_store, logger, scorer, @@ -604,7 +651,7 @@ pub struct Node { peer_manager: Arc, keys_manager: Arc, network_graph: Arc, - gossip_sync: Arc, + gossip_source: Arc, kv_store: Arc, logger: Arc, scorer: Arc>, @@ -678,6 +725,37 @@ impl Node { ); }); + if self.gossip_source.is_rgs() { + let gossip_source = Arc::clone(&self.gossip_source); + let gossip_sync_logger = Arc::clone(&self.logger); + let stop_gossip_sync = Arc::clone(&stop_running); + runtime.spawn(async move { + loop { + let gossip_sync_logger = Arc::clone(&gossip_sync_logger); + let stop_gossip_sync = Arc::clone(&stop_gossip_sync); + if stop_gossip_sync.load(Ordering::Acquire) { + return; + } + + let now = Instant::now(); + match gossip_source.update_rgs_snapshot().await { + Ok(()) => log_info!( + gossip_sync_logger, + "Background sync of RGS gossip data finished in {}ms.", + now.elapsed().as_millis() + ), + Err(e) => log_error!( + gossip_sync_logger, + "Background sync of RGS gossip data failed: {}", + e + ), + } + + tokio::time::sleep(Duration::from_secs(60 * 60)).await; + } + }); + } + let sync_logger = Arc::clone(&self.logger); let stop_sync = Arc::clone(&stop_running); runtime.spawn(async move { @@ -774,7 +852,7 @@ impl Node { let background_event_handler = Arc::clone(&event_handler); let background_chain_mon = Arc::clone(&self.chain_monitor); let background_chan_man = Arc::clone(&self.channel_manager); - let background_gossip_sync = BPGossipSync::p2p(Arc::clone(&self.gossip_sync)); + let background_gossip_sync = self.gossip_source.as_gossip_sync(); let background_peer_man = Arc::clone(&self.peer_manager); let background_logger = Arc::clone(&self.logger); let background_scorer = Arc::clone(&self.scorer); diff --git a/src/types.rs b/src/types.rs index 05107f2b3..cb4ecb11b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -8,13 +8,12 @@ use crate::UniffiCustomTypeConverter; use lightning::chain::chainmonitor; use lightning::chain::keysinterface::InMemorySigner; use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails; +use lightning::ln::msgs::RoutingMessageHandler; use lightning::ln::peer_handler::IgnoringMessageHandler; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::routing::gossip; -use lightning::routing::gossip::P2PGossipSync; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::ProbabilisticScorer; -use lightning::routing::utxo::UtxoLookup; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning_invoice::{Invoice, SignedRawInvoice}; use lightning_net_tokio::SocketDescriptor; @@ -42,7 +41,7 @@ pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< SocketDescriptor, Arc, - Arc, + Arc, Arc, Arc, IgnoringMessageHandler, @@ -66,11 +65,26 @@ pub(crate) type Router = DefaultRouter, Arc, Arc>>; pub(crate) type Scorer = ProbabilisticScorer, Arc>; -pub(crate) type GossipSync = - P2PGossipSync, Arc, Arc>; - pub(crate) type NetworkGraph = gossip::NetworkGraph>; +pub(crate) type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; + +pub(crate) type P2PGossipSync = lightning::routing::gossip::P2PGossipSync< + Arc, + Arc, + Arc, +>; +pub(crate) type RapidGossipSync = + lightning_rapid_gossip_sync::RapidGossipSync, Arc>; + +pub(crate) type GossipSync = lightning_background_processor::GossipSync< + Arc, + Arc, + Arc, + Arc, + Arc, +>; + pub(crate) type OnionMessenger = lightning::onion_message::OnionMessenger< Arc>, Arc>, From f3aeeee2ad9b7dfe3b2e626dfeb6ba1010ceb7bf Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Sat, 13 May 2023 09:28:15 +0200 Subject: [PATCH 2/2] Persist RGS `last_sync_timestamp` across restarts --- src/gossip.rs | 11 ++++----- src/io/mod.rs | 4 ++++ src/io/utils.rs | 60 +++++++++++++++++++++++++++++++++++++++++++++++-- src/lib.rs | 48 ++++++++++++++++++++++++++++++--------- 4 files changed, 105 insertions(+), 18 deletions(-) diff --git a/src/gossip.rs b/src/gossip.rs index d5bdffaeb..a3c535310 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -30,10 +30,11 @@ impl GossipSource { } pub fn new_rgs( - server_url: String, network_graph: Arc, logger: Arc, + server_url: String, latest_sync_timestamp: u32, network_graph: Arc, + logger: Arc, ) -> Self { let gossip_sync = Arc::new(RapidGossipSync::new(network_graph, Arc::clone(&logger))); - let latest_sync_timestamp = AtomicU32::new(0); + let latest_sync_timestamp = AtomicU32::new(latest_sync_timestamp); Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } } @@ -54,9 +55,9 @@ impl GossipSource { } } - pub async fn update_rgs_snapshot(&self) -> Result<(), Error> { + pub async fn update_rgs_snapshot(&self) -> Result { match self { - Self::P2PNetwork { gossip_sync: _ } => Ok(()), + Self::P2PNetwork { gossip_sync: _ } => Ok(0), Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => { let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire); let query_url = format!("{}/{}", server_url, query_timestamp); @@ -76,7 +77,7 @@ impl GossipSource { .update_network_graph(&update_data) .map_err(|_| Error::GossipUpdateFailed)?; latest_sync_timestamp.store(new_latest_sync_timestamp, Ordering::Release); - Ok(()) + Ok(new_latest_sync_timestamp) } Err(e) => { log_trace!(logger, "Failed to retrieve RGS gossip update: {}", e); diff --git a/src/io/mod.rs b/src/io/mod.rs index 1c3dd3c69..87a8544a2 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -26,6 +26,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; /// The payment information will be persisted under this prefix. pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments"; +/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key. +pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE: &str = ""; +pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_KEY: &str = "rgs_latest_sync_timestamp"; + /// Provides an interface that allows to store and retrieve persisted values that are associated /// with given keys. /// diff --git a/src/io/utils.rs b/src/io/utils.rs index 00baf77eb..95ad56b06 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -1,15 +1,16 @@ use super::*; use crate::WALLET_KEYS_SEED_LEN; +use crate::logger::log_error; use crate::peer_store::PeerStore; -use crate::{EventQueue, PaymentDetails}; +use crate::{Error, EventQueue, PaymentDetails}; use lightning::chain::channelmonitor::ChannelMonitor; use lightning::chain::keysinterface::{EntropySource, SignerProvider}; use lightning::routing::gossip::NetworkGraph; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}; use lightning::util::logger::Logger; -use lightning::util::ser::{Readable, ReadableArgs}; +use lightning::util::ser::{Readable, ReadableArgs, Writeable}; use bitcoin::hash_types::{BlockHash, Txid}; use bitcoin::hashes::hex::FromHex; @@ -172,3 +173,58 @@ where } Ok(res) } + +pub(crate) fn read_rgs_latest_sync_timestamp(kv_store: K) -> Result +where + K::Target: KVStore, +{ + let mut reader = + kv_store.read(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)?; + u32::read(&mut reader).map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Failed to deserialize latest RGS sync timestamp", + ) + }) +} + +pub(crate) fn write_rgs_latest_sync_timestamp( + updated_timestamp: u32, kv_store: K, logger: L, +) -> Result<(), Error> +where + K::Target: KVStore, + L::Target: Logger, +{ + let mut writer = kv_store + .write(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY) + .map_err(|e| { + log_error!( + logger, + "Getting writer for key {}/{} failed due to: {}", + RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, + RGS_LATEST_SYNC_TIMESTAMP_KEY, + e + ); + Error::PersistenceFailed + })?; + updated_timestamp.write(&mut writer).map_err(|e| { + log_error!( + logger, + "Writing data to key {}/{} failed due to: {}", + RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, + RGS_LATEST_SYNC_TIMESTAMP_KEY, + e + ); + Error::PersistenceFailed + })?; + writer.commit().map_err(|e| { + log_error!( + logger, + "Committing data to key {}/{} failed due to: {}", + RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, + RGS_LATEST_SYNC_TIMESTAMP_KEY, + e + ); + Error::PersistenceFailed + }) +} diff --git a/src/lib.rs b/src/lib.rs index 493137c5c..06a2f4463 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -539,13 +539,30 @@ impl Builder { let gossip_source = match gossip_source_config { GossipSourceConfig::P2PNetwork => { - Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger))) + let p2p_source = Arc::new(GossipSource::new_p2p( + Arc::clone(&network_graph), + Arc::clone(&logger), + )); + + // Reset the RGS sync timestamp in case we somehow switch gossip sources + io::utils::write_rgs_latest_sync_timestamp( + 0, + Arc::clone(&kv_store), + Arc::clone(&logger), + ) + .expect("Persistence failed"); + p2p_source + } + GossipSourceConfig::RapidGossipSync(rgs_server) => { + let latest_sync_timestamp = + io::utils::read_rgs_latest_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0); + Arc::new(GossipSource::new_rgs( + rgs_server.clone(), + latest_sync_timestamp, + Arc::clone(&network_graph), + Arc::clone(&logger), + )) } - GossipSourceConfig::RapidGossipSync(rgs_server) => Arc::new(GossipSource::new_rgs( - rgs_server.clone(), - Arc::clone(&network_graph), - Arc::clone(&logger), - )), }; let msg_handler = match gossip_source.as_gossip_sync() { @@ -727,6 +744,7 @@ impl Node { if self.gossip_source.is_rgs() { let gossip_source = Arc::clone(&self.gossip_source); + let gossip_sync_store = Arc::clone(&self.kv_store); let gossip_sync_logger = Arc::clone(&self.logger); let stop_gossip_sync = Arc::clone(&stop_running); runtime.spawn(async move { @@ -739,11 +757,19 @@ impl Node { let now = Instant::now(); match gossip_source.update_rgs_snapshot().await { - Ok(()) => log_info!( - gossip_sync_logger, - "Background sync of RGS gossip data finished in {}ms.", - now.elapsed().as_millis() - ), + Ok(updated_timestamp) => { + log_info!( + gossip_sync_logger, + "Background sync of RGS gossip data finished in {}ms.", + now.elapsed().as_millis() + ); + io::utils::write_rgs_latest_sync_timestamp( + updated_timestamp, + Arc::clone(&gossip_sync_store), + Arc::clone(&gossip_sync_logger), + ) + .expect("Persistence failed"); + } Err(e) => log_error!( gossip_sync_logger, "Background sync of RGS gossip data failed: {}",