Skip to content

Commit 8640d12

Browse files
committed
Add SqliteStore implementation
We upstream our `SqliteStore` implementation that allows persistence towards an SQLite database backend.
1 parent 6016101 commit 8640d12

File tree

5 files changed

+280
-1
lines changed

5 files changed

+280
-1
lines changed

bench/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ hashbrown = ["lightning/hashbrown"]
1313

1414
[dependencies]
1515
lightning = { path = "../lightning", features = ["_test_utils", "criterion"] }
16-
lightning-persister = { path = "../lightning-persister", features = ["criterion"] }
16+
lightning-persister = { path = "../lightning-persister", features = ["criterion", "sqlite-bundled"] }
1717
lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync", features = ["criterion"] }
1818
criterion = { version = "0.4", default-features = false }
1919

bench/benches/bench.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ criterion_group!(benches,
1919
lightning::sign::benches::bench_get_secure_random_bytes,
2020
lightning::ln::channelmanager::bench::bench_sends,
2121
lightning_persister::fs_store::bench::bench_sends,
22+
lightning_persister::sqlite_store::bench::bench_sends,
2223
lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file,
2324
lightning::routing::gossip::benches::read_network_graph,
2425
lightning::routing::gossip::benches::write_network_graph);

lightning-persister/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@ edition = "2018"
1313
all-features = true
1414
rustdoc-args = ["--cfg", "docsrs"]
1515

16+
[features]
17+
sqlite = ["rusqlite"]
18+
sqlite-bundled = ["sqlite", "rusqlite/bundled"]
19+
1620
[dependencies]
1721
bitcoin = "0.29.0"
1822
lightning = { version = "0.0.117-alpha2", path = "../lightning" }
23+
rusqlite = { version = "0.28.0", optional = true, default-features = false}
1924

2025
[target.'cfg(windows)'.dependencies]
2126
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
@@ -26,3 +31,4 @@ criterion = { version = "0.4", optional = true, default-features = false }
2631
[dev-dependencies]
2732
lightning = { version = "0.0.117-alpha2", path = "../lightning", features = ["_test_utils"] }
2833
bitcoin = { version = "0.29.0", default-features = false }
34+
rusqlite = { version = "0.28.0", default-features = false, features = ["bundled"]}

lightning-persister/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212

1313
pub mod fs_store;
1414

15+
#[cfg(any(test, feature = "sqlite"))]
16+
pub mod sqlite_store;
17+
1518
mod utils;
1619

1720
#[cfg(test)]
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
//! Objects related to [`SqliteStore`] live here.
2+
use crate::utils::check_namespace_key_validity;
3+
4+
use lightning::util::persist::KVStore;
5+
use lightning::util::string::PrintableString;
6+
use lightning::io;
7+
8+
use rusqlite::{named_params, Connection};
9+
10+
use std::fs;
11+
use std::path::PathBuf;
12+
use std::sync::{Arc, Mutex};
13+
14+
/// The default database file name.
15+
pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite";
16+
17+
/// The default table in which we store all data.
18+
pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data";
19+
20+
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
21+
const SCHEMA_USER_VERSION: u16 = 2;
22+
23+
/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
24+
///
25+
/// [SQLite]: https://sqlite.org
26+
pub struct SqliteStore {
27+
connection: Arc<Mutex<Connection>>,
28+
data_dir: PathBuf,
29+
kv_table_name: String,
30+
}
31+
32+
impl SqliteStore {
33+
/// Constructs a new [`SqliteStore`].
34+
///
35+
/// If not already existing, a new SQLite database will be created in the given `data_dir` under the
36+
/// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`).
37+
///
38+
/// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`].
39+
pub fn new(data_dir: PathBuf, db_file_name: Option<String>, kv_table_name: Option<String>) -> io::Result<Self> {
40+
let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string());
41+
let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string());
42+
43+
fs::create_dir_all(data_dir.clone()).map_err(|e| {
44+
let msg = format!("Failed to create database destination directory {}: {}",
45+
data_dir.display(), e);
46+
io::Error::new(io::ErrorKind::Other, msg)
47+
})?;
48+
let mut db_file_path = data_dir.clone();
49+
db_file_path.push(db_file_name);
50+
51+
let connection = Connection::open(db_file_path.clone()).map_err(|e| {
52+
let msg = format!("Failed to open/create database file {}: {}",
53+
db_file_path.display(), e);
54+
io::Error::new(io::ErrorKind::Other, msg)
55+
})?;
56+
57+
connection.pragma(Some(rusqlite::DatabaseName::Main),
58+
"user_version", SCHEMA_USER_VERSION, |_| {
59+
Ok(())
60+
}).map_err(|e| {
61+
let msg = format!("Failed to set PRAGMA user_version: {}", e);
62+
io::Error::new(io::ErrorKind::Other, msg)
63+
})?;
64+
65+
let sql = format!(
66+
"CREATE TABLE IF NOT EXISTS {} (
67+
namespace TEXT NOT NULL,
68+
sub_namespace TEXT DEFAULT \"\" NOT NULL,
69+
key TEXT NOT NULL CHECK (key <> ''),
70+
value BLOB, PRIMARY KEY ( namespace, sub_namespace, key )
71+
);",
72+
kv_table_name
73+
);
74+
75+
connection.execute(&sql, []).map_err(|e| {
76+
let msg = format!("Failed to create table {}: {}", kv_table_name, e);
77+
io::Error::new(io::ErrorKind::Other, msg)
78+
})?;
79+
80+
let connection = Arc::new(Mutex::new(connection));
81+
Ok(Self { connection, data_dir, kv_table_name })
82+
}
83+
84+
/// Returns the data directory.
85+
pub fn get_data_dir(&self) -> PathBuf {
86+
self.data_dir.clone()
87+
}
88+
}
89+
90+
impl KVStore for SqliteStore {
91+
fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
92+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?;
93+
94+
let locked_conn = self.connection.lock().unwrap();
95+
let sql =
96+
format!("SELECT value FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace AND key=:key;",
97+
self.kv_table_name);
98+
99+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
100+
let msg = format!("Failed to prepare statement: {}", e);
101+
std::io::Error::new(std::io::ErrorKind::Other, msg)
102+
})?;
103+
104+
let res = stmt
105+
.query_row(
106+
named_params! {
107+
":namespace": namespace,
108+
":sub_namespace": sub_namespace,
109+
":key": key,
110+
},
111+
|row| row.get(0),
112+
)
113+
.map_err(|e| match e {
114+
rusqlite::Error::QueryReturnedNoRows => {
115+
let msg =
116+
format!("Failed to read as key could not be found: {}/{}/{}",
117+
PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key));
118+
std::io::Error::new(std::io::ErrorKind::NotFound, msg)
119+
}
120+
e => {
121+
let msg = format!("Failed to read from key {}/{}/{}: {}",
122+
PrintableString(namespace), PrintableString(sub_namespace),
123+
PrintableString(key), e);
124+
std::io::Error::new(std::io::ErrorKind::Other, msg)
125+
}
126+
})?;
127+
Ok(res)
128+
}
129+
130+
fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
131+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?;
132+
133+
let locked_conn = self.connection.lock().unwrap();
134+
135+
let sql = format!(
136+
"INSERT OR REPLACE INTO {} (namespace, sub_namespace, key, value) VALUES (:namespace, :sub_namespace, :key, :value);",
137+
self.kv_table_name
138+
);
139+
140+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
141+
let msg = format!("Failed to prepare statement: {}", e);
142+
std::io::Error::new(std::io::ErrorKind::Other, msg)
143+
})?;
144+
145+
stmt.execute(
146+
named_params! {
147+
":namespace": namespace,
148+
":sub_namespace": sub_namespace,
149+
":key": key,
150+
":value": buf,
151+
},
152+
)
153+
.map(|_| ())
154+
.map_err(|e| {
155+
let msg = format!("Failed to write to key {}/{}/{}: {}",
156+
PrintableString(namespace), PrintableString(sub_namespace),
157+
PrintableString(key), e);
158+
std::io::Error::new(std::io::ErrorKind::Other, msg)
159+
})
160+
}
161+
162+
fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, _lazy: bool) -> std::io::Result<()> {
163+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?;
164+
165+
let locked_conn = self.connection.lock().unwrap();
166+
167+
let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace AND key=:key;", self.kv_table_name);
168+
169+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
170+
let msg = format!("Failed to prepare statement: {}", e);
171+
std::io::Error::new(std::io::ErrorKind::Other, msg)
172+
})?;
173+
174+
stmt.execute(
175+
named_params! {
176+
":namespace": namespace,
177+
":sub_namespace": sub_namespace,
178+
":key": key,
179+
},
180+
)
181+
.map_err(|e| {
182+
let msg = format!("Failed to delete key {}/{}/{}: {}",
183+
PrintableString(namespace), PrintableString(sub_namespace),
184+
PrintableString(key), e);
185+
std::io::Error::new(std::io::ErrorKind::Other, msg)
186+
})?;
187+
Ok(())
188+
}
189+
190+
fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<Vec<String>> {
191+
check_namespace_key_validity(namespace, sub_namespace, None, "list")?;
192+
193+
let locked_conn = self.connection.lock().unwrap();
194+
195+
let sql = format!("SELECT key FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace", self.kv_table_name);
196+
let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| {
197+
let msg = format!("Failed to prepare statement: {}", e);
198+
std::io::Error::new(std::io::ErrorKind::Other, msg)
199+
})?;
200+
201+
let mut keys = Vec::new();
202+
203+
let rows_iter = stmt
204+
.query_map(
205+
named_params! {
206+
":namespace": namespace,
207+
":sub_namespace": sub_namespace,
208+
}, |row| row.get(0))
209+
.map_err(|e| {
210+
let msg = format!("Failed to retrieve queried rows: {}", e);
211+
std::io::Error::new(std::io::ErrorKind::Other, msg)
212+
})?;
213+
214+
for k in rows_iter {
215+
keys.push(k.map_err(|e| {
216+
let msg = format!("Failed to retrieve queried rows: {}", e);
217+
std::io::Error::new(std::io::ErrorKind::Other, msg)
218+
})?);
219+
}
220+
221+
Ok(keys)
222+
}
223+
}
224+
225+
#[cfg(test)]
226+
mod tests {
227+
use super::*;
228+
use crate::test_utils::{do_read_write_remove_list_persist,do_test_store};
229+
230+
impl Drop for SqliteStore {
231+
fn drop(&mut self) {
232+
match fs::remove_dir_all(&self.data_dir) {
233+
Err(e) => println!("Failed to remove test store directory: {}", e),
234+
_ => {}
235+
}
236+
}
237+
}
238+
239+
#[test]
240+
fn read_write_remove_list_persist() {
241+
let mut temp_path = std::env::temp_dir();
242+
temp_path.push("read_write_remove_list_persist");
243+
let store = SqliteStore::new(temp_path, Some("test_db".to_string()), Some("test_table".to_string())).unwrap();
244+
do_read_write_remove_list_persist(&store);
245+
}
246+
247+
#[test]
248+
fn test_sqlite_store() {
249+
let mut temp_path = std::env::temp_dir();
250+
temp_path.push("test_sqlite_store");
251+
let store_0 = SqliteStore::new(temp_path.clone(), Some("test_db_0".to_string()), Some("test_table".to_string())).unwrap();
252+
let store_1 = SqliteStore::new(temp_path, Some("test_db_1".to_string()), Some("test_table".to_string())).unwrap();
253+
do_test_store(&store_0, &store_1)
254+
}
255+
}
256+
257+
#[cfg(ldk_bench)]
258+
/// Benches
259+
pub mod bench {
260+
use criterion::Criterion;
261+
262+
/// Bench!
263+
pub fn bench_sends(bench: &mut Criterion) {
264+
let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap();
265+
let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap();
266+
lightning::ln::channelmanager::bench::bench_two_sends(
267+
bench, "bench_sqlite_persisted_sends", store_a, store_b);
268+
}
269+
}

0 commit comments

Comments
 (0)