Skip to content

Commit e13fee9

Browse files
committed
Introduce KVStore trait and FilesystemStore impl
Rather than further relying on the upstream `KVStorePersister`, we here implement a general `KVStore` trait that allows access to `Read`s/`TransactionalWrite`s which may be used to deserialize/serialize data via the `Readable`/`Writeable` implementations. Notably, `TransactionalWrite` is a `Write` for which the written data needs to be explicitly `commit`ed, ensuring that we always persist either the whole new change or no change at all.
1 parent 112e275 commit e13fee9

File tree

6 files changed

+437
-63
lines changed

6 files changed

+437
-63
lines changed

src/io/fs_store.rs

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
#[cfg(target_os = "windows")]
2+
extern crate winapi;
3+
4+
use super::{KVStore, KVStoreUnpersister, TransactionalWrite};
5+
6+
use std::fs;
7+
use std::io::{BufReader, BufWriter, Read, Write};
8+
use std::path::{Path, PathBuf};
9+
use std::str::FromStr;
10+
11+
#[cfg(not(target_os = "windows"))]
12+
use std::os::unix::io::AsRawFd;
13+
14+
use lightning::util::persist::KVStorePersister;
15+
use lightning::util::ser::Writeable;
16+
17+
#[cfg(target_os = "windows")]
18+
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
19+
20+
#[cfg(target_os = "windows")]
21+
/// Evaluates `$e` and immediately returns from the enclosing function:
/// `Ok(())` when the value is non-zero, otherwise `Err` carrying the last OS error.
macro_rules! call {
    ($e: expr) => {
        return match $e {
            0 => Err(std::io::Error::last_os_error()),
            _ => Ok(()),
        }
    };
}
30+
31+
#[cfg(target_os = "windows")]
32+
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::WCHAR> {
33+
path.as_ref().encode_wide().chain(Some(0)).collect()
34+
}
35+
36+
/// A key-value store that keeps every entry in its own file below a base directory.
///
/// An entry for `namespace` and `key` lives at `<dest_dir>/<namespace>/<key>`.
#[derive(Debug, Clone)]
pub struct FilesystemStore {
    /// Base directory below which all namespaces and keys are stored.
    dest_dir: PathBuf,
}

impl FilesystemStore {
    /// Creates a store rooted at `dest_dir`.
    ///
    /// The path is only recorded here; the file system is not touched.
    pub fn new(dest_dir: PathBuf) -> Self {
        Self { dest_dir }
    }
}
45+
46+
impl KVStore<FilesystemReader, FilesystemWriter> for FilesystemStore {
47+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<FilesystemReader> {
48+
let mut dest_file = self.dest_dir.clone();
49+
dest_file.push(namespace);
50+
dest_file.push(key);
51+
FilesystemReader::new(dest_file)
52+
}
53+
54+
fn write(&self, namespace: &str, key: &str) -> std::io::Result<FilesystemWriter> {
55+
let mut dest_file = self.dest_dir.clone();
56+
dest_file.push(namespace);
57+
dest_file.push(key);
58+
FilesystemWriter::new(dest_file)
59+
}
60+
61+
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
62+
let mut dest_file = self.dest_dir.clone();
63+
dest_file.push(namespace);
64+
dest_file.push(key);
65+
66+
if !dest_file.is_file() {
67+
return Ok(false);
68+
}
69+
70+
fs::remove_file(&dest_file)?;
71+
#[cfg(not(target_os = "windows"))]
72+
{
73+
let msg = format!("Could not retrieve parent directory of {}.", dest_file.display());
74+
let parent_directory = dest_file
75+
.parent()
76+
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg))?;
77+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
78+
unsafe {
79+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
80+
// to the inode might get cached (and hence possibly lost on crash), depending on
81+
// the target platform and file system.
82+
//
83+
// In order to assert we permanently removed the file in question we therefore
84+
// call `fsync` on the parent directory on platforms that support it,
85+
libc::fsync(dir_file.as_raw_fd());
86+
}
87+
}
88+
89+
if dest_file.is_file() {
90+
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Unpersisting key failed"));
91+
}
92+
93+
Ok(true)
94+
}
95+
96+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
97+
let mut prefixed_dest = self.dest_dir.clone();
98+
prefixed_dest.push(namespace);
99+
100+
let mut keys = Vec::new();
101+
102+
if !Path::new(&prefixed_dest).exists() {
103+
return Ok(Vec::new());
104+
}
105+
106+
for entry in fs::read_dir(prefixed_dest.clone())? {
107+
let entry = entry?;
108+
let p = entry.path();
109+
110+
if !p.is_file() {
111+
continue;
112+
}
113+
114+
if let Some(ext) = p.extension() {
115+
if ext == "tmp" {
116+
continue;
117+
}
118+
}
119+
120+
if let Ok(relative_path) = p.strip_prefix(prefixed_dest.clone()) {
121+
keys.push(relative_path.display().to_string())
122+
}
123+
}
124+
125+
Ok(keys)
126+
}
127+
}
128+
129+
/// Buffered reader over the file backing a single store entry.
#[derive(Debug)]
pub struct FilesystemReader {
    /// Buffered handle to the opened file.
    inner: BufReader<fs::File>,
}

impl FilesystemReader {
    /// Opens `dest_file` for buffered reading.
    ///
    /// # Errors
    /// Returns the error from opening the file, e.g. when it does not exist.
    pub fn new(dest_file: PathBuf) -> std::io::Result<Self> {
        // Borrow the path; opening the file does not need an owned copy.
        let f = fs::File::open(&dest_file)?;
        Ok(Self { inner: BufReader::new(f) })
    }
}

impl Read for FilesystemReader {
    /// Reads from the underlying buffered file into `buf`.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.inner.read(buf)
    }
}
146+
147+
/// Writer that stages data in a temporary file next to its destination.
///
/// Written bytes go to `tmp_file`; `dest_file` is not touched by `write` or `flush`.
#[derive(Debug)]
pub struct FilesystemWriter {
    /// Final location of the entry.
    dest_file: PathBuf,
    /// Directory containing `dest_file` (created by `new` if missing).
    parent_directory: PathBuf,
    /// Staging file: `dest_file` with `.tmp` appended to its file name.
    tmp_file: PathBuf,
    /// Buffered handle to `tmp_file`.
    tmp_writer: BufWriter<fs::File>,
}

impl FilesystemWriter {
    /// Prepares a writer for `dest_file`: creates the parent directory (and any missing
    /// ancestors) and creates/truncates the staging file.
    ///
    /// # Errors
    /// Returns `InvalidInput` if `dest_file` has no parent directory or no file name, and
    /// propagates any error from creating the directory or the staging file.
    pub fn new(dest_file: PathBuf) -> std::io::Result<Self> {
        let parent_directory = dest_file
            .parent()
            .ok_or_else(|| {
                let msg =
                    format!("Could not retrieve parent directory of {}.", dest_file.display());
                std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
            })?
            .to_path_buf();
        fs::create_dir_all(&parent_directory)?;

        // Do a crazy dance with lots of fsync()s to be overly cautious here...
        // We never want to end up in a state where we've lost the old data, or end up using the
        // old data on power loss after we've returned.
        // The way to atomically write a file on Unix platforms is:
        // open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
        //
        // The staging name is built by *appending* `.tmp` to the whole file name rather than
        // replacing the extension. Otherwise keys that only differ in their extension
        // (`a.x`/`a.y`) would share one staging file, and a key ending in `.tmp` would be
        // staged onto itself and truncated before the write is committed.
        let mut tmp_name = dest_file
            .file_name()
            .ok_or_else(|| {
                let msg = format!("Could not retrieve file name of {}.", dest_file.display());
                std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
            })?
            .to_os_string();
        tmp_name.push(".tmp");
        let tmp_file = dest_file.with_file_name(tmp_name);

        let tmp_writer = BufWriter::new(fs::File::create(&tmp_file)?);

        Ok(Self { dest_file, parent_directory, tmp_file, tmp_writer })
    }
}

impl Write for FilesystemWriter {
    /// Buffers `buf` for the staging file.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.tmp_writer.write(buf)
    }

    /// Flushes buffered bytes to the staging file and then syncs the file to disk.
    fn flush(&mut self) -> std::io::Result<()> {
        self.tmp_writer.flush()?;
        self.tmp_writer.get_ref().sync_all()
    }
}
188+
189+
impl TransactionalWrite for FilesystemWriter {
190+
fn commit(&mut self) -> std::io::Result<()> {
191+
self.flush()?;
192+
// Fsync the parent directory on Unix.
193+
#[cfg(not(target_os = "windows"))]
194+
{
195+
fs::rename(&self.tmp_file, &self.dest_file)?;
196+
let dir_file = fs::OpenOptions::new().read(true).open(self.parent_directory.clone())?;
197+
unsafe {
198+
libc::fsync(dir_file.as_raw_fd());
199+
}
200+
}
201+
202+
#[cfg(target_os = "windows")]
203+
{
204+
if dest_file.exists() {
205+
unsafe {
206+
winapi::um::winbase::ReplaceFileW(
207+
path_to_windows_str(dest_file).as_ptr(),
208+
path_to_windows_str(tmp_file).as_ptr(),
209+
std::ptr::null(),
210+
winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS,
211+
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
212+
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
213+
)
214+
};
215+
} else {
216+
call!(unsafe {
217+
winapi::um::winbase::MoveFileExW(
218+
path_to_windows_str(tmp_file).as_ptr(),
219+
path_to_windows_str(dest_file).as_ptr(),
220+
winapi::um::winbase::MOVEFILE_WRITE_THROUGH
221+
| winapi::um::winbase::MOVEFILE_REPLACE_EXISTING,
222+
)
223+
});
224+
}
225+
}
226+
Ok(())
227+
}
228+
}
229+
230+
impl KVStorePersister for FilesystemStore {
231+
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
232+
let msg = format!("Could not persist file for key {}.", prefixed_key);
233+
let dest_file = PathBuf::from_str(prefixed_key).map_err(|_| {
234+
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg.clone())
235+
})?;
236+
237+
let parent_directory = dest_file.parent().ok_or(lightning::io::Error::new(
238+
lightning::io::ErrorKind::InvalidInput,
239+
msg.clone(),
240+
))?;
241+
let namespace = parent_directory.display().to_string();
242+
243+
let dest_without_namespace = dest_file
244+
.strip_prefix(&namespace)
245+
.map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?;
246+
let key = dest_without_namespace.display().to_string();
247+
let mut writer = self.write(&namespace, &key)?;
248+
object.write(&mut writer)?;
249+
Ok(writer.commit()?)
250+
}
251+
}
252+
253+
impl KVStoreUnpersister for FilesystemStore {
254+
fn unpersist(&self, key: &str) -> std::io::Result<bool> {
255+
let msg = format!("Could not retrieve file for key {}.", key);
256+
let dest_file = PathBuf::from_str(key)
257+
.map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?;
258+
let msg = format!("Could not retrieve parent directory of {}.", dest_file.display());
259+
let parent_directory = dest_file
260+
.parent()
261+
.ok_or(lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?;
262+
let namespace = parent_directory.display().to_string();
263+
self.remove(&namespace, key)
264+
}
265+
}
266+
267+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::utils::random_storage_path;
    use lightning::util::persist::KVStorePersister;
    use lightning::util::ser::{Readable, Writeable};

    use proptest::prelude::*;
    proptest! {
        /// Round-trips arbitrary 32-byte payloads through write/list/read/remove and then
        /// through the `KVStorePersister` entry point.
        #[test]
        fn read_write_remove_list_persist(data in any::<[u8; 32]>()) {
            let store = FilesystemStore::new(random_storage_path().into());
            let namespace = "testspace";
            let key = "testkey";

            // Test the basic KVStore operations.
            let mut writer = store.write(namespace, key).unwrap();
            data.write(&mut writer).unwrap();
            writer.commit().unwrap();

            // Exactly the key just written must be listed.
            let keys_after_write = store.list(namespace).unwrap();
            assert_eq!(keys_after_write, vec!["testkey".to_string()]);

            let mut reader = store.read(namespace, key).unwrap();
            let round_tripped: [u8; 32] = Readable::read(&mut reader).unwrap();
            assert_eq!(data, round_tripped);

            store.remove(namespace, key).unwrap();

            // After removal the namespace must be empty again.
            let keys_after_remove = store.list(namespace).unwrap();
            assert!(keys_after_remove.is_empty());

            // Test KVStorePersister
            let prefixed_key = format!("{}/{}", namespace, key);
            store.persist(&prefixed_key, &data).unwrap();
            let mut reader = store.read(namespace, key).unwrap();
            let persisted: [u8; 32] = Readable::read(&mut reader).unwrap();
            assert_eq!(data, persisted);
        }
    }
}

0 commit comments

Comments
 (0)