Skip to content

Commit 85d0eb7

Browse files
committed
feat(wallet): WIP: BroadcastQueue
1 parent ce70bb7 commit 85d0eb7

File tree

2 files changed

+198
-0
lines changed

2 files changed

+198
-0
lines changed

wallet/src/wallet/broadcast_queue.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
//! Unbroadcasted transaction queue.
2+
3+
use std::vec::Vec;
4+
5+
use crate::collections::BTreeMap;
6+
use crate::collections::HashMap;
7+
use crate::collections::VecDeque;
8+
9+
use bitcoin::Txid;
10+
use chain::Merge;
11+
use chain::TxUpdate;
12+
13+
/// An ordered unbroadcasted list.
14+
///
15+
/// It is ordered in case of RBF txs.
16+
#[derive(Debug, Clone, Default)]
17+
pub struct BroadcastQueue {
18+
queue: VecDeque<Txid>,
19+
20+
/// Enforces that we only change the tx entry if the change has a larger `seq`.
21+
latest_seq: HashMap<Txid, (u64, bool)>,
22+
next_seq: u64,
23+
}
24+
25+
/// Represents changes made to [`BroadcastQueue`].
26+
#[must_use]
27+
#[derive(Debug, Clone, Default, PartialEq, serde::Deserialize, serde::Serialize)]
28+
pub struct ChangeSet {
29+
pub insert_tx: BTreeMap<Txid, u64>,
30+
pub remove_tx: BTreeMap<Txid, u64>,
31+
}
32+
33+
impl Merge for ChangeSet {
34+
fn merge(&mut self, other: Self) {
35+
self.insert_tx.extend(
36+
other
37+
.insert_tx
38+
.into_iter()
39+
// Only replace `seq`uences which are larger.
40+
.filter(|(txid, other_seq)| match self.insert_tx.get(txid) {
41+
Some(seq) => seq < other_seq,
42+
None => true,
43+
})
44+
.collect::<Vec<_>>(),
45+
);
46+
self.remove_tx.extend(
47+
other
48+
.remove_tx
49+
.into_iter()
50+
.filter(|(txid, other_seq)| match self.remove_tx.get(txid) {
51+
Some(seq) => seq < other_seq,
52+
None => true,
53+
})
54+
.collect::<Vec<_>>(),
55+
);
56+
}
57+
58+
fn is_empty(&self) -> bool {
59+
self.insert_tx.is_empty() && self.remove_tx.is_empty()
60+
}
61+
}
62+
63+
impl BroadcastQueue {
64+
/// Construct [`Unbroadcasted`] from the given `changeset`.
65+
pub fn from_changeset(changeset: ChangeSet) -> Self {
66+
let mut out = BroadcastQueue::default();
67+
out.apply_changeset(changeset);
68+
out
69+
}
70+
71+
/// Apply the given `changeset`.
72+
pub fn apply_changeset(&mut self, changeset: ChangeSet) {
73+
for (txid, seq) in changeset.insert_tx {
74+
self._push(txid, seq);
75+
}
76+
for (txid, seq) in changeset.remove_tx {
77+
self._remove(txid, seq);
78+
}
79+
self._sort_queue();
80+
}
81+
82+
fn _sort_queue(&mut self) {
83+
self.queue
84+
.make_contiguous()
85+
.sort_unstable_by_key(|txid| self.latest_seq.get(txid));
86+
}
87+
88+
fn _push(&mut self, txid: Txid, seq: u64) -> bool {
89+
let modified = &mut false;
90+
self.latest_seq
91+
.entry(txid)
92+
.and_modify(|(v, in_queue)| {
93+
if *v < seq {
94+
*v = seq;
95+
*modified = true;
96+
if !*in_queue {
97+
*in_queue = true;
98+
self.queue.push_back(txid);
99+
}
100+
}
101+
})
102+
.or_insert_with(|| {
103+
*modified = true;
104+
self.queue.push_back(txid);
105+
(seq, true)
106+
});
107+
self.next_seq = self.next_seq.max(seq.saturating_add(1));
108+
*modified
109+
}
110+
111+
fn _contains(latest_seq: &mut HashMap<Txid, (u64, bool)>, txid: Txid) -> bool {
112+
match latest_seq.get(&txid) {
113+
Some(&(_, contains)) => contains,
114+
None => false,
115+
}
116+
}
117+
118+
fn _remove(&mut self, txid: Txid, seq: u64) -> bool {
119+
let modified = &mut false;
120+
self.latest_seq
121+
.entry(txid)
122+
.and_modify(|(v, in_queue)| {
123+
if *v <= seq {
124+
*v = seq;
125+
*modified = true;
126+
if *in_queue {
127+
*in_queue = false;
128+
// Most of the time, we are popping from the front of the queue.
129+
let index = self
130+
.queue
131+
.iter()
132+
.enumerate()
133+
.find_map(|(i, &q_txid)| if q_txid == txid { Some(i) } else { None })
134+
.expect("must be in queue");
135+
self.queue.swap_remove_front(index);
136+
}
137+
}
138+
})
139+
.or_insert_with(|| {
140+
*modified = true;
141+
(seq, false)
142+
});
143+
self.next_seq = self.next_seq.max(seq.saturating_add(1));
144+
*modified
145+
}
146+
147+
/// Reinserting will bump the tx's seq to `next_seq`.
148+
pub fn push(&mut self, txid: Txid) -> ChangeSet {
149+
let seq = self.next_seq;
150+
151+
let mut changeset = ChangeSet::default();
152+
if self._push(txid, seq) {
153+
changeset.insert_tx.insert(txid, seq);
154+
self.next_seq += 1;
155+
}
156+
changeset
157+
}
158+
159+
/// Returns the next `txid` of the transaction to broadcast.
160+
pub fn next(&self) -> Option<Txid> {
161+
// TODO: Implement.
162+
// NOTE: Prioritize txs that have no dependencies.
163+
None
164+
}
165+
166+
/// Untrack the `txid`.
167+
pub fn remove(&mut self, txid: Txid) -> ChangeSet {
168+
let seq = self.next_seq;
169+
170+
let mut changeset = ChangeSet::default();
171+
if self._remove(txid, seq) {
172+
changeset.remove_tx.insert(txid, seq);
173+
self.next_seq += 1;
174+
}
175+
changeset
176+
}
177+
178+
/// Untrack transactions that are given anchors and seen-at timestamps.
179+
pub fn update<A>(&mut self, tx_update: &TxUpdate<A>) -> ChangeSet {
180+
let mut changeset = ChangeSet::default();
181+
for (_, txid) in &tx_update.anchors {
182+
changeset.merge(self.remove(*txid));
183+
}
184+
for (txid, _) in &tx_update.seen_ats {
185+
changeset.merge(self.remove(*txid));
186+
}
187+
changeset
188+
}
189+
190+
/// Txids ordered by precedence.
191+
///
192+
/// Transactions with greater precedence will appear later in this list.
193+
pub fn txids(&self) -> impl ExactSizeIterator<Item = Txid> + '_ {
194+
self.queue.iter().copied()
195+
}
196+
}

wallet/src/wallet/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ mod persisted;
5858
pub mod signer;
5959
pub mod tx_builder;
6060
pub(crate) mod utils;
61+
mod broadcast_queue;
6162

6263
use crate::collections::{BTreeMap, HashMap, HashSet};
6364
use crate::descriptor::{
@@ -82,6 +83,7 @@ pub use params::*;
8283
pub use persisted::*;
8384
pub use utils::IsDust;
8485
pub use utils::TxDetails;
86+
pub use broadcast_queue::*;
8587

8688
/// A Bitcoin wallet
8789
///

0 commit comments

Comments
 (0)