Introduce IntentTracker #257


Draft — wants to merge 5 commits into base: master

Changes from 2 commits
2 changes: 2 additions & 0 deletions wallet/src/types.rs
@@ -73,6 +73,8 @@ pub struct LocalOutput {
pub derivation_index: u32,
/// The position of the output in the blockchain.
pub chain_position: ChainPosition<ConfirmationBlockTime>,
/// Whether this output exists in a transaction that is yet to be broadcasted.
pub needs_broadcast: bool,
@nymius (Contributor) commented on Jun 8, 2025:

Maybe an enum state field with something like UNSPENT, ON_QUEUE, SPENT, BROADCASTED would avoid continually adding new boolean fields here, and would provide a better path for future updates by taking advantage of non-exhaustive patterns. is_spent could be marked for deprecation and used alongside the new field in the meantime.

Comment (Member):

I like this idea if done as a non-exhaustive enum to help reduce future API breaking changes. If we include a "LOCKED" variant could this also support #259?
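The non-exhaustive enum discussed above could look roughly like this — a sketch only, where `OutputStatus`, its variants, and the `is_spent` shim are illustrative names, not part of the bdk_wallet API:

```rust
/// Sketch of the reviewer-proposed status enum; all names are hypothetical.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputStatus {
    Unspent,
    OnQueue,     // waiting in the broadcast queue
    Broadcasted, // handed to the network, not yet confirmed
    Spent,
}

impl OutputStatus {
    /// Shim mirroring the deprecated `is_spent` boolean during a transition.
    pub fn is_spent(&self) -> bool {
        matches!(self, OutputStatus::Spent)
    }
}

fn main() {
    // Because of `#[non_exhaustive]`, downstream crates must keep a wildcard
    // arm, so adding a future variant (e.g. `Locked` for #259) would not be
    // an API-breaking change.
    let status = OutputStatus::OnQueue;
    let label = match status {
        OutputStatus::Unspent => "unspent",
        OutputStatus::Spent => "spent",
        _ => "pending",
    };
    println!("{label}");
}
```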

Comment (Contributor):

When do we expect this to be set/unset exactly? I guess it can only be unset once the transaction in question reaches threshold confirmations?

Comment (Member Author):

That is a good point, and it shows the limitations of the BroadcastQueue concept. In fact, I did some further thinking on this and the BroadcastQueue should really be an IntentTracker and should track txs even if they are "network canonical".

There should be a method such as .tracked_txs_which_are_not_network_canonical (better name needed) so that the caller can decide to either replace the tx, or explicitly forget about it. There are caveats to doing both since we don't want to create a sub-graph where intended payments are duplicated - BDK should handle these situations properly, or provide the required information so that the caller can make a safe decision.
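One hypothetical shape for that IntentTracker idea, with the proposed `.tracked_txs_which_are_not_network_canonical` accessor modeled as `not_network_canonical` — every name here is invented for illustration, and `String` stands in for `bitcoin::Txid`:

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Canonicity {
    NetworkCanonical, // seen in mempool or confirmed
    LocalOnly,        // only tracked locally, not yet canonical
}

struct IntentTracker {
    tracked: BTreeMap<String, Canonicity>, // txid -> canonicity
}

impl IntentTracker {
    /// Surfaces tracked txs the caller must either rebroadcast, replace,
    /// or explicitly forget — the safety-relevant set from the discussion.
    fn not_network_canonical(&self) -> Vec<&str> {
        self.tracked
            .iter()
            .filter(|(_, c)| **c == Canonicity::LocalOnly)
            .map(|(txid, _)| txid.as_str())
            .collect()
    }
}

fn main() {
    let mut tracker = IntentTracker { tracked: BTreeMap::new() };
    tracker.tracked.insert("tx_confirmed".into(), Canonicity::NetworkCanonical);
    tracker.tracked.insert("tx_stuck".into(), Canonicity::LocalOnly);
    println!("{:?}", tracker.not_network_canonical());
}
```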

}

/// A [`Utxo`] with its `satisfaction_weight`.
248 changes: 248 additions & 0 deletions wallet/src/wallet/broadcast_queue.rs
@@ -0,0 +1,248 @@
//! Unbroadcasted transaction queue.

use alloc::vec::Vec;
use chain::tx_graph;
use chain::Anchor;
use chain::TxGraph;

use crate::collections::HashSet;
use crate::collections::VecDeque;

use bitcoin::Txid;
use chain::Merge;

/// An ordered list of unbroadcasted transactions.
///
/// Order matters when queued transactions replace one another (RBF).
#[derive(Debug, Clone, Default)]
pub struct BroadcastQueue {
queue: VecDeque<Txid>,

/// Enforces that we do not have duplicates in `queue`.
dedup: HashSet<Txid>,
Comment (Contributor):

I wonder if it's worth having this separate set? How many unbroadcasted transactions are we expecting at any given time? Maybe it would just be quicker to simply iterate over the queue itself, also saving the heap allocations/memory footprint?

Comment (Member Author):

That is a good point. Maybe premature optimization here.
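The reviewer's simplification — dropping the `dedup` set and scanning the queue directly — might look like this (a sketch with `String` in place of `Txid`; a linear scan is fine while the queue only ever holds a handful of unbroadcast txids):

```rust
use std::collections::VecDeque;

// Queue without the auxiliary HashSet: membership is a linear scan over the
// VecDeque itself, saving the extra allocation and memory footprint.
struct Queue {
    queue: VecDeque<String>, // `String` stands in for bitcoin::Txid
}

impl Queue {
    fn contains(&self, txid: &str) -> bool {
        self.queue.iter().any(|t| t.as_str() == txid)
    }

    /// Push if absent; duplicates are still rejected, just via the scan.
    fn push(&mut self, txid: String) -> bool {
        if self.contains(&txid) {
            return false;
        }
        self.queue.push_back(txid);
        true
    }
}

fn main() {
    let mut q = Queue { queue: VecDeque::new() };
    assert!(q.push("tx_a".into()));
    assert!(!q.push("tx_a".into())); // duplicate rejected without a HashSet
    println!("{}", q.queue.len());
}
```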

}

/// Represents a single mutation to [`BroadcastQueue`].
#[derive(Debug, Clone, PartialEq, serde::Deserialize, serde::Serialize)]
pub enum Mutation {
/// A push to the back of the queue.
Push(Txid),
/// A removal from the queue.
Remove(Txid),
}

/// A list of mutations made to [`BroadcastQueue`].
#[must_use]
#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)]
pub struct ChangeSet {
/// Mutations.
pub mutations: Vec<Mutation>,
}

impl Merge for ChangeSet {
fn merge(&mut self, other: Self) {
self.mutations.extend(other.mutations);
}

fn is_empty(&self) -> bool {
self.mutations.is_empty()
}
}

impl BroadcastQueue {
/// Construct [`BroadcastQueue`] from the given `changeset`.
pub fn from_changeset(changeset: ChangeSet) -> Self {
let mut out = BroadcastQueue::default();
out.apply_changeset(changeset);
out
}

/// Apply the given `changeset`.
pub fn apply_changeset(&mut self, changeset: ChangeSet) {
for mutation in changeset.mutations {
match mutation {
Mutation::Push(txid) => self._push(txid),
Mutation::Remove(txid) => self._remove(txid),
};
}
}

/// Whether the `txid` exists in the queue.
pub fn contains(&self, txid: Txid) -> bool {
self.dedup.contains(&txid)
}

/// Push a `txid` to the queue if it does not already exist.
///
/// # Warning
///
/// This does not get rid of conflicting transactions already in the queue.
pub fn push(&mut self, txid: Txid) -> ChangeSet {
let mut changeset = ChangeSet::default();
if self._push(txid) {
changeset.mutations.push(Mutation::Push(txid));
}
changeset
}
fn _push(&mut self, txid: Txid) -> bool {
if self.dedup.insert(txid) {
self.queue.push_back(txid);
return true;
}
false
}

/// Push a `txid` to the broadcast queue (if it does not already exist) and displace all
/// conflicting txids in the queue.
pub fn push_and_displace_conflicts<A>(&mut self, tx_graph: &TxGraph<A>, txid: Txid) -> ChangeSet
where
A: Anchor,
{
let mut changeset = ChangeSet::default();

let tx = match tx_graph.get_tx(txid) {
Some(tx) => tx,
None => {
debug_assert!(
Comment (Member):

Would it be better to return an Err here instead of the panic? It seems possible a user could mistakenly try to queue a Txid not in the tx_graph. Or is this meant to warn app devs that they should never let this happen?

I also don't understand why you only panic if the txid is not in the tx_graph and not in the dedup set. Isn't not having the Txid in the graph enough to panic due to it being invalid?

Comment (Member Author):

Sorry this was never meant to be in the public API. The idea is that we should only add txids into the BroadcastQueue which are also in TxGraph. If that is not the case, it is definitely an internal BDK error.

!self.dedup.contains(&txid),
"Cannot have txid in queue which has no corresponding tx in graph"
);
return changeset;
}
};

if self._push(txid) {
changeset.mutations.push(Mutation::Push(txid));

for txid in tx_graph.walk_conflicts(&tx, |_, conflict_txid| Some(conflict_txid)) {
if self._remove(txid) {
changeset.mutations.push(Mutation::Remove(txid));
}
}
}

changeset
}

/// Returns the next `txid` in the queue to broadcast, i.e. one that has no dependencies on
/// other transactions in the queue.
pub fn next_to_broadcast<A>(&self, tx_graph: &TxGraph<A>) -> Option<Txid>
where
A: Anchor,
{
self.queue.iter().copied().find(|&txid| {
let tx = match tx_graph.get_tx(txid) {
Some(tx) => tx,
None => return false,
};
if tx
.input
.iter()
.any(|txin| self.dedup.contains(&txin.previous_output.txid))
{
return false;
}
true
})
}
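The eligibility rule in `next_to_broadcast` — a transaction is broadcastable only once none of its parents remain queued — can be modeled without `TxGraph` as follows (a simplified sketch; `String` txids and an explicit parent list stand in for the graph lookups):

```rust
use std::collections::VecDeque;

/// Find the first queued tx whose parents are all absent from the queue.
/// Each queue entry is (txid, parent txids).
fn next_to_broadcast(queue: &VecDeque<(String, Vec<String>)>) -> Option<&str> {
    queue
        .iter()
        .find(|(_, parents)| {
            // Eligible only if no parent is itself still waiting in the queue.
            !parents
                .iter()
                .any(|p| queue.iter().any(|(txid, _)| txid == p))
        })
        .map(|(txid, _)| txid.as_str())
}

fn main() {
    let mut queue = VecDeque::new();
    // The child sits earlier in the queue, but its parent is also queued...
    queue.push_back(("child".to_string(), vec!["parent".to_string()]));
    queue.push_back(("parent".to_string(), vec![]));
    // ...so the parent must be broadcast first.
    println!("{}", next_to_broadcast(&queue).unwrap()); // prints "parent"
}
```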

/// Returns unbroadcasted dependencies of the given `txid`.
///
/// The returned `Vec` is in broadcast order.
pub fn unbroadcasted_dependencies<A>(&self, tx_graph: &TxGraph<A>, txid: Txid) -> Vec<Txid>
where
A: Anchor,
{
let tx = match tx_graph.get_tx(txid) {
Some(tx) => tx,
None => return Vec::new(),
};
let mut txs = tx_graph
.walk_ancestors(tx, |_depth, ancestor_tx| {
let ancestor_txid = ancestor_tx.compute_txid();
if self.dedup.contains(&ancestor_txid) {
Some(ancestor_txid)
} else {
None
}
})
.collect::<Vec<_>>();
txs.reverse();
txs
}

/// Untracks and removes a transaction from the broadcast queue.
///
/// Transactions are automatically removed from the queue upon successful broadcast, so calling
/// this method directly is typically not required.
pub fn remove(&mut self, txid: Txid) -> ChangeSet {
let mut changeset = ChangeSet::default();
if self._remove(txid) {
changeset.mutations.push(Mutation::Remove(txid));
}
changeset
}
fn _remove(&mut self, txid: Txid) -> bool {
if self.dedup.remove(&txid) {
let i = self
.queue
.iter()
.position(|&queue_txid| queue_txid == txid)
.expect("must exist in `queue` to exist in `dedup`");
let _removed = self.queue.remove(i);
debug_assert_eq!(_removed, Some(txid));
return true;
}
false
}

/// Untracks and removes a transaction and its descendants from the broadcast queue.
pub fn remove_and_displace_dependants<A>(
&mut self,
tx_graph: &TxGraph<A>,
txid: Txid,
) -> ChangeSet
where
A: Anchor,
{
let mut changeset = ChangeSet::default();

if self._remove(txid) {
changeset.mutations.push(Mutation::Remove(txid));
for txid in tx_graph.walk_descendants(txid, |_depth, txid| Some(txid)) {
if self._remove(txid) {
changeset.mutations.push(Mutation::Remove(txid));
}
}
}
changeset
}

/// Untrack transactions that are given anchors, seen-in-mempool timestamps, or eviction timestamps.
pub fn filter_from_graph_changeset<A>(
&mut self,
graph_changeset: &tx_graph::ChangeSet<A>,
) -> ChangeSet {
let mut changeset = ChangeSet::default();
let s_txids = graph_changeset.last_seen.keys().copied();
let a_txids = graph_changeset.anchors.iter().map(|(_, txid)| *txid);
let e_txids = graph_changeset.last_evicted.keys().copied();
for txid in s_txids.chain(a_txids).chain(e_txids) {
changeset.merge(self.remove(txid));
}
changeset
}

/// Txids ordered by precedence.
///
/// Transactions with greater precedence will appear later in this list.
pub fn txids(&self) -> impl ExactSizeIterator<Item = Txid> + '_ {
self.queue.iter().copied()
}

/// Initial changeset.
pub fn initial_changeset(&self) -> ChangeSet {
ChangeSet {
mutations: self.queue.iter().copied().map(Mutation::Push).collect(),
}
}
}
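The mutation-log design above — a `ChangeSet` that is an append-only list of `Push`/`Remove` mutations, with `merge` as concatenation — can be modeled compactly: replaying a merged log reproduces the final queue state. A self-contained sketch with `String` in place of `Txid`:

```rust
// Minimal stand-in for the mutation-log ChangeSet shown in the diff above.
#[derive(Debug, Clone, PartialEq)]
enum Mutation {
    Push(String),
    Remove(String),
}

#[derive(Default)]
struct ChangeSet {
    mutations: Vec<Mutation>,
}

impl ChangeSet {
    /// Merging two logs is concatenation, matching the `Merge` impl above.
    fn merge(&mut self, other: ChangeSet) {
        self.mutations.extend(other.mutations);
    }
}

/// Replay a mutation log into the queue it describes (dedup on push).
fn replay(changeset: &ChangeSet) -> Vec<String> {
    let mut queue: Vec<String> = Vec::new();
    for m in &changeset.mutations {
        match m {
            Mutation::Push(t) => {
                if !queue.contains(t) {
                    queue.push(t.clone());
                }
            }
            Mutation::Remove(t) => queue.retain(|q| q != t),
        }
    }
    queue
}

fn main() {
    let mut a = ChangeSet {
        mutations: vec![Mutation::Push("tx1".into()), Mutation::Push("tx2".into())],
    };
    let b = ChangeSet {
        mutations: vec![Mutation::Remove("tx1".into())],
    };
    a.merge(b);
    println!("{:?}", replay(&a)); // ["tx2"]
}
```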
64 changes: 63 additions & 1 deletion wallet/src/wallet/changeset.rs
@@ -1,9 +1,12 @@
use bdk_chain::{
indexed_tx_graph, keychain_txout, local_chain, tx_graph, ConfirmationBlockTime, Merge,
};
use bitcoin::Txid;
use miniscript::{Descriptor, DescriptorPublicKey};
use serde::{Deserialize, Serialize};

use super::broadcast_queue;

type IndexedTxGraphChangeSet =
indexed_tx_graph::ChangeSet<ConfirmationBlockTime, keychain_txout::ChangeSet>;

@@ -114,6 +117,9 @@ pub struct ChangeSet {
pub tx_graph: tx_graph::ChangeSet<ConfirmationBlockTime>,
/// Changes to [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex).
pub indexer: keychain_txout::ChangeSet,
/// Changes to [`BroadcastQueue`](broadcast_queue::BroadcastQueue).
#[serde(default)]
pub broadcast_queue: broadcast_queue::ChangeSet,
}

impl Merge for ChangeSet {
@@ -145,6 +151,7 @@ impl Merge for ChangeSet {
Merge::merge(&mut self.local_chain, other.local_chain);
Merge::merge(&mut self.tx_graph, other.tx_graph);
Merge::merge(&mut self.indexer, other.indexer);
Merge::merge(&mut self.broadcast_queue, other.broadcast_queue);
}

fn is_empty(&self) -> bool {
@@ -154,6 +161,7 @@
&& self.local_chain.is_empty()
&& self.tx_graph.is_empty()
&& self.indexer.is_empty()
&& self.broadcast_queue.is_empty()
}
}

@@ -163,6 +171,8 @@ impl ChangeSet {
pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet";
/// Name of table to store wallet descriptors and network.
pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet";
/// Name of table to store broadcast queue txids.
pub const WALLET_BROADCAST_QUEUE_TABLE_NAME: &'static str = "bdk_wallet_broadcast_queue";

/// Get v0 sqlite [ChangeSet] schema
pub fn schema_v0() -> alloc::string::String {
@@ -177,12 +187,23 @@ impl ChangeSet {
)
}

/// Get v1 sqlite [`ChangeSet`] schema.
pub fn schema_v1() -> alloc::string::String {
format!(
"CREATE TABLE {} ( \
id INTEGER PRIMARY KEY AUTOINCREMENT,
txid TEXT NOT NULL UNIQUE \
) STRICT;",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME,
)
}

/// Initialize sqlite tables for wallet tables.
pub fn init_sqlite_tables(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result<()> {
crate::rusqlite_impl::migrate_schema(
db_tx,
Self::WALLET_SCHEMA_NAME,
&[&Self::schema_v0()],
&[&Self::schema_v0(), &Self::schema_v1()],
)?;

bdk_chain::local_chain::ChangeSet::init_sqlite_tables(db_tx)?;
@@ -220,6 +241,19 @@ impl ChangeSet {
changeset.network = network.map(Impl::into_inner);
}

let mut queue_statement = db_tx.prepare(&format!(
"SELECT txid FROM {} ORDER BY id ASC",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
))?;
let row_iter = queue_statement.query_map([], |row| row.get::<_, Impl<Txid>>("txid"))?;
for row in row_iter {
let Impl(txid) = row?;
changeset
.broadcast_queue
.mutations
.push(broadcast_queue::Mutation::Push(txid));
}

changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?;
changeset.tx_graph = tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?;
changeset.indexer = keychain_txout::ChangeSet::from_sqlite(db_tx)?;
@@ -268,6 +302,25 @@ impl ChangeSet {
})?;
}

let mut queue_insert_statement = db_tx.prepare_cached(&format!(
"INSERT OR IGNORE INTO {}(txid) VALUES(:txid)",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
))?;
let mut queue_remove_statement = db_tx.prepare_cached(&format!(
"DELETE FROM {} WHERE txid=:txid",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
))?;
for mutation in &self.broadcast_queue.mutations {
match mutation {
broadcast_queue::Mutation::Push(txid) => {
queue_insert_statement.execute(named_params! { ":txid": Impl(*txid) })?;
}
broadcast_queue::Mutation::Remove(txid) => {
queue_remove_statement.execute(named_params! { ":txid": Impl(*txid) })?;
}
}
}

self.local_chain.persist_to_sqlite(db_tx)?;
self.tx_graph.persist_to_sqlite(db_tx)?;
self.indexer.persist_to_sqlite(db_tx)?;
@@ -311,3 +364,12 @@ impl From<keychain_txout::ChangeSet> for ChangeSet {
}
}
}

impl From<broadcast_queue::ChangeSet> for ChangeSet {
fn from(broadcast_queue: broadcast_queue::ChangeSet) -> Self {
Self {
broadcast_queue,
..Default::default()
}
}
}