-
Notifications
You must be signed in to change notification settings - Fork 411
Move persist into async part of the sweeper #3819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -382,8 +382,10 @@ where | |
output_spender: O, change_destination_source: D, kv_store: K, logger: L, | ||
) -> Self { | ||
let outputs = Vec::new(); | ||
let sweeper_state = | ||
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } }); | ||
let sweeper_state = Mutex::new(SweeperState { | ||
persistent: PersistentSweeperState { outputs, best_block }, | ||
dirty: false, | ||
}); | ||
Self { | ||
sweeper_state, | ||
pending_sweep: AtomicBool::new(false), | ||
|
@@ -447,7 +449,10 @@ where | |
} | ||
self.persist_state(&state_lock.persistent).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}) | ||
})?; | ||
state_lock.dirty = false; | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Returns a list of the currently tracked spendable outputs. | ||
|
@@ -504,12 +509,20 @@ where | |
|
||
// See if there is anything to sweep before requesting a change address. | ||
{ | ||
let sweeper_state = self.sweeper_state.lock().unwrap(); | ||
let mut sweeper_state = self.sweeper_state.lock().unwrap(); | ||
|
||
let cur_height = sweeper_state.persistent.best_block.height; | ||
let has_respends = | ||
sweeper_state.persistent.outputs.iter().any(|o| filter_fn(o, cur_height)); | ||
if !has_respends { | ||
// If there is nothing to sweep, we still persist the state if it is dirty. | ||
if sweeper_state.dirty { | ||
self.persist_state(&sweeper_state.persistent).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
})?; | ||
sweeper_state.dirty = false; | ||
} | ||
|
||
return Ok(()); | ||
} | ||
} | ||
|
@@ -534,7 +547,8 @@ where | |
.collect(); | ||
|
||
if respend_descriptors.is_empty() { | ||
// It could be that a tx confirmed and there is now nothing to sweep anymore. | ||
// It could be that a tx confirmed and there is now nothing to sweep anymore. If there is dirty state, | ||
// we'll persist it in the next cycle. | ||
return Ok(()); | ||
} | ||
|
||
|
@@ -570,6 +584,7 @@ where | |
self.persist_state(&sweeper_state.persistent).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
})?; | ||
sweeper_state.dirty = false; | ||
|
||
self.broadcaster.broadcast_transactions(&[&spending_tx]); | ||
} | ||
|
@@ -595,6 +610,8 @@ where | |
} | ||
true | ||
}); | ||
|
||
sweeper_state.dirty = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems this is redundant, but probably also can't hurt. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Leaving as is, to at least flag dirty in the functions that change state. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also not doing the RAII pattern. There are just a few places where dirty needs to be cleared, and seems fine for now. |
||
} | ||
|
||
fn persist_state(&self, sweeper_state: &PersistentSweeperState) -> Result<(), io::Error> { | ||
|
@@ -648,13 +665,17 @@ where | |
} | ||
} | ||
} | ||
|
||
sweeper_state.dirty = true; | ||
} | ||
|
||
fn best_block_updated_internal( | ||
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32, | ||
) { | ||
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height); | ||
self.prune_confirmed_outputs(sweeper_state); | ||
|
||
sweeper_state.dirty = true; | ||
} | ||
} | ||
|
||
|
@@ -678,12 +699,8 @@ where | |
assert_eq!(state_lock.persistent.best_block.height, height - 1, | ||
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); | ||
|
||
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); | ||
self.best_block_updated_internal(&mut *state_lock, header, height); | ||
|
||
let _ = self.persist_state(&state_lock.persistent).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height); | ||
self.best_block_updated_internal(&mut state_lock, header, height); | ||
} | ||
|
||
fn block_disconnected(&self, header: &Header, height: u32) { | ||
|
@@ -705,9 +722,7 @@ where | |
} | ||
} | ||
|
||
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
state_lock.dirty = true; | ||
} | ||
} | ||
|
||
|
@@ -727,9 +742,6 @@ where | |
) { | ||
let mut state_lock = self.sweeper_state.lock().unwrap(); | ||
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); | ||
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
} | ||
|
||
fn transaction_unconfirmed(&self, txid: &Txid) { | ||
|
@@ -752,18 +764,13 @@ where | |
.filter(|o| o.status.confirmation_height() >= Some(unconf_height)) | ||
.for_each(|o| o.status.unconfirmed()); | ||
|
||
self.persist_state(&state_lock.persistent).unwrap_or_else(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
state_lock.dirty = true; | ||
} | ||
} | ||
|
||
fn best_block_updated(&self, header: &Header, height: u32) { | ||
let mut state_lock = self.sweeper_state.lock().unwrap(); | ||
self.best_block_updated_internal(&mut *state_lock, header, height); | ||
let _ = self.persist_state(&state_lock.persistent).map_err(|e| { | ||
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); | ||
}); | ||
self.best_block_updated_internal(&mut state_lock, header, height); | ||
} | ||
|
||
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> { | ||
|
@@ -792,6 +799,7 @@ where | |
#[derive(Debug)] | ||
struct SweeperState { | ||
persistent: PersistentSweeperState, | ||
dirty: bool, | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
|
@@ -856,7 +864,7 @@ where | |
} | ||
} | ||
|
||
let sweeper_state = Mutex::new(SweeperState { persistent: state }); | ||
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false }); | ||
Ok(Self { | ||
sweeper_state, | ||
pending_sweep: AtomicBool::new(false), | ||
|
@@ -905,7 +913,7 @@ where | |
} | ||
} | ||
|
||
let sweeper_state = Mutex::new(SweeperState { persistent: state }); | ||
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false }); | ||
Ok(( | ||
best_block, | ||
OutputSweeper { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO adding the wrapper struct causes us to litter the code with
.persistent
in a lot of places where it's not relevant what's written to disk and what's not, and I'm not sure the concrete benefit besides the principle. Not worth holding up the PR though.