From 250b8fd554a73f267d7997588c6e06d8dfc79cad Mon Sep 17 00:00:00 2001 From: Garren Smith Date: Thu, 27 Feb 2020 19:21:01 +0200 Subject: [PATCH 1/4] setup --- foundationdb/src/directory/directory.rs | 13 ++++++++++++ foundationdb/src/directory/mod.rs | 5 +++++ foundationdb/src/lib.rs | 2 ++ foundationdb/tests/directory.rs | 28 +++++++++++++++++++++++++ scripts/run_bindingtester.sh | 4 ++-- 5 files changed, 50 insertions(+), 2 deletions(-) create mode 100644 foundationdb/src/directory/directory.rs create mode 100644 foundationdb/src/directory/mod.rs create mode 100644 foundationdb/tests/directory.rs diff --git a/foundationdb/src/directory/directory.rs b/foundationdb/src/directory/directory.rs new file mode 100644 index 00000000..286f2317 --- /dev/null +++ b/foundationdb/src/directory/directory.rs @@ -0,0 +1,13 @@ +use crate::Transaction; + +pub struct Directory; + +// use crate::transactions::{Transaction}; + +impl Directory { + + pub fn create_or_open(trx: Transaction, path: Option<&str>, layer: Option<&[u8]> ) -> bool { + false + } + +} diff --git a/foundationdb/src/directory/mod.rs b/foundationdb/src/directory/mod.rs new file mode 100644 index 00000000..fd4b4d4b --- /dev/null +++ b/foundationdb/src/directory/mod.rs @@ -0,0 +1,5 @@ +pub mod directory; + +pub use directory::*; + + diff --git a/foundationdb/src/lib.rs b/foundationdb/src/lib.rs index de15ee5b..09c6fa86 100644 --- a/foundationdb/src/lib.rs +++ b/foundationdb/src/lib.rs @@ -100,6 +100,7 @@ pub mod api; #[cfg(any(feature = "fdb-5_1", feature = "fdb-5_2", feature = "fdb-6_0"))] pub mod cluster; mod database; +mod directory; mod error; pub mod future; mod keyselector; @@ -117,6 +118,7 @@ pub use crate::error::FdbError; pub use crate::error::FdbResult; pub use crate::keyselector::*; pub use crate::transaction::*; +pub use crate::directory::*; /// Initialize the FoundationDB Client API, this can only be called once per process. /// diff --git a/foundationdb/tests/directory.rs b/foundationdb/tests/directory.rs new file mode 100644 index 00000000..e9193222 --- /dev/null +++ b/foundationdb/tests/directory.rs @@ -0,0 +1,28 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +use foundationdb::*; +use futures::future; +use futures::prelude::*; + +mod common; + +async fn test_create_or_open_async() -> FdbResult<()> { + let db = common::database().await?; + let trx = db.create_trx()?; + let out = Directory::create_or_open(trx); + assert!(out); + + Ok(()) +} + +#[test] +fn test_create_or_open() { + common::boot(); + futures::executor::block_on(test_create_or_open_async()).expect("failed to run"); +} + diff --git a/scripts/run_bindingtester.sh b/scripts/run_bindingtester.sh index 04a02c01..b0ff3085 100755 --- a/scripts/run_bindingtester.sh +++ b/scripts/run_bindingtester.sh @@ -6,7 +6,7 @@ fdb_rs_dir=$(pwd) bindingtester="${fdb_rs_dir:?}/$1" case $(uname) in Darwin) - brew install mono +# brew install mono ;; Linux) sudo apt update @@ -22,7 +22,7 @@ esac cd ${fdb_builddir:?} ## Get foundationdb source - git clone --depth 1 https://github.com/apple/foundationdb.git -b release-6.1 +# git clone --depth 1 https://github.com/apple/foundationdb.git -b release-6.1 cd foundationdb git checkout release-6.1 From 97e9d18643be87c6a0189f0be1c36fa294639b4d Mon Sep 17 00:00:00 2001 From: Garren Smith Date: Thu, 12 Mar 2020 16:58:37 +0200 Subject: [PATCH 2/4] initial basic work --- foundationdb/src/directory/directory.rs | 204 +++++++++++++++++- .../src/directory/directory_subspace.rs | 14 ++ foundationdb/src/directory/mod.rs | 43 ++++ foundationdb/src/directory/node.rs | 65 ++++++ 4 files changed, 322 insertions(+), 4 deletions(-) create mode 100644 foundationdb/src/directory/directory_subspace.rs create mode 100644 foundationdb/src/directory/node.rs diff --git a/foundationdb/src/directory/directory.rs b/foundationdb/src/directory/directory.rs index 286f2317..6c4b0179 100644 --- a/foundationdb/src/directory/directory.rs +++ b/foundationdb/src/directory/directory.rs @@ -1,13 +1,209 @@ use crate::Transaction; +use crate::tuple::{Subspace, pack_into, pack}; +use crate::tuple::hca::HighContentionAllocator; -pub struct Directory; +use super::*; +use std::result; +use crate::DirectoryError::Version; +use crate::directory::directory_subspace::{DirectorySubspaceResult, DirectorySubspace}; -// use crate::transactions::{Transaction}; +const LAYER_VERSION: (u8, u8, u8) = (1, 0, 0); +const MAJOR_VERSION: u32 = 1; +const MINOR_VERSION: u32 = 0; +const PATCH_VERSION: u32 = 0; +const DEFAULT_NODE_PREFIX:&[u8] = b"\xFE"; + +const SUBDIRS:u8 = 0; + +#[derive(PartialEq)] +enum PermissionLevel { + Read, + Write +} + +pub type DirectoryResult = result::Result; + +pub struct Directory { + node_prefix: Subspace, + content_prefix: Subspace, + + allow_manual_prefixes: bool, + + allocator: HighContentionAllocator, + root_node: Subspace, + + path: Vec, + layer: Vec, +} impl Directory { - pub fn create_or_open(trx: Transaction, path: Option<&str>, layer: Option<&[u8]> ) -> bool { - false + pub fn root() -> Directory { + Directory { + node_prefix: DEFAULT_NODE_PREFIX.into(), + content_prefix: Subspace::from_bytes(b""), + + allow_manual_prefixes: false, + + allocator: HighContentionAllocator::new(Subspace::from_bytes(b"hca")), + root_node: DEFAULT_NODE_PREFIX.into(), + + path: Vec::new(), + layer: Vec::new() + } + } + + pub fn contents_of_node(&self, node: Subspace, path: &[String], layer: &[u8]) -> DirectorySubspaceResult { + + + Ok(DirectorySubspace) + } + + // pub fn new(parent_node: Directory, path: &[String], layer: &[u8]) -> Directory { + // Directory { + // + // allow_manual_prefixes: true, + // + // allocator: HighContentionAllocator::new(Subspace::from_bytes(b"hca")), + // + // root_node: parent_node.node_prefix.clone(), + // path: path.to_vec(), + // layer: layer.to_vec(), + // + // } + // + // } + + pub async fn create_or_open(&self, trx: Transaction, path: &[&str], layer: &[u8], prefix: &[u8], allow_create: bool, allow_open: bool) -> DirectoryResult { + self.check_version(&trx, PermissionLevel::Read).await?; + + if prefix.len() > 0 && !self.allow_manual_prefixes { + if self.path.len() == 0 { + return Err(DirectoryError::Message("cannot specify a prefix unless manual prefixes are enabled".to_string())) + } + + return Err(DirectoryError::Message("cannot specify a prefix in a partition".to_string())) + } + + if path.len() == 0 { + return Err(DirectoryError::CannotOpenRoot) + } + + // FIND + + if !allow_create { + return Err(DirectoryError::NotExist) + } + + // self.initialize_directory(&trx); + + if prefix.len() == 0 { + // let new_subspace = self.allocator.allocate(&trx).await?; + // TODO: maybe check range and prefix free but I think the allocate does that already + } else { + let is_prefix_free = self.is_prefix_free(&trx, prefix).await?; + } + // + // if layer != self.get_layer() && layer != &[] { + // return Err(DirectoryError::LayerMismatch); + // } + + Ok(Directory::root()) + } + + // pub async fn find(&self, trx: Transaction, path: &[&str]) -> DirectoryResult { + // + // } + + // pub async fn initialize_directory(&self, trx: &Transaction) { + // let version = [MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION].to_le_bytes(); + // let version_subspace: &[u8] = b"version"; + // let version_key = self.root_node.subspace(&version_subspace); + // + // trx.set(version_key.bytes(), version).await; + // } + + async fn is_prefix_free(&self, trx: &Transaction, prefix: &[u8]) -> Result { + + if prefix.len() == 0 { + return Ok(false); + } + + Ok(true) + } + + + async fn check_version(&self, trx: &Transaction, perm_level: PermissionLevel ) -> Result<(), DirectoryError> { + let version_subspace: &[u8] = b"version"; + let version_key = self.root_node.subspace(&version_subspace); + let version_opt = trx.get(version_key.bytes(), false).await?; + + match version_opt { + None => { + if perm_level == PermissionLevel::Write { + //init + return Err(Version("fix soon".to_string())); + } + + Ok(()) + } + Some(versions) => { + if versions.len() < 12 { + return Err(Version("incorrect version length".to_string())); + } + let mut arr = [0u8; 4]; + arr.copy_from_slice(&versions[0..4]); + let major: u32 = u32::from_be_bytes(arr); + + arr.copy_from_slice(&versions[4..8]); + let minor: u32 = u32::from_be_bytes(arr); + + arr.copy_from_slice(&versions[8..12]); + let patch: u32 = u32::from_be_bytes(arr); + + if major > MAJOR_VERSION { + let msg = format!("cannot load directory with version {}.{}.{} using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); + return Err(Version(msg)) + } + + if minor > MINOR_VERSION && perm_level == PermissionLevel::Write { + let msg = format!("directory with version {}.{}.{} is read-only when opened using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); + return Err(Version(msg)) + } + + Ok(()) + } + } } + // pub async fn find(&self, trx: &Transaction, path: &[&str]) -> DirectoryResult { + // let mut node = Directory::root(); + // + // for path_name in path { + // let mut node_layer_id = vec!(SUBDIRS); + // pack_into(&path_name, &mut node_layer_id); + // let new_node = node.node_prefix.subspace(&node_layer_id); + // + // match trx.get(new_node.bytes(), false).await { + // Err(_) => { + // return Ok(node); + // } + // Result(node_name) => { + // let ss = node.node_with_prefix(key); + // node.node_prefix = ss; + // node.path.push(path_name.to_string()) + // } + // } + // } + // + // + // Ok(node) + // } + + + + pub fn get_layer(&self) -> &[u8] { + self.layer.as_slice() + } } + diff --git a/foundationdb/src/directory/directory_subspace.rs b/foundationdb/src/directory/directory_subspace.rs new file mode 100644 index 00000000..000ca8a3 --- /dev/null +++ b/foundationdb/src/directory/directory_subspace.rs @@ -0,0 +1,14 @@ +use crate::tuple::Subspace; +use crate::{Directory, DirectoryError}; +use std::result; + +pub type DirectorySubspaceResult = result::Result; + +pub struct DirectorySubspace { + subspace: Subspace, + dl: Directory, + path: Vec, + layer: Vec +} + +// TODO: impl { .. } \ No newline at end of file diff --git a/foundationdb/src/directory/mod.rs b/foundationdb/src/directory/mod.rs index fd4b4d4b..1c4d03f1 100644 --- a/foundationdb/src/directory/mod.rs +++ b/foundationdb/src/directory/mod.rs @@ -1,5 +1,48 @@ pub mod directory; +pub mod node; +pub mod directory_subspace; pub use directory::*; +use std::io; +use std::fmt::{self, Display}; +use crate::error; + + +#[derive(Debug)] +pub enum DirectoryError { + CannotOpenRoot, + LayerMismatch, + NotExist, + Message(String), + Version(String), + IoError(io::Error), + FdbError(error::FdbError) +} + +impl From for DirectoryError { + fn from(err: io::Error) -> Self { + DirectoryError::IoError(err) + } +} + +impl From for DirectoryError { + fn from(err: error::FdbError) -> Self { + DirectoryError::FdbError(err) + } +} + +impl Display for DirectoryError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + DirectoryError::CannotOpenRoot => write!(f, "Cannot open root directory"), + DirectoryError::LayerMismatch => write!(f, "Layer mismatch"), + DirectoryError::NotExist => write!(f, "Directory does not exist"), + DirectoryError::Version(s) => s.fmt(f), + DirectoryError::Message(s) => s.fmt(f), + DirectoryError::IoError(err) => err.fmt(f), + DirectoryError::FdbError(err) => err.fmt(f), + } + } +} diff --git a/foundationdb/src/directory/node.rs b/foundationdb/src/directory/node.rs new file mode 100644 index 00000000..c9135c3e --- /dev/null +++ b/foundationdb/src/directory/node.rs @@ -0,0 +1,65 @@ +use crate::tuple::{Subspace}; +use crate::future::FdbSlice; +use crate::{Transaction, Directory, DirectoryResult}; +use crate::directory::directory_subspace::DirectorySubspaceResult; + + +pub struct Node { + subspace: Subspace, + path: Vec, + target_path: Vec, + _layer: Option> +} + +impl Node { + + pub fn exists(&self) -> bool { + // if self.subspace == None { + // return false; + // } + + true + } + + pub async fn prefetchMetadata(&self, trx: &Transaction) -> &Node { + if self.exists() { + self.layer(trx).await; + } + + return self; + } + + pub async fn layer(&mut self, trx: &Transaction) -> &[u8] { + if self._layer == None { + let key = self.subspace.subspace(&b"layer".to_vec()); + self._layer = match trx.get(key.bytes(), false).await { + Ok(None) => Some(Vec::new()), + Err(_) => Some(Vec::new()), + Ok(Some(fv)) => Some(fv.to_vec()) + } + } + + return self._layer.unwrap().as_slice() + } + + pub async fn is_in_partition(&mut self, trx: Transaction, include_empty_subpath: bool) -> bool { + if !self.exists() { + return false + } + + self.layer(&trx).await == b"partition" && + (include_empty_subpath || self.target_path.len() > self.path.len()) + } + + pub fn get_partition_subpath(&self) -> &[String] { + self.target_path[..self.path.len()].clone() + } + + pub async fn get_contents(self, directory: Directory, trx: &Transaction ) -> DirectorySubspaceResult { + directory.contents_of_node(self.subspace, &self.path, &self.layer(trx).await) + } +} + + + + From fd9afee135fd5fd1d9fa1c70867114de6ead118c Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Thu, 24 Jun 2021 13:06:15 +0200 Subject: [PATCH 3/4] feat(foundationdb,bindingtester): Add directory Provides structures for managing directories in FoundationDB. The FoundationDB API provides directories as a tool for managing related Subspaces. Directories are a recommended approach for administering applications. Each application should create or open at least one directory to manage its subspaces. For general guidance on directory usage, see the discussion in the Developer Guide. Directories are identified by hierarchical paths analogous to the paths in a Unix-like file system. A path is represented as a List of strings. Each directory has an associated subspace used to store its content. The layer maps each path to a short prefix used for the corresponding subspace. In effect, directories provide a level of indirection for access to subspaces. --- .github/workflows/ci.yml | 2 +- foundationdb-bindingtester/Cargo.toml | 1 + foundationdb-bindingtester/README.md | 3 +- foundationdb-bindingtester/src/main.rs | 1101 +++++++++++++++++ foundationdb/Cargo.toml | 6 +- foundationdb/examples/class-scheduling.rs | 4 +- foundationdb/src/directory/directory.rs | 209 ---- foundationdb/src/directory/directory_layer.rs | 832 +++++++++++++ .../src/directory/directory_partition.rs | 243 ++++ .../src/directory/directory_subspace.rs | 252 +++- foundationdb/src/directory/error.rs | 73 ++ foundationdb/src/directory/mod.rs | 388 +++++- foundationdb/src/directory/node.rs | 126 +- foundationdb/src/future.rs | 13 +- foundationdb/src/lib.rs | 3 +- foundationdb/src/tuple/hca.rs | 9 + foundationdb/src/tuple/mod.rs | 12 +- foundationdb/tests/directory.rs | 73 +- foundationdb/tests/range.rs | 4 +- foundationdb/tests/tokio.rs | 4 +- foundationdb/tests/watch.rs | 4 +- scripts/run_bindingtester.sh | 11 +- 22 files changed, 3036 insertions(+), 337 deletions(-) delete mode 100644 foundationdb/src/directory/directory.rs create mode 100644 foundationdb/src/directory/directory_layer.rs create mode 100644 foundationdb/src/directory/directory_partition.rs create mode 100644 foundationdb/src/directory/error.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 15174b58..061291c0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] - toolchain: ["1.40.0", "stable"] + toolchain: ["1.51.0", "stable"] runs-on: ${{ matrix.os }} diff --git a/foundationdb-bindingtester/Cargo.toml b/foundationdb-bindingtester/Cargo.toml index 2144faa2..2136552b 100644 --- a/foundationdb-bindingtester/Cargo.toml +++ b/foundationdb-bindingtester/Cargo.toml @@ -37,3 +37,4 @@ futures = "0.3.0" log = "0.4.8" num-bigint = "0.3.0" structopt = "0.3.3" +async-trait = "0.1.48" diff --git a/foundationdb-bindingtester/README.md b/foundationdb-bindingtester/README.md index 2a89756f..c9e89b17 100644 --- a/foundationdb-bindingtester/README.md +++ b/foundationdb-bindingtester/README.md @@ -11,4 +11,5 @@ The following configurations are tested and should pass without any issue: ./bindingtester.py --test-name scripted ./bindingtester.py --num-ops 1000 --test-name api --api-version 610 ./bindingtester.py --num-ops 1000 --concurrency 5 --test-name api --api-version 610 -``` +./bindingtester.py --num-ops 10000 --concurrency 1 --test-name directory --api-version 610 --no-directory-snapshot-ops +``` \ No newline at end of file diff --git a/foundationdb-bindingtester/src/main.rs b/foundationdb-bindingtester/src/main.rs index 69f38e03..23219fca 100644 --- a/foundationdb-bindingtester/src/main.rs +++ b/foundationdb-bindingtester/src/main.rs @@ -27,13 +27,21 @@ static GOT_COMMITTED_VERSION: Element = static ERROR_NONE: Element = Element::Bytes(Bytes(Cow::Borrowed(b"ERROR: NONE"))); static ERROR_MULTIPLE: Element = Element::Bytes(Bytes(Cow::Borrowed(b"ERROR: MULTIPLE"))); static OK: Element = Element::Bytes(Bytes(Cow::Borrowed(b"OK"))); +static ERROR_DIRECTORY: Element = Element::Bytes(Bytes(Cow::Borrowed(b"DIRECTORY_ERROR"))); #[cfg(feature = "fdb-6_2")] static GOT_APPROXIMATE_SIZE: Element = Element::Bytes(Bytes(Cow::Borrowed(b"GOT_APPROXIMATE_SIZE"))); use crate::fdb::options::{MutationType, StreamingMode}; + +use foundationdb::directory::directory_layer::DirectoryLayer; +use foundationdb::directory::error::DirectoryError; +use foundationdb::directory::{Directory, DirectoryOutput}; +use foundationdb::tuple::{PackResult, TupleUnpack}; + use tuple::VersionstampOffset; + fn mutation_from_str(s: &str) -> MutationType { match s { "ADD" => MutationType::Add, @@ -95,6 +103,14 @@ impl std::fmt::Debug for Instr { } } +#[derive(Debug)] +enum DirectoryStackItem { + Directory(DirectoryLayer), + DirectoryOutput(DirectoryOutput), + Subspace(Subspace), + Null, +} + impl Instr { fn pop_database(&mut self) -> bool { if self.database { @@ -188,6 +204,39 @@ enum InstrCode { // misc UnitTests, + + // Directory/Subspace/Layer Creation + DirectoryCreateSubspace, + DirectoryCreateLayer, + DirectoryCreateOrOpen, + DirectoryCreate, + DirectoryOpen, + + // Directory Management + DirectoryChange, + DirectorySetErrorIndex, + + // Directory Operations + DirectoryMove, + DirectoryMoveTo, + DirectoryRemove, + DirectoryRemoveIfExists, + DirectoryList, + DirectoryExists, + + // Subspace operation + DirectoryPackKey, + DirectoryUnpackKey, + DirectoryRange, + DirectoryContains, + DirectoryOpenSubspace, + + // Directory Logging + DirectoryLogSubspace, + DirectoryLogDirectory, + + // Other + DirectoryStripPrefix, } fn has_opt<'a>(cmd: &'a str, opt: &'static str) -> (&'a str, bool) { @@ -263,6 +312,33 @@ impl Instr { "UNIT_TESTS" => UnitTests, + "DIRECTORY_CREATE_SUBSPACE" => DirectoryCreateSubspace, + "DIRECTORY_CREATE_LAYER" => DirectoryCreateLayer, + "DIRECTORY_CREATE_OR_OPEN" => DirectoryCreateOrOpen, + "DIRECTORY_CREATE" => DirectoryCreate, + "DIRECTORY_OPEN" => DirectoryOpen, + + "DIRECTORY_CHANGE" => DirectoryChange, + "DIRECTORY_SET_ERROR_INDEX" => DirectorySetErrorIndex, + + "DIRECTORY_MOVE" => DirectoryMove, + "DIRECTORY_MOVE_TO" => DirectoryMoveTo, + "DIRECTORY_REMOVE" => DirectoryRemove, + "DIRECTORY_REMOVE_IF_EXISTS" => DirectoryRemoveIfExists, + "DIRECTORY_LIST" => DirectoryList, + "DIRECTORY_EXISTS" => DirectoryExists, + + "DIRECTORY_PACK_KEY" => DirectoryPackKey, + "DIRECTORY_UNPACK_KEY" => DirectoryUnpackKey, + "DIRECTORY_RANGE" => DirectoryRange, + "DIRECTORY_CONTAINS" => DirectoryContains, + "DIRECTORY_OPEN_SUBSPACE" => DirectoryOpenSubspace, + + "DIRECTORY_LOG_SUBSPACE" => DirectoryLogSubspace, + "DIRECTORY_LOG_DIRECTORY" => DirectoryLogDirectory, + + "DIRECTORY_STRIP_PREFIX" => DirectoryStripPrefix, + name => unimplemented!("inimplemented instr: {}", name), }; Instr { @@ -462,6 +538,10 @@ struct StackMachine { threads: Vec>, trx_counter: usize, + + directory_stack: Vec, + directory_index: usize, + error_index: usize, } fn strinc(key: Bytes) -> Bytes { @@ -492,6 +572,10 @@ impl StackMachine { last_version: 0, threads: Vec::new(), trx_counter: 0, + + directory_stack: vec![DirectoryStackItem::Directory(DirectoryLayer::default())], + directory_index: 0, + error_index: 0, } } @@ -563,6 +647,41 @@ impl StackMachine { } } + async fn pop_optional_bytes(&mut self) -> Option> { + let element = self.pop_element().await; + match element { + Element::Bytes(v) => Some(v.to_vec()), + Element::Nil => None, + Element::String(_) => None, + Element::Tuple(_) => None, + Element::Int(_) => None, + Element::BigInt(_) => None, + Element::Float(_) => None, + Element::Double(_) => None, + Element::Bool(_) => None, + Element::Uuid(_) => None, + Element::Versionstamp(_) => None, + } + } + + async fn pop_string_tuple(&mut self, count: usize) -> Vec> { + let mut result = vec![]; + + if count == 0 { + result.push(vec![]); + } else { + for _i in 0..count { + let mut sub_result = vec![]; + let vec_size = self.pop_i64().await; + for _j in 0..vec_size { + sub_result.push(self.pop_str().await); + } + result.push(sub_result); + } + } + result + } + async fn pop_element(&mut self) -> Element<'static> { let item = self.pop().await; if let Some(data) = item.data { @@ -603,6 +722,27 @@ impl StackMachine { } } + fn push_directory_err(&mut self, code: &InstrCode, number: usize, err: DirectoryError) { + debug!("[{}] DIRECTORY_ERROR during {:?}: {:?}", number, code, err); + self.push(number, Element::Tuple(vec![ERROR_DIRECTORY.clone()])); + + if let InstrCode::DirectoryCreateSubspace + | InstrCode::DirectoryCreateOrOpen + | InstrCode::DirectoryCreateLayer + | InstrCode::DirectoryCreate + | InstrCode::DirectoryOpen + | InstrCode::DirectoryMove + | InstrCode::DirectoryMoveTo + | InstrCode::DirectoryOpenSubspace = code + { + debug!( + "pushed NULL in the directory_stack at index {} because of the error", + self.directory_stack.len() + ); + self.directory_stack.push(DirectoryStackItem::Null); + } + } + fn push_err(&mut self, number: usize, err: FdbError) { trace!("ERROR {:?}", err); let packed = pack(&Element::Tuple(vec![ @@ -634,6 +774,7 @@ impl StackMachine { let is_db = instr.pop_database(); let mut mutation = false; let mut pending = false; + let (mut trx, trx_name) = if is_db { ( TransactionState::Transaction(self.check(number, db.create_trx())?), @@ -1506,6 +1647,877 @@ impl StackMachine { // test_locality(db) // test_predicates() } + + // Pop 1 tuple off the stack as [path]. Pop 1 additional item as [raw_prefix]. + // Create a subspace with path as the prefix tuple and the specified + // raw_prefix. Append it to the directory list. + DirectoryCreateSubspace => { + let tuple_prefix = self.pop_string_tuple(1).await; + let raw_prefix = self.pop_bytes().await; + let subspace = + Subspace::from_bytes(&raw_prefix).subspace(tuple_prefix.get(0).unwrap()); + debug!( + "pushing a new subspace {:?} at index {}", + &subspace, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::Subspace(subspace)); + } + + // Pop 3 items off the stack as [index1, index2, allow_manual_prefixes]. Let + // node_subspace be the object in the directory list at index1 and + // content_subspace be the object in the directory list at index2. Create a new + // directory layer with the specified node_subspace and content_subspace. If + // allow_manual_prefixes is 1, then enable manual prefixes on the directory + // layer. Append the resulting directory layer to the directory list. + // + // If either of the two specified subspaces are null, then do not create a + // directory layer and instead push null onto the directory list. + DirectoryCreateLayer => { + let index_node_subspace = self.pop_usize().await; + let index_content_subspace = self.pop_usize().await; + let allow_manual_prefixes = self.pop_i64().await == 1; + + let node_subspace = self.directory_stack.get(index_node_subspace); + let content_subspace = self.directory_stack.get(index_content_subspace); + + if node_subspace.is_none() || content_subspace.is_none() { + error!( + "pushing null on the directory list: {}, {}", + node_subspace.is_some(), + content_subspace.is_some() + ); + self.directory_stack.push(DirectoryStackItem::Null); + } else { + let node_subspace = match node_subspace.unwrap() { + DirectoryStackItem::Subspace(s) => s.to_owned(), + _ => panic!("expecting subspace"), + }; + + let content_subspace = match content_subspace.unwrap() { + DirectoryStackItem::Subspace(s) => s.to_owned(), + _ => panic!("expecting subspace"), + }; + + debug!("pushed a directory at index {}", self.directory_stack.len()); + + self.directory_stack + .push(DirectoryStackItem::Directory(DirectoryLayer::new( + node_subspace, + content_subspace, + allow_manual_prefixes, + ))); + } + } + + // Pop 1 tuple off the stack as [path]. Pop 2 additional items as + // [layer, prefix]. create a directory with the specified path, layer, + // and prefix. If either of layer or prefix is null, use the default value for + // that parameter (layer='', prefix=null). + DirectoryCreate => { + let path = self.pop_string_tuple(1).await; + let layer = self.pop_optional_bytes().await; + let prefix = self.pop_optional_bytes().await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "creating path {:?} with layer {:?} and prefix {:?} using directory at index {}", + path.get(0).unwrap(), + &layer, + &prefix, + self.directory_index, + ); + + match directory + .create( + txn, + (*path.get(0).unwrap().to_owned()).to_vec(), + prefix, + layer, + ) + .await + { + Ok(directory_subspace) => { + debug!( + "pushing DirectoryOutput at index {}", + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(directory_subspace)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Pop 1 tuple off the stack as [path]. Pop 1 additional item as [layer]. Open + // a directory with the specified path and layer. If layer is null, use the + // default value (layer=''). + DirectoryOpen => { + let path = self.pop_string_tuple(1).await; + let bytes_layer = self.pop_bytes().await; + + let layer = if bytes_layer.is_empty() { + None + } else { + Some(bytes_layer.to_vec()) + }; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "opening path {:?} with layer {:?} with index {}", + path.get(0).unwrap(), + &layer, + self.directory_index + ); + + match directory + .open(txn, (*path.get(0).unwrap().to_owned()).to_vec(), layer) + .await + { + Ok(directory_subspace) => { + debug!( + "pushing newly opened DirectoryOutput at index {}", + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(directory_subspace)); + + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [path]. Pop 1 additional item as [layer]. Open + // a directory with the specified path and layer. If layer is null, use the + // default value (layer=''). + DirectoryCreateOrOpen => { + let path = self.pop_string_tuple(1).await; + let bytes_layer = self.pop_bytes().await; + + let layer = if bytes_layer.is_empty() { + None + } else { + Some(bytes_layer.to_vec()) + }; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "create_or_open path {:?} with layer {:?} with index {}", + path.get(0).unwrap(), + &layer, + self.directory_index + ); + match directory + .create_or_open( + txn, + (*path.get(0).unwrap().to_owned()).to_vec(), + None, + layer, + ) + .await + { + Ok(directory_subspace) => { + debug!( + "pushing created_or_opened {:?} at index {}", + &directory_subspace, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(directory_subspace)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => self.push_directory_err(&instr.code, number, e), + }; + } + + // Pop the top item off the stack as [index]. Set the current directory list + // index to index. In the event that the directory at this new index is null + // (as the result of a previous error), set the directory list index to the + // error index. + DirectoryChange => { + self.directory_index = self.pop_usize().await; + debug!("setting directory_index to {}", self.directory_index); + match self.directory_stack.get(self.directory_index) { + None => { + self.directory_index = self.error_index; + debug!( + "setting directory_index to error index {}: no directory found", + self.directory_index + ); + } + Some(d) => match d { + DirectoryStackItem::Null => { + self.directory_index = self.error_index; + debug!( + "setting directory_index to error index {}: because it is Null", + self.directory_index + ); + } + _ => {} + }, + } + } + + // Pop the top item off the stack as [error_index]. Set the current error index + // to error_index. + DirectorySetErrorIndex => { + self.error_index = self.pop_usize().await; + } + + // Use the current directory for this operation. + // + // Pop 2 tuples off the stack as [old_path, new_path]. Call move with the + // specified old_path and new_path. Append the result onto the directory list. + DirectoryMove => { + let paths = self.pop_string_tuple(2).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "moving {:?} to {:?} using directory at index {}", + paths.get(0).unwrap(), + paths.get(1).unwrap(), + self.directory_index + ); + + match directory + .move_to( + txn, + (*paths.get(0).unwrap().to_vec()).to_owned(), + (*paths.get(1).unwrap().to_vec()).to_owned(), + ) + .await + { + Ok(s) => { + debug!( + "pushing moved directory {:?} at index {}", + &s, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(s)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [new_absolute_path]. Call moveTo with the + // specified new_absolute_path. Append the result onto the directory list. + DirectoryMoveTo => { + let paths = self.pop_string_tuple(1).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + debug!( + "moving directory {:?} to {:?}", + self.directory_index, &paths + ); + + match directory + .move_directory(txn, (*paths.get(0).unwrap().to_vec()).to_owned()) + .await + { + Ok(s) => { + debug!( + "pushing moved directory {:?} at index {}", + &s, + self.directory_stack.len() + ); + self.directory_stack + .push(DirectoryStackItem::DirectoryOutput(s)); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call remove_if_exits, passing it path if one + // was popped. + DirectoryRemove => { + let count = self.pop_usize().await; + let paths = self.pop_string_tuple(count).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + let paths = paths.get(0).expect("could not retrieve a path"); + debug!( + "removing path {:?} using directory at index {}", + paths, self.directory_index + ); + match directory.remove(txn, paths.to_owned()).await { + Ok(deleted) => { + if !deleted { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from("directory does not exists")), + ); + } else if is_db { + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + } + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call remove_if_exits, passing it path if one + // was popped. + DirectoryRemoveIfExists => { + let count = self.pop_usize().await; + let paths = self.pop_string_tuple(count).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + let paths = paths.get(0).expect("could not retrieve a path"); + match directory.remove_if_exists(txn, paths.to_owned()).await { + Ok(_) => { + if is_db { + local_trx.commit().await.expect("could not commit"); + } + } + Err(err) => self.push_directory_err(&instr.code, number, err), + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call list, passing it path if one was popped. + // Pack the resulting list of directories using the tuple layer and push the + // packed string onto the stack. + DirectoryList => { + let count = self.pop_usize().await; + let paths = match count { + 1 => { + let paths = self.pop_string_tuple(count).await; + paths.get(0).expect("could not retrieve a path").clone() + } + 0 => vec![], + _ => panic!(), + }; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + match directory.list(txn, paths.to_vec()).await { + Ok(children) => { + let mut elements: Vec = vec![]; + debug!( + "found {} items under path {:?} with directory at index {}:", + children.len(), + paths, + self.directory_index + ); + for child in children { + debug!("\t - {}", &child); + let element = Element::String(Cow::from(child)); + elements.push(element); + } + let tuple = Element::Tuple(elements); + self.push(number, Element::Bytes(Bytes::from(tuple.pack_to_vec()))); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [count] (either 0 or 1). If count is 1, pop 1 + // tuple off the stack as [path]. Call exists, passing it path if one + // was popped. Push 1 onto the stack if the path exists and 0 if it does not. + DirectoryExists => { + let count = self.pop_usize().await; + + let paths = self.pop_string_tuple(count).await; + + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let local_trx = db.create_trx().unwrap(); + let txn = match is_db { + true => &local_trx, + false => match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }, + }; + + let paths = paths.get(0).expect("could not retrieve a path"); + match directory.exists(txn, paths.to_owned()).await { + Ok(exists) => { + self.push(number, Element::Int(i64::from(exists))); + if is_db { + debug!("commiting local trx"); + local_trx.commit().await.expect("could not commit"); + } + } + Err(e) => { + self.push_directory_err(&instr.code, number, e); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [key_tuple]. Pack key_tuple and push the result + // onto the stack. + DirectoryPackKey => { + let n: usize = self.pop_usize().await; + debug!("DirectoryPackKey {}", n); + let mut buf = Vec::new(); + for _ in 0..n { + let element: Element = self.pop_element().await; + debug!(" - {:?}", element); + buf.push(element); + } + + let tuple = Element::Tuple(buf); + + match self.pack_with_current_subspace(&tuple) { + None => self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from("cannot pack with current item")), + ), + Some(bytes) => { + self.push(number, Element::Bytes(bytes.into())); + } + } + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [key]. Unpack key and push the resulting tuple + // onto the stack one item at a time. + DirectoryUnpackKey => { + let data = self.pop_bytes().await; + let data = data.to_vec(); + debug!("directory_unpack {:?}", data); + let data: Vec = self.unpack_with_current_subspace(&data).unwrap().unwrap(); + for element in data { + debug!(" - {:?}", element); + self.push(number, Element::Tuple(vec![element.into_owned()])); + } + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [tuple]. Create a range using tuple and push + // range.begin and range.end onto the stack. + DirectoryRange => { + let n: usize = self.pop_usize().await; + let mut buf = Vec::new(); + for _ in 0..n { + let element: Element = self.pop_element().await; + debug!(" - {:?}", element); + buf.push(element); + } + + match self.get_current_directory_item() { + Some(DirectoryStackItem::DirectoryOutput( + DirectoryOutput::DirectoryPartition(_d), + )) => { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from( + "operation not allowed on directoryPartition", + )), + ); + } + _ => { + let tuple = Element::Tuple(buf); + let subspace = self.subspace_with_current_item(&tuple).unwrap(); + let (begin_range, end_range) = subspace.range(); + self.push(number, Element::Bytes(begin_range.into())); + self.push(number, Element::Bytes(end_range.into())); + } + } + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [key]. Check if the current directory contains + // the specified key. Push 1 if it does and 0 if it doesn't. + DirectoryContains => { + let raw_prefix = self.pop_bytes().await; + let b = match self.get_current_directory_item() { + None => panic!("not found"), + Some(DirectoryStackItem::Subspace(s)) => s.is_start_of(&raw_prefix.to_vec()), + Some(DirectoryStackItem::DirectoryOutput(d)) => match d { + DirectoryOutput::DirectorySubspace(s) => { + s.is_start_of(&raw_prefix.to_vec()) + } + _ => panic!("not a DirectorySubspace"), + }, + _ => panic!("not found"), + }; + + self.push(number, Element::Int(b as i64)); + } + + // Use the current directory for this operation. + // + // Pop 1 tuple off the stack as [tuple]. Open the subspace of the current + // directory specified by tuple and push it onto the directory list. + DirectoryOpenSubspace => { + let n: usize = self.pop_usize().await; + debug!("DirectoryRange {}", n); + let mut buf = Vec::new(); + for _ in 0..n { + let element: Element = self.pop_element().await; + debug!(" - {:?}", element); + buf.push(element); + } + + let tuple = Element::Tuple(buf); + self.directory_stack.push(DirectoryStackItem::Subspace( + self.subspace_with_current_item(&tuple).unwrap(), + )); + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [prefix]. Let key equal + // prefix + tuple.pack([dir_index]). Set key to be the result of calling + // directory.key() in the current transaction. + DirectoryLogSubspace => { + debug!( + "logging subspace {}/{}", + self.directory_index, + self.directory_stack.len() + ); + let raw_prefix = self.pop_bytes().await; + let txn = match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }; + let key = Subspace::from_bytes(&*raw_prefix).pack(&self.directory_index); + + match self.directory_stack.get(self.directory_index) { + None => panic!("nothing in the stack"), + Some(DirectoryStackItem::Null) => panic!("Directory is NULL"), + Some(DirectoryStackItem::Directory(_)) => { + panic!("trying to get a subspace, got a Directory") + } + Some(DirectoryStackItem::DirectoryOutput( + DirectoryOutput::DirectorySubspace(d), + )) => { + txn.set(&key, d.bytes()); + debug!( + "logging subspace [{}] {:?}={:?}", + self.directory_index, + unpack::>(&key).unwrap(), + d.bytes(), + ); + } + Some(DirectoryStackItem::DirectoryOutput( + DirectoryOutput::DirectoryPartition(_), + )) => { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from( + "cannot get key for the root of a directory partition", + )), + ); + } + Some(DirectoryStackItem::Subspace(s)) => { + txn.set(&key, s.bytes()); + debug!( + "logging subspace [{}] {:?}={:?}", + self.directory_index, + unpack::>(&key).unwrap(), + s.bytes(), + ); + } + }; + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [raw_prefix]. Create a subspace log_subspace + // with path (dir_index) and the specified raw_prefix. Set: + // + // tr[log_subspace[u'path']] = the tuple packed path of the directory. + // + // tr[log_subspace[u'layer']] = the tuple packed layer of the directory. + // + // tr[log_subspace[u'exists']] = the packed tuple containing a 1 if the + // directory exists and 0 if it doesn't. + // + // tr[log_subspace[u'children']] the tuple packed list of children of the + // directory. + // + // Where log_subspace[u] is the subspace packed tuple containing only the + // single specified unicode string . + DirectoryLogDirectory => { + debug!("logging directory {}", self.directory_index); + let directory = self + .get_current_directory() + .expect("could not find a directory"); + + let txn = match trx { + TransactionState::Transaction(ref t) => t, + _ => { + panic!("could not find an active transaction"); + } + }; + + let raw_prefix = self.pop_bytes().await; + let subspace = Subspace::from_bytes(&*raw_prefix).subspace(&(self.directory_index)); + + let key_path = subspace.pack(&(String::from("path"))); + let value_path = pack(&self.get_path_for_current_directory().unwrap()); + + let key_layer = subspace.pack(&("layer")); + let value_layer = pack(&self.get_layer_for_current_directory().unwrap()); + + let exists = directory + .exists(&txn, vec![]) + .await + .expect("could not list directory"); + + let key_exists = subspace.pack(&(String::from("exists"))); + let value_exists = pack(&Element::Tuple(vec![Element::Int(match exists { + true => 1, + false => 0, + })])); + + let children = if exists { + directory.list(txn, vec![]).await.unwrap() + } else { + vec![] + }; + + let tuple_children = Element::Tuple( + children + .iter() + .map(|s| Element::String(Cow::Owned(s.clone()))) + .collect(), + ); + + let key_children = subspace.pack(&(String::from("children"))); + let value_children = pack(&tuple_children); + + txn.set(&key_path, &value_path); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_path).unwrap(), + unpack::>(&value_path).unwrap(), + ); + txn.set(&key_layer, &value_layer); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_layer).unwrap(), + unpack::>(&value_layer).unwrap(), + ); + txn.set(&key_exists, &value_exists); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_exists).unwrap(), + unpack::>(&value_exists).unwrap(), + ); + txn.set(&key_children, &value_children); + debug!( + "[{}] {:?}={:?}", + self.directory_index, + unpack::>(&key_children).unwrap(), + unpack::>(&value_children).unwrap(), + ); + } + + // Use the current directory for this operation. + // + // Pop 1 item off the stack as [byte_array]. Call .key() on the current + // subspace and store the result as [prefix]. Throw an error if the popped + // array does not start with prefix. Otherwise, remove the prefix from the + // popped array and push the result onto the stack. + DirectoryStripPrefix => match self.pop_optional_bytes().await { + None => { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Other(String::from("bad input on bytes")), + ); + } + Some(raw_prefix) => { + let ssb = self.get_bytes_for_current_directory().unwrap(); + if !raw_prefix.to_vec().starts_with(&ssb) { + self.push_directory_err( + &instr.code, + number, + DirectoryError::Version(String::from( + "String does not start with raw prefix", + )), + ); + } else { + self.push( + number, + Element::Bytes(Bytes::from(raw_prefix[ssb.len()..].to_owned())), + ); + } + } + }, } if is_db && pending { @@ -1546,6 +2558,95 @@ impl StackMachine { handle.join().expect("joined thread to not panic"); } } + + fn get_current_directory_item(&mut self) -> Option<&DirectoryStackItem> { + self.directory_stack.get(self.directory_index) + } + + fn get_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::Directory(d) => Some(Box::new(d.clone())), + DirectoryStackItem::DirectoryOutput(d) => Some(Box::new((*d).clone())), + _ => None, + }, + } + } + + fn pack_with_current_subspace(&self, v: &T) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(DirectoryOutput::DirectorySubspace(d)) => { + Some(d.pack(v)) + } + DirectoryStackItem::Subspace(d) => Some(d.pack(v)), + _ => None, + }, + } + } + + fn unpack_with_current_subspace<'de, T: TupleUnpack<'de>>( + &self, + key: &'de [u8], + ) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => Some(d.unpack(key)), + DirectoryStackItem::Subspace(d) => Some(d.unpack(key)), + _ => None, + }, + } + } + + fn subspace_with_current_item(&self, t: &T) -> Option { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => Some(d.subspace(t)), + DirectoryStackItem::Subspace(d) => Some(d.subspace(t)), + _ => None, + }, + } + } + + fn get_path_for_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::Directory(d) => Some(d.get_path()), + DirectoryStackItem::DirectoryOutput(d) => Some(d.get_path()), + _ => None, + }, + } + } + + fn get_layer_for_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => Some(d.get_layer()), + DirectoryStackItem::Directory(_) => Some(vec![]), + _ => None, + }, + } + } + + fn get_bytes_for_current_directory(&self) -> Option> { + match self.directory_stack.get(self.directory_index) { + None => None, + Some(directory_or_subspace) => match directory_or_subspace { + DirectoryStackItem::DirectoryOutput(d) => match d { + DirectoryOutput::DirectorySubspace(s) => Some(Vec::from(s.bytes())), + DirectoryOutput::DirectoryPartition(_) => None, + }, + DirectoryStackItem::Subspace(subspace) => Some(Vec::from(subspace.bytes())), + _ => None, + }, + } + } } fn main() { diff --git a/foundationdb/Cargo.toml b/foundationdb/Cargo.toml index a244ca0c..d68110e2 100644 --- a/foundationdb/Cargo.toml +++ b/foundationdb/Cargo.toml @@ -43,15 +43,17 @@ foundationdb-gen = { version = "0.5.1", path = "../foundationdb-gen", default-fe [dependencies] foundationdb-sys = { version = "0.5.1", path = "../foundationdb-sys", default-features = false } -futures = "0.3.1" +futures = "0.3.14" memchr = "2.2.1" rand = { version = "0.7.2", features = ["default", "small_rng"] } static_assertions = "1.1.0" uuid = { version = "0.8.1", optional = true } num-bigint = { version = "0.3.0", optional = true } +byteorder = "1.3.2" +async-trait = "0.1.48" +async-recursion = "0.3.2" [dev-dependencies] -byteorder = "1.3.2" lazy_static = "1.4.0" log = "0.4.8" tokio = { version = "0.2.9", features = ["rt-core", "rt-threaded", "macros"] } diff --git a/foundationdb/examples/class-scheduling.rs b/foundationdb/examples/class-scheduling.rs index c0a80cfa..1f412849 100644 --- a/foundationdb/examples/class-scheduling.rs +++ b/foundationdb/examples/class-scheduling.rs @@ -297,7 +297,7 @@ async fn simulate_students(student_id: usize, num_ops: usize) { for _ in 0..num_ops { let mut moods = Vec::::new(); - if my_classes.len() > 0 { + if !my_classes.is_empty() { moods.push(Mood::Ditch); moods.push(Mood::Switch); } @@ -306,7 +306,7 @@ async fn simulate_students(student_id: usize, num_ops: usize) { moods.push(Mood::Add); } - let mood = moods.choose(&mut rng).map(|mood| *mood).unwrap(); + let mood = moods.choose(&mut rng).copied().unwrap(); // on errors we recheck for available classes if perform_op( diff --git a/foundationdb/src/directory/directory.rs b/foundationdb/src/directory/directory.rs deleted file mode 100644 index 6c4b0179..00000000 --- a/foundationdb/src/directory/directory.rs +++ /dev/null @@ -1,209 +0,0 @@ -use crate::Transaction; -use crate::tuple::{Subspace, pack_into, pack}; -use crate::tuple::hca::HighContentionAllocator; - -use super::*; -use std::result; -use crate::DirectoryError::Version; -use crate::directory::directory_subspace::{DirectorySubspaceResult, DirectorySubspace}; - -const LAYER_VERSION: (u8, u8, u8) = (1, 0, 0); -const MAJOR_VERSION: u32 = 1; -const MINOR_VERSION: u32 = 0; -const PATCH_VERSION: u32 = 0; -const DEFAULT_NODE_PREFIX:&[u8] = b"\xFE"; - -const SUBDIRS:u8 = 0; - -#[derive(PartialEq)] -enum PermissionLevel { - Read, - Write -} - -pub type DirectoryResult = result::Result; - -pub struct Directory { - node_prefix: Subspace, - content_prefix: Subspace, - - allow_manual_prefixes: bool, - - allocator: HighContentionAllocator, - root_node: Subspace, - - path: Vec, - layer: Vec, -} - -impl Directory { - - pub fn root() -> Directory { - Directory { - node_prefix: DEFAULT_NODE_PREFIX.into(), - content_prefix: Subspace::from_bytes(b""), - - allow_manual_prefixes: false, - - allocator: HighContentionAllocator::new(Subspace::from_bytes(b"hca")), - root_node: DEFAULT_NODE_PREFIX.into(), - - path: Vec::new(), - layer: Vec::new() - } - } - - pub fn contents_of_node(&self, node: Subspace, path: &[String], layer: &[u8]) -> DirectorySubspaceResult { - - - Ok(DirectorySubspace) - } - - // pub fn new(parent_node: Directory, path: &[String], layer: &[u8]) -> Directory { - // Directory { - // - // allow_manual_prefixes: true, - // - // allocator: HighContentionAllocator::new(Subspace::from_bytes(b"hca")), - // - // root_node: parent_node.node_prefix.clone(), - // path: path.to_vec(), - // layer: layer.to_vec(), - // - // } - // - // } - - pub async fn create_or_open(&self, trx: Transaction, path: &[&str], layer: &[u8], prefix: &[u8], allow_create: bool, allow_open: bool) -> DirectoryResult { - self.check_version(&trx, PermissionLevel::Read).await?; - - if prefix.len() > 0 && !self.allow_manual_prefixes { - if self.path.len() == 0 { - return Err(DirectoryError::Message("cannot specify a prefix unless manual prefixes are enabled".to_string())) - } - - return Err(DirectoryError::Message("cannot specify a prefix in a partition".to_string())) - } - - if path.len() == 0 { - return Err(DirectoryError::CannotOpenRoot) - } - - // FIND - - if !allow_create { - return Err(DirectoryError::NotExist) - } - - // self.initialize_directory(&trx); - - if prefix.len() == 0 { - // let new_subspace = self.allocator.allocate(&trx).await?; - // TODO: maybe check range and prefix free but I think the allocate does that already - } else { - let is_prefix_free = self.is_prefix_free(&trx, prefix).await?; - } - // - // if layer != self.get_layer() && layer != &[] { - // return Err(DirectoryError::LayerMismatch); - // } - - Ok(Directory::root()) - } - - // pub async fn find(&self, trx: Transaction, path: &[&str]) -> DirectoryResult { - // - // } - - // pub async fn initialize_directory(&self, trx: &Transaction) { - // let version = [MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION].to_le_bytes(); - // let version_subspace: &[u8] = b"version"; - // let version_key = self.root_node.subspace(&version_subspace); - // - // trx.set(version_key.bytes(), version).await; - // } - - async fn is_prefix_free(&self, trx: &Transaction, prefix: &[u8]) -> Result { - - if prefix.len() == 0 { - return Ok(false); - } - - Ok(true) - } - - - async fn check_version(&self, trx: &Transaction, perm_level: PermissionLevel ) -> Result<(), DirectoryError> { - let version_subspace: &[u8] = b"version"; - let version_key = self.root_node.subspace(&version_subspace); - let version_opt = trx.get(version_key.bytes(), false).await?; - - match version_opt { - None => { - if perm_level == PermissionLevel::Write { - //init - return Err(Version("fix soon".to_string())); - } - - Ok(()) - } - Some(versions) => { - if versions.len() < 12 { - return Err(Version("incorrect version length".to_string())); - } - let mut arr = [0u8; 4]; - arr.copy_from_slice(&versions[0..4]); - let major: u32 = u32::from_be_bytes(arr); - - arr.copy_from_slice(&versions[4..8]); - let minor: u32 = u32::from_be_bytes(arr); - - arr.copy_from_slice(&versions[8..12]); - let patch: u32 = u32::from_be_bytes(arr); - - if major > MAJOR_VERSION { - let msg = format!("cannot load directory with version {}.{}.{} using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); - return Err(Version(msg)) - } - - if minor > MINOR_VERSION && perm_level == PermissionLevel::Write { - let msg = format!("directory with version {}.{}.{} is read-only when opened using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); - return Err(Version(msg)) - } - - Ok(()) - } - } - } - - // pub async fn find(&self, trx: &Transaction, path: &[&str]) -> DirectoryResult { - // let mut node = Directory::root(); - // - // for path_name in path { - // let mut node_layer_id = vec!(SUBDIRS); - // pack_into(&path_name, &mut node_layer_id); - // let new_node = node.node_prefix.subspace(&node_layer_id); - // - // match trx.get(new_node.bytes(), false).await { - // Err(_) => { - // return Ok(node); - // } - // Result(node_name) => { - // let ss = node.node_with_prefix(key); - // node.node_prefix = ss; - // node.path.push(path_name.to_string()) - // } - // } - // } - // - // - // Ok(node) - // } - - - - pub fn get_layer(&self) -> &[u8] { - self.layer.as_slice() - } -} - diff --git a/foundationdb/src/directory/directory_layer.rs b/foundationdb/src/directory/directory_layer.rs new file mode 100644 index 00000000..241e008b --- /dev/null +++ b/foundationdb/src/directory/directory_layer.rs @@ -0,0 +1,832 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! The default Directory implementation. + +use crate::directory::directory_partition::DirectoryPartition; +use crate::directory::directory_subspace::DirectorySubspace; +use crate::directory::error::DirectoryError; +use crate::directory::node::Node; +use crate::directory::{compare_slice, strinc, Directory, DirectoryOutput}; +use crate::future::FdbSlice; +use crate::tuple::hca::HighContentionAllocator; +use crate::tuple::{Element, Subspace, TuplePack}; +use crate::RangeOption; +use crate::{FdbResult, Transaction}; +use async_recursion::async_recursion; +use async_trait::async_trait; +use byteorder::{LittleEndian, WriteBytesExt}; + +use std::cmp::Ordering; + +use std::ops::Deref; +use std::option::Option::Some; +use std::sync::Arc; + +pub(crate) const DEFAULT_SUB_DIRS: i64 = 0; +const MAJOR_VERSION: u32 = 1; +const MINOR_VERSION: u32 = 0; +const PATCH_VERSION: u32 = 0; +pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE"; +const DEFAULT_HCA_PREFIX: &[u8] = b"hca"; +pub(crate) const PARTITION_LAYER: &[u8] = b"partition"; +pub(crate) const LAYER_SUFFIX: &[u8] = b"layer"; + +/// A DirectoryLayer defines a new root directory. +/// The node subspace and content subspace control where the directory metadata and contents, +/// respectively, are stored. The default root directory has a node subspace with raw prefix \xFE +/// and a content subspace with no prefix. +#[derive(Clone)] +pub struct DirectoryLayer { + pub(crate) inner: Arc, +} + +impl std::fmt::Debug for DirectoryLayer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt(f) + } +} + +#[derive(Debug)] +pub struct DirectoryLayerInner { + pub(crate) root_node: Subspace, + pub(crate) node_subspace: Subspace, + pub(crate) content_subspace: Subspace, + pub(crate) allocator: HighContentionAllocator, + pub(crate) allow_manual_prefixes: bool, + + pub(crate) path: Vec, +} + +impl Deref for DirectoryLayer { + type Target = DirectoryLayerInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl Default for DirectoryLayer { + /// The default root directory stores directory layer metadata in keys beginning with 0xFE, + ///and allocates newly created directories in (unused) prefixes starting with 0x00 through 0xFD. + ///This is appropriate for otherwise empty databases, but may conflict with other formal or informal partitionings of keyspace. + /// If you already have other content in your database, you may wish to use NewDirectoryLayer to + /// construct a non-standard root directory to control where metadata and keys are stored. + fn default() -> Self { + Self::new( + Subspace::from_bytes(DEFAULT_NODE_PREFIX), + Subspace::all(), + false, + ) + } +} + +impl DirectoryLayer { + pub fn new( + node_subspace: Subspace, + content_subspace: Subspace, + allow_manual_prefixes: bool, + ) -> Self { + let root_node = node_subspace.subspace(&node_subspace.bytes()); + + DirectoryLayer { + inner: Arc::new(DirectoryLayerInner { + root_node: root_node.clone(), + node_subspace, + content_subspace, + allocator: HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX)), + allow_manual_prefixes, + path: vec![], + }), + } + } + + pub(crate) fn new_with_path( + node_subspace: Subspace, + content_subspace: Subspace, + allow_manual_prefixes: bool, + path: Vec, + ) -> Self { + let root_node = node_subspace.subspace(&node_subspace.bytes()); + + DirectoryLayer { + inner: Arc::new(DirectoryLayerInner { + root_node: root_node.clone(), + node_subspace, + content_subspace, + allocator: HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX)), + allow_manual_prefixes, + path, + }), + } + } + + pub fn get_path(&self) -> Vec { + self.path.clone() + } + + fn node_with_optional_prefix(&self, prefix: Option) -> Option { + match prefix { + None => None, + Some(fdb_slice) => Some(self.node_with_prefix(&(&*fdb_slice))), + } + } + + fn node_with_prefix(&self, prefix: &T) -> Subspace { + self.inner.node_subspace.subspace(prefix) + } + + async fn find(&self, trx: &Transaction, path: Vec) -> Result { + let mut node = Node { + subspace: Some(self.root_node.clone()), + current_path: vec![], + target_path: path.clone(), + layer: vec![], + loaded_metadata: false, + directory_layer: self.clone(), + }; + + // walking through the provided path + for path_name in path.iter() { + node.current_path.push(path_name.clone()); + let node_subspace = match node.subspace { + // unreachable because on first iteration, it is set to root_node, + // on other iteration, `node.exists` is checking for the subspace's value + None => unreachable!("node's subspace is not set"), + Some(s) => s, + }; + let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned())); + + // finding the next node + let fdb_slice_value = trx.get(key.bytes(), false).await?; + + node = Node { + subspace: self.node_with_optional_prefix(fdb_slice_value), + current_path: node.current_path.clone(), + target_path: path.clone(), + layer: vec![], + loaded_metadata: false, + directory_layer: self.clone(), + }; + + node.load_metadata(&trx).await?; + + if !node.exists() || node.layer.eq(PARTITION_LAYER) { + return Ok(node); + } + } + + if !node.loaded_metadata { + node.load_metadata(&trx).await?; + } + + Ok(node) + } + + fn to_absolute_path(&self, sub_path: &[String]) -> Vec { + let mut path: Vec = Vec::with_capacity(self.path.len() + sub_path.len()); + + path.extend_from_slice(&self.path); + path.extend_from_slice(sub_path); + + path + } + + pub(crate) fn contents_of_node( + &self, + node: Subspace, + path: Vec, + layer: Vec, + ) -> Result { + let prefix: Vec = self.node_subspace.unpack(node.bytes())?; + + if layer.eq(PARTITION_LAYER) { + Ok(DirectoryOutput::DirectoryPartition( + DirectoryPartition::new(self.to_absolute_path(&path), prefix, self.clone()), + )) + } else { + Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new( + self.to_absolute_path(&path), + prefix, + self, + layer, + ))) + } + } + + /// `create_or_open_internal` is the function used to open and/or create a directory. + #[async_recursion] + async fn create_or_open_internal( + &self, + trx: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + allow_create: bool, + allow_open: bool, + ) -> Result { + self.check_version(trx, allow_create).await?; + + if prefix.is_some() && !self.allow_manual_prefixes { + if self.path.is_empty() { + return Err(DirectoryError::PrefixNotAllowed); + } + + return Err(DirectoryError::CannotPrefixInPartition); + } + + if path.is_empty() { + return Err(DirectoryError::NoPathProvided); + } + + let node = self.find(trx, path.to_owned()).await?; + + if node.exists() { + if node.is_in_partition(false) { + let sub_path = node.get_partition_subpath(); + match node.get_contents()? { + DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"), + DirectoryOutput::DirectoryPartition(directory_partition) => { + let dir_space = directory_partition + .directory_subspace + .directory_layer + .create_or_open_internal( + trx, + sub_path.to_owned(), + prefix, + layer, + allow_create, + allow_open, + ) + .await?; + Ok(dir_space) + } + } + } else { + self.open_internal(layer, &node, allow_open).await + } + } else { + self.create_internal(trx, path, layer, prefix, allow_create) + .await + } + } + + async fn open_internal( + &self, + layer: Option>, + node: &Node, + allow_open: bool, + ) -> Result { + if !allow_open { + return Err(DirectoryError::DirAlreadyExists); + } + + match layer { + None => {} + Some(layer) => { + if !layer.is_empty() { + match compare_slice(&layer, &node.layer) { + Ordering::Equal => {} + _ => { + return Err(DirectoryError::IncompatibleLayer); + } + } + } + } + } + + node.get_contents() + } + + async fn create_internal( + &self, + trx: &Transaction, + path: Vec, + layer: Option>, + prefix: Option>, + allow_create: bool, + ) -> Result { + if !allow_create { + return Err(DirectoryError::DirectoryDoesNotExists); + } + + let layer = layer.unwrap_or_default(); + + self.check_version(trx, allow_create).await?; + let new_prefix = self.get_prefix(trx, prefix.clone()).await?; + + let is_prefix_free = self + .is_prefix_free(trx, new_prefix.to_owned(), prefix.is_none()) + .await?; + + if !is_prefix_free { + return Err(DirectoryError::DirectoryPrefixInUse); + } + + let parent_node = self.get_parent_node(trx, path.to_owned()).await?; + let node = self.node_with_prefix(&new_prefix); + + let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path.last().unwrap())); + let key_layer = node.pack(&LAYER_SUFFIX.to_vec()); + + trx.set(&key.bytes(), &new_prefix); + trx.set(&key_layer, &layer); + + self.contents_of_node(node, path.to_owned(), layer.to_owned()) + } + + async fn get_parent_node( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + if path.len() > 1 { + let (_, list) = path.split_last().unwrap(); + + let parent = self + .create_or_open_internal(trx, list.to_vec(), None, None, true, true) + .await?; + Ok(self.node_with_prefix(&parent.bytes().to_vec())) + } else { + Ok(self.root_node.clone()) + } + } + + async fn is_prefix_free( + &self, + trx: &Transaction, + prefix: Vec, + snapshot: bool, + ) -> Result { + if prefix.is_empty() { + return Ok(false); + } + + let node = self + .node_containing_key(trx, prefix.to_owned(), snapshot) + .await?; + + if node.is_some() { + return Ok(false); + } + + let range_option = RangeOption::from(( + self.node_subspace.pack(&prefix), + self.node_subspace.pack(&strinc(prefix)), + )); + + let result = trx.get_range(&range_option, 1, snapshot).await?; + + Ok(result.is_empty()) + } + + async fn node_containing_key( + &self, + trx: &Transaction, + key: Vec, + snapshot: bool, + ) -> Result, DirectoryError> { + if key.starts_with(self.node_subspace.bytes()) { + return Ok(Some(self.root_node.clone())); + } + + let mut key_after = key.to_vec(); + // pushing 0x00 to simulate keyAfter + key_after.push(0); + + let range_end = self.node_subspace.pack(&key_after); + + let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end)); + range_option.reverse = true; + range_option.limit = Some(1); + + // checking range + let fdb_values = trx.get_range(&range_option, 1, snapshot).await?; + + match fdb_values.get(0) { + None => {} + Some(fdb_key_value) => { + let previous_prefix: Vec = + self.node_subspace.unpack(fdb_key_value.key())?; + match previous_prefix.get(0) { + Some(Element::Bytes(b)) => { + let previous_prefix = b.to_vec(); + if key.starts_with(&previous_prefix) { + return Ok(Some(self.node_with_prefix(&previous_prefix))); + }; + } + _ => {} + }; + } + } + Ok(None) + } + + async fn get_prefix( + &self, + trx: &Transaction, + prefix: Option>, + ) -> Result, DirectoryError> { + match prefix { + None => { + // no prefix provided, allocating one + let allocator = self.allocator.allocate(trx).await?; + let subspace = self.content_subspace.subspace(&allocator); + + // checking range + let result = trx + .get_range(&RangeOption::from(subspace.range()), 1, false) + .await?; + + if !result.is_empty() { + return Err(DirectoryError::PrefixNotEmpty); + } + + Ok(subspace.bytes().to_vec()) + } + Some(v) => Ok(v), + } + } + + /// `check_version` is checking the Directory's version in FDB. + async fn check_version( + &self, + trx: &Transaction, + allow_creation: bool, + ) -> Result<(), DirectoryError> { + let version = self.get_version_value(trx).await?; + match version { + None => { + if allow_creation { + self.initialize_directory(trx).await + } else { + return Ok(()); + } + } + Some(versions) => { + if versions.len() < 12 { + return Err(DirectoryError::Version( + "incorrect version length".to_string(), + )); + } + let mut arr = [0u8; 4]; + arr.copy_from_slice(&versions[0..4]); + let major: u32 = u32::from_le_bytes(arr); + + arr.copy_from_slice(&versions[4..8]); + let minor: u32 = u32::from_le_bytes(arr); + + arr.copy_from_slice(&versions[8..12]); + let patch: u32 = u32::from_le_bytes(arr); + + if major > MAJOR_VERSION { + let msg = format!("cannot load directory with version {}.{}.{} using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); + return Err(DirectoryError::Version(msg)); + } + + if minor > MINOR_VERSION { + let msg = format!("directory with version {}.{}.{} is read-only when opened using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION); + return Err(DirectoryError::Version(msg)); + } + + Ok(()) + } + } + } + + /// `initialize_directory` is initializing the directory + async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> { + let mut value = vec![]; + value.write_u32::(MAJOR_VERSION).unwrap(); + value.write_u32::(MINOR_VERSION).unwrap(); + value.write_u32::(PATCH_VERSION).unwrap(); + let version_subspace: &[u8] = b"version"; + let directory_version_key = self.root_node.subspace(&version_subspace); + trx.set(directory_version_key.bytes(), &value); + + Ok(()) + } + + async fn get_version_value(&self, trx: &Transaction) -> FdbResult> { + let version_subspace: &[u8] = b"version"; + let version_key = self.root_node.subspace(&version_subspace); + + trx.get(version_key.bytes(), false).await + } + + async fn exists_internal( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + self.check_version(trx, false).await?; + + let node = self.find(trx, path.to_owned()).await?; + + if !node.exists() { + return Ok(false); + } + + if node.is_in_partition(false) { + return node + .get_contents()? + .exists(trx, node.to_owned().get_partition_subpath()) + .await; + } + + Ok(true) + } + + async fn list_internal( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.check_version(trx, false).await?; + + let node = self.find(trx, path.to_owned()).await?; + if !node.exists() { + return Err(DirectoryError::PathDoesNotExists); + } + if node.is_in_partition(true) { + match node.get_contents()? { + DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"), + DirectoryOutput::DirectoryPartition(directory_partition) => { + return directory_partition + .directory_subspace + .directory_layer + .list(trx, node.get_partition_subpath()) + .await + } + }; + } + + Ok(node.list_sub_folders(trx).await?) + } + + async fn move_to_internal( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.check_version(trx, true).await?; + + if old_path.len() <= new_path.len() + && compare_slice(&old_path[..], &new_path[..old_path.len()]) == Ordering::Equal + { + return Err(DirectoryError::CannotMoveBetweenSubdirectory); + } + + let old_node = self.find(trx, old_path.to_owned()).await?; + let new_node = self.find(trx, new_path.to_owned()).await?; + + if !old_node.exists() { + return Err(DirectoryError::PathDoesNotExists); + } + + if old_node.is_in_partition(false) || new_node.is_in_partition(false) { + if !old_node.is_in_partition(false) + || !new_node.is_in_partition(false) + || old_node.current_path.eq(&new_node.current_path) + { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + + return new_node + .get_contents()? + .move_to( + trx, + old_node.get_partition_subpath(), + new_node.get_partition_subpath(), + ) + .await; + } + + if new_node.exists() || new_path.is_empty() { + return Err(DirectoryError::DirAlreadyExists); + } + + let parent_path = match new_path.split_last() { + None => vec![], + Some((_, elements)) => elements.to_vec(), + }; + + let parent_node = self.find(trx, parent_path).await?; + if !parent_node.exists() { + return Err(DirectoryError::ParentDirDoesNotExists); + } + + let subspace_parent_node = match parent_node.subspace { + // not reachable because `self.find` is creating a node with a subspace, + None => unreachable!("node's subspace is not set"), + Some(ref s) => s.clone(), + }; + + let key = + subspace_parent_node.subspace(&(DEFAULT_SUB_DIRS, new_path.to_owned().last().unwrap())); + let value: Vec = self + .node_subspace + .unpack(old_node.subspace.clone().unwrap().bytes())?; + trx.set(&key.bytes(), &value); + + self.remove_from_parent(trx, old_path.to_owned()).await?; + + self.contents_of_node( + old_node.subspace.unwrap(), + new_path.to_owned(), + old_node.layer, + ) + } + + async fn remove_from_parent( + &self, + trx: &Transaction, + path: Vec, + ) -> Result<(), DirectoryError> { + let (last_element, parent_path) = match path.split_last() { + None => return Err(DirectoryError::BadDestinationDirectory), + Some((last, elements)) => (last.clone(), elements.to_vec()), + }; + + let parent_node = self.find(trx, parent_path).await?; + match parent_node.subspace { + None => {} + Some(subspace) => { + let key = subspace.pack(&(DEFAULT_SUB_DIRS, last_element)); + trx.clear(&key); + } + } + + Ok(()) + } + + #[async_recursion] + async fn remove_internal( + &self, + trx: &Transaction, + path: Vec, + fail_on_nonexistent: bool, + ) -> Result { + self.check_version(trx, true).await?; + + if path.is_empty() { + return Err(DirectoryError::CannotModifyRootDirectory); + } + + let node = self.find(&trx, path.to_owned()).await?; + + if !node.exists() { + return if fail_on_nonexistent { + Err(DirectoryError::DirectoryDoesNotExists) + } else { + Ok(false) + }; + } + + if node.is_in_partition(false) { + match node.get_contents()? { + DirectoryOutput::DirectorySubspace(_) => { + unreachable!("already directory partition") + } + DirectoryOutput::DirectoryPartition(d) => { + return d + .directory_subspace + .directory_layer + .remove_internal(trx, node.get_partition_subpath(), fail_on_nonexistent) + .await + } + } + } + + self.remove_recursive(trx, node.subspace.unwrap().clone()) + .await?; + self.remove_from_parent(trx, path.to_owned()).await?; + + Ok(true) + } + + #[async_recursion] + async fn remove_recursive( + &self, + trx: &Transaction, + node_sub: Subspace, + ) -> Result<(), DirectoryError> { + let sub_dir = node_sub.subspace(&(DEFAULT_SUB_DIRS)); + let (mut begin, end) = sub_dir.range(); + + loop { + let range_option = RangeOption::from((begin.as_slice(), end.as_slice())); + + let range = trx.get_range(&range_option, 1024, false).await?; + let has_more = range.more(); + + for row_key in range { + let sub_node = self.node_with_prefix(&row_key.value()); + self.remove_recursive(trx, sub_node).await?; + begin = row_key.key().pack_to_vec(); + } + + if !has_more { + break; + } + } + + let node_prefix: Vec = self.node_subspace.unpack(node_sub.bytes())?; + + trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned())); + trx.clear_subspace_range(&node_sub); + + Ok(()) + } +} + +#[async_trait] +impl Directory for DirectoryLayer { + /// `create_or_open` opens the directory specified by path (relative to this + /// Directory), and returns the directory and its contents as a + /// Subspace. If the directory does not exist, it is created + /// (creating parent directories if necessary). + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.create_or_open_internal(txn, path, prefix, layer, true, true) + .await + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.create_or_open_internal(txn, path, prefix, layer, true, false) + .await + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + self.create_or_open_internal(txn, path, None, layer, false, true) + .await + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + self.exists_internal(trx, path).await + } + + async fn move_directory( + &self, + _trx: &Transaction, + _new_path: Vec, + ) -> Result { + Err(DirectoryError::CannotMoveRootDirectory) + } + + /// `move_to` the directory from old_path to new_path(both relative to this + /// Directory), and returns the directory (at its new location) and its + /// contents as a Subspace. Move will return an error if a directory + /// does not exist at oldPath, a directory already exists at newPath, or the + /// parent directory of newPath does not exist. + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.move_to_internal(trx, old_path, new_path).await + } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + self.remove_internal(trx, path.to_owned(), true).await + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + self.remove_internal(trx, path.to_owned(), false).await + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.list_internal(trx, path).await + } +} diff --git a/foundationdb/src/directory/directory_partition.rs b/foundationdb/src/directory/directory_partition.rs new file mode 100644 index 00000000..d253c3f4 --- /dev/null +++ b/foundationdb/src/directory/directory_partition.rs @@ -0,0 +1,243 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! A resulting Subspace whose prefix is preprended to all of its descendant directories's prefixes. + +use crate::directory::directory_layer::{DirectoryLayer, DEFAULT_NODE_PREFIX, PARTITION_LAYER}; +use crate::directory::directory_subspace::DirectorySubspace; +use crate::directory::error::DirectoryError; +use crate::directory::{Directory, DirectoryOutput}; +use crate::tuple::Subspace; +use crate::Transaction; +use async_trait::async_trait; +use std::ops::Deref; +use std::sync::Arc; + +/// A `DirectoryPartition` is a DirectorySubspace whose prefix is preprended to all of its descendant +/// directories's prefixes. It cannot be used as a Subspace. Instead, you must create at +/// least one subdirectory to store content. +#[derive(Clone)] +pub struct DirectoryPartition { + pub(crate) inner: Arc, +} + +#[derive(Debug)] +pub struct DirectoryPartitionInner { + pub(crate) directory_subspace: DirectorySubspace, + pub(crate) parent_directory_layer: DirectoryLayer, +} + +impl Deref for DirectoryPartition { + type Target = DirectoryPartitionInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::fmt::Debug for DirectoryPartition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt(f) + } +} + +impl DirectoryPartition { + // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryPartition.h#L34-L43 + pub(crate) fn new( + path: Vec, + prefix: Vec, + parent_directory_layer: DirectoryLayer, + ) -> Self { + let mut node_subspace_bytes = vec![]; + node_subspace_bytes.extend_from_slice(&prefix); + node_subspace_bytes.extend_from_slice(DEFAULT_NODE_PREFIX); + + let new_directory_layer = DirectoryLayer::new_with_path( + Subspace::from_bytes(&node_subspace_bytes), + Subspace::from_bytes(prefix.as_slice()), + false, + path.to_owned(), + ); + + DirectoryPartition { + inner: Arc::new(DirectoryPartitionInner { + directory_subspace: DirectorySubspace::new( + path, + prefix, + &new_directory_layer, + Vec::from(PARTITION_LAYER), + ), + parent_directory_layer, + }), + } + } +} + +impl DirectoryPartition { + pub fn get_path(&self) -> Vec { + self.inner.directory_subspace.get_path() + } + + fn get_directory_layer_for_path(&self, path: &Vec) -> DirectoryLayer { + if path.is_empty() { + self.parent_directory_layer.clone() + } else { + self.directory_subspace.directory_layer.clone() + } + } + + fn get_partition_subpath( + &self, + path: Vec, + directory_layer: Option, + ) -> Vec { + let mut new_path = vec![]; + + new_path.extend_from_slice( + &self.directory_subspace.get_path()[directory_layer + .unwrap_or(self.directory_subspace.directory_layer.clone()) + .path + .len()..], + ); + new_path.extend_from_slice(&path); + + new_path + } + + pub fn get_layer(&self) -> Vec { + String::from("partition").into_bytes() + } +} + +#[async_trait] +impl Directory for DirectoryPartition { + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.inner + .directory_subspace + .create_or_open(txn, path, prefix, layer) + .await + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.inner + .directory_subspace + .create(txn, path, prefix, layer) + .await + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + self.inner.directory_subspace.open(txn, path, layer).await + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + + directory_layer + .exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&vec![]); + let directory_layer_path = directory_layer.path.to_owned(); + + if directory_layer_path.len() > new_path.len() { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + + for (i, path) in directory_layer_path.iter().enumerate() { + match new_path.get(i) { + None => return Err(DirectoryError::CannotMoveBetweenPartition), + Some(new_path_item) => { + if !new_path_item.eq(path) { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + } + } + } + + let mut new_relative_path = vec![]; + new_relative_path.extend_from_slice(&new_path[directory_layer_path.len()..]); + + directory_layer + .move_to( + trx, + self.get_partition_subpath(vec![], Some(directory_layer.clone())), + new_relative_path.to_owned(), + ) + .await + } + + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.inner + .directory_subspace + .move_to(trx, old_path, new_path) + .await + } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove_if_exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.inner.directory_subspace.list(trx, path).await + } +} diff --git a/foundationdb/src/directory/directory_subspace.rs b/foundationdb/src/directory/directory_subspace.rs index 000ca8a3..0a6bdcce 100644 --- a/foundationdb/src/directory/directory_subspace.rs +++ b/foundationdb/src/directory/directory_subspace.rs @@ -1,14 +1,252 @@ -use crate::tuple::Subspace; -use crate::{Directory, DirectoryError}; -use std::result; +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. -pub type DirectorySubspaceResult = result::Result; +//! The resulting Subspace generated with a Directory +use crate::directory::directory_layer::DirectoryLayer; +use crate::directory::error::DirectoryError; +use crate::directory::{Directory, DirectoryOutput}; +use crate::tuple::{PackResult, Subspace, TuplePack, TupleUnpack}; +use crate::Transaction; +use async_trait::async_trait; + +/// A `DirectorySubspace` represents the contents of a directory, but it also remembers +/// the path with which it was opened and offers convenience methods to operate on the directory at that path. +/// An instance of `DirectorySubspace` can be used for all the usual subspace operations. +/// It can also be used to operate on the directory with which it was opened. +#[derive(Debug, Clone)] pub struct DirectorySubspace { + pub(crate) directory_layer: DirectoryLayer, subspace: Subspace, - dl: Directory, path: Vec, - layer: Vec + layer: Vec, +} + +impl DirectorySubspace { + pub fn new( + path: Vec, + prefix: Vec, + directory_layer: &DirectoryLayer, + layer: Vec, + ) -> Self { + DirectorySubspace { + directory_layer: directory_layer.clone(), + subspace: Subspace::from_bytes(&prefix), + path, + layer, + } + } + + // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectorySubspace.cpp#L105 + fn get_partition_subpath( + &self, + path: Vec, + directory_layer: Option, + ) -> Vec { + let mut new_path = vec![]; + + new_path.extend_from_slice( + &self.path[directory_layer + .unwrap_or(self.directory_layer.clone()) + .path + .len()..], + ); + new_path.extend_from_slice(&path); + + new_path + } } -// TODO: impl { .. } \ No newline at end of file +impl DirectorySubspace { + pub fn subspace(&self, t: &T) -> Subspace { + self.subspace.subspace(t) + } + + pub fn bytes(&self) -> &[u8] { + self.subspace.bytes() + } + + pub fn pack(&self, t: &T) -> Vec { + self.subspace.pack(t) + } + + pub fn unpack<'de, T: TupleUnpack<'de>>(&self, key: &'de [u8]) -> PackResult { + self.subspace.unpack(key) + } + + pub fn range(&self) -> (Vec, Vec) { + self.subspace.range() + } + + pub fn get_path(&self) -> Vec { + self.path.clone() + } + + pub fn set_path(&mut self, path: Vec) { + self.path = path; + } + + pub fn get_layer(&self) -> Vec { + self.layer.clone() + } + + pub fn is_start_of(&self, key: &[u8]) -> bool { + self.subspace.is_start_of(&key) + } + + fn get_directory_layer_for_path(&self, _: &Vec) -> DirectoryLayer { + self.directory_layer.clone() + } +} + +#[async_trait] +impl Directory for DirectorySubspace { + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.directory_layer + .create_or_open( + txn, + self.get_partition_subpath(path.to_owned(), None), + prefix, + layer, + ) + .await + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + self.directory_layer + .create( + txn, + self.get_partition_subpath(path.to_owned(), None), + prefix, + layer, + ) + .await + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + self.directory_layer + .open( + txn, + self.get_partition_subpath(path.to_owned(), None), + layer, + ) + .await + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + + directory_layer + .exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&vec![]); + let directory_layer_path = directory_layer.path.to_owned(); + + if directory_layer_path.len() > new_path.len() { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + + for (i, path) in directory_layer_path.iter().enumerate() { + match new_path.get(i) { + None => return Err(DirectoryError::CannotMoveBetweenPartition), + Some(new_path_item) => { + if !new_path_item.eq(path) { + return Err(DirectoryError::CannotMoveBetweenPartition); + } + } + } + } + + let mut new_relative_path = vec![]; + new_relative_path.extend_from_slice(&new_path[directory_layer_path.len()..]); + + directory_layer + .move_to( + trx, + self.get_partition_subpath(vec![], Some(directory_layer.clone())), + new_relative_path.to_owned(), + ) + .await + } + + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { + self.directory_layer + .move_to( + trx, + self.get_partition_subpath(old_path, None), + self.get_partition_subpath(new_path, None), + ) + .await + } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + let directory_layer = self.get_directory_layer_for_path(&path); + directory_layer + .remove_if_exists( + trx, + self.get_partition_subpath(path.to_owned(), Some(directory_layer.clone())), + ) + .await + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + self.directory_layer + .list(trx, self.get_partition_subpath(path.to_owned(), None)) + .await + } +} diff --git a/foundationdb/src/directory/error.rs b/foundationdb/src/directory/error.rs new file mode 100644 index 00000000..c81582c9 --- /dev/null +++ b/foundationdb/src/directory/error.rs @@ -0,0 +1,73 @@ +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! Errors that can be thrown by Directory. + +use crate::error; +use crate::tuple::hca::HcaError; +use crate::tuple::PackError; +use std::io; + +/// The enumeration holding all possible errors from a Directory. +#[derive(Debug)] +pub enum DirectoryError { + /// cannot modify the root directory + CannotModifyRootDirectory, + /// prefix is already used + DirectoryPrefixInUse, + /// Directory does not exists + DirectoryDoesNotExists, + /// missing path. + NoPathProvided, + /// tried to create an already existing path. + DirAlreadyExists, + /// missing directory. + PathDoesNotExists, + /// Parent does not exists + ParentDirDoesNotExists, + /// the layer is incompatible. + IncompatibleLayer, + /// the destination directory cannot be a subdirectory of the source directory. + BadDestinationDirectory, + /// Bad directory version. + Version(String), + /// cannot specify a prefix unless manual prefixes are enabled + PrefixNotAllowed, + /// cannot specify a prefix in a partition. + CannotPrefixInPartition, + /// the root directory cannot be moved + CannotMoveRootDirectory, + CannotMoveBetweenPartition, + /// the destination directory cannot be a subdirectory of the source directory + CannotMoveBetweenSubdirectory, + /// Prefix is not empty + PrefixNotEmpty, + IoError(io::Error), + FdbError(error::FdbError), + HcaError(HcaError), + PackError(PackError), + Other(String), +} + +impl From for DirectoryError { + fn from(err: error::FdbError) -> Self { + DirectoryError::FdbError(err) + } +} + +impl From for DirectoryError { + fn from(err: HcaError) -> Self { + DirectoryError::HcaError(err) + } +} + +impl From for DirectoryError { + fn from(err: PackError) -> Self { + DirectoryError::PackError(err) + } +} diff --git a/foundationdb/src/directory/mod.rs b/foundationdb/src/directory/mod.rs index 1c4d03f1..fcee4d29 100644 --- a/foundationdb/src/directory/mod.rs +++ b/foundationdb/src/directory/mod.rs @@ -1,48 +1,372 @@ -pub mod directory; -pub mod node; +// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors +// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors. +// +// Licensed under the Apache License, Version 2.0, or the MIT license , at your option. This file may not be +// copied, modified, or distributed except according to those terms. + +//! Directory provides a tool for managing related subspaces. +//! +//! The FoundationDB API provides directories as a tool for managing related Subspaces. +//! For general guidance on directory usage, see the discussion in the [Developer Guide](https://apple.github.io/foundationdb/developer-guide.html#directories). +//! +//! Directories are identified by hierarchical paths analogous to the paths in a Unix-like file system. +//! A path is represented as a slice of strings. Each directory has an associated subspace used to +//! store its content. The directory layer maps each path to a short prefix used for the +//! corresponding subspace. In effect, directories provide a level of indirection for access to subspaces. +//! Directory operations are transactional. +//! +//! It is a direct backport of the [Flow implementation](https://github.com/apple/foundationdb/tree/master/bindings/flow). +//! +//! Examples: +//! +//! ```rust +//! use futures::prelude::*; +//! use foundationdb::directory::Directory; +//! +//! async fn async_main() -> foundationdb::FdbResult<()> { +//! let db = foundationdb::Database::default()?; +//! +//! // creates a transaction +//! let trx = db.create_trx()?; +//! +//! // creates a directory +//! let directory = foundationdb::directory::directory_layer::DirectoryLayer::default(); +//! +//! // use the directory to create a subspace to use +//! let content_subspace = directory.create_or_open( +//! // the transaction used to read/write the directory. +//! &trx, +//! // the path used, which can view as a UNIX path like `/app/my-app`. +//! vec![String::from("my-awesome-app"), String::from("my-awesome-user")], +//! // do not use any custom prefix or layer +//! None, None, +//! ).await; +//! assert_eq!(true, content_subspace.is_ok()); +//! +//! // Don't forget to commit your transaction to persist the subspace +//! trx.commit().await?; +//! +//! Ok(()) +//! } +//! +//! // Safe because drop is called before the program exits +//! let network = unsafe { foundationdb::boot() }; +//! futures::executor::block_on(async_main()).expect("failed to run"); +//! drop(network); +//! ``` +pub mod directory_layer; +pub mod directory_partition; pub mod directory_subspace; +pub mod error; +pub(crate) mod node; + +use crate::directory::directory_subspace::DirectorySubspace; +use crate::directory::error::DirectoryError; +use async_trait::async_trait; + +use crate::Transaction; + +use crate::directory::directory_partition::DirectoryPartition; +use crate::tuple::{PackResult, Subspace, TuplePack, TupleUnpack}; +use core::cmp; +use std::cmp::Ordering; + +/// `Directory` represents a subspace of keys in a FoundationDB database, identified by a hierarchical path. +#[async_trait] +pub trait Directory { + /// Creates or opens the subdirectory of this Directory located at path (creating parent directories, if necessary). + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result; + + /// Creates a subdirectory of this Directory located at path (creating parent directories if necessary). + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result; + + /// Opens the subdirectory of this Directory located at path. + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result; + + /// Checks if the subdirectory of this Directory located at path exists. + async fn exists(&self, trx: &Transaction, path: Vec) -> Result; + + /// Moves this Directory to the specified newAbsolutePath. + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result; + + /// Moves the subdirectory of this Directory located at oldpath to newpath. + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result; + + /// Removes the subdirectory of this Directory located at path and all of its subdirectories, as well as all of their contents. + async fn remove(&self, trx: &Transaction, path: Vec) -> Result; -pub use directory::*; -use std::io; -use std::fmt::{self, Display}; -use crate::error; - - -#[derive(Debug)] -pub enum DirectoryError { - CannotOpenRoot, - LayerMismatch, - NotExist, - Message(String), - Version(String), - IoError(io::Error), - FdbError(error::FdbError) + /// Removes the subdirectory of this Directory located at path (if the path exists) and all of its subdirectories, as well as all of their contents. + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result; + + /// List the subdirectories of this directory at a given subpath. + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError>; } -impl From for DirectoryError { - fn from(err: io::Error) -> Self { - DirectoryError::IoError(err) +pub(crate) fn compare_slice(a: &[T], b: &[T]) -> cmp::Ordering { + for (ai, bi) in a.iter().zip(b.iter()) { + match ai.cmp(&bi) { + Ordering::Equal => continue, + ord => return ord, + } } + + // if every single element was equal, compare length + a.len().cmp(&b.len()) +} + +/// DirectoryOutput represents the different output of a Directory. +#[derive(Clone, Debug)] +pub enum DirectoryOutput { + /// Under classic usage, you will obtain an `DirectorySubspace` + DirectorySubspace(DirectorySubspace), + /// You can open an `DirectoryPartition` by using the "partition" layer + DirectoryPartition(DirectoryPartition), } -impl From for DirectoryError { - fn from(err: error::FdbError) -> Self { - DirectoryError::FdbError(err) +// TODO: should we have a Subspace trait? +impl DirectoryOutput { + pub fn subspace(&self, t: &T) -> Subspace { + match self { + DirectoryOutput::DirectorySubspace(d) => d.subspace(t), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot open subspace in the root of a directory partition") + } + } + } + + pub fn bytes(&self) -> &[u8] { + match self { + DirectoryOutput::DirectorySubspace(d) => d.bytes(), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot get key for the root of a directory partition") + } + } + } + + pub fn pack(&self, t: &T) -> Vec { + match self { + DirectoryOutput::DirectorySubspace(d) => d.pack(t), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot pack for the root of a directory partition") + } + } + } + + pub fn unpack<'de, T: TupleUnpack<'de>>(&self, key: &'de [u8]) -> PackResult { + match self { + DirectoryOutput::DirectorySubspace(d) => d.unpack(key), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot unpack keys using the root of a directory partition") + } + } + } + + pub fn range(&self) -> (Vec, Vec) { + match self { + DirectoryOutput::DirectorySubspace(d) => d.range(), + DirectoryOutput::DirectoryPartition(_) => { + panic!("cannot get range for the root of a directory partition") + } + } + } + + pub fn get_path(&self) -> Vec { + match self { + DirectoryOutput::DirectorySubspace(d) => d.get_path(), + DirectoryOutput::DirectoryPartition(d) => d.get_path(), + } + } + + pub fn get_layer(&self) -> Vec { + match self { + DirectoryOutput::DirectorySubspace(d) => d.get_layer(), + DirectoryOutput::DirectoryPartition(d) => d.get_layer(), + } } } -impl Display for DirectoryError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { +#[async_trait] +impl Directory for DirectoryOutput { + async fn create_or_open( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => { + d.create_or_open(txn, path, prefix, layer).await + } + DirectoryOutput::DirectoryPartition(d) => { + d.create_or_open(txn, path, prefix, layer).await + } + } + } + + async fn create( + &self, + txn: &Transaction, + path: Vec, + prefix: Option>, + layer: Option>, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.create(txn, path, prefix, layer).await, + DirectoryOutput::DirectoryPartition(d) => d.create(txn, path, prefix, layer).await, + } + } + + async fn open( + &self, + txn: &Transaction, + path: Vec, + layer: Option>, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.open(txn, path, layer).await, + DirectoryOutput::DirectoryPartition(d) => d.open(txn, path, layer).await, + } + } + + async fn exists(&self, trx: &Transaction, path: Vec) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.exists(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.exists(trx, path).await, + } + } + + async fn move_directory( + &self, + trx: &Transaction, + new_path: Vec, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.move_directory(trx, new_path).await, + DirectoryOutput::DirectoryPartition(d) => d.move_directory(trx, new_path).await, + } + } + + async fn move_to( + &self, + trx: &Transaction, + old_path: Vec, + new_path: Vec, + ) -> Result { match self { - DirectoryError::CannotOpenRoot => write!(f, "Cannot open root directory"), - DirectoryError::LayerMismatch => write!(f, "Layer mismatch"), - DirectoryError::NotExist => write!(f, "Directory does not exist"), - DirectoryError::Version(s) => s.fmt(f), - DirectoryError::Message(s) => s.fmt(f), - DirectoryError::IoError(err) => err.fmt(f), - DirectoryError::FdbError(err) => err.fmt(f), + DirectoryOutput::DirectorySubspace(d) => d.move_to(trx, old_path, new_path).await, + DirectoryOutput::DirectoryPartition(d) => d.move_to(trx, old_path, new_path).await, } } + + async fn remove(&self, trx: &Transaction, path: Vec) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.remove(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.remove(trx, path).await, + } + } + + async fn remove_if_exists( + &self, + trx: &Transaction, + path: Vec, + ) -> Result { + match self { + DirectoryOutput::DirectorySubspace(d) => d.remove_if_exists(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.remove_if_exists(trx, path).await, + } + } + + async fn list( + &self, + trx: &Transaction, + path: Vec, + ) -> Result, DirectoryError> { + match self { + DirectoryOutput::DirectorySubspace(d) => d.list(trx, path).await, + DirectoryOutput::DirectoryPartition(d) => d.list(trx, path).await, + } + } +} + +// Strinc returns the first key that would sort outside the range prefixed by prefix. +pub(crate) fn strinc(key: Vec) -> Vec { + let mut key = key; + + for i in (0..key.len()).rev() { + if key[i] != 0xff { + key[i] += 1; + return key; + } else { + // stripping key from trailing 0xFF bytes + key.remove(i); + } + } + panic!("failed to strinc"); } +#[cfg(test)] +mod tests { + use super::*; + + // https://github.com/apple/foundationdb/blob/e34df983ee8c0db333babf36fb620318d026553d/bindings/c/test/unit/unit_tests.cpp#L95 + #[test] + fn test_strinc() { + assert_eq!(strinc(Vec::from("a".as_bytes())), Vec::from("b".as_bytes())); + assert_eq!(strinc(Vec::from("y".as_bytes())), Vec::from("z".as_bytes())); + assert_eq!( + strinc(Vec::from("!".as_bytes())), + Vec::from("\"".as_bytes()) + ); + assert_eq!(strinc(Vec::from("*".as_bytes())), Vec::from("+".as_bytes())); + assert_eq!( + strinc(Vec::from("fdb".as_bytes())), + Vec::from("fdc".as_bytes()) + ); + assert_eq!( + strinc(Vec::from("foundation database 6".as_bytes())), + Vec::from("foundation database 7".as_bytes()) + ); + assert_eq!(strinc(vec![61u8, 62u8, 255u8]), Vec::from(vec![61u8, 63u8])); + // from seed 3180880087 + assert_eq!(strinc(vec![253u8, 255u8]), Vec::from(vec![254u8])); + assert_eq!(strinc(vec![253u8, 255u8, 255u8]), Vec::from(vec![254u8])); + } +} diff --git a/foundationdb/src/directory/node.rs b/foundationdb/src/directory/node.rs index c9135c3e..6843ffa6 100644 --- a/foundationdb/src/directory/node.rs +++ b/foundationdb/src/directory/node.rs @@ -1,65 +1,97 @@ -use crate::tuple::{Subspace}; -use crate::future::FdbSlice; -use crate::{Transaction, Directory, DirectoryResult}; -use crate::directory::directory_subspace::DirectorySubspaceResult; - - -pub struct Node { - subspace: Subspace, - path: Vec, - target_path: Vec, - _layer: Option> +use crate::directory::directory_layer::{ + DirectoryLayer, DEFAULT_SUB_DIRS, LAYER_SUFFIX, PARTITION_LAYER, +}; +use crate::directory::error::DirectoryError; +use crate::directory::DirectoryOutput; +use crate::tuple::Subspace; +use crate::RangeOption; +use crate::Transaction; + +#[derive(Debug, Clone)] +pub(crate) struct Node { + pub(crate) subspace: Option, + pub(crate) current_path: Vec, + pub(crate) target_path: Vec, + pub(crate) layer: Vec, + pub(crate) loaded_metadata: bool, + pub(crate) directory_layer: DirectoryLayer, } impl Node { + // `load_metadata` is loading extra information for the node, like the layer + pub(crate) async fn load_metadata(&mut self, trx: &Transaction) -> Result<(), DirectoryError> { + if !self.exists() { + self.loaded_metadata = true; + return Ok(()); + } - pub fn exists(&self) -> bool { - // if self.subspace == None { - // return false; - // } + let key = self.subspace.as_ref().unwrap().pack(&LAYER_SUFFIX.to_vec()); + self.layer = match trx.get(&key, false).await { + Ok(None) => vec![], + Err(err) => return Err(DirectoryError::FdbError(err)), + Ok(Some(fdb_slice)) => fdb_slice.to_vec(), + }; - true - } + self.loaded_metadata = true; - pub async fn prefetchMetadata(&self, trx: &Transaction) -> &Node { - if self.exists() { - self.layer(trx).await; - } - - return self; + Ok(()) } - pub async fn layer(&mut self, trx: &Transaction) -> &[u8] { - if self._layer == None { - let key = self.subspace.subspace(&b"layer".to_vec()); - self._layer = match trx.get(key.bytes(), false).await { - Ok(None) => Some(Vec::new()), - Err(_) => Some(Vec::new()), - Ok(Some(fv)) => Some(fv.to_vec()) - } - } + pub(crate) fn is_in_partition(&self, include_empty_subpath: bool) -> bool { + assert!(self.loaded_metadata); - return self._layer.unwrap().as_slice() + self.exists() + && self.layer.eq(PARTITION_LAYER) + && (include_empty_subpath || self.target_path.len() > self.current_path.len()) } - pub async fn is_in_partition(&mut self, trx: Transaction, include_empty_subpath: bool) -> bool { - if !self.exists() { - return false - } + pub(crate) fn get_partition_subpath(&self) -> Vec { + Vec::from(&self.target_path[self.current_path.len()..]) + } - self.layer(&trx).await == b"partition" && - (include_empty_subpath || self.target_path.len() > self.path.len()) + pub(crate) fn exists(&self) -> bool { + self.subspace.is_some() } - pub fn get_partition_subpath(&self) -> &[String] { - self.target_path[..self.path.len()].clone() + /// list sub-folders for a node + pub(crate) async fn list_sub_folders( + &self, + trx: &Transaction, + ) -> Result, DirectoryError> { + let mut results = vec![]; + + let range_option = RangeOption::from( + &self + .subspace + .as_ref() + .unwrap() + .to_owned() + .subspace(&(DEFAULT_SUB_DIRS)), + ); + + let fdb_values = trx.get_range(&range_option, 1_024, false).await?; + + for fdb_value in fdb_values { + let subspace = Subspace::from_bytes(fdb_value.key()); + // stripping from subspace + let sub_directory: (i64, String) = + self.subspace.as_ref().unwrap().unpack(subspace.bytes())?; + results.push(sub_directory.1); + } + Ok(results) } - pub async fn get_contents(self, directory: Directory, trx: &Transaction ) -> DirectorySubspaceResult { - directory.contents_of_node(self.subspace, &self.path, &self.layer(trx).await) + pub(crate) fn get_contents(&self) -> Result { + assert!(self.exists()); + assert!(self.loaded_metadata); + + match &self.subspace { + None => unreachable!(), + Some(subspace) => self.directory_layer.contents_of_node( + subspace.to_owned(), + self.current_path.to_owned(), + self.layer.to_owned(), + ), + } } } - - - - diff --git a/foundationdb/src/future.rs b/foundationdb/src/future.rs index 80a0827f..b60647ad 100644 --- a/foundationdb/src/future.rs +++ b/foundationdb/src/future.rs @@ -29,7 +29,6 @@ use std::ops::Deref; use std::os::raw::c_char; use std::pin::Pin; use std::ptr::NonNull; -use std::rc::Rc; use std::sync::Arc; use foundationdb_sys as fdb_sys; @@ -325,7 +324,7 @@ impl IntoIterator for FdbValues { fn into_iter(self) -> Self::IntoIter { FdbValuesIter { - f: Rc::new(self._f), + f: Arc::new(self._f), keyvalues: self.keyvalues, len: self.len, pos: 0, @@ -335,11 +334,14 @@ impl IntoIterator for FdbValues { /// An iterator of keyvalues owned by a foundationDB future pub struct FdbValuesIter { - f: Rc, + f: Arc, keyvalues: *const fdb_sys::FDBKeyValue, len: i32, pos: i32, } + +unsafe impl Send for FdbValuesIter {} + impl Iterator for FdbValuesIter { type Item = FdbValue; fn next(&mut self) -> Option { @@ -404,9 +406,12 @@ impl DoubleEndedIterator for FdbValuesIter { /// Until dropped, this might prevent multiple key/values from beeing freed. /// (i.e. the future that own the data is dropped once all data it provided is dropped) pub struct FdbValue { - _f: Rc, + _f: Arc, keyvalue: *const fdb_sys::FDBKeyValue, } + +unsafe impl Send for FdbValue {} + impl Deref for FdbValue { type Target = FdbKeyValue; fn deref(&self) -> &Self::Target { diff --git a/foundationdb/src/lib.rs b/foundationdb/src/lib.rs index 09c6fa86..f8e4c3af 100644 --- a/foundationdb/src/lib.rs +++ b/foundationdb/src/lib.rs @@ -100,7 +100,7 @@ pub mod api; #[cfg(any(feature = "fdb-5_1", feature = "fdb-5_2", feature = "fdb-6_0"))] pub mod cluster; mod database; -mod directory; +pub mod directory; mod error; pub mod future; mod keyselector; @@ -118,7 +118,6 @@ pub use crate::error::FdbError; pub use crate::error::FdbResult; pub use crate::keyselector::*; pub use crate::transaction::*; -pub use crate::directory::*; /// Initialize the FoundationDB Client API, this can only be called once per process. /// diff --git a/foundationdb/src/tuple/hca.rs b/foundationdb/src/tuple/hca.rs index 59b05e2c..a77963f7 100644 --- a/foundationdb/src/tuple/hca.rs +++ b/foundationdb/src/tuple/hca.rs @@ -91,16 +91,25 @@ impl TransactError for HcaError { /// Represents a High Contention Allocator for a given subspace #[derive(Debug)] pub struct HighContentionAllocator { + // original subspace kept to implement Clone + subspace: Subspace, counters: Subspace, recent: Subspace, allocation_mutex: Mutex<()>, } +impl Clone for HighContentionAllocator { + fn clone(&self) -> Self { + HighContentionAllocator::new(self.subspace.to_owned()) + } +} + impl HighContentionAllocator { /// Constructs an allocator that will use the input subspace for assigning values. /// The given subspace should not be used by anything other than the allocator pub fn new(subspace: Subspace) -> HighContentionAllocator { HighContentionAllocator { + subspace: subspace.clone(), counters: subspace.subspace(&0i64), recent: subspace.subspace(&1i64), allocation_mutex: Mutex::new(()), diff --git a/foundationdb/src/tuple/mod.rs b/foundationdb/src/tuple/mod.rs index 0c66a927..835a018a 100644 --- a/foundationdb/src/tuple/mod.rs +++ b/foundationdb/src/tuple/mod.rs @@ -5,7 +5,7 @@ mod element; pub mod hca; mod pack; -mod subspace; +pub mod subspace; mod versionstamp; use std::borrow::Cow; @@ -274,11 +274,11 @@ mod tests { // versionstamp test_serde( - Versionstamp::complete(b"\xaa\xbb\xcc\xdd\xee\xff\x00\x01\x02\x03".clone(), 0), + Versionstamp::complete(*b"\xaa\xbb\xcc\xdd\xee\xff\x00\x01\x02\x03", 0), b"\x33\xaa\xbb\xcc\xdd\xee\xff\x00\x01\x02\x03\x00\x00", ); test_serde( - Versionstamp::complete(b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), 657), + Versionstamp::complete(*b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", ); @@ -627,21 +627,21 @@ mod tests { test_serde(Element::Int(-1), &[0x13, 254]); test_serde( Element::Versionstamp(Versionstamp::complete( - b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), + *b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657, )), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", ); test_serde( (Element::Versionstamp(Versionstamp::complete( - b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), + *b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657, )),), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", ); test_serde( (Element::Versionstamp(Versionstamp::complete( - b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a".clone(), + *b"\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a", 657, )),), b"\x33\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x02\x91", diff --git a/foundationdb/tests/directory.rs b/foundationdb/tests/directory.rs index e9193222..1b845dea 100644 --- a/foundationdb/tests/directory.rs +++ b/foundationdb/tests/directory.rs @@ -5,24 +5,71 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. +use foundationdb::directory::directory_layer::DirectoryLayer; + +use foundationdb::directory::Directory; + use foundationdb::*; -use futures::future; -use futures::prelude::*; mod common; -async fn test_create_or_open_async() -> FdbResult<()> { - let db = common::database().await?; - let trx = db.create_trx()?; - let out = Directory::create_or_open(trx); - assert!(out); +#[test] +// testing basic features of the Directory, everything is tracked using with the BindingTester. +fn test_directory() { + let _guard = unsafe { foundationdb::boot() }; + let db = futures::executor::block_on(common::database()).expect("cannot open fdb"); - Ok(()) -} + eprintln!("clearing all keys"); + let trx = db.create_trx().expect("cannot create txn"); + trx.clear_range(b"", b"\xff"); + futures::executor::block_on(trx.commit()).expect("could not clear keys"); -#[test] -fn test_create_or_open() { - common::boot(); - futures::executor::block_on(test_create_or_open_async()).expect("failed to run"); + eprintln!("creating directories"); + let directory = DirectoryLayer::default(); + + futures::executor::block_on(test_create_then_open_then_delete( + &db, + &directory, + vec![String::from("application")], + )) + .expect("failed to run"); + + futures::executor::block_on(test_create_then_open_then_delete( + &db, + &directory, + vec![String::from("1"), String::from("2")], + )) + .expect("failed to run"); } +async fn test_create_then_open_then_delete( + db: &Database, + directory: &DirectoryLayer, + paths: Vec, +) -> FdbResult<()> { + let trx = db.create_trx()?; + + eprintln!("creating {:?}", &paths); + let create_output = directory.create(&trx, paths.to_owned(), None, None).await; + assert!( + create_output.is_ok(), + "cannot create: {:?}", + create_output.err().unwrap() + ); + trx.commit().await.expect("cannot commit"); + let trx = db.create_trx()?; + + eprintln!("opening {:?}", &paths); + let open_output = directory.open(&trx, paths.to_owned(), None).await; + assert!( + open_output.is_ok(), + "cannot create: {:?}", + open_output.err().unwrap() + ); + + assert_eq!(create_output.unwrap().bytes(), open_output.unwrap().bytes()); + trx.commit().await.expect("cannot commit"); + + // removing folder + Ok(()) +} diff --git a/foundationdb/tests/range.rs b/foundationdb/tests/range.rs index 884ddafe..2af5d354 100644 --- a/foundationdb/tests/range.rs +++ b/foundationdb/tests/range.rs @@ -51,8 +51,8 @@ async fn test_get_range_async() -> FdbResult<()> { let len = range.len(); let mut i = 0; for kv in &range { - assert!(kv.key().len() > 0); - assert!(kv.value().len() > 0); + assert!(!kv.key().is_empty()); + assert!(!kv.value().is_empty()); i += 1; } assert_eq!(i, len); diff --git a/foundationdb/tests/tokio.rs b/foundationdb/tests/tokio.rs index 0f547c37..551f95ba 100644 --- a/foundationdb/tests/tokio.rs +++ b/foundationdb/tests/tokio.rs @@ -22,7 +22,7 @@ async fn do_transact() { .expect("failed to open fdb"), ); - let adb = db.clone(); + let adb = db; tokio::spawn(async move { async fn txnfn(_txn: &Transaction) -> FdbResult<()> { Ok(()) @@ -45,7 +45,7 @@ async fn do_trx() { .expect("failed to open fdb"), ); - let adb = db.clone(); + let adb = db; tokio::spawn(async move { adb.create_trx() .expect("failed to create trx") diff --git a/foundationdb/tests/watch.rs b/foundationdb/tests/watch.rs index d44a94dd..c4a982f4 100644 --- a/foundationdb/tests/watch.rs +++ b/foundationdb/tests/watch.rs @@ -17,7 +17,7 @@ fn test_watch() { } async fn test_watch_async() -> FdbResult<()> { - const KEY: &'static [u8] = b"test-watch"; + const KEY: &[u8] = b"test-watch"; let db = common::database().await?; @@ -40,7 +40,7 @@ async fn test_watch_async() -> FdbResult<()> { } async fn test_watch_without_commit_async() -> FdbResult<()> { - const KEY: &'static [u8] = b"test-watch-2"; + const KEY: &[u8] = b"test-watch-2"; let db = common::database().await?; diff --git a/scripts/run_bindingtester.sh b/scripts/run_bindingtester.sh index b0ff3085..ab729890 100755 --- a/scripts/run_bindingtester.sh +++ b/scripts/run_bindingtester.sh @@ -6,7 +6,7 @@ fdb_rs_dir=$(pwd) bindingtester="${fdb_rs_dir:?}/$1" case $(uname) in Darwin) -# brew install mono + brew install mono ;; Linux) sudo apt update @@ -22,7 +22,7 @@ esac cd ${fdb_builddir:?} ## Get foundationdb source -# git clone --depth 1 https://github.com/apple/foundationdb.git -b release-6.1 + git clone --depth 1 https://github.com/apple/foundationdb.git -b release-6.1 cd foundationdb git checkout release-6.1 @@ -32,7 +32,8 @@ esac ## Run the test echo "testers['rust'] = Tester('rust', '${bindingtester}', 2040, 23, MAX_API_VERSION, types=ALL_TYPES) " >> ./bindings/bindingtester/known_testers.py - ./bindings/bindingtester/bindingtester.py --test-name scripted rust - ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --compare python rust - ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --concurrency 5 rust + python2 ./bindings/bindingtester/bindingtester.py --test-name scripted rust + python2 ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --compare python rust + python2 ./bindings/bindingtester/bindingtester.py --num-ops 1000 --api-version 610 --test-name api --concurrency 5 rust + python2 ./bindings/bindingtester/bindingtester.py --num-ops 10000 --api-version 610 --test-name directory --concurrency 1 rust --no-directory-snapshot-ops --compare ) From a92fd9c9c92d57970f7a824fd66962f53f23573d Mon Sep 17 00:00:00 2001 From: Pierre Zemb Date: Sat, 26 Jun 2021 14:18:34 +0200 Subject: [PATCH 4/4] cargo clippy --- foundationdb-bindingtester/src/main.rs | 7 +-- foundationdb/examples/class-scheduling.rs | 12 ++-- foundationdb/src/api.rs | 7 +-- foundationdb/src/directory/directory_layer.rs | 14 ++--- .../src/directory/directory_partition.rs | 6 +- .../src/directory/directory_subspace.rs | 6 +- foundationdb/src/directory/mod.rs | 6 +- foundationdb/src/tuple/mod.rs | 60 ++++++++----------- foundationdb/src/tuple/pack.rs | 4 +- foundationdb/tests/hca.rs | 8 +-- foundationdb/tests/range.rs | 1 + 11 files changed, 58 insertions(+), 73 deletions(-) diff --git a/foundationdb-bindingtester/src/main.rs b/foundationdb-bindingtester/src/main.rs index 23219fca..6afeed32 100644 --- a/foundationdb-bindingtester/src/main.rs +++ b/foundationdb-bindingtester/src/main.rs @@ -1905,16 +1905,15 @@ impl StackMachine { self.directory_index ); } - Some(d) => match d { - DirectoryStackItem::Null => { + Some(d) => { + if let DirectoryStackItem::Null = d { self.directory_index = self.error_index; debug!( "setting directory_index to error index {}: because it is Null", self.directory_index ); } - _ => {} - }, + } } } diff --git a/foundationdb/examples/class-scheduling.rs b/foundationdb/examples/class-scheduling.rs index 1f412849..838c80b5 100644 --- a/foundationdb/examples/class-scheduling.rs +++ b/foundationdb/examples/class-scheduling.rs @@ -22,21 +22,21 @@ use foundationdb::{Database, FdbError, RangeOption, TransactError, TransactOptio type Result = std::result::Result; enum Error { - FdbError(FdbError), + Fdb(FdbError), NoRemainingSeats, TooManyClasses, } impl From for Error { fn from(err: FdbError) -> Self { - Error::FdbError(err) + Error::Fdb(err) } } impl TransactError for Error { fn try_into_fdb_error(self) -> std::result::Result { match self { - Error::FdbError(err) => Ok(err), + Error::Fdb(err) => Ok(err), _ => Err(self), } } @@ -223,10 +223,10 @@ async fn switch_classes( trx: &Transaction, student_id: &str, old_class: &str, - new_class: &str, + _new_class: &str, ) -> Result<()> { - ditch_trx(trx, student_id.clone(), old_class.clone()).await; - signup_trx(trx, student_id.clone(), new_class.clone()).await?; + ditch_trx(trx, <&str>::clone(&student_id), <&str>::clone(&old_class)).await; + signup_trx(trx, <&str>::clone(&student_id), <&str>::clone(&old_class)).await?; Ok(()) } diff --git a/foundationdb/src/api.rs b/foundationdb/src/api.rs index 93290756..1347ec7f 100644 --- a/foundationdb/src/api.rs +++ b/foundationdb/src/api.rs @@ -13,7 +13,6 @@ //! - [API versioning](https://apple.github.io/foundationdb/api-c.html#api-versioning) //! - [Network](https://apple.github.io/foundationdb/api-c.html#network) -use std::panic; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -61,9 +60,9 @@ impl FdbApiBuilder { /// /// This function will panic if called more than once pub fn build(self) -> FdbResult { - if VERSION_SELECTED.compare_and_swap(false, true, Ordering::AcqRel) { - panic!("the fdb select api version can only be run once per process"); - } + VERSION_SELECTED + .compare_exchange_weak(false, true, Ordering::AcqRel, Ordering::Acquire) + .expect("the fdb select api version can only be run once per process"); error::eval(unsafe { fdb_sys::fdb_select_api_version_impl( self.runtime_version, diff --git a/foundationdb/src/directory/directory_layer.rs b/foundationdb/src/directory/directory_layer.rs index 241e008b..43295880 100644 --- a/foundationdb/src/directory/directory_layer.rs +++ b/foundationdb/src/directory/directory_layer.rs @@ -413,14 +413,12 @@ impl DirectoryLayer { Some(fdb_key_value) => { let previous_prefix: Vec = self.node_subspace.unpack(fdb_key_value.key())?; - match previous_prefix.get(0) { - Some(Element::Bytes(b)) => { - let previous_prefix = b.to_vec(); - if key.starts_with(&previous_prefix) { - return Ok(Some(self.node_with_prefix(&previous_prefix))); - }; - } - _ => {} + + if let Some(Element::Bytes(b)) = previous_prefix.get(0) { + let previous_prefix = b.to_vec(); + if key.starts_with(&previous_prefix) { + return Ok(Some(self.node_with_prefix(&previous_prefix))); + }; }; } } diff --git a/foundationdb/src/directory/directory_partition.rs b/foundationdb/src/directory/directory_partition.rs index d253c3f4..37741635 100644 --- a/foundationdb/src/directory/directory_partition.rs +++ b/foundationdb/src/directory/directory_partition.rs @@ -83,7 +83,7 @@ impl DirectoryPartition { self.inner.directory_subspace.get_path() } - fn get_directory_layer_for_path(&self, path: &Vec) -> DirectoryLayer { + fn get_directory_layer_for_path(&self, path: &[String]) -> DirectoryLayer { if path.is_empty() { self.parent_directory_layer.clone() } else { @@ -100,7 +100,7 @@ impl DirectoryPartition { new_path.extend_from_slice( &self.directory_subspace.get_path()[directory_layer - .unwrap_or(self.directory_subspace.directory_layer.clone()) + .unwrap_or_else(|| self.directory_subspace.directory_layer.clone()) .path .len()..], ); @@ -167,7 +167,7 @@ impl Directory for DirectoryPartition { trx: &Transaction, new_path: Vec, ) -> Result { - let directory_layer = self.get_directory_layer_for_path(&vec![]); + let directory_layer = self.get_directory_layer_for_path(&[]); let directory_layer_path = directory_layer.path.to_owned(); if directory_layer_path.len() > new_path.len() { diff --git a/foundationdb/src/directory/directory_subspace.rs b/foundationdb/src/directory/directory_subspace.rs index 0a6bdcce..fcf63cce 100644 --- a/foundationdb/src/directory/directory_subspace.rs +++ b/foundationdb/src/directory/directory_subspace.rs @@ -52,7 +52,7 @@ impl DirectorySubspace { new_path.extend_from_slice( &self.path[directory_layer - .unwrap_or(self.directory_layer.clone()) + .unwrap_or_else(|| self.directory_layer.clone()) .path .len()..], ); @@ -99,7 +99,7 @@ impl DirectorySubspace { self.subspace.is_start_of(&key) } - fn get_directory_layer_for_path(&self, _: &Vec) -> DirectoryLayer { + fn get_directory_layer_for_path(&self, _: &[String]) -> DirectoryLayer { self.directory_layer.clone() } } @@ -171,7 +171,7 @@ impl Directory for DirectorySubspace { trx: &Transaction, new_path: Vec, ) -> Result { - let directory_layer = self.get_directory_layer_for_path(&vec![]); + let directory_layer = self.get_directory_layer_for_path(&[]); let directory_layer_path = directory_layer.path.to_owned(); if directory_layer_path.len() > new_path.len() { diff --git a/foundationdb/src/directory/mod.rs b/foundationdb/src/directory/mod.rs index fcee4d29..22c38301 100644 --- a/foundationdb/src/directory/mod.rs +++ b/foundationdb/src/directory/mod.rs @@ -364,9 +364,9 @@ mod tests { Vec::from("foundation database 7".as_bytes()) ); - assert_eq!(strinc(vec![61u8, 62u8, 255u8]), Vec::from(vec![61u8, 63u8])); + assert_eq!(strinc(vec![61u8, 62u8, 255u8]), vec![61u8, 63u8]); // from seed 3180880087 - assert_eq!(strinc(vec![253u8, 255u8]), Vec::from(vec![254u8])); - assert_eq!(strinc(vec![253u8, 255u8, 255u8]), Vec::from(vec![254u8])); + assert_eq!(strinc(vec![253u8, 255u8]), vec![254u8]); + assert_eq!(strinc(vec![253u8, 255u8, 255u8]), vec![254u8]); } } diff --git a/foundationdb/src/tuple/mod.rs b/foundationdb/src/tuple/mod.rs index 835a018a..5a4816be 100644 --- a/foundationdb/src/tuple/mod.rs +++ b/foundationdb/src/tuple/mod.rs @@ -349,67 +349,55 @@ mod tests { test_serde(i64::min_value(), b"\x0C\x7f\xff\xff\xff\xff\xff\xff\xff"); test_serde(9252427359321063944i128, b"\x1c\x80g9\xa9np\x02\x08"); - assert!( - match unpack::(b"\x1c\x80g9\xa9np\x02\x08").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x1c\x80g9\xa9np\x02\x08").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( -9252427359321063944i128, b"\x0c\x7f\x98\xc6V\x91\x8f\xfd\xf7", ); - assert!( - match unpack::(b"\x0c\x7f\x98\xc6V\x91\x8f\xfd\xf7").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x0c\x7f\x98\xc6V\x91\x8f\xfd\xf7").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( u64::max_value() as i128, b"\x1c\xff\xff\xff\xff\xff\xff\xff\xff", ); - assert!( - match unpack::(b"\x1c\xff\xff\xff\xff\xff\xff\xff\xff").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x1c\xff\xff\xff\xff\xff\xff\xff\xff").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( -(u64::max_value() as i128), b"\x0c\x00\x00\x00\x00\x00\x00\x00\x00", ); - assert!( - match unpack::(b"\x0c\x00\x00\x00\x00\x00\x00\x00\x00").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x0c\x00\x00\x00\x00\x00\x00\x00\x00").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( (i64::max_value() as i128) + 1, b"\x1c\x80\x00\x00\x00\x00\x00\x00\x00", ); - assert!( - match unpack::(b"\x1c\x80\x00\x00\x00\x00\x00\x00\x00").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x1c\x80\x00\x00\x00\x00\x00\x00\x00").unwrap_err(), + PackError::UnsupportedIntLength + )); test_serde( (i64::min_value() as i128) - 1, b"\x0c\x7f\xff\xff\xff\xff\xff\xff\xfe", ); - assert!( - match unpack::(b"\x0c\x7f\xff\xff\xff\xff\xff\xff\xfe").unwrap_err() { - PackError::UnsupportedIntLength => true, - _ => false, - } - ); + assert!(matches!( + unpack::(b"\x0c\x7f\xff\xff\xff\xff\xff\xff\xfe").unwrap_err(), + PackError::UnsupportedIntLength + )); } #[cfg(feature = "num-bigint")] diff --git a/foundationdb/src/tuple/pack.rs b/foundationdb/src/tuple/pack.rs index c6975fc1..19a92d1c 100644 --- a/foundationdb/src/tuple/pack.rs +++ b/foundationdb/src/tuple/pack.rs @@ -179,7 +179,7 @@ fn write_bytes(w: &mut W, v: &[u8]) -> io::Result(input: &'de [u8]) -> PackResult<(&'de [u8], Cow<'de, [u8]>)> { +fn parse_slice(input: &[u8]) -> PackResult<(&[u8], Cow<[u8]>)> { let mut bytes = Vec::new(); let mut pos = 0; for idx in memchr_iter(NIL, input) { @@ -203,7 +203,7 @@ fn parse_slice<'de>(input: &'de [u8]) -> PackResult<(&'de [u8], Cow<'de, [u8]>)> Err(PackError::MissingBytes) } -fn parse_string<'de>(input: &'de [u8]) -> PackResult<(&'de [u8], Cow<'de, str>)> { +fn parse_string(input: &[u8]) -> PackResult<(&[u8], Cow)> { let (input, slice) = parse_slice(input)?; Ok(( input, diff --git a/foundationdb/tests/hca.rs b/foundationdb/tests/hca.rs index bf37b7c1..b273910d 100644 --- a/foundationdb/tests/hca.rs +++ b/foundationdb/tests/hca.rs @@ -39,9 +39,9 @@ async fn test_hca_many_sequential_allocations_async() -> FdbResult<()> { let mut all_ints = Vec::new(); for _ in 0..N { - let mut tx = db.create_trx()?; + let tx = db.create_trx()?; - let next_int: i64 = hca.allocate(&mut tx).await.unwrap(); + let next_int: i64 = hca.allocate(&tx).await.unwrap(); all_ints.push(next_int); tx.commit().await?; @@ -84,8 +84,8 @@ async fn test_hca_concurrent_allocations_async() -> FdbResult<()> { Ok(()) } -fn check_hca_result_uniqueness(results: &Vec) { - let result_set: HashSet = HashSet::from_iter(results.clone()); +fn check_hca_result_uniqueness(results: &[i64]) { + let result_set: HashSet = HashSet::from_iter(results.to_owned()); if results.len() != result_set.len() { panic!( diff --git a/foundationdb/tests/range.rs b/foundationdb/tests/range.rs index 2af5d354..b74453a9 100644 --- a/foundationdb/tests/range.rs +++ b/foundationdb/tests/range.rs @@ -20,6 +20,7 @@ fn test_range() { futures::executor::block_on(test_get_ranges_async()).expect("failed to run"); } +#[allow(clippy::needless_collect)] async fn test_get_range_async() -> FdbResult<()> { const N: usize = 10000;