
Rewrite block index #35

Merged: 14 commits, May 9, 2024
70 changes: 48 additions & 22 deletions benches/lsmt.rs
@@ -47,34 +47,62 @@ fn memtable_get_upper_bound(c: &mut Criterion) {
}

fn tli_find_item(c: &mut Criterion) {
use lsm_tree::segment::block_index::top_level::{BlockHandleBlockHandle, TopLevelIndex};
use lsm_tree::segment::block_index::{
block_handle::KeyedBlockHandle, top_level::TopLevelIndex,
};

let mut group = c.benchmark_group("TLI find item");

for item_count in [10u64, 100, 1_000, 10_000, 100_000, 1_000_000] {
let tree = {
let mut tree = std::collections::BTreeMap::new();
let items = {
let mut items = Vec::with_capacity(item_count as usize);

for x in 0..item_count {
tree.insert(
x.to_be_bytes().into(),
BlockHandleBlockHandle { offset: 0, size: 0 },
);
items.push(KeyedBlockHandle {
start_key: x.to_be_bytes().into(),
offset: x,
size: 0,
});
}

tree
items
};

let index = TopLevelIndex::from_tree(tree);
let index = TopLevelIndex::from_boxed_slice(items.into());

group.bench_function(format!("TLI find ({item_count} items)"), |b| {
let key = (item_count / 10 * 6).to_be_bytes();
let expected: Arc<[u8]> = (item_count / 10 * 6 + 1).to_be_bytes().into();
group.bench_function(
format!("TLI get_next_block_handle ({item_count} items)"),
|b| {
let key = (item_count / 10 * 6).to_be_bytes();
let expected: Arc<[u8]> = (item_count / 10 * 6 + 1).to_be_bytes().into();

b.iter(|| {
assert_eq!(&expected, index.get_next_block_handle(&key).unwrap().0);
})
});
let block = index.get_lowest_block_containing_item(&key).unwrap();

b.iter(|| {
assert_eq!(
expected,
index.get_next_block_handle(block.offset).unwrap().start_key
);
})
},
);

group.bench_function(
format!("TLI get_block_containing_item ({item_count} items)"),
|b| {
let key = (item_count / 10 * 6).to_be_bytes();

b.iter(|| {
assert_eq!(
key,
&*index
.get_lowest_block_containing_item(&key)
.unwrap()
.start_key
);
})
},
);
}
}
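
For context: this rewrite replaces the old `BTreeMap`-backed top-level index with a sorted boxed slice of `KeyedBlockHandle`s, so the benchmarked lookups become searches over a flat slice. Below is a minimal, self-contained sketch of the two lookups exercised above, assuming a stand-in struct and a `partition_point` binary search; field types and function bodies are illustrative, not the crate's actual implementation.

```rust
/// Stand-in for lsm_tree's KeyedBlockHandle (field types are assumptions).
#[derive(Debug)]
struct KeyedBlockHandle {
    start_key: Box<[u8]>,
    offset: u64,
    size: u32,
}

/// Lowest block that could contain `key`: the last handle whose
/// start_key is <= key, found via binary search on the sorted slice.
fn get_lowest_block_containing_item<'a>(
    handles: &'a [KeyedBlockHandle],
    key: &[u8],
) -> Option<&'a KeyedBlockHandle> {
    let idx = handles.partition_point(|h| &*h.start_key <= key);
    idx.checked_sub(1).map(|i| &handles[i])
}

/// Handle of the block that follows the block at `offset`, mirroring
/// the offset-based get_next_block_handle call in the bench.
fn get_next_block_handle(
    handles: &[KeyedBlockHandle],
    offset: u64,
) -> Option<&KeyedBlockHandle> {
    let idx = handles.iter().position(|h| h.offset == offset)?;
    handles.get(idx + 1)
}

fn main() {
    let handles: Vec<KeyedBlockHandle> = (0u64..1_000)
        .map(|x| KeyedBlockHandle {
            // Big-endian encoding makes byte order match numeric order.
            start_key: x.to_be_bytes().into(),
            offset: x,
            size: 0,
        })
        .collect();

    let key = 600u64.to_be_bytes();
    let block = get_lowest_block_containing_item(&handles, &key).unwrap();
    assert_eq!(&*block.start_key, key.as_slice());

    let next = get_next_block_handle(&handles, block.offset).unwrap();
    assert_eq!(&*next.start_key, 601u64.to_be_bytes().as_slice());
}
```
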

@@ -104,27 +132,25 @@ fn value_block_size(c: &mut Criterion) {
}

fn value_block_size_find(c: &mut Criterion) {
use lsm_tree::segment::{
block_index::block_handle::BlockHandle, block_index::BlockHandleBlock,
};
use lsm_tree::segment::block_index::{block_handle::KeyedBlockHandle, IndexBlock};

let mut group = c.benchmark_group("Find item in BlockHandleBlock");

// NOTE: Anything above 1000 is unlikely
for item_count in [10, 100, 500, 1_000] {
group.bench_function(format!("{item_count} items"), |b| {
let items = (0u64..item_count)
.map(|x| BlockHandle {
.map(|x| KeyedBlockHandle {
start_key: x.to_be_bytes().into(),
offset: 56,
size: 635,
})
.collect();

let block = BlockHandleBlock { items, crc: 0 };
let block = IndexBlock { items, crc: 0 };
let key = &0u64.to_be_bytes();

b.iter(|| block.get_block_containing_item(key))
b.iter(|| block.get_lowest_block_containing_item(key))
});
}
}
55 changes: 26 additions & 29 deletions src/block_cache.rs
@@ -1,13 +1,10 @@
use crate::segment::block_index::block_handle::BlockHandle;
use crate::segment::id::GlobalSegmentId;
use crate::segment::{block::ValueBlock, block_index::BlockHandleBlock};
use crate::{
either::{
Either,
Either::{Left, Right},
},
value::UserKey,
use crate::either::{
Either,
Either::{Left, Right},
};
use crate::segment::block_index::block_handle::KeyedBlockHandle;
use crate::segment::id::GlobalSegmentId;
use crate::segment::{block::ValueBlock, block_index::IndexBlock};
use quick_cache::Weighter;
use quick_cache::{sync::Cache, Equivalent};
use std::sync::Arc;
@@ -18,27 +15,27 @@ enum BlockTag {
Index = 1,
}

type Item = Either<Arc<ValueBlock>, Arc<BlockHandleBlock>>;
type Item = Either<Arc<ValueBlock>, Arc<IndexBlock>>;

// (Type (disk or index), Segment ID, Block key)
// (Type (disk or index), Segment ID, Block offset)
#[derive(Eq, std::hash::Hash, PartialEq)]
struct CacheKey((BlockTag, GlobalSegmentId, UserKey));
struct CacheKey((BlockTag, GlobalSegmentId, u64));

impl From<(BlockTag, GlobalSegmentId, UserKey)> for CacheKey {
fn from(value: (BlockTag, GlobalSegmentId, UserKey)) -> Self {
impl From<(BlockTag, GlobalSegmentId, u64)> for CacheKey {
fn from(value: (BlockTag, GlobalSegmentId, u64)) -> Self {
Self(value)
}
}

impl std::ops::Deref for CacheKey {
type Target = (BlockTag, GlobalSegmentId, UserKey);
type Target = (BlockTag, GlobalSegmentId, u64);

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Equivalent<CacheKey> for (BlockTag, GlobalSegmentId, &UserKey) {
impl Equivalent<CacheKey> for (BlockTag, GlobalSegmentId, &u64) {
fn equivalent(&self, key: &CacheKey) -> bool {
let inner = &**key;
self.0 == inner.0 && self.1 == inner.1 && self.2 == &inner.2
@@ -57,7 +54,7 @@ impl Weighter<CacheKey, Item> for BlockWeighter {
Either::Right(block) => block
.items
.iter()
.map(|x| x.start_key.len() + std::mem::size_of::<BlockHandle>())
.map(|x| x.start_key.len() + std::mem::size_of::<KeyedBlockHandle>())
.sum::<usize>() as u32,
}
}
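
For reference, the `Right` arm above charges an index block the key bytes plus one fixed `KeyedBlockHandle`-sized unit per handle. A standalone sketch of that weight computation, using a stand-in struct whose field types are assumptions:

```rust
use std::mem::size_of;

// Stand-in for the crate's KeyedBlockHandle; field types are assumptions.
struct KeyedBlockHandle {
    start_key: Box<[u8]>,
    offset: u64,
    size: u32,
}

// Mirrors the Weighter arm in the diff: key bytes + fixed per-handle size.
fn index_block_weight(handles: &[KeyedBlockHandle]) -> u32 {
    handles
        .iter()
        .map(|h| h.start_key.len() + size_of::<KeyedBlockHandle>())
        .sum::<usize>() as u32
}

fn main() {
    let handles: Vec<KeyedBlockHandle> = (0u64..100)
        .map(|x| KeyedBlockHandle {
            start_key: x.to_be_bytes().into(), // 8-byte keys
            offset: x,
            size: 0,
        })
        .collect();

    // 100 handles, each weighing 8 key bytes + the struct size.
    assert_eq!(
        index_block_weight(&handles) as usize,
        100 * (8 + size_of::<KeyedBlockHandle>())
    );
}
```
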
@@ -124,25 +121,25 @@ impl BlockCache {
pub fn insert_disk_block(
&self,
segment_id: GlobalSegmentId,
key: UserKey,
offset: u64,
value: Arc<ValueBlock>,
) {
if self.capacity > 0 {
self.data
.insert((BlockTag::Data, segment_id, key).into(), Left(value));
.insert((BlockTag::Data, segment_id, offset).into(), Left(value));
}
}

#[doc(hidden)]
pub fn insert_block_handle_block(
pub fn insert_index_block(
&self,
segment_id: GlobalSegmentId,
key: UserKey,
value: Arc<BlockHandleBlock>,
offset: u64,
value: Arc<IndexBlock>,
) {
if self.capacity > 0 {
self.data
.insert((BlockTag::Index, segment_id, key).into(), Right(value));
.insert((BlockTag::Index, segment_id, offset).into(), Right(value));
}
}

@@ -151,21 +148,21 @@ impl BlockCache {
pub fn get_disk_block(
&self,
segment_id: GlobalSegmentId,
key: &UserKey,
offset: u64,
) -> Option<Arc<ValueBlock>> {
let key = (BlockTag::Data, segment_id, key);
let key = (BlockTag::Data, segment_id, &offset);
let item = self.data.get(&key)?;
Some(item.left().clone())
}

#[doc(hidden)]
#[must_use]
pub fn get_block_handle_block(
pub fn get_index_block(
&self,
segment_id: GlobalSegmentId,
key: &UserKey,
) -> Option<Arc<BlockHandleBlock>> {
let key = (BlockTag::Index, segment_id, key);
offset: u64,
) -> Option<Arc<IndexBlock>> {
let key = (BlockTag::Index, segment_id, &offset);
let item = self.data.get(&key)?;
Some(item.right().clone())
}
2 changes: 2 additions & 0 deletions src/compaction/fifo.rs
@@ -136,10 +136,12 @@ mod tests {
id,
file_size: 1,
compression: crate::segment::meta::CompressionType::Lz4,
table_type: crate::segment::meta::TableType::Block,
item_count: 0,
key_count: 0,
key_range: KeyRange::new((vec![].into(), vec![].into())),
tombstone_count: 0,
range_tombstone_count: 0,
uncompressed_size: 0,
seqnos: (0, created_at as u64),
},
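
These test fixtures now populate two new metadata fields, `table_type` and `range_tombstone_count`. The table type is also persisted as a single byte in `src/config.rs` further down, via `.into()` on write and `TryFrom` on read; here is a hypothetical sketch of an enum shaped for that round-trip (the variant set and discriminant are assumptions, not the crate's definition):

```rust
// Hypothetical sketch of a TableType enum that round-trips through a u8,
// matching how config.rs writes `table_type.into()` and reads `try_from`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TableType {
    Block = 0,
}

impl From<TableType> for u8 {
    fn from(t: TableType) -> Self {
        t as u8
    }
}

impl TryFrom<u8> for TableType {
    type Error = ();

    fn try_from(v: u8) -> Result<Self, Self::Error> {
        match v {
            0 => Ok(Self::Block),
            _ => Err(()), // unknown byte: reject rather than guess
        }
    }
}

fn main() {
    let byte: u8 = TableType::Block.into();
    assert_eq!(TableType::try_from(byte), Ok(TableType::Block));
    assert!(TableType::try_from(9).is_err());
}
```
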
2 changes: 2 additions & 0 deletions src/compaction/levelled.rs
@@ -232,10 +232,12 @@ mod tests {
id,
file_size: size,
compression: crate::segment::meta::CompressionType::Lz4,
table_type: crate::segment::meta::TableType::Block,
item_count: 0,
key_count: 0,
key_range,
tombstone_count: 0,
range_tombstone_count: 0,
uncompressed_size: 0,
seqnos: (0, 0),
},
2 changes: 2 additions & 0 deletions src/compaction/maintenance.rs
@@ -105,10 +105,12 @@ mod tests {
id,
file_size: 1,
compression: crate::segment::meta::CompressionType::Lz4,
table_type: crate::segment::meta::TableType::Block,
item_count: 0,
key_count: 0,
key_range: KeyRange::new((vec![].into(), vec![].into())),
tombstone_count: 0,
range_tombstone_count: 0,
uncompressed_size: 0,
seqnos: (0, 0),
},
2 changes: 2 additions & 0 deletions src/compaction/tiered.rs
@@ -130,10 +130,12 @@ mod tests {
id,
file_size: size_mib * 1_024 * 1_024,
compression: crate::segment::meta::CompressionType::Lz4,
table_type: crate::segment::meta::TableType::Block,
item_count: 0,
key_count: 0,
key_range: KeyRange::new((vec![].into(), vec![].into())),
tombstone_count: 0,
range_tombstone_count: 0,
uncompressed_size: size_mib * 1_024 * 1_024,
seqnos: (0, max_seqno),
},
13 changes: 11 additions & 2 deletions src/config.rs
@@ -1,6 +1,6 @@
use crate::{
descriptor_table::FileDescriptorTable,
segment::meta::CompressionType,
segment::meta::{CompressionType, TableType},
serde::{Deserializable, Serializable},
BlockCache, DeserializeError, SerializeError, Tree,
};
@@ -66,6 +66,8 @@ pub struct PersistedConfig {

/// What type of compression is used
compression: CompressionType,

table_type: TableType,
}

const DEFAULT_FILE_FOLDER: &str = ".lsm.data";
@@ -78,6 +80,7 @@ impl Default for PersistedConfig {
level_ratio: 8,
r#type: TreeType::Standard,
compression: CompressionType::Lz4,
table_type: TableType::Block,
}
}
}
@@ -86,6 +89,7 @@ impl Serializable for PersistedConfig {
fn serialize<W: Write>(&self, writer: &mut W) -> Result<(), SerializeError> {
writer.write_u8(self.r#type.into())?;
writer.write_u8(self.compression.into())?;
writer.write_u8(self.table_type.into())?;
writer.write_u32::<BigEndian>(self.block_size)?;
writer.write_u8(self.level_count)?;
writer.write_u8(self.level_ratio)?;
@@ -101,13 +105,17 @@ impl Deserializable for PersistedConfig {
let compression = reader.read_u8()?;
let compression = CompressionType::try_from(compression).expect("invalid compression type");

let table_type = reader.read_u8()?;
let table_type = TableType::try_from(table_type).expect("invalid table type");

let block_size = reader.read_u32::<BigEndian>()?;
let level_count = reader.read_u8()?;
let level_ratio = reader.read_u8()?;

Ok(Self {
r#type: tree_type,
compression,
table_type,
block_size,
level_count,
level_ratio,
@@ -150,7 +158,7 @@ impl Default for Config {
impl Config {
/// Initializes a new config
pub fn new<P: AsRef<Path>>(path: P) -> Self {
let inner = Default::default();
let inner = PersistedConfig::default();

Self {
inner,
@@ -250,6 +258,7 @@ mod tests {
let config = PersistedConfig {
block_size: 4_096,
compression: CompressionType::Lz4,
table_type: TableType::Block,
level_count: 7,
level_ratio: 8,
r#type: TreeType::Standard,
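
Note the wire-format coupling in the config change: the new `table_type` byte is written between the compression byte and the block size, and `Deserializable` reads the fields back in exactly that order, so serializer and deserializer must always agree. A hypothetical miniature of the pattern, using raw `u8`s in place of the real enums:

```rust
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::Cursor;

// Hypothetical mini-config mirroring the serialized field order in the diff:
// tree type, compression, table type, block size, level count, level ratio.
#[derive(Debug, PartialEq)]
struct MiniConfig {
    tree_type: u8,
    compression: u8,
    table_type: u8,
    block_size: u32,
    level_count: u8,
    level_ratio: u8,
}

impl MiniConfig {
    fn serialize(&self, w: &mut impl std::io::Write) -> std::io::Result<()> {
        w.write_u8(self.tree_type)?;
        w.write_u8(self.compression)?;
        w.write_u8(self.table_type)?; // new field sits between compression and block size
        w.write_u32::<BigEndian>(self.block_size)?;
        w.write_u8(self.level_count)?;
        w.write_u8(self.level_ratio)
    }

    fn deserialize(r: &mut impl std::io::Read) -> std::io::Result<Self> {
        // Struct-literal fields are evaluated top to bottom,
        // so the reads happen in exactly the write order.
        Ok(Self {
            tree_type: r.read_u8()?,
            compression: r.read_u8()?,
            table_type: r.read_u8()?,
            block_size: r.read_u32::<BigEndian>()?,
            level_count: r.read_u8()?,
            level_ratio: r.read_u8()?,
        })
    }
}

fn main() -> std::io::Result<()> {
    let cfg = MiniConfig {
        tree_type: 0,
        compression: 1,
        table_type: 0,
        block_size: 4_096,
        level_count: 7,
        level_ratio: 8,
    };

    let mut buf = Vec::new();
    cfg.serialize(&mut buf)?;
    assert_eq!(cfg, MiniConfig::deserialize(&mut Cursor::new(buf))?);
    Ok(())
}
```
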
6 changes: 3 additions & 3 deletions src/disk_block.rs
@@ -1,5 +1,5 @@
use crate::serde::{Deserializable, DeserializeError, Serializable, SerializeError};
use byteorder::{BigEndian, ReadBytesExt};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use lz4_flex::{compress_prepend_size, decompress_size_prepended};
use std::io::{Cursor, Read, Write};

@@ -73,13 +73,13 @@ impl<T: Clone + Serializable + Deserializable> DiskBlock<T> {
impl<T: Clone + Serializable + Deserializable> Serializable for DiskBlock<T> {
fn serialize<W: Write>(&self, writer: &mut W) -> Result<(), SerializeError> {
// Write CRC
writer.write_all(&self.crc.to_be_bytes())?;
writer.write_u32::<BigEndian>(self.crc)?;

// Write number of items

// NOTE: Truncation is okay and actually needed
#[allow(clippy::cast_possible_truncation)]
writer.write_all(&(self.items.len() as u32).to_be_bytes())?;
writer.write_u32::<BigEndian>(self.items.len() as u32)?;

// Serialize each value
for value in self.items.iter() {
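
The `DiskBlock` change swaps a manual `write_all(&x.to_be_bytes())` for byteorder's `write_u32::<BigEndian>`; the on-disk bytes are identical, so this is purely a readability change. A quick check of that equivalence:

```rust
use byteorder::{BigEndian, WriteBytesExt};
use std::io::Write;

fn main() -> std::io::Result<()> {
    let crc: u32 = 0xDEAD_BEEF;

    // Manual big-endian write, as before the change...
    let mut a = Vec::new();
    a.write_all(&crc.to_be_bytes())?;

    // ...and the byteorder call used in the diff.
    let mut b = Vec::new();
    b.write_u32::<BigEndian>(crc)?;

    assert_eq!(a, b); // identical bytes on disk
    Ok(())
}
```
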
2 changes: 2 additions & 0 deletions src/levels/mod.rs
@@ -390,10 +390,12 @@ mod tests {
id,
file_size: 0,
compression: crate::segment::meta::CompressionType::Lz4,
table_type: crate::segment::meta::TableType::Block,
item_count: 0,
key_count: 0,
key_range,
tombstone_count: 0,
range_tombstone_count: 0,
uncompressed_size: 0,
seqnos: (0, 0),
},