Skip to content

Commit 7987b75

Browse files
authored
feat(cubestore): Rockstore - optimize index scanning (#9728)
queue_add queues:1, size:64 kb/512 time: [644.70 ms 648.88 ms 653.45 ms] -> [547.18 ms 558.58 ms 571.78 ms]
queue_add queues:1, size:256 kb/512 time: [334.21 ms 336.78 ms 339.56 ms] -> [286.95 ms 291.70 ms 297.12 ms]
queue_add queues:1, size:512 kb/512 time: [363.73 ms 367.60 ms 371.73 ms] -> [304.42 ms 310.45 ms 317.43 ms]
1 parent 6e73860 commit 7987b75

File tree

4 files changed

+37
-54
lines changed

4 files changed

+37
-54
lines changed

rust/cubestore/cubestore/src/cachestore/cache_eviction_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,7 @@ impl CacheEvictionManager {
958958
cache_schema.update_extended_ttl_secondary_index(
959959
row_id,
960960
&CacheItemRocksIndex::ByPath,
961-
item.key_hash.to_vec(),
961+
item.key_hash,
962962
RocksSecondaryIndexValueTTLExtended {
963963
lfu: item.lfu,
964964
lru: item.lru.decode_value_as_opt_datetime()?,

rust/cubestore/cubestore/src/cachestore/compaction.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ mod tests {
362362
let index = CacheItemRocksIndex::ByPath;
363363
let key = RowKey::SecondaryIndex(
364364
CacheItemRocksTable::index_id(index.get_id()),
365-
index.key_hash(&row).to_be_bytes().to_vec(),
365+
index.key_hash(&row).to_be_bytes(),
366366
1,
367367
);
368368

@@ -386,7 +386,7 @@ mod tests {
386386
let index = CacheItemRocksIndex::ByPath;
387387
let key = RowKey::SecondaryIndex(
388388
CacheItemRocksTable::index_id(index.get_id()),
389-
index.key_hash(&row).to_be_bytes().to_vec(),
389+
index.key_hash(&row).to_be_bytes(),
390390
1,
391391
);
392392

@@ -410,11 +410,11 @@ mod tests {
410410
let index = CacheItemRocksIndex::ByPath;
411411
let key = RowKey::SecondaryIndex(
412412
CacheItemRocksTable::index_id(index.get_id()),
413-
index.key_hash(&row).to_be_bytes().to_vec(),
413+
index.key_hash(&row).to_be_bytes(),
414414
1,
415415
);
416416

417-
// Indexes with TTL use new format (v2) for indexes, but index migration doesnt skip
417+
// Indexes with TTL use a new format (v2) for indexes, but index migration doesn't skip
418418
// compaction for old rows
419419
let index_value = RocksSecondaryIndexValue::Hash("kek".as_bytes())
420420
.to_bytes(RocksSecondaryIndexValueVersion::OnlyHash)

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
1919
use serde_repr::*;
2020
use std::collections::HashMap;
2121
use std::fmt::Debug;
22-
use std::io::{Cursor, Write};
22+
use std::io::{Cursor, Read, Write};
2323

2424
use crate::metastore::snapshot_info::SnapshotInfo;
2525
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
@@ -122,7 +122,7 @@ pub fn get_fixed_prefix() -> usize {
122122
13
123123
}
124124

125-
pub type SecondaryKey = Vec<u8>;
125+
pub type SecondaryKeyHash = [u8; 8];
126126
pub type IndexId = u32;
127127

128128
#[derive(Clone)]
@@ -378,7 +378,7 @@ impl<'a> RocksSecondaryIndexValue<'a> {
378378
pub enum RowKey {
379379
Table(TableId, /** row_id */ u64),
380380
Sequence(TableId),
381-
SecondaryIndex(IndexId, SecondaryKey, /** row_id */ u64),
381+
SecondaryIndex(IndexId, SecondaryKeyHash, /** row_id */ u64),
382382
SecondaryIndexInfo { index_id: IndexId },
383383
TableInfo { table_id: TableId },
384384
}
@@ -421,11 +421,10 @@ impl RowKey {
421421
)?)),
422422
3 => {
423423
let table_id = IndexId::from(reader.read_u32::<BigEndian>()?);
424-
let mut secondary_key: SecondaryKey = SecondaryKey::new();
425-
let sc_length = bytes.len() - 13;
426-
for _i in 0..sc_length {
427-
secondary_key.push(reader.read_u8()?);
428-
}
424+
425+
let mut secondary_key: SecondaryKeyHash = [0_u8; 8];
426+
reader.read_exact(&mut secondary_key)?;
427+
429428
let row_id = reader.read_u64::<BigEndian>()?;
430429

431430
Ok(RowKey::SecondaryIndex(table_id, secondary_key, row_id))

rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 25 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::metastore::rocks_store::TableId;
22
use crate::metastore::{
33
get_fixed_prefix, BatchPipe, DbTableRef, IdRow, IndexId, KeyVal, MemorySequence,
44
MetaStoreEvent, RocksSecondaryIndexValue, RocksSecondaryIndexValueTTLExtended,
5-
RocksSecondaryIndexValueVersion, RocksTableStats, RowKey, SecondaryIndexInfo, SecondaryKey,
5+
RocksSecondaryIndexValueVersion, RocksTableStats, RowKey, SecondaryIndexInfo, SecondaryKeyHash,
66
TableInfo,
77
};
88
use crate::CubeError;
@@ -303,7 +303,7 @@ pub struct IndexScanIter<'a, RT: RocksTable + ?Sized> {
303303
table: &'a RT,
304304
index_id: u32,
305305
secondary_key_val: Vec<u8>,
306-
secondary_key_hash: Vec<u8>,
306+
secondary_key_hash: SecondaryKeyHash,
307307
iter: DBIterator<'a>,
308308
}
309309

@@ -364,7 +364,7 @@ where
364364
#[derive(Debug)]
365365
pub struct SecondaryIndexValueScanIterItem {
366366
pub row_id: u64,
367-
pub key_hash: SecondaryKey,
367+
pub key_hash: SecondaryKeyHash,
368368
pub ttl: Option<DateTime<Utc>>,
369369
pub extended: Option<RocksSecondaryIndexValueTTLExtended>,
370370
}
@@ -496,11 +496,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
496496
if index.is_unique() {
497497
let hash = index.key_hash(&row);
498498
let index_val = index.index_key_by(&row);
499-
let existing_keys = self.get_row_ids_from_index(
500-
index.get_id(),
501-
&index_val,
502-
&hash.to_be_bytes().to_vec(),
503-
)?;
499+
let existing_keys =
500+
self.get_row_ids_from_index(index.get_id(), &index_val, hash.to_be_bytes())?;
504501
if existing_keys.len() > 0 {
505502
return Err(CubeError::user(
506503
format!(
@@ -759,7 +756,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
759756
let existing_keys = self.get_row_ids_from_index(
760757
RocksSecondaryIndex::get_id(secondary_index),
761758
&index_val,
762-
&hash.to_be_bytes().to_vec(),
759+
hash.to_be_bytes(),
763760
)?;
764761

765762
Ok(existing_keys)
@@ -832,8 +829,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
832829
K: Hash,
833830
{
834831
let row_ids = self.get_row_ids_by_index(row_key, secondary_index)?;
835-
836-
let mut res = Vec::new();
832+
let mut res = Vec::with_capacity(row_ids.len());
837833

838834
for id in row_ids {
839835
if let Some(row) = self.get_row(id)? {
@@ -969,7 +965,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
969965
&self,
970966
row_id: u64,
971967
secondary_index: &'a impl RocksSecondaryIndex<Self::T, K>,
972-
secondary_key_hash: SecondaryKey,
968+
secondary_key_hash: SecondaryKeyHash,
973969
extended: RocksSecondaryIndexValueTTLExtended,
974970
batch_pipe: &mut BatchPipe,
975971
) -> Result<bool, CubeError>
@@ -1141,11 +1137,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
11411137
) -> KeyVal {
11421138
let hash = index.key_hash(row);
11431139
let index_val = index.index_value(row);
1144-
let key = RowKey::SecondaryIndex(
1145-
Self::index_id(index.get_id()),
1146-
hash.to_be_bytes().to_vec(),
1147-
row_id,
1148-
);
1140+
let key =
1141+
RowKey::SecondaryIndex(Self::index_id(index.get_id()), hash.to_be_bytes(), row_id);
11491142

11501143
KeyVal {
11511144
key: key.to_bytes(),
@@ -1157,11 +1150,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
11571150
let mut res = Vec::new();
11581151
for index in Self::indexes().iter() {
11591152
let hash = index.key_hash(&row);
1160-
let key = RowKey::SecondaryIndex(
1161-
Self::index_id(index.get_id()),
1162-
hash.to_be_bytes().to_vec(),
1163-
row_id,
1164-
);
1153+
let key =
1154+
RowKey::SecondaryIndex(Self::index_id(index.get_id()), hash.to_be_bytes(), row_id);
11651155
res.push(KeyVal {
11661156
key: key.to_bytes(),
11671157
val: vec![],
@@ -1247,17 +1237,17 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
12471237
&self,
12481238
secondary_id: u32,
12491239
secondary_key_val: &Vec<u8>,
1250-
secondary_key_hash: &Vec<u8>,
1240+
secondary_key_hash: SecondaryKeyHash,
12511241
) -> Result<Vec<u64>, CubeError> {
12521242
let ref db = self.snapshot();
12531243
let key_len = secondary_key_hash.len();
1254-
let key_min =
1255-
RowKey::SecondaryIndex(Self::index_id(secondary_id), secondary_key_hash.clone(), 0);
1244+
let key_min = RowKey::SecondaryIndex(Self::index_id(secondary_id), secondary_key_hash, 0);
12561245

12571246
let mut res: Vec<u64> = Vec::new();
12581247

12591248
let mut opts = ReadOptions::default();
12601249
opts.set_prefix_same_as_start(true);
1250+
12611251
let iter = db.iterator_opt(
12621252
IteratorMode::From(&key_min.to_bytes()[0..(key_len + 5)], Direction::Forward),
12631253
opts,
@@ -1269,10 +1259,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
12691259
if let RowKey::SecondaryIndex(_, secondary_index_hash, row_id) =
12701260
RowKey::from_bytes(&key)
12711261
{
1272-
if !secondary_index_hash
1273-
.iter()
1274-
.zip(secondary_key_hash)
1275-
.all(|(a, b)| a == b)
1262+
if secondary_index_hash.len() != secondary_key_hash.len()
1263+
|| secondary_index_hash != secondary_key_hash
12761264
{
12771265
break;
12781266
}
@@ -1284,9 +1272,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
12841272
RocksSecondaryIndexValue::HashAndTTLExtended(h, expire, _) => (h, expire),
12851273
};
12861274

1287-
if secondary_key_val.len() != hash.len()
1288-
|| !hash.iter().zip(secondary_key_val).all(|(a, b)| a == b)
1289-
{
1275+
if hash.len() != secondary_key_val.len() || hash != secondary_key_val.as_slice() {
12901276
continue;
12911277
}
12921278

@@ -1341,8 +1327,9 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
13411327
batch: &mut WriteBatch,
13421328
) -> Result<u64, CubeError> {
13431329
let ref db = self.snapshot();
1344-
let zero_vec = vec![0 as u8; 8];
1345-
let key_min = RowKey::SecondaryIndex(Self::index_id(secondary_id), zero_vec.clone(), 0);
1330+
1331+
let zero_vec = [0 as u8; 8];
1332+
let key_min = RowKey::SecondaryIndex(Self::index_id(secondary_id), zero_vec, 0);
13461333

13471334
let mut opts = ReadOptions::default();
13481335
opts.set_prefix_same_as_start(false);
@@ -1408,7 +1395,8 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
14081395
let ref db = self.snapshot();
14091396

14101397
let index_id = RocksSecondaryIndex::get_id(secondary_index);
1411-
let row_key = RowKey::SecondaryIndex(Self::index_id(index_id), vec![], 0);
1398+
let zero_vec = [0 as u8; 8];
1399+
let row_key = RowKey::SecondaryIndex(Self::index_id(index_id), zero_vec, 0);
14121400

14131401
let mut opts = ReadOptions::default();
14141402
opts.set_prefix_same_as_start(false);
@@ -1433,16 +1421,12 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
14331421
{
14341422
let ref db = self.snapshot();
14351423

1436-
let secondary_key_hash = secondary_index
1437-
.typed_key_hash(&row_key)
1438-
.to_be_bytes()
1439-
.to_vec();
1424+
let secondary_key_hash = secondary_index.typed_key_hash(&row_key).to_be_bytes() as [u8; 8];
14401425
let secondary_key_val = secondary_index.key_to_bytes(&row_key);
14411426

14421427
let index_id = RocksSecondaryIndex::get_id(secondary_index);
14431428
let key_len = secondary_key_hash.len();
1444-
let key_min =
1445-
RowKey::SecondaryIndex(Self::index_id(index_id), secondary_key_hash.clone(), 0);
1429+
let key_min = RowKey::SecondaryIndex(Self::index_id(index_id), secondary_key_hash, 0);
14461430

14471431
let mut opts = ReadOptions::default();
14481432
opts.set_prefix_same_as_start(true);

0 commit comments

Comments
 (0)