Skip to content

fix(bitcoind_rpc): properly handle reorgs in FilterIter #1985

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 129 additions & 74 deletions crates/bitcoind_rpc/src/bip158.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki

use bdk_core::collections::BTreeMap;
use bdk_core::collections::{BTreeMap, BTreeSet};
use core::fmt;

use bdk_core::bitcoin;
Expand All @@ -15,8 +15,8 @@ use bitcoin::{
bip158::{self, BlockFilter},
Block, BlockHash, ScriptBuf,
};
use bitcoincore_rpc;
use bitcoincore_rpc::RpcApi;
use bitcoincore_rpc::{self, jsonrpc};

/// Block height
type Height = u32;
Expand All @@ -33,6 +33,10 @@ pub struct FilterIter<'c, C> {
cp: Option<CheckPoint>,
// blocks map
blocks: BTreeMap<Height, BlockHash>,
// set of heights with filters that matched any watched SPK
matched: BTreeSet<Height>,
// initial height
start: Height,
// best height counter
height: Height,
// stop height
Expand All @@ -47,6 +51,8 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
spks: vec![],
cp: None,
blocks: BTreeMap::new(),
matched: BTreeSet::new(),
start: height,
height,
stop: 0,
}
Expand All @@ -69,57 +75,28 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
self.spks.push(spk);
}

/// Get the next filter and increment the current best height.
///
/// Returns `Ok(None)` when the stop height is exceeded.
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
if self.height > self.stop {
return Ok(None);
}
let height = self.height;
let hash = match self.blocks.get(&height) {
Some(h) => *h,
None => self.client.get_block_hash(height as u64)?,
};
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
let filter = BlockFilter::new(&filter_bytes);
self.height += 1;
Ok(Some((BlockId { height, hash }, filter)))
/// Get the block hash by `height` if it is found in the blocks map.
fn get_block_hash(&self, height: &Height) -> Option<BlockHash> {
self.blocks.get(height).copied()
}

/// Get the remote tip.
///
/// Returns `None` if the remote height is not strictly greater than the height of this
/// [`FilterIter`].
/// Returns `None` if the remote height is less than the height of this [`FilterIter`].
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
Comment on lines +85 to 86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a little out of scope for this PR, but the documentation here is a little lacking.

get_tip, as I understand, serves the following purposes:

  1. Initialize the FilerIter. Without calling it, FilterIter returns nothing.
  2. If it returns None, it means we are up to date with remote.

Is this the case?

If so, purpose 2. is not satisfied because reorgs are not detected by get_tip.

@ValuedMammal are you able to provide some clarity, thanks!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_tip is used to set the start and stop height of a scan and is only necessary to call once per instance of FilterIter. During review I was going to mention that I think get_tip should actually clear self.blocks, because I didn't anticipate the user interleaving calls to get_tip and next.

Copy link
Member

@evanlinjin evanlinjin Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ValuedMammal Thank you for the update.

The internal implementation of FilterIter implies that it is a single-use structure (construct once, call .next till end). However, the API design of it implies something different.

What do you think about changing the API to properly represent a "single-use" workflow? I.e. I would change the following:

  • Remove or unexpose add_spk{s}.
  • Change constructor methods to also take in spks. Return None if there are no spks provided.
  • Make sure get_tip logic is run on the first call to next.
  • Rename get_tip to init which would return the target height.

(Separate PR of course)

Let me know what you think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea, and can make a follow up PR to implement this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it makes sense to make the API simpler/safer by making it a one time use struct that takes spks in the constructor. @LagginTimes please open a new issue or PR so we can continue exact implementation changes there.

let tip_hash = self.client.get_best_block_hash()?;
let mut header = self.client.get_block_header_info(&tip_hash)?;
let header = self.client.get_block_header_info(&tip_hash)?;
let tip_height = header.height as u32;
if self.height >= tip_height {
// Allow returning tip if we're exactly at it. Return `None`` if we've already scanned past.
if self.height > tip_height {
// nothing to do
return Ok(None);
}
self.blocks.insert(tip_height, tip_hash);

// if we have a checkpoint we use a lookback of ten blocks
// to ensure consistency of the local chain
// start scanning from point of agreement + 1
if let Some(cp) = self.cp.as_ref() {
// adjust start height to point of agreement + 1
let base = self.find_base_with(cp.clone())?;
self.height = base.height + 1;

for _ in 0..9 {
let hash = match header.previous_block_hash {
Some(hash) => hash,
None => break,
};
header = self.client.get_block_header_info(&hash)?;
let height = header.height as u32;
if height < self.height {
break;
}
self.blocks.insert(height, hash);
}
self.height = base.height.saturating_add(1);
}

self.stop = tip_height;
Expand All @@ -131,9 +108,6 @@ impl<'c, C: RpcApi> FilterIter<'c, C> {
}
}

/// Alias for a compact filter and associated block id.
type NextFilter = (BlockId, BlockFilter);

/// Event inner type
#[derive(Debug, Clone)]
pub struct EventInner {
Expand Down Expand Up @@ -171,29 +145,7 @@ impl<C: RpcApi> Iterator for FilterIter<'_, C> {
type Item = Result<Event, Error>;

fn next(&mut self) -> Option<Self::Item> {
(|| -> Result<_, Error> {
// if the next filter matches any of our watched spks, get the block
// and return it, inserting relevant block ids along the way
self.next_filter()?.map_or(Ok(None), |(block, filter)| {
let height = block.height;
let hash = block.hash;

if self.spks.is_empty() {
Err(Error::NoScripts)
} else if filter
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
.map_err(Error::Bip158)?
{
let block = self.client.get_block(&hash)?;
self.blocks.insert(height, hash);
let inner = EventInner { height, block };
Ok(Some(Event::Block(inner)))
} else {
Ok(Some(Event::NoMatch(height)))
}
})
})()
.transpose()
self.next_event().transpose()
}
}

Expand All @@ -202,8 +154,8 @@ impl<C: RpcApi> FilterIter<'_, C> {
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
loop {
let height = cp.height();
let fetched_hash = match self.blocks.get(&height) {
Some(hash) => *hash,
let fetched_hash = match self.get_block_hash(&height) {
Some(hash) => hash,
None if height == 0 => cp.hash(),
_ => self.client.get_block_hash(height as _)?,
};
Expand All @@ -221,19 +173,122 @@ impl<C: RpcApi> FilterIter<'_, C> {
/// Returns a chain update from the newly scanned blocks.
///
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
/// if not all events have been emitted (by calling `next`).
pub fn chain_update(&mut self) -> Option<CheckPoint> {
if self.cp.is_none() || self.blocks.is_empty() {
if self.cp.is_none() || self.blocks.is_empty() || self.height <= self.stop {
return None;
}

// note: to connect with the local chain we must guarantee that `self.blocks.first()`
// is also the point of agreement with `self.cp`.
// We return blocks up to and including the initial height, all of the matching blocks,
// and blocks in the terminal range.
let tail_range = self.stop.saturating_sub(9)..=self.stop;
Some(
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
.expect("blocks must be in order"),
CheckPoint::from_block_ids(self.blocks.iter().filter_map(|(&height, &hash)| {
if height <= self.start
|| self.matched.contains(&height)
|| tail_range.contains(&height)
{
Some(BlockId { height, hash })
} else {
None
}
}))
.expect("blocks must be in order"),
)
}

fn next_event(&mut self) -> Result<Option<Event>, Error> {
let (height, hash) = match self.find_next_block()? {
None => return Ok(None),
Some((height, _)) if height > self.stop => return Ok(None),
Some(block) => block,
};

// Emit and increment `height` (which should really be `next_height`).
let is_match = BlockFilter::new(&self.client.get_block_filter(&hash)?.filter)
.match_any(&hash, self.spks.iter().map(ScriptBuf::as_ref))
.map_err(Error::Bip158)?;

let event = if is_match {
Event::Block(EventInner {
height,
block: self.client.get_block(&hash)?,
})
} else {
Event::NoMatch(height)
};

// Mutate internal state at the end, once we are sure there are no more errors.
if is_match {
self.matched.insert(height);
}
self.matched.split_off(&height);
self.blocks.split_off(&height);
self.blocks.insert(height, hash);
self.height = height.saturating_add(1);
self.cp = self
.cp
.as_ref()
.and_then(|cp| cp.range(..=cp.height()).next());

Ok(Some(event))
}

/// Non-mutating method that finds the next block which connects with our previously-emitted
/// history.
fn find_next_block(&self) -> Result<Option<(Height, BlockHash)>, bitcoincore_rpc::Error> {
let mut height = self.height;

// Search blocks backwards until we find a block which connects with something the consumer
// has already seen.
let hash = loop {
let hash = match self.client.get_block_hash(height as _) {
Ok(hash) => hash,
Err(bitcoincore_rpc::Error::JsonRpc(jsonrpc::Error::Rpc(rpc_err)))
// -8: Out of bounds, -5: Not found
if rpc_err.code == -8 || rpc_err.code == -5 =>
{
return Ok(None)
}
Err(err) => return Err(err),
};
let header = self.client.get_block_header(&hash)?;

let prev_height = match height.checked_sub(1) {
Some(prev_height) => prev_height,
// Always emit the genesis block as it cannot change.
None => break hash,
};

let prev_hash_remote = header.prev_blockhash;
if let Some(&prev_hash) = self.blocks.get(&prev_height) {
if prev_hash == prev_hash_remote {
break hash;
}
height = prev_height;
continue;
}

let maybe_prev_cp = self
.cp
.as_ref()
.and_then(|cp| cp.range(..=prev_height).next());
if let Some(prev_cp) = maybe_prev_cp {
if prev_cp.height() != prev_height {
// Try again at a height that the consumer can compare against.
height = prev_cp.height();
continue;
}
if prev_cp.hash() != prev_hash_remote {
height = prev_height;
continue;
}
}
break hash;
};

Ok(Some((height, hash)))
}
}

/// Errors that may occur during a compact filters sync.
Expand Down
Loading
Loading