
Commit bdcb372

feat(cubestore): Queue - optimize ADD (RocksTable->insert_if_not_exists)

Parent: 066045e
3 files changed: +60 -28 lines

rust/cubestore/cubestore/benches/cachestore_queue.rs (12 additions, 7 deletions)

```diff
@@ -55,20 +55,23 @@ async fn do_insert(
     }
 }
 
-fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb: usize) {
+fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb: usize, do_duplicates: bool) {
     let cachestore = runtime.block_on(async {
-        prepare_cachestore(&format!("cachestore_queue_add_{}", size_kb)).unwrap()
+        prepare_cachestore(&format!("cachestore_queue_add_{}_{}", size_kb, do_duplicates)).unwrap()
     });
 
     c.bench_with_input(
-        BenchmarkId::new(format!("queue_add queues:1, size:{} kb", size_kb), total),
+        BenchmarkId::new(format!("queue_add duplicates_check: {} queues:1, size:{} kb", do_duplicates, size_kb), total),
         &(total, size_kb),
         |b, (total, size_kb)| {
            let mut insert_id_padding = 0;
 
            b.to_async(runtime).iter(|| {
                let prev_value = insert_id_padding.clone();
-               insert_id_padding += total;
+
+               if !do_duplicates {
+                   insert_id_padding += total;
+               }
 
                do_insert(
                    &cachestore,
@@ -190,9 +193,11 @@ fn do_get_bench(
 fn do_benches(c: &mut Criterion) {
     let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
 
-    do_insert_bench(c, &runtime, 512, 64);
-    do_insert_bench(c, &runtime, 512, 256);
-    do_insert_bench(c, &runtime, 512, 512);
+    do_insert_bench(c, &runtime, 512, 64, false);
+    do_insert_bench(c, &runtime, 512, 256, false);
+    do_insert_bench(c, &runtime, 512, 512, false);
+
+    do_insert_bench(c, &runtime, 1_000, 64, true);
 
     do_list_bench(c, &runtime, Some(QueueItemStatus::Pending), 1_000, 128, 128);
     do_list_bench(c, &runtime, Some(QueueItemStatus::Active), 1_000, 128, 128);
```
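
The new `do_duplicates` flag switches the benchmark between two workloads: when it is `false`, `insert_id_padding` advances by `total` each iteration so every ADD uses a fresh path; when it is `true` the padding stays fixed, so the same paths are re-added over and over and the run measures the "already exists" branch that this commit optimizes. A minimal, standalone sketch of that shape (hypothetical names, a plain `HashSet` standing in for the cachestore; not CubeStore code):

```rust
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use std::collections::HashSet;

// Stand-in for the queue ADD: repeated keys are simply ignored.
fn insert_keys(store: &mut HashSet<u64>, start: u64, total: u64) {
    for i in 0..total {
        store.insert(start + i);
    }
}

fn bench_add(c: &mut Criterion, total: u64, do_duplicates: bool) {
    let mut store = HashSet::new();
    let mut padding: u64 = 0;

    c.bench_with_input(
        BenchmarkId::new(format!("add duplicates_check:{}", do_duplicates), total),
        &total,
        |b, &total| {
            b.iter(|| {
                let start = padding;
                if !do_duplicates {
                    // Fresh keys every iteration -> every insert is a real insert.
                    padding += total;
                }
                insert_keys(&mut store, start, total);
            })
        },
    );
}

fn benches(c: &mut Criterion) {
    bench_add(c, 512, false); // unique keys each iteration
    bench_add(c, 1_000, true); // same keys re-added: duplicate-check path
}

criterion_group!(queue_add, benches);
criterion_main!(queue_add);
```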

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs (17 additions, 19 deletions)

```diff
@@ -1071,33 +1071,31 @@ impl CacheStore for RocksCacheStore {
             &QueueItemRocksIndex::ByPrefixAndStatus,
         )?;
 
-        let index_key = QueueItemIndexKey::ByPath(payload.path.clone());
-        let id_row_opt = queue_schema
-            .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;
-
-        let (id, added) = if let Some(row) = id_row_opt {
-            (row.id, false)
-        } else {
-            let queue_item_row = queue_schema.insert(
+        let (id, added) = {
+            let (queue_item_row, added) = queue_schema.insert_if_not_exists(
+                &QueueItemIndexKey::ByPath(payload.path.clone()),
+                &QueueItemRocksIndex::ByPath,
                 QueueItem::new(
                     payload.path,
                     QueueItem::status_default(),
                     payload.priority,
                     payload.orphaned.clone(),
                 ),
-                batch_pipe,
+                batch_pipe
             )?;
 
-            let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());
-            queue_payload_schema.insert_with_pk(
-                queue_item_row.id,
-                QueueItemPayload::new(
-                    payload.value,
-                    queue_item_row.row.get_created().clone(),
-                    queue_item_row.row.get_expire().clone(),
-                ),
-                batch_pipe,
-            )?;
+            if added {
+                let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());
+                queue_payload_schema.insert_with_pk(
+                    queue_item_row.id,
+                    QueueItemPayload::new(
+                        payload.value,
+                        queue_item_row.row.get_created().clone(),
+                        queue_item_row.row.get_expire().clone(),
+                    ),
+                    batch_pipe,
+                )?;
+            }
 
             (queue_item_row.id, true)
         };
```
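
The ADD handler previously probed the `ByPath` index with `get_single_opt_row_by_index` and, on a miss, called `insert`, which walks every secondary index again; it now makes a single `insert_if_not_exists` call that does the probe and, only for a new item, performs the insert while skipping the already-probed index, and the payload row is written only when the item was actually added. A simplified, hypothetical sketch of that control flow with stand-in types (`HashMap`s instead of RocksDB tables; `QueueStore` is not a CubeStore type):

```rust
use std::collections::HashMap;

struct QueueStore {
    items_by_path: HashMap<String, u64>, // stand-in for the ByPath secondary index
    payloads: HashMap<u64, String>,      // stand-in for QueueItemPayloadRocksTable
    next_id: u64,
}

impl QueueStore {
    /// Lookup-or-insert in one place, mirroring insert_if_not_exists.
    fn add(&mut self, path: String, value: String) -> (u64, bool) {
        let (id, added) = match self.items_by_path.get(&path).copied() {
            Some(id) => (id, false), // duplicate path: return the existing row id
            None => {
                let id = self.next_id;
                self.next_id += 1;
                self.items_by_path.insert(path, id);
                (id, true)
            }
        };

        // The payload row is persisted only for newly added items.
        if added {
            self.payloads.insert(id, value);
        }

        (id, added)
    }
}

fn main() {
    let mut store = QueueStore {
        items_by_path: HashMap::new(),
        payloads: HashMap::new(),
        next_id: 1,
    };
    assert_eq!(store.add("STANDALONE#queue:1".into(), "payload".into()), (1, true));
    assert_eq!(store.add("STANDALONE#queue:1".into(), "other".into()), (1, false)); // not re-added
}
```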

rust/cubestore/cubestore/src/metastore/rocks_table.rs (31 additions, 2 deletions)

```diff
@@ -487,12 +487,19 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
         row_id: Option<u64>,
         row: Self::T,
         batch_pipe: &mut BatchPipe,
+        secondary_index_id_to_ignore: Option<u32>,
     ) -> Result<IdRow<Self::T>, CubeError> {
         let mut ser = flexbuffers::FlexbufferSerializer::new();
         row.serialize(&mut ser).unwrap();
         let serialized_row = ser.take_buffer();
 
         for index in Self::indexes().iter() {
+            if let Some(index_id_to_ignore) = secondary_index_id_to_ignore {
+                if index.get_id() == index_id_to_ignore {
+                    continue;
+                }
+            }
+
             if index.is_unique() {
                 let hash = index.key_hash(&row);
                 let index_val = index.index_key_by(&row);
@@ -543,15 +550,37 @@
         row: Self::T,
         batch_pipe: &mut BatchPipe,
     ) -> Result<IdRow<Self::T>, CubeError> {
-        self.do_insert(Some(row_id), row, batch_pipe)
+        self.do_insert(Some(row_id), row, batch_pipe, None)
     }
 
     fn insert(
         &self,
         row: Self::T,
         batch_pipe: &mut BatchPipe,
     ) -> Result<IdRow<Self::T>, CubeError> {
-        self.do_insert(None, row, batch_pipe)
+        self.do_insert(None, row, batch_pipe, None)
+    }
+
+    /// Special optimized insert method that checks if the row already exists in the secondary index.
+    /// If it exists, it returns the existing row without inserting a new one.
+    ///
+    /// The idea is to skip check if the row already exists in the secondary index while inserting.
+    fn insert_if_not_exists<K: Debug + Hash>(
+        &self,
+        row_key: &K,
+        secondary_index: &impl RocksSecondaryIndex<Self::T, K>,
+        row: Self::T,
+        batch_pipe: &mut BatchPipe,
+    ) -> Result<(IdRow<Self::T>, bool), CubeError> {
+        let id_row_opt = self.get_single_opt_row_by_index(row_key, secondary_index)?;
+
+        if let Some(row) = id_row_opt {
+            Ok((row, false))
+        } else {
+            let row = self.do_insert(None, row, batch_pipe, Some(RocksSecondaryIndex::get_id(secondary_index)))?;
+
+            Ok((row, true))
+        }
     }
 
     fn migrate(&self) -> Result<(), CubeError> {
```
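
`insert_if_not_exists` does one lookup through the given secondary index and, only on a miss, delegates to `do_insert` with `secondary_index_id_to_ignore` set to that index's id, so the uniqueness loop does not re-check the index the caller has just probed. A hypothetical, standalone illustration of why that skip is safe and what it saves (plain structs, not the RocksTable API):

```rust
struct Index {
    id: u32,
    unique: bool,
}

// Stand-in for do_insert's index loop: each unique index normally costs a read.
fn count_unique_checks(indexes: &[Index], index_id_to_ignore: Option<u32>) -> usize {
    let mut checks = 0;
    for index in indexes {
        // Skip the index the caller already probed; that probe proved no row exists.
        if let Some(ignore_id) = index_id_to_ignore {
            if index.id == ignore_id {
                continue;
            }
        }
        if index.unique {
            checks += 1; // in CubeStore this would be a RocksDB uniqueness check
        }
    }
    checks
}

fn main() {
    // id 1 ~ a unique ByPath-style index, id 2 ~ a non-unique status index.
    let indexes = [Index { id: 1, unique: true }, Index { id: 2, unique: false }];
    // Plain insert: every unique index is checked.
    assert_eq!(count_unique_checks(&indexes, None), 1);
    // insert_if_not_exists path: the probed index is skipped.
    assert_eq!(count_unique_checks(&indexes, Some(1)), 0);
}
```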
