Skip to content

Commit f7157c2

Browse files
committed
Add FilesystemStore
We upstream the `FilesystemStore` implementation, which is backwards compatible with `lightning-persister::FilesystemPersister`.
1 parent 6ea29e0 commit f7157c2

File tree

3 files changed

+275
-0
lines changed

3 files changed

+275
-0
lines changed

lightning-persister/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ libc = "0.2"
2020

2121
[target.'cfg(windows)'.dependencies]
2222
winapi = { version = "0.3", features = ["winbase"] }
23+
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2324

2425
[target.'cfg(ldk_bench)'.dependencies]
2526
criterion = { version = "0.4", optional = true, default-features = false }

lightning-persister/src/fs_store.rs

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
//! Objects related to [`FilesystemStore`] live here.
2+
use lightning::util::persist::KVStore;
3+
4+
use std::collections::HashMap;
5+
use std::fs;
6+
use std::io::{BufReader, Read, Write};
7+
use std::path::{Path, PathBuf};
8+
use std::sync::{Arc, Mutex, RwLock};
9+
10+
#[cfg(not(target_os = "windows"))]
11+
use std::os::unix::io::AsRawFd;
12+
13+
#[cfg(target_os = "windows")]
14+
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
15+
16+
#[cfg(target_os = "windows")]
17+
macro_rules! call {
18+
($e: expr) => {
19+
if $e != 0 {
20+
return Ok(());
21+
} else {
22+
return Err(std::io::Error::last_os_error());
23+
}
24+
};
25+
}
26+
27+
#[cfg(target_os = "windows")]
28+
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
29+
path.as_ref().encode_wide().chain(Some(0)).collect()
30+
}
31+
32+
/// A [`KVStore`] implementation that writes to and reads from the file system.
33+
pub struct FilesystemStore {
34+
data_dir: PathBuf,
35+
locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
36+
}
37+
38+
impl FilesystemStore {
39+
/// Constructs a new [`FilesystemStore`].
40+
pub fn new(data_dir: PathBuf) -> Self {
41+
let locks = Mutex::new(HashMap::new());
42+
Self { data_dir, locks }
43+
}
44+
45+
/// Returns the data directory.
46+
pub fn get_data_dir(&self) -> PathBuf {
47+
self.data_dir.clone()
48+
}
49+
}
50+
51+
impl KVStore for FilesystemStore {
52+
type Reader = FilesystemReader;
53+
54+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
55+
let mut outer_lock = self.locks.lock().unwrap();
56+
let lock_key = (namespace.to_string(), key.to_string());
57+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
58+
59+
if key.is_empty() {
60+
let msg = format!("Failed to read {}/{}: key may not be empty.", namespace, key);
61+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
62+
}
63+
64+
let mut dest_file_path = self.data_dir.clone();
65+
dest_file_path.push(namespace);
66+
dest_file_path.push(key);
67+
FilesystemReader::new(dest_file_path, inner_lock_ref)
68+
}
69+
70+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
71+
let mut outer_lock = self.locks.lock().unwrap();
72+
let lock_key = (namespace.to_string(), key.to_string());
73+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
74+
let _guard = inner_lock_ref.write().unwrap();
75+
76+
if key.is_empty() {
77+
let msg = format!("Failed to write {}/{}: key may not be empty.", namespace, key);
78+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
79+
}
80+
81+
let mut dest_file_path = self.data_dir.clone();
82+
dest_file_path.push(namespace);
83+
dest_file_path.push(key);
84+
85+
let parent_directory = dest_file_path
86+
.parent()
87+
.ok_or_else(|| {
88+
let msg =
89+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
90+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
91+
})?
92+
.to_path_buf();
93+
fs::create_dir_all(&parent_directory)?;
94+
95+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
96+
// We never want to end up in a state where we've lost the old data, or end up using the
97+
// old data on power loss after we've returned.
98+
// The way to atomically write a file on Unix platforms is:
99+
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
100+
let mut tmp_file_path = dest_file_path.clone();
101+
tmp_file_path.set_extension("tmp");
102+
103+
{
104+
let mut tmp_file = fs::File::create(&tmp_file_path)?;
105+
tmp_file.write_all(&buf)?;
106+
tmp_file.sync_all()?;
107+
}
108+
109+
#[cfg(not(target_os = "windows"))]
110+
{
111+
fs::rename(&tmp_file_path, &dest_file_path)?;
112+
let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
113+
unsafe {
114+
libc::fsync(dir_file.as_raw_fd());
115+
}
116+
Ok(())
117+
}
118+
119+
#[cfg(target_os = "windows")]
120+
{
121+
if dest_file_path.exists() {
122+
call!(unsafe {
123+
windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
124+
path_to_windows_str(dest_file_path).as_ptr(),
125+
path_to_windows_str(tmp_file_path).as_ptr(),
126+
std::ptr::null(),
127+
windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
128+
std::ptr::null_mut() as *const core::ffi::c_void,
129+
std::ptr::null_mut() as *const core::ffi::c_void,
130+
)
131+
});
132+
} else {
133+
call!(unsafe {
134+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
135+
path_to_windows_str(tmp_file_path).as_ptr(),
136+
path_to_windows_str(dest_file_path).as_ptr(),
137+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
138+
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
139+
)
140+
});
141+
}
142+
}
143+
}
144+
145+
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
146+
let mut outer_lock = self.locks.lock().unwrap();
147+
let lock_key = (namespace.to_string(), key.to_string());
148+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
149+
150+
let _guard = inner_lock_ref.write().unwrap();
151+
152+
if key.is_empty() {
153+
let msg = format!("Failed to remove {}/{}: key may not be empty.", namespace, key);
154+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
155+
}
156+
157+
let mut dest_file_path = self.data_dir.clone();
158+
dest_file_path.push(namespace);
159+
dest_file_path.push(key);
160+
161+
if !dest_file_path.is_file() {
162+
return Ok(());
163+
}
164+
165+
fs::remove_file(&dest_file_path)?;
166+
#[cfg(not(target_os = "windows"))]
167+
{
168+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
169+
let msg =
170+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
171+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
172+
})?;
173+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
174+
unsafe {
175+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
176+
// to the inode might get cached (and hence possibly lost on crash), depending on
177+
// the target platform and file system.
178+
//
179+
// In order to assert we permanently removed the file in question we therefore
180+
// call `fsync` on the parent directory on platforms that support it,
181+
libc::fsync(dir_file.as_raw_fd());
182+
}
183+
}
184+
185+
if dest_file_path.is_file() {
186+
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
187+
}
188+
189+
if Arc::strong_count(&inner_lock_ref) == 2 {
190+
// It's safe to remove the lock entry if we're the only one left holding a strong
191+
// reference. Checking this is necessary to ensure we continue to distribute references to the
192+
// same lock as long as some Readers are around. However, we still want to
193+
// clean up the table when possible.
194+
//
195+
// Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
196+
// around, but is preferable to doing nothing *or* something overly complex such as
197+
// implementing yet another RAII structure just for this pupose.
198+
outer_lock.remove(&lock_key);
199+
}
200+
201+
// Garbage collect all lock entries that are not referenced anymore.
202+
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
203+
204+
Ok(())
205+
}
206+
207+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
208+
let mut prefixed_dest = self.data_dir.clone();
209+
prefixed_dest.push(namespace);
210+
211+
let mut keys = Vec::new();
212+
213+
if !Path::new(&prefixed_dest).exists() {
214+
return Ok(Vec::new());
215+
}
216+
217+
for entry in fs::read_dir(&prefixed_dest)? {
218+
let entry = entry?;
219+
let p = entry.path();
220+
221+
if !p.is_file() {
222+
continue;
223+
}
224+
225+
if let Some(ext) = p.extension() {
226+
if ext == "tmp" {
227+
continue;
228+
}
229+
}
230+
231+
if let Ok(relative_path) = p.strip_prefix(&prefixed_dest) {
232+
keys.push(relative_path.display().to_string())
233+
}
234+
}
235+
236+
Ok(keys)
237+
}
238+
}
239+
240+
/// A buffered [`Read`] implementation as returned from [`FilesystemStore::read`].
241+
pub struct FilesystemReader {
242+
inner: BufReader<fs::File>,
243+
lock_ref: Arc<RwLock<()>>,
244+
}
245+
246+
impl FilesystemReader {
247+
fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
248+
let f = fs::File::open(dest_file_path.clone())?;
249+
let inner = BufReader::new(f);
250+
Ok(Self { inner, lock_ref })
251+
}
252+
}
253+
254+
impl Read for FilesystemReader {
255+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
256+
let _guard = self.lock_ref.read().unwrap();
257+
self.inner.read(buf)
258+
}
259+
}
260+
261+
#[cfg(test)]
262+
mod tests {
263+
use super::*;
264+
use crate::test_utils::do_read_write_remove_list_persist;
265+
266+
#[test]
267+
fn read_write_remove_list_persist() {
268+
let temp_path = std::env::temp_dir();
269+
let fs_store = FilesystemStore::new(temp_path);
270+
do_read_write_remove_list_persist(&fs_store);
271+
}
272+
}

lightning-persister/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
#[cfg(ldk_bench)] extern crate criterion;
1212

13+
pub mod fs_store;
14+
1315
#[cfg(test)]
1416
mod test_utils;
1517

0 commit comments

Comments
 (0)