Skip to content

Commit 0d89199

Browse files
committed
Filesystem blob store, for yer persistent volumes and the like
Signed-off-by: itowlson <ivan.towlson@fermyon.com>
1 parent adf0124 commit 0d89199

File tree

5 files changed

+356
-0
lines changed

5 files changed

+356
-0
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/blobstore-fs/Cargo.toml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
[package]
2+
name = "spin-blobstore-fs"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
license.workspace = true
7+
homepage.workspace = true
8+
repository.workspace = true
9+
rust-version.workspace = true
10+
11+
[dependencies]
12+
anyhow = { workspace = true }
13+
futures = { workspace = true }
14+
serde = { workspace = true }
15+
spin-core = { path = "../core" }
16+
spin-factor-blobstore = { path = "../factor-blobstore" }
17+
tokio = { workspace = true }
18+
tokio-stream = "0.1.16"
19+
tokio-util = "0.7.12"
20+
walkdir = "2.5"
21+
wasmtime-wasi = { workspace = true }
22+
23+
[lints]
24+
workspace = true

crates/blobstore-fs/src/lib.rs

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
use std::{path::{Path, PathBuf}, sync::Arc};
2+
3+
use anyhow::Context;
4+
use serde::{Deserialize, Serialize};
5+
6+
use spin_core::async_trait;
7+
use spin_factor_blobstore::runtime_config::spin::MakeBlobStore;
8+
9+
/// A blob store that uses a persistent file system volume
10+
/// as a back end.
11+
#[derive(Default)]
12+
pub struct FileSystemBlobStore {
13+
_priv: (),
14+
}
15+
16+
impl FileSystemBlobStore {
17+
/// Creates a new `FileSystemBlobStore`.
18+
pub fn new() -> Self {
19+
Self::default()
20+
}
21+
}
22+
23+
impl MakeBlobStore for FileSystemBlobStore {
24+
const RUNTIME_CONFIG_TYPE: &'static str = "file_system";
25+
26+
type RuntimeConfig = FileSystemBlobStoreRuntimeConfig;
27+
28+
type ContainerManager = BlobStoreFileSystem;
29+
30+
fn make_store(
31+
&self,
32+
runtime_config: Self::RuntimeConfig,
33+
) -> anyhow::Result<Self::ContainerManager> {
34+
Ok(BlobStoreFileSystem::new(runtime_config.path))
35+
}
36+
}
37+
38+
pub struct BlobStoreFileSystem {
39+
path: PathBuf,
40+
}
41+
42+
impl BlobStoreFileSystem {
43+
fn new(path: PathBuf) -> Self {
44+
Self {
45+
path,
46+
}
47+
}
48+
}
49+
50+
/// The serialized runtime configuration for the in memory blob store.
51+
#[derive(Deserialize, Serialize)]
52+
pub struct FileSystemBlobStoreRuntimeConfig {
53+
path: PathBuf,
54+
}
55+
56+
#[async_trait]
57+
impl spin_factor_blobstore::ContainerManager for BlobStoreFileSystem {
58+
async fn get(&self, name: &str) -> Result<Arc<dyn spin_factor_blobstore::Container>, String> {
59+
let container = FileSystemContainer::new(name, &self.path);
60+
Ok(Arc::new(container))
61+
}
62+
63+
fn is_defined(&self, _container_name: &str) -> bool {
64+
true
65+
}
66+
}
67+
68+
struct FileSystemContainer {
69+
name: String,
70+
path: PathBuf,
71+
}
72+
73+
impl FileSystemContainer {
74+
fn new(name: &str, path: &Path) -> Self {
75+
Self {
76+
name: name.to_string(),
77+
path: path.to_owned(),
78+
}
79+
}
80+
81+
fn object_path(&self, name: &str) -> anyhow::Result<PathBuf> {
82+
validate_no_escape(name)?;
83+
Ok(self.path.join(name))
84+
}
85+
}
86+
87+
fn validate_no_escape(name: &str) -> anyhow::Result<()> {
88+
// TODO: this is hopelessly naive but will do for testing
89+
if name.contains("..") {
90+
anyhow::bail!("path tries to escape from base directory");
91+
}
92+
Ok(())
93+
}
94+
95+
#[async_trait]
96+
impl spin_factor_blobstore::Container for FileSystemContainer {
97+
async fn exists(&self) -> anyhow::Result<bool> {
98+
Ok(true)
99+
}
100+
async fn name(&self) -> String {
101+
self.name.clone()
102+
}
103+
async fn info(&self) -> anyhow::Result<spin_factor_blobstore::ContainerMetadata> {
104+
todo!()
105+
}
106+
async fn clear(&self) -> anyhow::Result<()> {
107+
let entries = std::fs::read_dir(&self.path)?.collect::<Vec<_>>();
108+
109+
for entry in entries {
110+
let entry = entry?;
111+
if entry.metadata()?.is_dir() {
112+
std::fs::remove_dir_all(entry.path())?;
113+
} else {
114+
std::fs::remove_file(entry.path())?;
115+
}
116+
}
117+
118+
Ok(())
119+
}
120+
async fn delete_object(&self, name: &str) -> anyhow::Result<()> {
121+
tokio::fs::remove_file(self.object_path(name)?).await?;
122+
Ok(())
123+
}
124+
async fn delete_objects(&self, names: &[String]) -> anyhow::Result<()> {
125+
let futs = names.iter().map(|name| self.delete_object(name));
126+
let results = futures::future::join_all(futs).await;
127+
128+
if let Some(err_result) = results.into_iter().find(|r| r.is_err()) {
129+
err_result
130+
} else {
131+
Ok(())
132+
}
133+
}
134+
async fn has_object(&self, name: &str) -> anyhow::Result<bool> {
135+
Ok(self.object_path(name)?.exists())
136+
}
137+
async fn object_info(&self, name: &str) -> anyhow::Result<spin_factor_blobstore::ObjectMetadata> {
138+
let meta = tokio::fs::metadata(self.object_path(name)?).await?;
139+
let created_at = meta.created()?.duration_since(std::time::SystemTime::UNIX_EPOCH)?.as_nanos().try_into()?;
140+
Ok(spin_factor_blobstore::ObjectMetadata {
141+
name: name.to_string(),
142+
container: self.name.to_string(),
143+
created_at,
144+
size: meta.len(),
145+
})
146+
}
147+
async fn get_data(&self, name: &str, start: u64, end: u64) -> anyhow::Result<Box<dyn spin_factor_blobstore::IncomingData>> {
148+
let path = self.object_path(name)?;
149+
let file = tokio::fs::File::open(&path).await?;
150+
151+
Ok(Box::new(BlobContent { file: Some(file), start, end }))
152+
}
153+
154+
async fn connect_stm(&self, name: &str, mut stm: tokio::io::ReadHalf<tokio::io::SimplexStream>, finished_tx: tokio::sync::mpsc::Sender<()>) -> anyhow::Result<()> {
155+
let path = self.object_path(name)?;
156+
if let Some(dir) = path.parent() {
157+
tokio::fs::create_dir_all(dir).await?;
158+
}
159+
let mut file = tokio::fs::File::create(&path).await?;
160+
161+
tokio::spawn(async move {
162+
use tokio::io::AsyncWriteExt;
163+
use tokio::io::AsyncReadExt;
164+
165+
const BUF_SIZE: usize = 8192;
166+
167+
loop {
168+
// TODO: I think errors should send `finished_tx`.
169+
// (Should finished_tx take a `Result<()` for error reporting?)
170+
let mut buf = vec![0; BUF_SIZE];
171+
let count = stm.read(&mut buf).await.unwrap();
172+
if count == 0 {
173+
finished_tx.send(()).await.unwrap();
174+
break;
175+
}
176+
file.write_all(&buf[0..count]).await.unwrap();
177+
}
178+
});
179+
180+
Ok(())
181+
}
182+
183+
async fn list_objects(&self) -> anyhow::Result<Box<dyn spin_factor_blobstore::ObjectNames>> {
184+
if !self.path.is_dir() {
185+
anyhow::bail!("Backing store for {} does not exist or is not a directory", self.name);
186+
}
187+
Ok(Box::new(BlobNames::new(&self.path)))
188+
}
189+
}
190+
191+
struct BlobContent {
192+
file: Option<tokio::fs::File>,
193+
start: u64,
194+
end: u64,
195+
}
196+
197+
#[async_trait]
198+
impl spin_factor_blobstore::IncomingData for BlobContent {
199+
async fn consume_sync(&mut self) -> anyhow::Result<Vec<u8>> {
200+
use tokio::io::{AsyncReadExt, AsyncSeekExt};
201+
202+
let mut file = self.file.take().context("already consumed")?;
203+
204+
let mut buf = Vec::with_capacity(1000);
205+
206+
file.seek(std::io::SeekFrom::Start(self.start)).await?;
207+
file.take(self.end - self.start).read_to_end(&mut buf).await?;
208+
209+
Ok(buf)
210+
}
211+
212+
fn consume_async(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream {
213+
use futures::TryStreamExt;
214+
use futures::StreamExt;
215+
use tokio_util::compat::FuturesAsyncReadCompatExt;
216+
217+
let file = self.file.take().unwrap();
218+
let stm = tokio_util::io::ReaderStream::new(file)
219+
.skip(self.start.try_into().unwrap())
220+
.take((self.end - self.start).try_into().unwrap());
221+
222+
let ar = stm.into_async_read().compat();
223+
wasmtime_wasi::pipe::AsyncReadStream::new(ar)
224+
}
225+
226+
async fn size(&mut self) -> anyhow::Result<u64> {
227+
let file = self.file.as_ref().context("already consumed")?;
228+
let meta = file.metadata().await?;
229+
Ok(meta.len())
230+
}
231+
}
232+
233+
struct BlobNames {
234+
// This isn't async like tokio ReadDir, but it saves us having
235+
// to manage state ourselves as we traverse into subdirectories.
236+
walk_dir: Box<dyn Iterator<Item = Result<PathBuf, walkdir::Error>> + Send + Sync>,
237+
238+
base_path: PathBuf,
239+
}
240+
241+
impl BlobNames {
242+
fn new(path: &Path) -> Self {
243+
let walk_dir = walkdir::WalkDir::new(path).into_iter().filter_map(as_file_path);
244+
Self { walk_dir: Box::new(walk_dir), base_path: path.to_owned() }
245+
}
246+
247+
fn object_name(&self, path: &Path) -> anyhow::Result<String> {
248+
Ok(path.strip_prefix(&self.base_path).map(|p| format!("{}", p.display()))?)
249+
}
250+
}
251+
252+
fn as_file_path(entry: Result<walkdir::DirEntry, walkdir::Error>) -> Option<Result<PathBuf, walkdir::Error>> {
253+
match entry {
254+
Err(err) => Some(Err(err)),
255+
Ok(entry) => if entry.file_type().is_file() { Some(Ok(entry.into_path())) } else { None },
256+
}
257+
}
258+
259+
#[async_trait]
260+
impl spin_factor_blobstore::ObjectNames for BlobNames {
261+
async fn read(&mut self, len: u64) -> anyhow::Result<(Vec<String>, bool)> {
262+
let mut names = Vec::with_capacity(len.try_into().unwrap_or_default());
263+
let mut at_end = false;
264+
265+
for _ in 0..len {
266+
match self.walk_dir.next() {
267+
None => {
268+
at_end = true;
269+
break;
270+
}
271+
Some(Err(e)) => {
272+
anyhow::bail!(e);
273+
}
274+
Some(Ok(path)) => {
275+
names.push(self.object_name(&path)?);
276+
}
277+
}
278+
}
279+
280+
// We could report "at end" when we actually just returned the last file.
281+
// It's not worth messing around with peeking ahead because the cost to the
282+
// guest of making a call that returns nothing is (hopefully) small.
283+
Ok((names, at_end))
284+
}
285+
286+
async fn skip(&mut self, num: u64) -> anyhow::Result<(u64,bool)> {
287+
// TODO: we could save semi-duplicate code by delegating to `read`?
288+
// The cost would be a bunch of allocation but that seems minor when
289+
// you're dealing with the filesystem.
290+
291+
let mut count = 0;
292+
let mut at_end = false;
293+
294+
for _ in 0..num {
295+
match self.walk_dir.next() {
296+
None => {
297+
at_end = true;
298+
break;
299+
}
300+
Some(Err(e)) => {
301+
anyhow::bail!(e);
302+
}
303+
Some(Ok(_)) => {
304+
count += 1;
305+
}
306+
}
307+
}
308+
309+
Ok((count, at_end))
310+
}
311+
}

crates/runtime-config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ rust-version.workspace = true
1212
anyhow = { workspace = true }
1313
spin-blobstore-s3 = { path = "../blobstore-s3" }
1414
spin-blobstore-azure = { path = "../blobstore-azure" }
15+
spin-blobstore-fs = { path = "../blobstore-fs" }
1516
spin-blobstore-memory = { path = "../blobstore-memory" }
1617
spin-common = { path = "../common" }
1718
spin-factor-blobstore = { path = "../factor-blobstore" }

crates/runtime-config/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,9 @@ pub fn blobstore_config_resolver(
466466
cr
467467
.register_store_type(spin_blobstore_memory::MemoryBlobStore::new())
468468
.unwrap();
469+
cr
470+
.register_store_type(spin_blobstore_fs::FileSystemBlobStore::new())
471+
.unwrap();
469472
cr
470473
.register_store_type(spin_blobstore_azure::AzureBlobStoreBuilder::new())
471474
.unwrap();

0 commit comments

Comments
 (0)