Skip to content

Commit f88de49

Browse files
committed
Drop stop_receiver and switch to stop_sender.subscribe()
Our `stop_sender`/`stop_receiver` watch channel is used to signal to background tasks that they need to stop as we're about to shut down. Previously, we stored both sides in our `Node` object. However, when restarting the node, this would lead to us cloning a stale `stop_receiver`, i.e., one that still had the change signal set, messing with our shutdown logic on restarts. Here, we instead drop the stored receiver and just create all receivers by `subscribe`ing to the sender where necessary, which fixes the prior bug.
1 parent afd67d6 commit f88de49

File tree

2 files changed

+11
-13
lines changed

2 files changed

+11
-13
lines changed

src/builder.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -943,12 +943,11 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
943943
},
944944
};
945945

946-
let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
946+
let (stop_sender, _) = tokio::sync::watch::channel(());
947947

948948
Ok(Node {
949949
runtime,
950950
stop_sender,
951-
stop_receiver,
952951
config,
953952
wallet,
954953
tx_sync,

src/lib.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ uniffi::include_scaffolding!("ldk_node");
179179
pub struct Node<K: KVStore + Sync + Send + 'static> {
180180
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
181181
stop_sender: tokio::sync::watch::Sender<()>,
182-
stop_receiver: tokio::sync::watch::Receiver<()>,
183182
config: Arc<Config>,
184183
wallet: Arc<Wallet>,
185184
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -247,7 +246,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
247246
// Setup wallet sync
248247
let wallet = Arc::clone(&self.wallet);
249248
let sync_logger = Arc::clone(&self.logger);
250-
let mut stop_sync = self.stop_receiver.clone();
249+
let mut stop_sync = self.stop_sender.subscribe();
251250
let onchain_wallet_sync_interval_secs = self
252251
.config
253252
.onchain_wallet_sync_interval_secs
@@ -288,7 +287,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
288287
);
289288
});
290289

291-
let mut stop_fee_updates = self.stop_receiver.clone();
290+
let mut stop_fee_updates = self.stop_sender.subscribe();
292291
let fee_update_logger = Arc::clone(&self.logger);
293292
let fee_estimator = Arc::clone(&self.fee_estimator);
294293
let fee_rate_cache_update_interval_secs =
@@ -331,7 +330,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
331330
let sync_cmon = Arc::clone(&self.chain_monitor);
332331
let sync_sweeper = Arc::clone(&self.output_sweeper);
333332
let sync_logger = Arc::clone(&self.logger);
334-
let mut stop_sync = self.stop_receiver.clone();
333+
let mut stop_sync = self.stop_sender.subscribe();
335334
let wallet_sync_interval_secs =
336335
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
337336
runtime.spawn(async move {
@@ -369,7 +368,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
369368
let gossip_source = Arc::clone(&self.gossip_source);
370369
let gossip_sync_store = Arc::clone(&self.kv_store);
371370
let gossip_sync_logger = Arc::clone(&self.logger);
372-
let mut stop_gossip_sync = self.stop_receiver.clone();
371+
let mut stop_gossip_sync = self.stop_sender.subscribe();
373372
runtime.spawn(async move {
374373
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
375374
loop {
@@ -412,7 +411,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
412411
if let Some(listening_addresses) = &self.config.listening_addresses {
413412
// Setup networking
414413
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
415-
let mut stop_listen = self.stop_receiver.clone();
414+
let mut stop_listen = self.stop_sender.subscribe();
416415
let listening_logger = Arc::clone(&self.logger);
417416

418417
let mut bind_addrs = Vec::with_capacity(listening_addresses.len());
@@ -467,7 +466,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
467466
let connect_pm = Arc::clone(&self.peer_manager);
468467
let connect_logger = Arc::clone(&self.logger);
469468
let connect_peer_store = Arc::clone(&self.peer_store);
470-
let mut stop_connect = self.stop_receiver.clone();
469+
let mut stop_connect = self.stop_sender.subscribe();
471470
runtime.spawn(async move {
472471
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
473472
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -516,7 +515,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
516515
let bcast_config = Arc::clone(&self.config);
517516
let bcast_store = Arc::clone(&self.kv_store);
518517
let bcast_logger = Arc::clone(&self.logger);
519-
let mut stop_bcast = self.stop_receiver.clone();
518+
let mut stop_bcast = self.stop_sender.subscribe();
520519
runtime.spawn(async move {
521520
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
522521
let mut interval = tokio::time::interval(Duration::from_secs(30));
@@ -572,7 +571,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
572571
}
573572
});
574573

575-
let mut stop_tx_bcast = self.stop_receiver.clone();
574+
let mut stop_tx_bcast = self.stop_sender.subscribe();
576575
let tx_bcaster = Arc::clone(&self.tx_broadcaster);
577576
runtime.spawn(async move {
578577
// Every second we try to clear our broadcasting queue.
@@ -613,7 +612,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
613612
let background_logger = Arc::clone(&self.logger);
614613
let background_error_logger = Arc::clone(&self.logger);
615614
let background_scorer = Arc::clone(&self.scorer);
616-
let stop_bp = self.stop_receiver.clone();
615+
let stop_bp = self.stop_sender.subscribe();
617616
let sleeper = move |d| {
618617
let mut stop = stop_bp.clone();
619618
Box::pin(async move {
@@ -650,7 +649,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
650649
});
651650

652651
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
653-
let mut stop_liquidity_handler = self.stop_receiver.clone();
652+
let mut stop_liquidity_handler = self.stop_sender.subscribe();
654653
let liquidity_handler = Arc::clone(&liquidity_source);
655654
runtime.spawn(async move {
656655
loop {

0 commit comments

Comments
 (0)