Skip to content

Commit 3a20407

Browse files
committed
Add initial polling logic
We first initialize by synchronizing all `Listen` implementations, and then head into a loop continuously polling our RPC `BlockSource`. We also implement a `BoundedHeaderCache` to limit in-memory footprint.
1 parent 93314b5 commit 3a20407

File tree

3 files changed

+233
-6
lines changed

3 files changed

+233
-6
lines changed

src/chain/bitcoind_rpc.rs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,22 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet};
9+
10+
use lightning::chain::Listen;
11+
812
use lightning_block_sync::http::HttpEndpoint;
13+
use lightning_block_sync::poll::ValidatedBlockHeader;
914
use lightning_block_sync::rpc::RpcClient;
10-
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
15+
use lightning_block_sync::{
16+
AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache,
17+
};
1118

1219
use bitcoin::BlockHash;
1320

1421
use base64::prelude::{Engine, BASE64_STANDARD};
1522

23+
use std::collections::{HashMap, VecDeque};
1624
use std::sync::Arc;
1725

1826
pub struct BitcoindRpcClient {
@@ -51,3 +59,75 @@ impl BlockSource for BitcoindRpcClient {
5159
Box::pin(async move { self.rpc_client.get_best_block().await })
5260
}
5361
}
62+
63+
const MAX_HEADER_CACHE_ENTRIES: usize = 100;
64+
65+
pub(super) struct BoundedHeaderCache {
66+
header_map: HashMap<BlockHash, ValidatedBlockHeader>,
67+
recently_seen: VecDeque<BlockHash>,
68+
}
69+
70+
impl BoundedHeaderCache {
71+
pub(super) fn new() -> Self {
72+
let header_map = HashMap::new();
73+
let recently_seen = VecDeque::new();
74+
Self { header_map, recently_seen }
75+
}
76+
}
77+
78+
impl Cache for BoundedHeaderCache {
79+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
80+
self.header_map.get(block_hash)
81+
}
82+
83+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
84+
self.recently_seen.push_back(block_hash);
85+
self.header_map.insert(block_hash, block_header);
86+
87+
if self.header_map.len() >= MAX_HEADER_CACHE_ENTRIES {
88+
// Keep dropping old entries until we've actually removed a header entry.
89+
while let Some(oldest_entry) = self.recently_seen.pop_front() {
90+
if self.header_map.remove(&oldest_entry).is_some() {
91+
break;
92+
}
93+
}
94+
}
95+
}
96+
97+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
98+
self.recently_seen.retain(|e| e != block_hash);
99+
self.header_map.remove(block_hash)
100+
}
101+
}
102+
103+
pub(crate) struct ChainListener {
104+
pub(crate) onchain_wallet: Arc<Wallet>,
105+
pub(crate) channel_manager: Arc<ChannelManager>,
106+
pub(crate) chain_monitor: Arc<ChainMonitor>,
107+
pub(crate) output_sweeper: Arc<Sweeper>,
108+
}
109+
110+
impl Listen for ChainListener {
111+
fn filtered_block_connected(
112+
&self, header: &bitcoin::block::Header,
113+
txdata: &lightning::chain::transaction::TransactionData, height: u32,
114+
) {
115+
self.onchain_wallet.filtered_block_connected(header, txdata, height);
116+
self.channel_manager.filtered_block_connected(header, txdata, height);
117+
self.chain_monitor.filtered_block_connected(header, txdata, height);
118+
self.output_sweeper.filtered_block_connected(header, txdata, height);
119+
}
120+
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
121+
self.onchain_wallet.block_connected(block, height);
122+
self.channel_manager.block_connected(block, height);
123+
self.chain_monitor.block_connected(block, height);
124+
self.output_sweeper.block_connected(block, height);
125+
}
126+
127+
fn block_disconnected(&self, header: &bitcoin::block::Header, height: u32) {
128+
self.onchain_wallet.block_disconnected(header, height);
129+
self.channel_manager.block_disconnected(header, height);
130+
self.chain_monitor.block_disconnected(header, height);
131+
self.output_sweeper.block_disconnected(header, height);
132+
}
133+
}

src/chain/mod.rs

Lines changed: 146 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
mod bitcoind_rpc;
99
mod block_sync_init;
1010

11-
use crate::chain::bitcoind_rpc::BitcoindRpcClient;
12-
11+
use crate::chain::bitcoind_rpc::{BitcoindRpcClient, BoundedHeaderCache, ChainListener};
12+
use crate::chain::block_sync_init::synchronize_listeners;
1313
use crate::config::{
1414
Config, EsploraSyncConfig, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP,
1515
BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS,
@@ -25,11 +25,14 @@ use crate::logger::{log_bytes, log_error, log_info, log_trace, FilesystemLogger,
2525
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
2626
use crate::{Error, NodeMetrics};
2727

28-
use lightning::chain::{Confirm, Filter};
28+
use lightning::chain::{Confirm, Filter, Listen};
2929
use lightning::util::ser::Writeable;
3030

3131
use lightning_transaction_sync::EsploraSyncClient;
3232

33+
use lightning_block_sync::poll::ChainPoller;
34+
use lightning_block_sync::SpvClient;
35+
3336
use bdk_esplora::EsploraAsyncExt;
3437

3538
use esplora_client::AsyncClient as EsploraAsyncClient;
@@ -46,6 +49,8 @@ pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/ap
4649
// The default Esplora client timeout we're using.
4750
pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10;
4851

52+
const CHAIN_POLLING_INTERVAL_SECS: u64 = 1;
53+
4954
pub(crate) enum WalletSyncStatus {
5055
Completed,
5156
InProgress { subscribers: tokio::sync::broadcast::Sender<Result<(), Error>> },
@@ -242,7 +247,144 @@ impl ChainSource {
242247
}
243248
}
244249
},
245-
Self::BitcoindRpc { .. } => todo!(),
250+
Self::BitcoindRpc {
251+
bitcoind_rpc_client,
252+
onchain_wallet,
253+
kv_store,
254+
config,
255+
logger,
256+
node_metrics,
257+
..
258+
} => {
259+
let mut header_cache = BoundedHeaderCache::new();
260+
let channel_manager_best_block_hash =
261+
channel_manager.current_best_block().block_hash;
262+
let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash;
263+
let onchain_wallet_best_block_hash = onchain_wallet.current_best_block().block_hash;
264+
265+
let mut chain_listeners = vec![
266+
(
267+
onchain_wallet_best_block_hash,
268+
&**onchain_wallet as &(dyn Listen + Send + Sync),
269+
),
270+
(
271+
channel_manager_best_block_hash,
272+
&*channel_manager as &(dyn Listen + Send + Sync),
273+
),
274+
(sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)),
275+
];
276+
277+
// TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s
278+
// before giving them to `ChainMonitor` it the first place. However, this isn't
279+
// trivial as we load them on initialization (in the `Builder`) and only gain
280+
// network access during `start`. For now, we just make sure we get the worst known
281+
// block hash and sychronize them via `ChainMonitor`.
282+
if let Some(worst_channel_monitor_block_hash) = chain_monitor
283+
.list_monitors()
284+
.iter()
285+
.flat_map(|(txo, _)| chain_monitor.get_monitor(*txo))
286+
.map(|m| m.current_best_block())
287+
.min_by_key(|b| b.height)
288+
.map(|b| b.block_hash)
289+
{
290+
chain_listeners.push((
291+
worst_channel_monitor_block_hash,
292+
&*chain_monitor as &(dyn Listen + Send + Sync),
293+
));
294+
}
295+
296+
let chain_tip = loop {
297+
match synchronize_listeners(
298+
bitcoind_rpc_client.as_ref(),
299+
config.network,
300+
&mut header_cache,
301+
chain_listeners.clone(),
302+
)
303+
.await
304+
{
305+
Ok(chain_tip) => {
306+
{
307+
let unix_time_secs_opt = SystemTime::now()
308+
.duration_since(UNIX_EPOCH)
309+
.ok()
310+
.map(|d| d.as_secs());
311+
let mut locked_node_metrics = node_metrics.write().unwrap();
312+
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
313+
unix_time_secs_opt;
314+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
315+
unix_time_secs_opt;
316+
write_node_metrics(
317+
&*locked_node_metrics,
318+
Arc::clone(&kv_store),
319+
Arc::clone(&logger),
320+
)
321+
.unwrap_or_else(|e| {
322+
log_error!(logger, "Failed to persist node metrics: {}", e);
323+
});
324+
}
325+
break chain_tip;
326+
},
327+
328+
Err(e) => {
329+
log_error!(logger, "Failed to synchronize chain listeners: {:?}", e);
330+
tokio::time::sleep(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS))
331+
.await;
332+
},
333+
}
334+
};
335+
336+
let chain_poller =
337+
ChainPoller::new(Arc::clone(&bitcoind_rpc_client), config.network);
338+
let chain_listener = ChainListener {
339+
onchain_wallet: Arc::clone(&onchain_wallet),
340+
channel_manager: Arc::clone(&channel_manager),
341+
chain_monitor: Arc::clone(&chain_monitor),
342+
output_sweeper: Arc::clone(&output_sweeper),
343+
};
344+
let mut spv_client =
345+
SpvClient::new(chain_tip, chain_poller, &mut header_cache, &chain_listener);
346+
let mut chain_polling_interval =
347+
tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS));
348+
chain_polling_interval
349+
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
350+
351+
// Start the polling loop.
352+
loop {
353+
tokio::select! {
354+
_ = stop_sync_receiver.changed() => {
355+
log_trace!(
356+
logger,
357+
"Stopping polling for new chain data.",
358+
);
359+
return;
360+
}
361+
_ = chain_polling_interval.tick() => {
362+
let _ = spv_client.poll_best_tip().await.map_err(|e| {
363+
log_error!(logger, "Failed to poll for chain data: {:?}", e);
364+
});
365+
{
366+
let unix_time_secs_opt = SystemTime::now()
367+
.duration_since(UNIX_EPOCH)
368+
.ok()
369+
.map(|d| d.as_secs());
370+
let mut locked_node_metrics = node_metrics.write().unwrap();
371+
locked_node_metrics.latest_lightning_wallet_sync_timestamp =
372+
unix_time_secs_opt;
373+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
374+
unix_time_secs_opt;
375+
write_node_metrics(
376+
&*locked_node_metrics,
377+
Arc::clone(&kv_store),
378+
Arc::clone(&logger),
379+
)
380+
.unwrap_or_else(|e| {
381+
log_error!(logger, "Failed to persist node metrics: {}", e);
382+
});
383+
}
384+
}
385+
}
386+
}
387+
},
246388
}
247389
}
248390

src/wallet/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::fee_estimator::{ConfirmationTarget, FeeEstimator};
1313
use crate::Error;
1414

1515
use lightning::chain::chaininterface::BroadcasterInterface;
16-
use lightning::chain::Listen;
16+
use lightning::chain::{BestBlock, Listen};
1717

1818
use lightning::events::bump_transaction::{Utxo, WalletSource};
1919
use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage};
@@ -85,6 +85,11 @@ where
8585
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
8686
}
8787

88+
pub(crate) fn current_best_block(&self) -> BestBlock {
89+
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
90+
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }
91+
}
92+
8893
pub(crate) fn apply_update(&self, update: impl Into<Update>) -> Result<(), Error> {
8994
let mut locked_wallet = self.inner.lock().unwrap();
9095
match locked_wallet.apply_update(update) {

0 commit comments

Comments
 (0)