Skip to content

Commit d18167f

Browse files
committed
Add initial polling logic
We first initialize by synchronizing all `Listen` implementations, and then head into a loop continuously polling our RPC `BlockSource`. We also implement a `BoundedHeaderCache` to limit in-memory footprint.
1 parent 2739a95 commit d18167f

File tree

3 files changed

+233
-6
lines changed

3 files changed

+233
-6
lines changed

src/chain/bitcoind_rpc.rs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,22 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet};
9+
10+
use lightning::chain::Listen;
11+
812
use lightning_block_sync::http::HttpEndpoint;
13+
use lightning_block_sync::poll::ValidatedBlockHeader;
914
use lightning_block_sync::rpc::RpcClient;
10-
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
15+
use lightning_block_sync::{
16+
AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache,
17+
};
1118

1219
use bitcoin::BlockHash;
1320

1421
use base64::prelude::{Engine, BASE64_STANDARD};
1522

23+
use std::collections::{HashMap, VecDeque};
1624
use std::sync::Arc;
1725

1826
pub struct BitcoindRpcClient {
@@ -51,3 +59,75 @@ impl BlockSource for BitcoindRpcClient {
5159
Box::pin(async move { self.rpc_client.get_best_block().await })
5260
}
5361
}
62+
63+
const MAX_HEADER_CACHE_ENTRIES: usize = 100;
64+
65+
pub(super) struct BoundedHeaderCache {
66+
header_map: HashMap<BlockHash, ValidatedBlockHeader>,
67+
recently_seen: VecDeque<BlockHash>,
68+
}
69+
70+
impl BoundedHeaderCache {
71+
pub(super) fn new() -> Self {
72+
let header_map = HashMap::new();
73+
let recently_seen = VecDeque::new();
74+
Self { header_map, recently_seen }
75+
}
76+
}
77+
78+
impl Cache for BoundedHeaderCache {
79+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
80+
self.header_map.get(block_hash)
81+
}
82+
83+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
84+
self.recently_seen.push_back(block_hash);
85+
self.header_map.insert(block_hash, block_header);
86+
87+
if self.header_map.len() >= MAX_HEADER_CACHE_ENTRIES {
88+
// Keep dropping old entries until we've actually removed a header entry.
89+
while let Some(oldest_entry) = self.recently_seen.pop_front() {
90+
if self.header_map.remove(&oldest_entry).is_some() {
91+
break;
92+
}
93+
}
94+
}
95+
}
96+
97+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
98+
self.recently_seen.retain(|e| e != block_hash);
99+
self.header_map.remove(block_hash)
100+
}
101+
}
102+
103+
pub(crate) struct ChainListener {
104+
pub(crate) onchain_wallet: Arc<Wallet>,
105+
pub(crate) channel_manager: Arc<ChannelManager>,
106+
pub(crate) chain_monitor: Arc<ChainMonitor>,
107+
pub(crate) output_sweeper: Arc<Sweeper>,
108+
}
109+
110+
impl Listen for ChainListener {
111+
fn filtered_block_connected(
112+
&self, header: &bitcoin::block::Header,
113+
txdata: &lightning::chain::transaction::TransactionData, height: u32,
114+
) {
115+
self.onchain_wallet.filtered_block_connected(header, txdata, height);
116+
self.channel_manager.filtered_block_connected(header, txdata, height);
117+
self.chain_monitor.filtered_block_connected(header, txdata, height);
118+
self.output_sweeper.filtered_block_connected(header, txdata, height);
119+
}
120+
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
121+
self.onchain_wallet.block_connected(block, height);
122+
self.channel_manager.block_connected(block, height);
123+
self.chain_monitor.block_connected(block, height);
124+
self.output_sweeper.block_connected(block, height);
125+
}
126+
127+
fn block_disconnected(&self, header: &bitcoin::block::Header, height: u32) {
128+
self.onchain_wallet.block_disconnected(header, height);
129+
self.channel_manager.block_disconnected(header, height);
130+
self.chain_monitor.block_disconnected(header, height);
131+
self.output_sweeper.block_disconnected(header, height);
132+
}
133+
}

src/chain/mod.rs

Lines changed: 146 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
mod bitcoind_rpc;
99

10+
use crate::chain::bitcoind_rpc::{BitcoindRpcClient, BoundedHeaderCache, ChainListener};
1011
use crate::config::{
1112
Config, EsploraSyncConfig, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP,
1213
BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS,
@@ -22,11 +23,15 @@ use crate::logger::{log_bytes, log_error, log_info, log_trace, FilesystemLogger,
2223
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2324
use crate::{Error, NodeMetrics};
2425

25-
use lightning::chain::{Confirm, Filter};
26+
use lightning::chain::{Confirm, Filter, Listen};
2627
use lightning::util::ser::Writeable;
2728

2829
use lightning_transaction_sync::EsploraSyncClient;
2930

31+
use lightning_block_sync::init::synchronize_listeners;
32+
use lightning_block_sync::poll::ChainPoller;
33+
use lightning_block_sync::SpvClient;
34+
3035
use bdk_esplora::EsploraAsyncExt;
3136

3237
use esplora_client::AsyncClient as EsploraAsyncClient;
@@ -37,14 +42,14 @@ use std::collections::HashMap;
3742
use std::sync::{Arc, Mutex, RwLock};
3843
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
3944

40-
use self::bitcoind_rpc::BitcoindRpcClient;
41-
4245
// The default Esplora server we're using.
4346
pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api";
4447

4548
// The default Esplora client timeout we're using.
4649
pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10;
4750

51+
const CHAIN_POLLING_INTERVAL_SECS: u64 = 1;
52+
4853
pub(crate) enum WalletSyncStatus {
4954
Completed,
5055
InProgress { subscribers: tokio::sync::broadcast::Sender<Result<(), Error>> },
@@ -241,7 +246,144 @@ impl ChainSource {
241246
}
242247
}
243248
},
244-
Self::BitcoindRpc { .. } => todo!(),
249+
Self::BitcoindRpc {
250+
bitcoind_rpc_client,
251+
onchain_wallet,
252+
kv_store,
253+
config,
254+
logger,
255+
node_metrics,
256+
..
257+
} => {
258+
let mut header_cache = BoundedHeaderCache::new();
259+
let channel_manager_best_block_hash =
260+
channel_manager.current_best_block().block_hash;
261+
let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash;
262+
let onchain_wallet_best_block_hash = onchain_wallet.current_best_block().block_hash;
263+
264+
let mut chain_listeners = vec![
265+
(
266+
onchain_wallet_best_block_hash,
267+
&**onchain_wallet as &(dyn Listen + Send + Sync),
268+
),
269+
(
270+
channel_manager_best_block_hash,
271+
&*channel_manager as &(dyn Listen + Send + Sync),
272+
),
273+
(sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)),
274+
];
275+
276+
// TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s
277+
// before giving them to `ChainMonitor` it the first place. However, this isn't
278+
// trivial as we load them on initialization (in the `Builder`) and only gain
279+
// network access during `start`. For now, we just make sure we get the worst known
280+
// block hash and sychronize them via `ChainMonitor`.
281+
if let Some(worst_channel_monitor_block_hash) = chain_monitor
282+
.list_monitors()
283+
.iter()
284+
.flat_map(|(txo, _)| chain_monitor.get_monitor(*txo))
285+
.map(|m| m.current_best_block())
286+
.min_by_key(|b| b.height)
287+
.map(|b| b.block_hash)
288+
{
289+
chain_listeners.push((
290+
worst_channel_monitor_block_hash,
291+
&*chain_monitor as &(dyn Listen + Send + Sync),
292+
));
293+
}
294+
295+
let chain_tip = loop {
296+
match synchronize_listeners(
297+
bitcoind_rpc_client.as_ref(),
298+
config.network,
299+
&mut header_cache,
300+
chain_listeners.clone(),
301+
)
302+
.await
303+
{
304+
Ok(chain_tip) => {
305+
{
306+
let unix_time_secs_opt = SystemTime::now()
307+
.duration_since(UNIX_EPOCH)
308+
.ok()
309+
.map(|d| d.as_secs());
310+
let mut locked_node_metrics = node_metrics.write().unwrap();
311+
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
312+
unix_time_secs_opt;
313+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
314+
unix_time_secs_opt;
315+
write_node_metrics(
316+
&*locked_node_metrics,
317+
Arc::clone(&kv_store),
318+
Arc::clone(&logger),
319+
)
320+
.unwrap_or_else(|e| {
321+
log_error!(logger, "Failed to persist node metrics: {}", e);
322+
});
323+
}
324+
break chain_tip;
325+
},
326+
327+
Err(e) => {
328+
log_error!(logger, "Failed to synchronize chain listeners: {:?}", e);
329+
tokio::time::sleep(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS))
330+
.await;
331+
},
332+
}
333+
};
334+
335+
let chain_poller =
336+
ChainPoller::new(Arc::clone(&bitcoind_rpc_client), config.network);
337+
let chain_listener = ChainListener {
338+
onchain_wallet: Arc::clone(&onchain_wallet),
339+
channel_manager: Arc::clone(&channel_manager),
340+
chain_monitor: Arc::clone(&chain_monitor),
341+
output_sweeper: Arc::clone(&output_sweeper),
342+
};
343+
let mut spv_client =
344+
SpvClient::new(chain_tip, chain_poller, &mut header_cache, &chain_listener);
345+
let mut chain_polling_interval =
346+
tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS));
347+
chain_polling_interval
348+
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
349+
350+
// Start the polling loop.
351+
loop {
352+
tokio::select! {
353+
_ = stop_sync_receiver.changed() => {
354+
log_trace!(
355+
logger,
356+
"Stopping polling for new chain data.",
357+
);
358+
return;
359+
}
360+
_ = chain_polling_interval.tick() => {
361+
let _ = spv_client.poll_best_tip().await.map_err(|e| {
362+
log_error!(logger, "Failed to poll for chain data: {:?}", e);
363+
});
364+
{
365+
let unix_time_secs_opt = SystemTime::now()
366+
.duration_since(UNIX_EPOCH)
367+
.ok()
368+
.map(|d| d.as_secs());
369+
let mut locked_node_metrics = node_metrics.write().unwrap();
370+
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
371+
unix_time_secs_opt;
372+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
373+
unix_time_secs_opt;
374+
write_node_metrics(
375+
&*locked_node_metrics,
376+
Arc::clone(&kv_store),
377+
Arc::clone(&logger),
378+
)
379+
.unwrap_or_else(|e| {
380+
log_error!(logger, "Failed to persist node metrics: {}", e);
381+
});
382+
}
383+
}
384+
}
385+
}
386+
},
245387
}
246388
}
247389

src/wallet/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::fee_estimator::{ConfirmationTarget, FeeEstimator};
1313
use crate::Error;
1414

1515
use lightning::chain::chaininterface::BroadcasterInterface;
16-
use lightning::chain::Listen;
16+
use lightning::chain::{BestBlock, Listen};
1717

1818
use lightning::events::bump_transaction::{Utxo, WalletSource};
1919
use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage};
@@ -85,6 +85,11 @@ where
8585
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
8686
}
8787

88+
pub(crate) fn current_best_block(&self) -> BestBlock {
89+
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
90+
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }
91+
}
92+
8893
pub(crate) fn apply_update(&self, update: impl Into<Update>) -> Result<(), Error> {
8994
let mut locked_wallet = self.inner.lock().unwrap();
9095
match locked_wallet.apply_update(update) {

0 commit comments

Comments
 (0)