Skip to content

Introduce IntentTracker #257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ bdk_chain = { version = "0.23.0", features = ["rusqlite"] }
bdk_wallet = { path = ".", features = ["rusqlite", "file_store", "test-utils"] }
anyhow = "1"
rand = "^0.8"
bdk_testenv = "0.13"
bdk_bitcoind_rpc = { version = "0.20" }

[package.metadata.docs.rs]
all-features = true
Expand All @@ -58,3 +60,7 @@ required-features = ["all-keys"]
name = "miniscriptc"
path = "examples/compiler.rs"
required-features = ["compiler"]

[[example]]
name = "intent_tracker"
path = "examples/intent_tracker.rs"
84 changes: 84 additions & 0 deletions wallet/examples/intent_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::{str::FromStr, sync::Arc, time::Duration};

use anyhow::Context;
use bdk_testenv::{bitcoincore_rpc::RpcApi, TestEnv};
use bdk_wallet::{KeychainKind, SignOptions, Wallet};
use bitcoin::{Amount, Network, Txid};

const DESCRIPTOR: &str = bdk_testenv::utils::DESCRIPTORS[3];

fn main() -> anyhow::Result<()> {
let env = TestEnv::new().context("failed to start testenv")?;
env.mine_blocks(101, None)
.context("failed to mine blocks")?;

let mut wallet = Wallet::create_single(DESCRIPTOR)
.network(Network::Regtest)
.create_wallet_no_persist()
.context("failed to construct wallet")?;

let mut emitter = bdk_bitcoind_rpc::Emitter::new(
env.rpc_client(),
wallet.latest_checkpoint(),
0,
wallet
.transactions()
.filter(|tx| tx.chain_position.is_unconfirmed())
.map(|tx| tx.tx_node.txid),
);
while let Some(block_event) = emitter.next_block()? {
wallet.apply_block(&block_event.block, block_event.block_height())?;
}

// Receive an unconfirmed tx, spend from it, and the unconfirmed tx get's RBF'ed.
// Our API should be able to recognise that the outgoing tx became evicted and allow the caller
// to respond accordingly.
let wallet_addr = wallet.next_unused_address(KeychainKind::External).address;
let remote_addr = env
.rpc_client()
.get_new_address(None, None)?
.assume_checked();
let incoming_txid = env.send(&wallet_addr, Amount::ONE_BTC)?;

let mempool_event = emitter.mempool()?;
wallet.apply_evicted_txs(mempool_event.evicted_ats());
wallet.apply_unconfirmed_txs(mempool_event.new_txs);
assert_eq!(wallet.balance().total(), Amount::ONE_BTC);

// Create & broadcast outgoing tx.
let mut tx_builder = wallet.build_tx();
tx_builder.add_recipient(remote_addr, Amount::ONE_BTC / 2);
let mut psbt = tx_builder.finish()?;
assert!(wallet.sign(&mut psbt, SignOptions::default())?);
let outgoing_tx = psbt.extract_tx()?;
wallet.track_tx(outgoing_tx.clone());
assert_eq!(wallet.uncanonical_txs().count(), 1);

// Sync.
let outgoing_txid = env.rpc_client().send_raw_transaction(&outgoing_tx)?;
env.wait_until_electrum_sees_txid(outgoing_txid, Duration::from_secs(5))?;
let mempool_event = emitter.mempool()?;
// TODO: Why is `outgoing_txid` not emitted?
println!("mempool_event: {mempool_event:#?}");
wallet.apply_evicted_txs(mempool_event.evicted_ats());
wallet.apply_unconfirmed_txs(mempool_event.new_txs);
let tx = wallet
.canonical_txs()
.find(|tx| tx.tx_node.txid == outgoing_txid)
.expect("must find outgoing tx");
assert_eq!(wallet.uncanonical_txs().count(), 0);

// RBF incoming tx.
let res = env
.rpc_client()
.call::<serde_json::Value>("bumpfee", &[incoming_txid.to_string().into()])?;
let incoming_replacement_txid = Txid::from_str(res.get("txid").unwrap().as_str().unwrap())?;

let mempool_event = emitter.mempool()?;
wallet.apply_evicted_txs(mempool_event.evicted_ats());
wallet.apply_unconfirmed_txs(mempool_event.new_txs);

for uncanonical_tx in wallet.uncanonical_txs() {}

Ok(())
}
2 changes: 2 additions & 0 deletions wallet/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ pub struct LocalOutput {
pub derivation_index: u32,
/// The position of the output in the blockchain.
pub chain_position: ChainPosition<ConfirmationBlockTime>,
/// Whether this output exists in a transaction that is yet to be broadcasted.
pub needs_broadcast: bool,
Copy link
Contributor

@nymius nymius Jun 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe an enum state field with something like: UNSPENT, ON_QUEUE, SPENT, BROADCASTED will avoid keep adding new boolean fields here, and provide a better path for update on future occasions, taking advantage of non exhaustive patters. is_spent could be marked for deprecation and be used along the new field in the meantime.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this idea if done as a non-exhaustive enum to help reduce future API breaking changes. If we include a "LOCKED" variant could this also support #259?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When do we expect this to be set/unset exactly? I guess it can only be unset once the transaction in question reaches threshold confirmations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point, and it shows the limitations of the BroadcastQueue concept. In fact, I did some further thinking on this and the BroadcastQueue should really be an IntentTracker and should track txs even if they are "network canonical".

There should be a method such as .tracked_txs_which_are_not_network_canonical (better name needed) so that the caller can decide to either replace the tx, or explicitly forget about it. There are caveats to doing both since we don't want to create a sub-graph where intended payments are duplicated - BDK should handle these situations properly, or provide the required information so that the caller can make a safe decision.

}

/// A [`Utxo`] with its `satisfaction_weight`.
Expand Down
64 changes: 63 additions & 1 deletion wallet/src/wallet/changeset.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use bdk_chain::{
indexed_tx_graph, keychain_txout, local_chain, tx_graph, ConfirmationBlockTime, Merge,
};
use bitcoin::Txid;
use miniscript::{Descriptor, DescriptorPublicKey};
use serde::{Deserialize, Serialize};

use super::intent_tracker;

type IndexedTxGraphChangeSet =
indexed_tx_graph::ChangeSet<ConfirmationBlockTime, keychain_txout::ChangeSet>;

Expand Down Expand Up @@ -114,6 +117,9 @@ pub struct ChangeSet {
pub tx_graph: tx_graph::ChangeSet<ConfirmationBlockTime>,
/// Changes to [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex).
pub indexer: keychain_txout::ChangeSet,
/// Changes to [`IntentTracker`](intent_tracker::IntentTracker).
#[serde(default)]
pub intent_tracker: intent_tracker::ChangeSet,
}

impl Merge for ChangeSet {
Expand Down Expand Up @@ -145,6 +151,7 @@ impl Merge for ChangeSet {
Merge::merge(&mut self.local_chain, other.local_chain);
Merge::merge(&mut self.tx_graph, other.tx_graph);
Merge::merge(&mut self.indexer, other.indexer);
Merge::merge(&mut self.intent_tracker, other.intent_tracker);
}

fn is_empty(&self) -> bool {
Expand All @@ -154,6 +161,7 @@ impl Merge for ChangeSet {
&& self.local_chain.is_empty()
&& self.tx_graph.is_empty()
&& self.indexer.is_empty()
&& self.intent_tracker.is_empty()
}
}

Expand All @@ -163,6 +171,8 @@ impl ChangeSet {
pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet";
/// Name of table to store wallet descriptors and network.
pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet";
/// Name of table to store broadcast queue txids.
pub const WALLET_BROADCAST_QUEUE_TABLE_NAME: &'static str = "bdk_wallet_broadcast_queue";

/// Get v0 sqlite [ChangeSet] schema
pub fn schema_v0() -> alloc::string::String {
Expand All @@ -177,12 +187,23 @@ impl ChangeSet {
)
}

/// Get v1 sqlite [`ChangeSet`] schema.
pub fn schema_v1() -> alloc::string::String {
format!(
"CREATE TABLE {} ( \
id INTEGER PRIMARY KEY AUTOINCREMENT,
txid TEXT NOT NULL UNIQUE \
) STRICT;",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME,
)
}

/// Initialize sqlite tables for wallet tables.
pub fn init_sqlite_tables(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result<()> {
crate::rusqlite_impl::migrate_schema(
db_tx,
Self::WALLET_SCHEMA_NAME,
&[&Self::schema_v0()],
&[&Self::schema_v0(), &Self::schema_v1()],
)?;

bdk_chain::local_chain::ChangeSet::init_sqlite_tables(db_tx)?;
Expand Down Expand Up @@ -220,6 +241,19 @@ impl ChangeSet {
changeset.network = network.map(Impl::into_inner);
}

let mut queue_statement = db_tx.prepare(&format!(
"SELECT txid FROM {} ORDER BY id ASC",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
))?;
let row_iter = queue_statement.query_map([], |row| row.get::<_, Impl<Txid>>("txid"))?;
for row in row_iter {
let Impl(txid) = row?;
changeset
.intent_tracker
.mutations
.push(intent_tracker::Mutation::Push(txid));
}

changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?;
changeset.tx_graph = tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?;
changeset.indexer = keychain_txout::ChangeSet::from_sqlite(db_tx)?;
Expand Down Expand Up @@ -268,6 +302,25 @@ impl ChangeSet {
})?;
}

let mut queue_insert_statement = db_tx.prepare_cached(&format!(
"INSERT OR IGNORE INTO {}(txid) VALUES(:txid)",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
))?;
let mut queue_remove_statement = db_tx.prepare_cached(&format!(
"DELETE FROM {} WHERE txid=:txid",
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
))?;
for mutation in &self.intent_tracker.mutations {
match mutation {
intent_tracker::Mutation::Push(txid) => {
queue_insert_statement.execute(named_params! { ":txid": Impl(*txid) })?;
}
intent_tracker::Mutation::Remove(txid) => {
queue_remove_statement.execute(named_params! { ":txid": Impl(*txid) })?;
}
}
}

self.local_chain.persist_to_sqlite(db_tx)?;
self.tx_graph.persist_to_sqlite(db_tx)?;
self.indexer.persist_to_sqlite(db_tx)?;
Expand Down Expand Up @@ -311,3 +364,12 @@ impl From<keychain_txout::ChangeSet> for ChangeSet {
}
}
}

impl From<intent_tracker::ChangeSet> for ChangeSet {
fn from(broadcast_queue: intent_tracker::ChangeSet) -> Self {
Self {
intent_tracker: broadcast_queue,
..Default::default()
}
}
}
3 changes: 3 additions & 0 deletions wallet/src/wallet/coin_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,7 @@ mod test {
is_spent: false,
derivation_index: 42,
chain_position,
needs_broadcast: false,
}),
}
}
Expand Down Expand Up @@ -856,6 +857,7 @@ mod test {
last_seen: Some(1),
}
},
needs_broadcast: false,
}),
});
}
Expand Down Expand Up @@ -883,6 +885,7 @@ mod test {
first_seen: Some(1),
last_seen: Some(1),
},
needs_broadcast: false,
}),
})
.collect()
Expand Down
Loading
Loading