Skip to content

Commit 1d1146c

Browse files
committed
add persister that writes channel updates individually
1 parent 2510a20 commit 1d1146c

File tree

1 file changed

+326
-27
lines changed

1 file changed

+326
-27
lines changed

lightning/src/util/persist.rs

Lines changed: 326 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
//! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`],
99
//! and [`ChannelMonitor`] all in one place.
1010
11+
use core::convert::{TryFrom, TryInto};
1112
use core::ops::Deref;
1213
use bitcoin::hashes::hex::{FromHex, ToHex};
1314
use bitcoin::{BlockHash, Txid};
1415

1516
use crate::io;
17+
use crate::ln::msgs::DecodeError;
1618
use crate::prelude::{Vec, String};
1719
use crate::routing::scoring::WriteableScore;
1820

@@ -21,12 +23,12 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2123
use crate::chain::chainmonitor::{Persist, MonitorUpdateId};
2224
use crate::sign::{EntropySource, NodeSigner, WriteableEcdsaChannelSigner, SignerProvider};
2325
use crate::chain::transaction::OutPoint;
24-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate};
26+
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID};
2527
use crate::ln::channelmanager::ChannelManager;
2628
use crate::routing::router::Router;
2729
use crate::routing::gossip::NetworkGraph;
2830
use crate::util::logger::Logger;
29-
use crate::util::ser::{ReadableArgs, Writeable};
31+
use crate::util::ser::{Readable, ReadableArgs, Writeable};
3032

3133
/// The namespace under which the [`ChannelManager`] will be persisted.
3234
pub const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = "";
@@ -35,6 +37,8 @@ pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
3537

3638
/// The namespace under which [`ChannelMonitor`]s will be persisted.
3739
pub const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE: &str = "monitors";
40+
/// The namespace under which [`ChannelMonitorUpdate`]s will be persisted.
41+
pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE: &str = "monitors_updates";
3842

3943
/// The namespace under which the [`NetworkGraph`] will be persisted.
4044
pub const NETWORK_GRAPH_PERSISTENCE_NAMESPACE: &str = "";
@@ -134,33 +138,28 @@ impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Der
134138
}
135139
}
136140

137-
impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
138-
impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
139-
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
140-
// down once these start returning failure.
141-
// A PermanentFailure implies we should probably just shut down the node since we're
142-
// force-closing channels without even broadcasting!
141+
// impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore> Persist<ChannelSigner> for K {
142+
// // TODO: We really need a way for the persister to inform the user that its time to crash/shut
143+
// // down once these start returning failure.
144+
// // A PermanentFailure implies we should probably just shut down the node since we're
145+
// // force-closing channels without even broadcasting!
143146

144-
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
145-
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
146-
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
147-
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
148-
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
149-
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
150-
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
151-
}
152-
}
147+
// fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
148+
// let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
149+
// match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
150+
// Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
151+
// Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
152+
// }
153+
// }
153154

154-
fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
155-
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
156-
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
157-
let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
158-
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
159-
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
160-
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
161-
}
162-
}
163-
}
155+
// fn update_persisted_channel(&self, funding_txo: OutPoint, _update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
156+
// let key = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
157+
// match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
158+
// Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
159+
// Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
160+
// }
161+
// }
162+
// }
164163

165164
/// Read previously persisted [`ChannelMonitor`]s from the store.
166165
pub fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
@@ -207,3 +206,303 @@ where
207206
}
208207
Ok(res)
209208
}
209+
210+
enum KVStoreChannelMonitorReaderError {
211+
/// The monitor name was improperly formatted.
212+
BadMonitorName(String, String),
213+
/// The monitor could not be decoded.
214+
MonitorDecodeFailed(DecodeError, String),
215+
/// The update could not be decoded.
216+
UpdateDecodeFailed(DecodeError, String),
217+
/// Storage could not be read.
218+
StorageReadFailed(io::Error, String),
219+
/// An update could not be applied to a monitor.
220+
UpdateFailed(String, String),
221+
}
222+
223+
impl From<KVStoreChannelMonitorReaderError> for io::Error {
224+
fn from(value: KVStoreChannelMonitorReaderError) -> Self {
225+
match value {
226+
KVStoreChannelMonitorReaderError::BadMonitorName(reason, context) => {
227+
io::Error::new(io::ErrorKind::InvalidInput, format!("{reason}, context: {context}'"))
228+
},
229+
KVStoreChannelMonitorReaderError::MonitorDecodeFailed(reason, context) => {
230+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'"))
231+
},
232+
KVStoreChannelMonitorReaderError::UpdateDecodeFailed(reason, context) => {
233+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'"))
234+
},
235+
KVStoreChannelMonitorReaderError::StorageReadFailed(reason, context) => {
236+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context:?}'"))
237+
},
238+
KVStoreChannelMonitorReaderError::UpdateFailed(reason, context) => {
239+
io::Error::new(io::ErrorKind::InvalidData, format!("{reason}, context: {context}'"))
240+
},
241+
}
242+
}
243+
}
244+
245+
/// A struct representing a name for a monitor.
246+
#[derive(Clone, Debug)]
247+
pub struct MonitorName(String);
248+
249+
impl TryFrom<MonitorName> for OutPoint {
250+
type Error = std::io::Error;
251+
252+
fn try_from(value: MonitorName) -> Result<Self, io::Error> {
253+
let (txid_hex, index) = value.0.split_once('_').ok_or_else(|| {
254+
KVStoreChannelMonitorReaderError::BadMonitorName("no underscore".to_string(), value.0.clone())
255+
})?;
256+
let index = index.parse().map_err(|e| {
257+
KVStoreChannelMonitorReaderError::BadMonitorName(
258+
format!("bad index value, caused by {e}"),
259+
value.0.clone(),
260+
)
261+
})?;
262+
let txid = Txid::from_hex(txid_hex).map_err(|e| {
263+
KVStoreChannelMonitorReaderError::BadMonitorName(
264+
format!("bad txid, caused by: {e}"),
265+
value.0.clone(),
266+
)
267+
})?;
268+
Ok(OutPoint { txid, index })
269+
}
270+
}
271+
272+
impl From<OutPoint> for MonitorName {
273+
fn from(value: OutPoint) -> Self {
274+
MonitorName(format!("{}_{}", value.txid.to_hex(), value.index))
275+
}
276+
}
277+
278+
/// A struct representing a name for an update.
279+
#[derive(Clone, Debug)]
280+
pub struct UpdateName(String);
281+
282+
impl From<u64> for UpdateName {
283+
fn from(value: u64) -> Self {
284+
Self(format!("{:0>20}", value))
285+
}
286+
}
287+
288+
#[allow(clippy::type_complexity)]
289+
pub trait KVStoreChannelMonitorReader<K: KVStore> {
290+
fn read_channelmonitors<ES: Deref + Clone, SP: Deref + Clone, B: Deref, F: Deref + Clone, L: Deref>(
291+
&self, entropy_source: ES, signer_provider: SP, broadcaster: &B, fee_estimator: F,
292+
logger: &L,
293+
) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
294+
where
295+
ES::Target: EntropySource + Sized,
296+
SP::Target: SignerProvider + Sized,
297+
B::Target: BroadcasterInterface,
298+
F::Target: FeeEstimator,
299+
L::Target: Logger;
300+
/// List all the names of monitors.
301+
fn list_monitor_names(&self) -> io::Result<Vec<MonitorName>>;
302+
/// Key to a specific monitor.
303+
fn monitor_key(&self, monitor_name: &MonitorName) -> String;
304+
/// Deserialize a channel monitor.
305+
fn deserialize_monitor<ES: Deref, SP: Deref>(
306+
&self, entropy_source: ES, signer_provider: SP, monitor_name: MonitorName,
307+
) -> io::Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>
308+
where
309+
ES::Target: EntropySource + Sized,
310+
SP::Target: SignerProvider + Sized;
311+
/// List all the names of updates corresponding to a given monitor name.
312+
fn list_update_names(&self, monitor_name: &MonitorName) -> io::Result<Vec<UpdateName>>;
313+
/// Path to corresponding update directory for a given monitor name.
314+
fn path_to_monitor_updates(&self, monitor_name: &MonitorName) -> String;
315+
/// Deserialize a channel monitor update.
316+
fn deserialize_monitor_update(
317+
&self, monitor_name: &MonitorName, update_name: &UpdateName,
318+
) -> io::Result<ChannelMonitorUpdate>;
319+
/// Key to a specific update.
320+
fn update_key(&self, monitor_name: &MonitorName, update_name: &UpdateName) -> String;
321+
/// Delete updates with an update_id lower than the given channel monitor.
322+
fn delete_stale_updates<ChannelSigner: WriteableEcdsaChannelSigner>(
323+
&self, channel_id: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
324+
) -> io::Result<()>;
325+
}
326+
327+
impl<K: KVStore> KVStoreChannelMonitorReader<K> for K {
328+
fn read_channelmonitors<
329+
ES: Deref + Clone,
330+
SP: Deref + Clone,
331+
B: Deref,
332+
F: Deref + Clone,
333+
L: Deref,
334+
>(
335+
&self, entropy_source: ES, signer_provider: SP, broadcaster: &B, fee_estimator: F,
336+
logger: &L,
337+
) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
338+
where
339+
ES::Target: EntropySource + Sized,
340+
SP::Target: SignerProvider + Sized,
341+
B::Target: BroadcasterInterface,
342+
F::Target: FeeEstimator,
343+
L::Target: Logger
344+
{
345+
let mut res = Vec::new();
346+
// for each monitor...
347+
for monitor_name in self.list_monitor_names()? {
348+
// ...parse the monitor
349+
let (bh, monitor) = self.deserialize_monitor(
350+
entropy_source.clone(),
351+
signer_provider.clone(),
352+
monitor_name.clone(),
353+
)?;
354+
// ...parse and apply the updates with an id higher than the monitor.
355+
for update_name in self.list_update_names(&monitor_name)? {
356+
let update = self.deserialize_monitor_update(&monitor_name, &update_name)?;
357+
if update.update_id == CLOSED_CHANNEL_UPDATE_ID
358+
|| update.update_id > monitor.get_latest_update_id()
359+
{
360+
monitor
361+
.update_monitor(&update, broadcaster, fee_estimator.clone(), logger)
362+
.map_err(|_| {
363+
KVStoreChannelMonitorReaderError::UpdateFailed(
364+
"update_monitor returned Err(())".to_string(),
365+
format!("monitor: {:?}", monitor_name),
366+
)
367+
})?;
368+
}
369+
}
370+
// ...push the result into the return vec
371+
res.push((bh, monitor))
372+
}
373+
Ok(res)
374+
}
375+
376+
/// Key to a specific monitor.
377+
fn monitor_key(&self, monitor_name: &MonitorName) -> String {
378+
CHANNEL_MONITOR_PERSISTENCE_NAMESPACE.to_owned() + &monitor_name.0
379+
}
380+
381+
/// Key to a specific update.
382+
fn update_key(&self, monitor_name: &MonitorName, update_name: &UpdateName) -> String {
383+
self.path_to_monitor_updates(monitor_name) + &update_name.0
384+
}
385+
386+
/// List all the names of monitors.
387+
fn list_monitor_names(&self) -> io::Result<Vec<MonitorName>> {
388+
Ok(self.list(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE)?.into_iter().map(MonitorName).collect())
389+
}
390+
391+
/// List all the names of updates corresponding to a given monitor name.
392+
fn list_update_names(&self, monitor_name: &MonitorName) -> io::Result<Vec<UpdateName>> {
393+
let update_dir_path = self.path_to_monitor_updates(monitor_name);
394+
Ok(self.list(&update_dir_path)?.into_iter().map(UpdateName).collect())
395+
}
396+
397+
/// Path to corresponding update directory for a given monitor name.
398+
fn path_to_monitor_updates(&self, monitor_name: &MonitorName) -> String {
399+
CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE.to_owned() + &monitor_name.0
400+
}
401+
402+
/// Deserialize a channel monitor.
403+
fn deserialize_monitor<ES: Deref, SP: Deref>(
404+
&self, entropy_source: ES, signer_provider: SP, monitor_name: MonitorName,
405+
) -> io::Result<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>
406+
where
407+
ES::Target: EntropySource + Sized,
408+
SP::Target: SignerProvider + Sized
409+
{
410+
let key = self.monitor_key(&monitor_name);
411+
let outpoint: OutPoint = monitor_name.try_into()?;
412+
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
413+
&mut self.read(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key)
414+
.map_err(|e| KVStoreChannelMonitorReaderError::StorageReadFailed(e, key.to_owned()))?,
415+
(&*entropy_source, &*signer_provider),
416+
) {
417+
Ok((blockhash, channel_monitor)) => {
418+
if channel_monitor.get_funding_txo().0.txid != outpoint.txid
419+
|| channel_monitor.get_funding_txo().0.index != outpoint.index
420+
{
421+
return Err(KVStoreChannelMonitorReaderError::MonitorDecodeFailed(
422+
DecodeError::InvalidValue,
423+
key,
424+
)
425+
.into());
426+
}
427+
Ok((blockhash, channel_monitor))
428+
}
429+
Err(e) => Err(KVStoreChannelMonitorReaderError::MonitorDecodeFailed(e, key).into()),
430+
}
431+
}
432+
433+
/// Deserialize a channel monitor update.
434+
fn deserialize_monitor_update(
435+
&self, monitor_name: &MonitorName, update_name: &UpdateName,
436+
) -> io::Result<ChannelMonitorUpdate>
437+
{
438+
let key = self.update_key(monitor_name, update_name);
439+
Ok(ChannelMonitorUpdate::read(&mut self
440+
.read(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, &key)
441+
.map_err(|e| KVStoreChannelMonitorReaderError::StorageReadFailed(e, key.to_owned()))?)
442+
.map_err(|e| KVStoreChannelMonitorReaderError::UpdateDecodeFailed(e, key))?)
443+
}
444+
445+
/// Delete updates with an update_id lower than the given channel monitor.
446+
fn delete_stale_updates<ChannelSigner: WriteableEcdsaChannelSigner>(
447+
&self, channel_id: OutPoint, monitor: &ChannelMonitor<ChannelSigner>,
448+
) -> io::Result<()>
449+
{
450+
let monitor_name: MonitorName = channel_id.into();
451+
let update_names =
452+
self.list_update_names(&monitor_name)?;
453+
for update_name in update_names {
454+
let update =
455+
self.deserialize_monitor_update(&monitor_name, &update_name)?;
456+
if update.update_id != CLOSED_CHANNEL_UPDATE_ID
457+
&& update.update_id <= monitor.get_latest_update_id()
458+
{
459+
self.remove(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, &self.update_key(&monitor_name, &update_name))?;
460+
}
461+
}
462+
Ok(())
463+
}
464+
}
465+
466+
impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + KVStoreChannelMonitorReader<K>> Persist<ChannelSigner> for K {
467+
// TODO: We really need a way for the persister to inform the user that its time to crash/shut
468+
// down once these start returning failure.
469+
// A PermanentFailure implies we should probably just shut down the node since we're
470+
// force-closing channels without even broadcasting!
471+
fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus
472+
{
473+
let key = self.monitor_key(&funding_txo.into());
474+
match self.write(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &key, &monitor.encode()) {
475+
Ok(()) => {
476+
if let Err(_e) = self.delete_stale_updates(funding_txo, monitor) {
477+
// TODO(domz): what to do? seems like an error or panic is needed, but OTOH cleanup is technically optional
478+
//log_error!(self.logger, "error cleaning up channel monitor updates! {}", e);
479+
};
480+
chain::ChannelMonitorUpdateStatus::Completed
481+
},
482+
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
483+
}
484+
}
485+
486+
fn update_persisted_channel(
487+
&self, funding_txo: OutPoint, update: Option<&ChannelMonitorUpdate>,
488+
monitor: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId,
489+
) -> chain::ChannelMonitorUpdateStatus {
490+
match update {
491+
Some(update) => {
492+
// This is an update to the monitor, which we persist to apply on restart.
493+
// IMPORTANT: update_id: MonitorUpdateId is not to be confused with ChannelMonitorUpdate.update_id.
494+
// The first is an opaque identifier for this call (used for calling back write completion). The second
495+
// is the channel update sequence number.
496+
let key = self.update_key(&funding_txo.into(), &update.update_id.into());
497+
match self.write(CHANNEL_MONITOR_UPDATE_PERSISTENCE_NAMESPACE, &key, &update.encode()) {
498+
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
499+
Err(_) => chain::ChannelMonitorUpdateStatus::PermanentFailure,
500+
}
501+
}
502+
// A new block. Now we need to persist the entire new monitor and discard the old
503+
// updates.
504+
None => self.persist_new_channel(funding_txo, monitor, update_id),
505+
}
506+
}
507+
508+
}

0 commit comments

Comments
 (0)