Skip to content

Commit a9efbcc

Browse files
committed
feat!: Introduce BroadcastQueue
1 parent ce70bb7 commit a9efbcc

File tree

6 files changed

+528
-31
lines changed

6 files changed

+528
-31
lines changed

wallet/src/types.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ pub struct LocalOutput {
7373
pub derivation_index: u32,
7474
/// The position of the output in the blockchain.
7575
pub chain_position: ChainPosition<ConfirmationBlockTime>,
76+
/// Whether this output exists in a transaction that is yet to be broadcasted.
77+
pub needs_broadcast: bool,
7678
}
7779

7880
/// A [`Utxo`] with its `satisfaction_weight`.

wallet/src/wallet/broadcast_queue.rs

Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
//! Unbroadcasted transaction queue.
2+
3+
use alloc::vec::Vec;
4+
use chain::tx_graph;
5+
use chain::Anchor;
6+
use chain::TxGraph;
7+
8+
use crate::collections::HashSet;
9+
use crate::collections::VecDeque;
10+
11+
use bitcoin::Txid;
12+
use chain::Merge;
13+
14+
/// An ordered unbroadcasted list.
15+
///
16+
/// It is ordered in case of RBF txs.
17+
#[derive(Debug, Clone, Default)]
18+
pub struct BroadcastQueue {
19+
queue: VecDeque<Txid>,
20+
21+
/// Enforces that we do not have duplicates in `queue`.
22+
dedup: HashSet<Txid>,
23+
}
24+
25+
/// Represents a single mutation to [`BroadcastQueue`].
26+
#[derive(Debug, Clone, PartialEq, serde::Deserialize, serde::Serialize)]
27+
pub enum Mutation {
28+
/// A push to the back of the queue.
29+
Push(Txid),
30+
///
31+
Remove(Txid),
32+
}
33+
34+
/// A list of mutations made to [`BroadcastQueue`].
35+
#[must_use]
36+
#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)]
37+
pub struct ChangeSet {
38+
/// Mutations.
39+
pub mutations: Vec<Mutation>,
40+
}
41+
42+
impl Merge for ChangeSet {
43+
fn merge(&mut self, other: Self) {
44+
self.mutations.extend(other.mutations);
45+
}
46+
47+
fn is_empty(&self) -> bool {
48+
self.mutations.is_empty()
49+
}
50+
}
51+
52+
impl BroadcastQueue {
53+
/// Construct [`Unbroadcasted`] from the given `changeset`.
54+
pub fn from_changeset(changeset: ChangeSet) -> Self {
55+
let mut out = BroadcastQueue::default();
56+
out.apply_changeset(changeset);
57+
out
58+
}
59+
60+
/// Apply the given `changeset`.
61+
pub fn apply_changeset(&mut self, changeset: ChangeSet) {
62+
for mutation in changeset.mutations {
63+
match mutation {
64+
Mutation::Push(txid) => self._push(txid),
65+
Mutation::Remove(txid) => self._remove(txid),
66+
};
67+
}
68+
}
69+
70+
/// Whether the `txid` exists in the queue.
71+
pub fn contains(&self, txid: Txid) -> bool {
72+
self.dedup.contains(&txid)
73+
}
74+
75+
/// Push a `txid` to the queue if it does not already exist.
76+
///
77+
/// # Warning
78+
///
79+
/// This does not get rid of conflicting transactions already in the queue.
80+
pub fn push(&mut self, txid: Txid) -> ChangeSet {
81+
let mut changeset = ChangeSet::default();
82+
if self._push(txid) {
83+
changeset.mutations.push(Mutation::Push(txid));
84+
}
85+
changeset
86+
}
87+
fn _push(&mut self, txid: Txid) -> bool {
88+
if self.dedup.insert(txid) {
89+
self.queue.push_back(txid);
90+
return true;
91+
}
92+
false
93+
}
94+
95+
/// Push a `txid` to the broadcast queue (if it does not exist already) and displaces all
96+
/// coflicting txids in the queue.
97+
pub fn push_and_displace_conflicts<A>(&mut self, tx_graph: &TxGraph<A>, txid: Txid) -> ChangeSet
98+
where
99+
A: Anchor,
100+
{
101+
let mut changeset = ChangeSet::default();
102+
103+
let tx = match tx_graph.get_tx(txid) {
104+
Some(tx) => tx,
105+
None => {
106+
debug_assert!(
107+
!self.dedup.contains(&txid),
108+
"Cannot have txid in queue which has no corresponding tx in graph"
109+
);
110+
return changeset;
111+
}
112+
};
113+
114+
if self._push(txid) {
115+
changeset.mutations.push(Mutation::Push(txid));
116+
117+
for txid in tx_graph.walk_conflicts(&tx, |_, conflict_txid| Some(conflict_txid)) {
118+
if self._remove(txid) {
119+
changeset.mutations.push(Mutation::Remove(txid));
120+
}
121+
}
122+
}
123+
124+
changeset
125+
}
126+
127+
/// Returns the next `txid` of the queue to broadcast which has no dependencies to other
128+
/// transactions in the queue.
129+
pub fn next_to_broadcast<A>(&self, tx_graph: &TxGraph<A>) -> Option<Txid>
130+
where
131+
A: Anchor,
132+
{
133+
self.queue.iter().copied().find(|&txid| {
134+
let tx = match tx_graph.get_tx(txid) {
135+
Some(tx) => tx,
136+
None => return false,
137+
};
138+
if tx
139+
.input
140+
.iter()
141+
.any(|txin| self.dedup.contains(&txin.previous_output.txid))
142+
{
143+
return false;
144+
}
145+
true
146+
})
147+
}
148+
149+
/// Returns unbroadcasted dependencies of the given `txid`.
150+
///
151+
/// The returned `Vec` is in broadcast order.
152+
pub fn unbroadcasted_dependencies<A>(&self, tx_graph: &TxGraph<A>, txid: Txid) -> Vec<Txid>
153+
where
154+
A: Anchor,
155+
{
156+
let tx = match tx_graph.get_tx(txid) {
157+
Some(tx) => tx,
158+
None => return Vec::new(),
159+
};
160+
let mut txs = tx_graph
161+
.walk_ancestors(tx, |_depth, ancestor_tx| {
162+
let ancestor_txid = ancestor_tx.compute_txid();
163+
if self.dedup.contains(&ancestor_txid) {
164+
Some(ancestor_txid)
165+
} else {
166+
None
167+
}
168+
})
169+
.collect::<Vec<_>>();
170+
txs.reverse();
171+
txs
172+
}
173+
174+
/// Untracks and removes a transaction from the broadcast queue.
175+
///
176+
/// Transactions are automatically removed from the queue upon successful broadcast, so calling
177+
/// this method directly is typically not required.
178+
pub fn remove(&mut self, txid: Txid) -> ChangeSet {
179+
let mut changeset = ChangeSet::default();
180+
if self._remove(txid) {
181+
changeset.mutations.push(Mutation::Remove(txid));
182+
}
183+
changeset
184+
}
185+
fn _remove(&mut self, txid: Txid) -> bool {
186+
if self.dedup.remove(&txid) {
187+
let i = (0..self.queue.len())
188+
.zip(self.queue.iter().copied())
189+
.find_map(|(i, queue_txid)| if queue_txid == txid { Some(i) } else { None })
190+
.expect("must exist in queue to exist in `queue`");
191+
let _removed = self.queue.remove(i);
192+
debug_assert_eq!(_removed, Some(txid));
193+
return true;
194+
}
195+
false
196+
}
197+
198+
/// Untracks and removes a transaction and it's descendants from the broadcast queue.
199+
pub fn remove_and_displace_dependants<A>(
200+
&mut self,
201+
tx_graph: &TxGraph<A>,
202+
txid: Txid,
203+
) -> ChangeSet
204+
where
205+
A: Anchor,
206+
{
207+
let mut changeset = ChangeSet::default();
208+
209+
if self._remove(txid) {
210+
changeset.mutations.push(Mutation::Remove(txid));
211+
for txid in tx_graph.walk_descendants(txid, |_depth, txid| Some(txid)) {
212+
if self._remove(txid) {
213+
changeset.mutations.push(Mutation::Remove(txid));
214+
}
215+
}
216+
}
217+
changeset
218+
}
219+
220+
/// Untrack transactions that are given anchors and/or mempool timestamps.
221+
pub fn filter_from_graph_changeset<A>(
222+
&mut self,
223+
graph_changeset: &tx_graph::ChangeSet<A>,
224+
) -> ChangeSet {
225+
let mut changeset = ChangeSet::default();
226+
let s_txids = graph_changeset.last_seen.iter().map(|(txid, _)| *txid);
227+
let a_txids = graph_changeset.anchors.iter().map(|(_, txid)| *txid);
228+
let e_txids = graph_changeset.last_evicted.iter().map(|(txid, _)| *txid);
229+
for txid in s_txids.chain(a_txids).chain(e_txids) {
230+
changeset.merge(self.remove(txid));
231+
}
232+
changeset
233+
}
234+
235+
/// Txids ordered by precedence.
236+
///
237+
/// Transactions with greater precedence will appear later in this list.
238+
pub fn txids(&self) -> impl ExactSizeIterator<Item = Txid> + '_ {
239+
self.queue.iter().copied()
240+
}
241+
242+
/// Initial changeset.
243+
pub fn initial_changeset(&self) -> ChangeSet {
244+
ChangeSet {
245+
mutations: self.queue.iter().copied().map(Mutation::Push).collect(),
246+
}
247+
}
248+
}

wallet/src/wallet/changeset.rs

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use bdk_chain::{
22
indexed_tx_graph, keychain_txout, local_chain, tx_graph, ConfirmationBlockTime, Merge,
33
};
4+
use bitcoin::Txid;
45
use miniscript::{Descriptor, DescriptorPublicKey};
56
use serde::{Deserialize, Serialize};
67

8+
use super::broadcast_queue;
9+
710
type IndexedTxGraphChangeSet =
811
indexed_tx_graph::ChangeSet<ConfirmationBlockTime, keychain_txout::ChangeSet>;
912

@@ -114,6 +117,9 @@ pub struct ChangeSet {
114117
pub tx_graph: tx_graph::ChangeSet<ConfirmationBlockTime>,
115118
/// Changes to [`KeychainTxOutIndex`](keychain_txout::KeychainTxOutIndex).
116119
pub indexer: keychain_txout::ChangeSet,
120+
/// Changes to [`BroadcastQueue`](broadcast_queue::BroadcastQueue).
121+
#[serde(default)]
122+
pub broadcast_queue: broadcast_queue::ChangeSet,
117123
}
118124

119125
impl Merge for ChangeSet {
@@ -145,6 +151,7 @@ impl Merge for ChangeSet {
145151
Merge::merge(&mut self.local_chain, other.local_chain);
146152
Merge::merge(&mut self.tx_graph, other.tx_graph);
147153
Merge::merge(&mut self.indexer, other.indexer);
154+
Merge::merge(&mut self.broadcast_queue, other.broadcast_queue);
148155
}
149156

150157
fn is_empty(&self) -> bool {
@@ -154,6 +161,7 @@ impl Merge for ChangeSet {
154161
&& self.local_chain.is_empty()
155162
&& self.tx_graph.is_empty()
156163
&& self.indexer.is_empty()
164+
&& self.broadcast_queue.is_empty()
157165
}
158166
}
159167

@@ -163,6 +171,8 @@ impl ChangeSet {
163171
pub const WALLET_SCHEMA_NAME: &'static str = "bdk_wallet";
164172
/// Name of table to store wallet descriptors and network.
165173
pub const WALLET_TABLE_NAME: &'static str = "bdk_wallet";
174+
/// Name of table to store broadcast queue txids.
175+
pub const WALLET_BROADCAST_QUEUE_TABLE_NAME: &'static str = "bdk_wallet_broadcast_queue";
166176

167177
/// Get v0 sqlite [ChangeSet] schema
168178
pub fn schema_v0() -> alloc::string::String {
@@ -177,12 +187,23 @@ impl ChangeSet {
177187
)
178188
}
179189

190+
/// Get v1 sqlite [`ChangeSet`] schema.
191+
pub fn schema_v1() -> alloc::string::String {
192+
format!(
193+
"CREATE TABLE {} ( \
194+
id INTEGER PRIMARY KEY AUTOINCREMENT,
195+
txid TEXT NOT NULL UNIQUE \
196+
) STRICT;",
197+
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME,
198+
)
199+
}
200+
180201
/// Initialize sqlite tables for wallet tables.
181202
pub fn init_sqlite_tables(db_tx: &chain::rusqlite::Transaction) -> chain::rusqlite::Result<()> {
182203
crate::rusqlite_impl::migrate_schema(
183204
db_tx,
184205
Self::WALLET_SCHEMA_NAME,
185-
&[&Self::schema_v0()],
206+
&[&Self::schema_v0(), &Self::schema_v1()],
186207
)?;
187208

188209
bdk_chain::local_chain::ChangeSet::init_sqlite_tables(db_tx)?;
@@ -220,6 +241,19 @@ impl ChangeSet {
220241
changeset.network = network.map(Impl::into_inner);
221242
}
222243

244+
let mut queue_statement = db_tx.prepare(&format!(
245+
"SELECT txid FROM {} ORDER BY id ASC",
246+
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
247+
))?;
248+
let row_iter = queue_statement.query_map([], |row| row.get::<_, Impl<Txid>>("txid"))?;
249+
for row in row_iter {
250+
let Impl(txid) = row?;
251+
changeset
252+
.broadcast_queue
253+
.mutations
254+
.push(broadcast_queue::Mutation::Push(txid));
255+
}
256+
223257
changeset.local_chain = local_chain::ChangeSet::from_sqlite(db_tx)?;
224258
changeset.tx_graph = tx_graph::ChangeSet::<_>::from_sqlite(db_tx)?;
225259
changeset.indexer = keychain_txout::ChangeSet::from_sqlite(db_tx)?;
@@ -268,6 +302,25 @@ impl ChangeSet {
268302
})?;
269303
}
270304

305+
let mut queue_insert_statement = db_tx.prepare_cached(&format!(
306+
"INSERT OR IGNORE INTO {}(txid) VALUES(:txid)",
307+
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
308+
))?;
309+
let mut queue_remove_statement = db_tx.prepare_cached(&format!(
310+
"DELETE FROM {} WHERE txid=:txid",
311+
Self::WALLET_BROADCAST_QUEUE_TABLE_NAME
312+
))?;
313+
for mutation in &self.broadcast_queue.mutations {
314+
match mutation {
315+
broadcast_queue::Mutation::Push(txid) => {
316+
queue_insert_statement.execute(named_params! { ":txid": Impl(*txid) })?;
317+
}
318+
broadcast_queue::Mutation::Remove(txid) => {
319+
queue_remove_statement.execute(named_params! { ":txid": Impl(*txid) })?;
320+
}
321+
}
322+
}
323+
271324
self.local_chain.persist_to_sqlite(db_tx)?;
272325
self.tx_graph.persist_to_sqlite(db_tx)?;
273326
self.indexer.persist_to_sqlite(db_tx)?;
@@ -311,3 +364,12 @@ impl From<keychain_txout::ChangeSet> for ChangeSet {
311364
}
312365
}
313366
}
367+
368+
impl From<broadcast_queue::ChangeSet> for ChangeSet {
369+
fn from(broadcast_queue: broadcast_queue::ChangeSet) -> Self {
370+
Self {
371+
broadcast_queue,
372+
..Default::default()
373+
}
374+
}
375+
}

0 commit comments

Comments
 (0)