Skip to content

Commit b414c4d

Browse files
authored
Merge pull request #97 from TheBlueMatt/main
Backfill existing network graph into the DB on startup
2 parents 2b60838 + da7ac4b commit b414c4d

File tree

3 files changed

+52
-32
lines changed

3 files changed

+52
-32
lines changed

src/lib.rs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::lookup::DeltaSet;
2828
use crate::persistence::GossipPersister;
2929
use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy, SerializationSet, UpdateSerialization};
3030
use crate::snapshot::Snapshotter;
31-
use crate::types::RGSSLogger;
31+
use crate::types::{RGSSLogger, GossipMessage};
3232

3333
mod downloader;
3434
mod tracking;
@@ -103,13 +103,33 @@ impl<L: Deref + Clone + Send + Sync + 'static> RapidSyncProcessor<L> where L::Ta
103103
let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
104104

105105
if config::DOWNLOAD_NEW_GOSSIP {
106-
let (mut persister, persistence_sender) = GossipPersister::new(self.network_graph.clone(), self.logger.clone());
106+
let (mut persister, persistence_sender) =
107+
GossipPersister::new(self.network_graph.clone(), self.logger.clone()).await;
108+
log_info!(self.logger, "Starting gossip db persistence listener");
109+
tokio::spawn(async move { persister.persist_gossip().await; });
110+
111+
{
112+
log_info!(self.logger, "Backfilling latest gossip from cached network graph…");
113+
let graph = self.network_graph.read_only();
114+
for (_, chan) in graph.channels().unordered_iter() {
115+
if let Some(announcement) = &chan.announcement_message {
116+
if let Some(funding) = chan.capacity_sats {
117+
let gossip_msg = GossipMessage::ChannelAnnouncement(announcement.clone(), funding, None);
118+
persistence_sender.send(gossip_msg).await.unwrap();
119+
}
120+
}
121+
if let Some(update) = chan.one_to_two.as_ref().map(|i| i.last_update_message.as_ref()).flatten() {
122+
persistence_sender.send(GossipMessage::ChannelUpdate(update.clone(), None)).await.unwrap();
123+
}
124+
if let Some(update) = chan.two_to_one.as_ref().map(|i| i.last_update_message.as_ref()).flatten() {
125+
persistence_sender.send(GossipMessage::ChannelUpdate(update.clone(), None)).await.unwrap();
126+
}
127+
}
128+
}
107129

108130
log_info!(self.logger, "Starting gossip download");
109131
tokio::spawn(tracking::download_gossip(persistence_sender, sync_completion_sender,
110132
Arc::clone(&self.network_graph), self.logger.clone()));
111-
log_info!(self.logger, "Starting gossip db persistence listener");
112-
tokio::spawn(async move { persister.persist_gossip().await; });
113133
} else {
114134
sync_completion_sender.send(()).await.unwrap();
115135
}

src/persistence.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,7 @@ pub(crate) struct GossipPersister<L: Deref> where L::Target: Logger {
2424
}
2525

2626
impl<L: Deref + Clone + Send + Sync + 'static> GossipPersister<L> where L::Target: Logger {
27-
pub fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
28-
let (gossip_persistence_sender, gossip_persistence_receiver) =
29-
mpsc::channel::<GossipMessage>(100);
30-
let runtime = Runtime::new().unwrap();
31-
(GossipPersister {
32-
gossip_persistence_receiver,
33-
network_graph,
34-
tokio_runtime: runtime,
35-
logger
36-
}, gossip_persistence_sender)
37-
}
38-
39-
pub(crate) async fn persist_gossip(&mut self) {
27+
pub async fn new(network_graph: Arc<NetworkGraph<L>>, logger: L) -> (Self, mpsc::Sender<GossipMessage>) {
4028
{ // initialize the database
4129
// this client instance is only used once
4230
let mut client = crate::connect_to_db().await;
@@ -50,7 +38,7 @@ impl<L: Deref + Clone + Send + Sync + 'static> GossipPersister<L> where L::Targe
5038

5139
let cur_schema = client.query("SELECT db_schema FROM config WHERE id = $1", &[&1]).await.unwrap();
5240
if !cur_schema.is_empty() {
53-
config::upgrade_db(cur_schema[0].get(0), &mut client, self.logger.clone()).await;
41+
config::upgrade_db(cur_schema[0].get(0), &mut client, logger.clone()).await;
5442
}
5543

5644
let preparation = client.execute("set time zone UTC", &[]).await;
@@ -93,6 +81,18 @@ impl<L: Deref + Clone + Send + Sync + 'static> GossipPersister<L> where L::Targe
9381
}
9482
}
9583

84+
let (gossip_persistence_sender, gossip_persistence_receiver) =
85+
mpsc::channel::<GossipMessage>(100);
86+
let runtime = Runtime::new().unwrap();
87+
(GossipPersister {
88+
gossip_persistence_receiver,
89+
network_graph,
90+
tokio_runtime: runtime,
91+
logger
92+
}, gossip_persistence_sender)
93+
}
94+
95+
pub(crate) async fn persist_gossip(&mut self) {
9696
// print log statement every minute
9797
let mut latest_persistence_log = Instant::now() - Duration::from_secs(60);
9898
let mut i = 0u32;

src/tests/mod.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ async fn test_persistence_runtime() {
211211
let logger = Arc::new(TestLogger::new());
212212
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
213213
let network_graph_arc = Arc::new(network_graph);
214-
let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
214+
let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
215215

216216
tokio::task::spawn_blocking(move || {
217217
drop(_persister);
@@ -240,7 +240,7 @@ async fn test_trivial_setup() {
240240
let logger = Arc::new(TestLogger::new());
241241
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
242242
let network_graph_arc = Arc::new(network_graph);
243-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
243+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
244244

245245
let short_channel_id = 1;
246246
let timestamp = current_time() - 10;
@@ -314,7 +314,7 @@ async fn test_node_announcement_persistence() {
314314
let logger = Arc::new(TestLogger::new());
315315
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
316316
let network_graph_arc = Arc::new(network_graph);
317-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
317+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
318318

319319
{ // seed the db
320320
let mut announcement = generate_node_announcement(None);
@@ -355,7 +355,7 @@ async fn test_node_announcement_delta_detection() {
355355
let logger = Arc::new(TestLogger::new());
356356
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
357357
let network_graph_arc = Arc::new(network_graph);
358-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
358+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
359359

360360
let timestamp = current_time() - 10;
361361

@@ -446,7 +446,7 @@ async fn test_unidirectional_intermediate_update_consideration() {
446446
let logger = Arc::new(TestLogger::new());
447447
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
448448
let network_graph_arc = Arc::new(network_graph);
449-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
449+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
450450

451451
let short_channel_id = 1;
452452
let timestamp = current_time() - 10;
@@ -515,7 +515,7 @@ async fn test_bidirectional_intermediate_update_consideration() {
515515
let logger = Arc::new(TestLogger::new());
516516
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
517517
let network_graph_arc = Arc::new(network_graph);
518-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
518+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
519519

520520
let short_channel_id = 1;
521521
let timestamp = current_time() - 10;
@@ -573,7 +573,7 @@ async fn test_channel_reminders() {
573573
let logger = Arc::new(TestLogger::new());
574574
let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
575575
let network_graph_arc = Arc::new(network_graph);
576-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
576+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
577577

578578
let timestamp = current_time();
579579
println!("timestamp: {}", timestamp);
@@ -663,7 +663,7 @@ async fn test_full_snapshot_recency() {
663663
println!("timestamp: {}", timestamp);
664664

665665
{ // seed the db
666-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
666+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
667667
let announcement = generate_channel_announcement(short_channel_id);
668668
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
669669
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
@@ -744,7 +744,7 @@ async fn test_full_snapshot_recency_with_wrong_seen_order() {
744744
println!("timestamp: {}", timestamp);
745745

746746
{ // seed the db
747-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
747+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
748748
let announcement = generate_channel_announcement(short_channel_id);
749749
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
750750
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
@@ -825,7 +825,7 @@ async fn test_full_snapshot_recency_with_wrong_propagation_order() {
825825
println!("timestamp: {}", timestamp);
826826

827827
{ // seed the db
828-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
828+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
829829
let announcement = generate_channel_announcement(short_channel_id);
830830
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
831831
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
@@ -908,7 +908,7 @@ async fn test_full_snapshot_mutiny_scenario() {
908908
println!("timestamp: {}", timestamp);
909909

910910
{ // seed the db
911-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
911+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
912912
let announcement = generate_channel_announcement(short_channel_id);
913913
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
914914
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
@@ -1043,7 +1043,7 @@ async fn test_full_snapshot_interlaced_channel_timestamps() {
10431043
println!("timestamp: {}", timestamp);
10441044

10451045
{ // seed the db
1046-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1046+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
10471047
let secondary_channel_id = main_channel_id + 1;
10481048

10491049
{ // main channel
@@ -1155,7 +1155,7 @@ async fn test_full_snapshot_persistence() {
11551155
println!("timestamp: {}", timestamp);
11561156

11571157
{ // seed the db
1158-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1158+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
11591159
let announcement = generate_channel_announcement(short_channel_id);
11601160
network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
11611161
receiver.send(GossipMessage::ChannelAnnouncement(announcement, 100, None)).await.unwrap();
@@ -1210,7 +1210,7 @@ async fn test_full_snapshot_persistence() {
12101210
}
12111211

12121212
{ // update the db
1213-
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1213+
let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone()).await;
12141214

12151215
{ // second update
12161216
let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);

0 commit comments

Comments
 (0)