Skip to content

Commit a38452f

Browse files
committed
feat(cubestore): Queue - optimize ADD (RocksTanle->insert_if_not_exists)
1 parent 7987b75 commit a38452f

File tree

3 files changed

+81
-29
lines changed

3 files changed

+81
-29
lines changed

rust/cubestore/cubestore/benches/cachestore_queue.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,39 @@ async fn do_insert(
5555
}
5656
}
5757

58-
fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb: usize) {
58+
fn do_insert_bench(
59+
c: &mut Criterion,
60+
runtime: &Runtime,
61+
total: usize,
62+
size_kb: usize,
63+
do_duplicates: bool,
64+
) {
5965
let cachestore = runtime.block_on(async {
60-
prepare_cachestore(&format!("cachestore_queue_add_{}", size_kb)).unwrap()
66+
prepare_cachestore(&format!(
67+
"cachestore_queue_add_{}_{}",
68+
size_kb, do_duplicates
69+
))
70+
.unwrap()
6171
});
6272

6373
c.bench_with_input(
64-
BenchmarkId::new(format!("queue_add queues:1, size:{} kb", size_kb), total),
74+
BenchmarkId::new(
75+
format!(
76+
"queue_add duplicates_check: {} queues:1, size:{} kb",
77+
do_duplicates, size_kb
78+
),
79+
total,
80+
),
6581
&(total, size_kb),
6682
|b, (total, size_kb)| {
6783
let mut insert_id_padding = 0;
6884

6985
b.to_async(runtime).iter(|| {
7086
let prev_value = insert_id_padding.clone();
71-
insert_id_padding += total;
87+
88+
if !do_duplicates {
89+
insert_id_padding += total;
90+
}
7291

7392
do_insert(
7493
&cachestore,
@@ -190,9 +209,11 @@ fn do_get_bench(
190209
fn do_benches(c: &mut Criterion) {
191210
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
192211

193-
do_insert_bench(c, &runtime, 512, 64);
194-
do_insert_bench(c, &runtime, 512, 256);
195-
do_insert_bench(c, &runtime, 512, 512);
212+
do_insert_bench(c, &runtime, 512, 64, false);
213+
do_insert_bench(c, &runtime, 512, 256, false);
214+
do_insert_bench(c, &runtime, 512, 512, false);
215+
216+
do_insert_bench(c, &runtime, 1_000, 64, true);
196217

197218
do_list_bench(c, &runtime, Some(QueueItemStatus::Pending), 1_000, 128, 128);
198219
do_list_bench(c, &runtime, Some(QueueItemStatus::Active), 1_000, 128, 128);

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,14 +1071,10 @@ impl CacheStore for RocksCacheStore {
10711071
&QueueItemRocksIndex::ByPrefixAndStatus,
10721072
)?;
10731073

1074-
let index_key = QueueItemIndexKey::ByPath(payload.path.clone());
1075-
let id_row_opt = queue_schema
1076-
.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;
1077-
1078-
let (id, added) = if let Some(row) = id_row_opt {
1079-
(row.id, false)
1080-
} else {
1081-
let queue_item_row = queue_schema.insert(
1074+
let (id, added) = {
1075+
let (queue_item_row, added) = queue_schema.insert_if_not_exists(
1076+
&QueueItemIndexKey::ByPath(payload.path.clone()),
1077+
&QueueItemRocksIndex::ByPath,
10821078
QueueItem::new(
10831079
payload.path,
10841080
QueueItem::status_default(),
@@ -1087,19 +1083,20 @@ impl CacheStore for RocksCacheStore {
10871083
),
10881084
batch_pipe,
10891085
)?;
1086+
if added {
1087+
let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());
1088+
queue_payload_schema.insert_with_pk(
1089+
queue_item_row.id,
1090+
QueueItemPayload::new(
1091+
payload.value,
1092+
queue_item_row.row.get_created().clone(),
1093+
queue_item_row.row.get_expire().clone(),
1094+
),
1095+
batch_pipe,
1096+
)?;
1097+
}
10901098

1091-
let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone());
1092-
queue_payload_schema.insert_with_pk(
1093-
queue_item_row.id,
1094-
QueueItemPayload::new(
1095-
payload.value,
1096-
queue_item_row.row.get_created().clone(),
1097-
queue_item_row.row.get_expire().clone(),
1098-
),
1099-
batch_pipe,
1100-
)?;
1101-
1102-
(queue_item_row.id, true)
1099+
(queue_item_row.id, added)
11031100
};
11041101

11051102
Ok(QueueAddResponse {

rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,12 +487,19 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
487487
row_id: Option<u64>,
488488
row: Self::T,
489489
batch_pipe: &mut BatchPipe,
490+
secondary_index_id_to_ignore: Option<u32>,
490491
) -> Result<IdRow<Self::T>, CubeError> {
491492
let mut ser = flexbuffers::FlexbufferSerializer::new();
492493
row.serialize(&mut ser).unwrap();
493494
let serialized_row = ser.take_buffer();
494495

495496
for index in Self::indexes().iter() {
497+
if let Some(index_id_to_ignore) = secondary_index_id_to_ignore {
498+
if index.get_id() == index_id_to_ignore {
499+
continue;
500+
}
501+
}
502+
496503
if index.is_unique() {
497504
let hash = index.key_hash(&row);
498505
let index_val = index.index_key_by(&row);
@@ -540,15 +547,42 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
540547
row: Self::T,
541548
batch_pipe: &mut BatchPipe,
542549
) -> Result<IdRow<Self::T>, CubeError> {
543-
self.do_insert(Some(row_id), row, batch_pipe)
550+
self.do_insert(Some(row_id), row, batch_pipe, None)
544551
}
545552

546553
fn insert(
547554
&self,
548555
row: Self::T,
549556
batch_pipe: &mut BatchPipe,
550557
) -> Result<IdRow<Self::T>, CubeError> {
551-
self.do_insert(None, row, batch_pipe)
558+
self.do_insert(None, row, batch_pipe, None)
559+
}
560+
561+
/// Special optimized insert method that checks if the row already exists in the secondary index.
562+
/// If it exists, it returns the existing row without inserting a new one.
563+
///
564+
/// The idea is to skip check if the row already exists in the secondary index while inserting.
565+
fn insert_if_not_exists<K: Debug + Hash>(
566+
&self,
567+
row_key: &K,
568+
secondary_index: &impl RocksSecondaryIndex<Self::T, K>,
569+
row: Self::T,
570+
batch_pipe: &mut BatchPipe,
571+
) -> Result<(IdRow<Self::T>, bool), CubeError> {
572+
let id_row_opt = self.get_single_opt_row_by_index(row_key, secondary_index)?;
573+
574+
if let Some(row) = id_row_opt {
575+
Ok((row, false))
576+
} else {
577+
let row = self.do_insert(
578+
None,
579+
row,
580+
batch_pipe,
581+
Some(RocksSecondaryIndex::get_id(secondary_index)),
582+
)?;
583+
584+
Ok((row, true))
585+
}
552586
}
553587

554588
fn migrate(&self) -> Result<(), CubeError> {

0 commit comments

Comments
 (0)