Skip to content

Commit 7102ff4

Browse files
committed
Switch stop signal over to tokio::sync::watch::channel
1 parent 9d746bb commit 7102ff4

File tree

1 file changed

+124
-102
lines changed

1 file changed

+124
-102
lines changed

src/lib.rs

Lines changed: 124 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,12 @@ impl Builder {
634634
}
635635
};
636636

637-
let stop_running = Arc::new(AtomicBool::new(false));
637+
let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
638638

639639
Arc::new(Node {
640640
runtime,
641-
stop_running,
641+
stop_sender,
642+
stop_receiver,
642643
config,
643644
wallet,
644645
tx_sync,
@@ -663,7 +664,8 @@ impl Builder {
663664
/// Needs to be initialized and instantiated through [`Builder::build`].
664665
pub struct Node {
665666
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
666-
stop_running: Arc<AtomicBool>,
667+
stop_sender: tokio::sync::watch::Sender<()>,
668+
stop_receiver: tokio::sync::watch::Receiver<()>,
667669
config: Arc<Config>,
668670
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>,
669671
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -697,8 +699,6 @@ impl Node {
697699

698700
let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
699701

700-
let stop_running = Arc::new(AtomicBool::new(false));
701-
702702
let event_handler = Arc::new(EventHandler::new(
703703
Arc::clone(&self.wallet),
704704
Arc::clone(&self.event_queue),
@@ -717,31 +717,36 @@ impl Node {
717717
let sync_cman = Arc::clone(&self.channel_manager);
718718
let sync_cmon = Arc::clone(&self.chain_monitor);
719719
let sync_logger = Arc::clone(&self.logger);
720-
let stop_sync = Arc::clone(&stop_running);
720+
let mut stop_sync = self.stop_receiver.clone();
721721

722722
std::thread::spawn(move || {
723723
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
724724
async move {
725+
let mut interval = tokio::time::interval(Duration::from_secs(30));
726+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
725727
loop {
726-
if stop_sync.load(Ordering::Acquire) {
727-
return;
728-
}
729728
let now = Instant::now();
730-
match wallet.sync().await {
731-
Ok(()) => log_info!(
732-
sync_logger,
733-
"Background sync of on-chain wallet finished in {}ms.",
734-
now.elapsed().as_millis()
735-
),
736-
Err(err) => {
737-
log_error!(
738-
sync_logger,
739-
"Background sync of on-chain wallet failed: {}",
740-
err
741-
)
729+
tokio::select! {
730+
_ = stop_sync.changed() => {
731+
return;
732+
}
733+
_ = interval.tick() => {
734+
match wallet.sync().await {
735+
Ok(()) => log_info!(
736+
sync_logger,
737+
"Background sync of on-chain wallet finished in {}ms.",
738+
now.elapsed().as_millis()
739+
),
740+
Err(err) => {
741+
log_error!(
742+
sync_logger,
743+
"Background sync of on-chain wallet failed: {}",
744+
err
745+
)
746+
}
747+
}
742748
}
743749
}
744-
tokio::time::sleep(Duration::from_secs(20)).await;
745750
}
746751
},
747752
);
@@ -788,35 +793,40 @@ impl Node {
788793
}
789794

790795
let sync_logger = Arc::clone(&self.logger);
791-
let stop_sync = Arc::clone(&stop_running);
796+
let mut stop_sync = self.stop_receiver.clone();
792797
runtime.spawn(async move {
798+
let mut interval = tokio::time::interval(Duration::from_secs(10));
799+
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
793800
loop {
794-
if stop_sync.load(Ordering::Acquire) {
795-
return;
796-
}
797801
let now = Instant::now();
798-
let confirmables = vec![
799-
&*sync_cman as &(dyn Confirm + Sync + Send),
800-
&*sync_cmon as &(dyn Confirm + Sync + Send),
801-
];
802-
match tx_sync.sync(confirmables).await {
803-
Ok(()) => log_info!(
804-
sync_logger,
805-
"Background sync of Lightning wallet finished in {}ms.",
806-
now.elapsed().as_millis()
807-
),
808-
Err(e) => {
809-
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
802+
tokio::select! {
803+
_ = stop_sync.changed() => {
804+
return;
805+
}
806+
_ = interval.tick() => {
807+
let confirmables = vec![
808+
&*sync_cman as &(dyn Confirm + Sync + Send),
809+
&*sync_cmon as &(dyn Confirm + Sync + Send),
810+
];
811+
match tx_sync.sync(confirmables).await {
812+
Ok(()) => log_info!(
813+
sync_logger,
814+
"Background sync of Lightning wallet finished in {}ms.",
815+
now.elapsed().as_millis()
816+
),
817+
Err(e) => {
818+
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
819+
}
820+
}
810821
}
811822
}
812-
tokio::time::sleep(Duration::from_secs(5)).await;
813823
}
814824
});
815825

816826
if let Some(listening_address) = &self.config.listening_address {
817827
// Setup networking
818828
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
819-
let stop_listen = Arc::clone(&stop_running);
829+
let mut stop_listen = self.stop_receiver.clone();
820830
let listening_address = listening_address.clone();
821831

822832
let bind_addr = listening_address
@@ -831,18 +841,22 @@ impl Node {
831841
"Failed to bind to listen address/port - is something else already listening on it?",
832842
);
833843
loop {
834-
if stop_listen.load(Ordering::Acquire) {
835-
return;
836-
}
837844
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
838-
let tcp_stream = listener.accept().await.unwrap().0;
839-
tokio::spawn(async move {
840-
lightning_net_tokio::setup_inbound(
841-
Arc::clone(&peer_mgr),
842-
tcp_stream.into_std().unwrap(),
843-
)
844-
.await;
845-
});
845+
tokio::select! {
846+
_ = stop_listen.changed() => {
847+
return;
848+
}
849+
res = listener.accept() => {
850+
let tcp_stream = res.unwrap().0;
851+
tokio::spawn(async move {
852+
lightning_net_tokio::setup_inbound(
853+
Arc::clone(&peer_mgr),
854+
tcp_stream.into_std().unwrap(),
855+
)
856+
.await;
857+
});
858+
}
859+
}
846860
}
847861
});
848862
}
@@ -852,36 +866,38 @@ impl Node {
852866
let connect_pm = Arc::clone(&self.peer_manager);
853867
let connect_logger = Arc::clone(&self.logger);
854868
let connect_peer_store = Arc::clone(&self.peer_store);
855-
let stop_connect = Arc::clone(&stop_running);
869+
let mut stop_connect = self.stop_receiver.clone();
856870
runtime.spawn(async move {
857871
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
858872
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
859873
loop {
860-
if stop_connect.load(Ordering::Acquire) {
861-
return;
862-
}
863-
864-
interval.tick().await;
865-
let pm_peers = connect_pm
866-
.get_peer_node_ids()
867-
.iter()
868-
.map(|(peer, _addr)| *peer)
869-
.collect::<Vec<_>>();
870-
for node_id in connect_cm
871-
.list_channels()
872-
.iter()
873-
.map(|chan| chan.counterparty.node_id)
874-
.filter(|id| !pm_peers.contains(id))
875-
{
876-
if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
877-
let _ = do_connect_peer(
878-
peer_info.node_id,
879-
peer_info.address,
880-
Arc::clone(&connect_pm),
881-
Arc::clone(&connect_logger),
882-
)
883-
.await;
884-
}
874+
tokio::select! {
875+
_ = stop_connect.changed() => {
876+
return;
877+
}
878+
_ = interval.tick() => {
879+
let pm_peers = connect_pm
880+
.get_peer_node_ids()
881+
.iter()
882+
.map(|(peer, _addr)| *peer)
883+
.collect::<Vec<_>>();
884+
for node_id in connect_cm
885+
.list_channels()
886+
.iter()
887+
.map(|chan| chan.counterparty.node_id)
888+
.filter(|id| !pm_peers.contains(id))
889+
{
890+
if let Some(peer_info) = connect_peer_store.get_peer(&node_id) {
891+
let _ = do_connect_peer(
892+
peer_info.node_id,
893+
peer_info.address,
894+
Arc::clone(&connect_pm),
895+
Arc::clone(&connect_logger),
896+
)
897+
.await;
898+
}
899+
}
900+
}
885901
}
886902
}
887903
});
@@ -890,28 +906,32 @@ impl Node {
890906
let bcast_cm = Arc::clone(&self.channel_manager);
891907
let bcast_pm = Arc::clone(&self.peer_manager);
892908
let bcast_config = Arc::clone(&self.config);
893-
let stop_bcast = Arc::clone(&stop_running);
909+
let mut stop_bcast = self.stop_receiver.clone();
894910
runtime.spawn(async move {
895911
let mut interval = tokio::time::interval(NODE_ANN_BCAST_INTERVAL);
896912
loop {
897-
if stop_bcast.load(Ordering::Acquire) {
898-
return;
899-
}
900-
901-
if !bcast_cm.list_channels().iter().any(|chan| chan.is_public) { continue; }
902-
903-
interval.tick().await;
904-
905-
if !bcast_cm.list_channels().iter().any(|chan| chan.is_public) { continue; }
913+
tokio::select! {
914+
_ = stop_bcast.changed() => {
915+
return;
916+
}
917+
_ = interval.tick(), if bcast_cm.list_channels().iter().any(|chan| chan.is_public) => {
918+
while bcast_pm.get_peer_node_ids().is_empty() {
919+
// Sleep a bit and retry if we don't have any peers yet.
920+
tokio::time::sleep(Duration::from_secs(5)).await;
921+
922+
// Check back if we need to stop.
923+
match stop_bcast.has_changed() {
924+
Ok(false) => {},
925+
Ok(true) => return,
926+
Err(_) => return,
927+
}
928+
}
906929

907-
while bcast_pm.get_peer_node_ids().is_empty() {
908-
// Sleep a bit and retry if we don't have any peers yet.
909-
tokio::time::sleep(Duration::from_secs(5)).await;
930+
let addresses =
931+
bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
932+
bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
933+
}
910934
}
911-
912-
let addresses =
913-
bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
914-
bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
915935
}
916936
});
917937

@@ -924,15 +944,17 @@ impl Node {
924944
let background_peer_man = Arc::clone(&self.peer_manager);
925945
let background_logger = Arc::clone(&self.logger);
926946
let background_scorer = Arc::clone(&self.scorer);
927-
let stop_background_processing = Arc::clone(&stop_running);
947+
let stop_bp = self.stop_receiver.clone();
928948
let sleeper = move |d| {
929-
let stop = Arc::clone(&stop_background_processing);
949+
let mut stop = stop_bp.clone();
930950
Box::pin(async move {
931-
if stop.load(Ordering::Acquire) {
932-
true
933-
} else {
934-
tokio::time::sleep(d).await;
935-
false
951+
tokio::select! {
952+
_ = stop.changed() => {
953+
true
954+
}
955+
_ = tokio::time::sleep(d) => {
956+
false
957+
}
936958
}
937959
})
938960
};
@@ -964,7 +986,7 @@ impl Node {
964986
pub fn stop(&self) -> Result<(), Error> {
965987
let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?;
966988
// Stop the runtime.
967-
self.stop_running.store(true, Ordering::Release);
989+
self.stop_sender.send(()).expect("Failed to send stop signal");
968990

969991
// Stop disconnect peers.
970992
self.peer_manager.disconnect_all_peers();

0 commit comments

Comments
 (0)