Skip to content

Commit 8fc03ee

Browse files
committed
feat(rpc): introduce FilterIter
1 parent 43f0f8d commit 8fc03ee

File tree

5 files changed

+547
-1
lines changed

5 files changed

+547
-1
lines changed

crates/bitcoind_rpc/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,15 @@ bitcoincore-rpc = { version = "0.19.0" }
2121
bdk_core = { path = "../core", version = "0.4.1", default-features = false }
2222

2323
[dev-dependencies]
24+
bdk_bitcoind_rpc = { path = "." }
2425
bdk_testenv = { path = "../testenv" }
2526
bdk_chain = { path = "../chain" }
2627

2728
[features]
2829
default = ["std"]
2930
std = ["bitcoin/std", "bdk_core/std"]
3031
serde = ["bitcoin/serde", "bdk_core/serde"]
32+
33+
[[example]]
34+
name = "filter_iter"
35+
required-features = ["std"]
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#![allow(clippy::print_stdout)]
2+
use std::time::Instant;
3+
4+
use anyhow::Context;
5+
use bdk_bitcoind_rpc::bip158::{Event, EventInner, FilterIter};
6+
use bdk_chain::bitcoin::{constants::genesis_block, secp256k1::Secp256k1, Network};
7+
use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex;
8+
use bdk_chain::local_chain::LocalChain;
9+
use bdk_chain::miniscript::Descriptor;
10+
use bdk_chain::{BlockId, ConfirmationBlockTime, IndexedTxGraph, SpkIterator};
11+
use bdk_testenv::anyhow;
12+
13+
// This example shows how BDK chain and tx-graph structures are updated using compact
14+
// filters syncing. Assumes a connection can be made to a bitcoin node via environment
15+
// variables `RPC_URL` and `RPC_COOKIE`.
16+
17+
// Usage: `cargo run -p bdk_bitcoind_rpc --example filter_iter`
18+
19+
const EXTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/0/*)#uswl2jj7";
20+
const INTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/1/*)#dyt7h8zx";
21+
const SPK_COUNT: u32 = 25;
22+
const NETWORK: Network = Network::Signet;
23+
24+
const START_HEIGHT: u32 = 170_000;
25+
const START_HASH: &str = "00000041c812a89f084f633e4cf47e819a2f6b1c0a15162355a930410522c99d";
26+
27+
fn main() -> anyhow::Result<()> {
28+
// Setup receiving chain and graph structures.
29+
let secp = Secp256k1::new();
30+
let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?;
31+
let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?;
32+
let (mut chain, _) = LocalChain::from_genesis_hash(genesis_block(NETWORK).block_hash());
33+
let mut graph = IndexedTxGraph::<ConfirmationBlockTime, KeychainTxOutIndex<&str>>::new({
34+
let mut index = KeychainTxOutIndex::default();
35+
index.insert_descriptor("external", descriptor.clone())?;
36+
index.insert_descriptor("internal", change_descriptor.clone())?;
37+
index
38+
});
39+
40+
// Assume a minimum birthday height
41+
let block = BlockId {
42+
height: START_HEIGHT,
43+
hash: START_HASH.parse()?,
44+
};
45+
let _ = chain.insert_block(block)?;
46+
47+
// Configure RPC client
48+
let url = std::env::var("RPC_URL").context("must set RPC_URL")?;
49+
let cookie = std::env::var("RPC_COOKIE").context("must set RPC_COOKIE")?;
50+
let rpc_client =
51+
bitcoincore_rpc::Client::new(&url, bitcoincore_rpc::Auth::CookieFile(cookie.into()))?;
52+
53+
// Initialize block emitter
54+
let cp = chain.tip();
55+
let start_height = cp.height();
56+
let mut emitter = FilterIter::new_with_checkpoint(&rpc_client, cp);
57+
for (_, desc) in graph.index.keychains() {
58+
let spks = SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, spk)| spk);
59+
emitter.add_spks(spks);
60+
}
61+
62+
let start = Instant::now();
63+
64+
// Sync
65+
if let Some(tip) = emitter.get_tip()? {
66+
let blocks_to_scan = tip.height - start_height;
67+
68+
for event in emitter.by_ref() {
69+
let event = event?;
70+
let curr = event.height();
71+
// apply relevant blocks
72+
if let Event::Block(EventInner { height, ref block }) = event {
73+
let _ = graph.apply_block_relevant(block, height);
74+
println!("Matched block {}", curr);
75+
}
76+
if curr % 1000 == 0 {
77+
let progress = (curr - start_height) as f32 / blocks_to_scan as f32;
78+
println!("[{:.2}%]", progress * 100.0);
79+
}
80+
}
81+
// update chain
82+
if let Some(cp) = emitter.chain_update() {
83+
let _ = chain.apply_update(cp)?;
84+
}
85+
}
86+
87+
println!("\ntook: {}s", start.elapsed().as_secs());
88+
println!("Local tip: {}", chain.tip().height());
89+
let unspent: Vec<_> = graph
90+
.graph()
91+
.filter_chain_unspents(
92+
&chain,
93+
chain.tip().block_id(),
94+
graph.index.outpoints().clone(),
95+
)
96+
.collect();
97+
if !unspent.is_empty() {
98+
println!("\nUnspent");
99+
for (index, utxo) in unspent {
100+
// (k, index) | value | outpoint |
101+
println!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint);
102+
}
103+
}
104+
105+
Ok(())
106+
}

crates/bitcoind_rpc/src/bip158.rs

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
//! Compact block filters sync over RPC. For more details refer to [BIP157][0].
2+
//!
3+
//! This module is home to [`FilterIter`], a structure that returns bitcoin blocks by matching
4+
//! a list of script pubkeys against a [BIP158][1] [`BlockFilter`].
5+
//!
6+
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
7+
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
8+
9+
use bdk_core::collections::BTreeMap;
10+
use core::fmt;
11+
12+
use bdk_core::bitcoin;
13+
use bdk_core::{BlockId, CheckPoint};
14+
use bitcoin::{
15+
bip158::{self, BlockFilter},
16+
Block, BlockHash, ScriptBuf,
17+
};
18+
use bitcoincore_rpc;
19+
use bitcoincore_rpc::RpcApi;
20+
21+
/// Block height
22+
type Height = u32;
23+
24+
/// Type that generates block [`Event`]s by matching a list of script pubkeys against a
25+
/// [`BlockFilter`].
26+
#[derive(Debug)]
27+
pub struct FilterIter<'c, C> {
28+
// RPC client
29+
client: &'c C,
30+
// SPK inventory
31+
spks: Vec<ScriptBuf>,
32+
// local cp
33+
cp: Option<CheckPoint>,
34+
// blocks map
35+
blocks: BTreeMap<Height, BlockHash>,
36+
// best height counter
37+
height: Height,
38+
// stop height
39+
stop: Height,
40+
}
41+
42+
impl<'c, C: RpcApi> FilterIter<'c, C> {
43+
/// Construct [`FilterIter`] from a given `client` and start `height`.
44+
pub fn new_with_height(client: &'c C, height: u32) -> Self {
45+
Self {
46+
client,
47+
spks: vec![],
48+
cp: None,
49+
blocks: BTreeMap::new(),
50+
height,
51+
stop: 0,
52+
}
53+
}
54+
55+
/// Construct [`FilterIter`] from a given `client` and [`CheckPoint`].
56+
pub fn new_with_checkpoint(client: &'c C, cp: CheckPoint) -> Self {
57+
let mut filter_iter = Self::new_with_height(client, cp.height());
58+
filter_iter.cp = Some(cp);
59+
filter_iter
60+
}
61+
62+
/// Extends `self` with an iterator of spks.
63+
pub fn add_spks(&mut self, spks: impl IntoIterator<Item = ScriptBuf>) {
64+
self.spks.extend(spks)
65+
}
66+
67+
/// Add spk to the list of spks to scan with.
68+
pub fn add_spk(&mut self, spk: ScriptBuf) {
69+
self.spks.push(spk);
70+
}
71+
72+
/// Get the next filter and increment the current best height.
73+
///
74+
/// Returns `Ok(None)` when the stop height is exceeded.
75+
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
76+
if self.height > self.stop {
77+
return Ok(None);
78+
}
79+
let height = self.height;
80+
let hash = match self.blocks.get(&height) {
81+
Some(h) => *h,
82+
None => self.client.get_block_hash(height as u64)?,
83+
};
84+
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
85+
let filter = BlockFilter::new(&filter_bytes);
86+
self.height += 1;
87+
Ok(Some((BlockId { height, hash }, filter)))
88+
}
89+
90+
/// Get the remote tip.
91+
///
92+
/// Returns `None` if the remote height is not strictly greater than the height of this
93+
/// [`FilterIter`].
94+
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
95+
let tip_hash = self.client.get_best_block_hash()?;
96+
let mut header = self.client.get_block_header_info(&tip_hash)?;
97+
let tip_height = header.height as u32;
98+
if self.height >= tip_height {
99+
// nothing to do
100+
return Ok(None);
101+
}
102+
self.blocks.insert(tip_height, tip_hash);
103+
104+
// if we have a checkpoint we use a lookback of ten blocks
105+
// to ensure consistency of the local chain
106+
if let Some(cp) = self.cp.as_ref() {
107+
// adjust start height to point of agreement + 1
108+
let base = self.find_base_with(cp.clone())?;
109+
self.height = base.height + 1;
110+
111+
for _ in 0..9 {
112+
let hash = match header.previous_block_hash {
113+
Some(hash) => hash,
114+
None => break,
115+
};
116+
header = self.client.get_block_header_info(&hash)?;
117+
let height = header.height as u32;
118+
if height < self.height {
119+
break;
120+
}
121+
self.blocks.insert(height, hash);
122+
}
123+
}
124+
125+
self.stop = tip_height;
126+
127+
Ok(Some(BlockId {
128+
height: tip_height,
129+
hash: tip_hash,
130+
}))
131+
}
132+
}
133+
134+
/// Alias for a compact filter and associated block id.
135+
type NextFilter = (BlockId, BlockFilter);
136+
137+
/// Event inner type
138+
#[derive(Debug, Clone)]
139+
pub struct EventInner {
140+
/// Height
141+
pub height: Height,
142+
/// Block
143+
pub block: Block,
144+
}
145+
146+
/// Kind of event produced by [`FilterIter`].
147+
#[derive(Debug, Clone)]
148+
pub enum Event {
149+
/// Block
150+
Block(EventInner),
151+
/// No match
152+
NoMatch(Height),
153+
}
154+
155+
impl Event {
156+
/// Whether this event contains a matching block.
157+
pub fn is_match(&self) -> bool {
158+
matches!(self, Event::Block(_))
159+
}
160+
161+
/// Get the height of this event.
162+
pub fn height(&self) -> Height {
163+
match self {
164+
Self::Block(EventInner { height, .. }) => *height,
165+
Self::NoMatch(h) => *h,
166+
}
167+
}
168+
}
169+
170+
impl<C: RpcApi> Iterator for FilterIter<'_, C> {
171+
type Item = Result<Event, Error>;
172+
173+
fn next(&mut self) -> Option<Self::Item> {
174+
(|| -> Result<_, Error> {
175+
// if the next filter matches any of our watched spks, get the block
176+
// and return it, inserting relevant block ids along the way
177+
self.next_filter()?.map_or(Ok(None), |(block, filter)| {
178+
let height = block.height;
179+
let hash = block.hash;
180+
181+
if self.spks.is_empty() {
182+
Err(Error::NoScripts)
183+
} else if filter
184+
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
185+
.map_err(Error::Bip158)?
186+
{
187+
let block = self.client.get_block(&hash)?;
188+
self.blocks.insert(height, hash);
189+
let inner = EventInner { height, block };
190+
Ok(Some(Event::Block(inner)))
191+
} else {
192+
Ok(Some(Event::NoMatch(height)))
193+
}
194+
})
195+
})()
196+
.transpose()
197+
}
198+
}
199+
200+
impl<C: RpcApi> FilterIter<'_, C> {
201+
/// Returns the point of agreement between `self` and the given `cp`.
202+
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
203+
loop {
204+
let height = cp.height();
205+
let fetched_hash = match self.blocks.get(&height) {
206+
Some(hash) => *hash,
207+
None if height == 0 => cp.hash(),
208+
_ => self.client.get_block_hash(height as _)?,
209+
};
210+
if cp.hash() == fetched_hash {
211+
// ensure this block also exists in self
212+
self.blocks.insert(height, cp.hash());
213+
return Ok(cp.block_id());
214+
}
215+
// remember conflicts
216+
self.blocks.insert(height, fetched_hash);
217+
cp = cp.prev().expect("must break before genesis");
218+
}
219+
}
220+
221+
/// Returns a chain update from the newly scanned blocks.
222+
///
223+
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
224+
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
225+
pub fn chain_update(&mut self) -> Option<CheckPoint> {
226+
if self.cp.is_none() || self.blocks.is_empty() {
227+
return None;
228+
}
229+
230+
// note: to connect with the local chain we must guarantee that `self.blocks.first()`
231+
// is also the point of agreement with `self.cp`.
232+
Some(
233+
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
234+
.expect("blocks must be in order"),
235+
)
236+
}
237+
}
238+
239+
/// Errors that may occur during a compact filters sync.
240+
#[derive(Debug)]
241+
pub enum Error {
242+
/// bitcoin bip158 error
243+
Bip158(bip158::Error),
244+
/// attempted to scan blocks without any script pubkeys
245+
NoScripts,
246+
/// `bitcoincore_rpc` error
247+
Rpc(bitcoincore_rpc::Error),
248+
}
249+
250+
impl From<bitcoincore_rpc::Error> for Error {
251+
fn from(e: bitcoincore_rpc::Error) -> Self {
252+
Self::Rpc(e)
253+
}
254+
}
255+
256+
impl fmt::Display for Error {
257+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258+
match self {
259+
Self::Bip158(e) => e.fmt(f),
260+
Self::NoScripts => write!(f, "no script pubkeys were provided to match with"),
261+
Self::Rpc(e) => e.fmt(f),
262+
}
263+
}
264+
}
265+
266+
#[cfg(feature = "std")]
267+
impl std::error::Error for Error {}

0 commit comments

Comments
 (0)