Skip to content

Commit c798c5b

Browse files
fix(bitcoind_rpc): FilterIter detects reorgs
Co-authored-by: valued mammal <valuedmammal@protonmail.com>
1 parent 4ae06b4 commit c798c5b

File tree

2 files changed

+122
-70
lines changed

2 files changed

+122
-70
lines changed

crates/bitcoind_rpc/src/bip158.rs

Lines changed: 122 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
77
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
88
9-
use bdk_core::collections::BTreeMap;
9+
use bdk_core::collections::{BTreeMap, BTreeSet};
1010
use core::fmt;
1111

1212
use bdk_core::bitcoin;
@@ -33,20 +33,29 @@ pub struct FilterIter<'c, C> {
3333
cp: Option<CheckPoint>,
3434
// blocks map
3535
blocks: BTreeMap<Height, BlockHash>,
36+
// set of heights with filters that matched any watched SPK
37+
matched: BTreeSet<Height>,
38+
// initial height
39+
start: Height,
3640
// best height counter
3741
height: Height,
3842
// stop height
3943
stop: Height,
4044
}
4145

4246
impl<'c, C: RpcApi> FilterIter<'c, C> {
47+
/// Hard cap on how far to walk back when a reorg is detected.
48+
const MAX_REORG_DEPTH: u32 = 100;
49+
4350
/// Construct [`FilterIter`] from a given `client` and start `height`.
4451
pub fn new_with_height(client: &'c C, height: u32) -> Self {
4552
Self {
4653
client,
4754
spks: vec![],
4855
cp: None,
4956
blocks: BTreeMap::new(),
57+
matched: BTreeSet::new(),
58+
start: height,
5059
height,
5160
stop: 0,
5261
}
@@ -69,22 +78,14 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
6978
self.spks.push(spk);
7079
}
7180

72-
/// Get the next filter and increment the current best height.
73-
///
74-
/// Returns `Ok(None)` when the stop height is exceeded.
75-
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
76-
if self.height > self.stop {
77-
return Ok(None);
78-
}
79-
let height = self.height;
80-
let hash = match self.blocks.get(&height) {
81-
Some(h) => *h,
82-
None => self.client.get_block_hash(height as u64)?,
83-
};
84-
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
85-
let filter = BlockFilter::new(&filter_bytes);
86-
self.height += 1;
87-
Ok(Some((BlockId { height, hash }, filter)))
81+
/// Get the block hash by `height` if it is found in the blocks map.
82+
fn get_block_hash(&self, height: &Height) -> Option<BlockHash> {
83+
self.blocks.get(height).copied()
84+
}
85+
86+
/// Insert a (non-matching) block height and hash into the blocks map.
87+
fn insert_block(&mut self, height: Height, hash: BlockHash) {
88+
self.blocks.insert(height, hash);
8889
}
8990

9091
/// Get the remote tip.
@@ -93,33 +94,17 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
9394
/// [`FilterIter`].
9495
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
9596
let tip_hash = self.client.get_best_block_hash()?;
96-
let mut header = self.client.get_block_header_info(&tip_hash)?;
97+
let header = self.client.get_block_header_info(&tip_hash)?;
9798
let tip_height = header.height as u32;
9899
if self.height >= tip_height {
99100
// nothing to do
100101
return Ok(None);
101102
}
102-
self.blocks.insert(tip_height, tip_hash);
103103

104-
// if we have a checkpoint we use a lookback of ten blocks
105-
// to ensure consistency of the local chain
104+
// start scanning from point of agreement + 1
106105
if let Some(cp) = self.cp.as_ref() {
107-
// adjust start height to point of agreement + 1
108106
let base = self.find_base_with(cp.clone())?;
109-
self.height = base.height + 1;
110-
111-
for _ in 0..9 {
112-
let hash = match header.previous_block_hash {
113-
Some(hash) => hash,
114-
None => break,
115-
};
116-
header = self.client.get_block_header_info(&hash)?;
117-
let height = header.height as u32;
118-
if height < self.height {
119-
break;
120-
}
121-
self.blocks.insert(height, hash);
122-
}
107+
self.height = base.height.saturating_add(1);
123108
}
124109

125110
self.stop = tip_height;
@@ -131,9 +116,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
131116
}
132117
}
133118

134-
/// Alias for a compact filter and associated block id.
135-
type NextFilter = (BlockId, BlockFilter);
136-
137119
/// Event inner type
138120
#[derive(Debug, Clone)]
139121
pub struct EventInner {
@@ -171,27 +153,85 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> {
171153
type Item = Result<Event, Error>;
172154

173155
fn next(&mut self) -> Option<Self::Item> {
174-
(|| -> Result<_, Error> {
175-
// if the next filter matches any of our watched spks, get the block
176-
// and return it, inserting relevant block ids along the way
177-
self.next_filter()?.map_or(Ok(None), |(block, filter)| {
178-
let height = block.height;
179-
let hash = block.hash;
180-
181-
if self.spks.is_empty() {
182-
Err(Error::NoScripts)
183-
} else if filter
184-
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
185-
.map_err(Error::Bip158)?
186-
{
187-
let block = self.client.get_block(&hash)?;
188-
self.blocks.insert(height, hash);
189-
let inner = EventInner { height, block };
190-
Ok(Some(Event::Block(inner)))
191-
} else {
192-
Ok(Some(Event::NoMatch(height)))
156+
(|| -> Result<Option<_>, Error> {
157+
if self.height > self.stop {
158+
return Ok(None);
159+
}
160+
// Fetch next filter.
161+
let mut height = self.height;
162+
let mut hash = self.client.get_block_hash(height as _)?;
163+
let mut header = self.client.get_block_header(&hash)?;
164+
165+
// Detect and resolve reorgs: either block at height changed, or its parent changed.
166+
let stored_hash = self.blocks.get(&height).copied();
167+
let prev_hash = self.blocks.get(&(height - 1)).copied();
168+
169+
// If we've seen this height before but the hash has changed, or parent changed, trigger
170+
// reorg.
171+
let reorg_detected = if let Some(old_hash) = stored_hash {
172+
old_hash != hash
173+
} else if let Some(expected_prev) = prev_hash {
174+
header.prev_blockhash != expected_prev
175+
} else {
176+
false
177+
};
178+
179+
// Reorg detected, rewind to last known-good ancestor.
180+
if reorg_detected {
181+
self.blocks.remove(&height);
182+
183+
let mut reorg_depth = 0;
184+
loop {
185+
if reorg_depth >= Self::MAX_REORG_DEPTH || height == 0 {
186+
return Err(Error::ReorgDepthExceeded);
187+
}
188+
189+
height = height.saturating_sub(1);
190+
hash = self.client.get_block_hash(height as _)?;
191+
header = self.client.get_block_header(&hash)?;
192+
193+
let prev_height = height.saturating_sub(1);
194+
if prev_height > 0 {
195+
let prev_hash = self.client.get_block_hash(prev_height as _)?;
196+
self.insert_block(prev_height, prev_hash);
197+
}
198+
199+
if let Some(prev_hash) = self.blocks.get(&prev_height) {
200+
if header.prev_blockhash == *prev_hash {
201+
break;
202+
}
203+
}
204+
205+
reorg_depth += 1;
193206
}
194-
})
207+
208+
// Update self.height so we reprocess this height
209+
self.height = height;
210+
}
211+
212+
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
213+
let filter = BlockFilter::new(&filter_bytes);
214+
215+
// record the scanned block
216+
self.insert_block(height, hash);
217+
// increment best height
218+
self.height = height.saturating_add(1);
219+
220+
// If the filter matches any of our watched SPKs, fetch the full
221+
// block, and record the matching block entry.
222+
if self.spks.is_empty() {
223+
Err(Error::NoScripts)
224+
} else if filter
225+
.match_any(&hash, self.spks.iter().map(|s| s.as_bytes()))
226+
.map_err(Error::Bip158)?
227+
{
228+
let block = self.client.get_block(&hash)?;
229+
self.matched.insert(height);
230+
let inner = EventInner { height, block };
231+
Ok(Some(Event::Block(inner)))
232+
} else {
233+
Ok(Some(Event::NoMatch(height)))
234+
}
195235
})()
196236
.transpose()
197237
}
@@ -202,36 +242,46 @@ impl<C: RpcApi> FilterIter<'_, C> {
202242
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
203243
loop {
204244
let height = cp.height();
205-
let fetched_hash = match self.blocks.get(&height) {
206-
Some(hash) => *hash,
245+
let fetched_hash = match self.get_block_hash(&height) {
246+
Some(hash) => hash,
207247
None if height == 0 => cp.hash(),
208248
_ => self.client.get_block_hash(height as _)?,
209249
};
210250
if cp.hash() == fetched_hash {
211251
// ensure this block also exists in self
212-
self.blocks.insert(height, cp.hash());
252+
self.insert_block(height, cp.hash());
213253
return Ok(cp.block_id());
214254
}
215255
// remember conflicts
216-
self.blocks.insert(height, fetched_hash);
256+
self.insert_block(height, fetched_hash);
217257
cp = cp.prev().expect("must break before genesis");
218258
}
219259
}
220260

221261
/// Returns a chain update from the newly scanned blocks.
222262
///
223263
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
224-
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
264+
/// if not all events have been emitted (by calling `next`).
225265
pub fn chain_update(&mut self) -> Option<CheckPoint> {
226-
if self.cp.is_none() || self.blocks.is_empty() {
266+
if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop {
227267
return None;
228268
}
229269

230-
// note: to connect with the local chain we must guarantee that `self.blocks.first()`
231-
// is also the point of agreement with `self.cp`.
270+
// We return blocks up to and including the initial height, all of the matching blocks,
271+
// and blocks in the terminal range.
272+
let tail_range = self.stop.saturating_sub(9)..=self.stop;
232273
Some(
233-
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
234-
.expect("blocks must be in order"),
274+
CheckPoint::from_block_ids(self.blocks.iter().filter_map(|(&height, &hash)| {
275+
if height <= self.start
276+
|| self.matched.contains(&height)
277+
|| tail_range.contains(&height)
278+
{
279+
Some(BlockId { height, hash })
280+
} else {
281+
None
282+
}
283+
}))
284+
.expect("blocks must be in order"),
235285
)
236286
}
237287
}
@@ -245,6 +295,8 @@ pub enum Error {
245295
NoScripts,
246296
/// `bitcoincore_rpc` error
247297
Rpc(bitcoincore_rpc::Error),
298+
/// `MAX_REORG_DEPTH` exceeded
299+
ReorgDepthExceeded,
248300
}
249301

250302
impl From<bitcoincore_rpc::Error> for Error {
@@ -259,6 +311,7 @@ impl fmt::Display for Error {
259311
Self::Bip158(e) => e.fmt(f),
260312
Self::NoScripts => write!(f, "no script pubkeys were provided to match with"),
261313
Self::Rpc(e) => e.fmt(f),
314+
Self::ReorgDepthExceeded => write!(f, "maximum reorg depth exceeded"),
262315
}
263316
}
264317
}

crates/bitcoind_rpc/tests/test_filter_iter.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,6 @@ fn filter_iter_handles_reorg() -> anyhow::Result<()> {
400400

401401
// Test that while a reorg is detected we delay incrementing the best height
402402
#[test]
403-
#[ignore]
404403
fn repeat_reorgs() -> anyhow::Result<()> {
405404
const MINE_TO: u32 = 11;
406405

0 commit comments

Comments
 (0)