Skip to content

Commit f6eb56a

Browse files
committed
Switch stop signal over to tokio::sync::watch::channel
1 parent 4bed2e1 commit f6eb56a

File tree

1 file changed

+124
-102
lines changed

1 file changed

+124
-102
lines changed

src/lib.rs

Lines changed: 124 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -570,11 +570,12 @@ impl Builder {
570570
}
571571
};
572572

573-
let stop_running = Arc::new(AtomicBool::new(false));
573+
let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
574574

575575
Arc::new(Node {
576576
runtime,
577-
stop_running,
577+
stop_sender,
578+
stop_receiver,
578579
config,
579580
wallet,
580581
tx_sync,
@@ -599,7 +600,8 @@ impl Builder {
599600
/// Needs to be initialized and instantiated through [`Builder::build`].
600601
pub struct Node {
601602
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
602-
stop_running: Arc<AtomicBool>,
603+
stop_sender: tokio::sync::watch::Sender<()>,
604+
stop_receiver: tokio::sync::watch::Receiver<()>,
603605
config: Arc<Config>,
604606
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>,
605607
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -633,8 +635,6 @@ impl Node {
633635

634636
let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
635637

636-
let stop_running = Arc::new(AtomicBool::new(false));
637-
638638
let event_handler = Arc::new(EventHandler::new(
639639
Arc::clone(&self.wallet),
640640
Arc::clone(&self.event_queue),
@@ -653,66 +653,76 @@ impl Node {
653653
let sync_cman = Arc::clone(&self.channel_manager);
654654
let sync_cmon = Arc::clone(&self.chain_monitor);
655655
let sync_logger = Arc::clone(&self.logger);
656-
let stop_sync = Arc::clone(&stop_running);
656+
let mut stop_sync = self.stop_receiver.clone();
657657

658658
std::thread::spawn(move || {
659659
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
660660
async move {
661+
let mut interval = tokio::time::interval(Duration::from_secs(30));
662+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
661663
loop {
662-
if stop_sync.load(Ordering::Acquire) {
663-
return;
664-
}
665664
let now = Instant::now();
666-
match wallet.sync().await {
667-
Ok(()) => log_info!(
668-
sync_logger,
669-
"Background sync of on-chain wallet finished in {}ms.",
670-
now.elapsed().as_millis()
671-
),
672-
Err(err) => {
673-
log_error!(
674-
sync_logger,
675-
"Background sync of on-chain wallet failed: {}",
676-
err
677-
)
665+
tokio::select! {
666+
_ = stop_sync.changed() => {
667+
return;
668+
}
669+
_ = interval.tick() => {
670+
match wallet.sync().await {
671+
Ok(()) => log_info!(
672+
sync_logger,
673+
"Background sync of on-chain wallet finished in {}ms.",
674+
now.elapsed().as_millis()
675+
),
676+
Err(err) => {
677+
log_error!(
678+
sync_logger,
679+
"Background sync of on-chain wallet failed: {}",
680+
err
681+
)
682+
}
683+
}
678684
}
679685
}
680-
tokio::time::sleep(Duration::from_secs(20)).await;
681686
}
682687
},
683688
);
684689
});
685690

686691
let sync_logger = Arc::clone(&self.logger);
687-
let stop_sync = Arc::clone(&stop_running);
692+
let mut stop_sync = self.stop_receiver.clone();
688693
runtime.spawn(async move {
694+
let mut interval = tokio::time::interval(Duration::from_secs(10));
695+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
689696
loop {
690-
if stop_sync.load(Ordering::Acquire) {
691-
return;
692-
}
693697
let now = Instant::now();
694-
let confirmables = vec![
695-
&*sync_cman as &(dyn Confirm + Sync + Send),
696-
&*sync_cmon as &(dyn Confirm + Sync + Send),
697-
];
698-
match tx_sync.sync(confirmables).await {
699-
Ok(()) => log_info!(
700-
sync_logger,
701-
"Background sync of Lightning wallet finished in {}ms.",
702-
now.elapsed().as_millis()
703-
),
704-
Err(e) => {
705-
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
698+
tokio::select! {
699+
_ = stop_sync.changed() => {
700+
return;
701+
}
702+
_ = interval.tick() => {
703+
let confirmables = vec![
704+
&*sync_cman as &(dyn Confirm + Sync + Send),
705+
&*sync_cmon as &(dyn Confirm + Sync + Send),
706+
];
707+
match tx_sync.sync(confirmables).await {
708+
Ok(()) => log_info!(
709+
sync_logger,
710+
"Background sync of Lightning wallet finished in {}ms.",
711+
now.elapsed().as_millis()
712+
),
713+
Err(e) => {
714+
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
715+
}
716+
}
706717
}
707718
}
708-
tokio::time::sleep(Duration::from_secs(5)).await;
709719
}
710720
});
711721

712722
if let Some(listening_address) = &self.config.listening_address {
713723
// Setup networking
714724
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
715-
let stop_listen = Arc::clone(&stop_running);
725+
let mut stop_listen = self.stop_receiver.clone();
716726
let listening_address = listening_address.clone();
717727

718728
let bind_addr = listening_address
@@ -727,18 +737,22 @@ impl Node {
727737
"Failed to bind to listen address/port - is something else already listening on it?",
728738
);
729739
loop {
730-
if stop_listen.load(Ordering::Acquire) {
731-
return;
732-
}
733740
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
734-
let tcp_stream = listener.accept().await.unwrap().0;
735-
tokio::spawn(async move {
736-
lightning_net_tokio::setup_inbound(
737-
Arc::clone(&peer_mgr),
738-
tcp_stream.into_std().unwrap(),
739-
)
740-
.await;
741-
});
741+
tokio::select! {
742+
_ = stop_listen.changed() => {
743+
return;
744+
}
745+
res = listener.accept() => {
746+
let tcp_stream = res.unwrap().0;
747+
tokio::spawn(async move {
748+
lightning_net_tokio::setup_inbound(
749+
Arc::clone(&peer_mgr),
750+
tcp_stream.into_std().unwrap(),
751+
)
752+
.await;
753+
});
754+
}
755+
}
742756
}
743757
});
744758
}
@@ -748,36 +762,38 @@ impl Node {
748762
let connect_pm = Arc::clone(&self.peer_manager);
749763
let connect_logger = Arc::clone(&self.logger);
750764
let connect_peer_store = Arc::clone(&self.peer_store);
751-
let stop_connect = Arc::clone(&stop_running);
765+
let mut stop_connect = self.stop_receiver.clone();
752766
runtime.spawn(async move {
753767
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
754768
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
755769
loop {
756-
if stop_connect.load(Ordering::Acquire) {
757-
return;
758-
}
759-
760-
interval.tick().await;
761-
let pm_peers = connect_pm
762-
.get_peer_node_ids()
763-
.iter()
764-
.map(|(peer, _addr)| *peer)
765-
.collect::<Vec<_>>();
766-
for node_id in connect_cm
767-
.list_channels()
768-
.iter()
769-
.map(|chan| chan.counterparty.node_id)
770-
.filter(|id| !pm_peers.contains(id))
771-
{
772-
if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
773-
let _ = do_connect_peer(
774-
peer_info.node_id,
775-
peer_info.address,
776-
Arc::clone(&connect_pm),
777-
Arc::clone(&connect_logger),
778-
)
779-
.await;
780-
}
770+
tokio::select! {
771+
_ = stop_connect.changed() => {
772+
return;
773+
}
774+
_ = interval.tick() => {
775+
let pm_peers = connect_pm
776+
.get_peer_node_ids()
777+
.iter()
778+
.map(|(peer, _addr)| *peer)
779+
.collect::<Vec<_>>();
780+
for node_id in connect_cm
781+
.list_channels()
782+
.iter()
783+
.map(|chan| chan.counterparty.node_id)
784+
.filter(|id| !pm_peers.contains(id))
785+
{
786+
if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
787+
let _ = do_connect_peer(
788+
peer_info.node_id,
789+
peer_info.address,
790+
Arc::clone(&connect_pm),
791+
Arc::clone(&connect_logger),
792+
)
793+
.await;
794+
}
795+
}
796+
}
781797
}
782798
}
783799
});
@@ -786,28 +802,32 @@ impl Node {
786802
let bcast_cm = Arc::clone(&self.channel_manager);
787803
let bcast_pm = Arc::clone(&self.peer_manager);
788804
let bcast_config = Arc::clone(&self.config);
789-
let stop_bcast = Arc::clone(&stop_running);
805+
let mut stop_bcast = self.stop_receiver.clone();
790806
runtime.spawn(async move {
791807
let mut interval = tokio::time::interval(NODE_ANN_BCAST_INTERVAL);
792808
loop {
793-
if stop_bcast.load(Ordering::Acquire) {
794-
return;
795-
}
796-
797-
if !bcast_cm.list_channels().iter().any(|chan| chan.is_public) { continue; }
798-
799-
interval.tick().await;
800-
801-
if !bcast_cm.list_channels().iter().any(|chan| chan.is_public) { continue; }
809+
tokio::select! {
810+
_ = stop_bcast.changed() => {
811+
return;
812+
}
813+
_ = interval.tick(), if bcast_cm.list_channels().iter().any(|chan| chan.is_public) => {
814+
while bcast_pm.get_peer_node_ids().is_empty() {
815+
// Sleep a bit and retry if we don't have any peers yet.
816+
tokio::time::sleep(Duration::from_secs(5)).await;
817+
818+
// Check back if we need to stop.
819+
match stop_bcast.has_changed() {
820+
Ok(false) => {},
821+
Ok(true) => return,
822+
Err(_) => return,
823+
}
824+
}
802825

803-
while bcast_pm.get_peer_node_ids().is_empty() {
804-
// Sleep a bit and retry if we don't have any peers yet.
805-
tokio::time::sleep(Duration::from_secs(5)).await;
826+
let addresses =
827+
bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
828+
bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
829+
}
806830
}
807-
808-
let addresses =
809-
bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
810-
bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
811831
}
812832
});
813833

@@ -820,15 +840,17 @@ impl Node {
820840
let background_peer_man = Arc::clone(&self.peer_manager);
821841
let background_logger = Arc::clone(&self.logger);
822842
let background_scorer = Arc::clone(&self.scorer);
823-
let stop_background_processing = Arc::clone(&stop_running);
843+
let stop_bp = self.stop_receiver.clone();
824844
let sleeper = move |d| {
825-
let stop = Arc::clone(&stop_background_processing);
845+
let mut stop = stop_bp.clone();
826846
Box::pin(async move {
827-
if stop.load(Ordering::Acquire) {
828-
true
829-
} else {
830-
tokio::time::sleep(d).await;
831-
false
847+
tokio::select! {
848+
_ = stop.changed() => {
849+
true
850+
}
851+
_ = tokio::time::sleep(d) => {
852+
false
853+
}
832854
}
833855
})
834856
};
@@ -860,7 +882,7 @@ impl Node {
860882
pub fn stop(&self) -> Result<(), Error> {
861883
let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?;
862884
// Stop the runtime.
863-
self.stop_running.store(true, Ordering::Release);
885+
self.stop_sender.send(()).expect("Failed to send stop signal");
864886

865887
// Stop disconnect peers.
866888
self.peer_manager.disconnect_all_peers();

0 commit comments

Comments
 (0)