From a38452fa58d3f651dbf00635d1abeb814ad3c8b3 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 30 Jun 2025 20:48:04 +0200 Subject: [PATCH] feat(cubestore): Queue - optimize ADD (RocksTable->insert_if_not_exists) --- .../cubestore/benches/cachestore_queue.rs | 35 +++++++++++++---- .../src/cachestore/cache_rocksstore.rs | 37 +++++++++--------- .../cubestore/src/metastore/rocks_table.rs | 38 ++++++++++++++++++- 3 files changed, 81 insertions(+), 29 deletions(-) diff --git a/rust/cubestore/cubestore/benches/cachestore_queue.rs b/rust/cubestore/cubestore/benches/cachestore_queue.rs index 8dccaf6be74bb..922bbb44d4c0f 100644 --- a/rust/cubestore/cubestore/benches/cachestore_queue.rs +++ b/rust/cubestore/cubestore/benches/cachestore_queue.rs @@ -55,20 +55,39 @@ async fn do_insert( } } -fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb: usize) { +fn do_insert_bench( + c: &mut Criterion, + runtime: &Runtime, + total: usize, + size_kb: usize, + do_duplicates: bool, +) { let cachestore = runtime.block_on(async { - prepare_cachestore(&format!("cachestore_queue_add_{}", size_kb)).unwrap() + prepare_cachestore(&format!( + "cachestore_queue_add_{}_{}", + size_kb, do_duplicates + )) + .unwrap() }); c.bench_with_input( - BenchmarkId::new(format!("queue_add queues:1, size:{} kb", size_kb), total), + BenchmarkId::new( + format!( + "queue_add duplicates_check: {} queues:1, size:{} kb", + do_duplicates, size_kb + ), + total, + ), &(total, size_kb), |b, (total, size_kb)| { let mut insert_id_padding = 0; b.to_async(runtime).iter(|| { let prev_value = insert_id_padding.clone(); - insert_id_padding += total; + + if !do_duplicates { + insert_id_padding += total; + } do_insert( &cachestore, @@ -190,9 +209,11 @@ fn do_get_bench( fn do_benches(c: &mut Criterion) { let runtime = Builder::new_multi_thread().enable_all().build().unwrap(); - do_insert_bench(c, &runtime, 512, 64); - do_insert_bench(c, &runtime, 512, 256); - do_insert_bench(c, &runtime, 512, 512); 
+ do_insert_bench(c, &runtime, 512, 64, false); + do_insert_bench(c, &runtime, 512, 256, false); + do_insert_bench(c, &runtime, 512, 512, false); + + do_insert_bench(c, &runtime, 1_000, 64, true); do_list_bench(c, &runtime, Some(QueueItemStatus::Pending), 1_000, 128, 128); do_list_bench(c, &runtime, Some(QueueItemStatus::Active), 1_000, 128, 128); diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index c7ffc32a1a98c..9886e780d1b56 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -1071,14 +1071,10 @@ impl CacheStore for RocksCacheStore { &QueueItemRocksIndex::ByPrefixAndStatus, )?; - let index_key = QueueItemIndexKey::ByPath(payload.path.clone()); - let id_row_opt = queue_schema - .get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?; - - let (id, added) = if let Some(row) = id_row_opt { - (row.id, false) - } else { - let queue_item_row = queue_schema.insert( + let (id, added) = { + let (queue_item_row, added) = queue_schema.insert_if_not_exists( + &QueueItemIndexKey::ByPath(payload.path.clone()), + &QueueItemRocksIndex::ByPath, QueueItem::new( payload.path, QueueItem::status_default(), @@ -1087,19 +1083,20 @@ impl CacheStore for RocksCacheStore { ), batch_pipe, )?; + if added { + let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); + queue_payload_schema.insert_with_pk( + queue_item_row.id, + QueueItemPayload::new( + payload.value, + queue_item_row.row.get_created().clone(), + queue_item_row.row.get_expire().clone(), + ), + batch_pipe, + )?; + } - let queue_payload_schema = QueueItemPayloadRocksTable::new(db_ref.clone()); - queue_payload_schema.insert_with_pk( - queue_item_row.id, - QueueItemPayload::new( - payload.value, - queue_item_row.row.get_created().clone(), - queue_item_row.row.get_expire().clone(), - ), - batch_pipe, - )?; - - 
(queue_item_row.id, true) + (queue_item_row.id, added) }; Ok(QueueAddResponse { diff --git a/rust/cubestore/cubestore/src/metastore/rocks_table.rs b/rust/cubestore/cubestore/src/metastore/rocks_table.rs index 55b90680df53f..b0b08bca21e5d 100644 --- a/rust/cubestore/cubestore/src/metastore/rocks_table.rs +++ b/rust/cubestore/cubestore/src/metastore/rocks_table.rs @@ -487,12 +487,19 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { row_id: Option, row: Self::T, batch_pipe: &mut BatchPipe, + secondary_index_id_to_ignore: Option, ) -> Result, CubeError> { let mut ser = flexbuffers::FlexbufferSerializer::new(); row.serialize(&mut ser).unwrap(); let serialized_row = ser.take_buffer(); for index in Self::indexes().iter() { + if let Some(index_id_to_ignore) = secondary_index_id_to_ignore { + if index.get_id() == index_id_to_ignore { + continue; + } + } + if index.is_unique() { let hash = index.key_hash(&row); let index_val = index.index_key_by(&row); @@ -540,7 +547,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { row: Self::T, batch_pipe: &mut BatchPipe, ) -> Result, CubeError> { - self.do_insert(Some(row_id), row, batch_pipe) + self.do_insert(Some(row_id), row, batch_pipe, None) } fn insert( @@ -548,7 +555,34 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync { row: Self::T, batch_pipe: &mut BatchPipe, ) -> Result, CubeError> { - self.do_insert(None, row, batch_pipe) + self.do_insert(None, row, batch_pipe, None) + } + + /// Special optimized insert method that checks if the row already exists in the secondary index. + /// If it exists, it returns the existing row without inserting a new one. + /// + /// The idea is to skip the check of whether the row already exists in the secondary index while inserting. 
+ fn insert_if_not_exists( + &self, + row_key: &K, + secondary_index: &impl RocksSecondaryIndex, + row: Self::T, + batch_pipe: &mut BatchPipe, + ) -> Result<(IdRow, bool), CubeError> { + let id_row_opt = self.get_single_opt_row_by_index(row_key, secondary_index)?; + + if let Some(row) = id_row_opt { + Ok((row, false)) + } else { + let row = self.do_insert( + None, + row, + batch_pipe, + Some(RocksSecondaryIndex::get_id(secondary_index)), + )?; + + Ok((row, true)) + } } fn migrate(&self) -> Result<(), CubeError> {