Skip to content

Commit 429bc63

Browse files
committed
Introduce status method allowing to query the Node's status
.. we replace the simple `is_running` with a more verbose `status` method returning a `NodeStatus` struct, giving more information on syncing states etc.
1 parent d9e9f2e commit 429bc63

File tree

5 files changed

+180
-22
lines changed

5 files changed

+180
-22
lines changed

bindings/ldk_node.udl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ interface Node {
4444
void start();
4545
[Throws=NodeError]
4646
void stop();
47+
NodeStatus status();
4748
Event? next_event();
4849
Event wait_next_event();
4950
[Async]
@@ -99,7 +100,6 @@ interface Node {
99100
[Throws=NodeError]
100101
string sign_message([ByRef]sequence<u8> msg);
101102
boolean verify_signature([ByRef]sequence<u8> msg, [ByRef]string sig, [ByRef]PublicKey pkey);
102-
boolean is_running();
103103
};
104104

105105
[Error]
@@ -139,6 +139,22 @@ enum NodeError {
139139
"LiquidityFeeTooHigh",
140140
};
141141

142+
dictionary NodeStatus {
143+
boolean is_running;
144+
boolean is_listening;
145+
BestBlock current_best_block;
146+
u64? latest_wallet_sync_timestamp;
147+
u64? latest_onchain_wallet_sync_timestamp;
148+
u64? latest_fee_rate_cache_update_timestamp;
149+
u64? latest_rgs_snapshot_timestamp;
150+
u64? latest_node_announcement_broadcast_timestamp;
151+
};
152+
153+
dictionary BestBlock {
154+
BlockHash block_hash;
155+
u32 height;
156+
};
157+
142158
[Error]
143159
enum BuildError {
144160
"InvalidSeedBytes",

src/builder.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use std::fmt;
6565
use std::fs;
6666
use std::io::Cursor;
6767
use std::path::PathBuf;
68+
use std::sync::atomic::AtomicBool;
6869
use std::sync::{Arc, Mutex, RwLock};
6970
use std::time::SystemTime;
7071

@@ -945,6 +946,13 @@ fn build_with_store_internal(
945946

946947
let (stop_sender, _) = tokio::sync::watch::channel(());
947948

949+
let is_listening = Arc::new(AtomicBool::new(false));
950+
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
951+
let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None));
952+
let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None));
953+
let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None));
954+
let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None));
955+
948956
Ok(Node {
949957
runtime,
950958
stop_sender,
@@ -968,6 +976,12 @@ fn build_with_store_internal(
968976
scorer,
969977
peer_store,
970978
payment_store,
979+
is_listening,
980+
latest_wallet_sync_timestamp,
981+
latest_onchain_wallet_sync_timestamp,
982+
latest_fee_rate_cache_update_timestamp,
983+
latest_rgs_snapshot_timestamp,
984+
latest_node_announcement_broadcast_timestamp,
971985
})
972986
}
973987

src/lib.rs

Lines changed: 128 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pub use error::Error as NodeError;
107107
use error::Error;
108108

109109
pub use event::Event;
110-
pub use types::ChannelConfig;
110+
pub use types::{BestBlock, ChannelConfig};
111111

112112
pub use io::utils::generate_entropy_mnemonic;
113113

@@ -164,8 +164,9 @@ use rand::Rng;
164164

165165
use std::default::Default;
166166
use std::net::ToSocketAddrs;
167+
use std::sync::atomic::{AtomicBool, Ordering};
167168
use std::sync::{Arc, Mutex, RwLock};
168-
use std::time::{Duration, Instant, SystemTime};
169+
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
169170

170171
#[cfg(feature = "uniffi")]
171172
uniffi::include_scaffolding!("ldk_node");
@@ -196,6 +197,12 @@ pub struct Node {
196197
scorer: Arc<Mutex<Scorer>>,
197198
peer_store: Arc<PeerStore<Arc<FilesystemLogger>>>,
198199
payment_store: Arc<PaymentStore<Arc<FilesystemLogger>>>,
200+
is_listening: Arc<AtomicBool>,
201+
latest_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
202+
latest_onchain_wallet_sync_timestamp: Arc<RwLock<Option<u64>>>,
203+
latest_fee_rate_cache_update_timestamp: Arc<RwLock<Option<u64>>>,
204+
latest_rgs_snapshot_timestamp: Arc<RwLock<Option<u64>>>,
205+
latest_node_announcement_broadcast_timestamp: Arc<RwLock<Option<u64>>>,
199206
}
200207

201208
impl Node {
@@ -219,6 +226,8 @@ impl Node {
219226
// Block to ensure we update our fee rate cache once on startup
220227
let fee_estimator = Arc::clone(&self.fee_estimator);
221228
let sync_logger = Arc::clone(&self.logger);
229+
let sync_fee_rate_update_timestamp =
230+
Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
222231
let runtime_ref = &runtime;
223232
tokio::task::block_in_place(move || {
224233
runtime_ref.block_on(async move {
@@ -230,6 +239,9 @@ impl Node {
230239
"Initial fee rate cache update finished in {}ms.",
231240
now.elapsed().as_millis()
232241
);
242+
let unix_time_secs_opt =
243+
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
244+
*sync_fee_rate_update_timestamp.write().unwrap() = unix_time_secs_opt;
233245
Ok(())
234246
},
235247
Err(e) => {
@@ -243,6 +255,7 @@ impl Node {
243255
// Setup wallet sync
244256
let wallet = Arc::clone(&self.wallet);
245257
let sync_logger = Arc::clone(&self.logger);
258+
let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
246259
let mut stop_sync = self.stop_sender.subscribe();
247260
let onchain_wallet_sync_interval_secs = self
248261
.config
@@ -264,11 +277,16 @@ impl Node {
264277
_ = onchain_wallet_sync_interval.tick() => {
265278
let now = Instant::now();
266279
match wallet.sync().await {
267-
Ok(()) => log_trace!(
280+
Ok(()) => {
281+
log_trace!(
268282
sync_logger,
269283
"Background sync of on-chain wallet finished in {}ms.",
270284
now.elapsed().as_millis()
271-
),
285+
);
286+
let unix_time_secs_opt =
287+
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
288+
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
289+
}
272290
Err(err) => {
273291
log_error!(
274292
sync_logger,
@@ -286,6 +304,7 @@ impl Node {
286304

287305
let mut stop_fee_updates = self.stop_sender.subscribe();
288306
let fee_update_logger = Arc::clone(&self.logger);
307+
let fee_update_timestamp = Arc::clone(&self.latest_fee_rate_cache_update_timestamp);
289308
let fee_estimator = Arc::clone(&self.fee_estimator);
290309
let fee_rate_cache_update_interval_secs =
291310
self.config.fee_rate_cache_update_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
@@ -304,11 +323,16 @@ impl Node {
304323
_ = fee_rate_update_interval.tick() => {
305324
let now = Instant::now();
306325
match fee_estimator.update_fee_estimates().await {
307-
Ok(()) => log_trace!(
326+
Ok(()) => {
327+
log_trace!(
308328
fee_update_logger,
309329
"Background update of fee rate cache finished in {}ms.",
310330
now.elapsed().as_millis()
311-
),
331+
);
332+
let unix_time_secs_opt =
333+
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
334+
*fee_update_timestamp.write().unwrap() = unix_time_secs_opt;
335+
}
312336
Err(err) => {
313337
log_error!(
314338
fee_update_logger,
@@ -327,6 +351,7 @@ impl Node {
327351
let sync_cmon = Arc::clone(&self.chain_monitor);
328352
let sync_sweeper = Arc::clone(&self.output_sweeper);
329353
let sync_logger = Arc::clone(&self.logger);
354+
let sync_wallet_timestamp = Arc::clone(&self.latest_wallet_sync_timestamp);
330355
let mut stop_sync = self.stop_sender.subscribe();
331356
let wallet_sync_interval_secs =
332357
self.config.wallet_sync_interval_secs.max(WALLET_SYNC_INTERVAL_MINIMUM_SECS);
@@ -347,11 +372,16 @@ impl Node {
347372
];
348373
let now = Instant::now();
349374
match tx_sync.sync(confirmables).await {
350-
Ok(()) => log_trace!(
375+
Ok(()) => {
376+
log_trace!(
351377
sync_logger,
352378
"Background sync of Lightning wallet finished in {}ms.",
353379
now.elapsed().as_millis()
354-
),
380+
);
381+
let unix_time_secs_opt =
382+
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
383+
*sync_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
384+
}
355385
Err(e) => {
356386
log_error!(sync_logger, "Background sync of Lightning wallet failed: {}", e)
357387
}
@@ -365,6 +395,7 @@ impl Node {
365395
let gossip_source = Arc::clone(&self.gossip_source);
366396
let gossip_sync_store = Arc::clone(&self.kv_store);
367397
let gossip_sync_logger = Arc::clone(&self.logger);
398+
let gossip_rgs_sync_timestamp = Arc::clone(&self.latest_rgs_snapshot_timestamp);
368399
let mut stop_gossip_sync = self.stop_sender.subscribe();
369400
runtime.spawn(async move {
370401
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
@@ -392,6 +423,7 @@ impl Node {
392423
log_error!(gossip_sync_logger, "Persistence failed: {}", e);
393424
panic!("Persistence failed");
394425
});
426+
*gossip_rgs_sync_timestamp.write().unwrap() = Some(updated_timestamp as u64);
395427
}
396428
Err(e) => log_error!(
397429
gossip_sync_logger,
@@ -410,6 +442,7 @@ impl Node {
410442
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
411443
let mut stop_listen = self.stop_sender.subscribe();
412444
let listening_logger = Arc::clone(&self.logger);
445+
let listening_indicator = Arc::clone(&self.is_listening);
413446

414447
let mut bind_addrs = Vec::with_capacity(listening_addresses.len());
415448

@@ -428,6 +461,7 @@ impl Node {
428461
}
429462

430463
runtime.spawn(async move {
464+
{
431465
let listener =
432466
tokio::net::TcpListener::bind(&*bind_addrs).await
433467
.unwrap_or_else(|e| {
@@ -437,11 +471,13 @@ impl Node {
437471
);
438472
});
439473

474+
listening_indicator.store(true, Ordering::Release);
475+
440476
loop {
441477
let peer_mgr = Arc::clone(&peer_manager_connection_handler);
442478
tokio::select! {
443479
_ = stop_listen.changed() => {
444-
return;
480+
break;
445481
}
446482
res = listener.accept() => {
447483
let tcp_stream = res.unwrap().0;
@@ -455,6 +491,9 @@ impl Node {
455491
}
456492
}
457493
}
494+
}
495+
496+
listening_indicator.store(false, Ordering::Release);
458497
});
459498
}
460499

@@ -505,6 +544,7 @@ impl Node {
505544
let bcast_config = Arc::clone(&self.config);
506545
let bcast_store = Arc::clone(&self.kv_store);
507546
let bcast_logger = Arc::clone(&self.logger);
547+
let bcast_ann_timestamp = Arc::clone(&self.latest_node_announcement_broadcast_timestamp);
508548
let mut stop_bcast = self.stop_sender.subscribe();
509549
runtime.spawn(async move {
510550
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
@@ -550,12 +590,17 @@ impl Node {
550590

551591
bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
552592

553-
let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
554-
io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
555-
.unwrap_or_else(|e| {
556-
log_error!(bcast_logger, "Persistence failed: {}", e);
557-
panic!("Persistence failed");
558-
});
593+
let unix_time_secs_opt =
594+
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
595+
*bcast_ann_timestamp.write().unwrap() = unix_time_secs_opt;
596+
597+
if let Some(unix_time_secs) = unix_time_secs_opt {
598+
io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
599+
.unwrap_or_else(|e| {
600+
log_error!(bcast_logger, "Persistence failed: {}", e);
601+
panic!("Persistence failed");
602+
});
603+
}
559604
}
560605
}
561606
}
@@ -659,11 +704,6 @@ impl Node {
659704
Ok(())
660705
}
661706

662-
/// Returns whether the [`Node`] is running.
663-
pub fn is_running(&self) -> bool {
664-
self.runtime.read().unwrap().is_some()
665-
}
666-
667707
/// Disconnects all peers, stops all running background tasks, and shuts down [`Node`].
668708
///
669709
/// After this returns most API methods will return [`Error::NotRunning`].
@@ -694,6 +734,32 @@ impl Node {
694734
Ok(())
695735
}
696736

737+
/// Returns the status of the [`Node`].
738+
pub fn status(&self) -> NodeStatus {
739+
let is_running = self.runtime.read().unwrap().is_some();
740+
let is_listening = self.is_listening.load(Ordering::Acquire);
741+
let current_best_block = self.channel_manager.current_best_block().into();
742+
let latest_wallet_sync_timestamp = *self.latest_wallet_sync_timestamp.read().unwrap();
743+
let latest_onchain_wallet_sync_timestamp =
744+
*self.latest_onchain_wallet_sync_timestamp.read().unwrap();
745+
let latest_fee_rate_cache_update_timestamp =
746+
*self.latest_fee_rate_cache_update_timestamp.read().unwrap();
747+
let latest_rgs_snapshot_timestamp = *self.latest_rgs_snapshot_timestamp.read().unwrap();
748+
let latest_node_announcement_broadcast_timestamp =
749+
*self.latest_node_announcement_broadcast_timestamp.read().unwrap();
750+
751+
NodeStatus {
752+
is_running,
753+
is_listening,
754+
current_best_block,
755+
latest_wallet_sync_timestamp,
756+
latest_onchain_wallet_sync_timestamp,
757+
latest_fee_rate_cache_update_timestamp,
758+
latest_rgs_snapshot_timestamp,
759+
latest_node_announcement_broadcast_timestamp,
760+
}
761+
}
762+
697763
/// Returns the next event in the event queue, if currently available.
698764
///
699765
/// Will return `Some(..)` if an event is available and `None` otherwise.
@@ -1740,8 +1806,50 @@ impl Drop for Node {
17401806
}
17411807
}
17421808

1809+
<<<<<<< HEAD
17431810
async fn connect_peer_if_necessary(
17441811
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>,
1812+
=======
1813+
/// Represents the status of the [`Node`].
1814+
#[derive(Clone, Debug, PartialEq, Eq)]
1815+
pub struct NodeStatus {
1816+
/// Indicates whether the [`Node`] is running.
1817+
pub is_running: bool,
1818+
/// Indicates whether the [`Node`] is listening for incoming connections on the addresses
1819+
/// configured via [`Config::listening_addresses`].
1820+
pub is_listening: bool,
1821+
/// The best block to which our Lightning wallet is currently synced.
1822+
pub current_best_block: BestBlock,
1823+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced
1824+
/// our Lightning wallet to the chain tip.
1825+
///
1826+
/// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized.
1827+
pub latest_wallet_sync_timestamp: Option<u64>,
1828+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully synced
1829+
/// our on-chain wallet to the chain tip.
1830+
///
1831+
/// Will be `None` if the wallet hasn't been synced since the [`Node`] was initialized.
1832+
pub latest_onchain_wallet_sync_timestamp: Option<u64>,
1833+
/// The timestamp, in seconds since start of the UNIX epoch, when we last successfully update
1834+
/// our fee rate cache.
1835+
///
1836+
/// Will be `None` if the cache hasn't been updated since the [`Node`] was initialized.
1837+
pub latest_fee_rate_cache_update_timestamp: Option<u64>,
1838+
/// The timestamp, in seconds since start of the UNIX epoch, when the last rapid gossip sync
1839+
/// (RGS) snapshot we successfully applied was generated.
1840+
///
1841+
/// Will be `None` if RGS isn't configured or the snapshot hasn't been updated since the [`Node`] was initialized.
1842+
pub latest_rgs_snapshot_timestamp: Option<u64>,
1843+
/// The timestamp, in seconds since start of the UNIX epoch, when we last broadcasted a node
1844+
/// announcement.
1845+
///
1846+
/// Will be `None` if we have no public channels or we haven't broadcasted since the [`Node`] was initialized.
1847+
pub latest_node_announcement_broadcast_timestamp: Option<u64>,
1848+
}
1849+
1850+
async fn connect_peer_if_necessary<K: KVStore + Sync + Send + 'static>(
1851+
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager<K>>,
1852+
>>>>>>> 98f2c5a (Introduce `status` method allowing to query the `Node`'s status)
17451853
logger: Arc<FilesystemLogger>,
17461854
) -> Result<(), Error> {
17471855
if peer_manager.peer_by_node_id(&node_id).is_some() {

0 commit comments

Comments
 (0)