Skip to content

Commit a4fbb83

Browse files
committed
feat!: Introduce BroadcastQueue
1 parent 85d0eb7 commit a4fbb83

File tree

6 files changed

+455
-156
lines changed

6 files changed

+455
-156
lines changed

wallet/src/types.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ pub struct LocalOutput {
7373
pub derivation_index: u32,
7474
/// The position of the output in the blockchain.
7575
pub chain_position: ChainPosition<ConfirmationBlockTime>,
76+
/// Whether this output exists in a transaction that is yet to be broadcasted.
77+
pub needs_broadcast: bool,
7678
}
7779

7880
/// A [`Utxo`] with its `satisfaction_weight`.

wallet/src/wallet/broadcast_queue.rs

Lines changed: 175 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
//! Unbroadcasted transaction queue.
22
3-
use std::vec::Vec;
3+
use alloc::vec::Vec;
4+
use chain::tx_graph;
5+
use chain::Anchor;
6+
use chain::TxGraph;
47

5-
use crate::collections::BTreeMap;
6-
use crate::collections::HashMap;
8+
use crate::collections::HashSet;
79
use crate::collections::VecDeque;
810

911
use bitcoin::Txid;
1012
use chain::Merge;
11-
use chain::TxUpdate;
1213

1314
/// An ordered unbroadcasted list.
1415
///
@@ -17,46 +18,34 @@ use chain::TxUpdate;
1718
pub struct BroadcastQueue {
1819
queue: VecDeque<Txid>,
1920

20-
/// Enforces that we only change the tx entry if the change has a larger `seq`.
21-
latest_seq: HashMap<Txid, (u64, bool)>,
22-
next_seq: u64,
21+
/// Enforces that we do not have duplicates in `queue`.
22+
dedup: HashSet<Txid>,
2323
}
2424

25-
/// Represents changes made to [`BroadcastQueue`].
25+
/// Represents a single mutation to [`BroadcastQueue`].
26+
#[derive(Debug, Clone, PartialEq, serde::Deserialize, serde::Serialize)]
27+
pub enum Mutation {
28+
/// A push to the back of the queue.
29+
Push(Txid),
30+
///
31+
Remove(Txid),
32+
}
33+
34+
/// A list of mutations made to [`BroadcastQueue`].
2635
#[must_use]
2736
#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)]
2837
pub struct ChangeSet {
29-
pub insert_tx: BTreeMap<Txid, u64>,
30-
pub remove_tx: BTreeMap<Txid, u64>,
38+
/// Mutations.
39+
pub mutations: Vec<Mutation>,
3140
}
3241

3342
impl Merge for ChangeSet {
3443
fn merge(&mut self, other: Self) {
35-
self.insert_tx.extend(
36-
other
37-
.insert_tx
38-
.into_iter()
39-
// Only replace `seq`uences which are larger.
40-
.filter(|(txid, other_seq)| match self.insert_tx.get(txid) {
41-
Some(seq) => seq < other_seq,
42-
None => true,
43-
})
44-
.collect::<Vec<_>>(),
45-
);
46-
self.remove_tx.extend(
47-
other
48-
.remove_tx
49-
.into_iter()
50-
.filter(|(txid, other_seq)| match self.remove_tx.get(txid) {
51-
Some(seq) => seq < other_seq,
52-
None => true,
53-
})
54-
.collect::<Vec<_>>(),
55-
);
44+
self.mutations.extend(other.mutations);
5645
}
5746

5847
fn is_empty(&self) -> bool {
59-
self.insert_tx.is_empty() && self.remove_tx.is_empty()
48+
self.mutations.is_empty()
6049
}
6150
}
6251

@@ -70,119 +59,175 @@ impl BroadcastQueue {
7059

7160
/// Apply the given `changeset`.
7261
pub fn apply_changeset(&mut self, changeset: ChangeSet) {
73-
for (txid, seq) in changeset.insert_tx {
74-
self._push(txid, seq);
75-
}
76-
for (txid, seq) in changeset.remove_tx {
77-
self._remove(txid, seq);
78-
}
79-
self._sort_queue();
80-
}
81-
82-
fn _sort_queue(&mut self) {
83-
self.queue
84-
.make_contiguous()
85-
.sort_unstable_by_key(|txid| self.latest_seq.get(txid));
86-
}
87-
88-
fn _push(&mut self, txid: Txid, seq: u64) -> bool {
89-
let modified = &mut false;
90-
self.latest_seq
91-
.entry(txid)
92-
.and_modify(|(v, in_queue)| {
93-
if *v < seq {
94-
*v = seq;
95-
*modified = true;
96-
if !*in_queue {
97-
*in_queue = true;
98-
self.queue.push_back(txid);
99-
}
100-
}
101-
})
102-
.or_insert_with(|| {
103-
*modified = true;
104-
self.queue.push_back(txid);
105-
(seq, true)
106-
});
107-
self.next_seq = self.next_seq.max(seq.saturating_add(1));
108-
*modified
109-
}
110-
111-
fn _contains(latest_seq: &mut HashMap<Txid, (u64, bool)>, txid: Txid) -> bool {
112-
match latest_seq.get(&txid) {
113-
Some(&(_, contains)) => contains,
114-
None => false,
62+
for mutation in changeset.mutations {
63+
match mutation {
64+
Mutation::Push(txid) => self._push(txid),
65+
Mutation::Remove(txid) => self._remove(txid),
66+
};
11567
}
11668
}
11769

118-
fn _remove(&mut self, txid: Txid, seq: u64) -> bool {
119-
let modified = &mut false;
120-
self.latest_seq
121-
.entry(txid)
122-
.and_modify(|(v, in_queue)| {
123-
if *v <= seq {
124-
*v = seq;
125-
*modified = true;
126-
if *in_queue {
127-
*in_queue = false;
128-
// Most of the time, we are popping from the front of the queue.
129-
let index = self
130-
.queue
131-
.iter()
132-
.enumerate()
133-
.find_map(|(i, &q_txid)| if q_txid == txid { Some(i) } else { None })
134-
.expect("must be in queue");
135-
self.queue.swap_remove_front(index);
136-
}
137-
}
138-
})
139-
.or_insert_with(|| {
140-
*modified = true;
141-
(seq, false)
142-
});
143-
self.next_seq = self.next_seq.max(seq.saturating_add(1));
144-
*modified
70+
/// Whether the `txid` exists in the queue.
71+
pub fn contains(&self, txid: Txid) -> bool {
72+
self.dedup.contains(&txid)
14573
}
14674

147-
/// Reinserting will bump the tx's seq to `next_seq`.
75+
/// Push a `txid` to the queue if it does not already exist.
76+
///
77+
/// # Warning
78+
///
79+
/// This does not get rid of conflicting transactions already in the queue.
14880
pub fn push(&mut self, txid: Txid) -> ChangeSet {
149-
let seq = self.next_seq;
81+
let mut changeset = ChangeSet::default();
82+
if self._push(txid) {
83+
changeset.mutations.push(Mutation::Push(txid));
84+
}
85+
changeset
86+
}
87+
fn _push(&mut self, txid: Txid) -> bool {
88+
if self.dedup.insert(txid) {
89+
self.queue.push_back(txid);
90+
return true;
91+
}
92+
false
93+
}
15094

95+
/// Push a `txid` to the broadcast queue (if it does not exist already) and displaces all
96+
/// coflicting txids in the queue.
97+
pub fn push_and_displace_conflicts<A>(&mut self, tx_graph: &TxGraph<A>, txid: Txid) -> ChangeSet
98+
where
99+
A: Anchor,
100+
{
151101
let mut changeset = ChangeSet::default();
152-
if self._push(txid, seq) {
153-
changeset.insert_tx.insert(txid, seq);
154-
self.next_seq += 1;
102+
103+
let tx = match tx_graph.get_tx(txid) {
104+
Some(tx) => tx,
105+
None => {
106+
debug_assert!(
107+
!self.dedup.contains(&txid),
108+
"Cannot have txid in queue which has no corresponding tx in graph"
109+
);
110+
return changeset;
111+
}
112+
};
113+
114+
if self._push(txid) {
115+
changeset.mutations.push(Mutation::Push(txid));
116+
117+
for txid in tx_graph.walk_conflicts(&tx, |_, conflict_txid| Some(conflict_txid)) {
118+
if self._remove(txid) {
119+
changeset.mutations.push(Mutation::Remove(txid));
120+
}
121+
}
155122
}
123+
156124
changeset
157125
}
158126

159-
/// Returns the next `txid` of the transaction to broadcast.
160-
pub fn next(&self) -> Option<Txid> {
161-
// TODO: Implement.
162-
// NOTE: Prioritize txs that have no dependencies.
163-
None
127+
/// Returns the next `txid` of the queue to broadcast which has no dependencies to other
128+
/// transactions in the queue.
129+
pub fn next_to_broadcast<A>(&self, tx_graph: &TxGraph<A>) -> Option<Txid>
130+
where
131+
A: Anchor,
132+
{
133+
self.queue.iter().copied().find(|&txid| {
134+
let tx = match tx_graph.get_tx(txid) {
135+
Some(tx) => tx,
136+
None => return false,
137+
};
138+
if tx
139+
.input
140+
.iter()
141+
.any(|txin| self.dedup.contains(&txin.previous_output.txid))
142+
{
143+
return false;
144+
}
145+
true
146+
})
164147
}
165148

166-
/// Untrack the `txid`.
167-
pub fn remove(&mut self, txid: Txid) -> ChangeSet {
168-
let seq = self.next_seq;
149+
/// Returns unbroadcasted dependencies of the given `txid`.
150+
///
151+
/// The returned `Vec` is in broadcast order.
152+
pub fn unbroadcasted_dependencies<A>(&self, tx_graph: &TxGraph<A>, txid: Txid) -> Vec<Txid>
153+
where
154+
A: Anchor,
155+
{
156+
let tx = match tx_graph.get_tx(txid) {
157+
Some(tx) => tx,
158+
None => return Vec::new(),
159+
};
160+
let mut txs = tx_graph
161+
.walk_ancestors(tx, |_depth, ancestor_tx| {
162+
let ancestor_txid = ancestor_tx.compute_txid();
163+
if self.dedup.contains(&ancestor_txid) {
164+
Some(ancestor_txid)
165+
} else {
166+
None
167+
}
168+
})
169+
.collect::<Vec<_>>();
170+
txs.reverse();
171+
txs
172+
}
169173

174+
/// Untracks and removes a transaction from the broadcast queue.
175+
///
176+
/// Transactions are automatically removed from the queue upon successful broadcast, so calling
177+
/// this method directly is typically not required.
178+
pub fn remove(&mut self, txid: Txid) -> ChangeSet {
170179
let mut changeset = ChangeSet::default();
171-
if self._remove(txid, seq) {
172-
changeset.remove_tx.insert(txid, seq);
173-
self.next_seq += 1;
180+
if self._remove(txid) {
181+
changeset.mutations.push(Mutation::Remove(txid));
174182
}
175183
changeset
176184
}
185+
fn _remove(&mut self, txid: Txid) -> bool {
186+
if self.dedup.remove(&txid) {
187+
let i = (0..self.queue.len())
188+
.zip(self.queue.iter().copied())
189+
.find_map(|(i, queue_txid)| if queue_txid == txid { Some(i) } else { None })
190+
.expect("must exist in queue to exist in `queue`");
191+
let _removed = self.queue.remove(i);
192+
debug_assert_eq!(_removed, Some(txid));
193+
return true;
194+
}
195+
false
196+
}
177197

178-
/// Untrack transactions that are given anchors and seen-at timestamps.
179-
pub fn update<A>(&mut self, tx_update: &TxUpdate<A>) -> ChangeSet {
198+
/// Untracks and removes a transaction and it's descendants from the broadcast queue.
199+
pub fn remove_and_displace_dependants<A>(
200+
&mut self,
201+
tx_graph: &TxGraph<A>,
202+
txid: Txid,
203+
) -> ChangeSet
204+
where
205+
A: Anchor,
206+
{
180207
let mut changeset = ChangeSet::default();
181-
for (_, txid) in &tx_update.anchors {
182-
changeset.merge(self.remove(*txid));
208+
209+
if self._remove(txid) {
210+
changeset.mutations.push(Mutation::Remove(txid));
211+
for txid in tx_graph.walk_descendants(txid, |_depth, txid| Some(txid)) {
212+
if self._remove(txid) {
213+
changeset.mutations.push(Mutation::Remove(txid));
214+
}
215+
}
183216
}
184-
for (txid, _) in &tx_update.seen_ats {
185-
changeset.merge(self.remove(*txid));
217+
changeset
218+
}
219+
220+
/// Untrack transactions that are given anchors and/or mempool timestamps.
221+
pub fn filter_from_graph_changeset<A>(
222+
&mut self,
223+
graph_changeset: &tx_graph::ChangeSet<A>,
224+
) -> ChangeSet {
225+
let mut changeset = ChangeSet::default();
226+
let s_txids = graph_changeset.last_seen.iter().map(|(txid, _)| *txid);
227+
let a_txids = graph_changeset.anchors.iter().map(|(_, txid)| *txid);
228+
let e_txids = graph_changeset.last_evicted.iter().map(|(txid, _)| *txid);
229+
for txid in s_txids.chain(a_txids).chain(e_txids) {
230+
changeset.merge(self.remove(txid));
186231
}
187232
changeset
188233
}
@@ -193,4 +238,11 @@ impl BroadcastQueue {
193238
pub fn txids(&self) -> impl ExactSizeIterator<Item = Txid> + '_ {
194239
self.queue.iter().copied()
195240
}
241+
242+
/// Initial changeset.
243+
pub fn initial_changeset(&self) -> ChangeSet {
244+
ChangeSet {
245+
mutations: self.queue.iter().copied().map(Mutation::Push).collect(),
246+
}
247+
}
196248
}

0 commit comments

Comments
 (0)