From 8640d122101bb9385904ceaf1ed1426a7bed3cf1 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 2 Aug 2023 16:15:49 +0200 Subject: [PATCH 1/5] Add `SqliteStore` implementation We upstream our `SqliteStore` implementation that allows persistence towards an SQLite database backend. --- bench/Cargo.toml | 2 +- bench/benches/bench.rs | 1 + lightning-persister/Cargo.toml | 6 + lightning-persister/src/lib.rs | 3 + lightning-persister/src/sqlite_store/mod.rs | 269 ++++++++++++++++++++ 5 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 lightning-persister/src/sqlite_store/mod.rs diff --git a/bench/Cargo.toml b/bench/Cargo.toml index e582d29da81..f9208e1cc86 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -13,7 +13,7 @@ hashbrown = ["lightning/hashbrown"] [dependencies] lightning = { path = "../lightning", features = ["_test_utils", "criterion"] } -lightning-persister = { path = "../lightning-persister", features = ["criterion"] } +lightning-persister = { path = "../lightning-persister", features = ["criterion", "sqlite-bundled"] } lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync", features = ["criterion"] } criterion = { version = "0.4", default-features = false } diff --git a/bench/benches/bench.rs b/bench/benches/bench.rs index eaa3fcec50c..72098f6a066 100644 --- a/bench/benches/bench.rs +++ b/bench/benches/bench.rs @@ -19,6 +19,7 @@ criterion_group!(benches, lightning::sign::benches::bench_get_secure_random_bytes, lightning::ln::channelmanager::bench::bench_sends, lightning_persister::fs_store::bench::bench_sends, + lightning_persister::sqlite_store::bench::bench_sends, lightning_rapid_gossip_sync::bench::bench_reading_full_graph_from_file, lightning::routing::gossip::benches::read_network_graph, lightning::routing::gossip::benches::write_network_graph); diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index 43d97ebbe12..1d0a88e592b 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -13,9 +13,14 @@ edition = "2018" all-features = true rustdoc-args = ["--cfg", "docsrs"] +[features] +sqlite = ["rusqlite"] +sqlite-bundled = ["sqlite", "rusqlite/bundled"] + [dependencies] bitcoin = "0.29.0" lightning = { version = "0.0.117-alpha2", path = "../lightning" } +rusqlite = { version = "0.28.0", optional = true, default-features = false} [target.'cfg(windows)'.dependencies] windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] } @@ -26,3 +31,4 @@ criterion = { version = "0.4", optional = true, default-features = false } [dev-dependencies] lightning = { version = "0.0.117-alpha2", path = "../lightning", features = ["_test_utils"] } bitcoin = { version = "0.29.0", default-features = false } +rusqlite = { version = "0.28.0", default-features = false, features = ["bundled"]} diff --git a/lightning-persister/src/lib.rs b/lightning-persister/src/lib.rs index ae258e137d7..d7aa1dad4eb 100644 --- a/lightning-persister/src/lib.rs +++ b/lightning-persister/src/lib.rs @@ -12,6 +12,9 @@ pub mod fs_store; +#[cfg(any(test, feature = "sqlite"))] +pub mod sqlite_store; + mod utils; #[cfg(test)] diff --git a/lightning-persister/src/sqlite_store/mod.rs b/lightning-persister/src/sqlite_store/mod.rs new file mode 100644 index 00000000000..2c283f54206 --- /dev/null +++ b/lightning-persister/src/sqlite_store/mod.rs @@ -0,0 +1,269 @@ +//! Objects related to [`SqliteStore`] live here. +use crate::utils::check_namespace_key_validity; + +use lightning::util::persist::KVStore; +use lightning::util::string::PrintableString; +use lightning::io; + +use rusqlite::{named_params, Connection}; + +use std::fs; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +/// The default database file name. +pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite"; + +/// The default table in which we store all data. +pub const DEFAULT_KV_TABLE_NAME: &str = "ldk_data"; + +// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration. +const SCHEMA_USER_VERSION: u16 = 2; + +/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database. +/// +/// [SQLite]: https://sqlite.org +pub struct SqliteStore { + connection: Arc>, + data_dir: PathBuf, + kv_table_name: String, +} + +impl SqliteStore { + /// Constructs a new [`SqliteStore`]. + /// + /// If not already existing, a new SQLite database will be created in the given `data_dir` under the + /// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`). + /// + /// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`]. + pub fn new(data_dir: PathBuf, db_file_name: Option, kv_table_name: Option) -> io::Result { + let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string()); + let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string()); + + fs::create_dir_all(data_dir.clone()).map_err(|e| { + let msg = format!("Failed to create database destination directory {}: {}", + data_dir.display(), e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + let mut db_file_path = data_dir.clone(); + db_file_path.push(db_file_name); + + let connection = Connection::open(db_file_path.clone()).map_err(|e| { + let msg = format!("Failed to open/create database file {}: {}", + db_file_path.display(), e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + connection.pragma(Some(rusqlite::DatabaseName::Main), + "user_version", SCHEMA_USER_VERSION, |_| { + Ok(()) + }).map_err(|e| { + let msg = format!("Failed to set PRAGMA user_version: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + namespace TEXT NOT NULL, + sub_namespace TEXT DEFAULT \"\" NOT NULL, + key TEXT NOT NULL CHECK (key <> ''), + value BLOB, PRIMARY KEY ( namespace, sub_namespace, key ) + );", + kv_table_name + ); + + connection.execute(&sql, []).map_err(|e| { + let msg = format!("Failed to create table {}: {}", kv_table_name, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + let connection = Arc::new(Mutex::new(connection)); + Ok(Self { connection, data_dir, kv_table_name }) + } + + /// Returns the data directory. + pub fn get_data_dir(&self) -> PathBuf { + self.data_dir.clone() + } +} + +impl KVStore for SqliteStore { + fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result> { + check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?; + + let locked_conn = self.connection.lock().unwrap(); + let sql = + format!("SELECT value FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace AND key=:key;", + self.kv_table_name); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + let res = stmt + .query_row( + named_params! { + ":namespace": namespace, + ":sub_namespace": sub_namespace, + ":key": key, + }, + |row| row.get(0), + ) + .map_err(|e| match e { + rusqlite::Error::QueryReturnedNoRows => { + let msg = + format!("Failed to read as key could not be found: {}/{}/{}", + PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + std::io::Error::new(std::io::ErrorKind::NotFound, msg) + } + e => { + let msg = format!("Failed to read from key {}/{}/{}: {}", + PrintableString(namespace), PrintableString(sub_namespace), + PrintableString(key), e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + } + })?; + Ok(res) + } + + fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { + check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!( + "INSERT OR REPLACE INTO {} (namespace, sub_namespace, key, value) VALUES (:namespace, :sub_namespace, :key, :value);", + self.kv_table_name + ); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + stmt.execute( + named_params! { + ":namespace": namespace, + ":sub_namespace": sub_namespace, + ":key": key, + ":value": buf, + }, + ) + .map(|_| ()) + .map_err(|e| { + let msg = format!("Failed to write to key {}/{}/{}: {}", + PrintableString(namespace), PrintableString(sub_namespace), + PrintableString(key), e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + }) + } + + fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, _lazy: bool) -> std::io::Result<()> { + check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace AND key=:key;", self.kv_table_name); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + stmt.execute( + named_params! { + ":namespace": namespace, + ":sub_namespace": sub_namespace, + ":key": key, + }, + ) + .map_err(|e| { + let msg = format!("Failed to delete key {}/{}/{}: {}", + PrintableString(namespace), PrintableString(sub_namespace), + PrintableString(key), e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + Ok(()) + } + + fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result> { + check_namespace_key_validity(namespace, sub_namespace, None, "list")?; + + let locked_conn = self.connection.lock().unwrap(); + + let sql = format!("SELECT key FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace", self.kv_table_name); + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + let mut keys = Vec::new(); + + let rows_iter = stmt + .query_map( + named_params! { + ":namespace": namespace, + ":sub_namespace": sub_namespace, + }, |row| row.get(0)) + .map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; + + for k in rows_iter { + keys.push(k.map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?); + } + + Ok(keys) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{do_read_write_remove_list_persist,do_test_store}; + + impl Drop for SqliteStore { + fn drop(&mut self) { + match fs::remove_dir_all(&self.data_dir) { + Err(e) => println!("Failed to remove test store directory: {}", e), + _ => {} + } + } + } + + #[test] + fn read_write_remove_list_persist() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("read_write_remove_list_persist"); + let store = SqliteStore::new(temp_path, Some("test_db".to_string()), Some("test_table".to_string())).unwrap(); + do_read_write_remove_list_persist(&store); + } + + #[test] + fn test_sqlite_store() { + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_sqlite_store"); + let store_0 = SqliteStore::new(temp_path.clone(), Some("test_db_0".to_string()), Some("test_table".to_string())).unwrap(); + let store_1 = SqliteStore::new(temp_path, Some("test_db_1".to_string()), Some("test_table".to_string())).unwrap(); + do_test_store(&store_0, &store_1) + } +} + +#[cfg(ldk_bench)] +/// Benches +pub mod bench { + use criterion::Criterion; + + /// Bench! + pub fn bench_sends(bench: &mut Criterion) { + let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap(); + let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap(); + lightning::ln::channelmanager::bench::bench_two_sends( + bench, "bench_sqlite_persisted_sends", store_a, store_b); + } +} From 56d7566efabe88dac5c02014f44759d1ffe19dc3 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 7 Sep 2023 11:46:15 +0200 Subject: [PATCH 2/5] f Add schema migrations and test them --- .../src/sqlite_store/migrations.rs | 118 ++++++++++++++++++ lightning-persister/src/sqlite_store/mod.rs | 17 ++- 2 files changed, 134 insertions(+), 1 deletion(-) create mode 100644 lightning-persister/src/sqlite_store/migrations.rs diff --git a/lightning-persister/src/sqlite_store/migrations.rs b/lightning-persister/src/sqlite_store/migrations.rs new file mode 100644 index 00000000000..849fd30c145 --- /dev/null +++ b/lightning-persister/src/sqlite_store/migrations.rs @@ -0,0 +1,118 @@ +use rusqlite::Connection; + +use lightning::io; + +pub(super) fn migrate_schema(connection: &Connection, kv_table_name: &str, from_version: u16, to_version: u16) -> io::Result<()> { + assert!(from_version < to_version); + if from_version == 1 && to_version == 2 { + let sql = format!( + "ALTER TABLE {} + ADD sub_namespace TEXT DEFAULT \"\" NOT NULL;", + kv_table_name); + connection .execute(&sql, []).map_err(|e| { + let msg = format!("Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + connection.pragma(Some(rusqlite::DatabaseName::Main), + "user_version", to_version, |_| { + Ok(()) + }).map_err(|e| { + let msg = format!("Failed to upgrade user_version from {} to {}: {}", + from_version, to_version, e); + io::Error::new(io::ErrorKind::Other, msg) + })?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use crate::sqlite_store::SqliteStore; + use crate::test_utils::do_read_write_remove_list_persist; + + use lightning::util::persist::KVStore; + + use rusqlite::{named_params, Connection}; + + use std::fs; + + #[test] + fn rwrl_post_schema_1_migration() { + let old_schema_version = 1; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("rwrl_post_schema_1_migration"); + + let db_file_name = "test_db".to_string(); + let kv_table_name = "test_table".to_string(); + + let test_namespace = "testspace".to_string(); + let test_key = "testkey".to_string(); + let test_data = [42u8; 32]; + + { + // We create a database with a SCHEMA_VERSION 1 table + fs::create_dir_all(temp_path.clone()).unwrap(); + let mut db_file_path = temp_path.clone(); + db_file_path.push(db_file_name.clone()); + + let connection = Connection::open(db_file_path.clone()).unwrap(); + + connection + .pragma(Some(rusqlite::DatabaseName::Main), "user_version", old_schema_version, |_| { + Ok(()) + }).unwrap(); + + let sql = format!( + "CREATE TABLE IF NOT EXISTS {} ( + namespace TEXT NOT NULL, + key TEXT NOT NULL CHECK (key <> ''), + value BLOB, PRIMARY KEY ( namespace, key ) + );", + kv_table_name + ); + + connection.execute(&sql, []).unwrap(); + + // We write some data to to the table + let sql = format!( + "INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);", + kv_table_name + ); + let mut stmt = connection.prepare_cached(&sql).unwrap(); + + stmt.execute( + named_params! { + ":namespace": test_namespace, + ":key": test_key, + ":value": test_data, + }).unwrap(); + + // We read the just written data back to assert it happened. + let sql = format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", + kv_table_name); + let mut stmt = connection.prepare_cached(&sql).unwrap(); + + let res: Vec = stmt + .query_row( + named_params! { + ":namespace": test_namespace, + ":key": test_key, + }, + |row| row.get(0), + ).unwrap(); + + assert_eq!(res, test_data); + } + + // Check we migrate the db just fine without losing our written data. + let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap(); + let res = store.read(&test_namespace, "", &test_key).unwrap(); + assert_eq!(res, test_data); + + // Check we can continue to use the store just fine. + do_read_write_remove_list_persist(&store); + } +} diff --git a/lightning-persister/src/sqlite_store/mod.rs b/lightning-persister/src/sqlite_store/mod.rs index 2c283f54206..30d277c513e 100644 --- a/lightning-persister/src/sqlite_store/mod.rs +++ b/lightning-persister/src/sqlite_store/mod.rs @@ -11,6 +11,8 @@ use std::fs; use std::path::PathBuf; use std::sync::{Arc, Mutex}; +mod migrations; + /// The default database file name. pub const DEFAULT_SQLITE_DB_FILE_NAME: &str = "ldk_data.sqlite"; @@ -54,13 +56,26 @@ impl SqliteStore { io::Error::new(io::ErrorKind::Other, msg) })?; - connection.pragma(Some(rusqlite::DatabaseName::Main), + let sql = format!("SELECT user_version FROM pragma_user_version"); + let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap(); + + if version_res == 0 { + // New database, set our SCHEMA_USER_VERSION and continue + connection.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| { Ok(()) }).map_err(|e| { let msg = format!("Failed to set PRAGMA user_version: {}", e); io::Error::new(io::ErrorKind::Other, msg) })?; + } else if version_res < SCHEMA_USER_VERSION { + migrations::migrate_schema(&connection, &kv_table_name, version_res, + SCHEMA_USER_VERSION)?; + } else if version_res > SCHEMA_USER_VERSION { + let msg = format!("Failed to open database: incompatible schema version {}. Expected: {}", + version_res, SCHEMA_USER_VERSION); + return Err(io::Error::new(io::ErrorKind::Other, msg)); + } let sql = format!( "CREATE TABLE IF NOT EXISTS {} ( From 260e8ee72cb7b4ec9fdfd4396973cfa1d54e3eb7 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 29 Sep 2023 12:42:55 +0200 Subject: [PATCH 3/5] f Account for renamed namespaces --- lightning-persister/Cargo.toml | 1 + .../src/sqlite_store/migrations.rs | 102 ++++++--- lightning-persister/src/sqlite_store/mod.rs | 216 +++++++++++------- lightning-persister/src/test_utils.rs | 11 + 4 files changed, 223 insertions(+), 107 deletions(-) diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index 1d0a88e592b..fcb60d61456 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -32,3 +32,4 @@ criterion = { version = "0.4", optional = true, default-features = false } lightning = { version = "0.0.117-alpha2", path = "../lightning", features = ["_test_utils"] } bitcoin = { version = "0.29.0", default-features = false } rusqlite = { version = "0.28.0", default-features = false, features = ["bundled"]} +rand = "0.8.5" diff --git a/lightning-persister/src/sqlite_store/migrations.rs b/lightning-persister/src/sqlite_store/migrations.rs index 849fd30c145..56fe7420740 100644 --- a/lightning-persister/src/sqlite_store/migrations.rs +++ b/lightning-persister/src/sqlite_store/migrations.rs @@ -2,25 +2,64 @@ use rusqlite::Connection; use lightning::io; -pub(super) fn migrate_schema(connection: &Connection, kv_table_name: &str, from_version: u16, to_version: u16) -> io::Result<()> { +pub(super) fn migrate_schema( + connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16, +) -> io::Result<()> { assert!(from_version < to_version); if from_version == 1 && to_version == 2 { + let tx = connection.transaction().map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Rename 'namespace' column to 'primary_namespace' + let sql = format!( + "ALTER TABLE {} + RENAME COLUMN namespace TO primary_namespace;", + kv_table_name + ); + + tx.execute(&sql, []).map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Add new 'secondary_namespace' column let sql = format!( "ALTER TABLE {} - ADD sub_namespace TEXT DEFAULT \"\" NOT NULL;", - kv_table_name); - connection .execute(&sql, []).map_err(|e| { - let msg = format!("Failed to migrate table {} from user_version {} to {}: {}", - kv_table_name, from_version, to_version, e); + ADD secondary_namespace TEXT DEFAULT \"\" NOT NULL;", + kv_table_name + ); + + tx.execute(&sql, []).map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); + io::Error::new(io::ErrorKind::Other, msg) + })?; + + // Update user_version + tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", to_version, |_| Ok(())) + .map_err(|e| { + let msg = format!( + "Failed to upgrade user_version from {} to {}: {}", + from_version, to_version, e + ); io::Error::new(io::ErrorKind::Other, msg) })?; - connection.pragma(Some(rusqlite::DatabaseName::Main), - "user_version", to_version, |_| { - Ok(()) - }).map_err(|e| { - let msg = format!("Failed to upgrade user_version from {} to {}: {}", - from_version, to_version, e); + tx.commit().map_err(|e| { + let msg = format!( + "Failed to migrate table {} from user_version {} to {}: {}", + kv_table_name, from_version, to_version, e + ); io::Error::new(io::ErrorKind::Other, msg) })?; } @@ -30,7 +69,7 @@ pub(super) fn migrate_schema(connection: &Connection, kv_table_name: &str, from_ #[cfg(test)] mod tests { use crate::sqlite_store::SqliteStore; - use crate::test_utils::do_read_write_remove_list_persist; + use crate::test_utils::{do_read_write_remove_list_persist, random_storage_path}; use lightning::util::persist::KVStore; @@ -42,7 +81,7 @@ mod tests { fn rwrl_post_schema_1_migration() { let old_schema_version = 1; - let mut temp_path = std::env::temp_dir(); + let mut temp_path = random_storage_path(); temp_path.push("rwrl_post_schema_1_migration"); let db_file_name = "test_db".to_string(); @@ -61,9 +100,13 @@ mod tests { let connection = Connection::open(db_file_path.clone()).unwrap(); connection - .pragma(Some(rusqlite::DatabaseName::Main), "user_version", old_schema_version, |_| { - Ok(()) - }).unwrap(); + .pragma( + Some(rusqlite::DatabaseName::Main), + "user_version", + old_schema_version, + |_| Ok(()), + ) + .unwrap(); let sql = format!( "CREATE TABLE IF NOT EXISTS {} ( @@ -71,8 +114,8 @@ mod tests { key TEXT NOT NULL CHECK (key <> ''), value BLOB, PRIMARY KEY ( namespace, key ) );", - kv_table_name - ); + kv_table_name + ); connection.execute(&sql, []).unwrap(); @@ -83,16 +126,18 @@ mod tests { ); let mut stmt = connection.prepare_cached(&sql).unwrap(); - stmt.execute( - named_params! { - ":namespace": test_namespace, - ":key": test_key, - ":value": test_data, - }).unwrap(); + stmt.execute(named_params! { + ":namespace": test_namespace, + ":key": test_key, + ":value": test_data, + }) + .unwrap(); // We read the just written data back to assert it happened. - let sql = format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", - kv_table_name); + let sql = format!( + "SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", + kv_table_name + ); let mut stmt = connection.prepare_cached(&sql).unwrap(); let res: Vec = stmt @@ -102,7 +147,8 @@ mod tests { ":key": test_key, }, |row| row.get(0), - ).unwrap(); + ) + .unwrap(); assert_eq!(res, test_data); } diff --git a/lightning-persister/src/sqlite_store/mod.rs b/lightning-persister/src/sqlite_store/mod.rs index 30d277c513e..3b353a9dd05 100644 --- a/lightning-persister/src/sqlite_store/mod.rs +++ b/lightning-persister/src/sqlite_store/mod.rs @@ -1,9 +1,9 @@ //! Objects related to [`SqliteStore`] live here. use crate::utils::check_namespace_key_validity; +use lightning::io; use lightning::util::persist::KVStore; use lightning::util::string::PrintableString; -use lightning::io; use rusqlite::{named_params, Connection}; @@ -38,21 +38,26 @@ impl SqliteStore { /// given `db_file_name` (or the default to [`DEFAULT_SQLITE_DB_FILE_NAME`] if set to `None`). /// /// Similarly, the given `kv_table_name` will be used or default to [`DEFAULT_KV_TABLE_NAME`]. - pub fn new(data_dir: PathBuf, db_file_name: Option, kv_table_name: Option) -> io::Result { + pub fn new( + data_dir: PathBuf, db_file_name: Option, kv_table_name: Option, + ) -> io::Result { let db_file_name = db_file_name.unwrap_or(DEFAULT_SQLITE_DB_FILE_NAME.to_string()); let kv_table_name = kv_table_name.unwrap_or(DEFAULT_KV_TABLE_NAME.to_string()); fs::create_dir_all(data_dir.clone()).map_err(|e| { - let msg = format!("Failed to create database destination directory {}: {}", - data_dir.display(), e); + let msg = format!( + "Failed to create database destination directory {}: {}", + data_dir.display(), + e + ); io::Error::new(io::ErrorKind::Other, msg) })?; let mut db_file_path = data_dir.clone(); db_file_path.push(db_file_name); - let connection = Connection::open(db_file_path.clone()).map_err(|e| { - let msg = format!("Failed to open/create database file {}: {}", - db_file_path.display(), e); + let mut connection = Connection::open(db_file_path.clone()).map_err(|e| { + let msg = + format!("Failed to open/create database file {}: {}", db_file_path.display(), e); io::Error::new(io::ErrorKind::Other, msg) })?; @@ -61,28 +66,38 @@ impl SqliteStore { if version_res == 0 { // New database, set our SCHEMA_USER_VERSION and continue - connection.pragma(Some(rusqlite::DatabaseName::Main), - "user_version", SCHEMA_USER_VERSION, |_| { - Ok(()) - }).map_err(|e| { - let msg = format!("Failed to set PRAGMA user_version: {}", e); - io::Error::new(io::ErrorKind::Other, msg) - })?; + connection + .pragma( + Some(rusqlite::DatabaseName::Main), + "user_version", + SCHEMA_USER_VERSION, + |_| Ok(()), + ) + .map_err(|e| { + let msg = format!("Failed to set PRAGMA user_version: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; } else if version_res < SCHEMA_USER_VERSION { - migrations::migrate_schema(&connection, &kv_table_name, version_res, - SCHEMA_USER_VERSION)?; + migrations::migrate_schema( + &mut connection, + &kv_table_name, + version_res, + SCHEMA_USER_VERSION, + )?; } else if version_res > SCHEMA_USER_VERSION { - let msg = format!("Failed to open database: incompatible schema version {}. Expected: {}", - version_res, SCHEMA_USER_VERSION); + let msg = format!( + "Failed to open database: incompatible schema version {}. Expected: {}", + version_res, SCHEMA_USER_VERSION + ); return Err(io::Error::new(io::ErrorKind::Other, msg)); } let sql = format!( "CREATE TABLE IF NOT EXISTS {} ( - namespace TEXT NOT NULL, - sub_namespace TEXT DEFAULT \"\" NOT NULL, + primary_namespace TEXT NOT NULL, + secondary_namespace TEXT DEFAULT \"\" NOT NULL, key TEXT NOT NULL CHECK (key <> ''), - value BLOB, PRIMARY KEY ( namespace, sub_namespace, key ) + value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key ) );", kv_table_name ); @@ -103,12 +118,14 @@ impl SqliteStore { } impl KVStore for SqliteStore { - fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result> { - check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?; + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> std::io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; let locked_conn = self.connection.lock().unwrap(); let sql = - format!("SELECT value FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace AND key=:key;", + format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name); let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { @@ -119,36 +136,45 @@ impl KVStore for SqliteStore { let res = stmt .query_row( named_params! { - ":namespace": namespace, - ":sub_namespace": sub_namespace, + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, ":key": key, }, |row| row.get(0), ) .map_err(|e| match e { rusqlite::Error::QueryReturnedNoRows => { - let msg = - format!("Failed to read as key could not be found: {}/{}/{}", - PrintableString(namespace), PrintableString(sub_namespace), PrintableString(key)); + let msg = format!( + "Failed to read as key could not be found: {}/{}/{}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key) + ); std::io::Error::new(std::io::ErrorKind::NotFound, msg) } e => { - let msg = format!("Failed to read from key {}/{}/{}: {}", - PrintableString(namespace), PrintableString(sub_namespace), - PrintableString(key), e); + let msg = format!( + "Failed to read from key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); std::io::Error::new(std::io::ErrorKind::Other, msg) } })?; Ok(res) } - fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { - check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?; + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> std::io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; let locked_conn = self.connection.lock().unwrap(); let sql = format!( - "INSERT OR REPLACE INTO {} (namespace, sub_namespace, key, value) VALUES (:namespace, :sub_namespace, :key, :value);", + "INSERT OR REPLACE INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:primary_namespace, :secondary_namespace, :key, :value);", self.kv_table_name ); @@ -157,57 +183,68 @@ impl KVStore for SqliteStore { std::io::Error::new(std::io::ErrorKind::Other, msg) })?; - stmt.execute( - named_params! { - ":namespace": namespace, - ":sub_namespace": sub_namespace, - ":key": key, - ":value": buf, - }, - ) - .map(|_| ()) - .map_err(|e| { - let msg = format!("Failed to write to key {}/{}/{}: {}", - PrintableString(namespace), PrintableString(sub_namespace), - PrintableString(key), e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - }) + stmt.execute(named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + ":value": buf, + }) + .map(|_| ()) + .map_err(|e| { + let msg = format!( + "Failed to write to key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + std::io::Error::new(std::io::ErrorKind::Other, msg) + }) } - fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, _lazy: bool) -> std::io::Result<()> { - check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?; + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> std::io::Result<()> { + check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; let locked_conn = self.connection.lock().unwrap(); - let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace AND key=:key;", self.kv_table_name); + let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name); let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { let msg = format!("Failed to prepare statement: {}", e); std::io::Error::new(std::io::ErrorKind::Other, msg) })?; - stmt.execute( - named_params! { - ":namespace": namespace, - ":sub_namespace": sub_namespace, - ":key": key, - }, - ) - .map_err(|e| { - let msg = format!("Failed to delete key {}/{}/{}: {}", - PrintableString(namespace), PrintableString(sub_namespace), - PrintableString(key), e); - std::io::Error::new(std::io::ErrorKind::Other, msg) - })?; + stmt.execute(named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key": key, + }) + .map_err(|e| { + let msg = format!( + "Failed to delete key {}/{}/{}: {}", + PrintableString(primary_namespace), + PrintableString(secondary_namespace), + PrintableString(key), + e + ); + std::io::Error::new(std::io::ErrorKind::Other, msg) + })?; Ok(()) } - fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result> { - check_namespace_key_validity(namespace, sub_namespace, None, "list")?; + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> std::io::Result> { + check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; let locked_conn = self.connection.lock().unwrap(); - let sql = format!("SELECT key FROM {} WHERE namespace=:namespace AND sub_namespace=:sub_namespace", self.kv_table_name); + let sql = format!( + "SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace", + self.kv_table_name + ); let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { let msg = format!("Failed to prepare statement: {}", e); std::io::Error::new(std::io::ErrorKind::Other, msg) @@ -218,9 +255,11 @@ impl KVStore for SqliteStore { let rows_iter = stmt .query_map( named_params! { - ":namespace": namespace, - ":sub_namespace": sub_namespace, - }, |row| row.get(0)) + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + }, + |row| row.get(0), + ) .map_err(|e| { let msg = format!("Failed to retrieve queried rows: {}", e); std::io::Error::new(std::io::ErrorKind::Other, msg) @@ -240,7 +279,7 @@ impl KVStore for SqliteStore { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{do_read_write_remove_list_persist,do_test_store}; + use crate::test_utils::{do_read_write_remove_list_persist, do_test_store, random_storage_path}; impl Drop for SqliteStore { fn drop(&mut self) { @@ -253,18 +292,33 @@ mod tests { #[test] fn read_write_remove_list_persist() { - let mut temp_path = std::env::temp_dir(); + let mut temp_path = random_storage_path(); temp_path.push("read_write_remove_list_persist"); - let store = SqliteStore::new(temp_path, Some("test_db".to_string()), Some("test_table".to_string())).unwrap(); + let store = SqliteStore::new( + temp_path, + Some("test_db".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); do_read_write_remove_list_persist(&store); } #[test] fn test_sqlite_store() { - let mut temp_path = std::env::temp_dir(); + let mut temp_path = random_storage_path(); temp_path.push("test_sqlite_store"); - let store_0 = SqliteStore::new(temp_path.clone(), Some("test_db_0".to_string()), Some("test_table".to_string())).unwrap(); - let store_1 = SqliteStore::new(temp_path, Some("test_db_1".to_string()), Some("test_table".to_string())).unwrap(); + let store_0 = SqliteStore::new( + temp_path.clone(), + Some("test_db_0".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); + let store_1 = SqliteStore::new( + temp_path, + Some("test_db_1".to_string()), + Some("test_table".to_string()), + ) + .unwrap(); do_test_store(&store_0, &store_1) } } @@ -279,6 +333,10 @@ pub mod bench { let store_a = super::SqliteStore::new("bench_sqlite_store_a".into(), None, None).unwrap(); let store_b = super::SqliteStore::new("bench_sqlite_store_b".into(), None, None).unwrap(); lightning::ln::channelmanager::bench::bench_two_sends( - bench, "bench_sqlite_persisted_sends", store_a, store_b); + bench, + "bench_sqlite_persisted_sends", + store_a, + store_b, + ); } } diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs index 360fa3492bf..bb874194837 100644 --- a/lightning-persister/src/test_utils.rs +++ b/lightning-persister/src/test_utils.rs @@ -7,8 +7,19 @@ use lightning::util::test_utils; use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors}; use lightning::events::ClosureReason; +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; + use std::panic::RefUnwindSafe; +pub fn random_storage_path() -> PathBuf { + let mut temp_path = std::env::temp_dir(); + let mut rng = thread_rng(); + let rand_dir: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); + temp_path.push(rand_dir); + temp_path +} + pub(crate) fn do_read_write_remove_list_persist(kv_store: &K) { let data = [42u8; 32]; From 617d00218cd7b2ff47788e82a41a52f3c5b58400 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 25 Aug 2023 15:19:22 +0200 Subject: [PATCH 4/5] Add `TestSyncStore` We add a `KVStore` implementation that wraps all three of our `KVStore` implementation and uses them in lock step while asserting they behave identical. This will be useful for (upcoming) integration tests. --- lightning-persister/src/test_utils.rs | 116 +++++++++++++++++++++++++- 1 file changed, 115 insertions(+), 1 deletion(-) diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs index bb874194837..6637921948a 100644 --- a/lightning-persister/src/test_utils.rs +++ b/lightning-persister/src/test_utils.rs @@ -1,9 +1,13 @@ +use crate::fs_store::FilesystemStore; +use crate::sqlite_store::SqliteStore; + use lightning::util::persist::{KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN, read_channel_monitors}; use lightning::ln::functional_test_utils::{connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block, create_network, create_node_cfgs, create_node_chanmgrs, send_payment}; + use lightning::chain::channelmonitor::CLOSED_CHANNEL_UPDATE_ID; -use lightning::util::test_utils; +use lightning::util::test_utils::{self, TestStore}; use lightning::{check_closed_broadcast, check_closed_event, check_added_monitors}; use lightning::events::ClosureReason; @@ -11,6 +15,7 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use std::panic::RefUnwindSafe; +use std::path::PathBuf; pub fn random_storage_path() -> PathBuf { let mut temp_path = std::env::temp_dir(); @@ -131,3 +136,112 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // Make sure everything is persisted as expected after close. check_persisted_data!(CLOSED_CHANNEL_UPDATE_ID); } + +// A `KVStore` impl for testing purposes that wraps all our `KVStore`s and asserts their synchronicity. +pub(crate) struct TestSyncStore { + test_store: TestStore, + fs_store: FilesystemStore, + sqlite_store: SqliteStore, +} + +impl TestSyncStore { + pub(crate) fn new(dest_dir: PathBuf) -> Self { + let fs_store = FilesystemStore::new(dest_dir.clone()); + let sqlite_store = SqliteStore::new(dest_dir, + Some("test_sync_db".to_string()), Some("test_sync_table".to_string())).unwrap(); + let test_store = TestStore::new(false); + Self { fs_store, sqlite_store, test_store } + } +} + +impl KVStore for TestSyncStore { + fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result> { + let fs_res = self.fs_store.read(namespace, sub_namespace, key); + let sqlite_res = self.sqlite_store.read(namespace, sub_namespace, key); + let test_res = self.test_store.read(namespace, sub_namespace, key); + + match fs_res { + Ok(read) => { + assert_eq!(read, sqlite_res.unwrap()); + assert_eq!(read, test_res.unwrap()); + Ok(read) + } + Err(e) => { + assert!(sqlite_res.is_err()); + assert_eq!(e.kind(), unsafe { sqlite_res.unwrap_err_unchecked().kind() }); + assert!(test_res.is_err()); + assert_eq!(e.kind(), unsafe { test_res.unwrap_err_unchecked().kind() }); + Err(e) + } + } + } + + fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { + let fs_res = self.fs_store.write(namespace, sub_namespace, key, buf); + let sqlite_res = self.sqlite_store.write(namespace, sub_namespace, key, buf); + let test_res = self.test_store.write(namespace, sub_namespace, key, buf); + + assert!(self.list(namespace, sub_namespace).unwrap().contains(&key.to_string())); + + match fs_res { + Ok(()) => { + assert!(sqlite_res.is_ok()); + assert!(test_res.is_ok()); + Ok(()) + } + Err(e) => { + assert!(sqlite_res.is_err()); + assert!(test_res.is_err()); + Err(e) + } + } + } + + fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> { + let fs_res = self.fs_store.remove(namespace, sub_namespace, key, lazy); + let sqlite_res = self.sqlite_store.remove(namespace, sub_namespace, key, lazy); + let test_res = self.test_store.remove(namespace, sub_namespace, key, lazy); + + assert!(!self.list(namespace, sub_namespace).unwrap().contains(&key.to_string())); + + match fs_res { + Ok(()) => { + assert!(sqlite_res.is_ok()); + assert!(test_res.is_ok()); + Ok(()) + } + Err(e) => { + assert!(sqlite_res.is_err()); + assert!(test_res.is_err()); + Err(e) + } + } + } + + fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result> { + let fs_res = self.fs_store.list(namespace, sub_namespace); + let sqlite_res = self.sqlite_store.list(namespace, sub_namespace); + let test_res = self.test_store.list(namespace, sub_namespace); + + match fs_res { + Ok(mut list) => { + list.sort(); + + let mut sqlite_list = sqlite_res.unwrap(); + sqlite_list.sort(); + assert_eq!(list, sqlite_list); + + let mut test_list = test_res.unwrap(); + test_list.sort(); + assert_eq!(list, test_list); + + Ok(list) + } + Err(e) => { + assert!(sqlite_res.is_err()); + assert!(test_res.is_err()); + Err(e) + } + } + } +} From b67b16d20f0f4b7068d161df0ad31919077f78c9 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 29 Sep 2023 12:46:49 +0200 Subject: [PATCH 5/5] f Make more robust and account for renamed namespaces --- lightning-persister/src/test_utils.rs | 123 +++++++++++++++++--------- 1 file changed, 80 insertions(+), 43 deletions(-) diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs index 6637921948a..89cd7e7f189 100644 --- a/lightning-persister/src/test_utils.rs +++ b/lightning-persister/src/test_utils.rs @@ -16,6 +16,7 @@ use rand::{thread_rng, Rng}; use std::panic::RefUnwindSafe; use std::path::PathBuf; +use std::sync::RwLock; pub fn random_storage_path() -> PathBuf { let mut temp_path = std::env::temp_dir(); @@ -139,6 +140,7 @@ pub(crate) fn do_test_store(store_0: &K, store_1: &K) { // A `KVStore` impl for testing purposes that wraps all our `KVStore`s and asserts their synchronicity. pub(crate) struct TestSyncStore { + serializer: RwLock<()>, test_store: TestStore, fs_store: FilesystemStore, sqlite_store: SqliteStore, @@ -146,19 +148,61 @@ pub(crate) struct TestSyncStore { impl TestSyncStore { pub(crate) fn new(dest_dir: PathBuf) -> Self { - let fs_store = FilesystemStore::new(dest_dir.clone()); - let sqlite_store = SqliteStore::new(dest_dir, - Some("test_sync_db".to_string()), Some("test_sync_table".to_string())).unwrap(); + let serializer = RwLock::new(()); + let mut fs_dir = dest_dir.clone(); + fs_dir.push("fs_store"); + let fs_store = FilesystemStore::new(fs_dir); + let mut sql_dir = dest_dir.clone(); + sql_dir.push("sqlite_store"); + let sqlite_store = SqliteStore::new( + sql_dir, + Some("test_sync_db".to_string()), + Some("test_sync_table".to_string()), + ) + .unwrap(); let test_store = TestStore::new(false); - Self { fs_store, sqlite_store, test_store } + Self { serializer, fs_store, sqlite_store, test_store } + } + + fn do_list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> std::io::Result> { + let fs_res = self.fs_store.list(primary_namespace, secondary_namespace); + let sqlite_res = self.sqlite_store.list(primary_namespace, secondary_namespace); + let test_res = self.test_store.list(primary_namespace, secondary_namespace); + + match fs_res { + Ok(mut list) => { + list.sort(); + + let mut sqlite_list = sqlite_res.unwrap(); + sqlite_list.sort(); + assert_eq!(list, sqlite_list); + + let mut test_list = test_res.unwrap(); + test_list.sort(); + assert_eq!(list, test_list); + + Ok(list) + } + Err(e) => { + assert!(sqlite_res.is_err()); + assert!(test_res.is_err()); + Err(e) + } + } } } impl KVStore for TestSyncStore { - fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result> { - let fs_res = self.fs_store.read(namespace, sub_namespace, key); - let sqlite_res = self.sqlite_store.read(namespace, sub_namespace, key); - let test_res = self.test_store.read(namespace, sub_namespace, key); + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> std::io::Result> { + let _guard = self.serializer.read().unwrap(); + + let fs_res = self.fs_store.read(primary_namespace, secondary_namespace, key); + let sqlite_res = self.sqlite_store.read(primary_namespace, secondary_namespace, key); + let test_res = self.test_store.read(primary_namespace, secondary_namespace, key); match fs_res { Ok(read) => { @@ -176,12 +220,18 @@ impl KVStore for TestSyncStore { } } - fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> { - let fs_res = self.fs_store.write(namespace, sub_namespace, key, buf); - let sqlite_res = self.sqlite_store.write(namespace, sub_namespace, key, buf); - let test_res = self.test_store.write(namespace, sub_namespace, key, buf); + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8], + ) -> std::io::Result<()> { + let _guard = self.serializer.write().unwrap(); + let fs_res = self.fs_store.write(primary_namespace, secondary_namespace, key, buf); + let sqlite_res = self.sqlite_store.write(primary_namespace, secondary_namespace, key, buf); + let test_res = self.test_store.write(primary_namespace, secondary_namespace, key, buf); - assert!(self.list(namespace, sub_namespace).unwrap().contains(&key.to_string())); + assert!(self + .do_list(primary_namespace, secondary_namespace) + .unwrap() + .contains(&key.to_string())); match fs_res { Ok(()) => { @@ -197,12 +247,19 @@ impl KVStore for TestSyncStore { } } - fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> { - let fs_res = self.fs_store.remove(namespace, sub_namespace, key, lazy); - let sqlite_res = self.sqlite_store.remove(namespace, sub_namespace, key, lazy); - let test_res = self.test_store.remove(namespace, sub_namespace, key, lazy); + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> std::io::Result<()> { + let _guard = self.serializer.write().unwrap(); + let fs_res = self.fs_store.remove(primary_namespace, secondary_namespace, key, lazy); + let sqlite_res = + self.sqlite_store.remove(primary_namespace, secondary_namespace, key, lazy); + let test_res = self.test_store.remove(primary_namespace, secondary_namespace, key, lazy); - assert!(!self.list(namespace, sub_namespace).unwrap().contains(&key.to_string())); + assert!(!self + .do_list(primary_namespace, secondary_namespace) + .unwrap() + .contains(&key.to_string())); match fs_res { Ok(()) => { @@ -218,30 +275,10 @@ impl KVStore for TestSyncStore { } } - fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result> { - let fs_res = self.fs_store.list(namespace, sub_namespace); - let sqlite_res = self.sqlite_store.list(namespace, sub_namespace); - let test_res = self.test_store.list(namespace, sub_namespace); - - match fs_res { - Ok(mut list) => { - list.sort(); - - let mut sqlite_list = sqlite_res.unwrap(); - sqlite_list.sort(); - assert_eq!(list, sqlite_list); - - let mut test_list = test_res.unwrap(); - test_list.sort(); - assert_eq!(list, test_list); - - Ok(list) - } - Err(e) => { - assert!(sqlite_res.is_err()); - assert!(test_res.is_err()); - Err(e) - } - } + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> std::io::Result> { + let _guard = self.serializer.read().unwrap(); + self.do_list(primary_namespace, secondary_namespace) } }