Skip to content

Commit 27c7e72

Browse files
committed
Implement Confirm/Listen interfaces
1 parent 0118cbc commit 27c7e72

File tree

1 file changed

+211
-7
lines changed

1 file changed

+211
-7
lines changed

src/sweep.rs

Lines changed: 211 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
use crate::hex_utils;
22
use crate::io::{KVStore, SPENDABLE_OUTPUT_INFO_PERSISTENCE_NAMESPACE};
33
use crate::logger::{log_debug, log_error, Logger};
4-
use crate::wallet::Wallet;
4+
use crate::wallet::{num_blocks_from_conf_target, Wallet};
55
use crate::{Error, KeysManager};
66

77
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
8-
use lightning::chain::BestBlock;
8+
use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
99
use lightning::impl_writeable_tlv_based;
1010
use lightning::sign::{EntropySource, SpendableOutputDescriptor};
1111
use lightning::util::ser::Writeable;
1212

1313
use bitcoin::secp256k1::Secp256k1;
14-
use bitcoin::{BlockHash, LockTime, PackedLockTime, Transaction};
14+
use bitcoin::{BlockHash, BlockHeader, LockTime, PackedLockTime, Script, Transaction, Txid};
1515

1616
use std::ops::Deref;
1717
use std::sync::{Arc, Mutex};
1818

19+
const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6;
20+
1921
#[derive(Clone, Debug, PartialEq, Eq)]
2022
pub(crate) struct SpendableOutputInfo {
2123
id: [u8; 32],
@@ -33,29 +35,42 @@ impl_writeable_tlv_based!(SpendableOutputInfo, {
3335
(8, confirmed_in_block, option),
3436
});
3537

36-
pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, L: Deref>
38+
pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, F: Deref, L: Deref>
3739
where
40+
F::Target: Filter,
3841
L::Target: Logger,
3942
{
4043
outputs: Mutex<Vec<SpendableOutputInfo>>,
4144
wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
4245
keys_manager: Arc<KeysManager>,
4346
kv_store: Arc<K>,
4447
best_block: Mutex<BestBlock>,
48+
chain_source: Option<F>,
4549
logger: L,
4650
}
4751

48-
impl<K: KVStore + Sync + Send, L: Deref> OutputSweeper<K, L>
52+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> OutputSweeper<K, F, L>
4953
where
54+
F::Target: Filter,
5055
L::Target: Logger,
5156
{
5257
pub(crate) fn new(
5358
outputs: Vec<SpendableOutputInfo>, wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
54-
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock, logger: L,
59+
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock,
60+
chain_source: Option<F>, logger: L,
5561
) -> Self {
62+
if let Some(filter) = chain_source.as_ref() {
63+
for o in &outputs {
64+
if let Some(tx) = o.spending_tx.as_ref() {
65+
// TODO: can we give something better than the empty script here?
66+
filter.register_tx(&tx.txid(), &Script::new())
67+
}
68+
}
69+
}
70+
5671
let outputs = Mutex::new(outputs);
5772
let best_block = Mutex::new(best_block);
58-
Self { outputs, wallet, keys_manager, kv_store, best_block, logger }
73+
Self { outputs, wallet, keys_manager, kv_store, best_block, chain_source, logger }
5974
}
6075

6176
pub(crate) fn add_outputs(&self, output_descriptors: Vec<SpendableOutputDescriptor>) {
@@ -64,6 +79,9 @@ where
6479
let (spending_tx, broadcast_height) = match self.get_spending_tx(&output_descriptors) {
6580
Ok(Some(spending_tx)) => {
6681
self.wallet.broadcast_transactions(&[&spending_tx]);
82+
if let Some(filter) = self.chain_source.as_ref() {
83+
filter.register_tx(&spending_tx.txid(), &Script::new())
84+
}
6785
(Some(spending_tx), Some(self.best_block.lock().unwrap().height()))
6886
}
6987
Ok(None) => {
@@ -138,3 +156,189 @@ where
138156
})
139157
}
140158
}
159+
160+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Listen for OutputSweeper<K, F, L>
161+
where
162+
F::Target: Filter,
163+
L::Target: Logger,
164+
{
165+
fn filtered_block_connected(
166+
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
167+
) {
168+
{
169+
let best_block = self.best_block.lock().unwrap();
170+
assert_eq!(best_block.block_hash(), header.prev_blockhash,
171+
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
172+
assert_eq!(best_block.height(), height - 1,
173+
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
174+
}
175+
176+
self.transactions_confirmed(header, txdata, height);
177+
self.best_block_updated(header, height);
178+
}
179+
180+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
181+
let new_height = height - 1;
182+
{
183+
let mut best_block = self.best_block.lock().unwrap();
184+
assert_eq!(best_block.block_hash(), header.block_hash(),
185+
"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
186+
assert_eq!(best_block.height(), height,
187+
"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
188+
*best_block = BestBlock::new(header.prev_blockhash, new_height)
189+
}
190+
191+
let mut locked_outputs = self.outputs.lock().unwrap();
192+
for output_info in locked_outputs.iter_mut() {
193+
if output_info.confirmed_in_block == Some((height, header.block_hash())) {
194+
output_info.confirmed_in_block = None;
195+
match self.persist_info(output_info) {
196+
Ok(()) => {}
197+
Err(e) => {
198+
log_error!(self.logger, "Error persisting spendable output info: {:?}", e)
199+
}
200+
}
201+
}
202+
}
203+
}
204+
}
205+
206+
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Confirm for OutputSweeper<K, F, L>
207+
where
208+
F::Target: Filter,
209+
L::Target: Logger,
210+
{
211+
fn transactions_confirmed(
212+
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
213+
) {
214+
let mut locked_outputs = self.outputs.lock().unwrap();
215+
for (_, tx) in txdata {
216+
locked_outputs
217+
.iter_mut()
218+
.filter(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(tx.txid()))
219+
.for_each(|o| {
220+
o.confirmed_in_block = Some((height, header.block_hash()));
221+
match self.persist_info(o) {
222+
Ok(()) => {}
223+
Err(e) => {
224+
log_error!(
225+
self.logger,
226+
"Error persisting spendable output info: {:?}",
227+
e
228+
)
229+
}
230+
}
231+
});
232+
}
233+
}
234+
235+
fn transaction_unconfirmed(&self, txid: &Txid) {
236+
let mut locked_outputs = self.outputs.lock().unwrap();
237+
238+
// Get what height was unconfirmed.
239+
let unconf_height = locked_outputs
240+
.iter()
241+
.find(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(*txid))
242+
.and_then(|o| o.confirmed_in_block)
243+
.map(|t| t.0);
244+
245+
// Unconfirm all >= this height.
246+
locked_outputs
247+
.iter_mut()
248+
.filter(|o| o.confirmed_in_block.map(|t| t.0) >= unconf_height)
249+
.for_each(|o| {
250+
o.confirmed_in_block = None;
251+
match self.persist_info(o) {
252+
Ok(()) => {}
253+
Err(e) => {
254+
log_error!(self.logger, "Error persisting spendable output info: {:?}", e)
255+
}
256+
}
257+
});
258+
}
259+
260+
fn best_block_updated(&self, header: &BlockHeader, height: u32) {
261+
*self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height);
262+
263+
let mut locked_outputs = self.outputs.lock().unwrap();
264+
265+
// Rebroadcast all outputs that didn't get confirmed by now.
266+
for output_info in locked_outputs.iter_mut().filter(|o| o.confirmed_in_block.is_none()) {
267+
let should_broadcast = if let Some(bcast_height) = output_info.broadcast_height {
268+
height >= bcast_height + num_blocks_from_conf_target(ConfirmationTarget::Background)
269+
} else {
270+
true
271+
};
272+
if should_broadcast {
273+
let output_descriptors = vec![output_info.descriptor.clone()];
274+
match self.get_spending_tx(&output_descriptors) {
275+
Ok(Some(spending_tx)) => {
276+
self.wallet.broadcast_transactions(&[&spending_tx]);
277+
if let Some(filter) = self.chain_source.as_ref() {
278+
filter.register_tx(&spending_tx.txid(), &Script::new())
279+
}
280+
output_info.spending_tx = Some(spending_tx);
281+
output_info.broadcast_height = Some(height);
282+
match self.persist_info(output_info) {
283+
Ok(()) => {}
284+
Err(e) => {
285+
log_error!(
286+
self.logger,
287+
"Error persisting spendable output info: {:?}",
288+
e
289+
)
290+
}
291+
}
292+
}
293+
Ok(None) => {
294+
log_debug!(
295+
self.logger,
296+
"Omitted spending static outputs: {:?}",
297+
output_descriptors
298+
);
299+
}
300+
Err(err) => {
301+
log_error!(self.logger, "Error spending outputs: {:?}", err);
302+
}
303+
};
304+
}
305+
}
306+
307+
// Prune all outputs that have sufficient depth by now.
308+
locked_outputs.retain(|o| {
309+
if let Some((conf_height, _)) = o.confirmed_in_block {
310+
if height >= conf_height + CONSIDERED_SPENT_THRESHOLD_CONF {
311+
let key = hex_utils::to_string(&o.id);
312+
match self.kv_store.remove(SPENDABLE_OUTPUT_INFO_PERSISTENCE_NAMESPACE, &key) {
313+
Ok(_) => return false,
314+
Err(e) => {
315+
log_error!(
316+
self.logger,
317+
"Removal of key {}/{} failed due to: {}",
318+
SPENDABLE_OUTPUT_INFO_PERSISTENCE_NAMESPACE,
319+
key,
320+
e
321+
);
322+
return true;
323+
}
324+
}
325+
}
326+
}
327+
true
328+
});
329+
}
330+
331+
fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
332+
let locked_outputs = self.outputs.lock().unwrap();
333+
locked_outputs
334+
.iter()
335+
.filter_map(|o| {
336+
if let Some(tx) = o.spending_tx.as_ref() {
337+
Some((tx.txid(), o.confirmed_in_block.map(|c| c.1)))
338+
} else {
339+
None
340+
}
341+
})
342+
.collect::<Vec<_>>()
343+
}
344+
}

0 commit comments

Comments
 (0)