Skip to content

Commit 6c41849

Browse files
committed
Restore event queue from disk in setup
1 parent e499d12 commit 6c41849

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

src/lib.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ mod logger;
3434
use access::LdkLiteChainAccess;
3535
pub use error::LdkLiteError as Error;
3636
pub use event::LdkLiteEvent;
37-
use event::LdkLiteEventHandler;
37+
use event::{LdkLiteEventHandler, LdkLiteEventQueue};
3838

3939
#[allow(unused_imports)]
4040
use logger::{
@@ -81,16 +81,12 @@ use std::collections::HashMap;
8181
use std::fs;
8282
use std::net::SocketAddr;
8383
use std::sync::atomic::{AtomicBool, Ordering};
84-
use std::sync::mpsc;
8584
use std::sync::{Arc, Mutex, RwLock};
8685
use std::thread;
8786
use std::time::{Duration, SystemTime};
8887

8988
// TODO: Is MemoryDatabase okay to use?
9089

91-
// The number of messages we buffer in the used channels.
92-
const CHANNEL_BUF_SIZE: usize = 1000;
93-
9490
// The used 'stop gap' parameter used by BDK's wallet sync. This seems to configure the threshold
9591
// number of blocks after which BDK stops looking for scripts belonging to the wallet.
9692
const BDK_CLIENT_STOP_GAP: usize = 20;
@@ -328,17 +324,23 @@ impl LdkLiteBuilder {
328324
let inbound_payments = Arc::new(Mutex::new(HashMap::new()));
329325
let outbound_payments = Arc::new(Mutex::new(HashMap::new()));
330326

331-
// Step 14: Handle LDK Events
332-
let event_queue = mpsc::sync_channel(CHANNEL_BUF_SIZE);
333-
let event_sender = event_queue.0.clone();
327+
// Step 14: Restore event handler from disk or create a new one.
328+
let event_queue = if let Ok(mut f) =
329+
fs::File::open(format!("{}/{}", ldk_data_dir.clone(), event::EVENTS_PERSISTENCE_KEY))
330+
{
331+
Arc::new(LdkLiteEventQueue::read(&mut f, Arc::clone(&persister))?)
332+
} else {
333+
Arc::new(LdkLiteEventQueue::new(Arc::clone(&persister)))
334+
};
335+
334336
let event_handler = LdkLiteEventHandler::new(
335337
Arc::clone(&chain_access),
338+
Arc::clone(&event_queue),
336339
Arc::clone(&channel_manager),
337340
Arc::clone(&network_graph),
338341
Arc::clone(&keys_manager),
339342
Arc::clone(&inbound_payments),
340343
Arc::clone(&outbound_payments),
341-
event_sender,
342344
Arc::clone(&logger),
343345
Arc::clone(&config),
344346
);
@@ -365,6 +367,7 @@ impl LdkLiteBuilder {
365367
running,
366368
config,
367369
chain_access,
370+
event_queue,
368371
channel_manager,
369372
chain_monitor,
370373
peer_manager,
@@ -377,7 +380,6 @@ impl LdkLiteBuilder {
377380
invoice_payer,
378381
inbound_payments,
379382
outbound_payments,
380-
event_queue,
381383
})
382384
}
383385
}
@@ -398,6 +400,7 @@ pub struct LdkLite {
398400
running: RwLock<Option<LdkLiteRuntime>>,
399401
config: Arc<LdkLiteConfig>,
400402
chain_access: Arc<LdkLiteChainAccess<MemoryDatabase>>,
403+
event_queue: Arc<LdkLiteEventQueue<FilesystemPersister>>,
401404
channel_manager: Arc<ChannelManager>,
402405
chain_monitor: Arc<ChainMonitor>,
403406
peer_manager: Arc<PeerManager>,
@@ -410,7 +413,6 @@ pub struct LdkLite {
410413
invoice_payer: Arc<InvoicePayer<LdkLiteEventHandler>>,
411414
inbound_payments: Arc<PaymentInfoStorage>,
412415
outbound_payments: Arc<PaymentInfoStorage>,
413-
event_queue: (EventSender, EventReceiver),
414416
}
415417

416418
impl LdkLite {
@@ -579,9 +581,16 @@ impl LdkLite {
579581
})
580582
}
581583

582-
/// Returns the next event from the event queue. Blocks until a new event is available.
583-
pub fn next_event(&self) -> Result<LdkLiteEvent, Error> {
584-
Ok(self.event_queue.1.recv()?)
584+
/// Blocks until the next event is available.
585+
///
586+
/// Note: this will always return the same event until handling is confirmed via [`event_handled`].
587+
pub fn next_event(&self) -> LdkLiteEvent {
588+
self.event_queue.next_event()
589+
}
590+
591+
/// Confirm the last retrieved event handled.
592+
pub fn event_handled(&self) -> Result<(), Error> {
593+
self.event_queue.event_handled()
585594
}
586595

587596
/// Returns our own node id
@@ -955,8 +964,5 @@ pub(crate) type NetworkGraph = gossip::NetworkGraph<Arc<FilesystemLogger>>;
955964

956965
pub(crate) type PaymentInfoStorage = Mutex<HashMap<PaymentHash, PaymentInfo>>;
957966

958-
pub(crate) type EventSender = mpsc::SyncSender<LdkLiteEvent>;
959-
pub(crate) type EventReceiver = mpsc::Receiver<LdkLiteEvent>;
960-
961967
#[cfg(test)]
962968
mod tests {}

0 commit comments

Comments
 (0)