Skip to content

Commit d2cc0c3

Browse files
committed
Implement Confirm/Listen interfaces
1 parent 9b46601 commit d2cc0c3

File tree

1 file changed

+226
-6
lines changed

1 file changed

+226
-6
lines changed

src/sweep.rs

Lines changed: 226 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,23 @@ use crate::wallet::Wallet;
88
use crate::{Error, KeysManager};
99

1010
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
11-
use lightning::chain::BestBlock;
11+
use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
1212
use lightning::impl_writeable_tlv_based;
1313
use lightning::sign::{EntropySource, SpendableOutputDescriptor};
1414
use lightning::util::persist::KVStore;
1515
use lightning::util::ser::Writeable;
1616

1717
use bitcoin::secp256k1::Secp256k1;
18-
use bitcoin::{BlockHash, LockTime, PackedLockTime, Transaction};
18+
use bitcoin::{BlockHash, BlockHeader, LockTime, PackedLockTime, Transaction, Txid};
1919

20+
use std::collections::HashSet;
2021
use std::ops::Deref;
2122
use std::sync::{Arc, Mutex};
2223

24+
const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6;
25+
26+
const REGENERATE_SPEND_THRESHOLD: u32 = 144;
27+
2328
#[derive(Clone, Debug, PartialEq, Eq)]
2429
pub(crate) struct SpendableOutputInfo {
2530
id: [u8; 32],
@@ -37,29 +42,43 @@ impl_writeable_tlv_based!(SpendableOutputInfo, {
3742
(8, confirmed_in_block, option),
3843
});
3944

40-
pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, L: Deref>
45+
pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, F: Deref, L: Deref>
4146
where
47+
F::Target: Filter,
4248
L::Target: Logger,
4349
{
4450
outputs: Mutex<Vec<SpendableOutputInfo>>,
4551
wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
4652
keys_manager: Arc<KeysManager>,
4753
kv_store: Arc<K>,
4854
best_block: Mutex<BestBlock>,
55+
chain_source: Option<F>,
4956
logger: L,
5057
}
5158

52-
impl<K: KVStore + Sync + Send, L: Deref> OutputSweeper<K, L>
59+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> OutputSweeper<K, F, L>
5360
where
61+
F::Target: Filter,
5462
L::Target: Logger,
5563
{
5664
pub(crate) fn new(
5765
outputs: Vec<SpendableOutputInfo>, wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
58-
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock, logger: L,
66+
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock,
67+
chain_source: Option<F>, logger: L,
5968
) -> Self {
69+
if let Some(filter) = chain_source.as_ref() {
70+
for o in &outputs {
71+
if let Some(tx) = o.spending_tx.as_ref() {
72+
if let Some(tx_out) = tx.output.first() {
73+
filter.register_tx(&tx.txid(), &tx_out.script_pubkey);
74+
}
75+
}
76+
}
77+
}
78+
6079
let outputs = Mutex::new(outputs);
6180
let best_block = Mutex::new(best_block);
62-
Self { outputs, wallet, keys_manager, kv_store, best_block, logger }
81+
Self { outputs, wallet, keys_manager, kv_store, best_block, chain_source, logger }
6382
}
6483

6584
pub(crate) fn add_outputs(&self, mut output_descriptors: Vec<SpendableOutputDescriptor>) {
@@ -76,6 +95,11 @@ where
7695
match self.get_spending_tx(&non_static_outputs, cur_height) {
7796
Ok(spending_tx) => {
7897
self.wallet.broadcast_transactions(&[&spending_tx]);
98+
if let Some(filter) = self.chain_source.as_ref() {
99+
if let Some(tx_out) = spending_tx.output.first() {
100+
filter.register_tx(&spending_tx.txid(), &tx_out.script_pubkey);
101+
}
102+
}
79103
(Some(spending_tx), Some(cur_height))
80104
}
81105
Err(e) => {
@@ -150,3 +174,199 @@ where
150174
})
151175
}
152176
}
177+
178+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Listen for OutputSweeper<K, F, L>
179+
where
180+
F::Target: Filter,
181+
L::Target: Logger,
182+
{
183+
fn filtered_block_connected(
184+
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
185+
) {
186+
{
187+
let best_block = self.best_block.lock().unwrap();
188+
assert_eq!(best_block.block_hash(), header.prev_blockhash,
189+
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
190+
assert_eq!(best_block.height(), height - 1,
191+
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
192+
}
193+
194+
self.transactions_confirmed(header, txdata, height);
195+
self.best_block_updated(header, height);
196+
}
197+
198+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
199+
let new_height = height - 1;
200+
{
201+
let mut best_block = self.best_block.lock().unwrap();
202+
assert_eq!(best_block.block_hash(), header.block_hash(),
203+
"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
204+
assert_eq!(best_block.height(), height,
205+
"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
206+
*best_block = BestBlock::new(header.prev_blockhash, new_height)
207+
}
208+
209+
let mut locked_outputs = self.outputs.lock().unwrap();
210+
for output_info in locked_outputs.iter_mut() {
211+
if output_info.confirmed_in_block == Some((height, header.block_hash())) {
212+
output_info.confirmed_in_block = None;
213+
match self.persist_info(output_info) {
214+
Ok(()) => {}
215+
Err(e) => {
216+
log_error!(self.logger, "Error persisting spendable output info: {:?}", e)
217+
}
218+
}
219+
}
220+
}
221+
}
222+
}
223+
224+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Confirm for OutputSweeper<K, F, L>
225+
where
226+
F::Target: Filter,
227+
L::Target: Logger,
228+
{
229+
fn transactions_confirmed(
230+
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
231+
) {
232+
let mut locked_outputs = self.outputs.lock().unwrap();
233+
for (_, tx) in txdata {
234+
locked_outputs
235+
.iter_mut()
236+
.filter(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(tx.txid()))
237+
.for_each(|o| {
238+
o.confirmed_in_block = Some((height, header.block_hash()));
239+
match self.persist_info(o) {
240+
Ok(()) => {}
241+
Err(e) => {
242+
log_error!(
243+
self.logger,
244+
"Error persisting spendable output info: {:?}",
245+
e
246+
)
247+
}
248+
}
249+
});
250+
}
251+
}
252+
253+
fn transaction_unconfirmed(&self, txid: &Txid) {
254+
let mut locked_outputs = self.outputs.lock().unwrap();
255+
256+
// Get what height was unconfirmed.
257+
let unconf_height = locked_outputs
258+
.iter()
259+
.find(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(*txid))
260+
.and_then(|o| o.confirmed_in_block)
261+
.map(|t| t.0);
262+
263+
// Unconfirm all >= this height.
264+
locked_outputs
265+
.iter_mut()
266+
.filter(|o| o.confirmed_in_block.map(|t| t.0) >= unconf_height)
267+
.for_each(|o| {
268+
o.confirmed_in_block = None;
269+
match self.persist_info(o) {
270+
Ok(()) => {}
271+
Err(e) => {
272+
log_error!(self.logger, "Error persisting spendable output info: {:?}", e)
273+
}
274+
}
275+
});
276+
}
277+
278+
fn best_block_updated(&self, header: &BlockHeader, height: u32) {
279+
*self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height);
280+
281+
let mut locked_outputs = self.outputs.lock().unwrap();
282+
283+
// Regenerate spending tx and fee bump all outputs that didn't get confirmed by now.
284+
for output_info in locked_outputs.iter_mut().filter(|o| o.confirmed_in_block.is_none()) {
285+
let bcast_height = output_info.broadcast_height.unwrap_or(0);
286+
if height >= bcast_height + REGENERATE_SPEND_THRESHOLD {
287+
let output_descriptors = vec![output_info.descriptor.clone()];
288+
match self.get_spending_tx(&output_descriptors, height) {
289+
Ok(Some(spending_tx)) => {
290+
if let Some(filter) = self.chain_source.as_ref() {
291+
if let Some(tx_out) = spending_tx.output.first() {
292+
filter.register_tx(&spending_tx.txid(), &tx_out.script_pubkey);
293+
}
294+
}
295+
output_info.spending_tx = Some(spending_tx);
296+
output_info.broadcast_height = Some(height);
297+
match self.persist_info(output_info) {
298+
Ok(()) => {}
299+
Err(e) => {
300+
log_error!(
301+
self.logger,
302+
"Error persisting spendable output info: {:?}",
303+
e
304+
)
305+
}
306+
}
307+
}
308+
Ok(None) => {
309+
log_debug!(
310+
self.logger,
311+
"Omitted spending static outputs: {:?}",
312+
output_descriptors
313+
);
314+
}
315+
Err(err) => {
316+
log_error!(self.logger, "Error spending outputs: {:?}", err);
317+
}
318+
};
319+
}
320+
}
321+
322+
// Prune all outputs that have sufficient depth by now.
323+
locked_outputs.retain(|o| {
324+
if let Some((conf_height, _)) = o.confirmed_in_block {
325+
if height >= conf_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 {
326+
let key = hex_utils::to_string(&o.id);
327+
match self.kv_store.remove(
328+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
329+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
330+
&key,
331+
false,
332+
) {
333+
Ok(_) => return false,
334+
Err(e) => {
335+
log_error!(
336+
self.logger,
337+
"Removal of key {}/{}/{} failed due to: {}",
338+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
339+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
340+
key,
341+
e
342+
);
343+
return true;
344+
}
345+
}
346+
}
347+
}
348+
true
349+
});
350+
351+
// Rebroadcast all pending spending txs
352+
let mut txs = locked_outputs
353+
.iter()
354+
.filter_map(|o| o.spending_tx.as_ref())
355+
.collect::<HashSet<&Transaction>>();
356+
self.wallet.broadcast_transactions(&txs.drain().collect::<Vec<_>>());
357+
}
358+
359+
fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
360+
let locked_outputs = self.outputs.lock().unwrap();
361+
locked_outputs
362+
.iter()
363+
.filter_map(|o| {
364+
if let Some(tx) = o.spending_tx.as_ref() {
365+
Some((tx.txid(), o.confirmed_in_block.map(|c| c.1)))
366+
} else {
367+
None
368+
}
369+
})
370+
.collect::<Vec<_>>()
371+
}
372+
}

0 commit comments

Comments
 (0)