From 75bc8925328557cde2956acf24c20bfca7a528d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Mon, 2 Jun 2025 17:51:01 +1000 Subject: [PATCH 1/5] feat!: Introduce `BroadcastQueue` --- wallet/src/types.rs | 2 + wallet/src/wallet/broadcast_queue.rs | 248 +++++++++++++++++++++++++++ wallet/src/wallet/changeset.rs | 64 ++++++- wallet/src/wallet/coin_selection.rs | 3 + wallet/src/wallet/mod.rs | 235 +++++++++++++++++++++---- wallet/src/wallet/tx_builder.rs | 4 +- 6 files changed, 525 insertions(+), 31 deletions(-) create mode 100644 wallet/src/wallet/broadcast_queue.rs diff --git a/wallet/src/types.rs b/wallet/src/types.rs index c15be6c5..f0cb85bf 100644 --- a/wallet/src/types.rs +++ b/wallet/src/types.rs @@ -73,6 +73,8 @@ pub struct LocalOutput { pub derivation_index: u32, /// The position of the output in the blockchain. pub chain_position: ChainPosition, + /// Whether this output exists in a transaction that is yet to be broadcasted. + pub needs_broadcast: bool, } /// A [`Utxo`] with its `satisfaction_weight`. diff --git a/wallet/src/wallet/broadcast_queue.rs b/wallet/src/wallet/broadcast_queue.rs new file mode 100644 index 00000000..2905c8be --- /dev/null +++ b/wallet/src/wallet/broadcast_queue.rs @@ -0,0 +1,248 @@ +//! Unbroadcasted transaction queue. + +use alloc::vec::Vec; +use chain::tx_graph; +use chain::Anchor; +use chain::TxGraph; + +use crate::collections::HashSet; +use crate::collections::VecDeque; + +use bitcoin::Txid; +use chain::Merge; + +/// An ordered unbroadcasted list. +/// +/// It is ordered in case of RBF txs. +#[derive(Debug, Clone, Default)] +pub struct BroadcastQueue { + queue: VecDeque, + + /// Enforces that we do not have duplicates in `queue`. + dedup: HashSet, +} + +/// Represents a single mutation to [`BroadcastQueue`]. +#[derive(Debug, Clone, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum Mutation { + /// A push to the back of the queue. + Push(Txid), + /// A removal from the queue. + Remove(Txid), +} + +/// A list of mutations made to [`BroadcastQueue`]. +#[must_use] +#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)] +pub struct ChangeSet { + /// Mutations. + pub mutations: Vec, +} + +impl Merge for ChangeSet { + fn merge(&mut self, other: Self) { + self.mutations.extend(other.mutations); + } + + fn is_empty(&self) -> bool { + self.mutations.is_empty() + } +} + +impl BroadcastQueue { + /// Construct [`Unbroadcasted`] from the given `changeset`. + pub fn from_changeset(changeset: ChangeSet) -> Self { + let mut out = BroadcastQueue::default(); + out.apply_changeset(changeset); + out + } + + /// Apply the given `changeset`. + pub fn apply_changeset(&mut self, changeset: ChangeSet) { + for mutation in changeset.mutations { + match mutation { + Mutation::Push(txid) => self._push(txid), + Mutation::Remove(txid) => self._remove(txid), + }; + } + } + + /// Whether the `txid` exists in the queue. + pub fn contains(&self, txid: Txid) -> bool { + self.dedup.contains(&txid) + } + + /// Push a `txid` to the queue if it does not already exist. + /// + /// # Warning + /// + /// This does not get rid of conflicting transactions already in the queue. + pub fn push(&mut self, txid: Txid) -> ChangeSet { + let mut changeset = ChangeSet::default(); + if self._push(txid) { + changeset.mutations.push(Mutation::Push(txid)); + } + changeset + } + fn _push(&mut self, txid: Txid) -> bool { + if self.dedup.insert(txid) { + self.queue.push_back(txid); + return true; + } + false + } + + /// Push a `txid` to the broadcast queue (if it does not exist already) and displaces all + /// coflicting txids in the queue. + pub fn push_and_displace_conflicts(&mut self, tx_graph: &TxGraph, txid: Txid) -> ChangeSet + where + A: Anchor, + { + let mut changeset = ChangeSet::default(); + + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => { + debug_assert!( + !self.dedup.contains(&txid), + "Cannot have txid in queue which has no corresponding tx in graph" + ); + return changeset; + } + }; + + if self._push(txid) { + changeset.mutations.push(Mutation::Push(txid)); + + for txid in tx_graph.walk_conflicts(&tx, |_, conflict_txid| Some(conflict_txid)) { + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + } + } + + changeset + } + + /// Returns the next `txid` of the queue to broadcast which has no dependencies to other + /// transactions in the queue. + pub fn next_to_broadcast(&self, tx_graph: &TxGraph) -> Option + where + A: Anchor, + { + self.queue.iter().copied().find(|&txid| { + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => return false, + }; + if tx + .input + .iter() + .any(|txin| self.dedup.contains(&txin.previous_output.txid)) + { + return false; + } + true + }) + } + + /// Returns unbroadcasted dependencies of the given `txid`. + /// + /// The returned `Vec` is in broadcast order. + pub fn unbroadcasted_dependencies(&self, tx_graph: &TxGraph, txid: Txid) -> Vec + where + A: Anchor, + { + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => return Vec::new(), + }; + let mut txs = tx_graph + .walk_ancestors(tx, |_depth, ancestor_tx| { + let ancestor_txid = ancestor_tx.compute_txid(); + if self.dedup.contains(&ancestor_txid) { + Some(ancestor_txid) + } else { + None + } + }) + .collect::>(); + txs.reverse(); + txs + } + + /// Untracks and removes a transaction from the broadcast queue. + /// + /// Transactions are automatically removed from the queue upon successful broadcast, so calling + /// this method directly is typically not required. + pub fn remove(&mut self, txid: Txid) -> ChangeSet { + let mut changeset = ChangeSet::default(); + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + changeset + } + fn _remove(&mut self, txid: Txid) -> bool { + if self.dedup.remove(&txid) { + let i = (0..self.queue.len()) + .zip(self.queue.iter().copied()) + .find_map(|(i, queue_txid)| if queue_txid == txid { Some(i) } else { None }) + .expect("must exist in queue to exist in `queue`"); + let _removed = self.queue.remove(i); + debug_assert_eq!(_removed, Some(txid)); + return true; + } + false + } + + /// Untracks and removes a transaction and it's descendants from the broadcast queue. + pub fn remove_and_displace_dependants( + &mut self, + tx_graph: &TxGraph, + txid: Txid, + ) -> ChangeSet + where + A: Anchor, + { + let mut changeset = ChangeSet::default(); + + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + for txid in tx_graph.walk_descendants(txid, |_depth, txid| Some(txid)) { + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + } + } + changeset + } + + /// Untrack transactions that are given anchors and/or mempool timestamps. + pub fn filter_from_graph_changeset( + &mut self, + graph_changeset: &tx_graph::ChangeSet, + ) -> ChangeSet { + let mut changeset = ChangeSet::default(); + let s_txids = graph_changeset.last_seen.keys().copied(); + let a_txids = graph_changeset.anchors.iter().map(|(_, txid)| *txid); + let e_txids = graph_changeset.last_evicted.keys().copied(); + for txid in s_txids.chain(a_txids).chain(e_txids) { + changeset.merge(self.remove(txid)); + } + changeset + } + + /// Txids ordered by precedence. + /// + /// Transactions with greater precedence will appear later in this list. + pub fn txids(&self) -> impl ExactSizeIterator + '_ { + self.queue.iter().copied() + } + + /// Initial changeset. + pub fn initial_changeset(&self) -> ChangeSet { + ChangeSet { + mutations: self.queue.iter().copied().map(Mutation::Push).collect(), + } + } +} diff --git a/wallet/src/wallet/changeset.rs b/wallet/src/wallet/changeset.rs index ebfdb9fb..254a85a8 100644 --- a/wallet/src/wallet/changeset.rs +++ b/wallet/src/wallet/changeset.rs @@ -1,9 +1,12 @@ use bdk_chain::{ indexed_tx_graph, keychain_txout, local_chain, tx_graph, ConfirmationBlockTime, Merge, }; +use bitcoin::Txid; use miniscript::{Descriptor, DescriptorPublicKey}; use serde::{Deserialize, Serialize}; +use super::broadcast_queue; + type IndexedTxGraphChangeSet = indexed_tx_graph::ChangeSet; @@ -114,6 +117,9 @@ pub struct ChangeSet { pub tx_graph: tx_graph::ChangeSet, /// Changes to [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex). pub indexer: keychain_txout::ChangeSet, + /// Changes to [`BroadcastQueue`](broadcast_queue::BroadcastQueue). + #[serde(default)] + pub broadcast_queue: broadcast_queue::ChangeSet, } impl Merge for ChangeSet { @@ -145,6 +151,7 @@ impl Merge for ChangeSet { Merge::merge(&mut self.local_chain, other.local_chain); Merge::merge(&mut self.tx_graph, other.tx_graph); Merge::merge(&mut self.indexer, other.indexer); + Merge::merge(&mut self.broadcast_queue, other.broadcast_queue); } fn is_empty(&self) -> bool { @@ -154,6 +161,7 @@ impl Merge for ChangeSet { && self.local_chain.is_empty() && self.tx_graph.is_empty() && self.indexer.is_empty() + && self.broadcast_queue.is_empty() } } @@ -163,6 +171,8 @@ impl ChangeSet { pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet"; /// Name of table to store wallet descriptors and network. pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet"; + /// Name of table to store broadcast queue txids. + pub const WALLET_BROADCAST_QUEUE_TABLE_NAME: &'static str = "bdk_wallet_broadcast_queue"; /// Get v0 sqlite [ChangeSet] schema pub fn schema_v0() -> alloc::string::String { @@ -177,12 +187,23 @@ impl ChangeSet { ) } + /// Get v1 sqlite [`ChangeSet`] schema. + pub fn schema_v1() -> alloc::string::String { + format!( + "CREATE TABLE {} ( \ + id INTEGER PRIMARY KEY AUTOINCREMENT, + txid TEXT NOT NULL UNIQUE \ + ) STRICT;", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME, + ) + } + /// Initialize sqlite tables for wallet tables. pub fn init_sqlite_tables(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result<()> { crate::rusqlite_impl::migrate_schema( db_tx, Self::WALLET_SCHEMA_NAME, - &[&Self::schema_v0()], + &[&Self::schema_v0(), &Self::schema_v1()], )?; bdk_chain::local_chain::ChangeSet::init_sqlite_tables(db_tx)?; @@ -220,6 +241,19 @@ impl ChangeSet { changeset.network = network.map(Impl::into_inner); } + let mut queue_statement = db_tx.prepare(&format!( + "SELECT txid FROM {} ORDER BY id ASC", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME + ))?; + let row_iter = queue_statement.query_map([], |row| row.get::<_, Impl>("txid"))?; + for row in row_iter { + let Impl(txid) = row?; + changeset + .broadcast_queue + .mutations + .push(broadcast_queue::Mutation::Push(txid)); + } + changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?; changeset.tx_graph = tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?; changeset.indexer = keychain_txout::ChangeSet::from_sqlite(db_tx)?; @@ -268,6 +302,25 @@ impl ChangeSet { })?; } + let mut queue_insert_statement = db_tx.prepare_cached(&format!( + "INSERT OR IGNORE INTO {}(txid) VALUES(:txid)", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME + ))?; + let mut queue_remove_statement = db_tx.prepare_cached(&format!( + "DELETE FROM {} WHERE txid=:txid", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME + ))?; + for mutation in &self.broadcast_queue.mutations { + match mutation { + broadcast_queue::Mutation::Push(txid) => { + queue_insert_statement.execute(named_params! { ":txid": Impl(*txid) })?; + } + broadcast_queue::Mutation::Remove(txid) => { + queue_remove_statement.execute(named_params! { ":txid": Impl(*txid) })?; + } + } + } + self.local_chain.persist_to_sqlite(db_tx)?; self.tx_graph.persist_to_sqlite(db_tx)?; self.indexer.persist_to_sqlite(db_tx)?; @@ -311,3 +364,12 @@ impl From for ChangeSet { } } } + +impl From for ChangeSet { + fn from(broadcast_queue: broadcast_queue::ChangeSet) -> Self { + Self { + broadcast_queue, + ..Default::default() + } + } +} diff --git a/wallet/src/wallet/coin_selection.rs b/wallet/src/wallet/coin_selection.rs index 57ed5329..a2fc4fe7 100644 --- a/wallet/src/wallet/coin_selection.rs +++ b/wallet/src/wallet/coin_selection.rs @@ -801,6 +801,7 @@ mod test { is_spent: false, derivation_index: 42, chain_position, + needs_broadcast: false, }), } } @@ -856,6 +857,7 @@ mod test { last_seen: Some(1), } }, + needs_broadcast: false, }), }); } @@ -883,6 +885,7 @@ mod test { first_seen: Some(1), last_seen: Some(1), }, + needs_broadcast: false, }), }) .collect() diff --git a/wallet/src/wallet/mod.rs b/wallet/src/wallet/mod.rs index be3773d3..c9aae4fa 100644 --- a/wallet/src/wallet/mod.rs +++ b/wallet/src/wallet/mod.rs @@ -22,7 +22,6 @@ use alloc::{ use core::{cmp::Ordering, fmt, mem, ops::Deref}; use bdk_chain::{ - indexed_tx_graph, indexer::keychain_txout::KeychainTxOutIndex, local_chain::{ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, spk_client::{ @@ -49,6 +48,7 @@ use miniscript::{ }; use rand_core::RngCore; +pub mod broadcast_queue; mod changeset; pub mod coin_selection; pub mod error; @@ -77,6 +77,7 @@ use crate::wallet::{ // re-exports pub use bdk_chain::Balance; +pub use broadcast_queue::BroadcastQueue; pub use changeset::ChangeSet; pub use params::*; pub use persisted::*; @@ -106,6 +107,7 @@ pub struct Wallet { change_signers: Arc, chain: LocalChain, indexed_graph: IndexedTxGraph>, + broadcast_queue: BroadcastQueue, stage: ChangeSet, network: Network, secp: SecpCtx, @@ -450,12 +452,16 @@ impl Wallet { let indexed_graph = IndexedTxGraph::new(index); let indexed_graph_changeset = indexed_graph.initial_changeset(); + let broadcast_queue = BroadcastQueue::default(); + let broadcast_queue_changeset = broadcast_queue.initial_changeset(); + let stage = ChangeSet { descriptor, change_descriptor, local_chain: chain_changeset, tx_graph: indexed_graph_changeset.tx_graph, indexer: indexed_graph_changeset.indexer, + broadcast_queue: broadcast_queue_changeset, network: Some(network), }; @@ -465,6 +471,7 @@ impl Wallet { network, chain, indexed_graph, + broadcast_queue, stage, secp, }) @@ -654,6 +661,8 @@ impl Wallet { indexed_graph.apply_changeset(changeset.indexer.into()); indexed_graph.apply_changeset(changeset.tx_graph.into()); + let broadcast_queue = BroadcastQueue::from_changeset(changeset.broadcast_queue); + let stage = ChangeSet::default(); Ok(Some(Wallet { @@ -661,6 +670,7 @@ impl Wallet { change_signers, chain, indexed_graph, + broadcast_queue, stage, network, secp, @@ -733,14 +743,14 @@ impl Wallet { /// ``` pub fn reveal_next_address(&mut self, keychain: KeychainKind) -> AddressInfo { let keychain = self.map_keychain(keychain); - let index = &mut self.indexed_graph.index; - let stage = &mut self.stage; - let ((index, spk), index_changeset) = index + let ((index, spk), index_changeset) = self + .indexed_graph + .index .reveal_next_spk(keychain) .expect("keychain must exist"); - stage.merge(index_changeset.into()); + self.stage_changes(index_changeset); AddressInfo { index, @@ -771,7 +781,7 @@ impl Wallet { .reveal_to_target(keychain, index) .expect("keychain must exist"); - self.stage.merge(index_changeset.into()); + self.stage_changes(index_changeset); spks.into_iter().map(move |(index, spk)| AddressInfo { index, @@ -797,8 +807,7 @@ impl Wallet { .next_unused_spk(keychain) .expect("keychain must exist"); - self.stage - .merge(indexed_tx_graph::ChangeSet::from(index_changeset).into()); + self.stage_changes(index_changeset); AddressInfo { index, @@ -859,17 +868,32 @@ impl Wallet { self.indexed_graph.index.index_of_spk(spk).cloned() } - /// Return the list of unspent outputs of this wallet + /// Return the list of unspent outputs of this wallet. pub fn list_unspent(&self) -> impl Iterator + '_ { + self._list_unspent(CanonicalizationParams::default()) + } + + /// List unspent outputs (UTXOs) of this wallet assuming that unbroadcasted transactions are + /// canonical. + /// + /// TODO: Better docs here. + pub fn list_unspent_with_unbroadcasted(&self) -> impl Iterator + '_ { + self._list_unspent(self.include_unbroadcasted_canonicalization_params()) + } + + fn _list_unspent( + &self, + params: CanonicalizationParams, + ) -> impl Iterator + '_ { self.indexed_graph .graph() .filter_chain_unspents( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + params, self.indexed_graph.index.outpoints().iter().cloned(), ) - .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) + .map(|((k, i), full_txo)| new_local_utxo(&self.broadcast_queue, k, i, full_txo)) } /// Get the [`TxDetails`] of a wallet transaction. @@ -911,7 +935,7 @@ impl Wallet { CanonicalizationParams::default(), self.indexed_graph.index.outpoints().iter().cloned(), ) - .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) + .map(|((k, i), full_txo)| new_local_utxo(&self.broadcast_queue, k, i, full_txo)) } /// Get all the checkpoints the wallet is currently storing indexed by height. @@ -955,17 +979,29 @@ impl Wallet { /// Returns the utxo owned by this wallet corresponding to `outpoint` if it exists in the /// wallet's database. - pub fn get_utxo(&self, op: OutPoint) -> Option { + pub fn get_utxo(&self, outpoint: OutPoint) -> Option { + self._get_utxo(CanonicalizationParams::default(), outpoint) + } + + /// Returns the utxo owned by this wallet corresponding to `outpoint`. + pub fn get_utxo_include_unbroadcasted(&self, outpoint: OutPoint) -> Option { + self._get_utxo( + self.include_unbroadcasted_canonicalization_params(), + outpoint, + ) + } + + fn _get_utxo(&self, params: CanonicalizationParams, op: OutPoint) -> Option { let ((keychain, index), _) = self.indexed_graph.index.txout(op)?; self.indexed_graph .graph() .filter_chain_unspents( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + params, core::iter::once(((), op)), ) - .map(|(_, full_txo)| new_local_utxo(keychain, index, full_txo)) + .map(|(_, full_txo)| new_local_utxo(&self.broadcast_queue, keychain, index, full_txo)) .next() } @@ -988,7 +1024,7 @@ impl Wallet { /// [`list_output`]: Self::list_output pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) { let additions = self.indexed_graph.insert_txout(outpoint, txout); - self.stage.merge(additions.into()); + self.stage_changes(additions); } /// Calculates the fee of a given transaction. Returns [`Amount::ZERO`] if `tx` is a coinbase @@ -1149,9 +1185,13 @@ impl Wallet { /// Iterate over relevant and canonical transactions in the wallet. /// - /// A transaction is relevant when it spends from or spends to at least one tracked output. A - /// transaction is canonical when it is confirmed in the best chain, or does not conflict - /// with any transaction confirmed in the best chain. + /// A transaction is relevant when it spends from or spends to at least one tracked output. + /// + /// A transaction is canonical when either: + /// * It is confirmed in the best chain and does not conflict with a transaction in the + /// broadcast queue. + /// * It seen in the mempool, is not evicted and does not conflict with a transaction that is + /// confirmed or seen later in the mempool. /// /// To iterate over all transactions, including those that are irrelevant and not canonical, use /// [`TxGraph::full_txs`]. @@ -1198,10 +1238,19 @@ impl Wallet { /// Return the balance, separated into available, trusted-pending, untrusted-pending and /// immature values. pub fn balance(&self) -> Balance { + self._balance(CanonicalizationParams::default()) + } + + /// Return the balance that includes unbroadcasted transactions. + pub fn balance_with_unbroadcasted(&self) -> Balance { + self._balance(self.include_unbroadcasted_canonicalization_params()) + } + + fn _balance(&self, params: CanonicalizationParams) -> Balance { self.indexed_graph.graph().balance( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + params, self.indexed_graph.index.outpoints().iter().cloned(), |&(k, _), _| k == KeychainKind::Internal, ) @@ -1624,7 +1673,7 @@ impl Wallet { if let Some((_, index_changeset)) = self.indexed_graph.index.reveal_to_target(keychain, index) { - self.stage.merge(index_changeset.into()); + self.stage_changes(index_changeset); self.mark_used(keychain, index); } } @@ -1681,7 +1730,11 @@ impl Wallet { let txout_index = &self.indexed_graph.index; let chain_tip = self.chain.tip().block_id(); let chain_positions = graph - .list_canonical_txs(&self.chain, chain_tip, CanonicalizationParams::default()) + .list_canonical_txs( + &self.chain, + chain_tip, + self.include_unbroadcasted_canonicalization_params(), + ) .map(|canon_tx| (canon_tx.tx_node.txid, canon_tx.chain_position)) .collect::>(); @@ -1754,6 +1807,9 @@ impl Wallet { is_spent: true, derivation_index, chain_position, + needs_broadcast: self + .broadcast_queue + .contains(txin.previous_output.txid), }), }, ), @@ -1954,7 +2010,11 @@ impl Wallet { let confirmation_heights = self .indexed_graph .graph() - .list_canonical_txs(&self.chain, chain_tip, CanonicalizationParams::default()) + .list_canonical_txs( + &self.chain, + chain_tip, + self.include_unbroadcasted_canonicalization_params(), + ) .filter(|canon_tx| prev_txids.contains(&canon_tx.tx_node.txid)) // This is for a small performance gain. Although `.filter` filters out excess txs, it // will still consume the internal `CanonicalIter` entirely. Having a `.take` here @@ -2107,14 +2167,14 @@ impl Wallet { .filter_chain_unspents( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + self.include_unbroadcasted_canonicalization_params(), self.indexed_graph.index.outpoints().iter().cloned(), ) // only create LocalOutput if UTxO is mature .filter_map(move |((k, i), full_txo)| { full_txo .is_mature(current_height) - .then(|| new_local_utxo(k, i, full_txo)) + .then(|| new_local_utxo(&self.broadcast_queue, k, i, full_txo)) }) // only process UTxOs not selected manually, they will be considered later in the // chain NOTE: this avoid UTxOs in both required and optional list @@ -2335,7 +2395,7 @@ impl Wallet { .reveal_to_target_multi(&update.last_active_indices); changeset.merge(index_changeset.into()); changeset.merge(self.indexed_graph.apply_update(update.tx_update).into()); - self.stage.merge(changeset); + self.stage_changes(changeset); Ok(()) } @@ -2431,7 +2491,7 @@ impl Wallet { .apply_block_relevant(block, height) .into(), ); - self.stage.merge(changeset); + self.stage_changes(changeset); Ok(()) } @@ -2454,7 +2514,7 @@ impl Wallet { let indexed_graph_changeset = self .indexed_graph .batch_insert_relevant_unconfirmed(unconfirmed_txs); - self.stage.merge(indexed_graph_changeset.into()); + self.stage_changes(indexed_graph_changeset); } /// Apply evictions of the given txids with their associated timestamps. @@ -2472,7 +2532,7 @@ impl Wallet { .list_canonical_txs( chain, chain.tip().block_id(), - CanonicalizationParams::default(), + self.include_unbroadcasted_canonicalization_params(), ) .map(|c| c.tx_node.txid) .collect(); @@ -2499,6 +2559,121 @@ impl Wallet { } } +/// Methods to interact with the broadcast queue. +impl Wallet { + pub(crate) fn stage_changes>(&mut self, changeset: C) { + let changeset: ChangeSet = changeset.into(); + if !changeset.tx_graph.is_empty() { + self.stage.merge( + self.broadcast_queue + .filter_from_graph_changeset(&changeset.tx_graph) + .into(), + ); + } + self.stage.merge(changeset); + } + + /// [`CanonicalizationParams`] which includes transactions in the broadcast queue. + pub fn include_unbroadcasted_canonicalization_params(&self) -> CanonicalizationParams { + CanonicalizationParams { + assume_canonical: self.broadcast_queue.txids().collect(), + } + } + + /// Add a transaction to the broadcast queue transaction for broadcast. + /// + /// Unsigned transactions can be inserted and later reinserted when signed. + /// + /// Conflicts of the inserted transaction will be removed from the broadcast queue. + pub fn add_tx_to_broadcast_queue>>(&mut self, tx: T) { + let tx: Arc = tx.into(); + let txid = tx.compute_txid(); + + self.stage.merge(self.indexed_graph.insert_tx(tx).into()); + + // TODO: Figure out whether we should displace conflicting txs in the queue on insertion. + // let queue_changeset = self.broadcast_queue.push(txid); + let queue_changeset = self + .broadcast_queue + .push_and_displace_conflicts(self.indexed_graph.graph(), txid); + + self.stage.merge(queue_changeset.into()); + } + + /// Remove transaction from the broadcast queue. + /// + /// This will also remove all descendants of this transaction in the broadcast queue. + /// + /// Returns `true` if the transaction is successfully removed. + pub fn remove_tx_from_broadcast_queue(&mut self, txid: Txid) -> bool { + // // TODO: Figure out whether we should displace descendants of removed txs as well. + let queue_changeset = self + .broadcast_queue + .remove_and_displace_dependants(self.indexed_graph.graph(), txid); + // let queue_changeset = self.broadcast_queue.remove(txid); + let is_removed = !queue_changeset.is_empty(); + self.stage.merge(queue_changeset.into()); + is_removed + } + + /// Broadcast queue. + /// + /// Elements are in broadcast order. + pub fn broadcast_queue(&self) -> impl Iterator> + '_ { + self.broadcast_queue.txids().filter_map(|txid| { + let tx_opt = self.indexed_graph.graph().get_tx(txid); + debug_assert!( + tx_opt.is_some(), + "A txid in the broadcast queue must exist in the graph" + ); + tx_opt + }) + } + + /// Number of transactions in the broadcast queue. + pub fn broadcast_queue_len(&self) -> usize { + self.broadcast_queue.txids().len() + } + + /// Mark a transaction as successfully broadcasted. + pub fn mark_tx_as_broadcasted_at(&mut self, txid: Txid, at: u64) { + let mut update = TxUpdate::default(); + update.seen_ats.insert((txid, at)); + let changeset = self.indexed_graph.apply_update(update); + self.stage_changes(changeset); + } + + /// Get `tx` descendants that are in the broadcast queue. + /// + /// This can be used to check which transactions will no longer be broadcastable if `tx` is + /// becomes unavailable. + pub fn descendants_in_broadcast_queue( + &self, + tx: &Transaction, + ) -> impl Iterator> + '_ { + self.indexed_graph + .graph() + .walk_descendants(tx.compute_txid(), |_, txid| Some(txid)) + .filter(|&txid| self.broadcast_queue.contains(txid)) + .filter_map(|txid| self.indexed_graph.graph().get_tx(txid)) + } + + /// Get `tx` conflicts that are in the broadcast queue. + /// + /// This can be used to check which transactions will be removed from the broadcast queue if + /// `tx` is inserted. + pub fn conflicts_in_broadcast_queue<'t>( + &'t self, + tx: &'t Transaction, + ) -> impl Iterator> + 't { + self.indexed_graph + .graph() + .walk_conflicts(tx, |_, txid| Some(txid)) + .filter(|&txid| self.broadcast_queue.contains(txid)) + .filter_map(|txid| self.indexed_graph.graph().get_tx(txid)) + } +} + /// Methods to construct sync/full-scan requests for spk-based chain sources. impl Wallet { /// Create a partial [`SyncRequest`] for all revealed spks at `start_time`. @@ -2610,6 +2785,7 @@ where } fn new_local_utxo( + broadcast_queue: &BroadcastQueue, keychain: KeychainKind, derivation_index: u32, full_txo: FullTxOut, @@ -2621,6 +2797,7 @@ fn new_local_utxo( chain_position: full_txo.chain_position, keychain, derivation_index, + needs_broadcast: broadcast_queue.contains(full_txo.outpoint.txid), } } diff --git a/wallet/src/wallet/tx_builder.rs b/wallet/src/wallet/tx_builder.rs index 949cb792..ca49c9e0 100644 --- a/wallet/src/wallet/tx_builder.rs +++ b/wallet/src/wallet/tx_builder.rs @@ -281,7 +281,7 @@ impl<'a, Cs> TxBuilder<'a, Cs> { .iter() .map(|outpoint| { self.wallet - .get_utxo(*outpoint) + .get_utxo_include_unbroadcasted(*outpoint) .ok_or(AddUtxoError::UnknownUtxo(*outpoint)) .map(|output| { ( @@ -1029,6 +1029,7 @@ mod test { last_seen: Some(1), }, derivation_index: 0, + needs_broadcast: false, }, LocalOutput { outpoint: OutPoint { @@ -1049,6 +1050,7 @@ mod test { transitively: None, }, derivation_index: 1, + needs_broadcast: false, }, ] } From a65eabe7e58e71048c6c79426ce97c3cf6cb897f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Fri, 6 Jun 2025 22:16:31 +1000 Subject: [PATCH 2/5] feat: Add `TxBuilder::exclude_unbroadcasted` If the caller wishes to avoid spending unbroadcasted UTXOs without double-spending any of the unbroadcasted transactions. --- wallet/src/wallet/tx_builder.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/wallet/src/wallet/tx_builder.rs b/wallet/src/wallet/tx_builder.rs index ca49c9e0..c5daddfc 100644 --- a/wallet/src/wallet/tx_builder.rs +++ b/wallet/src/wallet/tx_builder.rs @@ -459,6 +459,20 @@ impl<'a, Cs> TxBuilder<'a, Cs> { self } + /// Add unbroadcasted transactions to the unspendable list. + pub fn exclude_unbroadcasted(&mut self) -> &mut Self { + let unbroadcasted_ops = self.wallet.broadcast_queue().flat_map(|tx| { + let txid = tx.compute_txid(); + (0_u32..) + .take(tx.output.len()) + .map(move |vout| OutPoint::new(txid, vout)) + }); + for op in unbroadcasted_ops { + self.params.unspendable.insert(op); + } + self + } + /// Sign with a specific sig hash /// /// **Use this option very carefully** From 68c0505cf904c870e82aed2eb3206a540b1eee62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 26 Jun 2025 03:42:02 +0000 Subject: [PATCH 3/5] WIP: `IntentTracker` --- wallet/src/wallet/broadcast_queue.rs | 248 --------------- wallet/src/wallet/changeset.rs | 26 +- wallet/src/wallet/intent_tracker.rs | 432 ++++++++++++++++++++++++++ wallet/src/wallet/mod.rs | 444 ++++++++++++++++++++------- wallet/src/wallet/tx_builder.rs | 43 ++- wallet/tests/wallet.rs | 4 +- 6 files changed, 800 insertions(+), 397 deletions(-) delete mode 100644 wallet/src/wallet/broadcast_queue.rs create mode 100644 wallet/src/wallet/intent_tracker.rs diff --git a/wallet/src/wallet/broadcast_queue.rs b/wallet/src/wallet/broadcast_queue.rs deleted file mode 100644 index 2905c8be..00000000 --- a/wallet/src/wallet/broadcast_queue.rs +++ /dev/null @@ -1,248 +0,0 @@ -//! Unbroadcasted transaction queue. - -use alloc::vec::Vec; -use chain::tx_graph; -use chain::Anchor; -use chain::TxGraph; - -use crate::collections::HashSet; -use crate::collections::VecDeque; - -use bitcoin::Txid; -use chain::Merge; - -/// An ordered unbroadcasted list. -/// -/// It is ordered in case of RBF txs. -#[derive(Debug, Clone, Default)] -pub struct BroadcastQueue { - queue: VecDeque, - - /// Enforces that we do not have duplicates in `queue`. - dedup: HashSet, -} - -/// Represents a single mutation to [`BroadcastQueue`]. -#[derive(Debug, Clone, PartialEq, serde::Deserialize, serde::Serialize)] -pub enum Mutation { - /// A push to the back of the queue. - Push(Txid), - /// A removal from the queue. - Remove(Txid), -} - -/// A list of mutations made to [`BroadcastQueue`]. -#[must_use] -#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)] -pub struct ChangeSet { - /// Mutations. - pub mutations: Vec, -} - -impl Merge for ChangeSet { - fn merge(&mut self, other: Self) { - self.mutations.extend(other.mutations); - } - - fn is_empty(&self) -> bool { - self.mutations.is_empty() - } -} - -impl BroadcastQueue { - /// Construct [`Unbroadcasted`] from the given `changeset`. - pub fn from_changeset(changeset: ChangeSet) -> Self { - let mut out = BroadcastQueue::default(); - out.apply_changeset(changeset); - out - } - - /// Apply the given `changeset`. - pub fn apply_changeset(&mut self, changeset: ChangeSet) { - for mutation in changeset.mutations { - match mutation { - Mutation::Push(txid) => self._push(txid), - Mutation::Remove(txid) => self._remove(txid), - }; - } - } - - /// Whether the `txid` exists in the queue. - pub fn contains(&self, txid: Txid) -> bool { - self.dedup.contains(&txid) - } - - /// Push a `txid` to the queue if it does not already exist. - /// - /// # Warning - /// - /// This does not get rid of conflicting transactions already in the queue. - pub fn push(&mut self, txid: Txid) -> ChangeSet { - let mut changeset = ChangeSet::default(); - if self._push(txid) { - changeset.mutations.push(Mutation::Push(txid)); - } - changeset - } - fn _push(&mut self, txid: Txid) -> bool { - if self.dedup.insert(txid) { - self.queue.push_back(txid); - return true; - } - false - } - - /// Push a `txid` to the broadcast queue (if it does not exist already) and displaces all - /// coflicting txids in the queue. - pub fn push_and_displace_conflicts(&mut self, tx_graph: &TxGraph, txid: Txid) -> ChangeSet - where - A: Anchor, - { - let mut changeset = ChangeSet::default(); - - let tx = match tx_graph.get_tx(txid) { - Some(tx) => tx, - None => { - debug_assert!( - !self.dedup.contains(&txid), - "Cannot have txid in queue which has no corresponding tx in graph" - ); - return changeset; - } - }; - - if self._push(txid) { - changeset.mutations.push(Mutation::Push(txid)); - - for txid in tx_graph.walk_conflicts(&tx, |_, conflict_txid| Some(conflict_txid)) { - if self._remove(txid) { - changeset.mutations.push(Mutation::Remove(txid)); - } - } - } - - changeset - } - - /// Returns the next `txid` of the queue to broadcast which has no dependencies to other - /// transactions in the queue. - pub fn next_to_broadcast(&self, tx_graph: &TxGraph) -> Option - where - A: Anchor, - { - self.queue.iter().copied().find(|&txid| { - let tx = match tx_graph.get_tx(txid) { - Some(tx) => tx, - None => return false, - }; - if tx - .input - .iter() - .any(|txin| self.dedup.contains(&txin.previous_output.txid)) - { - return false; - } - true - }) - } - - /// Returns unbroadcasted dependencies of the given `txid`. - /// - /// The returned `Vec` is in broadcast order. - pub fn unbroadcasted_dependencies(&self, tx_graph: &TxGraph, txid: Txid) -> Vec - where - A: Anchor, - { - let tx = match tx_graph.get_tx(txid) { - Some(tx) => tx, - None => return Vec::new(), - }; - let mut txs = tx_graph - .walk_ancestors(tx, |_depth, ancestor_tx| { - let ancestor_txid = ancestor_tx.compute_txid(); - if self.dedup.contains(&ancestor_txid) { - Some(ancestor_txid) - } else { - None - } - }) - .collect::>(); - txs.reverse(); - txs - } - - /// Untracks and removes a transaction from the broadcast queue. - /// - /// Transactions are automatically removed from the queue upon successful broadcast, so calling - /// this method directly is typically not required. - pub fn remove(&mut self, txid: Txid) -> ChangeSet { - let mut changeset = ChangeSet::default(); - if self._remove(txid) { - changeset.mutations.push(Mutation::Remove(txid)); - } - changeset - } - fn _remove(&mut self, txid: Txid) -> bool { - if self.dedup.remove(&txid) { - let i = (0..self.queue.len()) - .zip(self.queue.iter().copied()) - .find_map(|(i, queue_txid)| if queue_txid == txid { Some(i) } else { None }) - .expect("must exist in queue to exist in `queue`"); - let _removed = self.queue.remove(i); - debug_assert_eq!(_removed, Some(txid)); - return true; - } - false - } - - /// Untracks and removes a transaction and it's descendants from the broadcast queue. - pub fn remove_and_displace_dependants( - &mut self, - tx_graph: &TxGraph, - txid: Txid, - ) -> ChangeSet - where - A: Anchor, - { - let mut changeset = ChangeSet::default(); - - if self._remove(txid) { - changeset.mutations.push(Mutation::Remove(txid)); - for txid in tx_graph.walk_descendants(txid, |_depth, txid| Some(txid)) { - if self._remove(txid) { - changeset.mutations.push(Mutation::Remove(txid)); - } - } - } - changeset - } - - /// Untrack transactions that are given anchors and/or mempool timestamps. - pub fn filter_from_graph_changeset( - &mut self, - graph_changeset: &tx_graph::ChangeSet, - ) -> ChangeSet { - let mut changeset = ChangeSet::default(); - let s_txids = graph_changeset.last_seen.keys().copied(); - let a_txids = graph_changeset.anchors.iter().map(|(_, txid)| *txid); - let e_txids = graph_changeset.last_evicted.keys().copied(); - for txid in s_txids.chain(a_txids).chain(e_txids) { - changeset.merge(self.remove(txid)); - } - changeset - } - - /// Txids ordered by precedence. - /// - /// Transactions with greater precedence will appear later in this list. - pub fn txids(&self) -> impl ExactSizeIterator + '_ { - self.queue.iter().copied() - } - - /// Initial changeset. - pub fn initial_changeset(&self) -> ChangeSet { - ChangeSet { - mutations: self.queue.iter().copied().map(Mutation::Push).collect(), - } - } -} diff --git a/wallet/src/wallet/changeset.rs b/wallet/src/wallet/changeset.rs index 254a85a8..857fc4ce 100644 --- a/wallet/src/wallet/changeset.rs +++ b/wallet/src/wallet/changeset.rs @@ -5,7 +5,7 @@ use bitcoin::Txid; use miniscript::{Descriptor, DescriptorPublicKey}; use serde::{Deserialize, Serialize}; -use super::broadcast_queue; +use super::intent_tracker; type IndexedTxGraphChangeSet = indexed_tx_graph::ChangeSet; @@ -117,9 +117,9 @@ pub struct ChangeSet { pub tx_graph: tx_graph::ChangeSet, /// Changes to [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex). pub indexer: keychain_txout::ChangeSet, - /// Changes to [`BroadcastQueue`](broadcast_queue::BroadcastQueue). + /// Changes to [`IntentTracker`](intent_tracker::IntentTracker). #[serde(default)] - pub broadcast_queue: broadcast_queue::ChangeSet, + pub intent_tracker: intent_tracker::ChangeSet, } impl Merge for ChangeSet { @@ -151,7 +151,7 @@ impl Merge for ChangeSet { Merge::merge(&mut self.local_chain, other.local_chain); Merge::merge(&mut self.tx_graph, other.tx_graph); Merge::merge(&mut self.indexer, other.indexer); - Merge::merge(&mut self.broadcast_queue, other.broadcast_queue); + Merge::merge(&mut self.intent_tracker, other.intent_tracker); } fn is_empty(&self) -> bool { @@ -161,7 +161,7 @@ impl Merge for ChangeSet { && self.local_chain.is_empty() && self.tx_graph.is_empty() && self.indexer.is_empty() - && self.broadcast_queue.is_empty() + && self.intent_tracker.is_empty() } } @@ -249,9 +249,9 @@ impl ChangeSet { for row in row_iter { let Impl(txid) = row?; changeset - .broadcast_queue + .intent_tracker .mutations - .push(broadcast_queue::Mutation::Push(txid)); + .push(intent_tracker::Mutation::Push(txid)); } changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?; @@ -310,12 +310,12 @@ impl ChangeSet { "DELETE FROM {} WHERE txid=:txid", Self::WALLET_BROADCAST_QUEUE_TABLE_NAME ))?; - for mutation in &self.broadcast_queue.mutations { + for mutation in &self.intent_tracker.mutations { match mutation { - broadcast_queue::Mutation::Push(txid) => { + intent_tracker::Mutation::Push(txid) => { queue_insert_statement.execute(named_params! { ":txid": Impl(*txid) })?; } - broadcast_queue::Mutation::Remove(txid) => { + intent_tracker::Mutation::Remove(txid) => { queue_remove_statement.execute(named_params! { ":txid": Impl(*txid) })?; } } @@ -365,10 +365,10 @@ impl From for ChangeSet { } } -impl From for ChangeSet { - fn from(broadcast_queue: broadcast_queue::ChangeSet) -> Self { +impl From for ChangeSet { + fn from(broadcast_queue: intent_tracker::ChangeSet) -> Self { Self { - broadcast_queue, + intent_tracker: broadcast_queue, ..Default::default() } } diff --git a/wallet/src/wallet/intent_tracker.rs b/wallet/src/wallet/intent_tracker.rs new file mode 100644 index 00000000..2b053a48 --- /dev/null +++ b/wallet/src/wallet/intent_tracker.rs @@ -0,0 +1,432 @@ +//! Unbroadcasted transaction queue. + +use alloc::sync::Arc; + +use alloc::vec::Vec; +use bitcoin::OutPoint; +use bitcoin::Transaction; +use chain::tx_graph; +use chain::Anchor; +use chain::CanonicalIter; +use chain::CanonicalReason; +use chain::ChainOracle; +use chain::ChainPosition; +use chain::TxGraph; + +use crate::collections::BTreeMap; +use crate::collections::HashMap; +use crate::collections::HashSet; +use crate::collections::VecDeque; + +use bdk_chain::bdk_core::Merge; +use bitcoin::Txid; + +#[derive(Debug)] +pub struct CanonicalView { + pub txs: HashMap, CanonicalReason)>, + pub spends: HashMap, +} + +impl Default for CanonicalView { + fn default() -> Self { + Self { + txs: HashMap::new(), + spends: HashMap::new(), + } + } +} + +impl CanonicalView { + pub fn from_iter(iter: CanonicalIter<'_, A, C>) -> Result + where + A: Anchor, + C: ChainOracle, + { + let mut view = Self::default(); + for r in iter { + let (txid, tx, reason) = r?; + for txin in &tx.input { + view.spends.insert(txin.previous_output, txid); + } + view.txs.insert(txid, (tx, reason)); + } + Ok(view) + } + + pub fn spend(&self, op: OutPoint) -> Option<(Txid, Arc, &CanonicalReason)> { + let txid = self.spends.get(&op)?; + let (tx, reason) = self.txs.get(txid)?; + Some((*txid, tx.clone(), reason)) + } +} + +/// Indicates whether a transaction was observed in the network. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NetworkSeen { + /// The transaction was previously seen (e.g., in mempool or on-chain). + Seen, + /// The transaction was never seen in the network. + NeverSeen, +} + +impl NetworkSeen { + /// Whether the transaction was once observed in the network. + pub fn was_seen(self) -> bool { + match self { + NetworkSeen::Seen => true, + NetworkSeen::NeverSeen => false, + } + } +} + +/// Represents an input (spend) that depends on non-canonical transaction ancestry. +/// +/// This struct models an input that attempts to spend an output via a transaction path +/// that is not part of the canonical network view (e.g., evicted, conflicted, or unknown). +#[derive(Debug, Clone, Default)] +pub struct UncanonicalSpendInfo { + /// Non-canonical ancestor transactions reachable from this input. + /// + /// Each entry maps an ancestor `Txid` to its observed status in the network. + /// - `Seen` indicates the transaction was previously seen but is no longer part of the + /// canonical view. + /// - `NeverSeen` indicates it was never observed (e.g., not yet broadcast). + pub uncanonical_ancestors: BTreeMap, + + /// Canonical transactions that conflict with this spend. + /// + /// This may be a direct conflict or a conflict with one of the `uncanonical_ancestors`. + /// The value is a tuple of (conflict distance, chain position). + /// + /// Descendants of conflicts are also conflicts. These transactions will have the same distance + /// value as their conflicting parent. + pub conflicting_txs: BTreeMap)>, +} + +/// Tracked and uncanonical transaction. +#[derive(Debug, Clone)] +pub struct UncanonicalTx { + /// Txid. + pub txid: Txid, + /// The uncanonical transaction. + pub tx: Arc, + /// Whether the transaction was one seen by the network. + pub network_seen: NetworkSeen, + /// Spends, identified by prevout, which are uncanonical. + pub uncanonical_spends: BTreeMap>, +} + +impl UncanonicalTx { + /// Whether the transaction was once observed in the network. + /// + /// Assuming that the chain-source does not lie, we can safely remove transactions that + pub fn was_seen(&self) -> bool { + self.network_seen.was_seen() + } + + /// A transaction is safe to untrack if it is network uncanonical and we can gurarantee that + /// it will not become canonical again given that there is no reorg of depth greater than + /// `assume_final_depth`. + /// + /// `assume_final_depth` of `0` means that unconfirmed (mempool) transactions are assumed to be + /// final. + /// + /// This may return false-negatives if the wallet is unaware of conflicts. I.e. if purely + /// syncing with Electrum (TODO: @evanlinjin Expand on this). + pub fn is_safe_to_untrack(&self, tip_height: u32, assume_final_depth: u32) -> bool { + self.conflicts().any(|(_, pos)| { + let depth = match pos { + ChainPosition::Confirmed { anchor, .. } => { + tip_height.saturating_sub(anchor.confirmation_height_upper_bound()) + } + ChainPosition::Unconfirmed { .. } => 0, + }; + depth >= assume_final_depth + }) + } + + /// Iterate over transactions that are currently canonical in the network, but would be rendered + /// uncanonical if this transaction were to become canonical. + /// + /// This includes both direct and indirect conflicts, such as any transaction that relies on + /// conflicting ancestry. + pub fn conflicts(&self) -> impl Iterator)> { + self.uncanonical_spends + .values() + .flat_map(|spend| &spend.conflicting_txs) + .map(|(&txid, (_, pos))| (txid, pos)) + .filter({ + let mut dedup = HashSet::::new(); + move |(txid, _)| dedup.insert(*txid) + }) + } + + pub fn confirmed_conflicts(&self) -> impl Iterator { + self.conflicts().filter_map(|(txid, pos)| match pos { + ChainPosition::Confirmed { anchor, .. } => Some((txid, anchor)), + ChainPosition::Unconfirmed { .. } => None, + }) + } + + pub fn unconfirmed_conflicts(&self) -> impl Iterator + '_ { + self.conflicts().filter_map(|(txid, pos)| match pos { + ChainPosition::Confirmed { .. } => None, + ChainPosition::Unconfirmed { .. } => Some(txid), + }) + } + + /// Missing ancestors. + /// + /// Either evicted from mempool, or never successfully broadcast in the first place. + pub fn missing_parents(&self) -> impl Iterator + '_ { + self.uncanonical_spends + .values() + .flat_map(|spend_info| &spend_info.uncanonical_ancestors) + .map(|(&txid, &network_seen)| (txid, network_seen)) + } + + pub fn contains_conflicts(&self) -> bool { + self.conflicts().next().is_some() + } + + pub fn contains_confirmed_conflicts(&self) -> bool { + self.confirmed_conflicts().next().is_some() + } +} + +/// An ordered unbroadcasted staging area. +/// +/// It is ordered in case of RBF txs. +#[derive(Debug, Clone, Default)] +pub struct CanonicalizationTracker { + /// Tracks the order that transactions are added. + order: VecDeque, + + /// Enforces that we do not have duplicates in `queue`. + dedup: HashSet, +} + +/// Represents a single mutation to [`BroadcastQueue`]. +#[derive(Debug, Clone, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum Mutation { + /// A push to the back of the queue. + Push(Txid), + /// A removal from the queue. + Remove(Txid), +} + +/// A list of mutations made to [`BroadcastQueue`]. +#[must_use] +#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)] +pub struct ChangeSet { + /// Mutations. + pub mutations: Vec, +} + +impl Merge for ChangeSet { + fn merge(&mut self, other: Self) { + self.mutations.extend(other.mutations); + } + + fn is_empty(&self) -> bool { + self.mutations.is_empty() + } +} + +impl CanonicalizationTracker { + /// Construct [`Unbroadcasted`] from the given `changeset`. + pub fn from_changeset(changeset: ChangeSet) -> Self { + let mut out = CanonicalizationTracker::default(); + out.apply_changeset(changeset); + out + } + + /// Apply the given `changeset`. + pub fn apply_changeset(&mut self, changeset: ChangeSet) { + for mutation in changeset.mutations { + match mutation { + Mutation::Push(txid) => self._push(txid), + Mutation::Remove(txid) => self._remove(txid), + }; + } + } + + /// Whether the `txid` exists in the queue. + pub fn contains(&self, txid: Txid) -> bool { + self.dedup.contains(&txid) + } + + /// Push a `txid` to the queue if it does not already exist. + /// + /// # Warning + /// + /// This does not get rid of conflicting transactions already in the queue. + pub fn push(&mut self, txid: Txid) -> ChangeSet { + let mut changeset = ChangeSet::default(); + if self._push(txid) { + changeset.mutations.push(Mutation::Push(txid)); + } + changeset + } + fn _push(&mut self, txid: Txid) -> bool { + if self.dedup.insert(txid) { + self.order.push_back(txid); + return true; + } + false + } + + /// Push a `txid` to the broadcast queue (if it does not exist already) and displaces all + /// coflicting txids in the queue. + pub fn push_and_displace_conflicts(&mut self, tx_graph: &TxGraph, txid: Txid) -> ChangeSet + where + A: Anchor, + { + let mut changeset = ChangeSet::default(); + + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => { + debug_assert!( + !self.dedup.contains(&txid), + "Cannot have txid in queue which has no corresponding tx in graph" + ); + return changeset; + } + }; + + if self._push(txid) { + changeset.mutations.push(Mutation::Push(txid)); + + for txid in tx_graph.walk_conflicts(&tx, |_, conflict_txid| Some(conflict_txid)) { + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + } + } + + changeset + } + + /// Returns the next `txid` of the queue to broadcast which has no dependencies to other + /// transactions in the queue. + pub fn next_to_broadcast(&self, tx_graph: &TxGraph) -> Option + where + A: Anchor, + { + self.order.iter().copied().find(|&txid| { + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => return false, + }; + if tx + .input + .iter() + .any(|txin| self.dedup.contains(&txin.previous_output.txid)) + { + return false; + } + true + }) + } + + /// Returns unbroadcasted dependencies of the given `txid`. + /// + /// The returned `Vec` is in broadcast order. + pub fn unbroadcasted_dependencies(&self, tx_graph: &TxGraph, txid: Txid) -> Vec + where + A: Anchor, + { + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => return Vec::new(), + }; + let mut txs = tx_graph + .walk_ancestors(tx, |_depth, ancestor_tx| { + let ancestor_txid = ancestor_tx.compute_txid(); + if self.dedup.contains(&ancestor_txid) { + Some(ancestor_txid) + } else { + None + } + }) + .collect::>(); + txs.reverse(); + txs + } + + /// Untracks and removes a transaction from the broadcast queue. + /// + /// Transactions are automatically removed from the queue upon successful broadcast, so calling + /// this method directly is typically not required. + pub fn remove(&mut self, txid: Txid) -> ChangeSet { + let mut changeset = ChangeSet::default(); + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + changeset + } + fn _remove(&mut self, txid: Txid) -> bool { + if self.dedup.remove(&txid) { + let i = (0..self.order.len()) + .zip(self.order.iter().copied()) + .find_map(|(i, queue_txid)| if queue_txid == txid { Some(i) } else { None }) + .expect("must exist in queue to exist in `queue`"); + let _removed = self.order.remove(i); + debug_assert_eq!(_removed, Some(txid)); + return true; + } + false + } + + /// Untracks and removes a transaction and it's descendants from the broadcast queue. + pub fn remove_and_displace_dependants( + &mut self, + tx_graph: &TxGraph, + txid: Txid, + ) -> ChangeSet + where + A: Anchor, + { + let mut changeset = ChangeSet::default(); + + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + for txid in tx_graph.walk_descendants(txid, |_depth, txid| Some(txid)) { + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + } + } + changeset + } + + /// Untrack transactions that are given anchors and/or mempool timestamps. + pub fn filter_from_graph_changeset( + &mut self, + graph_changeset: &tx_graph::ChangeSet, + ) -> ChangeSet { + let mut changeset = ChangeSet::default(); + let s_txids = graph_changeset.last_seen.keys().copied(); + let a_txids = graph_changeset.anchors.iter().map(|(_, txid)| *txid); + let e_txids = graph_changeset.last_evicted.keys().copied(); + for txid in s_txids.chain(a_txids).chain(e_txids) { + changeset.merge(self.remove(txid)); + } + changeset + } + + /// Txids ordered by precedence. + /// + /// Transactions with greater precedence will appear later in this list. + pub fn txids(&self) -> impl ExactSizeIterator + '_ { + self.order.iter().copied() + } + + /// Initial changeset. + pub fn initial_changeset(&self) -> ChangeSet { + ChangeSet { + mutations: self.order.iter().copied().map(Mutation::Push).collect(), + } + } +} diff --git a/wallet/src/wallet/mod.rs b/wallet/src/wallet/mod.rs index c9aae4fa..183ad915 100644 --- a/wallet/src/wallet/mod.rs +++ b/wallet/src/wallet/mod.rs @@ -19,7 +19,10 @@ use alloc::{ sync::Arc, vec::Vec, }; +use chain::{CanonicalReason, ChainOracle, ObservedIn}; use core::{cmp::Ordering, fmt, mem, ops::Deref}; +use intent_tracker::{CanonicalView, NetworkSeen, UncanonicalSpendInfo, UncanonicalTx}; +use std::collections::btree_map::Entry; use bdk_chain::{ indexer::keychain_txout::KeychainTxOutIndex, @@ -48,11 +51,11 @@ use miniscript::{ }; use rand_core::RngCore; -pub mod broadcast_queue; mod changeset; pub mod coin_selection; pub mod error; pub mod export; +pub mod intent_tracker; mod params; mod persisted; pub mod signer; @@ -77,8 +80,8 @@ use crate::wallet::{ // re-exports pub use bdk_chain::Balance; -pub use broadcast_queue::BroadcastQueue; pub use changeset::ChangeSet; +pub use intent_tracker::CanonicalizationTracker; pub use params::*; pub use persisted::*; pub use utils::IsDust; @@ -107,7 +110,9 @@ pub struct Wallet { change_signers: Arc, chain: LocalChain, indexed_graph: IndexedTxGraph>, - broadcast_queue: BroadcastQueue, + intent_tracker: CanonicalizationTracker, + network_view: CanonicalView, + intent_view: CanonicalView, stage: ChangeSet, network: Network, secp: SecpCtx, @@ -452,8 +457,8 @@ impl Wallet { let indexed_graph = IndexedTxGraph::new(index); let indexed_graph_changeset = indexed_graph.initial_changeset(); - let broadcast_queue = BroadcastQueue::default(); - let broadcast_queue_changeset = broadcast_queue.initial_changeset(); + let intent_tracker = CanonicalizationTracker::default(); + let intent_tracker_changeset = intent_tracker.initial_changeset(); let stage = ChangeSet { descriptor, @@ -461,20 +466,25 @@ impl Wallet { local_chain: chain_changeset, tx_graph: indexed_graph_changeset.tx_graph, indexer: indexed_graph_changeset.indexer, - broadcast_queue: broadcast_queue_changeset, + intent_tracker: intent_tracker_changeset, network: Some(network), }; - Ok(Wallet { + let mut wallet = Wallet { signers, change_signers, network, chain, indexed_graph, - broadcast_queue, + intent_tracker, stage, secp, - }) + network_view: CanonicalView::default(), + intent_view: CanonicalView::default(), + }; + wallet.update_views(); + + Ok(wallet) } /// Build [`Wallet`] by loading from persistence or [`ChangeSet`]. @@ -661,20 +671,25 @@ impl Wallet { indexed_graph.apply_changeset(changeset.indexer.into()); indexed_graph.apply_changeset(changeset.tx_graph.into()); - let broadcast_queue = BroadcastQueue::from_changeset(changeset.broadcast_queue); + let broadcast_queue = CanonicalizationTracker::from_changeset(changeset.intent_tracker); let stage = ChangeSet::default(); - Ok(Some(Wallet { + let mut wallet = Wallet { signers, change_signers, chain, indexed_graph, - broadcast_queue, + intent_tracker: broadcast_queue, stage, network, secp, - })) + network_view: CanonicalView::default(), + intent_view: CanonicalView::default(), + }; + wallet.update_views(); + + Ok(Some(wallet)) } /// Get the Bitcoin network the wallet is using. @@ -873,12 +888,12 @@ impl Wallet { self._list_unspent(CanonicalizationParams::default()) } - /// List unspent outputs (UTXOs) of this wallet assuming that unbroadcasted transactions are + /// List unspent outputs (UTXOs) of this wallet assuming that intended transactions are /// canonical. /// /// TODO: Better docs here. - pub fn list_unspent_with_unbroadcasted(&self) -> impl Iterator + '_ { - self._list_unspent(self.include_unbroadcasted_canonicalization_params()) + pub fn list_intended_unspent(&self) -> impl Iterator + '_ { + self._list_unspent(self.intent_view_canonicalization_params()) } fn _list_unspent( @@ -893,7 +908,7 @@ impl Wallet { params, self.indexed_graph.index.outpoints().iter().cloned(), ) - .map(|((k, i), full_txo)| new_local_utxo(&self.broadcast_queue, k, i, full_txo)) + .map(|((k, i), full_txo)| new_local_utxo(&self.intent_tracker, k, i, full_txo)) } /// Get the [`TxDetails`] of a wallet transaction. @@ -935,7 +950,7 @@ impl Wallet { CanonicalizationParams::default(), self.indexed_graph.index.outpoints().iter().cloned(), ) - .map(|((k, i), full_txo)| new_local_utxo(&self.broadcast_queue, k, i, full_txo)) + .map(|((k, i), full_txo)| new_local_utxo(&self.intent_tracker, k, i, full_txo)) } /// Get all the checkpoints the wallet is currently storing indexed by height. @@ -982,15 +997,6 @@ impl Wallet { pub fn get_utxo(&self, outpoint: OutPoint) -> Option { self._get_utxo(CanonicalizationParams::default(), outpoint) } - - /// Returns the utxo owned by this wallet corresponding to `outpoint`. - pub fn get_utxo_include_unbroadcasted(&self, outpoint: OutPoint) -> Option { - self._get_utxo( - self.include_unbroadcasted_canonicalization_params(), - outpoint, - ) - } - fn _get_utxo(&self, params: CanonicalizationParams, op: OutPoint) -> Option { let ((keychain, index), _) = self.indexed_graph.index.txout(op)?; self.indexed_graph @@ -1001,7 +1007,7 @@ impl Wallet { params, core::iter::once(((), op)), ) - .map(|(_, full_txo)| new_local_utxo(&self.broadcast_queue, keychain, index, full_txo)) + .map(|(_, full_txo)| new_local_utxo(&self.intent_tracker, keychain, index, full_txo)) .next() } @@ -1199,6 +1205,25 @@ impl Wallet { /// To iterate over all canonical transactions, including those that are irrelevant, use /// [`TxGraph::list_canonical_txs`]. pub fn transactions(&self) -> impl Iterator + '_ { + self.canonical_txs() + } + + /// Iterate over relevant and canonical transactions in the wallet. + /// + /// A transaction is relevant when it spends from or spends to at least one tracked output. + /// + /// A transaction is canonical when either: + /// * It is confirmed in the best chain and does not conflict with a transaction in the + /// broadcast queue. + /// * It seen in the mempool, is not evicted and does not conflict with a transaction that is + /// confirmed or seen later in the mempool. + /// + /// To iterate over all transactions, including those that are irrelevant and not canonical, use + /// [`TxGraph::full_txs`]. + /// + /// To iterate over all canonical transactions, including those that are irrelevant, use + /// [`TxGraph::list_canonical_txs`]. + pub fn canonical_txs(&self) -> impl Iterator + '_ { let tx_graph = self.indexed_graph.graph(); let tx_index = &self.indexed_graph.index; tx_graph @@ -1223,10 +1248,10 @@ impl Wallet { /// # let mut wallet:Wallet = todo!(); /// // Transactions by chain position: first unconfirmed then descending by confirmed height. /// let sorted_txs: Vec = - /// wallet.transactions_sort_by(|tx1, tx2| tx2.chain_position.cmp(&tx1.chain_position)); + /// wallet.canonical_txs_sort_by(|tx1, tx2| tx2.chain_position.cmp(&tx1.chain_position)); /// # Ok::<(), anyhow::Error>(()) /// ``` - pub fn transactions_sort_by(&self, compare: F) -> Vec + pub fn canonical_txs_sort_by(&self, compare: F) -> Vec where F: FnMut(&WalletTx, &WalletTx) -> Ordering, { @@ -1241,9 +1266,9 @@ impl Wallet { self._balance(CanonicalizationParams::default()) } - /// Return the balance that includes unbroadcasted transactions. - pub fn balance_with_unbroadcasted(&self) -> Balance { - self._balance(self.include_unbroadcasted_canonicalization_params()) + /// Return the balance that includes tracked transactions which are not canonical. + pub fn intended_balance(&self) -> Balance { + self._balance(self.intent_view_canonicalization_params()) } fn _balance(&self, params: CanonicalizationParams) -> Balance { @@ -1733,7 +1758,7 @@ impl Wallet { .list_canonical_txs( &self.chain, chain_tip, - self.include_unbroadcasted_canonicalization_params(), + self.intent_view_canonicalization_params(), ) .map(|canon_tx| (canon_tx.tx_node.txid, canon_tx.chain_position)) .collect::>(); @@ -1774,6 +1799,13 @@ impl Wallet { .input .drain(..) .map(|txin| -> Result<_, BuildFeeBumpError> { + // cannot fee-bump if a conflicting transaction is already confirmed + if let Some((spend_txid, _, reason)) = self.network_view.spend(txin.previous_output) + { + if spend_txid != txid && matches!(reason, CanonicalReason::Anchor { .. }) { + return Err(BuildFeeBumpError::TransactionConfirmed(spend_txid)); + } + } graph // Get previous transaction .get_tx(txin.previous_output.txid) @@ -1808,7 +1840,7 @@ impl Wallet { derivation_index, chain_position, needs_broadcast: self - .broadcast_queue + .intent_tracker .contains(txin.previous_output.txid), }), }, @@ -2013,7 +2045,7 @@ impl Wallet { .list_canonical_txs( &self.chain, chain_tip, - self.include_unbroadcasted_canonicalization_params(), + self.intent_view_canonicalization_params(), ) .filter(|canon_tx| prev_txids.contains(&canon_tx.tx_node.txid)) // This is for a small performance gain. Although `.filter` filters out excess txs, it @@ -2155,6 +2187,19 @@ impl Wallet { /// Given the options returns the list of utxos that must be used to form the /// transaction and any further that may be used if needed. fn filter_utxos(&self, params: &TxParams, current_height: u32) -> Vec { + let uncanonical_txids_to_exclude = self + .uncanonical_txs() + .filter_map(|utx| match params.uncanonical_utxo_policy { + crate::UncanonicalUtxoPolicy::Include if utx.contains_conflicts() => None, + crate::UncanonicalUtxoPolicy::IncludeUnconfirmedConflicts + if utx.contains_confirmed_conflicts() => + { + None + } + _ => Some(utx.txid), + }) + .collect::>(); + if params.manually_selected_only { vec![] // only process optional UTxOs if manually_selected_only is false @@ -2167,14 +2212,18 @@ impl Wallet { .filter_chain_unspents( &self.chain, self.chain.tip().block_id(), - self.include_unbroadcasted_canonicalization_params(), + self.intent_view_canonicalization_params(), self.indexed_graph.index.outpoints().iter().cloned(), ) + // remove all uncanonical outputs that do not satisfy the uncanonical output policy + .filter(move |(_, full_txo)| { + !uncanonical_txids_to_exclude.contains(&full_txo.outpoint.txid) + }) // only create LocalOutput if UTxO is mature .filter_map(move |((k, i), full_txo)| { full_txo .is_mature(current_height) - .then(|| new_local_utxo(&self.broadcast_queue, k, i, full_txo)) + .then(|| new_local_utxo(&self.intent_tracker, k, i, full_txo)) }) // only process UTxOs not selected manually, they will be considered later in the // chain NOTE: this avoid UTxOs in both required and optional list @@ -2532,7 +2581,7 @@ impl Wallet { .list_canonical_txs( chain, chain.tip().block_id(), - self.include_unbroadcasted_canonicalization_params(), + self.intent_view_canonicalization_params(), ) .map(|c| c.tx_node.txid) .collect(); @@ -2561,117 +2610,274 @@ impl Wallet { /// Methods to interact with the broadcast queue. impl Wallet { - pub(crate) fn stage_changes>(&mut self, changeset: C) { + fn update_network_view(&mut self) { + let chain = &self.chain; + let tip = chain.tip().block_id(); + let network_params = CanonicalizationParams::default(); + let network_view_iter = + self.indexed_graph + .graph() + .canonical_iter(chain, tip, network_params); + self.network_view = CanonicalView::from_iter(network_view_iter).expect("infallible"); + } + + /// Updates the intent canonical view and returns evicted txids from the old view. + fn update_intent_view(&mut self) -> impl Iterator + '_ { + let chain = &self.chain; + let tip = chain.tip().block_id(); + let intent_params = self.intent_view_canonicalization_params(); + let intent_view_iter = self + .indexed_graph + .graph() + .canonical_iter(chain, tip, intent_params); + let mut temp_intent_view = CanonicalView::from_iter(intent_view_iter).expect("infallible"); + core::mem::swap(&mut self.intent_view, &mut temp_intent_view); + temp_intent_view + .txs + .into_keys() + .filter(|old_txid| !self.intent_view.txs.contains_key(old_txid)) + } + + fn update_views(&mut self) { + self.update_network_view(); + self.update_intent_view(); + } + + /// Stage changes and return evicted txids from intent canonical view. + /// + /// TODO: Do we also need to return evicted txids from network canonical view. + pub(crate) fn stage_changes>(&mut self, changeset: C) -> Vec { let changeset: ChangeSet = changeset.into(); - if !changeset.tx_graph.is_empty() { - self.stage.merge( - self.broadcast_queue - .filter_from_graph_changeset(&changeset.tx_graph) - .into(), - ); + + // TODO: Skip rebuilding views for certain types of changes. + self.update_network_view(); + let evicted_txids = self.update_intent_view().collect::>(); + for &evicted_txid in &evicted_txids { + self.stage + .merge(self.intent_tracker.remove(evicted_txid).into()); } self.stage.merge(changeset); + evicted_txids } /// [`CanonicalizationParams`] which includes transactions in the broadcast queue. - pub fn include_unbroadcasted_canonicalization_params(&self) -> CanonicalizationParams { + pub(crate) fn intent_view_canonicalization_params(&self) -> CanonicalizationParams { CanonicalizationParams { - assume_canonical: self.broadcast_queue.txids().collect(), + assume_canonical: self.intent_tracker.txids().collect(), } } - /// Add a transaction to the broadcast queue transaction for broadcast. + /// Keep track of a transaction and untrack all conflicts of it (if any). /// - /// Unsigned transactions can be inserted and later reinserted when signed. + /// The caller is responsible for replacing, cpfp-ing, or manually abandoning the transaction if + /// it ever becomes network-uncanonical (TODO: Expand on this). /// - /// Conflicts of the inserted transaction will be removed from the broadcast queue. - pub fn add_tx_to_broadcast_queue>>(&mut self, tx: T) { + /// Unsigned transactions can be inserted and later reinserted when signed (TODO: Work on this + /// API, maybe don't include this for now?). + pub fn track_tx>>(&mut self, tx: T) -> Vec { let tx: Arc = tx.into(); let txid = tx.compute_txid(); - self.stage.merge(self.indexed_graph.insert_tx(tx).into()); - - // TODO: Figure out whether we should displace conflicting txs in the queue on insertion. - // let queue_changeset = self.broadcast_queue.push(txid); - let queue_changeset = self - .broadcast_queue - .push_and_displace_conflicts(self.indexed_graph.graph(), txid); - - self.stage.merge(queue_changeset.into()); + let mut changeset = ChangeSet::default(); + changeset.merge(self.indexed_graph.insert_tx(tx).into()); + changeset.merge(self.intent_tracker.push(txid).into()); + self.stage_changes(changeset) } - /// Remove transaction from the broadcast queue. + /// Untrack transaction so that the wallet will forget about it if it becomes network + /// uncanonical. /// - /// This will also remove all descendants of this transaction in the broadcast queue. + /// This will also untrack all descendants of this transaction. /// - /// Returns `true` if the transaction is successfully removed. - pub fn remove_tx_from_broadcast_queue(&mut self, txid: Txid) -> bool { - // // TODO: Figure out whether we should displace descendants of removed txs as well. - let queue_changeset = self - .broadcast_queue - .remove_and_displace_dependants(self.indexed_graph.graph(), txid); - // let queue_changeset = self.broadcast_queue.remove(txid); - let is_removed = !queue_changeset.is_empty(); - self.stage.merge(queue_changeset.into()); - is_removed + /// Returns all transactions untracked by this operation. + pub fn untrack_tx(&mut self, txid: Txid) -> Vec { + let mut changeset = ChangeSet::default(); + changeset.merge(self.intent_tracker.remove(txid).into()); + self.stage_changes(changeset) } - /// Broadcast queue. + /// List transactions that are tracked but not network-canonical. /// - /// Elements are in broadcast order. - pub fn broadcast_queue(&self) -> impl Iterator> + '_ { - self.broadcast_queue.txids().filter_map(|txid| { - let tx_opt = self.indexed_graph.graph().get_tx(txid); - debug_assert!( - tx_opt.is_some(), - "A txid in the broadcast queue must exist in the graph" - ); - tx_opt - }) + /// # Order + /// + /// Transactions are returned in the order that they were tracked with + /// [`track_tx`](Self::track_tx). + /// + /// These are the transactions that commands action! + pub fn uncanonical_txs( + &self, + ) -> impl Iterator> + '_ { + let graph = self.indexed_graph.graph(); + let tip = self.chain.tip(); + self.intent_tracker + .txids() + .filter(|txid| { + debug_assert!( + self.intent_view.txs.contains_key(txid), + "A tracked tx must exist in the `intent_view`", + ); + !self.network_view.txs.contains_key(txid) + }) + .filter_map(|txid| { + let txn_opt = graph.get_tx_node(txid); + debug_assert!(txn_opt.is_some(), "A tracked txid must exist in the graph"); + txn_opt + }) + .filter_map(move |tx_node| { + let txid = tx_node.txid; + let tx = tx_node.tx; + let network_seen = if tx_node.last_seen.is_some() || !tx_node.anchors.is_empty() { + NetworkSeen::Seen + } else { + NetworkSeen::NeverSeen + }; + let uncanonical_spends = tx + .input + .iter() + .filter_map(|txin| { + let op = txin.previous_output; + + let mut spend = UncanonicalSpendInfo::::default(); + + let mut visited = HashSet::::new(); + let mut stack = Vec::<(OutPoint, u32)>::new(); // (prev-outpoint, distance) + stack.push((op, 0)); + + while let Some((prevout, distance)) = stack.pop() { + if !visited.insert(prevout) { + continue; + } + if self.network_view.txs.contains_key(&prevout.txid) { + // This tx is canonical. + continue; + } + + let prev_tx_node = match graph.get_tx_node(prevout.txid) { + Some(prev_tx) => prev_tx, + None => continue, + }; + + let uncanonical_ancestor_entry = + match spend.uncanonical_ancestors.entry(prevout.txid) { + Entry::Vacant(entry) => entry, + // Uncanonical tx already visited. + Entry::Occupied(_) => continue, + }; + uncanonical_ancestor_entry.insert( + if !prev_tx_node.anchors.is_empty() + || prev_tx_node.last_seen.is_some() + { + NetworkSeen::Seen + } else { + NetworkSeen::NeverSeen + }, + ); + + if let Some((conflict_txid, _, reason)) = + self.network_view.spend(prevout) + { + if let Entry::Vacant(entry) = + spend.conflicting_txs.entry(conflict_txid) + { + let conflict_tx_node = match graph.get_tx_node(conflict_txid) { + Some(tx_node) => tx_node, + None => continue, + }; + let maybe_direct_anchor = conflict_tx_node + .anchors + .iter() + .find(|a| { + self.chain + .is_block_in_chain(a.block_id, tip.block_id()) + .expect("infallible") + .unwrap_or(false) + }) + .cloned(); + let conflict_pos = match maybe_direct_anchor { + Some(anchor) => ChainPosition::Confirmed { + anchor, + transitively: None, + }, + None => match reason.clone() { + CanonicalReason::Assumed { .. } => { + debug_assert!(false, "network view must not have any assumed-canonical txs"); + ChainPosition::Unconfirmed { + first_seen: None, + last_seen: None, + } + } + CanonicalReason::Anchor { anchor, descendant } => { + ChainPosition::Confirmed { + anchor, + transitively: descendant, + } + } + CanonicalReason::ObservedIn { observed_in, .. } => { + ChainPosition::Unconfirmed { + first_seen: conflict_tx_node.first_seen, + last_seen: match observed_in { + ObservedIn::Block(_) => None, + ObservedIn::Mempool(last_seen) => { + Some(last_seen) + } + }, + } + } + }, + }; + entry.insert((distance, conflict_pos)); + } + } + + stack.extend( + prev_tx_node + .tx + .input + .iter() + .map(|txin| (txin.previous_output, distance + 1)), + ); + } + + Some((op, spend)) + }) + .collect(); + + Some(UncanonicalTx { + txid, + tx, + network_seen, + uncanonical_spends, + }) + }) } - /// Number of transactions in the broadcast queue. - pub fn broadcast_queue_len(&self) -> usize { - self.broadcast_queue.txids().len() + /// Whether the transaction is a self-spend. + /// + /// A self-spend if where all inputs and outputs are owned by this wallet. + /// + /// This may return false-negatives if there is missing information from the wallet. + pub fn is_self_spend(&self, tx: &Transaction) -> bool { + let index = &self.indexed_graph.index; + for txout in &tx.output { + if index.index_of_spk(txout.script_pubkey.clone()).is_none() { + return false; + } + } + for txin in &tx.input { + if index.txout(txin.previous_output).is_none() { + return false; + } + } + true } /// Mark a transaction as successfully broadcasted. - pub fn mark_tx_as_broadcasted_at(&mut self, txid: Txid, at: u64) { + pub fn insert_tx_broadcasted_at(&mut self, txid: Txid, at: u64) { let mut update = TxUpdate::default(); update.seen_ats.insert((txid, at)); let changeset = self.indexed_graph.apply_update(update); self.stage_changes(changeset); } - - /// Get `tx` descendants that are in the broadcast queue. - /// - /// This can be used to check which transactions will no longer be broadcastable if `tx` is - /// becomes unavailable. - pub fn descendants_in_broadcast_queue( - &self, - tx: &Transaction, - ) -> impl Iterator> + '_ { - self.indexed_graph - .graph() - .walk_descendants(tx.compute_txid(), |_, txid| Some(txid)) - .filter(|&txid| self.broadcast_queue.contains(txid)) - .filter_map(|txid| self.indexed_graph.graph().get_tx(txid)) - } - - /// Get `tx` conflicts that are in the broadcast queue. - /// - /// This can be used to check which transactions will be removed from the broadcast queue if - /// `tx` is inserted. - pub fn conflicts_in_broadcast_queue<'t>( - &'t self, - tx: &'t Transaction, - ) -> impl Iterator> + 't { - self.indexed_graph - .graph() - .walk_conflicts(tx, |_, txid| Some(txid)) - .filter(|&txid| self.broadcast_queue.contains(txid)) - .filter_map(|txid| self.indexed_graph.graph().get_tx(txid)) - } } /// Methods to construct sync/full-scan requests for spk-based chain sources. @@ -2785,7 +2991,7 @@ where } fn new_local_utxo( - broadcast_queue: &BroadcastQueue, + broadcast_queue: &CanonicalizationTracker, keychain: KeychainKind, derivation_index: u32, full_txo: FullTxOut, diff --git a/wallet/src/wallet/tx_builder.rs b/wallet/src/wallet/tx_builder.rs index c5daddfc..bae1da69 100644 --- a/wallet/src/wallet/tx_builder.rs +++ b/wallet/src/wallet/tx_builder.rs @@ -141,6 +141,19 @@ pub(crate) struct TxParams { pub(crate) bumping_fee: Option, pub(crate) current_height: Option, pub(crate) allow_dust: bool, + pub(crate) uncanonical_utxo_policy: UncanonicalUtxoPolicy, +} + +#[derive(Default, Debug, Clone)] +pub(crate) enum UncanonicalUtxoPolicy { + /// Exlude all uncanonical UTXOs. + #[default] + Exclude, + /// Include uncanonical UTXOs which do not conflict with the canonical history. + Include, + /// Include uncanonical UTXOs, including those that conflict with unconfirmed canonical + /// history. + IncludeUnconfirmedConflicts, } #[derive(Clone, Copy, Debug)] @@ -281,7 +294,8 @@ impl<'a, Cs> TxBuilder<'a, Cs> { .iter() .map(|outpoint| { self.wallet - .get_utxo_include_unbroadcasted(*outpoint) + // TODO: We should really pick from the intent canonical view. + .get_utxo(*outpoint) .ok_or(AddUtxoError::UnknownUtxo(*outpoint)) .map(|output| { ( @@ -459,20 +473,6 @@ impl<'a, Cs> TxBuilder<'a, Cs> { self } - /// Add unbroadcasted transactions to the unspendable list. - pub fn exclude_unbroadcasted(&mut self) -> &mut Self { - let unbroadcasted_ops = self.wallet.broadcast_queue().flat_map(|tx| { - let txid = tx.compute_txid(); - (0_u32..) - .take(tx.output.len()) - .map(move |vout| OutPoint::new(txid, vout)) - }); - for op in unbroadcasted_ops { - self.params.unspendable.insert(op); - } - self - } - /// Sign with a specific sig hash /// /// **Use this option very carefully** @@ -640,6 +640,19 @@ impl<'a, Cs> TxBuilder<'a, Cs> { self } + /// Include uncanonical UTXOs which do not conflict with the canonical history. + pub fn include_uncanonical(&mut self) -> &mut Self { + self.params.uncanonical_utxo_policy = UncanonicalUtxoPolicy::Include; + self + } + + /// Include uncanonical UTXOs, inclusive of those that conflict with unconfirmed canonical + /// transactions. UTXOs that conflict with confirmed history are still excluded. + pub fn include_uncanonical_conflicts(&mut self) -> &mut Self { + self.params.uncanonical_utxo_policy = UncanonicalUtxoPolicy::IncludeUnconfirmedConflicts; + self + } + /// Sets the address to *drain* excess coins to. /// /// Usually, when there are excess coins they are sent to a change address generated by the diff --git a/wallet/tests/wallet.rs b/wallet/tests/wallet.rs index 8ed28159..83eca351 100644 --- a/wallet/tests/wallet.rs +++ b/wallet/tests/wallet.rs @@ -4605,13 +4605,13 @@ fn single_descriptor_wallet_can_create_tx_and_receive_change() { } #[test] -fn test_transactions_sort_by() { +fn test_canonical_txs_sort_by() { let (mut wallet, _txid) = get_funded_wallet_wpkh(); receive_output(&mut wallet, Amount::from_sat(25_000), ReceiveTo::Mempool(0)); // sort by chain position, unconfirmed then confirmed by descending block height let sorted_txs: Vec = - wallet.transactions_sort_by(|t1, t2| t2.chain_position.cmp(&t1.chain_position)); + wallet.canonical_txs_sort_by(|t1, t2| t2.chain_position.cmp(&t1.chain_position)); let conf_heights: Vec> = sorted_txs .iter() .map(|tx| tx.chain_position.confirmation_height_upper_bound()) From 8a2d04f6026b5a60f3ac0c670dad394c6c8dbfda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 28 Jun 2025 04:13:51 +0000 Subject: [PATCH 4/5] feat!: [WIP] More `IntentTracker` tweaks --- wallet/src/wallet/intent_tracker.rs | 247 ++++++++++++++++++++++++++-- wallet/src/wallet/mod.rs | 208 +++++++---------------- 2 files changed, 292 insertions(+), 163 deletions(-) diff --git a/wallet/src/wallet/intent_tracker.rs b/wallet/src/wallet/intent_tracker.rs index 2b053a48..d18ba791 100644 --- a/wallet/src/wallet/intent_tracker.rs +++ b/wallet/src/wallet/intent_tracker.rs @@ -1,16 +1,21 @@ //! Unbroadcasted transaction queue. +use core::convert::Infallible; + use alloc::sync::Arc; use alloc::vec::Vec; use bitcoin::OutPoint; use bitcoin::Transaction; use chain::tx_graph; +use chain::tx_graph::TxNode; use chain::Anchor; +use chain::BlockId; use chain::CanonicalIter; use chain::CanonicalReason; use chain::ChainOracle; use chain::ChainPosition; +use chain::ObservedIn; use chain::TxGraph; use crate::collections::BTreeMap; @@ -21,10 +26,11 @@ use crate::collections::VecDeque; use bdk_chain::bdk_core::Merge; use bitcoin::Txid; +/// A consistent view of transactions. #[derive(Debug)] pub struct CanonicalView { - pub txs: HashMap, CanonicalReason)>, - pub spends: HashMap, + pub(crate) txs: HashMap, CanonicalReason)>, + pub(crate) spends: HashMap, } impl Default for CanonicalView { @@ -37,7 +43,7 @@ impl Default for CanonicalView { } impl CanonicalView { - pub fn from_iter(iter: CanonicalIter<'_, A, C>) -> Result + pub(crate) fn from_iter(iter: CanonicalIter<'_, A, C>) -> Result where A: Anchor, C: ChainOracle, @@ -53,11 +59,49 @@ impl CanonicalView { Ok(view) } + /// Return the transaction that spends the given `op`. pub fn spend(&self, op: OutPoint) -> Option<(Txid, Arc, &CanonicalReason)> { let txid = self.spends.get(&op)?; let (tx, reason) = self.txs.get(txid)?; Some((*txid, tx.clone(), reason)) } + + /// Iterate all descendants of the given transaction in the [`CanonicalView`], avoiding + /// duplicates. + fn descendants( + &self, + tx: impl AsRef, + ) -> impl Iterator, &CanonicalReason)> { + let tx: &Transaction = tx.as_ref(); + let txid = tx.compute_txid(); + + let mut visited = HashSet::::new(); + visited.insert(txid); + + let mut outpoints = core::iter::repeat_n(txid, tx.output.len()) + .zip(0_u32..) + .map(|(txid, vout)| OutPoint::new(txid, vout)) + .collect::>(); + + core::iter::from_fn(move || { + while let Some(op) = outpoints.pop() { + let (txid, tx, reason) = match self.spend(op) { + Some(spent_by) => spent_by, + None => continue, + }; + if !visited.insert(txid) { + continue; + } + outpoints.extend( + core::iter::repeat_n(txid, tx.output.len()) + .zip(0_u32..) + .map(|(txid, vout)| OutPoint::new(txid, vout)), + ); + return Some((txid, tx, reason)); + } + None + }) + } } /// Indicates whether a transaction was observed in the network. @@ -83,8 +127,8 @@ impl NetworkSeen { /// /// This struct models an input that attempts to spend an output via a transaction path /// that is not part of the canonical network view (e.g., evicted, conflicted, or unknown). -#[derive(Debug, Clone, Default)] -pub struct UncanonicalSpendInfo { +#[derive(Debug, Clone)] +pub struct SpendInfo { /// Non-canonical ancestor transactions reachable from this input. /// /// Each entry maps an ancestor `Txid` to its observed status in the network. @@ -95,12 +139,177 @@ pub struct UncanonicalSpendInfo { /// Canonical transactions that conflict with this spend. /// - /// This may be a direct conflict or a conflict with one of the `uncanonical_ancestors`. - /// The value is a tuple of (conflict distance, chain position). + /// This may be a direct conflict, a conflict with one of the [`uncanonical_ancestors`], or a + /// canonical descendant of a conflict (which are also conflicts). The value is the chain + /// position of the conflict. /// - /// Descendants of conflicts are also conflicts. These transactions will have the same distance - /// value as their conflicting parent. - pub conflicting_txs: BTreeMap)>, + /// [`uncanonical_ancestors`]: Self::uncanonical_ancestors + pub conflicting_txs: BTreeMap>, +} + +impl Default for SpendInfo { + fn default() -> Self { + Self { + uncanonical_ancestors: BTreeMap::new(), + conflicting_txs: BTreeMap::new(), + } + } +} + +impl SpendInfo { + pub(crate) fn new( + chain: &C, + chain_tip: BlockId, + tx_graph: &TxGraph, + network_view: &CanonicalView, + op: OutPoint, + ) -> Self + where + C: ChainOracle, + { + use crate::collections::btree_map::Entry; + + let mut spend_info = Self::default(); + + let mut visited = HashSet::::new(); + let mut stack = Vec::::new(); + stack.push(op); + + while let Some(prev_op) = stack.pop() { + if !visited.insert(prev_op) { + // Outpoint already visited. + continue; + } + if network_view.txs.contains_key(&prev_op.txid) { + // Tx is already canonical. + continue; + } + + let prev_tx_node = match tx_graph.get_tx_node(prev_op.txid) { + Some(prev_tx) => prev_tx, + // Tx not known by tx-graph. + None => continue, + }; + + match spend_info.uncanonical_ancestors.entry(prev_op.txid) { + Entry::Vacant(entry) => entry.insert( + if !prev_tx_node.anchors.is_empty() || prev_tx_node.last_seen.is_some() { + NetworkSeen::Seen + } else { + NetworkSeen::NeverSeen + }, + ), + // Tx already visited. + Entry::Occupied(_) => continue, + }; + + // Find conflicts to populate `conflicting_txs`. + if let Some((conflict_txid, conflict_tx, reason)) = network_view.spend(prev_op) { + let conflict_tx_entry = match spend_info.conflicting_txs.entry(conflict_txid) { + Entry::Vacant(vacant_entry) => vacant_entry, + // Skip if conflicting tx already visited. + Entry::Occupied(_) => continue, + }; + let conflict_tx_node = match tx_graph.get_tx_node(conflict_txid) { + Some(tx_node) => tx_node, + // Skip if conflict tx does not exist in our graph. + None => continue, + }; + conflict_tx_entry.insert(Self::get_pos( + chain, + chain_tip, + &conflict_tx_node, + reason, + )); + + // Find descendants of `conflict_tx` too. + for (conflict_txid, _, reason) in network_view.descendants(conflict_tx) { + let conflict_tx_entry = match spend_info.conflicting_txs.entry(conflict_txid) { + Entry::Vacant(vacant_entry) => vacant_entry, + // Skip if conflicting tx already visited. + Entry::Occupied(_) => continue, + }; + let conflict_tx_node = match tx_graph.get_tx_node(conflict_txid) { + Some(tx_node) => tx_node, + // Skip if conflict tx does not exist in our graph. + None => continue, + }; + conflict_tx_entry.insert(Self::get_pos( + chain, + chain_tip, + &conflict_tx_node, + reason, + )); + } + } + + stack.extend( + prev_tx_node + .tx + .input + .iter() + .map(|txin| txin.previous_output), + ); + } + + spend_info + } + + fn get_pos( + chain: &C, + chain_tip: BlockId, + tx_node: &TxNode<'_, Arc, A>, + canonical_reason: &CanonicalReason, + ) -> ChainPosition + where + C: ChainOracle, + { + let maybe_direct_anchor = tx_node + .anchors + .iter() + .find(|a| { + chain + .is_block_in_chain(a.anchor_block(), chain_tip) + .expect("infallible") + .unwrap_or(false) + }) + .cloned(); + match maybe_direct_anchor { + Some(anchor) => ChainPosition::Confirmed { + anchor, + transitively: None, + }, + None => match canonical_reason.clone() { + CanonicalReason::Assumed { .. } => { + debug_assert!( + false, + "network view must not have any assumed-canonical txs" + ); + ChainPosition::Unconfirmed { + first_seen: None, + last_seen: None, + } + } + CanonicalReason::Anchor { anchor, descendant } => ChainPosition::Confirmed { + anchor, + transitively: descendant, + }, + CanonicalReason::ObservedIn { observed_in, .. } => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: match observed_in { + ObservedIn::Block(_) => None, + ObservedIn::Mempool(last_seen) => Some(last_seen), + }, + }, + }, + } + } + + /// If the spend info is empty, then it can belong in the canonical history without displacing + /// existing transactions or need to add additional transactions other than itself. + pub fn is_empty(&self) -> bool { + self.uncanonical_ancestors.is_empty() && self.conflicting_txs.is_empty() + } } /// Tracked and uncanonical transaction. @@ -113,7 +322,7 @@ pub struct UncanonicalTx { /// Whether the transaction was one seen by the network. pub network_seen: NetworkSeen, /// Spends, identified by prevout, which are uncanonical. - pub uncanonical_spends: BTreeMap>, + pub uncanonical_spends: BTreeMap>, } impl UncanonicalTx { @@ -154,13 +363,14 @@ impl UncanonicalTx { self.uncanonical_spends .values() .flat_map(|spend| &spend.conflicting_txs) - .map(|(&txid, (_, pos))| (txid, pos)) + .map(|(&txid, pos)| (txid, pos)) .filter({ let mut dedup = HashSet::::new(); move |(txid, _)| dedup.insert(*txid) }) } + /// Iterate over confirmed, network-canonical txids which conflict with this transaction. pub fn confirmed_conflicts(&self) -> impl Iterator { self.conflicts().filter_map(|(txid, pos)| match pos { ChainPosition::Confirmed { anchor, .. } => Some((txid, anchor)), @@ -168,6 +378,7 @@ impl UncanonicalTx { }) } + /// Iterate over unconfirmed, network-canonical txids which conflict with this transaction. pub fn unconfirmed_conflicts(&self) -> impl Iterator + '_ { self.conflicts().filter_map(|(txid, pos)| match pos { ChainPosition::Confirmed { .. } => None, @@ -185,20 +396,20 @@ impl UncanonicalTx { .map(|(&txid, &network_seen)| (txid, network_seen)) } + /// Whether this transaction conflicts with network-canonical transactions. pub fn contains_conflicts(&self) -> bool { self.conflicts().next().is_some() } + /// Whether this transaction conflicts with confirmed, network-canonical transactions. pub fn contains_confirmed_conflicts(&self) -> bool { self.confirmed_conflicts().next().is_some() } } -/// An ordered unbroadcasted staging area. -/// -/// It is ordered in case of RBF txs. +/// An ordered tracking area for uncanonical transactions. #[derive(Debug, Clone, Default)] -pub struct CanonicalizationTracker { +pub struct IntentTracker { /// Tracks the order that transactions are added. order: VecDeque, @@ -233,10 +444,10 @@ impl Merge for ChangeSet { } } -impl CanonicalizationTracker { +impl IntentTracker { /// Construct [`Unbroadcasted`] from the given `changeset`. pub fn from_changeset(changeset: ChangeSet) -> Self { - let mut out = CanonicalizationTracker::default(); + let mut out = IntentTracker::default(); out.apply_changeset(changeset); out } diff --git a/wallet/src/wallet/mod.rs b/wallet/src/wallet/mod.rs index 183ad915..e175637a 100644 --- a/wallet/src/wallet/mod.rs +++ b/wallet/src/wallet/mod.rs @@ -19,10 +19,9 @@ use alloc::{ sync::Arc, vec::Vec, }; -use chain::{CanonicalReason, ChainOracle, ObservedIn}; +use chain::CanonicalReason; use core::{cmp::Ordering, fmt, mem, ops::Deref}; -use intent_tracker::{CanonicalView, NetworkSeen, UncanonicalSpendInfo, UncanonicalTx}; -use std::collections::btree_map::Entry; +use intent_tracker::{CanonicalView, NetworkSeen, SpendInfo, UncanonicalTx}; use bdk_chain::{ indexer::keychain_txout::KeychainTxOutIndex, @@ -81,7 +80,7 @@ use crate::wallet::{ // re-exports pub use bdk_chain::Balance; pub use changeset::ChangeSet; -pub use intent_tracker::CanonicalizationTracker; +pub use intent_tracker::IntentTracker; pub use params::*; pub use persisted::*; pub use utils::IsDust; @@ -110,7 +109,7 @@ pub struct Wallet { change_signers: Arc, chain: LocalChain, indexed_graph: IndexedTxGraph>, - intent_tracker: CanonicalizationTracker, + intent_tracker: IntentTracker, network_view: CanonicalView, intent_view: CanonicalView, stage: ChangeSet, @@ -457,7 +456,7 @@ impl Wallet { let indexed_graph = IndexedTxGraph::new(index); let indexed_graph_changeset = indexed_graph.initial_changeset(); - let intent_tracker = CanonicalizationTracker::default(); + let intent_tracker = IntentTracker::default(); let intent_tracker_changeset = intent_tracker.initial_changeset(); let stage = ChangeSet { @@ -482,7 +481,7 @@ impl Wallet { network_view: CanonicalView::default(), intent_view: CanonicalView::default(), }; - wallet.update_views(); + wallet.rebuild_all_views(); Ok(wallet) } @@ -671,7 +670,7 @@ impl Wallet { indexed_graph.apply_changeset(changeset.indexer.into()); indexed_graph.apply_changeset(changeset.tx_graph.into()); - let broadcast_queue = CanonicalizationTracker::from_changeset(changeset.intent_tracker); + let broadcast_queue = IntentTracker::from_changeset(changeset.intent_tracker); let stage = ChangeSet::default(); @@ -687,7 +686,7 @@ impl Wallet { network_view: CanonicalView::default(), intent_view: CanonicalView::default(), }; - wallet.update_views(); + wallet.rebuild_all_views(); Ok(Some(wallet)) } @@ -2610,57 +2609,67 @@ impl Wallet { /// Methods to interact with the broadcast queue. impl Wallet { - fn update_network_view(&mut self) { + fn rebuild_all_views(&mut self) { + let _ = self.rebuild_network_view(); + let _ = self.rebuild_intent_view(); + } + + /// Updates the network canonical view and returns evicted txids from the old view. + fn rebuild_network_view(&mut self) -> impl Iterator + '_ { + let params = CanonicalizationParams::default(); + let view = &mut self.network_view; let chain = &self.chain; - let tip = chain.tip().block_id(); - let network_params = CanonicalizationParams::default(); - let network_view_iter = - self.indexed_graph - .graph() - .canonical_iter(chain, tip, network_params); - self.network_view = CanonicalView::from_iter(network_view_iter).expect("infallible"); + let chain_tip = chain.tip().block_id(); + let graph = self.indexed_graph.graph(); + Self::_rebuild_view(view, params, chain, chain_tip, graph) } /// Updates the intent canonical view and returns evicted txids from the old view. - fn update_intent_view(&mut self) -> impl Iterator + '_ { + fn rebuild_intent_view(&mut self) -> impl Iterator + '_ { + let params = self.intent_view_canonicalization_params(); + let view = &mut self.intent_view; let chain = &self.chain; - let tip = chain.tip().block_id(); - let intent_params = self.intent_view_canonicalization_params(); - let intent_view_iter = self - .indexed_graph - .graph() - .canonical_iter(chain, tip, intent_params); - let mut temp_intent_view = CanonicalView::from_iter(intent_view_iter).expect("infallible"); - core::mem::swap(&mut self.intent_view, &mut temp_intent_view); - temp_intent_view - .txs - .into_keys() - .filter(|old_txid| !self.intent_view.txs.contains_key(old_txid)) + let chain_tip = chain.tip().block_id(); + let graph = self.indexed_graph.graph(); + Self::_rebuild_view(view, params, chain, chain_tip, graph) } - fn update_views(&mut self) { - self.update_network_view(); - self.update_intent_view(); + /// Recompute the given `view` and return txids that are evicted from the old view. + fn _rebuild_view<'v>( + view: &'v mut CanonicalView, + params: CanonicalizationParams, + chain: &'v LocalChain, + chain_tip: BlockId, + graph: &'v TxGraph, + ) -> impl Iterator + 'v { + let view_iter = graph.canonical_iter(chain, chain_tip, params); + let mut temp_view = CanonicalView::from_iter(view_iter).expect("infallible"); + core::mem::swap(view, &mut temp_view); + temp_view + .txs + .into_keys() + .filter(|old_txid| !view.txs.contains_key(old_txid)) } - /// Stage changes and return evicted txids from intent canonical view. - /// - /// TODO: Do we also need to return evicted txids from network canonical view. + /// Stage changes, rebuild views and return evicted txids from intent canonical view. pub(crate) fn stage_changes>(&mut self, changeset: C) -> Vec { let changeset: ChangeSet = changeset.into(); // TODO: Skip rebuilding views for certain types of changes. - self.update_network_view(); - let evicted_txids = self.update_intent_view().collect::>(); - for &evicted_txid in &evicted_txids { + // TODO: Figure out what type of notifications users need. + // TODO: * I.e. if a tracked tx gets evicted from the canonical network view. + let _txids_evicted_from_network = self.rebuild_network_view(); + drop(_txids_evicted_from_network); + let txids_evicted_from_intent = self.rebuild_intent_view().collect::>(); + for &evicted_txid in &txids_evicted_from_intent { self.stage .merge(self.intent_tracker.remove(evicted_txid).into()); } self.stage.merge(changeset); - evicted_txids + txids_evicted_from_intent } - /// [`CanonicalizationParams`] which includes transactions in the broadcast queue. + /// [`CanonicalizationParams`] which prioritize tracked transactions. pub(crate) fn intent_view_canonicalization_params(&self) -> CanonicalizationParams { CanonicalizationParams { assume_canonical: self.intent_tracker.txids().collect(), @@ -2736,109 +2745,18 @@ impl Wallet { .iter() .filter_map(|txin| { let op = txin.previous_output; - - let mut spend = UncanonicalSpendInfo::::default(); - - let mut visited = HashSet::::new(); - let mut stack = Vec::<(OutPoint, u32)>::new(); // (prev-outpoint, distance) - stack.push((op, 0)); - - while let Some((prevout, distance)) = stack.pop() { - if !visited.insert(prevout) { - continue; - } - if self.network_view.txs.contains_key(&prevout.txid) { - // This tx is canonical. - continue; - } - - let prev_tx_node = match graph.get_tx_node(prevout.txid) { - Some(prev_tx) => prev_tx, - None => continue, - }; - - let uncanonical_ancestor_entry = - match spend.uncanonical_ancestors.entry(prevout.txid) { - Entry::Vacant(entry) => entry, - // Uncanonical tx already visited. - Entry::Occupied(_) => continue, - }; - uncanonical_ancestor_entry.insert( - if !prev_tx_node.anchors.is_empty() - || prev_tx_node.last_seen.is_some() - { - NetworkSeen::Seen - } else { - NetworkSeen::NeverSeen - }, - ); - - if let Some((conflict_txid, _, reason)) = - self.network_view.spend(prevout) - { - if let Entry::Vacant(entry) = - spend.conflicting_txs.entry(conflict_txid) - { - let conflict_tx_node = match graph.get_tx_node(conflict_txid) { - Some(tx_node) => tx_node, - None => continue, - }; - let maybe_direct_anchor = conflict_tx_node - .anchors - .iter() - .find(|a| { - self.chain - .is_block_in_chain(a.block_id, tip.block_id()) - .expect("infallible") - .unwrap_or(false) - }) - .cloned(); - let conflict_pos = match maybe_direct_anchor { - Some(anchor) => ChainPosition::Confirmed { - anchor, - transitively: None, - }, - None => match reason.clone() { - CanonicalReason::Assumed { .. } => { - debug_assert!(false, "network view must not have any assumed-canonical txs"); - ChainPosition::Unconfirmed { - first_seen: None, - last_seen: None, - } - } - CanonicalReason::Anchor { anchor, descendant } => { - ChainPosition::Confirmed { - anchor, - transitively: descendant, - } - } - CanonicalReason::ObservedIn { observed_in, .. } => { - ChainPosition::Unconfirmed { - first_seen: conflict_tx_node.first_seen, - last_seen: match observed_in { - ObservedIn::Block(_) => None, - ObservedIn::Mempool(last_seen) => { - Some(last_seen) - } - }, - } - } - }, - }; - entry.insert((distance, conflict_pos)); - } - } - - stack.extend( - prev_tx_node - .tx - .input - .iter() - .map(|txin| (txin.previous_output, distance + 1)), - ); + let spend_info = SpendInfo::new( + &self.chain, + tip.block_id(), + graph, + &self.network_view, + op, + ); + if spend_info.is_empty() { + None + } else { + Some((op, spend_info)) } - - Some((op, spend)) }) .collect(); @@ -2855,7 +2773,7 @@ impl Wallet { /// /// A self-spend if where all inputs and outputs are owned by this wallet. /// - /// This may return false-negatives if there is missing information from the wallet. + /// This may return false-negatives if there are missing information from the wallet. pub fn is_self_spend(&self, tx: &Transaction) -> bool { let index = &self.indexed_graph.index; for txout in &tx.output { @@ -2991,7 +2909,7 @@ where } fn new_local_utxo( - broadcast_queue: &CanonicalizationTracker, + broadcast_queue: &IntentTracker, keychain: KeychainKind, derivation_index: u32, full_txo: FullTxOut, From 8e1b43c2033eccc78ffc5c05977e143b63aa962a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Sat, 28 Jun 2025 04:13:51 +0000 Subject: [PATCH 5/5] docs: [WIP] Initial work on intent tracker example --- wallet/Cargo.toml | 6 +++ wallet/examples/intent_tracker.rs | 84 +++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 wallet/examples/intent_tracker.rs diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index df0a6228..dca091d6 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -44,6 +44,8 @@ bdk_chain = { version = "0.23.0", features = ["rusqlite"] } bdk_wallet = { path = ".", features = ["rusqlite", "file_store", "test-utils"] } anyhow = "1" rand = "^0.8" +bdk_testenv = "0.13" +bdk_bitcoind_rpc = { version = "0.20" } [package.metadata.docs.rs] all-features = true @@ -58,3 +60,7 @@ required-features = ["all-keys"] name = "miniscriptc" path = "examples/compiler.rs" required-features = ["compiler"] + +[[example]] +name = "intent_tracker" +path = "examples/intent_tracker.rs" diff --git a/wallet/examples/intent_tracker.rs b/wallet/examples/intent_tracker.rs new file mode 100644 index 00000000..f4e6ee5f --- /dev/null +++ b/wallet/examples/intent_tracker.rs @@ -0,0 +1,84 @@ +use std::{str::FromStr, sync::Arc, time::Duration}; + +use anyhow::Context; +use bdk_testenv::{bitcoincore_rpc::RpcApi, TestEnv}; +use bdk_wallet::{KeychainKind, SignOptions, Wallet}; +use bitcoin::{Amount, Network, Txid}; + +const DESCRIPTOR: &str = bdk_testenv::utils::DESCRIPTORS[3]; + +fn main() -> anyhow::Result<()> { + let env = TestEnv::new().context("failed to start testenv")?; + env.mine_blocks(101, None) + .context("failed to mine blocks")?; + + let mut wallet = Wallet::create_single(DESCRIPTOR) + .network(Network::Regtest) + .create_wallet_no_persist() + .context("failed to construct wallet")?; + + let mut emitter = bdk_bitcoind_rpc::Emitter::new( + env.rpc_client(), + wallet.latest_checkpoint(), + 0, + wallet + .transactions() + .filter(|tx| tx.chain_position.is_unconfirmed()) + .map(|tx| tx.tx_node.txid), + ); + while let Some(block_event) = emitter.next_block()? { + wallet.apply_block(&block_event.block, block_event.block_height())?; + } + + // Receive an unconfirmed tx, spend from it, and the unconfirmed tx get's RBF'ed. + // Our API should be able to recognise that the outgoing tx became evicted and allow the caller + // to respond accordingly. + let wallet_addr = wallet.next_unused_address(KeychainKind::External).address; + let remote_addr = env + .rpc_client() + .get_new_address(None, None)? + .assume_checked(); + let incoming_txid = env.send(&wallet_addr, Amount::ONE_BTC)?; + + let mempool_event = emitter.mempool()?; + wallet.apply_evicted_txs(mempool_event.evicted_ats()); + wallet.apply_unconfirmed_txs(mempool_event.new_txs); + assert_eq!(wallet.balance().total(), Amount::ONE_BTC); + + // Create & broadcast outgoing tx. + let mut tx_builder = wallet.build_tx(); + tx_builder.add_recipient(remote_addr, Amount::ONE_BTC / 2); + let mut psbt = tx_builder.finish()?; + assert!(wallet.sign(&mut psbt, SignOptions::default())?); + let outgoing_tx = psbt.extract_tx()?; + wallet.track_tx(outgoing_tx.clone()); + assert_eq!(wallet.uncanonical_txs().count(), 1); + + // Sync. + let outgoing_txid = env.rpc_client().send_raw_transaction(&outgoing_tx)?; + env.wait_until_electrum_sees_txid(outgoing_txid, Duration::from_secs(5))?; + let mempool_event = emitter.mempool()?; + // TODO: Why is `outgoing_txid` not emitted? + println!("mempool_event: {mempool_event:#?}"); + wallet.apply_evicted_txs(mempool_event.evicted_ats()); + wallet.apply_unconfirmed_txs(mempool_event.new_txs); + let tx = wallet + .canonical_txs() + .find(|tx| tx.tx_node.txid == outgoing_txid) + .expect("must find outgoing tx"); + assert_eq!(wallet.uncanonical_txs().count(), 0); + + // RBF incoming tx. + let res = env + .rpc_client() + .call::("bumpfee", &[incoming_txid.to_string().into()])?; + let incoming_replacement_txid = Txid::from_str(res.get("txid").unwrap().as_str().unwrap())?; + + let mempool_event = emitter.mempool()?; + wallet.apply_evicted_txs(mempool_event.evicted_ats()); + wallet.apply_unconfirmed_txs(mempool_event.new_txs); + + for uncanonical_tx in wallet.uncanonical_txs() {} + + Ok(()) +}