Skip to content

Commit 96f23f4

Browse files
committed
Introduce KVStore trait and FilesystemStore impl
Rather than further relying on the upstream `KVStorePersister`/`KVStoreUnpersister`, we here implement a general `KVStore` trait that allows access to `Read`s/`TransactionalWrite`s which may be used to deserialize/serialize data via the `Readable`/`Writeable` implementations. Notably `TransactionalWrite` is a `Write` for which the written data needs to be explictly `commit`ed, asserting that we always persist either the whole new change or no change at all. Additionally, we avoid the `Info` umbrella term but opt to introduce `PaymentDetails` to align naming with upcoming `PeerDetails` and `ChannelDetails`.
1 parent 3b1be8d commit 96f23f4

File tree

11 files changed

+1344
-419
lines changed

11 files changed

+1344
-419
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ libc = "0.2"
6666
electrsd = { version = "0.22.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_23_0"] }
6767
electrum-client = "0.12.0"
6868
proptest = "1.0.0"
69+
regex = "1.5.6"
6970

7071
[profile.release]
7172
panic = "abort"

src/event.rs

Lines changed: 142 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use crate::{
2-
hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PaymentDirection,
3-
PaymentInfo, PaymentInfoStorage, PaymentStatus, Wallet,
2+
hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PaymentDetails,
3+
PaymentDirection, PaymentStatus, PaymentStore, Wallet,
44
};
55

6-
use crate::io::KVStoreUnpersister;
6+
use crate::io::{
7+
KVStore, TransactionalWrite, EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_NAMESPACE,
8+
};
79
use crate::logger::{log_error, log_info, Logger};
810

911
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
@@ -14,7 +16,6 @@ use lightning::util::errors::APIError;
1416
use lightning::util::events::Event as LdkEvent;
1517
use lightning::util::events::EventHandler as LdkEventHandler;
1618
use lightning::util::events::PaymentPurpose;
17-
use lightning::util::persist::KVStorePersister;
1819
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
1920

2021
use bitcoin::secp256k1::Secp256k1;
@@ -24,9 +25,6 @@ use std::ops::Deref;
2425
use std::sync::{Arc, Condvar, Mutex};
2526
use std::time::Duration;
2627

27-
/// The event queue will be persisted under this key.
28-
pub(crate) const EVENTS_PERSISTENCE_KEY: &str = "events";
29-
3028
/// An event emitted by [`Node`], which should be handled by the user.
3129
///
3230
/// [`Node`]: [`crate::Node`]
@@ -86,30 +84,33 @@ impl_writeable_tlv_based_enum!(Event,
8684
};
8785
);
8886

89-
pub(crate) struct EventQueue<K: Deref>
87+
pub struct EventQueue<K: Deref, L: Deref>
9088
where
91-
K::Target: KVStorePersister,
89+
K::Target: KVStore,
90+
L::Target: Logger,
9291
{
9392
queue: Mutex<VecDeque<Event>>,
9493
notifier: Condvar,
95-
persister: K,
94+
kv_store: K,
95+
logger: L,
9696
}
9797

98-
impl<K: Deref> EventQueue<K>
98+
impl<K: Deref, L: Deref> EventQueue<K, L>
9999
where
100-
K::Target: KVStorePersister,
100+
K::Target: KVStore,
101+
L::Target: Logger,
101102
{
102-
pub(crate) fn new(persister: K) -> Self {
103+
pub(crate) fn new(kv_store: K, logger: L) -> Self {
103104
let queue: Mutex<VecDeque<Event>> = Mutex::new(VecDeque::new());
104105
let notifier = Condvar::new();
105-
Self { queue, notifier, persister }
106+
Self { queue, notifier, kv_store, logger }
106107
}
107108

108109
pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
109110
{
110111
let mut locked_queue = self.queue.lock().unwrap();
111112
locked_queue.push_back(event);
112-
self.persist_queue(&locked_queue)?;
113+
self.write_queue_and_commit(&locked_queue)?;
113114
}
114115

115116
self.notifier.notify_one();
@@ -126,32 +127,64 @@ where
126127
{
127128
let mut locked_queue = self.queue.lock().unwrap();
128129
locked_queue.pop_front();
129-
self.persist_queue(&locked_queue)?;
130+
self.write_queue_and_commit(&locked_queue)?;
130131
}
131132
self.notifier.notify_one();
132133
Ok(())
133134
}
134135

135-
fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
136-
self.persister
137-
.persist(EVENTS_PERSISTENCE_KEY, &EventQueueSerWrapper(locked_queue))
138-
.map_err(|_| Error::PersistenceFailed)?;
136+
fn write_queue_and_commit(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
137+
let mut writer = self
138+
.kv_store
139+
.write(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY)
140+
.map_err(|e| {
141+
log_error!(
142+
self.logger,
143+
"Getting writer for key {}/{} failed due to: {}",
144+
EVENT_QUEUE_PERSISTENCE_NAMESPACE,
145+
EVENT_QUEUE_PERSISTENCE_KEY,
146+
e
147+
);
148+
Error::PersistenceFailed
149+
})?;
150+
EventQueueSerWrapper(locked_queue).write(&mut writer).map_err(|e| {
151+
log_error!(
152+
self.logger,
153+
"Writing event queue data to key {}/{} failed due to: {}",
154+
EVENT_QUEUE_PERSISTENCE_NAMESPACE,
155+
EVENT_QUEUE_PERSISTENCE_KEY,
156+
e
157+
);
158+
Error::PersistenceFailed
159+
})?;
160+
writer.commit().map_err(|e| {
161+
log_error!(
162+
self.logger,
163+
"Committing event queue data to key {}/{} failed due to: {}",
164+
EVENT_QUEUE_PERSISTENCE_NAMESPACE,
165+
EVENT_QUEUE_PERSISTENCE_KEY,
166+
e
167+
);
168+
Error::PersistenceFailed
169+
})?;
139170
Ok(())
140171
}
141172
}
142173

143-
impl<K: Deref> ReadableArgs<K> for EventQueue<K>
174+
impl<K: Deref, L: Deref> ReadableArgs<(K, L)> for EventQueue<K, L>
144175
where
145-
K::Target: KVStorePersister,
176+
K::Target: KVStore,
177+
L::Target: Logger,
146178
{
147179
#[inline]
148180
fn read<R: lightning::io::Read>(
149-
reader: &mut R, persister: K,
181+
reader: &mut R, args: (K, L),
150182
) -> Result<Self, lightning::ln::msgs::DecodeError> {
183+
let (kv_store, logger) = args;
151184
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
152185
let queue: Mutex<VecDeque<Event>> = Mutex::new(read_queue.0);
153186
let notifier = Condvar::new();
154-
Ok(Self { queue, notifier, persister })
187+
Ok(Self { queue, notifier, kv_store, logger })
155188
}
156189
}
157190

@@ -184,29 +217,29 @@ impl Writeable for EventQueueSerWrapper<'_> {
184217

185218
pub(crate) struct EventHandler<K: Deref + Clone, L: Deref>
186219
where
187-
K::Target: KVStorePersister + KVStoreUnpersister,
220+
K::Target: KVStore,
188221
L::Target: Logger,
189222
{
190223
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>,
191-
event_queue: Arc<EventQueue<K>>,
224+
event_queue: Arc<EventQueue<K, L>>,
192225
channel_manager: Arc<ChannelManager>,
193226
network_graph: Arc<NetworkGraph>,
194227
keys_manager: Arc<KeysManager>,
195-
payment_store: Arc<PaymentInfoStorage<K>>,
228+
payment_store: Arc<PaymentStore<K, L>>,
196229
tokio_runtime: Arc<tokio::runtime::Runtime>,
197230
logger: L,
198231
_config: Arc<Config>,
199232
}
200233

201234
impl<K: Deref + Clone, L: Deref> EventHandler<K, L>
202235
where
203-
K::Target: KVStorePersister + KVStoreUnpersister,
236+
K::Target: KVStore,
204237
L::Target: Logger,
205238
{
206239
pub fn new(
207-
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>, event_queue: Arc<EventQueue<K>>,
240+
wallet: Arc<Wallet<bdk::database::SqliteDatabase>>, event_queue: Arc<EventQueue<K, L>>,
208241
channel_manager: Arc<ChannelManager>, network_graph: Arc<NetworkGraph>,
209-
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentInfoStorage<K>>,
242+
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
210243
tokio_runtime: Arc<tokio::runtime::Runtime>, logger: L, _config: Arc<Config>,
211244
) -> Self {
212245
Self {
@@ -225,7 +258,7 @@ where
225258

226259
impl<K: Deref + Clone, L: Deref> LdkEventHandler for EventHandler<K, L>
227260
where
228-
K::Target: KVStorePersister + KVStoreUnpersister,
261+
K::Target: KVStore,
229262
L::Target: Logger,
230263
{
231264
fn handle_event(&self, event: LdkEvent) {
@@ -306,7 +339,7 @@ where
306339
);
307340
self.channel_manager.fail_htlc_backwards(&payment_hash);
308341
self.payment_store
309-
.set_status(&payment_hash, PaymentStatus::Failed)
342+
.update(&payment_hash, None, None, None, Some(PaymentStatus::Failed))
310343
.expect("Failed to access payment store");
311344
return;
312345
}
@@ -341,7 +374,7 @@ where
341374
);
342375
self.channel_manager.fail_htlc_backwards(&payment_hash);
343376
self.payment_store
344-
.set_status(&payment_hash, PaymentStatus::Failed)
377+
.update(&payment_hash, None, None, None, Some(PaymentStatus::Failed))
345378
.expect("Failed to access payment store");
346379
}
347380
}
@@ -357,47 +390,84 @@ where
357390
hex_utils::to_string(&payment_hash.0),
358391
amount_msat,
359392
);
360-
let (payment_preimage, payment_secret) = match purpose {
393+
match purpose {
361394
PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => {
362-
(payment_preimage, Some(payment_secret))
395+
match self.payment_store.update(
396+
&payment_hash,
397+
Some(payment_preimage),
398+
Some(Some(payment_secret)),
399+
Some(Some(amount_msat)),
400+
Some(PaymentStatus::Succeeded),
401+
) {
402+
Ok(true) => (),
403+
Ok(false) => {
404+
log_error!(
405+
self.logger,
406+
"Payment with hash {} couldn't be found in store",
407+
hex_utils::to_string(&payment_hash.0)
408+
);
409+
debug_assert!(false);
410+
}
411+
Err(e) => {
412+
log_error!(
413+
self.logger,
414+
"Failed to update payment with hash {}: {}",
415+
hex_utils::to_string(&payment_hash.0),
416+
e
417+
);
418+
debug_assert!(false);
419+
}
420+
}
421+
}
422+
PaymentPurpose::SpontaneousPayment(preimage) => {
423+
let payment = PaymentDetails {
424+
preimage: Some(preimage),
425+
hash: payment_hash,
426+
secret: None,
427+
amount_msat: Some(amount_msat),
428+
direction: PaymentDirection::Inbound,
429+
status: PaymentStatus::Succeeded,
430+
};
431+
432+
match self.payment_store.insert(payment) {
433+
Ok(false) => (),
434+
Ok(true) => {
435+
log_error!(
436+
self.logger,
437+
"Spontaneous payment with hash {} was previosly known",
438+
hex_utils::to_string(&payment_hash.0)
439+
);
440+
debug_assert!(false);
441+
}
442+
Err(e) => {
443+
log_error!(
444+
self.logger,
445+
"Failed to insert payment with hash {}: {}",
446+
hex_utils::to_string(&payment_hash.0),
447+
e
448+
);
449+
debug_assert!(false);
450+
}
451+
}
363452
}
364-
PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None),
365453
};
366454

367-
let mut locked_store = self.payment_store.lock().unwrap();
368-
locked_store
369-
.entry(payment_hash)
370-
.and_modify(|payment_info| {
371-
payment_info.status = PaymentStatus::Succeeded;
372-
payment_info.preimage = payment_preimage;
373-
payment_info.secret = payment_secret;
374-
payment_info.amount_msat = Some(amount_msat);
375-
})
376-
.or_insert(PaymentInfo {
377-
preimage: payment_preimage,
378-
payment_hash,
379-
secret: payment_secret,
380-
amount_msat: Some(amount_msat),
381-
direction: PaymentDirection::Inbound,
382-
status: PaymentStatus::Succeeded,
383-
});
384-
385455
self.event_queue
386456
.add_event(Event::PaymentReceived { payment_hash, amount_msat })
387457
.expect("Failed to push to event queue");
388458
}
389459
LdkEvent::PaymentSent { payment_preimage, payment_hash, fee_paid_msat, .. } => {
390-
if let Some(mut payment_info) = self.payment_store.get(&payment_hash) {
391-
payment_info.preimage = Some(payment_preimage);
392-
payment_info.status = PaymentStatus::Succeeded;
460+
if let Some(mut payment) = self.payment_store.get(&payment_hash) {
461+
payment.preimage = Some(payment_preimage);
462+
payment.status = PaymentStatus::Succeeded;
393463
self.payment_store
394-
.insert(payment_info.clone())
464+
.insert(payment.clone())
395465
.expect("Failed to access payment store");
396466
log_info!(
397467
self.logger,
398468
"Successfully sent payment of {}msat{} from \
399469
payment hash {:?} with preimage {:?}",
400-
payment_info.amount_msat.unwrap(),
470+
payment.amount_msat.unwrap(),
401471
if let Some(fee) = fee_paid_msat {
402472
format!(" (fee {} msat)", fee)
403473
} else {
@@ -419,7 +489,7 @@ where
419489
);
420490

421491
self.payment_store
422-
.set_status(&payment_hash, PaymentStatus::Failed)
492+
.update(&payment_hash, None, None, None, Some(PaymentStatus::Failed))
423493
.expect("Failed to access payment store");
424494
self.event_queue
425495
.add_event(Event::PaymentFailed { payment_hash })
@@ -554,33 +624,36 @@ where
554624
#[cfg(test)]
555625
mod tests {
556626
use super::*;
557-
use crate::test::utils::TestPersister;
627+
use crate::test::utils::{TestLogger, TestStore};
558628

559629
#[test]
560630
fn event_queue_persistence() {
561-
let persister = Arc::new(TestPersister::new());
562-
let event_queue = EventQueue::new(Arc::clone(&persister));
631+
let store = Arc::new(TestStore::new());
632+
let logger = Arc::new(TestLogger::new());
633+
let event_queue = EventQueue::new(Arc::clone(&store), Arc::clone(&logger));
563634

564635
let expected_event = Event::ChannelReady { channel_id: [23u8; 32], user_channel_id: 2323 };
565636
event_queue.add_event(expected_event.clone()).unwrap();
566-
assert!(persister.get_and_clear_did_persist());
637+
assert!(store.get_and_clear_did_persist());
567638

568639
// Check we get the expected event and that it is returned until we mark it handled.
569640
for _ in 0..5 {
570641
assert_eq!(event_queue.next_event(), expected_event);
571-
assert_eq!(false, persister.get_and_clear_did_persist());
642+
assert_eq!(false, store.get_and_clear_did_persist());
572643
}
573644

574645
// Check we can read back what we persisted.
575-
let persisted_bytes = persister.get_persisted_bytes(EVENTS_PERSISTENCE_KEY).unwrap();
646+
let persisted_bytes = store
647+
.get_persisted_bytes(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY)
648+
.unwrap();
576649
let deser_event_queue =
577-
EventQueue::read(&mut &persisted_bytes[..], Arc::clone(&persister)).unwrap();
650+
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
578651
assert_eq!(deser_event_queue.next_event(), expected_event);
579-
assert!(!persister.get_and_clear_did_persist());
652+
assert!(!store.get_and_clear_did_persist());
580653

581654
// Check we persisted on `event_handled()`
582655
event_queue.event_handled().unwrap();
583656

584-
assert!(persister.get_and_clear_did_persist());
657+
assert!(store.get_and_clear_did_persist());
585658
}
586659
}

0 commit comments

Comments
 (0)