Skip to content

Commit a3d933a

Browse files
committed
Add initial polling logic
We first initialize by synchronizing all `Listen` implementations, and then head into a loop continuously polling our RPC `BlockSource`. We also implement a `BoundedHeaderCache` to limit in-memory footprint.
1 parent 0b2cf5c commit a3d933a

File tree

4 files changed

+355
-15
lines changed

4 files changed

+355
-15
lines changed

src/chain/bitcoind_rpc.rs

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,22 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8+
use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet};
9+
10+
use lightning::chain::Listen;
11+
812
use lightning_block_sync::http::HttpEndpoint;
13+
use lightning_block_sync::poll::ValidatedBlockHeader;
914
use lightning_block_sync::rpc::RpcClient;
10-
use lightning_block_sync::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource};
15+
use lightning_block_sync::{
16+
AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache,
17+
};
1118

1219
use bitcoin::BlockHash;
1320

1421
use base64::prelude::{Engine, BASE64_STANDARD};
1522

23+
use std::collections::{HashMap, VecDeque};
1624
use std::sync::Arc;
1725

1826
pub struct BitcoindRpcClient {
@@ -51,3 +59,75 @@ impl BlockSource for BitcoindRpcClient {
5159
Box::pin(async move { self.rpc_client.get_best_block().await })
5260
}
5361
}
62+
63+
const MAX_HEADER_CACHE_ENTRIES: usize = 100;
64+
65+
pub(crate) struct BoundedHeaderCache {
66+
header_map: HashMap<BlockHash, ValidatedBlockHeader>,
67+
recently_seen: VecDeque<BlockHash>,
68+
}
69+
70+
impl BoundedHeaderCache {
71+
pub(crate) fn new() -> Self {
72+
let header_map = HashMap::new();
73+
let recently_seen = VecDeque::new();
74+
Self { header_map, recently_seen }
75+
}
76+
}
77+
78+
impl Cache for BoundedHeaderCache {
79+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
80+
self.header_map.get(block_hash)
81+
}
82+
83+
fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) {
84+
self.recently_seen.push_back(block_hash);
85+
self.header_map.insert(block_hash, block_header);
86+
87+
if self.header_map.len() >= MAX_HEADER_CACHE_ENTRIES {
88+
// Keep dropping old entries until we've actually removed a header entry.
89+
while let Some(oldest_entry) = self.recently_seen.pop_front() {
90+
if self.header_map.remove(&oldest_entry).is_some() {
91+
break;
92+
}
93+
}
94+
}
95+
}
96+
97+
fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
98+
self.recently_seen.retain(|e| e != block_hash);
99+
self.header_map.remove(block_hash)
100+
}
101+
}
102+
103+
pub(crate) struct ChainListener {
104+
pub(crate) onchain_wallet: Arc<Wallet>,
105+
pub(crate) channel_manager: Arc<ChannelManager>,
106+
pub(crate) chain_monitor: Arc<ChainMonitor>,
107+
pub(crate) output_sweeper: Arc<Sweeper>,
108+
}
109+
110+
impl Listen for ChainListener {
111+
fn filtered_block_connected(
112+
&self, header: &bitcoin::block::Header,
113+
txdata: &lightning::chain::transaction::TransactionData, height: u32,
114+
) {
115+
self.onchain_wallet.filtered_block_connected(header, txdata, height);
116+
self.channel_manager.filtered_block_connected(header, txdata, height);
117+
self.chain_monitor.filtered_block_connected(header, txdata, height);
118+
self.output_sweeper.filtered_block_connected(header, txdata, height);
119+
}
120+
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
121+
self.onchain_wallet.block_connected(block, height);
122+
self.channel_manager.block_connected(block, height);
123+
self.chain_monitor.block_connected(block, height);
124+
self.output_sweeper.block_connected(block, height);
125+
}
126+
127+
fn block_disconnected(&self, header: &bitcoin::block::Header, height: u32) {
128+
self.onchain_wallet.block_disconnected(header, height);
129+
self.channel_manager.block_disconnected(header, height);
130+
self.chain_monitor.block_disconnected(header, height);
131+
self.output_sweeper.block_disconnected(header, height);
132+
}
133+
}

0 commit comments

Comments
 (0)