Skip to content

Commit a1cd366

Browse files
committed
feat(foundationdb): add support for complex paths in directory
1 parent 26814a1 commit a1cd366

File tree

3 files changed

+121
-77
lines changed

3 files changed

+121
-77
lines changed

foundationdb/src/directory/directory.rs

Lines changed: 59 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
// copied, modified, or distributed except according to those terms.
88

99
use crate::directory::node;
10+
use crate::directory::node::Node;
1011
use crate::future::FdbSlice;
1112
use crate::tuple::hca::HighContentionAllocator;
1213
use crate::tuple::{pack_into, Subspace};
@@ -59,20 +60,30 @@ impl Default for DirectoryLayer {
5960
}
6061

6162
impl DirectoryLayer {
62-
fn get_path(&self) -> Vec<String> {
63-
self.path.to_owned()
63+
pub async fn create_or_open(
64+
&self,
65+
txn: &Transaction,
66+
paths: Vec<String>,
67+
) -> Result<Subspace, DirectoryError> {
68+
self.create_or_open_internal(txn, paths, vec![], vec![], true, true)
69+
.await
6470
}
6571

66-
fn get_layer(&self) -> Vec<u8> {
67-
self.layer.to_owned()
72+
pub async fn create(
73+
&self,
74+
txn: &Transaction,
75+
paths: Vec<String>,
76+
) -> Result<Subspace, DirectoryError> {
77+
self.create_or_open_internal(txn, paths, vec![], vec![], true, false)
78+
.await
6879
}
6980

70-
pub async fn create_or_open(
81+
pub async fn open(
7182
&self,
7283
txn: &Transaction,
7384
paths: Vec<String>,
7485
) -> Result<Subspace, DirectoryError> {
75-
self.create_or_open_internal(txn, paths, vec![], vec![], true, true)
86+
self.create_or_open_internal(txn, paths, vec![], vec![], false, true)
7687
.await
7788
}
7889

@@ -103,11 +114,15 @@ impl DirectoryLayer {
103114
return Err(DirectoryError::NoPathProvided);
104115
}
105116

106-
let mut node = self.find(trx, paths.to_owned()).await?;
107-
node.prefetch_metadata(trx).await?;
117+
let nodes = self.find_nodes(trx, paths.to_owned()).await?;
118+
119+
let last_node = nodes.last().expect("could not contain 0 nodes");
120+
121+
// if the node_subspace of the last element exists, then we do not need to create anything
122+
// and we can return it directly
123+
if last_node.content_subspace.is_some() {
124+
let node = nodes.last().expect("could not contain 0 node");
108125

109-
// subspace already exists
110-
if node.content_subspace.is_some() {
111126
if !allow_open {
112127
return Err(DirectoryError::DirAlreadyExists);
113128
}
@@ -116,34 +131,21 @@ impl DirectoryLayer {
116131
node.check_layer(layer)?;
117132
}
118133

119-
return Ok(node.content_subspace.unwrap().to_owned());
134+
return Ok(node.content_subspace.clone().unwrap());
120135
}
121136

122-
// subspace does not exists
137+
// at least one node does not exists, we need to create them
123138
if !allow_create {
124139
return Err(DirectoryError::DirNotExists);
125140
}
126141

127-
if prefix.len() > 0 {
128-
unimplemented!("no prefix allowed yet")
129-
}
142+
let mut subspace = self.content_subspace.clone();
130143

131-
if paths.len() != 1 {
132-
unimplemented!("paths too long for now")
144+
for mut node in nodes {
145+
let allocator = self.allocator.allocate(trx).await?;
146+
subspace = node.create_subspace(&trx, allocator, &subspace).await?;
133147
}
134-
135-
let allocator = self.allocator.allocate(trx).await?;
136-
let new_subspace = self.content_subspace.subspace(&allocator);
137-
138-
// store node in the node_subspace
139-
let mut new_node_key = vec![DEFAULT_SUB_DIRS];
140-
pack_into(paths.get(0).unwrap(), &mut new_node_key);
141-
trx.set(
142-
self.node_subspace.subspace(&new_node_key).bytes(),
143-
new_subspace.bytes(),
144-
);
145-
146-
Ok(new_subspace)
148+
Ok(subspace)
147149
}
148150

149151
async fn check_version(
@@ -195,42 +197,46 @@ impl DirectoryLayer {
195197
value.write_u32::<LittleEndian>(MINOR_VERSION).unwrap();
196198
value.write_u32::<LittleEndian>(PATCH_VERSION).unwrap();
197199
let version_subspace: &[u8] = b"version";
198-
let version_key = self.node_subspace.subspace(&version_subspace);
199-
200-
trx.set(version_key.bytes(), &value);
200+
let directory_version_key = self.node_subspace.subspace(&version_subspace);
201+
trx.set(directory_version_key.bytes(), &value);
201202

202203
Ok(())
203204
}
204205

205-
async fn find(&self, trx: &Transaction, path: Vec<String>) -> Result<node::Node, FdbError> {
206-
let mut node = node::Node {
207-
subspace: self.node_subspace.to_owned(),
208-
path: path.to_owned(),
209-
target_path: vec![],
210-
layer: None,
211-
content_subspace: None,
212-
already_fetched_metadata: false,
213-
};
214-
for path_name in path.to_owned() {
206+
/// walk is crawling the node_subspace and searching for the nodes
207+
/// Result will hold a Vec with at least two nodes: root node and as many nodes as paths
208+
async fn find_nodes(
209+
&self,
210+
trx: &Transaction,
211+
paths: Vec<String>,
212+
) -> Result<Vec<node::Node>, FdbError> {
213+
let mut nodes = vec![];
214+
215+
let mut subspace = self.node_subspace.to_owned();
216+
217+
for path_name in paths {
215218
let mut next_node_key = vec![DEFAULT_SUB_DIRS];
216219
pack_into(&path_name, &mut next_node_key);
217-
let next_node_subspace = node.subspace.subspace(&next_node_key);
220+
subspace = subspace.subspace(&next_node_key);
218221

219-
match trx.get(next_node_subspace.bytes(), false).await? {
220-
None => {
221-
if !path.ends_with(&[path_name]) {
222-
unimplemented!("node not found")
223-
}
224-
}
222+
let mut node = Node {
223+
layer: None,
224+
node_subspace: subspace.to_owned(),
225+
content_subspace: None,
226+
};
227+
228+
node.retrieve_layer(&trx).await?;
229+
230+
match trx.get(node.node_subspace.bytes(), false).await? {
225231
Some(fdb_slice) => {
226-
node.subspace = next_node_subspace;
227232
node.content_subspace = Some(Subspace::from_bytes(&*fdb_slice));
228-
node.path.push(path_name.to_owned());
229233
}
234+
_ => {} // noop in case of a none existing node
230235
}
236+
nodes.push(node);
231237
}
232238

233-
Ok(node)
239+
Ok(nodes)
234240
}
235241

236242
async fn get_version_value(&self, trx: &Transaction) -> FdbResult<Option<FdbSlice>> {

foundationdb/src/directory/node.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,10 @@ use crate::tuple::Subspace;
1010
use crate::{DirectoryError, FdbError, Transaction};
1111

1212
pub(crate) struct Node {
13-
pub(crate) subspace: Subspace,
14-
pub(crate) path: Vec<String>,
15-
pub(crate) target_path: Vec<String>,
1613
pub(crate) layer: Option<Vec<u8>>,
1714

15+
pub(crate) node_subspace: Subspace,
1816
pub(crate) content_subspace: Option<Subspace>,
19-
20-
pub(crate) already_fetched_metadata: bool,
2117
}
2218

2319
impl Node {
@@ -34,23 +30,30 @@ impl Node {
3430
}
3531
}
3632

37-
// TODO: https://docs.rs/futures/0.3.4/futures/future/trait.FutureExt.html#method.shared
38-
pub(crate) async fn prefetch_metadata(&mut self, trx: &Transaction) -> Result<(), FdbError> {
39-
if !self.already_fetched_metadata {
40-
self.layer(trx).await?;
41-
self.already_fetched_metadata = true;
42-
}
43-
Ok(())
33+
pub(crate) async fn create_subspace(
34+
&mut self,
35+
trx: &Transaction,
36+
allocator: i64,
37+
parent_subspace: &Subspace,
38+
) -> Result<Subspace, DirectoryError> {
39+
let new_subspace = parent_subspace.subspace(&allocator);
40+
41+
let key = self.node_subspace.to_owned();
42+
trx.set(key.bytes(), new_subspace.bytes());
43+
44+
self.content_subspace = Some(new_subspace.to_owned());
45+
46+
Ok(new_subspace.clone())
4447
}
4548

4649
// retrieve the layer used for this node
47-
pub(crate) async fn layer(&mut self, trx: &Transaction) -> Result<(), FdbError> {
50+
pub(crate) async fn retrieve_layer(&mut self, trx: &Transaction) -> Result<(), FdbError> {
4851
if self.layer == None {
49-
let key = self.subspace.subspace(&b"layer".to_vec());
52+
let key = self.node_subspace.subspace(&b"layer".to_vec());
5053
self.layer = match trx.get(key.bytes(), false).await {
5154
Ok(None) => Some(vec![]),
5255
Err(err) => return Err(err),
53-
Ok(Some(fv)) => Some(fv.to_vec()),
56+
Ok(Some(fdb_slice)) => Some(fdb_slice.to_vec()),
5457
}
5558
}
5659
Ok(())

foundationdb/tests/directory.rs

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,62 @@ use foundationdb::*;
99

1010
mod common;
1111

12-
async fn test_create_then_open_async() -> FdbResult<()> {
13-
let paths = vec![String::from("some_path")];
14-
let directory = DirectoryLayer::default();
15-
16-
let db = common::database().await?;
12+
async fn test_create_then_open_async(
13+
db: &Database,
14+
directory: &DirectoryLayer,
15+
paths: Vec<String>,
16+
) -> FdbResult<()> {
17+
eprintln!("creating directory for {:?}", paths.to_owned());
1718
let trx = db.create_trx()?;
18-
let create_output = directory.create_or_open(&trx, paths.to_owned()).await;
19+
let create_output = directory.create(&trx, paths.to_owned()).await;
1920
assert!(create_output.is_ok());
2021

2122
trx.commit().await?;
23+
eprintln!("opening directory for {:?}", paths.to_owned());
2224

2325
let trx = db.create_trx()?;
24-
let get_output = directory.create_or_open(&trx, paths.to_owned()).await;
26+
let get_output = directory.open(&trx, paths.to_owned()).await;
2527
assert!(get_output.is_ok());
28+
2629
assert_eq!(create_output.unwrap().bytes(), get_output.unwrap().bytes());
30+
Ok(())
31+
}
2732

33+
async fn test_create_or_open_async(
34+
db: &Database,
35+
directory: &DirectoryLayer,
36+
paths: Vec<String>,
37+
) -> FdbResult<()> {
38+
let trx = db.create_trx()?;
39+
let create_output = directory.create_or_open(&trx, paths.to_owned()).await;
40+
assert!(create_output.is_ok());
2841
Ok(())
2942
}
3043

3144
#[test]
32-
fn test_create_or_open() {
45+
fn test_directory() {
3346
let _guard = unsafe { foundationdb::boot() };
34-
futures::executor::block_on(test_create_then_open_async()).expect("failed to run");
47+
let db = futures::executor::block_on(common::database()).expect("cannot open fdb");
48+
49+
eprintln!("clearing all keys");
50+
let trx = db.create_trx().expect("cannot create txn");
51+
trx.clear_range(b"\x00", b"\xff");
52+
futures::executor::block_on(trx.commit()).expect("could not clear keys");
53+
54+
eprintln!("creating directories");
55+
let directory = DirectoryLayer::default();
56+
57+
futures::executor::block_on(test_create_or_open_async(
58+
&db,
59+
&directory,
60+
vec![String::from("a")],
61+
))
62+
.expect("failed to run");
63+
64+
futures::executor::block_on(test_create_then_open_async(
65+
&db,
66+
&directory,
67+
vec![String::from("b"), String::from("a")],
68+
))
69+
.expect("failed to run");
3570
}

0 commit comments

Comments
 (0)