Skip to content

Commit ff42b12

Browse files
committed
foundationdb: add DirectorySubspace
1 parent 852f993 commit ff42b12

File tree

4 files changed

+199
-59
lines changed

4 files changed

+199
-59
lines changed

foundationdb/src/directory/mod.rs

Lines changed: 168 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::directory::error::DirectoryError;
1818
use crate::directory::node::Node;
1919
use crate::future::FdbSlice;
2020
use crate::tuple::hca::HighContentionAllocator;
21-
use crate::tuple::Subspace;
21+
use crate::tuple::{PackResult, Subspace, TuplePack, TupleUnpack};
2222
use crate::{FdbResult, Transaction};
2323
use byteorder::{LittleEndian, WriteBytesExt};
2424

@@ -75,44 +75,7 @@ const DEFAULT_SUB_DIRS: i64 = 0;
7575
/// futures::executor::block_on(async_main()).expect("failed to run");
7676
/// drop(network);
7777
/// ```
78-
/// ## How it works
79-
///
80-
/// Here's what will be generated when using the Directory to create a path `/app/my-app`:
81-
///
82-
/// ```text
83-
/// +
84-
/// |
85-
/// | version = (1,0,0) # Directory's version
86-
/// |
87-
/// | +
88-
/// | "hca"| # used to allocate numbers like 12 and 42
89-
/// | +
90-
/// \xFE |
91-
/// node's | (0,"app")=12 # id allocated by the hca for "path"
92-
/// subspace | (0,"app","layer")="" # layer allow an ownership's mecanism
93-
/// |
94-
/// |
95-
/// | (0,"app",0,"my-app","layer")="" # layer allow an ownership's mecanism
96-
/// | (0,"app",0,"my-app")=42 # id allocated by the hca for "layer"
97-
/// +
98-
///
99-
///
100-
/// +
101-
/// |
102-
/// |
103-
/// (12,42) |
104-
/// content | # data's subspace for path "app","my-app"
105-
/// subspace |
106-
/// |
107-
/// +
108-
/// ```
109-
/// In this schema:
110-
///
111-
/// * vertical lines represents `Subspaces`,
112-
/// * `()` `Tuples`,
113-
/// * `#` comments.
114-
///
115-
#[derive(Debug)]
78+
#[derive(Debug, Clone)]
11679
pub struct DirectoryLayer {
11780
/// the subspace used to store the hierarchy of paths. Each path is composed of Nodes.
11881
/// Default is `Subspace::all()`.
@@ -156,7 +119,7 @@ impl DirectoryLayer {
156119
path: Vec<String>,
157120
prefix: Option<Vec<u8>>,
158121
layer: Option<Vec<u8>>,
159-
) -> Result<Subspace, DirectoryError> {
122+
) -> Result<DirectorySubspace, DirectoryError> {
160123
self.create_or_open_internal(txn, path, prefix, layer, true, true)
161124
.await
162125
}
@@ -170,7 +133,7 @@ impl DirectoryLayer {
170133
path: Vec<String>,
171134
prefix: Option<Vec<u8>>,
172135
layer: Option<Vec<u8>>,
173-
) -> Result<Subspace, DirectoryError> {
136+
) -> Result<DirectorySubspace, DirectoryError> {
174137
self.create_or_open_internal(txn, path, prefix, layer, true, true)
175138
.await
176139
}
@@ -184,7 +147,7 @@ impl DirectoryLayer {
184147
txn: &Transaction,
185148
path: Vec<String>,
186149
layer: Option<Vec<u8>>,
187-
) -> Result<Subspace, DirectoryError> {
150+
) -> Result<DirectorySubspace, DirectoryError> {
188151
self.create_or_open_internal(txn, path, None, layer, false, true)
189152
.await
190153
}
@@ -250,7 +213,7 @@ impl DirectoryLayer {
250213
layer: Option<Vec<u8>>,
251214
allow_create: bool,
252215
allow_open: bool,
253-
) -> Result<Subspace, DirectoryError> {
216+
) -> Result<DirectorySubspace, DirectoryError> {
254217
dbg!(&path, &prefix, &layer, allow_create, allow_open);
255218
self.check_version(trx, allow_create).await?;
256219

@@ -267,7 +230,7 @@ impl DirectoryLayer {
267230
}
268231

269232
match self
270-
.find_node(&trx, path.to_owned(), allow_create, prefix)
233+
.find_node(&trx, path.to_owned(), allow_create, prefix.to_owned())
271234
.await
272235
{
273236
Ok(node) => {
@@ -276,19 +239,58 @@ impl DirectoryLayer {
276239
return Err(DirectoryError::DirAlreadyExists);
277240
}
278241

279-
match layer {
242+
match layer.to_owned() {
280243
None => {}
281244
Some(l) => {
282245
node.check_layer(l)?;
283246
}
284247
}
285248

286-
Ok(node.content_subspace.unwrap())
249+
let subspace = self.node_with_prefix(node.node_subspace.bytes().pack_to_vec());
250+
251+
self.contents_of_node(subspace, path.to_owned(), layer.to_owned())
252+
.await
287253
}
288254
Err(err) => Err(err),
289255
}
290256
}
291257

258+
fn node_with_prefix(&self, prefix: Vec<u8>) -> Option<Subspace> {
259+
match prefix.len() {
260+
0 => None,
261+
_ => Some(self.node_subspace.subspace(&(prefix))),
262+
}
263+
}
264+
265+
// generate a DirectorySubspace
266+
async fn contents_of_node(
267+
&self,
268+
subspace: Option<Subspace>,
269+
path: Vec<String>,
270+
layer: Option<Vec<u8>>,
271+
) -> Result<DirectorySubspace, DirectoryError> {
272+
let subspace_bytes = match subspace {
273+
None => vec![],
274+
Some(s) => s.bytes().to_vec(),
275+
};
276+
277+
let p = self.node_subspace.unpack::<Vec<u8>>(&subspace_bytes)?;
278+
279+
let mut new_path = Vec::from(self.path.to_owned());
280+
for p in path {
281+
new_path.push(String::from(p));
282+
}
283+
284+
let ss = Subspace::from_bytes(&*p);
285+
286+
let layer = match layer {
287+
None => vec![],
288+
Some(layer) => layer,
289+
};
290+
291+
Ok(DirectorySubspace::new(ss, self.clone(), new_path, layer))
292+
}
293+
292294
/// `check_version` is checking the Directory's version in FDB.
293295
async fn check_version(
294296
&self,
@@ -419,6 +421,8 @@ impl DirectoryLayer {
419421
}
420422
}
421423

424+
dbg!(&node);
425+
422426
Ok(node)
423427
}
424428

@@ -433,4 +437,122 @@ impl DirectoryLayer {
433437
let version_key = self.get_root_node_subspace().subspace(&version_subspace);
434438
trx.get(version_key.bytes(), false).await
435439
}
440+
441+
// TODO: check that we have the same behavior than the Go's bindings:
442+
// func (dl directoryLayer) partitionSubpath(lpath, rpath []string) []string {
443+
// r := make([]string, len(lpath)-len(dl.path)+len(rpath))
444+
// copy(r, lpath[len(dl.path):])
445+
// copy(r[len(lpath)-len(dl.path):], rpath)
446+
// return r
447+
// }
448+
pub(crate) fn partition_subpath(
449+
&self,
450+
left_path: Vec<String>,
451+
right_path: Vec<String>,
452+
) -> Vec<String> {
453+
let mut r: Vec<String> = vec![];
454+
let extract_left = &left_path[self.path.len()..];
455+
r.extend_from_slice(extract_left);
456+
r.extend_from_slice(right_path.as_slice());
457+
r
458+
}
459+
}
460+
461+
/// `DirectorySubspace` is a directory that can act as a subspace
462+
#[derive(Debug)]
463+
pub struct DirectorySubspace {
464+
subspace: Subspace,
465+
directory: DirectoryLayer,
466+
path: Vec<String>,
467+
layer: Vec<u8>,
468+
}
469+
470+
// directory related func
471+
impl DirectorySubspace {
472+
pub fn new(
473+
subspace: Subspace,
474+
directory: DirectoryLayer,
475+
path: Vec<String>,
476+
layer: Vec<u8>,
477+
) -> Self {
478+
DirectorySubspace {
479+
subspace,
480+
directory,
481+
path,
482+
layer,
483+
}
484+
}
485+
486+
pub async fn create_or_open(
487+
&self,
488+
txn: &Transaction,
489+
path: Vec<String>,
490+
prefix: Option<Vec<u8>>,
491+
layer: Option<Vec<u8>>,
492+
) -> Result<DirectorySubspace, DirectoryError> {
493+
self.directory
494+
.create_or_open(
495+
txn,
496+
self.directory
497+
.partition_subpath(self.path.clone(), path.clone()),
498+
prefix,
499+
layer,
500+
)
501+
.await
502+
}
503+
504+
pub async fn create(
505+
&self,
506+
txn: &Transaction,
507+
path: Vec<String>,
508+
prefix: Option<Vec<u8>>,
509+
layer: Option<Vec<u8>>,
510+
) -> Result<DirectorySubspace, DirectoryError> {
511+
self.directory
512+
.create(
513+
txn,
514+
self.directory
515+
.partition_subpath(self.path.clone(), path.clone()),
516+
prefix,
517+
layer,
518+
)
519+
.await
520+
}
521+
522+
pub async fn open(
523+
&self,
524+
txn: &Transaction,
525+
path: Vec<String>,
526+
layer: Option<Vec<u8>>,
527+
) -> Result<DirectorySubspace, DirectoryError> {
528+
self.directory
529+
.open(
530+
txn,
531+
self.directory
532+
.partition_subpath(self.path.clone(), path.clone()),
533+
layer,
534+
)
535+
.await
536+
}
537+
}
538+
539+
// Subspace related func
540+
impl DirectorySubspace {
541+
/// Returns the key encoding the specified Tuple with the prefix of this Subspace
542+
/// prepended.
543+
pub fn pack<T: TuplePack>(&self, v: &T) -> Vec<u8> {
544+
self.subspace.pack(v)
545+
}
546+
547+
/// `unpack` returns the Tuple encoded by the given key with the prefix of this Subspace
548+
/// removed. `unpack` will return an error if the key is not in this Subspace or does not
549+
/// encode a well-formed Tuple.
550+
pub fn unpack<'de, T: TupleUnpack<'de>>(&self, key: &'de [u8]) -> PackResult<T> {
551+
self.subspace.unpack(key)
552+
}
553+
554+
/// `bytes` returns the literal bytes of the prefix of this Subspace.
555+
pub fn bytes(&self) -> &[u8] {
556+
self.subspace.bytes()
557+
}
436558
}

foundationdb/src/directory/node.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
// copied, modified, or distributed except according to those terms.
88

99
use crate::directory::DirectoryError;
10-
use crate::tuple::Subspace;
10+
use crate::tuple::{Subspace, TuplePack};
1111
use crate::{FdbError, RangeOption, Transaction};
1212

1313
/// Node are used to represent the paths generated by a Directory.
@@ -39,6 +39,10 @@ impl Node {
3939
match &self.layer {
4040
None => Err(DirectoryError::IncompatibleLayer),
4141
Some(layer_bytes) => {
42+
if String::from("partition").pack_to_vec().eq(&layer) {
43+
unimplemented!("partition is not yet supported")
44+
}
45+
4246
if layer_bytes.len() != layer.len() {
4347
Err(DirectoryError::IncompatibleLayer)
4448
} else {

foundationdb/src/tuple/hca.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,25 @@ impl TransactError for HcaError {
9191
/// Represents a High Contention Allocator for a given subspace
9292
#[derive(Debug)]
9393
pub struct HighContentionAllocator {
94+
// original subspace kept to implement Clone
95+
subspace: Subspace,
9496
counters: Subspace,
9597
recent: Subspace,
9698
allocation_mutex: Mutex<()>,
9799
}
98100

101+
impl Clone for HighContentionAllocator {
102+
fn clone(&self) -> Self {
103+
HighContentionAllocator::new(self.subspace.to_owned())
104+
}
105+
}
106+
99107
impl HighContentionAllocator {
100108
/// Constructs an allocator that will use the input subspace for assigning values.
101109
/// The given subspace should not be used by anything other than the allocator
102110
pub fn new(subspace: Subspace) -> HighContentionAllocator {
103111
HighContentionAllocator {
112+
subspace: subspace.clone(),
104113
counters: subspace.subspace(&0i64),
105114
recent: subspace.subspace(&1i64),
106115
allocation_mutex: Mutex::new(()),

0 commit comments

Comments
 (0)