
Commit 613053a

Persist latest_node_ann_broadcast_timestamp
We remember when we last broadcast a node announcement and only re-announce once sufficient time has passed, we have at least one public channel, and we have connected peers to gossip to.
1 parent 6844dc5 commit 613053a
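The gating logic this commit introduces, condensed into a standalone sketch. This is an illustration only, with made-up names and plain second counts rather than the SystemTime-based check in the diff below:

// Hypothetical condensation of the commit's re-announce gate.
const NODE_ANN_BCAST_INTERVAL_SECS: u64 = 60 * 60;

fn should_broadcast(
	latest_bcast_unix_secs: Option<u64>, // None if we never broadcast before
	now_unix_secs: u64,
	have_public_channel: bool,
	num_connected_peers: usize,
) -> bool {
	// Only due once the configured interval has passed since the persisted timestamp.
	let due = match latest_bcast_unix_secs {
		Some(last) => now_unix_secs >= last + NODE_ANN_BCAST_INTERVAL_SECS,
		None => true, // no persisted timestamp: announce as soon as the other conditions hold
	};
	due && have_public_channel && num_connected_peers > 0
}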

3 files changed: +139 -53 lines changed

src/io/mod.rs

Lines changed: 6 additions & 2 deletions
@@ -27,8 +27,12 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
 pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";
 
 /// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
-pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE: &str = "";
-pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_KEY: &str = "rgs_latest_sync_timestamp";
+pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = "";
+pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp";
+
+/// The last time we broadcast a node announcement will be persisted under this key.
+pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE: &str = "";
+pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY: &str = "latest_node_ann_bcast_timestamp";
 
 /// Provides an interface that allows to store and retrieve persisted values that are associated
 /// with given keys.
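Both namespaces above are empty strings, so these keys sit at the store's top level. A minimal in-memory stand-in (hypothetical; the crate's actual KVStore trait hands out readers and writers rather than byte vectors) shows how a (namespace, key) pair addresses an entry:

use std::collections::HashMap;

// Hypothetical namespaced store keyed by (namespace, key) pairs.
#[derive(Default)]
struct MemStore(HashMap<(String, String), Vec<u8>>);

impl MemStore {
	fn write(&mut self, namespace: &str, key: &str, value: Vec<u8>) {
		self.0.insert((namespace.to_owned(), key.to_owned()), value);
	}
	fn read(&self, namespace: &str, key: &str) -> Option<&[u8]> {
		self.0.get(&(namespace.to_owned(), key.to_owned())).map(Vec::as_slice)
	}
}

fn main() {
	let mut store = MemStore::default();
	// An empty namespace places the entry at the top level of the store.
	store.write("", "latest_node_ann_bcast_timestamp", 1_700_000_000u64.to_be_bytes().to_vec());
	assert!(store.read("", "latest_node_ann_bcast_timestamp").is_some());
}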

src/io/utils.rs

Lines changed: 67 additions & 10 deletions
@@ -174,12 +174,12 @@ where
 	Ok(res)
 }
 
-pub(crate) fn read_rgs_latest_sync_timestamp<K: Deref>(kv_store: K) -> Result<u32, std::io::Error>
+pub(crate) fn read_latest_rgs_sync_timestamp<K: Deref>(kv_store: K) -> Result<u32, std::io::Error>
 where
 	K::Target: KVStore,
 {
 	let mut reader =
-		kv_store.read(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)?;
+		kv_store.read(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)?;
 	u32::read(&mut reader).map_err(|_| {
 		std::io::Error::new(
 			std::io::ErrorKind::InvalidData,
@@ -188,21 +188,21 @@ where
 	})
 }
 
-pub(crate) fn write_rgs_latest_sync_timestamp<K: Deref, L: Deref>(
+pub(crate) fn write_latest_rgs_sync_timestamp<K: Deref, L: Deref>(
 	updated_timestamp: u32, kv_store: K, logger: L,
 ) -> Result<(), Error>
 where
 	K::Target: KVStore,
 	L::Target: Logger,
 {
 	let mut writer = kv_store
-		.write(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)
+		.write(LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE, LATEST_RGS_SYNC_TIMESTAMP_KEY)
 		.map_err(|e| {
 			log_error!(
 				logger,
 				"Getting writer for key {}/{} failed due to: {}",
-				RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
-				RGS_LATEST_SYNC_TIMESTAMP_KEY,
+				LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
+				LATEST_RGS_SYNC_TIMESTAMP_KEY,
 				e
 			);
 			Error::PersistenceFailed
@@ -211,8 +211,8 @@ where
 		log_error!(
 			logger,
 			"Writing data to key {}/{} failed due to: {}",
-			RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
-			RGS_LATEST_SYNC_TIMESTAMP_KEY,
+			LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
+			LATEST_RGS_SYNC_TIMESTAMP_KEY,
 			e
 		);
 		Error::PersistenceFailed
@@ -221,8 +221,65 @@ where
 		log_error!(
 			logger,
 			"Committing data to key {}/{} failed due to: {}",
-			RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
-			RGS_LATEST_SYNC_TIMESTAMP_KEY,
+			LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE,
+			LATEST_RGS_SYNC_TIMESTAMP_KEY,
+			e
+		);
+		Error::PersistenceFailed
+	})
+}
+
+pub(crate) fn read_latest_node_ann_bcast_timestamp<K: Deref>(
+	kv_store: K,
+) -> Result<u64, std::io::Error>
+where
+	K::Target: KVStore,
+{
+	let mut reader = kv_store
+		.read(LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY)?;
+	u64::read(&mut reader).map_err(|_| {
+		std::io::Error::new(
+			std::io::ErrorKind::InvalidData,
+			"Failed to deserialize latest node announcement broadcast timestamp",
+		)
+	})
+}
+
+pub(crate) fn write_latest_node_ann_bcast_timestamp<K: Deref, L: Deref>(
+	updated_timestamp: u64, kv_store: K, logger: L,
+) -> Result<(), Error>
+where
+	K::Target: KVStore,
+	L::Target: Logger,
+{
+	let mut writer = kv_store
+		.write(LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY)
+		.map_err(|e| {
+			log_error!(
+				logger,
+				"Getting writer for key {}/{} failed due to: {}",
+				LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
+				LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
+				e
+			);
+			Error::PersistenceFailed
+		})?;
+	updated_timestamp.write(&mut writer).map_err(|e| {
+		log_error!(
+			logger,
+			"Writing data to key {}/{} failed due to: {}",
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
+			e
+		);
+		Error::PersistenceFailed
+	})?;
+	writer.commit().map_err(|e| {
+		log_error!(
+			logger,
+			"Committing data to key {}/{} failed due to: {}",
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
+			LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
 			e
 		);
 		Error::PersistenceFailed
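read_latest_node_ann_bcast_timestamp and write_latest_node_ann_bcast_timestamp round-trip a u64 through the store via the lightning crate's serialization traits. A dependency-free sketch of the same round trip, with big-endian bytes standing in for that machinery:

use std::io::{Cursor, Read};

// Stand-in for the u64 (de)serialization used above; the real helpers go
// through the lightning crate's Readable/Writeable traits, not raw bytes.
fn write_timestamp(buf: &mut Vec<u8>, ts: u64) {
	buf.extend_from_slice(&ts.to_be_bytes());
}

fn read_timestamp(data: &[u8]) -> Result<u64, std::io::Error> {
	let mut bytes = [0u8; 8];
	// A truncated or corrupt value surfaces as an error here, mirroring the
	// InvalidData mapping in read_latest_node_ann_bcast_timestamp.
	Cursor::new(data).read_exact(&mut bytes)?;
	Ok(u64::from_be_bytes(bytes))
}

fn main() -> Result<(), std::io::Error> {
	let mut buf = Vec::new();
	write_timestamp(&mut buf, 1_700_000_000);
	assert_eq!(read_timestamp(&buf)?, 1_700_000_000);
	Ok(())
}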

src/lib.rs

Lines changed: 66 additions & 41 deletions
@@ -172,6 +172,9 @@ const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
 // The time in-between peer reconnection attempts.
 const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
 
+// The time in-between RGS sync attempts.
+const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
+
 // The time in-between node announcement broadcast attempts.
 const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
 
@@ -550,7 +553,7 @@ impl Builder {
 				));
 
 				// Reset the RGS sync timestamp in case we somehow switch gossip sources
-				io::utils::write_rgs_latest_sync_timestamp(
+				io::utils::write_latest_rgs_sync_timestamp(
 					0,
 					Arc::clone(&kv_store),
 					Arc::clone(&logger),
@@ -560,7 +563,7 @@ impl Builder {
 			}
 			GossipSourceConfig::RapidGossipSync(rgs_server) => {
 				let latest_sync_timestamp =
-					io::utils::read_rgs_latest_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0);
+					io::utils::read_latest_rgs_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0);
 				Arc::new(GossipSource::new_rgs(
 					rgs_server.clone(),
 					latest_sync_timestamp,
@@ -756,38 +759,39 @@ impl Node {
 		let gossip_source = Arc::clone(&self.gossip_source);
 		let gossip_sync_store = Arc::clone(&self.kv_store);
 		let gossip_sync_logger = Arc::clone(&self.logger);
-		let stop_gossip_sync = Arc::clone(&stop_running);
+		let mut stop_gossip_sync = self.stop_receiver.clone();
 		runtime.spawn(async move {
+			let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
 			loop {
-				let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
-				let stop_gossip_sync = Arc::clone(&stop_gossip_sync);
-				if stop_gossip_sync.load(Ordering::Acquire) {
-					return;
-				}
-
-				let now = Instant::now();
-				match gossip_source.update_rgs_snapshot().await {
-					Ok(updated_timestamp) => {
-						log_info!(
-							gossip_sync_logger,
-							"Background sync of RGS gossip data finished in {}ms.",
-							now.elapsed().as_millis()
-						);
-						io::utils::write_rgs_latest_sync_timestamp(
-							updated_timestamp,
-							Arc::clone(&gossip_sync_store),
-							Arc::clone(&gossip_sync_logger),
-						)
-						.expect("Persistence failed");
-					}
-					Err(e) => log_error!(
-						gossip_sync_logger,
-						"Background sync of RGS gossip data failed: {}",
-						e
-					),
-				}
-
-				tokio::time::sleep(Duration::from_secs(60 * 60)).await;
+				tokio::select! {
+					_ = stop_gossip_sync.changed() => {
+						return;
+					}
+					_ = interval.tick() => {
+						let gossip_sync_logger = Arc::clone(&gossip_sync_logger);
+						let now = Instant::now();
+						match gossip_source.update_rgs_snapshot().await {
+							Ok(updated_timestamp) => {
+								log_info!(
+									gossip_sync_logger,
+									"Background sync of RGS gossip data finished in {}ms.",
+									now.elapsed().as_millis()
+								);
+								io::utils::write_latest_rgs_sync_timestamp(
+									updated_timestamp,
+									Arc::clone(&gossip_sync_store),
+									Arc::clone(&gossip_sync_logger),
+								)
+								.expect("Persistence failed");
+							}
+							Err(e) => log_error!(
+								gossip_sync_logger,
+								"Background sync of RGS gossip data failed: {}",
+								e
+							),
+						}
+					}
+				}
 			}
 		});
 	}
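The rewritten sync loop swaps the old sleep-and-check-flag pattern for tokio::select! over a watch channel and a fixed interval, so shutdown interrupts the wait immediately instead of after up to an hour. A self-contained sketch of that pattern (assuming tokio with its "full" feature set; the task body is a placeholder):

use std::time::Duration;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
	let (stop_sender, mut stop_receiver) = watch::channel(());
	let task = tokio::spawn(async move {
		let mut interval = tokio::time::interval(Duration::from_millis(100));
		loop {
			tokio::select! {
				// Resolves as soon as the sender publishes a new value (or is
				// dropped), so the task exits without waiting out the tick.
				_ = stop_receiver.changed() => return,
				// The first tick completes immediately, then once per period.
				_ = interval.tick() => {
					// ... periodic work, e.g. an RGS snapshot update ...
				}
			}
		}
	});
	tokio::time::sleep(Duration::from_millis(350)).await;
	stop_sender.send(()).expect("receiver still alive");
	task.await.unwrap();
}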
@@ -906,30 +910,51 @@ impl Node {
 		let bcast_cm = Arc::clone(&self.channel_manager);
 		let bcast_pm = Arc::clone(&self.peer_manager);
 		let bcast_config = Arc::clone(&self.config);
+		let bcast_store = Arc::clone(&self.kv_store);
+		let bcast_logger = Arc::clone(&self.logger);
 		let mut stop_bcast = self.stop_receiver.clone();
 		runtime.spawn(async move {
-			let mut interval = tokio::time::interval(NODE_ANN_BCAST_INTERVAL);
+			// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
+			let mut interval = tokio::time::interval(Duration::from_secs(30));
 			loop {
 				tokio::select! {
 					_ = stop_bcast.changed() => {
 						return;
 					}
-					_ = interval.tick(), if bcast_cm.list_channels().iter().any(|chan| chan.is_public) => {
-						while bcast_pm.get_peer_node_ids().is_empty() {
-							// Sleep a bit and retry if we don't have any peers yet.
-							tokio::time::sleep(Duration::from_secs(5)).await;
-
-							// Check back if we need to stop.
-							match stop_bcast.has_changed() {
-								Ok(false) => {},
-								Ok(true) => return,
-								Err(_) => return,
-							}
-						}
+					_ = interval.tick() => {
+						let skip_broadcast = match io::utils::read_latest_node_ann_bcast_timestamp(Arc::clone(&bcast_store)) {
+							Ok(latest_bcast_time_secs) => {
+								// Skip if the time hasn't elapsed yet.
+								let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL;
+								next_bcast_unix_time.elapsed().is_err()
+							}
+							Err(_) => {
+								// Don't skip if we haven't broadcast before.
+								false
+							}
+						};
+
+						if skip_broadcast {
+							continue;
+						}
+
+						if !bcast_cm.list_channels().iter().any(|chan| chan.is_public) {
+							// Skip if we don't have any public channels.
+							continue;
+						}
+
+						if bcast_pm.get_peer_node_ids().is_empty() {
+							// Skip if we don't have any connected peers to gossip to.
+							continue;
+						}
 
 						let addresses =
 							bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
 						bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
+
+						let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
+						io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
+							.expect("Persistence failed");
 					}
 				}
 			}