diff --git a/wallet/Cargo.toml b/wallet/Cargo.toml index df0a6228..dca091d6 100644 --- a/wallet/Cargo.toml +++ b/wallet/Cargo.toml @@ -44,6 +44,8 @@ bdk_chain = { version = "0.23.0", features = ["rusqlite"] } bdk_wallet = { path = ".", features = ["rusqlite", "file_store", "test-utils"] } anyhow = "1" rand = "^0.8" +bdk_testenv = "0.13" +bdk_bitcoind_rpc = { version = "0.20" } [package.metadata.docs.rs] all-features = true @@ -58,3 +60,7 @@ required-features = ["all-keys"] name = "miniscriptc" path = "examples/compiler.rs" required-features = ["compiler"] + +[[example]] +name = "intent_tracker" +path = "examples/intent_tracker.rs" diff --git a/wallet/examples/intent_tracker.rs b/wallet/examples/intent_tracker.rs new file mode 100644 index 00000000..f4e6ee5f --- /dev/null +++ b/wallet/examples/intent_tracker.rs @@ -0,0 +1,84 @@ +use std::{str::FromStr, sync::Arc, time::Duration}; + +use anyhow::Context; +use bdk_testenv::{bitcoincore_rpc::RpcApi, TestEnv}; +use bdk_wallet::{KeychainKind, SignOptions, Wallet}; +use bitcoin::{Amount, Network, Txid}; + +const DESCRIPTOR: &str = bdk_testenv::utils::DESCRIPTORS[3]; + +fn main() -> anyhow::Result<()> { + let env = TestEnv::new().context("failed to start testenv")?; + env.mine_blocks(101, None) + .context("failed to mine blocks")?; + + let mut wallet = Wallet::create_single(DESCRIPTOR) + .network(Network::Regtest) + .create_wallet_no_persist() + .context("failed to construct wallet")?; + + let mut emitter = bdk_bitcoind_rpc::Emitter::new( + env.rpc_client(), + wallet.latest_checkpoint(), + 0, + wallet + .transactions() + .filter(|tx| tx.chain_position.is_unconfirmed()) + .map(|tx| tx.tx_node.txid), + ); + while let Some(block_event) = emitter.next_block()? { + wallet.apply_block(&block_event.block, block_event.block_height())?; + } + + // Receive an unconfirmed tx, spend from it, and the unconfirmed tx get's RBF'ed. + // Our API should be able to recognise that the outgoing tx became evicted and allow the caller + // to respond accordingly. + let wallet_addr = wallet.next_unused_address(KeychainKind::External).address; + let remote_addr = env + .rpc_client() + .get_new_address(None, None)? + .assume_checked(); + let incoming_txid = env.send(&wallet_addr, Amount::ONE_BTC)?; + + let mempool_event = emitter.mempool()?; + wallet.apply_evicted_txs(mempool_event.evicted_ats()); + wallet.apply_unconfirmed_txs(mempool_event.new_txs); + assert_eq!(wallet.balance().total(), Amount::ONE_BTC); + + // Create & broadcast outgoing tx. + let mut tx_builder = wallet.build_tx(); + tx_builder.add_recipient(remote_addr, Amount::ONE_BTC / 2); + let mut psbt = tx_builder.finish()?; + assert!(wallet.sign(&mut psbt, SignOptions::default())?); + let outgoing_tx = psbt.extract_tx()?; + wallet.track_tx(outgoing_tx.clone()); + assert_eq!(wallet.uncanonical_txs().count(), 1); + + // Sync. + let outgoing_txid = env.rpc_client().send_raw_transaction(&outgoing_tx)?; + env.wait_until_electrum_sees_txid(outgoing_txid, Duration::from_secs(5))?; + let mempool_event = emitter.mempool()?; + // TODO: Why is `outgoing_txid` not emitted? + println!("mempool_event: {mempool_event:#?}"); + wallet.apply_evicted_txs(mempool_event.evicted_ats()); + wallet.apply_unconfirmed_txs(mempool_event.new_txs); + let tx = wallet + .canonical_txs() + .find(|tx| tx.tx_node.txid == outgoing_txid) + .expect("must find outgoing tx"); + assert_eq!(wallet.uncanonical_txs().count(), 0); + + // RBF incoming tx. + let res = env + .rpc_client() + .call::("bumpfee", &[incoming_txid.to_string().into()])?; + let incoming_replacement_txid = Txid::from_str(res.get("txid").unwrap().as_str().unwrap())?; + + let mempool_event = emitter.mempool()?; + wallet.apply_evicted_txs(mempool_event.evicted_ats()); + wallet.apply_unconfirmed_txs(mempool_event.new_txs); + + for uncanonical_tx in wallet.uncanonical_txs() {} + + Ok(()) +} diff --git a/wallet/src/types.rs b/wallet/src/types.rs index c15be6c5..f0cb85bf 100644 --- a/wallet/src/types.rs +++ b/wallet/src/types.rs @@ -73,6 +73,8 @@ pub struct LocalOutput { pub derivation_index: u32, /// The position of the output in the blockchain. pub chain_position: ChainPosition, + /// Whether this output exists in a transaction that is yet to be broadcasted. + pub needs_broadcast: bool, } /// A [`Utxo`] with its `satisfaction_weight`. diff --git a/wallet/src/wallet/changeset.rs b/wallet/src/wallet/changeset.rs index ebfdb9fb..857fc4ce 100644 --- a/wallet/src/wallet/changeset.rs +++ b/wallet/src/wallet/changeset.rs @@ -1,9 +1,12 @@ use bdk_chain::{ indexed_tx_graph, keychain_txout, local_chain, tx_graph, ConfirmationBlockTime, Merge, }; +use bitcoin::Txid; use miniscript::{Descriptor, DescriptorPublicKey}; use serde::{Deserialize, Serialize}; +use super::intent_tracker; + type IndexedTxGraphChangeSet = indexed_tx_graph::ChangeSet; @@ -114,6 +117,9 @@ pub struct ChangeSet { pub tx_graph: tx_graph::ChangeSet, /// Changes to [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex). pub indexer: keychain_txout::ChangeSet, + /// Changes to [`IntentTracker`](intent_tracker::IntentTracker). + #[serde(default)] + pub intent_tracker: intent_tracker::ChangeSet, } impl Merge for ChangeSet { @@ -145,6 +151,7 @@ impl Merge for ChangeSet { Merge::merge(&mut self.local_chain, other.local_chain); Merge::merge(&mut self.tx_graph, other.tx_graph); Merge::merge(&mut self.indexer, other.indexer); + Merge::merge(&mut self.intent_tracker, other.intent_tracker); } fn is_empty(&self) -> bool { @@ -154,6 +161,7 @@ impl Merge for ChangeSet { && self.local_chain.is_empty() && self.tx_graph.is_empty() && self.indexer.is_empty() + && self.intent_tracker.is_empty() } } @@ -163,6 +171,8 @@ impl ChangeSet { pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet"; /// Name of table to store wallet descriptors and network. pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet"; + /// Name of table to store broadcast queue txids. + pub const WALLET_BROADCAST_QUEUE_TABLE_NAME: &'static str = "bdk_wallet_broadcast_queue"; /// Get v0 sqlite [ChangeSet] schema pub fn schema_v0() -> alloc::string::String { @@ -177,12 +187,23 @@ impl ChangeSet { ) } + /// Get v1 sqlite [`ChangeSet`] schema. + pub fn schema_v1() -> alloc::string::String { + format!( + "CREATE TABLE {} ( \ + id INTEGER PRIMARY KEY AUTOINCREMENT, + txid TEXT NOT NULL UNIQUE \ + ) STRICT;", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME, + ) + } + /// Initialize sqlite tables for wallet tables. pub fn init_sqlite_tables(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result<()> { crate::rusqlite_impl::migrate_schema( db_tx, Self::WALLET_SCHEMA_NAME, - &[&Self::schema_v0()], + &[&Self::schema_v0(), &Self::schema_v1()], )?; bdk_chain::local_chain::ChangeSet::init_sqlite_tables(db_tx)?; @@ -220,6 +241,19 @@ impl ChangeSet { changeset.network = network.map(Impl::into_inner); } + let mut queue_statement = db_tx.prepare(&format!( + "SELECT txid FROM {} ORDER BY id ASC", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME + ))?; + let row_iter = queue_statement.query_map([], |row| row.get::<_, Impl>("txid"))?; + for row in row_iter { + let Impl(txid) = row?; + changeset + .intent_tracker + .mutations + .push(intent_tracker::Mutation::Push(txid)); + } + changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?; changeset.tx_graph = tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?; changeset.indexer = keychain_txout::ChangeSet::from_sqlite(db_tx)?; @@ -268,6 +302,25 @@ impl ChangeSet { })?; } + let mut queue_insert_statement = db_tx.prepare_cached(&format!( + "INSERT OR IGNORE INTO {}(txid) VALUES(:txid)", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME + ))?; + let mut queue_remove_statement = db_tx.prepare_cached(&format!( + "DELETE FROM {} WHERE txid=:txid", + Self::WALLET_BROADCAST_QUEUE_TABLE_NAME + ))?; + for mutation in &self.intent_tracker.mutations { + match mutation { + intent_tracker::Mutation::Push(txid) => { + queue_insert_statement.execute(named_params! { ":txid": Impl(*txid) })?; + } + intent_tracker::Mutation::Remove(txid) => { + queue_remove_statement.execute(named_params! { ":txid": Impl(*txid) })?; + } + } + } + self.local_chain.persist_to_sqlite(db_tx)?; self.tx_graph.persist_to_sqlite(db_tx)?; self.indexer.persist_to_sqlite(db_tx)?; @@ -311,3 +364,12 @@ impl From for ChangeSet { } } } + +impl From for ChangeSet { + fn from(broadcast_queue: intent_tracker::ChangeSet) -> Self { + Self { + intent_tracker: broadcast_queue, + ..Default::default() + } + } +} diff --git a/wallet/src/wallet/coin_selection.rs b/wallet/src/wallet/coin_selection.rs index 57ed5329..a2fc4fe7 100644 --- a/wallet/src/wallet/coin_selection.rs +++ b/wallet/src/wallet/coin_selection.rs @@ -801,6 +801,7 @@ mod test { is_spent: false, derivation_index: 42, chain_position, + needs_broadcast: false, }), } } @@ -856,6 +857,7 @@ mod test { last_seen: Some(1), } }, + needs_broadcast: false, }), }); } @@ -883,6 +885,7 @@ mod test { first_seen: Some(1), last_seen: Some(1), }, + needs_broadcast: false, }), }) .collect() diff --git a/wallet/src/wallet/intent_tracker.rs b/wallet/src/wallet/intent_tracker.rs new file mode 100644 index 00000000..d18ba791 --- /dev/null +++ b/wallet/src/wallet/intent_tracker.rs @@ -0,0 +1,643 @@ +//! Unbroadcasted transaction queue. + +use core::convert::Infallible; + +use alloc::sync::Arc; + +use alloc::vec::Vec; +use bitcoin::OutPoint; +use bitcoin::Transaction; +use chain::tx_graph; +use chain::tx_graph::TxNode; +use chain::Anchor; +use chain::BlockId; +use chain::CanonicalIter; +use chain::CanonicalReason; +use chain::ChainOracle; +use chain::ChainPosition; +use chain::ObservedIn; +use chain::TxGraph; + +use crate::collections::BTreeMap; +use crate::collections::HashMap; +use crate::collections::HashSet; +use crate::collections::VecDeque; + +use bdk_chain::bdk_core::Merge; +use bitcoin::Txid; + +/// A consistent view of transactions. +#[derive(Debug)] +pub struct CanonicalView { + pub(crate) txs: HashMap, CanonicalReason)>, + pub(crate) spends: HashMap, +} + +impl Default for CanonicalView { + fn default() -> Self { + Self { + txs: HashMap::new(), + spends: HashMap::new(), + } + } +} + +impl CanonicalView { + pub(crate) fn from_iter(iter: CanonicalIter<'_, A, C>) -> Result + where + A: Anchor, + C: ChainOracle, + { + let mut view = Self::default(); + for r in iter { + let (txid, tx, reason) = r?; + for txin in &tx.input { + view.spends.insert(txin.previous_output, txid); + } + view.txs.insert(txid, (tx, reason)); + } + Ok(view) + } + + /// Return the transaction that spends the given `op`. + pub fn spend(&self, op: OutPoint) -> Option<(Txid, Arc, &CanonicalReason)> { + let txid = self.spends.get(&op)?; + let (tx, reason) = self.txs.get(txid)?; + Some((*txid, tx.clone(), reason)) + } + + /// Iterate all descendants of the given transaction in the [`CanonicalView`], avoiding + /// duplicates. + fn descendants( + &self, + tx: impl AsRef, + ) -> impl Iterator, &CanonicalReason)> { + let tx: &Transaction = tx.as_ref(); + let txid = tx.compute_txid(); + + let mut visited = HashSet::::new(); + visited.insert(txid); + + let mut outpoints = core::iter::repeat_n(txid, tx.output.len()) + .zip(0_u32..) + .map(|(txid, vout)| OutPoint::new(txid, vout)) + .collect::>(); + + core::iter::from_fn(move || { + while let Some(op) = outpoints.pop() { + let (txid, tx, reason) = match self.spend(op) { + Some(spent_by) => spent_by, + None => continue, + }; + if !visited.insert(txid) { + continue; + } + outpoints.extend( + core::iter::repeat_n(txid, tx.output.len()) + .zip(0_u32..) + .map(|(txid, vout)| OutPoint::new(txid, vout)), + ); + return Some((txid, tx, reason)); + } + None + }) + } +} + +/// Indicates whether a transaction was observed in the network. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum NetworkSeen { + /// The transaction was previously seen (e.g., in mempool or on-chain). + Seen, + /// The transaction was never seen in the network. + NeverSeen, +} + +impl NetworkSeen { + /// Whether the transaction was once observed in the network. + pub fn was_seen(self) -> bool { + match self { + NetworkSeen::Seen => true, + NetworkSeen::NeverSeen => false, + } + } +} + +/// Represents an input (spend) that depends on non-canonical transaction ancestry. +/// +/// This struct models an input that attempts to spend an output via a transaction path +/// that is not part of the canonical network view (e.g., evicted, conflicted, or unknown). +#[derive(Debug, Clone)] +pub struct SpendInfo { + /// Non-canonical ancestor transactions reachable from this input. + /// + /// Each entry maps an ancestor `Txid` to its observed status in the network. + /// - `Seen` indicates the transaction was previously seen but is no longer part of the + /// canonical view. + /// - `NeverSeen` indicates it was never observed (e.g., not yet broadcast). + pub uncanonical_ancestors: BTreeMap, + + /// Canonical transactions that conflict with this spend. + /// + /// This may be a direct conflict, a conflict with one of the [`uncanonical_ancestors`], or a + /// canonical descendant of a conflict (which are also conflicts). The value is the chain + /// position of the conflict. + /// + /// [`uncanonical_ancestors`]: Self::uncanonical_ancestors + pub conflicting_txs: BTreeMap>, +} + +impl Default for SpendInfo { + fn default() -> Self { + Self { + uncanonical_ancestors: BTreeMap::new(), + conflicting_txs: BTreeMap::new(), + } + } +} + +impl SpendInfo { + pub(crate) fn new( + chain: &C, + chain_tip: BlockId, + tx_graph: &TxGraph, + network_view: &CanonicalView, + op: OutPoint, + ) -> Self + where + C: ChainOracle, + { + use crate::collections::btree_map::Entry; + + let mut spend_info = Self::default(); + + let mut visited = HashSet::::new(); + let mut stack = Vec::::new(); + stack.push(op); + + while let Some(prev_op) = stack.pop() { + if !visited.insert(prev_op) { + // Outpoint already visited. + continue; + } + if network_view.txs.contains_key(&prev_op.txid) { + // Tx is already canonical. + continue; + } + + let prev_tx_node = match tx_graph.get_tx_node(prev_op.txid) { + Some(prev_tx) => prev_tx, + // Tx not known by tx-graph. + None => continue, + }; + + match spend_info.uncanonical_ancestors.entry(prev_op.txid) { + Entry::Vacant(entry) => entry.insert( + if !prev_tx_node.anchors.is_empty() || prev_tx_node.last_seen.is_some() { + NetworkSeen::Seen + } else { + NetworkSeen::NeverSeen + }, + ), + // Tx already visited. + Entry::Occupied(_) => continue, + }; + + // Find conflicts to populate `conflicting_txs`. + if let Some((conflict_txid, conflict_tx, reason)) = network_view.spend(prev_op) { + let conflict_tx_entry = match spend_info.conflicting_txs.entry(conflict_txid) { + Entry::Vacant(vacant_entry) => vacant_entry, + // Skip if conflicting tx already visited. + Entry::Occupied(_) => continue, + }; + let conflict_tx_node = match tx_graph.get_tx_node(conflict_txid) { + Some(tx_node) => tx_node, + // Skip if conflict tx does not exist in our graph. + None => continue, + }; + conflict_tx_entry.insert(Self::get_pos( + chain, + chain_tip, + &conflict_tx_node, + reason, + )); + + // Find descendants of `conflict_tx` too. + for (conflict_txid, _, reason) in network_view.descendants(conflict_tx) { + let conflict_tx_entry = match spend_info.conflicting_txs.entry(conflict_txid) { + Entry::Vacant(vacant_entry) => vacant_entry, + // Skip if conflicting tx already visited. + Entry::Occupied(_) => continue, + }; + let conflict_tx_node = match tx_graph.get_tx_node(conflict_txid) { + Some(tx_node) => tx_node, + // Skip if conflict tx does not exist in our graph. + None => continue, + }; + conflict_tx_entry.insert(Self::get_pos( + chain, + chain_tip, + &conflict_tx_node, + reason, + )); + } + } + + stack.extend( + prev_tx_node + .tx + .input + .iter() + .map(|txin| txin.previous_output), + ); + } + + spend_info + } + + fn get_pos( + chain: &C, + chain_tip: BlockId, + tx_node: &TxNode<'_, Arc, A>, + canonical_reason: &CanonicalReason, + ) -> ChainPosition + where + C: ChainOracle, + { + let maybe_direct_anchor = tx_node + .anchors + .iter() + .find(|a| { + chain + .is_block_in_chain(a.anchor_block(), chain_tip) + .expect("infallible") + .unwrap_or(false) + }) + .cloned(); + match maybe_direct_anchor { + Some(anchor) => ChainPosition::Confirmed { + anchor, + transitively: None, + }, + None => match canonical_reason.clone() { + CanonicalReason::Assumed { .. } => { + debug_assert!( + false, + "network view must not have any assumed-canonical txs" + ); + ChainPosition::Unconfirmed { + first_seen: None, + last_seen: None, + } + } + CanonicalReason::Anchor { anchor, descendant } => ChainPosition::Confirmed { + anchor, + transitively: descendant, + }, + CanonicalReason::ObservedIn { observed_in, .. } => ChainPosition::Unconfirmed { + first_seen: tx_node.first_seen, + last_seen: match observed_in { + ObservedIn::Block(_) => None, + ObservedIn::Mempool(last_seen) => Some(last_seen), + }, + }, + }, + } + } + + /// If the spend info is empty, then it can belong in the canonical history without displacing + /// existing transactions or need to add additional transactions other than itself. + pub fn is_empty(&self) -> bool { + self.uncanonical_ancestors.is_empty() && self.conflicting_txs.is_empty() + } +} + +/// Tracked and uncanonical transaction. +#[derive(Debug, Clone)] +pub struct UncanonicalTx { + /// Txid. + pub txid: Txid, + /// The uncanonical transaction. + pub tx: Arc, + /// Whether the transaction was one seen by the network. + pub network_seen: NetworkSeen, + /// Spends, identified by prevout, which are uncanonical. + pub uncanonical_spends: BTreeMap>, +} + +impl UncanonicalTx { + /// Whether the transaction was once observed in the network. + /// + /// Assuming that the chain-source does not lie, we can safely remove transactions that + pub fn was_seen(&self) -> bool { + self.network_seen.was_seen() + } + + /// A transaction is safe to untrack if it is network uncanonical and we can gurarantee that + /// it will not become canonical again given that there is no reorg of depth greater than + /// `assume_final_depth`. + /// + /// `assume_final_depth` of `0` means that unconfirmed (mempool) transactions are assumed to be + /// final. + /// + /// This may return false-negatives if the wallet is unaware of conflicts. I.e. if purely + /// syncing with Electrum (TODO: @evanlinjin Expand on this). + pub fn is_safe_to_untrack(&self, tip_height: u32, assume_final_depth: u32) -> bool { + self.conflicts().any(|(_, pos)| { + let depth = match pos { + ChainPosition::Confirmed { anchor, .. } => { + tip_height.saturating_sub(anchor.confirmation_height_upper_bound()) + } + ChainPosition::Unconfirmed { .. } => 0, + }; + depth >= assume_final_depth + }) + } + + /// Iterate over transactions that are currently canonical in the network, but would be rendered + /// uncanonical if this transaction were to become canonical. + /// + /// This includes both direct and indirect conflicts, such as any transaction that relies on + /// conflicting ancestry. + pub fn conflicts(&self) -> impl Iterator)> { + self.uncanonical_spends + .values() + .flat_map(|spend| &spend.conflicting_txs) + .map(|(&txid, pos)| (txid, pos)) + .filter({ + let mut dedup = HashSet::::new(); + move |(txid, _)| dedup.insert(*txid) + }) + } + + /// Iterate over confirmed, network-canonical txids which conflict with this transaction. + pub fn confirmed_conflicts(&self) -> impl Iterator { + self.conflicts().filter_map(|(txid, pos)| match pos { + ChainPosition::Confirmed { anchor, .. } => Some((txid, anchor)), + ChainPosition::Unconfirmed { .. } => None, + }) + } + + /// Iterate over unconfirmed, network-canonical txids which conflict with this transaction. + pub fn unconfirmed_conflicts(&self) -> impl Iterator + '_ { + self.conflicts().filter_map(|(txid, pos)| match pos { + ChainPosition::Confirmed { .. } => None, + ChainPosition::Unconfirmed { .. } => Some(txid), + }) + } + + /// Missing ancestors. + /// + /// Either evicted from mempool, or never successfully broadcast in the first place. + pub fn missing_parents(&self) -> impl Iterator + '_ { + self.uncanonical_spends + .values() + .flat_map(|spend_info| &spend_info.uncanonical_ancestors) + .map(|(&txid, &network_seen)| (txid, network_seen)) + } + + /// Whether this transaction conflicts with network-canonical transactions. + pub fn contains_conflicts(&self) -> bool { + self.conflicts().next().is_some() + } + + /// Whether this transaction conflicts with confirmed, network-canonical transactions. + pub fn contains_confirmed_conflicts(&self) -> bool { + self.confirmed_conflicts().next().is_some() + } +} + +/// An ordered tracking area for uncanonical transactions. +#[derive(Debug, Clone, Default)] +pub struct IntentTracker { + /// Tracks the order that transactions are added. + order: VecDeque, + + /// Enforces that we do not have duplicates in `queue`. + dedup: HashSet, +} + +/// Represents a single mutation to [`BroadcastQueue`]. +#[derive(Debug, Clone, PartialEq, serde::Deserialize, serde::Serialize)] +pub enum Mutation { + /// A push to the back of the queue. + Push(Txid), + /// A removal from the queue. + Remove(Txid), +} + +/// A list of mutations made to [`BroadcastQueue`]. +#[must_use] +#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)] +pub struct ChangeSet { + /// Mutations. + pub mutations: Vec, +} + +impl Merge for ChangeSet { + fn merge(&mut self, other: Self) { + self.mutations.extend(other.mutations); + } + + fn is_empty(&self) -> bool { + self.mutations.is_empty() + } +} + +impl IntentTracker { + /// Construct [`Unbroadcasted`] from the given `changeset`. + pub fn from_changeset(changeset: ChangeSet) -> Self { + let mut out = IntentTracker::default(); + out.apply_changeset(changeset); + out + } + + /// Apply the given `changeset`. + pub fn apply_changeset(&mut self, changeset: ChangeSet) { + for mutation in changeset.mutations { + match mutation { + Mutation::Push(txid) => self._push(txid), + Mutation::Remove(txid) => self._remove(txid), + }; + } + } + + /// Whether the `txid` exists in the queue. + pub fn contains(&self, txid: Txid) -> bool { + self.dedup.contains(&txid) + } + + /// Push a `txid` to the queue if it does not already exist. + /// + /// # Warning + /// + /// This does not get rid of conflicting transactions already in the queue. + pub fn push(&mut self, txid: Txid) -> ChangeSet { + let mut changeset = ChangeSet::default(); + if self._push(txid) { + changeset.mutations.push(Mutation::Push(txid)); + } + changeset + } + fn _push(&mut self, txid: Txid) -> bool { + if self.dedup.insert(txid) { + self.order.push_back(txid); + return true; + } + false + } + + /// Push a `txid` to the broadcast queue (if it does not exist already) and displaces all + /// coflicting txids in the queue. + pub fn push_and_displace_conflicts(&mut self, tx_graph: &TxGraph, txid: Txid) -> ChangeSet + where + A: Anchor, + { + let mut changeset = ChangeSet::default(); + + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => { + debug_assert!( + !self.dedup.contains(&txid), + "Cannot have txid in queue which has no corresponding tx in graph" + ); + return changeset; + } + }; + + if self._push(txid) { + changeset.mutations.push(Mutation::Push(txid)); + + for txid in tx_graph.walk_conflicts(&tx, |_, conflict_txid| Some(conflict_txid)) { + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + } + } + + changeset + } + + /// Returns the next `txid` of the queue to broadcast which has no dependencies to other + /// transactions in the queue. + pub fn next_to_broadcast(&self, tx_graph: &TxGraph) -> Option + where + A: Anchor, + { + self.order.iter().copied().find(|&txid| { + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => return false, + }; + if tx + .input + .iter() + .any(|txin| self.dedup.contains(&txin.previous_output.txid)) + { + return false; + } + true + }) + } + + /// Returns unbroadcasted dependencies of the given `txid`. + /// + /// The returned `Vec` is in broadcast order. + pub fn unbroadcasted_dependencies(&self, tx_graph: &TxGraph, txid: Txid) -> Vec + where + A: Anchor, + { + let tx = match tx_graph.get_tx(txid) { + Some(tx) => tx, + None => return Vec::new(), + }; + let mut txs = tx_graph + .walk_ancestors(tx, |_depth, ancestor_tx| { + let ancestor_txid = ancestor_tx.compute_txid(); + if self.dedup.contains(&ancestor_txid) { + Some(ancestor_txid) + } else { + None + } + }) + .collect::>(); + txs.reverse(); + txs + } + + /// Untracks and removes a transaction from the broadcast queue. + /// + /// Transactions are automatically removed from the queue upon successful broadcast, so calling + /// this method directly is typically not required. + pub fn remove(&mut self, txid: Txid) -> ChangeSet { + let mut changeset = ChangeSet::default(); + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + changeset + } + fn _remove(&mut self, txid: Txid) -> bool { + if self.dedup.remove(&txid) { + let i = (0..self.order.len()) + .zip(self.order.iter().copied()) + .find_map(|(i, queue_txid)| if queue_txid == txid { Some(i) } else { None }) + .expect("must exist in queue to exist in `queue`"); + let _removed = self.order.remove(i); + debug_assert_eq!(_removed, Some(txid)); + return true; + } + false + } + + /// Untracks and removes a transaction and it's descendants from the broadcast queue. + pub fn remove_and_displace_dependants( + &mut self, + tx_graph: &TxGraph, + txid: Txid, + ) -> ChangeSet + where + A: Anchor, + { + let mut changeset = ChangeSet::default(); + + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + for txid in tx_graph.walk_descendants(txid, |_depth, txid| Some(txid)) { + if self._remove(txid) { + changeset.mutations.push(Mutation::Remove(txid)); + } + } + } + changeset + } + + /// Untrack transactions that are given anchors and/or mempool timestamps. + pub fn filter_from_graph_changeset( + &mut self, + graph_changeset: &tx_graph::ChangeSet, + ) -> ChangeSet { + let mut changeset = ChangeSet::default(); + let s_txids = graph_changeset.last_seen.keys().copied(); + let a_txids = graph_changeset.anchors.iter().map(|(_, txid)| *txid); + let e_txids = graph_changeset.last_evicted.keys().copied(); + for txid in s_txids.chain(a_txids).chain(e_txids) { + changeset.merge(self.remove(txid)); + } + changeset + } + + /// Txids ordered by precedence. + /// + /// Transactions with greater precedence will appear later in this list. + pub fn txids(&self) -> impl ExactSizeIterator + '_ { + self.order.iter().copied() + } + + /// Initial changeset. + pub fn initial_changeset(&self) -> ChangeSet { + ChangeSet { + mutations: self.order.iter().copied().map(Mutation::Push).collect(), + } + } +} diff --git a/wallet/src/wallet/mod.rs b/wallet/src/wallet/mod.rs index be3773d3..e175637a 100644 --- a/wallet/src/wallet/mod.rs +++ b/wallet/src/wallet/mod.rs @@ -19,10 +19,11 @@ use alloc::{ sync::Arc, vec::Vec, }; +use chain::CanonicalReason; use core::{cmp::Ordering, fmt, mem, ops::Deref}; +use intent_tracker::{CanonicalView, NetworkSeen, SpendInfo, UncanonicalTx}; use bdk_chain::{ - indexed_tx_graph, indexer::keychain_txout::KeychainTxOutIndex, local_chain::{ApplyHeaderError, CannotConnectError, CheckPoint, CheckPointIter, LocalChain}, spk_client::{ @@ -53,6 +54,7 @@ mod changeset; pub mod coin_selection; pub mod error; pub mod export; +pub mod intent_tracker; mod params; mod persisted; pub mod signer; @@ -78,6 +80,7 @@ use crate::wallet::{ // re-exports pub use bdk_chain::Balance; pub use changeset::ChangeSet; +pub use intent_tracker::IntentTracker; pub use params::*; pub use persisted::*; pub use utils::IsDust; @@ -106,6 +109,9 @@ pub struct Wallet { change_signers: Arc, chain: LocalChain, indexed_graph: IndexedTxGraph>, + intent_tracker: IntentTracker, + network_view: CanonicalView, + intent_view: CanonicalView, stage: ChangeSet, network: Network, secp: SecpCtx, @@ -450,24 +456,34 @@ impl Wallet { let indexed_graph = IndexedTxGraph::new(index); let indexed_graph_changeset = indexed_graph.initial_changeset(); + let intent_tracker = IntentTracker::default(); + let intent_tracker_changeset = intent_tracker.initial_changeset(); + let stage = ChangeSet { descriptor, change_descriptor, local_chain: chain_changeset, tx_graph: indexed_graph_changeset.tx_graph, indexer: indexed_graph_changeset.indexer, + intent_tracker: intent_tracker_changeset, network: Some(network), }; - Ok(Wallet { + let mut wallet = Wallet { signers, change_signers, network, chain, indexed_graph, + intent_tracker, stage, secp, - }) + network_view: CanonicalView::default(), + intent_view: CanonicalView::default(), + }; + wallet.rebuild_all_views(); + + Ok(wallet) } /// Build [`Wallet`] by loading from persistence or [`ChangeSet`]. @@ -654,17 +670,25 @@ impl Wallet { indexed_graph.apply_changeset(changeset.indexer.into()); indexed_graph.apply_changeset(changeset.tx_graph.into()); + let broadcast_queue = IntentTracker::from_changeset(changeset.intent_tracker); + let stage = ChangeSet::default(); - Ok(Some(Wallet { + let mut wallet = Wallet { signers, change_signers, chain, indexed_graph, + intent_tracker: broadcast_queue, stage, network, secp, - })) + network_view: CanonicalView::default(), + intent_view: CanonicalView::default(), + }; + wallet.rebuild_all_views(); + + Ok(Some(wallet)) } /// Get the Bitcoin network the wallet is using. @@ -733,14 +757,14 @@ impl Wallet { /// ``` pub fn reveal_next_address(&mut self, keychain: KeychainKind) -> AddressInfo { let keychain = self.map_keychain(keychain); - let index = &mut self.indexed_graph.index; - let stage = &mut self.stage; - let ((index, spk), index_changeset) = index + let ((index, spk), index_changeset) = self + .indexed_graph + .index .reveal_next_spk(keychain) .expect("keychain must exist"); - stage.merge(index_changeset.into()); + self.stage_changes(index_changeset); AddressInfo { index, @@ -771,7 +795,7 @@ impl Wallet { .reveal_to_target(keychain, index) .expect("keychain must exist"); - self.stage.merge(index_changeset.into()); + self.stage_changes(index_changeset); spks.into_iter().map(move |(index, spk)| AddressInfo { index, @@ -797,8 +821,7 @@ impl Wallet { .next_unused_spk(keychain) .expect("keychain must exist"); - self.stage - .merge(indexed_tx_graph::ChangeSet::from(index_changeset).into()); + self.stage_changes(index_changeset); AddressInfo { index, @@ -859,17 +882,32 @@ impl Wallet { self.indexed_graph.index.index_of_spk(spk).cloned() } - /// Return the list of unspent outputs of this wallet + /// Return the list of unspent outputs of this wallet. pub fn list_unspent(&self) -> impl Iterator + '_ { + self._list_unspent(CanonicalizationParams::default()) + } + + /// List unspent outputs (UTXOs) of this wallet assuming that intended transactions are + /// canonical. + /// + /// TODO: Better docs here. + pub fn list_intended_unspent(&self) -> impl Iterator + '_ { + self._list_unspent(self.intent_view_canonicalization_params()) + } + + fn _list_unspent( + &self, + params: CanonicalizationParams, + ) -> impl Iterator + '_ { self.indexed_graph .graph() .filter_chain_unspents( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + params, self.indexed_graph.index.outpoints().iter().cloned(), ) - .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) + .map(|((k, i), full_txo)| new_local_utxo(&self.intent_tracker, k, i, full_txo)) } /// Get the [`TxDetails`] of a wallet transaction. @@ -911,7 +949,7 @@ impl Wallet { CanonicalizationParams::default(), self.indexed_graph.index.outpoints().iter().cloned(), ) - .map(|((k, i), full_txo)| new_local_utxo(k, i, full_txo)) + .map(|((k, i), full_txo)| new_local_utxo(&self.intent_tracker, k, i, full_txo)) } /// Get all the checkpoints the wallet is currently storing indexed by height. @@ -955,17 +993,20 @@ impl Wallet { /// Returns the utxo owned by this wallet corresponding to `outpoint` if it exists in the /// wallet's database. - pub fn get_utxo(&self, op: OutPoint) -> Option { + pub fn get_utxo(&self, outpoint: OutPoint) -> Option { + self._get_utxo(CanonicalizationParams::default(), outpoint) + } + fn _get_utxo(&self, params: CanonicalizationParams, op: OutPoint) -> Option { let ((keychain, index), _) = self.indexed_graph.index.txout(op)?; self.indexed_graph .graph() .filter_chain_unspents( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + params, core::iter::once(((), op)), ) - .map(|(_, full_txo)| new_local_utxo(keychain, index, full_txo)) + .map(|(_, full_txo)| new_local_utxo(&self.intent_tracker, keychain, index, full_txo)) .next() } @@ -988,7 +1029,7 @@ impl Wallet { /// [`list_output`]: Self::list_output pub fn insert_txout(&mut self, outpoint: OutPoint, txout: TxOut) { let additions = self.indexed_graph.insert_txout(outpoint, txout); - self.stage.merge(additions.into()); + self.stage_changes(additions); } /// Calculates the fee of a given transaction. Returns [`Amount::ZERO`] if `tx` is a coinbase @@ -1149,9 +1190,13 @@ impl Wallet { /// Iterate over relevant and canonical transactions in the wallet. /// - /// A transaction is relevant when it spends from or spends to at least one tracked output. A - /// transaction is canonical when it is confirmed in the best chain, or does not conflict - /// with any transaction confirmed in the best chain. + /// A transaction is relevant when it spends from or spends to at least one tracked output. + /// + /// A transaction is canonical when either: + /// * It is confirmed in the best chain and does not conflict with a transaction in the + /// broadcast queue. + /// * It seen in the mempool, is not evicted and does not conflict with a transaction that is + /// confirmed or seen later in the mempool. /// /// To iterate over all transactions, including those that are irrelevant and not canonical, use /// [`TxGraph::full_txs`]. @@ -1159,6 +1204,25 @@ impl Wallet { /// To iterate over all canonical transactions, including those that are irrelevant, use /// [`TxGraph::list_canonical_txs`]. pub fn transactions(&self) -> impl Iterator + '_ { + self.canonical_txs() + } + + /// Iterate over relevant and canonical transactions in the wallet. + /// + /// A transaction is relevant when it spends from or spends to at least one tracked output. + /// + /// A transaction is canonical when either: + /// * It is confirmed in the best chain and does not conflict with a transaction in the + /// broadcast queue. + /// * It seen in the mempool, is not evicted and does not conflict with a transaction that is + /// confirmed or seen later in the mempool. + /// + /// To iterate over all transactions, including those that are irrelevant and not canonical, use + /// [`TxGraph::full_txs`]. + /// + /// To iterate over all canonical transactions, including those that are irrelevant, use + /// [`TxGraph::list_canonical_txs`]. + pub fn canonical_txs(&self) -> impl Iterator + '_ { let tx_graph = self.indexed_graph.graph(); let tx_index = &self.indexed_graph.index; tx_graph @@ -1183,10 +1247,10 @@ impl Wallet { /// # let mut wallet:Wallet = todo!(); /// // Transactions by chain position: first unconfirmed then descending by confirmed height. /// let sorted_txs: Vec = - /// wallet.transactions_sort_by(|tx1, tx2| tx2.chain_position.cmp(&tx1.chain_position)); + /// wallet.canonical_txs_sort_by(|tx1, tx2| tx2.chain_position.cmp(&tx1.chain_position)); /// # Ok::<(), anyhow::Error>(()) /// ``` - pub fn transactions_sort_by(&self, compare: F) -> Vec + pub fn canonical_txs_sort_by(&self, compare: F) -> Vec where F: FnMut(&WalletTx, &WalletTx) -> Ordering, { @@ -1198,10 +1262,19 @@ impl Wallet { /// Return the balance, separated into available, trusted-pending, untrusted-pending and /// immature values. pub fn balance(&self) -> Balance { + self._balance(CanonicalizationParams::default()) + } + + /// Return the balance that includes tracked transactions which are not canonical. + pub fn intended_balance(&self) -> Balance { + self._balance(self.intent_view_canonicalization_params()) + } + + fn _balance(&self, params: CanonicalizationParams) -> Balance { self.indexed_graph.graph().balance( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + params, self.indexed_graph.index.outpoints().iter().cloned(), |&(k, _), _| k == KeychainKind::Internal, ) @@ -1624,7 +1697,7 @@ impl Wallet { if let Some((_, index_changeset)) = self.indexed_graph.index.reveal_to_target(keychain, index) { - self.stage.merge(index_changeset.into()); + self.stage_changes(index_changeset); self.mark_used(keychain, index); } } @@ -1681,7 +1754,11 @@ impl Wallet { let txout_index = &self.indexed_graph.index; let chain_tip = self.chain.tip().block_id(); let chain_positions = graph - .list_canonical_txs(&self.chain, chain_tip, CanonicalizationParams::default()) + .list_canonical_txs( + &self.chain, + chain_tip, + self.intent_view_canonicalization_params(), + ) .map(|canon_tx| (canon_tx.tx_node.txid, canon_tx.chain_position)) .collect::>(); @@ -1721,6 +1798,13 @@ impl Wallet { .input .drain(..) .map(|txin| -> Result<_, BuildFeeBumpError> { + // cannot fee-bump if a conflicting transaction is already confirmed + if let Some((spend_txid, _, reason)) = self.network_view.spend(txin.previous_output) + { + if spend_txid != txid && matches!(reason, CanonicalReason::Anchor { .. }) { + return Err(BuildFeeBumpError::TransactionConfirmed(spend_txid)); + } + } graph // Get previous transaction .get_tx(txin.previous_output.txid) @@ -1754,6 +1838,9 @@ impl Wallet { is_spent: true, derivation_index, chain_position, + needs_broadcast: self + .intent_tracker + .contains(txin.previous_output.txid), }), }, ), @@ -1954,7 +2041,11 @@ impl Wallet { let confirmation_heights = self .indexed_graph .graph() - .list_canonical_txs(&self.chain, chain_tip, CanonicalizationParams::default()) + .list_canonical_txs( + &self.chain, + chain_tip, + self.intent_view_canonicalization_params(), + ) .filter(|canon_tx| prev_txids.contains(&canon_tx.tx_node.txid)) // This is for a small performance gain. Although `.filter` filters out excess txs, it // will still consume the internal `CanonicalIter` entirely. Having a `.take` here @@ -2095,6 +2186,19 @@ impl Wallet { /// Given the options returns the list of utxos that must be used to form the /// transaction and any further that may be used if needed. fn filter_utxos(&self, params: &TxParams, current_height: u32) -> Vec { + let uncanonical_txids_to_exclude = self + .uncanonical_txs() + .filter_map(|utx| match params.uncanonical_utxo_policy { + crate::UncanonicalUtxoPolicy::Include if utx.contains_conflicts() => None, + crate::UncanonicalUtxoPolicy::IncludeUnconfirmedConflicts + if utx.contains_confirmed_conflicts() => + { + None + } + _ => Some(utx.txid), + }) + .collect::>(); + if params.manually_selected_only { vec![] // only process optional UTxOs if manually_selected_only is false @@ -2107,14 +2211,18 @@ impl Wallet { .filter_chain_unspents( &self.chain, self.chain.tip().block_id(), - CanonicalizationParams::default(), + self.intent_view_canonicalization_params(), self.indexed_graph.index.outpoints().iter().cloned(), ) + // remove all uncanonical outputs that do not satisfy the uncanonical output policy + .filter(move |(_, full_txo)| { + !uncanonical_txids_to_exclude.contains(&full_txo.outpoint.txid) + }) // only create LocalOutput if UTxO is mature .filter_map(move |((k, i), full_txo)| { full_txo .is_mature(current_height) - .then(|| new_local_utxo(k, i, full_txo)) + .then(|| new_local_utxo(&self.intent_tracker, k, i, full_txo)) }) // only process UTxOs not selected manually, they will be considered later in the // chain NOTE: this avoid UTxOs in both required and optional list @@ -2335,7 +2443,7 @@ impl Wallet { .reveal_to_target_multi(&update.last_active_indices); changeset.merge(index_changeset.into()); changeset.merge(self.indexed_graph.apply_update(update.tx_update).into()); - self.stage.merge(changeset); + self.stage_changes(changeset); Ok(()) } @@ -2431,7 +2539,7 @@ impl Wallet { .apply_block_relevant(block, height) .into(), ); - self.stage.merge(changeset); + self.stage_changes(changeset); Ok(()) } @@ -2454,7 +2562,7 @@ impl Wallet { let indexed_graph_changeset = self .indexed_graph .batch_insert_relevant_unconfirmed(unconfirmed_txs); - self.stage.merge(indexed_graph_changeset.into()); + self.stage_changes(indexed_graph_changeset); } /// Apply evictions of the given txids with their associated timestamps. @@ -2472,7 +2580,7 @@ impl Wallet { .list_canonical_txs( chain, chain.tip().block_id(), - CanonicalizationParams::default(), + self.intent_view_canonicalization_params(), ) .map(|c| c.tx_node.txid) .collect(); @@ -2499,6 +2607,197 @@ impl Wallet { } } +/// Methods to interact with the broadcast queue. +impl Wallet { + fn rebuild_all_views(&mut self) { + let _ = self.rebuild_network_view(); + let _ = self.rebuild_intent_view(); + } + + /// Updates the network canonical view and returns evicted txids from the old view. + fn rebuild_network_view(&mut self) -> impl Iterator + '_ { + let params = CanonicalizationParams::default(); + let view = &mut self.network_view; + let chain = &self.chain; + let chain_tip = chain.tip().block_id(); + let graph = self.indexed_graph.graph(); + Self::_rebuild_view(view, params, chain, chain_tip, graph) + } + + /// Updates the intent canonical view and returns evicted txids from the old view. + fn rebuild_intent_view(&mut self) -> impl Iterator + '_ { + let params = self.intent_view_canonicalization_params(); + let view = &mut self.intent_view; + let chain = &self.chain; + let chain_tip = chain.tip().block_id(); + let graph = self.indexed_graph.graph(); + Self::_rebuild_view(view, params, chain, chain_tip, graph) + } + + /// Recompute the given `view` and return txids that are evicted from the old view. + fn _rebuild_view<'v>( + view: &'v mut CanonicalView, + params: CanonicalizationParams, + chain: &'v LocalChain, + chain_tip: BlockId, + graph: &'v TxGraph, + ) -> impl Iterator + 'v { + let view_iter = graph.canonical_iter(chain, chain_tip, params); + let mut temp_view = CanonicalView::from_iter(view_iter).expect("infallible"); + core::mem::swap(view, &mut temp_view); + temp_view + .txs + .into_keys() + .filter(|old_txid| !view.txs.contains_key(old_txid)) + } + + /// Stage changes, rebuild views and return evicted txids from intent canonical view. + pub(crate) fn stage_changes>(&mut self, changeset: C) -> Vec { + let changeset: ChangeSet = changeset.into(); + + // TODO: Skip rebuilding views for certain types of changes. + // TODO: Figure out what type of notifications users need. + // TODO: * I.e. if a tracked tx gets evicted from the canonical network view. + let _txids_evicted_from_network = self.rebuild_network_view(); + drop(_txids_evicted_from_network); + let txids_evicted_from_intent = self.rebuild_intent_view().collect::>(); + for &evicted_txid in &txids_evicted_from_intent { + self.stage + .merge(self.intent_tracker.remove(evicted_txid).into()); + } + self.stage.merge(changeset); + txids_evicted_from_intent + } + + /// [`CanonicalizationParams`] which prioritize tracked transactions. + pub(crate) fn intent_view_canonicalization_params(&self) -> CanonicalizationParams { + CanonicalizationParams { + assume_canonical: self.intent_tracker.txids().collect(), + } + } + + /// Keep track of a transaction and untrack all conflicts of it (if any). + /// + /// The caller is responsible for replacing, cpfp-ing, or manually abandoning the transaction if + /// it ever becomes network-uncanonical (TODO: Expand on this). + /// + /// Unsigned transactions can be inserted and later reinserted when signed (TODO: Work on this + /// API, maybe don't include this for now?). + pub fn track_tx>>(&mut self, tx: T) -> Vec { + let tx: Arc = tx.into(); + let txid = tx.compute_txid(); + + let mut changeset = ChangeSet::default(); + changeset.merge(self.indexed_graph.insert_tx(tx).into()); + changeset.merge(self.intent_tracker.push(txid).into()); + self.stage_changes(changeset) + } + + /// Untrack transaction so that the wallet will forget about it if it becomes network + /// uncanonical. + /// + /// This will also untrack all descendants of this transaction. + /// + /// Returns all transactions untracked by this operation. + pub fn untrack_tx(&mut self, txid: Txid) -> Vec { + let mut changeset = ChangeSet::default(); + changeset.merge(self.intent_tracker.remove(txid).into()); + self.stage_changes(changeset) + } + + /// List transactions that are tracked but not network-canonical. + /// + /// # Order + /// + /// Transactions are returned in the order that they were tracked with + /// [`track_tx`](Self::track_tx). + /// + /// These are the transactions that commands action! + pub fn uncanonical_txs( + &self, + ) -> impl Iterator> + '_ { + let graph = self.indexed_graph.graph(); + let tip = self.chain.tip(); + self.intent_tracker + .txids() + .filter(|txid| { + debug_assert!( + self.intent_view.txs.contains_key(txid), + "A tracked tx must exist in the `intent_view`", + ); + !self.network_view.txs.contains_key(txid) + }) + .filter_map(|txid| { + let txn_opt = graph.get_tx_node(txid); + debug_assert!(txn_opt.is_some(), "A tracked txid must exist in the graph"); + txn_opt + }) + .filter_map(move |tx_node| { + let txid = tx_node.txid; + let tx = tx_node.tx; + let network_seen = if tx_node.last_seen.is_some() || !tx_node.anchors.is_empty() { + NetworkSeen::Seen + } else { + NetworkSeen::NeverSeen + }; + let uncanonical_spends = tx + .input + .iter() + .filter_map(|txin| { + let op = txin.previous_output; + let spend_info = SpendInfo::new( + &self.chain, + tip.block_id(), + graph, + &self.network_view, + op, + ); + if spend_info.is_empty() { + None + } else { + Some((op, spend_info)) + } + }) + .collect(); + + Some(UncanonicalTx { + txid, + tx, + network_seen, + uncanonical_spends, + }) + }) + } + + /// Whether the transaction is a self-spend. + /// + /// A self-spend if where all inputs and outputs are owned by this wallet. + /// + /// This may return false-negatives if there are missing information from the wallet. + pub fn is_self_spend(&self, tx: &Transaction) -> bool { + let index = &self.indexed_graph.index; + for txout in &tx.output { + if index.index_of_spk(txout.script_pubkey.clone()).is_none() { + return false; + } + } + for txin in &tx.input { + if index.txout(txin.previous_output).is_none() { + return false; + } + } + true + } + + /// Mark a transaction as successfully broadcasted. + pub fn insert_tx_broadcasted_at(&mut self, txid: Txid, at: u64) { + let mut update = TxUpdate::default(); + update.seen_ats.insert((txid, at)); + let changeset = self.indexed_graph.apply_update(update); + self.stage_changes(changeset); + } +} + /// Methods to construct sync/full-scan requests for spk-based chain sources. impl Wallet { /// Create a partial [`SyncRequest`] for all revealed spks at `start_time`. @@ -2610,6 +2909,7 @@ where } fn new_local_utxo( + broadcast_queue: &IntentTracker, keychain: KeychainKind, derivation_index: u32, full_txo: FullTxOut, @@ -2621,6 +2921,7 @@ fn new_local_utxo( chain_position: full_txo.chain_position, keychain, derivation_index, + needs_broadcast: broadcast_queue.contains(full_txo.outpoint.txid), } } diff --git a/wallet/src/wallet/tx_builder.rs b/wallet/src/wallet/tx_builder.rs index 949cb792..bae1da69 100644 --- a/wallet/src/wallet/tx_builder.rs +++ b/wallet/src/wallet/tx_builder.rs @@ -141,6 +141,19 @@ pub(crate) struct TxParams { pub(crate) bumping_fee: Option, pub(crate) current_height: Option, pub(crate) allow_dust: bool, + pub(crate) uncanonical_utxo_policy: UncanonicalUtxoPolicy, +} + +#[derive(Default, Debug, Clone)] +pub(crate) enum UncanonicalUtxoPolicy { + /// Exlude all uncanonical UTXOs. + #[default] + Exclude, + /// Include uncanonical UTXOs which do not conflict with the canonical history. + Include, + /// Include uncanonical UTXOs, including those that conflict with unconfirmed canonical + /// history. + IncludeUnconfirmedConflicts, } #[derive(Clone, Copy, Debug)] @@ -281,6 +294,7 @@ impl<'a, Cs> TxBuilder<'a, Cs> { .iter() .map(|outpoint| { self.wallet + // TODO: We should really pick from the intent canonical view. .get_utxo(*outpoint) .ok_or(AddUtxoError::UnknownUtxo(*outpoint)) .map(|output| { @@ -626,6 +640,19 @@ impl<'a, Cs> TxBuilder<'a, Cs> { self } + /// Include uncanonical UTXOs which do not conflict with the canonical history. + pub fn include_uncanonical(&mut self) -> &mut Self { + self.params.uncanonical_utxo_policy = UncanonicalUtxoPolicy::Include; + self + } + + /// Include uncanonical UTXOs, inclusive of those that conflict with unconfirmed canonical + /// transactions. UTXOs that conflict with confirmed history are still excluded. + pub fn include_uncanonical_conflicts(&mut self) -> &mut Self { + self.params.uncanonical_utxo_policy = UncanonicalUtxoPolicy::IncludeUnconfirmedConflicts; + self + } + /// Sets the address to *drain* excess coins to. /// /// Usually, when there are excess coins they are sent to a change address generated by the @@ -1029,6 +1056,7 @@ mod test { last_seen: Some(1), }, derivation_index: 0, + needs_broadcast: false, }, LocalOutput { outpoint: OutPoint { @@ -1049,6 +1077,7 @@ mod test { transitively: None, }, derivation_index: 1, + needs_broadcast: false, }, ] } diff --git a/wallet/tests/wallet.rs b/wallet/tests/wallet.rs index 8ed28159..83eca351 100644 --- a/wallet/tests/wallet.rs +++ b/wallet/tests/wallet.rs @@ -4605,13 +4605,13 @@ fn single_descriptor_wallet_can_create_tx_and_receive_change() { } #[test] -fn test_transactions_sort_by() { +fn test_canonical_txs_sort_by() { let (mut wallet, _txid) = get_funded_wallet_wpkh(); receive_output(&mut wallet, Amount::from_sat(25_000), ReceiveTo::Mempool(0)); // sort by chain position, unconfirmed then confirmed by descending block height let sorted_txs: Vec = - wallet.transactions_sort_by(|t1, t2| t2.chain_position.cmp(&t1.chain_position)); + wallet.canonical_txs_sort_by(|t1, t2| t2.chain_position.cmp(&t1.chain_position)); let conf_heights: Vec> = sorted_txs .iter() .map(|tx| tx.chain_position.confirmation_height_upper_bound())