Skip to content

Commit ed7bfcd

Browse files
committed
Shutdown: Wait for event processing to fully stop
.. before initiating the Runtime shutdown.
1 parent 400efe5 commit ed7bfcd

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

src/builder.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,7 @@ fn build_with_store_internal(
972972
};
973973

974974
let (stop_sender, _) = tokio::sync::watch::channel(());
975+
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
975976

976977
let is_listening = Arc::new(AtomicBool::new(false));
977978
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
@@ -983,6 +984,7 @@ fn build_with_store_internal(
983984
Ok(Node {
984985
runtime,
985986
stop_sender,
987+
event_handling_stopped_sender,
986988
config,
987989
wallet,
988990
tx_sync,

src/lib.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ uniffi::include_scaffolding!("ldk_node");
171171
pub struct Node {
172172
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
173173
stop_sender: tokio::sync::watch::Sender<()>,
174+
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
174175
config: Arc<Config>,
175176
wallet: Arc<Wallet>,
176177
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
@@ -714,6 +715,7 @@ impl Node {
714715
};
715716

716717
let background_stop_logger = Arc::clone(&self.logger);
718+
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
717719
runtime.spawn(async move {
718720
process_events_async(
719721
background_persister,
@@ -734,6 +736,18 @@ impl Node {
734736
panic!("Failed to process events");
735737
});
736738
log_trace!(background_stop_logger, "Events processing stopped.",);
739+
740+
match event_handling_stopped_sender.send(()) {
741+
Ok(_) => (),
742+
Err(e) => {
743+
log_error!(
744+
background_stop_logger,
745+
"Failed to send 'events handling stopped' signal. This should never happen: {}",
746+
e
747+
);
748+
debug_assert!(false);
749+
},
750+
}
737751
});
738752

739753
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
@@ -783,9 +797,47 @@ impl Node {
783797
},
784798
}
785799

786-
// Stop disconnect peers.
800+
// Disconnect all peers.
787801
self.peer_manager.disconnect_all_peers();
788802

803+
// Wait until event handling stopped, at least until a timeout is reached.
804+
let event_handling_stopped_logger = Arc::clone(&self.logger);
805+
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
806+
807+
let _ = runtime
808+
.block_on(async {
809+
tokio::time::timeout(
810+
Duration::from_secs(10),
811+
event_handling_stopped_receiver.changed(),
812+
)
813+
.await
814+
})
815+
.map_err(|e| {
816+
log_error!(
817+
event_handling_stopped_logger,
818+
"Stopping event handling timed out. This should never happen: {}",
819+
e
820+
);
821+
debug_assert!(false);
822+
})
823+
.unwrap_or_else(|_| {
824+
log_error!(
825+
event_handling_stopped_logger,
826+
"Stopping event handling failed. This should never happen.",
827+
);
828+
panic!("Stopping event handling failed. This should never happen.");
829+
});
830+
831+
#[cfg(tokio_unstable)]
832+
{
833+
log_trace!(
834+
self.logger,
835+
"Active runtime tasks left prior to shutdown: {}",
836+
runtime.metrics().active_tasks_count()
837+
);
838+
}
839+
840+
// Shutdown our runtime. By now ~no or only very few tasks should be left.
789841
runtime.shutdown_timeout(Duration::from_secs(10));
790842

791843
log_info!(self.logger, "Shutdown complete.");

0 commit comments

Comments
 (0)