Skip to content

Commit 7d7e572

Browse files
committed
f Return trait objects from KVStore, update io::util deser methods
1 parent f63e633 commit 7d7e572

File tree

7 files changed

+253
-162
lines changed

7 files changed

+253
-162
lines changed

src/event.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
PaymentInfo, PaymentInfoStorage, PaymentStatus, Wallet,
44
};
55

6-
use crate::io::KVStoreUnpersister;
6+
use crate::io::{KVStoreUnpersister, EVENT_QUEUE_PERSISTENCE_KEY};
77
use crate::logger::{log_error, log_info, Logger};
88

99
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
@@ -24,9 +24,6 @@ use std::ops::Deref;
2424
use std::sync::{Arc, Condvar, Mutex};
2525
use std::time::Duration;
2626

27-
/// The event queue will be persisted under this key.
28-
pub(crate) const EVENTS_PERSISTENCE_KEY: &str = "events";
29-
3027
/// An event emitted by [`Node`], which should be handled by the user.
3128
///
3229
/// [`Node`]: [`crate::Node`]
@@ -86,7 +83,7 @@ impl_writeable_tlv_based_enum!(Event,
8683
};
8784
);
8885

89-
pub(crate) struct EventQueue<K: Deref>
86+
pub struct EventQueue<K: Deref>
9087
where
9188
K::Target: KVStorePersister,
9289
{
@@ -134,7 +131,7 @@ where
134131

135132
fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
136133
self.persister
137-
.persist(EVENTS_PERSISTENCE_KEY, &EventQueueSerWrapper(locked_queue))
134+
.persist(EVENT_QUEUE_PERSISTENCE_KEY, &EventQueueSerWrapper(locked_queue))
138135
.map_err(|_| Error::PersistenceFailed)?;
139136
Ok(())
140137
}
@@ -572,7 +569,7 @@ mod tests {
572569
}
573570

574571
// Check we can read back what we persisted.
575-
let persisted_bytes = persister.get_persisted_bytes(EVENTS_PERSISTENCE_KEY).unwrap();
572+
let persisted_bytes = persister.get_persisted_bytes(EVENT_QUEUE_PERSISTENCE_KEY).unwrap();
576573
let deser_event_queue =
577574
EventQueue::read(&mut &persisted_bytes[..], Arc::clone(&persister)).unwrap();
578575
assert_eq!(deser_event_queue.next_event(), expected_event);

src/io/fs_store.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,19 @@ impl FilesystemStore {
4646
}
4747
}
4848

49-
impl KVStore<FilesystemReader, FilesystemWriter> for FilesystemStore {
50-
fn read(&self, namespace: &str, key: &str) -> std::io::Result<FilesystemReader> {
49+
impl KVStore for FilesystemStore {
50+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Box<dyn Read>> {
5151
let mut dest_file = self.dest_dir.clone();
5252
dest_file.push(namespace);
5353
dest_file.push(key);
54-
FilesystemReader::new(dest_file)
54+
FilesystemReader::new(dest_file).map(|r| Box::new(r) as Box<dyn Read>)
5555
}
5656

57-
fn write(&self, namespace: &str, key: &str) -> std::io::Result<FilesystemWriter> {
57+
fn write(&self, namespace: &str, key: &str) -> std::io::Result<Box<dyn TransactionalWrite>> {
5858
let mut dest_file = self.dest_dir.clone();
5959
dest_file.push(namespace);
6060
dest_file.push(key);
61-
FilesystemWriter::new(dest_file)
61+
FilesystemWriter::new(dest_file).map(|w| Box::new(w) as Box<dyn TransactionalWrite>)
6262
}
6363

6464
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {

src/io/mod.rs

Lines changed: 22 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,31 @@
11
pub(crate) mod fs_store;
22
pub(crate) mod utils;
33

4-
use lightning::chain::channelmonitor::ChannelMonitor;
5-
use lightning::chain::keysinterface::{EntropySource, SignerProvider};
6-
use lightning::util::ser::ReadableArgs;
7-
8-
use bitcoin::hash_types::{BlockHash, Txid};
9-
use bitcoin::hashes::hex::FromHex;
10-
114
use std::io::{Read, Write};
12-
use std::ops::Deref;
135

14-
// The namespace and key LDK uses for persisting
15-
pub(crate) const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
6+
// The namespacs and keys LDK uses for persisting
167
pub(crate) const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = "";
8+
pub(crate) const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager";
179

1810
pub(crate) const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE: &str = "monitors";
1911

12+
pub(crate) const NETWORK_GRAPH_PERSISTENCE_NAMESPACE: &str = "";
13+
pub(crate) const NETWORK_GRAPH_PERSISTENCE_KEY: &str = "network_graph";
14+
15+
pub(crate) const SCORER_PERSISTENCE_NAMESPACE: &str = "";
16+
pub(crate) const SCORER_PERSISTENCE_KEY: &str = "scorer";
17+
18+
/// The event queue will be persisted under this key.
19+
pub(crate) const EVENT_QUEUE_PERSISTENCE_NAMESPACE: &str = "";
20+
pub(crate) const EVENT_QUEUE_PERSISTENCE_KEY: &str = "events";
21+
22+
/// The peer information will be persisted under this key.
23+
pub(crate) const PEER_INFO_PERSISTENCE_NAMESPACE: &str = "";
24+
pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
25+
26+
/// The payment information will be persisted under this prefix.
27+
pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";
28+
2029
/// Provides an interface that allows to store and retrieve persisted values that are associated
2130
/// with given keys.
2231
///
@@ -26,71 +35,24 @@ pub(crate) const CHANNEL_MONITOR_PERSISTENCE_NAMESPACE: &str = "monitors";
2635
///
2736
/// Keys and namespaces are required to be valid ASCII strings and the empty namespace (`""`) is
2837
/// assumed to be valid namespace.
29-
pub trait KVStore<R: Read, TW: TransactionalWrite> {
38+
pub trait KVStore {
3039
/// Returns a [`Read`] for the given `key` from which [`Readable`]s may be read.
3140
///
3241
/// [`Readable`]: lightning::util::ser::Readable
33-
fn read(&self, namespace: &str, key: &str) -> std::io::Result<R>;
42+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Box<dyn Read>>;
3443
/// Returns a [`TransactionalWrite`] for the given `key` to which [`Writeable`]s may be written.
3544
///
3645
/// Note that [`TransactionalWrite::commit`] MUST be called to commit the written data, otherwise
3746
/// the changes won't be persisted.
3847
///
3948
/// [`Writeable`]: lightning::util::ser::Writeable
40-
fn write(&self, namespace: &str, key: &str) -> std::io::Result<TW>;
49+
fn write(&self, namespace: &str, key: &str) -> std::io::Result<Box<dyn TransactionalWrite>>;
4150
/// Removes any data that had previously been persisted under the given `key`.
4251
///
4352
/// Returns `true` if the key was present, and `false` otherwise.
4453
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool>;
4554
/// Returns a list of keys that are stored under the given `namespace`.
4655
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>>;
47-
/// Read the persisted [`ChannelMonitor`]s from the store.
48-
fn read_channelmonitors<ES: Deref, SP: Deref>(
49-
&self, entropy_source: ES, signer_provider: SP,
50-
) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
51-
where
52-
ES::Target: EntropySource + Sized,
53-
SP::Target: SignerProvider + Sized,
54-
{
55-
let mut res = Vec::new();
56-
57-
for stored_key in self.list(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE)? {
58-
let txid = Txid::from_hex(stored_key.split_at(64).0).map_err(|_| {
59-
std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
60-
})?;
61-
62-
let index: u16 = stored_key.split_at(65).1.parse().map_err(|_| {
63-
std::io::Error::new(
64-
std::io::ErrorKind::InvalidData,
65-
"Invalid tx index in stored key",
66-
)
67-
})?;
68-
69-
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
70-
&mut self.read(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &stored_key)?,
71-
(&*entropy_source, &*signer_provider),
72-
) {
73-
Ok((block_hash, channel_monitor)) => {
74-
if channel_monitor.get_funding_txo().0.txid != txid
75-
|| channel_monitor.get_funding_txo().0.index != index
76-
{
77-
return Err(std::io::Error::new(
78-
std::io::ErrorKind::InvalidData,
79-
"ChannelMonitor was stored under the wrong key",
80-
));
81-
}
82-
res.push((block_hash, channel_monitor));
83-
}
84-
Err(e) => {
85-
return Err(std::io::Error::new(
86-
std::io::ErrorKind::InvalidData,
87-
format!("Failed to deserialize ChannelMonitor: {}", e),
88-
))
89-
}
90-
}
91-
}
92-
Ok(res)
93-
}
9456
}
9557

9658
/// A [`Write`] asserting data consistency.

src/io/utils.rs

Lines changed: 129 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
1-
use crate::payment_store::{PaymentInfo, PAYMENT_INFO_PERSISTENCE_NAMESPACE};
2-
use crate::{Config, FilesystemLogger, NetworkGraph, Scorer, WALLET_KEYS_SEED_LEN};
1+
use super::*;
2+
use crate::WALLET_KEYS_SEED_LEN;
33

4+
use crate::peer_store::PeerInfoStorage;
5+
use crate::{EventQueue, PaymentInfo};
6+
7+
use lightning::chain::channelmonitor::ChannelMonitor;
8+
use lightning::chain::keysinterface::{EntropySource, SignerProvider};
9+
use lightning::routing::gossip::NetworkGraph;
410
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters};
11+
use lightning::util::logger::Logger;
12+
use lightning::util::persist::KVStorePersister;
513
use lightning::util::ser::{Readable, ReadableArgs};
614

15+
use bitcoin::hash_types::{BlockHash, Txid};
16+
use bitcoin::hashes::hex::FromHex;
717
use rand::{thread_rng, RngCore};
818

919
use std::fs;
10-
use std::io::{BufReader, Write};
20+
use std::io::Write;
21+
use std::ops::Deref;
1122
use std::path::Path;
12-
use std::sync::Arc;
23+
24+
use super::KVStore;
1325

1426
pub(crate) fn read_or_generate_seed_file(keys_seed_path: &str) -> [u8; WALLET_KEYS_SEED_LEN] {
1527
if Path::new(&keys_seed_path).exists() {
@@ -33,53 +45,125 @@ pub(crate) fn read_or_generate_seed_file(keys_seed_path: &str) -> [u8; WALLET_KE
3345
}
3446
}
3547

36-
pub(crate) fn read_network_graph(config: &Config, logger: Arc<FilesystemLogger>) -> NetworkGraph {
37-
let ldk_data_dir = format!("{}/ldk", config.storage_dir_path);
38-
let network_graph_path = format!("{}/network_graph", ldk_data_dir);
39-
40-
if let Ok(file) = fs::File::open(network_graph_path) {
41-
if let Ok(graph) = NetworkGraph::read(&mut BufReader::new(file), Arc::clone(&logger)) {
42-
return graph;
43-
}
44-
}
45-
46-
NetworkGraph::new(config.network, logger)
47-
}
48-
49-
pub(crate) fn read_scorer(
50-
config: &Config, network_graph: Arc<NetworkGraph>, logger: Arc<FilesystemLogger>,
51-
) -> Scorer {
52-
let ldk_data_dir = format!("{}/ldk", config.storage_dir_path);
53-
let scorer_path = format!("{}/scorer", ldk_data_dir);
48+
/// Read previously persisted [`ChannelMonitor`]s from the store.
49+
pub(crate) fn read_channel_monitors<K: Deref, ES: Deref, SP: Deref>(
50+
kv_store: K, entropy_source: ES, signer_provider: SP,
51+
) -> std::io::Result<Vec<(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>>
52+
where
53+
K::Target: KVStore,
54+
ES::Target: EntropySource + Sized,
55+
SP::Target: SignerProvider + Sized,
56+
{
57+
let mut res = Vec::new();
5458

55-
let params = ProbabilisticScoringParameters::default();
56-
if let Ok(file) = fs::File::open(scorer_path) {
57-
let args = (params.clone(), Arc::clone(&network_graph), Arc::clone(&logger));
58-
if let Ok(scorer) = ProbabilisticScorer::read(&mut BufReader::new(file), args) {
59-
return scorer;
60-
}
61-
}
62-
ProbabilisticScorer::new(params, network_graph, logger)
63-
}
59+
for stored_key in kv_store.list(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE)? {
60+
let txid = Txid::from_hex(stored_key.split_at(64).0).map_err(|_| {
61+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid tx ID in stored key")
62+
})?;
6463

65-
pub(crate) fn read_payment_info(config: &Config) -> Vec<PaymentInfo> {
66-
let ldk_data_dir = format!("{}/ldk", config.storage_dir_path);
67-
let payment_store_path = format!("{}/{}", ldk_data_dir, PAYMENT_INFO_PERSISTENCE_NAMESPACE);
68-
let mut payments = Vec::new();
64+
let index: u16 = stored_key.split_at(65).1.parse().map_err(|_| {
65+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid tx index in stored key")
66+
})?;
6967

70-
if let Ok(res) = fs::read_dir(payment_store_path) {
71-
for entry in res {
72-
if let Ok(entry) = entry {
73-
if entry.path().is_file() {
74-
if let Ok(mut f) = fs::File::open(entry.path()) {
75-
if let Ok(payment_info) = PaymentInfo::read(&mut f) {
76-
payments.push(payment_info);
77-
}
78-
}
68+
match <(BlockHash, ChannelMonitor<<SP::Target as SignerProvider>::Signer>)>::read(
69+
&mut kv_store.read(CHANNEL_MONITOR_PERSISTENCE_NAMESPACE, &stored_key)?,
70+
(&*entropy_source, &*signer_provider),
71+
) {
72+
Ok((block_hash, channel_monitor)) => {
73+
if channel_monitor.get_funding_txo().0.txid != txid
74+
|| channel_monitor.get_funding_txo().0.index != index
75+
{
76+
return Err(std::io::Error::new(
77+
std::io::ErrorKind::InvalidData,
78+
"ChannelMonitor was stored under the wrong key",
79+
));
7980
}
81+
res.push((block_hash, channel_monitor));
82+
}
83+
Err(e) => {
84+
return Err(std::io::Error::new(
85+
std::io::ErrorKind::InvalidData,
86+
format!("Failed to deserialize ChannelMonitor: {}", e),
87+
))
8088
}
8189
}
8290
}
91+
Ok(res)
92+
}
93+
/// Read a previously persisted [`NetworkGraph`] from the store.
94+
pub(crate) fn read_network_graph<K: Deref, L: Deref>(
95+
kv_store: K, logger: L,
96+
) -> Result<NetworkGraph<L>, std::io::Error>
97+
where
98+
K::Target: KVStore,
99+
L::Target: Logger,
100+
{
101+
let mut reader =
102+
kv_store.read(NETWORK_GRAPH_PERSISTENCE_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY)?;
103+
let graph = NetworkGraph::read(&mut reader, logger).map_err(|_| {
104+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize NetworkGraph")
105+
})?;
106+
Ok(graph)
107+
}
108+
/// Read a previously persisted [`Scorer`] from the store.
109+
pub(crate) fn read_scorer<K: Deref, G: Deref<Target = NetworkGraph<L>>, L: Deref>(
110+
kv_store: K, network_graph: G, logger: L,
111+
) -> Result<ProbabilisticScorer<G, L>, std::io::Error>
112+
where
113+
K::Target: KVStore,
114+
L::Target: Logger,
115+
{
116+
let params = ProbabilisticScoringParameters::default();
117+
let mut reader = kv_store.read(SCORER_PERSISTENCE_NAMESPACE, SCORER_PERSISTENCE_KEY)?;
118+
let args = (params, network_graph, logger);
119+
let scorer = ProbabilisticScorer::read(&mut reader, args).map_err(|_| {
120+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize Scorer")
121+
})?;
122+
Ok(scorer)
123+
}
124+
/// Read previously persisted events from the store.
125+
pub(crate) fn read_event_queue<K: Deref>(kv_store: K) -> Result<EventQueue<K>, std::io::Error>
126+
where
127+
K::Target: KVStore + KVStorePersister,
128+
{
129+
let mut reader =
130+
kv_store.read(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY)?;
131+
let event_queue = EventQueue::read(&mut reader, kv_store).map_err(|_| {
132+
std::io::Error::new(std::io::ErrorKind::InvalidData, "Failed to deserialize EventQueue")
133+
})?;
134+
Ok(event_queue)
135+
}
136+
/// Read previously persisted peer info from the store.
137+
pub(crate) fn read_peer_info<K: Deref>(kv_store: K) -> Result<PeerInfoStorage<K>, std::io::Error>
138+
where
139+
K::Target: KVStore + KVStorePersister,
140+
{
141+
let mut reader = kv_store.read(PEER_INFO_PERSISTENCE_NAMESPACE, PEER_INFO_PERSISTENCE_KEY)?;
142+
let peer_info = PeerInfoStorage::read(&mut reader, kv_store).map_err(|_| {
143+
std::io::Error::new(
144+
std::io::ErrorKind::InvalidData,
145+
"Failed to deserialize PeerInfoStorage",
146+
)
147+
})?;
148+
Ok(peer_info)
149+
}
150+
/// Read previously persisted payments information from the store.
151+
pub(crate) fn read_payment_info<K: Deref>(kv_store: K) -> Result<Vec<PaymentInfo>, std::io::Error>
152+
where
153+
K::Target: KVStore + KVStorePersister,
154+
{
155+
let mut res = Vec::new();
83156

84-
payments
157+
for stored_key in kv_store.list(PAYMENT_INFO_PERSISTENCE_NAMESPACE)? {
158+
let payment_info =
159+
PaymentInfo::read(&mut kv_store.read(PAYMENT_INFO_PERSISTENCE_NAMESPACE, &stored_key)?)
160+
.map_err(|_| {
161+
std::io::Error::new(
162+
std::io::ErrorKind::InvalidData,
163+
"Failed to deserialize PaymentInfo",
164+
)
165+
})?;
166+
res.push(payment_info);
167+
}
168+
Ok(res)
85169
}

0 commit comments

Comments
 (0)